Diff for /libaitsched/src/tasks.c between versions 1.8.2.2 and 1.15

version 1.8.2.2, 2012/05/31 14:45:10 version 1.15, 2012/09/10 15:07:53
Line 46  SUCH DAMAGE. Line 46  SUCH DAMAGE.
 #include "global.h"  #include "global.h"
   
   
#pragma GCC visibility push(hidden)/*
 * sched_useTask() - Get and init new task
  *
  * @root = root task
  * return: NULL error or !=NULL prepared task
  */
 inline sched_task_t *  inline sched_task_t *
_sched_useTask(sched_root_task_t * __restrict root)sched_useTask(sched_root_task_t * __restrict root)
 {  {
         sched_task_t *task, *tmp;          sched_task_t *task, *tmp;
   
         TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {  
                 if (!TASK_ISLOCKED(task)) {  
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                        pthread_mutex_lock(&root->root_mtx[taskUNUSE]);        pthread_mutex_lock(&root->root_mtx[taskUNUSE]);
 #endif  #endif
           TAILQ_FOREACH_SAFE(task, &root->root_unuse, task_node, tmp) {
                   if (!TASK_ISLOCKED(task)) {
                         TAILQ_REMOVE(&root->root_unuse, task, task_node);                          TAILQ_REMOVE(&root->root_unuse, task, task_node);
 #ifdef HAVE_LIBPTHREAD  
                         pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);  
 #endif  
                         break;                          break;
                 }                  }
         }          }
   #ifdef HAVE_LIBPTHREAD
           pthread_mutex_unlock(&root->root_mtx[taskUNUSE]);
   #endif
   
         if (!task) {          if (!task) {
                 task = malloc(sizeof(sched_task_t));                  task = malloc(sizeof(sched_task_t));
Line 74  _sched_useTask(sched_root_task_t * __restrict root) Line 78  _sched_useTask(sched_root_task_t * __restrict root)
                 }                  }
         }          }
   
           memset(task, 0, sizeof(sched_task_t));
           task->task_id = (uintptr_t) task;
         return task;          return task;
 }  }
   
   /*
    * sched_unuseTask() - Unlock and put task to unuse queue
    *
    * @task = task
    * return: always is NULL
    */
 inline sched_task_t *  inline sched_task_t *
_sched_unuseTask(sched_task_t * __restrict task)sched_unuseTask(sched_task_t * __restrict task)
 {  {
         TASK_UNLOCK(task);          TASK_UNLOCK(task);
         TASK_TYPE(task) = taskUNUSE;          TASK_TYPE(task) = taskUNUSE;
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
         pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);          pthread_mutex_lock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
 #endif  #endif
        TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, task, task_node);        TAILQ_INSERT_TAIL(&TASK_ROOT(task)->root_unuse, TASK_ID(task), task_node);
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
         pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);          pthread_mutex_unlock(&TASK_ROOT(task)->root_mtx[taskUNUSE]);
 #endif  #endif
Line 94  _sched_unuseTask(sched_task_t * __restrict task) Line 106  _sched_unuseTask(sched_task_t * __restrict task)
         return task;          return task;
 }  }
   
   #pragma GCC visibility push(hidden)
   
   #ifdef HAVE_LIBPTHREAD
   static void
   _sched_threadCleanup(sched_task_t *t)
   {
           if (!t || !TASK_ROOT(t))
                   return;
   
           if (TASK_FLAG(t) == PTHREAD_CREATE_JOINABLE)
                   pthread_detach(pthread_self());
   
           pthread_mutex_lock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
           TAILQ_REMOVE(&TASK_ROOT(t)->root_thread, t, task_node);
           pthread_mutex_unlock(&TASK_ROOT(t)->root_mtx[taskTHREAD]);
   
           sched_unuseTask(t);
   }
   void *
   _sched_threadWrapper(sched_task_t *t)
   {
           void *ret = NULL;
           sem_t *s = NULL;
   
           if (!t || !TASK_ROOT(t) || !TASK_RET(t))
                   pthread_exit(ret);
           else
                   s = (sem_t*) TASK_RET(t);
   
           pthread_cleanup_push((void (*)(void*)) _sched_threadCleanup, t);
   
           pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
           pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
   
           /* notify parent, thread is ready for execution */
           sem_post(s);
           pthread_testcancel();
   
           ret = TASK_FUNC(t)(t);
   
           pthread_cleanup_pop(42);
           TASK_ROOT(t)->root_ret = ret;
           pthread_exit(ret);
   }
   #endif
   
 #pragma GCC visibility pop  #pragma GCC visibility pop
   
   /*
    * sched_taskExit() - Exit routine for scheduler task, explicit required for thread tasks
    *
    * @task = current task
    * @retcode = return code
    * return: return code
    */
   inline void *
   sched_taskExit(sched_task_t *task, intptr_t retcode)
   {
           if (!task || !TASK_ROOT(task))
                   return (void*) -1;
   
           if (TASK_ROOT(task)->root_hooks.hook_exec.exit)
                   TASK_ROOT(task)->root_hooks.hook_exec.exit(task, (void*) retcode);
   
           TASK_ROOT(task)->root_ret = (void*) retcode;
           return (void*) retcode;
   }
   
   
 /*  /*
  * schedRead() - Add READ I/O task to scheduler queue   * schedRead() - Add READ I/O task to scheduler queue
  *   *
Line 119  schedRead(sched_root_task_t * __restrict root, sched_t Line 197  schedRead(sched_root_task_t * __restrict root, sched_t
                 return NULL;                  return NULL;
   
         /* get new task */          /* get new task */
        if (!(task = _sched_useTask(root)))        if (!(task = sched_useTask(root)))
                 return NULL;                  return NULL;
   
         memset(task, 0, sizeof(sched_task_t));  
         task->task_id = 0;  
         task->task_lock = 0;  
         task->task_func = func;          task->task_func = func;
         TASK_TYPE(task) = taskREAD;          TASK_TYPE(task) = taskREAD;
         TASK_ROOT(task) = root;          TASK_ROOT(task) = root;
