1: /*
2: * Quagga Work Queue Support.
3: *
4: * Copyright (C) 2005 Sun Microsystems, Inc.
5: *
6: * This file is part of GNU Zebra.
7: *
8: * Quagga is free software; you can redistribute it and/or modify it
9: * under the terms of the GNU General Public License as published by the
10: * Free Software Foundation; either version 2, or (at your option) any
11: * later version.
12: *
13: * Quagga is distributed in the hope that it will be useful, but
14: * WITHOUT ANY WARRANTY; without even the implied warranty of
15: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16: * General Public License for more details.
17: *
18: * You should have received a copy of the GNU General Public License
19: * along with Quagga; see the file COPYING. If not, write to the Free
20: * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
21: * 02111-1307, USA.
22: */
23:
24: #include <zebra.h>
25: #include "thread.h"
26: #include "memory.h"
27: #include "workqueue.h"
28: #include "linklist.h"
29: #include "command.h"
30: #include "log.h"
31:
/* Master list of work queues; work_queue_new() adds each queue it creates
 * and work_queue_free() removes it again.
 */
static struct list _work_queues;
/* Pointer to the master list, kept primarily to avoid an otherwise
 * harmless warning on ALL_LIST_ELEMENTS_RO.
 */
static struct list *work_queues = &_work_queues;

