Annotation of embedaddon/strongswan/src/libstrongswan/processing/processor.c, revision 1.1
1.1 ! misho 1: /*
! 2: * Copyright (C) 2005-2011 Martin Willi
! 3: * Copyright (C) 2011 revosec AG
! 4: * Copyright (C) 2008-2013 Tobias Brunner
! 5: * Copyright (C) 2005 Jan Hutter
! 6: * HSR Hochschule fuer Technik Rapperswil
! 7: *
! 8: * This program is free software; you can redistribute it and/or modify it
! 9: * under the terms of the GNU General Public License as published by the
! 10: * Free Software Foundation; either version 2 of the License, or (at your
! 11: * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
! 12: *
! 13: * This program is distributed in the hope that it will be useful, but
! 14: * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
! 15: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
! 16: * for more details.
! 17: */
! 18:
! 19: #include <stdlib.h>
! 20: #include <string.h>
! 21: #include <errno.h>
! 22:
! 23: #include "processor.h"
! 24:
! 25: #include <utils/debug.h>
! 26: #include <threading/thread.h>
! 27: #include <threading/condvar.h>
! 28: #include <threading/mutex.h>
! 29: #include <threading/thread_value.h>
! 30: #include <collections/linked_list.h>
! 31:
! 32: typedef struct private_processor_t private_processor_t;
! 33:
! 34: /**
! 35: * Private data of processor_t class.
! 36: */
! 37: struct private_processor_t {
! 38:
! 39: /**
! 40: * Public processor_t interface.
! 41: */
! 42: processor_t public;
! 43:
! 44: /**
! 45: * Number of running threads
! 46: */
! 47: u_int total_threads;
! 48:
! 49: /**
! 50: * Desired number of threads
! 51: */
! 52: u_int desired_threads;
! 53:
! 54: /**
! 55: * Number of threads currently working, for each priority
! 56: */
! 57: u_int working_threads[JOB_PRIO_MAX];
! 58:
! 59: /**
! 60: * All threads managed in the pool (including threads that have been
! 61: * canceled, this allows to join them later), as worker_thread_t
! 62: */
! 63: linked_list_t *threads;
! 64:
! 65: /**
! 66: * A list of queued jobs for each priority
! 67: */
! 68: linked_list_t *jobs[JOB_PRIO_MAX];
! 69:
! 70: /**
! 71: * Threads reserved for each priority
! 72: */
! 73: int prio_threads[JOB_PRIO_MAX];
! 74:
! 75: /**
! 76: * access to job lists is locked through this mutex
! 77: */
! 78: mutex_t *mutex;
! 79:
! 80: /**
! 81: * Condvar to wait for new jobs
! 82: */
! 83: condvar_t *job_added;
! 84:
! 85: /**
! 86: * Condvar to wait for terminated threads
! 87: */
! 88: condvar_t *thread_terminated;
! 89: };
! 90:
! 91: /**
! 92: * Worker thread
! 93: */
! 94: typedef struct {
! 95:
! 96: /**
! 97: * Reference to the processor
! 98: */
! 99: private_processor_t *processor;
! 100:
! 101: /**
! 102: * The actual thread
! 103: */
! 104: thread_t *thread;
! 105:
! 106: /**
! 107: * Job currently being executed by this worker thread
! 108: */
! 109: job_t *job;
! 110:
! 111: /**
! 112: * Priority of the current job
! 113: */
! 114: job_priority_t priority;
! 115:
! 116: } worker_thread_t;
! 117:
! 118: static void process_jobs(worker_thread_t *worker);
! 119:
! 120: /**
! 121: * restart a terminated thread
! 122: */
! 123: static void restart(worker_thread_t *worker)
! 124: {
! 125: private_processor_t *this = worker->processor;
! 126: job_t *job;
! 127:
! 128: DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
! 129:
! 130: this->mutex->lock(this->mutex);
! 131: /* cleanup worker thread */
! 132: this->working_threads[worker->priority]--;
! 133: worker->job->status = JOB_STATUS_CANCELED;
! 134: job = worker->job;
! 135: /* unset the job before releasing the mutex, otherwise cancel() might
! 136: * interfere */
! 137: worker->job = NULL;
! 138: /* release mutex to avoid deadlocks if the same lock is required
! 139: * during queue_job() and in the destructor called here */
! 140: this->mutex->unlock(this->mutex);
! 141: job->destroy(job);
! 142: this->mutex->lock(this->mutex);
! 143:
! 144: /* respawn thread if required */
! 145: if (this->desired_threads >= this->total_threads)
! 146: {
! 147: worker_thread_t *new_worker;
! 148:
! 149: INIT(new_worker,
! 150: .processor = this,
! 151: );
! 152: new_worker->thread = thread_create((thread_main_t)process_jobs,
! 153: new_worker);
! 154: if (new_worker->thread)
! 155: {
! 156: this->threads->insert_last(this->threads, new_worker);
! 157: this->mutex->unlock(this->mutex);
! 158: return;
! 159: }
! 160: free(new_worker);
! 161: }
! 162: this->total_threads--;
! 163: this->thread_terminated->signal(this->thread_terminated);
! 164: this->mutex->unlock(this->mutex);
! 165: }
! 166:
! 167: /**
! 168: * Get number of idle threads, non-locking variant
! 169: */
! 170: static u_int get_idle_threads_nolock(private_processor_t *this)
! 171: {
! 172: u_int count, i;
! 173:
! 174: count = this->total_threads;
! 175: for (i = 0; i < JOB_PRIO_MAX; i++)
! 176: {
! 177: count -= this->working_threads[i];
! 178: }
! 179: return count;
! 180: }
! 181:
! 182: /**
! 183: * Get a job from any job queue, starting with the highest priority.
! 184: *
! 185: * this->mutex is expected to be locked.
! 186: */
! 187: static bool get_job(private_processor_t *this, worker_thread_t *worker)
! 188: {
! 189: int i, reserved = 0, idle;
! 190:
! 191: idle = get_idle_threads_nolock(this);
! 192:
! 193: for (i = 0; i < JOB_PRIO_MAX; i++)
! 194: {
! 195: if (reserved && reserved >= idle)
! 196: {
! 197: DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
! 198: "but %d reserved for higher priorities",
! 199: job_priority_names, i, idle, reserved);
! 200: /* wait until a job of higher priority gets queued */
! 201: return FALSE;
! 202: }
! 203: if (this->working_threads[i] < this->prio_threads[i])
! 204: {
! 205: reserved += this->prio_threads[i] - this->working_threads[i];
! 206: }
! 207: if (this->jobs[i]->remove_first(this->jobs[i],
! 208: (void**)&worker->job) == SUCCESS)
! 209: {
! 210: worker->priority = i;
! 211: return TRUE;
! 212: }
! 213: }
! 214: return FALSE;
! 215: }
! 216:
! 217: /**
! 218: * Process a single job (provided in worker->job, worker->priority is also
! 219: * expected to be set)
! 220: *
! 221: * this->mutex is expected to be locked.
! 222: */
! 223: static void process_job(private_processor_t *this, worker_thread_t *worker)
! 224: {
! 225: job_t *to_destroy = NULL;
! 226: job_requeue_t requeue;
! 227:
! 228: this->working_threads[worker->priority]++;
! 229: worker->job->status = JOB_STATUS_EXECUTING;
! 230: this->mutex->unlock(this->mutex);
! 231: /* canceled threads are restarted to get a constant pool */
! 232: thread_cleanup_push((thread_cleanup_t)restart, worker);
! 233: while (TRUE)
! 234: {
! 235: requeue = worker->job->execute(worker->job);
! 236: if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
! 237: {
! 238: break;
! 239: }
! 240: else if (!worker->job->cancel)
! 241: { /* only allow cancelable jobs to requeue directly */
! 242: requeue.type = JOB_REQUEUE_TYPE_FAIR;
! 243: break;
! 244: }
! 245: }
! 246: thread_cleanup_pop(FALSE);
! 247: this->mutex->lock(this->mutex);
! 248: this->working_threads[worker->priority]--;
! 249: if (worker->job->status == JOB_STATUS_CANCELED)
! 250: { /* job was canceled via a custom cancel() method or did not
! 251: * use JOB_REQUEUE_TYPE_DIRECT */
! 252: to_destroy = worker->job;
! 253: }
! 254: else
! 255: {
! 256: switch (requeue.type)
! 257: {
! 258: case JOB_REQUEUE_TYPE_NONE:
! 259: worker->job->status = JOB_STATUS_DONE;
! 260: to_destroy = worker->job;
! 261: break;
! 262: case JOB_REQUEUE_TYPE_FAIR:
! 263: worker->job->status = JOB_STATUS_QUEUED;
! 264: this->jobs[worker->priority]->insert_last(
! 265: this->jobs[worker->priority], worker->job);
! 266: this->job_added->signal(this->job_added);
! 267: break;
! 268: case JOB_REQUEUE_TYPE_SCHEDULE:
! 269: /* scheduler_t does not hold its lock when queuing jobs
! 270: * so this should be safe without unlocking our mutex */
! 271: switch (requeue.schedule)
! 272: {
! 273: case JOB_SCHEDULE:
! 274: lib->scheduler->schedule_job(lib->scheduler,
! 275: worker->job, requeue.time.rel);
! 276: break;
! 277: case JOB_SCHEDULE_MS:
! 278: lib->scheduler->schedule_job_ms(lib->scheduler,
! 279: worker->job, requeue.time.rel);
! 280: break;
! 281: case JOB_SCHEDULE_TV:
! 282: lib->scheduler->schedule_job_tv(lib->scheduler,
! 283: worker->job, requeue.time.abs);
! 284: break;
! 285: }
! 286: break;
! 287: default:
! 288: break;
! 289: }
! 290: }
! 291: /* unset the current job to avoid interference with cancel() when
! 292: * destroying the job below */
! 293: worker->job = NULL;
! 294:
! 295: if (to_destroy)
! 296: { /* release mutex to avoid deadlocks if the same lock is required
! 297: * during queue_job() and in the destructor called here */
! 298: this->mutex->unlock(this->mutex);
! 299: to_destroy->destroy(to_destroy);
! 300: this->mutex->lock(this->mutex);
! 301: }
! 302: }
! 303:
! 304: /**
! 305: * Process queued jobs, called by the worker threads
! 306: */
! 307: static void process_jobs(worker_thread_t *worker)
! 308: {
! 309: private_processor_t *this = worker->processor;
! 310:
! 311: /* worker threads are not cancelable by default */
! 312: thread_cancelability(FALSE);
! 313:
! 314: DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
! 315:
! 316: this->mutex->lock(this->mutex);
! 317: while (this->desired_threads >= this->total_threads)
! 318: {
! 319: if (get_job(this, worker))
! 320: {
! 321: process_job(this, worker);
! 322: }
! 323: else
! 324: {
! 325: this->job_added->wait(this->job_added, this->mutex);
! 326: }
! 327: }
! 328: this->total_threads--;
! 329: this->thread_terminated->signal(this->thread_terminated);
! 330: this->mutex->unlock(this->mutex);
! 331: }
! 332:
! 333: METHOD(processor_t, get_total_threads, u_int,
! 334: private_processor_t *this)
! 335: {
! 336: u_int count;
! 337:
! 338: this->mutex->lock(this->mutex);
! 339: count = this->total_threads;
! 340: this->mutex->unlock(this->mutex);
! 341: return count;
! 342: }
! 343:
! 344: METHOD(processor_t, get_idle_threads, u_int,
! 345: private_processor_t *this)
! 346: {
! 347: u_int count;
! 348:
! 349: this->mutex->lock(this->mutex);
! 350: count = get_idle_threads_nolock(this);
! 351: this->mutex->unlock(this->mutex);
! 352: return count;
! 353: }
! 354:
! 355: /**
! 356: * Check priority bounds
! 357: */
! 358: static job_priority_t sane_prio(job_priority_t prio)
! 359: {
! 360: if ((int)prio < 0 || prio >= JOB_PRIO_MAX)
! 361: {
! 362: return JOB_PRIO_MAX - 1;
! 363: }
! 364: return prio;
! 365: }
! 366:
! 367: METHOD(processor_t, get_working_threads, u_int,
! 368: private_processor_t *this, job_priority_t prio)
! 369: {
! 370: u_int count;
! 371:
! 372: this->mutex->lock(this->mutex);
! 373: count = this->working_threads[sane_prio(prio)];
! 374: this->mutex->unlock(this->mutex);
! 375: return count;
! 376: }
! 377:
! 378: METHOD(processor_t, get_job_load, u_int,
! 379: private_processor_t *this, job_priority_t prio)
! 380: {
! 381: u_int load;
! 382:
! 383: prio = sane_prio(prio);
! 384: this->mutex->lock(this->mutex);
! 385: load = this->jobs[prio]->get_count(this->jobs[prio]);
! 386: this->mutex->unlock(this->mutex);
! 387: return load;
! 388: }
! 389:
! 390: METHOD(processor_t, queue_job, void,
! 391: private_processor_t *this, job_t *job)
! 392: {
! 393: job_priority_t prio;
! 394:
! 395: prio = sane_prio(job->get_priority(job));
! 396: job->status = JOB_STATUS_QUEUED;
! 397:
! 398: this->mutex->lock(this->mutex);
! 399: this->jobs[prio]->insert_last(this->jobs[prio], job);
! 400: this->job_added->signal(this->job_added);
! 401: this->mutex->unlock(this->mutex);
! 402: }
! 403:
! 404: METHOD(processor_t, execute_job, void,
! 405: private_processor_t *this, job_t *job)
! 406: {
! 407: job_priority_t prio;
! 408: bool queued = FALSE;
! 409:
! 410: this->mutex->lock(this->mutex);
! 411: if (this->desired_threads && get_idle_threads_nolock(this))
! 412: {
! 413: prio = sane_prio(job->get_priority(job));
! 414: job->status = JOB_STATUS_QUEUED;
! 415: /* insert job in front to execute it immediately */
! 416: this->jobs[prio]->insert_first(this->jobs[prio], job);
! 417: queued = TRUE;
! 418: }
! 419: this->job_added->signal(this->job_added);
! 420: this->mutex->unlock(this->mutex);
! 421:
! 422: if (!queued)
! 423: {
! 424: job->execute(job);
! 425: job->destroy(job);
! 426: }
! 427: }
! 428:
! 429: METHOD(processor_t, set_threads, void,
! 430: private_processor_t *this, u_int count)
! 431: {
! 432: int i;
! 433:
! 434: this->mutex->lock(this->mutex);
! 435: for (i = 0; i < JOB_PRIO_MAX; i++)
! 436: {
! 437: this->prio_threads[i] = lib->settings->get_int(lib->settings,
! 438: "%s.processor.priority_threads.%N", 0, lib->ns,
! 439: job_priority_names, i);
! 440: }
! 441: if (count > this->total_threads)
! 442: { /* increase thread count */
! 443: worker_thread_t *worker;
! 444: int i;
! 445:
! 446: this->desired_threads = count;
! 447: DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
! 448: for (i = this->total_threads; i < count; i++)
! 449: {
! 450: INIT(worker,
! 451: .processor = this,
! 452: );
! 453: worker->thread = thread_create((thread_main_t)process_jobs, worker);
! 454: if (worker->thread)
! 455: {
! 456: this->threads->insert_last(this->threads, worker);
! 457: this->total_threads++;
! 458: }
! 459: else
! 460: {
! 461: free(worker);
! 462: }
! 463: }
! 464: }
! 465: else if (count < this->total_threads)
! 466: { /* decrease thread count */
! 467: this->desired_threads = count;
! 468: }
! 469: this->job_added->broadcast(this->job_added);
! 470: this->mutex->unlock(this->mutex);
! 471: }
! 472:
! 473: METHOD(processor_t, cancel, void,
! 474: private_processor_t *this)
! 475: {
! 476: enumerator_t *enumerator;
! 477: worker_thread_t *worker;
! 478: job_t *job;
! 479: int i;
! 480:
! 481: this->mutex->lock(this->mutex);
! 482: this->desired_threads = 0;
! 483: /* cancel potentially blocking jobs */
! 484: enumerator = this->threads->create_enumerator(this->threads);
! 485: while (enumerator->enumerate(enumerator, (void**)&worker))
! 486: {
! 487: if (worker->job && worker->job->cancel)
! 488: {
! 489: worker->job->status = JOB_STATUS_CANCELED;
! 490: if (!worker->job->cancel(worker->job))
! 491: { /* job requests to be canceled explicitly, otherwise we assume
! 492: * the thread terminates itself and can be joined */
! 493: worker->thread->cancel(worker->thread);
! 494: }
! 495: }
! 496: }
! 497: enumerator->destroy(enumerator);
! 498: while (this->total_threads > 0)
! 499: {
! 500: this->job_added->broadcast(this->job_added);
! 501: this->thread_terminated->wait(this->thread_terminated, this->mutex);
! 502: }
! 503: while (this->threads->remove_first(this->threads,
! 504: (void**)&worker) == SUCCESS)
! 505: {
! 506: worker->thread->join(worker->thread);
! 507: free(worker);
! 508: }
! 509: for (i = 0; i < JOB_PRIO_MAX; i++)
! 510: {
! 511: while (this->jobs[i]->remove_first(this->jobs[i],
! 512: (void**)&job) == SUCCESS)
! 513: {
! 514: job->destroy(job);
! 515: }
! 516: }
! 517: this->mutex->unlock(this->mutex);
! 518: }
! 519:
! 520: METHOD(processor_t, destroy, void,
! 521: private_processor_t *this)
! 522: {
! 523: int i;
! 524:
! 525: cancel(this);
! 526: this->thread_terminated->destroy(this->thread_terminated);
! 527: this->job_added->destroy(this->job_added);
! 528: this->mutex->destroy(this->mutex);
! 529: for (i = 0; i < JOB_PRIO_MAX; i++)
! 530: {
! 531: this->jobs[i]->destroy(this->jobs[i]);
! 532: }
! 533: this->threads->destroy(this->threads);
! 534: free(this);
! 535: }
! 536:
! 537: /*
! 538: * Described in header.
! 539: */
! 540: processor_t *processor_create()
! 541: {
! 542: private_processor_t *this;
! 543: int i;
! 544:
! 545: INIT(this,
! 546: .public = {
! 547: .get_total_threads = _get_total_threads,
! 548: .get_idle_threads = _get_idle_threads,
! 549: .get_working_threads = _get_working_threads,
! 550: .get_job_load = _get_job_load,
! 551: .queue_job = _queue_job,
! 552: .execute_job = _execute_job,
! 553: .set_threads = _set_threads,
! 554: .cancel = _cancel,
! 555: .destroy = _destroy,
! 556: },
! 557: .threads = linked_list_create(),
! 558: .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
! 559: .job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
! 560: .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
! 561: );
! 562:
! 563: for (i = 0; i < JOB_PRIO_MAX; i++)
! 564: {
! 565: this->jobs[i] = linked_list_create();
! 566: }
! 567: return &this->public;
! 568: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>