File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / libpdel / net / tcp_server.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Tue Feb 21 23:25:53 2012 UTC (13 years, 4 months ago) by misho
Branches: libpdel, MAIN
CVS tags: v0_5_3, HEAD
libpdel

    1: 
    2: /*
    3:  * Copyright (c) 2001-2002 Packet Design, LLC.
    4:  * All rights reserved.
    5:  * 
    6:  * Subject to the following obligations and disclaimer of warranty,
    7:  * use and redistribution of this software, in source or object code
    8:  * forms, with or without modifications are expressly permitted by
    9:  * Packet Design; provided, however, that:
   10:  * 
   11:  *    (i)  Any and all reproductions of the source or object code
   12:  *         must include the copyright notice above and the following
   13:  *         disclaimer of warranties; and
   14:  *    (ii) No rights are granted, in any manner or form, to use
   15:  *         Packet Design trademarks, including the mark "PACKET DESIGN"
   16:  *         on advertising, endorsements, or otherwise except as such
   17:  *         appears in the above copyright notice or in the software.
   18:  * 
   19:  * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND
   20:  * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO
   21:  * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING
   22:  * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED
   23:  * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
   24:  * OR NON-INFRINGEMENT.  PACKET DESIGN DOES NOT WARRANT, GUARANTEE,
   25:  * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS
   26:  * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY,
   27:  * RELIABILITY OR OTHERWISE.  IN NO EVENT SHALL PACKET DESIGN BE
   28:  * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE
   29:  * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT,
   30:  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL
   31:  * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF
   32:  * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF
   33:  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
   34:  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
   35:  * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF
   36:  * THE POSSIBILITY OF SUCH DAMAGE.
   37:  *
   38:  * Author: Archie Cobbs <archie@freebsd.org>
   39:  */
   40: 
   41: #include <sys/types.h>
   42: #include <sys/queue.h>
   43: #include <sys/socket.h>
   44: #include <netinet/in.h>
   45: #include <arpa/inet.h>
   46: 
   47: #include <assert.h>
   48: #include <errno.h>
   49: #include <fcntl.h>
   50: #include <stdio.h>
   51: #include <stdlib.h>
   52: #include <syslog.h>
   53: #include <string.h>
   54: #include <unistd.h>
   55: #include <stdarg.h>
   56: #include <pthread.h>
   57: 
   58: #include "structs/structs.h"
   59: #include "structs/type/array.h"
   60: 
   61: #include "util/pevent.h"
   62: #include "util/typed_mem.h"
   63: #include "io/timeout_fp.h"
   64: #include "net/tcp_server.h"
   65: #include "sys/alog.h"
   66: 
/*
 * How long to pause when we reach max # connections.
 *
 * Used by tcp_server_accept() as the PEVENT_TIME argument when it
 * schedules tcp_server_restart(). The unit is presumably milliseconds,
 * going by the "0.25 sec" note below -- confirm against pevent docs.
 */
#define TCP_SERVER_PAUSE	250		/* 0.25 sec */
   69: 
/*
 * Server state.
 *
 * One instance is allocated by tcp_server_start() and released by
 * tcp_server_stop(). The mutex is handed to pevent_register() for the
 * accept and restart handlers, and is locked by the connection threads
 * and by tcp_server_stop() around updates to num_conn and conn_list.
 */
struct tcp_server {
	struct pevent_ctx	*ctx;		/* event context */
	struct pevent		*conn_event;	/* incoming connection event */
	struct pevent		*wait_event;	/* pause timeout event */
	struct sockaddr_in	addr;		/* server bound address */
	pthread_mutex_t		mutex;		/* server mutex */
	u_int			num_conn;	/* # connections */
	u_int			max_conn;	/* max # connections; 0 = no limit */
	u_int			conn_timeout;	/* timeout for connections */
	int			sock;		/* listening socket, or -1 */
	TAILQ_HEAD(, tcp_connection) conn_list;	/* connection list */
	void			*cookie;	/* application private data */
	tcp_setup_t		*setup;		/* connection setup handler */
	tcp_handler_t		*handler;	/* connection handler */
	tcp_teardown_t		*teardown;	/* connection teardown handler */
	/* NULL, or points at mtype_buf when a type name was supplied */
	const char		*mtype;		/* typed memory type string */
	char			mtype_buf[TYPED_MEM_TYPELEN];
};
   89: 
