Annotation of embedaddon/quagga/lib/workqueue.c, revision 1.1

1.1     ! misho       1: /* 
        !             2:  * Quagga Work Queue Support.
        !             3:  *
        !             4:  * Copyright (C) 2005 Sun Microsystems, Inc.
        !             5:  *
        !             6:  * This file is part of GNU Zebra.
        !             7:  *
        !             8:  * Quagga is free software; you can redistribute it and/or modify it
        !             9:  * under the terms of the GNU General Public License as published by the
        !            10:  * Free Software Foundation; either version 2, or (at your option) any
        !            11:  * later version.
        !            12:  *
        !            13:  * Quagga is distributed in the hope that it will be useful, but
        !            14:  * WITHOUT ANY WARRANTY; without even the implied warranty of
        !            15:  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
        !            16:  * General Public License for more details.
        !            17:  *
        !            18:  * You should have received a copy of the GNU General Public License
        !            19:  * along with Quagga; see the file COPYING.  If not, write to the Free
        !            20:  * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
        !            21:  * 02111-1307, USA.  
        !            22:  */
        !            23: 
        !            24: #include <lib/zebra.h>
        !            25: #include "thread.h"
        !            26: #include "memory.h"
        !            27: #include "workqueue.h"
        !            28: #include "linklist.h"
        !            29: #include "command.h"
        !            30: #include "log.h"
        !            31: 
/* Master list of work queues: work_queue_new() adds each new queue here
 * and work_queue_free() removes it again.  Walked by the
 * "show work-queues" command.
 */
static struct list work_queues;

/* Initial value of a queue's cycles.granularity, and the value it is
 * reset to whenever it would otherwise be zero.
 */
