Annotation of libaitsched/src/tasks.c, revision 1.21.2.1
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.21.2.1! misho 6: * $Id: tasks.c,v 1.21 2013/08/26 18:48:23 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.16 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
1.13 misho 49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
1.16 misho 55: sched_task_t *
1.13 misho 56: sched_useTask(sched_root_task_t * __restrict root)
1.4 misho 57: {
1.7 misho 58: sched_task_t *task, *tmp;
1.4 misho 59:
1.14 misho 60: #ifdef HAVE_LIBPTHREAD
61: pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
62: #endif
1.7 misho 63: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
1.4 misho 64: if (!TASK_ISLOCKED(task)) {
65: TAILQ_REMOVE(&root->root_unuse, task, task_node);
66: break;
67: }
68: }
1.14 misho 69: #ifdef HAVE_LIBPTHREAD
70: pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
71: #endif
1.4 misho 72:
73: if (!task) {
74: task = malloc(sizeof(sched_task_t));
75: if (!task) {
76: LOGERR;
77: return NULL;
78: }
79: }
80:
1.9 misho 81: memset(task, 0, sizeof(sched_task_t));
82: task->task_id = (uintptr_t) task;
1.4 misho 83: return task;
84: }
85:
1.13 misho 86: /*
87: * sched_unuseTask() - Unlock and put task to unuse queue
88: *
89: * @task = task
90: * return: always is NULL
91: */
1.16 misho 92: sched_task_t *
1.13 misho 93: sched_unuseTask(sched_task_t * __restrict task)
1.4 misho 94: {
95: TASK_UNLOCK(task);
1.5 misho 96: TASK_TYPE(task) = taskUNUSE;
97: #ifdef HAVE_LIBPTHREAD
98: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
99: #endif
1.9 misho 100: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
1.5 misho 101: #ifdef HAVE_LIBPTHREAD
102: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
103: #endif
1.4 misho 104: task = NULL;
105:
106: return task;
107: }
108:
1.14 misho 109: #pragma GCC visibility push(hidden)
110:
111: #ifdef HAVE_LIBPTHREAD
112: static void
113: _sched_threadCleanup(sched_task_t *t)
114: {
115: if (!t || !TASK_ROOT(t))
116: return;
117:
1.15 misho 118: pthread_mutex_lock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
119: TAILQ_REMOVE(&TASK_ROOT(t)->root_thread, t, task_node);
120: pthread_mutex_unlock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
1.14 misho 121: sched_unuseTask(t);
122: }
123: void *
124: _sched_threadWrapper(sched_task_t *t)
125: {
126: void *ret = NULL;
127:
1.21 misho 128: pthread_cleanup_push((void (*)(void*)) _sched_threadCleanup, t);
129:
130: if (!t || !TASK_ROOT(t))
1.14 misho 131: pthread_exit(ret);
132:
133: pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
1.21 misho 134: /*
1.14 misho 135: pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
1.21 misho 136: */
1.20 misho 137:
1.15 misho 138: /* notify parent, thread is ready for execution */
1.14 misho 139: pthread_testcancel();
140:
1.21.2.1! misho 141: // ret = TASK_FUNC(t)(t);
! 142: ret = schedCall(t);
1.14 misho 143:
144: pthread_cleanup_pop(42);
145: TASK_ROOT(t)->root_ret = ret;
146: pthread_exit(ret);
147: }
148: #endif
149:
1.19 misho 150: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
151: void *
152: _sched_rtcWrapper(sched_task_t *t)
153: {
154: sched_task_func_t func;
155: sched_task_t *task;
1.20 misho 156: sched_root_task_t *r;
1.19 misho 157:
158: if (!t || !TASK_ROOT(t) || !TASK_DATA(t))
159: return NULL;
160: else {
1.20 misho 161: r = TASK_ROOT(t);
1.19 misho 162: task = (sched_task_t*) TASK_DATA(t);
163: func = TASK_FUNC(task);
164: }
165:
1.20 misho 166: #ifdef HAVE_LIBPTHREAD
167: pthread_mutex_lock(&r->root_mtx[taskRTC]);
168: #endif
169: TAILQ_REMOVE(&r->root_rtc, task, task_node);
170: #ifdef HAVE_LIBPTHREAD
171: pthread_mutex_unlock(&r->root_mtx[taskRTC]);
172: #endif
1.21 misho 173: sched_unuseTask(task);
1.20 misho 174:
1.21 misho 175: timer_delete((timer_t) TASK_DATLEN(t));
1.20 misho 176:
1.21.2.1! misho 177: return schedCall(task);
1.19 misho 178: }
179: #endif
180:
1.14 misho 181: #pragma GCC visibility pop
182:
183: /*
184: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
185: *
186: * @task = current task
187: * @retcode = return code
188: * return: return code
189: */
1.16 misho 190: void *
1.14 misho 191: sched_taskExit(sched_task_t *task, intptr_t retcode)
192: {
193: if (!task || !TASK_ROOT(task))
194: return (void*) -1;
195:
196: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
197: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
198:
199: TASK_ROOT(task)->root_ret = (void*) retcode;
200: return (void*) retcode;
201: }
202:
1.4 misho 203:
1.1 misho 204: /*
1.2 misho 205: * schedRead() - Add READ I/O task to scheduler queue
1.6 misho 206: *
1.1 misho 207: * @root = root task
208: * @func = task execution function
209: * @arg = 1st func argument
1.2 misho 210: * @fd = fd handle
1.5 misho 211: * @opt_data = Optional data
212: * @opt_dlen = Optional data length
1.1 misho 213: * return: NULL error or !=NULL new queued task
214: */
215: sched_task_t *
1.5 misho 216: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
217: void *opt_data, size_t opt_dlen)
1.1 misho 218: {
219: sched_task_t *task;
220: void *ptr;
221:
222: if (!root || !func)
223: return NULL;
224:
225: /* get new task */
1.13 misho 226: if (!(task = sched_useTask(root)))
1.4 misho 227: return NULL;
1.1 misho 228:
229: task->task_func = func;
1.5 misho 230: TASK_TYPE(task) = taskREAD;
231: TASK_ROOT(task) = root;
1.1 misho 232:
233: TASK_ARG(task) = arg;
1.2 misho 234: TASK_FD(task) = fd;
1.1 misho 235:
1.5 misho 236: TASK_DATA(task) = opt_data;
237: TASK_DATLEN(task) = opt_dlen;
238:
1.1 misho 239: if (root->root_hooks.hook_add.read)
240: ptr = root->root_hooks.hook_add.read(task, NULL);
241: else
242: ptr = NULL;
243:
1.5 misho 244: if (!ptr) {
245: #ifdef HAVE_LIBPTHREAD
246: pthread_mutex_lock(&root->root_mtx[taskREAD]);
247: #endif
1.9 misho 248: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
1.5 misho 249: #ifdef HAVE_LIBPTHREAD
250: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
251: #endif
252: } else
1.13 misho 253: task = sched_unuseTask(task);
1.1 misho 254:
255: return task;
256: }
257:
258: /*
1.2 misho 259: * schedWrite() - Add WRITE I/O task to scheduler queue
1.6 misho 260: *
1.1 misho 261: * @root = root task
262: * @func = task execution function
263: * @arg = 1st func argument
1.2 misho 264: * @fd = fd handle
1.5 misho 265: * @opt_data = Optional data
266: * @opt_dlen = Optional data length
1.1 misho 267: * return: NULL error or !=NULL new queued task
268: */
269: sched_task_t *
1.5 misho 270: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
271: void *opt_data, size_t opt_dlen)
1.1 misho 272: {
273: sched_task_t *task;
274: void *ptr;
275:
276: if (!root || !func)
277: return NULL;
278:
279: /* get new task */
1.13 misho 280: if (!(task = sched_useTask(root)))
1.4 misho 281: return NULL;
1.1 misho 282:
283: task->task_func = func;
1.5 misho 284: TASK_TYPE(task) = taskWRITE;
285: TASK_ROOT(task) = root;
1.1 misho 286:
287: TASK_ARG(task) = arg;
1.2 misho 288: TASK_FD(task) = fd;
1.1 misho 289:
1.5 misho 290: TASK_DATA(task) = opt_data;
291: TASK_DATLEN(task) = opt_dlen;
292:
1.1 misho 293: if (root->root_hooks.hook_add.write)
294: ptr = root->root_hooks.hook_add.write(task, NULL);
295: else
296: ptr = NULL;
297:
1.5 misho 298: if (!ptr) {
299: #ifdef HAVE_LIBPTHREAD
300: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
301: #endif
1.9 misho 302: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
1.5 misho 303: #ifdef HAVE_LIBPTHREAD
304: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
305: #endif
306: } else
1.13 misho 307: task = sched_unuseTask(task);
1.1 misho 308:
309: return task;
310: }
311:
312: /*
1.9 misho 313: * schedNode() - Add NODE task to scheduler queue
314: *
315: * @root = root task
316: * @func = task execution function
317: * @arg = 1st func argument
318: * @fd = fd handle
319: * @opt_data = Optional data
320: * @opt_dlen = Optional data length
321: * return: NULL error or !=NULL new queued task
322: */
323: sched_task_t *
324: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
325: void *opt_data, size_t opt_dlen)
326: {
327: sched_task_t *task;
328: void *ptr;
329:
330: if (!root || !func)
331: return NULL;
332:
333: /* get new task */
1.13 misho 334: if (!(task = sched_useTask(root)))
1.9 misho 335: return NULL;
336:
337: task->task_func = func;
338: TASK_TYPE(task) = taskNODE;
339: TASK_ROOT(task) = root;
340:
341: TASK_ARG(task) = arg;
342: TASK_FD(task) = fd;
343:
344: TASK_DATA(task) = opt_data;
345: TASK_DATLEN(task) = opt_dlen;
346:
347: if (root->root_hooks.hook_add.node)
348: ptr = root->root_hooks.hook_add.node(task, NULL);
349: else
350: ptr = NULL;
351:
352: if (!ptr) {
353: #ifdef HAVE_LIBPTHREAD
354: pthread_mutex_lock(&root->root_mtx[taskNODE]);
355: #endif
356: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
357: #ifdef HAVE_LIBPTHREAD
358: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
359: #endif
360: } else
1.13 misho 361: task = sched_unuseTask(task);
1.9 misho 362:
363: return task;
364: }
365:
366: /*
367: * schedProc() - Add PROC task to scheduler queue
368: *
369: * @root = root task
370: * @func = task execution function
371: * @arg = 1st func argument
372: * @pid = PID
373: * @opt_data = Optional data
374: * @opt_dlen = Optional data length
375: * return: NULL error or !=NULL new queued task
376: */
377: sched_task_t *
378: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
379: void *opt_data, size_t opt_dlen)
380: {
381: sched_task_t *task;
382: void *ptr;
383:
384: if (!root || !func)
385: return NULL;
386:
387: /* get new task */
1.13 misho 388: if (!(task = sched_useTask(root)))
1.9 misho 389: return NULL;
390:
391: task->task_func = func;
392: TASK_TYPE(task) = taskPROC;
393: TASK_ROOT(task) = root;
394:
395: TASK_ARG(task) = arg;
396: TASK_VAL(task) = pid;
397:
398: TASK_DATA(task) = opt_data;
399: TASK_DATLEN(task) = opt_dlen;
400:
401: if (root->root_hooks.hook_add.proc)
402: ptr = root->root_hooks.hook_add.proc(task, NULL);
403: else
404: ptr = NULL;
405:
406: if (!ptr) {
407: #ifdef HAVE_LIBPTHREAD
408: pthread_mutex_lock(&root->root_mtx[taskPROC]);
409: #endif
410: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
411: #ifdef HAVE_LIBPTHREAD
412: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
413: #endif
414: } else
1.13 misho 415: task = sched_unuseTask(task);
1.9 misho 416:
417: return task;
418: }
419:
420: /*
421: * schedUser() - Add trigger USER task to scheduler queue
422: *
423: * @root = root task
424: * @func = task execution function
425: * @arg = 1st func argument
426: * @id = Trigger ID
427: * @opt_data = Optional data
428: * @opt_dlen = Optional user's trigger flags
429: * return: NULL error or !=NULL new queued task
430: */
431: sched_task_t *
432: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
433: void *opt_data, size_t opt_dlen)
434: {
435: #ifndef EVFILT_USER
436: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
437: return NULL;
438: #else
439: sched_task_t *task;
440: void *ptr;
441:
442: if (!root || !func)
443: return NULL;
444:
445: /* get new task */
1.13 misho 446: if (!(task = sched_useTask(root)))
1.9 misho 447: return NULL;
448:
449: task->task_func = func;
450: TASK_TYPE(task) = taskUSER;
451: TASK_ROOT(task) = root;
452:
453: TASK_ARG(task) = arg;
454: TASK_VAL(task) = id;
455:
456: TASK_DATA(task) = opt_data;
457: TASK_DATLEN(task) = opt_dlen;
458:
459: if (root->root_hooks.hook_add.user)
460: ptr = root->root_hooks.hook_add.user(task, NULL);
461: else
462: ptr = NULL;
463:
464: if (!ptr) {
465: #ifdef HAVE_LIBPTHREAD
466: pthread_mutex_lock(&root->root_mtx[taskUSER]);
467: #endif
468: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
469: #ifdef HAVE_LIBPTHREAD
470: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
471: #endif
472: } else
1.13 misho 473: task = sched_unuseTask(task);
1.9 misho 474:
475: return task;
476: #endif
477: }
478:
479: /*
480: * schedSignal() - Add SIGNAL task to scheduler queue
481: *
482: * @root = root task
483: * @func = task execution function
484: * @arg = 1st func argument
485: * @sig = Signal
486: * @opt_data = Optional data
487: * @opt_dlen = Optional data length
488: * return: NULL error or !=NULL new queued task
489: */
490: sched_task_t *
491: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
492: void *opt_data, size_t opt_dlen)
493: {
494: sched_task_t *task;
495: void *ptr;
496:
497: if (!root || !func)
498: return NULL;
499:
500: /* get new task */
1.13 misho 501: if (!(task = sched_useTask(root)))
1.9 misho 502: return NULL;
503:
504: task->task_func = func;
505: TASK_TYPE(task) = taskSIGNAL;
506: TASK_ROOT(task) = root;
507:
508: TASK_ARG(task) = arg;
509: TASK_VAL(task) = sig;
510:
511: TASK_DATA(task) = opt_data;
512: TASK_DATLEN(task) = opt_dlen;
513:
514: if (root->root_hooks.hook_add.signal)
515: ptr = root->root_hooks.hook_add.signal(task, NULL);
516: else
517: ptr = NULL;
518:
519: if (!ptr) {
520: #ifdef HAVE_LIBPTHREAD
521: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
522: #endif
523: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
524: #ifdef HAVE_LIBPTHREAD
525: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
526: #endif
527: } else
1.13 misho 528: task = sched_unuseTask(task);
1.9 misho 529:
530: return task;
531: }
532:
533: /*
1.8 misho 534: * schedAlarm() - Add ALARM task to scheduler queue
535: *
536: * @root = root task
537: * @func = task execution function
538: * @arg = 1st func argument
539: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1.17 misho 540: * @opt_data = Alarm timer ID
1.8 misho 541: * @opt_dlen = Optional data length
542: * return: NULL error or !=NULL new queued task
543: */
544: sched_task_t *
545: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
546: void *opt_data, size_t opt_dlen)
547: {
548: sched_task_t *task;
549: void *ptr;
550:
551: if (!root || !func)
552: return NULL;
553:
554: /* get new task */
1.13 misho 555: if (!(task = sched_useTask(root)))
1.8 misho 556: return NULL;
557:
558: task->task_func = func;
559: TASK_TYPE(task) = taskALARM;
560: TASK_ROOT(task) = root;
561:
562: TASK_ARG(task) = arg;
563: TASK_TS(task) = ts;
564:
565: TASK_DATA(task) = opt_data;
566: TASK_DATLEN(task) = opt_dlen;
567:
568: if (root->root_hooks.hook_add.alarm)
569: ptr = root->root_hooks.hook_add.alarm(task, NULL);
570: else
571: ptr = NULL;
572:
573: if (!ptr) {
574: #ifdef HAVE_LIBPTHREAD
575: pthread_mutex_lock(&root->root_mtx[taskALARM]);
576: #endif
1.9 misho 577: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
1.8 misho 578: #ifdef HAVE_LIBPTHREAD
579: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
580: #endif
581: } else
1.13 misho 582: task = sched_unuseTask(task);
1.8 misho 583:
584: return task;
585: }
586:
1.11 misho 587: #ifdef AIO_SUPPORT
588: /*
589: * schedAIO() - Add AIO task to scheduler queue
590: *
591: * @root = root task
592: * @func = task execution function
593: * @arg = 1st func argument
594: * @acb = AIO cb structure address
595: * @opt_data = Optional data
596: * @opt_dlen = Optional data length
597: * return: NULL error or !=NULL new queued task
598: */
599: sched_task_t *
600: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
601: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
602: {
603: sched_task_t *task;
604: void *ptr;
605:
606: if (!root || !func || !acb || !opt_dlen)
607: return NULL;
608:
609: /* get new task */
1.13 misho 610: if (!(task = sched_useTask(root)))
1.11 misho 611: return NULL;
612:
613: task->task_func = func;
614: TASK_TYPE(task) = taskAIO;
615: TASK_ROOT(task) = root;
616:
617: TASK_ARG(task) = arg;
618: TASK_VAL(task) = (u_long) acb;
619:
620: TASK_DATA(task) = opt_data;
621: TASK_DATLEN(task) = opt_dlen;
622:
623: if (root->root_hooks.hook_add.aio)
624: ptr = root->root_hooks.hook_add.aio(task, NULL);
625: else
626: ptr = NULL;
627:
628: if (!ptr) {
629: #ifdef HAVE_LIBPTHREAD
630: pthread_mutex_lock(&root->root_mtx[taskAIO]);
631: #endif
632: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
633: #ifdef HAVE_LIBPTHREAD
634: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
635: #endif
636: } else
1.13 misho 637: task = sched_unuseTask(task);
1.11 misho 638:
639: return task;
640: }
641:
642: /*
643: * schedAIORead() - Add AIO read task to scheduler queue
644: *
645: * @root = root task
646: * @func = task execution function
647: * @arg = 1st func argument
648: * @fd = file descriptor
649: * @buffer = Buffer
650: * @buflen = Buffer length
651: * @offset = Offset from start of file, if =-1 from current position
652: * return: NULL error or !=NULL new queued task
653: */
1.16 misho 654: sched_task_t *
1.11 misho 655: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
656: void *buffer, size_t buflen, off_t offset)
657: {
658: struct aiocb *acb;
659: off_t off;
660:
661: if (!root || !func || !buffer || !buflen)
662: return NULL;
663:
664: if (offset == (off_t) -1) {
665: off = lseek(fd, 0, SEEK_CUR);
666: if (off == -1) {
667: LOGERR;
668: return NULL;
669: }
670: } else
671: off = offset;
672:
673: if (!(acb = malloc(sizeof(struct aiocb)))) {
674: LOGERR;
675: return NULL;
676: } else
677: memset(acb, 0, sizeof(struct aiocb));
678:
679: acb->aio_fildes = fd;
680: acb->aio_nbytes = buflen;
681: acb->aio_buf = buffer;
682: acb->aio_offset = off;
683: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
684: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
685: acb->aio_sigevent.sigev_value.sival_ptr = acb;
686:
687: if (aio_read(acb)) {
688: LOGERR;
689: free(acb);
690: return NULL;
691: }
692:
693: return schedAIO(root, func, arg, acb, buffer, buflen);
694: }
695:
696: /*
697: * schedAIOWrite() - Add AIO write task to scheduler queue
698: *
699: * @root = root task
700: * @func = task execution function
701: * @arg = 1st func argument
702: * @fd = file descriptor
703: * @buffer = Buffer
704: * @buflen = Buffer length
705: * @offset = Offset from start of file, if =-1 from current position
706: * return: NULL error or !=NULL new queued task
707: */
1.16 misho 708: sched_task_t *
1.11 misho 709: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
710: void *buffer, size_t buflen, off_t offset)
711: {
712: struct aiocb *acb;
713: off_t off;
714:
715: if (!root || !func || !buffer || !buflen)
716: return NULL;
717:
718: if (offset == (off_t) -1) {
719: off = lseek(fd, 0, SEEK_CUR);
720: if (off == -1) {
721: LOGERR;
722: return NULL;
723: }
724: } else
725: off = offset;
726:
727: if (!(acb = malloc(sizeof(struct aiocb)))) {
728: LOGERR;
729: return NULL;
730: } else
731: memset(acb, 0, sizeof(struct aiocb));
732:
733: acb->aio_fildes = fd;
734: acb->aio_nbytes = buflen;
735: acb->aio_buf = buffer;
736: acb->aio_offset = off;
737: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
738: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
739: acb->aio_sigevent.sigev_value.sival_ptr = acb;
740:
741: if (aio_write(acb)) {
742: LOGERR;
743: free(acb);
744: return NULL;
745: }
746:
747: return schedAIO(root, func, arg, acb, buffer, buflen);
748: }
749:
750: #ifdef EVFILT_LIO
751: /*
752: * schedLIO() - Add AIO bulk tasks to scheduler queue
753: *
754: * @root = root task
755: * @func = task execution function
756: * @arg = 1st func argument
757: * @acbs = AIO cb structure addresses
758: * @opt_data = Optional data
759: * @opt_dlen = Optional data length
760: * return: NULL error or !=NULL new queued task
761: */
762: sched_task_t *
763: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
764: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
765: {
766: sched_task_t *task;
767: void *ptr;
768:
769: if (!root || !func || !acbs || !opt_dlen)
770: return NULL;
771:
772: /* get new task */
1.13 misho 773: if (!(task = sched_useTask(root)))
1.11 misho 774: return NULL;
775:
776: task->task_func = func;
777: TASK_TYPE(task) = taskLIO;
778: TASK_ROOT(task) = root;
779:
780: TASK_ARG(task) = arg;
781: TASK_VAL(task) = (u_long) acbs;
782:
783: TASK_DATA(task) = opt_data;
784: TASK_DATLEN(task) = opt_dlen;
785:
786: if (root->root_hooks.hook_add.lio)
787: ptr = root->root_hooks.hook_add.lio(task, NULL);
788: else
789: ptr = NULL;
790:
791: if (!ptr) {
792: #ifdef HAVE_LIBPTHREAD
793: pthread_mutex_lock(&root->root_mtx[taskLIO]);
794: #endif
795: TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
796: #ifdef HAVE_LIBPTHREAD
797: pthread_mutex_unlock(&root->root_mtx[taskLIO]);
798: #endif
799: } else
1.13 misho 800: task = sched_unuseTask(task);
1.11 misho 801:
802: return task;
803: }
804:
805: /*
806: * schedLIORead() - Add list of AIO read tasks to scheduler queue
807: *
808: * @root = root task
809: * @func = task execution function
810: * @arg = 1st func argument
811: * @fd = file descriptor
812: * @bufs = Buffer's list
813: * @nbufs = Number of Buffers
814: * @offset = Offset from start of file, if =-1 from current position
815: * return: NULL error or !=NULL new queued task
816: */
817: sched_task_t *
818: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
819: struct iovec *bufs, size_t nbufs, off_t offset)
820: {
821: struct sigevent sig;
822: struct aiocb **acb;
823: off_t off;
824: register int i;
825:
826: if (!root || !func || !bufs || !nbufs)
827: return NULL;
828:
829: if (offset == (off_t) -1) {
830: off = lseek(fd, 0, SEEK_CUR);
831: if (off == -1) {
832: LOGERR;
833: return NULL;
834: }
835: } else
836: off = offset;
837:
838: if (!(acb = calloc(sizeof(void*), nbufs))) {
839: LOGERR;
840: return NULL;
841: } else
842: memset(acb, 0, sizeof(void*) * nbufs);
843: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
844: acb[i] = malloc(sizeof(struct aiocb));
845: if (!acb[i]) {
846: LOGERR;
847: for (i = 0; i < nbufs; i++)
848: if (acb[i])
849: free(acb[i]);
850: free(acb);
851: return NULL;
852: } else
853: memset(acb[i], 0, sizeof(struct aiocb));
854: acb[i]->aio_fildes = fd;
855: acb[i]->aio_nbytes = bufs[i].iov_len;
856: acb[i]->aio_buf = bufs[i].iov_base;
857: acb[i]->aio_offset = off;
858: acb[i]->aio_lio_opcode = LIO_READ;
859: }
860: memset(&sig, 0, sizeof sig);
861: sig.sigev_notify = SIGEV_KEVENT;
862: sig.sigev_notify_kqueue = root->root_kq;
863: sig.sigev_value.sival_ptr = acb;
864:
865: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
866: LOGERR;
867: for (i = 0; i < nbufs; i++)
868: if (acb[i])
869: free(acb[i]);
870: free(acb);
871: return NULL;
872: }
873:
874: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
875: }
876:
877: /*
878: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
879: *
880: * @root = root task
881: * @func = task execution function
882: * @arg = 1st func argument
883: * @fd = file descriptor
884: * @bufs = Buffer's list
885: * @nbufs = Number of Buffers
886: * @offset = Offset from start of file, if =-1 from current position
887: * return: NULL error or !=NULL new queued task
888: */
1.16 misho 889: sched_task_t *
1.11 misho 890: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
891: struct iovec *bufs, size_t nbufs, off_t offset)
892: {
893: struct sigevent sig;
894: struct aiocb **acb;
895: off_t off;
896: register int i;
897:
898: if (!root || !func || !bufs || !nbufs)
899: return NULL;
900:
901: if (offset == (off_t) -1) {
902: off = lseek(fd, 0, SEEK_CUR);
903: if (off == -1) {
904: LOGERR;
905: return NULL;
906: }
907: } else
908: off = offset;
909:
910: if (!(acb = calloc(sizeof(void*), nbufs))) {
911: LOGERR;
912: return NULL;
913: } else
914: memset(acb, 0, sizeof(void*) * nbufs);
915: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
916: acb[i] = malloc(sizeof(struct aiocb));
917: if (!acb[i]) {
918: LOGERR;
919: for (i = 0; i < nbufs; i++)
920: if (acb[i])
921: free(acb[i]);
922: free(acb);
923: return NULL;
924: } else
925: memset(acb[i], 0, sizeof(struct aiocb));
926: acb[i]->aio_fildes = fd;
927: acb[i]->aio_nbytes = bufs[i].iov_len;
928: acb[i]->aio_buf = bufs[i].iov_base;
929: acb[i]->aio_offset = off;
930: acb[i]->aio_lio_opcode = LIO_WRITE;
931: }
932: memset(&sig, 0, sizeof sig);
933: sig.sigev_notify = SIGEV_KEVENT;
934: sig.sigev_notify_kqueue = root->root_kq;
935: sig.sigev_value.sival_ptr = acb;
936:
937: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
938: LOGERR;
939: for (i = 0; i < nbufs; i++)
940: if (acb[i])
941: free(acb[i]);
942: free(acb);
943: return NULL;
944: }
945:
946: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
947: }
948: #endif /* EVFILT_LIO */
949: #endif /* AIO_SUPPORT */
950:
1.8 misho 951: /*
1.1 misho 952: * schedTimer() - Add TIMER task to scheduler queue
1.6 misho 953: *
1.1 misho 954: * @root = root task
955: * @func = task execution function
956: * @arg = 1st func argument
1.5 misho 957: * @ts = timeout argument structure
958: * @opt_data = Optional data
959: * @opt_dlen = Optional data length
1.1 misho 960: * return: NULL error or !=NULL new queued task
961: */
962: sched_task_t *
1.5 misho 963: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
964: void *opt_data, size_t opt_dlen)
1.1 misho 965: {
1.9 misho 966: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 967: void *ptr;
1.5 misho 968: struct timespec now;
1.1 misho 969:
970: if (!root || !func)
971: return NULL;
972:
973: /* get new task */
1.13 misho 974: if (!(task = sched_useTask(root)))
1.4 misho 975: return NULL;
1.1 misho 976:
977: task->task_func = func;
1.5 misho 978: TASK_TYPE(task) = taskTIMER;
979: TASK_ROOT(task) = root;
1.1 misho 980:
981: TASK_ARG(task) = arg;
982:
1.5 misho 983: TASK_DATA(task) = opt_data;
984: TASK_DATLEN(task) = opt_dlen;
985:
1.1 misho 986: /* calculate timeval structure */
1.5 misho 987: clock_gettime(CLOCK_MONOTONIC, &now);
988: now.tv_sec += ts.tv_sec;
989: now.tv_nsec += ts.tv_nsec;
990: if (now.tv_nsec >= 1000000000L) {
1.1 misho 991: now.tv_sec++;
1.5 misho 992: now.tv_nsec -= 1000000000L;
993: } else if (now.tv_nsec < 0) {
1.1 misho 994: now.tv_sec--;
1.5 misho 995: now.tv_nsec += 1000000000L;
1.1 misho 996: }
1.5 misho 997: TASK_TS(task) = now;
1.1 misho 998:
999: if (root->root_hooks.hook_add.timer)
1000: ptr = root->root_hooks.hook_add.timer(task, NULL);
1001: else
1002: ptr = NULL;
1003:
1004: if (!ptr) {
1.5 misho 1005: #ifdef HAVE_LIBPTHREAD
1006: pthread_mutex_lock(&root->root_mtx[taskTIMER]);
1007: #endif
1.1 misho 1008: #ifdef TIMER_WITHOUT_SORT
1.9 misho 1009: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 1010: #else
1.9 misho 1011: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
1.5 misho 1012: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
1.1 misho 1013: break;
1014: if (!t)
1.9 misho 1015: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 1016: else
1.9 misho 1017: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.1 misho 1018: #endif
1.5 misho 1019: #ifdef HAVE_LIBPTHREAD
1020: pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
1021: #endif
1.4 misho 1022: } else
1.13 misho 1023: task = sched_unuseTask(task);
1.1 misho 1024:
1025: return task;
1026: }
1027:
1028: /*
1029: * schedEvent() - Add EVENT task to scheduler queue
1.6 misho 1030: *
1.1 misho 1031: * @root = root task
1032: * @func = task execution function
1033: * @arg = 1st func argument
1.2 misho 1034: * @val = additional func argument
1.5 misho 1035: * @opt_data = Optional data
1036: * @opt_dlen = Optional data length
1.1 misho 1037: * return: NULL error or !=NULL new queued task
1038: */
1039: sched_task_t *
1.5 misho 1040: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1041: void *opt_data, size_t opt_dlen)
1.1 misho 1042: {
1043: sched_task_t *task;
1044: void *ptr;
1045:
1046: if (!root || !func)
1047: return NULL;
1048:
1049: /* get new task */
1.13 misho 1050: if (!(task = sched_useTask(root)))
1.4 misho 1051: return NULL;
1.1 misho 1052:
1053: task->task_func = func;
1.5 misho 1054: TASK_TYPE(task) = taskEVENT;
1055: TASK_ROOT(task) = root;
1.1 misho 1056:
1057: TASK_ARG(task) = arg;
1.2 misho 1058: TASK_VAL(task) = val;
1.1 misho 1059:
1.5 misho 1060: TASK_DATA(task) = opt_data;
1061: TASK_DATLEN(task) = opt_dlen;
1062:
1.1 misho 1063: if (root->root_hooks.hook_add.event)
1064: ptr = root->root_hooks.hook_add.event(task, NULL);
1065: else
1066: ptr = NULL;
1067:
1.5 misho 1068: if (!ptr) {
1069: #ifdef HAVE_LIBPTHREAD
1070: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
1071: #endif
1.9 misho 1072: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
1.5 misho 1073: #ifdef HAVE_LIBPTHREAD
1074: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
1075: #endif
1076: } else
1.13 misho 1077: task = sched_unuseTask(task);
1.1 misho 1078:
1079: return task;
1080: }
1081:
1082:
1083: /*
1.12 misho 1084: * schedTask() - Add regular task to scheduler queue
1.6 misho 1085: *
1.1 misho 1086: * @root = root task
1087: * @func = task execution function
1088: * @arg = 1st func argument
1.12 misho 1089: * @prio = regular task priority, 0 is hi priority for regular tasks
1.5 misho 1090: * @opt_data = Optional data
1091: * @opt_dlen = Optional data length
1.1 misho 1092: * return: NULL error or !=NULL new queued task
1093: */
1094: sched_task_t *
1.12 misho 1095: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1.5 misho 1096: void *opt_data, size_t opt_dlen)
1.1 misho 1097: {
1.12 misho 1098: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 1099: void *ptr;
1100:
1101: if (!root || !func)
1102: return NULL;
1103:
1104: /* get new task */
1.13 misho 1105: if (!(task = sched_useTask(root)))
1.4 misho 1106: return NULL;
1.1 misho 1107:
1108: task->task_func = func;
1.12 misho 1109: TASK_TYPE(task) = taskTASK;
1.5 misho 1110: TASK_ROOT(task) = root;
1.1 misho 1111:
1112: TASK_ARG(task) = arg;
1.12 misho 1113: TASK_VAL(task) = prio;
1.1 misho 1114:
1.5 misho 1115: TASK_DATA(task) = opt_data;
1116: TASK_DATLEN(task) = opt_dlen;
1117:
1.12 misho 1118: if (root->root_hooks.hook_add.task)
1119: ptr = root->root_hooks.hook_add.task(task, NULL);
1.1 misho 1120: else
1121: ptr = NULL;
1122:
1.5 misho 1123: if (!ptr) {
1124: #ifdef HAVE_LIBPTHREAD
1.12 misho 1125: pthread_mutex_lock(&root->root_mtx[taskTASK]);
1.5 misho 1126: #endif
1.12 misho 1127: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1128: if (TASK_VAL(task) < TASK_VAL(t))
1129: break;
1130: if (!t)
1131: TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
1132: else
1133: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.5 misho 1134: #ifdef HAVE_LIBPTHREAD
1.12 misho 1135: pthread_mutex_unlock(&root->root_mtx[taskTASK]);
1.5 misho 1136: #endif
1137: } else
1.13 misho 1138: task = sched_unuseTask(task);
1.1 misho 1139:
1140: return task;
1141: }
1142:
1143: /*
1.10 misho 1144: * schedSuspend() - Add Suspended task to scheduler queue
1145: *
1146: * @root = root task
1147: * @func = task execution function
1148: * @arg = 1st func argument
1149: * @id = Trigger ID
1150: * @opt_data = Optional data
1151: * @opt_dlen = Optional data length
1152: * return: NULL error or !=NULL new queued task
1153: */
1154: sched_task_t *
1155: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1156: void *opt_data, size_t opt_dlen)
1157: {
1158: sched_task_t *task;
1159: void *ptr;
1160:
1161: if (!root || !func)
1162: return NULL;
1163:
1164: /* get new task */
1.13 misho 1165: if (!(task = sched_useTask(root)))
1.10 misho 1166: return NULL;
1167:
1168: task->task_func = func;
1169: TASK_TYPE(task) = taskSUSPEND;
1170: TASK_ROOT(task) = root;
1171:
1172: TASK_ARG(task) = arg;
1173: TASK_VAL(task) = id;
1174:
1175: TASK_DATA(task) = opt_data;
1176: TASK_DATLEN(task) = opt_dlen;
1177:
1178: if (root->root_hooks.hook_add.suspend)
1179: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1180: else
1181: ptr = NULL;
1182:
1183: if (!ptr) {
1184: #ifdef HAVE_LIBPTHREAD
1185: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1186: #endif
1187: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1188: #ifdef HAVE_LIBPTHREAD
1189: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1190: #endif
1191: } else
1.13 misho 1192: task = sched_unuseTask(task);
1.10 misho 1193:
1194: return task;
1195: }
1196:
1197: /*
1.1 misho 1198: * schedCallOnce() - Call once from scheduler
1.6 misho 1199: *
1.1 misho 1200: * @root = root task
1201: * @func = task execution function
1202: * @arg = 1st func argument
1.2 misho 1203: * @val = additional func argument
1.5 misho 1204: * @opt_data = Optional data
1205: * @opt_dlen = Optional data length
1.2 misho 1206: * return: return value from called func
1.1 misho 1207: */
1208: sched_task_t *
1.5 misho 1209: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1210: void *opt_data, size_t opt_dlen)
1.1 misho 1211: {
1212: sched_task_t *task;
1.2 misho 1213: void *ret;
1.1 misho 1214:
1215: if (!root || !func)
1216: return NULL;
1217:
1218: /* get new task */
1.13 misho 1219: if (!(task = sched_useTask(root)))
1.4 misho 1220: return NULL;
1.1 misho 1221:
1222: task->task_func = func;
1.5 misho 1223: TASK_TYPE(task) = taskEVENT;
1224: TASK_ROOT(task) = root;
1.1 misho 1225:
1226: TASK_ARG(task) = arg;
1.2 misho 1227: TASK_VAL(task) = val;
1.1 misho 1228:
1.5 misho 1229: TASK_DATA(task) = opt_data;
1230: TASK_DATLEN(task) = opt_dlen;
1231:
1.2 misho 1232: ret = schedCall(task);
1.1 misho 1233:
1.13 misho 1234: sched_unuseTask(task);
1.2 misho 1235: return ret;
1.1 misho 1236: }
1.13 misho 1237:
1238: /*
1239: * schedThread() - Add thread task to scheduler queue
1240: *
1241: * @root = root task
1242: * @func = task execution function
1243: * @arg = 1st func argument
1.15 misho 1244: * @ss = stack size
1.13 misho 1245: * @opt_data = Optional data
1246: * @opt_dlen = Optional data length
1247: * return: NULL error or !=NULL new queued task
1248: */
1249: sched_task_t *
1.21 misho 1250: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
1.15 misho 1251: size_t ss, void *opt_data, size_t opt_dlen)
1.13 misho 1252: {
1253: #ifndef HAVE_LIBPTHREAD
1254: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1255: return NULL;
1256: #endif
1257: sched_task_t *task;
1258: pthread_attr_t attr;
1259:
1260: if (!root || !func)
1261: return NULL;
1262:
1263: /* get new task */
1.15 misho 1264: if (!(task = sched_useTask(root))) {
1265:
1.13 misho 1266: return NULL;
1.15 misho 1267: }
1.13 misho 1268:
1269: task->task_func = func;
1270: TASK_TYPE(task) = taskTHREAD;
1271: TASK_ROOT(task) = root;
1272:
1273: TASK_ARG(task) = arg;
1274:
1275: TASK_DATA(task) = opt_data;
1276: TASK_DATLEN(task) = opt_dlen;
1277:
1278: pthread_attr_init(&attr);
1.21 misho 1279: pthread_attr_setdetachstate(&attr, PTHREAD_DETACHED);
1.15 misho 1280: if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
1281: LOGERR;
1282: pthread_attr_destroy(&attr);
1283: return sched_unuseTask(task);
1284: }
1285: if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
1286: LOGERR;
1287: pthread_attr_destroy(&attr);
1288: return sched_unuseTask(task);
1289: } else
1.21 misho 1290: TASK_FLAG(task) = ss;
1.15 misho 1291: if ((errno = pthread_attr_setguardsize(&attr, ss))) {
1292: LOGERR;
1293: pthread_attr_destroy(&attr);
1294: return sched_unuseTask(task);
1295: }
1296: #ifdef SCHED_RR
1297: pthread_attr_setschedpolicy(&attr, SCHED_RR);
1298: #else
1299: pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
1300: #endif
1.21 misho 1301:
1302: pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
1303: TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
1304: pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
1305:
1.13 misho 1306: if (root->root_hooks.hook_add.thread)
1.21 misho 1307: if (root->root_hooks.hook_add.thread(task, &attr)) {
1308: schedCancel(task);
1309: task = NULL;
1310: }
1.13 misho 1311: pthread_attr_destroy(&attr);
1312: return task;
1313: }
1314:
1.17 misho 1315: /*
1316: * schedRTC() - Add RTC task to scheduler queue
1317: *
1318: * @root = root task
1319: * @func = task execution function
1320: * @arg = 1st func argument
1321: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1322: * @opt_data = Optional RTC ID
1.18 misho 1323: * @opt_dlen = Optional data length
1.17 misho 1324: * return: NULL error or !=NULL new queued task
1325: */
1326: sched_task_t *
1327: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
1328: void *opt_data, size_t opt_dlen)
1329: {
1330: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
1331: sched_task_t *task;
1332: void *ptr;
1333:
1334: if (!root || !func)
1335: return NULL;
1336:
1337: /* get new task */
1338: if (!(task = sched_useTask(root)))
1339: return NULL;
1340:
1341: task->task_func = func;
1342: TASK_TYPE(task) = taskRTC;
1343: TASK_ROOT(task) = root;
1344:
1345: TASK_ARG(task) = arg;
1346: TASK_TS(task) = ts;
1347:
1348: TASK_DATA(task) = opt_data;
1349: TASK_DATLEN(task) = opt_dlen;
1350:
1351: if (root->root_hooks.hook_add.rtc)
1352: ptr = root->root_hooks.hook_add.rtc(task, NULL);
1353: else
1354: ptr = NULL;
1355:
1356: if (!ptr) {
1357: #ifdef HAVE_LIBPTHREAD
1358: pthread_mutex_lock(&root->root_mtx[taskRTC]);
1359: #endif
1360: TAILQ_INSERT_TAIL(&root->root_rtc, TASK_ID(task), task_node);
1361: #ifdef HAVE_LIBPTHREAD
1362: pthread_mutex_unlock(&root->root_mtx[taskRTC]);
1363: #endif
1364: } else
1365: task = sched_unuseTask(task);
1366:
1367: return task;
1368: #else
1369: sched_SetErr(ENOTSUP, "Not supported realtime clock extensions");
1370: return NULL;
1371: #endif
1372: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>