File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / quagga / lib / workqueue.c
Revision 1.1.1.2 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Tue Oct 9 09:22:28 2012 UTC (11 years, 9 months ago) by misho
Branches: quagga, MAIN
CVS tags: v0_99_22p0, v0_99_22, v0_99_21, HEAD
quagga

    1: /* 
    2:  * Quagga Work Queue Support.
    3:  *
    4:  * Copyright (C) 2005 Sun Microsystems, Inc.
    5:  *
    6:  * This file is part of GNU Zebra.
    7:  *
    8:  * Quagga is free software; you can redistribute it and/or modify it
    9:  * under the terms of the GNU General Public License as published by the
   10:  * Free Software Foundation; either version 2, or (at your option) any
   11:  * later version.
   12:  *
   13:  * Quagga is distributed in the hope that it will be useful, but
   14:  * WITHOUT ANY WARRANTY; without even the implied warranty of
   15:  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   16:  * General Public License for more details.
   17:  *
   18:  * You should have received a copy of the GNU General Public License
   19:  * along with Quagga; see the file COPYING.  If not, write to the Free
   20:  * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
   21:  * 02111-1307, USA.  
   22:  */
   23: 
   24: #include <lib/zebra.h>
   25: #include "thread.h"
   26: #include "memory.h"
   27: #include "workqueue.h"
   28: #include "linklist.h"
   29: #include "command.h"
   30: #include "log.h"
   31: 
   32: /* master list of work_queues */
   33: static struct list work_queues;
   34: 
   35: #define WORK_QUEUE_MIN_GRANULARITY 1
   36: 
   37: static struct work_queue_item *
   38: work_queue_item_new (struct work_queue *wq)
   39: {
   40:   struct work_queue_item *item;
   41:   assert (wq);
   42: 
   43:   item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, 
   44:                   sizeof (struct work_queue_item));
   45:   
   46:   return item;
   47: }
   48: 
   49: static void
   50: work_queue_item_free (struct work_queue_item *item)
   51: {
   52:   XFREE (MTYPE_WORK_QUEUE_ITEM, item);
   53:   return;
   54: }
   55: 
   56: /* create new work queue */
   57: struct work_queue *
   58: work_queue_new (struct thread_master *m, const char *queue_name)
   59: {
   60:   struct work_queue *new;
   61:   
   62:   new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
   63: 
   64:   if (new == NULL)
   65:     return new;
   66:   
   67:   new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
   68:   new->master = m;
   69:   SET_FLAG (new->flags, WQ_UNPLUGGED);
   70:   
   71:   if ( (new->items = list_new ()) == NULL)
   72:     {
   73:       XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
   74:       XFREE (MTYPE_WORK_QUEUE, new);
   75:       
   76:       return NULL;
   77:     }
   78:   
   79:   new->items->del = (void (*)(void *)) work_queue_item_free;  
   80:   
   81:   listnode_add (&work_queues, new);
   82:   
   83:   new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
   84: 
   85:   /* Default values, can be overriden by caller */
   86:   new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
   87:     
   88:   return new;
   89: }
   90: 
   91: void
   92: work_queue_free (struct work_queue *wq)
   93: {
   94:   if (wq->thread != NULL)
   95:     thread_cancel(wq->thread);
   96:   
   97:   /* list_delete frees items via callback */
   98:   list_delete (wq->items);
   99:   listnode_delete (&work_queues, wq);
  100:   
  101:   XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
  102:   XFREE (MTYPE_WORK_QUEUE, wq);
  103:   return;
  104: }
  105: 
  106: static int
  107: work_queue_schedule (struct work_queue *wq, unsigned int delay)
  108: {
  109:   /* if appropriate, schedule work queue thread */
  110:   if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
  111:        && (wq->thread == NULL)
  112:        && (listcount (wq->items) > 0) )
  113:     {
  114:       wq->thread = thread_add_background (wq->master, work_queue_run, 
  115:                                           wq, delay);
  116:       return 1;
  117:     }
  118:   else
  119:     return 0;
  120: }
  121:   
  122: void
  123: work_queue_add (struct work_queue *wq, void *data)
  124: {
  125:   struct work_queue_item *item;
  126:   
  127:   assert (wq);
  128: 
  129:   if (!(item = work_queue_item_new (wq)))
  130:     {
  131:       zlog_err ("%s: unable to get new queue item", __func__);
  132:       return;
  133:     }
  134:   
  135:   item->data = data;
  136:   listnode_add (wq->items, item);
  137:   
  138:   work_queue_schedule (wq, wq->spec.hold);
  139:   
  140:   return;
  141: }
  142: 
  143: static void
  144: work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
  145: {
  146:   struct work_queue_item *item = listgetdata (ln);
  147: 
  148:   assert (item && item->data);
  149: 
  150:   /* call private data deletion callback if needed */  
  151:   if (wq->spec.del_item_data)
  152:     wq->spec.del_item_data (wq, item->data);
  153: 
  154:   list_delete_node (wq->items, ln);
  155:   work_queue_item_free (item);
  156:   
  157:   return;
  158: }
  159: 
  160: static void
  161: work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
  162: {
  163:   LISTNODE_DETACH (wq->items, ln);
  164:   LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
  165: }
  166: 
  167: DEFUN(show_work_queues,
  168:       show_work_queues_cmd,
  169:       "show work-queues",
  170:       SHOW_STR
  171:       "Work Queue information\n")
  172: {
  173:   struct listnode *node;
  174:   struct work_queue *wq;
  175:   
  176:   vty_out (vty, 
  177:            "%c %8s %5s %8s %21s%s",
  178:            ' ', "List","(ms) ","Q. Runs","Cycle Counts   ",
  179:            VTY_NEWLINE);
  180:   vty_out (vty,
  181:            "%c %8s %5s %8s %7s %6s %6s %s%s",
  182:            'P',
  183:            "Items",
  184:            "Hold",
  185:            "Total",
  186:            "Best","Gran.","Avg.", 
  187:            "Name", 
  188:            VTY_NEWLINE);
  189:  
  190:   for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
  191:     {
  192:       vty_out (vty,"%c %8d %5d %8ld %7d %6d %6u %s%s",
  193:                (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
  194:                listcount (wq->items),
  195:                wq->spec.hold,
  196:                wq->runs,
  197:                wq->cycles.best, wq->cycles.granularity,
  198:                  (wq->runs) ? 
  199:                    (unsigned int) (wq->cycles.total / wq->runs) : 0,
  200:                wq->name,
  201:                VTY_NEWLINE);
  202:     }
  203:     
  204:   return CMD_SUCCESS;
  205: }
  206: 
  207: /* 'plug' a queue: Stop it from being scheduled,
  208:  * ie: prevent the queue from draining.
  209:  */
  210: void
  211: work_queue_plug (struct work_queue *wq)
  212: {
  213:   if (wq->thread)
  214:     thread_cancel (wq->thread);
  215:   
  216:   wq->thread = NULL;
  217:   
  218:   UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
  219: }
  220: 
  221: /* unplug queue, schedule it again, if appropriate
  222:  * Ie: Allow the queue to be drained again
  223:  */
  224: void
  225: work_queue_unplug (struct work_queue *wq)
  226: {
  227:   SET_FLAG (wq->flags, WQ_UNPLUGGED);
  228: 
  229:   /* if thread isnt already waiting, add one */
  230:   work_queue_schedule (wq, wq->spec.hold);
  231: }
  232: 
  233: /* timer thread to process a work queue
  234:  * will reschedule itself if required,
  235:  * otherwise work_queue_item_add 
  236:  */
  237: int
  238: work_queue_run (struct thread *thread)
  239: {
  240:   struct work_queue *wq;
  241:   struct work_queue_item *item;
  242:   wq_item_status ret;
  243:   unsigned int cycles = 0;
  244:   struct listnode *node, *nnode;
  245:   char yielded = 0;
  246: 
  247:   wq = THREAD_ARG (thread);
  248:   wq->thread = NULL;
  249: 
  250:   assert (wq && wq->items);
  251: 
  252:   /* calculate cycle granularity:
  253:    * list iteration == 1 cycle
  254:    * granularity == # cycles between checks whether we should yield.
  255:    *
  256:    * granularity should be > 0, and can increase slowly after each run to
  257:    * provide some hysteris, but not past cycles.best or 2*cycles.
  258:    *
  259:    * Best: starts low, can only increase
  260:    *
  261:    * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased 
  262:    *              if we run to end of time slot, can increase otherwise 
  263:    *              by a small factor.
  264:    *
  265:    * We could use just the average and save some work, however we want to be
  266:    * able to adjust quickly to CPU pressure. Average wont shift much if
  267:    * daemon has been running a long time.
  268:    */
  269:    if (wq->cycles.granularity == 0)
  270:      wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
  271: 
  272:   for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
  273:   {
  274:     assert (item && item->data);
  275:     
  276:     /* dont run items which are past their allowed retries */
  277:     if (item->ran > wq->spec.max_retries)
  278:       {
  279:         /* run error handler, if any */
  280: 	if (wq->spec.errorfunc)
  281: 	  wq->spec.errorfunc (wq, item->data);
  282: 	work_queue_item_remove (wq, node);
  283: 	continue;
  284:       }
  285: 
  286:     /* run and take care of items that want to be retried immediately */
  287:     do
  288:       {
  289:         ret = wq->spec.workfunc (wq, item->data);
  290:         item->ran++;
  291:       }
  292:     while ((ret == WQ_RETRY_NOW) 
  293:            && (item->ran < wq->spec.max_retries));
  294: 
  295:     switch (ret)
  296:       {
  297:       case WQ_QUEUE_BLOCKED:
  298:         {
  299:           /* decrement item->ran again, cause this isn't an item
  300:            * specific error, and fall through to WQ_RETRY_LATER
  301:            */
  302:           item->ran--;
  303:         }
  304:       case WQ_RETRY_LATER:
  305: 	{
  306: 	  goto stats;
  307: 	}
  308:       case WQ_REQUEUE:
  309: 	{
  310: 	  item->ran--;
  311: 	  work_queue_item_requeue (wq, node);
  312: 	  break;
  313: 	}
  314:       case WQ_RETRY_NOW:
  315:         /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
  316:       case WQ_ERROR:
  317: 	{
  318: 	  if (wq->spec.errorfunc)
  319: 	    wq->spec.errorfunc (wq, item);
  320: 	}
  321: 	/* fall through here is deliberate */
  322:       case WQ_SUCCESS:
  323:       default:
  324: 	{
  325: 	  work_queue_item_remove (wq, node);
  326: 	  break;
  327: 	}
  328:       }
  329: 
  330:     /* completed cycle */
  331:     cycles++;
  332: 
  333:     /* test if we should yield */
  334:     if ( !(cycles % wq->cycles.granularity) 
  335:         && thread_should_yield (thread))
  336:       {
  337:         yielded = 1;
  338:         goto stats;
  339:       }
  340:   }
  341: 
  342: stats:
  343: 
  344: #define WQ_HYSTERESIS_FACTOR 4
  345: 
  346:   /* we yielded, check whether granularity should be reduced */
  347:   if (yielded && (cycles < wq->cycles.granularity))
  348:     {
  349:       wq->cycles.granularity = ((cycles > 0) ? cycles 
  350:                                              : WORK_QUEUE_MIN_GRANULARITY);
  351:     }
  352:   /* otherwise, should granularity increase? */
  353:   else if (cycles >= (wq->cycles.granularity))
  354:     {
  355:       if (cycles > wq->cycles.best)
  356:         wq->cycles.best = cycles;
  357:       
  358:       /* along with yielded check, provides hysteresis for granularity */
  359:       if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
  360:                                            * WQ_HYSTERESIS_FACTOR))
  361:         wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
  362:       else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
  363:         wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
  364:     }
  365: #undef WQ_HYSTERIS_FACTOR
  366:   
  367:   wq->runs++;
  368:   wq->cycles.total += cycles;
  369: 
  370: #if 0
  371:   printf ("%s: cycles %d, new: best %d, worst %d\n",
  372:             __func__, cycles, wq->cycles.best, wq->cycles.granularity);
  373: #endif
  374:   
  375:   /* Is the queue done yet? If it is, call the completion callback. */
  376:   if (listcount (wq->items) > 0)
  377:     work_queue_schedule (wq, 0);
  378:   else if (wq->spec.completion_func)
  379:     wq->spec.completion_func (wq);
  380:   
  381:   return 0;
  382: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>