#define WORK_QUEUE_MIN_GRANULARITY 1
        !            36: 
        !            37: static struct work_queue_item *
        !            38: work_queue_item_new (struct work_queue *wq)
        !            39: {
        !            40:   struct work_queue_item *item;
        !            41:   assert (wq);
        !            42: 
        !            43:   item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, 
        !            44:                   sizeof (struct work_queue_item));
        !            45:   
        !            46:   return item;
        !            47: }
        !            48: 
        !            49: static void
        !            50: work_queue_item_free (struct work_queue_item *item)
        !            51: {
        !            52:   XFREE (MTYPE_WORK_QUEUE_ITEM, item);
        !            53:   return;
        !            54: }
        !            55: 
        !            56: /* create new work queue */
        !            57: struct work_queue *
        !            58: work_queue_new (struct thread_master *m, const char *queue_name)
        !            59: {
        !            60:   struct work_queue *new;
        !            61:   
        !            62:   new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
        !            63: 
        !            64:   if (new == NULL)
        !            65:     return new;
        !            66:   
        !            67:   new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
        !            68:   new->master = m;
        !            69:   SET_FLAG (new->flags, WQ_UNPLUGGED);
        !            70:   
        !            71:   if ( (new->items = list_new ()) == NULL)
        !            72:     {
        !            73:       XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
        !            74:       XFREE (MTYPE_WORK_QUEUE, new);
        !            75:       
        !            76:       return NULL;
        !            77:     }
        !            78:   
        !            79:   new->items->del = (void (*)(void *)) work_queue_item_free;  
        !            80:   
        !            81:   listnode_add (&work_queues, new);
        !            82:   
        !            83:   new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
        !            84: 
        !            85:   /* Default values, can be overriden by caller */
        !            86:   new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
        !            87:     
        !            88:   return new;
        !            89: }
        !            90: 
        !            91: void
        !            92: work_queue_free (struct work_queue *wq)
        !            93: {
        !            94:   if (wq->thread != NULL)
        !            95:     thread_cancel(wq->thread);
        !            96:   
        !            97:   /* list_delete frees items via callback */
        !            98:   list_delete (wq->items);
        !            99:   listnode_delete (&work_queues, wq);
        !           100:   
        !           101:   XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
        !           102:   XFREE (MTYPE_WORK_QUEUE, wq);
        !           103:   return;
        !           104: }
        !           105: 
        !           106: static inline int
        !           107: work_queue_schedule (struct work_queue *wq, unsigned int delay)
        !           108: {
        !           109:   /* if appropriate, schedule work queue thread */
        !           110:   if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
        !           111:        && (wq->thread == NULL)
        !           112:        && (listcount (wq->items) > 0) )
        !           113:     {
        !           114:       wq->thread = thread_add_background (wq->master, work_queue_run, 
        !           115:                                           wq, delay);
        !           116:       return 1;
        !           117:     }
        !           118:   else
        !           119:     return 0;
        !           120: }
        !           121:   
        !           122: void
        !           123: work_queue_add (struct work_queue *wq, void *data)
        !           124: {
        !           125:   struct work_queue_item *item;
        !           126:   
        !           127:   assert (wq);
        !           128: 
        !           129:   if (!(item = work_queue_item_new (wq)))
        !           130:     {
        !           131:       zlog_err ("%s: unable to get new queue item", __func__);
        !           132:       return;
        !           133:     }
        !           134:   
        !           135:   item->data = data;
        !           136:   listnode_add (wq->items, item);
        !           137:   
        !           138:   work_queue_schedule (wq, wq->spec.hold);
        !           139:   
        !           140:   return;
        !           141: }
        !           142: 
        !           143: static void
        !           144: work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
        !           145: {
        !           146:   struct work_queue_item *item = listgetdata (ln);
        !           147: 
        !           148:   assert (item && item->data);
        !           149: 
        !           150:   /* call private data deletion callback if needed */  
        !           151:   if (wq->spec.del_item_data)
        !           152:     wq->spec.del_item_data (wq, item->data);
        !           153: 
        !           154:   list_delete_node (wq->items, ln);
        !           155:   work_queue_item_free (item);
        !           156:   
        !           157:   return;
        !           158: }
        !           159: 
        !           160: static void
        !           161: work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
        !           162: {
        !           163:   LISTNODE_DETACH (wq->items, ln);
        !           164:   LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
        !           165: }
        !           166: 
        !           167: DEFUN(show_work_queues,
        !           168:       show_work_queues_cmd,
        !           169:       "show work-queues",
        !           170:       SHOW_STR
        !           171:       "Work Queue information\n")
        !           172: {
        !           173:   struct listnode *node;
        !           174:   struct work_queue *wq;
        !           175:   
        !           176:   vty_out (vty, 
        !           177:            "%c %8s %5s %8s %21s%s",
        !           178:            ' ', "List","(ms) ","Q. Runs","Cycle Counts   ",
        !           179:            VTY_NEWLINE);
        !           180:   vty_out (vty,
        !           181:            "%c %8s %5s %8s %7s %6s %6s %s%s",
        !           182:            'P',
        !           183:            "Items",
        !           184:            "Hold",
        !           185:            "Total",
        !           186:            "Best","Gran.","Avg.", 
        !           187:            "Name", 
        !           188:            VTY_NEWLINE);
        !           189:  
        !           190:   for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
        !           191:     {
        !           192:       vty_out (vty,"%c %8d %5d %8ld %7d %6d %6u %s%s",
        !           193:                (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
        !           194:                listcount (wq->items),
        !           195:                wq->spec.hold,
        !           196:                wq->runs,
        !           197:                wq->cycles.best, wq->cycles.granularity,
        !           198:                  (wq->runs) ? 
        !           199:                    (unsigned int) (wq->cycles.total / wq->runs) : 0,
        !           200:                wq->name,
        !           201:                VTY_NEWLINE);
        !           202:     }
        !           203:     
        !           204:   return CMD_SUCCESS;
        !           205: }
        !           206: 
        !           207: /* 'plug' a queue: Stop it from being scheduled,
        !           208:  * ie: prevent the queue from draining.
        !           209:  */
        !           210: void
        !           211: work_queue_plug (struct work_queue *wq)
        !           212: {
        !           213:   if (wq->thread)
        !           214:     thread_cancel (wq->thread);
        !           215:   
        !           216:   wq->thread = NULL;
        !           217:   
        !           218:   UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
        !           219: }
        !           220: 
        !           221: /* unplug queue, schedule it again, if appropriate
        !           222:  * Ie: Allow the queue to be drained again
        !           223:  */
        !           224: void
        !           225: work_queue_unplug (struct work_queue *wq)
        !           226: {
        !           227:   SET_FLAG (wq->flags, WQ_UNPLUGGED);
        !           228: 
        !           229:   /* if thread isnt already waiting, add one */
        !           230:   work_queue_schedule (wq, wq->spec.hold);
        !           231: }
        !           232: 
        !           233: /* timer thread to process a work queue
        !           234:  * will reschedule itself if required,
        !           235:  * otherwise work_queue_item_add 
        !           236:  */
        !           237: int
        !           238: work_queue_run (struct thread *thread)
        !           239: {
        !           240:   struct work_queue *wq;
        !           241:   struct work_queue_item *item;
        !           242:   wq_item_status ret;
        !           243:   unsigned int cycles = 0;
        !           244:   struct listnode *node, *nnode;
        !           245:   char yielded = 0;
        !           246: 
        !           247:   wq = THREAD_ARG (thread);
        !           248:   wq->thread = NULL;
        !           249: 
        !           250:   assert (wq && wq->items);
        !           251: 
        !           252:   /* calculate cycle granularity:
        !           253:    * list iteration == 1 cycle
        !           254:    * granularity == # cycles between checks whether we should yield.
        !           255:    *
        !           256:    * granularity should be > 0, and can increase slowly after each run to
        !           257:    * provide some hysteris, but not past cycles.best or 2*cycles.
        !           258:    *
        !           259:    * Best: starts low, can only increase
        !           260:    *
        !           261:    * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased 
        !           262:    *              if we run to end of time slot, can increase otherwise 
        !           263:    *              by a small factor.
        !           264:    *
        !           265:    * We could use just the average and save some work, however we want to be
        !           266:    * able to adjust quickly to CPU pressure. Average wont shift much if
        !           267:    * daemon has been running a long time.
        !           268:    */
        !           269:    if (wq->cycles.granularity == 0)
        !           270:      wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
        !           271: 
        !           272:   for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
        !           273:   {
        !           274:     assert (item && item->data);
        !           275:     
        !           276:     /* dont run items which are past their allowed retries */
        !           277:     if (item->ran > wq->spec.max_retries)
        !           278:       {
        !           279:         /* run error handler, if any */
        !           280:        if (wq->spec.errorfunc)
        !           281:          wq->spec.errorfunc (wq, item->data);
        !           282:        work_queue_item_remove (wq, node);
        !           283:        continue;
        !           284:       }
        !           285: 
        !           286:     /* run and take care of items that want to be retried immediately */
        !           287:     do
        !           288:       {
        !           289:         ret = wq->spec.workfunc (wq, item->data);
        !           290:         item->ran++;
        !           291:       }
        !           292:     while ((ret == WQ_RETRY_NOW) 
        !           293:            && (item->ran < wq->spec.max_retries));
        !           294: 
        !           295:     switch (ret)
        !           296:       {
        !           297:       case WQ_QUEUE_BLOCKED:
        !           298:         {
        !           299:           /* decrement item->ran again, cause this isn't an item
        !           300:            * specific error, and fall through to WQ_RETRY_LATER
        !           301:            */
        !           302:           item->ran--;
        !           303:         }
        !           304:       case WQ_RETRY_LATER:
        !           305:        {
        !           306:          goto stats;
        !           307:        }
        !           308:       case WQ_REQUEUE:
        !           309:        {
        !           310:          item->ran--;
        !           311:          work_queue_item_requeue (wq, node);
        !           312:          break;
        !           313:        }
        !           314:       case WQ_RETRY_NOW:
        !           315:         /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
        !           316:       case WQ_ERROR:
        !           317:        {
        !           318:          if (wq->spec.errorfunc)
        !           319:            wq->spec.errorfunc (wq, item);
        !           320:        }
        !           321:        /* fall through here is deliberate */
        !           322:       case WQ_SUCCESS:
        !           323:       default:
        !           324:        {
        !           325:          work_queue_item_remove (wq, node);
        !           326:          break;
        !           327:        }
        !           328:       }
        !           329: 
        !           330:     /* completed cycle */
        !           331:     cycles++;
        !           332: 
        !           333:     /* test if we should yield */
        !           334:     if ( !(cycles % wq->cycles.granularity) 
        !           335:         && thread_should_yield (thread))
        !           336:       {
        !           337:         yielded = 1;
        !           338:         goto stats;
        !           339:       }
        !           340:   }
        !           341: 
        !           342: stats:
        !           343: 
        !           344: #define WQ_HYSTERESIS_FACTOR 4
        !           345: 
        !           346:   /* we yielded, check whether granularity should be reduced */
        !           347:   if (yielded && (cycles < wq->cycles.granularity))
        !           348:     {
        !           349:       wq->cycles.granularity = ((cycles > 0) ? cycles 
        !           350:                                              : WORK_QUEUE_MIN_GRANULARITY);
        !           351:     }
        !           352:   /* otherwise, should granularity increase? */
        !           353:   else if (cycles >= (wq->cycles.granularity))
        !           354:     {
        !           355:       if (cycles > wq->cycles.best)
        !           356:         wq->cycles.best = cycles;
        !           357:       
        !           358:       /* along with yielded check, provides hysteresis for granularity */
        !           359:       if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
        !           360:                                            * WQ_HYSTERESIS_FACTOR))
        !           361:         wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
        !           362:       else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
        !           363:         wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
        !           364:     }
        !           365: #undef WQ_HYSTERIS_FACTOR
        !           366:   
        !           367:   wq->runs++;
        !           368:   wq->cycles.total += cycles;
        !           369: 
        !           370: #if 0
        !           371:   printf ("%s: cycles %d, new: best %d, worst %d\n",
        !           372:             __func__, cycles, wq->cycles.best, wq->cycles.granularity);
        !           373: #endif
        !           374:   
        !           375:   /* Is the queue done yet? If it is, call the completion callback. */
        !           376:   if (listcount (wq->items) > 0)
        !           377:     work_queue_schedule (wq, 0);
        !           378:   else if (wq->spec.completion_func)
        !           379:     wq->spec.completion_func (wq);
        !           380:   
        !           381:   return 0;
        !           382: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>