File:  [ELWIX - Embedded LightWeight unIX -] / libaitsched / src / tasks.c
Revision 1.20: download - view: text, annotated - select for diffs - revision graph
Mon Aug 26 13:36:45 2013 UTC (10 years, 10 months ago) by misho
Branches: MAIN
CVS tags: sched4_2, SCHED4_1, HEAD
version 4.1

    1: /*************************************************************************
    2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
    3: *  by Michael Pounov <misho@openbsd-bg.org>
    4: *
    5: * $Author: misho $
    6: * $Id: tasks.c,v 1.20 2013/08/26 13:36:45 misho Exp $
    7: *
    8: **************************************************************************
    9: The ELWIX and AITNET software is distributed under the following
   10: terms:
   11: 
   12: All of the documentation and software included in the ELWIX and AITNET
   13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   14: 
   15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
   16: 	by Michael Pounov <misho@elwix.org>.  All rights reserved.
   17: 
   18: Redistribution and use in source and binary forms, with or without
   19: modification, are permitted provided that the following conditions
   20: are met:
   21: 1. Redistributions of source code must retain the above copyright
   22:    notice, this list of conditions and the following disclaimer.
   23: 2. Redistributions in binary form must reproduce the above copyright
   24:    notice, this list of conditions and the following disclaimer in the
   25:    documentation and/or other materials provided with the distribution.
   26: 3. All advertising materials mentioning features or use of this software
   27:    must display the following acknowledgement:
   28: This product includes software developed by Michael Pounov <misho@elwix.org>
   29: ELWIX - Embedded LightWeight unIX and its contributors.
   30: 4. Neither the name of AITNET nor the names of its contributors
   31:    may be used to endorse or promote products derived from this software
   32:    without specific prior written permission.
   33: 
   34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
   35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
   38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   44: SUCH DAMAGE.
   45: */
   46: #include "global.h"
   47: 
   48: 
   49: /*
   50:  * sched_useTask() - Get and init new task
   51:  *
   52:  * @root = root task
   53:  * return: NULL error or !=NULL prepared task
   54:  */
   55: sched_task_t *
   56: sched_useTask(sched_root_task_t * __restrict root)
   57: {
   58: 	sched_task_t *task, *tmp;
   59: 
   60: #ifdef HAVE_LIBPTHREAD
   61: 	pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
   62: #endif
   63: 	TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
   64: 		if (!TASK_ISLOCKED(task)) {
   65: 			TAILQ_REMOVE(&root->root_unuse, task, task_node);
   66: 			break;
   67: 		}
   68: 	}
   69: #ifdef HAVE_LIBPTHREAD
   70: 	pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
   71: #endif
   72: 
   73: 	if (!task) {
   74: 		task = malloc(sizeof(sched_task_t));
   75: 		if (!task) {
   76: 			LOGERR;
   77: 			return NULL;
   78: 		}
   79: 	}
   80: 
   81: 	memset(task, 0, sizeof(sched_task_t));
   82: 	task->task_id = (uintptr_t) task;
   83: 	return task;
   84: }
   85: 
   86: /*
   87:  * sched_unuseTask() - Unlock and put task to unuse queue
   88:  *
   89:  * @task = task
   90:  * return: always is NULL
   91:  */
   92: sched_task_t *
   93: sched_unuseTask(sched_task_t * __restrict task)
   94: {
   95: 	TASK_UNLOCK(task);
   96: 	TASK_TYPE(task) = taskUNUSE;
   97: #ifdef HAVE_LIBPTHREAD
   98: 	pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
   99: #endif
  100: 	TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
  101: #ifdef HAVE_LIBPTHREAD
  102: 	pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
  103: #endif
  104: 	task = NULL;
  105: 
  106: 	return task;
  107: }
  108: 
  109: #pragma GCC visibility push(hidden)
  110: 
  111: #ifdef HAVE_LIBPTHREAD
/*
 * _sched_threadCleanup() - pthread cancellation/cleanup handler for thread tasks.
 *
 * Installed via pthread_cleanup_push() in _sched_threadWrapper(); detaches the
 * thread if it was created joinable, unlinks the task from the thread queue
 * and returns it to the unuse queue.
 *
 * @t = thread task (may be NULL; then this is a no-op)
 */
static void
_sched_threadCleanup(sched_task_t *t)
{
	if (!t || !TASK_ROOT(t))
		return;

	/* joinable threads must be detached here, since nobody will join
	 * a cancelled/exiting worker */
	if (TASK_FLAG(t) == PTHREAD_CREATE_JOINABLE)
		pthread_detach(pthread_self());

	pthread_mutex_lock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
	/* NOTE(review): _sched_threadWrapper() already removes t from
	 * root_thread before running the task body; if this handler fires
	 * after that point, this is a second TAILQ_REMOVE on the same node
	 * against a queue it is no longer on — confirm intended */
	TAILQ_REMOVE(&TASK_ROOT(t)->root_thread, t, task_node);
	pthread_mutex_unlock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);

	sched_unuseTask(t);
}
/*
 * _sched_threadWrapper() - entry point for every thread task; hidden symbol.
 *
 * Unlinks the task from the thread queue, parks it on the unuse queue,
 * signals the parent via the semaphore stored in TASK_RET(), then runs
 * the user function.
 *
 * @t = thread task (TASK_RET(t) must hold a sem_t* supplied by the parent)
 * return: never returns normally; exits the thread with the task result
 */
