Annotation of embedaddon/libpdel/net/tcp_server.c, revision 1.1.1.1
1.1 misho 1:
2: /*
3: * Copyright (c) 2001-2002 Packet Design, LLC.
4: * All rights reserved.
5: *
6: * Subject to the following obligations and disclaimer of warranty,
7: * use and redistribution of this software, in source or object code
8: * forms, with or without modifications are expressly permitted by
9: * Packet Design; provided, however, that:
10: *
11: * (i) Any and all reproductions of the source or object code
12: * must include the copyright notice above and the following
13: * disclaimer of warranties; and
14: * (ii) No rights are granted, in any manner or form, to use
15: * Packet Design trademarks, including the mark "PACKET DESIGN"
16: * on advertising, endorsements, or otherwise except as such
17: * appears in the above copyright notice or in the software.
18: *
19: * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND
20: * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO
21: * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING
22: * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED
23: * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
24: * OR NON-INFRINGEMENT. PACKET DESIGN DOES NOT WARRANT, GUARANTEE,
25: * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS
26: * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY,
27: * RELIABILITY OR OTHERWISE. IN NO EVENT SHALL PACKET DESIGN BE
28: * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE
29: * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT,
30: * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL
31: * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF
32: * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF
33: * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
35: * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF
36: * THE POSSIBILITY OF SUCH DAMAGE.
37: *
38: * Author: Archie Cobbs <archie@freebsd.org>
39: */
40:
41: #include <sys/types.h>
42: #include <sys/queue.h>
43: #include <sys/socket.h>
44: #include <netinet/in.h>
45: #include <arpa/inet.h>
46:
47: #include <assert.h>
48: #include <errno.h>
49: #include <fcntl.h>
50: #include <stdio.h>
51: #include <stdlib.h>
52: #include <syslog.h>
53: #include <string.h>
54: #include <unistd.h>
55: #include <stdarg.h>
56: #include <pthread.h>
57:
58: #include "structs/structs.h"
59: #include "structs/type/array.h"
60:
61: #include "util/pevent.h"
62: #include "util/typed_mem.h"
63: #include "io/timeout_fp.h"
64: #include "net/tcp_server.h"
65: #include "sys/alog.h"
66:
67: /* How long to pause when we reach max # connections */
68: #define TCP_SERVER_PAUSE 250 /* 0.25 sec */
69:
70: /* Server state */
71: struct tcp_server {
72: struct pevent_ctx *ctx; /* event context */
73: struct pevent *conn_event; /* incoming connection event */
74: struct pevent *wait_event; /* pause timeout event */
75: struct sockaddr_in addr; /* server bound address */
76: pthread_mutex_t mutex; /* server mutex */
77: u_int num_conn; /* # connections */
78: u_int max_conn; /* max # connections */
79: u_int conn_timeout; /* timeout for connections */
80: int sock; /* listening socket */
81: TAILQ_HEAD(, tcp_connection) conn_list; /* connection list */
82: void *cookie; /* application private data */
83: tcp_setup_t *setup; /* connection setup handler */
84: tcp_handler_t *handler; /* connection handler handler */
85: tcp_teardown_t *teardown; /* connection teardown handlr */
86: const char *mtype; /* typed memory type string */
87: char mtype_buf[TYPED_MEM_TYPELEN];
88: };
89:
90: /* Connection state */
91: struct tcp_connection {
92: pthread_t tid; /* connection thread */
93: struct tcp_server *server; /* associated server */
94: struct sockaddr_in peer; /* remote side address */
95: u_char started; /* thread has started */
96: u_char destruct; /* object needs teardown */
97: int sock; /* connection socket */
98: FILE *fp; /* connection stream (unbuf) */
99: TAILQ_ENTRY(tcp_connection) next; /* next in connection list */
100: void *cookie; /* application private data */
101: };
102:
103: /* Internal functions */
104: static void *tcp_server_connection_main(void *arg);
105: static void tcp_server_connection_cleanup(void *arg);
106:
107: static pevent_handler_t tcp_server_accept;
108: static pevent_handler_t tcp_server_restart;
109:
110: /*
111: * Start a new TCP server
112: */
113: struct tcp_server *
114: tcp_server_start(struct pevent_ctx *ctx, void *cookie, const char *mtype,
115: struct in_addr ip, u_int16_t port, u_int max_conn, u_int conn_timeout,
116: tcp_setup_t *setup, tcp_handler_t *handler, tcp_teardown_t *teardown)
117: {
118: static const int one = 1;
119: struct tcp_server *serv = NULL;
120:
121: /* Get new object */
122: if ((serv = MALLOC(mtype, sizeof(*serv))) == NULL) {
123: alogf(LOG_ERR, "%s: %m", "malloc");
124: goto fail;
125: }
126: memset(serv, 0, sizeof(*serv));
127: serv->ctx = ctx;
128: serv->cookie = cookie;
129: serv->sock = -1;
130: serv->max_conn = max_conn;
131: serv->conn_timeout = conn_timeout;
132: serv->setup = setup;
133: serv->handler = handler;
134: serv->teardown = teardown;
135: TAILQ_INIT(&serv->conn_list);
136: if (mtype != NULL) {
137: strlcpy(serv->mtype_buf, mtype, sizeof(serv->mtype_buf));
138: serv->mtype = serv->mtype_buf;
139: }
140:
141: /* Create and bind socket */
142: if ((serv->sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
143: alogf(LOG_ERR, "%s: %m", "socket");
144: goto fail;
145: }
146: (void)fcntl(serv->sock, F_SETFD, 1);
147: if (setsockopt(serv->sock, SOL_SOCKET,
148: SO_REUSEADDR, (char *)&one, sizeof(one)) == -1) {
149: alogf(LOG_ERR, "%s: %m", "setsockopt");
150: goto fail;
151: }
152: #ifdef SO_REUSEPORT
153: if (setsockopt(serv->sock, SOL_SOCKET,
154: SO_REUSEPORT, (char *)&one, sizeof(one)) == -1) {
155: alogf(LOG_ERR, "%s: %m", "setsockopt");
156: goto fail;
157: }
158: #endif
159: memset(&serv->addr, 0, sizeof(serv->addr));
160: #ifndef __linux__
161: serv->addr.sin_len = sizeof(serv->addr);
162: #endif
163: serv->addr.sin_family = AF_INET;
164: serv->addr.sin_port = htons(port);
165: serv->addr.sin_addr = ip;
166: if (bind(serv->sock,
167: (struct sockaddr *)&serv->addr, sizeof(serv->addr)) == -1) {
168: alogf(LOG_ERR, "%s: %m", "bind");
169: goto fail;
170: }
171: if (listen(serv->sock, 1024) == -1) {
172: alogf(LOG_ERR, "%s: %m", "listen");
173: goto fail;
174: }
175:
176: /* Accept incoming connections */
177: if (pevent_register(serv->ctx, &serv->conn_event, PEVENT_RECURRING,
178: &serv->mutex, tcp_server_accept, serv, PEVENT_READ, serv->sock)
179: == -1) {
180: alogf(LOG_ERR, "%s: %m", "pevent_register");
181: goto fail;
182: }
183:
184: /* Create mutex */
185: if ((errno = pthread_mutex_init(&serv->mutex, NULL)) != 0) {
186: alogf(LOG_ERR, "%s: %m", "pthread_mutex_init");
187: goto fail;
188: }
189:
190: /* Done */
191: return (serv);
192:
193: fail:
194: /* Clean up and return error */
195: if (serv != NULL) {
196: pevent_unregister(&serv->conn_event);
197: if (serv->sock != -1)
198: (void)close(serv->sock);
199: FREE(serv->mtype, serv);
200: }
201: return (NULL);
202: }
203:
204: /*
205: * Stop a TCP server
206: */
207: void
208: tcp_server_stop(struct tcp_server **servp)
209: {
210: struct tcp_server *const serv = *servp;
211: struct tcp_connection *conn;
212: int r;
213:
214: /* Sanity */
215: if (serv == NULL)
216: return;
217: *servp = NULL;
218:
219: /* Acquire mutex */
220: r = pthread_mutex_lock(&serv->mutex);
221: assert(r == 0);
222:
223: /* Stop accepting new connections */
224: pevent_unregister(&serv->conn_event);
225: pevent_unregister(&serv->wait_event);
226:
227: /* Close listen socket */
228: (void)close(serv->sock);
229: serv->sock = -1;
230:
231: /* Kill all outstanding connections */
232: while (!TAILQ_EMPTY(&serv->conn_list)) {
233:
234: /* Kill active connections; they will clean up themselves */
235: TAILQ_FOREACH(conn, &serv->conn_list, next) {
236: if (conn->started && conn->tid != 0) {
237: pthread_cancel(conn->tid);
238: conn->tid = 0; /* don't cancel twice */
239: }
240: }
241:
242: /* Wait for outstanding connections to complete */
243: r = pthread_mutex_unlock(&serv->mutex);
244: assert(r == 0);
245: usleep(100000);
246: r = pthread_mutex_lock(&serv->mutex);
247: assert(r == 0);
248: }
249:
250: /* Free server structure */
251: r = pthread_mutex_unlock(&serv->mutex);
252: assert(r == 0);
253: pthread_mutex_destroy(&serv->mutex);
254: FREE(serv->mtype, serv);
255: }
256:
257: /*
258: * Get server cookie.
259: */
260: void *
261: tcp_server_get_cookie(struct tcp_server *serv)
262: {
263: return (serv->cookie);
264: }
265:
266: /*
267: * Get connection cookie.
268: */
269: void *
270: tcp_connection_get_cookie(struct tcp_connection *conn)
271: {
272: return (conn->cookie);
273: }
274:
275: /*
276: * Get connection file descriptor.
277: */
278: int
279: tcp_connection_get_fd(struct tcp_connection *conn)
280: {
281: return (conn->sock);
282: }
283:
284: /*
285: * Get connection file stream.
286: */
287: FILE *
288: tcp_connection_get_fp(struct tcp_connection *conn)
289: {
290: return (conn->fp);
291: }
292:
293: /*
294: * Get peer's address.
295: */
296: void
297: tcp_connection_get_peer(struct tcp_connection *conn, struct sockaddr_in *sin)
298: {
299: memcpy(sin, &conn->peer, sizeof(*sin));
300: }
301:
302: /*********************************************************************
303: NEW CONNECTION ACCEPTOR
304: *********************************************************************/
305:
306: /*
307: * Accept a new incoming connection.
308: *
309: * This will be called with the server mutex acquired.
310: */
311: static void
312: tcp_server_accept(void *arg)
313: {
314: struct tcp_server *const serv = arg;
315: struct tcp_connection *conn;
316: socklen_t slen = sizeof(conn->peer);
317: struct sockaddr_in sin;
318: int sock;
319:
320: /* If maximum number of connections reached, pause a while */
321: if (serv->max_conn > 0 && serv->num_conn >= serv->max_conn) {
322: pevent_unregister(&serv->wait_event);
323: pevent_unregister(&serv->conn_event);
324: if (pevent_register(serv->ctx, &serv->wait_event, 0,
325: &serv->mutex, tcp_server_restart, serv, PEVENT_TIME,
326: TCP_SERVER_PAUSE) == -1)
327: alogf(LOG_ERR, "%s: %m", "pevent_register");
328: return;
329: }
330:
331: /* Accept next connection */
332: if ((sock = accept(serv->sock, (struct sockaddr *)&sin, &slen)) == -1) {
333: if (errno != ECONNABORTED && errno != ENOTCONN)
334: alogf(LOG_ERR, "%s: %m", "accept");
335: return;
336: }
337: (void)fcntl(sock, F_SETFD, 1);
338:
339: /* Create connection state structure */
340: if ((conn = MALLOC(serv->mtype, sizeof(*conn))) == NULL) {
341: alogf(LOG_ERR, "%s: %m", "malloc");
342: (void)close(sock);
343: return;
344: }
345: memset(conn, 0, sizeof(*conn));
346: conn->server = serv;
347: conn->sock = sock;
348: conn->peer = sin;
349:
350: /* Put stream on top of file descriptor */
351: if ((conn->fp = timeout_fdopen(conn->sock,
352: "r+", serv->conn_timeout)) == NULL) {
353: alogf(LOG_ERR, "%s: %m", "timeout_fdopen");
354: (void)close(conn->sock);
355: FREE(serv->mtype, conn);
356: return;
357: }
358: setbuf(conn->fp, NULL);
359:
360: /* Spawn connection thread */
361: if ((errno = pthread_create(&conn->tid, NULL,
362: tcp_server_connection_main, conn)) != 0) {
363: conn->tid = 0;
364: alogf(LOG_ERR, "%s: %m", "pthread_create");
365: fclose(conn->fp);
366: FREE(serv->mtype, conn);
367: return;
368: }
369:
370: /* Detach thread */
371: pthread_detach(conn->tid);
372:
373: /* Add connection to list */
374: TAILQ_INSERT_TAIL(&serv->conn_list, conn, next);
375: serv->num_conn++;
376: }
377:
378: /*
379: * Start accepting new connections after waiting a while.
380: *
381: * This will be called with the server mutex acquired.
382: */
383: static void
384: tcp_server_restart(void *arg)
385: {
386: struct tcp_server *const serv = arg;
387:
388: /* Accept incoming connections again */
389: pevent_unregister(&serv->wait_event);
390: pevent_unregister(&serv->conn_event);
391: if (pevent_register(serv->ctx, &serv->conn_event, 0,
392: &serv->mutex, tcp_server_accept, serv, PEVENT_READ,
393: serv->sock) == -1)
394: alogf(LOG_ERR, "%s: %m", "pevent_register");
395: }
396:
397: /*********************************************************************
398: TCP CONNECTION THREAD
399: *********************************************************************/
400:
401: /*
402: * Connection thread main entry point.
403: */
404: static void *
405: tcp_server_connection_main(void *arg)
406: {
407: struct tcp_connection *const conn = arg;
408: struct tcp_server *const serv = conn->server;
409:
410: /* Push cleanup hook */
411: pthread_cleanup_push(tcp_server_connection_cleanup, conn);
412: conn->started = 1; /* now it's ok to cancel me */
413:
414: /* Call application's setup routine */
415: if (serv->setup != NULL
416: && (conn->cookie = (*serv->setup)(conn)) == NULL)
417: goto done;
418: conn->destruct = 1;
419:
420: /* Invoke application handler */
421: (*serv->handler)(conn);
422:
423: done:;
424: /* Done */
425: pthread_cleanup_pop(1);
426: return (NULL);
427: }
428:
429: /*
430: * Cleanup routine for tcp_server_connection_main().
431: */
432: static void
433: tcp_server_connection_cleanup(void *arg)
434: {
435: struct tcp_connection *const conn = arg;
436: struct tcp_server *const serv = conn->server;
437: int r;
438:
439: /* Call application destructor */
440: if (conn->destruct && serv->teardown != NULL) {
441: conn->destruct = 0;
442: (*serv->teardown)(conn);
443: }
444:
445: /* Close connection */
446: if (conn->fp != NULL) {
447: (void)fclose(conn->fp);
448: conn->sock = -1;
449: }
450:
451: /* Unlink from server list */
452: r = pthread_mutex_lock(&serv->mutex);
453: assert(r == 0);
454: serv->num_conn--;
455: TAILQ_REMOVE(&serv->conn_list, conn, next);
456: r = pthread_mutex_unlock(&serv->mutex);
457: assert(r == 0);
458:
459: /* Release connection object */
460: FREE(serv->mtype, conn);
461: }
462:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>