1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: srv.c,v 1.30.2.10 2024/02/26 18:25:32 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004 - 2024
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /* SOCK_STREAM */
50: static void *acceptClients(sched_task_t *);
51: static void *closeClient(sched_task_t *);
52: static void *rxPacket(sched_task_t *);
53: static void *txPacket(sched_task_t *);
54:
55: /* SOCK_DGRAM */
56: static void *freeClient(sched_task_t *);
57: static void *rxUDPPacket(sched_task_t *);
58: static void *txUDPPacket(sched_task_t *);
59:
60: /* SOCK_RAW */
61: static void *rxRAWPacket(sched_task_t *);
62: static void *txRAWPacket(sched_task_t *);
63:
64: /* SOCK_BPF */
65: static void *rxBPFPacket(sched_task_t *);
66: static void *txBPFPacket(sched_task_t *);
67:
68: /* SOCK_EXT */
69: static void *rxEXTPacket(sched_task_t *);
70: static void *txEXTPacket(sched_task_t *);
71:
72: static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
73: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
74: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
75: { rxUDPPacket, freeClient, NULL /*rxUDPPacket*/, txUDPPacket }, /* SOCK_DGRAM */
76: { rxRAWPacket, freeClient, NULL /*rxRAWPacket*/, txRAWPacket }, /* SOCK_RAW */
77: { rxBPFPacket, freeClient, NULL /*rxBPFPacket*/, txBPFPacket }, /* SOCK_BPF */
78: { rxEXTPacket, freeClient, NULL /*rxEXTPacket*/, txEXTPacket } /* SOCK_EXT */
79: };
80:
81: /* Global Signal Argument when kqueue support disabled */
82:
83: static volatile uintptr_t _glSigArg = 0;
84:
85: void
86: rpc_freeCli(rpc_cli_t * __restrict c)
87: {
88: rpc_srv_t *s = c->cli_parent;
89:
90: if (s->srv_proto == SOCK_STREAM)
91: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
92:
93: /* free buffer */
94: AIT_FREE_VAL(&c->cli_buf);
95:
96: array_Del(s->srv_clients, c->cli_id, 0);
97: if (c)
98: e_free(c);
99: }
100:
101:
102: static inline int
103: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
104: {
105: rpc_cli_t *c = NULL;
106: register int i;
107:
108: /* check free slots for connect */
109: for (i = 0; i < array_Size(srv->srv_clients) &&
110: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
111: /* check for duplicates */
112: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
113: break;
114: if (i >= array_Size(srv->srv_clients))
115: return -1; /* no more free slots! */
116:
117: return i;
118: }
119:
120: static rpc_cli_t *
121: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
122: {
123: rpc_cli_t *c = NULL;
124: int n;
125:
126: if (srv->srv_proto != SOCK_EXT)
127: n = _check4freeslot(srv, sa);
128: else
129: n = 0;
130: if (n == -1)
131: return NULL;
132: else
133: c = array(srv->srv_clients, n, rpc_cli_t*);
134:
135: if (!c) {
136: c = e_malloc(sizeof(rpc_cli_t));
137: if (!c) {
138: LOGERR;
139: srv->srv_kill = 1;
140: return NULL;
141: } else {
142: memset(c, 0, sizeof(rpc_cli_t));
143: array_Set(srv->srv_clients, n, c);
144: c->cli_id = n;
145: c->cli_parent = srv;
146: }
147:
148: /* alloc empty buffer */
149: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
150: }
151:
152: return c;
153: }
154:
155:
156: static void *
157: freeClient(sched_task_t *task)
158: {
159: rpc_freeCli(TASK_ARG(task));
160:
161: taskExit(task, NULL);
162: }
163:
164: static void *
165: closeClient(sched_task_t *task)
166: {
167: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
168:
169: rpc_freeCli(TASK_ARG(task));
170:
171: /* close client socket */
172: shutdown(sock, SHUT_RDWR);
173: close(sock);
174: taskExit(task, NULL);
175: }
176:
177: static void *
178: txPacket(sched_task_t *task)
179: {
180: rpc_cli_t *c = TASK_ARG(task);
181: rpc_srv_t *s = c->cli_parent;
182: rpc_func_t *f = NULL;
183: u_char *buf = AIT_GET_BUF(&c->cli_buf);
184: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
185: int ret, wlen = sizeof(struct tagRPCCall);
186: #ifdef TCP_SESSION_TIMEOUT
187: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
188:
189: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
190: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
191: TASK_ARG(task), ts, TASK_ARG(task), 0);
192: #endif
193:
194: if (rpc->call_argc) {
195: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
196: if (!f) {
197: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
198:
199: rpc->call_argc ^= rpc->call_argc;
200: RPC_SET_RETURN(rpc, -1);
201: RPC_SET_ERRNO(rpc, rpc_Errno);
202: } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) {
203: rpc_SetErr(EMSGSIZE, "Message too long");
204:
205: rpc->call_argc ^= rpc->call_argc;
206: RPC_SET_RETURN(rpc, -1);
207: RPC_SET_ERRNO(rpc, rpc_Errno);
208: } else {
209: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
210: /* Go Encapsulate variables */
211: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
212: RPC_RETVARS(c));
213: if (ret == -1) {
214: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
215:
216: rpc->call_argc ^= rpc->call_argc;
217: RPC_SET_RETURN(rpc, -1);
218: RPC_SET_ERRNO(rpc, rpc_Errno);
219: } else
220: wlen += ret;
221: }
222: }
223:
224: /* Free return values */
225: ait_freeVars(&c->cli_vars);
226:
227: rpc->call_len = htonl(wlen);
228: rpc->call_io = RPC_ACK;
229:
230: #if 0
231: /* calculate CRC */
232: rpc->call_crc ^= rpc->call_crc;
233: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
234: #endif
235:
236: /* send reply */
237: ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
238: if (ret == -1) {
239: /* close connection */
240: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
241: TASK_ARG(task), 0, NULL, 0);
242: }
243:
244: taskExit(task, NULL);
245: }
246:
247: static void *
248: execCall(sched_task_t *task)
249: {
250: rpc_cli_t *c = TASK_ARG(task);
251: rpc_srv_t *s = c->cli_parent;
252: rpc_func_t *f = NULL;
253: array_t *arr = NULL;
254: u_char *buf = AIT_GET_BUF(&c->cli_buf);
255: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
256: int argc = rpc->call_argc;
257:
258: /* Go decapsulate variables ... */
259: if (argc) {
260: arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall),
261: AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
262: if (!arr) {
263: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
264:
265: rpc->call_argc ^= rpc->call_argc;
266: RPC_SET_RETURN(rpc, -1);
267: RPC_SET_ERRNO(rpc, rpc_Errno);
268: taskExit(task, NULL);
269: }
270: } else
271: arr = NULL;
272:
273: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
274: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
275:
276: rpc->call_argc ^= rpc->call_argc;
277: RPC_SET_RETURN(rpc, -1);
278: RPC_SET_ERRNO(rpc, rpc_Errno);
279: } else {
280: /* if client doesn't want reply */
281: RPC_SET_RETURN(rpc, rpc_srv_execCall(c, rpc, f->func_name, arr));
282: if (rpc->call_rep.ret == htonl(-1)) {
283: if (!rpc->call_rep.eno) {
284: LOGERR;
285: RPC_SET_ERRNO(rpc, rpc_Errno);
286: }
287: rpc->call_argc ^= rpc->call_argc;
288: ait_freeVars(&c->cli_vars);
289: } else {
290: rpc->call_rep.eno ^= rpc->call_rep.eno;
291: rpc->call_argc ^= rpc->call_argc;
292: if (TASK_VAL(task)) {
293: /* without reply */
294: ait_freeVars(&c->cli_vars);
295: } else if (rpc->call_io & RPC_REQ) {
296: /* reply */
297: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
298: }
299: }
300: }
301:
302: array_Destroy(&arr);
303: taskExit(task, NULL);
304: }
305:
306: static void *
307: rxPacket(sched_task_t *task)
308: {
309: rpc_cli_t *c = TASK_ARG(task);
310: rpc_srv_t *s = c->cli_parent;
311: int len, noreply = 0, rlen = AIT_LEN(&c->cli_buf);
312: #if 0
313: u_short crc;
314: #endif
315: u_char *buf = AIT_GET_BUF(&c->cli_buf);
316: struct tagRPCCall b, *rpc = (struct tagRPCCall*) buf;
317: #ifdef TCP_SESSION_TIMEOUT
318: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
319:
320: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
321: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
322: TASK_ARG(task), ts, TASK_ARG(task), 0);
323: #endif
324:
325: /* prepare rx */
326: len = recv(TASK_FD(task), &b, sizeof b, MSG_PEEK);
327: if (len < 1) {
328: /* close connection */
329: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
330: TASK_ARG(task), 0, NULL, 0);
331: taskExit(task, NULL);
332: } else if (len == sizeof b)
333: rlen = ntohl(b.call_len);
334: else
335: goto end;
336:
337: rlen = recv(TASK_FD(task), buf, rlen, 0);
338: if (rlen == -1) {
339: /* close connection */
340: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
341: TASK_ARG(task), 0, NULL, 0);
342: taskExit(task, NULL);
343: }
344: if (rlen < sizeof(struct tagRPCCall)) {
345: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
346:
347: rpc->call_argc ^= rpc->call_argc;
348: RPC_SET_RETURN(rpc, -1);
349: RPC_SET_ERRNO(rpc, rpc_Errno);
350: goto err;
351: } else
352: len = ntohl(rpc->call_len);
353: if (rlen < len || len > AIT_LEN(&c->cli_buf)) {
354: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
355:
356: rpc->call_argc ^= rpc->call_argc;
357: RPC_SET_RETURN(rpc, -1);
358: RPC_SET_ERRNO(rpc, rpc_Errno);
359: goto err;
360: }
361:
362: /* skip loop packet */
363: if (rpc->call_io & RPC_ACK) {
364: schedReadSelf(task);
365: taskExit(task, NULL);
366: }
367:
368: #if 0
369: /* check integrity of packet */
370: crc = ntohs(rpc->call_crc);
371: rpc->call_crc ^= rpc->call_crc;
372: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
373: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
374:
375: rpc->call_argc ^= rpc->call_argc;
376: RPC_SET_RETURN(rpc, -1);
377: RPC_SET_ERRNO(rpc, rpc_Errno);
378: goto err;
379: }
380: #endif
381:
382: /* check RPC packet session info */
383: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
384: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
385:
386: rpc->call_argc ^= rpc->call_argc;
387: RPC_SET_RETURN(rpc, -1);
388: RPC_SET_ERRNO(rpc, rpc_Errno);
389: goto err;
390: }
391:
392: noreply = RPC_CHK_NOREPLY(rpc);
393:
394: /* execute RPC call */
395: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
396: err:
397: /* send RPC reply */
398: if (!noreply && (rpc->call_io & RPC_REQ))
399: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
400: TASK_ARG(task), TASK_FD(task), rpc, len);
401: end:
402: /* lets get next packet */
403: schedReadSelf(task);
404: taskExit(task, NULL);
405: }
406:
407: static void *
408: acceptClients(sched_task_t *task)
409: {
410: rpc_srv_t *srv = TASK_ARG(task);
411: rpc_cli_t *c = NULL;
412: socklen_t salen = E_SOCKADDR_MAX;
413: int sock;
414: #ifdef TCP_SESSION_TIMEOUT
415: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
416: #endif
417:
418: c = _allocClient(srv, NULL);
419: if (!c) {
420: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
421: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
422: shutdown(sock, SHUT_RDWR);
423: close(sock);
424: }
425: goto end;
426: }
427:
428: /* accept client */
429: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
430: if (c->cli_sock == -1) {
431: LOGERR;
432: AIT_FREE_VAL(&c->cli_buf);
433: array_Del(srv->srv_clients, c->cli_id, 42);
434: goto end;
435: } else {
436: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
437: fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC);
438: }
439:
440: #ifdef TCP_SESSION_TIMEOUT
441: /* armed timer for close stateless connection */
442: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
443: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c,
444: ts, c, 0);
445: #endif
446: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
447: c->cli_sock, NULL, 0);
448: end:
449: schedReadSelf(task);
450: taskExit(task, NULL);
451: }
452:
453:
454: static void *
455: txUDPPacket(sched_task_t *task)
456: {
457: rpc_cli_t *c = TASK_ARG(task);
458: rpc_srv_t *s = c->cli_parent;
459: rpc_func_t *f = NULL;
460: u_char *buf = AIT_GET_BUF(&c->cli_buf);
461: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
462: int ret, wlen = sizeof(struct tagRPCCall);
463: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
464:
465: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
466: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
467: TASK_ARG(task), ts, TASK_ARG(task), 0);
468:
469: if (rpc->call_argc) {
470: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
471: if (!f) {
472: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
473:
474: rpc->call_argc ^= rpc->call_argc;
475: RPC_SET_RETURN(rpc, -1);
476: RPC_SET_ERRNO(rpc, rpc_Errno);
477: } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) {
478: rpc_SetErr(EMSGSIZE, "Message too long");
479:
480: rpc->call_argc ^= rpc->call_argc;
481: RPC_SET_RETURN(rpc, -1);
482: RPC_SET_ERRNO(rpc, rpc_Errno);
483: } else {
484: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
485: /* Go Encapsulate variables */
486: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
487: RPC_RETVARS(c));
488: if (ret == -1) {
489: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
490:
491: rpc->call_argc ^= rpc->call_argc;
492: RPC_SET_RETURN(rpc, -1);
493: RPC_SET_ERRNO(rpc, rpc_Errno);
494: } else
495: wlen += ret;
496: }
497: }
498:
499: /* Free return values */
500: ait_freeVars(&c->cli_vars);
501:
502: rpc->call_len = htonl(wlen);
503: rpc->call_io = RPC_ACK;
504:
505: /* calculate CRC */
506: rpc->call_crc ^= rpc->call_crc;
507: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
508:
509: /* send reply */
510: ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL,
511: &c->cli_sa.sa, e_addrlen(&c->cli_sa));
512: if (ret == -1) {
513: /* close connection */
514: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
515: TASK_ARG(task), 0, NULL, 0);
516: }
517:
518: taskExit(task, NULL);
519: }
520:
521: static void *
522: rxUDPPacket(sched_task_t *task)
523: {
524: rpc_srv_t *srv = TASK_ARG(task);
525: rpc_cli_t *c = NULL;
526: int len, noreply = 0, rlen;
527: u_short crc;
528: struct tagRPCCall *rpc;
529: sockaddr_t sa;
530: socklen_t salen = E_SOCKADDR_MAX;
531: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
532: ait_val_t b = AIT_VAL_INIT;
533:
534: /* receive connect packet */
535: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
536: #ifndef __linux__
537: sa.ss.ss_len = salen;
538: #endif
539: rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
540: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
541: if (rlen < sizeof(struct tagRPCCall))
542: goto end;
543: else
544: len = ntohl(rpc->call_len);
545: if (rlen < len || len > srv->srv_netbuf)
546: goto end;
547:
548: /* skip loop packet */
549: if (rpc->call_io & RPC_ACK)
550: goto end;
551:
552: /* check integrity of packet */
553: crc = ntohs(rpc->call_crc);
554: rpc->call_crc ^= rpc->call_crc;
555: if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2))
556: goto end;
557:
558: /* check RPC packet session info */
559: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session))
560: goto end;
561:
562: c = _allocClient(srv, &sa);
563: if (!c) {
564: EVERBOSE(1, "RPC client quota exceeded!");
565: usleep(2000); /* blocked client delay */
566: goto end;
567: } else {
568: memcpy(AIT_GET_BUF(&c->cli_buf), AIT_GET_BUF(&b), len);
569: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
570:
571: c->cli_sock = TASK_FD(task);
572: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
573:
574: /* armed timer for close stateless connection */
575: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
576: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
577: c, ts, c, 0);
578: }
579:
580: noreply = RPC_CHK_NOREPLY(rpc);
581:
582: /* execute RPC call */
583: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
584:
585: /* send RPC reply */
586: if (!noreply && (rpc->call_io & RPC_REQ))
587: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
588: c, TASK_FD(task), rpc, len);
589: end:
590: AIT_FREE_VAL(&b);
591: schedReadSelf(task);
592: taskExit(task, NULL);
593: }
594:
595:
596: static void *
597: txRAWPacket(sched_task_t *task)
598: {
599: rpc_cli_t *c = TASK_ARG(task);
600: rpc_srv_t *s = c->cli_parent;
601: rpc_func_t *f = NULL;
602: u_char *buf = AIT_GET_BUF(&c->cli_buf);
603: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
604: int ret, wlen = sizeof(struct tagRPCCall);
605: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
606:
607: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
608: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
609: TASK_ARG(task), ts, TASK_ARG(task), 0);
610:
611: if (rpc->call_argc) {
612: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
613: if (!f) {
614: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
615:
616: rpc->call_argc ^= rpc->call_argc;
617: RPC_SET_RETURN(rpc, -1);
618: RPC_SET_ERRNO(rpc, rpc_Errno);
619: } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) {
620: rpc_SetErr(EMSGSIZE, "Message too long");
621:
622: rpc->call_argc ^= rpc->call_argc;
623: RPC_SET_RETURN(rpc, -1);
624: RPC_SET_ERRNO(rpc, rpc_Errno);
625: } else {
626: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
627: /* Go Encapsulate variables */
628: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
629: RPC_RETVARS(c));
630: if (ret == -1) {
631: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
632:
633: rpc->call_argc ^= rpc->call_argc;
634: RPC_SET_RETURN(rpc, -1);
635: RPC_SET_ERRNO(rpc, rpc_Errno);
636: } else
637: wlen += ret;
638: }
639: }
640:
641: /* Free return values */
642: ait_freeVars(&c->cli_vars);
643:
644: rpc->call_len = htonl(wlen);
645: rpc->call_io = RPC_ACK;
646:
647: /* calculate CRC */
648: rpc->call_crc ^= rpc->call_crc;
649: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
650:
651: /* send reply */
652: ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL,
653: &c->cli_sa.sa, e_addrlen(&c->cli_sa));
654: if (ret == -1) {
655: /* close connection */
656: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
657: TASK_ARG(task), 0, NULL, 0);
658: }
659:
660: taskExit(task, NULL);
661: }
662:
663: static void *
664: rxRAWPacket(sched_task_t *task)
665: {
666: rpc_srv_t *srv = TASK_ARG(task);
667: rpc_cli_t *c = NULL;
668: int len, noreply = 0, rlen;
669: u_short crc;
670: struct tagRPCCall *rpc;
671: sockaddr_t sa;
672: socklen_t salen = E_SOCKADDR_MAX;
673: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
674: ait_val_t b = AIT_VAL_INIT;
675:
676: /* receive connect packet */
677: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
678: #ifndef __linux__
679: sa.ss.ss_len = salen;
680: #endif
681: rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
682: if (sa.sa.sa_family == AF_INET) {
683: struct ip *h;
684: h = (struct ip*) AIT_GET_BUF(&b);
685: if (rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC)
686: goto end;
687: else {
688: rlen -= sizeof(struct ip);
689: rpc = (struct tagRPCCall*) (h + 1);
690: }
691: } else {
692: #ifdef IPV6_REMOVE_HEADER
693: struct ip6_hdr *h;
694: h = (struct ip6_hdr*) AIT_GET_BUF(&b);
695: if (rlen < ntohs(h->ip6_plen) || h->ip6_nxt != IPPROTO_ERPC)
696: goto end;
697: else {
698: rlen -= sizeof(struct ip6_hdr);
699: rpc = (struct tagRPCCall*) (h + 1);
700: }
701: #else
702: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
703: #endif
704: }
705: if (rlen < sizeof(struct tagRPCCall))
706: goto end;
707: else
708: len = ntohl(rpc->call_len);
709: if (rlen < len || len > srv->srv_netbuf)
710: goto end;
711:
712: /* skip loop packet */
713: if (rpc->call_io & RPC_ACK)
714: goto end;
715:
716: /* check integrity of packet */
717: crc = ntohs(rpc->call_crc);
718: rpc->call_crc ^= rpc->call_crc;
719: if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2))
720: goto end;
721:
722: /* check RPC packet session info */
723: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session))
724: goto end;
725:
726: c = _allocClient(srv, &sa);
727: if (!c) {
728: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
729: usleep(2000); /* blocked client delay */
730: goto end;
731: } else {
732: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
733: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
734:
735: c->cli_sock = TASK_FD(task);
736: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
737:
738: /* armed timer for close stateless connection */
739: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
740: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
741: c, ts, c, 0);
742: }
743:
744: noreply = RPC_CHK_NOREPLY(rpc);
745:
746: /* execute RPC call */
747: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
748:
749: /* send RPC reply */
750: if (!noreply && (rpc->call_io & RPC_REQ))
751: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
752: c, TASK_FD(task), rpc, len);
753: end:
754: AIT_FREE_VAL(&b);
755: schedReadSelf(task);
756: taskExit(task, NULL);
757: }
758:
759:
760: static void *
761: txBPFPacket(sched_task_t *task)
762: {
763: #ifndef __linux__
764: rpc_cli_t *c = TASK_ARG(task);
765: rpc_srv_t *s = c->cli_parent;
766: rpc_func_t *f = NULL;
767: u_char *buf = AIT_GET_BUF(&c->cli_buf);
768: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
769: int ret, wlen = sizeof(struct tagRPCCall);
770: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
771: struct ether_header *eh;
772: ait_val_t b = AIT_VAL_INIT;
773:
774: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
775: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
776: TASK_ARG(task), ts, TASK_ARG(task), 0);
777:
778: if (rpc->call_argc) {
779: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
780: if (!f) {
781: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
782:
783: rpc->call_argc ^= rpc->call_argc;
784: RPC_SET_RETURN(rpc, -1);
785: RPC_SET_ERRNO(rpc, rpc_Errno);
786: } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) {
787: rpc_SetErr(EMSGSIZE, "Message too long");
788:
789: rpc->call_argc ^= rpc->call_argc;
790: RPC_SET_RETURN(rpc, -1);
791: RPC_SET_ERRNO(rpc, rpc_Errno);
792: } else {
793: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
794: /* Go Encapsulate variables */
795: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
796: RPC_RETVARS(c));
797: if (ret == -1) {
798: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
799:
800: rpc->call_argc ^= rpc->call_argc;
801: RPC_SET_RETURN(rpc, -1);
802: RPC_SET_ERRNO(rpc, rpc_Errno);
803: } else
804: wlen += ret;
805: }
806: }
807:
808: /* Free return values */
809: ait_freeVars(&RPC_RETVARS(c));
810:
811: rpc->call_len = htonl(wlen);
812: rpc->call_io = RPC_ACK;
813:
814: /* calculate CRC */
815: rpc->call_crc ^= rpc->call_crc;
816: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
817:
818: /* send reply */
819: AIT_SET_BUF(&b, NULL, wlen + ETHER_HDR_LEN);
820: eh = (struct ether_header*) AIT_GET_BUF(&b);
821: memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
822: eh->ether_type = htons(RPC_DEFPORT);
823: memcpy(eh + 1, buf, wlen);
824:
825: ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
826: AIT_FREE_VAL(&b);
827: if (ret == -1) {
828: /* close connection */
829: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
830: TASK_ARG(task), 0, NULL, 0);
831: }
832: #else
833: rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!");
834: #endif
835:
836: taskExit(task, NULL);
837: }
838:
839: static void *
840: rxBPFPacket(sched_task_t *task)
841: {
842: #ifndef __linux__
843: rpc_srv_t *srv = TASK_ARG(task);
844: rpc_cli_t *c = NULL;
845: int len, rlen, noreply;
846: u_short crc;
847: struct tagRPCCall *rpc;
848: sockaddr_t sa;
849: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
850: struct bpf_hdr *h;
851: struct ether_header *eh;
852: ait_val_t b = AIT_VAL_INIT;
853:
854: /* receive connect packet */
855: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
856: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
857: h = (struct bpf_hdr*) AIT_GET_BUF(&b);
858: rlen -= h->bh_hdrlen;
859: if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen ||
860: rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall))
861: goto end;
862: else {
863: rlen = h->bh_caplen;
864: eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
865: rlen -= ETHER_HDR_LEN;
866: rpc = (struct tagRPCCall*) (eh + 1);
867:
868: if (eh->ether_type != ntohs(RPC_DEFPORT))
869: goto end;
870: else
871: e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
872: }
873: if (rlen < sizeof(struct tagRPCCall))
874: goto end;
875: else
876: len = ntohl(rpc->call_len);
877: if (rlen < len || len > srv->srv_netbuf)
878: goto end;
879:
880: #ifdef CHECK_ETHACK
881: /* skip loop packet */
882: if (rpc->call_io & RPC_ACK)
883: goto end;
884: #endif
885:
886: /* check integrity of packet */
887: crc = ntohs(rpc->call_crc);
888: rpc->call_crc ^= rpc->call_crc;
889: if (crc != crcFletcher16((u_short*) rpc, len / 2))
890: goto end;
891:
892: /* check RPC packet session info */
893: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session))
894: goto end;
895:
896: c = _allocClient(srv, &sa);
897: if (!c) {
898: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
899: usleep(2000); /* blocked client delay */
900: goto end;
901: } else {
902: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
903: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
904:
905: c->cli_sock = TASK_FD(task);
906: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
907:
908: /* armed timer for close stateless connection */
909: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
910: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
911: c, ts, c, 0);
912: }
913:
914: noreply = RPC_CHK_NOREPLY(rpc);
915:
916: /* execute RPC call */
917: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
918:
919: /* send RPC reply */
920: if (!noreply && (rpc->call_io & RPC_REQ))
921: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
922: c, TASK_FD(task), rpc, len);
923: end:
924: AIT_FREE_VAL(&b);
925: schedReadSelf(task);
926: #else
927: rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!");
928: #endif
929:
930: taskExit(task, NULL);
931: }
932:
933:
934: static void *
935: txEXTPacket(sched_task_t *task)
936: {
937: rpc_cli_t *c = TASK_ARG(task);
938: rpc_srv_t *s = c->cli_parent;
939: rpc_func_t *f = NULL;
940: u_char *buf = AIT_GET_BUF(&c->cli_buf);
941: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
942: int ret, wlen = sizeof(struct tagRPCCall);
943: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
944:
945: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
946: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
947: TASK_ARG(task), ts, TASK_ARG(task), 0);
948:
949: if (rpc->call_argc) {
950: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
951: if (!f) {
952: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
953:
954: rpc->call_argc ^= rpc->call_argc;
955: RPC_SET_RETURN(rpc, -1);
956: RPC_SET_ERRNO(rpc, rpc_Errno);
957: } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) {
958: rpc_SetErr(EMSGSIZE, "Message too long");
959:
960: rpc->call_argc ^= rpc->call_argc;
961: RPC_SET_RETURN(rpc, -1);
962: RPC_SET_ERRNO(rpc, rpc_Errno);
963: } else {
964: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
965: /* Go Encapsulate variables */
966: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
967: RPC_RETVARS(c));
968: if (ret == -1) {
969: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
970:
971: rpc->call_argc ^= rpc->call_argc;
972: RPC_SET_RETURN(rpc, -1);
973: RPC_SET_ERRNO(rpc, rpc_Errno);
974: } else
975: wlen += ret;
976: }
977: }
978:
979: /* Free return values */
980: ait_freeVars(&RPC_RETVARS(c));
981:
982: rpc->call_len = htonl(wlen);
983: rpc->call_io = RPC_ACK;
984:
985: /* send reply */
986: ret = write(TASK_FD(task), buf, wlen);
987: if (ret == -1) {
988: /* close connection */
989: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
990: TASK_ARG(task), 0, NULL, 0);
991: }
992:
993: taskExit(task, NULL);
994: }
995:
996: static void *
997: rxEXTPacket(sched_task_t *task)
998: {
999: rpc_srv_t *srv = TASK_ARG(task);
1000: rpc_cli_t *c = NULL;
1001: int len, noreply = 0, rlen = AIT_LEN(&c->cli_buf);
1002: struct tagRPCCall *rpc;
1003: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1004: sockaddr_t sa;
1005: ait_val_t b = AIT_VAL_INIT;
1006:
1007: memset(&sa, 0, sizeof sa);
1008: /* receive connect packet */
1009: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1010: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
1011: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
1012: if (rlen < sizeof(struct tagRPCCall))
1013: goto end;
1014: else
1015: len = ntohl(rpc->call_len);
1016: if (rlen < len || len > srv->srv_netbuf)
1017: goto end;
1018:
1019: /* skip loop packet */
1020: if (rpc->call_io & RPC_ACK)
1021: goto end;
1022:
1023: /* check RPC packet session info */
1024: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session))
1025: goto end;
1026:
1027: c = _allocClient(srv, &sa);
1028: if (!c) {
1029: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
1030: usleep(2000); /* blocked client delay */
1031: goto end;
1032: } else {
1033: memcpy(AIT_GET_BUF(&c->cli_buf), AIT_GET_BUF(&b), len);
1034: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1035:
1036: c->cli_sock = TASK_FD(task);
1037:
1038: /* armed timer for close stateless connection */
1039: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
1040: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
1041: c, ts, c, 0);
1042: }
1043:
1044: noreply = RPC_CHK_NOREPLY(rpc);
1045:
1046: /* execute RPC call */
1047: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
1048:
1049: /* send RPC reply */
1050: if (!noreply && (rpc->call_io & RPC_REQ))
1051: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1052: c, TASK_FD(task), rpc, len);
1053: end:
1054: AIT_FREE_VAL(&b);
1055: schedReadSelf(task);
1056: taskExit(task, NULL);
1057: }
1058:
1059: /* ------------------------------------------------------ */
1060:
1061: void
1062: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
1063: {
1064: rpc_srv_t *s = c->cli_parent;
1065:
1066: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1067:
1068: /* free buffer */
1069: AIT_FREE_VAL(&c->cli_buf);
1070:
1071: array_Del(s->srv_blob.clients, c->cli_id, 0);
1072: if (c)
1073: e_free(c);
1074: }
1075:
1076:
1077: static void *
1078: closeBLOBClient(sched_task_t *task)
1079: {
1080: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
1081:
1082: rpc_freeBLOBCli(TASK_ARG(task));
1083:
1084: /* close client socket */
1085: shutdown(sock, SHUT_RDWR);
1086: close(sock);
1087: taskExit(task, NULL);
1088: }
1089:
1090: static void *
1091: txBLOB(sched_task_t *task)
1092: {
1093: rpc_cli_t *c = TASK_ARG(task);
1094: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1095: int wlen = sizeof(struct tagBLOBHdr);
1096:
1097: /* send reply */
1098: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
1099: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
1100: /* close blob connection */
1101: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1102: }
1103:
1104: taskExit(task, NULL);
1105: }
1106:
1107: static void *
1108: rxBLOB(sched_task_t *task)
1109: {
1110: rpc_cli_t *c = TASK_ARG(task);
1111: rpc_srv_t *s = c->cli_parent;
1112: rpc_blob_t *b;
1113: struct tagBLOBHdr blob;
1114: int rlen;
1115:
1116: memset(&blob, 0, sizeof blob);
1117: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
1118: if (rlen < 1) {
1119: /* close blob connection */
1120: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1121: taskExit(task, NULL);
1122: }
1123:
1124: /* check BLOB packet */
1125: if (rlen < sizeof(struct tagBLOBHdr)) {
1126: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1127:
1128: schedReadSelf(task);
1129: taskExit(task, NULL);
1130: }
1131:
1132: /* check RPC packet session info */
1133: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1134: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1135: blob.hdr_cmd = error;
1136: goto end;
1137: }
1138:
1139: /* Go to proceed packet ... */
1140: switch (blob.hdr_cmd) {
1141: case get:
1142: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
1143: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
1144: blob.hdr_cmd = no;
1145: RPC_SET_BLOB_RET(&blob, -1);
1146: break;
1147: } else
1148: blob.hdr_len = htonl(b->blob_len);
1149:
1150: if (rpc_srv_blobMap(s, b) != -1) {
1151: /* deliver BLOB variable to client */
1152: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1153: rpc_srv_blobUnmap(b);
1154: } else {
1155: blob.hdr_cmd = error;
1156: RPC_SET_BLOB_RET(&blob, -1);
1157: }
1158: break;
1159: case set:
1160: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
1161: ntohl(blob.hdr_ret)))) {
1162: /* set new BLOB variable for reply :) */
1163: blob.hdr_var = htonl(b->blob_var);
1164:
1165: /* receive BLOB from client */
1166: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1167: rpc_srv_blobUnmap(b);
1168: } else {
1169: blob.hdr_cmd = error;
1170: RPC_SET_BLOB_RET(&blob, -1);
1171: }
1172: break;
1173: case unset:
1174: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1175: blob.hdr_cmd = error;
1176: RPC_SET_BLOB_RET(&blob, -1);
1177: }
1178: break;
1179: default:
1180: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
1181: blob.hdr_cmd = error;
1182: RPC_SET_BLOB_RET(&blob, -1);
1183: }
1184:
1185: end:
1186: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
1187: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
1188: schedReadSelf(task);
1189: taskExit(task, NULL);
1190: }
1191:
1192: static void *
1193: flushBLOB(sched_task_t *task)
1194: {
1195: #ifdef atomic_load_acq_ptr
1196: uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
1197: #else
1198: uintptr_t sigArg = *((volatile uintptr_t*) &_glSigArg);
1199: #endif
1200: rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
1201: rpc_blob_t *b, *tmp;
1202:
1203: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1204: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1205:
1206: rpc_srv_blobFree(srv, b);
1207: e_free(b);
1208: }
1209:
1210: if (sigArg) {
1211: /* disabled kqueue support in libaitsched */
1212: struct sigaction sa;
1213:
1214: memset(&sa, 0, sizeof sa);
1215: sigemptyset(&sa.sa_mask);
1216: sa.sa_handler = (void (*)(int)) flushBLOB;
1217: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1218: sigaction(SIGFBLOB, &sa, NULL);
1219: return NULL;
1220: } else {
1221: schedSignalSelf(task);
1222: taskExit(task, NULL);
1223: }
1224: }
1225:
1226: static void *
1227: acceptBLOBClients(sched_task_t *task)
1228: {
1229: rpc_srv_t *srv = TASK_ARG(task);
1230: rpc_cli_t *c = NULL;
1231: register int i;
1232: socklen_t salen = E_SOCKADDR_MAX;
1233: int sock;
1234: #ifdef TCP_NOPUSH
1235: int n = 1;
1236: #endif
1237:
1238: /* check free slots for connect */
1239: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
1240: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1241: if (c) { /* no more free slots! */
1242: EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
1243: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
1244: shutdown(sock, SHUT_RDWR);
1245: close(sock);
1246: }
1247: goto end;
1248: }
1249:
1250: c = e_malloc(sizeof(rpc_cli_t));
1251: if (!c) {
1252: LOGERR;
1253: srv->srv_kill = srv->srv_blob.kill = 1;
1254: taskExit(task, NULL);
1255: } else {
1256: memset(c, 0, sizeof(rpc_cli_t));
1257: array_Set(srv->srv_blob.clients, i, c);
1258: c->cli_id = i;
1259: c->cli_parent = srv;
1260: }
1261:
1262: /* alloc empty buffer */
1263: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1264:
1265: /* accept client */
1266: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
1267: if (c->cli_sock == -1) {
1268: LOGERR;
1269: AIT_FREE_VAL(&c->cli_buf);
1270: array_Del(srv->srv_blob.clients, i, 42);
1271: goto end;
1272: } else {
1273: #ifdef TCP_NOPUSH
1274: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
1275: #endif
1276: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1277: fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC);
1278: }
1279:
1280: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
1281: end:
1282: schedReadSelf(task);
1283: taskExit(task, NULL);
1284: }
1285:
1286: /* ------------------------------------------------------ */
1287:
1288: /*
1289: * rpc_srv_initBLOBServer() - Init & create BLOB Server
1290: *
1291: * @srv = RPC server instance
1292: * @Port = Port for bind server, if Port == 0 default port is selected
1293: * @diskDir = Disk place for BLOB file objects
1294: * return: -1 == error or 0 bind and created BLOB server instance
1295: */
1296: int
1297: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
1298: {
1299: int n = 1;
1300: socklen_t salen;
1301:
1302: if (!srv || srv->srv_kill) {
1303: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1304: return -1;
1305: }
1306:
1307: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
1308: if (access(diskDir, R_OK | W_OK) == -1) {
1309: LOGERR;
1310: return -1;
1311: } else
1312: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1313:
1314: /* init blob list */
1315: TAILQ_INIT(&srv->srv_blob.blobs);
1316:
1317: srv->srv_blob.server.cli_parent = srv;
1318:
1319: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof srv->srv_blob.server.cli_sa);
1320: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1321: case AF_INET:
1322: srv->srv_blob.server.cli_sa.sin.sin_port =
1323: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1324: salen = sizeof srv->srv_blob.server.cli_sa.sin;
1325: break;
1326: case AF_INET6:
1327: srv->srv_blob.server.cli_sa.sin6.sin6_port =
1328: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1329: salen = sizeof srv->srv_blob.server.cli_sa.sin6;
1330: break;
1331: case AF_LOCAL:
1332: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
1333: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1334: salen = sizeof srv->srv_blob.server.cli_sa.sun;
1335: break;
1336: default:
1337: AIT_FREE_VAL(&srv->srv_blob.dir);
1338: return -1;
1339: }
1340:
1341: /* create BLOB server socket */
1342: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1343: if (srv->srv_blob.server.cli_sock == -1) {
1344: LOGERR;
1345: AIT_FREE_VAL(&srv->srv_blob.dir);
1346: return -1;
1347: }
1348: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1349: LOGERR;
1350: close(srv->srv_blob.server.cli_sock);
1351: AIT_FREE_VAL(&srv->srv_blob.dir);
1352: return -1;
1353: }
1354: n = srv->srv_netbuf;
1355: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1356: LOGERR;
1357: close(srv->srv_blob.server.cli_sock);
1358: AIT_FREE_VAL(&srv->srv_blob.dir);
1359: return -1;
1360: }
1361: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1362: LOGERR;
1363: close(srv->srv_blob.server.cli_sock);
1364: AIT_FREE_VAL(&srv->srv_blob.dir);
1365: return -1;
1366: }
1367: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, salen) == -1) {
1368: LOGERR;
1369: close(srv->srv_blob.server.cli_sock);
1370: AIT_FREE_VAL(&srv->srv_blob.dir);
1371: return -1;
1372: } else
1373: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
1374: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
1375:
1376:
1377: /* allocate pool for concurent blob clients */
1378: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1379: if (!srv->srv_blob.clients) {
1380: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1381: close(srv->srv_blob.server.cli_sock);
1382: AIT_FREE_VAL(&srv->srv_blob.dir);
1383: return -1;
1384: }
1385:
1386: /* init blob scheduler */
1387: srv->srv_blob.root = schedBegin();
1388: if (!srv->srv_blob.root) {
1389: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1390: array_Destroy(&srv->srv_blob.clients);
1391: close(srv->srv_blob.server.cli_sock);
1392: AIT_FREE_VAL(&srv->srv_blob.dir);
1393: return -1;
1394: }
1395:
1396: return 0;
1397: }
1398:
1399: /*
1400: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
1401: *
1402: * @srv = RPC Server instance
1403: * return: none
1404: */
1405: void
1406: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
1407: {
1408: if (!srv)
1409: return;
1410:
1411: srv->srv_blob.kill = 1;
1412:
1413: if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL)
1414: unlink(srv->srv_blob.server.cli_sa.sun.sun_path);
1415:
1416: schedEnd(&srv->srv_blob.root);
1417: }
1418:
1419: /*
1420: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1421: *
1422: * @srv = RPC Server instance
1423: * return: -1 error or 0 ok, infinite loop ...
1424: */
1425: int
1426: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1427: {
1428: rpc_cli_t *c;
1429: register int i;
1430: rpc_blob_t *b, *tmp;
1431: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1432:
1433: if (!srv || srv->srv_kill) {
1434: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1435: return -1;
1436: }
1437:
1438: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1439: LOGERR;
1440: return -1;
1441: }
1442:
1443: if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
1444: /* disabled kqueue support in libaitsched */
1445: struct sigaction sa;
1446:
1447: #ifdef atomic_store_rel_ptr
1448: atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
1449: #else
1450: *((volatile uintptr_t*) &_glSigArg) = (uintptr_t) srv;
1451: #endif
1452:
1453: memset(&sa, 0, sizeof sa);
1454: sigemptyset(&sa.sa_mask);
1455: sa.sa_handler = (void (*)(int)) flushBLOB;
1456: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1457: sigaction(SIGFBLOB, &sa, NULL);
1458: }
1459:
1460: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
1461: srv->srv_blob.server.cli_sock, NULL, 0)) {
1462: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1463: return -1;
1464: }
1465:
1466: schedPolling(srv->srv_blob.root, &ts, NULL);
1467: /* main rpc loop */
1468: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1469:
1470: /* detach blobs */
1471: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1472: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1473:
1474: rpc_srv_blobFree(srv, b);
1475: e_free(b);
1476: }
1477:
1478: /* close all clients connections & server socket */
1479: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
1480: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1481: if (c) {
1482: shutdown(c->cli_sock, SHUT_RDWR);
1483: close(c->cli_sock);
1484:
1485: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1486: AIT_FREE_VAL(&c->cli_buf);
1487: }
1488: array_Del(srv->srv_blob.clients, i, 42);
1489: }
1490: array_Destroy(&srv->srv_blob.clients);
1491:
1492: close(srv->srv_blob.server.cli_sock);
1493:
1494: AIT_FREE_VAL(&srv->srv_blob.dir);
1495: return 0;
1496: }
1497:
1498:
1499: /*
1500: * rpc_srv_initServer() - Init & create RPC Server
1501: *
1502: * @InstID = Instance for authentication & recognition
1503: * @concurentClients = Concurent clients at same time to this server
1504: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1505: * @csHost = Host name or address for bind server, if NULL any address
1506: * @Port = Port for bind server, if Port == 0 default port is selected
1507: * @proto = Protocol, if == 0 choose SOCK_STREAM
1508: * return: NULL == error or !=NULL bind and created RPC server instance
1509: */
1510: rpc_srv_t *
1511: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
1512: const char *csHost, u_short Port, int proto)
1513: {
1514: int n = 1;
1515: rpc_srv_t *srv = NULL;
1516: sockaddr_t sa = E_SOCKADDR_INIT;
1517: socklen_t salen;
1518:
1519: if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
1520: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1521: return NULL;
1522: }
1523: if (!Port && proto < SOCK_RAW)
1524: Port = RPC_DEFPORT;
1525: if (!(salen = e_gethostbyname(csHost, Port, &sa)))
1526: return NULL;
1527: if (!proto)
1528: proto = SOCK_STREAM;
1529: if (netBuf < RPC_MIN_BUFSIZ)
1530: netBuf = BUFSIZ;
1531: else
1532: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1533:
1534: #ifdef HAVE_SRANDOMDEV
1535: srandomdev();
1536: #else
1537: time_t tim;
1538:
1539: srandom((time(&tim) ^ getpid()));
1540: #endif
1541:
1542: srv = e_malloc(sizeof(rpc_srv_t));
1543: if (!srv) {
1544: LOGERR;
1545: return NULL;
1546: } else
1547: memset(srv, 0, sizeof(rpc_srv_t));
1548:
1549: srv->srv_proto = proto;
1550: srv->srv_netbuf = netBuf;
1551: srv->srv_session.sess_version = RPC_VERSION;
1552: srv->srv_session.sess_instance = InstID;
1553:
1554: srv->srv_server.cli_parent = srv;
1555: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1556:
1557: /* init functions */
1558: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1559: SLIST_INIT(&srv->srv_funcs);
1560: AVL_INIT(&srv->srv_funcs);
1561:
1562: /* init scheduler */
1563: srv->srv_root = schedBegin();
1564: if (!srv->srv_root) {
1565: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1566: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1567: e_free(srv);
1568: return NULL;
1569: } else
1570: schedSignalDispatch(srv->srv_root, 42);
1571:
1572: /* init pool for clients */
1573: srv->srv_clients = array_Init(concurentClients);
1574: if (!srv->srv_clients) {
1575: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1576: schedEnd(&srv->srv_root);
1577: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1578: e_free(srv);
1579: return NULL;
1580: }
1581:
1582: /* create server socket */
1583: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family,
1584: srv->srv_proto, srv->srv_proto == SOCK_RAW ? IPPROTO_ERPC : 0);
1585: if (srv->srv_server.cli_sock == -1) {
1586: LOGERR;
1587: array_Destroy(&srv->srv_clients);
1588: schedEnd(&srv->srv_root);
1589: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1590: e_free(srv);
1591: return NULL;
1592: }
1593: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1594: LOGERR;
1595: goto err;
1596: }
1597: if (srv->srv_proto == SOCK_STREAM)
1598: setsockopt(srv->srv_server.cli_sock, IPPROTO_TCP, TCP_NODELAY, &n, sizeof n);
1599: n = srv->srv_netbuf;
1600: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1601: LOGERR;
1602: goto err;
1603: }
1604: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1605: LOGERR;
1606: goto err;
1607: }
1608: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, salen) == -1) {
1609: LOGERR;
1610: goto err;
1611: } else
1612: fcntl(srv->srv_server.cli_sock, F_SETFL,
1613: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1614:
1615: rpc_register_srvPing(srv);
1616:
1617: return srv;
1618: err: /* error condition */
1619: close(srv->srv_server.cli_sock);
1620: array_Destroy(&srv->srv_clients);
1621: schedEnd(&srv->srv_root);
1622: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1623: e_free(srv);
1624: return NULL;
1625: }
1626:
1627: /*
1628: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1629: *
1630: * @psrv = RPC Server instance
1631: * return: none
1632: */
1633: void
1634: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1635: {
1636: if (!psrv || !*psrv)
1637: return;
1638:
1639: /* if send kill to blob server */
1640: rpc_srv_endBLOBServer(*psrv);
1641: /* wait for BLOB server done */
1642: while (*(&(*psrv)->srv_blob.root))
1643: usleep(1000);
1644:
1645: (*psrv)->srv_kill = 1;
1646: sleep(RPC_SCHED_POLLING);
1647:
1648: if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL)
1649: unlink((*psrv)->srv_server.cli_sa.sun.sun_path);
1650:
1651: schedEnd(&(*psrv)->srv_root);
1652:
1653: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1654: e_free(*psrv);
1655: *psrv = NULL;
1656: }
1657:
1658: /*
1659: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1660: *
1661: * @srv = RPC Server instance
1662: * return: -1 error or 0 ok, infinite loop ...
1663: */
1664: int
1665: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1666: {
1667: rpc_cli_t *c;
1668: register int i;
1669: rpc_func_t *f;
1670: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1671:
1672: if (!srv) {
1673: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1674: return -1;
1675: }
1676:
1677: if (srv->srv_proto == SOCK_STREAM)
1678: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1679: LOGERR;
1680: return -1;
1681: }
1682:
1683: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1684: srv->srv_server.cli_sock, NULL, 0)) {
1685: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1686: return -1;
1687: }
1688:
1689: schedPolling(srv->srv_root, &ts, NULL);
1690: /* main rpc loop */
1691: schedRun(srv->srv_root, &srv->srv_kill);
1692: schedSignalDispatch(srv->srv_root, 0);
1693:
1694: /* close all clients connections & server socket */
1695: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1696: c = array(srv->srv_clients, i, rpc_cli_t*);
1697: if (c) {
1698: if (srv->srv_proto == SOCK_STREAM) {
1699: shutdown(c->cli_sock, SHUT_RDWR);
1700: close(c->cli_sock);
1701: }
1702:
1703: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1704: ait_freeVars(&RPC_RETVARS(c));
1705: AIT_FREE_VAL(&c->cli_buf);
1706: }
1707: array_Del(srv->srv_clients, i, 42);
1708: }
1709: array_Destroy(&srv->srv_clients);
1710:
1711: if (srv->srv_proto != SOCK_EXT)
1712: close(srv->srv_server.cli_sock);
1713:
1714: /* detach exported calls */
1715: RPC_FUNCS_LOCK(&srv->srv_funcs);
1716: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1717: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1718:
1719: AIT_FREE_VAL(&f->func_name);
1720: e_free(f);
1721: }
1722: srv->srv_funcs.avlh_root = NULL;
1723: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1724:
1725: return 0;
1726: }
1727:
1728:
1729: /*
1730: * rpc_srv_execCall() Execute registered call from RPC server
1731: *
1732: * @cli = RPC client
1733: * @rpc = IN RPC call structure
1734: * @funcname = Execute RPC function
1735: * @args = IN RPC calling arguments from RPC client
1736: * return: -1 error, !=-1 ok
1737: */
1738: int
1739: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1740: ait_val_t funcname, array_t * __restrict args)
1741: {
1742: rpc_callback_t func;
1743:
1744: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1745: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1746: return -1;
1747: }
1748:
1749: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1750: return func(cli, rpc, args);
1751: }
1752:
1753:
1754: /*
1755: * rpc_srv_initServer2() - Init & create layer2 RPC Server
1756: *
1757: * @InstID = Instance for authentication & recognition
1758: * @concurentClients = Concurent clients at same time to this server
1759: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1760: * @csIface = Interface name for bind server, if NULL first interface on host
1761: * return: NULL == error or !=NULL bind and created RPC server instance
1762: */
1763: rpc_srv_t *
1764: rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
1765: {
1766: #ifndef __linux__
1767: int n = 1;
1768: rpc_srv_t *srv = NULL;
1769: sockaddr_t sa = E_SOCKADDR_INIT;
1770: char szIface[64], szStr[STRSIZ];
1771: register int i;
1772: struct ifreq ifr;
1773: struct bpf_insn insns[] = {
1774: BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
1775: BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
1776: BPF_STMT(BPF_RET + BPF_K, -1),
1777: BPF_STMT(BPF_RET + BPF_K, 0),
1778: };
1779: struct bpf_program fcode = {
1780: .bf_len = sizeof(insns) / sizeof(struct bpf_insn),
1781: .bf_insns = insns
1782: };
1783:
1784: if (!concurentClients) {
1785: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1786: return NULL;
1787: }
1788: if (!csIface) {
1789: if (e_get1stiface(szIface, sizeof szIface))
1790: return NULL;
1791: } else
1792: strlcpy(szIface, csIface, sizeof szIface);
1793: if (!e_getifacebyname(szIface, &sa))
1794: return NULL;
1795:
1796: #ifdef HAVE_SRANDOMDEV
1797: srandomdev();
1798: #else
1799: time_t tim;
1800:
1801: srandom((time(&tim) ^ getpid()));
1802: #endif
1803:
1804: srv = e_malloc(sizeof(rpc_srv_t));
1805: if (!srv) {
1806: LOGERR;
1807: return NULL;
1808: } else
1809: memset(srv, 0, sizeof(rpc_srv_t));
1810:
1811: srv->srv_proto = SOCK_BPF;
1812: srv->srv_netbuf = netBuf;
1813: srv->srv_session.sess_version = RPC_VERSION;
1814: srv->srv_session.sess_instance = InstID;
1815:
1816: srv->srv_server.cli_parent = srv;
1817: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1818:
1819: /* init functions */
1820: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1821: SLIST_INIT(&srv->srv_funcs);
1822: AVL_INIT(&srv->srv_funcs);
1823:
1824: /* init scheduler */
1825: srv->srv_root = schedBegin();
1826: if (!srv->srv_root) {
1827: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1828: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1829: e_free(srv);
1830: return NULL;
1831: } else
1832: schedSignalDispatch(srv->srv_root, 42);
1833:
1834: /* init pool for clients */
1835: srv->srv_clients = array_Init(concurentClients);
1836: if (!srv->srv_clients) {
1837: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1838: schedEnd(&srv->srv_root);
1839: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1840: e_free(srv);
1841: return NULL;
1842: }
1843:
1844: /* create server handler */
1845: for (i = 0; i < 10; i++) {
1846: memset(szStr, 0, sizeof szStr);
1847: snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
1848: srv->srv_server.cli_sock = open(szStr, O_RDWR);
1849: if (srv->srv_server.cli_sock > STDERR_FILENO)
1850: break;
1851: }
1852: if (srv->srv_server.cli_sock < 3) {
1853: LOGERR;
1854: array_Destroy(&srv->srv_clients);
1855: schedEnd(&srv->srv_root);
1856: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1857: e_free(srv);
1858: return NULL;
1859: }
1860:
1861: if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
1862: LOGERR;
1863: goto err;
1864: }
1865: if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
1866: LOGERR;
1867: goto err;
1868: }
1869: n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
1870: if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
1871: LOGERR;
1872: goto err;
1873: } else
1874: srv->srv_netbuf = n;
1875:
1876: memset(&ifr, 0, sizeof ifr);
1877: strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
1878: if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
1879: LOGERR;
1880: goto err;
1881: } else
1882: fcntl(srv->srv_server.cli_sock, F_SETFL,
1883: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1884:
1885: rpc_register_srvPing(srv);
1886:
1887: return srv;
1888: err: /* error condition */
1889: close(srv->srv_server.cli_sock);
1890: array_Destroy(&srv->srv_clients);
1891: schedEnd(&srv->srv_root);
1892: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1893: e_free(srv);
1894: #else
1895: rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!");
1896: #endif
1897:
1898: return NULL;
1899: }
1900:
1901: /*
1902: * rpc_srv_initServerExt() - Init & create pipe RPC Server
1903: *
1904: * @InstID = Instance for authentication & recognition
1905: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1906: * @fd = File descriptor
1907: * return: NULL == error or !=NULL bind and created RPC server instance
1908: */
1909: rpc_srv_t *
1910: rpc_srv_initServerExt(u_char InstID, int netBuf, int fd)
1911: {
1912: rpc_srv_t *srv = NULL;
1913:
1914: #ifdef HAVE_SRANDOMDEV
1915: srandomdev();
1916: #else
1917: time_t tim;
1918:
1919: srandom((time(&tim) ^ getpid()));
1920: #endif
1921:
1922: srv = e_malloc(sizeof(rpc_srv_t));
1923: if (!srv) {
1924: LOGERR;
1925: return NULL;
1926: } else
1927: memset(srv, 0, sizeof(rpc_srv_t));
1928:
1929: srv->srv_proto = SOCK_EXT;
1930: srv->srv_netbuf = (netBuf < RPC_MIN_BUFSIZ) ?
1931: getpagesize() : E_ALIGN(netBuf, 2);
1932: srv->srv_session.sess_version = RPC_VERSION;
1933: srv->srv_session.sess_instance = InstID;
1934:
1935: srv->srv_server.cli_parent = srv;
1936: srv->srv_server.cli_sock = fd;
1937:
1938: /* init functions */
1939: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1940: SLIST_INIT(&srv->srv_funcs);
1941: AVL_INIT(&srv->srv_funcs);
1942:
1943: /* init scheduler */
1944: srv->srv_root = schedBegin();
1945: if (!srv->srv_root) {
1946: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1947: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1948: e_free(srv);
1949: return NULL;
1950: } else
1951: schedSignalDispatch(srv->srv_root, 42);
1952:
1953: /* init pool for clients */
1954: srv->srv_clients = array_Init(1);
1955: if (!srv->srv_clients) {
1956: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1957: schedEnd(&srv->srv_root);
1958: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1959: e_free(srv);
1960: return NULL;
1961: }
1962:
1963: fcntl(srv->srv_server.cli_sock, F_SETFL,
1964: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1965:
1966: rpc_register_srvPing(srv);
1967:
1968: return srv;
1969: }
1970:
1971: /*
1972: * rpc_srv_Return() - Prepare IPC return answer to RPC client
1973: *
1974: * @c = RPC client
1975: * return: number of arguments in response
1976: */
1977: int
1978: rpc_srv_Return(rpc_cli_t *c)
1979: {
1980: rpc_srv_t *s = c->cli_parent;
1981: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1982: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1983:
1984: if (!RPC_CHK_NOREPLY(rpc)) {
1985: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1986: schedWrite(s->srv_root, cbProto[s->srv_proto][CB_TXPACKET], c, c->cli_sock, rpc, 0);
1987: } else
1988: rpc->call_argc ^= rpc->call_argc;
1989:
1990: return rpc->call_argc;
1991: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>