/* * Copyright (C) 2021 Soleta Networks * * This program is free software: you can redistribute it and/or modify it under * the terms of the GNU Affero General Public License as published by the * Free Software Foundation; either version 3 of the License, or * (at your option) any later version. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define TIP_TORRENT_PORT 9999 /* number of chunks for files. */ #define MAX_CHUNKS 4 #define container_of(ptr, type, member) ({ \ typeof( ((type *)0)->member ) *__mptr = (ptr); \ (type *)( (char *)__mptr - offsetof(type,member) );}) static const char *filename; static const char *addr; struct ev_loop *tip_main_loop; enum { TIP_CLIENT_GET_HEADER, TIP_CLIENT_GET_PAYLOAD, TIP_CLIENT_POST_REDIRECT, TIP_CLIENT_HEAD_HEADER, TIP_CLIENT_DONE, }; struct tip_client { ev_io io; struct sockaddr_in addr; char buf[10240000]; uint32_t buf_len; uint64_t data_len; uint64_t content_len; uint64_t chunk_offset; int state; int num_retries; int fd; bool error; bool redirected; bool server_only; struct timeval tv_start, tv_last; const char *payload; }; static struct tip_client _cli = { .fd = -1, }; struct { uint32_t direct_from_server; uint32_t redirects; } tip_client_stats; static int tip_client_recv(struct tip_client *cli, int events) { struct ev_io *io = &cli->io; int ret; ret = recv(io->fd, cli->buf + cli->buf_len, sizeof(cli->buf) - cli->buf_len, 0); if (ret < 0) { syslog(LOG_ERR, "error reading from server %s:%hu (%s)\n", inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port), strerror(errno)); } return ret; } static void tip_client_close(struct tip_client *cli) { ev_io_stop(tip_main_loop, &cli->io); shutdown(cli->io.fd, SHUT_RDWR); close(cli->io.fd); cli->buf_len = 0; } static void tip_client_error(struct tip_client *cli) { cli->error = true; tip_client_close(cli); } /* display progress message every 30 seconds. */ #define TIP_CLIENT_PROGRESS 30 static void tip_client_progress(struct tip_client *cli, bool now) { struct timeval tv_cur, tv; gettimeofday(&tv_cur, NULL); timersub(&tv_cur, &cli->tv_last, &tv); if (now || tv.tv_sec >= TIP_CLIENT_PROGRESS) { timersub(&tv_cur, &cli->tv_start, &tv); printf("%3lu%% (%lu Mbytes/second) %s from %s:9999\n", cli->content_len > 0 ? 100 * cli->data_len / cli->content_len : 0, tv.tv_sec > 0 ? cli->data_len / 1024000 / tv.tv_sec : cli->data_len / 1024000, filename, inet_ntoa(cli->addr.sin_addr)); cli->tv_last = tv_cur; } } static int tip_client_connect(const char *addr); static int tip_client_get_hdr(struct tip_client *cli) { char *ptr, *trailer, *payload; char redirect_addr[32]; uint32_t payload_len; uint32_t header_len; int ret; ptr = strstr(cli->buf, "\r\n\r\n"); if (!ptr) return 0; if (!strncmp(cli->buf, "HTTP/1.1 404 Not Found", strlen("HTTP/1.1 404 Not Found"))) { syslog(LOG_ERR, "server says file `%s' not found\n", filename); return -1; } if (!strncmp(cli->buf, "HTTP/1.1 301 Moves Permanently", strlen("HTTP/1.1 301 Moves Permanently"))) { ptr = strstr(cli->buf, "Location:"); if (!ptr) return -1; ret = sscanf(ptr, "Location: http://%31s[^\r\n]", redirect_addr); if (ret != 1) return -1; ptr = strchr(redirect_addr, ':'); if (!ptr) return -1; ptr[0] = '\0'; syslog(LOG_INFO, "Redirected to %s to fetch file %s\n", redirect_addr, filename); cli->redirected = true; tip_client_close(cli); tip_client_connect(redirect_addr); cli->state = TIP_CLIENT_GET_HEADER; return 0; } trailer = ptr + 4; ptr = strstr(cli->buf, "Content-Length: "); if (!ptr) return -1; if (sscanf(ptr, "Content-Length: %lu[^\r\n]", &cli->content_len) != 1) return -1; if (cli->content_len < 0) return -1; tip_client_progress(cli, true); if (cli->content_len == 0) { cli->buf_len = 0; return 1; } lseek(cli->fd, cli->chunk_offset, SEEK_SET); header_len = trailer - cli->buf; payload = cli->buf + header_len; payload_len = cli->buf_len - header_len; cli->data_len += cli->buf_len; cli->buf_len = 0; gettimeofday(&cli->tv_start, NULL); cli->tv_last = cli->tv_start; if (payload_len > 0) { ret = write(cli->fd, payload, payload_len); if (ret < 0) { syslog(LOG_ERR, "failed to write to file %s: %s", filename, strerror(errno)); return ret; } } return 1; } static int tip_client_get_payload(struct tip_client *cli) { int ret; cli->data_len += cli->buf_len; ret = write(cli->fd, cli->buf, cli->buf_len); if (ret < 0) { syslog(LOG_ERR, "failed to write to file %s: %s", filename, strerror(errno)); return ret; } cli->buf_len = 0; tip_client_progress(cli, false); if (cli->data_len >= cli->content_len) { if (cli->redirected) { tip_client_close(cli); tip_client_connect(addr); cli->state = TIP_CLIENT_POST_REDIRECT; return 1; } cli->state = TIP_CLIENT_DONE; return 0; } return 1; } static int tip_client_head_hdr(struct tip_client *cli) { char *ptr; ptr = strstr(cli->buf, "\r\n\r\n"); if (!ptr) return 0; if (!strncmp(cli->buf, "HTTP/1.1 404 Not Found", strlen("HTTP/1.1 404 Not Found"))) { syslog(LOG_ERR, "server says file `%s' not found\n", filename); return -1; } ptr = strstr(cli->buf, "Content-Length: "); if (!ptr) return -1; if (sscanf(ptr, "Content-Length: %lu[^\r\n]", &cli->content_len) != 1) return -1; if (cli->content_len < 0) return -1; if (cli->content_len == 0) { syslog(LOG_ERR, "server reports zero size file %s", filename); return -1; } cli->fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0600); if (cli->fd < 0) { syslog(LOG_ERR, "failed to open file %s: %s", filename, strerror(errno)); return -1; } if (posix_fallocate(cli->fd, 0, cli->content_len) < 0) { syslog(LOG_ERR, "failed to allocate room for file %s: %s", filename, strerror(errno)); return -1; } cli->state = TIP_CLIENT_DONE; return 1; } static int tip_client_post_redirect(struct tip_client *cli) { char *ptr; ptr = strstr(cli->buf, "\r\n\r\n"); if (!ptr) return 0; if (strncmp(cli->buf, "HTTP/1.1 200 OK", strlen("HTTP/1.1 200 OK"))) return -1; ptr = strstr(cli->buf, "Content-Length: "); if (!ptr) return -1; if (sscanf(ptr, "Content-Length: %lu[^\r\n]", &cli->content_len) != 1) return -1; if (cli->content_len < 0) return -1; if (cli->content_len != 0) return -1; cli->state = TIP_CLIENT_DONE; return 0; } static void tip_client_read_cb(struct ev_loop *loop, struct ev_io *io, int events) { struct tip_client *cli; int ret; cli = container_of(io, struct tip_client, io); if (events & EV_ERROR) { tip_client_error(cli); return; } ret = tip_client_recv(cli, events); if (ret <= 0) goto error; cli->buf_len += ret; switch (cli->state) { case TIP_CLIENT_GET_HEADER: ret = tip_client_get_hdr(cli); if (ret < 0) goto error; if (!ret) return; cli->state = TIP_CLIENT_GET_PAYLOAD; /* Fall through. */ case TIP_CLIENT_GET_PAYLOAD: ret = tip_client_get_payload(cli); if (ret < 0) goto error; if (ret == 0) goto close; break; case TIP_CLIENT_HEAD_HEADER: ret = tip_client_head_hdr(cli); if (ret < 0) goto error; if (!ret) return; goto close; case TIP_CLIENT_POST_REDIRECT: ret = tip_client_post_redirect(cli); if (ret < 0) goto error; if (ret == 0) goto close; break; } return; error: tip_client_error(cli); return; close: tip_client_close(cli); return; } static void tip_client_connect_cb(struct ev_loop *loop, struct ev_io *io, int events) { struct tip_client *cli; char buf[PATH_MAX + 1]; int ret, len; cli = container_of(io, struct tip_client, io); if (events & EV_ERROR) { tip_client_error(cli); return; } len = sizeof(cli->addr); ret = connect(cli->io.fd, (struct sockaddr *)&cli->addr, len); if (ret < 0) { syslog(LOG_ERR, "failed to connect to server to fetch %s", filename); tip_client_error(cli); return; } switch (cli->state) { case TIP_CLIENT_GET_HEADER: syslog(LOG_INFO, "connected to %s to fetch file %s\n", inet_ntoa(cli->addr.sin_addr), filename); if (cli->server_only) snprintf(buf, sizeof(buf), "GET /%s HTTP/1.1\r\nX-Accept-Redirect: off\r\n\r\n", filename); else snprintf(buf, sizeof(buf), "GET /%s HTTP/1.1\r\n\r\n", filename); break; case TIP_CLIENT_HEAD_HEADER: syslog(LOG_INFO, "connected to %s to get file size of %s\n", inet_ntoa(cli->addr.sin_addr), filename); snprintf(buf, sizeof(buf), "HEAD /%s HTTP/1.1\r\n\r\n", filename); break; case TIP_CLIENT_POST_REDIRECT: syslog(LOG_INFO, "connected to %s to report redirection for %s\n", inet_ntoa(cli->addr.sin_addr), filename); snprintf(buf, sizeof(buf), "POST /%s HTTP/1.1\r\n\r\n", filename); break; } ret = send(cli->io.fd, buf, strlen(buf), 0); if (ret < 0) { syslog(LOG_ERR, "failed to send request for %s", filename); tip_client_error(cli); return; } ev_io_stop(tip_main_loop, &cli->io); ev_io_init(&cli->io, tip_client_read_cb, cli->io.fd, EV_READ); ev_io_start(tip_main_loop, &cli->io); } #define TIP_TCP_KEEPALIVE_IDLE 60 #define TIP_TCP_KEEPALIVE_INTL 30 #define TIP_TCP_KEEPALIVE_CNT 4 static int tip_client_connect(const char *addr) { int intl = TIP_TCP_KEEPALIVE_INTL, cnt = TIP_TCP_KEEPALIVE_CNT; int on = 1, idle = TIP_TCP_KEEPALIVE_IDLE; struct tip_client *cli = &_cli; int remote_fd; int flags; int len; int ret; remote_fd = socket(AF_INET, SOCK_STREAM, 0); if (remote_fd < 0) { tip_client_error(cli); return -1; } setsockopt(remote_fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(int)); setsockopt(remote_fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(int)); setsockopt(remote_fd, IPPROTO_TCP, TCP_KEEPINTVL, &intl, sizeof(int)); setsockopt(remote_fd, IPPROTO_TCP, TCP_KEEPCNT, &cnt, sizeof(int)); flags = fcntl(remote_fd, F_GETFL); flags |= O_NONBLOCK; ret = fcntl(remote_fd, F_SETFL, flags); if (ret < 0) { tip_client_error(cli); return ret; } cli->addr.sin_family = AF_INET; cli->addr.sin_addr.s_addr = inet_addr(addr); cli->addr.sin_port = htons(TIP_TORRENT_PORT); len = sizeof(cli->addr); ret = connect(remote_fd, (struct sockaddr *)&cli->addr, len); if (ret < 0 && errno != EINPROGRESS) { syslog(LOG_ERR, "failed to connect to server to fetch %s", filename); tip_client_error(cli); return ret; } ev_io_init(&cli->io, tip_client_connect_cb, remote_fd, EV_WRITE); ev_io_start(tip_main_loop, &cli->io); return 0; } #define MAX_RETRIES 5 #define WAIT_RETRY 5 /* wait 5 seconds before retrying. */ static int tip_client_request_file(struct tip_client *cli, const char *server, const char *filename) { tip_client_connect(server); while (cli->state != TIP_CLIENT_DONE && !cli->error) ev_loop(tip_main_loop, 0); if (cli->error) { syslog(LOG_ERR, "Failed to fetch file %s\n", filename); sleep(WAIT_RETRY); if (cli->num_retries++ >= MAX_RETRIES) { syslog(LOG_ERR, "Maximum number of retries (%d), bailing out!\n", MAX_RETRIES); return -1; } cli->error = false; return 1; } return 0; } static uint32_t select_file_chunk(bool *file_chunk) { struct timeval tv; uint32_t k; int i; gettimeofday(&tv, NULL); srand(tv.tv_usec); k = rand() % MAX_CHUNKS; for (i = 0; i < MAX_CHUNKS; i++) { if (!file_chunk[k]) break; k++; if (k == MAX_CHUNKS) k = 0; } return k; } static void tip_client_reset_state(struct tip_client *cli, int fd, uint64_t chunk_offset) { memset(cli, 0, sizeof(*cli)); cli->chunk_offset = chunk_offset; cli->fd = fd; } static char _filename[PATH_MAX + 1]; int main(int argc, char *argv[]) { struct timeval tv_start, tv_stop, tv; uint64_t data_len = 0, file_size = 0; bool file_chunk[MAX_CHUNKS] = {}; uint64_t chunk_size; int i, k, fd, ret; if (argc != 3) { printf("%s [ip] [file]\n", argv[0]); return EXIT_FAILURE; } addr = argv[1]; openlog("tiptorrent-client", LOG_PID, LOG_DAEMON); signal(SIGPIPE, SIG_IGN); tip_main_loop = ev_default_loop(0); gettimeofday(&tv_start, NULL); do { filename = argv[2]; memset(&_cli, 0, sizeof(_cli)); _cli.state = TIP_CLIENT_HEAD_HEADER; ret = tip_client_request_file(&_cli, addr, filename); } while (ret > 0); if (ret < 0) goto err_max_retries; if (_cli.state != TIP_CLIENT_DONE) goto err; fd = _cli.fd; file_size = _cli.content_len; for (i = 0; i < MAX_CHUNKS; i++) { k = select_file_chunk(file_chunk); snprintf(_filename, sizeof(_filename), "%s.%u", argv[2], k); filename = _filename; chunk_size = file_size / MAX_CHUNKS; do { tip_client_reset_state(&_cli, fd, chunk_size * k); syslog(LOG_INFO, "Requesting file %s to server\n", filename); _cli.state = TIP_CLIENT_GET_HEADER; ret = tip_client_request_file(&_cli, addr, filename); } while (ret > 0); if (ret < 0) { do { tip_client_reset_state(&_cli, fd, chunk_size * k); _cli.server_only = true; syslog(LOG_INFO, "Requesting file %s to server only\n", filename); _cli.state = TIP_CLIENT_GET_HEADER; ret = tip_client_request_file(&_cli, addr, filename); } while (ret > 0); } if (ret < 0) goto err_max_retries; if (_cli.redirected) tip_client_stats.redirects++; else tip_client_stats.direct_from_server++; tip_client_progress(&_cli, true); file_chunk[k] = true; data_len += _cli.data_len; } close(fd); err: gettimeofday(&tv_stop, NULL); timersub(&tv_stop, &tv_start, &tv); if (_cli.state == TIP_CLIENT_DONE) { printf("OK.\n"); syslog(LOG_INFO, "Done in %lu.%06lu seconds (%lu Mbytes/second). " "Direct from server: %u Redirected: %u\n", tv.tv_sec, tv.tv_usec, tv.tv_sec > 0 ? data_len / 1024000 / tv.tv_sec : data_len / 1024000, tip_client_stats.direct_from_server, tip_client_stats.redirects); return EXIT_SUCCESS; } printf("Failure, see syslog for details.\n"); syslog(LOG_INFO, "Failure after %lu.%06lu seconds (%lu Mbytes/second). " "Direct from server: %u Redirected: %u\n", tv.tv_sec, tv.tv_usec, tv.tv_sec > 0 ? data_len / 1024000 / tv.tv_sec : data_len / 1024000, tip_client_stats.direct_from_server, tip_client_stats.redirects); return EXIT_FAILURE; err_max_retries: syslog(LOG_INFO, "Failure after maximum number of retries. " "Direct from server: %u Redirected: %u\n", tip_client_stats.direct_from_server, tip_client_stats.redirects); return EXIT_FAILURE; }