
/*
 * Copyright (c) 2001-2002 Packet Design, LLC.
 * All rights reserved.
 *
 * Subject to the following obligations and disclaimer of warranty,
 * use and redistribution of this software, in source or object code
 * forms, with or without modifications are expressly permitted by
 * Packet Design; provided, however, that:
 *
 *    (i)  Any and all reproductions of the source or object code
 *         must include the copyright notice above and the following
 *         disclaimer of warranties; and
 *    (ii) No rights are granted, in any manner or form, to use
 *         Packet Design trademarks, including the mark "PACKET DESIGN"
 *         on advertising, endorsements, or otherwise except as such
 *         appears in the above copyright notice or in the software.
 *
 * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND
 * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO
 * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING
 * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED
 * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
 * OR NON-INFRINGEMENT. PACKET DESIGN DOES NOT WARRANT, GUARANTEE,
 * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS
 * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY,
 * RELIABILITY OR OTHERWISE. IN NO EVENT SHALL PACKET DESIGN BE
 * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE
 * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL
 * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF
 * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
 * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF
 * THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Archie Cobbs <archie@freebsd.org>
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/queue.h>
#include <sys/time.h>

#include <netinet/in.h>

#include <stdarg.h>
#include <stdio.h>
#include <syslog.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <unistd.h>
#include <poll.h>
#include <sched.h>
#include <pthread.h>

#include "structs/structs.h"
#include "structs/type/array.h"
#include "util/typed_mem.h"
#include "util/mesg_port.h"
#include "util/pevent.h"
#include "sys/alog.h"

#include "internal.h"
#include "debug/debug.h"

#define PEVENT_MAGIC		0x31d7699b
#define PEVENT_CTX_MAGIC	0x7842f901

/* Private flags */
#define PEVENT_OCCURRED		0x8000		/* event has occurred */
#define PEVENT_CANCELED		0x4000		/* event canceled or done */
#define PEVENT_ENQUEUED		0x2000		/* in the ctx->events queue */
#define PEVENT_GOT_MUTEX	0x1000		/* user mutex acquired */

#define PEVENT_USER_FLAGS	(PEVENT_RECURRING | PEVENT_OWN_THREAD)

#define READABLE_EVENTS		(POLLIN | POLLRDNORM | POLLERR \
				    | POLLHUP | POLLNVAL)
#define WRITABLE_EVENTS		(POLLOUT | POLLWRNORM | POLLWRBAND \
				    | POLLERR | POLLHUP | POLLNVAL)

/* Event context */
struct pevent_ctx {
	u_int32_t		magic;		/* magic number */
	pthread_mutex_t		mutex;		/* mutex for context */
#if PDEL_DEBUG
	int			mutex_count;	/* mutex count */
#endif
	pthread_attr_t		attr;		/* event thread attributes */
	pthread_t		thread;		/* event thread */
	TAILQ_HEAD(, pevent)	events;		/* pending event list */
	u_int			nevents;	/* length of 'events' list */
	u_int			nrwevents;	/* number read/write events */
	struct pollfd		*fds;		/* poll(2) fds array */
	u_int			fds_alloc;	/* allocated size of 'fds' */
	const char		*mtype;		/* typed_mem(3) memory type */
	char			mtype_buf[TYPED_MEM_TYPELEN];
	int			pipe[2];	/* event thread notify pipe */
	u_char			notified;	/* data in the pipe */
	u_char			has_attr;	/* 'attr' is valid */
	u_int			refs;		/* references to this context */
};

/* Event object */
struct pevent {
	u_int32_t		magic;		/* magic number */
	struct pevent_ctx	*ctx;		/* pointer to event context */
	struct pevent		**peventp;	/* user handle to this event */
	pevent_handler_t	*handler;	/* event handler function */
	void			*arg;		/* event handler function arg */
	int			flags;		/* event flags */
	int			poll_idx;	/* index in poll(2) fds array */
	pthread_mutex_t		*mutex;		/* user mutex, if any */
#if PDEL_DEBUG
	int			mutex_count;	/* mutex count */
#endif
	enum pevent_type	type;		/* type of this event */
	struct timeval		when;		/* expiration for time events */
	u_int			refs;		/* references to this event */
	union {
		int			fd;	/* file descriptor */
		int			millis;	/* time delay */
		struct mesg_port	*port;	/* mesg_port */
	}			u;
	TAILQ_ENTRY(pevent)	next;		/* next in ctx->events */
};

/* Macros */
#define PEVENT_ENQUEUE(ctx, ev)						\
	do {								\
		assert(((ev)->flags & PEVENT_ENQUEUED) == 0);		\
		TAILQ_INSERT_TAIL(&(ctx)->events, (ev), next);		\
		(ev)->flags |= PEVENT_ENQUEUED;				\
		(ctx)->nevents++;					\
		if ((ev)->type == PEVENT_READ				\
		    || (ev)->type == PEVENT_WRITE)			\
			(ctx)->nrwevents++;				\
		DBG(PEVENT, "ev %p refs %d -> %d (enqueued)",		\
		    (ev), (ev)->refs, (ev)->refs + 1);			\
		(ev)->refs++;						\
	} while (0)

#define PEVENT_DEQUEUE(ctx, ev)						\
	do {								\
		assert(((ev)->flags & PEVENT_ENQUEUED) != 0);		\
		TAILQ_REMOVE(&(ctx)->events, (ev), next);		\
		(ctx)->nevents--;					\
		if ((ev)->type == PEVENT_READ				\
		    || (ev)->type == PEVENT_WRITE)			\
			(ctx)->nrwevents--;				\
		(ev)->flags &= ~PEVENT_ENQUEUED;			\
		_pevent_unref(ev);					\
	} while (0)

#define PEVENT_SET_OCCURRED(ctx, ev)					\
	do {								\
		(ev)->flags |= PEVENT_OCCURRED;				\
		if ((ev) != TAILQ_FIRST(&ctx->events)) {		\
			TAILQ_REMOVE(&(ctx)->events, (ev), next);	\
			TAILQ_INSERT_HEAD(&(ctx)->events, (ev), next);	\
		}							\
	} while (0)

/* Internal functions */
static void	pevent_ctx_service(struct pevent *ev);
static void	*pevent_ctx_main(void *arg);
static void	pevent_ctx_main_cleanup(void *arg);
static void	*pevent_ctx_execute(void *arg);
static void	pevent_ctx_execute_cleanup(void *arg);
static void	pevent_ctx_notify(struct pevent_ctx *ctx);
static void	pevent_ctx_unref(struct pevent_ctx *ctx);
static void	pevent_cancel(struct pevent *ev);

/* Internal variables */
static char	pevent_byte;

/*
 * Create a new event context.
 */
struct pevent_ctx *
pevent_ctx_create(const char *mtype, const pthread_attr_t *attr)
{
	pthread_mutexattr_t mutexattr;
	struct pevent_ctx *ctx;
	int got_mutexattr = 0;
	int got_mutex = 0;

	/* Create context object */
	if ((ctx = MALLOC(mtype, sizeof(*ctx))) == NULL)
		return (NULL);
	memset(ctx, 0, sizeof(*ctx));
	if (mtype != NULL) {
		strlcpy(ctx->mtype_buf, mtype, sizeof(ctx->mtype_buf));
		ctx->mtype = ctx->mtype_buf;
	}
	TAILQ_INIT(&ctx->events);

	/* Copy thread attributes */
	if (attr != NULL) {
		struct sched_param param;
		int value;

		if ((errno = pthread_attr_init(&ctx->attr)) != 0)
			goto fail;
		ctx->has_attr = 1;
		pthread_attr_getinheritsched(attr, &value);
		pthread_attr_setinheritsched(&ctx->attr, value);
		pthread_attr_getschedparam(attr, &param);
		pthread_attr_setschedparam(&ctx->attr, &param);
		pthread_attr_getschedpolicy(attr, &value);
		pthread_attr_setschedpolicy(&ctx->attr, value);
		pthread_attr_getscope(attr, &value);
		pthread_attr_setscope(&ctx->attr, value);
	}

	/* Initialize mutex */
	if ((errno = pthread_mutexattr_init(&mutexattr)) != 0)
		goto fail;
	got_mutexattr = 1;
	pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
	if ((errno = pthread_mutex_init(&ctx->mutex, &mutexattr)) != 0)
		goto fail;
	got_mutex = 1;

	/* Initialize notify pipe */
	if (pipe(ctx->pipe) == -1)
		goto fail;

	/* Finish up */
	pthread_mutexattr_destroy(&mutexattr);
	ctx->magic = PEVENT_CTX_MAGIC;
	ctx->refs = 1;
	DBG(PEVENT, "created ctx %p", ctx);
	return (ctx);

fail:
	/* Clean up after failure */
	if (got_mutex)
		pthread_mutex_destroy(&ctx->mutex);
	if (got_mutexattr)
		pthread_mutexattr_destroy(&mutexattr);
	if (ctx->has_attr)
		pthread_attr_destroy(&ctx->attr);
	FREE(mtype, ctx);
	return (NULL);
}
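
/*
 * Example usage: creating and later tearing down a context.  This is
 * an illustrative sketch only (not compiled in); the variable name
 * "my_ctx", the typed_mem(3) type string, and the err(3) error
 * handling are hypothetical choices, not part of this library.
 *
 *	struct pevent_ctx *my_ctx;
 *
 *	if ((my_ctx = pevent_ctx_create("my_app.pevent_ctx", NULL)) == NULL)
 *		err(1, "pevent_ctx_create");
 *	...
 *	pevent_ctx_destroy(&my_ctx);	// also unregisters any leftover events
 */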

/*
 * Destroy an event context.
 *
 * All events are unregistered.
 */
void
pevent_ctx_destroy(struct pevent_ctx **ctxp)
{
	struct pevent_ctx *const ctx = *ctxp;

	/* Allow NULL context */
	if (ctx == NULL)
		return;
	*ctxp = NULL;
	DBG(PEVENT, "destroying ctx %p", ctx);

	/* Lock context */
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);

	/* Sanity check */
	assert(ctx->magic == PEVENT_CTX_MAGIC);

	/* Unregister all events */
	while (!TAILQ_EMPTY(&ctx->events))
		pevent_unregister(TAILQ_FIRST(&ctx->events)->peventp);

	/* Decrement context reference count and unlock context */
	pevent_ctx_unref(ctx);
}

/*
 * Return the number of registered events.
 */
u_int
pevent_ctx_count(struct pevent_ctx *ctx)
{
	u_int nevents;

	assert(ctx->magic == PEVENT_CTX_MAGIC);
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
	nevents = ctx->nevents;
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
	return (nevents);
}

/*
 * Create a new schedule item.
 */
int
pevent_register(struct pevent_ctx *ctx, struct pevent **peventp, int flags,
	pthread_mutex_t *mutex, pevent_handler_t *handler, void *arg,
	enum pevent_type type, ...)
{
	struct pevent *ev;
	va_list args;

	/* Sanity check */
	if (*peventp != NULL) {
		errno = EBUSY;
		return (-1);
	}

	/* Check flags */
	if ((flags & ~PEVENT_USER_FLAGS) != 0) {
		errno = EINVAL;
		return (-1);
	}

	/* Create new event */
	if ((ev = MALLOC(ctx->mtype, sizeof(*ev))) == NULL)
		return (-1);
	memset(ev, 0, sizeof(*ev));
	ev->magic = PEVENT_MAGIC;
	ev->ctx = ctx;
	ev->handler = handler;
	ev->arg = arg;
	ev->flags = flags;
	ev->poll_idx = -1;
	ev->mutex = mutex;
	ev->type = type;
	ev->refs = 1;			/* the caller's reference */
	DBG(PEVENT, "new ev %p in ctx %p", ev, ctx);

	/* Add type-specific info */
	switch (type) {
	case PEVENT_READ:
	case PEVENT_WRITE:
		va_start(args, type);
		ev->u.fd = va_arg(args, int);
		va_end(args);
		break;
	case PEVENT_TIME:
		va_start(args, type);
		ev->u.millis = va_arg(args, int);
		va_end(args);
		if (ev->u.millis < 0)
			ev->u.millis = 0;
		gettimeofday(&ev->when, NULL);
		ev->when.tv_sec += ev->u.millis / 1000;
		ev->when.tv_usec += (ev->u.millis % 1000) * 1000;
		if (ev->when.tv_usec >= 1000000) {
			ev->when.tv_sec++;
			ev->when.tv_usec -= 1000000;
		}
		break;
	case PEVENT_MESG_PORT:
		va_start(args, type);
		ev->u.port = va_arg(args, struct mesg_port *);
		va_end(args);
		if (ev->u.port == NULL)
			goto invalid;
		break;
	case PEVENT_USER:
		break;
	default:
invalid:
		errno = EINVAL;
		_pevent_unref(ev);
		return (-1);
	}

	/* Lock context */
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);

	/* Link to related object (if appropriate) */
	switch (ev->type) {
	case PEVENT_MESG_PORT:
		if (_mesg_port_set_event(ev->u.port, ev) == -1) {
			MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
			_pevent_unref(ev);
			return (-1);
		}
		DBG(PEVENT, "ev %p refs %d -> %d (mesg port)",
		    ev, ev->refs, ev->refs + 1);
		ev->refs++;		/* the mesg_port's reference */
		break;
	default:
		break;
	}

	/* Start or notify event thread */
	if (ctx->thread == 0) {
		if ((errno = pthread_create(&ctx->thread,
		    ctx->has_attr ? &ctx->attr : NULL,
		    pevent_ctx_main, ctx)) != 0) {
			ctx->thread = 0;
			alogf(LOG_ERR, "%s: %m", "pthread_create");
			ev->flags |= PEVENT_CANCELED;
			MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
			_pevent_unref(ev);
			return (-1);
		}
		pthread_detach(ctx->thread);
		DBG(PEVENT, "created tid %p for ctx %p, refs -> %d",
		    ctx->thread, ctx, ctx->refs + 1);
		ctx->refs++;		/* add reference for thread */
	} else
		pevent_ctx_notify(ctx);

	/* Caller gets the one reference */
	ev->peventp = peventp;
	*peventp = ev;

	/* Add event to the pending event list */
	PEVENT_ENQUEUE(ctx, ev);

	/* Unlock context */
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);

	return (0);
}
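
/*
 * Example: registering a recurring one-second timer.  A sketch only
 * (not compiled in); "my_ctx", "my_timer" and "my_timer_handler" are
 * hypothetical names, and warn(3) is illustrative error handling.
 * The handle must be NULL before pevent_register() is called, and the
 * handler signature matches the call made in pevent_ctx_execute(),
 * i.e. (*handler)(arg).
 *
 *	static struct pevent *my_timer;
 *
 *	static void
 *	my_timer_handler(void *arg)
 *	{
 *		// Runs in the event thread.  Because PEVENT_RECURRING is
 *		// set, the event has normally already been re-registered
 *		// (and "my_timer" points at the new event) by the time
 *		// this handler runs.
 *	}
 *
 *	...
 *	my_timer = NULL;
 *	if (pevent_register(my_ctx, &my_timer, PEVENT_RECURRING, NULL,
 *	    my_timer_handler, NULL, PEVENT_TIME, 1000) == -1)
 *		warn("pevent_register");
 */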

/*
 * Unregister an event.
 */
void
pevent_unregister(struct pevent **peventp)
{
	struct pevent *const ev = *peventp;
	struct pevent_ctx *ctx;

	/* Allow NULL event */
	if (ev == NULL)
		return;
	ctx = ev->ctx;
	DBG(PEVENT, "unregister ev %p in ctx %p", ev, ctx);

	/* Sanity checks */
	assert(ev->magic == PEVENT_MAGIC);
	assert(ctx->magic == PEVENT_CTX_MAGIC);
	assert(ev->peventp == peventp);

	/* Lock context */
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
	DBG(PEVENT, "unregister ev %p in ctx %p: lock", ev, ctx);

	/* Dequeue event if queued */
	if ((ev->flags & PEVENT_ENQUEUED) != 0) {
		PEVENT_DEQUEUE(ctx, ev);
		pevent_ctx_notify(ctx);		/* wakeup thread */
	}

	/* Cancel the event */
	pevent_cancel(ev);

	/* Unlock context */
	DBG(PEVENT, "unregister ev %p in ctx %p: unlock", ev, ctx);
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
}
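
/*
 * Example: tearing an event down from user code.  A sketch only;
 * "my_timer" is the hypothetical handle from the pevent_register()
 * example above.
 *
 *	pevent_unregister(&my_timer);	// safe even if my_timer is NULL
 */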

/*
 * Trigger an event.
 */
void
pevent_trigger(struct pevent *ev)
{
	struct pevent_ctx *ctx;

	/* Sanity check */
	if (ev == NULL)
		return;

	/* Get context */
	ctx = ev->ctx;
	DBG(PEVENT, "trigger ev %p in ctx %p", ev, ctx);
	assert(ev->magic == PEVENT_MAGIC);

	/* Lock context */
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);

	/* If event already canceled, bail */
	if ((ev->flags & PEVENT_CANCELED) != 0)
		goto done;

	/* Mark event as having occurred */
	PEVENT_SET_OCCURRED(ctx, ev);

	/* Wake up thread if event is still in the queue */
	if ((ev->flags & PEVENT_ENQUEUED) != 0)
		pevent_ctx_notify(ctx);

done:
	/* Unlock context */
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
}
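
/*
 * Example: a PEVENT_USER event that is serviced only when explicitly
 * triggered.  A sketch only (not compiled in); "my_ctx", "my_user_ev"
 * and "my_wakeup" are hypothetical names, and warn(3) is illustrative.
 *
 *	static struct pevent *my_user_ev;
 *
 *	my_user_ev = NULL;
 *	if (pevent_register(my_ctx, &my_user_ev, 0, NULL,
 *	    my_wakeup, NULL, PEVENT_USER) == -1)
 *		warn("pevent_register");
 *	...
 *	pevent_trigger(my_user_ev);	// "my_wakeup" runs in the event thread
 */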

/*
 * Get event info.
 */
int
pevent_get_info(struct pevent *ev, struct pevent_info *info)
{
	if (ev == NULL) {
		errno = ENXIO;
		return (-1);
	}
	assert(ev->magic == PEVENT_MAGIC);
	memset(info, 0, sizeof(*info));
	info->type = ev->type;
	switch (ev->type) {
	case PEVENT_READ:
	case PEVENT_WRITE:
		info->u.fd = ev->u.fd;
		break;
	case PEVENT_TIME:
		info->u.millis = ev->u.millis;
		break;
	case PEVENT_MESG_PORT:
		info->u.port = ev->u.port;
		break;
	case PEVENT_USER:
		break;
	default:
		errno = EINVAL;
		return (-1);
	}
	return (0);
}
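
/*
 * Example: querying a still-registered event.  A sketch only;
 * "my_timer" is the hypothetical handle from the pevent_register()
 * example above, and warn(3)/printf(3) are illustrative.
 *
 *	struct pevent_info info;
 *
 *	if (pevent_get_info(my_timer, &info) == -1)
 *		warn("pevent_get_info");
 *	else if (info.type == PEVENT_TIME)
 *		printf("timer delay: %d ms\n", info.u.millis);
 */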

/*
 * Event thread entry point.
 */
static void *
pevent_ctx_main(void *arg)
{
	struct pevent_ctx *const ctx = arg;
	struct timeval now;
	struct pollfd *fd;
	struct pevent *ev;
	struct pevent *next_ev;
	int poll_idx;
	int timeout;
	int r;

	/* Push cleanup hook */
	pthread_cleanup_push(pevent_ctx_main_cleanup, ctx);
	assert(ctx->magic == PEVENT_CTX_MAGIC);

	/* Lock context */
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);

	/* Safety belts */
	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

	/* Get current time */
	gettimeofday(&now, NULL);
	DBG(PEVENT, "ctx %p thread starting", ctx);

loop:
	/* Are there any events left? */
	if (TAILQ_EMPTY(&ctx->events)) {
		DBG(PEVENT, "ctx %p thread exiting", ctx);
		goto done;
	}

	/* Make sure ctx->fds array is long enough */
	if (ctx->fds_alloc < 1 + ctx->nrwevents) {
		const u_int new_alloc = roundup(1 + ctx->nrwevents, 16);
		void *mem;

		if ((mem = REALLOC(ctx->mtype, ctx->fds,
		    new_alloc * sizeof(*ctx->fds))) == NULL)
			alogf(LOG_ERR, "%s: %m", "realloc");
		else {
			ctx->fds = mem;
			ctx->fds_alloc = new_alloc;
		}
	}

	/* If we were intentionally woken up, read the wakeup byte */
	if (ctx->notified) {
		DBG(PEVENT, "ctx %p thread was notified", ctx);
		(void)read(ctx->pipe[0], &pevent_byte, 1);
		ctx->notified = 0;
	}

	/* Add event for the notify pipe */
	poll_idx = 0;
	if (ctx->fds_alloc > 0) {
		fd = &ctx->fds[poll_idx++];
		memset(fd, 0, sizeof(*fd));
		fd->fd = ctx->pipe[0];
		fd->events = POLLRDNORM;
	}

	/* Fill in rest of poll() array */
	timeout = INFTIM;
	TAILQ_FOREACH(ev, &ctx->events, next) {
		switch (ev->type) {
		case PEVENT_READ:
		case PEVENT_WRITE:
		    {
			if (poll_idx >= ctx->fds_alloc) {
				ev->poll_idx = -1;
				break;
			}
			ev->poll_idx = poll_idx++;
			fd = &ctx->fds[ev->poll_idx];
			memset(fd, 0, sizeof(*fd));
			fd->fd = ev->u.fd;
			fd->events = (ev->type == PEVENT_READ) ?
			    POLLRDNORM : POLLWRNORM;
			break;
		    }
		case PEVENT_TIME:
		    {
			struct timeval remain;
			int millis;

			/* Compute milliseconds until event */
			if (timercmp(&ev->when, &now, <=))
				millis = 0;
			else {
				timersub(&ev->when, &now, &remain);
				millis = remain.tv_sec * 1000;
				millis += remain.tv_usec / 1000;
			}

			/* Remember the minimum delay */
			if (timeout == INFTIM || millis < timeout)
				timeout = millis;
			break;
		    }
		default:
			break;
		}
	}

	/* Mark non-poll() events that have occurred */
	TAILQ_FOREACH(ev, &ctx->events, next) {
		assert(ev->magic == PEVENT_MAGIC);
		switch (ev->type) {
		case PEVENT_MESG_PORT:
			if (mesg_port_qlen(ev->u.port) > 0)
				PEVENT_SET_OCCURRED(ctx, ev);
			break;
		default:
			break;
		}
		if ((ev->flags & PEVENT_OCCURRED) != 0)
			timeout = 0;		/* don't delay */
	}

#if PDEL_DEBUG
	/* Debugging */
	DBG(PEVENT, "ctx %p thread event list:", ctx);
	TAILQ_FOREACH(ev, &ctx->events, next)
		DBG(PEVENT, "\tev %p", ev);
#endif

	/* Wait for something to happen */
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
	DBG(PEVENT, "ctx %p thread sleeping", ctx);
	r = poll(ctx->fds, poll_idx, timeout);
	DBG(PEVENT, "ctx %p thread woke up", ctx);
	assert(ctx->magic == PEVENT_CTX_MAGIC);
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);

	/* Check for errors */
	if (r == -1 && errno != EINTR) {
		alogf(LOG_CRIT, "%s: %m", "poll");
		assert(0);
	}

	/* Update current time */
	gettimeofday(&now, NULL);

	/* Mark poll() events that have occurred */
	for (ev = TAILQ_FIRST((&ctx->events)); ev != NULL; ev = next_ev) {
		next_ev = TAILQ_NEXT(ev, next);
		assert(ev->magic == PEVENT_MAGIC);
		switch (ev->type) {
		case PEVENT_READ:
		case PEVENT_WRITE:
			if (ev->poll_idx == -1)
				break;
			fd = &ctx->fds[ev->poll_idx];
			if ((fd->revents & ((ev->type == PEVENT_READ) ?
			    READABLE_EVENTS : WRITABLE_EVENTS)) != 0)
				PEVENT_SET_OCCURRED(ctx, ev);
			break;
		case PEVENT_TIME:
			if (timercmp(&ev->when, &now, <=))
				PEVENT_SET_OCCURRED(ctx, ev);
			break;
		default:
			break;
		}
	}

	/* Service all events that are marked as having occurred */
	while (1) {

		/* Find next event that needs service */
		ev = TAILQ_FIRST(&ctx->events);
		if (ev == NULL || (ev->flags & PEVENT_OCCURRED) == 0)
			break;
		DBG(PEVENT, "ctx %p thread servicing ev %p", ctx, ev);

		/* Detach event and service it while keeping a reference */
		DBG(PEVENT, "ev %p refs %d -> %d (for servicing)",
		    ev, ev->refs, ev->refs + 1);
		ev->refs++;
		PEVENT_DEQUEUE(ctx, ev);
		pevent_ctx_service(ev);
	}

	/* Spin again */
	DBG(PEVENT, "ctx %p thread spin again", ctx);
	goto loop;

done:;
	/* Done */
	pthread_cleanup_pop(1);
	return (NULL);
}

/*
 * Cleanup routine for event thread.
 */
static void
pevent_ctx_main_cleanup(void *arg)
{
	struct pevent_ctx *const ctx = arg;

	DBG(PEVENT, "ctx %p thread cleanup", ctx);
	ctx->thread = 0;
	pevent_ctx_unref(ctx);		/* release thread's reference */
}

/*
 * Service an event.
 *
 * This is called with an extra reference on the event which is
 * removed by pevent_ctx_execute().
 */
static void
pevent_ctx_service(struct pevent *ev)
{
	struct pevent_ctx *const ctx = ev->ctx;
	pthread_t tid;

	/* Does handler want to run in its own thread? */
	if ((ev->flags & PEVENT_OWN_THREAD) != 0) {
		if ((errno = pthread_create(&tid,
		    NULL, pevent_ctx_execute, ev)) != 0) {
			alogf(LOG_ERR,
			    "can't spawn thread for event %p: %m", ev);
			pevent_cancel(ev);	/* removes the reference */
			return;
		}
		pthread_detach(tid);
		DBG(PEVENT, "dispatched thread %p for ev %p", tid, ev);
		return;
	}

	/* Execute handler with context unlocked */
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
	pevent_ctx_execute(ev);
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
}

/*
 * Execute an event handler.
 *
 * Grab the user mutex, if any, check if event was canceled,
 * execute handler, release mutex.
 *
 * This is called with an extra reference on the event, which is
 * removed by this function.
 *
 * This should NOT be called with a lock on the event context.
 */
static void *
pevent_ctx_execute(void *arg)
{
	struct pevent *const ev = arg;
	struct pevent_ctx *const ctx = ev->ctx;
	int r;

	/* Sanity check */
	assert(ev->magic == PEVENT_MAGIC);

	/* Register cleanup hook */
	DBG(PEVENT, "ev %p being executed", ev);
	pthread_cleanup_push(pevent_ctx_execute_cleanup, ev);

try_again:
	/* Acquire context mutex */
	DBG(PEVENT, "locking ctx %p for ev %p", ctx, ev);
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);

	/* Check for cancellation to close the cancel/timeout race */
	if ((ev->flags & PEVENT_CANCELED) != 0) {
		DBG(PEVENT, "ev %p was canceled", ev);
		MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
		goto done;
	}

	/*
	 * Acquire user mutex but avoid two potential problems:
	 *
	 *	- Race window: other thread could call timer_cancel()
	 *	  and destroy the user mutex just before we try to
	 *	  acquire it; we must hold context lock to prevent this.
	 *
	 *	- Lock reversal deadlock: other threads acquire the
	 *	  user lock before the context lock, we do the reverse.
	 *
	 * Once we've acquired both locks, we drop the context lock.
	 */
	if (ev->mutex != NULL) {
		MUTEX_TRYLOCK(ev->mutex, ev->mutex_count);
		if (errno != 0) {
			if (errno != EBUSY) {
				alogf(LOG_ERR,
				    "can't lock mutex for event %p: %m", ev);
				pevent_cancel(ev);
				MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
				goto done;
			}
			MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
			sched_yield();
			goto try_again;
		}
		ev->flags |= PEVENT_GOT_MUTEX;
	}

	/* Remove user's event reference (we still have one though) */
	pevent_cancel(ev);

	/* Release context mutex */
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);

	/* Reschedule a new event if recurring */
	if ((ev->flags & PEVENT_RECURRING) != 0) {
		switch (ev->type) {
		case PEVENT_READ:
		case PEVENT_WRITE:
			r = pevent_register(ev->ctx, ev->peventp,
			    (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
			    ev->handler, ev->arg, ev->type, ev->u.fd);
			break;
		case PEVENT_TIME:
			r = pevent_register(ev->ctx, ev->peventp,
			    (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
			    ev->handler, ev->arg, ev->type, ev->u.millis);
			break;
		case PEVENT_MESG_PORT:
			r = pevent_register(ev->ctx, ev->peventp,
			    (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
			    ev->handler, ev->arg, ev->type, ev->u.port);
			break;
		case PEVENT_USER:
			r = pevent_register(ev->ctx, ev->peventp,
			    (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
			    ev->handler, ev->arg, ev->type);
			break;
		default:
			r = -1;
			assert(0);
		}
		if (r == -1) {
			alogf(LOG_ERR,
			    "can't re-register recurring event %p: %m", ev);
		}
	}

	/* Execute the handler */
	(*ev->handler)(ev->arg);

done:;
	/* Release user mutex and event reference */
	pthread_cleanup_pop(1);
	return (NULL);
}

/*
 * Clean up for pevent_ctx_execute()
 */
static void
pevent_ctx_execute_cleanup(void *arg)
{
	struct pevent *const ev = arg;

	DBG(PEVENT, "ev %p cleanup", ev);
	assert(ev->magic == PEVENT_MAGIC);
	if ((ev->flags & PEVENT_GOT_MUTEX) != 0)
		MUTEX_UNLOCK(ev->mutex, ev->mutex_count);
	_pevent_unref(ev);
}

/*
 * Wakeup event thread because the event list has changed.
 *
 * This assumes the mutex is locked.
 */
static void
pevent_ctx_notify(struct pevent_ctx *ctx)
{
	DBG(PEVENT, "ctx %p being notified", ctx);
	if (!ctx->notified) {
		(void)write(ctx->pipe[1], &pevent_byte, 1);
		ctx->notified = 1;
	}
}

/*
 * Cancel an event (make it so that it never gets triggered) and
 * remove the user reference to it.
 *
 * This assumes the lock is held.
 */
static void
pevent_cancel(struct pevent *ev)
{
	DBG(PEVENT, "canceling ev %p", ev);
	assert((ev->flags & PEVENT_CANCELED) == 0);
	ev->flags |= PEVENT_CANCELED;
	*ev->peventp = NULL;
	_pevent_unref(ev);
}

/*
 * Returns true if the event has been canceled.
 */
int
_pevent_canceled(struct pevent *ev)
{
	assert(ev->magic == PEVENT_MAGIC);
	return ((ev->flags & PEVENT_CANCELED) != 0);
}

/*
 * Drop a reference on an event.
 */
void
_pevent_unref(struct pevent *ev)
{
	struct pevent_ctx *const ctx = ev->ctx;

	DBG(PEVENT, "ev %p refs %d -> %d", ev, ev->refs, ev->refs - 1);
	assert(ev->refs > 0);
	assert(ev->magic == PEVENT_MAGIC);
	MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
	if (--ev->refs > 0) {
		MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
		return;
	}
	assert((ev->flags & PEVENT_ENQUEUED) == 0);
	ev->magic = ~0;			/* invalidate magic number */
	DBG(PEVENT, "freeing ev %p", ev);
	FREE(ctx->mtype, ev);
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
}

/*
 * Release context lock and drop a context reference.
 *
 * This assumes the mutex is locked. Upon return it will be unlocked.
 */
static void
pevent_ctx_unref(struct pevent_ctx *ctx)
{
	DBG(PEVENT, "ctx %p refs %d -> %d", ctx, ctx->refs, ctx->refs - 1);
	assert(ctx->refs > 0);
	assert(ctx->magic == PEVENT_CTX_MAGIC);
	if (--ctx->refs > 0) {
		MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
		return;
	}
	assert(TAILQ_EMPTY(&ctx->events));
	assert(ctx->nevents == 0);
	assert(ctx->thread == 0);
	(void)close(ctx->pipe[0]);
	(void)close(ctx->pipe[1]);
	MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
	pthread_mutex_destroy(&ctx->mutex);
	if (ctx->has_attr)
		pthread_attr_destroy(&ctx->attr);
	ctx->magic = ~0;		/* invalidate magic number */
	DBG(PEVENT, "freeing ctx %p", ctx);
	FREE(ctx->mtype, ctx->fds);
	FREE(ctx->mtype, ctx);
}