1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: tasks.c,v 1.18.2.1 2013/08/26 07:30:07 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
55: sched_task_t *
56: sched_useTask(sched_root_task_t * __restrict root)
57: {
58: sched_task_t *task, *tmp;
59:
60: #ifdef HAVE_LIBPTHREAD
61: pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
62: #endif
63: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
64: if (!TASK_ISLOCKED(task)) {
65: TAILQ_REMOVE(&root->root_unuse, task, task_node);
66: break;
67: }
68: }
69: #ifdef HAVE_LIBPTHREAD
70: pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
71: #endif
72:
73: if (!task) {
74: task = malloc(sizeof(sched_task_t));
75: if (!task) {
76: LOGERR;
77: return NULL;
78: }
79: }
80:
81: memset(task, 0, sizeof(sched_task_t));
82: task->task_id = (uintptr_t) task;
83: return task;
84: }
85:
86: /*
87: * sched_unuseTask() - Unlock and put task to unuse queue
88: *
89: * @task = task
90: * return: always is NULL
91: */
92: sched_task_t *
93: sched_unuseTask(sched_task_t * __restrict task)
94: {
95: TASK_UNLOCK(task);
96: TASK_TYPE(task) = taskUNUSE;
97: #ifdef HAVE_LIBPTHREAD
98: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
99: #endif
100: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
101: #ifdef HAVE_LIBPTHREAD
102: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
103: #endif
104: task = NULL;
105:
106: return task;
107: }
108:
109: #pragma GCC visibility push(hidden)
110:
111: #ifdef HAVE_LIBPTHREAD
static void
_sched_threadCleanup(sched_task_t *t)
{
	/* Cancellation/exit cleanup handler for thread tasks: unlinks the
	 * task from the thread queue and recycles its descriptor.
	 * Installed via pthread_cleanup_push() in _sched_threadWrapper(). */
	if (!t || !TASK_ROOT(t))
		return;

	/* NOTE(review): detaches self when the flag says JOINABLE — looks
	 * inverted at first glance; presumably the intent is "nobody will
	 * join a cancelled joinable thread, so release its resources now".
	 * Confirm against the schedThread() flag convention. */
	if (TASK_FLAG(t) == PTHREAD_CREATE_JOINABLE)
		pthread_detach(pthread_self());

	/* Remove from the thread queue under its mutex, then recycle */
	pthread_mutex_lock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
	TAILQ_REMOVE(&TASK_ROOT(t)->root_thread, t, task_node);
	pthread_mutex_unlock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);

	sched_unuseTask(t);
}
127: void *
128: _sched_threadWrapper(sched_task_t *t)
129: {
130: void *ret = NULL;
131: sem_t *s = NULL;
132:
133: if (!t || !TASK_ROOT(t) || !TASK_RET(t))
134: pthread_exit(ret);
135: else
136: s = (sem_t*) TASK_RET(t);
137:
138: pthread_cleanup_push((void (*)(void*)) _sched_threadCleanup, t);
139:
140: pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
141: pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
142:
143: /* notify parent, thread is ready for execution */
144: sem_post(s);
145: pthread_testcancel();
146:
147: ret = TASK_FUNC(t)(t);
148:
149: pthread_cleanup_pop(42);
150: TASK_ROOT(t)->root_ret = ret;
151: pthread_exit(ret);
152: }
153: #endif
154:
155: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
156: void *
157: _sched_rtcWrapper(sched_task_t *t)
158: {
159: void *ret = NULL;
160: sched_task_func_t func;
161:
162: if (!t || !TASK_ROOT(t))
163: return NULL;
164: else
165: func = TASK_FUNC(t);
166:
167: ret = func(t);
168: timer_delete((timer_t) TASK_DATLEN(t));
169: return ret;
170: }
171: #endif
172:
173: #pragma GCC visibility pop
174:
175: /*
176: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
177: *
178: * @task = current task
179: * @retcode = return code
180: * return: return code
181: */
182: void *
183: sched_taskExit(sched_task_t *task, intptr_t retcode)
184: {
185: if (!task || !TASK_ROOT(task))
186: return (void*) -1;
187:
188: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
189: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
190:
191: TASK_ROOT(task)->root_ret = (void*) retcode;
192: return (void*) retcode;
193: }
194:
195:
196: /*
197: * schedRead() - Add READ I/O task to scheduler queue
198: *
199: * @root = root task
200: * @func = task execution function
201: * @arg = 1st func argument
202: * @fd = fd handle
203: * @opt_data = Optional data
204: * @opt_dlen = Optional data length
205: * return: NULL error or !=NULL new queued task
206: */
207: sched_task_t *
208: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
209: void *opt_data, size_t opt_dlen)
210: {
211: sched_task_t *task;
212: void *ptr;
213:
214: if (!root || !func)
215: return NULL;
216:
217: /* get new task */
218: if (!(task = sched_useTask(root)))
219: return NULL;
220:
221: task->task_func = func;
222: TASK_TYPE(task) = taskREAD;
223: TASK_ROOT(task) = root;
224:
225: TASK_ARG(task) = arg;
226: TASK_FD(task) = fd;
227:
228: TASK_DATA(task) = opt_data;
229: TASK_DATLEN(task) = opt_dlen;
230:
231: if (root->root_hooks.hook_add.read)
232: ptr = root->root_hooks.hook_add.read(task, NULL);
233: else
234: ptr = NULL;
235:
236: if (!ptr) {
237: #ifdef HAVE_LIBPTHREAD
238: pthread_mutex_lock(&root->root_mtx[taskREAD]);
239: #endif
240: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
241: #ifdef HAVE_LIBPTHREAD
242: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
243: #endif
244: } else
245: task = sched_unuseTask(task);
246:
247: return task;
248: }
249:
250: /*
251: * schedWrite() - Add WRITE I/O task to scheduler queue
252: *
253: * @root = root task
254: * @func = task execution function
255: * @arg = 1st func argument
256: * @fd = fd handle
257: * @opt_data = Optional data
258: * @opt_dlen = Optional data length
259: * return: NULL error or !=NULL new queued task
260: */
261: sched_task_t *
262: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
263: void *opt_data, size_t opt_dlen)
264: {
265: sched_task_t *task;
266: void *ptr;
267:
268: if (!root || !func)
269: return NULL;
270:
271: /* get new task */
272: if (!(task = sched_useTask(root)))
273: return NULL;
274:
275: task->task_func = func;
276: TASK_TYPE(task) = taskWRITE;
277: TASK_ROOT(task) = root;
278:
279: TASK_ARG(task) = arg;
280: TASK_FD(task) = fd;
281:
282: TASK_DATA(task) = opt_data;
283: TASK_DATLEN(task) = opt_dlen;
284:
285: if (root->root_hooks.hook_add.write)
286: ptr = root->root_hooks.hook_add.write(task, NULL);
287: else
288: ptr = NULL;
289:
290: if (!ptr) {
291: #ifdef HAVE_LIBPTHREAD
292: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
293: #endif
294: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
295: #ifdef HAVE_LIBPTHREAD
296: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
297: #endif
298: } else
299: task = sched_unuseTask(task);
300:
301: return task;
302: }
303:
304: /*
305: * schedNode() - Add NODE task to scheduler queue
306: *
307: * @root = root task
308: * @func = task execution function
309: * @arg = 1st func argument
310: * @fd = fd handle
311: * @opt_data = Optional data
312: * @opt_dlen = Optional data length
313: * return: NULL error or !=NULL new queued task
314: */
315: sched_task_t *
316: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
317: void *opt_data, size_t opt_dlen)
318: {
319: sched_task_t *task;
320: void *ptr;
321:
322: if (!root || !func)
323: return NULL;
324:
325: /* get new task */
326: if (!(task = sched_useTask(root)))
327: return NULL;
328:
329: task->task_func = func;
330: TASK_TYPE(task) = taskNODE;
331: TASK_ROOT(task) = root;
332:
333: TASK_ARG(task) = arg;
334: TASK_FD(task) = fd;
335:
336: TASK_DATA(task) = opt_data;
337: TASK_DATLEN(task) = opt_dlen;
338:
339: if (root->root_hooks.hook_add.node)
340: ptr = root->root_hooks.hook_add.node(task, NULL);
341: else
342: ptr = NULL;
343:
344: if (!ptr) {
345: #ifdef HAVE_LIBPTHREAD
346: pthread_mutex_lock(&root->root_mtx[taskNODE]);
347: #endif
348: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
349: #ifdef HAVE_LIBPTHREAD
350: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
351: #endif
352: } else
353: task = sched_unuseTask(task);
354:
355: return task;
356: }
357:
358: /*
359: * schedProc() - Add PROC task to scheduler queue
360: *
361: * @root = root task
362: * @func = task execution function
363: * @arg = 1st func argument
364: * @pid = PID
365: * @opt_data = Optional data
366: * @opt_dlen = Optional data length
367: * return: NULL error or !=NULL new queued task
368: */
369: sched_task_t *
370: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
371: void *opt_data, size_t opt_dlen)
372: {
373: sched_task_t *task;
374: void *ptr;
375:
376: if (!root || !func)
377: return NULL;
378:
379: /* get new task */
380: if (!(task = sched_useTask(root)))
381: return NULL;
382:
383: task->task_func = func;
384: TASK_TYPE(task) = taskPROC;
385: TASK_ROOT(task) = root;
386:
387: TASK_ARG(task) = arg;
388: TASK_VAL(task) = pid;
389:
390: TASK_DATA(task) = opt_data;
391: TASK_DATLEN(task) = opt_dlen;
392:
393: if (root->root_hooks.hook_add.proc)
394: ptr = root->root_hooks.hook_add.proc(task, NULL);
395: else
396: ptr = NULL;
397:
398: if (!ptr) {
399: #ifdef HAVE_LIBPTHREAD
400: pthread_mutex_lock(&root->root_mtx[taskPROC]);
401: #endif
402: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
403: #ifdef HAVE_LIBPTHREAD
404: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
405: #endif
406: } else
407: task = sched_unuseTask(task);
408:
409: return task;
410: }
411:
412: /*
413: * schedUser() - Add trigger USER task to scheduler queue
414: *
415: * @root = root task
416: * @func = task execution function
417: * @arg = 1st func argument
418: * @id = Trigger ID
419: * @opt_data = Optional data
420: * @opt_dlen = Optional user's trigger flags
421: * return: NULL error or !=NULL new queued task
422: */
423: sched_task_t *
424: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
425: void *opt_data, size_t opt_dlen)
426: {
427: #ifndef EVFILT_USER
428: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
429: return NULL;
430: #else
431: sched_task_t *task;
432: void *ptr;
433:
434: if (!root || !func)
435: return NULL;
436:
437: /* get new task */
438: if (!(task = sched_useTask(root)))
439: return NULL;
440:
441: task->task_func = func;
442: TASK_TYPE(task) = taskUSER;
443: TASK_ROOT(task) = root;
444:
445: TASK_ARG(task) = arg;
446: TASK_VAL(task) = id;
447:
448: TASK_DATA(task) = opt_data;
449: TASK_DATLEN(task) = opt_dlen;
450:
451: if (root->root_hooks.hook_add.user)
452: ptr = root->root_hooks.hook_add.user(task, NULL);
453: else
454: ptr = NULL;
455:
456: if (!ptr) {
457: #ifdef HAVE_LIBPTHREAD
458: pthread_mutex_lock(&root->root_mtx[taskUSER]);
459: #endif
460: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
461: #ifdef HAVE_LIBPTHREAD
462: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
463: #endif
464: } else
465: task = sched_unuseTask(task);
466:
467: return task;
468: #endif
469: }
470:
471: /*
472: * schedSignal() - Add SIGNAL task to scheduler queue
473: *
474: * @root = root task
475: * @func = task execution function
476: * @arg = 1st func argument
477: * @sig = Signal
478: * @opt_data = Optional data
479: * @opt_dlen = Optional data length
480: * return: NULL error or !=NULL new queued task
481: */
482: sched_task_t *
483: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
484: void *opt_data, size_t opt_dlen)
485: {
486: sched_task_t *task;
487: void *ptr;
488:
489: if (!root || !func)
490: return NULL;
491:
492: /* get new task */
493: if (!(task = sched_useTask(root)))
494: return NULL;
495:
496: task->task_func = func;
497: TASK_TYPE(task) = taskSIGNAL;
498: TASK_ROOT(task) = root;
499:
500: TASK_ARG(task) = arg;
501: TASK_VAL(task) = sig;
502:
503: TASK_DATA(task) = opt_data;
504: TASK_DATLEN(task) = opt_dlen;
505:
506: if (root->root_hooks.hook_add.signal)
507: ptr = root->root_hooks.hook_add.signal(task, NULL);
508: else
509: ptr = NULL;
510:
511: if (!ptr) {
512: #ifdef HAVE_LIBPTHREAD
513: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
514: #endif
515: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
516: #ifdef HAVE_LIBPTHREAD
517: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
518: #endif
519: } else
520: task = sched_unuseTask(task);
521:
522: return task;
523: }
524:
525: /*
526: * schedAlarm() - Add ALARM task to scheduler queue
527: *
528: * @root = root task
529: * @func = task execution function
530: * @arg = 1st func argument
531: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
532: * @opt_data = Alarm timer ID
533: * @opt_dlen = Optional data length
534: * return: NULL error or !=NULL new queued task
535: */
536: sched_task_t *
537: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
538: void *opt_data, size_t opt_dlen)
539: {
540: sched_task_t *task;
541: void *ptr;
542:
543: if (!root || !func)
544: return NULL;
545:
546: /* get new task */
547: if (!(task = sched_useTask(root)))
548: return NULL;
549:
550: task->task_func = func;
551: TASK_TYPE(task) = taskALARM;
552: TASK_ROOT(task) = root;
553:
554: TASK_ARG(task) = arg;
555: TASK_TS(task) = ts;
556:
557: TASK_DATA(task) = opt_data;
558: TASK_DATLEN(task) = opt_dlen;
559:
560: if (root->root_hooks.hook_add.alarm)
561: ptr = root->root_hooks.hook_add.alarm(task, NULL);
562: else
563: ptr = NULL;
564:
565: if (!ptr) {
566: #ifdef HAVE_LIBPTHREAD
567: pthread_mutex_lock(&root->root_mtx[taskALARM]);
568: #endif
569: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
570: #ifdef HAVE_LIBPTHREAD
571: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
572: #endif
573: } else
574: task = sched_unuseTask(task);
575:
576: return task;
577: }
578:
579: #ifdef AIO_SUPPORT
580: /*
581: * schedAIO() - Add AIO task to scheduler queue
582: *
583: * @root = root task
584: * @func = task execution function
585: * @arg = 1st func argument
586: * @acb = AIO cb structure address
587: * @opt_data = Optional data
588: * @opt_dlen = Optional data length
589: * return: NULL error or !=NULL new queued task
590: */
591: sched_task_t *
592: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
593: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
594: {
595: sched_task_t *task;
596: void *ptr;
597:
598: if (!root || !func || !acb || !opt_dlen)
599: return NULL;
600:
601: /* get new task */
602: if (!(task = sched_useTask(root)))
603: return NULL;
604:
605: task->task_func = func;
606: TASK_TYPE(task) = taskAIO;
607: TASK_ROOT(task) = root;
608:
609: TASK_ARG(task) = arg;
610: TASK_VAL(task) = (u_long) acb;
611:
612: TASK_DATA(task) = opt_data;
613: TASK_DATLEN(task) = opt_dlen;
614:
615: if (root->root_hooks.hook_add.aio)
616: ptr = root->root_hooks.hook_add.aio(task, NULL);
617: else
618: ptr = NULL;
619:
620: if (!ptr) {
621: #ifdef HAVE_LIBPTHREAD
622: pthread_mutex_lock(&root->root_mtx[taskAIO]);
623: #endif
624: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
625: #ifdef HAVE_LIBPTHREAD
626: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
627: #endif
628: } else
629: task = sched_unuseTask(task);
630:
631: return task;
632: }
633:
634: /*
635: * schedAIORead() - Add AIO read task to scheduler queue
636: *
637: * @root = root task
638: * @func = task execution function
639: * @arg = 1st func argument
640: * @fd = file descriptor
641: * @buffer = Buffer
642: * @buflen = Buffer length
643: * @offset = Offset from start of file, if =-1 from current position
644: * return: NULL error or !=NULL new queued task
645: */
646: sched_task_t *
647: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
648: void *buffer, size_t buflen, off_t offset)
649: {
650: struct aiocb *acb;
651: off_t off;
652:
653: if (!root || !func || !buffer || !buflen)
654: return NULL;
655:
656: if (offset == (off_t) -1) {
657: off = lseek(fd, 0, SEEK_CUR);
658: if (off == -1) {
659: LOGERR;
660: return NULL;
661: }
662: } else
663: off = offset;
664:
665: if (!(acb = malloc(sizeof(struct aiocb)))) {
666: LOGERR;
667: return NULL;
668: } else
669: memset(acb, 0, sizeof(struct aiocb));
670:
671: acb->aio_fildes = fd;
672: acb->aio_nbytes = buflen;
673: acb->aio_buf = buffer;
674: acb->aio_offset = off;
675: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
676: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
677: acb->aio_sigevent.sigev_value.sival_ptr = acb;
678:
679: if (aio_read(acb)) {
680: LOGERR;
681: free(acb);
682: return NULL;
683: }
684:
685: return schedAIO(root, func, arg, acb, buffer, buflen);
686: }
687:
688: /*
689: * schedAIOWrite() - Add AIO write task to scheduler queue
690: *
691: * @root = root task
692: * @func = task execution function
693: * @arg = 1st func argument
694: * @fd = file descriptor
695: * @buffer = Buffer
696: * @buflen = Buffer length
697: * @offset = Offset from start of file, if =-1 from current position
698: * return: NULL error or !=NULL new queued task
699: */
700: sched_task_t *
701: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
702: void *buffer, size_t buflen, off_t offset)
703: {
704: struct aiocb *acb;
705: off_t off;
706:
707: if (!root || !func || !buffer || !buflen)
708: return NULL;
709:
710: if (offset == (off_t) -1) {
711: off = lseek(fd, 0, SEEK_CUR);
712: if (off == -1) {
713: LOGERR;
714: return NULL;
715: }
716: } else
717: off = offset;
718:
719: if (!(acb = malloc(sizeof(struct aiocb)))) {
720: LOGERR;
721: return NULL;
722: } else
723: memset(acb, 0, sizeof(struct aiocb));
724:
725: acb->aio_fildes = fd;
726: acb->aio_nbytes = buflen;
727: acb->aio_buf = buffer;
728: acb->aio_offset = off;
729: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
730: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
731: acb->aio_sigevent.sigev_value.sival_ptr = acb;
732:
733: if (aio_write(acb)) {
734: LOGERR;
735: free(acb);
736: return NULL;
737: }
738:
739: return schedAIO(root, func, arg, acb, buffer, buflen);
740: }
741:
742: #ifdef EVFILT_LIO
743: /*
744: * schedLIO() - Add AIO bulk tasks to scheduler queue
745: *
746: * @root = root task
747: * @func = task execution function
748: * @arg = 1st func argument
749: * @acbs = AIO cb structure addresses
750: * @opt_data = Optional data
751: * @opt_dlen = Optional data length
752: * return: NULL error or !=NULL new queued task
753: */
754: sched_task_t *
755: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
756: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
757: {
758: sched_task_t *task;
759: void *ptr;
760:
761: if (!root || !func || !acbs || !opt_dlen)
762: return NULL;
763:
764: /* get new task */
765: if (!(task = sched_useTask(root)))
766: return NULL;
767:
768: task->task_func = func;
769: TASK_TYPE(task) = taskLIO;
770: TASK_ROOT(task) = root;
771:
772: TASK_ARG(task) = arg;
773: TASK_VAL(task) = (u_long) acbs;
774:
775: TASK_DATA(task) = opt_data;
776: TASK_DATLEN(task) = opt_dlen;
777:
778: if (root->root_hooks.hook_add.lio)
779: ptr = root->root_hooks.hook_add.lio(task, NULL);
780: else
781: ptr = NULL;
782:
783: if (!ptr) {
784: #ifdef HAVE_LIBPTHREAD
785: pthread_mutex_lock(&root->root_mtx[taskLIO]);
786: #endif
787: TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
788: #ifdef HAVE_LIBPTHREAD
789: pthread_mutex_unlock(&root->root_mtx[taskLIO]);
790: #endif
791: } else
792: task = sched_unuseTask(task);
793:
794: return task;
795: }
796:
797: /*
798: * schedLIORead() - Add list of AIO read tasks to scheduler queue
799: *
800: * @root = root task
801: * @func = task execution function
802: * @arg = 1st func argument
803: * @fd = file descriptor
804: * @bufs = Buffer's list
805: * @nbufs = Number of Buffers
806: * @offset = Offset from start of file, if =-1 from current position
807: * return: NULL error or !=NULL new queued task
808: */
809: sched_task_t *
810: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
811: struct iovec *bufs, size_t nbufs, off_t offset)
812: {
813: struct sigevent sig;
814: struct aiocb **acb;
815: off_t off;
816: register int i;
817:
818: if (!root || !func || !bufs || !nbufs)
819: return NULL;
820:
821: if (offset == (off_t) -1) {
822: off = lseek(fd, 0, SEEK_CUR);
823: if (off == -1) {
824: LOGERR;
825: return NULL;
826: }
827: } else
828: off = offset;
829:
830: if (!(acb = calloc(sizeof(void*), nbufs))) {
831: LOGERR;
832: return NULL;
833: } else
834: memset(acb, 0, sizeof(void*) * nbufs);
835: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
836: acb[i] = malloc(sizeof(struct aiocb));
837: if (!acb[i]) {
838: LOGERR;
839: for (i = 0; i < nbufs; i++)
840: if (acb[i])
841: free(acb[i]);
842: free(acb);
843: return NULL;
844: } else
845: memset(acb[i], 0, sizeof(struct aiocb));
846: acb[i]->aio_fildes = fd;
847: acb[i]->aio_nbytes = bufs[i].iov_len;
848: acb[i]->aio_buf = bufs[i].iov_base;
849: acb[i]->aio_offset = off;
850: acb[i]->aio_lio_opcode = LIO_READ;
851: }
852: memset(&sig, 0, sizeof sig);
853: sig.sigev_notify = SIGEV_KEVENT;
854: sig.sigev_notify_kqueue = root->root_kq;
855: sig.sigev_value.sival_ptr = acb;
856:
857: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
858: LOGERR;
859: for (i = 0; i < nbufs; i++)
860: if (acb[i])
861: free(acb[i]);
862: free(acb);
863: return NULL;
864: }
865:
866: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
867: }
868:
869: /*
870: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
871: *
872: * @root = root task
873: * @func = task execution function
874: * @arg = 1st func argument
875: * @fd = file descriptor
876: * @bufs = Buffer's list
877: * @nbufs = Number of Buffers
878: * @offset = Offset from start of file, if =-1 from current position
879: * return: NULL error or !=NULL new queued task
880: */
881: sched_task_t *
882: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
883: struct iovec *bufs, size_t nbufs, off_t offset)
884: {
885: struct sigevent sig;
886: struct aiocb **acb;
887: off_t off;
888: register int i;
889:
890: if (!root || !func || !bufs || !nbufs)
891: return NULL;
892:
893: if (offset == (off_t) -1) {
894: off = lseek(fd, 0, SEEK_CUR);
895: if (off == -1) {
896: LOGERR;
897: return NULL;
898: }
899: } else
900: off = offset;
901:
902: if (!(acb = calloc(sizeof(void*), nbufs))) {
903: LOGERR;
904: return NULL;
905: } else
906: memset(acb, 0, sizeof(void*) * nbufs);
907: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
908: acb[i] = malloc(sizeof(struct aiocb));
909: if (!acb[i]) {
910: LOGERR;
911: for (i = 0; i < nbufs; i++)
912: if (acb[i])
913: free(acb[i]);
914: free(acb);
915: return NULL;
916: } else
917: memset(acb[i], 0, sizeof(struct aiocb));
918: acb[i]->aio_fildes = fd;
919: acb[i]->aio_nbytes = bufs[i].iov_len;
920: acb[i]->aio_buf = bufs[i].iov_base;
921: acb[i]->aio_offset = off;
922: acb[i]->aio_lio_opcode = LIO_WRITE;
923: }
924: memset(&sig, 0, sizeof sig);
925: sig.sigev_notify = SIGEV_KEVENT;
926: sig.sigev_notify_kqueue = root->root_kq;
927: sig.sigev_value.sival_ptr = acb;
928:
929: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
930: LOGERR;
931: for (i = 0; i < nbufs; i++)
932: if (acb[i])
933: free(acb[i]);
934: free(acb);
935: return NULL;
936: }
937:
938: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
939: }
940: #endif /* EVFILT_LIO */
941: #endif /* AIO_SUPPORT */
942:
943: /*
944: * schedTimer() - Add TIMER task to scheduler queue
945: *
946: * @root = root task
947: * @func = task execution function
948: * @arg = 1st func argument
949: * @ts = timeout argument structure
950: * @opt_data = Optional data
951: * @opt_dlen = Optional data length
952: * return: NULL error or !=NULL new queued task
953: */
sched_task_t *
schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task, *tmp, *t = NULL;
	void *ptr;
	struct timespec now;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskTIMER;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	/* calculate timeval structure */
	/* convert relative timeout `ts` into an absolute monotonic deadline,
	 * normalizing tv_nsec into [0, 1e9); the negative branch only
	 * triggers if the caller passed a negative ts.tv_nsec */
	clock_gettime(CLOCK_MONOTONIC, &now);
	now.tv_sec += ts.tv_sec;
	now.tv_nsec += ts.tv_nsec;
	if (now.tv_nsec >= 1000000000L) {
		now.tv_sec++;
		now.tv_nsec -= 1000000000L;
	} else if (now.tv_nsec < 0) {
		now.tv_sec--;
		now.tv_nsec += 1000000000L;
	}
	TASK_TS(task) = now;

	/* hook returning non-NULL rejects the task */
	if (root->root_hooks.hook_add.timer)
		ptr = root->root_hooks.hook_add.timer(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskTIMER]);