void *
_sched_threadWrapper(sched_task_t *t)
{
	void *ret = NULL;
	sem_t *s = NULL;
	sched_root_task_t *r;

	/* no task, no root or no parent semaphore -> nothing to run */
	if (!t || !TASK_ROOT(t) || !TASK_RET(t))
		pthread_exit(ret);
	else {
		s = (sem_t*) TASK_RET(t);
		r = TASK_ROOT(t);
	}

	pthread_cleanup_push((void (*)(void*)) _sched_threadCleanup, t);

	/* allow immediate asynchronous cancellation of the worker */
	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

	/* NOTE(review): these inner #ifdef HAVE_LIBPTHREAD guards are
	 * redundant — this whole function is already compiled only when
	 * HAVE_LIBPTHREAD is defined */
#ifdef HAVE_LIBPTHREAD
	pthread_mutex_lock(&r->root_mtx[taskTHREAD]);
#endif
	TAILQ_REMOVE(&r->root_thread, t, task_node);
#ifdef HAVE_LIBPTHREAD
	pthread_mutex_unlock(&r->root_mtx[taskTHREAD]);
#endif
	/* park the task on the unuse queue BEFORE running the user function */
	t->task_type = taskUNUSE;
#ifdef HAVE_LIBPTHREAD
	pthread_mutex_lock(&r->root_mtx[taskUNUSE]);
#endif
	TAILQ_INSERT_TAIL(&r->root_unuse, t, task_node);
#ifdef HAVE_LIBPTHREAD
	pthread_mutex_unlock(&r->root_mtx[taskUNUSE]);
#endif

	/* notify parent, thread is ready for execution */
	sem_post(s);
	pthread_testcancel();

	/* NOTE(review): t already sits on the unuse queue here, so it may be
	 * recycled by sched_useTask() while the body still runs — looks like
	 * it relies on the scheduler not reusing it; confirm */
	ret = TASK_FUNC(t)(t);

	/* NOTE(review): the non-zero argument makes pthread_cleanup_pop()
	 * execute _sched_threadCleanup(), which removes t from root_thread a
	 * second time and re-inserts it into root_unuse — verify this double
	 * handling is intentional; t is also dereferenced after the cleanup */
	pthread_cleanup_pop(42);
	TASK_ROOT(t)->root_ret = ret;
	pthread_exit(ret);
}
  172: #endif
  173: 
  174: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
/*
 * _sched_rtcWrapper() - trampoline for RTC (POSIX timer) tasks; hidden symbol.
 *
 * The timer-notification task t carries the real user task in TASK_DATA(t)
 * and the timer id in TASK_DATLEN(t).  Unlinks the user task from the RTC
 * queue, parks it on the unuse queue, runs the user function and deletes
 * the one-shot timer.
 *
 * @t = timer notification task
 * return: result of the user task function, or NULL on bad arguments
 */
