#include "global.h"


/*
 * PROG_SLOT_HANDLE() - Read the handle stored at a pool slot
 *
 * Expands to a FILE* when built with POPEN_STREAM, otherwise to an int
 * descriptor. An empty slot reads as NULL / 0.
 */
#ifdef POPEN_STREAM
#define PROG_SLOT_HANDLE(prg, idx)	array((prg)->prog_fds, (idx), FILE*)
#else
#define PROG_SLOT_HANDLE(prg, idx)	((int) array((prg)->prog_fds, (idx), intptr_t))
#endif


/*
 * prog_slotPid() - Look up the process record behind a pool slot
 *
 * @prg = program pool
 * @idx = slot index
 * return: NULL if slot is empty or handle is unknown to pio_pgetpid()
 */
static struct tagPIOPID *
prog_slotPid(prog_t * __restrict prg, int idx)
{
	if (!array_Get(prg->prog_fds, idx))
		return NULL;

	return pio_pgetpid(PROG_SLOT_HANDLE(prg, idx));
}

/*
 * prog_dropSlot() - Close the handle at a slot and mark the slot empty
 *
 * Caller must hold prog_mtx and must know that the slot is populated.
 *
 * @prg = program pool
 * @idx = slot index
 * return: none
 */
static void
prog_dropSlot(prog_t * __restrict prg, int idx)
{
	e_pclose(PROG_SLOT_HANDLE(prg, idx));
	array_Del(prg->prog_fds, idx, 0);
	clrbit(prg->prog_used, idx);
	prg->prog_cnum--;
}

/*
 * prog_closeSlot() - Terminate the program at a slot and release the slot
 *
 * Caller must hold prog_mtx.
 *
 * @prg = program pool
 * @idx = slot index
 * return: 0 nothing to close at this slot or 1 program closed
 */
static int
prog_closeSlot(prog_t * __restrict prg, int idx)
{
	struct tagPIOPID *p;

	p = prog_slotPid(prg, idx);
	if (!p)
		return 0;

	kill(p->pid, SIGTERM);
	usleep(1000);
	/*
	 * waitpid() with WNOHANG returns 0 while the child is still running;
	 * only then escalate to SIGKILL. A positive return means the child is
	 * already reaped and its pid must not be signalled again.
	 */
	if (!waitpid(p->pid, &p->stat, WNOHANG))
		kill(p->pid, SIGKILL);

	prog_dropSlot(prg, idx);
	return 1;
}

/*
 * prog_openSlot() - Start the pool's program and store its handle at a slot
 *
 * Caller must hold prog_mtx and must pass an empty slot.
 *
 * @prg = program pool
 * @idx = slot index
 * return: -1 error (already logged) or 0 program started
 */
static int
prog_openSlot(prog_t * __restrict prg, int idx)
{
#ifdef POPEN_STREAM
	FILE *f;
#else
	int f;
#endif
	int stat = 0;
	pid_t pid, rc;

	f = e_popen(prg->prog_name, "r+", &pid);
#ifdef POPEN_STREAM
	if (!f)
#else
	if (f == -1)
#endif
	{
		LOGERR;
		return -1;
	}

	/* any non-zero answer means the child is not running anymore */
	rc = waitpid(pid, &stat, WNOHANG);
	if (rc) {
		/* stat is meaningful only when waitpid() really reaped the child */
		io_SetErr(ECHILD, "Program with pid=%d exit with status %d", pid,
				(rc > 0 && WIFEXITED(stat)) ? WEXITSTATUS(stat) : -1);
		e_pclose(f);
		return -1;
	}

	array_Set(prg->prog_fds, idx, f);
	clrbit(prg->prog_used, idx);
	prg->prog_cnum++;
	return 0;
}


/*
 * io_progInit() - Init program pool
 *
 * @progName = program name for execution
 * @initNum = initial started programs
 * @maxNum = maximum started programs
 * return: NULL error or !=NULL allocated pool (must be destroyed with io_progDestroy())
 */
