![]() ![]() | ![]() |
epoll support fixed; select/epoll and kqueue back-ends supported
1: /************************************************************************* 2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com> 3: * by Michael Pounov <misho@openbsd-bg.org> 4: * 5: * $Author: misho $ 6: * $Id: tasks.c,v 1.24.2.4 2014/06/03 20:39:54 misho Exp $ 7: * 8: ************************************************************************** 9: The ELWIX and AITNET software is distributed under the following 10: terms: 11: 12: All of the documentation and software included in the ELWIX and AITNET 13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org> 14: 15: Copyright 2004 - 2014 16: by Michael Pounov <misho@elwix.org>. All rights reserved. 17: 18: Redistribution and use in source and binary forms, with or without 19: modification, are permitted provided that the following conditions 20: are met: 21: 1. Redistributions of source code must retain the above copyright 22: notice, this list of conditions and the following disclaimer. 23: 2. Redistributions in binary form must reproduce the above copyright 24: notice, this list of conditions and the following disclaimer in the 25: documentation and/or other materials provided with the distribution. 26: 3. All advertising materials mentioning features or use of this software 27: must display the following acknowledgement: 28: This product includes software developed by Michael Pounov <misho@elwix.org> 29: ELWIX - Embedded LightWeight unIX and its contributors. 30: 4. Neither the name of AITNET nor the names of its contributors 31: may be used to endorse or promote products derived from this software 32: without specific prior written permission. 33: 34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND 35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 37: ARE DISCLAIMED. 
IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE 38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 44: SUCH DAMAGE. 45: */ 46: #include "global.h" 47: 48: 49: /* 50: * sched_useTask() - Get and init new task 51: * 52: * @root = root task 53: * return: NULL error or !=NULL prepared task 54: */ 55: sched_task_t * 56: sched_useTask(sched_root_task_t * __restrict root) 57: { 58: sched_task_t *task, *tmp; 59: 60: SCHED_QLOCK(root, taskUNUSE); 61: TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) { 62: if (!TASK_ISLOCKED(task)) { 63: TAILQ_REMOVE(&root->root_unuse, task, task_node); 64: break; 65: } 66: } 67: SCHED_QUNLOCK(root, taskUNUSE); 68: 69: if (!task) { 70: task = malloc(sizeof(sched_task_t)); 71: if (!task) { 72: LOGERR; 73: return NULL; 74: } 75: } 76: 77: memset(task, 0, sizeof(sched_task_t)); 78: task->task_id = (uintptr_t) task; 79: return task; 80: } 81: 82: /* 83: * sched_unuseTask() - Unlock and put task to unuse queue 84: * 85: * @task = task 86: * return: always is NULL 87: */ 88: sched_task_t * 89: sched_unuseTask(sched_task_t * __restrict task) 90: { 91: TASK_UNLOCK(task); 92: 93: TASK_TYPE(task) = taskUNUSE; 94: insert_task_to(task, &(TASK_ROOT(task))->root_unuse); 95: 96: task = NULL; 97: return task; 98: } 99: 100: #pragma GCC visibility push(hidden) 101: 102: #ifdef HAVE_LIBPTHREAD 103: void * 104: _sched_threadWrapper(sched_task_t *t) 105: { 106: void *ret = NULL; 107: sched_root_task_t *r; 108: 109: if (!t || !TASK_ROOT(t)) 110: pthread_exit(ret); 111: else 112: r = (sched_root_task_t*) TASK_ROOT(t); 113: 114: 
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); 115: /* 116: pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); 117: */ 118: 119: /* notify parent, thread is ready for execution */ 120: pthread_testcancel(); 121: 122: ret = schedCall(t); 123: r->root_ret = ret; 124: 125: if (TASK_VAL(t)) { 126: transit_task2unuse(t, &r->root_thread); 127: TASK_VAL(t) = 0; 128: } 129: 130: pthread_exit(ret); 131: } 132: #endif 133: 134: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_DELETE) 135: void * 136: _sched_rtcWrapper(sched_task_t *t) 137: { 138: sched_task_t *task; 139: void *ret; 140: 141: if (!t || !TASK_ROOT(t) || !TASK_DATA(t)) 142: return NULL; 143: else { 144: task = (sched_task_t*) TASK_DATA(t); 145: timer_delete((timer_t) TASK_DATLEN(t)); 146: } 147: 148: ret = schedCall(task); 149: 150: transit_task2unuse(task, &(TASK_ROOT(task))->root_rtc); 151: return ret; 152: } 153: #endif 154: 155: #pragma GCC visibility pop 156: 157: /* 158: * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks 159: * 160: * @task = current task 161: * @retcode = return code 162: * return: return code 163: */ 164: void * 165: sched_taskExit(sched_task_t *task, intptr_t retcode) 166: { 167: if (!task || !TASK_ROOT(task)) 168: return (void*) -1; 169: 170: if (TASK_ROOT(task)->root_hooks.hook_exec.exit) 171: TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode); 172: 173: TASK_ROOT(task)->root_ret = (void*) retcode; 174: return (void*) retcode; 175: } 176: 177: 178: /* 179: * schedRead() - Add READ I/O task to scheduler queue 180: * 181: * @root = root task 182: * @func = task execution function 183: * @arg = 1st func argument 184: * @fd = fd handle 185: * @opt_data = Optional data 186: * @opt_dlen = Optional data length 187: * return: NULL error or !=NULL new queued task 188: */ 189: sched_task_t * 190: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 191: void 
*opt_data, size_t opt_dlen) 192: { 193: sched_task_t *task; 194: void *ptr; 195: 196: if (!root || !func) 197: return NULL; 198: 199: /* get new task */ 200: if (!(task = sched_useTask(root))) 201: return NULL; 202: 203: TASK_FUNC(task) = func; 204: TASK_TYPE(task) = taskREAD; 205: TASK_ROOT(task) = root; 206: 207: TASK_ARG(task) = arg; 208: TASK_FD(task) = fd; 209: 210: TASK_DATA(task) = opt_data; 211: TASK_DATLEN(task) = opt_dlen; 212: 213: if (root->root_hooks.hook_add.read) 214: ptr = root->root_hooks.hook_add.read(task, NULL); 215: else 216: ptr = NULL; 217: 218: if (!ptr) 219: insert_task_to(task, &root->root_read); 220: else 221: task = sched_unuseTask(task); 222: 223: return task; 224: } 225: 226: /* 227: * schedWrite() - Add WRITE I/O task to scheduler queue 228: * 229: * @root = root task 230: * @func = task execution function 231: * @arg = 1st func argument 232: * @fd = fd handle 233: * @opt_data = Optional data 234: * @opt_dlen = Optional data length 235: * return: NULL error or !=NULL new queued task 236: */ 237: sched_task_t * 238: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 239: void *opt_data, size_t opt_dlen) 240: { 241: sched_task_t *task; 242: void *ptr; 243: 244: if (!root || !func) 245: return NULL; 246: 247: /* get new task */ 248: if (!(task = sched_useTask(root))) 249: return NULL; 250: 251: TASK_FUNC(task) = func; 252: TASK_TYPE(task) = taskWRITE; 253: TASK_ROOT(task) = root; 254: 255: TASK_ARG(task) = arg; 256: TASK_FD(task) = fd; 257: 258: TASK_DATA(task) = opt_data; 259: TASK_DATLEN(task) = opt_dlen; 260: 261: if (root->root_hooks.hook_add.write) 262: ptr = root->root_hooks.hook_add.write(task, NULL); 263: else 264: ptr = NULL; 265: 266: if (!ptr) 267: insert_task_to(task, &root->root_write); 268: else 269: task = sched_unuseTask(task); 270: 271: return task; 272: } 273: 274: /* 275: * schedNode() - Add NODE task to scheduler queue 276: * 277: * @root = root task 278: * @func = task 
execution function 279: * @arg = 1st func argument 280: * @fd = fd handle 281: * @opt_data = Optional data 282: * @opt_dlen = Optional data length 283: * return: NULL error or !=NULL new queued task 284: */ 285: sched_task_t * 286: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 287: void *opt_data, size_t opt_dlen) 288: { 289: #if SUP_ENABLE != KQ_SUPPORT 290: sched_SetErr(ENOTSUP, "disabled kqueue support"); 291: return NULL; 292: #else 293: sched_task_t *task; 294: void *ptr; 295: 296: if (!root || !func) 297: return NULL; 298: 299: /* get new task */ 300: if (!(task = sched_useTask(root))) 301: return NULL; 302: 303: TASK_FUNC(task) = func; 304: TASK_TYPE(task) = taskNODE; 305: TASK_ROOT(task) = root; 306: 307: TASK_ARG(task) = arg; 308: TASK_FD(task) = fd; 309: 310: TASK_DATA(task) = opt_data; 311: TASK_DATLEN(task) = opt_dlen; 312: 313: if (root->root_hooks.hook_add.node) 314: ptr = root->root_hooks.hook_add.node(task, NULL); 315: else 316: ptr = NULL; 317: 318: if (!ptr) 319: insert_task_to(task, &root->root_node); 320: else 321: task = sched_unuseTask(task); 322: 323: return task; 324: #endif /* KQ_SUPPORT */ 325: } 326: 327: /* 328: * schedProc() - Add PROC task to scheduler queue 329: * 330: * @root = root task 331: * @func = task execution function 332: * @arg = 1st func argument 333: * @pid = PID 334: * @opt_data = Optional data 335: * @opt_dlen = Optional data length 336: * return: NULL error or !=NULL new queued task 337: */ 338: sched_task_t * 339: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid, 340: void *opt_data, size_t opt_dlen) 341: { 342: #if SUP_ENABLE != KQ_SUPPORT 343: sched_SetErr(ENOTSUP, "disabled kqueue support"); 344: return NULL; 345: #else 346: sched_task_t *task; 347: void *ptr; 348: 349: if (!root || !func) 350: return NULL; 351: 352: /* get new task */ 353: if (!(task = sched_useTask(root))) 354: return NULL; 355: 356: TASK_FUNC(task) = func; 
357: TASK_TYPE(task) = taskPROC; 358: TASK_ROOT(task) = root; 359: 360: TASK_ARG(task) = arg; 361: TASK_VAL(task) = pid; 362: 363: TASK_DATA(task) = opt_data; 364: TASK_DATLEN(task) = opt_dlen; 365: 366: if (root->root_hooks.hook_add.proc) 367: ptr = root->root_hooks.hook_add.proc(task, NULL); 368: else 369: ptr = NULL; 370: 371: if (!ptr) 372: insert_task_to(task, &root->root_proc); 373: else 374: task = sched_unuseTask(task); 375: 376: return task; 377: #endif /* KQ_SUPPORT */ 378: } 379: 380: /* 381: * schedUser() - Add trigger USER task to scheduler queue 382: * 383: * @root = root task 384: * @func = task execution function 385: * @arg = 1st func argument 386: * @id = Trigger ID 387: * @opt_data = Optional data 388: * @opt_dlen = Optional user's trigger flags 389: * return: NULL error or !=NULL new queued task 390: */ 391: sched_task_t * 392: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id, 393: void *opt_data, size_t opt_dlen) 394: { 395: #if SUP_ENABLE != KQ_SUPPORT 396: sched_SetErr(ENOTSUP, "disabled kqueue support"); 397: return NULL; 398: #else 399: #ifndef EVFILT_USER 400: sched_SetErr(ENOTSUP, "Not supported kevent() filter"); 401: return NULL; 402: #else 403: sched_task_t *task; 404: void *ptr; 405: 406: if (!root || !func) 407: return NULL; 408: 409: /* get new task */ 410: if (!(task = sched_useTask(root))) 411: return NULL; 412: 413: TASK_FUNC(task) = func; 414: TASK_TYPE(task) = taskUSER; 415: TASK_ROOT(task) = root; 416: 417: TASK_ARG(task) = arg; 418: TASK_VAL(task) = id; 419: 420: TASK_DATA(task) = opt_data; 421: TASK_DATLEN(task) = opt_dlen; 422: 423: if (root->root_hooks.hook_add.user) 424: ptr = root->root_hooks.hook_add.user(task, NULL); 425: else 426: ptr = NULL; 427: 428: if (!ptr) 429: insert_task_to(task, &root->root_user); 430: else 431: task = sched_unuseTask(task); 432: 433: return task; 434: #endif /* EVFILT_USER */ 435: #endif /* KQ_SUPPORT */ 436: } 437: 438: /* 439: * schedSignal() - 
Add SIGNAL task to scheduler queue 440: * 441: * @root = root task 442: * @func = task execution function 443: * @arg = 1st func argument 444: * @sig = Signal 445: * @opt_data = Optional data 446: * @opt_dlen = Optional data length 447: * return: NULL error or !=NULL new queued task 448: */ 449: sched_task_t * 450: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig, 451: void *opt_data, size_t opt_dlen) 452: { 453: #if SUP_ENABLE != KQ_SUPPORT 454: sched_SetErr(ENOTSUP, "disabled kqueue support"); 455: return NULL; 456: #else 457: sched_task_t *task; 458: void *ptr; 459: 460: if (!root || !func) 461: return NULL; 462: 463: /* get new task */ 464: if (!(task = sched_useTask(root))) 465: return NULL; 466: 467: TASK_FUNC(task) = func; 468: TASK_TYPE(task) = taskSIGNAL; 469: TASK_ROOT(task) = root; 470: 471: TASK_ARG(task) = arg; 472: TASK_VAL(task) = sig; 473: 474: TASK_DATA(task) = opt_data; 475: TASK_DATLEN(task) = opt_dlen; 476: 477: if (root->root_hooks.hook_add.signal) 478: ptr = root->root_hooks.hook_add.signal(task, NULL); 479: else 480: ptr = NULL; 481: 482: if (!ptr) 483: insert_task_to(task, &root->root_signal); 484: else 485: task = sched_unuseTask(task); 486: 487: return task; 488: #endif /* KQ_SUPPORT */ 489: } 490: 491: /* 492: * schedAlarm() - Add ALARM task to scheduler queue 493: * 494: * @root = root task 495: * @func = task execution function 496: * @arg = 1st func argument 497: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec! 
498: * @opt_data = Alarm timer ID 499: * @opt_dlen = Optional data length 500: * return: NULL error or !=NULL new queued task 501: */ 502: sched_task_t * 503: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 504: void *opt_data, size_t opt_dlen) 505: { 506: #if SUP_ENABLE != KQ_SUPPORT 507: sched_SetErr(ENOTSUP, "disabled kqueue support"); 508: return NULL; 509: #else 510: sched_task_t *task; 511: void *ptr; 512: 513: if (!root || !func) 514: return NULL; 515: 516: /* get new task */ 517: if (!(task = sched_useTask(root))) 518: return NULL; 519: 520: TASK_FUNC(task) = func; 521: TASK_TYPE(task) = taskALARM; 522: TASK_ROOT(task) = root; 523: 524: TASK_ARG(task) = arg; 525: TASK_TS(task) = ts; 526: 527: TASK_DATA(task) = opt_data; 528: TASK_DATLEN(task) = opt_dlen; 529: 530: if (root->root_hooks.hook_add.alarm) 531: ptr = root->root_hooks.hook_add.alarm(task, NULL); 532: else 533: ptr = NULL; 534: 535: if (!ptr) 536: insert_task_to(task, &root->root_alarm); 537: else 538: task = sched_unuseTask(task); 539: 540: return task; 541: #endif /* KQ_SUPPORT */ 542: } 543: 544: #ifdef AIO_SUPPORT 545: /* 546: * schedAIO() - Add AIO task to scheduler queue 547: * 548: * @root = root task 549: * @func = task execution function 550: * @arg = 1st func argument 551: * @acb = AIO cb structure address 552: * @opt_data = Optional data 553: * @opt_dlen = Optional data length 554: * return: NULL error or !=NULL new queued task 555: */ 556: sched_task_t * 557: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, 558: struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen) 559: { 560: #if SUP_ENABLE != KQ_SUPPORT 561: sched_SetErr(ENOTSUP, "disabled kqueue support"); 562: return NULL; 563: #else 564: sched_task_t *task; 565: void *ptr; 566: 567: if (!root || !func || !acb || !opt_dlen) 568: return NULL; 569: 570: /* get new task */ 571: if (!(task = sched_useTask(root))) 572: return NULL; 573: 574: 
TASK_FUNC(task) = func; 575: TASK_TYPE(task) = taskAIO; 576: TASK_ROOT(task) = root; 577: 578: TASK_ARG(task) = arg; 579: TASK_VAL(task) = (u_long) acb; 580: 581: TASK_DATA(task) = opt_data; 582: TASK_DATLEN(task) = opt_dlen; 583: 584: if (root->root_hooks.hook_add.aio) 585: ptr = root->root_hooks.hook_add.aio(task, NULL); 586: else 587: ptr = NULL; 588: 589: if (!ptr) 590: insert_task_to(task, &root->root_aio); 591: else 592: task = sched_unuseTask(task); 593: 594: return task; 595: #endif /* KQ_SUPPORT */ 596: } 597: 598: /* 599: * schedAIORead() - Add AIO read task to scheduler queue 600: * 601: * @root = root task 602: * @func = task execution function 603: * @arg = 1st func argument 604: * @fd = file descriptor 605: * @buffer = Buffer 606: * @buflen = Buffer length 607: * @offset = Offset from start of file, if =-1 from current position 608: * return: NULL error or !=NULL new queued task 609: */ 610: sched_task_t * 611: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 612: void *buffer, size_t buflen, off_t offset) 613: { 614: #if SUP_ENABLE != KQ_SUPPORT 615: sched_SetErr(ENOTSUP, "disabled kqueue support"); 616: return NULL; 617: #else 618: struct aiocb *acb; 619: off_t off; 620: 621: if (!root || !func || !buffer || !buflen) 622: return NULL; 623: 624: if (offset == (off_t) -1) { 625: off = lseek(fd, 0, SEEK_CUR); 626: if (off == -1) { 627: LOGERR; 628: return NULL; 629: } 630: } else 631: off = offset; 632: 633: if (!(acb = malloc(sizeof(struct aiocb)))) { 634: LOGERR; 635: return NULL; 636: } else 637: memset(acb, 0, sizeof(struct aiocb)); 638: 639: acb->aio_fildes = fd; 640: acb->aio_nbytes = buflen; 641: acb->aio_buf = buffer; 642: acb->aio_offset = off; 643: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT; 644: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq; 645: acb->aio_sigevent.sigev_value.sival_ptr = acb; 646: 647: if (aio_read(acb)) { 648: LOGERR; 649: free(acb); 650: return NULL; 651: } 652: 653: 
return schedAIO(root, func, arg, acb, buffer, buflen); 654: #endif /* KQ_SUPPORT */ 655: } 656: 657: /* 658: * schedAIOWrite() - Add AIO write task to scheduler queue 659: * 660: * @root = root task 661: * @func = task execution function 662: * @arg = 1st func argument 663: * @fd = file descriptor 664: * @buffer = Buffer 665: * @buflen = Buffer length 666: * @offset = Offset from start of file, if =-1 from current position 667: * return: NULL error or !=NULL new queued task 668: */ 669: sched_task_t * 670: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 671: void *buffer, size_t buflen, off_t offset) 672: { 673: #if SUP_ENABLE != KQ_SUPPORT 674: sched_SetErr(ENOTSUP, "disabled kqueue support"); 675: return NULL; 676: #else 677: struct aiocb *acb; 678: off_t off; 679: 680: if (!root || !func || !buffer || !buflen) 681: return NULL; 682: 683: if (offset == (off_t) -1) { 684: off = lseek(fd, 0, SEEK_CUR); 685: if (off == -1) { 686: LOGERR; 687: return NULL; 688: } 689: } else 690: off = offset; 691: 692: if (!(acb = malloc(sizeof(struct aiocb)))) { 693: LOGERR; 694: return NULL; 695: } else 696: memset(acb, 0, sizeof(struct aiocb)); 697: 698: acb->aio_fildes = fd; 699: acb->aio_nbytes = buflen; 700: acb->aio_buf = buffer; 701: acb->aio_offset = off; 702: acb->aio_sigevent.sigev_notify = SIGEV_KEVENT; 703: acb->aio_sigevent.sigev_notify_kqueue = root->root_kq; 704: acb->aio_sigevent.sigev_value.sival_ptr = acb; 705: 706: if (aio_write(acb)) { 707: LOGERR; 708: free(acb); 709: return NULL; 710: } 711: 712: return schedAIO(root, func, arg, acb, buffer, buflen); 713: #endif /* KQ_SUPPORT */ 714: } 715: 716: #ifdef EVFILT_LIO 717: /* 718: * schedLIO() - Add AIO bulk tasks to scheduler queue 719: * 720: * @root = root task 721: * @func = task execution function 722: * @arg = 1st func argument 723: * @acbs = AIO cb structure addresses 724: * @opt_data = Optional data 725: * @opt_dlen = Optional data length 726: * return: NULL 
error or !=NULL new queued task 727: */ 728: sched_task_t * 729: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, 730: struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen) 731: { 732: #if SUP_ENABLE != KQ_SUPPORT 733: sched_SetErr(ENOTSUP, "disabled kqueue support"); 734: return NULL; 735: #else 736: sched_task_t *task; 737: void *ptr; 738: 739: if (!root || !func || !acbs || !opt_dlen) 740: return NULL; 741: 742: /* get new task */ 743: if (!(task = sched_useTask(root))) 744: return NULL; 745: 746: TASK_FUNC(task) = func; 747: TASK_TYPE(task) = taskLIO; 748: TASK_ROOT(task) = root; 749: 750: TASK_ARG(task) = arg; 751: TASK_VAL(task) = (u_long) acbs; 752: 753: TASK_DATA(task) = opt_data; 754: TASK_DATLEN(task) = opt_dlen; 755: 756: if (root->root_hooks.hook_add.lio) 757: ptr = root->root_hooks.hook_add.lio(task, NULL); 758: else 759: ptr = NULL; 760: 761: if (!ptr) 762: insert_task_to(task, &root->root_lio); 763: else 764: task = sched_unuseTask(task); 765: 766: return task; 767: #endif /* KQ_SUPPORT */ 768: } 769: 770: /* 771: * schedLIORead() - Add list of AIO read tasks to scheduler queue 772: * 773: * @root = root task 774: * @func = task execution function 775: * @arg = 1st func argument 776: * @fd = file descriptor 777: * @bufs = Buffer's list 778: * @nbufs = Number of Buffers 779: * @offset = Offset from start of file, if =-1 from current position 780: * return: NULL error or !=NULL new queued task 781: */ 782: sched_task_t * 783: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 784: struct iovec *bufs, size_t nbufs, off_t offset) 785: { 786: #if SUP_ENABLE != KQ_SUPPORT 787: sched_SetErr(ENOTSUP, "disabled kqueue support"); 788: return NULL; 789: #else 790: struct sigevent sig; 791: struct aiocb **acb; 792: off_t off; 793: register int i; 794: 795: if (!root || !func || !bufs || !nbufs) 796: return NULL; 797: 798: if (offset == (off_t) -1) { 799: off = lseek(fd, 0, 
SEEK_CUR); 800: if (off == -1) { 801: LOGERR; 802: return NULL; 803: } 804: } else 805: off = offset; 806: 807: if (!(acb = calloc(sizeof(void*), nbufs))) { 808: LOGERR; 809: return NULL; 810: } else 811: memset(acb, 0, sizeof(void*) * nbufs); 812: for (i = 0; i < nbufs; off += bufs[i++].iov_len) { 813: acb[i] = malloc(sizeof(struct aiocb)); 814: if (!acb[i]) { 815: LOGERR; 816: for (i = 0; i < nbufs; i++) 817: if (acb[i]) 818: free(acb[i]); 819: free(acb); 820: return NULL; 821: } else 822: memset(acb[i], 0, sizeof(struct aiocb)); 823: acb[i]->aio_fildes = fd; 824: acb[i]->aio_nbytes = bufs[i].iov_len; 825: acb[i]->aio_buf = bufs[i].iov_base; 826: acb[i]->aio_offset = off; 827: acb[i]->aio_lio_opcode = LIO_READ; 828: } 829: memset(&sig, 0, sizeof sig); 830: sig.sigev_notify = SIGEV_KEVENT; 831: sig.sigev_notify_kqueue = root->root_kq; 832: sig.sigev_value.sival_ptr = acb; 833: 834: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) { 835: LOGERR; 836: for (i = 0; i < nbufs; i++) 837: if (acb[i]) 838: free(acb[i]); 839: free(acb); 840: return NULL; 841: } 842: 843: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs); 844: #endif /* KQ_SUPPORT */ 845: } 846: 847: /* 848: * schedLIOWrite() - Add list of AIO write tasks to scheduler queue 849: * 850: * @root = root task 851: * @func = task execution function 852: * @arg = 1st func argument 853: * @fd = file descriptor 854: * @bufs = Buffer's list 855: * @nbufs = Number of Buffers 856: * @offset = Offset from start of file, if =-1 from current position 857: * return: NULL error or !=NULL new queued task 858: */ 859: sched_task_t * 860: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 861: struct iovec *bufs, size_t nbufs, off_t offset) 862: { 863: #if SUP_ENABLE != KQ_SUPPORT 864: sched_SetErr(ENOTSUP, "disabled kqueue support"); 865: return NULL; 866: #else 867: struct sigevent sig; 868: struct aiocb **acb; 869: off_t off; 870: register int i; 871: 872: if (!root || !func 
|| !bufs || !nbufs) 873: return NULL; 874: 875: if (offset == (off_t) -1) { 876: off = lseek(fd, 0, SEEK_CUR); 877: if (off == -1) { 878: LOGERR; 879: return NULL; 880: } 881: } else 882: off = offset; 883: 884: if (!(acb = calloc(sizeof(void*), nbufs))) { 885: LOGERR; 886: return NULL; 887: } else 888: memset(acb, 0, sizeof(void*) * nbufs); 889: for (i = 0; i < nbufs; off += bufs[i++].iov_len) { 890: acb[i] = malloc(sizeof(struct aiocb)); 891: if (!acb[i]) { 892: LOGERR; 893: for (i = 0; i < nbufs; i++) 894: if (acb[i]) 895: free(acb[i]); 896: free(acb); 897: return NULL; 898: } else 899: memset(acb[i], 0, sizeof(struct aiocb)); 900: acb[i]->aio_fildes = fd; 901: acb[i]->aio_nbytes = bufs[i].iov_len; 902: acb[i]->aio_buf = bufs[i].iov_base; 903: acb[i]->aio_offset = off; 904: acb[i]->aio_lio_opcode = LIO_WRITE; 905: } 906: memset(&sig, 0, sizeof sig); 907: sig.sigev_notify = SIGEV_KEVENT; 908: sig.sigev_notify_kqueue = root->root_kq; 909: sig.sigev_value.sival_ptr = acb; 910: 911: if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) { 912: LOGERR; 913: for (i = 0; i < nbufs; i++) 914: if (acb[i]) 915: free(acb[i]); 916: free(acb); 917: return NULL; 918: } 919: 920: return schedLIO(root, func, arg, (void*) acb, bufs, nbufs); 921: #endif /* KQ_SUPPORT */ 922: } 923: #endif /* EVFILT_LIO */ 924: #endif /* AIO_SUPPORT */ 925: 926: /* 927: * schedTimer() - Add TIMER task to scheduler queue 928: * 929: * @root = root task 930: * @func = task execution function 931: * @arg = 1st func argument 932: * @ts = timeout argument structure 933: * @opt_data = Optional data 934: * @opt_dlen = Optional data length 935: * return: NULL error or !=NULL new queued task 936: */ 937: sched_task_t * 938: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 939: void *opt_data, size_t opt_dlen) 940: { 941: sched_task_t *task, *tmp, *t = NULL; 942: void *ptr; 943: struct timespec now; 944: 945: if (!root || !func) 946: return NULL; 947: 948: /* get 
new task */ 949: if (!(task = sched_useTask(root))) 950: return NULL; 951: 952: TASK_FUNC(task) = func; 953: TASK_TYPE(task) = taskTIMER; 954: TASK_ROOT(task) = root; 955: 956: TASK_ARG(task) = arg; 957: 958: TASK_DATA(task) = opt_data; 959: TASK_DATLEN(task) = opt_dlen; 960: 961: /* calculate timeval structure */ 962: clock_gettime(CLOCK_MONOTONIC, &now); 963: now.tv_sec += ts.tv_sec; 964: now.tv_nsec += ts.tv_nsec; 965: if (now.tv_nsec >= 1000000000L) { 966: now.tv_sec++; 967: now.tv_nsec -= 1000000000L; 968: } else if (now.tv_nsec < 0) { 969: now.tv_sec--; 970: now.tv_nsec += 1000000000L; 971: } 972: TASK_TS(task) = now; 973: 974: if (root->root_hooks.hook_add.timer) 975: ptr = root->root_hooks.hook_add.timer(task, NULL); 976: else 977: ptr = NULL; 978: 979: if (!ptr) { 980: SCHED_QLOCK(root, taskTIMER); 981: #ifdef TIMER_WITHOUT_SORT 982: TAILQ_INSERT_TAIL(&root->root_timer, task, task_node); 983: #else 984: TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp) 985: if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1) 986: break; 987: if (!t) 988: TAILQ_INSERT_TAIL(&root->root_timer, task, task_node); 989: else 990: TAILQ_INSERT_BEFORE(t, task, task_node); 991: #endif 992: SCHED_QUNLOCK(root, taskTIMER); 993: } else 994: task = sched_unuseTask(task); 995: 996: return task; 997: } 998: 999: /* 1000: * schedEvent() - Add EVENT task to scheduler queue 1001: * 1002: * @root = root task 1003: * @func = task execution function 1004: * @arg = 1st func argument 1005: * @val = additional func argument 1006: * @opt_data = Optional data 1007: * @opt_dlen = Optional data length 1008: * return: NULL error or !=NULL new queued task 1009: */ 1010: sched_task_t * 1011: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val, 1012: void *opt_data, size_t opt_dlen) 1013: { 1014: sched_task_t *task; 1015: void *ptr; 1016: 1017: if (!root || !func) 1018: return NULL; 1019: 1020: /* get new task */ 1021: if (!(task = 
sched_useTask(root))) 1022: return NULL; 1023: 1024: TASK_FUNC(task) = func; 1025: TASK_TYPE(task) = taskEVENT; 1026: TASK_ROOT(task) = root; 1027: 1028: TASK_ARG(task) = arg; 1029: TASK_VAL(task) = val; 1030: 1031: TASK_DATA(task) = opt_data; 1032: TASK_DATLEN(task) = opt_dlen; 1033: 1034: if (root->root_hooks.hook_add.event) 1035: ptr = root->root_hooks.hook_add.event(task, NULL); 1036: else 1037: ptr = NULL; 1038: 1039: if (!ptr) 1040: insert_task_to(task, &root->root_event); 1041: else 1042: task = sched_unuseTask(task); 1043: 1044: return task; 1045: } 1046: 1047: 1048: /* 1049: * schedTask() - Add regular task to scheduler queue 1050: * 1051: * @root = root task 1052: * @func = task execution function 1053: * @arg = 1st func argument 1054: * @prio = regular task priority, 0 is hi priority for regular tasks 1055: * @opt_data = Optional data 1056: * @opt_dlen = Optional data length 1057: * return: NULL error or !=NULL new queued task 1058: */ 1059: sched_task_t * 1060: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio, 1061: void *opt_data, size_t opt_dlen) 1062: { 1063: sched_task_t *task, *tmp, *t = NULL; 1064: void *ptr; 1065: 1066: if (!root || !func) 1067: return NULL; 1068: 1069: /* get new task */ 1070: if (!(task = sched_useTask(root))) 1071: return NULL; 1072: 1073: TASK_FUNC(task) = func; 1074: TASK_TYPE(task) = taskTASK; 1075: TASK_ROOT(task) = root; 1076: 1077: TASK_ARG(task) = arg; 1078: TASK_VAL(task) = prio; 1079: 1080: TASK_DATA(task) = opt_data; 1081: TASK_DATLEN(task) = opt_dlen; 1082: 1083: if (root->root_hooks.hook_add.task) 1084: ptr = root->root_hooks.hook_add.task(task, NULL); 1085: else 1086: ptr = NULL; 1087: 1088: if (!ptr) { 1089: SCHED_QLOCK(root, taskTASK); 1090: TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp) 1091: if (TASK_VAL(task) < TASK_VAL(t)) 1092: break; 1093: if (!t) 1094: TAILQ_INSERT_TAIL(&root->root_task, task, task_node); 1095: else 1096: TAILQ_INSERT_BEFORE(t, task, 
task_node); 1097: SCHED_QUNLOCK(root, taskTASK); 1098: } else 1099: task = sched_unuseTask(task); 1100: 1101: return task; 1102: } 1103: 1104: /* 1105: * schedSuspend() - Add Suspended task to scheduler queue 1106: * 1107: * @root = root task 1108: * @func = task execution function 1109: * @arg = 1st func argument 1110: * @id = Trigger ID 1111: * @opt_data = Optional data 1112: * @opt_dlen = Optional data length 1113: * return: NULL error or !=NULL new queued task 1114: */ 1115: sched_task_t * 1116: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id, 1117: void *opt_data, size_t opt_dlen) 1118: { 1119: sched_task_t *task; 1120: void *ptr; 1121: 1122: if (!root || !func) 1123: return NULL; 1124: 1125: /* get new task */ 1126: if (!(task = sched_useTask(root))) 1127: return NULL; 1128: 1129: TASK_FUNC(task) = func; 1130: TASK_TYPE(task) = taskSUSPEND; 1131: TASK_ROOT(task) = root; 1132: 1133: TASK_ARG(task) = arg; 1134: TASK_VAL(task) = id; 1135: 1136: TASK_DATA(task) = opt_data; 1137: TASK_DATLEN(task) = opt_dlen; 1138: 1139: if (root->root_hooks.hook_add.suspend) 1140: ptr = root->root_hooks.hook_add.suspend(task, NULL); 1141: else 1142: ptr = NULL; 1143: 1144: if (!ptr) 1145: insert_task_to(task, &root->root_suspend); 1146: else 1147: task = sched_unuseTask(task); 1148: 1149: return task; 1150: } 1151: 1152: /* 1153: * schedCallOnce() - Call once from scheduler 1154: * 1155: * @root = root task 1156: * @func = task execution function 1157: * @arg = 1st func argument 1158: * @val = additional func argument 1159: * @opt_data = Optional data 1160: * @opt_dlen = Optional data length 1161: * return: return value from called func 1162: */ 1163: sched_task_t * 1164: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val, 1165: void *opt_data, size_t opt_dlen) 1166: { 1167: sched_task_t *task; 1168: void *ret; 1169: 1170: if (!root || !func) 1171: return NULL; 1172: 1173: /* get new task 
*/ 1174: if (!(task = sched_useTask(root))) 1175: return NULL; 1176: 1177: TASK_FUNC(task) = func; 1178: TASK_TYPE(task) = taskEVENT; 1179: TASK_ROOT(task) = root; 1180: 1181: TASK_ARG(task) = arg; 1182: TASK_VAL(task) = val; 1183: 1184: TASK_DATA(task) = opt_data; 1185: TASK_DATLEN(task) = opt_dlen; 1186: 1187: ret = schedCall(task); 1188: 1189: sched_unuseTask(task); 1190: return ret; 1191: } 1192: 1193: /* 1194: * schedThread() - Add thread task to scheduler queue 1195: * 1196: * @root = root task 1197: * @func = task execution function 1198: * @arg = 1st func argument 1199: * @ss = stack size 1200: * @opt_data = Optional data 1201: * @opt_dlen = Optional data length 1202: * return: NULL error or !=NULL new queued task 1203: */ 1204: sched_task_t * 1205: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, 1206: size_t ss, void *opt_data, size_t opt_dlen) 1207: { 1208: #ifndef HAVE_LIBPTHREAD 1209: sched_SetErr(ENOTSUP, "Not supported thread tasks"); 1210: return NULL; 1211: #endif 1212: sched_task_t *task; 1213: pthread_attr_t attr; 1214: void *ptr; 1215: 1216: if (!root || !func) 1217: return NULL; 1218: 1219: /* get new task */ 1220: if (!(task = sched_useTask(root))) 1221: return NULL; 1222: 1223: TASK_FUNC(task) = func; 1224: TASK_TYPE(task) = taskTHREAD; 1225: TASK_ROOT(task) = root; 1226: 1227: TASK_ARG(task) = arg; 1228: 1229: TASK_DATA(task) = opt_data; 1230: TASK_DATLEN(task) = opt_dlen; 1231: 1232: pthread_attr_init(&attr); 1233: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 1234: if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) { 1235: LOGERR; 1236: pthread_attr_destroy(&attr); 1237: return sched_unuseTask(task); 1238: } 1239: if ((errno = pthread_attr_getstacksize(&attr, &ss))) { 1240: LOGERR; 1241: pthread_attr_destroy(&attr); 1242: return sched_unuseTask(task); 1243: } else 1244: TASK_FLAG(task) = ss; 1245: 1246: #ifdef SCHED_RR 1247: pthread_attr_setschedpolicy(&attr, SCHED_RR); 1248: #else 
1249: pthread_attr_setschedpolicy(&attr, SCHED_OTHER); 1250: #endif 1251: 1252: if (root->root_hooks.hook_add.thread) 1253: ptr = root->root_hooks.hook_add.thread(task, &attr); 1254: else 1255: ptr = NULL; 1256: 1257: if (!ptr) 1258: insert_task_to(task, &root->root_thread); 1259: else 1260: task = sched_unuseTask(task); 1261: 1262: pthread_attr_destroy(&attr); 1263: return task; 1264: } 1265: 1266: /* 1267: * schedRTC() - Add RTC task to scheduler queue 1268: * 1269: * @root = root task 1270: * @func = task execution function 1271: * @arg = 1st func argument 1272: * @ts = timeout argument structure, minimum alarm timer resolution is 1msec! 1273: * @opt_data = Optional RTC ID 1274: * @opt_dlen = Optional data length 1275: * return: NULL error or !=NULL new queued task 1276: */ 1277: sched_task_t * 1278: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 1279: void *opt_data, size_t opt_dlen) 1280: { 1281: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_DELETE) 1282: sched_task_t *task; 1283: void *ptr; 1284: 1285: if (!root || !func) 1286: return NULL; 1287: 1288: /* get new task */ 1289: if (!(task = sched_useTask(root))) 1290: return NULL; 1291: 1292: TASK_FUNC(task) = func; 1293: TASK_TYPE(task) = taskRTC; 1294: TASK_ROOT(task) = root; 1295: 1296: TASK_ARG(task) = arg; 1297: TASK_TS(task) = ts; 1298: 1299: TASK_DATA(task) = opt_data; 1300: TASK_DATLEN(task) = opt_dlen; 1301: 1302: if (root->root_hooks.hook_add.rtc) 1303: ptr = root->root_hooks.hook_add.rtc(task, NULL); 1304: else 1305: ptr = NULL; 1306: 1307: if (!ptr) 1308: insert_task_to(task, &root->root_rtc); 1309: else 1310: task = sched_unuseTask(task); 1311: 1312: return task; 1313: #else 1314: sched_SetErr(ENOTSUP, "Not supported realtime clock extensions"); 1315: return NULL; 1316: #endif 1317: }