/*************************************************************************
* (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
*  by Michael Pounov <misho@openbsd-bg.org>
*
* $Author: misho $
* $Id: tasks.c,v 1.17 2013/08/15 19:10:48 misho Exp $
*
**************************************************************************
The ELWIX and AITNET software is distributed under the following
terms:

All of the documentation and software included in the ELWIX and AITNET
Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>

Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
	by Michael Pounov <misho@elwix.org>.  All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
   notice, this list of conditions and the following disclaimer in the
   documentation and/or other materials provided with the distribution.
3. All advertising materials mentioning features or use of this software
   must display the following acknowledgement:
This product includes software developed by Michael Pounov <misho@elwix.org>
ELWIX - Embedded LightWeight unIX and its contributors.
4. Neither the name of AITNET nor the names of its contributors
   may be used to endorse or promote products derived from this software
   without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
*/
#include "global.h"


/*
 * sched_useTask() - Get and init new task
 *
 * @root = root task
 * return: NULL error or !=NULL prepared task
 */
sched_task_t *
sched_useTask(sched_root_task_t * __restrict root)
{
	sched_task_t *task, *tmp;

#ifdef HAVE_LIBPTHREAD
	pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
#endif
	TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
		if (!TASK_ISLOCKED(task)) {
			TAILQ_REMOVE(&root->root_unuse, task, task_node);
			break;
		}
	}
#ifdef HAVE_LIBPTHREAD
	pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
#endif

	if (!task) {
		task = malloc(sizeof(sched_task_t));
		if (!task) {
			LOGERR;
			return NULL;
		}
	}

	memset(task, 0, sizeof(sched_task_t));
	task->task_id = (uintptr_t) task;
	return task;
}

/*
 * sched_unuseTask() - Unlock and put task to unuse queue
 *
 * @task = task
 * return: always NULL
 */
sched_task_t *
sched_unuseTask(sched_task_t * __restrict task)
{
	TASK_UNLOCK(task);
	TASK_TYPE(task) = taskUNUSE;
#ifdef HAVE_LIBPTHREAD
	pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
#endif
	TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
	pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
#endif
	task = NULL;

	return task;
}
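
/*
 * Usage note (a sketch, not part of the API): every schedXXX() constructor
 * below is built from this pair -- sched_useTask() pops a recycled task from
 * root_unuse (or malloc()s a fresh one) and sched_unuseTask() puts it back,
 * so a scheduler in steady state allocates almost nothing:
 *
 *	sched_task_t *task = sched_useTask(root);
 *	if (!task)
 *		return NULL;
 *	// ... fill in TASK_TYPE(), TASK_ROOT(), TASK_ARG(), ...
 *	if (add_hook_failed)			// hypothetical condition
 *		task = sched_unuseTask(task);	// back on the unuse queue, task == NULL
 *	return task;
 */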

#pragma GCC visibility push(hidden)

#ifdef HAVE_LIBPTHREAD
static void
_sched_threadCleanup(sched_task_t *t)
{
	if (!t || !TASK_ROOT(t))
		return;

	/* only the low bit of TASK_FLAG holds the detach state;
	 * schedThread() keeps the stack size in the upper bits */
	if ((TASK_FLAG(t) & 1) == PTHREAD_CREATE_JOINABLE)
		pthread_detach(pthread_self());

	pthread_mutex_lock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
	TAILQ_REMOVE(&TASK_ROOT(t)->root_thread, t, task_node);
	pthread_mutex_unlock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);

	sched_unuseTask(t);
}
void *
_sched_threadWrapper(sched_task_t *t)
{
	void *ret = NULL;
	sem_t *s = NULL;
	sched_root_task_t *r;

	if (!t || !TASK_ROOT(t) || !TASK_RET(t))
		pthread_exit(ret);
	else
		s = (sem_t*) TASK_RET(t);
	r = TASK_ROOT(t);

	pthread_cleanup_push((void (*)(void*)) _sched_threadCleanup, t);

	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

	/* notify parent that the thread is ready for execution */
	sem_post(s);
	pthread_testcancel();

	ret = TASK_FUNC(t)(t);

	/* non-zero argument: run _sched_threadCleanup() on normal return too */
	pthread_cleanup_pop(42);
	/* t was recycled by the cleanup handler above; use the saved root */
	r->root_ret = ret;
	pthread_exit(ret);
}
#endif

#pragma GCC visibility pop

/*
 * sched_taskExit() - Exit routine for scheduler task, explicitly required for thread tasks
 *
 * @task = current task
 * @retcode = return code
 * return: return code
 */
void *
sched_taskExit(sched_task_t *task, intptr_t retcode)
{
	if (!task || !TASK_ROOT(task))
		return (void*) -1;

	if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
		TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);

	TASK_ROOT(task)->root_ret = (void*) retcode;
	return (void*) retcode;
}
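
/*
 * Example (sketch): task callbacks have the sched_task_func_t shape used
 * throughout this file; a thread task should leave through sched_taskExit()
 * so the exit hook fires and root_ret is set:
 *
 *	static void *
 *	my_task(sched_task_t *task)
 *	{
 *		// ... do the work ...
 *		return sched_taskExit(task, 0);
 *	}
 */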


/*
 * schedRead() - Add READ I/O task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @fd = fd handle
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task;
	void *ptr;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskREAD;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_FD(task) = fd;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.read)
		ptr = root->root_hooks.hook_add.read(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskREAD]);
#endif
		TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskREAD]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}
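
/*
 * Example (sketch, hypothetical names sock/on_input): a dispatched task is
 * removed from its queue, so re-add from the callback to keep watching:
 *
 *	static void *
 *	on_input(sched_task_t *task)
 *	{
 *		char buf[BUFSIZ];
 *		ssize_t len = read(TASK_FD(task), buf, sizeof buf);
 *
 *		if (len > 0)
 *			schedRead(TASK_ROOT(task), on_input, TASK_ARG(task),
 *					TASK_FD(task), NULL, 0);
 *		return NULL;
 *	}
 *
 *	schedRead(root, on_input, NULL, sock, NULL, 0);
 */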

/*
 * schedWrite() - Add WRITE I/O task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @fd = fd handle
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task;
	void *ptr;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskWRITE;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_FD(task) = fd;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.write)
		ptr = root->root_hooks.hook_add.write(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskWRITE]);
#endif
		TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}

/*
 * schedNode() - Add NODE task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @fd = fd handle
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task;
	void *ptr;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskNODE;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_FD(task) = fd;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.node)
		ptr = root->root_hooks.hook_add.node(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskNODE]);
#endif
		TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskNODE]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}

/*
 * schedProc() - Add PROC task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @pid = PID
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task;
	void *ptr;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskPROC;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_VAL(task) = pid;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.proc)
		ptr = root->root_hooks.hook_add.proc(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskPROC]);
#endif
		TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskPROC]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}
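
/*
 * Example (sketch, hypothetical on_child): reap a forked child through the
 * scheduler; TASK_VAL() carries the watched PID into the callback:
 *
 *	static void *
 *	on_child(sched_task_t *task)
 *	{
 *		int status;
 *
 *		waitpid((pid_t) TASK_VAL(task), &status, WNOHANG);
 *		return NULL;
 *	}
 *
 *	pid_t pid = fork();
 *	if (pid > 0)
 *		schedProc(root, on_child, NULL, (u_long) pid, NULL, 0);
 */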

/*
 * schedUser() - Add trigger USER task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @id = Trigger ID
 * @opt_data = Optional data
 * @opt_dlen = Optional user's trigger flags
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
		void *opt_data, size_t opt_dlen)
{
#ifndef EVFILT_USER
	sched_SetErr(ENOTSUP, "kevent() filter EVFILT_USER is not supported");
	return NULL;
#else
	sched_task_t *task;
	void *ptr;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskUSER;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_VAL(task) = id;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.user)
		ptr = root->root_hooks.hook_add.user(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskUSER]);
#endif
		TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskUSER]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
#endif	/* EVFILT_USER */
}

/*
 * schedSignal() - Add SIGNAL task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @sig = Signal
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task;
	void *ptr;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskSIGNAL;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_VAL(task) = sig;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.signal)
		ptr = root->root_hooks.hook_add.signal(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
#endif
		TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}
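
/*
 * Example (sketch, hypothetical on_term): route SIGTERM through the
 * scheduler; TASK_VAL() carries the signal number (blocking the signal so
 * that only the event backend sees it is assumed to happen elsewhere):
 *
 *	static void *
 *	on_term(sched_task_t *task)
 *	{
 *		// ... orderly shutdown, signal number in TASK_VAL(task) ...
 *		return NULL;
 *	}
 *
 *	schedSignal(root, on_term, NULL, SIGTERM, NULL, 0);
 */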

/*
 * schedAlarm() - Add ALARM task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
 * @opt_data = Alarm timer ID
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task;
	void *ptr;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskALARM;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_TS(task) = ts;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.alarm)
		ptr = root->root_hooks.hook_add.alarm(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskALARM]);
#endif
		TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskALARM]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}
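
/*
 * Example (sketch): the timespec is relative and, per the note above, no
 * finer than 1 msec -- fire a hypothetical on_alarm roughly 250 msec from
 * now, with opt_data carrying the alarm timer ID:
 *
 *	struct timespec ts = { 0, 250 * 1000000L };
 *
 *	schedAlarm(root, on_alarm, NULL, ts, (void*) 1, 0);
 */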

#ifdef AIO_SUPPORT
/*
 * schedAIO() - Add AIO task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @acb = AIO cb structure address
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
		struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
{
	sched_task_t *task;
	void *ptr;

	if (!root || !func || !acb || !opt_dlen)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskAIO;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_VAL(task) = (u_long) acb;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.aio)
		ptr = root->root_hooks.hook_add.aio(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskAIO]);
#endif
		TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskAIO]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}

/*
 * schedAIORead() - Add AIO read task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @fd = file descriptor
 * @buffer = Buffer
 * @buflen = Buffer length
 * @offset = Offset from start of file, if =-1 from current position
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
		void *buffer, size_t buflen, off_t offset)
{
	struct aiocb *acb;
	off_t off;

	if (!root || !func || !buffer || !buflen)
		return NULL;

	if (offset == (off_t) -1) {
		off = lseek(fd, 0, SEEK_CUR);
		if (off == -1) {
			LOGERR;
			return NULL;
		}
	} else
		off = offset;

	if (!(acb = malloc(sizeof(struct aiocb)))) {
		LOGERR;
		return NULL;
	} else
		memset(acb, 0, sizeof(struct aiocb));

	acb->aio_fildes = fd;
	acb->aio_nbytes = buflen;
	acb->aio_buf = buffer;
	acb->aio_offset = off;
	acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
	acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
	acb->aio_sigevent.sigev_value.sival_ptr = acb;

	if (aio_read(acb)) {
		LOGERR;
		free(acb);
		return NULL;
	}

	return schedAIO(root, func, arg, acb, buffer, buflen);
}
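
/*
 * Example (sketch, hypothetical fd/on_aio): queue an asynchronous read; the
 * callback runs when the aiocb's kevent arrives, with the aiocb address in
 * TASK_VAL() and the buffer in TASK_DATA()/TASK_DATLEN():
 *
 *	static char buf[4096];
 *
 *	static void *
 *	on_aio(sched_task_t *task)
 *	{
 *		struct aiocb *acb = (struct aiocb*) TASK_VAL(task);
 *		ssize_t len = aio_return(acb);
 *
 *		// ... consume up to len bytes of TASK_DATA(task) ...
 *		free(acb);	// malloc()'d by schedAIORead(), unless freed elsewhere
 *		return NULL;
 *	}
 *
 *	schedAIORead(root, on_aio, NULL, fd, buf, sizeof buf, 0);
 */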

/*
 * schedAIOWrite() - Add AIO write task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @fd = file descriptor
 * @buffer = Buffer
 * @buflen = Buffer length
 * @offset = Offset from start of file, if =-1 from current position
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
		void *buffer, size_t buflen, off_t offset)
{
	struct aiocb *acb;
	off_t off;

	if (!root || !func || !buffer || !buflen)
		return NULL;

	if (offset == (off_t) -1) {
		off = lseek(fd, 0, SEEK_CUR);
		if (off == -1) {
			LOGERR;
			return NULL;
		}
	} else
		off = offset;

	if (!(acb = malloc(sizeof(struct aiocb)))) {
		LOGERR;
		return NULL;
	} else
		memset(acb, 0, sizeof(struct aiocb));

	acb->aio_fildes = fd;
	acb->aio_nbytes = buflen;
	acb->aio_buf = buffer;
	acb->aio_offset = off;
	acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
	acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
	acb->aio_sigevent.sigev_value.sival_ptr = acb;

	if (aio_write(acb)) {
		LOGERR;
		free(acb);
		return NULL;
	}

	return schedAIO(root, func, arg, acb, buffer, buflen);
}

#ifdef EVFILT_LIO
/*
 * schedLIO() - Add AIO bulk tasks to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @acbs = AIO cb structure addresses
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
		struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
{
	sched_task_t *task;
	void *ptr;

	if (!root || !func || !acbs || !opt_dlen)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskLIO;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_VAL(task) = (u_long) acbs;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.lio)
		ptr = root->root_hooks.hook_add.lio(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskLIO]);
#endif
		TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskLIO]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}

/*
 * schedLIORead() - Add list of AIO read tasks to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @fd = file descriptor
 * @bufs = Buffer's list
 * @nbufs = Number of Buffers
 * @offset = Offset from start of file, if =-1 from current position
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
		struct iovec *bufs, size_t nbufs, off_t offset)
{
	struct sigevent sig;
	struct aiocb **acb;
	off_t off;
	register int i;

	if (!root || !func || !bufs || !nbufs)
		return NULL;

	if (offset == (off_t) -1) {
		off = lseek(fd, 0, SEEK_CUR);
		if (off == -1) {
			LOGERR;
			return NULL;
		}
	} else
		off = offset;

	/* calloc() zeroes the array, so no extra memset() is needed */
	if (!(acb = calloc(nbufs, sizeof(struct aiocb*)))) {
		LOGERR;
		return NULL;
	}
	for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
		acb[i] = malloc(sizeof(struct aiocb));
		if (!acb[i]) {
			LOGERR;
			for (i = 0; i < nbufs; i++)
				if (acb[i])
					free(acb[i]);
			free(acb);
			return NULL;
		} else
			memset(acb[i], 0, sizeof(struct aiocb));
		acb[i]->aio_fildes = fd;
		acb[i]->aio_nbytes = bufs[i].iov_len;
		acb[i]->aio_buf = bufs[i].iov_base;
		acb[i]->aio_offset = off;
		acb[i]->aio_lio_opcode = LIO_READ;
	}
	memset(&sig, 0, sizeof sig);
	sig.sigev_notify = SIGEV_KEVENT;
	sig.sigev_notify_kqueue = root->root_kq;
	sig.sigev_value.sival_ptr = acb;

	if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
		LOGERR;
		for (i = 0; i < nbufs; i++)
			if (acb[i])
				free(acb[i]);
		free(acb);
		return NULL;
	}

	return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
}

/*
 * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @fd = file descriptor
 * @bufs = Buffer's list
 * @nbufs = Number of Buffers
 * @offset = Offset from start of file, if =-1 from current position
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
		struct iovec *bufs, size_t nbufs, off_t offset)
{
	struct sigevent sig;
	struct aiocb **acb;
	off_t off;
	register int i;

	if (!root || !func || !bufs || !nbufs)
		return NULL;

	if (offset == (off_t) -1) {
		off = lseek(fd, 0, SEEK_CUR);
		if (off == -1) {
			LOGERR;
			return NULL;
		}
	} else
		off = offset;

	/* calloc() zeroes the array, so no extra memset() is needed */
	if (!(acb = calloc(nbufs, sizeof(struct aiocb*)))) {
		LOGERR;
		return NULL;
	}
	for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
		acb[i] = malloc(sizeof(struct aiocb));
		if (!acb[i]) {
			LOGERR;
			for (i = 0; i < nbufs; i++)
				if (acb[i])
					free(acb[i]);
			free(acb);
			return NULL;
		} else
			memset(acb[i], 0, sizeof(struct aiocb));
		acb[i]->aio_fildes = fd;
		acb[i]->aio_nbytes = bufs[i].iov_len;
		acb[i]->aio_buf = bufs[i].iov_base;
		acb[i]->aio_offset = off;
		acb[i]->aio_lio_opcode = LIO_WRITE;
	}
	memset(&sig, 0, sizeof sig);
	sig.sigev_notify = SIGEV_KEVENT;
	sig.sigev_notify_kqueue = root->root_kq;
	sig.sigev_value.sival_ptr = acb;

	if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
		LOGERR;
		for (i = 0; i < nbufs; i++)
			if (acb[i])
				free(acb[i]);
		free(acb);
		return NULL;
	}

	return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
}
#endif	/* EVFILT_LIO */
#endif	/* AIO_SUPPORT */

/*
 * schedTimer() - Add TIMER task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @ts = timeout argument structure
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task, *tmp, *t = NULL;
	void *ptr;
	struct timespec now;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskTIMER;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	/* calculate the absolute timespec deadline */
	clock_gettime(CLOCK_MONOTONIC, &now);
	now.tv_sec += ts.tv_sec;
	now.tv_nsec += ts.tv_nsec;
	if (now.tv_nsec >= 1000000000L) {
		now.tv_sec++;
		now.tv_nsec -= 1000000000L;
	} else if (now.tv_nsec < 0) {
		now.tv_sec--;
		now.tv_nsec += 1000000000L;
	}
	TASK_TS(task) = now;

	if (root->root_hooks.hook_add.timer)
		ptr = root->root_hooks.hook_add.timer(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskTIMER]);
#endif
#ifdef TIMER_WITHOUT_SORT
		TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
#else
		/* keep root_timer sorted by deadline */
		TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
			if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
				break;
		if (!t)
			TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
		else
			TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
#endif
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}
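
/*
 * Example (sketch): schedTimer() turns the relative ts into an absolute
 * CLOCK_MONOTONIC deadline and keeps root_timer sorted by it; a periodic
 * timer is just a callback that re-adds itself:
 *
 *	static void *
 *	every_second(sched_task_t *task)
 *	{
 *		struct timespec ts = { 1, 0 };
 *
 *		// ... periodic work ...
 *		schedTimer(TASK_ROOT(task), every_second, TASK_ARG(task),
 *				ts, NULL, 0);
 *		return NULL;
 *	}
 */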

/*
 * schedEvent() - Add EVENT task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @val = additional func argument
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task;
	void *ptr;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskEVENT;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_VAL(task) = val;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.event)
		ptr = root->root_hooks.hook_add.event(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskEVENT]);
#endif
		TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}


/*
 * schedTask() - Add regular task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @prio = regular task priority, 0 is hi priority for regular tasks
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task, *tmp, *t = NULL;
	void *ptr;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskTASK;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_VAL(task) = prio;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.task)
		ptr = root->root_hooks.hook_add.task(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskTASK]);
#endif
		/* keep root_task sorted ascending by priority */
		TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
			if (TASK_VAL(task) < TASK_VAL(t))
				break;
		if (!t)
			TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
		else
			TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskTASK]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}
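
/*
 * Example (sketch): root_task stays sorted ascending by TASK_VAL(), so the
 * prio 0 task below runs before the prio 10 one even though it was added
 * later (housekeeping/urgent_job are hypothetical callbacks):
 *
 *	schedTask(root, housekeeping, NULL, 10, NULL, 0);
 *	schedTask(root, urgent_job, NULL, 0, NULL, 0);
 */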

/*
 * schedSuspend() - Add Suspended task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @id = Trigger ID
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task;
	void *ptr;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskSUSPEND;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_VAL(task) = id;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.suspend)
		ptr = root->root_hooks.hook_add.suspend(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
#endif
		TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}

/*
 * schedCallOnce() - Call once from scheduler
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @val = additional func argument
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: return value from called func
 */
sched_task_t *
schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task;
	void *ret;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskEVENT;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_VAL(task) = val;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	ret = schedCall(task);

	sched_unuseTask(task);
	return ret;
}

/*
 * schedThread() - Add thread task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @detach = Detach thread from scheduler, if !=0
 * @ss = stack size
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach,
		size_t ss, void *opt_data, size_t opt_dlen)
{
#ifndef HAVE_LIBPTHREAD
	sched_SetErr(ENOTSUP, "Thread tasks are not supported");
	return NULL;
#else	/* wrap the pthread-only body so it is not compiled without libpthread */
	sched_task_t *task;
	void *ptr;
	pthread_attr_t attr;
	sem_t *s = NULL;

	if (!root || !func)
		return NULL;
	else {
		/* normalizing stack size & detach state */
		if (ss)
			ss &= 0x7FFFFFFF;
		detach = detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE;
	}

	if (!(s = (sem_t*) malloc(sizeof(sem_t)))) {
		LOGERR;
		return NULL;
	}
	/* start at 0 so the sem_wait() below blocks until the thread wrapper posts */
	if (sem_init(s, 0, 0)) {
		LOGERR;
		free(s);
		return NULL;
	}

	/* get new task */
	if (!(task = sched_useTask(root))) {
		sem_destroy(s);
		free(s);

		return NULL;
	}

	task->task_func = func;
	TASK_TYPE(task) = taskTHREAD;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_FLAG(task) = detach;
	TASK_RET(task) = (intptr_t) s;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, detach);
	if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
		LOGERR;
		pthread_attr_destroy(&attr);
		sem_destroy(s);
		free(s);
		return sched_unuseTask(task);
	}
	if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
		LOGERR;
		pthread_attr_destroy(&attr);
		sem_destroy(s);
		free(s);
		return sched_unuseTask(task);
	} else
		/* keep the effective stack size in the upper bits of the flags */
		TASK_FLAG(task) |= (ss << 1);
	if ((errno = pthread_attr_setguardsize(&attr, ss))) {
		LOGERR;
		pthread_attr_destroy(&attr);
		sem_destroy(s);
		free(s);
		return sched_unuseTask(task);
	}
#ifdef SCHED_RR
	pthread_attr_setschedpolicy(&attr, SCHED_RR);
#else
	pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
#endif
	if (root->root_hooks.hook_add.thread)
		ptr = root->root_hooks.hook_add.thread(task, &attr);
	else
		ptr = NULL;
	pthread_attr_destroy(&attr);

	if (!ptr) {
		pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
		TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
		pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);

		/* wait for init thread actions */
		sem_wait(s);
	} else
		task = sched_unuseTask(task);

	sem_destroy(s);
	free(s);
	return task;
#endif	/* HAVE_LIBPTHREAD */
}
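
/*
 * Example (sketch, hypothetical blocking_job): run a blocking job in a
 * detached thread; schedThread() returns only after _sched_threadWrapper()
 * has sem_post()'d, i.e. once the thread actually exists, and the task
 * itself must leave via sched_taskExit():
 *
 *	static void *
 *	blocking_job(sched_task_t *task)
 *	{
 *		// ... long-running work off the event loop ...
 *		return sched_taskExit(task, 0);
 *	}
 *
 *	schedThread(root, blocking_job, NULL, 42, 0, NULL, 0);
 */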

/*
 * schedRTC() - Add RTC task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
 * @opt_data = Optional RTC ID
 * @opt_dlen = Optional Signal No.
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
		void *opt_data, size_t opt_dlen)
{
#if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
	sched_task_t *task;
	void *ptr;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskRTC;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_TS(task) = ts;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.rtc)
		ptr = root->root_hooks.hook_add.rtc(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskRTC]);
#endif
		TAILQ_INSERT_TAIL(&root->root_rtc, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskRTC]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
#else
	sched_SetErr(ENOTSUP, "Realtime clock extensions are not supported");
	return NULL;
#endif
}