Annotation of embedaddon/libpdel/util/pevent.c, revision 1.1.1.1

1.1       misho       1: 
                      2: /*
                      3:  * Copyright (c) 2001-2002 Packet Design, LLC.
                      4:  * All rights reserved.
                      5:  * 
                      6:  * Subject to the following obligations and disclaimer of warranty,
                      7:  * use and redistribution of this software, in source or object code
                      8:  * forms, with or without modifications are expressly permitted by
                      9:  * Packet Design; provided, however, that:
                     10:  * 
                     11:  *    (i)  Any and all reproductions of the source or object code
                     12:  *         must include the copyright notice above and the following
                     13:  *         disclaimer of warranties; and
                     14:  *    (ii) No rights are granted, in any manner or form, to use
                     15:  *         Packet Design trademarks, including the mark "PACKET DESIGN"
                     16:  *         on advertising, endorsements, or otherwise except as such
                     17:  *         appears in the above copyright notice or in the software.
                     18:  * 
                     19:  * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND
                     20:  * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO
                     21:  * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING
                     22:  * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED
                     23:  * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
                     24:  * OR NON-INFRINGEMENT.  PACKET DESIGN DOES NOT WARRANT, GUARANTEE,
                     25:  * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS
                     26:  * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY,
                     27:  * RELIABILITY OR OTHERWISE.  IN NO EVENT SHALL PACKET DESIGN BE
                     28:  * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE
                     29:  * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT,
                     30:  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL
                     31:  * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF
                     32:  * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF
                     33:  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
                     34:  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
                     35:  * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF
                     36:  * THE POSSIBILITY OF SUCH DAMAGE.
                     37:  *
                     38:  * Author: Archie Cobbs <archie@freebsd.org>
                     39:  */
                     40: 
                     41: #include <sys/types.h>
                     42: #include <sys/param.h>
                     43: #include <sys/queue.h>
                     44: #include <sys/time.h>
                     45: 
                     46: #include <netinet/in.h>
                     47: 
                     48: #include <stdarg.h>
                     49: #include <stdio.h>
                     50: #include <syslog.h>
                     51: #include <string.h>
                     52: #include <errno.h>
                     53: #include <assert.h>
                     54: #include <unistd.h>
                     55: #include <poll.h>
                     56: #include <sched.h>
                     57: #include <pthread.h>
                     58: 
                     59: #include "structs/structs.h"
                     60: #include "structs/type/array.h"
                     61: #include "util/typed_mem.h"
                     62: #include "util/mesg_port.h"
                     63: #include "util/pevent.h"
                     64: #include "sys/alog.h"
                     65: 
                     66: #include "internal.h"
                     67: #include "debug/debug.h"
                     68: 
                     69: #define PEVENT_MAGIC           0x31d7699b
                     70: #define PEVENT_CTX_MAGIC       0x7842f901
                     71: 
                     72: /* Private flags */
                     73: #define PEVENT_OCCURRED                0x8000          /* event has occurred */
                     74: #define PEVENT_CANCELED                0x4000          /* event canceled or done */
                     75: #define PEVENT_ENQUEUED                0x2000          /* in the ctx->events queue */
                     76: #define PEVENT_GOT_MUTEX       0x1000          /* user mutex acquired */
                     77: 
                     78: #define PEVENT_USER_FLAGS      (PEVENT_RECURRING | PEVENT_OWN_THREAD)
                     79: 
                     80: #define READABLE_EVENTS                (POLLIN | POLLRDNORM | POLLERR \
                     81:                                    | POLLHUP | POLLNVAL)
                     82: #define WRITABLE_EVENTS                (POLLOUT | POLLWRNORM | POLLWRBAND \
                     83:                                    | POLLERR | POLLHUP | POLLNVAL)
                     84: 
                     85: /* Event context */
                     86: struct pevent_ctx {
                     87:        u_int32_t               magic;          /* magic number */
                     88:        pthread_mutex_t         mutex;          /* mutex for context */
                     89: #if PDEL_DEBUG
                     90:        int                     mutex_count;    /* mutex count */
                     91: #endif
                     92:        pthread_attr_t          attr;           /* event thread attributes */
                     93:        pthread_t               thread;         /* event thread */
                     94:        TAILQ_HEAD(, pevent)    events;         /* pending event list */
                     95:        u_int                   nevents;        /* length of 'events' list */
                     96:        u_int                   nrwevents;      /* number read/write events */
                     97:        struct pollfd           *fds;           /* poll(2) fds array */
                     98:        u_int                   fds_alloc;      /* allocated size of 'fds' */
                     99:        const char              *mtype;         /* typed_mem(3) memory type */
                    100:        char                    mtype_buf[TYPED_MEM_TYPELEN];
                    101:        int                     pipe[2];        /* event thread notify pipe */
                    102:        u_char                  notified;       /* data in the pipe */
                    103:        u_char                  has_attr;       /* 'attr' is valid */
                    104:        u_int                   refs;           /* references to this context */
                    105: };
                    106: 
                    107: /* Event object */
                    108: struct pevent {
                    109:        u_int32_t               magic;          /* magic number */
                    110:        struct pevent_ctx       *ctx;           /* pointer to event context */
                    111:        struct pevent           **peventp;      /* user handle to this event */
                    112:        pevent_handler_t        *handler;       /* event handler function */
                    113:        void                    *arg;           /* event handler function arg */
                    114:        int                     flags;          /* event flags */
                    115:        int                     poll_idx;       /* index in poll(2) fds array */
                    116:        pthread_mutex_t         *mutex;         /* user mutex, if any */
                    117: #if PDEL_DEBUG
                    118:        int                     mutex_count;    /* mutex count */
                    119: #endif
                    120:        enum pevent_type        type;           /* type of this event */
                    121:        struct timeval          when;           /* expiration for time events */
                    122:        u_int                   refs;           /* references to this event */
                    123:        union {
                    124:                int             fd;             /* file descriptor */
                    125:                int             millis;         /* time delay */
                    126:                struct mesg_port *port;         /* mesg_port */
                    127:        }                       u;
                    128:        TAILQ_ENTRY(pevent)     next;           /* next in ctx->events */
                    129: };
                    130: 
                    131: /* Macros */
                    132: #define PEVENT_ENQUEUE(ctx, ev)                                                \
                    133:        do {                                                            \
                    134:                assert(((ev)->flags & PEVENT_ENQUEUED) == 0);           \
                    135:                TAILQ_INSERT_TAIL(&(ctx)->events, (ev), next);          \
                    136:                (ev)->flags |= PEVENT_ENQUEUED;                         \
                    137:                (ctx)->nevents++;                                       \
                    138:                if ((ev)->type == PEVENT_READ                           \
                    139:                    || (ev)->type == PEVENT_WRITE)                      \
                    140:                        (ctx)->nrwevents++;                             \
                    141:                DBG(PEVENT, "ev %p refs %d -> %d (enqueued)",           \
                    142:                    (ev), (ev)->refs, (ev)->refs + 1);                  \
                    143:                (ev)->refs++;                                           \
                    144:        } while (0)
                    145: 
                    146: #define PEVENT_DEQUEUE(ctx, ev)                                                \
                    147:        do {                                                            \
                    148:                assert(((ev)->flags & PEVENT_ENQUEUED) != 0);           \
                    149:                TAILQ_REMOVE(&(ctx)->events, (ev), next);               \
                    150:                (ctx)->nevents--;                                       \
                    151:                if ((ev)->type == PEVENT_READ                           \
                    152:                    || (ev)->type == PEVENT_WRITE)                      \
                    153:                        (ctx)->nrwevents--;                             \
                    154:                (ev)->flags &= ~PEVENT_ENQUEUED;                        \
                    155:                _pevent_unref(ev);                                      \
                    156:        } while (0)
                    157: 
                    158: #define PEVENT_SET_OCCURRED(ctx, ev)                                   \
                    159:        do {                                                            \
                    160:                (ev)->flags |= PEVENT_OCCURRED;                         \
                    161:                if ((ev) != TAILQ_FIRST(&ctx->events)) {                \
                    162:                        TAILQ_REMOVE(&(ctx)->events, (ev), next);       \
                    163:                        TAILQ_INSERT_HEAD(&(ctx)->events, (ev), next);  \
                    164:                }                                                       \
                    165:        } while (0)
                    166: 
                    167: /* Internal functions */
                    168: static void    pevent_ctx_service(struct pevent *ev);
                    169: static void    *pevent_ctx_main(void *arg);
                    170: static void    pevent_ctx_main_cleanup(void *arg);
                    171: static void    *pevent_ctx_execute(void *arg);
                    172: static void    pevent_ctx_execute_cleanup(void *arg);
                    173: static void    pevent_ctx_notify(struct pevent_ctx *ctx);
                    174: static void    pevent_ctx_unref(struct pevent_ctx *ctx);
                    175: static void    pevent_cancel(struct pevent *ev);
                    176: 
                    177: /* Internal variables */
                    178: static char    pevent_byte;
                    179: 
                    180: /*
                    181:  * Create a new event context.
                    182:  */
                    183: struct pevent_ctx *
                    184: pevent_ctx_create(const char *mtype, const pthread_attr_t *attr)
                    185: {
                    186:        pthread_mutexattr_t mutexattr;
                    187:        struct pevent_ctx *ctx;
                    188:        int got_mutexattr = 0;
                    189:        int got_mutex = 0;
                    190: 
                    191:        /* Create context object */
                    192:        if ((ctx = MALLOC(mtype, sizeof(*ctx))) == NULL)
                    193:                return (NULL);
                    194:        memset(ctx, 0, sizeof(*ctx));
                    195:        if (mtype != NULL) {
                    196:                strlcpy(ctx->mtype_buf, mtype, sizeof(ctx->mtype_buf));
                    197:                ctx->mtype = ctx->mtype_buf;
                    198:        }
                    199:        TAILQ_INIT(&ctx->events);
                    200: 
                    201:        /* Copy thread attributes */
                    202:        if (attr != NULL) {
                    203:                struct sched_param param;
                    204:                int value;
                    205: 
                    206:                if ((errno = pthread_attr_init(&ctx->attr)) != 0)
                    207:                        goto fail;
                    208:                ctx->has_attr = 1;
                    209:                pthread_attr_getinheritsched(attr, &value);
                    210:                pthread_attr_setinheritsched(&ctx->attr, value);
                    211:                pthread_attr_getschedparam(attr, &param);
                    212:                pthread_attr_setschedparam(&ctx->attr, &param);
                    213:                pthread_attr_getschedpolicy(attr, &value);
                    214:                pthread_attr_setschedpolicy(&ctx->attr, value);
                    215:                pthread_attr_getscope(attr, &value);
                    216:                pthread_attr_setscope(&ctx->attr, value);
                    217:        }
                    218: 
                    219:        /* Initialize mutex */
                    220:        if ((errno = pthread_mutexattr_init(&mutexattr) != 0))
                    221:                goto fail;
                    222:        got_mutexattr = 1;
                    223:        pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
                    224:        if ((errno = pthread_mutex_init(&ctx->mutex, &mutexattr)) != 0)
                    225:                goto fail;
                    226:        got_mutex = 1;
                    227: 
                    228:        /* Initialize notify pipe */
                    229:        if (pipe(ctx->pipe) == -1)
                    230:                goto fail;
                    231: 
                    232:        /* Finish up */
                    233:        pthread_mutexattr_destroy(&mutexattr);
                    234:        ctx->magic = PEVENT_CTX_MAGIC;
                    235:        ctx->refs = 1;
                    236:        DBG(PEVENT, "created ctx %p", ctx);
                    237:        return (ctx);
                    238: 
                    239: fail:
                    240:        /* Clean up after failure */
                    241:        if (got_mutex)
                    242:                pthread_mutex_destroy(&ctx->mutex);
                    243:        if (got_mutexattr)
                    244:                pthread_mutexattr_destroy(&mutexattr);
                    245:        if (ctx->has_attr)
                    246:                pthread_attr_destroy(&ctx->attr);
                    247:        FREE(mtype, ctx);
                    248:        return (NULL);
                    249: }
                    250: 
                    251: /*
                    252:  * Destroy an event context.
                    253:  *
                    254:  * All events are unregistered.
                    255:  */
                    256: void
                    257: pevent_ctx_destroy(struct pevent_ctx **ctxp)
                    258: {
                    259:        struct pevent_ctx *const ctx = *ctxp;
                    260: 
                    261:        /* Allow NULL context */
                    262:        if (ctx == NULL)
                    263:                return;
                    264:        *ctxp = NULL;
                    265:        DBG(PEVENT, "destroying ctx %p", ctx);
                    266: 
                    267:        /* Lock context */
                    268:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    269: 
                    270:        /* Sanity check */
                    271:        assert(ctx->magic == PEVENT_CTX_MAGIC);
                    272: 
                    273:        /* Unregister all events */
                    274:        while (!TAILQ_EMPTY(&ctx->events))
                    275:                pevent_unregister(TAILQ_FIRST(&ctx->events)->peventp);
                    276: 
                    277:        /* Decrement context reference count and unlock context */
                    278:        pevent_ctx_unref(ctx);
                    279: }
                    280: 
                    281: /*
                    282:  * Return the number of registered events.
                    283:  */
                    284: u_int
                    285: pevent_ctx_count(struct pevent_ctx *ctx)
                    286: {
                    287:        u_int nevents;
                    288: 
                    289:        assert(ctx->magic == PEVENT_CTX_MAGIC);
                    290:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    291:        nevents = ctx->nevents;
                    292:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    293:        return (nevents);
                    294: }
                    295: 
                    296: /*
                    297:  * Create a new schedule item.
                    298:  */
                    299: int
                    300: pevent_register(struct pevent_ctx *ctx, struct pevent **peventp, int flags,
                    301:        pthread_mutex_t *mutex, pevent_handler_t *handler, void *arg,
                    302:        enum pevent_type type, ...)
                    303: {
                    304:        struct pevent *ev;
                    305:        va_list args;
                    306: 
                    307:        /* Sanity check */
                    308:        if (*peventp != NULL) {
                    309:                errno = EBUSY;
                    310:                return (-1);
                    311:        }
                    312: 
                    313:        /* Check flags */
                    314:        if ((flags & ~PEVENT_USER_FLAGS) != 0) {
                    315:                errno = EINVAL;
                    316:                return (-1);
                    317:        }
                    318: 
                    319:        /* Create new event */
                    320:        if ((ev = MALLOC(ctx->mtype, sizeof(*ev))) == NULL)
                    321:                return (-1);
                    322:        memset(ev, 0, sizeof(*ev));
                    323:        ev->magic = PEVENT_MAGIC;
                    324:        ev->ctx = ctx;
                    325:        ev->handler = handler;
                    326:        ev->arg = arg;
                    327:        ev->flags = flags;
                    328:        ev->poll_idx = -1;
                    329:        ev->mutex = mutex;
                    330:        ev->type = type;
                    331:        ev->refs = 1;                           /* the caller's reference */
                    332:        DBG(PEVENT, "new ev %p in ctx %p", ev, ctx);
                    333: 
                    334:        /* Add type-specific info */
                    335:        switch (type) {
                    336:        case PEVENT_READ:
                    337:        case PEVENT_WRITE:
                    338:                va_start(args, type);
                    339:                ev->u.fd = va_arg(args, int);
                    340:                va_end(args);
                    341:                break;
                    342:        case PEVENT_TIME:
                    343:                va_start(args, type);
                    344:                ev->u.millis = va_arg(args, int);
                    345:                va_end(args);
                    346:                if (ev->u.millis < 0)
                    347:                        ev->u.millis = 0;
                    348:                gettimeofday(&ev->when, NULL);
                    349:                ev->when.tv_sec += ev->u.millis / 1000;
                    350:                ev->when.tv_usec += (ev->u.millis % 1000) * 1000;
                    351:                if (ev->when.tv_usec > 1000000) {
                    352:                        ev->when.tv_sec++;
                    353:                        ev->when.tv_usec -= 1000000;
                    354:                }
                    355:                break;
                    356:        case PEVENT_MESG_PORT:
                    357:                va_start(args, type);
                    358:                ev->u.port = va_arg(args, struct mesg_port *);
                    359:                va_end(args);
                    360:                if (ev->u.port == NULL)
                    361:                        goto invalid;
                    362:                break;
                    363:        case PEVENT_USER:
                    364:                break;
                    365:        default:
                    366:        invalid:
                    367:                errno = EINVAL;
                    368:                _pevent_unref(ev);
                    369:                return (-1);
                    370:        }
                    371: 
                    372:        /* Lock context */
                    373:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    374: 
                    375:        /* Link to related object (if appropriate) */
                    376:        switch (ev->type) {
                    377:        case PEVENT_MESG_PORT:
                    378:                if (_mesg_port_set_event(ev->u.port, ev) == -1) {
                    379:                        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    380:                        _pevent_unref(ev);
                    381:                        return (-1);
                    382:                }
                    383:                DBG(PEVENT, "ev %p refs %d -> %d (mesg port)",
                    384:                    ev, ev->refs, ev->refs + 1);
                    385:                ev->refs++;                     /* the mesg_port's reference */
                    386:                break;
                    387:        default:
                    388:                break;
                    389:        }
                    390: 
                    391:        /* Start or notify event thread */
                    392:        if (ctx->thread == 0) {
                    393:                if ((errno = pthread_create(&ctx->thread,
                    394:                    ctx->has_attr ? &ctx->attr : NULL,
                    395:                    pevent_ctx_main, ctx)) != 0) {
                    396:                        ctx->thread = 0;
                    397:                        alogf(LOG_ERR, "%s: %m", "pthread_create");
                    398:                        ev->flags |= PEVENT_CANCELED;
                    399:                        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    400:                        _pevent_unref(ev);
                    401:                        return (-1);
                    402:                }
                    403:                pthread_detach(ctx->thread);
                    404:                DBG(PEVENT, "created tid %p for ctx %p, refs -> %d",
                    405:                    ctx->thread, ctx, ctx->refs + 1);
                    406:                ctx->refs++;                    /* add reference for thread */
                    407:        } else
                    408:                pevent_ctx_notify(ctx);
                    409: 
                    410:        /* Caller gets the one reference */
                    411:        ev->peventp = peventp;
                    412:        *peventp = ev;
                    413: 
                    414:        /* Add event to the pending event list */
                    415:        PEVENT_ENQUEUE(ctx, ev);
                    416: 
                    417:        /* Unlock context */
                    418:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    419: 
                    420:        return (0);
                    421: }
                    422: 
                    423: /*
                    424:  * Unregister an event.
                    425:  */
                    426: void
                    427: pevent_unregister(struct pevent **peventp)
                    428: {
                    429:        struct pevent *const ev = *peventp;
                    430:        struct pevent_ctx *ctx;
                    431: 
                    432:        /* Allow NULL event */
                    433:        if (ev == NULL)
                    434:                return;
                    435:        ctx = ev->ctx;
                    436:        DBG(PEVENT, "unregister ev %p in ctx %p", ev, ctx);
                    437: 
                    438:        /* Sanity checks */
                    439:        assert(ev->magic == PEVENT_MAGIC);
                    440:        assert(ctx->magic == PEVENT_CTX_MAGIC);
                    441:        assert(ev->peventp == peventp);
                    442: 
                    443:        /* Lock context */
                    444:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    445:        DBG(PEVENT, "unregister ev %p in ctx %p: lock", ev, ctx);
                    446: 
                    447:        /* Dequeue event if queued */
                    448:        if ((ev->flags & PEVENT_ENQUEUED) != 0) {
                    449:                PEVENT_DEQUEUE(ctx, ev);
                    450:                pevent_ctx_notify(ctx);                 /* wakeup thread */
                    451:        }
                    452: 
                    453:        /* Cancel the event */
                    454:        pevent_cancel(ev);
                    455: 
                    456:        /* Unlock context */
                    457:        DBG(PEVENT, "unregister ev %p in ctx %p: unlock", ev, ctx);
                    458:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    459: }
                    460: 
                    461: /*
                    462:  * Trigger an event.
                    463:  */
                    464: void
                    465: pevent_trigger(struct pevent *ev)
                    466: {
                    467:        struct pevent_ctx *ctx;
                    468: 
                    469:        /* Sanity check */
                    470:        if (ev == NULL)
                    471:                return;
                    472: 
                    473:        /* Get context */
                    474:        ctx = ev->ctx;
                    475:        DBG(PEVENT, "trigger ev %p in ctx %p", ev, ctx);
                    476:        assert(ev->magic == PEVENT_MAGIC);
                    477: 
                    478:        /* Lock context */
                    479:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    480: 
                    481:        /* If event already canceled, bail */
                    482:        if ((ev->flags & PEVENT_CANCELED) != 0)
                    483:                goto done;
                    484: 
                    485:        /* Mark event as having occurred */
                    486:        PEVENT_SET_OCCURRED(ctx, ev);
                    487: 
                    488:        /* Wake up thread if event is still in the queue */
                    489:        if ((ev->flags & PEVENT_ENQUEUED) != 0)
                    490:                pevent_ctx_notify(ctx);
                    491: 
                    492: done:
                    493:        /* Unlock context */
                    494:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    495: }
                    496: 
                    497: /*
                    498:  * Get event info.
                    499:  */
                    500: int
                    501: pevent_get_info(struct pevent *ev, struct pevent_info *info)
                    502: {
                    503:        if (ev == NULL) {
                    504:                errno = ENXIO;
                    505:                return (-1);
                    506:        }
                    507:        assert(ev->magic == PEVENT_MAGIC);
                    508:        memset(info, 0, sizeof(*info));
                    509:        info->type = ev->type;
                    510:        switch (ev->type) {
                    511:        case PEVENT_READ:
                    512:        case PEVENT_WRITE:
                    513:                info->u.fd = ev->u.fd;
                    514:                break;
                    515:        case PEVENT_TIME:
                    516:                info->u.millis = ev->u.millis;
                    517:                break;
                    518:        case PEVENT_MESG_PORT:
                    519:                info->u.port = ev->u.port;
                    520:                break;
                    521:        case PEVENT_USER:
                    522:                break;
                    523:        default:
                    524:                errno = EINVAL;
                    525:                return (-1);
                    526:        }
                    527:        return (0);
                    528: }
                    529: 
                    530: /*
                    531:  * Event thread entry point.
                    532:  */
                    533: static void *
                    534: pevent_ctx_main(void *arg)
                    535: {
                    536:        struct pevent_ctx *const ctx = arg;
                    537:        struct timeval now;
                    538:        struct pollfd *fd;
                    539:        struct pevent *ev;
                    540:        struct pevent *next_ev;
                    541:        int poll_idx;
                    542:        int timeout;
                    543:        int r;
                    544: 
                    545:        /* Push cleanup hook */
                    546:        pthread_cleanup_push(pevent_ctx_main_cleanup, ctx);
                    547:        assert(ctx->magic == PEVENT_CTX_MAGIC);
                    548: 
                    549:        /* Lock context */
                    550:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    551: 
                    552:        /* Safety belts */
                    553:        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
                    554: 
                    555:        /* Get current time */
                    556:        gettimeofday(&now, NULL);
                    557:        DBG(PEVENT, "ctx %p thread starting", ctx);
                    558: 
                    559: loop:
                    560:        /* Are there any events left? */
                    561:        if (TAILQ_EMPTY(&ctx->events)) {
                    562:                DBG(PEVENT, "ctx %p thread exiting", ctx);
                    563:                goto done;
                    564:        }
                    565: 
                    566:        /* Make sure ctx->fds array is long enough */
                    567:        if (ctx->fds_alloc < 1 + ctx->nrwevents) {
                    568:                const u_int new_alloc = roundup(1 + ctx->nrwevents, 16);
                    569:                void *mem;
                    570: 
                    571:                if ((mem = REALLOC(ctx->mtype, ctx->fds,
                    572:                    new_alloc * sizeof(*ctx->fds))) == NULL)
                    573:                        alogf(LOG_ERR, "%s: %m", "realloc");
                    574:                else {
                    575:                        ctx->fds = mem;
                    576:                        ctx->fds_alloc = new_alloc;
                    577:                }
                    578:        }
                    579: 
                    580:        /* If we were intentionally woken up, read the wakeup byte */
                    581:        if (ctx->notified) {
                    582:                DBG(PEVENT, "ctx %p thread was notified", ctx);
                    583:                (void)read(ctx->pipe[0], &pevent_byte, 1);
                    584:                ctx->notified = 0;
                    585:        }
                    586: 
                    587:        /* Add event for the notify pipe */
                    588:        poll_idx = 0;
                    589:        if (ctx->fds_alloc > 0) {
                    590:                fd = &ctx->fds[poll_idx++];
                    591:                memset(fd, 0, sizeof(*fd));
                    592:                fd->fd = ctx->pipe[0];
                    593:                fd->events = POLLRDNORM;
                    594:        }
                    595: 
                    596:        /* Fill in rest of poll() array */
                    597:        timeout = INFTIM;
                    598:        TAILQ_FOREACH(ev, &ctx->events, next) {
                    599:                switch (ev->type) {
                    600:                case PEVENT_READ:
                    601:                case PEVENT_WRITE:
                    602:                    {
                    603:                        if (poll_idx >= ctx->fds_alloc) {
                    604:                                ev->poll_idx = -1;
                    605:                                break;
                    606:                        }
                    607:                        ev->poll_idx = poll_idx++;
                    608:                        fd = &ctx->fds[ev->poll_idx];
                    609:                        memset(fd, 0, sizeof(*fd));
                    610:                        fd->fd = ev->u.fd;
                    611:                        fd->events = (ev->type == PEVENT_READ) ?
                    612:                            POLLRDNORM : POLLWRNORM;
                    613:                        break;
                    614:                    }
                    615:                case PEVENT_TIME:
                    616:                    {
                    617:                        struct timeval remain;
                    618:                        int millis;
                    619: 
                    620:                        /* Compute milliseconds until event */
                    621:                        if (timercmp(&ev->when, &now, <=))
                    622:                                millis = 0;
                    623:                        else {
                    624:                                timersub(&ev->when, &now, &remain);
                    625:                                millis = remain.tv_sec * 1000;
                    626:                                millis += remain.tv_usec / 1000;
                    627:                        }
                    628: 
                    629:                        /* Remember the minimum delay */
                    630:                        if (timeout == INFTIM || millis < timeout)
                    631:                                timeout = millis;
                    632:                        break;
                    633:                    }
                    634:                default:
                    635:                        break;
                    636:                }
                    637:        }
                    638: 
                    639:        /* Mark non-poll() events that have occurred */
                    640:        TAILQ_FOREACH(ev, &ctx->events, next) {
                    641:                assert(ev->magic == PEVENT_MAGIC);
                    642:                switch (ev->type) {
                    643:                case PEVENT_MESG_PORT:
                    644:                        if (mesg_port_qlen(ev->u.port) > 0)
                    645:                                PEVENT_SET_OCCURRED(ctx, ev);
                    646:                        break;
                    647:                default:
                    648:                        break;
                    649:                }
                    650:                if ((ev->flags & PEVENT_OCCURRED) != 0)
                    651:                        timeout = 0;                    /* don't delay */
                    652:        }
                    653: 
                    654: #if PDEL_DEBUG
                    655:        /* Debugging */
                    656:        DBG(PEVENT, "ctx %p thread event list:", ctx);
                    657:        TAILQ_FOREACH(ev, &ctx->events, next)
                    658:                DBG(PEVENT, "\tev %p", ev);
                    659: #endif
                    660: 
                    661:        /* Wait for something to happen */
                    662:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    663:        DBG(PEVENT, "ctx %p thread sleeping", ctx);
                    664:        r = poll(ctx->fds, poll_idx, timeout);
                    665:        DBG(PEVENT, "ctx %p thread woke up", ctx);
                    666:        assert(ctx->magic == PEVENT_CTX_MAGIC);
                    667:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    668: 
                    669:        /* Check for errors */
                    670:        if (r == -1 && errno != EINTR) {
                    671:                alogf(LOG_CRIT, "%s: %m", "poll");
                    672:                assert(0);
                    673:        }
                    674: 
                    675:        /* Update current time */
                    676:        gettimeofday(&now, NULL);
                    677: 
                    678:        /* Mark poll() events that have occurred */
                    679:        for (ev = TAILQ_FIRST((&ctx->events)); ev != NULL; ev = next_ev) {
                    680:                next_ev = TAILQ_NEXT(ev, next);
                    681:                assert(ev->magic == PEVENT_MAGIC);
                    682:                switch (ev->type) {
                    683:                case PEVENT_READ:
                    684:                case PEVENT_WRITE:
                    685:                        if (ev->poll_idx == -1)
                    686:                                break;
                    687:                        fd = &ctx->fds[ev->poll_idx];
                    688:                        if ((fd->revents & ((ev->type == PEVENT_READ) ?
                    689:                            READABLE_EVENTS : WRITABLE_EVENTS)) != 0)
                    690:                                PEVENT_SET_OCCURRED(ctx, ev);
                    691:                        break;
                    692:                case PEVENT_TIME:
                    693:                        if (timercmp(&ev->when, &now, <=))
                    694:                                PEVENT_SET_OCCURRED(ctx, ev);
                    695:                        break;
                    696:                default:
                    697:                        break;
                    698:                }
                    699:        }
                    700: 
                    701:        /* Service all events that are marked as having occurred */
                    702:        while (1) {
                    703: 
                    704:                /* Find next event that needs service */
                    705:                ev = TAILQ_FIRST(&ctx->events);
                    706:                if (ev == NULL || (ev->flags & PEVENT_OCCURRED) == 0)
                    707:                        break;
                    708:                DBG(PEVENT, "ctx %p thread servicing ev %p", ctx, ev);
                    709: 
                    710:                /* Detach event and service it while keeping a reference */
                    711:                DBG(PEVENT, "ev %p refs %d -> %d (for servicing)",
                    712:                    ev, ev->refs, ev->refs + 1);
                    713:                ev->refs++;
                    714:                PEVENT_DEQUEUE(ctx, ev);
                    715:                pevent_ctx_service(ev);
                    716:        }
                    717: 
                    718:        /* Spin again */
                    719:        DBG(PEVENT, "ctx %p thread spin again", ctx);
                    720:        goto loop;
                    721: 
                    722: done:;
                    723:        /* Done */
                    724:        pthread_cleanup_pop(1);
                    725:        return (NULL);
                    726: }
                    727: 
                    728: /*
                    729:  * Cleanup routine for event thread.
                    730:  */
                    731: static void
                    732: pevent_ctx_main_cleanup(void *arg)
                    733: {
                    734:        struct pevent_ctx *const ctx = arg;
                    735: 
                    736:        DBG(PEVENT, "ctx %p thread cleanup", ctx);
                    737:        ctx->thread = 0;
                    738:        pevent_ctx_unref(ctx);          /* release thread's reference */
                    739: }
                    740: 
                    741: /*
                    742:  * Service an event.
                    743:  *
                    744:  * This is called with an extra reference on the event which is
                    745:  * removed by pevent_ctx_execute().
                    746:  */
                    747: static void
                    748: pevent_ctx_service(struct pevent *ev)
                    749: {
                    750:        struct pevent_ctx *const ctx = ev->ctx;
                    751:        pthread_t tid;
                    752: 
                    753:        /* Does handler want to run in its own thread? */
                    754:        if ((ev->flags & PEVENT_OWN_THREAD) != 0) {
                    755:                if ((errno = pthread_create(&tid,
                    756:                    NULL, pevent_ctx_execute, ev)) != 0) {
                    757:                        alogf(LOG_ERR,
                    758:                            "can't spawn thread for event %p: %m", ev);
                    759:                        pevent_cancel(ev);      /* removes the reference */
                    760:                        return;
                    761:                }
                    762:                pthread_detach(tid);
                    763:                DBG(PEVENT, "dispatched thread %p for ev %p", tid, ev);
                    764:                return;
                    765:        }
                    766: 
                    767:        /* Execute handler with context unlocked */
                    768:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    769:        pevent_ctx_execute(ev);
                    770:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    771: }
                    772: 
                    773: /*
                    774:  * Execute an event handler.
                    775:  *
                    776:  * Grab the user mutex, if any, check if event was canceled,
                    777:  * execute handler, release mutex.
                    778:  *
                    779:  * This is called with an extra reference on the event, which is
                    780:  * removed by this function.
                    781:  *
                    782:  * This should be NOT be called with a lock on the event context.
                    783:  */
                    784: static void *
                    785: pevent_ctx_execute(void *arg)
                    786: {
                    787:        struct pevent *const ev = arg;
                    788:        struct pevent_ctx *const ctx = ev->ctx;
                    789:        int r;
                    790: 
                    791:        /* Sanity check */
                    792:        assert(ev->magic == PEVENT_MAGIC);
                    793: 
                    794:        /* Register cleanup hook */
                    795:        DBG(PEVENT, "ev %p being executed", ev);
                    796:        pthread_cleanup_push(pevent_ctx_execute_cleanup, ev);
                    797: 
                    798: try_again:
                    799:        /* Acquire context mutex */
                    800:        DBG(PEVENT, "locking ctx %p for ev %p", ctx, ev);
                    801:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    802: 
                    803:        /* Check for cancellation to close the cancel/timeout race */
                    804:        if ((ev->flags & PEVENT_CANCELED) != 0) {
                    805:                DBG(PEVENT, "ev %p was canceled", ev);
                    806:                MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    807:                goto done;
                    808:        }
                    809: 
                    810:        /*
                    811:         * Acquire user mutex but avoid two potential problems:
                    812:         *
                    813:         * - Race window: other thread could call timer_cancel()
                    814:         *   and destroy the user mutex just before we try to
                    815:         *   acquire it; we must hold context lock to prevent this.
                    816:         *
                    817:         * - Lock reversal deadlock: other threads acquire the
                    818:         *   user lock before the context lock, we do the reverse.
                    819:         *
                    820:         * Once we've acquire both locks, we drop the context lock.
                    821:         */
                    822:        if (ev->mutex != NULL) {
                    823:                MUTEX_TRYLOCK(ev->mutex, ev->mutex_count);
                    824:                if (errno != 0) {
                    825:                        if (errno != EBUSY) {
                    826:                                alogf(LOG_ERR,
                    827:                                    "can't lock mutex for event %p: %m", ev);
                    828:                                pevent_cancel(ev);
                    829:                                MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    830:                                goto done;
                    831:                        }
                    832:                        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    833:                        sched_yield();
                    834:                        goto try_again;
                    835:                }
                    836:                ev->flags |= PEVENT_GOT_MUTEX;
                    837:        }
                    838: 
                    839:        /* Remove user's event reference (we still have one though) */
                    840:        pevent_cancel(ev);
                    841: 
                    842:        /* Release context mutex */
                    843:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    844: 
                    845:        /* Reschedule a new event if recurring */
                    846:        if ((ev->flags & PEVENT_RECURRING) != 0) {
                    847:                switch (ev->type) {
                    848:                case PEVENT_READ:
                    849:                case PEVENT_WRITE:
                    850:                        r = pevent_register(ev->ctx, ev->peventp,
                    851:                            (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
                    852:                            ev->handler, ev->arg, ev->type, ev->u.fd);
                    853:                        break;
                    854:                case PEVENT_TIME:
                    855:                        r = pevent_register(ev->ctx, ev->peventp,
                    856:                            (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
                    857:                            ev->handler, ev->arg, ev->type, ev->u.millis);
                    858:                        break;
                    859:                case PEVENT_MESG_PORT:
                    860:                        r = pevent_register(ev->ctx, ev->peventp,
                    861:                            (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
                    862:                            ev->handler, ev->arg, ev->type, ev->u.port);
                    863:                        break;
                    864:                case PEVENT_USER:
                    865:                        r = pevent_register(ev->ctx, ev->peventp,
                    866:                            (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
                    867:                            ev->handler, ev->arg, ev->type);
                    868:                        break;
                    869:                default:
                    870:                        r = -1;
                    871:                        assert(0);
                    872:                }
                    873:                if (r == -1) {
                    874:                        alogf(LOG_ERR,
                    875:                            "can't re-register recurring event %p: %m", ev);
                    876:                }
                    877:        }
                    878: 
                    879:        /* Execute the handler */
                    880:        (*ev->handler)(ev->arg);
                    881: 
                    882: done:;
                    883:        /* Release user mutex and event reference */
                    884:        pthread_cleanup_pop(1);
                    885:        return (NULL);
                    886: }
                    887: 
                    888: /*
                    889:  * Clean up for pevent_ctx_execute()
                    890:  */
                    891: static void
                    892: pevent_ctx_execute_cleanup(void *arg)
                    893: {
                    894:        struct pevent *const ev = arg;
                    895: 
                    896:        DBG(PEVENT, "ev %p cleanup", ev);
                    897:        assert(ev->magic == PEVENT_MAGIC);
                    898:        if ((ev->flags & PEVENT_GOT_MUTEX) != 0)
                    899:                MUTEX_UNLOCK(ev->mutex, ev->mutex_count);
                    900:        _pevent_unref(ev);
                    901: }
                    902: 
                    903: /*
                    904:  * Wakeup event thread because the event list has changed.
                    905:  *
                    906:  * This assumes the mutex is locked.
                    907:  */
                    908: static void
                    909: pevent_ctx_notify(struct pevent_ctx *ctx)
                    910: {
                    911:        DBG(PEVENT, "ctx %p being notified", ctx);
                    912:        if (!ctx->notified) {
                    913:                (void)write(ctx->pipe[1], &pevent_byte, 1);
                    914:                ctx->notified = 1;
                    915:        }
                    916: }
                    917: 
                    918: /*
                    919:  * Cancel an event (make it so that it never gets triggered) and
                    920:  * remove the user reference to it.
                    921:  *
                    922:  * This assumes the lock is held.
                    923:  */
                    924: static void
                    925: pevent_cancel(struct pevent *ev)
                    926: {
                    927:        DBG(PEVENT, "canceling ev %p", ev);
                    928:        assert((ev->flags & PEVENT_CANCELED) == 0);
                    929:        ev->flags |= PEVENT_CANCELED;
                    930:        *ev->peventp = NULL;
                    931:        _pevent_unref(ev);
                    932: }
                    933: 
                    934: /*
                    935:  * Returns true if the event has been canceled.
                    936:  */
                    937: int
                    938: _pevent_canceled(struct pevent *ev)
                    939: {
                    940:        assert(ev->magic == PEVENT_MAGIC);
                    941:        return ((ev->flags & PEVENT_CANCELED) != 0);
                    942: }
                    943: 
                    944: /*
                    945:  * Drop a reference on an event.
                    946:  */
                    947: void
                    948: _pevent_unref(struct pevent *ev)
                    949: {
                    950:        struct pevent_ctx *const ctx = ev->ctx;
                    951: 
                    952:        DBG(PEVENT, "ev %p refs %d -> %d", ev, ev->refs, ev->refs - 1);
                    953:        assert(ev->refs > 0);
                    954:        assert(ev->magic == PEVENT_MAGIC);
                    955:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    956:        if (--ev->refs > 0) {
                    957:                MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    958:                return;
                    959:        }
                    960:        assert((ev->flags & PEVENT_ENQUEUED) == 0);
                    961:        ev->magic = ~0;                         /* invalidate magic number */
                    962:        DBG(PEVENT, "freeing ev %p", ev);
                    963:        FREE(ctx->mtype, ev);
                    964:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    965: }
                    966: 
                    967: /*
                    968:  * Release context lock and drop a context reference.
                    969:  *
                    970:  * This assumes the mutex is locked. Upon return it will be unlocked.
                    971:  */
                    972: static void
                    973: pevent_ctx_unref(struct pevent_ctx *ctx)
                    974: {
                    975:        DBG(PEVENT, "ctx %p refs %d -> %d", ctx, ctx->refs, ctx->refs - 1);
                    976:        assert(ctx->refs > 0);
                    977:        assert(ctx->magic == PEVENT_CTX_MAGIC);
                    978:        if (--ctx->refs > 0) {
                    979:                MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    980:                return;
                    981:        }
                    982:        assert(TAILQ_EMPTY(&ctx->events));
                    983:        assert(ctx->nevents == 0);
                    984:        assert(ctx->thread == 0);
                    985:        (void)close(ctx->pipe[0]);
                    986:        (void)close(ctx->pipe[1]);
                    987:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    988:        pthread_mutex_destroy(&ctx->mutex);
                    989:        if (ctx->has_attr)
                    990:                pthread_attr_destroy(&ctx->attr);
                    991:        ctx->magic = ~0;                        /* invalidate magic number */
                    992:        DBG(PEVENT, "freeing ctx %p", ctx);
                    993:        FREE(ctx->mtype, ctx->fds);
                    994:        FREE(ctx->mtype, ctx);
                    995: }
                    996: 

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>