void *
_sched_rtcWrapper(sched_task_t *t)
{
	void *ret = NULL;
	sched_task_func_t func;
	sched_task_t *task;
	sched_root_task_t *r;

	if (!t || !TASK_ROOT(t) || !TASK_DATA(t))
		return NULL;
	else {
		r = TASK_ROOT(t);
		/* the real user task travels in the data pointer */
		task = (sched_task_t*) TASK_DATA(t);
		func = TASK_FUNC(task);
	}

#ifdef HAVE_LIBPTHREAD
	pthread_mutex_lock(&r->root_mtx[taskRTC]);
#endif
	TAILQ_REMOVE(&r->root_rtc, task, task_node);
#ifdef HAVE_LIBPTHREAD
	pthread_mutex_unlock(&r->root_mtx[taskRTC]);
#endif
	/* recycle the user task before invoking its function */
	task->task_type = taskUNUSE;
#ifdef HAVE_LIBPTHREAD
	pthread_mutex_lock(&r->root_mtx[taskUNUSE]);
#endif
	TAILQ_INSERT_TAIL(&r->root_unuse, task, task_node);
#ifdef HAVE_LIBPTHREAD
	pthread_mutex_unlock(&r->root_mtx[taskUNUSE]);
#endif

	/* NOTE(review): task is already on the unuse queue while func runs —
	 * same recycle-before-run pattern as _sched_threadWrapper(); confirm
	 * the scheduler cannot hand it out concurrently */
	ret = func(task);

	/* one-shot timer: the timer id was stashed in the datlen field */
	timer_delete((timer_t) TASK_DATLEN(t));
	return ret;
}
  212: #endif
  213: 
  214: #pragma GCC visibility pop
  215: 
  216: /*
  217:  * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
  218:  *
  219:  * @task = current task
  220:  * @retcode = return code
  221:  * return: return code
  222:  */
  223: void *
  224: sched_taskExit(sched_task_t *task, intptr_t retcode)
  225: {
  226: 	if (!task || !TASK_ROOT(task))
  227: 		return (void*) -1;
  228: 
  229: 	if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
  230: 		TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
  231: 
  232: 	TASK_ROOT(task)->root_ret = (void*) retcode;
  233: 	return (void*) retcode;
  234: }
  235: 
  236: 
  237: /*
  238:  * schedRead() - Add READ I/O task to scheduler queue
  239:  *
  240:  * @root = root task
  241:  * @func = task execution function
  242:  * @arg = 1st func argument
  243:  * @fd = fd handle
  244:  * @opt_data = Optional data
  245:  * @opt_dlen = Optional data length
  246:  * return: NULL error or !=NULL new queued task
  247:  */
  248: sched_task_t *
  249: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  250: 		void *opt_data, size_t opt_dlen)
  251: {
  252: 	sched_task_t *task;
  253: 	void *ptr;
  254: 
  255: 	if (!root || !func)
  256: 		return NULL;
  257: 
  258: 	/* get new task */
  259: 	if (!(task = sched_useTask(root)))
  260: 		return NULL;
  261: 
  262: 	task->task_func = func;
  263: 	TASK_TYPE(task) = taskREAD;
  264: 	TASK_ROOT(task) = root;
  265: 
  266: 	TASK_ARG(task) = arg;
  267: 	TASK_FD(task) = fd;
  268: 
  269: 	TASK_DATA(task) = opt_data;
  270: 	TASK_DATLEN(task) = opt_dlen;
  271: 
  272: 	if (root->root_hooks.hook_add.read)
  273: 		ptr = root->root_hooks.hook_add.read(task, NULL);
  274: 	else
  275: 		ptr = NULL;
  276: 
  277: 	if (!ptr) {
  278: #ifdef HAVE_LIBPTHREAD
  279: 		pthread_mutex_lock(&root->root_mtx[taskREAD]);
  280: #endif
  281: 		TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
  282: #ifdef HAVE_LIBPTHREAD
  283: 		pthread_mutex_unlock(&root->root_mtx[taskREAD]);
  284: #endif
  285: 	} else
  286: 		task = sched_unuseTask(task);
  287: 
  288: 	return task;
  289: }
  290: 
  291: /*
  292:  * schedWrite() - Add WRITE I/O task to scheduler queue
  293:  *
  294:  * @root = root task
  295:  * @func = task execution function
  296:  * @arg = 1st func argument
  297:  * @fd = fd handle
  298:  * @opt_data = Optional data
  299:  * @opt_dlen = Optional data length
  300:  * return: NULL error or !=NULL new queued task
  301:  */
  302: sched_task_t *
  303: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  304: 		void *opt_data, size_t opt_dlen)
  305: {
  306: 	sched_task_t *task;
  307: 	void *ptr;
  308: 
  309: 	if (!root || !func)
  310: 		return NULL;
  311: 
  312: 	/* get new task */
  313: 	if (!(task = sched_useTask(root)))
  314: 		return NULL;
  315: 
  316: 	task->task_func = func;
  317: 	TASK_TYPE(task) = taskWRITE;
  318: 	TASK_ROOT(task) = root;
  319: 
  320: 	TASK_ARG(task) = arg;
  321: 	TASK_FD(task) = fd;
  322: 
  323: 	TASK_DATA(task) = opt_data;
  324: 	TASK_DATLEN(task) = opt_dlen;
  325: 
  326: 	if (root->root_hooks.hook_add.write)
  327: 		ptr = root->root_hooks.hook_add.write(task, NULL);
  328: 	else
  329: 		ptr = NULL;
  330: 
  331: 	if (!ptr) {
  332: #ifdef HAVE_LIBPTHREAD
  333: 		pthread_mutex_lock(&root->root_mtx[taskWRITE]);
  334: #endif
  335: 		TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
  336: #ifdef HAVE_LIBPTHREAD
  337: 		pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
  338: #endif
  339: 	} else
  340: 		task = sched_unuseTask(task);
  341: 
  342: 	return task;
  343: }
  344: 
  345: /*
  346:  * schedNode() - Add NODE task to scheduler queue
  347:  *
  348:  * @root = root task
  349:  * @func = task execution function
  350:  * @arg = 1st func argument
  351:  * @fd = fd handle
  352:  * @opt_data = Optional data
  353:  * @opt_dlen = Optional data length
  354:  * return: NULL error or !=NULL new queued task
  355:  */
  356: sched_task_t *
  357: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  358: 		void *opt_data, size_t opt_dlen)
  359: {
  360: 	sched_task_t *task;
  361: 	void *ptr;
  362: 
  363: 	if (!root || !func)
  364: 		return NULL;
  365: 
  366: 	/* get new task */
  367: 	if (!(task = sched_useTask(root)))
  368: 		return NULL;
  369: 
  370: 	task->task_func = func;
  371: 	TASK_TYPE(task) = taskNODE;
  372: 	TASK_ROOT(task) = root;
  373: 
  374: 	TASK_ARG(task) = arg;
  375: 	TASK_FD(task) = fd;
  376: 
  377: 	TASK_DATA(task) = opt_data;
  378: 	TASK_DATLEN(task) = opt_dlen;
  379: 
  380: 	if (root->root_hooks.hook_add.node)
  381: 		ptr = root->root_hooks.hook_add.node(task, NULL);
  382: 	else
  383: 		ptr = NULL;
  384: 
  385: 	if (!ptr) {
  386: #ifdef HAVE_LIBPTHREAD
  387: 		pthread_mutex_lock(&root->root_mtx[taskNODE]);
  388: #endif
  389: 		TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
  390: #ifdef HAVE_LIBPTHREAD
  391: 		pthread_mutex_unlock(&root->root_mtx[taskNODE]);
  392: #endif
  393: 	} else
  394: 		task = sched_unuseTask(task);
  395: 
  396: 	return task;
  397: }
  398: 
  399: /*
  400:  * schedProc() - Add PROC task to scheduler queue
  401:  *
  402:  * @root = root task
  403:  * @func = task execution function
  404:  * @arg = 1st func argument
  405:  * @pid = PID
  406:  * @opt_data = Optional data
  407:  * @opt_dlen = Optional data length
  408:  * return: NULL error or !=NULL new queued task
  409:  */
  410: sched_task_t *
  411: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid, 
  412: 		void *opt_data, size_t opt_dlen)
  413: {
  414: 	sched_task_t *task;
  415: 	void *ptr;
  416: 
  417: 	if (!root || !func)
  418: 		return NULL;
  419: 
  420: 	/* get new task */
  421: 	if (!(task = sched_useTask(root)))
  422: 		return NULL;
  423: 
  424: 	task->task_func = func;
  425: 	TASK_TYPE(task) = taskPROC;
  426: 	TASK_ROOT(task) = root;
  427: 
  428: 	TASK_ARG(task) = arg;
  429: 	TASK_VAL(task) = pid;
  430: 
  431: 	TASK_DATA(task) = opt_data;
  432: 	TASK_DATLEN(task) = opt_dlen;
  433: 
  434: 	if (root->root_hooks.hook_add.proc)
  435: 		ptr = root->root_hooks.hook_add.proc(task, NULL);
  436: 	else
  437: 		ptr = NULL;
  438: 
  439: 	if (!ptr) {
  440: #ifdef HAVE_LIBPTHREAD
  441: 		pthread_mutex_lock(&root->root_mtx[taskPROC]);
  442: #endif
  443: 		TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
  444: #ifdef HAVE_LIBPTHREAD
  445: 		pthread_mutex_unlock(&root->root_mtx[taskPROC]);
  446: #endif
  447: 	} else
  448: 		task = sched_unuseTask(task);
  449: 
  450: 	return task;
  451: }
  452: 
  453: /*
  454:  * schedUser() - Add trigger USER task to scheduler queue
  455:  *
  456:  * @root = root task
  457:  * @func = task execution function
  458:  * @arg = 1st func argument
  459:  * @id = Trigger ID
  460:  * @opt_data = Optional data
  461:  * @opt_dlen = Optional user's trigger flags
  462:  * return: NULL error or !=NULL new queued task
  463:  */
  464: sched_task_t *
  465: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id, 
  466: 		void *opt_data, size_t opt_dlen)
  467: {
  468: #ifndef EVFILT_USER
  469: 	sched_SetErr(ENOTSUP, "Not supported kevent() filter");
  470: 	return NULL;
  471: #else
  472: 	sched_task_t *task;
  473: 	void *ptr;
  474: 
  475: 	if (!root || !func)
  476: 		return NULL;
  477: 
  478: 	/* get new task */
  479: 	if (!(task = sched_useTask(root)))
  480: 		return NULL;
  481: 
  482: 	task->task_func = func;
  483: 	TASK_TYPE(task) = taskUSER;
  484: 	TASK_ROOT(task) = root;
  485: 
  486: 	TASK_ARG(task) = arg;
  487: 	TASK_VAL(task) = id;
  488: 
  489: 	TASK_DATA(task) = opt_data;
  490: 	TASK_DATLEN(task) = opt_dlen;
  491: 
  492: 	if (root->root_hooks.hook_add.user)
  493: 		ptr = root->root_hooks.hook_add.user(task, NULL);
  494: 	else
  495: 		ptr = NULL;
  496: 
  497: 	if (!ptr) {
  498: #ifdef HAVE_LIBPTHREAD
  499: 		pthread_mutex_lock(&root->root_mtx[taskUSER]);
  500: #endif
  501: 		TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
  502: #ifdef HAVE_LIBPTHREAD
  503: 		pthread_mutex_unlock(&root->root_mtx[taskUSER]);
  504: #endif
  505: 	} else
  506: 		task = sched_unuseTask(task);
  507: 
  508: 	return task;
  509: #endif
  510: }
  511: 
  512: /*
  513:  * schedSignal() - Add SIGNAL task to scheduler queue
  514:  *
  515:  * @root = root task
  516:  * @func = task execution function
  517:  * @arg = 1st func argument
  518:  * @sig = Signal
  519:  * @opt_data = Optional data
  520:  * @opt_dlen = Optional data length
  521:  * return: NULL error or !=NULL new queued task
  522:  */
  523: sched_task_t *
  524: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig, 
  525: 		void *opt_data, size_t opt_dlen)
  526: {
  527: 	sched_task_t *task;
  528: 	void *ptr;
  529: 
  530: 	if (!root || !func)
  531: 		return NULL;
  532: 
  533: 	/* get new task */
  534: 	if (!(task = sched_useTask(root)))
  535: 		return NULL;
  536: 
  537: 	task->task_func = func;
  538: 	TASK_TYPE(task) = taskSIGNAL;
  539: 	TASK_ROOT(task) = root;
  540: 
  541: 	TASK_ARG(task) = arg;
  542: 	TASK_VAL(task) = sig;
  543: 
  544: 	TASK_DATA(task) = opt_data;
  545: 	TASK_DATLEN(task) = opt_dlen;
  546: 
  547: 	if (root->root_hooks.hook_add.signal)
  548: 		ptr = root->root_hooks.hook_add.signal(task, NULL);
  549: 	else
  550: 		ptr = NULL;
  551: 
  552: 	if (!ptr) {
  553: #ifdef HAVE_LIBPTHREAD
  554: 		pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
  555: #endif
  556: 		TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
  557: #ifdef HAVE_LIBPTHREAD
  558: 		pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
  559: #endif
  560: 	} else
  561: 		task = sched_unuseTask(task);
  562: 
  563: 	return task;
  564: }
  565: 
  566: /*
  567:  * schedAlarm() - Add ALARM task to scheduler queue
  568:  *
  569:  * @root = root task
  570:  * @func = task execution function
  571:  * @arg = 1st func argument
  572:  * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
  573:  * @opt_data = Alarm timer ID
  574:  * @opt_dlen = Optional data length
  575:  * return: NULL error or !=NULL new queued task
  576:  */
  577: sched_task_t *
  578: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 
  579: 		void *opt_data, size_t opt_dlen)
  580: {
  581: 	sched_task_t *task;
  582: 	void *ptr;
  583: 
  584: 	if (!root || !func)
  585: 		return NULL;
  586: 
  587: 	/* get new task */
  588: 	if (!(task = sched_useTask(root)))
  589: 		return NULL;
  590: 
  591: 	task->task_func = func;
  592: 	TASK_TYPE(task) = taskALARM;
  593: 	TASK_ROOT(task) = root;
  594: 
  595: 	TASK_ARG(task) = arg;
  596: 	TASK_TS(task) = ts;
  597: 
  598: 	TASK_DATA(task) = opt_data;
  599: 	TASK_DATLEN(task) = opt_dlen;
  600: 
  601: 	if (root->root_hooks.hook_add.alarm)
  602: 		ptr = root->root_hooks.hook_add.alarm(task, NULL);
  603: 	else
  604: 		ptr = NULL;
  605: 
  606: 	if (!ptr) {
  607: #ifdef HAVE_LIBPTHREAD
  608: 		pthread_mutex_lock(&root->root_mtx[taskALARM]);
  609: #endif
  610: 		TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
  611: #ifdef HAVE_LIBPTHREAD
  612: 		pthread_mutex_unlock(&root->root_mtx[taskALARM]);
  613: #endif
  614: 	} else
  615: 		task = sched_unuseTask(task);
  616: 
  617: 	return task;
  618: }
  619: 
  620: #ifdef AIO_SUPPORT
  621: /*
  622:  * schedAIO() - Add AIO task to scheduler queue
  623:  *
  624:  * @root = root task
  625:  * @func = task execution function
  626:  * @arg = 1st func argument
  627:  * @acb = AIO cb structure address
  628:  * @opt_data = Optional data
  629:  * @opt_dlen = Optional data length
  630:  * return: NULL error or !=NULL new queued task
  631:  */
  632: sched_task_t *
  633: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, 
  634: 		struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
  635: {
  636: 	sched_task_t *task;
  637: 	void *ptr;
  638: 
  639: 	if (!root || !func || !acb || !opt_dlen)
  640: 		return NULL;
  641: 
  642: 	/* get new task */
  643: 	if (!(task = sched_useTask(root)))
  644: 		return NULL;
  645: 
  646: 	task->task_func = func;
  647: 	TASK_TYPE(task) = taskAIO;
  648: 	TASK_ROOT(task) = root;
  649: 
  650: 	TASK_ARG(task) = arg;
  651: 	TASK_VAL(task) = (u_long) acb;
  652: 
  653: 	TASK_DATA(task) = opt_data;
  654: 	TASK_DATLEN(task) = opt_dlen;
  655: 
  656: 	if (root->root_hooks.hook_add.aio)
  657: 		ptr = root->root_hooks.hook_add.aio(task, NULL);
  658: 	else
  659: 		ptr = NULL;
  660: 
  661: 	if (!ptr) {
  662: #ifdef HAVE_LIBPTHREAD
  663: 		pthread_mutex_lock(&root->root_mtx[taskAIO]);
  664: #endif
  665: 		TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
  666: #ifdef HAVE_LIBPTHREAD
  667: 		pthread_mutex_unlock(&root->root_mtx[taskAIO]);
  668: #endif
  669: 	} else
  670: 		task = sched_unuseTask(task);
  671: 
  672: 	return task;
  673: }
  674: 
  675: /*
  676:  * schedAIORead() - Add AIO read task to scheduler queue
  677:  *
  678:  * @root = root task
  679:  * @func = task execution function
  680:  * @arg = 1st func argument
  681:  * @fd = file descriptor
  682:  * @buffer = Buffer
  683:  * @buflen = Buffer length
  684:  * @offset = Offset from start of file, if =-1 from current position
  685:  * return: NULL error or !=NULL new queued task
  686:  */
  687: sched_task_t *
  688: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  689: 		void *buffer, size_t buflen, off_t offset)
  690: {
  691: 	struct aiocb *acb;
  692: 	off_t off;
  693: 
  694: 	if (!root || !func || !buffer || !buflen)
  695: 		return NULL;
  696: 
  697: 	if (offset == (off_t) -1) {
  698: 		off = lseek(fd, 0, SEEK_CUR);
  699: 		if (off == -1) {
  700: 			LOGERR;
  701: 			return NULL;
  702: 		}
  703: 	} else
  704: 		off = offset;
  705: 
  706: 	if (!(acb = malloc(sizeof(struct aiocb)))) {
  707: 		LOGERR;
  708: 		return NULL;
  709: 	} else
  710: 		memset(acb, 0, sizeof(struct aiocb));
  711: 
  712: 	acb->aio_fildes = fd;
  713: 	acb->aio_nbytes = buflen;
  714: 	acb->aio_buf = buffer;
  715: 	acb->aio_offset = off;
  716: 	acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
  717: 	acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
  718: 	acb->aio_sigevent.sigev_value.sival_ptr = acb;
  719: 
  720: 	if (aio_read(acb)) {
  721: 		LOGERR;
  722: 		free(acb);
  723: 		return NULL;
  724: 	}
  725: 
  726: 	return schedAIO(root, func, arg, acb, buffer, buflen);
  727: }
  728: 
  729: /*
  730:  * schedAIOWrite() - Add AIO write task to scheduler queue
  731:  *
  732:  * @root = root task
  733:  * @func = task execution function
  734:  * @arg = 1st func argument
  735:  * @fd = file descriptor
  736:  * @buffer = Buffer
  737:  * @buflen = Buffer length
  738:  * @offset = Offset from start of file, if =-1 from current position
  739:  * return: NULL error or !=NULL new queued task
  740:  */
  741: sched_task_t *
  742: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  743: 		void *buffer, size_t buflen, off_t offset)
  744: {
  745: 	struct aiocb *acb;
  746: 	off_t off;
  747: 
  748: 	if (!root || !func || !buffer || !buflen)
  749: 		return NULL;
  750: 
  751: 	if (offset == (off_t) -1) {
  752: 		off = lseek(fd, 0, SEEK_CUR);
  753: 		if (off == -1) {
  754: 			LOGERR;
  755: 			return NULL;
  756: 		}
  757: 	} else
  758: 		off = offset;
  759: 
  760: 	if (!(acb = malloc(sizeof(struct aiocb)))) {
  761: 		LOGERR;
  762: 		return NULL;
  763: 	} else
  764: 		memset(acb, 0, sizeof(struct aiocb));
  765: 
  766: 	acb->aio_fildes = fd;
  767: 	acb->aio_nbytes = buflen;
  768: 	acb->aio_buf = buffer;
  769: 	acb->aio_offset = off;
  770: 	acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
  771: 	acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
  772: 	acb->aio_sigevent.sigev_value.sival_ptr = acb;
  773: 
  774: 	if (aio_write(acb)) {
  775: 		LOGERR;
  776: 		free(acb);
  777: 		return NULL;
  778: 	}
  779: 
  780: 	return schedAIO(root, func, arg, acb, buffer, buflen);
  781: }
  782: 
  783: #ifdef EVFILT_LIO
  784: /*
  785:  * schedLIO() - Add AIO bulk tasks to scheduler queue
  786:  *
  787:  * @root = root task
  788:  * @func = task execution function
  789:  * @arg = 1st func argument
  790:  * @acbs = AIO cb structure addresses
  791:  * @opt_data = Optional data
  792:  * @opt_dlen = Optional data length
  793:  * return: NULL error or !=NULL new queued task
  794:  */
  795: sched_task_t *
  796: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, 
  797: 		struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
  798: {
  799: 	sched_task_t *task;
  800: 	void *ptr;
  801: 
  802: 	if (!root || !func || !acbs || !opt_dlen)
  803: 		return NULL;
  804: 
  805: 	/* get new task */
  806: 	if (!(task = sched_useTask(root)))
  807: 		return NULL;
  808: 
  809: 	task->task_func = func;
  810: 	TASK_TYPE(task) = taskLIO;
  811: 	TASK_ROOT(task) = root;
  812: 
  813: 	TASK_ARG(task) = arg;
  814: 	TASK_VAL(task) = (u_long) acbs;
  815: 
  816: 	TASK_DATA(task) = opt_data;
  817: 	TASK_DATLEN(task) = opt_dlen;
  818: 
  819: 	if (root->root_hooks.hook_add.lio)
  820: 		ptr = root->root_hooks.hook_add.lio(task, NULL);
  821: 	else
  822: 		ptr = NULL;
  823: 
  824: 	if (!ptr) {
  825: #ifdef HAVE_LIBPTHREAD
  826: 		pthread_mutex_lock(&root->root_mtx[taskLIO]);
  827: #endif
  828: 		TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
  829: #ifdef HAVE_LIBPTHREAD
  830: 		pthread_mutex_unlock(&root->root_mtx[taskLIO]);
  831: #endif
  832: 	} else
  833: 		task = sched_unuseTask(task);
  834: 
  835: 	return task;
  836: }
  837: 
  838: /*
  839:  * schedLIORead() - Add list of AIO read tasks to scheduler queue
  840:  *
  841:  * @root = root task
  842:  * @func = task execution function
  843:  * @arg = 1st func argument
  844:  * @fd = file descriptor
  845:  * @bufs = Buffer's list
  846:  * @nbufs = Number of Buffers
  847:  * @offset = Offset from start of file, if =-1 from current position
  848:  * return: NULL error or !=NULL new queued task
  849:  */
  850: sched_task_t *
  851: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  852: 		struct iovec *bufs, size_t nbufs, off_t offset)
  853: {
  854: 	struct sigevent sig;
  855: 	struct aiocb **acb;
  856: 	off_t off;
  857: 	register int i;
  858: 
  859: 	if (!root || !func || !bufs || !nbufs)
  860: 		return NULL;
  861: 
  862: 	if (offset == (off_t) -1) {
  863: 		off = lseek(fd, 0, SEEK_CUR);
  864: 		if (off == -1) {
  865: 			LOGERR;
  866: 			return NULL;
  867: 		}
  868: 	} else
  869: 		off = offset;
  870: 
  871: 	if (!(acb = calloc(sizeof(void*), nbufs))) {
  872: 		LOGERR;
  873: 		return NULL;
  874: 	} else
  875: 		memset(acb, 0, sizeof(void*) * nbufs);
  876: 	for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
  877: 		acb[i] = malloc(sizeof(struct aiocb));
  878: 		if (!acb[i]) {
  879: 			LOGERR;
  880: 			for (i = 0; i < nbufs; i++)
  881: 				if (acb[i])
  882: 					free(acb[i]);
  883: 			free(acb);
  884: 			return NULL;
  885: 		} else
  886: 			memset(acb[i], 0, sizeof(struct aiocb));
  887: 		acb[i]->aio_fildes = fd;
  888: 		acb[i]->aio_nbytes = bufs[i].iov_len;
  889: 		acb[i]->aio_buf = bufs[i].iov_base;
  890: 		acb[i]->aio_offset = off;
  891: 		acb[i]->aio_lio_opcode = LIO_READ;
  892: 	}
  893: 	memset(&sig, 0, sizeof sig);
  894: 	sig.sigev_notify = SIGEV_KEVENT;
  895: 	sig.sigev_notify_kqueue = root->root_kq;
  896: 	sig.sigev_value.sival_ptr = acb;
  897: 
  898: 	if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
  899: 		LOGERR;
  900: 		for (i = 0; i < nbufs; i++)
  901: 			if (acb[i])
  902: 				free(acb[i]);
  903: 		free(acb);
  904: 		return NULL;
  905: 	}
  906: 
  907: 	return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
  908: }
  909: 
  910: /*
  911:  * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
  912:  *
  913:  * @root = root task
  914:  * @func = task execution function
  915:  * @arg = 1st func argument
  916:  * @fd = file descriptor
  917:  * @bufs = Buffer's list
  918:  * @nbufs = Number of Buffers
  919:  * @offset = Offset from start of file, if =-1 from current position
  920:  * return: NULL error or !=NULL new queued task
  921:  */
  922: sched_task_t *
  923: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  924: 		struct iovec *bufs, size_t nbufs, off_t offset)
  925: {
  926: 	struct sigevent sig;
  927: 	struct aiocb **acb;
  928: 	off_t off;
  929: 	register int i;
  930: 
  931: 	if (!root || !func || !bufs || !nbufs)
  932: 		return NULL;
  933: 
  934: 	if (offset == (off_t) -1) {
  935: 		off = lseek(fd, 0, SEEK_CUR);
  936: 		if (off == -1) {
  937: 			LOGERR;
  938: 			return NULL;
  939: 		}
  940: 	} else
  941: 		off = offset;
  942: 
  943: 	if (!(acb = calloc(sizeof(void*), nbufs))) {
  944: 		LOGERR;
  945: 		return NULL;
  946: 	} else
  947: 		memset(acb, 0, sizeof(void*) * nbufs);
  948: 	for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
  949: 		acb[i] = malloc(sizeof(struct aiocb));
  950: 		if (!acb[i]) {
  951: 			LOGERR;
  952: 			for (i = 0; i < nbufs; i++)
  953: 				if (acb[i])
  954: 					free(acb[i]);
  955: 			free(acb);
  956: 			return NULL;
  957: 		} else
  958: 			memset(acb[i], 0, sizeof(struct aiocb));
  959: 		acb[i]->aio_fildes = fd;
  960: 		acb[i]->aio_nbytes = bufs[i].iov_len;
  961: 		acb[i]->aio_buf = bufs[i].iov_base;
  962: 		acb[i]->aio_offset = off;
  963: 		acb[i]->aio_lio_opcode = LIO_WRITE;
  964: 	}
  965: 	memset(&sig, 0, sizeof sig);
  966: 	sig.sigev_notify = SIGEV_KEVENT;
  967: 	sig.sigev_notify_kqueue = root->root_kq;
  968: 	sig.sigev_value.sival_ptr = acb;
  969: 
  970: 	if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
  971: 		LOGERR;
  972: 		for (i = 0; i < nbufs; i++)
  973: 			if (acb[i])
  974: 				free(acb[i]);
  975: 		free(acb);
  976: 		return NULL;
  977: 	}
  978: 
  979: 	return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
  980: }
  981: #endif	/* EVFILT_LIO */
  982: #endif	/* AIO_SUPPORT */
  983: 
