File:  [ELWIX - Embedded LightWeight unIX -] / libaitsched / src / tasks.c
Revision 1.20.2.1: download - view: text, annotated - select for diffs - revision graph
Mon Aug 26 14:29:20 2013 UTC (10 years, 10 months ago) by misho
Branches: sched4_2
Diff to: branchpoint 1.20: preferred, unified
Try to resolve an issue with unhandled tasks

    1: /*************************************************************************
    2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
    3: *  by Michael Pounov <misho@openbsd-bg.org>
    4: *
    5: * $Author: misho $
    6: * $Id: tasks.c,v 1.20.2.1 2013/08/26 14:29:20 misho Exp $
    7: *
    8: **************************************************************************
    9: The ELWIX and AITNET software is distributed under the following
   10: terms:
   11: 
   12: All of the documentation and software included in the ELWIX and AITNET
   13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   14: 
   15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
   16: 	by Michael Pounov <misho@elwix.org>.  All rights reserved.
   17: 
   18: Redistribution and use in source and binary forms, with or without
   19: modification, are permitted provided that the following conditions
   20: are met:
   21: 1. Redistributions of source code must retain the above copyright
   22:    notice, this list of conditions and the following disclaimer.
   23: 2. Redistributions in binary form must reproduce the above copyright
   24:    notice, this list of conditions and the following disclaimer in the
   25:    documentation and/or other materials provided with the distribution.
   26: 3. All advertising materials mentioning features or use of this software
   27:    must display the following acknowledgement:
   28: This product includes software developed by Michael Pounov <misho@elwix.org>
   29: ELWIX - Embedded LightWeight unIX and its contributors.
   30: 4. Neither the name of AITNET nor the names of its contributors
   31:    may be used to endorse or promote products derived from this software
   32:    without specific prior written permission.
   33: 
   34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
   35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
   38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   44: SUCH DAMAGE.
   45: */
   46: #include "global.h"
   47: 
   48: 
   49: /*
   50:  * sched_useTask() - Get and init new task
   51:  *
   52:  * @root = root task
   53:  * return: NULL error or !=NULL prepared task
   54:  */
   55: sched_task_t *
   56: sched_useTask(sched_root_task_t * __restrict root)
   57: {
   58: 	sched_task_t *task, *tmp;
   59: 
   60: #ifdef HAVE_LIBPTHREAD
   61: 	pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
   62: #endif
   63: 	TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
   64: 		if (!TASK_ISLOCKED(task)) {
   65: 			TAILQ_REMOVE(&root->root_unuse, task, task_node);
   66: 			break;
   67: 		}
   68: 	}
   69: #ifdef HAVE_LIBPTHREAD
   70: 	pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
   71: #endif
   72: 
   73: 	if (!task) {
   74: 		task = malloc(sizeof(sched_task_t));
   75: 		if (!task) {
   76: 			LOGERR;
   77: 			return NULL;
   78: 		}
   79: 	}
   80: 
   81: 	memset(task, 0, sizeof(sched_task_t));
   82: 	task->task_id = (uintptr_t) task;
   83: 	return task;
   84: }
   85: 
   86: /*
   87:  * sched_unuseTask() - Unlock and put task to unuse queue
   88:  *
   89:  * @task = task
   90:  * return: always is NULL
   91:  */
   92: sched_task_t *
   93: sched_unuseTask(sched_task_t * __restrict task)
   94: {
   95: 	TASK_UNLOCK(task);
   96: 	TASK_TYPE(task) = taskUNUSE;
   97: #ifdef HAVE_LIBPTHREAD
   98: 	pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
   99: #endif
  100: 	TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
  101: #ifdef HAVE_LIBPTHREAD
  102: 	pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
  103: #endif
  104: 	task = NULL;
  105: 
  106: 	return task;
  107: }
  108: 
  109: #pragma GCC visibility push(hidden)
  110: 
  111: #ifdef HAVE_LIBPTHREAD
/* Thread cleanup handler: a joinable thread detaches itself on exit or
 * cancellation so its resources are reclaimed without pthread_join(). */
static void
_sched_threadCleanup(sched_task_t *t)
{
	if (!t || !TASK_ROOT(t))
		return;

	/* NOTE(review): assumes TASK_FLAG holds the pthread detach state for
	 * thread tasks — confirm against where the task was created */
	if (TASK_FLAG(t) == PTHREAD_CREATE_JOINABLE)
		pthread_detach(pthread_self());
}
/* Entry point for thread tasks: unqueues the task, signals the parent via
 * the semaphore stashed in TASK_RET, then runs the task function. */
void *
_sched_threadWrapper(sched_task_t *t)
{
	void *ret = NULL;
	sem_t *s = NULL;

	/* without a task, its root, or the parent's semaphore there is nothing to run */
	if (!t || !TASK_ROOT(t) || !TASK_RET(t))
		pthread_exit(ret);
	else
		s = (sem_t*) TASK_RET(t);

	/* registered handler detaches a joinable thread on exit/cancellation */
	pthread_cleanup_push((void (*)(void*)) _sched_threadCleanup, t);

	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

	/* remove the task from the thread queue before executing it */
	pthread_mutex_lock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
	TAILQ_REMOVE(&TASK_ROOT(t)->root_thread, t, task_node);
	pthread_mutex_unlock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
	/* NOTE(review): sched_unuseTask() recycles t onto the unuse queue, yet t
	 * is still dereferenced below (TASK_FUNC(t)(t), TASK_ROOT(t)); another
	 * thread calling sched_useTask() could grab and reuse t in the meantime.
	 * Looks like a use-after-release race — confirm against later revisions. */
	sched_unuseTask(t);

	/* notify parent, thread is ready for execution */
	sem_post(s);
	pthread_testcancel();

	ret = TASK_FUNC(t)(t);

	/* non-zero argument: execute the cleanup handler on the normal path too */
	pthread_cleanup_pop(42);
	TASK_ROOT(t)->root_ret = ret;
	pthread_exit(ret);
}
  152: #endif
  153: 
  154: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
/* RTC timer dispatch: t is the wrapper task whose TASK_DATA carries the
 * actual RTC task to fire; the POSIX timer id rides in TASK_DATLEN. */
