1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: srv.c,v 1.26.2.1 2015/06/24 22:27:58 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004 - 2015
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
/*
 * Forward declarations of the scheduler callbacks, grouped by transport.
 * They are needed here because the cbProto[] table below references them
 * before their definitions.
 */

/* SOCK_STREAM */
static void *acceptClients(sched_task_t *);
static void *closeClient(sched_task_t *);
static void *rxPacket(sched_task_t *);
static void *txPacket(sched_task_t *);

/* SOCK_DGRAM */
static void *freeClient(sched_task_t *);
static void *rxUDPPacket(sched_task_t *);
static void *txUDPPacket(sched_task_t *);

/* SOCK_RAW */
static void *rxRAWPacket(sched_task_t *);
static void *txRAWPacket(sched_task_t *);

/* SOCK_BPF */
static void *rxBPFPacket(sched_task_t *);
static void *txBPFPacket(sched_task_t *);

/* SOCK_EXT */
static void *rxEXTPacket(sched_task_t *);
static void *txEXTPacket(sched_task_t *);

/*
 * Callback dispatch table, used throughout this file as
 * cbProto[srv_proto][CB_CLOSECLIENT | CB_RXPACKET | CB_TXPACKET].
 * Each row lists, in order: the handler for the listening descriptor,
 * the client teardown handler, the receive handler and the transmit handler.
 * NOTE(review): the first two rows are identical -- presumably row 0 covers
 * an unset protocol value; confirm against the SOCK_* / CB_* definitions.
 */
static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
	{ acceptClients, closeClient, rxPacket, txPacket },		/* SOCK_STREAM */
	{ acceptClients, closeClient, rxPacket, txPacket },		/* SOCK_STREAM */
	{ rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },		/* SOCK_DGRAM */
	{ rxRAWPacket, freeClient, rxRAWPacket, txRAWPacket },		/* SOCK_RAW */
	{ rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket },		/* SOCK_BPF */
	{ rxEXTPacket, freeClient, rxEXTPacket, txEXTPacket }		/* SOCK_EXT */
};
80:
/*
 * Global signal argument, used when kqueue support is disabled.
 * NOTE(review): no reader or writer of this variable is visible in this
 * part of the file; check the signal handling code before changing it.
 */

static volatile uintptr_t _glSigArg = 0;
84:
85:
86: void
87: rpc_freeCli(rpc_cli_t * __restrict c)
88: {
89: rpc_srv_t *s = c->cli_parent;
90:
91: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
92:
93: /* free buffer */
94: AIT_FREE_VAL(&c->cli_buf);
95:
96: array_Del(s->srv_clients, c->cli_id, 0);
97: if (c)
98: e_free(c);
99: }
100:
101:
102: static inline int
103: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
104: {
105: rpc_cli_t *c = NULL;
106: register int i;
107:
108: /* check free slots for connect */
109: for (i = 0; i < array_Size(srv->srv_clients) &&
110: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
111: /* check for duplicates */
112: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
113: break;
114: if (i >= array_Size(srv->srv_clients))
115: return -1; /* no more free slots! */
116:
117: return i;
118: }
119:
120: static rpc_cli_t *
121: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
122: {
123: rpc_cli_t *c = NULL;
124: int n;
125:
126: if (srv->srv_proto != SOCK_EXT)
127: n = _check4freeslot(srv, sa);
128: else
129: n = 0;
130: if (n == -1)
131: return NULL;
132: else
133: c = array(srv->srv_clients, n, rpc_cli_t*);
134:
135: if (!c) {
136: c = e_malloc(sizeof(rpc_cli_t));
137: if (!c) {
138: LOGERR;
139: srv->srv_kill = 1;
140: return NULL;
141: } else {
142: memset(c, 0, sizeof(rpc_cli_t));
143: array_Set(srv->srv_clients, n, c);
144: c->cli_id = n;
145: c->cli_parent = srv;
146: }
147:
148: /* alloc empty buffer */
149: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
150: }
151:
152: return c;
153: }
154:
155:
156: static void *
157: freeClient(sched_task_t *task)
158: {
159: rpc_freeCli(TASK_ARG(task));
160:
161: return NULL;
162: }
163:
164: static void *
165: closeClient(sched_task_t *task)
166: {
167: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
168:
169: rpc_freeCli(TASK_ARG(task));
170:
171: /* close client socket */
172: shutdown(sock, SHUT_RDWR);
173: close(sock);
174: return NULL;
175: }
176:
/*
 * txPacket() - Scheduler write callback: send the RPC reply to a stream client
 *
 * @task = write task; TASK_ARG(task) is the client, TASK_FD(task) its socket
 * return: always NULL
 *
 * The reply is built in place in the client's buffer: the header already
 * stored there is reused and, when call_argc is non-zero, the return
 * variables are serialized right behind it.  Any poll()/send() failure
 * schedules the CB_CLOSECLIENT handler for this client.
 */
static void *
txPacket(sched_task_t *task)
{
	rpc_cli_t *c = TASK_ARG(task);
	rpc_srv_t *s = c->cli_parent;
	rpc_func_t *f = NULL;
	u_char *buf = AIT_GET_BUF(&c->cli_buf);
	struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
	int ret, estlen, wlen = sizeof(struct tagRPCCall);
	struct pollfd pfd;
#ifdef TCP_SESSION_TIMEOUT
	struct timespec ts = { DEF_RPC_TIMEOUT, 0 };

	/* restart the session timer: drop the pending one and arm a new one */
	schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
	schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
			 TASK_ARG(task), ts, TASK_ARG(task), 0);
#endif

	/* a non-zero call_argc means there are return variables to ship */
	if (rpc->call_argc) {
		f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
		if (!f) {
			rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");

			rpc->call_argc ^= rpc->call_argc;
			rpc->call_rep.ret = RPC_ERROR(-1);
			rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
		} else {
			/* calc estimated length */
			estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
			if (estlen > AIT_LEN(&c->cli_buf))
				AIT_RE_BUF(&c->cli_buf, estlen);
			/* the buffer may have been resized above: refresh both pointers */
			buf = AIT_GET_BUF(&c->cli_buf);
			rpc = (struct tagRPCCall*) buf;

			rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
			/* Go Encapsulate variables */
			ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
					RPC_RETVARS(c));
			/* Free return values */
			ait_freeVars(&c->cli_vars);
			if (ret == -1) {
				rpc_SetErr(EBADRPC, "Prepare RPC packet failed");

				rpc->call_argc ^= rpc->call_argc;
				rpc->call_rep.ret = RPC_ERROR(-1);
				rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
			} else
				wlen += ret;
		}
	}

	/* finalize the header: total length in network byte order, marked as reply */
	rpc->call_len = htonl(wlen);
	rpc->call_io = RPC_ACK;

#if 0
	/* calculate CRC */
	rpc->call_crc ^= rpc->call_crc;
	rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