/*
 * schedTimer() - Add TIMER task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @ts = timeout argument structure
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task, *tmp, *t = NULL;
	void *ptr;
	struct timespec now;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskTIMER;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	/* calculate timeval structure */
	/* convert the relative timeout ts into an absolute monotonic
	 * deadline, normalizing tv_nsec into [0, 1e9) */
	clock_gettime(CLOCK_MONOTONIC, &now);
	now.tv_sec += ts.tv_sec;
	now.tv_nsec += ts.tv_nsec;
	if (now.tv_nsec >= 1000000000L) {
		now.tv_sec++;
		now.tv_nsec -= 1000000000L;
	} else if (now.tv_nsec < 0) {
		/* handles callers that pass a negative tv_nsec in ts */
		now.tv_sec--;
		now.tv_nsec += 1000000000L;
	}
	TASK_TS(task) = now;

	if (root->root_hooks.hook_add.timer)
		ptr = root->root_hooks.hook_add.timer(task, NULL);
	else
		ptr = NULL;

	/* NULL hook result -> accepted: enqueue; otherwise recycle the task */
	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskTIMER]);
#endif
#ifdef TIMER_WITHOUT_SORT
		TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
#else
		/* keep the timer queue sorted by deadline: insert before the
		 * first task whose deadline is not earlier than ours
		 * (sched_timespeccmp with `-` presumably compares via
		 * subtraction — confirm against the macro definition) */
		TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
			if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
				break;
		if (!t)
			TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
		else
			TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
