Annotation of embedaddon/quagga/lib/workqueue.c, revision 1.1.1.2
1.1 misho 1: /*
2: * Quagga Work Queue Support.
3: *
4: * Copyright (C) 2005 Sun Microsystems, Inc.
5: *
6: * This file is part of GNU Zebra.
7: *
8: * Quagga is free software; you can redistribute it and/or modify it
9: * under the terms of the GNU General Public License as published by the
10: * Free Software Foundation; either version 2, or (at your option) any
11: * later version.
12: *
13: * Quagga is distributed in the hope that it will be useful, but
14: * WITHOUT ANY WARRANTY; without even the implied warranty of
15: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16: * General Public License for more details.
17: *
18: * You should have received a copy of the GNU General Public License
19: * along with Quagga; see the file COPYING. If not, write to the Free
20: * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
21: * 02111-1307, USA.
22: */
23:
24: #include <lib/zebra.h>
25: #include "thread.h"
26: #include "memory.h"
27: #include "workqueue.h"
28: #include "linklist.h"
29: #include "command.h"
30: #include "log.h"
31:
32: /* master list of work_queues */
33: static struct list work_queues;
34:
35: #define WORK_QUEUE_MIN_GRANULARITY 1
36:
37: static struct work_queue_item *
38: work_queue_item_new (struct work_queue *wq)
39: {
40: struct work_queue_item *item;
41: assert (wq);
42:
43: item = XCALLOC (MTYPE_WORK_QUEUE_ITEM,
44: sizeof (struct work_queue_item));
45:
46: return item;
47: }
48:
49: static void
50: work_queue_item_free (struct work_queue_item *item)
51: {
52: XFREE (MTYPE_WORK_QUEUE_ITEM, item);
53: return;
54: }
55:
56: /* create new work queue */
57: struct work_queue *
58: work_queue_new (struct thread_master *m, const char *queue_name)
59: {
60: struct work_queue *new;
61:
62: new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
63:
64: if (new == NULL)
65: return new;
66:
67: new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
68: new->master = m;
69: SET_FLAG (new->flags, WQ_UNPLUGGED);
70:
71: if ( (new->items = list_new ()) == NULL)
72: {
73: XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
74: XFREE (MTYPE_WORK_QUEUE, new);
75:
76: return NULL;
77: }
78:
79: new->items->del = (void (*)(void *)) work_queue_item_free;
80:
81: listnode_add (&work_queues, new);
82:
83: new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
84:
85: /* Default values, can be overriden by caller */
86: new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
87:
88: return new;
89: }
90:
91: void
92: work_queue_free (struct work_queue *wq)
93: {
94: if (wq->thread != NULL)
95: thread_cancel(wq->thread);
96:
97: /* list_delete frees items via callback */
98: list_delete (wq->items);
99: listnode_delete (&work_queues, wq);
100:
101: XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
102: XFREE (MTYPE_WORK_QUEUE, wq);
103: return;
104: }
105:
1.1.1.2 ! misho 106: static int
1.1 misho 107: work_queue_schedule (struct work_queue *wq, unsigned int delay)
108: {
109: /* if appropriate, schedule work queue thread */
110: if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
111: && (wq->thread == NULL)
112: && (listcount (wq->items) > 0) )
113: {
114: wq->thread = thread_add_background (wq->master, work_queue_run,
115: wq, delay);
116: return 1;
117: }
118: else
119: return 0;
120: }
121:
122: void
123: work_queue_add (struct work_queue *wq, void *data)
124: {
125: struct work_queue_item *item;
126:
127: assert (wq);
128:
129: if (!(item = work_queue_item_new (wq)))
130: {
131: zlog_err ("%s: unable to get new queue item", __func__);
132: return;
133: }
134:
135: item->data = data;
136: listnode_add (wq->items, item);
137:
138: work_queue_schedule (wq, wq->spec.hold);
139:
140: return;
141: }
142:
143: static void
144: work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
145: {
146: struct work_queue_item *item = listgetdata (ln);
147:
148: assert (item && item->data);
149:
150: /* call private data deletion callback if needed */
151: if (wq->spec.del_item_data)
152: wq->spec.del_item_data (wq, item->data);
153:
154: list_delete_node (wq->items, ln);
155: work_queue_item_free (item);
156:
157: return;
158: }
159:
160: static void
161: work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
162: {
163: LISTNODE_DETACH (wq->items, ln);
164: LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
165: }
166:
167: DEFUN(show_work_queues,
168: show_work_queues_cmd,
169: "show work-queues",
170: SHOW_STR
171: "Work Queue information\n")
172: {
173: struct listnode *node;
174: struct work_queue *wq;
175:
176: vty_out (vty,
177: "%c %8s %5s %8s %21s%s",
178: ' ', "List","(ms) ","Q. Runs","Cycle Counts ",
179: VTY_NEWLINE);
180: vty_out (vty,
181: "%c %8s %5s %8s %7s %6s %6s %s%s",
182: 'P',
183: "Items",
184: "Hold",
185: "Total",
186: "Best","Gran.","Avg.",
187: "Name",
188: VTY_NEWLINE);
189:
190: for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
191: {
192: vty_out (vty,"%c %8d %5d %8ld %7d %6d %6u %s%s",
193: (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
194: listcount (wq->items),
195: wq->spec.hold,
196: wq->runs,
197: wq->cycles.best, wq->cycles.granularity,
198: (wq->runs) ?
199: (unsigned int) (wq->cycles.total / wq->runs) : 0,
200: wq->name,
201: VTY_NEWLINE);
202: }
203:
204: return CMD_SUCCESS;
205: }
206:
207: /* 'plug' a queue: Stop it from being scheduled,
208: * ie: prevent the queue from draining.
209: */
210: void
211: work_queue_plug (struct work_queue *wq)
212: {
213: if (wq->thread)
214: thread_cancel (wq->thread);
215:
216: wq->thread = NULL;
217:
218: UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
219: }
220:
221: /* unplug queue, schedule it again, if appropriate
222: * Ie: Allow the queue to be drained again
223: */
224: void
225: work_queue_unplug (struct work_queue *wq)
226: {
227: SET_FLAG (wq->flags, WQ_UNPLUGGED);
228:
229: /* if thread isnt already waiting, add one */
230: work_queue_schedule (wq, wq->spec.hold);
231: }
232:
233: /* timer thread to process a work queue
234: * will reschedule itself if required,
235: * otherwise work_queue_item_add
236: */
237: int
238: work_queue_run (struct thread *thread)
239: {
240: struct work_queue *wq;
241: struct work_queue_item *item;
242: wq_item_status ret;
243: unsigned int cycles = 0;
244: struct listnode *node, *nnode;
245: char yielded = 0;
246:
247: wq = THREAD_ARG (thread);
248: wq->thread = NULL;
249:
250: assert (wq && wq->items);
251:
252: /* calculate cycle granularity:
253: * list iteration == 1 cycle
254: * granularity == # cycles between checks whether we should yield.
255: *
256: * granularity should be > 0, and can increase slowly after each run to
257: * provide some hysteris, but not past cycles.best or 2*cycles.
258: *
259: * Best: starts low, can only increase
260: *
261: * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
262: * if we run to end of time slot, can increase otherwise
263: * by a small factor.
264: *
265: * We could use just the average and save some work, however we want to be
266: * able to adjust quickly to CPU pressure. Average wont shift much if
267: * daemon has been running a long time.
268: */
269: if (wq->cycles.granularity == 0)
270: wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
271:
272: for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
273: {
274: assert (item && item->data);
275:
276: /* dont run items which are past their allowed retries */
277: if (item->ran > wq->spec.max_retries)
278: {
279: /* run error handler, if any */
280: if (wq->spec.errorfunc)
281: wq->spec.errorfunc (wq, item->data);
282: work_queue_item_remove (wq, node);
283: continue;
284: }
285:
286: /* run and take care of items that want to be retried immediately */
287: do
288: {
289: ret = wq->spec.workfunc (wq, item->data);
290: item->ran++;
291: }
292: while ((ret == WQ_RETRY_NOW)
293: && (item->ran < wq->spec.max_retries));
294:
295: switch (ret)
296: {
297: case WQ_QUEUE_BLOCKED:
298: {
299: /* decrement item->ran again, cause this isn't an item
300: * specific error, and fall through to WQ_RETRY_LATER
301: */
302: item->ran--;
303: }
304: case WQ_RETRY_LATER:
305: {
306: goto stats;
307: }
308: case WQ_REQUEUE:
309: {
310: item->ran--;
311: work_queue_item_requeue (wq, node);
312: break;
313: }
314: case WQ_RETRY_NOW:
315: /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
316: case WQ_ERROR:
317: {
318: if (wq->spec.errorfunc)
319: wq->spec.errorfunc (wq, item);
320: }
321: /* fall through here is deliberate */
322: case WQ_SUCCESS:
323: default:
324: {
325: work_queue_item_remove (wq, node);
326: break;
327: }
328: }
329:
330: /* completed cycle */
331: cycles++;
332:
333: /* test if we should yield */
334: if ( !(cycles % wq->cycles.granularity)
335: && thread_should_yield (thread))
336: {
337: yielded = 1;
338: goto stats;
339: }
340: }
341:
342: stats:
343:
344: #define WQ_HYSTERESIS_FACTOR 4
345:
346: /* we yielded, check whether granularity should be reduced */
347: if (yielded && (cycles < wq->cycles.granularity))
348: {
349: wq->cycles.granularity = ((cycles > 0) ? cycles
350: : WORK_QUEUE_MIN_GRANULARITY);
351: }
352: /* otherwise, should granularity increase? */
353: else if (cycles >= (wq->cycles.granularity))
354: {
355: if (cycles > wq->cycles.best)
356: wq->cycles.best = cycles;
357:
358: /* along with yielded check, provides hysteresis for granularity */
359: if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
360: * WQ_HYSTERESIS_FACTOR))
361: wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
362: else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
363: wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
364: }
365: #undef WQ_HYSTERIS_FACTOR
366:
367: wq->runs++;
368: wq->cycles.total += cycles;
369:
370: #if 0
371: printf ("%s: cycles %d, new: best %d, worst %d\n",
372: __func__, cycles, wq->cycles.best, wq->cycles.granularity);
373: #endif
374:
375: /* Is the queue done yet? If it is, call the completion callback. */
376: if (listcount (wq->items) > 0)
377: work_queue_schedule (wq, 0);
378: else if (wq->spec.completion_func)
379: wq->spec.completion_func (wq);
380:
381: return 0;
382: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>