From f09aca620ccb7447f25abd04e05e31f994405005 Mon Sep 17 00:00:00 2001 From: OpenGnSys Support Team Date: Tue, 13 Nov 2018 11:30:46 +0100 Subject: #580 fix management of keepalive connections to clients OgAdmServer leaves a connection in keepalive more (similar to HTTP keepalive feature), the existing handling is not correct. The tbsocket table is never cleaned up and properly. Use the new og_client object that represents connections from the clients in tbsocket[] instead. The keepalive field now stores the index in the tbsocket table, so there is no need to consult mysql to fetch the slot that this client is using. This patch also extends syslog() support to include port number when reporting connections from clients. --- sources/ogAdmServer.cpp | 147 +++++++++++++++++++++++++++++++++--------------- sources/ogAdmServer.h | 4 +- 2 files changed, 105 insertions(+), 46 deletions(-) diff --git a/sources/ogAdmServer.cpp b/sources/ogAdmServer.cpp index a0a9585..1ed213d 100644 --- a/sources/ogAdmServer.cpp +++ b/sources/ogAdmServer.cpp @@ -116,7 +116,7 @@ struct og_client { char buf[4096]; unsigned int buf_len; unsigned int msg_len; - bool keepalive; + int keepalive_idx; }; static inline int og_client_socket(const struct og_client *cli) @@ -1200,7 +1200,7 @@ bool buscaComandos(char *ido, TRAMA *ptrTrama, int *ids) static bool DisponibilidadComandos(TRAMA *ptrTrama, struct og_client *cli) { char *iph, *tpc; - int idx,port_old=0,port_new; + int idx; iph = copiaParametro("iph",ptrTrama); // Toma ip if (!clienteExistente(iph, &idx)) { // Busca índice del cliente @@ -1210,18 +1210,7 @@ static bool DisponibilidadComandos(TRAMA *ptrTrama, struct og_client *cli) } tpc = copiaParametro("tpc",ptrTrama); // Tipo de cliente (Plataforma y S.O.) strcpy(tbsockets[idx].estado, tpc); - - port_new = tomaPuerto(og_client_socket(cli)); - - if (tbsockets[idx].sock != -1) { - port_old=tomaPuerto(tbsockets[idx].sock); - if(port_old!=port_new){ - close(tbsockets[idx].sock); // Cierra el socket si ya existia uno - } - } - - tbsockets[idx].sock = og_client_socket(cli); - cli->keepalive = true; + cli->keepalive_idx = idx; liberaMemoria(iph); liberaMemoria(tpc); return true; @@ -1346,8 +1335,10 @@ bool enviaComando(TRAMA* ptrTrama, const char *estado) FINCADaINTRO(ptrTrama); for (i = 0; i < lon; i++) { if (clienteDisponible(ptrIpes[i], &idx)) { // Si el cliente puede recibir comandos + int sock = tbsockets[idx].cli ? tbsockets[idx].cli->io.fd : -1; + strcpy(tbsockets[idx].estado, estado); // Actualiza el estado del cliente - if (!mandaTrama(&tbsockets[idx].sock, ptrTrama)) { + if (!mandaTrama(&sock, ptrTrama)) { og_log(26, false); return false; } @@ -3418,8 +3409,10 @@ static bool envioProgramacion(TRAMA *ptrTrama, struct og_client *cli) } if (clienteDisponible(iph, &idx)) { // Si el cliente puede recibir comandos + int sock = tbsockets[idx].cli ? tbsockets[idx].cli->io.fd : -1; + strcpy(tbsockets[idx].estado, CLIENTE_OCUPADO); // Actualiza el estado del cliente - if (!mandaTrama(&tbsockets[idx].sock, ptrTrama)) { + if (!mandaTrama(&sock, ptrTrama)) { og_log(26, false); return false; } @@ -3504,19 +3497,58 @@ static bool gestionaTrama(TRAMA *ptrTrama, struct og_client *cli) if (!strncmp(tbfuncionesServer[i].nf, nfn, strlen(tbfuncionesServer[i].nf))) { res = tbfuncionesServer[i].fcn(ptrTrama, cli); - syslog(LOG_INFO, "handling request %s (result=%d)\n", - tbfuncionesServer[i].nf, res); + syslog(LOG_INFO, "handling request %s (result=%d) for client %s:%hu\n", + tbfuncionesServer[i].nf, res, + inet_ntoa(cli->addr.sin_addr), + ntohs(cli->addr.sin_port)); break; } } if (!tbfuncionesServer[i].fcn) - syslog(LOG_ERR, "unknown request %s\n", nfn); + syslog(LOG_ERR, "unknown request %s from client %s:%hu\n", + nfn, inet_ntoa(cli->addr.sin_addr), + ntohs(cli->addr.sin_port)); liberaMemoria(nfn); } return res; } +static void og_client_release(struct ev_loop *loop, struct og_client *cli) +{ + if (cli->keepalive_idx >= 0) { + syslog(LOG_INFO, "closing keepalive connection for %s:%hu in slot %d\n", + inet_ntoa(cli->addr.sin_addr), + ntohs(cli->addr.sin_port), cli->keepalive_idx); + tbsockets[cli->keepalive_idx].cli = NULL; + } + + ev_io_stop(loop, &cli->io); + close(cli->io.fd); + free(cli); +} + +static void og_client_keepalive(struct ev_loop *loop, struct og_client *cli) +{ + struct og_client *old_cli; + + old_cli = tbsockets[cli->keepalive_idx].cli; + if (old_cli && old_cli != cli) { + syslog(LOG_INFO, "closing old keepalive connection for %s:%hu\n", + inet_ntoa(old_cli->addr.sin_addr), + ntohs(old_cli->addr.sin_port)); + + og_client_release(loop, old_cli); + } + tbsockets[cli->keepalive_idx].cli = cli; +} + +static void og_client_reset_state(struct og_client *cli) +{ + cli->state = OG_CLIENT_RECEIVING_HEADER; + cli->buf_len = 0; +} + static void og_client_read_cb(struct ev_loop *loop, struct ev_io *io, int events) { char hdrlen[LONHEXPRM]; @@ -3525,15 +3557,31 @@ static void og_client_read_cb(struct ev_loop *loop, struct ev_io *io, int events int ret, len; char *data; - if (events & EV_ERROR) - goto close; - cli = container_of(io, struct og_client, io); + if (events & EV_ERROR) { + syslog(LOG_ERR, "unexpected error event from client %s:%hu\n", + inet_ntoa(cli->addr.sin_addr), + ntohs(cli->addr.sin_port)); + goto close; + } + ret = recv(io->fd, cli->buf + cli->buf_len, sizeof(cli->buf) - cli->buf_len, 0); - if (ret <= 0) + if (ret <= 0) { + if (ret < 0) { + syslog(LOG_ERR, "error reading from client %s:%hu (%s)\n", + inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port), + strerror(errno)); + } else { + syslog(LOG_INFO, "closed connection by %s:%hu\n", + inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port)); + } goto close; + } + + if (cli->keepalive_idx >= 0) + return; ev_timer_again(loop, &cli->timer); @@ -3548,8 +3596,9 @@ static void og_client_read_cb(struct ev_loop *loop, struct ev_io *io, int events return; if (strncmp(cli->buf, "@JMMLCAMDJ_MCDJ", 15)) { - syslog(LOG_ERR, "bad fingerprint from client %s, closing\n", - inet_ntoa(cli->addr.sin_addr)); + syslog(LOG_ERR, "bad fingerprint from client %s:%hu, closing\n", + inet_ntoa(cli->addr.sin_addr), + ntohs(cli->addr.sin_port)); goto close; } @@ -3558,8 +3607,9 @@ static void og_client_read_cb(struct ev_loop *loop, struct ev_io *io, int events /* Header announces more that we can fit into buffer. */ if (cli->msg_len >= sizeof(cli->buf)) { - syslog(LOG_ERR, "too large message %u bytes from %s\n", - cli->msg_len, inet_ntoa(cli->addr.sin_addr)); + syslog(LOG_ERR, "too large message %u bytes from %s:%hu\n", + cli->msg_len, inet_ntoa(cli->addr.sin_addr), + ntohs(cli->addr.sin_port)); goto close; } @@ -3573,15 +3623,18 @@ static void og_client_read_cb(struct ev_loop *loop, struct ev_io *io, int events cli->state = OG_CLIENT_PROCESSING_REQUEST; /* fall through. */ case OG_CLIENT_PROCESSING_REQUEST: - syslog(LOG_INFO, "processing request from %s\n", - inet_ntoa(cli->addr.sin_addr)); + syslog(LOG_INFO, "processing request from %s:%hu\n", + inet_ntoa(cli->addr.sin_addr), + ntohs(cli->addr.sin_port)); len = cli->msg_len - (LONGITUD_CABECERATRAMA + LONHEXPRM); data = desencriptar(&cli->buf[LONGITUD_CABECERATRAMA + LONHEXPRM], &len); ptrTrama = (TRAMA *)reservaMemoria(sizeof(TRAMA)); - if (!ptrTrama) + if (!ptrTrama) { + syslog(LOG_ERR, "OOM\n"); goto close; + } initParametros(ptrTrama, len); memcpy(ptrTrama, cli->buf, LONGITUD_CABECERATRAMA); @@ -3594,8 +3647,17 @@ static void og_client_read_cb(struct ev_loop *loop, struct ev_io *io, int events liberaMemoria(ptrTrama->parametros); liberaMemoria(ptrTrama); - if (!cli->keepalive) + if (cli->keepalive_idx < 0) { + syslog(LOG_INFO, "server closing connection to %s:%hu\n", + inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port)); goto close; + } else { + syslog(LOG_INFO, "leaving client %s:%hu in keepalive mode\n", + inet_ntoa(cli->addr.sin_addr), + ntohs(cli->addr.sin_port)); + og_client_keepalive(loop, cli); + og_client_reset_state(cli); + } break; default: syslog(LOG_ERR, "unknown state, critical internal error\n"); @@ -3603,12 +3665,8 @@ static void og_client_read_cb(struct ev_loop *loop, struct ev_io *io, int events } return; close: - syslog(LOG_ERR, "closed connection by %s\n", - inet_ntoa(cli->addr.sin_addr)); ev_timer_stop(loop, &cli->timer); - ev_io_stop(loop, &cli->io); - close(cli->io.fd); - free(cli); + og_client_release(loop, cli); } static void og_client_timer_cb(struct ev_loop *loop, ev_timer *timer, int events) @@ -3616,16 +3674,14 @@ static void og_client_timer_cb(struct ev_loop *loop, ev_timer *timer, int events struct og_client *cli; cli = container_of(timer, struct og_client, timer); - if (cli->keepalive) { + if (cli->keepalive_idx >= 0) { ev_timer_again(loop, &cli->timer); return; } - syslog(LOG_ERR, "timeout request for client %s\n", - inet_ntoa(cli->addr.sin_addr)); + syslog(LOG_ERR, "timeout request for client %s:%hu\n", + inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port)); - ev_io_stop(loop, &cli->io); - close(cli->io.fd); - free(cli); + og_client_release(loop, cli); } static void og_server_accept_cb(struct ev_loop *loop, struct ev_io *io, @@ -3651,9 +3707,10 @@ static void og_server_accept_cb(struct ev_loop *loop, struct ev_io *io, return; } memcpy(&cli->addr, &client_addr, sizeof(client_addr)); + cli->keepalive_idx = -1; - syslog(LOG_INFO, "connection from client %s\n", - inet_ntoa(cli->addr.sin_addr)); + syslog(LOG_INFO, "connection from client %s:%hu\n", + inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port)); ev_io_init(&cli->io, og_client_read_cb, client_sd, EV_READ); ev_io_start(loop, &cli->io); @@ -3687,7 +3744,7 @@ int main(int argc, char *argv[]) ---------------------------------------------------------------------------------------------------------*/ for (i = 0; i < MAXIMOS_CLIENTES; i++) { tbsockets[i].ip[0] = '\0'; - tbsockets[i].sock = -1; + tbsockets[i].cli = NULL; } /*-------------------------------------------------------------------------------------------------------- Creación y configuración del socket del servicio diff --git a/sources/ogAdmServer.h b/sources/ogAdmServer.h index d0e36dd..4b4cdc5 100644 --- a/sources/ogAdmServer.h +++ b/sources/ogAdmServer.h @@ -27,10 +27,12 @@ char servidoradm[LONPRM]; // Dirección IP del servidor de administración char puerto[LONPRM]; // Puerto de comunicación +struct og_client; + typedef struct{ // Estructura usada para guardar información de los clientes char ip[LONIP]; // IP del cliente char estado[4]; // Tipo de Sistema Operativo en que se encuentra el cliente - SOCKET sock; // Socket por el que se comunica + struct og_client *cli; }SOCKETCL; SOCKETCL tbsockets[MAXIMOS_CLIENTES]; -- cgit v1.2.3-18-g5258