Annotation of libaitsched/src/tasks.c, revision 1.20.2.1
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.20.2.1! misho 6: * $Id: tasks.c,v 1.20 2013/08/26 13:36:45 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.16 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
1.13 misho 49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
1.16 misho 55: sched_task_t *
1.13 misho 56: sched_useTask(sched_root_task_t * __restrict root)
1.4 misho 57: {
1.7 misho 58: sched_task_t *task, *tmp;
1.4 misho 59:
1.14 misho 60: #ifdef HAVE_LIBPTHREAD
61: pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
62: #endif
1.7 misho 63: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
1.4 misho 64: if (!TASK_ISLOCKED(task)) {
65: TAILQ_REMOVE(&root->root_unuse, task, task_node);
66: break;
67: }
68: }
1.14 misho 69: #ifdef HAVE_LIBPTHREAD
70: pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
71: #endif
1.4 misho 72:
73: if (!task) {
74: task = malloc(sizeof(sched_task_t));
75: if (!task) {
76: LOGERR;
77: return NULL;
78: }
79: }
80:
1.9 misho 81: memset(task, 0, sizeof(sched_task_t));
82: task->task_id = (uintptr_t) task;
1.4 misho 83: return task;
84: }
85:
1.13 misho 86: /*
87: * sched_unuseTask() - Unlock and put task to unuse queue
88: *
89: * @task = task
90: * return: always is NULL
91: */
1.16 misho 92: sched_task_t *
1.13 misho 93: sched_unuseTask(sched_task_t * __restrict task)
1.4 misho 94: {
95: TASK_UNLOCK(task);
1.5 misho 96: TASK_TYPE(task) = taskUNUSE;
97: #ifdef HAVE_LIBPTHREAD
98: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
99: #endif
1.9 misho 100: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
1.5 misho 101: #ifdef HAVE_LIBPTHREAD
102: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
103: #endif
1.4 misho 104: task = NULL;
105:
106: return task;
107: }
108:
1.14 misho 109: #pragma GCC visibility push(hidden)
110:
111: #ifdef HAVE_LIBPTHREAD
112: static void
113: _sched_threadCleanup(sched_task_t *t)
114: {
115: if (!t || !TASK_ROOT(t))
116: return;
117:
118: if (TASK_FLAG(t) == PTHREAD_CREATE_JOINABLE)
119: pthread_detach(pthread_self());
120: }
121: void *
122: _sched_threadWrapper(sched_task_t *t)
123: {
124: void *ret = NULL;
1.15 misho 125: sem_t *s = NULL;
1.14 misho 126:
1.15 misho 127: if (!t || !TASK_ROOT(t) || !TASK_RET(t))
1.14 misho 128: pthread_exit(ret);
1.20.2.1! misho 129: else
1.15 misho 130: s = (sem_t*) TASK_RET(t);
1.14 misho 131:
132: pthread_cleanup_push((void (*)(void*)) _sched_threadCleanup, t);
133:
134: pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
135: pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
1.15 misho 136:
1.20.2.1! misho 137: pthread_mutex_lock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
! 138: TAILQ_REMOVE(&TASK_ROOT(t)->root_thread, t, task_node);
! 139: pthread_mutex_unlock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
! 140: sched_unuseTask(t);
1.20 misho 141:
1.15 misho 142: /* notify parent, thread is ready for execution */
143: sem_post(s);
1.14 misho 144: pthread_testcancel();
145:
146: ret = TASK_FUNC(t)(t);
147:
148: pthread_cleanup_pop(42);
149: TASK_ROOT(t)->root_ret = ret;
150: pthread_exit(ret);
151: }
152: #endif
153:
1.19 misho 154: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
155: void *
156: _sched_rtcWrapper(sched_task_t *t)
157: {
158: sched_task_func_t func;
159: sched_task_t *task;
1.20 misho 160: sched_root_task_t *r;
1.19 misho 161:
162: if (!t || !TASK_ROOT(t) || !TASK_DATA(t))
163: return NULL;
164: else {
1.20 misho 165: r = TASK_ROOT(t);
1.19 misho 166: task = (sched_task_t*) TASK_DATA(t);
167: func = TASK_FUNC(task);
168: }
169:
1.20 misho 170: #ifdef HAVE_LIBPTHREAD
171: pthread_mutex_lock(&r->root_mtx[taskRTC]);
172: #endif
173: TAILQ_REMOVE(&r->root_rtc, task, task_node);
174: #ifdef HAVE_LIBPTHREAD
175: pthread_mutex_unlock(&r->root_mtx[taskRTC]);
176: #endif
1.20.2.1! misho 177: sched_unuseTask(task);
1.20 misho 178:
1.19 misho 179: timer_delete((timer_t) TASK_DATLEN(t));
1.20.2.1! misho 180:
! 181: return func(task);
1.19 misho 182: }
183: #endif
184:
1.14 misho 185: #pragma GCC visibility pop
186:
187: /*
188: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
189: *
190: * @task = current task
191: * @retcode = return code
192: * return: return code
193: */
1.16 misho 194: void *
1.14 misho 195: sched_taskExit(sched_task_t *task, intptr_t retcode)
196: {
197: if (!task || !TASK_ROOT(task))
198: return (void*) -1;
199:
200: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
201: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
202:
203: TASK_ROOT(task)->root_ret = (void*) retcode;
204: return (void*) retcode;
205: }
206:
1.4 misho 207:
1.1 misho 208: /*
1.2 misho 209: * schedRead() - Add READ I/O task to scheduler queue
1.6 misho 210: *
1.1 misho 211: * @root = root task
212: * @func = task execution function
213: * @arg = 1st func argument
1.2 misho 214: * @fd = fd handle
1.5 misho 215: * @opt_data = Optional data
216: * @opt_dlen = Optional data length
1.1 misho 217: * return: NULL error or !=NULL new queued task
218: */
219: sched_task_t *
1.5 misho 220: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
221: void *opt_data, size_t opt_dlen)
1.1 misho 222: {
223: sched_task_t *task;
224: void *ptr;
225:
226: if (!root || !func)
227: return NULL;
228:
229: /* get new task */
1.13 misho 230: if (!(task = sched_useTask(root)))
1.4 misho 231: return NULL;
1.1 misho 232:
233: task->task_func = func;
1.5 misho 234: TASK_TYPE(task) = taskREAD;
235: TASK_ROOT(task) = root;
1.1 misho 236:
237: TASK_ARG(task) = arg;
1.2 misho 238: TASK_FD(task) = fd;
1.1 misho 239:
1.5 misho 240: TASK_DATA(task) = opt_data;
241: TASK_DATLEN(task) = opt_dlen;
242:
1.1 misho 243: if (root->root_hooks.hook_add.read)
244: ptr = root->root_hooks.hook_add.read(task, NULL);
245: else
246: ptr = NULL;
247:
1.5 misho 248: if (!ptr) {
249: #ifdef HAVE_LIBPTHREAD
250: pthread_mutex_lock(&root->root_mtx[taskREAD]);
251: #endif
1.9 misho 252: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
1.5 misho 253: #ifdef HAVE_LIBPTHREAD
254: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
255: #endif
256: } else
1.13 misho 257: task = sched_unuseTask(task);
1.1 misho 258:
259: return task;
260: }
261:
262: /*
1.2 misho 263: * schedWrite() - Add WRITE I/O task to scheduler queue
1.6 misho 264: *
1.1 misho 265: * @root = root task
266: * @func = task execution function
267: * @arg = 1st func argument
1.2 misho 268: * @fd = fd handle
1.5 misho 269: * @opt_data = Optional data
270: * @opt_dlen = Optional data length
1.1 misho 271: * return: NULL error or !=NULL new queued task
272: */
273: sched_task_t *
1.5 misho 274: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
275: void *opt_data, size_t opt_dlen)
1.1 misho 276: {
277: sched_task_t *task;
278: void *ptr;
279:
280: if (!root || !func)
281: return NULL;
282:
283: /* get new task */
1.13 misho 284: if (!(task = sched_useTask(root)))
1.4 misho 285: return NULL;
1.1 misho 286:
287: task->task_func = func;
1.5 misho 288: TASK_TYPE(task) = taskWRITE;
289: TASK_ROOT(task) = root;
1.1 misho 290:
291: TASK_ARG(task) = arg;
1.2 misho 292: TASK_FD(task) = fd;
1.1 misho 293:
1.5 misho 294: TASK_DATA(task) = opt_data;
295: TASK_DATLEN(task) = opt_dlen;
296:
1.1 misho 297: if (root->root_hooks.hook_add.write)
298: ptr = root->root_hooks.hook_add.write(task, NULL);
299: else
300: ptr = NULL;
301:
1.5 misho 302: if (!ptr) {
303: #ifdef HAVE_LIBPTHREAD
304: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
305: #endif
1.9 misho 306: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
1.5 misho 307: #ifdef HAVE_LIBPTHREAD
308: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
309: #endif
310: } else
1.13 misho 311: task = sched_unuseTask(task);
1.1 misho 312:
313: return task;
314: }
315:
316: /*
1.9 misho 317: * schedNode() - Add NODE task to scheduler queue
318: *
319: * @root = root task
320: * @func = task execution function
321: * @arg = 1st func argument
322: * @fd = fd handle
323: * @opt_data = Optional data
324: * @opt_dlen = Optional data length
325: * return: NULL error or !=NULL new queued task
326: */
327: sched_task_t *
328: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
329: void *opt_data, size_t opt_dlen)
330: {
331: sched_task_t *task;
332: void *ptr;
333:
334: if (!root || !func)
335: return NULL;
336:
337: /* get new task */
1.13 misho 338: if (!(task = sched_useTask(root)))
1.9 misho 339: return NULL;
340:
341: task->task_func = func;
342: TASK_TYPE(task) = taskNODE;
343: TASK_ROOT(task) = root;
344:
345: TASK_ARG(task) = arg;
346: TASK_FD(task) = fd;
347:
348: TASK_DATA(task) = opt_data;
349: TASK_DATLEN(task) = opt_dlen;
350:
351: if (root->root_hooks.hook_add.node)
352: ptr = root->root_hooks.hook_add.node(task, NULL);
353: else
354: ptr = NULL;
355:
356: if (!ptr) {
357: #ifdef HAVE_LIBPTHREAD
358: pthread_mutex_lock(&root->root_mtx[taskNODE]);
359: #endif
360: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
361: #ifdef HAVE_LIBPTHREAD
362: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
363: #endif
364: } else
1.13 misho 365: task = sched_unuseTask(task);
1.9 misho 366:
367: return task;
368: }
369:
370: /*
371: * schedProc() - Add PROC task to scheduler queue
372: *
373: * @root = root task
374: * @func = task execution function
375: * @arg = 1st func argument
376: * @pid = PID
377: * @opt_data = Optional data
378: * @opt_dlen = Optional data length
379: * return: NULL error or !=NULL new queued task
380: */
381: sched_task_t *
382: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
383: void *opt_data, size_t opt_dlen)
384: {
385: sched_task_t *task;
386: void *ptr;
387:
388: if (!root || !func)
389: return NULL;
390:
391: /* get new task */
1.13 misho 392: if (!(task = sched_useTask(root)))
1.9 misho 393: return NULL;
394:
395: task->task_func = func;
396: TASK_TYPE(task) = taskPROC;
397: TASK_ROOT(task) = root;
398:
399: TASK_ARG(task) = arg;
400: TASK_VAL(task) = pid;
401:
402: TASK_DATA(task) = opt_data;
403: TASK_DATLEN(task) = opt_dlen;
404:
405: if (root->root_hooks.hook_add.proc)
406: ptr = root->root_hooks.hook_add.proc(task, NULL);
407: else
408: ptr = NULL;
409:
410: if (!ptr) {
411: #ifdef HAVE_LIBPTHREAD
412: pthread_mutex_lock(&root->root_mtx[taskPROC]);
413: #endif
414: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
415: #ifdef HAVE_LIBPTHREAD
416: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
417: #endif
418: } else
1.13 misho 419: task = sched_unuseTask(task);
1.9 misho 420:
421: return task;
422: }
423:
424: /*
425: * schedUser() - Add trigger USER task to scheduler queue
426: *
427: * @root = root task
428: * @func = task execution function
429: * @arg = 1st func argument
430: * @id = Trigger ID
431: * @opt_data = Optional data
432: * @opt_dlen = Optional user's trigger flags
433: * return: NULL error or !=NULL new queued task
434: */
435: sched_task_t *
436: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
437: void *opt_data, size_t opt_dlen)
438: {
439: #ifndef EVFILT_USER
440: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
441: return NULL;
442: #else
443: sched_task_t *task;
444: void *ptr;
445:
446: if (!root || !func)
447: return NULL;
448:
449: /* get new task */
1.13 misho 450: if (!(task = sched_useTask(root)))
1.9 misho 451: return NULL;
452:
453: task->task_func = func;
454: TASK_TYPE(task) = taskUSER;
455: TASK_ROOT(task) = root;
456:
457: TASK_ARG(task) = arg;
458: TASK_VAL(task) = id;
459:
460: TASK_DATA(task) = opt_data;
461: TASK_DATLEN(task) = opt_dlen;
462:
463: if (root->root_hooks.hook_add.user)
464: ptr = root->root_hooks.hook_add.user(task, NULL);
465: else
466: ptr = NULL;
467:
468: if (!ptr) {
469: #ifdef HAVE_LIBPTHREAD
470: pthread_mutex_lock(&root->root_mtx[taskUSER]);
471: #endif
472: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
473: #ifdef HAVE_LIBPTHREAD
474: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
475: #endif
476: } else
1.13 misho 477: task = sched_unuseTask(task);
1.9 misho 478:
479: return task;
480: #endif
481: }
482:
483: /*
484: * schedSignal() - Add SIGNAL task to scheduler queue
485: *
486: * @root = root task
487: * @func = task execution function
488: * @arg = 1st func argument
489: * @sig = Signal
490: * @opt_data = Optional data
491: * @opt_dlen = Optional data length
492: * return: NULL error or !=NULL new queued task
493: */
494: sched_task_t *
495: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
496: void *opt_data, size_t opt_dlen)
497: {
498: sched_task_t *task;
499: void *ptr;
500:
501: if (!root || !func)
502: return NULL;
503:
504: /* get new task */
1.13 misho 505: if (!(task = sched_useTask(root)))
1.9 misho 506: return NULL;
507:
508: task->task_func = func;
509: TASK_TYPE(task) = taskSIGNAL;
510: TASK_ROOT(task) = root;
511:
512: TASK_ARG(task) = arg;
513: TASK_VAL(task) = sig;
514:
515: TASK_DATA(task) = opt_data;
516: TASK_DATLEN(task) = opt_dlen;
517:
518: if (root->root_hooks.hook_add.signal)
519: ptr = root->root_hooks.hook_add.signal(task, NULL);
520: else
521: ptr = NULL;
522:
523: if (!ptr) {
524: #ifdef HAVE_LIBPTHREAD
525: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
526: #endif
527: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
528: #ifdef HAVE_LIBPTHREAD
529: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
530: #endif
531: } else
1.13 misho 532: task = sched_unuseTask(task);
1.9 misho 533:
534: return task;
535: }
536:
537: /*
1.8 misho 538: * schedAlarm() - Add ALARM task to scheduler queue
539: *
540: * @root = root task
541: * @func = task execution function
542: * @arg = 1st func argument
543: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1.17 misho 544: * @opt_data = Alarm timer ID
1.8 misho 545: * @opt_dlen = Optional data length
546: * return: NULL error or !=NULL new queued task
547: */
548: sched_task_t *
549: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
550: void *opt_data, size_t opt_dlen)
551: {
552: sched_task_t *task;
553: void *ptr;
554:
555: if (!root || !func)
556: return NULL;
557:
558: /* get new task */
1.13 misho 559: if (!(task = sched_useTask(root)))
1.8 misho 560: return NULL;
561:
562: task->task_func = func;
563: TASK_TYPE(task) = taskALARM;
564: TASK_ROOT(task) = root;
565:
566: TASK_ARG(task) = arg;
567: TASK_TS(task) = ts;
568:
569: TASK_DATA(task) = opt_data;
570: TASK_DATLEN(task) = opt_dlen;
571:
572: if (root->root_hooks.hook_add.alarm)
573: ptr = root->root_hooks.hook_add.alarm(task, NULL);
574: else
575: ptr = NULL;
576:
577: if (!ptr) {
578: #ifdef HAVE_LIBPTHREAD
579: pthread_mutex_lock(&root->root_mtx[taskALARM]);
580: #endif
1.9 misho 581: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
1.8 misho 582: #ifdef HAVE_LIBPTHREAD
583: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
584: #endif
585: } else
1.13 misho 586: task = sched_unuseTask(task);
1.8 misho 587:
588: return task;
589: }
590:
1.11 misho 591: #ifdef AIO_SUPPORT
592: /*
593: * schedAIO() - Add AIO task to scheduler queue
594: *
595: * @root = root task
596: * @func = task execution function
597: * @arg = 1st func argument
598: * @acb = AIO cb structure address
599: * @opt_data = Optional data
600: * @opt_dlen = Optional data length
601: * return: NULL error or !=NULL new queued task
602: */
603: sched_task_t *
604: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
605: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
606: {
607: sched_task_t *task;
608: void *ptr;
609:
610: if (!root || !func || !acb || !opt_dlen)
611: return NULL;
612:
613: /* get new task */
1.13 misho 614: if (!(task = sched_useTask(root)))
1.11 misho 615: return NULL;
616:
617: task->task_func = func;
618: TASK_TYPE(task) = taskAIO;
619: TASK_ROOT(task) = root;
620:
621: TASK_ARG(task) = arg;
622: TASK_VAL(task) = (u_long) acb;
623:
624: TASK_DATA(task) = opt_data;
625: TASK_DATLEN(task) = opt_dlen;
626:
627: if (root->root_hooks.hook_add.aio)
628: ptr = root->root_hooks.hook_add.aio(task, NULL);
629: else
630: ptr = NULL;
631:
632: if (!ptr) {
633: #ifdef HAVE_LIBPTHREAD
634: pthread_mutex_lock(&root->root_mtx[taskAIO]);
635: #endif
636: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
637: #ifdef HAVE_LIBPTHREAD
638: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
639: #endif
640: } else
1.13 misho 641: task = sched_unuseTask(task);
1.11 misho 642:
643: return task;
644: }
645:
646: /*
647: * schedAIORead() - Add AIO read task to scheduler queue
648: *
649: * @root = root task
650: * @func = task execution function
651: * @arg = 1st func argument
652: * @fd = file descriptor
653: * @buffer = Buffer
654: * @buflen = Buffer length
655: * @offset = Offset from start of file, if =-1 from current position
656: * return: NULL error or !=NULL new queued task
657: */
1.16 misho 658: sched_task_t *
1.11 misho 659: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
660: void *buffer, size_t buflen, off_t offset)
661: {
662: struct aiocb *acb;
663: off_t off;
664:
665: if (!root || !func || !buffer || !buflen)
666: return NULL;
667:
668: if (offset == (off_t) -1) {
669: off = lseek(fd, 0, SEEK_CUR);
670: if (off == -1) {
671: LOGERR;
672: return NULL;
673: }
674: } else
675: off = offset;
676:
677: if (!(acb = malloc(sizeof(struct aiocb)))) {
678: LOGERR;
679: return NULL;
680: } else
681: memset(acb, 0, sizeof(struct aiocb));
682:
683: acb->aio_fildes = fd;
684: acb->aio_nbytes = buflen;
685: acb->aio_buf = buffer;
686: acb->aio_offset = off;
687: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
688: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
689: acb->aio_sigevent.sigev_value.sival_ptr = acb;
690:
691: if (aio_read(acb)) {
692: LOGERR;
693: free(acb);
694: return NULL;
695: }
696:
697: return schedAIO(root, func, arg, acb, buffer, buflen);
698: }
699:
700: /*
701: * schedAIOWrite() - Add AIO write task to scheduler queue
702: *
703: * @root = root task
704: * @func = task execution function
705: * @arg = 1st func argument
706: * @fd = file descriptor
707: * @buffer = Buffer
708: * @buflen = Buffer length
709: * @offset = Offset from start of file, if =-1 from current position
710: * return: NULL error or !=NULL new queued task
711: */
1.16 misho 712: sched_task_t *
1.11 misho 713: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
714: void *buffer, size_t buflen, off_t offset)
715: {
716: struct aiocb *acb;
717: off_t off;
718:
719: if (!root || !func || !buffer || !buflen)
720: return NULL;
721:
722: if (offset == (off_t) -1) {
723: off = lseek(fd, 0, SEEK_CUR);
724: if (off == -1) {
725: LOGERR;
726: return NULL;
727: }
728: } else
729: off = offset;
730:
731: if (!(acb = malloc(sizeof(struct aiocb)))) {
732: LOGERR;
733: return NULL;
734: } else
735: memset(acb, 0, sizeof(struct aiocb));
736:
737: acb->aio_fildes = fd;
738: acb->aio_nbytes = buflen;
739: acb->aio_buf = buffer;
740: acb->aio_offset = off;
741: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
742: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
743: acb->aio_sigevent.sigev_value.sival_ptr = acb;
744:
745: if (aio_write(acb)) {
746: LOGERR;
747: free(acb);
748: return NULL;
749: }
750:
751: return schedAIO(root, func, arg, acb, buffer, buflen);
752: }
753:
754: #ifdef EVFILT_LIO
755: /*
756: * schedLIO() - Add AIO bulk tasks to scheduler queue
757: *
758: * @root = root task
759: * @func = task execution function
760: * @arg = 1st func argument
761: * @acbs = AIO cb structure addresses
762: * @opt_data = Optional data
763: * @opt_dlen = Optional data length
764: * return: NULL error or !=NULL new queued task
765: */
766: sched_task_t *
767: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
768: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
769: {
770: sched_task_t *task;
771: void *ptr;
772:
773: if (!root || !func || !acbs || !opt_dlen)
774: return NULL;
775:
776: /* get new task */
1.13 misho 777: if (!(task = sched_useTask(root)))
1.11 misho 778: return NULL;
779:
780: task->task_func = func;
781: TASK_TYPE(task) = taskLIO;
782: TASK_ROOT(task) = root;
783:
784: TASK_ARG(task) = arg;
785: TASK_VAL(task) = (u_long) acbs;
786:
787: TASK_DATA(task) = opt_data;
788: TASK_DATLEN(task) = opt_dlen;
789:
790: if (root->root_hooks.hook_add.lio)
791: ptr = root->root_hooks.hook_add.lio(task, NULL);
792: else
793: ptr = NULL;
794:
795: if (!ptr) {
796: #ifdef HAVE_LIBPTHREAD
797: pthread_mutex_lock(&root->root_mtx[taskLIO]);
798: #endif
799: TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
800: #ifdef HAVE_LIBPTHREAD
801: pthread_mutex_unlock(&root->root_mtx[taskLIO]);
802: #endif
803: } else
1.13 misho 804: task = sched_unuseTask(task);
1.11 misho 805:
806: return task;
807: }
808:
809: /*
810: * schedLIORead() - Add list of AIO read tasks to scheduler queue
811: *
812: * @root = root task
813: * @func = task execution function
814: * @arg = 1st func argument
815: * @fd = file descriptor
816: * @bufs = Buffer's list
817: * @nbufs = Number of Buffers
818: * @offset = Offset from start of file, if =-1 from current position
819: * return: NULL error or !=NULL new queued task
820: */
821: sched_task_t *
822: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
823: struct iovec *bufs, size_t nbufs, off_t offset)
824: {
825: struct sigevent sig;
826: struct aiocb **acb;
827: off_t off;
828: register int i;
829:
830: if (!root || !func || !bufs || !nbufs)
831: return NULL;
832:
833: if (offset == (off_t) -1) {
834: off = lseek(fd, 0, SEEK_CUR);
835: if (off == -1) {
836: LOGERR;
837: return NULL;
838: }
839: } else
840: off = offset;
841:
842: if (!(acb = calloc(sizeof(void*), nbufs))) {
843: LOGERR;
844: return NULL;
845: } else
846: memset(acb, 0, sizeof(void*) * nbufs);
847: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
848: acb[i] = malloc(sizeof(struct aiocb));
849: if (!acb[i]) {
850: LOGERR;
851: for (i = 0; i < nbufs; i++)
852: if (acb[i])
853: free(acb[i]);
854: free(acb);
855: return NULL;
856: } else
857: memset(acb[i], 0, sizeof(struct aiocb));
858: acb[i]->aio_fildes = fd;
859: acb[i]->aio_nbytes = bufs[i].iov_len;
860: acb[i]->aio_buf = bufs[i].iov_base;
861: acb[i]->aio_offset = off;
862: acb[i]->aio_lio_opcode = LIO_READ;
863: }
864: memset(&sig, 0, sizeof sig);
865: sig.sigev_notify = SIGEV_KEVENT;
866: sig.sigev_notify_kqueue = root->root_kq;
867: sig.sigev_value.sival_ptr = acb;
868:
869: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
870: LOGERR;
871: for (i = 0; i < nbufs; i++)
872: if (acb[i])
873: free(acb[i]);
874: free(acb);
875: return NULL;
876: }
877:
878: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
879: }
880:
881: /*
882: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
883: *
884: * @root = root task
885: * @func = task execution function
886: * @arg = 1st func argument
887: * @fd = file descriptor
888: * @bufs = Buffer's list
889: * @nbufs = Number of Buffers
890: * @offset = Offset from start of file, if =-1 from current position
891: * return: NULL error or !=NULL new queued task
892: */
1.16 misho 893: sched_task_t *
1.11 misho 894: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
895: struct iovec *bufs, size_t nbufs, off_t offset)
896: {
897: struct sigevent sig;
898: struct aiocb **acb;
899: off_t off;
900: register int i;
901:
902: if (!root || !func || !bufs || !nbufs)
903: return NULL;
904:
905: if (offset == (off_t) -1) {
906: off = lseek(fd, 0, SEEK_CUR);
907: if (off == -1) {
908: LOGERR;
909: return NULL;
910: }
911: } else
912: off = offset;
913:
914: if (!(acb = calloc(sizeof(void*), nbufs))) {
915: LOGERR;
916: return NULL;
917: } else
918: memset(acb, 0, sizeof(void*) * nbufs);
919: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
920: acb[i] = malloc(sizeof(struct aiocb));
921: if (!acb[i]) {
922: LOGERR;
923: for (i = 0; i < nbufs; i++)
924: if (acb[i])
925: free(acb[i]);
926: free(acb);
927: return NULL;
928: } else
929: memset(acb[i], 0, sizeof(struct aiocb));
930: acb[i]->aio_fildes = fd;
931: acb[i]->aio_nbytes = bufs[i].iov_len;
932: acb[i]->aio_buf = bufs[i].iov_base;
933: acb[i]->aio_offset = off;
934: acb[i]->aio_lio_opcode = LIO_WRITE;
935: }
936: memset(&sig, 0, sizeof sig);
937: sig.sigev_notify = SIGEV_KEVENT;
938: sig.sigev_notify_kqueue = root->root_kq;
939: sig.sigev_value.sival_ptr = acb;
940:
941: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
942: LOGERR;
943: for (i = 0; i < nbufs; i++)
944: if (acb[i])
945: free(acb[i]);
946: free(acb);
947: return NULL;
948: }
949:
950: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
951: }
952: #endif /* EVFILT_LIO */
953: #endif /* AIO_SUPPORT */
954:
1.8 misho 955: /*
1.1 misho 956: * schedTimer() - Add TIMER task to scheduler queue
1.6 misho 957: *
1.1 misho 958: * @root = root task
959: * @func = task execution function
960: * @arg = 1st func argument
1.5 misho 961: * @ts = timeout argument structure
962: * @opt_data = Optional data
963: * @opt_dlen = Optional data length
1.1 misho 964: * return: NULL error or !=NULL new queued task
965: */
966: sched_task_t *
1.5 misho 967: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
968: void *opt_data, size_t opt_dlen)
1.1 misho 969: {
1.9 misho 970: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 971: void *ptr;
1.5 misho 972: struct timespec now;
1.1 misho 973:
974: if (!root || !func)
975: return NULL;
976:
977: /* get new task */
1.13 misho 978: if (!(task = sched_useTask(root)))
1.4 misho 979: return NULL;
1.1 misho 980:
981: task->task_func = func;
1.5 misho 982: TASK_TYPE(task) = taskTIMER;
983: TASK_ROOT(task) = root;
1.1 misho 984:
985: TASK_ARG(task) = arg;
986:
1.5 misho 987: TASK_DATA(task) = opt_data;
988: TASK_DATLEN(task) = opt_dlen;
989:
1.1 misho 990: /* calculate timeval structure */
1.5 misho 991: clock_gettime(CLOCK_MONOTONIC, &now);
992: now.tv_sec += ts.tv_sec;
993: now.tv_nsec += ts.tv_nsec;
994: if (now.tv_nsec >= 1000000000L) {
1.1 misho 995: now.tv_sec++;
1.5 misho 996: now.tv_nsec -= 1000000000L;
997: } else if (now.tv_nsec < 0) {
1.1 misho 998: now.tv_sec--;
1.5 misho 999: now.tv_nsec += 1000000000L;
1.1 misho 1000: }
1.5 misho 1001: TASK_TS(task) = now;
1.1 misho 1002:
1003: if (root->root_hooks.hook_add.timer)
1004: ptr = root->root_hooks.hook_add.timer(task, NULL);
1005: else
1006: ptr = NULL;
1007:
1008: if (!ptr) {
1.5 misho 1009: #ifdef HAVE_LIBPTHREAD
1010: pthread_mutex_lock(&root->root_mtx[taskTIMER]);
1011: #endif
1.1 misho 1012: #ifdef TIMER_WITHOUT_SORT
1.9 misho 1013: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 1014: #else
1.9 misho 1015: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
1.5 misho 1016: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
1.1 misho 1017: break;
1018: if (!t)
1.9 misho 1019: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 1020: else
1.9 misho 1021: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.1 misho 1022: #endif
1.5 misho 1023: #ifdef HAVE_LIBPTHREAD
1024: pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
1025: #endif
1.4 misho 1026: } else
1.13 misho 1027: task = sched_unuseTask(task);
1.1 misho 1028:
1029: return task;
1030: }
1031:
1032: /*
1033: * schedEvent() - Add EVENT task to scheduler queue
1.6 misho 1034: *
1.1 misho 1035: * @root = root task
1036: * @func = task execution function
1037: * @arg = 1st func argument
1.2 misho 1038: * @val = additional func argument
1.5 misho 1039: * @opt_data = Optional data
1040: * @opt_dlen = Optional data length
1.1 misho 1041: * return: NULL error or !=NULL new queued task
1042: */
1043: sched_task_t *
1.5 misho 1044: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1045: void *opt_data, size_t opt_dlen)
1.1 misho 1046: {
1047: sched_task_t *task;
1048: void *ptr;
1049:
1050: if (!root || !func)
1051: return NULL;
1052:
1053: /* get new task */
1.13 misho 1054: if (!(task = sched_useTask(root)))
1.4 misho 1055: return NULL;
1.1 misho 1056:
1057: task->task_func = func;
1.5 misho 1058: TASK_TYPE(task) = taskEVENT;
1059: TASK_ROOT(task) = root;
1.1 misho 1060:
1061: TASK_ARG(task) = arg;
1.2 misho 1062: TASK_VAL(task) = val;
1.1 misho 1063:
1.5 misho 1064: TASK_DATA(task) = opt_data;
1065: TASK_DATLEN(task) = opt_dlen;
1066:
1.1 misho 1067: if (root->root_hooks.hook_add.event)
1068: ptr = root->root_hooks.hook_add.event(task, NULL);
1069: else
1070: ptr = NULL;
1071:
1.5 misho 1072: if (!ptr) {
1073: #ifdef HAVE_LIBPTHREAD
1074: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
1075: #endif
1.9 misho 1076: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
1.5 misho 1077: #ifdef HAVE_LIBPTHREAD
1078: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
1079: #endif
1080: } else
1.13 misho 1081: task = sched_unuseTask(task);
1.1 misho 1082:
1083: return task;
1084: }
1085:
1086:
1087: /*
1.12 misho 1088: * schedTask() - Add regular task to scheduler queue
1.6 misho 1089: *
1.1 misho 1090: * @root = root task
1091: * @func = task execution function
1092: * @arg = 1st func argument
1.12 misho 1093: * @prio = regular task priority, 0 is hi priority for regular tasks
1.5 misho 1094: * @opt_data = Optional data
1095: * @opt_dlen = Optional data length
1.1 misho 1096: * return: NULL error or !=NULL new queued task
1097: */
1098: sched_task_t *
1.12 misho 1099: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1.5 misho 1100: void *opt_data, size_t opt_dlen)
1.1 misho 1101: {
1.12 misho 1102: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 1103: void *ptr;
1104:
1105: if (!root || !func)
1106: return NULL;
1107:
1108: /* get new task */
1.13 misho 1109: if (!(task = sched_useTask(root)))
1.4 misho 1110: return NULL;
1.1 misho 1111:
1112: task->task_func = func;
1.12 misho 1113: TASK_TYPE(task) = taskTASK;
1.5 misho 1114: TASK_ROOT(task) = root;
1.1 misho 1115:
1116: TASK_ARG(task) = arg;
1.12 misho 1117: TASK_VAL(task) = prio;
1.1 misho 1118:
1.5 misho 1119: TASK_DATA(task) = opt_data;
1120: TASK_DATLEN(task) = opt_dlen;
1121:
1.12 misho 1122: if (root->root_hooks.hook_add.task)
1123: ptr = root->root_hooks.hook_add.task(task, NULL);
1.1 misho 1124: else
1125: ptr = NULL;
1126:
1.5 misho 1127: if (!ptr) {
1128: #ifdef HAVE_LIBPTHREAD
1.12 misho 1129: pthread_mutex_lock(&root->root_mtx[taskTASK]);
1.5 misho 1130: #endif
1.12 misho 1131: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1132: if (TASK_VAL(task) < TASK_VAL(t))
1133: break;
1134: if (!t)
1135: TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
1136: else
1137: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.5 misho 1138: #ifdef HAVE_LIBPTHREAD
1.12 misho 1139: pthread_mutex_unlock(&root->root_mtx[taskTASK]);
1.5 misho 1140: #endif
1141: } else
1.13 misho 1142: task = sched_unuseTask(task);
1.1 misho 1143:
1144: return task;
1145: }
1146:
1147: /*
1.10 misho 1148: * schedSuspend() - Add Suspended task to scheduler queue
1149: *
1150: * @root = root task
1151: * @func = task execution function
1152: * @arg = 1st func argument
1153: * @id = Trigger ID
1154: * @opt_data = Optional data
1155: * @opt_dlen = Optional data length
1156: * return: NULL error or !=NULL new queued task
1157: */
1158: sched_task_t *
1159: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1160: void *opt_data, size_t opt_dlen)
1161: {
1162: sched_task_t *task;
1163: void *ptr;
1164:
1165: if (!root || !func)
1166: return NULL;
1167:
1168: /* get new task */
1.13 misho 1169: if (!(task = sched_useTask(root)))
1.10 misho 1170: return NULL;
1171:
1172: task->task_func = func;
1173: TASK_TYPE(task) = taskSUSPEND;
1174: TASK_ROOT(task) = root;
1175:
1176: TASK_ARG(task) = arg;
1177: TASK_VAL(task) = id;
1178:
1179: TASK_DATA(task) = opt_data;
1180: TASK_DATLEN(task) = opt_dlen;
1181:
1182: if (root->root_hooks.hook_add.suspend)
1183: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1184: else
1185: ptr = NULL;
1186:
1187: if (!ptr) {
1188: #ifdef HAVE_LIBPTHREAD
1189: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1190: #endif
1191: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1192: #ifdef HAVE_LIBPTHREAD
1193: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1194: #endif
1195: } else
1.13 misho 1196: task = sched_unuseTask(task);
1.10 misho 1197:
1198: return task;
1199: }
1200:
1201: /*
1.1 misho 1202: * schedCallOnce() - Call once from scheduler
1.6 misho 1203: *
1.1 misho 1204: * @root = root task
1205: * @func = task execution function
1206: * @arg = 1st func argument
1.2 misho 1207: * @val = additional func argument
1.5 misho 1208: * @opt_data = Optional data
1209: * @opt_dlen = Optional data length
1.2 misho 1210: * return: return value from called func
1.1 misho 1211: */
1212: sched_task_t *
1.5 misho 1213: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1214: void *opt_data, size_t opt_dlen)
1.1 misho 1215: {
1216: sched_task_t *task;
1.2 misho 1217: void *ret;
1.1 misho 1218:
1219: if (!root || !func)
1220: return NULL;
1221:
1222: /* get new task */
1.13 misho 1223: if (!(task = sched_useTask(root)))
1.4 misho 1224: return NULL;
1.1 misho 1225:
1226: task->task_func = func;
1.5 misho 1227: TASK_TYPE(task) = taskEVENT;
1228: TASK_ROOT(task) = root;
1.1 misho 1229:
1230: TASK_ARG(task) = arg;
1.2 misho 1231: TASK_VAL(task) = val;
1.1 misho 1232:
1.5 misho 1233: TASK_DATA(task) = opt_data;
1234: TASK_DATLEN(task) = opt_dlen;
1235:
1.2 misho 1236: ret = schedCall(task);
1.1 misho 1237:
1.13 misho 1238: sched_unuseTask(task);
1.2 misho 1239: return ret;
1.1 misho 1240: }
1.13 misho 1241:
1242: /*
1243: * schedThread() - Add thread task to scheduler queue
1244: *
1245: * @root = root task
1246: * @func = task execution function
1247: * @arg = 1st func argument
1248: * @detach = Detach thread from scheduler, if !=0
1.15 misho 1249: * @ss = stack size
1.13 misho 1250: * @opt_data = Optional data
1251: * @opt_dlen = Optional data length
1252: * return: NULL error or !=NULL new queued task
1253: */
1254: sched_task_t *
1255: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach,
1.15 misho 1256: size_t ss, void *opt_data, size_t opt_dlen)
1.13 misho 1257: {
1258: #ifndef HAVE_LIBPTHREAD
1259: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1260: return NULL;
1261: #endif
1262: sched_task_t *task;
1263: void *ptr;
1264: pthread_attr_t attr;
1.15 misho 1265: sem_t *s = NULL;
1.13 misho 1266:
1267: if (!root || !func)
1268: return NULL;
1.15 misho 1269: else {
1270: /* normalizing stack size & detach state */
1271: if (ss)
1272: ss &= 0x7FFFFFFF;
1273: detach = detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE;
1274: }
1275:
1276: if (!(s = (sem_t*) malloc(sizeof(sem_t)))) {
1277: LOGERR;
1278: return NULL;
1279: }
1280: if (sem_init(s, 0, 1)) {
1281: LOGERR;
1282: free(s);
1283: return NULL;
1284: }
1.13 misho 1285:
1286: /* get new task */
1.15 misho 1287: if (!(task = sched_useTask(root))) {
1288: sem_destroy(s);
1289: free(s);
1290:
1.13 misho 1291: return NULL;
1.15 misho 1292: }
1.13 misho 1293:
1294: task->task_func = func;
1295: TASK_TYPE(task) = taskTHREAD;
1296: TASK_ROOT(task) = root;
1297:
1298: TASK_ARG(task) = arg;
1.15 misho 1299: TASK_FLAG(task) = detach;
1300: TASK_RET(task) = (intptr_t) s;
1.13 misho 1301:
1302: TASK_DATA(task) = opt_data;
1303: TASK_DATLEN(task) = opt_dlen;
1304:
1305: pthread_attr_init(&attr);
1.15 misho 1306: pthread_attr_setdetachstate(&attr, detach);
1307: if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
1308: LOGERR;
1309: pthread_attr_destroy(&attr);
1310: sem_destroy(s);
1311: free(s);
1312: return sched_unuseTask(task);
1313: }
1314: if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
1315: LOGERR;
1316: pthread_attr_destroy(&attr);
1317: sem_destroy(s);
1318: free(s);
1319: return sched_unuseTask(task);
1320: } else
1321: TASK_FLAG(task) |= (ss << 1);
1322: if ((errno = pthread_attr_setguardsize(&attr, ss))) {
1323: LOGERR;
1324: pthread_attr_destroy(&attr);
1325: sem_destroy(s);
1326: free(s);
1327: return sched_unuseTask(task);
1328: }
1329: #ifdef SCHED_RR
1330: pthread_attr_setschedpolicy(&attr, SCHED_RR);
1331: #else
1332: pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
1333: #endif
1.13 misho 1334: if (root->root_hooks.hook_add.thread)
1335: ptr = root->root_hooks.hook_add.thread(task, &attr);
1336: else
1337: ptr = NULL;
1338: pthread_attr_destroy(&attr);
1339:
1340: if (!ptr) {
1341: pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
1342: TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
1343: pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
1.15 misho 1344:
1345: /* wait for init thread actions */
1346: sem_wait(s);
1.13 misho 1347: } else
1348: task = sched_unuseTask(task);
1349:
1.15 misho 1350: sem_destroy(s);
1351: free(s);
1.13 misho 1352: return task;
1353: }
1354:
1.17 misho 1355: /*
1356: * schedRTC() - Add RTC task to scheduler queue
1357: *
1358: * @root = root task
1359: * @func = task execution function
1360: * @arg = 1st func argument
1361: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1362: * @opt_data = Optional RTC ID
1.18 misho 1363: * @opt_dlen = Optional data length
1.17 misho 1364: * return: NULL error or !=NULL new queued task
1365: */
1366: sched_task_t *
1367: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
1368: void *opt_data, size_t opt_dlen)
1369: {
1370: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
1371: sched_task_t *task;
1372: void *ptr;
1373:
1374: if (!root || !func)
1375: return NULL;
1376:
1377: /* get new task */
1378: if (!(task = sched_useTask(root)))
1379: return NULL;
1380:
1381: task->task_func = func;
1382: TASK_TYPE(task) = taskRTC;
1383: TASK_ROOT(task) = root;
1384:
1385: TASK_ARG(task) = arg;
1386: TASK_TS(task) = ts;
1387:
1388: TASK_DATA(task) = opt_data;
1389: TASK_DATLEN(task) = opt_dlen;
1390:
1391: if (root->root_hooks.hook_add.rtc)
1392: ptr = root->root_hooks.hook_add.rtc(task, NULL);
1393: else
1394: ptr = NULL;
1395:
1396: if (!ptr) {
1397: #ifdef HAVE_LIBPTHREAD
1398: pthread_mutex_lock(&root->root_mtx[taskRTC]);
1399: #endif
1400: TAILQ_INSERT_TAIL(&root->root_rtc, TASK_ID(task), task_node);
1401: #ifdef HAVE_LIBPTHREAD
1402: pthread_mutex_unlock(&root->root_mtx[taskRTC]);
1403: #endif
1404: } else
1405: task = sched_unuseTask(task);
1406:
1407: return task;
1408: #else
1409: sched_SetErr(ENOTSUP, "Not supported realtime clock extensions");
1410: return NULL;
1411: #endif
1412: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>