Annotation of embedaddon/strongswan/src/libcharon/plugins/vici/vici_socket.c, revision 1.1.1.2
1.1 misho 1: /*
2: * Copyright (C) 2014 Martin Willi
3: * Copyright (C) 2014 revosec AG
4: *
5: * This program is free software; you can redistribute it and/or modify it
6: * under the terms of the GNU General Public License as published by the
7: * Free Software Foundation; either version 2 of the License, or (at your
8: * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9: *
10: * This program is distributed in the hope that it will be useful, but
11: * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13: * for more details.
14: */
15:
16: #include "vici_socket.h"
17:
18: #include <threading/mutex.h>
19: #include <threading/condvar.h>
20: #include <threading/thread.h>
21: #include <collections/array.h>
22: #include <collections/linked_list.h>
23: #include <processing/jobs/callback_job.h>
24:
25: #include <errno.h>
26: #include <string.h>
27:
28: typedef struct private_vici_socket_t private_vici_socket_t;
29:
30: /**
31: * Private members of vici_socket_t
32: */
33: struct private_vici_socket_t {
34:
35: /**
36: * public functions
37: */
38: vici_socket_t public;
39:
40: /**
41: * Inbound message callback
42: */
43: vici_inbound_cb_t inbound;
44:
45: /**
46: * Client connect callback
47: */
48: vici_connect_cb_t connect;
49:
50: /**
51: * Client disconnect callback
52: */
53: vici_disconnect_cb_t disconnect;
54:
55: /**
56: * Next client connection identifier
57: */
58: u_int nextid;
59:
60: /**
61: * User data for callbacks
62: */
63: void *user;
64:
65: /**
66: * Service accepting vici connections
67: */
68: stream_service_t *service;
69:
70: /**
71: * Client connections, as entry_t
72: */
73: linked_list_t *connections;
74:
75: /**
76: * mutex for client connections
77: */
78: mutex_t *mutex;
79: };
80:
81: /**
82: * Data to securely reference an entry
83: */
84: typedef struct {
85: /* reference to socket instance */
86: private_vici_socket_t *this;
87: /** connection identifier of entry */
88: u_int id;
89: } entry_selector_t;
90:
91: /**
92: * Partially processed message
93: */
94: typedef struct {
95: /** bytes of length header sent/received */
96: u_char hdrlen;
97: /** bytes of length header */
98: char hdr[sizeof(uint32_t)];
99: /** send/receive buffer on heap */
100: chunk_t buf;
101: /** bytes sent/received in buffer */
102: uint32_t done;
103: } msg_buf_t;
104:
105: /**
106: * Client connection entry
107: */
108: typedef struct {
109: /** reference to socket */
110: private_vici_socket_t *this;
111: /** associated stream */
112: stream_t *stream;
113: /** queued messages to send, as msg_buf_t pointers */
114: array_t *out;
115: /** input message buffer */
116: msg_buf_t in;
117: /** queued input messages to process, as chunk_t */
118: array_t *queue;
119: /** do we have job processing input queue? */
120: bool has_processor;
121: /** is this client disconnecting */
122: bool disconnecting;
123: /** client connection identifier */
124: u_int id;
125: /** any users reading over this connection? */
126: int readers;
127: /** any users writing over this connection? */
128: int writers;
129: /** condvar to wait for usage */
130: condvar_t *cond;
131: } entry_t;
132:
133: /**
134: * Destroy an connection entry
135: */
136: CALLBACK(destroy_entry, void,
137: entry_t *entry)
138: {
139: msg_buf_t *out;
140: chunk_t chunk;
141:
142: entry->stream->destroy(entry->stream);
143: entry->this->disconnect(entry->this->user, entry->id);
144: entry->cond->destroy(entry->cond);
145:
146: while (array_remove(entry->out, ARRAY_TAIL, &out))
147: {
148: chunk_clear(&out->buf);
149: free(out);
150: }
151: array_destroy(entry->out);
152: while (array_remove(entry->queue, ARRAY_TAIL, &chunk))
153: {
154: chunk_clear(&chunk);
155: }
156: array_destroy(entry->queue);
157: chunk_clear(&entry->in.buf);
158: free(entry);
159: }
160:
161: /**
162: * Find entry by stream (if given) or id, claim use
163: */
164: static entry_t* find_entry(private_vici_socket_t *this, stream_t *stream,
165: u_int id, bool reader, bool writer)
166: {
167: enumerator_t *enumerator;
168: entry_t *entry, *found = NULL;
169: bool candidate = TRUE;
170:
171: this->mutex->lock(this->mutex);
172: while (candidate && !found)
173: {
174: candidate = FALSE;
175: enumerator = this->connections->create_enumerator(this->connections);
176: while (enumerator->enumerate(enumerator, &entry))
177: {
178: if (stream)
179: {
180: if (entry->stream != stream)
181: {
182: continue;
183: }
184: }
185: else
186: {
187: if (entry->id != id)
188: {
189: continue;
190: }
191: }
192: if (entry->disconnecting)
193: {
194: continue;
195: }
196: candidate = TRUE;
197:
198: if ((reader && entry->readers) ||
199: (writer && entry->writers))
200: {
201: entry->cond->wait(entry->cond, this->mutex);
202: break;
203: }
204: if (reader)
205: {
206: entry->readers++;
207: }
208: if (writer)
209: {
210: entry->writers++;
211: }
212: found = entry;
213: break;
214: }
215: enumerator->destroy(enumerator);
216: }
217: this->mutex->unlock(this->mutex);
218:
219: return found;
220: }
221:
222: /**
223: * Remove entry by id, claim use
224: */
225: static entry_t* remove_entry(private_vici_socket_t *this, u_int id)
226: {
227: enumerator_t *enumerator;
228: entry_t *entry, *found = NULL;
229: bool candidate = TRUE;
230:
231: this->mutex->lock(this->mutex);
232: while (candidate && !found)
233: {
234: candidate = FALSE;
235: enumerator = this->connections->create_enumerator(this->connections);
236: while (enumerator->enumerate(enumerator, &entry))
237: {
238: if (entry->id == id)
239: {
240: candidate = TRUE;
241: if (entry->readers || entry->writers)
242: {
243: entry->cond->wait(entry->cond, this->mutex);
244: break;
245: }
246: this->connections->remove_at(this->connections, enumerator);
247: found = entry;
248: break;
249: }
250: }
251: enumerator->destroy(enumerator);
252: }
253: this->mutex->unlock(this->mutex);
254:
255: return found;
256: }
257:
258: /**
259: * Release a claimed entry
260: */
261: static void put_entry(private_vici_socket_t *this, entry_t *entry,
262: bool reader, bool writer)
263: {
264: this->mutex->lock(this->mutex);
265: if (reader)
266: {
267: entry->readers--;
268: }
269: if (writer)
270: {
271: entry->writers--;
272: }
273: entry->cond->signal(entry->cond);
274: this->mutex->unlock(this->mutex);
275: }
276:
277: /**
278: * Asynchronous callback to disconnect client
279: */
280: CALLBACK(disconnect_async, job_requeue_t,
281: entry_selector_t *sel)
282: {
283: entry_t *entry;
284:
285: entry = remove_entry(sel->this, sel->id);
286: if (entry)
287: {
288: destroy_entry(entry);
289: }
290: return JOB_REQUEUE_NONE;
291: }
292:
293: /**
294: * Disconnect a connected client
295: */
296: static void disconnect(private_vici_socket_t *this, u_int id)
297: {
298: entry_selector_t *sel;
299:
300: INIT(sel,
301: .this = this,
302: .id = id,
303: );
304:
305: lib->processor->queue_job(lib->processor,
306: (job_t*)callback_job_create(disconnect_async, sel, free, NULL));
307: }
308:
309: /**
310: * Write queued output data
311: */
312: static bool do_write(private_vici_socket_t *this, entry_t *entry,
1.1.1.2 ! misho 313: stream_t *stream, char *errmsg, size_t errlen, bool block)
1.1 misho 314: {
315: msg_buf_t *out;
316: ssize_t len;
317:
318: while (array_get(entry->out, ARRAY_HEAD, &out))
319: {
320: /* write header */
321: while (out->hdrlen < sizeof(out->hdr))
322: {
323: len = stream->write(stream, out->hdr + out->hdrlen,
1.1.1.2 ! misho 324: sizeof(out->hdr) - out->hdrlen, block);
1.1 misho 325: if (len == 0)
326: {
327: return FALSE;
328: }
329: if (len < 0)
330: {
331: if (errno == EWOULDBLOCK)
332: {
333: return TRUE;
334: }
335: snprintf(errmsg, errlen, "vici header write error: %s",
336: strerror(errno));
337: return FALSE;
338: }
339: out->hdrlen += len;
340: }
341:
342: /* write buffer buffer */
343: while (out->buf.len > out->done)
344: {
345: len = stream->write(stream, out->buf.ptr + out->done,
1.1.1.2 ! misho 346: out->buf.len - out->done, block);
1.1 misho 347: if (len == 0)
348: {
349: snprintf(errmsg, errlen, "premature vici disconnect");
350: return FALSE;
351: }
352: if (len < 0)
353: {
354: if (errno == EWOULDBLOCK)
355: {
356: return TRUE;
357: }
358: snprintf(errmsg, errlen, "vici write error: %s", strerror(errno));
359: return FALSE;
360: }
361: out->done += len;
362: }
363:
364: if (array_remove(entry->out, ARRAY_HEAD, &out))
365: {
366: chunk_clear(&out->buf);
367: free(out);
368: }
369: }
370: return TRUE;
371: }
372:
373: /**
374: * Send pending messages
375: */
376: CALLBACK(on_write, bool,
377: private_vici_socket_t *this, stream_t *stream)
378: {
379: char errmsg[256] = "";
380: entry_t *entry;
381: bool ret = FALSE;
382:
383: entry = find_entry(this, stream, 0, FALSE, TRUE);
384: if (entry)
385: {
1.1.1.2 ! misho 386: ret = do_write(this, entry, stream, errmsg, sizeof(errmsg), FALSE);
1.1 misho 387: if (ret)
388: {
389: /* unregister if we have no more messages to send */
390: ret = array_count(entry->out) != 0;
391: }
392: else
393: {
394: entry->disconnecting = TRUE;
395: disconnect(entry->this, entry->id);
396: }
397: put_entry(this, entry, FALSE, TRUE);
398:
399: if (!ret && errmsg[0])
400: {
401: DBG1(DBG_CFG, errmsg);
402: }
403: }
404:
405: return ret;
406: }
407:
408: /**
409: * Read in available header with data, non-blocking accumulating to buffer
410: */
411: static bool do_read(private_vici_socket_t *this, entry_t *entry,
412: stream_t *stream, char *errmsg, size_t errlen)
413: {
414: uint32_t msglen;
415: ssize_t len;
416:
417: /* assemble the length header first */
418: while (entry->in.hdrlen < sizeof(entry->in.hdr))
419: {
420: len = stream->read(stream, entry->in.hdr + entry->in.hdrlen,
421: sizeof(entry->in.hdr) - entry->in.hdrlen, FALSE);
422: if (len == 0)
423: {
424: return FALSE;
425: }
426: if (len < 0)
427: {
428: if (errno == EWOULDBLOCK)
429: {
430: return TRUE;
431: }
432: snprintf(errmsg, errlen, "vici header read error: %s",
433: strerror(errno));
434: return FALSE;
435: }
436: entry->in.hdrlen += len;
437: if (entry->in.hdrlen == sizeof(entry->in.hdr))
438: {
439: msglen = untoh32(entry->in.hdr);
440: if (msglen > VICI_MESSAGE_SIZE_MAX)
441: {
442: snprintf(errmsg, errlen, "vici message length %u exceeds %u "
443: "bytes limit, ignored", msglen, VICI_MESSAGE_SIZE_MAX);
444: return FALSE;
445: }
446: /* header complete, continue with data */
447: entry->in.buf = chunk_alloc(msglen);
448: }
449: }
450:
451: /* assemble buffer */
452: while (entry->in.buf.len > entry->in.done)
453: {
454: len = stream->read(stream, entry->in.buf.ptr + entry->in.done,
455: entry->in.buf.len - entry->in.done, FALSE);
456: if (len == 0)
457: {
458: snprintf(errmsg, errlen, "premature vici disconnect");
459: return FALSE;
460: }
461: if (len < 0)
462: {
463: if (errno == EWOULDBLOCK)
464: {
465: return TRUE;
466: }
467: snprintf(errmsg, errlen, "vici read error: %s", strerror(errno));
468: return FALSE;
469: }
470: entry->in.done += len;
471: }
472:
473: return TRUE;
474: }
475:
476: /**
477: * Callback processing incoming requests in strict order
478: */
479: CALLBACK(process_queue, job_requeue_t,
480: entry_selector_t *sel)
481: {
482: entry_t *entry;
483: chunk_t chunk;
484: bool found;
485: u_int id;
486:
487: while (TRUE)
488: {
489: entry = find_entry(sel->this, NULL, sel->id, TRUE, FALSE);
490: if (!entry)
491: {
492: break;
493: }
494:
495: found = array_remove(entry->queue, ARRAY_HEAD, &chunk);
496: if (!found)
497: {
498: entry->has_processor = FALSE;
499: }
500: id = entry->id;
501: put_entry(sel->this, entry, TRUE, FALSE);
502: if (!found)
503: {
504: break;
505: }
506:
507: thread_cleanup_push(free, chunk.ptr);
508: sel->this->inbound(sel->this->user, id, chunk);
509: thread_cleanup_pop(TRUE);
510: }
511: return JOB_REQUEUE_NONE;
512: }
513:
514: /**
515: * Process incoming messages
516: */
517: CALLBACK(on_read, bool,
518: private_vici_socket_t *this, stream_t *stream)
519: {
520: char errmsg[256] = "";
521: entry_selector_t *sel;
522: entry_t *entry;
523: bool ret = FALSE;
524:
525: entry = find_entry(this, stream, 0, TRUE, FALSE);
526: if (entry)
527: {
528: ret = do_read(this, entry, stream, errmsg, sizeof(errmsg));
529: if (!ret)
530: {
531: entry->disconnecting = TRUE;
532: disconnect(this, entry->id);
533: }
534: else if (entry->in.hdrlen == sizeof(entry->in.hdr) &&
535: entry->in.buf.len == entry->in.done)
536: {
537: array_insert(entry->queue, ARRAY_TAIL, &entry->in.buf);
538: entry->in.buf = chunk_empty;
539: entry->in.hdrlen = entry->in.done = 0;
540:
541: if (!entry->has_processor)
542: {
543: INIT(sel,
544: .this = this,
545: .id = entry->id,
546: );
547: lib->processor->queue_job(lib->processor,
548: (job_t*)callback_job_create(process_queue,
549: sel, free, NULL));
550: entry->has_processor = TRUE;
551: }
552: }
553: put_entry(this, entry, TRUE, FALSE);
554:
555: if (!ret && errmsg[0])
556: {
557: DBG1(DBG_CFG, errmsg);
558: }
559: }
560:
561: return ret;
562: }
563:
564: /**
565: * Process connection request
566: */
567: CALLBACK(on_accept, bool,
568: private_vici_socket_t *this, stream_t *stream)
569: {
570: entry_t *entry;
571: u_int id;
572:
573: id = ref_get(&this->nextid);
574:
575: INIT(entry,
576: .this = this,
577: .stream = stream,
578: .id = id,
579: .out = array_create(0, 0),
580: .queue = array_create(sizeof(chunk_t), 0),
581: .cond = condvar_create(CONDVAR_TYPE_DEFAULT),
582: .readers = 1,
583: );
584:
585: this->mutex->lock(this->mutex);
586: this->connections->insert_last(this->connections, entry);
587: this->mutex->unlock(this->mutex);
588:
589: stream->on_read(stream, on_read, this);
590:
591: put_entry(this, entry, TRUE, FALSE);
592:
593: this->connect(this->user, id);
594:
595: return TRUE;
596: }
597:
598: /**
599: * Async callback to enable writer
600: */
601: CALLBACK(enable_writer, job_requeue_t,
602: entry_selector_t *sel)
603: {
604: entry_t *entry;
605:
606: entry = find_entry(sel->this, NULL, sel->id, FALSE, TRUE);
607: if (entry)
608: {
609: entry->stream->on_write(entry->stream, on_write, sel->this);
610: put_entry(sel->this, entry, FALSE, TRUE);
611: }
612: return JOB_REQUEUE_NONE;
613: }
614:
615: METHOD(vici_socket_t, send_, void,
616: private_vici_socket_t *this, u_int id, chunk_t msg)
617: {
618: if (msg.len <= VICI_MESSAGE_SIZE_MAX)
619: {
620: entry_selector_t *sel;
621: msg_buf_t *out;
622: entry_t *entry;
623:
624: entry = find_entry(this, NULL, id, FALSE, TRUE);
625: if (entry)
626: {
627: INIT(out,
628: .buf = msg,
629: );
630: htoun32(out->hdr, msg.len);
631:
632: array_insert(entry->out, ARRAY_TAIL, out);
633: if (array_count(entry->out) == 1)
634: { /* asynchronously re-enable on_write callback when we get data */
635: INIT(sel,
636: .this = this,
637: .id = entry->id,
638: );
639: lib->processor->queue_job(lib->processor,
640: (job_t*)callback_job_create(enable_writer,
641: sel, free, NULL));
642: }
643: put_entry(this, entry, FALSE, TRUE);
644: }
645: else
646: {
647: DBG1(DBG_CFG, "vici connection %u unknown", id);
648: chunk_clear(&msg);
649: }
650: }
651: else
652: {
653: DBG1(DBG_CFG, "vici message size %zu exceeds maximum size of %u, "
654: "discarded", msg.len, VICI_MESSAGE_SIZE_MAX);
655: chunk_clear(&msg);
656: }
657: }
658:
1.1.1.2 ! misho 659: CALLBACK(flush_messages, void,
! 660: entry_t *entry, va_list args)
! 661: {
! 662: private_vici_socket_t *this;
! 663: char errmsg[256] = "";
! 664: bool ret;
! 665:
! 666: VA_ARGS_VGET(args, this);
! 667:
! 668: /* no need for any locking as no other threads are running, the connections
! 669: * all get disconnected afterwards, so error handling is simple too */
! 670: ret = do_write(this, entry, entry->stream, errmsg, sizeof(errmsg), TRUE);
! 671:
! 672: if (!ret && errmsg[0])
! 673: {
! 674: DBG1(DBG_CFG, errmsg);
! 675: }
! 676: }
! 677:
1.1 misho 678: METHOD(vici_socket_t, destroy, void,
679: private_vici_socket_t *this)
680: {
681: DESTROY_IF(this->service);
1.1.1.2 ! misho 682: this->connections->invoke_function(this->connections, flush_messages, this);
1.1 misho 683: this->connections->destroy_function(this->connections, destroy_entry);
684: this->mutex->destroy(this->mutex);
685: free(this);
686: }
687:
688: /*
689: * see header file
690: */
691: vici_socket_t *vici_socket_create(char *uri, vici_inbound_cb_t inbound,
692: vici_connect_cb_t connect,
693: vici_disconnect_cb_t disconnect, void *user)
694: {
695: private_vici_socket_t *this;
696:
697: INIT(this,
698: .public = {
699: .send = _send_,
700: .destroy = _destroy,
701: },
702: .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
703: .connections = linked_list_create(),
704: .inbound = inbound,
705: .connect = connect,
706: .disconnect = disconnect,
707: .user = user,
708: );
709:
710: this->service = lib->streams->create_service(lib->streams, uri, 3);
711: if (!this->service)
712: {
713: DBG1(DBG_CFG, "creating vici socket failed");
714: destroy(this);
715: return NULL;
716: }
717: this->service->on_accept(this->service, on_accept, this,
718: JOB_PRIO_CRITICAL, 0);
719:
720: return &this->public;
721: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>