File:
[ELWIX - Embedded LightWeight unIX -] /
libaitsched /
src /
tasks.c
Revision
1.31:
download - view:
text,
annotated -
select for diffs -
revision graph
Sat Feb 25 15:55:01 2023 UTC (19 months ago) by
misho
Branches:
MAIN
CVS tags:
sched8_4,
sched8_3,
sched8_2,
sched8_1,
sched8_0,
sched7_9,
sched7_8,
sched7_7,
sched7_6,
sched7_5,
SCHED8_3,
SCHED8_2,
SCHED8_1,
SCHED8_0,
SCHED7_9,
SCHED7_8,
SCHED7_7,
SCHED7_6,
SCHED7_5,
SCHED7_4,
HEAD
version 7.4
1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: tasks.c,v 1.31 2023/02/25 15:55:01 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004 - 2023
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
55: sched_task_t *
56: sched_useTask(sched_root_task_t * __restrict root)
57: {
58: sched_task_t *task, *tmp;
59:
60: SCHED_QLOCK(root, taskUNUSE);
61: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
62: if (!TASK_ISLOCKED(task)) {
63: TAILQ_REMOVE(&root->root_unuse, task, task_node);
64: break;
65: }
66: }
67: SCHED_QUNLOCK(root, taskUNUSE);
68:
69: if (!task) {
70: task = e_malloc(sizeof(sched_task_t));
71: if (!task) {
72: LOGERR;
73: return NULL;
74: }
75: }
76:
77: memset(task, 0, sizeof(sched_task_t));
78: task->task_id = (uintptr_t) task;
79: return task;
80: }
81:
82: /*
83: * sched_unuseTask() - Unlock and put task to unuse queue
84: *
85: * @task = task
86: * return: always is NULL
87: */
88: sched_task_t *
89: sched_unuseTask(sched_task_t * __restrict task)
90: {
91: TASK_UNLOCK(task);
92:
93: TASK_TYPE(task) = taskUNUSE;
94: insert_task_to(task, &(TASK_ROOT(task))->root_unuse);
95:
96: task = NULL;
97: return task;
98: }
99:
100: /*
101: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
102: *
103: * @task = current task
104: * @retcode = return code
105: * return: return code
106: */
107: void *
108: sched_taskExit(sched_task_t *task, intptr_t retcode)
109: {
110: if (!task || !TASK_ROOT(task))
111: return (void*) -1;
112:
113: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
114: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
115:
116: TASK_ROOT(task)->root_ret = (void*) retcode;
117: return (void*) retcode;
118: }
119:
120:
/*
 * schedRead() - Add READ I/O task to scheduler queue
 *
 * Convenience wrapper around schedReadExt() with an empty (0) event mask.
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @fd = fd handle
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
		void *opt_data, size_t opt_dlen)
{
	return schedReadExt(root, func, arg, fd, opt_data, opt_dlen, 0);
}
138:
139: /*
140: * schedReadExt() - Add READ I/O task to scheduler queue with custom event mask
141: *
142: * @root = root task
143: * @func = task execution function
144: * @arg = 1st func argument
145: * @fd = fd handle
146: * @opt_data = Optional data
147: * @opt_dlen = Optional data length
148: * @mask = Event mask
149: * return: NULL error or !=NULL new queued task
150: */
151: sched_task_t *
152: schedReadExt(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
153: void *opt_data, size_t opt_dlen, u_long mask)
154: {
155: sched_task_t *task;
156: void *ptr;
157:
158: if (!root || !func)
159: return NULL;
160:
161: /* get new task */
162: if (!(task = sched_useTask(root)))
163: return NULL;
164:
165: TASK_FUNC(task) = func;
166: TASK_TYPE(task) = taskREAD;
167: TASK_ROOT(task) = root;
168:
169: TASK_ARG(task) = arg;
170: TASK_FD(task) = fd;
171:
172: TASK_DATA(task) = opt_data;
173: TASK_DATLEN(task) = opt_dlen;
174:
175: TASK_HARG(task) = mask;
176:
177: if (root->root_hooks.hook_add.read)
178: ptr = root->root_hooks.hook_add.read(task,
179: (void*) task->task_harg);
180: else
181: ptr = NULL;
182:
183: if (!ptr)
184: insert_task_to(task, &root->root_read);
185: else
186: task = sched_unuseTask(task);
187:
188: return task;
189: }
190:
/*
 * schedWrite() - Add WRITE I/O task to scheduler queue
 *
 * Convenience wrapper around schedWriteExt() with an empty (0) event mask.
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @fd = fd handle
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
		void *opt_data, size_t opt_dlen)
{
	return schedWriteExt(root, func, arg, fd, opt_data, opt_dlen, 0);
}
208:
209: /*
210: * schedWriteExt() - Add WRITE I/O task to scheduler queue with custom event mask
211: *
212: * @root = root task
213: * @func = task execution function
214: * @arg = 1st func argument
215: * @fd = fd handle
216: * @opt_data = Optional data
217: * @opt_dlen = Optional data length
218: * @mask = Event mask
219: * return: NULL error or !=NULL new queued task
220: */
221: sched_task_t *
222: schedWriteExt(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
223: void *opt_data, size_t opt_dlen, u_long mask)
224: {
225: sched_task_t *task;
226: void *ptr;
227:
228: if (!root || !func)
229: return NULL;
230:
231: /* get new task */
232: if (!(task = sched_useTask(root)))
233: return NULL;
234:
235: TASK_FUNC(task) = func;
236: TASK_TYPE(task) = taskWRITE;
237: TASK_ROOT(task) = root;
238:
239: TASK_ARG(task) = arg;
240: TASK_FD(task) = fd;
241:
242: TASK_DATA(task) = opt_data;
243: TASK_DATLEN(task) = opt_dlen;
244:
245: TASK_HARG(task) = mask;
246:
247: if (root->root_hooks.hook_add.write)
248: ptr = root->root_hooks.hook_add.write(task,
249: (void*) task->task_harg);
250: else
251: ptr = NULL;
252:
253: if (!ptr)
254: insert_task_to(task, &root->root_write);
255: else
256: task = sched_unuseTask(task);
257:
258: return task;
259: }
260:
261: /*
262: * schedNode() - Add NODE task to scheduler queue
263: *
264: * @root = root task
265: * @func = task execution function
266: * @arg = 1st func argument
267: * @fd = fd handle
268: * @opt_data = Optional data
269: * @opt_dlen = Optional data length
270: * return: NULL error or !=NULL new queued task
271: */
272: sched_task_t *
273: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
274: void *opt_data, size_t opt_dlen)
275: {
276: #if SUP_ENABLE != KQ_SUPPORT
277: sched_SetErr(ENOTSUP, "disabled kqueue support");
278: return NULL;
279: #else
280: sched_task_t *task;
281: void *ptr;
282:
283: if (!root || !func)
284: return NULL;
285:
286: /* get new task */
287: if (!(task = sched_useTask(root)))
288: return NULL;
289:
290: TASK_FUNC(task) = func;
291: TASK_TYPE(task) = taskNODE;
292: TASK_ROOT(task) = root;
293:
294: TASK_ARG(task) = arg;
295: TASK_FD(task) = fd;
296:
297: TASK_DATA(task) = opt_data;
298: TASK_DATLEN(task) = opt_dlen;
299:
300: if (root->root_hooks.hook_add.node)
301: ptr = root->root_hooks.hook_add.node(task, NULL);
302: else
303: ptr = NULL;
304:
305: if (!ptr)
306: insert_task_to(task, &root->root_node);
307: else
308: task = sched_unuseTask(task);
309:
310: return task;
311: #endif /* KQ_SUPPORT */
312: }
313:
314: /*
315: * schedNode2() - Add NODE task with all events to scheduler queue
316: *
317: * @root = root task
318: * @func = task execution function
319: * @arg = 1st func argument
320: * @fd = fd handle
321: * @opt_data = Optional data
322: * @opt_dlen = Optional data length
323: * return: NULL error or !=NULL new queued task
324: */
325: sched_task_t *
326: schedNode2(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
327: void *opt_data, size_t opt_dlen)
328: {
329: #if SUP_ENABLE != KQ_SUPPORT
330: sched_SetErr(ENOTSUP, "disabled kqueue support");
331: return NULL;
332: #else
333: sched_task_t *task;
334: void *ptr;
335:
336: if (!root || !func)
337: return NULL;
338:
339: /* get new task */
340: if (!(task = sched_useTask(root)))
341: return NULL;
342:
343: TASK_FUNC(task) = func;
344: TASK_TYPE(task) = taskNODE;
345: TASK_ROOT(task) = root;
346:
347: TASK_ARG(task) = arg;
348: TASK_FD(task) = fd;
349:
350: TASK_DATA(task) = opt_data;
351: TASK_DATLEN(task) = opt_dlen;
352:
353: if (root->root_hooks.hook_add.node)
354: #ifdef __FreeBSD__
355: ptr = root->root_hooks.hook_add.node(task,
356: (void*) (NOTE_READ | NOTE_CLOSE_WRITE | NOTE_CLOSE | NOTE_OPEN));
357: #else
358: ptr = root->root_hooks.hook_add.node(task, NULL);
359: #endif
360: else
361: ptr = NULL;
362:
363: if (!ptr)
364: insert_task_to(task, &root->root_node);
365: else
366: task = sched_unuseTask(task);
367:
368: return task;
369: #endif /* KQ_SUPPORT */
370: }
371:
372: /*
373: * schedProc() - Add PROC task to scheduler queue
374: *
375: * @root = root task
376: * @func = task execution function
377: * @arg = 1st func argument
378: * @pid = PID
379: * @opt_data = Optional data
380: * @opt_dlen = Optional data length
381: * return: NULL error or !=NULL new queued task
382: */
383: sched_task_t *
384: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
385: void *opt_data, size_t opt_dlen)
386: {
387: #if SUP_ENABLE != KQ_SUPPORT
388: sched_SetErr(ENOTSUP, "disabled kqueue support");
389: return NULL;
390: #else
391: sched_task_t *task;
392: void *ptr;
393:
394: if (!root || !func)
395: return NULL;
396:
397: /* get new task */
398: if (!(task = sched_useTask(root)))
399: return NULL;
400:
401: TASK_FUNC(task) = func;
402: TASK_TYPE(task) = taskPROC;
403: TASK_ROOT(task) = root;
404:
405: TASK_ARG(task) = arg;
406: TASK_VAL(task) = pid;
407:
408: TASK_DATA(task) = opt_data;
409: TASK_DATLEN(task) = opt_dlen;
410:
411: if (root->root_hooks.hook_add.proc)
412: ptr = root->root_hooks.hook_add.proc(task, NULL);
413: else
414: ptr = NULL;
415:
416: if (!ptr)
417: insert_task_to(task, &root->root_proc);
418: else
419: task = sched_unuseTask(task);
420:
421: return task;
422: #endif /* KQ_SUPPORT */
423: }
424:
425: /*
426: * schedUser() - Add trigger USER task to scheduler queue
427: *
428: * @root = root task
429: * @func = task execution function
430: * @arg = 1st func argument
431: * @id = Trigger ID
432: * @opt_data = Optional data
433: * @opt_dlen = Optional user's trigger flags
434: * return: NULL error or !=NULL new queued task
435: */
436: sched_task_t *
437: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
438: void *opt_data, size_t opt_dlen)
439: {
440: #if SUP_ENABLE != KQ_SUPPORT
441: sched_SetErr(ENOTSUP, "disabled kqueue support");
442: return NULL;
443: #else
444: #ifndef EVFILT_USER
445: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
446: return NULL;
447: #else
448: sched_task_t *task;
449: void *ptr;
450:
451: if (!root || !func)
452: return NULL;
453:
454: /* get new task */
455: if (!(task = sched_useTask(root)))
456: return NULL;
457:
458: TASK_FUNC(task) = func;
459: TASK_TYPE(task) = taskUSER;
460: TASK_ROOT(task) = root;
461:
462: TASK_ARG(task) = arg;
463: TASK_VAL(task) = id;
464:
465: TASK_DATA(task) = opt_data;
466: TASK_DATLEN(task) = opt_dlen;
467:
468: if (root->root_hooks.hook_add.user)
469: ptr = root->root_hooks.hook_add.user(task, NULL);
470: else
471: ptr = NULL;
472:
473: if (!ptr)
474: insert_task_to(task, &root->root_user);
475: else
476: task = sched_unuseTask(task);
477:
478: return task;
479: #endif /* EVFILT_USER */
480: #endif /* KQ_SUPPORT */
481: }
482:
483: /*
484: * schedSignal() - Add SIGNAL task to scheduler queue
485: *
486: * @root = root task
487: * @func = task execution function
488: * @arg = 1st func argument
489: * @sig = Signal
490: * @opt_data = Optional data
491: * @opt_dlen = Optional data length
492: * return: NULL error or !=NULL new queued task
493: */
494: sched_task_t *
495: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
496: void *opt_data, size_t opt_dlen)
497: {
498: sched_task_t *task;
499: void *ptr;
500:
501: if (!root || !func)
502: return NULL;
503:
504: /* get new task */
505: if (!(task = sched_useTask(root)))
506: return NULL;
507:
508: TASK_FUNC(task) = func;
509: TASK_TYPE(task) = taskSIGNAL;
510: TASK_ROOT(task) = root;
511:
512: TASK_ARG(task) = arg;
513: TASK_VAL(task) = sig;
514:
515: TASK_DATA(task) = opt_data;
516: TASK_DATLEN(task) = opt_dlen;
517:
518: if (root->root_hooks.hook_add.signal)
519: ptr = root->root_hooks.hook_add.signal(task, NULL);
520: else
521: ptr = NULL;
522:
523: if (!ptr)
524: insert_task_to(task, &root->root_signal);
525: else
526: task = sched_unuseTask(task);
527:
528: return task;
529: }
530:
531: /*
532: * schedAlarm() - Add ALARM task to scheduler queue
533: *
534: * @root = root task
535: * @func = task execution function
536: * @arg = 1st func argument
537: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
538: * @opt_data = Alarm timer ID
539: * @opt_dlen = Optional data length
540: * return: NULL error or !=NULL new queued task
541: */
542: sched_task_t *
543: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
544: void *opt_data, size_t opt_dlen)
545: {
546: #if SUP_ENABLE != KQ_SUPPORT
547: sched_SetErr(ENOTSUP, "disabled kqueue support");
548: return NULL;
549: #else
550: sched_task_t *task;
551: void *ptr;
552:
553: if (!root || !func)
554: return NULL;
555:
556: /* get new task */
557: if (!(task = sched_useTask(root)))
558: return NULL;
559:
560: TASK_FUNC(task) = func;
561: TASK_TYPE(task) = taskALARM;
562: TASK_ROOT(task) = root;
563:
564: TASK_ARG(task) = arg;
565: TASK_TS(task) = ts;
566:
567: TASK_DATA(task) = opt_data;
568: TASK_DATLEN(task) = opt_dlen;
569:
570: if (root->root_hooks.hook_add.alarm)
571: ptr = root->root_hooks.hook_add.alarm(task, NULL);
572: else
573: ptr = NULL;
574:
575: if (!ptr)
576: insert_task_to(task, &root->root_alarm);
577: else
578: task = sched_unuseTask(task);
579:
580: return task;
581: #endif /* KQ_SUPPORT */
582: }
583:
584: #ifdef AIO_SUPPORT
585: /*
586: * schedAIO() - Add AIO task to scheduler queue
587: *
588: * @root = root task
589: * @func = task execution function
590: * @arg = 1st func argument
591: * @acb = AIO cb structure address
592: * @opt_data = Optional data
593: * @opt_dlen = Optional data length
594: * return: NULL error or !=NULL new queued task
595: */
596: sched_task_t *
597: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
598: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
599: {
600: #if SUP_ENABLE != KQ_SUPPORT
601: sched_SetErr(ENOTSUP, "disabled kqueue support");
602: return NULL;
603: #else
604: sched_task_t *task;
605: void *ptr;
606:
607: if (!root || !func || !acb || !opt_dlen)
608: return NULL;
609:
610: /* get new task */
611: if (!(task = sched_useTask(root)))
612: return NULL;
613:
614: TASK_FUNC(task) = func;
615: TASK_TYPE(task) = taskAIO;
616: TASK_ROOT(task) = root;
617:
618: TASK_ARG(task) = arg;
619: TASK_VAL(task) = (u_long) acb;
620:
621: TASK_DATA(task) = opt_data;
622: TASK_DATLEN(task) = opt_dlen;
623:
624: if (root->root_hooks.hook_add.aio)
625: ptr = root->root_hooks.hook_add.aio(task, NULL);
626: else
627: ptr = NULL;
628:
629: if (!ptr)
630: insert_task_to(task, &root->root_aio);
631: else
632: task = sched_unuseTask(task);
633:
634: return task;
635: #endif /* KQ_SUPPORT */
636: }
637:
/*
 * schedAIORead() - Add AIO read task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @fd = file descriptor
 * @buffer = Buffer (must remain valid until the AIO completes)
 * @buflen = Buffer length
 * @offset = Offset from start of file, if =-1 from current position
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
		void *buffer, size_t buflen, off_t offset)
{
#if SUP_ENABLE != KQ_SUPPORT
	sched_SetErr(ENOTSUP, "disabled kqueue support");
	return NULL;
#else
	struct aiocb *acb;
	off_t off;

	if (!root || !func || !buffer || !buflen)
		return NULL;

	/* offset == -1 means "read from the current file position" */
	if (offset == (off_t) -1) {
		off = lseek(fd, 0, SEEK_CUR);
		if (off == -1) {
			LOGERR;
			return NULL;
		}
	} else
		off = offset;

	/* heap-allocated control block; on success ownership passes to the
	 * queued AIO task — NOTE(review): the completion path presumably frees
	 * it, confirm against the task executor */
	if (!(acb = e_malloc(sizeof(struct aiocb)))) {
		LOGERR;
		return NULL;
	} else
		memset(acb, 0, sizeof(struct aiocb));

	acb->aio_fildes = fd;
	acb->aio_nbytes = buflen;
	acb->aio_buf = buffer;
	acb->aio_offset = off;
	/* deliver completion as a kevent on the scheduler's kqueue;
	 * sival_ptr lets the event be mapped back to this control block */
	acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
	acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
	acb->aio_sigevent.sigev_value.sival_ptr = acb;

	/* submit; on failure the control block is still ours to free */
	if (aio_read(acb)) {
		LOGERR;
		e_free(acb);
		return NULL;
	}

	return schedAIO(root, func, arg, acb, buffer, buflen);
