1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: tasks.c,v 1.10.2.7 2012/08/02 11:37:08 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: #pragma GCC visibility push(hidden)
50:
51: inline sched_task_t *
52: _sched_useTask(sched_root_task_t * __restrict root)
53: {
54: sched_task_t *task, *tmp;
55:
56: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
57: if (!TASK_ISLOCKED(task)) {
58: #ifdef HAVE_LIBPTHREAD
59: pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
60: #endif
61: TAILQ_REMOVE(&root->root_unuse, task, task_node);
62: #ifdef HAVE_LIBPTHREAD
63: pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
64: #endif
65: break;
66: }
67: }
68:
69: if (!task) {
70: task = malloc(sizeof(sched_task_t));
71: if (!task) {
72: LOGERR;
73: return NULL;
74: }
75: }
76:
77: memset(task, 0, sizeof(sched_task_t));
78: task->task_id = (uintptr_t) task;
79: return task;
80: }
81:
82: inline sched_task_t *
83: _sched_unuseTask(sched_task_t * __restrict task)
84: {
85: TASK_UNLOCK(task);
86: TASK_TYPE(task) = taskUNUSE;
87: #ifdef HAVE_LIBPTHREAD
88: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
89: #endif
90: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
91: #ifdef HAVE_LIBPTHREAD
92: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
93: #endif
94: task = NULL;
95:
96: return task;
97: }
98:
99: #pragma GCC visibility pop
100:
101:
102: /*
103: * schedRead() - Add READ I/O task to scheduler queue
104: *
105: * @root = root task
106: * @func = task execution function
107: * @arg = 1st func argument
108: * @fd = fd handle
109: * @opt_data = Optional data
110: * @opt_dlen = Optional data length
111: * return: NULL error or !=NULL new queued task
112: */
113: sched_task_t *
114: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
115: void *opt_data, size_t opt_dlen)
116: {
117: sched_task_t *task;
118: void *ptr;
119:
120: if (!root || !func)
121: return NULL;
122:
123: /* get new task */
124: if (!(task = _sched_useTask(root)))
125: return NULL;
126:
127: task->task_func = func;
128: TASK_TYPE(task) = taskREAD;
129: TASK_ROOT(task) = root;
130:
131: TASK_ARG(task) = arg;
132: TASK_FD(task) = fd;
133:
134: TASK_DATA(task) = opt_data;
135: TASK_DATLEN(task) = opt_dlen;
136:
137: if (root->root_hooks.hook_add.read)
138: ptr = root->root_hooks.hook_add.read(task, NULL);
139: else
140: ptr = NULL;
141:
142: if (!ptr) {
143: #ifdef HAVE_LIBPTHREAD
144: pthread_mutex_lock(&root->root_mtx[taskREAD]);
145: #endif
146: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
147: #ifdef HAVE_LIBPTHREAD
148: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
149: #endif
150: } else
151: task = _sched_unuseTask(task);
152:
153: return task;
154: }
155:
156: /*
157: * schedWrite() - Add WRITE I/O task to scheduler queue
158: *
159: * @root = root task
160: * @func = task execution function
161: * @arg = 1st func argument
162: * @fd = fd handle
163: * @opt_data = Optional data
164: * @opt_dlen = Optional data length
165: * return: NULL error or !=NULL new queued task
166: */
167: sched_task_t *
168: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
169: void *opt_data, size_t opt_dlen)
170: {
171: sched_task_t *task;
172: void *ptr;
173:
174: if (!root || !func)
175: return NULL;
176:
177: /* get new task */
178: if (!(task = _sched_useTask(root)))
179: return NULL;
180:
181: task->task_func = func;
182: TASK_TYPE(task) = taskWRITE;
183: TASK_ROOT(task) = root;
184:
185: TASK_ARG(task) = arg;
186: TASK_FD(task) = fd;
187:
188: TASK_DATA(task) = opt_data;
189: TASK_DATLEN(task) = opt_dlen;
190:
191: if (root->root_hooks.hook_add.write)
192: ptr = root->root_hooks.hook_add.write(task, NULL);
193: else
194: ptr = NULL;
195:
196: if (!ptr) {
197: #ifdef HAVE_LIBPTHREAD
198: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
199: #endif
200: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
201: #ifdef HAVE_LIBPTHREAD
202: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
203: #endif
204: } else
205: task = _sched_unuseTask(task);
206:
207: return task;
208: }
209:
210: /*
211: * schedNode() - Add NODE task to scheduler queue
212: *
213: * @root = root task
214: * @func = task execution function
215: * @arg = 1st func argument
216: * @fd = fd handle
217: * @opt_data = Optional data
218: * @opt_dlen = Optional data length
219: * return: NULL error or !=NULL new queued task
220: */
221: sched_task_t *
222: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
223: void *opt_data, size_t opt_dlen)
224: {
225: sched_task_t *task;
226: void *ptr;
227:
228: if (!root || !func)
229: return NULL;
230:
231: /* get new task */
232: if (!(task = _sched_useTask(root)))
233: return NULL;
234:
235: task->task_func = func;
236: TASK_TYPE(task) = taskNODE;
237: TASK_ROOT(task) = root;
238:
239: TASK_ARG(task) = arg;
240: TASK_FD(task) = fd;
241:
242: TASK_DATA(task) = opt_data;
243: TASK_DATLEN(task) = opt_dlen;
244:
245: if (root->root_hooks.hook_add.node)
246: ptr = root->root_hooks.hook_add.node(task, NULL);
247: else
248: ptr = NULL;
249:
250: if (!ptr) {
251: #ifdef HAVE_LIBPTHREAD
252: pthread_mutex_lock(&root->root_mtx[taskNODE]);
253: #endif
254: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
255: #ifdef HAVE_LIBPTHREAD
256: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
257: #endif
258: } else
259: task = _sched_unuseTask(task);
260:
261: return task;
262: }
263:
264: /*
265: * schedProc() - Add PROC task to scheduler queue
266: *
267: * @root = root task
268: * @func = task execution function
269: * @arg = 1st func argument
270: * @pid = PID
271: * @opt_data = Optional data
272: * @opt_dlen = Optional data length
273: * return: NULL error or !=NULL new queued task
274: */
275: sched_task_t *
276: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
277: void *opt_data, size_t opt_dlen)
278: {
279: sched_task_t *task;
280: void *ptr;
281:
282: if (!root || !func)
283: return NULL;
284:
285: /* get new task */
286: if (!(task = _sched_useTask(root)))
287: return NULL;
288:
289: task->task_func = func;
290: TASK_TYPE(task) = taskPROC;
291: TASK_ROOT(task) = root;
292:
293: TASK_ARG(task) = arg;
294: TASK_VAL(task) = pid;
295:
296: TASK_DATA(task) = opt_data;
297: TASK_DATLEN(task) = opt_dlen;
298:
299: if (root->root_hooks.hook_add.proc)
300: ptr = root->root_hooks.hook_add.proc(task, NULL);
301: else
302: ptr = NULL;
303:
304: if (!ptr) {
305: #ifdef HAVE_LIBPTHREAD
306: pthread_mutex_lock(&root->root_mtx[taskPROC]);
307: #endif
308: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
309: #ifdef HAVE_LIBPTHREAD
310: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
311: #endif
312: } else
313: task = _sched_unuseTask(task);
314:
315: return task;
316: }
317:
318: /*
319: * schedUser() - Add trigger USER task to scheduler queue
320: *
321: * @root = root task
322: * @func = task execution function
323: * @arg = 1st func argument
324: * @id = Trigger ID
325: * @opt_data = Optional data
326: * @opt_dlen = Optional user's trigger flags
327: * return: NULL error or !=NULL new queued task
328: */
329: sched_task_t *
330: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
331: void *opt_data, size_t opt_dlen)
332: {
333: #ifndef EVFILT_USER
334: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
335: return NULL;
336: #else
337: sched_task_t *task;
338: void *ptr;
339:
340: if (!root || !func)
341: return NULL;
342:
343: /* get new task */
344: if (!(task = _sched_useTask(root)))
345: return NULL;
346:
347: task->task_func = func;
348: TASK_TYPE(task) = taskUSER;
349: TASK_ROOT(task) = root;
350:
351: TASK_ARG(task) = arg;
352: TASK_VAL(task) = id;
353:
354: TASK_DATA(task) = opt_data;
355: TASK_DATLEN(task) = opt_dlen;
356:
357: if (root->root_hooks.hook_add.user)
358: ptr = root->root_hooks.hook_add.user(task, NULL);
359: else
360: ptr = NULL;
361:
362: if (!ptr) {
363: #ifdef HAVE_LIBPTHREAD
364: pthread_mutex_lock(&root->root_mtx[taskUSER]);
365: #endif
366: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
367: #ifdef HAVE_LIBPTHREAD
368: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
369: #endif
370: } else
371: task = _sched_unuseTask(task);
372:
373: return task;
374: #endif
375: }
376:
377: /*
378: * schedSignal() - Add SIGNAL task to scheduler queue
379: *
380: * @root = root task
381: * @func = task execution function
382: * @arg = 1st func argument
383: * @sig = Signal
384: * @opt_data = Optional data
385: * @opt_dlen = Optional data length
386: * return: NULL error or !=NULL new queued task
387: */
388: sched_task_t *
389: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
390: void *opt_data, size_t opt_dlen)
391: {
392: sched_task_t *task;
393: void *ptr;
394:
395: if (!root || !func)
396: return NULL;
397:
398: /* get new task */
399: if (!(task = _sched_useTask(root)))
400: return NULL;
401:
402: task->task_func = func;
403: TASK_TYPE(task) = taskSIGNAL;
404: TASK_ROOT(task) = root;
405:
406: TASK_ARG(task) = arg;
407: TASK_VAL(task) = sig;
408:
409: TASK_DATA(task) = opt_data;
410: TASK_DATLEN(task) = opt_dlen;
411:
412: if (root->root_hooks.hook_add.signal)
413: ptr = root->root_hooks.hook_add.signal(task, NULL);
414: else
415: ptr = NULL;
416:
417: if (!ptr) {
418: #ifdef HAVE_LIBPTHREAD
419: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
420: #endif
421: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
422: #ifdef HAVE_LIBPTHREAD
423: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
424: #endif
425: } else
426: task = _sched_unuseTask(task);
427:
428: return task;
429: }
430:
431: /*
432: * schedAlarm() - Add ALARM task to scheduler queue
433: *
434: * @root = root task
435: * @func = task execution function
436: * @arg = 1st func argument
437: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
438: * @opt_data = Optional data
439: * @opt_dlen = Optional data length
440: * return: NULL error or !=NULL new queued task
441: */
442: sched_task_t *
443: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
444: void *opt_data, size_t opt_dlen)
445: {
446: sched_task_t *task;
447: void *ptr;
448:
449: if (!root || !func)
450: return NULL;
451:
452: /* get new task */
453: if (!(task = _sched_useTask(root)))
454: return NULL;
455:
456: task->task_func = func;
457: TASK_TYPE(task) = taskALARM;
458: TASK_ROOT(task) = root;
459:
460: TASK_ARG(task) = arg;
461: TASK_TS(task) = ts;
462:
463: TASK_DATA(task) = opt_data;
464: TASK_DATLEN(task) = opt_dlen;
465:
466: if (root->root_hooks.hook_add.alarm)
467: ptr = root->root_hooks.hook_add.alarm(task, NULL);
468: else
469: ptr = NULL;
470:
471: if (!ptr) {
472: #ifdef HAVE_LIBPTHREAD
473: pthread_mutex_lock(&root->root_mtx[taskALARM]);
474: #endif
475: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
476: #ifdef HAVE_LIBPTHREAD
477: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
478: #endif
479: } else
480: task = _sched_unuseTask(task);
481:
482: return task;
483: }
484:
485: #ifdef EVFILT_AIO
486: /*
487: * schedAIO() - Add AIO task to scheduler queue
488: *
489: * @root = root task
490: * @func = task execution function
491: * @arg = 1st func argument
492: * @acb = AIO cb structure address
493: * @opt_data = Optional data
494: * @opt_dlen = Optional data length
495: * return: NULL error or !=NULL new queued task
496: */
497: sched_task_t *
498: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
499: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
500: {
501: sched_task_t *task;
502: void *ptr;
503:
504: if (!root || !func || !acb || !opt_dlen)
505: return NULL;
506:
507: /* get new task */
508: if (!(task = _sched_useTask(root)))
509: return NULL;
510:
511: task->task_func = func;
512: TASK_TYPE(task) = taskAIO;
513: TASK_ROOT(task) = root;
514:
515: TASK_ARG(task) = arg;
516: TASK_VAL(task) = (u_long) acb;
517:
518: TASK_DATA(task) = opt_data;
519: TASK_DATLEN(task) = opt_dlen;
520:
521: if (root->root_hooks.hook_add.aio)
522: ptr = root->root_hooks.hook_add.aio(task, NULL);
523: else
524: ptr = NULL;
525:
526: if (!ptr) {
527: #ifdef HAVE_LIBPTHREAD
528: pthread_mutex_lock(&root->root_mtx[taskAIO]);
529: #endif
530: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
531: #ifdef HAVE_LIBPTHREAD
532: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
533: #endif
534: } else
535: task = _sched_unuseTask(task);
536:
537: return task;
538: }
539:
540: /*
541: * schedAIORead() - Add AIO read task to scheduler queue
542: *
543: * @root = root task
544: * @func = task execution function
545: * @arg = 1st func argument
546: * @fd = file descriptor
547: * @buffer = Buffer
548: * @buflen = Buffer length
549: * return: NULL error or !=NULL new queued task
550: */
551: inline sched_task_t *
552: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
553: void *buffer, size_t buflen)
554: {
555: struct aiocb *acb;
556: off_t off = 0;
557:
558: if (!root || !func || !buffer || !buflen)
559: return NULL;
560: else
561: memset(buffer, 0, buflen);
562:
563: if (!(acb = malloc(sizeof(struct aiocb)))) {
564: LOGERR;
565: return NULL;
566: } else
567: memset(acb, 0, sizeof(struct aiocb));
568:
569: acb->aio_fildes = fd;
570: acb->aio_nbytes = buflen;
571: acb->aio_buf = buffer;
572: off = lseek(fd, 0, SEEK_CUR);
573: if (off == -1) {
574: LOGERR;
575: free(acb);
576: return NULL;
577: } else
578: acb->aio_offset = off;
579: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
580: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
581: acb->aio_sigevent.sigev_value.sival_ptr = acb;
582:
583: if (aio_read(acb)) {
584: LOGERR;
585: free(acb);
586: return NULL;
587: }
588:
589: return schedAIO(root, func, arg, acb, buffer, buflen);
590: }
591:
592: /*
593: * schedAIOWrite() - Add AIO write task to scheduler queue
594: *
595: * @root = root task
596: * @func = task execution function
597: * @arg = 1st func argument
598: * @fd = file descriptor
599: * @buffer = Buffer
600: * @buflen = Buffer length
601: * return: NULL error or !=NULL new queued task
602: */
603: inline sched_task_t *
604: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
605: void *buffer, size_t buflen)
606: {
607: struct aiocb *acb;
608: off_t off = 0;
609:
610: if (!root || !func || !buffer || !buflen)
611: return NULL;
612:
613: if (!(acb = malloc(sizeof(struct aiocb)))) {
614: LOGERR;
615: return NULL;
616: } else
617: memset(acb, 0, sizeof(struct aiocb));
618:
619: acb->aio_fildes = fd;
620: acb->aio_nbytes = buflen;
621: acb->aio_buf = buffer;
622: off = lseek(fd, 0, SEEK_CUR);
623: if (off == -1) {
624: LOGERR;
625: free(acb);
626: return NULL;
627: } else
628: acb->aio_offset = off;
629: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
630: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
631: acb->aio_sigevent.sigev_value.sival_ptr = acb;
632:
633: if (aio_write(acb)) {
634: LOGERR;
635: free(acb);
636: return NULL;
637: }
638:
639: return schedAIO(root, func, arg, acb, buffer, buflen);
640: }
641:
642: #ifdef EVFILT_LIO
643: /*
644: * schedLIORead() - Add list of AIO read tasks to scheduler queue
645: *
646: * @root = root task
647: * @func = task execution function
648: * @arg = 1st func argument
649: * @fd = file descriptor
650: * @bufs = Buffer's list
651: * @nbufs = Number of Buffers
652: * @offset = Offset from start of file, if =-1 from current position
653: * return: NULL error or !=NULL new queued task
654: */
655: sched_task_t *
656: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
657: struct iovec *bufs, size_t nbufs, off_t offset)
658: {
659: struct sigevent sig;
660: struct aiocb **acb;
661: off_t off;
662: register int i;
663:
664: if (!root || !func || !bufs || !nbufs)
665: return NULL;
666:
667: if (offset == (off_t) -1) {
668: off = lseek(fd, 0, SEEK_CUR);
669: if (off == -1) {
670: LOGERR;
671: return NULL;
672: }
673: } else
674: off = offset;
675:
676: if (!(acb = calloc(sizeof(void*), nbufs))) {
677: LOGERR;
678: return NULL;
679: } else
680: memset(acb, 0, sizeof(void*) * nbufs);
681: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
682: acb[i] = malloc(sizeof(struct aiocb));
683: if (!acb[i]) {
684: LOGERR;
685: for (i = 0; i < nbufs; i++)
686: if (acb[i])
687: free(acb[i]);
688: free(acb);
689: return NULL;
690: } else
691: memset(acb[i], 0, sizeof(struct aiocb));
692: acb[i]->aio_fildes = fd;
693: acb[i]->aio_nbytes = bufs[i].iov_len;
694: acb[i]->aio_buf = bufs[i].iov_base;
695: acb[i]->aio_offset = off;
696: acb[i]->aio_lio_opcode = LIO_READ;
697: }
698: memset(&sig, 0, sizeof sig);
699: sig.sigev_notify = SIGEV_KEVENT;
700: sig.sigev_notify_kqueue = root->root_kq;
701: sig.sigev_value.sival_ptr = acb;
702:
703: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
704: LOGERR;
705: return NULL;
706: }
707:
708: return schedAIO(root, func, arg, (void*) acb, bufs, nbufs);
709: }
710:
711: /*
712: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
713: *
714: * @root = root task
715: * @func = task execution function
716: * @arg = 1st func argument
717: * @fd = file descriptor
718: * @bufs = Buffer's list
719: * @nbufs = Number of Buffers
720: * @offset = Offset from start of file, if =-1 from current position
721: * return: NULL error or !=NULL new queued task
722: */
723: inline sched_task_t *
724: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
725: struct iovec *bufs, size_t nbufs, off_t offset)
726: {
727: struct sigevent sig;
728: struct aiocb **acb;
729: off_t off;
730: register int i;
731:
732: if (!root || !func || !bufs || !nbufs)
733: return NULL;
734:
735: if (offset == (off_t) -1) {
736: off = lseek(fd, 0, SEEK_CUR);
737: if (off == -1) {
738: LOGERR;
739: return NULL;
740: }
741: } else
742: off = offset;
743:
744: if (!(acb = calloc(sizeof(void*), nbufs))) {
745: LOGERR;
746: return NULL;
747: } else
748: memset(acb, 0, sizeof(void*) * nbufs);
749: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
750: acb[i] = malloc(sizeof(struct aiocb));
751: if (!acb[i]) {
752: LOGERR;
753: for (i = 0; i < nbufs; i++)
754: if (acb[i])
755: free(acb[i]);
756: free(acb);
757: return NULL;
758: } else
759: memset(acb[i], 0, sizeof(struct aiocb));
760: acb[i]->aio_fildes = fd;
761: acb[i]->aio_nbytes = bufs[i].iov_len;
762: acb[i]->aio_buf = bufs[i].iov_base;
763: acb[i]->aio_offset = off;
764: acb[i]->aio_lio_opcode = LIO_WRITE;
765: }
766: memset(&sig, 0, sizeof sig);
767: sig.sigev_notify = SIGEV_KEVENT;
768: sig.sigev_notify_kqueue = root->root_kq;
769: sig.sigev_value.sival_ptr = acb;
770:
771: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
772: LOGERR;
773: return NULL;
774: }
775:
776: return schedAIO(root, func, arg, (void*) acb, bufs, nbufs);
777: }
778: #endif /* EVFILT_LIO */
779: #endif /* EVFILT_AIO */
780:
781: /*
782: * schedTimer() - Add TIMER task to scheduler queue
783: *
784: * @root = root task
785: * @func = task execution function
786: * @arg = 1st func argument
787: * @ts = timeout argument structure
788: * @opt_data = Optional data
789: * @opt_dlen = Optional data length
790: * return: NULL error or !=NULL new queued task
791: */
sched_task_t *
schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task, *tmp, *t = NULL;
	void *ptr;
	struct timespec now;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = _sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskTIMER;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	/* calculate timeval structure */
	/* deadline = monotonic now + relative timeout ts */
	clock_gettime(CLOCK_MONOTONIC, &now);
	now.tv_sec += ts.tv_sec;
	now.tv_nsec += ts.tv_nsec;
	/* normalize tv_nsec back into [0, 1e9) after the addition */
	if (now.tv_nsec >= 1000000000L) {
		now.tv_sec++;
		now.tv_nsec -= 1000000000L;
	} else if (now.tv_nsec < 0) {
		now.tv_sec--;
		now.tv_nsec += 1000000000L;
	}
	/* the task stores the ABSOLUTE expiry time, not the relative timeout */
	TASK_TS(task) = now;

	if (root->root_hooks.hook_add.timer)
		ptr = root->root_hooks.hook_add.timer(task, NULL);
	else
		ptr = NULL;

	/* non-NULL hook result means the hook rejected the task */
	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskTIMER]);
