Annotation of embedaddon/strongswan/src/libstrongswan/processing/watcher.c, revision 1.1

1.1     ! misho       1: /*
        !             2:  * Copyright (C) 2016 Tobias Brunner
        !             3:  * HSR Hochschule fuer Technik Rapperswil
        !             4:  *
        !             5:  * Copyright (C) 2013 Martin Willi
        !             6:  * Copyright (C) 2013 revosec AG
        !             7:  *
        !             8:  * This program is free software; you can redistribute it and/or modify it
        !             9:  * under the terms of the GNU General Public License as published by the
        !            10:  * Free Software Foundation; either version 2 of the License, or (at your
        !            11:  * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
        !            12:  *
        !            13:  * This program is distributed in the hope that it will be useful, but
        !            14:  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
        !            15:  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
        !            16:  * for more details.
        !            17:  */
        !            18: 
        !            19: #include "watcher.h"
        !            20: 
        !            21: #include <library.h>
        !            22: #include <threading/thread.h>
        !            23: #include <threading/mutex.h>
        !            24: #include <threading/condvar.h>
        !            25: #include <collections/linked_list.h>
        !            26: #include <processing/jobs/callback_job.h>
        !            27: 
        !            28: #include <unistd.h>
        !            29: #include <errno.h>
        !            30: #include <fcntl.h>
        !            31: 
        !            32: typedef struct private_watcher_t private_watcher_t;
        !            33: typedef struct entry_t entry_t;
        !            34: 
        !            35: /**
        !            36:  * Private data of an watcher_t object.
        !            37:  */
        !            38: struct private_watcher_t {
        !            39: 
        !            40:        /**
        !            41:         * Public watcher_t interface.
        !            42:         */
        !            43:        watcher_t public;
        !            44: 
        !            45:        /**
        !            46:         * List of registered FDs
        !            47:         */
        !            48:        entry_t *fds;
        !            49: 
        !            50:        /**
        !            51:         * Last registered FD
        !            52:         */
        !            53:        entry_t *last;
        !            54: 
        !            55:        /**
        !            56:         * Number of registered FDs
        !            57:         */
        !            58:        u_int count;
        !            59: 
        !            60:        /**
        !            61:         * Pending update of FD list?
        !            62:         */
        !            63:        bool pending;
        !            64: 
        !            65:        /**
        !            66:         * Running state of watcher
        !            67:         */
        !            68:        watcher_state_t state;
        !            69: 
        !            70:        /**
        !            71:         * Lock to access FD list
        !            72:         */
        !            73:        mutex_t *mutex;
        !            74: 
        !            75:        /**
        !            76:         * Condvar to signal completion of callback
        !            77:         */
        !            78:        condvar_t *condvar;
        !            79: 
        !            80:        /**
        !            81:         * Notification pipe to signal watcher thread
        !            82:         */
        !            83:        int notify[2];
        !            84: 
        !            85:        /**
        !            86:         * List of callback jobs to process by watcher thread, as job_t
        !            87:         */
        !            88:        linked_list_t *jobs;
        !            89: };
        !            90: 
        !            91: /**
        !            92:  * Entry for a registered file descriptor
        !            93:  */
        !            94: struct entry_t {
        !            95:        /** file descriptor */
        !            96:        int fd;
        !            97:        /** events to watch */
        !            98:        watcher_event_t events;
        !            99:        /** registered callback function */
        !           100:        watcher_cb_t cb;
        !           101:        /** user data to pass to callback */
        !           102:        void *data;
        !           103:        /** callback(s) currently active? */
        !           104:        int in_callback;
        !           105:        /** next registered fd */
        !           106:        entry_t *next;
        !           107: };
        !           108: 
        !           109: /**
        !           110:  * Adds the given entry at the end of the list
        !           111:  */
        !           112: static void add_entry(private_watcher_t *this, entry_t *entry)
        !           113: {
        !           114:        if (this->last)
        !           115:        {
        !           116:                this->last->next = entry;
        !           117:                this->last = entry;
        !           118:        }
        !           119:        else
        !           120:        {
        !           121:                this->fds = this->last = entry;
        !           122:        }
        !           123:        this->count++;
        !           124: }
        !           125: 
        !           126: /**
        !           127:  * Removes and frees the given entry
        !           128:  *
        !           129:  * Updates the previous entry and returns the next entry in the list, if any.
        !           130:  */
        !           131: static entry_t *remove_entry(private_watcher_t *this, entry_t *entry,
        !           132:                                                         entry_t *prev)
        !           133: {
        !           134:        entry_t *next = entry->next;
        !           135: 
        !           136:        if (prev)
        !           137:        {
        !           138:                prev->next = next;
        !           139:        }
        !           140:        else
        !           141:        {
        !           142:                this->fds = next;
        !           143:        }
        !           144:        if (this->last == entry)
        !           145:        {
        !           146:                this->last = prev;
        !           147:        }
        !           148:        this->count--;
        !           149:        free(entry);
        !           150:        return next;
        !           151: }
        !           152: 
        !           153: /**
        !           154:  * Data we pass on for an async notification
        !           155:  */
        !           156: typedef struct {
        !           157:        /** file descriptor */
        !           158:        int fd;
        !           159:        /** event type */
        !           160:        watcher_event_t event;
        !           161:        /** registered callback function */
        !           162:        watcher_cb_t cb;
        !           163:        /** user data to pass to callback */
        !           164:        void *data;
        !           165:        /** keep registered? */
        !           166:        bool keep;
        !           167:        /** reference to watcher */
        !           168:        private_watcher_t *this;
        !           169: } notify_data_t;
        !           170: 
        !           171: /**
        !           172:  * Notify watcher thread about changes
        !           173:  */
        !           174: static void update(private_watcher_t *this)
        !           175: {
        !           176:        char buf[1] = { 'u' };
        !           177: 
        !           178:        this->pending = TRUE;
        !           179:        if (this->notify[1] != -1)
        !           180:        {
        !           181:                if (write(this->notify[1], buf, sizeof(buf)) == -1)
        !           182:                {
        !           183:                        DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(errno));
        !           184:                }
        !           185:        }
        !           186: }
        !           187: 
        !           188: /**
        !           189:  * Cleanup function if callback gets cancelled
        !           190:  */
        !           191: static void unregister(notify_data_t *data)
        !           192: {
        !           193:        /* if a thread processing a callback gets cancelled, we mark the entry
        !           194:         * as cancelled, like the callback would return FALSE. This is required
        !           195:         * to not queue this watcher again if all threads have been gone. */
        !           196:        data->keep = FALSE;
        !           197: }
        !           198: 
        !           199:  /**
        !           200:  * Execute callback of registered FD, asynchronous
        !           201:  */
        !           202: static job_requeue_t notify_async(notify_data_t *data)
        !           203: {
        !           204:        thread_cleanup_push((void*)unregister, data);
        !           205:        data->keep = data->cb(data->data, data->fd, data->event);
        !           206:        thread_cleanup_pop(FALSE);
        !           207:        return JOB_REQUEUE_NONE;
        !           208: }
        !           209: 
        !           210: /**
        !           211:  * Clean up notification data, reactivate FD
        !           212:  */
        !           213: static void notify_end(notify_data_t *data)
        !           214: {
        !           215:        private_watcher_t *this = data->this;
        !           216:        entry_t *entry, *prev = NULL;
        !           217: 
        !           218:        /* reactivate the disabled entry */
        !           219:        this->mutex->lock(this->mutex);
        !           220:        for (entry = this->fds; entry; prev = entry, entry = entry->next)
        !           221:        {
        !           222:                if (entry->fd == data->fd)
        !           223:                {
        !           224:                        if (!data->keep)
        !           225:                        {
        !           226:                                entry->events &= ~data->event;
        !           227:                                if (!entry->events)
        !           228:                                {
        !           229:                                        remove_entry(this, entry, prev);
        !           230:                                        break;
        !           231:                                }
        !           232:                        }
        !           233:                        entry->in_callback--;
        !           234:                        break;
        !           235:                }
        !           236:        }
        !           237:        update(this);
        !           238:        this->condvar->broadcast(this->condvar);
        !           239:        this->mutex->unlock(this->mutex);
        !           240: 
        !           241:        free(data);
        !           242: }
        !           243: 
        !           244: /**
        !           245:  * Execute the callback for a registered FD
        !           246:  */
        !           247: static void notify(private_watcher_t *this, entry_t *entry,
        !           248:                                   watcher_event_t event)
        !           249: {
        !           250:        notify_data_t *data;
        !           251: 
        !           252:        /* get a copy of entry for async job, but with specific event */
        !           253:        INIT(data,
        !           254:                .fd = entry->fd,
        !           255:                .event = event,
        !           256:                .cb = entry->cb,
        !           257:                .data = entry->data,
        !           258:                .keep = TRUE,
        !           259:                .this = this,
        !           260:        );
        !           261: 
        !           262:        /* deactivate entry, so we can select() other FDs even if the async
        !           263:         * processing did not handle the event yet */
        !           264:        entry->in_callback++;
        !           265: 
        !           266:        this->jobs->insert_last(this->jobs,
        !           267:                                        callback_job_create_with_prio((void*)notify_async, data,
        !           268:                                                (void*)notify_end, (callback_job_cancel_t)return_false,
        !           269:                                                JOB_PRIO_CRITICAL));
        !           270: }
        !           271: 
        !           272: /**
        !           273:  * Thread cancellation function for watcher thread
        !           274:  */
        !           275: static void activate_all(private_watcher_t *this)
        !           276: {
        !           277:        entry_t *entry;
        !           278: 
        !           279:        /* When the watcher thread gets cancelled, we have to reactivate any entry
        !           280:         * and signal threads in remove() to go on. */
        !           281: 
        !           282:        this->mutex->lock(this->mutex);
        !           283:        for (entry = this->fds; entry; entry = entry->next)
        !           284:        {
        !           285:                entry->in_callback = 0;
        !           286:        }
        !           287:        this->state = WATCHER_STOPPED;
        !           288:        this->condvar->broadcast(this->condvar);
        !           289:        this->mutex->unlock(this->mutex);
        !           290: }
        !           291: 
        !           292: /**
        !           293:  * Find flagged revents in a pollfd set by fd
        !           294:  */
        !           295: static inline int find_revents(struct pollfd *pfd, int count, int fd)
        !           296: {
        !           297:        int i;
        !           298: 
        !           299:        for (i = 0; i < count; i++)
        !           300:        {
        !           301:                if (pfd[i].fd == fd)
        !           302:                {
        !           303:                        return pfd[i].revents;
        !           304:                }
        !           305:        }
        !           306:        return 0;
        !           307: }
        !           308: 
        !           309: /**
        !           310:  * Check if entry is waiting for a specific event, and if it got signaled
        !           311:  */
        !           312: static inline bool entry_ready(entry_t *entry, watcher_event_t event,
        !           313:                                                           int revents)
        !           314: {
        !           315:        if (entry->events & event)
        !           316:        {
        !           317:                switch (event)
        !           318:                {
        !           319:                        case WATCHER_READ:
        !           320:                                return (revents & (POLLIN | POLLHUP | POLLNVAL)) != 0;
        !           321:                        case WATCHER_WRITE:
        !           322:                                return (revents & (POLLOUT | POLLHUP | POLLNVAL)) != 0;
        !           323:                        case WATCHER_EXCEPT:
        !           324:                                return (revents & (POLLERR | POLLHUP | POLLNVAL)) != 0;
        !           325:                }
        !           326:        }
        !           327:        return FALSE;
        !           328: }
        !           329: 
        !           330: /**
        !           331:  * Dispatching function
        !           332:  */
        !           333: static job_requeue_t watch(private_watcher_t *this)
        !           334: {
        !           335:        entry_t *entry;
        !           336:        struct pollfd *pfd;
        !           337:        int count = 0, res;
        !           338:        bool rebuild = FALSE;
        !           339: 
        !           340:        this->mutex->lock(this->mutex);
        !           341: 
        !           342:        count = this->count;
        !           343:        if (!count)
        !           344:        {
        !           345:                this->state = WATCHER_STOPPED;
        !           346:                this->mutex->unlock(this->mutex);
        !           347:                return JOB_REQUEUE_NONE;
        !           348:        }
        !           349:        if (this->state == WATCHER_QUEUED)
        !           350:        {
        !           351:                this->state = WATCHER_RUNNING;
        !           352:        }
        !           353: 
        !           354:        pfd = alloca(sizeof(*pfd) * (count + 1));
        !           355:        pfd[0].fd = this->notify[0];
        !           356:        pfd[0].events = POLLIN;
        !           357:        count = 1;
        !           358: 
        !           359:        for (entry = this->fds; entry; entry = entry->next)
        !           360:        {
        !           361:                if (!entry->in_callback)
        !           362:                {
        !           363:                        pfd[count].fd = entry->fd;
        !           364:                        pfd[count].events = 0;
        !           365:                        if (entry->events & WATCHER_READ)
        !           366:                        {
        !           367:                                DBG3(DBG_JOB, "  watching %d for reading", entry->fd);
        !           368:                                pfd[count].events |= POLLIN;
        !           369:                        }
        !           370:                        if (entry->events & WATCHER_WRITE)
        !           371:                        {
        !           372:                                DBG3(DBG_JOB, "  watching %d for writing", entry->fd);
        !           373:                                pfd[count].events |= POLLOUT;
        !           374:                        }
        !           375:                        if (entry->events & WATCHER_EXCEPT)
        !           376:                        {
        !           377:                                DBG3(DBG_JOB, "  watching %d for exceptions", entry->fd);
        !           378:                                pfd[count].events |= POLLERR;
        !           379:                        }
        !           380:                        count++;
        !           381:                }
        !           382:        }
        !           383:        this->mutex->unlock(this->mutex);
        !           384: 
        !           385:        while (!rebuild)
        !           386:        {
        !           387:                int revents;
        !           388:                char buf[1];
        !           389:                bool old;
        !           390:                ssize_t len;
        !           391:                job_t *job;
        !           392: 
        !           393:                DBG2(DBG_JOB, "watcher going to poll() %d fds", count);
        !           394:                thread_cleanup_push((void*)activate_all, this);
        !           395:                old = thread_cancelability(TRUE);
        !           396: 
        !           397:                res = poll(pfd, count, -1);
        !           398:                if (res == -1 && errno == EINTR)
        !           399:                {
        !           400:                        /* LinuxThreads interrupts poll(), but does not make it a
        !           401:                         * cancellation point. Manually test if we got cancelled. */
        !           402:                        thread_cancellation_point();
        !           403:                }
        !           404: 
        !           405:                thread_cancelability(old);
        !           406:                thread_cleanup_pop(FALSE);
        !           407: 
        !           408:                if (res > 0)
        !           409:                {
        !           410:                        if (pfd[0].revents & POLLIN)
        !           411:                        {
        !           412:                                while (TRUE)
        !           413:                                {
        !           414:                                        len = read(this->notify[0], buf, sizeof(buf));
        !           415:                                        if (len == -1)
        !           416:                                        {
        !           417:                                                if (errno != EAGAIN && errno != EWOULDBLOCK)
        !           418:                                                {
        !           419:                                                        DBG1(DBG_JOB, "reading watcher notify failed: %s",
        !           420:                                                                 strerror(errno));
        !           421:                                                }
        !           422:                                                break;
        !           423:                                        }
        !           424:                                }
        !           425:                                this->pending = FALSE;
        !           426:                                DBG2(DBG_JOB, "watcher got notification, rebuilding");
        !           427:                                return JOB_REQUEUE_DIRECT;
        !           428:                        }
        !           429: 
        !           430:                        this->mutex->lock(this->mutex);
        !           431:                        for (entry = this->fds; entry; entry = entry->next)
        !           432:                        {
        !           433:                                if (entry->in_callback)
        !           434:                                {
        !           435:                                        rebuild = TRUE;
        !           436:                                        break;
        !           437:                                }
        !           438:                                revents = find_revents(pfd, count, entry->fd);
        !           439:                                if (entry_ready(entry, WATCHER_EXCEPT, revents))
        !           440:                                {
        !           441:                                        DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
        !           442:                                        notify(this, entry, WATCHER_EXCEPT);
        !           443:                                }
        !           444:                                else
        !           445:                                {
        !           446:                                        if (entry_ready(entry, WATCHER_READ, revents))
        !           447:                                        {
        !           448:                                                DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
        !           449:                                                notify(this, entry, WATCHER_READ);
        !           450:                                        }
        !           451:                                        if (entry_ready(entry, WATCHER_WRITE, revents))
        !           452:                                        {
        !           453:                                                DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
        !           454:                                                notify(this, entry, WATCHER_WRITE);
        !           455:                                        }
        !           456:                                }
        !           457:                        }
        !           458:                        this->mutex->unlock(this->mutex);
        !           459: 
        !           460:                        if (this->jobs->get_count(this->jobs))
        !           461:                        {
        !           462:                                while (this->jobs->remove_first(this->jobs,
        !           463:                                                                                                (void**)&job) == SUCCESS)
        !           464:                                {
        !           465:                                        lib->processor->execute_job(lib->processor, job);
        !           466:                                }
        !           467:                                /* we temporarily disable a notified FD, rebuild FDSET */
        !           468:                                return JOB_REQUEUE_DIRECT;
        !           469:                        }
        !           470:                }
        !           471:                else
        !           472:                {
        !           473:                        if (!this->pending && errno != EINTR)
        !           474:                        {       /* complain only if no pending updates */
        !           475:                                DBG1(DBG_JOB, "watcher poll() error: %s", strerror(errno));
        !           476:                        }
        !           477:                        return JOB_REQUEUE_DIRECT;
        !           478:                }
        !           479:        }
        !           480:        return JOB_REQUEUE_DIRECT;
        !           481: }
        !           482: 
        !           483: METHOD(watcher_t, add, void,
        !           484:        private_watcher_t *this, int fd, watcher_event_t events,
        !           485:        watcher_cb_t cb, void *data)
        !           486: {
        !           487:        entry_t *entry;
        !           488: 
        !           489:        INIT(entry,
        !           490:                .fd = fd,
        !           491:                .events = events,
        !           492:                .cb = cb,
        !           493:                .data = data,
        !           494:        );
        !           495: 
        !           496:        this->mutex->lock(this->mutex);
        !           497:        add_entry(this, entry);
        !           498:        if (this->state == WATCHER_STOPPED)
        !           499:        {
        !           500:                this->state = WATCHER_QUEUED;
        !           501:                lib->processor->queue_job(lib->processor,
        !           502:                        (job_t*)callback_job_create_with_prio((void*)watch, this,
        !           503:                                NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
        !           504:        }
        !           505:        else
        !           506:        {
        !           507:                update(this);
        !           508:        }
        !           509:        this->mutex->unlock(this->mutex);
        !           510: }
        !           511: 
        !           512: METHOD(watcher_t, remove_, void,
        !           513:        private_watcher_t *this, int fd)
        !           514: {
        !           515:        entry_t *entry, *prev = NULL;
        !           516:        bool found = FALSE;
        !           517: 
        !           518:        this->mutex->lock(this->mutex);
        !           519:        while (TRUE)
        !           520:        {
        !           521:                bool is_in_callback = FALSE;
        !           522: 
        !           523:                entry = this->fds;
        !           524:                while (entry)
        !           525:                {
        !           526:                        if (entry->fd == fd)
        !           527:                        {
        !           528:                                if (this->state != WATCHER_STOPPED && entry->in_callback)
        !           529:                                {
        !           530:                                        is_in_callback = TRUE;
        !           531:                                        break;
        !           532:                                }
        !           533:                                entry = remove_entry(this, entry, prev);
        !           534:                                found = TRUE;
        !           535:                                continue;
        !           536:                        }
        !           537:                        prev = entry;
        !           538:                        entry = entry->next;
        !           539:                }
        !           540:                if (!is_in_callback)
        !           541:                {
        !           542:                        break;
        !           543:                }
        !           544:                this->condvar->wait(this->condvar, this->mutex);
        !           545:        }
        !           546:        if (found)
        !           547:        {
        !           548:                update(this);
        !           549:        }
        !           550:        this->mutex->unlock(this->mutex);
        !           551: }
        !           552: 
        !           553: METHOD(watcher_t, get_state, watcher_state_t,
        !           554:        private_watcher_t *this)
        !           555: {
        !           556:        watcher_state_t state;
        !           557: 
        !           558:        this->mutex->lock(this->mutex);
        !           559:        state = this->state;
        !           560:        this->mutex->unlock(this->mutex);
        !           561: 
        !           562:        return state;
        !           563: }
        !           564: 
        !           565: METHOD(watcher_t, destroy, void,
        !           566:        private_watcher_t *this)
        !           567: {
        !           568:        this->mutex->destroy(this->mutex);
        !           569:        this->condvar->destroy(this->condvar);
        !           570:        if (this->notify[0] != -1)
        !           571:        {
        !           572:                close(this->notify[0]);
        !           573:        }
        !           574:        if (this->notify[1] != -1)
        !           575:        {
        !           576:                close(this->notify[1]);
        !           577:        }
        !           578:        this->jobs->destroy(this->jobs);
        !           579:        free(this);
        !           580: }
        !           581: 
        !           582: #ifdef WIN32
        !           583: 
        !           584: /**
        !           585:  * Create notify pipe with a TCP socketpair
        !           586:  */
        !           587: static bool create_notify(private_watcher_t *this)
        !           588: {
        !           589:        u_long on = 1;
        !           590: 
        !           591:        if (socketpair(AF_INET, SOCK_STREAM, 0, this->notify) == 0)
        !           592:        {
        !           593:                /* use non-blocking I/O on read-end of notify pipe */
        !           594:                if (ioctlsocket(this->notify[0], FIONBIO, &on) == 0)
        !           595:                {
        !           596:                        return TRUE;
        !           597:                }
        !           598:                DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
        !           599:                         "failed: %s", strerror(errno));
        !           600:        }
        !           601:        return FALSE;
        !           602: }
        !           603: 
        !           604: #else /* !WIN32 */
        !           605: 
        !           606: /**
        !           607:  * Create a notify pipe with a one-directional pipe
        !           608:  */
        !           609: static bool create_notify(private_watcher_t *this)
        !           610: {
        !           611:        int flags;
        !           612: 
        !           613:        if (pipe(this->notify) == 0)
        !           614:        {
        !           615:                /* use non-blocking I/O on read-end of notify pipe */
        !           616:                flags = fcntl(this->notify[0], F_GETFL);
        !           617:                if (flags != -1 &&
        !           618:                        fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) != -1)
        !           619:                {
        !           620:                        return TRUE;
        !           621:                }
        !           622:                DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
        !           623:                         "failed: %s", strerror(errno));
        !           624:        }
        !           625:        return FALSE;
        !           626: }
        !           627: 
        !           628: #endif /* !WIN32 */
        !           629: 
        !           630: /**
        !           631:  * See header
        !           632:  */
        !           633: watcher_t *watcher_create()
        !           634: {
        !           635:        private_watcher_t *this;
        !           636: 
        !           637:        INIT(this,
        !           638:                .public = {
        !           639:                        .add = _add,
        !           640:                        .remove = _remove_,
        !           641:                        .get_state = _get_state,
        !           642:                        .destroy = _destroy,
        !           643:                },
        !           644:                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
        !           645:                .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
        !           646:                .jobs = linked_list_create(),
        !           647:                .notify = {-1, -1},
        !           648:                .state = WATCHER_STOPPED,
        !           649:        );
        !           650: 
        !           651:        if (!create_notify(this))
        !           652:        {
        !           653:                DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
        !           654:                         strerror(errno));
        !           655:        }
        !           656:        return &this->public;
        !           657: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>