#endif	/* KQ_SUPPORT */
}
696:
/*
 * schedAIOWrite() - Add AIO write task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @fd = file descriptor
 * @buffer = Buffer (must remain valid until the AIO completes)
 * @buflen = Buffer length
 * @offset = Offset from start of file, if =-1 from current position
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
		void *buffer, size_t buflen, off_t offset)
{
#if SUP_ENABLE != KQ_SUPPORT
	sched_SetErr(ENOTSUP, "disabled kqueue support");
	return NULL;
#else
	struct aiocb *acb;
	off_t off;

	if (!root || !func || !buffer || !buflen)
		return NULL;

	/* offset == -1 means "write at the current file position" */
	if (offset == (off_t) -1) {
		off = lseek(fd, 0, SEEK_CUR);
		if (off == -1) {
			LOGERR;
			return NULL;
		}
	} else
		off = offset;

	/* heap-allocated control block; on success ownership passes to the
	 * queued AIO task — NOTE(review): the completion path presumably frees
	 * it, confirm against the task executor */
	if (!(acb = e_malloc(sizeof(struct aiocb)))) {
		LOGERR;
		return NULL;
	} else
		memset(acb, 0, sizeof(struct aiocb));

	acb->aio_fildes = fd;
	acb->aio_nbytes = buflen;
	acb->aio_buf = buffer;
	acb->aio_offset = off;
	/* deliver completion as a kevent on the scheduler's kqueue;
	 * sival_ptr lets the event be mapped back to this control block */
	acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
	acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
	acb->aio_sigevent.sigev_value.sival_ptr = acb;

	/* submit; on failure the control block is still ours to free */
	if (aio_write(acb)) {
		LOGERR;
		e_free(acb);
		return NULL;
	}

	return schedAIO(root, func, arg, acb, buffer, buflen);
