--- libaitio/src/exec.c 2013/12/05 14:16:33 1.1.2.5 +++ libaitio/src/exec.c 2013/12/12 23:36:31 1.1.2.21 @@ -1,21 +1,6 @@ #include "global.h" -extern char **environ; -extern int __isthreaded; - -struct pid { - SLIST_ENTRY(pid) next; - FILE *fp; - pid_t pid; -}; -static SLIST_HEAD(, pid) pidlist = SLIST_HEAD_INITIALIZER(pidlist); -static pthread_mutex_t pidlist_mutex = PTHREAD_MUTEX_INITIALIZER; - -#define THREAD_LOCK() if (__isthreaded) pthread_mutex_lock(&pidlist_mutex) -#define THREAD_UNLOCK() if (__isthreaded) pthread_mutex_unlock(&pidlist_mutex) - - /* * io_progInit() - Init program pool * @@ -43,17 +28,29 @@ io_progInit(const char *progName, u_int initNum, u_int prg->prog_maxn = maxNum; strlcpy(prg->prog_name, progName, sizeof prg->prog_name); + prg->prog_used = e_malloc(E_ALIGN(prg->prog_maxn, sizeof *prg->prog_used) / + sizeof *prg->prog_used); + if (!prg->prog_used) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + e_free(prg); + return NULL; + } + prg->prog_fds = array_Init(prg->prog_maxn); if (!prg->prog_fds) { io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + e_free(prg->prog_used); e_free(prg); return NULL; } pthread_mutex_init(&prg->prog_mtx, NULL); + signal(SIGPIPE, SIG_IGN); - if (io_progOpen(prg, prg->prog_inin) < 0) + if (io_progOpen(prg, prg->prog_inin) < 0) { io_progDestroy(&prg); + prg = NULL; + } return prg; } @@ -71,8 +68,10 @@ io_progDestroy(prog_t ** __restrict pprg) io_progClose(*pprg, 0); + e_free((*pprg)->prog_used); array_Destroy(&(*pprg)->prog_fds); pthread_mutex_destroy(&(*pprg)->prog_mtx); + signal(SIGPIPE, SIG_DFL); e_free(*pprg); *pprg = NULL; @@ -90,6 +89,7 @@ io_progClose(prog_t * __restrict prg, u_int closeNum) { register int i; int ret = 0; + struct tagPIOPID *p; if (!prg) return 0; @@ -101,9 +101,23 @@ io_progClose(prog_t * __restrict prg, u_int closeNum) pthread_mutex_lock(&prg->prog_mtx); for (i = array_Size(prg->prog_fds) - 1; (closeNum ? ret < closeNum : 42) && i > -1; i--) - if (array_Get(prg->prog_fds, i)) { - io_pclose(array(prg->prog_fds, i, FILE*)); + if (array_Get(prg->prog_fds, i) && +#ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { +#else + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { +#endif + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, i, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, i, intptr_t)); +#endif array_Del(prg->prog_fds, i, 0); + clrbit(prg->prog_used, i); prg->prog_cnum--; ret++; } @@ -113,42 +127,151 @@ io_progClose(prog_t * __restrict prg, u_int closeNum) } /* + * io_progCloseAt() - Close program at pool of certain position + * + * @prg = program pool + * @idx = index at pool + * return: 0 error or !=0 closed program + */ +int +io_progCloseAt(prog_t * __restrict prg, u_int idx) +{ + int ret = 0; + struct tagPIOPID *p; + + if (!prg) + return 0; + if (idx > prg->prog_maxn) { + io_SetErr(EINVAL, "Requested number for close program is over pool's limit"); + return 0; + } + + pthread_mutex_lock(&prg->prog_mtx); + if (array_Get(prg->prog_fds, idx) && +#ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, idx, FILE*)))) { +#else + (p = pio_pgetpid((int) array(prg->prog_fds, idx, intptr_t)))) { +#endif + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, idx, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, idx, intptr_t)); +#endif + array_Del(prg->prog_fds, idx, 0); + clrbit(prg->prog_used, idx); + prg->prog_cnum--; + ret++; + } + pthread_mutex_unlock(&prg->prog_mtx); + + return ret; +} + +/* + * io_progOpen2() - Start program from pool on first unused slot + * + * @prg = program pool + * return: -1 error, >-1 reside at slot + */ +int +io_progOpen2(prog_t * __restrict prg) +{ +#ifdef POPEN_STREAM + FILE *f = NULL; +#else + int f = -1; +#endif + int stat, ret = -1; + register int i; + pid_t pid; + + if (!prg) + return -1; + if (prg->prog_cnum + 1 > prg->prog_maxn) { + io_SetErr(EINVAL, "Requested number for program execution is over pool's limit"); + return -1; + } + + pthread_mutex_lock(&prg->prog_mtx); + for (i = 0; i < array_Size(prg->prog_fds); i++) + if (!array_Get(prg->prog_fds, i)) { + f = e_popen(prg->prog_name, "r+", &pid); +#ifdef POPEN_STREAM + if (!f) { +#else + if (f == -1) { +#endif + LOGERR; + break; + } else if (waitpid(pid, &stat, WNOHANG)) { + io_SetErr(ECHILD, "Program with pid=%d exit with status %d", + pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1); + e_pclose(f); + break; + } else + array_Set(prg->prog_fds, i, f); + clrbit(prg->prog_used, i); + prg->prog_cnum++; + ret = i; + break; + } + pthread_mutex_unlock(&prg->prog_mtx); + + return ret; +} + +/* * io_progOpen() - Execute number of program(s) * * @prg = program pool * @execNum = execute program(s) (0 max) - * return: 0 error, >0 executed programs and abs(<0) executed programs with logged error + * return: -1 error, >0 executed programs */ int io_progOpen(prog_t * __restrict prg, u_int execNum) { +#ifdef POPEN_STREAM FILE *f; +#else + int f; +#endif int stat, ret = 0; register int i; pid_t pid; if (!prg) - return 0; - if (execNum > prg->prog_maxn) { + return -1; + if (prg->prog_cnum + execNum > prg->prog_maxn) { io_SetErr(EINVAL, "Requested number for program execution is over pool's limit"); - return 0; + return -1; } pthread_mutex_lock(&prg->prog_mtx); for (i = 0; (execNum ? ret < execNum : 42) && i < array_Size(prg->prog_fds); i++) if (!array_Get(prg->prog_fds, i)) { - f = io_popen(prg->prog_name, "r+", &pid); + f = e_popen(prg->prog_name, "r+", &pid); +#ifdef POPEN_STREAM if (!f) { +#else + if (f == -1) { +#endif LOGERR; ret = -1; break; - } else if (waitpid(pid, &stat, WNOHANG) > 0) { - io_SetErr(ECHILD, "Program exit with status %d", - WIFEXITED(stat) ? WEXITSTATUS(stat) : -1); + } else if (waitpid(pid, &stat, WNOHANG)) { + io_SetErr(ECHILD, "Program with pid=%d exit with status %d", + pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1); + e_pclose(f); ret = -1; break; } else array_Set(prg->prog_fds, i, f); + clrbit(prg->prog_used, i); prg->prog_cnum++; ret++; } @@ -158,6 +281,34 @@ io_progOpen(prog_t * __restrict prg, u_int execNum) } /* + * io_progGrow() - Execute to number of programs in pool + * + * @prg = program pool + * @toNum = execute to number of programs (0 max) + * return: 0 error or nothing to do, + * >0 executed programs and abs(<0) executed programs with logged error + */ +int +io_progGrow(prog_t * __restrict prg, u_int toNum) +{ + if (!prg) + return 0; + if (toNum > prg->prog_maxn) { + io_SetErr(EINVAL, "Requested number for program execution is over pool's limit"); + return 0; + } + if (!toNum) + toNum = prg->prog_maxn; + if (toNum < prg->prog_inin) + toNum = prg->prog_inin; + + if ((int) (toNum - prg->prog_cnum) < 1) + return 0; + + return io_progOpen(prg, toNum - prg->prog_cnum); +} + +/* * io_progVacuum() - Vacuum pool to running number of programs * * @prg = program pool @@ -169,6 +320,7 @@ io_progVacuum(prog_t * __restrict prg, u_int toNum) { register int i; int ret = 0; + struct tagPIOPID *p; if (!prg) return 0; @@ -181,8 +333,22 @@ io_progVacuum(prog_t * __restrict prg, u_int toNum) pthread_mutex_lock(&prg->prog_mtx); for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--) - if (array_Get(prg->prog_fds, i)) { - io_pclose(array(prg->prog_fds, i, FILE*)); + if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i) && +#ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) { + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); + e_pclose(array(prg->prog_fds, i, FILE*)); +#else + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) { + kill(p->pid, SIGTERM); + usleep(1000); + if (waitpid(p->pid, &p->stat, WNOHANG) > 0) + kill(p->pid, SIGKILL); + e_pclose((int) array(prg->prog_fds, i, intptr_t)); +#endif array_Del(prg->prog_fds, i, 0); prg->prog_cnum--; ret++; @@ -192,164 +358,136 @@ io_progVacuum(prog_t * __restrict prg, u_int toNum) return ret; } - /* - * io_popen() - ELWIX replacement of standard popen + * io_progCheck() - Check exit status of program pool * - * @command = command - * @type = type - * @ppid = return pid of child program - * return: NULL error or !=NULL open program + * @prg = program pool + * @re = resurrect program to init number + * return: -1 error or >-1 exited programs */ -FILE * -io_popen(const char *command, const char *type, pid_t *ppid) +int +io_progCheck(prog_t * __restrict prg, int re) { - struct pid *cur; - FILE *iop; - int pdes[2], pid, twoway, cloexec; - char *argv[4]; - struct pid *p; + int ret = 0; + struct tagPIOPID *p; + register int i; - cloexec = strchr(type, 'e') != NULL; - /* - * Lite2 introduced two-way popen() pipes using _socketpair(). - * FreeBSD's pipe() is bidirectional, so we use that. - */ - if (strchr(type, '+')) { - twoway = 1; - type = "r+"; - } else { - twoway = 0; - if ((*type != 'r' && *type != 'w') || - (type[1] && (type[1] != 'e' || type[2]))) - return (NULL); - } - if ((cloexec ? pipe2(pdes, O_CLOEXEC) : pipe(pdes)) < 0) - return (NULL); + if (!prg) + return -1; - if ((cur = e_malloc(sizeof(struct pid))) == NULL) { - close(pdes[0]); - close(pdes[1]); - return (NULL); - } + pthread_mutex_lock(&prg->prog_mtx); + for (i = 0; i < array_Size(prg->prog_fds); i++) + if (array_Get(prg->prog_fds, i) && +#ifdef POPEN_STREAM + (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) +#else + (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) +#endif + if (waitpid(p->pid, &p->stat, WNOHANG)) { + clrbit(prg->prog_used, i); +#ifdef POPEN_STREAM + e_pclose(array(prg->prog_fds, i, FILE*)); +#else + e_pclose((int) array(prg->prog_fds, i, intptr_t)); +#endif + array_Del(prg->prog_fds, i, 0); + prg->prog_cnum--; + ret++; + } + pthread_mutex_unlock(&prg->prog_mtx); - argv[0] = "sh"; - argv[1] = "-c"; - argv[2] = (char *)command; - argv[3] = NULL; + /* resurrect to init number */ + if (re && ((int) (prg->prog_inin - prg->prog_cnum) > 0)) + io_progOpen(prg, prg->prog_inin - prg->prog_cnum); - THREAD_LOCK(); - switch (pid = vfork()) { - case -1: /* Error. */ - THREAD_UNLOCK(); - close(pdes[0]); - close(pdes[1]); - e_free(cur); - return (NULL); - /* NOTREACHED */ - case 0: /* Child. */ - if (*type == 'r') { - /* - * The _dup2() to STDIN_FILENO is repeated to avoid - * writing to pdes[1], which might corrupt the - * parent's copy. This isn't good enough in - * general, since the _exit() is no return, so - * the compiler is free to corrupt all the local - * variables. - */ - if (!cloexec) - close(pdes[0]); - if (pdes[1] != STDOUT_FILENO) { - dup2(pdes[1], STDOUT_FILENO); - if (!cloexec) - close(pdes[1]); - if (twoway) - dup2(STDOUT_FILENO, STDIN_FILENO); - } else if (twoway && (pdes[1] != STDIN_FILENO)) { - dup2(pdes[1], STDIN_FILENO); - if (cloexec) - fcntl(pdes[1], F_SETFD, 0); - } else if (cloexec) - fcntl(pdes[1], F_SETFD, 0); - } else { - if (pdes[0] != STDIN_FILENO) { - dup2(pdes[0], STDIN_FILENO); - if (!cloexec) - close(pdes[0]); - } else if (cloexec) - fcntl(pdes[0], F_SETFD, 0); - if (!cloexec) - close(pdes[1]); - } - SLIST_FOREACH(p, &pidlist, next) - close(fileno(p->fp)); - execve(_PATH_BSHELL, argv, environ); - _exit(127); - /* NOTREACHED */ - default: - if (ppid) - *ppid = pid; - } - THREAD_UNLOCK(); - - /* Parent; assume fdopen can't fail. */ - if (*type == 'r') { - iop = fdopen(pdes[0], type); - close(pdes[1]); - } else { - iop = fdopen(pdes[1], type); - close(pdes[0]); - } - - /* Link into list of file descriptors. */ - cur->fp = iop; - cur->pid = pid; - THREAD_LOCK(); - SLIST_INSERT_HEAD(&pidlist, cur, next); - THREAD_UNLOCK(); - - return (iop); + return ret; } /* - * io_pclose() - ELWIX replacement of standard pclose + * io_progAttach() - Attach to open program * - * @iop = popen handle - * return: -1 error or !=-1 pid status + * @prg = program pool + * @newOne = Execute new one program after attach + * return: NULL error or !=NULL attached program handle */ +#ifdef POPEN_STREAM +FILE * +#else int -io_pclose(FILE *iop) +#endif +io_progAttach(prog_t * __restrict prg, int newOne) { - struct pid *cur, *last = NULL; - int pstat; - pid_t pid; +#ifdef POPEN_STREAM + FILE *f = NULL; +#else + int f = -1; +#endif + register int i; - /* - * Find the appropriate file pointer and remove it from the list. - */ - THREAD_LOCK(); - SLIST_FOREACH(cur, &pidlist, next) { - if (cur->fp == iop) + if (!prg) +#ifdef POPEN_STREAM + return NULL; +#else + return -1; +#endif + + pthread_mutex_lock(&prg->prog_mtx); + for (i = 0; i < array_Size(prg->prog_fds); i++) + if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) { + setbit(prg->prog_used, i); +#ifdef POPEN_STREAM + f = array(prg->prog_fds, i, FILE*); +#else + f = array(prg->prog_fds, i, intptr_t); +#endif break; - last = cur; + } + pthread_mutex_unlock(&prg->prog_mtx); + + /* execute new one program */ + if (newOne) { + if (f) + io_progOpen(prg, 1); + else if ((i = io_progOpen2(prg)) > -1) + /* not found free program */ +#ifdef POPEN_STREAM + f = array(prg->prog_fds, i, FILE*); +#else + f = array(prg->prog_fds, i, intptr_t); +#endif } - if (cur == NULL) { - THREAD_UNLOCK(); - return (-1); - } - if (last == NULL) - SLIST_REMOVE_HEAD(&pidlist, next); - else - SLIST_REMOVE_AFTER(last, next); - THREAD_UNLOCK(); - fclose(iop); + return f; +} - do { - pid = wait4(cur->pid, &pstat, 0, (struct rusage *)0); - } while (pid == -1 && errno == EINTR); +/* + * io_progDetach() - Detch from open program + * + * @prg= program pool + * @pfd = attached program handle + * return: none + */ +void +#ifdef POPEN_STREAM +io_progDetach(prog_t * __restrict prg, FILE *pfd) +#else +io_progDetach(prog_t * __restrict prg, int pfd) +#endif +{ + register int i; - e_free(cur); + if (!prg || !pfd) + return; - return (pid == -1 ? -1 : pstat); + pthread_mutex_lock(&prg->prog_mtx); + for (i = 0; i < array_Size(prg->prog_fds); i++) +#ifdef POPEN_STREAM + if (array(prg->prog_fds, i, FILE*) == pfd) { +#else + if (array(prg->prog_fds, i, intptr_t) == pfd) { +#endif + clrbit(prg->prog_used, i); + break; + } + pthread_mutex_unlock(&prg->prog_mtx); }