1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: tasks.c,v 1.28.4.1 2022/10/03 22:16:36 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004 - 2022
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
55: sched_task_t *
56: sched_useTask(sched_root_task_t * __restrict root)
57: {
58: sched_task_t *task, *tmp;
59:
60: SCHED_QLOCK(root, taskUNUSE);
61: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
62: if (!TASK_ISLOCKED(task)) {
63: TAILQ_REMOVE(&root->root_unuse, task, task_node);
64: break;
65: }
66: }
67: SCHED_QUNLOCK(root, taskUNUSE);
68:
69: if (!task) {
70: task = e_malloc(sizeof(sched_task_t));
71: if (!task) {
72: LOGERR;
73: return NULL;
74: }
75: }
76:
77: memset(task, 0, sizeof(sched_task_t));
78: task->task_id = (uintptr_t) task;
79: return task;
80: }
81:
82: /*
83: * sched_unuseTask() - Unlock and put task to unuse queue
84: *
85: * @task = task
86: * return: always is NULL
87: */
88: sched_task_t *
89: sched_unuseTask(sched_task_t * __restrict task)
90: {
91: TASK_UNLOCK(task);
92:
93: TASK_TYPE(task) = taskUNUSE;
94: insert_task_to(task, &(TASK_ROOT(task))->root_unuse);
95:
96: task = NULL;
97: return task;
98: }
99:
100: /*
101: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
102: *
103: * @task = current task
104: * @retcode = return code
105: * return: return code
106: */
107: void *
108: sched_taskExit(sched_task_t *task, intptr_t retcode)
109: {
110: if (!task || !TASK_ROOT(task))
111: return (void*) -1;
112:
113: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
114: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
115:
116: TASK_ROOT(task)->root_ret = (void*) retcode;
117: return (void*) retcode;
118: }
119:
120:
121: /*
122: * schedRead() - Add READ I/O task to scheduler queue
123: *
124: * @root = root task
125: * @func = task execution function
126: * @arg = 1st func argument
127: * @fd = fd handle
128: * @opt_data = Optional data
129: * @opt_dlen = Optional data length
130: * return: NULL error or !=NULL new queued task
131: */
132: sched_task_t *
133: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
134: void *opt_data, size_t opt_dlen)
135: {
136: sched_task_t *task;
137: void *ptr;
138:
139: if (!root || !func)
140: return NULL;
141:
142: /* get new task */
143: if (!(task = sched_useTask(root)))
144: return NULL;
145:
146: TASK_FUNC(task) = func;
147: TASK_TYPE(task) = taskREAD;
148: TASK_ROOT(task) = root;
149:
150: TASK_ARG(task) = arg;
151: TASK_FD(task) = fd;
152:
153: TASK_DATA(task) = opt_data;
154: TASK_DATLEN(task) = opt_dlen;
155:
156: if (root->root_hooks.hook_add.read)
157: ptr = root->root_hooks.hook_add.read(task, NULL);
158: else
159: ptr = NULL;
160:
161: if (!ptr)
162: insert_task_to(task, &root->root_read);
163: else
164: task = sched_unuseTask(task);
165:
166: return task;
167: }
168:
169: /*
170: * schedWrite() - Add WRITE I/O task to scheduler queue
171: *
172: * @root = root task
173: * @func = task execution function
174: * @arg = 1st func argument
175: * @fd = fd handle
176: * @opt_data = Optional data
177: * @opt_dlen = Optional data length
178: * return: NULL error or !=NULL new queued task
179: */
180: sched_task_t *
181: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
182: void *opt_data, size_t opt_dlen)
183: {
184: sched_task_t *task;
185: void *ptr;
186:
187: if (!root || !func)
188: return NULL;
189:
190: /* get new task */
191: if (!(task = sched_useTask(root)))
192: return NULL;
193:
194: TASK_FUNC(task) = func;
195: TASK_TYPE(task) = taskWRITE;
196: TASK_ROOT(task) = root;
197:
198: TASK_ARG(task) = arg;
199: TASK_FD(task) = fd;
200:
201: TASK_DATA(task) = opt_data;
202: TASK_DATLEN(task) = opt_dlen;
203:
204: if (root->root_hooks.hook_add.write)
205: ptr = root->root_hooks.hook_add.write(task, NULL);
206: else
207: ptr = NULL;
208:
209: if (!ptr)
210: insert_task_to(task, &root->root_write);
211: else
212: task = sched_unuseTask(task);
213:
214: return task;
215: }
216:
217: /*
218: * schedNode() - Add NODE task to scheduler queue
219: *
220: * @root = root task
221: * @func = task execution function
222: * @arg = 1st func argument
223: * @fd = fd handle
224: * @opt_data = Optional data
225: * @opt_dlen = Optional data length
226: * return: NULL error or !=NULL new queued task
227: */
228: sched_task_t *
229: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
230: void *opt_data, size_t opt_dlen)
231: {
232: #if SUP_ENABLE != KQ_SUPPORT
233: sched_SetErr(ENOTSUP, "disabled kqueue support");
234: return NULL;
235: #else
236: sched_task_t *task;
237: void *ptr;
238:
239: if (!root || !func)
240: return NULL;
241:
242: /* get new task */
243: if (!(task = sched_useTask(root)))
244: return NULL;
245:
246: TASK_FUNC(task) = func;
247: TASK_TYPE(task) = taskNODE;
248: TASK_ROOT(task) = root;
249:
250: TASK_ARG(task) = arg;
251: TASK_FD(task) = fd;
252:
253: TASK_DATA(task) = opt_data;
254: TASK_DATLEN(task) = opt_dlen;
255:
256: if (root->root_hooks.hook_add.node)
257: ptr = root->root_hooks.hook_add.node(task, NULL);
258: else
259: ptr = NULL;
260:
261: if (!ptr)
262: insert_task_to(task, &root->root_node);
263: else
264: task = sched_unuseTask(task);
265:
266: return task;
267: #endif /* KQ_SUPPORT */
268: }
269:
270: /*
271: * schedNode2() - Add NODE task with all events to scheduler queue
272: *
273: * @root = root task
274: * @func = task execution function
275: * @arg = 1st func argument
276: * @fd = fd handle
277: * @opt_data = Optional data
278: * @opt_dlen = Optional data length
279: * return: NULL error or !=NULL new queued task
280: */
281: sched_task_t *
282: schedNode2(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
283: void *opt_data, size_t opt_dlen)
284: {
285: #if SUP_ENABLE != KQ_SUPPORT
286: sched_SetErr(ENOTSUP, "disabled kqueue support");
287: return NULL;
288: #else
289: sched_task_t *task;
290: void *ptr;
291:
292: if (!root || !func)
293: return NULL;
294:
295: /* get new task */
296: if (!(task = sched_useTask(root)))
297: return NULL;
298:
299: TASK_FUNC(task) = func;
300: TASK_TYPE(task) = taskNODE;
301: TASK_ROOT(task) = root;
302:
303: TASK_ARG(task) = arg;
304: TASK_FD(task) = fd;
305:
306: TASK_DATA(task) = opt_data;
307: TASK_DATLEN(task) = opt_dlen;
308:
309: if (root->root_hooks.hook_add.node)
310: #ifdef __FreeBSD__
311: ptr = root->root_hooks.hook_add.node(task,
312: (void*) (NOTE_READ | NOTE_CLOSE_WRITE | NOTE_CLOSE | NOTE_OPEN));
313: #else
314: ptr = root->root_hooks.hook_add.node(task, NULL);
315: #endif
316: else
317: ptr = NULL;
318:
319: if (!ptr)
320: insert_task_to(task, &root->root_node);
321: else
322: task = sched_unuseTask(task);
323:
324: return task;
325: #endif /* KQ_SUPPORT */
326: }
327:
328: /*
329: * schedProc() - Add PROC task to scheduler queue
330: *
331: * @root = root task
332: * @func = task execution function
333: * @arg = 1st func argument
334: * @pid = PID
335: * @opt_data = Optional data
336: * @opt_dlen = Optional data length
337: * return: NULL error or !=NULL new queued task
338: */
339: sched_task_t *
340: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
341: void *opt_data, size_t opt_dlen)
342: {
343: #if SUP_ENABLE != KQ_SUPPORT
344: sched_SetErr(ENOTSUP, "disabled kqueue support");
345: return NULL;
346: #else
347: sched_task_t *task;
348: void *ptr;
349:
350: if (!root || !func)
351: return NULL;
352:
353: /* get new task */
354: if (!(task = sched_useTask(root)))
355: return NULL;
356:
357: TASK_FUNC(task) = func;
358: TASK_TYPE(task) = taskPROC;
359: TASK_ROOT(task) = root;
360:
361: TASK_ARG(task) = arg;
362: TASK_VAL(task) = pid;
363:
364: TASK_DATA(task) = opt_data;
365: TASK_DATLEN(task) = opt_dlen;
366:
367: if (root->root_hooks.hook_add.proc)
368: ptr = root->root_hooks.hook_add.proc(task, NULL);
369: else
370: ptr = NULL;
371:
372: if (!ptr)
373: insert_task_to(task, &root->root_proc);
374: else
375: task = sched_unuseTask(task);
376:
377: return task;
378: #endif /* KQ_SUPPORT */
379: }
380:
381: /*
382: * schedUser() - Add trigger USER task to scheduler queue
383: *
384: * @root = root task
385: * @func = task execution function
386: * @arg = 1st func argument
387: * @id = Trigger ID
388: * @opt_data = Optional data
389: * @opt_dlen = Optional user's trigger flags
390: * return: NULL error or !=NULL new queued task
391: */
392: sched_task_t *
393: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
394: void *opt_data, size_t opt_dlen)
395: {
396: #if SUP_ENABLE != KQ_SUPPORT
397: sched_SetErr(ENOTSUP, "disabled kqueue support");
398: return NULL;
399: #else
400: #ifndef EVFILT_USER
401: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
402: return NULL;
403: #else
404: sched_task_t *task;
405: void *ptr;
406:
407: if (!root || !func)
408: return NULL;
409:
410: /* get new task */
411: if (!(task = sched_useTask(root)))
412: return NULL;
413:
414: TASK_FUNC(task) = func;
415: TASK_TYPE(task) = taskUSER;
416: TASK_ROOT(task) = root;
417:
418: TASK_ARG(task) = arg;
419: TASK_VAL(task) = id;
420:
421: TASK_DATA(task) = opt_data;
422: TASK_DATLEN(task) = opt_dlen;
423:
424: if (root->root_hooks.hook_add.user)
425: ptr = root->root_hooks.hook_add.user(task, NULL);
426: else
427: ptr = NULL;
428:
429: if (!ptr)
430: insert_task_to(task, &root->root_user);
431: else
432: task = sched_unuseTask(task);
433:
434: return task;
435: #endif /* EVFILT_USER */
436: #endif /* KQ_SUPPORT */
437: }
438:
439: /*
440: * schedSignal() - Add SIGNAL task to scheduler queue
441: *
442: * @root = root task
443: * @func = task execution function
444: * @arg = 1st func argument
445: * @sig = Signal
446: * @opt_data = Optional data
447: * @opt_dlen = Optional data length
448: * return: NULL error or !=NULL new queued task
449: */
450: sched_task_t *
451: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
452: void *opt_data, size_t opt_dlen)
453: {
454: #if SUP_ENABLE != KQ_SUPPORT
455: sched_SetErr(ENOTSUP, "disabled kqueue support");
456: return NULL;
457: #else
458: sched_task_t *task;
459: void *ptr;
460:
461: if (!root || !func)
462: return NULL;
463:
464: /* get new task */
465: if (!(task = sched_useTask(root)))
466: return NULL;
467:
468: TASK_FUNC(task) = func;
469: TASK_TYPE(task) = taskSIGNAL;
470: TASK_ROOT(task) = root;
471:
472: TASK_ARG(task) = arg;
473: TASK_VAL(task) = sig;
474:
475: TASK_DATA(task) = opt_data;
476: TASK_DATLEN(task) = opt_dlen;
477:
478: if (root->root_hooks.hook_add.signal)
479: ptr = root->root_hooks.hook_add.signal(task, NULL);
480: else
481: ptr = NULL;
482:
483: if (!ptr)
484: insert_task_to(task, &root->root_signal);
485: else
486: task = sched_unuseTask(task);
487:
488: return task;
489: #endif /* KQ_SUPPORT */
490: }
491:
492: /*
493: * schedAlarm() - Add ALARM task to scheduler queue
494: *
495: * @root = root task
496: * @func = task execution function
497: * @arg = 1st func argument
498: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
499: * @opt_data = Alarm timer ID
500: * @opt_dlen = Optional data length
501: * return: NULL error or !=NULL new queued task
502: */
503: sched_task_t *
504: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
505: void *opt_data, size_t opt_dlen)
506: {
507: #if SUP_ENABLE != KQ_SUPPORT
508: sched_SetErr(ENOTSUP, "disabled kqueue support");
509: return NULL;
510: #else
511: sched_task_t *task;
512: void *ptr;
513:
514: if (!root || !func)
515: return NULL;
516:
517: /* get new task */
518: if (!(task = sched_useTask(root)))
519: return NULL;
520:
521: TASK_FUNC(task) = func;
522: TASK_TYPE(task) = taskALARM;
523: TASK_ROOT(task) = root;
524:
525: TASK_ARG(task) = arg;
526: TASK_TS(task) = ts;
527:
528: TASK_DATA(task) = opt_data;
529: TASK_DATLEN(task) = opt_dlen;
530:
531: if (root->root_hooks.hook_add.alarm)
532: ptr = root->root_hooks.hook_add.alarm(task, NULL);
533: else
534: ptr = NULL;
535:
536: if (!ptr)
537: insert_task_to(task, &root->root_alarm);
538: else
539: task = sched_unuseTask(task);
540:
541: return task;
542: #endif /* KQ_SUPPORT */
543: }
544:
545: #ifdef AIO_SUPPORT
546: /*
547: * schedAIO() - Add AIO task to scheduler queue
548: *
549: * @root = root task
550: * @func = task execution function
551: * @arg = 1st func argument
552: * @acb = AIO cb structure address
553: * @opt_data = Optional data
554: * @opt_dlen = Optional data length
555: * return: NULL error or !=NULL new queued task
556: */
557: sched_task_t *
558: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
559: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
560: {
561: #if SUP_ENABLE != KQ_SUPPORT
562: sched_SetErr(ENOTSUP, "disabled kqueue support");
563: return NULL;
564: #else
565: sched_task_t *task;
566: void *ptr;
567:
568: if (!root || !func || !acb || !opt_dlen)
569: return NULL;
570:
571: /* get new task */
572: if (!(task = sched_useTask(root)))
573: return NULL;
574:
575: TASK_FUNC(task) = func;
576: TASK_TYPE(task) = taskAIO;
577: TASK_ROOT(task) = root;
578:
579: TASK_ARG(task) = arg;
580: TASK_VAL(task) = (u_long) acb;
581:
582: TASK_DATA(task) = opt_data;
583: TASK_DATLEN(task) = opt_dlen;
584:
585: if (root->root_hooks.hook_add.aio)
586: ptr = root->root_hooks.hook_add.aio(task, NULL);
587: else
588: ptr = NULL;
589:
590: if (!ptr)
591: insert_task_to(task, &root->root_aio);
592: else
593: task = sched_unuseTask(task);
594:
595: return task;
596: #endif /* KQ_SUPPORT */
597: }
598:
599: /*
600: * schedAIORead() - Add AIO read task to scheduler queue
601: *
602: * @root = root task
603: * @func = task execution function
604: * @arg = 1st func argument
605: * @fd = file descriptor
606: * @buffer = Buffer
607: * @buflen = Buffer length
608: * @offset = Offset from start of file, if =-1 from current position
609: * return: NULL error or !=NULL new queued task
610: */
611: sched_task_t *
612: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
613: void *buffer, size_t buflen, off_t offset)
614: {
615: #if SUP_ENABLE != KQ_SUPPORT
616: sched_SetErr(ENOTSUP, "disabled kqueue support");
617: return NULL;
618: #else
619: struct aiocb *acb;
620: off_t off;
621:
622: if (!root || !func || !buffer || !buflen)
623: return NULL;
624:
625: if (offset == (off_t) -1) {
626: off = lseek(fd, 0, SEEK_CUR);
627: if (off == -1) {
628: LOGERR;
629: return NULL;
630: }
631: } else
632: off = offset;
633:
634: if (!(acb = e_malloc(sizeof(struct aiocb)))) {
635: LOGERR;
636: return NULL;
637: } else
638: memset(acb, 0, sizeof(struct aiocb));
639:
640: acb->aio_fildes = fd;
641: acb->aio_nbytes = buflen;
642: acb->aio_buf = buffer;
643: acb->aio_offset = off;
644: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
645: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
646: acb->aio_sigevent.sigev_value.sival_ptr = acb;
647:
648: if (aio_read(acb)) {
649: LOGERR;
650: e_free(acb);
651: return NULL;
652: }
653:
654: return schedAIO(root, func, arg, acb, buffer, buflen);
655: #endif /* KQ_SUPPORT */
656: }
657:
658: /*
659: * schedAIOWrite() - Add AIO write task to scheduler queue
660: *
661: * @root = root task
662: * @func = task execution function
663: * @arg = 1st func argument
664: * @fd = file descriptor
665: * @buffer = Buffer
666: * @buflen = Buffer length
667: * @offset = Offset from start of file, if =-1 from current position
668: * return: NULL error or !=NULL new queued task
669: */
670: sched_task_t *
671: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
672: void *buffer, size_t buflen, off_t offset)
673: {
674: #if SUP_ENABLE != KQ_SUPPORT
675: sched_SetErr(ENOTSUP, "disabled kqueue support");
676: return NULL;
677: #else
678: struct aiocb *acb;
679: off_t off;
680:
681: if (!root || !func || !buffer || !buflen)
682: return NULL;
683:
684: if (offset == (off_t) -1) {
685: off = lseek(fd, 0, SEEK_CUR);
686: if (off == -1) {
687: LOGERR;
688: return NULL;
689: }
690: } else
691: off = offset;
692:
693: if (!(acb = e_malloc(sizeof(struct aiocb)))) {
694: LOGERR;
695: return NULL;
696: } else
697: memset(acb, 0, sizeof(struct aiocb));
698:
699: acb->aio_fildes = fd;
700: acb->aio_nbytes = buflen;
701: acb->aio_buf = buffer;
702: acb->aio_offset = off;
703: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
704: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
705: acb->aio_sigevent.sigev_value.sival_ptr = acb;
706:
707: if (aio_write(acb)) {
708: LOGERR;
709: e_free(acb);
710: return NULL;
711: }
712:
713: return schedAIO(root, func, arg, acb, buffer, buflen);
714: #endif /* KQ_SUPPORT */
715: }
716:
717: #ifdef EVFILT_LIO
718: /*
719: * schedLIO() - Add AIO bulk tasks to scheduler queue
720: *
721: * @root = root task
722: * @func = task execution function
723: * @arg = 1st func argument
724: * @acbs = AIO cb structure addresses
725: * @opt_data = Optional data
726: * @opt_dlen = Optional data length
727: * return: NULL error or !=NULL new queued task
728: */
729: sched_task_t *
730: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
731: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
732: {
733: #if SUP_ENABLE != KQ_SUPPORT
734: sched_SetErr(ENOTSUP, "disabled kqueue support");
735: return NULL;
736: #else
737: sched_task_t *task;
738: void *ptr;
739:
740: if (!root || !func || !acbs || !opt_dlen)
741: return NULL;
742:
743: /* get new task */
744: if (!(task = sched_useTask(root)))
745: return NULL;
746:
747: TASK_FUNC(task) = func;
748: TASK_TYPE(task) = taskLIO;
749: TASK_ROOT(task) = root;
750:
751: TASK_ARG(task) = arg;
752: TASK_VAL(task) = (u_long) acbs;
753:
754: TASK_DATA(task) = opt_data;
755: TASK_DATLEN(task) = opt_dlen;
756:
757: if (root->root_hooks.hook_add.lio)
758: ptr = root->root_hooks.hook_add.lio(task, NULL);
759: else
760: ptr = NULL;
761:
762: if (!ptr)
763: insert_task_to(task, &root->root_lio);
764: else
765: task = sched_unuseTask(task);
766:
767: return task;
768: #endif /* KQ_SUPPORT */
769: }
770:
771: /*
772: * schedLIORead() - Add list of AIO read tasks to scheduler queue
773: *
774: * @root = root task
775: * @func = task execution function
776: * @arg = 1st func argument
777: * @fd = file descriptor
778: * @bufs = Buffer's list
779: * @nbufs = Number of Buffers
780: * @offset = Offset from start of file, if =-1 from current position
781: * return: NULL error or !=NULL new queued task
782: */
783: sched_task_t *
784: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
785: struct iovec *bufs, size_t nbufs, off_t offset)
786: {
787: #if SUP_ENABLE != KQ_SUPPORT
788: sched_SetErr(ENOTSUP, "disabled kqueue support");
789: return NULL;
790: #else
791: struct sigevent sig;
792: struct aiocb **acb;
793: off_t off;
794: register int i;
795:
796: if (!root || !func || !bufs || !nbufs)
797: return NULL;
798:
799: if (offset == (off_t) -1) {
800: off = lseek(fd, 0, SEEK_CUR);
801: if (off == -1) {
802: LOGERR;
803: return NULL;
804: }
805: } else
806: off = offset;
807:
808: if (!(acb = e_calloc(sizeof(void*), nbufs))) {
809: LOGERR;
810: return NULL;
811: } else
812: memset(acb, 0, sizeof(void*) * nbufs);
813: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
814: acb[i] = e_malloc(sizeof(struct aiocb));
815: if (!acb[i]) {
816: LOGERR;
817: for (i = 0; i < nbufs; i++)
818: if (acb[i])
819: e_free(acb[i]);
820: e_free(acb);
821: return NULL;
822: } else
823: memset(acb[i], 0, sizeof(struct aiocb));
824: acb[i]->aio_fildes = fd;
825: acb[i]->aio_nbytes = bufs[i].iov_len;
826: acb[i]->aio_buf = bufs[i].iov_base;
827: acb[i]->aio_offset = off;
828: acb[i]->aio_lio_opcode = LIO_READ;
829: }
830: memset(&sig, 0, sizeof sig);
831: sig.sigev_notify = SIGEV_KEVENT;
832: sig.sigev_notify_kqueue = root->root_kq;
833: sig.sigev_value.sival_ptr = acb;
834:
835: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
836: LOGERR;
837: for (i = 0; i < nbufs; i++)
838: if (acb[i])
839: e_free(acb[i]);
840: e_free(acb);
841: return NULL;
842: }
843:
844: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
845: #endif /* KQ_SUPPORT */
846: }
847:
848: /*
849: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
850: *
851: * @root = root task
852: * @func = task execution function
853: * @arg = 1st func argument
854: * @fd = file descriptor
855: * @bufs = Buffer's list
856: * @nbufs = Number of Buffers
857: * @offset = Offset from start of file, if =-1 from current position
858: * return: NULL error or !=NULL new queued task
859: */
860: sched_task_t *
861: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
862: struct iovec *bufs, size_t nbufs, off_t offset)
863: {
864: #if SUP_ENABLE != KQ_SUPPORT
865: sched_SetErr(ENOTSUP, "disabled kqueue support");
866: return NULL;
867: #else
868: struct sigevent sig;
869: struct aiocb **acb;
870: off_t off;
871: register int i;
872:
873: if (!root || !func || !bufs || !nbufs)
874: return NULL;
875:
876: if (offset == (off_t) -1) {
877: off = lseek(fd, 0, SEEK_CUR);
878: if (off == -1) {
879: LOGERR;
880: return NULL;
881: }
882: } else
883: off = offset;
884:
885: if (!(acb = e_calloc(sizeof(void*), nbufs))) {
886: LOGERR;
887: return NULL;
888: } else
889: memset(acb, 0, sizeof(void*) * nbufs);
890: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
891: acb[i] = e_malloc(sizeof(struct aiocb));
892: if (!acb[i]) {
893: LOGERR;
894: for (i = 0; i < nbufs; i++)
895: if (acb[i])
896: e_free(acb[i]);
897: e_free(acb);
898: return NULL;
899: } else
900: memset(acb[i], 0, sizeof(struct aiocb));
901: acb[i]->aio_fildes = fd;
902: acb[i]->aio_nbytes = bufs[i].iov_len;
903: acb[i]->aio_buf = bufs[i].iov_base;
904: acb[i]->aio_offset = off;
905: acb[i]->aio_lio_opcode = LIO_WRITE;
906: }
907: memset(&sig, 0, sizeof sig);
908: sig.sigev_notify = SIGEV_KEVENT;
909: sig.sigev_notify_kqueue = root->root_kq;
910: sig.sigev_value.sival_ptr = acb;
911:
912: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
913: LOGERR;
914: for (i = 0; i < nbufs; i++)
915: if (acb[i])
916: e_free(acb[i]);
917: e_free(acb);
918: return NULL;
919: }
920:
921: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
922: #endif /* KQ_SUPPORT */
923: }
924: #endif /* EVFILT_LIO */
925: #endif /* AIO_SUPPORT */
926:
927: /*
928: * schedTimer() - Add TIMER task to scheduler queue
929: *
930: * @root = root task
931: * @func = task execution function
932: * @arg = 1st func argument
933: * @ts = timeout argument structure
934: * @opt_data = Optional data
935: * @opt_dlen = Optional data length
936: * return: NULL error or !=NULL new queued task
937: */
938: sched_task_t *
939: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
940: void *opt_data, size_t opt_dlen)
941: {
942: sched_task_t *task, *tmp, *t = NULL;
943: void *ptr;
944: struct timespec now;
945:
946: if (!root || !func)
947: return NULL;
948:
949: /* get new task */
950: if (!(task = sched_useTask(root)))
951: return NULL;
952:
953: TASK_FUNC(task) = func;
954: TASK_TYPE(task) = taskTIMER;
955: TASK_ROOT(task) = root;
956:
957: TASK_ARG(task) = arg;
958:
959: TASK_DATA(task) = opt_data;
960: TASK_DATLEN(task) = opt_dlen;
961:
962: /* calculate timeval structure */
963: clock_gettime(CLOCK_MONOTONIC, &now);
964: now.tv_sec += ts.tv_sec;
965: now.tv_nsec += ts.tv_nsec;
966: if (now.tv_nsec >= 1000000000L) {
967: now.tv_sec++;
968: now.tv_nsec -= 1000000000L;
969: } else if (now.tv_nsec < 0) {
970: now.tv_sec--;
971: now.tv_nsec += 1000000000L;
972: }
973: TASK_TS(task) = now;
974:
975: if (root->root_hooks.hook_add.timer)
976: ptr = root->root_hooks.hook_add.timer(task, NULL);
977: else
978: ptr = NULL;
979:
980: if (!ptr) {
981: SCHED_QLOCK(root, taskTIMER);
982: #ifdef TIMER_WITHOUT_SORT
983: TAILQ_INSERT_TAIL(&root->root_timer, task, task_node);
984: #else
985: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
986: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
987: break;
988: if (!t)
989: TAILQ_INSERT_TAIL(&root->root_timer, task, task_node);
990: else
991: TAILQ_INSERT_BEFORE(t, task, task_node);
992: #endif
993: SCHED_QUNLOCK(root, taskTIMER);
994: } else
995: task = sched_unuseTask(task);
996:
997: return task;
998: }
999:
1000: /*
1001: * schedEvent() - Add EVENT task to scheduler queue
1002: *
1003: * @root = root task
1004: * @func = task execution function
1005: * @arg = 1st func argument
1006: * @val = additional func argument
1007: * @opt_data = Optional data
1008: * @opt_dlen = Optional data length
1009: * return: NULL error or !=NULL new queued task
1010: */
1011: sched_task_t *
1012: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1013: void *opt_data, size_t opt_dlen)
1014: {
1015: sched_task_t *task;
1016: void *ptr;
1017:
1018: if (!root || !func)
1019: return NULL;
1020:
1021: /* get new task */
1022: if (!(task = sched_useTask(root)))
1023: return NULL;
1024:
1025: TASK_FUNC(task) = func;
1026: TASK_TYPE(task) = taskEVENT;
1027: TASK_ROOT(task) = root;
1028:
1029: TASK_ARG(task) = arg;
1030: TASK_VAL(task) = val;
1031:
1032: TASK_DATA(task) = opt_data;
1033: TASK_DATLEN(task) = opt_dlen;
1034:
1035: if (root->root_hooks.hook_add.event)
1036: ptr = root->root_hooks.hook_add.event(task, NULL);
1037: else
1038: ptr = NULL;
1039:
1040: if (!ptr)
1041: insert_task_to(task, &root->root_event);
1042: else
1043: task = sched_unuseTask(task);
1044:
1045: return task;
1046: }
1047:
1048:
1049: /*
1050: * schedTask() - Add regular task to scheduler queue
1051: *
1052: * @root = root task
1053: * @func = task execution function
1054: * @arg = 1st func argument
1055: * @prio = regular task priority, 0 is hi priority for regular tasks
1056: * @opt_data = Optional data
1057: * @opt_dlen = Optional data length
1058: * return: NULL error or !=NULL new queued task
1059: */
1060: sched_task_t *
1061: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1062: void *opt_data, size_t opt_dlen)
1063: {
1064: sched_task_t *task, *tmp, *t = NULL;
1065: void *ptr;
1066:
1067: if (!root || !func)
1068: return NULL;
1069:
1070: /* get new task */
1071: if (!(task = sched_useTask(root)))
1072: return NULL;
1073:
1074: TASK_FUNC(task) = func;
1075: TASK_TYPE(task) = taskTASK;
1076: TASK_ROOT(task) = root;
1077:
1078: TASK_ARG(task) = arg;
1079: TASK_VAL(task) = prio;
1080:
1081: TASK_DATA(task) = opt_data;
1082: TASK_DATLEN(task) = opt_dlen;
1083:
1084: if (root->root_hooks.hook_add.task)
1085: ptr = root->root_hooks.hook_add.task(task, NULL);
1086: else
1087: ptr = NULL;
1088:
1089: if (!ptr) {
1090: SCHED_QLOCK(root, taskTASK);
1091: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1092: if (TASK_VAL(task) < TASK_VAL(t))
1093: break;
1094: if (!t)
1095: TAILQ_INSERT_TAIL(&root->root_task, task, task_node);
1096: else
1097: TAILQ_INSERT_BEFORE(t, task, task_node);
1098: SCHED_QUNLOCK(root, taskTASK);
1099: } else
1100: task = sched_unuseTask(task);
1101:
1102: return task;
1103: }
1104:
1105: /*
1106: * schedSuspend() - Add Suspended task to scheduler queue
1107: *
1108: * @root = root task
1109: * @func = task execution function
1110: * @arg = 1st func argument
1111: * @id = Trigger ID
1112: * @opt_data = Optional data
1113: * @opt_dlen = Optional data length
1114: * return: NULL error or !=NULL new queued task
1115: */
1116: sched_task_t *
1117: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1118: void *opt_data, size_t opt_dlen)
1119: {
1120: sched_task_t *task;
1121: void *ptr;
1122:
1123: if (!root || !func)
1124: return NULL;
1125:
1126: /* get new task */
1127: if (!(task = sched_useTask(root)))
1128: return NULL;
1129:
1130: TASK_FUNC(task) = func;
1131: TASK_TYPE(task) = taskSUSPEND;
1132: TASK_ROOT(task) = root;
1133:
1134: TASK_ARG(task) = arg;
1135: TASK_VAL(task) = id;
1136:
1137: TASK_DATA(task) = opt_data;
1138: TASK_DATLEN(task) = opt_dlen;
1139:
1140: if (root->root_hooks.hook_add.suspend)
1141: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1142: else
1143: ptr = NULL;
1144:
1145: if (!ptr)
1146: insert_task_to(task, &root->root_suspend);
1147: else
1148: task = sched_unuseTask(task);
1149:
1150: return task;
1151: }
1152:
1153: /*
1154: * schedCallOnce() - Call once from scheduler
1155: *
1156: * @root = root task
1157: * @func = task execution function
1158: * @arg = 1st func argument
1159: * @val = additional func argument
1160: * @opt_data = Optional data
1161: * @opt_dlen = Optional data length
1162: * return: return value from called func
1163: */
1164: sched_task_t *
1165: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1166: void *opt_data, size_t opt_dlen)
1167: {
1168: sched_task_t *task;
1169: void *ret;
1170:
1171: if (!root || !func)
1172: return NULL;
1173:
1174: /* get new task */
1175: if (!(task = sched_useTask(root)))
1176: return NULL;
1177:
1178: TASK_FUNC(task) = func;
1179: TASK_TYPE(task) = taskEVENT;
1180: TASK_ROOT(task) = root;
1181:
1182: TASK_ARG(task) = arg;
1183: TASK_VAL(task) = val;
1184:
1185: TASK_DATA(task) = opt_data;
1186: TASK_DATLEN(task) = opt_dlen;
1187:
1188: ret = schedCall(task);
1189:
1190: sched_unuseTask(task);
1191: return ret;
1192: }
1193:
1194: /*
1195: * schedThread() - Add thread task to scheduler queue
1196: *
1197: * @root = root task
1198: * @func = task execution function
1199: * @arg = 1st func argument
1200: * @ss = stack size
1201: * @opt_data = Optional data
1202: * @opt_dlen = Optional data length
1203: * return: NULL error or !=NULL new queued task
1204: */
1205: sched_task_t *
1206: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
1207: size_t ss, void *opt_data, size_t opt_dlen)
1208: {
1209: #ifndef HAVE_LIBPTHREAD
1210: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1211: return NULL;
1212: #endif
1213: sched_task_t *task;
1214: pthread_attr_t attr;
1215: void *ptr;
1216:
1217: if (!root || !func)
1218: return NULL;
1219:
1220: /* get new task */
1221: if (!(task = sched_useTask(root)))
1222: return NULL;
1223:
1224: TASK_FUNC(task) = func;
1225: TASK_TYPE(task) = taskTHREAD;
1226: TASK_ROOT(task) = root;
1227:
1228: TASK_ARG(task) = arg;
1229:
1230: TASK_DATA(task) = opt_data;
1231: TASK_DATLEN(task) = opt_dlen;
1232:
1233: pthread_attr_init(&attr);
1234: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
1235: if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
1236: LOGERR;
1237: pthread_attr_destroy(&attr);
1238: return sched_unuseTask(task);
1239: }
1240: if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
1241: LOGERR;
1242: pthread_attr_destroy(&attr);
1243: return sched_unuseTask(task);
1244: } else
1245: TASK_FLAG(task) = ss;
1246:
1247: #ifdef SCHED_RR
1248: pthread_attr_setschedpolicy(&attr, SCHED_RR);
1249: #else
1250: pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
1251: #endif
1252:
1253: if (root->root_hooks.hook_add.thread)
1254: ptr = root->root_hooks.hook_add.thread(task, &attr);
1255: else
1256: ptr = NULL;
1257:
1258: if (!ptr)
1259: insert_task_to(task, &root->root_thread);
1260: else
1261: task = sched_unuseTask(task);
1262:
1263: pthread_attr_destroy(&attr);
1264: return task;
1265: }
1266:
1267: /*
1268: * schedRTC() - Add RTC task to scheduler queue
1269: *
1270: * @root = root task
1271: * @func = task execution function
1272: * @arg = 1st func argument
1273: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1274: * @opt_data = Optional RTC ID
1275: * @opt_dlen = Optional data length
1276: * return: NULL error or !=NULL new queued task
1277: */
1278: sched_task_t *
1279: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
1280: void *opt_data, size_t opt_dlen)
1281: {
1282: #if defined(HAVE_LIBRT) && defined(HAVE_TIMER_CREATE) && \
1283: defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_DELETE)
1284: sched_task_t *task;
1285: void *ptr;
1286:
1287: if (!root || !func)
1288: return NULL;
1289:
1290: /* get new task */
1291: if (!(task = sched_useTask(root)))
1292: return NULL;
1293:
1294: TASK_FUNC(task) = func;
1295: TASK_TYPE(task) = taskRTC;
1296: TASK_ROOT(task) = root;
1297:
1298: TASK_ARG(task) = arg;
1299: TASK_TS(task) = ts;
1300:
1301: TASK_DATA(task) = opt_data;
1302: TASK_DATLEN(task) = opt_dlen;
1303:
1304: if (root->root_hooks.hook_add.rtc)
1305: ptr = root->root_hooks.hook_add.rtc(task, NULL);
1306: else
1307: ptr = NULL;
1308:
1309: if (!ptr)
1310: insert_task_to(task, &root->root_rtc);
1311: else
1312: task = sched_unuseTask(task);
1313:
1314: return task;
1315: #else
1316: sched_SetErr(ENOTSUP, "Not supported realtime clock extensions");
1317: return NULL;
1318: #endif
1319: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>