Annotation of libaitsched/src/tasks.c, revision 1.13.2.4
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.13.2.4! misho 6: * $Id: tasks.c,v 1.13.2.3 2012/08/22 23:43:36 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.6 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
1.13 misho 49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
1.4 misho 55: inline sched_task_t *
1.13 misho 56: sched_useTask(sched_root_task_t * __restrict root)
1.4 misho 57: {
1.7 misho 58: sched_task_t *task, *tmp;
1.4 misho 59:
1.5 misho 60: #ifdef HAVE_LIBPTHREAD
1.13.2.4! misho 61: pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
1.5 misho 62: #endif
1.13.2.4! misho 63: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
! 64: if (!TASK_ISLOCKED(task)) {
1.4 misho 65: TAILQ_REMOVE(&root->root_unuse, task, task_node);
66: break;
67: }
68: }
1.13.2.4! misho 69: #ifdef HAVE_LIBPTHREAD
! 70: pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
! 71: #endif
1.4 misho 72:
73: if (!task) {
74: task = malloc(sizeof(sched_task_t));
75: if (!task) {
76: LOGERR;
77: return NULL;
78: }
79: }
80:
1.9 misho 81: memset(task, 0, sizeof(sched_task_t));
82: task->task_id = (uintptr_t) task;
1.4 misho 83: return task;
84: }
85:
1.13 misho 86: /*
87: * sched_unuseTask() - Unlock and put task to unuse queue
88: *
89: * @task = task
90: * return: always is NULL
91: */
1.4 misho 92: inline sched_task_t *
1.13 misho 93: sched_unuseTask(sched_task_t * __restrict task)
1.4 misho 94: {
95: TASK_UNLOCK(task);
1.5 misho 96: TASK_TYPE(task) = taskUNUSE;
97: #ifdef HAVE_LIBPTHREAD
98: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
99: #endif
1.9 misho 100: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
1.5 misho 101: #ifdef HAVE_LIBPTHREAD
102: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
103: #endif
1.4 misho 104: task = NULL;
105:
106: return task;
107: }
108:
1.13.2.2 misho 109: #pragma GCC visibility push(hidden)
110:
1.13.2.4! misho 111: #ifdef HAVE_LIBPTHREAD
! 112: static void
! 113: _sched_threadCleanup(sched_task_t *t)
! 114: {
! 115: if (!t || !TASK_ROOT(t))
! 116: return;
! 117:
! 118: if (TASK_FLAG(t) == PTHREAD_CREATE_JOINABLE)
! 119: pthread_detach(pthread_self());
! 120:
! 121: sched_unuseTask(t);
! 122: }
1.13.2.1 misho 123: void *
1.13.2.4! misho 124: _sched_threadWrapper(sched_task_t *t)
1.13.2.1 misho 125: {
126: void *ret = NULL;
127:
1.13.2.4! misho 128: if (!t || !TASK_ROOT(t))
! 129: pthread_exit(ret);
1.13.2.1 misho 130:
1.13.2.4! misho 131: pthread_cleanup_push((void (*)(void*)) _sched_threadCleanup, t);
1.13.2.1 misho 132:
1.13.2.4! misho 133: pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
! 134: pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
! 135: pthread_testcancel();
! 136:
! 137: ret = TASK_FUNC(t)(t);
! 138:
! 139: pthread_cleanup_pop(42);
! 140: TASK_ROOT(t)->root_ret = ret;
! 141: pthread_exit(ret);
1.13.2.1 misho 142: }
1.13.2.4! misho 143: #endif
1.13.2.1 misho 144:
1.13.2.2 misho 145: #pragma GCC visibility pop
146:
1.13.2.1 misho 147: /*
148: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
149: *
150: * @task = current task
151: * @retcode = return code
152: * return: return code
153: */
154: inline void *
155: sched_taskExit(sched_task_t *task, intptr_t retcode)
156: {
157: if (!task || !TASK_ROOT(task))
158: return (void*) -1;
159:
160: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
161: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
162:
163: TASK_ROOT(task)->root_ret = (void*) retcode;
164: return (void*) retcode;
165: }
166:
1.4 misho 167:
1.1 misho 168: /*
1.2 misho 169: * schedRead() - Add READ I/O task to scheduler queue
1.6 misho 170: *
1.1 misho 171: * @root = root task
172: * @func = task execution function
173: * @arg = 1st func argument
1.2 misho 174: * @fd = fd handle
1.5 misho 175: * @opt_data = Optional data
176: * @opt_dlen = Optional data length
1.1 misho 177: * return: NULL error or !=NULL new queued task
178: */
179: sched_task_t *
1.5 misho 180: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
181: void *opt_data, size_t opt_dlen)
1.1 misho 182: {
183: sched_task_t *task;
184: void *ptr;
185:
186: if (!root || !func)
187: return NULL;
188:
189: /* get new task */
1.13 misho 190: if (!(task = sched_useTask(root)))
1.4 misho 191: return NULL;
1.1 misho 192:
193: task->task_func = func;
1.5 misho 194: TASK_TYPE(task) = taskREAD;
195: TASK_ROOT(task) = root;
1.1 misho 196:
197: TASK_ARG(task) = arg;
1.2 misho 198: TASK_FD(task) = fd;
1.1 misho 199:
1.5 misho 200: TASK_DATA(task) = opt_data;
201: TASK_DATLEN(task) = opt_dlen;
202:
1.1 misho 203: if (root->root_hooks.hook_add.read)
204: ptr = root->root_hooks.hook_add.read(task, NULL);
205: else
206: ptr = NULL;
207:
1.5 misho 208: if (!ptr) {
209: #ifdef HAVE_LIBPTHREAD
210: pthread_mutex_lock(&root->root_mtx[taskREAD]);
211: #endif
1.9 misho 212: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
1.5 misho 213: #ifdef HAVE_LIBPTHREAD
214: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
215: #endif
216: } else
1.13 misho 217: task = sched_unuseTask(task);
1.1 misho 218:
219: return task;
220: }
221:
222: /*
1.2 misho 223: * schedWrite() - Add WRITE I/O task to scheduler queue
1.6 misho 224: *
1.1 misho 225: * @root = root task
226: * @func = task execution function
227: * @arg = 1st func argument
1.2 misho 228: * @fd = fd handle
1.5 misho 229: * @opt_data = Optional data
230: * @opt_dlen = Optional data length
1.1 misho 231: * return: NULL error or !=NULL new queued task
232: */
233: sched_task_t *
1.5 misho 234: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
235: void *opt_data, size_t opt_dlen)
1.1 misho 236: {
237: sched_task_t *task;
238: void *ptr;
239:
240: if (!root || !func)
241: return NULL;
242:
243: /* get new task */
1.13 misho 244: if (!(task = sched_useTask(root)))
1.4 misho 245: return NULL;
1.1 misho 246:
247: task->task_func = func;
1.5 misho 248: TASK_TYPE(task) = taskWRITE;
249: TASK_ROOT(task) = root;
1.1 misho 250:
251: TASK_ARG(task) = arg;
1.2 misho 252: TASK_FD(task) = fd;
1.1 misho 253:
1.5 misho 254: TASK_DATA(task) = opt_data;
255: TASK_DATLEN(task) = opt_dlen;
256:
1.1 misho 257: if (root->root_hooks.hook_add.write)
258: ptr = root->root_hooks.hook_add.write(task, NULL);
259: else
260: ptr = NULL;
261:
1.5 misho 262: if (!ptr) {
263: #ifdef HAVE_LIBPTHREAD
264: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
265: #endif
1.9 misho 266: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
1.5 misho 267: #ifdef HAVE_LIBPTHREAD
268: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
269: #endif
270: } else
1.13 misho 271: task = sched_unuseTask(task);
1.1 misho 272:
273: return task;
274: }
275:
276: /*
1.9 misho 277: * schedNode() - Add NODE task to scheduler queue
278: *
279: * @root = root task
280: * @func = task execution function
281: * @arg = 1st func argument
282: * @fd = fd handle
283: * @opt_data = Optional data
284: * @opt_dlen = Optional data length
285: * return: NULL error or !=NULL new queued task
286: */
287: sched_task_t *
288: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
289: void *opt_data, size_t opt_dlen)
290: {
291: sched_task_t *task;
292: void *ptr;
293:
294: if (!root || !func)
295: return NULL;
296:
297: /* get new task */
1.13 misho 298: if (!(task = sched_useTask(root)))
1.9 misho 299: return NULL;
300:
301: task->task_func = func;
302: TASK_TYPE(task) = taskNODE;
303: TASK_ROOT(task) = root;
304:
305: TASK_ARG(task) = arg;
306: TASK_FD(task) = fd;
307:
308: TASK_DATA(task) = opt_data;
309: TASK_DATLEN(task) = opt_dlen;
310:
311: if (root->root_hooks.hook_add.node)
312: ptr = root->root_hooks.hook_add.node(task, NULL);
313: else
314: ptr = NULL;
315:
316: if (!ptr) {
317: #ifdef HAVE_LIBPTHREAD
318: pthread_mutex_lock(&root->root_mtx[taskNODE]);
319: #endif
320: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
321: #ifdef HAVE_LIBPTHREAD
322: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
323: #endif
324: } else
1.13 misho 325: task = sched_unuseTask(task);
1.9 misho 326:
327: return task;
328: }
329:
330: /*
331: * schedProc() - Add PROC task to scheduler queue
332: *
333: * @root = root task
334: * @func = task execution function
335: * @arg = 1st func argument
336: * @pid = PID
337: * @opt_data = Optional data
338: * @opt_dlen = Optional data length
339: * return: NULL error or !=NULL new queued task
340: */
341: sched_task_t *
342: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
343: void *opt_data, size_t opt_dlen)
344: {
345: sched_task_t *task;
346: void *ptr;
347:
348: if (!root || !func)
349: return NULL;
350:
351: /* get new task */
1.13 misho 352: if (!(task = sched_useTask(root)))
1.9 misho 353: return NULL;
354:
355: task->task_func = func;
356: TASK_TYPE(task) = taskPROC;
357: TASK_ROOT(task) = root;
358:
359: TASK_ARG(task) = arg;
360: TASK_VAL(task) = pid;
361:
362: TASK_DATA(task) = opt_data;
363: TASK_DATLEN(task) = opt_dlen;
364:
365: if (root->root_hooks.hook_add.proc)
366: ptr = root->root_hooks.hook_add.proc(task, NULL);
367: else
368: ptr = NULL;
369:
370: if (!ptr) {
371: #ifdef HAVE_LIBPTHREAD
372: pthread_mutex_lock(&root->root_mtx[taskPROC]);
373: #endif
374: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
375: #ifdef HAVE_LIBPTHREAD
376: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
377: #endif
378: } else
1.13 misho 379: task = sched_unuseTask(task);
1.9 misho 380:
381: return task;
382: }
383:
384: /*
385: * schedUser() - Add trigger USER task to scheduler queue
386: *
387: * @root = root task
388: * @func = task execution function
389: * @arg = 1st func argument
390: * @id = Trigger ID
391: * @opt_data = Optional data
392: * @opt_dlen = Optional user's trigger flags
393: * return: NULL error or !=NULL new queued task
394: */
395: sched_task_t *
396: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
397: void *opt_data, size_t opt_dlen)
398: {
399: #ifndef EVFILT_USER
400: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
401: return NULL;
402: #else
403: sched_task_t *task;
404: void *ptr;
405:
406: if (!root || !func)
407: return NULL;
408:
409: /* get new task */
1.13 misho 410: if (!(task = sched_useTask(root)))
1.9 misho 411: return NULL;
412:
413: task->task_func = func;
414: TASK_TYPE(task) = taskUSER;
415: TASK_ROOT(task) = root;
416:
417: TASK_ARG(task) = arg;
418: TASK_VAL(task) = id;
419:
420: TASK_DATA(task) = opt_data;
421: TASK_DATLEN(task) = opt_dlen;
422:
423: if (root->root_hooks.hook_add.user)
424: ptr = root->root_hooks.hook_add.user(task, NULL);
425: else
426: ptr = NULL;
427:
428: if (!ptr) {
429: #ifdef HAVE_LIBPTHREAD
430: pthread_mutex_lock(&root->root_mtx[taskUSER]);
431: #endif
432: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
433: #ifdef HAVE_LIBPTHREAD
434: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
435: #endif
436: } else
1.13 misho 437: task = sched_unuseTask(task);
1.9 misho 438:
439: return task;
440: #endif
441: }
442:
443: /*
444: * schedSignal() - Add SIGNAL task to scheduler queue
445: *
446: * @root = root task
447: * @func = task execution function
448: * @arg = 1st func argument
449: * @sig = Signal
450: * @opt_data = Optional data
451: * @opt_dlen = Optional data length
452: * return: NULL error or !=NULL new queued task
453: */
454: sched_task_t *
455: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
456: void *opt_data, size_t opt_dlen)
457: {
458: sched_task_t *task;
459: void *ptr;
460:
461: if (!root || !func)
462: return NULL;
463:
464: /* get new task */
1.13 misho 465: if (!(task = sched_useTask(root)))
1.9 misho 466: return NULL;
467:
468: task->task_func = func;
469: TASK_TYPE(task) = taskSIGNAL;
470: TASK_ROOT(task) = root;
471:
472: TASK_ARG(task) = arg;
473: TASK_VAL(task) = sig;
474:
475: TASK_DATA(task) = opt_data;
476: TASK_DATLEN(task) = opt_dlen;
477:
478: if (root->root_hooks.hook_add.signal)
479: ptr = root->root_hooks.hook_add.signal(task, NULL);
480: else
481: ptr = NULL;
482:
483: if (!ptr) {
484: #ifdef HAVE_LIBPTHREAD
485: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
486: #endif
487: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
488: #ifdef HAVE_LIBPTHREAD
489: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
490: #endif
491: } else
1.13 misho 492: task = sched_unuseTask(task);
1.9 misho 493:
494: return task;
495: }
496:
497: /*
1.8 misho 498: * schedAlarm() - Add ALARM task to scheduler queue
499: *
500: * @root = root task
501: * @func = task execution function
502: * @arg = 1st func argument
503: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
504: * @opt_data = Optional data
505: * @opt_dlen = Optional data length
506: * return: NULL error or !=NULL new queued task
507: */
508: sched_task_t *
509: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
510: void *opt_data, size_t opt_dlen)
511: {
512: sched_task_t *task;
513: void *ptr;
514:
515: if (!root || !func)
516: return NULL;
517:
518: /* get new task */
1.13 misho 519: if (!(task = sched_useTask(root)))
1.8 misho 520: return NULL;
521:
522: task->task_func = func;
523: TASK_TYPE(task) = taskALARM;
524: TASK_ROOT(task) = root;
525:
526: TASK_ARG(task) = arg;
527: TASK_TS(task) = ts;
528:
529: TASK_DATA(task) = opt_data;
530: TASK_DATLEN(task) = opt_dlen;
531:
532: if (root->root_hooks.hook_add.alarm)
533: ptr = root->root_hooks.hook_add.alarm(task, NULL);
534: else
535: ptr = NULL;
536:
537: if (!ptr) {
538: #ifdef HAVE_LIBPTHREAD
539: pthread_mutex_lock(&root->root_mtx[taskALARM]);
540: #endif
1.9 misho 541: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
1.8 misho 542: #ifdef HAVE_LIBPTHREAD
543: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
544: #endif
545: } else
1.13 misho 546: task = sched_unuseTask(task);
1.8 misho 547:
548: return task;
549: }
550:
1.11 misho 551: #ifdef AIO_SUPPORT
552: /*
553: * schedAIO() - Add AIO task to scheduler queue
554: *
555: * @root = root task
556: * @func = task execution function
557: * @arg = 1st func argument
558: * @acb = AIO cb structure address
559: * @opt_data = Optional data
560: * @opt_dlen = Optional data length
561: * return: NULL error or !=NULL new queued task
562: */
563: sched_task_t *
564: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
565: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
566: {
567: sched_task_t *task;
568: void *ptr;
569:
570: if (!root || !func || !acb || !opt_dlen)
571: return NULL;
572:
573: /* get new task */
1.13 misho 574: if (!(task = sched_useTask(root)))
1.11 misho 575: return NULL;
576:
577: task->task_func = func;
578: TASK_TYPE(task) = taskAIO;
579: TASK_ROOT(task) = root;
580:
581: TASK_ARG(task) = arg;
582: TASK_VAL(task) = (u_long) acb;
583:
584: TASK_DATA(task) = opt_data;
585: TASK_DATLEN(task) = opt_dlen;
586:
587: if (root->root_hooks.hook_add.aio)
588: ptr = root->root_hooks.hook_add.aio(task, NULL);
589: else
590: ptr = NULL;
591:
592: if (!ptr) {
593: #ifdef HAVE_LIBPTHREAD
594: pthread_mutex_lock(&root->root_mtx[taskAIO]);
595: #endif
596: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
597: #ifdef HAVE_LIBPTHREAD
598: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
599: #endif
600: } else
1.13 misho 601: task = sched_unuseTask(task);
1.11 misho 602:
603: return task;
604: }
605:
606: /*
607: * schedAIORead() - Add AIO read task to scheduler queue
608: *
609: * @root = root task
610: * @func = task execution function
611: * @arg = 1st func argument
612: * @fd = file descriptor
613: * @buffer = Buffer
614: * @buflen = Buffer length
615: * @offset = Offset from start of file, if =-1 from current position
616: * return: NULL error or !=NULL new queued task
617: */
618: inline sched_task_t *
619: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
620: void *buffer, size_t buflen, off_t offset)
621: {
622: struct aiocb *acb;
623: off_t off;
624:
625: if (!root || !func || !buffer || !buflen)
626: return NULL;
627:
628: if (offset == (off_t) -1) {
629: off = lseek(fd, 0, SEEK_CUR);
630: if (off == -1) {
631: LOGERR;
632: return NULL;
633: }
634: } else
635: off = offset;
636:
637: if (!(acb = malloc(sizeof(struct aiocb)))) {
638: LOGERR;
639: return NULL;
640: } else
641: memset(acb, 0, sizeof(struct aiocb));
642:
643: acb->aio_fildes = fd;
644: acb->aio_nbytes = buflen;
645: acb->aio_buf = buffer;
646: acb->aio_offset = off;
647: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
648: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
649: acb->aio_sigevent.sigev_value.sival_ptr = acb;
650:
651: if (aio_read(acb)) {
652: LOGERR;
653: free(acb);
654: return NULL;
655: }
656:
657: return schedAIO(root, func, arg, acb, buffer, buflen);
658: }
659:
660: /*
661: * schedAIOWrite() - Add AIO write task to scheduler queue
662: *
663: * @root = root task
664: * @func = task execution function
665: * @arg = 1st func argument
666: * @fd = file descriptor
667: * @buffer = Buffer
668: * @buflen = Buffer length
669: * @offset = Offset from start of file, if =-1 from current position
670: * return: NULL error or !=NULL new queued task
671: */
672: inline sched_task_t *
673: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
674: void *buffer, size_t buflen, off_t offset)
675: {
676: struct aiocb *acb;
677: off_t off;
678:
679: if (!root || !func || !buffer || !buflen)
680: return NULL;
681:
682: if (offset == (off_t) -1) {
683: off = lseek(fd, 0, SEEK_CUR);
684: if (off == -1) {
685: LOGERR;
686: return NULL;
687: }
688: } else
689: off = offset;
690:
691: if (!(acb = malloc(sizeof(struct aiocb)))) {
692: LOGERR;
693: return NULL;
694: } else
695: memset(acb, 0, sizeof(struct aiocb));
696:
697: acb->aio_fildes = fd;
698: acb->aio_nbytes = buflen;
699: acb->aio_buf = buffer;
700: acb->aio_offset = off;
701: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
702: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
703: acb->aio_sigevent.sigev_value.sival_ptr = acb;
704:
705: if (aio_write(acb)) {
706: LOGERR;
707: free(acb);
708: return NULL;
709: }
710:
711: return schedAIO(root, func, arg, acb, buffer, buflen);
712: }
713:
714: #ifdef EVFILT_LIO
715: /*
716: * schedLIO() - Add AIO bulk tasks to scheduler queue
717: *
718: * @root = root task
719: * @func = task execution function
720: * @arg = 1st func argument
721: * @acbs = AIO cb structure addresses
722: * @opt_data = Optional data
723: * @opt_dlen = Optional data length
724: * return: NULL error or !=NULL new queued task
725: */
726: sched_task_t *
727: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
728: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
729: {
730: sched_task_t *task;
731: void *ptr;
732:
733: if (!root || !func || !acbs || !opt_dlen)
734: return NULL;
735:
736: /* get new task */
1.13 misho 737: if (!(task = sched_useTask(root)))
1.11 misho 738: return NULL;
739:
740: task->task_func = func;
741: TASK_TYPE(task) = taskLIO;
742: TASK_ROOT(task) = root;
743:
744: TASK_ARG(task) = arg;
745: TASK_VAL(task) = (u_long) acbs;
746:
747: TASK_DATA(task) = opt_data;
748: TASK_DATLEN(task) = opt_dlen;
749:
750: if (root->root_hooks.hook_add.lio)
751: ptr = root->root_hooks.hook_add.lio(task, NULL);
752: else
753: ptr = NULL;
754:
755: if (!ptr) {
756: #ifdef HAVE_LIBPTHREAD
757: pthread_mutex_lock(&root->root_mtx[taskLIO]);
758: #endif
759: TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
760: #ifdef HAVE_LIBPTHREAD
761: pthread_mutex_unlock(&root->root_mtx[taskLIO]);
762: #endif
763: } else
1.13 misho 764: task = sched_unuseTask(task);
1.11 misho 765:
766: return task;
767: }
768:
769: /*
770: * schedLIORead() - Add list of AIO read tasks to scheduler queue
771: *
772: * @root = root task
773: * @func = task execution function
774: * @arg = 1st func argument
775: * @fd = file descriptor
776: * @bufs = Buffer's list
777: * @nbufs = Number of Buffers
778: * @offset = Offset from start of file, if =-1 from current position
779: * return: NULL error or !=NULL new queued task
780: */
781: sched_task_t *
782: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
783: struct iovec *bufs, size_t nbufs, off_t offset)
784: {
785: struct sigevent sig;
786: struct aiocb **acb;
787: off_t off;
788: register int i;
789:
790: if (!root || !func || !bufs || !nbufs)
791: return NULL;
792:
793: if (offset == (off_t) -1) {
794: off = lseek(fd, 0, SEEK_CUR);
795: if (off == -1) {
796: LOGERR;
797: return NULL;
798: }
799: } else
800: off = offset;
801:
802: if (!(acb = calloc(sizeof(void*), nbufs))) {
803: LOGERR;
804: return NULL;
805: } else
806: memset(acb, 0, sizeof(void*) * nbufs);
807: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
808: acb[i] = malloc(sizeof(struct aiocb));
809: if (!acb[i]) {
810: LOGERR;
811: for (i = 0; i < nbufs; i++)
812: if (acb[i])
813: free(acb[i]);
814: free(acb);
815: return NULL;
816: } else
817: memset(acb[i], 0, sizeof(struct aiocb));
818: acb[i]->aio_fildes = fd;
819: acb[i]->aio_nbytes = bufs[i].iov_len;
820: acb[i]->aio_buf = bufs[i].iov_base;
821: acb[i]->aio_offset = off;
822: acb[i]->aio_lio_opcode = LIO_READ;
823: }
824: memset(&sig, 0, sizeof sig);
825: sig.sigev_notify = SIGEV_KEVENT;
826: sig.sigev_notify_kqueue = root->root_kq;
827: sig.sigev_value.sival_ptr = acb;
828:
829: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
830: LOGERR;
831: for (i = 0; i < nbufs; i++)
832: if (acb[i])
833: free(acb[i]);
834: free(acb);
835: return NULL;
836: }
837:
838: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
839: }
840:
841: /*
842: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
843: *
844: * @root = root task
845: * @func = task execution function
846: * @arg = 1st func argument
847: * @fd = file descriptor
848: * @bufs = Buffer's list
849: * @nbufs = Number of Buffers
850: * @offset = Offset from start of file, if =-1 from current position
851: * return: NULL error or !=NULL new queued task
852: */
853: inline sched_task_t *
854: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
855: struct iovec *bufs, size_t nbufs, off_t offset)
856: {
857: struct sigevent sig;
858: struct aiocb **acb;
859: off_t off;
860: register int i;
861:
862: if (!root || !func || !bufs || !nbufs)
863: return NULL;
864:
865: if (offset == (off_t) -1) {
866: off = lseek(fd, 0, SEEK_CUR);
867: if (off == -1) {
868: LOGERR;
869: return NULL;
870: }
871: } else
872: off = offset;
873:
874: if (!(acb = calloc(sizeof(void*), nbufs))) {
875: LOGERR;
876: return NULL;
877: } else
878: memset(acb, 0, sizeof(void*) * nbufs);
879: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
880: acb[i] = malloc(sizeof(struct aiocb));
881: if (!acb[i]) {
882: LOGERR;
883: for (i = 0; i < nbufs; i++)
884: if (acb[i])
885: free(acb[i]);
886: free(acb);
887: return NULL;
888: } else
889: memset(acb[i], 0, sizeof(struct aiocb));
890: acb[i]->aio_fildes = fd;
891: acb[i]->aio_nbytes = bufs[i].iov_len;
892: acb[i]->aio_buf = bufs[i].iov_base;
893: acb[i]->aio_offset = off;
894: acb[i]->aio_lio_opcode = LIO_WRITE;
895: }
896: memset(&sig, 0, sizeof sig);
897: sig.sigev_notify = SIGEV_KEVENT;
898: sig.sigev_notify_kqueue = root->root_kq;
899: sig.sigev_value.sival_ptr = acb;
900:
901: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
902: LOGERR;
903: for (i = 0; i < nbufs; i++)
904: if (acb[i])
905: free(acb[i]);
906: free(acb);
907: return NULL;
908: }
909:
910: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
911: }
912: #endif /* EVFILT_LIO */
913: #endif /* AIO_SUPPORT */
914:
1.8 misho 915: /*
1.1 misho 916: * schedTimer() - Add TIMER task to scheduler queue
1.6 misho 917: *
1.1 misho 918: * @root = root task
919: * @func = task execution function
920: * @arg = 1st func argument
1.5 misho 921: * @ts = timeout argument structure
922: * @opt_data = Optional data
923: * @opt_dlen = Optional data length
1.1 misho 924: * return: NULL error or !=NULL new queued task
925: */
926: sched_task_t *
1.5 misho 927: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
928: void *opt_data, size_t opt_dlen)
1.1 misho 929: {
1.9 misho 930: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 931: void *ptr;
1.5 misho 932: struct timespec now;
1.1 misho 933:
934: if (!root || !func)
935: return NULL;
936:
937: /* get new task */
1.13 misho 938: if (!(task = sched_useTask(root)))
1.4 misho 939: return NULL;
1.1 misho 940:
941: task->task_func = func;
1.5 misho 942: TASK_TYPE(task) = taskTIMER;
943: TASK_ROOT(task) = root;
1.1 misho 944:
945: TASK_ARG(task) = arg;
946:
1.5 misho 947: TASK_DATA(task) = opt_data;
948: TASK_DATLEN(task) = opt_dlen;
949:
1.1 misho 950: /* calculate timeval structure */
1.5 misho 951: clock_gettime(CLOCK_MONOTONIC, &now);
952: now.tv_sec += ts.tv_sec;
953: now.tv_nsec += ts.tv_nsec;
954: if (now.tv_nsec >= 1000000000L) {
1.1 misho 955: now.tv_sec++;
1.5 misho 956: now.tv_nsec -= 1000000000L;
957: } else if (now.tv_nsec < 0) {
1.1 misho 958: now.tv_sec--;
1.5 misho 959: now.tv_nsec += 1000000000L;
1.1 misho 960: }
1.5 misho 961: TASK_TS(task) = now;
1.1 misho 962:
963: if (root->root_hooks.hook_add.timer)
964: ptr = root->root_hooks.hook_add.timer(task, NULL);
965: else
966: ptr = NULL;
967:
968: if (!ptr) {
1.5 misho 969: #ifdef HAVE_LIBPTHREAD
970: pthread_mutex_lock(&root->root_mtx[taskTIMER]);
971: #endif
1.1 misho 972: #ifdef TIMER_WITHOUT_SORT
1.9 misho 973: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 974: #else
1.9 misho 975: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
1.5 misho 976: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
1.1 misho 977: break;
978: if (!t)
1.9 misho 979: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 980: else
1.9 misho 981: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.1 misho 982: #endif
1.5 misho 983: #ifdef HAVE_LIBPTHREAD
984: pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
985: #endif
1.4 misho 986: } else
1.13 misho 987: task = sched_unuseTask(task);
1.1 misho 988:
989: return task;
990: }
991:
992: /*
993: * schedEvent() - Add EVENT task to scheduler queue
1.6 misho 994: *
1.1 misho 995: * @root = root task
996: * @func = task execution function
997: * @arg = 1st func argument
1.2 misho 998: * @val = additional func argument
1.5 misho 999: * @opt_data = Optional data
1000: * @opt_dlen = Optional data length
1.1 misho 1001: * return: NULL error or !=NULL new queued task
1002: */
1003: sched_task_t *
1.5 misho 1004: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1005: void *opt_data, size_t opt_dlen)
1.1 misho 1006: {
1007: sched_task_t *task;
1008: void *ptr;
1009:
1010: if (!root || !func)
1011: return NULL;
1012:
1013: /* get new task */
1.13 misho 1014: if (!(task = sched_useTask(root)))
1.4 misho 1015: return NULL;
1.1 misho 1016:
1017: task->task_func = func;
1.5 misho 1018: TASK_TYPE(task) = taskEVENT;
1019: TASK_ROOT(task) = root;
1.1 misho 1020:
1021: TASK_ARG(task) = arg;
1.2 misho 1022: TASK_VAL(task) = val;
1.1 misho 1023:
1.5 misho 1024: TASK_DATA(task) = opt_data;
1025: TASK_DATLEN(task) = opt_dlen;
1026:
1.1 misho 1027: if (root->root_hooks.hook_add.event)
1028: ptr = root->root_hooks.hook_add.event(task, NULL);
1029: else
1030: ptr = NULL;
1031:
1.5 misho 1032: if (!ptr) {
1033: #ifdef HAVE_LIBPTHREAD
1034: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
1035: #endif
1.9 misho 1036: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
1.5 misho 1037: #ifdef HAVE_LIBPTHREAD
1038: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
1039: #endif
1040: } else
1.13 misho 1041: task = sched_unuseTask(task);
1.1 misho 1042:
1043: return task;
1044: }
1045:
1046:
1047: /*
1.12 misho 1048: * schedTask() - Add regular task to scheduler queue
1.6 misho 1049: *
1.1 misho 1050: * @root = root task
1051: * @func = task execution function
1052: * @arg = 1st func argument
1.12 misho 1053: * @prio = regular task priority, 0 is hi priority for regular tasks
1.5 misho 1054: * @opt_data = Optional data
1055: * @opt_dlen = Optional data length
1.1 misho 1056: * return: NULL error or !=NULL new queued task
1057: */
1058: sched_task_t *
1.12 misho 1059: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1.5 misho 1060: void *opt_data, size_t opt_dlen)
1.1 misho 1061: {
1.12 misho 1062: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 1063: void *ptr;
1064:
1065: if (!root || !func)
1066: return NULL;
1067:
1068: /* get new task */
1.13 misho 1069: if (!(task = sched_useTask(root)))
1.4 misho 1070: return NULL;
1.1 misho 1071:
1072: task->task_func = func;
1.12 misho 1073: TASK_TYPE(task) = taskTASK;
1.5 misho 1074: TASK_ROOT(task) = root;
1.1 misho 1075:
1076: TASK_ARG(task) = arg;
1.12 misho 1077: TASK_VAL(task) = prio;
1.1 misho 1078:
1.5 misho 1079: TASK_DATA(task) = opt_data;
1080: TASK_DATLEN(task) = opt_dlen;
1081:
1.12 misho 1082: if (root->root_hooks.hook_add.task)
1083: ptr = root->root_hooks.hook_add.task(task, NULL);
1.1 misho 1084: else
1085: ptr = NULL;
1086:
1.5 misho 1087: if (!ptr) {
1088: #ifdef HAVE_LIBPTHREAD
1.12 misho 1089: pthread_mutex_lock(&root->root_mtx[taskTASK]);
1.5 misho 1090: #endif
1.12 misho 1091: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1092: if (TASK_VAL(task) < TASK_VAL(t))
1093: break;
1094: if (!t)
1095: TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
1096: else
1097: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.5 misho 1098: #ifdef HAVE_LIBPTHREAD
1.12 misho 1099: pthread_mutex_unlock(&root->root_mtx[taskTASK]);
1.5 misho 1100: #endif
1101: } else
1.13 misho 1102: task = sched_unuseTask(task);
1.1 misho 1103:
1104: return task;
1105: }
1106:
1107: /*
1.10 misho 1108: * schedSuspend() - Add Suspended task to scheduler queue
1109: *
1110: * @root = root task
1111: * @func = task execution function
1112: * @arg = 1st func argument
1113: * @id = Trigger ID
1114: * @opt_data = Optional data
1115: * @opt_dlen = Optional data length
1116: * return: NULL error or !=NULL new queued task
1117: */
1118: sched_task_t *
1119: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1120: void *opt_data, size_t opt_dlen)
1121: {
1122: sched_task_t *task;
1123: void *ptr;
1124:
1125: if (!root || !func)
1126: return NULL;
1127:
1128: /* get new task */
1.13 misho 1129: if (!(task = sched_useTask(root)))
1.10 misho 1130: return NULL;
1131:
1132: task->task_func = func;
1133: TASK_TYPE(task) = taskSUSPEND;
1134: TASK_ROOT(task) = root;
1135:
1136: TASK_ARG(task) = arg;
1137: TASK_VAL(task) = id;
1138:
1139: TASK_DATA(task) = opt_data;
1140: TASK_DATLEN(task) = opt_dlen;
1141:
1142: if (root->root_hooks.hook_add.suspend)
1143: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1144: else
1145: ptr = NULL;
1146:
1147: if (!ptr) {
1148: #ifdef HAVE_LIBPTHREAD
1149: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1150: #endif
1151: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1152: #ifdef HAVE_LIBPTHREAD
1153: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1154: #endif
1155: } else
1.13 misho 1156: task = sched_unuseTask(task);
1.10 misho 1157:
1158: return task;
1159: }
1160:
1161: /*
1.1 misho 1162: * schedCallOnce() - Call once from scheduler
1.6 misho 1163: *
1.1 misho 1164: * @root = root task
1165: * @func = task execution function
1166: * @arg = 1st func argument
1.2 misho 1167: * @val = additional func argument
1.5 misho 1168: * @opt_data = Optional data
1169: * @opt_dlen = Optional data length
1.2 misho 1170: * return: return value from called func
1.1 misho 1171: */
1172: sched_task_t *
1.5 misho 1173: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1174: void *opt_data, size_t opt_dlen)
1.1 misho 1175: {
1176: sched_task_t *task;
1.2 misho 1177: void *ret;
1.1 misho 1178:
1179: if (!root || !func)
1180: return NULL;
1181:
1182: /* get new task */
1.13 misho 1183: if (!(task = sched_useTask(root)))
1.4 misho 1184: return NULL;
1.1 misho 1185:
1186: task->task_func = func;
1.5 misho 1187: TASK_TYPE(task) = taskEVENT;
1188: TASK_ROOT(task) = root;
1.1 misho 1189:
1190: TASK_ARG(task) = arg;
1.2 misho 1191: TASK_VAL(task) = val;
1.1 misho 1192:
1.5 misho 1193: TASK_DATA(task) = opt_data;
1194: TASK_DATLEN(task) = opt_dlen;
1195:
1.2 misho 1196: ret = schedCall(task);
1.1 misho 1197:
1.13 misho 1198: sched_unuseTask(task);
1.2 misho 1199: return ret;
1.1 misho 1200: }
1.13 misho 1201:
1202: /*
1203: * schedThread() - Add thread task to scheduler queue
1204: *
1205: * @root = root task
1206: * @func = task execution function
1207: * @arg = 1st func argument
1208: * @detach = Detach thread from scheduler, if !=0
1209: * @opt_data = Optional data
1210: * @opt_dlen = Optional data length
1211: * return: NULL error or !=NULL new queued task
1212: */
1213: sched_task_t *
1214: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach,
1215: void *opt_data, size_t opt_dlen)
1216: {
1217: #ifndef HAVE_LIBPTHREAD
1218: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1219: return NULL;
1220: #endif
1221: sched_task_t *task;
1222: void *ptr;
1223: pthread_attr_t attr;
1224:
1225: if (!root || !func)
1226: return NULL;
1227:
1228: /* get new task */
1229: if (!(task = sched_useTask(root)))
1230: return NULL;
1231:
1232: task->task_func = func;
1233: TASK_TYPE(task) = taskTHREAD;
1234: TASK_ROOT(task) = root;
1235:
1236: TASK_ARG(task) = arg;
1.13.2.1 misho 1237: TASK_FLAG(task) = detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE;
1.13 misho 1238:
1239: TASK_DATA(task) = opt_data;
1240: TASK_DATLEN(task) = opt_dlen;
1241:
1242: pthread_attr_init(&attr);
1.13.2.1 misho 1243: pthread_attr_setdetachstate(&attr, TASK_FLAG(task));
1.13 misho 1244: if (root->root_hooks.hook_add.thread)
1245: ptr = root->root_hooks.hook_add.thread(task, &attr);
1246: else
1247: ptr = NULL;
1248: pthread_attr_destroy(&attr);
1249:
1250: if (!ptr) {
1251: pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
1252: TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
1253: pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
1254: } else
1255: task = sched_unuseTask(task);
1256:
1257: return task;
1258: }
1259:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>