prog_t *
io_progInit(const char *progName, u_int initNum, u_int maxNum)
{
	prog_t *prg = NULL;
	size_t usedlen;

	if (!progName || initNum > maxNum)
		return NULL;

	prg = e_malloc(sizeof(prog_t));
	if (!prg) {
		io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
		return NULL;
	}
	memset(prg, 0, sizeof(prog_t));

	prg->prog_inin = initNum;
	prg->prog_maxn = maxNum;
	strlcpy(prg->prog_name, progName, sizeof prg->prog_name);

	usedlen = E_ALIGN(prg->prog_maxn, sizeof *prg->prog_used) / sizeof *prg->prog_used;
	prg->prog_used = e_malloc(usedlen);
	if (!prg->prog_used) {
		io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
		e_free(prg);
		return NULL;
	}
	/* no slot starts out marked as attached */
	memset(prg->prog_used, 0, usedlen);

	prg->prog_fds = array_Init(prg->prog_maxn);
	if (!prg->prog_fds) {
		io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
		e_free(prg->prog_used);
		e_free(prg);
		return NULL;
	}

	pthread_mutex_init(&prg->prog_mtx, NULL);
	/* process-wide: SIGPIPE is ignored until io_progDestroy() */
	signal(SIGPIPE, SIG_IGN);

	if (io_progOpen(prg, prg->prog_inin) < 0) {
		io_progDestroy(&prg);
		prg = NULL;
	}
	return prg;
}

/*
 * io_progDestroy() - Destroy entire program pool
 *
 * Closes every program, frees the pool and sets *pprg to NULL.
 * Restores the default SIGPIPE disposition.
 *
 * @pprg = program pool
 * return: none
 */
void
io_progDestroy(prog_t ** __restrict pprg)
{
	if (!pprg || !*pprg)
		return;

	io_progClose(*pprg, 0);

	e_free((*pprg)->prog_used);
	array_Destroy(&(*pprg)->prog_fds);
	pthread_mutex_destroy(&(*pprg)->prog_mtx);
	signal(SIGPIPE, SIG_DFL);

	e_free(*pprg);
	*pprg = NULL;
}

/*
 * io_progClose() - Close all programs in pool
 *
 * Slots are walked from the highest index down.
 *
 * @prg = program pool
 * @closeNum = close program(s) (0 all)
 * return: 0 error, >0 closed programs
 */
int
io_progClose(prog_t * __restrict prg, u_int closeNum)
{
	int i, ret = 0;

	if (!prg)
		return 0;
	if (closeNum > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
		return 0;
	}

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = array_Size(prg->prog_fds) - 1;
			(!closeNum || (u_int) ret < closeNum) && i > -1; i--)
		ret += prog_closeSlot(prg, i);
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progCloseAt() - Close program at pool of certain position
 *
 * @prg = program pool
 * @idx = index at pool (valid range is 0 .. prog_maxn - 1)
 * return: 0 error or !=0 closed program
 */
int
io_progCloseAt(prog_t * __restrict prg, u_int idx)
{
	int ret;

	if (!prg)
		return 0;
	/* idx == prog_maxn is already one past the last slot */
	if (idx >= prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
		return 0;
	}

	pthread_mutex_lock(&prg->prog_mtx);
	ret = prog_closeSlot(prg, (int) idx);
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progCloseOf() - Close program at pool with certain handle
 *
 * @prg = program pool
 * @h = handle of program
 * return: 0 error, >0 closed programs
 */
int
#ifdef POPEN_STREAM
io_progCloseOf(prog_t * __restrict prg, FILE *h)
#else
io_progCloseOf(prog_t * __restrict prg, int h)
#endif
{
	int i, ret = 0;

	if (!prg)
		return 0;

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++) {
		if (!array_Get(prg->prog_fds, i))
			continue;
		/* stop at the first slot which owns the handle and could be closed */
		if (PROG_SLOT_HANDLE(prg, i) == h && prog_closeSlot(prg, i)) {
			ret++;
			break;
		}
	}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progOpen2() - Start program from pool on first unused slot
 *
 * @prg = program pool
 * return: -1 error, >-1 reside at slot
 */
int
io_progOpen2(prog_t * __restrict prg)
{
	int i, ret = -1;

	if (!prg)
		return -1;
	if (prg->prog_cnum + 1 > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
		return -1;
	}

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++)
		if (!array_Get(prg->prog_fds, i)) {
			/* only the first empty slot is tried, success or not */
			if (!prog_openSlot(prg, i))
				ret = i;
			break;
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progOpen() - Execute number of program(s)
 *
 * Stops at the first program which fails to start; programs started
 * before the failure stay in the pool.
 *
 * @prg = program pool
 * @execNum = execute program(s) (0 max)
 * return: -1 error, >0 executed programs
 */
int
io_progOpen(prog_t * __restrict prg, u_int execNum)
{
	int i, ret = 0;

	if (!prg)
		return -1;
	if (prg->prog_cnum + execNum > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
		return -1;
	}

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; (!execNum || (u_int) ret < execNum) &&
			i < array_Size(prg->prog_fds); i++) {
		if (array_Get(prg->prog_fds, i))
			continue;

		if (prog_openSlot(prg, i)) {
			ret = -1;
			break;
		}
		ret++;
	}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progGrow() - Execute to number of programs in pool
 *
 * @prg = program pool
 * @toNum = execute to number of programs (0 max); raised to the init
 *          number when smaller
 * return: 0 error or nothing to do, >0 executed programs or
 *         -1 when io_progOpen() failed to start a program
 */
int
io_progGrow(prog_t * __restrict prg, u_int toNum)
{
	if (!prg)
		return 0;
	if (toNum > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
		return 0;
	}

	if (!toNum)
		toNum = prg->prog_maxn;
	if (toNum < prg->prog_inin)
		toNum = prg->prog_inin;

	/* pool already runs at least toNum programs */
	if ((int) (toNum - prg->prog_cnum) < 1)
		return 0;

	return io_progOpen(prg, toNum - prg->prog_cnum);
}

/*
 * io_progVacuum() - Vacuum pool to running number of programs
 *
 * Only programs which are not attached are closed, from the highest
 * slot down.
 *
 * @prg = program pool
 * @toNum = vacuum to number of programs (0 to init number)
 * return: 0 error or >0 closed programs
 */
int
io_progVacuum(prog_t * __restrict prg, u_int toNum)
{
	int i, ret = 0;

	if (!prg)
		return 0;
	if (toNum > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
		return 0;
	}
	if (!toNum)
		toNum = prg->prog_inin;

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--)
		if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i) &&
				prog_closeSlot(prg, i))
			ret++;
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progCheck() - Check exit status of program pool
 *
 * Every slot whose program is not running anymore is released.
 *
 * @prg = program pool
 * @re = resurrect program to init number
 * return: -1 error or >-1 exited programs
 */
