File: embedaddon/libpdel/util/pevent.c (ELWIX - Embedded LightWeight unIX)
Revision 1.1.1.1 (vendor branch), Tue Feb 21 23:25:53 2012 UTC, by misho
Branches: libpdel, MAIN; CVS tags: v0_5_3, HEAD


/*
 * Copyright (c) 2001-2002 Packet Design, LLC.
 * All rights reserved.
 * 
 * Subject to the following obligations and disclaimer of warranty,
 * use and redistribution of this software, in source or object code
 * forms, with or without modifications are expressly permitted by
 * Packet Design; provided, however, that:
 * 
 *    (i)  Any and all reproductions of the source or object code
 *         must include the copyright notice above and the following
 *         disclaimer of warranties; and
 *    (ii) No rights are granted, in any manner or form, to use
 *         Packet Design trademarks, including the mark "PACKET DESIGN"
 *         on advertising, endorsements, or otherwise except as such
 *         appears in the above copyright notice or in the software.
 * 
 * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND
 * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO
 * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING
 * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED
 * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
 * OR NON-INFRINGEMENT.  PACKET DESIGN DOES NOT WARRANT, GUARANTEE,
 * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS
 * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY,
 * RELIABILITY OR OTHERWISE.  IN NO EVENT SHALL PACKET DESIGN BE
 * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE
 * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL
 * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF
 * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
 * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF
 * THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Archie Cobbs <archie@freebsd.org>
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/queue.h>
#include <sys/time.h>

#include <netinet/in.h>

#include <stdarg.h>
#include <stdio.h>
#include <syslog.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <unistd.h>
#include <poll.h>
#include <sched.h>
#include <pthread.h>

#include "structs/structs.h"
#include "structs/type/array.h"
#include "util/typed_mem.h"
#include "util/mesg_port.h"
#include "util/pevent.h"
#include "sys/alog.h"

#include "internal.h"
#include "debug/debug.h"

#define PEVENT_MAGIC		0x31d7699b
#define PEVENT_CTX_MAGIC	0x7842f901

/* Private flags */
#define PEVENT_OCCURRED		0x8000		/* event has occurred */
#define PEVENT_CANCELED		0x4000		/* event canceled or done */
#define PEVENT_ENQUEUED		0x2000		/* in the ctx->events queue */
#define PEVENT_GOT_MUTEX	0x1000		/* user mutex acquired */

#define PEVENT_USER_FLAGS	(PEVENT_RECURRING | PEVENT_OWN_THREAD)

#define READABLE_EVENTS		(POLLIN | POLLRDNORM | POLLERR \
				    | POLLHUP | POLLNVAL)
#define WRITABLE_EVENTS		(POLLOUT | POLLWRNORM | POLLWRBAND \
				    | POLLERR | POLLHUP | POLLNVAL)

/* Event context */
struct pevent_ctx {
	u_int32_t		magic;		/* magic number */
	pthread_mutex_t		mutex;		/* mutex for context */
#if PDEL_DEBUG
	int			mutex_count;	/* mutex count */
#endif
	pthread_attr_t		attr;		/* event thread attributes */
	pthread_t		thread;		/* event thread */
	TAILQ_HEAD(, pevent)	events;		/* pending event list */
	u_int			nevents;	/* length of 'events' list */
	u_int			nrwevents;	/* number of read/write events */
	struct pollfd		*fds;		/* poll(2) fds array */
	u_int			fds_alloc;	/* allocated size of 'fds' */
	const char		*mtype;		/* typed_mem(3) memory type */
	char			mtype_buf[TYPED_MEM_TYPELEN];
	int			pipe[2];	/* event thread notify pipe */
	u_char			notified;	/* data in the pipe */
	u_char			has_attr;	/* 'attr' is valid */
	u_int			refs;		/* references to this context */
};

/* Event object */
struct pevent {
	u_int32_t		magic;		/* magic number */
	struct pevent_ctx	*ctx;		/* pointer to event context */
	struct pevent		**peventp;	/* user handle to this event */
	pevent_handler_t	*handler;	/* event handler function */
	void			*arg;		/* event handler function arg */
	int			flags;		/* event flags */
	int			poll_idx;	/* index in poll(2) fds array */
	pthread_mutex_t		*mutex;		/* user mutex, if any */
#if PDEL_DEBUG
	int			mutex_count;	/* mutex count */
#endif
	enum pevent_type	type;		/* type of this event */
	struct timeval		when;		/* expiration for time events */
	u_int			refs;		/* references to this event */
	union {
		int		fd;		/* file descriptor */
		int		millis;		/* time delay */
		struct mesg_port *port;		/* mesg_port */
	}			u;
	TAILQ_ENTRY(pevent)	next;		/* next in ctx->events */
};

/* Macros */
#define PEVENT_ENQUEUE(ctx, ev)						\
	do {								\
		assert(((ev)->flags & PEVENT_ENQUEUED) == 0);		\
		TAILQ_INSERT_TAIL(&(ctx)->events, (ev), next);		\
		(ev)->flags |= PEVENT_ENQUEUED;				\
		(ctx)->nevents++;					\
		if ((ev)->type == PEVENT_READ				\
		    || (ev)->type == PEVENT_WRITE)			\
			(ctx)->nrwevents++;				\
		DBG(PEVENT, "ev %p refs %d -> %d (enqueued)",		\
		    (ev), (ev)->refs, (ev)->refs + 1);			\
		(ev)->refs++;						\
	} while (0)

#define PEVENT_DEQUEUE(ctx, ev)						\
	do {								\
		assert(((ev)->flags & PEVENT_ENQUEUED) != 0);		\
		TAILQ_REMOVE(&(ctx)->events, (ev), next);		\
		(ctx)->nevents--;					\
		if ((ev)->type == PEVENT_READ				\
		    || (ev)->type == PEVENT_WRITE)			\
			(ctx)->nrwevents--;				\
		(ev)->flags &= ~PEVENT_ENQUEUED;			\
		_pevent_unref(ev);					\
	} while (0)

#define PEVENT_SET_OCCURRED(ctx, ev)					\
	do {								\
		(ev)->flags |= PEVENT_OCCURRED;				\
		if ((ev) != TAILQ_FIRST(&(ctx)->events)) {		\
			TAILQ_REMOVE(&(ctx)->events, (ev), next);	\
			TAILQ_INSERT_HEAD(&(ctx)->events, (ev), next);	\
		}							\
	} while (0)
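
/*
 * Note: PEVENT_SET_OCCURRED() moves the event to the head of the list,
 * so the service loop in pevent_ctx_main() can find every event that
 * has occurred by repeatedly examining the front of the queue.
 */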

/* Internal functions */
static void	pevent_ctx_service(struct pevent *ev);
static void	*pevent_ctx_main(void *arg);
static void	pevent_ctx_main_cleanup(void *arg);
static void	*pevent_ctx_execute(void *arg);
static void	pevent_ctx_execute_cleanup(void *arg);
static void	pevent_ctx_notify(struct pevent_ctx *ctx);
static void	pevent_ctx_unref(struct pevent_ctx *ctx);
static void	pevent_cancel(struct pevent *ev);

/* Internal variables */
static char	pevent_byte;

/*
 * Create a new event context.
 */
struct pevent_ctx *
pevent_ctx_create(const char *mtype, const pthread_attr_t *attr)
{
	pthread_mutexattr_t mutexattr;
	struct pevent_ctx *ctx;
	int got_mutexattr = 0;
	int got_mutex = 0;

	/* Create context object */
	if ((ctx = MALLOC(mtype, sizeof(*ctx))) == NULL)
		return (NULL);
	memset(ctx, 0, sizeof(*ctx));
	if (mtype != NULL) {
		strlcpy(ctx->mtype_buf, mtype, sizeof(ctx->mtype_buf));
		ctx->mtype = ctx->mtype_buf;
	}
	TAILQ_INIT(&ctx->events);

	/* Copy thread attributes */
	if (attr != NULL) {
		struct sched_param param;
		int value;

		if ((errno = pthread_attr_init(&ctx->attr)) != 0)
			goto fail;
		ctx->has_attr = 1;
		pthread_attr_getinheritsched(attr, &value);
		pthread_attr_setinheritsched(&ctx->attr, value);
		pthread_attr_getschedparam(attr, &param);
		pthread_attr_setschedparam(&ctx->attr, &param);
		pthread_attr_getschedpolicy(attr, &value);
		pthread_attr_setschedpolicy(&ctx->attr, value);
		pthread_attr_getscope(attr, &value);
		pthread_attr_setscope(&ctx->attr, value);
	}

	/* Initialize mutex */
	if ((errno = pthread_mutexattr_init(&mutexattr)) != 0)
		goto fail;
	got_mutexattr = 1;
	pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
	if ((errno = pthread_mutex_init(&ctx->mutex, &mutexattr)) != 0)
		goto fail;
	got_mutex = 1;

	/* Initialize notify pipe */
	if (pipe(ctx->pipe) == -1)
		goto fail;

	/* Finish up */
	pthread_mutexattr_destroy(&mutexattr);
	ctx->magic = PEVENT_CTX_MAGIC;
	ctx->refs = 1;
	DBG(PEVENT, "created ctx %p", ctx);
	return (ctx);

fail:
	/* Clean up after failure */
	if (got_mutex)
		pthread_mutex_destroy(&ctx->mutex);
	if (got_mutexattr)
		pthread_mutexattr_destroy(&mutexattr);
	if (ctx->has_attr)
		pthread_attr_destroy(&ctx->attr);
	FREE(mtype, ctx);
	return (NULL);
}
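
/*
 * Example usage (an illustrative sketch only; "app_event_ctx" and the
 * typed_mem(3) type string are assumptions, not part of this library):
 *
 *	struct pevent_ctx *app_event_ctx;
 *
 *	if ((app_event_ctx = pevent_ctx_create("myapp.pevent", NULL)) == NULL)
 *		err(1, "pevent_ctx_create");
 *	...
 *	pevent_ctx_destroy(&app_event_ctx);	// unregisters any leftover events
 */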

/*
 * Destroy an event context.
 *
 * All events are unregistered.
 */
void
pevent_ctx_destroy(struct pevent_ctx **ctxp)
{
	struct pevent_ctx *const ctx = *ctxp;

	/* Allow NULL context */
	if (ctx == NULL)
		return;
	*ctxp = NULL;
	DBG(PEVENT, "destroying ctx %p", ctx);

	/* Lock context */
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);

	/* Sanity check */
	assert(ctx->magic == PEVENT_CTX_MAGIC);

	/* Unregister all events */
	while (!TAILQ_EMPTY(&ctx->events))
		pevent_unregister(TAILQ_FIRST(&ctx->events)->peventp);

	/* Decrement context reference count and unlock context */
	pevent_ctx_unref(ctx);
}

/*
 * Return the number of registered events.
 */
u_int
pevent_ctx_count(struct pevent_ctx *ctx)
{
	u_int nevents;

	assert(ctx->magic == PEVENT_CTX_MAGIC);
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
	nevents = ctx->nevents;
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
	return (nevents);
}

/*
 * Create a new schedule item.
 */
int
pevent_register(struct pevent_ctx *ctx, struct pevent **peventp, int flags,
	pthread_mutex_t *mutex, pevent_handler_t *handler, void *arg,
	enum pevent_type type, ...)
{
	struct pevent *ev;
	va_list args;

	/* Sanity check */
	if (*peventp != NULL) {
		errno = EBUSY;
		return (-1);
	}

	/* Check flags */
	if ((flags & ~PEVENT_USER_FLAGS) != 0) {
		errno = EINVAL;
		return (-1);
	}

	/* Create new event */
	if ((ev = MALLOC(ctx->mtype, sizeof(*ev))) == NULL)
		return (-1);
	memset(ev, 0, sizeof(*ev));
	ev->magic = PEVENT_MAGIC;
	ev->ctx = ctx;
	ev->handler = handler;
	ev->arg = arg;
	ev->flags = flags;
	ev->poll_idx = -1;
	ev->mutex = mutex;
	ev->type = type;
	ev->refs = 1;				/* the caller's reference */
	DBG(PEVENT, "new ev %p in ctx %p", ev, ctx);

	/* Add type-specific info */
	switch (type) {
	case PEVENT_READ:
	case PEVENT_WRITE:
		va_start(args, type);
		ev->u.fd = va_arg(args, int);
		va_end(args);
		break;
	case PEVENT_TIME:
		va_start(args, type);
		ev->u.millis = va_arg(args, int);
		va_end(args);
		if (ev->u.millis < 0)
			ev->u.millis = 0;
		gettimeofday(&ev->when, NULL);
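		/*
		 * Convert the relative delay into an absolute deadline,
		 * e.g. 2500 ms adds 2 s and 500000 us, carrying into
		 * tv_sec if tv_usec overflows one second.
		 */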
		ev->when.tv_sec += ev->u.millis / 1000;
		ev->when.tv_usec += (ev->u.millis % 1000) * 1000;
		if (ev->when.tv_usec >= 1000000) {
			ev->when.tv_sec++;
			ev->when.tv_usec -= 1000000;
		}
		break;
	case PEVENT_MESG_PORT:
		va_start(args, type);
		ev->u.port = va_arg(args, struct mesg_port *);
		va_end(args);
		if (ev->u.port == NULL)
			goto invalid;
		break;
	case PEVENT_USER:
		break;
	default:
	invalid:
		errno = EINVAL;
		_pevent_unref(ev);
		return (-1);
	}

	/* Lock context */
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);

	/* Link to related object (if appropriate) */
	switch (ev->type) {
	case PEVENT_MESG_PORT:
		if (_mesg_port_set_event(ev->u.port, ev) == -1) {
			MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
			_pevent_unref(ev);
			return (-1);
		}
		DBG(PEVENT, "ev %p refs %d -> %d (mesg port)",
		    ev, ev->refs, ev->refs + 1);
		ev->refs++;			/* the mesg_port's reference */
		break;
	default:
		break;
	}

	/* Start or notify event thread */
	if (ctx->thread == 0) {
		if ((errno = pthread_create(&ctx->thread,
		    ctx->has_attr ? &ctx->attr : NULL,
		    pevent_ctx_main, ctx)) != 0) {
			ctx->thread = 0;
			alogf(LOG_ERR, "%s: %m", "pthread_create");
			ev->flags |= PEVENT_CANCELED;
			MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
			_pevent_unref(ev);
			return (-1);
		}
		pthread_detach(ctx->thread);
		DBG(PEVENT, "created tid %p for ctx %p, refs -> %d",
		    ctx->thread, ctx, ctx->refs + 1);
		ctx->refs++;			/* add reference for thread */
	} else
		pevent_ctx_notify(ctx);

	/* Caller gets the one reference */
	ev->peventp = peventp;
	*peventp = ev;

	/* Add event to the pending event list */
	PEVENT_ENQUEUE(ctx, ev);

	/* Unlock context */
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);

	return (0);
}
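
/*
 * Example usage (a hedged sketch; "my_ctx", "my_timer" and "my_timeout"
 * are illustrative names, not part of the API): register a one-shot
 * 1000 ms timer.  The handle must be NULL before the call, and for a
 * non-recurring event it is reset to NULL again before the handler
 * runs (see pevent_cancel()).
 *
 *	static struct pevent *my_timer;
 *
 *	static void
 *	my_timeout(void *arg)
 *	{
 *		// my_timer is already NULL again at this point
 *	}
 *
 *	if (pevent_register(my_ctx, &my_timer, 0, NULL,
 *	    my_timeout, NULL, PEVENT_TIME, 1000) == -1)
 *		warn("pevent_register");
 */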

/*
 * Unregister an event.
 */
void
pevent_unregister(struct pevent **peventp)
{
	struct pevent *const ev = *peventp;
	struct pevent_ctx *ctx;

	/* Allow NULL event */
	if (ev == NULL)
		return;
	ctx = ev->ctx;
	DBG(PEVENT, "unregister ev %p in ctx %p", ev, ctx);

	/* Sanity checks */
	assert(ev->magic == PEVENT_MAGIC);
	assert(ctx->magic == PEVENT_CTX_MAGIC);
	assert(ev->peventp == peventp);

	/* Lock context */
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
	DBG(PEVENT, "unregister ev %p in ctx %p: lock", ev, ctx);

	/* Dequeue event if queued */
	if ((ev->flags & PEVENT_ENQUEUED) != 0) {
		PEVENT_DEQUEUE(ctx, ev);
		pevent_ctx_notify(ctx);			/* wakeup thread */
	}

	/* Cancel the event */
	pevent_cancel(ev);

	/* Unlock context */
	DBG(PEVENT, "unregister ev %p in ctx %p: unlock", ev, ctx);
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
}

/*
 * Trigger an event.
 */
void
pevent_trigger(struct pevent *ev)
{
	struct pevent_ctx *ctx;

	/* Sanity check */
	if (ev == NULL)
		return;

	/* Get context */
	ctx = ev->ctx;
	DBG(PEVENT, "trigger ev %p in ctx %p", ev, ctx);
	assert(ev->magic == PEVENT_MAGIC);

	/* Lock context */
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);

	/* If event already canceled, bail */
	if ((ev->flags & PEVENT_CANCELED) != 0)
		goto done;

	/* Mark event as having occurred */
	PEVENT_SET_OCCURRED(ctx, ev);

	/* Wake up thread if event is still in the queue */
	if ((ev->flags & PEVENT_ENQUEUED) != 0)
		pevent_ctx_notify(ctx);

done:
	/* Unlock context */
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
}
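
/*
 * Example usage (sketch; names are assumed): a PEVENT_USER event never
 * fires on its own; some other thread marks it occurred by calling
 * pevent_trigger():
 *
 *	static struct pevent *wakeup_event;
 *	...
 *	pevent_register(ctx, &wakeup_event, 0, NULL,
 *	    wakeup_handler, NULL, PEVENT_USER);
 *	...
 *	pevent_trigger(wakeup_event);		// e.g. from another thread
 */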

/*
 * Get event info.
 */
int
pevent_get_info(struct pevent *ev, struct pevent_info *info)
{
	if (ev == NULL) {
		errno = ENXIO;
		return (-1);
	}
	assert(ev->magic == PEVENT_MAGIC);
	memset(info, 0, sizeof(*info));
	info->type = ev->type;
	switch (ev->type) {
	case PEVENT_READ:
	case PEVENT_WRITE:
		info->u.fd = ev->u.fd;
		break;
	case PEVENT_TIME:
		info->u.millis = ev->u.millis;
		break;
	case PEVENT_MESG_PORT:
		info->u.port = ev->u.port;
		break;
	case PEVENT_USER:
		break;
	default:
		errno = EINVAL;
		return (-1);
	}
	return (0);
}
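
/*
 * Example usage (sketch; "read_event" is assumed to be a registered
 * PEVENT_READ event): recover the file descriptor behind an event.
 *
 *	struct pevent_info info;
 *
 *	if (pevent_get_info(read_event, &info) == 0
 *	    && info.type == PEVENT_READ)
 *		printf("waiting on fd %d\n", info.u.fd);
 */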

/*
 * Event thread entry point.
 */
static void *
pevent_ctx_main(void *arg)
{
	struct pevent_ctx *const ctx = arg;
	struct timeval now;
	struct pollfd *fd;
	struct pevent *ev;
	struct pevent *next_ev;
	int poll_idx;
	int timeout;
	int r;

	/* Push cleanup hook */
	pthread_cleanup_push(pevent_ctx_main_cleanup, ctx);
	assert(ctx->magic == PEVENT_CTX_MAGIC);

	/* Lock context */
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);

	/* Safety belts */
	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

	/* Get current time */
	gettimeofday(&now, NULL);
	DBG(PEVENT, "ctx %p thread starting", ctx);
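
	/*
	 * Main loop: rebuild the poll(2) descriptor array, compute the
	 * nearest timer deadline, sleep in poll(), then mark and service
	 * whatever events have occurred.  The loop exits once the last
	 * event has been unregistered.
	 */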

loop:
	/* Are there any events left? */
	if (TAILQ_EMPTY(&ctx->events)) {
		DBG(PEVENT, "ctx %p thread exiting", ctx);
		goto done;
	}

	/* Make sure ctx->fds array is long enough */
	if (ctx->fds_alloc < 1 + ctx->nrwevents) {
		const u_int new_alloc = roundup(1 + ctx->nrwevents, 16);
		void *mem;

		if ((mem = REALLOC(ctx->mtype, ctx->fds,
		    new_alloc * sizeof(*ctx->fds))) == NULL)
			alogf(LOG_ERR, "%s: %m", "realloc");
		else {
			ctx->fds = mem;
			ctx->fds_alloc = new_alloc;
		}
	}

	/* If we were intentionally woken up, read the wakeup byte */
	if (ctx->notified) {
		DBG(PEVENT, "ctx %p thread was notified", ctx);
		(void)read(ctx->pipe[0], &pevent_byte, 1);
		ctx->notified = 0;
	}

	/* Add event for the notify pipe */
	poll_idx = 0;
	if (ctx->fds_alloc > 0) {
		fd = &ctx->fds[poll_idx++];
		memset(fd, 0, sizeof(*fd));
		fd->fd = ctx->pipe[0];
		fd->events = POLLRDNORM;
	}

	/* Fill in rest of poll() array */
	timeout = INFTIM;
	TAILQ_FOREACH(ev, &ctx->events, next) {
		switch (ev->type) {
		case PEVENT_READ:
		case PEVENT_WRITE:
		    {
			if (poll_idx >= ctx->fds_alloc) {
				ev->poll_idx = -1;
				break;
			}
			ev->poll_idx = poll_idx++;
			fd = &ctx->fds[ev->poll_idx];
			memset(fd, 0, sizeof(*fd));
			fd->fd = ev->u.fd;
			fd->events = (ev->type == PEVENT_READ) ?
			    POLLRDNORM : POLLWRNORM;
			break;
		    }
		case PEVENT_TIME:
		    {
			struct timeval remain;
			int millis;

			/* Compute milliseconds until event */
			if (timercmp(&ev->when, &now, <=))
				millis = 0;
			else {
				timersub(&ev->when, &now, &remain);
				millis = remain.tv_sec * 1000;
				millis += remain.tv_usec / 1000;
			}

			/* Remember the minimum delay */
			if (timeout == INFTIM || millis < timeout)
				timeout = millis;
			break;
		    }
		default:
			break;
		}
	}

	/* Mark non-poll() events that have occurred */
	TAILQ_FOREACH(ev, &ctx->events, next) {
		assert(ev->magic == PEVENT_MAGIC);
		switch (ev->type) {
		case PEVENT_MESG_PORT:
			if (mesg_port_qlen(ev->u.port) > 0)
				PEVENT_SET_OCCURRED(ctx, ev);
			break;
		default:
			break;
		}
		if ((ev->flags & PEVENT_OCCURRED) != 0)
			timeout = 0;			/* don't delay */
	}

#if PDEL_DEBUG
	/* Debugging */
	DBG(PEVENT, "ctx %p thread event list:", ctx);
	TAILQ_FOREACH(ev, &ctx->events, next)
		DBG(PEVENT, "\tev %p", ev);
#endif

	/* Wait for something to happen */
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
	DBG(PEVENT, "ctx %p thread sleeping", ctx);
	r = poll(ctx->fds, poll_idx, timeout);
	DBG(PEVENT, "ctx %p thread woke up", ctx);
	assert(ctx->magic == PEVENT_CTX_MAGIC);
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);

	/* Check for errors */
	if (r == -1 && errno != EINTR) {
		alogf(LOG_CRIT, "%s: %m", "poll");
		assert(0);
	}

	/* Update current time */
	gettimeofday(&now, NULL);

	/* Mark poll() events that have occurred */
	for (ev = TAILQ_FIRST((&ctx->events)); ev != NULL; ev = next_ev) {
		next_ev = TAILQ_NEXT(ev, next);
		assert(ev->magic == PEVENT_MAGIC);
		switch (ev->type) {
		case PEVENT_READ:
		case PEVENT_WRITE:
			if (ev->poll_idx == -1)
				break;
			fd = &ctx->fds[ev->poll_idx];
			if ((fd->revents & ((ev->type == PEVENT_READ) ?
			    READABLE_EVENTS : WRITABLE_EVENTS)) != 0)
				PEVENT_SET_OCCURRED(ctx, ev);
			break;
		case PEVENT_TIME:
			if (timercmp(&ev->when, &now, <=))
				PEVENT_SET_OCCURRED(ctx, ev);
			break;
		default:
			break;
		}
	}

	/* Service all events that are marked as having occurred */
	while (1) {

		/* Find next event that needs service */
		ev = TAILQ_FIRST(&ctx->events);
		if (ev == NULL || (ev->flags & PEVENT_OCCURRED) == 0)
			break;
		DBG(PEVENT, "ctx %p thread servicing ev %p", ctx, ev);

		/* Detach event and service it while keeping a reference */
		DBG(PEVENT, "ev %p refs %d -> %d (for servicing)",
		    ev, ev->refs, ev->refs + 1);
		ev->refs++;
		PEVENT_DEQUEUE(ctx, ev);
		pevent_ctx_service(ev);
	}

	/* Spin again */
	DBG(PEVENT, "ctx %p thread spin again", ctx);
	goto loop;

done:;
	/* Done */
	pthread_cleanup_pop(1);
	return (NULL);
}

/*
 * Cleanup routine for event thread.
 */
static void
pevent_ctx_main_cleanup(void *arg)
{
	struct pevent_ctx *const ctx = arg;

	DBG(PEVENT, "ctx %p thread cleanup", ctx);
	ctx->thread = 0;
	pevent_ctx_unref(ctx);		/* release thread's reference */
}

/*
 * Service an event.
 *
 * This is called with an extra reference on the event which is
 * removed by pevent_ctx_execute().
 */
static void
pevent_ctx_service(struct pevent *ev)
{
	struct pevent_ctx *const ctx = ev->ctx;
	pthread_t tid;

	/* Does handler want to run in its own thread? */
	if ((ev->flags & PEVENT_OWN_THREAD) != 0) {
		if ((errno = pthread_create(&tid,
		    NULL, pevent_ctx_execute, ev)) != 0) {
			alogf(LOG_ERR,
			    "can't spawn thread for event %p: %m", ev);
			pevent_cancel(ev);	/* removes the reference */
			return;
		}
		pthread_detach(tid);
		DBG(PEVENT, "dispatched thread %p for ev %p", tid, ev);
		return;
	}

	/* Execute handler with context unlocked */
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
	pevent_ctx_execute(ev);
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
}

/*
 * Execute an event handler.
 *
 * Grab the user mutex, if any, check if event was canceled,
 * execute handler, release mutex.
 *
 * This is called with an extra reference on the event, which is
 * removed by this function.
 *
 * This should NOT be called with a lock on the event context.
 */
static void *
pevent_ctx_execute(void *arg)
{
	struct pevent *const ev = arg;
	struct pevent_ctx *const ctx = ev->ctx;
	int r;

	/* Sanity check */
	assert(ev->magic == PEVENT_MAGIC);

	/* Register cleanup hook */
	DBG(PEVENT, "ev %p being executed", ev);
	pthread_cleanup_push(pevent_ctx_execute_cleanup, ev);

try_again:
	/* Acquire context mutex */
	DBG(PEVENT, "locking ctx %p for ev %p", ctx, ev);
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);

	/* Check for cancellation to close the cancel/timeout race */
	if ((ev->flags & PEVENT_CANCELED) != 0) {
		DBG(PEVENT, "ev %p was canceled", ev);
		MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
		goto done;
	}

	/*
	 * Acquire user mutex but avoid two potential problems:
	 *
	 * - Race window: another thread could call pevent_unregister()
	 *   and destroy the user mutex just before we try to
	 *   acquire it; we must hold the context lock to prevent this.
	 *
	 * - Lock reversal deadlock: other threads acquire the
	 *   user lock before the context lock, while we do the reverse.
	 *
	 * Once we've acquired both locks, we drop the context lock.
	 */
	if (ev->mutex != NULL) {
		MUTEX_TRYLOCK(ev->mutex, ev->mutex_count);
		if (errno != 0) {
			if (errno != EBUSY) {
				alogf(LOG_ERR,
				    "can't lock mutex for event %p: %m", ev);
				pevent_cancel(ev);
				MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
				goto done;
			}
			MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
			sched_yield();
			goto try_again;
		}
		ev->flags |= PEVENT_GOT_MUTEX;
	}

	/* Remove user's event reference (we still have one though) */
	pevent_cancel(ev);

	/* Release context mutex */
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);

	/* Reschedule a new event if recurring */
	if ((ev->flags & PEVENT_RECURRING) != 0) {
		switch (ev->type) {
		case PEVENT_READ:
		case PEVENT_WRITE:
			r = pevent_register(ev->ctx, ev->peventp,
			    (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
			    ev->handler, ev->arg, ev->type, ev->u.fd);
			break;
		case PEVENT_TIME:
			r = pevent_register(ev->ctx, ev->peventp,
			    (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
			    ev->handler, ev->arg, ev->type, ev->u.millis);
			break;
		case PEVENT_MESG_PORT:
			r = pevent_register(ev->ctx, ev->peventp,
			    (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
			    ev->handler, ev->arg, ev->type, ev->u.port);
			break;
		case PEVENT_USER:
			r = pevent_register(ev->ctx, ev->peventp,
			    (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
			    ev->handler, ev->arg, ev->type);
			break;
		default:
			r = -1;
			assert(0);
		}
		if (r == -1) {
			alogf(LOG_ERR,
			    "can't re-register recurring event %p: %m", ev);
		}
	}
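
	/*
	 * Note: for recurring events the re-registration above has already
	 * run, so *peventp normally points at the fresh event by the time
	 * the handler below is invoked (it stays NULL if re-registration
	 * failed).
	 */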

	/* Execute the handler */
	(*ev->handler)(ev->arg);

done:;
	/* Release user mutex and event reference */
	pthread_cleanup_pop(1);
	return (NULL);
}

/*
 * Clean up for pevent_ctx_execute()
 */
static void
pevent_ctx_execute_cleanup(void *arg)
{
	struct pevent *const ev = arg;

	DBG(PEVENT, "ev %p cleanup", ev);
	assert(ev->magic == PEVENT_MAGIC);
	if ((ev->flags & PEVENT_GOT_MUTEX) != 0)
		MUTEX_UNLOCK(ev->mutex, ev->mutex_count);
	_pevent_unref(ev);
}

/*
 * Wakeup event thread because the event list has changed.
 *
 * This assumes the mutex is locked.
 */
static void
pevent_ctx_notify(struct pevent_ctx *ctx)
{
	DBG(PEVENT, "ctx %p being notified", ctx);
	if (!ctx->notified) {
		(void)write(ctx->pipe[1], &pevent_byte, 1);
		ctx->notified = 1;
	}
}

/*
 * Cancel an event (make it so that it never gets triggered) and
 * remove the user reference to it.
 *
 * This assumes the lock is held.
 */
static void
pevent_cancel(struct pevent *ev)
{
	DBG(PEVENT, "canceling ev %p", ev);
	assert((ev->flags & PEVENT_CANCELED) == 0);
	ev->flags |= PEVENT_CANCELED;
	*ev->peventp = NULL;
	_pevent_unref(ev);
}

/*
 * Returns true if the event has been canceled.
 */
int
_pevent_canceled(struct pevent *ev)
{
	assert(ev->magic == PEVENT_MAGIC);
	return ((ev->flags & PEVENT_CANCELED) != 0);
}

/*
 * Drop a reference on an event.
 */
void
_pevent_unref(struct pevent *ev)
{
	struct pevent_ctx *const ctx = ev->ctx;

	DBG(PEVENT, "ev %p refs %d -> %d", ev, ev->refs, ev->refs - 1);
	assert(ev->refs > 0);
	assert(ev->magic == PEVENT_MAGIC);
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
	if (--ev->refs > 0) {
		MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
		return;
	}
	assert((ev->flags & PEVENT_ENQUEUED) == 0);
	ev->magic = ~0;				/* invalidate magic number */
	DBG(PEVENT, "freeing ev %p", ev);
	FREE(ctx->mtype, ev);
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
}

/*
 * Release context lock and drop a context reference.
 *
 * This assumes the mutex is locked. Upon return it will be unlocked.
 */
static void
pevent_ctx_unref(struct pevent_ctx *ctx)
{
	DBG(PEVENT, "ctx %p refs %d -> %d", ctx, ctx->refs, ctx->refs - 1);
	assert(ctx->refs > 0);
	assert(ctx->magic == PEVENT_CTX_MAGIC);
	if (--ctx->refs > 0) {
		MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
		return;
	}
	assert(TAILQ_EMPTY(&ctx->events));
	assert(ctx->nevents == 0);
	assert(ctx->thread == 0);
	(void)close(ctx->pipe[0]);
	(void)close(ctx->pipe[1]);
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
	pthread_mutex_destroy(&ctx->mutex);
	if (ctx->has_attr)
		pthread_attr_destroy(&ctx->attr);
	ctx->magic = ~0;			/* invalidate magic number */
	DBG(PEVENT, "freeing ctx %p", ctx);
	FREE(ctx->mtype, ctx->fds);
	FREE(ctx->mtype, ctx);
}

