Annotation of libaitsched/src/tasks.c, revision 1.18.2.2
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.18.2.2! misho 6: * $Id: tasks.c,v 1.18.2.1 2013/08/26 07:30:07 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.16 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
1.13 misho 49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
1.16 misho 55: sched_task_t *
1.13 misho 56: sched_useTask(sched_root_task_t * __restrict root)
1.4 misho 57: {
1.7 misho 58: sched_task_t *task, *tmp;
1.4 misho 59:
1.14 misho 60: #ifdef HAVE_LIBPTHREAD
61: pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
62: #endif
1.7 misho 63: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
1.4 misho 64: if (!TASK_ISLOCKED(task)) {
65: TAILQ_REMOVE(&root->root_unuse, task, task_node);
66: break;
67: }
68: }
1.14 misho 69: #ifdef HAVE_LIBPTHREAD
70: pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
71: #endif
1.4 misho 72:
73: if (!task) {
74: task = malloc(sizeof(sched_task_t));
75: if (!task) {
76: LOGERR;
77: return NULL;
78: }
79: }
80:
1.9 misho 81: memset(task, 0, sizeof(sched_task_t));
82: task->task_id = (uintptr_t) task;
1.4 misho 83: return task;
84: }
85:
1.13 misho 86: /*
87: * sched_unuseTask() - Unlock and put task to unuse queue
88: *
89: * @task = task
90: * return: always is NULL
91: */
1.16 misho 92: sched_task_t *
1.13 misho 93: sched_unuseTask(sched_task_t * __restrict task)
1.4 misho 94: {
95: TASK_UNLOCK(task);
1.5 misho 96: TASK_TYPE(task) = taskUNUSE;
97: #ifdef HAVE_LIBPTHREAD
98: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
99: #endif
1.9 misho 100: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
1.5 misho 101: #ifdef HAVE_LIBPTHREAD
102: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
103: #endif
1.4 misho 104: task = NULL;
105:
106: return task;
107: }
108:
1.14 misho 109: #pragma GCC visibility push(hidden)
110:
111: #ifdef HAVE_LIBPTHREAD
112: static void
113: _sched_threadCleanup(sched_task_t *t)
114: {
115: if (!t || !TASK_ROOT(t))
116: return;
117:
118: if (TASK_FLAG(t) == PTHREAD_CREATE_JOINABLE)
119: pthread_detach(pthread_self());
120:
1.15 misho 121: pthread_mutex_lock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
122: TAILQ_REMOVE(&TASK_ROOT(t)->root_thread, t, task_node);
123: pthread_mutex_unlock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
124:
1.14 misho 125: sched_unuseTask(t);
126: }
127: void *
128: _sched_threadWrapper(sched_task_t *t)
129: {
130: void *ret = NULL;
1.15 misho 131: sem_t *s = NULL;
1.14 misho 132:
1.15 misho 133: if (!t || !TASK_ROOT(t) || !TASK_RET(t))
1.14 misho 134: pthread_exit(ret);
1.15 misho 135: else
136: s = (sem_t*) TASK_RET(t);
1.14 misho 137:
138: pthread_cleanup_push((void (*)(void*)) _sched_threadCleanup, t);
139:
140: pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
141: pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
1.15 misho 142:
143: /* notify parent, thread is ready for execution */
144: sem_post(s);
1.14 misho 145: pthread_testcancel();
146:
147: ret = TASK_FUNC(t)(t);
148:
149: pthread_cleanup_pop(42);
150: TASK_ROOT(t)->root_ret = ret;
151: pthread_exit(ret);
152: }
153: #endif
154:
1.18.2.1 misho 155: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
156: void *
157: _sched_rtcWrapper(sched_task_t *t)
158: {
159: void *ret = NULL;
160: sched_task_func_t func;
1.18.2.2! misho 161: sched_task_t *task;
1.18.2.1 misho 162:
1.18.2.2! misho 163: if (!t || !TASK_ROOT(t) || !TASK_DATA(t))
1.18.2.1 misho 164: return NULL;
1.18.2.2! misho 165: else {
! 166: task = (sched_task_t*) TASK_DATA(t);
! 167: func = TASK_FUNC(task);
! 168: }
1.18.2.1 misho 169:
1.18.2.2! misho 170: ret = func(task);
1.18.2.1 misho 171: timer_delete((timer_t) TASK_DATLEN(t));
172: return ret;
173: }
174: #endif
175:
1.14 misho 176: #pragma GCC visibility pop
177:
178: /*
179: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
180: *
181: * @task = current task
182: * @retcode = return code
183: * return: return code
184: */
1.16 misho 185: void *
1.14 misho 186: sched_taskExit(sched_task_t *task, intptr_t retcode)
187: {
188: if (!task || !TASK_ROOT(task))
189: return (void*) -1;
190:
191: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
192: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
193:
194: TASK_ROOT(task)->root_ret = (void*) retcode;
195: return (void*) retcode;
196: }
197:
1.4 misho 198:
1.1 misho 199: /*
1.2 misho 200: * schedRead() - Add READ I/O task to scheduler queue
1.6 misho 201: *
1.1 misho 202: * @root = root task
203: * @func = task execution function
204: * @arg = 1st func argument
1.2 misho 205: * @fd = fd handle
1.5 misho 206: * @opt_data = Optional data
207: * @opt_dlen = Optional data length
1.1 misho 208: * return: NULL error or !=NULL new queued task
209: */
210: sched_task_t *
1.5 misho 211: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
212: void *opt_data, size_t opt_dlen)
1.1 misho 213: {
214: sched_task_t *task;
215: void *ptr;
216:
217: if (!root || !func)
218: return NULL;
219:
220: /* get new task */
1.13 misho 221: if (!(task = sched_useTask(root)))
1.4 misho 222: return NULL;
1.1 misho 223:
224: task->task_func = func;
1.5 misho 225: TASK_TYPE(task) = taskREAD;
226: TASK_ROOT(task) = root;
1.1 misho 227:
228: TASK_ARG(task) = arg;
1.2 misho 229: TASK_FD(task) = fd;
1.1 misho 230:
1.5 misho 231: TASK_DATA(task) = opt_data;
232: TASK_DATLEN(task) = opt_dlen;
233:
1.1 misho 234: if (root->root_hooks.hook_add.read)
235: ptr = root->root_hooks.hook_add.read(task, NULL);
236: else
237: ptr = NULL;
238:
1.5 misho 239: if (!ptr) {
240: #ifdef HAVE_LIBPTHREAD
241: pthread_mutex_lock(&root->root_mtx[taskREAD]);
242: #endif
1.9 misho 243: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
1.5 misho 244: #ifdef HAVE_LIBPTHREAD
245: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
246: #endif
247: } else
1.13 misho 248: task = sched_unuseTask(task);
1.1 misho 249:
250: return task;
251: }
252:
253: /*
1.2 misho 254: * schedWrite() - Add WRITE I/O task to scheduler queue
1.6 misho 255: *
1.1 misho 256: * @root = root task
257: * @func = task execution function
258: * @arg = 1st func argument
1.2 misho 259: * @fd = fd handle
1.5 misho 260: * @opt_data = Optional data
261: * @opt_dlen = Optional data length
1.1 misho 262: * return: NULL error or !=NULL new queued task
263: */
264: sched_task_t *
1.5 misho 265: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
266: void *opt_data, size_t opt_dlen)
1.1 misho 267: {
268: sched_task_t *task;
269: void *ptr;
270:
271: if (!root || !func)
272: return NULL;
273:
274: /* get new task */
1.13 misho 275: if (!(task = sched_useTask(root)))
1.4 misho 276: return NULL;
1.1 misho 277:
278: task->task_func = func;
1.5 misho 279: TASK_TYPE(task) = taskWRITE;
280: TASK_ROOT(task) = root;
1.1 misho 281:
282: TASK_ARG(task) = arg;
1.2 misho 283: TASK_FD(task) = fd;
1.1 misho 284:
1.5 misho 285: TASK_DATA(task) = opt_data;
286: TASK_DATLEN(task) = opt_dlen;
287:
1.1 misho 288: if (root->root_hooks.hook_add.write)
289: ptr = root->root_hooks.hook_add.write(task, NULL);
290: else
291: ptr = NULL;
292:
1.5 misho 293: if (!ptr) {
294: #ifdef HAVE_LIBPTHREAD
295: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
296: #endif
1.9 misho 297: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
1.5 misho 298: #ifdef HAVE_LIBPTHREAD
299: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
300: #endif
301: } else
1.13 misho 302: task = sched_unuseTask(task);
1.1 misho 303:
304: return task;
305: }
306:
307: /*
1.9 misho 308: * schedNode() - Add NODE task to scheduler queue
309: *
310: * @root = root task
311: * @func = task execution function
312: * @arg = 1st func argument
313: * @fd = fd handle
314: * @opt_data = Optional data
315: * @opt_dlen = Optional data length
316: * return: NULL error or !=NULL new queued task
317: */
318: sched_task_t *
319: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
320: void *opt_data, size_t opt_dlen)
321: {
322: sched_task_t *task;
323: void *ptr;
324:
325: if (!root || !func)
326: return NULL;
327:
328: /* get new task */
1.13 misho 329: if (!(task = sched_useTask(root)))
1.9 misho 330: return NULL;
331:
332: task->task_func = func;
333: TASK_TYPE(task) = taskNODE;
334: TASK_ROOT(task) = root;
335:
336: TASK_ARG(task) = arg;
337: TASK_FD(task) = fd;
338:
339: TASK_DATA(task) = opt_data;
340: TASK_DATLEN(task) = opt_dlen;
341:
342: if (root->root_hooks.hook_add.node)
343: ptr = root->root_hooks.hook_add.node(task, NULL);
344: else
345: ptr = NULL;
346:
347: if (!ptr) {
348: #ifdef HAVE_LIBPTHREAD
349: pthread_mutex_lock(&root->root_mtx[taskNODE]);
350: #endif
351: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
352: #ifdef HAVE_LIBPTHREAD
353: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
354: #endif
355: } else
1.13 misho 356: task = sched_unuseTask(task);
1.9 misho 357:
358: return task;
359: }
360:
361: /*
362: * schedProc() - Add PROC task to scheduler queue
363: *
364: * @root = root task
365: * @func = task execution function
366: * @arg = 1st func argument
367: * @pid = PID
368: * @opt_data = Optional data
369: * @opt_dlen = Optional data length
370: * return: NULL error or !=NULL new queued task
371: */
372: sched_task_t *
373: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
374: void *opt_data, size_t opt_dlen)
375: {
376: sched_task_t *task;
377: void *ptr;
378:
379: if (!root || !func)
380: return NULL;
381:
382: /* get new task */
1.13 misho 383: if (!(task = sched_useTask(root)))
1.9 misho 384: return NULL;
385:
386: task->task_func = func;
387: TASK_TYPE(task) = taskPROC;
388: TASK_ROOT(task) = root;
389:
390: TASK_ARG(task) = arg;
391: TASK_VAL(task) = pid;
392:
393: TASK_DATA(task) = opt_data;
394: TASK_DATLEN(task) = opt_dlen;
395:
396: if (root->root_hooks.hook_add.proc)
397: ptr = root->root_hooks.hook_add.proc(task, NULL);
398: else
399: ptr = NULL;
400:
401: if (!ptr) {
402: #ifdef HAVE_LIBPTHREAD
403: pthread_mutex_lock(&root->root_mtx[taskPROC]);
404: #endif
405: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
406: #ifdef HAVE_LIBPTHREAD
407: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
408: #endif
409: } else
1.13 misho 410: task = sched_unuseTask(task);
1.9 misho 411:
412: return task;
413: }
414:
415: /*
416: * schedUser() - Add trigger USER task to scheduler queue
417: *
418: * @root = root task
419: * @func = task execution function
420: * @arg = 1st func argument
421: * @id = Trigger ID
422: * @opt_data = Optional data
423: * @opt_dlen = Optional user's trigger flags
424: * return: NULL error or !=NULL new queued task
425: */
426: sched_task_t *
427: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
428: void *opt_data, size_t opt_dlen)
429: {
430: #ifndef EVFILT_USER
431: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
432: return NULL;
433: #else
434: sched_task_t *task;
435: void *ptr;
436:
437: if (!root || !func)
438: return NULL;
439:
440: /* get new task */
1.13 misho 441: if (!(task = sched_useTask(root)))
1.9 misho 442: return NULL;
443:
444: task->task_func = func;
445: TASK_TYPE(task) = taskUSER;
446: TASK_ROOT(task) = root;
447:
448: TASK_ARG(task) = arg;
449: TASK_VAL(task) = id;
450:
451: TASK_DATA(task) = opt_data;
452: TASK_DATLEN(task) = opt_dlen;
453:
454: if (root->root_hooks.hook_add.user)
455: ptr = root->root_hooks.hook_add.user(task, NULL);
456: else
457: ptr = NULL;
458:
459: if (!ptr) {
460: #ifdef HAVE_LIBPTHREAD
461: pthread_mutex_lock(&root->root_mtx[taskUSER]);
462: #endif
463: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
464: #ifdef HAVE_LIBPTHREAD
465: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
466: #endif
467: } else
1.13 misho 468: task = sched_unuseTask(task);
1.9 misho 469:
470: return task;
471: #endif
472: }
473:
474: /*
475: * schedSignal() - Add SIGNAL task to scheduler queue
476: *
477: * @root = root task
478: * @func = task execution function
479: * @arg = 1st func argument
480: * @sig = Signal
481: * @opt_data = Optional data
482: * @opt_dlen = Optional data length
483: * return: NULL error or !=NULL new queued task
484: */
485: sched_task_t *
486: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
487: void *opt_data, size_t opt_dlen)
488: {
489: sched_task_t *task;
490: void *ptr;
491:
492: if (!root || !func)
493: return NULL;
494:
495: /* get new task */
1.13 misho 496: if (!(task = sched_useTask(root)))
1.9 misho 497: return NULL;
498:
499: task->task_func = func;
500: TASK_TYPE(task) = taskSIGNAL;
501: TASK_ROOT(task) = root;
502:
503: TASK_ARG(task) = arg;
504: TASK_VAL(task) = sig;
505:
506: TASK_DATA(task) = opt_data;
507: TASK_DATLEN(task) = opt_dlen;
508:
509: if (root->root_hooks.hook_add.signal)
510: ptr = root->root_hooks.hook_add.signal(task, NULL);
511: else
512: ptr = NULL;
513:
514: if (!ptr) {
515: #ifdef HAVE_LIBPTHREAD
516: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
517: #endif
518: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
519: #ifdef HAVE_LIBPTHREAD
520: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
521: #endif
522: } else
1.13 misho 523: task = sched_unuseTask(task);
1.9 misho 524:
525: return task;
526: }
527:
528: /*
1.8 misho 529: * schedAlarm() - Add ALARM task to scheduler queue
530: *
531: * @root = root task
532: * @func = task execution function
533: * @arg = 1st func argument
534: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1.17 misho 535: * @opt_data = Alarm timer ID
1.8 misho 536: * @opt_dlen = Optional data length
537: * return: NULL error or !=NULL new queued task
538: */
539: sched_task_t *
540: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
541: void *opt_data, size_t opt_dlen)
542: {
543: sched_task_t *task;
544: void *ptr;
545:
546: if (!root || !func)
547: return NULL;
548:
549: /* get new task */
1.13 misho 550: if (!(task = sched_useTask(root)))
1.8 misho 551: return NULL;
552:
553: task->task_func = func;
554: TASK_TYPE(task) = taskALARM;
555: TASK_ROOT(task) = root;
556:
557: TASK_ARG(task) = arg;
558: TASK_TS(task) = ts;
559:
560: TASK_DATA(task) = opt_data;
561: TASK_DATLEN(task) = opt_dlen;
562:
563: if (root->root_hooks.hook_add.alarm)
564: ptr = root->root_hooks.hook_add.alarm(task, NULL);
565: else
566: ptr = NULL;
567:
568: if (!ptr) {
569: #ifdef HAVE_LIBPTHREAD
570: pthread_mutex_lock(&root->root_mtx[taskALARM]);
571: #endif
1.9 misho 572: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
1.8 misho 573: #ifdef HAVE_LIBPTHREAD
574: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
575: #endif
576: } else
1.13 misho 577: task = sched_unuseTask(task);
1.8 misho 578:
579: return task;
580: }
581:
1.11 misho 582: #ifdef AIO_SUPPORT
583: /*
584: * schedAIO() - Add AIO task to scheduler queue
585: *
586: * @root = root task
587: * @func = task execution function
588: * @arg = 1st func argument
589: * @acb = AIO cb structure address
590: * @opt_data = Optional data
591: * @opt_dlen = Optional data length
592: * return: NULL error or !=NULL new queued task
593: */
594: sched_task_t *
595: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
596: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
597: {
598: sched_task_t *task;
599: void *ptr;
600:
601: if (!root || !func || !acb || !opt_dlen)
602: return NULL;
603:
604: /* get new task */
1.13 misho 605: if (!(task = sched_useTask(root)))
1.11 misho 606: return NULL;
607:
608: task->task_func = func;
609: TASK_TYPE(task) = taskAIO;
610: TASK_ROOT(task) = root;
611:
612: TASK_ARG(task) = arg;
613: TASK_VAL(task) = (u_long) acb;
614:
615: TASK_DATA(task) = opt_data;
616: TASK_DATLEN(task) = opt_dlen;
617:
618: if (root->root_hooks.hook_add.aio)
619: ptr = root->root_hooks.hook_add.aio(task, NULL);
620: else
621: ptr = NULL;
622:
623: if (!ptr) {
624: #ifdef HAVE_LIBPTHREAD
625: pthread_mutex_lock(&root->root_mtx[taskAIO]);
626: #endif
627: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
628: #ifdef HAVE_LIBPTHREAD
629: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
630: #endif
631: } else
1.13 misho 632: task = sched_unuseTask(task);
1.11 misho 633:
634: return task;
635: }
636:
637: /*
638: * schedAIORead() - Add AIO read task to scheduler queue
639: *
640: * @root = root task
641: * @func = task execution function
642: * @arg = 1st func argument
643: * @fd = file descriptor
644: * @buffer = Buffer
645: * @buflen = Buffer length
646: * @offset = Offset from start of file, if =-1 from current position
647: * return: NULL error or !=NULL new queued task
648: */
1.16 misho 649: sched_task_t *
1.11 misho 650: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
651: void *buffer, size_t buflen, off_t offset)
652: {
653: struct aiocb *acb;
654: off_t off;
655:
656: if (!root || !func || !buffer || !buflen)
657: return NULL;
658:
659: if (offset == (off_t) -1) {
660: off = lseek(fd, 0, SEEK_CUR);
661: if (off == -1) {
662: LOGERR;
663: return NULL;
664: }
665: } else
666: off = offset;
667:
668: if (!(acb = malloc(sizeof(struct aiocb)))) {
669: LOGERR;
670: return NULL;
671: } else
672: memset(acb, 0, sizeof(struct aiocb));
673:
674: acb->aio_fildes = fd;
675: acb->aio_nbytes = buflen;
676: acb->aio_buf = buffer;
677: acb->aio_offset = off;
678: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
679: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
680: acb->aio_sigevent.sigev_value.sival_ptr = acb;
681:
682: if (aio_read(acb)) {
683: LOGERR;
684: free(acb);
685: return NULL;
686: }
687:
688: return schedAIO(root, func, arg, acb, buffer, buflen);
689: }
690:
691: /*
692: * schedAIOWrite() - Add AIO write task to scheduler queue
693: *
694: * @root = root task
695: * @func = task execution function
696: * @arg = 1st func argument
697: * @fd = file descriptor
698: * @buffer = Buffer
699: * @buflen = Buffer length
700: * @offset = Offset from start of file, if =-1 from current position
701: * return: NULL error or !=NULL new queued task
702: */
1.16 misho 703: sched_task_t *
1.11 misho 704: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
705: void *buffer, size_t buflen, off_t offset)
706: {
707: struct aiocb *acb;
708: off_t off;
709:
710: if (!root || !func || !buffer || !buflen)
711: return NULL;
712:
713: if (offset == (off_t) -1) {
714: off = lseek(fd, 0, SEEK_CUR);
715: if (off == -1) {
716: LOGERR;
717: return NULL;
718: }
719: } else
720: off = offset;
721:
722: if (!(acb = malloc(sizeof(struct aiocb)))) {
723: LOGERR;
724: return NULL;
725: } else
726: memset(acb, 0, sizeof(struct aiocb));
727:
728: acb->aio_fildes = fd;
729: acb->aio_nbytes = buflen;
730: acb->aio_buf = buffer;
731: acb->aio_offset = off;
732: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
733: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
734: acb->aio_sigevent.sigev_value.sival_ptr = acb;
735:
736: if (aio_write(acb)) {
737: LOGERR;
738: free(acb);
739: return NULL;
740: }
741:
742: return schedAIO(root, func, arg, acb, buffer, buflen);
743: }
744:
745: #ifdef EVFILT_LIO
746: /*
747: * schedLIO() - Add AIO bulk tasks to scheduler queue
748: *
749: * @root = root task
750: * @func = task execution function
751: * @arg = 1st func argument
752: * @acbs = AIO cb structure addresses
753: * @opt_data = Optional data
754: * @opt_dlen = Optional data length
755: * return: NULL error or !=NULL new queued task
756: */
757: sched_task_t *
758: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
759: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
760: {
761: sched_task_t *task;
762: void *ptr;
763:
764: if (!root || !func || !acbs || !opt_dlen)
765: return NULL;
766:
767: /* get new task */
1.13 misho 768: if (!(task = sched_useTask(root)))
1.11 misho 769: return NULL;
770:
771: task->task_func = func;
772: TASK_TYPE(task) = taskLIO;
773: TASK_ROOT(task) = root;
774:
775: TASK_ARG(task) = arg;
776: TASK_VAL(task) = (u_long) acbs;
777:
778: TASK_DATA(task) = opt_data;
779: TASK_DATLEN(task) = opt_dlen;
780:
781: if (root->root_hooks.hook_add.lio)
782: ptr = root->root_hooks.hook_add.lio(task, NULL);
783: else
784: ptr = NULL;
785:
786: if (!ptr) {
787: #ifdef HAVE_LIBPTHREAD
788: pthread_mutex_lock(&root->root_mtx[taskLIO]);
789: #endif
790: TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
791: #ifdef HAVE_LIBPTHREAD
792: pthread_mutex_unlock(&root->root_mtx[taskLIO]);
793: #endif
794: } else
1.13 misho 795: task = sched_unuseTask(task);
1.11 misho 796:
797: return task;
798: }
799:
800: /*
801: * schedLIORead() - Add list of AIO read tasks to scheduler queue
802: *
803: * @root = root task
804: * @func = task execution function
805: * @arg = 1st func argument
806: * @fd = file descriptor
807: * @bufs = Buffer's list
808: * @nbufs = Number of Buffers
809: * @offset = Offset from start of file, if =-1 from current position
810: * return: NULL error or !=NULL new queued task
811: */
812: sched_task_t *
813: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
814: struct iovec *bufs, size_t nbufs, off_t offset)
815: {
816: struct sigevent sig;
817: struct aiocb **acb;
818: off_t off;
819: register int i;
820:
821: if (!root || !func || !bufs || !nbufs)
822: return NULL;
823:
824: if (offset == (off_t) -1) {
825: off = lseek(fd, 0, SEEK_CUR);
826: if (off == -1) {
827: LOGERR;
828: return NULL;
829: }
830: } else
831: off = offset;
832:
833: if (!(acb = calloc(sizeof(void*), nbufs))) {
834: LOGERR;
835: return NULL;
836: } else
837: memset(acb, 0, sizeof(void*) * nbufs);
838: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
839: acb[i] = malloc(sizeof(struct aiocb));
840: if (!acb[i]) {
841: LOGERR;
842: for (i = 0; i < nbufs; i++)
843: if (acb[i])
844: free(acb[i]);
845: free(acb);
846: return NULL;
847: } else
848: memset(acb[i], 0, sizeof(struct aiocb));
849: acb[i]->aio_fildes = fd;
850: acb[i]->aio_nbytes = bufs[i].iov_len;
851: acb[i]->aio_buf = bufs[i].iov_base;
852: acb[i]->aio_offset = off;
853: acb[i]->aio_lio_opcode = LIO_READ;
854: }
855: memset(&sig, 0, sizeof sig);
856: sig.sigev_notify = SIGEV_KEVENT;
857: sig.sigev_notify_kqueue = root->root_kq;
858: sig.sigev_value.sival_ptr = acb;
859:
860: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
861: LOGERR;
862: for (i = 0; i < nbufs; i++)
863: if (acb[i])
864: free(acb[i]);
865: free(acb);
866: return NULL;
867: }
868:
869: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
870: }
871:
872: /*
873: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
874: *
875: * @root = root task
876: * @func = task execution function
877: * @arg = 1st func argument
878: * @fd = file descriptor
879: * @bufs = Buffer's list
880: * @nbufs = Number of Buffers
881: * @offset = Offset from start of file, if =-1 from current position
882: * return: NULL error or !=NULL new queued task
883: */
1.16 misho 884: sched_task_t *
1.11 misho 885: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
886: struct iovec *bufs, size_t nbufs, off_t offset)
887: {
888: struct sigevent sig;
889: struct aiocb **acb;
890: off_t off;
891: register int i;
892:
893: if (!root || !func || !bufs || !nbufs)
894: return NULL;
895:
896: if (offset == (off_t) -1) {
897: off = lseek(fd, 0, SEEK_CUR);
898: if (off == -1) {
899: LOGERR;
900: return NULL;
901: }
902: } else
903: off = offset;
904:
905: if (!(acb = calloc(sizeof(void*), nbufs))) {
906: LOGERR;
907: return NULL;
908: } else
909: memset(acb, 0, sizeof(void*) * nbufs);
910: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
911: acb[i] = malloc(sizeof(struct aiocb));
912: if (!acb[i]) {
913: LOGERR;
914: for (i = 0; i < nbufs; i++)
915: if (acb[i])
916: free(acb[i]);
917: free(acb);
918: return NULL;
919: } else
920: memset(acb[i], 0, sizeof(struct aiocb));
921: acb[i]->aio_fildes = fd;
922: acb[i]->aio_nbytes = bufs[i].iov_len;
923: acb[i]->aio_buf = bufs[i].iov_base;
924: acb[i]->aio_offset = off;
925: acb[i]->aio_lio_opcode = LIO_WRITE;
926: }
927: memset(&sig, 0, sizeof sig);
928: sig.sigev_notify = SIGEV_KEVENT;
929: sig.sigev_notify_kqueue = root->root_kq;
930: sig.sigev_value.sival_ptr = acb;
931:
932: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
933: LOGERR;
934: for (i = 0; i < nbufs; i++)
935: if (acb[i])
936: free(acb[i]);
937: free(acb);
938: return NULL;
939: }
940:
941: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
942: }
943: #endif /* EVFILT_LIO */
944: #endif /* AIO_SUPPORT */
945:
1.8 misho 946: /*
1.1 misho 947: * schedTimer() - Add TIMER task to scheduler queue
1.6 misho 948: *
1.1 misho 949: * @root = root task
950: * @func = task execution function
951: * @arg = 1st func argument
1.5 misho 952: * @ts = timeout argument structure
953: * @opt_data = Optional data
954: * @opt_dlen = Optional data length
1.1 misho 955: * return: NULL error or !=NULL new queued task
956: */
957: sched_task_t *
1.5 misho 958: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
959: void *opt_data, size_t opt_dlen)
1.1 misho 960: {
1.9 misho 961: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 962: void *ptr;
1.5 misho 963: struct timespec now;
1.1 misho 964:
965: if (!root || !func)
966: return NULL;
967:
968: /* get new task */
1.13 misho 969: if (!(task = sched_useTask(root)))
1.4 misho 970: return NULL;
1.1 misho 971:
972: task->task_func = func;
1.5 misho 973: TASK_TYPE(task) = taskTIMER;
974: TASK_ROOT(task) = root;
1.1 misho 975:
976: TASK_ARG(task) = arg;
977:
1.5 misho 978: TASK_DATA(task) = opt_data;
979: TASK_DATLEN(task) = opt_dlen;
980:
1.1 misho 981: /* calculate timeval structure */
1.5 misho 982: clock_gettime(CLOCK_MONOTONIC, &now);
983: now.tv_sec += ts.tv_sec;
984: now.tv_nsec += ts.tv_nsec;
985: if (now.tv_nsec >= 1000000000L) {
1.1 misho 986: now.tv_sec++;
1.5 misho 987: now.tv_nsec -= 1000000000L;
988: } else if (now.tv_nsec < 0) {
1.1 misho 989: now.tv_sec--;
1.5 misho 990: now.tv_nsec += 1000000000L;
1.1 misho 991: }
1.5 misho 992: TASK_TS(task) = now;
1.1 misho 993:
994: if (root->root_hooks.hook_add.timer)
995: ptr = root->root_hooks.hook_add.timer(task, NULL);
996: else
997: ptr = NULL;
998:
999: if (!ptr) {
1.5 misho 1000: #ifdef HAVE_LIBPTHREAD
1001: pthread_mutex_lock(&root->root_mtx[taskTIMER]);
1002: #endif
1.1 misho 1003: #ifdef TIMER_WITHOUT_SORT
1.9 misho 1004: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 1005: #else
1.9 misho 1006: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
1.5 misho 1007: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
1.1 misho 1008: break;
1009: if (!t)
1.9 misho 1010: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 1011: else
1.9 misho 1012: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.1 misho 1013: #endif
1.5 misho 1014: #ifdef HAVE_LIBPTHREAD
1015: pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
1016: #endif
1.4 misho 1017: } else
1.13 misho 1018: task = sched_unuseTask(task);
1.1 misho 1019:
1020: return task;
1021: }
1022:
1023: /*
1024: * schedEvent() - Add EVENT task to scheduler queue
1.6 misho 1025: *
1.1 misho 1026: * @root = root task
1027: * @func = task execution function
1028: * @arg = 1st func argument
1.2 misho 1029: * @val = additional func argument
1.5 misho 1030: * @opt_data = Optional data
1031: * @opt_dlen = Optional data length
1.1 misho 1032: * return: NULL error or !=NULL new queued task
1033: */
1034: sched_task_t *
1.5 misho 1035: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1036: void *opt_data, size_t opt_dlen)
1.1 misho 1037: {
1038: sched_task_t *task;
1039: void *ptr;
1040:
1041: if (!root || !func)
1042: return NULL;
1043:
1044: /* get new task */
1.13 misho 1045: if (!(task = sched_useTask(root)))
1.4 misho 1046: return NULL;
1.1 misho 1047:
1048: task->task_func = func;
1.5 misho 1049: TASK_TYPE(task) = taskEVENT;
1050: TASK_ROOT(task) = root;
1.1 misho 1051:
1052: TASK_ARG(task) = arg;
1.2 misho 1053: TASK_VAL(task) = val;
1.1 misho 1054:
1.5 misho 1055: TASK_DATA(task) = opt_data;
1056: TASK_DATLEN(task) = opt_dlen;
1057:
1.1 misho 1058: if (root->root_hooks.hook_add.event)
1059: ptr = root->root_hooks.hook_add.event(task, NULL);
1060: else
1061: ptr = NULL;
1062:
1.5 misho 1063: if (!ptr) {
1064: #ifdef HAVE_LIBPTHREAD
1065: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
1066: #endif
1.9 misho 1067: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
1.5 misho 1068: #ifdef HAVE_LIBPTHREAD
1069: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
1070: #endif
1071: } else
1.13 misho 1072: task = sched_unuseTask(task);
1.1 misho 1073:
1074: return task;
1075: }
1076:
1077:
1078: /*
1.12 misho 1079: * schedTask() - Add regular task to scheduler queue
1.6 misho 1080: *
1.1 misho 1081: * @root = root task
1082: * @func = task execution function
1083: * @arg = 1st func argument
1.12 misho 1084: * @prio = regular task priority, 0 is hi priority for regular tasks
1.5 misho 1085: * @opt_data = Optional data
1086: * @opt_dlen = Optional data length
1.1 misho 1087: * return: NULL error or !=NULL new queued task
1088: */
1089: sched_task_t *
1.12 misho 1090: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1.5 misho 1091: void *opt_data, size_t opt_dlen)
1.1 misho 1092: {
1.12 misho 1093: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 1094: void *ptr;
1095:
1096: if (!root || !func)
1097: return NULL;
1098:
1099: /* get new task */
1.13 misho 1100: if (!(task = sched_useTask(root)))
1.4 misho 1101: return NULL;
1.1 misho 1102:
1103: task->task_func = func;
1.12 misho 1104: TASK_TYPE(task) = taskTASK;
1.5 misho 1105: TASK_ROOT(task) = root;
1.1 misho 1106:
1107: TASK_ARG(task) = arg;
1.12 misho 1108: TASK_VAL(task) = prio;
1.1 misho 1109:
1.5 misho 1110: TASK_DATA(task) = opt_data;
1111: TASK_DATLEN(task) = opt_dlen;
1112:
1.12 misho 1113: if (root->root_hooks.hook_add.task)
1114: ptr = root->root_hooks.hook_add.task(task, NULL);
1.1 misho 1115: else
1116: ptr = NULL;
1117:
1.5 misho 1118: if (!ptr) {
1119: #ifdef HAVE_LIBPTHREAD
1.12 misho 1120: pthread_mutex_lock(&root->root_mtx[taskTASK]);
1.5 misho 1121: #endif
1.12 misho 1122: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1123: if (TASK_VAL(task) < TASK_VAL(t))
1124: break;
1125: if (!t)
1126: TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
1127: else
1128: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.5 misho 1129: #ifdef HAVE_LIBPTHREAD
1.12 misho 1130: pthread_mutex_unlock(&root->root_mtx[taskTASK]);
1.5 misho 1131: #endif
1132: } else
1.13 misho 1133: task = sched_unuseTask(task);
1.1 misho 1134:
1135: return task;
1136: }
1137:
1138: /*
1.10 misho 1139: * schedSuspend() - Add Suspended task to scheduler queue
1140: *
1141: * @root = root task
1142: * @func = task execution function
1143: * @arg = 1st func argument
1144: * @id = Trigger ID
1145: * @opt_data = Optional data
1146: * @opt_dlen = Optional data length
1147: * return: NULL error or !=NULL new queued task
1148: */
1149: sched_task_t *
1150: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1151: void *opt_data, size_t opt_dlen)
1152: {
1153: sched_task_t *task;
1154: void *ptr;
1155:
1156: if (!root || !func)
1157: return NULL;
1158:
1159: /* get new task */
1.13 misho 1160: if (!(task = sched_useTask(root)))
1.10 misho 1161: return NULL;
1162:
1163: task->task_func = func;
1164: TASK_TYPE(task) = taskSUSPEND;
1165: TASK_ROOT(task) = root;
1166:
1167: TASK_ARG(task) = arg;
1168: TASK_VAL(task) = id;
1169:
1170: TASK_DATA(task) = opt_data;
1171: TASK_DATLEN(task) = opt_dlen;
1172:
1173: if (root->root_hooks.hook_add.suspend)
1174: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1175: else
1176: ptr = NULL;
1177:
1178: if (!ptr) {
1179: #ifdef HAVE_LIBPTHREAD
1180: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1181: #endif
1182: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1183: #ifdef HAVE_LIBPTHREAD
1184: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1185: #endif
1186: } else
1.13 misho 1187: task = sched_unuseTask(task);
1.10 misho 1188:
1189: return task;
1190: }
1191:
1192: /*
1.1 misho 1193: * schedCallOnce() - Call once from scheduler
1.6 misho 1194: *
1.1 misho 1195: * @root = root task
1196: * @func = task execution function
1197: * @arg = 1st func argument
1.2 misho 1198: * @val = additional func argument
1.5 misho 1199: * @opt_data = Optional data
1200: * @opt_dlen = Optional data length
1.2 misho 1201: * return: return value from called func
1.1 misho 1202: */
1203: sched_task_t *
1.5 misho 1204: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1205: void *opt_data, size_t opt_dlen)
1.1 misho 1206: {
1207: sched_task_t *task;
1.2 misho 1208: void *ret;
1.1 misho 1209:
1210: if (!root || !func)
1211: return NULL;
1212:
1213: /* get new task */
1.13 misho 1214: if (!(task = sched_useTask(root)))
1.4 misho 1215: return NULL;
1.1 misho 1216:
1217: task->task_func = func;
1.5 misho 1218: TASK_TYPE(task) = taskEVENT;
1219: TASK_ROOT(task) = root;
1.1 misho 1220:
1221: TASK_ARG(task) = arg;
1.2 misho 1222: TASK_VAL(task) = val;
1.1 misho 1223:
1.5 misho 1224: TASK_DATA(task) = opt_data;
1225: TASK_DATLEN(task) = opt_dlen;
1226:
1.2 misho 1227: ret = schedCall(task);
1.1 misho 1228:
1.13 misho 1229: sched_unuseTask(task);
1.2 misho 1230: return ret;
1.1 misho 1231: }
1.13 misho 1232:
1233: /*
1234: * schedThread() - Add thread task to scheduler queue
1235: *
1236: * @root = root task
1237: * @func = task execution function
1238: * @arg = 1st func argument
1239: * @detach = Detach thread from scheduler, if !=0
1.15 misho 1240: * @ss = stack size
1.13 misho 1241: * @opt_data = Optional data
1242: * @opt_dlen = Optional data length
1243: * return: NULL error or !=NULL new queued task
1244: */
1245: sched_task_t *
1246: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach,
1.15 misho 1247: size_t ss, void *opt_data, size_t opt_dlen)
1.13 misho 1248: {
1249: #ifndef HAVE_LIBPTHREAD
1250: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1251: return NULL;
1252: #endif
1253: sched_task_t *task;
1254: void *ptr;
1255: pthread_attr_t attr;
1.15 misho 1256: sem_t *s = NULL;
1.13 misho 1257:
1258: if (!root || !func)
1259: return NULL;
1.15 misho 1260: else {
1261: /* normalizing stack size & detach state */
1262: if (ss)
1263: ss &= 0x7FFFFFFF;
1264: detach = detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE;
1265: }
1266:
1267: if (!(s = (sem_t*) malloc(sizeof(sem_t)))) {
1268: LOGERR;
1269: return NULL;
1270: }
1271: if (sem_init(s, 0, 1)) {
1272: LOGERR;
1273: free(s);
1274: return NULL;
1275: }
1.13 misho 1276:
1277: /* get new task */
1.15 misho 1278: if (!(task = sched_useTask(root))) {
1279: sem_destroy(s);
1280: free(s);
1281:
1.13 misho 1282: return NULL;
1.15 misho 1283: }
1.13 misho 1284:
1285: task->task_func = func;
1286: TASK_TYPE(task) = taskTHREAD;
1287: TASK_ROOT(task) = root;
1288:
1289: TASK_ARG(task) = arg;
1.15 misho 1290: TASK_FLAG(task) = detach;
1291: TASK_RET(task) = (intptr_t) s;
1.13 misho 1292:
1293: TASK_DATA(task) = opt_data;
1294: TASK_DATLEN(task) = opt_dlen;
1295:
1296: pthread_attr_init(&attr);
1.15 misho 1297: pthread_attr_setdetachstate(&attr, detach);
1298: if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
1299: LOGERR;
1300: pthread_attr_destroy(&attr);
1301: sem_destroy(s);
1302: free(s);
1303: return sched_unuseTask(task);
1304: }
1305: if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
1306: LOGERR;
1307: pthread_attr_destroy(&attr);
1308: sem_destroy(s);
1309: free(s);
1310: return sched_unuseTask(task);
1311: } else
1312: TASK_FLAG(task) |= (ss << 1);
1313: if ((errno = pthread_attr_setguardsize(&attr, ss))) {
1314: LOGERR;
1315: pthread_attr_destroy(&attr);
1316: sem_destroy(s);
1317: free(s);
1318: return sched_unuseTask(task);
1319: }
1320: #ifdef SCHED_RR
1321: pthread_attr_setschedpolicy(&attr, SCHED_RR);
1322: #else
1323: pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
1324: #endif
1.13 misho 1325: if (root->root_hooks.hook_add.thread)
1326: ptr = root->root_hooks.hook_add.thread(task, &attr);
1327: else
1328: ptr = NULL;
1329: pthread_attr_destroy(&attr);
1330:
1331: if (!ptr) {
1332: pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
1333: TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
1334: pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
1.15 misho 1335:
1336: /* wait for init thread actions */
1337: sem_wait(s);
1.13 misho 1338: } else
1339: task = sched_unuseTask(task);
1340:
1.15 misho 1341: sem_destroy(s);
1342: free(s);
1.13 misho 1343: return task;
1344: }
1345:
1.17 misho 1346: /*
1347: * schedRTC() - Add RTC task to scheduler queue
1348: *
1349: * @root = root task
1350: * @func = task execution function
1351: * @arg = 1st func argument
1352: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1353: * @opt_data = Optional RTC ID
1.18 misho 1354: * @opt_dlen = Optional data length
1.17 misho 1355: * return: NULL error or !=NULL new queued task
1356: */
1357: sched_task_t *
1358: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
1359: void *opt_data, size_t opt_dlen)
1360: {
1361: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
1362: sched_task_t *task;
1363: void *ptr;
1364:
1365: if (!root || !func)
1366: return NULL;
1367:
1368: /* get new task */
1369: if (!(task = sched_useTask(root)))
1370: return NULL;
1371:
1372: task->task_func = func;
1373: TASK_TYPE(task) = taskRTC;
1374: TASK_ROOT(task) = root;
1375:
1376: TASK_ARG(task) = arg;
1377: TASK_TS(task) = ts;
1378:
1379: TASK_DATA(task) = opt_data;
1380: TASK_DATLEN(task) = opt_dlen;
1381:
1382: if (root->root_hooks.hook_add.rtc)
1383: ptr = root->root_hooks.hook_add.rtc(task, NULL);
1384: else
1385: ptr = NULL;
1386:
1387: if (!ptr) {
1388: #ifdef HAVE_LIBPTHREAD
1389: pthread_mutex_lock(&root->root_mtx[taskRTC]);
1390: #endif
1391: TAILQ_INSERT_TAIL(&root->root_rtc, TASK_ID(task), task_node);
1392: #ifdef HAVE_LIBPTHREAD
1393: pthread_mutex_unlock(&root->root_mtx[taskRTC]);
1394: #endif
1395: } else
1396: task = sched_unuseTask(task);
1397:
1398: return task;
1399: #else
1400: sched_SetErr(ENOTSUP, "Not supported realtime clock extensions");
1401: return NULL;
1402: #endif
1403: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>