/* pevent.c -- from the ELWIX (Embedded LightWeight unIX) CVS tree: embedaddon/mpd/src/contrib/libpdel/util */
1.1 misho 1: 2: /* 3: * Copyright (c) 2001-2002 Packet Design, LLC. 4: * All rights reserved. 5: * 6: * Subject to the following obligations and disclaimer of warranty, 7: * use and redistribution of this software, in source or object code 8: * forms, with or without modifications are expressly permitted by 9: * Packet Design; provided, however, that: 10: * 11: * (i) Any and all reproductions of the source or object code 12: * must include the copyright notice above and the following 13: * disclaimer of warranties; and 14: * (ii) No rights are granted, in any manner or form, to use 15: * Packet Design trademarks, including the mark "PACKET DESIGN" 16: * on advertising, endorsements, or otherwise except as such 17: * appears in the above copyright notice or in the software. 18: * 19: * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND 20: * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO 21: * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING 22: * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED 23: * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, 24: * OR NON-INFRINGEMENT. PACKET DESIGN DOES NOT WARRANT, GUARANTEE, 25: * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS 26: * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY, 27: * RELIABILITY OR OTHERWISE. 
IN NO EVENT SHALL PACKET DESIGN BE 28: * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE 29: * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT, 30: * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL 31: * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF 32: * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF 33: * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 34: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF 35: * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF 36: * THE POSSIBILITY OF SUCH DAMAGE. 37: * 38: * Author: Archie Cobbs <archie@freebsd.org> 39: */ 40: 41: #include <sys/types.h> 42: #include <sys/param.h> 43: #include <sys/queue.h> 44: #include <sys/time.h> 45: 46: #include <netinet/in.h> 47: 48: #include <stdarg.h> 49: #include <stdio.h> 50: #include <syslog.h> 51: #include <string.h> 52: #include <errno.h> 53: #include <assert.h> 54: #include <unistd.h> 55: #include <poll.h> 56: #include <sched.h> 57: #include <pthread.h> 58: 59: #include "structs/structs.h" 60: #include "structs/type/array.h" 61: #include "util/typed_mem.h" 62: #include "util/mesg_port.h" 63: #include "util/pevent.h" 64: #ifdef SYSLOG_FACILITY 65: #define alogf(sev, fmt, arg...) syslog(sev, "%s: " fmt, __FUNCTION__ , ## arg) 66: #else 67: #define alogf(sev, fmt, arg...) 
do{}while(0) 68: #endif 69: 70: #include "internal.h" 71: #include "debug/debug.h" 72: 73: #define PEVENT_MAGIC 0x31d7699b 74: #define PEVENT_CTX_MAGIC 0x7842f901 75: 76: /* Private flags */ 77: #define PEVENT_OCCURRED 0x8000 /* event has occurred */ 78: #define PEVENT_CANCELED 0x4000 /* event canceled or done */ 79: #define PEVENT_ENQUEUED 0x2000 /* in the ctx->events queue */ 80: #define PEVENT_GOT_MUTEX 0x1000 /* user mutex acquired */ 81: 82: #define PEVENT_USER_FLAGS (PEVENT_RECURRING | PEVENT_OWN_THREAD) 83: 84: #define READABLE_EVENTS (POLLIN | POLLRDNORM | POLLERR \ 85: | POLLHUP | POLLNVAL) 86: #define WRITABLE_EVENTS (POLLOUT | POLLWRNORM | POLLWRBAND \ 87: | POLLERR | POLLHUP | POLLNVAL) 88: 89: /* Event context */ 90: struct pevent_ctx { 91: u_int32_t magic; /* magic number */ 92: pthread_mutex_t mutex; /* mutex for context */ 93: #if PDEL_DEBUG 94: int mutex_count; /* mutex count */ 95: #endif 96: pthread_attr_t attr; /* event thread attributes */ 97: pthread_t thread; /* event thread */ 98: TAILQ_HEAD(, pevent) events; /* pending event list */ 99: u_int nevents; /* length of 'events' list */ 100: u_int nrwevents; /* number read/write events */ 101: struct pollfd *fds; /* poll(2) fds array */ 102: u_int fds_alloc; /* allocated size of 'fds' */ 103: const char *mtype; /* typed_mem(3) memory type */ 104: char mtype_buf[TYPED_MEM_TYPELEN]; 105: int pipe[2]; /* event thread notify pipe */ 106: u_char notified; /* data in the pipe */ 107: u_char has_attr; /* 'attr' is valid */ 108: u_int refs; /* references to this context */ 109: }; 110: 111: /* Event object */ 112: struct pevent { 113: u_int32_t magic; /* magic number */ 114: struct pevent_ctx *ctx; /* pointer to event context */ 115: struct pevent **peventp; /* user handle to this event */ 116: pevent_handler_t *handler; /* event handler function */ 117: void *arg; /* event handler function arg */ 118: int flags; /* event flags */ 119: int poll_idx; /* index in poll(2) fds array */ 120: pthread_mutex_t 
*mutex; /* user mutex, if any */ 121: #if PDEL_DEBUG 122: int mutex_count; /* mutex count */ 123: #endif 124: enum pevent_type type; /* type of this event */ 125: struct timeval when; /* expiration for time events */ 126: u_int refs; /* references to this event */ 127: union { 128: int fd; /* file descriptor */ 129: int millis; /* time delay */ 130: struct mesg_port *port; /* mesg_port */ 131: } u; 132: TAILQ_ENTRY(pevent) next; /* next in ctx->events */ 133: }; 134: 135: /* Macros */ 136: #define PEVENT_ENQUEUE(ctx, ev) \ 137: do { \ 138: assert(((ev)->flags & PEVENT_ENQUEUED) == 0); \ 139: TAILQ_INSERT_TAIL(&(ctx)->events, (ev), next); \ 140: (ev)->flags |= PEVENT_ENQUEUED; \ 141: (ctx)->nevents++; \ 142: if ((ev)->type == PEVENT_READ \ 143: || (ev)->type == PEVENT_WRITE) \ 144: (ctx)->nrwevents++; \ 145: DBG(PEVENT, "ev %p refs %d -> %d (enqueued)", \ 146: (ev), (ev)->refs, (ev)->refs + 1); \ 147: (ev)->refs++; \ 148: } while (0) 149: 150: #define PEVENT_DEQUEUE(ctx, ev) \ 151: do { \ 152: assert(((ev)->flags & PEVENT_ENQUEUED) != 0); \ 153: TAILQ_REMOVE(&(ctx)->events, (ev), next); \ 154: (ctx)->nevents--; \ 155: if ((ev)->type == PEVENT_READ \ 156: || (ev)->type == PEVENT_WRITE) \ 157: (ctx)->nrwevents--; \ 158: (ev)->flags &= ~PEVENT_ENQUEUED; \ 159: _pevent_unref(ev); \ 160: } while (0) 161: 162: #define PEVENT_SET_OCCURRED(ctx, ev) \ 163: do { \ 164: (ev)->flags |= PEVENT_OCCURRED; \ 165: if ((ev) != TAILQ_FIRST(&ctx->events)) { \ 166: TAILQ_REMOVE(&(ctx)->events, (ev), next); \ 167: TAILQ_INSERT_HEAD(&(ctx)->events, (ev), next); \ 168: } \ 169: } while (0) 170: 171: /* Internal functions */ 172: static void pevent_ctx_service(struct pevent *ev); 173: static void *pevent_ctx_main(void *arg); 174: static void pevent_ctx_main_cleanup(void *arg); 175: static void *pevent_ctx_execute(void *arg); 176: static void pevent_ctx_execute_cleanup(void *arg); 177: static void pevent_ctx_notify(struct pevent_ctx *ctx); 178: static void pevent_ctx_unref(struct pevent_ctx 
*ctx); 179: static void pevent_cancel(struct pevent *ev); 180: 181: /* Internal variables */ 182: static char pevent_byte; 183: 184: /* 185: * Create a new event context. 186: */ 187: struct pevent_ctx * 188: pevent_ctx_create(const char *mtype, const pthread_attr_t *attr) 189: { 190: pthread_mutexattr_t mutexattr; 191: struct pevent_ctx *ctx; 192: int got_mutexattr = 0; 193: int got_mutex = 0; 194: 195: /* Create context object */ 196: if ((ctx = MALLOC(mtype, sizeof(*ctx))) == NULL) 197: return (NULL); 198: memset(ctx, 0, sizeof(*ctx)); 199: if (mtype != NULL) { 200: strlcpy(ctx->mtype_buf, mtype, sizeof(ctx->mtype_buf)); 201: ctx->mtype = ctx->mtype_buf; 202: } 203: TAILQ_INIT(&ctx->events); 204: 205: /* Copy thread attributes */ 206: if (attr != NULL) { 207: struct sched_param param; 208: int value; 209: 210: if ((errno = pthread_attr_init(&ctx->attr)) != 0) 211: goto fail; 212: ctx->has_attr = 1; 1.1.1.2 ! misho 213: pthread_attr_getinheritsched(attr, &value); 1.1 misho 214: pthread_attr_setinheritsched(&ctx->attr, value); 1.1.1.2 ! misho 215: pthread_attr_getschedparam(attr, ¶m); 1.1 misho 216: pthread_attr_setschedparam(&ctx->attr, ¶m); 1.1.1.2 ! misho 217: pthread_attr_getschedpolicy(attr, &value); 1.1 misho 218: pthread_attr_setschedpolicy(&ctx->attr, value); 1.1.1.2 ! 
misho 219: pthread_attr_getscope(attr, &value); 1.1 misho 220: pthread_attr_setscope(&ctx->attr, value); 221: } 222: 223: /* Initialize mutex */ 224: if ((errno = pthread_mutexattr_init(&mutexattr) != 0)) 225: goto fail; 226: got_mutexattr = 1; 227: pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE); 228: if ((errno = pthread_mutex_init(&ctx->mutex, &mutexattr)) != 0) 229: goto fail; 230: got_mutex = 1; 231: 232: /* Initialize notify pipe */ 233: if (pipe(ctx->pipe) == -1) 234: goto fail; 235: 236: /* Finish up */ 237: pthread_mutexattr_destroy(&mutexattr); 238: ctx->magic = PEVENT_CTX_MAGIC; 239: ctx->refs = 1; 240: DBG(PEVENT, "created ctx %p", ctx); 241: return (ctx); 242: 243: fail: 244: /* Clean up after failure */ 245: if (got_mutex) 246: pthread_mutex_destroy(&ctx->mutex); 247: if (got_mutexattr) 248: pthread_mutexattr_destroy(&mutexattr); 249: if (ctx->has_attr) 250: pthread_attr_destroy(&ctx->attr); 251: FREE(mtype, ctx); 252: return (NULL); 253: } 254: 255: /* 256: * Destroy an event context. 257: * 258: * All events are unregistered. 259: */ 260: void 261: pevent_ctx_destroy(struct pevent_ctx **ctxp) 262: { 263: struct pevent_ctx *const ctx = *ctxp; 264: 265: /* Allow NULL context */ 266: if (ctx == NULL) 267: return; 268: *ctxp = NULL; 269: DBG(PEVENT, "destroying ctx %p", ctx); 270: 271: /* Lock context */ 272: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count); 273: 274: /* Sanity check */ 275: assert(ctx->magic == PEVENT_CTX_MAGIC); 276: 277: /* Unregister all events */ 278: while (!TAILQ_EMPTY(&ctx->events)) 279: pevent_unregister(TAILQ_FIRST(&ctx->events)->peventp); 280: 281: /* Decrement context reference count and unlock context */ 282: pevent_ctx_unref(ctx); 283: } 284: 285: /* 286: * Return the number of registered events. 
287: */ 288: u_int 289: pevent_ctx_count(struct pevent_ctx *ctx) 290: { 291: u_int nevents; 292: 293: assert(ctx->magic == PEVENT_CTX_MAGIC); 294: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count); 295: nevents = ctx->nevents; 296: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 297: return (nevents); 298: } 299: 300: /* 301: * Create a new schedule item. 302: */ 303: int 304: pevent_register(struct pevent_ctx *ctx, struct pevent **peventp, int flags, 305: pthread_mutex_t *mutex, pevent_handler_t *handler, void *arg, 306: enum pevent_type type, ...) 307: { 308: struct pevent *ev; 309: va_list args; 310: 311: /* Sanity check */ 312: if (*peventp != NULL) { 313: errno = EBUSY; 314: return (-1); 315: } 316: 317: /* Check flags */ 318: if ((flags & ~PEVENT_USER_FLAGS) != 0) { 319: errno = EINVAL; 320: return (-1); 321: } 322: 323: /* Create new event */ 324: if ((ev = MALLOC(ctx->mtype, sizeof(*ev))) == NULL) 325: return (-1); 326: memset(ev, 0, sizeof(*ev)); 327: ev->magic = PEVENT_MAGIC; 328: ev->ctx = ctx; 329: ev->handler = handler; 330: ev->arg = arg; 331: ev->flags = flags; 332: ev->poll_idx = -1; 333: ev->mutex = mutex; 334: ev->type = type; 335: ev->refs = 1; /* the caller's reference */ 336: DBG(PEVENT, "new ev %p in ctx %p", ev, ctx); 337: 338: /* Add type-specific info */ 339: switch (type) { 340: case PEVENT_READ: 341: case PEVENT_WRITE: 342: va_start(args, type); 343: ev->u.fd = va_arg(args, int); 344: va_end(args); 345: break; 346: case PEVENT_TIME: 347: va_start(args, type); 348: ev->u.millis = va_arg(args, int); 349: va_end(args); 350: if (ev->u.millis < 0) 351: ev->u.millis = 0; 352: gettimeofday(&ev->when, NULL); 353: ev->when.tv_sec += ev->u.millis / 1000; 354: ev->when.tv_usec += (ev->u.millis % 1000) * 1000; 355: if (ev->when.tv_usec > 1000000) { 356: ev->when.tv_sec++; 357: ev->when.tv_usec -= 1000000; 358: } 359: break; 360: case PEVENT_MESG_PORT: 361: va_start(args, type); 362: ev->u.port = va_arg(args, struct mesg_port *); 363: va_end(args); 364: if 
(ev->u.port == NULL) 365: goto invalid; 366: break; 367: case PEVENT_USER: 368: break; 369: default: 370: invalid: 371: errno = EINVAL; 372: _pevent_unref(ev); 373: return (-1); 374: } 375: 376: /* Lock context */ 377: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count); 378: 379: /* Link to related object (if appropriate) */ 380: switch (ev->type) { 381: case PEVENT_MESG_PORT: 382: if (_mesg_port_set_event(ev->u.port, ev) == -1) { 383: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 384: _pevent_unref(ev); 385: return (-1); 386: } 387: DBG(PEVENT, "ev %p refs %d -> %d (mesg port)", 388: ev, ev->refs, ev->refs + 1); 389: ev->refs++; /* the mesg_port's reference */ 390: break; 391: default: 392: break; 393: } 394: 395: /* Start or notify event thread */ 396: if (ctx->thread == 0) { 397: if ((errno = pthread_create(&ctx->thread, 398: ctx->has_attr ? &ctx->attr : NULL, 399: pevent_ctx_main, ctx)) != 0) { 400: ctx->thread = 0; 401: alogf(LOG_ERR, "%s: %m", "pthread_create"); 402: ev->flags |= PEVENT_CANCELED; 403: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 404: _pevent_unref(ev); 405: return (-1); 406: } 407: pthread_detach(ctx->thread); 408: DBG(PEVENT, "created tid %p for ctx %p, refs -> %d", 409: ctx->thread, ctx, ctx->refs + 1); 410: ctx->refs++; /* add reference for thread */ 411: } else 412: pevent_ctx_notify(ctx); 413: 414: /* Caller gets the one reference */ 415: ev->peventp = peventp; 416: *peventp = ev; 417: 418: /* Add event to the pending event list */ 419: PEVENT_ENQUEUE(ctx, ev); 420: 421: /* Unlock context */ 422: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 423: 424: return (0); 425: } 426: 427: /* 428: * Unregister an event. 
429: */ 430: void 431: pevent_unregister(struct pevent **peventp) 432: { 433: struct pevent *const ev = *peventp; 434: struct pevent_ctx *ctx; 435: 436: /* Allow NULL event */ 437: if (ev == NULL) 438: return; 439: ctx = ev->ctx; 440: DBG(PEVENT, "unregister ev %p in ctx %p", ev, ctx); 441: 442: /* Sanity checks */ 443: assert(ev->magic == PEVENT_MAGIC); 444: assert(ctx->magic == PEVENT_CTX_MAGIC); 445: assert(ev->peventp == peventp); 446: 447: /* Lock context */ 448: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count); 449: DBG(PEVENT, "unregister ev %p in ctx %p: lock", ev, ctx); 450: 451: /* Dequeue event if queued */ 452: if ((ev->flags & PEVENT_ENQUEUED) != 0) { 453: PEVENT_DEQUEUE(ctx, ev); 454: pevent_ctx_notify(ctx); /* wakeup thread */ 455: } 456: 457: /* Cancel the event */ 458: pevent_cancel(ev); 459: 460: /* Unlock context */ 461: DBG(PEVENT, "unregister ev %p in ctx %p: unlock", ev, ctx); 462: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 463: } 464: 465: /* 466: * Trigger an event. 467: */ 468: void 469: pevent_trigger(struct pevent *ev) 470: { 471: struct pevent_ctx *ctx; 472: 473: /* Sanity check */ 474: if (ev == NULL) 475: return; 476: 477: /* Get context */ 478: ctx = ev->ctx; 479: DBG(PEVENT, "trigger ev %p in ctx %p", ev, ctx); 480: assert(ev->magic == PEVENT_MAGIC); 481: 482: /* Lock context */ 483: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count); 484: 485: /* If event already canceled, bail */ 486: if ((ev->flags & PEVENT_CANCELED) != 0) 487: goto done; 488: 489: /* Mark event as having occurred */ 490: PEVENT_SET_OCCURRED(ctx, ev); 491: 492: /* Wake up thread if event is still in the queue */ 493: if ((ev->flags & PEVENT_ENQUEUED) != 0) 494: pevent_ctx_notify(ctx); 495: 496: done: 497: /* Unlock context */ 498: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 499: } 500: 501: /* 502: * Get event info. 
503: */ 504: int 505: pevent_get_info(struct pevent *ev, struct pevent_info *info) 506: { 507: if (ev == NULL) { 508: errno = ENXIO; 509: return (-1); 510: } 511: assert(ev->magic == PEVENT_MAGIC); 512: memset(info, 0, sizeof(*info)); 513: info->type = ev->type; 514: switch (ev->type) { 515: case PEVENT_READ: 516: case PEVENT_WRITE: 517: info->u.fd = ev->u.fd; 518: break; 519: case PEVENT_TIME: 520: info->u.millis = ev->u.millis; 521: break; 522: case PEVENT_MESG_PORT: 523: info->u.port = ev->u.port; 524: break; 525: case PEVENT_USER: 526: break; 527: default: 528: errno = EINVAL; 529: return (-1); 530: } 531: return (0); 532: } 533: 534: /* 535: * Event thread entry point. 536: */ 537: static void * 538: pevent_ctx_main(void *arg) 539: { 540: struct pevent_ctx *const ctx = arg; 541: struct timeval now; 542: struct pollfd *fd; 543: struct pevent *ev; 544: struct pevent *next_ev; 1.1.1.2 ! misho 545: unsigned poll_idx; 1.1 misho 546: int timeout; 547: int r; 548: 549: /* Push cleanup hook */ 550: pthread_cleanup_push(pevent_ctx_main_cleanup, ctx); 551: assert(ctx->magic == PEVENT_CTX_MAGIC); 552: 553: /* Lock context */ 554: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count); 555: 556: /* Safety belts */ 557: pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); 558: 559: /* Get current time */ 560: gettimeofday(&now, NULL); 561: DBG(PEVENT, "ctx %p thread starting", ctx); 562: 563: loop: 564: /* Are there any events left? 
*/ 565: if (TAILQ_EMPTY(&ctx->events)) { 566: DBG(PEVENT, "ctx %p thread exiting", ctx); 567: goto done; 568: } 569: 570: /* Make sure ctx->fds array is long enough */ 571: if (ctx->fds_alloc < 1 + ctx->nrwevents) { 572: const u_int new_alloc = roundup(1 + ctx->nrwevents, 16); 573: void *mem; 574: 575: if ((mem = REALLOC(ctx->mtype, ctx->fds, 576: new_alloc * sizeof(*ctx->fds))) == NULL) 577: alogf(LOG_ERR, "%s: %m", "realloc"); 578: else { 579: ctx->fds = mem; 580: ctx->fds_alloc = new_alloc; 581: } 582: } 583: 584: /* If we were intentionally woken up, read the wakeup byte */ 585: if (ctx->notified) { 586: DBG(PEVENT, "ctx %p thread was notified", ctx); 587: (void)read(ctx->pipe[0], &pevent_byte, 1); 588: ctx->notified = 0; 589: } 590: 591: /* Add event for the notify pipe */ 592: poll_idx = 0; 593: if (ctx->fds_alloc > 0) { 594: fd = &ctx->fds[poll_idx++]; 595: memset(fd, 0, sizeof(*fd)); 596: fd->fd = ctx->pipe[0]; 597: fd->events = POLLRDNORM; 598: } 599: 600: /* Fill in rest of poll() array */ 601: timeout = INFTIM; 602: TAILQ_FOREACH(ev, &ctx->events, next) { 603: switch (ev->type) { 604: case PEVENT_READ: 605: case PEVENT_WRITE: 606: { 607: if (poll_idx >= ctx->fds_alloc) { 608: ev->poll_idx = -1; 609: break; 610: } 611: ev->poll_idx = poll_idx++; 612: fd = &ctx->fds[ev->poll_idx]; 613: memset(fd, 0, sizeof(*fd)); 614: fd->fd = ev->u.fd; 615: fd->events = (ev->type == PEVENT_READ) ? 
616: POLLRDNORM : POLLWRNORM; 617: break; 618: } 619: case PEVENT_TIME: 620: { 621: struct timeval remain; 622: int millis; 623: 624: /* Compute milliseconds until event */ 625: if (timercmp(&ev->when, &now, <=)) 626: millis = 0; 627: else { 628: timersub(&ev->when, &now, &remain); 629: millis = remain.tv_sec * 1000; 630: millis += remain.tv_usec / 1000; 631: } 632: 633: /* Remember the minimum delay */ 634: if (timeout == INFTIM || millis < timeout) 635: timeout = millis; 636: break; 637: } 638: default: 639: break; 640: } 641: } 642: 643: /* Mark non-poll() events that have occurred */ 644: TAILQ_FOREACH(ev, &ctx->events, next) { 645: assert(ev->magic == PEVENT_MAGIC); 646: switch (ev->type) { 647: case PEVENT_MESG_PORT: 648: if (mesg_port_qlen(ev->u.port) > 0) 649: PEVENT_SET_OCCURRED(ctx, ev); 650: break; 651: default: 652: break; 653: } 654: if ((ev->flags & PEVENT_OCCURRED) != 0) 655: timeout = 0; /* don't delay */ 656: } 657: 658: #if PDEL_DEBUG 659: /* Debugging */ 660: DBG(PEVENT, "ctx %p thread event list:", ctx); 661: TAILQ_FOREACH(ev, &ctx->events, next) 662: DBG(PEVENT, "\tev %p", ev); 663: #endif 664: 665: /* Wait for something to happen */ 666: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 667: DBG(PEVENT, "ctx %p thread sleeping", ctx); 668: r = poll(ctx->fds, poll_idx, timeout); 669: DBG(PEVENT, "ctx %p thread woke up", ctx); 670: assert(ctx->magic == PEVENT_CTX_MAGIC); 671: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count); 672: 673: /* Check for errors */ 674: if (r == -1 && errno != EINTR) { 675: alogf(LOG_CRIT, "%s: %m", "poll"); 676: assert(0); 677: } 678: 679: /* Update current time */ 680: gettimeofday(&now, NULL); 681: 682: /* Mark poll() events that have occurred */ 683: for (ev = TAILQ_FIRST((&ctx->events)); ev != NULL; ev = next_ev) { 684: next_ev = TAILQ_NEXT(ev, next); 685: assert(ev->magic == PEVENT_MAGIC); 686: switch (ev->type) { 687: case PEVENT_READ: 688: case PEVENT_WRITE: 689: if (ev->poll_idx == -1) 690: break; 691: fd = 
&ctx->fds[ev->poll_idx]; 692: if ((fd->revents & ((ev->type == PEVENT_READ) ? 693: READABLE_EVENTS : WRITABLE_EVENTS)) != 0) 694: PEVENT_SET_OCCURRED(ctx, ev); 695: break; 696: case PEVENT_TIME: 697: if (timercmp(&ev->when, &now, <=)) 698: PEVENT_SET_OCCURRED(ctx, ev); 699: break; 700: default: 701: break; 702: } 703: } 704: 705: /* Service all events that are marked as having occurred */ 706: while (1) { 707: 708: /* Find next event that needs service */ 709: ev = TAILQ_FIRST(&ctx->events); 710: if (ev == NULL || (ev->flags & PEVENT_OCCURRED) == 0) 711: break; 712: DBG(PEVENT, "ctx %p thread servicing ev %p", ctx, ev); 713: 714: /* Detach event and service it while keeping a reference */ 715: DBG(PEVENT, "ev %p refs %d -> %d (for servicing)", 716: ev, ev->refs, ev->refs + 1); 717: ev->refs++; 718: PEVENT_DEQUEUE(ctx, ev); 719: pevent_ctx_service(ev); 720: } 721: 722: /* Spin again */ 723: DBG(PEVENT, "ctx %p thread spin again", ctx); 724: goto loop; 725: 726: done:; 727: /* Done */ 728: pthread_cleanup_pop(1); 729: return (NULL); 730: } 731: 732: /* 733: * Cleanup routine for event thread. 734: */ 735: static void 736: pevent_ctx_main_cleanup(void *arg) 737: { 738: struct pevent_ctx *const ctx = arg; 739: 740: DBG(PEVENT, "ctx %p thread cleanup", ctx); 741: ctx->thread = 0; 742: pevent_ctx_unref(ctx); /* release thread's reference */ 743: } 744: 745: /* 746: * Service an event. 747: * 748: * This is called with an extra reference on the event which is 749: * removed by pevent_ctx_execute(). 750: */ 751: static void 752: pevent_ctx_service(struct pevent *ev) 753: { 754: struct pevent_ctx *const ctx = ev->ctx; 755: pthread_t tid; 756: 757: /* Does handler want to run in its own thread? 
*/ 758: if ((ev->flags & PEVENT_OWN_THREAD) != 0) { 759: if ((errno = pthread_create(&tid, 760: NULL, pevent_ctx_execute, ev)) != 0) { 761: alogf(LOG_ERR, 762: "can't spawn thread for event %p: %m", ev); 763: pevent_cancel(ev); /* removes the reference */ 764: return; 765: } 766: pthread_detach(tid); 767: DBG(PEVENT, "dispatched thread %p for ev %p", tid, ev); 768: return; 769: } 770: 771: /* Execute handler with context unlocked */ 772: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 773: pevent_ctx_execute(ev); 774: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count); 775: } 776: 777: /* 778: * Execute an event handler. 779: * 780: * Grab the user mutex, if any, check if event was canceled, 781: * execute handler, release mutex. 782: * 783: * This is called with an extra reference on the event, which is 784: * removed by this function. 785: * 786: * This should be NOT be called with a lock on the event context. 787: */ 788: static void * 789: pevent_ctx_execute(void *arg) 790: { 791: struct pevent *const ev = arg; 792: struct pevent_ctx *const ctx = ev->ctx; 793: int r; 794: 795: /* Sanity check */ 796: assert(ev->magic == PEVENT_MAGIC); 797: 798: /* Register cleanup hook */ 799: DBG(PEVENT, "ev %p being executed", ev); 800: pthread_cleanup_push(pevent_ctx_execute_cleanup, ev); 801: 802: try_again: 803: /* Acquire context mutex */ 804: DBG(PEVENT, "locking ctx %p for ev %p", ctx, ev); 805: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count); 806: 807: /* Check for cancellation to close the cancel/timeout race */ 808: if ((ev->flags & PEVENT_CANCELED) != 0) { 809: DBG(PEVENT, "ev %p was canceled", ev); 810: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 811: goto done; 812: } 813: 814: /* 815: * Acquire user mutex but avoid two potential problems: 816: * 817: * - Race window: other thread could call timer_cancel() 818: * and destroy the user mutex just before we try to 819: * acquire it; we must hold context lock to prevent this. 
820: * 821: * - Lock reversal deadlock: other threads acquire the 822: * user lock before the context lock, we do the reverse. 823: * 824: * Once we've acquire both locks, we drop the context lock. 825: */ 826: if (ev->mutex != NULL) { 827: MUTEX_TRYLOCK(ev->mutex, ev->mutex_count); 828: if (errno != 0) { 829: if (errno != EBUSY) { 830: alogf(LOG_ERR, 831: "can't lock mutex for event %p: %m", ev); 832: pevent_cancel(ev); 833: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 834: goto done; 835: } 836: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 837: sched_yield(); 838: goto try_again; 839: } 840: ev->flags |= PEVENT_GOT_MUTEX; 841: } 842: 843: /* Remove user's event reference (we still have one though) */ 844: pevent_cancel(ev); 845: 846: /* Release context mutex */ 847: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 848: 849: /* Reschedule a new event if recurring */ 850: if ((ev->flags & PEVENT_RECURRING) != 0) { 851: switch (ev->type) { 852: case PEVENT_READ: 853: case PEVENT_WRITE: 854: r = pevent_register(ev->ctx, ev->peventp, 855: (ev->flags & PEVENT_USER_FLAGS), ev->mutex, 856: ev->handler, ev->arg, ev->type, ev->u.fd); 857: break; 858: case PEVENT_TIME: 859: r = pevent_register(ev->ctx, ev->peventp, 860: (ev->flags & PEVENT_USER_FLAGS), ev->mutex, 861: ev->handler, ev->arg, ev->type, ev->u.millis); 862: break; 863: case PEVENT_MESG_PORT: 864: r = pevent_register(ev->ctx, ev->peventp, 865: (ev->flags & PEVENT_USER_FLAGS), ev->mutex, 866: ev->handler, ev->arg, ev->type, ev->u.port); 867: break; 868: case PEVENT_USER: 869: r = pevent_register(ev->ctx, ev->peventp, 870: (ev->flags & PEVENT_USER_FLAGS), ev->mutex, 871: ev->handler, ev->arg, ev->type); 872: break; 873: default: 874: r = -1; 875: assert(0); 876: } 877: if (r == -1) { 878: alogf(LOG_ERR, 879: "can't re-register recurring event %p: %m", ev); 880: } 881: } 882: 883: /* Execute the handler */ 884: (*ev->handler)(ev->arg); 885: 886: done:; 887: /* Release user mutex and event reference */ 888: 
pthread_cleanup_pop(1); 889: return (NULL); 890: } 891: 892: /* 893: * Clean up for pevent_ctx_execute() 894: */ 895: static void 896: pevent_ctx_execute_cleanup(void *arg) 897: { 898: struct pevent *const ev = arg; 899: 900: DBG(PEVENT, "ev %p cleanup", ev); 901: assert(ev->magic == PEVENT_MAGIC); 902: if ((ev->flags & PEVENT_GOT_MUTEX) != 0) 903: MUTEX_UNLOCK(ev->mutex, ev->mutex_count); 904: _pevent_unref(ev); 905: } 906: 907: /* 908: * Wakeup event thread because the event list has changed. 909: * 910: * This assumes the mutex is locked. 911: */ 912: static void 913: pevent_ctx_notify(struct pevent_ctx *ctx) 914: { 915: DBG(PEVENT, "ctx %p being notified", ctx); 916: if (!ctx->notified) { 917: (void)write(ctx->pipe[1], &pevent_byte, 1); 918: ctx->notified = 1; 919: } 920: } 921: 922: /* 923: * Cancel an event (make it so that it never gets triggered) and 924: * remove the user reference to it. 925: * 926: * This assumes the lock is held. 927: */ 928: static void 929: pevent_cancel(struct pevent *ev) 930: { 931: DBG(PEVENT, "canceling ev %p", ev); 932: assert((ev->flags & PEVENT_CANCELED) == 0); 933: ev->flags |= PEVENT_CANCELED; 934: *ev->peventp = NULL; 935: _pevent_unref(ev); 936: } 937: 938: /* 939: * Returns true if the event has been canceled. 940: */ 941: int 942: _pevent_canceled(struct pevent *ev) 943: { 944: assert(ev->magic == PEVENT_MAGIC); 945: return ((ev->flags & PEVENT_CANCELED) != 0); 946: } 947: 948: /* 949: * Drop a reference on an event. 
950: */ 951: void 952: _pevent_unref(struct pevent *ev) 953: { 954: struct pevent_ctx *const ctx = ev->ctx; 955: 956: DBG(PEVENT, "ev %p refs %d -> %d", ev, ev->refs, ev->refs - 1); 957: assert(ev->refs > 0); 958: assert(ev->magic == PEVENT_MAGIC); 959: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count); 960: if (--ev->refs > 0) { 961: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 962: return; 963: } 964: assert((ev->flags & PEVENT_ENQUEUED) == 0); 965: ev->magic = ~0; /* invalidate magic number */ 966: DBG(PEVENT, "freeing ev %p", ev); 967: FREE(ctx->mtype, ev); 968: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 969: } 970: 971: /* 972: * Release context lock and drop a context reference. 973: * 974: * This assumes the mutex is locked. Upon return it will be unlocked. 975: */ 976: static void 977: pevent_ctx_unref(struct pevent_ctx *ctx) 978: { 979: DBG(PEVENT, "ctx %p refs %d -> %d", ctx, ctx->refs, ctx->refs - 1); 980: assert(ctx->refs > 0); 981: assert(ctx->magic == PEVENT_CTX_MAGIC); 982: if (--ctx->refs > 0) { 983: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 984: return; 985: } 986: assert(TAILQ_EMPTY(&ctx->events)); 987: assert(ctx->nevents == 0); 988: assert(ctx->thread == 0); 989: (void)close(ctx->pipe[0]); 990: (void)close(ctx->pipe[1]); 991: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count); 992: pthread_mutex_destroy(&ctx->mutex); 993: if (ctx->has_attr) 994: pthread_attr_destroy(&ctx->attr); 995: ctx->magic = ~0; /* invalidate magic number */ 996: DBG(PEVENT, "freeing ctx %p", ctx); 997: FREE(ctx->mtype, ctx->fds); 998: FREE(ctx->mtype, ctx); 999: } 1000: