Annotation of embedaddon/quagga/lib/workqueue.c, revision 1.1.1.2

1.1       misho       1: /* 
                      2:  * Quagga Work Queue Support.
                      3:  *
                      4:  * Copyright (C) 2005 Sun Microsystems, Inc.
                      5:  *
                      6:  * This file is part of GNU Zebra.
                      7:  *
                      8:  * Quagga is free software; you can redistribute it and/or modify it
                      9:  * under the terms of the GNU General Public License as published by the
                     10:  * Free Software Foundation; either version 2, or (at your option) any
                     11:  * later version.
                     12:  *
                     13:  * Quagga is distributed in the hope that it will be useful, but
                     14:  * WITHOUT ANY WARRANTY; without even the implied warranty of
                     15:  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
                     16:  * General Public License for more details.
                     17:  *
                     18:  * You should have received a copy of the GNU General Public License
                     19:  * along with Quagga; see the file COPYING.  If not, write to the Free
                     20:  * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
                     21:  * 02111-1307, USA.  
                     22:  */
                     23: 
                     24: #include <lib/zebra.h>
                     25: #include "thread.h"
                     26: #include "memory.h"
                     27: #include "workqueue.h"
                     28: #include "linklist.h"
                     29: #include "command.h"
                     30: #include "log.h"
                     31: 
                     32: /* master list of work_queues */
                     33: static struct list work_queues;
                     34: 
                     35: #define WORK_QUEUE_MIN_GRANULARITY 1
                     36: 
                     37: static struct work_queue_item *
                     38: work_queue_item_new (struct work_queue *wq)
                     39: {
                     40:   struct work_queue_item *item;
                     41:   assert (wq);
                     42: 
                     43:   item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, 
                     44:                   sizeof (struct work_queue_item));
                     45:   
                     46:   return item;
                     47: }
                     48: 
                     49: static void
                     50: work_queue_item_free (struct work_queue_item *item)
                     51: {
                     52:   XFREE (MTYPE_WORK_QUEUE_ITEM, item);
                     53:   return;
                     54: }
                     55: 
                     56: /* create new work queue */
                     57: struct work_queue *
                     58: work_queue_new (struct thread_master *m, const char *queue_name)
                     59: {
                     60:   struct work_queue *new;
                     61:   
                     62:   new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
                     63: 
                     64:   if (new == NULL)
                     65:     return new;
                     66:   
                     67:   new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
                     68:   new->master = m;
                     69:   SET_FLAG (new->flags, WQ_UNPLUGGED);
                     70:   
                     71:   if ( (new->items = list_new ()) == NULL)
                     72:     {
                     73:       XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
                     74:       XFREE (MTYPE_WORK_QUEUE, new);
                     75:       
                     76:       return NULL;
                     77:     }
                     78:   
                     79:   new->items->del = (void (*)(void *)) work_queue_item_free;  
                     80:   
                     81:   listnode_add (&work_queues, new);
                     82:   
                     83:   new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
                     84: 
                     85:   /* Default values, can be overriden by caller */
                     86:   new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
                     87:     
                     88:   return new;
                     89: }
                     90: 
                     91: void
                     92: work_queue_free (struct work_queue *wq)
                     93: {
                     94:   if (wq->thread != NULL)
                     95:     thread_cancel(wq->thread);
                     96:   
                     97:   /* list_delete frees items via callback */
                     98:   list_delete (wq->items);
                     99:   listnode_delete (&work_queues, wq);
                    100:   
                    101:   XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
                    102:   XFREE (MTYPE_WORK_QUEUE, wq);
                    103:   return;
                    104: }
                    105: 
