Annotation of embedaddon/libpdel/util/pevent.c, revision 1.1
1.1 ! misho 1:
! 2: /*
! 3: * Copyright (c) 2001-2002 Packet Design, LLC.
! 4: * All rights reserved.
! 5: *
! 6: * Subject to the following obligations and disclaimer of warranty,
! 7: * use and redistribution of this software, in source or object code
! 8: * forms, with or without modifications are expressly permitted by
! 9: * Packet Design; provided, however, that:
! 10: *
! 11: * (i) Any and all reproductions of the source or object code
! 12: * must include the copyright notice above and the following
! 13: * disclaimer of warranties; and
! 14: * (ii) No rights are granted, in any manner or form, to use
! 15: * Packet Design trademarks, including the mark "PACKET DESIGN"
! 16: * on advertising, endorsements, or otherwise except as such
! 17: * appears in the above copyright notice or in the software.
! 18: *
! 19: * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND
! 20: * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO
! 21: * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING
! 22: * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED
! 23: * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
! 24: * OR NON-INFRINGEMENT. PACKET DESIGN DOES NOT WARRANT, GUARANTEE,
! 25: * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS
! 26: * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY,
! 27: * RELIABILITY OR OTHERWISE. IN NO EVENT SHALL PACKET DESIGN BE
! 28: * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE
! 29: * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT,
! 30: * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL
! 31: * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF
! 32: * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF
! 33: * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
! 34: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
! 35: * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF
! 36: * THE POSSIBILITY OF SUCH DAMAGE.
! 37: *
! 38: * Author: Archie Cobbs <archie@freebsd.org>
! 39: */
! 40:
! 41: #include <sys/types.h>
! 42: #include <sys/param.h>
! 43: #include <sys/queue.h>
! 44: #include <sys/time.h>
! 45:
! 46: #include <netinet/in.h>
! 47:
! 48: #include <stdarg.h>
! 49: #include <stdio.h>
! 50: #include <syslog.h>
! 51: #include <string.h>
! 52: #include <errno.h>
! 53: #include <assert.h>
! 54: #include <unistd.h>
! 55: #include <poll.h>
! 56: #include <sched.h>
! 57: #include <pthread.h>
! 58:
! 59: #include "structs/structs.h"
! 60: #include "structs/type/array.h"
! 61: #include "util/typed_mem.h"
! 62: #include "util/mesg_port.h"
! 63: #include "util/pevent.h"
! 64: #include "sys/alog.h"
! 65:
! 66: #include "internal.h"
! 67: #include "debug/debug.h"
! 68:
/* Magic numbers stored in live objects and verified by assert() */
#define PEVENT_MAGIC		0x31d7699b	/* struct pevent */
#define PEVENT_CTX_MAGIC	0x7842f901	/* struct pevent_ctx */

/* Private flag bits, kept in ev->flags next to the user flags */
#define PEVENT_OCCURRED		0x8000		/* event has occurred */
#define PEVENT_CANCELED		0x4000		/* event canceled or done */
#define PEVENT_ENQUEUED		0x2000		/* linked into ctx->events */
#define PEVENT_GOT_MUTEX	0x1000		/* user mutex was acquired */

/* The only flag bits accepted by pevent_register() */
#define PEVENT_USER_FLAGS	(PEVENT_RECURRING | PEVENT_OWN_THREAD)

/* poll(2) 'revents' bits that count as "readable" or "writable" */
#define READABLE_EVENTS		(POLLIN | POLLRDNORM | POLLERR		\
				    | POLLHUP | POLLNVAL)
#define WRITABLE_EVENTS		(POLLOUT | POLLWRNORM | POLLWRBAND	\
				    | POLLERR | POLLHUP | POLLNVAL)
! 84:
! 85: /* Event context */
! 86: struct pevent_ctx {
! 87: u_int32_t magic; /* magic number */
! 88: pthread_mutex_t mutex; /* mutex for context */
! 89: #if PDEL_DEBUG
! 90: int mutex_count; /* mutex count */
! 91: #endif
! 92: pthread_attr_t attr; /* event thread attributes */
! 93: pthread_t thread; /* event thread */
! 94: TAILQ_HEAD(, pevent) events; /* pending event list */
! 95: u_int nevents; /* length of 'events' list */
! 96: u_int nrwevents; /* number read/write events */
! 97: struct pollfd *fds; /* poll(2) fds array */
! 98: u_int fds_alloc; /* allocated size of 'fds' */
! 99: const char *mtype; /* typed_mem(3) memory type */
! 100: char mtype_buf[TYPED_MEM_TYPELEN];
! 101: int pipe[2]; /* event thread notify pipe */
! 102: u_char notified; /* data in the pipe */
! 103: u_char has_attr; /* 'attr' is valid */
! 104: u_int refs; /* references to this context */
! 105: };
! 106:
! 107: /* Event object */
! 108: struct pevent {
! 109: u_int32_t magic; /* magic number */
! 110: struct pevent_ctx *ctx; /* pointer to event context */
! 111: struct pevent **peventp; /* user handle to this event */
! 112: pevent_handler_t *handler; /* event handler function */
! 113: void *arg; /* event handler function arg */
! 114: int flags; /* event flags */
! 115: int poll_idx; /* index in poll(2) fds array */
! 116: pthread_mutex_t *mutex; /* user mutex, if any */
! 117: #if PDEL_DEBUG
! 118: int mutex_count; /* mutex count */
! 119: #endif
! 120: enum pevent_type type; /* type of this event */
! 121: struct timeval when; /* expiration for time events */
! 122: u_int refs; /* references to this event */
! 123: union {
! 124: int fd; /* file descriptor */
! 125: int millis; /* time delay */
! 126: struct mesg_port *port; /* mesg_port */
! 127: } u;
! 128: TAILQ_ENTRY(pevent) next; /* next in ctx->events */
! 129: };
! 130:
/*
 * List manipulation macros. All of them expect the context mutex
 * to be held by the caller.
 */

