File:  [ELWIX - Embedded LightWeight unIX -] / libaitsched / src / tasks.c
Revision 1.14.2.1: download - view: text, annotated - select for diffs - revision graph
Mon Sep 10 15:03:08 2012 UTC (11 years, 9 months ago) by misho
Branches: sched3_4
Diff to: branchpoint 1.14: preferred, unified
Added stack size control for threads.
Critical bug fix for taskTHREAD:
 after the thread task routine exits, the task was never removed from the scheduler queue

    1: /*************************************************************************
    2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
    3: *  by Michael Pounov <misho@openbsd-bg.org>
    4: *
    5: * $Author: misho $
    6: * $Id: tasks.c,v 1.14.2.1 2012/09/10 15:03:08 misho Exp $
    7: *
    8: **************************************************************************
    9: The ELWIX and AITNET software is distributed under the following
   10: terms:
   11: 
   12: All of the documentation and software included in the ELWIX and AITNET
   13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   14: 
   15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
   16: 	by Michael Pounov <misho@elwix.org>.  All rights reserved.
   17: 
   18: Redistribution and use in source and binary forms, with or without
   19: modification, are permitted provided that the following conditions
   20: are met:
   21: 1. Redistributions of source code must retain the above copyright
   22:    notice, this list of conditions and the following disclaimer.
   23: 2. Redistributions in binary form must reproduce the above copyright
   24:    notice, this list of conditions and the following disclaimer in the
   25:    documentation and/or other materials provided with the distribution.
   26: 3. All advertising materials mentioning features or use of this software
   27:    must display the following acknowledgement:
   28: This product includes software developed by Michael Pounov <misho@elwix.org>
   29: ELWIX - Embedded LightWeight unIX and its contributors.
   30: 4. Neither the name of AITNET nor the names of its contributors
   31:    may be used to endorse or promote products derived from this software
   32:    without specific prior written permission.
   33: 
   34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
   35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
   38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   44: SUCH DAMAGE.
   45: */
   46: #include "global.h"
   47: 
   48: 
   49: /*
   50:  * sched_useTask() - Get and init new task
   51:  *
   52:  * @root = root task
   53:  * return: NULL error or !=NULL prepared task
   54:  */
   55: inline sched_task_t *
   56: sched_useTask(sched_root_task_t * __restrict root)
   57: {
   58: 	sched_task_t *task, *tmp;
   59: 
   60: #ifdef HAVE_LIBPTHREAD
   61: 	pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
   62: #endif
   63: 	TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
   64: 		if (!TASK_ISLOCKED(task)) {
   65: 			TAILQ_REMOVE(&root->root_unuse, task, task_node);
   66: 			break;
   67: 		}
   68: 	}
   69: #ifdef HAVE_LIBPTHREAD
   70: 	pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
   71: #endif
   72: 
   73: 	if (!task) {
   74: 		task = malloc(sizeof(sched_task_t));
   75: 		if (!task) {
   76: 			LOGERR;
   77: 			return NULL;
   78: 		}
   79: 	}
   80: 
   81: 	memset(task, 0, sizeof(sched_task_t));
   82: 	task->task_id = (uintptr_t) task;
   83: 	return task;
   84: }
   85: 
   86: /*
   87:  * sched_unuseTask() - Unlock and put task to unuse queue
   88:  *
   89:  * @task = task
   90:  * return: always is NULL
   91:  */
   92: inline sched_task_t *
   93: sched_unuseTask(sched_task_t * __restrict task)
   94: {
   95: 	TASK_UNLOCK(task);
   96: 	TASK_TYPE(task) = taskUNUSE;
   97: #ifdef HAVE_LIBPTHREAD
   98: 	pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
   99: #endif
  100: 	TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
  101: #ifdef HAVE_LIBPTHREAD
  102: 	pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
  103: #endif
  104: 	task = NULL;
  105: 
  106: 	return task;
  107: }
  108: 
  109: #pragma GCC visibility push(hidden)
  110: 
  111: #ifdef HAVE_LIBPTHREAD
  112: static void
  113: _sched_threadCleanup(sched_task_t *t)
  114: {
  115: 	if (!t || !TASK_ROOT(t))
  116: 		return;
  117: 
  118: 	if (TASK_FLAG(t) == PTHREAD_CREATE_JOINABLE)
  119: 		pthread_detach(pthread_self());
  120: 
  121: 	pthread_mutex_lock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
  122: 	TAILQ_REMOVE(&TASK_ROOT(t)->root_thread, t, task_node);
  123: 	pthread_mutex_unlock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
  124: 
  125: 	sched_unuseTask(t);
  126: }
  127: void *
  128: _sched_threadWrapper(sched_task_t *t)
  129: {
  130: 	void *ret = NULL;
  131: 	sem_t *s = NULL;
  132: 
  133: 	if (!t || !TASK_ROOT(t) || !TASK_RET(t))
  134: 		pthread_exit(ret);
  135: 	else
  136: 		s = (sem_t*) TASK_RET(t);
  137: 
  138: 	pthread_cleanup_push((void (*)(void*)) _sched_threadCleanup, t);
  139: 
  140: 	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
  141: 	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
  142: 
  143: 	/* notify parent, thread is ready for execution */
  144: 	sem_post(s);
  145: 	pthread_testcancel();
  146: 
  147: 	ret = TASK_FUNC(t)(t);
  148: 
  149: 	pthread_cleanup_pop(42);
  150: 	TASK_ROOT(t)->root_ret = ret;
  151: 	pthread_exit(ret);
  152: }
  153: #endif
  154: 
  155: #pragma GCC visibility pop
  156: 
  157: /*
  158:  * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
  159:  *
  160:  * @task = current task
  161:  * @retcode = return code
  162:  * return: return code
  163:  */
  164: inline void *
  165: sched_taskExit(sched_task_t *task, intptr_t retcode)
  166: {
  167: 	if (!task || !TASK_ROOT(task))
  168: 		return (void*) -1;
  169: 
  170: 	if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
  171: 		TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
  172: 
  173: 	TASK_ROOT(task)->root_ret = (void*) retcode;
  174: 	return (void*) retcode;
  175: }
  176: 
  177: 
  178: /*
  179:  * schedRead() - Add READ I/O task to scheduler queue
  180:  *
  181:  * @root = root task
  182:  * @func = task execution function
  183:  * @arg = 1st func argument
  184:  * @fd = fd handle
  185:  * @opt_data = Optional data
  186:  * @opt_dlen = Optional data length
  187:  * return: NULL error or !=NULL new queued task
  188:  */
  189: sched_task_t *
  190: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  191: 		void *opt_data, size_t opt_dlen)
  192: {
  193: 	sched_task_t *task;
  194: 	void *ptr;
  195: 
  196: 	if (!root || !func)
  197: 		return NULL;
  198: 
  199: 	/* get new task */
  200: 	if (!(task = sched_useTask(root)))
  201: 		return NULL;
  202: 
  203: 	task->task_func = func;
  204: 	TASK_TYPE(task) = taskREAD;
  205: 	TASK_ROOT(task) = root;
  206: 
  207: 	TASK_ARG(task) = arg;
  208: 	TASK_FD(task) = fd;
  209: 
  210: 	TASK_DATA(task) = opt_data;
  211: 	TASK_DATLEN(task) = opt_dlen;
  212: 
  213: 	if (root->root_hooks.hook_add.read)
  214: 		ptr = root->root_hooks.hook_add.read(task, NULL);
  215: 	else
  216: 		ptr = NULL;
  217: 
  218: 	if (!ptr) {
  219: #ifdef HAVE_LIBPTHREAD
  220: 		pthread_mutex_lock(&root->root_mtx[taskREAD]);
  221: #endif
  222: 		TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
  223: #ifdef HAVE_LIBPTHREAD
  224: 		pthread_mutex_unlock(&root->root_mtx[taskREAD]);
  225: #endif
  226: 	} else
  227: 		task = sched_unuseTask(task);
  228: 
  229: 	return task;
  230: }
  231: 
  232: /*
  233:  * schedWrite() - Add WRITE I/O task to scheduler queue
  234:  *
  235:  * @root = root task
  236:  * @func = task execution function
  237:  * @arg = 1st func argument
  238:  * @fd = fd handle
  239:  * @opt_data = Optional data
  240:  * @opt_dlen = Optional data length
  241:  * return: NULL error or !=NULL new queued task
  242:  */
  243: sched_task_t *
  244: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  245: 		void *opt_data, size_t opt_dlen)
  246: {
  247: 	sched_task_t *task;
  248: 	void *ptr;
  249: 
  250: 	if (!root || !func)
  251: 		return NULL;
  252: 
  253: 	/* get new task */
  254: 	if (!(task = sched_useTask(root)))
  255: 		return NULL;
  256: 
  257: 	task->task_func = func;
  258: 	TASK_TYPE(task) = taskWRITE;
  259: 	TASK_ROOT(task) = root;
  260: 
  261: 	TASK_ARG(task) = arg;
  262: 	TASK_FD(task) = fd;
  263: 
  264: 	TASK_DATA(task) = opt_data;
  265: 	TASK_DATLEN(task) = opt_dlen;
  266: 
  267: 	if (root->root_hooks.hook_add.write)
  268: 		ptr = root->root_hooks.hook_add.write(task, NULL);
  269: 	else
  270: 		ptr = NULL;
  271: 
  272: 	if (!ptr) {
  273: #ifdef HAVE_LIBPTHREAD
  274: 		pthread_mutex_lock(&root->root_mtx[taskWRITE]);
  275: #endif
  276: 		TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
  277: #ifdef HAVE_LIBPTHREAD
  278: 		pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
  279: #endif
  280: 	} else
  281: 		task = sched_unuseTask(task);
  282: 
  283: 	return task;
  284: }
  285: 
  286: /*
  287:  * schedNode() - Add NODE task to scheduler queue
  288:  *
  289:  * @root = root task
  290:  * @func = task execution function
  291:  * @arg = 1st func argument
  292:  * @fd = fd handle
  293:  * @opt_data = Optional data
  294:  * @opt_dlen = Optional data length
  295:  * return: NULL error or !=NULL new queued task
  296:  */
  297: sched_task_t *
  298: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  299: 		void *opt_data, size_t opt_dlen)
  300: {
  301: 	sched_task_t *task;
  302: 	void *ptr;
  303: 
  304: 	if (!root || !func)
  305: 		return NULL;
  306: 
  307: 	/* get new task */
  308: 	if (!(task = sched_useTask(root)))
  309: 		return NULL;
  310: 
  311: 	task->task_func = func;
  312: 	TASK_TYPE(task) = taskNODE;
  313: 	TASK_ROOT(task) = root;
  314: 
  315: 	TASK_ARG(task) = arg;
  316: 	TASK_FD(task) = fd;
  317: 
  318: 	TASK_DATA(task) = opt_data;
  319: 	TASK_DATLEN(task) = opt_dlen;
  320: 
  321: 	if (root->root_hooks.hook_add.node)
  322: 		ptr = root->root_hooks.hook_add.node(task, NULL);
  323: 	else
  324: 		ptr = NULL;
  325: 
  326: 	if (!ptr) {
  327: #ifdef HAVE_LIBPTHREAD
  328: 		pthread_mutex_lock(&root->root_mtx[taskNODE]);
  329: #endif
  330: 		TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
  331: #ifdef HAVE_LIBPTHREAD
  332: 		pthread_mutex_unlock(&root->root_mtx[taskNODE]);
  333: #endif
  334: 	} else
  335: 		task = sched_unuseTask(task);
  336: 
  337: 	return task;
  338: }
  339: 
  340: /*
  341:  * schedProc() - Add PROC task to scheduler queue
  342:  *
  343:  * @root = root task
  344:  * @func = task execution function
  345:  * @arg = 1st func argument
  346:  * @pid = PID
  347:  * @opt_data = Optional data
  348:  * @opt_dlen = Optional data length
  349:  * return: NULL error or !=NULL new queued task
  350:  */
  351: sched_task_t *
  352: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid, 
  353: 		void *opt_data, size_t opt_dlen)
  354: {
  355: 	sched_task_t *task;
  356: 	void *ptr;
  357: 
  358: 	if (!root || !func)
  359: 		return NULL;
  360: 
  361: 	/* get new task */
  362: 	if (!(task = sched_useTask(root)))
  363: 		return NULL;
  364: 
  365: 	task->task_func = func;
  366: 	TASK_TYPE(task) = taskPROC;
  367: 	TASK_ROOT(task) = root;
  368: 
  369: 	TASK_ARG(task) = arg;
  370: 	TASK_VAL(task) = pid;
  371: 
  372: 	TASK_DATA(task) = opt_data;
  373: 	TASK_DATLEN(task) = opt_dlen;
  374: 
  375: 	if (root->root_hooks.hook_add.proc)
  376: 		ptr = root->root_hooks.hook_add.proc(task, NULL);
  377: 	else
  378: 		ptr = NULL;
  379: 
  380: 	if (!ptr) {
  381: #ifdef HAVE_LIBPTHREAD
  382: 		pthread_mutex_lock(&root->root_mtx[taskPROC]);
  383: #endif
  384: 		TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
  385: #ifdef HAVE_LIBPTHREAD
  386: 		pthread_mutex_unlock(&root->root_mtx[taskPROC]);
  387: #endif
  388: 	} else
  389: 		task = sched_unuseTask(task);
  390: 
  391: 	return task;
  392: }
  393: 
  394: /*
  395:  * schedUser() - Add trigger USER task to scheduler queue
  396:  *
  397:  * @root = root task
  398:  * @func = task execution function
  399:  * @arg = 1st func argument
  400:  * @id = Trigger ID
  401:  * @opt_data = Optional data
  402:  * @opt_dlen = Optional user's trigger flags
  403:  * return: NULL error or !=NULL new queued task
  404:  */
  405: sched_task_t *
  406: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id, 
  407: 		void *opt_data, size_t opt_dlen)
  408: {
  409: #ifndef EVFILT_USER
  410: 	sched_SetErr(ENOTSUP, "Not supported kevent() filter");
  411: 	return NULL;
  412: #else
  413: 	sched_task_t *task;
  414: 	void *ptr;
  415: 
  416: 	if (!root || !func)
  417: 		return NULL;
  418: 
  419: 	/* get new task */
  420: 	if (!(task = sched_useTask(root)))
  421: 		return NULL;
  422: 
  423: 	task->task_func = func;
  424: 	TASK_TYPE(task) = taskUSER;
  425: 	TASK_ROOT(task) = root;
  426: 
  427: 	TASK_ARG(task) = arg;
  428: 	TASK_VAL(task) = id;
  429: 
  430: 	TASK_DATA(task) = opt_data;
  431: 	TASK_DATLEN(task) = opt_dlen;
  432: 
  433: 	if (root->root_hooks.hook_add.user)
  434: 		ptr = root->root_hooks.hook_add.user(task, NULL);
  435: 	else
  436: 		ptr = NULL;
  437: 
  438: 	if (!ptr) {
  439: #ifdef HAVE_LIBPTHREAD
  440: 		pthread_mutex_lock(&root->root_mtx[taskUSER]);
  441: #endif
  442: 		TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
  443: #ifdef HAVE_LIBPTHREAD
  444: 		pthread_mutex_unlock(&root->root_mtx[taskUSER]);
  445: #endif
  446: 	} else
  447: 		task = sched_unuseTask(task);
  448: 
  449: 	return task;
  450: #endif
  451: }
  452: 
  453: /*
  454:  * schedSignal() - Add SIGNAL task to scheduler queue
  455:  *
  456:  * @root = root task
  457:  * @func = task execution function
  458:  * @arg = 1st func argument
  459:  * @sig = Signal
  460:  * @opt_data = Optional data
  461:  * @opt_dlen = Optional data length
  462:  * return: NULL error or !=NULL new queued task
  463:  */
  464: sched_task_t *
  465: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig, 
  466: 		void *opt_data, size_t opt_dlen)
  467: {
  468: 	sched_task_t *task;
  469: 	void *ptr;
  470: 
  471: 	if (!root || !func)
  472: 		return NULL;
  473: 
  474: 	/* get new task */
  475: 	if (!(task = sched_useTask(root)))
  476: 		return NULL;
  477: 
  478: 	task->task_func = func;
  479: 	TASK_TYPE(task) = taskSIGNAL;
  480: 	TASK_ROOT(task) = root;
  481: 
  482: 	TASK_ARG(task) = arg;
  483: 	TASK_VAL(task) = sig;
  484: 
  485: 	TASK_DATA(task) = opt_data;
  486: 	TASK_DATLEN(task) = opt_dlen;
  487: 
  488: 	if (root->root_hooks.hook_add.signal)
  489: 		ptr = root->root_hooks.hook_add.signal(task, NULL);
  490: 	else
  491: 		ptr = NULL;
  492: 
  493: 	if (!ptr) {
  494: #ifdef HAVE_LIBPTHREAD
  495: 		pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
  496: #endif
  497: 		TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
  498: #ifdef HAVE_LIBPTHREAD
  499: 		pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
  500: #endif
  501: 	} else
  502: 		task = sched_unuseTask(task);
  503: 
  504: 	return task;
  505: }
  506: 
  507: /*
  508:  * schedAlarm() - Add ALARM task to scheduler queue
  509:  *
  510:  * @root = root task
  511:  * @func = task execution function
  512:  * @arg = 1st func argument
  513:  * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
  514:  * @opt_data = Optional data
  515:  * @opt_dlen = Optional data length
  516:  * return: NULL error or !=NULL new queued task
  517:  */
  518: sched_task_t *
  519: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 
  520: 		void *opt_data, size_t opt_dlen)
  521: {
  522: 	sched_task_t *task;
  523: 	void *ptr;
  524: 
  525: 	if (!root || !func)
  526: 		return NULL;
  527: 
  528: 	/* get new task */
  529: 	if (!(task = sched_useTask(root)))
  530: 		return NULL;
  531: 
  532: 	task->task_func = func;
  533: 	TASK_TYPE(task) = taskALARM;
  534: 	TASK_ROOT(task) = root;
  535: 
  536: 	TASK_ARG(task) = arg;
  537: 	TASK_TS(task) = ts;
  538: 
  539: 	TASK_DATA(task) = opt_data;
  540: 	TASK_DATLEN(task) = opt_dlen;
  541: 
  542: 	if (root->root_hooks.hook_add.alarm)
  543: 		ptr = root->root_hooks.hook_add.alarm(task, NULL);
  544: 	else
  545: 		ptr = NULL;
  546: 
  547: 	if (!ptr) {
  548: #ifdef HAVE_LIBPTHREAD
  549: 		pthread_mutex_lock(&root->root_mtx[taskALARM]);
  550: #endif
  551: 		TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
  552: #ifdef HAVE_LIBPTHREAD
  553: 		pthread_mutex_unlock(&root->root_mtx[taskALARM]);
  554: #endif
  555: 	} else
  556: 		task = sched_unuseTask(task);
  557: 
  558: 	return task;
  559: }
  560: 
  561: #ifdef AIO_SUPPORT
  562: /*
  563:  * schedAIO() - Add AIO task to scheduler queue
  564:  *
  565:  * @root = root task
  566:  * @func = task execution function
  567:  * @arg = 1st func argument
  568:  * @acb = AIO cb structure address
  569:  * @opt_data = Optional data
  570:  * @opt_dlen = Optional data length
  571:  * return: NULL error or !=NULL new queued task
  572:  */
  573: sched_task_t *
  574: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, 
  575: 		struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
  576: {
  577: 	sched_task_t *task;
  578: 	void *ptr;
  579: 
  580: 	if (!root || !func || !acb || !opt_dlen)
  581: 		return NULL;
  582: 
  583: 	/* get new task */
  584: 	if (!(task = sched_useTask(root)))
  585: 		return NULL;
  586: 
  587: 	task->task_func = func;
  588: 	TASK_TYPE(task) = taskAIO;
  589: 	TASK_ROOT(task) = root;
  590: 
  591: 	TASK_ARG(task) = arg;
  592: 	TASK_VAL(task) = (u_long) acb;
  593: 
  594: 	TASK_DATA(task) = opt_data;
  595: 	TASK_DATLEN(task) = opt_dlen;
  596: 
  597: 	if (root->root_hooks.hook_add.aio)
  598: 		ptr = root->root_hooks.hook_add.aio(task, NULL);
  599: 	else
  600: 		ptr = NULL;
  601: 
  602: 	if (!ptr) {
  603: #ifdef HAVE_LIBPTHREAD
  604: 		pthread_mutex_lock(&root->root_mtx[taskAIO]);
  605: #endif
  606: 		TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
  607: #ifdef HAVE_LIBPTHREAD
  608: 		pthread_mutex_unlock(&root->root_mtx[taskAIO]);
  609: #endif
  610: 	} else
  611: 		task = sched_unuseTask(task);
  612: 
  613: 	return task;
  614: }
  615: 
  616: /*
  617:  * schedAIORead() - Add AIO read task to scheduler queue
  618:  *
  619:  * @root = root task
  620:  * @func = task execution function
  621:  * @arg = 1st func argument
  622:  * @fd = file descriptor
  623:  * @buffer = Buffer
  624:  * @buflen = Buffer length
  625:  * @offset = Offset from start of file, if =-1 from current position
  626:  * return: NULL error or !=NULL new queued task
  627:  */
  628: inline sched_task_t *
  629: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  630: 		void *buffer, size_t buflen, off_t offset)
  631: {
  632: 	struct aiocb *acb;
  633: 	off_t off;
  634: 
  635: 	if (!root || !func || !buffer || !buflen)
  636: 		return NULL;
  637: 
  638: 	if (offset == (off_t) -1) {
  639: 		off = lseek(fd, 0, SEEK_CUR);
  640: 		if (off == -1) {
  641: 			LOGERR;
  642: 			return NULL;
  643: 		}
  644: 	} else
  645: 		off = offset;
  646: 
  647: 	if (!(acb = malloc(sizeof(struct aiocb)))) {
  648: 		LOGERR;
  649: 		return NULL;
  650: 	} else
  651: 		memset(acb, 0, sizeof(struct aiocb));
  652: 
  653: 	acb->aio_fildes = fd;
  654: 	acb->aio_nbytes = buflen;
  655: 	acb->aio_buf = buffer;
  656: 	acb->aio_offset = off;
  657: 	acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
  658: 	acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
  659: 	acb->aio_sigevent.sigev_value.sival_ptr = acb;
  660: 
  661: 	if (aio_read(acb)) {
  662: 		LOGERR;
  663: 		free(acb);
  664: 		return NULL;
  665: 	}
  666: 
  667: 	return schedAIO(root, func, arg, acb, buffer, buflen);
  668: }
  669: 
  670: /*
  671:  * schedAIOWrite() - Add AIO write task to scheduler queue
  672:  *
  673:  * @root = root task
  674:  * @func = task execution function
  675:  * @arg = 1st func argument
  676:  * @fd = file descriptor
  677:  * @buffer = Buffer
  678:  * @buflen = Buffer length
  679:  * @offset = Offset from start of file, if =-1 from current position
  680:  * return: NULL error or !=NULL new queued task
  681:  */
  682: inline sched_task_t *
  683: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  684: 		void *buffer, size_t buflen, off_t offset)
  685: {
  686: 	struct aiocb *acb;
  687: 	off_t off;
  688: 
  689: 	if (!root || !func || !buffer || !buflen)
  690: 		return NULL;
  691: 
  692: 	if (offset == (off_t) -1) {
  693: 		off = lseek(fd, 0, SEEK_CUR);
  694: 		if (off == -1) {
  695: 			LOGERR;
  696: 			return NULL;
  697: 		}
  698: 	} else
  699: 		off = offset;
  700: 
  701: 	if (!(acb = malloc(sizeof(struct aiocb)))) {
  702: 		LOGERR;
  703: 		return NULL;
  704: 	} else
  705: 		memset(acb, 0, sizeof(struct aiocb));
  706: 
  707: 	acb->aio_fildes = fd;
  708: 	acb->aio_nbytes = buflen;
  709: 	acb->aio_buf = buffer;
  710: 	acb->aio_offset = off;
  711: 	acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
  712: 	acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
  713: 	acb->aio_sigevent.sigev_value.sival_ptr = acb;
  714: 
  715: 	if (aio_write(acb)) {
  716: 		LOGERR;
  717: 		free(acb);
  718: 		return NULL;
  719: 	}
  720: 
  721: 	return schedAIO(root, func, arg, acb, buffer, buflen);
  722: }
  723: 
  724: #ifdef EVFILT_LIO
  725: /*
  726:  * schedLIO() - Add AIO bulk tasks to scheduler queue
  727:  *
  728:  * @root = root task
  729:  * @func = task execution function
  730:  * @arg = 1st func argument
  731:  * @acbs = AIO cb structure addresses
  732:  * @opt_data = Optional data
  733:  * @opt_dlen = Optional data length
  734:  * return: NULL error or !=NULL new queued task
  735:  */
  736: sched_task_t *
  737: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, 
  738: 		struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
  739: {
  740: 	sched_task_t *task;
  741: 	void *ptr;
  742: 
  743: 	if (!root || !func || !acbs || !opt_dlen)
  744: 		return NULL;
  745: 
  746: 	/* get new task */
  747: 	if (!(task = sched_useTask(root)))
  748: 		return NULL;
  749: 
  750: 	task->task_func = func;
  751: 	TASK_TYPE(task) = taskLIO;
  752: 	TASK_ROOT(task) = root;
  753: 
  754: 	TASK_ARG(task) = arg;
  755: 	TASK_VAL(task) = (u_long) acbs;
  756: 
  757: 	TASK_DATA(task) = opt_data;
  758: 	TASK_DATLEN(task) = opt_dlen;
  759: 
  760: 	if (root->root_hooks.hook_add.lio)
  761: 		ptr = root->root_hooks.hook_add.lio(task, NULL);
  762: 	else
  763: 		ptr = NULL;
  764: 
  765: 	if (!ptr) {
  766: #ifdef HAVE_LIBPTHREAD
  767: 		pthread_mutex_lock(&root->root_mtx[taskLIO]);
  768: #endif
  769: 		TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
  770: #ifdef HAVE_LIBPTHREAD
  771: 		pthread_mutex_unlock(&root->root_mtx[taskLIO]);
  772: #endif
  773: 	} else
  774: 		task = sched_unuseTask(task);
  775: 
  776: 	return task;
  777: }
  778: 
  779: /*
  780:  * schedLIORead() - Add list of AIO read tasks to scheduler queue
  781:  *
  782:  * @root = root task
  783:  * @func = task execution function
  784:  * @arg = 1st func argument
  785:  * @fd = file descriptor
  786:  * @bufs = Buffer's list
  787:  * @nbufs = Number of Buffers
  788:  * @offset = Offset from start of file, if =-1 from current position
  789:  * return: NULL error or !=NULL new queued task
  790:  */
  791: sched_task_t *
  792: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  793: 		struct iovec *bufs, size_t nbufs, off_t offset)
  794: {
  795: 	struct sigevent sig;
  796: 	struct aiocb **acb;
  797: 	off_t off;
  798: 	register int i;
  799: 
  800: 	if (!root || !func || !bufs || !nbufs)
  801: 		return NULL;
  802: 
  803: 	if (offset == (off_t) -1) {
  804: 		off = lseek(fd, 0, SEEK_CUR);
  805: 		if (off == -1) {
  806: 			LOGERR;
  807: 			return NULL;
  808: 		}
  809: 	} else
  810: 		off = offset;
  811: 
  812: 	if (!(acb = calloc(sizeof(void*), nbufs))) {
  813: 		LOGERR;
  814: 		return NULL;
  815: 	} else
  816: 		memset(acb, 0, sizeof(void*) * nbufs);
  817: 	for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
  818: 		acb[i] = malloc(sizeof(struct aiocb));
  819: 		if (!acb[i]) {
  820: 			LOGERR;
  821: 			for (i = 0; i < nbufs; i++)
  822: 				if (acb[i])
  823: 					free(acb[i]);
  824: 			free(acb);
  825: 			return NULL;
  826: 		} else
  827: 			memset(acb[i], 0, sizeof(struct aiocb));
  828: 		acb[i]->aio_fildes = fd;
  829: 		acb[i]->aio_nbytes = bufs[i].iov_len;
  830: 		acb[i]->aio_buf = bufs[i].iov_base;
  831: 		acb[i]->aio_offset = off;
  832: 		acb[i]->aio_lio_opcode = LIO_READ;
  833: 	}
  834: 	memset(&sig, 0, sizeof sig);
  835: 	sig.sigev_notify = SIGEV_KEVENT;
  836: 	sig.sigev_notify_kqueue = root->root_kq;
  837: 	sig.sigev_value.sival_ptr = acb;
  838: 
  839: 	if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
  840: 		LOGERR;
  841: 		for (i = 0; i < nbufs; i++)
  842: 			if (acb[i])
  843: 				free(acb[i]);
  844: 		free(acb);
  845: 		return NULL;
  846: 	}
  847: 
  848: 	return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
  849: }
  850: 
  851: /*
  852:  * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
  853:  *
  854:  * @root = root task
  855:  * @func = task execution function
  856:  * @arg = 1st func argument
  857:  * @fd = file descriptor
  858:  * @bufs = Buffer's list
  859:  * @nbufs = Number of Buffers
  860:  * @offset = Offset from start of file, if =-1 from current position
  861:  * return: NULL error or !=NULL new queued task
  862:  */
  863: inline sched_task_t *
  864: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  865: 		struct iovec *bufs, size_t nbufs, off_t offset)
  866: {
  867: 	struct sigevent sig;
  868: 	struct aiocb **acb;
  869: 	off_t off;
  870: 	register int i;
  871: 
  872: 	if (!root || !func || !bufs || !nbufs)
  873: 		return NULL;
  874: 
  875: 	if (offset == (off_t) -1) {
  876: 		off = lseek(fd, 0, SEEK_CUR);
  877: 		if (off == -1) {
  878: 			LOGERR;
  879: 			return NULL;
  880: 		}
  881: 	} else
  882: 		off = offset;
  883: 
  884: 	if (!(acb = calloc(sizeof(void*), nbufs))) {
  885: 		LOGERR;
  886: 		return NULL;
  887: 	} else
  888: 		memset(acb, 0, sizeof(void*) * nbufs);
  889: 	for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
  890: 		acb[i] = malloc(sizeof(struct aiocb));
  891: 		if (!acb[i]) {
  892: 			LOGERR;
  893: 			for (i = 0; i < nbufs; i++)
  894: 				if (acb[i])
  895: 					free(acb[i]);
  896: 			free(acb);
  897: 			return NULL;
  898: 		} else
  899: 			memset(acb[i], 0, sizeof(struct aiocb));
  900: 		acb[i]->aio_fildes = fd;
  901: 		acb[i]->aio_nbytes = bufs[i].iov_len;
  902: 		acb[i]->aio_buf = bufs[i].iov_base;
  903: 		acb[i]->aio_offset = off;
  904: 		acb[i]->aio_lio_opcode = LIO_WRITE;
  905: 	}
  906: 	memset(&sig, 0, sizeof sig);
  907: 	sig.sigev_notify = SIGEV_KEVENT;
  908: 	sig.sigev_notify_kqueue = root->root_kq;
  909: 	sig.sigev_value.sival_ptr = acb;
  910: 
  911: 	if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
  912: 		LOGERR;
  913: 		for (i = 0; i < nbufs; i++)
  914: 			if (acb[i])
  915: 				free(acb[i]);
  916: 		free(acb);
  917: 		return NULL;
  918: 	}
  919: 
  920: 	return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
  921: }
  922: #endif	/* EVFILT_LIO */
  923: #endif	/* AIO_SUPPORT */
  924: 
  925: /*
  926:  * schedTimer() - Add TIMER task to scheduler queue
  927:  *
  928:  * @root = root task
  929:  * @func = task execution function
  930:  * @arg = 1st func argument
  931:  * @ts = timeout argument structure
  932:  * @opt_data = Optional data
  933:  * @opt_dlen = Optional data length
  934:  * return: NULL error or !=NULL new queued task
  935:  */
  936: sched_task_t *
  937: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 
  938: 		void *opt_data, size_t opt_dlen)
  939: {
  940: 	sched_task_t *task, *tmp, *t = NULL;
  941: 	void *ptr;
  942: 	struct timespec now;
  943: 
  944: 	if (!root || !func)
  945: 		return NULL;
  946: 
  947: 	/* get new task */
  948: 	if (!(task = sched_useTask(root)))
  949: 		return NULL;
  950: 
  951: 	task->task_func = func;
  952: 	TASK_TYPE(task) = taskTIMER;
  953: 	TASK_ROOT(task) = root;
  954: 
  955: 	TASK_ARG(task) = arg;
  956: 
  957: 	TASK_DATA(task) = opt_data;
  958: 	TASK_DATLEN(task) = opt_dlen;
  959: 
  960: 	/* calculate timeval structure */
  961: 	clock_gettime(CLOCK_MONOTONIC, &now);
  962: 	now.tv_sec += ts.tv_sec;
  963: 	now.tv_nsec += ts.tv_nsec;
  964: 	if (now.tv_nsec >= 1000000000L) {
  965: 		now.tv_sec++;
  966: 		now.tv_nsec -= 1000000000L;
  967: 	} else if (now.tv_nsec < 0) {
  968: 		now.tv_sec--;
  969: 		now.tv_nsec += 1000000000L;
  970: 	}
  971: 	TASK_TS(task) = now;
  972: 
  973: 	if (root->root_hooks.hook_add.timer)
  974: 		ptr = root->root_hooks.hook_add.timer(task, NULL);
  975: 	else
  976: 		ptr = NULL;
  977: 
  978: 	if (!ptr) {
  979: #ifdef HAVE_LIBPTHREAD
  980: 		pthread_mutex_lock(&root->root_mtx[taskTIMER]);
  981: #endif
  982: #ifdef TIMER_WITHOUT_SORT
  983: 		TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
  984: #else
  985: 		TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
  986: 			if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
  987: 				break;
  988: 		if (!t)
  989: 			TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
  990: 		else
  991: 			TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
  992: #endif
  993: #ifdef HAVE_LIBPTHREAD
  994: 		pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
  995: #endif
  996: 	} else
  997: 		task = sched_unuseTask(task);
  998: 
  999: 	return task;
 1000: }
 1001: 
 1002: /*
 1003:  * schedEvent() - Add EVENT task to scheduler queue
 1004:  *
 1005:  * @root = root task
 1006:  * @func = task execution function
 1007:  * @arg = 1st func argument
 1008:  * @val = additional func argument
 1009:  * @opt_data = Optional data
 1010:  * @opt_dlen = Optional data length
 1011:  * return: NULL error or !=NULL new queued task
 1012:  */
 1013: sched_task_t *
 1014: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val, 
 1015: 		void *opt_data, size_t opt_dlen)
 1016: {
 1017: 	sched_task_t *task;
 1018: 	void *ptr;
 1019: 
 1020: 	if (!root || !func)
 1021: 		return NULL;
 1022: 
 1023: 	/* get new task */
 1024: 	if (!(task = sched_useTask(root)))
 1025: 		return NULL;
 1026: 
 1027: 	task->task_func = func;
 1028: 	TASK_TYPE(task) = taskEVENT;
 1029: 	TASK_ROOT(task) = root;
 1030: 
 1031: 	TASK_ARG(task) = arg;
 1032: 	TASK_VAL(task) = val;
 1033: 
 1034: 	TASK_DATA(task) = opt_data;
 1035: 	TASK_DATLEN(task) = opt_dlen;
 1036: 
 1037: 	if (root->root_hooks.hook_add.event)
 1038: 		ptr = root->root_hooks.hook_add.event(task, NULL);
 1039: 	else
 1040: 		ptr = NULL;
 1041: 
 1042: 	if (!ptr) {
 1043: #ifdef HAVE_LIBPTHREAD
 1044: 		pthread_mutex_lock(&root->root_mtx[taskEVENT]);
 1045: #endif
 1046: 		TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
 1047: #ifdef HAVE_LIBPTHREAD
 1048: 		pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
 1049: #endif
 1050: 	} else
 1051: 		task = sched_unuseTask(task);
 1052: 
 1053: 	return task;
 1054: }
 1055: 
 1056: 
 1057: /*
 1058:  * schedTask() - Add regular task to scheduler queue
 1059:  *
 1060:  * @root = root task
 1061:  * @func = task execution function
 1062:  * @arg = 1st func argument
 1063:  * @prio = regular task priority, 0 is hi priority for regular tasks
 1064:  * @opt_data = Optional data
 1065:  * @opt_dlen = Optional data length
 1066:  * return: NULL error or !=NULL new queued task
 1067:  */
 1068: sched_task_t *
 1069: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio, 
 1070: 		void *opt_data, size_t opt_dlen)
 1071: {
 1072: 	sched_task_t *task, *tmp, *t = NULL;
 1073: 	void *ptr;
 1074: 
 1075: 	if (!root || !func)
 1076: 		return NULL;
 1077: 
 1078: 	/* get new task */
 1079: 	if (!(task = sched_useTask(root)))
 1080: 		return NULL;
 1081: 
 1082: 	task->task_func = func;
 1083: 	TASK_TYPE(task) = taskTASK;
 1084: 	TASK_ROOT(task) = root;
 1085: 
 1086: 	TASK_ARG(task) = arg;
 1087: 	TASK_VAL(task) = prio;
 1088: 
 1089: 	TASK_DATA(task) = opt_data;
 1090: 	TASK_DATLEN(task) = opt_dlen;
 1091: 
 1092: 	if (root->root_hooks.hook_add.task)
 1093: 		ptr = root->root_hooks.hook_add.task(task, NULL);
 1094: 	else
 1095: 		ptr = NULL;
 1096: 
 1097: 	if (!ptr) {
 1098: #ifdef HAVE_LIBPTHREAD
 1099: 		pthread_mutex_lock(&root->root_mtx[taskTASK]);
 1100: #endif
 1101: 		TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
 1102: 			if (TASK_VAL(task) < TASK_VAL(t))
 1103: 				break;
 1104: 		if (!t)
 1105: 			TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
 1106: 		else
 1107: 			TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
 1108: #ifdef HAVE_LIBPTHREAD
 1109: 		pthread_mutex_unlock(&root->root_mtx[taskTASK]);
 1110: #endif
 1111: 	} else
 1112: 		task = sched_unuseTask(task);
 1113: 
 1114: 	return task;
 1115: }
 1116: 
 1117: /*
 1118:  * schedSuspend() - Add Suspended task to scheduler queue
 1119:  *
 1120:  * @root = root task
 1121:  * @func = task execution function
 1122:  * @arg = 1st func argument
 1123:  * @id = Trigger ID
 1124:  * @opt_data = Optional data
 1125:  * @opt_dlen = Optional data length
 1126:  * return: NULL error or !=NULL new queued task
 1127:  */
 1128: sched_task_t *
 1129: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id, 
 1130: 		void *opt_data, size_t opt_dlen)
 1131: {
 1132: 	sched_task_t *task;
 1133: 	void *ptr;
 1134: 
 1135: 	if (!root || !func)
 1136: 		return NULL;
 1137: 
 1138: 	/* get new task */
 1139: 	if (!(task = sched_useTask(root)))
 1140: 		return NULL;
 1141: 
 1142: 	task->task_func = func;
 1143: 	TASK_TYPE(task) = taskSUSPEND;
 1144: 	TASK_ROOT(task) = root;
 1145: 
 1146: 	TASK_ARG(task) = arg;
 1147: 	TASK_VAL(task) = id;
 1148: 
 1149: 	TASK_DATA(task) = opt_data;
 1150: 	TASK_DATLEN(task) = opt_dlen;
 1151: 
 1152: 	if (root->root_hooks.hook_add.suspend)
 1153: 		ptr = root->root_hooks.hook_add.suspend(task, NULL);
 1154: 	else
 1155: 		ptr = NULL;
 1156: 
 1157: 	if (!ptr) {
 1158: #ifdef HAVE_LIBPTHREAD
 1159: 		pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
 1160: #endif
 1161: 		TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
 1162: #ifdef HAVE_LIBPTHREAD
 1163: 		pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
 1164: #endif
 1165: 	} else
 1166: 		task = sched_unuseTask(task);
 1167: 
 1168: 	return task;
 1169: }
 1170: 
 1171: /*
 1172:  * schedCallOnce() - Call once from scheduler
 1173:  *
 1174:  * @root = root task
 1175:  * @func = task execution function
 1176:  * @arg = 1st func argument
 1177:  * @val = additional func argument
 1178:  * @opt_data = Optional data
 1179:  * @opt_dlen = Optional data length
 1180:  * return: return value from called func
 1181:  */
 1182: sched_task_t *
 1183: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val, 
 1184: 		void *opt_data, size_t opt_dlen)
 1185: {
 1186: 	sched_task_t *task;
 1187: 	void *ret;
 1188: 
 1189: 	if (!root || !func)
 1190: 		return NULL;
 1191: 
 1192: 	/* get new task */
 1193: 	if (!(task = sched_useTask(root)))
 1194: 		return NULL;
 1195: 
 1196: 	task->task_func = func;
 1197: 	TASK_TYPE(task) = taskEVENT;
 1198: 	TASK_ROOT(task) = root;
 1199: 
 1200: 	TASK_ARG(task) = arg;
 1201: 	TASK_VAL(task) = val;
 1202: 
 1203: 	TASK_DATA(task) = opt_data;
 1204: 	TASK_DATLEN(task) = opt_dlen;
 1205: 
 1206: 	ret = schedCall(task);
 1207: 
 1208: 	sched_unuseTask(task);
 1209: 	return ret;
 1210: }
 1211: 
 1212: /*
 1213:  * schedThread() - Add thread task to scheduler queue
 1214:  *
 1215:  * @root = root task
 1216:  * @func = task execution function
 1217:  * @arg = 1st func argument
 1218:  * @detach = Detach thread from scheduler, if !=0
 1219:  * @ss = stack size
 1220:  * @opt_data = Optional data
 1221:  * @opt_dlen = Optional data length
 1222:  * return: NULL error or !=NULL new queued task
 1223:  */
 1224: sched_task_t *
 1225: schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach, 
 1226: 		size_t ss, void *opt_data, size_t opt_dlen)
 1227: {
 1228: #ifndef HAVE_LIBPTHREAD
 1229: 	sched_SetErr(ENOTSUP, "Not supported thread tasks");
 1230: 	return NULL;
 1231: #endif
 1232: 	sched_task_t *task;
 1233: 	void *ptr;
 1234: 	pthread_attr_t attr;
 1235: 	sem_t *s = NULL;
 1236: 
 1237: 	if (!root || !func)
 1238: 		return NULL;
 1239: 	else {
 1240: 		/* normalizing stack size & detach state */
 1241: 		if (ss)
 1242: 			ss &= 0x7FFFFFFF;
 1243: 		detach = detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE;
 1244: 	}
 1245: 
 1246: 	if (!(s = (sem_t*) malloc(sizeof(sem_t)))) {
 1247: 		LOGERR;
 1248: 		return NULL;
 1249: 	}
 1250: 	if (sem_init(s, 0, 1)) {
 1251: 		LOGERR;
 1252: 		free(s);
 1253: 		return NULL;
 1254: 	}
 1255: 
 1256: 	/* get new task */
 1257: 	if (!(task = sched_useTask(root))) {
 1258: 		sem_destroy(s);
 1259: 		free(s);
 1260: 
 1261: 		return NULL;
 1262: 	}
 1263: 
 1264: 	task->task_func = func;
 1265: 	TASK_TYPE(task) = taskTHREAD;
 1266: 	TASK_ROOT(task) = root;
 1267: 
 1268: 	TASK_ARG(task) = arg;
 1269: 	TASK_FLAG(task) = detach;
 1270: 	TASK_RET(task) = (intptr_t) s;
 1271: 
 1272: 	TASK_DATA(task) = opt_data;
 1273: 	TASK_DATLEN(task) = opt_dlen;
 1274: 
 1275: 	pthread_attr_init(&attr);
 1276: 	pthread_attr_setdetachstate(&attr, detach);
 1277: 	if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
 1278: 		LOGERR;
 1279: 		pthread_attr_destroy(&attr);
 1280: 		sem_destroy(s);
 1281: 		free(s);
 1282: 		return sched_unuseTask(task);
 1283: 	}
 1284: 	if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
 1285: 		LOGERR;
 1286: 		pthread_attr_destroy(&attr);
 1287: 		sem_destroy(s);
 1288: 		free(s);
 1289: 		return sched_unuseTask(task);
 1290: 	} else
 1291: 		TASK_FLAG(task) |= (ss << 1);
 1292: 	if ((errno = pthread_attr_setguardsize(&attr, ss))) {
 1293: 		LOGERR;
 1294: 		pthread_attr_destroy(&attr);
 1295: 		sem_destroy(s);
 1296: 		free(s);
 1297: 		return sched_unuseTask(task);
 1298: 	}
 1299: #ifdef SCHED_RR
 1300: 	pthread_attr_setschedpolicy(&attr, SCHED_RR);
 1301: #else
 1302: 	pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
 1303: #endif
 1304: 	if (root->root_hooks.hook_add.thread)
 1305: 		ptr = root->root_hooks.hook_add.thread(task, &attr);
 1306: 	else
 1307: 		ptr = NULL;
 1308: 	pthread_attr_destroy(&attr);
 1309: 
 1310: 	if (!ptr) {
 1311: 		pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
 1312: 		TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
 1313: 		pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
 1314: 
 1315: 		/* wait for init thread actions */
 1316: 		sem_wait(s);
 1317: 	} else
 1318: 		task = sched_unuseTask(task);
 1319: 
 1320: 	sem_destroy(s);
 1321: 	free(s);
 1322: 	return task;
 1323: }
 1324: 

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>