Line 144  schedRead(sched_root_task_t * __restrict root, sched_t Line 219  schedRead(sched_root_task_t * __restrict root, sched_t
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_lock(&root->root_mtx[taskREAD]);                  pthread_mutex_lock(&root->root_mtx[taskREAD]);
 #endif  #endif
                TAILQ_INSERT_TAIL(&root->root_read, task, task_node);                TAILQ_INSERT_TAIL(&root->root_read, TASK_ID(task), task_node);
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_unlock(&root->root_mtx[taskREAD]);                  pthread_mutex_unlock(&root->root_mtx[taskREAD]);
 #endif  #endif
         } else          } else
                task = _sched_unuseTask(task);                task = sched_unuseTask(task);
   
         return task;          return task;
 }  }
Line 176  schedWrite(sched_root_task_t * __restrict root, sched_ Line 251  schedWrite(sched_root_task_t * __restrict root, sched_
                 return NULL;                  return NULL;
   
         /* get new task */          /* get new task */
        if (!(task = _sched_useTask(root)))        if (!(task = sched_useTask(root)))
                 return NULL;                  return NULL;
   
         memset(task, 0, sizeof(sched_task_t));  
         task->task_id = 0;  
         task->task_lock = 0;  
         task->task_func = func;          task->task_func = func;
         TASK_TYPE(task) = taskWRITE;          TASK_TYPE(task) = taskWRITE;
         TASK_ROOT(task) = root;          TASK_ROOT(task) = root;
Line 201  schedWrite(sched_root_task_t * __restrict root, sched_ Line 273  schedWrite(sched_root_task_t * __restrict root, sched_
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_lock(&root->root_mtx[taskWRITE]);                  pthread_mutex_lock(&root->root_mtx[taskWRITE]);
 #endif  #endif
                TAILQ_INSERT_TAIL(&root->root_write, task, task_node);                TAILQ_INSERT_TAIL(&root->root_write, TASK_ID(task), task_node);
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_unlock(&root->root_mtx[taskWRITE]);                  pthread_mutex_unlock(&root->root_mtx[taskWRITE]);
 #endif  #endif
         } else          } else
                task = _sched_unuseTask(task);                task = sched_unuseTask(task);
   
         return task;          return task;
 }  }
Line 233  schedNode(sched_root_task_t * __restrict root, sched_t Line 305  schedNode(sched_root_task_t * __restrict root, sched_t
                 return NULL;                  return NULL;
   
         /* get new task */          /* get new task */
        if (!(task = _sched_useTask(root)))        if (!(task = sched_useTask(root)))
                 return NULL;                  return NULL;
   
         memset(task, 0, sizeof(sched_task_t));  
         task->task_id = 0;  
         task->task_lock = 0;  
         task->task_func = func;          task->task_func = func;
         TASK_TYPE(task) = taskNODE;          TASK_TYPE(task) = taskNODE;
         TASK_ROOT(task) = root;          TASK_ROOT(task) = root;
Line 258  schedNode(sched_root_task_t * __restrict root, sched_t Line 327  schedNode(sched_root_task_t * __restrict root, sched_t
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_lock(&root->root_mtx[taskNODE]);                  pthread_mutex_lock(&root->root_mtx[taskNODE]);
 #endif  #endif
                TAILQ_INSERT_TAIL(&root->root_node, task, task_node);                TAILQ_INSERT_TAIL(&root->root_node, TASK_ID(task), task_node);
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_unlock(&root->root_mtx[taskNODE]);                  pthread_mutex_unlock(&root->root_mtx[taskNODE]);
 #endif  #endif
         } else          } else
                task = _sched_unuseTask(task);                task = sched_unuseTask(task);
   
         return task;          return task;
 }  }
