File:  [ELWIX - Embedded LightWeight unIX -] / libaitio / src / exec.c
Revision 1.4: download - view: text, annotated - select for diffs - revision graph
Sat Feb 8 22:06:17 2014 UTC (10 years, 3 months ago) by misho
Branches: MAIN
CVS tags: io7_4, io7_3, io7_2, io7_1, io7_0, IO7_3, IO7_2, IO7_1, IO7_0, IO6_9, HEAD
version 6.9

/*************************************************************************
* (C) 2013 AITNET ltd - Sofia/Bulgaria - <misho@aitnet.org>
*  by Michael Pounov <misho@elwix.org>
*
* $Author: misho $
* $Id: exec.c,v 1.4 2014/02/08 22:06:17 misho Exp $
*
**************************************************************************
The ELWIX and AITNET software is distributed under the following
terms:

All of the documentation and software included in the ELWIX and AITNET
Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>

Copyright 2004 - 2014
	by Michael Pounov <misho@elwix.org>.  All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
   notice, this list of conditions and the following disclaimer in the
   documentation and/or other materials provided with the distribution.
3. All advertising materials mentioning features or use of this software
   must display the following acknowledgement:
This product includes software developed by Michael Pounov <misho@elwix.org>
ELWIX - Embedded LightWeight unIX and its contributors.
4. Neither the name of AITNET nor the names of its contributors
   may be used to endorse or promote products derived from this software
   without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
*/
#include "global.h"


/*
 * io_progInit() - Init program pool
 *
 * @progName = program name for execution
 * @initNum = initial started programs
 * @maxNum = maximum started programs
 * return: NULL error or !=NULL allocated pool (must destroied with io_progDestroy())
 */
prog_t *
io_progInit(const char *progName, u_int initNum, u_int maxNum)
{
	prog_t *prg = NULL;

	if (initNum > maxNum)
		return NULL;

	prg = e_malloc(sizeof(prog_t));
	if (!prg) {
		io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
		return NULL;
	} else
		memset(prg, 0, sizeof(prog_t));

	prg->prog_inin = initNum;
	prg->prog_maxn = maxNum;
	strlcpy(prg->prog_name, progName, sizeof prg->prog_name);

	prg->prog_used = e_malloc(E_ALIGN(prg->prog_maxn, sizeof *prg->prog_used) / 
			sizeof *prg->prog_used);
	if (!prg->prog_used) {
		io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
		e_free(prg);
		return NULL;
	}

	prg->prog_fds = array_Init(prg->prog_maxn);
	if (!prg->prog_fds) {
		io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
		e_free(prg->prog_used);
		e_free(prg);
		return NULL;
	}

	pthread_mutex_init(&prg->prog_mtx, NULL);
	signal(SIGPIPE, SIG_IGN);

	if (io_progOpen(prg, prg->prog_inin) < 0) {
		io_progDestroy(&prg);
		prg = NULL;
	}
	return prg;
}

/*
 * io_progDestroy() - Destroy entire program pool
 *
 * @pprg = program pool
 * return: none
 */
void
io_progDestroy(prog_t ** __restrict pprg)
{
	if (!pprg || !*pprg)
		return;

	io_progClose(*pprg, 0);

	e_free((*pprg)->prog_used);
	array_Destroy(&(*pprg)->prog_fds);
	pthread_mutex_destroy(&(*pprg)->prog_mtx);
	signal(SIGPIPE, SIG_DFL);

	e_free(*pprg);
	*pprg = NULL;
}

/*
 * io_progClose() - Close all programs in pool
 *
 * @prg = program pool
 * @closeNum = close program(s) (0 all)
 * return: 0 error, >0 closed programs
 */
