Annotation of embedaddon/mpd/src/contrib/libpdel/util/pevent.c, revision 1.1.1.1
1.1 misho 1:
2: /*
3: * Copyright (c) 2001-2002 Packet Design, LLC.
4: * All rights reserved.
5: *
6: * Subject to the following obligations and disclaimer of warranty,
7: * use and redistribution of this software, in source or object code
8: * forms, with or without modifications are expressly permitted by
9: * Packet Design; provided, however, that:
10: *
11: * (i) Any and all reproductions of the source or object code
12: * must include the copyright notice above and the following
13: * disclaimer of warranties; and
14: * (ii) No rights are granted, in any manner or form, to use
15: * Packet Design trademarks, including the mark "PACKET DESIGN"
16: * on advertising, endorsements, or otherwise except as such
17: * appears in the above copyright notice or in the software.
18: *
19: * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND
20: * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO
21: * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING
22: * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED
23: * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
24: * OR NON-INFRINGEMENT. PACKET DESIGN DOES NOT WARRANT, GUARANTEE,
25: * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS
26: * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY,
27: * RELIABILITY OR OTHERWISE. IN NO EVENT SHALL PACKET DESIGN BE
28: * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE
29: * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT,
30: * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL
31: * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF
32: * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF
33: * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
35: * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF
36: * THE POSSIBILITY OF SUCH DAMAGE.
37: *
38: * Author: Archie Cobbs <archie@freebsd.org>
39: */
40:
41: #include <sys/types.h>
42: #include <sys/param.h>
43: #include <sys/queue.h>
44: #include <sys/time.h>
45:
46: #include <netinet/in.h>
47:
48: #include <stdarg.h>
49: #include <stdio.h>
50: #include <syslog.h>
51: #include <string.h>
52: #include <errno.h>
53: #include <assert.h>
54: #include <unistd.h>
55: #include <poll.h>
56: #include <sched.h>
57: #include <pthread.h>
58:
59: #include "structs/structs.h"
60: #include "structs/type/array.h"
61: #include "util/typed_mem.h"
62: #include "util/mesg_port.h"
63: #include "util/pevent.h"
64: #ifdef SYSLOG_FACILITY
65: #define alogf(sev, fmt, arg...) syslog(sev, "%s: " fmt, __FUNCTION__ , ## arg)
66: #else
67: #define alogf(sev, fmt, arg...) do{}while(0)
68: #endif
69:
70: #include "internal.h"
71: #include "debug/debug.h"
72:
73: #define PEVENT_MAGIC 0x31d7699b
74: #define PEVENT_CTX_MAGIC 0x7842f901
75:
76: /* Private flags */
77: #define PEVENT_OCCURRED 0x8000 /* event has occurred */
78: #define PEVENT_CANCELED 0x4000 /* event canceled or done */
79: #define PEVENT_ENQUEUED 0x2000 /* in the ctx->events queue */
80: #define PEVENT_GOT_MUTEX 0x1000 /* user mutex acquired */
81:
82: #define PEVENT_USER_FLAGS (PEVENT_RECURRING | PEVENT_OWN_THREAD)
83:
84: #define READABLE_EVENTS (POLLIN | POLLRDNORM | POLLERR \
85: | POLLHUP | POLLNVAL)
86: #define WRITABLE_EVENTS (POLLOUT | POLLWRNORM | POLLWRBAND \
87: | POLLERR | POLLHUP | POLLNVAL)
88:
89: /* Event context */
90: struct pevent_ctx {
91: u_int32_t magic; /* magic number */
92: pthread_mutex_t mutex; /* mutex for context */
93: #if PDEL_DEBUG
94: int mutex_count; /* mutex count */
95: #endif
96: pthread_attr_t attr; /* event thread attributes */
97: pthread_t thread; /* event thread */
98: TAILQ_HEAD(, pevent) events; /* pending event list */
99: u_int nevents; /* length of 'events' list */
100: u_int nrwevents; /* number read/write events */
101: struct pollfd *fds; /* poll(2) fds array */
102: u_int fds_alloc; /* allocated size of 'fds' */
103: const char *mtype; /* typed_mem(3) memory type */
104: char mtype_buf[TYPED_MEM_TYPELEN];
105: int pipe[2]; /* event thread notify pipe */
106: u_char notified; /* data in the pipe */
107: u_char has_attr; /* 'attr' is valid */
108: u_int refs; /* references to this context */
109: };
110:
111: /* Event object */
112: struct pevent {
113: u_int32_t magic; /* magic number */
114: struct pevent_ctx *ctx; /* pointer to event context */
115: struct pevent **peventp; /* user handle to this event */
116: pevent_handler_t *handler; /* event handler function */
117: void *arg; /* event handler function arg */
118: int flags; /* event flags */
119: int poll_idx; /* index in poll(2) fds array */
120: pthread_mutex_t *mutex; /* user mutex, if any */
121: #if PDEL_DEBUG
122: int mutex_count; /* mutex count */
123: #endif
124: enum pevent_type type; /* type of this event */
125: struct timeval when; /* expiration for time events */
126: u_int refs; /* references to this event */
127: union {
128: int fd; /* file descriptor */
129: int millis; /* time delay */
130: struct mesg_port *port; /* mesg_port */
131: } u;
132: TAILQ_ENTRY(pevent) next; /* next in ctx->events */
133: };
134:
135: /* Macros */
136: #define PEVENT_ENQUEUE(ctx, ev) \
137: do { \
138: assert(((ev)->flags & PEVENT_ENQUEUED) == 0); \
139: TAILQ_INSERT_TAIL(&(ctx)->events, (ev), next); \
140: (ev)->flags |= PEVENT_ENQUEUED; \
141: (ctx)->nevents++; \
142: if ((ev)->type == PEVENT_READ \
143: || (ev)->type == PEVENT_WRITE) \
144: (ctx)->nrwevents++; \
145: DBG(PEVENT, "ev %p refs %d -> %d (enqueued)", \
146: (ev), (ev)->refs, (ev)->refs + 1); \
147: (ev)->refs++; \
148: } while (0)
149:
150: #define PEVENT_DEQUEUE(ctx, ev) \
151: do { \
152: assert(((ev)->flags & PEVENT_ENQUEUED) != 0); \
153: TAILQ_REMOVE(&(ctx)->events, (ev), next); \
154: (ctx)->nevents--; \
155: if ((ev)->type == PEVENT_READ \
156: || (ev)->type == PEVENT_WRITE) \
157: (ctx)->nrwevents--; \
158: (ev)->flags &= ~PEVENT_ENQUEUED; \
159: _pevent_unref(ev); \
160: } while (0)
161:
162: #define PEVENT_SET_OCCURRED(ctx, ev) \
163: do { \
164: (ev)->flags |= PEVENT_OCCURRED; \
165: if ((ev) != TAILQ_FIRST(&ctx->events)) { \
166: TAILQ_REMOVE(&(ctx)->events, (ev), next); \
167: TAILQ_INSERT_HEAD(&(ctx)->events, (ev), next); \
168: } \
169: } while (0)
170:
171: /* Internal functions */
172: static void pevent_ctx_service(struct pevent *ev);
173: static void *pevent_ctx_main(void *arg);
174: static void pevent_ctx_main_cleanup(void *arg);
175: static void *pevent_ctx_execute(void *arg);
176: static void pevent_ctx_execute_cleanup(void *arg);
177: static void pevent_ctx_notify(struct pevent_ctx *ctx);
178: static void pevent_ctx_unref(struct pevent_ctx *ctx);
179: static void pevent_cancel(struct pevent *ev);
180:
181: /* Internal variables */
182: static char pevent_byte;
183:
184: /*
185: * Create a new event context.
186: */
187: struct pevent_ctx *
188: pevent_ctx_create(const char *mtype, const pthread_attr_t *attr)
189: {
190: pthread_mutexattr_t mutexattr;
191: struct pevent_ctx *ctx;
192: int got_mutexattr = 0;
193: int got_mutex = 0;
194:
195: /* Create context object */
196: if ((ctx = MALLOC(mtype, sizeof(*ctx))) == NULL)
197: return (NULL);
198: memset(ctx, 0, sizeof(*ctx));
199: if (mtype != NULL) {
200: strlcpy(ctx->mtype_buf, mtype, sizeof(ctx->mtype_buf));
201: ctx->mtype = ctx->mtype_buf;
202: }
203: TAILQ_INIT(&ctx->events);
204:
205: /* Copy thread attributes */
206: if (attr != NULL) {
207: struct sched_param param;
208: int value;
209:
210: if ((errno = pthread_attr_init(&ctx->attr)) != 0)
211: goto fail;
212: ctx->has_attr = 1;
213: pthread_attr_getinheritsched((pthread_attr_t *) attr, &value);
214: pthread_attr_setinheritsched(&ctx->attr, value);
215: pthread_attr_getschedparam((pthread_attr_t *) attr, ¶m);
216: pthread_attr_setschedparam(&ctx->attr, ¶m);
217: pthread_attr_getschedpolicy((pthread_attr_t *) attr, &value);
218: pthread_attr_setschedpolicy(&ctx->attr, value);
219: pthread_attr_getscope((pthread_attr_t *) attr, &value);
220: pthread_attr_setscope(&ctx->attr, value);
221: }
222:
223: /* Initialize mutex */
224: if ((errno = pthread_mutexattr_init(&mutexattr) != 0))
225: goto fail;
226: got_mutexattr = 1;
227: pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
228: if ((errno = pthread_mutex_init(&ctx->mutex, &mutexattr)) != 0)
229: goto fail;
230: got_mutex = 1;
231:
232: /* Initialize notify pipe */
233: if (pipe(ctx->pipe) == -1)
234: goto fail;
235:
236: /* Finish up */
237: pthread_mutexattr_destroy(&mutexattr);
238: ctx->magic = PEVENT_CTX_MAGIC;
239: ctx->refs = 1;
240: DBG(PEVENT, "created ctx %p", ctx);
241: return (ctx);
242:
243: fail:
244: /* Clean up after failure */
245: if (got_mutex)
246: pthread_mutex_destroy(&ctx->mutex);
247: if (got_mutexattr)
248: pthread_mutexattr_destroy(&mutexattr);
249: if (ctx->has_attr)
250: pthread_attr_destroy(&ctx->attr);
251: FREE(mtype, ctx);
252: return (NULL);
253: }
254:
255: /*
256: * Destroy an event context.
257: *
258: * All events are unregistered.
259: */
260: void
261: pevent_ctx_destroy(struct pevent_ctx **ctxp)
262: {
263: struct pevent_ctx *const ctx = *ctxp;
264:
265: /* Allow NULL context */
266: if (ctx == NULL)
267: return;
268: *ctxp = NULL;
269: DBG(PEVENT, "destroying ctx %p", ctx);
270:
271: /* Lock context */
272: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
273:
274: /* Sanity check */
275: assert(ctx->magic == PEVENT_CTX_MAGIC);
276:
277: /* Unregister all events */
278: while (!TAILQ_EMPTY(&ctx->events))
279: pevent_unregister(TAILQ_FIRST(&ctx->events)->peventp);
280:
281: /* Decrement context reference count and unlock context */
282: pevent_ctx_unref(ctx);
283: }
284:
285: /*
286: * Return the number of registered events.
287: */
288: u_int
289: pevent_ctx_count(struct pevent_ctx *ctx)
290: {
291: u_int nevents;
292:
293: assert(ctx->magic == PEVENT_CTX_MAGIC);
294: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
295: nevents = ctx->nevents;
296: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
297: return (nevents);
298: }
299:
300: /*
301: * Create a new schedule item.
302: */
303: int
304: pevent_register(struct pevent_ctx *ctx, struct pevent **peventp, int flags,
305: pthread_mutex_t *mutex, pevent_handler_t *handler, void *arg,
306: enum pevent_type type, ...)
307: {
308: struct pevent *ev;
309: va_list args;
310:
311: /* Sanity check */
312: if (*peventp != NULL) {
313: errno = EBUSY;
314: return (-1);
315: }
316:
317: /* Check flags */
318: if ((flags & ~PEVENT_USER_FLAGS) != 0) {
319: errno = EINVAL;
320: return (-1);
321: }
322:
323: /* Create new event */
324: if ((ev = MALLOC(ctx->mtype, sizeof(*ev))) == NULL)
325: return (-1);
326: memset(ev, 0, sizeof(*ev));
327: ev->magic = PEVENT_MAGIC;
328: ev->ctx = ctx;
329: ev->handler = handler;
330: ev->arg = arg;
331: ev->flags = flags;
332: ev->poll_idx = -1;
333: ev->mutex = mutex;
334: ev->type = type;
335: ev->refs = 1; /* the caller's reference */
336: DBG(PEVENT, "new ev %p in ctx %p", ev, ctx);
337:
338: /* Add type-specific info */
339: switch (type) {
340: case PEVENT_READ:
341: case PEVENT_WRITE:
342: va_start(args, type);
343: ev->u.fd = va_arg(args, int);
344: va_end(args);
345: break;
346: case PEVENT_TIME:
347: va_start(args, type);
348: ev->u.millis = va_arg(args, int);
349: va_end(args);
350: if (ev->u.millis < 0)
351: ev->u.millis = 0;
352: gettimeofday(&ev->when, NULL);
353: ev->when.tv_sec += ev->u.millis / 1000;
354: ev->when.tv_usec += (ev->u.millis % 1000) * 1000;
355: if (ev->when.tv_usec > 1000000) {
356: ev->when.tv_sec++;
357: ev->when.tv_usec -= 1000000;
358: }
359: break;
360: case PEVENT_MESG_PORT:
361: va_start(args, type);
362: ev->u.port = va_arg(args, struct mesg_port *);
363: va_end(args);
364: if (ev->u.port == NULL)
365: goto invalid;
366: break;
367: case PEVENT_USER:
368: break;
369: default:
370: invalid:
371: errno = EINVAL;
372: _pevent_unref(ev);
373: return (-1);
374: }
375:
376: /* Lock context */
377: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
378:
379: /* Link to related object (if appropriate) */
380: switch (ev->type) {
381: case PEVENT_MESG_PORT:
382: if (_mesg_port_set_event(ev->u.port, ev) == -1) {
383: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
384: _pevent_unref(ev);
385: return (-1);
386: }
387: DBG(PEVENT, "ev %p refs %d -> %d (mesg port)",
388: ev, ev->refs, ev->refs + 1);
389: ev->refs++; /* the mesg_port's reference */
390: break;
391: default:
392: break;
393: }
394:
395: /* Start or notify event thread */
396: if (ctx->thread == 0) {
397: if ((errno = pthread_create(&ctx->thread,
398: ctx->has_attr ? &ctx->attr : NULL,
399: pevent_ctx_main, ctx)) != 0) {
400: ctx->thread = 0;
401: alogf(LOG_ERR, "%s: %m", "pthread_create");
402: ev->flags |= PEVENT_CANCELED;
403: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
404: _pevent_unref(ev);
405: return (-1);
406: }
407: pthread_detach(ctx->thread);
408: DBG(PEVENT, "created tid %p for ctx %p, refs -> %d",
409: ctx->thread, ctx, ctx->refs + 1);
410: ctx->refs++; /* add reference for thread */
411: } else
412: pevent_ctx_notify(ctx);
413:
414: /* Caller gets the one reference */
415: ev->peventp = peventp;
416: *peventp = ev;
417:
418: /* Add event to the pending event list */
419: PEVENT_ENQUEUE(ctx, ev);
420:
421: /* Unlock context */
422: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
423:
424: return (0);
425: }
426:
427: /*
428: * Unregister an event.
429: */
430: void
431: pevent_unregister(struct pevent **peventp)
432: {
433: struct pevent *const ev = *peventp;
434: struct pevent_ctx *ctx;
435:
436: /* Allow NULL event */
437: if (ev == NULL)
438: return;
439: ctx = ev->ctx;
440: DBG(PEVENT, "unregister ev %p in ctx %p", ev, ctx);
441:
442: /* Sanity checks */
443: assert(ev->magic == PEVENT_MAGIC);
444: assert(ctx->magic == PEVENT_CTX_MAGIC);
445: assert(ev->peventp == peventp);
446:
447: /* Lock context */
448: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
449: DBG(PEVENT, "unregister ev %p in ctx %p: lock", ev, ctx);
450:
451: /* Dequeue event if queued */
452: if ((ev->flags & PEVENT_ENQUEUED) != 0) {
453: PEVENT_DEQUEUE(ctx, ev);
454: pevent_ctx_notify(ctx); /* wakeup thread */
455: }
456:
457: /* Cancel the event */
458: pevent_cancel(ev);
459:
460: /* Unlock context */
461: DBG(PEVENT, "unregister ev %p in ctx %p: unlock", ev, ctx);
462: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
463: }
464:
465: /*
466: * Trigger an event.
467: */
468: void
469: pevent_trigger(struct pevent *ev)
470: {
471: struct pevent_ctx *ctx;
472:
473: /* Sanity check */
474: if (ev == NULL)
475: return;
476:
477: /* Get context */
478: ctx = ev->ctx;
479: DBG(PEVENT, "trigger ev %p in ctx %p", ev, ctx);
480: assert(ev->magic == PEVENT_MAGIC);
481:
482: /* Lock context */
483: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
484:
485: /* If event already canceled, bail */
486: if ((ev->flags & PEVENT_CANCELED) != 0)
487: goto done;
488:
489: /* Mark event as having occurred */
490: PEVENT_SET_OCCURRED(ctx, ev);
491:
492: /* Wake up thread if event is still in the queue */
493: if ((ev->flags & PEVENT_ENQUEUED) != 0)
494: pevent_ctx_notify(ctx);
495:
496: done:
497: /* Unlock context */
498: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
499: }
500:
501: /*
502: * Get event info.
503: */
504: int
505: pevent_get_info(struct pevent *ev, struct pevent_info *info)
506: {
507: if (ev == NULL) {
508: errno = ENXIO;
509: return (-1);
510: }
511: assert(ev->magic == PEVENT_MAGIC);
512: memset(info, 0, sizeof(*info));
513: info->type = ev->type;
514: switch (ev->type) {
515: case PEVENT_READ:
516: case PEVENT_WRITE:
517: info->u.fd = ev->u.fd;
518: break;
519: case PEVENT_TIME:
520: info->u.millis = ev->u.millis;
521: break;
522: case PEVENT_MESG_PORT:
523: info->u.port = ev->u.port;
524: break;
525: case PEVENT_USER:
526: break;
527: default:
528: errno = EINVAL;
529: return (-1);
530: }
531: return (0);
532: }
533:
534: /*
535: * Event thread entry point.
536: */
537: static void *
538: pevent_ctx_main(void *arg)
539: {
540: struct pevent_ctx *const ctx = arg;
541: struct timeval now;
542: struct pollfd *fd;
543: struct pevent *ev;
544: struct pevent *next_ev;
545: int poll_idx;
546: int timeout;
547: int r;
548:
549: /* Push cleanup hook */
550: pthread_cleanup_push(pevent_ctx_main_cleanup, ctx);
551: assert(ctx->magic == PEVENT_CTX_MAGIC);
552:
553: /* Lock context */
554: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
555:
556: /* Safety belts */
557: pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
558:
559: /* Get current time */
560: gettimeofday(&now, NULL);
561: DBG(PEVENT, "ctx %p thread starting", ctx);
562:
563: loop:
564: /* Are there any events left? */
565: if (TAILQ_EMPTY(&ctx->events)) {
566: DBG(PEVENT, "ctx %p thread exiting", ctx);
567: goto done;
568: }
569:
570: /* Make sure ctx->fds array is long enough */
571: if (ctx->fds_alloc < 1 + ctx->nrwevents) {
572: const u_int new_alloc = roundup(1 + ctx->nrwevents, 16);
573: void *mem;
574:
575: if ((mem = REALLOC(ctx->mtype, ctx->fds,
576: new_alloc * sizeof(*ctx->fds))) == NULL)
577: alogf(LOG_ERR, "%s: %m", "realloc");
578: else {
579: ctx->fds = mem;
580: ctx->fds_alloc = new_alloc;
581: }
582: }
583:
584: /* If we were intentionally woken up, read the wakeup byte */
585: if (ctx->notified) {
586: DBG(PEVENT, "ctx %p thread was notified", ctx);
587: (void)read(ctx->pipe[0], &pevent_byte, 1);
588: ctx->notified = 0;
589: }
590:
591: /* Add event for the notify pipe */
592: poll_idx = 0;
593: if (ctx->fds_alloc > 0) {
594: fd = &ctx->fds[poll_idx++];
595: memset(fd, 0, sizeof(*fd));
596: fd->fd = ctx->pipe[0];
597: fd->events = POLLRDNORM;
598: }
599:
600: /* Fill in rest of poll() array */
601: timeout = INFTIM;
602: TAILQ_FOREACH(ev, &ctx->events, next) {
603: switch (ev->type) {
604: case PEVENT_READ:
605: case PEVENT_WRITE:
606: {
607: if (poll_idx >= ctx->fds_alloc) {
608: ev->poll_idx = -1;
609: break;
610: }
611: ev->poll_idx = poll_idx++;
612: fd = &ctx->fds[ev->poll_idx];
613: memset(fd, 0, sizeof(*fd));
614: fd->fd = ev->u.fd;
615: fd->events = (ev->type == PEVENT_READ) ?
616: POLLRDNORM : POLLWRNORM;
617: break;
618: }
619: case PEVENT_TIME:
620: {
621: struct timeval remain;
622: int millis;
623:
624: /* Compute milliseconds until event */
625: if (timercmp(&ev->when, &now, <=))
626: millis = 0;
627: else {
628: timersub(&ev->when, &now, &remain);
629: millis = remain.tv_sec * 1000;
630: millis += remain.tv_usec / 1000;
631: }
632:
633: /* Remember the minimum delay */
634: if (timeout == INFTIM || millis < timeout)
635: timeout = millis;
636: break;
637: }
638: default:
639: break;
640: }
641: }
642:
643: /* Mark non-poll() events that have occurred */
644: TAILQ_FOREACH(ev, &ctx->events, next) {
645: assert(ev->magic == PEVENT_MAGIC);
646: switch (ev->type) {
647: case PEVENT_MESG_PORT:
648: if (mesg_port_qlen(ev->u.port) > 0)
649: PEVENT_SET_OCCURRED(ctx, ev);
650: break;
651: default:
652: break;
653: }
654: if ((ev->flags & PEVENT_OCCURRED) != 0)
655: timeout = 0; /* don't delay */
656: }
657:
658: #if PDEL_DEBUG
659: /* Debugging */
660: DBG(PEVENT, "ctx %p thread event list:", ctx);
661: TAILQ_FOREACH(ev, &ctx->events, next)
662: DBG(PEVENT, "\tev %p", ev);
663: #endif
664:
665: /* Wait for something to happen */
666: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
667: DBG(PEVENT, "ctx %p thread sleeping", ctx);
668: r = poll(ctx->fds, poll_idx, timeout);
669: DBG(PEVENT, "ctx %p thread woke up", ctx);
670: assert(ctx->magic == PEVENT_CTX_MAGIC);
671: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
672:
673: /* Check for errors */
674: if (r == -1 && errno != EINTR) {
675: alogf(LOG_CRIT, "%s: %m", "poll");
676: assert(0);
677: }
678:
679: /* Update current time */
680: gettimeofday(&now, NULL);
681:
682: /* Mark poll() events that have occurred */
683: for (ev = TAILQ_FIRST((&ctx->events)); ev != NULL; ev = next_ev) {
684: next_ev = TAILQ_NEXT(ev, next);
685: assert(ev->magic == PEVENT_MAGIC);
686: switch (ev->type) {
687: case PEVENT_READ:
688: case PEVENT_WRITE:
689: if (ev->poll_idx == -1)
690: break;
691: fd = &ctx->fds[ev->poll_idx];
692: if ((fd->revents & ((ev->type == PEVENT_READ) ?
693: READABLE_EVENTS : WRITABLE_EVENTS)) != 0)
694: PEVENT_SET_OCCURRED(ctx, ev);
695: break;
696: case PEVENT_TIME:
697: if (timercmp(&ev->when, &now, <=))
698: PEVENT_SET_OCCURRED(ctx, ev);
699: break;
700: default:
701: break;
702: }
703: }
704:
705: /* Service all events that are marked as having occurred */
706: while (1) {
707:
708: /* Find next event that needs service */
709: ev = TAILQ_FIRST(&ctx->events);
710: if (ev == NULL || (ev->flags & PEVENT_OCCURRED) == 0)
711: break;
712: DBG(PEVENT, "ctx %p thread servicing ev %p", ctx, ev);
713:
714: /* Detach event and service it while keeping a reference */
715: DBG(PEVENT, "ev %p refs %d -> %d (for servicing)",
716: ev, ev->refs, ev->refs + 1);
717: ev->refs++;
718: PEVENT_DEQUEUE(ctx, ev);
719: pevent_ctx_service(ev);
720: }
721:
722: /* Spin again */
723: DBG(PEVENT, "ctx %p thread spin again", ctx);
724: goto loop;
725:
726: done:;
727: /* Done */
728: pthread_cleanup_pop(1);
729: return (NULL);
730: }
731:
732: /*
733: * Cleanup routine for event thread.
734: */
735: static void
736: pevent_ctx_main_cleanup(void *arg)
737: {
738: struct pevent_ctx *const ctx = arg;
739:
740: DBG(PEVENT, "ctx %p thread cleanup", ctx);
741: ctx->thread = 0;
742: pevent_ctx_unref(ctx); /* release thread's reference */
743: }
744:
745: /*
746: * Service an event.
747: *
748: * This is called with an extra reference on the event which is
749: * removed by pevent_ctx_execute().
750: */
751: static void
752: pevent_ctx_service(struct pevent *ev)
753: {
754: struct pevent_ctx *const ctx = ev->ctx;
755: pthread_t tid;
756:
757: /* Does handler want to run in its own thread? */
758: if ((ev->flags & PEVENT_OWN_THREAD) != 0) {
759: if ((errno = pthread_create(&tid,
760: NULL, pevent_ctx_execute, ev)) != 0) {
761: alogf(LOG_ERR,
762: "can't spawn thread for event %p: %m", ev);
763: pevent_cancel(ev); /* removes the reference */
764: return;
765: }
766: pthread_detach(tid);
767: DBG(PEVENT, "dispatched thread %p for ev %p", tid, ev);
768: return;
769: }
770:
771: /* Execute handler with context unlocked */
772: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
773: pevent_ctx_execute(ev);
774: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
775: }
776:
777: /*
778: * Execute an event handler.
779: *
780: * Grab the user mutex, if any, check if event was canceled,
781: * execute handler, release mutex.
782: *
783: * This is called with an extra reference on the event, which is
784: * removed by this function.
785: *
786: * This should be NOT be called with a lock on the event context.
787: */
788: static void *
789: pevent_ctx_execute(void *arg)
790: {
791: struct pevent *const ev = arg;
792: struct pevent_ctx *const ctx = ev->ctx;
793: int r;
794:
795: /* Sanity check */
796: assert(ev->magic == PEVENT_MAGIC);
797:
798: /* Register cleanup hook */
799: DBG(PEVENT, "ev %p being executed", ev);
800: pthread_cleanup_push(pevent_ctx_execute_cleanup, ev);
801:
802: try_again:
803: /* Acquire context mutex */
804: DBG(PEVENT, "locking ctx %p for ev %p", ctx, ev);
805: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
806:
807: /* Check for cancellation to close the cancel/timeout race */
808: if ((ev->flags & PEVENT_CANCELED) != 0) {
809: DBG(PEVENT, "ev %p was canceled", ev);
810: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
811: goto done;
812: }
813:
814: /*
815: * Acquire user mutex but avoid two potential problems:
816: *
817: * - Race window: other thread could call timer_cancel()
818: * and destroy the user mutex just before we try to
819: * acquire it; we must hold context lock to prevent this.
820: *
821: * - Lock reversal deadlock: other threads acquire the
822: * user lock before the context lock, we do the reverse.
823: *
824: * Once we've acquire both locks, we drop the context lock.
825: */
826: if (ev->mutex != NULL) {
827: MUTEX_TRYLOCK(ev->mutex, ev->mutex_count);
828: if (errno != 0) {
829: if (errno != EBUSY) {
830: alogf(LOG_ERR,
831: "can't lock mutex for event %p: %m", ev);
832: pevent_cancel(ev);
833: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
834: goto done;
835: }
836: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
837: sched_yield();
838: goto try_again;
839: }
840: ev->flags |= PEVENT_GOT_MUTEX;
841: }
842:
843: /* Remove user's event reference (we still have one though) */
844: pevent_cancel(ev);
845:
846: /* Release context mutex */
847: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
848:
849: /* Reschedule a new event if recurring */
850: if ((ev->flags & PEVENT_RECURRING) != 0) {
851: switch (ev->type) {
852: case PEVENT_READ:
853: case PEVENT_WRITE:
854: r = pevent_register(ev->ctx, ev->peventp,
855: (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
856: ev->handler, ev->arg, ev->type, ev->u.fd);
857: break;
858: case PEVENT_TIME:
859: r = pevent_register(ev->ctx, ev->peventp,
860: (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
861: ev->handler, ev->arg, ev->type, ev->u.millis);
862: break;
863: case PEVENT_MESG_PORT:
864: r = pevent_register(ev->ctx, ev->peventp,
865: (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
866: ev->handler, ev->arg, ev->type, ev->u.port);
867: break;
868: case PEVENT_USER:
869: r = pevent_register(ev->ctx, ev->peventp,
870: (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
871: ev->handler, ev->arg, ev->type);
872: break;
873: default:
874: r = -1;
875: assert(0);
876: }
877: if (r == -1) {
878: alogf(LOG_ERR,
879: "can't re-register recurring event %p: %m", ev);
880: }
881: }
882:
883: /* Execute the handler */
884: (*ev->handler)(ev->arg);
885:
886: done:;
887: /* Release user mutex and event reference */
888: pthread_cleanup_pop(1);
889: return (NULL);
890: }
891:
892: /*
893: * Clean up for pevent_ctx_execute()
894: */
895: static void
896: pevent_ctx_execute_cleanup(void *arg)
897: {
898: struct pevent *const ev = arg;
899:
900: DBG(PEVENT, "ev %p cleanup", ev);
901: assert(ev->magic == PEVENT_MAGIC);
902: if ((ev->flags & PEVENT_GOT_MUTEX) != 0)
903: MUTEX_UNLOCK(ev->mutex, ev->mutex_count);
904: _pevent_unref(ev);
905: }
906:
907: /*
908: * Wakeup event thread because the event list has changed.
909: *
910: * This assumes the mutex is locked.
911: */
912: static void
913: pevent_ctx_notify(struct pevent_ctx *ctx)
914: {
915: DBG(PEVENT, "ctx %p being notified", ctx);
916: if (!ctx->notified) {
917: (void)write(ctx->pipe[1], &pevent_byte, 1);
918: ctx->notified = 1;
919: }
920: }
921:
922: /*
923: * Cancel an event (make it so that it never gets triggered) and
924: * remove the user reference to it.
925: *
926: * This assumes the lock is held.
927: */
928: static void
929: pevent_cancel(struct pevent *ev)
930: {
931: DBG(PEVENT, "canceling ev %p", ev);
932: assert((ev->flags & PEVENT_CANCELED) == 0);
933: ev->flags |= PEVENT_CANCELED;
934: *ev->peventp = NULL;
935: _pevent_unref(ev);
936: }
937:
938: /*
939: * Returns true if the event has been canceled.
940: */
941: int
942: _pevent_canceled(struct pevent *ev)
943: {
944: assert(ev->magic == PEVENT_MAGIC);
945: return ((ev->flags & PEVENT_CANCELED) != 0);
946: }
947:
948: /*
949: * Drop a reference on an event.
950: */
951: void
952: _pevent_unref(struct pevent *ev)
953: {
954: struct pevent_ctx *const ctx = ev->ctx;
955:
956: DBG(PEVENT, "ev %p refs %d -> %d", ev, ev->refs, ev->refs - 1);
957: assert(ev->refs > 0);
958: assert(ev->magic == PEVENT_MAGIC);
959: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
960: if (--ev->refs > 0) {
961: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
962: return;
963: }
964: assert((ev->flags & PEVENT_ENQUEUED) == 0);
965: ev->magic = ~0; /* invalidate magic number */
966: DBG(PEVENT, "freeing ev %p", ev);
967: FREE(ctx->mtype, ev);
968: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
969: }
970:
971: /*
972: * Release context lock and drop a context reference.
973: *
974: * This assumes the mutex is locked. Upon return it will be unlocked.
975: */
976: static void
977: pevent_ctx_unref(struct pevent_ctx *ctx)
978: {
979: DBG(PEVENT, "ctx %p refs %d -> %d", ctx, ctx->refs, ctx->refs - 1);
980: assert(ctx->refs > 0);
981: assert(ctx->magic == PEVENT_CTX_MAGIC);
982: if (--ctx->refs > 0) {
983: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
984: return;
985: }
986: assert(TAILQ_EMPTY(&ctx->events));
987: assert(ctx->nevents == 0);
988: assert(ctx->thread == 0);
989: (void)close(ctx->pipe[0]);
990: (void)close(ctx->pipe[1]);
991: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
992: pthread_mutex_destroy(&ctx->mutex);
993: if (ctx->has_attr)
994: pthread_attr_destroy(&ctx->attr);
995: ctx->magic = ~0; /* invalidate magic number */
996: DBG(PEVENT, "freeing ctx %p", ctx);
997: FREE(ctx->mtype, ctx->fds);
998: FREE(ctx->mtype, ctx);
999: }
1000:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>