Line 290  schedProc(sched_root_task_t * __restrict root, sched_t Line 359  schedProc(sched_root_task_t * __restrict root, sched_t
                 return NULL;                  return NULL;
   
         /* get new task */          /* get new task */
        if (!(task = _sched_useTask(root)))        if (!(task = sched_useTask(root)))
                 return NULL;                  return NULL;
   
         memset(task, 0, sizeof(sched_task_t));  
         task->task_id = 0;  
         task->task_lock = 0;  
         task->task_func = func;          task->task_func = func;
         TASK_TYPE(task) = taskPROC;          TASK_TYPE(task) = taskPROC;
         TASK_ROOT(task) = root;          TASK_ROOT(task) = root;
Line 315  schedProc(sched_root_task_t * __restrict root, sched_t Line 381  schedProc(sched_root_task_t * __restrict root, sched_t
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_lock(&root->root_mtx[taskPROC]);                  pthread_mutex_lock(&root->root_mtx[taskPROC]);
 #endif  #endif
                TAILQ_INSERT_TAIL(&root->root_proc, task, task_node);                TAILQ_INSERT_TAIL(&root->root_proc, TASK_ID(task), task_node);
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_unlock(&root->root_mtx[taskPROC]);                  pthread_mutex_unlock(&root->root_mtx[taskPROC]);
 #endif  #endif
         } else          } else
                task = _sched_unuseTask(task);                task = sched_unuseTask(task);
   
         return task;          return task;
 }  }
Line 340  sched_task_t * Line 406  sched_task_t *
 schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id,   schedUser(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id, 
                 void *opt_data, size_t opt_dlen)                  void *opt_data, size_t opt_dlen)
 {  {
   #ifndef EVFILT_USER
           sched_SetErr(ENOTSUP, "Not supported kevent() filter");
           return NULL;
   #else
         sched_task_t *task;          sched_task_t *task;
         void *ptr;          void *ptr;
   
Line 347  schedUser(sched_root_task_t * __restrict root, sched_t Line 417  schedUser(sched_root_task_t * __restrict root, sched_t
                 return NULL;                  return NULL;
   
         /* get new task */          /* get new task */
        if (!(task = _sched_useTask(root)))        if (!(task = sched_useTask(root)))
                 return NULL;                  return NULL;
   
         memset(task, 0, sizeof(sched_task_t));  
         task->task_id = 0;  
         task->task_lock = 0;  
         task->task_func = func;          task->task_func = func;
         TASK_TYPE(task) = taskUSER;          TASK_TYPE(task) = taskUSER;
         TASK_ROOT(task) = root;          TASK_ROOT(task) = root;
Line 372  schedUser(sched_root_task_t * __restrict root, sched_t Line 439  schedUser(sched_root_task_t * __restrict root, sched_t
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_lock(&root->root_mtx[taskUSER]);                  pthread_mutex_lock(&root->root_mtx[taskUSER]);
 #endif  #endif
                TAILQ_INSERT_TAIL(&root->root_user, task, task_node);                TAILQ_INSERT_TAIL(&root->root_user, TASK_ID(task), task_node);
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_unlock(&root->root_mtx[taskUSER]);                  pthread_mutex_unlock(&root->root_mtx[taskUSER]);
 #endif  #endif
         } else          } else
                task = _sched_unuseTask(task);                task = sched_unuseTask(task);
   
         return task;          return task;
   #endif
 }  }
   
 /*  /*
Line 404  schedSignal(sched_root_task_t * __restrict root, sched Line 472  schedSignal(sched_root_task_t * __restrict root, sched
                 return NULL;                  return NULL;
   
         /* get new task */          /* get new task */
        if (!(task = _sched_useTask(root)))        if (!(task = sched_useTask(root)))
                 return NULL;                  return NULL;
   
         memset(task, 0, sizeof(sched_task_t));  
         task->task_id = 0;  
         task->task_lock = 0;  
         task->task_func = func;          task->task_func = func;
         TASK_TYPE(task) = taskSIGNAL;          TASK_TYPE(task) = taskSIGNAL;
         TASK_ROOT(task) = root;          TASK_ROOT(task) = root;
Line 429  schedSignal(sched_root_task_t * __restrict root, sched Line 494  schedSignal(sched_root_task_t * __restrict root, sched
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);                  pthread_mutex_lock(&root->root_mtx[taskSIGNAL]);
 #endif  #endif
                TAILQ_INSERT_TAIL(&root->root_signal, task, task_node);                TAILQ_INSERT_TAIL(&root->root_signal, TASK_ID(task), task_node);
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);                  pthread_mutex_unlock(&root->root_mtx[taskSIGNAL]);
 #endif  #endif
         } else          } else
                task = _sched_unuseTask(task);                task = sched_unuseTask(task);
   
         return task;          return task;
 }  }
