Diff for /libaitio/src/exec.c between versions 1.1.2.10 and 1.1.2.17

version 1.1.2.10, 2013/12/05 23:43:46 version 1.1.2.17, 2013/12/12 09:04:22
Line 45  io_progInit(const char *progName, u_int initNum, u_int Line 45  io_progInit(const char *progName, u_int initNum, u_int
         }          }
   
         pthread_mutex_init(&prg->prog_mtx, NULL);          pthread_mutex_init(&prg->prog_mtx, NULL);
           signal(SIGPIPE, SIG_IGN);
   
         if (io_progOpen(prg, prg->prog_inin) < 0) {          if (io_progOpen(prg, prg->prog_inin) < 0) {
                 io_progDestroy(&prg);                  io_progDestroy(&prg);
Line 70  io_progDestroy(prog_t ** __restrict pprg) Line 71  io_progDestroy(prog_t ** __restrict pprg)
         e_free((*pprg)->prog_used);          e_free((*pprg)->prog_used);
         array_Destroy(&(*pprg)->prog_fds);          array_Destroy(&(*pprg)->prog_fds);
         pthread_mutex_destroy(&(*pprg)->prog_mtx);          pthread_mutex_destroy(&(*pprg)->prog_mtx);
           signal(SIGPIPE, SIG_DFL);
   
         e_free(*pprg);          e_free(*pprg);
         *pprg = NULL;          *pprg = NULL;
Line 87  io_progClose(prog_t * __restrict prg, u_int closeNum) Line 89  io_progClose(prog_t * __restrict prg, u_int closeNum)
 {  {
         register int i;          register int i;
         int ret = 0;          int ret = 0;
           struct tagPIOPID *p;
   
         if (!prg)          if (!prg)
                 return 0;                  return 0;
Line 98  io_progClose(prog_t * __restrict prg, u_int closeNum) Line 101  io_progClose(prog_t * __restrict prg, u_int closeNum)
         pthread_mutex_lock(&prg->prog_mtx);          pthread_mutex_lock(&prg->prog_mtx);
         for (i = array_Size(prg->prog_fds) - 1;           for (i = array_Size(prg->prog_fds) - 1; 
                         (closeNum ? ret < closeNum : 42) && i > -1; i--)                          (closeNum ? ret < closeNum : 42) && i > -1; i--)
                if (array_Get(prg->prog_fds, i)) {                if (array_Get(prg->prog_fds, i) && 
 #ifdef POPEN_STREAM
                                 (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
 #else
                                 (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
 #endif
                         kill(p->pid, SIGTERM);
                         usleep(1000);
                         if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
                                 kill(p->pid, SIGKILL);
 #ifdef POPEN_STREAM
                         e_pclose(array(prg->prog_fds, i, FILE*));                          e_pclose(array(prg->prog_fds, i, FILE*));
   #else
                           e_pclose((int) array(prg->prog_fds, i, intptr_t));
   #endif
                         array_Del(prg->prog_fds, i, 0);                          array_Del(prg->prog_fds, i, 0);
                         clrbit(prg->prog_used, i);                          clrbit(prg->prog_used, i);
                         prg->prog_cnum--;                          prg->prog_cnum--;
Line 111  io_progClose(prog_t * __restrict prg, u_int closeNum) Line 127  io_progClose(prog_t * __restrict prg, u_int closeNum)
 }  }
   
 /*  /*
    * io_progCloseAt() - Close program at pool of certain position
    *
    * @prg = program pool
    * @idx = index at pool
    * return: 0 error or !=0 closed program
    */
   int
   io_progCloseAt(prog_t * __restrict prg, u_int idx)
   {
           int ret = 0;
           struct tagPIOPID *p;
   
           if (!prg)
                   return 0;
           if (idx > prg->prog_maxn) {
                   io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
                   return 0;
           }
   
           pthread_mutex_lock(&prg->prog_mtx);
           if (array_Get(prg->prog_fds, idx) && 
   #ifdef POPEN_STREAM
                           (p = pio_pgetpid(array(prg->prog_fds, idx, FILE*)))) {
   #else
                           (p = pio_pgetpid((int) array(prg->prog_fds, idx, intptr_t)))) {
   #endif
                   kill(p->pid, SIGTERM);
                   usleep(1000);
                   if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
                           kill(p->pid, SIGKILL);
   #ifdef POPEN_STREAM
                   e_pclose(array(prg->prog_fds, idx, FILE*));
   #else
                   e_pclose((int) array(prg->prog_fds, idx, intptr_t));
   #endif
                   array_Del(prg->prog_fds, idx, 0);
                   clrbit(prg->prog_used, idx);
                   prg->prog_cnum--;
                   ret++;
           }
           pthread_mutex_unlock(&prg->prog_mtx);
   
           return ret;
   }
   
   /*
  * io_progOpen() - Execute number of program(s)   * io_progOpen() - Execute number of program(s)
  *   *
  * @prg = program pool   * @prg = program pool
Line 120  io_progClose(prog_t * __restrict prg, u_int closeNum) Line 182  io_progClose(prog_t * __restrict prg, u_int closeNum)
 int  int
 io_progOpen(prog_t * __restrict prg, u_int execNum)  io_progOpen(prog_t * __restrict prg, u_int execNum)
 {  {
   #ifdef POPEN_STREAM
         FILE *f;          FILE *f;
   #else
           int f;
   #endif
         int stat, ret = 0;          int stat, ret = 0;
         register int i;          register int i;
         pid_t pid;          pid_t pid;
Line 136  io_progOpen(prog_t * __restrict prg, u_int execNum) Line 202  io_progOpen(prog_t * __restrict prg, u_int execNum)
         for (i = 0; (execNum ? ret < execNum : 42) && i < array_Size(prg->prog_fds); i++)          for (i = 0; (execNum ? ret < execNum : 42) && i < array_Size(prg->prog_fds); i++)
                 if (!array_Get(prg->prog_fds, i)) {                  if (!array_Get(prg->prog_fds, i)) {
                         f = e_popen(prg->prog_name, "r+", &pid);                          f = e_popen(prg->prog_name, "r+", &pid);
   #ifdef POPEN_STREAM
                         if (!f) {                          if (!f) {
   #else
                           if (f == -1) {
   #endif
                                 LOGERR;                                  LOGERR;
                                 ret = -1;                                  ret = -1;
                                 break;                                  break;
Line 147  io_progOpen(prog_t * __restrict prg, u_int execNum) Line 217  io_progOpen(prog_t * __restrict prg, u_int execNum)
                                 break;                                  break;
                         } else                          } else
                                 array_Set(prg->prog_fds, i, f);                                  array_Set(prg->prog_fds, i, f);
                           clrbit(prg->prog_used, i);
                         prg->prog_cnum++;                          prg->prog_cnum++;
                         ret++;                          ret++;
                 }                  }
Line 160  io_progOpen(prog_t * __restrict prg, u_int execNum) Line 231  io_progOpen(prog_t * __restrict prg, u_int execNum)
  *   *
  * @prg = program pool   * @prg = program pool
  * @toNum = execute to number of programs (0 max)   * @toNum = execute to number of programs (0 max)
 * return: 0 error, >0 executed programs and abs(<0) executed programs with logged error * return: 0 error or nothing to do, 
  *        >0 executed programs and abs(<0) executed programs with logged error
  */   */
 int  int
 io_progGrow(prog_t * __restrict prg, u_int toNum)  io_progGrow(prog_t * __restrict prg, u_int toNum)
Line 173  io_progGrow(prog_t * __restrict prg, u_int toNum) Line 245  io_progGrow(prog_t * __restrict prg, u_int toNum)
         }          }
         if (!toNum)          if (!toNum)
                 toNum = prg->prog_maxn;                  toNum = prg->prog_maxn;
           if (toNum < prg->prog_inin)
                   toNum = prg->prog_inin;
   
           if ((toNum - prg->prog_cnum) < 1)
                   return 0;
   
         return io_progOpen(prg, toNum - prg->prog_cnum);          return io_progOpen(prg, toNum - prg->prog_cnum);
 }  }
   
Line 189  io_progVacuum(prog_t * __restrict prg, u_int toNum) Line 266  io_progVacuum(prog_t * __restrict prg, u_int toNum)
 {  {
         register int i;          register int i;
         int ret = 0;          int ret = 0;
           struct tagPIOPID *p;
   
         if (!prg)          if (!prg)
                 return 0;                  return 0;
Line 201  io_progVacuum(prog_t * __restrict prg, u_int toNum) Line 279  io_progVacuum(prog_t * __restrict prg, u_int toNum)
   
         pthread_mutex_lock(&prg->prog_mtx);          pthread_mutex_lock(&prg->prog_mtx);
         for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--)          for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--)
                if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) {                if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i) && 
 #ifdef POPEN_STREAM
                                 (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
                         kill(p->pid, SIGTERM);
                         usleep(1000);
                         if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
                                 kill(p->pid, SIGKILL);
                         e_pclose(array(prg->prog_fds, i, FILE*));                          e_pclose(array(prg->prog_fds, i, FILE*));
   #else
                                   (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
                           kill(p->pid, SIGTERM);
                           usleep(1000);
                           if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
                                   kill(p->pid, SIGKILL);
                           e_pclose((int) array(prg->prog_fds, i, intptr_t));
   #endif
                         array_Del(prg->prog_fds, i, 0);                          array_Del(prg->prog_fds, i, 0);
                         prg->prog_cnum--;                          prg->prog_cnum--;
                         ret++;                          ret++;
Line 216  io_progVacuum(prog_t * __restrict prg, u_int toNum) Line 308  io_progVacuum(prog_t * __restrict prg, u_int toNum)
  * io_progCheck() - Check exit status of program pool   * io_progCheck() - Check exit status of program pool
  *   *
  * @prg = program pool   * @prg = program pool
    * @re = resurrect program
  * return: -1 error or >-1 exited programs   * return: -1 error or >-1 exited programs
  */   */
 int  int
io_progCheck(prog_t * __restrict prg)io_progCheck(prog_t * __restrict prg, int re)
 {  {
         int ret = 0;          int ret = 0;
         struct tagPIOPID *p;          struct tagPIOPID *p;
Line 231  io_progCheck(prog_t * __restrict prg) Line 324  io_progCheck(prog_t * __restrict prg)
         pthread_mutex_lock(&prg->prog_mtx);          pthread_mutex_lock(&prg->prog_mtx);
         for (i = 0; i < array_Size(prg->prog_fds); i++)          for (i = 0; i < array_Size(prg->prog_fds); i++)
                 if (array_Get(prg->prog_fds, i) &&                   if (array_Get(prg->prog_fds, i) && 
   #ifdef POPEN_STREAM
                                 (p = pio_pgetpid(array(prg->prog_fds, i, FILE*))))                                  (p = pio_pgetpid(array(prg->prog_fds, i, FILE*))))
   #else
                                   (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t))))
   #endif
                         if (waitpid(p->pid, &p->stat, WNOHANG) > 0) {                          if (waitpid(p->pid, &p->stat, WNOHANG) > 0) {
                                 clrbit(prg->prog_used, i);                                  clrbit(prg->prog_used, i);
   #ifdef POPEN_STREAM
                                   e_pclose(array(prg->prog_fds, i, FILE*));
   #else
                                   e_pclose((int) array(prg->prog_fds, i, intptr_t));
   #endif
                                   array_Del(prg->prog_fds, i, 0);
                                   prg->prog_cnum--;
                                 ret++;                                  ret++;
                         }                          }
         pthread_mutex_unlock(&prg->prog_mtx);          pthread_mutex_unlock(&prg->prog_mtx);
   
           /* resurrect */
           if (re && ret > 0 && prg->prog_inin - prg->prog_cnum)
                   io_progOpen(prg, prg->prog_inin - prg->prog_cnum);
   
         return ret;          return ret;
 }  }
   
