File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / libpdel / net / tcp_server.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Tue Feb 21 23:25:53 2012 UTC (12 years, 3 months ago) by misho
Branches: libpdel, MAIN
CVS tags: v0_5_3, HEAD
libpdel


/*
 * Copyright (c) 2001-2002 Packet Design, LLC.
 * All rights reserved.
 * 
 * Subject to the following obligations and disclaimer of warranty,
 * use and redistribution of this software, in source or object code
 * forms, with or without modifications are expressly permitted by
 * Packet Design; provided, however, that:
 * 
 *    (i)  Any and all reproductions of the source or object code
 *         must include the copyright notice above and the following
 *         disclaimer of warranties; and
 *    (ii) No rights are granted, in any manner or form, to use
 *         Packet Design trademarks, including the mark "PACKET DESIGN"
 *         on advertising, endorsements, or otherwise except as such
 *         appears in the above copyright notice or in the software.
 * 
 * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND
 * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO
 * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING
 * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED
 * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
 * OR NON-INFRINGEMENT.  PACKET DESIGN DOES NOT WARRANT, GUARANTEE,
 * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS
 * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY,
 * RELIABILITY OR OTHERWISE.  IN NO EVENT SHALL PACKET DESIGN BE
 * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE
 * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL
 * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF
 * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
 * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF
 * THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Archie Cobbs <archie@freebsd.org>
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <syslog.h>
#include <string.h>
#include <unistd.h>
#include <stdarg.h>
#include <pthread.h>

#include "structs/structs.h"
#include "structs/type/array.h"

#include "util/pevent.h"
#include "util/typed_mem.h"
#include "io/timeout_fp.h"
#include "net/tcp_server.h"
#include "sys/alog.h"

/* How long to pause when we reach max # connections */
#define TCP_SERVER_PAUSE	250		/* 0.25 sec */

/* Server state */
struct tcp_server {
	struct pevent_ctx	*ctx;		/* event context */
	struct pevent		*conn_event;	/* incoming connection event */
	struct pevent		*wait_event;	/* pause timeout event */
	struct sockaddr_in	addr;		/* server bound address */
	pthread_mutex_t		mutex;		/* server mutex */
	u_int			num_conn;	/* # connections */
	u_int			max_conn;	/* max # connections */
	u_int			conn_timeout;	/* timeout for connections */
	int			sock;		/* listening socket */
	TAILQ_HEAD(, tcp_connection) conn_list;	/* connection list */
	void			*cookie;	/* application private data */
	tcp_setup_t		*setup;		/* connection setup handler */
	tcp_handler_t		*handler;	/* connection handler handler */
	tcp_teardown_t		*teardown;	/* connection teardown handlr */
	const char		*mtype;		/* typed memory type string */
	char			mtype_buf[TYPED_MEM_TYPELEN];
};

/* Connection state */
struct tcp_connection {
	pthread_t		tid;		/* connection thread */
	struct tcp_server	*server;	/* associated server */
	struct sockaddr_in	peer;		/* remote side address */
	u_char			started;	/* thread has started */
	u_char			destruct;	/* object needs teardown */
	int			sock;		/* connection socket */
	FILE			*fp;		/* connection stream (unbuf) */
	TAILQ_ENTRY(tcp_connection) next;	/* next in connection list */
	void			*cookie;	/* application private data */
};

/* Internal functions */
static void	*tcp_server_connection_main(void *arg);
static void	tcp_server_connection_cleanup(void *arg);

static pevent_handler_t	tcp_server_accept;
static pevent_handler_t	tcp_server_restart;

/*
 * Start a new TCP server
 */