Line 461  schedAlarm(sched_root_task_t * __restrict root, sched_ Line 526  schedAlarm(sched_root_task_t * __restrict root, sched_
                 return NULL;                  return NULL;
   
         /* get new task */          /* get new task */
        if (!(task = _sched_useTask(root)))        if (!(task = sched_useTask(root)))
                 return NULL;                  return NULL;
   
         memset(task, 0, sizeof(sched_task_t));  
         task->task_id = 0;  
         task->task_lock = 0;  
         task->task_func = func;          task->task_func = func;
         TASK_TYPE(task) = taskALARM;          TASK_TYPE(task) = taskALARM;
         TASK_ROOT(task) = root;          TASK_ROOT(task) = root;
Line 486  schedAlarm(sched_root_task_t * __restrict root, sched_ Line 548  schedAlarm(sched_root_task_t * __restrict root, sched_
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_lock(&root->root_mtx[taskALARM]);                  pthread_mutex_lock(&root->root_mtx[taskALARM]);
 #endif  #endif
                TAILQ_INSERT_TAIL(&root->root_alarm, task, task_node);                TAILQ_INSERT_TAIL(&root->root_alarm, TASK_ID(task), task_node);
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_unlock(&root->root_mtx[taskALARM]);                  pthread_mutex_unlock(&root->root_mtx[taskALARM]);
 #endif  #endif
         } else          } else
                task = _sched_unuseTask(task);                task = sched_unuseTask(task);
   
         return task;          return task;
 }  }
   
   #ifdef AIO_SUPPORT
 /*  /*
    * schedAIO() - Add AIO task to scheduler queue
    *
    * @root = root task
    * @func = task execution function
    * @arg = 1st func argument
    * @acb = AIO cb structure address
    * @opt_data = Optional data
    * @opt_dlen = Optional data length
    * return: NULL error or !=NULL new queued task
    */
   sched_task_t *
   schedAIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, 
                   struct aiocb * __restrict acb, void *opt_data, size_t opt_dlen)
   {
           sched_task_t *task;
           void *ptr;
   
           if (!root || !func || !acb || !opt_dlen)
                   return NULL;
   
           /* get new task */
           if (!(task = sched_useTask(root)))
                   return NULL;
   
           task->task_func = func;
           TASK_TYPE(task) = taskAIO;
           TASK_ROOT(task) = root;
   
           TASK_ARG(task) = arg;
           TASK_VAL(task) = (u_long) acb;
   
           TASK_DATA(task) = opt_data;
           TASK_DATLEN(task) = opt_dlen;
   
           if (root->root_hooks.hook_add.aio)
                   ptr = root->root_hooks.hook_add.aio(task, NULL);
           else
                   ptr = NULL;
   
           if (!ptr) {
   #ifdef HAVE_LIBPTHREAD
                   pthread_mutex_lock(&root->root_mtx[taskAIO]);
   #endif
                   TAILQ_INSERT_TAIL(&root->root_aio, TASK_ID(task), task_node);
   #ifdef HAVE_LIBPTHREAD
                   pthread_mutex_unlock(&root->root_mtx[taskAIO]);
   #endif
           } else
                   task = sched_unuseTask(task);
   
           return task;
   }
   
   /*
    * schedAIORead() - Add AIO read task to scheduler queue
    *
    * @root = root task
    * @func = task execution function
    * @arg = 1st func argument
    * @fd = file descriptor
    * @buffer = Buffer
    * @buflen = Buffer length
    * @offset = Offset from start of file, if =-1 from current position
    * return: NULL error or !=NULL new queued task
    */
   inline sched_task_t *
   schedAIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
                   void *buffer, size_t buflen, off_t offset)
   {
           struct aiocb *acb;
           off_t off;
   
           if (!root || !func || !buffer || !buflen)
                   return NULL;
   
           if (offset == (off_t) -1) {
                   off = lseek(fd, 0, SEEK_CUR);
                   if (off == -1) {
                           LOGERR;
                           return NULL;
                   }
           } else
                   off = offset;
   
           if (!(acb = malloc(sizeof(struct aiocb)))) {
                   LOGERR;
                   return NULL;
           } else
                   memset(acb, 0, sizeof(struct aiocb));
   
           acb->aio_fildes = fd;
           acb->aio_nbytes = buflen;
           acb->aio_buf = buffer;
           acb->aio_offset = off;
           acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
           acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
           acb->aio_sigevent.sigev_value.sival_ptr = acb;
   
           if (aio_read(acb)) {
                   LOGERR;
                   free(acb);
                   return NULL;
           }
   
           return schedAIO(root, func, arg, acb, buffer, buflen);
   }
   
   /*
    * schedAIOWrite() - Add AIO write task to scheduler queue
    *
    * @root = root task
    * @func = task execution function
    * @arg = 1st func argument
    * @fd = file descriptor
    * @buffer = Buffer
    * @buflen = Buffer length
    * @offset = Offset from start of file, if =-1 from current position
    * return: NULL error or !=NULL new queued task
    */
   inline sched_task_t *
   schedAIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
                   void *buffer, size_t buflen, off_t offset)
   {
           struct aiocb *acb;
           off_t off;
   
           if (!root || !func || !buffer || !buflen)
                   return NULL;
   
           if (offset == (off_t) -1) {
                   off = lseek(fd, 0, SEEK_CUR);
                   if (off == -1) {
                           LOGERR;
                           return NULL;
                   }
           } else
                   off = offset;
   
           if (!(acb = malloc(sizeof(struct aiocb)))) {
                   LOGERR;
                   return NULL;
           } else
                   memset(acb, 0, sizeof(struct aiocb));
   
           acb->aio_fildes = fd;
           acb->aio_nbytes = buflen;
           acb->aio_buf = buffer;
           acb->aio_offset = off;
           acb->aio_sigevent.sigev_notify = SIGEV_KEVENT;
           acb->aio_sigevent.sigev_notify_kqueue = root->root_kq;
           acb->aio_sigevent.sigev_value.sival_ptr = acb;
   
           if (aio_write(acb)) {
                   LOGERR;
                   free(acb);
                   return NULL;
           }
   
           return schedAIO(root, func, arg, acb, buffer, buflen);
   }
   
   #ifdef EVFILT_LIO
   /*
    * schedLIO() - Add AIO bulk tasks to scheduler queue
    *
    * @root = root task
    * @func = task execution function
    * @arg = 1st func argument
    * @acbs = AIO cb structure addresses
    * @opt_data = Optional data
    * @opt_dlen = Optional data length
    * return: NULL error or !=NULL new queued task
    */
   sched_task_t *
   schedLIO(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, 
                   struct aiocb ** __restrict acbs, void *opt_data, size_t opt_dlen)
   {
           sched_task_t *task;
           void *ptr;
   
           if (!root || !func || !acbs || !opt_dlen)
                   return NULL;
   
           /* get new task */
           if (!(task = sched_useTask(root)))
                   return NULL;
   
           task->task_func = func;
           TASK_TYPE(task) = taskLIO;
           TASK_ROOT(task) = root;
   
           TASK_ARG(task) = arg;
           TASK_VAL(task) = (u_long) acbs;
   
           TASK_DATA(task) = opt_data;
           TASK_DATLEN(task) = opt_dlen;
   
           if (root->root_hooks.hook_add.lio)
                   ptr = root->root_hooks.hook_add.lio(task, NULL);
           else
                   ptr = NULL;
   
           if (!ptr) {
   #ifdef HAVE_LIBPTHREAD
                   pthread_mutex_lock(&root->root_mtx[taskLIO]);
   #endif
                   TAILQ_INSERT_TAIL(&root->root_lio, TASK_ID(task), task_node);
   #ifdef HAVE_LIBPTHREAD
                   pthread_mutex_unlock(&root->root_mtx[taskLIO]);
   #endif
           } else
                   task = sched_unuseTask(task);
   
           return task;
   }
   
   /*
    * schedLIORead() - Add list of AIO read tasks to scheduler queue
    *
    * @root = root task
    * @func = task execution function
    * @arg = 1st func argument
    * @fd = file descriptor
    * @bufs = Buffer's list
    * @nbufs = Number of Buffers
    * @offset = Offset from start of file, if =-1 from current position
    * return: NULL error or !=NULL new queued task
    */
   sched_task_t *
   schedLIORead(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
                   struct iovec *bufs, size_t nbufs, off_t offset)
   {
           struct sigevent sig;
           struct aiocb **acb;
           off_t off;
           register int i;
   
           if (!root || !func || !bufs || !nbufs)
                   return NULL;
   
           if (offset == (off_t) -1) {
                   off = lseek(fd, 0, SEEK_CUR);
                   if (off == -1) {
                           LOGERR;
                           return NULL;
                   }
           } else
                   off = offset;
   
           if (!(acb = calloc(sizeof(void*), nbufs))) {
                   LOGERR;
                   return NULL;
           } else
                   memset(acb, 0, sizeof(void*) * nbufs);
           for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
                   acb[i] = malloc(sizeof(struct aiocb));
                   if (!acb[i]) {
                           LOGERR;
                           for (i = 0; i < nbufs; i++)
                                   if (acb[i])
                                           free(acb[i]);
                           free(acb);
                           return NULL;
                   } else
                           memset(acb[i], 0, sizeof(struct aiocb));
                   acb[i]->aio_fildes = fd;
                   acb[i]->aio_nbytes = bufs[i].iov_len;
                   acb[i]->aio_buf = bufs[i].iov_base;
                   acb[i]->aio_offset = off;
                   acb[i]->aio_lio_opcode = LIO_READ;
           }
           memset(&sig, 0, sizeof sig);
           sig.sigev_notify = SIGEV_KEVENT;
           sig.sigev_notify_kqueue = root->root_kq;
           sig.sigev_value.sival_ptr = acb;
   
           if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
                   LOGERR;
                   for (i = 0; i < nbufs; i++)
                           if (acb[i])
                                   free(acb[i]);
                   free(acb);
                   return NULL;
           }
   
           return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
   }
   
   /*
    * schedLIOWrite() - Add list of AIO write tasks to scheduler queue
    *
    * @root = root task
    * @func = task execution function
    * @arg = 1st func argument
    * @fd = file descriptor
    * @bufs = Buffer's list
    * @nbufs = Number of Buffers
    * @offset = Offset from start of file, if =-1 from current position
    * return: NULL error or !=NULL new queued task
    */
   inline sched_task_t *
   schedLIOWrite(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int fd, 
                   struct iovec *bufs, size_t nbufs, off_t offset)
   {
           struct sigevent sig;
           struct aiocb **acb;
           off_t off;
           register int i;
   
           if (!root || !func || !bufs || !nbufs)
                   return NULL;
   
           if (offset == (off_t) -1) {
                   off = lseek(fd, 0, SEEK_CUR);
                   if (off == -1) {
                           LOGERR;
                           return NULL;
                   }
           } else
                   off = offset;
   
           if (!(acb = calloc(sizeof(void*), nbufs))) {
                   LOGERR;
                   return NULL;
           } else
                   memset(acb, 0, sizeof(void*) * nbufs);
           for (i = 0; i < nbufs; off += bufs[i++].iov_len) {
                   acb[i] = malloc(sizeof(struct aiocb));
                   if (!acb[i]) {
                           LOGERR;
                           for (i = 0; i < nbufs; i++)
                                   if (acb[i])
                                           free(acb[i]);
                           free(acb);
                           return NULL;
                   } else
                           memset(acb[i], 0, sizeof(struct aiocb));
                   acb[i]->aio_fildes = fd;
                   acb[i]->aio_nbytes = bufs[i].iov_len;
                   acb[i]->aio_buf = bufs[i].iov_base;
                   acb[i]->aio_offset = off;
                   acb[i]->aio_lio_opcode = LIO_WRITE;
           }
           memset(&sig, 0, sizeof sig);
           sig.sigev_notify = SIGEV_KEVENT;
           sig.sigev_notify_kqueue = root->root_kq;
           sig.sigev_value.sival_ptr = acb;
   
           if (lio_listio(LIO_NOWAIT, acb, nbufs, &sig)) {
                   LOGERR;
                   for (i = 0; i < nbufs; i++)
                           if (acb[i])
                                   free(acb[i]);
                   free(acb);
                   return NULL;
           }
   
           return schedLIO(root, func, arg, (void*) acb, bufs, nbufs);
   }
   #endif  /* EVFILT_LIO */
   #endif  /* AIO_SUPPORT */
   
   /*
  * schedTimer() - Add TIMER task to scheduler queue   * schedTimer() - Add TIMER task to scheduler queue
  *   *
  * @root = root task   * @root = root task
Line 511  sched_task_t * Line 937  sched_task_t *
 schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts,   schedTimer(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, struct timespec ts, 
                 void *opt_data, size_t opt_dlen)                  void *opt_data, size_t opt_dlen)
 {  {
        sched_task_t *task, *t = NULL;        sched_task_t *task, *tmp, *t = NULL;
         void *ptr;          void *ptr;
         struct timespec now;          struct timespec now;
   
Line 519  schedTimer(sched_root_task_t * __restrict root, sched_ Line 945  schedTimer(sched_root_task_t * __restrict root, sched_
                 return NULL;                  return NULL;
   
         /* get new task */          /* get new task */
        if (!(task = _sched_useTask(root)))        if (!(task = sched_useTask(root)))
                 return NULL;                  return NULL;
   
         memset(task, 0, sizeof(sched_task_t));  
         task->task_id = 0;  
         task->task_lock = 0;  
         task->task_func = func;          task->task_func = func;
         TASK_TYPE(task) = taskTIMER;          TASK_TYPE(task) = taskTIMER;
         TASK_ROOT(task) = root;          TASK_ROOT(task) = root;
