File:  [ELWIX - Embedded LightWeight unIX -] / libaitsched / src / tasks.c
Revision 1.11.2.1: download - view: text, annotated - select for diffs - revision graph
Wed Aug 8 08:15:24 2012 UTC (11 years, 10 months ago) by misho
Branches: sched2_7
Diff to: branchpoint 1.11: preferred, unified
removed eventlo tasks and added a new kind of task: regular tasks
schedTask() is a new API call with local regular-task priority
added the ability to acquire a new regular task's missed hit rate before firing the task

    1: /*************************************************************************
    2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
    3: *  by Michael Pounov <misho@openbsd-bg.org>
    4: *
    5: * $Author: misho $
    6: * $Id: tasks.c,v 1.11.2.1 2012/08/08 08:15:24 misho Exp $
    7: *
    8: **************************************************************************
    9: The ELWIX and AITNET software is distributed under the following
   10: terms:
   11: 
   12: All of the documentation and software included in the ELWIX and AITNET
   13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   14: 
   15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
   16: 	by Michael Pounov <misho@elwix.org>.  All rights reserved.
   17: 
   18: Redistribution and use in source and binary forms, with or without
   19: modification, are permitted provided that the following conditions
   20: are met:
   21: 1. Redistributions of source code must retain the above copyright
   22:    notice, this list of conditions and the following disclaimer.
   23: 2. Redistributions in binary form must reproduce the above copyright
   24:    notice, this list of conditions and the following disclaimer in the
   25:    documentation and/or other materials provided with the distribution.
   26: 3. All advertising materials mentioning features or use of this software
   27:    must display the following acknowledgement:
   28: This product includes software developed by Michael Pounov <misho@elwix.org>
   29: ELWIX - Embedded LightWeight unIX and its contributors.
   30: 4. Neither the name of AITNET nor the names of its contributors
   31:    may be used to endorse or promote products derived from this software
   32:    without specific prior written permission.
   33: 
   34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
   35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
   38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   44: SUCH DAMAGE.
   45: */
   46: #include "global.h"
   47: 
   48: 
   49: #pragma GCC visibility push(hidden)
   50: 
/*
 * _sched_useTask() - obtain a task structure, recycling one from the
 * unuse queue when possible, otherwise allocating a fresh one.
 *
 * @root = root task
 * return: NULL error or !=NULL zeroed task ready for initialization
 */
