Annotation of libaitsched/src/tasks.c, revision 1.13.2.1
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.13.2.1! misho 6: * $Id: tasks.c,v 1.13 2012/08/21 12:54:39 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.6 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
1.13 misho 49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
1.4 misho 55: inline sched_task_t *
1.13 misho 56: sched_useTask(sched_root_task_t * __restrict root)
1.4 misho 57: {
1.7 misho 58: sched_task_t *task, *tmp;
1.4 misho 59:
1.7 misho 60: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
1.4 misho 61: if (!TASK_ISLOCKED(task)) {
1.5 misho 62: #ifdef HAVE_LIBPTHREAD
63: pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
64: #endif
1.4 misho 65: TAILQ_REMOVE(&root->root_unuse, task, task_node);
1.5 misho 66: #ifdef HAVE_LIBPTHREAD
67: pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
68: #endif
1.4 misho 69: break;
70: }
71: }
72:
73: if (!task) {
74: task = malloc(sizeof(sched_task_t));
75: if (!task) {
76: LOGERR;
77: return NULL;
78: }
79: }
80:
1.9 misho 81: memset(task, 0, sizeof(sched_task_t));
82: task->task_id = (uintptr_t) task;
1.4 misho 83: return task;
84: }
85:
1.13 misho 86: /*
87: * sched_unuseTask() - Unlock and put task to unuse queue
88: *
89: * @task = task
90: * return: always is NULL
91: */
1.4 misho 92: inline sched_task_t *
1.13 misho 93: sched_unuseTask(sched_task_t * __restrict task)
1.4 misho 94: {
95: TASK_UNLOCK(task);
1.5 misho 96: TASK_TYPE(task) = taskUNUSE;
97: #ifdef HAVE_LIBPTHREAD
98: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
99: #endif
1.9 misho 100: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
1.5 misho 101: #ifdef HAVE_LIBPTHREAD
102: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
103: #endif
1.4 misho 104: task = NULL;
105:
106: return task;
107: }
108:
1.13.2.1! misho 109: void *
! 110: _sched_threadJoin(sched_task_t *task)
! 111: {
! 112: void *ret = NULL;
! 113:
! 114: if (!task)
! 115: return NULL;
! 116:
! 117: #ifdef HAVE_LIBPTHREAD
! 118: pthread_join((pthread_t) TASK_VAL(task), &ret);
! 119: TASK_ROOT(task)->root_ret = ret;
! 120: #endif
! 121:
! 122: return NULL;
! 123: }
! 124:
! 125: /*
! 126: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
! 127: *
! 128: * @task = current task
! 129: * @retcode = return code
! 130: * return: return code
! 131: */
! 132: inline void *
! 133: sched_taskExit(sched_task_t *task, intptr_t retcode)
! 134: {
! 135: if (!task || !TASK_ROOT(task))
! 136: return (void*) -1;
! 137:
! 138: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
! 139: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
! 140:
! 141: TASK_ROOT(task)->root_ret = (void*) retcode;
! 142:
! 143: #ifdef HAVE_LIBPTHREAD
! 144: if (TASK_TYPE(task) == taskTHREAD) {
! 145: if (TASK_FLAG(task) == PTHREAD_CREATE_JOINABLE) /* joinable thread */
! 146: schedTask(TASK_ROOT(task), _sched_threadJoin, TASK_ARG(task),
! 147: TASK_VAL(task), TASK_DATA(task), TASK_DATLEN(task));
! 148: sched_unuseTask(task);
! 149: pthread_exit((void*) retcode);
! 150: }
! 151: #endif
! 152:
! 153: return (void*) retcode;
! 154: }
! 155:
1.4 misho 156:
1.1 misho 157: /*
1.2 misho 158: * schedRead() - Add READ I/O task to scheduler queue
1.6 misho 159: *
1.1 misho 160: * @root = root task
161: * @func = task execution function
162: * @arg = 1st func argument
1.2 misho 163: * @fd = fd handle
1.5 misho 164: * @opt_data = Optional data
165: * @opt_dlen = Optional data length
1.1 misho 166: * return: NULL error or !=NULL new queued task
167: */
168: sched_task_t *
1.5 misho 169: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
170: void *opt_data, size_t opt_dlen)
1.1 misho 171: {
172: sched_task_t *task;
173: void *ptr;
174:
175: if (!root || !func)
176: return NULL;
177:
178: /* get new task */
1.13 misho 179: if (!(task = sched_useTask(root)))
1.4 misho 180: return NULL;
1.1 misho 181:
182: task->task_func = func;
1.5 misho 183: TASK_TYPE(task) = taskREAD;
184: TASK_ROOT(task) = root;
1.1 misho 185:
186: TASK_ARG(task) = arg;
1.2 misho 187: TASK_FD(task) = fd;
1.1 misho 188:
1.5 misho 189: TASK_DATA(task) = opt_data;
190: TASK_DATLEN(task) = opt_dlen;
191:
1.1 misho 192: if (root->root_hooks.hook_add.read)
193: ptr = root->root_hooks.hook_add.read(task, NULL);
194: else
195: ptr = NULL;
196:
1.5 misho 197: if (!ptr) {
198: #ifdef HAVE_LIBPTHREAD
199: pthread_mutex_lock(&root->root_mtx[taskREAD]);
200: #endif
1.9 misho 201: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
1.5 misho 202: #ifdef HAVE_LIBPTHREAD
203: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
204: #endif
205: } else
1.13 misho 206: task = sched_unuseTask(task);
1.1 misho 207:
208: return task;
209: }
210:
211: /*
1.2 misho 212: * schedWrite() - Add WRITE I/O task to scheduler queue
1.6 misho 213: *
1.1 misho 214: * @root = root task
215: * @func = task execution function
216: * @arg = 1st func argument
1.2 misho 217: * @fd = fd handle
1.5 misho 218: * @opt_data = Optional data
219: * @opt_dlen = Optional data length
1.1 misho 220: * return: NULL error or !=NULL new queued task
221: */
222: sched_task_t *
1.5 misho 223: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
224: void *opt_data, size_t opt_dlen)
1.1 misho 225: {
226: sched_task_t *task;
227: void *ptr;
228:
229: if (!root || !func)
230: return NULL;
231:
232: /* get new task */
1.13 misho 233: if (!(task = sched_useTask(root)))
1.4 misho 234: return NULL;
1.1 misho 235:
236: task->task_func = func;
1.5 misho 237: TASK_TYPE(task) = taskWRITE;
238: TASK_ROOT(task) = root;
1.1 misho 239:
240: TASK_ARG(task) = arg;
1.2 misho 241: TASK_FD(task) = fd;
1.1 misho 242:
1.5 misho 243: TASK_DATA(task) = opt_data;
244: TASK_DATLEN(task) = opt_dlen;
245:
1.1 misho 246: if (root->root_hooks.hook_add.write)
247: ptr = root->root_hooks.hook_add.write(task, NULL);
248: else
249: ptr = NULL;
250:
1.5 misho 251: if (!ptr) {
252: #ifdef HAVE_LIBPTHREAD
253: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
254: #endif
1.9 misho 255: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
1.5 misho 256: #ifdef HAVE_LIBPTHREAD
257: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
258: #endif
259: } else
1.13 misho 260: task = sched_unuseTask(task);
1.1 misho 261:
262: return task;
263: }
264:
265: /*
1.9 misho 266: * schedNode() - Add NODE task to scheduler queue
267: *
268: * @root = root task
269: * @func = task execution function
270: * @arg = 1st func argument
271: * @fd = fd handle
272: * @opt_data = Optional data
273: * @opt_dlen = Optional data length
274: * return: NULL error or !=NULL new queued task
275: */
276: sched_task_t *
277: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
278: void *opt_data, size_t opt_dlen)
279: {
280: sched_task_t *task;
281: void *ptr;
282:
283: if (!root || !func)
284: return NULL;
285:
286: /* get new task */
1.13 misho 287: if (!(task = sched_useTask(root)))
1.9 misho 288: return NULL;
289:
290: task->task_func = func;
291: TASK_TYPE(task) = taskNODE;
292: TASK_ROOT(task) = root;
293:
294: TASK_ARG(task) = arg;
295: TASK_FD(task) = fd;
296:
297: TASK_DATA(task) = opt_data;
298: TASK_DATLEN(task) = opt_dlen;
299:
300: if (root->root_hooks.hook_add.node)
301: ptr = root->root_hooks.hook_add.node(task, NULL);
302: else
303: ptr = NULL;
304:
305: if (!ptr) {
306: #ifdef HAVE_LIBPTHREAD
307: pthread_mutex_lock(&root->root_mtx[taskNODE]);
308: #endif
309: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
310: #ifdef HAVE_LIBPTHREAD
311: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
312: #endif
313: } else
1.13 misho 314: task = sched_unuseTask(task);
1.9 misho 315:
316: return task;
317: }
318:
319: /*
320: * schedProc() - Add PROC task to scheduler queue
321: *
322: * @root = root task
323: * @func = task execution function
324: * @arg = 1st func argument
325: * @pid = PID
326: * @opt_data = Optional data
327: * @opt_dlen = Optional data length
328: * return: NULL error or !=NULL new queued task
329: */
330: sched_task_t *
331: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
332: void *opt_data, size_t opt_dlen)
333: {
334: sched_task_t *task;
335: void *ptr;
336:
337: if (!root || !func)
338: return NULL;
339:
340: /* get new task */
1.13 misho 341: if (!(task = sched_useTask(root)))
1.9 misho 342: return NULL;
343:
344: task->task_func = func;
345: TASK_TYPE(task) = taskPROC;
346: TASK_ROOT(task) = root;
347:
348: TASK_ARG(task) = arg;
349: TASK_VAL(task) = pid;
350:
351: TASK_DATA(task) = opt_data;
352: TASK_DATLEN(task) = opt_dlen;
353:
354: if (root->root_hooks.hook_add.proc)
355: ptr = root->root_hooks.hook_add.proc(task, NULL);
356: else
357: ptr = NULL;
358:
359: if (!ptr) {
360: #ifdef HAVE_LIBPTHREAD
361: pthread_mutex_lock(&root->root_mtx[taskPROC]);
362: #endif
363: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
364: #ifdef HAVE_LIBPTHREAD
365: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
366: #endif
367: } else
1.13 misho 368: task = sched_unuseTask(task);
1.9 misho 369:
370: return task;
371: }
372:
373: /*
374: * schedUser() - Add trigger USER task to scheduler queue
375: *
376: * @root = root task
377: * @func = task execution function
378: * @arg = 1st func argument
379: * @id = Trigger ID
380: * @opt_data = Optional data
381: * @opt_dlen = Optional user's trigger flags
382: * return: NULL error or !=NULL new queued task
383: */
384: sched_task_t *
385: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
386: void *opt_data, size_t opt_dlen)
387: {
388: #ifndef EVFILT_USER
389: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
390: return NULL;
391: #else
392: sched_task_t *task;
393: void *ptr;
394:
395: if (!root || !func)
396: return NULL;
397:
398: /* get new task */
1.13 misho 399: if (!(task = sched_useTask(root)))
1.9 misho 400: return NULL;
401:
402: task->task_func = func;
403: TASK_TYPE(task) = taskUSER;
404: TASK_ROOT(task) = root;
405:
406: TASK_ARG(task) = arg;
407: TASK_VAL(task) = id;
408:
409: TASK_DATA(task) = opt_data;
410: TASK_DATLEN(task) = opt_dlen;
411:
412: if (root->root_hooks.hook_add.user)
413: ptr = root->root_hooks.hook_add.user(task, NULL);
414: else
415: ptr = NULL;
416:
417: if (!ptr) {
418: #ifdef HAVE_LIBPTHREAD
419: pthread_mutex_lock(&root->root_mtx[taskUSER]);
420: #endif
421: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
422: #ifdef HAVE_LIBPTHREAD
423: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
424: #endif
425: } else
1.13 misho 426: task = sched_unuseTask(task);
1.9 misho 427:
428: return task;
429: #endif
430: }
431:
432: /*
433: * schedSignal() - Add SIGNAL task to scheduler queue
434: *
435: * @root = root task
436: * @func = task execution function
437: * @arg = 1st func argument
438: * @sig = Signal
439: * @opt_data = Optional data
440: * @opt_dlen = Optional data length
441: * return: NULL error or !=NULL new queued task
442: */
443: sched_task_t *
444: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
445: void *opt_data, size_t opt_dlen)
446: {
447: sched_task_t *task;
448: void *ptr;
449:
450: if (!root || !func)
451: return NULL;
452:
453: /* get new task */
1.13 misho 454: if (!(task = sched_useTask(root)))
1.9 misho 455: return NULL;
456:
457: task->task_func = func;
458: TASK_TYPE(task) = taskSIGNAL;
459: TASK_ROOT(task) = root;
460:
461: TASK_ARG(task) = arg;
462: TASK_VAL(task) = sig;
463:
464: TASK_DATA(task) = opt_data;
465: TASK_DATLEN(task) = opt_dlen;
466:
467: if (root->root_hooks.hook_add.signal)
468: ptr = root->root_hooks.hook_add.signal(task, NULL);
469: else
470: ptr = NULL;
471:
472: if (!ptr) {
473: #ifdef HAVE_LIBPTHREAD
474: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
475: #endif
476: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
477: #ifdef HAVE_LIBPTHREAD
478: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
479: #endif
480: } else
1.13 misho 481: task = sched_unuseTask(task);
1.9 misho 482:
483: return task;
484: }
485:
486: /*
1.8 misho 487: * schedAlarm() - Add ALARM task to scheduler queue
488: *
489: * @root = root task
490: * @func = task execution function
491: * @arg = 1st func argument
492: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
493: * @opt_data = Optional data
494: * @opt_dlen = Optional data length
495: * return: NULL error or !=NULL new queued task
496: */
497: sched_task_t *
498: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
499: void *opt_data, size_t opt_dlen)
500: {
501: sched_task_t *task;
502: void *ptr;
503:
504: if (!root || !func)
505: return NULL;
506:
507: /* get new task */
1.13 misho 508: if (!(task = sched_useTask(root)))
1.8 misho 509: return NULL;
510:
511: task->task_func = func;
512: TASK_TYPE(task) = taskALARM;
513: TASK_ROOT(task) = root;
514:
515: TASK_ARG(task) = arg;
516: TASK_TS(task) = ts;
517:
518: TASK_DATA(task) = opt_data;
519: TASK_DATLEN(task) = opt_dlen;
520:
521: if (root->root_hooks.hook_add.alarm)
522: ptr = root->root_hooks.hook_add.alarm(task, NULL);
523: else
524: ptr = NULL;
525:
526: if (!ptr) {
527: #ifdef HAVE_LIBPTHREAD
528: pthread_mutex_lock(&root->root_mtx[taskALARM]);
529: #endif
1.9 misho 530: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
1.8 misho 531: #ifdef HAVE_LIBPTHREAD
532: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
533: #endif
534: } else
1.13 misho 535: task = sched_unuseTask(task);
1.8 misho 536:
537: return task;
538: }
539:
1.11 misho 540: #ifdef AIO_SUPPORT
541: /*
542: * schedAIO() - Add AIO task to scheduler queue
543: *
544: * @root = root task
545: * @func = task execution function
546: * @arg = 1st func argument
547: * @acb = AIO cb structure address
548: * @opt_data = Optional data
549: * @opt_dlen = Optional data length
550: * return: NULL error or !=NULL new queued task
551: */
552: sched_task_t *
553: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
554: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
555: {
556: sched_task_t *task;
557: void *ptr;
558:
559: if (!root || !func || !acb || !opt_dlen)
560: return NULL;
561:
562: /* get new task */
1.13 misho 563: if (!(task = sched_useTask(root)))
1.11 misho 564: return NULL;
565:
566: task->task_func = func;
567: TASK_TYPE(task) = taskAIO;
568: TASK_ROOT(task) = root;
569:
570: TASK_ARG(task) = arg;
571: TASK_VAL(task) = (u_long) acb;
572:
573: TASK_DATA(task) = opt_data;
574: TASK_DATLEN(task) = opt_dlen;
575:
576: if (root->root_hooks.hook_add.aio)
577: ptr = root->root_hooks.hook_add.aio(task, NULL);
578: else
579: ptr = NULL;
580:
581: if (!ptr) {
582: #ifdef HAVE_LIBPTHREAD
583: pthread_mutex_lock(&root->root_mtx[taskAIO]);
584: #endif
585: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
586: #ifdef HAVE_LIBPTHREAD
587: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
588: #endif
589: } else
1.13 misho 590: task = sched_unuseTask(task);
1.11 misho 591:
592: return task;
593: }
594:
595: /*
596: * schedAIORead() - Add AIO read task to scheduler queue
597: *
598: * @root = root task
599: * @func = task execution function
600: * @arg = 1st func argument
601: * @fd = file descriptor
602: * @buffer = Buffer
603: * @buflen = Buffer length
604: * @offset = Offset from start of file, if =-1 from current position
605: * return: NULL error or !=NULL new queued task
606: */
607: inline sched_task_t *
608: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
609: void *buffer, size_t buflen, off_t offset)
610: {
611: struct aiocb *acb;
612: off_t off;
613:
614: if (!root || !func || !buffer || !buflen)
615: return NULL;
616:
617: if (offset == (off_t) -1) {
618: off = lseek(fd, 0, SEEK_CUR);
619: if (off == -1) {
620: LOGERR;
621: return NULL;
622: }
623: } else
624: off = offset;
625:
626: if (!(acb = malloc(sizeof(struct aiocb)))) {
627: LOGERR;
628: return NULL;
629: } else
630: memset(acb, 0, sizeof(struct aiocb));
631:
632: acb->aio_fildes = fd;
633: acb->aio_nbytes = buflen;
634: acb->aio_buf = buffer;
635: acb->aio_offset = off;
636: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
637: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
638: acb->aio_sigevent.sigev_value.sival_ptr = acb;
639:
640: if (aio_read(acb)) {
641: LOGERR;
642: free(acb);
643: return NULL;
644: }
645:
646: return schedAIO(root, func, arg, acb, buffer, buflen);
647: }
648:
649: /*
650: * schedAIOWrite() - Add AIO write task to scheduler queue
651: *
652: * @root = root task
653: * @func = task execution function
654: * @arg = 1st func argument
655: * @fd = file descriptor
656: * @buffer = Buffer
657: * @buflen = Buffer length
658: * @offset = Offset from start of file, if =-1 from current position
659: * return: NULL error or !=NULL new queued task
660: */
661: inline sched_task_t *
662: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
663: void *buffer, size_t buflen, off_t offset)
664: {
665: struct aiocb *acb;
666: off_t off;
667:
668: if (!root || !func || !buffer || !buflen)
669: return NULL;
670:
671: if (offset == (off_t) -1) {
672: off = lseek(fd, 0, SEEK_CUR);
673: if (off == -1) {
674: LOGERR;
675: return NULL;
676: }
677: } else
678: off = offset;
679:
680: if (!(acb = malloc(sizeof(struct aiocb)))) {
681: LOGERR;
682: return NULL;
683: } else
684: memset(acb, 0, sizeof(struct aiocb));
685:
686: acb->aio_fildes = fd;
687: acb->aio_nbytes = buflen;
688: acb->aio_buf = buffer;
689: acb->aio_offset = off;
690: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
691: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
692: acb->aio_sigevent.sigev_value.sival_ptr = acb;
693:
694: if (aio_write(acb)) {
695: LOGERR;
696: free(acb);
697: return NULL;
698: }
699:
700: return schedAIO(root, func, arg, acb, buffer, buflen);
701: }
702:
703: #ifdef EVFILT_LIO
704: /*
705: * schedLIO() - Add AIO bulk tasks to scheduler queue
706: *
707: * @root = root task
708: * @func = task execution function
709: * @arg = 1st func argument
710: * @acbs = AIO cb structure addresses
711: * @opt_data = Optional data
712: * @opt_dlen = Optional data length
713: * return: NULL error or !=NULL new queued task
714: */
715: sched_task_t *
716: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
717: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
718: {
719: sched_task_t *task;
720: void *ptr;
721:
722: if (!root || !func || !acbs || !opt_dlen)
723: return NULL;
724:
725: /* get new task */
1.13 misho 726: if (!(task = sched_useTask(root)))
1.11 misho 727: return NULL;
728:
729: task->task_func = func;
730: TASK_TYPE(task) = taskLIO;
731: TASK_ROOT(task) = root;
732:
733: TASK_ARG(task) = arg;
734: TASK_VAL(task) = (u_long) acbs;
735:
736: TASK_DATA(task) = opt_data;
737: TASK_DATLEN(task) = opt_dlen;
738:
739: if (root->root_hooks.hook_add.lio)
740: ptr = root->root_hooks.hook_add.lio(task, NULL);
741: else
742: ptr = NULL;
743:
744: if (!ptr) {
745: #ifdef HAVE_LIBPTHREAD
746: pthread_mutex_lock(&root->root_mtx[taskLIO]);
747: #endif
748: TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
749: #ifdef HAVE_LIBPTHREAD
750: pthread_mutex_unlock(&root->root_mtx[taskLIO]);
751: #endif
752: } else
1.13 misho 753: task = sched_unuseTask(task);
1.11 misho 754:
755: return task;
756: }
757:
758: /*
759: * schedLIORead() - Add list of AIO read tasks to scheduler queue
760: *
761: * @root = root task
762: * @func = task execution function
763: * @arg = 1st func argument
764: * @fd = file descriptor
765: * @bufs = Buffer's list
766: * @nbufs = Number of Buffers
767: * @offset = Offset from start of file, if =-1 from current position
768: * return: NULL error or !=NULL new queued task
769: */
770: sched_task_t *
771: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
772: struct iovec *bufs, size_t nbufs, off_t offset)
773: {
774: struct sigevent sig;
775: struct aiocb **acb;
776: off_t off;
777: register int i;
778:
779: if (!root || !func || !bufs || !nbufs)
780: return NULL;
781:
782: if (offset == (off_t) -1) {
783: off = lseek(fd, 0, SEEK_CUR);
784: if (off == -1) {
785: LOGERR;
786: return NULL;
787: }
788: } else
789: off = offset;
790:
791: if (!(acb = calloc(sizeof(void*), nbufs))) {
792: LOGERR;
793: return NULL;
794: } else
795: memset(acb, 0, sizeof(void*) * nbufs);
796: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
797: acb[i] = malloc(sizeof(struct aiocb));
798: if (!acb[i]) {
799: LOGERR;
800: for (i = 0; i < nbufs; i++)
801: if (acb[i])
802: free(acb[i]);
803: free(acb);
804: return NULL;
805: } else
806: memset(acb[i], 0, sizeof(struct aiocb));
807: acb[i]->aio_fildes = fd;
808: acb[i]->aio_nbytes = bufs[i].iov_len;
809: acb[i]->aio_buf = bufs[i].iov_base;
810: acb[i]->aio_offset = off;
811: acb[i]->aio_lio_opcode = LIO_READ;
812: }
813: memset(&sig, 0, sizeof sig);
814: sig.sigev_notify = SIGEV_KEVENT;
815: sig.sigev_notify_kqueue = root->root_kq;
816: sig.sigev_value.sival_ptr = acb;
817:
818: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
819: LOGERR;
820: for (i = 0; i < nbufs; i++)
821: if (acb[i])
822: free(acb[i]);
823: free(acb);
824: return NULL;
825: }
826:
827: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
828: }
829:
830: /*
831: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
832: *
833: * @root = root task
834: * @func = task execution function
835: * @arg = 1st func argument
836: * @fd = file descriptor
837: * @bufs = Buffer's list
838: * @nbufs = Number of Buffers
839: * @offset = Offset from start of file, if =-1 from current position
840: * return: NULL error or !=NULL new queued task
841: */
842: inline sched_task_t *
843: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
844: struct iovec *bufs, size_t nbufs, off_t offset)
845: {
846: struct sigevent sig;
847: struct aiocb **acb;
848: off_t off;
849: register int i;
850:
851: if (!root || !func || !bufs || !nbufs)
852: return NULL;
853:
854: if (offset == (off_t) -1) {
855: off = lseek(fd, 0, SEEK_CUR);
856: if (off == -1) {
857: LOGERR;
858: return NULL;
859: }
860: } else
861: off = offset;
862:
863: if (!(acb = calloc(sizeof(void*), nbufs))) {
864: LOGERR;
865: return NULL;
866: } else
867: memset(acb, 0, sizeof(void*) * nbufs);
868: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
869: acb[i] = malloc(sizeof(struct aiocb));
870: if (!acb[i]) {
871: LOGERR;
872: for (i = 0; i < nbufs; i++)
873: if (acb[i])
874: free(acb[i]);
875: free(acb);
876: return NULL;
877: } else
878: memset(acb[i], 0, sizeof(struct aiocb));
879: acb[i]->aio_fildes = fd;
880: acb[i]->aio_nbytes = bufs[i].iov_len;
881: acb[i]->aio_buf = bufs[i].iov_base;
882: acb[i]->aio_offset = off;
883: acb[i]->aio_lio_opcode = LIO_WRITE;
884: }
885: memset(&sig, 0, sizeof sig);
886: sig.sigev_notify = SIGEV_KEVENT;
887: sig.sigev_notify_kqueue = root->root_kq;
888: sig.sigev_value.sival_ptr = acb;
889:
890: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
891: LOGERR;
892: for (i = 0; i < nbufs; i++)
893: if (acb[i])
894: free(acb[i]);
895: free(acb);
896: return NULL;
897: }
898:
899: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
900: }
901: #endif /* EVFILT_LIO */
902: #endif /* AIO_SUPPORT */
903:
1.8 misho 904: /*
1.1 misho 905: * schedTimer() - Add TIMER task to scheduler queue
1.6 misho 906: *
1.1 misho 907: * @root = root task
908: * @func = task execution function
909: * @arg = 1st func argument
1.5 misho 910: * @ts = timeout argument structure
911: * @opt_data = Optional data
912: * @opt_dlen = Optional data length
1.1 misho 913: * return: NULL error or !=NULL new queued task
914: */
915: sched_task_t *
1.5 misho 916: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
917: void *opt_data, size_t opt_dlen)
1.1 misho 918: {
1.9 misho 919: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 920: void *ptr;
1.5 misho 921: struct timespec now;
1.1 misho 922:
923: if (!root || !func)
924: return NULL;
925:
926: /* get new task */
1.13 misho 927: if (!(task = sched_useTask(root)))
1.4 misho 928: return NULL;
1.1 misho 929:
930: task->task_func = func;
1.5 misho 931: TASK_TYPE(task) = taskTIMER;
932: TASK_ROOT(task) = root;
1.1 misho 933:
934: TASK_ARG(task) = arg;
935:
1.5 misho 936: TASK_DATA(task) = opt_data;
937: TASK_DATLEN(task) = opt_dlen;
938:
1.1 misho 939: /* calculate timeval structure */
1.5 misho 940: clock_gettime(CLOCK_MONOTONIC, &now);
941: now.tv_sec += ts.tv_sec;
942: now.tv_nsec += ts.tv_nsec;
943: if (now.tv_nsec >= 1000000000L) {
1.1 misho 944: now.tv_sec++;
1.5 misho 945: now.tv_nsec -= 1000000000L;
946: } else if (now.tv_nsec < 0) {
1.1 misho 947: now.tv_sec--;
1.5 misho 948: now.tv_nsec += 1000000000L;
1.1 misho 949: }
1.5 misho 950: TASK_TS(task) = now;
1.1 misho 951:
952: if (root->root_hooks.hook_add.timer)
953: ptr = root->root_hooks.hook_add.timer(task, NULL);
954: else
955: ptr = NULL;
956:
957: if (!ptr) {
1.5 misho 958: #ifdef HAVE_LIBPTHREAD
959: pthread_mutex_lock(&root->root_mtx[taskTIMER]);
960: #endif
1.1 misho 961: #ifdef TIMER_WITHOUT_SORT
1.9 misho 962: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 963: #else
1.9 misho 964: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
1.5 misho 965: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
1.1 misho 966: break;
967: if (!t)
1.9 misho 968: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 969: else
1.9 misho 970: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.1 misho 971: #endif
1.5 misho 972: #ifdef HAVE_LIBPTHREAD
973: pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
974: #endif
1.4 misho 975: } else
1.13 misho 976: task = sched_unuseTask(task);
1.1 misho 977:
978: return task;
979: }
980:
981: /*
982: * schedEvent() - Add EVENT task to scheduler queue
1.6 misho 983: *
1.1 misho 984: * @root = root task
985: * @func = task execution function
986: * @arg = 1st func argument
1.2 misho 987: * @val = additional func argument
1.5 misho 988: * @opt_data = Optional data
989: * @opt_dlen = Optional data length
1.1 misho 990: * return: NULL error or !=NULL new queued task
991: */
992: sched_task_t *
1.5 misho 993: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
994: void *opt_data, size_t opt_dlen)
1.1 misho 995: {
996: sched_task_t *task;
997: void *ptr;
998:
999: if (!root || !func)
1000: return NULL;
1001:
1002: /* get new task */
1.13 misho 1003: if (!(task = sched_useTask(root)))
1.4 misho 1004: return NULL;
1.1 misho 1005:
1006: task->task_func = func;
1.5 misho 1007: TASK_TYPE(task) = taskEVENT;
1008: TASK_ROOT(task) = root;
1.1 misho 1009:
1010: TASK_ARG(task) = arg;
1.2 misho 1011: TASK_VAL(task) = val;
1.1 misho 1012:
1.5 misho 1013: TASK_DATA(task) = opt_data;
1014: TASK_DATLEN(task) = opt_dlen;
1015:
1.1 misho 1016: if (root->root_hooks.hook_add.event)
1017: ptr = root->root_hooks.hook_add.event(task, NULL);
1018: else
1019: ptr = NULL;
1020:
1.5 misho 1021: if (!ptr) {
1022: #ifdef HAVE_LIBPTHREAD
1023: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
1024: #endif
1.9 misho 1025: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
1.5 misho 1026: #ifdef HAVE_LIBPTHREAD
1027: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
1028: #endif
1029: } else
1.13 misho 1030: task = sched_unuseTask(task);
1.1 misho 1031:
1032: return task;
1033: }
1034:
1035:
1036: /*
1.12 misho 1037: * schedTask() - Add regular task to scheduler queue
1.6 misho 1038: *
1.1 misho 1039: * @root = root task
1040: * @func = task execution function
1041: * @arg = 1st func argument
1.12 misho 1042: * @prio = regular task priority, 0 is hi priority for regular tasks
1.5 misho 1043: * @opt_data = Optional data
1044: * @opt_dlen = Optional data length
1.1 misho 1045: * return: NULL error or !=NULL new queued task
1046: */
1047: sched_task_t *
1.12 misho 1048: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1.5 misho 1049: void *opt_data, size_t opt_dlen)
1.1 misho 1050: {
1.12 misho 1051: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 1052: void *ptr;
1053:
1054: if (!root || !func)
1055: return NULL;
1056:
1057: /* get new task */
1.13 misho 1058: if (!(task = sched_useTask(root)))
1.4 misho 1059: return NULL;
1.1 misho 1060:
1061: task->task_func = func;
1.12 misho 1062: TASK_TYPE(task) = taskTASK;
1.5 misho 1063: TASK_ROOT(task) = root;
1.1 misho 1064:
1065: TASK_ARG(task) = arg;
1.12 misho 1066: TASK_VAL(task) = prio;
1.1 misho 1067:
1.5 misho 1068: TASK_DATA(task) = opt_data;
1069: TASK_DATLEN(task) = opt_dlen;
1070:
1.12 misho 1071: if (root->root_hooks.hook_add.task)
1072: ptr = root->root_hooks.hook_add.task(task, NULL);
1.1 misho 1073: else
1074: ptr = NULL;
1075:
1.5 misho 1076: if (!ptr) {
1077: #ifdef HAVE_LIBPTHREAD
1.12 misho 1078: pthread_mutex_lock(&root->root_mtx[taskTASK]);
1.5 misho 1079: #endif
1.12 misho 1080: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1081: if (TASK_VAL(task) < TASK_VAL(t))
1082: break;
1083: if (!t)
1084: TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
1085: else
1086: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.5 misho 1087: #ifdef HAVE_LIBPTHREAD
1.12 misho 1088: pthread_mutex_unlock(&root->root_mtx[taskTASK]);
1.5 misho 1089: #endif
1090: } else
1.13 misho 1091: task = sched_unuseTask(task);
1.1 misho 1092:
1093: return task;
1094: }
1095:
1096: /*
1.10 misho 1097: * schedSuspend() - Add Suspended task to scheduler queue
1098: *
1099: * @root = root task
1100: * @func = task execution function
1101: * @arg = 1st func argument
1102: * @id = Trigger ID
1103: * @opt_data = Optional data
1104: * @opt_dlen = Optional data length
1105: * return: NULL error or !=NULL new queued task
1106: */
1107: sched_task_t *
1108: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1109: void *opt_data, size_t opt_dlen)
1110: {
1111: sched_task_t *task;
1112: void *ptr;
1113:
1114: if (!root || !func)
1115: return NULL;
1116:
1117: /* get new task */
1.13 misho 1118: if (!(task = sched_useTask(root)))
1.10 misho 1119: return NULL;
1120:
1121: task->task_func = func;
1122: TASK_TYPE(task) = taskSUSPEND;
1123: TASK_ROOT(task) = root;
1124:
1125: TASK_ARG(task) = arg;
1126: TASK_VAL(task) = id;
1127:
1128: TASK_DATA(task) = opt_data;
1129: TASK_DATLEN(task) = opt_dlen;
1130:
1131: if (root->root_hooks.hook_add.suspend)
1132: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1133: else
1134: ptr = NULL;
1135:
1136: if (!ptr) {
1137: #ifdef HAVE_LIBPTHREAD
1138: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1139: #endif
1140: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1141: #ifdef HAVE_LIBPTHREAD
1142: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1143: #endif
1144: } else
1.13 misho 1145: task = sched_unuseTask(task);
1.10 misho 1146:
1147: return task;
1148: }
1149:
1150: /*
1.1 misho 1151: * schedCallOnce() - Call once from scheduler
1.6 misho 1152: *
1.1 misho 1153: * @root = root task
1154: * @func = task execution function
1155: * @arg = 1st func argument
1.2 misho 1156: * @val = additional func argument
1.5 misho 1157: * @opt_data = Optional data
1158: * @opt_dlen = Optional data length
1.2 misho 1159: * return: return value from called func
1.1 misho 1160: */
1161: sched_task_t *
1.5 misho 1162: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1163: void *opt_data, size_t opt_dlen)
1.1 misho 1164: {
1165: sched_task_t *task;
1.2 misho 1166: void *ret;
1.1 misho 1167:
1168: if (!root || !func)
1169: return NULL;
1170:
1171: /* get new task */
1.13 misho 1172: if (!(task = sched_useTask(root)))
1.4 misho 1173: return NULL;
1.1 misho 1174:
1175: task->task_func = func;
1.5 misho 1176: TASK_TYPE(task) = taskEVENT;
1177: TASK_ROOT(task) = root;
1.1 misho 1178:
1179: TASK_ARG(task) = arg;
1.2 misho 1180: TASK_VAL(task) = val;
1.1 misho 1181:
1.5 misho 1182: TASK_DATA(task) = opt_data;
1183: TASK_DATLEN(task) = opt_dlen;
1184:
1.2 misho 1185: ret = schedCall(task);
1.1 misho 1186:
1.13 misho 1187: sched_unuseTask(task);
1.2 misho 1188: return ret;
1.1 misho 1189: }
1.13 misho 1190:
1191: /*
1192: * schedThread() - Add thread task to scheduler queue
1193: *
1194: * @root = root task
1195: * @func = task execution function
1196: * @arg = 1st func argument
1197: * @detach = Detach thread from scheduler, if !=0
1198: * @opt_data = Optional data
1199: * @opt_dlen = Optional data length
1200: * return: NULL error or !=NULL new queued task
1201: */
1202: sched_task_t *
1203: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach,
1204: void *opt_data, size_t opt_dlen)
1205: {
1206: #ifndef HAVE_LIBPTHREAD
1207: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1208: return NULL;
1209: #endif
1210: sched_task_t *task;
1211: void *ptr;
1212: pthread_attr_t attr;
1213:
1214: if (!root || !func)
1215: return NULL;
1216:
1217: /* get new task */
1218: if (!(task = sched_useTask(root)))
1219: return NULL;
1220:
1221: task->task_func = func;
1222: TASK_TYPE(task) = taskTHREAD;
1223: TASK_ROOT(task) = root;
1224:
1225: TASK_ARG(task) = arg;
1.13.2.1! misho 1226: TASK_FLAG(task) = detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE;
1.13 misho 1227:
1228: TASK_DATA(task) = opt_data;
1229: TASK_DATLEN(task) = opt_dlen;
1230:
1231: pthread_attr_init(&attr);
1.13.2.1! misho 1232: pthread_attr_setdetachstate(&attr, TASK_FLAG(task));
1.13 misho 1233: if (root->root_hooks.hook_add.thread)
1234: ptr = root->root_hooks.hook_add.thread(task, &attr);
1235: else
1236: ptr = NULL;
1237: pthread_attr_destroy(&attr);
1238:
1239: if (!ptr) {
1240: pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
1241: TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
1242: pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
1243: } else
1244: task = sched_unuseTask(task);
1245:
1246: return task;
1247: }
1248:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>