Annotation of embedaddon/strongswan/src/libstrongswan/networking/streams/stream_service.c, revision 1.1

1.1     ! misho       1: /*
        !             2:  * Copyright (C) 2013 Martin Willi
        !             3:  * Copyright (C) 2013 revosec AG
        !             4:  *
        !             5:  * This program is free software; you can redistribute it and/or modify it
        !             6:  * under the terms of the GNU General Public License as published by the
        !             7:  * Free Software Foundation; either version 2 of the License, or (at your
        !             8:  * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
        !             9:  *
        !            10:  * This program is distributed in the hope that it will be useful, but
        !            11:  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
        !            12:  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
        !            13:  * for more details.
        !            14:  */
        !            15: 
        !            16: #include <library.h>
        !            17: #include <threading/thread.h>
        !            18: #include <threading/mutex.h>
        !            19: #include <threading/condvar.h>
        !            20: #include <processing/jobs/callback_job.h>
        !            21: 
        !            22: #include "stream_service.h"
        !            23: 
        !            24: #include <errno.h>
        !            25: #include <unistd.h>
        !            26: #include <sys/stat.h>
        !            27: 
        !            28: typedef struct private_stream_service_t private_stream_service_t;
        !            29: 
        !            30: /**
        !            31:  * Private data of an stream_service_t object.
        !            32:  */
        !            33: struct private_stream_service_t {
        !            34: 
        !            35:        /**
        !            36:         * Public stream_service_t interface.
        !            37:         */
        !            38:        stream_service_t public;
        !            39: 
        !            40:        /**
        !            41:         * Underlying socket
        !            42:         */
        !            43:        int fd;
        !            44: 
        !            45:        /**
        !            46:         * Accept callback
        !            47:         */
        !            48:        stream_service_cb_t cb;
        !            49: 
        !            50:        /**
        !            51:         * Accept callback data
        !            52:         */
        !            53:        void *data;
        !            54: 
        !            55:        /**
        !            56:         * Job priority to invoke callback with
        !            57:         */
        !            58:        job_priority_t prio;
        !            59: 
        !            60:        /**
        !            61:         * Maximum number of parallel callback invocations
        !            62:         */
        !            63:        u_int cncrncy;
        !            64: 
        !            65:        /**
        !            66:         * Currently active jobs
        !            67:         */
        !            68:        u_int active;
        !            69: 
        !            70:        /**
        !            71:         * Currently running jobs
        !            72:         */
        !            73:        u_int running;
        !            74: 
        !            75:        /**
        !            76:         * mutex to lock active counter
        !            77:         */
        !            78:        mutex_t *mutex;
        !            79: 
        !            80:        /**
        !            81:         * Condvar to wait for callback termination
        !            82:         */
        !            83:        condvar_t *condvar;
        !            84: 
        !            85:        /**
        !            86:         * TRUE when the service is terminated
        !            87:         */
        !            88:        bool terminated;
        !            89: 
        !            90:        /**
        !            91:         * Reference counter
        !            92:         */
        !            93:        refcount_t ref;
        !            94: };
        !            95: 
        !            96: static void destroy_service(private_stream_service_t *this)
        !            97: {
        !            98:        if (ref_put(&this->ref))
        !            99:        {
        !           100:                close(this->fd);
        !           101:                this->mutex->destroy(this->mutex);
        !           102:                this->condvar->destroy(this->condvar);
        !           103:                free(this);
        !           104:        }
        !           105: }
        !           106: 
        !           107: /**
        !           108:  * Data to pass to async accept job
        !           109:  */
        !           110: typedef struct {
        !           111:        /** callback function */
        !           112:        stream_service_cb_t cb;
        !           113:        /** callback data */
        !           114:        void *data;
        !           115:        /** accepted connection */
        !           116:        int fd;
        !           117:        /** reference to stream service */
        !           118:        private_stream_service_t *this;
        !           119: } async_data_t;
        !           120: 
        !           121: /**
        !           122:  * Forward declaration
        !           123:  */
        !           124: static bool watch(private_stream_service_t *this, int fd, watcher_event_t event);
        !           125: 
        !           126: /**
        !           127:  * Clean up accept data
        !           128:  */
        !           129: static void destroy_async_data(async_data_t *data)
        !           130: {
        !           131:        private_stream_service_t *this = data->this;
        !           132: 
        !           133:        this->mutex->lock(this->mutex);
        !           134:        if (this->active-- == this->cncrncy && !this->terminated)
        !           135:        {
        !           136:                /* leaving concurrency limit, restart accept()ing. */
        !           137:                lib->watcher->add(lib->watcher, this->fd,
        !           138:                                                  WATCHER_READ, (watcher_cb_t)watch, this);
        !           139:        }
        !           140:        this->condvar->signal(this->condvar);
        !           141:        this->mutex->unlock(this->mutex);
        !           142:        destroy_service(this);
        !           143: 
        !           144:        if (data->fd != -1)
        !           145:        {
        !           146:                close(data->fd);
        !           147:        }
        !           148:        free(data);
        !           149: }
        !           150: 
        !           151: /**
        !           152:  * Reduce running counter
        !           153:  */
        !           154: CALLBACK(reduce_running, void,
        !           155:        async_data_t *data)
        !           156: {
        !           157:        private_stream_service_t *this = data->this;
        !           158: 
        !           159:        this->mutex->lock(this->mutex);
        !           160:        this->running--;
        !           161:        this->condvar->signal(this->condvar);
        !           162:        this->mutex->unlock(this->mutex);
        !           163: }
        !           164: 
        !           165: /**
        !           166:  * Async processing of accepted connection
        !           167:  */
        !           168: static job_requeue_t accept_async(async_data_t *data)
        !           169: {
        !           170:        private_stream_service_t *this = data->this;
        !           171:        stream_t *stream;
        !           172: 
        !           173:        this->mutex->lock(this->mutex);
        !           174:        if (this->terminated)
        !           175:        {
        !           176:                this->mutex->unlock(this->mutex);
        !           177:                return JOB_REQUEUE_NONE;
        !           178:        }
        !           179:        this->running++;
        !           180:        this->mutex->unlock(this->mutex);
        !           181: 
        !           182:        stream = stream_create_from_fd(data->fd);
        !           183:        if (stream)
        !           184:        {
        !           185:                /* FD is now owned by stream, don't close it during cleanup */
        !           186:                data->fd = -1;
        !           187:                thread_cleanup_push(reduce_running, data);
        !           188:                thread_cleanup_push((void*)stream->destroy, stream);
        !           189:                thread_cleanup_pop(!data->cb(data->data, stream));
        !           190:                thread_cleanup_pop(TRUE);
        !           191:        }
        !           192:        return JOB_REQUEUE_NONE;
        !           193: }
        !           194: 
        !           195: /**
        !           196:  * Watcher callback function
        !           197:  */
        !           198: static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
        !           199: {
        !           200:        async_data_t *data;
        !           201:        bool keep = TRUE;
        !           202: 
        !           203:        INIT(data,
        !           204:                .cb = this->cb,
        !           205:                .data = this->data,
        !           206:                .fd = accept(fd, NULL, NULL),
        !           207:                .this = this,
        !           208:        );
        !           209: 
        !           210:        if (data->fd != -1 && !this->terminated)
        !           211:        {
        !           212:                this->mutex->lock(this->mutex);
        !           213:                if (++this->active == this->cncrncy)
        !           214:                {
        !           215:                        /* concurrency limit reached, stop accept()ing new connections */
        !           216:                        keep = FALSE;
        !           217:                }
        !           218:                this->mutex->unlock(this->mutex);
        !           219:                ref_get(&this->ref);
        !           220: 
        !           221:                lib->processor->queue_job(lib->processor,
        !           222:                        (job_t*)callback_job_create_with_prio((void*)accept_async, data,
        !           223:                                (void*)destroy_async_data, (callback_job_cancel_t)return_false,
        !           224:                                this->prio));
        !           225:        }
        !           226:        else
        !           227:        {
        !           228:                free(data);
        !           229:        }
        !           230:        return keep;
        !           231: }
        !           232: 
        !           233: METHOD(stream_service_t, on_accept, void,
        !           234:        private_stream_service_t *this, stream_service_cb_t cb, void *data,
        !           235:        job_priority_t prio, u_int cncrncy)
        !           236: {
        !           237:        this->mutex->lock(this->mutex);
        !           238: 
        !           239:        if (this->terminated)
        !           240:        {
        !           241:                this->mutex->unlock(this->mutex);
        !           242:                return;
        !           243:        }
        !           244: 
        !           245:        /* wait for all callbacks to return */
        !           246:        while (this->active)
        !           247:        {
        !           248:                this->condvar->wait(this->condvar, this->mutex);
        !           249:        }
        !           250: 
        !           251:        if (this->cb)
        !           252:        {
        !           253:                lib->watcher->remove(lib->watcher, this->fd);
        !           254:        }
        !           255: 
        !           256:        this->cb = cb;
        !           257:        this->data = data;
        !           258:        if (prio <= JOB_PRIO_MAX)
        !           259:        {
        !           260:                this->prio = prio;
        !           261:        }
        !           262:        this->cncrncy = cncrncy;
        !           263: 
        !           264:        if (this->cb)
        !           265:        {
        !           266:                lib->watcher->add(lib->watcher, this->fd,
        !           267:                                                  WATCHER_READ, (watcher_cb_t)watch, this);
        !           268:        }
        !           269: 
        !           270:        this->mutex->unlock(this->mutex);
        !           271: }
        !           272: 
        !           273: METHOD(stream_service_t, destroy, void,
        !           274:        private_stream_service_t *this)
        !           275: {
        !           276:        this->mutex->lock(this->mutex);
        !           277:        lib->watcher->remove(lib->watcher, this->fd);
        !           278:        this->terminated = TRUE;
        !           279:        while (this->running)
        !           280:        {
        !           281:                this->condvar->wait(this->condvar, this->mutex);
        !           282:        }
        !           283:        this->mutex->unlock(this->mutex);
        !           284:        destroy_service(this);
        !           285: }
        !           286: 
        !           287: /**
        !           288:  * See header
        !           289:  */
        !           290: stream_service_t *stream_service_create_from_fd(int fd)
        !           291: {
        !           292:        private_stream_service_t *this;
        !           293: 
        !           294:        INIT(this,
        !           295:                .public = {
        !           296:                        .on_accept = _on_accept,
        !           297:                        .destroy = _destroy,
        !           298:                },
        !           299:                .fd = fd,
        !           300:                .prio = JOB_PRIO_MEDIUM,
        !           301:                .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
        !           302:                .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
        !           303:                .ref = 1,
        !           304:        );
        !           305: 
        !           306:        return &this->public;
        !           307: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>