--- libaitio/src/exec.c 2013/12/05 12:43:04 1.1 +++ libaitio/src/exec.c 2013/12/15 22:57:20 1.2 @@ -0,0 +1,551 @@ +#include "global.h" + + +/* + * io_progInit() - Init program pool + * + * @progName = program name for execution + * @initNum = initial started programs + * @maxNum = maximum started programs + * return: NULL error or !=NULL allocated pool (must destroied with io_progDestroy()) + */ +prog_t * +io_progInit(const char *progName, u_int initNum, u_int maxNum) +{ + prog_t *prg = NULL; + + if (initNum > maxNum) + return NULL; + + prg = e_malloc(sizeof(prog_t)); + if (!prg) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + return NULL; + } else + memset(prg, 0, sizeof(prog_t)); + + prg->prog_inin = initNum; + prg->prog_maxn = maxNum; + strlcpy(prg->prog_name, progName, sizeof prg->prog_name); + + prg->prog_used = e_malloc(E_ALIGN(prg->prog_maxn, sizeof *prg->prog_used) / + sizeof *prg->prog_used); + if (!prg->prog_used) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + e_free(prg); + return NULL; + } + + prg->prog_fds = array_Init(prg->prog_maxn); + if (!prg->prog_fds) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + e_free(prg->prog_used); + e_free(prg); + return NULL; + } + + pthread_mutex_init(&prg->prog_mtx, NULL); + signal(SIGPIPE, SIG_IGN); + + if (io_progOpen(prg, prg->prog_inin) < 0) { + io_progDestroy(&prg); + prg = NULL; + } + return prg; +} + +/* + * io_progDestroy() - Destroy entire program pool + * + * @pprg = program pool + * return: none + */ +void +io_progDestroy(prog_t ** __restrict pprg) +{ + if (!pprg || !*pprg) + return; + + io_progClose(*pprg, 0); + + e_free((*pprg)->prog_used); + array_Destroy(&(*pprg)->prog_fds); + pthread_mutex_destroy(&(*pprg)->prog_mtx); + signal(SIGPIPE, SIG_DFL); + + e_free(*pprg); + *pprg = NULL; +} + +/* + * io_progClose() - Close all programs in pool + * + * @prg = program pool + * @closeNum = close program(s) (0 all) + * return: 0 error, >0 closed programs + */ +int +io_progClose(prog_t * __restrict prg, u_int closeNum) +{ + register int i; + int ret = 0; + struct tagPIOPID *p; + + if (!prg) + return 0; + if (closeNum > prg->prog_maxn) { + io_SetErr(EINVAL, "Requested number for close program is over pool's limit"); + return 0; + } + + pthread_mutex_lock(&prg->prog_mtx); + for (i = array_Size(prg->prog_fds) - 1; + (closeNum ? ret < closeNum : 42) && i > -1; i--) + if (array_Get(prg->prog_fds, i) && +#ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { +#else + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { +#endif + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, i, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, i, intptr_t)); +#endif + array_Del(prg->prog_fds, i, 0); + clrbit(prg->prog_used, i); + prg->prog_cnum--; + ret++; + } + pthread_mutex_unlock(&prg->prog_mtx); + + return ret; +} + +/* + * io_progCloseAt() - Close program at pool of certain position + * + * @prg = program pool + * @idx = index at pool + * return: 0 error or !=0 closed program + */ +int +io_progCloseAt(prog_t * __restrict prg, u_int idx) +{ + int ret = 0; + struct tagPIOPID *p; + + if (!prg) + return 0; + if (idx > prg->prog_maxn) { + io_SetErr(EINVAL, "Requested number for close program is over pool's limit"); + return 0; + } + + pthread_mutex_lock(&prg->prog_mtx); + if (array_Get(prg->prog_fds, idx) && +#ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, idx, FILE*)))) { +#else + (p = pio_pgetpid((int) array(prg->prog_fds, idx, intptr_t)))) { +#endif + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, idx, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, idx, intptr_t)); +#endif + array_Del(prg->prog_fds, idx, 0); + clrbit(prg->prog_used, idx); + prg->prog_cnum--; + ret++; + } + pthread_mutex_unlock(&prg->prog_mtx); + + return ret; +} + +/* + * io_progCloseOf() - Close program at pool with certain handle + * + * @prg = program pool + * @h = handle of program + * return: 0 error, >0 closed programs + */ +int +#ifdef POPEN_STREAM +io_progCloseOf(prog_t * __restrict prg, FILE *h) +#else +io_progCloseOf(prog_t * __restrict prg, int h) +#endif +{ + register int i; + int ret = 0; + struct tagPIOPID *p; +#ifdef POPEN_STREAM + FILE *f; +#else + int f; +#endif + + if (!prg) + return 0; + + pthread_mutex_lock(&prg->prog_mtx); + for (i = 0; i < array_Size(prg->prog_fds); i++) + if (array_Get(prg->prog_fds, i)) { +#ifdef POPEN_STREAM + f = array(prg->prog_fds, i, FILE*); + if (f == h && (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { +#else + f = (int) array(prg->prog_fds, i, intptr_t); + if (f == h && (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { +#endif + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, i, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, i, intptr_t)); +#endif + array_Del(prg->prog_fds, i, 0); + clrbit(prg->prog_used, i); + prg->prog_cnum--; + ret++; + break; + } + } + pthread_mutex_unlock(&prg->prog_mtx); + + return ret; +} + +/* + * io_progOpen2() - Start program from pool on first unused slot + * + * @prg = program pool + * return: -1 error, >-1 reside at slot + */ +int +io_progOpen2(prog_t * __restrict prg) +{ +#ifdef POPEN_STREAM + FILE *f = NULL; +#else + int f = -1; +#endif + int stat, ret = -1; + register int i; + pid_t pid; + + if (!prg) + return -1; + if (prg->prog_cnum + 1 > prg->prog_maxn) { + io_SetErr(EINVAL, "Requested number for program execution is over pool's limit"); + return -1; + } + + pthread_mutex_lock(&prg->prog_mtx); + for (i = 0; i < array_Size(prg->prog_fds); i++) + if (!array_Get(prg->prog_fds, i)) { + f = e_popen(prg->prog_name, "r+", &pid); +#ifdef POPEN_STREAM + if (!f) { +#else + if (f == -1) { +#endif + LOGERR; + break; + } else if (waitpid(pid, &stat, WNOHANG)) { + io_SetErr(ECHILD, "Program with pid=%d exit with status %d", + pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1); + e_pclose(f); + break; + } else + array_Set(prg->prog_fds, i, f); + clrbit(prg->prog_used, i); + prg->prog_cnum++; + ret = i; + break; + } + pthread_mutex_unlock(&prg->prog_mtx); + + return ret; +} + +/* + * io_progOpen() - Execute number of program(s) + * + * @prg = program pool + * @execNum = execute program(s) (0 max) + * return: -1 error, >0 executed programs + */ +int +io_progOpen(prog_t * __restrict prg, u_int execNum) +{ +#ifdef POPEN_STREAM + FILE *f; +#else + int f; +#endif + int stat, ret = 0; + register int i; + pid_t pid; + + if (!prg) + return -1; + if (prg->prog_cnum + execNum > prg->prog_maxn) { + io_SetErr(EINVAL, "Requested number for program execution is over pool's limit"); + return -1; + } + + pthread_mutex_lock(&prg->prog_mtx); + for (i = 0; (execNum ? ret < execNum : 42) && i < array_Size(prg->prog_fds); i++) + if (!array_Get(prg->prog_fds, i)) { + f = e_popen(prg->prog_name, "r+", &pid); +#ifdef POPEN_STREAM + if (!f) { +#else + if (f == -1) { +#endif + LOGERR; + ret = -1; + break; + } else if (waitpid(pid, &stat, WNOHANG)) { + io_SetErr(ECHILD, "Program with pid=%d exit with status %d", + pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1); + e_pclose(f); + ret = -1; + break; + } else + array_Set(prg->prog_fds, i, f); + clrbit(prg->prog_used, i); + prg->prog_cnum++; + ret++; + } + pthread_mutex_unlock(&prg->prog_mtx); + + return ret; +} + +/* + * io_progGrow() - Execute to number of programs in pool + * + * @prg = program pool + * @toNum = execute to number of programs (0 max) + * return: 0 error or nothing to do, + * >0 executed programs and abs(<0) executed programs with logged error + */ +int +io_progGrow(prog_t * __restrict prg, u_int toNum) +{ + if (!prg) + return 0; + if (toNum > prg->prog_maxn) { + io_SetErr(EINVAL, "Requested number for program execution is over pool's limit"); + return 0; + } + if (!toNum) + toNum = prg->prog_maxn; + if (toNum < prg->prog_inin) + toNum = prg->prog_inin; + + if ((int) (toNum - prg->prog_cnum) < 1) + return 0; + + return io_progOpen(prg, toNum - prg->prog_cnum); +} + +/* + * io_progVacuum() - Vacuum pool to running number of programs + * + * @prg = program pool + * @toNum = vacuum to number of programs (0 to init number) + * return: 0 error or >0 closed programs + */ +int +io_progVacuum(prog_t * __restrict prg, u_int toNum) +{ + register int i; + int ret = 0; + struct tagPIOPID *p; + + if (!prg) + return 0; + if (toNum > prg->prog_maxn) { + io_SetErr(EINVAL, "Requested number for close program is over pool's limit"); + return 0; + } + if (!toNum) + toNum = prg->prog_inin; + + pthread_mutex_lock(&prg->prog_mtx); + for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--) + if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i) && +#ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); + e_pclose(array(prg->prog_fds, i, FILE*)); +#else + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); + e_pclose((int) array(prg->prog_fds, i, intptr_t)); +#endif + array_Del(prg->prog_fds, i, 0); + prg->prog_cnum--; + ret++; + } + pthread_mutex_unlock(&prg->prog_mtx); + + return ret; +} + +/* + * io_progCheck() - Check exit status of program pool + * + * @prg = program pool + * @re = resurrect program to init number + * return: -1 error or >-1 exited programs + */ +int +io_progCheck(prog_t * __restrict prg, int re) +{ + int ret = 0; + struct tagPIOPID *p; + register int i; + + if (!prg) + return -1; + + pthread_mutex_lock(&prg->prog_mtx); + for (i = 0; i < array_Size(prg->prog_fds); i++) + if (array_Get(prg->prog_fds, i) && +#ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { +#else + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { +#endif + if (waitpid(p->pid, &p->stat, WNOHANG)) { + clrbit(prg->prog_used, i); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, i, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, i, intptr_t)); +#endif + array_Del(prg->prog_fds, i, 0); + prg->prog_cnum--; + ret++; + } + } + pthread_mutex_unlock(&prg->prog_mtx); + + /* resurrect to init number */ + if (re && ((int) (prg->prog_inin - prg->prog_cnum) > 0)) + io_progOpen(prg, prg->prog_inin - prg->prog_cnum); + + return ret; +} + +/* + * io_progAttach() - Attach to open program + * + * @prg = program pool + * @newOne = Execute new one program after attach + * return: NULL error or !=NULL attached program handle + */ +#ifdef POPEN_STREAM +FILE * +#else +int +#endif +io_progAttach(prog_t * __restrict prg, int newOne) +{ +#ifdef POPEN_STREAM + FILE *f = NULL; +#else + int f = -1; +#endif + register int i; + + if (!prg) +#ifdef POPEN_STREAM + return NULL; +#else + return -1; +#endif + + pthread_mutex_lock(&prg->prog_mtx); + for (i = 0; i < array_Size(prg->prog_fds); i++) + if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) { + setbit(prg->prog_used, i); +#ifdef POPEN_STREAM + f = array(prg->prog_fds, i, FILE*); +#else + f = array(prg->prog_fds, i, intptr_t); +#endif + break; + } + pthread_mutex_unlock(&prg->prog_mtx); + + /* execute new one program */ + if (newOne) { + if (f) + io_progOpen(prg, 1); + else if ((i = io_progOpen2(prg)) > -1) + /* not found free program */ +#ifdef POPEN_STREAM + f = array(prg->prog_fds, i, FILE*); +#else + f = array(prg->prog_fds, i, intptr_t); +#endif + } + + return f; +} + +/* + * io_progDetach() - Detch from open program + * + * @prg= program pool + * @pfd = attached program handle + * return: none + */ +void +#ifdef POPEN_STREAM +io_progDetach(prog_t * __restrict prg, FILE *pfd) +#else +io_progDetach(prog_t * __restrict prg, int pfd) +#endif +{ + register int i; + + if (!prg || !pfd) + return; + + pthread_mutex_lock(&prg->prog_mtx); + for (i = 0; i < array_Size(prg->prog_fds); i++) +#ifdef POPEN_STREAM + if (array(prg->prog_fds, i, FILE*) == pfd) { +#else + if (array(prg->prog_fds, i, intptr_t) == pfd) { +#endif + clrbit(prg->prog_used, i); + break; + } + pthread_mutex_unlock(&prg->prog_mtx); +}