#endif

	/* send reply in pieces of at most srv_netbuf bytes, waiting for POLLOUT before each */
	pfd.fd = TASK_FD(task);
	pfd.events = POLLOUT;
	for (; wlen > 0; wlen -= ret, buf += ret) {
		if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
				pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
			/* ret == 0 is a poll() timeout, anything else is reported through LOGERR */
			if (ret)
				LOGERR;
			else
				rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
			/* close connection */
			schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
					TASK_ARG(task), 0, NULL, 0);
			return NULL;
		}
		ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
		if (ret == -1) {
			/* close connection */
			schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
					TASK_ARG(task), 0, NULL, 0);
			return NULL;
		}
	}

	return NULL;
}
263:
264: static void *
265: execCall(sched_task_t *task)
266: {
267: rpc_cli_t *c = TASK_ARG(task);
268: rpc_srv_t *s = c->cli_parent;
269: rpc_func_t *f = NULL;
270: array_t *arr = NULL;
271: u_char *buf = AIT_GET_BUF(&c->cli_buf);
272: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
273: int argc = rpc->call_argc;
274:
275: /* Go decapsulate variables ... */
276: if (argc) {
277: arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall),
278: AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
279: if (!arr) {
280: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
281:
282: rpc->call_argc ^= rpc->call_argc;
283: rpc->call_rep.ret = RPC_ERROR(-1);
284: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
285: return NULL;
286: }
287: } else
288: arr = NULL;
289:
290: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
291: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
292:
293: rpc->call_argc ^= rpc->call_argc;
294: rpc->call_rep.ret = RPC_ERROR(-1);
295: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
296: } else {
297: /* if client doesn't want reply */
298: argc = RPC_CHK_NOREPLY(rpc);
299: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
300: if (rpc->call_rep.ret == htonl(-1)) {
301: if (!rpc->call_rep.eno) {
302: LOGERR;
303: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
304: }
305: rpc->call_argc ^= rpc->call_argc;
306: } else {
307: rpc->call_rep.eno ^= rpc->call_rep.eno;
308: if (argc) {
309: /* without reply */
310: ait_freeVars(&c->cli_vars);
311: rpc->call_argc ^= rpc->call_argc;
312: } else {
313: /* reply */
314: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
315: }
316: }
317: }
318:
319: array_Destroy(&arr);
320: return NULL;
321: }
322:
323: static void *
324: rxPacket(sched_task_t *task)
325: {
326: rpc_cli_t *c = TASK_ARG(task);
327: rpc_srv_t *s = c->cli_parent;
328: int len, rlen, noreply, estlen;
329: #if 0
330: u_short crc;
331: #endif
332: u_char *buf = AIT_GET_BUF(&c->cli_buf);
333: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
334: struct pollfd pfd;
335: #ifdef TCP_SESSION_TIMEOUT
336: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
337:
338: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
339: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
340: TASK_ARG(task), ts, TASK_ARG(task), 0);
341: #endif
342:
343: memset(buf, 0, sizeof(struct tagRPCCall));
344: rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
345: if (rlen < sizeof(struct tagRPCCall)) {
346: /* close connection */
347: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
348: TASK_ARG(task), 0, NULL, 0);
349: return NULL;
350: } else {
351: estlen = ntohl(rpc->call_len);
352: if (estlen > AIT_LEN(&c->cli_buf))
353: AIT_RE_BUF(&c->cli_buf, estlen);
354: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
355: buf = AIT_GET_BUF(&c->cli_buf);
356: len = estlen;
357: }
358:
359: /* get next part of packet */
360: memset(buf, 0, len);
361: pfd.fd = TASK_FD(task);
362: pfd.events = POLLIN | POLLPRI;
363: for (; len > 0; len -= rlen, buf += rlen) {
364: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
365: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
366: if (rlen)
367: LOGERR;
368: else
369: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
370: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
371: TASK_ARG(task), 0, NULL, 0);
372: return NULL;
373: }
374: rlen = recv(TASK_FD(task), buf, len, 0);
375: if (rlen == -1) {
376: /* close connection */
377: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
378: TASK_ARG(task), 0, NULL, 0);
379: return NULL;
380: }
381: }
382: len = estlen;
383:
384: /* skip loop packet */
385: if (rpc->call_io & RPC_ACK) {
386: schedReadSelf(task);
387: return NULL;
388: }
389:
390: #if 0
391: /* check integrity of packet */
392: crc = ntohs(rpc->call_crc);
393: rpc->call_crc ^= rpc->call_crc;
394: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
395: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
396: return NULL;
397: }
398: #endif
399:
400: noreply = RPC_CHK_NOREPLY(rpc);
401:
402: /* check RPC packet session info */
403: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
404: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
405:
406: rpc->call_argc ^= rpc->call_argc;
407: rpc->call_rep.ret = RPC_ERROR(-1);
408: rpc->call_rep.eno = RPC_ERROR(errno);
409: } else {
410: /* execute RPC call */
411: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
412: }
413:
414: /* send RPC reply */
415: if (!noreply)
416: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
417: TASK_ARG(task), TASK_FD(task), rpc, len);
418:
419: /* lets get next packet */
420: schedReadSelf(task);
421: return NULL;
422: }
423:
424: static void *
425: acceptClients(sched_task_t *task)
426: {
427: rpc_srv_t *srv = TASK_ARG(task);
428: rpc_cli_t *c = NULL;
429: socklen_t salen = sizeof(sockaddr_t);
430: int sock;
431: #ifdef TCP_SESSION_TIMEOUT
432: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
433: #endif
434:
435: c = _allocClient(srv, NULL);
436: if (!c) {
437: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
438: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
439: shutdown(sock, SHUT_RDWR);
440: close(sock);
441: }
442: goto end;
443: }
444:
445: /* accept client */
446: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
447: if (c->cli_sock == -1) {
448: LOGERR;
449: AIT_FREE_VAL(&c->cli_buf);
450: array_Del(srv->srv_clients, c->cli_id, 42);
451: goto end;
452: } else
453: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
454:
455: #ifdef TCP_SESSION_TIMEOUT
456: /* armed timer for close stateless connection */
457: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
458: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c,
459: ts, c, 0);
460: #endif
461: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
462: c->cli_sock, NULL, 0);
463: end:
464: schedReadSelf(task);
465: return NULL;
466: }
467:
468:
/*
 * txUDPPacket() - Scheduler write callback: send the RPC reply to a datagram client
 *
 * @task = write task; TASK_ARG(task) is the client, TASK_FD(task) the socket
 * return: always NULL
 *
 * Builds the reply in place in the client's buffer, stamps its length, ACK
 * flag and Fletcher-16 checksum, then sends it to the address stored in the
 * client.  Any poll()/sendto() failure schedules the CB_CLOSECLIENT handler.
 */
