Annotation of embedaddon/strongswan/src/libstrongswan/networking/streams/stream_service.c, revision 1.1.1.1
1.1 misho 1: /*
2: * Copyright (C) 2013 Martin Willi
3: * Copyright (C) 2013 revosec AG
4: *
5: * This program is free software; you can redistribute it and/or modify it
6: * under the terms of the GNU General Public License as published by the
7: * Free Software Foundation; either version 2 of the License, or (at your
8: * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9: *
10: * This program is distributed in the hope that it will be useful, but
11: * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13: * for more details.
14: */
15:
16: #include <library.h>
17: #include <threading/thread.h>
18: #include <threading/mutex.h>
19: #include <threading/condvar.h>
20: #include <processing/jobs/callback_job.h>
21:
22: #include "stream_service.h"
23:
24: #include <errno.h>
25: #include <unistd.h>
26: #include <sys/stat.h>
27:
28: typedef struct private_stream_service_t private_stream_service_t;
29:
30: /**
31: * Private data of an stream_service_t object.
32: */
33: struct private_stream_service_t {
34:
35: /**
36: * Public stream_service_t interface.
37: */
38: stream_service_t public;
39:
40: /**
41: * Underlying socket
42: */
43: int fd;
44:
45: /**
46: * Accept callback
47: */
48: stream_service_cb_t cb;
49:
50: /**
51: * Accept callback data
52: */
53: void *data;
54:
55: /**
56: * Job priority to invoke callback with
57: */
58: job_priority_t prio;
59:
60: /**
61: * Maximum number of parallel callback invocations
62: */
63: u_int cncrncy;
64:
65: /**
66: * Currently active jobs
67: */
68: u_int active;
69:
70: /**
71: * Currently running jobs
72: */
73: u_int running;
74:
75: /**
76: * mutex to lock active counter
77: */
78: mutex_t *mutex;
79:
80: /**
81: * Condvar to wait for callback termination
82: */
83: condvar_t *condvar;
84:
85: /**
86: * TRUE when the service is terminated
87: */
88: bool terminated;
89:
90: /**
91: * Reference counter
92: */
93: refcount_t ref;
94: };
95:
96: static void destroy_service(private_stream_service_t *this)
97: {
98: if (ref_put(&this->ref))
99: {
100: close(this->fd);
101: this->mutex->destroy(this->mutex);
102: this->condvar->destroy(this->condvar);
103: free(this);
104: }
105: }
106:
107: /**
108: * Data to pass to async accept job
109: */
110: typedef struct {
111: /** callback function */
112: stream_service_cb_t cb;
113: /** callback data */
114: void *data;
115: /** accepted connection */
116: int fd;
117: /** reference to stream service */
118: private_stream_service_t *this;
119: } async_data_t;
120:
121: /**
122: * Forward declaration
123: */
124: static bool watch(private_stream_service_t *this, int fd, watcher_event_t event);
125:
126: /**
127: * Clean up accept data
128: */
129: static void destroy_async_data(async_data_t *data)
130: {
131: private_stream_service_t *this = data->this;
132:
133: this->mutex->lock(this->mutex);
134: if (this->active-- == this->cncrncy && !this->terminated)
135: {
136: /* leaving concurrency limit, restart accept()ing. */
137: lib->watcher->add(lib->watcher, this->fd,
138: WATCHER_READ, (watcher_cb_t)watch, this);
139: }
140: this->condvar->signal(this->condvar);
141: this->mutex->unlock(this->mutex);
142: destroy_service(this);
143:
144: if (data->fd != -1)
145: {
146: close(data->fd);
147: }
148: free(data);
149: }
150:
151: /**
152: * Reduce running counter
153: */
154: CALLBACK(reduce_running, void,
155: async_data_t *data)
156: {
157: private_stream_service_t *this = data->this;
158:
159: this->mutex->lock(this->mutex);
160: this->running--;
161: this->condvar->signal(this->condvar);
162: this->mutex->unlock(this->mutex);
163: }
164:
165: /**
166: * Async processing of accepted connection
167: */
168: static job_requeue_t accept_async(async_data_t *data)
169: {
170: private_stream_service_t *this = data->this;
171: stream_t *stream;
172:
173: this->mutex->lock(this->mutex);
174: if (this->terminated)
175: {
176: this->mutex->unlock(this->mutex);
177: return JOB_REQUEUE_NONE;
178: }
179: this->running++;
180: this->mutex->unlock(this->mutex);
181:
182: stream = stream_create_from_fd(data->fd);
183: if (stream)
184: {
185: /* FD is now owned by stream, don't close it during cleanup */
186: data->fd = -1;
187: thread_cleanup_push(reduce_running, data);
188: thread_cleanup_push((void*)stream->destroy, stream);
189: thread_cleanup_pop(!data->cb(data->data, stream));
190: thread_cleanup_pop(TRUE);
191: }
192: return JOB_REQUEUE_NONE;
193: }
194:
195: /**
196: * Watcher callback function
197: */
198: static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
199: {
200: async_data_t *data;
201: bool keep = TRUE;
202:
203: INIT(data,
204: .cb = this->cb,
205: .data = this->data,
206: .fd = accept(fd, NULL, NULL),
207: .this = this,
208: );
209:
210: if (data->fd != -1 && !this->terminated)
211: {
212: this->mutex->lock(this->mutex);
213: if (++this->active == this->cncrncy)
214: {
215: /* concurrency limit reached, stop accept()ing new connections */
216: keep = FALSE;
217: }
218: this->mutex->unlock(this->mutex);
219: ref_get(&this->ref);
220:
221: lib->processor->queue_job(lib->processor,
222: (job_t*)callback_job_create_with_prio((void*)accept_async, data,
223: (void*)destroy_async_data, (callback_job_cancel_t)return_false,
224: this->prio));
225: }
226: else
227: {
228: free(data);
229: }
230: return keep;
231: }
232:
233: METHOD(stream_service_t, on_accept, void,
234: private_stream_service_t *this, stream_service_cb_t cb, void *data,
235: job_priority_t prio, u_int cncrncy)
236: {
237: this->mutex->lock(this->mutex);
238:
239: if (this->terminated)
240: {
241: this->mutex->unlock(this->mutex);
242: return;
243: }
244:
245: /* wait for all callbacks to return */
246: while (this->active)
247: {
248: this->condvar->wait(this->condvar, this->mutex);
249: }
250:
251: if (this->cb)
252: {
253: lib->watcher->remove(lib->watcher, this->fd);
254: }
255:
256: this->cb = cb;
257: this->data = data;
258: if (prio <= JOB_PRIO_MAX)
259: {
260: this->prio = prio;
261: }
262: this->cncrncy = cncrncy;
263:
264: if (this->cb)
265: {
266: lib->watcher->add(lib->watcher, this->fd,
267: WATCHER_READ, (watcher_cb_t)watch, this);
268: }
269:
270: this->mutex->unlock(this->mutex);
271: }
272:
273: METHOD(stream_service_t, destroy, void,
274: private_stream_service_t *this)
275: {
276: this->mutex->lock(this->mutex);
277: lib->watcher->remove(lib->watcher, this->fd);
278: this->terminated = TRUE;
279: while (this->running)
280: {
281: this->condvar->wait(this->condvar, this->mutex);
282: }
283: this->mutex->unlock(this->mutex);
284: destroy_service(this);
285: }
286:
287: /**
288: * See header
289: */
290: stream_service_t *stream_service_create_from_fd(int fd)
291: {
292: private_stream_service_t *this;
293:
294: INIT(this,
295: .public = {
296: .on_accept = _on_accept,
297: .destroy = _destroy,
298: },
299: .fd = fd,
300: .prio = JOB_PRIO_MEDIUM,
301: .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
302: .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
303: .ref = 1,
304: );
305:
306: return &this->public;
307: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>