#endif
#ifdef TIMER_WITHOUT_SORT
		TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
#else
		/* keep the timer queue sorted by expiry: insert before the first
		 * existing entry that expires later than (or with) this one */
		TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
			if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
				break;
		if (!t)
			TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
		else
			TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
#endif
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
#endif
	} else
		task = _sched_unuseTask(task);

	return task;
}
857:
858: /*
859: * schedEvent() - Add EVENT task to scheduler queue
860: *
861: * @root = root task
862: * @func = task execution function
863: * @arg = 1st func argument
864: * @val = additional func argument
865: * @opt_data = Optional data
866: * @opt_dlen = Optional data length
867: * return: NULL error or !=NULL new queued task
868: */
869: sched_task_t *
870: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
871: void *opt_data, size_t opt_dlen)
872: {
873: sched_task_t *task;
874: void *ptr;
875:
876: if (!root || !func)
877: return NULL;
878:
879: /* get new task */
880: if (!(task = _sched_useTask(root)))
881: return NULL;
882:
883: task->task_func = func;
884: TASK_TYPE(task) = taskEVENT;
885: TASK_ROOT(task) = root;
886:
887: TASK_ARG(task) = arg;
888: TASK_VAL(task) = val;
889:
890: TASK_DATA(task) = opt_data;
891: TASK_DATLEN(task) = opt_dlen;
892:
893: if (root->root_hooks.hook_add.event)
894: ptr = root->root_hooks.hook_add.event(task, NULL);
895: else
896: ptr = NULL;
897:
898: if (!ptr) {
899: #ifdef HAVE_LIBPTHREAD
900: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
901: #endif
902: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
903: #ifdef HAVE_LIBPTHREAD
904: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
905: #endif
906: } else
907: task = _sched_unuseTask(task);
908:
909: return task;
910: }
911:
912:
913: /*
914: * schedEventLo() - Add EVENT_Lo task to scheduler queue
915: *
916: * @root = root task
917: * @func = task execution function
918: * @arg = 1st func argument
919: * @val = additional func argument
920: * @opt_data = Optional data
921: * @opt_dlen = Optional data length
922: * return: NULL error or !=NULL new queued task
923: */
sched_task_t *
schedEventLo(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task;
	void *ptr;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = _sched_useTask(root)))
		return NULL;

	task->task_func = func;
	/* NOTE(review): type is set to taskEVENT although the task is queued on
	 * root_eventlo under the taskEVENTLO mutex — looks inconsistent with the
	 * other sched*() functions; confirm whether the dispatcher relies on
	 * this before "fixing" it to taskEVENTLO */
	TASK_TYPE(task) = taskEVENT;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;
	TASK_VAL(task) = val;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	if (root->root_hooks.hook_add.eventlo)
		ptr = root->root_hooks.hook_add.eventlo(task, NULL);
	else
		ptr = NULL;

	/* non-NULL hook result means the hook rejected the task */
	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskEVENTLO]);
