Annotation of libaitsched/src/tasks.c, revision 1.20
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.20 ! misho 6: * $Id: tasks.c,v 1.19.2.1 2013/08/26 13:26:56 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.16 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
1.13 misho 49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
1.16 misho 55: sched_task_t *
1.13 misho 56: sched_useTask(sched_root_task_t * __restrict root)
1.4 misho 57: {
1.7 misho 58: sched_task_t *task, *tmp;
1.4 misho 59:
1.14 misho 60: #ifdef HAVE_LIBPTHREAD
61: pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
62: #endif
1.7 misho 63: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
1.4 misho 64: if (!TASK_ISLOCKED(task)) {
65: TAILQ_REMOVE(&root->root_unuse, task, task_node);
66: break;
67: }
68: }
1.14 misho 69: #ifdef HAVE_LIBPTHREAD
70: pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
71: #endif
1.4 misho 72:
73: if (!task) {
74: task = malloc(sizeof(sched_task_t));
75: if (!task) {
76: LOGERR;
77: return NULL;
78: }
79: }
80:
1.9 misho 81: memset(task, 0, sizeof(sched_task_t));
82: task->task_id = (uintptr_t) task;
1.4 misho 83: return task;
84: }
85:
1.13 misho 86: /*
87: * sched_unuseTask() - Unlock and put task to unuse queue
88: *
89: * @task = task
90: * return: always is NULL
91: */
1.16 misho 92: sched_task_t *
1.13 misho 93: sched_unuseTask(sched_task_t * __restrict task)
1.4 misho 94: {
95: TASK_UNLOCK(task);
1.5 misho 96: TASK_TYPE(task) = taskUNUSE;
97: #ifdef HAVE_LIBPTHREAD
98: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
99: #endif
1.9 misho 100: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
1.5 misho 101: #ifdef HAVE_LIBPTHREAD
102: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
103: #endif
1.4 misho 104: task = NULL;
105:
106: return task;
107: }
108:
1.14 misho 109: #pragma GCC visibility push(hidden)
110:
111: #ifdef HAVE_LIBPTHREAD
112: static void
113: _sched_threadCleanup(sched_task_t *t)
114: {
115: if (!t || !TASK_ROOT(t))
116: return;
117:
118: if (TASK_FLAG(t) == PTHREAD_CREATE_JOINABLE)
119: pthread_detach(pthread_self());
120:
1.15 misho 121: pthread_mutex_lock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
122: TAILQ_REMOVE(&TASK_ROOT(t)->root_thread, t, task_node);
123: pthread_mutex_unlock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
124:
1.14 misho 125: sched_unuseTask(t);
126: }
127: void *
128: _sched_threadWrapper(sched_task_t *t)
129: {
130: void *ret = NULL;
1.15 misho 131: sem_t *s = NULL;
1.20 ! misho 132: sched_root_task_t *r;
1.14 misho 133:
1.15 misho 134: if (!t || !TASK_ROOT(t) || !TASK_RET(t))
1.14 misho 135: pthread_exit(ret);
1.20 ! misho 136: else {
1.15 misho 137: s = (sem_t*) TASK_RET(t);
1.20 ! misho 138: r = TASK_ROOT(t);
! 139: }
1.14 misho 140:
141: pthread_cleanup_push((void (*)(void*)) _sched_threadCleanup, t);
142:
143: pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
144: pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
1.15 misho 145:
1.20 ! misho 146: #ifdef HAVE_LIBPTHREAD
! 147: pthread_mutex_lock(&r->root_mtx[taskTHREAD]);
! 148: #endif
! 149: TAILQ_REMOVE(&r->root_thread, t, task_node);
! 150: #ifdef HAVE_LIBPTHREAD
! 151: pthread_mutex_unlock(&r->root_mtx[taskTHREAD]);
! 152: #endif
! 153: t->task_type = taskUNUSE;
! 154: #ifdef HAVE_LIBPTHREAD
! 155: pthread_mutex_lock(&r->root_mtx[taskUNUSE]);
! 156: #endif
! 157: TAILQ_INSERT_TAIL(&r->root_unuse, t, task_node);
! 158: #ifdef HAVE_LIBPTHREAD
! 159: pthread_mutex_unlock(&r->root_mtx[taskUNUSE]);
! 160: #endif
! 161:
1.15 misho 162: /* notify parent, thread is ready for execution */
163: sem_post(s);
1.14 misho 164: pthread_testcancel();
165:
166: ret = TASK_FUNC(t)(t);
167:
168: pthread_cleanup_pop(42);
169: TASK_ROOT(t)->root_ret = ret;
170: pthread_exit(ret);
171: }
172: #endif
173:
1.19 misho 174: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
175: void *
176: _sched_rtcWrapper(sched_task_t *t)
177: {
178: void *ret = NULL;
179: sched_task_func_t func;
180: sched_task_t *task;
1.20 ! misho 181: sched_root_task_t *r;
1.19 misho 182:
183: if (!t || !TASK_ROOT(t) || !TASK_DATA(t))
184: return NULL;
185: else {
1.20 ! misho 186: r = TASK_ROOT(t);
1.19 misho 187: task = (sched_task_t*) TASK_DATA(t);
188: func = TASK_FUNC(task);
189: }
190:
1.20 ! misho 191: #ifdef HAVE_LIBPTHREAD
! 192: pthread_mutex_lock(&r->root_mtx[taskRTC]);
! 193: #endif
! 194: TAILQ_REMOVE(&r->root_rtc, task, task_node);
! 195: #ifdef HAVE_LIBPTHREAD
! 196: pthread_mutex_unlock(&r->root_mtx[taskRTC]);
! 197: #endif
! 198: task->task_type = taskUNUSE;
! 199: #ifdef HAVE_LIBPTHREAD
! 200: pthread_mutex_lock(&r->root_mtx[taskUNUSE]);
! 201: #endif
! 202: TAILQ_INSERT_TAIL(&r->root_unuse, task, task_node);
! 203: #ifdef HAVE_LIBPTHREAD
! 204: pthread_mutex_unlock(&r->root_mtx[taskUNUSE]);
! 205: #endif
! 206:
1.19 misho 207: ret = func(task);
1.20 ! misho 208:
1.19 misho 209: timer_delete((timer_t) TASK_DATLEN(t));
210: return ret;
211: }
212: #endif
213:
1.14 misho 214: #pragma GCC visibility pop
215:
216: /*
217: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
218: *
219: * @task = current task
220: * @retcode = return code
221: * return: return code
222: */
1.16 misho 223: void *
1.14 misho 224: sched_taskExit(sched_task_t *task, intptr_t retcode)
225: {
226: if (!task || !TASK_ROOT(task))
227: return (void*) -1;
228:
229: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
230: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
231:
232: TASK_ROOT(task)->root_ret = (void*) retcode;
233: return (void*) retcode;
234: }
235:
1.4 misho 236:
1.1 misho 237: /*
1.2 misho 238: * schedRead() - Add READ I/O task to scheduler queue
1.6 misho 239: *
1.1 misho 240: * @root = root task
241: * @func = task execution function
242: * @arg = 1st func argument
1.2 misho 243: * @fd = fd handle
1.5 misho 244: * @opt_data = Optional data
245: * @opt_dlen = Optional data length
1.1 misho 246: * return: NULL error or !=NULL new queued task
247: */
248: sched_task_t *
1.5 misho 249: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
250: void *opt_data, size_t opt_dlen)
1.1 misho 251: {
252: sched_task_t *task;
253: void *ptr;
254:
255: if (!root || !func)
256: return NULL;
257:
258: /* get new task */
1.13 misho 259: if (!(task = sched_useTask(root)))
1.4 misho 260: return NULL;
1.1 misho 261:
262: task->task_func = func;
1.5 misho 263: TASK_TYPE(task) = taskREAD;
264: TASK_ROOT(task) = root;
1.1 misho 265:
266: TASK_ARG(task) = arg;
1.2 misho 267: TASK_FD(task) = fd;
1.1 misho 268:
1.5 misho 269: TASK_DATA(task) = opt_data;
270: TASK_DATLEN(task) = opt_dlen;
271:
1.1 misho 272: if (root->root_hooks.hook_add.read)
273: ptr = root->root_hooks.hook_add.read(task, NULL);
274: else
275: ptr = NULL;
276:
1.5 misho 277: if (!ptr) {
278: #ifdef HAVE_LIBPTHREAD
279: pthread_mutex_lock(&root->root_mtx[taskREAD]);
280: #endif
1.9 misho 281: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
1.5 misho 282: #ifdef HAVE_LIBPTHREAD
283: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
284: #endif
285: } else
1.13 misho 286: task = sched_unuseTask(task);
1.1 misho 287:
288: return task;
289: }
290:
291: /*
1.2 misho 292: * schedWrite() - Add WRITE I/O task to scheduler queue
1.6 misho 293: *
1.1 misho 294: * @root = root task
295: * @func = task execution function
296: * @arg = 1st func argument
1.2 misho 297: * @fd = fd handle
1.5 misho 298: * @opt_data = Optional data
299: * @opt_dlen = Optional data length
1.1 misho 300: * return: NULL error or !=NULL new queued task
301: */
302: sched_task_t *
1.5 misho 303: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
304: void *opt_data, size_t opt_dlen)
1.1 misho 305: {
306: sched_task_t *task;
307: void *ptr;
308:
309: if (!root || !func)
310: return NULL;
311:
312: /* get new task */
1.13 misho 313: if (!(task = sched_useTask(root)))
1.4 misho 314: return NULL;
1.1 misho 315:
316: task->task_func = func;
1.5 misho 317: TASK_TYPE(task) = taskWRITE;
318: TASK_ROOT(task) = root;
1.1 misho 319:
320: TASK_ARG(task) = arg;
1.2 misho 321: TASK_FD(task) = fd;
1.1 misho 322:
1.5 misho 323: TASK_DATA(task) = opt_data;
324: TASK_DATLEN(task) = opt_dlen;
325:
1.1 misho 326: if (root->root_hooks.hook_add.write)
327: ptr = root->root_hooks.hook_add.write(task, NULL);
328: else
329: ptr = NULL;
330:
1.5 misho 331: if (!ptr) {
332: #ifdef HAVE_LIBPTHREAD
333: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
334: #endif
1.9 misho 335: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
1.5 misho 336: #ifdef HAVE_LIBPTHREAD
337: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
338: #endif
339: } else
1.13 misho 340: task = sched_unuseTask(task);
1.1 misho 341:
342: return task;
343: }
344:
345: /*
1.9 misho 346: * schedNode() - Add NODE task to scheduler queue
347: *
348: * @root = root task
349: * @func = task execution function
350: * @arg = 1st func argument
351: * @fd = fd handle
352: * @opt_data = Optional data
353: * @opt_dlen = Optional data length
354: * return: NULL error or !=NULL new queued task
355: */
356: sched_task_t *
357: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
358: void *opt_data, size_t opt_dlen)
359: {
360: sched_task_t *task;
361: void *ptr;
362:
363: if (!root || !func)
364: return NULL;
365:
366: /* get new task */
1.13 misho 367: if (!(task = sched_useTask(root)))
1.9 misho 368: return NULL;
369:
370: task->task_func = func;
371: TASK_TYPE(task) = taskNODE;
372: TASK_ROOT(task) = root;
373:
374: TASK_ARG(task) = arg;
375: TASK_FD(task) = fd;
376:
377: TASK_DATA(task) = opt_data;
378: TASK_DATLEN(task) = opt_dlen;
379:
380: if (root->root_hooks.hook_add.node)
381: ptr = root->root_hooks.hook_add.node(task, NULL);
382: else
383: ptr = NULL;
384:
385: if (!ptr) {
386: #ifdef HAVE_LIBPTHREAD
387: pthread_mutex_lock(&root->root_mtx[taskNODE]);
388: #endif
389: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
390: #ifdef HAVE_LIBPTHREAD
391: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
392: #endif
393: } else
1.13 misho 394: task = sched_unuseTask(task);
1.9 misho 395:
396: return task;
397: }
398:
399: /*
400: * schedProc() - Add PROC task to scheduler queue
401: *
402: * @root = root task
403: * @func = task execution function
404: * @arg = 1st func argument
405: * @pid = PID
406: * @opt_data = Optional data
407: * @opt_dlen = Optional data length
408: * return: NULL error or !=NULL new queued task
409: */
410: sched_task_t *
411: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
412: void *opt_data, size_t opt_dlen)
413: {
414: sched_task_t *task;
415: void *ptr;
416:
417: if (!root || !func)
418: return NULL;
419:
420: /* get new task */
1.13 misho 421: if (!(task = sched_useTask(root)))
1.9 misho 422: return NULL;
423:
424: task->task_func = func;
425: TASK_TYPE(task) = taskPROC;
426: TASK_ROOT(task) = root;
427:
428: TASK_ARG(task) = arg;
429: TASK_VAL(task) = pid;
430:
431: TASK_DATA(task) = opt_data;
432: TASK_DATLEN(task) = opt_dlen;
433:
434: if (root->root_hooks.hook_add.proc)
435: ptr = root->root_hooks.hook_add.proc(task, NULL);
436: else
437: ptr = NULL;
438:
439: if (!ptr) {
440: #ifdef HAVE_LIBPTHREAD
441: pthread_mutex_lock(&root->root_mtx[taskPROC]);
442: #endif
443: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
444: #ifdef HAVE_LIBPTHREAD
445: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
446: #endif
447: } else
1.13 misho 448: task = sched_unuseTask(task);
1.9 misho 449:
450: return task;
451: }
452:
453: /*
454: * schedUser() - Add trigger USER task to scheduler queue
455: *
456: * @root = root task
457: * @func = task execution function
458: * @arg = 1st func argument
459: * @id = Trigger ID
460: * @opt_data = Optional data
461: * @opt_dlen = Optional user's trigger flags
462: * return: NULL error or !=NULL new queued task
463: */
464: sched_task_t *
465: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
466: void *opt_data, size_t opt_dlen)
467: {
468: #ifndef EVFILT_USER
469: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
470: return NULL;
471: #else
472: sched_task_t *task;
473: void *ptr;
474:
475: if (!root || !func)
476: return NULL;
477:
478: /* get new task */
1.13 misho 479: if (!(task = sched_useTask(root)))
1.9 misho 480: return NULL;
481:
482: task->task_func = func;
483: TASK_TYPE(task) = taskUSER;
484: TASK_ROOT(task) = root;
485:
486: TASK_ARG(task) = arg;
487: TASK_VAL(task) = id;
488:
489: TASK_DATA(task) = opt_data;
490: TASK_DATLEN(task) = opt_dlen;
491:
492: if (root->root_hooks.hook_add.user)
493: ptr = root->root_hooks.hook_add.user(task, NULL);
494: else
495: ptr = NULL;
496:
497: if (!ptr) {
498: #ifdef HAVE_LIBPTHREAD
499: pthread_mutex_lock(&root->root_mtx[taskUSER]);
500: #endif
501: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
502: #ifdef HAVE_LIBPTHREAD
503: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
504: #endif
505: } else
1.13 misho 506: task = sched_unuseTask(task);
1.9 misho 507:
508: return task;
509: #endif
510: }
511:
512: /*
513: * schedSignal() - Add SIGNAL task to scheduler queue
514: *
515: * @root = root task
516: * @func = task execution function
517: * @arg = 1st func argument
518: * @sig = Signal
519: * @opt_data = Optional data
520: * @opt_dlen = Optional data length
521: * return: NULL error or !=NULL new queued task
522: */
523: sched_task_t *
524: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
525: void *opt_data, size_t opt_dlen)
526: {
527: sched_task_t *task;
528: void *ptr;
529:
530: if (!root || !func)
531: return NULL;
532:
533: /* get new task */
1.13 misho 534: if (!(task = sched_useTask(root)))
1.9 misho 535: return NULL;
536:
537: task->task_func = func;
538: TASK_TYPE(task) = taskSIGNAL;
539: TASK_ROOT(task) = root;
540:
541: TASK_ARG(task) = arg;
542: TASK_VAL(task) = sig;
543:
544: TASK_DATA(task) = opt_data;
545: TASK_DATLEN(task) = opt_dlen;
546:
547: if (root->root_hooks.hook_add.signal)
548: ptr = root->root_hooks.hook_add.signal(task, NULL);
549: else
550: ptr = NULL;
551:
552: if (!ptr) {
553: #ifdef HAVE_LIBPTHREAD
554: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
555: #endif
556: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
557: #ifdef HAVE_LIBPTHREAD
558: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
559: #endif
560: } else
1.13 misho 561: task = sched_unuseTask(task);
1.9 misho 562:
563: return task;
564: }
565:
566: /*
1.8 misho 567: * schedAlarm() - Add ALARM task to scheduler queue
568: *
569: * @root = root task
570: * @func = task execution function
571: * @arg = 1st func argument
572: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1.17 misho 573: * @opt_data = Alarm timer ID
1.8 misho 574: * @opt_dlen = Optional data length
575: * return: NULL error or !=NULL new queued task
576: */
577: sched_task_t *
578: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
579: void *opt_data, size_t opt_dlen)
580: {
581: sched_task_t *task;
582: void *ptr;
583:
584: if (!root || !func)
585: return NULL;
586:
587: /* get new task */
1.13 misho 588: if (!(task = sched_useTask(root)))
1.8 misho 589: return NULL;
590:
591: task->task_func = func;
592: TASK_TYPE(task) = taskALARM;
593: TASK_ROOT(task) = root;
594:
595: TASK_ARG(task) = arg;
596: TASK_TS(task) = ts;
597:
598: TASK_DATA(task) = opt_data;
599: TASK_DATLEN(task) = opt_dlen;
600:
601: if (root->root_hooks.hook_add.alarm)
602: ptr = root->root_hooks.hook_add.alarm(task, NULL);
603: else
604: ptr = NULL;
605:
606: if (!ptr) {
607: #ifdef HAVE_LIBPTHREAD
608: pthread_mutex_lock(&root->root_mtx[taskALARM]);
609: #endif
1.9 misho 610: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
1.8 misho 611: #ifdef HAVE_LIBPTHREAD
612: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
613: #endif
614: } else
1.13 misho 615: task = sched_unuseTask(task);
1.8 misho 616:
617: return task;
618: }
619:
1.11 misho 620: #ifdef AIO_SUPPORT
621: /*
622: * schedAIO() - Add AIO task to scheduler queue
623: *
624: * @root = root task
625: * @func = task execution function
626: * @arg = 1st func argument
627: * @acb = AIO cb structure address
628: * @opt_data = Optional data
629: * @opt_dlen = Optional data length
630: * return: NULL error or !=NULL new queued task
631: */
632: sched_task_t *
633: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
634: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
635: {
636: sched_task_t *task;
637: void *ptr;
638:
639: if (!root || !func || !acb || !opt_dlen)
640: return NULL;
641:
642: /* get new task */
1.13 misho 643: if (!(task = sched_useTask(root)))
1.11 misho 644: return NULL;
645:
646: task->task_func = func;
647: TASK_TYPE(task) = taskAIO;
648: TASK_ROOT(task) = root;
649:
650: TASK_ARG(task) = arg;
651: TASK_VAL(task) = (u_long) acb;
652:
653: TASK_DATA(task) = opt_data;
654: TASK_DATLEN(task) = opt_dlen;
655:
656: if (root->root_hooks.hook_add.aio)
657: ptr = root->root_hooks.hook_add.aio(task, NULL);
658: else
659: ptr = NULL;
660:
661: if (!ptr) {
662: #ifdef HAVE_LIBPTHREAD
663: pthread_mutex_lock(&root->root_mtx[taskAIO]);
664: #endif
665: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
666: #ifdef HAVE_LIBPTHREAD
667: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
668: #endif
669: } else
1.13 misho 670: task = sched_unuseTask(task);
1.11 misho 671:
672: return task;
673: }
674:
675: /*
676: * schedAIORead() - Add AIO read task to scheduler queue
677: *
678: * @root = root task
679: * @func = task execution function
680: * @arg = 1st func argument
681: * @fd = file descriptor
682: * @buffer = Buffer
683: * @buflen = Buffer length
684: * @offset = Offset from start of file, if =-1 from current position
685: * return: NULL error or !=NULL new queued task
686: */
1.16 misho 687: sched_task_t *
1.11 misho 688: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
689: void *buffer, size_t buflen, off_t offset)
690: {
691: struct aiocb *acb;
692: off_t off;
693:
694: if (!root || !func || !buffer || !buflen)
695: return NULL;
696:
697: if (offset == (off_t) -1) {
698: off = lseek(fd, 0, SEEK_CUR);
699: if (off == -1) {
700: LOGERR;
701: return NULL;
702: }
703: } else
704: off = offset;
705:
706: if (!(acb = malloc(sizeof(struct aiocb)))) {
707: LOGERR;
708: return NULL;
709: } else
710: memset(acb, 0, sizeof(struct aiocb));
711:
712: acb->aio_fildes = fd;
713: acb->aio_nbytes = buflen;
714: acb->aio_buf = buffer;
715: acb->aio_offset = off;
716: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
717: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
718: acb->aio_sigevent.sigev_value.sival_ptr = acb;
719:
720: if (aio_read(acb)) {
721: LOGERR;
722: free(acb);
723: return NULL;
724: }
725:
726: return schedAIO(root, func, arg, acb, buffer, buflen);
727: }
728:
729: /*
730: * schedAIOWrite() - Add AIO write task to scheduler queue
731: *
732: * @root = root task
733: * @func = task execution function
734: * @arg = 1st func argument
735: * @fd = file descriptor
736: * @buffer = Buffer
737: * @buflen = Buffer length
738: * @offset = Offset from start of file, if =-1 from current position
739: * return: NULL error or !=NULL new queued task
740: */
1.16 misho 741: sched_task_t *
1.11 misho 742: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
743: void *buffer, size_t buflen, off_t offset)
744: {
745: struct aiocb *acb;
746: off_t off;
747:
748: if (!root || !func || !buffer || !buflen)
749: return NULL;
750:
751: if (offset == (off_t) -1) {
752: off = lseek(fd, 0, SEEK_CUR);
753: if (off == -1) {
754: LOGERR;
755: return NULL;
756: }
757: } else
758: off = offset;
759:
760: if (!(acb = malloc(sizeof(struct aiocb)))) {
761: LOGERR;
762: return NULL;
763: } else
764: memset(acb, 0, sizeof(struct aiocb));
765:
766: acb->aio_fildes = fd;
767: acb->aio_nbytes = buflen;
768: acb->aio_buf = buffer;
769: acb->aio_offset = off;
770: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
771: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
772: acb->aio_sigevent.sigev_value.sival_ptr = acb;
773:
774: if (aio_write(acb)) {
775: LOGERR;
776: free(acb);
777: return NULL;
778: }
779:
780: return schedAIO(root, func, arg, acb, buffer, buflen);
781: }
782:
783: #ifdef EVFILT_LIO
784: /*
785: * schedLIO() - Add AIO bulk tasks to scheduler queue
786: *
787: * @root = root task
788: * @func = task execution function
789: * @arg = 1st func argument
790: * @acbs = AIO cb structure addresses
791: * @opt_data = Optional data
792: * @opt_dlen = Optional data length
793: * return: NULL error or !=NULL new queued task
794: */
795: sched_task_t *
796: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
797: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
798: {
799: sched_task_t *task;
800: void *ptr;
801:
802: if (!root || !func || !acbs || !opt_dlen)
803: return NULL;
804:
805: /* get new task */
1.13 misho 806: if (!(task = sched_useTask(root)))
1.11 misho 807: return NULL;
808:
809: task->task_func = func;
810: TASK_TYPE(task) = taskLIO;
811: TASK_ROOT(task) = root;
812:
813: TASK_ARG(task) = arg;
814: TASK_VAL(task) = (u_long) acbs;
815:
816: TASK_DATA(task) = opt_data;
817: TASK_DATLEN(task) = opt_dlen;
818:
819: if (root->root_hooks.hook_add.lio)
820: ptr = root->root_hooks.hook_add.lio(task, NULL);
821: else
822: ptr = NULL;
823:
824: if (!ptr) {
825: #ifdef HAVE_LIBPTHREAD
826: pthread_mutex_lock(&root->root_mtx[taskLIO]);
827: #endif
828: TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
829: #ifdef HAVE_LIBPTHREAD
830: pthread_mutex_unlock(&root->root_mtx[taskLIO]);
831: #endif
832: } else
1.13 misho 833: task = sched_unuseTask(task);
1.11 misho 834:
835: return task;
836: }
837:
838: /*
839: * schedLIORead() - Add list of AIO read tasks to scheduler queue
840: *
841: * @root = root task
842: * @func = task execution function
843: * @arg = 1st func argument
844: * @fd = file descriptor
845: * @bufs = Buffer's list
846: * @nbufs = Number of Buffers
847: * @offset = Offset from start of file, if =-1 from current position
848: * return: NULL error or !=NULL new queued task
849: */
850: sched_task_t *
851: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
852: struct iovec *bufs, size_t nbufs, off_t offset)
853: {
854: struct sigevent sig;
855: struct aiocb **acb;
856: off_t off;
857: register int i;
858:
859: if (!root || !func || !bufs || !nbufs)
860: return NULL;
861:
862: if (offset == (off_t) -1) {
863: off = lseek(fd, 0, SEEK_CUR);
864: if (off == -1) {
865: LOGERR;
866: return NULL;
867: }
868: } else
869: off = offset;
870:
871: if (!(acb = calloc(sizeof(void*), nbufs))) {
872: LOGERR;
873: return NULL;
874: } else
875: memset(acb, 0, sizeof(void*) * nbufs);
876: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
877: acb[i] = malloc(sizeof(struct aiocb));
878: if (!acb[i]) {
879: LOGERR;
880: for (i = 0; i < nbufs; i++)
881: if (acb[i])
882: free(acb[i]);
883: free(acb);
884: return NULL;
885: } else
886: memset(acb[i], 0, sizeof(struct aiocb));
887: acb[i]->aio_fildes = fd;
888: acb[i]->aio_nbytes = bufs[i].iov_len;
889: acb[i]->aio_buf = bufs[i].iov_base;
890: acb[i]->aio_offset = off;
891: acb[i]->aio_lio_opcode = LIO_READ;
892: }
893: memset(&sig, 0, sizeof sig);
894: sig.sigev_notify = SIGEV_KEVENT;
895: sig.sigev_notify_kqueue = root->root_kq;
896: sig.sigev_value.sival_ptr = acb;
897:
898: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
899: LOGERR;
900: for (i = 0; i < nbufs; i++)
901: if (acb[i])
902: free(acb[i]);
903: free(acb);
904: return NULL;
905: }
906:
907: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
908: }
909:
910: /*
911: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
912: *
913: * @root = root task
914: * @func = task execution function
915: * @arg = 1st func argument
916: * @fd = file descriptor
917: * @bufs = Buffer's list
918: * @nbufs = Number of Buffers
919: * @offset = Offset from start of file, if =-1 from current position
920: * return: NULL error or !=NULL new queued task
921: */
1.16 misho 922: sched_task_t *
1.11 misho 923: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
924: struct iovec *bufs, size_t nbufs, off_t offset)
925: {
926: struct sigevent sig;
927: struct aiocb **acb;
928: off_t off;
929: register int i;
930:
931: if (!root || !func || !bufs || !nbufs)
932: return NULL;
933:
934: if (offset == (off_t) -1) {
935: off = lseek(fd, 0, SEEK_CUR);
936: if (off == -1) {
937: LOGERR;
938: return NULL;
939: }
940: } else
941: off = offset;
942:
943: if (!(acb = calloc(sizeof(void*), nbufs))) {
944: LOGERR;
945: return NULL;
946: } else
947: memset(acb, 0, sizeof(void*) * nbufs);
948: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
949: acb[i] = malloc(sizeof(struct aiocb));
950: if (!acb[i]) {
951: LOGERR;
952: for (i = 0; i < nbufs; i++)
953: if (acb[i])
954: free(acb[i]);
955: free(acb);
956: return NULL;
957: } else
958: memset(acb[i], 0, sizeof(struct aiocb));
959: acb[i]->aio_fildes = fd;
960: acb[i]->aio_nbytes = bufs[i].iov_len;
961: acb[i]->aio_buf = bufs[i].iov_base;
962: acb[i]->aio_offset = off;
963: acb[i]->aio_lio_opcode = LIO_WRITE;
964: }
965: memset(&sig, 0, sizeof sig);
966: sig.sigev_notify = SIGEV_KEVENT;
967: sig.sigev_notify_kqueue = root->root_kq;
968: sig.sigev_value.sival_ptr = acb;
969:
970: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
971: LOGERR;
972: for (i = 0; i < nbufs; i++)
973: if (acb[i])
974: free(acb[i]);
975: free(acb);
976: return NULL;
977: }
978:
979: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
980: }
981: #endif /* EVFILT_LIO */
982: #endif /* AIO_SUPPORT */
983:
1.8 misho 984: /*
1.1 misho 985: * schedTimer() - Add TIMER task to scheduler queue
1.6 misho 986: *
1.1 misho 987: * @root = root task
988: * @func = task execution function
989: * @arg = 1st func argument
1.5 misho 990: * @ts = timeout argument structure
991: * @opt_data = Optional data
992: * @opt_dlen = Optional data length
1.1 misho 993: * return: NULL error or !=NULL new queued task
994: */
995: sched_task_t *
1.5 misho 996: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
997: void *opt_data, size_t opt_dlen)
1.1 misho 998: {
1.9 misho 999: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 1000: void *ptr;
1.5 misho 1001: struct timespec now;
1.1 misho 1002:
1003: if (!root || !func)
1004: return NULL;
1005:
1006: /* get new task */
1.13 misho 1007: if (!(task = sched_useTask(root)))
1.4 misho 1008: return NULL;
1.1 misho 1009:
1010: task->task_func = func;
1.5 misho 1011: TASK_TYPE(task) = taskTIMER;
1012: TASK_ROOT(task) = root;
1.1 misho 1013:
1014: TASK_ARG(task) = arg;
1015:
1.5 misho 1016: TASK_DATA(task) = opt_data;
1017: TASK_DATLEN(task) = opt_dlen;
1018:
1.1 misho 1019: /* calculate timeval structure */
1.5 misho 1020: clock_gettime(CLOCK_MONOTONIC, &now);
1021: now.tv_sec += ts.tv_sec;
1022: now.tv_nsec += ts.tv_nsec;
1023: if (now.tv_nsec >= 1000000000L) {
1.1 misho 1024: now.tv_sec++;
1.5 misho 1025: now.tv_nsec -= 1000000000L;
1026: } else if (now.tv_nsec < 0) {
1.1 misho 1027: now.tv_sec--;
1.5 misho 1028: now.tv_nsec += 1000000000L;
1.1 misho 1029: }
1.5 misho 1030: TASK_TS(task) = now;
1.1 misho 1031:
1032: if (root->root_hooks.hook_add.timer)
1033: ptr = root->root_hooks.hook_add.timer(task, NULL);
1034: else
1035: ptr = NULL;
1036:
1037: if (!ptr) {
1.5 misho 1038: #ifdef HAVE_LIBPTHREAD
1039: pthread_mutex_lock(&root->root_mtx[taskTIMER]);
1040: #endif
1.1 misho 1041: #ifdef TIMER_WITHOUT_SORT
1.9 misho 1042: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 1043: #else
1.9 misho 1044: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
1.5 misho 1045: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
1.1 misho 1046: break;
1047: if (!t)
1.9 misho 1048: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 1049: else
1.9 misho 1050: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.1 misho 1051: #endif
1.5 misho 1052: #ifdef HAVE_LIBPTHREAD
1053: pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
1054: #endif
1.4 misho 1055: } else
1.13 misho 1056: task = sched_unuseTask(task);
1.1 misho 1057:
1058: return task;
1059: }
1060:
1061: /*
1062: * schedEvent() - Add EVENT task to scheduler queue
1.6 misho 1063: *
1.1 misho 1064: * @root = root task
1065: * @func = task execution function
1066: * @arg = 1st func argument
1.2 misho 1067: * @val = additional func argument
1.5 misho 1068: * @opt_data = Optional data
1069: * @opt_dlen = Optional data length
1.1 misho 1070: * return: NULL error or !=NULL new queued task
1071: */
1072: sched_task_t *
1.5 misho 1073: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1074: void *opt_data, size_t opt_dlen)
1.1 misho 1075: {
1076: sched_task_t *task;
1077: void *ptr;
1078:
1079: if (!root || !func)
1080: return NULL;
1081:
1082: /* get new task */
1.13 misho 1083: if (!(task = sched_useTask(root)))
1.4 misho 1084: return NULL;
1.1 misho 1085:
1086: task->task_func = func;
1.5 misho 1087: TASK_TYPE(task) = taskEVENT;
1088: TASK_ROOT(task) = root;
1.1 misho 1089:
1090: TASK_ARG(task) = arg;
1.2 misho 1091: TASK_VAL(task) = val;
1.1 misho 1092:
1.5 misho 1093: TASK_DATA(task) = opt_data;
1094: TASK_DATLEN(task) = opt_dlen;
1095:
1.1 misho 1096: if (root->root_hooks.hook_add.event)
1097: ptr = root->root_hooks.hook_add.event(task, NULL);
1098: else
1099: ptr = NULL;
1100:
1.5 misho 1101: if (!ptr) {
1102: #ifdef HAVE_LIBPTHREAD
1103: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
1104: #endif
1.9 misho 1105: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
1.5 misho 1106: #ifdef HAVE_LIBPTHREAD
1107: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
1108: #endif
1109: } else
1.13 misho 1110: task = sched_unuseTask(task);
1.1 misho 1111:
1112: return task;
1113: }
1114:
1115:
1116: /*
1.12 misho 1117: * schedTask() - Add regular task to scheduler queue
1.6 misho 1118: *
1.1 misho 1119: * @root = root task
1120: * @func = task execution function
1121: * @arg = 1st func argument
1.12 misho 1122: * @prio = regular task priority, 0 is hi priority for regular tasks
1.5 misho 1123: * @opt_data = Optional data
1124: * @opt_dlen = Optional data length
1.1 misho 1125: * return: NULL error or !=NULL new queued task
1126: */
1127: sched_task_t *
1.12 misho 1128: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1.5 misho 1129: void *opt_data, size_t opt_dlen)
1.1 misho 1130: {
1.12 misho 1131: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 1132: void *ptr;
1133:
1134: if (!root || !func)
1135: return NULL;
1136:
1137: /* get new task */
1.13 misho 1138: if (!(task = sched_useTask(root)))
1.4 misho 1139: return NULL;
1.1 misho 1140:
1141: task->task_func = func;
1.12 misho 1142: TASK_TYPE(task) = taskTASK;
1.5 misho 1143: TASK_ROOT(task) = root;
1.1 misho 1144:
1145: TASK_ARG(task) = arg;
1.12 misho 1146: TASK_VAL(task) = prio;
1.1 misho 1147:
1.5 misho 1148: TASK_DATA(task) = opt_data;
1149: TASK_DATLEN(task) = opt_dlen;
1150:
1.12 misho 1151: if (root->root_hooks.hook_add.task)
1152: ptr = root->root_hooks.hook_add.task(task, NULL);
1.1 misho 1153: else
1154: ptr = NULL;
1155:
1.5 misho 1156: if (!ptr) {
1157: #ifdef HAVE_LIBPTHREAD
1.12 misho 1158: pthread_mutex_lock(&root->root_mtx[taskTASK]);
1.5 misho 1159: #endif
1.12 misho 1160: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1161: if (TASK_VAL(task) < TASK_VAL(t))
1162: break;
1163: if (!t)
1164: TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
1165: else
1166: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.5 misho 1167: #ifdef HAVE_LIBPTHREAD
1.12 misho 1168: pthread_mutex_unlock(&root->root_mtx[taskTASK]);
1.5 misho 1169: #endif
1170: } else
1.13 misho 1171: task = sched_unuseTask(task);
1.1 misho 1172:
1173: return task;
1174: }
1175:
1176: /*
1.10 misho 1177: * schedSuspend() - Add Suspended task to scheduler queue
1178: *
1179: * @root = root task
1180: * @func = task execution function
1181: * @arg = 1st func argument
1182: * @id = Trigger ID
1183: * @opt_data = Optional data
1184: * @opt_dlen = Optional data length
1185: * return: NULL error or !=NULL new queued task
1186: */
1187: sched_task_t *
1188: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1189: void *opt_data, size_t opt_dlen)
1190: {
1191: sched_task_t *task;
1192: void *ptr;
1193:
1194: if (!root || !func)
1195: return NULL;
1196:
1197: /* get new task */
1.13 misho 1198: if (!(task = sched_useTask(root)))
1.10 misho 1199: return NULL;
1200:
1201: task->task_func = func;
1202: TASK_TYPE(task) = taskSUSPEND;
1203: TASK_ROOT(task) = root;
1204:
1205: TASK_ARG(task) = arg;
1206: TASK_VAL(task) = id;
1207:
1208: TASK_DATA(task) = opt_data;
1209: TASK_DATLEN(task) = opt_dlen;
1210:
1211: if (root->root_hooks.hook_add.suspend)
1212: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1213: else
1214: ptr = NULL;
1215:
1216: if (!ptr) {
1217: #ifdef HAVE_LIBPTHREAD
1218: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1219: #endif
1220: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1221: #ifdef HAVE_LIBPTHREAD
1222: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1223: #endif
1224: } else
1.13 misho 1225: task = sched_unuseTask(task);
1.10 misho 1226:
1227: return task;
1228: }
1229:
1230: /*
1.1 misho 1231: * schedCallOnce() - Call once from scheduler
1.6 misho 1232: *
1.1 misho 1233: * @root = root task
1234: * @func = task execution function
1235: * @arg = 1st func argument
1.2 misho 1236: * @val = additional func argument
1.5 misho 1237: * @opt_data = Optional data
1238: * @opt_dlen = Optional data length
1.2 misho 1239: * return: return value from called func
1.1 misho 1240: */
1241: sched_task_t *
1.5 misho 1242: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1243: void *opt_data, size_t opt_dlen)
1.1 misho 1244: {
1245: sched_task_t *task;
1.2 misho 1246: void *ret;
1.1 misho 1247:
1248: if (!root || !func)
1249: return NULL;
1250:
1251: /* get new task */
1.13 misho 1252: if (!(task = sched_useTask(root)))
1.4 misho 1253: return NULL;
1.1 misho 1254:
1255: task->task_func = func;
1.5 misho 1256: TASK_TYPE(task) = taskEVENT;
1257: TASK_ROOT(task) = root;
1.1 misho 1258:
1259: TASK_ARG(task) = arg;
1.2 misho 1260: TASK_VAL(task) = val;
1.1 misho 1261:
1.5 misho 1262: TASK_DATA(task) = opt_data;
1263: TASK_DATLEN(task) = opt_dlen;
1264:
1.2 misho 1265: ret = schedCall(task);
1.1 misho 1266:
1.13 misho 1267: sched_unuseTask(task);
1.2 misho 1268: return ret;
1.1 misho 1269: }
1.13 misho 1270:
1271: /*
1272: * schedThread() - Add thread task to scheduler queue
1273: *
1274: * @root = root task
1275: * @func = task execution function
1276: * @arg = 1st func argument
1277: * @detach = Detach thread from scheduler, if !=0
1.15 misho 1278: * @ss = stack size
1.13 misho 1279: * @opt_data = Optional data
1280: * @opt_dlen = Optional data length
1281: * return: NULL error or !=NULL new queued task
1282: */
1283: sched_task_t *
1284: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach,
1.15 misho 1285: size_t ss, void *opt_data, size_t opt_dlen)
1.13 misho 1286: {
1287: #ifndef HAVE_LIBPTHREAD
1288: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1289: return NULL;
1290: #endif
1291: sched_task_t *task;
1292: void *ptr;
1293: pthread_attr_t attr;
1.15 misho 1294: sem_t *s = NULL;
1.13 misho 1295:
1296: if (!root || !func)
1297: return NULL;
1.15 misho 1298: else {
1299: /* normalizing stack size & detach state */
1300: if (ss)
1301: ss &= 0x7FFFFFFF;
1302: detach = detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE;
1303: }
1304:
1305: if (!(s = (sem_t*) malloc(sizeof(sem_t)))) {
1306: LOGERR;
1307: return NULL;
1308: }
1309: if (sem_init(s, 0, 1)) {
1310: LOGERR;
1311: free(s);
1312: return NULL;
1313: }
1.13 misho 1314:
1315: /* get new task */
1.15 misho 1316: if (!(task = sched_useTask(root))) {
1317: sem_destroy(s);
1318: free(s);
1319:
1.13 misho 1320: return NULL;
1.15 misho 1321: }
1.13 misho 1322:
1323: task->task_func = func;
1324: TASK_TYPE(task) = taskTHREAD;
1325: TASK_ROOT(task) = root;
1326:
1327: TASK_ARG(task) = arg;
1.15 misho 1328: TASK_FLAG(task) = detach;
1329: TASK_RET(task) = (intptr_t) s;
1.13 misho 1330:
1331: TASK_DATA(task) = opt_data;
1332: TASK_DATLEN(task) = opt_dlen;
1333:
1334: pthread_attr_init(&attr);
1.15 misho 1335: pthread_attr_setdetachstate(&attr, detach);
1336: if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
1337: LOGERR;
1338: pthread_attr_destroy(&attr);
1339: sem_destroy(s);
1340: free(s);
1341: return sched_unuseTask(task);
1342: }
1343: if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
1344: LOGERR;
1345: pthread_attr_destroy(&attr);
1346: sem_destroy(s);
1347: free(s);
1348: return sched_unuseTask(task);
1349: } else
1350: TASK_FLAG(task) |= (ss << 1);
1351: if ((errno = pthread_attr_setguardsize(&attr, ss))) {
1352: LOGERR;
1353: pthread_attr_destroy(&attr);
1354: sem_destroy(s);
1355: free(s);
1356: return sched_unuseTask(task);
1357: }
1358: #ifdef SCHED_RR
1359: pthread_attr_setschedpolicy(&attr, SCHED_RR);
1360: #else
1361: pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
1362: #endif
1.13 misho 1363: if (root->root_hooks.hook_add.thread)
1364: ptr = root->root_hooks.hook_add.thread(task, &attr);
1365: else
1366: ptr = NULL;
1367: pthread_attr_destroy(&attr);
1368:
1369: if (!ptr) {
1370: pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
1371: TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
1372: pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
1.15 misho 1373:
1374: /* wait for init thread actions */
1375: sem_wait(s);
1.13 misho 1376: } else
1377: task = sched_unuseTask(task);
1378:
1.15 misho 1379: sem_destroy(s);
1380: free(s);
1.13 misho 1381: return task;
1382: }
1383:
1.17 misho 1384: /*
1385: * schedRTC() - Add RTC task to scheduler queue
1386: *
1387: * @root = root task
1388: * @func = task execution function
1389: * @arg = 1st func argument
1390: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1391: * @opt_data = Optional RTC ID
1.18 misho 1392: * @opt_dlen = Optional data length
1.17 misho 1393: * return: NULL error or !=NULL new queued task
1394: */
1395: sched_task_t *
1396: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
1397: void *opt_data, size_t opt_dlen)
1398: {
1399: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
1400: sched_task_t *task;
1401: void *ptr;
1402:
1403: if (!root || !func)
1404: return NULL;
1405:
1406: /* get new task */
1407: if (!(task = sched_useTask(root)))
1408: return NULL;
1409:
1410: task->task_func = func;
1411: TASK_TYPE(task) = taskRTC;
1412: TASK_ROOT(task) = root;
1413:
1414: TASK_ARG(task) = arg;
1415: TASK_TS(task) = ts;
1416:
1417: TASK_DATA(task) = opt_data;
1418: TASK_DATLEN(task) = opt_dlen;
1419:
1420: if (root->root_hooks.hook_add.rtc)
1421: ptr = root->root_hooks.hook_add.rtc(task, NULL);
1422: else
1423: ptr = NULL;
1424:
1425: if (!ptr) {
1426: #ifdef HAVE_LIBPTHREAD
1427: pthread_mutex_lock(&root->root_mtx[taskRTC]);
1428: #endif
1429: TAILQ_INSERT_TAIL(&root->root_rtc, TASK_ID(task), task_node);
1430: #ifdef HAVE_LIBPTHREAD
1431: pthread_mutex_unlock(&root->root_mtx[taskRTC]);
1432: #endif
1433: } else
1434: task = sched_unuseTask(task);
1435:
1436: return task;
1437: #else
1438: sched_SetErr(ENOTSUP, "Not supported realtime clock extensions");
1439: return NULL;
1440: #endif
1441: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>