Annotation of embedaddon/strongswan/src/libstrongswan/processing/scheduler.c, revision 1.1.1.2

1.1       misho       1: /*
                      2:  * Copyright (C) 2008-2015 Tobias Brunner
                      3:  * Copyright (C) 2005-2006 Martin Willi
                      4:  * Copyright (C) 2005 Jan Hutter
                      5:  * HSR Hochschule fuer Technik Rapperswil
                      6:  *
                      7:  * This program is free software; you can redistribute it and/or modify it
                      8:  * under the terms of the GNU General Public License as published by the
                      9:  * Free Software Foundation; either version 2 of the License, or (at your
                     10:  * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
                     11:  *
                     12:  * This program is distributed in the hope that it will be useful, but
                     13:  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
                     14:  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
                     15:  * for more details.
                     16:  */
                     17: 
                     18: #include <stdlib.h>
                     19: 
                     20: #include "scheduler.h"
                     21: 
                     22: #include <utils/debug.h>
                     23: #include <processing/processor.h>
                     24: #include <processing/jobs/callback_job.h>
                     25: #include <threading/thread.h>
                     26: #include <threading/condvar.h>
                     27: #include <threading/mutex.h>
                     28: 
                     29: /* the initial size of the heap */
                     30: #define HEAP_SIZE_DEFAULT 64
                     31: 
                     32: typedef struct event_t event_t;
                     33: 
                     34: /**
                     35:  * Event containing a job and a schedule time
                     36:  */
                     37: struct event_t {
                     38:        /**
                     39:         * Time to fire the event.
                     40:         */
                     41:        timeval_t time;
                     42: 
                     43:        /**
                     44:         * Every event has its assigned job.
                     45:         */
                     46:        job_t *job;
                     47: };
                     48: 
                     49: /**
                     50:  * destroy an event and its job
                     51:  */
                     52: static void event_destroy(event_t *event)
                     53: {
                     54:        event->job->destroy(event->job);
                     55:        free(event);
                     56: }
                     57: 
                     58: typedef struct private_scheduler_t private_scheduler_t;
                     59: 
                     60: /**
                     61:  * Private data of a scheduler_t object.
                     62:  */
                     63: struct private_scheduler_t {
                     64: 
                     65:        /**
                     66:         * Public part of a scheduler_t object.
                     67:         */
                     68:         scheduler_t public;
                     69: 
                     70:        /**
                     71:         * The heap in which the events are stored.
                     72:         */
                     73:        event_t **heap;
                     74: 
                     75:        /**
                     76:         * The size of the heap.
                     77:         */
                     78:        u_int heap_size;
                     79: 
                     80:        /**
                     81:         * The number of scheduled events.
                     82:         */
                     83:        u_int event_count;
                     84: 
                     85:        /**
                     86:         * Exclusive access to list
                     87:         */
                     88:        mutex_t *mutex;
                     89: 
                     90:        /**
                     91:         * Condvar to wait for next job.
                     92:         */
                     93:        condvar_t *condvar;
                     94: };
                     95: 
                     96: /**
                     97:  * Returns the top event without removing it. Returns NULL if the heap is empty.
                     98:  */
                     99: static event_t *peek_event(private_scheduler_t *this)
                    100: {
                    101:        return this->event_count > 0 ? this->heap[1] : NULL;
                    102: }
                    103: 
                    104: /**
                    105:  * Removes the top event from the heap and returns it. Returns NULL if the heap
                    106:  * is empty.
                    107:  */
                    108: static event_t *remove_event(private_scheduler_t *this)
                    109: {
                    110:        event_t *event, *top;
1.1.1.2 ! misho     111: 
1.1       misho     112:        if (!this->event_count)
                    113:        {
                    114:                return NULL;
                    115:        }
                    116: 
                    117:        /* store the value to return */
                    118:        event = this->heap[1];
                    119:        /* move the bottom event to the top */
                    120:        top = this->heap[1] = this->heap[this->event_count];
                    121: 
                    122:        if (--this->event_count > 1)
                    123:        {
                    124:                u_int position = 1;
1.1.1.2 ! misho     125: 
        !           126:                /* seep down the top event */
1.1       misho     127:                while ((position << 1) <= this->event_count)
                    128:                {
                    129:                        u_int child = position << 1;
                    130: 
                    131:                        if ((child + 1) <= this->event_count &&
1.1.1.2 ! misho     132:                                timercmp(&this->heap[child + 1]->time,
        !           133:                                                 &this->heap[child]->time, <))
1.1       misho     134:                        {
                    135:                                /* the "right" child is smaller */
                    136:                                child++;
                    137:                        }
                    138: 
1.1.1.2 ! misho     139:                        if (!timercmp(&top->time, &this->heap[child]->time, >))
1.1       misho     140:                        {
                    141:                                /* the top event fires before the smaller of the two children,
                    142:                                 * stop */
                    143:                                break;
                    144:                        }
                    145: 
                    146:                        /* swap with the smaller child */
                    147:                        this->heap[position] = this->heap[child];
                    148:                        position = child;
                    149:                }
                    150:                this->heap[position] = top;
                    151:        }
                    152:        return event;
                    153: }
                    154: 
                    155: /**
                    156:  * Get events from the queue and pass it to the processor
                    157:  */
                    158: static job_requeue_t schedule(private_scheduler_t * this)
                    159: {
                    160:        timeval_t now;
                    161:        event_t *event;
                    162:        bool timed = FALSE, oldstate;
                    163: 
                    164:        this->mutex->lock(this->mutex);
                    165: 
                    166:        time_monotonic(&now);
                    167: 
                    168:        if ((event = peek_event(this)) != NULL)
                    169:        {
1.1.1.2 ! misho     170:                if (!timercmp(&now, &event->time, <))
1.1       misho     171:                {
                    172:                        remove_event(this);
                    173:                        this->mutex->unlock(this->mutex);
                    174:                        DBG2(DBG_JOB, "got event, queuing job for execution");
                    175:                        lib->processor->queue_job(lib->processor, event->job);
                    176:                        free(event);
                    177:                        return JOB_REQUEUE_DIRECT;
                    178:                }
                    179:                timersub(&event->time, &now, &now);
                    180:                if (now.tv_sec)
                    181:                {
                    182:                        DBG2(DBG_JOB, "next event in %ds %dms, waiting",
                    183:                                 now.tv_sec, now.tv_usec/1000);
                    184:                }
                    185:                else
                    186:                {
                    187:                        DBG2(DBG_JOB, "next event in %dms, waiting", now.tv_usec/1000);
                    188:                }
                    189:                timed = TRUE;
                    190:        }
                    191:        thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
                    192:        oldstate = thread_cancelability(TRUE);
                    193: 
                    194:        if (timed)
                    195:        {
                    196:                this->condvar->timed_wait_abs(this->condvar, this->mutex, event->time);
                    197:        }
                    198:        else
                    199:        {
                    200:                DBG2(DBG_JOB, "no events, waiting");
                    201:                this->condvar->wait(this->condvar, this->mutex);
                    202:        }
                    203:        thread_cancelability(oldstate);
                    204:        thread_cleanup_pop(TRUE);
                    205:        return JOB_REQUEUE_DIRECT;
                    206: }
                    207: 
                    208: METHOD(scheduler_t, get_job_load, u_int,
                    209:        private_scheduler_t *this)
                    210: {
                    211:        int count;
1.1.1.2 ! misho     212: 
1.1       misho     213:        this->mutex->lock(this->mutex);
                    214:        count = this->event_count;
                    215:        this->mutex->unlock(this->mutex);
                    216:        return count;
                    217: }
                    218: 
                    219: METHOD(scheduler_t, schedule_job_tv, void,
                    220:        private_scheduler_t *this, job_t *job, timeval_t tv)
                    221: {
                    222:        event_t *event;
                    223:        u_int position;
                    224: 
                    225:        event = malloc_thing(event_t);
                    226:        event->job = job;
                    227:        event->job->status = JOB_STATUS_QUEUED;
                    228:        event->time = tv;
                    229: 
                    230:        this->mutex->lock(this->mutex);
                    231: 
                    232:        this->event_count++;
                    233:        if (this->event_count > this->heap_size)
                    234:        {
                    235:                /* double the size of the heap */
                    236:                this->heap_size <<= 1;
                    237:                this->heap = (event_t**)realloc(this->heap,
                    238:                                                                        (this->heap_size + 1) * sizeof(event_t*));
                    239:        }
                    240:        /* "put" the event to the bottom */
                    241:        position = this->event_count;
                    242: 
                    243:        /* then bubble it up */
1.1.1.2 ! misho     244:        while (position > 1 &&
        !           245:                   timercmp(&this->heap[position >> 1]->time, &event->time, >))
1.1       misho     246:        {
                    247:                /* parent has to be fired after the new event, move up */
                    248:                this->heap[position] = this->heap[position >> 1];
                    249:                position >>= 1;
                    250:        }
                    251:        this->heap[position] = event;
                    252: 
                    253:        this->condvar->signal(this->condvar);
                    254:        this->mutex->unlock(this->mutex);
                    255: }
                    256: 
                    257: METHOD(scheduler_t, schedule_job, void,
                    258:        private_scheduler_t *this, job_t *job, uint32_t s)
                    259: {
                    260:        timeval_t tv;
                    261: 
                    262:        time_monotonic(&tv);
                    263:        tv.tv_sec += s;
                    264: 
                    265:        schedule_job_tv(this, job, tv);
                    266: }
                    267: 
                    268: METHOD(scheduler_t, schedule_job_ms, void,
                    269:        private_scheduler_t *this, job_t *job, uint32_t ms)
                    270: {
                    271:        timeval_t tv, add;
                    272: 
                    273:        time_monotonic(&tv);
                    274:        add.tv_sec = ms / 1000;
                    275:        add.tv_usec = (ms % 1000) * 1000;
                    276: 
                    277:        timeradd(&tv, &add, &tv);
                    278: 
                    279:        schedule_job_tv(this, job, tv);
                    280: }
                    281: 
                    282: METHOD(scheduler_t, flush, void,
                    283:        private_scheduler_t *this)
                    284: {
                    285:        event_t *event;
                    286: 
                    287:        this->mutex->lock(this->mutex);
                    288:        while ((event = remove_event(this)) != NULL)
                    289:        {
                    290:                event_destroy(event);
                    291:        }
                    292:        this->condvar->signal(this->condvar);
                    293:        this->mutex->unlock(this->mutex);
                    294: }
                    295: 
                    296: METHOD(scheduler_t, destroy, void,
                    297:        private_scheduler_t *this)
                    298: {
                    299:        flush(this);
                    300:        this->condvar->destroy(this->condvar);
                    301:        this->mutex->destroy(this->mutex);
                    302:        free(this->heap);
                    303:        free(this);
                    304: }
                    305: 
                    306: /*
                    307:  * Described in header.
                    308:  */
                    309: scheduler_t * scheduler_create()
                    310: {
                    311:        private_scheduler_t *this;
                    312:        callback_job_t *job;
                    313: 
                    314:        INIT(this,
                    315:                .public = {
                    316:                        .get_job_load = _get_job_load,
                    317:                        .schedule_job = _schedule_job,
                    318:                        .schedule_job_ms = _schedule_job_ms,
                    319:                        .schedule_job_tv = _schedule_job_tv,
                    320:                        .flush = _flush,
                    321:                        .destroy = _destroy,
                    322:                },
                    323:                .heap_size = HEAP_SIZE_DEFAULT,
                    324:                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
                    325:                .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
                    326:        );
                    327: 
                    328:        this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
                    329: 
                    330:        job = callback_job_create_with_prio((callback_job_cb_t)schedule, this,
                    331:                                                                                NULL, return_false, JOB_PRIO_CRITICAL);
                    332:        lib->processor->queue_job(lib->processor, (job_t*)job);
                    333: 
                    334:        return &this->public;
                    335: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>