inline sched_task_t *
_sched_useTask(sched_root_task_t * __restrict root)
{
	sched_task_t *task, *tmp;

	/* scan the unuse queue for the first task nobody holds locked */
	TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
		if (!TASK_ISLOCKED(task)) {
#ifdef HAVE_LIBPTHREAD
			pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
#endif
			TAILQ_REMOVE(&root->root_unuse, task, task_node);
#ifdef HAVE_LIBPTHREAD
			pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
#endif
			break;
		}
	}

	/* TAILQ_FOREACH leaves task == NULL when the queue was exhausted
	 * without a break, i.e. no recyclable task was found */
	if (!task) {
		task = malloc(sizeof(sched_task_t));
		if (!task) {
			LOGERR;
			return NULL;
		}
	}

	/* wipe any recycled state; the id is the task's own address,
	 * which is unique for the task's lifetime */
	memset(task, 0, sizeof(sched_task_t));
	task->task_id = (uintptr_t) task;
	return task;
}
   81: 
   82: inline sched_task_t *
   83: _sched_unuseTask(sched_task_t * __restrict task)
   84: {
   85: 	TASK_UNLOCK(task);
   86: 	TASK_TYPE(task) = taskUNUSE;
   87: #ifdef HAVE_LIBPTHREAD
   88: 	pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
   89: #endif
   90: 	TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
   91: #ifdef HAVE_LIBPTHREAD
   92: 	pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
   93: #endif
   94: 	task = NULL;
   95: 
   96: 	return task;
   97: }
   98: 
   99: #pragma GCC visibility pop
  100: 
  101: 
  102: /*
  103:  * schedRead() - Add READ I/O task to scheduler queue
  104:  *
  105:  * @root = root task
  106:  * @func = task execution function
  107:  * @arg = 1st func argument
  108:  * @fd = fd handle
  109:  * @opt_data = Optional data
  110:  * @opt_dlen = Optional data length
  111:  * return: NULL error or !=NULL new queued task
  112:  */
  113: sched_task_t *
  114: schedRead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  115: 		void *opt_data, size_t opt_dlen)
  116: {
  117: 	sched_task_t *task;
  118: 	void *ptr;
  119: 
  120: 	if (!root || !func)
  121: 		return NULL;
  122: 
  123: 	/* get new task */
  124: 	if (!(task = _sched_useTask(root)))
  125: 		return NULL;
  126: 
  127: 	task->task_func = func;
  128: 	TASK_TYPE(task) = taskREAD;
  129: 	TASK_ROOT(task) = root;
  130: 
  131: 	TASK_ARG(task) = arg;
  132: 	TASK_FD(task) = fd;
  133: 
  134: 	TASK_DATA(task) = opt_data;
  135: 	TASK_DATLEN(task) = opt_dlen;
  136: 
  137: 	if (root->root_hooks.hook_add.read)
  138: 		ptr = root->root_hooks.hook_add.read(task, NULL);
  139: 	else
  140: 		ptr = NULL;
  141: 
  142: 	if (!ptr) {
  143: #ifdef HAVE_LIBPTHREAD
  144: 		pthread_mutex_lock(&root->root_mtx[taskREAD]);
  145: #endif
  146: 		TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
  147: #ifdef HAVE_LIBPTHREAD
  148: 		pthread_mutex_unlock(&root->root_mtx[taskREAD]);
  149: #endif
  150: 	} else
  151: 		task = _sched_unuseTask(task);
  152: 
  153: 	return task;
  154: }
  155: 
  156: /*
  157:  * schedWrite() - Add WRITE I/O task to scheduler queue
  158:  *
  159:  * @root = root task
  160:  * @func = task execution function
  161:  * @arg = 1st func argument
  162:  * @fd = fd handle
  163:  * @opt_data = Optional data
  164:  * @opt_dlen = Optional data length
  165:  * return: NULL error or !=NULL new queued task
  166:  */
  167: sched_task_t *
  168: schedWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  169: 		void *opt_data, size_t opt_dlen)
  170: {
  171: 	sched_task_t *task;
  172: 	void *ptr;
  173: 
  174: 	if (!root || !func)
  175: 		return NULL;
  176: 
  177: 	/* get new task */
  178: 	if (!(task = _sched_useTask(root)))
  179: 		return NULL;
  180: 
  181: 	task->task_func = func;
  182: 	TASK_TYPE(task) = taskWRITE;
  183: 	TASK_ROOT(task) = root;
  184: 
  185: 	TASK_ARG(task) = arg;
  186: 	TASK_FD(task) = fd;
  187: 
  188: 	TASK_DATA(task) = opt_data;
  189: 	TASK_DATLEN(task) = opt_dlen;
  190: 
  191: 	if (root->root_hooks.hook_add.write)
  192: 		ptr = root->root_hooks.hook_add.write(task, NULL);
  193: 	else
  194: 		ptr = NULL;
  195: 
  196: 	if (!ptr) {
  197: #ifdef HAVE_LIBPTHREAD
  198: 		pthread_mutex_lock(&root->root_mtx[taskWRITE]);
  199: #endif
  200: 		TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
  201: #ifdef HAVE_LIBPTHREAD
  202: 		pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
  203: #endif
  204: 	} else
  205: 		task = _sched_unuseTask(task);
  206: 
  207: 	return task;
  208: }
  209: 
  210: /*
  211:  * schedNode() - Add NODE task to scheduler queue
  212:  *
  213:  * @root = root task
  214:  * @func = task execution function
  215:  * @arg = 1st func argument
  216:  * @fd = fd handle
  217:  * @opt_data = Optional data
  218:  * @opt_dlen = Optional data length
  219:  * return: NULL error or !=NULL new queued task
  220:  */
  221: sched_task_t *
  222: schedNode(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  223: 		void *opt_data, size_t opt_dlen)
  224: {
  225: 	sched_task_t *task;
  226: 	void *ptr;
  227: 
  228: 	if (!root || !func)
  229: 		return NULL;
  230: 
  231: 	/* get new task */
  232: 	if (!(task = _sched_useTask(root)))
  233: 		return NULL;
  234: 
  235: 	task->task_func = func;
  236: 	TASK_TYPE(task) = taskNODE;
  237: 	TASK_ROOT(task) = root;
  238: 
  239: 	TASK_ARG(task) = arg;
  240: 	TASK_FD(task) = fd;
  241: 
  242: 	TASK_DATA(task) = opt_data;
  243: 	TASK_DATLEN(task) = opt_dlen;
  244: 
  245: 	if (root->root_hooks.hook_add.node)
  246: 		ptr = root->root_hooks.hook_add.node(task, NULL);
  247: 	else
  248: 		ptr = NULL;
  249: 
  250: 	if (!ptr) {
  251: #ifdef HAVE_LIBPTHREAD
  252: 		pthread_mutex_lock(&root->root_mtx[taskNODE]);
  253: #endif
  254: 		TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
  255: #ifdef HAVE_LIBPTHREAD
  256: 		pthread_mutex_unlock(&root->root_mtx[taskNODE]);
  257: #endif
  258: 	} else
  259: 		task = _sched_unuseTask(task);
  260: 
  261: 	return task;
  262: }
  263: 
  264: /*
  265:  * schedProc() - Add PROC task to scheduler queue
  266:  *
  267:  * @root = root task
  268:  * @func = task execution function
  269:  * @arg = 1st func argument
  270:  * @pid = PID
  271:  * @opt_data = Optional data
  272:  * @opt_dlen = Optional data length
  273:  * return: NULL error or !=NULL new queued task
  274:  */
  275: sched_task_t *
  276: schedProc(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long pid, 
  277: 		void *opt_data, size_t opt_dlen)
  278: {
  279: 	sched_task_t *task;
  280: 	void *ptr;
  281: 
  282: 	if (!root || !func)
  283: 		return NULL;
  284: 
  285: 	/* get new task */
  286: 	if (!(task = _sched_useTask(root)))
  287: 		return NULL;
  288: 
  289: 	task->task_func = func;
  290: 	TASK_TYPE(task) = taskPROC;
  291: 	TASK_ROOT(task) = root;
  292: 
  293: 	TASK_ARG(task) = arg;
  294: 	TASK_VAL(task) = pid;
  295: 
  296: 	TASK_DATA(task) = opt_data;
  297: 	TASK_DATLEN(task) = opt_dlen;
  298: 
  299: 	if (root->root_hooks.hook_add.proc)
  300: 		ptr = root->root_hooks.hook_add.proc(task, NULL);
  301: 	else
  302: 		ptr = NULL;
  303: 
  304: 	if (!ptr) {
  305: #ifdef HAVE_LIBPTHREAD
  306: 		pthread_mutex_lock(&root->root_mtx[taskPROC]);
  307: #endif
  308: 		TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
  309: #ifdef HAVE_LIBPTHREAD
  310: 		pthread_mutex_unlock(&root->root_mtx[taskPROC]);
  311: #endif
  312: 	} else
  313: 		task = _sched_unuseTask(task);
  314: 
  315: 	return task;
  316: }
  317: 
  318: /*
  319:  * schedUser() - Add trigger USER task to scheduler queue
  320:  *
  321:  * @root = root task
  322:  * @func = task execution function
  323:  * @arg = 1st func argument
  324:  * @id = Trigger ID
  325:  * @opt_data = Optional data
  326:  * @opt_dlen = Optional user's trigger flags
  327:  * return: NULL error or !=NULL new queued task
  328:  */
  329: sched_task_t *
  330: schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id, 
  331: 		void *opt_data, size_t opt_dlen)
  332: {
  333: #ifndef EVFILT_USER
  334: 	sched_SetErr(ENOTSUP, "Not supported kevent() filter");
  335: 	return NULL;
  336: #else
  337: 	sched_task_t *task;
  338: 	void *ptr;
  339: 
  340: 	if (!root || !func)
  341: 		return NULL;
  342: 
  343: 	/* get new task */
  344: 	if (!(task = _sched_useTask(root)))
  345: 		return NULL;
  346: 
  347: 	task->task_func = func;
  348: 	TASK_TYPE(task) = taskUSER;
  349: 	TASK_ROOT(task) = root;
  350: 
  351: 	TASK_ARG(task) = arg;
  352: 	TASK_VAL(task) = id;
  353: 
  354: 	TASK_DATA(task) = opt_data;
  355: 	TASK_DATLEN(task) = opt_dlen;
  356: 
  357: 	if (root->root_hooks.hook_add.user)
  358: 		ptr = root->root_hooks.hook_add.user(task, NULL);
  359: 	else
  360: 		ptr = NULL;
  361: 
  362: 	if (!ptr) {
  363: #ifdef HAVE_LIBPTHREAD
  364: 		pthread_mutex_lock(&root->root_mtx[taskUSER]);
  365: #endif
  366: 		TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
  367: #ifdef HAVE_LIBPTHREAD
  368: 		pthread_mutex_unlock(&root->root_mtx[taskUSER]);
  369: #endif
  370: 	} else
  371: 		task = _sched_unuseTask(task);
  372: 
  373: 	return task;
  374: #endif
  375: }
  376: 
  377: /*
  378:  * schedSignal() - Add SIGNAL task to scheduler queue
  379:  *
  380:  * @root = root task
  381:  * @func = task execution function
  382:  * @arg = 1st func argument
  383:  * @sig = Signal
  384:  * @opt_data = Optional data
  385:  * @opt_dlen = Optional data length
  386:  * return: NULL error or !=NULL new queued task
  387:  */
  388: sched_task_t *
  389: schedSignal(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long sig, 
  390: 		void *opt_data, size_t opt_dlen)
  391: {
  392: 	sched_task_t *task;
  393: 	void *ptr;
  394: 
  395: 	if (!root || !func)
  396: 		return NULL;
  397: 
  398: 	/* get new task */
  399: 	if (!(task = _sched_useTask(root)))
  400: 		return NULL;
  401: 
  402: 	task->task_func = func;
  403: 	TASK_TYPE(task) = taskSIGNAL;
  404: 	TASK_ROOT(task) = root;
  405: 
  406: 	TASK_ARG(task) = arg;
  407: 	TASK_VAL(task) = sig;
  408: 
  409: 	TASK_DATA(task) = opt_data;
  410: 	TASK_DATLEN(task) = opt_dlen;
  411: 
  412: 	if (root->root_hooks.hook_add.signal)
  413: 		ptr = root->root_hooks.hook_add.signal(task, NULL);
  414: 	else
  415: 		ptr = NULL;
  416: 
  417: 	if (!ptr) {
  418: #ifdef HAVE_LIBPTHREAD
  419: 		pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
  420: #endif
  421: 		TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
  422: #ifdef HAVE_LIBPTHREAD
  423: 		pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
  424: #endif
  425: 	} else
  426: 		task = _sched_unuseTask(task);
  427: 
  428: 	return task;
  429: }
  430: 
  431: /*
  432:  * schedAlarm() - Add ALARM task to scheduler queue
  433:  *
  434:  * @root = root task
  435:  * @func = task execution function
  436:  * @arg = 1st func argument
  437:  * @ts = timeout argument structure, minimum alarm timer resolution is 1msec!
  438:  * @opt_data = Optional data
  439:  * @opt_dlen = Optional data length
  440:  * return: NULL error or !=NULL new queued task
  441:  */
  442: sched_task_t *
  443: schedAlarm(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 
  444: 		void *opt_data, size_t opt_dlen)
  445: {
  446: 	sched_task_t *task;
  447: 	void *ptr;
  448: 
  449: 	if (!root || !func)
  450: 		return NULL;
  451: 
  452: 	/* get new task */
  453: 	if (!(task = _sched_useTask(root)))
  454: 		return NULL;
  455: 
  456: 	task->task_func = func;
  457: 	TASK_TYPE(task) = taskALARM;
  458: 	TASK_ROOT(task) = root;
  459: 
  460: 	TASK_ARG(task) = arg;
  461: 	TASK_TS(task) = ts;
  462: 
  463: 	TASK_DATA(task) = opt_data;
  464: 	TASK_DATLEN(task) = opt_dlen;
  465: 
  466: 	if (root->root_hooks.hook_add.alarm)
  467: 		ptr = root->root_hooks.hook_add.alarm(task, NULL);
  468: 	else
  469: 		ptr = NULL;
  470: 
  471: 	if (!ptr) {
  472: #ifdef HAVE_LIBPTHREAD
  473: 		pthread_mutex_lock(&root->root_mtx[taskALARM]);
  474: #endif
  475: 		TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
  476: #ifdef HAVE_LIBPTHREAD
  477: 		pthread_mutex_unlock(&root->root_mtx[taskALARM]);
  478: #endif
  479: 	} else
  480: 		task = _sched_unuseTask(task);
  481: 
  482: 	return task;
  483: }
  484: 
  485: #ifdef AIO_SUPPORT
  486: /*
  487:  * schedAIO() - Add AIO task to scheduler queue
  488:  *
  489:  * @root = root task
  490:  * @func = task execution function
  491:  * @arg = 1st func argument
  492:  * @acb = AIO cb structure address
  493:  * @opt_data = Optional data
  494:  * @opt_dlen = Optional data length
  495:  * return: NULL error or !=NULL new queued task
  496:  */
  497: sched_task_t *
  498: schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, 
  499: 		struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
  500: {
  501: 	sched_task_t *task;
  502: 	void *ptr;
  503: 
  504: 	if (!root || !func || !acb || !opt_dlen)
  505: 		return NULL;
  506: 
  507: 	/* get new task */
  508: 	if (!(task = _sched_useTask(root)))
  509: 		return NULL;
  510: 
  511: 	task->task_func = func;
  512: 	TASK_TYPE(task) = taskAIO;
  513: 	TASK_ROOT(task) = root;
  514: 
  515: 	TASK_ARG(task) = arg;
  516: 	TASK_VAL(task) = (u_long) acb;
  517: 
  518: 	TASK_DATA(task) = opt_data;
  519: 	TASK_DATLEN(task) = opt_dlen;
  520: 
  521: 	if (root->root_hooks.hook_add.aio)
  522: 		ptr = root->root_hooks.hook_add.aio(task, NULL);
  523: 	else
  524: 		ptr = NULL;
  525: 
  526: 	if (!ptr) {
  527: #ifdef HAVE_LIBPTHREAD
  528: 		pthread_mutex_lock(&root->root_mtx[taskAIO]);
  529: #endif
  530: 		TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
  531: #ifdef HAVE_LIBPTHREAD
  532: 		pthread_mutex_unlock(&root->root_mtx[taskAIO]);
  533: #endif
  534: 	} else
  535: 		task = _sched_unuseTask(task);
  536: 
  537: 	return task;
  538: }
  539: 
  540: /*
  541:  * schedAIORead() - Add AIO read task to scheduler queue
  542:  *
  543:  * @root = root task
  544:  * @func = task execution function
  545:  * @arg = 1st func argument
  546:  * @fd = file descriptor
  547:  * @buffer = Buffer
  548:  * @buflen = Buffer length
  549:  * @offset = Offset from start of file, if =-1 from current position
  550:  * return: NULL error or !=NULL new queued task
  551:  */
  552: inline sched_task_t *
  553: schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  554: 		void *buffer, size_t buflen, off_t offset)
  555: {
  556: 	struct aiocb *acb;
  557: 	off_t off;
  558: 
  559: 	if (!root || !func || !buffer || !buflen)
  560: 		return NULL;
  561: 
  562: 	if (offset == (off_t) -1) {
  563: 		off = lseek(fd, 0, SEEK_CUR);
  564: 		if (off == -1) {
  565: 			LOGERR;
  566: 			return NULL;
  567: 		}
  568: 	} else
  569: 		off = offset;
  570: 
  571: 	if (!(acb = malloc(sizeof(struct aiocb)))) {
  572: 		LOGERR;
  573: 		return NULL;
  574: 	} else
  575: 		memset(acb, 0, sizeof(struct aiocb));
  576: 
  577: 	acb->aio_fildes = fd;
  578: 	acb->aio_nbytes = buflen;
  579: 	acb->aio_buf = buffer;
  580: 	acb->aio_offset = off;
  581: 	acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
  582: 	acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
  583: 	acb->aio_sigevent.sigev_value.sival_ptr = acb;
  584: 
  585: 	if (aio_read(acb)) {
  586: 		LOGERR;
  587: 		free(acb);
  588: 		return NULL;
  589: 	}
  590: 
  591: 	return schedAIO(root, func, arg, acb, buffer, buflen);
  592: }
  593: 
  594: /*
  595:  * schedAIOWrite() - Add AIO write task to scheduler queue
  596:  *
  597:  * @root = root task
  598:  * @func = task execution function
  599:  * @arg = 1st func argument
  600:  * @fd = file descriptor
  601:  * @buffer = Buffer
  602:  * @buflen = Buffer length
  603:  * @offset = Offset from start of file, if =-1 from current position
  604:  * return: NULL error or !=NULL new queued task
  605:  */
  606: inline sched_task_t *
  607: schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  608: 		void *buffer, size_t buflen, off_t offset)
  609: {
  610: 	struct aiocb *acb;
  611: 	off_t off;
  612: 
  613: 	if (!root || !func || !buffer || !buflen)
  614: 		return NULL;
  615: 
  616: 	if (offset == (off_t) -1) {
  617: 		off = lseek(fd, 0, SEEK_CUR);
  618: 		if (off == -1) {
  619: 			LOGERR;
  620: 			return NULL;
  621: 		}
  622: 	} else
  623: 		off = offset;
  624: 
  625: 	if (!(acb = malloc(sizeof(struct aiocb)))) {
  626: 		LOGERR;
  627: 		return NULL;
  628: 	} else
  629: 		memset(acb, 0, sizeof(struct aiocb));
  630: 
  631: 	acb->aio_fildes = fd;
  632: 	acb->aio_nbytes = buflen;
  633: 	acb->aio_buf = buffer;
  634: 	acb->aio_offset = off;
  635: 	acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
  636: 	acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
  637: 	acb->aio_sigevent.sigev_value.sival_ptr = acb;
  638: 
  639: 	if (aio_write(acb)) {
  640: 		LOGERR;
  641: 		free(acb);
  642: 		return NULL;
  643: 	}
  644: 
  645: 	return schedAIO(root, func, arg, acb, buffer, buflen);
  646: }
  647: 
  648: #ifdef EVFILT_LIO
  649: /*
  650:  * schedLIO() - Add AIO bulk tasks to scheduler queue
  651:  *
  652:  * @root = root task
  653:  * @func = task execution function
  654:  * @arg = 1st func argument
  655:  * @acbs = AIO cb structure addresses
  656:  * @opt_data = Optional data
  657:  * @opt_dlen = Optional data length
  658:  * return: NULL error or !=NULL new queued task
  659:  */
  660: sched_task_t *
  661: schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, 
  662: 		struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
  663: {
  664: 	sched_task_t *task;
  665: 	void *ptr;
  666: 
  667: 	if (!root || !func || !acbs || !opt_dlen)
  668: 		return NULL;
  669: 
  670: 	/* get new task */
  671: 	if (!(task = _sched_useTask(root)))
  672: 		return NULL;
  673: 
  674: 	task->task_func = func;
  675: 	TASK_TYPE(task) = taskLIO;
  676: 	TASK_ROOT(task) = root;
  677: 
  678: 	TASK_ARG(task) = arg;
  679: 	TASK_VAL(task) = (u_long) acbs;
  680: 
  681: 	TASK_DATA(task) = opt_data;
  682: 	TASK_DATLEN(task) = opt_dlen;
  683: 
  684: 	if (root->root_hooks.hook_add.lio)
  685: 		ptr = root->root_hooks.hook_add.lio(task, NULL);
  686: 	else
  687: 		ptr = NULL;
  688: 
  689: 	if (!ptr) {
  690: #ifdef HAVE_LIBPTHREAD
  691: 		pthread_mutex_lock(&root->root_mtx[taskLIO]);
  692: #endif
  693: 		TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
  694: #ifdef HAVE_LIBPTHREAD
  695: 		pthread_mutex_unlock(&root->root_mtx[taskLIO]);
  696: #endif
  697: 	} else
  698: 		task = _sched_unuseTask(task);
  699: 
  700: 	return task;
  701: }
  702: 
  703: /*
  704:  * schedLIORead() - Add list of AIO read tasks to scheduler queue
  705:  *
  706:  * @root = root task
  707:  * @func = task execution function
  708:  * @arg = 1st func argument
  709:  * @fd = file descriptor
  710:  * @bufs = Buffer's list
  711:  * @nbufs = Number of Buffers
  712:  * @offset = Offset from start of file, if =-1 from current position
  713:  * return: NULL error or !=NULL new queued task
  714:  */
  715: sched_task_t *
  716: schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  717: 		struct iovec *bufs, size_t nbufs, off_t offset)
  718: {
  719: 	struct sigevent sig;
  720: 	struct aiocb **acb;
  721: 	off_t off;
  722: 	register int i;
  723: 
  724: 	if (!root || !func || !bufs || !nbufs)
  725: 		return NULL;
  726: 
  727: 	if (offset == (off_t) -1) {
  728: 		off = lseek(fd, 0, SEEK_CUR);
  729: 		if (off == -1) {
  730: 			LOGERR;
  731: 			return NULL;
  732: 		}
  733: 	} else
  734: 		off = offset;
  735: 
  736: 	if (!(acb = calloc(sizeof(void*), nbufs))) {
  737: 		LOGERR;
  738: 		return NULL;
  739: 	} else
  740: 		memset(acb, 0, sizeof(void*) * nbufs);
  741: 	for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
  742: 		acb[i] = malloc(sizeof(struct aiocb));
  743: 		if (!acb[i]) {
  744: 			LOGERR;
  745: 			for (i = 0; i < nbufs; i++)
  746: 				if (acb[i])
  747: 					free(acb[i]);
  748: 			free(acb);
  749: 			return NULL;
  750: 		} else
  751: 			memset(acb[i], 0, sizeof(struct aiocb));
  752: 		acb[i]->aio_fildes = fd;
  753: 		acb[i]->aio_nbytes = bufs[i].iov_len;
  754: 		acb[i]->aio_buf = bufs[i].iov_base;
  755: 		acb[i]->aio_offset = off;
  756: 		acb[i]->aio_lio_opcode = LIO_READ;
  757: 	}
  758: 	memset(&sig, 0, sizeof sig);
  759: 	sig.sigev_notify = SIGEV_KEVENT;
  760: 	sig.sigev_notify_kqueue = root->root_kq;
  761: 	sig.sigev_value.sival_ptr = acb;
  762: 
  763: 	if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
  764: 		LOGERR;
  765: 		for (i = 0; i < nbufs; i++)
  766: 			if (acb[i])
  767: 				free(acb[i]);
  768: 		free(acb);
  769: 		return NULL;
  770: 	}
  771: 
  772: 	return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
  773: }
  774: 
  775: /*
  776:  * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
  777:  *
  778:  * @root = root task
  779:  * @func = task execution function
  780:  * @arg = 1st func argument
  781:  * @fd = file descriptor
  782:  * @bufs = Buffer's list
  783:  * @nbufs = Number of Buffers
  784:  * @offset = Offset from start of file, if =-1 from current position
  785:  * return: NULL error or !=NULL new queued task
  786:  */
  787: inline sched_task_t *
  788: schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
  789: 		struct iovec *bufs, size_t nbufs, off_t offset)
  790: {
  791: 	struct sigevent sig;
  792: 	struct aiocb **acb;
  793: 	off_t off;
  794: 	register int i;
  795: 
  796: 	if (!root || !func || !bufs || !nbufs)
  797: 		return NULL;
  798: 
  799: 	if (offset == (off_t) -1) {
  800: 		off = lseek(fd, 0, SEEK_CUR);
  801: 		if (off == -1) {
  802: 			LOGERR;
  803: 			return NULL;
  804: 		}
  805: 	} else
  806: 		off = offset;
  807: 
  808: 	if (!(acb = calloc(sizeof(void*), nbufs))) {
  809: 		LOGERR;
  810: 		return NULL;
  811: 	} else
  812: 		memset(acb, 0, sizeof(void*) * nbufs);
  813: 	for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
  814: 		acb[i] = malloc(sizeof(struct aiocb));
  815: 		if (!acb[i]) {
  816: 			LOGERR;
  817: 			for (i = 0; i < nbufs; i++)
  818: 				if (acb[i])
  819: 					free(acb[i]);
  820: 			free(acb);
  821: 			return NULL;
  822: 		} else
  823: 			memset(acb[i], 0, sizeof(struct aiocb));
  824: 		acb[i]->aio_fildes = fd;
  825: 		acb[i]->aio_nbytes = bufs[i].iov_len;
  826: 		acb[i]->aio_buf = bufs[i].iov_base;
  827: 		acb[i]->aio_offset = off;
  828: 		acb[i]->aio_lio_opcode = LIO_WRITE;
  829: 	}
  830: 	memset(&sig, 0, sizeof sig);
  831: 	sig.sigev_notify = SIGEV_KEVENT;
  832: 	sig.sigev_notify_kqueue = root->root_kq;
  833: 	sig.sigev_value.sival_ptr = acb;
  834: 
  835: 	if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
  836: 		LOGERR;
  837: 		for (i = 0; i < nbufs; i++)
  838: 			if (acb[i])
  839: 				free(acb[i]);
  840: 		free(acb);
  841: 		return NULL;
  842: 	}
  843: 
  844: 	return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
  845: }
  846: #endif	/* EVFILT_LIO */
  847: #endif	/* AIO_SUPPORT */
  848: 
  849: /*
  850:  * schedTimer() - Add TIMER task to scheduler queue
  851:  *
  852:  * @root = root task
  853:  * @func = task execution function
  854:  * @arg = 1st func argument
  855:  * @ts = timeout argument structure
  856:  * @opt_data = Optional data
  857:  * @opt_dlen = Optional data length
  858:  * return: NULL error or !=NULL new queued task
  859:  */
  860: sched_task_t *
  861: schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 
  862: 		void *opt_data, size_t opt_dlen)
  863: {
  864: 	sched_task_t *task, *tmp, *t = NULL;
  865: 	void *ptr;
  866: 	struct timespec now;
  867: 
  868: 	if (!root || !func)
  869: 		return NULL;
  870: 
  871: 	/* get new task */
  872: 	if (!(task = _sched_useTask(root)))
  873: 		return NULL;
  874: 
  875: 	task->task_func = func;
  876: 	TASK_TYPE(task) = taskTIMER;
  877: 	TASK_ROOT(task) = root;
  878: 
  879: 	TASK_ARG(task) = arg;
  880: 
  881: 	TASK_DATA(task) = opt_data;
  882: 	TASK_DATLEN(task) = opt_dlen;
  883: 
  884: 	/* calculate timeval structure */
  885: 	clock_gettime(CLOCK_MONOTONIC, &now);
  886: 	now.tv_sec += ts.tv_sec;
  887: 	now.tv_nsec += ts.tv_nsec;
  888: 	if (now.tv_nsec >= 1000000000L) {
  889: 		now.tv_sec++;
  890: 		now.tv_nsec -= 1000000000L;
  891: 	} else if (now.tv_nsec < 0) {
  892: 		now.tv_sec--;
  893: 		now.tv_nsec += 1000000000L;
  894: 	}
  895: 	TASK_TS(task) = now;
  896: 
  897: 	if (root->root_hooks.hook_add.timer)
  898: 		ptr = root->root_hooks.hook_add.timer(task, NULL);
  899: 	else
  900: 		ptr = NULL;
  901: 
  902: 	if (!ptr) {
  903: #ifdef HAVE_LIBPTHREAD
  904: 		pthread_mutex_lock(&root->root_mtx[taskTIMER]);
  905: #endif
  906: #ifdef TIMER_WITHOUT_SORT
  907: 		TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
  908: #else
  909: 		TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
  910: 			if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
  911: 				break;
  912: 		if (!t)
  913: 			TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
  914: 		else
  915: 			TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
  916: #endif
  917: #ifdef HAVE_LIBPTHREAD
  918: 		pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
  919: #endif
  920: 	} else
  921: 		task = _sched_unuseTask(task);
  922: 
  923: 	return task;
  924: }
  925: 
  926: /*
  927:  * schedEvent() - Add EVENT task to scheduler queue
  928:  *
  929:  * @root = root task
  930:  * @func = task execution function
  931:  * @arg = 1st func argument
  932:  * @val = additional func argument
  933:  * @opt_data = Optional data
  934:  * @opt_dlen = Optional data length
  935:  * return: NULL error or !=NULL new queued task
  936:  */
  937: sched_task_t *
  938: schedEvent(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val, 
  939: 		void *opt_data, size_t opt_dlen)
  940: {
  941: 	sched_task_t *task;
  942: 	void *ptr;
  943: 
  944: 	if (!root || !func)
  945: 		return NULL;
  946: 
  947: 	/* get new task */
  948: 	if (!(task = _sched_useTask(root)))
  949: 		return NULL;
  950: 
  951: 	task->task_func = func;
  952: 	TASK_TYPE(task) = taskEVENT;
  953: 	TASK_ROOT(task) = root;
  954: 
  955: 	TASK_ARG(task) = arg;
  956: 	TASK_VAL(task) = val;
  957: 
  958: 	TASK_DATA(task) = opt_data;
  959: 	TASK_DATLEN(task) = opt_dlen;
  960: 
  961: 	if (root->root_hooks.hook_add.event)
  962: 		ptr = root->root_hooks.hook_add.event(task, NULL);
  963: 	else
  964: 		ptr = NULL;
  965: 
  966: 	if (!ptr) {
  967: #ifdef HAVE_LIBPTHREAD
  968: 		pthread_mutex_lock(&root->root_mtx[taskEVENT]);
  969: #endif
  970: 		TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
  971: #ifdef HAVE_LIBPTHREAD
  972: 		pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
  973: #endif
  974: 	} else
  975: 		task = _sched_unuseTask(task);
  976: 
  977: 	return task;
  978: }
  979: 
  980: 
  981: /*
  982:  * schedTask() - Add regular task to scheduler queue
  983:  *
  984:  * @root = root task
  985:  * @func = task execution function
  986:  * @arg = 1st func argument
  987:  * @prio = regular task priority, 0 is hi priority for regular tasks
  988:  * @opt_data = Optional data
  989:  * @opt_dlen = Optional data length
  990:  * return: NULL error or !=NULL new queued task
  991:  */
  992: sched_task_t *
  993: schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio, 
  994: 		void *opt_data, size_t opt_dlen)
  995: {
  996: 	sched_task_t *task, *tmp, *t = NULL;
  997: 	void *ptr;
  998: 
  999: 	if (!root || !func)
 1000: 		return NULL;
 1001: 
 1002: 	/* get new task */
 1003: 	if (!(task = _sched_useTask(root)))
 1004: 		return NULL;
 1005: 
 1006: 	task->task_func = func;
 1007: 	TASK_TYPE(task) = taskTASK;
 1008: 	TASK_ROOT(task) = root;
 1009: 
 1010: 	TASK_ARG(task) = arg;
 1011: 	TASK_VAL(task) = prio;
 1012: 
 1013: 	TASK_DATA(task) = opt_data;
 1014: 	TASK_DATLEN(task) = opt_dlen;
 1015: 
 1016: 	if (root->root_hooks.hook_add.task)
 1017: 		ptr = root->root_hooks.hook_add.task(task, NULL);
 1018: 	else
 1019: 		ptr = NULL;
 1020: 
 1021: 	if (!ptr) {
 1022: #ifdef HAVE_LIBPTHREAD
 1023: 		pthread_mutex_lock(&root->root_mtx[taskTASK]);
 1024: #endif
 1025: 		TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
 1026: 			if (TASK_VAL(task) < TASK_VAL(t))
 1027: 				break;
 1028: 		if (!t)
 1029: 			TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
 1030: 		else
 1031: 			TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
 1032: #ifdef HAVE_LIBPTHREAD
 1033: 		pthread_mutex_unlock(&root->root_mtx[taskTASK]);
 1034: #endif
 1035: 	} else
 1036: 		task = _sched_unuseTask(task);
 1037: 
 1038: 	return task;
 1039: }
 1040: 
 1041: /*
 1042:  * schedSuspend() - Add Suspended task to scheduler queue
 1043:  *
 1044:  * @root = root task
 1045:  * @func = task execution function
 1046:  * @arg = 1st func argument
 1047:  * @id = Trigger ID
 1048:  * @opt_data = Optional data
 1049:  * @opt_dlen = Optional data length
 1050:  * return: NULL error or !=NULL new queued task
 1051:  */
 1052: sched_task_t *
 1053: schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id, 
 1054: 		void *opt_data, size_t opt_dlen)
 1055: {
 1056: 	sched_task_t *task;
 1057: 	void *ptr;
 1058: 
 1059: 	if (!root || !func)
 1060: 		return NULL;
 1061: 
 1062: 	/* get new task */
 1063: 	if (!(task = _sched_useTask(root)))
 1064: 		return NULL;
 1065: 
 1066: 	task->task_func = func;
 1067: 	TASK_TYPE(task) = taskSUSPEND;
 1068: 	TASK_ROOT(task) = root;
 1069: 
 1070: 	TASK_ARG(task) = arg;
 1071: 	TASK_VAL(task) = id;
 1072: 
 1073: 	TASK_DATA(task) = opt_data;
 1074: 	TASK_DATLEN(task) = opt_dlen;
 1075: 
 1076: 	if (root->root_hooks.hook_add.suspend)
 1077: 		ptr = root->root_hooks.hook_add.suspend(task, NULL);
 1078: 	else
 1079: 		ptr = NULL;
 1080: 
 1081: 	if (!ptr) {
 1082: #ifdef HAVE_LIBPTHREAD
 1083: 		pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
 1084: #endif
 1085: 		TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
 1086: #ifdef HAVE_LIBPTHREAD
 1087: 		pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
 1088: #endif
 1089: 	} else
 1090: 		task = _sched_unuseTask(task);
 1091: 
 1092: 	return task;
 1093: }
 1094: 
 1095: /*
 1096:  * schedCallOnce() - Call once from scheduler
 1097:  *
 1098:  * @root = root task
 1099:  * @func = task execution function
 1100:  * @arg = 1st func argument
 1101:  * @val = additional func argument
 1102:  * @opt_data = Optional data
 1103:  * @opt_dlen = Optional data length
 1104:  * return: return value from called func
 1105:  */
 1106: sched_task_t *
 1107: schedCallOnce(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val, 
 1108: 		void *opt_data, size_t opt_dlen)
 1109: {
 1110: 	sched_task_t *task;
 1111: 	void *ret;
 1112: 
 1113: 	if (!root || !func)
 1114: 		return NULL;
 1115: 
 1116: 	/* get new task */
 1117: 	if (!(task = _sched_useTask(root)))
 1118: 		return NULL;
 1119: 
 1120: 	task->task_func = func;
 1121: 	TASK_TYPE(task) = taskEVENT;
 1122: 	TASK_ROOT(task) = root;
 1123: 
 1124: 	TASK_ARG(task) = arg;
 1125: 	TASK_VAL(task) = val;
 1126: 
 1127: 	TASK_DATA(task) = opt_data;
 1128: 	TASK_DATLEN(task) = opt_dlen;
 1129: 
 1130: 	ret = schedCall(task);
 1131: 
 1132: 	_sched_unuseTask(task);
 1133: 	return ret;
 1134: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>