--- libaitsched/src/aitsched.c 2011/10/04 23:12:33 1.2.2.5 +++ libaitsched/src/aitsched.c 2012/05/14 12:09:13 1.7 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: aitsched.c,v 1.2.2.5 2011/10/04 23:12:33 misho Exp $ +* $Id: aitsched.c,v 1.7 2012/05/14 12:09:13 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -12,7 +12,7 @@ terms: All of the documentation and software included in the ELWIX and AITNET Releases is copyrighted by ELWIX - Sofia/Bulgaria -Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011 +Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 by Michael Pounov . All rights reserved. Redistribution and use in source and binary forms, with or without @@ -86,14 +86,14 @@ sched_SetErr(int eno, char *estr, ...) /* * schedRegisterHooks() - Register IO handles and bind tasks to it + * * @root = root task * return: -1 error or 0 ok */ int schedRegisterHooks(sched_root_task_t * __restrict root) { - if (!root || (root->root_data.iov_base && root->root_data.iov_len)) - return -1; + assert(root); if (root->root_hooks.hook_root.fini) root->root_hooks.hook_root.fini(root, NULL); @@ -113,6 +113,7 @@ schedRegisterHooks(sched_root_task_t * __restrict root /* * schedInit() - Init scheduler + * * @data = optional data if !=NULL * @datlen = data len if data is set * return: allocated root task if ok or NULL error @@ -122,12 +123,33 @@ schedInit(void ** __restrict data, size_t datlen) { sched_root_task_t *root = NULL; int (*func)(sched_root_task_t *); +#ifdef HAVE_LIBPTHREAD + register int i; +#endif root = malloc(sizeof(sched_root_task_t)); if (!root) { LOGERR; } else { memset(root, 0, sizeof(sched_root_task_t)); + + /* INFINIT polling period by default */ + sched_timespecinf(&root->root_poll); + +#ifdef HAVE_LIBPTHREAD + for (i = 0; i < taskMAX; i++) + if (pthread_mutex_init(&root->root_mtx[i], NULL)) { + LOGERR; + while (i) + pthread_mutex_destroy(&root->root_mtx[--i]); + free(root); + return NULL; + } + + for (i = 0; i < taskMAX; i++) + pthread_mutex_lock(&root->root_mtx[i]); +#endif + TAILQ_INIT(&root->root_read); TAILQ_INIT(&root->root_write); TAILQ_INIT(&root->root_timer); @@ -136,11 +158,17 @@ schedInit(void ** __restrict data, size_t datlen) TAILQ_INIT(&root->root_ready); TAILQ_INIT(&root->root_unuse); +#ifdef HAVE_LIBPTHREAD + for (i = 0; i < taskMAX; i++) + pthread_mutex_unlock(&root->root_mtx[i]); +#endif + if (data && *data) { if (datlen) { root->root_data.iov_base = *data; root->root_data.iov_len = datlen; - } else { + } else { /* if datlen == 0, switch to callbacks init mode */ + /* little hack :) for correct initialization of scheduler */ func = (int(*)(sched_root_task_t*)) data; func(root); } @@ -155,41 +183,59 @@ schedInit(void ** __restrict data, size_t datlen) /* * schedEnd() - End scheduler & free all resources + * * @root = root task * return: -1 error or 0 ok */ int schedEnd(sched_root_task_t ** __restrict root) { - sched_task_t *task; + sched_task_t *task, *tmp; +#ifdef HAVE_LIBPTHREAD + register int i; +#endif if (!root || !*root) return -1; - TAILQ_FOREACH(task, &(*root)->root_read, task_node) { + TAILQ_FOREACH_SAFE(task, &(*root)->root_read, task_node, tmp) { schedCancel(task); } - TAILQ_FOREACH(task, &(*root)->root_write, task_node) { + TAILQ_FOREACH_SAFE(task, &(*root)->root_write, task_node, tmp) { schedCancel(task); } - TAILQ_FOREACH(task, &(*root)->root_timer, task_node) { + TAILQ_FOREACH_SAFE(task, &(*root)->root_timer, task_node, tmp) { schedCancel(task); } - TAILQ_FOREACH(task, &(*root)->root_event, task_node) { + TAILQ_FOREACH_SAFE(task, &(*root)->root_event, task_node, tmp) { schedCancel(task); } - TAILQ_FOREACH(task, &(*root)->root_ready, task_node) { + TAILQ_FOREACH_SAFE(task, &(*root)->root_eventlo, task_node, tmp) { schedCancel(task); } + TAILQ_FOREACH_SAFE(task, &(*root)->root_ready, task_node, tmp) { + schedCancel(task); + } +#ifdef HAVE_LIBPTHREAD + pthread_mutex_lock(&(*root)->root_mtx[taskUNUSE]); +#endif while ((task = TAILQ_FIRST(&(*root)->root_unuse))) { TAILQ_REMOVE(&(*root)->root_unuse, task, task_node); free(task); } +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&(*root)->root_mtx[taskUNUSE]); +#endif if ((*root)->root_hooks.hook_root.fini) (*root)->root_hooks.hook_root.fini(*root, NULL); +#ifdef HAVE_LIBPTHREAD + for (i = 0; i < taskMAX; i++) + pthread_mutex_destroy(&(*root)->root_mtx[i]); +#endif + free(*root); *root = NULL; return 0; @@ -197,21 +243,31 @@ schedEnd(sched_root_task_t ** __restrict root) /* * schedCall() - Call task execution function + * * @task = current task * return: !=NULL error or =NULL ok */ inline void * schedCall(sched_task_t * __restrict task) { + void *ptr = (void*) -1; + if (!task) - return (void*) -1; + return ptr; + if (!TASK_ISLOCKED(task)) + TASK_LOCK(task); + task->task_id++; - return task->task_func(task); + ptr = task->task_func(task); + + TASK_UNLOCK(task); + return ptr; } /* * schedFetch() - Fetch ready task + * * @root = root task * return: =NULL error or !=NULL ready task */ @@ -233,6 +289,7 @@ schedFetch(sched_root_task_t * __restrict root) /* * schedCancel() - Cancel task from scheduler + * * @task = task * return: -1 error or 0 ok */ @@ -241,76 +298,111 @@ schedCancel(sched_task_t * __restrict task) { sched_queue_t *queue; - if (!task || !task->task_root) + if (!task || !TASK_ROOT(task)) return -1; - if (task->task_root->root_hooks.hook_exec.cancel) - if (task->task_root->root_hooks.hook_exec.cancel(task, NULL)) + if (TASK_ROOT(task)->root_hooks.hook_exec.cancel) + if (TASK_ROOT(task)->root_hooks.hook_exec.cancel(task, NULL)) return -1; - switch (task->task_type) { + switch (TASK_TYPE(task)) { case taskREAD: - queue = &task->task_root->root_read; + queue = &TASK_ROOT(task)->root_read; break; case taskWRITE: - queue = &task->task_root->root_write; + queue = &TASK_ROOT(task)->root_write; break; case taskTIMER: - queue = &task->task_root->root_timer; + queue = &TASK_ROOT(task)->root_timer; break; case taskEVENT: - queue = &task->task_root->root_event; + queue = &TASK_ROOT(task)->root_event; break; + case taskEVENTLO: + queue = &TASK_ROOT(task)->root_eventlo; + break; case taskREADY: - queue = &task->task_root->root_ready; + queue = &TASK_ROOT(task)->root_ready; break; default: queue = NULL; } - if (queue) + if (queue) { +#ifdef HAVE_LIBPTHREAD + pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[TASK_TYPE(task)]); +#endif TAILQ_REMOVE(queue, task, task_node); - if (task->task_type != taskUNUSE) { - task->task_type = taskUNUSE; - TAILQ_INSERT_TAIL(&task->task_root->root_unuse, task, task_node); +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[TASK_TYPE(task)]); +#endif } + if (TASK_TYPE(task) != taskUNUSE) + _sched_unuseTask(task); return 0; } /* * schedCancelby() - Cancel task from scheduler by criteria + * * @root = root task - * @queue = cancel from queue, if =NULL cancel same task from all queues + * @type = cancel from queue type, if =taskMAX cancel same task from all queues * @criteria = find task by criteria [CRITERIA_CALL|CRITERIA_ARG|CRITERIA_FD|CRITERIA_VAL|CRITERIA_TV] * @param = search parameter * @hook = custom cleanup hook function, may be NULL - * return: -1 error or 0 ok + * return: -1 error, -2 error in sub-stage cancel execution, -3 error from custom hook or 0 ok */ int -schedCancelby(sched_root_task_t * __restrict root, sched_queue_t * __restrict queue, +schedCancelby(sched_root_task_t * __restrict root, sched_task_type_t type, u_char criteria, void *param, sched_hook_func_t hook) { sched_task_t *task; + sched_queue_t *queue; int flg = 0; if (!root) return -1; - if (!queue) { - if (schedCancelby(root, &root->root_read, criteria, param, hook)) + if (type == taskMAX) { + if (schedCancelby(root, taskREAD, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_write, criteria, param, hook)) + if (schedCancelby(root, taskWRITE, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_timer, criteria, param, hook)) + if (schedCancelby(root, taskTIMER, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_event, criteria, param, hook)) + if (schedCancelby(root, taskEVENT, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_ready, criteria, param, hook)) + if (schedCancelby(root, taskEVENTLO, criteria, param, hook)) return -2; - if (schedCancelby(root, &root->root_read, criteria, param, hook)) + if (schedCancelby(root, taskREADY, criteria, param, hook)) return -2; return 0; } + switch (type) { + case taskREAD: + queue = &root->root_read; + break; + case taskWRITE: + queue = &root->root_write; + break; + case taskTIMER: + queue = &root->root_timer; + break; + case taskEVENT: + queue = &root->root_event; + break; + case taskEVENTLO: + queue = &root->root_eventlo; + break; + case taskREADY: + queue = &root->root_ready; + break; + default: + return 0; + } +#ifdef HAVE_LIBPTHREAD + pthread_mutex_lock(&root->root_mtx[type]); +#endif TAILQ_FOREACH(task, queue, task_node) if (criteria == CRITERIA_CALL) { if (task->task_func == (sched_task_func_t) param) { @@ -333,41 +425,52 @@ schedCancelby(sched_root_task_t * __restrict root, sch break; } } else if (criteria == CRITERIA_TV) { - if (!timercmp(&TASK_TV(task), (struct timeval*) param, -)) { + if (!sched_timespeccmp(&TASK_TS(task), (struct timespec*) param, -)) { flg++; break; } } else { +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&root->root_mtx[type]); +#endif sched_SetErr(EINVAL, "Invalid parameter criteria %d", criteria); return -1; } +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&root->root_mtx[type]); +#endif if (!flg || !task) /* task not found */ return 0; - if (task->task_root->root_hooks.hook_exec.cancel) - if (task->task_root->root_hooks.hook_exec.cancel(task, NULL)) + if (TASK_ROOT(task)->root_hooks.hook_exec.cancel) + if (TASK_ROOT(task)->root_hooks.hook_exec.cancel(task, NULL)) return -1; if (hook) if (hook(task, NULL)) return -3; +#ifdef HAVE_LIBPTHREAD + pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[type]); +#endif TAILQ_REMOVE(queue, task, task_node); +#ifdef HAVE_LIBPTHREAD + pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[type]); +#endif - if (task->task_type != taskUNUSE) { - task->task_type = taskUNUSE; - TAILQ_INSERT_TAIL(&task->task_root->root_unuse, task, task_node); - } + if (TASK_TYPE(task) != taskUNUSE) + _sched_unuseTask(task); return 0; } /* * schedRun() - Scheduler *run loop* + * * @root = root task * @killState = kill condition variable, if !=0 stop scheduler loop * return: -1 error or 0 ok */ int -schedRun(sched_root_task_t * __restrict root, volatile intptr_t * __restrict killState) +schedRun(sched_root_task_t *root, volatile intptr_t * __restrict killState) { sched_task_t *task; @@ -377,16 +480,71 @@ schedRun(sched_root_task_t * __restrict root, volatile if (root->root_hooks.hook_exec.run) if (root->root_hooks.hook_exec.run(root, NULL)) return -1; - if (root->root_hooks.hook_exec.fetch) { - if (killState) - while (!*killState) { + + if (killState) { + if (root->root_hooks.hook_exec.condition) + /* condition scheduler loop */ + while (root && root->root_hooks.hook_exec.fetch && + root->root_hooks.hook_exec.condition && + root->root_hooks.hook_exec.condition(root, (void*) killState)) { if ((task = root->root_hooks.hook_exec.fetch(root, NULL))) schedCall(task); } else - while ((task = root->root_hooks.hook_exec.fetch(root, NULL))) + /* trigger scheduler loop */ + while (!*killState && root && root->root_hooks.hook_exec.fetch) { + if ((task = root->root_hooks.hook_exec.fetch(root, NULL))) + schedCall(task); + } + } else + /* infinite scheduler loop */ + while (root && root->root_hooks.hook_exec.fetch) + if ((task = root->root_hooks.hook_exec.fetch(root, NULL))) schedCall(task); - } + return 0; +} + +/* + * schedPolling() - Polling timeout period if no timer task is present + * + * @root = root task + * @ts = timeout polling period, if ==NULL INFINIT timeout + * @tsold = old timeout polling if !=NULL + * return: -1 error or 0 ok + */ +inline int +schedPolling(sched_root_task_t * __restrict root, struct timespec * __restrict ts, + struct timespec * __restrict tsold) +{ + if (!root) + return -1; + + if (tsold) + *tsold = root->root_poll; + + if (!ts) + sched_timespecinf(&root->root_poll); + else + root->root_poll = *ts; + + return 0; +} + +/* + * schedTermCondition() - Activate hook for scheduler condition kill + * + * @root = root task + * @condValue = condition value, kill schedRun() if condValue == killState + * return: -1 error ok 0 ok + */ +inline int +schedTermCondition(sched_root_task_t * __restrict root, intptr_t condValue) +{ + if (!root) + return -1; + + root->root_cond = condValue; + root->root_hooks.hook_exec.condition = sched_hook_condition; return 0; }