Annotation of embedaddon/ntp/lib/isc/task.c, revision 1.1.1.1
1.1 misho 1: /*
2: * Copyright (C) 2004-2008 Internet Systems Consortium, Inc. ("ISC")
3: * Copyright (C) 1998-2003 Internet Software Consortium.
4: *
5: * Permission to use, copy, modify, and/or distribute this software for any
6: * purpose with or without fee is hereby granted, provided that the above
7: * copyright notice and this permission notice appear in all copies.
8: *
9: * THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
10: * REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
11: * AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
12: * INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
13: * LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
14: * OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
15: * PERFORMANCE OF THIS SOFTWARE.
16: */
17:
18: /* $Id: task.c,v 1.107 2008/03/27 23:46:57 tbox Exp $ */
19:
20: /*! \file
21: * \author Principal Author: Bob Halley
22: */
23:
24: /*
25: * XXXRTH Need to document the states a task can be in, and the rules
26: * for changing states.
27: */
28:
29: #include <config.h>
30:
31: #include <isc/condition.h>
32: #include <isc/event.h>
33: #include <isc/magic.h>
34: #include <isc/mem.h>
35: #include <isc/msgs.h>
36: #include <isc/platform.h>
37: #include <isc/string.h>
38: #include <isc/task.h>
39: #include <isc/thread.h>
40: #include <isc/util.h>
41: #include <isc/xml.h>
42:
43: #ifndef ISC_PLATFORM_USETHREADS
44: #include "task_p.h"
45: #endif /* ISC_PLATFORM_USETHREADS */
46:
/*
 * Optional debug tracing.  With ISC_TASK_TRACE defined, each macro writes
 * one line to stderr; without it, all three expand to nothing.
 *
 * XTRACE() prints the variable named 'task' that is in scope at the point
 * of use, so it can only be used where such a variable exists.
 */
#ifdef ISC_TASK_TRACE
#define XTTRACE(t, m)	fprintf(stderr, "task %p thread %lu: %s\n", \
				(t), isc_thread_self(), (m))
#define XTRACE(m)	XTTRACE(task, m)
#define XTHREADTRACE(m)	fprintf(stderr, "thread %lu: %s\n", \
				isc_thread_self(), (m))
#else /* !ISC_TASK_TRACE */
#define XTTRACE(t, m)
#define XTRACE(m)
#define XTHREADTRACE(m)
#endif /* ISC_TASK_TRACE */
59:
60: /***
61: *** Types.
62: ***/
63:
/*
 * Life-cycle state of a task.  The enumerators are listed in the same
 * order as the strings in statenames[].
 */
typedef enum {
	task_state_idle,	/* nothing queued; waiting for an event */
	task_state_ready,	/* runnable; queued (or about to be queued)
				 * on the manager's ready list */
	task_state_running,	/* dispatch() is executing its events */
	task_state_done		/* finished; task_finished() will free it */
} task_state_t;
68:
#ifdef HAVE_LIBXML2
/* Printable names for the task_state_t values, in enumerator order. */
static const char *statenames[] = {
	"idle",
	"ready",
	"running",
	"done",
};
#endif /* HAVE_LIBXML2 */
74:
/*
 * Magic number carried by every task between isc_task_create() and
 * task_finished(); VALID_TASK() tests a pointer against it.
 */
#define TASK_MAGIC		ISC_MAGIC('T', 'A', 'S', 'K')
#define VALID_TASK(t)		ISC_MAGIC_VALID(t, TASK_MAGIC)
77:
78: struct isc_task {
79: /* Not locked. */
80: unsigned int magic;
81: isc_taskmgr_t * manager;
82: isc_mutex_t lock;
83: /* Locked by task lock. */
84: task_state_t state;
85: unsigned int references;
86: isc_eventlist_t events;
87: isc_eventlist_t on_shutdown;
88: unsigned int quantum;
89: unsigned int flags;
90: isc_stdtime_t now;
91: char name[16];
92: void * tag;
93: /* Locked by task manager lock. */
94: LINK(isc_task_t) link;
95: LINK(isc_task_t) ready_link;
96: };
97:
/* Flag bit set in isc_task.flags once shutdown of the task has begun. */
#define TASK_F_SHUTTINGDOWN		0x01

/* True if shutdown has been started on task 't'. */
#define TASK_SHUTTINGDOWN(t) \
	(((t)->flags & TASK_F_SHUTTINGDOWN) != 0)
102:
/*
 * Magic number carried by a task manager from isc_taskmgr_create() until
 * manager_free(); VALID_MANAGER() tests a pointer against it.
 */
#define TASK_MANAGER_MAGIC	ISC_MAGIC('T', 'S', 'K', 'M')
#define VALID_MANAGER(m)	ISC_MAGIC_VALID(m, TASK_MANAGER_MAGIC)
105:
106: struct isc_taskmgr {
107: /* Not locked. */
108: unsigned int magic;
109: isc_mem_t * mctx;
110: isc_mutex_t lock;
111: #ifdef ISC_PLATFORM_USETHREADS
112: unsigned int workers;
113: isc_thread_t * threads;
114: #endif /* ISC_PLATFORM_USETHREADS */
115: /* Locked by task manager lock. */
116: unsigned int default_quantum;
117: LIST(isc_task_t) tasks;
118: isc_tasklist_t ready_tasks;
119: #ifdef ISC_PLATFORM_USETHREADS
120: isc_condition_t work_available;
121: isc_condition_t exclusive_granted;
122: #endif /* ISC_PLATFORM_USETHREADS */
123: unsigned int tasks_running;
124: isc_boolean_t exclusive_requested;
125: isc_boolean_t exiting;
126: #ifndef ISC_PLATFORM_USETHREADS
127: unsigned int refs;
128: #endif /* ISC_PLATFORM_USETHREADS */
129: };
130:
/*
 * DEFAULT_TASKMGR_QUANTUM: in non-threaded builds, the number of events
 * after which one dispatch() call stops and returns.
 * DEFAULT_DEFAULT_QUANTUM: per-task quantum used when the manager is
 * created with a default quantum of zero.
 */
