--- libaitio/src/exec.c 2013/12/05 23:43:46 1.1.2.10 +++ libaitio/src/exec.c 2013/12/08 20:43:22 1.1.2.14 @@ -45,6 +45,7 @@ io_progInit(const char *progName, u_int initNum, u_int } pthread_mutex_init(&prg->prog_mtx, NULL); + signal(SIGPIPE, SIG_IGN); if (io_progOpen(prg, prg->prog_inin) < 0) { io_progDestroy(&prg); @@ -70,6 +71,7 @@ io_progDestroy(prog_t ** __restrict pprg) e_free((*pprg)->prog_used); array_Destroy(&(*pprg)->prog_fds); pthread_mutex_destroy(&(*pprg)->prog_mtx); + signal(SIGPIPE, SIG_DFL); e_free(*pprg); *pprg = NULL; @@ -87,6 +89,7 @@ io_progClose(prog_t * __restrict prg, u_int closeNum) { register int i; int ret = 0; + struct tagPIOPID *p; if (!prg) return 0; @@ -98,8 +101,21 @@ io_progClose(prog_t * __restrict prg, u_int closeNum) pthread_mutex_lock(&prg->prog_mtx); for (i = array_Size(prg->prog_fds) - 1; (closeNum ? ret < closeNum : 42) && i > -1; i--) - if (array_Get(prg->prog_fds, i)) { + if (array_Get(prg->prog_fds, i) && +#ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { +#else + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { +#endif + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); +#ifdef POPEN_STREAM e_pclose(array(prg->prog_fds, i, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, i, intptr_t)); +#endif array_Del(prg->prog_fds, i, 0); clrbit(prg->prog_used, i); prg->prog_cnum--; @@ -111,6 +127,52 @@ io_progClose(prog_t * __restrict prg, u_int closeNum) } /* + * io_progCloseAt() - Close program at pool of certain position + * + * @prg = program pool + * @idx = index at pool + * return: 0 error or !=0 closed program + */ +int +io_progCloseAt(prog_t * __restrict prg, u_int idx) +{ + int ret = 0; + struct tagPIOPID *p; + + if (!prg) + return 0; + if (idx > prg->prog_maxn) { + io_SetErr(EINVAL, "Requested number for close program is over pool's limit"); + return 0; + } + + pthread_mutex_lock(&prg->prog_mtx); + if (array_Get(prg->prog_fds, idx) && +#ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, idx, FILE*)))) { +#else + (p = pio_pgetpid((int) array(prg->prog_fds, idx, intptr_t)))) { +#endif + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, idx, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, idx, intptr_t)); +#endif + array_Del(prg->prog_fds, idx, 0); + clrbit(prg->prog_used, idx); + prg->prog_cnum--; + ret++; + } + pthread_mutex_unlock(&prg->prog_mtx); + + return ret; +} + +/* * io_progOpen() - Execute number of program(s) * * @prg = program pool @@ -120,7 +182,11 @@ io_progClose(prog_t * __restrict prg, u_int closeNum) int io_progOpen(prog_t * __restrict prg, u_int execNum) { +#ifdef POPEN_STREAM FILE *f; +#else + int f; +#endif int stat, ret = 0; register int i; pid_t pid; @@ -136,7 +202,11 @@ io_progOpen(prog_t * __restrict prg, u_int execNum) for (i = 0; (execNum ? ret < execNum : 42) && i < array_Size(prg->prog_fds); i++) if (!array_Get(prg->prog_fds, i)) { f = e_popen(prg->prog_name, "r+", &pid); +#ifdef POPEN_STREAM if (!f) { +#else + if (f == -1) { +#endif LOGERR; ret = -1; break; @@ -189,6 +259,7 @@ io_progVacuum(prog_t * __restrict prg, u_int toNum) { register int i; int ret = 0; + struct tagPIOPID *p; if (!prg) return 0; @@ -201,8 +272,22 @@ io_progVacuum(prog_t * __restrict prg, u_int toNum) pthread_mutex_lock(&prg->prog_mtx); for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--) - if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) { + if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i) && +#ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); e_pclose(array(prg->prog_fds, i, FILE*)); +#else + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); + e_pclose((int) array(prg->prog_fds, i, intptr_t)); +#endif array_Del(prg->prog_fds, i, 0); prg->prog_cnum--; ret++; @@ -231,7 +316,11 @@ io_progCheck(prog_t * __restrict prg) pthread_mutex_lock(&prg->prog_mtx); for (i = 0; i < array_Size(prg->prog_fds); i++) if (array_Get(prg->prog_fds, i) && +#ifdef POPEN_STREAM (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) +#else + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) +#endif if (waitpid(p->pid, &p->stat, WNOHANG) > 0) { clrbit(prg->prog_used, i); ret++; @@ -247,20 +336,36 @@ io_progCheck(prog_t * __restrict prg) * @prg = program pool * return: NULL error or !=NULL attached program handle */ +#ifdef POPEN_STREAM FILE * +#else +int +#endif io_progAttach(prog_t * __restrict prg) { +#ifdef POPEN_STREAM FILE *f = NULL; +#else + int f = -1; +#endif register int i; if (!prg) +#ifdef POPEN_STREAM return NULL; +#else + return -1; +#endif pthread_mutex_lock(&prg->prog_mtx); for (i = 0; i < array_Size(prg->prog_fds); i++) if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) { setbit(prg->prog_used, i); +#ifdef POPEN_STREAM f = array(prg->prog_fds, i, FILE*); +#else + f = array(prg->prog_fds, i, intptr_t); +#endif break; } pthread_mutex_unlock(&prg->prog_mtx); @@ -276,7 +381,11 @@ io_progAttach(prog_t * __restrict prg) * return: none */ void +#ifdef POPEN_STREAM io_progDetach(prog_t * __restrict prg, FILE *pfd) +#else +io_progDetach(prog_t * __restrict prg, int pfd) +#endif { register int i; @@ -285,7 +394,11 @@ io_progDetach(prog_t * __restrict prg, FILE *pfd) pthread_mutex_lock(&prg->prog_mtx); for (i = 0; i < array_Size(prg->prog_fds); i++) +#ifdef POPEN_STREAM if (array(prg->prog_fds, i, FILE*) == pfd) { +#else + if (array(prg->prog_fds, i, intptr_t) == pfd) { +#endif clrbit(prg->prog_used, i); break; }