static void *
txUDPPacket(sched_task_t *task)
{
	rpc_cli_t *c = TASK_ARG(task);
	rpc_srv_t *s = c->cli_parent;
	rpc_func_t *f = NULL;
	u_char *buf = AIT_GET_BUF(&c->cli_buf);
	struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
	int ret, estlen, wlen = sizeof(struct tagRPCCall);
	struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
	struct pollfd pfd;

	/* restart the client's expiry timer: drop the pending one and arm a new one */
	schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
	schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
			 TASK_ARG(task), ts, TASK_ARG(task), 0);

	/* a non-zero call_argc means there are return variables to ship */
	if (rpc->call_argc) {
		f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
		if (!f) {
			rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
			rpc->call_argc ^= rpc->call_argc;
			rpc->call_rep.ret = RPC_ERROR(-1);
			rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
		} else {
			/* calc estimated length */
			estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
			if (estlen > AIT_LEN(&c->cli_buf))
				AIT_RE_BUF(&c->cli_buf, estlen);
			/* the buffer may have been resized above: refresh both pointers */
			buf = AIT_GET_BUF(&c->cli_buf);
			rpc = (struct tagRPCCall*) buf;

			rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
			/* Go Encapsulate variables */
			ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
					RPC_RETVARS(c));
			/* Free return values */
			ait_freeVars(&c->cli_vars);
			if (ret == -1) {
				rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
				rpc->call_argc ^= rpc->call_argc;
				rpc->call_rep.ret = RPC_ERROR(-1);
				rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
			} else
				wlen += ret;
		}
	}

	/* finalize the header: total length in network byte order, marked as reply */
	rpc->call_len = htonl(wlen);
	rpc->call_io = RPC_ACK;

	/* calculate CRC (the field is zeroed first so it does not feed into the sum) */
	rpc->call_crc ^= rpc->call_crc;
	rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));

	/* send reply in pieces of at most srv_netbuf bytes, waiting for POLLOUT before each */
	pfd.fd = TASK_FD(task);
	pfd.events = POLLOUT;
	for (; wlen > 0; wlen -= ret, buf += ret) {
		if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
				pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
			/* ret == 0 is a poll() timeout, anything else is reported through LOGERR */
			if (ret)
				LOGERR;
			else
				rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
			/* close connection */
			schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
					TASK_ARG(task), 0, NULL, 0);
			return NULL;
		}
		ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
				&c->cli_sa.sa, c->cli_sa.sa.sa_len);
		if (ret == -1) {
			/* close connection */
			schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
					TASK_ARG(task), 0, NULL, 0);
			return NULL;
		}
	}

	return NULL;
}
550:
551: static void *
552: rxUDPPacket(sched_task_t *task)
553: {
554: rpc_srv_t *srv = TASK_ARG(task);
555: rpc_cli_t *c = NULL;
556: int len, rlen, noreply, estlen;
557: u_short crc;
558: u_char *buf, b[sizeof(struct tagRPCCall)];
559: struct tagRPCCall *rpc = (struct tagRPCCall*) b;
560: sockaddr_t sa;
561: socklen_t salen;
562: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
563: struct pollfd pfd;
564:
565: /* receive connect packet */
566: salen = sa.ss.ss_len = sizeof(sockaddr_t);
567: rlen = recvfrom(TASK_FD(task), b, sizeof b, MSG_PEEK, &sa.sa, &salen);
568: if (rlen < sizeof(struct tagRPCCall)) {
569: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
570: goto end;
571: }
572:
573: c = _allocClient(srv, &sa);
574: if (!c) {
575: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
576: usleep(2000); /* blocked client delay */
577: goto end;
578: } else {
579: estlen = ntohl(rpc->call_len);
580: if (estlen > AIT_LEN(&c->cli_buf))
581: AIT_RE_BUF(&c->cli_buf, estlen);
582: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
583: buf = AIT_GET_BUF(&c->cli_buf);
584: len = estlen;
585:
586: c->cli_sock = TASK_FD(task);
587: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
588:
589: /* armed timer for close stateless connection */
590: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
591: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
592: c, ts, c, 0);
593: }
594:
595: /* get next part of packet */
596: memset(buf, 0, len);
597: pfd.fd = TASK_FD(task);
598: pfd.events = POLLIN | POLLPRI;
599: for (; len > 0; len -= rlen, buf += rlen) {
600: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
601: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
602: if (rlen)
603: LOGERR;
604: else
605: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
606: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
607: c, 0, NULL, 0);
608: goto end;
609: }
610: salen = sa.ss.ss_len = sizeof(sockaddr_t);
611: rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);
612: if (rlen == -1) {
613: /* close connection */
614: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
615: c, 0, NULL, 0);
616: goto end;
617: }
618: if (e_addrcmp(&c->cli_sa, &sa, 42))
619: rlen ^= rlen; /* skip if arrive from different address */
620: }
621: len = estlen;
622:
623: /* skip loop packet */
624: if (rpc->call_io & RPC_ACK)
625: goto end;
626:
627: /* check integrity of packet */
628: crc = ntohs(rpc->call_crc);
629: rpc->call_crc ^= rpc->call_crc;
630: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
631: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
632: /* close connection */
633: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
634: c, 0, NULL, 0);
635: goto end;
636: }
637:
638: noreply = RPC_CHK_NOREPLY(rpc);
639:
640: /* check RPC packet session info */
641: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
642: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
643:
644: rpc->call_argc ^= rpc->call_argc;
645: rpc->call_rep.ret = RPC_ERROR(-1);
646: rpc->call_rep.eno = RPC_ERROR(errno);
647: } else {
648: /* execute RPC call */
649: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
650: }
651:
652: /* send RPC reply */
653: if (!noreply)
654: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
655: c, TASK_FD(task), rpc, len);
656: end:
657: schedReadSelf(task);
658: return NULL;
659: }
660:
661:
662: static void *
663: txRAWPacket(sched_task_t *task)
664: {
665: rpc_cli_t *c = TASK_ARG(task);
666: rpc_srv_t *s = c->cli_parent;
667: rpc_func_t *f = NULL;
668: u_char *buf = AIT_GET_BUF(&c->cli_buf);
669: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
670: int ret, estlen, wlen = sizeof(struct tagRPCCall);
671: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
672:
673: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
674: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
675: TASK_ARG(task), ts, TASK_ARG(task), 0);
676:
677: if (rpc->call_argc) {
678: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
679: if (!f) {
680: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
681: rpc->call_argc ^= rpc->call_argc;
682: rpc->call_rep.ret = RPC_ERROR(-1);
683: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
684: } else {
685: /* calc estimated length */
686: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
687: if (estlen > AIT_LEN(&c->cli_buf))
688: AIT_RE_BUF(&c->cli_buf, estlen);
689: buf = AIT_GET_BUF(&c->cli_buf);
690: rpc = (struct tagRPCCall*) buf;
691:
692: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
693: /* Go Encapsulate variables */
694: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
695: RPC_RETVARS(c));
696: /* Free return values */
697: ait_freeVars(&c->cli_vars);
698: if (ret == -1) {
699: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
700: rpc->call_argc ^= rpc->call_argc;
701: rpc->call_rep.ret = RPC_ERROR(-1);
702: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
703: } else
704: wlen += ret;
705: }
706: }
707:
708: rpc->call_len = htonl(wlen);
709: rpc->call_io = RPC_ACK;
710:
711: /* calculate CRC */
712: rpc->call_crc ^= rpc->call_crc;
713: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
714:
715: /* send reply */
716: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
717: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
718: if (ret == -1) {
719: /* close connection */
720: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
721: TASK_ARG(task), 0, NULL, 0);
722: return NULL;
723: }
724:
725: return NULL;
726: }
727:
728: static void *
729: rxRAWPacket(sched_task_t *task)
730: {
731: rpc_srv_t *srv = TASK_ARG(task);
732: rpc_cli_t *c = NULL;
733: int len, rlen, noreply;
734: u_short crc;
735: struct tagRPCCall *rpc;
736: sockaddr_t sa;
737: socklen_t salen;
738: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
739: ait_val_t b = AIT_VAL_INIT;
740:
741: /* receive connect packet */
742: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
743: salen = sa.ss.ss_len = sizeof(sockaddr_t);
744: rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
745: if (sa.sa.sa_family == AF_INET) {
746: struct ip *h;
747: h = (struct ip*) AIT_GET_BUF(&b);
748: if (rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC)
749: goto end;
750: else {
751: rlen -= sizeof(struct ip);
752: rpc = (struct tagRPCCall*)
753: (AIT_GET_BUF(&b) + sizeof(struct ip));
754: }
755: } else {
756: struct ip6_hdr *h;
757: h = (struct ip6_hdr*) AIT_GET_BUF(&b);
758: if (rlen < (ntohs(h->ip6_plen) + sizeof(struct ip6_hdr)) ||
759: h->ip6_nxt != IPPROTO_ERPC)
760: goto end;
761: else {
762: rlen -= sizeof(struct ip6_hdr);
763: rpc = (struct tagRPCCall*)
764: (AIT_GET_BUF(&b) + sizeof(struct ip6_hdr));
765: }
766: }
767: if (rlen < sizeof(struct tagRPCCall)) {
768: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
769: goto end;
770: }
771:
772: /* skip loop packet */
773: if (rpc->call_io & RPC_ACK)
774: goto end;
775:
776: c = _allocClient(srv, &sa);
777: if (!c) {
778: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
779: usleep(2000); /* blocked client delay */
780: goto end;
781: } else {
782: len = ntohl(rpc->call_len);
783: if (len > AIT_LEN(&c->cli_buf))
784: AIT_RE_BUF(&c->cli_buf, len);
785: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
786: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
787:
788: c->cli_sock = TASK_FD(task);
789: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
790:
791: /* armed timer for close stateless connection */
792: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
793: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
794: c, ts, c, 0);
795: }
796:
797: /* check integrity of packet */
798: crc = ntohs(rpc->call_crc);
799: rpc->call_crc ^= rpc->call_crc;
800: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
801: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
802: /* close connection */
803: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
804: c, 0, NULL, 0);
805: goto end;
806: }
807:
808: noreply = RPC_CHK_NOREPLY(rpc);
809:
810: /* check RPC packet session info */
811: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
812: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
813:
814: rpc->call_argc ^= rpc->call_argc;
815: rpc->call_rep.ret = RPC_ERROR(-1);
816: rpc->call_rep.eno = RPC_ERROR(errno);
817: } else {
818: /* execute RPC call */
819: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
820: }
821:
822: /* send RPC reply */
823: if (!noreply)
824: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
825: c, TASK_FD(task), rpc, len);
826: end:
827: AIT_FREE_VAL(&b);
828: schedReadSelf(task);
829: return NULL;
830: }
831:
832:
/*
 * txBPFPacket() - Scheduler callback: send the RPC reply as an Ethernet frame via BPF
 *
 * @task = scheduler task; TASK_ARG(task) is the client, TASK_FD(task) the
 *	descriptor the frame is written to
 * return: always NULL
 *
 * Builds the reply in place in the client's buffer, stamps its length, ACK
 * flag and Fletcher-16 checksum, then copies it behind an Ethernet header
 * in a temporary buffer and writes that frame out.  A failed write schedules
 * the CB_CLOSECLIENT handler.
 */
