File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / libpdel / util / pevent.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Tue Feb 21 23:25:53 2012 UTC (13 years, 1 month ago) by misho
Branches: libpdel, MAIN
CVS tags: v0_5_3, HEAD
libpdel

    1: 
    2: /*
    3:  * Copyright (c) 2001-2002 Packet Design, LLC.
    4:  * All rights reserved.
    5:  * 
    6:  * Subject to the following obligations and disclaimer of warranty,
    7:  * use and redistribution of this software, in source or object code
    8:  * forms, with or without modifications are expressly permitted by
    9:  * Packet Design; provided, however, that:
   10:  * 
   11:  *    (i)  Any and all reproductions of the source or object code
   12:  *         must include the copyright notice above and the following
   13:  *         disclaimer of warranties; and
   14:  *    (ii) No rights are granted, in any manner or form, to use
   15:  *         Packet Design trademarks, including the mark "PACKET DESIGN"
   16:  *         on advertising, endorsements, or otherwise except as such
   17:  *         appears in the above copyright notice or in the software.
   18:  * 
   19:  * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND
   20:  * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO
   21:  * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING
   22:  * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED
   23:  * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
   24:  * OR NON-INFRINGEMENT.  PACKET DESIGN DOES NOT WARRANT, GUARANTEE,
   25:  * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS
   26:  * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY,
   27:  * RELIABILITY OR OTHERWISE.  IN NO EVENT SHALL PACKET DESIGN BE
   28:  * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE
   29:  * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT,
   30:  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL
   31:  * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF
   32:  * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF
   33:  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
   34:  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
   35:  * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF
   36:  * THE POSSIBILITY OF SUCH DAMAGE.
   37:  *
   38:  * Author: Archie Cobbs <archie@freebsd.org>
   39:  */
   40: 
   41: #include <sys/types.h>
   42: #include <sys/param.h>
   43: #include <sys/queue.h>
   44: #include <sys/time.h>
   45: 
   46: #include <netinet/in.h>
   47: 
   48: #include <stdarg.h>
   49: #include <stdio.h>
   50: #include <syslog.h>
   51: #include <string.h>
   52: #include <errno.h>
   53: #include <assert.h>
   54: #include <unistd.h>
   55: #include <poll.h>
   56: #include <sched.h>
   57: #include <pthread.h>
   58: 
   59: #include "structs/structs.h"
   60: #include "structs/type/array.h"
   61: #include "util/typed_mem.h"
   62: #include "util/mesg_port.h"
   63: #include "util/pevent.h"
   64: #include "sys/alog.h"
   65: 
   66: #include "internal.h"
   67: #include "debug/debug.h"
   68: 
   69: #define PEVENT_MAGIC		0x31d7699b
   70: #define PEVENT_CTX_MAGIC	0x7842f901
   71: 
   72: /* Private flags */
   73: #define PEVENT_OCCURRED		0x8000		/* event has occurred */
   74: #define PEVENT_CANCELED		0x4000		/* event canceled or done */
   75: #define PEVENT_ENQUEUED		0x2000		/* in the ctx->events queue */
   76: #define PEVENT_GOT_MUTEX	0x1000		/* user mutex acquired */
   77: 
   78: #define PEVENT_USER_FLAGS	(PEVENT_RECURRING | PEVENT_OWN_THREAD)
   79: 
   80: #define READABLE_EVENTS		(POLLIN | POLLRDNORM | POLLERR \
   81: 				    | POLLHUP | POLLNVAL)
   82: #define WRITABLE_EVENTS		(POLLOUT | POLLWRNORM | POLLWRBAND \
   83: 				    | POLLERR | POLLHUP | POLLNVAL)
   84: 
   85: /* Event context */
   86: struct pevent_ctx {
   87: 	u_int32_t		magic;		/* magic number */
   88: 	pthread_mutex_t		mutex;		/* mutex for context */
   89: #if PDEL_DEBUG
   90: 	int			mutex_count;	/* mutex count */
   91: #endif
   92: 	pthread_attr_t		attr;		/* event thread attributes */
   93: 	pthread_t		thread;		/* event thread */
   94: 	TAILQ_HEAD(, pevent)	events;		/* pending event list */
   95: 	u_int			nevents;	/* length of 'events' list */
   96: 	u_int			nrwevents;	/* number read/write events */
   97: 	struct pollfd		*fds;		/* poll(2) fds array */
   98: 	u_int			fds_alloc;	/* allocated size of 'fds' */
   99: 	const char		*mtype;		/* typed_mem(3) memory type */
  100: 	char			mtype_buf[TYPED_MEM_TYPELEN];
  101: 	int			pipe[2];	/* event thread notify pipe */
  102: 	u_char			notified;	/* data in the pipe */
  103: 	u_char			has_attr;	/* 'attr' is valid */
  104: 	u_int			refs;		/* references to this context */
  105: };
  106: 
  107: /* Event object */
  108: struct pevent {
  109: 	u_int32_t		magic;		/* magic number */
  110: 	struct pevent_ctx	*ctx;		/* pointer to event context */
  111: 	struct pevent		**peventp;	/* user handle to this event */
  112: 	pevent_handler_t	*handler;	/* event handler function */
  113: 	void			*arg;		/* event handler function arg */
  114: 	int			flags;		/* event flags */
  115: 	int			poll_idx;	/* index in poll(2) fds array */
  116: 	pthread_mutex_t		*mutex;		/* user mutex, if any */
  117: #if PDEL_DEBUG
  118: 	int			mutex_count;	/* mutex count */
  119: #endif
  120: 	enum pevent_type	type;		/* type of this event */
  121: 	struct timeval		when;		/* expiration for time events */
  122: 	u_int			refs;		/* references to this event */
  123: 	union {
  124: 		int		fd;		/* file descriptor */
  125: 		int		millis;		/* time delay */
  126: 		struct mesg_port *port;		/* mesg_port */
  127: 	}			u;
  128: 	TAILQ_ENTRY(pevent)	next;		/* next in ctx->events */
  129: };
  130: 
  131: /* Macros */
  132: #define PEVENT_ENQUEUE(ctx, ev)						\
  133: 	do {								\
  134: 		assert(((ev)->flags & PEVENT_ENQUEUED) == 0);		\
  135: 		TAILQ_INSERT_TAIL(&(ctx)->events, (ev), next);		\
  136: 		(ev)->flags |= PEVENT_ENQUEUED;				\
  137: 		(ctx)->nevents++;					\
  138: 		if ((ev)->type == PEVENT_READ				\
  139: 		    || (ev)->type == PEVENT_WRITE)			\
  140: 			(ctx)->nrwevents++;				\
  141: 		DBG(PEVENT, "ev %p refs %d -> %d (enqueued)",		\
  142: 		    (ev), (ev)->refs, (ev)->refs + 1);			\
  143: 		(ev)->refs++;						\
  144: 	} while (0)
  145: 
  146: #define PEVENT_DEQUEUE(ctx, ev)						\
  147: 	do {								\
  148: 		assert(((ev)->flags & PEVENT_ENQUEUED) != 0);		\
  149: 		TAILQ_REMOVE(&(ctx)->events, (ev), next);		\
  150: 		(ctx)->nevents--;					\
  151: 		if ((ev)->type == PEVENT_READ				\
  152: 		    || (ev)->type == PEVENT_WRITE)			\
  153: 			(ctx)->nrwevents--;				\
  154: 		(ev)->flags &= ~PEVENT_ENQUEUED;			\
  155: 		_pevent_unref(ev);					\
  156: 	} while (0)
  157: 
  158: #define PEVENT_SET_OCCURRED(ctx, ev)					\
  159: 	do {								\
  160: 		(ev)->flags |= PEVENT_OCCURRED;				\
  161: 		if ((ev) != TAILQ_FIRST(&ctx->events)) {		\
  162: 			TAILQ_REMOVE(&(ctx)->events, (ev), next);	\
  163: 			TAILQ_INSERT_HEAD(&(ctx)->events, (ev), next);	\
  164: 		}							\
  165: 	} while (0)
  166: 
  167: /* Internal functions */
  168: static void	pevent_ctx_service(struct pevent *ev);
  169: static void	*pevent_ctx_main(void *arg);
  170: static void	pevent_ctx_main_cleanup(void *arg);
  171: static void	*pevent_ctx_execute(void *arg);
  172: static void	pevent_ctx_execute_cleanup(void *arg);
  173: static void	pevent_ctx_notify(struct pevent_ctx *ctx);
  174: static void	pevent_ctx_unref(struct pevent_ctx *ctx);
  175: static void	pevent_cancel(struct pevent *ev);
  176: 
  177: /* Internal variables */
  178: static char	pevent_byte;
  179: 
  180: /*
  181:  * Create a new event context.
  182:  */
  183: struct pevent_ctx *
  184: pevent_ctx_create(const char *mtype, const pthread_attr_t *attr)
  185: {
  186: 	pthread_mutexattr_t mutexattr;
  187: 	struct pevent_ctx *ctx;
  188: 	int got_mutexattr = 0;
  189: 	int got_mutex = 0;
  190: 
  191: 	/* Create context object */
  192: 	if ((ctx = MALLOC(mtype, sizeof(*ctx))) == NULL)
  193: 		return (NULL);
  194: 	memset(ctx, 0, sizeof(*ctx));
  195: 	if (mtype != NULL) {
  196: 		strlcpy(ctx->mtype_buf, mtype, sizeof(ctx->mtype_buf));
  197: 		ctx->mtype = ctx->mtype_buf;
  198: 	}
  199: 	TAILQ_INIT(&ctx->events);
  200: 
  201: 	/* Copy thread attributes */
  202: 	if (attr != NULL) {
  203: 		struct sched_param param;
  204: 		int value;
  205: 
  206: 		if ((errno = pthread_attr_init(&ctx->attr)) != 0)
  207: 			goto fail;
  208: 		ctx->has_attr = 1;
  209: 		pthread_attr_getinheritsched(attr, &value);
  210: 		pthread_attr_setinheritsched(&ctx->attr, value);
  211: 		pthread_attr_getschedparam(attr, &param);
  212: 		pthread_attr_setschedparam(&ctx->attr, &param);
  213: 		pthread_attr_getschedpolicy(attr, &value);
  214: 		pthread_attr_setschedpolicy(&ctx->attr, value);
  215: 		pthread_attr_getscope(attr, &value);
  216: 		pthread_attr_setscope(&ctx->attr, value);
  217: 	}
  218: 
  219: 	/* Initialize mutex */
  220: 	if ((errno = pthread_mutexattr_init(&mutexattr) != 0))
  221: 		goto fail;
  222: 	got_mutexattr = 1;
  223: 	pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
  224: 	if ((errno = pthread_mutex_init(&ctx->mutex, &mutexattr)) != 0)
  225: 		goto fail;
  226: 	got_mutex = 1;
  227: 
  228: 	/* Initialize notify pipe */
  229: 	if (pipe(ctx->pipe) == -1)
  230: 		goto fail;
  231: 
  232: 	/* Finish up */
  233: 	pthread_mutexattr_destroy(&mutexattr);
  234: 	ctx->magic = PEVENT_CTX_MAGIC;
  235: 	ctx->refs = 1;
  236: 	DBG(PEVENT, "created ctx %p", ctx);
  237: 	return (ctx);
  238: 
  239: fail:
  240: 	/* Clean up after failure */
  241: 	if (got_mutex)
  242: 		pthread_mutex_destroy(&ctx->mutex);
  243: 	if (got_mutexattr)
  244: 		pthread_mutexattr_destroy(&mutexattr);
  245: 	if (ctx->has_attr)
  246: 		pthread_attr_destroy(&ctx->attr);
  247: 	FREE(mtype, ctx);
  248: 	return (NULL);
  249: }
  250: 
  251: /*
  252:  * Destroy an event context.
  253:  *
  254:  * All events are unregistered.
  255:  */
  256: void
  257: pevent_ctx_destroy(struct pevent_ctx **ctxp)
  258: {
  259: 	struct pevent_ctx *const ctx = *ctxp;
  260: 
  261: 	/* Allow NULL context */
  262: 	if (ctx == NULL)
  263: 		return;
  264: 	*ctxp = NULL;
  265: 	DBG(PEVENT, "destroying ctx %p", ctx);
  266: 
  267: 	/* Lock context */
  268: 	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
  269: 
  270: 	/* Sanity check */
  271: 	assert(ctx->magic == PEVENT_CTX_MAGIC);
  272: 
  273: 	/* Unregister all events */
  274: 	while (!TAILQ_EMPTY(&ctx->events))
  275: 		pevent_unregister(TAILQ_FIRST(&ctx->events)->peventp);
  276: 
  277: 	/* Decrement context reference count and unlock context */
  278: 	pevent_ctx_unref(ctx);
  279: }
  280: 
  281: /*
  282:  * Return the number of registered events.
  283:  */
  284: u_int
  285: pevent_ctx_count(struct pevent_ctx *ctx)
  286: {
  287: 	u_int nevents;
  288: 
  289: 	assert(ctx->magic == PEVENT_CTX_MAGIC);
  290: 	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
  291: 	nevents = ctx->nevents;
  292: 	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  293: 	return (nevents);
  294: }
  295: 
  296: /*
  297:  * Create a new schedule item.
  298:  */
  299: int
  300: pevent_register(struct pevent_ctx *ctx, struct pevent **peventp, int flags,
  301: 	pthread_mutex_t *mutex, pevent_handler_t *handler, void *arg,
  302: 	enum pevent_type type, ...)
  303: {
  304: 	struct pevent *ev;
  305: 	va_list args;
  306: 
  307: 	/* Sanity check */
  308: 	if (*peventp != NULL) {
  309: 		errno = EBUSY;
  310: 		return (-1);
  311: 	}
  312: 
  313: 	/* Check flags */
  314: 	if ((flags & ~PEVENT_USER_FLAGS) != 0) {
  315: 		errno = EINVAL;
  316: 		return (-1);
  317: 	}
  318: 
  319: 	/* Create new event */
  320: 	if ((ev = MALLOC(ctx->mtype, sizeof(*ev))) == NULL)
  321: 		return (-1);
  322: 	memset(ev, 0, sizeof(*ev));
  323: 	ev->magic = PEVENT_MAGIC;
  324: 	ev->ctx = ctx;
  325: 	ev->handler = handler;
  326: 	ev->arg = arg;
  327: 	ev->flags = flags;
  328: 	ev->poll_idx = -1;
  329: 	ev->mutex = mutex;
  330: 	ev->type = type;
  331: 	ev->refs = 1;				/* the caller's reference */
  332: 	DBG(PEVENT, "new ev %p in ctx %p", ev, ctx);
  333: 
  334: 	/* Add type-specific info */
  335: 	switch (type) {
  336: 	case PEVENT_READ:
  337: 	case PEVENT_WRITE:
  338: 		va_start(args, type);
  339: 		ev->u.fd = va_arg(args, int);
  340: 		va_end(args);
  341: 		break;
  342: 	case PEVENT_TIME:
  343: 		va_start(args, type);
  344: 		ev->u.millis = va_arg(args, int);
  345: 		va_end(args);
  346: 		if (ev->u.millis < 0)
  347: 			ev->u.millis = 0;
  348: 		gettimeofday(&ev->when, NULL);
  349: 		ev->when.tv_sec += ev->u.millis / 1000;
  350: 		ev->when.tv_usec += (ev->u.millis % 1000) * 1000;
  351: 		if (ev->when.tv_usec > 1000000) {
  352: 			ev->when.tv_sec++;
  353: 			ev->when.tv_usec -= 1000000;
  354: 		}
  355: 		break;
  356: 	case PEVENT_MESG_PORT:
  357: 		va_start(args, type);
  358: 		ev->u.port = va_arg(args, struct mesg_port *);
  359: 		va_end(args);
  360: 		if (ev->u.port == NULL)
  361: 			goto invalid;
  362: 		break;
  363: 	case PEVENT_USER:
  364: 		break;
  365: 	default:
  366: 	invalid:
  367: 		errno = EINVAL;
  368: 		_pevent_unref(ev);
  369: 		return (-1);
  370: 	}
  371: 
  372: 	/* Lock context */
  373: 	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
  374: 
  375: 	/* Link to related object (if appropriate) */
  376: 	switch (ev->type) {
  377: 	case PEVENT_MESG_PORT:
  378: 		if (_mesg_port_set_event(ev->u.port, ev) == -1) {
  379: 			MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  380: 			_pevent_unref(ev);
  381: 			return (-1);
  382: 		}
  383: 		DBG(PEVENT, "ev %p refs %d -> %d (mesg port)",
  384: 		    ev, ev->refs, ev->refs + 1);
  385: 		ev->refs++;			/* the mesg_port's reference */
  386: 		break;
  387: 	default:
  388: 		break;
  389: 	}
  390: 
  391: 	/* Start or notify event thread */
  392: 	if (ctx->thread == 0) {
  393: 		if ((errno = pthread_create(&ctx->thread,
  394: 		    ctx->has_attr ? &ctx->attr : NULL,
  395: 		    pevent_ctx_main, ctx)) != 0) {
  396: 			ctx->thread = 0;
  397: 			alogf(LOG_ERR, "%s: %m", "pthread_create");
  398: 			ev->flags |= PEVENT_CANCELED;
  399: 			MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  400: 			_pevent_unref(ev);
  401: 			return (-1);
  402: 		}
  403: 		pthread_detach(ctx->thread);
  404: 		DBG(PEVENT, "created tid %p for ctx %p, refs -> %d",
  405: 		    ctx->thread, ctx, ctx->refs + 1);
  406: 		ctx->refs++;			/* add reference for thread */
  407: 	} else
  408: 		pevent_ctx_notify(ctx);
  409: 
  410: 	/* Caller gets the one reference */
  411: 	ev->peventp = peventp;
  412: 	*peventp = ev;
  413: 
  414: 	/* Add event to the pending event list */
  415: 	PEVENT_ENQUEUE(ctx, ev);
  416: 
  417: 	/* Unlock context */
  418: 	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  419: 
  420: 	return (0);
  421: }
  422: 
  423: /*
  424:  * Unregister an event.
  425:  */
  426: void
  427: pevent_unregister(struct pevent **peventp)
  428: {
  429: 	struct pevent *const ev = *peventp;
  430: 	struct pevent_ctx *ctx;
  431: 
  432: 	/* Allow NULL event */
  433: 	if (ev == NULL)
  434: 		return;
  435: 	ctx = ev->ctx;
  436: 	DBG(PEVENT, "unregister ev %p in ctx %p", ev, ctx);
  437: 
  438: 	/* Sanity checks */
  439: 	assert(ev->magic == PEVENT_MAGIC);
  440: 	assert(ctx->magic == PEVENT_CTX_MAGIC);
  441: 	assert(ev->peventp == peventp);
  442: 
  443: 	/* Lock context */
  444: 	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
  445: 	DBG(PEVENT, "unregister ev %p in ctx %p: lock", ev, ctx);
  446: 
  447: 	/* Dequeue event if queued */
  448: 	if ((ev->flags & PEVENT_ENQUEUED) != 0) {
  449: 		PEVENT_DEQUEUE(ctx, ev);
  450: 		pevent_ctx_notify(ctx);			/* wakeup thread */
  451: 	}
  452: 
  453: 	/* Cancel the event */
  454: 	pevent_cancel(ev);
  455: 
  456: 	/* Unlock context */
  457: 	DBG(PEVENT, "unregister ev %p in ctx %p: unlock", ev, ctx);
  458: 	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  459: }
  460: 
  461: /*
  462:  * Trigger an event.
  463:  */
  464: void
  465: pevent_trigger(struct pevent *ev)
  466: {
  467: 	struct pevent_ctx *ctx;
  468: 
  469: 	/* Sanity check */
  470: 	if (ev == NULL)
  471: 		return;
  472: 
  473: 	/* Get context */
  474: 	ctx = ev->ctx;
  475: 	DBG(PEVENT, "trigger ev %p in ctx %p", ev, ctx);
  476: 	assert(ev->magic == PEVENT_MAGIC);
  477: 
  478: 	/* Lock context */
  479: 	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
  480: 
  481: 	/* If event already canceled, bail */
  482: 	if ((ev->flags & PEVENT_CANCELED) != 0)
  483: 		goto done;
  484: 
  485: 	/* Mark event as having occurred */
  486: 	PEVENT_SET_OCCURRED(ctx, ev);
  487: 
  488: 	/* Wake up thread if event is still in the queue */
  489: 	if ((ev->flags & PEVENT_ENQUEUED) != 0)
  490: 		pevent_ctx_notify(ctx);
  491: 
  492: done:
  493: 	/* Unlock context */
  494: 	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  495: }
  496: 
  497: /*
  498:  * Get event info.
  499:  */
  500: int
  501: pevent_get_info(struct pevent *ev, struct pevent_info *info)
  502: {
  503: 	if (ev == NULL) {
  504: 		errno = ENXIO;
  505: 		return (-1);
  506: 	}
  507: 	assert(ev->magic == PEVENT_MAGIC);
  508: 	memset(info, 0, sizeof(*info));
  509: 	info->type = ev->type;
  510: 	switch (ev->type) {
  511: 	case PEVENT_READ:
  512: 	case PEVENT_WRITE:
  513: 		info->u.fd = ev->u.fd;
  514: 		break;
  515: 	case PEVENT_TIME:
  516: 		info->u.millis = ev->u.millis;
  517: 		break;
  518: 	case PEVENT_MESG_PORT:
  519: 		info->u.port = ev->u.port;
  520: 		break;
  521: 	case PEVENT_USER:
  522: 		break;
  523: 	default:
  524: 		errno = EINVAL;
  525: 		return (-1);
  526: 	}
  527: 	return (0);
  528: }
  529: 
  530: /*
  531:  * Event thread entry point.
  532:  */
  533: static void *
  534: pevent_ctx_main(void *arg)
  535: {
  536: 	struct pevent_ctx *const ctx = arg;
  537: 	struct timeval now;
  538: 	struct pollfd *fd;
  539: 	struct pevent *ev;
  540: 	struct pevent *next_ev;
  541: 	int poll_idx;
  542: 	int timeout;
  543: 	int r;
  544: 
  545: 	/* Push cleanup hook */
  546: 	pthread_cleanup_push(pevent_ctx_main_cleanup, ctx);
  547: 	assert(ctx->magic == PEVENT_CTX_MAGIC);
  548: 
  549: 	/* Lock context */
  550: 	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
  551: 
  552: 	/* Safety belts */
  553: 	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
  554: 
  555: 	/* Get current time */
  556: 	gettimeofday(&now, NULL);
  557: 	DBG(PEVENT, "ctx %p thread starting", ctx);
  558: 
  559: loop:
  560: 	/* Are there any events left? */
  561: 	if (TAILQ_EMPTY(&ctx->events)) {
  562: 		DBG(PEVENT, "ctx %p thread exiting", ctx);
  563: 		goto done;
  564: 	}
  565: 
  566: 	/* Make sure ctx->fds array is long enough */
  567: 	if (ctx->fds_alloc < 1 + ctx->nrwevents) {
  568: 		const u_int new_alloc = roundup(1 + ctx->nrwevents, 16);
  569: 		void *mem;
  570: 
  571: 		if ((mem = REALLOC(ctx->mtype, ctx->fds,
  572: 		    new_alloc * sizeof(*ctx->fds))) == NULL)
  573: 			alogf(LOG_ERR, "%s: %m", "realloc");
  574: 		else {
  575: 			ctx->fds = mem;
  576: 			ctx->fds_alloc = new_alloc;
  577: 		}
  578: 	}
  579: 
  580: 	/* If we were intentionally woken up, read the wakeup byte */
  581: 	if (ctx->notified) {
  582: 		DBG(PEVENT, "ctx %p thread was notified", ctx);
  583: 		(void)read(ctx->pipe[0], &pevent_byte, 1);
  584: 		ctx->notified = 0;
  585: 	}
  586: 
  587: 	/* Add event for the notify pipe */
  588: 	poll_idx = 0;
  589: 	if (ctx->fds_alloc > 0) {
  590: 		fd = &ctx->fds[poll_idx++];
  591: 		memset(fd, 0, sizeof(*fd));
  592: 		fd->fd = ctx->pipe[0];
  593: 		fd->events = POLLRDNORM;
  594: 	}
  595: 
  596: 	/* Fill in rest of poll() array */
  597: 	timeout = INFTIM;
  598: 	TAILQ_FOREACH(ev, &ctx->events, next) {
  599: 		switch (ev->type) {
  600: 		case PEVENT_READ:
  601: 		case PEVENT_WRITE:
  602: 		    {
  603: 			if (poll_idx >= ctx->fds_alloc) {
  604: 				ev->poll_idx = -1;
  605: 				break;
  606: 			}
  607: 			ev->poll_idx = poll_idx++;
  608: 			fd = &ctx->fds[ev->poll_idx];
  609: 			memset(fd, 0, sizeof(*fd));
  610: 			fd->fd = ev->u.fd;
  611: 			fd->events = (ev->type == PEVENT_READ) ?
  612: 			    POLLRDNORM : POLLWRNORM;
  613: 			break;
  614: 		    }
  615: 		case PEVENT_TIME:
  616: 		    {
  617: 			struct timeval remain;
  618: 			int millis;
  619: 
  620: 			/* Compute milliseconds until event */
  621: 			if (timercmp(&ev->when, &now, <=))
  622: 				millis = 0;
  623: 			else {
  624: 				timersub(&ev->when, &now, &remain);
  625: 				millis = remain.tv_sec * 1000;
  626: 				millis += remain.tv_usec / 1000;
  627: 			}
  628: 
  629: 			/* Remember the minimum delay */
  630: 			if (timeout == INFTIM || millis < timeout)
  631: 				timeout = millis;
  632: 			break;
  633: 		    }
  634: 		default:
  635: 			break;
  636: 		}
  637: 	}
  638: 
  639: 	/* Mark non-poll() events that have occurred */
  640: 	TAILQ_FOREACH(ev, &ctx->events, next) {
  641: 		assert(ev->magic == PEVENT_MAGIC);
  642: 		switch (ev->type) {
  643: 		case PEVENT_MESG_PORT:
  644: 			if (mesg_port_qlen(ev->u.port) > 0)
  645: 				PEVENT_SET_OCCURRED(ctx, ev);
  646: 			break;
  647: 		default:
  648: 			break;
  649: 		}
  650: 		if ((ev->flags & PEVENT_OCCURRED) != 0)
  651: 			timeout = 0;			/* don't delay */
  652: 	}
  653: 
  654: #if PDEL_DEBUG
  655: 	/* Debugging */
  656: 	DBG(PEVENT, "ctx %p thread event list:", ctx);
  657: 	TAILQ_FOREACH(ev, &ctx->events, next)
  658: 		DBG(PEVENT, "\tev %p", ev);
  659: #endif
  660: 
  661: 	/* Wait for something to happen */
  662: 	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  663: 	DBG(PEVENT, "ctx %p thread sleeping", ctx);
  664: 	r = poll(ctx->fds, poll_idx, timeout);
  665: 	DBG(PEVENT, "ctx %p thread woke up", ctx);
  666: 	assert(ctx->magic == PEVENT_CTX_MAGIC);
  667: 	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
  668: 
  669: 	/* Check for errors */
  670: 	if (r == -1 && errno != EINTR) {
  671: 		alogf(LOG_CRIT, "%s: %m", "poll");
  672: 		assert(0);
  673: 	}
  674: 
  675: 	/* Update current time */
  676: 	gettimeofday(&now, NULL);
  677: 
  678: 	/* Mark poll() events that have occurred */
  679: 	for (ev = TAILQ_FIRST((&ctx->events)); ev != NULL; ev = next_ev) {
  680: 		next_ev = TAILQ_NEXT(ev, next);
  681: 		assert(ev->magic == PEVENT_MAGIC);
  682: 		switch (ev->type) {
  683: 		case PEVENT_READ:
  684: 		case PEVENT_WRITE:
  685: 			if (ev->poll_idx == -1)
  686: 				break;
  687: 			fd = &ctx->fds[ev->poll_idx];
  688: 			if ((fd->revents & ((ev->type == PEVENT_READ) ?
  689: 			    READABLE_EVENTS : WRITABLE_EVENTS)) != 0)
  690: 				PEVENT_SET_OCCURRED(ctx, ev);
  691: 			break;
  692: 		case PEVENT_TIME:
  693: 			if (timercmp(&ev->when, &now, <=))
  694: 				PEVENT_SET_OCCURRED(ctx, ev);
  695: 			break;
  696: 		default:
  697: 			break;
  698: 		}
  699: 	}
  700: 
  701: 	/* Service all events that are marked as having occurred */
  702: 	while (1) {
  703: 
  704: 		/* Find next event that needs service */
  705: 		ev = TAILQ_FIRST(&ctx->events);
  706: 		if (ev == NULL || (ev->flags & PEVENT_OCCURRED) == 0)
  707: 			break;
  708: 		DBG(PEVENT, "ctx %p thread servicing ev %p", ctx, ev);
  709: 
  710: 		/* Detach event and service it while keeping a reference */
  711: 		DBG(PEVENT, "ev %p refs %d -> %d (for servicing)",
  712: 		    ev, ev->refs, ev->refs + 1);
  713: 		ev->refs++;
  714: 		PEVENT_DEQUEUE(ctx, ev);
  715: 		pevent_ctx_service(ev);
  716: 	}
  717: 
  718: 	/* Spin again */
  719: 	DBG(PEVENT, "ctx %p thread spin again", ctx);
  720: 	goto loop;
  721: 
  722: done:;
  723: 	/* Done */
  724: 	pthread_cleanup_pop(1);
  725: 	return (NULL);
  726: }
  727: 
  728: /*
  729:  * Cleanup routine for event thread.
  730:  */
  731: static void
  732: pevent_ctx_main_cleanup(void *arg)
  733: {
  734: 	struct pevent_ctx *const ctx = arg;
  735: 
  736: 	DBG(PEVENT, "ctx %p thread cleanup", ctx);
  737: 	ctx->thread = 0;
  738: 	pevent_ctx_unref(ctx);		/* release thread's reference */
  739: }
  740: 
  741: /*
  742:  * Service an event.
  743:  *
  744:  * This is called with an extra reference on the event which is
  745:  * removed by pevent_ctx_execute().
  746:  */
  747: static void
  748: pevent_ctx_service(struct pevent *ev)
  749: {
  750: 	struct pevent_ctx *const ctx = ev->ctx;
  751: 	pthread_t tid;
  752: 
  753: 	/* Does handler want to run in its own thread? */
  754: 	if ((ev->flags & PEVENT_OWN_THREAD) != 0) {
  755: 		if ((errno = pthread_create(&tid,
  756: 		    NULL, pevent_ctx_execute, ev)) != 0) {
  757: 			alogf(LOG_ERR,
  758: 			    "can't spawn thread for event %p: %m", ev);
  759: 			pevent_cancel(ev);	/* removes the reference */
  760: 			return;
  761: 		}
  762: 		pthread_detach(tid);
  763: 		DBG(PEVENT, "dispatched thread %p for ev %p", tid, ev);
  764: 		return;
  765: 	}
  766: 
  767: 	/* Execute handler with context unlocked */
  768: 	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  769: 	pevent_ctx_execute(ev);
  770: 	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
  771: }
  772: 
  773: /*
  774:  * Execute an event handler.
  775:  *
  776:  * Grab the user mutex, if any, check if event was canceled,
  777:  * execute handler, release mutex.
  778:  *
  779:  * This is called with an extra reference on the event, which is
  780:  * removed by this function.
  781:  *
  782:  * This should be NOT be called with a lock on the event context.
  783:  */
  784: static void *
  785: pevent_ctx_execute(void *arg)
  786: {
  787: 	struct pevent *const ev = arg;
  788: 	struct pevent_ctx *const ctx = ev->ctx;
  789: 	int r;
  790: 
  791: 	/* Sanity check */
  792: 	assert(ev->magic == PEVENT_MAGIC);
  793: 
  794: 	/* Register cleanup hook */
  795: 	DBG(PEVENT, "ev %p being executed", ev);
  796: 	pthread_cleanup_push(pevent_ctx_execute_cleanup, ev);
  797: 
  798: try_again:
  799: 	/* Acquire context mutex */
  800: 	DBG(PEVENT, "locking ctx %p for ev %p", ctx, ev);
  801: 	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
  802: 
  803: 	/* Check for cancellation to close the cancel/timeout race */
  804: 	if ((ev->flags & PEVENT_CANCELED) != 0) {
  805: 		DBG(PEVENT, "ev %p was canceled", ev);
  806: 		MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  807: 		goto done;
  808: 	}
  809: 
  810: 	/*
  811: 	 * Acquire user mutex but avoid two potential problems:
  812: 	 *
  813: 	 * - Race window: other thread could call timer_cancel()
  814: 	 *   and destroy the user mutex just before we try to
  815: 	 *   acquire it; we must hold context lock to prevent this.
  816: 	 *
  817: 	 * - Lock reversal deadlock: other threads acquire the
  818: 	 *   user lock before the context lock, we do the reverse.
  819: 	 *
  820: 	 * Once we've acquire both locks, we drop the context lock.
  821: 	 */
  822: 	if (ev->mutex != NULL) {
  823: 		MUTEX_TRYLOCK(ev->mutex, ev->mutex_count);
  824: 		if (errno != 0) {
  825: 			if (errno != EBUSY) {
  826: 				alogf(LOG_ERR,
  827: 				    "can't lock mutex for event %p: %m", ev);
  828: 				pevent_cancel(ev);
  829: 				MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  830: 				goto done;
  831: 			}
  832: 			MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  833: 			sched_yield();
  834: 			goto try_again;
  835: 		}
  836: 		ev->flags |= PEVENT_GOT_MUTEX;
  837: 	}
  838: 
  839: 	/* Remove user's event reference (we still have one though) */
  840: 	pevent_cancel(ev);
  841: 
  842: 	/* Release context mutex */
  843: 	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  844: 
  845: 	/* Reschedule a new event if recurring */
  846: 	if ((ev->flags & PEVENT_RECURRING) != 0) {
  847: 		switch (ev->type) {
  848: 		case PEVENT_READ:
  849: 		case PEVENT_WRITE:
  850: 			r = pevent_register(ev->ctx, ev->peventp,
  851: 			    (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
  852: 			    ev->handler, ev->arg, ev->type, ev->u.fd);
  853: 			break;
  854: 		case PEVENT_TIME:
  855: 			r = pevent_register(ev->ctx, ev->peventp,
  856: 			    (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
  857: 			    ev->handler, ev->arg, ev->type, ev->u.millis);
  858: 			break;
  859: 		case PEVENT_MESG_PORT:
  860: 			r = pevent_register(ev->ctx, ev->peventp,
  861: 			    (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
  862: 			    ev->handler, ev->arg, ev->type, ev->u.port);
  863: 			break;
  864: 		case PEVENT_USER:
  865: 			r = pevent_register(ev->ctx, ev->peventp,
  866: 			    (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
  867: 			    ev->handler, ev->arg, ev->type);
  868: 			break;
  869: 		default:
  870: 			r = -1;
  871: 			assert(0);
  872: 		}
  873: 		if (r == -1) {
  874: 			alogf(LOG_ERR,
  875: 			    "can't re-register recurring event %p: %m", ev);
  876: 		}
  877: 	}
  878: 
  879: 	/* Execute the handler */
  880: 	(*ev->handler)(ev->arg);
  881: 
  882: done:;
  883: 	/* Release user mutex and event reference */
  884: 	pthread_cleanup_pop(1);
  885: 	return (NULL);
  886: }
  887: 
  888: /*
  889:  * Clean up for pevent_ctx_execute()
  890:  */
  891: static void
  892: pevent_ctx_execute_cleanup(void *arg)
  893: {
  894: 	struct pevent *const ev = arg;
  895: 
  896: 	DBG(PEVENT, "ev %p cleanup", ev);
  897: 	assert(ev->magic == PEVENT_MAGIC);
  898: 	if ((ev->flags & PEVENT_GOT_MUTEX) != 0)
  899: 		MUTEX_UNLOCK(ev->mutex, ev->mutex_count);
  900: 	_pevent_unref(ev);
  901: }
  902: 
  903: /*
  904:  * Wakeup event thread because the event list has changed.
  905:  *
  906:  * This assumes the mutex is locked.
  907:  */
  908: static void
  909: pevent_ctx_notify(struct pevent_ctx *ctx)
  910: {
  911: 	DBG(PEVENT, "ctx %p being notified", ctx);
  912: 	if (!ctx->notified) {
  913: 		(void)write(ctx->pipe[1], &pevent_byte, 1);
  914: 		ctx->notified = 1;
  915: 	}
  916: }
  917: 
  918: /*
  919:  * Cancel an event (make it so that it never gets triggered) and
  920:  * remove the user reference to it.
  921:  *
  922:  * This assumes the lock is held.
  923:  */
  924: static void
  925: pevent_cancel(struct pevent *ev)
  926: {
  927: 	DBG(PEVENT, "canceling ev %p", ev);
  928: 	assert((ev->flags & PEVENT_CANCELED) == 0);
  929: 	ev->flags |= PEVENT_CANCELED;
  930: 	*ev->peventp = NULL;
  931: 	_pevent_unref(ev);
  932: }
  933: 
  934: /*
  935:  * Returns true if the event has been canceled.
  936:  */
  937: int
  938: _pevent_canceled(struct pevent *ev)
  939: {
  940: 	assert(ev->magic == PEVENT_MAGIC);
  941: 	return ((ev->flags & PEVENT_CANCELED) != 0);
  942: }
  943: 
  944: /*
  945:  * Drop a reference on an event.
  946:  */
  947: void
  948: _pevent_unref(struct pevent *ev)
  949: {
  950: 	struct pevent_ctx *const ctx = ev->ctx;
  951: 
  952: 	DBG(PEVENT, "ev %p refs %d -> %d", ev, ev->refs, ev->refs - 1);
  953: 	assert(ev->refs > 0);
  954: 	assert(ev->magic == PEVENT_MAGIC);
  955: 	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
  956: 	if (--ev->refs > 0) {
  957: 		MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  958: 		return;
  959: 	}
  960: 	assert((ev->flags & PEVENT_ENQUEUED) == 0);
  961: 	ev->magic = ~0;				/* invalidate magic number */
  962: 	DBG(PEVENT, "freeing ev %p", ev);
  963: 	FREE(ctx->mtype, ev);
  964: 	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  965: }
  966: 
  967: /*
  968:  * Release context lock and drop a context reference.
  969:  *
  970:  * This assumes the mutex is locked. Upon return it will be unlocked.
  971:  */
  972: static void
  973: pevent_ctx_unref(struct pevent_ctx *ctx)
  974: {
  975: 	DBG(PEVENT, "ctx %p refs %d -> %d", ctx, ctx->refs, ctx->refs - 1);
  976: 	assert(ctx->refs > 0);
  977: 	assert(ctx->magic == PEVENT_CTX_MAGIC);
  978: 	if (--ctx->refs > 0) {
  979: 		MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  980: 		return;
  981: 	}
  982: 	assert(TAILQ_EMPTY(&ctx->events));
  983: 	assert(ctx->nevents == 0);
  984: 	assert(ctx->thread == 0);
  985: 	(void)close(ctx->pipe[0]);
  986: 	(void)close(ctx->pipe[1]);
  987: 	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
  988: 	pthread_mutex_destroy(&ctx->mutex);
  989: 	if (ctx->has_attr)
  990: 		pthread_attr_destroy(&ctx->attr);
  991: 	ctx->magic = ~0;			/* invalidate magic number */
  992: 	DBG(PEVENT, "freeing ctx %p", ctx);
  993: 	FREE(ctx->mtype, ctx->fds);
  994: 	FREE(ctx->mtype, ctx);
  995: }
  996: 

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>