Annotation of libaitsched/src/tasks.c, revision 1.12.4.2
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.12.4.2! misho 6: * $Id: tasks.c,v 1.12.4.1 2012/08/21 11:07:16 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.6 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
1.12.4.2! misho 49: /*
! 50: * sched_useTask() - Get and init new task
! 51: *
! 52: * @root = root task
! 53: * return: NULL error or !=NULL prepared task
! 54: */
1.4 misho 55: inline sched_task_t *
1.12.4.2! misho 56: sched_useTask(sched_root_task_t * __restrict root)
1.4 misho 57: {
1.7 misho 58: sched_task_t *task, *tmp;
1.4 misho 59:
1.7 misho 60: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
1.4 misho 61: if (!TASK_ISLOCKED(task)) {
1.5 misho 62: #ifdef HAVE_LIBPTHREAD
63: pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
64: #endif
1.4 misho 65: TAILQ_REMOVE(&root->root_unuse, task, task_node);
1.5 misho 66: #ifdef HAVE_LIBPTHREAD
67: pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
68: #endif
1.4 misho 69: break;
70: }
71: }
72:
73: if (!task) {
74: task = malloc(sizeof(sched_task_t));
75: if (!task) {
76: LOGERR;
77: return NULL;
78: }
79: }
80:
1.9 misho 81: memset(task, 0, sizeof(sched_task_t));
82: task->task_id = (uintptr_t) task;
1.4 misho 83: return task;
84: }
85:
1.12.4.2! misho 86: /*
! 87: * sched_unuseTask() - Unlock and put task to unuse queue
! 88: *
! 89: * @task = task
! 90: * return: always is NULL
! 91: */
1.4 misho 92: inline sched_task_t *
1.12.4.2! misho 93: sched_unuseTask(sched_task_t * __restrict task)
1.4 misho 94: {
95: TASK_UNLOCK(task);
1.5 misho 96: TASK_TYPE(task) = taskUNUSE;
97: #ifdef HAVE_LIBPTHREAD
98: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
99: #endif
1.9 misho 100: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
1.5 misho 101: #ifdef HAVE_LIBPTHREAD
102: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
103: #endif
1.4 misho 104: task = NULL;
105:
106: return task;
107: }
108:
109:
1.1 misho 110: /*
1.2 misho 111: * schedRead() - Add READ I/O task to scheduler queue
1.6 misho 112: *
1.1 misho 113: * @root = root task
114: * @func = task execution function
115: * @arg = 1st func argument
1.2 misho 116: * @fd = fd handle
1.5 misho 117: * @opt_data = Optional data
118: * @opt_dlen = Optional data length
1.1 misho 119: * return: NULL error or !=NULL new queued task
120: */
121: sched_task_t *
1.5 misho 122: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
123: void *opt_data, size_t opt_dlen)
1.1 misho 124: {
125: sched_task_t *task;
126: void *ptr;
127:
128: if (!root || !func)
129: return NULL;
130:
131: /* get new task */
1.12.4.2! misho 132: if (!(task = sched_useTask(root)))
1.4 misho 133: return NULL;
1.1 misho 134:
135: task->task_func = func;
1.5 misho 136: TASK_TYPE(task) = taskREAD;
137: TASK_ROOT(task) = root;
1.1 misho 138:
139: TASK_ARG(task) = arg;
1.2 misho 140: TASK_FD(task) = fd;
1.1 misho 141:
1.5 misho 142: TASK_DATA(task) = opt_data;
143: TASK_DATLEN(task) = opt_dlen;
144:
1.1 misho 145: if (root->root_hooks.hook_add.read)
146: ptr = root->root_hooks.hook_add.read(task, NULL);
147: else
148: ptr = NULL;
149:
1.5 misho 150: if (!ptr) {
151: #ifdef HAVE_LIBPTHREAD
152: pthread_mutex_lock(&root->root_mtx[taskREAD]);
153: #endif
1.9 misho 154: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
1.5 misho 155: #ifdef HAVE_LIBPTHREAD
156: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
157: #endif
158: } else
1.12.4.2! misho 159: task = sched_unuseTask(task);
1.1 misho 160:
161: return task;
162: }
163:
164: /*
1.2 misho 165: * schedWrite() - Add WRITE I/O task to scheduler queue
1.6 misho 166: *
1.1 misho 167: * @root = root task
168: * @func = task execution function
169: * @arg = 1st func argument
1.2 misho 170: * @fd = fd handle
1.5 misho 171: * @opt_data = Optional data
172: * @opt_dlen = Optional data length
1.1 misho 173: * return: NULL error or !=NULL new queued task
174: */
175: sched_task_t *
1.5 misho 176: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
177: void *opt_data, size_t opt_dlen)
1.1 misho 178: {
179: sched_task_t *task;
180: void *ptr;
181:
182: if (!root || !func)
183: return NULL;
184:
185: /* get new task */
1.12.4.2! misho 186: if (!(task = sched_useTask(root)))
1.4 misho 187: return NULL;
1.1 misho 188:
189: task->task_func = func;
1.5 misho 190: TASK_TYPE(task) = taskWRITE;
191: TASK_ROOT(task) = root;
1.1 misho 192:
193: TASK_ARG(task) = arg;
1.2 misho 194: TASK_FD(task) = fd;
1.1 misho 195:
1.5 misho 196: TASK_DATA(task) = opt_data;
197: TASK_DATLEN(task) = opt_dlen;
198:
1.1 misho 199: if (root->root_hooks.hook_add.write)
200: ptr = root->root_hooks.hook_add.write(task, NULL);
201: else
202: ptr = NULL;
203:
1.5 misho 204: if (!ptr) {
205: #ifdef HAVE_LIBPTHREAD
206: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
207: #endif
1.9 misho 208: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
1.5 misho 209: #ifdef HAVE_LIBPTHREAD
210: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
211: #endif
212: } else
1.12.4.2! misho 213: task = sched_unuseTask(task);
1.1 misho 214:
215: return task;
216: }
217:
218: /*
1.9 misho 219: * schedNode() - Add NODE task to scheduler queue
220: *
221: * @root = root task
222: * @func = task execution function
223: * @arg = 1st func argument
224: * @fd = fd handle
225: * @opt_data = Optional data
226: * @opt_dlen = Optional data length
227: * return: NULL error or !=NULL new queued task
228: */
229: sched_task_t *
230: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
231: void *opt_data, size_t opt_dlen)
232: {
233: sched_task_t *task;
234: void *ptr;
235:
236: if (!root || !func)
237: return NULL;
238:
239: /* get new task */
1.12.4.2! misho 240: if (!(task = sched_useTask(root)))
1.9 misho 241: return NULL;
242:
243: task->task_func = func;
244: TASK_TYPE(task) = taskNODE;
245: TASK_ROOT(task) = root;
246:
247: TASK_ARG(task) = arg;
248: TASK_FD(task) = fd;
249:
250: TASK_DATA(task) = opt_data;
251: TASK_DATLEN(task) = opt_dlen;
252:
253: if (root->root_hooks.hook_add.node)
254: ptr = root->root_hooks.hook_add.node(task, NULL);
255: else
256: ptr = NULL;
257:
258: if (!ptr) {
259: #ifdef HAVE_LIBPTHREAD
260: pthread_mutex_lock(&root->root_mtx[taskNODE]);
261: #endif
262: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
263: #ifdef HAVE_LIBPTHREAD
264: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
265: #endif
266: } else
1.12.4.2! misho 267: task = sched_unuseTask(task);
1.9 misho 268:
269: return task;
270: }
271:
272: /*
273: * schedProc() - Add PROC task to scheduler queue
274: *
275: * @root = root task
276: * @func = task execution function
277: * @arg = 1st func argument
278: * @pid = PID
279: * @opt_data = Optional data
280: * @opt_dlen = Optional data length
281: * return: NULL error or !=NULL new queued task
282: */
283: sched_task_t *
284: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
285: void *opt_data, size_t opt_dlen)
286: {
287: sched_task_t *task;
288: void *ptr;
289:
290: if (!root || !func)
291: return NULL;
292:
293: /* get new task */
1.12.4.2! misho 294: if (!(task = sched_useTask(root)))
1.9 misho 295: return NULL;
296:
297: task->task_func = func;
298: TASK_TYPE(task) = taskPROC;
299: TASK_ROOT(task) = root;
300:
301: TASK_ARG(task) = arg;
302: TASK_VAL(task) = pid;
303:
304: TASK_DATA(task) = opt_data;
305: TASK_DATLEN(task) = opt_dlen;
306:
307: if (root->root_hooks.hook_add.proc)
308: ptr = root->root_hooks.hook_add.proc(task, NULL);
309: else
310: ptr = NULL;
311:
312: if (!ptr) {
313: #ifdef HAVE_LIBPTHREAD
314: pthread_mutex_lock(&root->root_mtx[taskPROC]);
315: #endif
316: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
317: #ifdef HAVE_LIBPTHREAD
318: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
319: #endif
320: } else
1.12.4.2! misho 321: task = sched_unuseTask(task);
1.9 misho 322:
323: return task;
324: }
325:
326: /*
327: * schedUser() - Add trigger USER task to scheduler queue
328: *
329: * @root = root task
330: * @func = task execution function
331: * @arg = 1st func argument
332: * @id = Trigger ID
333: * @opt_data = Optional data
334: * @opt_dlen = Optional user's trigger flags
335: * return: NULL error or !=NULL new queued task
336: */
337: sched_task_t *
338: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
339: void *opt_data, size_t opt_dlen)
340: {
341: #ifndef EVFILT_USER
342: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
343: return NULL;
344: #else
345: sched_task_t *task;
346: void *ptr;
347:
348: if (!root || !func)
349: return NULL;
350:
351: /* get new task */
1.12.4.2! misho 352: if (!(task = sched_useTask(root)))
1.9 misho 353: return NULL;
354:
355: task->task_func = func;
356: TASK_TYPE(task) = taskUSER;
357: TASK_ROOT(task) = root;
358:
359: TASK_ARG(task) = arg;
360: TASK_VAL(task) = id;
361:
362: TASK_DATA(task) = opt_data;
363: TASK_DATLEN(task) = opt_dlen;
364:
365: if (root->root_hooks.hook_add.user)
366: ptr = root->root_hooks.hook_add.user(task, NULL);
367: else
368: ptr = NULL;
369:
370: if (!ptr) {
371: #ifdef HAVE_LIBPTHREAD
372: pthread_mutex_lock(&root->root_mtx[taskUSER]);
373: #endif
374: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
375: #ifdef HAVE_LIBPTHREAD
376: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
377: #endif
378: } else
1.12.4.2! misho 379: task = sched_unuseTask(task);
1.9 misho 380:
381: return task;
382: #endif
383: }
384:
385: /*
386: * schedSignal() - Add SIGNAL task to scheduler queue
387: *
388: * @root = root task
389: * @func = task execution function
390: * @arg = 1st func argument
391: * @sig = Signal
392: * @opt_data = Optional data
393: * @opt_dlen = Optional data length
394: * return: NULL error or !=NULL new queued task
395: */
396: sched_task_t *
397: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
398: void *opt_data, size_t opt_dlen)
399: {
400: sched_task_t *task;
401: void *ptr;
402:
403: if (!root || !func)
404: return NULL;
405:
406: /* get new task */
1.12.4.2! misho 407: if (!(task = sched_useTask(root)))
1.9 misho 408: return NULL;
409:
410: task->task_func = func;
411: TASK_TYPE(task) = taskSIGNAL;
412: TASK_ROOT(task) = root;
413:
414: TASK_ARG(task) = arg;
415: TASK_VAL(task) = sig;
416:
417: TASK_DATA(task) = opt_data;
418: TASK_DATLEN(task) = opt_dlen;
419:
420: if (root->root_hooks.hook_add.signal)
421: ptr = root->root_hooks.hook_add.signal(task, NULL);
422: else
423: ptr = NULL;
424:
425: if (!ptr) {
426: #ifdef HAVE_LIBPTHREAD
427: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
428: #endif
429: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
430: #ifdef HAVE_LIBPTHREAD
431: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
432: #endif
433: } else
1.12.4.2! misho 434: task = sched_unuseTask(task);
1.9 misho 435:
436: return task;
437: }
438:
439: /*
1.8 misho 440: * schedAlarm() - Add ALARM task to scheduler queue
441: *
442: * @root = root task
443: * @func = task execution function
444: * @arg = 1st func argument
445: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
446: * @opt_data = Optional data
447: * @opt_dlen = Optional data length
448: * return: NULL error or !=NULL new queued task
449: */
450: sched_task_t *
451: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
452: void *opt_data, size_t opt_dlen)
453: {
454: sched_task_t *task;
455: void *ptr;
456:
457: if (!root || !func)
458: return NULL;
459:
460: /* get new task */
1.12.4.2! misho 461: if (!(task = sched_useTask(root)))
1.8 misho 462: return NULL;
463:
464: task->task_func = func;
465: TASK_TYPE(task) = taskALARM;
466: TASK_ROOT(task) = root;
467:
468: TASK_ARG(task) = arg;
469: TASK_TS(task) = ts;
470:
471: TASK_DATA(task) = opt_data;
472: TASK_DATLEN(task) = opt_dlen;
473:
474: if (root->root_hooks.hook_add.alarm)
475: ptr = root->root_hooks.hook_add.alarm(task, NULL);
476: else
477: ptr = NULL;
478:
479: if (!ptr) {
480: #ifdef HAVE_LIBPTHREAD
481: pthread_mutex_lock(&root->root_mtx[taskALARM]);
482: #endif
1.9 misho 483: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
1.8 misho 484: #ifdef HAVE_LIBPTHREAD
485: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
486: #endif
487: } else
1.12.4.2! misho 488: task = sched_unuseTask(task);
1.8 misho 489:
490: return task;
491: }
492:
1.11 misho 493: #ifdef AIO_SUPPORT
494: /*
495: * schedAIO() - Add AIO task to scheduler queue
496: *
497: * @root = root task
498: * @func = task execution function
499: * @arg = 1st func argument
500: * @acb = AIO cb structure address
501: * @opt_data = Optional data
502: * @opt_dlen = Optional data length
503: * return: NULL error or !=NULL new queued task
504: */
505: sched_task_t *
506: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
507: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
508: {
509: sched_task_t *task;
510: void *ptr;
511:
512: if (!root || !func || !acb || !opt_dlen)
513: return NULL;
514:
515: /* get new task */
1.12.4.2! misho 516: if (!(task = sched_useTask(root)))
1.11 misho 517: return NULL;
518:
519: task->task_func = func;
520: TASK_TYPE(task) = taskAIO;
521: TASK_ROOT(task) = root;
522:
523: TASK_ARG(task) = arg;
524: TASK_VAL(task) = (u_long) acb;
525:
526: TASK_DATA(task) = opt_data;
527: TASK_DATLEN(task) = opt_dlen;
528:
529: if (root->root_hooks.hook_add.aio)
530: ptr = root->root_hooks.hook_add.aio(task, NULL);
531: else
532: ptr = NULL;
533:
534: if (!ptr) {
535: #ifdef HAVE_LIBPTHREAD
536: pthread_mutex_lock(&root->root_mtx[taskAIO]);
537: #endif
538: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
539: #ifdef HAVE_LIBPTHREAD
540: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
541: #endif
542: } else
1.12.4.2! misho 543: task = sched_unuseTask(task);
1.11 misho 544:
545: return task;
546: }
547:
548: /*
549: * schedAIORead() - Add AIO read task to scheduler queue
550: *
551: * @root = root task
552: * @func = task execution function
553: * @arg = 1st func argument
554: * @fd = file descriptor
555: * @buffer = Buffer
556: * @buflen = Buffer length
557: * @offset = Offset from start of file, if =-1 from current position
558: * return: NULL error or !=NULL new queued task
559: */
560: inline sched_task_t *
561: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
562: void *buffer, size_t buflen, off_t offset)
563: {
564: struct aiocb *acb;
565: off_t off;
566:
567: if (!root || !func || !buffer || !buflen)
568: return NULL;
569:
570: if (offset == (off_t) -1) {
571: off = lseek(fd, 0, SEEK_CUR);
572: if (off == -1) {
573: LOGERR;
574: return NULL;
575: }
576: } else
577: off = offset;
578:
579: if (!(acb = malloc(sizeof(struct aiocb)))) {
580: LOGERR;
581: return NULL;
582: } else
583: memset(acb, 0, sizeof(struct aiocb));
584:
585: acb->aio_fildes = fd;
586: acb->aio_nbytes = buflen;
587: acb->aio_buf = buffer;
588: acb->aio_offset = off;
589: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
590: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
591: acb->aio_sigevent.sigev_value.sival_ptr = acb;
592:
593: if (aio_read(acb)) {
594: LOGERR;
595: free(acb);
596: return NULL;
597: }
598:
599: return schedAIO(root, func, arg, acb, buffer, buflen);
600: }
601:
602: /*
603: * schedAIOWrite() - Add AIO write task to scheduler queue
604: *
605: * @root = root task
606: * @func = task execution function
607: * @arg = 1st func argument
608: * @fd = file descriptor
609: * @buffer = Buffer
610: * @buflen = Buffer length
611: * @offset = Offset from start of file, if =-1 from current position
612: * return: NULL error or !=NULL new queued task
613: */
614: inline sched_task_t *
615: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
616: void *buffer, size_t buflen, off_t offset)
617: {
618: struct aiocb *acb;
619: off_t off;
620:
621: if (!root || !func || !buffer || !buflen)
622: return NULL;
623:
624: if (offset == (off_t) -1) {
625: off = lseek(fd, 0, SEEK_CUR);
626: if (off == -1) {
627: LOGERR;
628: return NULL;
629: }
630: } else
631: off = offset;
632:
633: if (!(acb = malloc(sizeof(struct aiocb)))) {
634: LOGERR;
635: return NULL;
636: } else
637: memset(acb, 0, sizeof(struct aiocb));
638:
639: acb->aio_fildes = fd;
640: acb->aio_nbytes = buflen;
641: acb->aio_buf = buffer;
642: acb->aio_offset = off;
643: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
644: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
645: acb->aio_sigevent.sigev_value.sival_ptr = acb;
646:
647: if (aio_write(acb)) {
648: LOGERR;
649: free(acb);
650: return NULL;
651: }
652:
653: return schedAIO(root, func, arg, acb, buffer, buflen);
654: }
655:
656: #ifdef EVFILT_LIO
657: /*
658: * schedLIO() - Add AIO bulk tasks to scheduler queue
659: *
660: * @root = root task
661: * @func = task execution function
662: * @arg = 1st func argument
663: * @acbs = AIO cb structure addresses
664: * @opt_data = Optional data
665: * @opt_dlen = Optional data length
666: * return: NULL error or !=NULL new queued task
667: */
668: sched_task_t *
669: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
670: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
671: {
672: sched_task_t *task;
673: void *ptr;
674:
675: if (!root || !func || !acbs || !opt_dlen)
676: return NULL;
677:
678: /* get new task */
1.12.4.2! misho 679: if (!(task = sched_useTask(root)))
1.11 misho 680: return NULL;
681:
682: task->task_func = func;
683: TASK_TYPE(task) = taskLIO;
684: TASK_ROOT(task) = root;
685:
686: TASK_ARG(task) = arg;
687: TASK_VAL(task) = (u_long) acbs;
688:
689: TASK_DATA(task) = opt_data;
690: TASK_DATLEN(task) = opt_dlen;
691:
692: if (root->root_hooks.hook_add.lio)
693: ptr = root->root_hooks.hook_add.lio(task, NULL);
694: else
695: ptr = NULL;
696:
697: if (!ptr) {
698: #ifdef HAVE_LIBPTHREAD
699: pthread_mutex_lock(&root->root_mtx[taskLIO]);
700: #endif
701: TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
702: #ifdef HAVE_LIBPTHREAD
703: pthread_mutex_unlock(&root->root_mtx[taskLIO]);
704: #endif
705: } else
1.12.4.2! misho 706: task = sched_unuseTask(task);
1.11 misho 707:
708: return task;
709: }
710:
711: /*
712: * schedLIORead() - Add list of AIO read tasks to scheduler queue
713: *
714: * @root = root task
715: * @func = task execution function
716: * @arg = 1st func argument
717: * @fd = file descriptor
718: * @bufs = Buffer's list
719: * @nbufs = Number of Buffers
720: * @offset = Offset from start of file, if =-1 from current position
721: * return: NULL error or !=NULL new queued task
722: */
723: sched_task_t *
724: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
725: struct iovec *bufs, size_t nbufs, off_t offset)
726: {
727: struct sigevent sig;
728: struct aiocb **acb;
729: off_t off;
730: register int i;
731:
732: if (!root || !func || !bufs || !nbufs)
733: return NULL;
734:
735: if (offset == (off_t) -1) {
736: off = lseek(fd, 0, SEEK_CUR);
737: if (off == -1) {
738: LOGERR;
739: return NULL;
740: }
741: } else
742: off = offset;
743:
744: if (!(acb = calloc(sizeof(void*), nbufs))) {
745: LOGERR;
746: return NULL;
747: } else
748: memset(acb, 0, sizeof(void*) * nbufs);
749: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
750: acb[i] = malloc(sizeof(struct aiocb));
751: if (!acb[i]) {
752: LOGERR;
753: for (i = 0; i < nbufs; i++)
754: if (acb[i])
755: free(acb[i]);
756: free(acb);
757: return NULL;
758: } else
759: memset(acb[i], 0, sizeof(struct aiocb));
760: acb[i]->aio_fildes = fd;
761: acb[i]->aio_nbytes = bufs[i].iov_len;
762: acb[i]->aio_buf = bufs[i].iov_base;
763: acb[i]->aio_offset = off;
764: acb[i]->aio_lio_opcode = LIO_READ;
765: }
766: memset(&sig, 0, sizeof sig);
767: sig.sigev_notify = SIGEV_KEVENT;
768: sig.sigev_notify_kqueue = root->root_kq;
769: sig.sigev_value.sival_ptr = acb;
770:
771: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
772: LOGERR;
773: for (i = 0; i < nbufs; i++)
774: if (acb[i])
775: free(acb[i]);
776: free(acb);
777: return NULL;
778: }
779:
780: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
781: }
782:
783: /*
784: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
785: *
786: * @root = root task
787: * @func = task execution function
788: * @arg = 1st func argument
789: * @fd = file descriptor
790: * @bufs = Buffer's list
791: * @nbufs = Number of Buffers
792: * @offset = Offset from start of file, if =-1 from current position
793: * return: NULL error or !=NULL new queued task
794: */
795: inline sched_task_t *
796: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
797: struct iovec *bufs, size_t nbufs, off_t offset)
798: {
799: struct sigevent sig;
800: struct aiocb **acb;
801: off_t off;
802: register int i;
803:
804: if (!root || !func || !bufs || !nbufs)
805: return NULL;
806:
807: if (offset == (off_t) -1) {
808: off = lseek(fd, 0, SEEK_CUR);
809: if (off == -1) {
810: LOGERR;
811: return NULL;
812: }
813: } else
814: off = offset;
815:
816: if (!(acb = calloc(sizeof(void*), nbufs))) {
817: LOGERR;
818: return NULL;
819: } else
820: memset(acb, 0, sizeof(void*) * nbufs);
821: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
822: acb[i] = malloc(sizeof(struct aiocb));
823: if (!acb[i]) {
824: LOGERR;
825: for (i = 0; i < nbufs; i++)
826: if (acb[i])
827: free(acb[i]);
828: free(acb);
829: return NULL;
830: } else
831: memset(acb[i], 0, sizeof(struct aiocb));
832: acb[i]->aio_fildes = fd;
833: acb[i]->aio_nbytes = bufs[i].iov_len;
834: acb[i]->aio_buf = bufs[i].iov_base;
835: acb[i]->aio_offset = off;
836: acb[i]->aio_lio_opcode = LIO_WRITE;
837: }
838: memset(&sig, 0, sizeof sig);
839: sig.sigev_notify = SIGEV_KEVENT;
840: sig.sigev_notify_kqueue = root->root_kq;
841: sig.sigev_value.sival_ptr = acb;
842:
843: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
844: LOGERR;
845: for (i = 0; i < nbufs; i++)
846: if (acb[i])
847: free(acb[i]);
848: free(acb);
849: return NULL;
850: }
851:
852: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
853: }
854: #endif /* EVFILT_LIO */
855: #endif /* AIO_SUPPORT */
856:
1.8 misho 857: /*
1.1 misho 858: * schedTimer() - Add TIMER task to scheduler queue
1.6 misho 859: *
1.1 misho 860: * @root = root task
861: * @func = task execution function
862: * @arg = 1st func argument
1.5 misho 863: * @ts = timeout argument structure
864: * @opt_data = Optional data
865: * @opt_dlen = Optional data length
1.1 misho 866: * return: NULL error or !=NULL new queued task
867: */
868: sched_task_t *
1.5 misho 869: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
870: void *opt_data, size_t opt_dlen)
1.1 misho 871: {
1.9 misho 872: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 873: void *ptr;
1.5 misho 874: struct timespec now;
1.1 misho 875:
876: if (!root || !func)
877: return NULL;
878:
879: /* get new task */
1.12.4.2! misho 880: if (!(task = sched_useTask(root)))
1.4 misho 881: return NULL;
1.1 misho 882:
883: task->task_func = func;
1.5 misho 884: TASK_TYPE(task) = taskTIMER;
885: TASK_ROOT(task) = root;
1.1 misho 886:
887: TASK_ARG(task) = arg;
888:
1.5 misho 889: TASK_DATA(task) = opt_data;
890: TASK_DATLEN(task) = opt_dlen;
891:
1.1 misho 892: /* calculate timeval structure */
1.5 misho 893: clock_gettime(CLOCK_MONOTONIC, &now);
894: now.tv_sec += ts.tv_sec;
895: now.tv_nsec += ts.tv_nsec;
896: if (now.tv_nsec >= 1000000000L) {
1.1 misho 897: now.tv_sec++;
1.5 misho 898: now.tv_nsec -= 1000000000L;
899: } else if (now.tv_nsec < 0) {
1.1 misho 900: now.tv_sec--;
1.5 misho 901: now.tv_nsec += 1000000000L;
1.1 misho 902: }
1.5 misho 903: TASK_TS(task) = now;
1.1 misho 904:
905: if (root->root_hooks.hook_add.timer)
906: ptr = root->root_hooks.hook_add.timer(task, NULL);
907: else
908: ptr = NULL;
909:
910: if (!ptr) {
1.5 misho 911: #ifdef HAVE_LIBPTHREAD
912: pthread_mutex_lock(&root->root_mtx[taskTIMER]);
913: #endif
1.1 misho 914: #ifdef TIMER_WITHOUT_SORT
1.9 misho 915: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 916: #else
1.9 misho 917: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
1.5 misho 918: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
1.1 misho 919: break;
920: if (!t)
1.9 misho 921: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 922: else
1.9 misho 923: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.1 misho 924: #endif
1.5 misho 925: #ifdef HAVE_LIBPTHREAD
926: pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
927: #endif
1.4 misho 928: } else
1.12.4.2! misho 929: task = sched_unuseTask(task);
1.1 misho 930:
931: return task;
932: }
933:
934: /*
935: * schedEvent() - Add EVENT task to scheduler queue
1.6 misho 936: *
1.1 misho 937: * @root = root task
938: * @func = task execution function
939: * @arg = 1st func argument
1.2 misho 940: * @val = additional func argument
1.5 misho 941: * @opt_data = Optional data
942: * @opt_dlen = Optional data length
1.1 misho 943: * return: NULL error or !=NULL new queued task
944: */
945: sched_task_t *
1.5 misho 946: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
947: void *opt_data, size_t opt_dlen)
1.1 misho 948: {
949: sched_task_t *task;
950: void *ptr;
951:
952: if (!root || !func)
953: return NULL;
954:
955: /* get new task */
1.12.4.2! misho 956: if (!(task = sched_useTask(root)))
1.4 misho 957: return NULL;
1.1 misho 958:
959: task->task_func = func;
1.5 misho 960: TASK_TYPE(task) = taskEVENT;
961: TASK_ROOT(task) = root;
1.1 misho 962:
963: TASK_ARG(task) = arg;
1.2 misho 964: TASK_VAL(task) = val;
1.1 misho 965:
1.5 misho 966: TASK_DATA(task) = opt_data;
967: TASK_DATLEN(task) = opt_dlen;
968:
1.1 misho 969: if (root->root_hooks.hook_add.event)
970: ptr = root->root_hooks.hook_add.event(task, NULL);
971: else
972: ptr = NULL;
973:
1.5 misho 974: if (!ptr) {
975: #ifdef HAVE_LIBPTHREAD
976: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
977: #endif
1.9 misho 978: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
1.5 misho 979: #ifdef HAVE_LIBPTHREAD
980: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
981: #endif
982: } else
1.12.4.2! misho 983: task = sched_unuseTask(task);
1.1 misho 984:
985: return task;
986: }
987:
988:
989: /*
1.12 misho 990: * schedTask() - Add regular task to scheduler queue
1.6 misho 991: *
1.1 misho 992: * @root = root task
993: * @func = task execution function
994: * @arg = 1st func argument
1.12 misho 995: * @prio = regular task priority, 0 is hi priority for regular tasks
1.5 misho 996: * @opt_data = Optional data
997: * @opt_dlen = Optional data length
1.1 misho 998: * return: NULL error or !=NULL new queued task
999: */
1000: sched_task_t *
1.12 misho 1001: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio,
1.5 misho 1002: void *opt_data, size_t opt_dlen)
1.1 misho 1003: {
1.12 misho 1004: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 1005: void *ptr;
1006:
1007: if (!root || !func)
1008: return NULL;
1009:
1010: /* get new task */
1.12.4.2! misho 1011: if (!(task = sched_useTask(root)))
1.4 misho 1012: return NULL;
1.1 misho 1013:
1014: task->task_func = func;
1.12 misho 1015: TASK_TYPE(task) = taskTASK;
1.5 misho 1016: TASK_ROOT(task) = root;
1.1 misho 1017:
1018: TASK_ARG(task) = arg;
1.12 misho 1019: TASK_VAL(task) = prio;
1.1 misho 1020:
1.5 misho 1021: TASK_DATA(task) = opt_data;
1022: TASK_DATLEN(task) = opt_dlen;
1023:
1.12 misho 1024: if (root->root_hooks.hook_add.task)
1025: ptr = root->root_hooks.hook_add.task(task, NULL);
1.1 misho 1026: else
1027: ptr = NULL;
1028:
1.5 misho 1029: if (!ptr) {
1030: #ifdef HAVE_LIBPTHREAD
1.12 misho 1031: pthread_mutex_lock(&root->root_mtx[taskTASK]);
1.5 misho 1032: #endif
1.12 misho 1033: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
1034: if (TASK_VAL(task) < TASK_VAL(t))
1035: break;
1036: if (!t)
1037: TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
1038: else
1039: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.5 misho 1040: #ifdef HAVE_LIBPTHREAD
1.12 misho 1041: pthread_mutex_unlock(&root->root_mtx[taskTASK]);
1.5 misho 1042: #endif
1043: } else
1.12.4.2! misho 1044: task = sched_unuseTask(task);
1.1 misho 1045:
1046: return task;
1047: }
1048:
1049: /*
1.10 misho 1050: * schedSuspend() - Add Suspended task to scheduler queue
1051: *
1052: * @root = root task
1053: * @func = task execution function
1054: * @arg = 1st func argument
1055: * @id = Trigger ID
1056: * @opt_data = Optional data
1057: * @opt_dlen = Optional data length
1058: * return: NULL error or !=NULL new queued task
1059: */
1060: sched_task_t *
1061: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1062: void *opt_data, size_t opt_dlen)
1063: {
1064: sched_task_t *task;
1065: void *ptr;
1066:
1067: if (!root || !func)
1068: return NULL;
1069:
1070: /* get new task */
1.12.4.2! misho 1071: if (!(task = sched_useTask(root)))
1.10 misho 1072: return NULL;
1073:
1074: task->task_func = func;
1075: TASK_TYPE(task) = taskSUSPEND;
1076: TASK_ROOT(task) = root;
1077:
1078: TASK_ARG(task) = arg;
1079: TASK_VAL(task) = id;
1080:
1081: TASK_DATA(task) = opt_data;
1082: TASK_DATLEN(task) = opt_dlen;
1083:
1084: if (root->root_hooks.hook_add.suspend)
1085: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1086: else
1087: ptr = NULL;
1088:
1089: if (!ptr) {
1090: #ifdef HAVE_LIBPTHREAD
1091: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1092: #endif
1093: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1094: #ifdef HAVE_LIBPTHREAD
1095: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1096: #endif
1097: } else
1.12.4.2! misho 1098: task = sched_unuseTask(task);
1.10 misho 1099:
1100: return task;
1101: }
1102:
1103: /*
1.1 misho 1104: * schedCallOnce() - Call once from scheduler
1.6 misho 1105: *
1.1 misho 1106: * @root = root task
1107: * @func = task execution function
1108: * @arg = 1st func argument
1.2 misho 1109: * @val = additional func argument
1.5 misho 1110: * @opt_data = Optional data
1111: * @opt_dlen = Optional data length
1.2 misho 1112: * return: return value from called func
1.1 misho 1113: */
1114: sched_task_t *
1.5 misho 1115: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1116: void *opt_data, size_t opt_dlen)
1.1 misho 1117: {
1118: sched_task_t *task;
1.2 misho 1119: void *ret;
1.1 misho 1120:
1121: if (!root || !func)
1122: return NULL;
1123:
1124: /* get new task */
1.12.4.2! misho 1125: if (!(task = sched_useTask(root)))
1.4 misho 1126: return NULL;
1.1 misho 1127:
1128: task->task_func = func;
1.5 misho 1129: TASK_TYPE(task) = taskEVENT;
1130: TASK_ROOT(task) = root;
1.1 misho 1131:
1132: TASK_ARG(task) = arg;
1.2 misho 1133: TASK_VAL(task) = val;
1.1 misho 1134:
1.5 misho 1135: TASK_DATA(task) = opt_data;
1136: TASK_DATLEN(task) = opt_dlen;
1137:
1.2 misho 1138: ret = schedCall(task);
1.1 misho 1139:
1.12.4.2! misho 1140: sched_unuseTask(task);
1.2 misho 1141: return ret;
1.1 misho 1142: }
1.12.4.1 misho 1143:
1144: /*
1145: * schedThread() - Add thread task to scheduler queue
1146: *
1147: * @root = root task
1148: * @func = task execution function
1149: * @arg = 1st func argument
1150: * @detach = Detach thread from scheduler, if !=0
1151: * @opt_data = Optional data
1152: * @opt_dlen = Optional data length
1153: * return: NULL error or !=NULL new queued task
1154: */
1155: sched_task_t *
1156: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach,
1157: void *opt_data, size_t opt_dlen)
1158: {
1159: #ifndef HAVE_LIBPTHREAD
1160: sched_SetErr(ENOTSUP, "Not supported thread tasks");
1161: return NULL;
1162: #endif
1163: sched_task_t *task;
1164: void *ptr;
1165: pthread_attr_t attr;
1166:
1167: if (!root || !func)
1168: return NULL;
1169:
1170: /* get new task */
1.12.4.2! misho 1171: if (!(task = sched_useTask(root)))
1.12.4.1 misho 1172: return NULL;
1173:
1174: task->task_func = func;
1175: TASK_TYPE(task) = taskTHREAD;
1176: TASK_ROOT(task) = root;
1177:
1178: TASK_ARG(task) = arg;
1179:
1180: TASK_DATA(task) = opt_data;
1181: TASK_DATLEN(task) = opt_dlen;
1182:
1.12.4.2! misho 1183: pthread_attr_init(&attr);
! 1184: pthread_attr_setdetachstate(&attr, detach ?
! 1185: PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE);
1.12.4.1 misho 1186: if (root->root_hooks.hook_add.thread)
1.12.4.2! misho 1187: ptr = root->root_hooks.hook_add.thread(task, &attr);
1.12.4.1 misho 1188: else
1189: ptr = NULL;
1.12.4.2! misho 1190: pthread_attr_destroy(&attr);
1.12.4.1 misho 1191:
1192: if (!ptr) {
1193: pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
1194: TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
1195: pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
1196: } else
1.12.4.2! misho 1197: task = sched_unuseTask(task);
1.12.4.1 misho 1198:
1199: return task;
1200: }
1201:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>