From 2610239d62d744294e55d44e46937bd6dea87559 Mon Sep 17 00:00:00 2001 From: tiptorrent development team Date: Tue, 17 Aug 2021 00:05:31 +0200 Subject: initial commit --- src/core.c | 440 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 440 insertions(+) create mode 100644 src/core.c (limited to 'src/core.c') diff --git a/src/core.c b/src/core.c new file mode 100644 index 0000000..2b64347 --- /dev/null +++ b/src/core.c @@ -0,0 +1,440 @@ +/* + * Copyright (C) 2020-2021 Soleta Networks + * + * This program is free software: you can redistribute it and/or modify it under + * the terms of the GNU Affero General Public License as published by the + * Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + */ + +#include "core.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +struct ev_loop *tip_main_loop; + +int num_clients; +static LIST_HEAD(client_list); +static LIST_HEAD(client_redirect_list); + +static void tip_client_activate_pending(void); + +static void tip_client_release(struct ev_loop *loop, struct tip_client *cli) +{ + syslog(LOG_INFO, "closing connection with %s:%hu", + inet_ntoa(cli->addr.sin_addr), htons(cli->addr.sin_port)); + + list_del(&cli->list); + ev_io_stop(loop, &cli->io); + close(cli->io.fd); + if (cli->fd > 0) + close(cli->fd); + + free((void *)cli->uri); + free((void *)cli->path); + free(cli); + num_clients--; + + tip_client_activate_pending(); +} + +static int tip_client_payload_too_large(struct tip_client *cli) +{ + char buf[] = "HTTP/1.1 413 Payload Too Large\r\n" + "Content-Length: 0\r\n\r\n"; + + send(tip_client_socket(cli), buf, strlen(buf), 0); + + return -1; +} + +static int tip_client_state_recv_hdr(struct tip_client *cli) +{ + char *ptr; + + ptr = strstr(cli->buf, "\r\n\r\n"); + if (!ptr) + return 0; + + cli->msg_len = ptr - cli->buf + 4; + + ptr = strstr(cli->buf, "Content-Length: "); + if (ptr) { + sscanf(ptr, "Content-Length: %i[^\r\n]", &cli->content_length); + if (cli->content_length < 0) + return -1; + cli->msg_len += cli->content_length; + } + + ptr = strstr(cli->buf, "Authorization: "); + if (ptr) + sscanf(ptr, "Authorization: %63[^\r\n]", cli->auth_token); + + return 1; +} + +static int tip_client_recv(struct tip_client *cli, int events) +{ + struct ev_io *io = &cli->io; + int ret; + + ret = recv(io->fd, cli->buf + cli->buf_len, + sizeof(cli->buf) - cli->buf_len, 0); + if (ret <= 0) { + if (ret < 0) { + syslog(LOG_ERR, "error reading from client %s:%hu (%s)\n", + inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port), + strerror(errno)); + } + return ret; + } + + return ret; +} + +static void tip_client_read_cb(struct ev_loop *loop, struct ev_io *io, int events) +{ + struct tip_client *cli; + int ret; + + cli = container_of(io, struct tip_client, io); + + ret = tip_client_recv(cli, events); + if (ret <= 0) + goto close; + + ev_timer_again(loop, &cli->timer); + + cli->buf_len += ret; + if (cli->buf_len >= sizeof(cli->buf)) { + syslog(LOG_ERR, "client request from %s:%hu is too long\n", + inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port)); + tip_client_payload_too_large(cli); + goto close; + } + + switch (cli->state) { + case TIP_CLIENT_RECEIVING_HEADER: + ret = tip_client_state_recv_hdr(cli); + if (ret < 0) + goto close; + if (!ret) + return; + + cli->state = TIP_CLIENT_RECEIVING_PAYLOAD; + /* Fall through. */ + case TIP_CLIENT_RECEIVING_PAYLOAD: + /* Still not enough data to process request. */ + if (cli->buf_len < cli->msg_len) + return; + + cli->state = TIP_CLIENT_PROCESSING_REQUEST; + /* fall through. */ + case TIP_CLIENT_PROCESSING_REQUEST: + ret = tip_client_state_process_payload(cli); + if (ret > 0) { + /* client is pending. */ + return; + } else if (ret < 0) { + syslog(LOG_ERR, "Failed to process HTTP request from %s:%hu\n", + inet_ntoa(cli->addr.sin_addr), + ntohs(cli->addr.sin_port)); + goto close; + } + + ev_io_stop(loop, &cli->io); + ev_io_set(&cli->io, tip_client_socket(cli), EV_READ | EV_WRITE); + ev_io_start(loop, &cli->io); + break; + default: + syslog(LOG_ERR, "unknown read state, critical internal error\n"); + goto close; + } + return; +close: + ev_timer_stop(loop, &cli->timer); + tip_client_release(loop, cli); +} + +static void tip_client_redirect_timer_cb(struct ev_loop *loop, ev_timer *timer, + int events) +{ + struct tip_client_redirect *redir; + + redir = container_of(timer, struct tip_client_redirect, timer); + + syslog(LOG_ERR, "timeout for client redirection to %s:%hu for %s\n", + inet_ntoa(redir->addr.sin_addr), ntohs(redir->addr.sin_port), + redir->uri); + + list_del(&redir->list); + free((void *)redir->uri); + free(redir); +} + +static int tip_client_redirect_create(const struct tip_client *cli) +{ + struct tip_client_redirect *redir; + bool found = false; + + if (!redirect || !cli->allow_redirect) + return 0; + + list_for_each_entry(redir, &client_redirect_list, list) { + if (!strcmp(redir->uri, cli->uri) && + redir->addr.sin_addr.s_addr == cli->addr.sin_addr.s_addr) { + found = true; + break; + } + } + + if (found) { + syslog(LOG_INFO, "client redirection to %s:%hu for %s already exists, skipping", + inet_ntoa(cli->addr.sin_addr), htons(cli->addr.sin_port), + cli->uri); + return 0; + } + + redir = calloc(1, sizeof(struct tip_client_redirect)); + if (!redir) + return -1; + + redir->addr = cli->addr; + redir->addr.sin_port = htons(9999); + redir->uri = strdup(cli->uri); + list_add_tail(&redir->list, &client_redirect_list); + + ev_timer_init(&redir->timer, tip_client_redirect_timer_cb, 60, 0.); + ev_timer_start(tip_main_loop, &redir->timer); + + syslog(LOG_INFO, "adding client redirection to %s:%hu for %s", + inet_ntoa(redir->addr.sin_addr), htons(redir->addr.sin_port), + redir->uri); + + return 0; +} + +static void tip_client_write_cb(struct ev_loop *loop, struct ev_io *io, int events) +{ + struct tip_client *cli; + int ret; + + cli = container_of(io, struct tip_client, io); + + ev_timer_again(loop, &cli->timer); + + switch (cli->state) { + case TIP_CLIENT_PROCESSING_REQUEST_2: + ret = tip_client_state_process_payload_reply(cli); + if (ret > 0) { + goto close; + } else if (ret < 0) { + syslog(LOG_ERR, "Failed to process HTTP request from %s:%hu\n", + inet_ntoa(cli->addr.sin_addr), + ntohs(cli->addr.sin_port)); + goto close; + } + break; + case TIP_CLIENT_PROCESSING_REQUEST_3: + ret = tip_client_state_process_payload_bulk(cli); + if (ret > 0) + goto shutdown; + else if (ret < 0) { + syslog(LOG_ERR, "Failed to process HTTP request from %s:%hu\n", + inet_ntoa(cli->addr.sin_addr), + ntohs(cli->addr.sin_port)); + goto close; + } + break; + default: + syslog(LOG_ERR, "unknown write state, critical internal error\n"); + goto close; + } + return; +shutdown: + if (cli->size > FILE_SIZE_THRESHOLD) + tip_client_redirect_create(cli); +close: + ev_timer_stop(loop, &cli->timer); + tip_client_release(loop, cli); +} + +static void tip_client_cb(struct ev_loop *loop, struct ev_io *io, int events) +{ + if (events & EV_READ) + return tip_client_read_cb(loop, io, events); + if (events & EV_WRITE) + return tip_client_write_cb(loop, io, events); +} + +static void tip_client_timer_cb(struct ev_loop *loop, ev_timer *timer, int events) +{ + struct tip_client *cli; + + cli = container_of(timer, struct tip_client, timer); + + syslog(LOG_ERR, "timeout request for client %s:%hu\n", + inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port)); + + tip_client_release(loop, cli); +} + +/* Shut down connection if there is no data after 15 seconds. */ +#define TIP_CLIENT_TIMEOUT 15 + +static void tip_client_start(struct tip_client *cli) +{ + cli->state = TIP_CLIENT_RECEIVING_HEADER; + ev_io_start(tip_main_loop, &cli->io); + ev_timer_init(&cli->timer, tip_client_timer_cb, TIP_CLIENT_TIMEOUT, 0.); + ev_timer_start(tip_main_loop, &cli->timer); +} + +void tip_client_pending(struct tip_client *cli) +{ + ev_io_stop(tip_main_loop, &cli->io); + ev_timer_stop(tip_main_loop, &cli->timer); + cli->state = TIP_CLIENT_PENDING; +} + +static void tip_client_activate_pending(void) +{ + struct tip_client *cli, *next; + + list_for_each_entry_safe(cli, next, &client_list, list) { + if (cli->state != TIP_CLIENT_PENDING) + continue; + + tip_client_redirect(cli); + + ev_io_set(&cli->io, tip_client_socket(cli), EV_READ | EV_WRITE); + ev_io_start(tip_main_loop, &cli->io); + ev_timer_start(tip_main_loop, &cli->timer); + cli->state = TIP_CLIENT_PROCESSING_REQUEST_2; + break; + } +} + +bool tip_client_redirect(struct tip_client *cli) +{ + struct tip_client_redirect *redir, *next; + char addr[INET_ADDRSTRLEN + 1]; + + if (!redirect) + return false; + + inet_ntop(AF_INET, &cli->addr.sin_addr, addr, INET_ADDRSTRLEN); + + list_for_each_entry_safe(redir, next, &client_redirect_list, list) { + if (strcmp(redir->uri, cli->uri) || + redir->addr.sin_addr.s_addr == cli->addr.sin_addr.s_addr) + continue; + + cli->redirect = true; + cli->redirect_addr = redir->addr; + + syslog(LOG_INFO, "redirecting client %s:%hu to %s:%hu", + addr, htons(cli->addr.sin_port), + inet_ntoa(redir->addr.sin_addr), htons(redir->addr.sin_port)); + + free((void *)redir->uri); + ev_timer_stop(tip_main_loop, &redir->timer); + list_del(&redir->list); + free(redir); + + return true; + } + syslog(LOG_INFO, "no client redirections are available for %s:%hu", + addr, htons(cli->addr.sin_port)); + + return false; +} + +#define TIP_TCP_KEEPALIVE_IDLE 60 +#define TIP_TCP_KEEPALIVE_INTL 30 +#define TIP_TCP_KEEPALIVE_CNT 4 + +void tip_server_accept_cb(struct ev_loop *loop, struct ev_io *io, int events) +{ + int intl = TIP_TCP_KEEPALIVE_INTL, cnt = TIP_TCP_KEEPALIVE_CNT; + int on = 1, idle = TIP_TCP_KEEPALIVE_IDLE; + struct sockaddr_in client_addr; + socklen_t addrlen = sizeof(client_addr); + struct tip_client *cli; + int client_sd, flags; + + if (events & EV_ERROR) + return; + + client_sd = accept(io->fd, (struct sockaddr *)&client_addr, &addrlen); + if (client_sd < 0) { + syslog(LOG_ERR, "cannot accept client connection\n"); + return; + } + + flags = fcntl(client_sd, F_GETFL); + fcntl(client_sd, F_SETFL, flags | O_NONBLOCK); + + setsockopt(client_sd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(int)); + setsockopt(client_sd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(int)); + setsockopt(client_sd, IPPROTO_TCP, TCP_KEEPINTVL, &intl, sizeof(int)); + setsockopt(client_sd, IPPROTO_TCP, TCP_KEEPCNT, &cnt, sizeof(int)); + + cli = (struct tip_client *)calloc(1, sizeof(struct tip_client)); + if (!cli) { + close(client_sd); + return; + } + memcpy(&cli->addr, &client_addr, sizeof(client_addr)); + cli->fd = -1; + + syslog(LOG_ERR, "accepting client connection from %s:%hu", + inet_ntoa(cli->addr.sin_addr), htons(cli->addr.sin_port)); + + list_add_tail(&cli->list, &client_list); + ev_io_init(&cli->io, tip_client_cb, client_sd, EV_READ); + + tip_client_start(cli); +} + +int tip_socket_server_init(const char *port) +{ + struct sockaddr_in local; + int sd, on = 1; + + sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (sd < 0) { + syslog(LOG_ERR, "cannot create main socket\n"); + return -1; + } + setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(int)); + + local.sin_addr.s_addr = htonl(INADDR_ANY); + local.sin_family = AF_INET; + local.sin_port = htons(atoi(port)); + + if (bind(sd, (struct sockaddr *) &local, sizeof(local)) < 0) { + close(sd); + syslog(LOG_ERR, "cannot bind socket\n"); + return -1; + } + + listen(sd, 250); + + return sd; +} -- cgit v1.2.3-18-g5258