File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / quagga / lib / workqueue.c
Revision 1.1.1.3 (vendor branch)
Wed Nov 2 10:09:11 2016 UTC by misho
Branches: quagga, MAIN
CVS tags: v1_0_20160315, HEAD
quagga 1.0.20160315

/*
 * Quagga Work Queue Support.
 *
 * Copyright (C) 2005 Sun Microsystems, Inc.
 *
 * This file is part of GNU Zebra.
 *
 * Quagga is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * Quagga is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Quagga; see the file COPYING.  If not, write to the Free
 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
 * 02111-1307, USA.
 */

#include <zebra.h>
#include "thread.h"
#include "memory.h"
#include "workqueue.h"
#include "linklist.h"
#include "command.h"
#include "log.h"

/* master list of work_queues */
static struct list _work_queues;
/* pointer primarily to avoid an otherwise harmless warning on
 * ALL_LIST_ELEMENTS_RO
 */
static struct list *work_queues = &_work_queues;

#define WORK_QUEUE_MIN_GRANULARITY 1

static struct work_queue_item *
work_queue_item_new (struct work_queue *wq)
{
  struct work_queue_item *item;
  assert (wq);

  item = XCALLOC (MTYPE_WORK_QUEUE_ITEM,
                  sizeof (struct work_queue_item));

  return item;
}

static void
work_queue_item_free (struct work_queue_item *item)
{
  XFREE (MTYPE_WORK_QUEUE_ITEM, item);
  return;
}

/* create new work queue */
struct work_queue *
work_queue_new (struct thread_master *m, const char *queue_name)
{
  struct work_queue *new;

  new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));

  if (new == NULL)
    return new;

  new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
  new->master = m;
  SET_FLAG (new->flags, WQ_UNPLUGGED);

  if ( (new->items = list_new ()) == NULL)
    {
      XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
      XFREE (MTYPE_WORK_QUEUE, new);

      return NULL;
    }

  new->items->del = (void (*)(void *)) work_queue_item_free;

  listnode_add (work_queues, new);

  new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;

  /* Default values, can be overridden by caller */
  new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;

  return new;
}
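
/* Illustrative only: a minimal sketch of how a daemon might create and
 * configure a queue with work_queue_new().  The names bgp_queue,
 * bgp_queue_init, bgp_process_wq and bgp_process_del are hypothetical,
 * not part of this file.
 */
#if 0
static struct work_queue *bgp_queue;

static void
bgp_queue_init (struct thread_master *master)
{
  bgp_queue = work_queue_new (master, "example process queue");
  assert (bgp_queue);

  /* The caller fills in the spec after creation; workfunc is the one
   * callback needed for useful operation, the rest are optional hooks. */
  bgp_queue->spec.workfunc = &bgp_process_wq;       /* per-item work */
  bgp_queue->spec.del_item_data = &bgp_process_del; /* frees item->data */
  bgp_queue->spec.max_retries = 0;
  bgp_queue->spec.hold = 50;                        /* hold time, in ms */
}
#endif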

void
work_queue_free (struct work_queue *wq)
{
  if (wq->thread != NULL)
    thread_cancel(wq->thread);

  /* list_delete frees items via callback */
  list_delete (wq->items);
  listnode_delete (work_queues, wq);

  XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
  XFREE (MTYPE_WORK_QUEUE, wq);
  return;
}

bool
work_queue_is_scheduled (struct work_queue *wq)
{
  return (wq->thread != NULL);
}

static int
work_queue_schedule (struct work_queue *wq, unsigned int delay)
{
  /* if appropriate, schedule work queue thread */
  if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
       && (wq->thread == NULL)
       && (listcount (wq->items) > 0) )
    {
      wq->thread = thread_add_background (wq->master, work_queue_run,
                                          wq, delay);
      return 1;
    }
  else
    return 0;
}

void
work_queue_add (struct work_queue *wq, void *data)
{
  struct work_queue_item *item;

  assert (wq);

  if (!(item = work_queue_item_new (wq)))
    {
      zlog_err ("%s: unable to get new queue item", __func__);
      return;
    }

  item->data = data;
  listnode_add (wq->items, item);

  work_queue_schedule (wq, wq->spec.hold);

  return;
}
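
/* Illustrative only: a minimal sketch of the caller side, assuming the
 * hypothetical bgp_queue from the sketch above.  The workfunc receives
 * each item's opaque data pointer and reports, via wq_item_status, what
 * should happen to the item; struct bgp_node is a hypothetical payload.
 */
#if 0
static wq_item_status
bgp_process_wq (struct work_queue *wq, void *data)
{
  struct bgp_node *rn = data;

  if (/* resource temporarily unavailable */ 0)
    return WQ_RETRY_LATER;  /* end this run, keep the item queued */

  /* ... do one unit of work on rn ... */
  return WQ_SUCCESS;        /* item is removed and freed */
}

static void
bgp_enqueue (struct bgp_node *rn)
{
  /* Enqueueing hands ownership of the data pointer to the queue;
   * spec.del_item_data is called on it when the item is removed. */
  work_queue_add (bgp_queue, rn);
}
#endif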

static void
work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
{
  struct work_queue_item *item = listgetdata (ln);

  assert (item && item->data);

  /* call private data deletion callback if needed */
  if (wq->spec.del_item_data)
    wq->spec.del_item_data (wq, item->data);

  list_delete_node (wq->items, ln);
  work_queue_item_free (item);

  return;
}

static void
work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
{
  LISTNODE_DETACH (wq->items, ln);
  LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
}

DEFUN(show_work_queues,
      show_work_queues_cmd,
      "show work-queues",
      SHOW_STR
      "Work Queue information\n")
{
  struct listnode *node;
  struct work_queue *wq;

  vty_out (vty,
           "%c %8s %5s %8s %21s%s",
           ' ', "List","(ms) ","Q. Runs","Cycle Counts   ",
           VTY_NEWLINE);
  vty_out (vty,
           "%c %8s %5s %8s %7s %6s %6s %s%s",
           'P',
           "Items",
           "Hold",
           "Total",
           "Best","Gran.","Avg.",
           "Name",
           VTY_NEWLINE);

  for (ALL_LIST_ELEMENTS_RO (work_queues, node, wq))
    {
      /* listcount, hold, best, granularity and the cycle average are
       * unsigned, runs is unsigned long; use matching conversions. */
      vty_out (vty,"%c %8u %5u %8lu %7u %6u %6u %s%s",
               (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
               listcount (wq->items),
               wq->spec.hold,
               wq->runs,
               wq->cycles.best, wq->cycles.granularity,
               (wq->runs) ?
                 (unsigned int) (wq->cycles.total / wq->runs) : 0,
               wq->name,
               VTY_NEWLINE);
    }

  return CMD_SUCCESS;
}
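
/* Roughly, the command produces output laid out like this (the data row
 * is made up for illustration; 'P' in column one marks a plugged queue):
 *
 *       List (ms)   Q. Runs        Cycle Counts
 *  P   Items  Hold    Total    Best  Gran.   Avg. Name
 *          0    50      127      14      8      6 example process queue
 */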

/* 'plug' a queue: Stop it from being scheduled,
 * ie: prevent the queue from draining.
 */
void
work_queue_plug (struct work_queue *wq)
{
  if (wq->thread)
    thread_cancel (wq->thread);

  wq->thread = NULL;

  UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
}

/* unplug queue, schedule it again, if appropriate
 * Ie: Allow the queue to be drained again
 */
void
work_queue_unplug (struct work_queue *wq)
{
  SET_FLAG (wq->flags, WQ_UNPLUGGED);

  /* if thread isn't already waiting, add one */
  work_queue_schedule (wq, wq->spec.hold);
}
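
/* Illustrative only: plug/unplug lets a caller batch up items without the
 * queue starting to drain in between.  bgp_queue and struct bgp_node are
 * the hypothetical names from the sketches above.
 */
#if 0
static void
bgp_enqueue_batch (struct bgp_node **nodes, unsigned int n)
{
  unsigned int i;

  work_queue_plug (bgp_queue);          /* hold the queue */
  for (i = 0; i < n; i++)
    work_queue_add (bgp_queue, nodes[i]);
  work_queue_unplug (bgp_queue);        /* now let it drain */
}
#endif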

/* timer thread to process a work queue
 * will reschedule itself if required,
 * otherwise the next work_queue_add () will schedule it again
 */
int
work_queue_run (struct thread *thread)
{
  struct work_queue *wq;
  struct work_queue_item *item;
  wq_item_status ret;
  unsigned int cycles = 0;
  struct listnode *node, *nnode;
  char yielded = 0;

  wq = THREAD_ARG (thread);
  assert (wq && wq->items);

  wq->thread = NULL;

  /* calculate cycle granularity:
   * list iteration == 1 cycle
   * granularity == # cycles between checks whether we should yield.
   *
   * granularity should be > 0, and can increase slowly after each run to
   * provide some hysteresis, but not past cycles.best or 2*cycles.
   *
   * Best: starts low, can only increase
   *
   * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
   *              if we run to end of time slot, can increase otherwise
   *              by a small factor.
   *
   * We could use just the average and save some work, however we want to
   * be able to adjust quickly to CPU pressure. The average won't shift
   * much if the daemon has been running a long time.
   */
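  /* Worked example of the adjustment done after the 'stats' label below
   * (WQ_HYSTERESIS_FACTOR is 4): with granularity 1, a run that completes
   * 20 cycles without yielding exceeds 1 * 4 * 4, so granularity ramps up
   * to 4; conversely, a run that yields after only 3 cycles while
   * granularity is 8 pulls granularity back down to 3.
   */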
  if (wq->cycles.granularity == 0)
    wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;

  for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
  {
    assert (item && item->data);

    /* don't run items which are past their allowed retries */
    if (item->ran > wq->spec.max_retries)
      {
        /* run error handler, if any */
        if (wq->spec.errorfunc)
          wq->spec.errorfunc (wq, item->data);
        work_queue_item_remove (wq, node);
        continue;
      }

    /* run and take care of items that want to be retried immediately */
    do
      {
        ret = wq->spec.workfunc (wq, item->data);
        item->ran++;
      }
    while ((ret == WQ_RETRY_NOW)
           && (item->ran < wq->spec.max_retries));

    switch (ret)
      {
      case WQ_QUEUE_BLOCKED:
        {
          /* decrement item->ran again, cause this isn't an item
           * specific error, and fall through to WQ_RETRY_LATER
           */
          item->ran--;
        }
      case WQ_RETRY_LATER:
        {
          goto stats;
        }
      case WQ_REQUEUE:
        {
          item->ran--;
          work_queue_item_requeue (wq, node);
          break;
        }
      case WQ_RETRY_NOW:
        /* a RETRY_NOW that gets here has exceeded max_retries, same as ERROR */
      case WQ_ERROR:
        {
          if (wq->spec.errorfunc)
            wq->spec.errorfunc (wq, item->data);
        }
        /* fall through here is deliberate */
      case WQ_SUCCESS:
      default:
        {
          work_queue_item_remove (wq, node);
          break;
        }
      }

    /* completed cycle */
    cycles++;

    /* test if we should yield */
    if ( !(cycles % wq->cycles.granularity)
        && thread_should_yield (thread))
      {
        yielded = 1;
        goto stats;
      }
  }

stats:

#define WQ_HYSTERESIS_FACTOR 4

  /* we yielded, check whether granularity should be reduced */
  if (yielded && (cycles < wq->cycles.granularity))
    {
      wq->cycles.granularity = ((cycles > 0) ? cycles
                                             : WORK_QUEUE_MIN_GRANULARITY);
    }
  /* otherwise, should granularity increase? */
  else if (cycles >= (wq->cycles.granularity))
    {
      if (cycles > wq->cycles.best)
        wq->cycles.best = cycles;

      /* along with yielded check, provides hysteresis for granularity */
      if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
                                           * WQ_HYSTERESIS_FACTOR))
        wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
      else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
        wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
    }
#undef WQ_HYSTERESIS_FACTOR

  wq->runs++;
  wq->cycles.total += cycles;

#if 0
  printf ("%s: cycles %d, new: best %d, worst %d\n",
            __func__, cycles, wq->cycles.best, wq->cycles.granularity);
#endif

  /* Is the queue done yet? If it is, call the completion callback. */
  if (listcount (wq->items) > 0)
    work_queue_schedule (wq, 0);
  else if (wq->spec.completion_func)
    wq->spec.completion_func (wq);

  return 0;
}
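
/* Summary of how work_queue_run treats each wq_item_status returned by
 * spec.workfunc (derived from the switch above):
 *
 *   WQ_SUCCESS       item is removed (item->data freed via
 *                    spec.del_item_data, if set)
 *   WQ_ERROR         spec.errorfunc is called, then the item is removed
 *   WQ_RETRY_NOW     retried immediately up to max_retries, then treated
 *                    as WQ_ERROR
 *   WQ_RETRY_LATER   the run ends; the item stays at the head of the queue
 *   WQ_REQUEUE       item is moved to the tail of the queue
 *   WQ_QUEUE_BLOCKED as WQ_RETRY_LATER, but doesn't count against the
 *                    item's retries
 */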
