Annotation of embedaddon/quagga/lib/workqueue.c, revision 1.1.1.3

1.1       misho       1: /* 
                      2:  * Quagga Work Queue Support.
                      3:  *
                      4:  * Copyright (C) 2005 Sun Microsystems, Inc.
                      5:  *
                      6:  * This file is part of GNU Zebra.
                      7:  *
                      8:  * Quagga is free software; you can redistribute it and/or modify it
                      9:  * under the terms of the GNU General Public License as published by the
                     10:  * Free Software Foundation; either version 2, or (at your option) any
                     11:  * later version.
                     12:  *
                     13:  * Quagga is distributed in the hope that it will be useful, but
                     14:  * WITHOUT ANY WARRANTY; without even the implied warranty of
                     15:  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
                     16:  * General Public License for more details.
                     17:  *
                     18:  * You should have received a copy of the GNU General Public License
                     19:  * along with Quagga; see the file COPYING.  If not, write to the Free
                     20:  * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
                     21:  * 02111-1307, USA.  
                     22:  */
                     23: 
1.1.1.3 ! misho      24: #include <zebra.h>
1.1       misho      25: #include "thread.h"
                     26: #include "memory.h"
                     27: #include "workqueue.h"
                     28: #include "linklist.h"
                     29: #include "command.h"
                     30: #include "log.h"
                     31: 
                     32: /* master list of work_queues */
1.1.1.3 ! misho      33: static struct list _work_queues;
        !            34: /* pointer primarily to avoid an otherwise harmless warning on
        !            35:  * ALL_LIST_ELEMENTS_RO 
        !            36:  */
        !            37: static struct list *work_queues = &_work_queues;
1.1       misho      38: 
                     39: #define WORK_QUEUE_MIN_GRANULARITY 1
                     40: 
                     41: static struct work_queue_item *
                     42: work_queue_item_new (struct work_queue *wq)
                     43: {
                     44:   struct work_queue_item *item;
                     45:   assert (wq);
                     46: 
                     47:   item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, 
                     48:                   sizeof (struct work_queue_item));
                     49:   
                     50:   return item;
                     51: }
                     52: 
                     53: static void
                     54: work_queue_item_free (struct work_queue_item *item)
                     55: {
                     56:   XFREE (MTYPE_WORK_QUEUE_ITEM, item);
                     57:   return;
                     58: }
                     59: 
                     60: /* create new work queue */
                     61: struct work_queue *
                     62: work_queue_new (struct thread_master *m, const char *queue_name)
                     63: {
                     64:   struct work_queue *new;
                     65:   
                     66:   new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
                     67: 
                     68:   if (new == NULL)
                     69:     return new;
                     70:   
                     71:   new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
                     72:   new->master = m;
                     73:   SET_FLAG (new->flags, WQ_UNPLUGGED);
                     74:   
                     75:   if ( (new->items = list_new ()) == NULL)
                     76:     {
                     77:       XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
                     78:       XFREE (MTYPE_WORK_QUEUE, new);
                     79:       
                     80:       return NULL;
                     81:     }
                     82:   
                     83:   new->items->del = (void (*)(void *)) work_queue_item_free;  
                     84:   
1.1.1.3 ! misho      85:   listnode_add (work_queues, new);
1.1       misho      86:   
                     87:   new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
                     88: 
                     89:   /* Default values, can be overriden by caller */
                     90:   new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
                     91:     
                     92:   return new;
                     93: }
                     94: 
                     95: void
                     96: work_queue_free (struct work_queue *wq)
                     97: {
                     98:   if (wq->thread != NULL)
                     99:     thread_cancel(wq->thread);
                    100:   
                    101:   /* list_delete frees items via callback */
                    102:   list_delete (wq->items);
1.1.1.3 ! misho     103:   listnode_delete (work_queues, wq);
1.1       misho     104:   
                    105:   XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
                    106:   XFREE (MTYPE_WORK_QUEUE, wq);
                    107:   return;
                    108: }
                    109: 
1.1.1.3 ! misho     110: bool
        !           111: work_queue_is_scheduled (struct work_queue *wq)
        !           112: {
        !           113:   return (wq->thread != NULL);
        !           114: }
        !           115: 