/*
 * Connection state.
 *
 * Allocated by tcp_server_accept() for each accepted socket and freed
 * by tcp_server_connection_cleanup() when the connection thread ends.
 */
struct tcp_connection {
	pthread_t		tid;		/* connection thread */
	struct tcp_server	*server;	/* associated server */
	struct sockaddr_in	peer;		/* remote side address */
	/* Set by the thread once its cleanup hook is pushed; only then
	   will tcp_server_stop() cancel it */
	u_char			started;	/* thread has started */
	/* Set once setup has succeeded (or there is no setup handler);
	   cleared just before the teardown handler is invoked */
	u_char			destruct;	/* object needs teardown */
	int			sock;		/* connection socket */
	FILE			*fp;		/* connection stream (unbuf) */
	TAILQ_ENTRY(tcp_connection) next;	/* next in connection list */
	void			*cookie;	/* application private data */
};
  102: 
  103: /* Internal functions */
  104: static void	*tcp_server_connection_main(void *arg);
  105: static void	tcp_server_connection_cleanup(void *arg);
  106: 
  107: static pevent_handler_t	tcp_server_accept;
  108: static pevent_handler_t	tcp_server_restart;
  109: 
  110: /*
  111:  * Start a new TCP server
  112:  */
  113: struct tcp_server *
  114: tcp_server_start(struct pevent_ctx *ctx, void *cookie, const char *mtype,
  115: 	struct in_addr ip, u_int16_t port, u_int max_conn, u_int conn_timeout,
  116: 	tcp_setup_t *setup, tcp_handler_t *handler, tcp_teardown_t *teardown)
  117: {
  118: 	static const int one = 1;
  119: 	struct tcp_server *serv = NULL;
  120: 
  121: 	/* Get new object */
  122: 	if ((serv = MALLOC(mtype, sizeof(*serv))) == NULL) {
  123: 		alogf(LOG_ERR, "%s: %m", "malloc");
  124: 		goto fail;
  125: 	}
  126: 	memset(serv, 0, sizeof(*serv));
  127: 	serv->ctx = ctx;
  128: 	serv->cookie = cookie;
  129: 	serv->sock = -1;
  130: 	serv->max_conn = max_conn;
  131: 	serv->conn_timeout = conn_timeout;
  132: 	serv->setup = setup;
  133: 	serv->handler = handler;
  134: 	serv->teardown = teardown;
  135: 	TAILQ_INIT(&serv->conn_list);
  136: 	if (mtype != NULL) {
  137: 		strlcpy(serv->mtype_buf, mtype, sizeof(serv->mtype_buf));
  138: 		serv->mtype = serv->mtype_buf;
  139: 	}
  140: 
  141: 	/* Create and bind socket */
  142: 	if ((serv->sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
  143: 		alogf(LOG_ERR, "%s: %m", "socket");
  144: 		goto fail;
  145: 	}
  146: 	(void)fcntl(serv->sock, F_SETFD, 1);
  147: 	if (setsockopt(serv->sock, SOL_SOCKET,
  148: 	    SO_REUSEADDR, (char *)&one, sizeof(one)) == -1) {
  149: 		alogf(LOG_ERR, "%s: %m", "setsockopt");
  150: 		goto fail;
  151: 	}
  152: #ifdef SO_REUSEPORT
  153: 	if (setsockopt(serv->sock, SOL_SOCKET,
  154: 	    SO_REUSEPORT, (char *)&one, sizeof(one)) == -1) {
  155: 		alogf(LOG_ERR, "%s: %m", "setsockopt");
  156: 		goto fail;
  157: 	}
  158: #endif
  159: 	memset(&serv->addr, 0, sizeof(serv->addr));
  160: #ifndef __linux__
  161: 	serv->addr.sin_len = sizeof(serv->addr);
  162: #endif
  163: 	serv->addr.sin_family = AF_INET;
  164: 	serv->addr.sin_port = htons(port);
  165: 	serv->addr.sin_addr = ip;
  166: 	if (bind(serv->sock,
  167: 	    (struct sockaddr *)&serv->addr, sizeof(serv->addr)) == -1) {
  168: 		alogf(LOG_ERR, "%s: %m", "bind");
  169: 		goto fail;
  170: 	}
  171: 	if (listen(serv->sock, 1024) == -1) {
  172: 		alogf(LOG_ERR, "%s: %m", "listen");
  173: 		goto fail;
  174: 	}
  175: 
  176: 	/* Accept incoming connections */
  177: 	if (pevent_register(serv->ctx, &serv->conn_event, PEVENT_RECURRING,
  178: 	      &serv->mutex, tcp_server_accept, serv, PEVENT_READ, serv->sock)
  179: 	    == -1) {
  180: 		alogf(LOG_ERR, "%s: %m", "pevent_register");
  181: 		goto fail;
  182: 	}
  183: 
  184: 	/* Create mutex */
  185: 	if ((errno = pthread_mutex_init(&serv->mutex, NULL)) != 0) {
  186: 		alogf(LOG_ERR, "%s: %m", "pthread_mutex_init");
  187: 		goto fail;
  188: 	}
  189: 
  190: 	/* Done */
  191: 	return (serv);
  192: 
  193: fail:
  194: 	/* Clean up and return error */
  195: 	if (serv != NULL) {
  196: 		pevent_unregister(&serv->conn_event);
  197: 		if (serv->sock != -1)
  198: 			(void)close(serv->sock);
  199: 		FREE(serv->mtype, serv);
  200: 	}
  201: 	return (NULL);
  202: }
  203: 
  204: /*
  205:  * Stop a TCP server
  206:  */
  207: void
  208: tcp_server_stop(struct tcp_server **servp)
  209: {
  210: 	struct tcp_server *const serv = *servp;
  211: 	struct tcp_connection *conn;
  212: 	int r;
  213: 
  214: 	/* Sanity */
  215: 	if (serv == NULL)
  216: 		return;
  217: 	*servp = NULL;
  218: 
  219: 	/* Acquire mutex */
  220: 	r = pthread_mutex_lock(&serv->mutex);
  221: 	assert(r == 0);
  222: 
  223: 	/* Stop accepting new connections */
  224: 	pevent_unregister(&serv->conn_event);
  225: 	pevent_unregister(&serv->wait_event);
  226: 
  227: 	/* Close listen socket */
  228: 	(void)close(serv->sock);
  229: 	serv->sock = -1;
  230: 
  231: 	/* Kill all outstanding connections */
  232: 	while (!TAILQ_EMPTY(&serv->conn_list)) {
  233: 
  234: 		/* Kill active connections; they will clean up themselves */
  235: 		TAILQ_FOREACH(conn, &serv->conn_list, next) {
  236: 			if (conn->started && conn->tid != 0) {
  237: 				pthread_cancel(conn->tid);
  238: 				conn->tid = 0;	/* don't cancel twice */
  239: 			}
  240: 		}
  241: 
  242: 		/* Wait for outstanding connections to complete */
  243: 		r = pthread_mutex_unlock(&serv->mutex);
  244: 		assert(r == 0);
  245: 		usleep(100000);
  246: 		r = pthread_mutex_lock(&serv->mutex);
  247: 		assert(r == 0);
  248: 	}
  249: 
  250: 	/* Free server structure */
  251: 	r = pthread_mutex_unlock(&serv->mutex);
  252: 	assert(r == 0);
  253: 	pthread_mutex_destroy(&serv->mutex);
  254: 	FREE(serv->mtype, serv);
  255: }
  256: 
  257: /*
  258:  * Get server cookie.
  259:  */
  260: void *
  261: tcp_server_get_cookie(struct tcp_server *serv)
  262: {
  263: 	return (serv->cookie);
  264: }
  265: 
  266: /*
  267:  * Get connection cookie.
  268:  */
  269: void *
  270: tcp_connection_get_cookie(struct tcp_connection *conn)
  271: {
  272: 	return (conn->cookie);
  273: }
  274: 
  275: /*
  276:  * Get connection file descriptor.
  277:  */
  278: int
  279: tcp_connection_get_fd(struct tcp_connection *conn)
  280: {
  281: 	return (conn->sock);
  282: }
  283: 
  284: /*
  285:  * Get connection file stream.
  286:  */
  287: FILE *
  288: tcp_connection_get_fp(struct tcp_connection *conn)
  289: {
  290: 	return (conn->fp);
  291: }
  292: 
  293: /*
  294:  * Get peer's address.
  295:  */
  296: void
  297: tcp_connection_get_peer(struct tcp_connection *conn, struct sockaddr_in *sin)
  298: {
  299: 	memcpy(sin, &conn->peer, sizeof(*sin));
  300: }
  301: 
  302: /*********************************************************************
  303: 		    NEW CONNECTION ACCEPTOR
  304: *********************************************************************/
  305: 
  306: /*
  307:  * Accept a new incoming connection.
  308:  *
  309:  * This will be called with the server mutex acquired.
  310:  */
  311: static void
  312: tcp_server_accept(void *arg)
  313: {
  314: 	struct tcp_server *const serv = arg;
  315: 	struct tcp_connection *conn;
  316: 	socklen_t slen = sizeof(conn->peer);
  317: 	struct sockaddr_in sin;
  318: 	int sock;
  319: 
  320: 	/* If maximum number of connections reached, pause a while */
  321: 	if (serv->max_conn > 0 && serv->num_conn >= serv->max_conn) {
  322: 		pevent_unregister(&serv->wait_event);
  323: 		pevent_unregister(&serv->conn_event);
  324: 		if (pevent_register(serv->ctx, &serv->wait_event, 0,
  325: 		    &serv->mutex, tcp_server_restart, serv, PEVENT_TIME,
  326: 		    TCP_SERVER_PAUSE) == -1)
  327: 			alogf(LOG_ERR, "%s: %m", "pevent_register");
  328: 		return;
  329: 	}
  330: 
  331: 	/* Accept next connection */
  332: 	if ((sock = accept(serv->sock, (struct sockaddr *)&sin, &slen)) == -1) {
  333: 		if (errno != ECONNABORTED && errno != ENOTCONN)
  334: 			alogf(LOG_ERR, "%s: %m", "accept");
  335: 		return;
  336: 	}
  337: 	(void)fcntl(sock, F_SETFD, 1);
  338: 
  339: 	/* Create connection state structure */
  340: 	if ((conn = MALLOC(serv->mtype, sizeof(*conn))) == NULL) {
  341: 		alogf(LOG_ERR, "%s: %m", "malloc");
  342: 		(void)close(sock);
  343: 		return;
  344: 	}
  345: 	memset(conn, 0, sizeof(*conn));
  346: 	conn->server = serv;
  347: 	conn->sock = sock;
  348: 	conn->peer = sin;
  349: 
  350: 	/* Put stream on top of file descriptor */
  351: 	if ((conn->fp = timeout_fdopen(conn->sock,
  352: 	    "r+", serv->conn_timeout)) == NULL) {
  353: 		alogf(LOG_ERR, "%s: %m", "timeout_fdopen");
  354: 		(void)close(conn->sock);
  355: 		FREE(serv->mtype, conn);
  356: 		return;
  357: 	}
  358: 	setbuf(conn->fp, NULL);
  359: 
  360: 	/* Spawn connection thread */
  361: 	if ((errno = pthread_create(&conn->tid, NULL,
  362: 	    tcp_server_connection_main, conn)) != 0) {
  363: 		conn->tid = 0;
  364: 		alogf(LOG_ERR, "%s: %m", "pthread_create");
  365: 		fclose(conn->fp);
  366: 		FREE(serv->mtype, conn);
  367: 		return;
  368: 	}
  369: 
  370: 	/* Detach thread */
  371: 	pthread_detach(conn->tid);
  372: 
  373: 	/* Add connection to list */
  374: 	TAILQ_INSERT_TAIL(&serv->conn_list, conn, next);
  375: 	serv->num_conn++;
  376: }
  377: 
  378: /*
  379:  * Start accepting new connections after waiting a while.
  380:  *
  381:  * This will be called with the server mutex acquired.
  382:  */
  383: static void
  384: tcp_server_restart(void *arg)
  385: {
  386: 	struct tcp_server *const serv = arg;
  387: 
  388: 	/* Accept incoming connections again */
  389: 	pevent_unregister(&serv->wait_event);
  390: 	pevent_unregister(&serv->conn_event);
  391: 	if (pevent_register(serv->ctx, &serv->conn_event, 0,
  392: 	    &serv->mutex, tcp_server_accept, serv, PEVENT_READ,
  393: 	    serv->sock) == -1)
  394: 		alogf(LOG_ERR, "%s: %m", "pevent_register");
  395: }
  396: 
  397: /*********************************************************************
  398: 		    TCP CONNECTION THREAD
  399: *********************************************************************/
  400: 
  401: /*
  402:  * Connection thread main entry point.
  403:  */
  404: static void *
  405: tcp_server_connection_main(void *arg)
  406: {
  407: 	struct tcp_connection *const conn = arg;
  408: 	struct tcp_server *const serv = conn->server;
  409: 
  410: 	/* Push cleanup hook */
  411: 	pthread_cleanup_push(tcp_server_connection_cleanup, conn);
  412: 	conn->started = 1;			/* now it's ok to cancel me */
  413: 
  414: 	/* Call application's setup routine */
  415: 	if (serv->setup != NULL
  416: 	    && (conn->cookie = (*serv->setup)(conn)) == NULL)
  417: 		goto done;
  418: 	conn->destruct = 1;
  419: 
  420: 	/* Invoke application handler */
  421: 	(*serv->handler)(conn);
  422: 
  423: done:;
  424: 	/* Done */
  425: 	pthread_cleanup_pop(1);
  426: 	return (NULL);
  427: }
  428: 
  429: /*
  430:  * Cleanup routine for tcp_server_connection_main().
  431:  */
  432: static void
  433: tcp_server_connection_cleanup(void *arg)
  434: {
  435: 	struct tcp_connection *const conn = arg;
  436: 	struct tcp_server *const serv = conn->server;
  437: 	int r;
  438: 
  439: 	/* Call application destructor */
  440: 	if (conn->destruct && serv->teardown != NULL) {
  441: 		conn->destruct = 0;
  442: 		(*serv->teardown)(conn);
  443: 	}
  444: 
  445: 	/* Close connection */
  446: 	if (conn->fp != NULL) {
  447: 		(void)fclose(conn->fp);
  448: 		conn->sock = -1;
  449: 	}
  450: 
  451: 	/* Unlink from server list */
  452: 	r = pthread_mutex_lock(&serv->mutex);
  453: 	assert(r == 0);
  454: 	serv->num_conn--;
  455: 	TAILQ_REMOVE(&serv->conn_list, conn, next);
  456: 	r = pthread_mutex_unlock(&serv->mutex);
  457: 	assert(r == 0);
  458: 
  459: 	/* Release connection object */
  460: 	FREE(serv->mtype, conn);
  461: }
  462: 

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>