Annotation of embedaddon/strongswan/src/libstrongswan/processing/scheduler.c, revision 1.1.1.1
1.1 misho 1: /*
2: * Copyright (C) 2008-2015 Tobias Brunner
3: * Copyright (C) 2005-2006 Martin Willi
4: * Copyright (C) 2005 Jan Hutter
5: * HSR Hochschule fuer Technik Rapperswil
6: *
7: * This program is free software; you can redistribute it and/or modify it
8: * under the terms of the GNU General Public License as published by the
9: * Free Software Foundation; either version 2 of the License, or (at your
10: * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
11: *
12: * This program is distributed in the hope that it will be useful, but
13: * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15: * for more details.
16: */
17:
18: #include <stdlib.h>
19:
20: #include "scheduler.h"
21:
22: #include <utils/debug.h>
23: #include <processing/processor.h>
24: #include <processing/jobs/callback_job.h>
25: #include <threading/thread.h>
26: #include <threading/condvar.h>
27: #include <threading/mutex.h>
28:
29: /* the initial size of the heap */
30: #define HEAP_SIZE_DEFAULT 64
31:
32: typedef struct event_t event_t;
33:
34: /**
35: * Event containing a job and a schedule time
36: */
37: struct event_t {
38: /**
39: * Time to fire the event.
40: */
41: timeval_t time;
42:
43: /**
44: * Every event has its assigned job.
45: */
46: job_t *job;
47: };
48:
49: /**
50: * destroy an event and its job
51: */
52: static void event_destroy(event_t *event)
53: {
54: event->job->destroy(event->job);
55: free(event);
56: }
57:
58: typedef struct private_scheduler_t private_scheduler_t;
59:
60: /**
61: * Private data of a scheduler_t object.
62: */
63: struct private_scheduler_t {
64:
65: /**
66: * Public part of a scheduler_t object.
67: */
68: scheduler_t public;
69:
70: /**
71: * The heap in which the events are stored.
72: */
73: event_t **heap;
74:
75: /**
76: * The size of the heap.
77: */
78: u_int heap_size;
79:
80: /**
81: * The number of scheduled events.
82: */
83: u_int event_count;
84:
85: /**
86: * Exclusive access to list
87: */
88: mutex_t *mutex;
89:
90: /**
91: * Condvar to wait for next job.
92: */
93: condvar_t *condvar;
94: };
95:
96: /**
97: * Compares two timevals, return >0 if a > b, <0 if a < b and =0 if equal
98: */
99: static int timeval_cmp(timeval_t *a, timeval_t *b)
100: {
101: if (a->tv_sec > b->tv_sec)
102: {
103: return 1;
104: }
105: if (a->tv_sec < b->tv_sec)
106: {
107: return -1;
108: }
109: if (a->tv_usec > b->tv_usec)
110: {
111: return 1;
112: }
113: if (a->tv_usec < b->tv_usec)
114: {
115: return -1;
116: }
117: return 0;
118: }
119:
120: /**
121: * Returns the top event without removing it. Returns NULL if the heap is empty.
122: */
123: static event_t *peek_event(private_scheduler_t *this)
124: {
125: return this->event_count > 0 ? this->heap[1] : NULL;
126: }
127:
128: /**
129: * Removes the top event from the heap and returns it. Returns NULL if the heap
130: * is empty.
131: */
132: static event_t *remove_event(private_scheduler_t *this)
133: {
134: event_t *event, *top;
135: if (!this->event_count)
136: {
137: return NULL;
138: }
139:
140: /* store the value to return */
141: event = this->heap[1];
142: /* move the bottom event to the top */
143: top = this->heap[1] = this->heap[this->event_count];
144:
145: if (--this->event_count > 1)
146: {
147: /* seep down the top event */
148: u_int position = 1;
149: while ((position << 1) <= this->event_count)
150: {
151: u_int child = position << 1;
152:
153: if ((child + 1) <= this->event_count &&
154: timeval_cmp(&this->heap[child + 1]->time,
155: &this->heap[child]->time) < 0)
156: {
157: /* the "right" child is smaller */
158: child++;
159: }
160:
161: if (timeval_cmp(&top->time, &this->heap[child]->time) <= 0)
162: {
163: /* the top event fires before the smaller of the two children,
164: * stop */
165: break;
166: }
167:
168: /* swap with the smaller child */
169: this->heap[position] = this->heap[child];
170: position = child;
171: }
172: this->heap[position] = top;
173: }
174: return event;
175: }
176:
177: /**
178: * Get events from the queue and pass it to the processor
179: */
180: static job_requeue_t schedule(private_scheduler_t * this)
181: {
182: timeval_t now;
183: event_t *event;
184: bool timed = FALSE, oldstate;
185:
186: this->mutex->lock(this->mutex);
187:
188: time_monotonic(&now);
189:
190: if ((event = peek_event(this)) != NULL)
191: {
192: if (timeval_cmp(&now, &event->time) >= 0)
193: {
194: remove_event(this);
195: this->mutex->unlock(this->mutex);
196: DBG2(DBG_JOB, "got event, queuing job for execution");
197: lib->processor->queue_job(lib->processor, event->job);
198: free(event);
199: return JOB_REQUEUE_DIRECT;
200: }
201: timersub(&event->time, &now, &now);
202: if (now.tv_sec)
203: {
204: DBG2(DBG_JOB, "next event in %ds %dms, waiting",
205: now.tv_sec, now.tv_usec/1000);
206: }
207: else
208: {
209: DBG2(DBG_JOB, "next event in %dms, waiting", now.tv_usec/1000);
210: }
211: timed = TRUE;
212: }
213: thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
214: oldstate = thread_cancelability(TRUE);
215:
216: if (timed)
217: {
218: this->condvar->timed_wait_abs(this->condvar, this->mutex, event->time);
219: }
220: else
221: {
222: DBG2(DBG_JOB, "no events, waiting");
223: this->condvar->wait(this->condvar, this->mutex);
224: }
225: thread_cancelability(oldstate);
226: thread_cleanup_pop(TRUE);
227: return JOB_REQUEUE_DIRECT;
228: }
229:
230: METHOD(scheduler_t, get_job_load, u_int,
231: private_scheduler_t *this)
232: {
233: int count;
234: this->mutex->lock(this->mutex);
235: count = this->event_count;
236: this->mutex->unlock(this->mutex);
237: return count;
238: }
239:
240: METHOD(scheduler_t, schedule_job_tv, void,
241: private_scheduler_t *this, job_t *job, timeval_t tv)
242: {
243: event_t *event;
244: u_int position;
245:
246: event = malloc_thing(event_t);
247: event->job = job;
248: event->job->status = JOB_STATUS_QUEUED;
249: event->time = tv;
250:
251: this->mutex->lock(this->mutex);
252:
253: this->event_count++;
254: if (this->event_count > this->heap_size)
255: {
256: /* double the size of the heap */
257: this->heap_size <<= 1;
258: this->heap = (event_t**)realloc(this->heap,
259: (this->heap_size + 1) * sizeof(event_t*));
260: }
261: /* "put" the event to the bottom */
262: position = this->event_count;
263:
264: /* then bubble it up */
265: while (position > 1 && timeval_cmp(&this->heap[position >> 1]->time,
266: &event->time) > 0)
267: {
268: /* parent has to be fired after the new event, move up */
269: this->heap[position] = this->heap[position >> 1];
270: position >>= 1;
271: }
272: this->heap[position] = event;
273:
274: this->condvar->signal(this->condvar);
275: this->mutex->unlock(this->mutex);
276: }
277:
278: METHOD(scheduler_t, schedule_job, void,
279: private_scheduler_t *this, job_t *job, uint32_t s)
280: {
281: timeval_t tv;
282:
283: time_monotonic(&tv);
284: tv.tv_sec += s;
285:
286: schedule_job_tv(this, job, tv);
287: }
288:
289: METHOD(scheduler_t, schedule_job_ms, void,
290: private_scheduler_t *this, job_t *job, uint32_t ms)
291: {
292: timeval_t tv, add;
293:
294: time_monotonic(&tv);
295: add.tv_sec = ms / 1000;
296: add.tv_usec = (ms % 1000) * 1000;
297:
298: timeradd(&tv, &add, &tv);
299:
300: schedule_job_tv(this, job, tv);
301: }
302:
303: METHOD(scheduler_t, flush, void,
304: private_scheduler_t *this)
305: {
306: event_t *event;
307:
308: this->mutex->lock(this->mutex);
309: while ((event = remove_event(this)) != NULL)
310: {
311: event_destroy(event);
312: }
313: this->condvar->signal(this->condvar);
314: this->mutex->unlock(this->mutex);
315: }
316:
317: METHOD(scheduler_t, destroy, void,
318: private_scheduler_t *this)
319: {
320: flush(this);
321: this->condvar->destroy(this->condvar);
322: this->mutex->destroy(this->mutex);
323: free(this->heap);
324: free(this);
325: }
326:
327: /*
328: * Described in header.
329: */
330: scheduler_t * scheduler_create()
331: {
332: private_scheduler_t *this;
333: callback_job_t *job;
334:
335: INIT(this,
336: .public = {
337: .get_job_load = _get_job_load,
338: .schedule_job = _schedule_job,
339: .schedule_job_ms = _schedule_job_ms,
340: .schedule_job_tv = _schedule_job_tv,
341: .flush = _flush,
342: .destroy = _destroy,
343: },
344: .heap_size = HEAP_SIZE_DEFAULT,
345: .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
346: .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
347: );
348:
349: this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
350:
351: job = callback_job_create_with_prio((callback_job_cb_t)schedule, this,
352: NULL, return_false, JOB_PRIO_CRITICAL);
353: lib->processor->queue_job(lib->processor, (job_t*)job);
354:
355: return &this->public;
356: }
357:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>