static void *
txBPFPacket(sched_task_t *task)
{
	rpc_cli_t *c = TASK_ARG(task);
	rpc_srv_t *s = c->cli_parent;
	rpc_func_t *f = NULL;
	u_char *buf = AIT_GET_BUF(&c->cli_buf);
	struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
	int ret, len, wlen = sizeof(struct tagRPCCall);
	struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
	struct ether_header *eh;
	ait_val_t b = AIT_VAL_INIT;

	/* restart the client's expiry timer: drop the pending one and arm a new one */
	schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
	schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
			 TASK_ARG(task), ts, TASK_ARG(task), 0);

	/* a non-zero call_argc means there are return variables to ship */
	if (rpc->call_argc) {
		f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
		if (!f) {
			rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
			rpc->call_argc ^= rpc->call_argc;
			rpc->call_rep.ret = RPC_ERROR(-1);
			rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
		} else {
			/* calc estimated length */
			len = ait_resideVars(RPC_RETVARS(c)) + wlen;
			if (len > AIT_LEN(&c->cli_buf))
				AIT_RE_BUF(&c->cli_buf, len);
			/* the buffer may have been resized above: refresh both pointers */
			buf = AIT_GET_BUF(&c->cli_buf);
			rpc = (struct tagRPCCall*) buf;

			rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
			/* Go Encapsulate variables */
			ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
					RPC_RETVARS(c));
			/* Free return values */
			ait_freeVars(&RPC_RETVARS(c));
			if (ret == -1) {
				rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
				rpc->call_argc ^= rpc->call_argc;
				rpc->call_rep.ret = RPC_ERROR(-1);
				rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
			} else
				wlen += ret;
		}
	}

	/* finalize the header: total length in network byte order, marked as reply */
	rpc->call_len = htonl(wlen);
	rpc->call_io = RPC_ACK;

	/* calculate CRC (the field is zeroed first so it does not feed into the sum) */
	rpc->call_crc ^= rpc->call_crc;
	rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));

	/* send reply: Ethernet header followed by at most srv_netbuf bytes of the packet */
	AIT_SET_BUF(&b, NULL, MIN(wlen, s->srv_netbuf) + ETHER_HDR_LEN);
	eh = (struct ether_header*) AIT_GET_BUF(&b);
	/* destination is the client's link-layer address; only dhost and type are set here */
	memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
	eh->ether_type = htons(RPC_DEFPORT);
	memcpy(eh + 1, buf, MIN(wlen, s->srv_netbuf));

	ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
	AIT_FREE_VAL(&b);
	if (ret == -1) {
		/* close connection */
		schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
				TASK_ARG(task), 0, NULL, 0);
		return NULL;
	}

	return NULL;
}
906:
907: static void *
908: rxBPFPacket(sched_task_t *task)
909: {
910: rpc_srv_t *srv = TASK_ARG(task);
911: rpc_cli_t *c = NULL;
912: int len, rlen, noreply;
913: u_short crc;
914: struct tagRPCCall *rpc;
915: sockaddr_t sa;
916: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
917: struct bpf_hdr *h;
918: struct ether_header *eh;
919: ait_val_t b = AIT_VAL_INIT;
920:
921: /* receive connect packet */
922: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
923: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
924: h = (struct bpf_hdr*) AIT_GET_BUF(&b);
925: rlen -= h->bh_hdrlen;
926: if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen ||
927: rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) {
928: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
929: goto end;
930: } else {
931: rlen = h->bh_caplen;
932: eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
933: rlen -= ETHER_HDR_LEN;
934: rpc = (struct tagRPCCall*) (eh + 1);
935:
936: #if 0
937: /* skip loop packet */
938: if (rpc->call_io & RPC_ACK)
939: goto end;
940: #endif
941:
942: if (eh->ether_type != ntohs(RPC_DEFPORT))
943: goto end;
944: else
945: e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
946: }
947:
948: c = _allocClient(srv, &sa);
949: if (!c) {
950: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
951: usleep(2000); /* blocked client delay */
952: goto end;
953: } else {
954: len = ntohl(rpc->call_len);
955: if (len > AIT_LEN(&c->cli_buf))
956: AIT_RE_BUF(&c->cli_buf, len);
957: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
958: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
959:
960: c->cli_sock = TASK_FD(task);
961: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
962:
963: /* armed timer for close stateless connection */
964: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
965: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
966: c, ts, c, 0);
967: }
968:
969: /* check integrity of packet */
970: crc = ntohs(rpc->call_crc);
971: rpc->call_crc ^= rpc->call_crc;
972: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
973: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
974: /* close connection */
975: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
976: c, 0, NULL, 0);
977: goto end;
978: }
979:
980: noreply = RPC_CHK_NOREPLY(rpc);
981:
982: /* check RPC packet session info */
983: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
984: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
985:
986: rpc->call_argc ^= rpc->call_argc;
987: rpc->call_rep.ret = RPC_ERROR(-1);
988: rpc->call_rep.eno = RPC_ERROR(errno);
989: } else {
990: /* execute RPC call */
991: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
992: }
993:
994: /* send RPC reply */
995: if (!noreply)
996: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
997: c, TASK_FD(task), rpc, len);
998: end:
999: AIT_FREE_VAL(&b);
1000: schedReadSelf(task);
1001: return NULL;
1002: }
1003:
1004:
1005: static void *
1006: txEXTPacket(sched_task_t *task)
1007: {
1008: rpc_cli_t *c = TASK_ARG(task);
1009: rpc_srv_t *s = c->cli_parent;
1010: rpc_func_t *f = NULL;
1011: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1012: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1013: int ret, len, wlen = sizeof(struct tagRPCCall);
1014: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1015:
1016: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
1017: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
1018: TASK_ARG(task), ts, TASK_ARG(task), 0);
1019:
1020: if (rpc->call_argc) {
1021: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1022: if (!f) {
1023: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1024: rpc->call_argc ^= rpc->call_argc;
1025: rpc->call_rep.ret = RPC_ERROR(-1);
1026: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
1027: } else {
1028: /* calc estimated length */
1029: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
1030: if (len > AIT_LEN(&c->cli_buf))
1031: AIT_RE_BUF(&c->cli_buf, len);
1032: buf = AIT_GET_BUF(&c->cli_buf);
1033: rpc = (struct tagRPCCall*) buf;
1034:
1035: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1036: /* Go Encapsulate variables */
1037: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
1038: RPC_RETVARS(c));
1039: /* Free return values */
1040: ait_freeVars(&RPC_RETVARS(c));
1041: if (ret == -1) {
1042: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1043: rpc->call_argc ^= rpc->call_argc;
1044: rpc->call_rep.ret = RPC_ERROR(-1);
1045: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
1046: } else
1047: wlen += ret;
1048: }
1049: }
1050:
1051: rpc->call_len = htonl(wlen);
1052: rpc->call_io = RPC_ACK;
1053:
1054: /* send reply */
1055: ret = write(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf));
1056: if (ret == -1) {
1057: /* close connection */
1058: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
1059: TASK_ARG(task), 0, NULL, 0);
1060: return NULL;
1061: }
1062:
1063: return NULL;
1064: }
1065:
1066: static void *
1067: rxEXTPacket(sched_task_t *task)
1068: {
1069: rpc_srv_t *srv = TASK_ARG(task);
1070: rpc_cli_t *c = NULL;
1071: int len, rlen, noreply;
1072: struct tagRPCCall *rpc;
1073: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1074: ait_val_t b = AIT_VAL_INIT;
1075: sockaddr_t sa;
1076:
1077: memset(&sa, 0, sizeof sa);
1078: /* receive connect packet */
1079: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1080: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
1081: if (rlen < sizeof(struct tagRPCCall)) {
1082: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1083: goto end;
1084: } else {
1085: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
1086:
1087: /* skip loop packet */
1088: if (rpc->call_io & RPC_ACK)
1089: goto end;
1090: }
1091:
1092: c = _allocClient(srv, &sa);
1093: if (!c) {
1094: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
1095: usleep(2000); /* blocked client delay */
1096: goto end;
1097: } else {
1098: len = ntohl(rpc->call_len);
1099: if (len > AIT_LEN(&c->cli_buf))
1100: AIT_RE_BUF(&c->cli_buf, len);
1101: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, AIT_LEN(&c->cli_buf));
1102: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1103:
1104: c->cli_sock = TASK_FD(task);
1105:
1106: /* armed timer for close stateless connection */
1107: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
1108: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
1109: c, ts, c, 0);
1110: }
1111:
1112: noreply = RPC_CHK_NOREPLY(rpc);
1113:
1114: /* check RPC packet session info */
1115: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
1116: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1117:
1118: rpc->call_argc ^= rpc->call_argc;
1119: rpc->call_rep.ret = RPC_ERROR(-1);
1120: rpc->call_rep.eno = RPC_ERROR(errno);
1121: } else {
1122: /* execute RPC call */
1123: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
1124: }
1125:
1126: /* send RPC reply */
1127: if (!noreply)
1128: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1129: c, TASK_FD(task), rpc, len);
1130: end:
1131: AIT_FREE_VAL(&b);
1132: schedReadSelf(task);
1133: return NULL;
1134: }
1135:
1136: /* ------------------------------------------------------ */
1137:
1138: void
1139: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
1140: {
1141: rpc_srv_t *s = c->cli_parent;
1142:
1143: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1144:
1145: /* free buffer */
1146: AIT_FREE_VAL(&c->cli_buf);
1147:
1148: array_Del(s->srv_blob.clients, c->cli_id, 0);
1149: if (c)
1150: e_free(c);
1151: }
1152:
1153:
1154: static void *
1155: closeBLOBClient(sched_task_t *task)
1156: {
1157: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
1158:
1159: rpc_freeBLOBCli(TASK_ARG(task));
1160:
1161: /* close client socket */
1162: shutdown(sock, SHUT_RDWR);
1163: close(sock);
1164: return NULL;
1165: }
1166:
1167: static void *
1168: txBLOB(sched_task_t *task)
1169: {
1170: rpc_cli_t *c = TASK_ARG(task);
1171: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1172: int wlen = sizeof(struct tagBLOBHdr);
1173:
1174: /* send reply */
1175: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
1176: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
1177: /* close blob connection */
1178: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1179: }
1180:
1181: return NULL;
1182: }
1183:
1184: static void *
1185: rxBLOB(sched_task_t *task)
1186: {
1187: rpc_cli_t *c = TASK_ARG(task);
1188: rpc_srv_t *s = c->cli_parent;
1189: rpc_blob_t *b;
1190: struct tagBLOBHdr blob;
1191: int rlen;
1192:
1193: memset(&blob, 0, sizeof blob);
1194: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
1195: if (rlen < 1) {
1196: /* close blob connection */
1197: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1198: return NULL;
1199: }
1200:
1201: /* check BLOB packet */
1202: if (rlen < sizeof(struct tagBLOBHdr)) {
1203: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1204:
1205: schedReadSelf(task);
1206: return NULL;
1207: }
1208:
1209: /* check RPC packet session info */
1210: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1211: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1212: blob.hdr_cmd = error;
1213: goto end;
1214: }
1215:
1216: /* Go to proceed packet ... */
1217: switch (blob.hdr_cmd) {
1218: case get:
1219: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
1220: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
1221: blob.hdr_cmd = no;
1222: blob.hdr_ret = RPC_ERROR(-1);
1223: break;
1224: } else
1225: blob.hdr_len = htonl(b->blob_len);
1226:
1227: if (rpc_srv_blobMap(s, b) != -1) {
1228: /* deliver BLOB variable to client */
1229: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1230: rpc_srv_blobUnmap(b);
1231: } else {
1232: blob.hdr_cmd = error;
1233: blob.hdr_ret = RPC_ERROR(-1);
1234: }
1235: break;
1236: case set:
1237: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
1238: ntohl(blob.hdr_ret)))) {
1239: /* set new BLOB variable for reply :) */
1240: blob.hdr_var = htonl(b->blob_var);
1241:
1242: /* receive BLOB from client */
1243: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1244: rpc_srv_blobUnmap(b);
1245: } else {
1246: blob.hdr_cmd = error;
1247: blob.hdr_ret = RPC_ERROR(-1);
1248: }
1249: break;
1250: case unset:
1251: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1252: blob.hdr_cmd = error;
1253: blob.hdr_ret = RPC_ERROR(-1);
1254: }
1255: break;
1256: default:
1257: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
1258: blob.hdr_cmd = error;
1259: blob.hdr_ret = RPC_ERROR(-1);
1260: }
1261:
1262: end:
1263: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
1264: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
1265: schedReadSelf(task);
1266: return NULL;
1267: }
1268:
1269: static void *
1270: flushBLOB(sched_task_t *task)
1271: {
1272: uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
1273: rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
1274: rpc_blob_t *b, *tmp;
1275:
1276: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1277: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1278:
1279: rpc_srv_blobFree(srv, b);
1280: e_free(b);
1281: }
1282:
1283: if (!schedSignalSelf(task)) {
1284: /* disabled kqueue support in libaitsched */
1285: struct sigaction sa;
1286:
1287: memset(&sa, 0, sizeof sa);
1288: sigemptyset(&sa.sa_mask);
1289: sa.sa_handler = (void (*)(int)) flushBLOB;
1290: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1291: sigaction(SIGFBLOB, &sa, NULL);
1292: }
1293:
1294: return NULL;
1295: }
1296:
1297: static void *
1298: acceptBLOBClients(sched_task_t *task)
1299: {
1300: rpc_srv_t *srv = TASK_ARG(task);
1301: rpc_cli_t *c = NULL;
1302: register int i;
1303: socklen_t salen = sizeof(sockaddr_t);
1304: int sock;
1305: #ifdef TCP_NOPUSH
1306: int n = 1;
1307: #endif
1308:
1309: /* check free slots for connect */
1310: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
1311: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1312: if (c) { /* no more free slots! */
1313: EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
1314: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
1315: shutdown(sock, SHUT_RDWR);
1316: close(sock);
1317: }
1318: goto end;
1319: }
1320:
1321: c = e_malloc(sizeof(rpc_cli_t));
1322: if (!c) {
1323: LOGERR;
1324: srv->srv_kill = srv->srv_blob.kill = 1;
1325: return NULL;
1326: } else {
1327: memset(c, 0, sizeof(rpc_cli_t));
1328: array_Set(srv->srv_blob.clients, i, c);
1329: c->cli_id = i;
1330: c->cli_parent = srv;
1331: }
1332:
1333: /* alloc empty buffer */
1334: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1335:
1336: /* accept client */
1337: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
1338: if (c->cli_sock == -1) {
1339: LOGERR;
1340: AIT_FREE_VAL(&c->cli_buf);
1341: array_Del(srv->srv_blob.clients, i, 42);
1342: goto end;
1343: } else {
1344: #ifdef TCP_NOPUSH
1345: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
1346: #endif
1347: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1348: }
1349:
1350: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
1351: end:
1352: schedReadSelf(task);
1353: return NULL;
1354: }
1355:
1356: /* ------------------------------------------------------ */
1357:
1358: /*
1359: * rpc_srv_initBLOBServer() - Init & create BLOB Server
1360: *
1361: * @srv = RPC server instance
1362: * @Port = Port for bind server, if Port == 0 default port is selected
1363: * @diskDir = Disk place for BLOB file objects
1364: * return: -1 == error or 0 bind and created BLOB server instance
1365: */
1366: int
1367: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
1368: {
1369: int n = 1;
1370:
1371: if (!srv || srv->srv_kill) {
1372: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1373: return -1;
1374: }
1375:
1376: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
1377: if (access(diskDir, R_OK | W_OK) == -1) {
1378: LOGERR;
1379: return -1;
1380: } else
1381: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1382:
1383: /* init blob list */
1384: TAILQ_INIT(&srv->srv_blob.blobs);
1385:
1386: srv->srv_blob.server.cli_parent = srv;
1387:
1388: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
1389: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1390: case AF_INET:
1391: srv->srv_blob.server.cli_sa.sin.sin_port =
1392: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1393: break;
1394: case AF_INET6:
1395: srv->srv_blob.server.cli_sa.sin6.sin6_port =
1396: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1397: break;
1398: case AF_LOCAL:
1399: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
1400: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1401: break;
1402: default:
1403: AIT_FREE_VAL(&srv->srv_blob.dir);
1404: return -1;
1405: }
1406:
1407: /* create BLOB server socket */
1408: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1409: if (srv->srv_blob.server.cli_sock == -1) {
1410: LOGERR;
1411: AIT_FREE_VAL(&srv->srv_blob.dir);
1412: return -1;
1413: }
1414: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1415: LOGERR;
1416: close(srv->srv_blob.server.cli_sock);
1417: AIT_FREE_VAL(&srv->srv_blob.dir);
1418: return -1;
1419: }
1420: n = srv->srv_netbuf;
1421: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1422: LOGERR;
1423: close(srv->srv_blob.server.cli_sock);
1424: AIT_FREE_VAL(&srv->srv_blob.dir);
1425: return -1;
1426: }
1427: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1428: LOGERR;
1429: close(srv->srv_blob.server.cli_sock);
1430: AIT_FREE_VAL(&srv->srv_blob.dir);
1431: return -1;
1432: }
1433: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
1434: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1435: LOGERR;
1436: close(srv->srv_blob.server.cli_sock);
1437: AIT_FREE_VAL(&srv->srv_blob.dir);
1438: return -1;
1439: } else
1440: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
1441: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
1442:
1443:
1444: /* allocate pool for concurent blob clients */
1445: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1446: if (!srv->srv_blob.clients) {
1447: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1448: close(srv->srv_blob.server.cli_sock);
1449: AIT_FREE_VAL(&srv->srv_blob.dir);
1450: return -1;
1451: }
1452:
1453: /* init blob scheduler */
1454: srv->srv_blob.root = schedBegin();
1455: if (!srv->srv_blob.root) {
1456: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1457: array_Destroy(&srv->srv_blob.clients);
1458: close(srv->srv_blob.server.cli_sock);
1459: AIT_FREE_VAL(&srv->srv_blob.dir);
1460: return -1;
1461: }
1462:
1463: return 0;
1464: }
1465:
1466: /*
1467: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
1468: *
1469: * @srv = RPC Server instance
1470: * return: none
1471: */
1472: void
1473: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
1474: {
1475: if (!srv)
1476: return;
1477:
1478: srv->srv_blob.kill = 1;
1479:
1480: schedEnd(&srv->srv_blob.root);
1481: }
1482:
1483: /*
1484: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1485: *
1486: * @srv = RPC Server instance
1487: * return: -1 error or 0 ok, infinite loop ...
1488: */
1489: int
1490: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1491: {
1492: rpc_cli_t *c;
1493: register int i;
1494: rpc_blob_t *b, *tmp;
1495: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1496:
1497: if (!srv || srv->srv_kill) {
1498: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1499: return -1;
1500: }
1501:
1502: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1503: LOGERR;
1504: return -1;
1505: }
1506:
1507: if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
1508: /* disabled kqueue support in libaitsched */
1509: struct sigaction sa;
1510:
1511: atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
1512:
1513: memset(&sa, 0, sizeof sa);
1514: sigemptyset(&sa.sa_mask);
1515: sa.sa_handler = (void (*)(int)) flushBLOB;
1516: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1517: sigaction(SIGFBLOB, &sa, NULL);
1518: }
1519:
1520: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
1521: srv->srv_blob.server.cli_sock, NULL, 0)) {
1522: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1523: return -1;
1524: }
1525:
1526: schedPolling(srv->srv_blob.root, &ts, NULL);
1527: /* main rpc loop */
1528: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1529:
1530: /* detach blobs */
1531: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1532: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1533:
1534: rpc_srv_blobFree(srv, b);
1535: e_free(b);
1536: }
1537:
1538: /* close all clients connections & server socket */
1539: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
1540: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1541: if (c) {
1542: shutdown(c->cli_sock, SHUT_RDWR);
1543: close(c->cli_sock);
1544:
1545: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1546: AIT_FREE_VAL(&c->cli_buf);
1547: }
1548: array_Del(srv->srv_blob.clients, i, 42);
1549: }
1550: array_Destroy(&srv->srv_blob.clients);
1551:
1552: close(srv->srv_blob.server.cli_sock);
1553:
1554: AIT_FREE_VAL(&srv->srv_blob.dir);
1555: return 0;
1556: }
1557:
1558:
1559: /*
1560: * rpc_srv_initServer() - Init & create RPC Server
1561: *
1562: * @InstID = Instance for authentication & recognition
1563: * @concurentClients = Concurent clients at same time to this server
1564: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1565: * @csHost = Host name or address for bind server, if NULL any address
1566: * @Port = Port for bind server, if Port == 0 default port is selected
1567: * @proto = Protocol, if == 0 choose SOCK_STREAM
1568: * return: NULL == error or !=NULL bind and created RPC server instance
1569: */
1570: rpc_srv_t *
1571: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
1572: const char *csHost, u_short Port, int proto)
1573: {
1574: int n = 1;
1575: rpc_srv_t *srv = NULL;
1576: sockaddr_t sa = E_SOCKADDR_INIT;
1577:
1578: if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
1579: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1580: return NULL;
1581: }
1582: if (!e_gethostbyname(csHost, Port, &sa))
1583: return NULL;
1584: if (!Port)
1585: Port = RPC_DEFPORT;
1586: if (!proto)
1587: proto = SOCK_STREAM;
1588: if (netBuf < RPC_MIN_BUFSIZ)
1589: netBuf = BUFSIZ;
1590: else
1591: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1592:
1593: #ifdef HAVE_SRANDOMDEV
1594: srandomdev();
1595: #else
1596: time_t tim;
1597:
1598: srandom((time(&tim) ^ getpid()));
1599: #endif
1600:
1601: srv = e_malloc(sizeof(rpc_srv_t));
1602: if (!srv) {
1603: LOGERR;
1604: return NULL;
1605: } else
1606: memset(srv, 0, sizeof(rpc_srv_t));
1607:
1608: srv->srv_proto = proto;
1609: srv->srv_netbuf = netBuf;
1610: srv->srv_session.sess_version = RPC_VERSION;
1611: srv->srv_session.sess_instance = InstID;
1612:
1613: srv->srv_server.cli_parent = srv;
1614: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1615:
1616: /* init functions */
1617: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1618: SLIST_INIT(&srv->srv_funcs);
1619: AVL_INIT(&srv->srv_funcs);
1620:
1621: /* init scheduler */
1622: srv->srv_root = schedBegin();
1623: if (!srv->srv_root) {
1624: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1625: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1626: e_free(srv);
1627: return NULL;
1628: }
1629:
1630: /* init pool for clients */
1631: srv->srv_clients = array_Init(concurentClients);
1632: if (!srv->srv_clients) {
1633: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1634: schedEnd(&srv->srv_root);
1635: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1636: e_free(srv);
1637: return NULL;
1638: }
1639:
1640: /* create server socket */
1641: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family,
1642: srv->srv_proto, srv->srv_proto == SOCK_RAW ? IPPROTO_ERPC : 0);
1643: if (srv->srv_server.cli_sock == -1) {
1644: LOGERR;
1645: array_Destroy(&srv->srv_clients);
1646: schedEnd(&srv->srv_root);
1647: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1648: e_free(srv);
1649: return NULL;
1650: }
1651: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1652: LOGERR;
1653: goto err;
1654: }
1655: n = srv->srv_netbuf;
1656: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1657: LOGERR;
1658: goto err;
1659: }
1660: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1661: LOGERR;
1662: goto err;
1663: }
1664: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
1665: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1666: LOGERR;
1667: goto err;
1668: } else
1669: fcntl(srv->srv_server.cli_sock, F_SETFL,
1670: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1671:
1672: rpc_register_srvPing(srv);
1673:
1674: return srv;
1675: err: /* error condition */
1676: close(srv->srv_server.cli_sock);
1677: array_Destroy(&srv->srv_clients);
1678: schedEnd(&srv->srv_root);
1679: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1680: e_free(srv);
1681: return NULL;
1682: }
1683:
1684: /*
1685: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1686: *
1687: * @psrv = RPC Server instance
1688: * return: none
1689: */
1690: void
1691: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1692: {
1693: if (!psrv || !*psrv)
1694: return;
1695:
1696: /* if send kill to blob server */
1697: rpc_srv_endBLOBServer(*psrv);
1698:
1699: (*psrv)->srv_kill = 1;
1700: sleep(RPC_SCHED_POLLING);
1701:
1702: schedEnd(&(*psrv)->srv_root);
1703:
1704: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1705: e_free(*psrv);
1706: *psrv = NULL;
1707: }
1708:
1709: /*
1710: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1711: *
1712: * @srv = RPC Server instance
1713: * return: -1 error or 0 ok, infinite loop ...
1714: */
1715: int
1716: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1717: {
1718: rpc_cli_t *c;
1719: register int i;
1720: rpc_func_t *f;
1721: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1722:
1723: if (!srv) {
1724: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1725: return -1;
1726: }
1727:
1728: if (srv->srv_proto == SOCK_STREAM)
1729: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1730: LOGERR;
1731: return -1;
1732: }
1733:
1734: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1735: srv->srv_server.cli_sock, NULL, 0)) {
1736: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1737: return -1;
1738: }
1739:
1740: schedPolling(srv->srv_root, &ts, NULL);
1741: /* main rpc loop */
1742: schedRun(srv->srv_root, &srv->srv_kill);
1743:
1744: /* close all clients connections & server socket */
1745: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1746: c = array(srv->srv_clients, i, rpc_cli_t*);
1747: if (c) {
1748: if (srv->srv_proto == SOCK_STREAM) {
1749: shutdown(c->cli_sock, SHUT_RDWR);
1750: close(c->cli_sock);
1751: }
1752:
1753: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1754: ait_freeVars(&RPC_RETVARS(c));
1755: AIT_FREE_VAL(&c->cli_buf);
1756: }
1757: array_Del(srv->srv_clients, i, 42);
1758: }
1759: array_Destroy(&srv->srv_clients);
1760:
1761: if (srv->srv_proto != SOCK_EXT)
1762: close(srv->srv_server.cli_sock);
1763:
1764: /* detach exported calls */
1765: RPC_FUNCS_LOCK(&srv->srv_funcs);
1766: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1767: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1768:
1769: AIT_FREE_VAL(&f->func_name);
1770: e_free(f);
1771: }
1772: srv->srv_funcs.avlh_root = NULL;
1773: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1774:
1775: return 0;
1776: }
1777:
1778:
1779: /*
1780: * rpc_srv_execCall() Execute registered call from RPC server
1781: *
1782: * @cli = RPC client
1783: * @rpc = IN RPC call structure
1784: * @funcname = Execute RPC function
1785: * @args = IN RPC calling arguments from RPC client
1786: * return: -1 error, !=-1 ok
1787: */
1788: int
1789: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1790: ait_val_t funcname, array_t * __restrict args)
1791: {
1792: rpc_callback_t func;
1793:
1794: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1795: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1796: return -1;
1797: }
1798:
1799: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1800: return func(cli, rpc, args);
1801: }
1802:
1803:
1804: /*
1805: * rpc_srv_initServer2() - Init & create layer2 RPC Server
1806: *
1807: * @InstID = Instance for authentication & recognition
1808: * @concurentClients = Concurent clients at same time to this server
1809: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1810: * @csIface = Interface name for bind server, if NULL first interface on host
1811: * return: NULL == error or !=NULL bind and created RPC server instance
1812: */
1813: rpc_srv_t *
1814: rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
1815: {
1816: int n = 1;
1817: rpc_srv_t *srv = NULL;
1818: sockaddr_t sa = E_SOCKADDR_INIT;
1819: char szIface[64], szStr[STRSIZ];
1820: register int i;
1821: struct ifreq ifr;
1822: struct bpf_insn insns[] = {
1823: BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
1824: BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
1825: BPF_STMT(BPF_RET + BPF_K, -1),
1826: BPF_STMT(BPF_RET + BPF_K, 0),
1827: };
1828: struct bpf_program fcode = {
1829: .bf_len = sizeof(insns) / sizeof(struct bpf_insn),
1830: .bf_insns = insns
1831: };
1832:
1833: if (!concurentClients) {
1834: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1835: return NULL;
1836: }
1837: if (!csIface) {
1838: if (e_get1stiface(szIface, sizeof szIface))
1839: return NULL;
1840: } else
1841: strlcpy(szIface, csIface, sizeof szIface);
1842: if (!e_getifacebyname(szIface, &sa))
1843: return NULL;
1844:
1845: #ifdef HAVE_SRANDOMDEV
1846: srandomdev();
1847: #else
1848: time_t tim;
1849:
1850: srandom((time(&tim) ^ getpid()));
1851: #endif
1852:
1853: srv = e_malloc(sizeof(rpc_srv_t));
1854: if (!srv) {
1855: LOGERR;
1856: return NULL;
1857: } else
1858: memset(srv, 0, sizeof(rpc_srv_t));
1859:
1860: srv->srv_proto = SOCK_BPF;
1861: srv->srv_netbuf = netBuf;
1862: srv->srv_session.sess_version = RPC_VERSION;
1863: srv->srv_session.sess_instance = InstID;
1864:
1865: srv->srv_server.cli_parent = srv;
1866: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1867:
1868: /* init functions */
1869: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1870: SLIST_INIT(&srv->srv_funcs);
1871: AVL_INIT(&srv->srv_funcs);
1872:
1873: /* init scheduler */
1874: srv->srv_root = schedBegin();
1875: if (!srv->srv_root) {
1876: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1877: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1878: e_free(srv);
1879: return NULL;
1880: }
1881:
1882: /* init pool for clients */
1883: srv->srv_clients = array_Init(concurentClients);
1884: if (!srv->srv_clients) {
1885: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1886: schedEnd(&srv->srv_root);
1887: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1888: e_free(srv);
1889: return NULL;
1890: }
1891:
1892: /* create server handler */
1893: for (i = 0; i < 10; i++) {
1894: memset(szStr, 0, sizeof szStr);
1895: snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
1896: srv->srv_server.cli_sock = open(szStr, O_RDWR);
1897: if (srv->srv_server.cli_sock > STDERR_FILENO)
1898: break;
1899: }
1900: if (srv->srv_server.cli_sock < 3) {
1901: LOGERR;
1902: array_Destroy(&srv->srv_clients);
1903: schedEnd(&srv->srv_root);
1904: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1905: e_free(srv);
1906: return NULL;
1907: }
1908:
1909: if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
1910: LOGERR;
1911: goto err;
1912: }
1913: if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
1914: LOGERR;
1915: goto err;
1916: }
1917: n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
1918: if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
1919: LOGERR;
1920: goto err;
1921: } else
1922: srv->srv_netbuf = n;
1923:
1924: memset(&ifr, 0, sizeof ifr);
1925: strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
1926: if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
1927: LOGERR;
1928: goto err;
1929: } else
1930: fcntl(srv->srv_server.cli_sock, F_SETFL,
1931: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1932:
1933: rpc_register_srvPing(srv);
1934:
1935: return srv;
1936: err: /* error condition */
1937: close(srv->srv_server.cli_sock);
1938: array_Destroy(&srv->srv_clients);
1939: schedEnd(&srv->srv_root);
1940: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1941: e_free(srv);
1942: return NULL;
1943: }
1944:
1945: /*
1946: * rpc_srv_initServerExt() - Init & create pipe RPC Server
1947: *
1948: * @InstID = Instance for authentication & recognition
1949: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1950: * @fd = File descriptor
1951: * return: NULL == error or !=NULL bind and created RPC server instance
1952: */
1953: rpc_srv_t *
1954: rpc_srv_initServerExt(u_char InstID, int netBuf, int fd)
1955: {
1956: rpc_srv_t *srv = NULL;
1957:
1958: #ifdef HAVE_SRANDOMDEV
1959: srandomdev();
1960: #else
1961: time_t tim;
1962:
1963: srandom((time(&tim) ^ getpid()));
1964: #endif
1965:
1966: srv = e_malloc(sizeof(rpc_srv_t));
1967: if (!srv) {
1968: LOGERR;
1969: return NULL;
1970: } else
1971: memset(srv, 0, sizeof(rpc_srv_t));
1972:
1973: srv->srv_proto = SOCK_EXT;
1974: srv->srv_netbuf = (netBuf < RPC_MIN_BUFSIZ) ?
1975: getpagesize() : E_ALIGN(netBuf, 2);
1976: srv->srv_session.sess_version = RPC_VERSION;
1977: srv->srv_session.sess_instance = InstID;
1978:
1979: srv->srv_server.cli_parent = srv;
1980: srv->srv_server.cli_sock = fd;
1981:
1982: /* init functions */
1983: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1984: SLIST_INIT(&srv->srv_funcs);
1985: AVL_INIT(&srv->srv_funcs);
1986:
1987: /* init scheduler */
1988: srv->srv_root = schedBegin();
1989: if (!srv->srv_root) {
1990: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1991: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1992: e_free(srv);
1993: return NULL;
1994: }
1995:
1996: /* init pool for clients */
1997: srv->srv_clients = array_Init(1);
1998: if (!srv->srv_clients) {
1999: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
2000: schedEnd(&srv->srv_root);
2001: pthread_mutex_destroy(&srv->srv_funcs.mtx);
2002: e_free(srv);
2003: return NULL;
2004: }
2005:
2006: fcntl(srv->srv_server.cli_sock, F_SETFL,
2007: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
2008:
2009: rpc_register_srvPing(srv);
2010:
2011: return srv;
2012: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>