--- libaitsched/src/aitsched.c 2012/01/08 00:51:17 1.4 +++ libaitsched/src/aitsched.c 2012/01/08 02:52:29 1.4.2.2 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: aitsched.c,v 1.4 2012/01/08 00:51:17 misho Exp $ +* $Id: aitsched.c,v 1.4.2.2 2012/01/08 02:52:29 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -122,12 +122,30 @@ schedInit(void ** __restrict data, size_t datlen) { sched_root_task_t *root = NULL; int (*func)(sched_root_task_t *); +#ifdef HAVE_LIBPTHREAD + register int i; +#endif root = malloc(sizeof(sched_root_task_t)); if (!root) { LOGERR; } else { memset(root, 0, sizeof(sched_root_task_t)); + +#ifdef HAVE_LIBPTHREAD + for (i = 0; i < taskMAX; i++) + if (pthread_mutex_init(&root->root_mtx[i], NULL)) { + LOGERR; + while (i) + pthread_mutex_destroy(&root->root_mtx[--i]); + free(root); + return NULL; + } + + for (i = 0; i < taskMAX; i++) + pthread_mutex_lock(&root->root_mtx[i]); +#endif + TAILQ_INIT(&root->root_read); TAILQ_INIT(&root->root_write); TAILQ_INIT(&root->root_timer); @@ -136,6 +154,11 @@ schedInit(void ** __restrict data, size_t datlen) TAILQ_INIT(&root->root_ready); TAILQ_INIT(&root->root_unuse); +#ifdef HAVE_LIBPTHREAD + for (i = 0; i < taskMAX; i++) + pthread_mutex_unlock(&root->root_mtx[i]); +#endif + if (data && *data) { if (datlen) { root->root_data.iov_base = *data; @@ -163,10 +186,17 @@ int schedEnd(sched_root_task_t ** __restrict root) { sched_task_t *task; +#ifdef HAVE_LIBPTHREAD + register int i; +#endif if (!root || !*root) return -1; +#ifdef HAVE_LIBPTHREAD + for (i = 0; i < taskMAX; i++) + pthread_mutex_lock(&(*root)->root_mtx[i]); +#endif TAILQ_FOREACH(task, &(*root)->root_read, task_node) { schedCancel(task); } @@ -179,6 +209,9 @@ schedEnd(sched_root_task_t ** __restrict root) TAILQ_FOREACH(task, &(*root)->root_event, task_node) { schedCancel(task); } + TAILQ_FOREACH(task, &(*root)->root_eventlo, task_node) { + schedCancel(task); + } TAILQ_FOREACH(task, &(*root)->root_ready, task_node) { schedCancel(task); } @@ -187,10 +220,19 @@ schedEnd(sched_root_task_t ** __restrict root) TAILQ_REMOVE(&(*root)->root_unuse, task, task_node); free(task); } +#ifdef HAVE_LIBPTHREAD + for (i = 0; i < taskMAX; i++) + pthread_mutex_unlock(&(*root)->root_mtx[i]); +#endif if ((*root)->root_hooks.hook_root.fini) (*root)->root_hooks.hook_root.fini(*root, NULL); +#ifdef HAVE_LIBPTHREAD + for (i = 0; i < taskMAX; i++) + pthread_mutex_destroy(&(*root)->root_mtx[i]); +#endif + free(*root); *root = NULL; return 0; @@ -250,35 +292,45 @@ schedCancel(sched_task_t * __restrict task) { sched_queue_t *queue; - if (!task || !task->task_root) + if (!task || !TASK_ROOT(task)) return -1; - if (task->task_root->root_hooks.hook_exec.cancel) - if (task->task_root->root_hooks.hook_exec.cancel(task, NULL)) + if (TASK_ROOT(task)->root_hooks.hook_exec.cancel) + if (TASK_ROOT(task)->root_hooks.hook_exec.cancel(task, NULL)) return -1; - switch (task->task_type) { + switch (TASK_TYPE(task)) { case taskREAD: - queue = &task->task_root->root_read; + queue = &TASK_ROOT(task)->root_read; break; case taskWRITE: - queue = &task->task_root->root_write; + queue = &TASK_ROOT(task)->root_write; break; case taskTIMER: - queue = &task->task_root->root_timer; + queue = &TASK_ROOT(task)->root_timer; break; case taskEVENT: - queue = &task->task_root->root_event; + queue = &TASK_ROOT(task)->root_event; break; + case taskEVENTLO: + queue = &TASK_ROOT(task)->root_eventlo; + break; case taskREADY: - queue = &task->task_root->root_ready; + queue = &TASK_ROOT(task)->root_ready; break; default: queue = NULL; } - if (queue) + if (queue) { +#ifdef HAVE_LIBPTHREAD + pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[TASK_TYPE(task)]); +#endif TAILQ_REMOVE(queue, task, task_node); - if (task->task_type != taskUNUSE) +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[TASK_TYPE(task)]); +#endif + } + if (TASK_TYPE(task) != taskUNUSE) _sched_unuseTask(task); return 0; @@ -287,37 +339,63 @@ schedCancel(sched_task_t * __restrict task) /* * schedCancelby() - Cancel task from scheduler by criteria * @root = root task - * @queue = cancel from queue, if =NULL cancel same task from all queues + * @type = cancel from queue type, if =taskMAX cancel same task from all queues * @criteria = find task by criteria [CRITERIA_CALL|CRITERIA_ARG|CRITERIA_FD|CRITERIA_VAL|CRITERIA_TV] * @param = search parameter * @hook = custom cleanup hook function, may be NULL * return: -1 error, -2 error in sub-stage cancel execution, -3 error from custom hook or 0 ok */ int -schedCancelby(sched_root_task_t * __restrict root, sched_queue_t * __restrict queue, +schedCancelby(sched_root_task_t * __restrict root, sched_task_type_t type, u_char criteria, void *param, sched_hook_func_t hook) { sched_task_t *task; + sched_queue_t *queue; int flg = 0; if (!root) return -1; - if (!queue) { - if (schedCancelby(root, &root->root_read, criteria, param, hook)) + if (type == taskMAX) { + if (schedCancelby(root, taskREAD, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_write, criteria, param, hook)) + if (schedCancelby(root, taskWRITE, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_timer, criteria, param, hook)) + if (schedCancelby(root, taskTIMER, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_event, criteria, param, hook)) + if (schedCancelby(root, taskEVENT, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_ready, criteria, param, hook)) + if (schedCancelby(root, taskEVENTLO, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_read, criteria, param, hook)) + if (schedCancelby(root, taskREADY, criteria, param, hook)) return -2; return 0; } + switch (type) { + case taskREAD: + queue = &root->root_read; + break; + case taskWRITE: + queue = &root->root_write; + break; + case taskTIMER: + queue = &root->root_timer; + break; + case taskEVENT: + queue = &root->root_event; + break; + case taskEVENTLO: + queue = &root->root_eventlo; + break; + case taskREADY: + queue = &root->root_ready; + break; + default: + return 0; + } +#ifdef HAVE_LIBPTHREAD + pthread_mutex_lock(&root->root_mtx[type]); +#endif TAILQ_FOREACH(task, queue, task_node) if (criteria == CRITERIA_CALL) { if (task->task_func == (sched_task_func_t) param) { @@ -345,22 +423,34 @@ schedCancelby(sched_root_task_t * __restrict root, sch break; } } else { +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&root->root_mtx[type]); +#endif sched_SetErr(EINVAL, "Invalid parameter criteria %d", criteria); return -1; } +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&root->root_mtx[type]); +#endif if (!flg || !task) /* task not found */ return 0; - if (task->task_root->root_hooks.hook_exec.cancel) - if (task->task_root->root_hooks.hook_exec.cancel(task, NULL)) + if (TASK_ROOT(task)->root_hooks.hook_exec.cancel) + if (TASK_ROOT(task)->root_hooks.hook_exec.cancel(task, NULL)) return -1; if (hook) if (hook(task, NULL)) return -3; +#ifdef HAVE_LIBPTHREAD + pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[type]); +#endif TAILQ_REMOVE(queue, task, task_node); +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[type]); +#endif - if (task->task_type != taskUNUSE) + if (TASK_TYPE(task) != taskUNUSE) _sched_unuseTask(task); return 0; }