1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: srv.c,v 1.26.2.5 2015/06/25 22:39:05 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004 - 2015
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /* SOCK_STREAM */
50: static void *acceptClients(sched_task_t *);
51: static void *closeClient(sched_task_t *);
52: static void *rxPacket(sched_task_t *);
53: static void *txPacket(sched_task_t *);
54:
55: /* SOCK_DGRAM */
56: static void *freeClient(sched_task_t *);
57: static void *rxUDPPacket(sched_task_t *);
58: static void *txUDPPacket(sched_task_t *);
59:
60: /* SOCK_RAW */
61: static void *rxRAWPacket(sched_task_t *);
62: static void *txRAWPacket(sched_task_t *);
63:
64: /* SOCK_BPF */
65: static void *rxBPFPacket(sched_task_t *);
66: static void *txBPFPacket(sched_task_t *);
67:
68: /* SOCK_EXT */
69: static void *rxEXTPacket(sched_task_t *);
70: static void *txEXTPacket(sched_task_t *);
71:
72: static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
73: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
74: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
75: { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket }, /* SOCK_DGRAM */
76: { rxRAWPacket, freeClient, rxRAWPacket, txRAWPacket }, /* SOCK_RAW */
77: { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket }, /* SOCK_BPF */
78: { rxEXTPacket, freeClient, rxEXTPacket, txEXTPacket } /* SOCK_EXT */
79: };
80:
81: /* Global Signal Argument when kqueue support disabled */
82:
83: static volatile uintptr_t _glSigArg = 0;
84:
85:
86: void
87: rpc_freeCli(rpc_cli_t * __restrict c)
88: {
89: rpc_srv_t *s = c->cli_parent;
90:
91: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
92:
93: /* free buffer */
94: AIT_FREE_VAL(&c->cli_buf);
95:
96: array_Del(s->srv_clients, c->cli_id, 0);
97: if (c)
98: e_free(c);
99: }
100:
101:
102: static inline int
103: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
104: {
105: rpc_cli_t *c = NULL;
106: register int i;
107:
108: /* check free slots for connect */
109: for (i = 0; i < array_Size(srv->srv_clients) &&
110: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
111: /* check for duplicates */
112: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
113: break;
114: if (i >= array_Size(srv->srv_clients))
115: return -1; /* no more free slots! */
116:
117: return i;
118: }
119:
120: static rpc_cli_t *
121: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
122: {
123: rpc_cli_t *c = NULL;
124: int n;
125:
126: if (srv->srv_proto != SOCK_EXT)
127: n = _check4freeslot(srv, sa);
128: else
129: n = 0;
130: if (n == -1)
131: return NULL;
132: else
133: c = array(srv->srv_clients, n, rpc_cli_t*);
134:
135: if (!c) {
136: c = e_malloc(sizeof(rpc_cli_t));
137: if (!c) {
138: LOGERR;
139: srv->srv_kill = 1;
140: return NULL;
141: } else {
142: memset(c, 0, sizeof(rpc_cli_t));
143: array_Set(srv->srv_clients, n, c);
144: c->cli_id = n;
145: c->cli_parent = srv;
146: }
147:
148: /* alloc empty buffer */
149: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
150: }
151:
152: return c;
153: }
154:
155:
156: static void *
157: freeClient(sched_task_t *task)
158: {
159: rpc_freeCli(TASK_ARG(task));
160:
161: return NULL;
162: }
163:
164: static void *
165: closeClient(sched_task_t *task)
166: {
167: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
168:
169: rpc_freeCli(TASK_ARG(task));
170:
171: /* close client socket */
172: shutdown(sock, SHUT_RDWR);
173: close(sock);
174: return NULL;
175: }
176:
177: static void *
178: txPacket(sched_task_t *task)
179: {
180: rpc_cli_t *c = TASK_ARG(task);
181: rpc_srv_t *s = c->cli_parent;
182: rpc_func_t *f = NULL;
183: u_char *buf = AIT_GET_BUF(&c->cli_buf);
184: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
185: int ret, estlen, wlen = sizeof(struct tagRPCCall);
186: struct pollfd pfd;
187: #ifdef TCP_SESSION_TIMEOUT
188: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
189:
190: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
191: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
192: TASK_ARG(task), ts, TASK_ARG(task), 0);
193: #endif
194:
195: if (rpc->call_argc) {
196: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
197: if (!f) {
198: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
199:
200: rpc->call_argc ^= rpc->call_argc;
201: rpc->call_rep.ret = RPC_ERROR(-1);
202: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
203: } else {
204: /* calc estimated length */
205: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
206: if (estlen > AIT_LEN(&c->cli_buf))
207: AIT_RE_BUF(&c->cli_buf, estlen);
208: buf = AIT_GET_BUF(&c->cli_buf);
209: rpc = (struct tagRPCCall*) buf;
210:
211: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
212: /* Go Encapsulate variables */
213: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
214: RPC_RETVARS(c));
215: if (ret == -1) {
216: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
217:
218: rpc->call_argc ^= rpc->call_argc;
219: rpc->call_rep.ret = RPC_ERROR(-1);
220: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
221: } else
222: wlen += ret;
223: }
224: }
225:
226: /* Free return values */
227: ait_freeVars(&c->cli_vars);
228:
229: rpc->call_len = htonl(wlen);
230: rpc->call_io = RPC_ACK;
231:
232: #if 0
233: /* calculate CRC */
234: rpc->call_crc ^= rpc->call_crc;
235: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
236: #endif
237:
238: /* send reply */
239: pfd.fd = TASK_FD(task);
240: pfd.events = POLLOUT;
241: for (; wlen > 0; wlen -= ret, buf += ret) {
242: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
243: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
244: if (ret)
245: LOGERR;
246: else
247: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
248: /* close connection */
249: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
250: TASK_ARG(task), 0, NULL, 0);
251: return NULL;
252: }
253: ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
254: if (ret == -1) {
255: /* close connection */
256: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
257: TASK_ARG(task), 0, NULL, 0);
258: return NULL;
259: }
260: }
261:
262: return NULL;
263: }
264:
265: static void *
266: execCall(sched_task_t *task)
267: {
268: rpc_cli_t *c = TASK_ARG(task);
269: rpc_srv_t *s = c->cli_parent;
270: rpc_func_t *f = NULL;
271: array_t *arr = NULL;
272: u_char *buf = AIT_GET_BUF(&c->cli_buf);
273: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
274: int argc = rpc->call_argc;
275:
276: /* Go decapsulate variables ... */
277: if (argc) {
278: arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall),
279: AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
280: if (!arr) {
281: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
282:
283: rpc->call_argc ^= rpc->call_argc;
284: rpc->call_rep.ret = RPC_ERROR(-1);
285: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
286: return NULL;
287: }
288: } else
289: arr = NULL;
290:
291: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
292: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
293:
294: rpc->call_argc ^= rpc->call_argc;
295: rpc->call_rep.ret = RPC_ERROR(-1);
296: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
297: } else {
298: /* if client doesn't want reply */
299: argc = RPC_CHK_NOREPLY(rpc);
300: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
301: if (rpc->call_rep.ret == htonl(-1)) {
302: if (!rpc->call_rep.eno) {
303: LOGERR;
304: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
305: }
306: rpc->call_argc ^= rpc->call_argc;
307: ait_freeVars(&c->cli_vars);
308: } else {
309: rpc->call_rep.eno ^= rpc->call_rep.eno;
310: if (argc) {
311: /* without reply */
312: ait_freeVars(&c->cli_vars);
313: rpc->call_argc ^= rpc->call_argc;
314: } else {
315: /* reply */
316: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
317: }
318: }
319: }
320:
321: array_Destroy(&arr);
322: return NULL;
323: }
324:
325: static void *
326: rxPacket(sched_task_t *task)
327: {
328: rpc_cli_t *c = TASK_ARG(task);
329: rpc_srv_t *s = c->cli_parent;
330: int len, rlen, noreply, estlen;
331: #if 0
332: u_short crc;
333: #endif
334: u_char *buf = AIT_GET_BUF(&c->cli_buf);
335: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
336: struct pollfd pfd;
337: #ifdef TCP_SESSION_TIMEOUT
338: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
339:
340: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
341: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
342: TASK_ARG(task), ts, TASK_ARG(task), 0);
343: #endif
344:
345: memset(buf, 0, sizeof(struct tagRPCCall));
346: rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
347: if (rlen < sizeof(struct tagRPCCall)) {
348: /* close connection */
349: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
350: TASK_ARG(task), 0, NULL, 0);
351: return NULL;
352: } else {
353: estlen = ntohl(rpc->call_len);
354: if (estlen > AIT_LEN(&c->cli_buf))
355: AIT_RE_BUF(&c->cli_buf, estlen);
356: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
357: buf = AIT_GET_BUF(&c->cli_buf);
358: len = estlen;
359: }
360:
361: /* get next part of packet */
362: memset(buf, 0, len);
363: pfd.fd = TASK_FD(task);
364: pfd.events = POLLIN | POLLPRI;
365: for (; len > 0; len -= rlen, buf += rlen) {
366: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
367: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
368: if (rlen)
369: LOGERR;
370: else
371: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
372: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
373: TASK_ARG(task), 0, NULL, 0);
374: return NULL;
375: }
376: rlen = recv(TASK_FD(task), buf, len, 0);
377: if (rlen == -1) {
378: /* close connection */
379: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
380: TASK_ARG(task), 0, NULL, 0);
381: return NULL;
382: }
383: }
384: len = estlen;
385:
386: /* skip loop packet */
387: if (rpc->call_io & RPC_ACK) {
388: schedReadSelf(task);
389: return NULL;
390: }
391:
392: #if 0
393: /* check integrity of packet */
394: crc = ntohs(rpc->call_crc);
395: rpc->call_crc ^= rpc->call_crc;
396: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
397: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
398: return NULL;
399: }
400: #endif
401:
402: noreply = RPC_CHK_NOREPLY(rpc);
403:
404: /* check RPC packet session info */
405: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
406: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
407:
408: rpc->call_argc ^= rpc->call_argc;
409: rpc->call_rep.ret = RPC_ERROR(-1);
410: rpc->call_rep.eno = RPC_ERROR(errno);
411: } else {
412: /* execute RPC call */
413: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
414: }
415:
416: /* send RPC reply */
417: if (!noreply)
418: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
419: TASK_ARG(task), TASK_FD(task), rpc, len);
420:
421: /* lets get next packet */
422: schedReadSelf(task);
423: return NULL;
424: }
425:
426: static void *
427: acceptClients(sched_task_t *task)
428: {
429: rpc_srv_t *srv = TASK_ARG(task);
430: rpc_cli_t *c = NULL;
431: socklen_t salen = sizeof(sockaddr_t);
432: int sock;
433: #ifdef TCP_SESSION_TIMEOUT
434: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
435: #endif
436:
437: c = _allocClient(srv, NULL);
438: if (!c) {
439: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
440: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
441: shutdown(sock, SHUT_RDWR);
442: close(sock);
443: }
444: goto end;
445: }
446:
447: /* accept client */
448: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
449: if (c->cli_sock == -1) {
450: LOGERR;
451: AIT_FREE_VAL(&c->cli_buf);
452: array_Del(srv->srv_clients, c->cli_id, 42);
453: goto end;
454: } else
455: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
456:
457: #ifdef TCP_SESSION_TIMEOUT
458: /* armed timer for close stateless connection */
459: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
460: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c,
461: ts, c, 0);
462: #endif
463: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
464: c->cli_sock, NULL, 0);
465: end:
466: schedReadSelf(task);
467: return NULL;
468: }
469:
470:
471: static void *
472: txUDPPacket(sched_task_t *task)
473: {
474: rpc_cli_t *c = TASK_ARG(task);
475: rpc_srv_t *s = c->cli_parent;
476: rpc_func_t *f = NULL;
477: u_char *buf = AIT_GET_BUF(&c->cli_buf);
478: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
479: int ret, estlen, wlen = sizeof(struct tagRPCCall);
480: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
481: struct pollfd pfd;
482:
483: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
484: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
485: TASK_ARG(task), ts, TASK_ARG(task), 0);
486:
487: if (rpc->call_argc) {
488: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
489: if (!f) {
490: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
491: rpc->call_argc ^= rpc->call_argc;
492: rpc->call_rep.ret = RPC_ERROR(-1);
493: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
494: } else {
495: /* calc estimated length */
496: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
497: if (estlen > AIT_LEN(&c->cli_buf))
498: AIT_RE_BUF(&c->cli_buf, estlen);
499: buf = AIT_GET_BUF(&c->cli_buf);
500: rpc = (struct tagRPCCall*) buf;
501:
502: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
503: /* Go Encapsulate variables */
504: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
505: RPC_RETVARS(c));
506: if (ret == -1) {
507: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
508: rpc->call_argc ^= rpc->call_argc;
509: rpc->call_rep.ret = RPC_ERROR(-1);
510: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
511: } else
512: wlen += ret;
513: }
514: }
515:
516: /* Free return values */
517: ait_freeVars(&c->cli_vars);
518:
519: rpc->call_len = htonl(wlen);
520: rpc->call_io = RPC_ACK;
521:
522: /* calculate CRC */
523: rpc->call_crc ^= rpc->call_crc;
524: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
525:
526: /* send reply */
527: pfd.fd = TASK_FD(task);
528: pfd.events = POLLOUT;
529: for (; wlen > 0; wlen -= ret, buf += ret) {
530: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
531: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
532: if (ret)
533: LOGERR;
534: else
535: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
536: /* close connection */
537: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
538: TASK_ARG(task), 0, NULL, 0);
539: return NULL;
540: }
541: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
542: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
543: if (ret == -1) {
544: /* close connection */
545: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
546: TASK_ARG(task), 0, NULL, 0);
547: return NULL;
548: }
549: }
550:
551: return NULL;
552: }
553:
554: static void *
555: rxUDPPacket(sched_task_t *task)
556: {
557: rpc_srv_t *srv = TASK_ARG(task);
558: rpc_cli_t *c = NULL;
559: int len, rlen, noreply, estlen;
560: u_short crc;
561: u_char *buf, b[sizeof(struct tagRPCCall)];
562: struct tagRPCCall *rpc = (struct tagRPCCall*) b;
563: sockaddr_t sa;
564: socklen_t salen;
565: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
566: struct pollfd pfd;
567:
568: /* receive connect packet */
569: salen = sa.ss.ss_len = sizeof(sockaddr_t);
570: rlen = recvfrom(TASK_FD(task), b, sizeof b, MSG_PEEK, &sa.sa, &salen);
571: if (rlen < sizeof(struct tagRPCCall)) {
572: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
573: goto end;
574: }
575:
576: c = _allocClient(srv, &sa);
577: if (!c) {
578: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
579: usleep(2000); /* blocked client delay */
580: goto end;
581: } else {
582: estlen = ntohl(rpc->call_len);
583: if (estlen > AIT_LEN(&c->cli_buf))
584: AIT_RE_BUF(&c->cli_buf, estlen);
585: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
586: buf = AIT_GET_BUF(&c->cli_buf);
587: len = estlen;
588:
589: c->cli_sock = TASK_FD(task);
590: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
591:
592: /* armed timer for close stateless connection */
593: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
594: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
595: c, ts, c, 0);
596: }
597:
598: /* get next part of packet */
599: memset(buf, 0, len);
600: pfd.fd = TASK_FD(task);
601: pfd.events = POLLIN | POLLPRI;
602: for (; len > 0; len -= rlen, buf += rlen) {
603: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
604: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
605: if (rlen)
606: LOGERR;
607: else
608: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
609: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
610: c, 0, NULL, 0);
611: goto end;
612: }
613: salen = sa.ss.ss_len = sizeof(sockaddr_t);
614: rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);
615: if (rlen == -1) {
616: /* close connection */
617: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
618: c, 0, NULL, 0);
619: goto end;
620: }
621: if (e_addrcmp(&c->cli_sa, &sa, 42))
622: rlen ^= rlen; /* skip if arrive from different address */
623: }
624: len = estlen;
625:
626: /* skip loop packet */
627: if (rpc->call_io & RPC_ACK)
628: goto end;
629:
630: /* check integrity of packet */
631: crc = ntohs(rpc->call_crc);
632: rpc->call_crc ^= rpc->call_crc;
633: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
634: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
635: /* close connection */
636: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
637: c, 0, NULL, 0);
638: goto end;
639: }
640:
641: noreply = RPC_CHK_NOREPLY(rpc);
642:
643: /* check RPC packet session info */
644: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
645: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
646:
647: rpc->call_argc ^= rpc->call_argc;
648: rpc->call_rep.ret = RPC_ERROR(-1);
649: rpc->call_rep.eno = RPC_ERROR(errno);
650: } else {
651: /* execute RPC call */
652: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
653: }
654:
655: /* send RPC reply */
656: if (!noreply)
657: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
658: c, TASK_FD(task), rpc, len);
659: end:
660: schedReadSelf(task);
661: return NULL;
662: }
663:
664:
665: static void *
666: txRAWPacket(sched_task_t *task)
667: {
668: rpc_cli_t *c = TASK_ARG(task);
669: rpc_srv_t *s = c->cli_parent;
670: rpc_func_t *f = NULL;
671: u_char *buf = AIT_GET_BUF(&c->cli_buf);
672: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
673: int ret, estlen, wlen = sizeof(struct tagRPCCall);
674: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
675:
676: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
677: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
678: TASK_ARG(task), ts, TASK_ARG(task), 0);
679:
680: if (rpc->call_argc) {
681: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
682: if (!f) {
683: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
684: rpc->call_argc ^= rpc->call_argc;
685: rpc->call_rep.ret = RPC_ERROR(-1);
686: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
687: } else {
688: /* calc estimated length */
689: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
690: if (estlen > AIT_LEN(&c->cli_buf))
691: AIT_RE_BUF(&c->cli_buf, estlen);
692: buf = AIT_GET_BUF(&c->cli_buf);
693: rpc = (struct tagRPCCall*) buf;
694:
695: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
696: /* Go Encapsulate variables */
697: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
698: RPC_RETVARS(c));
699: if (ret == -1) {
700: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
701: rpc->call_argc ^= rpc->call_argc;
702: rpc->call_rep.ret = RPC_ERROR(-1);
703: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
704: } else
705: wlen += ret;
706: }
707: }
708:
709: /* Free return values */
710: ait_freeVars(&c->cli_vars);
711:
712: rpc->call_len = htonl(wlen);
713: rpc->call_io = RPC_ACK;
714:
715: /* calculate CRC */
716: rpc->call_crc ^= rpc->call_crc;
717: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
718:
719: /* send reply */
720: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
721: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
722: if (ret == -1) {
723: /* close connection */
724: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
725: TASK_ARG(task), 0, NULL, 0);
726: return NULL;
727: }
728:
729: return NULL;
730: }
731:
732: static void *
733: rxRAWPacket(sched_task_t *task)
734: {
735: rpc_srv_t *srv = TASK_ARG(task);
736: rpc_cli_t *c = NULL;
737: int len, rlen, noreply;
738: u_short crc;
739: struct tagRPCCall *rpc;
740: sockaddr_t sa;
741: socklen_t salen;
742: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
743: ait_val_t b = AIT_VAL_INIT;
744:
745: /* receive connect packet */
746: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
747: salen = sa.ss.ss_len = sizeof(sockaddr_t);
748: rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
749: if (sa.sa.sa_family == AF_INET) {
750: struct ip *h;
751: h = (struct ip*) AIT_GET_BUF(&b);
752: if (rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC)
753: goto end;
754: else {
755: rlen -= sizeof(struct ip);
756: rpc = (struct tagRPCCall*)
757: (AIT_GET_BUF(&b) + sizeof(struct ip));
758: }
759: } else {
760: struct ip6_hdr *h;
761: h = (struct ip6_hdr*) AIT_GET_BUF(&b);
762: if (rlen < (ntohs(h->ip6_plen) + sizeof(struct ip6_hdr)) ||
763: h->ip6_nxt != IPPROTO_ERPC)
764: goto end;
765: else {
766: rlen -= sizeof(struct ip6_hdr);
767: rpc = (struct tagRPCCall*)
768: (AIT_GET_BUF(&b) + sizeof(struct ip6_hdr));
769: }
770: }
771: if (rlen < sizeof(struct tagRPCCall)) {
772: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
773: goto end;
774: }
775:
776: /* skip loop packet */
777: if (rpc->call_io & RPC_ACK)
778: goto end;
779:
780: c = _allocClient(srv, &sa);
781: if (!c) {
782: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
783: usleep(2000); /* blocked client delay */
784: goto end;
785: } else {
786: len = ntohl(rpc->call_len);
787: if (len > AIT_LEN(&c->cli_buf))
788: AIT_RE_BUF(&c->cli_buf, len);
789: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
790: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
791:
792: c->cli_sock = TASK_FD(task);
793: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
794:
795: /* armed timer for close stateless connection */
796: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
797: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
798: c, ts, c, 0);
799: }
800:
801: /* check integrity of packet */
802: crc = ntohs(rpc->call_crc);
803: rpc->call_crc ^= rpc->call_crc;
804: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
805: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
806: /* close connection */
807: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
808: c, 0, NULL, 0);
809: goto end;
810: }
811:
812: noreply = RPC_CHK_NOREPLY(rpc);
813:
814: /* check RPC packet session info */
815: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
816: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
817:
818: rpc->call_argc ^= rpc->call_argc;
819: rpc->call_rep.ret = RPC_ERROR(-1);
820: rpc->call_rep.eno = RPC_ERROR(errno);
821: } else {
822: /* execute RPC call */
823: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
824: }
825:
826: /* send RPC reply */
827: if (!noreply)
828: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
829: c, TASK_FD(task), rpc, len);
830: end:
831: AIT_FREE_VAL(&b);
832: schedReadSelf(task);
833: return NULL;
834: }
835:
836:
837: static void *
838: txBPFPacket(sched_task_t *task)
839: {
840: rpc_cli_t *c = TASK_ARG(task);
841: rpc_srv_t *s = c->cli_parent;
842: rpc_func_t *f = NULL;
843: u_char *buf = AIT_GET_BUF(&c->cli_buf);
844: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
845: int ret, len, wlen = sizeof(struct tagRPCCall);
846: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
847: struct ether_header *eh;
848: ait_val_t b = AIT_VAL_INIT;
849:
850: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
851: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
852: TASK_ARG(task), ts, TASK_ARG(task), 0);
853:
854: if (rpc->call_argc) {
855: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
856: if (!f) {
857: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
858: rpc->call_argc ^= rpc->call_argc;
859: rpc->call_rep.ret = RPC_ERROR(-1);
860: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
861: } else {
862: /* calc estimated length */
863: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
864: if (len > AIT_LEN(&c->cli_buf))
865: AIT_RE_BUF(&c->cli_buf, len);
866: buf = AIT_GET_BUF(&c->cli_buf);
867: rpc = (struct tagRPCCall*) buf;
868:
869: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
870: /* Go Encapsulate variables */
871: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
872: RPC_RETVARS(c));
873: if (ret == -1) {
874: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
875: rpc->call_argc ^= rpc->call_argc;
876: rpc->call_rep.ret = RPC_ERROR(-1);
877: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
878: } else
879: wlen += ret;
880: }
881: }
882:
883: /* Free return values */
884: ait_freeVars(&RPC_RETVARS(c));
885:
886: rpc->call_len = htonl(wlen);
887: rpc->call_io = RPC_ACK;
888:
889: /* calculate CRC */
890: rpc->call_crc ^= rpc->call_crc;
891: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
892:
893: /* send reply */
894: AIT_SET_BUF(&b, NULL, MIN(wlen, s->srv_netbuf) + ETHER_HDR_LEN);
895: eh = (struct ether_header*) AIT_GET_BUF(&b);
896: memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
897: eh->ether_type = htons(RPC_DEFPORT);
898: memcpy(eh + 1, buf, MIN(wlen, s->srv_netbuf));
899:
900: ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
901: AIT_FREE_VAL(&b);
902: if (ret == -1) {
903: /* close connection */
904: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
905: TASK_ARG(task), 0, NULL, 0);
906: return NULL;
907: }
908:
909: return NULL;
910: }
911:
912: static void *
913: rxBPFPacket(sched_task_t *task)
914: {
915: rpc_srv_t *srv = TASK_ARG(task);
916: rpc_cli_t *c = NULL;
917: int len, rlen, noreply;
918: u_short crc;
919: struct tagRPCCall *rpc;
920: sockaddr_t sa;
921: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
922: struct bpf_hdr *h;
923: struct ether_header *eh;
924: ait_val_t b = AIT_VAL_INIT;
925:
926: /* receive connect packet */
927: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
928: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
929: h = (struct bpf_hdr*) AIT_GET_BUF(&b);
930: rlen -= h->bh_hdrlen;
931: if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen ||
932: rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) {
933: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
934: goto end;
935: } else {
936: rlen = h->bh_caplen;
937: eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
938: rlen -= ETHER_HDR_LEN;
939: rpc = (struct tagRPCCall*) (eh + 1);
940:
941: #if 0
942: /* skip loop packet */
943: if (rpc->call_io & RPC_ACK)
944: goto end;
945: #endif
946:
947: if (eh->ether_type != ntohs(RPC_DEFPORT))
948: goto end;
949: else
950: e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
951: }
952:
953: c = _allocClient(srv, &sa);
954: if (!c) {
955: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
956: usleep(2000); /* blocked client delay */
957: goto end;
958: } else {
959: len = ntohl(rpc->call_len);
960: if (len > AIT_LEN(&c->cli_buf))
961: AIT_RE_BUF(&c->cli_buf, len);
962: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
963: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
964:
965: c->cli_sock = TASK_FD(task);
966: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
967:
968: /* armed timer for close stateless connection */
969: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
970: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
971: c, ts, c, 0);
972: }
973:
974: /* check integrity of packet */
975: crc = ntohs(rpc->call_crc);
976: rpc->call_crc ^= rpc->call_crc;
977: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
978: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
979: /* close connection */
980: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
981: c, 0, NULL, 0);
982: goto end;
983: }
984:
985: noreply = RPC_CHK_NOREPLY(rpc);
986:
987: /* check RPC packet session info */
988: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
989: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
990:
991: rpc->call_argc ^= rpc->call_argc;
992: rpc->call_rep.ret = RPC_ERROR(-1);
993: rpc->call_rep.eno = RPC_ERROR(errno);
994: } else {
995: /* execute RPC call */
996: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
997: }
998:
999: /* send RPC reply */
1000: if (!noreply)
1001: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1002: c, TASK_FD(task), rpc, len);
1003: end:
1004: AIT_FREE_VAL(&b);
1005: schedReadSelf(task);
1006: return NULL;
1007: }
1008:
1009:
1010: static void *
1011: txEXTPacket(sched_task_t *task)
1012: {
1013: rpc_cli_t *c = TASK_ARG(task);
1014: rpc_srv_t *s = c->cli_parent;
1015: rpc_func_t *f = NULL;
1016: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1017: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1018: int ret, len, wlen = sizeof(struct tagRPCCall);
1019: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1020:
1021: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
1022: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
1023: TASK_ARG(task), ts, TASK_ARG(task), 0);
1024:
1025: if (rpc->call_argc) {
1026: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1027: if (!f) {
1028: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1029: rpc->call_argc ^= rpc->call_argc;
1030: rpc->call_rep.ret = RPC_ERROR(-1);
1031: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
1032: } else {
1033: /* calc estimated length */
1034: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
1035: if (len > AIT_LEN(&c->cli_buf))
1036: AIT_RE_BUF(&c->cli_buf, len);
1037: buf = AIT_GET_BUF(&c->cli_buf);
1038: rpc = (struct tagRPCCall*) buf;
1039:
1040: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1041: /* Go Encapsulate variables */
1042: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
1043: RPC_RETVARS(c));
1044: if (ret == -1) {
1045: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1046: rpc->call_argc ^= rpc->call_argc;
1047: rpc->call_rep.ret = RPC_ERROR(-1);
1048: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
1049: } else
1050: wlen += ret;
1051: }
1052: }
1053:
1054: /* Free return values */
1055: ait_freeVars(&RPC_RETVARS(c));
1056:
1057: rpc->call_len = htonl(wlen);
1058: rpc->call_io = RPC_ACK;
1059:
1060: /* send reply */
1061: ret = write(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf));
1062: if (ret == -1) {
1063: /* close connection */
1064: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
1065: TASK_ARG(task), 0, NULL, 0);
1066: return NULL;
1067: }
1068:
1069: return NULL;
1070: }
1071:
1072: static void *
1073: rxEXTPacket(sched_task_t *task)
1074: {
1075: rpc_srv_t *srv = TASK_ARG(task);
1076: rpc_cli_t *c = NULL;
1077: int len, rlen, noreply;
1078: struct tagRPCCall *rpc;
1079: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1080: ait_val_t b = AIT_VAL_INIT;
1081: sockaddr_t sa;
1082:
1083: memset(&sa, 0, sizeof sa);
1084: /* receive connect packet */
1085: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1086: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
1087: if (rlen < sizeof(struct tagRPCCall)) {
1088: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1089: goto end;
1090: } else {
1091: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
1092:
1093: /* skip loop packet */
1094: if (rpc->call_io & RPC_ACK)
1095: goto end;
1096: }
1097:
1098: c = _allocClient(srv, &sa);
1099: if (!c) {
1100: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
1101: usleep(2000); /* blocked client delay */
1102: goto end;
1103: } else {
1104: len = ntohl(rpc->call_len);
1105: if (len > AIT_LEN(&c->cli_buf))
1106: AIT_RE_BUF(&c->cli_buf, len);
1107: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, AIT_LEN(&c->cli_buf));
1108: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1109:
1110: c->cli_sock = TASK_FD(task);
1111:
1112: /* armed timer for close stateless connection */
1113: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
1114: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
1115: c, ts, c, 0);
1116: }
1117:
1118: noreply = RPC_CHK_NOREPLY(rpc);
1119:
1120: /* check RPC packet session info */
1121: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
1122: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1123:
1124: rpc->call_argc ^= rpc->call_argc;
1125: rpc->call_rep.ret = RPC_ERROR(-1);
1126: rpc->call_rep.eno = RPC_ERROR(errno);
1127: } else {
1128: /* execute RPC call */
1129: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
1130: }
1131:
1132: /* send RPC reply */
1133: if (!noreply)
1134: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1135: c, TASK_FD(task), rpc, len);
1136: end:
1137: AIT_FREE_VAL(&b);
1138: schedReadSelf(task);
1139: return NULL;
1140: }
1141:
1142: /* ------------------------------------------------------ */
1143:
1144: void
1145: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
1146: {
1147: rpc_srv_t *s = c->cli_parent;
1148:
1149: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1150:
1151: /* free buffer */
1152: AIT_FREE_VAL(&c->cli_buf);
1153:
1154: array_Del(s->srv_blob.clients, c->cli_id, 0);
1155: if (c)
1156: e_free(c);
1157: }
1158:
1159:
1160: static void *
1161: closeBLOBClient(sched_task_t *task)
1162: {
1163: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
1164:
1165: rpc_freeBLOBCli(TASK_ARG(task));
1166:
1167: /* close client socket */
1168: shutdown(sock, SHUT_RDWR);
1169: close(sock);
1170: return NULL;
1171: }
1172:
1173: static void *
1174: txBLOB(sched_task_t *task)
1175: {
1176: rpc_cli_t *c = TASK_ARG(task);
1177: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1178: int wlen = sizeof(struct tagBLOBHdr);
1179:
1180: /* send reply */
1181: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
1182: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
1183: /* close blob connection */
1184: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1185: }
1186:
1187: return NULL;
1188: }
1189:
1190: static void *
1191: rxBLOB(sched_task_t *task)
1192: {
1193: rpc_cli_t *c = TASK_ARG(task);
1194: rpc_srv_t *s = c->cli_parent;
1195: rpc_blob_t *b;
1196: struct tagBLOBHdr blob;
1197: int rlen;
1198:
1199: memset(&blob, 0, sizeof blob);
1200: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
1201: if (rlen < 1) {
1202: /* close blob connection */
1203: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1204: return NULL;
1205: }
1206:
1207: /* check BLOB packet */
1208: if (rlen < sizeof(struct tagBLOBHdr)) {
1209: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1210:
1211: schedReadSelf(task);
1212: return NULL;
1213: }
1214:
1215: /* check RPC packet session info */
1216: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1217: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1218: blob.hdr_cmd = error;
1219: goto end;
1220: }
1221:
1222: /* Go to proceed packet ... */
1223: switch (blob.hdr_cmd) {
1224: case get:
1225: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
1226: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
1227: blob.hdr_cmd = no;
1228: blob.hdr_ret = RPC_ERROR(-1);
1229: break;
1230: } else
1231: blob.hdr_len = htonl(b->blob_len);
1232:
1233: if (rpc_srv_blobMap(s, b) != -1) {
1234: /* deliver BLOB variable to client */
1235: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1236: rpc_srv_blobUnmap(b);
1237: } else {
1238: blob.hdr_cmd = error;
1239: blob.hdr_ret = RPC_ERROR(-1);
1240: }
1241: break;
1242: case set:
1243: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
1244: ntohl(blob.hdr_ret)))) {
1245: /* set new BLOB variable for reply :) */
1246: blob.hdr_var = htonl(b->blob_var);
1247:
1248: /* receive BLOB from client */
1249: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1250: rpc_srv_blobUnmap(b);
1251: } else {
1252: blob.hdr_cmd = error;
1253: blob.hdr_ret = RPC_ERROR(-1);
1254: }
1255: break;
1256: case unset:
1257: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1258: blob.hdr_cmd = error;
1259: blob.hdr_ret = RPC_ERROR(-1);
1260: }
1261: break;
1262: default:
1263: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
1264: blob.hdr_cmd = error;
1265: blob.hdr_ret = RPC_ERROR(-1);
1266: }
1267:
1268: end:
1269: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
1270: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
1271: schedReadSelf(task);
1272: return NULL;
1273: }
1274:
1275: static void *
1276: flushBLOB(sched_task_t *task)
1277: {
1278: uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
1279: rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
1280: rpc_blob_t *b, *tmp;
1281:
1282: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1283: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1284:
1285: rpc_srv_blobFree(srv, b);
1286: e_free(b);
1287: }
1288:
1289: if (!schedSignalSelf(task)) {
1290: /* disabled kqueue support in libaitsched */
1291: struct sigaction sa;
1292:
1293: memset(&sa, 0, sizeof sa);
1294: sigemptyset(&sa.sa_mask);
1295: sa.sa_handler = (void (*)(int)) flushBLOB;
1296: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1297: sigaction(SIGFBLOB, &sa, NULL);
1298: }
1299:
1300: return NULL;
1301: }
1302:
1303: static void *
1304: acceptBLOBClients(sched_task_t *task)
1305: {
1306: rpc_srv_t *srv = TASK_ARG(task);
1307: rpc_cli_t *c = NULL;
1308: register int i;
1309: socklen_t salen = sizeof(sockaddr_t);
1310: int sock;
1311: #ifdef TCP_NOPUSH
1312: int n = 1;
1313: #endif
1314:
1315: /* check free slots for connect */
1316: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
1317: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1318: if (c) { /* no more free slots! */
1319: EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
1320: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
1321: shutdown(sock, SHUT_RDWR);
1322: close(sock);
1323: }
1324: goto end;
1325: }
1326:
1327: c = e_malloc(sizeof(rpc_cli_t));
1328: if (!c) {
1329: LOGERR;
1330: srv->srv_kill = srv->srv_blob.kill = 1;
1331: return NULL;
1332: } else {
1333: memset(c, 0, sizeof(rpc_cli_t));
1334: array_Set(srv->srv_blob.clients, i, c);
1335: c->cli_id = i;
1336: c->cli_parent = srv;
1337: }
1338:
1339: /* alloc empty buffer */
1340: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1341:
1342: /* accept client */
1343: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
1344: if (c->cli_sock == -1) {
1345: LOGERR;
1346: AIT_FREE_VAL(&c->cli_buf);
1347: array_Del(srv->srv_blob.clients, i, 42);
1348: goto end;
1349: } else {
1350: #ifdef TCP_NOPUSH
1351: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
1352: #endif
1353: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1354: }
1355:
1356: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
1357: end:
1358: schedReadSelf(task);
1359: return NULL;
1360: }
1361:
1362: /* ------------------------------------------------------ */
1363:
1364: /*
1365: * rpc_srv_initBLOBServer() - Init & create BLOB Server
1366: *
1367: * @srv = RPC server instance
1368: * @Port = Port for bind server, if Port == 0 default port is selected
1369: * @diskDir = Disk place for BLOB file objects
1370: * return: -1 == error or 0 bind and created BLOB server instance
1371: */
1372: int
1373: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
1374: {
1375: int n = 1;
1376:
1377: if (!srv || srv->srv_kill) {
1378: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1379: return -1;
1380: }
1381:
1382: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
1383: if (access(diskDir, R_OK | W_OK) == -1) {
1384: LOGERR;
1385: return -1;
1386: } else
1387: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1388:
1389: /* init blob list */
1390: TAILQ_INIT(&srv->srv_blob.blobs);
1391:
1392: srv->srv_blob.server.cli_parent = srv;
1393:
1394: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
1395: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1396: case AF_INET:
1397: srv->srv_blob.server.cli_sa.sin.sin_port =
1398: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1399: break;
1400: case AF_INET6:
1401: srv->srv_blob.server.cli_sa.sin6.sin6_port =
1402: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1403: break;
1404: case AF_LOCAL:
1405: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
1406: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1407: break;
1408: default:
1409: AIT_FREE_VAL(&srv->srv_blob.dir);
1410: return -1;
1411: }
1412:
1413: /* create BLOB server socket */
1414: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1415: if (srv->srv_blob.server.cli_sock == -1) {
1416: LOGERR;
1417: AIT_FREE_VAL(&srv->srv_blob.dir);
1418: return -1;
1419: }
1420: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1421: LOGERR;
1422: close(srv->srv_blob.server.cli_sock);
1423: AIT_FREE_VAL(&srv->srv_blob.dir);
1424: return -1;
1425: }
1426: n = srv->srv_netbuf;
1427: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1428: LOGERR;
1429: close(srv->srv_blob.server.cli_sock);
1430: AIT_FREE_VAL(&srv->srv_blob.dir);
1431: return -1;
1432: }
1433: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1434: LOGERR;
1435: close(srv->srv_blob.server.cli_sock);
1436: AIT_FREE_VAL(&srv->srv_blob.dir);
1437: return -1;
1438: }
1439: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
1440: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1441: LOGERR;
1442: close(srv->srv_blob.server.cli_sock);
1443: AIT_FREE_VAL(&srv->srv_blob.dir);
1444: return -1;
1445: } else
1446: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
1447: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
1448:
1449:
1450: /* allocate pool for concurent blob clients */
1451: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1452: if (!srv->srv_blob.clients) {
1453: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1454: close(srv->srv_blob.server.cli_sock);
1455: AIT_FREE_VAL(&srv->srv_blob.dir);
1456: return -1;
1457: }
1458:
1459: /* init blob scheduler */
1460: srv->srv_blob.root = schedBegin();
1461: if (!srv->srv_blob.root) {
1462: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1463: array_Destroy(&srv->srv_blob.clients);
1464: close(srv->srv_blob.server.cli_sock);
1465: AIT_FREE_VAL(&srv->srv_blob.dir);
1466: return -1;
1467: }
1468:
1469: return 0;
1470: }
1471:
1472: /*
1473: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
1474: *
1475: * @srv = RPC Server instance
1476: * return: none
1477: */
1478: void
1479: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
1480: {
1481: if (!srv)
1482: return;
1483:
1484: srv->srv_blob.kill = 1;
1485:
1486: schedEnd(&srv->srv_blob.root);
1487:
1488: if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL)
1489: unlink(srv->srv_blob.server.cli_sa.sun.sun_path);
1490: }
1491:
1492: /*
1493: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1494: *
1495: * @srv = RPC Server instance
1496: * return: -1 error or 0 ok, infinite loop ...
1497: */
1498: int
1499: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1500: {
1501: rpc_cli_t *c;
1502: register int i;
1503: rpc_blob_t *b, *tmp;
1504: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1505:
1506: if (!srv || srv->srv_kill) {
1507: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1508: return -1;
1509: }
1510:
1511: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1512: LOGERR;
1513: return -1;
1514: }
1515:
1516: if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
1517: /* disabled kqueue support in libaitsched */
1518: struct sigaction sa;
1519:
1520: atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
1521:
1522: memset(&sa, 0, sizeof sa);
1523: sigemptyset(&sa.sa_mask);
1524: sa.sa_handler = (void (*)(int)) flushBLOB;
1525: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1526: sigaction(SIGFBLOB, &sa, NULL);
1527: }
1528:
1529: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
1530: srv->srv_blob.server.cli_sock, NULL, 0)) {
1531: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1532: return -1;
1533: }
1534:
1535: schedPolling(srv->srv_blob.root, &ts, NULL);
1536: /* main rpc loop */
1537: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1538:
1539: /* detach blobs */
1540: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1541: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1542:
1543: rpc_srv_blobFree(srv, b);
1544: e_free(b);
1545: }
1546:
1547: /* close all clients connections & server socket */
1548: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
1549: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1550: if (c) {
1551: shutdown(c->cli_sock, SHUT_RDWR);
1552: close(c->cli_sock);
1553:
1554: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1555: AIT_FREE_VAL(&c->cli_buf);
1556: }
1557: array_Del(srv->srv_blob.clients, i, 42);
1558: }
1559: array_Destroy(&srv->srv_blob.clients);
1560:
1561: close(srv->srv_blob.server.cli_sock);
1562:
1563: AIT_FREE_VAL(&srv->srv_blob.dir);
1564: return 0;
1565: }
1566:
1567:
1568: /*
1569: * rpc_srv_initServer() - Init & create RPC Server
1570: *
1571: * @InstID = Instance for authentication & recognition
1572: * @concurentClients = Concurent clients at same time to this server
1573: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1574: * @csHost = Host name or address for bind server, if NULL any address
1575: * @Port = Port for bind server, if Port == 0 default port is selected
1576: * @proto = Protocol, if == 0 choose SOCK_STREAM
1577: * return: NULL == error or !=NULL bind and created RPC server instance
1578: */
1579: rpc_srv_t *
1580: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
1581: const char *csHost, u_short Port, int proto)
1582: {
1583: int n = 1;
1584: rpc_srv_t *srv = NULL;
1585: sockaddr_t sa = E_SOCKADDR_INIT;
1586:
1587: if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
1588: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1589: return NULL;
1590: }
1591: if (!Port && proto < SOCK_RAW)
1592: Port = RPC_DEFPORT;
1593: if (!e_gethostbyname(csHost, Port, &sa))
1594: return NULL;
1595: if (!proto)
1596: proto = SOCK_STREAM;
1597: if (netBuf < RPC_MIN_BUFSIZ)
1598: netBuf = BUFSIZ;
1599: else
1600: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1601:
1602: #ifdef HAVE_SRANDOMDEV
1603: srandomdev();
1604: #else
1605: time_t tim;
1606:
1607: srandom((time(&tim) ^ getpid()));
1608: #endif
1609:
1610: srv = e_malloc(sizeof(rpc_srv_t));
1611: if (!srv) {
1612: LOGERR;
1613: return NULL;
1614: } else
1615: memset(srv, 0, sizeof(rpc_srv_t));
1616:
1617: srv->srv_proto = proto;
1618: srv->srv_netbuf = netBuf;
1619: srv->srv_session.sess_version = RPC_VERSION;
1620: srv->srv_session.sess_instance = InstID;
1621:
1622: srv->srv_server.cli_parent = srv;
1623: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1624:
1625: /* init functions */
1626: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1627: SLIST_INIT(&srv->srv_funcs);
1628: AVL_INIT(&srv->srv_funcs);
1629:
1630: /* init scheduler */
1631: srv->srv_root = schedBegin();
1632: if (!srv->srv_root) {
1633: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1634: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1635: e_free(srv);
1636: return NULL;
1637: }
1638:
1639: /* init pool for clients */
1640: srv->srv_clients = array_Init(concurentClients);
1641: if (!srv->srv_clients) {
1642: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1643: schedEnd(&srv->srv_root);
1644: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1645: e_free(srv);
1646: return NULL;
1647: }
1648:
1649: /* create server socket */
1650: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family,
1651: srv->srv_proto, srv->srv_proto == SOCK_RAW ? IPPROTO_ERPC : 0);
1652: if (srv->srv_server.cli_sock == -1) {
1653: LOGERR;
1654: array_Destroy(&srv->srv_clients);
1655: schedEnd(&srv->srv_root);
1656: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1657: e_free(srv);
1658: return NULL;
1659: }
1660: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1661: LOGERR;
1662: goto err;
1663: }
1664: n = srv->srv_netbuf;
1665: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1666: LOGERR;
1667: goto err;
1668: }
1669: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1670: LOGERR;
1671: goto err;
1672: }
1673: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
1674: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1675: LOGERR;
1676: goto err;
1677: } else
1678: fcntl(srv->srv_server.cli_sock, F_SETFL,
1679: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1680:
1681: rpc_register_srvPing(srv);
1682:
1683: return srv;
1684: err: /* error condition */
1685: close(srv->srv_server.cli_sock);
1686: array_Destroy(&srv->srv_clients);
1687: schedEnd(&srv->srv_root);
1688: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1689: e_free(srv);
1690: return NULL;
1691: }
1692:
1693: /*
1694: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1695: *
1696: * @psrv = RPC Server instance
1697: * return: none
1698: */
1699: void
1700: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1701: {
1702: if (!psrv || !*psrv)
1703: return;
1704:
1705: /* if send kill to blob server */
1706: rpc_srv_endBLOBServer(*psrv);
1707:
1708: (*psrv)->srv_kill = 1;
1709: sleep(RPC_SCHED_POLLING);
1710:
1711: schedEnd(&(*psrv)->srv_root);
1712:
1713: if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL)
1714: unlink((*psrv)->srv_server.cli_sa.sun.sun_path);
1715:
1716: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1717: e_free(*psrv);
1718: *psrv = NULL;
1719: }
1720:
1721: /*
1722: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1723: *
1724: * @srv = RPC Server instance
1725: * return: -1 error or 0 ok, infinite loop ...
1726: */
1727: int
1728: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1729: {
1730: rpc_cli_t *c;
1731: register int i;
1732: rpc_func_t *f;
1733: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1734:
1735: if (!srv) {
1736: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1737: return -1;
1738: }
1739:
1740: if (srv->srv_proto == SOCK_STREAM)
1741: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1742: LOGERR;
1743: return -1;
1744: }
1745:
1746: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1747: srv->srv_server.cli_sock, NULL, 0)) {
1748: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1749: return -1;
1750: }
1751:
1752: schedPolling(srv->srv_root, &ts, NULL);
1753: /* main rpc loop */
1754: schedRun(srv->srv_root, &srv->srv_kill);
1755:
1756: /* close all clients connections & server socket */
1757: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1758: c = array(srv->srv_clients, i, rpc_cli_t*);
1759: if (c) {
1760: if (srv->srv_proto == SOCK_STREAM) {
1761: shutdown(c->cli_sock, SHUT_RDWR);
1762: close(c->cli_sock);
1763: }
1764:
1765: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1766: ait_freeVars(&RPC_RETVARS(c));
1767: AIT_FREE_VAL(&c->cli_buf);
1768: }
1769: array_Del(srv->srv_clients, i, 42);
1770: }
1771: array_Destroy(&srv->srv_clients);
1772:
1773: if (srv->srv_proto != SOCK_EXT)
1774: close(srv->srv_server.cli_sock);
1775:
1776: /* detach exported calls */
1777: RPC_FUNCS_LOCK(&srv->srv_funcs);
1778: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1779: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1780:
1781: AIT_FREE_VAL(&f->func_name);
1782: e_free(f);
1783: }
1784: srv->srv_funcs.avlh_root = NULL;
1785: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1786:
1787: return 0;
1788: }
1789:
1790:
1791: /*
1792: * rpc_srv_execCall() Execute registered call from RPC server
1793: *
1794: * @cli = RPC client
1795: * @rpc = IN RPC call structure
1796: * @funcname = Execute RPC function
1797: * @args = IN RPC calling arguments from RPC client
1798: * return: -1 error, !=-1 ok
1799: */
1800: int
1801: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1802: ait_val_t funcname, array_t * __restrict args)
1803: {
1804: rpc_callback_t func;
1805:
1806: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1807: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1808: return -1;
1809: }
1810:
1811: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1812: return func(cli, rpc, args);
1813: }
1814:
1815:
1816: /*
1817: * rpc_srv_initServer2() - Init & create layer2 RPC Server
1818: *
1819: * @InstID = Instance for authentication & recognition
1820: * @concurentClients = Concurent clients at same time to this server
1821: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1822: * @csIface = Interface name for bind server, if NULL first interface on host
1823: * return: NULL == error or !=NULL bind and created RPC server instance
1824: */
1825: rpc_srv_t *
1826: rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
1827: {
1828: int n = 1;
1829: rpc_srv_t *srv = NULL;
1830: sockaddr_t sa = E_SOCKADDR_INIT;
1831: char szIface[64], szStr[STRSIZ];
1832: register int i;
1833: struct ifreq ifr;
1834: struct bpf_insn insns[] = {
1835: BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
1836: BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
1837: BPF_STMT(BPF_RET + BPF_K, -1),
1838: BPF_STMT(BPF_RET + BPF_K, 0),
1839: };
1840: struct bpf_program fcode = {
1841: .bf_len = sizeof(insns) / sizeof(struct bpf_insn),
1842: .bf_insns = insns
1843: };
1844:
1845: if (!concurentClients) {
1846: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1847: return NULL;
1848: }
1849: if (!csIface) {
1850: if (e_get1stiface(szIface, sizeof szIface))
1851: return NULL;
1852: } else
1853: strlcpy(szIface, csIface, sizeof szIface);
1854: if (!e_getifacebyname(szIface, &sa))
1855: return NULL;
1856:
1857: #ifdef HAVE_SRANDOMDEV
1858: srandomdev();
1859: #else
1860: time_t tim;
1861:
1862: srandom((time(&tim) ^ getpid()));
1863: #endif
1864:
1865: srv = e_malloc(sizeof(rpc_srv_t));
1866: if (!srv) {
1867: LOGERR;
1868: return NULL;
1869: } else
1870: memset(srv, 0, sizeof(rpc_srv_t));
1871:
1872: srv->srv_proto = SOCK_BPF;
1873: srv->srv_netbuf = netBuf;
1874: srv->srv_session.sess_version = RPC_VERSION;
1875: srv->srv_session.sess_instance = InstID;
1876:
1877: srv->srv_server.cli_parent = srv;
1878: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1879:
1880: /* init functions */
1881: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1882: SLIST_INIT(&srv->srv_funcs);
1883: AVL_INIT(&srv->srv_funcs);
1884:
1885: /* init scheduler */
1886: srv->srv_root = schedBegin();
1887: if (!srv->srv_root) {
1888: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1889: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1890: e_free(srv);
1891: return NULL;
1892: }
1893:
1894: /* init pool for clients */
1895: srv->srv_clients = array_Init(concurentClients);
1896: if (!srv->srv_clients) {
1897: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1898: schedEnd(&srv->srv_root);
1899: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1900: e_free(srv);
1901: return NULL;
1902: }
1903:
1904: /* create server handler */
1905: for (i = 0; i < 10; i++) {
1906: memset(szStr, 0, sizeof szStr);
1907: snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
1908: srv->srv_server.cli_sock = open(szStr, O_RDWR);
1909: if (srv->srv_server.cli_sock > STDERR_FILENO)
1910: break;
1911: }
1912: if (srv->srv_server.cli_sock < 3) {
1913: LOGERR;
1914: array_Destroy(&srv->srv_clients);
1915: schedEnd(&srv->srv_root);
1916: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1917: e_free(srv);
1918: return NULL;
1919: }
1920:
1921: if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
1922: LOGERR;
1923: goto err;
1924: }
1925: if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
1926: LOGERR;
1927: goto err;
1928: }
1929: n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
1930: if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
1931: LOGERR;
1932: goto err;
1933: } else
1934: srv->srv_netbuf = n;
1935:
1936: memset(&ifr, 0, sizeof ifr);
1937: strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
1938: if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
1939: LOGERR;
1940: goto err;
1941: } else
1942: fcntl(srv->srv_server.cli_sock, F_SETFL,
1943: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1944:
1945: rpc_register_srvPing(srv);
1946:
1947: return srv;
1948: err: /* error condition */
1949: close(srv->srv_server.cli_sock);
1950: array_Destroy(&srv->srv_clients);
1951: schedEnd(&srv->srv_root);
1952: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1953: e_free(srv);
1954: return NULL;
1955: }
1956:
1957: /*
1958: * rpc_srv_initServerExt() - Init & create pipe RPC Server
1959: *
1960: * @InstID = Instance for authentication & recognition
1961: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1962: * @fd = File descriptor
1963: * return: NULL == error or !=NULL bind and created RPC server instance
1964: */
1965: rpc_srv_t *
1966: rpc_srv_initServerExt(u_char InstID, int netBuf, int fd)
1967: {
1968: rpc_srv_t *srv = NULL;
1969:
1970: #ifdef HAVE_SRANDOMDEV
1971: srandomdev();
1972: #else
1973: time_t tim;
1974:
1975: srandom((time(&tim) ^ getpid()));
1976: #endif
1977:
1978: srv = e_malloc(sizeof(rpc_srv_t));
1979: if (!srv) {
1980: LOGERR;
1981: return NULL;
1982: } else
1983: memset(srv, 0, sizeof(rpc_srv_t));
1984:
1985: srv->srv_proto = SOCK_EXT;
1986: srv->srv_netbuf = (netBuf < RPC_MIN_BUFSIZ) ?
1987: getpagesize() : E_ALIGN(netBuf, 2);
1988: srv->srv_session.sess_version = RPC_VERSION;
1989: srv->srv_session.sess_instance = InstID;
1990:
1991: srv->srv_server.cli_parent = srv;
1992: srv->srv_server.cli_sock = fd;
1993:
1994: /* init functions */
1995: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1996: SLIST_INIT(&srv->srv_funcs);
1997: AVL_INIT(&srv->srv_funcs);
1998:
1999: /* init scheduler */
2000: srv->srv_root = schedBegin();
2001: if (!srv->srv_root) {
2002: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
2003: pthread_mutex_destroy(&srv->srv_funcs.mtx);
2004: e_free(srv);
2005: return NULL;
2006: }
2007:
2008: /* init pool for clients */
2009: srv->srv_clients = array_Init(1);
2010: if (!srv->srv_clients) {
2011: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
2012: schedEnd(&srv->srv_root);
2013: pthread_mutex_destroy(&srv->srv_funcs.mtx);
2014: e_free(srv);
2015: return NULL;
2016: }
2017:
2018: fcntl(srv->srv_server.cli_sock, F_SETFL,
2019: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
2020:
2021: rpc_register_srvPing(srv);
2022:
2023: return srv;
2024: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>