Diff for /libaitio/src/exec.c between versions 1.1 and 1.2

version 1.1, 2013/12/05 12:43:04 version 1.2, 2013/12/15 22:57:20
Line 0 Line 1
   #include "global.h"
   
   
   /*
    * io_progInit() - Init program pool
    *
    * @progName = program name for execution
    * @initNum = initial started programs
    * @maxNum = maximum started programs
    * return: NULL error or !=NULL allocated pool (must destroied with io_progDestroy())
    */
   prog_t *
   io_progInit(const char *progName, u_int initNum, u_int maxNum)
   {
           prog_t *prg = NULL;
   
           if (initNum > maxNum)
                   return NULL;
   
           prg = e_malloc(sizeof(prog_t));
           if (!prg) {
                   io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                   return NULL;
           } else
                   memset(prg, 0, sizeof(prog_t));
   
           prg->prog_inin = initNum;
           prg->prog_maxn = maxNum;
           strlcpy(prg->prog_name, progName, sizeof prg->prog_name);
   
           prg->prog_used = e_malloc(E_ALIGN(prg->prog_maxn, sizeof *prg->prog_used) / 
                           sizeof *prg->prog_used);
           if (!prg->prog_used) {
                   io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                   e_free(prg);
                   return NULL;
           }
   
           prg->prog_fds = array_Init(prg->prog_maxn);
           if (!prg->prog_fds) {
                   io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                   e_free(prg->prog_used);
                   e_free(prg);
                   return NULL;
           }
   
           pthread_mutex_init(&prg->prog_mtx, NULL);
           signal(SIGPIPE, SIG_IGN);
   
           if (io_progOpen(prg, prg->prog_inin) < 0) {
                   io_progDestroy(&prg);
                   prg = NULL;
           }
           return prg;
   }
   
   /*
    * io_progDestroy() - Destroy entire program pool
    *
    * @pprg = program pool
    * return: none
    */
   void
   io_progDestroy(prog_t ** __restrict pprg)
   {
           if (!pprg || !*pprg)
                   return;
   
           io_progClose(*pprg, 0);
   
           e_free((*pprg)->prog_used);
           array_Destroy(&(*pprg)->prog_fds);
           pthread_mutex_destroy(&(*pprg)->prog_mtx);
           signal(SIGPIPE, SIG_DFL);
   
           e_free(*pprg);
           *pprg = NULL;
   }
   
   /*
    * io_progClose() - Close all programs in pool
    *
    * @prg = program pool
    * @closeNum = close program(s) (0 all)
    * return: 0 error, >0 closed programs
    */
   int
   io_progClose(prog_t * __restrict prg, u_int closeNum)
   {
           register int i;
           int ret = 0;
           struct tagPIOPID *p;
   
           if (!prg)
                   return 0;
           if (closeNum > prg->prog_maxn) {
                   io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
                   return 0;
           }
   
           pthread_mutex_lock(&prg->prog_mtx);
           for (i = array_Size(prg->prog_fds) - 1; 
                           (closeNum ? ret < closeNum : 42) && i > -1; i--)
                   if (array_Get(prg->prog_fds, i) && 
   #ifdef POPEN_STREAM
                                   (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
   #else
                                   (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
   #endif
                           kill(p->pid, SIGTERM);
                           usleep(1000);
                           if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
                                   kill(p->pid, SIGKILL);
   #ifdef POPEN_STREAM
                           e_pclose(array(prg->prog_fds, i, FILE*));
   #else
                           e_pclose((int) array(prg->prog_fds, i, intptr_t));
   #endif
                           array_Del(prg->prog_fds, i, 0);
                           clrbit(prg->prog_used, i);
                           prg->prog_cnum--;
                           ret++;
                   }
           pthread_mutex_unlock(&prg->prog_mtx);
   
           return ret;
   }
   
   /*
    * io_progCloseAt() - Close program at pool of certain position
    *
    * @prg = program pool
    * @idx = index at pool
    * return: 0 error or !=0 closed program
    */
   int
   io_progCloseAt(prog_t * __restrict prg, u_int idx)
   {
           int ret = 0;
           struct tagPIOPID *p;
   
           if (!prg)
                   return 0;
           if (idx > prg->prog_maxn) {
                   io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
                   return 0;
           }
   
           pthread_mutex_lock(&prg->prog_mtx);
           if (array_Get(prg->prog_fds, idx) && 
   #ifdef POPEN_STREAM
                           (p = pio_pgetpid(array(prg->prog_fds, idx, FILE*)))) {
   #else
                           (p = pio_pgetpid((int) array(prg->prog_fds, idx, intptr_t)))) {
   #endif
                   kill(p->pid, SIGTERM);
                   usleep(1000);
                   if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
                           kill(p->pid, SIGKILL);
   #ifdef POPEN_STREAM
                   e_pclose(array(prg->prog_fds, idx, FILE*));
   #else
                   e_pclose((int) array(prg->prog_fds, idx, intptr_t));
   #endif
                   array_Del(prg->prog_fds, idx, 0);
                   clrbit(prg->prog_used, idx);
                   prg->prog_cnum--;
                   ret++;
           }
           pthread_mutex_unlock(&prg->prog_mtx);
   
           return ret;
   }
   
   /*
    * io_progCloseOf() - Close program at pool with certain handle
    *
    * @prg = program pool
    * @h = handle of program
    * return: 0 error, >0 closed programs
    */
   int
   #ifdef POPEN_STREAM
   io_progCloseOf(prog_t * __restrict prg, FILE *h)
   #else
   io_progCloseOf(prog_t * __restrict prg, int h)
   #endif
   {
           register int i;
           int ret = 0;
           struct tagPIOPID *p;
   #ifdef POPEN_STREAM
           FILE *f;
   #else
           int f;
   #endif
   
           if (!prg)
                   return 0;
   
           pthread_mutex_lock(&prg->prog_mtx);
           for (i = 0; i < array_Size(prg->prog_fds); i++)
                   if (array_Get(prg->prog_fds, i)) {
   #ifdef POPEN_STREAM
                           f = array(prg->prog_fds, i, FILE*);
                           if (f == h && (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
   #else
                           f = (int) array(prg->prog_fds, i, intptr_t);
                           if (f == h && (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
   #endif
                                   kill(p->pid, SIGTERM);
                                   usleep(1000);
                                   if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
                                           kill(p->pid, SIGKILL);
   #ifdef POPEN_STREAM
                                   e_pclose(array(prg->prog_fds, i, FILE*));
   #else
                                   e_pclose((int) array(prg->prog_fds, i, intptr_t));
   #endif
                                   array_Del(prg->prog_fds, i, 0);
                                   clrbit(prg->prog_used, i);
                                   prg->prog_cnum--;
                                   ret++;
                                   break;
                           }
                   }
           pthread_mutex_unlock(&prg->prog_mtx);
   
           return ret;
   }
   
   /*
    * io_progOpen2() - Start program from pool on first unused slot
    *
    * @prg = program pool
    * return: -1 error, >-1 reside at slot
    */
   int
   io_progOpen2(prog_t * __restrict prg)
   {
   #ifdef POPEN_STREAM
           FILE *f = NULL;
   #else
           int f = -1;
   #endif
           int stat, ret = -1;
           register int i;
           pid_t pid;
   
           if (!prg)
                   return -1;
           if (prg->prog_cnum + 1 > prg->prog_maxn) {
                   io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
                   return -1;
           }
   
           pthread_mutex_lock(&prg->prog_mtx);
           for (i = 0; i < array_Size(prg->prog_fds); i++)
                   if (!array_Get(prg->prog_fds, i)) {
                           f = e_popen(prg->prog_name, "r+", &pid);
   #ifdef POPEN_STREAM
                           if (!f) {
   #else
                           if (f == -1) {
   #endif
                                   LOGERR;
                                   break;
                           } else if (waitpid(pid, &stat, WNOHANG)) {
                                   io_SetErr(ECHILD, "Program with pid=%d exit with status %d", 
                                                   pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1);
                                   e_pclose(f);
                                   break;
                           } else
                                   array_Set(prg->prog_fds, i, f);
                           clrbit(prg->prog_used, i);
                           prg->prog_cnum++;
                           ret = i;
                           break;
                   }
           pthread_mutex_unlock(&prg->prog_mtx);
   
           return ret;
   }
   
   /*
    * io_progOpen() - Execute number of program(s)
    *
    * @prg = program pool
    * @execNum = execute program(s) (0 max)
    * return: -1 error, >0 executed programs
    */
   int
   io_progOpen(prog_t * __restrict prg, u_int execNum)
   {
   #ifdef POPEN_STREAM
           FILE *f;
   #else
           int f;
   #endif
           int stat, ret = 0;
           register int i;
           pid_t pid;
   
           if (!prg)
                   return -1;
           if (prg->prog_cnum + execNum > prg->prog_maxn) {
                   io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
                   return -1;
           }
   
           pthread_mutex_lock(&prg->prog_mtx);
           for (i = 0; (execNum ? ret < execNum : 42) && i < array_Size(prg->prog_fds); i++)
                   if (!array_Get(prg->prog_fds, i)) {
                           f = e_popen(prg->prog_name, "r+", &pid);
   #ifdef POPEN_STREAM
                           if (!f) {
   #else
                           if (f == -1) {
   #endif
                                   LOGERR;
                                   ret = -1;
                                   break;
                           } else if (waitpid(pid, &stat, WNOHANG)) {
                                   io_SetErr(ECHILD, "Program with pid=%d exit with status %d", 
                                                   pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1);
                                   e_pclose(f);
                                   ret = -1;
                                   break;
                           } else
                                   array_Set(prg->prog_fds, i, f);
                           clrbit(prg->prog_used, i);
                           prg->prog_cnum++;
                           ret++;
                   }
           pthread_mutex_unlock(&prg->prog_mtx);
   
           return ret;
   }
   
   /*
    * io_progGrow() - Execute to number of programs in pool
    *
    * @prg = program pool
    * @toNum = execute to number of programs (0 max)
    * return: 0 error or nothing to do, 
    *      >0 executed programs and abs(<0) executed programs with logged error
    */
   int
   io_progGrow(prog_t * __restrict prg, u_int toNum)
   {
           if (!prg)
                   return 0;
           if (toNum > prg->prog_maxn) {
                   io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
                   return 0;
           }
           if (!toNum)
                   toNum = prg->prog_maxn;
           if (toNum < prg->prog_inin)
                   toNum = prg->prog_inin;
   
           if ((int) (toNum - prg->prog_cnum) < 1)
                   return 0;
   
           return io_progOpen(prg, toNum - prg->prog_cnum);
   }
   
   /*
    * io_progVacuum() - Vacuum pool to running number of programs
    *
    * @prg = program pool
    * @toNum = vacuum to number of programs (0 to init number)
    * return: 0 error or >0 closed programs
    */
   int
   io_progVacuum(prog_t * __restrict prg, u_int toNum)
   {
           register int i;
           int ret = 0;
           struct tagPIOPID *p;
   
           if (!prg)
                   return 0;
           if (toNum > prg->prog_maxn) {
                   io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
                   return 0;
           }
           if (!toNum)
                   toNum = prg->prog_inin;
   
           pthread_mutex_lock(&prg->prog_mtx);
           for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--)
                   if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i) && 
   #ifdef POPEN_STREAM
                                   (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
                           kill(p->pid, SIGTERM);
                           usleep(1000);
                           if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
                                   kill(p->pid, SIGKILL);
                           e_pclose(array(prg->prog_fds, i, FILE*));
   #else
                                   (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
                           kill(p->pid, SIGTERM);
                           usleep(1000);
                           if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
                                   kill(p->pid, SIGKILL);
                           e_pclose((int) array(prg->prog_fds, i, intptr_t));
   #endif
                           array_Del(prg->prog_fds, i, 0);
                           prg->prog_cnum--;
                           ret++;
                   }
           pthread_mutex_unlock(&prg->prog_mtx);
   
           return ret;
   }
   
   /*
    * io_progCheck() - Check exit status of program pool
    *
    * @prg = program pool
    * @re = resurrect program to init number
    * return: -1 error or >-1 exited programs
    */
   int
   io_progCheck(prog_t * __restrict prg, int re)
   {
           int ret = 0;
           struct tagPIOPID *p;
           register int i;
   
           if (!prg)
                   return -1;
   
           pthread_mutex_lock(&prg->prog_mtx);
           for (i = 0; i < array_Size(prg->prog_fds); i++)
                   if (array_Get(prg->prog_fds, i) && 
   #ifdef POPEN_STREAM
                                   (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
   #else
                                   (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
   #endif
                           if (waitpid(p->pid, &p->stat, WNOHANG)) {
                                   clrbit(prg->prog_used, i);
   #ifdef POPEN_STREAM
                                   e_pclose(array(prg->prog_fds, i, FILE*));
   #else
                                   e_pclose((int) array(prg->prog_fds, i, intptr_t));
   #endif
                                   array_Del(prg->prog_fds, i, 0);
                                   prg->prog_cnum--;
                                   ret++;
                           }
                   }
           pthread_mutex_unlock(&prg->prog_mtx);
   
           /* resurrect to init number */
           if (re && ((int) (prg->prog_inin - prg->prog_cnum) > 0))
                   io_progOpen(prg, prg->prog_inin - prg->prog_cnum);
   
           return ret;
   }
   
   /*
    * io_progAttach() - Attach to open program
    *
    * @prg = program pool
    * @newOne = Execute new one program after attach
    * return: NULL error or !=NULL attached program handle
    */
   #ifdef POPEN_STREAM
   FILE *
   #else
   int
   #endif
   io_progAttach(prog_t * __restrict prg, int newOne)
   {
   #ifdef POPEN_STREAM
           FILE *f = NULL;
   #else
           int f = -1;
   #endif
           register int i;
   
           if (!prg)
   #ifdef POPEN_STREAM
                   return NULL;
   #else
                   return -1;
   #endif
   
           pthread_mutex_lock(&prg->prog_mtx);
           for (i = 0; i < array_Size(prg->prog_fds); i++)
                   if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) {
                           setbit(prg->prog_used, i);
   #ifdef POPEN_STREAM
                           f = array(prg->prog_fds, i, FILE*);
   #else
                           f = array(prg->prog_fds, i, intptr_t);
   #endif
                           break;
                   }
           pthread_mutex_unlock(&prg->prog_mtx);
   
           /* execute new one program */
           if (newOne) {
                   if (f)
                           io_progOpen(prg, 1);
                   else if ((i = io_progOpen2(prg)) > -1)
                           /* not found free program */
   #ifdef POPEN_STREAM
                           f = array(prg->prog_fds, i, FILE*);
   #else
                           f = array(prg->prog_fds, i, intptr_t);
   #endif
           }
   
           return f;
   }
   
   /*
    * io_progDetach() - Detch from open program
    *
    * @prg= program pool
    * @pfd = attached program handle
    * return: none
    */
   void
   #ifdef POPEN_STREAM
   io_progDetach(prog_t * __restrict prg, FILE *pfd)
   #else
   io_progDetach(prog_t * __restrict prg, int pfd)
   #endif
   {
           register int i;
   
           if (!prg || !pfd)
                   return;
   
           pthread_mutex_lock(&prg->prog_mtx);
           for (i = 0; i < array_Size(prg->prog_fds); i++)
   #ifdef POPEN_STREAM
                   if (array(prg->prog_fds, i, FILE*) == pfd) {
   #else
                   if (array(prg->prog_fds, i, intptr_t) == pfd) {
   #endif
                           clrbit(prg->prog_used, i);
                           break;
                   }
           pthread_mutex_unlock(&prg->prog_mtx);
   }

Removed from v.1.1  
changed lines
  Added in v.1.2


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>