1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: srv.c,v 1.26.2.6 2015/06/28 21:40:46 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004 - 2015
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
/*
 * Forward declarations of the scheduler callbacks, grouped by transport.
 * They are needed ahead of their definitions because the cbProto table
 * below stores their addresses.
 */
49: /* SOCK_STREAM */
50: static void *acceptClients(sched_task_t *);
51: static void *closeClient(sched_task_t *);
52: static void *rxPacket(sched_task_t *);
53: static void *txPacket(sched_task_t *);
54:
55: /* SOCK_DGRAM */
56: static void *freeClient(sched_task_t *);
57: static void *rxUDPPacket(sched_task_t *);
58: static void *txUDPPacket(sched_task_t *);
59:
60: /* SOCK_RAW */
61: static void *rxRAWPacket(sched_task_t *);
62: static void *txRAWPacket(sched_task_t *);
63:
64: /* SOCK_BPF */
65: static void *rxBPFPacket(sched_task_t *);
66: static void *txBPFPacket(sched_task_t *);
67:
68: /* SOCK_EXT */
69: static void *rxEXTPacket(sched_task_t *);
70: static void *txEXTPacket(sched_task_t *);
71:
/*
 * Callback dispatch table, used in this file as cbProto[srv_proto][CB_*].
 * The row is selected by the server's srv_proto value; the columns are
 * selected with CB_CLOSECLIENT, CB_RXPACKET and CB_TXPACKET.
 * NOTE(review): judging by the initializer order the first column is the
 * handler for the listening descriptor - confirm the name of its index.
 */
