1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: tasks.c,v 1.13.2.3 2012/08/22 23:43:36 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
inline sched_task_t *
sched_useTask(sched_root_task_t * __restrict root)
{
	sched_task_t *task, *tmp;

	/* Try to recycle a task from the unuse queue, skipping entries that are
	 * still locked.  NOTE: if the loop runs to completion without a break,
	 * TAILQ_FOREACH_SAFE leaves task == NULL, which the !task test below
	 * relies on to fall through to a fresh allocation. */
	TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
		if (!TASK_ISLOCKED(task)) {
#ifdef HAVE_LIBPTHREAD
			pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
#endif
			TAILQ_REMOVE(&root->root_unuse, task, task_node);
#ifdef HAVE_LIBPTHREAD
			pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
#endif
			break;
		}
	}

	/* nothing reusable found -> allocate a brand new task */
	if (!task) {
		task = malloc(sizeof(sched_task_t));
		if (!task) {
			LOGERR;
			return NULL;
		}
	}

	/* hand back a zeroed task; the task id is its own address */
	memset(task, 0, sizeof(sched_task_t));
	task->task_id = (uintptr_t) task;
	return task;
}
85:
86: /*
87: * sched_unuseTask() - Unlock and put task to unuse queue
88: *
89: * @task = task
90: * return: always is NULL
91: */
92: inline sched_task_t *
93: sched_unuseTask(sched_task_t * __restrict task)
94: {
95: TASK_UNLOCK(task);
96: TASK_TYPE(task) = taskUNUSE;
97: #ifdef HAVE_LIBPTHREAD
98: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
99: #endif
100: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
101: #ifdef HAVE_LIBPTHREAD
102: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
103: #endif
104: task = NULL;
105:
106: return task;
107: }
108:
109: #pragma GCC visibility push(hidden)
110:
/* _sched_threadJoin() - polling task that reaps a finished thread task */
void *
_sched_threadJoin(sched_task_t *task)
{
	void *ret = NULL;

	if (!task)
		return NULL;

#ifdef HAVE_LIBPTHREAD
	/* pthread_kill(tid, 0) delivers no signal; a non-zero return means the
	 * thread can no longer be signalled, i.e. it has terminated, so it is
	 * safe to join it and publish its result on the scheduler root */
	if (pthread_kill((pthread_t) TASK_VAL(task), 0)) {
		pthread_join((pthread_t) TASK_VAL(task), &ret);
		TASK_ROOT(task)->root_ret = ret;
	} else {
		/* thread still running: back off ~10ms, then re-queue this
		 * same polling task so the check repeats later */
		usleep(10000);
		schedTaskSelf(task);
	}
#endif

	return NULL;
}
131:
132: #pragma GCC visibility pop
133:
134: /*
135: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
136: *
137: * @task = current task
138: * @retcode = return code
139: * return: return code
140: */
inline void *
sched_taskExit(sched_task_t *task, intptr_t retcode)
{
	if (!task || !TASK_ROOT(task))
		return (void*) -1;

	/* give the user-installed exit hook first crack at the task */
	if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
		TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);

	/* publish the return code on the scheduler root */
	TASK_ROOT(task)->root_ret = (void*) retcode;

#ifdef HAVE_LIBPTHREAD
	if (TASK_TYPE(task) == taskTHREAD) {
		/* joinable threads must actually be joined to free their
		 * resources: queue a _sched_threadJoin polling task for that */
		if (TASK_FLAG(task) == PTHREAD_CREATE_JOINABLE)	/* joinable thread */
			schedTask(TASK_ROOT(task), _sched_threadJoin, TASK_ARG(task),
					TASK_VAL(task), TASK_DATA(task), TASK_DATLEN(task));
		/* the task is recycled BEFORE pthread_exit(); only retcode is
		 * used afterwards, the task pointer is not touched again */
		sched_unuseTask(task);
		pthread_exit((void*) retcode);
	}
