/* * Copyright (C) 2005-2011 Martin Willi * Copyright (C) 2011 revosec AG * Copyright (C) 2008-2013 Tobias Brunner * Copyright (C) 2005 Jan Hutter * HSR Hochschule fuer Technik Rapperswil * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the * Free Software Foundation; either version 2 of the License, or (at your * option) any later version. See . * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * for more details. */ #include #include #include #include "processor.h" #include #include #include #include #include #include typedef struct private_processor_t private_processor_t; /** * Private data of processor_t class. */ struct private_processor_t { /** * Public processor_t interface. */ processor_t public; /** * Number of running threads */ u_int total_threads; /** * Desired number of threads */ u_int desired_threads; /** * Number of threads currently working, for each priority */ u_int working_threads[JOB_PRIO_MAX]; /** * All threads managed in the pool (including threads that have been * canceled, this allows to join them later), as worker_thread_t */ linked_list_t *threads; /** * A list of queued jobs for each priority */ linked_list_t *jobs[JOB_PRIO_MAX]; /** * Threads reserved for each priority */ int prio_threads[JOB_PRIO_MAX]; /** * access to job lists is locked through this mutex */ mutex_t *mutex; /** * Condvar to wait for new jobs */ condvar_t *job_added; /** * Condvar to wait for terminated threads */ condvar_t *thread_terminated; }; /** * Worker thread */ typedef struct { /** * Reference to the processor */ private_processor_t *processor; /** * The actual thread */ thread_t *thread; /** * Job currently being executed by this worker thread */ job_t *job; /** * Priority of the current job */ job_priority_t priority; } worker_thread_t; static void process_jobs(worker_thread_t *worker); /** * restart a terminated thread */ static void restart(worker_thread_t *worker) { private_processor_t *this = worker->processor; job_t *job; DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id()); this->mutex->lock(this->mutex); /* cleanup worker thread */ this->working_threads[worker->priority]--; worker->job->status = JOB_STATUS_CANCELED; job = worker->job; /* unset the job before releasing the mutex, otherwise cancel() might * interfere */ worker->job = NULL; /* release mutex to avoid deadlocks if the same lock is required * during queue_job() and in the destructor called here */ this->mutex->unlock(this->mutex); job->destroy(job); this->mutex->lock(this->mutex); /* respawn thread if required */ if (this->desired_threads >= this->total_threads) { worker_thread_t *new_worker; INIT(new_worker, .processor = this, ); new_worker->thread = thread_create((thread_main_t)process_jobs, new_worker); if (new_worker->thread) { this->threads->insert_last(this->threads, new_worker); this->mutex->unlock(this->mutex); return; } free(new_worker); } this->total_threads--; this->thread_terminated->signal(this->thread_terminated); this->mutex->unlock(this->mutex); } /** * Get number of idle threads, non-locking variant */ static u_int get_idle_threads_nolock(private_processor_t *this) { u_int count, i; count = this->total_threads; for (i = 0; i < JOB_PRIO_MAX; i++) { count -= this->working_threads[i]; } return count; } /** * Get a job from any job queue, starting with the highest priority. * * this->mutex is expected to be locked. */ static bool get_job(private_processor_t *this, worker_thread_t *worker) { int i, reserved = 0, idle; idle = get_idle_threads_nolock(this); for (i = 0; i < JOB_PRIO_MAX; i++) { if (reserved && reserved >= idle) { DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, " "but %d reserved for higher priorities", job_priority_names, i, idle, reserved); /* wait until a job of higher priority gets queued */ return FALSE; } if (this->working_threads[i] < this->prio_threads[i]) { reserved += this->prio_threads[i] - this->working_threads[i]; } if (this->jobs[i]->remove_first(this->jobs[i], (void**)&worker->job) == SUCCESS) { worker->priority = i; return TRUE; } } return FALSE; } /** * Process a single job (provided in worker->job, worker->priority is also * expected to be set) * * this->mutex is expected to be locked. */ static void process_job(private_processor_t *this, worker_thread_t *worker) { job_t *to_destroy = NULL; job_requeue_t requeue; this->working_threads[worker->priority]++; worker->job->status = JOB_STATUS_EXECUTING; this->mutex->unlock(this->mutex); /* canceled threads are restarted to get a constant pool */ thread_cleanup_push((thread_cleanup_t)restart, worker); while (TRUE) { requeue = worker->job->execute(worker->job); if (requeue.type != JOB_REQUEUE_TYPE_DIRECT) { break; } else if (!worker->job->cancel) { /* only allow cancelable jobs to requeue directly */ requeue.type = JOB_REQUEUE_TYPE_FAIR; break; } } thread_cleanup_pop(FALSE); this->mutex->lock(this->mutex); this->working_threads[worker->priority]--; if (worker->job->status == JOB_STATUS_CANCELED) { /* job was canceled via a custom cancel() method or did not * use JOB_REQUEUE_TYPE_DIRECT */ to_destroy = worker->job; } else { switch (requeue.type) { case JOB_REQUEUE_TYPE_NONE: worker->job->status = JOB_STATUS_DONE; to_destroy = worker->job; break; case JOB_REQUEUE_TYPE_FAIR: worker->job->status = JOB_STATUS_QUEUED; this->jobs[worker->priority]->insert_last( this->jobs[worker->priority], worker->job); this->job_added->signal(this->job_added); break; case JOB_REQUEUE_TYPE_SCHEDULE: /* scheduler_t does not hold its lock when queuing jobs * so this should be safe without unlocking our mutex */ switch (requeue.schedule) { case JOB_SCHEDULE: lib->scheduler->schedule_job(lib->scheduler, worker->job, requeue.time.rel); break; case JOB_SCHEDULE_MS: lib->scheduler->schedule_job_ms(lib->scheduler, worker->job, requeue.time.rel); break; case JOB_SCHEDULE_TV: lib->scheduler->schedule_job_tv(lib->scheduler, worker->job, requeue.time.abs); break; } break; default: break; } } /* unset the current job to avoid interference with cancel() when * destroying the job below */ worker->job = NULL; if (to_destroy) { /* release mutex to avoid deadlocks if the same lock is required * during queue_job() and in the destructor called here */ this->mutex->unlock(this->mutex); to_destroy->destroy(to_destroy); this->mutex->lock(this->mutex); } } /** * Process queued jobs, called by the worker threads */ static void process_jobs(worker_thread_t *worker) { private_processor_t *this = worker->processor; /* worker threads are not cancelable by default */ thread_cancelability(FALSE); DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id()); this->mutex->lock(this->mutex); while (this->desired_threads >= this->total_threads) { if (get_job(this, worker)) { process_job(this, worker); } else { this->job_added->wait(this->job_added, this->mutex); } } this->total_threads--; this->thread_terminated->signal(this->thread_terminated); this->mutex->unlock(this->mutex); } METHOD(processor_t, get_total_threads, u_int, private_processor_t *this) { u_int count; this->mutex->lock(this->mutex); count = this->total_threads; this->mutex->unlock(this->mutex); return count; } METHOD(processor_t, get_idle_threads, u_int, private_processor_t *this) { u_int count; this->mutex->lock(this->mutex); count = get_idle_threads_nolock(this); this->mutex->unlock(this->mutex); return count; } /** * Check priority bounds */ static job_priority_t sane_prio(job_priority_t prio) { if ((int)prio < 0 || prio >= JOB_PRIO_MAX) { return JOB_PRIO_MAX - 1; } return prio; } METHOD(processor_t, get_working_threads, u_int, private_processor_t *this, job_priority_t prio) { u_int count; this->mutex->lock(this->mutex); count = this->working_threads[sane_prio(prio)]; this->mutex->unlock(this->mutex); return count; } METHOD(processor_t, get_job_load, u_int, private_processor_t *this, job_priority_t prio) { u_int load; prio = sane_prio(prio); this->mutex->lock(this->mutex); load = this->jobs[prio]->get_count(this->jobs[prio]); this->mutex->unlock(this->mutex); return load; } METHOD(processor_t, queue_job, void, private_processor_t *this, job_t *job) { job_priority_t prio; prio = sane_prio(job->get_priority(job)); job->status = JOB_STATUS_QUEUED; this->mutex->lock(this->mutex); this->jobs[prio]->insert_last(this->jobs[prio], job); this->job_added->signal(this->job_added); this->mutex->unlock(this->mutex); } METHOD(processor_t, execute_job, void, private_processor_t *this, job_t *job) { job_priority_t prio; bool queued = FALSE; this->mutex->lock(this->mutex); if (this->desired_threads && get_idle_threads_nolock(this)) { prio = sane_prio(job->get_priority(job)); job->status = JOB_STATUS_QUEUED; /* insert job in front to execute it immediately */ this->jobs[prio]->insert_first(this->jobs[prio], job); queued = TRUE; } this->job_added->signal(this->job_added); this->mutex->unlock(this->mutex); if (!queued) { job->execute(job); job->destroy(job); } } METHOD(processor_t, set_threads, void, private_processor_t *this, u_int count) { int i; this->mutex->lock(this->mutex); for (i = 0; i < JOB_PRIO_MAX; i++) { this->prio_threads[i] = lib->settings->get_int(lib->settings, "%s.processor.priority_threads.%N", 0, lib->ns, job_priority_names, i); } if (count > this->total_threads) { /* increase thread count */ worker_thread_t *worker; int i; this->desired_threads = count; DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads); for (i = this->total_threads; i < count; i++) { INIT(worker, .processor = this, ); worker->thread = thread_create((thread_main_t)process_jobs, worker); if (worker->thread) { this->threads->insert_last(this->threads, worker); this->total_threads++; } else { free(worker); } } } else if (count < this->total_threads) { /* decrease thread count */ this->desired_threads = count; } this->job_added->broadcast(this->job_added); this->mutex->unlock(this->mutex); } METHOD(processor_t, cancel, void, private_processor_t *this) { enumerator_t *enumerator; worker_thread_t *worker; job_t *job; int i; this->mutex->lock(this->mutex); this->desired_threads = 0; /* cancel potentially blocking jobs */ enumerator = this->threads->create_enumerator(this->threads); while (enumerator->enumerate(enumerator, (void**)&worker)) { if (worker->job && worker->job->cancel) { worker->job->status = JOB_STATUS_CANCELED; if (!worker->job->cancel(worker->job)) { /* job requests to be canceled explicitly, otherwise we assume * the thread terminates itself and can be joined */ worker->thread->cancel(worker->thread); } } } enumerator->destroy(enumerator); while (this->total_threads > 0) { this->job_added->broadcast(this->job_added); this->thread_terminated->wait(this->thread_terminated, this->mutex); } while (this->threads->remove_first(this->threads, (void**)&worker) == SUCCESS) { worker->thread->join(worker->thread); free(worker); } for (i = 0; i < JOB_PRIO_MAX; i++) { while (this->jobs[i]->remove_first(this->jobs[i], (void**)&job) == SUCCESS) { job->destroy(job); } } this->mutex->unlock(this->mutex); } METHOD(processor_t, destroy, void, private_processor_t *this) { int i; cancel(this); this->thread_terminated->destroy(this->thread_terminated); this->job_added->destroy(this->job_added); this->mutex->destroy(this->mutex); for (i = 0; i < JOB_PRIO_MAX; i++) { this->jobs[i]->destroy(this->jobs[i]); } this->threads->destroy(this->threads); free(this); } /* * Described in header. */ processor_t *processor_create() { private_processor_t *this; int i; INIT(this, .public = { .get_total_threads = _get_total_threads, .get_idle_threads = _get_idle_threads, .get_working_threads = _get_working_threads, .get_job_load = _get_job_load, .queue_job = _queue_job, .execute_job = _execute_job, .set_threads = _set_threads, .cancel = _cancel, .destroy = _destroy, }, .threads = linked_list_create(), .mutex = mutex_create(MUTEX_TYPE_DEFAULT), .job_added = condvar_create(CONDVAR_TYPE_DEFAULT), .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT), ); for (i = 0; i < JOB_PRIO_MAX; i++) { this->jobs[i] = linked_list_create(); } return &this->public; }