1: #include "global.h"
2:
3:
4: /*
5: * io_progInit() - Init program pool
6: *
7: * @progName = program name for execution
8: * @initNum = initial started programs
9: * @maxNum = maximum started programs
10: * return: NULL error or !=NULL allocated pool (must destroied with io_progDestroy())
11: */
12: prog_t *
13: io_progInit(const char *progName, u_int initNum, u_int maxNum)
14: {
15: prog_t *prg = NULL;
16:
17: if (initNum > maxNum)
18: return NULL;
19:
20: prg = e_malloc(sizeof(prog_t));
21: if (!prg) {
22: io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
23: return NULL;
24: } else
25: memset(prg, 0, sizeof(prog_t));
26:
27: prg->prog_inin = initNum;
28: prg->prog_maxn = maxNum;
29: strlcpy(prg->prog_name, progName, sizeof prg->prog_name);
30:
31: prg->prog_used = e_malloc(E_ALIGN(prg->prog_maxn, sizeof *prg->prog_used) /
32: sizeof *prg->prog_used);
33: if (!prg->prog_used) {
34: io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
35: e_free(prg);
36: return NULL;
37: }
38:
39: prg->prog_fds = array_Init(prg->prog_maxn);
40: if (!prg->prog_fds) {
41: io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
42: e_free(prg->prog_used);
43: e_free(prg);
44: return NULL;
45: }
46:
47: pthread_mutex_init(&prg->prog_mtx, NULL);
48: signal(SIGPIPE, SIG_IGN);
49:
50: if (io_progOpen(prg, prg->prog_inin) < 0) {
51: io_progDestroy(&prg);
52: prg = NULL;
53: }
54: return prg;
55: }
56:
57: /*
58: * io_progDestroy() - Destroy entire program pool
59: *
60: * @pprg = program pool
61: * return: none
62: */
63: void
64: io_progDestroy(prog_t ** __restrict pprg)
65: {
66: if (!pprg || !*pprg)
67: return;
68:
69: io_progClose(*pprg, 0);
70:
71: e_free((*pprg)->prog_used);
72: array_Destroy(&(*pprg)->prog_fds);
73: pthread_mutex_destroy(&(*pprg)->prog_mtx);
74: signal(SIGPIPE, SIG_DFL);
75:
76: e_free(*pprg);
77: *pprg = NULL;
78: }
79:
80: /*
81: * io_progClose() - Close all programs in pool
82: *
83: * @prg = program pool
84: * @closeNum = close program(s) (0 all)
85: * return: 0 error, >0 closed programs
86: */
87: int
88: io_progClose(prog_t * __restrict prg, u_int closeNum)
89: {
90: register int i;
91: int ret = 0;
92: struct tagPIOPID *p;
93:
94: if (!prg)
95: return 0;
96: if (closeNum > prg->prog_maxn) {
97: io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
98: return 0;
99: }
100:
101: pthread_mutex_lock(&prg->prog_mtx);
102: for (i = array_Size(prg->prog_fds) - 1;
103: (closeNum ? ret < closeNum : 42) && i > -1; i--)
104: if (array_Get(prg->prog_fds, i) &&
105: #ifdef POPEN_STREAM
106: (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
107: #else
108: (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
109: #endif
110: kill(p->pid, SIGTERM);
111: usleep(1000);
112: if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
113: kill(p->pid, SIGKILL);
114: #ifdef POPEN_STREAM
115: e_pclose(array(prg->prog_fds, i, FILE*));
116: #else
117: e_pclose((int) array(prg->prog_fds, i, intptr_t));
118: #endif
119: array_Del(prg->prog_fds, i, 0);
120: clrbit(prg->prog_used, i);
121: prg->prog_cnum--;
122: ret++;
123: }
124: pthread_mutex_unlock(&prg->prog_mtx);
125:
126: return ret;
127: }
128:
129: /*
130: * io_progCloseAt() - Close program at pool of certain position
131: *
132: * @prg = program pool
133: * @idx = index at pool
134: * return: 0 error or !=0 closed program
135: */
136: int
137: io_progCloseAt(prog_t * __restrict prg, u_int idx)
138: {
139: int ret = 0;
140: struct tagPIOPID *p;
141:
142: if (!prg)
143: return 0;
144: if (idx > prg->prog_maxn) {
145: io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
146: return 0;
147: }
148:
149: pthread_mutex_lock(&prg->prog_mtx);
150: if (array_Get(prg->prog_fds, idx) &&
151: #ifdef POPEN_STREAM
152: (p = pio_pgetpid(array(prg->prog_fds, idx, FILE*)))) {
153: #else
154: (p = pio_pgetpid((int) array(prg->prog_fds, idx, intptr_t)))) {
155: #endif
156: kill(p->pid, SIGTERM);
157: usleep(1000);
158: if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
159: kill(p->pid, SIGKILL);
160: #ifdef POPEN_STREAM
161: e_pclose(array(prg->prog_fds, idx, FILE*));
162: #else
163: e_pclose((int) array(prg->prog_fds, idx, intptr_t));
164: #endif
165: array_Del(prg->prog_fds, idx, 0);
166: clrbit(prg->prog_used, idx);
167: prg->prog_cnum--;
168: ret++;
169: }
170: pthread_mutex_unlock(&prg->prog_mtx);
171:
172: return ret;
173: }
174:
175: /*
176: * io_progOpen() - Execute number of program(s)
177: *
178: * @prg = program pool
179: * @execNum = execute program(s) (0 max)
180: * return: 0 error, >0 executed programs and abs(<0) executed programs with logged error
181: */
182: int
183: io_progOpen(prog_t * __restrict prg, u_int execNum)
184: {
185: #ifdef POPEN_STREAM
186: FILE *f;
187: #else
188: int f;
189: #endif
190: int stat, ret = 0;
191: register int i;
192: pid_t pid;
193:
194: if (!prg)
195: return 0;
196: if (prg->prog_cnum + execNum > prg->prog_maxn) {
197: io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
198: return 0;
199: }
200:
201: pthread_mutex_lock(&prg->prog_mtx);
202: for (i = 0; (execNum ? ret < execNum : 42) && i < array_Size(prg->prog_fds); i++)
203: if (!array_Get(prg->prog_fds, i)) {
204: f = e_popen(prg->prog_name, "r+", &pid);
205: #ifdef POPEN_STREAM
206: if (!f) {
207: #else
208: if (f == -1) {
209: #endif
210: LOGERR;
211: ret = -1;
212: break;
213: } else if (waitpid(pid, &stat, WNOHANG)) {
214: io_SetErr(ECHILD, "Program with pid=%d exit with status %d",
215: pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1);
216: ret = -1;
217: break;
218: } else
219: array_Set(prg->prog_fds, i, f);
220: clrbit(prg->prog_used, i);
221: prg->prog_cnum++;
222: ret++;
223: }
224: pthread_mutex_unlock(&prg->prog_mtx);
225:
226: return ret;
227: }
228:
229: /*
230: * io_progGrow() - Execute to number of programs in pool
231: *
232: * @prg = program pool
233: * @toNum = execute to number of programs (0 max)
234: * return: 0 error or nothing to do,
235: * >0 executed programs and abs(<0) executed programs with logged error
236: */
237: int
238: io_progGrow(prog_t * __restrict prg, u_int toNum)
239: {
240: if (!prg)
241: return 0;
242: if (toNum > prg->prog_maxn) {
243: io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
244: return 0;
245: }
246: if (!toNum)
247: toNum = prg->prog_maxn;
248: if (toNum < prg->prog_inin)
249: toNum = prg->prog_inin;
250:
251: if ((toNum - prg->prog_cnum) < 1)
252: return 0;
253:
254: return io_progOpen(prg, toNum - prg->prog_cnum);
255: }
256:
257: /*
258: * io_progVacuum() - Vacuum pool to running number of programs
259: *
260: * @prg = program pool
261: * @toNum = vacuum to number of programs (0 to init number)
262: * return: 0 error or >0 closed programs
263: */
264: int
265: io_progVacuum(prog_t * __restrict prg, u_int toNum)
266: {
267: register int i;
268: int ret = 0;
269: struct tagPIOPID *p;
270:
271: if (!prg)
272: return 0;
273: if (toNum > prg->prog_maxn) {
274: io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
275: return 0;
276: }
277: if (!toNum)
278: toNum = prg->prog_inin;
279:
280: pthread_mutex_lock(&prg->prog_mtx);
281: for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--)
282: if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i) &&
283: #ifdef POPEN_STREAM
284: (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
285: kill(p->pid, SIGTERM);
286: usleep(1000);
287: if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
288: kill(p->pid, SIGKILL);
289: e_pclose(array(prg->prog_fds, i, FILE*));
290: #else
291: (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
292: kill(p->pid, SIGTERM);
293: usleep(1000);
294: if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
295: kill(p->pid, SIGKILL);
296: e_pclose((int) array(prg->prog_fds, i, intptr_t));
297: #endif
298: array_Del(prg->prog_fds, i, 0);
299: prg->prog_cnum--;
300: ret++;
301: }
302: pthread_mutex_unlock(&prg->prog_mtx);
303:
304: return ret;
305: }
306:
307: /*
308: * io_progCheck() - Check exit status of program pool
309: *
310: * @prg = program pool
311: * @re = resurrect program
312: * return: -1 error or >-1 exited programs
313: */
314: int
315: io_progCheck(prog_t * __restrict prg, int re)
316: {
317: int ret = 0;
318: struct tagPIOPID *p;
319: register int i;
320:
321: if (!prg)
322: return -1;
323:
324: pthread_mutex_lock(&prg->prog_mtx);
325: for (i = 0; i < array_Size(prg->prog_fds); i++)
326: if (array_Get(prg->prog_fds, i) &&
327: #ifdef POPEN_STREAM
328: (p = pio_pgetpid(array(prg->prog_fds, i, FILE*))))
329: #else
330: (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t))))
331: #endif
332: if (waitpid(p->pid, &p->stat, WNOHANG) > 0) {
333: clrbit(prg->prog_used, i);
334: #ifdef POPEN_STREAM
335: e_pclose(array(prg->prog_fds, i, FILE*));
336: #else
337: e_pclose((int) array(prg->prog_fds, i, intptr_t));
338: #endif
339: array_Del(prg->prog_fds, i, 0);
340: prg->prog_cnum--;
341: ret++;
342: }
343: pthread_mutex_unlock(&prg->prog_mtx);
344:
345: /* resurrect */
346: if (re && ret > 0 && prg->prog_inin - prg->prog_cnum)
347: io_progOpen(prg, prg->prog_inin - prg->prog_cnum);
348:
349: return ret;
350: }
351:
352: /*
353: * io_progAttach() - Attach to open program
354: *
355: * @prg = program pool
356: * @newOne = Execute new one program after attach
357: * return: NULL error or !=NULL attached program handle
358: */
359: #ifdef POPEN_STREAM
360: FILE *
361: #else
362: int
363: #endif
364: io_progAttach(prog_t * __restrict prg, int newOne)
365: {
366: #ifdef POPEN_STREAM
367: FILE *f = NULL;
368: #else
369: int f = -1;
370: #endif
371: register int i;
372:
373: if (!prg)
374: #ifdef POPEN_STREAM
375: return NULL;
376: #else
377: return -1;
378: #endif
379:
380: pthread_mutex_lock(&prg->prog_mtx);
381: for (i = 0; i < array_Size(prg->prog_fds); i++)
382: if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) {
383: setbit(prg->prog_used, i);
384: #ifdef POPEN_STREAM
385: f = array(prg->prog_fds, i, FILE*);
386: #else
387: f = array(prg->prog_fds, i, intptr_t);
388: #endif
389: break;
390: }
391: pthread_mutex_unlock(&prg->prog_mtx);
392:
393: /* execute new one program */
394: if (newOne && f)
395: io_progOpen(prg, 1);
396:
397: return f;
398: }
399:
400: /*
401: * io_progDetach() - Detch from open program
402: *
403: * @prg= program pool
404: * @pfd = attached program handle
405: * return: none
406: */
407: void
408: #ifdef POPEN_STREAM
409: io_progDetach(prog_t * __restrict prg, FILE *pfd)
410: #else
411: io_progDetach(prog_t * __restrict prg, int pfd)
412: #endif
413: {
414: register int i;
415:
416: if (!prg || !pfd)
417: return;
418:
419: pthread_mutex_lock(&prg->prog_mtx);
420: for (i = 0; i < array_Size(prg->prog_fds); i++)
421: #ifdef POPEN_STREAM
422: if (array(prg->prog_fds, i, FILE*) == pfd) {
423: #else
424: if (array(prg->prog_fds, i, intptr_t) == pfd) {
425: #endif
426: clrbit(prg->prog_used, i);
427: break;
428: }
429: pthread_mutex_unlock(&prg->prog_mtx);
430: }
/* FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org> */