1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: tasks.c,v 1.27 2019/01/14 15:58:50 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004 - 2018
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
55: sched_task_t *
56: sched_useTask(sched_root_task_t * __restrict root)
57: {
58: sched_task_t *task, *tmp;
59:
60: SCHED_QLOCK(root, taskUNUSE);
61: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
62: if (!TASK_ISLOCKED(task)) {
63: TAILQ_REMOVE(&root->root_unuse, task, task_node);
64: break;
65: }
66: }
67: SCHED_QUNLOCK(root, taskUNUSE);
68:
69: if (!task) {
70: task = malloc(sizeof(sched_task_t));
71: if (!task) {
72: LOGERR;
73: return NULL;
74: }
75: }
76:
77: memset(task, 0, sizeof(sched_task_t));
78: task->task_id = (uintptr_t) task;
79: return task;
80: }
81:
82: /*
83: * sched_unuseTask() - Unlock and put task to unuse queue
84: *
85: * @task = task
86: * return: always is NULL
87: */
88: sched_task_t *
89: sched_unuseTask(sched_task_t * __restrict task)
90: {
91: TASK_UNLOCK(task);
92:
93: TASK_TYPE(task) = taskUNUSE;
94: insert_task_to(task, &(TASK_ROOT(task))->root_unuse);
95:
96: task = NULL;
97: return task;
98: }
99:
100: /*
101: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
102: *
103: * @task = current task
104: * @retcode = return code
105: * return: return code
106: */
107: void *
108: sched_taskExit(sched_task_t *task, intptr_t retcode)
109: {
110: if (!task || !TASK_ROOT(task))
111: return (void*) -1;
112:
113: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
114: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
115:
116: TASK_ROOT(task)->root_ret = (void*) retcode;
117: return (void*) retcode;
118: }
119:
120:
121: /*
122: * schedRead() - Add READ I/O task to scheduler queue
123: *
124: * @root = root task
125: * @func = task execution function
126: * @arg = 1st func argument
127: * @fd = fd handle
128: * @opt_data = Optional data
129: * @opt_dlen = Optional data length
130: * return: NULL error or !=NULL new queued task
131: */
132: sched_task_t *
133: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
134: void *opt_data, size_t opt_dlen)
135: {
136: sched_task_t *task;
137: void *ptr;
138:
139: if (!root || !func)
140: return NULL;
141:
142: /* get new task */
143: if (!(task = sched_useTask(root)))
144: return NULL;
145:
146: TASK_FUNC(task) = func;
147: TASK_TYPE(task) = taskREAD;
148: TASK_ROOT(task) = root;
149:
150: TASK_ARG(task) = arg;
151: TASK_FD(task) = fd;
152:
153: TASK_DATA(task) = opt_data;
154: TASK_DATLEN(task) = opt_dlen;
155:
156: if (root->root_hooks.hook_add.read)
157: ptr = root->root_hooks.hook_add.read(task, NULL);
158: else
159: ptr = NULL;
160:
161: if (!ptr)
162: insert_task_to(task, &root->root_read);
163: else
164: task = sched_unuseTask(task);
165:
166: return task;
167: }
168:
169: /*
170: * schedWrite() - Add WRITE I/O task to scheduler queue
171: *
172: * @root = root task
173: * @func = task execution function
174: * @arg = 1st func argument
175: * @fd = fd handle
176: * @opt_data = Optional data
177: * @opt_dlen = Optional data length
178: * return: NULL error or !=NULL new queued task
179: */
180: sched_task_t *
181: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
182: void *opt_data, size_t opt_dlen)
183: {
184: sched_task_t *task;
185: void *ptr;
186:
187: if (!root || !func)
188: return NULL;
189:
190: /* get new task */
191: if (!(task = sched_useTask(root)))
192: return NULL;
193:
194: TASK_FUNC(task) = func;
195: TASK_TYPE(task) = taskWRITE;
196: TASK_ROOT(task) = root;
197:
198: TASK_ARG(task) = arg;
199: TASK_FD(task) = fd;
200:
201: TASK_DATA(task) = opt_data;
202: TASK_DATLEN(task) = opt_dlen;
203:
204: if (root->root_hooks.hook_add.write)
205: ptr = root->root_hooks.hook_add.write(task, NULL);
206: else
207: ptr = NULL;
208:
209: if (!ptr)
210: insert_task_to(task, &root->root_write);
211: else
212: task = sched_unuseTask(task);
213:
214: return task;
215: }
216:
217: /*
218: * schedNode() - Add NODE task to scheduler queue
219: *
220: * @root = root task
221: * @func = task execution function
222: * @arg = 1st func argument
223: * @fd = fd handle
224: * @opt_data = Optional data
225: * @opt_dlen = Optional data length
226: * return: NULL error or !=NULL new queued task
227: */
228: sched_task_t *
229: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
230: void *opt_data, size_t opt_dlen)
231: {
232: #if SUP_ENABLE != KQ_SUPPORT
233: sched_SetErr(ENOTSUP, "disabled kqueue support");
234: return NULL;
235: #else
236: sched_task_t *task;
237: void *ptr;
238:
239: if (!root || !func)
240: return NULL;
241:
242: /* get new task */
243: if (!(task = sched_useTask(root)))
244: return NULL;
245:
246: TASK_FUNC(task) = func;
247: TASK_TYPE(task) = taskNODE;
248: TASK_ROOT(task) = root;
249:
250: TASK_ARG(task) = arg;
251: TASK_FD(task) = fd;
252:
253: TASK_DATA(task) = opt_data;
254: TASK_DATLEN(task) = opt_dlen;
255:
256: if (root->root_hooks.hook_add.node)
257: ptr = root->root_hooks.hook_add.node(task, NULL);
258: else
259: ptr = NULL;
260:
261: if (!ptr)
262: insert_task_to(task, &root->root_node);
263: else
264: task = sched_unuseTask(task);
265:
266: return task;
267: #endif /* KQ_SUPPORT */
268: }
269:
270: /*
271: * schedNode2() - Add NODE task with all events to scheduler queue
272: *
273: * @root = root task
274: * @func = task execution function
275: * @arg = 1st func argument
276: * @fd = fd handle
277: * @opt_data = Optional data
278: * @opt_dlen = Optional data length
279: * return: NULL error or !=NULL new queued task
280: */
281: sched_task_t *
282: schedNode2(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
283: void *opt_data, size_t opt_dlen)
284: {
285: #if SUP_ENABLE != KQ_SUPPORT
286: sched_SetErr(ENOTSUP, "disabled kqueue support");
287: return NULL;
288: #else
289: sched_task_t *task;
290: void *ptr;
291:
292: if (!root || !func)
293: return NULL;
294:
295: /* get new task */
296: if (!(task = sched_useTask(root)))
297: return NULL;
298:
299: TASK_FUNC(task) = func;
300: TASK_TYPE(task) = taskNODE;
301: TASK_ROOT(task) = root;
302:
303: TASK_ARG(task) = arg;
304: TASK_FD(task) = fd;
305:
306: TASK_DATA(task) = opt_data;
307: TASK_DATLEN(task) = opt_dlen;
308:
309: if (root->root_hooks.hook_add.node)
310: ptr = root->root_hooks.hook_add.node(task,
311: (void*) (NOTE_READ | NOTE_CLOSE_WRITE | NOTE_CLOSE | NOTE_OPEN));
312: else
313: ptr = NULL;
314:
315: if (!ptr)
316: insert_task_to(task, &root->root_node);
317: else
318: task = sched_unuseTask(task);
319:
320: return task;
321: #endif /* KQ_SUPPORT */
322: }
323:
324: /*
325: * schedProc() - Add PROC task to scheduler queue
326: *
327: * @root = root task
328: * @func = task execution function
329: * @arg = 1st func argument
330: * @pid = PID
331: * @opt_data = Optional data
332: * @opt_dlen = Optional data length
333: * return: NULL error or !=NULL new queued task
334: */
335: sched_task_t *
336: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
337: void *opt_data, size_t opt_dlen)
338: {
339: #if SUP_ENABLE != KQ_SUPPORT
340: sched_SetErr(ENOTSUP, "disabled kqueue support");
341: return NULL;
342: #else
343: sched_task_t *task;
344: void *ptr;
345:
346: if (!root || !func)
347: return NULL;
348:
349: /* get new task */
350: if (!(task = sched_useTask(root)))
351: return NULL;
352:
353: TASK_FUNC(task) = func;
354: TASK_TYPE(task) = taskPROC;
355: TASK_ROOT(task) = root;
356:
357: TASK_ARG(task) = arg;
358: TASK_VAL(task) = pid;
359:
360: TASK_DATA(task) = opt_data;
361: TASK_DATLEN(task) = opt_dlen;
362:
363: if (root->root_hooks.hook_add.proc)
364: ptr = root->root_hooks.hook_add.proc(task, NULL);
365: else
366: ptr = NULL;
367:
368: if (!ptr)
369: insert_task_to(task, &root->root_proc);
370: else
371: task = sched_unuseTask(task);
372:
373: return task;
374: #endif /* KQ_SUPPORT */
375: }
376:
377: /*
378: * schedUser() - Add trigger USER task to scheduler queue
379: *
380: * @root = root task
381: * @func = task execution function
382: * @arg = 1st func argument
383: * @id = Trigger ID
384: * @opt_data = Optional data
385: * @opt_dlen = Optional user's trigger flags
386: * return: NULL error or !=NULL new queued task
387: */
388: sched_task_t *
389: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
390: void *opt_data, size_t opt_dlen)
391: {
392: #if SUP_ENABLE != KQ_SUPPORT
393: sched_SetErr(ENOTSUP, "disabled kqueue support");
394: return NULL;
395: #else
396: #ifndef EVFILT_USER
397: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
398: return NULL;
399: #else
400: sched_task_t *task;
401: void *ptr;
402:
403: if (!root || !func)
404: return NULL;
405:
406: /* get new task */
407: if (!(task = sched_useTask(root)))
408: return NULL;
409:
410: TASK_FUNC(task) = func;
411: TASK_TYPE(task) = taskUSER;
412: TASK_ROOT(task) = root;
413:
414: TASK_ARG(task) = arg;
415: TASK_VAL(task) = id;
416:
417: TASK_DATA(task) = opt_data;
418: TASK_DATLEN(task) = opt_dlen;
419:
420: if (root->root_hooks.hook_add.user)
421: ptr = root->root_hooks.hook_add.user(task, NULL);
422: else
423: ptr = NULL;
424:
425: if (!ptr)
426: insert_task_to(task, &root->root_user);
427: else
428: task = sched_unuseTask(task);
429:
430: return task;
431: #endif /* EVFILT_USER */
432: #endif /* KQ_SUPPORT */
433: }
434:
435: /*
436: * schedSignal() - Add SIGNAL task to scheduler queue
437: *
438: * @root = root task
439: * @func = task execution function
440: * @arg = 1st func argument
441: * @sig = Signal
442: * @opt_data = Optional data
443: * @opt_dlen = Optional data length
444: * return: NULL error or !=NULL new queued task
445: */
446: sched_task_t *
447: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
448: void *opt_data, size_t opt_dlen)
449: {
450: #if SUP_ENABLE != KQ_SUPPORT
451: sched_SetErr(ENOTSUP, "disabled kqueue support");
452: return NULL;
453: #else
454: sched_task_t *task;
455: void *ptr;
456:
457: if (!root || !func)
458: return NULL;
459:
460: /* get new task */
461: if (!(task = sched_useTask(root)))
462: return NULL;
463:
464: TASK_FUNC(task) = func;
465: TASK_TYPE(task) = taskSIGNAL;
466: TASK_ROOT(task) = root;
467:
468: TASK_ARG(task) = arg;
469: TASK_VAL(task) = sig;
470:
471: TASK_DATA(task) = opt_data;
472: TASK_DATLEN(task) = opt_dlen;
473:
474: if (root->root_hooks.hook_add.signal)
475: ptr = root->root_hooks.hook_add.signal(task, NULL);
476: else
477: ptr = NULL;
478:
479: if (!ptr)
480: insert_task_to(task, &root->root_signal);
481: else
482: task = sched_unuseTask(task);
483:
484: return task;
485: #endif /* KQ_SUPPORT */
486: }
487:
488: /*
489: * schedAlarm() - Add ALARM task to scheduler queue
490: *
491: * @root = root task
492: * @func = task execution function
493: * @arg = 1st func argument
494: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
495: * @opt_data = Alarm timer ID
496: * @opt_dlen = Optional data length
497: * return: NULL error or !=NULL new queued task
498: */
499: sched_task_t *
500: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
501: void *opt_data, size_t opt_dlen)
502: {
503: #if SUP_ENABLE != KQ_SUPPORT
504: sched_SetErr(ENOTSUP, "disabled kqueue support");
505: return NULL;
506: #else
507: sched_task_t *task;
508: void *ptr;
509:
510: if (!root || !func)
511: return NULL;
512:
513: /* get new task */
514: if (!(task = sched_useTask(root)))
515: return NULL;
516:
517: TASK_FUNC(task) = func;
518: TASK_TYPE(task) = taskALARM;
519: TASK_ROOT(task) = root;
520:
521: TASK_ARG(task) = arg;
522: TASK_TS(task) = ts;
523:
524: TASK_DATA(task) = opt_data;
525: TASK_DATLEN(task) = opt_dlen;
526:
527: if (root->root_hooks.hook_add.alarm)
528: ptr = root->root_hooks.hook_add.alarm(task, NULL);
529: else
530: ptr = NULL;
531:
532: if (!ptr)
533: insert_task_to(task, &root->root_alarm);
534: else
535: task = sched_unuseTask(task);
536:
537: return task;
538: #endif /* KQ_SUPPORT */
539: }
540:
541: #ifdef AIO_SUPPORT
542: /*
543: * schedAIO() - Add AIO task to scheduler queue
544: *
545: * @root = root task
546: * @func = task execution function
547: * @arg = 1st func argument
548: * @acb = AIO cb structure address
549: * @opt_data = Optional data
550: * @opt_dlen = Optional data length
551: * return: NULL error or !=NULL new queued task
552: */
553: sched_task_t *
554: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
555: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
556: {
557: #if SUP_ENABLE != KQ_SUPPORT
558: sched_SetErr(ENOTSUP, "disabled kqueue support");
559: return NULL;
560: #else
561: sched_task_t *task;
562: void *ptr;
563:
564: if (!root || !func || !acb || !opt_dlen)
565: return NULL;
566:
567: /* get new task */
568: if (!(task = sched_useTask(root)))
569: return NULL;
570:
571: TASK_FUNC(task) = func;
572: TASK_TYPE(task) = taskAIO;
573: TASK_ROOT(task) = root;
574:
575: TASK_ARG(task) = arg;
576: TASK_VAL(task) = (u_long) acb;
577:
578: TASK_DATA(task) = opt_data;
579: TASK_DATLEN(task) = opt_dlen;
580:
581: if (root->root_hooks.hook_add.aio)
582: ptr = root->root_hooks.hook_add.aio(task, NULL);
583: else
584: ptr = NULL;
585:
586: if (!ptr)
587: insert_task_to(task, &root->root_aio);
588: else
589: task = sched_unuseTask(task);
590:
591: return task;
592: #endif /* KQ_SUPPORT */
593: }
594:
595: /*
596: * schedAIORead() - Add AIO read task to scheduler queue
597: *
598: * @root = root task
599: * @func = task execution function
600: * @arg = 1st func argument
601: * @fd = file descriptor
602: * @buffer = Buffer
603: * @buflen = Buffer length
604: * @offset = Offset from start of file, if =-1 from current position
605: * return: NULL error or !=NULL new queued task
606: */
607: sched_task_t *
608: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
609: void *buffer, size_t buflen, off_t offset)
610: {
611: #if SUP_ENABLE != KQ_SUPPORT
612: sched_SetErr(ENOTSUP, "disabled kqueue support");
613: return NULL;
614: #else
615: struct aiocb *acb;
616: off_t off;
617:
618: if (!root || !func || !buffer || !buflen)
619: return NULL;
620:
621: if (offset == (off_t) -1) {
622: off = lseek(fd, 0, SEEK_CUR);
623: if (off == -1) {
624: LOGERR;
625: return NULL;
626: }
627: } else
628: off = offset;
629:
630: if (!(acb = malloc(sizeof(struct aiocb)))) {
631: LOGERR;
632: return NULL;
633: } else
634: memset(acb, 0, sizeof(struct aiocb));
635:
636: acb->aio_fildes = fd;
637: acb->aio_nbytes = buflen;
638: acb->aio_buf = buffer;
639: acb->aio_offset = off;
640: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
641: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
642: acb->aio_sigevent.sigev_value.sival_ptr = acb;
643:
644: if (aio_read(acb)) {
645: LOGERR;
646: free(acb);
647: return NULL;
648: }
649:
650: return schedAIO(root, func, arg, acb, buffer, buflen);
651: #endif /* KQ_SUPPORT */
652: }
653:
654: /*
655: * schedAIOWrite() - Add AIO write task to scheduler queue
656: *
657: * @root = root task
658: * @func = task execution function
659: * @arg = 1st func argument
660: * @fd = file descriptor
661: * @buffer = Buffer
662: * @buflen = Buffer length
663: * @offset = Offset from start of file, if =-1 from current position
664: * return: NULL error or !=NULL new queued task
665: */
666: sched_task_t *
667: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
668: void *buffer, size_t buflen, off_t offset)
669: {
670: #if SUP_ENABLE != KQ_SUPPORT
671: sched_SetErr(ENOTSUP, "disabled kqueue support");
672: return NULL;
673: #else
674: struct aiocb *acb;
675: off_t off;
676:
677: if (!root || !func || !buffer || !buflen)
678: return NULL;
679:
680: if (offset == (off_t) -1) {
681: off = lseek(fd, 0, SEEK_CUR);
682: if (off == -1) {
683: LOGERR;
684: return NULL;
685: }
686: } else
687: off = offset;
688:
689: if (!(acb = malloc(sizeof(struct aiocb)))) {
690: LOGERR;
691: return NULL;
692: } else
693: memset(acb, 0, sizeof(struct aiocb));
694:
695: acb->aio_fildes = fd;
696: acb->aio_nbytes = buflen;
697: acb->aio_buf = buffer;
698: acb->aio_offset = off;
699: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
700: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
701: acb->aio_sigevent.sigev_value.sival_ptr = acb;
702:
703: if (aio_write(acb)) {
704: LOGERR;
705: free(acb);
706: return NULL;
707: }
708:
709: return schedAIO(root, func, arg, acb, buffer, buflen);
710: #endif /* KQ_SUPPORT */
711: }
712:
713: #ifdef EVFILT_LIO
714: /*
715: * schedLIO() - Add AIO bulk tasks to scheduler queue
716: *
717: * @root = root task
718: * @func = task execution function
719: * @arg = 1st func argument
720: * @acbs = AIO cb structure addresses
721: * @opt_data = Optional data
722: * @opt_dlen = Optional data length
723: * return: NULL error or !=NULL new queued task
724: */
725: sched_task_t *
726: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
727: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
728: {
729: #if SUP_ENABLE != KQ_SUPPORT
730: sched_SetErr(ENOTSUP, "disabled kqueue support");
731: return NULL;
732: #else
733: sched_task_t *task;
734: void *ptr;
735:
736: if (!root || !func || !acbs || !opt_dlen)
737: return NULL;
738:
739: /* get new task */
740: if (!(task = sched_useTask(root)))
741: return NULL;
742:
743: TASK_FUNC(task) = func;
744: TASK_TYPE(task) = taskLIO;
745: TASK_ROOT(task) = root;
746:
747: TASK_ARG(task) = arg;
748: TASK_VAL(task) = (u_long) acbs;
749:
750: TASK_DATA(task) = opt_data;
751: TASK_DATLEN(task) = opt_dlen;
752:
753: if (root->root_hooks.hook_add.lio)
754: ptr = root->root_hooks.hook_add.lio(task, NULL);
755: else
756: ptr = NULL;
757:
758: if (!ptr)
759: insert_task_to(task, &root->root_lio);
760: else
761: task = sched_unuseTask(task);
762:
763: return task;
764: #endif /* KQ_SUPPORT */
765: }
766:
767: /*
768: * schedLIORead() - Add list of AIO read tasks to scheduler queue
769: *
770: * @root = root task
771: * @func = task execution function
772: * @arg = 1st func argument
773: * @fd = file descriptor
774: * @bufs = Buffer's list
775: * @nbufs = Number of Buffers
776: * @offset = Offset from start of file, if =-1 from current position
777: * return: NULL error or !=NULL new queued task
778: */
779: sched_task_t *
780: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
781: struct iovec *bufs, size_t nbufs, off_t offset)
782: {
783: #if SUP_ENABLE != KQ_SUPPORT
784: sched_SetErr(ENOTSUP, "disabled kqueue support");
785: return NULL;
786: #else
787: struct sigevent sig;
788: struct aiocb **acb;
789: off_t off;
790: register int i;
791:
792: if (!root || !func || !bufs || !nbufs)
793: return NULL;
794:
795: if (offset == (off_t) -1) {
796: off = lseek(fd, 0, SEEK_CUR);
797: if (off == -1) {
798: LOGERR;
799: return NULL;
800: }
801: } else
802: off = offset;
803:
804: if (!(acb = calloc(sizeof(void*), nbufs))) {
805: LOGERR;
806: return NULL;
807: } else
808: memset(acb, 0, sizeof(void*) * nbufs);
809: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
810: acb[i] = malloc(sizeof(struct aiocb));
811: if (!acb[i]) {
812: LOGERR;
813: for (i = 0; i < nbufs; i++)
814: if (acb[i])
815: free(acb[i]);
816: free(acb);
817: return NULL;
818: } else
819: memset(acb[i], 0, sizeof(struct aiocb));
820: acb[i]->aio_fildes = fd;
821: acb[i]->aio_nbytes = bufs[i].iov_len;
822: acb[i]->aio_buf = bufs[i].iov_base;
823: acb[i]->aio_offset = off;
824: acb[i]->aio_lio_opcode = LIO_READ;
825: }
826: memset(&sig, 0, sizeof sig);
827: sig.sigev_notify = SIGEV_KEVENT;
828: sig.sigev_notify_kqueue = root->root_kq;
829: sig.sigev_value.sival_ptr = acb;
830:
831: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
832: LOGERR;
833: for (i = 0; i < nbufs; i++)
834: if (acb[i])
835: free(acb[i]);
836: free(acb);
837: return NULL;
838: }
839:
840: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
841: #endif /* KQ_SUPPORT */
842: }
843:
844: /*
845: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
846: *
847: * @root = root task
848: * @func = task execution function
849: * @arg = 1st func argument
850: * @fd = file descriptor
851: * @bufs = Buffer's list
852: * @nbufs = Number of Buffers
853: * @offset = Offset from start of file, if =-1 from current position
854: * return: NULL error or !=NULL new queued task
855: */
856: sched_task_t *
857: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
858: struct iovec *bufs, size_t nbufs, off_t offset)
859: {
860: #if SUP_ENABLE != KQ_SUPPORT
861: sched_SetErr(ENOTSUP, "disabled kqueue support");
862: return NULL;
863: #else
864: struct sigevent sig;
865: struct aiocb **acb;
866: off_t off;
867: register int i;
868:
869: if (!root || !func || !bufs || !nbufs)
870: return NULL;
871:
872: if (offset == (off_t) -1) {
873: off = lseek(fd, 0, SEEK_CUR);
874: if (off == -1) {
875: LOGERR;
876: return NULL;
877: }
878: } else
879: off = offset;
880:
881: if (!(acb = calloc(sizeof(void*), nbufs))) {
882: LOGERR;
883: return NULL;
884: } else
885: memset(acb, 0, sizeof(void*) * nbufs);
886: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
887: acb[i] = malloc(sizeof(struct aiocb));
888: if (!acb[i]) {
889: LOGERR;
890: for (i = 0; i < nbufs; i++)
891: if (acb[i])
892: free(acb[i]);
893: free(acb);
894: return NULL;
895: } else
896: memset(acb[i], 0, sizeof(struct aiocb));
897: acb[i]->aio_fildes = fd;
898: acb[i]->aio_nbytes = bufs[i].iov_len;
899: acb[i]->aio_buf = bufs[i].iov_base;
900: acb[i]->aio_offset = off;
901: acb[i]->aio_lio_opcode = LIO_WRITE;
902: }
903: memset(&sig, 0, sizeof sig);
904: sig.sigev_notify = SIGEV_KEVENT;
905: sig.sigev_notify_kqueue = root->root_kq;
906: sig.sigev_value.sival_ptr = acb;
907:
908: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
909: LOGERR;
910: for (i = 0; i < nbufs; i++)
911: if (acb[i])
912: free(acb[i]);
913: free(acb);
914: return NULL;
915: }
916:
917: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
918: #endif /* KQ_SUPPORT */
919: }
920: #endif /* EVFILT_LIO */
921: #endif /* AIO_SUPPORT */
922:
923: /*
924: * schedTimer() - Add TIMER task to scheduler queue
925: *
926: * @root = root task
927: * @func = task execution function
928: * @arg = 1st func argument
929: * @ts = timeout argument structure
930: * @opt_data = Optional data
931: * @opt_dlen = Optional data length
932: * return: NULL error or !=NULL new queued task
933: */
934: sched_task_t *
935: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
936: void *opt_data, size_t opt_dlen)
937: {
938: sched_task_t *task, *tmp, *t = NULL;
939: void *ptr;
940: struct timespec now;
941:
942: if (!root || !func)
943: return NULL;
944:
945: /* get new task */
946: if (!(task = sched_useTask(root)))
947: return NULL;
948:
949: TASK_FUNC(task) = func;
950: TASK_TYPE(task) = taskTIMER;
951: TASK_ROOT(task) = root;
952:
953: TASK_ARG(task) = arg;
954:
955: TASK_DATA(task) = opt_data;
956: TASK_DATLEN(task) = opt_dlen;
957:
958: /* calculate timeval structure */
959: clock_gettime(CLOCK_MONOTONIC, &now);
960: now.tv_sec += ts.tv_sec;
961: now.tv_nsec += ts.tv_nsec;
962: if (now.tv_nsec >= 1000000000L) {
963: now.tv_sec++;
964: now.tv_nsec -= 1000000000L;
965: } else if (now.tv_nsec < 0) {
966: now.tv_sec--;
967: now.tv_nsec += 1000000000L;
968: }
969: TASK_TS(task) = now;
970:
971: if (root->root_hooks.hook_add.timer)
972: ptr = root->root_hooks.hook_add.timer(task, NULL);
973: else
974: ptr = NULL;
975:
976: if (!ptr) {
977: SCHED_QLOCK(root, taskTIMER);
978: #ifdef TIMER_WITHOUT_SORT
979: TAILQ_INSERT_TAIL(&root->root_timer, task, task_node);
980: #else
981: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
982: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
983: break;
984: if (!t)
985: TAILQ_INSERT_TAIL(&root->root_timer, task, task_node);
986: else
987: TAILQ_INSERT_BEFORE(t, task, task_node);
988: #endif
989: SCHED_QUNLOCK(root, taskTIMER);
990: } else
991: task = sched_unuseTask(task);
992:
993: return task;
994: }
995:
996: /*
997: * schedEvent() - Add EVENT task to scheduler queue
998: *
999: * @root = root task
1000: * @func = task execution function
1001: * @arg = 1st func argument
1002: * @val = additional func argument
1003: * @opt_data = Optional data
1004: * @opt_dlen = Optional data length
1005: * return: NULL error or !=NULL new queued task
1006: */
1007: sched_task_t *
1008: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1009: void *opt_data, size_t opt_dlen)
1010: {
1011: sched_task_t *task;
1012: void *ptr;
1013:
1014: if (!root || !func)
1015: return NULL;
1016:
1017: /* get new task */
1018: if (!(task = sched_useTask(root)))
1019: return NULL;
1020:
1021: TASK_FUNC(task) = func;
1022: TASK_TYPE(task) = taskEVENT;
1023: TASK_ROOT(task) = root;
1024:
1025: TASK_ARG(task) = arg;
1026: TASK_VAL(task) = val;
1027:
1028: TASK_DATA(task) = opt_data;
1029: TASK_DATLEN(task) = opt_dlen;
1030:
1031: if (root->root_hooks.hook_add.event)
1032: ptr = root->root_hooks.hook_add.event(task, NULL);
1033: else
1034: ptr = NULL;
1035:
1036: if (!ptr)
1037: insert_task_to(task, &root->root_event);
1038: else
1039: task = sched_unuseTask(task);
1040:
1041: return task;
1042: }
1043:
1044:
1045: /*
1046: * schedTask() - Add regular task to scheduler queue
1047: *
1048: * @root = root task
1049: * @func = task execution function
1050: * @arg = 1st func argument
1051: * @prio = regular task priority, 0 is hi priority for regular tasks
1052: * @opt_data = Optional data
1053: * @opt_dlen = Optional data length
1054: * return: NULL error or !=NULL new queued task
1055: */
1056: sched_task_t *
1057: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1058: void *opt_data, size_t opt_dlen)
1059: {
1060: sched_task_t *task, *tmp, *t = NULL;
1061: void *ptr;
1062:
1063: if (!root || !func)
1064: return NULL;
1065:
1066: /* get new task */
1067: if (!(task = sched_useTask(root)))
1068: return NULL;
1069:
1070: TASK_FUNC(task) = func;
1071: TASK_TYPE(task) = taskTASK;
1072: TASK_ROOT(task) = root;
1073:
1074: TASK_ARG(task) = arg;
1075: TASK_VAL(task) = prio;
1076:
1077: TASK_DATA(task) = opt_data;
1078: TASK_DATLEN(task) = opt_dlen;
1079:
1080: if (root->root_hooks.hook_add.task)
1081: ptr = root->root_hooks.hook_add.task(task, NULL);
1082: else
1083: ptr = NULL;
1084:
1085: if (!ptr) {
1086: SCHED_QLOCK(root, taskTASK);
1087: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1088: if (TASK_VAL(task) < TASK_VAL(t))
1089: break;
1090: if (!t)
1091: TAILQ_INSERT_TAIL(&root->root_task, task, task_node);
1092: else
1093: TAILQ_INSERT_BEFORE(t, task, task_node);
1094: SCHED_QUNLOCK(root, taskTASK);
1095: } else
1096: task = sched_unuseTask(task);
1097:
1098: return task;
1099: }
1100:
1101: /*
1102: * schedSuspend() - Add Suspended task to scheduler queue
1103: *
1104: * @root = root task
1105: * @func = task execution function
1106: * @arg = 1st func argument
1107: * @id = Trigger ID
1108: * @opt_data = Optional data
1109: * @opt_dlen = Optional data length
1110: * return: NULL error or !=NULL new queued task
1111: */
1112: sched_task_t *
1113: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1114: void *opt_data, size_t opt_dlen)
1115: {
1116: sched_task_t *task;
1117: void *ptr;
1118:
1119: if (!root || !func)
1120: return NULL;
1121:
1122: /* get new task */
1123: if (!(task = sched_useTask(root)))
1124: return NULL;
1125:
1126: TASK_FUNC(task) = func;
1127: TASK_TYPE(task) = taskSUSPEND;
1128: TASK_ROOT(task) = root;
1129:
1130: TASK_ARG(task) = arg;
1131: TASK_VAL(task) = id;
1132:
1133: TASK_DATA(task) = opt_data;
1134: TASK_DATLEN(task) = opt_dlen;
1135:
1136: if (root->root_hooks.hook_add.suspend)
1137: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1138: else
1139: ptr = NULL;
1140:
1141: if (!ptr)
1142: insert_task_to(task, &root->root_suspend);
1143: else
1144: task = sched_unuseTask(task);
1145:
1146: return task;
1147: }
1148:
1149: /*
1150: * schedCallOnce() - Call once from scheduler
1151: *
1152: * @root = root task
1153: * @func = task execution function
1154: * @arg = 1st func argument
1155: * @val = additional func argument
1156: * @opt_data = Optional data
1157: * @opt_dlen = Optional data length
1158: * return: return value from called func
1159: */
1160: sched_task_t *
1161: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1162: void *opt_data, size_t opt_dlen)
1163: {
1164: sched_task_t *task;
1165: void *ret;
1166:
1167: if (!root || !func)
1168: return NULL;
1169:
1170: /* get new task */
1171: if (!(task = sched_useTask(root)))
1172: return NULL;
1173:
1174: TASK_FUNC(task) = func;
1175: TASK_TYPE(task) = taskEVENT;
1176: TASK_ROOT(task) = root;
1177:
1178: TASK_ARG(task) = arg;
1179: TASK_VAL(task) = val;
1180:
1181: TASK_DATA(task) = opt_data;
1182: TASK_DATLEN(task) = opt_dlen;
1183:
1184: ret = schedCall(task);
1185:
1186: sched_unuseTask(task);
1187: return ret;
1188: }
1189:
1190: /*
1191: * schedThread() - Add thread task to scheduler queue
1192: *
1193: * @root = root task
1194: * @func = task execution function
1195: * @arg = 1st func argument
1196: * @ss = stack size
1197: * @opt_data = Optional data
1198: * @opt_dlen = Optional data length
1199: * return: NULL error or !=NULL new queued task
1200: */
1201: sched_task_t *
1202: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
1203: size_t ss, void *opt_data, size_t opt_dlen)
1204: {
1205: #ifndef HAVE_LIBPTHREAD
1206: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1207: return NULL;
1208: #endif
1209: sched_task_t *task;
1210: pthread_attr_t attr;
1211: void *ptr;
1212:
1213: if (!root || !func)
1214: return NULL;
1215:
1216: /* get new task */
1217: if (!(task = sched_useTask(root)))
1218: return NULL;
1219:
1220: TASK_FUNC(task) = func;
1221: TASK_TYPE(task) = taskTHREAD;
1222: TASK_ROOT(task) = root;
1223:
1224: TASK_ARG(task) = arg;
1225:
1226: TASK_DATA(task) = opt_data;
1227: TASK_DATLEN(task) = opt_dlen;
1228:
1229: pthread_attr_init(&attr);
1230: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
1231: if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
1232: LOGERR;
1233: pthread_attr_destroy(&attr);
1234: return sched_unuseTask(task);
1235: }
1236: if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
1237: LOGERR;
1238: pthread_attr_destroy(&attr);
1239: return sched_unuseTask(task);
1240: } else
1241: TASK_FLAG(task) = ss;
1242:
1243: #ifdef SCHED_RR
1244: pthread_attr_setschedpolicy(&attr, SCHED_RR);
1245: #else
1246: pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
1247: #endif
1248:
1249: if (root->root_hooks.hook_add.thread)
1250: ptr = root->root_hooks.hook_add.thread(task, &attr);
1251: else
1252: ptr = NULL;
1253:
1254: if (!ptr)
1255: insert_task_to(task, &root->root_thread);
1256: else
1257: task = sched_unuseTask(task);
1258:
1259: pthread_attr_destroy(&attr);
1260: return task;
1261: }
1262:
1263: /*
1264: * schedRTC() - Add RTC task to scheduler queue
1265: *
1266: * @root = root task
1267: * @func = task execution function
1268: * @arg = 1st func argument
1269: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1270: * @opt_data = Optional RTC ID
1271: * @opt_dlen = Optional data length
1272: * return: NULL error or !=NULL new queued task
1273: */
1274: sched_task_t *
1275: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
1276: void *opt_data, size_t opt_dlen)
1277: {
1278: #if defined(HAVE_LIBRT) && defined(HAVE_TIMER_CREATE) && \
1279: defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_DELETE)
1280: sched_task_t *task;
1281: void *ptr;
1282:
1283: if (!root || !func)
1284: return NULL;
1285:
1286: /* get new task */
1287: if (!(task = sched_useTask(root)))
1288: return NULL;
1289:
1290: TASK_FUNC(task) = func;
1291: TASK_TYPE(task) = taskRTC;
1292: TASK_ROOT(task) = root;
1293:
1294: TASK_ARG(task) = arg;
1295: TASK_TS(task) = ts;
1296:
1297: TASK_DATA(task) = opt_data;
1298: TASK_DATLEN(task) = opt_dlen;
1299:
1300: if (root->root_hooks.hook_add.rtc)
1301: ptr = root->root_hooks.hook_add.rtc(task, NULL);
1302: else
1303: ptr = NULL;
1304:
1305: if (!ptr)
1306: insert_task_to(task, &root->root_rtc);
1307: else
1308: task = sched_unuseTask(task);
1309:
1310: return task;
1311: #else
1312: sched_SetErr(ENOTSUP, "Not supported realtime clock extensions");
1313: return NULL;
1314: #endif
1315: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>