#endif	/* KQ_SUPPORT */
}
755:
756: #ifdef EVFILT_LIO
757: /*
758: * schedLIO() - Add AIO bulk tasks to scheduler queue
759: *
760: * @root = root task
761: * @func = task execution function
762: * @arg = 1st func argument
763: * @acbs = AIO cb structure addresses
764: * @opt_data = Optional data
765: * @opt_dlen = Optional data length
766: * return: NULL error or !=NULL new queued task
767: */
768: sched_task_t *
769: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
770: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
771: {
772: #if SUP_ENABLE != KQ_SUPPORT
773: sched_SetErr(ENOTSUP, "disabled kqueue support");
774: return NULL;
775: #else
776: sched_task_t *task;
777: void *ptr;
778:
779: if (!root || !func || !acbs || !opt_dlen)
780: return NULL;
781:
782: /* get new task */
783: if (!(task = sched_useTask(root)))
784: return NULL;
785:
786: TASK_FUNC(task) = func;
787: TASK_TYPE(task) = taskLIO;
788: TASK_ROOT(task) = root;
789:
790: TASK_ARG(task) = arg;
791: TASK_VAL(task) = (u_long) acbs;
792:
793: TASK_DATA(task) = opt_data;
794: TASK_DATLEN(task) = opt_dlen;
795:
796: if (root->root_hooks.hook_add.lio)
797: ptr = root->root_hooks.hook_add.lio(task, NULL);
798: else
799: ptr = NULL;
800:
801: if (!ptr)
802: insert_task_to(task, &root->root_lio);
803: else
804: task = sched_unuseTask(task);
805:
806: return task;
807: #endif /* KQ_SUPPORT */
808: }
809:
/*
 * schedLIORead() - Add list of AIO read tasks to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @fd = file descriptor
 * @bufs = Buffer's list
 * @nbufs = Number of Buffers
 * @offset = Offset from start of file, if =-1 from current position
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
		struct iovec *bufs, size_t nbufs, off_t offset)
{
#if SUP_ENABLE != KQ_SUPPORT
	sched_SetErr(ENOTSUP, "disabled kqueue support");
	return NULL;
#else
	struct sigevent sig;
	struct aiocb **acb;
	off_t off;
	register int i;	/* NOTE(review): int compared against size_t nbufs — assumes nbufs fits in int */

	if (!root || !func || !bufs || !nbufs)
		return NULL;

	/* offset == -1 means "start at the current file position" */
	if (offset == (off_t) -1) {
		off = lseek(fd, 0, SEEK_CUR);
		if (off == -1) {
			LOGERR;
			return NULL;
		}
	} else
		off = offset;

	/* pointer array: one aiocb per iovec */
	if (!(acb = e_calloc(sizeof(void*), nbufs))) {
		LOGERR;
		return NULL;
	} else
		memset(acb, 0, sizeof(void*) * nbufs);	/* NOTE(review): likely redundant if e_calloc zeroes like calloc */
	/* build one read control block per buffer; offsets laid back-to-back */
	for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
		acb[i] = e_malloc(sizeof(struct aiocb));
		if (!acb[i]) {
			LOGERR;
			/* the zero-initialized array makes a full rescan safe
			 * even though this reuses (and clobbers) i */
			for (i = 0; i < nbufs; i++)
				if (acb[i])
					e_free(acb[i]);
			e_free(acb);
			return NULL;
		} else
			memset(acb[i], 0, sizeof(struct aiocb));
		acb[i]->aio_fildes = fd;
		acb[i]->aio_nbytes = bufs[i].iov_len;
		acb[i]->aio_buf = bufs[i].iov_base;
		acb[i]->aio_offset = off;
		acb[i]->aio_lio_opcode = LIO_READ;
	}
	/* one completion notification for the whole batch, delivered as a
	 * kevent on the scheduler's kqueue; sival_ptr identifies the batch */
	memset(&sig, 0, sizeof sig);
	sig.sigev_notify = SIGEV_KEVENT;
	sig.sigev_notify_kqueue = root->root_kq;
	sig.sigev_value.sival_ptr = acb;

	/* submit asynchronously; on failure everything is still ours to free */
	if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
		LOGERR;
		for (i = 0; i < nbufs; i++)
			if (acb[i])
				e_free(acb[i]);
		e_free(acb);
		return NULL;
	}

	return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