#endif

	return (void*) retcode;
}
164:
165:
166: /*
167: * schedRead() - Add READ I/O task to scheduler queue
168: *
169: * @root = root task
170: * @func = task execution function
171: * @arg = 1st func argument
172: * @fd = fd handle
173: * @opt_data = Optional data
174: * @opt_dlen = Optional data length
175: * return: NULL error or !=NULL new queued task
176: */
177: sched_task_t *
178: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
179: void *opt_data, size_t opt_dlen)
180: {
181: sched_task_t *task;
182: void *ptr;
183:
184: if (!root || !func)
185: return NULL;
186:
187: /* get new task */
188: if (!(task = sched_useTask(root)))
189: return NULL;
190:
191: task->task_func = func;
192: TASK_TYPE(task) = taskREAD;
193: TASK_ROOT(task) = root;
194:
195: TASK_ARG(task) = arg;
196: TASK_FD(task) = fd;
197:
198: TASK_DATA(task) = opt_data;
199: TASK_DATLEN(task) = opt_dlen;
200:
201: if (root->root_hooks.hook_add.read)
202: ptr = root->root_hooks.hook_add.read(task, NULL);
203: else
204: ptr = NULL;
205:
206: if (!ptr) {
207: #ifdef HAVE_LIBPTHREAD
208: pthread_mutex_lock(&root->root_mtx[taskREAD]);
209: #endif
210: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
211: #ifdef HAVE_LIBPTHREAD
212: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
213: #endif
214: } else
215: task = sched_unuseTask(task);
216:
217: return task;
218: }
219:
220: /*
221: * schedWrite() - Add WRITE I/O task to scheduler queue
222: *
223: * @root = root task
224: * @func = task execution function
225: * @arg = 1st func argument
226: * @fd = fd handle
227: * @opt_data = Optional data
228: * @opt_dlen = Optional data length
229: * return: NULL error or !=NULL new queued task
230: */
231: sched_task_t *
232: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
233: void *opt_data, size_t opt_dlen)
234: {
235: sched_task_t *task;
236: void *ptr;
237:
238: if (!root || !func)
239: return NULL;
240:
241: /* get new task */
242: if (!(task = sched_useTask(root)))
243: return NULL;
244:
245: task->task_func = func;
246: TASK_TYPE(task) = taskWRITE;
247: TASK_ROOT(task) = root;
248:
249: TASK_ARG(task) = arg;
250: TASK_FD(task) = fd;
251:
252: TASK_DATA(task) = opt_data;
253: TASK_DATLEN(task) = opt_dlen;
254:
255: if (root->root_hooks.hook_add.write)
256: ptr = root->root_hooks.hook_add.write(task, NULL);
257: else
258: ptr = NULL;
259:
260: if (!ptr) {
261: #ifdef HAVE_LIBPTHREAD
262: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
263: #endif
264: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
265: #ifdef HAVE_LIBPTHREAD
266: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
267: #endif
268: } else
269: task = sched_unuseTask(task);
270:
271: return task;
272: }
273:
274: /*
275: * schedNode() - Add NODE task to scheduler queue
276: *
277: * @root = root task
278: * @func = task execution function
279: * @arg = 1st func argument
280: * @fd = fd handle
281: * @opt_data = Optional data
282: * @opt_dlen = Optional data length
283: * return: NULL error or !=NULL new queued task
284: */
285: sched_task_t *
286: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
287: void *opt_data, size_t opt_dlen)
288: {
289: sched_task_t *task;
290: void *ptr;
291:
292: if (!root || !func)
293: return NULL;
294:
295: /* get new task */
296: if (!(task = sched_useTask(root)))
297: return NULL;
298:
299: task->task_func = func;
300: TASK_TYPE(task) = taskNODE;
301: TASK_ROOT(task) = root;
302:
303: TASK_ARG(task) = arg;
304: TASK_FD(task) = fd;
305:
306: TASK_DATA(task) = opt_data;
307: TASK_DATLEN(task) = opt_dlen;
308:
309: if (root->root_hooks.hook_add.node)
310: ptr = root->root_hooks.hook_add.node(task, NULL);
311: else
312: ptr = NULL;
313:
314: if (!ptr) {
315: #ifdef HAVE_LIBPTHREAD
316: pthread_mutex_lock(&root->root_mtx[taskNODE]);
317: #endif
318: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
319: #ifdef HAVE_LIBPTHREAD
320: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
321: #endif
322: } else
323: task = sched_unuseTask(task);
324:
325: return task;
326: }
327:
328: /*
329: * schedProc() - Add PROC task to scheduler queue
330: *
331: * @root = root task
332: * @func = task execution function
333: * @arg = 1st func argument
334: * @pid = PID
335: * @opt_data = Optional data
336: * @opt_dlen = Optional data length
337: * return: NULL error or !=NULL new queued task
338: */
339: sched_task_t *
340: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
341: void *opt_data, size_t opt_dlen)
342: {
343: sched_task_t *task;
344: void *ptr;
345:
346: if (!root || !func)
347: return NULL;
348:
349: /* get new task */
350: if (!(task = sched_useTask(root)))
351: return NULL;
352:
353: task->task_func = func;
354: TASK_TYPE(task) = taskPROC;
355: TASK_ROOT(task) = root;
356:
357: TASK_ARG(task) = arg;
358: TASK_VAL(task) = pid;
359:
360: TASK_DATA(task) = opt_data;
361: TASK_DATLEN(task) = opt_dlen;
362:
363: if (root->root_hooks.hook_add.proc)
364: ptr = root->root_hooks.hook_add.proc(task, NULL);
365: else
366: ptr = NULL;
367:
368: if (!ptr) {
369: #ifdef HAVE_LIBPTHREAD
370: pthread_mutex_lock(&root->root_mtx[taskPROC]);
371: #endif
372: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
373: #ifdef HAVE_LIBPTHREAD
374: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
375: #endif
376: } else
377: task = sched_unuseTask(task);
378:
379: return task;
380: }
381:
382: /*
383: * schedUser() - Add trigger USER task to scheduler queue
384: *
385: * @root = root task
386: * @func = task execution function
387: * @arg = 1st func argument
388: * @id = Trigger ID
389: * @opt_data = Optional data
390: * @opt_dlen = Optional user's trigger flags
391: * return: NULL error or !=NULL new queued task
392: */
393: sched_task_t *
394: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
395: void *opt_data, size_t opt_dlen)
396: {
397: #ifndef EVFILT_USER
398: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
399: return NULL;
400: #else
401: sched_task_t *task;
402: void *ptr;
403:
404: if (!root || !func)
405: return NULL;
406:
407: /* get new task */
408: if (!(task = sched_useTask(root)))
409: return NULL;
410:
411: task->task_func = func;
412: TASK_TYPE(task) = taskUSER;
413: TASK_ROOT(task) = root;
414:
415: TASK_ARG(task) = arg;
416: TASK_VAL(task) = id;
417:
418: TASK_DATA(task) = opt_data;
419: TASK_DATLEN(task) = opt_dlen;
420:
421: if (root->root_hooks.hook_add.user)
422: ptr = root->root_hooks.hook_add.user(task, NULL);
423: else
424: ptr = NULL;
425:
426: if (!ptr) {
427: #ifdef HAVE_LIBPTHREAD
428: pthread_mutex_lock(&root->root_mtx[taskUSER]);
429: #endif
430: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
431: #ifdef HAVE_LIBPTHREAD
432: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
433: #endif
434: } else
435: task = sched_unuseTask(task);
436:
437: return task;
438: #endif
439: }
440:
441: /*
442: * schedSignal() - Add SIGNAL task to scheduler queue
443: *
444: * @root = root task
445: * @func = task execution function
446: * @arg = 1st func argument
447: * @sig = Signal
448: * @opt_data = Optional data
449: * @opt_dlen = Optional data length
450: * return: NULL error or !=NULL new queued task
451: */
452: sched_task_t *
453: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
454: void *opt_data, size_t opt_dlen)
455: {
456: sched_task_t *task;
457: void *ptr;
458:
459: if (!root || !func)
460: return NULL;
461:
462: /* get new task */
463: if (!(task = sched_useTask(root)))
464: return NULL;
465:
466: task->task_func = func;
467: TASK_TYPE(task) = taskSIGNAL;
468: TASK_ROOT(task) = root;
469:
470: TASK_ARG(task) = arg;
471: TASK_VAL(task) = sig;
472:
473: TASK_DATA(task) = opt_data;
474: TASK_DATLEN(task) = opt_dlen;
475:
476: if (root->root_hooks.hook_add.signal)
477: ptr = root->root_hooks.hook_add.signal(task, NULL);
478: else
479: ptr = NULL;
480:
481: if (!ptr) {
482: #ifdef HAVE_LIBPTHREAD
483: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
484: #endif
485: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
486: #ifdef HAVE_LIBPTHREAD
487: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
488: #endif
489: } else
490: task = sched_unuseTask(task);
491:
492: return task;
493: }
494:
495: /*
496: * schedAlarm() - Add ALARM task to scheduler queue
497: *
498: * @root = root task
499: * @func = task execution function
500: * @arg = 1st func argument
501: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
502: * @opt_data = Optional data
503: * @opt_dlen = Optional data length
504: * return: NULL error or !=NULL new queued task
505: */
506: sched_task_t *
507: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
508: void *opt_data, size_t opt_dlen)
509: {
510: sched_task_t *task;
511: void *ptr;
512:
513: if (!root || !func)
514: return NULL;
515:
516: /* get new task */
517: if (!(task = sched_useTask(root)))
518: return NULL;
519:
520: task->task_func = func;
521: TASK_TYPE(task) = taskALARM;
522: TASK_ROOT(task) = root;
523:
524: TASK_ARG(task) = arg;
525: TASK_TS(task) = ts;
526:
527: TASK_DATA(task) = opt_data;
528: TASK_DATLEN(task) = opt_dlen;
529:
530: if (root->root_hooks.hook_add.alarm)
531: ptr = root->root_hooks.hook_add.alarm(task, NULL);
532: else
533: ptr = NULL;
534:
535: if (!ptr) {
536: #ifdef HAVE_LIBPTHREAD
537: pthread_mutex_lock(&root->root_mtx[taskALARM]);
538: #endif
539: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
540: #ifdef HAVE_LIBPTHREAD
541: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
542: #endif
543: } else
544: task = sched_unuseTask(task);
545:
546: return task;
547: }
548:
549: #ifdef AIO_SUPPORT
550: /*
551: * schedAIO() - Add AIO task to scheduler queue
552: *
553: * @root = root task
554: * @func = task execution function
555: * @arg = 1st func argument
556: * @acb = AIO cb structure address
557: * @opt_data = Optional data
558: * @opt_dlen = Optional data length
559: * return: NULL error or !=NULL new queued task
560: */
561: sched_task_t *
562: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
563: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
564: {
565: sched_task_t *task;
566: void *ptr;
567:
568: if (!root || !func || !acb || !opt_dlen)
569: return NULL;
570:
571: /* get new task */
572: if (!(task = sched_useTask(root)))
573: return NULL;
574:
575: task->task_func = func;
576: TASK_TYPE(task) = taskAIO;
577: TASK_ROOT(task) = root;
578:
579: TASK_ARG(task) = arg;
580: TASK_VAL(task) = (u_long) acb;
581:
582: TASK_DATA(task) = opt_data;
583: TASK_DATLEN(task) = opt_dlen;
584:
585: if (root->root_hooks.hook_add.aio)
586: ptr = root->root_hooks.hook_add.aio(task, NULL);
587: else
588: ptr = NULL;
589:
590: if (!ptr) {
591: #ifdef HAVE_LIBPTHREAD
592: pthread_mutex_lock(&root->root_mtx[taskAIO]);
593: #endif
594: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
595: #ifdef HAVE_LIBPTHREAD
596: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
597: #endif
598: } else
599: task = sched_unuseTask(task);
600:
601: return task;
602: }
603:
604: /*
605: * schedAIORead() - Add AIO read task to scheduler queue
606: *
607: * @root = root task
608: * @func = task execution function
609: * @arg = 1st func argument
610: * @fd = file descriptor
611: * @buffer = Buffer
612: * @buflen = Buffer length
613: * @offset = Offset from start of file, if =-1 from current position
614: * return: NULL error or !=NULL new queued task
615: */
616: inline sched_task_t *
617: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
618: void *buffer, size_t buflen, off_t offset)
619: {
620: struct aiocb *acb;
621: off_t off;
622:
623: if (!root || !func || !buffer || !buflen)
624: return NULL;
625:
626: if (offset == (off_t) -1) {
627: off = lseek(fd, 0, SEEK_CUR);
628: if (off == -1) {
629: LOGERR;
630: return NULL;
631: }
632: } else
633: off = offset;
634:
635: if (!(acb = malloc(sizeof(struct aiocb)))) {
636: LOGERR;
637: return NULL;
638: } else
639: memset(acb, 0, sizeof(struct aiocb));
640:
641: acb->aio_fildes = fd;
642: acb->aio_nbytes = buflen;
643: acb->aio_buf = buffer;
644: acb->aio_offset = off;
645: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
646: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
647: acb->aio_sigevent.sigev_value.sival_ptr = acb;
648:
649: if (aio_read(acb)) {
650: LOGERR;
651: free(acb);
652: return NULL;
653: }
654:
655: return schedAIO(root, func, arg, acb, buffer, buflen);
656: }
657:
658: /*
659: * schedAIOWrite() - Add AIO write task to scheduler queue
660: *
661: * @root = root task
662: * @func = task execution function
663: * @arg = 1st func argument
664: * @fd = file descriptor
665: * @buffer = Buffer
666: * @buflen = Buffer length
667: * @offset = Offset from start of file, if =-1 from current position
668: * return: NULL error or !=NULL new queued task
669: */
670: inline sched_task_t *
671: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
672: void *buffer, size_t buflen, off_t offset)
673: {
674: struct aiocb *acb;
675: off_t off;
676:
677: if (!root || !func || !buffer || !buflen)
678: return NULL;
679:
680: if (offset == (off_t) -1) {
681: off = lseek(fd, 0, SEEK_CUR);
682: if (off == -1) {
683: LOGERR;
684: return NULL;
685: }
686: } else
687: off = offset;
688:
689: if (!(acb = malloc(sizeof(struct aiocb)))) {
690: LOGERR;
691: return NULL;
692: } else
693: memset(acb, 0, sizeof(struct aiocb));
694:
695: acb->aio_fildes = fd;
696: acb->aio_nbytes = buflen;
697: acb->aio_buf = buffer;
698: acb->aio_offset = off;
699: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
700: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
701: acb->aio_sigevent.sigev_value.sival_ptr = acb;
702:
703: if (aio_write(acb)) {
704: LOGERR;
705: free(acb);
706: return NULL;
707: }
708:
709: return schedAIO(root, func, arg, acb, buffer, buflen);
710: }
711:
712: #ifdef EVFILT_LIO
713: /*
714: * schedLIO() - Add AIO bulk tasks to scheduler queue
715: *
716: * @root = root task
717: * @func = task execution function
718: * @arg = 1st func argument
719: * @acbs = AIO cb structure addresses
720: * @opt_data = Optional data
721: * @opt_dlen = Optional data length
722: * return: NULL error or !=NULL new queued task
723: */
724: sched_task_t *
725: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
726: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
727: {
728: sched_task_t *task;
729: void *ptr;
730:
731: if (!root || !func || !acbs || !opt_dlen)
732: return NULL;
733:
734: /* get new task */
735: if (!(task = sched_useTask(root)))
736: return NULL;
737:
738: task->task_func = func;
739: TASK_TYPE(task) = taskLIO;
740: TASK_ROOT(task) = root;
741:
742: TASK_ARG(task) = arg;
743: TASK_VAL(task) = (u_long) acbs;
744:
745: TASK_DATA(task) = opt_data;
746: TASK_DATLEN(task) = opt_dlen;
747:
748: if (root->root_hooks.hook_add.lio)
749: ptr = root->root_hooks.hook_add.lio(task, NULL);
750: else
751: ptr = NULL;
752:
753: if (!ptr) {
754: #ifdef HAVE_LIBPTHREAD
755: pthread_mutex_lock(&root->root_mtx[taskLIO]);
756: #endif
757: TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
758: #ifdef HAVE_LIBPTHREAD
759: pthread_mutex_unlock(&root->root_mtx[taskLIO]);
760: #endif
761: } else
762: task = sched_unuseTask(task);
763:
764: return task;
765: }
766:
767: /*
768: * schedLIORead() - Add list of AIO read tasks to scheduler queue
769: *
770: * @root = root task
771: * @func = task execution function
772: * @arg = 1st func argument
773: * @fd = file descriptor
774: * @bufs = Buffer's list
775: * @nbufs = Number of Buffers
776: * @offset = Offset from start of file, if =-1 from current position
777: * return: NULL error or !=NULL new queued task
778: */
779: sched_task_t *
780: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
781: struct iovec *bufs, size_t nbufs, off_t offset)
782: {
783: struct sigevent sig;
784: struct aiocb **acb;
785: off_t off;
786: register int i;
787:
788: if (!root || !func || !bufs || !nbufs)
789: return NULL;
790:
791: if (offset == (off_t) -1) {
792: off = lseek(fd, 0, SEEK_CUR);
793: if (off == -1) {
794: LOGERR;
795: return NULL;
796: }
797: } else
798: off = offset;
799:
800: if (!(acb = calloc(sizeof(void*), nbufs))) {
801: LOGERR;
802: return NULL;
803: } else
804: memset(acb, 0, sizeof(void*) * nbufs);
805: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
806: acb[i] = malloc(sizeof(struct aiocb));
807: if (!acb[i]) {
808: LOGERR;
809: for (i = 0; i < nbufs; i++)
810: if (acb[i])
811: free(acb[i]);
812: free(acb);
813: return NULL;
814: } else
815: memset(acb[i], 0, sizeof(struct aiocb));
816: acb[i]->aio_fildes = fd;
817: acb[i]->aio_nbytes = bufs[i].iov_len;
818: acb[i]->aio_buf = bufs[i].iov_base;
819: acb[i]->aio_offset = off;
820: acb[i]->aio_lio_opcode = LIO_READ;
821: }
822: memset(&sig, 0, sizeof sig);
823: sig.sigev_notify = SIGEV_KEVENT;
824: sig.sigev_notify_kqueue = root->root_kq;
825: sig.sigev_value.sival_ptr = acb;
826:
827: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
828: LOGERR;
829: for (i = 0; i < nbufs; i++)
830: if (acb[i])
831: free(acb[i]);
832: free(acb);
833: return NULL;
834: }
835:
836: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
837: }
838:
839: /*
840: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
841: *
842: * @root = root task
843: * @func = task execution function
844: * @arg = 1st func argument
845: * @fd = file descriptor
846: * @bufs = Buffer's list
847: * @nbufs = Number of Buffers
848: * @offset = Offset from start of file, if =-1 from current position
849: * return: NULL error or !=NULL new queued task
850: */
851: inline sched_task_t *
852: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
853: struct iovec *bufs, size_t nbufs, off_t offset)
854: {
855: struct sigevent sig;
856: struct aiocb **acb;
857: off_t off;
858: register int i;
859:
860: if (!root || !func || !bufs || !nbufs)
861: return NULL;
862:
863: if (offset == (off_t) -1) {
864: off = lseek(fd, 0, SEEK_CUR);
865: if (off == -1) {
866: LOGERR;
867: return NULL;
868: }
869: } else
870: off = offset;
871:
872: if (!(acb = calloc(sizeof(void*), nbufs))) {
873: LOGERR;
874: return NULL;
875: } else
876: memset(acb, 0, sizeof(void*) * nbufs);
877: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
878: acb[i] = malloc(sizeof(struct aiocb));
879: if (!acb[i]) {
880: LOGERR;
881: for (i = 0; i < nbufs; i++)
882: if (acb[i])
883: free(acb[i]);
884: free(acb);
885: return NULL;
886: } else
887: memset(acb[i], 0, sizeof(struct aiocb));
888: acb[i]->aio_fildes = fd;
889: acb[i]->aio_nbytes = bufs[i].iov_len;
890: acb[i]->aio_buf = bufs[i].iov_base;
891: acb[i]->aio_offset = off;
892: acb[i]->aio_lio_opcode = LIO_WRITE;
893: }
894: memset(&sig, 0, sizeof sig);
895: sig.sigev_notify = SIGEV_KEVENT;
896: sig.sigev_notify_kqueue = root->root_kq;
897: sig.sigev_value.sival_ptr = acb;
898:
899: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
900: LOGERR;
901: for (i = 0; i < nbufs; i++)
902: if (acb[i])
903: free(acb[i]);
904: free(acb);
905: return NULL;
906: }
907:
908: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
909: }
910: #endif /* EVFILT_LIO */
911: #endif /* AIO_SUPPORT */
912:
913: /*
914: * schedTimer() - Add TIMER task to scheduler queue
915: *
916: * @root = root task
917: * @func = task execution function
918: * @arg = 1st func argument
919: * @ts = timeout argument structure
920: * @opt_data = Optional data
921: * @opt_dlen = Optional data length
922: * return: NULL error or !=NULL new queued task
923: */
sched_task_t *
schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task, *tmp, *t = NULL;
	void *ptr;
	struct timespec now;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskTIMER;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	/* convert the relative timeout ts into an absolute monotonic deadline,
	 * normalizing tv_nsec into [0, 1e9) after the addition */
	clock_gettime(CLOCK_MONOTONIC, &now);
	now.tv_sec += ts.tv_sec;
	now.tv_nsec += ts.tv_nsec;
	if (now.tv_nsec >= 1000000000L) {
		now.tv_sec++;
		now.tv_nsec -= 1000000000L;
	} else if (now.tv_nsec < 0) {
		now.tv_sec--;
		now.tv_nsec += 1000000000L;
	}
	TASK_TS(task) = now;

	/* a non-NULL hook result vetoes the queueing */
	if (root->root_hooks.hook_add.timer)
		ptr = root->root_hooks.hook_add.timer(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskTIMER]);
