Annotation of embedaddon/strongswan/src/libstrongswan/networking/streams/stream_service.c, revision 1.1.1.1

1.1       misho       1: /*
                      2:  * Copyright (C) 2013 Martin Willi
                      3:  * Copyright (C) 2013 revosec AG
                      4:  *
                      5:  * This program is free software; you can redistribute it and/or modify it
                      6:  * under the terms of the GNU General Public License as published by the
                      7:  * Free Software Foundation; either version 2 of the License, or (at your
                      8:  * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
                      9:  *
                     10:  * This program is distributed in the hope that it will be useful, but
                     11:  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
                     12:  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
                     13:  * for more details.
                     14:  */
                     15: 
                     16: #include <library.h>
                     17: #include <threading/thread.h>
                     18: #include <threading/mutex.h>
                     19: #include <threading/condvar.h>
                     20: #include <processing/jobs/callback_job.h>
                     21: 
                     22: #include "stream_service.h"
                     23: 
                     24: #include <errno.h>
                     25: #include <unistd.h>
                     26: #include <sys/stat.h>
                     27: 
                     28: typedef struct private_stream_service_t private_stream_service_t;
                     29: 
                     30: /**
                     31:  * Private data of an stream_service_t object.
                     32:  */
                     33: struct private_stream_service_t {
                     34: 
                     35:        /**
                     36:         * Public stream_service_t interface.
                     37:         */
                     38:        stream_service_t public;
                     39: 
                     40:        /**
                     41:         * Underlying socket
                     42:         */
                     43:        int fd;
                     44: 
                     45:        /**
                     46:         * Accept callback
                     47:         */
                     48:        stream_service_cb_t cb;
                     49: 
                     50:        /**
                     51:         * Accept callback data
                     52:         */
                     53:        void *data;
                     54: 
                     55:        /**
                     56:         * Job priority to invoke callback with
                     57:         */
                     58:        job_priority_t prio;
                     59: 
                     60:        /**
                     61:         * Maximum number of parallel callback invocations
                     62:         */
                     63:        u_int cncrncy;
                     64: 
                     65:        /**
                     66:         * Currently active jobs
                     67:         */
                     68:        u_int active;
                     69: 
                     70:        /**
                     71:         * Currently running jobs
                     72:         */
                     73:        u_int running;
                     74: 
                     75:        /**
                     76:         * mutex to lock active counter
                     77:         */
                     78:        mutex_t *mutex;
                     79: 
                     80:        /**
                     81:         * Condvar to wait for callback termination
                     82:         */
                     83:        condvar_t *condvar;
                     84: 
                     85:        /**
                     86:         * TRUE when the service is terminated
                     87:         */
                     88:        bool terminated;
                     89: 
                     90:        /**
                     91:         * Reference counter
                     92:         */
                     93:        refcount_t ref;
                     94: };
                     95: 
                     96: static void destroy_service(private_stream_service_t *this)
                     97: {
                     98:        if (ref_put(&this->ref))
                     99:        {
                    100:                close(this->fd);
                    101:                this->mutex->destroy(this->mutex);
                    102:                this->condvar->destroy(this->condvar);
                    103:                free(this);
                    104:        }
                    105: }
                    106: 
                    107: /**
                    108:  * Data to pass to async accept job
                    109:  */
                    110: typedef struct {
                    111:        /** callback function */
                    112:        stream_service_cb_t cb;
                    113:        /** callback data */
                    114:        void *data;
                    115:        /** accepted connection */
                    116:        int fd;
                    117:        /** reference to stream service */
                    118:        private_stream_service_t *this;
                    119: } async_data_t;
                    120: 
                    121: /**
                    122:  * Forward declaration
                    123:  */
                    124: static bool watch(private_stream_service_t *this, int fd, watcher_event_t event);
                    125: 
                    126: /**
                    127:  * Clean up accept data
                    128:  */
                    129: static void destroy_async_data(async_data_t *data)
                    130: {
                    131:        private_stream_service_t *this = data->this;
                    132: 
                    133:        this->mutex->lock(this->mutex);
                    134:        if (this->active-- == this->cncrncy && !this->terminated)
                    135:        {
                    136:                /* leaving concurrency limit, restart accept()ing. */
                    137:                lib->watcher->add(lib->watcher, this->fd,
                    138:                                                  WATCHER_READ, (watcher_cb_t)watch, this);
                    139:        }
                    140:        this->condvar->signal(this->condvar);
                    141:        this->mutex->unlock(this->mutex);
                    142:        destroy_service(this);
                    143: 
                    144:        if (data->fd != -1)
                    145:        {
                    146:                close(data->fd);
                    147:        }
                    148:        free(data);
                    149: }
                    150: 
                    151: /**
                    152:  * Reduce running counter
                    153:  */
                    154: CALLBACK(reduce_running, void,
                    155:        async_data_t *data)
                    156: {
                    157:        private_stream_service_t *this = data->this;
                    158: 
                    159:        this->mutex->lock(this->mutex);
                    160:        this->running--;
                    161:        this->condvar->signal(this->condvar);
                    162:        this->mutex->unlock(this->mutex);
                    163: }
                    164: 
                    165: /**
                    166:  * Async processing of accepted connection
                    167:  */
                    168: static job_requeue_t accept_async(async_data_t *data)
                    169: {
                    170:        private_stream_service_t *this = data->this;
                    171:        stream_t *stream;
                    172: 
                    173:        this->mutex->lock(this->mutex);
                    174:        if (this->terminated)
                    175:        {
                    176:                this->mutex->unlock(this->mutex);
                    177:                return JOB_REQUEUE_NONE;
                    178:        }
                    179:        this->running++;
                    180:        this->mutex->unlock(this->mutex);
                    181: 
                    182:        stream = stream_create_from_fd(data->fd);
                    183:        if (stream)
                    184:        {
                    185:                /* FD is now owned by stream, don't close it during cleanup */
                    186:                data->fd = -1;
                    187:                thread_cleanup_push(reduce_running, data);
                    188:                thread_cleanup_push((void*)stream->destroy, stream);
                    189:                thread_cleanup_pop(!data->cb(data->data, stream));
                    190:                thread_cleanup_pop(TRUE);
                    191:        }
                    192:        return JOB_REQUEUE_NONE;
                    193: }
                    194: 
                    195: /**
                    196:  * Watcher callback function
                    197:  */
                    198: static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
                    199: {
                    200:        async_data_t *data;
                    201:        bool keep = TRUE;
                    202: 
                    203:        INIT(data,
                    204:                .cb = this->cb,
                    205:                .data = this->data,
                    206:                .fd = accept(fd, NULL, NULL),
                    207:                .this = this,
                    208:        );
                    209: 
                    210:        if (data->fd != -1 && !this->terminated)
                    211:        {
                    212:                this->mutex->lock(this->mutex);
                    213:                if (++this->active == this->cncrncy)
                    214:                {
                    215:                        /* concurrency limit reached, stop accept()ing new connections */
                    216:                        keep = FALSE;
                    217:                }
                    218:                this->mutex->unlock(this->mutex);
                    219:                ref_get(&this->ref);
                    220: 
                    221:                lib->processor->queue_job(lib->processor,
                    222:                        (job_t*)callback_job_create_with_prio((void*)accept_async, data,
                    223:                                (void*)destroy_async_data, (callback_job_cancel_t)return_false,
                    224:                                this->prio));
                    225:        }
                    226:        else
                    227:        {
                    228:                free(data);
                    229:        }
                    230:        return keep;
                    231: }
                    232: 
                    233: METHOD(stream_service_t, on_accept, void,
                    234:        private_stream_service_t *this, stream_service_cb_t cb, void *data,
                    235:        job_priority_t prio, u_int cncrncy)
                    236: {
                    237:        this->mutex->lock(this->mutex);
                    238: 
                    239:        if (this->terminated)
                    240:        {
                    241:                this->mutex->unlock(this->mutex);
                    242:                return;
                    243:        }
                    244: 
                    245:        /* wait for all callbacks to return */
                    246:        while (this->active)
                    247:        {
                    248:                this->condvar->wait(this->condvar, this->mutex);
                    249:        }
                    250: 
                    251:        if (this->cb)
                    252:        {
                    253:                lib->watcher->remove(lib->watcher, this->fd);
                    254:        }
                    255: 
                    256:        this->cb = cb;
                    257:        this->data = data;
                    258:        if (prio <= JOB_PRIO_MAX)
                    259:        {
                    260:                this->prio = prio;
                    261:        }
                    262:        this->cncrncy = cncrncy;
                    263: 
                    264:        if (this->cb)
                    265:        {
                    266:                lib->watcher->add(lib->watcher, this->fd,
                    267:                                                  WATCHER_READ, (watcher_cb_t)watch, this);
                    268:        }
                    269: 
                    270:        this->mutex->unlock(this->mutex);
                    271: }
                    272: 
                    273: METHOD(stream_service_t, destroy, void,
                    274:        private_stream_service_t *this)
                    275: {
                    276:        this->mutex->lock(this->mutex);
                    277:        lib->watcher->remove(lib->watcher, this->fd);
                    278:        this->terminated = TRUE;
                    279:        while (this->running)
                    280:        {
                    281:                this->condvar->wait(this->condvar, this->mutex);
                    282:        }
                    283:        this->mutex->unlock(this->mutex);
                    284:        destroy_service(this);
                    285: }
                    286: 
                    287: /**
                    288:  * See header
                    289:  */
                    290: stream_service_t *stream_service_create_from_fd(int fd)
                    291: {
                    292:        private_stream_service_t *this;
                    293: 
                    294:        INIT(this,
                    295:                .public = {
                    296:                        .on_accept = _on_accept,
                    297:                        .destroy = _destroy,
                    298:                },
                    299:                .fd = fd,
                    300:                .prio = JOB_PRIO_MEDIUM,
                    301:                .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
                    302:                .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
                    303:                .ref = 1,
                    304:        );
                    305: 
                    306:        return &this->public;
                    307: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>