Annotation of embedaddon/mpd/src/contrib/libpdel/util/pevent.c, revision 1.1.1.2

1.1       misho       1: 
                      2: /*
                      3:  * Copyright (c) 2001-2002 Packet Design, LLC.
                      4:  * All rights reserved.
                      5:  * 
                      6:  * Subject to the following obligations and disclaimer of warranty,
                      7:  * use and redistribution of this software, in source or object code
                      8:  * forms, with or without modifications are expressly permitted by
                      9:  * Packet Design; provided, however, that:
                     10:  * 
                     11:  *    (i)  Any and all reproductions of the source or object code
                     12:  *         must include the copyright notice above and the following
                     13:  *         disclaimer of warranties; and
                     14:  *    (ii) No rights are granted, in any manner or form, to use
                     15:  *         Packet Design trademarks, including the mark "PACKET DESIGN"
                     16:  *         on advertising, endorsements, or otherwise except as such
                     17:  *         appears in the above copyright notice or in the software.
                     18:  * 
                     19:  * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND
                     20:  * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO
                     21:  * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING
                     22:  * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED
                     23:  * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
                     24:  * OR NON-INFRINGEMENT.  PACKET DESIGN DOES NOT WARRANT, GUARANTEE,
                     25:  * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS
                     26:  * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY,
                     27:  * RELIABILITY OR OTHERWISE.  IN NO EVENT SHALL PACKET DESIGN BE
                     28:  * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE
                     29:  * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT,
                     30:  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL
                     31:  * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF
                     32:  * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF
                     33:  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
                     34:  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
                     35:  * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF
                     36:  * THE POSSIBILITY OF SUCH DAMAGE.
                     37:  *
                     38:  * Author: Archie Cobbs <archie@freebsd.org>
                     39:  */
                     40: 
                     41: #include <sys/types.h>
                     42: #include <sys/param.h>
                     43: #include <sys/queue.h>
                     44: #include <sys/time.h>
                     45: 
                     46: #include <netinet/in.h>
                     47: 
                     48: #include <stdarg.h>
                     49: #include <stdio.h>
                     50: #include <syslog.h>
                     51: #include <string.h>
                     52: #include <errno.h>
                     53: #include <assert.h>
                     54: #include <unistd.h>
                     55: #include <poll.h>
                     56: #include <sched.h>
                     57: #include <pthread.h>
                     58: 
                     59: #include "structs/structs.h"
                     60: #include "structs/type/array.h"
                     61: #include "util/typed_mem.h"
                     62: #include "util/mesg_port.h"
                     63: #include "util/pevent.h"
                     64: #ifdef SYSLOG_FACILITY
                     65: #define alogf(sev, fmt, arg...) syslog(sev, "%s: " fmt, __FUNCTION__ , ## arg)
                     66: #else
                     67: #define alogf(sev, fmt, arg...) do{}while(0)
                     68: #endif
                     69: 
                     70: #include "internal.h"
                     71: #include "debug/debug.h"
                     72: 
                     73: #define PEVENT_MAGIC           0x31d7699b
                     74: #define PEVENT_CTX_MAGIC       0x7842f901
                     75: 
                     76: /* Private flags */
                     77: #define PEVENT_OCCURRED                0x8000          /* event has occurred */
                     78: #define PEVENT_CANCELED                0x4000          /* event canceled or done */
                     79: #define PEVENT_ENQUEUED                0x2000          /* in the ctx->events queue */
                     80: #define PEVENT_GOT_MUTEX       0x1000          /* user mutex acquired */
                     81: 
                     82: #define PEVENT_USER_FLAGS      (PEVENT_RECURRING | PEVENT_OWN_THREAD)
                     83: 
                     84: #define READABLE_EVENTS                (POLLIN | POLLRDNORM | POLLERR \
                     85:                                    | POLLHUP | POLLNVAL)
                     86: #define WRITABLE_EVENTS                (POLLOUT | POLLWRNORM | POLLWRBAND \
                     87:                                    | POLLERR | POLLHUP | POLLNVAL)
                     88: 
                     89: /* Event context */
                     90: struct pevent_ctx {
                     91:        u_int32_t               magic;          /* magic number */
                     92:        pthread_mutex_t         mutex;          /* mutex for context */
                     93: #if PDEL_DEBUG
                     94:        int                     mutex_count;    /* mutex count */
                     95: #endif
                     96:        pthread_attr_t          attr;           /* event thread attributes */
                     97:        pthread_t               thread;         /* event thread */
                     98:        TAILQ_HEAD(, pevent)    events;         /* pending event list */
                     99:        u_int                   nevents;        /* length of 'events' list */
                    100:        u_int                   nrwevents;      /* number read/write events */
                    101:        struct pollfd           *fds;           /* poll(2) fds array */
                    102:        u_int                   fds_alloc;      /* allocated size of 'fds' */
                    103:        const char              *mtype;         /* typed_mem(3) memory type */
                    104:        char                    mtype_buf[TYPED_MEM_TYPELEN];
                    105:        int                     pipe[2];        /* event thread notify pipe */
                    106:        u_char                  notified;       /* data in the pipe */
                    107:        u_char                  has_attr;       /* 'attr' is valid */
                    108:        u_int                   refs;           /* references to this context */
                    109: };
                    110: 
                    111: /* Event object */
                    112: struct pevent {
                    113:        u_int32_t               magic;          /* magic number */
                    114:        struct pevent_ctx       *ctx;           /* pointer to event context */
                    115:        struct pevent           **peventp;      /* user handle to this event */
                    116:        pevent_handler_t        *handler;       /* event handler function */
                    117:        void                    *arg;           /* event handler function arg */
                    118:        int                     flags;          /* event flags */
                    119:        int                     poll_idx;       /* index in poll(2) fds array */
                    120:        pthread_mutex_t         *mutex;         /* user mutex, if any */
                    121: #if PDEL_DEBUG
                    122:        int                     mutex_count;    /* mutex count */
                    123: #endif
                    124:        enum pevent_type        type;           /* type of this event */
                    125:        struct timeval          when;           /* expiration for time events */
                    126:        u_int                   refs;           /* references to this event */
                    127:        union {
                    128:                int             fd;             /* file descriptor */
                    129:                int             millis;         /* time delay */
                    130:                struct mesg_port *port;         /* mesg_port */
                    131:        }                       u;
                    132:        TAILQ_ENTRY(pevent)     next;           /* next in ctx->events */
                    133: };
                    134: 
                    135: /* Macros */
                    136: #define PEVENT_ENQUEUE(ctx, ev)                                                \
                    137:        do {                                                            \
                    138:                assert(((ev)->flags & PEVENT_ENQUEUED) == 0);           \
                    139:                TAILQ_INSERT_TAIL(&(ctx)->events, (ev), next);          \
                    140:                (ev)->flags |= PEVENT_ENQUEUED;                         \
                    141:                (ctx)->nevents++;                                       \
                    142:                if ((ev)->type == PEVENT_READ                           \
                    143:                    || (ev)->type == PEVENT_WRITE)                      \
                    144:                        (ctx)->nrwevents++;                             \
                    145:                DBG(PEVENT, "ev %p refs %d -> %d (enqueued)",           \
                    146:                    (ev), (ev)->refs, (ev)->refs + 1);                  \
                    147:                (ev)->refs++;                                           \
                    148:        } while (0)
                    149: 
                    150: #define PEVENT_DEQUEUE(ctx, ev)                                                \
                    151:        do {                                                            \
                    152:                assert(((ev)->flags & PEVENT_ENQUEUED) != 0);           \
                    153:                TAILQ_REMOVE(&(ctx)->events, (ev), next);               \
                    154:                (ctx)->nevents--;                                       \
                    155:                if ((ev)->type == PEVENT_READ                           \
                    156:                    || (ev)->type == PEVENT_WRITE)                      \
                    157:                        (ctx)->nrwevents--;                             \
                    158:                (ev)->flags &= ~PEVENT_ENQUEUED;                        \
                    159:                _pevent_unref(ev);                                      \
                    160:        } while (0)
                    161: 
                    162: #define PEVENT_SET_OCCURRED(ctx, ev)                                   \
                    163:        do {                                                            \
                    164:                (ev)->flags |= PEVENT_OCCURRED;                         \
                    165:                if ((ev) != TAILQ_FIRST(&ctx->events)) {                \
                    166:                        TAILQ_REMOVE(&(ctx)->events, (ev), next);       \
                    167:                        TAILQ_INSERT_HEAD(&(ctx)->events, (ev), next);  \
                    168:                }                                                       \
                    169:        } while (0)
                    170: 
                    171: /* Internal functions */
                    172: static void    pevent_ctx_service(struct pevent *ev);
                    173: static void    *pevent_ctx_main(void *arg);
                    174: static void    pevent_ctx_main_cleanup(void *arg);
                    175: static void    *pevent_ctx_execute(void *arg);
                    176: static void    pevent_ctx_execute_cleanup(void *arg);
                    177: static void    pevent_ctx_notify(struct pevent_ctx *ctx);
                    178: static void    pevent_ctx_unref(struct pevent_ctx *ctx);
                    179: static void    pevent_cancel(struct pevent *ev);
                    180: 
                    181: /* Internal variables */
                    182: static char    pevent_byte;
                    183: 
                    184: /*
                    185:  * Create a new event context.
                    186:  */
                    187: struct pevent_ctx *
                    188: pevent_ctx_create(const char *mtype, const pthread_attr_t *attr)
                    189: {
                    190:        pthread_mutexattr_t mutexattr;
                    191:        struct pevent_ctx *ctx;
                    192:        int got_mutexattr = 0;
                    193:        int got_mutex = 0;
                    194: 
                    195:        /* Create context object */
                    196:        if ((ctx = MALLOC(mtype, sizeof(*ctx))) == NULL)
                    197:                return (NULL);
                    198:        memset(ctx, 0, sizeof(*ctx));
                    199:        if (mtype != NULL) {
                    200:                strlcpy(ctx->mtype_buf, mtype, sizeof(ctx->mtype_buf));
                    201:                ctx->mtype = ctx->mtype_buf;
                    202:        }
                    203:        TAILQ_INIT(&ctx->events);
                    204: 
                    205:        /* Copy thread attributes */
                    206:        if (attr != NULL) {
                    207:                struct sched_param param;
                    208:                int value;
                    209: 
                    210:                if ((errno = pthread_attr_init(&ctx->attr)) != 0)
                    211:                        goto fail;
                    212:                ctx->has_attr = 1;
1.1.1.2 ! misho     213:                pthread_attr_getinheritsched(attr, &value);
1.1       misho     214:                pthread_attr_setinheritsched(&ctx->attr, value);
1.1.1.2 ! misho     215:                pthread_attr_getschedparam(attr, &param);
1.1       misho     216:                pthread_attr_setschedparam(&ctx->attr, &param);
1.1.1.2 ! misho     217:                pthread_attr_getschedpolicy(attr, &value);
1.1       misho     218:                pthread_attr_setschedpolicy(&ctx->attr, value);
1.1.1.2 ! misho     219:                pthread_attr_getscope(attr, &value);
1.1       misho     220:                pthread_attr_setscope(&ctx->attr, value);
                    221:        }
                    222: 
                    223:        /* Initialize mutex */
                    224:        if ((errno = pthread_mutexattr_init(&mutexattr) != 0))
                    225:                goto fail;
                    226:        got_mutexattr = 1;
                    227:        pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
                    228:        if ((errno = pthread_mutex_init(&ctx->mutex, &mutexattr)) != 0)
                    229:                goto fail;
                    230:        got_mutex = 1;
                    231: 
                    232:        /* Initialize notify pipe */
                    233:        if (pipe(ctx->pipe) == -1)
                    234:                goto fail;
                    235: 
                    236:        /* Finish up */
                    237:        pthread_mutexattr_destroy(&mutexattr);
                    238:        ctx->magic = PEVENT_CTX_MAGIC;
                    239:        ctx->refs = 1;
                    240:        DBG(PEVENT, "created ctx %p", ctx);
                    241:        return (ctx);
                    242: 
                    243: fail:
                    244:        /* Clean up after failure */
                    245:        if (got_mutex)
                    246:                pthread_mutex_destroy(&ctx->mutex);
                    247:        if (got_mutexattr)
                    248:                pthread_mutexattr_destroy(&mutexattr);
                    249:        if (ctx->has_attr)
                    250:                pthread_attr_destroy(&ctx->attr);
                    251:        FREE(mtype, ctx);
                    252:        return (NULL);
                    253: }
                    254: 
                    255: /*
                    256:  * Destroy an event context.
                    257:  *
                    258:  * All events are unregistered.
                    259:  */
                    260: void
                    261: pevent_ctx_destroy(struct pevent_ctx **ctxp)
                    262: {
                    263:        struct pevent_ctx *const ctx = *ctxp;
                    264: 
                    265:        /* Allow NULL context */
                    266:        if (ctx == NULL)
                    267:                return;
                    268:        *ctxp = NULL;
                    269:        DBG(PEVENT, "destroying ctx %p", ctx);
                    270: 
                    271:        /* Lock context */
                    272:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    273: 
                    274:        /* Sanity check */
                    275:        assert(ctx->magic == PEVENT_CTX_MAGIC);
                    276: 
                    277:        /* Unregister all events */
                    278:        while (!TAILQ_EMPTY(&ctx->events))
                    279:                pevent_unregister(TAILQ_FIRST(&ctx->events)->peventp);
                    280: 
                    281:        /* Decrement context reference count and unlock context */
                    282:        pevent_ctx_unref(ctx);
                    283: }
                    284: 
                    285: /*
                    286:  * Return the number of registered events.
                    287:  */
                    288: u_int
                    289: pevent_ctx_count(struct pevent_ctx *ctx)
                    290: {
                    291:        u_int nevents;
                    292: 
                    293:        assert(ctx->magic == PEVENT_CTX_MAGIC);
                    294:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    295:        nevents = ctx->nevents;
                    296:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    297:        return (nevents);
                    298: }
                    299: 
                    300: /*
                    301:  * Create a new schedule item.
                    302:  */
                    303: int
                    304: pevent_register(struct pevent_ctx *ctx, struct pevent **peventp, int flags,
                    305:        pthread_mutex_t *mutex, pevent_handler_t *handler, void *arg,
                    306:        enum pevent_type type, ...)
                    307: {
                    308:        struct pevent *ev;
                    309:        va_list args;
                    310: 
                    311:        /* Sanity check */
                    312:        if (*peventp != NULL) {
                    313:                errno = EBUSY;
                    314:                return (-1);
                    315:        }
                    316: 
                    317:        /* Check flags */
                    318:        if ((flags & ~PEVENT_USER_FLAGS) != 0) {
                    319:                errno = EINVAL;
                    320:                return (-1);
                    321:        }
                    322: 
                    323:        /* Create new event */
                    324:        if ((ev = MALLOC(ctx->mtype, sizeof(*ev))) == NULL)
                    325:                return (-1);
                    326:        memset(ev, 0, sizeof(*ev));
                    327:        ev->magic = PEVENT_MAGIC;
                    328:        ev->ctx = ctx;
                    329:        ev->handler = handler;
                    330:        ev->arg = arg;
                    331:        ev->flags = flags;
                    332:        ev->poll_idx = -1;
                    333:        ev->mutex = mutex;
                    334:        ev->type = type;
                    335:        ev->refs = 1;                           /* the caller's reference */
                    336:        DBG(PEVENT, "new ev %p in ctx %p", ev, ctx);
                    337: 
                    338:        /* Add type-specific info */
                    339:        switch (type) {
                    340:        case PEVENT_READ:
                    341:        case PEVENT_WRITE:
                    342:                va_start(args, type);
                    343:                ev->u.fd = va_arg(args, int);
                    344:                va_end(args);
                    345:                break;
                    346:        case PEVENT_TIME:
                    347:                va_start(args, type);
                    348:                ev->u.millis = va_arg(args, int);
                    349:                va_end(args);
                    350:                if (ev->u.millis < 0)
                    351:                        ev->u.millis = 0;
                    352:                gettimeofday(&ev->when, NULL);
                    353:                ev->when.tv_sec += ev->u.millis / 1000;
                    354:                ev->when.tv_usec += (ev->u.millis % 1000) * 1000;
                    355:                if (ev->when.tv_usec > 1000000) {
                    356:                        ev->when.tv_sec++;
                    357:                        ev->when.tv_usec -= 1000000;
                    358:                }
                    359:                break;
                    360:        case PEVENT_MESG_PORT:
                    361:                va_start(args, type);
                    362:                ev->u.port = va_arg(args, struct mesg_port *);
                    363:                va_end(args);
                    364:                if (ev->u.port == NULL)
                    365:                        goto invalid;
                    366:                break;
                    367:        case PEVENT_USER:
                    368:                break;
                    369:        default:
                    370:        invalid:
                    371:                errno = EINVAL;
                    372:                _pevent_unref(ev);
                    373:                return (-1);
                    374:        }
                    375: 
                    376:        /* Lock context */
                    377:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    378: 
                    379:        /* Link to related object (if appropriate) */
                    380:        switch (ev->type) {
                    381:        case PEVENT_MESG_PORT:
                    382:                if (_mesg_port_set_event(ev->u.port, ev) == -1) {
                    383:                        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    384:                        _pevent_unref(ev);
                    385:                        return (-1);
                    386:                }
                    387:                DBG(PEVENT, "ev %p refs %d -> %d (mesg port)",
                    388:                    ev, ev->refs, ev->refs + 1);
                    389:                ev->refs++;                     /* the mesg_port's reference */
                    390:                break;
                    391:        default:
                    392:                break;
                    393:        }
                    394: 
                    395:        /* Start or notify event thread */
                    396:        if (ctx->thread == 0) {
                    397:                if ((errno = pthread_create(&ctx->thread,
                    398:                    ctx->has_attr ? &ctx->attr : NULL,
                    399:                    pevent_ctx_main, ctx)) != 0) {
                    400:                        ctx->thread = 0;
                    401:                        alogf(LOG_ERR, "%s: %m", "pthread_create");
                    402:                        ev->flags |= PEVENT_CANCELED;
                    403:                        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    404:                        _pevent_unref(ev);
                    405:                        return (-1);
                    406:                }
                    407:                pthread_detach(ctx->thread);
                    408:                DBG(PEVENT, "created tid %p for ctx %p, refs -> %d",
                    409:                    ctx->thread, ctx, ctx->refs + 1);
                    410:                ctx->refs++;                    /* add reference for thread */
                    411:        } else
                    412:                pevent_ctx_notify(ctx);
                    413: 
                    414:        /* Caller gets the one reference */
                    415:        ev->peventp = peventp;
                    416:        *peventp = ev;
                    417: 
                    418:        /* Add event to the pending event list */
                    419:        PEVENT_ENQUEUE(ctx, ev);
                    420: 
                    421:        /* Unlock context */
                    422:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    423: 
                    424:        return (0);
                    425: }
                    426: 
                    427: /*
                    428:  * Unregister an event.
                    429:  */
                    430: void
                    431: pevent_unregister(struct pevent **peventp)
                    432: {
                    433:        struct pevent *const ev = *peventp;
                    434:        struct pevent_ctx *ctx;
                    435: 
                    436:        /* Allow NULL event */
                    437:        if (ev == NULL)
                    438:                return;
                    439:        ctx = ev->ctx;
                    440:        DBG(PEVENT, "unregister ev %p in ctx %p", ev, ctx);
                    441: 
                    442:        /* Sanity checks */
                    443:        assert(ev->magic == PEVENT_MAGIC);
                    444:        assert(ctx->magic == PEVENT_CTX_MAGIC);
                    445:        assert(ev->peventp == peventp);
                    446: 
                    447:        /* Lock context */
                    448:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    449:        DBG(PEVENT, "unregister ev %p in ctx %p: lock", ev, ctx);
                    450: 
                    451:        /* Dequeue event if queued */
                    452:        if ((ev->flags & PEVENT_ENQUEUED) != 0) {
                    453:                PEVENT_DEQUEUE(ctx, ev);
                    454:                pevent_ctx_notify(ctx);                 /* wakeup thread */
                    455:        }
                    456: 
                    457:        /* Cancel the event */
                    458:        pevent_cancel(ev);
                    459: 
                    460:        /* Unlock context */
                    461:        DBG(PEVENT, "unregister ev %p in ctx %p: unlock", ev, ctx);
                    462:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    463: }
                    464: 
                    465: /*
                    466:  * Trigger an event.
                    467:  */
                    468: void
                    469: pevent_trigger(struct pevent *ev)
                    470: {
                    471:        struct pevent_ctx *ctx;
                    472: 
                    473:        /* Sanity check */
                    474:        if (ev == NULL)
                    475:                return;
                    476: 
                    477:        /* Get context */
                    478:        ctx = ev->ctx;
                    479:        DBG(PEVENT, "trigger ev %p in ctx %p", ev, ctx);
                    480:        assert(ev->magic == PEVENT_MAGIC);
                    481: 
                    482:        /* Lock context */
                    483:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    484: 
                    485:        /* If event already canceled, bail */
                    486:        if ((ev->flags & PEVENT_CANCELED) != 0)
                    487:                goto done;
                    488: 
                    489:        /* Mark event as having occurred */
                    490:        PEVENT_SET_OCCURRED(ctx, ev);
                    491: 
                    492:        /* Wake up thread if event is still in the queue */
                    493:        if ((ev->flags & PEVENT_ENQUEUED) != 0)
                    494:                pevent_ctx_notify(ctx);
                    495: 
                    496: done:
                    497:        /* Unlock context */
                    498:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    499: }
                    500: 
                    501: /*
                    502:  * Get event info.
                    503:  */
                    504: int
                    505: pevent_get_info(struct pevent *ev, struct pevent_info *info)
                    506: {
                    507:        if (ev == NULL) {
                    508:                errno = ENXIO;
                    509:                return (-1);
                    510:        }
                    511:        assert(ev->magic == PEVENT_MAGIC);
                    512:        memset(info, 0, sizeof(*info));
                    513:        info->type = ev->type;
                    514:        switch (ev->type) {
                    515:        case PEVENT_READ:
                    516:        case PEVENT_WRITE:
                    517:                info->u.fd = ev->u.fd;
                    518:                break;
                    519:        case PEVENT_TIME:
                    520:                info->u.millis = ev->u.millis;
                    521:                break;
                    522:        case PEVENT_MESG_PORT:
                    523:                info->u.port = ev->u.port;
                    524:                break;
                    525:        case PEVENT_USER:
                    526:                break;
                    527:        default:
                    528:                errno = EINVAL;
                    529:                return (-1);
                    530:        }
                    531:        return (0);
                    532: }
                    533: 
                    534: /*
                    535:  * Event thread entry point.
                    536:  */
                    537: static void *
                    538: pevent_ctx_main(void *arg)
                    539: {
                    540:        struct pevent_ctx *const ctx = arg;
                    541:        struct timeval now;
                    542:        struct pollfd *fd;
                    543:        struct pevent *ev;
                    544:        struct pevent *next_ev;
1.1.1.2 ! misho     545:        unsigned poll_idx;
1.1       misho     546:        int timeout;
                    547:        int r;
                    548: 
                    549:        /* Push cleanup hook */
                    550:        pthread_cleanup_push(pevent_ctx_main_cleanup, ctx);
                    551:        assert(ctx->magic == PEVENT_CTX_MAGIC);
                    552: 
                    553:        /* Lock context */
                    554:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    555: 
                    556:        /* Safety belts */
                    557:        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
                    558: 
                    559:        /* Get current time */
                    560:        gettimeofday(&now, NULL);
                    561:        DBG(PEVENT, "ctx %p thread starting", ctx);
                    562: 
                    563: loop:
                    564:        /* Are there any events left? */
                    565:        if (TAILQ_EMPTY(&ctx->events)) {
                    566:                DBG(PEVENT, "ctx %p thread exiting", ctx);
                    567:                goto done;
                    568:        }
                    569: 
                    570:        /* Make sure ctx->fds array is long enough */
                    571:        if (ctx->fds_alloc < 1 + ctx->nrwevents) {
                    572:                const u_int new_alloc = roundup(1 + ctx->nrwevents, 16);
                    573:                void *mem;
                    574: 
                    575:                if ((mem = REALLOC(ctx->mtype, ctx->fds,
                    576:                    new_alloc * sizeof(*ctx->fds))) == NULL)
                    577:                        alogf(LOG_ERR, "%s: %m", "realloc");
                    578:                else {
                    579:                        ctx->fds = mem;
                    580:                        ctx->fds_alloc = new_alloc;
                    581:                }
                    582:        }
                    583: 
                    584:        /* If we were intentionally woken up, read the wakeup byte */
                    585:        if (ctx->notified) {
                    586:                DBG(PEVENT, "ctx %p thread was notified", ctx);
                    587:                (void)read(ctx->pipe[0], &pevent_byte, 1);
                    588:                ctx->notified = 0;
                    589:        }
                    590: 
                    591:        /* Add event for the notify pipe */
                    592:        poll_idx = 0;
                    593:        if (ctx->fds_alloc > 0) {
                    594:                fd = &ctx->fds[poll_idx++];
                    595:                memset(fd, 0, sizeof(*fd));
                    596:                fd->fd = ctx->pipe[0];
                    597:                fd->events = POLLRDNORM;
                    598:        }
                    599: 
                    600:        /* Fill in rest of poll() array */
                    601:        timeout = INFTIM;
                    602:        TAILQ_FOREACH(ev, &ctx->events, next) {
                    603:                switch (ev->type) {
                    604:                case PEVENT_READ:
                    605:                case PEVENT_WRITE:
                    606:                    {
                    607:                        if (poll_idx >= ctx->fds_alloc) {
                    608:                                ev->poll_idx = -1;
                    609:                                break;
                    610:                        }
                    611:                        ev->poll_idx = poll_idx++;
                    612:                        fd = &ctx->fds[ev->poll_idx];
                    613:                        memset(fd, 0, sizeof(*fd));
                    614:                        fd->fd = ev->u.fd;
                    615:                        fd->events = (ev->type == PEVENT_READ) ?
                    616:                            POLLRDNORM : POLLWRNORM;
                    617:                        break;
                    618:                    }
                    619:                case PEVENT_TIME:
                    620:                    {
                    621:                        struct timeval remain;
                    622:                        int millis;
                    623: 
                    624:                        /* Compute milliseconds until event */
                    625:                        if (timercmp(&ev->when, &now, <=))
                    626:                                millis = 0;
                    627:                        else {
                    628:                                timersub(&ev->when, &now, &remain);
                    629:                                millis = remain.tv_sec * 1000;
                    630:                                millis += remain.tv_usec / 1000;
                    631:                        }
                    632: 
                    633:                        /* Remember the minimum delay */
                    634:                        if (timeout == INFTIM || millis < timeout)
                    635:                                timeout = millis;
                    636:                        break;
                    637:                    }
                    638:                default:
                    639:                        break;
                    640:                }
                    641:        }
                    642: 
                    643:        /* Mark non-poll() events that have occurred */
                    644:        TAILQ_FOREACH(ev, &ctx->events, next) {
                    645:                assert(ev->magic == PEVENT_MAGIC);
                    646:                switch (ev->type) {
                    647:                case PEVENT_MESG_PORT:
                    648:                        if (mesg_port_qlen(ev->u.port) > 0)
                    649:                                PEVENT_SET_OCCURRED(ctx, ev);
                    650:                        break;
                    651:                default:
                    652:                        break;
                    653:                }
                    654:                if ((ev->flags & PEVENT_OCCURRED) != 0)
                    655:                        timeout = 0;                    /* don't delay */
                    656:        }
                    657: 
                    658: #if PDEL_DEBUG
                    659:        /* Debugging */
                    660:        DBG(PEVENT, "ctx %p thread event list:", ctx);
                    661:        TAILQ_FOREACH(ev, &ctx->events, next)
                    662:                DBG(PEVENT, "\tev %p", ev);
                    663: #endif
                    664: 
                    665:        /* Wait for something to happen */
                    666:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    667:        DBG(PEVENT, "ctx %p thread sleeping", ctx);
                    668:        r = poll(ctx->fds, poll_idx, timeout);
                    669:        DBG(PEVENT, "ctx %p thread woke up", ctx);
                    670:        assert(ctx->magic == PEVENT_CTX_MAGIC);
                    671:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    672: 
                    673:        /* Check for errors */
                    674:        if (r == -1 && errno != EINTR) {
                    675:                alogf(LOG_CRIT, "%s: %m", "poll");
                    676:                assert(0);
                    677:        }
                    678: 
                    679:        /* Update current time */
                    680:        gettimeofday(&now, NULL);
                    681: 
                    682:        /* Mark poll() events that have occurred */
                    683:        for (ev = TAILQ_FIRST((&ctx->events)); ev != NULL; ev = next_ev) {
                    684:                next_ev = TAILQ_NEXT(ev, next);
                    685:                assert(ev->magic == PEVENT_MAGIC);
                    686:                switch (ev->type) {
                    687:                case PEVENT_READ:
                    688:                case PEVENT_WRITE:
                    689:                        if (ev->poll_idx == -1)
                    690:                                break;
                    691:                        fd = &ctx->fds[ev->poll_idx];
                    692:                        if ((fd->revents & ((ev->type == PEVENT_READ) ?
                    693:                            READABLE_EVENTS : WRITABLE_EVENTS)) != 0)
                    694:                                PEVENT_SET_OCCURRED(ctx, ev);
                    695:                        break;
                    696:                case PEVENT_TIME:
                    697:                        if (timercmp(&ev->when, &now, <=))
                    698:                                PEVENT_SET_OCCURRED(ctx, ev);
                    699:                        break;
                    700:                default:
                    701:                        break;
                    702:                }
                    703:        }
                    704: 
                    705:        /* Service all events that are marked as having occurred */
                    706:        while (1) {
                    707: 
                    708:                /* Find next event that needs service */
                    709:                ev = TAILQ_FIRST(&ctx->events);
                    710:                if (ev == NULL || (ev->flags & PEVENT_OCCURRED) == 0)
                    711:                        break;
                    712:                DBG(PEVENT, "ctx %p thread servicing ev %p", ctx, ev);
                    713: 
                    714:                /* Detach event and service it while keeping a reference */
                    715:                DBG(PEVENT, "ev %p refs %d -> %d (for servicing)",
                    716:                    ev, ev->refs, ev->refs + 1);
                    717:                ev->refs++;
                    718:                PEVENT_DEQUEUE(ctx, ev);
                    719:                pevent_ctx_service(ev);
                    720:        }
                    721: 
                    722:        /* Spin again */
                    723:        DBG(PEVENT, "ctx %p thread spin again", ctx);
                    724:        goto loop;
                    725: 
                    726: done:;
                    727:        /* Done */
                    728:        pthread_cleanup_pop(1);
                    729:        return (NULL);
                    730: }
                    731: 
                    732: /*
                    733:  * Cleanup routine for event thread.
                    734:  */
                    735: static void
                    736: pevent_ctx_main_cleanup(void *arg)
                    737: {
                    738:        struct pevent_ctx *const ctx = arg;
                    739: 
                    740:        DBG(PEVENT, "ctx %p thread cleanup", ctx);
                    741:        ctx->thread = 0;
                    742:        pevent_ctx_unref(ctx);          /* release thread's reference */
                    743: }
                    744: 
                    745: /*
                    746:  * Service an event.
                    747:  *
                    748:  * This is called with an extra reference on the event which is
                    749:  * removed by pevent_ctx_execute().
                    750:  */
                    751: static void
                    752: pevent_ctx_service(struct pevent *ev)
                    753: {
                    754:        struct pevent_ctx *const ctx = ev->ctx;
                    755:        pthread_t tid;
                    756: 
                    757:        /* Does handler want to run in its own thread? */
                    758:        if ((ev->flags & PEVENT_OWN_THREAD) != 0) {
                    759:                if ((errno = pthread_create(&tid,
                    760:                    NULL, pevent_ctx_execute, ev)) != 0) {
                    761:                        alogf(LOG_ERR,
                    762:                            "can't spawn thread for event %p: %m", ev);
                    763:                        pevent_cancel(ev);      /* removes the reference */
                    764:                        return;
                    765:                }
                    766:                pthread_detach(tid);
                    767:                DBG(PEVENT, "dispatched thread %p for ev %p", tid, ev);
                    768:                return;
                    769:        }
                    770: 
                    771:        /* Execute handler with context unlocked */
                    772:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    773:        pevent_ctx_execute(ev);
                    774:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    775: }
                    776: 
                    777: /*
                    778:  * Execute an event handler.
                    779:  *
                    780:  * Grab the user mutex, if any, check if event was canceled,
                    781:  * execute handler, release mutex.
                    782:  *
                    783:  * This is called with an extra reference on the event, which is
                    784:  * removed by this function.
                    785:  *
                    786:  * This should be NOT be called with a lock on the event context.
                    787:  */
                    788: static void *
                    789: pevent_ctx_execute(void *arg)
                    790: {
                    791:        struct pevent *const ev = arg;
                    792:        struct pevent_ctx *const ctx = ev->ctx;
                    793:        int r;
                    794: 
                    795:        /* Sanity check */
                    796:        assert(ev->magic == PEVENT_MAGIC);
                    797: 
                    798:        /* Register cleanup hook */
                    799:        DBG(PEVENT, "ev %p being executed", ev);
                    800:        pthread_cleanup_push(pevent_ctx_execute_cleanup, ev);
                    801: 
                    802: try_again:
                    803:        /* Acquire context mutex */
                    804:        DBG(PEVENT, "locking ctx %p for ev %p", ctx, ev);
                    805:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    806: 
                    807:        /* Check for cancellation to close the cancel/timeout race */
                    808:        if ((ev->flags & PEVENT_CANCELED) != 0) {
                    809:                DBG(PEVENT, "ev %p was canceled", ev);
                    810:                MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    811:                goto done;
                    812:        }
                    813: 
                    814:        /*
                    815:         * Acquire user mutex but avoid two potential problems:
                    816:         *
                    817:         * - Race window: other thread could call timer_cancel()
                    818:         *   and destroy the user mutex just before we try to
                    819:         *   acquire it; we must hold context lock to prevent this.
                    820:         *
                    821:         * - Lock reversal deadlock: other threads acquire the
                    822:         *   user lock before the context lock, we do the reverse.
                    823:         *
                    824:         * Once we've acquire both locks, we drop the context lock.
                    825:         */
                    826:        if (ev->mutex != NULL) {
                    827:                MUTEX_TRYLOCK(ev->mutex, ev->mutex_count);
                    828:                if (errno != 0) {
                    829:                        if (errno != EBUSY) {
                    830:                                alogf(LOG_ERR,
                    831:                                    "can't lock mutex for event %p: %m", ev);
                    832:                                pevent_cancel(ev);
                    833:                                MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    834:                                goto done;
                    835:                        }
                    836:                        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    837:                        sched_yield();
                    838:                        goto try_again;
                    839:                }
                    840:                ev->flags |= PEVENT_GOT_MUTEX;
                    841:        }
                    842: 
                    843:        /* Remove user's event reference (we still have one though) */
                    844:        pevent_cancel(ev);
                    845: 
                    846:        /* Release context mutex */
                    847:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    848: 
                    849:        /* Reschedule a new event if recurring */
                    850:        if ((ev->flags & PEVENT_RECURRING) != 0) {
                    851:                switch (ev->type) {
                    852:                case PEVENT_READ:
                    853:                case PEVENT_WRITE:
                    854:                        r = pevent_register(ev->ctx, ev->peventp,
                    855:                            (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
                    856:                            ev->handler, ev->arg, ev->type, ev->u.fd);
                    857:                        break;
                    858:                case PEVENT_TIME:
                    859:                        r = pevent_register(ev->ctx, ev->peventp,
                    860:                            (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
                    861:                            ev->handler, ev->arg, ev->type, ev->u.millis);
                    862:                        break;
                    863:                case PEVENT_MESG_PORT:
                    864:                        r = pevent_register(ev->ctx, ev->peventp,
                    865:                            (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
                    866:                            ev->handler, ev->arg, ev->type, ev->u.port);
                    867:                        break;
                    868:                case PEVENT_USER:
                    869:                        r = pevent_register(ev->ctx, ev->peventp,
                    870:                            (ev->flags & PEVENT_USER_FLAGS), ev->mutex,
                    871:                            ev->handler, ev->arg, ev->type);
                    872:                        break;
                    873:                default:
                    874:                        r = -1;
                    875:                        assert(0);
                    876:                }
                    877:                if (r == -1) {
                    878:                        alogf(LOG_ERR,
                    879:                            "can't re-register recurring event %p: %m", ev);
                    880:                }
                    881:        }
                    882: 
                    883:        /* Execute the handler */
                    884:        (*ev->handler)(ev->arg);
                    885: 
                    886: done:;
                    887:        /* Release user mutex and event reference */
                    888:        pthread_cleanup_pop(1);
                    889:        return (NULL);
                    890: }
                    891: 
                    892: /*
                    893:  * Clean up for pevent_ctx_execute()
                    894:  */
                    895: static void
                    896: pevent_ctx_execute_cleanup(void *arg)
                    897: {
                    898:        struct pevent *const ev = arg;
                    899: 
                    900:        DBG(PEVENT, "ev %p cleanup", ev);
                    901:        assert(ev->magic == PEVENT_MAGIC);
                    902:        if ((ev->flags & PEVENT_GOT_MUTEX) != 0)
                    903:                MUTEX_UNLOCK(ev->mutex, ev->mutex_count);
                    904:        _pevent_unref(ev);
                    905: }
                    906: 
                    907: /*
                    908:  * Wakeup event thread because the event list has changed.
                    909:  *
                    910:  * This assumes the mutex is locked.
                    911:  */
                    912: static void
                    913: pevent_ctx_notify(struct pevent_ctx *ctx)
                    914: {
                    915:        DBG(PEVENT, "ctx %p being notified", ctx);
                    916:        if (!ctx->notified) {
                    917:                (void)write(ctx->pipe[1], &pevent_byte, 1);
                    918:                ctx->notified = 1;
                    919:        }
                    920: }
                    921: 
                    922: /*
                    923:  * Cancel an event (make it so that it never gets triggered) and
                    924:  * remove the user reference to it.
                    925:  *
                    926:  * This assumes the lock is held.
                    927:  */
                    928: static void
                    929: pevent_cancel(struct pevent *ev)
                    930: {
                    931:        DBG(PEVENT, "canceling ev %p", ev);
                    932:        assert((ev->flags & PEVENT_CANCELED) == 0);
                    933:        ev->flags |= PEVENT_CANCELED;
                    934:        *ev->peventp = NULL;
                    935:        _pevent_unref(ev);
                    936: }
                    937: 
                    938: /*
                    939:  * Returns true if the event has been canceled.
                    940:  */
                    941: int
                    942: _pevent_canceled(struct pevent *ev)
                    943: {
                    944:        assert(ev->magic == PEVENT_MAGIC);
                    945:        return ((ev->flags & PEVENT_CANCELED) != 0);
                    946: }
                    947: 
                    948: /*
                    949:  * Drop a reference on an event.
                    950:  */
                    951: void
                    952: _pevent_unref(struct pevent *ev)
                    953: {
                    954:        struct pevent_ctx *const ctx = ev->ctx;
                    955: 
                    956:        DBG(PEVENT, "ev %p refs %d -> %d", ev, ev->refs, ev->refs - 1);
                    957:        assert(ev->refs > 0);
                    958:        assert(ev->magic == PEVENT_MAGIC);
                    959:        MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
                    960:        if (--ev->refs > 0) {
                    961:                MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    962:                return;
                    963:        }
                    964:        assert((ev->flags & PEVENT_ENQUEUED) == 0);
                    965:        ev->magic = ~0;                         /* invalidate magic number */
                    966:        DBG(PEVENT, "freeing ev %p", ev);
                    967:        FREE(ctx->mtype, ev);
                    968:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    969: }
                    970: 
                    971: /*
                    972:  * Release context lock and drop a context reference.
                    973:  *
                    974:  * This assumes the mutex is locked. Upon return it will be unlocked.
                    975:  */
                    976: static void
                    977: pevent_ctx_unref(struct pevent_ctx *ctx)
                    978: {
                    979:        DBG(PEVENT, "ctx %p refs %d -> %d", ctx, ctx->refs, ctx->refs - 1);
                    980:        assert(ctx->refs > 0);
                    981:        assert(ctx->magic == PEVENT_CTX_MAGIC);
                    982:        if (--ctx->refs > 0) {
                    983:                MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    984:                return;
                    985:        }
                    986:        assert(TAILQ_EMPTY(&ctx->events));
                    987:        assert(ctx->nevents == 0);
                    988:        assert(ctx->thread == 0);
                    989:        (void)close(ctx->pipe[0]);
                    990:        (void)close(ctx->pipe[1]);
                    991:        MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
                    992:        pthread_mutex_destroy(&ctx->mutex);
                    993:        if (ctx->has_attr)
                    994:                pthread_attr_destroy(&ctx->attr);
                    995:        ctx->magic = ~0;                        /* invalidate magic number */
                    996:        DBG(PEVENT, "freeing ctx %p", ctx);
                    997:        FREE(ctx->mtype, ctx->fds);
                    998:        FREE(ctx->mtype, ctx);
                    999: }
                   1000: 

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>