#include "global.h"
/*
* io_progInit() - Init program pool
*
* @progName = program name for execution
* @initNum = initial started programs
* @maxNum = maximum started programs
* return: NULL error or !=NULL allocated pool (must destroied with io_progDestroy())
*/
prog_t *
io_progInit(const char *progName, u_int initNum, u_int maxNum)
{
prog_t *prg = NULL;
if (initNum > maxNum)
return NULL;
prg = e_malloc(sizeof(prog_t));
if (!prg) {
io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
return NULL;
} else
memset(prg, 0, sizeof(prog_t));
prg->prog_inin = initNum;
prg->prog_maxn = maxNum;
strlcpy(prg->prog_name, progName, sizeof prg->prog_name);
prg->prog_used = e_malloc(E_ALIGN(prg->prog_maxn, sizeof *prg->prog_used) /
sizeof *prg->prog_used);
if (!prg->prog_used) {
io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
e_free(prg);
return NULL;
}
prg->prog_fds = array_Init(prg->prog_maxn);
if (!prg->prog_fds) {
io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
e_free(prg->prog_used);
e_free(prg);
return NULL;
}
pthread_mutex_init(&prg->prog_mtx, NULL);
signal(SIGPIPE, SIG_IGN);
if (io_progOpen(prg, prg->prog_inin) < 0) {
io_progDestroy(&prg);
prg = NULL;
}
return prg;
}
/*
* io_progDestroy() - Destroy entire program pool
*
* @pprg = program pool
* return: none
*/
void
io_progDestroy(prog_t ** __restrict pprg)
{
if (!pprg || !*pprg)
return;
io_progClose(*pprg, 0);
e_free((*pprg)->prog_used);
array_Destroy(&(*pprg)->prog_fds);
pthread_mutex_destroy(&(*pprg)->prog_mtx);
signal(SIGPIPE, SIG_DFL);
e_free(*pprg);
*pprg = NULL;
}
/*
* io_progClose() - Close all programs in pool
*
* @prg = program pool
* @closeNum = close program(s) (0 all)
* return: 0 error, >0 closed programs
*/
int
io_progClose(prog_t * __restrict prg, u_int closeNum)
{
register int i;
int ret = 0;
struct tagPIOPID *p;
if (!prg)
return 0;
if (closeNum > prg->prog_maxn) {
io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
return 0;
}
pthread_mutex_lock(&prg->prog_mtx);
for (i = array_Size(prg->prog_fds) - 1;
(closeNum ? ret < closeNum : 42) && i > -1; i--)
if (array_Get(prg->prog_fds, i) &&
#ifdef POPEN_STREAM
(p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
#else
(p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
#endif
kill(p->pid, SIGTERM);
usleep(1000);
if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
kill(p->pid, SIGKILL);
#ifdef POPEN_STREAM
e_pclose(array(prg->prog_fds, i, FILE*));
#else
e_pclose((int) array(prg->prog_fds, i, intptr_t));
#endif
array_Del(prg->prog_fds, i, 0);
clrbit(prg->prog_used, i);
prg->prog_cnum--;
ret++;
}
pthread_mutex_unlock(&prg->prog_mtx);
return ret;
}
/*
* io_progCloseAt() - Close program at pool of certain position
*
* @prg = program pool
* @idx = index at pool
* return: 0 error or !=0 closed program
*/
int
io_progCloseAt(prog_t * __restrict prg, u_int idx)
{
int ret = 0;
struct tagPIOPID *p;
if (!prg)
return 0;
if (idx > prg->prog_maxn) {
io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
return 0;
}
pthread_mutex_lock(&prg->prog_mtx);
if (array_Get(prg->prog_fds, idx) &&
#ifdef POPEN_STREAM
(p = pio_pgetpid(array(prg->prog_fds, idx, FILE*)))) {
#else
(p = pio_pgetpid((int) array(prg->prog_fds, idx, intptr_t)))) {
#endif
kill(p->pid, SIGTERM);
usleep(1000);
if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
kill(p->pid, SIGKILL);
#ifdef POPEN_STREAM
e_pclose(array(prg->prog_fds, idx, FILE*));
#else
e_pclose((int) array(prg->prog_fds, idx, intptr_t));
#endif
array_Del(prg->prog_fds, idx, 0);
clrbit(prg->prog_used, idx);
prg->prog_cnum--;
ret++;
}
pthread_mutex_unlock(&prg->prog_mtx);
return ret;
}
/*
* io_progCloseOf() - Close program at pool with certain handle
*
* @prg = program pool
* @h = handle of program
* return: 0 error, >0 closed programs
*/
int
#ifdef POPEN_STREAM
io_progCloseOf(prog_t * __restrict prg, FILE *h)
#else
io_progCloseOf(prog_t * __restrict prg, int h)
#endif
{
register int i;
int ret = 0;
struct tagPIOPID *p;
#ifdef POPEN_STREAM
FILE *f;
#else
int f;
#endif
if (!prg)
return 0;
pthread_mutex_lock(&prg->prog_mtx);
for (i = 0; i < array_Size(prg->prog_fds); i++)
if (array_Get(prg->prog_fds, i)) {
#ifdef POPEN_STREAM
f = array(prg->prog_fds, i, FILE*);
if (f == h && (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
#else
f = (int) array(prg->prog_fds, i, intptr_t);
if (f == h && (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
#endif
kill(p->pid, SIGTERM);
usleep(1000);
if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
kill(p->pid, SIGKILL);
#ifdef POPEN_STREAM
e_pclose(array(prg->prog_fds, i, FILE*));
#else
e_pclose((int) array(prg->prog_fds, i, intptr_t));
#endif
array_Del(prg->prog_fds, i, 0);
clrbit(prg->prog_used, i);
prg->prog_cnum--;
ret++;
break;
}
}
pthread_mutex_unlock(&prg->prog_mtx);
return ret;
}
/*
* io_progOpen2() - Start program from pool on first unused slot
*
* @prg = program pool
* return: -1 error, >-1 reside at slot
*/
int
io_progOpen2(prog_t * __restrict prg)
{
#ifdef POPEN_STREAM
FILE *f = NULL;
#else
int f = -1;
#endif
int stat, ret = -1;
register int i;
pid_t pid;
if (!prg)
return -1;
if (prg->prog_cnum + 1 > prg->prog_maxn) {
io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
return -1;
}
pthread_mutex_lock(&prg->prog_mtx);
for (i = 0; i < array_Size(prg->prog_fds); i++)
if (!array_Get(prg->prog_fds, i)) {
f = e_popen(prg->prog_name, "r+", &pid);
#ifdef POPEN_STREAM
if (!f) {
#else
if (f == -1) {
#endif
LOGERR;
break;
} else if (waitpid(pid, &stat, WNOHANG)) {
io_SetErr(ECHILD, "Program with pid=%d exit with status %d",
pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1);
e_pclose(f);
break;
} else
array_Set(prg->prog_fds, i, f);
clrbit(prg->prog_used, i);
prg->prog_cnum++;
ret = i;
break;
}
pthread_mutex_unlock(&prg->prog_mtx);
return ret;
}
/*
* io_progOpen() - Execute number of program(s)
*
* @prg = program pool
* @execNum = execute program(s) (0 max)
* return: -1 error, >0 executed programs
*/
int
io_progOpen(prog_t * __restrict prg, u_int execNum)
{
#ifdef POPEN_STREAM
FILE *f;
#else
int f;
#endif
int stat, ret = 0;
register int i;
pid_t pid;
if (!prg)
return -1;
if (prg->prog_cnum + execNum > prg->prog_maxn) {
io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
return -1;
}
pthread_mutex_lock(&prg->prog_mtx);
for (i = 0; (execNum ? ret < execNum : 42) && i < array_Size(prg->prog_fds); i++)
if (!array_Get(prg->prog_fds, i)) {
f = e_popen(prg->prog_name, "r+", &pid);
#ifdef POPEN_STREAM
if (!f) {
#else
if (f == -1) {
#endif
LOGERR;
ret = -1;
break;
} else if (waitpid(pid, &stat, WNOHANG)) {
io_SetErr(ECHILD, "Program with pid=%d exit with status %d",
pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1);
e_pclose(f);
ret = -1;
break;
} else
array_Set(prg->prog_fds, i, f);
clrbit(prg->prog_used, i);
prg->prog_cnum++;
ret++;
}
pthread_mutex_unlock(&prg->prog_mtx);
return ret;
}
/*
* io_progGrow() - Execute to number of programs in pool
*
* @prg = program pool
* @toNum = execute to number of programs (0 max)
* return: 0 error or nothing to do,
* >0 executed programs and abs(<0) executed programs with logged error
*/
int
io_progGrow(prog_t * __restrict prg, u_int toNum)
{
if (!prg)
return 0;
if (toNum > prg->prog_maxn) {
io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
return 0;
}
if (!toNum)
toNum = prg->prog_maxn;
if (toNum < prg->prog_inin)
toNum = prg->prog_inin;
if ((int) (toNum - prg->prog_cnum) < 1)
return 0;
return io_progOpen(prg, toNum - prg->prog_cnum);
}
/*
* io_progVacuum() - Vacuum pool to running number of programs
*
* @prg = program pool
* @toNum = vacuum to number of programs (0 to init number)
* return: 0 error or >0 closed programs
*/
int
io_progVacuum(prog_t * __restrict prg, u_int toNum)
{
register int i;
int ret = 0;
struct tagPIOPID *p;
if (!prg)
return 0;
if (toNum > prg->prog_maxn) {
io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
return 0;
}
if (!toNum)
toNum = prg->prog_inin;
pthread_mutex_lock(&prg->prog_mtx);
for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--)
if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i) &&
#ifdef POPEN_STREAM
(p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
kill(p->pid, SIGTERM);
usleep(1000);
if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
kill(p->pid, SIGKILL);
e_pclose(array(prg->prog_fds, i, FILE*));
#else
(p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
kill(p->pid, SIGTERM);
usleep(1000);
if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
kill(p->pid, SIGKILL);
e_pclose((int) array(prg->prog_fds, i, intptr_t));
#endif
array_Del(prg->prog_fds, i, 0);
prg->prog_cnum--;
ret++;
}
pthread_mutex_unlock(&prg->prog_mtx);
return ret;
}
/*
* io_progCheck() - Check exit status of program pool
*
* @prg = program pool
* @re = resurrect program to init number
* return: -1 error or >-1 exited programs
*/
int
io_progCheck(prog_t * __restrict prg, int re)
{
int ret = 0;
struct tagPIOPID *p;
register int i;
if (!prg)
return -1;
pthread_mutex_lock(&prg->prog_mtx);
for (i = 0; i < array_Size(prg->prog_fds); i++)
if (array_Get(prg->prog_fds, i) &&
#ifdef POPEN_STREAM
(p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
#else
(p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
#endif
if (waitpid(p->pid, &p->stat, WNOHANG)) {
clrbit(prg->prog_used, i);
#ifdef POPEN_STREAM
e_pclose(array(prg->prog_fds, i, FILE*));
#else
e_pclose((int) array(prg->prog_fds, i, intptr_t));
#endif
array_Del(prg->prog_fds, i, 0);
prg->prog_cnum--;
ret++;
}
}
pthread_mutex_unlock(&prg->prog_mtx);
/* resurrect to init number */
if (re && ((int) (prg->prog_inin - prg->prog_cnum) > 0))
io_progOpen(prg, prg->prog_inin - prg->prog_cnum);
return ret;
}
/*
* io_progAttach() - Attach to open program
*
* @prg = program pool
* @newOne = Execute new one program after attach
* return: NULL error or !=NULL attached program handle
*/
#ifdef POPEN_STREAM
FILE *
#else
int
#endif
io_progAttach(prog_t * __restrict prg, int newOne)
{
#ifdef POPEN_STREAM
FILE *f = NULL;
#else
int f = -1;
#endif
register int i;
if (!prg)
#ifdef POPEN_STREAM
return NULL;
#else
return -1;
#endif
pthread_mutex_lock(&prg->prog_mtx);
for (i = 0; i < array_Size(prg->prog_fds); i++)
if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) {
setbit(prg->prog_used, i);
#ifdef POPEN_STREAM
f = array(prg->prog_fds, i, FILE*);
#else
f = array(prg->prog_fds, i, intptr_t);
#endif
break;
}
pthread_mutex_unlock(&prg->prog_mtx);
/* execute new one program */
if (newOne) {
if (f)
io_progOpen(prg, 1);
else if ((i = io_progOpen2(prg)) > -1)
/* not found free program */
#ifdef POPEN_STREAM
f = array(prg->prog_fds, i, FILE*);
#else
f = array(prg->prog_fds, i, intptr_t);
#endif
}
return f;
}
/*
* io_progDetach() - Detch from open program
*
* @prg= program pool
* @pfd = attached program handle
* return: none
*/
void
#ifdef POPEN_STREAM
io_progDetach(prog_t * __restrict prg, FILE *pfd)
#else
io_progDetach(prog_t * __restrict prg, int pfd)
#endif
{
register int i;
if (!prg || !pfd)
return;
pthread_mutex_lock(&prg->prog_mtx);
for (i = 0; i < array_Size(prg->prog_fds); i++)
#ifdef POPEN_STREAM
if (array(prg->prog_fds, i, FILE*) == pfd) {
#else
if (array(prg->prog_fds, i, intptr_t) == pfd) {
#endif
clrbit(prg->prog_used, i);
break;
}
pthread_mutex_unlock(&prg->prog_mtx);
}
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>