Annotation of libaitsched/src/tasks.c, revision 1.22.8.1
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.22.8.1! misho 6: * $Id: tasks.c,v 1.22 2013/09/02 11:21:22 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.16 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
1.13 misho 49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
1.16 misho 55: sched_task_t *
1.13 misho 56: sched_useTask(sched_root_task_t * __restrict root)
1.4 misho 57: {
1.7 misho 58: sched_task_t *task, *tmp;
1.4 misho 59:
1.14 misho 60: #ifdef HAVE_LIBPTHREAD
61: pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
62: #endif
1.7 misho 63: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
1.4 misho 64: if (!TASK_ISLOCKED(task)) {
65: TAILQ_REMOVE(&root->root_unuse, task, task_node);
66: break;
67: }
68: }
1.14 misho 69: #ifdef HAVE_LIBPTHREAD
70: pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
71: #endif
1.4 misho 72:
73: if (!task) {
74: task = malloc(sizeof(sched_task_t));
75: if (!task) {
76: LOGERR;
77: return NULL;
78: }
79: }
80:
1.9 misho 81: memset(task, 0, sizeof(sched_task_t));
82: task->task_id = (uintptr_t) task;
1.4 misho 83: return task;
84: }
85:
1.13 misho 86: /*
87: * sched_unuseTask() - Unlock and put task to unuse queue
88: *
89: * @task = task
90: * return: always is NULL
91: */
1.16 misho 92: sched_task_t *
1.13 misho 93: sched_unuseTask(sched_task_t * __restrict task)
1.4 misho 94: {
95: TASK_UNLOCK(task);
1.5 misho 96: TASK_TYPE(task) = taskUNUSE;
97: #ifdef HAVE_LIBPTHREAD
98: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
99: #endif
1.9 misho 100: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
1.5 misho 101: #ifdef HAVE_LIBPTHREAD
102: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
103: #endif
1.4 misho 104: task = NULL;
105:
106: return task;
107: }
108:
1.14 misho 109: #pragma GCC visibility push(hidden)
110:
111: #ifdef HAVE_LIBPTHREAD
112: static void
113: _sched_threadCleanup(sched_task_t *t)
114: {
115: if (!t || !TASK_ROOT(t))
116: return;
117:
1.15 misho 118: pthread_mutex_lock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
119: TAILQ_REMOVE(&TASK_ROOT(t)->root_thread, t, task_node);
120: pthread_mutex_unlock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
1.14 misho 121: sched_unuseTask(t);
122: }
123: void *
124: _sched_threadWrapper(sched_task_t *t)
125: {
126: void *ret = NULL;
127:
1.21 misho 128: pthread_cleanup_push((void (*)(void*)) _sched_threadCleanup, t);
129:
130: if (!t || !TASK_ROOT(t))
1.14 misho 131: pthread_exit(ret);
132:
133: pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
1.21 misho 134: /*
1.14 misho 135: pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
1.21 misho 136: */
1.20 misho 137:
1.15 misho 138: /* notify parent, thread is ready for execution */
1.14 misho 139: pthread_testcancel();
140:
1.22 misho 141: ret = schedCall(t);
1.14 misho 142:
143: pthread_cleanup_pop(42);
144: TASK_ROOT(t)->root_ret = ret;
145: pthread_exit(ret);
146: }
147: #endif
148:
1.19 misho 149: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
150: void *
151: _sched_rtcWrapper(sched_task_t *t)
152: {
153: sched_task_func_t func;
154: sched_task_t *task;
1.20 misho 155: sched_root_task_t *r;
1.19 misho 156:
157: if (!t || !TASK_ROOT(t) || !TASK_DATA(t))
158: return NULL;
159: else {
1.20 misho 160: r = TASK_ROOT(t);
1.19 misho 161: task = (sched_task_t*) TASK_DATA(t);
162: func = TASK_FUNC(task);
163: }
164:
1.20 misho 165: #ifdef HAVE_LIBPTHREAD
166: pthread_mutex_lock(&r->root_mtx[taskRTC]);
167: #endif
168: TAILQ_REMOVE(&r->root_rtc, task, task_node);
169: #ifdef HAVE_LIBPTHREAD
170: pthread_mutex_unlock(&r->root_mtx[taskRTC]);
171: #endif
1.21 misho 172: sched_unuseTask(task);
1.20 misho 173:
1.21 misho 174: timer_delete((timer_t) TASK_DATLEN(t));
1.20 misho 175:
1.22 misho 176: return schedCall(task);
1.19 misho 177: }
178: #endif
179:
1.14 misho 180: #pragma GCC visibility pop
181:
182: /*
183: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
184: *
185: * @task = current task
186: * @retcode = return code
187: * return: return code
188: */
1.16 misho 189: void *
1.14 misho 190: sched_taskExit(sched_task_t *task, intptr_t retcode)
191: {
192: if (!task || !TASK_ROOT(task))
193: return (void*) -1;
194:
195: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
196: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
197:
198: TASK_ROOT(task)->root_ret = (void*) retcode;
199: return (void*) retcode;
200: }
201:
1.4 misho 202:
1.1 misho 203: /*
1.2 misho 204: * schedRead() - Add READ I/O task to scheduler queue
1.6 misho 205: *
1.1 misho 206: * @root = root task
207: * @func = task execution function
208: * @arg = 1st func argument
1.2 misho 209: * @fd = fd handle
1.5 misho 210: * @opt_data = Optional data
211: * @opt_dlen = Optional data length
1.1 misho 212: * return: NULL error or !=NULL new queued task
213: */
214: sched_task_t *
1.5 misho 215: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
216: void *opt_data, size_t opt_dlen)
1.1 misho 217: {
218: sched_task_t *task;
219: void *ptr;
220:
221: if (!root || !func)
222: return NULL;
223:
224: /* get new task */
1.13 misho 225: if (!(task = sched_useTask(root)))
1.4 misho 226: return NULL;
1.1 misho 227:
228: task->task_func = func;
1.5 misho 229: TASK_TYPE(task) = taskREAD;
230: TASK_ROOT(task) = root;
1.1 misho 231:
232: TASK_ARG(task) = arg;
1.2 misho 233: TASK_FD(task) = fd;
1.1 misho 234:
1.5 misho 235: TASK_DATA(task) = opt_data;
236: TASK_DATLEN(task) = opt_dlen;
237:
1.1 misho 238: if (root->root_hooks.hook_add.read)
239: ptr = root->root_hooks.hook_add.read(task, NULL);
240: else
241: ptr = NULL;
242:
1.5 misho 243: if (!ptr) {
244: #ifdef HAVE_LIBPTHREAD
245: pthread_mutex_lock(&root->root_mtx[taskREAD]);
246: #endif
1.9 misho 247: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
1.5 misho 248: #ifdef HAVE_LIBPTHREAD
249: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
250: #endif
251: } else
1.13 misho 252: task = sched_unuseTask(task);
1.1 misho 253:
254: return task;
255: }
256:
257: /*
1.2 misho 258: * schedWrite() - Add WRITE I/O task to scheduler queue
1.6 misho 259: *
1.1 misho 260: * @root = root task
261: * @func = task execution function
262: * @arg = 1st func argument
1.2 misho 263: * @fd = fd handle
1.5 misho 264: * @opt_data = Optional data
265: * @opt_dlen = Optional data length
1.1 misho 266: * return: NULL error or !=NULL new queued task
267: */
268: sched_task_t *
1.5 misho 269: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
270: void *opt_data, size_t opt_dlen)
1.1 misho 271: {
272: sched_task_t *task;
273: void *ptr;
274:
275: if (!root || !func)
276: return NULL;
277:
278: /* get new task */
1.13 misho 279: if (!(task = sched_useTask(root)))
1.4 misho 280: return NULL;
1.1 misho 281:
282: task->task_func = func;
1.5 misho 283: TASK_TYPE(task) = taskWRITE;
284: TASK_ROOT(task) = root;
1.1 misho 285:
286: TASK_ARG(task) = arg;
1.2 misho 287: TASK_FD(task) = fd;
1.1 misho 288:
1.5 misho 289: TASK_DATA(task) = opt_data;
290: TASK_DATLEN(task) = opt_dlen;
291:
1.1 misho 292: if (root->root_hooks.hook_add.write)
293: ptr = root->root_hooks.hook_add.write(task, NULL);
294: else
295: ptr = NULL;
296:
1.5 misho 297: if (!ptr) {
298: #ifdef HAVE_LIBPTHREAD
299: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
300: #endif
1.9 misho 301: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
1.5 misho 302: #ifdef HAVE_LIBPTHREAD
303: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
304: #endif
305: } else
1.13 misho 306: task = sched_unuseTask(task);
1.1 misho 307:
308: return task;
309: }
310:
311: /*
1.9 misho 312: * schedNode() - Add NODE task to scheduler queue
313: *
314: * @root = root task
315: * @func = task execution function
316: * @arg = 1st func argument
317: * @fd = fd handle
318: * @opt_data = Optional data
319: * @opt_dlen = Optional data length
320: * return: NULL error or !=NULL new queued task
321: */
322: sched_task_t *
323: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
324: void *opt_data, size_t opt_dlen)
325: {
1.22.8.1! misho 326: #ifdef KQ_DISABLE
! 327: sched_SetErr(ENOTSUP, "disabled kqueue support");
! 328: return NULL;
! 329: #else
1.9 misho 330: sched_task_t *task;
331: void *ptr;
332:
333: if (!root || !func)
334: return NULL;
335:
336: /* get new task */
1.13 misho 337: if (!(task = sched_useTask(root)))
1.9 misho 338: return NULL;
339:
340: task->task_func = func;
341: TASK_TYPE(task) = taskNODE;
342: TASK_ROOT(task) = root;
343:
344: TASK_ARG(task) = arg;
345: TASK_FD(task) = fd;
346:
347: TASK_DATA(task) = opt_data;
348: TASK_DATLEN(task) = opt_dlen;
349:
350: if (root->root_hooks.hook_add.node)
351: ptr = root->root_hooks.hook_add.node(task, NULL);
352: else
353: ptr = NULL;
354:
355: if (!ptr) {
356: #ifdef HAVE_LIBPTHREAD
357: pthread_mutex_lock(&root->root_mtx[taskNODE]);
358: #endif
359: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
360: #ifdef HAVE_LIBPTHREAD
361: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
362: #endif
363: } else
1.13 misho 364: task = sched_unuseTask(task);
1.9 misho 365:
366: return task;
1.22.8.1! misho 367: #endif /* KQ_DISABLE */
1.9 misho 368: }
369:
370: /*
371: * schedProc() - Add PROC task to scheduler queue
372: *
373: * @root = root task
374: * @func = task execution function
375: * @arg = 1st func argument
376: * @pid = PID
377: * @opt_data = Optional data
378: * @opt_dlen = Optional data length
379: * return: NULL error or !=NULL new queued task
380: */
381: sched_task_t *
382: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
383: void *opt_data, size_t opt_dlen)
384: {
1.22.8.1! misho 385: #ifdef KQ_DISABLE
! 386: sched_SetErr(ENOTSUP, "disabled kqueue support");
! 387: return NULL;
! 388: #else
1.9 misho 389: sched_task_t *task;
390: void *ptr;
391:
392: if (!root || !func)
393: return NULL;
394:
395: /* get new task */
1.13 misho 396: if (!(task = sched_useTask(root)))
1.9 misho 397: return NULL;
398:
399: task->task_func = func;
400: TASK_TYPE(task) = taskPROC;
401: TASK_ROOT(task) = root;
402:
403: TASK_ARG(task) = arg;
404: TASK_VAL(task) = pid;
405:
406: TASK_DATA(task) = opt_data;
407: TASK_DATLEN(task) = opt_dlen;
408:
409: if (root->root_hooks.hook_add.proc)
410: ptr = root->root_hooks.hook_add.proc(task, NULL);
411: else
412: ptr = NULL;
413:
414: if (!ptr) {
415: #ifdef HAVE_LIBPTHREAD
416: pthread_mutex_lock(&root->root_mtx[taskPROC]);
417: #endif
418: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
419: #ifdef HAVE_LIBPTHREAD
420: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
421: #endif
422: } else
1.13 misho 423: task = sched_unuseTask(task);
1.9 misho 424:
425: return task;
1.22.8.1! misho 426: #endif /* KQ_DISABLE */
1.9 misho 427: }
428:
429: /*
430: * schedUser() - Add trigger USER task to scheduler queue
431: *
432: * @root = root task
433: * @func = task execution function
434: * @arg = 1st func argument
435: * @id = Trigger ID
436: * @opt_data = Optional data
437: * @opt_dlen = Optional user's trigger flags
438: * return: NULL error or !=NULL new queued task
439: */
440: sched_task_t *
441: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
442: void *opt_data, size_t opt_dlen)
443: {
1.22.8.1! misho 444: #ifdef KQ_DISABLE
! 445: sched_SetErr(ENOTSUP, "disabled kqueue support");
! 446: return NULL;
! 447: #else
1.9 misho 448: #ifndef EVFILT_USER
449: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
450: return NULL;
451: #else
452: sched_task_t *task;
453: void *ptr;
454:
455: if (!root || !func)
456: return NULL;
457:
458: /* get new task */
1.13 misho 459: if (!(task = sched_useTask(root)))
1.9 misho 460: return NULL;
461:
462: task->task_func = func;
463: TASK_TYPE(task) = taskUSER;
464: TASK_ROOT(task) = root;
465:
466: TASK_ARG(task) = arg;
467: TASK_VAL(task) = id;
468:
469: TASK_DATA(task) = opt_data;
470: TASK_DATLEN(task) = opt_dlen;
471:
472: if (root->root_hooks.hook_add.user)
473: ptr = root->root_hooks.hook_add.user(task, NULL);
474: else
475: ptr = NULL;
476:
477: if (!ptr) {
478: #ifdef HAVE_LIBPTHREAD
479: pthread_mutex_lock(&root->root_mtx[taskUSER]);
480: #endif
481: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
482: #ifdef HAVE_LIBPTHREAD
483: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
484: #endif
485: } else
1.13 misho 486: task = sched_unuseTask(task);
1.9 misho 487:
488: return task;
1.22.8.1! misho 489: #endif /* EVFILT_USER */
! 490: #endif /* KQ_DISABLE */
1.9 misho 491: }
492:
493: /*
494: * schedSignal() - Add SIGNAL task to scheduler queue
495: *
496: * @root = root task
497: * @func = task execution function
498: * @arg = 1st func argument
499: * @sig = Signal
500: * @opt_data = Optional data
501: * @opt_dlen = Optional data length
502: * return: NULL error or !=NULL new queued task
503: */
504: sched_task_t *
505: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
506: void *opt_data, size_t opt_dlen)
507: {
508: sched_task_t *task;
509: void *ptr;
510:
511: if (!root || !func)
512: return NULL;
513:
514: /* get new task */
1.13 misho 515: if (!(task = sched_useTask(root)))
1.9 misho 516: return NULL;
517:
518: task->task_func = func;
519: TASK_TYPE(task) = taskSIGNAL;
520: TASK_ROOT(task) = root;
521:
522: TASK_ARG(task) = arg;
523: TASK_VAL(task) = sig;
524:
525: TASK_DATA(task) = opt_data;
526: TASK_DATLEN(task) = opt_dlen;
527:
528: if (root->root_hooks.hook_add.signal)
529: ptr = root->root_hooks.hook_add.signal(task, NULL);
530: else
531: ptr = NULL;
532:
533: if (!ptr) {
534: #ifdef HAVE_LIBPTHREAD
535: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
536: #endif
537: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
538: #ifdef HAVE_LIBPTHREAD
539: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
540: #endif
541: } else
1.13 misho 542: task = sched_unuseTask(task);
1.9 misho 543:
544: return task;
545: }
546:
547: /*
1.8 misho 548: * schedAlarm() - Add ALARM task to scheduler queue
549: *
550: * @root = root task
551: * @func = task execution function
552: * @arg = 1st func argument
553: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1.17 misho 554: * @opt_data = Alarm timer ID
1.8 misho 555: * @opt_dlen = Optional data length
556: * return: NULL error or !=NULL new queued task
557: */
558: sched_task_t *
559: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
560: void *opt_data, size_t opt_dlen)
561: {
1.22.8.1! misho 562: #ifdef KQ_DISABLE
! 563: sched_SetErr(ENOTSUP, "disabled kqueue support");
! 564: return NULL;
! 565: #else
1.8 misho 566: sched_task_t *task;
567: void *ptr;
568:
569: if (!root || !func)
570: return NULL;
571:
572: /* get new task */
1.13 misho 573: if (!(task = sched_useTask(root)))
1.8 misho 574: return NULL;
575:
576: task->task_func = func;
577: TASK_TYPE(task) = taskALARM;
578: TASK_ROOT(task) = root;
579:
580: TASK_ARG(task) = arg;
581: TASK_TS(task) = ts;
582:
583: TASK_DATA(task) = opt_data;
584: TASK_DATLEN(task) = opt_dlen;
585:
586: if (root->root_hooks.hook_add.alarm)
587: ptr = root->root_hooks.hook_add.alarm(task, NULL);
588: else
589: ptr = NULL;
590:
591: if (!ptr) {
592: #ifdef HAVE_LIBPTHREAD
593: pthread_mutex_lock(&root->root_mtx[taskALARM]);
594: #endif
1.9 misho 595: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
1.8 misho 596: #ifdef HAVE_LIBPTHREAD
597: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
598: #endif
599: } else
1.13 misho 600: task = sched_unuseTask(task);
1.8 misho 601:
602: return task;
1.22.8.1! misho 603: #endif /* KQ_DISABLE */
1.8 misho 604: }
605:
1.11 misho 606: #ifdef AIO_SUPPORT
607: /*
608: * schedAIO() - Add AIO task to scheduler queue
609: *
610: * @root = root task
611: * @func = task execution function
612: * @arg = 1st func argument
613: * @acb = AIO cb structure address
614: * @opt_data = Optional data
615: * @opt_dlen = Optional data length
616: * return: NULL error or !=NULL new queued task
617: */
618: sched_task_t *
619: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
620: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
621: {
1.22.8.1! misho 622: #ifdef KQ_DISABLE
! 623: sched_SetErr(ENOTSUP, "disabled kqueue support");
! 624: return NULL;
! 625: #else
1.11 misho 626: sched_task_t *task;
627: void *ptr;
628:
629: if (!root || !func || !acb || !opt_dlen)
630: return NULL;
631:
632: /* get new task */
1.13 misho 633: if (!(task = sched_useTask(root)))
1.11 misho 634: return NULL;
635:
636: task->task_func = func;
637: TASK_TYPE(task) = taskAIO;
638: TASK_ROOT(task) = root;
639:
640: TASK_ARG(task) = arg;
641: TASK_VAL(task) = (u_long) acb;
642:
643: TASK_DATA(task) = opt_data;
644: TASK_DATLEN(task) = opt_dlen;
645:
646: if (root->root_hooks.hook_add.aio)
647: ptr = root->root_hooks.hook_add.aio(task, NULL);
648: else
649: ptr = NULL;
650:
651: if (!ptr) {
652: #ifdef HAVE_LIBPTHREAD
653: pthread_mutex_lock(&root->root_mtx[taskAIO]);
654: #endif
655: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
656: #ifdef HAVE_LIBPTHREAD
657: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
658: #endif
659: } else
1.13 misho 660: task = sched_unuseTask(task);
1.11 misho 661:
662: return task;
1.22.8.1! misho 663: #endif /* KQ_DISABLE */
1.11 misho 664: }
665:
666: /*
667: * schedAIORead() - Add AIO read task to scheduler queue
668: *
669: * @root = root task
670: * @func = task execution function
671: * @arg = 1st func argument
672: * @fd = file descriptor
673: * @buffer = Buffer
674: * @buflen = Buffer length
675: * @offset = Offset from start of file, if =-1 from current position
676: * return: NULL error or !=NULL new queued task
677: */
1.16 misho 678: sched_task_t *
1.11 misho 679: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
680: void *buffer, size_t buflen, off_t offset)
681: {
1.22.8.1! misho 682: #ifdef KQ_DISABLE
! 683: sched_SetErr(ENOTSUP, "disabled kqueue support");
! 684: return NULL;
! 685: #else
1.11 misho 686: struct aiocb *acb;
687: off_t off;
688:
689: if (!root || !func || !buffer || !buflen)
690: return NULL;
691:
692: if (offset == (off_t) -1) {
693: off = lseek(fd, 0, SEEK_CUR);
694: if (off == -1) {
695: LOGERR;
696: return NULL;
697: }
698: } else
699: off = offset;
700:
701: if (!(acb = malloc(sizeof(struct aiocb)))) {
702: LOGERR;
703: return NULL;
704: } else
705: memset(acb, 0, sizeof(struct aiocb));
706:
707: acb->aio_fildes = fd;
708: acb->aio_nbytes = buflen;
709: acb->aio_buf = buffer;
710: acb->aio_offset = off;
711: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
712: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
713: acb->aio_sigevent.sigev_value.sival_ptr = acb;
714:
715: if (aio_read(acb)) {
716: LOGERR;
717: free(acb);
718: return NULL;
719: }
720:
721: return schedAIO(root, func, arg, acb, buffer, buflen);
1.22.8.1! misho 722: #endif /* KQ_DISABLE */
1.11 misho 723: }
724:
725: /*
726: * schedAIOWrite() - Add AIO write task to scheduler queue
727: *
728: * @root = root task
729: * @func = task execution function
730: * @arg = 1st func argument
731: * @fd = file descriptor
732: * @buffer = Buffer
733: * @buflen = Buffer length
734: * @offset = Offset from start of file, if =-1 from current position
735: * return: NULL error or !=NULL new queued task
736: */
1.16 misho 737: sched_task_t *
1.11 misho 738: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
739: void *buffer, size_t buflen, off_t offset)
740: {
1.22.8.1! misho 741: #ifdef KQ_DISABLE
! 742: sched_SetErr(ENOTSUP, "disabled kqueue support");
! 743: return NULL;
! 744: #else
1.11 misho 745: struct aiocb *acb;
746: off_t off;
747:
748: if (!root || !func || !buffer || !buflen)
749: return NULL;
750:
751: if (offset == (off_t) -1) {
752: off = lseek(fd, 0, SEEK_CUR);
753: if (off == -1) {
754: LOGERR;
755: return NULL;
756: }
757: } else
758: off = offset;
759:
760: if (!(acb = malloc(sizeof(struct aiocb)))) {
761: LOGERR;
762: return NULL;
763: } else
764: memset(acb, 0, sizeof(struct aiocb));
765:
766: acb->aio_fildes = fd;
767: acb->aio_nbytes = buflen;
768: acb->aio_buf = buffer;
769: acb->aio_offset = off;
770: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
771: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
772: acb->aio_sigevent.sigev_value.sival_ptr = acb;
773:
774: if (aio_write(acb)) {
775: LOGERR;
776: free(acb);
777: return NULL;
778: }
779:
780: return schedAIO(root, func, arg, acb, buffer, buflen);
1.22.8.1! misho 781: #endif /* KQ_DISABLE */
1.11 misho 782: }
783:
784: #ifdef EVFILT_LIO
785: /*
786: * schedLIO() - Add AIO bulk tasks to scheduler queue
787: *
788: * @root = root task
789: * @func = task execution function
790: * @arg = 1st func argument
791: * @acbs = AIO cb structure addresses
792: * @opt_data = Optional data
793: * @opt_dlen = Optional data length
794: * return: NULL error or !=NULL new queued task
795: */
796: sched_task_t *
797: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
798: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
799: {
1.22.8.1! misho 800: #ifdef KQ_DISABLE
! 801: sched_SetErr(ENOTSUP, "disabled kqueue support");
! 802: return NULL;
! 803: #else
1.11 misho 804: sched_task_t *task;
805: void *ptr;
806:
807: if (!root || !func || !acbs || !opt_dlen)
808: return NULL;
809:
810: /* get new task */
1.13 misho 811: if (!(task = sched_useTask(root)))
1.11 misho 812: return NULL;
813:
814: task->task_func = func;
815: TASK_TYPE(task) = taskLIO;
816: TASK_ROOT(task) = root;
817:
818: TASK_ARG(task) = arg;
819: TASK_VAL(task) = (u_long) acbs;
820:
821: TASK_DATA(task) = opt_data;
822: TASK_DATLEN(task) = opt_dlen;
823:
824: if (root->root_hooks.hook_add.lio)
825: ptr = root->root_hooks.hook_add.lio(task, NULL);
826: else
827: ptr = NULL;
828:
829: if (!ptr) {
830: #ifdef HAVE_LIBPTHREAD
831: pthread_mutex_lock(&root->root_mtx[taskLIO]);
832: #endif
833: TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
834: #ifdef HAVE_LIBPTHREAD
835: pthread_mutex_unlock(&root->root_mtx[taskLIO]);
836: #endif
837: } else
1.13 misho 838: task = sched_unuseTask(task);
1.11 misho 839:
840: return task;
1.22.8.1! misho 841: #endif /* KQ_DISABLE */
1.11 misho 842: }
843:
844: /*
845: * schedLIORead() - Add list of AIO read tasks to scheduler queue
846: *
847: * @root = root task
848: * @func = task execution function
849: * @arg = 1st func argument
850: * @fd = file descriptor
851: * @bufs = Buffer's list
852: * @nbufs = Number of Buffers
853: * @offset = Offset from start of file, if =-1 from current position
854: * return: NULL error or !=NULL new queued task
855: */
856: sched_task_t *
857: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
858: struct iovec *bufs, size_t nbufs, off_t offset)
859: {
1.22.8.1! misho 860: #ifdef KQ_DISABLE
! 861: sched_SetErr(ENOTSUP, "disabled kqueue support");
! 862: return NULL;
! 863: #else
1.11 misho 864: struct sigevent sig;
865: struct aiocb **acb;
866: off_t off;
867: register int i;
868:
869: if (!root || !func || !bufs || !nbufs)
870: return NULL;
871:
872: if (offset == (off_t) -1) {
873: off = lseek(fd, 0, SEEK_CUR);
874: if (off == -1) {
875: LOGERR;
876: return NULL;
877: }
878: } else
879: off = offset;
880:
881: if (!(acb = calloc(sizeof(void*), nbufs))) {
882: LOGERR;
883: return NULL;
884: } else
885: memset(acb, 0, sizeof(void*) * nbufs);
886: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
887: acb[i] = malloc(sizeof(struct aiocb));
888: if (!acb[i]) {
889: LOGERR;
890: for (i = 0; i < nbufs; i++)
891: if (acb[i])
892: free(acb[i]);
893: free(acb);
894: return NULL;
895: } else
896: memset(acb[i], 0, sizeof(struct aiocb));
897: acb[i]->aio_fildes = fd;
898: acb[i]->aio_nbytes = bufs[i].iov_len;
899: acb[i]->aio_buf = bufs[i].iov_base;
900: acb[i]->aio_offset = off;
901: acb[i]->aio_lio_opcode = LIO_READ;
902: }
903: memset(&sig, 0, sizeof sig);
904: sig.sigev_notify = SIGEV_KEVENT;
905: sig.sigev_notify_kqueue = root->root_kq;
906: sig.sigev_value.sival_ptr = acb;
907:
908: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
909: LOGERR;
910: for (i = 0; i < nbufs; i++)
911: if (acb[i])
912: free(acb[i]);
913: free(acb);
914: return NULL;
915: }
916:
917: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
1.22.8.1! misho 918: #endif /* KQ_DISABLE */
1.11 misho 919: }
920:
921: /*
922: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
923: *
924: * @root = root task
925: * @func = task execution function
926: * @arg = 1st func argument
927: * @fd = file descriptor
928: * @bufs = Buffer's list
929: * @nbufs = Number of Buffers
930: * @offset = Offset from start of file, if =-1 from current position
931: * return: NULL error or !=NULL new queued task
932: */
1.16 misho 933: sched_task_t *
1.11 misho 934: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
935: struct iovec *bufs, size_t nbufs, off_t offset)
936: {
1.22.8.1! misho 937: #ifdef KQ_DISABLE
! 938: sched_SetErr(ENOTSUP, "disabled kqueue support");
! 939: return NULL;
! 940: #else
1.11 misho 941: struct sigevent sig;
942: struct aiocb **acb;
943: off_t off;
944: register int i;
945:
946: if (!root || !func || !bufs || !nbufs)
947: return NULL;
948:
949: if (offset == (off_t) -1) {
950: off = lseek(fd, 0, SEEK_CUR);
951: if (off == -1) {
952: LOGERR;
953: return NULL;
954: }
955: } else
956: off = offset;
957:
958: if (!(acb = calloc(sizeof(void*), nbufs))) {
959: LOGERR;
960: return NULL;
961: } else
962: memset(acb, 0, sizeof(void*) * nbufs);
963: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
964: acb[i] = malloc(sizeof(struct aiocb));
965: if (!acb[i]) {
966: LOGERR;
967: for (i = 0; i < nbufs; i++)
968: if (acb[i])
969: free(acb[i]);
970: free(acb);
971: return NULL;
972: } else
973: memset(acb[i], 0, sizeof(struct aiocb));
974: acb[i]->aio_fildes = fd;
975: acb[i]->aio_nbytes = bufs[i].iov_len;
976: acb[i]->aio_buf = bufs[i].iov_base;
977: acb[i]->aio_offset = off;
978: acb[i]->aio_lio_opcode = LIO_WRITE;
979: }
980: memset(&sig, 0, sizeof sig);
981: sig.sigev_notify = SIGEV_KEVENT;
982: sig.sigev_notify_kqueue = root->root_kq;
983: sig.sigev_value.sival_ptr = acb;
984:
985: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
986: LOGERR;
987: for (i = 0; i < nbufs; i++)
988: if (acb[i])
989: free(acb[i]);
990: free(acb);
991: return NULL;
992: }
993:
994: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
1.22.8.1! misho 995: #endif /* KQ_DISABLE */
1.11 misho 996: }
997: #endif /* EVFILT_LIO */
998: #endif /* AIO_SUPPORT */
999:
1.8 misho 1000: /*
1.1 misho 1001: * schedTimer() - Add TIMER task to scheduler queue
1.6 misho 1002: *
1.1 misho 1003: * @root = root task
1004: * @func = task execution function
1005: * @arg = 1st func argument
1.5 misho 1006: * @ts = timeout argument structure
1007: * @opt_data = Optional data
1008: * @opt_dlen = Optional data length
1.1 misho 1009: * return: NULL error or !=NULL new queued task
1010: */
1011: sched_task_t *
1.5 misho 1012: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
1013: void *opt_data, size_t opt_dlen)
1.1 misho 1014: {
1.9 misho 1015: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 1016: void *ptr;
1.5 misho 1017: struct timespec now;
1.1 misho 1018:
1019: if (!root || !func)
1020: return NULL;
1021:
1022: /* get new task */
1.13 misho 1023: if (!(task = sched_useTask(root)))
1.4 misho 1024: return NULL;
1.1 misho 1025:
1026: task->task_func = func;
1.5 misho 1027: TASK_TYPE(task) = taskTIMER;
1028: TASK_ROOT(task) = root;
1.1 misho 1029:
1030: TASK_ARG(task) = arg;
1031:
1.5 misho 1032: TASK_DATA(task) = opt_data;
1033: TASK_DATLEN(task) = opt_dlen;
1034:
1.1 misho 1035: /* calculate timeval structure */
1.5 misho 1036: clock_gettime(CLOCK_MONOTONIC, &now);
1037: now.tv_sec += ts.tv_sec;
1038: now.tv_nsec += ts.tv_nsec;
1039: if (now.tv_nsec >= 1000000000L) {
1.1 misho 1040: now.tv_sec++;
1.5 misho 1041: now.tv_nsec -= 1000000000L;
1042: } else if (now.tv_nsec < 0) {
1.1 misho 1043: now.tv_sec--;
1.5 misho 1044: now.tv_nsec += 1000000000L;
1.1 misho 1045: }
1.5 misho 1046: TASK_TS(task) = now;
1.1 misho 1047:
1048: if (root->root_hooks.hook_add.timer)
1049: ptr = root->root_hooks.hook_add.timer(task, NULL);
1050: else
1051: ptr = NULL;
1052:
1053: if (!ptr) {
1.5 misho 1054: #ifdef HAVE_LIBPTHREAD
1055: pthread_mutex_lock(&root->root_mtx[taskTIMER]);
1056: #endif
1.1 misho 1057: #ifdef TIMER_WITHOUT_SORT
1.9 misho 1058: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 1059: #else
1.9 misho 1060: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
1.5 misho 1061: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
1.1 misho 1062: break;
1063: if (!t)
1.9 misho 1064: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 1065: else
1.9 misho 1066: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.1 misho 1067: #endif
1.5 misho 1068: #ifdef HAVE_LIBPTHREAD
1069: pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
1070: #endif
1.4 misho 1071: } else
1.13 misho 1072: task = sched_unuseTask(task);
1.1 misho 1073:
1074: return task;
1075: }
1076:
1077: /*
1078: * schedEvent() - Add EVENT task to scheduler queue
1.6 misho 1079: *
1.1 misho 1080: * @root = root task
1081: * @func = task execution function
1082: * @arg = 1st func argument
1.2 misho 1083: * @val = additional func argument
1.5 misho 1084: * @opt_data = Optional data
1085: * @opt_dlen = Optional data length
1.1 misho 1086: * return: NULL error or !=NULL new queued task
1087: */
1088: sched_task_t *
1.5 misho 1089: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1090: void *opt_data, size_t opt_dlen)
1.1 misho 1091: {
1092: sched_task_t *task;
1093: void *ptr;
1094:
1095: if (!root || !func)
1096: return NULL;
1097:
1098: /* get new task */
1.13 misho 1099: if (!(task = sched_useTask(root)))
1.4 misho 1100: return NULL;
1.1 misho 1101:
1102: task->task_func = func;
1.5 misho 1103: TASK_TYPE(task) = taskEVENT;
1104: TASK_ROOT(task) = root;
1.1 misho 1105:
1106: TASK_ARG(task) = arg;
1.2 misho 1107: TASK_VAL(task) = val;
1.1 misho 1108:
1.5 misho 1109: TASK_DATA(task) = opt_data;
1110: TASK_DATLEN(task) = opt_dlen;
1111:
1.1 misho 1112: if (root->root_hooks.hook_add.event)
1113: ptr = root->root_hooks.hook_add.event(task, NULL);
1114: else
1115: ptr = NULL;
1116:
1.5 misho 1117: if (!ptr) {
1118: #ifdef HAVE_LIBPTHREAD
1119: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
1120: #endif
1.9 misho 1121: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
1.5 misho 1122: #ifdef HAVE_LIBPTHREAD
1123: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
1124: #endif
1125: } else
1.13 misho 1126: task = sched_unuseTask(task);
1.1 misho 1127:
1128: return task;
1129: }
1130:
1131:
1132: /*
1.12 misho 1133: * schedTask() - Add regular task to scheduler queue
1.6 misho 1134: *
1.1 misho 1135: * @root = root task
1136: * @func = task execution function
1137: * @arg = 1st func argument
1.12 misho 1138: * @prio = regular task priority, 0 is hi priority for regular tasks
1.5 misho 1139: * @opt_data = Optional data
1140: * @opt_dlen = Optional data length
1.1 misho 1141: * return: NULL error or !=NULL new queued task
1142: */
1143: sched_task_t *
1.12 misho 1144: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1.5 misho 1145: void *opt_data, size_t opt_dlen)
1.1 misho 1146: {
1.12 misho 1147: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 1148: void *ptr;
1149:
1150: if (!root || !func)
1151: return NULL;
1152:
1153: /* get new task */
1.13 misho 1154: if (!(task = sched_useTask(root)))
1.4 misho 1155: return NULL;
1.1 misho 1156:
1157: task->task_func = func;
1.12 misho 1158: TASK_TYPE(task) = taskTASK;
1.5 misho 1159: TASK_ROOT(task) = root;
1.1 misho 1160:
1161: TASK_ARG(task) = arg;
1.12 misho 1162: TASK_VAL(task) = prio;
1.1 misho 1163:
1.5 misho 1164: TASK_DATA(task) = opt_data;
1165: TASK_DATLEN(task) = opt_dlen;
1166:
1.12 misho 1167: if (root->root_hooks.hook_add.task)
1168: ptr = root->root_hooks.hook_add.task(task, NULL);
1.1 misho 1169: else
1170: ptr = NULL;
1171:
1.5 misho 1172: if (!ptr) {
1173: #ifdef HAVE_LIBPTHREAD
1.12 misho 1174: pthread_mutex_lock(&root->root_mtx[taskTASK]);
1.5 misho 1175: #endif
1.12 misho 1176: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1177: if (TASK_VAL(task) < TASK_VAL(t))
1178: break;
1179: if (!t)
1180: TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
1181: else
1182: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.5 misho 1183: #ifdef HAVE_LIBPTHREAD
1.12 misho 1184: pthread_mutex_unlock(&root->root_mtx[taskTASK]);
1.5 misho 1185: #endif
1186: } else
1.13 misho 1187: task = sched_unuseTask(task);
1.1 misho 1188:
1189: return task;
1190: }
1191:
1192: /*
1.10 misho 1193: * schedSuspend() - Add Suspended task to scheduler queue
1194: *
1195: * @root = root task
1196: * @func = task execution function
1197: * @arg = 1st func argument
1198: * @id = Trigger ID
1199: * @opt_data = Optional data
1200: * @opt_dlen = Optional data length
1201: * return: NULL error or !=NULL new queued task
1202: */
1203: sched_task_t *
1204: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1205: void *opt_data, size_t opt_dlen)
1206: {
1207: sched_task_t *task;
1208: void *ptr;
1209:
1210: if (!root || !func)
1211: return NULL;
1212:
1213: /* get new task */
1.13 misho 1214: if (!(task = sched_useTask(root)))
1.10 misho 1215: return NULL;
1216:
1217: task->task_func = func;
1218: TASK_TYPE(task) = taskSUSPEND;
1219: TASK_ROOT(task) = root;
1220:
1221: TASK_ARG(task) = arg;
1222: TASK_VAL(task) = id;
1223:
1224: TASK_DATA(task) = opt_data;
1225: TASK_DATLEN(task) = opt_dlen;
1226:
1227: if (root->root_hooks.hook_add.suspend)
1228: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1229: else
1230: ptr = NULL;
1231:
1232: if (!ptr) {
1233: #ifdef HAVE_LIBPTHREAD
1234: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1235: #endif
1236: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1237: #ifdef HAVE_LIBPTHREAD
1238: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1239: #endif
1240: } else
1.13 misho 1241: task = sched_unuseTask(task);
1.10 misho 1242:
1243: return task;
1244: }
1245:
1246: /*
1.1 misho 1247: * schedCallOnce() - Call once from scheduler
1.6 misho 1248: *
1.1 misho 1249: * @root = root task
1250: * @func = task execution function
1251: * @arg = 1st func argument
1.2 misho 1252: * @val = additional func argument
1.5 misho 1253: * @opt_data = Optional data
1254: * @opt_dlen = Optional data length
1.2 misho 1255: * return: return value from called func
1.1 misho 1256: */
1257: sched_task_t *
1.5 misho 1258: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1259: void *opt_data, size_t opt_dlen)
1.1 misho 1260: {
1261: sched_task_t *task;
1.2 misho 1262: void *ret;
1.1 misho 1263:
1264: if (!root || !func)
1265: return NULL;
1266:
1267: /* get new task */
1.13 misho 1268: if (!(task = sched_useTask(root)))
1.4 misho 1269: return NULL;
1.1 misho 1270:
1271: task->task_func = func;
1.5 misho 1272: TASK_TYPE(task) = taskEVENT;
1273: TASK_ROOT(task) = root;
1.1 misho 1274:
1275: TASK_ARG(task) = arg;
1.2 misho 1276: TASK_VAL(task) = val;
1.1 misho 1277:
1.5 misho 1278: TASK_DATA(task) = opt_data;
1279: TASK_DATLEN(task) = opt_dlen;
1280:
1.2 misho 1281: ret = schedCall(task);
1.1 misho 1282:
1.13 misho 1283: sched_unuseTask(task);
1.2 misho 1284: return ret;
1.1 misho 1285: }
1.13 misho 1286:
1287: /*
1288: * schedThread() - Add thread task to scheduler queue
1289: *
1290: * @root = root task
1291: * @func = task execution function
1292: * @arg = 1st func argument
1.15 misho 1293: * @ss = stack size
1.13 misho 1294: * @opt_data = Optional data
1295: * @opt_dlen = Optional data length
1296: * return: NULL error or !=NULL new queued task
1297: */
1298: sched_task_t *
1.21 misho 1299: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
1.15 misho 1300: size_t ss, void *opt_data, size_t opt_dlen)
1.13 misho 1301: {
1302: #ifndef HAVE_LIBPTHREAD
1303: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1304: return NULL;
1305: #endif
1306: sched_task_t *task;
1307: pthread_attr_t attr;
1308:
1309: if (!root || !func)
1310: return NULL;
1311:
1312: /* get new task */
1.15 misho 1313: if (!(task = sched_useTask(root))) {
1314:
1.13 misho 1315: return NULL;
1.15 misho 1316: }
1.13 misho 1317:
1318: task->task_func = func;
1319: TASK_TYPE(task) = taskTHREAD;
1320: TASK_ROOT(task) = root;
1321:
1322: TASK_ARG(task) = arg;
1323:
1324: TASK_DATA(task) = opt_data;
1325: TASK_DATLEN(task) = opt_dlen;
1326:
1327: pthread_attr_init(&attr);
1.21 misho 1328: pthread_attr_setdetachstate(&attr, PTHREAD_DETACHED);
1.15 misho 1329: if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
1330: LOGERR;
1331: pthread_attr_destroy(&attr);
1332: return sched_unuseTask(task);
1333: }
1334: if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
1335: LOGERR;
1336: pthread_attr_destroy(&attr);
1337: return sched_unuseTask(task);
1338: } else
1.21 misho 1339: TASK_FLAG(task) = ss;
1.15 misho 1340: if ((errno = pthread_attr_setguardsize(&attr, ss))) {
1341: LOGERR;
1342: pthread_attr_destroy(&attr);
1343: return sched_unuseTask(task);
1344: }
1345: #ifdef SCHED_RR
1346: pthread_attr_setschedpolicy(&attr, SCHED_RR);
1347: #else
1348: pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
1349: #endif
1.21 misho 1350:
1351: pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
1352: TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
1353: pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
1354:
1.13 misho 1355: if (root->root_hooks.hook_add.thread)
1.21 misho 1356: if (root->root_hooks.hook_add.thread(task, &attr)) {
1357: schedCancel(task);
1358: task = NULL;
1359: }
1.13 misho 1360: pthread_attr_destroy(&attr);
1361: return task;
1362: }
1363:
1.17 misho 1364: /*
1365: * schedRTC() - Add RTC task to scheduler queue
1366: *
1367: * @root = root task
1368: * @func = task execution function
1369: * @arg = 1st func argument
1370: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1371: * @opt_data = Optional RTC ID
1.18 misho 1372: * @opt_dlen = Optional data length
1.17 misho 1373: * return: NULL error or !=NULL new queued task
1374: */
1375: sched_task_t *
1376: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
1377: void *opt_data, size_t opt_dlen)
1378: {
1379: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
1380: sched_task_t *task;
1381: void *ptr;
1382:
1383: if (!root || !func)
1384: return NULL;
1385:
1386: /* get new task */
1387: if (!(task = sched_useTask(root)))
1388: return NULL;
1389:
1390: task->task_func = func;
1391: TASK_TYPE(task) = taskRTC;
1392: TASK_ROOT(task) = root;
1393:
1394: TASK_ARG(task) = arg;
1395: TASK_TS(task) = ts;
1396:
1397: TASK_DATA(task) = opt_data;
1398: TASK_DATLEN(task) = opt_dlen;
1399:
1400: if (root->root_hooks.hook_add.rtc)
1401: ptr = root->root_hooks.hook_add.rtc(task, NULL);
1402: else
1403: ptr = NULL;
1404:
1405: if (!ptr) {
1406: #ifdef HAVE_LIBPTHREAD
1407: pthread_mutex_lock(&root->root_mtx[taskRTC]);
1408: #endif
1409: TAILQ_INSERT_TAIL(&root->root_rtc, TASK_ID(task), task_node);
1410: #ifdef HAVE_LIBPTHREAD
1411: pthread_mutex_unlock(&root->root_mtx[taskRTC]);
1412: #endif
1413: } else
1414: task = sched_unuseTask(task);
1415:
1416: return task;
1417: #else
1418: sched_SetErr(ENOTSUP, "Not supported realtime clock extensions");
1419: return NULL;
1420: #endif
1421: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>