Annotation of libaitsched/src/tasks.c, revision 1.13.2.2
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.13.2.2! misho 6: * $Id: tasks.c,v 1.13.2.1 2012/08/22 10:33:45 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.6 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
1.13 misho 49: /*
50: * sched_useTask() - Get and init new task
51: *
52: * @root = root task
53: * return: NULL error or !=NULL prepared task
54: */
1.4 misho 55: inline sched_task_t *
1.13 misho 56: sched_useTask(sched_root_task_t * __restrict root)
1.4 misho 57: {
1.7 misho 58: sched_task_t *task, *tmp;
1.4 misho 59:
1.7 misho 60: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
1.4 misho 61: if (!TASK_ISLOCKED(task)) {
1.5 misho 62: #ifdef HAVE_LIBPTHREAD
63: pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
64: #endif
1.4 misho 65: TAILQ_REMOVE(&root->root_unuse, task, task_node);
1.5 misho 66: #ifdef HAVE_LIBPTHREAD
67: pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
68: #endif
1.4 misho 69: break;
70: }
71: }
72:
73: if (!task) {
74: task = malloc(sizeof(sched_task_t));
75: if (!task) {
76: LOGERR;
77: return NULL;
78: }
79: }
80:
1.9 misho 81: memset(task, 0, sizeof(sched_task_t));
82: task->task_id = (uintptr_t) task;
1.4 misho 83: return task;
84: }
85:
1.13 misho 86: /*
87: * sched_unuseTask() - Unlock and put task to unuse queue
88: *
89: * @task = task
90: * return: always is NULL
91: */
1.4 misho 92: inline sched_task_t *
1.13 misho 93: sched_unuseTask(sched_task_t * __restrict task)
1.4 misho 94: {
95: TASK_UNLOCK(task);
1.5 misho 96: TASK_TYPE(task) = taskUNUSE;
97: #ifdef HAVE_LIBPTHREAD
98: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
99: #endif
1.9 misho 100: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
1.5 misho 101: #ifdef HAVE_LIBPTHREAD
102: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
103: #endif
1.4 misho 104: task = NULL;
105:
106: return task;
107: }
108:
1.13.2.2! misho 109: #pragma GCC visibility push(hidden)
! 110:
1.13.2.1 misho 111: void *
112: _sched_threadJoin(sched_task_t *task)
113: {
114: void *ret = NULL;
115:
116: if (!task)
117: return NULL;
118:
119: #ifdef HAVE_LIBPTHREAD
120: pthread_join((pthread_t) TASK_VAL(task), &ret);
121: TASK_ROOT(task)->root_ret = ret;
122: #endif
123:
124: return NULL;
125: }
126:
1.13.2.2! misho 127: #pragma GCC visibility pop
! 128:
1.13.2.1 misho 129: /*
130: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
131: *
132: * @task = current task
133: * @retcode = return code
134: * return: return code
135: */
136: inline void *
137: sched_taskExit(sched_task_t *task, intptr_t retcode)
138: {
139: if (!task || !TASK_ROOT(task))
140: return (void*) -1;
141:
142: if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
143: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
144:
145: TASK_ROOT(task)->root_ret = (void*) retcode;
146:
147: #ifdef HAVE_LIBPTHREAD
148: if (TASK_TYPE(task) == taskTHREAD) {
149: if (TASK_FLAG(task) == PTHREAD_CREATE_JOINABLE) /* joinable thread */
150: schedTask(TASK_ROOT(task), _sched_threadJoin, TASK_ARG(task),
151: TASK_VAL(task), TASK_DATA(task), TASK_DATLEN(task));
152: sched_unuseTask(task);
153: pthread_exit((void*) retcode);
154: }
155: #endif
156:
157: return (void*) retcode;
158: }
159:
1.4 misho 160:
1.1 misho 161: /*
1.2 misho 162: * schedRead() - Add READ I/O task to scheduler queue
1.6 misho 163: *
1.1 misho 164: * @root = root task
165: * @func = task execution function
166: * @arg = 1st func argument
1.2 misho 167: * @fd = fd handle
1.5 misho 168: * @opt_data = Optional data
169: * @opt_dlen = Optional data length
1.1 misho 170: * return: NULL error or !=NULL new queued task
171: */
172: sched_task_t *
1.5 misho 173: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
174: void *opt_data, size_t opt_dlen)
1.1 misho 175: {
176: sched_task_t *task;
177: void *ptr;
178:
179: if (!root || !func)
180: return NULL;
181:
182: /* get new task */
1.13 misho 183: if (!(task = sched_useTask(root)))
1.4 misho 184: return NULL;
1.1 misho 185:
186: task->task_func = func;
1.5 misho 187: TASK_TYPE(task) = taskREAD;
188: TASK_ROOT(task) = root;
1.1 misho 189:
190: TASK_ARG(task) = arg;
1.2 misho 191: TASK_FD(task) = fd;
1.1 misho 192:
1.5 misho 193: TASK_DATA(task) = opt_data;
194: TASK_DATLEN(task) = opt_dlen;
195:
1.1 misho 196: if (root->root_hooks.hook_add.read)
197: ptr = root->root_hooks.hook_add.read(task, NULL);
198: else
199: ptr = NULL;
200:
1.5 misho 201: if (!ptr) {
202: #ifdef HAVE_LIBPTHREAD
203: pthread_mutex_lock(&root->root_mtx[taskREAD]);
204: #endif
1.9 misho 205: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
1.5 misho 206: #ifdef HAVE_LIBPTHREAD
207: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
208: #endif
209: } else
1.13 misho 210: task = sched_unuseTask(task);
1.1 misho 211:
212: return task;
213: }
214:
215: /*
1.2 misho 216: * schedWrite() - Add WRITE I/O task to scheduler queue
1.6 misho 217: *
1.1 misho 218: * @root = root task
219: * @func = task execution function
220: * @arg = 1st func argument
1.2 misho 221: * @fd = fd handle
1.5 misho 222: * @opt_data = Optional data
223: * @opt_dlen = Optional data length
1.1 misho 224: * return: NULL error or !=NULL new queued task
225: */
226: sched_task_t *
1.5 misho 227: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
228: void *opt_data, size_t opt_dlen)
1.1 misho 229: {
230: sched_task_t *task;
231: void *ptr;
232:
233: if (!root || !func)
234: return NULL;
235:
236: /* get new task */
1.13 misho 237: if (!(task = sched_useTask(root)))
1.4 misho 238: return NULL;
1.1 misho 239:
240: task->task_func = func;
1.5 misho 241: TASK_TYPE(task) = taskWRITE;
242: TASK_ROOT(task) = root;
1.1 misho 243:
244: TASK_ARG(task) = arg;
1.2 misho 245: TASK_FD(task) = fd;
1.1 misho 246:
1.5 misho 247: TASK_DATA(task) = opt_data;
248: TASK_DATLEN(task) = opt_dlen;
249:
1.1 misho 250: if (root->root_hooks.hook_add.write)
251: ptr = root->root_hooks.hook_add.write(task, NULL);
252: else
253: ptr = NULL;
254:
1.5 misho 255: if (!ptr) {
256: #ifdef HAVE_LIBPTHREAD
257: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
258: #endif
1.9 misho 259: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
1.5 misho 260: #ifdef HAVE_LIBPTHREAD
261: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
262: #endif
263: } else
1.13 misho 264: task = sched_unuseTask(task);
1.1 misho 265:
266: return task;
267: }
268:
269: /*
1.9 misho 270: * schedNode() - Add NODE task to scheduler queue
271: *
272: * @root = root task
273: * @func = task execution function
274: * @arg = 1st func argument
275: * @fd = fd handle
276: * @opt_data = Optional data
277: * @opt_dlen = Optional data length
278: * return: NULL error or !=NULL new queued task
279: */
280: sched_task_t *
281: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
282: void *opt_data, size_t opt_dlen)
283: {
284: sched_task_t *task;
285: void *ptr;
286:
287: if (!root || !func)
288: return NULL;
289:
290: /* get new task */
1.13 misho 291: if (!(task = sched_useTask(root)))
1.9 misho 292: return NULL;
293:
294: task->task_func = func;
295: TASK_TYPE(task) = taskNODE;
296: TASK_ROOT(task) = root;
297:
298: TASK_ARG(task) = arg;
299: TASK_FD(task) = fd;
300:
301: TASK_DATA(task) = opt_data;
302: TASK_DATLEN(task) = opt_dlen;
303:
304: if (root->root_hooks.hook_add.node)
305: ptr = root->root_hooks.hook_add.node(task, NULL);
306: else
307: ptr = NULL;
308:
309: if (!ptr) {
310: #ifdef HAVE_LIBPTHREAD
311: pthread_mutex_lock(&root->root_mtx[taskNODE]);
312: #endif
313: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
314: #ifdef HAVE_LIBPTHREAD
315: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
316: #endif
317: } else
1.13 misho 318: task = sched_unuseTask(task);
1.9 misho 319:
320: return task;
321: }
322:
323: /*
324: * schedProc() - Add PROC task to scheduler queue
325: *
326: * @root = root task
327: * @func = task execution function
328: * @arg = 1st func argument
329: * @pid = PID
330: * @opt_data = Optional data
331: * @opt_dlen = Optional data length
332: * return: NULL error or !=NULL new queued task
333: */
334: sched_task_t *
335: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
336: void *opt_data, size_t opt_dlen)
337: {
338: sched_task_t *task;
339: void *ptr;
340:
341: if (!root || !func)
342: return NULL;
343:
344: /* get new task */
1.13 misho 345: if (!(task = sched_useTask(root)))
1.9 misho 346: return NULL;
347:
348: task->task_func = func;
349: TASK_TYPE(task) = taskPROC;
350: TASK_ROOT(task) = root;
351:
352: TASK_ARG(task) = arg;
353: TASK_VAL(task) = pid;
354:
355: TASK_DATA(task) = opt_data;
356: TASK_DATLEN(task) = opt_dlen;
357:
358: if (root->root_hooks.hook_add.proc)
359: ptr = root->root_hooks.hook_add.proc(task, NULL);
360: else
361: ptr = NULL;
362:
363: if (!ptr) {
364: #ifdef HAVE_LIBPTHREAD
365: pthread_mutex_lock(&root->root_mtx[taskPROC]);
366: #endif
367: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
368: #ifdef HAVE_LIBPTHREAD
369: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
370: #endif
371: } else
1.13 misho 372: task = sched_unuseTask(task);
1.9 misho 373:
374: return task;
375: }
376:
377: /*
378: * schedUser() - Add trigger USER task to scheduler queue
379: *
380: * @root = root task
381: * @func = task execution function
382: * @arg = 1st func argument
383: * @id = Trigger ID
384: * @opt_data = Optional data
385: * @opt_dlen = Optional user's trigger flags
386: * return: NULL error or !=NULL new queued task
387: */
388: sched_task_t *
389: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
390: void *opt_data, size_t opt_dlen)
391: {
392: #ifndef EVFILT_USER
393: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
394: return NULL;
395: #else
396: sched_task_t *task;
397: void *ptr;
398:
399: if (!root || !func)
400: return NULL;
401:
402: /* get new task */
1.13 misho 403: if (!(task = sched_useTask(root)))
1.9 misho 404: return NULL;
405:
406: task->task_func = func;
407: TASK_TYPE(task) = taskUSER;
408: TASK_ROOT(task) = root;
409:
410: TASK_ARG(task) = arg;
411: TASK_VAL(task) = id;
412:
413: TASK_DATA(task) = opt_data;
414: TASK_DATLEN(task) = opt_dlen;
415:
416: if (root->root_hooks.hook_add.user)
417: ptr = root->root_hooks.hook_add.user(task, NULL);
418: else
419: ptr = NULL;
420:
421: if (!ptr) {
422: #ifdef HAVE_LIBPTHREAD
423: pthread_mutex_lock(&root->root_mtx[taskUSER]);
424: #endif
425: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
426: #ifdef HAVE_LIBPTHREAD
427: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
428: #endif
429: } else
1.13 misho 430: task = sched_unuseTask(task);
1.9 misho 431:
432: return task;
433: #endif
434: }
435:
436: /*
437: * schedSignal() - Add SIGNAL task to scheduler queue
438: *
439: * @root = root task
440: * @func = task execution function
441: * @arg = 1st func argument
442: * @sig = Signal
443: * @opt_data = Optional data
444: * @opt_dlen = Optional data length
445: * return: NULL error or !=NULL new queued task
446: */
447: sched_task_t *
448: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
449: void *opt_data, size_t opt_dlen)
450: {
451: sched_task_t *task;
452: void *ptr;
453:
454: if (!root || !func)
455: return NULL;
456:
457: /* get new task */
1.13 misho 458: if (!(task = sched_useTask(root)))
1.9 misho 459: return NULL;
460:
461: task->task_func = func;
462: TASK_TYPE(task) = taskSIGNAL;
463: TASK_ROOT(task) = root;
464:
465: TASK_ARG(task) = arg;
466: TASK_VAL(task) = sig;
467:
468: TASK_DATA(task) = opt_data;
469: TASK_DATLEN(task) = opt_dlen;
470:
471: if (root->root_hooks.hook_add.signal)
472: ptr = root->root_hooks.hook_add.signal(task, NULL);
473: else
474: ptr = NULL;
475:
476: if (!ptr) {
477: #ifdef HAVE_LIBPTHREAD
478: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
479: #endif
480: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
481: #ifdef HAVE_LIBPTHREAD
482: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
483: #endif
484: } else
1.13 misho 485: task = sched_unuseTask(task);
1.9 misho 486:
487: return task;
488: }
489:
490: /*
1.8 misho 491: * schedAlarm() - Add ALARM task to scheduler queue
492: *
493: * @root = root task
494: * @func = task execution function
495: * @arg = 1st func argument
496: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
497: * @opt_data = Optional data
498: * @opt_dlen = Optional data length
499: * return: NULL error or !=NULL new queued task
500: */
501: sched_task_t *
502: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
503: void *opt_data, size_t opt_dlen)
504: {
505: sched_task_t *task;
506: void *ptr;
507:
508: if (!root || !func)
509: return NULL;
510:
511: /* get new task */
1.13 misho 512: if (!(task = sched_useTask(root)))
1.8 misho 513: return NULL;
514:
515: task->task_func = func;
516: TASK_TYPE(task) = taskALARM;
517: TASK_ROOT(task) = root;
518:
519: TASK_ARG(task) = arg;
520: TASK_TS(task) = ts;
521:
522: TASK_DATA(task) = opt_data;
523: TASK_DATLEN(task) = opt_dlen;
524:
525: if (root->root_hooks.hook_add.alarm)
526: ptr = root->root_hooks.hook_add.alarm(task, NULL);
527: else
528: ptr = NULL;
529:
530: if (!ptr) {
531: #ifdef HAVE_LIBPTHREAD
532: pthread_mutex_lock(&root->root_mtx[taskALARM]);
533: #endif
1.9 misho 534: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
1.8 misho 535: #ifdef HAVE_LIBPTHREAD
536: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
537: #endif
538: } else
1.13 misho 539: task = sched_unuseTask(task);
1.8 misho 540:
541: return task;
542: }
543:
1.11 misho 544: #ifdef AIO_SUPPORT
545: /*
546: * schedAIO() - Add AIO task to scheduler queue
547: *
548: * @root = root task
549: * @func = task execution function
550: * @arg = 1st func argument
551: * @acb = AIO cb structure address
552: * @opt_data = Optional data
553: * @opt_dlen = Optional data length
554: * return: NULL error or !=NULL new queued task
555: */
556: sched_task_t *
557: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
558: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
559: {
560: sched_task_t *task;
561: void *ptr;
562:
563: if (!root || !func || !acb || !opt_dlen)
564: return NULL;
565:
566: /* get new task */
1.13 misho 567: if (!(task = sched_useTask(root)))
1.11 misho 568: return NULL;
569:
570: task->task_func = func;
571: TASK_TYPE(task) = taskAIO;
572: TASK_ROOT(task) = root;
573:
574: TASK_ARG(task) = arg;
575: TASK_VAL(task) = (u_long) acb;
576:
577: TASK_DATA(task) = opt_data;
578: TASK_DATLEN(task) = opt_dlen;
579:
580: if (root->root_hooks.hook_add.aio)
581: ptr = root->root_hooks.hook_add.aio(task, NULL);
582: else
583: ptr = NULL;
584:
585: if (!ptr) {
586: #ifdef HAVE_LIBPTHREAD
587: pthread_mutex_lock(&root->root_mtx[taskAIO]);
588: #endif
589: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
590: #ifdef HAVE_LIBPTHREAD
591: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
592: #endif
593: } else
1.13 misho 594: task = sched_unuseTask(task);
1.11 misho 595:
596: return task;
597: }
598:
599: /*
600: * schedAIORead() - Add AIO read task to scheduler queue
601: *
602: * @root = root task
603: * @func = task execution function
604: * @arg = 1st func argument
605: * @fd = file descriptor
606: * @buffer = Buffer
607: * @buflen = Buffer length
608: * @offset = Offset from start of file, if =-1 from current position
609: * return: NULL error or !=NULL new queued task
610: */
611: inline sched_task_t *
612: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
613: void *buffer, size_t buflen, off_t offset)
614: {
615: struct aiocb *acb;
616: off_t off;
617:
618: if (!root || !func || !buffer || !buflen)
619: return NULL;
620:
621: if (offset == (off_t) -1) {
622: off = lseek(fd, 0, SEEK_CUR);
623: if (off == -1) {
624: LOGERR;
625: return NULL;
626: }
627: } else
628: off = offset;
629:
630: if (!(acb = malloc(sizeof(struct aiocb)))) {
631: LOGERR;
632: return NULL;
633: } else
634: memset(acb, 0, sizeof(struct aiocb));
635:
636: acb->aio_fildes = fd;
637: acb->aio_nbytes = buflen;
638: acb->aio_buf = buffer;
639: acb->aio_offset = off;
640: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
641: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
642: acb->aio_sigevent.sigev_value.sival_ptr = acb;
643:
644: if (aio_read(acb)) {
645: LOGERR;
646: free(acb);
647: return NULL;
648: }
649:
650: return schedAIO(root, func, arg, acb, buffer, buflen);
651: }
652:
653: /*
654: * schedAIOWrite() - Add AIO write task to scheduler queue
655: *
656: * @root = root task
657: * @func = task execution function
658: * @arg = 1st func argument
659: * @fd = file descriptor
660: * @buffer = Buffer
661: * @buflen = Buffer length
662: * @offset = Offset from start of file, if =-1 from current position
663: * return: NULL error or !=NULL new queued task
664: */
665: inline sched_task_t *
666: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
667: void *buffer, size_t buflen, off_t offset)
668: {
669: struct aiocb *acb;
670: off_t off;
671:
672: if (!root || !func || !buffer || !buflen)
673: return NULL;
674:
675: if (offset == (off_t) -1) {
676: off = lseek(fd, 0, SEEK_CUR);
677: if (off == -1) {
678: LOGERR;
679: return NULL;
680: }
681: } else
682: off = offset;
683:
684: if (!(acb = malloc(sizeof(struct aiocb)))) {
685: LOGERR;
686: return NULL;
687: } else
688: memset(acb, 0, sizeof(struct aiocb));
689:
690: acb->aio_fildes = fd;
691: acb->aio_nbytes = buflen;
692: acb->aio_buf = buffer;
693: acb->aio_offset = off;
694: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
695: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
696: acb->aio_sigevent.sigev_value.sival_ptr = acb;
697:
698: if (aio_write(acb)) {
699: LOGERR;
700: free(acb);
701: return NULL;
702: }
703:
704: return schedAIO(root, func, arg, acb, buffer, buflen);
705: }
706:
707: #ifdef EVFILT_LIO
708: /*
709: * schedLIO() - Add AIO bulk tasks to scheduler queue
710: *
711: * @root = root task
712: * @func = task execution function
713: * @arg = 1st func argument
714: * @acbs = AIO cb structure addresses
715: * @opt_data = Optional data
716: * @opt_dlen = Optional data length
717: * return: NULL error or !=NULL new queued task
718: */
719: sched_task_t *
720: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
721: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
722: {
723: sched_task_t *task;
724: void *ptr;
725:
726: if (!root || !func || !acbs || !opt_dlen)
727: return NULL;
728:
729: /* get new task */
1.13 misho 730: if (!(task = sched_useTask(root)))
1.11 misho 731: return NULL;
732:
733: task->task_func = func;
734: TASK_TYPE(task) = taskLIO;
735: TASK_ROOT(task) = root;
736:
737: TASK_ARG(task) = arg;
738: TASK_VAL(task) = (u_long) acbs;
739:
740: TASK_DATA(task) = opt_data;
741: TASK_DATLEN(task) = opt_dlen;
742:
743: if (root->root_hooks.hook_add.lio)
744: ptr = root->root_hooks.hook_add.lio(task, NULL);
745: else
746: ptr = NULL;
747:
748: if (!ptr) {
749: #ifdef HAVE_LIBPTHREAD
750: pthread_mutex_lock(&root->root_mtx[taskLIO]);
751: #endif
752: TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
753: #ifdef HAVE_LIBPTHREAD
754: pthread_mutex_unlock(&root->root_mtx[taskLIO]);
755: #endif
756: } else
1.13 misho 757: task = sched_unuseTask(task);
1.11 misho 758:
759: return task;
760: }
761:
762: /*
763: * schedLIORead() - Add list of AIO read tasks to scheduler queue
764: *
765: * @root = root task
766: * @func = task execution function
767: * @arg = 1st func argument
768: * @fd = file descriptor
769: * @bufs = Buffer's list
770: * @nbufs = Number of Buffers
771: * @offset = Offset from start of file, if =-1 from current position
772: * return: NULL error or !=NULL new queued task
773: */
774: sched_task_t *
775: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
776: struct iovec *bufs, size_t nbufs, off_t offset)
777: {
778: struct sigevent sig;
779: struct aiocb **acb;
780: off_t off;
781: register int i;
782:
783: if (!root || !func || !bufs || !nbufs)
784: return NULL;
785:
786: if (offset == (off_t) -1) {
787: off = lseek(fd, 0, SEEK_CUR);
788: if (off == -1) {
789: LOGERR;
790: return NULL;
791: }
792: } else
793: off = offset;
794:
795: if (!(acb = calloc(sizeof(void*), nbufs))) {
796: LOGERR;
797: return NULL;
798: } else
799: memset(acb, 0, sizeof(void*) * nbufs);
800: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
801: acb[i] = malloc(sizeof(struct aiocb));
802: if (!acb[i]) {
803: LOGERR;
804: for (i = 0; i < nbufs; i++)
805: if (acb[i])
806: free(acb[i]);
807: free(acb);
808: return NULL;
809: } else
810: memset(acb[i], 0, sizeof(struct aiocb));
811: acb[i]->aio_fildes = fd;
812: acb[i]->aio_nbytes = bufs[i].iov_len;
813: acb[i]->aio_buf = bufs[i].iov_base;
814: acb[i]->aio_offset = off;
815: acb[i]->aio_lio_opcode = LIO_READ;
816: }
817: memset(&sig, 0, sizeof sig);
818: sig.sigev_notify = SIGEV_KEVENT;
819: sig.sigev_notify_kqueue = root->root_kq;
820: sig.sigev_value.sival_ptr = acb;
821:
822: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
823: LOGERR;
824: for (i = 0; i < nbufs; i++)
825: if (acb[i])
826: free(acb[i]);
827: free(acb);
828: return NULL;
829: }
830:
831: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
832: }
833:
834: /*
835: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
836: *
837: * @root = root task
838: * @func = task execution function
839: * @arg = 1st func argument
840: * @fd = file descriptor
841: * @bufs = Buffer's list
842: * @nbufs = Number of Buffers
843: * @offset = Offset from start of file, if =-1 from current position
844: * return: NULL error or !=NULL new queued task
845: */
846: inline sched_task_t *
847: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
848: struct iovec *bufs, size_t nbufs, off_t offset)
849: {
850: struct sigevent sig;
851: struct aiocb **acb;
852: off_t off;
853: register int i;
854:
855: if (!root || !func || !bufs || !nbufs)
856: return NULL;
857:
858: if (offset == (off_t) -1) {
859: off = lseek(fd, 0, SEEK_CUR);
860: if (off == -1) {
861: LOGERR;
862: return NULL;
863: }
864: } else
865: off = offset;
866:
867: if (!(acb = calloc(sizeof(void*), nbufs))) {
868: LOGERR;
869: return NULL;
870: } else
871: memset(acb, 0, sizeof(void*) * nbufs);
872: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
873: acb[i] = malloc(sizeof(struct aiocb));
874: if (!acb[i]) {
875: LOGERR;
876: for (i = 0; i < nbufs; i++)
877: if (acb[i])
878: free(acb[i]);
879: free(acb);
880: return NULL;
881: } else
882: memset(acb[i], 0, sizeof(struct aiocb));
883: acb[i]->aio_fildes = fd;
884: acb[i]->aio_nbytes = bufs[i].iov_len;
885: acb[i]->aio_buf = bufs[i].iov_base;
886: acb[i]->aio_offset = off;
887: acb[i]->aio_lio_opcode = LIO_WRITE;
888: }
889: memset(&sig, 0, sizeof sig);
890: sig.sigev_notify = SIGEV_KEVENT;
891: sig.sigev_notify_kqueue = root->root_kq;
892: sig.sigev_value.sival_ptr = acb;
893:
894: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
895: LOGERR;
896: for (i = 0; i < nbufs; i++)
897: if (acb[i])
898: free(acb[i]);
899: free(acb);
900: return NULL;
901: }
902:
903: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
904: }
905: #endif /* EVFILT_LIO */
906: #endif /* AIO_SUPPORT */
907:
1.8 misho 908: /*
1.1 misho 909: * schedTimer() - Add TIMER task to scheduler queue
1.6 misho 910: *
1.1 misho 911: * @root = root task
912: * @func = task execution function
913: * @arg = 1st func argument
1.5 misho 914: * @ts = timeout argument structure
915: * @opt_data = Optional data
916: * @opt_dlen = Optional data length
1.1 misho 917: * return: NULL error or !=NULL new queued task
918: */
919: sched_task_t *
1.5 misho 920: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
921: void *opt_data, size_t opt_dlen)
1.1 misho 922: {
1.9 misho 923: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 924: void *ptr;
1.5 misho 925: struct timespec now;
1.1 misho 926:
927: if (!root || !func)
928: return NULL;
929:
930: /* get new task */
1.13 misho 931: if (!(task = sched_useTask(root)))
1.4 misho 932: return NULL;
1.1 misho 933:
934: task->task_func = func;
1.5 misho 935: TASK_TYPE(task) = taskTIMER;
936: TASK_ROOT(task) = root;
1.1 misho 937:
938: TASK_ARG(task) = arg;
939:
1.5 misho 940: TASK_DATA(task) = opt_data;
941: TASK_DATLEN(task) = opt_dlen;
942:
1.1 misho 943: /* calculate timeval structure */
1.5 misho 944: clock_gettime(CLOCK_MONOTONIC, &now);
945: now.tv_sec += ts.tv_sec;
946: now.tv_nsec += ts.tv_nsec;
947: if (now.tv_nsec >= 1000000000L) {
1.1 misho 948: now.tv_sec++;
1.5 misho 949: now.tv_nsec -= 1000000000L;
950: } else if (now.tv_nsec < 0) {
1.1 misho 951: now.tv_sec--;
1.5 misho 952: now.tv_nsec += 1000000000L;
1.1 misho 953: }
1.5 misho 954: TASK_TS(task) = now;
1.1 misho 955:
956: if (root->root_hooks.hook_add.timer)
957: ptr = root->root_hooks.hook_add.timer(task, NULL);
958: else
959: ptr = NULL;
960:
961: if (!ptr) {
1.5 misho 962: #ifdef HAVE_LIBPTHREAD
963: pthread_mutex_lock(&root->root_mtx[taskTIMER]);
964: #endif
1.1 misho 965: #ifdef TIMER_WITHOUT_SORT
1.9 misho 966: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 967: #else
1.9 misho 968: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
1.5 misho 969: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
1.1 misho 970: break;
971: if (!t)
1.9 misho 972: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 973: else
1.9 misho 974: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.1 misho 975: #endif
1.5 misho 976: #ifdef HAVE_LIBPTHREAD
977: pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
978: #endif
1.4 misho 979: } else
1.13 misho 980: task = sched_unuseTask(task);
1.1 misho 981:
982: return task;
983: }
984:
985: /*
986: * schedEvent() - Add EVENT task to scheduler queue
1.6 misho 987: *
1.1 misho 988: * @root = root task
989: * @func = task execution function
990: * @arg = 1st func argument
1.2 misho 991: * @val = additional func argument
1.5 misho 992: * @opt_data = Optional data
993: * @opt_dlen = Optional data length
1.1 misho 994: * return: NULL error or !=NULL new queued task
995: */
996: sched_task_t *
1.5 misho 997: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
998: void *opt_data, size_t opt_dlen)
1.1 misho 999: {
1000: sched_task_t *task;
1001: void *ptr;
1002:
1003: if (!root || !func)
1004: return NULL;
1005:
1006: /* get new task */
1.13 misho 1007: if (!(task = sched_useTask(root)))
1.4 misho 1008: return NULL;
1.1 misho 1009:
1010: task->task_func = func;
1.5 misho 1011: TASK_TYPE(task) = taskEVENT;
1012: TASK_ROOT(task) = root;
1.1 misho 1013:
1014: TASK_ARG(task) = arg;
1.2 misho 1015: TASK_VAL(task) = val;
1.1 misho 1016:
1.5 misho 1017: TASK_DATA(task) = opt_data;
1018: TASK_DATLEN(task) = opt_dlen;
1019:
1.1 misho 1020: if (root->root_hooks.hook_add.event)
1021: ptr = root->root_hooks.hook_add.event(task, NULL);
1022: else
1023: ptr = NULL;
1024:
1.5 misho 1025: if (!ptr) {
1026: #ifdef HAVE_LIBPTHREAD
1027: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
1028: #endif
1.9 misho 1029: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
1.5 misho 1030: #ifdef HAVE_LIBPTHREAD
1031: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
1032: #endif
1033: } else
1.13 misho 1034: task = sched_unuseTask(task);
1.1 misho 1035:
1036: return task;
1037: }
1038:
1039:
1040: /*
1.12 misho 1041: * schedTask() - Add regular task to scheduler queue
1.6 misho 1042: *
1.1 misho 1043: * @root = root task
1044: * @func = task execution function
1045: * @arg = 1st func argument
1.12 misho 1046: * @prio = regular task priority, 0 is hi priority for regular tasks
1.5 misho 1047: * @opt_data = Optional data
1048: * @opt_dlen = Optional data length
1.1 misho 1049: * return: NULL error or !=NULL new queued task
1050: */
1051: sched_task_t *
1.12 misho 1052: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1.5 misho 1053: void *opt_data, size_t opt_dlen)
1.1 misho 1054: {
1.12 misho 1055: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 1056: void *ptr;
1057:
1058: if (!root || !func)
1059: return NULL;
1060:
1061: /* get new task */
1.13 misho 1062: if (!(task = sched_useTask(root)))
1.4 misho 1063: return NULL;
1.1 misho 1064:
1065: task->task_func = func;
1.12 misho 1066: TASK_TYPE(task) = taskTASK;
1.5 misho 1067: TASK_ROOT(task) = root;
1.1 misho 1068:
1069: TASK_ARG(task) = arg;
1.12 misho 1070: TASK_VAL(task) = prio;
1.1 misho 1071:
1.5 misho 1072: TASK_DATA(task) = opt_data;
1073: TASK_DATLEN(task) = opt_dlen;
1074:
1.12 misho 1075: if (root->root_hooks.hook_add.task)
1076: ptr = root->root_hooks.hook_add.task(task, NULL);
1.1 misho 1077: else
1078: ptr = NULL;
1079:
1.5 misho 1080: if (!ptr) {
1081: #ifdef HAVE_LIBPTHREAD
1.12 misho 1082: pthread_mutex_lock(&root->root_mtx[taskTASK]);
1.5 misho 1083: #endif
1.12 misho 1084: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1085: if (TASK_VAL(task) < TASK_VAL(t))
1086: break;
1087: if (!t)
1088: TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
1089: else
1090: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.5 misho 1091: #ifdef HAVE_LIBPTHREAD
1.12 misho 1092: pthread_mutex_unlock(&root->root_mtx[taskTASK]);
1.5 misho 1093: #endif
1094: } else
1.13 misho 1095: task = sched_unuseTask(task);
1.1 misho 1096:
1097: return task;
1098: }
1099:
1100: /*
1.10 misho 1101: * schedSuspend() - Add Suspended task to scheduler queue
1102: *
1103: * @root = root task
1104: * @func = task execution function
1105: * @arg = 1st func argument
1106: * @id = Trigger ID
1107: * @opt_data = Optional data
1108: * @opt_dlen = Optional data length
1109: * return: NULL error or !=NULL new queued task
1110: */
1111: sched_task_t *
1112: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1113: void *opt_data, size_t opt_dlen)
1114: {
1115: sched_task_t *task;
1116: void *ptr;
1117:
1118: if (!root || !func)
1119: return NULL;
1120:
1121: /* get new task */
1.13 misho 1122: if (!(task = sched_useTask(root)))
1.10 misho 1123: return NULL;
1124:
1125: task->task_func = func;
1126: TASK_TYPE(task) = taskSUSPEND;
1127: TASK_ROOT(task) = root;
1128:
1129: TASK_ARG(task) = arg;
1130: TASK_VAL(task) = id;
1131:
1132: TASK_DATA(task) = opt_data;
1133: TASK_DATLEN(task) = opt_dlen;
1134:
1135: if (root->root_hooks.hook_add.suspend)
1136: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1137: else
1138: ptr = NULL;
1139:
1140: if (!ptr) {
1141: #ifdef HAVE_LIBPTHREAD
1142: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1143: #endif
1144: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1145: #ifdef HAVE_LIBPTHREAD
1146: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1147: #endif
1148: } else
1.13 misho 1149: task = sched_unuseTask(task);
1.10 misho 1150:
1151: return task;
1152: }
1153:
1154: /*
1.1 misho 1155: * schedCallOnce() - Call once from scheduler
1.6 misho 1156: *
1.1 misho 1157: * @root = root task
1158: * @func = task execution function
1159: * @arg = 1st func argument
1.2 misho 1160: * @val = additional func argument
1.5 misho 1161: * @opt_data = Optional data
1162: * @opt_dlen = Optional data length
1.2 misho 1163: * return: return value from called func
1.1 misho 1164: */
1165: sched_task_t *
1.5 misho 1166: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1167: void *opt_data, size_t opt_dlen)
1.1 misho 1168: {
1169: sched_task_t *task;
1.2 misho 1170: void *ret;
1.1 misho 1171:
1172: if (!root || !func)
1173: return NULL;
1174:
1175: /* get new task */
1.13 misho 1176: if (!(task = sched_useTask(root)))
1.4 misho 1177: return NULL;
1.1 misho 1178:
1179: task->task_func = func;
1.5 misho 1180: TASK_TYPE(task) = taskEVENT;
1181: TASK_ROOT(task) = root;
1.1 misho 1182:
1183: TASK_ARG(task) = arg;
1.2 misho 1184: TASK_VAL(task) = val;
1.1 misho 1185:
1.5 misho 1186: TASK_DATA(task) = opt_data;
1187: TASK_DATLEN(task) = opt_dlen;
1188:
1.2 misho 1189: ret = schedCall(task);
1.1 misho 1190:
1.13 misho 1191: sched_unuseTask(task);
1.2 misho 1192: return ret;
1.1 misho 1193: }
1.13 misho 1194:
1195: /*
1196: * schedThread() - Add thread task to scheduler queue
1197: *
1198: * @root = root task
1199: * @func = task execution function
1200: * @arg = 1st func argument
1201: * @detach = Detach thread from scheduler, if !=0
1202: * @opt_data = Optional data
1203: * @opt_dlen = Optional data length
1204: * return: NULL error or !=NULL new queued task
1205: */
1206: sched_task_t *
1207: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach,
1208: void *opt_data, size_t opt_dlen)
1209: {
1210: #ifndef HAVE_LIBPTHREAD
1211: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1212: return NULL;
1213: #endif
1214: sched_task_t *task;
1215: void *ptr;
1216: pthread_attr_t attr;
1217:
1218: if (!root || !func)
1219: return NULL;
1220:
1221: /* get new task */
1222: if (!(task = sched_useTask(root)))
1223: return NULL;
1224:
1225: task->task_func = func;
1226: TASK_TYPE(task) = taskTHREAD;
1227: TASK_ROOT(task) = root;
1228:
1229: TASK_ARG(task) = arg;
1.13.2.1 misho 1230: TASK_FLAG(task) = detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE;
1.13 misho 1231:
1232: TASK_DATA(task) = opt_data;
1233: TASK_DATLEN(task) = opt_dlen;
1234:
1235: pthread_attr_init(&attr);
1.13.2.1 misho 1236: pthread_attr_setdetachstate(&attr, TASK_FLAG(task));
1.13 misho 1237: if (root->root_hooks.hook_add.thread)
1238: ptr = root->root_hooks.hook_add.thread(task, &attr);
1239: else
1240: ptr = NULL;
1241: pthread_attr_destroy(&attr);
1242:
1243: if (!ptr) {
1244: pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
1245: TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
1246: pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
1247: } else
1248: task = sched_unuseTask(task);
1249:
1250: return task;
1251: }
1252:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>