Annotation of embedaddon/strongswan/src/libstrongswan/processing/watcher.c, revision 1.1.1.1
1.1 misho 1: /*
2: * Copyright (C) 2016 Tobias Brunner
3: * HSR Hochschule fuer Technik Rapperswil
4: *
5: * Copyright (C) 2013 Martin Willi
6: * Copyright (C) 2013 revosec AG
7: *
8: * This program is free software; you can redistribute it and/or modify it
9: * under the terms of the GNU General Public License as published by the
10: * Free Software Foundation; either version 2 of the License, or (at your
11: * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
12: *
13: * This program is distributed in the hope that it will be useful, but
14: * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16: * for more details.
17: */
18:
19: #include "watcher.h"
20:
21: #include <library.h>
22: #include <threading/thread.h>
23: #include <threading/mutex.h>
24: #include <threading/condvar.h>
25: #include <collections/linked_list.h>
26: #include <processing/jobs/callback_job.h>
27:
28: #include <unistd.h>
29: #include <errno.h>
30: #include <fcntl.h>
31:
32: typedef struct private_watcher_t private_watcher_t;
33: typedef struct entry_t entry_t;
34:
35: /**
36: * Private data of an watcher_t object.
37: */
38: struct private_watcher_t {
39:
40: /**
41: * Public watcher_t interface.
42: */
43: watcher_t public;
44:
45: /**
46: * List of registered FDs
47: */
48: entry_t *fds;
49:
50: /**
51: * Last registered FD
52: */
53: entry_t *last;
54:
55: /**
56: * Number of registered FDs
57: */
58: u_int count;
59:
60: /**
61: * Pending update of FD list?
62: */
63: bool pending;
64:
65: /**
66: * Running state of watcher
67: */
68: watcher_state_t state;
69:
70: /**
71: * Lock to access FD list
72: */
73: mutex_t *mutex;
74:
75: /**
76: * Condvar to signal completion of callback
77: */
78: condvar_t *condvar;
79:
80: /**
81: * Notification pipe to signal watcher thread
82: */
83: int notify[2];
84:
85: /**
86: * List of callback jobs to process by watcher thread, as job_t
87: */
88: linked_list_t *jobs;
89: };
90:
91: /**
92: * Entry for a registered file descriptor
93: */
94: struct entry_t {
95: /** file descriptor */
96: int fd;
97: /** events to watch */
98: watcher_event_t events;
99: /** registered callback function */
100: watcher_cb_t cb;
101: /** user data to pass to callback */
102: void *data;
103: /** callback(s) currently active? */
104: int in_callback;
105: /** next registered fd */
106: entry_t *next;
107: };
108:
109: /**
110: * Adds the given entry at the end of the list
111: */
112: static void add_entry(private_watcher_t *this, entry_t *entry)
113: {
114: if (this->last)
115: {
116: this->last->next = entry;
117: this->last = entry;
118: }
119: else
120: {
121: this->fds = this->last = entry;
122: }
123: this->count++;
124: }
125:
126: /**
127: * Removes and frees the given entry
128: *
129: * Updates the previous entry and returns the next entry in the list, if any.
130: */
131: static entry_t *remove_entry(private_watcher_t *this, entry_t *entry,
132: entry_t *prev)
133: {
134: entry_t *next = entry->next;
135:
136: if (prev)
137: {
138: prev->next = next;
139: }
140: else
141: {
142: this->fds = next;
143: }
144: if (this->last == entry)
145: {
146: this->last = prev;
147: }
148: this->count--;
149: free(entry);
150: return next;
151: }
152:
153: /**
154: * Data we pass on for an async notification
155: */
156: typedef struct {
157: /** file descriptor */
158: int fd;
159: /** event type */
160: watcher_event_t event;
161: /** registered callback function */
162: watcher_cb_t cb;
163: /** user data to pass to callback */
164: void *data;
165: /** keep registered? */
166: bool keep;
167: /** reference to watcher */
168: private_watcher_t *this;
169: } notify_data_t;
170:
171: /**
172: * Notify watcher thread about changes
173: */
174: static void update(private_watcher_t *this)
175: {
176: char buf[1] = { 'u' };
177:
178: this->pending = TRUE;
179: if (this->notify[1] != -1)
180: {
181: if (write(this->notify[1], buf, sizeof(buf)) == -1)
182: {
183: DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(errno));
184: }
185: }
186: }
187:
188: /**
189: * Cleanup function if callback gets cancelled
190: */
191: static void unregister(notify_data_t *data)
192: {
193: /* if a thread processing a callback gets cancelled, we mark the entry
194: * as cancelled, like the callback would return FALSE. This is required
195: * to not queue this watcher again if all threads have been gone. */
196: data->keep = FALSE;
197: }
198:
199: /**
200: * Execute callback of registered FD, asynchronous
201: */
202: static job_requeue_t notify_async(notify_data_t *data)
203: {
204: thread_cleanup_push((void*)unregister, data);
205: data->keep = data->cb(data->data, data->fd, data->event);
206: thread_cleanup_pop(FALSE);
207: return JOB_REQUEUE_NONE;
208: }
209:
210: /**
211: * Clean up notification data, reactivate FD
212: */
213: static void notify_end(notify_data_t *data)
214: {
215: private_watcher_t *this = data->this;
216: entry_t *entry, *prev = NULL;
217:
218: /* reactivate the disabled entry */
219: this->mutex->lock(this->mutex);
220: for (entry = this->fds; entry; prev = entry, entry = entry->next)
221: {
222: if (entry->fd == data->fd)
223: {
224: if (!data->keep)
225: {
226: entry->events &= ~data->event;
227: if (!entry->events)
228: {
229: remove_entry(this, entry, prev);
230: break;
231: }
232: }
233: entry->in_callback--;
234: break;
235: }
236: }
237: update(this);
238: this->condvar->broadcast(this->condvar);
239: this->mutex->unlock(this->mutex);
240:
241: free(data);
242: }
243:
244: /**
245: * Execute the callback for a registered FD
246: */
247: static void notify(private_watcher_t *this, entry_t *entry,
248: watcher_event_t event)
249: {
250: notify_data_t *data;
251:
252: /* get a copy of entry for async job, but with specific event */
253: INIT(data,
254: .fd = entry->fd,
255: .event = event,
256: .cb = entry->cb,
257: .data = entry->data,
258: .keep = TRUE,
259: .this = this,
260: );
261:
262: /* deactivate entry, so we can select() other FDs even if the async
263: * processing did not handle the event yet */
264: entry->in_callback++;
265:
266: this->jobs->insert_last(this->jobs,
267: callback_job_create_with_prio((void*)notify_async, data,
268: (void*)notify_end, (callback_job_cancel_t)return_false,
269: JOB_PRIO_CRITICAL));
270: }
271:
272: /**
273: * Thread cancellation function for watcher thread
274: */
275: static void activate_all(private_watcher_t *this)
276: {
277: entry_t *entry;
278:
279: /* When the watcher thread gets cancelled, we have to reactivate any entry
280: * and signal threads in remove() to go on. */
281:
282: this->mutex->lock(this->mutex);
283: for (entry = this->fds; entry; entry = entry->next)
284: {
285: entry->in_callback = 0;
286: }
287: this->state = WATCHER_STOPPED;
288: this->condvar->broadcast(this->condvar);
289: this->mutex->unlock(this->mutex);
290: }
291:
292: /**
293: * Find flagged revents in a pollfd set by fd
294: */
295: static inline int find_revents(struct pollfd *pfd, int count, int fd)
296: {
297: int i;
298:
299: for (i = 0; i < count; i++)
300: {
301: if (pfd[i].fd == fd)
302: {
303: return pfd[i].revents;
304: }
305: }
306: return 0;
307: }
308:
309: /**
310: * Check if entry is waiting for a specific event, and if it got signaled
311: */
312: static inline bool entry_ready(entry_t *entry, watcher_event_t event,
313: int revents)
314: {
315: if (entry->events & event)
316: {
317: switch (event)
318: {
319: case WATCHER_READ:
320: return (revents & (POLLIN | POLLHUP | POLLNVAL)) != 0;
321: case WATCHER_WRITE:
322: return (revents & (POLLOUT | POLLHUP | POLLNVAL)) != 0;
323: case WATCHER_EXCEPT:
324: return (revents & (POLLERR | POLLHUP | POLLNVAL)) != 0;
325: }
326: }
327: return FALSE;
328: }
329:
330: /**
331: * Dispatching function
332: */
333: static job_requeue_t watch(private_watcher_t *this)
334: {
335: entry_t *entry;
336: struct pollfd *pfd;
337: int count = 0, res;
338: bool rebuild = FALSE;
339:
340: this->mutex->lock(this->mutex);
341:
342: count = this->count;
343: if (!count)
344: {
345: this->state = WATCHER_STOPPED;
346: this->mutex->unlock(this->mutex);
347: return JOB_REQUEUE_NONE;
348: }
349: if (this->state == WATCHER_QUEUED)
350: {
351: this->state = WATCHER_RUNNING;
352: }
353:
354: pfd = alloca(sizeof(*pfd) * (count + 1));
355: pfd[0].fd = this->notify[0];
356: pfd[0].events = POLLIN;
357: count = 1;
358:
359: for (entry = this->fds; entry; entry = entry->next)
360: {
361: if (!entry->in_callback)
362: {
363: pfd[count].fd = entry->fd;
364: pfd[count].events = 0;
365: if (entry->events & WATCHER_READ)
366: {
367: DBG3(DBG_JOB, " watching %d for reading", entry->fd);
368: pfd[count].events |= POLLIN;
369: }
370: if (entry->events & WATCHER_WRITE)
371: {
372: DBG3(DBG_JOB, " watching %d for writing", entry->fd);
373: pfd[count].events |= POLLOUT;
374: }
375: if (entry->events & WATCHER_EXCEPT)
376: {
377: DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
378: pfd[count].events |= POLLERR;
379: }
380: count++;
381: }
382: }
383: this->mutex->unlock(this->mutex);
384:
385: while (!rebuild)
386: {
387: int revents;
388: char buf[1];
389: bool old;
390: ssize_t len;
391: job_t *job;
392:
393: DBG2(DBG_JOB, "watcher going to poll() %d fds", count);
394: thread_cleanup_push((void*)activate_all, this);
395: old = thread_cancelability(TRUE);
396:
397: res = poll(pfd, count, -1);
398: if (res == -1 && errno == EINTR)
399: {
400: /* LinuxThreads interrupts poll(), but does not make it a
401: * cancellation point. Manually test if we got cancelled. */
402: thread_cancellation_point();
403: }
404:
405: thread_cancelability(old);
406: thread_cleanup_pop(FALSE);
407:
408: if (res > 0)
409: {
410: if (pfd[0].revents & POLLIN)
411: {
412: while (TRUE)
413: {
414: len = read(this->notify[0], buf, sizeof(buf));
415: if (len == -1)
416: {
417: if (errno != EAGAIN && errno != EWOULDBLOCK)
418: {
419: DBG1(DBG_JOB, "reading watcher notify failed: %s",
420: strerror(errno));
421: }
422: break;
423: }
424: }
425: this->pending = FALSE;
426: DBG2(DBG_JOB, "watcher got notification, rebuilding");
427: return JOB_REQUEUE_DIRECT;
428: }
429:
430: this->mutex->lock(this->mutex);
431: for (entry = this->fds; entry; entry = entry->next)
432: {
433: if (entry->in_callback)
434: {
435: rebuild = TRUE;
436: break;
437: }
438: revents = find_revents(pfd, count, entry->fd);
439: if (entry_ready(entry, WATCHER_EXCEPT, revents))
440: {
441: DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
442: notify(this, entry, WATCHER_EXCEPT);
443: }
444: else
445: {
446: if (entry_ready(entry, WATCHER_READ, revents))
447: {
448: DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
449: notify(this, entry, WATCHER_READ);
450: }
451: if (entry_ready(entry, WATCHER_WRITE, revents))
452: {
453: DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
454: notify(this, entry, WATCHER_WRITE);
455: }
456: }
457: }
458: this->mutex->unlock(this->mutex);
459:
460: if (this->jobs->get_count(this->jobs))
461: {
462: while (this->jobs->remove_first(this->jobs,
463: (void**)&job) == SUCCESS)
464: {
465: lib->processor->execute_job(lib->processor, job);
466: }
467: /* we temporarily disable a notified FD, rebuild FDSET */
468: return JOB_REQUEUE_DIRECT;
469: }
470: }
471: else
472: {
473: if (!this->pending && errno != EINTR)
474: { /* complain only if no pending updates */
475: DBG1(DBG_JOB, "watcher poll() error: %s", strerror(errno));
476: }
477: return JOB_REQUEUE_DIRECT;
478: }
479: }
480: return JOB_REQUEUE_DIRECT;
481: }
482:
483: METHOD(watcher_t, add, void,
484: private_watcher_t *this, int fd, watcher_event_t events,
485: watcher_cb_t cb, void *data)
486: {
487: entry_t *entry;
488:
489: INIT(entry,
490: .fd = fd,
491: .events = events,
492: .cb = cb,
493: .data = data,
494: );
495:
496: this->mutex->lock(this->mutex);
497: add_entry(this, entry);
498: if (this->state == WATCHER_STOPPED)
499: {
500: this->state = WATCHER_QUEUED;
501: lib->processor->queue_job(lib->processor,
502: (job_t*)callback_job_create_with_prio((void*)watch, this,
503: NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
504: }
505: else
506: {
507: update(this);
508: }
509: this->mutex->unlock(this->mutex);
510: }
511:
512: METHOD(watcher_t, remove_, void,
513: private_watcher_t *this, int fd)
514: {
515: entry_t *entry, *prev = NULL;
516: bool found = FALSE;
517:
518: this->mutex->lock(this->mutex);
519: while (TRUE)
520: {
521: bool is_in_callback = FALSE;
522:
523: entry = this->fds;
524: while (entry)
525: {
526: if (entry->fd == fd)
527: {
528: if (this->state != WATCHER_STOPPED && entry->in_callback)
529: {
530: is_in_callback = TRUE;
531: break;
532: }
533: entry = remove_entry(this, entry, prev);
534: found = TRUE;
535: continue;
536: }
537: prev = entry;
538: entry = entry->next;
539: }
540: if (!is_in_callback)
541: {
542: break;
543: }
544: this->condvar->wait(this->condvar, this->mutex);
545: }
546: if (found)
547: {
548: update(this);
549: }
550: this->mutex->unlock(this->mutex);
551: }
552:
553: METHOD(watcher_t, get_state, watcher_state_t,
554: private_watcher_t *this)
555: {
556: watcher_state_t state;
557:
558: this->mutex->lock(this->mutex);
559: state = this->state;
560: this->mutex->unlock(this->mutex);
561:
562: return state;
563: }
564:
565: METHOD(watcher_t, destroy, void,
566: private_watcher_t *this)
567: {
568: this->mutex->destroy(this->mutex);
569: this->condvar->destroy(this->condvar);
570: if (this->notify[0] != -1)
571: {
572: close(this->notify[0]);
573: }
574: if (this->notify[1] != -1)
575: {
576: close(this->notify[1]);
577: }
578: this->jobs->destroy(this->jobs);
579: free(this);
580: }
581:
582: #ifdef WIN32
583:
584: /**
585: * Create notify pipe with a TCP socketpair
586: */
587: static bool create_notify(private_watcher_t *this)
588: {
589: u_long on = 1;
590:
591: if (socketpair(AF_INET, SOCK_STREAM, 0, this->notify) == 0)
592: {
593: /* use non-blocking I/O on read-end of notify pipe */
594: if (ioctlsocket(this->notify[0], FIONBIO, &on) == 0)
595: {
596: return TRUE;
597: }
598: DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
599: "failed: %s", strerror(errno));
600: }
601: return FALSE;
602: }
603:
604: #else /* !WIN32 */
605:
606: /**
607: * Create a notify pipe with a one-directional pipe
608: */
609: static bool create_notify(private_watcher_t *this)
610: {
611: int flags;
612:
613: if (pipe(this->notify) == 0)
614: {
615: /* use non-blocking I/O on read-end of notify pipe */
616: flags = fcntl(this->notify[0], F_GETFL);
617: if (flags != -1 &&
618: fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) != -1)
619: {
620: return TRUE;
621: }
622: DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
623: "failed: %s", strerror(errno));
624: }
625: return FALSE;
626: }
627:
628: #endif /* !WIN32 */
629:
630: /**
631: * See header
632: */
633: watcher_t *watcher_create()
634: {
635: private_watcher_t *this;
636:
637: INIT(this,
638: .public = {
639: .add = _add,
640: .remove = _remove_,
641: .get_state = _get_state,
642: .destroy = _destroy,
643: },
644: .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
645: .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
646: .jobs = linked_list_create(),
647: .notify = {-1, -1},
648: .state = WATCHER_STOPPED,
649: );
650:
651: if (!create_notify(this))
652: {
653: DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
654: strerror(errno));
655: }
656: return &this->public;
657: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>