1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: tasks.c,v 1.29.4.1 2022/11/28 23:29:59 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004 - 2022
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
55: sched_task_t *
56: sched_useTask(sched_root_task_t * __restrict root)
57: {
58: sched_task_t *task, *tmp;
59:
60: SCHED_QLOCK(root, taskUNUSE);
61: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
62: if (!TASK_ISLOCKED(task)) {
63: TAILQ_REMOVE(&root->root_unuse, task, task_node);
64: break;
65: }
66: }
67: SCHED_QUNLOCK(root, taskUNUSE);
68:
69: if (!task) {
70: task = e_malloc(sizeof(sched_task_t));
71: if (!task) {
72: LOGERR;
73: return NULL;
74: }
75: }
76:
77: memset(task, 0, sizeof(sched_task_t));
78: task->task_id = (uintptr_t) task;
79: return task;
80: }
81:
82: /*
83: * sched_unuseTask() - Unlock and put task to unuse queue
84: *
85: * @task = task
86: * return: always is NULL
87: */
88: sched_task_t *
89: sched_unuseTask(sched_task_t * __restrict task)
90: {
91: TASK_UNLOCK(task);
92:
93: TASK_TYPE(task) = taskUNUSE;
94: insert_task_to(task, &(TASK_ROOT(task))->root_unuse);
95:
96: task = NULL;
97: return task;
98: }
99:
100: /*
101: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
102: *
103: * @task = current task
104: * @retcode = return code
105: * return: return code
106: */
107: void *
108: sched_taskExit(sched_task_t *task, intptr_t retcode)
109: {
110: if (!task || !TASK_ROOT(task))
111: return (void*) -1;
112:
113: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
114: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
115:
116: TASK_ROOT(task)->root_ret = (void*) retcode;
117: return (void*) retcode;
118: }
119:
120:
121: /*
122: * schedRead() - Add READ I/O task to scheduler queue
123: *
124: * @root = root task
125: * @func = task execution function
126: * @arg = 1st func argument
127: * @fd = fd handle
128: * @opt_data = Optional data
129: * @opt_dlen = Optional data length
130: * return: NULL error or !=NULL new queued task
131: */
132: sched_task_t *
133: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
134: void *opt_data, size_t opt_dlen)
135: {
136: return schedReadExt(root, func, arg, fd, opt_data, opt_dlen, 0);
137: }
138:
139: /*
140: * schedReadExt() - Add READ I/O task to scheduler queue with custom event mask
141: *
142: * @root = root task
143: * @func = task execution function
144: * @arg = 1st func argument
145: * @fd = fd handle
146: * @opt_data = Optional data
147: * @opt_dlen = Optional data length
148: * @mask = Event mask
149: * return: NULL error or !=NULL new queued task
150: */
151: sched_task_t *
152: schedReadExt(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
153: void *opt_data, size_t opt_dlen, u_long mask)
154: {
155: sched_task_t *task;
156: void *ptr;
157:
158: if (!root || !func)
159: return NULL;
160:
161: /* get new task */
162: if (!(task = sched_useTask(root)))
163: return NULL;
164:
165: TASK_FUNC(task) = func;
166: TASK_TYPE(task) = taskREAD;
167: TASK_ROOT(task) = root;
168:
169: TASK_ARG(task) = arg;
170: TASK_FD(task) = fd;
171:
172: TASK_DATA(task) = opt_data;
173: TASK_DATLEN(task) = opt_dlen;
174:
175: TASK_HARG(task) = (void*) mask;
176:
177: if (root->root_hooks.hook_add.read)
178: ptr = root->root_hooks.hook_add.read(task, task->task_harg);
179: else
180: ptr = NULL;
181:
182: if (!ptr)
183: insert_task_to(task, &root->root_read);
184: else
185: task = sched_unuseTask(task);
186:
187: return task;
188: }
189:
190: /*
191: * schedWrite() - Add WRITE I/O task to scheduler queue
192: *
193: * @root = root task
194: * @func = task execution function
195: * @arg = 1st func argument
196: * @fd = fd handle
197: * @opt_data = Optional data
198: * @opt_dlen = Optional data length
199: * return: NULL error or !=NULL new queued task
200: */
201: sched_task_t *
202: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
203: void *opt_data, size_t opt_dlen)
204: {
205: sched_task_t *task;
206: void *ptr;
207:
208: if (!root || !func)
209: return NULL;
210:
211: /* get new task */
212: if (!(task = sched_useTask(root)))
213: return NULL;
214:
215: TASK_FUNC(task) = func;
216: TASK_TYPE(task) = taskWRITE;
217: TASK_ROOT(task) = root;
218:
219: TASK_ARG(task) = arg;
220: TASK_FD(task) = fd;
221:
222: TASK_DATA(task) = opt_data;
223: TASK_DATLEN(task) = opt_dlen;
224:
225: if (root->root_hooks.hook_add.write)
226: ptr = root->root_hooks.hook_add.write(task, NULL);
227: else
228: ptr = NULL;
229:
230: if (!ptr)
231: insert_task_to(task, &root->root_write);
232: else
233: task = sched_unuseTask(task);
234:
235: return task;
236: }
237:
238: /*
239: * schedNode() - Add NODE task to scheduler queue
240: *
241: * @root = root task
242: * @func = task execution function
243: * @arg = 1st func argument
244: * @fd = fd handle
245: * @opt_data = Optional data
246: * @opt_dlen = Optional data length
247: * return: NULL error or !=NULL new queued task
248: */
249: sched_task_t *
250: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
251: void *opt_data, size_t opt_dlen)
252: {
253: #if SUP_ENABLE != KQ_SUPPORT
254: sched_SetErr(ENOTSUP, "disabled kqueue support");
255: return NULL;
256: #else
257: sched_task_t *task;
258: void *ptr;
259:
260: if (!root || !func)
261: return NULL;
262:
263: /* get new task */
264: if (!(task = sched_useTask(root)))
265: return NULL;
266:
267: TASK_FUNC(task) = func;
268: TASK_TYPE(task) = taskNODE;
269: TASK_ROOT(task) = root;
270:
271: TASK_ARG(task) = arg;
272: TASK_FD(task) = fd;
273:
274: TASK_DATA(task) = opt_data;
275: TASK_DATLEN(task) = opt_dlen;
276:
277: if (root->root_hooks.hook_add.node)
278: ptr = root->root_hooks.hook_add.node(task, NULL);
279: else
280: ptr = NULL;
281:
282: if (!ptr)
283: insert_task_to(task, &root->root_node);
284: else
285: task = sched_unuseTask(task);
286:
287: return task;
288: #endif /* KQ_SUPPORT */
289: }
290:
291: /*
292: * schedNode2() - Add NODE task with all events to scheduler queue
293: *
294: * @root = root task
295: * @func = task execution function
296: * @arg = 1st func argument
297: * @fd = fd handle
298: * @opt_data = Optional data
299: * @opt_dlen = Optional data length
300: * return: NULL error or !=NULL new queued task
301: */
302: sched_task_t *
303: schedNode2(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
304: void *opt_data, size_t opt_dlen)
305: {
306: #if SUP_ENABLE != KQ_SUPPORT
307: sched_SetErr(ENOTSUP, "disabled kqueue support");
308: return NULL;
309: #else
310: sched_task_t *task;
311: void *ptr;
312:
313: if (!root || !func)
314: return NULL;
315:
316: /* get new task */
317: if (!(task = sched_useTask(root)))
318: return NULL;
319:
320: TASK_FUNC(task) = func;
321: TASK_TYPE(task) = taskNODE;
322: TASK_ROOT(task) = root;
323:
324: TASK_ARG(task) = arg;
325: TASK_FD(task) = fd;
326:
327: TASK_DATA(task) = opt_data;
328: TASK_DATLEN(task) = opt_dlen;
329:
330: if (root->root_hooks.hook_add.node)
331: #ifdef __FreeBSD__
332: ptr = root->root_hooks.hook_add.node(task,
333: (void*) (NOTE_READ | NOTE_CLOSE_WRITE | NOTE_CLOSE | NOTE_OPEN));
334: #else
335: ptr = root->root_hooks.hook_add.node(task, NULL);
336: #endif
337: else
338: ptr = NULL;
339:
340: if (!ptr)
341: insert_task_to(task, &root->root_node);
342: else
343: task = sched_unuseTask(task);
344:
345: return task;
346: #endif /* KQ_SUPPORT */
347: }
348:
349: /*
350: * schedProc() - Add PROC task to scheduler queue
351: *
352: * @root = root task
353: * @func = task execution function
354: * @arg = 1st func argument
355: * @pid = PID
356: * @opt_data = Optional data
357: * @opt_dlen = Optional data length
358: * return: NULL error or !=NULL new queued task
359: */
360: sched_task_t *
361: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
362: void *opt_data, size_t opt_dlen)
363: {
364: #if SUP_ENABLE != KQ_SUPPORT
365: sched_SetErr(ENOTSUP, "disabled kqueue support");
366: return NULL;
367: #else
368: sched_task_t *task;
369: void *ptr;
370:
371: if (!root || !func)
372: return NULL;
373:
374: /* get new task */
375: if (!(task = sched_useTask(root)))
376: return NULL;
377:
378: TASK_FUNC(task) = func;
379: TASK_TYPE(task) = taskPROC;
380: TASK_ROOT(task) = root;
381:
382: TASK_ARG(task) = arg;
383: TASK_VAL(task) = pid;
384:
385: TASK_DATA(task) = opt_data;
386: TASK_DATLEN(task) = opt_dlen;
387:
388: if (root->root_hooks.hook_add.proc)
389: ptr = root->root_hooks.hook_add.proc(task, NULL);
390: else
391: ptr = NULL;
392:
393: if (!ptr)
394: insert_task_to(task, &root->root_proc);
395: else
396: task = sched_unuseTask(task);
397:
398: return task;
399: #endif /* KQ_SUPPORT */
400: }
401:
402: /*
403: * schedUser() - Add trigger USER task to scheduler queue
404: *
405: * @root = root task
406: * @func = task execution function
407: * @arg = 1st func argument
408: * @id = Trigger ID
409: * @opt_data = Optional data
410: * @opt_dlen = Optional user's trigger flags
411: * return: NULL error or !=NULL new queued task
412: */
413: sched_task_t *
414: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
415: void *opt_data, size_t opt_dlen)
416: {
417: #if SUP_ENABLE != KQ_SUPPORT
418: sched_SetErr(ENOTSUP, "disabled kqueue support");
419: return NULL;
420: #else
421: #ifndef EVFILT_USER
422: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
423: return NULL;
424: #else
425: sched_task_t *task;
426: void *ptr;
427:
428: if (!root || !func)
429: return NULL;
430:
431: /* get new task */
432: if (!(task = sched_useTask(root)))
433: return NULL;
434:
435: TASK_FUNC(task) = func;
436: TASK_TYPE(task) = taskUSER;
437: TASK_ROOT(task) = root;
438:
439: TASK_ARG(task) = arg;
440: TASK_VAL(task) = id;
441:
442: TASK_DATA(task) = opt_data;
443: TASK_DATLEN(task) = opt_dlen;
444:
445: if (root->root_hooks.hook_add.user)
446: ptr = root->root_hooks.hook_add.user(task, NULL);
447: else
448: ptr = NULL;
449:
450: if (!ptr)
451: insert_task_to(task, &root->root_user);
452: else
453: task = sched_unuseTask(task);
454:
455: return task;
456: #endif /* EVFILT_USER */
457: #endif /* KQ_SUPPORT */
458: }
459:
460: /*
461: * schedSignal() - Add SIGNAL task to scheduler queue
462: *
463: * @root = root task
464: * @func = task execution function
465: * @arg = 1st func argument
466: * @sig = Signal
467: * @opt_data = Optional data
468: * @opt_dlen = Optional data length
469: * return: NULL error or !=NULL new queued task
470: */
471: sched_task_t *
472: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
473: void *opt_data, size_t opt_dlen)
474: {
475: #if SUP_ENABLE != KQ_SUPPORT
476: sched_SetErr(ENOTSUP, "disabled kqueue support");
477: return NULL;
478: #else
479: sched_task_t *task;
480: void *ptr;
481:
482: if (!root || !func)
483: return NULL;
484:
485: /* get new task */
486: if (!(task = sched_useTask(root)))
487: return NULL;
488:
489: TASK_FUNC(task) = func;
490: TASK_TYPE(task) = taskSIGNAL;
491: TASK_ROOT(task) = root;
492:
493: TASK_ARG(task) = arg;
494: TASK_VAL(task) = sig;
495:
496: TASK_DATA(task) = opt_data;
497: TASK_DATLEN(task) = opt_dlen;
498:
499: if (root->root_hooks.hook_add.signal)
500: ptr = root->root_hooks.hook_add.signal(task, NULL);
501: else
502: ptr = NULL;
503:
504: if (!ptr)
505: insert_task_to(task, &root->root_signal);
506: else
507: task = sched_unuseTask(task);
508:
509: return task;
510: #endif /* KQ_SUPPORT */
511: }
512:
513: /*
514: * schedAlarm() - Add ALARM task to scheduler queue
515: *
516: * @root = root task
517: * @func = task execution function
518: * @arg = 1st func argument
519: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
520: * @opt_data = Alarm timer ID
521: * @opt_dlen = Optional data length
522: * return: NULL error or !=NULL new queued task
523: */
524: sched_task_t *
525: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
526: void *opt_data, size_t opt_dlen)
527: {
528: #if SUP_ENABLE != KQ_SUPPORT
529: sched_SetErr(ENOTSUP, "disabled kqueue support");
530: return NULL;
531: #else
532: sched_task_t *task;
533: void *ptr;
534:
535: if (!root || !func)
536: return NULL;
537:
538: /* get new task */
539: if (!(task = sched_useTask(root)))
540: return NULL;
541:
542: TASK_FUNC(task) = func;
543: TASK_TYPE(task) = taskALARM;
544: TASK_ROOT(task) = root;
545:
546: TASK_ARG(task) = arg;
547: TASK_TS(task) = ts;
548:
549: TASK_DATA(task) = opt_data;
550: TASK_DATLEN(task) = opt_dlen;
551:
552: if (root->root_hooks.hook_add.alarm)
553: ptr = root->root_hooks.hook_add.alarm(task, NULL);
554: else
555: ptr = NULL;
556:
557: if (!ptr)
558: insert_task_to(task, &root->root_alarm);
559: else
560: task = sched_unuseTask(task);
561:
562: return task;
563: #endif /* KQ_SUPPORT */
564: }
565:
566: #ifdef AIO_SUPPORT
567: /*
568: * schedAIO() - Add AIO task to scheduler queue
569: *
570: * @root = root task
571: * @func = task execution function
572: * @arg = 1st func argument
573: * @acb = AIO cb structure address
574: * @opt_data = Optional data
575: * @opt_dlen = Optional data length
576: * return: NULL error or !=NULL new queued task
577: */
578: sched_task_t *
579: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
580: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
581: {
582: #if SUP_ENABLE != KQ_SUPPORT
583: sched_SetErr(ENOTSUP, "disabled kqueue support");
584: return NULL;
585: #else
586: sched_task_t *task;
587: void *ptr;
588:
589: if (!root || !func || !acb || !opt_dlen)
590: return NULL;
591:
592: /* get new task */
593: if (!(task = sched_useTask(root)))
594: return NULL;
595:
596: TASK_FUNC(task) = func;
597: TASK_TYPE(task) = taskAIO;
598: TASK_ROOT(task) = root;
599:
600: TASK_ARG(task) = arg;
601: TASK_VAL(task) = (u_long) acb;
602:
603: TASK_DATA(task) = opt_data;
604: TASK_DATLEN(task) = opt_dlen;
605:
606: if (root->root_hooks.hook_add.aio)
607: ptr = root->root_hooks.hook_add.aio(task, NULL);
608: else
609: ptr = NULL;
610:
611: if (!ptr)
612: insert_task_to(task, &root->root_aio);
613: else
614: task = sched_unuseTask(task);
615:
616: return task;
617: #endif /* KQ_SUPPORT */
618: }
619:
620: /*
621: * schedAIORead() - Add AIO read task to scheduler queue
622: *
623: * @root = root task
624: * @func = task execution function
625: * @arg = 1st func argument
626: * @fd = file descriptor
627: * @buffer = Buffer
628: * @buflen = Buffer length
629: * @offset = Offset from start of file, if =-1 from current position
630: * return: NULL error or !=NULL new queued task
631: */
632: sched_task_t *
633: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
634: void *buffer, size_t buflen, off_t offset)
635: {
636: #if SUP_ENABLE != KQ_SUPPORT
637: sched_SetErr(ENOTSUP, "disabled kqueue support");
638: return NULL;
639: #else
640: struct aiocb *acb;
641: off_t off;
642:
643: if (!root || !func || !buffer || !buflen)
644: return NULL;
645:
646: if (offset == (off_t) -1) {
647: off = lseek(fd, 0, SEEK_CUR);
648: if (off == -1) {
649: LOGERR;
650: return NULL;
651: }
652: } else
653: off = offset;
654:
655: if (!(acb = e_malloc(sizeof(struct aiocb)))) {
656: LOGERR;
657: return NULL;
658: } else
659: memset(acb, 0, sizeof(struct aiocb));
660:
661: acb->aio_fildes = fd;
662: acb->aio_nbytes = buflen;
663: acb->aio_buf = buffer;
664: acb->aio_offset = off;
665: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
666: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
667: acb->aio_sigevent.sigev_value.sival_ptr = acb;
668:
669: if (aio_read(acb)) {
670: LOGERR;
671: e_free(acb);
672: return NULL;
673: }
674:
675: return schedAIO(root, func, arg, acb, buffer, buflen);
676: #endif /* KQ_SUPPORT */
677: }
678:
679: /*
680: * schedAIOWrite() - Add AIO write task to scheduler queue
681: *
682: * @root = root task
683: * @func = task execution function
684: * @arg = 1st func argument
685: * @fd = file descriptor
686: * @buffer = Buffer
687: * @buflen = Buffer length
688: * @offset = Offset from start of file, if =-1 from current position
689: * return: NULL error or !=NULL new queued task
690: */
691: sched_task_t *
692: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
693: void *buffer, size_t buflen, off_t offset)
694: {
695: #if SUP_ENABLE != KQ_SUPPORT
696: sched_SetErr(ENOTSUP, "disabled kqueue support");
697: return NULL;
698: #else
699: struct aiocb *acb;
700: off_t off;
701:
702: if (!root || !func || !buffer || !buflen)
703: return NULL;
704:
705: if (offset == (off_t) -1) {
706: off = lseek(fd, 0, SEEK_CUR);
707: if (off == -1) {
708: LOGERR;
709: return NULL;
710: }
711: } else
712: off = offset;
713:
714: if (!(acb = e_malloc(sizeof(struct aiocb)))) {
715: LOGERR;
716: return NULL;
717: } else
718: memset(acb, 0, sizeof(struct aiocb));
719:
720: acb->aio_fildes = fd;
721: acb->aio_nbytes = buflen;
722: acb->aio_buf = buffer;
723: acb->aio_offset = off;
724: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
725: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
726: acb->aio_sigevent.sigev_value.sival_ptr = acb;
727:
728: if (aio_write(acb)) {
729: LOGERR;
730: e_free(acb);
731: return NULL;
732: }
733:
734: return schedAIO(root, func, arg, acb, buffer, buflen);
735: #endif /* KQ_SUPPORT */
736: }
737:
738: #ifdef EVFILT_LIO
739: /*
740: * schedLIO() - Add AIO bulk tasks to scheduler queue
741: *
742: * @root = root task
743: * @func = task execution function
744: * @arg = 1st func argument
745: * @acbs = AIO cb structure addresses
746: * @opt_data = Optional data
747: * @opt_dlen = Optional data length
748: * return: NULL error or !=NULL new queued task
749: */
750: sched_task_t *
751: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
752: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
753: {
754: #if SUP_ENABLE != KQ_SUPPORT
755: sched_SetErr(ENOTSUP, "disabled kqueue support");
756: return NULL;
757: #else
758: sched_task_t *task;
759: void *ptr;
760:
761: if (!root || !func || !acbs || !opt_dlen)
762: return NULL;
763:
764: /* get new task */
765: if (!(task = sched_useTask(root)))
766: return NULL;
767:
768: TASK_FUNC(task) = func;
769: TASK_TYPE(task) = taskLIO;
770: TASK_ROOT(task) = root;
771:
772: TASK_ARG(task) = arg;
773: TASK_VAL(task) = (u_long) acbs;
774:
775: TASK_DATA(task) = opt_data;
776: TASK_DATLEN(task) = opt_dlen;
777:
778: if (root->root_hooks.hook_add.lio)
779: ptr = root->root_hooks.hook_add.lio(task, NULL);
780: else
781: ptr = NULL;
782:
783: if (!ptr)
784: insert_task_to(task, &root->root_lio);
785: else
786: task = sched_unuseTask(task);
787:
788: return task;
789: #endif /* KQ_SUPPORT */
790: }
791:
792: /*
793: * schedLIORead() - Add list of AIO read tasks to scheduler queue
794: *
795: * @root = root task
796: * @func = task execution function
797: * @arg = 1st func argument
798: * @fd = file descriptor
799: * @bufs = Buffer's list
800: * @nbufs = Number of Buffers
801: * @offset = Offset from start of file, if =-1 from current position
802: * return: NULL error or !=NULL new queued task
803: */
804: sched_task_t *
805: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
806: struct iovec *bufs, size_t nbufs, off_t offset)
807: {
808: #if SUP_ENABLE != KQ_SUPPORT
809: sched_SetErr(ENOTSUP, "disabled kqueue support");
810: return NULL;
811: #else
812: struct sigevent sig;
813: struct aiocb **acb;
814: off_t off;
815: register int i;
816:
817: if (!root || !func || !bufs || !nbufs)
818: return NULL;
819:
820: if (offset == (off_t) -1) {
821: off = lseek(fd, 0, SEEK_CUR);
822: if (off == -1) {
823: LOGERR;
824: return NULL;
825: }
826: } else
827: off = offset;
828:
829: if (!(acb = e_calloc(sizeof(void*), nbufs))) {
830: LOGERR;
831: return NULL;
832: } else
833: memset(acb, 0, sizeof(void*) * nbufs);
834: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
835: acb[i] = e_malloc(sizeof(struct aiocb));
836: if (!acb[i]) {
837: LOGERR;
838: for (i = 0; i < nbufs; i++)
839: if (acb[i])
840: e_free(acb[i]);
841: e_free(acb);
842: return NULL;
843: } else
844: memset(acb[i], 0, sizeof(struct aiocb));
845: acb[i]->aio_fildes = fd;
846: acb[i]->aio_nbytes = bufs[i].iov_len;
847: acb[i]->aio_buf = bufs[i].iov_base;
848: acb[i]->aio_offset = off;
849: acb[i]->aio_lio_opcode = LIO_READ;
850: }
851: memset(&sig, 0, sizeof sig);
852: sig.sigev_notify = SIGEV_KEVENT;
853: sig.sigev_notify_kqueue = root->root_kq;
854: sig.sigev_value.sival_ptr = acb;
855:
856: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
857: LOGERR;
858: for (i = 0; i < nbufs; i++)
859: if (acb[i])
860: e_free(acb[i]);
861: e_free(acb);
862: return NULL;
863: }
864:
865: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
866: #endif /* KQ_SUPPORT */
867: }
868:
869: /*
870: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
871: *
872: * @root = root task
873: * @func = task execution function
874: * @arg = 1st func argument
875: * @fd = file descriptor
876: * @bufs = Buffer's list
877: * @nbufs = Number of Buffers
878: * @offset = Offset from start of file, if =-1 from current position
879: * return: NULL error or !=NULL new queued task
880: */
881: sched_task_t *
882: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
883: struct iovec *bufs, size_t nbufs, off_t offset)
884: {
885: #if SUP_ENABLE != KQ_SUPPORT
886: sched_SetErr(ENOTSUP, "disabled kqueue support");
887: return NULL;
888: #else
889: struct sigevent sig;
890: struct aiocb **acb;
891: off_t off;
892: register int i;
893:
894: if (!root || !func || !bufs || !nbufs)
895: return NULL;
896:
897: if (offset == (off_t) -1) {
898: off = lseek(fd, 0, SEEK_CUR);
899: if (off == -1) {
900: LOGERR;
901: return NULL;
902: }
903: } else
904: off = offset;
905:
906: if (!(acb = e_calloc(sizeof(void*), nbufs))) {
907: LOGERR;
908: return NULL;
909: } else
910: memset(acb, 0, sizeof(void*) * nbufs);
911: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
912: acb[i] = e_malloc(sizeof(struct aiocb));
913: if (!acb[i]) {
914: LOGERR;
915: for (i = 0; i < nbufs; i++)
916: if (acb[i])
917: e_free(acb[i]);
918: e_free(acb);
919: return NULL;
920: } else
921: memset(acb[i], 0, sizeof(struct aiocb));
922: acb[i]->aio_fildes = fd;
923: acb[i]->aio_nbytes = bufs[i].iov_len;
924: acb[i]->aio_buf = bufs[i].iov_base;
925: acb[i]->aio_offset = off;
926: acb[i]->aio_lio_opcode = LIO_WRITE;
927: }
928: memset(&sig, 0, sizeof sig);
929: sig.sigev_notify = SIGEV_KEVENT;
930: sig.sigev_notify_kqueue = root->root_kq;
931: sig.sigev_value.sival_ptr = acb;
932:
933: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
934: LOGERR;
935: for (i = 0; i < nbufs; i++)
936: if (acb[i])
937: e_free(acb[i]);
938: e_free(acb);
939: return NULL;
940: }
941:
942: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
943: #endif /* KQ_SUPPORT */
944: }
945: #endif /* EVFILT_LIO */
946: #endif /* AIO_SUPPORT */
947:
948: /*
949: * schedTimer() - Add TIMER task to scheduler queue
950: *
951: * @root = root task
952: * @func = task execution function
953: * @arg = 1st func argument
954: * @ts = timeout argument structure
955: * @opt_data = Optional data
956: * @opt_dlen = Optional data length
957: * return: NULL error or !=NULL new queued task
958: */
959: sched_task_t *
960: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
961: void *opt_data, size_t opt_dlen)
962: {
963: sched_task_t *task, *tmp, *t = NULL;
964: void *ptr;
965: struct timespec now;
966:
967: if (!root || !func)
968: return NULL;
969:
970: /* get new task */
971: if (!(task = sched_useTask(root)))
972: return NULL;
973:
974: TASK_FUNC(task) = func;
975: TASK_TYPE(task) = taskTIMER;
976: TASK_ROOT(task) = root;
977:
978: TASK_ARG(task) = arg;
979:
980: TASK_DATA(task) = opt_data;
981: TASK_DATLEN(task) = opt_dlen;
982:
983: /* calculate timeval structure */
984: clock_gettime(CLOCK_MONOTONIC, &now);
985: now.tv_sec += ts.tv_sec;
986: now.tv_nsec += ts.tv_nsec;
987: if (now.tv_nsec >= 1000000000L) {
988: now.tv_sec++;
989: now.tv_nsec -= 1000000000L;
990: } else if (now.tv_nsec < 0) {
991: now.tv_sec--;
992: now.tv_nsec += 1000000000L;
993: }
994: TASK_TS(task) = now;
995:
996: if (root->root_hooks.hook_add.timer)
997: ptr = root->root_hooks.hook_add.timer(task, NULL);
998: else
999: ptr = NULL;
1000:
1001: if (!ptr) {
1002: SCHED_QLOCK(root, taskTIMER);
1003: #ifdef TIMER_WITHOUT_SORT
1004: TAILQ_INSERT_TAIL(&root->root_timer, task, task_node);
1005: #else
1006: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
1007: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
1008: break;
1009: if (!t)
1010: TAILQ_INSERT_TAIL(&root->root_timer, task, task_node);
1011: else
1012: TAILQ_INSERT_BEFORE(t, task, task_node);
1013: #endif
1014: SCHED_QUNLOCK(root, taskTIMER);
1015: } else
1016: task = sched_unuseTask(task);
1017:
1018: return task;
1019: }
1020:
1021: /*
1022: * schedEvent() - Add EVENT task to scheduler queue
1023: *
1024: * @root = root task
1025: * @func = task execution function
1026: * @arg = 1st func argument
1027: * @val = additional func argument
1028: * @opt_data = Optional data
1029: * @opt_dlen = Optional data length
1030: * return: NULL error or !=NULL new queued task
1031: */
1032: sched_task_t *
1033: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1034: void *opt_data, size_t opt_dlen)
1035: {
1036: sched_task_t *task;
1037: void *ptr;
1038:
1039: if (!root || !func)
1040: return NULL;
1041:
1042: /* get new task */
1043: if (!(task = sched_useTask(root)))
1044: return NULL;
1045:
1046: TASK_FUNC(task) = func;
1047: TASK_TYPE(task) = taskEVENT;
1048: TASK_ROOT(task) = root;
1049:
1050: TASK_ARG(task) = arg;
1051: TASK_VAL(task) = val;
1052:
1053: TASK_DATA(task) = opt_data;
1054: TASK_DATLEN(task) = opt_dlen;
1055:
1056: if (root->root_hooks.hook_add.event)
1057: ptr = root->root_hooks.hook_add.event(task, NULL);
1058: else
1059: ptr = NULL;
1060:
1061: if (!ptr)
1062: insert_task_to(task, &root->root_event);
1063: else
1064: task = sched_unuseTask(task);
1065:
1066: return task;
1067: }
1068:
1069:
1070: /*
1071: * schedTask() - Add regular task to scheduler queue
1072: *
1073: * @root = root task
1074: * @func = task execution function
1075: * @arg = 1st func argument
1076: * @prio = regular task priority, 0 is hi priority for regular tasks
1077: * @opt_data = Optional data
1078: * @opt_dlen = Optional data length
1079: * return: NULL error or !=NULL new queued task
1080: */
1081: sched_task_t *
1082: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1083: void *opt_data, size_t opt_dlen)
1084: {
1085: sched_task_t *task, *tmp, *t = NULL;
1086: void *ptr;
1087:
1088: if (!root || !func)
1089: return NULL;
1090:
1091: /* get new task */
1092: if (!(task = sched_useTask(root)))
1093: return NULL;
1094:
1095: TASK_FUNC(task) = func;
1096: TASK_TYPE(task) = taskTASK;
1097: TASK_ROOT(task) = root;
1098:
1099: TASK_ARG(task) = arg;
1100: TASK_VAL(task) = prio;
1101:
1102: TASK_DATA(task) = opt_data;
1103: TASK_DATLEN(task) = opt_dlen;
1104:
1105: if (root->root_hooks.hook_add.task)
1106: ptr = root->root_hooks.hook_add.task(task, NULL);
1107: else
1108: ptr = NULL;
1109:
1110: if (!ptr) {
1111: SCHED_QLOCK(root, taskTASK);
1112: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1113: if (TASK_VAL(task) < TASK_VAL(t))
1114: break;
1115: if (!t)
1116: TAILQ_INSERT_TAIL(&root->root_task, task, task_node);
1117: else
1118: TAILQ_INSERT_BEFORE(t, task, task_node);
1119: SCHED_QUNLOCK(root, taskTASK);
1120: } else
1121: task = sched_unuseTask(task);
1122:
1123: return task;
1124: }
1125:
1126: /*
1127: * schedSuspend() - Add Suspended task to scheduler queue
1128: *
1129: * @root = root task
1130: * @func = task execution function
1131: * @arg = 1st func argument
1132: * @id = Trigger ID
1133: * @opt_data = Optional data
1134: * @opt_dlen = Optional data length
1135: * return: NULL error or !=NULL new queued task
1136: */
1137: sched_task_t *
1138: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1139: void *opt_data, size_t opt_dlen)
1140: {
1141: sched_task_t *task;
1142: void *ptr;
1143:
1144: if (!root || !func)
1145: return NULL;
1146:
1147: /* get new task */
1148: if (!(task = sched_useTask(root)))
1149: return NULL;
1150:
1151: TASK_FUNC(task) = func;
1152: TASK_TYPE(task) = taskSUSPEND;
1153: TASK_ROOT(task) = root;
1154:
1155: TASK_ARG(task) = arg;
1156: TASK_VAL(task) = id;
1157:
1158: TASK_DATA(task) = opt_data;
1159: TASK_DATLEN(task) = opt_dlen;
1160:
1161: if (root->root_hooks.hook_add.suspend)
1162: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1163: else
1164: ptr = NULL;
1165:
1166: if (!ptr)
1167: insert_task_to(task, &root->root_suspend);
1168: else
1169: task = sched_unuseTask(task);
1170:
1171: return task;
1172: }
1173:
1174: /*
1175: * schedCallOnce() - Call once from scheduler
1176: *
1177: * @root = root task
1178: * @func = task execution function
1179: * @arg = 1st func argument
1180: * @val = additional func argument
1181: * @opt_data = Optional data
1182: * @opt_dlen = Optional data length
1183: * return: return value from called func
1184: */
1185: sched_task_t *
1186: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1187: void *opt_data, size_t opt_dlen)
1188: {
1189: sched_task_t *task;
1190: void *ret;
1191:
1192: if (!root || !func)
1193: return NULL;
1194:
1195: /* get new task */
1196: if (!(task = sched_useTask(root)))
1197: return NULL;
1198:
1199: TASK_FUNC(task) = func;
1200: TASK_TYPE(task) = taskEVENT;
1201: TASK_ROOT(task) = root;
1202:
1203: TASK_ARG(task) = arg;
1204: TASK_VAL(task) = val;
1205:
1206: TASK_DATA(task) = opt_data;
1207: TASK_DATLEN(task) = opt_dlen;
1208:
1209: ret = schedCall(task);
1210:
1211: sched_unuseTask(task);
1212: return ret;
1213: }
1214:
1215: /*
1216: * schedThread() - Add thread task to scheduler queue
1217: *
1218: * @root = root task
1219: * @func = task execution function
1220: * @arg = 1st func argument
1221: * @ss = stack size
1222: * @opt_data = Optional data
1223: * @opt_dlen = Optional data length
1224: * return: NULL error or !=NULL new queued task
1225: */
1226: sched_task_t *
1227: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
1228: size_t ss, void *opt_data, size_t opt_dlen)
1229: {
1230: #ifndef HAVE_LIBPTHREAD
1231: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1232: return NULL;
1233: #endif
1234: sched_task_t *task;
1235: pthread_attr_t attr;
1236: void *ptr;
1237:
1238: if (!root || !func)
1239: return NULL;
1240:
1241: /* get new task */
1242: if (!(task = sched_useTask(root)))
1243: return NULL;
1244:
1245: TASK_FUNC(task) = func;
1246: TASK_TYPE(task) = taskTHREAD;
1247: TASK_ROOT(task) = root;
1248:
1249: TASK_ARG(task) = arg;
1250:
1251: TASK_DATA(task) = opt_data;
1252: TASK_DATLEN(task) = opt_dlen;
1253:
1254: pthread_attr_init(&attr);
1255: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
1256: if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
1257: LOGERR;
1258: pthread_attr_destroy(&attr);
1259: return sched_unuseTask(task);
1260: }
1261: if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
1262: LOGERR;
1263: pthread_attr_destroy(&attr);
1264: return sched_unuseTask(task);
1265: } else
1266: TASK_FLAG(task) = ss;
1267:
1268: #ifdef SCHED_RR
1269: pthread_attr_setschedpolicy(&attr, SCHED_RR);
1270: #else
1271: pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
1272: #endif
1273:
1274: if (root->root_hooks.hook_add.thread)
1275: ptr = root->root_hooks.hook_add.thread(task, &attr);
1276: else
1277: ptr = NULL;
1278:
1279: if (!ptr)
1280: insert_task_to(task, &root->root_thread);
1281: else
1282: task = sched_unuseTask(task);
1283:
1284: pthread_attr_destroy(&attr);
1285: return task;
1286: }
1287:
1288: /*
1289: * schedRTC() - Add RTC task to scheduler queue
1290: *
1291: * @root = root task
1292: * @func = task execution function
1293: * @arg = 1st func argument
1294: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1295: * @opt_data = Optional RTC ID
1296: * @opt_dlen = Optional data length
1297: * return: NULL error or !=NULL new queued task
1298: */
1299: sched_task_t *
1300: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
1301: void *opt_data, size_t opt_dlen)
1302: {
1303: #if defined(HAVE_LIBRT) && defined(HAVE_TIMER_CREATE) && \
1304: defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_DELETE)
1305: sched_task_t *task;
1306: void *ptr;
1307:
1308: if (!root || !func)
1309: return NULL;
1310:
1311: /* get new task */
1312: if (!(task = sched_useTask(root)))
1313: return NULL;
1314:
1315: TASK_FUNC(task) = func;
1316: TASK_TYPE(task) = taskRTC;
1317: TASK_ROOT(task) = root;
1318:
1319: TASK_ARG(task) = arg;
1320: TASK_TS(task) = ts;
1321:
1322: TASK_DATA(task) = opt_data;
1323: TASK_DATLEN(task) = opt_dlen;
1324:
1325: if (root->root_hooks.hook_add.rtc)
1326: ptr = root->root_hooks.hook_add.rtc(task, NULL);
1327: else
1328: ptr = NULL;
1329:
1330: if (!ptr)
1331: insert_task_to(task, &root->root_rtc);
1332: else
1333: task = sched_unuseTask(task);
1334:
1335: return task;
1336: #else
1337: sched_SetErr(ENOTSUP, "Not supported realtime clock extensions");
1338: return NULL;
1339: #endif
1340: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>