#endif	/* KQ_SUPPORT */
}
886:
/*
 * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @fd = file descriptor
 * @bufs = Buffer's list
 * @nbufs = Number of Buffers
 * @offset = Offset from start of file, if =-1 from current position
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
		struct iovec *bufs, size_t nbufs, off_t offset)
{
#if SUP_ENABLE != KQ_SUPPORT
	sched_SetErr(ENOTSUP, "disabled kqueue support");
	return NULL;
#else
	struct sigevent sig;
	struct aiocb **acb;
	off_t off;
	register int i;	/* NOTE(review): int compared against size_t nbufs — assumes nbufs fits in int */

	if (!root || !func || !bufs || !nbufs)
		return NULL;

	/* offset == -1 means "start at the current file position" */
	if (offset == (off_t) -1) {
		off = lseek(fd, 0, SEEK_CUR);
		if (off == -1) {
			LOGERR;
			return NULL;
		}
	} else
		off = offset;

	/* pointer array: one aiocb per iovec */
	if (!(acb = e_calloc(sizeof(void*), nbufs))) {
		LOGERR;
		return NULL;
	} else
		memset(acb, 0, sizeof(void*) * nbufs);	/* NOTE(review): likely redundant if e_calloc zeroes like calloc */
	/* build one write control block per buffer; offsets laid back-to-back */
	for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
		acb[i] = e_malloc(sizeof(struct aiocb));
		if (!acb[i]) {
			LOGERR;
			/* the zero-initialized array makes a full rescan safe
			 * even though this reuses (and clobbers) i */
			for (i = 0; i < nbufs; i++)
				if (acb[i])
					e_free(acb[i]);
			e_free(acb);
			return NULL;
		} else
			memset(acb[i], 0, sizeof(struct aiocb));
		acb[i]->aio_fildes = fd;
		acb[i]->aio_nbytes = bufs[i].iov_len;
		acb[i]->aio_buf = bufs[i].iov_base;
		acb[i]->aio_offset = off;
		acb[i]->aio_lio_opcode = LIO_WRITE;
	}
	/* one completion notification for the whole batch, delivered as a
	 * kevent on the scheduler's kqueue; sival_ptr identifies the batch */
	memset(&sig, 0, sizeof sig);
	sig.sigev_notify = SIGEV_KEVENT;
	sig.sigev_notify_kqueue = root->root_kq;
	sig.sigev_value.sival_ptr = acb;

	/* submit asynchronously; on failure everything is still ours to free */
	if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
		LOGERR;
		for (i = 0; i < nbufs; i++)
			if (acb[i])
				e_free(acb[i]);
		e_free(acb);
		return NULL;
	}

	return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