#endif
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}
 1060: 
 1061: /*
 1062:  * schedEvent() - Add EVENT task to scheduler queue
 1063:  *
 1064:  * @root = root task
 1065:  * @func = task execution function
 1066:  * @arg = 1st func argument
 1067:  * @val = additional func argument
 1068:  * @opt_data = Optional data
 1069:  * @opt_dlen = Optional data length
 1070:  * return: NULL error or !=NULL new queued task
 1071:  */
 1072: sched_task_t *
 1073: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val, 
 1074: 		void *opt_data, size_t opt_dlen)
 1075: {
 1076: 	sched_task_t *task;
 1077: 	void *ptr;
 1078: 
 1079: 	if (!root || !func)
 1080: 		return NULL;
 1081: 
 1082: 	/* get new task */
 1083: 	if (!(task = sched_useTask(root)))
 1084: 		return NULL;
 1085: 
 1086: 	task->task_func = func;
 1087: 	TASK_TYPE(task) = taskEVENT;
 1088: 	TASK_ROOT(task) = root;
 1089: 
 1090: 	TASK_ARG(task) = arg;
 1091: 	TASK_VAL(task) = val;
 1092: 
 1093: 	TASK_DATA(task) = opt_data;
 1094: 	TASK_DATLEN(task) = opt_dlen;
 1095: 
 1096: 	if (root->root_hooks.hook_add.event)
 1097: 		ptr = root->root_hooks.hook_add.event(task, NULL);
 1098: 	else
 1099: 		ptr = NULL;
 1100: 
 1101: 	if (!ptr) {
 1102: #ifdef HAVE_LIBPTHREAD
 1103: 		pthread_mutex_lock(&root->root_mtx[taskEVENT]);
 1104: #endif
 1105: 		TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
 1106: #ifdef HAVE_LIBPTHREAD
 1107: 		pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
 1108: #endif
 1109: 	} else
 1110: 		task = sched_unuseTask(task);
 1111: 
 1112: 	return task;
 1113: }
 1114: 
 1115: 
 1116: /*
 1117:  * schedTask() - Add regular task to scheduler queue
 1118:  *
 1119:  * @root = root task
 1120:  * @func = task execution function
 1121:  * @arg = 1st func argument
 1122:  * @prio = regular task priority, 0 is hi priority for regular tasks
 1123:  * @opt_data = Optional data
 1124:  * @opt_dlen = Optional data length
 1125:  * return: NULL error or !=NULL new queued task
 1126:  */
 1127: sched_task_t *
 1128: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio, 
 1129: 		void *opt_data, size_t opt_dlen)
 1130: {
 1131: 	sched_task_t *task, *tmp, *t = NULL;
 1132: 	void *ptr;
 1133: 
 1134: 	if (!root || !func)
 1135: 		return NULL;
 1136: 
 1137: 	/* get new task */
 1138: 	if (!(task = sched_useTask(root)))
 1139: 		return NULL;
 1140: 
 1141: 	task->task_func = func;
 1142: 	TASK_TYPE(task) = taskTASK;
 1143: 	TASK_ROOT(task) = root;
 1144: 
 1145: 	TASK_ARG(task) = arg;
 1146: 	TASK_VAL(task) = prio;
 1147: 
 1148: 	TASK_DATA(task) = opt_data;
 1149: 	TASK_DATLEN(task) = opt_dlen;
 1150: 
 1151: 	if (root->root_hooks.hook_add.task)
 1152: 		ptr = root->root_hooks.hook_add.task(task, NULL);
 1153: 	else
 1154: 		ptr = NULL;
 1155: 
 1156: 	if (!ptr) {
 1157: #ifdef HAVE_LIBPTHREAD
 1158: 		pthread_mutex_lock(&root->root_mtx[taskTASK]);
 1159: #endif
 1160: 		TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
 1161: 			if (TASK_VAL(task) < TASK_VAL(t))
 1162: 				break;
 1163: 		if (!t)
 1164: 			TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
 1165: 		else
 1166: 			TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
 1167: #ifdef HAVE_LIBPTHREAD
 1168: 		pthread_mutex_unlock(&root->root_mtx[taskTASK]);
 1169: #endif
 1170: 	} else
 1171: 		task = sched_unuseTask(task);
 1172: 
 1173: 	return task;
 1174: }
 1175: 
 1176: /*
 1177:  * schedSuspend() - Add Suspended task to scheduler queue
 1178:  *
 1179:  * @root = root task
 1180:  * @func = task execution function
 1181:  * @arg = 1st func argument
 1182:  * @id = Trigger ID
 1183:  * @opt_data = Optional data
 1184:  * @opt_dlen = Optional data length
 1185:  * return: NULL error or !=NULL new queued task
 1186:  */
 1187: sched_task_t *
 1188: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id, 
 1189: 		void *opt_data, size_t opt_dlen)
 1190: {
 1191: 	sched_task_t *task;
 1192: 	void *ptr;
 1193: 
 1194: 	if (!root || !func)
 1195: 		return NULL;
 1196: 
 1197: 	/* get new task */
 1198: 	if (!(task = sched_useTask(root)))
 1199: 		return NULL;
 1200: 
 1201: 	task->task_func = func;
 1202: 	TASK_TYPE(task) = taskSUSPEND;
 1203: 	TASK_ROOT(task) = root;
 1204: 
 1205: 	TASK_ARG(task) = arg;
 1206: 	TASK_VAL(task) = id;
 1207: 
 1208: 	TASK_DATA(task) = opt_data;
 1209: 	TASK_DATLEN(task) = opt_dlen;
 1210: 
 1211: 	if (root->root_hooks.hook_add.suspend)
 1212: 		ptr = root->root_hooks.hook_add.suspend(task, NULL);
 1213: 	else
 1214: 		ptr = NULL;
 1215: 
 1216: 	if (!ptr) {
 1217: #ifdef HAVE_LIBPTHREAD
 1218: 		pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
 1219: #endif
 1220: 		TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
 1221: #ifdef HAVE_LIBPTHREAD
 1222: 		pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
 1223: #endif
 1224: 	} else
 1225: 		task = sched_unuseTask(task);
 1226: 
 1227: 	return task;
 1228: }
 1229: 
 1230: /*
 1231:  * schedCallOnce() - Call once from scheduler
 1232:  *
 1233:  * @root = root task
 1234:  * @func = task execution function
 1235:  * @arg = 1st func argument
 1236:  * @val = additional func argument
 1237:  * @opt_data = Optional data
 1238:  * @opt_dlen = Optional data length
 1239:  * return: return value from called func
 1240:  */
 1241: sched_task_t *
 1242: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val, 
 1243: 		void *opt_data, size_t opt_dlen)
 1244: {
 1245: 	sched_task_t *task;
 1246: 	void *ret;
 1247: 
 1248: 	if (!root || !func)
 1249: 		return NULL;
 1250: 
 1251: 	/* get new task */
 1252: 	if (!(task = sched_useTask(root)))
 1253: 		return NULL;
 1254: 
 1255: 	task->task_func = func;
 1256: 	TASK_TYPE(task) = taskEVENT;
 1257: 	TASK_ROOT(task) = root;
 1258: 
 1259: 	TASK_ARG(task) = arg;
 1260: 	TASK_VAL(task) = val;
 1261: 
 1262: 	TASK_DATA(task) = opt_data;
 1263: 	TASK_DATLEN(task) = opt_dlen;
 1264: 
 1265: 	ret = schedCall(task);
 1266: 
 1267: 	sched_unuseTask(task);
 1268: 	return ret;
 1269: }
 1270: 
 1271: /*
 1272:  * schedThread() - Add thread task to scheduler queue
 1273:  *
 1274:  * @root = root task
 1275:  * @func = task execution function
 1276:  * @arg = 1st func argument
 1277:  * @detach = Detach thread from scheduler, if !=0
 1278:  * @ss = stack size
 1279:  * @opt_data = Optional data
 1280:  * @opt_dlen = Optional data length
 1281:  * return: NULL error or !=NULL new queued task
 1282:  */
 1283: sched_task_t *
 1284: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach, 
 1285: 		size_t ss, void *opt_data, size_t opt_dlen)
 1286: {
 1287: #ifndef HAVE_LIBPTHREAD
 1288: 	sched_SetErr(ENOTSUP, "Not supported thread tasks");
 1289: 	return NULL;
 1290: #endif
 1291: 	sched_task_t *task;
 1292: 	void *ptr;
 1293: 	pthread_attr_t attr;
 1294: 	sem_t *s = NULL;
 1295: 
 1296: 	if (!root || !func)
 1297: 		return NULL;
 1298: 	else {
 1299: 		/* normalizing stack size & detach state */
 1300: 		if (ss)
 1301: 			ss &= 0x7FFFFFFF;
 1302: 		detach = detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE;
 1303: 	}
 1304: 
 1305: 	if (!(s = (sem_t*) malloc(sizeof(sem_t)))) {
 1306: 		LOGERR;
 1307: 		return NULL;
 1308: 	}
 1309: 	if (sem_init(s, 0, 1)) {
 1310: 		LOGERR;
 1311: 		free(s);
 1312: 		return NULL;
 1313: 	}
 1314: 
 1315: 	/* get new task */
 1316: 	if (!(task = sched_useTask(root))) {
 1317: 		sem_destroy(s);
 1318: 		free(s);
 1319: 
 1320: 		return NULL;
 1321: 	}
 1322: 
 1323: 	task->task_func = func;
 1324: 	TASK_TYPE(task) = taskTHREAD;
 1325: 	TASK_ROOT(task) = root;
 1326: 
 1327: 	TASK_ARG(task) = arg;
 1328: 	TASK_FLAG(task) = detach;
 1329: 	TASK_RET(task) = (intptr_t) s;
 1330: 
 1331: 	TASK_DATA(task) = opt_data;
 1332: 	TASK_DATLEN(task) = opt_dlen;
 1333: 
 1334: 	pthread_attr_init(&attr);
 1335: 	pthread_attr_setdetachstate(&attr, detach);
 1336: 	if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
 1337: 		LOGERR;
 1338: 		pthread_attr_destroy(&attr);
 1339: 		sem_destroy(s);
 1340: 		free(s);
 1341: 		return sched_unuseTask(task);
 1342: 	}
 1343: 	if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
 1344: 		LOGERR;
 1345: 		pthread_attr_destroy(&attr);
 1346: 		sem_destroy(s);
 1347: 		free(s);
 1348: 		return sched_unuseTask(task);
 1349: 	} else
 1350: 		TASK_FLAG(task) |= (ss << 1);
 1351: 	if ((errno = pthread_attr_setguardsize(&attr, ss))) {
 1352: 		LOGERR;
 1353: 		pthread_attr_destroy(&attr);
 1354: 		sem_destroy(s);
 1355: 		free(s);
 1356: 		return sched_unuseTask(task);
 1357: 	}
 1358: #ifdef SCHED_RR
 1359: 	pthread_attr_setschedpolicy(&attr, SCHED_RR);
 1360: #else
 1361: 	pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
 1362: #endif
 1363: 	if (root->root_hooks.hook_add.thread)
 1364: 		ptr = root->root_hooks.hook_add.thread(task, &attr);
 1365: 	else
 1366: 		ptr = NULL;
 1367: 	pthread_attr_destroy(&attr);
 1368: 
 1369: 	if (!ptr) {
 1370: 		pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
 1371: 		TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
 1372: 		pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
 1373: 
 1374: 		/* wait for init thread actions */
 1375: 		sem_wait(s);
 1376: 	} else
 1377: 		task = sched_unuseTask(task);
 1378: 
 1379: 	sem_destroy(s);
 1380: 	free(s);
 1381: 	return task;
 1382: }
 1383: 
 1384: /*
 1385:  * schedRTC() - Add RTC task to scheduler queue
 1386:  *
 1387:  * @root = root task
 1388:  * @func = task execution function
 1389:  * @arg = 1st func argument
 1390:  * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
 1391:  * @opt_data = Optional RTC ID
 1392:  * @opt_dlen = Optional data length
 1393:  * return: NULL error or !=NULL new queued task
 1394:  */
 1395: sched_task_t *
 1396: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 
 1397: 		void *opt_data, size_t opt_dlen)
 1398: {
 1399: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
 1400: 	sched_task_t *task;
 1401: 	void *ptr;
 1402: 
 1403: 	if (!root || !func)
 1404: 		return NULL;
 1405: 
 1406: 	/* get new task */
 1407: 	if (!(task = sched_useTask(root)))
 1408: 		return NULL;
 1409: 
 1410: 	task->task_func = func;
 1411: 	TASK_TYPE(task) = taskRTC;
 1412: 	TASK_ROOT(task) = root;
 1413: 
 1414: 	TASK_ARG(task) = arg;
 1415: 	TASK_TS(task) = ts;
 1416: 
 1417: 	TASK_DATA(task) = opt_data;
 1418: 	TASK_DATLEN(task) = opt_dlen;
 1419: 
 1420: 	if (root->root_hooks.hook_add.rtc)
 1421: 		ptr = root->root_hooks.hook_add.rtc(task, NULL);
 1422: 	else
 1423: 		ptr = NULL;
 1424: 
 1425: 	if (!ptr) {
 1426: #ifdef HAVE_LIBPTHREAD
 1427: 		pthread_mutex_lock(&root->root_mtx[taskRTC]);
 1428: #endif
 1429: 		TAILQ_INSERT_TAIL(&root->root_rtc, TASK_ID(task), task_node);
 1430: #ifdef HAVE_LIBPTHREAD
 1431: 		pthread_mutex_unlock(&root->root_mtx[taskRTC]);
 1432: #endif
 1433: 	} else
 1434: 		task = sched_unuseTask(task);
 1435: 
 1436: 	return task;
 1437: #else
 1438: 	sched_SetErr(ENOTSUP, "Not supported realtime clock extensions");
 1439: 	return NULL;
 1440: #endif
 1441: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>