1.1.1.2 ! misho     106: static int
1.1       misho     107: work_queue_schedule (struct work_queue *wq, unsigned int delay)
                    108: {
                    109:   /* if appropriate, schedule work queue thread */
                    110:   if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
                    111:        && (wq->thread == NULL)
                    112:        && (listcount (wq->items) > 0) )
                    113:     {
                    114:       wq->thread = thread_add_background (wq->master, work_queue_run, 
                    115:                                           wq, delay);
                    116:       return 1;
                    117:     }
                    118:   else
                    119:     return 0;
                    120: }
                    121:   
                    122: void
                    123: work_queue_add (struct work_queue *wq, void *data)
                    124: {
                    125:   struct work_queue_item *item;
                    126:   
                    127:   assert (wq);
                    128: 
                    129:   if (!(item = work_queue_item_new (wq)))
                    130:     {
                    131:       zlog_err ("%s: unable to get new queue item", __func__);
                    132:       return;
                    133:     }
                    134:   
                    135:   item->data = data;
                    136:   listnode_add (wq->items, item);
                    137:   
                    138:   work_queue_schedule (wq, wq->spec.hold);
                    139:   
                    140:   return;
                    141: }
                    142: 
                    143: static void
                    144: work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
                    145: {
                    146:   struct work_queue_item *item = listgetdata (ln);
                    147: 
                    148:   assert (item && item->data);
                    149: 
                    150:   /* call private data deletion callback if needed */  
                    151:   if (wq->spec.del_item_data)
                    152:     wq->spec.del_item_data (wq, item->data);
                    153: 
                    154:   list_delete_node (wq->items, ln);
                    155:   work_queue_item_free (item);
                    156:   
                    157:   return;
                    158: }
                    159: 
                    160: static void
                    161: work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
                    162: {
                    163:   LISTNODE_DETACH (wq->items, ln);
                    164:   LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
                    165: }
                    166: 
                    167: DEFUN(show_work_queues,
                    168:       show_work_queues_cmd,
                    169:       "show work-queues",
                    170:       SHOW_STR
                    171:       "Work Queue information\n")
                    172: {
                    173:   struct listnode *node;
                    174:   struct work_queue *wq;
                    175:   
                    176:   vty_out (vty, 
                    177:            "%c %8s %5s %8s %21s%s",
                    178:            ' ', "List","(ms) ","Q. Runs","Cycle Counts   ",
                    179:            VTY_NEWLINE);
                    180:   vty_out (vty,
                    181:            "%c %8s %5s %8s %7s %6s %6s %s%s",
                    182:            'P',
                    183:            "Items",
                    184:            "Hold",
                    185:            "Total",
                    186:            "Best","Gran.","Avg.", 
                    187:            "Name", 
                    188:            VTY_NEWLINE);
                    189:  
                    190:   for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
                    191:     {
                    192:       vty_out (vty,"%c %8d %5d %8ld %7d %6d %6u %s%s",
                    193:                (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
                    194:                listcount (wq->items),
                    195:                wq->spec.hold,
                    196:                wq->runs,
                    197:                wq->cycles.best, wq->cycles.granularity,
                    198:                  (wq->runs) ? 
                    199:                    (unsigned int) (wq->cycles.total / wq->runs) : 0,
                    200:                wq->name,
                    201:                VTY_NEWLINE);
                    202:     }
                    203:     
                    204:   return CMD_SUCCESS;
                    205: }
                    206: 
                    207: /* 'plug' a queue: Stop it from being scheduled,
                    208:  * ie: prevent the queue from draining.
                    209:  */
                    210: void
                    211: work_queue_plug (struct work_queue *wq)
                    212: {
                    213:   if (wq->thread)
                    214:     thread_cancel (wq->thread);
                    215:   
                    216:   wq->thread = NULL;
                    217:   
                    218:   UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
                    219: }
                    220: 
                    221: /* unplug queue, schedule it again, if appropriate
                    222:  * Ie: Allow the queue to be drained again
                    223:  */
                    224: void
                    225: work_queue_unplug (struct work_queue *wq)
                    226: {
                    227:   SET_FLAG (wq->flags, WQ_UNPLUGGED);
                    228: 
                    229:   /* if thread isnt already waiting, add one */
                    230:   work_queue_schedule (wq, wq->spec.hold);
                    231: }
                    232: 
                    233: /* timer thread to process a work queue
                    234:  * will reschedule itself if required,
                    235:  * otherwise work_queue_item_add 
                    236:  */
                    237: int
                    238: work_queue_run (struct thread *thread)
                    239: {
                    240:   struct work_queue *wq;
                    241:   struct work_queue_item *item;
                    242:   wq_item_status ret;
                    243:   unsigned int cycles = 0;
                    244:   struct listnode *node, *nnode;
                    245:   char yielded = 0;
                    246: 
                    247:   wq = THREAD_ARG (thread);
                    248:   wq->thread = NULL;
                    249: 
                    250:   assert (wq && wq->items);
                    251: 
                    252:   /* calculate cycle granularity:
                    253:    * list iteration == 1 cycle
                    254:    * granularity == # cycles between checks whether we should yield.
                    255:    *
                    256:    * granularity should be > 0, and can increase slowly after each run to
                    257:    * provide some hysteris, but not past cycles.best or 2*cycles.
                    258:    *
                    259:    * Best: starts low, can only increase
                    260:    *
                    261:    * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased 
                    262:    *              if we run to end of time slot, can increase otherwise 
                    263:    *              by a small factor.
                    264:    *
                    265:    * We could use just the average and save some work, however we want to be
                    266:    * able to adjust quickly to CPU pressure. Average wont shift much if
                    267:    * daemon has been running a long time.
                    268:    */
                    269:    if (wq->cycles.granularity == 0)
                    270:      wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
                    271: 
                    272:   for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
                    273:   {
                    274:     assert (item && item->data);
                    275:     
                    276:     /* dont run items which are past their allowed retries */
                    277:     if (item->ran > wq->spec.max_retries)
                    278:       {
                    279:         /* run error handler, if any */
                    280:        if (wq->spec.errorfunc)
                    281:          wq->spec.errorfunc (wq, item->data);
                    282:        work_queue_item_remove (wq, node);
                    283:        continue;
                    284:       }
                    285: 
                    286:     /* run and take care of items that want to be retried immediately */
                    287:     do
                    288:       {
                    289:         ret = wq->spec.workfunc (wq, item->data);
                    290:         item->ran++;
                    291:       }
                    292:     while ((ret == WQ_RETRY_NOW) 
                    293:            && (item->ran < wq->spec.max_retries));
                    294: 
                    295:     switch (ret)
                    296:       {
                    297:       case WQ_QUEUE_BLOCKED:
                    298:         {
                    299:           /* decrement item->ran again, cause this isn't an item
                    300:            * specific error, and fall through to WQ_RETRY_LATER
                    301:            */
                    302:           item->ran--;
                    303:         }
                    304:       case WQ_RETRY_LATER:
                    305:        {
                    306:          goto stats;
                    307:        }
                    308:       case WQ_REQUEUE:
                    309:        {
                    310:          item->ran--;
                    311:          work_queue_item_requeue (wq, node);
                    312:          break;
                    313:        }
                    314:       case WQ_RETRY_NOW:
                    315:         /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
                    316:       case WQ_ERROR:
                    317:        {
                    318:          if (wq->spec.errorfunc)
                    319:            wq->spec.errorfunc (wq, item);
                    320:        }
                    321:        /* fall through here is deliberate */
                    322:       case WQ_SUCCESS:
                    323:       default:
                    324:        {
                    325:          work_queue_item_remove (wq, node);
                    326:          break;
                    327:        }
                    328:       }
                    329: 
                    330:     /* completed cycle */
                    331:     cycles++;
                    332: 
                    333:     /* test if we should yield */
                    334:     if ( !(cycles % wq->cycles.granularity) 
                    335:         && thread_should_yield (thread))
                    336:       {
                    337:         yielded = 1;
                    338:         goto stats;
                    339:       }
                    340:   }
                    341: 
                    342: stats:
                    343: 
                    344: #define WQ_HYSTERESIS_FACTOR 4
                    345: 
                    346:   /* we yielded, check whether granularity should be reduced */
                    347:   if (yielded && (cycles < wq->cycles.granularity))
                    348:     {
                    349:       wq->cycles.granularity = ((cycles > 0) ? cycles 
                    350:                                              : WORK_QUEUE_MIN_GRANULARITY);
                    351:     }
                    352:   /* otherwise, should granularity increase? */
                    353:   else if (cycles >= (wq->cycles.granularity))
                    354:     {
                    355:       if (cycles > wq->cycles.best)
                    356:         wq->cycles.best = cycles;
                    357:       
                    358:       /* along with yielded check, provides hysteresis for granularity */
                    359:       if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
                    360:                                            * WQ_HYSTERESIS_FACTOR))
                    361:         wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
                    362:       else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
                    363:         wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
                    364:     }
                    365: #undef WQ_HYSTERIS_FACTOR
                    366:   
                    367:   wq->runs++;
                    368:   wq->cycles.total += cycles;
                    369: 
                    370: #if 0
                    371:   printf ("%s: cycles %d, new: best %d, worst %d\n",
                    372:             __func__, cycles, wq->cycles.best, wq->cycles.granularity);
                    373: #endif
                    374:   
                    375:   /* Is the queue done yet? If it is, call the completion callback. */
                    376:   if (listcount (wq->items) > 0)
                    377:     work_queue_schedule (wq, 0);
                    378:   else if (wq->spec.completion_func)
                    379:     wq->spec.completion_func (wq);
                    380:   
                    381:   return 0;
                    382: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>