Annotation of embedaddon/strongswan/src/libstrongswan/processing/watcher.c, revision 1.1
1.1 ! misho 1: /*
! 2: * Copyright (C) 2016 Tobias Brunner
! 3: * HSR Hochschule fuer Technik Rapperswil
! 4: *
! 5: * Copyright (C) 2013 Martin Willi
! 6: * Copyright (C) 2013 revosec AG
! 7: *
! 8: * This program is free software; you can redistribute it and/or modify it
! 9: * under the terms of the GNU General Public License as published by the
! 10: * Free Software Foundation; either version 2 of the License, or (at your
! 11: * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
! 12: *
! 13: * This program is distributed in the hope that it will be useful, but
! 14: * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
! 15: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
! 16: * for more details.
! 17: */
! 18:
! 19: #include "watcher.h"
! 20:
! 21: #include <library.h>
! 22: #include <threading/thread.h>
! 23: #include <threading/mutex.h>
! 24: #include <threading/condvar.h>
! 25: #include <collections/linked_list.h>
! 26: #include <processing/jobs/callback_job.h>
! 27:
! 28: #include <unistd.h>
! 29: #include <errno.h>
! 30: #include <fcntl.h>
! 31:
! 32: typedef struct private_watcher_t private_watcher_t;
! 33: typedef struct entry_t entry_t;
! 34:
! 35: /**
! 36: * Private data of an watcher_t object.
! 37: */
! 38: struct private_watcher_t {
! 39:
! 40: /**
! 41: * Public watcher_t interface.
! 42: */
! 43: watcher_t public;
! 44:
! 45: /**
! 46: * List of registered FDs
! 47: */
! 48: entry_t *fds;
! 49:
! 50: /**
! 51: * Last registered FD
! 52: */
! 53: entry_t *last;
! 54:
! 55: /**
! 56: * Number of registered FDs
! 57: */
! 58: u_int count;
! 59:
! 60: /**
! 61: * Pending update of FD list?
! 62: */
! 63: bool pending;
! 64:
! 65: /**
! 66: * Running state of watcher
! 67: */
! 68: watcher_state_t state;
! 69:
! 70: /**
! 71: * Lock to access FD list
! 72: */
! 73: mutex_t *mutex;
! 74:
! 75: /**
! 76: * Condvar to signal completion of callback
! 77: */
! 78: condvar_t *condvar;
! 79:
! 80: /**
! 81: * Notification pipe to signal watcher thread
! 82: */
! 83: int notify[2];
! 84:
! 85: /**
! 86: * List of callback jobs to process by watcher thread, as job_t
! 87: */
! 88: linked_list_t *jobs;
! 89: };
! 90:
! 91: /**
! 92: * Entry for a registered file descriptor
! 93: */
! 94: struct entry_t {
! 95: /** file descriptor */
! 96: int fd;
! 97: /** events to watch */
! 98: watcher_event_t events;
! 99: /** registered callback function */
! 100: watcher_cb_t cb;
! 101: /** user data to pass to callback */
! 102: void *data;
! 103: /** callback(s) currently active? */
! 104: int in_callback;
! 105: /** next registered fd */
! 106: entry_t *next;
! 107: };
! 108:
! 109: /**
! 110: * Adds the given entry at the end of the list
! 111: */
! 112: static void add_entry(private_watcher_t *this, entry_t *entry)
! 113: {
! 114: if (this->last)
! 115: {
! 116: this->last->next = entry;
! 117: this->last = entry;
! 118: }
! 119: else
! 120: {
! 121: this->fds = this->last = entry;
! 122: }
! 123: this->count++;
! 124: }
! 125:
! 126: /**
! 127: * Removes and frees the given entry
! 128: *
! 129: * Updates the previous entry and returns the next entry in the list, if any.
! 130: */
! 131: static entry_t *remove_entry(private_watcher_t *this, entry_t *entry,
! 132: entry_t *prev)
! 133: {
! 134: entry_t *next = entry->next;
! 135:
! 136: if (prev)
! 137: {
! 138: prev->next = next;
! 139: }
! 140: else
! 141: {
! 142: this->fds = next;
! 143: }
! 144: if (this->last == entry)
! 145: {
! 146: this->last = prev;
! 147: }
! 148: this->count--;
! 149: free(entry);
! 150: return next;
! 151: }
! 152:
! 153: /**
! 154: * Data we pass on for an async notification
! 155: */
! 156: typedef struct {
! 157: /** file descriptor */
! 158: int fd;
! 159: /** event type */
! 160: watcher_event_t event;
! 161: /** registered callback function */
! 162: watcher_cb_t cb;
! 163: /** user data to pass to callback */
! 164: void *data;
! 165: /** keep registered? */
! 166: bool keep;
! 167: /** reference to watcher */
! 168: private_watcher_t *this;
! 169: } notify_data_t;
! 170:
! 171: /**
! 172: * Notify watcher thread about changes
! 173: */
! 174: static void update(private_watcher_t *this)
! 175: {
! 176: char buf[1] = { 'u' };
! 177:
! 178: this->pending = TRUE;
! 179: if (this->notify[1] != -1)
! 180: {
! 181: if (write(this->notify[1], buf, sizeof(buf)) == -1)
! 182: {
! 183: DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(errno));
! 184: }
! 185: }
! 186: }
! 187:
! 188: /**
! 189: * Cleanup function if callback gets cancelled
! 190: */
! 191: static void unregister(notify_data_t *data)
! 192: {
! 193: /* if a thread processing a callback gets cancelled, we mark the entry
! 194: * as cancelled, like the callback would return FALSE. This is required
! 195: * to not queue this watcher again if all threads have been gone. */
! 196: data->keep = FALSE;
! 197: }
! 198:
! 199: /**
! 200: * Execute callback of registered FD, asynchronous
! 201: */
! 202: static job_requeue_t notify_async(notify_data_t *data)
! 203: {
! 204: thread_cleanup_push((void*)unregister, data);
! 205: data->keep = data->cb(data->data, data->fd, data->event);
! 206: thread_cleanup_pop(FALSE);
! 207: return JOB_REQUEUE_NONE;
! 208: }
! 209:
! 210: /**
! 211: * Clean up notification data, reactivate FD
! 212: */
! 213: static void notify_end(notify_data_t *data)
! 214: {
! 215: private_watcher_t *this = data->this;
! 216: entry_t *entry, *prev = NULL;
! 217:
! 218: /* reactivate the disabled entry */
! 219: this->mutex->lock(this->mutex);
! 220: for (entry = this->fds; entry; prev = entry, entry = entry->next)
! 221: {
! 222: if (entry->fd == data->fd)
! 223: {
! 224: if (!data->keep)
! 225: {
! 226: entry->events &= ~data->event;
! 227: if (!entry->events)
! 228: {
! 229: remove_entry(this, entry, prev);
! 230: break;
! 231: }
! 232: }
! 233: entry->in_callback--;
! 234: break;
! 235: }
! 236: }
! 237: update(this);
! 238: this->condvar->broadcast(this->condvar);
! 239: this->mutex->unlock(this->mutex);
! 240:
! 241: free(data);
! 242: }
! 243:
! 244: /**
! 245: * Execute the callback for a registered FD
! 246: */
! 247: static void notify(private_watcher_t *this, entry_t *entry,
! 248: watcher_event_t event)
! 249: {
! 250: notify_data_t *data;
! 251:
! 252: /* get a copy of entry for async job, but with specific event */
! 253: INIT(data,
! 254: .fd = entry->fd,
! 255: .event = event,
! 256: .cb = entry->cb,
! 257: .data = entry->data,
! 258: .keep = TRUE,
! 259: .this = this,
! 260: );
! 261:
! 262: /* deactivate entry, so we can select() other FDs even if the async
! 263: * processing did not handle the event yet */
! 264: entry->in_callback++;
! 265:
! 266: this->jobs->insert_last(this->jobs,
! 267: callback_job_create_with_prio((void*)notify_async, data,
! 268: (void*)notify_end, (callback_job_cancel_t)return_false,
! 269: JOB_PRIO_CRITICAL));
! 270: }
! 271:
! 272: /**
! 273: * Thread cancellation function for watcher thread
! 274: */
! 275: static void activate_all(private_watcher_t *this)
! 276: {
! 277: entry_t *entry;
! 278:
! 279: /* When the watcher thread gets cancelled, we have to reactivate any entry
! 280: * and signal threads in remove() to go on. */
! 281:
! 282: this->mutex->lock(this->mutex);
! 283: for (entry = this->fds; entry; entry = entry->next)
! 284: {
! 285: entry->in_callback = 0;
! 286: }
! 287: this->state = WATCHER_STOPPED;
! 288: this->condvar->broadcast(this->condvar);
! 289: this->mutex->unlock(this->mutex);
! 290: }
! 291:
! 292: /**
! 293: * Find flagged revents in a pollfd set by fd
! 294: */
! 295: static inline int find_revents(struct pollfd *pfd, int count, int fd)
! 296: {
! 297: int i;
! 298:
! 299: for (i = 0; i < count; i++)
! 300: {
! 301: if (pfd[i].fd == fd)
! 302: {
! 303: return pfd[i].revents;
! 304: }
! 305: }
! 306: return 0;
! 307: }
! 308:
! 309: /**
! 310: * Check if entry is waiting for a specific event, and if it got signaled
! 311: */
! 312: static inline bool entry_ready(entry_t *entry, watcher_event_t event,
! 313: int revents)
! 314: {
! 315: if (entry->events & event)
! 316: {
! 317: switch (event)
! 318: {
! 319: case WATCHER_READ:
! 320: return (revents & (POLLIN | POLLHUP | POLLNVAL)) != 0;
! 321: case WATCHER_WRITE:
! 322: return (revents & (POLLOUT | POLLHUP | POLLNVAL)) != 0;
! 323: case WATCHER_EXCEPT:
! 324: return (revents & (POLLERR | POLLHUP | POLLNVAL)) != 0;
! 325: }
! 326: }
! 327: return FALSE;
! 328: }
! 329:
! 330: /**
! 331: * Dispatching function
! 332: */
! 333: static job_requeue_t watch(private_watcher_t *this)
! 334: {
! 335: entry_t *entry;
! 336: struct pollfd *pfd;
! 337: int count = 0, res;
! 338: bool rebuild = FALSE;
! 339:
! 340: this->mutex->lock(this->mutex);
! 341:
! 342: count = this->count;
! 343: if (!count)
! 344: {
! 345: this->state = WATCHER_STOPPED;
! 346: this->mutex->unlock(this->mutex);
! 347: return JOB_REQUEUE_NONE;
! 348: }
! 349: if (this->state == WATCHER_QUEUED)
! 350: {
! 351: this->state = WATCHER_RUNNING;
! 352: }
! 353:
! 354: pfd = alloca(sizeof(*pfd) * (count + 1));
! 355: pfd[0].fd = this->notify[0];
! 356: pfd[0].events = POLLIN;
! 357: count = 1;
! 358:
! 359: for (entry = this->fds; entry; entry = entry->next)
! 360: {
! 361: if (!entry->in_callback)
! 362: {
! 363: pfd[count].fd = entry->fd;
! 364: pfd[count].events = 0;
! 365: if (entry->events & WATCHER_READ)
! 366: {
! 367: DBG3(DBG_JOB, " watching %d for reading", entry->fd);
! 368: pfd[count].events |= POLLIN;
! 369: }
! 370: if (entry->events & WATCHER_WRITE)
! 371: {
! 372: DBG3(DBG_JOB, " watching %d for writing", entry->fd);
! 373: pfd[count].events |= POLLOUT;
! 374: }
! 375: if (entry->events & WATCHER_EXCEPT)
! 376: {
! 377: DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
! 378: pfd[count].events |= POLLERR;
! 379: }
! 380: count++;
! 381: }
! 382: }
! 383: this->mutex->unlock(this->mutex);
! 384:
! 385: while (!rebuild)
! 386: {
! 387: int revents;
! 388: char buf[1];
! 389: bool old;
! 390: ssize_t len;
! 391: job_t *job;
! 392:
! 393: DBG2(DBG_JOB, "watcher going to poll() %d fds", count);
! 394: thread_cleanup_push((void*)activate_all, this);
! 395: old = thread_cancelability(TRUE);
! 396:
! 397: res = poll(pfd, count, -1);
! 398: if (res == -1 && errno == EINTR)
! 399: {
! 400: /* LinuxThreads interrupts poll(), but does not make it a
! 401: * cancellation point. Manually test if we got cancelled. */
! 402: thread_cancellation_point();
! 403: }
! 404:
! 405: thread_cancelability(old);
! 406: thread_cleanup_pop(FALSE);
! 407:
! 408: if (res > 0)
! 409: {
! 410: if (pfd[0].revents & POLLIN)
! 411: {
! 412: while (TRUE)
! 413: {
! 414: len = read(this->notify[0], buf, sizeof(buf));
! 415: if (len == -1)
! 416: {
! 417: if (errno != EAGAIN && errno != EWOULDBLOCK)
! 418: {
! 419: DBG1(DBG_JOB, "reading watcher notify failed: %s",
! 420: strerror(errno));
! 421: }
! 422: break;
! 423: }
! 424: }
! 425: this->pending = FALSE;
! 426: DBG2(DBG_JOB, "watcher got notification, rebuilding");
! 427: return JOB_REQUEUE_DIRECT;
! 428: }
! 429:
! 430: this->mutex->lock(this->mutex);
! 431: for (entry = this->fds; entry; entry = entry->next)
! 432: {
! 433: if (entry->in_callback)
! 434: {
! 435: rebuild = TRUE;
! 436: break;
! 437: }
! 438: revents = find_revents(pfd, count, entry->fd);
! 439: if (entry_ready(entry, WATCHER_EXCEPT, revents))
! 440: {
! 441: DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
! 442: notify(this, entry, WATCHER_EXCEPT);
! 443: }
! 444: else
! 445: {
! 446: if (entry_ready(entry, WATCHER_READ, revents))
! 447: {
! 448: DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
! 449: notify(this, entry, WATCHER_READ);
! 450: }
! 451: if (entry_ready(entry, WATCHER_WRITE, revents))
! 452: {
! 453: DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
! 454: notify(this, entry, WATCHER_WRITE);
! 455: }
! 456: }
! 457: }
! 458: this->mutex->unlock(this->mutex);
! 459:
! 460: if (this->jobs->get_count(this->jobs))
! 461: {
! 462: while (this->jobs->remove_first(this->jobs,
! 463: (void**)&job) == SUCCESS)
! 464: {
! 465: lib->processor->execute_job(lib->processor, job);
! 466: }
! 467: /* we temporarily disable a notified FD, rebuild FDSET */
! 468: return JOB_REQUEUE_DIRECT;
! 469: }
! 470: }
! 471: else
! 472: {
! 473: if (!this->pending && errno != EINTR)
! 474: { /* complain only if no pending updates */
! 475: DBG1(DBG_JOB, "watcher poll() error: %s", strerror(errno));
! 476: }
! 477: return JOB_REQUEUE_DIRECT;
! 478: }
! 479: }
! 480: return JOB_REQUEUE_DIRECT;
! 481: }
! 482:
! 483: METHOD(watcher_t, add, void,
! 484: private_watcher_t *this, int fd, watcher_event_t events,
! 485: watcher_cb_t cb, void *data)
! 486: {
! 487: entry_t *entry;
! 488:
! 489: INIT(entry,
! 490: .fd = fd,
! 491: .events = events,
! 492: .cb = cb,
! 493: .data = data,
! 494: );
! 495:
! 496: this->mutex->lock(this->mutex);
! 497: add_entry(this, entry);
! 498: if (this->state == WATCHER_STOPPED)
! 499: {
! 500: this->state = WATCHER_QUEUED;
! 501: lib->processor->queue_job(lib->processor,
! 502: (job_t*)callback_job_create_with_prio((void*)watch, this,
! 503: NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
! 504: }
! 505: else
! 506: {
! 507: update(this);
! 508: }
! 509: this->mutex->unlock(this->mutex);
! 510: }
! 511:
! 512: METHOD(watcher_t, remove_, void,
! 513: private_watcher_t *this, int fd)
! 514: {
! 515: entry_t *entry, *prev = NULL;
! 516: bool found = FALSE;
! 517:
! 518: this->mutex->lock(this->mutex);
! 519: while (TRUE)
! 520: {
! 521: bool is_in_callback = FALSE;
! 522:
! 523: entry = this->fds;
! 524: while (entry)
! 525: {
! 526: if (entry->fd == fd)
! 527: {
! 528: if (this->state != WATCHER_STOPPED && entry->in_callback)
! 529: {
! 530: is_in_callback = TRUE;
! 531: break;
! 532: }
! 533: entry = remove_entry(this, entry, prev);
! 534: found = TRUE;
! 535: continue;
! 536: }
! 537: prev = entry;
! 538: entry = entry->next;
! 539: }
! 540: if (!is_in_callback)
! 541: {
! 542: break;
! 543: }
! 544: this->condvar->wait(this->condvar, this->mutex);
! 545: }
! 546: if (found)
! 547: {
! 548: update(this);
! 549: }
! 550: this->mutex->unlock(this->mutex);
! 551: }
! 552:
! 553: METHOD(watcher_t, get_state, watcher_state_t,
! 554: private_watcher_t *this)
! 555: {
! 556: watcher_state_t state;
! 557:
! 558: this->mutex->lock(this->mutex);
! 559: state = this->state;
! 560: this->mutex->unlock(this->mutex);
! 561:
! 562: return state;
! 563: }
! 564:
! 565: METHOD(watcher_t, destroy, void,
! 566: private_watcher_t *this)
! 567: {
! 568: this->mutex->destroy(this->mutex);
! 569: this->condvar->destroy(this->condvar);
! 570: if (this->notify[0] != -1)
! 571: {
! 572: close(this->notify[0]);
! 573: }
! 574: if (this->notify[1] != -1)
! 575: {
! 576: close(this->notify[1]);
! 577: }
! 578: this->jobs->destroy(this->jobs);
! 579: free(this);
! 580: }
! 581:
! 582: #ifdef WIN32
! 583:
! 584: /**
! 585: * Create notify pipe with a TCP socketpair
! 586: */
! 587: static bool create_notify(private_watcher_t *this)
! 588: {
! 589: u_long on = 1;
! 590:
! 591: if (socketpair(AF_INET, SOCK_STREAM, 0, this->notify) == 0)
! 592: {
! 593: /* use non-blocking I/O on read-end of notify pipe */
! 594: if (ioctlsocket(this->notify[0], FIONBIO, &on) == 0)
! 595: {
! 596: return TRUE;
! 597: }
! 598: DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
! 599: "failed: %s", strerror(errno));
! 600: }
! 601: return FALSE;
! 602: }
! 603:
! 604: #else /* !WIN32 */
! 605:
! 606: /**
! 607: * Create a notify pipe with a one-directional pipe
! 608: */
! 609: static bool create_notify(private_watcher_t *this)
! 610: {
! 611: int flags;
! 612:
! 613: if (pipe(this->notify) == 0)
! 614: {
! 615: /* use non-blocking I/O on read-end of notify pipe */
! 616: flags = fcntl(this->notify[0], F_GETFL);
! 617: if (flags != -1 &&
! 618: fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) != -1)
! 619: {
! 620: return TRUE;
! 621: }
! 622: DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
! 623: "failed: %s", strerror(errno));
! 624: }
! 625: return FALSE;
! 626: }
! 627:
! 628: #endif /* !WIN32 */
! 629:
! 630: /**
! 631: * See header
! 632: */
! 633: watcher_t *watcher_create()
! 634: {
! 635: private_watcher_t *this;
! 636:
! 637: INIT(this,
! 638: .public = {
! 639: .add = _add,
! 640: .remove = _remove_,
! 641: .get_state = _get_state,
! 642: .destroy = _destroy,
! 643: },
! 644: .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
! 645: .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
! 646: .jobs = linked_list_create(),
! 647: .notify = {-1, -1},
! 648: .state = WATCHER_STOPPED,
! 649: );
! 650:
! 651: if (!create_notify(this))
! 652: {
! 653: DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
! 654: strerror(errno));
! 655: }
! 656: return &this->public;
! 657: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>