/*
 * Append an event to the context's pending list. The list takes its
 * own reference on the event and the read/write counter is updated.
 */
#define PEVENT_ENQUEUE(ctx, ev)						\
	do {								\
		assert(((ev)->flags & PEVENT_ENQUEUED) == 0);		\
		TAILQ_INSERT_TAIL(&(ctx)->events, (ev), next);		\
		(ev)->flags |= PEVENT_ENQUEUED;				\
		(ctx)->nevents++;					\
		if ((ev)->type == PEVENT_READ				\
		    || (ev)->type == PEVENT_WRITE)			\
			(ctx)->nrwevents++;				\
		DBG(PEVENT, "ev %p refs %d -> %d (enqueued)",		\
		    (ev), (ev)->refs, (ev)->refs + 1);			\
		(ev)->refs++;						\
	} while (0)

/*
 * Remove an event from the context's pending list and drop the
 * reference that the list was holding.
 */
#define PEVENT_DEQUEUE(ctx, ev)						\
	do {								\
		assert(((ev)->flags & PEVENT_ENQUEUED) != 0);		\
		TAILQ_REMOVE(&(ctx)->events, (ev), next);		\
		(ctx)->nevents--;					\
		if ((ev)->type == PEVENT_READ				\
		    || (ev)->type == PEVENT_WRITE)			\
			(ctx)->nrwevents--;				\
		(ev)->flags &= ~PEVENT_ENQUEUED;			\
		_pevent_unref(ev);					\
	} while (0)

/*
 * Flag an event as having occurred and, if it is linked into the
 * pending list, move it to the front so occurred events are found
 * first. An event that is not currently enqueued is only flagged:
 * unlinking it again would corrupt the list.
 */
#define PEVENT_SET_OCCURRED(ctx, ev)					\
	do {								\
		(ev)->flags |= PEVENT_OCCURRED;				\
		if (((ev)->flags & PEVENT_ENQUEUED) != 0		\
		    && (ev) != TAILQ_FIRST(&(ctx)->events)) {		\
			TAILQ_REMOVE(&(ctx)->events, (ev), next);	\
			TAILQ_INSERT_HEAD(&(ctx)->events, (ev), next);	\
		}							\
	} while (0)
! 166:
/* Event thread: entry point and its cleanup hook */
static void	*pevent_ctx_main(void *arg);
static void	pevent_ctx_main_cleanup(void *arg);

/* Event dispatch: hand-off, handler execution and its cleanup hook */
static void	pevent_ctx_service(struct pevent *ev);
static void	*pevent_ctx_execute(void *arg);
static void	pevent_ctx_execute_cleanup(void *arg);

/* Context and event housekeeping */
static void	pevent_ctx_notify(struct pevent_ctx *ctx);
static void	pevent_ctx_unref(struct pevent_ctx *ctx);
static void	pevent_cancel(struct pevent *ev);