#endif
#ifdef TIMER_WITHOUT_SORT
		TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
#else
		/* keep the timer queue sorted by ascending deadline: insert
		 * before the first queued timer that fires no earlier */
		TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
			if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
				break;
		if (!t)
			TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
		else
			TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
#endif
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}
1019:
1020: /*
1021: * schedEvent() - Add EVENT task to scheduler queue
1022: *
1023: * @root = root task
1024: * @func = task execution function
1025: * @arg = 1st func argument
1026: * @val = additional func argument
1027: * @opt_data = Optional data
1028: * @opt_dlen = Optional data length
1029: * return: NULL error or !=NULL new queued task
1030: */
1031: sched_task_t *
1032: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1033: void *opt_data, size_t opt_dlen)
1034: {
1035: sched_task_t *task;
1036: void *ptr;
1037:
1038: if (!root || !func)
1039: return NULL;
1040:
1041: /* get new task */
1042: if (!(task = sched_useTask(root)))
1043: return NULL;
1044:
1045: task->task_func = func;
1046: TASK_TYPE(task) = taskEVENT;
1047: TASK_ROOT(task) = root;
1048:
1049: TASK_ARG(task) = arg;
1050: TASK_VAL(task) = val;
1051:
1052: TASK_DATA(task) = opt_data;
1053: TASK_DATLEN(task) = opt_dlen;
1054:
1055: if (root->root_hooks.hook_add.event)
1056: ptr = root->root_hooks.hook_add.event(task, NULL);
1057: else
1058: ptr = NULL;
1059:
1060: if (!ptr) {
1061: #ifdef HAVE_LIBPTHREAD
1062: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
1063: #endif
1064: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
1065: #ifdef HAVE_LIBPTHREAD
1066: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
1067: #endif
1068: } else
1069: task = sched_unuseTask(task);
1070:
1071: return task;
1072: }
1073:
1074:
1075: /*
1076: * schedTask() - Add regular task to scheduler queue
1077: *
1078: * @root = root task
1079: * @func = task execution function
1080: * @arg = 1st func argument
1081: * @prio = regular task priority, 0 is hi priority for regular tasks
1082: * @opt_data = Optional data
1083: * @opt_dlen = Optional data length
1084: * return: NULL error or !=NULL new queued task
1085: */
1086: sched_task_t *
1087: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1088: void *opt_data, size_t opt_dlen)
1089: {
1090: sched_task_t *task, *tmp, *t = NULL;
1091: void *ptr;
1092:
1093: if (!root || !func)
1094: return NULL;
1095:
1096: /* get new task */
1097: if (!(task = sched_useTask(root)))
1098: return NULL;
1099:
1100: task->task_func = func;
1101: TASK_TYPE(task) = taskTASK;
1102: TASK_ROOT(task) = root;
1103:
1104: TASK_ARG(task) = arg;
1105: TASK_VAL(task) = prio;
1106:
1107: TASK_DATA(task) = opt_data;
1108: TASK_DATLEN(task) = opt_dlen;
1109:
1110: if (root->root_hooks.hook_add.task)
1111: ptr = root->root_hooks.hook_add.task(task, NULL);
1112: else
1113: ptr = NULL;
1114:
1115: if (!ptr) {
1116: #ifdef HAVE_LIBPTHREAD
1117: pthread_mutex_lock(&root->root_mtx[taskTASK]);
1118: #endif
1119: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1120: if (TASK_VAL(task) < TASK_VAL(t))
1121: break;
1122: if (!t)
1123: TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
1124: else
1125: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1126: #ifdef HAVE_LIBPTHREAD
1127: pthread_mutex_unlock(&root->root_mtx[taskTASK]);
1128: #endif
1129: } else
1130: task = sched_unuseTask(task);
1131:
1132: return task;
1133: }
1134:
1135: /*
1136: * schedSuspend() - Add Suspended task to scheduler queue
1137: *
1138: * @root = root task
1139: * @func = task execution function
1140: * @arg = 1st func argument
1141: * @id = Trigger ID
1142: * @opt_data = Optional data
1143: * @opt_dlen = Optional data length
1144: * return: NULL error or !=NULL new queued task
1145: */
1146: sched_task_t *
1147: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1148: void *opt_data, size_t opt_dlen)
1149: {
1150: sched_task_t *task;
1151: void *ptr;
1152:
1153: if (!root || !func)
1154: return NULL;
1155:
1156: /* get new task */
1157: if (!(task = sched_useTask(root)))
1158: return NULL;
1159:
1160: task->task_func = func;
1161: TASK_TYPE(task) = taskSUSPEND;
1162: TASK_ROOT(task) = root;
1163:
1164: TASK_ARG(task) = arg;
1165: TASK_VAL(task) = id;
1166:
1167: TASK_DATA(task) = opt_data;
1168: TASK_DATLEN(task) = opt_dlen;
1169:
1170: if (root->root_hooks.hook_add.suspend)
1171: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1172: else
1173: ptr = NULL;
1174:
1175: if (!ptr) {
1176: #ifdef HAVE_LIBPTHREAD
1177: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1178: #endif
1179: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1180: #ifdef HAVE_LIBPTHREAD
1181: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1182: #endif
1183: } else
1184: task = sched_unuseTask(task);
1185:
1186: return task;
1187: }
1188:
1189: /*
1190: * schedCallOnce() - Call once from scheduler
1191: *
1192: * @root = root task
1193: * @func = task execution function
1194: * @arg = 1st func argument
1195: * @val = additional func argument
1196: * @opt_data = Optional data
1197: * @opt_dlen = Optional data length
1198: * return: return value from called func
1199: */
1200: sched_task_t *
1201: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1202: void *opt_data, size_t opt_dlen)
1203: {
1204: sched_task_t *task;
1205: void *ret;
1206:
1207: if (!root || !func)
1208: return NULL;
1209:
1210: /* get new task */
1211: if (!(task = sched_useTask(root)))
1212: return NULL;
1213:
1214: task->task_func = func;
1215: TASK_TYPE(task) = taskEVENT;
1216: TASK_ROOT(task) = root;
1217:
1218: TASK_ARG(task) = arg;
1219: TASK_VAL(task) = val;
1220:
1221: TASK_DATA(task) = opt_data;
1222: TASK_DATLEN(task) = opt_dlen;
1223:
1224: ret = schedCall(task);
1225:
1226: sched_unuseTask(task);
1227: return ret;
1228: }
1229:
1230: /*
1231: * schedThread() - Add thread task to scheduler queue
1232: *
1233: * @root = root task
1234: * @func = task execution function
1235: * @arg = 1st func argument
1236: * @detach = Detach thread from scheduler, if !=0
1237: * @ss = stack size
1238: * @opt_data = Optional data
1239: * @opt_dlen = Optional data length
1240: * return: NULL error or !=NULL new queued task
1241: */
1242: sched_task_t *
1243: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach,
1244: size_t ss, void *opt_data, size_t opt_dlen)
1245: {
1246: #ifndef HAVE_LIBPTHREAD
1247: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1248: return NULL;
1249: #endif
1250: sched_task_t *task;
1251: void *ptr;
1252: pthread_attr_t attr;
1253: sem_t *s = NULL;
1254:
1255: if (!root || !func)
1256: return NULL;
1257: else {
1258: /* normalizing stack size & detach state */
1259: if (ss)
1260: ss &= 0x7FFFFFFF;
1261: detach = detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE;
1262: }
1263:
1264: if (!(s = (sem_t*) malloc(sizeof(sem_t)))) {
1265: LOGERR;
1266: return NULL;
1267: }
1268: if (sem_init(s, 0, 1)) {
1269: LOGERR;
1270: free(s);
1271: return NULL;
1272: }
1273:
1274: /* get new task */
1275: if (!(task = sched_useTask(root))) {
1276: sem_destroy(s);
1277: free(s);
1278:
1279: return NULL;
1280: }
1281:
1282: task->task_func = func;
1283: TASK_TYPE(task) = taskTHREAD;
1284: TASK_ROOT(task) = root;
1285:
1286: TASK_ARG(task) = arg;
1287: TASK_FLAG(task) = detach;
1288: TASK_RET(task) = (intptr_t) s;
1289:
1290: TASK_DATA(task) = opt_data;
1291: TASK_DATLEN(task) = opt_dlen;
1292:
1293: pthread_attr_init(&attr);
1294: pthread_attr_setdetachstate(&attr, detach);
1295: if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
1296: LOGERR;
1297: pthread_attr_destroy(&attr);
1298: sem_destroy(s);
1299: free(s);
1300: return sched_unuseTask(task);
1301: }
1302: if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
1303: LOGERR;
1304: pthread_attr_destroy(&attr);
1305: sem_destroy(s);
1306: free(s);
1307: return sched_unuseTask(task);
1308: } else
1309: TASK_FLAG(task) |= (ss << 1);
1310: if ((errno = pthread_attr_setguardsize(&attr, ss))) {
1311: LOGERR;
1312: pthread_attr_destroy(&attr);
1313: sem_destroy(s);
1314: free(s);
1315: return sched_unuseTask(task);
1316: }
1317: #ifdef SCHED_RR
1318: pthread_attr_setschedpolicy(&attr, SCHED_RR);
1319: #else
1320: pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
1321: #endif
1322: if (root->root_hooks.hook_add.thread)
1323: ptr = root->root_hooks.hook_add.thread(task, &attr);
1324: else
1325: ptr = NULL;
1326: pthread_attr_destroy(&attr);
1327:
1328: if (!ptr) {
1329: pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
1330: TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
1331: pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
1332:
1333: /* wait for init thread actions */
1334: sem_wait(s);
1335: } else
1336: task = sched_unuseTask(task);
1337:
1338: sem_destroy(s);
1339: free(s);
1340: return task;
1341: }
1342:
1343: /*
1344: * schedRTC() - Add RTC task to scheduler queue
1345: *
1346: * @root = root task
1347: * @func = task execution function
1348: * @arg = 1st func argument
1349: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1350: * @opt_data = Optional RTC ID
1351: * @opt_dlen = Optional data length
1352: * return: NULL error or !=NULL new queued task
1353: */
1354: sched_task_t *
1355: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
1356: void *opt_data, size_t opt_dlen)
1357: {
1358: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
1359: sched_task_t *task;
1360: void *ptr;
1361:
1362: if (!root || !func)
1363: return NULL;
1364:
1365: /* get new task */
1366: if (!(task = sched_useTask(root)))
1367: return NULL;
1368:
1369: task->task_func = func;
1370: TASK_TYPE(task) = taskRTC;
1371: TASK_ROOT(task) = root;
1372:
1373: TASK_ARG(task) = arg;
1374: TASK_TS(task) = ts;
1375:
1376: TASK_DATA(task) = opt_data;
1377: TASK_DATLEN(task) = opt_dlen;
1378:
1379: if (root->root_hooks.hook_add.rtc)
1380: ptr = root->root_hooks.hook_add.rtc(task, NULL);
1381: else
1382: ptr = NULL;
1383:
1384: if (!ptr) {
1385: #ifdef HAVE_LIBPTHREAD
1386: pthread_mutex_lock(&root->root_mtx[taskRTC]);
1387: #endif
1388: TAILQ_INSERT_TAIL(&root->root_rtc, TASK_ID(task), task_node);
1389: #ifdef HAVE_LIBPTHREAD
1390: pthread_mutex_unlock(&root->root_mtx[taskRTC]);
1391: #endif
1392: } else
1393: task = sched_unuseTask(task);
1394:
1395: return task;
1396: #else
1397: sched_SetErr(ENOTSUP, "Not supported realtime clock extensions");
1398: return NULL;
1399: #endif
1400: }
/* FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org> */