#define DEFAULT_TASKMGR_QUANTUM		10
#define DEFAULT_DEFAULT_QUANTUM		5

/* True once the manager is exiting and its last task has gone away. */
#define FINISHED(m)		((m)->exiting && EMPTY((m)->tasks))
134:
135: #ifndef ISC_PLATFORM_USETHREADS
136: static isc_taskmgr_t *taskmgr = NULL;
137: #endif /* ISC_PLATFORM_USETHREADS */
138:
139: /***
140: *** Tasks.
141: ***/
142:
143: static void
144: task_finished(isc_task_t *task) {
145: isc_taskmgr_t *manager = task->manager;
146:
147: REQUIRE(EMPTY(task->events));
148: REQUIRE(EMPTY(task->on_shutdown));
149: REQUIRE(task->references == 0);
150: REQUIRE(task->state == task_state_done);
151:
152: XTRACE("task_finished");
153:
154: LOCK(&manager->lock);
155: UNLINK(manager->tasks, task, link);
156: #ifdef ISC_PLATFORM_USETHREADS
157: if (FINISHED(manager)) {
158: /*
159: * All tasks have completed and the
160: * task manager is exiting. Wake up
161: * any idle worker threads so they
162: * can exit.
163: */
164: BROADCAST(&manager->work_available);
165: }
166: #endif /* ISC_PLATFORM_USETHREADS */
167: UNLOCK(&manager->lock);
168:
169: DESTROYLOCK(&task->lock);
170: task->magic = 0;
171: isc_mem_put(manager->mctx, task, sizeof(*task));
172: }
173:
174: isc_result_t
175: isc_task_create(isc_taskmgr_t *manager, unsigned int quantum,
176: isc_task_t **taskp)
177: {
178: isc_task_t *task;
179: isc_boolean_t exiting;
180: isc_result_t result;
181:
182: REQUIRE(VALID_MANAGER(manager));
183: REQUIRE(taskp != NULL && *taskp == NULL);
184:
185: task = isc_mem_get(manager->mctx, sizeof(*task));
186: if (task == NULL)
187: return (ISC_R_NOMEMORY);
188: XTRACE("isc_task_create");
189: task->manager = manager;
190: result = isc_mutex_init(&task->lock);
191: if (result != ISC_R_SUCCESS) {
192: isc_mem_put(manager->mctx, task, sizeof(*task));
193: return (result);
194: }
195: task->state = task_state_idle;
196: task->references = 1;
197: INIT_LIST(task->events);
198: INIT_LIST(task->on_shutdown);
199: task->quantum = quantum;
200: task->flags = 0;
201: task->now = 0;
202: memset(task->name, 0, sizeof(task->name));
203: task->tag = NULL;
204: INIT_LINK(task, link);
205: INIT_LINK(task, ready_link);
206:
207: exiting = ISC_FALSE;
208: LOCK(&manager->lock);
209: if (!manager->exiting) {
210: if (task->quantum == 0)
211: task->quantum = manager->default_quantum;
212: APPEND(manager->tasks, task, link);
213: } else
214: exiting = ISC_TRUE;
215: UNLOCK(&manager->lock);
216:
217: if (exiting) {
218: DESTROYLOCK(&task->lock);
219: isc_mem_put(manager->mctx, task, sizeof(*task));
220: return (ISC_R_SHUTTINGDOWN);
221: }
222:
223: task->magic = TASK_MAGIC;
224: *taskp = task;
225:
226: return (ISC_R_SUCCESS);
227: }
228:
229: void
230: isc_task_attach(isc_task_t *source, isc_task_t **targetp) {
231:
232: /*
233: * Attach *targetp to source.
234: */
235:
236: REQUIRE(VALID_TASK(source));
237: REQUIRE(targetp != NULL && *targetp == NULL);
238:
239: XTTRACE(source, "isc_task_attach");
240:
241: LOCK(&source->lock);
242: source->references++;
243: UNLOCK(&source->lock);
244:
245: *targetp = source;
246: }
247:
248: static inline isc_boolean_t
249: task_shutdown(isc_task_t *task) {
250: isc_boolean_t was_idle = ISC_FALSE;
251: isc_event_t *event, *prev;
252:
253: /*
254: * Caller must be holding the task's lock.
255: */
256:
257: XTRACE("task_shutdown");
258:
259: if (! TASK_SHUTTINGDOWN(task)) {
260: XTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
261: ISC_MSG_SHUTTINGDOWN, "shutting down"));
262: task->flags |= TASK_F_SHUTTINGDOWN;
263: if (task->state == task_state_idle) {
264: INSIST(EMPTY(task->events));
265: task->state = task_state_ready;
266: was_idle = ISC_TRUE;
267: }
268: INSIST(task->state == task_state_ready ||
269: task->state == task_state_running);
270: /*
271: * Note that we post shutdown events LIFO.
272: */
273: for (event = TAIL(task->on_shutdown);
274: event != NULL;
275: event = prev) {
276: prev = PREV(event, ev_link);
277: DEQUEUE(task->on_shutdown, event, ev_link);
278: ENQUEUE(task->events, event, ev_link);
279: }
280: }
281:
282: return (was_idle);
283: }
284:
285: static inline void
286: task_ready(isc_task_t *task) {
287: isc_taskmgr_t *manager = task->manager;
288:
289: REQUIRE(VALID_MANAGER(manager));
290: REQUIRE(task->state == task_state_ready);
291:
292: XTRACE("task_ready");
293:
294: LOCK(&manager->lock);
295:
296: ENQUEUE(manager->ready_tasks, task, ready_link);
297: #ifdef ISC_PLATFORM_USETHREADS
298: SIGNAL(&manager->work_available);
299: #endif /* ISC_PLATFORM_USETHREADS */
300:
301: UNLOCK(&manager->lock);
302: }
303:
304: static inline isc_boolean_t
305: task_detach(isc_task_t *task) {
306:
307: /*
308: * Caller must be holding the task lock.
309: */
310:
311: REQUIRE(task->references > 0);
312:
313: XTRACE("detach");
314:
315: task->references--;
316: if (task->references == 0 && task->state == task_state_idle) {
317: INSIST(EMPTY(task->events));
318: /*
319: * There are no references to this task, and no
320: * pending events. We could try to optimize and
321: * either initiate shutdown or clean up the task,
322: * depending on its state, but it's easier to just
323: * make the task ready and allow run() or the event
324: * loop to deal with shutting down and termination.
325: */
326: task->state = task_state_ready;
327: return (ISC_TRUE);
328: }
329:
330: return (ISC_FALSE);
331: }
332:
333: void
334: isc_task_detach(isc_task_t **taskp) {
335: isc_task_t *task;
336: isc_boolean_t was_idle;
337:
338: /*
339: * Detach *taskp from its task.
340: */
341:
342: REQUIRE(taskp != NULL);
343: task = *taskp;
344: REQUIRE(VALID_TASK(task));
345:
346: XTRACE("isc_task_detach");
347:
348: LOCK(&task->lock);
349: was_idle = task_detach(task);
350: UNLOCK(&task->lock);
351:
352: if (was_idle)
353: task_ready(task);
354:
355: *taskp = NULL;
356: }
357:
358: static inline isc_boolean_t
359: task_send(isc_task_t *task, isc_event_t **eventp) {
360: isc_boolean_t was_idle = ISC_FALSE;
361: isc_event_t *event;
362:
363: /*
364: * Caller must be holding the task lock.
365: */
366:
367: REQUIRE(eventp != NULL);
368: event = *eventp;
369: REQUIRE(event != NULL);
370: REQUIRE(event->ev_type > 0);
371: REQUIRE(task->state != task_state_done);
372:
373: XTRACE("task_send");
374:
375: if (task->state == task_state_idle) {
376: was_idle = ISC_TRUE;
377: INSIST(EMPTY(task->events));
378: task->state = task_state_ready;
379: }
380: INSIST(task->state == task_state_ready ||
381: task->state == task_state_running);
382: ENQUEUE(task->events, event, ev_link);
383: *eventp = NULL;
384:
385: return (was_idle);
386: }
387:
388: void
389: isc_task_send(isc_task_t *task, isc_event_t **eventp) {
390: isc_boolean_t was_idle;
391:
392: /*
393: * Send '*event' to 'task'.
394: */
395:
396: REQUIRE(VALID_TASK(task));
397:
398: XTRACE("isc_task_send");
399:
400: /*
401: * We're trying hard to hold locks for as short a time as possible.
402: * We're also trying to hold as few locks as possible. This is why
403: * some processing is deferred until after the lock is released.
404: */
405: LOCK(&task->lock);
406: was_idle = task_send(task, eventp);
407: UNLOCK(&task->lock);
408:
409: if (was_idle) {
410: /*
411: * We need to add this task to the ready queue.
412: *
413: * We've waited until now to do it because making a task
414: * ready requires locking the manager. If we tried to do
415: * this while holding the task lock, we could deadlock.
416: *
417: * We've changed the state to ready, so no one else will
418: * be trying to add this task to the ready queue. The
419: * only way to leave the ready state is by executing the
420: * task. It thus doesn't matter if events are added,
421: * removed, or a shutdown is started in the interval
422: * between the time we released the task lock, and the time
423: * we add the task to the ready queue.
424: */
425: task_ready(task);
426: }
427: }
428:
429: void
430: isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
431: isc_boolean_t idle1, idle2;
432: isc_task_t *task;
433:
434: /*
435: * Send '*event' to '*taskp' and then detach '*taskp' from its
436: * task.
437: */
438:
439: REQUIRE(taskp != NULL);
440: task = *taskp;
441: REQUIRE(VALID_TASK(task));
442:
443: XTRACE("isc_task_sendanddetach");
444:
445: LOCK(&task->lock);
446: idle1 = task_send(task, eventp);
447: idle2 = task_detach(task);
448: UNLOCK(&task->lock);
449:
450: /*
451: * If idle1, then idle2 shouldn't be true as well since we're holding
452: * the task lock, and thus the task cannot switch from ready back to
453: * idle.
454: */
455: INSIST(!(idle1 && idle2));
456:
457: if (idle1 || idle2)
458: task_ready(task);
459:
460: *taskp = NULL;
461: }
462:
/* True unless 'event' carries the ISC_EVENTATTR_NOPURGE attribute. */
#define PURGE_OK(event) \
	(((event)->ev_attributes & ISC_EVENTATTR_NOPURGE) == 0)