#endif
		TAILQ_INSERT_TAIL(&root->root_eventlo, TASK_ID(task), task_node);
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskEVENTLO]);
#endif
	} else
		task = _sched_unuseTask(task);

	return task;
}
966:
967: /*
968: * schedSuspend() - Add Suspended task to scheduler queue
969: *
970: * @root = root task
971: * @func = task execution function
972: * @arg = 1st func argument
973: * @id = Trigger ID
974: * @opt_data = Optional data
975: * @opt_dlen = Optional data length
976: * return: NULL error or !=NULL new queued task
977: */
978: sched_task_t *
979: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
980: void *opt_data, size_t opt_dlen)
981: {
982: sched_task_t *task;
983: void *ptr;
984:
985: if (!root || !func)
986: return NULL;
987:
988: /* get new task */
989: if (!(task = _sched_useTask(root)))
990: return NULL;
991:
992: task->task_func = func;
993: TASK_TYPE(task) = taskSUSPEND;
994: TASK_ROOT(task) = root;
995:
996: TASK_ARG(task) = arg;
997: TASK_VAL(task) = id;
998:
999: TASK_DATA(task) = opt_data;
1000: TASK_DATLEN(task) = opt_dlen;
1001:
1002: if (root->root_hooks.hook_add.suspend)
1003: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1004: else
1005: ptr = NULL;
1006:
1007: if (!ptr) {
1008: #ifdef HAVE_LIBPTHREAD
1009: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1010: #endif
1011: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1012: #ifdef HAVE_LIBPTHREAD
1013: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1014: #endif
1015: } else
1016: task = _sched_unuseTask(task);
1017:
1018: return task;
1019: }
1020:
1021: /*
1022: * schedCallOnce() - Call once from scheduler
1023: *
1024: * @root = root task
1025: * @func = task execution function
1026: * @arg = 1st func argument
1027: * @val = additional func argument
1028: * @opt_data = Optional data
1029: * @opt_dlen = Optional data length
1030: * return: return value from called func
1031: */
1032: sched_task_t *
1033: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1034: void *opt_data, size_t opt_dlen)
1035: {
1036: sched_task_t *task;
1037: void *ret;
1038:
1039: if (!root || !func)
1040: return NULL;
1041:
1042: /* get new task */
1043: if (!(task = _sched_useTask(root)))
1044: return NULL;
1045:
1046: task->task_func = func;
1047: TASK_TYPE(task) = taskEVENT;
1048: TASK_ROOT(task) = root;
1049:
1050: TASK_ARG(task) = arg;
1051: TASK_VAL(task) = val;
1052:
1053: TASK_DATA(task) = opt_data;
1054: TASK_DATLEN(task) = opt_dlen;
1055:
1056: ret = schedCall(task);
1057:
1058: _sched_unuseTask(task);
1059: return ret;
1060: }
/* FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org> */