Annotation of embedaddon/quagga/lib/workqueue.c, revision 1.1
1.1 ! misho 1: /*
! 2: * Quagga Work Queue Support.
! 3: *
! 4: * Copyright (C) 2005 Sun Microsystems, Inc.
! 5: *
! 6: * This file is part of GNU Zebra.
! 7: *
! 8: * Quagga is free software; you can redistribute it and/or modify it
! 9: * under the terms of the GNU General Public License as published by the
! 10: * Free Software Foundation; either version 2, or (at your option) any
! 11: * later version.
! 12: *
! 13: * Quagga is distributed in the hope that it will be useful, but
! 14: * WITHOUT ANY WARRANTY; without even the implied warranty of
! 15: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
! 16: * General Public License for more details.
! 17: *
! 18: * You should have received a copy of the GNU General Public License
! 19: * along with Quagga; see the file COPYING. If not, write to the Free
! 20: * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
! 21: * 02111-1307, USA.
! 22: */
! 23:
! 24: #include <lib/zebra.h>
! 25: #include "thread.h"
! 26: #include "memory.h"
! 27: #include "workqueue.h"
! 28: #include "linklist.h"
! 29: #include "command.h"
! 30: #include "log.h"
! 31:
! 32: /* master list of work_queues */
! 33: static struct list work_queues;
! 34:
! 35: #define WORK_QUEUE_MIN_GRANULARITY 1
! 36:
! 37: static struct work_queue_item *
! 38: work_queue_item_new (struct work_queue *wq)
! 39: {
! 40: struct work_queue_item *item;
! 41: assert (wq);
! 42:
! 43: item = XCALLOC (MTYPE_WORK_QUEUE_ITEM,
! 44: sizeof (struct work_queue_item));
! 45:
! 46: return item;
! 47: }
! 48:
! 49: static void
! 50: work_queue_item_free (struct work_queue_item *item)
! 51: {
! 52: XFREE (MTYPE_WORK_QUEUE_ITEM, item);
! 53: return;
! 54: }
! 55:
! 56: /* create new work queue */
! 57: struct work_queue *
! 58: work_queue_new (struct thread_master *m, const char *queue_name)
! 59: {
! 60: struct work_queue *new;
! 61:
! 62: new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
! 63:
! 64: if (new == NULL)
! 65: return new;
! 66:
! 67: new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
! 68: new->master = m;
! 69: SET_FLAG (new->flags, WQ_UNPLUGGED);
! 70:
! 71: if ( (new->items = list_new ()) == NULL)
! 72: {
! 73: XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
! 74: XFREE (MTYPE_WORK_QUEUE, new);
! 75:
! 76: return NULL;
! 77: }
! 78:
! 79: new->items->del = (void (*)(void *)) work_queue_item_free;
! 80:
! 81: listnode_add (&work_queues, new);
! 82:
! 83: new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
! 84:
! 85: /* Default values, can be overriden by caller */
! 86: new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
! 87:
! 88: return new;
! 89: }
! 90:
! 91: void
! 92: work_queue_free (struct work_queue *wq)
! 93: {
! 94: if (wq->thread != NULL)
! 95: thread_cancel(wq->thread);
! 96:
! 97: /* list_delete frees items via callback */
! 98: list_delete (wq->items);
! 99: listnode_delete (&work_queues, wq);
! 100:
! 101: XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
! 102: XFREE (MTYPE_WORK_QUEUE, wq);
! 103: return;
! 104: }
! 105:
! 106: static inline int
! 107: work_queue_schedule (struct work_queue *wq, unsigned int delay)
! 108: {
! 109: /* if appropriate, schedule work queue thread */
! 110: if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
! 111: && (wq->thread == NULL)
! 112: && (listcount (wq->items) > 0) )
! 113: {
! 114: wq->thread = thread_add_background (wq->master, work_queue_run,
! 115: wq, delay);
! 116: return 1;
! 117: }
! 118: else
! 119: return 0;
! 120: }
! 121:
! 122: void
! 123: work_queue_add (struct work_queue *wq, void *data)
! 124: {
! 125: struct work_queue_item *item;
! 126:
! 127: assert (wq);
! 128:
! 129: if (!(item = work_queue_item_new (wq)))
! 130: {
! 131: zlog_err ("%s: unable to get new queue item", __func__);
! 132: return;
! 133: }
! 134:
! 135: item->data = data;
! 136: listnode_add (wq->items, item);
! 137:
! 138: work_queue_schedule (wq, wq->spec.hold);
! 139:
! 140: return;
! 141: }
! 142:
! 143: static void
! 144: work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
! 145: {
! 146: struct work_queue_item *item = listgetdata (ln);
! 147:
! 148: assert (item && item->data);
! 149:
! 150: /* call private data deletion callback if needed */
! 151: if (wq->spec.del_item_data)
! 152: wq->spec.del_item_data (wq, item->data);
! 153:
! 154: list_delete_node (wq->items, ln);
! 155: work_queue_item_free (item);
! 156:
! 157: return;
! 158: }
! 159:
! 160: static void
! 161: work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
! 162: {
! 163: LISTNODE_DETACH (wq->items, ln);
! 164: LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
! 165: }
! 166:
! 167: DEFUN(show_work_queues,
! 168: show_work_queues_cmd,
! 169: "show work-queues",
! 170: SHOW_STR
! 171: "Work Queue information\n")
! 172: {
! 173: struct listnode *node;
! 174: struct work_queue *wq;
! 175:
! 176: vty_out (vty,
! 177: "%c %8s %5s %8s %21s%s",
! 178: ' ', "List","(ms) ","Q. Runs","Cycle Counts ",
! 179: VTY_NEWLINE);
! 180: vty_out (vty,
! 181: "%c %8s %5s %8s %7s %6s %6s %s%s",
! 182: 'P',
! 183: "Items",
! 184: "Hold",
! 185: "Total",
! 186: "Best","Gran.","Avg.",
! 187: "Name",
! 188: VTY_NEWLINE);
! 189:
! 190: for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
! 191: {
! 192: vty_out (vty,"%c %8d %5d %8ld %7d %6d %6u %s%s",
! 193: (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
! 194: listcount (wq->items),
! 195: wq->spec.hold,
! 196: wq->runs,
! 197: wq->cycles.best, wq->cycles.granularity,
! 198: (wq->runs) ?
! 199: (unsigned int) (wq->cycles.total / wq->runs) : 0,
! 200: wq->name,
! 201: VTY_NEWLINE);
! 202: }
! 203:
! 204: return CMD_SUCCESS;
! 205: }
! 206:
! 207: /* 'plug' a queue: Stop it from being scheduled,
! 208: * ie: prevent the queue from draining.
! 209: */
! 210: void
! 211: work_queue_plug (struct work_queue *wq)
! 212: {
! 213: if (wq->thread)
! 214: thread_cancel (wq->thread);
! 215:
! 216: wq->thread = NULL;
! 217:
! 218: UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
! 219: }
! 220:
! 221: /* unplug queue, schedule it again, if appropriate
! 222: * Ie: Allow the queue to be drained again
! 223: */
! 224: void
! 225: work_queue_unplug (struct work_queue *wq)
! 226: {
! 227: SET_FLAG (wq->flags, WQ_UNPLUGGED);
! 228:
! 229: /* if thread isnt already waiting, add one */
! 230: work_queue_schedule (wq, wq->spec.hold);
! 231: }
! 232:
! 233: /* timer thread to process a work queue
! 234: * will reschedule itself if required,
! 235: * otherwise work_queue_item_add
! 236: */
! 237: int
! 238: work_queue_run (struct thread *thread)
! 239: {
! 240: struct work_queue *wq;
! 241: struct work_queue_item *item;
! 242: wq_item_status ret;
! 243: unsigned int cycles = 0;
! 244: struct listnode *node, *nnode;
! 245: char yielded = 0;
! 246:
! 247: wq = THREAD_ARG (thread);
! 248: wq->thread = NULL;
! 249:
! 250: assert (wq && wq->items);
! 251:
! 252: /* calculate cycle granularity:
! 253: * list iteration == 1 cycle
! 254: * granularity == # cycles between checks whether we should yield.
! 255: *
! 256: * granularity should be > 0, and can increase slowly after each run to
! 257: * provide some hysteris, but not past cycles.best or 2*cycles.
! 258: *
! 259: * Best: starts low, can only increase
! 260: *
! 261: * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
! 262: * if we run to end of time slot, can increase otherwise
! 263: * by a small factor.
! 264: *
! 265: * We could use just the average and save some work, however we want to be
! 266: * able to adjust quickly to CPU pressure. Average wont shift much if
! 267: * daemon has been running a long time.
! 268: */
! 269: if (wq->cycles.granularity == 0)
! 270: wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
! 271:
! 272: for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
! 273: {
! 274: assert (item && item->data);
! 275:
! 276: /* dont run items which are past their allowed retries */
! 277: if (item->ran > wq->spec.max_retries)
! 278: {
! 279: /* run error handler, if any */
! 280: if (wq->spec.errorfunc)
! 281: wq->spec.errorfunc (wq, item->data);
! 282: work_queue_item_remove (wq, node);
! 283: continue;
! 284: }
! 285:
! 286: /* run and take care of items that want to be retried immediately */
! 287: do
! 288: {
! 289: ret = wq->spec.workfunc (wq, item->data);
! 290: item->ran++;
! 291: }
! 292: while ((ret == WQ_RETRY_NOW)
! 293: && (item->ran < wq->spec.max_retries));
! 294:
! 295: switch (ret)
! 296: {
! 297: case WQ_QUEUE_BLOCKED:
! 298: {
! 299: /* decrement item->ran again, cause this isn't an item
! 300: * specific error, and fall through to WQ_RETRY_LATER
! 301: */
! 302: item->ran--;
! 303: }
! 304: case WQ_RETRY_LATER:
! 305: {
! 306: goto stats;
! 307: }
! 308: case WQ_REQUEUE:
! 309: {
! 310: item->ran--;
! 311: work_queue_item_requeue (wq, node);
! 312: break;
! 313: }
! 314: case WQ_RETRY_NOW:
! 315: /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
! 316: case WQ_ERROR:
! 317: {
! 318: if (wq->spec.errorfunc)
! 319: wq->spec.errorfunc (wq, item);
! 320: }
! 321: /* fall through here is deliberate */
! 322: case WQ_SUCCESS:
! 323: default:
! 324: {
! 325: work_queue_item_remove (wq, node);
! 326: break;
! 327: }
! 328: }
! 329:
! 330: /* completed cycle */
! 331: cycles++;
! 332:
! 333: /* test if we should yield */
! 334: if ( !(cycles % wq->cycles.granularity)
! 335: && thread_should_yield (thread))
! 336: {
! 337: yielded = 1;
! 338: goto stats;
! 339: }
! 340: }
! 341:
! 342: stats:
! 343:
! 344: #define WQ_HYSTERESIS_FACTOR 4
! 345:
! 346: /* we yielded, check whether granularity should be reduced */
! 347: if (yielded && (cycles < wq->cycles.granularity))
! 348: {
! 349: wq->cycles.granularity = ((cycles > 0) ? cycles
! 350: : WORK_QUEUE_MIN_GRANULARITY);
! 351: }
! 352: /* otherwise, should granularity increase? */
! 353: else if (cycles >= (wq->cycles.granularity))
! 354: {
! 355: if (cycles > wq->cycles.best)
! 356: wq->cycles.best = cycles;
! 357:
! 358: /* along with yielded check, provides hysteresis for granularity */
! 359: if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
! 360: * WQ_HYSTERESIS_FACTOR))
! 361: wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
! 362: else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
! 363: wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
! 364: }
! 365: #undef WQ_HYSTERIS_FACTOR
! 366:
! 367: wq->runs++;
! 368: wq->cycles.total += cycles;
! 369:
! 370: #if 0
! 371: printf ("%s: cycles %d, new: best %d, worst %d\n",
! 372: __func__, cycles, wq->cycles.best, wq->cycles.granularity);
! 373: #endif
! 374:
! 375: /* Is the queue done yet? If it is, call the completion callback. */
! 376: if (listcount (wq->items) > 0)
! 377: work_queue_schedule (wq, 0);
! 378: else if (wq->spec.completion_func)
! 379: wq->spec.completion_func (wq);
! 380:
! 381: return 0;
! 382: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>