int
io_progClose(prog_t * __restrict prg, u_int closeNum)
{
	register int i;
	int ret = 0;
	struct tagPIOPID *p;

	if (!prg)
		return 0;
	if (closeNum > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
		return 0;
	}

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = array_Size(prg->prog_fds) - 1; 
			(closeNum ? ret < closeNum : 42) && i > -1; i--)
		if (array_Get(prg->prog_fds, i) && 
#ifdef POPEN_STREAM
				(p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
#else
				(p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
#endif
			kill(p->pid, SIGTERM);
			usleep(1000);
			if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
				kill(p->pid, SIGKILL);
#ifdef POPEN_STREAM
			e_pclose(array(prg->prog_fds, i, FILE*));
#else
			e_pclose((int) array(prg->prog_fds, i, intptr_t));
#endif
			array_Del(prg->prog_fds, i, 0);
			clrbit(prg->prog_used, i);
			prg->prog_cnum--;
			ret++;
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progCloseAt() - Close program at pool of certain position
 *
 * @prg = program pool
 * @idx = index at pool
 * return: 0 error or !=0 closed program
 */
int
io_progCloseAt(prog_t * __restrict prg, u_int idx)
{
	int ret = 0;
	struct tagPIOPID *p;

	if (!prg)
		return 0;
	if (idx > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
		return 0;
	}

	pthread_mutex_lock(&prg->prog_mtx);
	if (array_Get(prg->prog_fds, idx) && 
#ifdef POPEN_STREAM
			(p = pio_pgetpid(array(prg->prog_fds, idx, FILE*)))) {
#else
			(p = pio_pgetpid((int) array(prg->prog_fds, idx, intptr_t)))) {
#endif
		kill(p->pid, SIGTERM);
		usleep(1000);
		if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
			kill(p->pid, SIGKILL);
#ifdef POPEN_STREAM
		e_pclose(array(prg->prog_fds, idx, FILE*));
#else
		e_pclose((int) array(prg->prog_fds, idx, intptr_t));
#endif
		array_Del(prg->prog_fds, idx, 0);
		clrbit(prg->prog_used, idx);
		prg->prog_cnum--;
		ret++;
	}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progCloseOf() - Close program at pool with certain handle
 *
 * @prg = program pool
 * @h = handle of program
 * return: 0 error, >0 closed programs
 */
int
#ifdef POPEN_STREAM
io_progCloseOf(prog_t * __restrict prg, FILE *h)
#else
io_progCloseOf(prog_t * __restrict prg, int h)
#endif
{
	register int i;
	int ret = 0;
	struct tagPIOPID *p;
#ifdef POPEN_STREAM
	FILE *f;
#else
	int f;
#endif

	if (!prg)
		return 0;

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++)
		if (array_Get(prg->prog_fds, i)) {
#ifdef POPEN_STREAM
			f = array(prg->prog_fds, i, FILE*);
			if (f == h && (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
#else
			f = (int) array(prg->prog_fds, i, intptr_t);
			if (f == h && (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
#endif
				kill(p->pid, SIGTERM);
				usleep(1000);
				if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
					kill(p->pid, SIGKILL);
#ifdef POPEN_STREAM
				e_pclose(array(prg->prog_fds, i, FILE*));
#else
				e_pclose((int) array(prg->prog_fds, i, intptr_t));
#endif
				array_Del(prg->prog_fds, i, 0);
				clrbit(prg->prog_used, i);
				prg->prog_cnum--;
				ret++;
				break;
			}
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progOpen2() - Start program from pool on first unused slot
 *
 * @prg = program pool
 * return: -1 error, >-1 reside at slot
 */
int
io_progOpen2(prog_t * __restrict prg)
{
#ifdef POPEN_STREAM
	FILE *f = NULL;
#else
	int f = -1;
#endif
	int stat, ret = -1;
	register int i;
	pid_t pid;

	if (!prg)
		return -1;
	if (prg->prog_cnum + 1 > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
		return -1;
	}

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++)
		if (!array_Get(prg->prog_fds, i)) {
			f = e_popen(prg->prog_name, "r+", &pid);
#ifdef POPEN_STREAM
			if (!f) {
#else
			if (f == -1) {
#endif
				LOGERR;
				break;
			} else if (waitpid(pid, &stat, WNOHANG)) {
				io_SetErr(ECHILD, "Program with pid=%d exit with status %d", 
						pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1);
				e_pclose(f);
				break;
			} else
				array_Set(prg->prog_fds, i, (intptr_t) f);
			clrbit(prg->prog_used, i);
			prg->prog_cnum++;
			ret = i;
			break;
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progOpen() - Execute number of program(s)
 *
 * @prg = program pool
 * @execNum = execute program(s) (0 max)
 * return: -1 error, >0 executed programs
 */
int
io_progOpen(prog_t * __restrict prg, u_int execNum)
{
#ifdef POPEN_STREAM
	FILE *f;
#else
	int f;
#endif
	int stat, ret = 0;
	register int i;
	pid_t pid;

	if (!prg)
		return -1;
	if (prg->prog_cnum + execNum > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
		return -1;
	}

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; (execNum ? ret < execNum : 42) && i < array_Size(prg->prog_fds); i++)
		if (!array_Get(prg->prog_fds, i)) {
			f = e_popen(prg->prog_name, "r+", &pid);
#ifdef POPEN_STREAM
			if (!f) {
#else
			if (f == -1) {
#endif
				LOGERR;
				ret = -1;
				break;
			} else if (waitpid(pid, &stat, WNOHANG)) {
				io_SetErr(ECHILD, "Program with pid=%d exit with status %d", 
						pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1);
				e_pclose(f);
				ret = -1;
				break;
			} else
				array_Set(prg->prog_fds, i, (intptr_t) f);
			clrbit(prg->prog_used, i);
			prg->prog_cnum++;
			ret++;
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progGrow() - Execute to number of programs in pool
 *
 * @prg = program pool
 * @toNum = execute to number of programs (0 max)
 * return: 0 error or nothing to do, 
 * 	>0 executed programs and abs(<0) executed programs with logged error
 */
int
io_progGrow(prog_t * __restrict prg, u_int toNum)
{
	if (!prg)
		return 0;
	if (toNum > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
		return 0;
	}
	if (!toNum)
		toNum = prg->prog_maxn;
	if (toNum < prg->prog_inin)
		toNum = prg->prog_inin;

	if ((int) (toNum - prg->prog_cnum) < 1)
		return 0;

	return io_progOpen(prg, toNum - prg->prog_cnum);
}

/*
 * io_progVacuum() - Vacuum pool to running number of programs
 *
 * @prg = program pool
 * @toNum = vacuum to number of programs (0 to init number)
 * return: 0 error or >0 closed programs
 */
int
io_progVacuum(prog_t * __restrict prg, u_int toNum)
{
	register int i;
	int ret = 0;
	struct tagPIOPID *p;

	if (!prg)
		return 0;
	if (toNum > prg->prog_maxn) {
		io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
		return 0;
	}
	if (!toNum)
		toNum = prg->prog_inin;

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--)
		if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i) && 
#ifdef POPEN_STREAM
				(p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
			kill(p->pid, SIGTERM);
			usleep(1000);
			if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
				kill(p->pid, SIGKILL);
			e_pclose(array(prg->prog_fds, i, FILE*));
#else
				(p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
			kill(p->pid, SIGTERM);
			usleep(1000);
			if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
				kill(p->pid, SIGKILL);
			e_pclose((int) array(prg->prog_fds, i, intptr_t));
#endif
			array_Del(prg->prog_fds, i, 0);
			prg->prog_cnum--;
			ret++;
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	return ret;
}

/*
 * io_progCheck() - Check exit status of program pool
 *
 * @prg = program pool
 * @re = resurrect program to init number
 * return: -1 error or >-1 exited programs
 */
int
io_progCheck(prog_t * __restrict prg, int re)
{
	int ret = 0;
	struct tagPIOPID *p;
	register int i;

	if (!prg)
		return -1;

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++)
		if (array_Get(prg->prog_fds, i) && 
#ifdef POPEN_STREAM
				(p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
#else
				(p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
#endif
			if (waitpid(p->pid, &p->stat, WNOHANG)) {
				clrbit(prg->prog_used, i);
#ifdef POPEN_STREAM
				e_pclose(array(prg->prog_fds, i, FILE*));
#else
				e_pclose((int) array(prg->prog_fds, i, intptr_t));
#endif
				array_Del(prg->prog_fds, i, 0);
				prg->prog_cnum--;
				ret++;
			}
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	/* resurrect to init number */
	if (re && ((int) (prg->prog_inin - prg->prog_cnum) > 0))
		io_progOpen(prg, prg->prog_inin - prg->prog_cnum);

	return ret;
}

/*
 * io_progAttach() - Attach to open program
 *
 * @prg = program pool
 * @newOne = Execute new one program after attach
 * return: NULL error or !=NULL attached program handle
 */
#ifdef POPEN_STREAM
FILE *
#else
int
#endif
io_progAttach(prog_t * __restrict prg, int newOne)
{
#ifdef POPEN_STREAM
	FILE *f = NULL;
#else
	int f = -1;
#endif
	register int i;

	if (!prg)
#ifdef POPEN_STREAM
		return NULL;
#else
		return -1;
#endif

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++)
		if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) {
			setbit(prg->prog_used, i);
#ifdef POPEN_STREAM
			f = array(prg->prog_fds, i, FILE*);
#else
			f = array(prg->prog_fds, i, intptr_t);
#endif
			break;
		}
	pthread_mutex_unlock(&prg->prog_mtx);

	/* execute new one program */
	if (newOne) {
		if (f)
			io_progOpen(prg, 1);
		else if ((i = io_progOpen2(prg)) > -1)
			/* not found free program */
#ifdef POPEN_STREAM
			f = array(prg->prog_fds, i, FILE*);
#else
			f = array(prg->prog_fds, i, intptr_t);
#endif
	}

	return f;
}

/*
 * io_progDetach() - Detch from open program
 *
 * @prg= program pool
 * @pfd = attached program handle
 * return: none
 */
void
#ifdef POPEN_STREAM
io_progDetach(prog_t * __restrict prg, FILE *pfd)
#else
io_progDetach(prog_t * __restrict prg, int pfd)
#endif
{
	register int i;

	if (!prg || !pfd)
		return;

	pthread_mutex_lock(&prg->prog_mtx);
	for (i = 0; i < array_Size(prg->prog_fds); i++)
#ifdef POPEN_STREAM
		if (array(prg->prog_fds, i, FILE*) == pfd) {
#else
		if (array(prg->prog_fds, i, intptr_t) == pfd) {
#endif
			clrbit(prg->prog_used, i);
			break;
		}
	pthread_mutex_unlock(&prg->prog_mtx);
}

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>