Line 245  io_progCheck(prog_t * __restrict prg) Line 353  io_progCheck(prog_t * __restrict prg)
  * io_progAttach() - Attach to open program   * io_progAttach() - Attach to open program
  *   *
  * @prg = program pool   * @prg = program pool
    * @newOne = Execute new one program after attach
  * return: NULL error or !=NULL attached program handle   * return: NULL error or !=NULL attached program handle
  */   */
   #ifdef POPEN_STREAM
 FILE *  FILE *
io_progAttach(prog_t * __restrict prg)#else
 int
 #endif
 io_progAttach(prog_t * __restrict prg, int newOne)
 {  {
   #ifdef POPEN_STREAM
         FILE *f = NULL;          FILE *f = NULL;
   #else
           int f = -1;
   #endif
         register int i;          register int i;
   
         if (!prg)          if (!prg)
   #ifdef POPEN_STREAM
                 return NULL;                  return NULL;
   #else
                   return -1;
   #endif
   
         pthread_mutex_lock(&prg->prog_mtx);          pthread_mutex_lock(&prg->prog_mtx);
         for (i = 0; i < array_Size(prg->prog_fds); i++)          for (i = 0; i < array_Size(prg->prog_fds); i++)
                 if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) {                  if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) {
                         setbit(prg->prog_used, i);                          setbit(prg->prog_used, i);
   #ifdef POPEN_STREAM
                         f = array(prg->prog_fds, i, FILE*);                          f = array(prg->prog_fds, i, FILE*);
   #else
                           f = array(prg->prog_fds, i, intptr_t);
   #endif
                         break;                          break;
                 }                  }
         pthread_mutex_unlock(&prg->prog_mtx);          pthread_mutex_unlock(&prg->prog_mtx);
   
           /* execute new one program */
           if (newOne && f)
                   io_progOpen(prg, 1);
   
         return f;          return f;
 }  }
   
