File:  [ELWIX - Embedded LightWeight unIX -] / libaitio / src / exec.c
Revision 1.1.2.20: download - view: text, annotated - select for diffs - revision graph
Thu Dec 12 23:06:45 2013 UTC (10 years, 6 months ago) by misho
Branches: io6_7
fix check

#include "global.h"


/*
 * io_progInit() - Init program pool
 *
 * @progName = program name for execution
 * @initNum = initial started programs
 * @maxNum = maximum started programs
 * return: NULL error or !=NULL allocated pool (must destroied with io_progDestroy())
 */
prog_t *
io_progInit(const char *progName, u_int initNum, u_int maxNum)
{
	prog_t *prg = NULL;

	if (initNum > maxNum)
		return NULL;

	prg = e_malloc(sizeof(prog_t));
	if (!prg) {
		io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
		return NULL;
	} else
		memset(prg, 0, sizeof(prog_t));

	prg->prog_inin = initNum;
	prg->prog_maxn = maxNum;
	strlcpy(prg->prog_name, progName, sizeof prg->prog_name);

	prg->prog_used = e_malloc(E_ALIGN(prg->prog_maxn, sizeof *prg->prog_used) / 
			sizeof *prg->prog_used);
	if (!prg->prog_used) {
		io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
		e_free(prg);
		return NULL;
	}

	prg->prog_fds = array_Init(prg->prog_maxn);
	if (!prg->prog_fds) {
		io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
		e_free(prg->prog_used);
		e_free(prg);
		return NULL;
	}

	pthread_mutex_init(&prg->prog_mtx, NULL);
	signal(SIGPIPE, SIG_IGN);

	if (io_progOpen(prg, prg->prog_inin) < 0) {
		io_progDestroy(&prg);
		prg = NULL;
	}
	return prg;
}

/*
 * io_progDestroy() - Destroy entire program pool
 *
 * @pprg = program pool
 * return: none
 */
void
io_progDestroy(prog_t ** __restrict pprg)
{
	if (!pprg || !*pprg)
		return;

	io_progClose(*pprg, 0);

	e_free((*pprg)->prog_used);
	array_Destroy(&(*pprg)->prog_fds);
	pthread_mutex_destroy(&(*pprg)->prog_mtx);
	signal(SIGPIPE, SIG_DFL);

	e_free(*pprg);
	*pprg = NULL;
}

/*
 * io_progClose() - Close all programs in pool
 *
 * @prg = program pool
 * @closeNum = close program(s) (0 all)
 * return: 0 error, >0 closed programs
 */