464:
465: static unsigned int
466: dequeue_events(isc_task_t *task, void *sender, isc_eventtype_t first,
467: isc_eventtype_t last, void *tag,
468: isc_eventlist_t *events, isc_boolean_t purging)
469: {
470: isc_event_t *event, *next_event;
471: unsigned int count = 0;
472:
473: REQUIRE(VALID_TASK(task));
474: REQUIRE(last >= first);
475:
476: XTRACE("dequeue_events");
477:
478: /*
479: * Events matching 'sender', whose type is >= first and <= last, and
480: * whose tag is 'tag' will be dequeued. If 'purging', matching events
481: * which are marked as unpurgable will not be dequeued.
482: *
483: * sender == NULL means "any sender", and tag == NULL means "any tag".
484: */
485:
486: LOCK(&task->lock);
487:
488: for (event = HEAD(task->events); event != NULL; event = next_event) {
489: next_event = NEXT(event, ev_link);
490: if (event->ev_type >= first && event->ev_type <= last &&
491: (sender == NULL || event->ev_sender == sender) &&
492: (tag == NULL || event->ev_tag == tag) &&
493: (!purging || PURGE_OK(event))) {
494: DEQUEUE(task->events, event, ev_link);
495: ENQUEUE(*events, event, ev_link);
496: count++;
497: }
498: }
499:
500: UNLOCK(&task->lock);
501:
502: return (count);
503: }
504:
505: unsigned int
506: isc_task_purgerange(isc_task_t *task, void *sender, isc_eventtype_t first,
507: isc_eventtype_t last, void *tag)
508: {
509: unsigned int count;
510: isc_eventlist_t events;
511: isc_event_t *event, *next_event;
512:
513: /*
514: * Purge events from a task's event queue.
515: */
516:
517: XTRACE("isc_task_purgerange");
518:
519: ISC_LIST_INIT(events);
520:
521: count = dequeue_events(task, sender, first, last, tag, &events,
522: ISC_TRUE);
523:
524: for (event = HEAD(events); event != NULL; event = next_event) {
525: next_event = NEXT(event, ev_link);
526: isc_event_free(&event);
527: }
528:
529: /*
530: * Note that purging never changes the state of the task.
531: */
532:
533: return (count);
534: }
535:
536: unsigned int
537: isc_task_purge(isc_task_t *task, void *sender, isc_eventtype_t type,
538: void *tag)
539: {
540: /*
541: * Purge events from a task's event queue.
542: */
543:
544: XTRACE("isc_task_purge");
545:
546: return (isc_task_purgerange(task, sender, type, type, tag));
547: }
548:
549: isc_boolean_t
550: isc_task_purgeevent(isc_task_t *task, isc_event_t *event) {
551: isc_event_t *curr_event, *next_event;
552:
553: /*
554: * Purge 'event' from a task's event queue.
555: *
556: * XXXRTH: WARNING: This method may be removed before beta.
557: */
558:
559: REQUIRE(VALID_TASK(task));
560:
561: /*
562: * If 'event' is on the task's event queue, it will be purged,
563: * unless it is marked as unpurgeable. 'event' does not have to be
564: * on the task's event queue; in fact, it can even be an invalid
565: * pointer. Purging only occurs if the event is actually on the task's
566: * event queue.
567: *
568: * Purging never changes the state of the task.
569: */
570:
571: LOCK(&task->lock);
572: for (curr_event = HEAD(task->events);
573: curr_event != NULL;
574: curr_event = next_event) {
575: next_event = NEXT(curr_event, ev_link);
576: if (curr_event == event && PURGE_OK(event)) {
577: DEQUEUE(task->events, curr_event, ev_link);
578: break;
579: }
580: }
581: UNLOCK(&task->lock);
582:
583: if (curr_event == NULL)
584: return (ISC_FALSE);
585:
586: isc_event_free(&curr_event);
587:
588: return (ISC_TRUE);
589: }
590:
591: unsigned int
592: isc_task_unsendrange(isc_task_t *task, void *sender, isc_eventtype_t first,
593: isc_eventtype_t last, void *tag,
594: isc_eventlist_t *events)
595: {
596: /*
597: * Remove events from a task's event queue.
598: */
599:
600: XTRACE("isc_task_unsendrange");
601:
602: return (dequeue_events(task, sender, first, last, tag, events,
603: ISC_FALSE));
604: }
605:
606: unsigned int
607: isc_task_unsend(isc_task_t *task, void *sender, isc_eventtype_t type,
608: void *tag, isc_eventlist_t *events)
609: {
610: /*
611: * Remove events from a task's event queue.
612: */
613:
614: XTRACE("isc_task_unsend");
615:
616: return (dequeue_events(task, sender, type, type, tag, events,
617: ISC_FALSE));
618: }
619:
620: isc_result_t
621: isc_task_onshutdown(isc_task_t *task, isc_taskaction_t action, const void *arg)
622: {
623: isc_boolean_t disallowed = ISC_FALSE;
624: isc_result_t result = ISC_R_SUCCESS;
625: isc_event_t *event;
626:
627: /*
628: * Send a shutdown event with action 'action' and argument 'arg' when
629: * 'task' is shutdown.
630: */
631:
632: REQUIRE(VALID_TASK(task));
633: REQUIRE(action != NULL);
634:
635: event = isc_event_allocate(task->manager->mctx,
636: NULL,
637: ISC_TASKEVENT_SHUTDOWN,
638: action,
639: arg,
640: sizeof(*event));
641: if (event == NULL)
642: return (ISC_R_NOMEMORY);
643:
644: LOCK(&task->lock);
645: if (TASK_SHUTTINGDOWN(task)) {
646: disallowed = ISC_TRUE;
647: result = ISC_R_SHUTTINGDOWN;
648: } else
649: ENQUEUE(task->on_shutdown, event, ev_link);
650: UNLOCK(&task->lock);
651:
652: if (disallowed)
653: isc_mem_put(task->manager->mctx, event, sizeof(*event));
654:
655: return (result);
656: }
657:
658: void
659: isc_task_shutdown(isc_task_t *task) {
660: isc_boolean_t was_idle;
661:
662: /*
663: * Shutdown 'task'.
664: */
665:
666: REQUIRE(VALID_TASK(task));
667:
668: LOCK(&task->lock);
669: was_idle = task_shutdown(task);
670: UNLOCK(&task->lock);
671:
672: if (was_idle)
673: task_ready(task);
674: }
675:
676: void
677: isc_task_destroy(isc_task_t **taskp) {
678:
679: /*
680: * Destroy '*taskp'.
681: */
682:
683: REQUIRE(taskp != NULL);
684:
685: isc_task_shutdown(*taskp);
686: isc_task_detach(taskp);
687: }
688:
689: void
690: isc_task_setname(isc_task_t *task, const char *name, void *tag) {
691:
692: /*
693: * Name 'task'.
694: */
695:
696: REQUIRE(VALID_TASK(task));
697:
698: LOCK(&task->lock);
699: memset(task->name, 0, sizeof(task->name));
700: strncpy(task->name, name, sizeof(task->name) - 1);
701: task->tag = tag;
702: UNLOCK(&task->lock);
703: }
704:
705: const char *
706: isc_task_getname(isc_task_t *task) {
707: return (task->name);
708: }
709:
710: void *
711: isc_task_gettag(isc_task_t *task) {
712: return (task->tag);
713: }
714:
715: void
716: isc_task_getcurrenttime(isc_task_t *task, isc_stdtime_t *t) {
717: REQUIRE(VALID_TASK(task));
718: REQUIRE(t != NULL);
719:
720: LOCK(&task->lock);
721:
722: *t = task->now;
723:
724: UNLOCK(&task->lock);
725: }
726:
727: /***
728: *** Task Manager.
729: ***/
730: static void
731: dispatch(isc_taskmgr_t *manager) {
732: isc_task_t *task;
733: #ifndef ISC_PLATFORM_USETHREADS
734: unsigned int total_dispatch_count = 0;
735: isc_tasklist_t ready_tasks;
736: #endif /* ISC_PLATFORM_USETHREADS */
737:
738: REQUIRE(VALID_MANAGER(manager));
739:
740: /*
741: * Again we're trying to hold the lock for as short a time as possible
742: * and to do as little locking and unlocking as possible.
743: *
744: * In both while loops, the appropriate lock must be held before the
745: * while body starts. Code which acquired the lock at the top of
746: * the loop would be more readable, but would result in a lot of
747: * extra locking. Compare:
748: *
749: * Straightforward:
750: *
751: * LOCK();
752: * ...
753: * UNLOCK();
754: * while (expression) {
755: * LOCK();
756: * ...
757: * UNLOCK();
758: *
759: * Unlocked part here...
760: *
761: * LOCK();
762: * ...
763: * UNLOCK();
764: * }
765: *
766: * Note how if the loop continues we unlock and then immediately lock.
767: * For N iterations of the loop, this code does 2N+1 locks and 2N+1
768: * unlocks. Also note that the lock is not held when the while
769: * condition is tested, which may or may not be important, depending
770: * on the expression.
771: *
772: * As written:
773: *
774: * LOCK();
775: * while (expression) {
776: * ...
777: * UNLOCK();
778: *
779: * Unlocked part here...
780: *
781: * LOCK();
782: * ...
783: * }
784: * UNLOCK();
785: *
786: * For N iterations of the loop, this code does N+1 locks and N+1
787: * unlocks. The while expression is always protected by the lock.
788: */
789:
790: #ifndef ISC_PLATFORM_USETHREADS
791: ISC_LIST_INIT(ready_tasks);
792: #endif
793: LOCK(&manager->lock);
794: while (!FINISHED(manager)) {
795: #ifdef ISC_PLATFORM_USETHREADS
796: /*
797: * For reasons similar to those given in the comment in
798: * isc_task_send() above, it is safe for us to dequeue
799: * the task while only holding the manager lock, and then
800: * change the task to running state while only holding the
801: * task lock.
802: */
803: while ((EMPTY(manager->ready_tasks) ||
804: manager->exclusive_requested) &&
805: !FINISHED(manager))
806: {
807: XTHREADTRACE(isc_msgcat_get(isc_msgcat,
808: ISC_MSGSET_GENERAL,
809: ISC_MSG_WAIT, "wait"));
810: WAIT(&manager->work_available, &manager->lock);
811: XTHREADTRACE(isc_msgcat_get(isc_msgcat,
812: ISC_MSGSET_TASK,
813: ISC_MSG_AWAKE, "awake"));
814: }
815: #else /* ISC_PLATFORM_USETHREADS */
816: if (total_dispatch_count >= DEFAULT_TASKMGR_QUANTUM ||
817: EMPTY(manager->ready_tasks))
818: break;
819: #endif /* ISC_PLATFORM_USETHREADS */
820: XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK,
821: ISC_MSG_WORKING, "working"));
822:
823: task = HEAD(manager->ready_tasks);
824: if (task != NULL) {
825: unsigned int dispatch_count = 0;
826: isc_boolean_t done = ISC_FALSE;
827: isc_boolean_t requeue = ISC_FALSE;
828: isc_boolean_t finished = ISC_FALSE;
829: isc_event_t *event;
830:
831: INSIST(VALID_TASK(task));
832:
833: /*
834: * Note we only unlock the manager lock if we actually
835: * have a task to do. We must reacquire the manager
836: * lock before exiting the 'if (task != NULL)' block.
837: */
838: DEQUEUE(manager->ready_tasks, task, ready_link);
839: manager->tasks_running++;
840: UNLOCK(&manager->lock);
841:
842: LOCK(&task->lock);
843: INSIST(task->state == task_state_ready);
844: task->state = task_state_running;
845: XTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
846: ISC_MSG_RUNNING, "running"));
847: isc_stdtime_get(&task->now);
848: do {
849: if (!EMPTY(task->events)) {
850: event = HEAD(task->events);
851: DEQUEUE(task->events, event, ev_link);
852:
853: /*
854: * Execute the event action.
855: */
856: XTRACE(isc_msgcat_get(isc_msgcat,
857: ISC_MSGSET_TASK,
858: ISC_MSG_EXECUTE,
859: "execute action"));
860: if (event->ev_action != NULL) {
861: UNLOCK(&task->lock);
862: (event->ev_action)(task,event);
863: LOCK(&task->lock);
864: }
865: dispatch_count++;
866: #ifndef ISC_PLATFORM_USETHREADS
867: total_dispatch_count++;
868: #endif /* ISC_PLATFORM_USETHREADS */
869: }
870:
871: if (task->references == 0 &&
872: EMPTY(task->events) &&
873: !TASK_SHUTTINGDOWN(task)) {
874: isc_boolean_t was_idle;
875:
876: /*
877: * There are no references and no
878: * pending events for this task,
879: * which means it will not become
880: * runnable again via an external
881: * action (such as sending an event
882: * or detaching).
883: *
884: * We initiate shutdown to prevent
885: * it from becoming a zombie.
886: *
887: * We do this here instead of in
888: * the "if EMPTY(task->events)" block
889: * below because:
890: *
891: * If we post no shutdown events,
892: * we want the task to finish.
893: *
894: * If we did post shutdown events,
895: * will still want the task's
896: * quantum to be applied.
897: */
898: was_idle = task_shutdown(task);
899: INSIST(!was_idle);
900: }
901:
902: if (EMPTY(task->events)) {
903: /*
904: * Nothing else to do for this task
905: * right now.
906: */
907: XTRACE(isc_msgcat_get(isc_msgcat,
908: ISC_MSGSET_TASK,
909: ISC_MSG_EMPTY,
910: "empty"));
911: if (task->references == 0 &&
912: TASK_SHUTTINGDOWN(task)) {
913: /*
914: * The task is done.
915: */
916: XTRACE(isc_msgcat_get(
917: isc_msgcat,
918: ISC_MSGSET_TASK,
919: ISC_MSG_DONE,
920: "done"));
921: finished = ISC_TRUE;
922: task->state = task_state_done;
923: } else
924: task->state = task_state_idle;
925: done = ISC_TRUE;
926: } else if (dispatch_count >= task->quantum) {
927: /*
928: * Our quantum has expired, but
929: * there is more work to be done.
930: * We'll requeue it to the ready
931: * queue later.
932: *
933: * We don't check quantum until
934: * dispatching at least one event,
935: * so the minimum quantum is one.
936: */
937: XTRACE(isc_msgcat_get(isc_msgcat,
938: ISC_MSGSET_TASK,
939: ISC_MSG_QUANTUM,
940: "quantum"));
941: task->state = task_state_ready;
942: requeue = ISC_TRUE;
943: done = ISC_TRUE;
944: }
945: } while (!done);
946: UNLOCK(&task->lock);
947:
948: if (finished)
949: task_finished(task);
950:
951: LOCK(&manager->lock);
952: manager->tasks_running--;
953: #ifdef ISC_PLATFORM_USETHREADS
954: if (manager->exclusive_requested &&
955: manager->tasks_running == 1) {
956: SIGNAL(&manager->exclusive_granted);
957: }
958: #endif /* ISC_PLATFORM_USETHREADS */
959: if (requeue) {
960: /*
961: * We know we're awake, so we don't have
962: * to wakeup any sleeping threads if the
963: * ready queue is empty before we requeue.
964: *
965: * A possible optimization if the queue is
966: * empty is to 'goto' the 'if (task != NULL)'
967: * block, avoiding the ENQUEUE of the task
968: * and the subsequent immediate DEQUEUE
969: * (since it is the only executable task).
970: * We don't do this because then we'd be
971: * skipping the exit_requested check. The
972: * cost of ENQUEUE is low anyway, especially
973: * when you consider that we'd have to do
974: * an extra EMPTY check to see if we could
975: * do the optimization. If the ready queue
976: * were usually nonempty, the 'optimization'
977: * might even hurt rather than help.
978: */
979: #ifdef ISC_PLATFORM_USETHREADS
980: ENQUEUE(manager->ready_tasks, task,
981: ready_link);
982: #else
983: ENQUEUE(ready_tasks, task, ready_link);
984: #endif
985: }
986: }
987: }
988: #ifndef ISC_PLATFORM_USETHREADS
989: ISC_LIST_APPENDLIST(manager->ready_tasks, ready_tasks, ready_link);
990: #endif
991: UNLOCK(&manager->lock);
992: }
993:
#ifdef ISC_PLATFORM_USETHREADS
/*
 * Worker thread entry point: 'uap' is the task manager.  Runs the
 * dispatcher until it returns, then exits with a zero thread result.
 */