void *
_sched_rtcWrapper(sched_task_t *t)
{
	sched_task_func_t func;
	sched_task_t *task;
	sched_root_task_t *r;

	if (!t || !TASK_ROOT(t) || !TASK_DATA(t))
		return NULL;
	else {
		r = TASK_ROOT(t);
		/* the scheduled RTC task travels in the wrapper's data pointer */
		task = (sched_task_t*) TASK_DATA(t);
		func = TASK_FUNC(task);
	}

#ifdef HAVE_LIBPTHREAD
	pthread_mutex_lock(&r->root_mtx[taskRTC]);
#endif
	TAILQ_REMOVE(&r->root_rtc, task, task_node);
#ifdef HAVE_LIBPTHREAD
	pthread_mutex_unlock(&r->root_mtx[taskRTC]);
#endif
	/* NOTE(review): task is recycled onto the unuse queue here but passed to
	 * func() below — a concurrent sched_useTask() may reuse it first; looks
	 * like a use-after-release race, confirm against later revisions */
	sched_unuseTask(task);

	/* one-shot: dispose of the POSIX timer that fired this callback */
	timer_delete((timer_t) TASK_DATLEN(t));

	return func(task);
}
  183: #endif
  184: 
  185: #pragma GCC visibility pop
  186: 
  187: /*
  188:  * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
  189:  *
  190:  * @task = current task
  191:  * @retcode = return code
  192:  * return: return code
  193:  */
  194: void *
  195: sched_taskExit(sched_task_t *task, intptr_t retcode)
  196: {
  197: 	if (!task || !TASK_ROOT(task))
  198: 		return (void*) -1;
  199: 
  200: 	if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
  201: 		TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
  202: 
  203: 	TASK_ROOT(task)->root_ret = (void*) retcode;
  204: 	return (void*) retcode;
  205: }
  206: 
  207: 
  208: /*
  209:  * schedRead() - Add READ I/O task to scheduler queue
  210:  *
  211:  * @root = root task
  212:  * @func = task execution function
  213:  * @arg = 1st func argument
  214:  * @fd = fd handle
  215:  * @opt_data = Optional data
  216:  * @opt_dlen = Optional data length
  217:  * return: NULL error or !=NULL new queued task
  218:  */
  219: sched_task_t *
  220: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  221: 		void *opt_data, size_t opt_dlen)
  222: {
  223: 	sched_task_t *task;
  224: 	void *ptr;
  225: 
  226: 	if (!root || !func)
  227: 		return NULL;
  228: 
  229: 	/* get new task */
  230: 	if (!(task = sched_useTask(root)))
  231: 		return NULL;
  232: 
  233: 	task->task_func = func;
  234: 	TASK_TYPE(task) = taskREAD;
  235: 	TASK_ROOT(task) = root;
  236: 
  237: 	TASK_ARG(task) = arg;
  238: 	TASK_FD(task) = fd;
  239: 
  240: 	TASK_DATA(task) = opt_data;
  241: 	TASK_DATLEN(task) = opt_dlen;
  242: 
  243: 	if (root->root_hooks.hook_add.read)
  244: 		ptr = root->root_hooks.hook_add.read(task, NULL);
  245: 	else
  246: 		ptr = NULL;
  247: 
  248: 	if (!ptr) {
  249: #ifdef HAVE_LIBPTHREAD
  250: 		pthread_mutex_lock(&root->root_mtx[taskREAD]);
  251: #endif
  252: 		TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
  253: #ifdef HAVE_LIBPTHREAD
  254: 		pthread_mutex_unlock(&root->root_mtx[taskREAD]);
  255: #endif
  256: 	} else
  257: 		task = sched_unuseTask(task);
  258: 
  259: 	return task;
  260: }
  261: 
  262: /*
  263:  * schedWrite() - Add WRITE I/O task to scheduler queue
  264:  *
  265:  * @root = root task
  266:  * @func = task execution function
  267:  * @arg = 1st func argument
  268:  * @fd = fd handle
  269:  * @opt_data = Optional data
  270:  * @opt_dlen = Optional data length
  271:  * return: NULL error or !=NULL new queued task
  272:  */
  273: sched_task_t *
  274: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  275: 		void *opt_data, size_t opt_dlen)
  276: {
  277: 	sched_task_t *task;
  278: 	void *ptr;
  279: 
  280: 	if (!root || !func)
  281: 		return NULL;
  282: 
  283: 	/* get new task */
  284: 	if (!(task = sched_useTask(root)))
  285: 		return NULL;
  286: 
  287: 	task->task_func = func;
  288: 	TASK_TYPE(task) = taskWRITE;
  289: 	TASK_ROOT(task) = root;
  290: 
  291: 	TASK_ARG(task) = arg;
  292: 	TASK_FD(task) = fd;
  293: 
  294: 	TASK_DATA(task) = opt_data;
  295: 	TASK_DATLEN(task) = opt_dlen;
  296: 
  297: 	if (root->root_hooks.hook_add.write)
  298: 		ptr = root->root_hooks.hook_add.write(task, NULL);
  299: 	else
  300: 		ptr = NULL;
  301: 
  302: 	if (!ptr) {
  303: #ifdef HAVE_LIBPTHREAD
  304: 		pthread_mutex_lock(&root->root_mtx[taskWRITE]);
  305: #endif
  306: 		TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
  307: #ifdef HAVE_LIBPTHREAD
  308: 		pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
  309: #endif
  310: 	} else
  311: 		task = sched_unuseTask(task);
  312: 
  313: 	return task;
  314: }
  315: 
  316: /*
  317:  * schedNode() - Add NODE task to scheduler queue
  318:  *
  319:  * @root = root task
  320:  * @func = task execution function
  321:  * @arg = 1st func argument
  322:  * @fd = fd handle
  323:  * @opt_data = Optional data
  324:  * @opt_dlen = Optional data length
  325:  * return: NULL error or !=NULL new queued task
  326:  */
  327: sched_task_t *
  328: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  329: 		void *opt_data, size_t opt_dlen)
  330: {
  331: 	sched_task_t *task;
  332: 	void *ptr;
  333: 
  334: 	if (!root || !func)
  335: 		return NULL;
  336: 
  337: 	/* get new task */
  338: 	if (!(task = sched_useTask(root)))
  339: 		return NULL;
  340: 
  341: 	task->task_func = func;
  342: 	TASK_TYPE(task) = taskNODE;
  343: 	TASK_ROOT(task) = root;
  344: 
  345: 	TASK_ARG(task) = arg;
  346: 	TASK_FD(task) = fd;
  347: 
  348: 	TASK_DATA(task) = opt_data;
  349: 	TASK_DATLEN(task) = opt_dlen;
  350: 
  351: 	if (root->root_hooks.hook_add.node)
  352: 		ptr = root->root_hooks.hook_add.node(task, NULL);
  353: 	else
  354: 		ptr = NULL;
  355: 
  356: 	if (!ptr) {
  357: #ifdef HAVE_LIBPTHREAD
  358: 		pthread_mutex_lock(&root->root_mtx[taskNODE]);
  359: #endif
  360: 		TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
  361: #ifdef HAVE_LIBPTHREAD
  362: 		pthread_mutex_unlock(&root->root_mtx[taskNODE]);
  363: #endif
  364: 	} else
  365: 		task = sched_unuseTask(task);
  366: 
  367: 	return task;
  368: }
  369: 
  370: /*
  371:  * schedProc() - Add PROC task to scheduler queue
  372:  *
  373:  * @root = root task
  374:  * @func = task execution function
  375:  * @arg = 1st func argument
  376:  * @pid = PID
  377:  * @opt_data = Optional data
  378:  * @opt_dlen = Optional data length
  379:  * return: NULL error or !=NULL new queued task
  380:  */
  381: sched_task_t *
  382: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid, 
  383: 		void *opt_data, size_t opt_dlen)
  384: {
  385: 	sched_task_t *task;
  386: 	void *ptr;
  387: 
  388: 	if (!root || !func)
  389: 		return NULL;
  390: 
  391: 	/* get new task */
  392: 	if (!(task = sched_useTask(root)))
  393: 		return NULL;
  394: 
  395: 	task->task_func = func;
  396: 	TASK_TYPE(task) = taskPROC;
  397: 	TASK_ROOT(task) = root;
  398: 
  399: 	TASK_ARG(task) = arg;
  400: 	TASK_VAL(task) = pid;
  401: 
  402: 	TASK_DATA(task) = opt_data;
  403: 	TASK_DATLEN(task) = opt_dlen;
  404: 
  405: 	if (root->root_hooks.hook_add.proc)
  406: 		ptr = root->root_hooks.hook_add.proc(task, NULL);
  407: 	else
  408: 		ptr = NULL;
  409: 
  410: 	if (!ptr) {
  411: #ifdef HAVE_LIBPTHREAD
  412: 		pthread_mutex_lock(&root->root_mtx[taskPROC]);
  413: #endif
  414: 		TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
  415: #ifdef HAVE_LIBPTHREAD
  416: 		pthread_mutex_unlock(&root->root_mtx[taskPROC]);
  417: #endif
  418: 	} else
  419: 		task = sched_unuseTask(task);
  420: 
  421: 	return task;
  422: }
  423: 
  424: /*
  425:  * schedUser() - Add trigger USER task to scheduler queue
  426:  *
  427:  * @root = root task
  428:  * @func = task execution function
  429:  * @arg = 1st func argument
  430:  * @id = Trigger ID
  431:  * @opt_data = Optional data
  432:  * @opt_dlen = Optional user's trigger flags
  433:  * return: NULL error or !=NULL new queued task
  434:  */
  435: sched_task_t *
  436: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id, 
  437: 		void *opt_data, size_t opt_dlen)
  438: {
  439: #ifndef EVFILT_USER
  440: 	sched_SetErr(ENOTSUP, "Not supported kevent() filter");
  441: 	return NULL;
  442: #else
  443: 	sched_task_t *task;
  444: 	void *ptr;
  445: 
  446: 	if (!root || !func)
  447: 		return NULL;
  448: 
  449: 	/* get new task */
  450: 	if (!(task = sched_useTask(root)))
  451: 		return NULL;
  452: 
  453: 	task->task_func = func;
  454: 	TASK_TYPE(task) = taskUSER;
  455: 	TASK_ROOT(task) = root;
  456: 
  457: 	TASK_ARG(task) = arg;
  458: 	TASK_VAL(task) = id;
  459: 
  460: 	TASK_DATA(task) = opt_data;
  461: 	TASK_DATLEN(task) = opt_dlen;
  462: 
  463: 	if (root->root_hooks.hook_add.user)
  464: 		ptr = root->root_hooks.hook_add.user(task, NULL);
  465: 	else
  466: 		ptr = NULL;
  467: 
  468: 	if (!ptr) {
  469: #ifdef HAVE_LIBPTHREAD
  470: 		pthread_mutex_lock(&root->root_mtx[taskUSER]);
  471: #endif
  472: 		TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
  473: #ifdef HAVE_LIBPTHREAD
  474: 		pthread_mutex_unlock(&root->root_mtx[taskUSER]);
  475: #endif
  476: 	} else
  477: 		task = sched_unuseTask(task);
  478: 
  479: 	return task;
  480: #endif
  481: }
  482: 
  483: /*
  484:  * schedSignal() - Add SIGNAL task to scheduler queue
  485:  *
  486:  * @root = root task
  487:  * @func = task execution function
  488:  * @arg = 1st func argument
  489:  * @sig = Signal
  490:  * @opt_data = Optional data
  491:  * @opt_dlen = Optional data length
  492:  * return: NULL error or !=NULL new queued task
  493:  */
  494: sched_task_t *
  495: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig, 
  496: 		void *opt_data, size_t opt_dlen)
  497: {
  498: 	sched_task_t *task;
  499: 	void *ptr;
  500: 
  501: 	if (!root || !func)
  502: 		return NULL;
  503: 
  504: 	/* get new task */
  505: 	if (!(task = sched_useTask(root)))
  506: 		return NULL;
  507: 
  508: 	task->task_func = func;
  509: 	TASK_TYPE(task) = taskSIGNAL;
  510: 	TASK_ROOT(task) = root;
  511: 
  512: 	TASK_ARG(task) = arg;
  513: 	TASK_VAL(task) = sig;
  514: 
  515: 	TASK_DATA(task) = opt_data;
  516: 	TASK_DATLEN(task) = opt_dlen;
  517: 
  518: 	if (root->root_hooks.hook_add.signal)
  519: 		ptr = root->root_hooks.hook_add.signal(task, NULL);
  520: 	else
  521: 		ptr = NULL;
  522: 
  523: 	if (!ptr) {
  524: #ifdef HAVE_LIBPTHREAD
  525: 		pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
  526: #endif
  527: 		TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
  528: #ifdef HAVE_LIBPTHREAD
  529: 		pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
  530: #endif
  531: 	} else
  532: 		task = sched_unuseTask(task);
  533: 
  534: 	return task;
  535: }
  536: 
  537: /*
  538:  * schedAlarm() - Add ALARM task to scheduler queue
  539:  *
  540:  * @root = root task
  541:  * @func = task execution function
  542:  * @arg = 1st func argument
  543:  * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
  544:  * @opt_data = Alarm timer ID
  545:  * @opt_dlen = Optional data length
  546:  * return: NULL error or !=NULL new queued task
  547:  */
  548: sched_task_t *
  549: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 
  550: 		void *opt_data, size_t opt_dlen)
  551: {
  552: 	sched_task_t *task;
  553: 	void *ptr;
  554: 
  555: 	if (!root || !func)
  556: 		return NULL;
  557: 
  558: 	/* get new task */
  559: 	if (!(task = sched_useTask(root)))
  560: 		return NULL;
  561: 
  562: 	task->task_func = func;
  563: 	TASK_TYPE(task) = taskALARM;
  564: 	TASK_ROOT(task) = root;
  565: 
  566: 	TASK_ARG(task) = arg;
  567: 	TASK_TS(task) = ts;
  568: 
  569: 	TASK_DATA(task) = opt_data;
  570: 	TASK_DATLEN(task) = opt_dlen;
  571: 
  572: 	if (root->root_hooks.hook_add.alarm)
  573: 		ptr = root->root_hooks.hook_add.alarm(task, NULL);
  574: 	else
  575: 		ptr = NULL;
  576: 
  577: 	if (!ptr) {
  578: #ifdef HAVE_LIBPTHREAD
  579: 		pthread_mutex_lock(&root->root_mtx[taskALARM]);
  580: #endif
  581: 		TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
  582: #ifdef HAVE_LIBPTHREAD
  583: 		pthread_mutex_unlock(&root->root_mtx[taskALARM]);
  584: #endif
  585: 	} else
  586: 		task = sched_unuseTask(task);
  587: 
  588: 	return task;
  589: }
  590: 
  591: #ifdef AIO_SUPPORT
  592: /*
  593:  * schedAIO() - Add AIO task to scheduler queue
  594:  *
  595:  * @root = root task
  596:  * @func = task execution function
  597:  * @arg = 1st func argument
  598:  * @acb = AIO cb structure address
  599:  * @opt_data = Optional data
  600:  * @opt_dlen = Optional data length
  601:  * return: NULL error or !=NULL new queued task
  602:  */
  603: sched_task_t *
  604: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, 
  605: 		struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
  606: {
  607: 	sched_task_t *task;
  608: 	void *ptr;
  609: 
  610: 	if (!root || !func || !acb || !opt_dlen)
  611: 		return NULL;
  612: 
  613: 	/* get new task */
  614: 	if (!(task = sched_useTask(root)))
  615: 		return NULL;
  616: 
  617: 	task->task_func = func;
  618: 	TASK_TYPE(task) = taskAIO;
  619: 	TASK_ROOT(task) = root;
  620: 
  621: 	TASK_ARG(task) = arg;
  622: 	TASK_VAL(task) = (u_long) acb;
  623: 
  624: 	TASK_DATA(task) = opt_data;
  625: 	TASK_DATLEN(task) = opt_dlen;
  626: 
  627: 	if (root->root_hooks.hook_add.aio)
  628: 		ptr = root->root_hooks.hook_add.aio(task, NULL);
  629: 	else
  630: 		ptr = NULL;
  631: 
  632: 	if (!ptr) {
  633: #ifdef HAVE_LIBPTHREAD
  634: 		pthread_mutex_lock(&root->root_mtx[taskAIO]);
  635: #endif
  636: 		TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
  637: #ifdef HAVE_LIBPTHREAD
  638: 		pthread_mutex_unlock(&root->root_mtx[taskAIO]);
  639: #endif
  640: 	} else
  641: 		task = sched_unuseTask(task);
  642: 
  643: 	return task;
  644: }
  645: 
  646: /*
  647:  * schedAIORead() - Add AIO read task to scheduler queue
  648:  *
  649:  * @root = root task
  650:  * @func = task execution function
  651:  * @arg = 1st func argument
  652:  * @fd = file descriptor
  653:  * @buffer = Buffer
  654:  * @buflen = Buffer length
  655:  * @offset = Offset from start of file, if =-1 from current position
  656:  * return: NULL error or !=NULL new queued task
  657:  */
  658: sched_task_t *
  659: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  660: 		void *buffer, size_t buflen, off_t offset)
  661: {
  662: 	struct aiocb *acb;
  663: 	off_t off;
  664: 
  665: 	if (!root || !func || !buffer || !buflen)
  666: 		return NULL;
  667: 
  668: 	if (offset == (off_t) -1) {
  669: 		off = lseek(fd, 0, SEEK_CUR);
  670: 		if (off == -1) {
  671: 			LOGERR;
  672: 			return NULL;
  673: 		}
  674: 	} else
  675: 		off = offset;
  676: 
  677: 	if (!(acb = malloc(sizeof(struct aiocb)))) {
  678: 		LOGERR;
  679: 		return NULL;
  680: 	} else
  681: 		memset(acb, 0, sizeof(struct aiocb));
  682: 
  683: 	acb->aio_fildes = fd;
  684: 	acb->aio_nbytes = buflen;
  685: 	acb->aio_buf = buffer;
  686: 	acb->aio_offset = off;
  687: 	acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
  688: 	acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
  689: 	acb->aio_sigevent.sigev_value.sival_ptr = acb;
  690: 
  691: 	if (aio_read(acb)) {
  692: 		LOGERR;
  693: 		free(acb);
  694: 		return NULL;
  695: 	}
  696: 
  697: 	return schedAIO(root, func, arg, acb, buffer, buflen);
  698: }
  699: 
  700: /*
  701:  * schedAIOWrite() - Add AIO write task to scheduler queue
  702:  *
  703:  * @root = root task
  704:  * @func = task execution function
  705:  * @arg = 1st func argument
  706:  * @fd = file descriptor
  707:  * @buffer = Buffer
  708:  * @buflen = Buffer length
  709:  * @offset = Offset from start of file, if =-1 from current position
  710:  * return: NULL error or !=NULL new queued task
  711:  */
  712: sched_task_t *
  713: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  714: 		void *buffer, size_t buflen, off_t offset)
  715: {
  716: 	struct aiocb *acb;
  717: 	off_t off;
  718: 
  719: 	if (!root || !func || !buffer || !buflen)
  720: 		return NULL;
  721: 
  722: 	if (offset == (off_t) -1) {
  723: 		off = lseek(fd, 0, SEEK_CUR);
  724: 		if (off == -1) {
  725: 			LOGERR;
  726: 			return NULL;
  727: 		}
  728: 	} else
  729: 		off = offset;
  730: 
  731: 	if (!(acb = malloc(sizeof(struct aiocb)))) {
  732: 		LOGERR;
  733: 		return NULL;
  734: 	} else
  735: 		memset(acb, 0, sizeof(struct aiocb));
  736: 
  737: 	acb->aio_fildes = fd;
  738: 	acb->aio_nbytes = buflen;
  739: 	acb->aio_buf = buffer;
  740: 	acb->aio_offset = off;
  741: 	acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
  742: 	acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
  743: 	acb->aio_sigevent.sigev_value.sival_ptr = acb;
  744: 
  745: 	if (aio_write(acb)) {
  746: 		LOGERR;
  747: 		free(acb);
  748: 		return NULL;
  749: 	}
  750: 
  751: 	return schedAIO(root, func, arg, acb, buffer, buflen);
  752: }
  753: 
  754: #ifdef EVFILT_LIO
  755: /*
  756:  * schedLIO() - Add AIO bulk tasks to scheduler queue
  757:  *
  758:  * @root = root task
  759:  * @func = task execution function
  760:  * @arg = 1st func argument
  761:  * @acbs = AIO cb structure addresses
  762:  * @opt_data = Optional data
  763:  * @opt_dlen = Optional data length
  764:  * return: NULL error or !=NULL new queued task
  765:  */
  766: sched_task_t *
  767: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, 
  768: 		struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
  769: {
  770: 	sched_task_t *task;
  771: 	void *ptr;
  772: 
  773: 	if (!root || !func || !acbs || !opt_dlen)
  774: 		return NULL;
  775: 
  776: 	/* get new task */
  777: 	if (!(task = sched_useTask(root)))
  778: 		return NULL;
  779: 
  780: 	task->task_func = func;
  781: 	TASK_TYPE(task) = taskLIO;
  782: 	TASK_ROOT(task) = root;
  783: 
  784: 	TASK_ARG(task) = arg;
  785: 	TASK_VAL(task) = (u_long) acbs;
  786: 
  787: 	TASK_DATA(task) = opt_data;
  788: 	TASK_DATLEN(task) = opt_dlen;
  789: 
  790: 	if (root->root_hooks.hook_add.lio)
  791: 		ptr = root->root_hooks.hook_add.lio(task, NULL);
  792: 	else
  793: 		ptr = NULL;
  794: 
  795: 	if (!ptr) {
  796: #ifdef HAVE_LIBPTHREAD
  797: 		pthread_mutex_lock(&root->root_mtx[taskLIO]);
  798: #endif
  799: 		TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
  800: #ifdef HAVE_LIBPTHREAD
  801: 		pthread_mutex_unlock(&root->root_mtx[taskLIO]);
  802: #endif
  803: 	} else
  804: 		task = sched_unuseTask(task);
  805: 
  806: 	return task;
  807: }
  808: 
  809: /*
  810:  * schedLIORead() - Add list of AIO read tasks to scheduler queue
  811:  *
  812:  * @root = root task
  813:  * @func = task execution function
  814:  * @arg = 1st func argument
  815:  * @fd = file descriptor
  816:  * @bufs = Buffer's list
  817:  * @nbufs = Number of Buffers
  818:  * @offset = Offset from start of file, if =-1 from current position
  819:  * return: NULL error or !=NULL new queued task
  820:  */
  821: sched_task_t *
  822: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  823: 		struct iovec *bufs, size_t nbufs, off_t offset)
  824: {
  825: 	struct sigevent sig;
  826: 	struct aiocb **acb;
  827: 	off_t off;
  828: 	register int i;
  829: 
  830: 	if (!root || !func || !bufs || !nbufs)
  831: 		return NULL;
  832: 
  833: 	if (offset == (off_t) -1) {
  834: 		off = lseek(fd, 0, SEEK_CUR);
  835: 		if (off == -1) {
  836: 			LOGERR;
  837: 			return NULL;
  838: 		}
  839: 	} else
  840: 		off = offset;
  841: 
  842: 	if (!(acb = calloc(sizeof(void*), nbufs))) {
  843: 		LOGERR;
  844: 		return NULL;
  845: 	} else
  846: 		memset(acb, 0, sizeof(void*) * nbufs);
  847: 	for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
  848: 		acb[i] = malloc(sizeof(struct aiocb));
  849: 		if (!acb[i]) {
  850: 			LOGERR;
  851: 			for (i = 0; i < nbufs; i++)
  852: 				if (acb[i])
  853: 					free(acb[i]);
  854: 			free(acb);
  855: 			return NULL;
  856: 		} else
  857: 			memset(acb[i], 0, sizeof(struct aiocb));
  858: 		acb[i]->aio_fildes = fd;
  859: 		acb[i]->aio_nbytes = bufs[i].iov_len;
  860: 		acb[i]->aio_buf = bufs[i].iov_base;
  861: 		acb[i]->aio_offset = off;
  862: 		acb[i]->aio_lio_opcode = LIO_READ;
  863: 	}
  864: 	memset(&sig, 0, sizeof sig);
  865: 	sig.sigev_notify = SIGEV_KEVENT;
  866: 	sig.sigev_notify_kqueue = root->root_kq;
  867: 	sig.sigev_value.sival_ptr = acb;
  868: 
  869: 	if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
  870: 		LOGERR;
  871: 		for (i = 0; i < nbufs; i++)
  872: 			if (acb[i])
  873: 				free(acb[i]);
  874: 		free(acb);
  875: 		return NULL;
  876: 	}
  877: 
  878: 	return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
  879: }
  880: 
  881: /*
  882:  * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
  883:  *
  884:  * @root = root task
  885:  * @func = task execution function
  886:  * @arg = 1st func argument
  887:  * @fd = file descriptor
  888:  * @bufs = Buffer's list
  889:  * @nbufs = Number of Buffers
  890:  * @offset = Offset from start of file, if =-1 from current position
  891:  * return: NULL error or !=NULL new queued task
  892:  */
  893: sched_task_t *
  894: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  895: 		struct iovec *bufs, size_t nbufs, off_t offset)
  896: {
  897: 	struct sigevent sig;
  898: 	struct aiocb **acb;
  899: 	off_t off;
  900: 	register int i;
  901: 
  902: 	if (!root || !func || !bufs || !nbufs)
  903: 		return NULL;
  904: 
  905: 	if (offset == (off_t) -1) {
  906: 		off = lseek(fd, 0, SEEK_CUR);
  907: 		if (off == -1) {
  908: 			LOGERR;
  909: 			return NULL;
  910: 		}
  911: 	} else
  912: 		off = offset;
  913: 
  914: 	if (!(acb = calloc(sizeof(void*), nbufs))) {
  915: 		LOGERR;
  916: 		return NULL;
  917: 	} else
  918: 		memset(acb, 0, sizeof(void*) * nbufs);
  919: 	for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
  920: 		acb[i] = malloc(sizeof(struct aiocb));
  921: 		if (!acb[i]) {
  922: 			LOGERR;
  923: 			for (i = 0; i < nbufs; i++)
  924: 				if (acb[i])
  925: 					free(acb[i]);
  926: 			free(acb);
  927: 			return NULL;
  928: 		} else
  929: 			memset(acb[i], 0, sizeof(struct aiocb));
  930: 		acb[i]->aio_fildes = fd;
  931: 		acb[i]->aio_nbytes = bufs[i].iov_len;
  932: 		acb[i]->aio_buf = bufs[i].iov_base;
  933: 		acb[i]->aio_offset = off;
  934: 		acb[i]->aio_lio_opcode = LIO_WRITE;
  935: 	}
  936: 	memset(&sig, 0, sizeof sig);
  937: 	sig.sigev_notify = SIGEV_KEVENT;
  938: 	sig.sigev_notify_kqueue = root->root_kq;
  939: 	sig.sigev_value.sival_ptr = acb;
  940: 
  941: 	if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
  942: 		LOGERR;
  943: 		for (i = 0; i < nbufs; i++)
  944: 			if (acb[i])
  945: 				free(acb[i]);
  946: 		free(acb);
  947: 		return NULL;
  948: 	}
  949: 
  950: 	return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
  951: }
  952: #endif	/* EVFILT_LIO */
  953: #endif	/* AIO_SUPPORT */
  954: 