/* Buffer for the single byte written to and read from the notify pipe */
static char	pevent_byte;
! 179:
! 180: /*
! 181: * Create a new event context.
! 182: */
! 183: struct pevent_ctx *
! 184: pevent_ctx_create(const char *mtype, const pthread_attr_t *attr)
! 185: {
! 186: pthread_mutexattr_t mutexattr;
! 187: struct pevent_ctx *ctx;
! 188: int got_mutexattr = 0;
! 189: int got_mutex = 0;
! 190:
! 191: /* Create context object */
! 192: if ((ctx = MALLOC(mtype, sizeof(*ctx))) == NULL)
! 193: return (NULL);
! 194: memset(ctx, 0, sizeof(*ctx));
! 195: if (mtype != NULL) {
! 196: strlcpy(ctx->mtype_buf, mtype, sizeof(ctx->mtype_buf));
! 197: ctx->mtype = ctx->mtype_buf;
! 198: }
! 199: TAILQ_INIT(&ctx->events);
! 200:
! 201: /* Copy thread attributes */
! 202: if (attr != NULL) {
! 203: struct sched_param param;
! 204: int value;
! 205:
! 206: if ((errno = pthread_attr_init(&ctx->attr)) != 0)
! 207: goto fail;
! 208: ctx->has_attr = 1;
! 209: pthread_attr_getinheritsched(attr, &value);
! 210: pthread_attr_setinheritsched(&ctx->attr, value);
! 211: pthread_attr_getschedparam(attr, ¶m);
! 212: pthread_attr_setschedparam(&ctx->attr, ¶m);
! 213: pthread_attr_getschedpolicy(attr, &value);
! 214: pthread_attr_setschedpolicy(&ctx->attr, value);
! 215: pthread_attr_getscope(attr, &value);
! 216: pthread_attr_setscope(&ctx->attr, value);
! 217: }
! 218:
! 219: /* Initialize mutex */
! 220: if ((errno = pthread_mutexattr_init(&mutexattr) != 0))
! 221: goto fail;
! 222: got_mutexattr = 1;
! 223: pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
! 224: if ((errno = pthread_mutex_init(&ctx->mutex, &mutexattr)) != 0)
! 225: goto fail;
! 226: got_mutex = 1;
! 227:
! 228: /* Initialize notify pipe */
! 229: if (pipe(ctx->pipe) == -1)
! 230: goto fail;
! 231:
! 232: /* Finish up */
! 233: pthread_mutexattr_destroy(&mutexattr);
! 234: ctx->magic = PEVENT_CTX_MAGIC;
! 235: ctx->refs = 1;
! 236: DBG(PEVENT, "created ctx %p", ctx);
! 237: return (ctx);
! 238:
! 239: fail:
! 240: /* Clean up after failure */
! 241: if (got_mutex)
! 242: pthread_mutex_destroy(&ctx->mutex);
! 243: if (got_mutexattr)
! 244: pthread_mutexattr_destroy(&mutexattr);
! 245: if (ctx->has_attr)
! 246: pthread_attr_destroy(&ctx->attr);
! 247: FREE(mtype, ctx);
! 248: return (NULL);
! 249: }
! 250:
! 251: /*
! 252: * Destroy an event context.
! 253: *
! 254: * All events are unregistered.
! 255: */
! 256: void
! 257: pevent_ctx_destroy(struct pevent_ctx **ctxp)
! 258: {
! 259: struct pevent_ctx *const ctx = *ctxp;
! 260:
! 261: /* Allow NULL context */
! 262: if (ctx == NULL)
! 263: return;
! 264: *ctxp = NULL;
! 265: DBG(PEVENT, "destroying ctx %p", ctx);
! 266:
! 267: /* Lock context */
! 268: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
! 269:
! 270: /* Sanity check */
! 271: assert(ctx->magic == PEVENT_CTX_MAGIC);
! 272:
! 273: /* Unregister all events */
! 274: while (!TAILQ_EMPTY(&ctx->events))
! 275: pevent_unregister(TAILQ_FIRST(&ctx->events)->peventp);
! 276:
! 277: /* Decrement context reference count and unlock context */
! 278: pevent_ctx_unref(ctx);
! 279: }
! 280:
! 281: /*
! 282: * Return the number of registered events.
! 283: */
! 284: u_int
! 285: pevent_ctx_count(struct pevent_ctx *ctx)
! 286: {
! 287: u_int nevents;
! 288:
! 289: assert(ctx->magic == PEVENT_CTX_MAGIC);
! 290: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
! 291: nevents = ctx->nevents;
! 292: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 293: return (nevents);
! 294: }
! 295:
! 296: /*
! 297: * Create a new schedule item.
! 298: */
! 299: int
! 300: pevent_register(struct pevent_ctx *ctx, struct pevent **peventp, int flags,
! 301: pthread_mutex_t *mutex, pevent_handler_t *handler, void *arg,
! 302: enum pevent_type type, ...)
! 303: {
! 304: struct pevent *ev;
! 305: va_list args;
! 306:
! 307: /* Sanity check */
! 308: if (*peventp != NULL) {
! 309: errno = EBUSY;
! 310: return (-1);
! 311: }
! 312:
! 313: /* Check flags */
! 314: if ((flags & ~PEVENT_USER_FLAGS) != 0) {
! 315: errno = EINVAL;
! 316: return (-1);
! 317: }
! 318:
! 319: /* Create new event */
! 320: if ((ev = MALLOC(ctx->mtype, sizeof(*ev))) == NULL)
! 321: return (-1);
! 322: memset(ev, 0, sizeof(*ev));
! 323: ev->magic = PEVENT_MAGIC;
! 324: ev->ctx = ctx;
! 325: ev->handler = handler;
! 326: ev->arg = arg;
! 327: ev->flags = flags;
! 328: ev->poll_idx = -1;
! 329: ev->mutex = mutex;
! 330: ev->type = type;
! 331: ev->refs = 1; /* the caller's reference */
! 332: DBG(PEVENT, "new ev %p in ctx %p", ev, ctx);
! 333:
! 334: /* Add type-specific info */
! 335: switch (type) {
! 336: case PEVENT_READ:
! 337: case PEVENT_WRITE:
! 338: va_start(args, type);
! 339: ev->u.fd = va_arg(args, int);
! 340: va_end(args);
! 341: break;
! 342: case PEVENT_TIME:
! 343: va_start(args, type);
! 344: ev->u.millis = va_arg(args, int);
! 345: va_end(args);
! 346: if (ev->u.millis < 0)
! 347: ev->u.millis = 0;
! 348: gettimeofday(&ev->when, NULL);
! 349: ev->when.tv_sec += ev->u.millis / 1000;
! 350: ev->when.tv_usec += (ev->u.millis % 1000) * 1000;
! 351: if (ev->when.tv_usec > 1000000) {
! 352: ev->when.tv_sec++;
! 353: ev->when.tv_usec -= 1000000;
! 354: }
! 355: break;
! 356: case PEVENT_MESG_PORT:
! 357: va_start(args, type);
! 358: ev->u.port = va_arg(args, struct mesg_port *);
! 359: va_end(args);
! 360: if (ev->u.port == NULL)
! 361: goto invalid;
! 362: break;
! 363: case PEVENT_USER:
! 364: break;
! 365: default:
! 366: invalid:
! 367: errno = EINVAL;
! 368: _pevent_unref(ev);
! 369: return (-1);
! 370: }
! 371:
! 372: /* Lock context */
! 373: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
! 374:
! 375: /* Link to related object (if appropriate) */
! 376: switch (ev->type) {
! 377: case PEVENT_MESG_PORT:
! 378: if (_mesg_port_set_event(ev->u.port, ev) == -1) {
! 379: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 380: _pevent_unref(ev);
! 381: return (-1);
! 382: }
! 383: DBG(PEVENT, "ev %p refs %d -> %d (mesg port)",
! 384: ev, ev->refs, ev->refs + 1);
! 385: ev->refs++; /* the mesg_port's reference */
! 386: break;
! 387: default:
! 388: break;
! 389: }
! 390:
! 391: /* Start or notify event thread */
! 392: if (ctx->thread == 0) {
! 393: if ((errno = pthread_create(&ctx->thread,
! 394: ctx->has_attr ? &ctx->attr : NULL,
! 395: pevent_ctx_main, ctx)) != 0) {
! 396: ctx->thread = 0;
! 397: alogf(LOG_ERR, "%s: %m", "pthread_create");
! 398: ev->flags |= PEVENT_CANCELED;
! 399: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 400: _pevent_unref(ev);
! 401: return (-1);
! 402: }
! 403: pthread_detach(ctx->thread);
! 404: DBG(PEVENT, "created tid %p for ctx %p, refs -> %d",
! 405: ctx->thread, ctx, ctx->refs + 1);
! 406: ctx->refs++; /* add reference for thread */
! 407: } else
! 408: pevent_ctx_notify(ctx);
! 409:
! 410: /* Caller gets the one reference */
! 411: ev->peventp = peventp;
! 412: *peventp = ev;
! 413:
! 414: /* Add event to the pending event list */
! 415: PEVENT_ENQUEUE(ctx, ev);
! 416:
! 417: /* Unlock context */
! 418: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 419:
! 420: return (0);
! 421: }
! 422:
! 423: /*
! 424: * Unregister an event.
! 425: */
! 426: void
! 427: pevent_unregister(struct pevent **peventp)
! 428: {
! 429: struct pevent *const ev = *peventp;
! 430: struct pevent_ctx *ctx;
! 431:
! 432: /* Allow NULL event */
! 433: if (ev == NULL)
! 434: return;
! 435: ctx = ev->ctx;
! 436: DBG(PEVENT, "unregister ev %p in ctx %p", ev, ctx);
! 437:
! 438: /* Sanity checks */
! 439: assert(ev->magic == PEVENT_MAGIC);
! 440: assert(ctx->magic == PEVENT_CTX_MAGIC);
! 441: assert(ev->peventp == peventp);
! 442:
! 443: /* Lock context */
! 444: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
! 445: DBG(PEVENT, "unregister ev %p in ctx %p: lock", ev, ctx);
! 446:
! 447: /* Dequeue event if queued */
! 448: if ((ev->flags & PEVENT_ENQUEUED) != 0) {
! 449: PEVENT_DEQUEUE(ctx, ev);
! 450: pevent_ctx_notify(ctx); /* wakeup thread */
! 451: }
! 452:
! 453: /* Cancel the event */
! 454: pevent_cancel(ev);
! 455:
! 456: /* Unlock context */
! 457: DBG(PEVENT, "unregister ev %p in ctx %p: unlock", ev, ctx);
! 458: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 459: }
! 460:
! 461: /*
! 462: * Trigger an event.
! 463: */
! 464: void
! 465: pevent_trigger(struct pevent *ev)
! 466: {
! 467: struct pevent_ctx *ctx;
! 468:
! 469: /* Sanity check */
! 470: if (ev == NULL)
! 471: return;
! 472:
! 473: /* Get context */
! 474: ctx = ev->ctx;
! 475: DBG(PEVENT, "trigger ev %p in ctx %p", ev, ctx);
! 476: assert(ev->magic == PEVENT_MAGIC);
! 477:
! 478: /* Lock context */
! 479: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
! 480:
! 481: /* If event already canceled, bail */
! 482: if ((ev->flags & PEVENT_CANCELED) != 0)
! 483: goto done;
! 484:
! 485: /* Mark event as having occurred */
! 486: PEVENT_SET_OCCURRED(ctx, ev);
! 487:
! 488: /* Wake up thread if event is still in the queue */
! 489: if ((ev->flags & PEVENT_ENQUEUED) != 0)
! 490: pevent_ctx_notify(ctx);
! 491:
! 492: done:
! 493: /* Unlock context */
! 494: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 495: }
! 496:
! 497: /*
! 498: * Get event info.
! 499: */
! 500: int
! 501: pevent_get_info(struct pevent *ev, struct pevent_info *info)
! 502: {
! 503: if (ev == NULL) {
! 504: errno = ENXIO;
! 505: return (-1);
! 506: }
! 507: assert(ev->magic == PEVENT_MAGIC);
! 508: memset(info, 0, sizeof(*info));
! 509: info->type = ev->type;
! 510: switch (ev->type) {
! 511: case PEVENT_READ:
! 512: case PEVENT_WRITE:
! 513: info->u.fd = ev->u.fd;
! 514: break;
! 515: case PEVENT_TIME:
! 516: info->u.millis = ev->u.millis;
! 517: break;
! 518: case PEVENT_MESG_PORT:
! 519: info->u.port = ev->u.port;
! 520: break;
! 521: case PEVENT_USER:
! 522: break;
! 523: default:
! 524: errno = EINVAL;
! 525: return (-1);
! 526: }
! 527: return (0);
! 528: }
! 529:
! 530: /*
! 531: * Event thread entry point.
! 532: */
! 533: static void *
! 534: pevent_ctx_main(void *arg)
! 535: {
! 536: struct pevent_ctx *const ctx = arg;
! 537: struct timeval now;
! 538: struct pollfd *fd;
! 539: struct pevent *ev;
! 540: struct pevent *next_ev;
! 541: int poll_idx;
! 542: int timeout;
! 543: int r;
! 544:
! 545: /* Push cleanup hook */
! 546: pthread_cleanup_push(pevent_ctx_main_cleanup, ctx);
! 547: assert(ctx->magic == PEVENT_CTX_MAGIC);
! 548:
! 549: /* Lock context */
! 550: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
! 551:
! 552: /* Safety belts */
! 553: pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
! 554:
! 555: /* Get current time */
! 556: gettimeofday(&now, NULL);
! 557: DBG(PEVENT, "ctx %p thread starting", ctx);
! 558:
! 559: loop:
! 560: /* Are there any events left? */
! 561: if (TAILQ_EMPTY(&ctx->events)) {
! 562: DBG(PEVENT, "ctx %p thread exiting", ctx);
! 563: goto done;
! 564: }
! 565:
! 566: /* Make sure ctx->fds array is long enough */
! 567: if (ctx->fds_alloc < 1 + ctx->nrwevents) {
! 568: const u_int new_alloc = roundup(1 + ctx->nrwevents, 16);
! 569: void *mem;
! 570:
! 571: if ((mem = REALLOC(ctx->mtype, ctx->fds,
! 572: new_alloc * sizeof(*ctx->fds))) == NULL)
! 573: alogf(LOG_ERR, "%s: %m", "realloc");
! 574: else {
! 575: ctx->fds = mem;
! 576: ctx->fds_alloc = new_alloc;
! 577: }
! 578: }
! 579:
! 580: /* If we were intentionally woken up, read the wakeup byte */
! 581: if (ctx->notified) {
! 582: DBG(PEVENT, "ctx %p thread was notified", ctx);
! 583: (void)read(ctx->pipe[0], &pevent_byte, 1);
! 584: ctx->notified = 0;
! 585: }
! 586:
! 587: /* Add event for the notify pipe */
! 588: poll_idx = 0;
! 589: if (ctx->fds_alloc > 0) {
! 590: fd = &ctx->fds[poll_idx++];
! 591: memset(fd, 0, sizeof(*fd));
! 592: fd->fd = ctx->pipe[0];
! 593: fd->events = POLLRDNORM;
! 594: }
! 595:
! 596: /* Fill in rest of poll() array */
! 597: timeout = INFTIM;
! 598: TAILQ_FOREACH(ev, &ctx->events, next) {
! 599: switch (ev->type) {
! 600: case PEVENT_READ:
! 601: case PEVENT_WRITE:
! 602: {
! 603: if (poll_idx >= ctx->fds_alloc) {
! 604: ev->poll_idx = -1;
! 605: break;
! 606: }
! 607: ev->poll_idx = poll_idx++;
! 608: fd = &ctx->fds[ev->poll_idx];
! 609: memset(fd, 0, sizeof(*fd));
! 610: fd->fd = ev->u.fd;
! 611: fd->events = (ev->type == PEVENT_READ) ?
! 612: POLLRDNORM : POLLWRNORM;
! 613: break;
! 614: }
! 615: case PEVENT_TIME:
! 616: {
! 617: struct timeval remain;
! 618: int millis;
! 619:
! 620: /* Compute milliseconds until event */
! 621: if (timercmp(&ev->when, &now, <=))
! 622: millis = 0;
! 623: else {
! 624: timersub(&ev->when, &now, &remain);
! 625: millis = remain.tv_sec * 1000;
! 626: millis += remain.tv_usec / 1000;
! 627: }
! 628:
! 629: /* Remember the minimum delay */
! 630: if (timeout == INFTIM || millis < timeout)
! 631: timeout = millis;
! 632: break;
! 633: }
! 634: default:
! 635: break;
! 636: }
! 637: }
! 638:
! 639: /* Mark non-poll() events that have occurred */
! 640: TAILQ_FOREACH(ev, &ctx->events, next) {
! 641: assert(ev->magic == PEVENT_MAGIC);
! 642: switch (ev->type) {
! 643: case PEVENT_MESG_PORT:
! 644: if (mesg_port_qlen(ev->u.port) > 0)
! 645: PEVENT_SET_OCCURRED(ctx, ev);
! 646: break;
! 647: default:
! 648: break;
! 649: }
! 650: if ((ev->flags & PEVENT_OCCURRED) != 0)
! 651: timeout = 0; /* don't delay */
! 652: }
! 653:
! 654: #if PDEL_DEBUG
! 655: /* Debugging */
! 656: DBG(PEVENT, "ctx %p thread event list:", ctx);
! 657: TAILQ_FOREACH(ev, &ctx->events, next)
! 658: DBG(PEVENT, "\tev %p", ev);
! 659: #endif
! 660:
! 661: /* Wait for something to happen */
! 662: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 663: DBG(PEVENT, "ctx %p thread sleeping", ctx);
! 664: r = poll(ctx->fds, poll_idx, timeout);
! 665: DBG(PEVENT, "ctx %p thread woke up", ctx);
! 666: assert(ctx->magic == PEVENT_CTX_MAGIC);
! 667: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
! 668:
! 669: /* Check for errors */
! 670: if (r == -1 && errno != EINTR) {
! 671: alogf(LOG_CRIT, "%s: %m", "poll");
! 672: assert(0);
! 673: }
! 674:
! 675: /* Update current time */
! 676: gettimeofday(&now, NULL);
! 677:
! 678: /* Mark poll() events that have occurred */
! 679: for (ev = TAILQ_FIRST((&ctx->events)); ev != NULL; ev = next_ev) {
! 680: next_ev = TAILQ_NEXT(ev, next);
! 681: assert(ev->magic == PEVENT_MAGIC);
! 682: switch (ev->type) {
! 683: case PEVENT_READ:
! 684: case PEVENT_WRITE:
! 685: if (ev->poll_idx == -1)
! 686: break;
! 687: fd = &ctx->fds[ev->poll_idx];
! 688: if ((fd->revents & ((ev->type == PEVENT_READ) ?
! 689: READABLE_EVENTS : WRITABLE_EVENTS)) != 0)
! 690: PEVENT_SET_OCCURRED(ctx, ev);
! 691: break;
! 692: case PEVENT_TIME:
! 693: if (timercmp(&ev->when, &now, <=))
! 694: PEVENT_SET_OCCURRED(ctx, ev);
! 695: break;
! 696: default:
! 697: break;
! 698: }
! 699: }
! 700:
! 701: /* Service all events that are marked as having occurred */
! 702: while (1) {
! 703:
! 704: /* Find next event that needs service */
! 705: ev = TAILQ_FIRST(&ctx->events);
! 706: if (ev == NULL || (ev->flags & PEVENT_OCCURRED) == 0)
! 707: break;
! 708: DBG(PEVENT, "ctx %p thread servicing ev %p", ctx, ev);
! 709:
! 710: /* Detach event and service it while keeping a reference */
! 711: DBG(PEVENT, "ev %p refs %d -> %d (for servicing)",
! 712: ev, ev->refs, ev->refs + 1);
! 713: ev->refs++;
! 714: PEVENT_DEQUEUE(ctx, ev);
! 715: pevent_ctx_service(ev);
! 716: }
! 717:
! 718: /* Spin again */
! 719: DBG(PEVENT, "ctx %p thread spin again", ctx);
! 720: goto loop;
! 721:
! 722: done:;
! 723: /* Done */
! 724: pthread_cleanup_pop(1);
! 725: return (NULL);
! 726: }
! 727:
! 728: /*
! 729: * Cleanup routine for event thread.
! 730: */
! 731: static void
! 732: pevent_ctx_main_cleanup(void *arg)
! 733: {
! 734: struct pevent_ctx *const ctx = arg;
! 735:
! 736: DBG(PEVENT, "ctx %p thread cleanup", ctx);
! 737: ctx->thread = 0;
! 738: pevent_ctx_unref(ctx); /* release thread's reference */
! 739: }
! 740:
! 741: /*
! 742: * Service an event.
! 743: *
! 744: * This is called with an extra reference on the event which is
! 745: * removed by pevent_ctx_execute().
! 746: */
! 747: static void
! 748: pevent_ctx_service(struct pevent *ev)
! 749: {
! 750: struct pevent_ctx *const ctx = ev->ctx;
! 751: pthread_t tid;
! 752:
! 753: /* Does handler want to run in its own thread? */
! 754: if ((ev->flags & PEVENT_OWN_THREAD) != 0) {
! 755: if ((errno = pthread_create(&tid,
! 756: NULL, pevent_ctx_execute, ev)) != 0) {
! 757: alogf(LOG_ERR,
! 758: "can't spawn thread for event %p: %m", ev);
! 759: pevent_cancel(ev); /* removes the reference */
! 760: return;
! 761: }
! 762: pthread_detach(tid);
! 763: DBG(PEVENT, "dispatched thread %p for ev %p", tid, ev);
! 764: return;
! 765: }
! 766:
! 767: /* Execute handler with context unlocked */
! 768: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 769: pevent_ctx_execute(ev);
! 770: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
! 771: }
! 772:
! 773: /*
! 774: * Execute an event handler.
! 775: *
! 776: * Grab the user mutex, if any, check if event was canceled,
! 777: * execute handler, release mutex.
! 778: *
! 779: * This is called with an extra reference on the event, which is
! 780: * removed by this function.
! 781: *
! 782: * This should be NOT be called with a lock on the event context.
! 783: */
! 784: static void *
! 785: pevent_ctx_execute(void *arg)
! 786: {
! 787: struct pevent *const ev = arg;
! 788: struct pevent_ctx *const ctx = ev->ctx;
! 789: int r;
! 790:
! 791: /* Sanity check */
! 792: assert(ev->magic == PEVENT_MAGIC);
! 793:
! 794: /* Register cleanup hook */
! 795: DBG(PEVENT, "ev %p being executed", ev);
! 796: pthread_cleanup_push(pevent_ctx_execute_cleanup, ev);
! 797:
! 798: try_again:
! 799: /* Acquire context mutex */
! 800: DBG(PEVENT, "locking ctx %p for ev %p", ctx, ev);
! 801: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
! 802:
! 803: /* Check for cancellation to close the cancel/timeout race */
! 804: if ((ev->flags & PEVENT_CANCELED) != 0) {
! 805: DBG(PEVENT, "ev %p was canceled", ev);
! 806: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 807: goto done;
! 808: }
! 809:
! 810: /*
! 811: * Acquire user mutex but avoid two potential problems:
! 812: *
! 813: * - Race window: other thread could call timer_cancel()
! 814: * and destroy the user mutex just before we try to
! 815: * acquire it; we must hold context lock to prevent this.
! 816: *
! 817: * - Lock reversal deadlock: other threads acquire the
! 818: * user lock before the context lock, we do the reverse.
! 819: *
! 820: * Once we've acquire both locks, we drop the context lock.
! 821: */
! 822: if (ev->mutex != NULL) {
! 823: MUTEX_TRYLOCK(ev->mutex, ev->mutex_count);
! 824: if (errno != 0) {
! 825: if (errno != EBUSY) {
! 826: alogf(LOG_ERR,
! 827: "can't lock mutex for event %p: %m", ev);
! 828: pevent_cancel(ev);
! 829: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 830: goto done;
! 831: }
! 832: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 833: sched_yield();
! 834: goto try_again;
! 835: }
! 836: ev->flags |= PEVENT_GOT_MUTEX;
! 837: }
! 838:
! 839: /* Remove user's event reference (we still have one though) */
! 840: pevent_cancel(ev);
! 841:
! 842: /* Release context mutex */
! 843: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 844:
! 845: /* Reschedule a new event if recurring */
! 846: if ((ev->flags & PEVENT_RECURRING) != 0) {
! 847: switch (ev->type) {
! 848: case PEVENT_READ:
! 849: case PEVENT_WRITE:
! 850: r = pevent_register(ev->ctx, ev->peventp,
! 851: (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
! 852: ev->handler, ev->arg, ev->type, ev->u.fd);
! 853: break;
! 854: case PEVENT_TIME:
! 855: r = pevent_register(ev->ctx, ev->peventp,
! 856: (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
! 857: ev->handler, ev->arg, ev->type, ev->u.millis);
! 858: break;
! 859: case PEVENT_MESG_PORT:
! 860: r = pevent_register(ev->ctx, ev->peventp,
! 861: (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
! 862: ev->handler, ev->arg, ev->type, ev->u.port);
! 863: break;
! 864: case PEVENT_USER:
! 865: r = pevent_register(ev->ctx, ev->peventp,
! 866: (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
! 867: ev->handler, ev->arg, ev->type);
! 868: break;
! 869: default:
! 870: r = -1;
! 871: assert(0);
! 872: }
! 873: if (r == -1) {
! 874: alogf(LOG_ERR,
! 875: "can't re-register recurring event %p: %m", ev);
! 876: }
! 877: }
! 878:
! 879: /* Execute the handler */
! 880: (*ev->handler)(ev->arg);
! 881:
! 882: done:;
! 883: /* Release user mutex and event reference */
! 884: pthread_cleanup_pop(1);
! 885: return (NULL);
! 886: }
! 887:
! 888: /*
! 889: * Clean up for pevent_ctx_execute()
! 890: */
! 891: static void
! 892: pevent_ctx_execute_cleanup(void *arg)
! 893: {
! 894: struct pevent *const ev = arg;
! 895:
! 896: DBG(PEVENT, "ev %p cleanup", ev);
! 897: assert(ev->magic == PEVENT_MAGIC);
! 898: if ((ev->flags & PEVENT_GOT_MUTEX) != 0)
! 899: MUTEX_UNLOCK(ev->mutex, ev->mutex_count);
! 900: _pevent_unref(ev);
! 901: }
! 902:
! 903: /*
! 904: * Wakeup event thread because the event list has changed.
! 905: *
! 906: * This assumes the mutex is locked.
! 907: */
! 908: static void
! 909: pevent_ctx_notify(struct pevent_ctx *ctx)
! 910: {
! 911: DBG(PEVENT, "ctx %p being notified", ctx);
! 912: if (!ctx->notified) {
! 913: (void)write(ctx->pipe[1], &pevent_byte, 1);
! 914: ctx->notified = 1;
! 915: }
! 916: }
! 917:
! 918: /*
! 919: * Cancel an event (make it so that it never gets triggered) and
! 920: * remove the user reference to it.
! 921: *
! 922: * This assumes the lock is held.
! 923: */
! 924: static void
! 925: pevent_cancel(struct pevent *ev)
! 926: {
! 927: DBG(PEVENT, "canceling ev %p", ev);
! 928: assert((ev->flags & PEVENT_CANCELED) == 0);
! 929: ev->flags |= PEVENT_CANCELED;
! 930: *ev->peventp = NULL;
! 931: _pevent_unref(ev);
! 932: }
! 933:
! 934: /*
! 935: * Returns true if the event has been canceled.
! 936: */
! 937: int
! 938: _pevent_canceled(struct pevent *ev)
! 939: {
! 940: assert(ev->magic == PEVENT_MAGIC);
! 941: return ((ev->flags & PEVENT_CANCELED) != 0);
! 942: }
! 943:
! 944: /*
! 945: * Drop a reference on an event.
! 946: */
! 947: void
! 948: _pevent_unref(struct pevent *ev)
! 949: {
! 950: struct pevent_ctx *const ctx = ev->ctx;
! 951:
! 952: DBG(PEVENT, "ev %p refs %d -> %d", ev, ev->refs, ev->refs - 1);
! 953: assert(ev->refs > 0);
! 954: assert(ev->magic == PEVENT_MAGIC);
! 955: MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
! 956: if (--ev->refs > 0) {
! 957: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 958: return;
! 959: }
! 960: assert((ev->flags & PEVENT_ENQUEUED) == 0);
! 961: ev->magic = ~0; /* invalidate magic number */
! 962: DBG(PEVENT, "freeing ev %p", ev);
! 963: FREE(ctx->mtype, ev);
! 964: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 965: }
! 966:
! 967: /*
! 968: * Release context lock and drop a context reference.
! 969: *
! 970: * This assumes the mutex is locked. Upon return it will be unlocked.
! 971: */
! 972: static void
! 973: pevent_ctx_unref(struct pevent_ctx *ctx)
! 974: {
! 975: DBG(PEVENT, "ctx %p refs %d -> %d", ctx, ctx->refs, ctx->refs - 1);
! 976: assert(ctx->refs > 0);
! 977: assert(ctx->magic == PEVENT_CTX_MAGIC);
! 978: if (--ctx->refs > 0) {
! 979: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 980: return;
! 981: }
! 982: assert(TAILQ_EMPTY(&ctx->events));
! 983: assert(ctx->nevents == 0);
! 984: assert(ctx->thread == 0);
! 985: (void)close(ctx->pipe[0]);
! 986: (void)close(ctx->pipe[1]);
! 987: MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
! 988: pthread_mutex_destroy(&ctx->mutex);
! 989: if (ctx->has_attr)
! 990: pthread_attr_destroy(&ctx->attr);
! 991: ctx->magic = ~0; /* invalidate magic number */
! 992: DBG(PEVENT, "freeing ctx %p", ctx);
! 993: FREE(ctx->mtype, ctx->fds);
! 994: FREE(ctx->mtype, ctx);
! 995: }
! 996:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>