static isc_threadresult_t
#ifdef _WIN32
WINAPI
#endif
run(void *uap) {
	isc_taskmgr_t *mgr;

	mgr = uap;

	XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
				    ISC_MSG_STARTING, "starting"));

	dispatch(mgr);

	XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
				    ISC_MSG_EXITING, "exiting"));

	return ((isc_threadresult_t)0);
}
#endif /* ISC_PLATFORM_USETHREADS */
1013:
1014: static void
1015: manager_free(isc_taskmgr_t *manager) {
1016: isc_mem_t *mctx;
1017:
1018: #ifdef ISC_PLATFORM_USETHREADS
1019: (void)isc_condition_destroy(&manager->exclusive_granted);
1020: (void)isc_condition_destroy(&manager->work_available);
1021: isc_mem_free(manager->mctx, manager->threads);
1022: #endif /* ISC_PLATFORM_USETHREADS */
1023: DESTROYLOCK(&manager->lock);
1024: manager->magic = 0;
1025: mctx = manager->mctx;
1026: isc_mem_put(mctx, manager, sizeof(*manager));
1027: isc_mem_detach(&mctx);
1028: }
1029:
1030: isc_result_t
1031: isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
1032: unsigned int default_quantum, isc_taskmgr_t **managerp)
1033: {
1034: isc_result_t result;
1035: unsigned int i, started = 0;
1036: isc_taskmgr_t *manager;
1037:
1038: /*
1039: * Create a new task manager.
1040: */
1041:
1042: REQUIRE(workers > 0);
1043: REQUIRE(managerp != NULL && *managerp == NULL);
1044:
1045: #ifndef ISC_PLATFORM_USETHREADS
1046: UNUSED(i);
1047: UNUSED(started);
1048: UNUSED(workers);
1049:
1050: if (taskmgr != NULL) {
1051: taskmgr->refs++;
1052: *managerp = taskmgr;
1053: return (ISC_R_SUCCESS);
1054: }
1055: #endif /* ISC_PLATFORM_USETHREADS */
1056:
1057: manager = isc_mem_get(mctx, sizeof(*manager));
1058: if (manager == NULL)
1059: return (ISC_R_NOMEMORY);
1060: manager->magic = TASK_MANAGER_MAGIC;
1061: manager->mctx = NULL;
1062: result = isc_mutex_init(&manager->lock);
1063: if (result != ISC_R_SUCCESS)
1064: goto cleanup_mgr;
1065:
1066: #ifdef ISC_PLATFORM_USETHREADS
1067: manager->workers = 0;
1068: manager->threads = isc_mem_allocate(mctx,
1069: workers * sizeof(isc_thread_t));
1070: if (manager->threads == NULL) {
1071: result = ISC_R_NOMEMORY;
1072: goto cleanup_lock;
1073: }
1074: if (isc_condition_init(&manager->work_available) != ISC_R_SUCCESS) {
1075: UNEXPECTED_ERROR(__FILE__, __LINE__,
1076: "isc_condition_init() %s",
1077: isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
1078: ISC_MSG_FAILED, "failed"));
1079: result = ISC_R_UNEXPECTED;
1080: goto cleanup_threads;
1081: }
1082: if (isc_condition_init(&manager->exclusive_granted) != ISC_R_SUCCESS) {
1083: UNEXPECTED_ERROR(__FILE__, __LINE__,
1084: "isc_condition_init() %s",
1085: isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
1086: ISC_MSG_FAILED, "failed"));
1087: result = ISC_R_UNEXPECTED;
1088: goto cleanup_workavailable;
1089: }
1090: #endif /* ISC_PLATFORM_USETHREADS */
1091: if (default_quantum == 0)
1092: default_quantum = DEFAULT_DEFAULT_QUANTUM;
1093: manager->default_quantum = default_quantum;
1094: INIT_LIST(manager->tasks);
1095: INIT_LIST(manager->ready_tasks);
1096: manager->tasks_running = 0;
1097: manager->exclusive_requested = ISC_FALSE;
1098: manager->exiting = ISC_FALSE;
1099:
1100: isc_mem_attach(mctx, &manager->mctx);
1101:
1102: #ifdef ISC_PLATFORM_USETHREADS
1103: LOCK(&manager->lock);
1104: /*
1105: * Start workers.
1106: */
1107: for (i = 0; i < workers; i++) {
1108: if (isc_thread_create(run, manager,
1109: &manager->threads[manager->workers]) ==
1110: ISC_R_SUCCESS) {
1111: manager->workers++;
1112: started++;
1113: }
1114: }
1115: UNLOCK(&manager->lock);
1116:
1117: if (started == 0) {
1118: manager_free(manager);
1119: return (ISC_R_NOTHREADS);
1120: }
1121: isc_thread_setconcurrency(workers);
1122: #else /* ISC_PLATFORM_USETHREADS */
1123: manager->refs = 1;
1124: taskmgr = manager;
1125: #endif /* ISC_PLATFORM_USETHREADS */
1126:
1127: *managerp = manager;
1128:
1129: return (ISC_R_SUCCESS);
1130:
1131: #ifdef ISC_PLATFORM_USETHREADS
1132: cleanup_workavailable:
1133: (void)isc_condition_destroy(&manager->work_available);
1134: cleanup_threads:
1135: isc_mem_free(mctx, manager->threads);
1136: cleanup_lock:
1137: DESTROYLOCK(&manager->lock);
1138: #endif
1139: cleanup_mgr:
1140: isc_mem_put(mctx, manager, sizeof(*manager));
1141: return (result);
1142: }
1143:
1144: void
1145: isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
1146: isc_taskmgr_t *manager;
1147: isc_task_t *task;
1148: unsigned int i;
1149:
1150: /*
1151: * Destroy '*managerp'.
1152: */
1153:
1154: REQUIRE(managerp != NULL);
1155: manager = *managerp;
1156: REQUIRE(VALID_MANAGER(manager));
1157:
1158: #ifndef ISC_PLATFORM_USETHREADS
1159: UNUSED(i);
1160:
1161: if (manager->refs > 1) {
1162: manager->refs--;
1163: *managerp = NULL;
1164: return;
1165: }
1166: #endif /* ISC_PLATFORM_USETHREADS */
1167:
1168: XTHREADTRACE("isc_taskmgr_destroy");
1169: /*
1170: * Only one non-worker thread may ever call this routine.
1171: * If a worker thread wants to initiate shutdown of the
1172: * task manager, it should ask some non-worker thread to call
1173: * isc_taskmgr_destroy(), e.g. by signalling a condition variable
1174: * that the startup thread is sleeping on.
1175: */
1176:
1177: /*
1178: * Unlike elsewhere, we're going to hold this lock a long time.
1179: * We need to do so, because otherwise the list of tasks could
1180: * change while we were traversing it.
1181: *
1182: * This is also the only function where we will hold both the
1183: * task manager lock and a task lock at the same time.
1184: */
1185:
1186: LOCK(&manager->lock);
1187:
1188: /*
1189: * Make sure we only get called once.
1190: */
1191: INSIST(!manager->exiting);
1192: manager->exiting = ISC_TRUE;
1193:
1194: /*
1195: * Post shutdown event(s) to every task (if they haven't already been
1196: * posted).
1197: */
1198: for (task = HEAD(manager->tasks);
1199: task != NULL;
1200: task = NEXT(task, link)) {
1201: LOCK(&task->lock);
1202: if (task_shutdown(task))
1203: ENQUEUE(manager->ready_tasks, task, ready_link);
1204: UNLOCK(&task->lock);
1205: }
1206: #ifdef ISC_PLATFORM_USETHREADS
1207: /*
1208: * Wake up any sleeping workers. This ensures we get work done if
1209: * there's work left to do, and if there are already no tasks left
1210: * it will cause the workers to see manager->exiting.
1211: */
1212: BROADCAST(&manager->work_available);
1213: UNLOCK(&manager->lock);
1214:
1215: /*
1216: * Wait for all the worker threads to exit.
1217: */
1218: for (i = 0; i < manager->workers; i++)
1219: (void)isc_thread_join(manager->threads[i], NULL);
1220: #else /* ISC_PLATFORM_USETHREADS */
1221: /*
1222: * Dispatch the shutdown events.
1223: */
1224: UNLOCK(&manager->lock);
1225: while (isc__taskmgr_ready())
1226: (void)isc__taskmgr_dispatch();
1227: if (!ISC_LIST_EMPTY(manager->tasks))
1228: isc_mem_printallactive(stderr);
1229: INSIST(ISC_LIST_EMPTY(manager->tasks));
1230: #endif /* ISC_PLATFORM_USETHREADS */
1231:
1232: manager_free(manager);
1233:
1234: *managerp = NULL;
1235: }
1236:
1237: #ifndef ISC_PLATFORM_USETHREADS
1238: isc_boolean_t
1239: isc__taskmgr_ready(void) {
1240: if (taskmgr == NULL)
1241: return (ISC_FALSE);
1242: return (ISC_TF(!ISC_LIST_EMPTY(taskmgr->ready_tasks)));
1243: }
1244:
1245: isc_result_t
1246: isc__taskmgr_dispatch(void) {
1247: isc_taskmgr_t *manager = taskmgr;
1248:
1249: if (taskmgr == NULL)
1250: return (ISC_R_NOTFOUND);
1251:
1252: dispatch(manager);
1253:
1254: return (ISC_R_SUCCESS);
1255: }
1256:
1257: #endif /* ISC_PLATFORM_USETHREADS */
1258:
1259: isc_result_t
1260: isc_task_beginexclusive(isc_task_t *task) {
1261: #ifdef ISC_PLATFORM_USETHREADS
1262: isc_taskmgr_t *manager = task->manager;
1263: REQUIRE(task->state == task_state_running);
1264: LOCK(&manager->lock);
1265: if (manager->exclusive_requested) {
1266: UNLOCK(&manager->lock);
1267: return (ISC_R_LOCKBUSY);
1268: }
1269: manager->exclusive_requested = ISC_TRUE;
1270: while (manager->tasks_running > 1) {
1271: WAIT(&manager->exclusive_granted, &manager->lock);
1272: }
1273: UNLOCK(&manager->lock);
1274: #else
1275: UNUSED(task);
1276: #endif
1277: return (ISC_R_SUCCESS);
1278: }
1279:
1280: void
1281: isc_task_endexclusive(isc_task_t *task) {
1282: #ifdef ISC_PLATFORM_USETHREADS
1283: isc_taskmgr_t *manager = task->manager;
1284: REQUIRE(task->state == task_state_running);
1285: LOCK(&manager->lock);
1286: REQUIRE(manager->exclusive_requested);
1287: manager->exclusive_requested = ISC_FALSE;
1288: BROADCAST(&manager->work_available);
1289: UNLOCK(&manager->lock);
1290: #else
1291: UNUSED(task);
1292: #endif
1293: }
1294:
1295: #ifdef HAVE_LIBXML2
1296:
1297: void
1298: isc_taskmgr_renderxml(isc_taskmgr_t *mgr, xmlTextWriterPtr writer)
1299: {
1300: isc_task_t *task;
1301:
1302: LOCK(&mgr->lock);
1303:
1304: /*
1305: * Write out the thread-model, and some details about each depending
1306: * on which type is enabled.
1307: */
1308: xmlTextWriterStartElement(writer, ISC_XMLCHAR "thread-model");
1309: #ifdef ISC_PLATFORM_USETHREADS
1310: xmlTextWriterStartElement(writer, ISC_XMLCHAR "type");
1311: xmlTextWriterWriteString(writer, ISC_XMLCHAR "threaded");
1312: xmlTextWriterEndElement(writer); /* type */
1313:
1314: xmlTextWriterStartElement(writer, ISC_XMLCHAR "worker-threads");
1315: xmlTextWriterWriteFormatString(writer, "%d", mgr->workers);
1316: xmlTextWriterEndElement(writer); /* worker-threads */
1317: #else /* ISC_PLATFORM_USETHREADS */
1318: xmlTextWriterStartElement(writer, ISC_XMLCHAR "type");
1319: xmlTextWriterWriteString(writer, ISC_XMLCHAR "non-threaded");
1320: xmlTextWriterEndElement(writer); /* type */
1321:
1322: xmlTextWriterStartElement(writer, ISC_XMLCHAR "references");
1323: xmlTextWriterWriteFormatString(writer, "%d", mgr->refs);
1324: xmlTextWriterEndElement(writer); /* references */
1325: #endif /* ISC_PLATFORM_USETHREADS */
1326:
1327: xmlTextWriterStartElement(writer, ISC_XMLCHAR "default-quantum");
1328: xmlTextWriterWriteFormatString(writer, "%d", mgr->default_quantum);
1329: xmlTextWriterEndElement(writer); /* default-quantum */
1330:
1331: xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-running");
1332: xmlTextWriterWriteFormatString(writer, "%d", mgr->tasks_running);
1333: xmlTextWriterEndElement(writer); /* tasks-running */
1334:
1335: xmlTextWriterEndElement(writer); /* thread-model */
1336:
1337: xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks");
1338: task = ISC_LIST_HEAD(mgr->tasks);
1339: while (task != NULL) {
1340: LOCK(&task->lock);
1341: xmlTextWriterStartElement(writer, ISC_XMLCHAR "task");
1342:
1343: if (task->name[0] != 0) {
1344: xmlTextWriterStartElement(writer, ISC_XMLCHAR "name");
1345: xmlTextWriterWriteFormatString(writer, "%s",
1346: task->name);
1347: xmlTextWriterEndElement(writer); /* name */
1348: }
1349:
1350: xmlTextWriterStartElement(writer, ISC_XMLCHAR "references");
1351: xmlTextWriterWriteFormatString(writer, "%d", task->references);
1352: xmlTextWriterEndElement(writer); /* references */
1353:
1354: xmlTextWriterStartElement(writer, ISC_XMLCHAR "id");
1355: xmlTextWriterWriteFormatString(writer, "%p", task);
1356: xmlTextWriterEndElement(writer); /* id */
1357:
1358: xmlTextWriterStartElement(writer, ISC_XMLCHAR "state");
1359: xmlTextWriterWriteFormatString(writer, "%s",
1360: statenames[task->state]);
1361: xmlTextWriterEndElement(writer); /* state */
1362:
1363: xmlTextWriterStartElement(writer, ISC_XMLCHAR "quantum");
1364: xmlTextWriterWriteFormatString(writer, "%d", task->quantum);
1365: xmlTextWriterEndElement(writer); /* quantum */
1366:
1367: xmlTextWriterEndElement(writer);
1368:
1369: UNLOCK(&task->lock);
1370: task = ISC_LIST_NEXT(task, link);
1371: }
1372: xmlTextWriterEndElement(writer); /* tasks */
1373:
1374: UNLOCK(&mgr->lock);
1375: }
1376: #endif /* HAVE_LIBXML2 */
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>