Line 276  io_progAttach(prog_t * __restrict prg) Line 405  io_progAttach(prog_t * __restrict prg)
  * return: none   * return: none
  */   */
 void  void
   #ifdef POPEN_STREAM
 io_progDetach(prog_t * __restrict prg, FILE *pfd)  io_progDetach(prog_t * __restrict prg, FILE *pfd)
   #else
   io_progDetach(prog_t * __restrict prg, int pfd)
   #endif
 {  {
         register int i;          register int i;
   
Line 285  io_progDetach(prog_t * __restrict prg, FILE *pfd) Line 418  io_progDetach(prog_t * __restrict prg, FILE *pfd)
   
         pthread_mutex_lock(&prg->prog_mtx);          pthread_mutex_lock(&prg->prog_mtx);
         for (i = 0; i < array_Size(prg->prog_fds); i++)          for (i = 0; i < array_Size(prg->prog_fds); i++)
   #ifdef POPEN_STREAM
                 if (array(prg->prog_fds, i, FILE*) == pfd) {                  if (array(prg->prog_fds, i, FILE*) == pfd) {
   #else
                   if (array(prg->prog_fds, i, intptr_t) == pfd) {
   #endif
                         clrbit(prg->prog_used, i);                          clrbit(prg->prog_used, i);
                         break;                          break;
                 }                  }

Removed from v.1.1.2.10  
changed lines
  Added in v.1.1.2.17


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>