#endif	/* KQ_SUPPORT */
}
963: #endif /* EVFILT_LIO */
964: #endif /* AIO_SUPPORT */
965:
/*
 * schedTimer() - Add TIMER task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @ts = timeout argument structure (relative timeout)
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task, *tmp, *t = NULL;
	void *ptr;
	struct timespec now;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	TASK_FUNC(task) = func;
	TASK_TYPE(task) = taskTIMER;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	/* calculate timeval structure: absolute expiry = monotonic now + ts;
	 * the single carry/borrow below assumes |tv_nsec| < 1e9 on input —
	 * NOTE(review): a caller-supplied unnormalized ts would survive here
	 * only partially normalized */
	clock_gettime(CLOCK_MONOTONIC, &now);
	now.tv_sec += ts.tv_sec;
	now.tv_nsec += ts.tv_nsec;
	if (now.tv_nsec >= 1000000000L) {
		now.tv_sec++;
		now.tv_nsec -= 1000000000L;
	} else if (now.tv_nsec < 0) {
		now.tv_sec--;
		now.tv_nsec += 1000000000L;
	}
	TASK_TS(task) = now;	/* stored as absolute expiry time */

	if (root->root_hooks.hook_add.timer)
		ptr = root->root_hooks.hook_add.timer(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
		SCHED_QLOCK(root, taskTIMER);
#ifdef TIMER_WITHOUT_SORT
		TAILQ_INSERT_TAIL(&root->root_timer, task, task_node);
#else
		/* keep queue sorted by expiry: insert before the first entry
		 * that does not expire earlier; sched_timespeccmp takes the
		 * comparison operator as its 3rd macro argument (timercmp
		 * style) — NOTE(review): `-` with `< 1` appears to act as a
		 * <= test, confirm against the macro definition */
		TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
			if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
				break;
		if (!t)
			TAILQ_INSERT_TAIL(&root->root_timer, task, task_node);
		else
			TAILQ_INSERT_BEFORE(t, task, task_node);
#endif
		SCHED_QUNLOCK(root, taskTIMER);
	} else
		task = sched_unuseTask(task);	/* hook rejected the timer */

	return task;
}
1038:
1039: /*
1040: * schedEvent() - Add EVENT task to scheduler queue
1041: *
1042: * @root = root task
1043: * @func = task execution function
1044: * @arg = 1st func argument
1045: * @val = additional func argument
1046: * @opt_data = Optional data
1047: * @opt_dlen = Optional data length
1048: * return: NULL error or !=NULL new queued task
1049: */
1050: sched_task_t *
1051: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1052: void *opt_data, size_t opt_dlen)
1053: {
1054: sched_task_t *task;
1055: void *ptr;
1056:
1057: if (!root || !func)
1058: return NULL;
1059:
1060: /* get new task */
1061: if (!(task = sched_useTask(root)))
1062: return NULL;
1063:
1064: TASK_FUNC(task) = func;
1065: TASK_TYPE(task) = taskEVENT;
1066: TASK_ROOT(task) = root;
1067:
1068: TASK_ARG(task) = arg;
1069: TASK_VAL(task) = val;
1070:
1071: TASK_DATA(task) = opt_data;
1072: TASK_DATLEN(task) = opt_dlen;
1073:
1074: if (root->root_hooks.hook_add.event)
1075: ptr = root->root_hooks.hook_add.event(task, NULL);
1076: else
1077: ptr = NULL;
1078:
1079: if (!ptr)
1080: insert_task_to(task, &root->root_event);
1081: else
1082: task = sched_unuseTask(task);
1083:
1084: return task;
1085: }
1086:
1087:
1088: /*
1089: * schedTask() - Add regular task to scheduler queue
1090: *
1091: * @root = root task
1092: * @func = task execution function
1093: * @arg = 1st func argument
1094: * @prio = regular task priority, 0 is hi priority for regular tasks
1095: * @opt_data = Optional data
1096: * @opt_dlen = Optional data length
1097: * return: NULL error or !=NULL new queued task
1098: */
1099: sched_task_t *
1100: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1101: void *opt_data, size_t opt_dlen)
1102: {
1103: sched_task_t *task, *tmp, *t = NULL;
1104: void *ptr;
1105:
1106: if (!root || !func)
1107: return NULL;
1108:
1109: /* get new task */
1110: if (!(task = sched_useTask(root)))
1111: return NULL;
1112:
1113: TASK_FUNC(task) = func;
1114: TASK_TYPE(task) = taskTASK;
1115: TASK_ROOT(task) = root;
1116:
1117: TASK_ARG(task) = arg;
1118: TASK_VAL(task) = prio;
1119:
1120: TASK_DATA(task) = opt_data;
1121: TASK_DATLEN(task) = opt_dlen;
1122:
1123: if (root->root_hooks.hook_add.task)
1124: ptr = root->root_hooks.hook_add.task(task, NULL);
1125: else
1126: ptr = NULL;
1127:
1128: if (!ptr) {
1129: SCHED_QLOCK(root, taskTASK);
1130: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1131: if (TASK_VAL(task) < TASK_VAL(t))
1132: break;
1133: if (!t)
1134: TAILQ_INSERT_TAIL(&root->root_task, task, task_node);
1135: else
1136: TAILQ_INSERT_BEFORE(t, task, task_node);
1137: SCHED_QUNLOCK(root, taskTASK);
1138: } else
1139: task = sched_unuseTask(task);
1140:
1141: return task;
1142: }
1143:
1144: /*
1145: * schedSuspend() - Add Suspended task to scheduler queue
1146: *
1147: * @root = root task
1148: * @func = task execution function
1149: * @arg = 1st func argument
1150: * @id = Trigger ID
1151: * @opt_data = Optional data
1152: * @opt_dlen = Optional data length
1153: * return: NULL error or !=NULL new queued task
1154: */
1155: sched_task_t *
1156: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1157: void *opt_data, size_t opt_dlen)
1158: {
1159: sched_task_t *task;
1160: void *ptr;
1161:
1162: if (!root || !func)
1163: return NULL;
1164:
1165: /* get new task */
1166: if (!(task = sched_useTask(root)))
1167: return NULL;
1168:
1169: TASK_FUNC(task) = func;
1170: TASK_TYPE(task) = taskSUSPEND;
1171: TASK_ROOT(task) = root;
1172:
1173: TASK_ARG(task) = arg;
1174: TASK_VAL(task) = id;
1175:
1176: TASK_DATA(task) = opt_data;
1177: TASK_DATLEN(task) = opt_dlen;
1178:
1179: if (root->root_hooks.hook_add.suspend)
1180: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1181: else
1182: ptr = NULL;
1183:
1184: if (!ptr)
1185: insert_task_to(task, &root->root_suspend);
1186: else
1187: task = sched_unuseTask(task);
1188:
1189: return task;
1190: }
1191:
1192: /*
1193: * schedCallOnce() - Call once from scheduler
1194: *
1195: * @root = root task
1196: * @func = task execution function
1197: * @arg = 1st func argument
1198: * @val = additional func argument
1199: * @opt_data = Optional data
1200: * @opt_dlen = Optional data length
1201: * return: return value from called func
1202: */
1203: sched_task_t *
1204: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1205: void *opt_data, size_t opt_dlen)
1206: {
1207: sched_task_t *task;
1208: void *ret;
1209:
1210: if (!root || !func)
1211: return NULL;
1212:
1213: /* get new task */
1214: if (!(task = sched_useTask(root)))
1215: return NULL;
1216:
1217: TASK_FUNC(task) = func;
1218: TASK_TYPE(task) = taskEVENT;
1219: TASK_ROOT(task) = root;
1220:
1221: TASK_ARG(task) = arg;
1222: TASK_VAL(task) = val;
1223:
1224: TASK_DATA(task) = opt_data;
1225: TASK_DATLEN(task) = opt_dlen;
1226:
1227: ret = schedCall(task);
1228:
1229: sched_unuseTask(task);
1230: return ret;
1231: }
1232:
1233: /*
1234: * schedThread() - Add thread task to scheduler queue
1235: *
1236: * @root = root task
1237: * @func = task execution function
1238: * @arg = 1st func argument
1239: * @ss = stack size
1240: * @opt_data = Optional data
1241: * @opt_dlen = Optional data length
1242: * return: NULL error or !=NULL new queued task
1243: */
1244: sched_task_t *
1245: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
1246: size_t ss, void *opt_data, size_t opt_dlen)
1247: {
1248: #ifndef HAVE_LIBPTHREAD
1249: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1250: return NULL;
1251: #endif
1252: sched_task_t *task;
1253: pthread_attr_t attr;
1254: void *ptr;
1255:
1256: if (!root || !func)
1257: return NULL;
1258:
1259: /* get new task */
1260: if (!(task = sched_useTask(root)))
1261: return NULL;
1262:
1263: TASK_FUNC(task) = func;
1264: TASK_TYPE(task) = taskTHREAD;
1265: TASK_ROOT(task) = root;
1266:
1267: TASK_ARG(task) = arg;
1268:
1269: TASK_DATA(task) = opt_data;
1270: TASK_DATLEN(task) = opt_dlen;
1271:
1272: pthread_attr_init(&attr);
1273: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
1274: if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
1275: LOGERR;
1276: pthread_attr_destroy(&attr);
1277: return sched_unuseTask(task);
1278: }
1279: if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
1280: LOGERR;
1281: pthread_attr_destroy(&attr);
1282: return sched_unuseTask(task);
1283: } else
1284: TASK_FLAG(task) = ss;
1285:
1286: #ifdef SCHED_RR
1287: pthread_attr_setschedpolicy(&attr, SCHED_RR);
1288: #else
1289: pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
1290: #endif
1291:
1292: if (root->root_hooks.hook_add.thread)
1293: ptr = root->root_hooks.hook_add.thread(task, &attr);
1294: else
1295: ptr = NULL;
1296:
1297: if (!ptr)
1298: insert_task_to(task, &root->root_thread);
1299: else
1300: task = sched_unuseTask(task);
1301:
1302: pthread_attr_destroy(&attr);
1303: return task;
1304: }
1305:
1306: /*
1307: * schedRTC() - Add RTC task to scheduler queue
1308: *
1309: * @root = root task
1310: * @func = task execution function
1311: * @arg = 1st func argument
1312: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
1313: * @opt_data = Optional RTC ID
1314: * @opt_dlen = Optional data length
1315: * return: NULL error or !=NULL new queued task
1316: */
1317: sched_task_t *
1318: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
1319: void *opt_data, size_t opt_dlen)
1320: {
1321: #if defined(HAVE_LIBRT) && defined(HAVE_TIMER_CREATE) && \
1322: defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_DELETE)
1323: sched_task_t *task;
1324: void *ptr;
1325:
1326: if (!root || !func)
1327: return NULL;
1328:
1329: /* get new task */
1330: if (!(task = sched_useTask(root)))
1331: return NULL;
1332:
1333: TASK_FUNC(task) = func;
1334: TASK_TYPE(task) = taskRTC;
1335: TASK_ROOT(task) = root;
1336:
1337: TASK_ARG(task) = arg;
1338: TASK_TS(task) = ts;
1339:
1340: TASK_DATA(task) = opt_data;
1341: TASK_DATLEN(task) = opt_dlen;
1342:
1343: if (root->root_hooks.hook_add.rtc)
1344: ptr = root->root_hooks.hook_add.rtc(task, NULL);
1345: else
1346: ptr = NULL;
1347:
1348: if (!ptr)
1349: insert_task_to(task, &root->root_rtc);
1350: else
1351: task = sched_unuseTask(task);
1352:
1353: return task;
1354: #else
1355: sched_SetErr(ENOTSUP, "Not supported realtime clock extensions");
1356: return NULL;
1357: #endif
1358: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>