--- libaitio/src/exec.c 2013/12/06 01:30:22 1.1.2.12 +++ libaitio/src/exec.c 2013/12/12 09:04:22 1.1.2.17 @@ -71,6 +71,7 @@ io_progDestroy(prog_t ** __restrict pprg) e_free((*pprg)->prog_used); array_Destroy(&(*pprg)->prog_fds); pthread_mutex_destroy(&(*pprg)->prog_mtx); + signal(SIGPIPE, SIG_DFL); e_free(*pprg); *pprg = NULL; @@ -88,6 +89,7 @@ io_progClose(prog_t * __restrict prg, u_int closeNum) { register int i; int ret = 0; + struct tagPIOPID *p; if (!prg) return 0; @@ -99,8 +101,17 @@ io_progClose(prog_t * __restrict prg, u_int closeNum) pthread_mutex_lock(&prg->prog_mtx); for (i = array_Size(prg->prog_fds) - 1; (closeNum ? ret < closeNum : 42) && i > -1; i--) - if (array_Get(prg->prog_fds, i)) { + if (array_Get(prg->prog_fds, i) && #ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { +#else + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { +#endif + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); +#ifdef POPEN_STREAM e_pclose(array(prg->prog_fds, i, FILE*)); #else e_pclose((int) array(prg->prog_fds, i, intptr_t)); @@ -116,6 +127,52 @@ io_progClose(prog_t * __restrict prg, u_int closeNum) } /* + * io_progCloseAt() - Close program at pool of certain position + * + * @prg = program pool + * @idx = index at pool + * return: 0 error or !=0 closed program + */ +int +io_progCloseAt(prog_t * __restrict prg, u_int idx) +{ + int ret = 0; + struct tagPIOPID *p; + + if (!prg) + return 0; + if (idx > prg->prog_maxn) { + io_SetErr(EINVAL, "Requested number for close program is over pool's limit"); + return 0; + } + + pthread_mutex_lock(&prg->prog_mtx); + if (array_Get(prg->prog_fds, idx) && +#ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, idx, FILE*)))) { +#else + (p = pio_pgetpid((int) array(prg->prog_fds, idx, intptr_t)))) { +#endif + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, idx, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, idx, intptr_t)); +#endif + array_Del(prg->prog_fds, idx, 0); + clrbit(prg->prog_used, idx); + prg->prog_cnum--; + ret++; + } + pthread_mutex_unlock(&prg->prog_mtx); + + return ret; +} + +/* * io_progOpen() - Execute number of program(s) * * @prg = program pool @@ -160,6 +217,7 @@ io_progOpen(prog_t * __restrict prg, u_int execNum) break; } else array_Set(prg->prog_fds, i, f); + clrbit(prg->prog_used, i); prg->prog_cnum++; ret++; } @@ -173,7 +231,8 @@ io_progOpen(prog_t * __restrict prg, u_int execNum) * * @prg = program pool * @toNum = execute to number of programs (0 max) - * return: 0 error, >0 executed programs and abs(<0) executed programs with logged error + * return: 0 error or nothing to do, + * >0 executed programs and abs(<0) executed programs with logged error */ int io_progGrow(prog_t * __restrict prg, u_int toNum) @@ -186,7 +245,12 @@ io_progGrow(prog_t * __restrict prg, u_int toNum) } if (!toNum) toNum = prg->prog_maxn; + if (toNum < prg->prog_inin) + toNum = prg->prog_inin; + if ((toNum - prg->prog_cnum) < 1) + return 0; + return io_progOpen(prg, toNum - prg->prog_cnum); } @@ -202,6 +266,7 @@ io_progVacuum(prog_t * __restrict prg, u_int toNum) { register int i; int ret = 0; + struct tagPIOPID *p; if (!prg) return 0; @@ -214,10 +279,20 @@ io_progVacuum(prog_t * __restrict prg, u_int toNum) pthread_mutex_lock(&prg->prog_mtx); for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--) - if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) { + if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i) && #ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); e_pclose(array(prg->prog_fds, i, FILE*)); #else + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); e_pclose((int) array(prg->prog_fds, i, intptr_t)); #endif array_Del(prg->prog_fds, i, 0); @@ -233,10 +308,11 @@ io_progVacuum(prog_t * __restrict prg, u_int toNum) * io_progCheck() - Check exit status of program pool * * @prg = program pool + * @re = resurrect program * return: -1 error or >-1 exited programs */ int -io_progCheck(prog_t * __restrict prg) +io_progCheck(prog_t * __restrict prg, int re) { int ret = 0; struct tagPIOPID *p; @@ -255,10 +331,21 @@ io_progCheck(prog_t * __restrict prg) #endif if (waitpid(p->pid, &p->stat, WNOHANG) > 0) { clrbit(prg->prog_used, i); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, i, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, i, intptr_t)); +#endif + array_Del(prg->prog_fds, i, 0); + prg->prog_cnum--; ret++; } pthread_mutex_unlock(&prg->prog_mtx); + /* resurrect */ + if (re && ret > 0 && prg->prog_inin - prg->prog_cnum) + io_progOpen(prg, prg->prog_inin - prg->prog_cnum); + return ret; } @@ -266,6 +353,7 @@ io_progCheck(prog_t * __restrict prg) * io_progAttach() - Attach to open program * * @prg = program pool + * @newOne = Execute new one program after attach * return: NULL error or !=NULL attached program handle */ #ifdef POPEN_STREAM @@ -273,7 +361,7 @@ FILE * #else int #endif -io_progAttach(prog_t * __restrict prg) +io_progAttach(prog_t * __restrict prg, int newOne) { #ifdef POPEN_STREAM FILE *f = NULL; @@ -301,6 +389,10 @@ io_progAttach(prog_t * __restrict prg) break; } pthread_mutex_unlock(&prg->prog_mtx); + + /* execute new one program */ + if (newOne && f) + io_progOpen(prg, 1); return f; }