Annotation of embedaddon/strongswan/src/libstrongswan/processing/processor.c, revision 1.1

1.1     ! misho       1: /*
        !             2:  * Copyright (C) 2005-2011 Martin Willi
        !             3:  * Copyright (C) 2011 revosec AG
        !             4:  * Copyright (C) 2008-2013 Tobias Brunner
        !             5:  * Copyright (C) 2005 Jan Hutter
        !             6:  * HSR Hochschule fuer Technik Rapperswil
        !             7:  *
        !             8:  * This program is free software; you can redistribute it and/or modify it
        !             9:  * under the terms of the GNU General Public License as published by the
        !            10:  * Free Software Foundation; either version 2 of the License, or (at your
        !            11:  * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
        !            12:  *
        !            13:  * This program is distributed in the hope that it will be useful, but
        !            14:  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
        !            15:  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
        !            16:  * for more details.
        !            17:  */
        !            18: 
        !            19: #include <stdlib.h>
        !            20: #include <string.h>
        !            21: #include <errno.h>
        !            22: 
        !            23: #include "processor.h"
        !            24: 
        !            25: #include <utils/debug.h>
        !            26: #include <threading/thread.h>
        !            27: #include <threading/condvar.h>
        !            28: #include <threading/mutex.h>
        !            29: #include <threading/thread_value.h>
        !            30: #include <collections/linked_list.h>
        !            31: 
        !            32: typedef struct private_processor_t private_processor_t;
        !            33: 
        !            34: /**
        !            35:  * Private data of processor_t class.
        !            36:  */
        !            37: struct private_processor_t {
        !            38: 
        !            39:        /**
        !            40:         * Public processor_t interface.
        !            41:         */
        !            42:        processor_t public;
        !            43: 
        !            44:        /**
        !            45:         * Number of running threads
        !            46:         */
        !            47:        u_int total_threads;
        !            48: 
        !            49:        /**
        !            50:         * Desired number of threads
        !            51:         */
        !            52:        u_int desired_threads;
        !            53: 
        !            54:        /**
        !            55:         * Number of threads currently working, for each priority
        !            56:         */
        !            57:        u_int working_threads[JOB_PRIO_MAX];
        !            58: 
        !            59:        /**
        !            60:         * All threads managed in the pool (including threads that have been
        !            61:         * canceled, this allows to join them later), as worker_thread_t
        !            62:         */
        !            63:        linked_list_t *threads;
        !            64: 
        !            65:        /**
        !            66:         * A list of queued jobs for each priority
        !            67:         */
        !            68:        linked_list_t *jobs[JOB_PRIO_MAX];
        !            69: 
        !            70:        /**
        !            71:         * Threads reserved for each priority
        !            72:         */
        !            73:        int prio_threads[JOB_PRIO_MAX];
        !            74: 
        !            75:        /**
        !            76:         * access to job lists is locked through this mutex
        !            77:         */
        !            78:        mutex_t *mutex;
        !            79: 
        !            80:        /**
        !            81:         * Condvar to wait for new jobs
        !            82:         */
        !            83:        condvar_t *job_added;
        !            84: 
        !            85:        /**
        !            86:         * Condvar to wait for terminated threads
        !            87:         */
        !            88:        condvar_t *thread_terminated;
        !            89: };
        !            90: 
        !            91: /**
        !            92:  * Worker thread
        !            93:  */
        !            94: typedef struct {
        !            95: 
        !            96:        /**
        !            97:         * Reference to the processor
        !            98:         */
        !            99:        private_processor_t *processor;
        !           100: 
        !           101:        /**
        !           102:         * The actual thread
        !           103:         */
        !           104:        thread_t *thread;
        !           105: 
        !           106:        /**
        !           107:         * Job currently being executed by this worker thread
        !           108:         */
        !           109:        job_t *job;
        !           110: 
        !           111:        /**
        !           112:         * Priority of the current job
        !           113:         */
        !           114:        job_priority_t priority;
        !           115: 
        !           116: } worker_thread_t;
        !           117: 
        !           118: static void process_jobs(worker_thread_t *worker);
        !           119: 
        !           120: /**
        !           121:  * restart a terminated thread
        !           122:  */
        !           123: static void restart(worker_thread_t *worker)
        !           124: {
        !           125:        private_processor_t *this = worker->processor;
        !           126:        job_t *job;
        !           127: 
        !           128:        DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
        !           129: 
        !           130:        this->mutex->lock(this->mutex);
        !           131:        /* cleanup worker thread  */
        !           132:        this->working_threads[worker->priority]--;
        !           133:        worker->job->status = JOB_STATUS_CANCELED;
        !           134:        job = worker->job;
        !           135:        /* unset the job before releasing the mutex, otherwise cancel() might
        !           136:         * interfere */
        !           137:        worker->job = NULL;
        !           138:        /* release mutex to avoid deadlocks if the same lock is required
        !           139:         * during queue_job() and in the destructor called here */
        !           140:        this->mutex->unlock(this->mutex);
        !           141:        job->destroy(job);
        !           142:        this->mutex->lock(this->mutex);
        !           143: 
        !           144:        /* respawn thread if required */
        !           145:        if (this->desired_threads >= this->total_threads)
        !           146:        {
        !           147:                worker_thread_t *new_worker;
        !           148: 
        !           149:                INIT(new_worker,
        !           150:                        .processor = this,
        !           151:                );
        !           152:                new_worker->thread = thread_create((thread_main_t)process_jobs,
        !           153:                                                                                   new_worker);
        !           154:                if (new_worker->thread)
        !           155:                {
        !           156:                        this->threads->insert_last(this->threads, new_worker);
        !           157:                        this->mutex->unlock(this->mutex);
        !           158:                        return;
        !           159:                }
        !           160:                free(new_worker);
        !           161:        }
        !           162:        this->total_threads--;
        !           163:        this->thread_terminated->signal(this->thread_terminated);
        !           164:        this->mutex->unlock(this->mutex);
        !           165: }
        !           166: 
        !           167: /**
        !           168:  * Get number of idle threads, non-locking variant
        !           169:  */
        !           170: static u_int get_idle_threads_nolock(private_processor_t *this)
        !           171: {
        !           172:        u_int count, i;
        !           173: 
        !           174:        count = this->total_threads;
        !           175:        for (i = 0; i < JOB_PRIO_MAX; i++)
        !           176:        {
        !           177:                count -= this->working_threads[i];
        !           178:        }
        !           179:        return count;
        !           180: }
        !           181: 
        !           182: /**
        !           183:  * Get a job from any job queue, starting with the highest priority.
        !           184:  *
        !           185:  * this->mutex is expected to be locked.
        !           186:  */
        !           187: static bool get_job(private_processor_t *this, worker_thread_t *worker)
        !           188: {
        !           189:        int i, reserved = 0, idle;
        !           190: 
        !           191:        idle = get_idle_threads_nolock(this);
        !           192: 
        !           193:        for (i = 0; i < JOB_PRIO_MAX; i++)
        !           194:        {
        !           195:                if (reserved && reserved >= idle)
        !           196:                {
        !           197:                        DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
        !           198:                                 "but %d reserved for higher priorities",
        !           199:                                 job_priority_names, i, idle, reserved);
        !           200:                        /* wait until a job of higher priority gets queued */
        !           201:                        return FALSE;
        !           202:                }
        !           203:                if (this->working_threads[i] < this->prio_threads[i])
        !           204:                {
        !           205:                        reserved += this->prio_threads[i] - this->working_threads[i];
        !           206:                }
        !           207:                if (this->jobs[i]->remove_first(this->jobs[i],
        !           208:                                                                                (void**)&worker->job) == SUCCESS)
        !           209:                {
        !           210:                        worker->priority = i;
        !           211:                        return TRUE;
        !           212:                }
        !           213:        }
        !           214:        return FALSE;
        !           215: }
        !           216: 
        !           217: /**
        !           218:  * Process a single job (provided in worker->job, worker->priority is also
        !           219:  * expected to be set)
        !           220:  *
        !           221:  * this->mutex is expected to be locked.
        !           222:  */
        !           223: static void process_job(private_processor_t *this, worker_thread_t *worker)
        !           224: {
        !           225:        job_t *to_destroy = NULL;
        !           226:        job_requeue_t requeue;
        !           227: 
        !           228:        this->working_threads[worker->priority]++;
        !           229:        worker->job->status = JOB_STATUS_EXECUTING;
        !           230:        this->mutex->unlock(this->mutex);
        !           231:        /* canceled threads are restarted to get a constant pool */
        !           232:        thread_cleanup_push((thread_cleanup_t)restart, worker);
        !           233:        while (TRUE)
        !           234:        {
        !           235:                requeue = worker->job->execute(worker->job);
        !           236:                if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
        !           237:                {
        !           238:                        break;
        !           239:                }
        !           240:                else if (!worker->job->cancel)
        !           241:                {       /* only allow cancelable jobs to requeue directly */
        !           242:                        requeue.type = JOB_REQUEUE_TYPE_FAIR;
        !           243:                        break;
        !           244:                }
        !           245:        }
        !           246:        thread_cleanup_pop(FALSE);
        !           247:        this->mutex->lock(this->mutex);
        !           248:        this->working_threads[worker->priority]--;
        !           249:        if (worker->job->status == JOB_STATUS_CANCELED)
        !           250:        {       /* job was canceled via a custom cancel() method or did not
        !           251:                 * use JOB_REQUEUE_TYPE_DIRECT */
        !           252:                to_destroy = worker->job;
        !           253:        }
        !           254:        else
        !           255:        {
        !           256:                switch (requeue.type)
        !           257:                {
        !           258:                        case JOB_REQUEUE_TYPE_NONE:
        !           259:                                worker->job->status = JOB_STATUS_DONE;
        !           260:                                to_destroy = worker->job;
        !           261:                                break;
        !           262:                        case JOB_REQUEUE_TYPE_FAIR:
        !           263:                                worker->job->status = JOB_STATUS_QUEUED;
        !           264:                                this->jobs[worker->priority]->insert_last(
        !           265:                                                                        this->jobs[worker->priority], worker->job);
        !           266:                                this->job_added->signal(this->job_added);
        !           267:                                break;
        !           268:                        case JOB_REQUEUE_TYPE_SCHEDULE:
        !           269:                                /* scheduler_t does not hold its lock when queuing jobs
        !           270:                                 * so this should be safe without unlocking our mutex */
        !           271:                                switch (requeue.schedule)
        !           272:                                {
        !           273:                                        case JOB_SCHEDULE:
        !           274:                                                lib->scheduler->schedule_job(lib->scheduler,
        !           275:                                                                                                worker->job, requeue.time.rel);
        !           276:                                                break;
        !           277:                                        case JOB_SCHEDULE_MS:
        !           278:                                                lib->scheduler->schedule_job_ms(lib->scheduler,
        !           279:                                                                                                worker->job, requeue.time.rel);
        !           280:                                                break;
        !           281:                                        case JOB_SCHEDULE_TV:
        !           282:                                                lib->scheduler->schedule_job_tv(lib->scheduler,
        !           283:                                                                                                worker->job, requeue.time.abs);
        !           284:                                                break;
        !           285:                                }
        !           286:                                break;
        !           287:                        default:
        !           288:                                break;
        !           289:                }
        !           290:        }
        !           291:        /* unset the current job to avoid interference with cancel() when
        !           292:         * destroying the job below */
        !           293:        worker->job = NULL;
        !           294: 
        !           295:        if (to_destroy)
        !           296:        {       /* release mutex to avoid deadlocks if the same lock is required
        !           297:                 * during queue_job() and in the destructor called here */
        !           298:                this->mutex->unlock(this->mutex);
        !           299:                to_destroy->destroy(to_destroy);
        !           300:                this->mutex->lock(this->mutex);
        !           301:        }
        !           302: }
        !           303: 
        !           304: /**
        !           305:  * Process queued jobs, called by the worker threads
        !           306:  */
        !           307: static void process_jobs(worker_thread_t *worker)
        !           308: {
        !           309:        private_processor_t *this = worker->processor;
        !           310: 
        !           311:        /* worker threads are not cancelable by default */
        !           312:        thread_cancelability(FALSE);
        !           313: 
        !           314:        DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
        !           315: 
        !           316:        this->mutex->lock(this->mutex);
        !           317:        while (this->desired_threads >= this->total_threads)
        !           318:        {
        !           319:                if (get_job(this, worker))
        !           320:                {
        !           321:                        process_job(this, worker);
        !           322:                }
        !           323:                else
        !           324:                {
        !           325:                        this->job_added->wait(this->job_added, this->mutex);
        !           326:                }
        !           327:        }
        !           328:        this->total_threads--;
        !           329:        this->thread_terminated->signal(this->thread_terminated);
        !           330:        this->mutex->unlock(this->mutex);
        !           331: }
        !           332: 
        !           333: METHOD(processor_t, get_total_threads, u_int,
        !           334:        private_processor_t *this)
        !           335: {
        !           336:        u_int count;
        !           337: 
        !           338:        this->mutex->lock(this->mutex);
        !           339:        count = this->total_threads;
        !           340:        this->mutex->unlock(this->mutex);
        !           341:        return count;
        !           342: }
        !           343: 
        !           344: METHOD(processor_t, get_idle_threads, u_int,
        !           345:        private_processor_t *this)
        !           346: {
        !           347:        u_int count;
        !           348: 
        !           349:        this->mutex->lock(this->mutex);
        !           350:        count = get_idle_threads_nolock(this);
        !           351:        this->mutex->unlock(this->mutex);
        !           352:        return count;
        !           353: }
        !           354: 
        !           355: /**
        !           356:  * Check priority bounds
        !           357:  */
        !           358: static job_priority_t sane_prio(job_priority_t prio)
        !           359: {
        !           360:        if ((int)prio < 0 || prio >= JOB_PRIO_MAX)
        !           361:        {
        !           362:                return JOB_PRIO_MAX - 1;
        !           363:        }
        !           364:        return prio;
        !           365: }
        !           366: 
        !           367: METHOD(processor_t, get_working_threads, u_int,
        !           368:        private_processor_t *this, job_priority_t prio)
        !           369: {
        !           370:        u_int count;
        !           371: 
        !           372:        this->mutex->lock(this->mutex);
        !           373:        count = this->working_threads[sane_prio(prio)];
        !           374:        this->mutex->unlock(this->mutex);
        !           375:        return count;
        !           376: }
        !           377: 
        !           378: METHOD(processor_t, get_job_load, u_int,
        !           379:        private_processor_t *this, job_priority_t prio)
        !           380: {
        !           381:        u_int load;
        !           382: 
        !           383:        prio = sane_prio(prio);
        !           384:        this->mutex->lock(this->mutex);
        !           385:        load = this->jobs[prio]->get_count(this->jobs[prio]);
        !           386:        this->mutex->unlock(this->mutex);
        !           387:        return load;
        !           388: }
        !           389: 
        !           390: METHOD(processor_t, queue_job, void,
        !           391:        private_processor_t *this, job_t *job)
        !           392: {
        !           393:        job_priority_t prio;
        !           394: 
        !           395:        prio = sane_prio(job->get_priority(job));
        !           396:        job->status = JOB_STATUS_QUEUED;
        !           397: 
        !           398:        this->mutex->lock(this->mutex);
        !           399:        this->jobs[prio]->insert_last(this->jobs[prio], job);
        !           400:        this->job_added->signal(this->job_added);
        !           401:        this->mutex->unlock(this->mutex);
        !           402: }
        !           403: 
        !           404: METHOD(processor_t, execute_job, void,
        !           405:        private_processor_t *this, job_t *job)
        !           406: {
        !           407:        job_priority_t prio;
        !           408:        bool queued = FALSE;
        !           409: 
        !           410:        this->mutex->lock(this->mutex);
        !           411:        if (this->desired_threads && get_idle_threads_nolock(this))
        !           412:        {
        !           413:                prio = sane_prio(job->get_priority(job));
        !           414:                job->status = JOB_STATUS_QUEUED;
        !           415:                /* insert job in front to execute it immediately */
        !           416:                this->jobs[prio]->insert_first(this->jobs[prio], job);
        !           417:                queued = TRUE;
        !           418:        }
        !           419:        this->job_added->signal(this->job_added);
        !           420:        this->mutex->unlock(this->mutex);
        !           421: 
        !           422:        if (!queued)
        !           423:        {
        !           424:                job->execute(job);
        !           425:                job->destroy(job);
        !           426:        }
        !           427: }
        !           428: 
        !           429: METHOD(processor_t, set_threads, void,
        !           430:        private_processor_t *this, u_int count)
        !           431: {
        !           432:        int i;
        !           433: 
        !           434:        this->mutex->lock(this->mutex);
        !           435:        for (i = 0; i < JOB_PRIO_MAX; i++)
        !           436:        {
        !           437:                this->prio_threads[i] = lib->settings->get_int(lib->settings,
        !           438:                                                "%s.processor.priority_threads.%N", 0, lib->ns,
        !           439:                                                job_priority_names, i);
        !           440:        }
        !           441:        if (count > this->total_threads)
        !           442:        {       /* increase thread count */
        !           443:                worker_thread_t *worker;
        !           444:                int i;
        !           445: 
        !           446:                this->desired_threads = count;
        !           447:                DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
        !           448:                for (i = this->total_threads; i < count; i++)
        !           449:                {
        !           450:                        INIT(worker,
        !           451:                                .processor = this,
        !           452:                        );
        !           453:                        worker->thread = thread_create((thread_main_t)process_jobs, worker);
        !           454:                        if (worker->thread)
        !           455:                        {
        !           456:                                this->threads->insert_last(this->threads, worker);
        !           457:                                this->total_threads++;
        !           458:                        }
        !           459:                        else
        !           460:                        {
        !           461:                                free(worker);
        !           462:                        }
        !           463:                }
        !           464:        }
        !           465:        else if (count < this->total_threads)
        !           466:        {       /* decrease thread count */
        !           467:                this->desired_threads = count;
        !           468:        }
        !           469:        this->job_added->broadcast(this->job_added);
        !           470:        this->mutex->unlock(this->mutex);
        !           471: }
        !           472: 
        !           473: METHOD(processor_t, cancel, void,
        !           474:        private_processor_t *this)
        !           475: {
        !           476:        enumerator_t *enumerator;
        !           477:        worker_thread_t *worker;
        !           478:        job_t *job;
        !           479:        int i;
        !           480: 
        !           481:        this->mutex->lock(this->mutex);
        !           482:        this->desired_threads = 0;
        !           483:        /* cancel potentially blocking jobs */
        !           484:        enumerator = this->threads->create_enumerator(this->threads);
        !           485:        while (enumerator->enumerate(enumerator, (void**)&worker))
        !           486:        {
        !           487:                if (worker->job && worker->job->cancel)
        !           488:                {
        !           489:                        worker->job->status = JOB_STATUS_CANCELED;
        !           490:                        if (!worker->job->cancel(worker->job))
        !           491:                        {       /* job requests to be canceled explicitly, otherwise we assume
        !           492:                                 * the thread terminates itself and can be joined */
        !           493:                                worker->thread->cancel(worker->thread);
        !           494:                        }
        !           495:                }
        !           496:        }
        !           497:        enumerator->destroy(enumerator);
        !           498:        while (this->total_threads > 0)
        !           499:        {
        !           500:                this->job_added->broadcast(this->job_added);
        !           501:                this->thread_terminated->wait(this->thread_terminated, this->mutex);
        !           502:        }
        !           503:        while (this->threads->remove_first(this->threads,
        !           504:                                                                          (void**)&worker) == SUCCESS)
        !           505:        {
        !           506:                worker->thread->join(worker->thread);
        !           507:                free(worker);
        !           508:        }
        !           509:        for (i = 0; i < JOB_PRIO_MAX; i++)
        !           510:        {
        !           511:                while (this->jobs[i]->remove_first(this->jobs[i],
        !           512:                                                                                   (void**)&job) == SUCCESS)
        !           513:                {
        !           514:                        job->destroy(job);
        !           515:                }
        !           516:        }
        !           517:        this->mutex->unlock(this->mutex);
        !           518: }
        !           519: 
        !           520: METHOD(processor_t, destroy, void,
        !           521:        private_processor_t *this)
        !           522: {
        !           523:        int i;
        !           524: 
        !           525:        cancel(this);
        !           526:        this->thread_terminated->destroy(this->thread_terminated);
        !           527:        this->job_added->destroy(this->job_added);
        !           528:        this->mutex->destroy(this->mutex);
        !           529:        for (i = 0; i < JOB_PRIO_MAX; i++)
        !           530:        {
        !           531:                this->jobs[i]->destroy(this->jobs[i]);
        !           532:        }
        !           533:        this->threads->destroy(this->threads);
        !           534:        free(this);
        !           535: }
        !           536: 
        !           537: /*
        !           538:  * Described in header.
        !           539:  */
        !           540: processor_t *processor_create()
        !           541: {
        !           542:        private_processor_t *this;
        !           543:        int i;
        !           544: 
        !           545:        INIT(this,
        !           546:                .public = {
        !           547:                        .get_total_threads = _get_total_threads,
        !           548:                        .get_idle_threads = _get_idle_threads,
        !           549:                        .get_working_threads = _get_working_threads,
        !           550:                        .get_job_load = _get_job_load,
        !           551:                        .queue_job = _queue_job,
        !           552:                        .execute_job = _execute_job,
        !           553:                        .set_threads = _set_threads,
        !           554:                        .cancel = _cancel,
        !           555:                        .destroy = _destroy,
        !           556:                },
        !           557:                .threads = linked_list_create(),
        !           558:                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
        !           559:                .job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
        !           560:                .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
        !           561:        );
        !           562: 
        !           563:        for (i = 0; i < JOB_PRIO_MAX; i++)
        !           564:        {
        !           565:                this->jobs[i] = linked_list_create();
        !           566:        }
        !           567:        return &this->public;
        !           568: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>