--- libaitio/src/exec.c 2013/12/08 20:43:22 1.1.2.14 +++ libaitio/src/exec.c 2013/12/15 22:31:44 1.1.2.22 @@ -173,11 +173,121 @@ io_progCloseAt(prog_t * __restrict prg, u_int idx) } /* + * io_progCloseOf() - Close program at pool with certain handle + * + * @prg = program pool + * @h = handle of program + * return: 0 error, >0 closed programs + */ +int +#ifdef POPEN_STREAM +io_progCloseOf(prog_t * __restrict prg, FILE *h) +#else +io_progCloseOf(prog_t * __restrict prg, int h) +#endif +{ + register int i; + int ret = 0; + struct tagPIOPID *p; +#ifdef POPEN_STREAM + FILE *f; +#else + int f; +#endif + + if (!prg) + return 0; + + pthread_mutex_lock(&prg->prog_mtx); + for (i = 0; i < array_Size(prg->prog_fds); i++) + if (array_Get(prg->prog_fds, i)) { +#ifdef POPEN_STREAM + f = array(prg->prog_fds, i, FILE*); + if (f == h && (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { +#else + f = (int) array(prg->prog_fds, i, intptr_t); + if (f == h && (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { +#endif + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, i, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, i, intptr_t)); +#endif + array_Del(prg->prog_fds, i, 0); + clrbit(prg->prog_used, i); + prg->prog_cnum--; + ret++; + break; + } + } + pthread_mutex_unlock(&prg->prog_mtx); + + return ret; +} + +/* + * io_progOpen2() - Start program from pool on first unused slot + * + * @prg = program pool + * return: -1 error, >-1 reside at slot + */ +int +io_progOpen2(prog_t * __restrict prg) +{ +#ifdef POPEN_STREAM + FILE *f = NULL; +#else + int f = -1; +#endif + int stat, ret = -1; + register int i; + pid_t pid; + + if (!prg) + return -1; + if (prg->prog_cnum + 1 > prg->prog_maxn) { + io_SetErr(EINVAL, "Requested number for program execution is over pool's limit"); + return -1; + } + + pthread_mutex_lock(&prg->prog_mtx); + for (i = 0; i < array_Size(prg->prog_fds); i++) + if (!array_Get(prg->prog_fds, i)) { + f = e_popen(prg->prog_name, "r+", &pid); +#ifdef POPEN_STREAM + if (!f) { +#else + if (f == -1) { +#endif + LOGERR; + break; + } else if (waitpid(pid, &stat, WNOHANG)) { + io_SetErr(ECHILD, "Program with pid=%d exit with status %d", + pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1); + e_pclose(f); + break; + } else + array_Set(prg->prog_fds, i, f); + clrbit(prg->prog_used, i); + prg->prog_cnum++; + ret = i; + break; + } + pthread_mutex_unlock(&prg->prog_mtx); + + return ret; +} + +/* * io_progOpen() - Execute number of program(s) * * @prg = program pool * @execNum = execute program(s) (0 max) - * return: 0 error, >0 executed programs and abs(<0) executed programs with logged error + * return: -1 error, >0 executed programs */ int io_progOpen(prog_t * __restrict prg, u_int execNum) @@ -192,10 +302,10 @@ io_progOpen(prog_t * __restrict prg, u_int execNum) pid_t pid; if (!prg) - return 0; + return -1; if (prg->prog_cnum + execNum > prg->prog_maxn) { io_SetErr(EINVAL, "Requested number for program execution is over pool's limit"); - return 0; + return -1; } pthread_mutex_lock(&prg->prog_mtx); @@ -213,10 +323,12 @@ io_progOpen(prog_t * __restrict prg, u_int execNum) } else if (waitpid(pid, &stat, WNOHANG)) { io_SetErr(ECHILD, "Program with pid=%d exit with status %d", pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1); + e_pclose(f); ret = -1; break; } else array_Set(prg->prog_fds, i, f); + clrbit(prg->prog_used, i); prg->prog_cnum++; ret++; } @@ -230,7 +342,8 @@ io_progOpen(prog_t * __restrict prg, u_int execNum) * * @prg = program pool * @toNum = execute to number of programs (0 max) - * return: 0 error, >0 executed programs and abs(<0) executed programs with logged error + * return: 0 error or nothing to do, + * >0 executed programs and abs(<0) executed programs with logged error */ int io_progGrow(prog_t * __restrict prg, u_int toNum) @@ -243,7 +356,12 @@ io_progGrow(prog_t * __restrict prg, u_int toNum) } if (!toNum) toNum = prg->prog_maxn; + if (toNum < prg->prog_inin) + toNum = prg->prog_inin; + if ((int) (toNum - prg->prog_cnum) < 1) + return 0; + return io_progOpen(prg, toNum - prg->prog_cnum); } @@ -301,10 +419,11 @@ io_progVacuum(prog_t * __restrict prg, u_int toNum) * io_progCheck() - Check exit status of program pool * * @prg = program pool + * @re = resurrect program to init number * return: -1 error or >-1 exited programs */ int -io_progCheck(prog_t * __restrict prg) +io_progCheck(prog_t * __restrict prg, int re) { int ret = 0; struct tagPIOPID *p; @@ -317,16 +436,28 @@ io_progCheck(prog_t * __restrict prg) for (i = 0; i < array_Size(prg->prog_fds); i++) if (array_Get(prg->prog_fds, i) && #ifdef POPEN_STREAM - (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) + (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { #else - (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { #endif - if (waitpid(p->pid, &p->stat, WNOHANG) > 0) { + if (waitpid(p->pid, &p->stat, WNOHANG)) { clrbit(prg->prog_used, i); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, i, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, i, intptr_t)); +#endif + array_Del(prg->prog_fds, i, 0); + prg->prog_cnum--; ret++; } + } pthread_mutex_unlock(&prg->prog_mtx); + /* resurrect to init number */ + if (re && ((int) (prg->prog_inin - prg->prog_cnum) > 0)) + io_progOpen(prg, prg->prog_inin - prg->prog_cnum); + return ret; } @@ -334,6 +465,7 @@ io_progCheck(prog_t * __restrict prg) * io_progAttach() - Attach to open program * * @prg = program pool + * @newOne = Execute new one program after attach * return: NULL error or !=NULL attached program handle */ #ifdef POPEN_STREAM @@ -341,7 +473,7 @@ FILE * #else int #endif -io_progAttach(prog_t * __restrict prg) +io_progAttach(prog_t * __restrict prg, int newOne) { #ifdef POPEN_STREAM FILE *f = NULL; @@ -369,6 +501,20 @@ io_progAttach(prog_t * __restrict prg) break; } pthread_mutex_unlock(&prg->prog_mtx); + printf("+ i=%d f=%d\n", i, f); + + /* execute new one program */ + if (newOne) { + if (f) + io_progOpen(prg, 1); + else if ((i = io_progOpen2(prg)) > -1) + /* not found free program */ +#ifdef POPEN_STREAM + f = array(prg->prog_fds, i, FILE*); +#else + f = array(prg->prog_fds, i, intptr_t); +#endif + } return f; }