72: static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
73: { acceptClients, closeClient, rxPacket, txPacket }, /* row 0: same handlers as SOCK_STREAM; presumably a filler because socket types start at 1 - TODO confirm */
74: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
75: { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket }, /* SOCK_DGRAM */
76: { rxRAWPacket, freeClient, rxRAWPacket, txRAWPacket }, /* SOCK_RAW */
77: { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket }, /* SOCK_BPF */
78: { rxEXTPacket, freeClient, rxEXTPacket, txEXTPacket } /* SOCK_EXT */
79: };
80:
81: /* Global signal argument, used when kqueue support is disabled */
82:
/* NOTE(review): not referenced in the visible part of this file - confirm it is still needed */
83: static volatile uintptr_t _glSigArg = 0;
84:
85:
86: void
87: rpc_freeCli(rpc_cli_t * __restrict c)
88: {
89: rpc_srv_t *s = c->cli_parent;
90:
91: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
92:
93: /* free buffer */
94: AIT_FREE_VAL(&c->cli_buf);
95:
96: array_Del(s->srv_clients, c->cli_id, 0);
97: if (c)
98: e_free(c);
99: }
100:
101:
102: static inline int
103: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
104: {
105: rpc_cli_t *c = NULL;
106: register int i;
107:
108: /* check free slots for connect */
109: for (i = 0; i < array_Size(srv->srv_clients) &&
110: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
111: /* check for duplicates */
112: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
113: break;
114: if (i >= array_Size(srv->srv_clients))
115: return -1; /* no more free slots! */
116:
117: return i;
118: }
119:
120: static rpc_cli_t *
121: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
122: {
123: rpc_cli_t *c = NULL;
124: int n;
125:
126: if (srv->srv_proto != SOCK_EXT)
127: n = _check4freeslot(srv, sa);
128: else
129: n = 0;
130: if (n == -1)
131: return NULL;
132: else
133: c = array(srv->srv_clients, n, rpc_cli_t*);
134:
135: if (!c) {
136: c = e_malloc(sizeof(rpc_cli_t));
137: if (!c) {
138: LOGERR;
139: srv->srv_kill = 1;
140: return NULL;
141: } else {
142: memset(c, 0, sizeof(rpc_cli_t));
143: array_Set(srv->srv_clients, n, c);
144: c->cli_id = n;
145: c->cli_parent = srv;
146: }
147:
148: /* alloc empty buffer */
149: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
150: }
151:
152: return c;
153: }
154:
155:
156: static void *
157: freeClient(sched_task_t *task)
158: {
159: rpc_freeCli(TASK_ARG(task));
160:
161: return NULL;
162: }
163:
164: static void *
165: closeClient(sched_task_t *task)
166: {
167: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
168:
169: rpc_freeCli(TASK_ARG(task));
170:
171: /* close client socket */
172: shutdown(sock, SHUT_RDWR);
173: close(sock);
174: return NULL;
175: }
176:
177: static void *
178: txPacket(sched_task_t *task)
179: {
180: rpc_cli_t *c = TASK_ARG(task);
181: rpc_srv_t *s = c->cli_parent;
182: rpc_func_t *f = NULL;
183: u_char *buf = AIT_GET_BUF(&c->cli_buf);
184: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
185: int ret, estlen, wlen = sizeof(struct tagRPCCall);
186: struct pollfd pfd;
187: #ifdef TCP_SESSION_TIMEOUT
188: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
189:
190: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
191: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
192: TASK_ARG(task), ts, TASK_ARG(task), 0);
193: #endif
194:
195: if (rpc->call_argc) {
196: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
197: if (!f) {
198: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
199:
200: rpc->call_argc ^= rpc->call_argc;
201: rpc->call_rep.ret = RPC_ERROR(-1);
202: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
203: } else {
204: /* calc estimated length */
205: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
206: if (estlen > AIT_LEN(&c->cli_buf))
207: AIT_RE_BUF(&c->cli_buf, estlen);
208: buf = AIT_GET_BUF(&c->cli_buf);
209: rpc = (struct tagRPCCall*) buf;
210:
211: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
212: /* Go Encapsulate variables */
213: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
214: RPC_RETVARS(c));
215: if (ret == -1) {
216: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
217:
218: rpc->call_argc ^= rpc->call_argc;
219: rpc->call_rep.ret = RPC_ERROR(-1);
220: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
221: } else
222: wlen += ret;
223: }
224: }
225:
226: /* Free return values */
227: ait_freeVars(&c->cli_vars);
228:
229: rpc->call_len = htonl(wlen);
230: rpc->call_io = RPC_ACK;
231:
232: #if 0
233: /* calculate CRC */
234: rpc->call_crc ^= rpc->call_crc;
235: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
236: #endif
237:
238: /* send reply */
239: pfd.fd = TASK_FD(task);
240: pfd.events = POLLOUT;
241: for (; wlen > 0; wlen -= ret, buf += ret) {
242: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
243: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
244: if (ret)
245: LOGERR;
246: else
247: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
248: /* close connection */
249: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
250: TASK_ARG(task), 0, NULL, 0);
251: return NULL;
252: }
253: ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
254: if (ret == -1) {
255: /* close connection */
256: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
257: TASK_ARG(task), 0, NULL, 0);
258: return NULL;
259: }
260: }
261:
262: return NULL;
263: }
264:
265: static void *
266: execCall(sched_task_t *task)
267: {
268: rpc_cli_t *c = TASK_ARG(task);
269: rpc_srv_t *s = c->cli_parent;
270: rpc_func_t *f = NULL;
271: array_t *arr = NULL;
272: u_char *buf = AIT_GET_BUF(&c->cli_buf);
273: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
274: int argc = rpc->call_argc;
275:
276: /* Go decapsulate variables ... */
277: if (argc) {
278: arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall),
279: AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
280: if (!arr) {
281: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
282:
283: rpc->call_argc ^= rpc->call_argc;
284: rpc->call_rep.ret = RPC_ERROR(-1);
285: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
286: return NULL;
287: }
288: } else
289: arr = NULL;
290:
291: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
292: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
293:
294: rpc->call_argc ^= rpc->call_argc;
295: rpc->call_rep.ret = RPC_ERROR(-1);
296: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
297: } else {
298: /* if client doesn't want reply */
299: argc = RPC_CHK_NOREPLY(rpc);
300: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
301: if (rpc->call_rep.ret == htonl(-1)) {
302: if (!rpc->call_rep.eno) {
303: LOGERR;
304: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
305: }
306: rpc->call_argc ^= rpc->call_argc;
307: ait_freeVars(&c->cli_vars);
308: } else {
309: rpc->call_rep.eno ^= rpc->call_rep.eno;
310: if (argc) {
311: /* without reply */
312: ait_freeVars(&c->cli_vars);
313: rpc->call_argc ^= rpc->call_argc;
314: } else {
315: /* reply */
316: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
317: }
318: }
319: }
320:
321: array_Destroy(&arr);
322: return NULL;
323: }
324:
325: static void *
326: rxPacket(sched_task_t *task)
327: {
328: rpc_cli_t *c = TASK_ARG(task);
329: rpc_srv_t *s = c->cli_parent;
330: int len, rlen, noreply, estlen;
331: #if 0
332: u_short crc;
333: #endif
334: u_char *buf = AIT_GET_BUF(&c->cli_buf);
335: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
336: struct pollfd pfd;
337: #ifdef TCP_SESSION_TIMEOUT
338: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
339:
340: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
341: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
342: TASK_ARG(task), ts, TASK_ARG(task), 0);
343: #endif
344:
345: memset(buf, 0, sizeof(struct tagRPCCall));
346: rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
347: if (rlen < sizeof(struct tagRPCCall)) {
348: /* close connection */
349: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
350: TASK_ARG(task), 0, NULL, 0);
351: return NULL;
352: } else {
353: estlen = ntohl(rpc->call_len);
354: if (estlen > AIT_LEN(&c->cli_buf))
355: AIT_RE_BUF(&c->cli_buf, estlen);
356: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
357: buf = AIT_GET_BUF(&c->cli_buf);
358: len = estlen;
359: }
360:
361: /* get next part of packet */
362: memset(buf, 0, len);
363: pfd.fd = TASK_FD(task);
364: pfd.events = POLLIN | POLLPRI;
365: for (; len > 0; len -= rlen, buf += rlen) {
366: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
367: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
368: if (rlen)
369: LOGERR;
370: else
371: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
372: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
373: TASK_ARG(task), 0, NULL, 0);
374: return NULL;
375: }
376: rlen = recv(TASK_FD(task), buf, len, 0);
377: if (rlen == -1) {
378: /* close connection */
379: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
380: TASK_ARG(task), 0, NULL, 0);
381: return NULL;
382: }
383: }
384: len = estlen;
385:
386: /* skip loop packet */
387: if (rpc->call_io & RPC_ACK) {
388: schedReadSelf(task);
389: return NULL;
390: }
391:
392: #if 0
393: /* check integrity of packet */
394: crc = ntohs(rpc->call_crc);
395: rpc->call_crc ^= rpc->call_crc;
396: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
397: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
398: return NULL;
399: }
400: #endif
401:
402: noreply = RPC_CHK_NOREPLY(rpc);
403:
404: /* check RPC packet session info */
405: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
406: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
407:
408: rpc->call_argc ^= rpc->call_argc;
409: rpc->call_rep.ret = RPC_ERROR(-1);
410: rpc->call_rep.eno = RPC_ERROR(errno);
411: } else {
412: /* execute RPC call */
413: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
414: }
415:
416: /* send RPC reply */
417: if (!noreply)
418: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
419: TASK_ARG(task), TASK_FD(task), rpc, len);
420:
421: /* lets get next packet */
422: schedReadSelf(task);
423: return NULL;
424: }
425:
426: static void *
427: acceptClients(sched_task_t *task)
428: {
429: rpc_srv_t *srv = TASK_ARG(task);
430: rpc_cli_t *c = NULL;
431: socklen_t salen = sizeof(sockaddr_t);
432: int sock;
433: #ifdef TCP_SESSION_TIMEOUT
434: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
435: #endif
436:
437: c = _allocClient(srv, NULL);
438: if (!c) {
439: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
440: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
441: shutdown(sock, SHUT_RDWR);
442: close(sock);
443: }
444: goto end;
445: }
446:
447: /* accept client */
448: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
449: if (c->cli_sock == -1) {
450: LOGERR;
451: AIT_FREE_VAL(&c->cli_buf);
452: array_Del(srv->srv_clients, c->cli_id, 42);
453: goto end;
454: } else
455: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
456:
457: #ifdef TCP_SESSION_TIMEOUT
458: /* armed timer for close stateless connection */
459: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
460: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c,
461: ts, c, 0);
462: #endif
463: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
464: c->cli_sock, NULL, 0);
465: end:
466: schedReadSelf(task);
467: return NULL;
468: }
469:
470:
471: static void *
472: txUDPPacket(sched_task_t *task)
473: {
474: rpc_cli_t *c = TASK_ARG(task);
475: rpc_srv_t *s = c->cli_parent;
476: rpc_func_t *f = NULL;
477: u_char *buf = AIT_GET_BUF(&c->cli_buf);
478: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
479: int ret, estlen, wlen = sizeof(struct tagRPCCall);
480: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
481:
482: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
483: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
484: TASK_ARG(task), ts, TASK_ARG(task), 0);
485:
486: if (rpc->call_argc) {
487: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
488: if (!f) {
489: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
490: rpc->call_argc ^= rpc->call_argc;
491: rpc->call_rep.ret = RPC_ERROR(-1);
492: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
493: } else {
494: /* calc estimated length */
495: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
496: if (estlen > AIT_LEN(&c->cli_buf))
497: AIT_RE_BUF(&c->cli_buf, estlen);
498: buf = AIT_GET_BUF(&c->cli_buf);
499: rpc = (struct tagRPCCall*) buf;
500:
501: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
502: /* Go Encapsulate variables */
503: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
504: RPC_RETVARS(c));
505: if (ret == -1) {
506: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
507: rpc->call_argc ^= rpc->call_argc;
508: rpc->call_rep.ret = RPC_ERROR(-1);
509: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
510: } else
511: wlen += ret;
512: }
513: }
514:
515: /* Free return values */
516: ait_freeVars(&c->cli_vars);
517:
518: rpc->call_len = htonl(wlen);
519: rpc->call_io = RPC_ACK;
520:
521: /* calculate CRC */
522: rpc->call_crc ^= rpc->call_crc;
523: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
524:
525: /* send reply */
526: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
527: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
528: if (ret == -1) {
529: /* close connection */
530: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
531: TASK_ARG(task), 0, NULL, 0);
532: return NULL;
533: }
534:
535: return NULL;
536: }
537:
538: static void *
539: rxUDPPacket(sched_task_t *task)
540: {
541: rpc_srv_t *srv = TASK_ARG(task);
542: rpc_cli_t *c = NULL;
543: int len, rlen, noreply;
544: u_short crc;
545: struct tagRPCCall *rpc;
546: sockaddr_t sa;
547: socklen_t salen;
548: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
549: ait_val_t b = AIT_VAL_INIT;
550:
551: /* receive connect packet */
552: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
553: salen = sa.ss.ss_len = sizeof(sockaddr_t);
554: rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
555: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
556: if (rlen < sizeof(struct tagRPCCall) || rlen < ntohl(rpc->call_len)) {
557: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
558: goto end;
559: }
560:
561: /* skip loop packet */
562: if (rpc->call_io & RPC_ACK)
563: goto end;
564:
565: c = _allocClient(srv, &sa);
566: if (!c) {
567: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
568: usleep(2000); /* blocked client delay */
569: goto end;
570: } else {
571: len = ntohl(rpc->call_len);
572: if (len > AIT_LEN(&c->cli_buf))
573: AIT_RE_BUF(&c->cli_buf, len);
574: memcpy(AIT_GET_BUF(&c->cli_buf), AIT_GET_BUF(&b), len);
575: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
576:
577: c->cli_sock = TASK_FD(task);
578: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
579:
580: /* armed timer for close stateless connection */
581: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
582: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
583: c, ts, c, 0);
584: }
585:
586: /* check integrity of packet */
587: crc = ntohs(rpc->call_crc);
588: rpc->call_crc ^= rpc->call_crc;
589: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
590: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
591: /* close connection */
592: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
593: c, 0, NULL, 0);
594: goto end;
595: }
596:
597: noreply = RPC_CHK_NOREPLY(rpc);
598:
599: /* check RPC packet session info */
600: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
601: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
602:
603: rpc->call_argc ^= rpc->call_argc;
604: rpc->call_rep.ret = RPC_ERROR(-1);
605: rpc->call_rep.eno = RPC_ERROR(errno);
606: } else {
607: /* execute RPC call */
608: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
609: }
610:
611: /* send RPC reply */
612: if (!noreply)
613: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
614: c, TASK_FD(task), rpc, len);
615: end:
616: AIT_FREE_VAL(&b);
617: schedReadSelf(task);
618: return NULL;
619: }
620:
621:
622: static void *
623: txRAWPacket(sched_task_t *task)
624: {
625: rpc_cli_t *c = TASK_ARG(task);
626: rpc_srv_t *s = c->cli_parent;
627: rpc_func_t *f = NULL;
628: u_char *buf = AIT_GET_BUF(&c->cli_buf);
629: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
630: int ret, estlen, wlen = sizeof(struct tagRPCCall);
631: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
632:
633: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
634: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
635: TASK_ARG(task), ts, TASK_ARG(task), 0);
636:
637: if (rpc->call_argc) {
638: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
639: if (!f) {
640: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
641: rpc->call_argc ^= rpc->call_argc;
642: rpc->call_rep.ret = RPC_ERROR(-1);
643: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
644: } else {
645: /* calc estimated length */
646: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
647: if (estlen > AIT_LEN(&c->cli_buf))
648: AIT_RE_BUF(&c->cli_buf, estlen);
649: buf = AIT_GET_BUF(&c->cli_buf);
650: rpc = (struct tagRPCCall*) buf;
651:
652: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
653: /* Go Encapsulate variables */
654: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
655: RPC_RETVARS(c));
656: if (ret == -1) {
657: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
658: rpc->call_argc ^= rpc->call_argc;
659: rpc->call_rep.ret = RPC_ERROR(-1);
660: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
661: } else
662: wlen += ret;
663: }
664: }
665:
666: /* Free return values */
667: ait_freeVars(&c->cli_vars);
668:
669: rpc->call_len = htonl(wlen);
670: rpc->call_io = RPC_ACK;
671:
672: /* calculate CRC */
673: rpc->call_crc ^= rpc->call_crc;
674: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
675:
676: /* send reply */
677: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
678: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
679: if (ret == -1) {
680: /* close connection */
681: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
682: TASK_ARG(task), 0, NULL, 0);
683: return NULL;
684: }
685:
686: return NULL;
687: }
688:
689: static void *
690: rxRAWPacket(sched_task_t *task)
691: {
692: rpc_srv_t *srv = TASK_ARG(task);
693: rpc_cli_t *c = NULL;
694: int len, rlen, noreply;
695: u_short crc;
696: struct tagRPCCall *rpc;
697: sockaddr_t sa;
698: socklen_t salen;
699: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
700: ait_val_t b = AIT_VAL_INIT;
701:
702: /* receive connect packet */
703: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
704: salen = sa.ss.ss_len = sizeof(sockaddr_t);
705: rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
706: if (sa.sa.sa_family == AF_INET) {
707: struct ip *h;
708: h = (struct ip*) AIT_GET_BUF(&b);
709: if (rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC)
710: goto end;
711: else {
712: rlen -= sizeof(struct ip);
713: rpc = (struct tagRPCCall*)
714: (AIT_GET_BUF(&b) + sizeof(struct ip));
715: }
716: } else {
717: struct ip6_hdr *h;
718: h = (struct ip6_hdr*) AIT_GET_BUF(&b);
719: if (rlen < (ntohs(h->ip6_plen) + sizeof(struct ip6_hdr)) ||
720: h->ip6_nxt != IPPROTO_ERPC)
721: goto end;
722: else {
723: rlen -= sizeof(struct ip6_hdr);
724: rpc = (struct tagRPCCall*)
725: (AIT_GET_BUF(&b) + sizeof(struct ip6_hdr));
726: }
727: }
728: if (rlen < sizeof(struct tagRPCCall) || rlen < ntohl(rpc->call_len)) {
729: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
730: goto end;
731: }
732:
733: /* skip loop packet */
734: if (rpc->call_io & RPC_ACK)
735: goto end;
736:
737: c = _allocClient(srv, &sa);
738: if (!c) {
739: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
740: usleep(2000); /* blocked client delay */
741: goto end;
742: } else {
743: len = ntohl(rpc->call_len);
744: if (len > AIT_LEN(&c->cli_buf))
745: AIT_RE_BUF(&c->cli_buf, len);
746: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
747: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
748:
749: c->cli_sock = TASK_FD(task);
750: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
751:
752: /* armed timer for close stateless connection */
753: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
754: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
755: c, ts, c, 0);
756: }
757:
758: /* check integrity of packet */
759: crc = ntohs(rpc->call_crc);
760: rpc->call_crc ^= rpc->call_crc;
761: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
762: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
763: /* close connection */
764: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
765: c, 0, NULL, 0);
766: goto end;
767: }
768:
769: noreply = RPC_CHK_NOREPLY(rpc);
770:
771: /* check RPC packet session info */
772: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
773: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
774:
775: rpc->call_argc ^= rpc->call_argc;
776: rpc->call_rep.ret = RPC_ERROR(-1);
777: rpc->call_rep.eno = RPC_ERROR(errno);
778: } else {
779: /* execute RPC call */
780: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
781: }
782:
783: /* send RPC reply */
784: if (!noreply)
785: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
786: c, TASK_FD(task), rpc, len);
787: end:
788: AIT_FREE_VAL(&b);
789: schedReadSelf(task);
790: return NULL;
791: }
792:
793:
794: static void *
795: txBPFPacket(sched_task_t *task)
796: {
797: rpc_cli_t *c = TASK_ARG(task);
798: rpc_srv_t *s = c->cli_parent;
799: rpc_func_t *f = NULL;
800: u_char *buf = AIT_GET_BUF(&c->cli_buf);
801: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
802: int ret, len, wlen = sizeof(struct tagRPCCall);
803: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
804: struct ether_header *eh;
805: ait_val_t b = AIT_VAL_INIT;
806:
807: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
808: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
809: TASK_ARG(task), ts, TASK_ARG(task), 0);
810:
811: if (rpc->call_argc) {
812: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
813: if (!f) {
814: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
815: rpc->call_argc ^= rpc->call_argc;
816: rpc->call_rep.ret = RPC_ERROR(-1);
817: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
818: } else {
819: /* calc estimated length */
820: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
821: if (len > AIT_LEN(&c->cli_buf))
822: AIT_RE_BUF(&c->cli_buf, len);
823: buf = AIT_GET_BUF(&c->cli_buf);
824: rpc = (struct tagRPCCall*) buf;
825:
826: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
827: /* Go Encapsulate variables */
828: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
829: RPC_RETVARS(c));
830: if (ret == -1) {
831: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
832: rpc->call_argc ^= rpc->call_argc;
833: rpc->call_rep.ret = RPC_ERROR(-1);
834: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
835: } else
836: wlen += ret;
837: }
838: }
839:
840: /* Free return values */
841: ait_freeVars(&RPC_RETVARS(c));
842:
843: rpc->call_len = htonl(wlen);
844: rpc->call_io = RPC_ACK;
845:
846: /* calculate CRC */
847: rpc->call_crc ^= rpc->call_crc;
848: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
849:
850: /* send reply */
851: AIT_SET_BUF(&b, NULL, MIN(wlen, s->srv_netbuf) + ETHER_HDR_LEN);
852: eh = (struct ether_header*) AIT_GET_BUF(&b);
853: memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
854: eh->ether_type = htons(RPC_DEFPORT);
855: memcpy(eh + 1, buf, MIN(wlen, s->srv_netbuf));
856:
857: ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
858: AIT_FREE_VAL(&b);
859: if (ret == -1) {
860: /* close connection */
861: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
862: TASK_ARG(task), 0, NULL, 0);
863: return NULL;
864: }
865:
866: return NULL;
867: }
868:
869: static void *
870: rxBPFPacket(sched_task_t *task)
871: {
872: rpc_srv_t *srv = TASK_ARG(task);
873: rpc_cli_t *c = NULL;
874: int len, rlen, noreply;
875: u_short crc;
876: struct tagRPCCall *rpc;
877: sockaddr_t sa;
878: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
879: struct bpf_hdr *h;
880: struct ether_header *eh;
881: ait_val_t b = AIT_VAL_INIT;
882:
883: /* receive connect packet */
884: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
885: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
886: h = (struct bpf_hdr*) AIT_GET_BUF(&b);
887: rlen -= h->bh_hdrlen;
888: if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen ||
889: rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) {
890: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
891: goto end;
892: } else {
893: rlen = h->bh_caplen;
894: eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
895: rlen -= ETHER_HDR_LEN;
896: rpc = (struct tagRPCCall*) (eh + 1);
897:
898: #if 0
899: /* skip loop packet */
900: if (rpc->call_io & RPC_ACK)
901: goto end;
902: #endif
903:
904: if (eh->ether_type != ntohs(RPC_DEFPORT))
905: goto end;
906: else
907: e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
908: }
909:
910: c = _allocClient(srv, &sa);
911: if (!c) {
912: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
913: usleep(2000); /* blocked client delay */
914: goto end;
915: } else {
916: len = ntohl(rpc->call_len);
917: if (len > AIT_LEN(&c->cli_buf))
918: AIT_RE_BUF(&c->cli_buf, len);
919: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
920: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
921:
922: c->cli_sock = TASK_FD(task);
923: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
924:
925: /* armed timer for close stateless connection */
926: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
927: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
928: c, ts, c, 0);
929: }
930:
931: /* check integrity of packet */
932: crc = ntohs(rpc->call_crc);
933: rpc->call_crc ^= rpc->call_crc;
934: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
935: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
936: /* close connection */
937: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
938: c, 0, NULL, 0);
939: goto end;
940: }
941:
942: noreply = RPC_CHK_NOREPLY(rpc);
943:
944: /* check RPC packet session info */
945: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
946: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
947:
948: rpc->call_argc ^= rpc->call_argc;
949: rpc->call_rep.ret = RPC_ERROR(-1);
950: rpc->call_rep.eno = RPC_ERROR(errno);
951: } else {
952: /* execute RPC call */
953: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
954: }
955:
956: /* send RPC reply */
957: if (!noreply)
958: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
959: c, TASK_FD(task), rpc, len);
960: end:
961: AIT_FREE_VAL(&b);
962: schedReadSelf(task);
963: return NULL;
964: }
965:
966:
967: static void *
968: txEXTPacket(sched_task_t *task)
969: {
970: rpc_cli_t *c = TASK_ARG(task);
971: rpc_srv_t *s = c->cli_parent;
972: rpc_func_t *f = NULL;
973: u_char *buf = AIT_GET_BUF(&c->cli_buf);
974: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
975: int ret, len, wlen = sizeof(struct tagRPCCall);
976: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
977:
978: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
979: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
980: TASK_ARG(task), ts, TASK_ARG(task), 0);
981:
982: if (rpc->call_argc) {
983: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
984: if (!f) {
985: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
986: rpc->call_argc ^= rpc->call_argc;
987: rpc->call_rep.ret = RPC_ERROR(-1);
988: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
989: } else {
990: /* calc estimated length */
991: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
992: if (len > AIT_LEN(&c->cli_buf))
993: AIT_RE_BUF(&c->cli_buf, len);
994: buf = AIT_GET_BUF(&c->cli_buf);
995: rpc = (struct tagRPCCall*) buf;
996:
997: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
998: /* Go Encapsulate variables */
999: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
1000: RPC_RETVARS(c));
1001: if (ret == -1) {
1002: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1003: rpc->call_argc ^= rpc->call_argc;
1004: rpc->call_rep.ret = RPC_ERROR(-1);
1005: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
1006: } else
1007: wlen += ret;
1008: }
1009: }
1010:
1011: /* Free return values */
1012: ait_freeVars(&RPC_RETVARS(c));
1013:
1014: rpc->call_len = htonl(wlen);
1015: rpc->call_io = RPC_ACK;
1016:
1017: /* send reply */
1018: ret = write(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf));
1019: if (ret == -1) {
1020: /* close connection */
1021: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
1022: TASK_ARG(task), 0, NULL, 0);
1023: return NULL;
1024: }
1025:
1026: return NULL;
1027: }
1028:
1029: static void *
1030: rxEXTPacket(sched_task_t *task)
1031: {
1032: rpc_srv_t *srv = TASK_ARG(task);
1033: rpc_cli_t *c = NULL;
1034: int len, rlen, noreply;
1035: struct tagRPCCall *rpc;
1036: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1037: ait_val_t b = AIT_VAL_INIT;
1038: sockaddr_t sa;
1039:
1040: memset(&sa, 0, sizeof sa);
1041: /* receive connect packet */
1042: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1043: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
1044: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
1045: if (rlen < sizeof(struct tagRPCCall) || rlen < ntohl(rpc->call_len)) {
1046: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1047: goto end;
1048: }
1049:
1050: /* skip loop packet */
1051: if (rpc->call_io & RPC_ACK)
1052: goto end;
1053:
1054: c = _allocClient(srv, &sa);
1055: if (!c) {
1056: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
1057: usleep(2000); /* blocked client delay */
1058: goto end;
1059: } else {
1060: len = ntohl(rpc->call_len);
1061: if (len > AIT_LEN(&c->cli_buf))
1062: AIT_RE_BUF(&c->cli_buf, len);
1063: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, AIT_LEN(&c->cli_buf));
1064: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1065:
1066: c->cli_sock = TASK_FD(task);
1067:
1068: /* armed timer for close stateless connection */
1069: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
1070: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
1071: c, ts, c, 0);
1072: }
1073:
1074: noreply = RPC_CHK_NOREPLY(rpc);
1075:
1076: /* check RPC packet session info */
1077: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
1078: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1079:
1080: rpc->call_argc ^= rpc->call_argc;
1081: rpc->call_rep.ret = RPC_ERROR(-1);
1082: rpc->call_rep.eno = RPC_ERROR(errno);
1083: } else {
1084: /* execute RPC call */
1085: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
1086: }
1087:
1088: /* send RPC reply */
1089: if (!noreply)
1090: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1091: c, TASK_FD(task), rpc, len);
1092: end:
1093: AIT_FREE_VAL(&b);
1094: schedReadSelf(task);
1095: return NULL;
1096: }
1097:
1098: /* ------------------------------------------------------ */
1099:
1100: void
1101: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
1102: {
1103: rpc_srv_t *s = c->cli_parent;
1104:
1105: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1106:
1107: /* free buffer */
1108: AIT_FREE_VAL(&c->cli_buf);
1109:
1110: array_Del(s->srv_blob.clients, c->cli_id, 0);
1111: if (c)
1112: e_free(c);
1113: }
1114:
1115:
1116: static void *
1117: closeBLOBClient(sched_task_t *task)
1118: {
1119: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
1120:
1121: rpc_freeBLOBCli(TASK_ARG(task));
1122:
1123: /* close client socket */
1124: shutdown(sock, SHUT_RDWR);
1125: close(sock);
1126: return NULL;
1127: }
1128:
1129: static void *
1130: txBLOB(sched_task_t *task)
1131: {
1132: rpc_cli_t *c = TASK_ARG(task);
1133: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1134: int wlen = sizeof(struct tagBLOBHdr);
1135:
1136: /* send reply */
1137: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
1138: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
1139: /* close blob connection */
1140: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1141: }
1142:
1143: return NULL;
1144: }
1145:
1146: static void *
1147: rxBLOB(sched_task_t *task)
1148: {
1149: rpc_cli_t *c = TASK_ARG(task);
1150: rpc_srv_t *s = c->cli_parent;
1151: rpc_blob_t *b;
1152: struct tagBLOBHdr blob;
1153: int rlen;
1154:
1155: memset(&blob, 0, sizeof blob);
1156: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
1157: if (rlen < 1) {
1158: /* close blob connection */
1159: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1160: return NULL;
1161: }
1162:
1163: /* check BLOB packet */
1164: if (rlen < sizeof(struct tagBLOBHdr)) {
1165: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1166:
1167: schedReadSelf(task);
1168: return NULL;
1169: }
1170:
1171: /* check RPC packet session info */
1172: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1173: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1174: blob.hdr_cmd = error;
1175: goto end;
1176: }
1177:
1178: /* Go to proceed packet ... */
1179: switch (blob.hdr_cmd) {
1180: case get:
1181: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
1182: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
1183: blob.hdr_cmd = no;
1184: blob.hdr_ret = RPC_ERROR(-1);
1185: break;
1186: } else
1187: blob.hdr_len = htonl(b->blob_len);
1188:
1189: if (rpc_srv_blobMap(s, b) != -1) {
1190: /* deliver BLOB variable to client */
1191: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1192: rpc_srv_blobUnmap(b);
1193: } else {
1194: blob.hdr_cmd = error;
1195: blob.hdr_ret = RPC_ERROR(-1);
1196: }
1197: break;
1198: case set:
1199: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
1200: ntohl(blob.hdr_ret)))) {
1201: /* set new BLOB variable for reply :) */
1202: blob.hdr_var = htonl(b->blob_var);
1203:
1204: /* receive BLOB from client */
1205: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1206: rpc_srv_blobUnmap(b);
1207: } else {
1208: blob.hdr_cmd = error;
1209: blob.hdr_ret = RPC_ERROR(-1);
1210: }
1211: break;
1212: case unset:
1213: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1214: blob.hdr_cmd = error;
1215: blob.hdr_ret = RPC_ERROR(-1);
1216: }
1217: break;
1218: default:
1219: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
1220: blob.hdr_cmd = error;
1221: blob.hdr_ret = RPC_ERROR(-1);
1222: }
1223:
1224: end:
1225: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
1226: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
1227: schedReadSelf(task);
1228: return NULL;
1229: }
1230:
1231: static void *
1232: flushBLOB(sched_task_t *task)
1233: {
1234: uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
1235: rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
1236: rpc_blob_t *b, *tmp;
1237:
1238: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1239: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1240:
1241: rpc_srv_blobFree(srv, b);
1242: e_free(b);
1243: }
1244:
1245: if (!schedSignalSelf(task)) {
1246: /* disabled kqueue support in libaitsched */
1247: struct sigaction sa;
1248:
1249: memset(&sa, 0, sizeof sa);
1250: sigemptyset(&sa.sa_mask);
1251: sa.sa_handler = (void (*)(int)) flushBLOB;
1252: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1253: sigaction(SIGFBLOB, &sa, NULL);
1254: }
1255:
1256: return NULL;
1257: }
1258:
1259: static void *
1260: acceptBLOBClients(sched_task_t *task)
1261: {
1262: rpc_srv_t *srv = TASK_ARG(task);
1263: rpc_cli_t *c = NULL;
1264: register int i;
1265: socklen_t salen = sizeof(sockaddr_t);
1266: int sock;
1267: #ifdef TCP_NOPUSH
1268: int n = 1;
1269: #endif
1270:
1271: /* check free slots for connect */
1272: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
1273: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1274: if (c) { /* no more free slots! */
1275: EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
1276: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
1277: shutdown(sock, SHUT_RDWR);
1278: close(sock);
1279: }
1280: goto end;
1281: }
1282:
1283: c = e_malloc(sizeof(rpc_cli_t));
1284: if (!c) {
1285: LOGERR;
1286: srv->srv_kill = srv->srv_blob.kill = 1;
1287: return NULL;
1288: } else {
1289: memset(c, 0, sizeof(rpc_cli_t));
1290: array_Set(srv->srv_blob.clients, i, c);
1291: c->cli_id = i;
1292: c->cli_parent = srv;
1293: }
1294:
1295: /* alloc empty buffer */
1296: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1297:
1298: /* accept client */
1299: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
1300: if (c->cli_sock == -1) {
1301: LOGERR;
1302: AIT_FREE_VAL(&c->cli_buf);
1303: array_Del(srv->srv_blob.clients, i, 42);
1304: goto end;
1305: } else {
1306: #ifdef TCP_NOPUSH
1307: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
1308: #endif
1309: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1310: }
1311:
1312: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
1313: end:
1314: schedReadSelf(task);
1315: return NULL;
1316: }
1317:
1318: /* ------------------------------------------------------ */
1319:
1320: /*
1321: * rpc_srv_initBLOBServer() - Init & create BLOB Server
1322: *
1323: * @srv = RPC server instance
1324: * @Port = Port for bind server, if Port == 0 default port is selected
1325: * @diskDir = Disk place for BLOB file objects
1326: * return: -1 == error or 0 bind and created BLOB server instance
1327: */
1328: int
1329: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
1330: {
1331: int n = 1;
1332:
1333: if (!srv || srv->srv_kill) {
1334: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1335: return -1;
1336: }
1337:
1338: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
1339: if (access(diskDir, R_OK | W_OK) == -1) {
1340: LOGERR;
1341: return -1;
1342: } else
1343: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1344:
1345: /* init blob list */
1346: TAILQ_INIT(&srv->srv_blob.blobs);
1347:
1348: srv->srv_blob.server.cli_parent = srv;
1349:
1350: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
1351: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1352: case AF_INET:
1353: srv->srv_blob.server.cli_sa.sin.sin_port =
1354: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1355: break;
1356: case AF_INET6:
1357: srv->srv_blob.server.cli_sa.sin6.sin6_port =
1358: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1359: break;
1360: case AF_LOCAL:
1361: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
1362: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1363: break;
1364: default:
1365: AIT_FREE_VAL(&srv->srv_blob.dir);
1366: return -1;
1367: }
1368:
1369: /* create BLOB server socket */
1370: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1371: if (srv->srv_blob.server.cli_sock == -1) {
1372: LOGERR;
1373: AIT_FREE_VAL(&srv->srv_blob.dir);
1374: return -1;
1375: }
1376: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1377: LOGERR;
1378: close(srv->srv_blob.server.cli_sock);
1379: AIT_FREE_VAL(&srv->srv_blob.dir);
1380: return -1;
1381: }
1382: n = srv->srv_netbuf;
1383: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1384: LOGERR;
1385: close(srv->srv_blob.server.cli_sock);
1386: AIT_FREE_VAL(&srv->srv_blob.dir);
1387: return -1;
1388: }
1389: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1390: LOGERR;
1391: close(srv->srv_blob.server.cli_sock);
1392: AIT_FREE_VAL(&srv->srv_blob.dir);
1393: return -1;
1394: }
1395: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
1396: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1397: LOGERR;
1398: close(srv->srv_blob.server.cli_sock);
1399: AIT_FREE_VAL(&srv->srv_blob.dir);
1400: return -1;
1401: } else
1402: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
1403: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
1404:
1405:
1406: /* allocate pool for concurent blob clients */
1407: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1408: if (!srv->srv_blob.clients) {
1409: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1410: close(srv->srv_blob.server.cli_sock);
1411: AIT_FREE_VAL(&srv->srv_blob.dir);
1412: return -1;
1413: }
1414:
1415: /* init blob scheduler */
1416: srv->srv_blob.root = schedBegin();
1417: if (!srv->srv_blob.root) {
1418: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1419: array_Destroy(&srv->srv_blob.clients);
1420: close(srv->srv_blob.server.cli_sock);
1421: AIT_FREE_VAL(&srv->srv_blob.dir);
1422: return -1;
1423: }
1424:
1425: return 0;
1426: }
1427:
1428: /*
1429: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
1430: *
1431: * @srv = RPC Server instance
1432: * return: none
1433: */
1434: void
1435: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
1436: {
1437: if (!srv)
1438: return;
1439:
1440: srv->srv_blob.kill = 1;
1441:
1442: schedEnd(&srv->srv_blob.root);
1443:
1444: if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL)
1445: unlink(srv->srv_blob.server.cli_sa.sun.sun_path);
1446: }
1447:
1448: /*
1449: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1450: *
1451: * @srv = RPC Server instance
1452: * return: -1 error or 0 ok, infinite loop ...
1453: */
1454: int
1455: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1456: {
1457: rpc_cli_t *c;
1458: register int i;
1459: rpc_blob_t *b, *tmp;
1460: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1461:
1462: if (!srv || srv->srv_kill) {
1463: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1464: return -1;
1465: }
1466:
1467: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1468: LOGERR;
1469: return -1;
1470: }
1471:
1472: if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
1473: /* disabled kqueue support in libaitsched */
1474: struct sigaction sa;
1475:
1476: atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
1477:
1478: memset(&sa, 0, sizeof sa);
1479: sigemptyset(&sa.sa_mask);
1480: sa.sa_handler = (void (*)(int)) flushBLOB;
1481: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1482: sigaction(SIGFBLOB, &sa, NULL);
1483: }
1484:
1485: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
1486: srv->srv_blob.server.cli_sock, NULL, 0)) {
1487: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1488: return -1;
1489: }
1490:
1491: schedPolling(srv->srv_blob.root, &ts, NULL);
1492: /* main rpc loop */
1493: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1494:
1495: /* detach blobs */
1496: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1497: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1498:
1499: rpc_srv_blobFree(srv, b);
1500: e_free(b);
1501: }
1502:
1503: /* close all clients connections & server socket */
1504: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
1505: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1506: if (c) {
1507: shutdown(c->cli_sock, SHUT_RDWR);
1508: close(c->cli_sock);
1509:
1510: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1511: AIT_FREE_VAL(&c->cli_buf);
1512: }
1513: array_Del(srv->srv_blob.clients, i, 42);
1514: }
1515: array_Destroy(&srv->srv_blob.clients);
1516:
1517: close(srv->srv_blob.server.cli_sock);
1518:
1519: AIT_FREE_VAL(&srv->srv_blob.dir);
1520: return 0;
1521: }
1522:
1523:
1524: /*
1525: * rpc_srv_initServer() - Init & create RPC Server
1526: *
1527: * @InstID = Instance for authentication & recognition
1528: * @concurentClients = Concurent clients at same time to this server
1529: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1530: * @csHost = Host name or address for bind server, if NULL any address
1531: * @Port = Port for bind server, if Port == 0 default port is selected
1532: * @proto = Protocol, if == 0 choose SOCK_STREAM
1533: * return: NULL == error or !=NULL bind and created RPC server instance
1534: */
1535: rpc_srv_t *
1536: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
1537: const char *csHost, u_short Port, int proto)
1538: {
1539: int n = 1;
1540: rpc_srv_t *srv = NULL;
1541: sockaddr_t sa = E_SOCKADDR_INIT;
1542:
1543: if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
1544: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1545: return NULL;
1546: }
1547: if (!Port && proto < SOCK_RAW)
1548: Port = RPC_DEFPORT;
1549: if (!e_gethostbyname(csHost, Port, &sa))
1550: return NULL;
1551: if (!proto)
1552: proto = SOCK_STREAM;
1553: if (netBuf < RPC_MIN_BUFSIZ)
1554: netBuf = BUFSIZ;
1555: else
1556: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1557:
1558: #ifdef HAVE_SRANDOMDEV
1559: srandomdev();
1560: #else
1561: time_t tim;
1562:
1563: srandom((time(&tim) ^ getpid()));
1564: #endif
1565:
1566: srv = e_malloc(sizeof(rpc_srv_t));
1567: if (!srv) {
1568: LOGERR;
1569: return NULL;
1570: } else
1571: memset(srv, 0, sizeof(rpc_srv_t));
1572:
1573: srv->srv_proto = proto;
1574: srv->srv_netbuf = netBuf;
1575: srv->srv_session.sess_version = RPC_VERSION;
1576: srv->srv_session.sess_instance = InstID;
1577:
1578: srv->srv_server.cli_parent = srv;
1579: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1580:
1581: /* init functions */
1582: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1583: SLIST_INIT(&srv->srv_funcs);
1584: AVL_INIT(&srv->srv_funcs);
1585:
1586: /* init scheduler */
1587: srv->srv_root = schedBegin();
1588: if (!srv->srv_root) {
1589: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1590: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1591: e_free(srv);
1592: return NULL;
1593: }
1594:
1595: /* init pool for clients */
1596: srv->srv_clients = array_Init(concurentClients);
1597: if (!srv->srv_clients) {
1598: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1599: schedEnd(&srv->srv_root);
1600: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1601: e_free(srv);
1602: return NULL;
1603: }
1604:
1605: /* create server socket */
1606: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family,
1607: srv->srv_proto, srv->srv_proto == SOCK_RAW ? IPPROTO_ERPC : 0);
1608: if (srv->srv_server.cli_sock == -1) {
1609: LOGERR;
1610: array_Destroy(&srv->srv_clients);
1611: schedEnd(&srv->srv_root);
1612: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1613: e_free(srv);
1614: return NULL;
1615: }
1616: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1617: LOGERR;
1618: goto err;
1619: }
1620: n = srv->srv_netbuf;
1621: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1622: LOGERR;
1623: goto err;
1624: }
1625: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1626: LOGERR;
1627: goto err;
1628: }
1629: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
1630: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1631: LOGERR;
1632: goto err;
1633: } else
1634: fcntl(srv->srv_server.cli_sock, F_SETFL,
1635: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1636:
1637: rpc_register_srvPing(srv);
1638:
1639: return srv;
1640: err: /* error condition */
1641: close(srv->srv_server.cli_sock);
1642: array_Destroy(&srv->srv_clients);
1643: schedEnd(&srv->srv_root);
1644: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1645: e_free(srv);
1646: return NULL;
1647: }
1648:
1649: /*
1650: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1651: *
1652: * @psrv = RPC Server instance
1653: * return: none
1654: */
1655: void
1656: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1657: {
1658: if (!psrv || !*psrv)
1659: return;
1660:
1661: /* if send kill to blob server */
1662: rpc_srv_endBLOBServer(*psrv);
1663:
1664: (*psrv)->srv_kill = 1;
1665: sleep(RPC_SCHED_POLLING);
1666:
1667: schedEnd(&(*psrv)->srv_root);
1668:
1669: if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL)
1670: unlink((*psrv)->srv_server.cli_sa.sun.sun_path);
1671:
1672: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1673: e_free(*psrv);
1674: *psrv = NULL;
1675: }
1676:
1677: /*
1678: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1679: *
1680: * @srv = RPC Server instance
1681: * return: -1 error or 0 ok, infinite loop ...
1682: */
1683: int
1684: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1685: {
1686: rpc_cli_t *c;
1687: register int i;
1688: rpc_func_t *f;
1689: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1690:
1691: if (!srv) {
1692: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1693: return -1;
1694: }
1695:
1696: if (srv->srv_proto == SOCK_STREAM)
1697: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1698: LOGERR;
1699: return -1;
1700: }
1701:
1702: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1703: srv->srv_server.cli_sock, NULL, 0)) {
1704: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1705: return -1;
1706: }
1707:
1708: schedPolling(srv->srv_root, &ts, NULL);
1709: /* main rpc loop */
1710: schedRun(srv->srv_root, &srv->srv_kill);
1711:
1712: /* close all clients connections & server socket */
1713: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1714: c = array(srv->srv_clients, i, rpc_cli_t*);
1715: if (c) {
1716: if (srv->srv_proto == SOCK_STREAM) {
1717: shutdown(c->cli_sock, SHUT_RDWR);
1718: close(c->cli_sock);
1719: }
1720:
1721: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1722: ait_freeVars(&RPC_RETVARS(c));
1723: AIT_FREE_VAL(&c->cli_buf);
1724: }
1725: array_Del(srv->srv_clients, i, 42);
1726: }
1727: array_Destroy(&srv->srv_clients);
1728:
1729: if (srv->srv_proto != SOCK_EXT)
1730: close(srv->srv_server.cli_sock);
1731:
1732: /* detach exported calls */
1733: RPC_FUNCS_LOCK(&srv->srv_funcs);
1734: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1735: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1736:
1737: AIT_FREE_VAL(&f->func_name);
1738: e_free(f);
1739: }
1740: srv->srv_funcs.avlh_root = NULL;
1741: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1742:
1743: return 0;
1744: }
1745:
1746:
1747: /*
1748: * rpc_srv_execCall() Execute registered call from RPC server
1749: *
1750: * @cli = RPC client
1751: * @rpc = IN RPC call structure
1752: * @funcname = Execute RPC function
1753: * @args = IN RPC calling arguments from RPC client
1754: * return: -1 error, !=-1 ok
1755: */
1756: int
1757: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1758: ait_val_t funcname, array_t * __restrict args)
1759: {
1760: rpc_callback_t func;
1761:
1762: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1763: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1764: return -1;
1765: }
1766:
1767: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1768: return func(cli, rpc, args);
1769: }
1770:
1771:
1772: /*
1773: * rpc_srv_initServer2() - Init & create layer2 RPC Server
1774: *
1775: * @InstID = Instance for authentication & recognition
1776: * @concurentClients = Concurent clients at same time to this server
1777: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1778: * @csIface = Interface name for bind server, if NULL first interface on host
1779: * return: NULL == error or !=NULL bind and created RPC server instance
1780: */
1781: rpc_srv_t *
1782: rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
1783: {
1784: int n = 1;
1785: rpc_srv_t *srv = NULL;
1786: sockaddr_t sa = E_SOCKADDR_INIT;
1787: char szIface[64], szStr[STRSIZ];
1788: register int i;
1789: struct ifreq ifr;
1790: struct bpf_insn insns[] = {
1791: BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
1792: BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
1793: BPF_STMT(BPF_RET + BPF_K, -1),
1794: BPF_STMT(BPF_RET + BPF_K, 0),
1795: };
1796: struct bpf_program fcode = {
1797: .bf_len = sizeof(insns) / sizeof(struct bpf_insn),
1798: .bf_insns = insns
1799: };
1800:
1801: if (!concurentClients) {
1802: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1803: return NULL;
1804: }
1805: if (!csIface) {
1806: if (e_get1stiface(szIface, sizeof szIface))
1807: return NULL;
1808: } else
1809: strlcpy(szIface, csIface, sizeof szIface);
1810: if (!e_getifacebyname(szIface, &sa))
1811: return NULL;
1812:
1813: #ifdef HAVE_SRANDOMDEV
1814: srandomdev();
1815: #else
1816: time_t tim;
1817:
1818: srandom((time(&tim) ^ getpid()));
1819: #endif
1820:
1821: srv = e_malloc(sizeof(rpc_srv_t));
1822: if (!srv) {
1823: LOGERR;
1824: return NULL;
1825: } else
1826: memset(srv, 0, sizeof(rpc_srv_t));
1827:
1828: srv->srv_proto = SOCK_BPF;
1829: srv->srv_netbuf = netBuf;
1830: srv->srv_session.sess_version = RPC_VERSION;
1831: srv->srv_session.sess_instance = InstID;
1832:
1833: srv->srv_server.cli_parent = srv;
1834: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1835:
1836: /* init functions */
1837: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1838: SLIST_INIT(&srv->srv_funcs);
1839: AVL_INIT(&srv->srv_funcs);
1840:
1841: /* init scheduler */
1842: srv->srv_root = schedBegin();
1843: if (!srv->srv_root) {
1844: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1845: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1846: e_free(srv);
1847: return NULL;
1848: }
1849:
1850: /* init pool for clients */
1851: srv->srv_clients = array_Init(concurentClients);
1852: if (!srv->srv_clients) {
1853: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1854: schedEnd(&srv->srv_root);
1855: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1856: e_free(srv);
1857: return NULL;
1858: }
1859:
1860: /* create server handler */
1861: for (i = 0; i < 10; i++) {
1862: memset(szStr, 0, sizeof szStr);
1863: snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
1864: srv->srv_server.cli_sock = open(szStr, O_RDWR);
1865: if (srv->srv_server.cli_sock > STDERR_FILENO)
1866: break;
1867: }
1868: if (srv->srv_server.cli_sock < 3) {
1869: LOGERR;
1870: array_Destroy(&srv->srv_clients);
1871: schedEnd(&srv->srv_root);
1872: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1873: e_free(srv);
1874: return NULL;
1875: }
1876:
1877: if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
1878: LOGERR;
1879: goto err;
1880: }
1881: if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
1882: LOGERR;
1883: goto err;
1884: }
1885: n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
1886: if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
1887: LOGERR;
1888: goto err;
1889: } else
1890: srv->srv_netbuf = n;
1891:
1892: memset(&ifr, 0, sizeof ifr);
1893: strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
1894: if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
1895: LOGERR;
1896: goto err;
1897: } else
1898: fcntl(srv->srv_server.cli_sock, F_SETFL,
1899: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1900:
1901: rpc_register_srvPing(srv);
1902:
1903: return srv;
1904: err: /* error condition */
1905: close(srv->srv_server.cli_sock);
1906: array_Destroy(&srv->srv_clients);
1907: schedEnd(&srv->srv_root);
1908: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1909: e_free(srv);
1910: return NULL;
1911: }
1912:
1913: /*
1914: * rpc_srv_initServerExt() - Init & create pipe RPC Server
1915: *
1916: * @InstID = Instance for authentication & recognition
1917: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1918: * @fd = File descriptor
1919: * return: NULL == error or !=NULL bind and created RPC server instance
1920: */
1921: rpc_srv_t *
1922: rpc_srv_initServerExt(u_char InstID, int netBuf, int fd)
1923: {
1924: rpc_srv_t *srv = NULL;
1925:
1926: #ifdef HAVE_SRANDOMDEV
1927: srandomdev();
1928: #else
1929: time_t tim;
1930:
1931: srandom((time(&tim) ^ getpid()));
1932: #endif
1933:
1934: srv = e_malloc(sizeof(rpc_srv_t));
1935: if (!srv) {
1936: LOGERR;
1937: return NULL;
1938: } else
1939: memset(srv, 0, sizeof(rpc_srv_t));
1940:
1941: srv->srv_proto = SOCK_EXT;
1942: srv->srv_netbuf = (netBuf < RPC_MIN_BUFSIZ) ?
1943: getpagesize() : E_ALIGN(netBuf, 2);
1944: srv->srv_session.sess_version = RPC_VERSION;
1945: srv->srv_session.sess_instance = InstID;
1946:
1947: srv->srv_server.cli_parent = srv;
1948: srv->srv_server.cli_sock = fd;
1949:
1950: /* init functions */
1951: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1952: SLIST_INIT(&srv->srv_funcs);
1953: AVL_INIT(&srv->srv_funcs);
1954:
1955: /* init scheduler */
1956: srv->srv_root = schedBegin();
1957: if (!srv->srv_root) {
1958: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1959: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1960: e_free(srv);
1961: return NULL;
1962: }
1963:
1964: /* init pool for clients */
1965: srv->srv_clients = array_Init(1);
1966: if (!srv->srv_clients) {
1967: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1968: schedEnd(&srv->srv_root);
1969: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1970: e_free(srv);
1971: return NULL;
1972: }
1973:
1974: fcntl(srv->srv_server.cli_sock, F_SETFL,
1975: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1976:
1977: rpc_register_srvPing(srv);
1978:
1979: return srv;
1980: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>