1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: srv.c,v 1.26.2.4 2015/06/24 23:02:28 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004 - 2015
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
/*
 * Forward declarations of the per-transport scheduler callbacks that are
 * collected in the cbProto[] dispatch table below.
 */

/* SOCK_STREAM */
static void *acceptClients(sched_task_t *);
static void *closeClient(sched_task_t *);
static void *rxPacket(sched_task_t *);
static void *txPacket(sched_task_t *);

/* SOCK_DGRAM */
static void *freeClient(sched_task_t *);
static void *rxUDPPacket(sched_task_t *);
static void *txUDPPacket(sched_task_t *);

/* SOCK_RAW */
static void *rxRAWPacket(sched_task_t *);
static void *txRAWPacket(sched_task_t *);

/* SOCK_BPF */
static void *rxBPFPacket(sched_task_t *);
static void *txBPFPacket(sched_task_t *);

/* SOCK_EXT */
static void *rxEXTPacket(sched_task_t *);
static void *txEXTPacket(sched_task_t *);

/*
 * Callback dispatch table. The first index is the server protocol
 * (srv_proto); the second index is selected in this file with
 * CB_CLOSECLIENT, CB_RXPACKET and CB_TXPACKET.
 * NOTE(review): judging by the functions stored in each row, the columns are
 * { accept/first read, client teardown, receive, transmit } -- confirm
 * against the CB_* definitions.
 */
static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
{ acceptClients, closeClient, rxPacket, txPacket }, /* row 0: same handlers as SOCK_STREAM; presumably a filler because SOCK_STREAM is not 0 -- TODO confirm */
{ acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
{ rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket }, /* SOCK_DGRAM */
{ rxRAWPacket, freeClient, rxRAWPacket, txRAWPacket }, /* SOCK_RAW */
{ rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket }, /* SOCK_BPF */
{ rxEXTPacket, freeClient, rxEXTPacket, txEXTPacket } /* SOCK_EXT */
};
80:
/*
 * Global signal argument, used when kqueue support is disabled.
 * NOTE(review): nothing in this part of the file reads or writes it;
 * presumably it is consumed by signal handling further down -- confirm.
 */