/* Initial value of, and fallback for, a queue's cycles.granularity. */
#define WORK_QUEUE_MIN_GRANULARITY 1
40:
41: static struct work_queue_item *
42: work_queue_item_new (struct work_queue *wq)
43: {
44: struct work_queue_item *item;
45: assert (wq);
46:
47: item = XCALLOC (MTYPE_WORK_QUEUE_ITEM,
48: sizeof (struct work_queue_item));
49:
50: return item;
51: }
52:
53: static void
54: work_queue_item_free (struct work_queue_item *item)
55: {
56: XFREE (MTYPE_WORK_QUEUE_ITEM, item);
57: return;
58: }
59:
60: /* create new work queue */
61: struct work_queue *
62: work_queue_new (struct thread_master *m, const char *queue_name)
63: {
64: struct work_queue *new;
65:
66: new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
67:
68: if (new == NULL)
69: return new;
70:
71: new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
72: new->master = m;
73: SET_FLAG (new->flags, WQ_UNPLUGGED);
74:
75: if ( (new->items = list_new ()) == NULL)
76: {
77: XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
78: XFREE (MTYPE_WORK_QUEUE, new);
79:
80: return NULL;
81: }
82:
83: new->items->del = (void (*)(void *)) work_queue_item_free;
84:
85: listnode_add (work_queues, new);
86:
87: new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
88:
89: /* Default values, can be overriden by caller */
90: new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
91:
92: return new;
93: }
94:
95: void
96: work_queue_free (struct work_queue *wq)
97: {
98: if (wq->thread != NULL)
99: thread_cancel(wq->thread);
100:
101: /* list_delete frees items via callback */
102: list_delete (wq->items);
103: listnode_delete (work_queues, wq);
104:
105: XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
106: XFREE (MTYPE_WORK_QUEUE, wq);
107: return;
108: }
109:
110: bool
111: work_queue_is_scheduled (struct work_queue *wq)
112: {
113: return (wq->thread != NULL);
114: }
115:
116: static int
117: work_queue_schedule (struct work_queue *wq, unsigned int delay)
118: {
119: /* if appropriate, schedule work queue thread */
120: if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
121: && (wq->thread == NULL)
122: && (listcount (wq->items) > 0) )
123: {
124: wq->thread = thread_add_background (wq->master, work_queue_run,
125: wq, delay);
126: return 1;
127: }
128: else
129: return 0;
130: }
131:
132: void
133: work_queue_add (struct work_queue *wq, void *data)
134: {
135: struct work_queue_item *item;
136:
137: assert (wq);
138:
139: if (!(item = work_queue_item_new (wq)))
140: {
141: zlog_err ("%s: unable to get new queue item", __func__);
142: return;
143: }
144:
145: item->data = data;
146: listnode_add (wq->items, item);
147:
148: work_queue_schedule (wq, wq->spec.hold);
149:
150: return;
151: }
152:
153: static void
154: work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
155: {
156: struct work_queue_item *item = listgetdata (ln);
157:
158: assert (item && item->data);
159:
160: /* call private data deletion callback if needed */
161: if (wq->spec.del_item_data)
162: wq->spec.del_item_data (wq, item->data);
163:
164: list_delete_node (wq->items, ln);
165: work_queue_item_free (item);
166:
167: return;
168: }
169:
170: static void
171: work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
172: {
173: LISTNODE_DETACH (wq->items, ln);
174: LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
175: }
176:
177: DEFUN(show_work_queues,
178: show_work_queues_cmd,
179: "show work-queues",
180: SHOW_STR
181: "Work Queue information\n")
182: {
183: struct listnode *node;
184: struct work_queue *wq;
185:
186: vty_out (vty,
187: "%c %8s %5s %8s %21s%s",
188: ' ', "List","(ms) ","Q. Runs","Cycle Counts ",
189: VTY_NEWLINE);
190: vty_out (vty,
191: "%c %8s %5s %8s %7s %6s %6s %s%s",
192: 'P',
193: "Items",
194: "Hold",
195: "Total",
196: "Best","Gran.","Avg.",
197: "Name",
198: VTY_NEWLINE);
199:
200: for (ALL_LIST_ELEMENTS_RO (work_queues, node, wq))
201: {
202: vty_out (vty,"%c %8d %5d %8ld %7d %6d %6u %s%s",
203: (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
204: listcount (wq->items),
205: wq->spec.hold,
206: wq->runs,
207: wq->cycles.best, wq->cycles.granularity,
208: (wq->runs) ?
209: (unsigned int) (wq->cycles.total / wq->runs) : 0,
210: wq->name,
211: VTY_NEWLINE);
212: }
213:
214: return CMD_SUCCESS;
215: }
216:
217: /* 'plug' a queue: Stop it from being scheduled,
218: * ie: prevent the queue from draining.
219: */
220: void
221: work_queue_plug (struct work_queue *wq)
222: {
223: if (wq->thread)
224: thread_cancel (wq->thread);
225:
226: wq->thread = NULL;
227:
228: UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
229: }
230:
231: /* unplug queue, schedule it again, if appropriate
232: * Ie: Allow the queue to be drained again
233: */
234: void
235: work_queue_unplug (struct work_queue *wq)
236: {
237: SET_FLAG (wq->flags, WQ_UNPLUGGED);
238:
239: /* if thread isnt already waiting, add one */
240: work_queue_schedule (wq, wq->spec.hold);
241: }
242:
243: /* timer thread to process a work queue
244: * will reschedule itself if required,
245: * otherwise work_queue_item_add
246: */
247: int
248: work_queue_run (struct thread *thread)
249: {
250: struct work_queue *wq;
251: struct work_queue_item *item;
252: wq_item_status ret;
253: unsigned int cycles = 0;
254: struct listnode *node, *nnode;
255: char yielded = 0;
256:
257: wq = THREAD_ARG (thread);
258: wq->thread = NULL;
259:
260: assert (wq && wq->items);
261:
262: /* calculate cycle granularity:
263: * list iteration == 1 cycle
264: * granularity == # cycles between checks whether we should yield.
265: *
266: * granularity should be > 0, and can increase slowly after each run to
267: * provide some hysteris, but not past cycles.best or 2*cycles.
268: *
269: * Best: starts low, can only increase
270: *
271: * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
272: * if we run to end of time slot, can increase otherwise
273: * by a small factor.
274: *
275: * We could use just the average and save some work, however we want to be
276: * able to adjust quickly to CPU pressure. Average wont shift much if
277: * daemon has been running a long time.
278: */
279: if (wq->cycles.granularity == 0)
280: wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
281:
282: for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
283: {
284: assert (item && item->data);
285:
286: /* dont run items which are past their allowed retries */
287: if (item->ran > wq->spec.max_retries)
288: {
289: /* run error handler, if any */
290: if (wq->spec.errorfunc)
291: wq->spec.errorfunc (wq, item->data);
292: work_queue_item_remove (wq, node);
293: continue;
294: }
295:
296: /* run and take care of items that want to be retried immediately */
297: do
298: {
299: ret = wq->spec.workfunc (wq, item->data);
300: item->ran++;
301: }
302: while ((ret == WQ_RETRY_NOW)
303: && (item->ran < wq->spec.max_retries));
304:
305: switch (ret)
306: {
307: case WQ_QUEUE_BLOCKED:
308: {
309: /* decrement item->ran again, cause this isn't an item
310: * specific error, and fall through to WQ_RETRY_LATER
311: */
312: item->ran--;
313: }
314: case WQ_RETRY_LATER:
315: {
316: goto stats;
317: }
318: case WQ_REQUEUE:
319: {
320: item->ran--;
321: work_queue_item_requeue (wq, node);
322: break;
323: }
324: case WQ_RETRY_NOW:
325: /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
326: case WQ_ERROR:
327: {
328: if (wq->spec.errorfunc)
329: wq->spec.errorfunc (wq, item);
330: }
331: /* fall through here is deliberate */
332: case WQ_SUCCESS:
333: default:
334: {
335: work_queue_item_remove (wq, node);
336: break;
337: }
338: }
339:
340: /* completed cycle */
341: cycles++;
342:
343: /* test if we should yield */
344: if ( !(cycles % wq->cycles.granularity)
345: && thread_should_yield (thread))
346: {
347: yielded = 1;
348: goto stats;
349: }
350: }
351:
352: stats:
353:
354: #define WQ_HYSTERESIS_FACTOR 4
355:
356: /* we yielded, check whether granularity should be reduced */
357: if (yielded && (cycles < wq->cycles.granularity))
358: {
359: wq->cycles.granularity = ((cycles > 0) ? cycles
360: : WORK_QUEUE_MIN_GRANULARITY);
361: }
362: /* otherwise, should granularity increase? */
363: else if (cycles >= (wq->cycles.granularity))
364: {
365: if (cycles > wq->cycles.best)
366: wq->cycles.best = cycles;
367:
368: /* along with yielded check, provides hysteresis for granularity */
369: if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
370: * WQ_HYSTERESIS_FACTOR))
371: wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
372: else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
373: wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
374: }
375: #undef WQ_HYSTERIS_FACTOR
376:
377: wq->runs++;
378: wq->cycles.total += cycles;
379:
380: #if 0
381: printf ("%s: cycles %d, new: best %d, worst %d\n",
382: __func__, cycles, wq->cycles.best, wq->cycles.granularity);
383: #endif
384:
385: /* Is the queue done yet? If it is, call the completion callback. */
386: if (listcount (wq->items) > 0)
387: work_queue_schedule (wq, 0);
388: else if (wq->spec.completion_func)
389: wq->spec.completion_func (wq);
390:
391: return 0;
392: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>