#endif
#ifdef TIMER_WITHOUT_SORT
		TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
#else
		/* insert sorted by deadline so the earliest timer stays at the
		 * head; t == NULL after the loop means "insert at the tail" */
		TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
			if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
				break;
		if (!t)
			TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
		else
			TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
#endif
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}
989:
990: /*
991: * schedEvent() - Add EVENT task to scheduler queue
992: *
993: * @root = root task
994: * @func = task execution function
995: * @arg = 1st func argument
996: * @val = additional func argument
997: * @opt_data = Optional data
998: * @opt_dlen = Optional data length
999: * return: NULL error or !=NULL new queued task
1000: */
1001: sched_task_t *
1002: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1003: void *opt_data, size_t opt_dlen)
1004: {
1005: sched_task_t *task;
1006: void *ptr;
1007:
1008: if (!root || !func)
1009: return NULL;
1010:
1011: /* get new task */
1012: if (!(task = sched_useTask(root)))
1013: return NULL;
1014:
1015: task->task_func = func;
1016: TASK_TYPE(task) = taskEVENT;
1017: TASK_ROOT(task) = root;
1018:
1019: TASK_ARG(task) = arg;
1020: TASK_VAL(task) = val;
1021:
1022: TASK_DATA(task) = opt_data;
1023: TASK_DATLEN(task) = opt_dlen;
1024:
1025: if (root->root_hooks.hook_add.event)
1026: ptr = root->root_hooks.hook_add.event(task, NULL);
1027: else
1028: ptr = NULL;
1029:
1030: if (!ptr) {
1031: #ifdef HAVE_LIBPTHREAD
1032: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
1033: #endif
1034: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
1035: #ifdef HAVE_LIBPTHREAD
1036: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
1037: #endif
1038: } else
1039: task = sched_unuseTask(task);
1040:
1041: return task;
1042: }
1043:
1044:
1045: /*
1046: * schedTask() - Add regular task to scheduler queue
1047: *
1048: * @root = root task
1049: * @func = task execution function
1050: * @arg = 1st func argument
1051: * @prio = regular task priority, 0 is hi priority for regular tasks
1052: * @opt_data = Optional data
1053: * @opt_dlen = Optional data length
1054: * return: NULL error or !=NULL new queued task
1055: */
1056: sched_task_t *
1057: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1058: void *opt_data, size_t opt_dlen)
1059: {
1060: sched_task_t *task, *tmp, *t = NULL;
1061: void *ptr;
1062:
1063: if (!root || !func)
1064: return NULL;
1065:
1066: /* get new task */
1067: if (!(task = sched_useTask(root)))
1068: return NULL;
1069:
1070: task->task_func = func;
1071: TASK_TYPE(task) = taskTASK;
1072: TASK_ROOT(task) = root;
1073:
1074: TASK_ARG(task) = arg;
1075: TASK_VAL(task) = prio;
1076:
1077: TASK_DATA(task) = opt_data;
1078: TASK_DATLEN(task) = opt_dlen;
1079:
1080: if (root->root_hooks.hook_add.task)
1081: ptr = root->root_hooks.hook_add.task(task, NULL);
1082: else
1083: ptr = NULL;
1084:
1085: if (!ptr) {
1086: #ifdef HAVE_LIBPTHREAD
1087: pthread_mutex_lock(&root->root_mtx[taskTASK]);
1088: #endif
1089: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1090: if (TASK_VAL(task) < TASK_VAL(t))
1091: break;
1092: if (!t)
1093: TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
1094: else
1095: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1096: #ifdef HAVE_LIBPTHREAD
1097: pthread_mutex_unlock(&root->root_mtx[taskTASK]);
1098: #endif
1099: } else
1100: task = sched_unuseTask(task);
1101:
1102: return task;
1103: }
1104:
1105: /*
1106: * schedSuspend() - Add Suspended task to scheduler queue
1107: *
1108: * @root = root task
1109: * @func = task execution function
1110: * @arg = 1st func argument
1111: * @id = Trigger ID
1112: * @opt_data = Optional data
1113: * @opt_dlen = Optional data length
1114: * return: NULL error or !=NULL new queued task
1115: */
1116: sched_task_t *
1117: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1118: void *opt_data, size_t opt_dlen)
1119: {
1120: sched_task_t *task;
1121: void *ptr;
1122:
1123: if (!root || !func)
1124: return NULL;
1125:
1126: /* get new task */
1127: if (!(task = sched_useTask(root)))
1128: return NULL;
1129:
1130: task->task_func = func;
1131: TASK_TYPE(task) = taskSUSPEND;
1132: TASK_ROOT(task) = root;
1133:
1134: TASK_ARG(task) = arg;
1135: TASK_VAL(task) = id;
1136:
1137: TASK_DATA(task) = opt_data;
1138: TASK_DATLEN(task) = opt_dlen;
1139:
1140: if (root->root_hooks.hook_add.suspend)
1141: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1142: else
1143: ptr = NULL;
1144:
1145: if (!ptr) {
1146: #ifdef HAVE_LIBPTHREAD
1147: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1148: #endif
1149: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1150: #ifdef HAVE_LIBPTHREAD
1151: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1152: #endif
1153: } else
1154: task = sched_unuseTask(task);
1155:
1156: return task;
1157: }
1158:
1159: /*
1160: * schedCallOnce() - Call once from scheduler
1161: *
1162: * @root = root task
1163: * @func = task execution function
1164: * @arg = 1st func argument
1165: * @val = additional func argument
1166: * @opt_data = Optional data
1167: * @opt_dlen = Optional data length
1168: * return: return value from called func
1169: */
sched_task_t *
schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task;
	void *ret;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	/* the task is never queued: it is typed as an EVENT, executed inline
	 * via schedCall(), and recycled immediately afterwards */
	task->task_func = func;
	TASK_TYPE(task) = taskEVENT;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_VAL(task) = val;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	ret = schedCall(task);

	sched_unuseTask(task);
	/* NOTE(review): ret is the callback's raw void* result, yet the
	 * declared return type is sched_task_t* — callers receive the
	 * callback's return value, not a task; confirm this conflation
	 * is intended (the header comment above says "return value from
	 * called func") */
	return ret;
}
1199:
1200: /*
1201: * schedThread() - Add thread task to scheduler queue
1202: *
1203: * @root = root task
1204: * @func = task execution function
1205: * @arg = 1st func argument
1206: * @detach = Detach thread from scheduler, if !=0
1207: * @opt_data = Optional data
1208: * @opt_dlen = Optional data length
1209: * return: NULL error or !=NULL new queued task
1210: */
1211: sched_task_t *
1212: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach,
1213: void *opt_data, size_t opt_dlen)
1214: {
1215: #ifndef HAVE_LIBPTHREAD
1216: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1217: return NULL;
1218: #endif
1219: sched_task_t *task;
1220: void *ptr;
1221: pthread_attr_t attr;
1222:
1223: if (!root || !func)
1224: return NULL;
1225:
1226: /* get new task */
1227: if (!(task = sched_useTask(root)))
1228: return NULL;
1229:
1230: task->task_func = func;
1231: TASK_TYPE(task) = taskTHREAD;
1232: TASK_ROOT(task) = root;
1233:
1234: TASK_ARG(task) = arg;
1235: TASK_FLAG(task) = detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE;
1236:
1237: TASK_DATA(task) = opt_data;
1238: TASK_DATLEN(task) = opt_dlen;
1239:
1240: pthread_attr_init(&attr);
1241: pthread_attr_setdetachstate(&attr, TASK_FLAG(task));
1242: if (root->root_hooks.hook_add.thread)
1243: ptr = root->root_hooks.hook_add.thread(task, &attr);
1244: else
1245: ptr = NULL;
1246: pthread_attr_destroy(&attr);
1247:
1248: if (!ptr) {
1249: pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
1250: TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
1251: pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
1252: } else
1253: task = sched_unuseTask(task);
1254:
1255: return task;
1256: }
1257:
/* FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org> */