static volatile uintptr_t _glSigArg = 0;
84:
85:
86: void
87: rpc_freeCli(rpc_cli_t * __restrict c)
88: {
89: rpc_srv_t *s = c->cli_parent;
90:
91: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
92:
93: /* free buffer */
94: AIT_FREE_VAL(&c->cli_buf);
95:
96: array_Del(s->srv_clients, c->cli_id, 0);
97: if (c)
98: e_free(c);
99: }
100:
101:
102: static inline int
103: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
104: {
105: rpc_cli_t *c = NULL;
106: register int i;
107:
108: /* check free slots for connect */
109: for (i = 0; i < array_Size(srv->srv_clients) &&
110: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
111: /* check for duplicates */
112: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
113: break;
114: if (i >= array_Size(srv->srv_clients))
115: return -1; /* no more free slots! */
116:
117: return i;
118: }
119:
120: static rpc_cli_t *
121: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
122: {
123: rpc_cli_t *c = NULL;
124: int n;
125:
126: if (srv->srv_proto != SOCK_EXT)
127: n = _check4freeslot(srv, sa);
128: else
129: n = 0;
130: if (n == -1)
131: return NULL;
132: else
133: c = array(srv->srv_clients, n, rpc_cli_t*);
134:
135: if (!c) {
136: c = e_malloc(sizeof(rpc_cli_t));
137: if (!c) {
138: LOGERR;
139: srv->srv_kill = 1;
140: return NULL;
141: } else {
142: memset(c, 0, sizeof(rpc_cli_t));
143: array_Set(srv->srv_clients, n, c);
144: c->cli_id = n;
145: c->cli_parent = srv;
146: }
147:
148: /* alloc empty buffer */
149: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
150: }
151:
152: return c;
153: }
154:
155:
156: static void *
157: freeClient(sched_task_t *task)
158: {
159: rpc_freeCli(TASK_ARG(task));
160:
161: return NULL;
162: }
163:
164: static void *
165: closeClient(sched_task_t *task)
166: {
167: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
168:
169: rpc_freeCli(TASK_ARG(task));
170:
171: /* close client socket */
172: shutdown(sock, SHUT_RDWR);
173: close(sock);
174: return NULL;
175: }
176:
177: static void *
178: txPacket(sched_task_t *task)
179: {
180: rpc_cli_t *c = TASK_ARG(task);
181: rpc_srv_t *s = c->cli_parent;
182: rpc_func_t *f = NULL;
183: u_char *buf = AIT_GET_BUF(&c->cli_buf);
184: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
185: int ret, estlen, wlen = sizeof(struct tagRPCCall);
186: struct pollfd pfd;
187: #ifdef TCP_SESSION_TIMEOUT
188: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
189:
190: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
191: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
192: TASK_ARG(task), ts, TASK_ARG(task), 0);
193: #endif
194:
195: if (rpc->call_argc) {
196: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
197: if (!f) {
198: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
199:
200: rpc->call_argc ^= rpc->call_argc;
201: rpc->call_rep.ret = RPC_ERROR(-1);
202: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
203: } else {
204: /* calc estimated length */
205: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
206: if (estlen > AIT_LEN(&c->cli_buf))
207: AIT_RE_BUF(&c->cli_buf, estlen);
208: buf = AIT_GET_BUF(&c->cli_buf);
209: rpc = (struct tagRPCCall*) buf;
210:
211: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
212: /* Go Encapsulate variables */
213: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
214: RPC_RETVARS(c));
215: /* Free return values */
216: ait_freeVars(&c->cli_vars);
217: if (ret == -1) {
218: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
219:
220: rpc->call_argc ^= rpc->call_argc;
221: rpc->call_rep.ret = RPC_ERROR(-1);
222: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
223: } else
224: wlen += ret;
225: }
226: }
227:
228: rpc->call_len = htonl(wlen);
229: rpc->call_io = RPC_ACK;
230:
231: #if 0
232: /* calculate CRC */
233: rpc->call_crc ^= rpc->call_crc;
234: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
235: #endif
236:
237: /* send reply */
238: pfd.fd = TASK_FD(task);
239: pfd.events = POLLOUT;
240: for (; wlen > 0; wlen -= ret, buf += ret) {
241: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
242: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
243: if (ret)
244: LOGERR;
245: else
246: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
247: /* close connection */
248: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
249: TASK_ARG(task), 0, NULL, 0);
250: return NULL;
251: }
252: ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
253: if (ret == -1) {
254: /* close connection */
255: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
256: TASK_ARG(task), 0, NULL, 0);
257: return NULL;
258: }
259: }
260:
261: return NULL;
262: }
263:
264: static void *
265: execCall(sched_task_t *task)
266: {
267: rpc_cli_t *c = TASK_ARG(task);
268: rpc_srv_t *s = c->cli_parent;
269: rpc_func_t *f = NULL;
270: array_t *arr = NULL;
271: u_char *buf = AIT_GET_BUF(&c->cli_buf);
272: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
273: int argc = rpc->call_argc;
274:
275: /* Go decapsulate variables ... */
276: if (argc) {
277: arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall),
278: AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
279: if (!arr) {
280: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
281:
282: rpc->call_argc ^= rpc->call_argc;
283: rpc->call_rep.ret = RPC_ERROR(-1);
284: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
285: return NULL;
286: }
287: } else
288: arr = NULL;
289:
290: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
291: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
292:
293: rpc->call_argc ^= rpc->call_argc;
294: rpc->call_rep.ret = RPC_ERROR(-1);
295: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
296: } else {
297: /* if client doesn't want reply */
298: argc = RPC_CHK_NOREPLY(rpc);
299: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
300: if (rpc->call_rep.ret == htonl(-1)) {
301: if (!rpc->call_rep.eno) {
302: LOGERR;
303: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
304: }
305: rpc->call_argc ^= rpc->call_argc;
306: } else {
307: rpc->call_rep.eno ^= rpc->call_rep.eno;
308: if (argc) {
309: /* without reply */
310: ait_freeVars(&c->cli_vars);
311: rpc->call_argc ^= rpc->call_argc;
312: } else {
313: /* reply */
314: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
315: }
316: }
317: }
318:
319: array_Destroy(&arr);
320: return NULL;
321: }
322:
323: static void *
324: rxPacket(sched_task_t *task)
325: {
326: rpc_cli_t *c = TASK_ARG(task);
327: rpc_srv_t *s = c->cli_parent;
328: int len, rlen, noreply, estlen;
329: #if 0
330: u_short crc;
331: #endif
332: u_char *buf = AIT_GET_BUF(&c->cli_buf);
333: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
334: struct pollfd pfd;
335: #ifdef TCP_SESSION_TIMEOUT
336: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
337:
338: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
339: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
340: TASK_ARG(task), ts, TASK_ARG(task), 0);
341: #endif
342:
343: memset(buf, 0, sizeof(struct tagRPCCall));
344: rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
345: if (rlen < sizeof(struct tagRPCCall)) {
346: /* close connection */
347: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
348: TASK_ARG(task), 0, NULL, 0);
349: return NULL;
350: } else {
351: estlen = ntohl(rpc->call_len);
352: if (estlen > AIT_LEN(&c->cli_buf))
353: AIT_RE_BUF(&c->cli_buf, estlen);
354: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
355: buf = AIT_GET_BUF(&c->cli_buf);
356: len = estlen;
357: }
358:
359: /* get next part of packet */
360: memset(buf, 0, len);
361: pfd.fd = TASK_FD(task);
362: pfd.events = POLLIN | POLLPRI;
363: for (; len > 0; len -= rlen, buf += rlen) {
364: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
365: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
366: if (rlen)
367: LOGERR;
368: else
369: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
370: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
371: TASK_ARG(task), 0, NULL, 0);
372: return NULL;
373: }
374: rlen = recv(TASK_FD(task), buf, len, 0);
375: if (rlen == -1) {
376: /* close connection */
377: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
378: TASK_ARG(task), 0, NULL, 0);
379: return NULL;
380: }
381: }
382: len = estlen;
383:
384: /* skip loop packet */
385: if (rpc->call_io & RPC_ACK) {
386: schedReadSelf(task);
387: return NULL;
388: }
389:
390: #if 0
391: /* check integrity of packet */
392: crc = ntohs(rpc->call_crc);
393: rpc->call_crc ^= rpc->call_crc;
394: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
395: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
396: return NULL;
397: }
398: #endif
399:
400: noreply = RPC_CHK_NOREPLY(rpc);
401:
402: /* check RPC packet session info */
403: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
404: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
405:
406: rpc->call_argc ^= rpc->call_argc;
407: rpc->call_rep.ret = RPC_ERROR(-1);
408: rpc->call_rep.eno = RPC_ERROR(errno);
409: } else {
410: /* execute RPC call */
411: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
412: }
413:
414: /* send RPC reply */
415: if (!noreply)
416: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
417: TASK_ARG(task), TASK_FD(task), rpc, len);
418:
419: /* lets get next packet */
420: schedReadSelf(task);
421: return NULL;
422: }
423:
424: static void *
425: acceptClients(sched_task_t *task)
426: {
427: rpc_srv_t *srv = TASK_ARG(task);
428: rpc_cli_t *c = NULL;
429: socklen_t salen = sizeof(sockaddr_t);
430: int sock;
431: #ifdef TCP_SESSION_TIMEOUT
432: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
433: #endif
434:
435: c = _allocClient(srv, NULL);
436: if (!c) {
437: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
438: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
439: shutdown(sock, SHUT_RDWR);
440: close(sock);
441: }
442: goto end;
443: }
444:
445: /* accept client */
446: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
447: if (c->cli_sock == -1) {
448: LOGERR;
449: AIT_FREE_VAL(&c->cli_buf);
450: array_Del(srv->srv_clients, c->cli_id, 42);
451: goto end;
452: } else
453: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
454:
455: #ifdef TCP_SESSION_TIMEOUT
456: /* armed timer for close stateless connection */
457: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
458: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c,
459: ts, c, 0);
460: #endif
461: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
462: c->cli_sock, NULL, 0);
463: end:
464: schedReadSelf(task);
465: return NULL;
466: }
467:
468:
469: static void *
470: txUDPPacket(sched_task_t *task)
471: {
472: rpc_cli_t *c = TASK_ARG(task);
473: rpc_srv_t *s = c->cli_parent;
474: rpc_func_t *f = NULL;
475: u_char *buf = AIT_GET_BUF(&c->cli_buf);
476: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
477: int ret, estlen, wlen = sizeof(struct tagRPCCall);
478: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
479: struct pollfd pfd;
480:
481: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
482: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
483: TASK_ARG(task), ts, TASK_ARG(task), 0);
484:
485: if (rpc->call_argc) {
486: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
487: if (!f) {
488: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
489: rpc->call_argc ^= rpc->call_argc;
490: rpc->call_rep.ret = RPC_ERROR(-1);
491: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
492: } else {
493: /* calc estimated length */
494: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
495: if (estlen > AIT_LEN(&c->cli_buf))
496: AIT_RE_BUF(&c->cli_buf, estlen);
497: buf = AIT_GET_BUF(&c->cli_buf);
498: rpc = (struct tagRPCCall*) buf;
499:
500: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
501: /* Go Encapsulate variables */
502: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
503: RPC_RETVARS(c));
504: /* Free return values */
505: ait_freeVars(&c->cli_vars);
506: if (ret == -1) {
507: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
508: rpc->call_argc ^= rpc->call_argc;
509: rpc->call_rep.ret = RPC_ERROR(-1);
510: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
511: } else
512: wlen += ret;
513: }
514: }
515:
516: rpc->call_len = htonl(wlen);
517: rpc->call_io = RPC_ACK;
518:
519: /* calculate CRC */
520: rpc->call_crc ^= rpc->call_crc;
521: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
522:
523: /* send reply */
524: pfd.fd = TASK_FD(task);
525: pfd.events = POLLOUT;
526: for (; wlen > 0; wlen -= ret, buf += ret) {
527: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
528: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
529: if (ret)
530: LOGERR;
531: else
532: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
533: /* close connection */
534: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
535: TASK_ARG(task), 0, NULL, 0);
536: return NULL;
537: }
538: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
539: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
540: if (ret == -1) {
541: /* close connection */
542: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
543: TASK_ARG(task), 0, NULL, 0);
544: return NULL;
545: }
546: }
547:
548: return NULL;
549: }
550:
551: static void *
552: rxUDPPacket(sched_task_t *task)
553: {
554: rpc_srv_t *srv = TASK_ARG(task);
555: rpc_cli_t *c = NULL;
556: int len, rlen, noreply, estlen;
557: u_short crc;
558: u_char *buf, b[sizeof(struct tagRPCCall)];
559: struct tagRPCCall *rpc = (struct tagRPCCall*) b;
560: sockaddr_t sa;
561: socklen_t salen;
562: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
563: struct pollfd pfd;
564:
565: /* receive connect packet */
566: salen = sa.ss.ss_len = sizeof(sockaddr_t);
567: rlen = recvfrom(TASK_FD(task), b, sizeof b, MSG_PEEK, &sa.sa, &salen);
568: if (rlen < sizeof(struct tagRPCCall)) {
569: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
570: goto end;
571: }
572:
573: c = _allocClient(srv, &sa);
574: if (!c) {
575: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
576: usleep(2000); /* blocked client delay */
577: goto end;
578: } else {
579: estlen = ntohl(rpc->call_len);
580: if (estlen > AIT_LEN(&c->cli_buf))
581: AIT_RE_BUF(&c->cli_buf, estlen);
582: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
583: buf = AIT_GET_BUF(&c->cli_buf);
584: len = estlen;
585:
586: c->cli_sock = TASK_FD(task);
587: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
588:
589: /* armed timer for close stateless connection */
590: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
591: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
592: c, ts, c, 0);
593: }
594:
595: /* get next part of packet */
596: memset(buf, 0, len);
597: pfd.fd = TASK_FD(task);
598: pfd.events = POLLIN | POLLPRI;
599: for (; len > 0; len -= rlen, buf += rlen) {
600: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
601: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
602: if (rlen)
603: LOGERR;
604: else
605: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
606: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
607: c, 0, NULL, 0);
608: goto end;
609: }
610: salen = sa.ss.ss_len = sizeof(sockaddr_t);
611: rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);
612: if (rlen == -1) {
613: /* close connection */
614: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
615: c, 0, NULL, 0);
616: goto end;
617: }
618: if (e_addrcmp(&c->cli_sa, &sa, 42))
619: rlen ^= rlen; /* skip if arrive from different address */
620: }
621: len = estlen;
622:
623: /* skip loop packet */
624: if (rpc->call_io & RPC_ACK)
625: goto end;
626:
627: /* check integrity of packet */
628: crc = ntohs(rpc->call_crc);
629: rpc->call_crc ^= rpc->call_crc;
630: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
631: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
632: /* close connection */
633: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
634: c, 0, NULL, 0);
635: goto end;
636: }
637:
638: noreply = RPC_CHK_NOREPLY(rpc);
639:
640: /* check RPC packet session info */
641: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
642: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
643:
644: rpc->call_argc ^= rpc->call_argc;
645: rpc->call_rep.ret = RPC_ERROR(-1);
646: rpc->call_rep.eno = RPC_ERROR(errno);
647: } else {
648: /* execute RPC call */
649: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
650: }
651:
652: /* send RPC reply */
653: if (!noreply)
654: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
655: c, TASK_FD(task), rpc, len);
656: end:
657: schedReadSelf(task);
658: return NULL;
659: }
660:
661:
662: static void *
663: txRAWPacket(sched_task_t *task)
664: {
665: rpc_cli_t *c = TASK_ARG(task);
666: rpc_srv_t *s = c->cli_parent;
667: rpc_func_t *f = NULL;
668: u_char *buf = AIT_GET_BUF(&c->cli_buf);
669: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
670: int ret, estlen, wlen = sizeof(struct tagRPCCall);
671: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
672:
673: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
674: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
675: TASK_ARG(task), ts, TASK_ARG(task), 0);
676:
677: if (rpc->call_argc) {
678: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
679: if (!f) {
680: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
681: rpc->call_argc ^= rpc->call_argc;
682: rpc->call_rep.ret = RPC_ERROR(-1);
683: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
684: } else {
685: /* calc estimated length */
686: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
687: if (estlen > AIT_LEN(&c->cli_buf))
688: AIT_RE_BUF(&c->cli_buf, estlen);
689: buf = AIT_GET_BUF(&c->cli_buf);
690: rpc = (struct tagRPCCall*) buf;
691:
692: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
693: /* Go Encapsulate variables */
694: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
695: RPC_RETVARS(c));
696: /* Free return values */
697: ait_freeVars(&c->cli_vars);
698: if (ret == -1) {
699: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
700: rpc->call_argc ^= rpc->call_argc;
701: rpc->call_rep.ret = RPC_ERROR(-1);
702: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
703: } else
704: wlen += ret;
705: }
706: }
707:
708: rpc->call_len = htonl(wlen);
709: rpc->call_io = RPC_ACK;
710:
711: /* calculate CRC */
712: rpc->call_crc ^= rpc->call_crc;
713: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
714:
715: /* send reply */
716: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
717: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
718: if (ret == -1) {
719: /* close connection */
720: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
721: TASK_ARG(task), 0, NULL, 0);
722: return NULL;
723: }
724:
725: return NULL;
726: }
727:
728: static void *
729: rxRAWPacket(sched_task_t *task)
730: {
731: rpc_srv_t *srv = TASK_ARG(task);
732: rpc_cli_t *c = NULL;
733: int len, rlen, noreply;
734: u_short crc;
735: struct tagRPCCall *rpc;
736: sockaddr_t sa;
737: socklen_t salen;
738: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
739: ait_val_t b = AIT_VAL_INIT;
740:
741: /* receive connect packet */
742: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
743: salen = sa.ss.ss_len = sizeof(sockaddr_t);
744: rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
745: if (sa.sa.sa_family == AF_INET) {
746: struct ip *h;
747: h = (struct ip*) AIT_GET_BUF(&b);
748: if (rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC)
749: goto end;
750: else {
751: rlen -= sizeof(struct ip);
752: rpc = (struct tagRPCCall*)
753: (AIT_GET_BUF(&b) + sizeof(struct ip));
754: }
755: } else {
756: struct ip6_hdr *h;
757: h = (struct ip6_hdr*) AIT_GET_BUF(&b);
758: if (rlen < (ntohs(h->ip6_plen) + sizeof(struct ip6_hdr)) ||
759: h->ip6_nxt != IPPROTO_ERPC)
760: goto end;
761: else {
762: rlen -= sizeof(struct ip6_hdr);
763: rpc = (struct tagRPCCall*)
764: (AIT_GET_BUF(&b) + sizeof(struct ip6_hdr));
765: }
766: }
767: if (rlen < sizeof(struct tagRPCCall)) {
768: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
769: goto end;
770: }
771:
772: /* skip loop packet */
773: if (rpc->call_io & RPC_ACK)
774: goto end;
775:
776: c = _allocClient(srv, &sa);
777: if (!c) {
778: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
779: usleep(2000); /* blocked client delay */
780: goto end;
781: } else {
782: len = ntohl(rpc->call_len);
783: if (len > AIT_LEN(&c->cli_buf))
784: AIT_RE_BUF(&c->cli_buf, len);
785: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
786: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
787:
788: c->cli_sock = TASK_FD(task);
789: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
790:
791: /* armed timer for close stateless connection */
792: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
793: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
794: c, ts, c, 0);
795: }
796:
797: /* check integrity of packet */
798: crc = ntohs(rpc->call_crc);
799: rpc->call_crc ^= rpc->call_crc;
800: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
801: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
802: /* close connection */
803: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
804: c, 0, NULL, 0);
805: goto end;
806: }
807:
808: noreply = RPC_CHK_NOREPLY(rpc);
809:
810: /* check RPC packet session info */
811: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
812: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
813:
814: rpc->call_argc ^= rpc->call_argc;
815: rpc->call_rep.ret = RPC_ERROR(-1);
816: rpc->call_rep.eno = RPC_ERROR(errno);
817: } else {
818: /* execute RPC call */
819: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
820: }
821:
822: /* send RPC reply */
823: if (!noreply)
824: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
825: c, TASK_FD(task), rpc, len);
826: end:
827: AIT_FREE_VAL(&b);
828: schedReadSelf(task);
829: return NULL;
830: }
831:
832:
833: static void *
834: txBPFPacket(sched_task_t *task)
835: {
836: rpc_cli_t *c = TASK_ARG(task);
837: rpc_srv_t *s = c->cli_parent;
838: rpc_func_t *f = NULL;
839: u_char *buf = AIT_GET_BUF(&c->cli_buf);
840: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
841: int ret, len, wlen = sizeof(struct tagRPCCall);
842: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
843: struct ether_header *eh;
844: ait_val_t b = AIT_VAL_INIT;
845:
846: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
847: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
848: TASK_ARG(task), ts, TASK_ARG(task), 0);
849:
850: if (rpc->call_argc) {
851: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
852: if (!f) {
853: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
854: rpc->call_argc ^= rpc->call_argc;
855: rpc->call_rep.ret = RPC_ERROR(-1);
856: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
857: } else {
858: /* calc estimated length */
859: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
860: if (len > AIT_LEN(&c->cli_buf))
861: AIT_RE_BUF(&c->cli_buf, len);
862: buf = AIT_GET_BUF(&c->cli_buf);
863: rpc = (struct tagRPCCall*) buf;
864:
865: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
866: /* Go Encapsulate variables */
867: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
868: RPC_RETVARS(c));
869: /* Free return values */
870: ait_freeVars(&RPC_RETVARS(c));
871: if (ret == -1) {
872: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
873: rpc->call_argc ^= rpc->call_argc;
874: rpc->call_rep.ret = RPC_ERROR(-1);
875: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
876: } else
877: wlen += ret;
878: }
879: }
880:
881: rpc->call_len = htonl(wlen);
882: rpc->call_io = RPC_ACK;
883:
884: /* calculate CRC */
885: rpc->call_crc ^= rpc->call_crc;
886: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
887:
888: /* send reply */
889: AIT_SET_BUF(&b, NULL, MIN(wlen, s->srv_netbuf) + ETHER_HDR_LEN);
890: eh = (struct ether_header*) AIT_GET_BUF(&b);
891: memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
892: eh->ether_type = htons(RPC_DEFPORT);
893: memcpy(eh + 1, buf, MIN(wlen, s->srv_netbuf));
894:
895: ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
896: AIT_FREE_VAL(&b);
897: if (ret == -1) {
898: /* close connection */
899: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
900: TASK_ARG(task), 0, NULL, 0);
901: return NULL;
902: }
903:
904: return NULL;
905: }
906:
907: static void *
908: rxBPFPacket(sched_task_t *task)
909: {
910: rpc_srv_t *srv = TASK_ARG(task);
911: rpc_cli_t *c = NULL;
912: int len, rlen, noreply;
913: u_short crc;
914: struct tagRPCCall *rpc;
915: sockaddr_t sa;
916: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
917: struct bpf_hdr *h;
918: struct ether_header *eh;
919: ait_val_t b = AIT_VAL_INIT;
920:
921: /* receive connect packet */
922: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
923: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
924: h = (struct bpf_hdr*) AIT_GET_BUF(&b);
925: rlen -= h->bh_hdrlen;
926: if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen ||
927: rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) {
928: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
929: goto end;
930: } else {
931: rlen = h->bh_caplen;
932: eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
933: rlen -= ETHER_HDR_LEN;
934: rpc = (struct tagRPCCall*) (eh + 1);
935:
936: #if 0
937: /* skip loop packet */
938: if (rpc->call_io & RPC_ACK)
939: goto end;
940: #endif
941:
942: if (eh->ether_type != ntohs(RPC_DEFPORT))
943: goto end;
944: else
945: e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
946: }
947:
948: c = _allocClient(srv, &sa);
949: if (!c) {
950: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
951: usleep(2000); /* blocked client delay */
952: goto end;
953: } else {
954: len = ntohl(rpc->call_len);
955: if (len > AIT_LEN(&c->cli_buf))
956: AIT_RE_BUF(&c->cli_buf, len);
957: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
958: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
959:
960: c->cli_sock = TASK_FD(task);
961: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
962:
963: /* armed timer for close stateless connection */
964: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
965: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
966: c, ts, c, 0);
967: }
968:
969: /* check integrity of packet */
970: crc = ntohs(rpc->call_crc);
971: rpc->call_crc ^= rpc->call_crc;
972: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
973: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
974: /* close connection */
975: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
976: c, 0, NULL, 0);
977: goto end;
978: }
979:
980: noreply = RPC_CHK_NOREPLY(rpc);
981:
982: /* check RPC packet session info */
983: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
984: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
985:
986: rpc->call_argc ^= rpc->call_argc;
987: rpc->call_rep.ret = RPC_ERROR(-1);
988: rpc->call_rep.eno = RPC_ERROR(errno);
989: } else {
990: /* execute RPC call */
991: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
992: }
993:
994: /* send RPC reply */
995: if (!noreply)
996: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
997: c, TASK_FD(task), rpc, len);
998: end:
999: AIT_FREE_VAL(&b);
1000: schedReadSelf(task);
1001: return NULL;
1002: }
1003:
1004:
1005: static void *
1006: txEXTPacket(sched_task_t *task)
1007: {
1008: rpc_cli_t *c = TASK_ARG(task);
1009: rpc_srv_t *s = c->cli_parent;
1010: rpc_func_t *f = NULL;
1011: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1012: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1013: int ret, len, wlen = sizeof(struct tagRPCCall);
1014: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1015:
1016: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
1017: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
1018: TASK_ARG(task), ts, TASK_ARG(task), 0);
1019:
1020: if (rpc->call_argc) {
1021: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1022: if (!f) {
1023: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1024: rpc->call_argc ^= rpc->call_argc;
1025: rpc->call_rep.ret = RPC_ERROR(-1);
1026: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
1027: } else {
1028: /* calc estimated length */
1029: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
1030: if (len > AIT_LEN(&c->cli_buf))
1031: AIT_RE_BUF(&c->cli_buf, len);
1032: buf = AIT_GET_BUF(&c->cli_buf);
1033: rpc = (struct tagRPCCall*) buf;
1034:
1035: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1036: /* Go Encapsulate variables */
1037: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
1038: RPC_RETVARS(c));
1039: /* Free return values */
1040: ait_freeVars(&RPC_RETVARS(c));
1041: if (ret == -1) {
1042: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1043: rpc->call_argc ^= rpc->call_argc;
1044: rpc->call_rep.ret = RPC_ERROR(-1);
1045: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
1046: } else
1047: wlen += ret;
1048: }
1049: }
1050:
1051: rpc->call_len = htonl(wlen);
1052: rpc->call_io = RPC_ACK;
1053:
1054: /* send reply */
1055: ret = write(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf));
1056: if (ret == -1) {
1057: /* close connection */
1058: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
1059: TASK_ARG(task), 0, NULL, 0);
1060: return NULL;
1061: }
1062:
1063: return NULL;
1064: }
1065:
1066: static void *
1067: rxEXTPacket(sched_task_t *task)
1068: {
1069: rpc_srv_t *srv = TASK_ARG(task);
1070: rpc_cli_t *c = NULL;
1071: int len, rlen, noreply;
1072: struct tagRPCCall *rpc;
1073: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1074: ait_val_t b = AIT_VAL_INIT;
1075: sockaddr_t sa;
1076:
1077: memset(&sa, 0, sizeof sa);
1078: /* receive connect packet */
1079: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1080: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
1081: if (rlen < sizeof(struct tagRPCCall)) {
1082: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1083: goto end;
1084: } else {
1085: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
1086:
1087: /* skip loop packet */
1088: if (rpc->call_io & RPC_ACK)
1089: goto end;
1090: }
1091:
1092: c = _allocClient(srv, &sa);
1093: if (!c) {
1094: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
1095: usleep(2000); /* blocked client delay */
1096: goto end;
1097: } else {
1098: len = ntohl(rpc->call_len);
1099: if (len > AIT_LEN(&c->cli_buf))
1100: AIT_RE_BUF(&c->cli_buf, len);
1101: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, AIT_LEN(&c->cli_buf));
1102: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1103:
1104: c->cli_sock = TASK_FD(task);
1105:
1106: /* armed timer for close stateless connection */
1107: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
1108: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
1109: c, ts, c, 0);
1110: }
1111:
1112: noreply = RPC_CHK_NOREPLY(rpc);
1113:
1114: /* check RPC packet session info */
1115: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
1116: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1117:
1118: rpc->call_argc ^= rpc->call_argc;
1119: rpc->call_rep.ret = RPC_ERROR(-1);
1120: rpc->call_rep.eno = RPC_ERROR(errno);
1121: } else {
1122: /* execute RPC call */
1123: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
1124: }
1125:
1126: /* send RPC reply */
1127: if (!noreply)
1128: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1129: c, TASK_FD(task), rpc, len);
1130: end:
1131: AIT_FREE_VAL(&b);
1132: schedReadSelf(task);
1133: return NULL;
1134: }
1135:
1136: /* ------------------------------------------------------ */
1137:
1138: void
1139: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
1140: {
1141: rpc_srv_t *s = c->cli_parent;
1142:
1143: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1144:
1145: /* free buffer */
1146: AIT_FREE_VAL(&c->cli_buf);
1147:
1148: array_Del(s->srv_blob.clients, c->cli_id, 0);
1149: if (c)
1150: e_free(c);
1151: }
1152:
1153:
1154: static void *
1155: closeBLOBClient(sched_task_t *task)
1156: {
1157: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
1158:
1159: rpc_freeBLOBCli(TASK_ARG(task));
1160:
1161: /* close client socket */
1162: shutdown(sock, SHUT_RDWR);
1163: close(sock);
1164: return NULL;
1165: }
1166:
1167: static void *
1168: txBLOB(sched_task_t *task)
1169: {
1170: rpc_cli_t *c = TASK_ARG(task);
1171: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1172: int wlen = sizeof(struct tagBLOBHdr);
1173:
1174: /* send reply */
1175: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
1176: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
1177: /* close blob connection */
1178: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1179: }
1180:
1181: return NULL;
1182: }
1183:
1184: static void *
1185: rxBLOB(sched_task_t *task)
1186: {
1187: rpc_cli_t *c = TASK_ARG(task);
1188: rpc_srv_t *s = c->cli_parent;
1189: rpc_blob_t *b;
1190: struct tagBLOBHdr blob;
1191: int rlen;
1192:
1193: memset(&blob, 0, sizeof blob);
1194: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
1195: if (rlen < 1) {
1196: /* close blob connection */
1197: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1198: return NULL;
1199: }
1200:
1201: /* check BLOB packet */
1202: if (rlen < sizeof(struct tagBLOBHdr)) {
1203: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1204:
1205: schedReadSelf(task);
1206: return NULL;
1207: }
1208:
1209: /* check RPC packet session info */
1210: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1211: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1212: blob.hdr_cmd = error;
1213: goto end;
1214: }
1215:
1216: /* Go to proceed packet ... */
1217: switch (blob.hdr_cmd) {
1218: case get:
1219: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
1220: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
1221: blob.hdr_cmd = no;
1222: blob.hdr_ret = RPC_ERROR(-1);
1223: break;
1224: } else
1225: blob.hdr_len = htonl(b->blob_len);
1226:
1227: if (rpc_srv_blobMap(s, b) != -1) {
1228: /* deliver BLOB variable to client */
1229: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1230: rpc_srv_blobUnmap(b);
1231: } else {
1232: blob.hdr_cmd = error;
1233: blob.hdr_ret = RPC_ERROR(-1);
1234: }
1235: break;
1236: case set:
1237: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
1238: ntohl(blob.hdr_ret)))) {
1239: /* set new BLOB variable for reply :) */
1240: blob.hdr_var = htonl(b->blob_var);
1241:
1242: /* receive BLOB from client */
1243: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1244: rpc_srv_blobUnmap(b);
1245: } else {
1246: blob.hdr_cmd = error;
1247: blob.hdr_ret = RPC_ERROR(-1);
1248: }
1249: break;
1250: case unset:
1251: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1252: blob.hdr_cmd = error;
1253: blob.hdr_ret = RPC_ERROR(-1);
1254: }
1255: break;
1256: default:
1257: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
1258: blob.hdr_cmd = error;
1259: blob.hdr_ret = RPC_ERROR(-1);
1260: }
1261:
1262: end:
1263: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
1264: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
1265: schedReadSelf(task);
1266: return NULL;
1267: }
1268:
1269: static void *
1270: flushBLOB(sched_task_t *task)
1271: {
1272: uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
1273: rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
1274: rpc_blob_t *b, *tmp;
1275:
1276: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1277: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1278:
1279: rpc_srv_blobFree(srv, b);
1280: e_free(b);
1281: }
1282:
1283: if (!schedSignalSelf(task)) {
1284: /* disabled kqueue support in libaitsched */
1285: struct sigaction sa;
1286:
1287: memset(&sa, 0, sizeof sa);
1288: sigemptyset(&sa.sa_mask);
1289: sa.sa_handler = (void (*)(int)) flushBLOB;
1290: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1291: sigaction(SIGFBLOB, &sa, NULL);
1292: }
1293:
1294: return NULL;
1295: }
1296:
1297: static void *
1298: acceptBLOBClients(sched_task_t *task)
1299: {
1300: rpc_srv_t *srv = TASK_ARG(task);
1301: rpc_cli_t *c = NULL;
1302: register int i;
1303: socklen_t salen = sizeof(sockaddr_t);
1304: int sock;
1305: #ifdef TCP_NOPUSH
1306: int n = 1;
1307: #endif
1308:
1309: /* check free slots for connect */
1310: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
1311: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1312: if (c) { /* no more free slots! */
1313: EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
1314: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
1315: shutdown(sock, SHUT_RDWR);
1316: close(sock);
1317: }
1318: goto end;
1319: }
1320:
1321: c = e_malloc(sizeof(rpc_cli_t));
1322: if (!c) {
1323: LOGERR;
1324: srv->srv_kill = srv->srv_blob.kill = 1;
1325: return NULL;
1326: } else {
1327: memset(c, 0, sizeof(rpc_cli_t));
1328: array_Set(srv->srv_blob.clients, i, c);
1329: c->cli_id = i;
1330: c->cli_parent = srv;
1331: }
1332:
1333: /* alloc empty buffer */
1334: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1335:
1336: /* accept client */
1337: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
1338: if (c->cli_sock == -1) {
1339: LOGERR;
1340: AIT_FREE_VAL(&c->cli_buf);
1341: array_Del(srv->srv_blob.clients, i, 42);
1342: goto end;
1343: } else {
1344: #ifdef TCP_NOPUSH
1345: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
1346: #endif
1347: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1348: }
1349:
1350: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
1351: end:
1352: schedReadSelf(task);
1353: return NULL;
1354: }
1355:
1356: /* ------------------------------------------------------ */
1357:
1358: /*
1359: * rpc_srv_initBLOBServer() - Init & create BLOB Server
1360: *
1361: * @srv = RPC server instance
1362: * @Port = Port for bind server, if Port == 0 default port is selected
1363: * @diskDir = Disk place for BLOB file objects
1364: * return: -1 == error or 0 bind and created BLOB server instance
1365: */
1366: int
1367: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
1368: {
1369: int n = 1;
1370:
1371: if (!srv || srv->srv_kill) {
1372: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1373: return -1;
1374: }
1375:
1376: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
1377: if (access(diskDir, R_OK | W_OK) == -1) {
1378: LOGERR;
1379: return -1;
1380: } else
1381: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1382:
1383: /* init blob list */
1384: TAILQ_INIT(&srv->srv_blob.blobs);
1385:
1386: srv->srv_blob.server.cli_parent = srv;
1387:
1388: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
1389: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1390: case AF_INET:
1391: srv->srv_blob.server.cli_sa.sin.sin_port =
1392: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1393: break;
1394: case AF_INET6:
1395: srv->srv_blob.server.cli_sa.sin6.sin6_port =
1396: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1397: break;
1398: case AF_LOCAL:
1399: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
1400: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1401: break;
1402: default:
1403: AIT_FREE_VAL(&srv->srv_blob.dir);
1404: return -1;
1405: }
1406:
1407: /* create BLOB server socket */
1408: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1409: if (srv->srv_blob.server.cli_sock == -1) {
1410: LOGERR;
1411: AIT_FREE_VAL(&srv->srv_blob.dir);
1412: return -1;
1413: }
1414: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1415: LOGERR;
1416: close(srv->srv_blob.server.cli_sock);
1417: AIT_FREE_VAL(&srv->srv_blob.dir);
1418: return -1;
1419: }
1420: n = srv->srv_netbuf;
1421: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1422: LOGERR;
1423: close(srv->srv_blob.server.cli_sock);
1424: AIT_FREE_VAL(&srv->srv_blob.dir);
1425: return -1;
1426: }
1427: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1428: LOGERR;
1429: close(srv->srv_blob.server.cli_sock);
1430: AIT_FREE_VAL(&srv->srv_blob.dir);
1431: return -1;
1432: }
1433: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
1434: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1435: LOGERR;
1436: close(srv->srv_blob.server.cli_sock);
1437: AIT_FREE_VAL(&srv->srv_blob.dir);
1438: return -1;
1439: } else
1440: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
1441: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
1442:
1443:
1444: /* allocate pool for concurent blob clients */
1445: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1446: if (!srv->srv_blob.clients) {
1447: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1448: close(srv->srv_blob.server.cli_sock);
1449: AIT_FREE_VAL(&srv->srv_blob.dir);
1450: return -1;
1451: }
1452:
1453: /* init blob scheduler */
1454: srv->srv_blob.root = schedBegin();
1455: if (!srv->srv_blob.root) {
1456: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1457: array_Destroy(&srv->srv_blob.clients);
1458: close(srv->srv_blob.server.cli_sock);
1459: AIT_FREE_VAL(&srv->srv_blob.dir);
1460: return -1;
1461: }
1462:
1463: return 0;
1464: }
1465:
1466: /*
1467: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
1468: *
1469: * @srv = RPC Server instance
1470: * return: none
1471: */
1472: void
1473: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
1474: {
1475: if (!srv)
1476: return;
1477:
1478: srv->srv_blob.kill = 1;
1479:
1480: schedEnd(&srv->srv_blob.root);
1481:
1482: if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL)
1483: unlink(srv->srv_blob.server.cli_sa.sun.sun_path);
1484: }
1485:
1486: /*
1487: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1488: *
1489: * @srv = RPC Server instance
1490: * return: -1 error or 0 ok, infinite loop ...
1491: */
1492: int
1493: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1494: {
1495: rpc_cli_t *c;
1496: register int i;
1497: rpc_blob_t *b, *tmp;
1498: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1499:
1500: if (!srv || srv->srv_kill) {
1501: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1502: return -1;
1503: }
1504:
1505: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1506: LOGERR;
1507: return -1;
1508: }
1509:
1510: if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
1511: /* disabled kqueue support in libaitsched */
1512: struct sigaction sa;
1513:
1514: atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
1515:
1516: memset(&sa, 0, sizeof sa);
1517: sigemptyset(&sa.sa_mask);
1518: sa.sa_handler = (void (*)(int)) flushBLOB;
1519: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1520: sigaction(SIGFBLOB, &sa, NULL);
1521: }
1522:
1523: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
1524: srv->srv_blob.server.cli_sock, NULL, 0)) {
1525: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1526: return -1;
1527: }
1528:
1529: schedPolling(srv->srv_blob.root, &ts, NULL);
1530: /* main rpc loop */
1531: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1532:
1533: /* detach blobs */
1534: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1535: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1536:
1537: rpc_srv_blobFree(srv, b);
1538: e_free(b);
1539: }
1540:
1541: /* close all clients connections & server socket */
1542: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
1543: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1544: if (c) {
1545: shutdown(c->cli_sock, SHUT_RDWR);
1546: close(c->cli_sock);
1547:
1548: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1549: AIT_FREE_VAL(&c->cli_buf);
1550: }
1551: array_Del(srv->srv_blob.clients, i, 42);
1552: }
1553: array_Destroy(&srv->srv_blob.clients);
1554:
1555: close(srv->srv_blob.server.cli_sock);
1556:
1557: AIT_FREE_VAL(&srv->srv_blob.dir);
1558: return 0;
1559: }
1560:
1561:
1562: /*
1563: * rpc_srv_initServer() - Init & create RPC Server
1564: *
1565: * @InstID = Instance for authentication & recognition
1566: * @concurentClients = Concurent clients at same time to this server
1567: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1568: * @csHost = Host name or address for bind server, if NULL any address
1569: * @Port = Port for bind server, if Port == 0 default port is selected
1570: * @proto = Protocol, if == 0 choose SOCK_STREAM
1571: * return: NULL == error or !=NULL bind and created RPC server instance
1572: */
1573: rpc_srv_t *
1574: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
1575: const char *csHost, u_short Port, int proto)
1576: {
1577: int n = 1;
1578: rpc_srv_t *srv = NULL;
1579: sockaddr_t sa = E_SOCKADDR_INIT;
1580:
1581: if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
1582: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1583: return NULL;
1584: }
1585: if (!Port && proto < SOCK_RAW)
1586: Port = RPC_DEFPORT;
1587: if (!e_gethostbyname(csHost, Port, &sa))
1588: return NULL;
1589: if (!proto)
1590: proto = SOCK_STREAM;
1591: if (netBuf < RPC_MIN_BUFSIZ)
1592: netBuf = BUFSIZ;
1593: else
1594: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1595:
1596: #ifdef HAVE_SRANDOMDEV
1597: srandomdev();
1598: #else
1599: time_t tim;
1600:
1601: srandom((time(&tim) ^ getpid()));
1602: #endif
1603:
1604: srv = e_malloc(sizeof(rpc_srv_t));
1605: if (!srv) {
1606: LOGERR;
1607: return NULL;
1608: } else
1609: memset(srv, 0, sizeof(rpc_srv_t));
1610:
1611: srv->srv_proto = proto;
1612: srv->srv_netbuf = netBuf;
1613: srv->srv_session.sess_version = RPC_VERSION;
1614: srv->srv_session.sess_instance = InstID;
1615:
1616: srv->srv_server.cli_parent = srv;
1617: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1618:
1619: /* init functions */
1620: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1621: SLIST_INIT(&srv->srv_funcs);
1622: AVL_INIT(&srv->srv_funcs);
1623:
1624: /* init scheduler */
1625: srv->srv_root = schedBegin();
1626: if (!srv->srv_root) {
1627: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1628: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1629: e_free(srv);
1630: return NULL;
1631: }
1632:
1633: /* init pool for clients */
1634: srv->srv_clients = array_Init(concurentClients);
1635: if (!srv->srv_clients) {
1636: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1637: schedEnd(&srv->srv_root);
1638: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1639: e_free(srv);
1640: return NULL;
1641: }
1642:
1643: /* create server socket */
1644: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family,
1645: srv->srv_proto, srv->srv_proto == SOCK_RAW ? IPPROTO_ERPC : 0);
1646: if (srv->srv_server.cli_sock == -1) {
1647: LOGERR;
1648: array_Destroy(&srv->srv_clients);
1649: schedEnd(&srv->srv_root);
1650: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1651: e_free(srv);
1652: return NULL;
1653: }
1654: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1655: LOGERR;
1656: goto err;
1657: }
1658: n = srv->srv_netbuf;
1659: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1660: LOGERR;
1661: goto err;
1662: }
1663: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1664: LOGERR;
1665: goto err;
1666: }
1667: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
1668: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1669: LOGERR;
1670: goto err;
1671: } else
1672: fcntl(srv->srv_server.cli_sock, F_SETFL,
1673: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1674:
1675: rpc_register_srvPing(srv);
1676:
1677: return srv;
1678: err: /* error condition */
1679: close(srv->srv_server.cli_sock);
1680: array_Destroy(&srv->srv_clients);
1681: schedEnd(&srv->srv_root);
1682: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1683: e_free(srv);
1684: return NULL;
1685: }
1686:
1687: /*
1688: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1689: *
1690: * @psrv = RPC Server instance
1691: * return: none
1692: */
1693: void
1694: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1695: {
1696: if (!psrv || !*psrv)
1697: return;
1698:
1699: /* if send kill to blob server */
1700: rpc_srv_endBLOBServer(*psrv);
1701:
1702: (*psrv)->srv_kill = 1;
1703: sleep(RPC_SCHED_POLLING);
1704:
1705: schedEnd(&(*psrv)->srv_root);
1706:
1707: if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL)
1708: unlink((*psrv)->srv_server.cli_sa.sun.sun_path);
1709:
1710: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1711: e_free(*psrv);
1712: *psrv = NULL;
1713: }
1714:
1715: /*
1716: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1717: *
1718: * @srv = RPC Server instance
1719: * return: -1 error or 0 ok, infinite loop ...
1720: */
1721: int
1722: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1723: {
1724: rpc_cli_t *c;
1725: register int i;
1726: rpc_func_t *f;
1727: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1728:
1729: if (!srv) {
1730: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1731: return -1;
1732: }
1733:
1734: if (srv->srv_proto == SOCK_STREAM)
1735: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1736: LOGERR;
1737: return -1;
1738: }
1739:
1740: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1741: srv->srv_server.cli_sock, NULL, 0)) {
1742: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1743: return -1;
1744: }
1745:
1746: schedPolling(srv->srv_root, &ts, NULL);
1747: /* main rpc loop */
1748: schedRun(srv->srv_root, &srv->srv_kill);
1749:
1750: /* close all clients connections & server socket */
1751: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1752: c = array(srv->srv_clients, i, rpc_cli_t*);
1753: if (c) {
1754: if (srv->srv_proto == SOCK_STREAM) {
1755: shutdown(c->cli_sock, SHUT_RDWR);
1756: close(c->cli_sock);
1757: }
1758:
1759: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1760: ait_freeVars(&RPC_RETVARS(c));
1761: AIT_FREE_VAL(&c->cli_buf);
1762: }
1763: array_Del(srv->srv_clients, i, 42);
1764: }
1765: array_Destroy(&srv->srv_clients);
1766:
1767: if (srv->srv_proto != SOCK_EXT)
1768: close(srv->srv_server.cli_sock);
1769:
1770: /* detach exported calls */
1771: RPC_FUNCS_LOCK(&srv->srv_funcs);
1772: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1773: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1774:
1775: AIT_FREE_VAL(&f->func_name);
1776: e_free(f);
1777: }
1778: srv->srv_funcs.avlh_root = NULL;
1779: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1780:
1781: return 0;
1782: }
1783:
1784:
1785: /*
1786: * rpc_srv_execCall() Execute registered call from RPC server
1787: *
1788: * @cli = RPC client
1789: * @rpc = IN RPC call structure
1790: * @funcname = Execute RPC function
1791: * @args = IN RPC calling arguments from RPC client
1792: * return: -1 error, !=-1 ok
1793: */
1794: int
1795: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1796: ait_val_t funcname, array_t * __restrict args)
1797: {
1798: rpc_callback_t func;
1799:
1800: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1801: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1802: return -1;
1803: }
1804:
1805: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1806: return func(cli, rpc, args);
1807: }
1808:
1809:
1810: /*
1811: * rpc_srv_initServer2() - Init & create layer2 RPC Server
1812: *
1813: * @InstID = Instance for authentication & recognition
1814: * @concurentClients = Concurent clients at same time to this server
1815: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1816: * @csIface = Interface name for bind server, if NULL first interface on host
1817: * return: NULL == error or !=NULL bind and created RPC server instance
1818: */
1819: rpc_srv_t *
1820: rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
1821: {
1822: int n = 1;
1823: rpc_srv_t *srv = NULL;
1824: sockaddr_t sa = E_SOCKADDR_INIT;
1825: char szIface[64], szStr[STRSIZ];
1826: register int i;
1827: struct ifreq ifr;
1828: struct bpf_insn insns[] = {
1829: BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
1830: BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
1831: BPF_STMT(BPF_RET + BPF_K, -1),
1832: BPF_STMT(BPF_RET + BPF_K, 0),
1833: };
1834: struct bpf_program fcode = {
1835: .bf_len = sizeof(insns) / sizeof(struct bpf_insn),
1836: .bf_insns = insns
1837: };
1838:
1839: if (!concurentClients) {
1840: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1841: return NULL;
1842: }
1843: if (!csIface) {
1844: if (e_get1stiface(szIface, sizeof szIface))
1845: return NULL;
1846: } else
1847: strlcpy(szIface, csIface, sizeof szIface);
1848: if (!e_getifacebyname(szIface, &sa))
1849: return NULL;
1850:
1851: #ifdef HAVE_SRANDOMDEV
1852: srandomdev();
1853: #else
1854: time_t tim;
1855:
1856: srandom((time(&tim) ^ getpid()));
1857: #endif
1858:
1859: srv = e_malloc(sizeof(rpc_srv_t));
1860: if (!srv) {
1861: LOGERR;
1862: return NULL;
1863: } else
1864: memset(srv, 0, sizeof(rpc_srv_t));
1865:
1866: srv->srv_proto = SOCK_BPF;
1867: srv->srv_netbuf = netBuf;
1868: srv->srv_session.sess_version = RPC_VERSION;
1869: srv->srv_session.sess_instance = InstID;
1870:
1871: srv->srv_server.cli_parent = srv;
1872: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1873:
1874: /* init functions */
1875: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1876: SLIST_INIT(&srv->srv_funcs);
1877: AVL_INIT(&srv->srv_funcs);
1878:
1879: /* init scheduler */
1880: srv->srv_root = schedBegin();
1881: if (!srv->srv_root) {
1882: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1883: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1884: e_free(srv);
1885: return NULL;
1886: }
1887:
1888: /* init pool for clients */
1889: srv->srv_clients = array_Init(concurentClients);
1890: if (!srv->srv_clients) {
1891: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1892: schedEnd(&srv->srv_root);
1893: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1894: e_free(srv);
1895: return NULL;
1896: }
1897:
1898: /* create server handler */
1899: for (i = 0; i < 10; i++) {
1900: memset(szStr, 0, sizeof szStr);
1901: snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
1902: srv->srv_server.cli_sock = open(szStr, O_RDWR);
1903: if (srv->srv_server.cli_sock > STDERR_FILENO)
1904: break;
1905: }
1906: if (srv->srv_server.cli_sock < 3) {
1907: LOGERR;
1908: array_Destroy(&srv->srv_clients);
1909: schedEnd(&srv->srv_root);
1910: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1911: e_free(srv);
1912: return NULL;
1913: }
1914:
1915: if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
1916: LOGERR;
1917: goto err;
1918: }
1919: if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
1920: LOGERR;
1921: goto err;
1922: }
1923: n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
1924: if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
1925: LOGERR;
1926: goto err;
1927: } else
1928: srv->srv_netbuf = n;
1929:
1930: memset(&ifr, 0, sizeof ifr);
1931: strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
1932: if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
1933: LOGERR;
1934: goto err;
1935: } else
1936: fcntl(srv->srv_server.cli_sock, F_SETFL,
1937: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1938:
1939: rpc_register_srvPing(srv);
1940:
1941: return srv;
1942: err: /* error condition */
1943: close(srv->srv_server.cli_sock);
1944: array_Destroy(&srv->srv_clients);
1945: schedEnd(&srv->srv_root);
1946: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1947: e_free(srv);
1948: return NULL;
1949: }
1950:
1951: /*
1952: * rpc_srv_initServerExt() - Init & create pipe RPC Server
1953: *
1954: * @InstID = Instance for authentication & recognition
1955: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1956: * @fd = File descriptor
1957: * return: NULL == error or !=NULL bind and created RPC server instance
1958: */
1959: rpc_srv_t *
1960: rpc_srv_initServerExt(u_char InstID, int netBuf, int fd)
1961: {
1962: rpc_srv_t *srv = NULL;
1963:
1964: #ifdef HAVE_SRANDOMDEV
1965: srandomdev();
1966: #else
1967: time_t tim;
1968:
1969: srandom((time(&tim) ^ getpid()));
1970: #endif
1971:
1972: srv = e_malloc(sizeof(rpc_srv_t));
1973: if (!srv) {
1974: LOGERR;
1975: return NULL;
1976: } else
1977: memset(srv, 0, sizeof(rpc_srv_t));
1978:
1979: srv->srv_proto = SOCK_EXT;
1980: srv->srv_netbuf = (netBuf < RPC_MIN_BUFSIZ) ?
1981: getpagesize() : E_ALIGN(netBuf, 2);
1982: srv->srv_session.sess_version = RPC_VERSION;
1983: srv->srv_session.sess_instance = InstID;
1984:
1985: srv->srv_server.cli_parent = srv;
1986: srv->srv_server.cli_sock = fd;
1987:
1988: /* init functions */
1989: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1990: SLIST_INIT(&srv->srv_funcs);
1991: AVL_INIT(&srv->srv_funcs);
1992:
1993: /* init scheduler */
1994: srv->srv_root = schedBegin();
1995: if (!srv->srv_root) {
1996: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1997: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1998: e_free(srv);
1999: return NULL;
2000: }
2001:
2002: /* init pool for clients */
2003: srv->srv_clients = array_Init(1);
2004: if (!srv->srv_clients) {
2005: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
2006: schedEnd(&srv->srv_root);
2007: pthread_mutex_destroy(&srv->srv_funcs.mtx);
2008: e_free(srv);
2009: return NULL;
2010: }
2011:
2012: fcntl(srv->srv_server.cli_sock, F_SETFL,
2013: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
2014:
2015: rpc_register_srvPing(srv);
2016:
2017: return srv;
2018: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>