Line 557  schedTimer(sched_root_task_t * __restrict root, sched_ Line 980  schedTimer(sched_root_task_t * __restrict root, sched_
                 pthread_mutex_lock(&root->root_mtx[taskTIMER]);                  pthread_mutex_lock(&root->root_mtx[taskTIMER]);
 #endif  #endif
 #ifdef TIMER_WITHOUT_SORT  #ifdef TIMER_WITHOUT_SORT
                TAILQ_INSERT_TAIL(&root->root_timer, task, task_node);                TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
 #else  #else
                TAILQ_FOREACH(t, &root->root_timer, task_node)                TAILQ_FOREACH_SAFE(t, &root->root_timer, task_node, tmp)
                         if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)                          if (sched_timespeccmp(&TASK_TS(task), &TASK_TS(t), -) < 1)
                                 break;                                  break;
                 if (!t)                  if (!t)
                        TAILQ_INSERT_TAIL(&root->root_timer, task, task_node);                        TAILQ_INSERT_TAIL(&root->root_timer, TASK_ID(task), task_node);
                 else                  else
                        TAILQ_INSERT_BEFORE(t, task, task_node);                        TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
 #endif  #endif
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_unlock(&root->root_mtx[taskTIMER]);                  pthread_mutex_unlock(&root->root_mtx[taskTIMER]);
 #endif  #endif
         } else          } else
                task = _sched_unuseTask(task);                task = sched_unuseTask(task);
   
         return task;          return task;
 }  }