struct tcp_server *
tcp_server_start(struct pevent_ctx *ctx, void *cookie, const char *mtype,
	struct in_addr ip, u_int16_t port, u_int max_conn, u_int conn_timeout,
	tcp_setup_t *setup, tcp_handler_t *handler, tcp_teardown_t *teardown)
{
	static const int one = 1;
	struct tcp_server *serv = NULL;

	/* Get new object */
	if ((serv = MALLOC(mtype, sizeof(*serv))) == NULL) {
		alogf(LOG_ERR, "%s: %m", "malloc");
		goto fail;
	}
	memset(serv, 0, sizeof(*serv));
	serv->ctx = ctx;
	serv->cookie = cookie;
	serv->sock = -1;
	serv->max_conn = max_conn;
	serv->conn_timeout = conn_timeout;
	serv->setup = setup;
	serv->handler = handler;
	serv->teardown = teardown;
	TAILQ_INIT(&serv->conn_list);
	if (mtype != NULL) {
		strlcpy(serv->mtype_buf, mtype, sizeof(serv->mtype_buf));
		serv->mtype = serv->mtype_buf;
	}

	/* Create and bind socket */
	if ((serv->sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
		alogf(LOG_ERR, "%s: %m", "socket");
		goto fail;
	}
	(void)fcntl(serv->sock, F_SETFD, 1);
	if (setsockopt(serv->sock, SOL_SOCKET,
	    SO_REUSEADDR, (char *)&one, sizeof(one)) == -1) {
		alogf(LOG_ERR, "%s: %m", "setsockopt");
		goto fail;
	}
#ifdef SO_REUSEPORT
	if (setsockopt(serv->sock, SOL_SOCKET,
	    SO_REUSEPORT, (char *)&one, sizeof(one)) == -1) {
		alogf(LOG_ERR, "%s: %m", "setsockopt");
		goto fail;
	}
#endif
	memset(&serv->addr, 0, sizeof(serv->addr));
#ifndef __linux__
	serv->addr.sin_len = sizeof(serv->addr);
#endif
	serv->addr.sin_family = AF_INET;
	serv->addr.sin_port = htons(port);
	serv->addr.sin_addr = ip;
	if (bind(serv->sock,
	    (struct sockaddr *)&serv->addr, sizeof(serv->addr)) == -1) {
		alogf(LOG_ERR, "%s: %m", "bind");
		goto fail;
	}
	if (listen(serv->sock, 1024) == -1) {
		alogf(LOG_ERR, "%s: %m", "listen");
		goto fail;
	}

	/* Accept incoming connections */
	if (pevent_register(serv->ctx, &serv->conn_event, PEVENT_RECURRING,
	      &serv->mutex, tcp_server_accept, serv, PEVENT_READ, serv->sock)
	    == -1) {
		alogf(LOG_ERR, "%s: %m", "pevent_register");
		goto fail;
	}

	/* Create mutex */
	if ((errno = pthread_mutex_init(&serv->mutex, NULL)) != 0) {
		alogf(LOG_ERR, "%s: %m", "pthread_mutex_init");
		goto fail;
	}

	/* Done */
	return (serv);

fail:
	/* Clean up and return error */
	if (serv != NULL) {
		pevent_unregister(&serv->conn_event);
		if (serv->sock != -1)
			(void)close(serv->sock);
		FREE(serv->mtype, serv);
	}
	return (NULL);
}

/*
 * Stop a TCP server
 */
void
tcp_server_stop(struct tcp_server **servp)
{
	struct tcp_server *const serv = *servp;
	struct tcp_connection *conn;
	int r;

	/* Sanity */
	if (serv == NULL)
		return;
	*servp = NULL;

	/* Acquire mutex */
	r = pthread_mutex_lock(&serv->mutex);
	assert(r == 0);

	/* Stop accepting new connections */
	pevent_unregister(&serv->conn_event);
	pevent_unregister(&serv->wait_event);

	/* Close listen socket */
	(void)close(serv->sock);
	serv->sock = -1;

	/* Kill all outstanding connections */
	while (!TAILQ_EMPTY(&serv->conn_list)) {

		/* Kill active connections; they will clean up themselves */
		TAILQ_FOREACH(conn, &serv->conn_list, next) {
			if (conn->started && conn->tid != 0) {
				pthread_cancel(conn->tid);
				conn->tid = 0;	/* don't cancel twice */
			}
		}

		/* Wait for outstanding connections to complete */
		r = pthread_mutex_unlock(&serv->mutex);
		assert(r == 0);
		usleep(100000);
		r = pthread_mutex_lock(&serv->mutex);
		assert(r == 0);
	}

	/* Free server structure */
	r = pthread_mutex_unlock(&serv->mutex);
	assert(r == 0);
	pthread_mutex_destroy(&serv->mutex);
	FREE(serv->mtype, serv);
}

/*
 * Get server cookie.
 */
void *
tcp_server_get_cookie(struct tcp_server *serv)
{
	return (serv->cookie);
}

/*
 * Get connection cookie.
 */
void *
tcp_connection_get_cookie(struct tcp_connection *conn)
{
	return (conn->cookie);
}

/*
 * Get connection file descriptor.
 */
int
tcp_connection_get_fd(struct tcp_connection *conn)
{
	return (conn->sock);
}

/*
 * Get connection file stream.
 */
FILE *
tcp_connection_get_fp(struct tcp_connection *conn)
{
	return (conn->fp);
}

/*
 * Get peer's address.
 */
void
tcp_connection_get_peer(struct tcp_connection *conn, struct sockaddr_in *sin)
{
	memcpy(sin, &conn->peer, sizeof(*sin));
}

/*********************************************************************
		    NEW CONNECTION ACCEPTOR
*********************************************************************/

/*
 * Accept a new incoming connection.
 *
 * This will be called with the server mutex acquired.
 */
static void
tcp_server_accept(void *arg)
{
	struct tcp_server *const serv = arg;
	struct tcp_connection *conn;
	socklen_t slen = sizeof(conn->peer);
	struct sockaddr_in sin;
	int sock;

	/* If maximum number of connections reached, pause a while */
	if (serv->max_conn > 0 && serv->num_conn >= serv->max_conn) {
		pevent_unregister(&serv->wait_event);
		pevent_unregister(&serv->conn_event);
		if (pevent_register(serv->ctx, &serv->wait_event, 0,
		    &serv->mutex, tcp_server_restart, serv, PEVENT_TIME,
		    TCP_SERVER_PAUSE) == -1)
			alogf(LOG_ERR, "%s: %m", "pevent_register");
		return;
	}

	/* Accept next connection */
	if ((sock = accept(serv->sock, (struct sockaddr *)&sin, &slen)) == -1) {
		if (errno != ECONNABORTED && errno != ENOTCONN)
			alogf(LOG_ERR, "%s: %m", "accept");
		return;
	}
	(void)fcntl(sock, F_SETFD, 1);

	/* Create connection state structure */
	if ((conn = MALLOC(serv->mtype, sizeof(*conn))) == NULL) {
		alogf(LOG_ERR, "%s: %m", "malloc");
		(void)close(sock);
		return;
	}
	memset(conn, 0, sizeof(*conn));
	conn->server = serv;
	conn->sock = sock;
	conn->peer = sin;

	/* Put stream on top of file descriptor */
	if ((conn->fp = timeout_fdopen(conn->sock,
	    "r+", serv->conn_timeout)) == NULL) {
		alogf(LOG_ERR, "%s: %m", "timeout_fdopen");
		(void)close(conn->sock);
		FREE(serv->mtype, conn);
		return;
	}
	setbuf(conn->fp, NULL);

	/* Spawn connection thread */
	if ((errno = pthread_create(&conn->tid, NULL,
	    tcp_server_connection_main, conn)) != 0) {
		conn->tid = 0;
		alogf(LOG_ERR, "%s: %m", "pthread_create");
		fclose(conn->fp);
		FREE(serv->mtype, conn);
		return;
	}

	/* Detach thread */
	pthread_detach(conn->tid);

	/* Add connection to list */
	TAILQ_INSERT_TAIL(&serv->conn_list, conn, next);
	serv->num_conn++;
}

/*
 * Start accepting new connections after waiting a while.
 *
 * This will be called with the server mutex acquired.
 */
static void
tcp_server_restart(void *arg)
{
	struct tcp_server *const serv = arg;

	/* Accept incoming connections again */
	pevent_unregister(&serv->wait_event);
	pevent_unregister(&serv->conn_event);
	if (pevent_register(serv->ctx, &serv->conn_event, 0,
	    &serv->mutex, tcp_server_accept, serv, PEVENT_READ,
	    serv->sock) == -1)
		alogf(LOG_ERR, "%s: %m", "pevent_register");
}

/*********************************************************************
		    TCP CONNECTION THREAD
*********************************************************************/

/*
 * Connection thread main entry point.
 */
static void *
tcp_server_connection_main(void *arg)
{
	struct tcp_connection *const conn = arg;
	struct tcp_server *const serv = conn->server;

	/* Push cleanup hook */
	pthread_cleanup_push(tcp_server_connection_cleanup, conn);
	conn->started = 1;			/* now it's ok to cancel me */

	/* Call application's setup routine */
	if (serv->setup != NULL
	    && (conn->cookie = (*serv->setup)(conn)) == NULL)
		goto done;
	conn->destruct = 1;

	/* Invoke application handler */
	(*serv->handler)(conn);

done:;
	/* Done */
	pthread_cleanup_pop(1);
	return (NULL);
}

/*
 * Cleanup routine for tcp_server_connection_main().
 */
static void
tcp_server_connection_cleanup(void *arg)
{
	struct tcp_connection *const conn = arg;
	struct tcp_server *const serv = conn->server;
	int r;

	/* Call application destructor */
	if (conn->destruct && serv->teardown != NULL) {
		conn->destruct = 0;
		(*serv->teardown)(conn);
	}

	/* Close connection */
	if (conn->fp != NULL) {
		(void)fclose(conn->fp);
		conn->sock = -1;
	}

	/* Unlink from server list */
	r = pthread_mutex_lock(&serv->mutex);
	assert(r == 0);
	serv->num_conn--;
	TAILQ_REMOVE(&serv->conn_list, conn, next);
	r = pthread_mutex_unlock(&serv->mutex);
	assert(r == 0);

	/* Release connection object */
	FREE(serv->mtype, conn);
}


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>