--- libaitsched/src/aitsched.c 2011/10/04 23:12:33 1.2.2.5 +++ libaitsched/src/aitsched.c 2012/01/24 15:30:55 1.4.2.8 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: aitsched.c,v 1.2.2.5 2011/10/04 23:12:33 misho Exp $ +* $Id: aitsched.c,v 1.4.2.8 2012/01/24 15:30:55 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -122,12 +122,33 @@ schedInit(void ** __restrict data, size_t datlen) { sched_root_task_t *root = NULL; int (*func)(sched_root_task_t *); +#ifdef HAVE_LIBPTHREAD + register int i; +#endif root = malloc(sizeof(sched_root_task_t)); if (!root) { LOGERR; } else { memset(root, 0, sizeof(sched_root_task_t)); + + /* INFINIT polling period by default */ + sched_timespecinf(&root->root_poll); + +#ifdef HAVE_LIBPTHREAD + for (i = 0; i < taskMAX; i++) + if (pthread_mutex_init(&root->root_mtx[i], NULL)) { + LOGERR; + while (i) + pthread_mutex_destroy(&root->root_mtx[--i]); + free(root); + return NULL; + } + + for (i = 0; i < taskMAX; i++) + pthread_mutex_lock(&root->root_mtx[i]); +#endif + TAILQ_INIT(&root->root_read); TAILQ_INIT(&root->root_write); TAILQ_INIT(&root->root_timer); @@ -136,11 +157,17 @@ schedInit(void ** __restrict data, size_t datlen) TAILQ_INIT(&root->root_ready); TAILQ_INIT(&root->root_unuse); +#ifdef HAVE_LIBPTHREAD + for (i = 0; i < taskMAX; i++) + pthread_mutex_unlock(&root->root_mtx[i]); +#endif + if (data && *data) { if (datlen) { root->root_data.iov_base = *data; root->root_data.iov_len = datlen; - } else { + } else { /* if datlen == 0, switch to callbacks init mode */ + /* little hack :) for correct initialization of scheduler */ func = (int(*)(sched_root_task_t*)) data; func(root); } @@ -162,6 +189,9 @@ int schedEnd(sched_root_task_t ** __restrict root) { sched_task_t *task; +#ifdef HAVE_LIBPTHREAD + register int i; +#endif if (!root || !*root) return -1; @@ -178,18 +208,32 @@ schedEnd(sched_root_task_t ** __restrict root) TAILQ_FOREACH(task, &(*root)->root_event, task_node) { schedCancel(task); } + TAILQ_FOREACH(task, &(*root)->root_eventlo, task_node) { + schedCancel(task); + } TAILQ_FOREACH(task, &(*root)->root_ready, task_node) { schedCancel(task); } +#ifdef HAVE_LIBPTHREAD + pthread_mutex_lock(&(*root)->root_mtx[taskUNUSE]); +#endif while ((task = TAILQ_FIRST(&(*root)->root_unuse))) { TAILQ_REMOVE(&(*root)->root_unuse, task, task_node); free(task); } +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&(*root)->root_mtx[taskUNUSE]); +#endif if ((*root)->root_hooks.hook_root.fini) (*root)->root_hooks.hook_root.fini(*root, NULL); +#ifdef HAVE_LIBPTHREAD + for (i = 0; i < taskMAX; i++) + pthread_mutex_destroy(&(*root)->root_mtx[i]); +#endif + free(*root); *root = NULL; return 0; @@ -203,11 +247,19 @@ schedEnd(sched_root_task_t ** __restrict root) inline void * schedCall(sched_task_t * __restrict task) { + void *ptr = (void*) -1; + if (!task) - return (void*) -1; + return ptr; + if (!TASK_ISLOCKED(task)) + TASK_LOCK(task); + task->task_id++; - return task->task_func(task); + ptr = task->task_func(task); + + TASK_UNLOCK(task); + return ptr; } /* @@ -241,38 +293,46 @@ schedCancel(sched_task_t * __restrict task) { sched_queue_t *queue; - if (!task || !task->task_root) + if (!task || !TASK_ROOT(task)) return -1; - if (task->task_root->root_hooks.hook_exec.cancel) - if (task->task_root->root_hooks.hook_exec.cancel(task, NULL)) + if (TASK_ROOT(task)->root_hooks.hook_exec.cancel) + if (TASK_ROOT(task)->root_hooks.hook_exec.cancel(task, NULL)) return -1; - switch (task->task_type) { + switch (TASK_TYPE(task)) { case taskREAD: - queue = &task->task_root->root_read; + queue = &TASK_ROOT(task)->root_read; break; case taskWRITE: - queue = &task->task_root->root_write; + queue = &TASK_ROOT(task)->root_write; break; case taskTIMER: - queue = &task->task_root->root_timer; + queue = &TASK_ROOT(task)->root_timer; break; case taskEVENT: - queue = &task->task_root->root_event; + queue = &TASK_ROOT(task)->root_event; break; + case taskEVENTLO: + queue = &TASK_ROOT(task)->root_eventlo; + break; case taskREADY: - queue = &task->task_root->root_ready; + queue = &TASK_ROOT(task)->root_ready; break; default: queue = NULL; } - if (queue) + if (queue) { +#ifdef HAVE_LIBPTHREAD + pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[TASK_TYPE(task)]); +#endif TAILQ_REMOVE(queue, task, task_node); - if (task->task_type != taskUNUSE) { - task->task_type = taskUNUSE; - TAILQ_INSERT_TAIL(&task->task_root->root_unuse, task, task_node); +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[TASK_TYPE(task)]); +#endif } + if (TASK_TYPE(task) != taskUNUSE) + _sched_unuseTask(task); return 0; } @@ -280,37 +340,63 @@ schedCancel(sched_task_t * __restrict task) /* * schedCancelby() - Cancel task from scheduler by criteria * @root = root task - * @queue = cancel from queue, if =NULL cancel same task from all queues + * @type = cancel from queue type, if =taskMAX cancel same task from all queues * @criteria = find task by criteria [CRITERIA_CALL|CRITERIA_ARG|CRITERIA_FD|CRITERIA_VAL|CRITERIA_TV] * @param = search parameter * @hook = custom cleanup hook function, may be NULL - * return: -1 error or 0 ok + * return: -1 error, -2 error in sub-stage cancel execution, -3 error from custom hook or 0 ok */ int -schedCancelby(sched_root_task_t * __restrict root, sched_queue_t * __restrict queue, +schedCancelby(sched_root_task_t * __restrict root, sched_task_type_t type, u_char criteria, void *param, sched_hook_func_t hook) { sched_task_t *task; + sched_queue_t *queue; int flg = 0; if (!root) return -1; - if (!queue) { - if (schedCancelby(root, &root->root_read, criteria, param, hook)) + if (type == taskMAX) { + if (schedCancelby(root, taskREAD, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_write, criteria, param, hook)) + if (schedCancelby(root, taskWRITE, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_timer, criteria, param, hook)) + if (schedCancelby(root, taskTIMER, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_event, criteria, param, hook)) + if (schedCancelby(root, taskEVENT, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_ready, criteria, param, hook)) + if (schedCancelby(root, taskEVENTLO, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_read, criteria, param, hook)) + if (schedCancelby(root, taskREADY, criteria, param, hook)) return -2; return 0; } + switch (type) { + case taskREAD: + queue = &root->root_read; + break; + case taskWRITE: + queue = &root->root_write; + break; + case taskTIMER: + queue = &root->root_timer; + break; + case taskEVENT: + queue = &root->root_event; + break; + case taskEVENTLO: + queue = &root->root_eventlo; + break; + case taskREADY: + queue = &root->root_ready; + break; + default: + return 0; + } +#ifdef HAVE_LIBPTHREAD + pthread_mutex_lock(&root->root_mtx[type]); +#endif TAILQ_FOREACH(task, queue, task_node) if (criteria == CRITERIA_CALL) { if (task->task_func == (sched_task_func_t) param) { @@ -333,30 +419,40 @@ schedCancelby(sched_root_task_t * __restrict root, sch break; } } else if (criteria == CRITERIA_TV) { - if (!timercmp(&TASK_TV(task), (struct timeval*) param, -)) { + if (!sched_timespeccmp(&TASK_TS(task), (struct timespec*) param, -)) { flg++; break; } } else { +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&root->root_mtx[type]); +#endif sched_SetErr(EINVAL, "Invalid parameter criteria %d", criteria); return -1; } +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&root->root_mtx[type]); +#endif if (!flg || !task) /* task not found */ return 0; - if (task->task_root->root_hooks.hook_exec.cancel) - if (task->task_root->root_hooks.hook_exec.cancel(task, NULL)) + if (TASK_ROOT(task)->root_hooks.hook_exec.cancel) + if (TASK_ROOT(task)->root_hooks.hook_exec.cancel(task, NULL)) return -1; if (hook) if (hook(task, NULL)) return -3; +#ifdef HAVE_LIBPTHREAD + pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[type]); +#endif TAILQ_REMOVE(queue, task, task_node); +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[type]); +#endif - if (task->task_type != taskUNUSE) { - task->task_type = taskUNUSE; - TAILQ_INSERT_TAIL(&task->task_root->root_unuse, task, task_node); - } + if (TASK_TYPE(task) != taskUNUSE) + _sched_unuseTask(task); return 0; } @@ -387,6 +483,31 @@ schedRun(sched_root_task_t * __restrict root, volatile while ((task = root->root_hooks.hook_exec.fetch(root, NULL))) schedCall(task); } + + return 0; +} + +/* + * schedPolling() - Polling timeout period if no timer task is present + * @root = root task + * @ts = timeout polling period, if ==NULL INFINIT timeout + * @tsold = old timeout polling if !=NULL + * return: -1 error or 0 ok + */ +inline int +schedPolling(sched_root_task_t * __restrict root, struct timespec * __restrict ts, + struct timespec * __restrict tsold) +{ + if (!root) + return -1; + + if (tsold) + *tsold = root->root_poll; + + if (!ts) + sched_timespecinf(&root->root_poll); + else + root->root_poll = *ts; return 0; }