Annotation of embedaddon/strongswan/src/libstrongswan/processing/watcher.c, revision 1.1.1.1

1.1       misho       1: /*
                      2:  * Copyright (C) 2016 Tobias Brunner
                      3:  * HSR Hochschule fuer Technik Rapperswil
                      4:  *
                      5:  * Copyright (C) 2013 Martin Willi
                      6:  * Copyright (C) 2013 revosec AG
                      7:  *
                      8:  * This program is free software; you can redistribute it and/or modify it
                      9:  * under the terms of the GNU General Public License as published by the
                     10:  * Free Software Foundation; either version 2 of the License, or (at your
                     11:  * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
                     12:  *
                     13:  * This program is distributed in the hope that it will be useful, but
                     14:  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
                     15:  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
                     16:  * for more details.
                     17:  */
                     18: 
                     19: #include "watcher.h"
                     20: 
                     21: #include <library.h>
                     22: #include <threading/thread.h>
                     23: #include <threading/mutex.h>
                     24: #include <threading/condvar.h>
                     25: #include <collections/linked_list.h>
                     26: #include <processing/jobs/callback_job.h>
                     27: 
                     28: #include <unistd.h>
                     29: #include <errno.h>
                     30: #include <fcntl.h>
                     31: 
                     32: typedef struct private_watcher_t private_watcher_t;
                     33: typedef struct entry_t entry_t;
                     34: 
                     35: /**
                     36:  * Private data of an watcher_t object.
                     37:  */
                     38: struct private_watcher_t {
                     39: 
                     40:        /**
                     41:         * Public watcher_t interface.
                     42:         */
                     43:        watcher_t public;
                     44: 
                     45:        /**
                     46:         * List of registered FDs
                     47:         */
                     48:        entry_t *fds;
                     49: 
                     50:        /**
                     51:         * Last registered FD
                     52:         */
                     53:        entry_t *last;
                     54: 
                     55:        /**
                     56:         * Number of registered FDs
                     57:         */
                     58:        u_int count;
                     59: 
                     60:        /**
                     61:         * Pending update of FD list?
                     62:         */
                     63:        bool pending;
                     64: 
                     65:        /**
                     66:         * Running state of watcher
                     67:         */
                     68:        watcher_state_t state;
                     69: 
                     70:        /**
                     71:         * Lock to access FD list
                     72:         */
                     73:        mutex_t *mutex;
                     74: 
                     75:        /**
                     76:         * Condvar to signal completion of callback
                     77:         */
                     78:        condvar_t *condvar;
                     79: 
                     80:        /**
                     81:         * Notification pipe to signal watcher thread
                     82:         */
                     83:        int notify[2];
                     84: 
                     85:        /**
                     86:         * List of callback jobs to process by watcher thread, as job_t
                     87:         */
                     88:        linked_list_t *jobs;
                     89: };
                     90: 
                     91: /**
                     92:  * Entry for a registered file descriptor
                     93:  */
                     94: struct entry_t {
                     95:        /** file descriptor */
                     96:        int fd;
                     97:        /** events to watch */
                     98:        watcher_event_t events;
                     99:        /** registered callback function */
                    100:        watcher_cb_t cb;
                    101:        /** user data to pass to callback */
                    102:        void *data;
                    103:        /** callback(s) currently active? */
                    104:        int in_callback;
                    105:        /** next registered fd */
                    106:        entry_t *next;
                    107: };
                    108: 
                    109: /**
                    110:  * Adds the given entry at the end of the list
                    111:  */
                    112: static void add_entry(private_watcher_t *this, entry_t *entry)
                    113: {
                    114:        if (this->last)
                    115:        {
                    116:                this->last->next = entry;
                    117:                this->last = entry;
                    118:        }
                    119:        else
                    120:        {
                    121:                this->fds = this->last = entry;
                    122:        }
                    123:        this->count++;
                    124: }
                    125: 
                    126: /**
                    127:  * Removes and frees the given entry
                    128:  *
                    129:  * Updates the previous entry and returns the next entry in the list, if any.
                    130:  */
                    131: static entry_t *remove_entry(private_watcher_t *this, entry_t *entry,
                    132:                                                         entry_t *prev)
                    133: {
                    134:        entry_t *next = entry->next;
                    135: 
                    136:        if (prev)
                    137:        {
                    138:                prev->next = next;
                    139:        }
                    140:        else
                    141:        {
                    142:                this->fds = next;
                    143:        }
                    144:        if (this->last == entry)
                    145:        {
                    146:                this->last = prev;
                    147:        }
                    148:        this->count--;
                    149:        free(entry);
                    150:        return next;
                    151: }
                    152: 
                    153: /**
                    154:  * Data we pass on for an async notification
                    155:  */
                    156: typedef struct {
                    157:        /** file descriptor */
                    158:        int fd;
                    159:        /** event type */
                    160:        watcher_event_t event;
                    161:        /** registered callback function */
                    162:        watcher_cb_t cb;
                    163:        /** user data to pass to callback */
                    164:        void *data;
                    165:        /** keep registered? */
                    166:        bool keep;
                    167:        /** reference to watcher */
                    168:        private_watcher_t *this;
                    169: } notify_data_t;
                    170: 
                    171: /**
                    172:  * Notify watcher thread about changes
                    173:  */
                    174: static void update(private_watcher_t *this)
                    175: {
                    176:        char buf[1] = { 'u' };
                    177: 
                    178:        this->pending = TRUE;
                    179:        if (this->notify[1] != -1)
                    180:        {
                    181:                if (write(this->notify[1], buf, sizeof(buf)) == -1)
                    182:                {
                    183:                        DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(errno));
                    184:                }
                    185:        }
                    186: }
                    187: 
                    188: /**
                    189:  * Cleanup function if callback gets cancelled
                    190:  */
                    191: static void unregister(notify_data_t *data)
                    192: {
                    193:        /* if a thread processing a callback gets cancelled, we mark the entry
                    194:         * as cancelled, like the callback would return FALSE. This is required
                    195:         * to not queue this watcher again if all threads have been gone. */
                    196:        data->keep = FALSE;
                    197: }
                    198: 
                    199:  /**
                    200:  * Execute callback of registered FD, asynchronous
                    201:  */
                    202: static job_requeue_t notify_async(notify_data_t *data)
                    203: {
                    204:        thread_cleanup_push((void*)unregister, data);
                    205:        data->keep = data->cb(data->data, data->fd, data->event);
                    206:        thread_cleanup_pop(FALSE);
                    207:        return JOB_REQUEUE_NONE;
                    208: }
                    209: 
                    210: /**
                    211:  * Clean up notification data, reactivate FD
                    212:  */
                    213: static void notify_end(notify_data_t *data)
                    214: {
                    215:        private_watcher_t *this = data->this;
                    216:        entry_t *entry, *prev = NULL;
                    217: 
                    218:        /* reactivate the disabled entry */
                    219:        this->mutex->lock(this->mutex);
                    220:        for (entry = this->fds; entry; prev = entry, entry = entry->next)
                    221:        {
                    222:                if (entry->fd == data->fd)
                    223:                {
                    224:                        if (!data->keep)
                    225:                        {
                    226:                                entry->events &= ~data->event;
                    227:                                if (!entry->events)
                    228:                                {
                    229:                                        remove_entry(this, entry, prev);
                    230:                                        break;
                    231:                                }
                    232:                        }
                    233:                        entry->in_callback--;
                    234:                        break;
                    235:                }
                    236:        }
                    237:        update(this);
                    238:        this->condvar->broadcast(this->condvar);
                    239:        this->mutex->unlock(this->mutex);
                    240: 
                    241:        free(data);
                    242: }
                    243: 
                    244: /**
                    245:  * Execute the callback for a registered FD
                    246:  */
                    247: static void notify(private_watcher_t *this, entry_t *entry,
                    248:                                   watcher_event_t event)
                    249: {
                    250:        notify_data_t *data;
                    251: 
                    252:        /* get a copy of entry for async job, but with specific event */
                    253:        INIT(data,
                    254:                .fd = entry->fd,
                    255:                .event = event,
                    256:                .cb = entry->cb,
                    257:                .data = entry->data,
                    258:                .keep = TRUE,
                    259:                .this = this,
                    260:        );
                    261: 
                    262:        /* deactivate entry, so we can select() other FDs even if the async
                    263:         * processing did not handle the event yet */
                    264:        entry->in_callback++;
                    265: 
                    266:        this->jobs->insert_last(this->jobs,
                    267:                                        callback_job_create_with_prio((void*)notify_async, data,
                    268:                                                (void*)notify_end, (callback_job_cancel_t)return_false,
                    269:                                                JOB_PRIO_CRITICAL));
                    270: }
                    271: 
                    272: /**
                    273:  * Thread cancellation function for watcher thread
                    274:  */
                    275: static void activate_all(private_watcher_t *this)
                    276: {
                    277:        entry_t *entry;
                    278: 
                    279:        /* When the watcher thread gets cancelled, we have to reactivate any entry
                    280:         * and signal threads in remove() to go on. */
                    281: 
                    282:        this->mutex->lock(this->mutex);
                    283:        for (entry = this->fds; entry; entry = entry->next)
                    284:        {
                    285:                entry->in_callback = 0;
                    286:        }
                    287:        this->state = WATCHER_STOPPED;
                    288:        this->condvar->broadcast(this->condvar);
                    289:        this->mutex->unlock(this->mutex);
                    290: }
                    291: 
                    292: /**
                    293:  * Find flagged revents in a pollfd set by fd
                    294:  */
                    295: static inline int find_revents(struct pollfd *pfd, int count, int fd)
                    296: {
                    297:        int i;
                    298: 
                    299:        for (i = 0; i < count; i++)
                    300:        {
                    301:                if (pfd[i].fd == fd)
                    302:                {
                    303:                        return pfd[i].revents;
                    304:                }
                    305:        }
                    306:        return 0;
                    307: }
                    308: 
                    309: /**
                    310:  * Check if entry is waiting for a specific event, and if it got signaled
                    311:  */
                    312: static inline bool entry_ready(entry_t *entry, watcher_event_t event,
                    313:                                                           int revents)
                    314: {
                    315:        if (entry->events & event)
                    316:        {
                    317:                switch (event)
                    318:                {
                    319:                        case WATCHER_READ:
                    320:                                return (revents & (POLLIN | POLLHUP | POLLNVAL)) != 0;
                    321:                        case WATCHER_WRITE:
                    322:                                return (revents & (POLLOUT | POLLHUP | POLLNVAL)) != 0;
                    323:                        case WATCHER_EXCEPT:
                    324:                                return (revents & (POLLERR | POLLHUP | POLLNVAL)) != 0;
                    325:                }
                    326:        }
                    327:        return FALSE;
                    328: }
                    329: 
                    330: /**
                    331:  * Dispatching function
                    332:  */
                    333: static job_requeue_t watch(private_watcher_t *this)
                    334: {
                    335:        entry_t *entry;
                    336:        struct pollfd *pfd;
                    337:        int count = 0, res;
                    338:        bool rebuild = FALSE;
                    339: 
                    340:        this->mutex->lock(this->mutex);
                    341: 
                    342:        count = this->count;
                    343:        if (!count)
                    344:        {
                    345:                this->state = WATCHER_STOPPED;
                    346:                this->mutex->unlock(this->mutex);
                    347:                return JOB_REQUEUE_NONE;
                    348:        }
                    349:        if (this->state == WATCHER_QUEUED)
                    350:        {
                    351:                this->state = WATCHER_RUNNING;
                    352:        }
                    353: 
                    354:        pfd = alloca(sizeof(*pfd) * (count + 1));
                    355:        pfd[0].fd = this->notify[0];
                    356:        pfd[0].events = POLLIN;
                    357:        count = 1;
                    358: 
                    359:        for (entry = this->fds; entry; entry = entry->next)
                    360:        {
                    361:                if (!entry->in_callback)
                    362:                {
                    363:                        pfd[count].fd = entry->fd;
                    364:                        pfd[count].events = 0;
                    365:                        if (entry->events & WATCHER_READ)
                    366:                        {
                    367:                                DBG3(DBG_JOB, "  watching %d for reading", entry->fd);
                    368:                                pfd[count].events |= POLLIN;
                    369:                        }
                    370:                        if (entry->events & WATCHER_WRITE)
                    371:                        {
                    372:                                DBG3(DBG_JOB, "  watching %d for writing", entry->fd);
                    373:                                pfd[count].events |= POLLOUT;
                    374:                        }
                    375:                        if (entry->events & WATCHER_EXCEPT)
                    376:                        {
                    377:                                DBG3(DBG_JOB, "  watching %d for exceptions", entry->fd);
                    378:                                pfd[count].events |= POLLERR;
                    379:                        }
                    380:                        count++;
                    381:                }
                    382:        }
                    383:        this->mutex->unlock(this->mutex);
                    384: 
                    385:        while (!rebuild)
                    386:        {
                    387:                int revents;
                    388:                char buf[1];
                    389:                bool old;
                    390:                ssize_t len;
                    391:                job_t *job;
                    392: 
                    393:                DBG2(DBG_JOB, "watcher going to poll() %d fds", count);
                    394:                thread_cleanup_push((void*)activate_all, this);
                    395:                old = thread_cancelability(TRUE);
                    396: 
                    397:                res = poll(pfd, count, -1);
                    398:                if (res == -1 && errno == EINTR)
                    399:                {
                    400:                        /* LinuxThreads interrupts poll(), but does not make it a
                    401:                         * cancellation point. Manually test if we got cancelled. */
                    402:                        thread_cancellation_point();
                    403:                }
                    404: 
                    405:                thread_cancelability(old);
                    406:                thread_cleanup_pop(FALSE);
                    407: 
                    408:                if (res > 0)
                    409:                {
                    410:                        if (pfd[0].revents & POLLIN)
                    411:                        {
                    412:                                while (TRUE)
                    413:                                {
                    414:                                        len = read(this->notify[0], buf, sizeof(buf));
                    415:                                        if (len == -1)
                    416:                                        {
                    417:                                                if (errno != EAGAIN && errno != EWOULDBLOCK)
                    418:                                                {
                    419:                                                        DBG1(DBG_JOB, "reading watcher notify failed: %s",
                    420:                                                                 strerror(errno));
                    421:                                                }
                    422:                                                break;
                    423:                                        }
                    424:                                }
                    425:                                this->pending = FALSE;
                    426:                                DBG2(DBG_JOB, "watcher got notification, rebuilding");
                    427:                                return JOB_REQUEUE_DIRECT;
                    428:                        }
                    429: 
                    430:                        this->mutex->lock(this->mutex);
                    431:                        for (entry = this->fds; entry; entry = entry->next)
                    432:                        {
                    433:                                if (entry->in_callback)
                    434:                                {
                    435:                                        rebuild = TRUE;
                    436:                                        break;
                    437:                                }
                    438:                                revents = find_revents(pfd, count, entry->fd);
                    439:                                if (entry_ready(entry, WATCHER_EXCEPT, revents))
                    440:                                {
                    441:                                        DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
                    442:                                        notify(this, entry, WATCHER_EXCEPT);
                    443:                                }
                    444:                                else
                    445:                                {
                    446:                                        if (entry_ready(entry, WATCHER_READ, revents))
                    447:                                        {
                    448:                                                DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
                    449:                                                notify(this, entry, WATCHER_READ);
                    450:                                        }
                    451:                                        if (entry_ready(entry, WATCHER_WRITE, revents))
                    452:                                        {
                    453:                                                DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
                    454:                                                notify(this, entry, WATCHER_WRITE);
                    455:                                        }
                    456:                                }
                    457:                        }
                    458:                        this->mutex->unlock(this->mutex);
                    459: 
                    460:                        if (this->jobs->get_count(this->jobs))
                    461:                        {
                    462:                                while (this->jobs->remove_first(this->jobs,
                    463:                                                                                                (void**)&job) == SUCCESS)
                    464:                                {
                    465:                                        lib->processor->execute_job(lib->processor, job);
                    466:                                }
                    467:                                /* we temporarily disable a notified FD, rebuild FDSET */
                    468:                                return JOB_REQUEUE_DIRECT;
                    469:                        }
                    470:                }
                    471:                else
                    472:                {
                    473:                        if (!this->pending && errno != EINTR)
                    474:                        {       /* complain only if no pending updates */
                    475:                                DBG1(DBG_JOB, "watcher poll() error: %s", strerror(errno));
                    476:                        }
                    477:                        return JOB_REQUEUE_DIRECT;
                    478:                }
                    479:        }
                    480:        return JOB_REQUEUE_DIRECT;
                    481: }
                    482: 
                    483: METHOD(watcher_t, add, void,
                    484:        private_watcher_t *this, int fd, watcher_event_t events,
                    485:        watcher_cb_t cb, void *data)
                    486: {
                    487:        entry_t *entry;
                    488: 
                    489:        INIT(entry,
                    490:                .fd = fd,
                    491:                .events = events,
                    492:                .cb = cb,
                    493:                .data = data,
                    494:        );
                    495: 
                    496:        this->mutex->lock(this->mutex);
                    497:        add_entry(this, entry);
                    498:        if (this->state == WATCHER_STOPPED)
                    499:        {
                    500:                this->state = WATCHER_QUEUED;
                    501:                lib->processor->queue_job(lib->processor,
                    502:                        (job_t*)callback_job_create_with_prio((void*)watch, this,
                    503:                                NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
                    504:        }
                    505:        else
                    506:        {
                    507:                update(this);
                    508:        }
                    509:        this->mutex->unlock(this->mutex);
                    510: }
                    511: 
                    512: METHOD(watcher_t, remove_, void,
                    513:        private_watcher_t *this, int fd)
                    514: {
                    515:        entry_t *entry, *prev = NULL;
                    516:        bool found = FALSE;
                    517: 
                    518:        this->mutex->lock(this->mutex);
                    519:        while (TRUE)
                    520:        {
                    521:                bool is_in_callback = FALSE;
                    522: 
                    523:                entry = this->fds;
                    524:                while (entry)
                    525:                {
                    526:                        if (entry->fd == fd)
                    527:                        {
                    528:                                if (this->state != WATCHER_STOPPED && entry->in_callback)
                    529:                                {
                    530:                                        is_in_callback = TRUE;
                    531:                                        break;
                    532:                                }
                    533:                                entry = remove_entry(this, entry, prev);
                    534:                                found = TRUE;
                    535:                                continue;
                    536:                        }
                    537:                        prev = entry;
                    538:                        entry = entry->next;
                    539:                }
                    540:                if (!is_in_callback)
                    541:                {
                    542:                        break;
                    543:                }
                    544:                this->condvar->wait(this->condvar, this->mutex);
                    545:        }
                    546:        if (found)
                    547:        {
                    548:                update(this);
                    549:        }
                    550:        this->mutex->unlock(this->mutex);
                    551: }
                    552: 
                    553: METHOD(watcher_t, get_state, watcher_state_t,
                    554:        private_watcher_t *this)
                    555: {
                    556:        watcher_state_t state;
                    557: 
                    558:        this->mutex->lock(this->mutex);
                    559:        state = this->state;
                    560:        this->mutex->unlock(this->mutex);
                    561: 
                    562:        return state;
                    563: }
                    564: 
                    565: METHOD(watcher_t, destroy, void,
                    566:        private_watcher_t *this)
                    567: {
                    568:        this->mutex->destroy(this->mutex);
                    569:        this->condvar->destroy(this->condvar);
                    570:        if (this->notify[0] != -1)
                    571:        {
                    572:                close(this->notify[0]);
                    573:        }
                    574:        if (this->notify[1] != -1)
                    575:        {
                    576:                close(this->notify[1]);
                    577:        }
                    578:        this->jobs->destroy(this->jobs);
                    579:        free(this);
                    580: }
                    581: 
                    582: #ifdef WIN32
                    583: 
                    584: /**
                    585:  * Create notify pipe with a TCP socketpair
                    586:  */
                    587: static bool create_notify(private_watcher_t *this)
                    588: {
                    589:        u_long on = 1;
                    590: 
                    591:        if (socketpair(AF_INET, SOCK_STREAM, 0, this->notify) == 0)
                    592:        {
                    593:                /* use non-blocking I/O on read-end of notify pipe */
                    594:                if (ioctlsocket(this->notify[0], FIONBIO, &on) == 0)
                    595:                {
                    596:                        return TRUE;
                    597:                }
                    598:                DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
                    599:                         "failed: %s", strerror(errno));
                    600:        }
                    601:        return FALSE;
                    602: }
                    603: 
                    604: #else /* !WIN32 */
                    605: 
                    606: /**
                    607:  * Create a notify pipe with a one-directional pipe
                    608:  */
                    609: static bool create_notify(private_watcher_t *this)
                    610: {
                    611:        int flags;
                    612: 
                    613:        if (pipe(this->notify) == 0)
                    614:        {
                    615:                /* use non-blocking I/O on read-end of notify pipe */
                    616:                flags = fcntl(this->notify[0], F_GETFL);
                    617:                if (flags != -1 &&
                    618:                        fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) != -1)
                    619:                {
                    620:                        return TRUE;
                    621:                }
                    622:                DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
                    623:                         "failed: %s", strerror(errno));
                    624:        }
                    625:        return FALSE;
                    626: }
                    627: 
                    628: #endif /* !WIN32 */
                    629: 
                    630: /**
                    631:  * See header
                    632:  */
                    633: watcher_t *watcher_create()
                    634: {
                    635:        private_watcher_t *this;
                    636: 
                    637:        INIT(this,
                    638:                .public = {
                    639:                        .add = _add,
                    640:                        .remove = _remove_,
                    641:                        .get_state = _get_state,
                    642:                        .destroy = _destroy,
                    643:                },
                    644:                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
                    645:                .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
                    646:                .jobs = linked_list_create(),
                    647:                .notify = {-1, -1},
                    648:                .state = WATCHER_STOPPED,
                    649:        );
                    650: 
                    651:        if (!create_notify(this))
                    652:        {
                    653:                DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
                    654:                         strerror(errno));
                    655:        }
                    656:        return &this->public;
                    657: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>