Annotation of libaitsched/src/tasks.c, revision 1.11
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.11 ! misho 6: * $Id: tasks.c,v 1.10.2.12 2012/08/02 13:53:22 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.6 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
1.4 misho 49: #pragma GCC visibility push(hidden)
50:
51: inline sched_task_t *
52: _sched_useTask(sched_root_task_t * __restrict root)
53: {
1.7 misho 54: sched_task_t *task, *tmp;
1.4 misho 55:
1.7 misho 56: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
1.4 misho 57: if (!TASK_ISLOCKED(task)) {
1.5 misho 58: #ifdef HAVE_LIBPTHREAD
59: pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
60: #endif
1.4 misho 61: TAILQ_REMOVE(&root->root_unuse, task, task_node);
1.5 misho 62: #ifdef HAVE_LIBPTHREAD
63: pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
64: #endif
1.4 misho 65: break;
66: }
67: }
68:
69: if (!task) {
70: task = malloc(sizeof(sched_task_t));
71: if (!task) {
72: LOGERR;
73: return NULL;
74: }
75: }
76:
1.9 misho 77: memset(task, 0, sizeof(sched_task_t));
78: task->task_id = (uintptr_t) task;
1.4 misho 79: return task;
80: }
81:
82: inline sched_task_t *
83: _sched_unuseTask(sched_task_t * __restrict task)
84: {
85: TASK_UNLOCK(task);
1.5 misho 86: TASK_TYPE(task) = taskUNUSE;
87: #ifdef HAVE_LIBPTHREAD
88: pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
89: #endif
1.9 misho 90: TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
1.5 misho 91: #ifdef HAVE_LIBPTHREAD
92: pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
93: #endif
1.4 misho 94: task = NULL;
95:
96: return task;
97: }
98:
99: #pragma GCC visibility pop
100:
101:
1.1 misho 102: /*
1.2 misho 103: * schedRead() - Add READ I/O task to scheduler queue
1.6 misho 104: *
1.1 misho 105: * @root = root task
106: * @func = task execution function
107: * @arg = 1st func argument
1.2 misho 108: * @fd = fd handle
1.5 misho 109: * @opt_data = Optional data
110: * @opt_dlen = Optional data length
1.1 misho 111: * return: NULL error or !=NULL new queued task
112: */
113: sched_task_t *
1.5 misho 114: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
115: void *opt_data, size_t opt_dlen)
1.1 misho 116: {
117: sched_task_t *task;
118: void *ptr;
119:
120: if (!root || !func)
121: return NULL;
122:
123: /* get new task */
1.4 misho 124: if (!(task = _sched_useTask(root)))
125: return NULL;
1.1 misho 126:
127: task->task_func = func;
1.5 misho 128: TASK_TYPE(task) = taskREAD;
129: TASK_ROOT(task) = root;
1.1 misho 130:
131: TASK_ARG(task) = arg;
1.2 misho 132: TASK_FD(task) = fd;
1.1 misho 133:
1.5 misho 134: TASK_DATA(task) = opt_data;
135: TASK_DATLEN(task) = opt_dlen;
136:
1.1 misho 137: if (root->root_hooks.hook_add.read)
138: ptr = root->root_hooks.hook_add.read(task, NULL);
139: else
140: ptr = NULL;
141:
1.5 misho 142: if (!ptr) {
143: #ifdef HAVE_LIBPTHREAD
144: pthread_mutex_lock(&root->root_mtx[taskREAD]);
145: #endif
1.9 misho 146: TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
1.5 misho 147: #ifdef HAVE_LIBPTHREAD
148: pthread_mutex_unlock(&root->root_mtx[taskREAD]);
149: #endif
150: } else
1.4 misho 151: task = _sched_unuseTask(task);
1.1 misho 152:
153: return task;
154: }
155:
156: /*
1.2 misho 157: * schedWrite() - Add WRITE I/O task to scheduler queue
1.6 misho 158: *
1.1 misho 159: * @root = root task
160: * @func = task execution function
161: * @arg = 1st func argument
1.2 misho 162: * @fd = fd handle
1.5 misho 163: * @opt_data = Optional data
164: * @opt_dlen = Optional data length
1.1 misho 165: * return: NULL error or !=NULL new queued task
166: */
167: sched_task_t *
1.5 misho 168: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
169: void *opt_data, size_t opt_dlen)
1.1 misho 170: {
171: sched_task_t *task;
172: void *ptr;
173:
174: if (!root || !func)
175: return NULL;
176:
177: /* get new task */
1.4 misho 178: if (!(task = _sched_useTask(root)))
179: return NULL;
1.1 misho 180:
181: task->task_func = func;
1.5 misho 182: TASK_TYPE(task) = taskWRITE;
183: TASK_ROOT(task) = root;
1.1 misho 184:
185: TASK_ARG(task) = arg;
1.2 misho 186: TASK_FD(task) = fd;
1.1 misho 187:
1.5 misho 188: TASK_DATA(task) = opt_data;
189: TASK_DATLEN(task) = opt_dlen;
190:
1.1 misho 191: if (root->root_hooks.hook_add.write)
192: ptr = root->root_hooks.hook_add.write(task, NULL);
193: else
194: ptr = NULL;
195:
1.5 misho 196: if (!ptr) {
197: #ifdef HAVE_LIBPTHREAD
198: pthread_mutex_lock(&root->root_mtx[taskWRITE]);
199: #endif
1.9 misho 200: TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
1.5 misho 201: #ifdef HAVE_LIBPTHREAD
202: pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
203: #endif
204: } else
1.4 misho 205: task = _sched_unuseTask(task);
1.1 misho 206:
207: return task;
208: }
209:
210: /*
1.9 misho 211: * schedNode() - Add NODE task to scheduler queue
212: *
213: * @root = root task
214: * @func = task execution function
215: * @arg = 1st func argument
216: * @fd = fd handle
217: * @opt_data = Optional data
218: * @opt_dlen = Optional data length
219: * return: NULL error or !=NULL new queued task
220: */
221: sched_task_t *
222: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
223: void *opt_data, size_t opt_dlen)
224: {
225: sched_task_t *task;
226: void *ptr;
227:
228: if (!root || !func)
229: return NULL;
230:
231: /* get new task */
232: if (!(task = _sched_useTask(root)))
233: return NULL;
234:
235: task->task_func = func;
236: TASK_TYPE(task) = taskNODE;
237: TASK_ROOT(task) = root;
238:
239: TASK_ARG(task) = arg;
240: TASK_FD(task) = fd;
241:
242: TASK_DATA(task) = opt_data;
243: TASK_DATLEN(task) = opt_dlen;
244:
245: if (root->root_hooks.hook_add.node)
246: ptr = root->root_hooks.hook_add.node(task, NULL);
247: else
248: ptr = NULL;
249:
250: if (!ptr) {
251: #ifdef HAVE_LIBPTHREAD
252: pthread_mutex_lock(&root->root_mtx[taskNODE]);
253: #endif
254: TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
255: #ifdef HAVE_LIBPTHREAD
256: pthread_mutex_unlock(&root->root_mtx[taskNODE]);
257: #endif
258: } else
259: task = _sched_unuseTask(task);
260:
261: return task;
262: }
263:
264: /*
265: * schedProc() - Add PROC task to scheduler queue
266: *
267: * @root = root task
268: * @func = task execution function
269: * @arg = 1st func argument
270: * @pid = PID
271: * @opt_data = Optional data
272: * @opt_dlen = Optional data length
273: * return: NULL error or !=NULL new queued task
274: */
275: sched_task_t *
276: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid,
277: void *opt_data, size_t opt_dlen)
278: {
279: sched_task_t *task;
280: void *ptr;
281:
282: if (!root || !func)
283: return NULL;
284:
285: /* get new task */
286: if (!(task = _sched_useTask(root)))
287: return NULL;
288:
289: task->task_func = func;
290: TASK_TYPE(task) = taskPROC;
291: TASK_ROOT(task) = root;
292:
293: TASK_ARG(task) = arg;
294: TASK_VAL(task) = pid;
295:
296: TASK_DATA(task) = opt_data;
297: TASK_DATLEN(task) = opt_dlen;
298:
299: if (root->root_hooks.hook_add.proc)
300: ptr = root->root_hooks.hook_add.proc(task, NULL);
301: else
302: ptr = NULL;
303:
304: if (!ptr) {
305: #ifdef HAVE_LIBPTHREAD
306: pthread_mutex_lock(&root->root_mtx[taskPROC]);
307: #endif
308: TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
309: #ifdef HAVE_LIBPTHREAD
310: pthread_mutex_unlock(&root->root_mtx[taskPROC]);
311: #endif
312: } else
313: task = _sched_unuseTask(task);
314:
315: return task;
316: }
317:
318: /*
319: * schedUser() - Add trigger USER task to scheduler queue
320: *
321: * @root = root task
322: * @func = task execution function
323: * @arg = 1st func argument
324: * @id = Trigger ID
325: * @opt_data = Optional data
326: * @opt_dlen = Optional user's trigger flags
327: * return: NULL error or !=NULL new queued task
328: */
329: sched_task_t *
330: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
331: void *opt_data, size_t opt_dlen)
332: {
333: #ifndef EVFILT_USER
334: sched_SetErr(ENOTSUP, "Not supported kevent() filter");
335: return NULL;
336: #else
337: sched_task_t *task;
338: void *ptr;
339:
340: if (!root || !func)
341: return NULL;
342:
343: /* get new task */
344: if (!(task = _sched_useTask(root)))
345: return NULL;
346:
347: task->task_func = func;
348: TASK_TYPE(task) = taskUSER;
349: TASK_ROOT(task) = root;
350:
351: TASK_ARG(task) = arg;
352: TASK_VAL(task) = id;
353:
354: TASK_DATA(task) = opt_data;
355: TASK_DATLEN(task) = opt_dlen;
356:
357: if (root->root_hooks.hook_add.user)
358: ptr = root->root_hooks.hook_add.user(task, NULL);
359: else
360: ptr = NULL;
361:
362: if (!ptr) {
363: #ifdef HAVE_LIBPTHREAD
364: pthread_mutex_lock(&root->root_mtx[taskUSER]);
365: #endif
366: TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
367: #ifdef HAVE_LIBPTHREAD
368: pthread_mutex_unlock(&root->root_mtx[taskUSER]);
369: #endif
370: } else
371: task = _sched_unuseTask(task);
372:
373: return task;
374: #endif
375: }
376:
377: /*
378: * schedSignal() - Add SIGNAL task to scheduler queue
379: *
380: * @root = root task
381: * @func = task execution function
382: * @arg = 1st func argument
383: * @sig = Signal
384: * @opt_data = Optional data
385: * @opt_dlen = Optional data length
386: * return: NULL error or !=NULL new queued task
387: */
388: sched_task_t *
389: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig,
390: void *opt_data, size_t opt_dlen)
391: {
392: sched_task_t *task;
393: void *ptr;
394:
395: if (!root || !func)
396: return NULL;
397:
398: /* get new task */
399: if (!(task = _sched_useTask(root)))
400: return NULL;
401:
402: task->task_func = func;
403: TASK_TYPE(task) = taskSIGNAL;
404: TASK_ROOT(task) = root;
405:
406: TASK_ARG(task) = arg;
407: TASK_VAL(task) = sig;
408:
409: TASK_DATA(task) = opt_data;
410: TASK_DATLEN(task) = opt_dlen;
411:
412: if (root->root_hooks.hook_add.signal)
413: ptr = root->root_hooks.hook_add.signal(task, NULL);
414: else
415: ptr = NULL;
416:
417: if (!ptr) {
418: #ifdef HAVE_LIBPTHREAD
419: pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
420: #endif
421: TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
422: #ifdef HAVE_LIBPTHREAD
423: pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
424: #endif
425: } else
426: task = _sched_unuseTask(task);
427:
428: return task;
429: }
430:
431: /*
1.8 misho 432: * schedAlarm() - Add ALARM task to scheduler queue
433: *
434: * @root = root task
435: * @func = task execution function
436: * @arg = 1st func argument
437: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
438: * @opt_data = Optional data
439: * @opt_dlen = Optional data length
440: * return: NULL error or !=NULL new queued task
441: */
442: sched_task_t *
443: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
444: void *opt_data, size_t opt_dlen)
445: {
446: sched_task_t *task;
447: void *ptr;
448:
449: if (!root || !func)
450: return NULL;
451:
452: /* get new task */
453: if (!(task = _sched_useTask(root)))
454: return NULL;
455:
456: task->task_func = func;
457: TASK_TYPE(task) = taskALARM;
458: TASK_ROOT(task) = root;
459:
460: TASK_ARG(task) = arg;
461: TASK_TS(task) = ts;
462:
463: TASK_DATA(task) = opt_data;
464: TASK_DATLEN(task) = opt_dlen;
465:
466: if (root->root_hooks.hook_add.alarm)
467: ptr = root->root_hooks.hook_add.alarm(task, NULL);
468: else
469: ptr = NULL;
470:
471: if (!ptr) {
472: #ifdef HAVE_LIBPTHREAD
473: pthread_mutex_lock(&root->root_mtx[taskALARM]);
474: #endif
1.9 misho 475: TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
1.8 misho 476: #ifdef HAVE_LIBPTHREAD
477: pthread_mutex_unlock(&root->root_mtx[taskALARM]);
478: #endif
479: } else
480: task = _sched_unuseTask(task);
481:
482: return task;
483: }
484:
1.11 ! misho 485: #ifdef AIO_SUPPORT
! 486: /*
! 487: * schedAIO() - Add AIO task to scheduler queue
! 488: *
! 489: * @root = root task
! 490: * @func = task execution function
! 491: * @arg = 1st func argument
! 492: * @acb = AIO cb structure address
! 493: * @opt_data = Optional data
! 494: * @opt_dlen = Optional data length
! 495: * return: NULL error or !=NULL new queued task
! 496: */
! 497: sched_task_t *
! 498: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
! 499: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
! 500: {
! 501: sched_task_t *task;
! 502: void *ptr;
! 503:
! 504: if (!root || !func || !acb || !opt_dlen)
! 505: return NULL;
! 506:
! 507: /* get new task */
! 508: if (!(task = _sched_useTask(root)))
! 509: return NULL;
! 510:
! 511: task->task_func = func;
! 512: TASK_TYPE(task) = taskAIO;
! 513: TASK_ROOT(task) = root;
! 514:
! 515: TASK_ARG(task) = arg;
! 516: TASK_VAL(task) = (u_long) acb;
! 517:
! 518: TASK_DATA(task) = opt_data;
! 519: TASK_DATLEN(task) = opt_dlen;
! 520:
! 521: if (root->root_hooks.hook_add.aio)
! 522: ptr = root->root_hooks.hook_add.aio(task, NULL);
! 523: else
! 524: ptr = NULL;
! 525:
! 526: if (!ptr) {
! 527: #ifdef HAVE_LIBPTHREAD
! 528: pthread_mutex_lock(&root->root_mtx[taskAIO]);
! 529: #endif
! 530: TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
! 531: #ifdef HAVE_LIBPTHREAD
! 532: pthread_mutex_unlock(&root->root_mtx[taskAIO]);
! 533: #endif
! 534: } else
! 535: task = _sched_unuseTask(task);
! 536:
! 537: return task;
! 538: }
! 539:
! 540: /*
! 541: * schedAIORead() - Add AIO read task to scheduler queue
! 542: *
! 543: * @root = root task
! 544: * @func = task execution function
! 545: * @arg = 1st func argument
! 546: * @fd = file descriptor
! 547: * @buffer = Buffer
! 548: * @buflen = Buffer length
! 549: * @offset = Offset from start of file, if =-1 from current position
! 550: * return: NULL error or !=NULL new queued task
! 551: */
! 552: inline sched_task_t *
! 553: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
! 554: void *buffer, size_t buflen, off_t offset)
! 555: {
! 556: struct aiocb *acb;
! 557: off_t off;
! 558:
! 559: if (!root || !func || !buffer || !buflen)
! 560: return NULL;
! 561:
! 562: if (offset == (off_t) -1) {
! 563: off = lseek(fd, 0, SEEK_CUR);
! 564: if (off == -1) {
! 565: LOGERR;
! 566: return NULL;
! 567: }
! 568: } else
! 569: off = offset;
! 570:
! 571: if (!(acb = malloc(sizeof(struct aiocb)))) {
! 572: LOGERR;
! 573: return NULL;
! 574: } else
! 575: memset(acb, 0, sizeof(struct aiocb));
! 576:
! 577: acb->aio_fildes = fd;
! 578: acb->aio_nbytes = buflen;
! 579: acb->aio_buf = buffer;
! 580: acb->aio_offset = off;
! 581: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
! 582: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
! 583: acb->aio_sigevent.sigev_value.sival_ptr = acb;
! 584:
! 585: if (aio_read(acb)) {
! 586: LOGERR;
! 587: free(acb);
! 588: return NULL;
! 589: }
! 590:
! 591: return schedAIO(root, func, arg, acb, buffer, buflen);
! 592: }
! 593:
! 594: /*
! 595: * schedAIOWrite() - Add AIO write task to scheduler queue
! 596: *
! 597: * @root = root task
! 598: * @func = task execution function
! 599: * @arg = 1st func argument
! 600: * @fd = file descriptor
! 601: * @buffer = Buffer
! 602: * @buflen = Buffer length
! 603: * @offset = Offset from start of file, if =-1 from current position
! 604: * return: NULL error or !=NULL new queued task
! 605: */
! 606: inline sched_task_t *
! 607: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
! 608: void *buffer, size_t buflen, off_t offset)
! 609: {
! 610: struct aiocb *acb;
! 611: off_t off;
! 612:
! 613: if (!root || !func || !buffer || !buflen)
! 614: return NULL;
! 615:
! 616: if (offset == (off_t) -1) {
! 617: off = lseek(fd, 0, SEEK_CUR);
! 618: if (off == -1) {
! 619: LOGERR;
! 620: return NULL;
! 621: }
! 622: } else
! 623: off = offset;
! 624:
! 625: if (!(acb = malloc(sizeof(struct aiocb)))) {
! 626: LOGERR;
! 627: return NULL;
! 628: } else
! 629: memset(acb, 0, sizeof(struct aiocb));
! 630:
! 631: acb->aio_fildes = fd;
! 632: acb->aio_nbytes = buflen;
! 633: acb->aio_buf = buffer;
! 634: acb->aio_offset = off;
! 635: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
! 636: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
! 637: acb->aio_sigevent.sigev_value.sival_ptr = acb;
! 638:
! 639: if (aio_write(acb)) {
! 640: LOGERR;
! 641: free(acb);
! 642: return NULL;
! 643: }
! 644:
! 645: return schedAIO(root, func, arg, acb, buffer, buflen);
! 646: }
! 647:
! 648: #ifdef EVFILT_LIO
! 649: /*
! 650: * schedLIO() - Add AIO bulk tasks to scheduler queue
! 651: *
! 652: * @root = root task
! 653: * @func = task execution function
! 654: * @arg = 1st func argument
! 655: * @acbs = AIO cb structure addresses
! 656: * @opt_data = Optional data
! 657: * @opt_dlen = Optional data length
! 658: * return: NULL error or !=NULL new queued task
! 659: */
! 660: sched_task_t *
! 661: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg,
! 662: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
! 663: {
! 664: sched_task_t *task;
! 665: void *ptr;
! 666:
! 667: if (!root || !func || !acbs || !opt_dlen)
! 668: return NULL;
! 669:
! 670: /* get new task */
! 671: if (!(task = _sched_useTask(root)))
! 672: return NULL;
! 673:
! 674: task->task_func = func;
! 675: TASK_TYPE(task) = taskLIO;
! 676: TASK_ROOT(task) = root;
! 677:
! 678: TASK_ARG(task) = arg;
! 679: TASK_VAL(task) = (u_long) acbs;
! 680:
! 681: TASK_DATA(task) = opt_data;
! 682: TASK_DATLEN(task) = opt_dlen;
! 683:
! 684: if (root->root_hooks.hook_add.lio)
! 685: ptr = root->root_hooks.hook_add.lio(task, NULL);
! 686: else
! 687: ptr = NULL;
! 688:
! 689: if (!ptr) {
! 690: #ifdef HAVE_LIBPTHREAD
! 691: pthread_mutex_lock(&root->root_mtx[taskLIO]);
! 692: #endif
! 693: TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
! 694: #ifdef HAVE_LIBPTHREAD
! 695: pthread_mutex_unlock(&root->root_mtx[taskLIO]);
! 696: #endif
! 697: } else
! 698: task = _sched_unuseTask(task);
! 699:
! 700: return task;
! 701: }
! 702:
! 703: /*
! 704: * schedLIORead() - Add list of AIO read tasks to scheduler queue
! 705: *
! 706: * @root = root task
! 707: * @func = task execution function
! 708: * @arg = 1st func argument
! 709: * @fd = file descriptor
! 710: * @bufs = Buffer's list
! 711: * @nbufs = Number of Buffers
! 712: * @offset = Offset from start of file, if =-1 from current position
! 713: * return: NULL error or !=NULL new queued task
! 714: */
! 715: sched_task_t *
! 716: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
! 717: struct iovec *bufs, size_t nbufs, off_t offset)
! 718: {
! 719: struct sigevent sig;
! 720: struct aiocb **acb;
! 721: off_t off;
! 722: register int i;
! 723:
! 724: if (!root || !func || !bufs || !nbufs)
! 725: return NULL;
! 726:
! 727: if (offset == (off_t) -1) {
! 728: off = lseek(fd, 0, SEEK_CUR);
! 729: if (off == -1) {
! 730: LOGERR;
! 731: return NULL;
! 732: }
! 733: } else
! 734: off = offset;
! 735:
! 736: if (!(acb = calloc(sizeof(void*), nbufs))) {
! 737: LOGERR;
! 738: return NULL;
! 739: } else
! 740: memset(acb, 0, sizeof(void*) * nbufs);
! 741: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
! 742: acb[i] = malloc(sizeof(struct aiocb));
! 743: if (!acb[i]) {
! 744: LOGERR;
! 745: for (i = 0; i < nbufs; i++)
! 746: if (acb[i])
! 747: free(acb[i]);
! 748: free(acb);
! 749: return NULL;
! 750: } else
! 751: memset(acb[i], 0, sizeof(struct aiocb));
! 752: acb[i]->aio_fildes = fd;
! 753: acb[i]->aio_nbytes = bufs[i].iov_len;
! 754: acb[i]->aio_buf = bufs[i].iov_base;
! 755: acb[i]->aio_offset = off;
! 756: acb[i]->aio_lio_opcode = LIO_READ;
! 757: }
! 758: memset(&sig, 0, sizeof sig);
! 759: sig.sigev_notify = SIGEV_KEVENT;
! 760: sig.sigev_notify_kqueue = root->root_kq;
! 761: sig.sigev_value.sival_ptr = acb;
! 762:
! 763: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
! 764: LOGERR;
! 765: for (i = 0; i < nbufs; i++)
! 766: if (acb[i])
! 767: free(acb[i]);
! 768: free(acb);
! 769: return NULL;
! 770: }
! 771:
! 772: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
! 773: }
! 774:
! 775: /*
! 776: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
! 777: *
! 778: * @root = root task
! 779: * @func = task execution function
! 780: * @arg = 1st func argument
! 781: * @fd = file descriptor
! 782: * @bufs = Buffer's list
! 783: * @nbufs = Number of Buffers
! 784: * @offset = Offset from start of file, if =-1 from current position
! 785: * return: NULL error or !=NULL new queued task
! 786: */
! 787: inline sched_task_t *
! 788: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd,
! 789: struct iovec *bufs, size_t nbufs, off_t offset)
! 790: {
! 791: struct sigevent sig;
! 792: struct aiocb **acb;
! 793: off_t off;
! 794: register int i;
! 795:
! 796: if (!root || !func || !bufs || !nbufs)
! 797: return NULL;
! 798:
! 799: if (offset == (off_t) -1) {
! 800: off = lseek(fd, 0, SEEK_CUR);
! 801: if (off == -1) {
! 802: LOGERR;
! 803: return NULL;
! 804: }
! 805: } else
! 806: off = offset;
! 807:
! 808: if (!(acb = calloc(sizeof(void*), nbufs))) {
! 809: LOGERR;
! 810: return NULL;
! 811: } else
! 812: memset(acb, 0, sizeof(void*) * nbufs);
! 813: for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
! 814: acb[i] = malloc(sizeof(struct aiocb));
! 815: if (!acb[i]) {
! 816: LOGERR;
! 817: for (i = 0; i < nbufs; i++)
! 818: if (acb[i])
! 819: free(acb[i]);
! 820: free(acb);
! 821: return NULL;
! 822: } else
! 823: memset(acb[i], 0, sizeof(struct aiocb));
! 824: acb[i]->aio_fildes = fd;
! 825: acb[i]->aio_nbytes = bufs[i].iov_len;
! 826: acb[i]->aio_buf = bufs[i].iov_base;
! 827: acb[i]->aio_offset = off;
! 828: acb[i]->aio_lio_opcode = LIO_WRITE;
! 829: }
! 830: memset(&sig, 0, sizeof sig);
! 831: sig.sigev_notify = SIGEV_KEVENT;
! 832: sig.sigev_notify_kqueue = root->root_kq;
! 833: sig.sigev_value.sival_ptr = acb;
! 834:
! 835: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
! 836: LOGERR;
! 837: for (i = 0; i < nbufs; i++)
! 838: if (acb[i])
! 839: free(acb[i]);
! 840: free(acb);
! 841: return NULL;
! 842: }
! 843:
! 844: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
! 845: }
! 846: #endif /* EVFILT_LIO */
! 847: #endif /* AIO_SUPPORT */
! 848:
1.8 misho 849: /*
1.1 misho 850: * schedTimer() - Add TIMER task to scheduler queue
1.6 misho 851: *
1.1 misho 852: * @root = root task
853: * @func = task execution function
854: * @arg = 1st func argument
1.5 misho 855: * @ts = timeout argument structure
856: * @opt_data = Optional data
857: * @opt_dlen = Optional data length
1.1 misho 858: * return: NULL error or !=NULL new queued task
859: */
860: sched_task_t *
1.5 misho 861: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,
862: void *opt_data, size_t opt_dlen)
1.1 misho 863: {
1.9 misho 864: sched_task_t *task, *tmp, *t = NULL;
1.1 misho 865: void *ptr;
1.5 misho 866: struct timespec now;
1.1 misho 867:
868: if (!root || !func)
869: return NULL;
870:
871: /* get new task */
1.4 misho 872: if (!(task = _sched_useTask(root)))
873: return NULL;
1.1 misho 874:
875: task->task_func = func;
1.5 misho 876: TASK_TYPE(task) = taskTIMER;
877: TASK_ROOT(task) = root;
1.1 misho 878:
879: TASK_ARG(task) = arg;
880:
1.5 misho 881: TASK_DATA(task) = opt_data;
882: TASK_DATLEN(task) = opt_dlen;
883:
1.1 misho 884: /* calculate timeval structure */
1.5 misho 885: clock_gettime(CLOCK_MONOTONIC, &now);
886: now.tv_sec += ts.tv_sec;
887: now.tv_nsec += ts.tv_nsec;
888: if (now.tv_nsec >= 1000000000L) {
1.1 misho 889: now.tv_sec++;
1.5 misho 890: now.tv_nsec -= 1000000000L;
891: } else if (now.tv_nsec < 0) {
1.1 misho 892: now.tv_sec--;
1.5 misho 893: now.tv_nsec += 1000000000L;
1.1 misho 894: }
1.5 misho 895: TASK_TS(task) = now;
1.1 misho 896:
897: if (root->root_hooks.hook_add.timer)
898: ptr = root->root_hooks.hook_add.timer(task, NULL);
899: else
900: ptr = NULL;
901:
902: if (!ptr) {
1.5 misho 903: #ifdef HAVE_LIBPTHREAD
904: pthread_mutex_lock(&root->root_mtx[taskTIMER]);
905: #endif
1.1 misho 906: #ifdef TIMER_WITHOUT_SORT
1.9 misho 907: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 908: #else
1.9 misho 909: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
1.5 misho 910: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
1.1 misho 911: break;
912: if (!t)
1.9 misho 913: TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
1.1 misho 914: else
1.9 misho 915: TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
1.1 misho 916: #endif
1.5 misho 917: #ifdef HAVE_LIBPTHREAD
918: pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
919: #endif
1.4 misho 920: } else
921: task = _sched_unuseTask(task);
1.1 misho 922:
923: return task;
924: }
925:
926: /*
927: * schedEvent() - Add EVENT task to scheduler queue
1.6 misho 928: *
1.1 misho 929: * @root = root task
930: * @func = task execution function
931: * @arg = 1st func argument
1.2 misho 932: * @val = additional func argument
1.5 misho 933: * @opt_data = Optional data
934: * @opt_dlen = Optional data length
1.1 misho 935: * return: NULL error or !=NULL new queued task
936: */
937: sched_task_t *
1.5 misho 938: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
939: void *opt_data, size_t opt_dlen)
1.1 misho 940: {
941: sched_task_t *task;
942: void *ptr;
943:
944: if (!root || !func)
945: return NULL;
946:
947: /* get new task */
1.4 misho 948: if (!(task = _sched_useTask(root)))
949: return NULL;
1.1 misho 950:
951: task->task_func = func;
1.5 misho 952: TASK_TYPE(task) = taskEVENT;
953: TASK_ROOT(task) = root;
1.1 misho 954:
955: TASK_ARG(task) = arg;
1.2 misho 956: TASK_VAL(task) = val;
1.1 misho 957:
1.5 misho 958: TASK_DATA(task) = opt_data;
959: TASK_DATLEN(task) = opt_dlen;
960:
1.1 misho 961: if (root->root_hooks.hook_add.event)
962: ptr = root->root_hooks.hook_add.event(task, NULL);
963: else
964: ptr = NULL;
965:
1.5 misho 966: if (!ptr) {
967: #ifdef HAVE_LIBPTHREAD
968: pthread_mutex_lock(&root->root_mtx[taskEVENT]);
969: #endif
1.9 misho 970: TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
1.5 misho 971: #ifdef HAVE_LIBPTHREAD
972: pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
973: #endif
974: } else
1.4 misho 975: task = _sched_unuseTask(task);
1.1 misho 976:
977: return task;
978: }
979:
980:
981: /*
982: * schedEventLo() - Add EVENT_Lo task to scheduler queue
1.6 misho 983: *
1.1 misho 984: * @root = root task
985: * @func = task execution function
986: * @arg = 1st func argument
1.2 misho 987: * @val = additional func argument
1.5 misho 988: * @opt_data = Optional data
989: * @opt_dlen = Optional data length
1.1 misho 990: * return: NULL error or !=NULL new queued task
991: */
992: sched_task_t *
1.5 misho 993: schedEventLo(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
994: void *opt_data, size_t opt_dlen)
1.1 misho 995: {
996: sched_task_t *task;
997: void *ptr;
998:
999: if (!root || !func)
1000: return NULL;
1001:
1002: /* get new task */
1.4 misho 1003: if (!(task = _sched_useTask(root)))
1004: return NULL;
1.1 misho 1005:
1006: task->task_func = func;
1.5 misho 1007: TASK_TYPE(task) = taskEVENT;
1008: TASK_ROOT(task) = root;
1.1 misho 1009:
1010: TASK_ARG(task) = arg;
1.2 misho 1011: TASK_VAL(task) = val;
1.1 misho 1012:
1.5 misho 1013: TASK_DATA(task) = opt_data;
1014: TASK_DATLEN(task) = opt_dlen;
1015:
1.1 misho 1016: if (root->root_hooks.hook_add.eventlo)
1017: ptr = root->root_hooks.hook_add.eventlo(task, NULL);
1018: else
1019: ptr = NULL;
1020:
1.5 misho 1021: if (!ptr) {
1022: #ifdef HAVE_LIBPTHREAD
1023: pthread_mutex_lock(&root->root_mtx[taskEVENTLO]);
1024: #endif
1.9 misho 1025: TAILQ_INSERT_TAIL(&root->root_eventlo, TASK_ID(task), task_node);
1.5 misho 1026: #ifdef HAVE_LIBPTHREAD
1027: pthread_mutex_unlock(&root->root_mtx[taskEVENTLO]);
1028: #endif
1029: } else
1.4 misho 1030: task = _sched_unuseTask(task);
1.1 misho 1031:
1032: return task;
1033: }
1034:
1035: /*
1.10 misho 1036: * schedSuspend() - Add Suspended task to scheduler queue
1037: *
1038: * @root = root task
1039: * @func = task execution function
1040: * @arg = 1st func argument
1041: * @id = Trigger ID
1042: * @opt_data = Optional data
1043: * @opt_dlen = Optional data length
1044: * return: NULL error or !=NULL new queued task
1045: */
1046: sched_task_t *
1047: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,
1048: void *opt_data, size_t opt_dlen)
1049: {
1050: sched_task_t *task;
1051: void *ptr;
1052:
1053: if (!root || !func)
1054: return NULL;
1055:
1056: /* get new task */
1057: if (!(task = _sched_useTask(root)))
1058: return NULL;
1059:
1060: task->task_func = func;
1061: TASK_TYPE(task) = taskSUSPEND;
1062: TASK_ROOT(task) = root;
1063:
1064: TASK_ARG(task) = arg;
1065: TASK_VAL(task) = id;
1066:
1067: TASK_DATA(task) = opt_data;
1068: TASK_DATLEN(task) = opt_dlen;
1069:
1070: if (root->root_hooks.hook_add.suspend)
1071: ptr = root->root_hooks.hook_add.suspend(task, NULL);
1072: else
1073: ptr = NULL;
1074:
1075: if (!ptr) {
1076: #ifdef HAVE_LIBPTHREAD
1077: pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
1078: #endif
1079: TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
1080: #ifdef HAVE_LIBPTHREAD
1081: pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
1082: #endif
1083: } else
1084: task = _sched_unuseTask(task);
1085:
1086: return task;
1087: }
1088:
1089: /*
1.1 misho 1090: * schedCallOnce() - Call once from scheduler
1.6 misho 1091: *
1.1 misho 1092: * @root = root task
1093: * @func = task execution function
1094: * @arg = 1st func argument
1.2 misho 1095: * @val = additional func argument
1.5 misho 1096: * @opt_data = Optional data
1097: * @opt_dlen = Optional data length
1.2 misho 1098: * return: return value from called func
1.1 misho 1099: */
1100: sched_task_t *
1.5 misho 1101: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val,
1102: void *opt_data, size_t opt_dlen)
1.1 misho 1103: {
1104: sched_task_t *task;
1.2 misho 1105: void *ret;
1.1 misho 1106:
1107: if (!root || !func)
1108: return NULL;
1109:
1110: /* get new task */
1.4 misho 1111: if (!(task = _sched_useTask(root)))
1112: return NULL;
1.1 misho 1113:
1114: task->task_func = func;
1.5 misho 1115: TASK_TYPE(task) = taskEVENT;
1116: TASK_ROOT(task) = root;
1.1 misho 1117:
1118: TASK_ARG(task) = arg;
1.2 misho 1119: TASK_VAL(task) = val;
1.1 misho 1120:
1.5 misho 1121: TASK_DATA(task) = opt_data;
1122: TASK_DATLEN(task) = opt_dlen;
1123:
1.2 misho 1124: ret = schedCall(task);
1.1 misho 1125:
1.4 misho 1126: _sched_unuseTask(task);
1.2 misho 1127: return ret;
1.1 misho 1128: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>