--- libaitsched/src/tasks.c 2014/06/03 20:39:54 1.24.2.4 +++ libaitsched/src/tasks.c 2022/11/29 20:15:18 1.30 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: tasks.c,v 1.24.2.4 2014/06/03 20:39:54 misho Exp $ +* $Id: tasks.c,v 1.30 2022/11/29 20:15:18 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -12,7 +12,7 @@ terms: All of the documentation and software included in the ELWIX and AITNET Releases is copyrighted by ELWIX - Sofia/Bulgaria -Copyright 2004 - 2014 +Copyright 2004 - 2022 by Michael Pounov . All rights reserved. Redistribution and use in source and binary forms, with or without @@ -67,7 +67,7 @@ sched_useTask(sched_root_task_t * __restrict root) SCHED_QUNLOCK(root, taskUNUSE); if (!task) { - task = malloc(sizeof(sched_task_t)); + task = e_malloc(sizeof(sched_task_t)); if (!task) { LOGERR; return NULL; @@ -97,63 +97,6 @@ sched_unuseTask(sched_task_t * __restrict task) return task; } -#pragma GCC visibility push(hidden) - -#ifdef HAVE_LIBPTHREAD -void * -_sched_threadWrapper(sched_task_t *t) -{ - void *ret = NULL; - sched_root_task_t *r; - - if (!t || !TASK_ROOT(t)) - pthread_exit(ret); - else - r = (sched_root_task_t*) TASK_ROOT(t); - - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - /* - pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); - */ - - /* notify parent, thread is ready for execution */ - pthread_testcancel(); - - ret = schedCall(t); - r->root_ret = ret; - - if (TASK_VAL(t)) { - transit_task2unuse(t, &r->root_thread); - TASK_VAL(t) = 0; - } - - pthread_exit(ret); -} -#endif - -#if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_DELETE) -void * -_sched_rtcWrapper(sched_task_t *t) -{ - sched_task_t *task; - void *ret; - - if (!t || !TASK_ROOT(t) || !TASK_DATA(t)) - return NULL; - else { - task = (sched_task_t*) TASK_DATA(t); - timer_delete((timer_t) TASK_DATLEN(t)); - } - - ret = schedCall(task); - - transit_task2unuse(task, &(TASK_ROOT(task))->root_rtc); - return ret; -} -#endif - -#pragma GCC visibility pop - /* * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks * @@ -190,6 +133,25 @@ sched_task_t * schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, void *opt_data, size_t opt_dlen) { + return schedReadExt(root, func, arg, fd, opt_data, opt_dlen, 0); +} + +/* + * schedReadExt() - Add READ I/O task to scheduler queue with custom event mask + * + * @root = root task + * @func = task execution function + * @arg = 1st func argument + * @fd = fd handle + * @opt_data = Optional data + * @opt_dlen = Optional data length + * @mask = Event mask + * return: NULL error or !=NULL new queued task + */ +sched_task_t * +schedReadExt(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, + void *opt_data, size_t opt_dlen, u_long mask) +{ sched_task_t *task; void *ptr; @@ -210,8 +172,11 @@ schedRead(sched_root_task_t * __restrict root, sched_t TASK_DATA(task) = opt_data; TASK_DATLEN(task) = opt_dlen; + TASK_HARG(task) = mask; + if (root->root_hooks.hook_add.read) - ptr = root->root_hooks.hook_add.read(task, NULL); + ptr = root->root_hooks.hook_add.read(task, + (void*) task->task_harg); else ptr = NULL; @@ -238,6 +203,25 @@ sched_task_t * schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, void *opt_data, size_t opt_dlen) { + return schedWriteExt(root, func, arg, fd, opt_data, opt_dlen, 0); +} + +/* + * schedWriteExt() - Add WRITE I/O task to scheduler queue with custom event mask + * + * @root = root task + * @func = task execution function + * @arg = 1st func argument + * @fd = fd handle + * @opt_data = Optional data + * @opt_dlen = Optional data length + * @mask = Event mask + * return: NULL error or !=NULL new queued task + */ +sched_task_t * +schedWriteExt(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, + void *opt_data, size_t opt_dlen, u_long mask) +{ sched_task_t *task; void *ptr; @@ -258,8 +242,11 @@ schedWrite(sched_root_task_t * __restrict root, sched_ TASK_DATA(task) = opt_data; TASK_DATLEN(task) = opt_dlen; + TASK_HARG(task) = mask; + if (root->root_hooks.hook_add.write) - ptr = root->root_hooks.hook_add.write(task, NULL); + ptr = root->root_hooks.hook_add.write(task, + (void*) task->task_harg); else ptr = NULL; @@ -325,6 +312,64 @@ schedNode(sched_root_task_t * __restrict root, sched_t } /* + * schedNode2() - Add NODE task with all events to scheduler queue + * + * @root = root task + * @func = task execution function + * @arg = 1st func argument + * @fd = fd handle + * @opt_data = Optional data + * @opt_dlen = Optional data length + * return: NULL error or !=NULL new queued task + */ +sched_task_t * +schedNode2(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, + void *opt_data, size_t opt_dlen) +{ +#if SUP_ENABLE != KQ_SUPPORT + sched_SetErr(ENOTSUP, "disabled kqueue support"); + return NULL; +#else + sched_task_t *task; + void *ptr; + + if (!root || !func) + return NULL; + + /* get new task */ + if (!(task = sched_useTask(root))) + return NULL; + + TASK_FUNC(task) = func; + TASK_TYPE(task) = taskNODE; + TASK_ROOT(task) = root; + + TASK_ARG(task) = arg; + TASK_FD(task) = fd; + + TASK_DATA(task) = opt_data; + TASK_DATLEN(task) = opt_dlen; + + if (root->root_hooks.hook_add.node) +#ifdef __FreeBSD__ + ptr = root->root_hooks.hook_add.node(task, + (void*) (NOTE_READ | NOTE_CLOSE_WRITE | NOTE_CLOSE | NOTE_OPEN)); +#else + ptr = root->root_hooks.hook_add.node(task, NULL); +#endif + else + ptr = NULL; + + if (!ptr) + insert_task_to(task, &root->root_node); + else + task = sched_unuseTask(task); + + return task; +#endif /* KQ_SUPPORT */ +} + +/* * schedProc() - Add PROC task to scheduler queue * * @root = root task @@ -630,7 +675,7 @@ schedAIORead(sched_root_task_t * __restrict root, sche } else off = offset; - if (!(acb = malloc(sizeof(struct aiocb)))) { + if (!(acb = e_malloc(sizeof(struct aiocb)))) { LOGERR; return NULL; } else @@ -646,7 +691,7 @@ schedAIORead(sched_root_task_t * __restrict root, sche if (aio_read(acb)) { LOGERR; - free(acb); + e_free(acb); return NULL; } @@ -689,7 +734,7 @@ schedAIOWrite(sched_root_task_t * __restrict root, sch } else off = offset; - if (!(acb = malloc(sizeof(struct aiocb)))) { + if (!(acb = e_malloc(sizeof(struct aiocb)))) { LOGERR; return NULL; } else @@ -705,7 +750,7 @@ schedAIOWrite(sched_root_task_t * __restrict root, sch if (aio_write(acb)) { LOGERR; - free(acb); + e_free(acb); return NULL; } @@ -804,19 +849,19 @@ schedLIORead(sched_root_task_t * __restrict root, sche } else off = offset; - if (!(acb = calloc(sizeof(void*), nbufs))) { + if (!(acb = e_calloc(sizeof(void*), nbufs))) { LOGERR; return NULL; } else memset(acb, 0, sizeof(void*) * nbufs); for (i = 0; i < nbufs; off += bufs[i++].iov_len) { - acb[i] = malloc(sizeof(struct aiocb)); + acb[i] = e_malloc(sizeof(struct aiocb)); if (!acb[i]) { LOGERR; for (i = 0; i < nbufs; i++) if (acb[i]) - free(acb[i]); - free(acb); + e_free(acb[i]); + e_free(acb); return NULL; } else memset(acb[i], 0, sizeof(struct aiocb)); @@ -835,8 +880,8 @@ schedLIORead(sched_root_task_t * __restrict root, sche LOGERR; for (i = 0; i < nbufs; i++) if (acb[i]) - free(acb[i]); - free(acb); + e_free(acb[i]); + e_free(acb); return NULL; } @@ -881,19 +926,19 @@ schedLIOWrite(sched_root_task_t * __restrict root, sch } else off = offset; - if (!(acb = calloc(sizeof(void*), nbufs))) { + if (!(acb = e_calloc(sizeof(void*), nbufs))) { LOGERR; return NULL; } else memset(acb, 0, sizeof(void*) * nbufs); for (i = 0; i < nbufs; off += bufs[i++].iov_len) { - acb[i] = malloc(sizeof(struct aiocb)); + acb[i] = e_malloc(sizeof(struct aiocb)); if (!acb[i]) { LOGERR; for (i = 0; i < nbufs; i++) if (acb[i]) - free(acb[i]); - free(acb); + e_free(acb[i]); + e_free(acb); return NULL; } else memset(acb[i], 0, sizeof(struct aiocb)); @@ -912,8 +957,8 @@ schedLIOWrite(sched_root_task_t * __restrict root, sch LOGERR; for (i = 0; i < nbufs; i++) if (acb[i]) - free(acb[i]); - free(acb); + e_free(acb[i]); + e_free(acb); return NULL; } @@ -1278,7 +1323,8 @@ sched_task_t * schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, void *opt_data, size_t opt_dlen) { -#if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_DELETE) +#if defined(HAVE_LIBRT) && defined(HAVE_TIMER_CREATE) && \ + defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_DELETE) sched_task_t *task; void *ptr;