/* * Copyright (c) 2001-2002 Packet Design, LLC. * All rights reserved. * * Subject to the following obligations and disclaimer of warranty, * use and redistribution of this software, in source or object code * forms, with or without modifications are expressly permitted by * Packet Design; provided, however, that: * * (i) Any and all reproductions of the source or object code * must include the copyright notice above and the following * disclaimer of warranties; and * (ii) No rights are granted, in any manner or form, to use * Packet Design trademarks, including the mark "PACKET DESIGN" * on advertising, endorsements, or otherwise except as such * appears in the above copyright notice or in the software. * * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, * OR NON-INFRINGEMENT. PACKET DESIGN DOES NOT WARRANT, GUARANTEE, * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY, * RELIABILITY OR OTHERWISE. IN NO EVENT SHALL PACKET DESIGN BE * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT, * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. * * Author: Archie Cobbs */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "structs/structs.h" #include "structs/type/array.h" #include "util/pevent.h" #include "util/typed_mem.h" #include "io/timeout_fp.h" #include "net/tcp_server.h" #include "sys/alog.h" /* How long to pause when we reach max # connections */ #define TCP_SERVER_PAUSE 250 /* 0.25 sec */ /* Server state */ struct tcp_server { struct pevent_ctx *ctx; /* event context */ struct pevent *conn_event; /* incoming connection event */ struct pevent *wait_event; /* pause timeout event */ struct sockaddr_in addr; /* server bound address */ pthread_mutex_t mutex; /* server mutex */ u_int num_conn; /* # connections */ u_int max_conn; /* max # connections */ u_int conn_timeout; /* timeout for connections */ int sock; /* listening socket */ TAILQ_HEAD(, tcp_connection) conn_list; /* connection list */ void *cookie; /* application private data */ tcp_setup_t *setup; /* connection setup handler */ tcp_handler_t *handler; /* connection handler handler */ tcp_teardown_t *teardown; /* connection teardown handlr */ const char *mtype; /* typed memory type string */ char mtype_buf[TYPED_MEM_TYPELEN]; }; /* Connection state */ struct tcp_connection { pthread_t tid; /* connection thread */ struct tcp_server *server; /* associated server */ struct sockaddr_in peer; /* remote side address */ u_char started; /* thread has started */ u_char destruct; /* object needs teardown */ int sock; /* connection socket */ FILE *fp; /* connection stream (unbuf) */ TAILQ_ENTRY(tcp_connection) next; /* next in connection list */ void *cookie; /* application private data */ }; /* Internal functions */ static void *tcp_server_connection_main(void *arg); static void tcp_server_connection_cleanup(void *arg); static pevent_handler_t tcp_server_accept; static pevent_handler_t tcp_server_restart; /* * Start a new TCP server */ struct tcp_server * tcp_server_start(struct pevent_ctx *ctx, void *cookie, const char *mtype, struct in_addr ip, u_int16_t port, u_int max_conn, u_int conn_timeout, tcp_setup_t *setup, tcp_handler_t *handler, tcp_teardown_t *teardown) { static const int one = 1; struct tcp_server *serv = NULL; /* Get new object */ if ((serv = MALLOC(mtype, sizeof(*serv))) == NULL) { alogf(LOG_ERR, "%s: %m", "malloc"); goto fail; } memset(serv, 0, sizeof(*serv)); serv->ctx = ctx; serv->cookie = cookie; serv->sock = -1; serv->max_conn = max_conn; serv->conn_timeout = conn_timeout; serv->setup = setup; serv->handler = handler; serv->teardown = teardown; TAILQ_INIT(&serv->conn_list); if (mtype != NULL) { strlcpy(serv->mtype_buf, mtype, sizeof(serv->mtype_buf)); serv->mtype = serv->mtype_buf; } /* Create and bind socket */ if ((serv->sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { alogf(LOG_ERR, "%s: %m", "socket"); goto fail; } (void)fcntl(serv->sock, F_SETFD, 1); if (setsockopt(serv->sock, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)) == -1) { alogf(LOG_ERR, "%s: %m", "setsockopt"); goto fail; } #ifdef SO_REUSEPORT if (setsockopt(serv->sock, SOL_SOCKET, SO_REUSEPORT, (char *)&one, sizeof(one)) == -1) { alogf(LOG_ERR, "%s: %m", "setsockopt"); goto fail; } #endif memset(&serv->addr, 0, sizeof(serv->addr)); #ifndef __linux__ serv->addr.sin_len = sizeof(serv->addr); #endif serv->addr.sin_family = AF_INET; serv->addr.sin_port = htons(port); serv->addr.sin_addr = ip; if (bind(serv->sock, (struct sockaddr *)&serv->addr, sizeof(serv->addr)) == -1) { alogf(LOG_ERR, "%s: %m", "bind"); goto fail; } if (listen(serv->sock, 1024) == -1) { alogf(LOG_ERR, "%s: %m", "listen"); goto fail; } /* Accept incoming connections */ if (pevent_register(serv->ctx, &serv->conn_event, PEVENT_RECURRING, &serv->mutex, tcp_server_accept, serv, PEVENT_READ, serv->sock) == -1) { alogf(LOG_ERR, "%s: %m", "pevent_register"); goto fail; } /* Create mutex */ if ((errno = pthread_mutex_init(&serv->mutex, NULL)) != 0) { alogf(LOG_ERR, "%s: %m", "pthread_mutex_init"); goto fail; } /* Done */ return (serv); fail: /* Clean up and return error */ if (serv != NULL) { pevent_unregister(&serv->conn_event); if (serv->sock != -1) (void)close(serv->sock); FREE(serv->mtype, serv); } return (NULL); } /* * Stop a TCP server */ void tcp_server_stop(struct tcp_server **servp) { struct tcp_server *const serv = *servp; struct tcp_connection *conn; int r; /* Sanity */ if (serv == NULL) return; *servp = NULL; /* Acquire mutex */ r = pthread_mutex_lock(&serv->mutex); assert(r == 0); /* Stop accepting new connections */ pevent_unregister(&serv->conn_event); pevent_unregister(&serv->wait_event); /* Close listen socket */ (void)close(serv->sock); serv->sock = -1; /* Kill all outstanding connections */ while (!TAILQ_EMPTY(&serv->conn_list)) { /* Kill active connections; they will clean up themselves */ TAILQ_FOREACH(conn, &serv->conn_list, next) { if (conn->started && conn->tid != 0) { pthread_cancel(conn->tid); conn->tid = 0; /* don't cancel twice */ } } /* Wait for outstanding connections to complete */ r = pthread_mutex_unlock(&serv->mutex); assert(r == 0); usleep(100000); r = pthread_mutex_lock(&serv->mutex); assert(r == 0); } /* Free server structure */ r = pthread_mutex_unlock(&serv->mutex); assert(r == 0); pthread_mutex_destroy(&serv->mutex); FREE(serv->mtype, serv); } /* * Get server cookie. */ void * tcp_server_get_cookie(struct tcp_server *serv) { return (serv->cookie); } /* * Get connection cookie. */ void * tcp_connection_get_cookie(struct tcp_connection *conn) { return (conn->cookie); } /* * Get connection file descriptor. */ int tcp_connection_get_fd(struct tcp_connection *conn) { return (conn->sock); } /* * Get connection file stream. */ FILE * tcp_connection_get_fp(struct tcp_connection *conn) { return (conn->fp); } /* * Get peer's address. */ void tcp_connection_get_peer(struct tcp_connection *conn, struct sockaddr_in *sin) { memcpy(sin, &conn->peer, sizeof(*sin)); } /********************************************************************* NEW CONNECTION ACCEPTOR *********************************************************************/ /* * Accept a new incoming connection. * * This will be called with the server mutex acquired. */ static void tcp_server_accept(void *arg) { struct tcp_server *const serv = arg; struct tcp_connection *conn; socklen_t slen = sizeof(conn->peer); struct sockaddr_in sin; int sock; /* If maximum number of connections reached, pause a while */ if (serv->max_conn > 0 && serv->num_conn >= serv->max_conn) { pevent_unregister(&serv->wait_event); pevent_unregister(&serv->conn_event); if (pevent_register(serv->ctx, &serv->wait_event, 0, &serv->mutex, tcp_server_restart, serv, PEVENT_TIME, TCP_SERVER_PAUSE) == -1) alogf(LOG_ERR, "%s: %m", "pevent_register"); return; } /* Accept next connection */ if ((sock = accept(serv->sock, (struct sockaddr *)&sin, &slen)) == -1) { if (errno != ECONNABORTED && errno != ENOTCONN) alogf(LOG_ERR, "%s: %m", "accept"); return; } (void)fcntl(sock, F_SETFD, 1); /* Create connection state structure */ if ((conn = MALLOC(serv->mtype, sizeof(*conn))) == NULL) { alogf(LOG_ERR, "%s: %m", "malloc"); (void)close(sock); return; } memset(conn, 0, sizeof(*conn)); conn->server = serv; conn->sock = sock; conn->peer = sin; /* Put stream on top of file descriptor */ if ((conn->fp = timeout_fdopen(conn->sock, "r+", serv->conn_timeout)) == NULL) { alogf(LOG_ERR, "%s: %m", "timeout_fdopen"); (void)close(conn->sock); FREE(serv->mtype, conn); return; } setbuf(conn->fp, NULL); /* Spawn connection thread */ if ((errno = pthread_create(&conn->tid, NULL, tcp_server_connection_main, conn)) != 0) { conn->tid = 0; alogf(LOG_ERR, "%s: %m", "pthread_create"); fclose(conn->fp); FREE(serv->mtype, conn); return; } /* Detach thread */ pthread_detach(conn->tid); /* Add connection to list */ TAILQ_INSERT_TAIL(&serv->conn_list, conn, next); serv->num_conn++; } /* * Start accepting new connections after waiting a while. * * This will be called with the server mutex acquired. */ static void tcp_server_restart(void *arg) { struct tcp_server *const serv = arg; /* Accept incoming connections again */ pevent_unregister(&serv->wait_event); pevent_unregister(&serv->conn_event); if (pevent_register(serv->ctx, &serv->conn_event, 0, &serv->mutex, tcp_server_accept, serv, PEVENT_READ, serv->sock) == -1) alogf(LOG_ERR, "%s: %m", "pevent_register"); } /********************************************************************* TCP CONNECTION THREAD *********************************************************************/ /* * Connection thread main entry point. */ static void * tcp_server_connection_main(void *arg) { struct tcp_connection *const conn = arg; struct tcp_server *const serv = conn->server; /* Push cleanup hook */ pthread_cleanup_push(tcp_server_connection_cleanup, conn); conn->started = 1; /* now it's ok to cancel me */ /* Call application's setup routine */ if (serv->setup != NULL && (conn->cookie = (*serv->setup)(conn)) == NULL) goto done; conn->destruct = 1; /* Invoke application handler */ (*serv->handler)(conn); done:; /* Done */ pthread_cleanup_pop(1); return (NULL); } /* * Cleanup routine for tcp_server_connection_main(). */ static void tcp_server_connection_cleanup(void *arg) { struct tcp_connection *const conn = arg; struct tcp_server *const serv = conn->server; int r; /* Call application destructor */ if (conn->destruct && serv->teardown != NULL) { conn->destruct = 0; (*serv->teardown)(conn); } /* Close connection */ if (conn->fp != NULL) { (void)fclose(conn->fp); conn->sock = -1; } /* Unlink from server list */ r = pthread_mutex_lock(&serv->mutex); assert(r == 0); serv->num_conn--; TAILQ_REMOVE(&serv->conn_list, conn, next); r = pthread_mutex_unlock(&serv->mutex); assert(r == 0); /* Release connection object */ FREE(serv->mtype, conn); }