Annotation of embedaddon/strongswan/src/libstrongswan/processing/processor.c, revision 1.1.1.1

1.1       misho       1: /*
                      2:  * Copyright (C) 2005-2011 Martin Willi
                      3:  * Copyright (C) 2011 revosec AG
                      4:  * Copyright (C) 2008-2013 Tobias Brunner
                      5:  * Copyright (C) 2005 Jan Hutter
                      6:  * HSR Hochschule fuer Technik Rapperswil
                      7:  *
                      8:  * This program is free software; you can redistribute it and/or modify it
                      9:  * under the terms of the GNU General Public License as published by the
                     10:  * Free Software Foundation; either version 2 of the License, or (at your
                     11:  * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
                     12:  *
                     13:  * This program is distributed in the hope that it will be useful, but
                     14:  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
                     15:  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
                     16:  * for more details.
                     17:  */
                     18: 
                     19: #include <stdlib.h>
                     20: #include <string.h>
                     21: #include <errno.h>
                     22: 
                     23: #include "processor.h"
                     24: 
                     25: #include <utils/debug.h>
                     26: #include <threading/thread.h>
                     27: #include <threading/condvar.h>
                     28: #include <threading/mutex.h>
                     29: #include <threading/thread_value.h>
                     30: #include <collections/linked_list.h>
                     31: 
                     32: typedef struct private_processor_t private_processor_t;
                     33: 
                     34: /**
                     35:  * Private data of processor_t class.
                     36:  */
                     37: struct private_processor_t {
                     38: 
                     39:        /**
                     40:         * Public processor_t interface.
                     41:         */
                     42:        processor_t public;
                     43: 
                     44:        /**
                     45:         * Number of running threads
                     46:         */
                     47:        u_int total_threads;
                     48: 
                     49:        /**
                     50:         * Desired number of threads
                     51:         */
                     52:        u_int desired_threads;
                     53: 
                     54:        /**
                     55:         * Number of threads currently working, for each priority
                     56:         */
                     57:        u_int working_threads[JOB_PRIO_MAX];
                     58: 
                     59:        /**
                     60:         * All threads managed in the pool (including threads that have been
                     61:         * canceled, this allows to join them later), as worker_thread_t
                     62:         */
                     63:        linked_list_t *threads;
                     64: 
                     65:        /**
                     66:         * A list of queued jobs for each priority
                     67:         */
                     68:        linked_list_t *jobs[JOB_PRIO_MAX];
                     69: 
                     70:        /**
                     71:         * Threads reserved for each priority
                     72:         */
                     73:        int prio_threads[JOB_PRIO_MAX];
                     74: 
                     75:        /**
                     76:         * access to job lists is locked through this mutex
                     77:         */
                     78:        mutex_t *mutex;
                     79: 
                     80:        /**
                     81:         * Condvar to wait for new jobs
                     82:         */
                     83:        condvar_t *job_added;
                     84: 
                     85:        /**
                     86:         * Condvar to wait for terminated threads
                     87:         */
                     88:        condvar_t *thread_terminated;
                     89: };
                     90: 
                     91: /**
                     92:  * Worker thread
                     93:  */
                     94: typedef struct {
                     95: 
                     96:        /**
                     97:         * Reference to the processor
                     98:         */
                     99:        private_processor_t *processor;
                    100: 
                    101:        /**
                    102:         * The actual thread
                    103:         */
                    104:        thread_t *thread;
                    105: 
                    106:        /**
                    107:         * Job currently being executed by this worker thread
                    108:         */
                    109:        job_t *job;
                    110: 
                    111:        /**
                    112:         * Priority of the current job
                    113:         */
                    114:        job_priority_t priority;
                    115: 
                    116: } worker_thread_t;
                    117: 
                    118: static void process_jobs(worker_thread_t *worker);
                    119: 
                    120: /**
                    121:  * restart a terminated thread
                    122:  */
                    123: static void restart(worker_thread_t *worker)
                    124: {
                    125:        private_processor_t *this = worker->processor;
                    126:        job_t *job;
                    127: 
                    128:        DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
                    129: 
                    130:        this->mutex->lock(this->mutex);
                    131:        /* cleanup worker thread  */
                    132:        this->working_threads[worker->priority]--;
                    133:        worker->job->status = JOB_STATUS_CANCELED;
                    134:        job = worker->job;
                    135:        /* unset the job before releasing the mutex, otherwise cancel() might
                    136:         * interfere */
                    137:        worker->job = NULL;
                    138:        /* release mutex to avoid deadlocks if the same lock is required
                    139:         * during queue_job() and in the destructor called here */
                    140:        this->mutex->unlock(this->mutex);
                    141:        job->destroy(job);
                    142:        this->mutex->lock(this->mutex);
                    143: 
                    144:        /* respawn thread if required */
                    145:        if (this->desired_threads >= this->total_threads)
                    146:        {
                    147:                worker_thread_t *new_worker;
                    148: 
                    149:                INIT(new_worker,
                    150:                        .processor = this,
                    151:                );
                    152:                new_worker->thread = thread_create((thread_main_t)process_jobs,
                    153:                                                                                   new_worker);
                    154:                if (new_worker->thread)
                    155:                {
                    156:                        this->threads->insert_last(this->threads, new_worker);
                    157:                        this->mutex->unlock(this->mutex);
                    158:                        return;
                    159:                }
                    160:                free(new_worker);
                    161:        }
                    162:        this->total_threads--;
                    163:        this->thread_terminated->signal(this->thread_terminated);
                    164:        this->mutex->unlock(this->mutex);
                    165: }
                    166: 
                    167: /**
                    168:  * Get number of idle threads, non-locking variant
                    169:  */
                    170: static u_int get_idle_threads_nolock(private_processor_t *this)
                    171: {
                    172:        u_int count, i;
                    173: 
                    174:        count = this->total_threads;
                    175:        for (i = 0; i < JOB_PRIO_MAX; i++)
                    176:        {
                    177:                count -= this->working_threads[i];
                    178:        }
                    179:        return count;
                    180: }
                    181: 
                    182: /**
                    183:  * Get a job from any job queue, starting with the highest priority.
                    184:  *
                    185:  * this->mutex is expected to be locked.
                    186:  */
                    187: static bool get_job(private_processor_t *this, worker_thread_t *worker)
                    188: {
                    189:        int i, reserved = 0, idle;
                    190: 
                    191:        idle = get_idle_threads_nolock(this);
                    192: 
                    193:        for (i = 0; i < JOB_PRIO_MAX; i++)
                    194:        {
                    195:                if (reserved && reserved >= idle)
                    196:                {
                    197:                        DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
                    198:                                 "but %d reserved for higher priorities",
                    199:                                 job_priority_names, i, idle, reserved);
                    200:                        /* wait until a job of higher priority gets queued */
                    201:                        return FALSE;
                    202:                }
                    203:                if (this->working_threads[i] < this->prio_threads[i])
                    204:                {
                    205:                        reserved += this->prio_threads[i] - this->working_threads[i];
                    206:                }
                    207:                if (this->jobs[i]->remove_first(this->jobs[i],
                    208:                                                                                (void**)&worker->job) == SUCCESS)
                    209:                {
                    210:                        worker->priority = i;
                    211:                        return TRUE;
                    212:                }
                    213:        }
                    214:        return FALSE;
                    215: }
                    216: 
                    217: /**
                    218:  * Process a single job (provided in worker->job, worker->priority is also
                    219:  * expected to be set)
                    220:  *
                    221:  * this->mutex is expected to be locked.
                    222:  */
                    223: static void process_job(private_processor_t *this, worker_thread_t *worker)
                    224: {
                    225:        job_t *to_destroy = NULL;
                    226:        job_requeue_t requeue;
                    227: 
                    228:        this->working_threads[worker->priority]++;
                    229:        worker->job->status = JOB_STATUS_EXECUTING;
                    230:        this->mutex->unlock(this->mutex);
                    231:        /* canceled threads are restarted to get a constant pool */
                    232:        thread_cleanup_push((thread_cleanup_t)restart, worker);
                    233:        while (TRUE)
                    234:        {
                    235:                requeue = worker->job->execute(worker->job);
                    236:                if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
                    237:                {
                    238:                        break;
                    239:                }
                    240:                else if (!worker->job->cancel)
                    241:                {       /* only allow cancelable jobs to requeue directly */
                    242:                        requeue.type = JOB_REQUEUE_TYPE_FAIR;
                    243:                        break;
                    244:                }
                    245:        }
                    246:        thread_cleanup_pop(FALSE);
                    247:        this->mutex->lock(this->mutex);
                    248:        this->working_threads[worker->priority]--;
                    249:        if (worker->job->status == JOB_STATUS_CANCELED)
                    250:        {       /* job was canceled via a custom cancel() method or did not
                    251:                 * use JOB_REQUEUE_TYPE_DIRECT */
                    252:                to_destroy = worker->job;
                    253:        }
                    254:        else
                    255:        {
                    256:                switch (requeue.type)
                    257:                {
                    258:                        case JOB_REQUEUE_TYPE_NONE:
                    259:                                worker->job->status = JOB_STATUS_DONE;
                    260:                                to_destroy = worker->job;
                    261:                                break;
                    262:                        case JOB_REQUEUE_TYPE_FAIR:
                    263:                                worker->job->status = JOB_STATUS_QUEUED;
                    264:                                this->jobs[worker->priority]->insert_last(
                    265:                                                                        this->jobs[worker->priority], worker->job);
                    266:                                this->job_added->signal(this->job_added);
                    267:                                break;
                    268:                        case JOB_REQUEUE_TYPE_SCHEDULE:
                    269:                                /* scheduler_t does not hold its lock when queuing jobs
                    270:                                 * so this should be safe without unlocking our mutex */
                    271:                                switch (requeue.schedule)
                    272:                                {
                    273:                                        case JOB_SCHEDULE:
                    274:                                                lib->scheduler->schedule_job(lib->scheduler,
                    275:                                                                                                worker->job, requeue.time.rel);
                    276:                                                break;
                    277:                                        case JOB_SCHEDULE_MS:
                    278:                                                lib->scheduler->schedule_job_ms(lib->scheduler,
                    279:                                                                                                worker->job, requeue.time.rel);
                    280:                                                break;
                    281:                                        case JOB_SCHEDULE_TV:
                    282:                                                lib->scheduler->schedule_job_tv(lib->scheduler,
                    283:                                                                                                worker->job, requeue.time.abs);
                    284:                                                break;
                    285:                                }
                    286:                                break;
                    287:                        default:
                    288:                                break;
                    289:                }
                    290:        }
                    291:        /* unset the current job to avoid interference with cancel() when
                    292:         * destroying the job below */
                    293:        worker->job = NULL;
                    294: 
                    295:        if (to_destroy)
                    296:        {       /* release mutex to avoid deadlocks if the same lock is required
                    297:                 * during queue_job() and in the destructor called here */
                    298:                this->mutex->unlock(this->mutex);
                    299:                to_destroy->destroy(to_destroy);
                    300:                this->mutex->lock(this->mutex);
                    301:        }
                    302: }
                    303: 
                    304: /**
                    305:  * Process queued jobs, called by the worker threads
                    306:  */
                    307: static void process_jobs(worker_thread_t *worker)
                    308: {
                    309:        private_processor_t *this = worker->processor;
                    310: 
                    311:        /* worker threads are not cancelable by default */
                    312:        thread_cancelability(FALSE);
                    313: 
                    314:        DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
                    315: 
                    316:        this->mutex->lock(this->mutex);
                    317:        while (this->desired_threads >= this->total_threads)
                    318:        {
                    319:                if (get_job(this, worker))
                    320:                {
                    321:                        process_job(this, worker);
                    322:                }
                    323:                else
                    324:                {
                    325:                        this->job_added->wait(this->job_added, this->mutex);
                    326:                }
                    327:        }
                    328:        this->total_threads--;
                    329:        this->thread_terminated->signal(this->thread_terminated);
                    330:        this->mutex->unlock(this->mutex);
                    331: }
                    332: 
                    333: METHOD(processor_t, get_total_threads, u_int,
                    334:        private_processor_t *this)
                    335: {
                    336:        u_int count;
                    337: 
                    338:        this->mutex->lock(this->mutex);
                    339:        count = this->total_threads;
                    340:        this->mutex->unlock(this->mutex);
                    341:        return count;
                    342: }
                    343: 
                    344: METHOD(processor_t, get_idle_threads, u_int,
                    345:        private_processor_t *this)
                    346: {
                    347:        u_int count;
                    348: 
                    349:        this->mutex->lock(this->mutex);
                    350:        count = get_idle_threads_nolock(this);
                    351:        this->mutex->unlock(this->mutex);
                    352:        return count;
                    353: }
                    354: 
                    355: /**
                    356:  * Check priority bounds
                    357:  */
                    358: static job_priority_t sane_prio(job_priority_t prio)
                    359: {
                    360:        if ((int)prio < 0 || prio >= JOB_PRIO_MAX)
                    361:        {
                    362:                return JOB_PRIO_MAX - 1;
                    363:        }
                    364:        return prio;
                    365: }
                    366: 
                    367: METHOD(processor_t, get_working_threads, u_int,
                    368:        private_processor_t *this, job_priority_t prio)
                    369: {
                    370:        u_int count;
                    371: 
                    372:        this->mutex->lock(this->mutex);
                    373:        count = this->working_threads[sane_prio(prio)];
                    374:        this->mutex->unlock(this->mutex);
                    375:        return count;
                    376: }
                    377: 
                    378: METHOD(processor_t, get_job_load, u_int,
                    379:        private_processor_t *this, job_priority_t prio)
                    380: {
                    381:        u_int load;
                    382: 
                    383:        prio = sane_prio(prio);
                    384:        this->mutex->lock(this->mutex);
                    385:        load = this->jobs[prio]->get_count(this->jobs[prio]);
                    386:        this->mutex->unlock(this->mutex);
                    387:        return load;
                    388: }
                    389: 
                    390: METHOD(processor_t, queue_job, void,
                    391:        private_processor_t *this, job_t *job)
                    392: {
                    393:        job_priority_t prio;
                    394: 
                    395:        prio = sane_prio(job->get_priority(job));
                    396:        job->status = JOB_STATUS_QUEUED;
                    397: 
                    398:        this->mutex->lock(this->mutex);
                    399:        this->jobs[prio]->insert_last(this->jobs[prio], job);
                    400:        this->job_added->signal(this->job_added);
                    401:        this->mutex->unlock(this->mutex);
                    402: }
                    403: 
                    404: METHOD(processor_t, execute_job, void,
                    405:        private_processor_t *this, job_t *job)
                    406: {
                    407:        job_priority_t prio;
                    408:        bool queued = FALSE;
                    409: 
                    410:        this->mutex->lock(this->mutex);
                    411:        if (this->desired_threads && get_idle_threads_nolock(this))
                    412:        {
                    413:                prio = sane_prio(job->get_priority(job));
                    414:                job->status = JOB_STATUS_QUEUED;
                    415:                /* insert job in front to execute it immediately */
                    416:                this->jobs[prio]->insert_first(this->jobs[prio], job);
                    417:                queued = TRUE;
                    418:        }
                    419:        this->job_added->signal(this->job_added);
                    420:        this->mutex->unlock(this->mutex);
                    421: 
                    422:        if (!queued)
                    423:        {
                    424:                job->execute(job);
                    425:                job->destroy(job);
                    426:        }
                    427: }
                    428: 
                    429: METHOD(processor_t, set_threads, void,
                    430:        private_processor_t *this, u_int count)
                    431: {
                    432:        int i;
                    433: 
                    434:        this->mutex->lock(this->mutex);
                    435:        for (i = 0; i < JOB_PRIO_MAX; i++)
                    436:        {
                    437:                this->prio_threads[i] = lib->settings->get_int(lib->settings,
                    438:                                                "%s.processor.priority_threads.%N", 0, lib->ns,
                    439:                                                job_priority_names, i);
                    440:        }
                    441:        if (count > this->total_threads)
                    442:        {       /* increase thread count */
                    443:                worker_thread_t *worker;
                    444:                int i;
                    445: 
                    446:                this->desired_threads = count;
                    447:                DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
                    448:                for (i = this->total_threads; i < count; i++)
                    449:                {
                    450:                        INIT(worker,
                    451:                                .processor = this,
                    452:                        );
                    453:                        worker->thread = thread_create((thread_main_t)process_jobs, worker);
                    454:                        if (worker->thread)
                    455:                        {
                    456:                                this->threads->insert_last(this->threads, worker);
                    457:                                this->total_threads++;
                    458:                        }
                    459:                        else
                    460:                        {
                    461:                                free(worker);
                    462:                        }
                    463:                }
                    464:        }
                    465:        else if (count < this->total_threads)
                    466:        {       /* decrease thread count */
                    467:                this->desired_threads = count;
                    468:        }
                    469:        this->job_added->broadcast(this->job_added);
                    470:        this->mutex->unlock(this->mutex);
                    471: }
                    472: 
                    473: METHOD(processor_t, cancel, void,
                    474:        private_processor_t *this)
                    475: {
                    476:        enumerator_t *enumerator;
                    477:        worker_thread_t *worker;
                    478:        job_t *job;
                    479:        int i;
                    480: 
                    481:        this->mutex->lock(this->mutex);
                    482:        this->desired_threads = 0;
                    483:        /* cancel potentially blocking jobs */
                    484:        enumerator = this->threads->create_enumerator(this->threads);
                    485:        while (enumerator->enumerate(enumerator, (void**)&worker))
                    486:        {
                    487:                if (worker->job && worker->job->cancel)
                    488:                {
                    489:                        worker->job->status = JOB_STATUS_CANCELED;
                    490:                        if (!worker->job->cancel(worker->job))
                    491:                        {       /* job requests to be canceled explicitly, otherwise we assume
                    492:                                 * the thread terminates itself and can be joined */
                    493:                                worker->thread->cancel(worker->thread);
                    494:                        }
                    495:                }
                    496:        }
                    497:        enumerator->destroy(enumerator);
                    498:        while (this->total_threads > 0)
                    499:        {
                    500:                this->job_added->broadcast(this->job_added);
                    501:                this->thread_terminated->wait(this->thread_terminated, this->mutex);
                    502:        }
                    503:        while (this->threads->remove_first(this->threads,
                    504:                                                                          (void**)&worker) == SUCCESS)
                    505:        {
                    506:                worker->thread->join(worker->thread);
                    507:                free(worker);
                    508:        }
                    509:        for (i = 0; i < JOB_PRIO_MAX; i++)
                    510:        {
                    511:                while (this->jobs[i]->remove_first(this->jobs[i],
                    512:                                                                                   (void**)&job) == SUCCESS)
                    513:                {
                    514:                        job->destroy(job);
                    515:                }
                    516:        }
                    517:        this->mutex->unlock(this->mutex);
                    518: }
                    519: 
                    520: METHOD(processor_t, destroy, void,
                    521:        private_processor_t *this)
                    522: {
                    523:        int i;
                    524: 
                    525:        cancel(this);
                    526:        this->thread_terminated->destroy(this->thread_terminated);
                    527:        this->job_added->destroy(this->job_added);
                    528:        this->mutex->destroy(this->mutex);
                    529:        for (i = 0; i < JOB_PRIO_MAX; i++)
                    530:        {
                    531:                this->jobs[i]->destroy(this->jobs[i]);
                    532:        }
                    533:        this->threads->destroy(this->threads);
                    534:        free(this);
                    535: }
                    536: 
                    537: /*
                    538:  * Described in header.
                    539:  */
                    540: processor_t *processor_create()
                    541: {
                    542:        private_processor_t *this;
                    543:        int i;
                    544: 
                    545:        INIT(this,
                    546:                .public = {
                    547:                        .get_total_threads = _get_total_threads,
                    548:                        .get_idle_threads = _get_idle_threads,
                    549:                        .get_working_threads = _get_working_threads,
                    550:                        .get_job_load = _get_job_load,
                    551:                        .queue_job = _queue_job,
                    552:                        .execute_job = _execute_job,
                    553:                        .set_threads = _set_threads,
                    554:                        .cancel = _cancel,
                    555:                        .destroy = _destroy,
                    556:                },
                    557:                .threads = linked_list_create(),
                    558:                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
                    559:                .job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
                    560:                .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
                    561:        );
                    562: 
                    563:        for (i = 0; i < JOB_PRIO_MAX; i++)
                    564:        {
                    565:                this->jobs[i] = linked_list_create();
                    566:        }
                    567:        return &this->public;
                    568: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>