int
io_progCheck(prog_t * __restrict prg, int re)
{
	int i, ret = 0;
	struct tagPIOPID *p;

	if (!prg)
		return -1;

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++) {
		p = prog_slotPid(prg, i);
		if (!p)
			continue;

		/* non-zero: child was reaped just now or waitpid() failed */
		if (waitpid(p->pid, &p->stat, WNOHANG)) {
			prog_dropSlot(prg, i);
			ret++;
		}
	}
	pthread_mutex_unlock(&prg->prog_mtx);

	/* resurrect to init number */
	if (re && ((int) (prg->prog_inin - prg->prog_cnum) > 0))
		io_progOpen(prg, prg->prog_inin - prg->prog_cnum);

	return ret;
}

/*
 * io_progAttach() - Attach to open program
 *
 * Marks the first running and not attached program as used and returns
 * its handle.
 *
 * @prg = program pool
 * @newOne = Execute new one program after attach; when no free program
 *           was found the new program itself is attached
 * return: NULL (-1 without POPEN_STREAM) error or attached program handle
 */
#ifdef POPEN_STREAM
FILE *
#else
int
#endif
io_progAttach(prog_t * __restrict prg, int newOne)
{
#ifdef POPEN_STREAM
	FILE *f = NULL;
#else
	int f = -1;
#endif
	int i, found = 0;

	if (!prg)
		return f;

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++)
		if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) {
			setbit(prg->prog_used, i);
			f = PROG_SLOT_HANDLE(prg, i);
			found = 1;
			break;
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	/* execute new one program */
	if (newOne) {
		/*
		 * An explicit flag is required here: the "not found" value of
		 * an int handle is -1, which is true in a boolean context.
		 */
		if (found)
			io_progOpen(prg, 1);
		else if ((i = io_progOpen2(prg)) > -1) {
			/* not found free program; claim the fresh one under the lock */
			pthread_mutex_lock(&prg->prog_mtx);
			if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) {
				setbit(prg->prog_used, i);
				f = PROG_SLOT_HANDLE(prg, i);
			}
			pthread_mutex_unlock(&prg->prog_mtx);
		}
	}

	return f;
}

/*
 * io_progDetach() - Detach from open program
 *
 * Clears the used mark of the slot which holds the handle; the program
 * keeps running.
 *
 * @prg = program pool
 * @pfd = attached program handle
 * return: none
 */
void
#ifdef POPEN_STREAM
io_progDetach(prog_t * __restrict prg, FILE *pfd)
#else
io_progDetach(prog_t * __restrict prg, int pfd)
#endif
{
	int i;

	if (!prg || !pfd)
		return;

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++)
		if (PROG_SLOT_HANDLE(prg, i) == pfd) {
			clrbit(prg->prog_used, i);
			break;
		}
	pthread_mutex_unlock(&prg->prog_mtx);
}