Annotation of libaitsched/src/tasks.c, revision 1.24.2.1
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.24.2.1! misho 6: * $Id: tasks.c,v 1.24 2014/04/27 16:20:37 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.23 misho 15: Copyright 2004 - 2014
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
1.13 misho 49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
1.16 misho 55: sched_task_t *
1.13 misho 56: sched_useTask(sched_root_task_t * __restrict root)
1.4 misho 57: {
1.7 misho 58: sched_task_t *task, *tmp;
1.4 misho 59:
1.14 misho 60: #ifdef HAVE_LIBPTHREAD
61: pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
62: #endif
1.7 misho 63: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
1.4 misho 64: if (!TASK_ISLOCKED(task)) {
65: TAILQ_REMOVE(&root->root_unuse, task, task_node);
66: break;
67: }
68: }
1.14 misho 69: #ifdef HAVE_LIBPTHREAD
70: pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
71: #endif
1.4 misho 72:
73: if (!task) {
74: task = malloc(sizeof(sched_task_t));
75: if (!task) {
76: LOGERR;
77: return NULL;
78: }
79: }
80:
1.9 misho 81: memset(task, 0, sizeof(sched_task_t));
82: task->task_id = (uintptr_t) task;
1.4 misho 83: return task;
84: }
85:
1.13 misho 86: /*
87: * sched_unuseTask() - Unlock and put task to unuse queue
88: *
89: * @task = task
90: * return: always is NULL
91: */
1.16 misho 92: sched_task_t *
1.13 misho 93: sched_unuseTask(sched_task_t * __restrict task)
1.4 misho 94: {
95: TASK_UNLOCK(task);
1.5 misho 96: TASK_TYPE(task) = taskUNUSE;
97: #ifdef HAVE_LIBPTHREAD
98: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
99: #endif
1.9 misho 100: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
1.5 misho 101: #ifdef HAVE_LIBPTHREAD
102: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
103: #endif
1.4 misho 104: task = NULL;
105:
106: return task;
107: }
108:
1.14 misho 109: #pragma GCC visibility push(hidden)
110:
111: #ifdef HAVE_LIBPTHREAD
112: static void
113: _sched_threadCleanup(sched_task_t *t)
114: {
115: if (!t || !TASK_ROOT(t))
116: return;
117:
1.15 misho 118: pthread_mutex_lock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
119: TAILQ_REMOVE(&TASK_ROOT(t)->root_thread, t, task_node);
120: pthread_mutex_unlock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
1.14 misho 121: sched_unuseTask(t);
122: }
123: void *
124: _sched_threadWrapper(sched_task_t *t)
125: {
126: void *ret = NULL;
127:
1.21 misho 128: pthread_cleanup_push((void (*)(void*)) _sched_threadCleanup, t);
129:
130: if (!t || !TASK_ROOT(t))
1.14 misho 131: pthread_exit(ret);
132:
133: pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
1.21 misho 134: /*
1.14 misho 135: pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
1.21 misho 136: */
1.20 misho 137:
1.15 misho 138: /* notify parent, thread is ready for execution */
1.14 misho 139: pthread_testcancel();
140:
1.22 misho 141: ret = schedCall(t);
1.14 misho 142:
143: pthread_cleanup_pop(42);
144: TASK_ROOT(t)->root_ret = ret;
145: pthread_exit(ret);
146: }
147: #endif
148:
1.24 misho 149: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_DELETE)
1.19 misho 150: void *
151: _sched_rtcWrapper(sched_task_t *t)
152: {
153: sched_task_func_t func;
154: sched_task_t *task;
1.20 misho 155: sched_root_task_t *r;
1.19 misho 156:
157: if (!t || !TASK_ROOT(t) || !TASK_DATA(t))
158: return NULL;
159: else {
1.20 misho 160: r = TASK_ROOT(t);
1.19 misho 161: task = (sched_task_t*) TASK_DATA(t);
162: func = TASK_FUNC(task);
163: }
164:
1.20 misho 165: #ifdef HAVE_LIBPTHREAD
166: pthread_mutex_lock(&r->root_mtx[taskRTC]);
167: #endif
168: TAILQ_REMOVE(&r->root_rtc, task, task_node);
169: #ifdef HAVE_LIBPTHREAD
170: pthread_mutex_unlock(&r->root_mtx[taskRTC]);
171: #endif
1.21 misho 172: sched_unuseTask(task);
1.20 misho 173:
1.21 misho 174: timer_delete((timer_t) TASK_DATLEN(t));
1.20 misho 175:
1.22 misho 176: return schedCall(task);
1.19 misho 177: }
178: #endif
179:
1.14 misho 180: #pragma GCC visibility pop
181:
182: /*
183: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
184: *
185: * @task = current task
186: * @retcode = return code
187: * return: return code
188: */
1.16 misho 189: void *
1.14 misho 190: sched_taskExit(sched_task_t *task, intptr_t retcode)
191: {
192: if (!task || !TASK_ROOT(task))
193: return (void*) -1;
194:
195: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
196: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
197:
198: TASK_ROOT(task)->root_ret = (void*) retcode;
199: return (void*) retcode;
200: }
201:
1.4 misho 202:
1.1 misho 203: /*
1.2 misho 204: * schedRead() - Add READ I/O task to scheduler queue
1.6 misho 205: *
1.1 misho 206: * @root = root task
207: * @func = task execution function
208: * @arg = 1st func argument
1.2 misho 209: * @fd = fd handle
1.5 misho 210: * @opt_data = Optional data
211: * @opt_dlen = Optional data length
1.1 misho 212: * return: NULL error or !=NULL new queued task
213: */
214: sched_task_t *
1.5 misho 215: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
216: void *opt_data, size_t opt_dlen)
1.1 misho 217: {
218: sched_task_t *task;
219: void *ptr;
220:
221: if (!root || !func)
222: return NULL;
223:
224: /* get new task */
1.13 misho 225: if (!(task = sched_useTask(root)))
1.4 misho 226: return NULL;
1.1 misho 227:
228: task->task_func = func;
1.5 misho 229: TASK_TYPE(task) = taskREAD;
230: TASK_ROOT(task) = root;
1.1 misho 231:
232: TASK_ARG(task) = arg;
1.2 misho 233: TASK_FD(task) = fd;
1.1 misho 234:
1.5 misho 235: TASK_DATA(task) = opt_data;
236: TASK_DATLEN(task) = opt_dlen;
237:
1.1 misho 238: if (root->root_hooks.hook_add.read)
239: ptr = root->root_hooks.hook_add.read(task, NULL);
240: else
241: ptr = NULL;
242:
1.5 misho 243: if (!ptr) {
244: #ifdef HAVE_LIBPTHREAD
245: pthread_mutex_lock(&root->root_mtx[taskREAD]);
246: #endif
1.9 misho 247: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
1.5 misho 248: #ifdef HAVE_LIBPTHREAD
249: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
250: #endif
251: } else
1.13 misho 252: task = sched_unuseTask(task);
1.1 misho 253:
254: return task;
255: }
256:
257: /*
1.2 misho 258: * schedWrite() - Add WRITE I/O task to scheduler queue
1.6 misho 259: *
1.1 misho 260: * @root = root task
261: * @func = task execution function
262: * @arg = 1st func argument
1.2 misho 263: * @fd = fd handle
1.5 misho 264: * @opt_data = Optional data
265: * @opt_dlen = Optional data length
1.1 misho 266: * return: NULL error or !=NULL new queued task
267: */
268: sched_task_t *
1.5 misho 269: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
270: void *opt_data, size_t opt_dlen)
1.1 misho 271: {
272: sched_task_t *task;
273: void *ptr;
274:
275: if (!root || !func)
276: return NULL;
277:
278: /* get new task */
1.13 misho 279: if (!(task = sched_useTask(root)))
1.4 misho 280: return NULL;
1.1 misho 281:
282: task->task_func = func;
1.5 misho 283: TASK_TYPE(task) = taskWRITE;
284: TASK_ROOT(task) = root;
1.1 misho 285:
286: TASK_ARG(task) = arg;
1.2 misho 287: TASK_FD(task) = fd;
1.1 misho 288:
1.5 misho 289: TASK_DATA(task) = opt_data;
290: TASK_DATLEN(task) = opt_dlen;
291:
1.1 misho 292: if (root->root_hooks.hook_add.write)
293: ptr = root->root_hooks.hook_add.write(task, NULL);
294: else
295: ptr = NULL;
296:
1.5 misho 297: if (!ptr) {
298: #ifdef HAVE_LIBPTHREAD
299: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
300: #endif
1.9 misho 301: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
1.5 misho 302: #ifdef HAVE_LIBPTHREAD
303: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
304: #endif
305: } else
1.13 misho 306: task = sched_unuseTask(task);
1.1 misho 307:
308: return task;
309: }
310:
311: /*
1.9 misho 312: * schedNode() - Add NODE task to scheduler queue
313: *
314: * @root = root task
315: * @func = task execution function
316: * @arg = 1st func argument
317: * @fd = fd handle
318: * @opt_data = Optional data
319: * @opt_dlen = Optional data length
320: * return: NULL error or !=NULL new queued task
321: */
322: sched_task_t *
323: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
324: void *opt_data, size_t opt_dlen)
325: {
1.23 misho 326: #ifdef KQ_DISABLE
327: sched_SetErr(ENOTSUP, "disabled kqueue support");
328: return NULL;
329: #else
1.9 misho 330: sched_task_t *task;
331: void *ptr;
332:
333: if (!root || !func)
334: return NULL;
335:
336: /* get new task */
1.13 misho 337: if (!(task = sched_useTask(root)))
1.9 misho 338: return NULL;
339:
340: task->task_func = func;
341: TASK_TYPE(task) = taskNODE;
342: TASK_ROOT(task) = root;
343:
344: TASK_ARG(task) = arg;
345: TASK_FD(task) = fd;
346:
347: TASK_DATA(task) = opt_data;
348: TASK_DATLEN(task) = opt_dlen;
349:
350: if (root->root_hooks.hook_add.node)
351: ptr = root->root_hooks.hook_add.node(task, NULL);
352: else
353: ptr = NULL;
354:
355: if (!ptr) {
356: #ifdef HAVE_LIBPTHREAD
357: pthread_mutex_lock(&root->root_mtx[taskNODE]);
358: #endif
359: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
360: #ifdef HAVE_LIBPTHREAD
361: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
362: #endif
363: } else
1.13 misho 364: task = sched_unuseTask(task);
1.9 misho 365:
366: return task;
1.23 misho 367: #endif /* KQ_DISABLE */
1.9 misho 368: }
369:
370: /*
371: * schedProc() - Add PROC task to scheduler queue
372: *
373: * @root = root task
374: * @func = task execution function
375: * @arg = 1st func argument
376: * @pid = PID
377: * @opt_data = Optional data
378: * @opt_dlen = Optional data length
379: * return: NULL error or !=NULL new queued task
380: */
381: sched_task_t *
382: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
383: void *opt_data, size_t opt_dlen)
384: {
1.23 misho 385: #ifdef KQ_DISABLE
386: sched_SetErr(ENOTSUP, "disabled kqueue support");
387: return NULL;
388: #else
1.9 misho 389: sched_task_t *task;
390: void *ptr;
391:
392: if (!root || !func)
393: return NULL;
394:
395: /* get new task */
1.13 misho 396: if (!(task = sched_useTask(root)))
1.9 misho 397: return NULL;
398:
399: task->task_func = func;
400: TASK_TYPE(task) = taskPROC;
401: TASK_ROOT(task) = root;
402:
403: TASK_ARG(task) = arg;
404: TASK_VAL(task) = pid;
405:
406: TASK_DATA(task) = opt_data;
407: TASK_DATLEN(task) = opt_dlen;
408:
409: if (root->root_hooks.hook_add.proc)
410: ptr = root->root_hooks.hook_add.proc(task, NULL);
411: else
412: ptr = NULL;
413:
414: if (!ptr) {
415: #ifdef HAVE_LIBPTHREAD
416: pthread_mutex_lock(&root->root_mtx[taskPROC]);
417: #endif
418: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
419: #ifdef HAVE_LIBPTHREAD
420: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
421: #endif
422: } else
1.13 misho 423: task = sched_unuseTask(task);
1.9 misho 424:
425: return task;
1.23 misho 426: #endif /* KQ_DISABLE */
1.9 misho 427: }
428:
429: /*
430: * schedUser() - Add trigger USER task to scheduler queue
431: *
432: * @root = root task
433: * @func = task execution function
434: * @arg = 1st func argument
435: * @id = Trigger ID
436: * @opt_data = Optional data
437: * @opt_dlen = Optional user's trigger flags
438: * return: NULL error or !=NULL new queued task
439: */
440: sched_task_t *
441: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
442: void *opt_data, size_t opt_dlen)
443: {
1.23 misho 444: #ifdef KQ_DISABLE
445: sched_SetErr(ENOTSUP, "disabled kqueue support");
446: return NULL;
447: #else
1.9 misho 448: #ifndef EVFILT_USER
449: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
450: return NULL;
451: #else
452: sched_task_t *task;
453: void *ptr;
454:
455: if (!root || !func)
456: return NULL;
457:
458: /* get new task */
1.13 misho 459: if (!(task = sched_useTask(root)))
1.9 misho 460: return NULL;
461:
462: task->task_func = func;
463: TASK_TYPE(task) = taskUSER;
464: TASK_ROOT(task) = root;
465:
466: TASK_ARG(task) = arg;
467: TASK_VAL(task) = id;
468:
469: TASK_DATA(task) = opt_data;
470: TASK_DATLEN(task) = opt_dlen;
471:
472: if (root->root_hooks.hook_add.user)
473: ptr = root->root_hooks.hook_add.user(task, NULL);
474: else
475: ptr = NULL;
476:
477: if (!ptr) {
478: #ifdef HAVE_LIBPTHREAD
479: pthread_mutex_lock(&root->root_mtx[taskUSER]);
480: #endif
481: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
482: #ifdef HAVE_LIBPTHREAD
483: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
484: #endif
485: } else
1.13 misho 486: task = sched_unuseTask(task);
1.9 misho 487:
488: return task;
1.23 misho 489: #endif /* EVFILT_USER */
490: #endif /* KQ_DISABLE */
1.9 misho 491: }
492:
493: /*
494: * schedSignal() - Add SIGNAL task to scheduler queue
495: *
496: * @root = root task
497: * @func = task execution function
498: * @arg = 1st func argument
499: * @sig = Signal
500: * @opt_data = Optional data
501: * @opt_dlen = Optional data length
502: * return: NULL error or !=NULL new queued task
503: */
504: sched_task_t *
505: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
506: void *opt_data, size_t opt_dlen)
507: {
1.23 misho 508: #ifdef KQ_DISABLE
509: sched_SetErr(ENOTSUP, "disabled kqueue support");
510: return NULL;
511: #else
1.9 misho 512: sched_task_t *task;
513: void *ptr;
514:
515: if (!root || !func)
516: return NULL;
517:
518: /* get new task */
1.13 misho 519: if (!(task = sched_useTask(root)))
1.9 misho 520: return NULL;
521:
522: task->task_func = func;
523: TASK_TYPE(task) = taskSIGNAL;
524: TASK_ROOT(task) = root;
525:
526: TASK_ARG(task) = arg;
527: TASK_VAL(task) = sig;
528:
529: TASK_DATA(task) = opt_data;
530: TASK_DATLEN(task) = opt_dlen;
531:
532: if (root->root_hooks.hook_add.signal)
533: ptr = root->root_hooks.hook_add.signal(task, NULL);
534: else
535: ptr = NULL;
536:
537: if (!ptr) {
538: #ifdef HAVE_LIBPTHREAD
539: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
540: #endif
541: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
542: #ifdef HAVE_LIBPTHREAD
543: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
544: #endif
545: } else
1.13 misho 546: task = sched_unuseTask(task);
1.9 misho 547:
548: return task;
1.23 misho 549: #endif /* KQ_DISABLE */
1.9 misho 550: }
551:
552: /*
1.8 misho 553: * schedAlarm() - Add ALARM task to scheduler queue
554: *
555: * @root = root task
556: * @func = task execution function
557: * @arg = 1st func argument
558: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1.17 misho 559: * @opt_data = Alarm timer ID
1.8 misho 560: * @opt_dlen = Optional data length
561: * return: NULL error or !=NULL new queued task
562: */
563: sched_task_t *
564: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
565: void *opt_data, size_t opt_dlen)
566: {
1.23 misho 567: #ifdef KQ_DISABLE
568: sched_SetErr(ENOTSUP, "disabled kqueue support");
569: return NULL;
570: #else
1.8 misho 571: sched_task_t *task;
572: void *ptr;
573:
574: if (!root || !func)
575: return NULL;
576:
577: /* get new task */
1.13 misho 578: if (!(task = sched_useTask(root)))
1.8 misho 579: return NULL;
580:
581: task->task_func = func;
582: TASK_TYPE(task) = taskALARM;
583: TASK_ROOT(task) = root;
584:
585: TASK_ARG(task) = arg;
586: TASK_TS(task) = ts;
587:
588: TASK_DATA(task) = opt_data;
589: TASK_DATLEN(task) = opt_dlen;
590:
591: if (root->root_hooks.hook_add.alarm)
592: ptr = root->root_hooks.hook_add.alarm(task, NULL);
593: else
594: ptr = NULL;
595:
596: if (!ptr) {
597: #ifdef HAVE_LIBPTHREAD
598: pthread_mutex_lock(&root->root_mtx[taskALARM]);
599: #endif
1.9 misho 600: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
1.8 misho 601: #ifdef HAVE_LIBPTHREAD
602: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
603: #endif
604: } else
1.13 misho 605: task = sched_unuseTask(task);
1.8 misho 606:
607: return task;
1.23 misho 608: #endif /* KQ_DISABLE */
1.8 misho 609: }
610:
1.11 misho 611: #ifdef AIO_SUPPORT
612: /*
613: * schedAIO() - Add AIO task to scheduler queue
614: *
615: * @root = root task
616: * @func = task execution function
617: * @arg = 1st func argument
618: * @acb = AIO cb structure address
619: * @opt_data = Optional data
620: * @opt_dlen = Optional data length
621: * return: NULL error or !=NULL new queued task
622: */
623: sched_task_t *
624: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
625: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
626: {
1.23 misho 627: #ifdef KQ_DISABLE
628: sched_SetErr(ENOTSUP, "disabled kqueue support");
629: return NULL;
630: #else
1.11 misho 631: sched_task_t *task;
632: void *ptr;
633:
634: if (!root || !func || !acb || !opt_dlen)
635: return NULL;
636:
637: /* get new task */
1.13 misho 638: if (!(task = sched_useTask(root)))
1.11 misho 639: return NULL;
640:
641: task->task_func = func;
642: TASK_TYPE(task) = taskAIO;
643: TASK_ROOT(task) = root;
644:
645: TASK_ARG(task) = arg;
646: TASK_VAL(task) = (u_long) acb;
647:
648: TASK_DATA(task) = opt_data;
649: TASK_DATLEN(task) = opt_dlen;
650:
651: if (root->root_hooks.hook_add.aio)
652: ptr = root->root_hooks.hook_add.aio(task, NULL);
653: else
654: ptr = NULL;
655:
656: if (!ptr) {
657: #ifdef HAVE_LIBPTHREAD
658: pthread_mutex_lock(&root->root_mtx[taskAIO]);
659: #endif
660: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
661: #ifdef HAVE_LIBPTHREAD
662: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
663: #endif
664: } else
1.13 misho 665: task = sched_unuseTask(task);
1.11 misho 666:
667: return task;
1.23 misho 668: #endif /* KQ_DISABLE */
1.11 misho 669: }
670:
671: /*
672: * schedAIORead() - Add AIO read task to scheduler queue
673: *
674: * @root = root task
675: * @func = task execution function
676: * @arg = 1st func argument
677: * @fd = file descriptor
678: * @buffer = Buffer
679: * @buflen = Buffer length
680: * @offset = Offset from start of file, if =-1 from current position
681: * return: NULL error or !=NULL new queued task
682: */
1.16 misho 683: sched_task_t *
1.11 misho 684: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
685: void *buffer, size_t buflen, off_t offset)
686: {
1.23 misho 687: #ifdef KQ_DISABLE
688: sched_SetErr(ENOTSUP, "disabled kqueue support");
689: return NULL;
690: #else
1.11 misho 691: struct aiocb *acb;
692: off_t off;
693:
694: if (!root || !func || !buffer || !buflen)
695: return NULL;
696:
697: if (offset == (off_t) -1) {
698: off = lseek(fd, 0, SEEK_CUR);
699: if (off == -1) {
700: LOGERR;
701: return NULL;
702: }
703: } else
704: off = offset;
705:
706: if (!(acb = malloc(sizeof(struct aiocb)))) {
707: LOGERR;
708: return NULL;
709: } else
710: memset(acb, 0, sizeof(struct aiocb));
711:
712: acb->aio_fildes = fd;
713: acb->aio_nbytes = buflen;
714: acb->aio_buf = buffer;
715: acb->aio_offset = off;
716: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
717: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
718: acb->aio_sigevent.sigev_value.sival_ptr = acb;
719:
720: if (aio_read(acb)) {
721: LOGERR;
722: free(acb);
723: return NULL;
724: }
725:
726: return schedAIO(root, func, arg, acb, buffer, buflen);
1.23 misho 727: #endif /* KQ_DISABLE */
1.11 misho 728: }
729:
730: /*
731: * schedAIOWrite() - Add AIO write task to scheduler queue
732: *
733: * @root = root task
734: * @func = task execution function
735: * @arg = 1st func argument
736: * @fd = file descriptor
737: * @buffer = Buffer
738: * @buflen = Buffer length
739: * @offset = Offset from start of file, if =-1 from current position
740: * return: NULL error or !=NULL new queued task
741: */
1.16 misho 742: sched_task_t *
1.11 misho 743: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
744: void *buffer, size_t buflen, off_t offset)
745: {
1.23 misho 746: #ifdef KQ_DISABLE
747: sched_SetErr(ENOTSUP, "disabled kqueue support");
748: return NULL;
749: #else
1.11 misho 750: struct aiocb *acb;
751: off_t off;
752:
753: if (!root || !func || !buffer || !buflen)
754: return NULL;
755:
756: if (offset == (off_t) -1) {
757: off = lseek(fd, 0, SEEK_CUR);
758: if (off == -1) {
759: LOGERR;
760: return NULL;
761: }
762: } else
763: off = offset;
764:
765: if (!(acb = malloc(sizeof(struct aiocb)))) {
766: LOGERR;
767: return NULL;
768: } else
769: memset(acb, 0, sizeof(struct aiocb));
770:
771: acb->aio_fildes = fd;
772: acb->aio_nbytes = buflen;
773: acb->aio_buf = buffer;
774: acb->aio_offset = off;
775: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
776: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
777: acb->aio_sigevent.sigev_value.sival_ptr = acb;
778:
779: if (aio_write(acb)) {
780: LOGERR;
781: free(acb);
782: return NULL;
783: }
784:
785: return schedAIO(root, func, arg, acb, buffer, buflen);
1.23 misho 786: #endif /* KQ_DISABLE */
1.11 misho 787: }
788:
789: #ifdef EVFILT_LIO
790: /*
791: * schedLIO() - Add AIO bulk tasks to scheduler queue
792: *
793: * @root = root task
794: * @func = task execution function
795: * @arg = 1st func argument
796: * @acbs = AIO cb structure addresses
797: * @opt_data = Optional data
798: * @opt_dlen = Optional data length
799: * return: NULL error or !=NULL new queued task
800: */
801: sched_task_t *
802: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
803: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
804: {
1.23 misho 805: #ifdef KQ_DISABLE
806: sched_SetErr(ENOTSUP, "disabled kqueue support");
807: return NULL;
808: #else
1.11 misho 809: sched_task_t *task;
810: void *ptr;
811:
812: if (!root || !func || !acbs || !opt_dlen)
813: return NULL;
814:
815: /* get new task */
1.13 misho 816: if (!(task = sched_useTask(root)))
1.11 misho 817: return NULL;
818:
819: task->task_func = func;
820: TASK_TYPE(task) = taskLIO;
821: TASK_ROOT(task) = root;
822:
823: TASK_ARG(task) = arg;
824: TASK_VAL(task) = (u_long) acbs;
825:
826: TASK_DATA(task) = opt_data;
827: TASK_DATLEN(task) = opt_dlen;
828:
829: if (root->root_hooks.hook_add.lio)
830: ptr = root->root_hooks.hook_add.lio(task, NULL);
831: else
832: ptr = NULL;
833:
834: if (!ptr) {
835: #ifdef HAVE_LIBPTHREAD
836: pthread_mutex_lock(&root->root_mtx[taskLIO]);
837: #endif
838: TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
839: #ifdef HAVE_LIBPTHREAD
840: pthread_mutex_unlock(&root->root_mtx[taskLIO]);
841: #endif
842: } else
1.13 misho 843: task = sched_unuseTask(task);
1.11 misho 844:
845: return task;
1.23 misho 846: #endif /* KQ_DISABLE */
1.11 misho 847: }
848:
849: /*
850: * schedLIORead() - Add list of AIO read tasks to scheduler queue
851: *
852: * @root = root task
853: * @func = task execution function
854: * @arg = 1st func argument
855: * @fd = file descriptor
856: * @bufs = Buffer's list
857: * @nbufs = Number of Buffers
858: * @offset = Offset from start of file, if =-1 from current position
859: * return: NULL error or !=NULL new queued task
860: */
861: sched_task_t *
862: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
863: struct iovec *bufs, size_t nbufs, off_t offset)
864: {
1.23 misho 865: #ifdef KQ_DISABLE
866: sched_SetErr(ENOTSUP, "disabled kqueue support");
867: return NULL;
868: #else
1.11 misho 869: struct sigevent sig;
870: struct aiocb **acb;
871: off_t off;
872: register int i;
873:
874: if (!root || !func || !bufs || !nbufs)
875: return NULL;
876:
877: if (offset == (off_t) -1) {
878: off = lseek(fd, 0, SEEK_CUR);
879: if (off == -1) {
880: LOGERR;
881: return NULL;
882: }
883: } else
884: off = offset;
885:
886: if (!(acb = calloc(sizeof(void*), nbufs))) {
887: LOGERR;
888: return NULL;
889: } else
890: memset(acb, 0, sizeof(void*) * nbufs);
891: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
892: acb[i] = malloc(sizeof(struct aiocb));
893: if (!acb[i]) {
894: LOGERR;
895: for (i = 0; i < nbufs; i++)
896: if (acb[i])
897: free(acb[i]);
898: free(acb);
899: return NULL;
900: } else
901: memset(acb[i], 0, sizeof(struct aiocb));
902: acb[i]->aio_fildes = fd;
903: acb[i]->aio_nbytes = bufs[i].iov_len;
904: acb[i]->aio_buf = bufs[i].iov_base;
905: acb[i]->aio_offset = off;
906: acb[i]->aio_lio_opcode = LIO_READ;
907: }
908: memset(&sig, 0, sizeof sig);
909: sig.sigev_notify = SIGEV_KEVENT;
910: sig.sigev_notify_kqueue = root->root_kq;
911: sig.sigev_value.sival_ptr = acb;
912:
913: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
914: LOGERR;
915: for (i = 0; i < nbufs; i++)
916: if (acb[i])
917: free(acb[i]);
918: free(acb);
919: return NULL;
920: }
921:
922: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
1.23 misho 923: #endif /* KQ_DISABLE */
1.11 misho 924: }
925:
926: /*
927: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
928: *
929: * @root = root task
930: * @func = task execution function
931: * @arg = 1st func argument
932: * @fd = file descriptor
933: * @bufs = Buffer's list
934: * @nbufs = Number of Buffers
935: * @offset = Offset from start of file, if =-1 from current position
936: * return: NULL error or !=NULL new queued task
937: */
1.16 misho 938: sched_task_t *
1.11 misho 939: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
940: struct iovec *bufs, size_t nbufs, off_t offset)
941: {
1.23 misho 942: #ifdef KQ_DISABLE
943: sched_SetErr(ENOTSUP, "disabled kqueue support");
944: return NULL;
945: #else
1.11 misho 946: struct sigevent sig;
947: struct aiocb **acb;
948: off_t off;
949: register int i;
950:
951: if (!root || !func || !bufs || !nbufs)
952: return NULL;
953:
954: if (offset == (off_t) -1) {
955: off = lseek(fd, 0, SEEK_CUR);
956: if (off == -1) {
957: LOGERR;
958: return NULL;
959: }
960: } else
961: off = offset;
962:
963: if (!(acb = calloc(sizeof(void*), nbufs))) {
964: LOGERR;
965: return NULL;
966: } else
967: memset(acb, 0, sizeof(void*) * nbufs);
968: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
969: acb[i] = malloc(sizeof(struct aiocb));
970: if (!acb[i]) {
971: LOGERR;
972: for (i = 0; i < nbufs; i++)
973: if (acb[i])
974: free(acb[i]);
975: free(acb);
976: return NULL;
977: } else
978: memset(acb[i], 0, sizeof(struct aiocb));
979: acb[i]->aio_fildes = fd;
980: acb[i]->aio_nbytes = bufs[i].iov_len;
981: acb[i]->aio_buf = bufs[i].iov_base;
982: acb[i]->aio_offset = off;
983: acb[i]->aio_lio_opcode = LIO_WRITE;
984: }
985: memset(&sig, 0, sizeof sig);
986: sig.sigev_notify = SIGEV_KEVENT;
987: sig.sigev_notify_kqueue = root->root_kq;
988: sig.sigev_value.sival_ptr = acb;
989:
990: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
991: LOGERR;
992: for (i = 0; i < nbufs; i++)
993: if (acb[i])
994: free(acb[i]);
995: free(acb);
996: return NULL;
997: }
998:
999: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
1.23 misho 1000: #endif /* KQ_DISABLE */
1.11 misho 1001: }
1002: #endif /* EVFILT_LIO */
1003: #endif /* AIO_SUPPORT */
1004:
1.8 misho 1005: /*
1.1 misho 1006: * schedTimer() - Add TIMER task to scheduler queue
1.6 misho 1007: *
1.1 misho 1008: * @root = root task
1009: * @func = task execution function
1010: * @arg = 1st func argument
1.5 misho 1011: * @ts = timeout argument structure
1012: * @opt_data = Optional data
1013: * @opt_dlen = Optional data length
1.1 misho 1014: * return: NULL error or !=NULL new queued task
1015: */
1016: sched_task_t *
1.5 misho 1017: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
1018: void *opt_data, size_t opt_dlen)
1.1 misho 1019: {
1.9 misho 1020: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 1021: void *ptr;
1.5 misho 1022: struct timespec now;
1.1 misho 1023:
1024: if (!root || !func)
1025: return NULL;
1026:
1027: /* get new task */
1.13 misho 1028: if (!(task = sched_useTask(root)))
1.4 misho 1029: return NULL;
1.1 misho 1030:
1031: task->task_func = func;
1.5 misho 1032: TASK_TYPE(task) = taskTIMER;
1033: TASK_ROOT(task) = root;
1.1 misho 1034:
1035: TASK_ARG(task) = arg;
1036:
1.5 misho 1037: TASK_DATA(task) = opt_data;
1038: TASK_DATLEN(task) = opt_dlen;
1039:
1.1 misho 1040: /* calculate timeval structure */
1.5 misho 1041: clock_gettime(CLOCK_MONOTONIC, &now);
1042: now.tv_sec += ts.tv_sec;
1043: now.tv_nsec += ts.tv_nsec;
1044: if (now.tv_nsec >= 1000000000L) {
1.1 misho 1045: now.tv_sec++;
1.5 misho 1046: now.tv_nsec -= 1000000000L;
1047: } else if (now.tv_nsec < 0) {
1.1 misho 1048: now.tv_sec--;
1.5 misho 1049: now.tv_nsec += 1000000000L;
1.1 misho 1050: }
1.5 misho 1051: TASK_TS(task) = now;
1.1 misho 1052:
1053: if (root->root_hooks.hook_add.timer)
1054: ptr = root->root_hooks.hook_add.timer(task, NULL);
1055: else
1056: ptr = NULL;
1057:
1058: if (!ptr) {
1.5 misho 1059: #ifdef HAVE_LIBPTHREAD
1060: pthread_mutex_lock(&root->root_mtx[taskTIMER]);
1061: #endif
1.1 misho 1062: #ifdef TIMER_WITHOUT_SORT
1.9 misho 1063: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 1064: #else
1.9 misho 1065: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
1.5 misho 1066: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
1.1 misho 1067: break;
1068: if (!t)
1.9 misho 1069: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 1070: else
1.9 misho 1071: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.1 misho 1072: #endif
1.5 misho 1073: #ifdef HAVE_LIBPTHREAD
1074: pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
1075: #endif
1.4 misho 1076: } else
1.13 misho 1077: task = sched_unuseTask(task);
1.1 misho 1078:
1079: return task;
1080: }
1081:
1082: /*
1083: * schedEvent() - Add EVENT task to scheduler queue
1.6 misho 1084: *
1.1 misho 1085: * @root = root task
1086: * @func = task execution function
1087: * @arg = 1st func argument
1.2 misho 1088: * @val = additional func argument
1.5 misho 1089: * @opt_data = Optional data
1090: * @opt_dlen = Optional data length
1.1 misho 1091: * return: NULL error or !=NULL new queued task
1092: */
1093: sched_task_t *
1.5 misho 1094: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1095: void *opt_data, size_t opt_dlen)
1.1 misho 1096: {
1097: sched_task_t *task;
1098: void *ptr;
1099:
1100: if (!root || !func)
1101: return NULL;
1102:
1103: /* get new task */
1.13 misho 1104: if (!(task = sched_useTask(root)))
1.4 misho 1105: return NULL;
1.1 misho 1106:
1107: task->task_func = func;
1.5 misho 1108: TASK_TYPE(task) = taskEVENT;
1109: TASK_ROOT(task) = root;
1.1 misho 1110:
1111: TASK_ARG(task) = arg;
1.2 misho 1112: TASK_VAL(task) = val;
1.1 misho 1113:
1.5 misho 1114: TASK_DATA(task) = opt_data;
1115: TASK_DATLEN(task) = opt_dlen;
1116:
1.1 misho 1117: if (root->root_hooks.hook_add.event)
1118: ptr = root->root_hooks.hook_add.event(task, NULL);
1119: else
1120: ptr = NULL;
1121:
1.5 misho 1122: if (!ptr) {
1123: #ifdef HAVE_LIBPTHREAD
1124: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
1125: #endif
1.9 misho 1126: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
1.5 misho 1127: #ifdef HAVE_LIBPTHREAD
1128: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
1129: #endif
1130: } else
1.13 misho 1131: task = sched_unuseTask(task);
1.1 misho 1132:
1133: return task;
1134: }
1135:
1136:
1137: /*
1.12 misho 1138: * schedTask() - Add regular task to scheduler queue
1.6 misho 1139: *
1.1 misho 1140: * @root = root task
1141: * @func = task execution function
1142: * @arg = 1st func argument
1.12 misho 1143: * @prio = regular task priority, 0 is hi priority for regular tasks
1.5 misho 1144: * @opt_data = Optional data
1145: * @opt_dlen = Optional data length
1.1 misho 1146: * return: NULL error or !=NULL new queued task
1147: */
1148: sched_task_t *
1.12 misho 1149: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1.5 misho 1150: void *opt_data, size_t opt_dlen)
1.1 misho 1151: {
1.12 misho 1152: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 1153: void *ptr;
1154:
1155: if (!root || !func)
1156: return NULL;
1157:
1158: /* get new task */
1.13 misho 1159: if (!(task = sched_useTask(root)))
1.4 misho 1160: return NULL;
1.1 misho 1161:
1162: task->task_func = func;
1.12 misho 1163: TASK_TYPE(task) = taskTASK;
1.5 misho 1164: TASK_ROOT(task) = root;
1.1 misho 1165:
1166: TASK_ARG(task) = arg;
1.12 misho 1167: TASK_VAL(task) = prio;
1.1 misho 1168:
1.5 misho 1169: TASK_DATA(task) = opt_data;
1170: TASK_DATLEN(task) = opt_dlen;
1171:
1.12 misho 1172: if (root->root_hooks.hook_add.task)
1173: ptr = root->root_hooks.hook_add.task(task, NULL);
1.1 misho 1174: else
1175: ptr = NULL;
1176:
1.5 misho 1177: if (!ptr) {
1178: #ifdef HAVE_LIBPTHREAD
1.12 misho 1179: pthread_mutex_lock(&root->root_mtx[taskTASK]);
1.5 misho 1180: #endif
1.12 misho 1181: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1182: if (TASK_VAL(task) < TASK_VAL(t))
1183: break;
1184: if (!t)
1185: TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
1186: else
1187: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.5 misho 1188: #ifdef HAVE_LIBPTHREAD
1.12 misho 1189: pthread_mutex_unlock(&root->root_mtx[taskTASK]);
1.5 misho 1190: #endif
1191: } else
1.13 misho 1192: task = sched_unuseTask(task);
1.1 misho 1193:
1194: return task;
1195: }
1196:
1197: /*
1.10 misho 1198: * schedSuspend() - Add Suspended task to scheduler queue
1199: *
1200: * @root = root task
1201: * @func = task execution function
1202: * @arg = 1st func argument
1203: * @id = Trigger ID
1204: * @opt_data = Optional data
1205: * @opt_dlen = Optional data length
1206: * return: NULL error or !=NULL new queued task
1207: */
1208: sched_task_t *
1209: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1210: void *opt_data, size_t opt_dlen)
1211: {
1212: sched_task_t *task;
1213: void *ptr;
1214:
1215: if (!root || !func)
1216: return NULL;
1217:
1218: /* get new task */
1.13 misho 1219: if (!(task = sched_useTask(root)))
1.10 misho 1220: return NULL;
1221:
1222: task->task_func = func;
1223: TASK_TYPE(task) = taskSUSPEND;
1224: TASK_ROOT(task) = root;
1225:
1226: TASK_ARG(task) = arg;
1227: TASK_VAL(task) = id;
1228:
1229: TASK_DATA(task) = opt_data;
1230: TASK_DATLEN(task) = opt_dlen;
1231:
1232: if (root->root_hooks.hook_add.suspend)
1233: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1234: else
1235: ptr = NULL;
1236:
1237: if (!ptr) {
1238: #ifdef HAVE_LIBPTHREAD
1239: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1240: #endif
1241: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1242: #ifdef HAVE_LIBPTHREAD
1243: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1244: #endif
1245: } else
1.13 misho 1246: task = sched_unuseTask(task);
1.10 misho 1247:
1248: return task;
1249: }
1250:
1251: /*
1.1 misho 1252: * schedCallOnce() - Call once from scheduler
1.6 misho 1253: *
1.1 misho 1254: * @root = root task
1255: * @func = task execution function
1256: * @arg = 1st func argument
1.2 misho 1257: * @val = additional func argument
1.5 misho 1258: * @opt_data = Optional data
1259: * @opt_dlen = Optional data length
1.2 misho 1260: * return: return value from called func
1.1 misho 1261: */
1262: sched_task_t *
1.5 misho 1263: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1264: void *opt_data, size_t opt_dlen)
1.1 misho 1265: {
1266: sched_task_t *task;
1.2 misho 1267: void *ret;
1.1 misho 1268:
1269: if (!root || !func)
1270: return NULL;
1271:
1272: /* get new task */
1.13 misho 1273: if (!(task = sched_useTask(root)))
1.4 misho 1274: return NULL;
1.1 misho 1275:
1276: task->task_func = func;
1.5 misho 1277: TASK_TYPE(task) = taskEVENT;
1278: TASK_ROOT(task) = root;
1.1 misho 1279:
1280: TASK_ARG(task) = arg;
1.2 misho 1281: TASK_VAL(task) = val;
1.1 misho 1282:
1.5 misho 1283: TASK_DATA(task) = opt_data;
1284: TASK_DATLEN(task) = opt_dlen;
1285:
1.2 misho 1286: ret = schedCall(task);
1.1 misho 1287:
1.13 misho 1288: sched_unuseTask(task);
1.2 misho 1289: return ret;
1.1 misho 1290: }
1.13 misho 1291:
1292: /*
1293: * schedThread() - Add thread task to scheduler queue
1294: *
1295: * @root = root task
1296: * @func = task execution function
1297: * @arg = 1st func argument
1.15 misho 1298: * @ss = stack size
1.13 misho 1299: * @opt_data = Optional data
1300: * @opt_dlen = Optional data length
1301: * return: NULL error or !=NULL new queued task
1302: */
1303: sched_task_t *
1.21 misho 1304: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
1.15 misho 1305: size_t ss, void *opt_data, size_t opt_dlen)
1.13 misho 1306: {
1307: #ifndef HAVE_LIBPTHREAD
1308: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1309: return NULL;
1310: #endif
1311: sched_task_t *task;
1312: pthread_attr_t attr;
1313:
1314: if (!root || !func)
1315: return NULL;
1316:
1317: /* get new task */
1.15 misho 1318: if (!(task = sched_useTask(root))) {
1319:
1.13 misho 1320: return NULL;
1.15 misho 1321: }
1.13 misho 1322:
1323: task->task_func = func;
1324: TASK_TYPE(task) = taskTHREAD;
1325: TASK_ROOT(task) = root;
1326:
1327: TASK_ARG(task) = arg;
1328:
1329: TASK_DATA(task) = opt_data;
1330: TASK_DATLEN(task) = opt_dlen;
1331:
1332: pthread_attr_init(&attr);
1.24.2.1! misho 1333: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
1.15 misho 1334: if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
1335: LOGERR;
1336: pthread_attr_destroy(&attr);
1337: return sched_unuseTask(task);
1338: }
1339: if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
1340: LOGERR;
1341: pthread_attr_destroy(&attr);
1342: return sched_unuseTask(task);
1343: } else
1.21 misho 1344: TASK_FLAG(task) = ss;
1.15 misho 1345: if ((errno = pthread_attr_setguardsize(&attr, ss))) {
1346: LOGERR;
1347: pthread_attr_destroy(&attr);
1348: return sched_unuseTask(task);
1349: }
1350: #ifdef SCHED_RR
1351: pthread_attr_setschedpolicy(&attr, SCHED_RR);
1352: #else
1353: pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
1354: #endif
1.21 misho 1355:
1356: pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
1357: TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
1358: pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
1359:
1.13 misho 1360: if (root->root_hooks.hook_add.thread)
1.21 misho 1361: if (root->root_hooks.hook_add.thread(task, &attr)) {
1362: schedCancel(task);
1363: task = NULL;
1364: }
1.13 misho 1365: pthread_attr_destroy(&attr);
1366: return task;
1367: }
1368:
1.17 misho 1369: /*
1370: * schedRTC() - Add RTC task to scheduler queue
1371: *
1372: * @root = root task
1373: * @func = task execution function
1374: * @arg = 1st func argument
1375: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1376: * @opt_data = Optional RTC ID
1.18 misho 1377: * @opt_dlen = Optional data length
1.17 misho 1378: * return: NULL error or !=NULL new queued task
1379: */
1380: sched_task_t *
1381: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
1382: void *opt_data, size_t opt_dlen)
1383: {
1.24 misho 1384: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_DELETE)
1.17 misho 1385: sched_task_t *task;
1386: void *ptr;
1387:
1388: if (!root || !func)
1389: return NULL;
1390:
1391: /* get new task */
1392: if (!(task = sched_useTask(root)))
1393: return NULL;
1394:
1395: task->task_func = func;
1396: TASK_TYPE(task) = taskRTC;
1397: TASK_ROOT(task) = root;
1398:
1399: TASK_ARG(task) = arg;
1400: TASK_TS(task) = ts;
1401:
1402: TASK_DATA(task) = opt_data;
1403: TASK_DATLEN(task) = opt_dlen;
1404:
1405: if (root->root_hooks.hook_add.rtc)
1406: ptr = root->root_hooks.hook_add.rtc(task, NULL);
1407: else
1408: ptr = NULL;
1409:
1410: if (!ptr) {
1411: #ifdef HAVE_LIBPTHREAD
1412: pthread_mutex_lock(&root->root_mtx[taskRTC]);
1413: #endif
1414: TAILQ_INSERT_TAIL(&root->root_rtc, TASK_ID(task), task_node);
1415: #ifdef HAVE_LIBPTHREAD
1416: pthread_mutex_unlock(&root->root_mtx[taskRTC]);
1417: #endif
1418: } else
1419: task = sched_unuseTask(task);
1420:
1421: return task;
1422: #else
1423: sched_SetErr(ENOTSUP, "Not supported realtime clock extensions");
1424: return NULL;
1425: #endif
1426: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>