1.1.1.2   misho     116: static int
1.1       misho     117: work_queue_schedule (struct work_queue *wq, unsigned int delay)
                    118: {
                    119:   /* if appropriate, schedule work queue thread */
                    120:   if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
                    121:        && (wq->thread == NULL)
                    122:        && (listcount (wq->items) > 0) )
                    123:     {
                    124:       wq->thread = thread_add_background (wq->master, work_queue_run, 
                    125:                                           wq, delay);
                    126:       return 1;
                    127:     }
                    128:   else
                    129:     return 0;
                    130: }
                    131:   
                    132: void
                    133: work_queue_add (struct work_queue *wq, void *data)
                    134: {
                    135:   struct work_queue_item *item;
                    136:   
                    137:   assert (wq);
                    138: 
                    139:   if (!(item = work_queue_item_new (wq)))
                    140:     {
                    141:       zlog_err ("%s: unable to get new queue item", __func__);
                    142:       return;
                    143:     }
                    144:   
                    145:   item->data = data;
                    146:   listnode_add (wq->items, item);
                    147:   
                    148:   work_queue_schedule (wq, wq->spec.hold);
                    149:   
                    150:   return;
                    151: }
                    152: 
                    153: static void
                    154: work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
                    155: {
                    156:   struct work_queue_item *item = listgetdata (ln);
                    157: 
                    158:   assert (item && item->data);
                    159: 
                    160:   /* call private data deletion callback if needed */  
                    161:   if (wq->spec.del_item_data)
                    162:     wq->spec.del_item_data (wq, item->data);
                    163: 
                    164:   list_delete_node (wq->items, ln);
                    165:   work_queue_item_free (item);
                    166:   
                    167:   return;
                    168: }
                    169: 
                    170: static void
                    171: work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
                    172: {
                    173:   LISTNODE_DETACH (wq->items, ln);
                    174:   LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
                    175: }
                    176: 
                    177: DEFUN(show_work_queues,
                    178:       show_work_queues_cmd,
                    179:       "show work-queues",
                    180:       SHOW_STR
                    181:       "Work Queue information\n")
                    182: {
                    183:   struct listnode *node;
                    184:   struct work_queue *wq;
                    185:   
                    186:   vty_out (vty, 
                    187:            "%c %8s %5s %8s %21s%s",
                    188:            ' ', "List","(ms) ","Q. Runs","Cycle Counts   ",
                    189:            VTY_NEWLINE);
                    190:   vty_out (vty,
                    191:            "%c %8s %5s %8s %7s %6s %6s %s%s",
                    192:            'P',
                    193:            "Items",
                    194:            "Hold",
                    195:            "Total",
                    196:            "Best","Gran.","Avg.", 
                    197:            "Name", 
                    198:            VTY_NEWLINE);
                    199:  
1.1.1.3 ! misho     200:   for (ALL_LIST_ELEMENTS_RO (work_queues, node, wq))
1.1       misho     201:     {
                    202:       vty_out (vty,"%c %8d %5d %8ld %7d %6d %6u %s%s",
                    203:                (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
                    204:                listcount (wq->items),
                    205:                wq->spec.hold,
                    206:                wq->runs,
                    207:                wq->cycles.best, wq->cycles.granularity,
                    208:                  (wq->runs) ? 
                    209:                    (unsigned int) (wq->cycles.total / wq->runs) : 0,
                    210:                wq->name,
                    211:                VTY_NEWLINE);
                    212:     }
                    213:     
                    214:   return CMD_SUCCESS;
                    215: }
                    216: 
                    217: /* 'plug' a queue: Stop it from being scheduled,
                    218:  * ie: prevent the queue from draining.
                    219:  */
                    220: void
                    221: work_queue_plug (struct work_queue *wq)
                    222: {
                    223:   if (wq->thread)
                    224:     thread_cancel (wq->thread);
                    225:   
                    226:   wq->thread = NULL;
                    227:   
                    228:   UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
                    229: }
                    230: 
                    231: /* unplug queue, schedule it again, if appropriate
                    232:  * Ie: Allow the queue to be drained again
                    233:  */
                    234: void
                    235: work_queue_unplug (struct work_queue *wq)
                    236: {
                    237:   SET_FLAG (wq->flags, WQ_UNPLUGGED);
                    238: 
                    239:   /* if thread isnt already waiting, add one */
                    240:   work_queue_schedule (wq, wq->spec.hold);
                    241: }
                    242: 
                    243: /* timer thread to process a work queue
                    244:  * will reschedule itself if required,
                    245:  * otherwise work_queue_item_add 
                    246:  */
                    247: int
                    248: work_queue_run (struct thread *thread)
                    249: {
                    250:   struct work_queue *wq;
                    251:   struct work_queue_item *item;
                    252:   wq_item_status ret;
                    253:   unsigned int cycles = 0;
                    254:   struct listnode *node, *nnode;
                    255:   char yielded = 0;
                    256: 
                    257:   wq = THREAD_ARG (thread);
                    258:   wq->thread = NULL;
                    259: 
                    260:   assert (wq && wq->items);
                    261: 
                    262:   /* calculate cycle granularity:
                    263:    * list iteration == 1 cycle
                    264:    * granularity == # cycles between checks whether we should yield.
                    265:    *
                    266:    * granularity should be > 0, and can increase slowly after each run to
                    267:    * provide some hysteris, but not past cycles.best or 2*cycles.
                    268:    *
                    269:    * Best: starts low, can only increase
                    270:    *
                    271:    * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased 
                    272:    *              if we run to end of time slot, can increase otherwise 
                    273:    *              by a small factor.
                    274:    *
                    275:    * We could use just the average and save some work, however we want to be
                    276:    * able to adjust quickly to CPU pressure. Average wont shift much if
                    277:    * daemon has been running a long time.
                    278:    */
                    279:    if (wq->cycles.granularity == 0)
                    280:      wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
                    281: 
                    282:   for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
                    283:   {
                    284:     assert (item && item->data);
                    285:     
                    286:     /* dont run items which are past their allowed retries */
                    287:     if (item->ran > wq->spec.max_retries)
                    288:       {
                    289:         /* run error handler, if any */
                    290:        if (wq->spec.errorfunc)
                    291:          wq->spec.errorfunc (wq, item->data);
                    292:        work_queue_item_remove (wq, node);
                    293:        continue;
                    294:       }
                    295: 
                    296:     /* run and take care of items that want to be retried immediately */
                    297:     do
                    298:       {
                    299:         ret = wq->spec.workfunc (wq, item->data);
                    300:         item->ran++;
                    301:       }
                    302:     while ((ret == WQ_RETRY_NOW) 
                    303:            && (item->ran < wq->spec.max_retries));
                    304: 
                    305:     switch (ret)
                    306:       {
                    307:       case WQ_QUEUE_BLOCKED:
                    308:         {
                    309:           /* decrement item->ran again, cause this isn't an item
                    310:            * specific error, and fall through to WQ_RETRY_LATER
                    311:            */
                    312:           item->ran--;
                    313:         }
                    314:       case WQ_RETRY_LATER:
                    315:        {
                    316:          goto stats;
                    317:        }
                    318:       case WQ_REQUEUE:
                    319:        {
                    320:          item->ran--;
                    321:          work_queue_item_requeue (wq, node);
                    322:          break;
                    323:        }
                    324:       case WQ_RETRY_NOW:
                    325:         /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
                    326:       case WQ_ERROR:
                    327:        {
                    328:          if (wq->spec.errorfunc)
                    329:            wq->spec.errorfunc (wq, item);
                    330:        }
                    331:        /* fall through here is deliberate */
                    332:       case WQ_SUCCESS:
                    333:       default:
                    334:        {
                    335:          work_queue_item_remove (wq, node);
                    336:          break;
                    337:        }
                    338:       }
                    339: 
                    340:     /* completed cycle */
                    341:     cycles++;
                    342: 
                    343:     /* test if we should yield */
                    344:     if ( !(cycles % wq->cycles.granularity) 
                    345:         && thread_should_yield (thread))
                    346:       {
                    347:         yielded = 1;
                    348:         goto stats;
                    349:       }
                    350:   }
                    351: 
                    352: stats:
                    353: 
                    354: #define WQ_HYSTERESIS_FACTOR 4
                    355: 
                    356:   /* we yielded, check whether granularity should be reduced */
                    357:   if (yielded && (cycles < wq->cycles.granularity))
                    358:     {
                    359:       wq->cycles.granularity = ((cycles > 0) ? cycles 
                    360:                                              : WORK_QUEUE_MIN_GRANULARITY);
                    361:     }
                    362:   /* otherwise, should granularity increase? */
                    363:   else if (cycles >= (wq->cycles.granularity))
                    364:     {
                    365:       if (cycles > wq->cycles.best)
                    366:         wq->cycles.best = cycles;
                    367:       
                    368:       /* along with yielded check, provides hysteresis for granularity */
                    369:       if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
                    370:                                            * WQ_HYSTERESIS_FACTOR))
                    371:         wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
                    372:       else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
                    373:         wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
                    374:     }
                    375: #undef WQ_HYSTERIS_FACTOR
                    376:   
                    377:   wq->runs++;
                    378:   wq->cycles.total += cycles;
                    379: 
                    380: #if 0
                    381:   printf ("%s: cycles %d, new: best %d, worst %d\n",
                    382:             __func__, cycles, wq->cycles.best, wq->cycles.granularity);
                    383: #endif
                    384:   
                    385:   /* Is the queue done yet? If it is, call the completion callback. */
                    386:   if (listcount (wq->items) > 0)
                    387:     work_queue_schedule (wq, 0);
                    388:   else if (wq->spec.completion_func)
                    389:     wq->spec.completion_func (wq);
                    390:   
                    391:   return 0;
                    392: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>