Line 598  schedEvent(sched_root_task_t * __restrict root, sched_ Line 1021  schedEvent(sched_root_task_t * __restrict root, sched_
                 return NULL;                  return NULL;
   
         /* get new task */          /* get new task */
        if (!(task = _sched_useTask(root)))        if (!(task = sched_useTask(root)))
                 return NULL;                  return NULL;
   
         memset(task, 0, sizeof(sched_task_t));  
         task->task_id = 0;  
         task->task_lock = 0;  
         task->task_func = func;          task->task_func = func;
         TASK_TYPE(task) = taskEVENT;          TASK_TYPE(task) = taskEVENT;
         TASK_ROOT(task) = root;          TASK_ROOT(task) = root;
Line 623  schedEvent(sched_root_task_t * __restrict root, sched_ Line 1043  schedEvent(sched_root_task_t * __restrict root, sched_
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_lock(&root->root_mtx[taskEVENT]);                  pthread_mutex_lock(&root->root_mtx[taskEVENT]);
 #endif  #endif
                TAILQ_INSERT_TAIL(&root->root_event, task, task_node);                TAILQ_INSERT_TAIL(&root->root_event, TASK_ID(task), task_node);
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                 pthread_mutex_unlock(&root->root_mtx[taskEVENT]);                  pthread_mutex_unlock(&root->root_mtx[taskEVENT]);
 #endif  #endif
         } else          } else
                task = _sched_unuseTask(task);                task = sched_unuseTask(task);
   
         return task;          return task;
 }  }
   
   
 /*  /*
 * schedEventLo() - Add EVENT_Lo task to scheduler queue * schedTask() - Add regular task to scheduler queue
  *   *
  * @root = root task   * @root = root task
  * @func = task execution function   * @func = task execution function
  * @arg = 1st func argument   * @arg = 1st func argument
 * @val = additional func argument * @prio = regular task priority, 0 is hi priority for regular tasks
  * @opt_data = Optional data   * @opt_data = Optional data
  * @opt_dlen = Optional data length   * @opt_dlen = Optional data length
  * return: NULL error or !=NULL new queued task   * return: NULL error or !=NULL new queued task
  */   */
 sched_task_t *  sched_task_t *