int
io_progClose(prog_t * __restrict prg, u_int closeNum)
{
	register int i;
	int ret = 0;
	struct tagPIOPID *p;

	if (!prg)
		return 0;
	if (closeNum > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
		return 0;
	}

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = array_Size(prg->prog_fds) - 1; 
			(closeNum ? ret < closeNum : 42) && i > -1; i--)
		if (array_Get(prg->prog_fds, i) && 
#ifdef POPEN_STREAM
				(p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
#else
				(p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
#endif
			kill(p->pid, SIGTERM);
			usleep(1000);
			if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
				kill(p->pid, SIGKILL);
#ifdef POPEN_STREAM
			e_pclose(array(prg->prog_fds, i, FILE*));
#else
			e_pclose((int) array(prg->prog_fds, i, intptr_t));
#endif
			array_Del(prg->prog_fds, i, 0);
			clrbit(prg->prog_used, i);
			prg->prog_cnum--;
			ret++;
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progCloseAt() - Close program at pool of certain position
 *
 * @prg = program pool
 * @idx = index at pool
 * return: 0 error or !=0 closed program
 */
int
io_progCloseAt(prog_t * __restrict prg, u_int idx)
{
	int ret = 0;
	struct tagPIOPID *p;

	if (!prg)
		return 0;
	if (idx > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
		return 0;
	}

	pthread_mutex_lock(&prg->prog_mtx);
	if (array_Get(prg->prog_fds, idx) && 
#ifdef POPEN_STREAM
			(p = pio_pgetpid(array(prg->prog_fds, idx, FILE*)))) {
#else
			(p = pio_pgetpid((int) array(prg->prog_fds, idx, intptr_t)))) {
#endif
		kill(p->pid, SIGTERM);
		usleep(1000);
		if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
			kill(p->pid, SIGKILL);
#ifdef POPEN_STREAM
		e_pclose(array(prg->prog_fds, idx, FILE*));
#else
		e_pclose((int) array(prg->prog_fds, idx, intptr_t));
#endif
		array_Del(prg->prog_fds, idx, 0);
		clrbit(prg->prog_used, idx);
		prg->prog_cnum--;
		ret++;
	}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progOpen2() - Start program from pool on first unused slot
 *
 * @prg = program pool
 * return: -1 error, >-1 reside at slot
 */
int
io_progOpen2(prog_t * __restrict prg)
{
#ifdef POPEN_STREAM
	FILE *f = NULL;
#else
	int f = -1;
#endif
	int stat, ret = -1;
	register int i;
	pid_t pid;

	if (!prg)
		return -1;
	if (prg->prog_cnum + 1 > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
		return -1;
	}

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++)
		if (!array_Get(prg->prog_fds, i)) {
			f = e_popen(prg->prog_name, "r+", &pid);
#ifdef POPEN_STREAM
			if (!f) {
#else
			if (f == -1) {
#endif
				LOGERR;
				break;
			} else if (waitpid(pid, &stat, WNOHANG)) {
				io_SetErr(ECHILD, "Program with pid=%d exit with status %d", 
						pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1);
				e_pclose(f);
				break;
			} else
				array_Set(prg->prog_fds, i, f);
			clrbit(prg->prog_used, i);
			prg->prog_cnum++;
			ret = i;
			break;
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progOpen() - Execute number of program(s)
 *
 * @prg = program pool
 * @execNum = execute program(s) (0 max)
 * return: -1 error, >0 executed programs
 */
int
io_progOpen(prog_t * __restrict prg, u_int execNum)
{
#ifdef POPEN_STREAM
	FILE *f;
#else
	int f;
#endif
	int stat, ret = 0;
	register int i;
	pid_t pid;

	if (!prg)
		return -1;
	if (prg->prog_cnum + execNum > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
		return -1;
	}

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; (execNum ? ret < execNum : 42) && i < array_Size(prg->prog_fds); i++)
		if (!array_Get(prg->prog_fds, i)) {
			printf("%d) %s!!! %d ret=%d\n", i, __func__, execNum, ret);
			f = e_popen(prg->prog_name, "r+", &pid);
#ifdef POPEN_STREAM
			if (!f) {
#else
			if (f == -1) {
#endif
				LOGERR;
				ret = -1;
				break;
			} else if (waitpid(pid, &stat, WNOHANG)) {
				io_SetErr(ECHILD, "Program with pid=%d exit with status %d", 
						pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1);
				e_pclose(f);
				ret = -1;
				break;
			} else
				array_Set(prg->prog_fds, i, f);
			clrbit(prg->prog_used, i);
			prg->prog_cnum++;
			ret++;
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progGrow() - Execute to number of programs in pool
 *
 * @prg = program pool
 * @toNum = execute to number of programs (0 max)
 * return: 0 error or nothing to do, 
 * 	>0 executed programs and abs(<0) executed programs with logged error
 */
int
io_progGrow(prog_t * __restrict prg, u_int toNum)
{
	if (!prg)
		return 0;
	if (toNum > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
		return 0;
	}
	if (!toNum)
		toNum = prg->prog_maxn;
	if (toNum < prg->prog_inin)
		toNum = prg->prog_inin;

	if ((int) (toNum - prg->prog_cnum) < 1)
		return 0;

	return io_progOpen(prg, toNum - prg->prog_cnum);
}

/*
 * io_progVacuum() - Vacuum pool to running number of programs
 *
 * @prg = program pool
 * @toNum = vacuum to number of programs (0 to init number)
 * return: 0 error or >0 closed programs
 */
int
io_progVacuum(prog_t * __restrict prg, u_int toNum)
{
	register int i;
	int ret = 0;
	struct tagPIOPID *p;

	if (!prg)
		return 0;
	if (toNum > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
		return 0;
	}
	if (!toNum)
		toNum = prg->prog_inin;

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--)
		if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i) && 
#ifdef POPEN_STREAM
				(p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
			kill(p->pid, SIGTERM);
			usleep(1000);
			if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
				kill(p->pid, SIGKILL);
			e_pclose(array(prg->prog_fds, i, FILE*));
#else
				(p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
			kill(p->pid, SIGTERM);
			usleep(1000);
			if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
				kill(p->pid, SIGKILL);
			e_pclose((int) array(prg->prog_fds, i, intptr_t));
#endif
			array_Del(prg->prog_fds, i, 0);
			prg->prog_cnum--;
			ret++;
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progCheck() - Check exit status of program pool
 *
 * @prg = program pool
 * @re = resurrect program to init number
 * return: -1 error or >-1 exited programs
 */
int
io_progCheck(prog_t * __restrict prg, int re)
{
	int ret = 0;
	struct tagPIOPID *p;
	register int i;

	if (!prg)
		return -1;

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++)
		if (array_Get(prg->prog_fds, i) && 
#ifdef POPEN_STREAM
				(p = pio_pgetpid(array(prg->prog_fds, i, FILE*))))
#else
				(p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t))))
#endif
			if (waitpid(p->pid, &p->stat, WNOHANG)) {
				clrbit(prg->prog_used, i);
#ifdef POPEN_STREAM
				e_pclose(array(prg->prog_fds, i, FILE*));
#else
				e_pclose((int) array(prg->prog_fds, i, intptr_t));
#endif
				array_Del(prg->prog_fds, i, 0);
				prg->prog_cnum--;
				ret++;
			}
	pthread_mutex_unlock(&prg->prog_mtx);

	/* resurrect to init number */
	if (re && ((int) (prg->prog_inin - prg->prog_cnum) > 0))
		io_progOpen(prg, prg->prog_inin - prg->prog_cnum);

	return ret;
}

/*
 * io_progAttach() - Attach to open program
 *
 * @prg = program pool
 * @newOne = Execute new one program after attach
 * return: NULL error or !=NULL attached program handle
 */
#ifdef POPEN_STREAM
FILE *
#else
int
#endif
io_progAttach(prog_t * __restrict prg, int newOne)
{
#ifdef POPEN_STREAM
	FILE *f = NULL;
#else
	int f = -1;
#endif
	register int i;

	if (!prg)
#ifdef POPEN_STREAM
		return NULL;
#else
		return -1;
#endif

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++)
		if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) {
			setbit(prg->prog_used, i);
#ifdef POPEN_STREAM
			f = array(prg->prog_fds, i, FILE*);
#else
			f = array(prg->prog_fds, i, intptr_t);
#endif
			break;
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	/* execute new one program */
	if (newOne) {
		if (f)
			io_progOpen(prg, 1);
		else if ((i = io_progOpen2(prg)) > -1)
			/* not found free program */
#ifdef POPEN_STREAM
			f = array(prg->prog_fds, i, FILE*);
#else
			f = array(prg->prog_fds, i, intptr_t);
#endif
	}

	return f;
}

/*
 * io_progDetach() - Detch from open program
 *
 * @prg= program pool
 * @pfd = attached program handle
 * return: none
 */
void
#ifdef POPEN_STREAM
io_progDetach(prog_t * __restrict prg, FILE *pfd)
#else
io_progDetach(prog_t * __restrict prg, int pfd)
#endif
{
	register int i;

	if (!prg || !pfd)
		return;

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++)
#ifdef POPEN_STREAM
		if (array(prg->prog_fds, i, FILE*) == pfd) {
#else
		if (array(prg->prog_fds, i, intptr_t) == pfd) {
#endif
			clrbit(prg->prog_used, i);
			break;
		}
	pthread_mutex_unlock(&prg->prog_mtx);
}

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>