/*
 * Copyright (C) 2005-2011 Martin Willi
 * Copyright (C) 2011 revosec AG
 * Copyright (C) 2008-2013 Tobias Brunner
 * Copyright (C) 2005 Jan Hutter
 * HSR Hochschule fuer Technik Rapperswil
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * for more details.
 */

#include <stdlib.h>
#include <string.h>
#include <errno.h>

#include "processor.h"

#include <utils/debug.h>
#include <threading/thread.h>
#include <threading/condvar.h>
#include <threading/mutex.h>
#include <threading/thread_value.h>
#include <collections/linked_list.h>

typedef struct private_processor_t private_processor_t;

/**
 * Private data of processor_t class.
 */
struct private_processor_t {

	/**
	 * Public processor_t interface.
	 */
	processor_t public;

	/**
	 * Number of running threads
	 */
	u_int total_threads;

	/**
	 * Desired number of threads
	 */
	u_int desired_threads;

	/**
	 * Number of threads currently working, for each priority
	 */
	u_int working_threads[JOB_PRIO_MAX];

	/**
	 * All threads managed in the pool (including threads that have been
	 * canceled, so they can be joined later), as worker_thread_t
	 */
	linked_list_t *threads;

	/**
	 * A list of queued jobs for each priority
	 */
	linked_list_t *jobs[JOB_PRIO_MAX];

	/**
	 * Threads reserved for each priority
	 */
	int prio_threads[JOB_PRIO_MAX];

	/**
	 * Access to job lists is locked through this mutex
	 */
	mutex_t *mutex;

	/**
	 * Condvar to wait for new jobs
	 */
	condvar_t *job_added;

	/**
	 * Condvar to wait for terminated threads
	 */
	condvar_t *thread_terminated;
};

/**
 * Worker thread
 */
typedef struct {

	/**
	 * Reference to the processor
	 */
	private_processor_t *processor;

	/**
	 * The actual thread
	 */
	thread_t *thread;

	/**
	 * Job currently being executed by this worker thread
	 */
	job_t *job;

	/**
	 * Priority of the current job
	 */
	job_priority_t priority;

} worker_thread_t;
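
/* Note: the mutable state of both structs above is only modified while
 * holding the processor's mutex; in particular, a worker's job pointer is
 * reset to NULL under the mutex before a job gets destroyed, so that
 * cancel() never operates on a job that is about to be freed. */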

static void process_jobs(worker_thread_t *worker);

/**
 * Restart a terminated worker thread (runs as thread cleanup handler when a
 * worker gets canceled while executing a job: the job is destroyed and, if
 * required, a replacement worker is spawned to keep the pool size constant).
 */
static void restart(worker_thread_t *worker)
{
	private_processor_t *this = worker->processor;
	job_t *job;

	DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());

	this->mutex->lock(this->mutex);
	/* cleanup worker thread */
	this->working_threads[worker->priority]--;
	worker->job->status = JOB_STATUS_CANCELED;
	job = worker->job;
	/* unset the job before releasing the mutex, otherwise cancel() might
	 * interfere */
	worker->job = NULL;
	/* release mutex to avoid deadlocks if the same lock is required
	 * during queue_job() and in the destructor called here */
	this->mutex->unlock(this->mutex);
	job->destroy(job);
	this->mutex->lock(this->mutex);

	/* respawn thread if required */
	if (this->desired_threads >= this->total_threads)
	{
		worker_thread_t *new_worker;

		INIT(new_worker,
			.processor = this,
		);
		new_worker->thread = thread_create((thread_main_t)process_jobs,
										   new_worker);
		if (new_worker->thread)
		{
			this->threads->insert_last(this->threads, new_worker);
			this->mutex->unlock(this->mutex);
			return;
		}
		free(new_worker);
	}
	this->total_threads--;
	this->thread_terminated->signal(this->thread_terminated);
	this->mutex->unlock(this->mutex);
}

/**
 * Get number of idle threads, non-locking variant
 */
static u_int get_idle_threads_nolock(private_processor_t *this)
{
	u_int count, i;

	count = this->total_threads;
	for (i = 0; i < JOB_PRIO_MAX; i++)
	{
		count -= this->working_threads[i];
	}
	return count;
}

/**
 * Get a job from any job queue, starting with the highest priority.
 *
 * this->mutex is expected to be locked.
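 *
 * For illustration (hypothetical numbers): with prio_threads[JOB_PRIO_HIGH]
 * set to 2, one thread busy with a HIGH job, an empty HIGH queue and a single
 * idle thread, that idle thread stays reserved for the next HIGH job, so
 * queued MEDIUM/LOW jobs get delayed until more threads become idle.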
 */
static bool get_job(private_processor_t *this, worker_thread_t *worker)
{
	int i, reserved = 0, idle;

	idle = get_idle_threads_nolock(this);

	for (i = 0; i < JOB_PRIO_MAX; i++)
	{
		if (reserved && reserved >= idle)
		{
			DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
				 "but %d reserved for higher priorities",
				 job_priority_names, i, idle, reserved);
			/* wait until a job of higher priority gets queued */
			return FALSE;
		}
		if (this->working_threads[i] < this->prio_threads[i])
		{
			reserved += this->prio_threads[i] - this->working_threads[i];
		}
		if (this->jobs[i]->remove_first(this->jobs[i],
										(void**)&worker->job) == SUCCESS)
		{
			worker->priority = i;
			return TRUE;
		}
	}
	return FALSE;
}

/**
 * Process a single job (provided in worker->job, worker->priority is also
 * expected to be set)
 *
 * this->mutex is expected to be locked.
 */
static void process_job(private_processor_t *this, worker_thread_t *worker)
{
	job_t *to_destroy = NULL;
	job_requeue_t requeue;

	this->working_threads[worker->priority]++;
	worker->job->status = JOB_STATUS_EXECUTING;
	this->mutex->unlock(this->mutex);
	/* canceled threads are restarted to get a constant pool */
	thread_cleanup_push((thread_cleanup_t)restart, worker);
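	/* JOB_REQUEUE_TYPE_DIRECT makes a job run again immediately in this
	 * thread without being requeued; the loop below only allows this for
	 * cancelable jobs and downgrades all others to fair requeueing */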
	while (TRUE)
	{
		requeue = worker->job->execute(worker->job);
		if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
		{
			break;
		}
		else if (!worker->job->cancel)
		{	/* only allow cancelable jobs to requeue directly */
			requeue.type = JOB_REQUEUE_TYPE_FAIR;
			break;
		}
	}
	thread_cleanup_pop(FALSE);
	this->mutex->lock(this->mutex);
	this->working_threads[worker->priority]--;
	if (worker->job->status == JOB_STATUS_CANCELED)
	{	/* job was canceled via a custom cancel() method or did not
		 * use JOB_REQUEUE_TYPE_DIRECT */
		to_destroy = worker->job;
	}
	else
	{
		switch (requeue.type)
		{
			case JOB_REQUEUE_TYPE_NONE:
				worker->job->status = JOB_STATUS_DONE;
				to_destroy = worker->job;
				break;
			case JOB_REQUEUE_TYPE_FAIR:
				worker->job->status = JOB_STATUS_QUEUED;
				this->jobs[worker->priority]->insert_last(
									this->jobs[worker->priority], worker->job);
				this->job_added->signal(this->job_added);
				break;
			case JOB_REQUEUE_TYPE_SCHEDULE:
				/* scheduler_t does not hold its lock when queuing jobs
				 * so this should be safe without unlocking our mutex */
				switch (requeue.schedule)
				{
					case JOB_SCHEDULE:
						lib->scheduler->schedule_job(lib->scheduler,
												worker->job, requeue.time.rel);
						break;
					case JOB_SCHEDULE_MS:
						lib->scheduler->schedule_job_ms(lib->scheduler,
												worker->job, requeue.time.rel);
						break;
					case JOB_SCHEDULE_TV:
						lib->scheduler->schedule_job_tv(lib->scheduler,
												worker->job, requeue.time.abs);
						break;
				}
				break;
			default:
				break;
		}
	}
	/* unset the current job to avoid interference with cancel() when
	 * destroying the job below */
	worker->job = NULL;

	if (to_destroy)
	{	/* release mutex to avoid deadlocks if the same lock is required
		 * during queue_job() and in the destructor called here */
		this->mutex->unlock(this->mutex);
		to_destroy->destroy(to_destroy);
		this->mutex->lock(this->mutex);
	}
}

/**
 * Process queued jobs, called by the worker threads
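 *
 * Each worker loops while desired_threads >= total_threads, either executing
 * a job or waiting on job_added; once the desired count drops (set_threads()
 * with a lower count, or cancel()), the worker exits, decrements
 * total_threads and signals thread_terminated.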
 */
static void process_jobs(worker_thread_t *worker)
{
	private_processor_t *this = worker->processor;

	/* worker threads are not cancelable by default */
	thread_cancelability(FALSE);

	DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());

	this->mutex->lock(this->mutex);
	while (this->desired_threads >= this->total_threads)
	{
		if (get_job(this, worker))
		{
			process_job(this, worker);
		}
		else
		{
			this->job_added->wait(this->job_added, this->mutex);
		}
	}
	this->total_threads--;
	this->thread_terminated->signal(this->thread_terminated);
	this->mutex->unlock(this->mutex);
}

METHOD(processor_t, get_total_threads, u_int,
	private_processor_t *this)
{
	u_int count;

	this->mutex->lock(this->mutex);
	count = this->total_threads;
	this->mutex->unlock(this->mutex);
	return count;
}

METHOD(processor_t, get_idle_threads, u_int,
	private_processor_t *this)
{
	u_int count;

	this->mutex->lock(this->mutex);
	count = get_idle_threads_nolock(this);
	this->mutex->unlock(this->mutex);
	return count;
}

/**
 * Check priority bounds
 */
static job_priority_t sane_prio(job_priority_t prio)
{
	if ((int)prio < 0 || prio >= JOB_PRIO_MAX)
	{
		return JOB_PRIO_MAX - 1;
	}
	return prio;
}

METHOD(processor_t, get_working_threads, u_int,
	private_processor_t *this, job_priority_t prio)
{
	u_int count;

	this->mutex->lock(this->mutex);
	count = this->working_threads[sane_prio(prio)];
	this->mutex->unlock(this->mutex);
	return count;
}

METHOD(processor_t, get_job_load, u_int,
	private_processor_t *this, job_priority_t prio)
{
	u_int load;

	prio = sane_prio(prio);
	this->mutex->lock(this->mutex);
	load = this->jobs[prio]->get_count(this->jobs[prio]);
	this->mutex->unlock(this->mutex);
	return load;
}

METHOD(processor_t, queue_job, void,
	private_processor_t *this, job_t *job)
{
	job_priority_t prio;

	prio = sane_prio(job->get_priority(job));
	job->status = JOB_STATUS_QUEUED;

	this->mutex->lock(this->mutex);
	this->jobs[prio]->insert_last(this->jobs[prio], job);
	this->job_added->signal(this->job_added);
	this->mutex->unlock(this->mutex);
}

METHOD(processor_t, execute_job, void,
	private_processor_t *this, job_t *job)
{
	job_priority_t prio;
	bool queued = FALSE;

	this->mutex->lock(this->mutex);
	if (this->desired_threads && get_idle_threads_nolock(this))
	{
		prio = sane_prio(job->get_priority(job));
		job->status = JOB_STATUS_QUEUED;
		/* insert job in front to execute it immediately */
		this->jobs[prio]->insert_first(this->jobs[prio], job);
		queued = TRUE;
	}
	this->job_added->signal(this->job_added);
	this->mutex->unlock(this->mutex);

	if (!queued)
	{
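		/* no idle worker is available (or the pool has been shut down), so
		 * execute and destroy the job right here in the calling thread */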
		job->execute(job);
		job->destroy(job);
	}
}

METHOD(processor_t, set_threads, void,
	private_processor_t *this, u_int count)
{
	int i;

	this->mutex->lock(this->mutex);
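	/* refresh the reserved thread counts from strongswan.conf; the setting
	 * key pattern is %s.processor.priority_threads.%N, e.g.
	 * charon.processor.priority_threads.high (assuming the usual "charon"
	 * namespace in lib->ns) */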
	for (i = 0; i < JOB_PRIO_MAX; i++)
	{
		this->prio_threads[i] = lib->settings->get_int(lib->settings,
						"%s.processor.priority_threads.%N", 0, lib->ns,
						job_priority_names, i);
	}
	if (count > this->total_threads)
	{	/* increase thread count */
		worker_thread_t *worker;

		this->desired_threads = count;
		DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
		for (i = this->total_threads; i < count; i++)
		{
			INIT(worker,
				.processor = this,
			);
			worker->thread = thread_create((thread_main_t)process_jobs, worker);
			if (worker->thread)
			{
				this->threads->insert_last(this->threads, worker);
				this->total_threads++;
			}
			else
			{
				free(worker);
			}
		}
	}
	else if (count < this->total_threads)
	{	/* decrease thread count */
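		/* running workers are not interrupted; each worker re-checks
		 * desired_threads after finishing its current job (or when woken by
		 * the broadcast below) and then terminates on its own */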
		this->desired_threads = count;
	}
	this->job_added->broadcast(this->job_added);
	this->mutex->unlock(this->mutex);
}

METHOD(processor_t, cancel, void,
	private_processor_t *this)
{
	enumerator_t *enumerator;
	worker_thread_t *worker;
	job_t *job;
	int i;

	this->mutex->lock(this->mutex);
	this->desired_threads = 0;
	/* cancel potentially blocking jobs */
	enumerator = this->threads->create_enumerator(this->threads);
	while (enumerator->enumerate(enumerator, (void**)&worker))
	{
		if (worker->job && worker->job->cancel)
		{
			worker->job->status = JOB_STATUS_CANCELED;
			if (!worker->job->cancel(worker->job))
			{	/* job requests to be canceled explicitly, otherwise we assume
				 * the thread terminates itself and can be joined */
				worker->thread->cancel(worker->thread);
			}
		}
	}
	enumerator->destroy(enumerator);
	while (this->total_threads > 0)
	{
		this->job_added->broadcast(this->job_added);
		this->thread_terminated->wait(this->thread_terminated, this->mutex);
	}
	while (this->threads->remove_first(this->threads,
									   (void**)&worker) == SUCCESS)
	{
		worker->thread->join(worker->thread);
		free(worker);
	}
	for (i = 0; i < JOB_PRIO_MAX; i++)
	{
		while (this->jobs[i]->remove_first(this->jobs[i],
										   (void**)&job) == SUCCESS)
		{
			job->destroy(job);
		}
	}
	this->mutex->unlock(this->mutex);
}

METHOD(processor_t, destroy, void,
	private_processor_t *this)
{
	int i;

	cancel(this);
	this->thread_terminated->destroy(this->thread_terminated);
	this->job_added->destroy(this->job_added);
	this->mutex->destroy(this->mutex);
	for (i = 0; i < JOB_PRIO_MAX; i++)
	{
		this->jobs[i]->destroy(this->jobs[i]);
	}
	this->threads->destroy(this->threads);
	free(this);
}

/*
 * Described in header.
 */
processor_t *processor_create()
{
	private_processor_t *this;
	int i;

	INIT(this,
		.public = {
			.get_total_threads = _get_total_threads,
			.get_idle_threads = _get_idle_threads,
			.get_working_threads = _get_working_threads,
			.get_job_load = _get_job_load,
			.queue_job = _queue_job,
			.execute_job = _execute_job,
			.set_threads = _set_threads,
			.cancel = _cancel,
			.destroy = _destroy,
		},
		.threads = linked_list_create(),
		.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
		.job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
		.thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
	);

	for (i = 0; i < JOB_PRIO_MAX; i++)
	{
		this->jobs[i] = linked_list_create();
	}
	return &this->public;
}
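
/*
 * Typical usage (illustrative sketch only, not part of the original sources):
 * the processor is normally reached through lib->processor, and jobs are any
 * job_t implementation (for instance a callback_job_t).
 *
 *   processor_t *processor = processor_create();
 *   processor->set_threads(processor, 16);   // spawn 16 worker threads
 *   processor->queue_job(processor, job);    // hand over ownership of a job_t
 *   ...
 *   processor->cancel(processor);            // cancel jobs, join all workers
 *   processor->destroy(processor);
 */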