schedEventLo(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long val, schedTask(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long prio, 
                 void *opt_data, size_t opt_dlen)                  void *opt_data, size_t opt_dlen)
 {  {
           sched_task_t *task, *tmp, *t = NULL;
           void *ptr;
   
           if (!root || !func)
                   return NULL;
   
           /* get new task */
           if (!(task = sched_useTask(root)))
                   return NULL;
   
           task->task_func = func;
           TASK_TYPE(task) = taskTASK;
           TASK_ROOT(task) = root;
   
           TASK_ARG(task) = arg;
           TASK_VAL(task) = prio;
   
           TASK_DATA(task) = opt_data;
           TASK_DATLEN(task) = opt_dlen;
   
           if (root->root_hooks.hook_add.task)
                   ptr = root->root_hooks.hook_add.task(task, NULL);
           else
                   ptr = NULL;
   
           if (!ptr) {
   #ifdef HAVE_LIBPTHREAD
                   pthread_mutex_lock(&root->root_mtx[taskTASK]);
   #endif
                   TAILQ_FOREACH_SAFE(t, &root->root_task, task_node, tmp)
                           if (TASK_VAL(task) < TASK_VAL(t))
                                   break;
                   if (!t)
                           TAILQ_INSERT_TAIL(&root->root_task, TASK_ID(task), task_node);
                   else
                           TAILQ_INSERT_BEFORE(t, TASK_ID(task), task_node);
   #ifdef HAVE_LIBPTHREAD
                   pthread_mutex_unlock(&root->root_mtx[taskTASK]);
   #endif
           } else
                   task = sched_unuseTask(task);
   
           return task;
   }
   
   /*
    * schedSuspend() - Add Suspended task to scheduler queue
    *
    * @root = root task
    * @func = task execution function
    * @arg = 1st func argument
    * @id = Trigger ID
    * @opt_data = Optional data
    * @opt_dlen = Optional data length
    * return: NULL error or !=NULL new queued task
    */
   sched_task_t *
   schedSuspend(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, u_long id, 
                   void *opt_data, size_t opt_dlen)
   {
         sched_task_t *task;          sched_task_t *task;
         void *ptr;          void *ptr;
   
Line 656  schedEventLo(sched_root_task_t * __restrict root, sche Line 1136  schedEventLo(sched_root_task_t * __restrict root, sche
                 return NULL;                  return NULL;
   
         /* get new task */          /* get new task */
        if (!(task = _sched_useTask(root)))        if (!(task = sched_useTask(root)))
                 return NULL;                  return NULL;
   
         memset(task, 0, sizeof(sched_task_t));  
         task->task_id = 0;  
         task->task_lock = 0;  
         task->task_func = func;          task->task_func = func;
        TASK_TYPE(task) = taskEVENT;        TASK_TYPE(task) = taskSUSPEND;
         TASK_ROOT(task) = root;          TASK_ROOT(task) = root;
   
         TASK_ARG(task) = arg;          TASK_ARG(task) = arg;
        TASK_VAL(task) = val;        TASK_VAL(task) = id;
   
         TASK_DATA(task) = opt_data;          TASK_DATA(task) = opt_data;
         TASK_DATLEN(task) = opt_dlen;          TASK_DATLEN(task) = opt_dlen;
   
        if (root->root_hooks.hook_add.eventlo)        if (root->root_hooks.hook_add.suspend)
                ptr = root->root_hooks.hook_add.eventlo(task, NULL);                ptr = root->root_hooks.hook_add.suspend(task, NULL);
         else          else
                 ptr = NULL;                  ptr = NULL;
   
         if (!ptr) {          if (!ptr) {
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                pthread_mutex_lock(&root->root_mtx[taskEVENTLO]);                pthread_mutex_lock(&root->root_mtx[taskSUSPEND]);
 #endif  #endif
                TAILQ_INSERT_TAIL(&root->root_eventlo, task, task_node);                TAILQ_INSERT_TAIL(&root->root_suspend, TASK_ID(task), task_node);
 #ifdef HAVE_LIBPTHREAD  #ifdef HAVE_LIBPTHREAD
                pthread_mutex_unlock(&root->root_mtx[taskEVENTLO]);                pthread_mutex_unlock(&root->root_mtx[taskSUSPEND]);
 #endif  #endif
         } else          } else
                task = _sched_unuseTask(task);                task = sched_unuseTask(task);
   
         return task;          return task;
 }  }
Line 713  schedCallOnce(sched_root_task_t * __restrict root, sch Line 1190  schedCallOnce(sched_root_task_t * __restrict root, sch
                 return NULL;                  return NULL;
   
         /* get new task */          /* get new task */
        if (!(task = _sched_useTask(root)))        if (!(task = sched_useTask(root)))
                 return NULL;                  return NULL;
   
         memset(task, 0, sizeof(sched_task_t));  
         task->task_id = 0;  
         task->task_lock = 0;  
         task->task_func = func;          task->task_func = func;
         TASK_TYPE(task) = taskEVENT;          TASK_TYPE(task) = taskEVENT;
         TASK_ROOT(task) = root;          TASK_ROOT(task) = root;
Line 731  schedCallOnce(sched_root_task_t * __restrict root, sch Line 1205  schedCallOnce(sched_root_task_t * __restrict root, sch
   
         ret = schedCall(task);          ret = schedCall(task);
   
        _sched_unuseTask(task);        sched_unuseTask(task);
         return ret;          return ret;
 }  }
   
   /*
    * schedThread() - Add thread task to scheduler queue
    *
    * @root = root task
    * @func = task execution function
    * @arg = 1st func argument
    * @detach = Detach thread from scheduler, if !=0
    * @ss = stack size
    * @opt_data = Optional data
    * @opt_dlen = Optional data length
    * return: NULL error or !=NULL new queued task
    */
   sched_task_t *
   schedThread(sched_root_task_t * __restrict root, sched_task_func_t func, void *arg, int detach, 
                   size_t ss, void *opt_data, size_t opt_dlen)
   {
   #ifndef HAVE_LIBPTHREAD
           sched_SetErr(ENOTSUP, "Not supported thread tasks");
           return NULL;
   #endif
           sched_task_t *task;
           void *ptr;
           pthread_attr_t attr;
           sem_t *s = NULL;
   
           if (!root || !func)
                   return NULL;
           else {
                   /* normalizing stack size & detach state */
                   if (ss)
                           ss &= 0x7FFFFFFF;
                   detach = detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE;
           }
   
           if (!(s = (sem_t*) malloc(sizeof(sem_t)))) {
                   LOGERR;
                   return NULL;
           }
           if (sem_init(s, 0, 1)) {
                   LOGERR;
                   free(s);
                   return NULL;
           }
   
           /* get new task */
           if (!(task = sched_useTask(root))) {
                   sem_destroy(s);
                   free(s);
   
                   return NULL;
           }
   
           task->task_func = func;
           TASK_TYPE(task) = taskTHREAD;
           TASK_ROOT(task) = root;
   
           TASK_ARG(task) = arg;
           TASK_FLAG(task) = detach;
           TASK_RET(task) = (intptr_t) s;
   
           TASK_DATA(task) = opt_data;
           TASK_DATLEN(task) = opt_dlen;
   
           pthread_attr_init(&attr);
           pthread_attr_setdetachstate(&attr, detach);
           if (ss && (errno = pthread_attr_setstacksize(&attr, ss))) {
                   LOGERR;
                   pthread_attr_destroy(&attr);
                   sem_destroy(s);
                   free(s);
                   return sched_unuseTask(task);
           }
           if ((errno = pthread_attr_getstacksize(&attr, &ss))) {
                   LOGERR;
                   pthread_attr_destroy(&attr);
                   sem_destroy(s);
                   free(s);
                   return sched_unuseTask(task);
           } else
                   TASK_FLAG(task) |= (ss << 1);
           if ((errno = pthread_attr_setguardsize(&attr, ss))) {
                   LOGERR;
                   pthread_attr_destroy(&attr);
                   sem_destroy(s);
                   free(s);
                   return sched_unuseTask(task);
           }
   #ifdef SCHED_RR
           pthread_attr_setschedpolicy(&attr, SCHED_RR);
   #else
           pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
   #endif
           if (root->root_hooks.hook_add.thread)
                   ptr = root->root_hooks.hook_add.thread(task, &attr);
           else
                   ptr = NULL;
           pthread_attr_destroy(&attr);
   
           if (!ptr) {
                   pthread_mutex_lock(&root->root_mtx[taskTHREAD]);
                   TAILQ_INSERT_TAIL(&root->root_thread, TASK_ID(task), task_node);
                   pthread_mutex_unlock(&root->root_mtx[taskTHREAD]);
   
                   /* wait for init thread actions */
                   sem_wait(s);
           } else
                   task = sched_unuseTask(task);
   
           sem_destroy(s);
           free(s);
           return task;
   }
   

Removed from v.1.8.2.2  
changed lines
  Added in v.1.15


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>