/*
 * schedTimer() - Add TIMER task to scheduler queue
 *
 * @root = root task
 * @func = task execution function
 * @arg = 1st func argument
 * @ts = timeout argument structure (relative delay from now)
 * @opt_data = Optional data
 * @opt_dlen = Optional data length
 * return: NULL error or !=NULL new queued task
 */
sched_task_t *
schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 
		void *opt_data, size_t opt_dlen)
{
	sched_task_t *task, *tmp, *t = NULL;
	void *ptr;
	struct timespec now;

	if (!root || !func)
		return NULL;

	/* get new task */
	if (!(task = sched_useTask(root)))
		return NULL;

	task->task_func = func;
	TASK_TYPE(task) = taskTIMER;
	TASK_ROOT(task) = root;

	TASK_ARG(task) = arg;

	TASK_DATA(task) = opt_data;
	TASK_DATLEN(task) = opt_dlen;

	/* convert the relative delay into an absolute monotonic deadline,
	 * normalizing tv_nsec back into [0, 1e9) after the addition */
	clock_gettime(CLOCK_MONOTONIC, &now);
	now.tv_sec += ts.tv_sec;
	now.tv_nsec += ts.tv_nsec;
	if (now.tv_nsec >= 1000000000L) {
		now.tv_sec++;
		now.tv_nsec -= 1000000000L;
	} else if (now.tv_nsec < 0) {
		now.tv_sec--;
		now.tv_nsec += 1000000000L;
	}
	TASK_TS(task) = now;

	/* the add-hook may veto the task by returning non-NULL */
	if (root->root_hooks.hook_add.timer)
		ptr = root->root_hooks.hook_add.timer(task, NULL);
	else
		ptr = NULL;

	if (!ptr) {
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_lock(&root->root_mtx[taskTIMER]);
#endif
#ifdef TIMER_WITHOUT_SORT
		TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
#else
		/* keep the timer queue sorted by deadline: insert before the first
		 * task that expires no earlier than ours
		 * (sched_timespeccmp takes a comparison operator token as 3rd arg —
		 * NOTE(review): assumed to yield a <=-style ordering; confirm macro) */
		TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
			if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
				break;
		if (!t)
			TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
		else
			TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
#endif
#ifdef HAVE_LIBPTHREAD
		pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
#endif
	} else
		task = sched_unuseTask(task);

	return task;
}
 1031: 
 1032: /*
 1033:  * schedEvent() - Add EVENT task to scheduler queue
 1034:  *
 1035:  * @root = root task
 1036:  * @func = task execution function
 1037:  * @arg = 1st func argument
 1038:  * @val = additional func argument
 1039:  * @opt_data = Optional data
 1040:  * @opt_dlen = Optional data length
 1041:  * return: NULL error or !=NULL new queued task
 1042:  */
 1043: sched_task_t *
 1044: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val, 
 1045: 		void *opt_data, size_t opt_dlen)
 1046: {
 1047: 	sched_task_t *task;
 1048: 	void *ptr;
 1049: 
 1050: 	if (!root || !func)
 1051: 		return NULL;
 1052: 
 1053: 	/* get new task */
 1054: 	if (!(task = sched_useTask(root)))
 1055: 		return NULL;
 1056: 
 1057: 	task->task_func = func;
 1058: 	TASK_TYPE(task) = taskEVENT;
 1059: 	TASK_ROOT(task) = root;
 1060: 
 1061: 	TASK_ARG(task) = arg;
 1062: 	TASK_VAL(task) = val;
 1063: 
 1064: 	TASK_DATA(task) = opt_data;
 1065: 	TASK_DATLEN(task) = opt_dlen;
 1066: 
 1067: 	if (root->root_hooks.hook_add.event)
 1068: 		ptr = root->root_hooks.hook_add.event(task, NULL);
 1069: 	else
 1070: 		ptr = NULL;
 1071: 
 1072: 	if (!ptr) {
 1073: #ifdef HAVE_LIBPTHREAD
 1074: 		pthread_mutex_lock(&root->root_mtx[taskEVENT]);
 1075: #endif
 1076: 		TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
 1077: #ifdef HAVE_LIBPTHREAD
 1078: 		pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
 1079: #endif
 1080: 	} else
 1081: 		task = sched_unuseTask(task);
 1082: 
 1083: 	return task;
 1084: }
 1085: 
 1086: 
 1087: /*
 1088:  * schedTask() - Add regular task to scheduler queue
 1089:  *
 1090:  * @root = root task
 1091:  * @func = task execution function
 1092:  * @arg = 1st func argument
 1093:  * @prio = regular task priority, 0 is hi priority for regular tasks
 1094:  * @opt_data = Optional data
 1095:  * @opt_dlen = Optional data length
 1096:  * return: NULL error or !=NULL new queued task
 1097:  */
 1098: sched_task_t *
 1099: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio, 
 1100: 		void *opt_data, size_t opt_dlen)
 1101: {
 1102: 	sched_task_t *task, *tmp, *t = NULL;
 1103: 	void *ptr;
 1104: 
 1105: 	if (!root || !func)
 1106: 		return NULL;
 1107: 
 1108: 	/* get new task */
 1109: 	if (!(task = sched_useTask(root)))
 1110: 		return NULL;
 1111: 
 1112: 	task->task_func = func;
 1113: 	TASK_TYPE(task) = taskTASK;
 1114: 	TASK_ROOT(task) = root;
 1115: 
 1116: 	TASK_ARG(task) = arg;
 1117: 	TASK_VAL(task) = prio;
 1118: 
 1119: 	TASK_DATA(task) = opt_data;
 1120: 	TASK_DATLEN(task) = opt_dlen;
 1121: 
 1122: 	if (root->root_hooks.hook_add.task)
 1123: 		ptr = root->root_hooks.hook_add.task(task, NULL);
 1124: 	else
 1125: 		ptr = NULL;
 1126: 
 1127: 	if (!ptr) {
 1128: #ifdef HAVE_LIBPTHREAD
 1129: 		pthread_mutex_lock(&root->root_mtx[taskTASK]);
 1130: #endif
 1131: 		TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
 1132: 			if (TASK_VAL(task) < TASK_VAL(t))
 1133: 				break;
 1134: 		if (!t)
 1135: 			TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
 1136: 		else
 1137: 			TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
 1138: #ifdef HAVE_LIBPTHREAD
 1139: 		pthread_mutex_unlock(&root->root_mtx[taskTASK]);
 1140: #endif
 1141: 	} else
 1142: 		task = sched_unuseTask(task);
 1143: 
 1144: 	return task;
 1145: }
 1146: 
 1147: /*
 1148:  * schedSuspend() - Add Suspended task to scheduler queue
 1149:  *
 1150:  * @root = root task
 1151:  * @func = task execution function
 1152:  * @arg = 1st func argument
 1153:  * @id = Trigger ID
 1154:  * @opt_data = Optional data
 1155:  * @opt_dlen = Optional data length
 1156:  * return: NULL error or !=NULL new queued task
 1157:  */
 1158: sched_task_t *
 1159: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id, 
 1160: 		void *opt_data, size_t opt_dlen)
 1161: {
 1162: 	sched_task_t *task;
 1163: 	void *ptr;
 1164: 
 1165: 	if (!root || !func)
 1166: 		return NULL;
 1167: 
 1168: 	/* get new task */
 1169: 	if (!(task = sched_useTask(root)))
 1170: 		return NULL;
 1171: 
 1172: 	task->task_func = func;
 1173: 	TASK_TYPE(task) = taskSUSPEND;
 1174: 	TASK_ROOT(task) = root;
 1175: 
 1176: 	TASK_ARG(task) = arg;
 1177: 	TASK_VAL(task) = id;
 1178: 
 1179: 	TASK_DATA(task) = opt_data;
 1180: 	TASK_DATLEN(task) = opt_dlen;
 1181: 
 1182: 	if (root->root_hooks.hook_add.suspend)
 1183: 		ptr = root->root_hooks.hook_add.suspend(task, NULL);
 1184: 	else
 1185: 		ptr = NULL;
 1186: 
 1187: 	if (!ptr) {
 1188: #ifdef HAVE_LIBPTHREAD
 1189: 		pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
 1190: #endif
 1191: 		TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
 1192: #ifdef HAVE_LIBPTHREAD
 1193: 		pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
 1194: #endif
 1195: 	} else
 1196: 		task = sched_unuseTask(task);
 1197: 
 1198: 	return task;
 1199: }
 1200: 
 1201: /*
 1202:  * schedCallOnce() - Call once from scheduler
 1203:  *
 1204:  * @root = root task
 1205:  * @func = task execution function
 1206:  * @arg = 1st func argument
 1207:  * @val = additional func argument
 1208:  * @opt_data = Optional data
 1209:  * @opt_dlen = Optional data length
 1210:  * return: return value from called func
 1211:  */
 1212: sched_task_t *
 1213: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val, 
 1214: 		void *opt_data, size_t opt_dlen)
 1215: {
 1216: 	sched_task_t *task;
 1217: 	void *ret;
 1218: 
 1219: 	if (!root || !func)
 1220: 		return NULL;
 1221: 
 1222: 	/* get new task */
 1223: 	if (!(task = sched_useTask(root)))
 1224: 		return NULL;
 1225: 
 1226: 	task->task_func = func;
 1227: 	TASK_TYPE(task) = taskEVENT;
 1228: 	TASK_ROOT(task) = root;
 1229: 
 1230: 	TASK_ARG(task) = arg;
 1231: 	TASK_VAL(task) = val;
 1232: 
 1233: 	TASK_DATA(task) = opt_data;
 1234: 	TASK_DATLEN(task) = opt_dlen;
 1235: 
 1236: 	ret = schedCall(task);
 1237: 
 1238: 	sched_unuseTask(task);
 1239: 	return ret;
 1240: }
 1241: 
 1242: /*
 1243:  * schedThread() - Add thread task to scheduler queue
 1244:  *
 1245:  * @root = root task
 1246:  * @func = task execution function
 1247:  * @arg = 1st func argument
 1248:  * @detach = Detach thread from scheduler, if !=0
 1249:  * @ss = stack size
 1250:  * @opt_data = Optional data
 1251:  * @opt_dlen = Optional data length
 1252:  * return: NULL error or !=NULL new queued task
 1253:  */
 1254: sched_task_t *
 1255: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach, 
 1256: 		size_t ss, void *opt_data, size_t opt_dlen)
 1257: {
 1258: #ifndef HAVE_LIBPTHREAD
 1259: 	sched_SetErr(ENOTSUP, "Not supported thread tasks");
 1260: 	return NULL;
 1261: #endif
 1262: 	sched_task_t *task;
 1263: 	void *ptr;
 1264: 	pthread_attr_t attr;
 1265: 	sem_t *s = NULL;
 1266: 
 1267: 	if (!root || !func)
 1268: 		return NULL;
 1269: 	else {
 1270: 		/* normalizing stack size & detach state */
 1271: 		if (ss)
 1272: 			ss &= 0x7FFFFFFF;
 1273: 		detach = detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE;
 1274: 	}
 1275: 
 1276: 	if (!(s = (sem_t*) malloc(sizeof(sem_t)))) {
 1277: 		LOGERR;
 1278: 		return NULL;
 1279: 	}
 1280: 	if (sem_init(s, 0, 1)) {
 1281: 		LOGERR;
 1282: 		free(s);
 1283: 		return NULL;
 1284: 	}
 1285: 
 1286: 	/* get new task */
 1287: 	if (!(task = sched_useTask(root))) {
 1288: 		sem_destroy(s);
 1289: 		free(s);
 1290: 
 1291: 		return NULL;
 1292: 	}
 1293: 
 1294: 	task->task_func = func;
 1295: 	TASK_TYPE(task) = taskTHREAD;
 1296: 	TASK_ROOT(task) = root;
 1297: 
 1298: 	TASK_ARG(task) = arg;
 1299: 	TASK_FLAG(task) = detach;
 1300: 	TASK_RET(task) = (intptr_t) s;
 1301: 
 1302: 	TASK_DATA(task) = opt_data;
 1303: 	TASK_DATLEN(task) = opt_dlen;
 1304: 
 1305: 	pthread_attr_init(&attr);
 1306: 	pthread_attr_setdetachstate(&attr, detach);
 1307: 	if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
 1308: 		LOGERR;
 1309: 		pthread_attr_destroy(&attr);
 1310: 		sem_destroy(s);
 1311: 		free(s);
 1312: 		return sched_unuseTask(task);
 1313: 	}
 1314: 	if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
 1315: 		LOGERR;
 1316: 		pthread_attr_destroy(&attr);
 1317: 		sem_destroy(s);
 1318: 		free(s);
 1319: 		return sched_unuseTask(task);
 1320: 	} else
 1321: 		TASK_FLAG(task) |= (ss << 1);
 1322: 	if ((errno = pthread_attr_setguardsize(&attr, ss))) {
 1323: 		LOGERR;
 1324: 		pthread_attr_destroy(&attr);
 1325: 		sem_destroy(s);
 1326: 		free(s);
 1327: 		return sched_unuseTask(task);
 1328: 	}
 1329: #ifdef SCHED_RR
 1330: 	pthread_attr_setschedpolicy(&attr, SCHED_RR);
 1331: #else
 1332: 	pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
 1333: #endif
 1334: 	if (root->root_hooks.hook_add.thread)
 1335: 		ptr = root->root_hooks.hook_add.thread(task, &attr);
 1336: 	else
 1337: 		ptr = NULL;
 1338: 	pthread_attr_destroy(&attr);
 1339: 
 1340: 	if (!ptr) {
 1341: 		pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
 1342: 		TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
 1343: 		pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
 1344: 
 1345: 		/* wait for init thread actions */
 1346: 		sem_wait(s);
 1347: 	} else
 1348: 		task = sched_unuseTask(task);
 1349: 
 1350: 	sem_destroy(s);
 1351: 	free(s);
 1352: 	return task;
 1353: }
 1354: 
 1355: /*
 1356:  * schedRTC() - Add RTC task to scheduler queue
 1357:  *
 1358:  * @root = root task
 1359:  * @func = task execution function
 1360:  * @arg = 1st func argument
 1361:  * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
 1362:  * @opt_data = Optional RTC ID
 1363:  * @opt_dlen = Optional data length
 1364:  * return: NULL error or !=NULL new queued task
 1365:  */
 1366: sched_task_t *
 1367: schedRTC(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 
 1368: 		void *opt_data, size_t opt_dlen)
 1369: {
 1370: #if defined(HAVE_TIMER_CREATE) && defined(HAVE_TIMER_SETTIME)
 1371: 	sched_task_t *task;
 1372: 	void *ptr;
 1373: 
 1374: 	if (!root || !func)
 1375: 		return NULL;
 1376: 
 1377: 	/* get new task */
 1378: 	if (!(task = sched_useTask(root)))
 1379: 		return NULL;
 1380: 
 1381: 	task->task_func = func;
 1382: 	TASK_TYPE(task) = taskRTC;
 1383: 	TASK_ROOT(task) = root;
 1384: 
 1385: 	TASK_ARG(task) = arg;
 1386: 	TASK_TS(task) = ts;
 1387: 
 1388: 	TASK_DATA(task) = opt_data;
 1389: 	TASK_DATLEN(task) = opt_dlen;
 1390: 
 1391: 	if (root->root_hooks.hook_add.rtc)
 1392: 		ptr = root->root_hooks.hook_add.rtc(task, NULL);
 1393: 	else
 1394: 		ptr = NULL;
 1395: 
 1396: 	if (!ptr) {
 1397: #ifdef HAVE_LIBPTHREAD
 1398: 		pthread_mutex_lock(&root->root_mtx[taskRTC]);
 1399: #endif
 1400: 		TAILQ_INSERT_TAIL(&root->root_rtc, TASK_ID(task), task_node);
 1401: #ifdef HAVE_LIBPTHREAD
 1402: 		pthread_mutex_unlock(&root->root_mtx[taskRTC]);
 1403: #endif
 1404: 	} else
 1405: 		task = sched_unuseTask(task);
 1406: 
 1407: 	return task;
 1408: #else
 1409: 	sched_SetErr(ENOTSUP, "Not supported realtime clock extensions");
 1410: 	return NULL;
 1411: #endif
 1412: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>