1: #include "global.h"
2:
3:
4: /*
5: * io_progInit() - Init program pool
6: *
7: * @progName = program name for execution
8: * @initNum = initial started programs
9: * @maxNum = maximum started programs
10: * return: NULL error or !=NULL allocated pool (must destroied with io_progDestroy())
11: */
12: prog_t *
13: io_progInit(const char *progName, u_int initNum, u_int maxNum)
14: {
15: prog_t *prg = NULL;
16:
17: if (initNum > maxNum)
18: return NULL;
19:
20: prg = e_malloc(sizeof(prog_t));
21: if (!prg) {
22: io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
23: return NULL;
24: } else
25: memset(prg, 0, sizeof(prog_t));
26:
27: prg->prog_inin = initNum;
28: prg->prog_maxn = maxNum;
29: strlcpy(prg->prog_name, progName, sizeof prg->prog_name);
30:
31: prg->prog_used = e_malloc(E_ALIGN(prg->prog_maxn, sizeof *prg->prog_used) /
32: sizeof *prg->prog_used);
33: if (!prg->prog_used) {
34: io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
35: e_free(prg);
36: return NULL;
37: }
38:
39: prg->prog_fds = array_Init(prg->prog_maxn);
40: if (!prg->prog_fds) {
41: io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
42: e_free(prg->prog_used);
43: e_free(prg);
44: return NULL;
45: }
46:
47: pthread_mutex_init(&prg->prog_mtx, NULL);
48: signal(SIGPIPE, SIG_IGN);
49:
50: if (io_progOpen(prg, prg->prog_inin) < 0) {
51: io_progDestroy(&prg);
52: prg = NULL;
53: }
54: return prg;
55: }
56:
57: /*
58: * io_progDestroy() - Destroy entire program pool
59: *
60: * @pprg = program pool
61: * return: none
62: */
63: void
64: io_progDestroy(prog_t ** __restrict pprg)
65: {
66: if (!pprg || !*pprg)
67: return;
68:
69: io_progClose(*pprg, 0);
70:
71: e_free((*pprg)->prog_used);
72: array_Destroy(&(*pprg)->prog_fds);
73: pthread_mutex_destroy(&(*pprg)->prog_mtx);
74: signal(SIGPIPE, SIG_DFL);
75:
76: e_free(*pprg);
77: *pprg = NULL;
78: }
79:
80: /*
81: * io_progClose() - Close all programs in pool
82: *
83: * @prg = program pool
84: * @closeNum = close program(s) (0 all)
85: * return: 0 error, >0 closed programs
86: */
87: int
88: io_progClose(prog_t * __restrict prg, u_int closeNum)
89: {
90: register int i;
91: int ret = 0;
92: struct tagPIOPID *p;
93:
94: if (!prg)
95: return 0;
96: if (closeNum > prg->prog_maxn) {
97: io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
98: return 0;
99: }
100:
101: pthread_mutex_lock(&prg->prog_mtx);
102: for (i = array_Size(prg->prog_fds) - 1;
103: (closeNum ? ret < closeNum : 42) && i > -1; i--)
104: if (array_Get(prg->prog_fds, i) &&
105: #ifdef POPEN_STREAM
106: (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
107: #else
108: (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
109: #endif
110: kill(p->pid, SIGTERM);
111: usleep(1000);
112: if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
113: kill(p->pid, SIGKILL);
114: #ifdef POPEN_STREAM
115: e_pclose(array(prg->prog_fds, i, FILE*));
116: #else
117: e_pclose((int) array(prg->prog_fds, i, intptr_t));
118: #endif
119: array_Del(prg->prog_fds, i, 0);
120: clrbit(prg->prog_used, i);
121: prg->prog_cnum--;
122: ret++;
123: }
124: pthread_mutex_unlock(&prg->prog_mtx);
125:
126: return ret;
127: }
128:
129: /*
130: * io_progCloseAt() - Close program at pool of certain position
131: *
132: * @prg = program pool
133: * @idx = index at pool
134: * return: 0 error or !=0 closed program
135: */
136: int
137: io_progCloseAt(prog_t * __restrict prg, u_int idx)
138: {
139: int ret = 0;
140: struct tagPIOPID *p;
141:
142: if (!prg)
143: return 0;
144: if (idx > prg->prog_maxn) {
145: io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
146: return 0;
147: }
148:
149: pthread_mutex_lock(&prg->prog_mtx);
150: if (array_Get(prg->prog_fds, idx) &&
151: #ifdef POPEN_STREAM
152: (p = pio_pgetpid(array(prg->prog_fds, idx, FILE*)))) {
153: #else
154: (p = pio_pgetpid((int) array(prg->prog_fds, idx, intptr_t)))) {
155: #endif
156: kill(p->pid, SIGTERM);
157: usleep(1000);
158: if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
159: kill(p->pid, SIGKILL);
160: #ifdef POPEN_STREAM
161: e_pclose(array(prg->prog_fds, idx, FILE*));
162: #else
163: e_pclose((int) array(prg->prog_fds, idx, intptr_t));
164: #endif
165: array_Del(prg->prog_fds, idx, 0);
166: clrbit(prg->prog_used, idx);
167: prg->prog_cnum--;
168: ret++;
169: }
170: pthread_mutex_unlock(&prg->prog_mtx);
171:
172: return ret;
173: }
174:
175: /*
176: * io_progCloseOf() - Close program at pool with certain handle
177: *
178: * @prg = program pool
179: * @h = handle of program
180: * return: 0 error, >0 closed programs
181: */
182: int
183: #ifdef POPEN_STREAM
184: io_progCloseOf(prog_t * __restrict prg, FILE *h)
185: #else
186: io_progCloseOf(prog_t * __restrict prg, int h)
187: #endif
188: {
189: register int i;
190: int ret = 0;
191: struct tagPIOPID *p;
192: #ifdef POPEN_STREAM
193: FILE *f;
194: #else
195: int f;
196: #endif
197:
198: if (!prg)
199: return 0;
200:
201: pthread_mutex_lock(&prg->prog_mtx);
202: for (i = 0; i < array_Size(prg->prog_fds); i++)
203: if (array_Get(prg->prog_fds, i)) {
204: #ifdef POPEN_STREAM
205: f = array(prg->prog_fds, i, FILE*);
206: if (f == h && (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
207: #else
208: f = (int) array(prg->prog_fds, i, intptr_t);
209: if (f == h && (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
210: #endif
211: kill(p->pid, SIGTERM);
212: usleep(1000);
213: if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
214: kill(p->pid, SIGKILL);
215: #ifdef POPEN_STREAM
216: e_pclose(array(prg->prog_fds, i, FILE*));
217: #else
218: e_pclose((int) array(prg->prog_fds, i, intptr_t));
219: #endif
220: array_Del(prg->prog_fds, i, 0);
221: clrbit(prg->prog_used, i);
222: prg->prog_cnum--;
223: ret++;
224: break;
225: }
226: }
227: pthread_mutex_unlock(&prg->prog_mtx);
228:
229: return ret;
230: }
231:
232: /*
233: * io_progOpen2() - Start program from pool on first unused slot
234: *
235: * @prg = program pool
236: * return: -1 error, >-1 reside at slot
237: */
238: int
239: io_progOpen2(prog_t * __restrict prg)
240: {
241: #ifdef POPEN_STREAM
242: FILE *f = NULL;
243: #else
244: int f = -1;
245: #endif
246: int stat, ret = -1;
247: register int i;
248: pid_t pid;
249:
250: if (!prg)
251: return -1;
252: if (prg->prog_cnum + 1 > prg->prog_maxn) {
253: io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
254: return -1;
255: }
256:
257: pthread_mutex_lock(&prg->prog_mtx);
258: for (i = 0; i < array_Size(prg->prog_fds); i++)
259: if (!array_Get(prg->prog_fds, i)) {
260: f = e_popen(prg->prog_name, "r+", &pid);
261: #ifdef POPEN_STREAM
262: if (!f) {
263: #else
264: if (f == -1) {
265: #endif
266: LOGERR;
267: break;
268: } else if (waitpid(pid, &stat, WNOHANG)) {
269: io_SetErr(ECHILD, "Program with pid=%d exit with status %d",
270: pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1);
271: e_pclose(f);
272: break;
273: } else
274: array_Set(prg->prog_fds, i, f);
275: clrbit(prg->prog_used, i);
276: prg->prog_cnum++;
277: ret = i;
278: break;
279: }
280: pthread_mutex_unlock(&prg->prog_mtx);
281:
282: return ret;
283: }
284:
285: /*
286: * io_progOpen() - Execute number of program(s)
287: *
288: * @prg = program pool
289: * @execNum = execute program(s) (0 max)
290: * return: -1 error, >0 executed programs
291: */
292: int
293: io_progOpen(prog_t * __restrict prg, u_int execNum)
294: {
295: #ifdef POPEN_STREAM
296: FILE *f;
297: #else
298: int f;
299: #endif
300: int stat, ret = 0;
301: register int i;
302: pid_t pid;
303:
304: if (!prg)
305: return -1;
306: if (prg->prog_cnum + execNum > prg->prog_maxn) {
307: io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
308: return -1;
309: }
310:
311: pthread_mutex_lock(&prg->prog_mtx);
312: for (i = 0; (execNum ? ret < execNum : 42) && i < array_Size(prg->prog_fds); i++)
313: if (!array_Get(prg->prog_fds, i)) {
314: f = e_popen(prg->prog_name, "r+", &pid);
315: #ifdef POPEN_STREAM
316: if (!f) {
317: #else
318: if (f == -1) {
319: #endif
320: LOGERR;
321: ret = -1;
322: break;
323: } else if (waitpid(pid, &stat, WNOHANG)) {
324: io_SetErr(ECHILD, "Program with pid=%d exit with status %d",
325: pid, WIFEXITED(stat) ? WEXITSTATUS(stat) : -1);
326: e_pclose(f);
327: ret = -1;
328: break;
329: } else
330: array_Set(prg->prog_fds, i, f);
331: clrbit(prg->prog_used, i);
332: prg->prog_cnum++;
333: ret++;
334: }
335: pthread_mutex_unlock(&prg->prog_mtx);
336:
337: return ret;
338: }
339:
340: /*
341: * io_progGrow() - Execute to number of programs in pool
342: *
343: * @prg = program pool
344: * @toNum = execute to number of programs (0 max)
345: * return: 0 error or nothing to do,
346: * >0 executed programs and abs(<0) executed programs with logged error
347: */
348: int
349: io_progGrow(prog_t * __restrict prg, u_int toNum)
350: {
351: if (!prg)
352: return 0;
353: if (toNum > prg->prog_maxn) {
354: io_SetErr(EINVAL, "Requested number for program execution is over pool's limit");
355: return 0;
356: }
357: if (!toNum)
358: toNum = prg->prog_maxn;
359: if (toNum < prg->prog_inin)
360: toNum = prg->prog_inin;
361:
362: if ((int) (toNum - prg->prog_cnum) < 1)
363: return 0;
364:
365: return io_progOpen(prg, toNum - prg->prog_cnum);
366: }
367:
368: /*
369: * io_progVacuum() - Vacuum pool to running number of programs
370: *
371: * @prg = program pool
372: * @toNum = vacuum to number of programs (0 to init number)
373: * return: 0 error or >0 closed programs
374: */
375: int
376: io_progVacuum(prog_t * __restrict prg, u_int toNum)
377: {
378: register int i;
379: int ret = 0;
380: struct tagPIOPID *p;
381:
382: if (!prg)
383: return 0;
384: if (toNum > prg->prog_maxn) {
385: io_SetErr(EINVAL, "Requested number for close program is over pool's limit");
386: return 0;
387: }
388: if (!toNum)
389: toNum = prg->prog_inin;
390:
391: pthread_mutex_lock(&prg->prog_mtx);
392: for (i = array_Size(prg->prog_fds) - 1; prg->prog_cnum > toNum && i > -1; i--)
393: if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i) &&
394: #ifdef POPEN_STREAM
395: (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
396: kill(p->pid, SIGTERM);
397: usleep(1000);
398: if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
399: kill(p->pid, SIGKILL);
400: e_pclose(array(prg->prog_fds, i, FILE*));
401: #else
402: (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
403: kill(p->pid, SIGTERM);
404: usleep(1000);
405: if (waitpid(p->pid, &p->stat, WNOHANG) > 0)
406: kill(p->pid, SIGKILL);
407: e_pclose((int) array(prg->prog_fds, i, intptr_t));
408: #endif
409: array_Del(prg->prog_fds, i, 0);
410: prg->prog_cnum--;
411: ret++;
412: }
413: pthread_mutex_unlock(&prg->prog_mtx);
414:
415: return ret;
416: }
417:
418: /*
419: * io_progCheck() - Check exit status of program pool
420: *
421: * @prg = program pool
422: * @re = resurrect program to init number
423: * return: -1 error or >-1 exited programs
424: */
425: int
426: io_progCheck(prog_t * __restrict prg, int re)
427: {
428: int ret = 0;
429: struct tagPIOPID *p;
430: register int i;
431:
432: if (!prg)
433: return -1;
434:
435: pthread_mutex_lock(&prg->prog_mtx);
436: for (i = 0; i < array_Size(prg->prog_fds); i++)
437: if (array_Get(prg->prog_fds, i) &&
438: #ifdef POPEN_STREAM
439: (p = pio_pgetpid(array(prg->prog_fds, i, FILE*)))) {
440: #else
441: (p = pio_pgetpid((int) array(prg->prog_fds, i, intptr_t)))) {
442: #endif
443: if (waitpid(p->pid, &p->stat, WNOHANG)) {
444: clrbit(prg->prog_used, i);
445: #ifdef POPEN_STREAM
446: e_pclose(array(prg->prog_fds, i, FILE*));
447: #else
448: e_pclose((int) array(prg->prog_fds, i, intptr_t));
449: #endif
450: array_Del(prg->prog_fds, i, 0);
451: prg->prog_cnum--;
452: ret++;
453: }
454: }
455: pthread_mutex_unlock(&prg->prog_mtx);
456:
457: /* resurrect to init number */
458: if (re && ((int) (prg->prog_inin - prg->prog_cnum) > 0))
459: io_progOpen(prg, prg->prog_inin - prg->prog_cnum);
460:
461: return ret;
462: }
463:
464: /*
465: * io_progAttach() - Attach to open program
466: *
467: * @prg = program pool
468: * @newOne = Execute new one program after attach
469: * return: NULL error or !=NULL attached program handle
470: */
471: #ifdef POPEN_STREAM
472: FILE *
473: #else
474: int
475: #endif
476: io_progAttach(prog_t * __restrict prg, int newOne)
477: {
478: #ifdef POPEN_STREAM
479: FILE *f = NULL;
480: #else
481: int f = -1;
482: #endif
483: register int i;
484:
485: if (!prg)
486: #ifdef POPEN_STREAM
487: return NULL;
488: #else
489: return -1;
490: #endif
491:
492: pthread_mutex_lock(&prg->prog_mtx);
493: for (i = 0; i < array_Size(prg->prog_fds); i++)
494: if (array_Get(prg->prog_fds, i) && isclr(prg->prog_used, i)) {
495: setbit(prg->prog_used, i);
496: #ifdef POPEN_STREAM
497: f = array(prg->prog_fds, i, FILE*);
498: #else
499: f = array(prg->prog_fds, i, intptr_t);
500: #endif
501: break;
502: }
503: pthread_mutex_unlock(&prg->prog_mtx);
504:
505: /* execute new one program */
506: if (newOne) {
507: if (f)
508: io_progOpen(prg, 1);
509: else if ((i = io_progOpen2(prg)) > -1)
510: /* not found free program */
511: #ifdef POPEN_STREAM
512: f = array(prg->prog_fds, i, FILE*);
513: #else
514: f = array(prg->prog_fds, i, intptr_t);
515: #endif
516: }
517:
518: return f;
519: }
520:
521: /*
522: * io_progDetach() - Detch from open program
523: *
524: * @prg= program pool
525: * @pfd = attached program handle
526: * return: none
527: */
528: void
529: #ifdef POPEN_STREAM
530: io_progDetach(prog_t * __restrict prg, FILE *pfd)
531: #else
532: io_progDetach(prog_t * __restrict prg, int pfd)
533: #endif
534: {
535: register int i;
536:
537: if (!prg || !pfd)
538: return;
539:
540: pthread_mutex_lock(&prg->prog_mtx);
541: for (i = 0; i < array_Size(prg->prog_fds); i++)
542: #ifdef POPEN_STREAM
543: if (array(prg->prog_fds, i, FILE*) == pfd) {
544: #else
545: if (array(prg->prog_fds, i, intptr_t) == pfd) {
546: #endif
547: clrbit(prg->prog_used, i);
548: break;
549: }
550: pthread_mutex_unlock(&prg->prog_mtx);
551: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>