1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: srv.c,v 1.23 2014/01/28 14:05:43 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004 - 2014
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /* SOCK_STREAM */
50: static void *acceptClients(sched_task_t *);
51: static void *closeClient(sched_task_t *);
52: static void *rxPacket(sched_task_t *);
53: static void *txPacket(sched_task_t *);
54:
55: /* SOCK_DGRAM */
56: static void *freeClient(sched_task_t *);
57: static void *rxUDPPacket(sched_task_t *);
58: static void *txUDPPacket(sched_task_t *);
59:
60: /* SOCK_RAW */
61:
62: static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {
63: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
64: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
65: { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket }, /* SOCK_DGRAM */
66: { NULL, NULL, NULL, NULL } /* SOCK_RAW */
67: };
68:
69: /* Global Signal Argument when kqueue support disabled */
70:
71: static volatile uintptr_t _glSigArg = 0;
72:
73:
74: void
75: rpc_freeCli(rpc_cli_t * __restrict c)
76: {
77: rpc_srv_t *s = c->cli_parent;
78:
79: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
80:
81: /* free buffer */
82: AIT_FREE_VAL(&c->cli_buf);
83:
84: array_Del(s->srv_clients, c->cli_id, 0);
85: if (c)
86: e_free(c);
87: }
88:
89:
90: static inline int
91: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
92: {
93: rpc_cli_t *c = NULL;
94: register int i;
95:
96: /* check free slots for connect */
97: for (i = 0; i < array_Size(srv->srv_clients) &&
98: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
99: /* check for duplicates */
100: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
101: break;
102: if (i >= array_Size(srv->srv_clients))
103: return -1; /* no more free slots! */
104:
105: return i;
106: }
107:
108: static rpc_cli_t *
109: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
110: {
111: rpc_cli_t *c = NULL;
112: int n;
113:
114: n = _check4freeslot(srv, sa);
115: if (n == -1)
116: return NULL;
117: else
118: c = array(srv->srv_clients, n, rpc_cli_t*);
119:
120: if (!c) {
121: c = e_malloc(sizeof(rpc_cli_t));
122: if (!c) {
123: LOGERR;
124: srv->srv_kill = 1;
125: return NULL;
126: } else {
127: memset(c, 0, sizeof(rpc_cli_t));
128: array_Set(srv->srv_clients, n, c);
129: c->cli_id = n;
130: c->cli_parent = srv;
131: }
132:
133: /* alloc empty buffer */
134: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
135: }
136:
137: return c;
138: }
139:
140:
141: static void *
142: freeClient(sched_task_t *task)
143: {
144: rpc_freeCli(TASK_ARG(task));
145:
146: return NULL;
147: }
148:
149: static void *
150: closeClient(sched_task_t *task)
151: {
152: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
153:
154: rpc_freeCli(TASK_ARG(task));
155:
156: /* close client socket */
157: shutdown(sock, SHUT_RDWR);
158: close(sock);
159: return NULL;
160: }
161:
162: static void *
163: txPacket(sched_task_t *task)
164: {
165: rpc_cli_t *c = TASK_ARG(task);
166: rpc_srv_t *s = c->cli_parent;
167: rpc_func_t *f = NULL;
168: u_char *buf = AIT_GET_BUF(&c->cli_buf);
169: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
170: int ret, estlen, wlen = sizeof(struct tagRPCCall);
171: struct pollfd pfd;
172: #ifdef TCP_SESSION_TIMEOUT
173: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
174:
175: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
176: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
177: TASK_ARG(task), ts, TASK_ARG(task), 0);
178: #endif
179:
180: if (rpc->call_argc) {
181: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
182: if (!f) {
183: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
184:
185: rpc->call_argc ^= rpc->call_argc;
186: rpc->call_rep.ret = RPC_ERROR(-1);
187: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
188: } else {
189: /* calc estimated length */
190: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
191: if (estlen > AIT_LEN(&c->cli_buf))
192: AIT_RE_BUF(&c->cli_buf, estlen);
193: buf = AIT_GET_BUF(&c->cli_buf);
194: rpc = (struct tagRPCCall*) buf;
195:
196: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
197: /* Go Encapsulate variables */
198: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
199: RPC_RETVARS(c));
200: /* Free return values */
201: ait_freeVars(&c->cli_vars);
202: if (ret == -1) {
203: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
204:
205: rpc->call_argc ^= rpc->call_argc;
206: rpc->call_rep.ret = RPC_ERROR(-1);
207: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
208: } else
209: wlen += ret;
210: }
211: }
212:
213: rpc->call_len = htonl(wlen);
214:
215: #if 0
216: /* calculate CRC */
217: rpc->call_crc ^= rpc->call_crc;
218: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
219: #endif
220:
221: /* send reply */
222: pfd.fd = TASK_FD(task);
223: pfd.events = POLLOUT;
224: for (; wlen > 0; wlen -= ret, buf += ret) {
225: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
226: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
227: if (ret)
228: LOGERR;
229: else
230: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
231: /* close connection */
232: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
233: TASK_ARG(task), 0, NULL, 0);
234: return NULL;
235: }
236: ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
237: if (ret == -1) {
238: /* close connection */
239: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
240: TASK_ARG(task), 0, NULL, 0);
241: return NULL;
242: }
243: }
244:
245: return NULL;
246: }
247:
248: static void *
249: execCall(sched_task_t *task)
250: {
251: rpc_cli_t *c = TASK_ARG(task);
252: rpc_srv_t *s = c->cli_parent;
253: rpc_func_t *f = NULL;
254: array_t *arr = NULL;
255: u_char *buf = AIT_GET_BUF(&c->cli_buf);
256: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
257: int argc = ntohs(rpc->call_argc);
258:
259: /* Go decapsulate variables ... */
260: if (argc) {
261: arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall),
262: AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
263: if (!arr) {
264: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
265:
266: rpc->call_argc ^= rpc->call_argc;
267: rpc->call_rep.ret = RPC_ERROR(-1);
268: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
269: return NULL;
270: }
271: } else
272: arr = NULL;
273:
274: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
275: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
276:
277: rpc->call_argc ^= rpc->call_argc;
278: rpc->call_rep.ret = RPC_ERROR(-1);
279: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
280: } else {
281: /* if client doesn't want reply */
282: argc = RPC_CHK_NOREPLY(rpc);
283: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
284: if (rpc->call_rep.ret == htonl(-1)) {
285: if (!rpc->call_rep.eno) {
286: LOGERR;
287: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
288: }
289: rpc->call_argc ^= rpc->call_argc;
290: } else {
291: rpc->call_rep.eno ^= rpc->call_rep.eno;
292: if (argc) {
293: /* without reply */
294: ait_freeVars(&c->cli_vars);
295: rpc->call_argc ^= rpc->call_argc;
296: } else {
297: /* reply */
298: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
299: }
300: }
301: }
302:
303: array_Destroy(&arr);
304: return NULL;
305: }
306:
307: static void *
308: rxPacket(sched_task_t *task)
309: {
310: rpc_cli_t *c = TASK_ARG(task);
311: rpc_srv_t *s = c->cli_parent;
312: int len, rlen, noreply, estlen;
313: #if 0
314: u_short crc;
315: #endif
316: u_char *buf = AIT_GET_BUF(&c->cli_buf);
317: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
318: struct pollfd pfd;
319: #ifdef TCP_SESSION_TIMEOUT
320: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
321:
322: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
323: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
324: TASK_ARG(task), ts, TASK_ARG(task), 0);
325: #endif
326:
327: memset(buf, 0, sizeof(struct tagRPCCall));
328: rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
329: if (rlen < sizeof(struct tagRPCCall)) {
330: /* close connection */
331: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
332: TASK_ARG(task), 0, NULL, 0);
333: return NULL;
334: } else {
335: estlen = ntohl(rpc->call_len);
336: if (estlen > AIT_LEN(&c->cli_buf))
337: AIT_RE_BUF(&c->cli_buf, estlen);
338: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
339: buf = AIT_GET_BUF(&c->cli_buf);
340: len = estlen;
341: }
342:
343: /* get next part of packet */
344: memset(buf, 0, len);
345: pfd.fd = TASK_FD(task);
346: pfd.events = POLLIN | POLLPRI;
347: for (; len > 0; len -= rlen, buf += rlen) {
348: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
349: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
350: if (rlen)
351: LOGERR;
352: else
353: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
354: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
355: TASK_ARG(task), 0, NULL, 0);
356: return NULL;
357: }
358: rlen = recv(TASK_FD(task), buf, len, 0);
359: if (rlen == -1) {
360: /* close connection */
361: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
362: TASK_ARG(task), 0, NULL, 0);
363: return NULL;
364: }
365: }
366: len = estlen;
367:
368: #if 0
369: /* check integrity of packet */
370: crc = ntohs(rpc->call_crc);
371: rpc->call_crc ^= rpc->call_crc;
372: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
373: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
374: return NULL;
375: }
376: #endif
377:
378: noreply = RPC_CHK_NOREPLY(rpc);
379:
380: /* check RPC packet session info */
381: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
382: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
383:
384: rpc->call_argc ^= rpc->call_argc;
385: rpc->call_rep.ret = RPC_ERROR(-1);
386: rpc->call_rep.eno = RPC_ERROR(errno);
387: } else {
388: /* execute RPC call */
389: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
390: }
391:
392: /* send RPC reply */
393: if (!noreply)
394: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
395: TASK_ARG(task), TASK_FD(task), rpc, len);
396:
397: /* lets get next packet */
398: schedReadSelf(task);
399: return NULL;
400: }
401:
402: static void *
403: acceptClients(sched_task_t *task)
404: {
405: rpc_srv_t *srv = TASK_ARG(task);
406: rpc_cli_t *c = NULL;
407: socklen_t salen = sizeof(sockaddr_t);
408: int sock;
409: #ifdef TCP_SESSION_TIMEOUT
410: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
411: #endif
412:
413: c = _allocClient(srv, NULL);
414: if (!c) {
415: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
416: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
417: shutdown(sock, SHUT_RDWR);
418: close(sock);
419: }
420: goto end;
421: }
422:
423: /* accept client */
424: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
425: if (c->cli_sock == -1) {
426: LOGERR;
427: AIT_FREE_VAL(&c->cli_buf);
428: array_Del(srv->srv_clients, c->cli_id, 42);
429: goto end;
430: } else
431: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
432:
433: #ifdef TCP_SESSION_TIMEOUT
434: /* armed timer for close stateless connection */
435: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
436: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c,
437: ts, c, 0);
438: #endif
439: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
440: c->cli_sock, NULL, 0);
441: end:
442: schedReadSelf(task);
443: return NULL;
444: }
445:
446:
447: static void *
448: txUDPPacket(sched_task_t *task)
449: {
450: rpc_cli_t *c = TASK_ARG(task);
451: rpc_srv_t *s = c->cli_parent;
452: rpc_func_t *f = NULL;
453: u_char *buf = AIT_GET_BUF(&c->cli_buf);
454: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
455: int ret, estlen, wlen = sizeof(struct tagRPCCall);
456: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
457: struct pollfd pfd;
458:
459: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
460: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
461: TASK_ARG(task), ts, TASK_ARG(task), 0);
462:
463: if (rpc->call_argc) {
464: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
465: if (!f) {
466: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
467: rpc->call_argc ^= rpc->call_argc;
468: rpc->call_rep.ret = RPC_ERROR(-1);
469: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
470: } else {
471: /* calc estimated length */
472: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
473: if (estlen > AIT_LEN(&c->cli_buf))
474: AIT_RE_BUF(&c->cli_buf, estlen);
475: buf = AIT_GET_BUF(&c->cli_buf);
476: rpc = (struct tagRPCCall*) buf;
477:
478: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
479: /* Go Encapsulate variables */
480: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
481: RPC_RETVARS(c));
482: /* Free return values */
483: ait_freeVars(&c->cli_vars);
484: if (ret == -1) {
485: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
486: rpc->call_argc ^= rpc->call_argc;
487: rpc->call_rep.ret = RPC_ERROR(-1);
488: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
489: } else
490: wlen += ret;
491: }
492: }
493:
494: rpc->call_len = htonl(wlen);
495:
496: /* calculate CRC */
497: rpc->call_crc ^= rpc->call_crc;
498: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
499:
500: /* send reply */
501: pfd.fd = TASK_FD(task);
502: pfd.events = POLLOUT;
503: for (; wlen > 0; wlen -= ret, buf += ret) {
504: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
505: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
506: if (ret)
507: LOGERR;
508: else
509: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
510: /* close connection */
511: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
512: TASK_ARG(task), 0, NULL, 0);
513: return NULL;
514: }
515: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
516: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
517: if (ret == -1) {
518: /* close connection */
519: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
520: TASK_ARG(task), 0, NULL, 0);
521: return NULL;
522: }
523: }
524:
525: return NULL;
526: }
527:
528: static void *
529: rxUDPPacket(sched_task_t *task)
530: {
531: rpc_srv_t *srv = TASK_ARG(task);
532: rpc_cli_t *c = NULL;
533: int len, rlen, noreply, estlen;
534: u_short crc;
535: u_char *buf, b[sizeof(struct tagRPCCall)];
536: struct tagRPCCall *rpc = (struct tagRPCCall*) b;
537: sockaddr_t sa;
538: socklen_t salen;
539: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
540: struct pollfd pfd;
541:
542: /* receive connect packet */
543: salen = sa.ss.ss_len = sizeof(sockaddr_t);
544: rlen = recvfrom(TASK_FD(task), b, sizeof b, MSG_PEEK, &sa.sa, &salen);
545: if (rlen < sizeof(struct tagRPCCall)) {
546: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
547: goto end;
548: }
549:
550: c = _allocClient(srv, &sa);
551: if (!c) {
552: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
553: usleep(2000); /* blocked client delay */
554: goto end;
555: } else {
556: estlen = ntohl(rpc->call_len);
557: if (estlen > AIT_LEN(&c->cli_buf))
558: AIT_RE_BUF(&c->cli_buf, estlen);
559: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
560: buf = AIT_GET_BUF(&c->cli_buf);
561: len = estlen;
562:
563: c->cli_sock = TASK_FD(task);
564: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
565:
566: /* armed timer for close stateless connection */
567: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
568: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
569: c, ts, c, 0);
570: }
571:
572: /* get next part of packet */
573: memset(buf, 0, len);
574: pfd.fd = TASK_FD(task);
575: pfd.events = POLLIN | POLLPRI;
576: for (; len > 0; len -= rlen, buf += rlen) {
577: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
578: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
579: if (rlen)
580: LOGERR;
581: else
582: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
583: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
584: c, 0, NULL, 0);
585: return NULL;
586: }
587: salen = sa.ss.ss_len = sizeof(sockaddr_t);
588: rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);
589: if (rlen == -1) {
590: /* close connection */
591: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
592: c, 0, NULL, 0);
593: return NULL;
594: }
595: if (e_addrcmp(&c->cli_sa, &sa, 42))
596: rlen ^= rlen; /* skip if arrive from different address */
597: }
598: len = estlen;
599:
600: /* check integrity of packet */
601: crc = ntohs(rpc->call_crc);
602: rpc->call_crc ^= rpc->call_crc;
603: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
604: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
605: return NULL;
606: }
607:
608: noreply = RPC_CHK_NOREPLY(rpc);
609:
610: /* check RPC packet session info */
611: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
612: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
613:
614: rpc->call_argc ^= rpc->call_argc;
615: rpc->call_rep.ret = RPC_ERROR(-1);
616: rpc->call_rep.eno = RPC_ERROR(errno);
617: } else {
618: /* execute RPC call */
619: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
620: }
621:
622: /* send RPC reply */
623: if (!noreply)
624: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
625: c, TASK_FD(task), rpc, len);
626: end:
627: schedReadSelf(task);
628: return NULL;
629: }
630:
631: /* ------------------------------------------------------ */
632:
633: void
634: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
635: {
636: rpc_srv_t *s = c->cli_parent;
637:
638: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
639:
640: /* free buffer */
641: AIT_FREE_VAL(&c->cli_buf);
642:
643: array_Del(s->srv_blob.clients, c->cli_id, 0);
644: if (c)
645: e_free(c);
646: }
647:
648:
649: static void *
650: closeBLOBClient(sched_task_t *task)
651: {
652: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
653:
654: rpc_freeBLOBCli(TASK_ARG(task));
655:
656: /* close client socket */
657: shutdown(sock, SHUT_RDWR);
658: close(sock);
659: return NULL;
660: }
661:
662: static void *
663: txBLOB(sched_task_t *task)
664: {
665: rpc_cli_t *c = TASK_ARG(task);
666: u_char *buf = AIT_GET_BUF(&c->cli_buf);
667: int wlen = sizeof(struct tagBLOBHdr);
668:
669: /* send reply */
670: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
671: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
672: /* close blob connection */
673: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
674: }
675:
676: return NULL;
677: }
678:
679: static void *
680: rxBLOB(sched_task_t *task)
681: {
682: rpc_cli_t *c = TASK_ARG(task);
683: rpc_srv_t *s = c->cli_parent;
684: rpc_blob_t *b;
685: struct tagBLOBHdr blob;
686: int rlen;
687:
688: memset(&blob, 0, sizeof blob);
689: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
690: if (rlen < 1) {
691: /* close blob connection */
692: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
693: return NULL;
694: }
695:
696: /* check BLOB packet */
697: if (rlen < sizeof(struct tagBLOBHdr)) {
698: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
699:
700: schedReadSelf(task);
701: return NULL;
702: }
703:
704: /* check RPC packet session info */
705: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
706: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
707: blob.hdr_cmd = error;
708: goto end;
709: }
710:
711: /* Go to proceed packet ... */
712: switch (blob.hdr_cmd) {
713: case get:
714: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
715: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
716: blob.hdr_cmd = no;
717: blob.hdr_ret = RPC_ERROR(-1);
718: break;
719: } else
720: blob.hdr_len = htonl(b->blob_len);
721:
722: if (rpc_srv_blobMap(s, b) != -1) {
723: /* deliver BLOB variable to client */
724: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
725: rpc_srv_blobUnmap(b);
726: } else {
727: blob.hdr_cmd = error;
728: blob.hdr_ret = RPC_ERROR(-1);
729: }
730: break;
731: case set:
732: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
733: ntohl(blob.hdr_ret)))) {
734: /* set new BLOB variable for reply :) */
735: blob.hdr_var = htonl(b->blob_var);
736:
737: /* receive BLOB from client */
738: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
739: rpc_srv_blobUnmap(b);
740: } else {
741: blob.hdr_cmd = error;
742: blob.hdr_ret = RPC_ERROR(-1);
743: }
744: break;
745: case unset:
746: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
747: blob.hdr_cmd = error;
748: blob.hdr_ret = RPC_ERROR(-1);
749: }
750: break;
751: default:
752: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
753: blob.hdr_cmd = error;
754: blob.hdr_ret = RPC_ERROR(-1);
755: }
756:
757: end:
758: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
759: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
760: schedReadSelf(task);
761: return NULL;
762: }
763:
764: static void *
765: flushBLOB(sched_task_t *task)
766: {
767: uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
768: rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
769: rpc_blob_t *b, *tmp;
770:
771: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
772: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
773:
774: rpc_srv_blobFree(srv, b);
775: e_free(b);
776: }
777:
778: if (!schedSignalSelf(task)) {
779: /* disabled kqueue support in libaitsched */
780: struct sigaction sa;
781:
782: memset(&sa, 0, sizeof sa);
783: sigemptyset(&sa.sa_mask);
784: sa.sa_handler = (void (*)(int)) flushBLOB;
785: sa.sa_flags = SA_RESTART | SA_RESETHAND;
786: sigaction(SIGFBLOB, &sa, NULL);
787: }
788:
789: return NULL;
790: }
791:
792: static void *
793: acceptBLOBClients(sched_task_t *task)
794: {
795: rpc_srv_t *srv = TASK_ARG(task);
796: rpc_cli_t *c = NULL;
797: register int i;
798: socklen_t salen = sizeof(sockaddr_t);
799: int sock;
800: #ifdef TCP_NOPUSH
801: int n = 1;
802: #endif
803:
804: /* check free slots for connect */
805: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
806: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
807: if (c) { /* no more free slots! */
808: EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
809: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
810: shutdown(sock, SHUT_RDWR);
811: close(sock);
812: }
813: goto end;
814: }
815:
816: c = e_malloc(sizeof(rpc_cli_t));
817: if (!c) {
818: LOGERR;
819: srv->srv_kill = srv->srv_blob.kill = 1;
820: return NULL;
821: } else {
822: memset(c, 0, sizeof(rpc_cli_t));
823: array_Set(srv->srv_blob.clients, i, c);
824: c->cli_id = i;
825: c->cli_parent = srv;
826: }
827:
828: /* alloc empty buffer */
829: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
830:
831: /* accept client */
832: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
833: if (c->cli_sock == -1) {
834: LOGERR;
835: AIT_FREE_VAL(&c->cli_buf);
836: array_Del(srv->srv_blob.clients, i, 42);
837: goto end;
838: } else {
839: #ifdef TCP_NOPUSH
840: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
841: #endif
842: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
843: }
844:
845: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
846: end:
847: schedReadSelf(task);
848: return NULL;
849: }
850:
851: /* ------------------------------------------------------ */
852:
853: /*
854: * rpc_srv_initBLOBServer() - Init & create BLOB Server
855: *
856: * @srv = RPC server instance
857: * @Port = Port for bind server, if Port == 0 default port is selected
858: * @diskDir = Disk place for BLOB file objects
859: * return: -1 == error or 0 bind and created BLOB server instance
860: */
861: int
862: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
863: {
864: int n = 1;
865:
866: if (!srv || srv->srv_kill) {
867: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
868: return -1;
869: }
870:
871: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
872: if (access(diskDir, R_OK | W_OK) == -1) {
873: LOGERR;
874: return -1;
875: } else
876: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
877:
878: /* init blob list */
879: TAILQ_INIT(&srv->srv_blob.blobs);
880:
881: srv->srv_blob.server.cli_parent = srv;
882:
883: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
884: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
885: case AF_INET:
886: srv->srv_blob.server.cli_sa.sin.sin_port =
887: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
888: break;
889: case AF_INET6:
890: srv->srv_blob.server.cli_sa.sin6.sin6_port =
891: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
892: break;
893: case AF_LOCAL:
894: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
895: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
896: break;
897: default:
898: AIT_FREE_VAL(&srv->srv_blob.dir);
899: return -1;
900: }
901:
902: /* create BLOB server socket */
903: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
904: if (srv->srv_blob.server.cli_sock == -1) {
905: LOGERR;
906: AIT_FREE_VAL(&srv->srv_blob.dir);
907: return -1;
908: }
909: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
910: LOGERR;
911: close(srv->srv_blob.server.cli_sock);
912: AIT_FREE_VAL(&srv->srv_blob.dir);
913: return -1;
914: }
915: n = srv->srv_netbuf;
916: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
917: LOGERR;
918: close(srv->srv_blob.server.cli_sock);
919: AIT_FREE_VAL(&srv->srv_blob.dir);
920: return -1;
921: }
922: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
923: LOGERR;
924: close(srv->srv_blob.server.cli_sock);
925: AIT_FREE_VAL(&srv->srv_blob.dir);
926: return -1;
927: }
928: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
929: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
930: LOGERR;
931: close(srv->srv_blob.server.cli_sock);
932: AIT_FREE_VAL(&srv->srv_blob.dir);
933: return -1;
934: } else
935: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
936: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
937:
938:
939: /* allocate pool for concurent blob clients */
940: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
941: if (!srv->srv_blob.clients) {
942: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
943: close(srv->srv_blob.server.cli_sock);
944: AIT_FREE_VAL(&srv->srv_blob.dir);
945: return -1;
946: }
947:
948: /* init blob scheduler */
949: srv->srv_blob.root = schedBegin();
950: if (!srv->srv_blob.root) {
951: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
952: array_Destroy(&srv->srv_blob.clients);
953: close(srv->srv_blob.server.cli_sock);
954: AIT_FREE_VAL(&srv->srv_blob.dir);
955: return -1;
956: }
957:
958: return 0;
959: }
960:
961: /*
962: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
963: *
964: * @srv = RPC Server instance
965: * return: none
966: */
967: void
968: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
969: {
970: if (!srv)
971: return;
972:
973: srv->srv_blob.kill = 1;
974:
975: schedEnd(&srv->srv_blob.root);
976: }
977:
978: /*
979: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
980: *
981: * @srv = RPC Server instance
982: * return: -1 error or 0 ok, infinite loop ...
983: */
984: int
985: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
986: {
987: rpc_cli_t *c;
988: register int i;
989: rpc_blob_t *b, *tmp;
990: struct timespec ts = { RPC_SCHED_POLLING, 0 };
991:
992: if (!srv || srv->srv_kill) {
993: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
994: return -1;
995: }
996:
997: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
998: LOGERR;
999: return -1;
1000: }
1001:
1002: if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
1003: /* disabled kqueue support in libaitsched */
1004: struct sigaction sa;
1005:
1006: atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
1007:
1008: memset(&sa, 0, sizeof sa);
1009: sigemptyset(&sa.sa_mask);
1010: sa.sa_handler = (void (*)(int)) flushBLOB;
1011: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1012: sigaction(SIGFBLOB, &sa, NULL);
1013: }
1014:
1015: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
1016: srv->srv_blob.server.cli_sock, NULL, 0)) {
1017: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1018: return -1;
1019: }
1020:
1021: schedPolling(srv->srv_blob.root, &ts, NULL);
1022: /* main rpc loop */
1023: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1024:
1025: /* detach blobs */
1026: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1027: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1028:
1029: rpc_srv_blobFree(srv, b);
1030: e_free(b);
1031: }
1032:
1033: /* close all clients connections & server socket */
1034: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
1035: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1036: if (c) {
1037: shutdown(c->cli_sock, SHUT_RDWR);
1038: close(c->cli_sock);
1039:
1040: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1041: AIT_FREE_VAL(&c->cli_buf);
1042: }
1043: array_Del(srv->srv_blob.clients, i, 42);
1044: }
1045: array_Destroy(&srv->srv_blob.clients);
1046:
1047: close(srv->srv_blob.server.cli_sock);
1048:
1049: AIT_FREE_VAL(&srv->srv_blob.dir);
1050: return 0;
1051: }
1052:
1053:
1054: /*
1055: * rpc_srv_initServer() - Init & create RPC Server
1056: *
1057: * @InstID = Instance for authentication & recognition
1058: * @concurentClients = Concurent clients at same time to this server
1059: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1060: * @csHost = Host name or address for bind server, if NULL any address
1061: * @Port = Port for bind server, if Port == 0 default port is selected
1062: * @proto = Protocol, if == 0 choose SOCK_STREAM
1063: * return: NULL == error or !=NULL bind and created RPC server instance
1064: */
1065: rpc_srv_t *
1066: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
1067: const char *csHost, u_short Port, int proto)
1068: {
1069: int n = 1;
1070: rpc_srv_t *srv = NULL;
1071: sockaddr_t sa = E_SOCKADDR_INIT;
1072:
1073: if (!concurentClients || (proto < 0 || proto > SOCK_DGRAM)) {
1074: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1075: return NULL;
1076: }
1077: if (!e_gethostbyname(csHost, Port, &sa))
1078: return NULL;
1079: if (!Port)
1080: Port = RPC_DEFPORT;
1081: if (!proto)
1082: proto = SOCK_STREAM;
1083: if (netBuf < RPC_MIN_BUFSIZ)
1084: netBuf = BUFSIZ;
1085: else
1086: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1087:
1088: #ifdef HAVE_SRANDOMDEV
1089: srandomdev();
1090: #else
1091: time_t tim;
1092:
1093: srandom((time(&tim) ^ getpid()));
1094: #endif
1095:
1096: srv = e_malloc(sizeof(rpc_srv_t));
1097: if (!srv) {
1098: LOGERR;
1099: return NULL;
1100: } else
1101: memset(srv, 0, sizeof(rpc_srv_t));
1102:
1103: srv->srv_proto = proto;
1104: srv->srv_netbuf = netBuf;
1105: srv->srv_session.sess_version = RPC_VERSION;
1106: srv->srv_session.sess_instance = InstID;
1107:
1108: srv->srv_server.cli_parent = srv;
1109: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1110:
1111: /* init functions */
1112: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1113: SLIST_INIT(&srv->srv_funcs);
1114: AVL_INIT(&srv->srv_funcs);
1115:
1116: /* init scheduler */
1117: srv->srv_root = schedBegin();
1118: if (!srv->srv_root) {
1119: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1120: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1121: e_free(srv);
1122: return NULL;
1123: }
1124:
1125: /* init pool for clients */
1126: srv->srv_clients = array_Init(concurentClients);
1127: if (!srv->srv_clients) {
1128: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1129: schedEnd(&srv->srv_root);
1130: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1131: e_free(srv);
1132: return NULL;
1133: }
1134:
1135: /* create server socket */
1136: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
1137: if (srv->srv_server.cli_sock == -1) {
1138: LOGERR;
1139: array_Destroy(&srv->srv_clients);
1140: schedEnd(&srv->srv_root);
1141: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1142: e_free(srv);
1143: return NULL;
1144: }
1145: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1146: LOGERR;
1147: goto err;
1148: }
1149: n = srv->srv_netbuf;
1150: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1151: LOGERR;
1152: goto err;
1153: }
1154: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1155: LOGERR;
1156: goto err;
1157: }
1158: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
1159: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1160: LOGERR;
1161: goto err;
1162: } else
1163: fcntl(srv->srv_server.cli_sock, F_SETFL,
1164: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1165:
1166: rpc_register_srvPing(srv);
1167:
1168: return srv;
1169: err: /* error condition */
1170: close(srv->srv_server.cli_sock);
1171: array_Destroy(&srv->srv_clients);
1172: schedEnd(&srv->srv_root);
1173: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1174: e_free(srv);
1175: return NULL;
1176: }
1177:
1178: /*
1179: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1180: *
1181: * @psrv = RPC Server instance
1182: * return: none
1183: */
1184: void
1185: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1186: {
1187: if (!psrv || !*psrv)
1188: return;
1189:
1190: /* if send kill to blob server */
1191: rpc_srv_endBLOBServer(*psrv);
1192:
1193: (*psrv)->srv_kill = 1;
1194: sleep(RPC_SCHED_POLLING);
1195:
1196: schedEnd(&(*psrv)->srv_root);
1197:
1198: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1199: e_free(*psrv);
1200: *psrv = NULL;
1201: }
1202:
1203: /*
1204: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1205: *
1206: * @srv = RPC Server instance
1207: * return: -1 error or 0 ok, infinite loop ...
1208: */
1209: int
1210: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1211: {
1212: rpc_cli_t *c;
1213: register int i;
1214: rpc_func_t *f;
1215: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1216:
1217: if (!srv) {
1218: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1219: return -1;
1220: }
1221:
1222: if (srv->srv_proto == SOCK_STREAM)
1223: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1224: LOGERR;
1225: return -1;
1226: }
1227:
1228: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1229: srv->srv_server.cli_sock, NULL, 0)) {
1230: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1231: return -1;
1232: }
1233:
1234: schedPolling(srv->srv_root, &ts, NULL);
1235: /* main rpc loop */
1236: schedRun(srv->srv_root, &srv->srv_kill);
1237:
1238: /* close all clients connections & server socket */
1239: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1240: c = array(srv->srv_clients, i, rpc_cli_t*);
1241: if (c) {
1242: shutdown(c->cli_sock, SHUT_RDWR);
1243: close(c->cli_sock);
1244:
1245: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1246: ait_freeVars(&RPC_RETVARS(c));
1247: AIT_FREE_VAL(&c->cli_buf);
1248: }
1249: array_Del(srv->srv_clients, i, 42);
1250: }
1251: array_Destroy(&srv->srv_clients);
1252:
1253: close(srv->srv_server.cli_sock);
1254:
1255: /* detach exported calls */
1256: RPC_FUNCS_LOCK(&srv->srv_funcs);
1257: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1258: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1259:
1260: AIT_FREE_VAL(&f->func_name);
1261: e_free(f);
1262: }
1263: srv->srv_funcs.avlh_root = NULL;
1264: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1265:
1266: return 0;
1267: }
1268:
1269:
1270: /*
1271: * rpc_srv_execCall() Execute registered call from RPC server
1272: *
1273: * @cli = RPC client
1274: * @rpc = IN RPC call structure
1275: * @funcname = Execute RPC function
1276: * @args = IN RPC calling arguments from RPC client
1277: * return: -1 error, !=-1 ok
1278: */
1279: int
1280: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1281: ait_val_t funcname, array_t * __restrict args)
1282: {
1283: rpc_callback_t func;
1284:
1285: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1286: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1287: return -1;
1288: }
1289:
1290: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1291: return func(cli, rpc, args);
1292: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>