--- libaitio/src/exec.c 2013/12/06 01:03:05 1.1.2.11 +++ libaitio/src/exec.c 2013/12/08 21:11:54 1.1.2.16 @@ -45,6 +45,7 @@ io_progInit(const char *progName, u_int initNum, u_int } pthread_mutex_init(&prg->prog_mtx, NULL); + signal(SIGPIPE, SIG_IGN); if (io_progOpen(prg, prg->prog_inin) < 0) { io_progDestroy(&prg); @@ -70,6 +71,7 @@ io_progDestroy(prog_t ** __restrict pprg) e_free((*pprg)->prog_used); array_Destroy(&(*pprg)->prog_fds); pthread_mutex_destroy(&(*pprg)->prog_mtx); + signal(SIGPIPE, SIG_DFL); e_free(*pprg); *pprg = NULL; @@ -87,6 +89,7 @@ io_progClose(prog_t * __restrict prg, u_int closeNum) { register int i; int ret = 0; + struct tagPIOPID *p; if (!prg) return 0; @@ -98,8 +101,17 @@ io_progClose(prog_t * __restrict prg, u_int closeNum) pthread_mutex_lock(&prg->prog_mtx); for (i = array_Size(prg->prog_fds) - 1; (closeNum ? ret < closeNum : 42) && i > -1; i--) - if (array_Get(prg->prog_fds, i)) { + if (array_Get(prg->prog_fds, i) && #ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { +#else + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { +#endif + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); +#ifdef POPEN_STREAM e_pclose(array(prg->prog_fds, i, FILE*)); #else e_pclose((int) array(prg->prog_fds, i, intptr_t)); @@ -115,6 +127,52 @@ io_progClose(prog_t * __restrict prg, u_int closeNum) } /* + * io_progCloseAt() - Close program at pool of certain position + * + * @prg = program pool + * @idx = index at pool + * return: 0 error or !=0 closed program + */ +int +io_progCloseAt(prog_t * __restrict prg, u_int idx) +{ + int ret = 0; + struct tagPIOPID *p; + + if (!prg) + return 0; + if (idx > prg->prog_maxn) { + io_SetErr(EINVAL, "Requested number for close program is over pool's limit"); + return 0; + } + + pthread_mutex_lock(&prg->prog_mtx); + if (array_Get(prg->prog_fds, idx) && +#ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, idx, FILE*)))) { +#else + (p = pio_pgetpid((int) array(prg->prog_fds, idx, intptr_t)))) { +#endif + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, idx, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, idx, intptr_t)); +#endif + array_Del(prg->prog_fds, idx, 0); + clrbit(prg->prog_used, idx); + prg->prog_cnum--; + ret++; + } + pthread_mutex_unlock(&prg->prog_mtx); + + return ret; +} + +/* * io_progOpen() - Execute number of program(s) * * @prg = program pool @@ -159,6 +217,7 @@ io_progOpen(prog_t * __restrict prg, u_int execNum) break; } else array_Set(prg->prog_fds, i, f); + clrbit(prg->prog_used, i); prg->prog_cnum++; ret++; } @@ -201,6 +260,7 @@ io_progVacuum(prog_t * __restrict prg, u_int toNum) { register int i; int ret = 0; + struct tagPIOPID *p; if (!prg) return 0; @@ -213,10 +273,20 @@ io_progVacuum(prog_t * __restrict prg, u_int toNum) pthread_mutex_lock(&prg->prog_mtx); for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--) - if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) { + if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i) && #ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); e_pclose(array(prg->prog_fds, i, FILE*)); #else + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); e_pclose((int) array(prg->prog_fds, i, intptr_t)); #endif array_Del(prg->prog_fds, i, 0); @@ -232,10 +302,11 @@ io_progVacuum(prog_t * __restrict prg, u_int toNum) * io_progCheck() - Check exit status of program pool * * @prg = program pool + * @re = resurrect program * return: -1 error or >-1 exited programs */ int -io_progCheck(prog_t * __restrict prg) +io_progCheck(prog_t * __restrict prg, int re) { int ret = 0; struct tagPIOPID *p; @@ -254,10 +325,21 @@ io_progCheck(prog_t * __restrict prg) #endif if (waitpid(p->pid, &p->stat, WNOHANG) > 0) { clrbit(prg->prog_used, i); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, i, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, i, intptr_t)); +#endif + array_Del(prg->prog_fds, i, 0); + prg->prog_cnum--; ret++; } pthread_mutex_unlock(&prg->prog_mtx); + /* resurrect */ + if (re && ret > 0) + io_progOpen(prg, ret); + return ret; } @@ -265,6 +347,7 @@ io_progCheck(prog_t * __restrict prg) * io_progAttach() - Attach to open program * * @prg = program pool + * @newOne = Execute new one program after attach * return: NULL error or !=NULL attached program handle */ #ifdef POPEN_STREAM @@ -272,7 +355,7 @@ FILE * #else int #endif -io_progAttach(prog_t * __restrict prg) +io_progAttach(prog_t * __restrict prg, int newOne) { #ifdef POPEN_STREAM FILE *f = NULL; @@ -300,6 +383,10 @@ io_progAttach(prog_t * __restrict prg) break; } pthread_mutex_unlock(&prg->prog_mtx); + + /* execute new one program */ + if (newOne && f) + io_progOpen(prg, 1); return f; }