Annotation of embedaddon/strongswan/src/libstrongswan/networking/streams/stream_service.c, revision 1.1
1.1 ! misho 1: /*
! 2: * Copyright (C) 2013 Martin Willi
! 3: * Copyright (C) 2013 revosec AG
! 4: *
! 5: * This program is free software; you can redistribute it and/or modify it
! 6: * under the terms of the GNU General Public License as published by the
! 7: * Free Software Foundation; either version 2 of the License, or (at your
! 8: * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
! 9: *
! 10: * This program is distributed in the hope that it will be useful, but
! 11: * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
! 12: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
! 13: * for more details.
! 14: */
! 15:
! 16: #include <library.h>
! 17: #include <threading/thread.h>
! 18: #include <threading/mutex.h>
! 19: #include <threading/condvar.h>
! 20: #include <processing/jobs/callback_job.h>
! 21:
! 22: #include "stream_service.h"
! 23:
! 24: #include <errno.h>
! 25: #include <unistd.h>
! 26: #include <sys/stat.h>
! 27:
! 28: typedef struct private_stream_service_t private_stream_service_t;
! 29:
! 30: /**
! 31: * Private data of an stream_service_t object.
! 32: */
! 33: struct private_stream_service_t {
! 34:
! 35: /**
! 36: * Public stream_service_t interface.
! 37: */
! 38: stream_service_t public;
! 39:
! 40: /**
! 41: * Underlying socket
! 42: */
! 43: int fd;
! 44:
! 45: /**
! 46: * Accept callback
! 47: */
! 48: stream_service_cb_t cb;
! 49:
! 50: /**
! 51: * Accept callback data
! 52: */
! 53: void *data;
! 54:
! 55: /**
! 56: * Job priority to invoke callback with
! 57: */
! 58: job_priority_t prio;
! 59:
! 60: /**
! 61: * Maximum number of parallel callback invocations
! 62: */
! 63: u_int cncrncy;
! 64:
! 65: /**
! 66: * Currently active jobs
! 67: */
! 68: u_int active;
! 69:
! 70: /**
! 71: * Currently running jobs
! 72: */
! 73: u_int running;
! 74:
! 75: /**
! 76: * mutex to lock active counter
! 77: */
! 78: mutex_t *mutex;
! 79:
! 80: /**
! 81: * Condvar to wait for callback termination
! 82: */
! 83: condvar_t *condvar;
! 84:
! 85: /**
! 86: * TRUE when the service is terminated
! 87: */
! 88: bool terminated;
! 89:
! 90: /**
! 91: * Reference counter
! 92: */
! 93: refcount_t ref;
! 94: };
! 95:
! 96: static void destroy_service(private_stream_service_t *this)
! 97: {
! 98: if (ref_put(&this->ref))
! 99: {
! 100: close(this->fd);
! 101: this->mutex->destroy(this->mutex);
! 102: this->condvar->destroy(this->condvar);
! 103: free(this);
! 104: }
! 105: }
! 106:
! 107: /**
! 108: * Data to pass to async accept job
! 109: */
! 110: typedef struct {
! 111: /** callback function */
! 112: stream_service_cb_t cb;
! 113: /** callback data */
! 114: void *data;
! 115: /** accepted connection */
! 116: int fd;
! 117: /** reference to stream service */
! 118: private_stream_service_t *this;
! 119: } async_data_t;
! 120:
! 121: /**
! 122: * Forward declaration
! 123: */
! 124: static bool watch(private_stream_service_t *this, int fd, watcher_event_t event);
! 125:
! 126: /**
! 127: * Clean up accept data
! 128: */
! 129: static void destroy_async_data(async_data_t *data)
! 130: {
! 131: private_stream_service_t *this = data->this;
! 132:
! 133: this->mutex->lock(this->mutex);
! 134: if (this->active-- == this->cncrncy && !this->terminated)
! 135: {
! 136: /* leaving concurrency limit, restart accept()ing. */
! 137: lib->watcher->add(lib->watcher, this->fd,
! 138: WATCHER_READ, (watcher_cb_t)watch, this);
! 139: }
! 140: this->condvar->signal(this->condvar);
! 141: this->mutex->unlock(this->mutex);
! 142: destroy_service(this);
! 143:
! 144: if (data->fd != -1)
! 145: {
! 146: close(data->fd);
! 147: }
! 148: free(data);
! 149: }
! 150:
! 151: /**
! 152: * Reduce running counter
! 153: */
! 154: CALLBACK(reduce_running, void,
! 155: async_data_t *data)
! 156: {
! 157: private_stream_service_t *this = data->this;
! 158:
! 159: this->mutex->lock(this->mutex);
! 160: this->running--;
! 161: this->condvar->signal(this->condvar);
! 162: this->mutex->unlock(this->mutex);
! 163: }
! 164:
! 165: /**
! 166: * Async processing of accepted connection
! 167: */
! 168: static job_requeue_t accept_async(async_data_t *data)
! 169: {
! 170: private_stream_service_t *this = data->this;
! 171: stream_t *stream;
! 172:
! 173: this->mutex->lock(this->mutex);
! 174: if (this->terminated)
! 175: {
! 176: this->mutex->unlock(this->mutex);
! 177: return JOB_REQUEUE_NONE;
! 178: }
! 179: this->running++;
! 180: this->mutex->unlock(this->mutex);
! 181:
! 182: stream = stream_create_from_fd(data->fd);
! 183: if (stream)
! 184: {
! 185: /* FD is now owned by stream, don't close it during cleanup */
! 186: data->fd = -1;
! 187: thread_cleanup_push(reduce_running, data);
! 188: thread_cleanup_push((void*)stream->destroy, stream);
! 189: thread_cleanup_pop(!data->cb(data->data, stream));
! 190: thread_cleanup_pop(TRUE);
! 191: }
! 192: return JOB_REQUEUE_NONE;
! 193: }
! 194:
! 195: /**
! 196: * Watcher callback function
! 197: */
! 198: static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
! 199: {
! 200: async_data_t *data;
! 201: bool keep = TRUE;
! 202:
! 203: INIT(data,
! 204: .cb = this->cb,
! 205: .data = this->data,
! 206: .fd = accept(fd, NULL, NULL),
! 207: .this = this,
! 208: );
! 209:
! 210: if (data->fd != -1 && !this->terminated)
! 211: {
! 212: this->mutex->lock(this->mutex);
! 213: if (++this->active == this->cncrncy)
! 214: {
! 215: /* concurrency limit reached, stop accept()ing new connections */
! 216: keep = FALSE;
! 217: }
! 218: this->mutex->unlock(this->mutex);
! 219: ref_get(&this->ref);
! 220:
! 221: lib->processor->queue_job(lib->processor,
! 222: (job_t*)callback_job_create_with_prio((void*)accept_async, data,
! 223: (void*)destroy_async_data, (callback_job_cancel_t)return_false,
! 224: this->prio));
! 225: }
! 226: else
! 227: {
! 228: free(data);
! 229: }
! 230: return keep;
! 231: }
! 232:
! 233: METHOD(stream_service_t, on_accept, void,
! 234: private_stream_service_t *this, stream_service_cb_t cb, void *data,
! 235: job_priority_t prio, u_int cncrncy)
! 236: {
! 237: this->mutex->lock(this->mutex);
! 238:
! 239: if (this->terminated)
! 240: {
! 241: this->mutex->unlock(this->mutex);
! 242: return;
! 243: }
! 244:
! 245: /* wait for all callbacks to return */
! 246: while (this->active)
! 247: {
! 248: this->condvar->wait(this->condvar, this->mutex);
! 249: }
! 250:
! 251: if (this->cb)
! 252: {
! 253: lib->watcher->remove(lib->watcher, this->fd);
! 254: }
! 255:
! 256: this->cb = cb;
! 257: this->data = data;
! 258: if (prio <= JOB_PRIO_MAX)
! 259: {
! 260: this->prio = prio;
! 261: }
! 262: this->cncrncy = cncrncy;
! 263:
! 264: if (this->cb)
! 265: {
! 266: lib->watcher->add(lib->watcher, this->fd,
! 267: WATCHER_READ, (watcher_cb_t)watch, this);
! 268: }
! 269:
! 270: this->mutex->unlock(this->mutex);
! 271: }
! 272:
! 273: METHOD(stream_service_t, destroy, void,
! 274: private_stream_service_t *this)
! 275: {
! 276: this->mutex->lock(this->mutex);
! 277: lib->watcher->remove(lib->watcher, this->fd);
! 278: this->terminated = TRUE;
! 279: while (this->running)
! 280: {
! 281: this->condvar->wait(this->condvar, this->mutex);
! 282: }
! 283: this->mutex->unlock(this->mutex);
! 284: destroy_service(this);
! 285: }
! 286:
! 287: /**
! 288: * See header
! 289: */
! 290: stream_service_t *stream_service_create_from_fd(int fd)
! 291: {
! 292: private_stream_service_t *this;
! 293:
! 294: INIT(this,
! 295: .public = {
! 296: .on_accept = _on_accept,
! 297: .destroy = _destroy,
! 298: },
! 299: .fd = fd,
! 300: .prio = JOB_PRIO_MEDIUM,
! 301: .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
! 302: .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
! 303: .ref = 1,
! 304: );
! 305:
! 306: return &this->public;
! 307: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>