1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: srv.c,v 1.17.4.6 2013/08/21 13:01:44 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
/*
 * SOCK_STREAM: scheduler callbacks referenced by the stream rows
 * of the cbProto[] table
 */
static void *acceptClients(sched_task_t *);
static void *closeClient(sched_task_t *);
static void *rxPacket(sched_task_t *);
static void *txPacket(sched_task_t *);

/*
 * SOCK_DGRAM: scheduler callbacks referenced by the datagram row
 * of the cbProto[] table
 */
static void *freeClient(sched_task_t *);
static void *rxUDPPacket(sched_task_t *);
static void *txUDPPacket(sched_task_t *);
59:
60: /* SOCK_RAW */
61:
/*
 * Per-protocol callback table: row is selected by srv_proto (socket type),
 * column by a CB_* index. CB_CLOSECLIENT, CB_RXPACKET and CB_TXPACKET are
 * the columns referenced in the visible part of this file.
 * NOTE(review): the row comments assume SOCK_STREAM == 1, SOCK_DGRAM == 2 and
 * SOCK_RAW == 3 - confirm for the target platform. Row 0 repeats the stream
 * handlers; rpc_srv_initServer() replaces proto 0 with SOCK_STREAM.
 */
static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {
{ acceptClients, closeClient, rxPacket, txPacket }, /* 0: same handlers as SOCK_STREAM */
{ acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
{ rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket }, /* SOCK_DGRAM */
{ NULL, NULL, NULL, NULL } /* SOCK_RAW */
};
68:
69:
70: static inline u_char *
71: getBuffer(rpc_cli_t * __restrict c)
72: {
73: u_char *b = NULL;
74:
75: assert(c);
76:
77: if (RPC_ISNEXTBUF(c))
78: b = AIT_GET_BUF(array(c->cli_buf, 1, ait_val_t*));
79: else
80: b = AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*)) +
81: sizeof(struct tagRPCCall);
82:
83: return b;
84: }
85:
86: static inline struct tagRPCCall *
87: getHeader(rpc_cli_t * __restrict c)
88: {
89: assert(c);
90:
91: return (struct tagRPCCall*) AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*));
92: }
93:
94: void
95: rpc_freeCli(rpc_cli_t * __restrict c)
96: {
97: rpc_srv_t *s = c->cli_parent;
98:
99: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
100:
101: /* free buffer(s) */
102: ait_freeVars(&c->cli_buf);
103:
104: array_Del(s->srv_clients, c->cli_id, 0);
105: if (c)
106: e_free(c);
107: }
108:
109:
110: static inline int
111: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
112: {
113: rpc_cli_t *c = NULL;
114: register int i;
115:
116: /* check free slots for connect */
117: for (i = 0; i < array_Size(srv->srv_clients) &&
118: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
119: /* check for duplicates */
120: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
121: break;
122: if (i >= array_Size(srv->srv_clients))
123: return -1; /* no more free slots! */
124:
125: return i;
126: }
127:
128: static rpc_cli_t *
129: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
130: {
131: rpc_cli_t *c = NULL;
132: int n;
133:
134: n = _check4freeslot(srv, sa);
135: if (n == -1)
136: return NULL;
137: else
138: c = array(srv->srv_clients, n, rpc_cli_t*);
139:
140: if (!c) {
141: c = e_malloc(sizeof(rpc_cli_t));
142: if (!c) {
143: LOGERR;
144: srv->srv_kill = 1;
145: return NULL;
146: } else {
147: memset(c, 0, sizeof(rpc_cli_t));
148: array_Set(srv->srv_clients, n, c);
149: c->cli_id = n;
150: c->cli_parent = srv;
151: }
152:
153: /* init buffer(s) */
154: c->cli_buf = ait_allocVars(2);
155: if (!c->cli_buf) {
156: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
157: array_Del(srv->srv_clients, n, 42);
158: return NULL;
159: } else
160: AIT_SET_BUFSIZ(array(c->cli_buf, 0, ait_val_t*), 0, srv->srv_netbuf);
161: }
162:
163: return c;
164: }
165:
166:
167: static void *
168: freeClient(sched_task_t *task)
169: {
170: rpc_freeCli(TASK_ARG(task));
171:
172: return NULL;
173: }
174:
175: static void *
176: closeClient(sched_task_t *task)
177: {
178: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
179:
180: rpc_freeCli(TASK_ARG(task));
181:
182: /* close client socket */
183: shutdown(sock, SHUT_RDWR);
184: close(sock);
185: return NULL;
186: }
187:
188: static void *
189: txPacket(sched_task_t *task)
190: {
191: rpc_cli_t *c = TASK_ARG(task);
192: rpc_srv_t *s = c->cli_parent;
193: rpc_func_t *f = NULL;
194: u_char *buf;
195: struct tagRPCCall *rpc = (struct tagRPCCall*) TASK_DATA(task);
196: int ret, wlen = sizeof(struct tagRPCCall);
197: int len = sizeof(struct tagRPCCall) + ntohl(rpc->call_len);
198:
199: buf = e_malloc(len);
200: if (!buf) {
201: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
202: /* close connection */
203: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
204: TASK_ARG(task), 0, NULL, 0);
205: return NULL;
206: } else {
207: /* copy RPC header */
208: memcpy(buf, rpc, wlen);
209: rpc = (struct tagRPCCall*) buf;
210: }
211:
212: if (rpc->call_argc) {
213: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
214: if (!f) {
215: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
216:
217: rpc->call_argc ^= rpc->call_argc;
218: rpc->call_rep.ret = RPC_ERROR(-1);
219: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
220: } else {
221: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
222: /* Go Encapsulate variables */
223: ret = ait_vars2buffer(buf + wlen, len - wlen, RPC_RETVARS(c));
224: /* Free return values */
225: ait_freeVars(&c->cli_vars);
226: if (ret == -1) {
227: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
228:
229: rpc->call_argc ^= rpc->call_argc;
230: rpc->call_rep.ret = RPC_ERROR(-1);
231: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
232: } else
233: wlen += ret;
234: }
235: }
236:
237: rpc->call_len = htonl(wlen);
238:
239: /* send reply */
240: ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
241: if (ret == -1 || ret != wlen) {
242: /* close connection */
243: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
244: TASK_ARG(task), 0, NULL, 0);
245: }
246:
247: e_free(buf);
248: return NULL;
249: }
250:
/*
 * execCall() - Scheduler event, decodes arguments of received RPC request,
 * executes requested function and stores result into client's RPC header
 *
 * @task = Scheduler task, TASK_ARG(task) is RPC client
 * return: always NULL, status of call is written into rpc->call_rep
 */
static void *
execCall(sched_task_t *task)
{
rpc_cli_t *c = TASK_ARG(task);
rpc_srv_t *s = c->cli_parent;
rpc_func_t *f = NULL;
array_t *arr = NULL;
u_char *buf = getBuffer(c);
struct tagRPCCall *rpc = getHeader(c);
int argc = ntohs(rpc->call_argc);

/* Go decapsulate variables ... */
if (argc) {
arr = ait_buffer2vars(buf, ntohl(rpc->call_len), argc, 42);
if (!arr) {
rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());

/* arguments can't be decoded: error reply without variables, call isn't executed */
rpc->call_argc ^= rpc->call_argc;
rpc->call_rep.ret = RPC_ERROR(-1);
rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
return NULL;
}
} else
arr = NULL;

/* lookup function by tag from RPC header */
if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");

rpc->call_argc ^= rpc->call_argc;
rpc->call_rep.ret = RPC_ERROR(-1);
rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
} else {
/* if client doesn't want reply */
/* from here argc is reused as "no reply" flag, not as arguments count */
argc = RPC_CHK_NOREPLY(rpc);
rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
if (rpc->call_rep.ret == htonl(-1)) {
/* call failed: report errno, reply carries no variables */
rpc->call_rep.eno = RPC_ERROR(errno);
rpc->call_argc ^= rpc->call_argc;
} else {
/* success: clear error number */
rpc->call_rep.eno ^= rpc->call_rep.eno;
if (argc) {
/* without reply */
ait_freeVars(&c->cli_vars);
rpc->call_argc ^= rpc->call_argc;
} else {
/* reply */
/* number of return variables goes into header */
rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
}
}
}

/* release array of decoded arguments */
array_Destroy(&arr);
return NULL;
}
305:
/*
 * rxPacket() - Read task of stream client: receives one RPC request (header,
 * then payload) and schedules execution of call and sending of reply
 *
 * @task = Scheduler task; TASK_ARG(task) is RPC client, TASK_FD(task) is
 *	client socket
 * return: always NULL; on receive error close of client is scheduled,
 *	otherwise task re-arms itself for next packet
 */
static void *
rxPacket(sched_task_t *task)
{
rpc_cli_t *c = TASK_ARG(task);
rpc_srv_t *s = c->cli_parent;
int len, rlen, noreply;
ait_val_t *bufz = array(c->cli_buf, 0, ait_val_t*);
u_char *buf = (u_char*) AIT_GET_BUF(bufz);
struct tagRPCCall *rpc = (struct tagRPCCall*) buf;

/* wipe 1st buffer, so call_len reads as 0 when nothing is received */
memset(buf, 0, AIT_LEN(bufz));
/* 1st buffer is last */
RPC_CLR_NEXTBUF(c);

/* read rpc header */
rlen = recv(TASK_FD(task), rpc, MIN(sizeof(struct tagRPCCall), AIT_LEN(bufz)), 0);
/*
 * NOTE(review): rlen is int and is compared with size_t. With rlen == -1 the
 * first test is false after conversion to unsigned; connection is still
 * closed, because call_len of the zeroed buffer fails the second test.
 */
if (rlen < sizeof(struct tagRPCCall) ||
ntohl(rpc->call_len) < sizeof(struct tagRPCCall)) {
/* close connection */
schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
TASK_ARG(task), 0, NULL, 0);
return NULL;
} else {
/* payload is placed right after header; len is call_len from header */
buf += sizeof(struct tagRPCCall);
len = ntohl(rpc->call_len);
}

/* len bytes don't fit behind header into 1st buffer */
if (len > (AIT_LEN(bufz) - sizeof(struct tagRPCCall))) {
/* add extra buffer */
if (!(bufz = ait_getVars(&c->cli_buf, 1))) {
/* close connection */
schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
TASK_ARG(task), 0, NULL, 0);
return NULL;
} else {
/* 2nd buffer is reallocated to len bytes and receives payload */
AIT_FREE_VAL(bufz);
AIT_SET_BUFSIZ(bufz, 0, len);
buf = AIT_GET_BUF(bufz);
}
/* buffer isnt last */
RPC_SET_NEXTBUF(c);
}

/* read payload */
rlen = recv(TASK_FD(task), buf, len, 0);
/* error (-1) and short read both close connection */
if (rlen < len) {
/* close connection */
schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
TASK_ARG(task), 0, NULL, 0);
return NULL;
}

noreply = RPC_CHK_NOREPLY(rpc);

/* check RPC packet session info */
if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");

/* error reply is prepared in place, call isn't executed */
rpc->call_argc ^= rpc->call_argc;
rpc->call_rep.ret = RPC_ERROR(-1);
rpc->call_rep.eno = RPC_ERROR(errno);
} else {
/* execute RPC call */
schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), noreply, rpc, len);
}

/* send RPC reply */
if (!noreply)
schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
TASK_ARG(task), TASK_FD(task), rpc, len);

/* lets get next packet */
schedReadSelf(task);
return NULL;
}
381:
382: static void *
383: acceptClients(sched_task_t *task)
384: {
385: rpc_srv_t *srv = TASK_ARG(task);
386: rpc_cli_t *c = NULL;
387: socklen_t salen = sizeof(sockaddr_t);
388:
389: c = _allocClient(srv, NULL);
390: if (!c)
391: goto end;
392:
393: /* accept client */
394: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
395: if (c->cli_sock == -1) {
396: LOGERR;
397: ait_freeVars(&c->cli_buf);
398: array_Del(srv->srv_clients, c->cli_id, 42);
399: goto end;
400: } else
401: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
402:
403: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
404: c->cli_sock, NULL, 0);
405: end:
406: schedReadSelf(task);
407: return NULL;
408: }
409:
410:
/*
 * txUDPPacket() - Write task of datagram client: re-arms client timeout,
 * builds RPC reply, stamps it with CRC and sends it to client's address
 *
 * @task = Scheduler task; TASK_ARG(task) is RPC client, TASK_DATA(task) is
 *	RPC header of processed request, TASK_FD(task) is server socket
 * return: always NULL
 */
static void *
txUDPPacket(sched_task_t *task)
{
rpc_cli_t *c = TASK_ARG(task);
rpc_srv_t *s = c->cli_parent;
rpc_func_t *f = NULL;
u_char *buf;
struct tagRPCCall *rpc = (struct tagRPCCall*) TASK_DATA(task);
int ret, wlen = sizeof(struct tagRPCCall);
int len = sizeof(struct tagRPCCall) + ntohl(rpc->call_len);
struct timespec ts = { DEF_RPC_TIMEOUT, 0 };

/* restart timer which releases this client after DEF_RPC_TIMEOUT */
schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
TASK_ARG(task), ts, TASK_ARG(task), 0);

buf = e_malloc(len);
if (!buf) {
rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
/* close connection */
schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
TASK_ARG(task), 0, NULL, 0);
return NULL;
} else {
/* copy RPC header */
memcpy(buf, rpc, wlen);
/* from here rpc points to private copy inside reply packet */
rpc = (struct tagRPCCall*) buf;
}

/* variables are attached only when header announces some */
if (rpc->call_argc) {
f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
if (!f) {
rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");

rpc->call_argc ^= rpc->call_argc;
rpc->call_rep.ret = RPC_ERROR(-1);
rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
} else {
rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
/* Go Encapsulate variables */
ret = ait_vars2buffer(buf + wlen, len - wlen, RPC_RETVARS(c));
/* Free return values */
ait_freeVars(&c->cli_vars);
if (ret == -1) {
rpc_SetErr(EBADRPC, "Prepare RPC packet failed");

rpc->call_argc ^= rpc->call_argc;
rpc->call_rep.ret = RPC_ERROR(-1);
rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
} else
wlen += ret;
}
}

/* final length of reply: header plus encoded variables */
rpc->call_len = htonl(wlen);

/* calculate CRC */
/* CRC field is zeroed first, so it is computed over reply with empty CRC */
rpc->call_crc ^= rpc->call_crc;
rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));

/* send reply */
ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL,
&c->cli_sa.sa, c->cli_sa.sa.sa_len);
if (ret == -1 || ret != wlen) {
/* close connection */
schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
TASK_ARG(task), 0, NULL, 0);
}

e_free(buf);
return NULL;
}
483:
484: static void *
485: rxUDPPacket(sched_task_t *task)
486: {
487: rpc_srv_t *srv = TASK_ARG(task);
488: rpc_cli_t *c = NULL;
489: int len, rlen, noreply;
490: ait_val_t *bufz;
491: u_char *buf = NULL;
492: struct tagRPCCall rpcbuf, *rpc;
493: sockaddr_t sa[2];
494: socklen_t salen;
495: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
496:
497: memset(buf, 0, AIT_LEN(bufz));
498: memset(&rpcbuf, 0, sizeof rpcbuf);
499:
500: /* receive connect packet */
501: salen = sa[0].ss.ss_len = sizeof(sockaddr_t);
502: rlen = recvfrom(TASK_FD(task), &rpcbuf, sizeof rpcbuf, 0, &sa[0].sa, &salen);
503: if (rlen < sizeof(struct tagRPCCall) || ntohl(rpcbuf.call_len) < sizeof(struct tagRPCCall))
504: goto end;
505: else
506: len = ntohl(rpcbuf.call_len);
507:
508: buf = e_malloc(len);
509: if (!buf)
510: goto end;
511: else
512: memset(buf, 0, len);
513:
514: /* read payload */
515: salen = sa[1].ss.ss_len = sizeof(sockaddr_t);
516: rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa[1].sa, &salen);
517: if (rlen < len || memcmp(&sa[0], &sa[1], sizeof sa[0]))
518: goto end;
519:
520: c = _allocClient(srv, sa);
521: if (!c)
522: goto end;
523: else {
524: /* add extra buffer */
525: if (!(bufz = ait_getVars(&c->cli_buf, 1)))
526: goto end;
527: else {
528: AIT_FREE_VAL(bufz);
529: AIT_SET_BUFSIZ(bufz, 0, len);
530: /* buffer isnt last */
531: RPC_SET_NEXTBUF(c);
532: }
533:
534: rpc = getHeader(c);
535: memcpy(rpc, &rpcbuf, sizeof(struct tagRPCCall));
536: memcpy(getBuffer(c), buf, len);
537:
538: c->cli_sock = TASK_FD(task);
539: memcpy(&c->cli_sa, sa, sizeof c->cli_sa);
540: /* armed timer for close stateless connection */
541: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
542: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
543: c, ts, c, 0);
544: }
545:
546: /* check integrity of packet */
547: if (ntohs(rpc->call_crc) != crcFletcher16((u_short*) buf, len / 2)) {
548: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
549: goto end;
550: }
551:
552: noreply = RPC_CHK_NOREPLY(rpc);
553:
554: /* check RPC packet session info */
555: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
556: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
557:
558: rpc->call_argc ^= rpc->call_argc;
559: rpc->call_rep.ret = RPC_ERROR(-1);
560: rpc->call_rep.eno = RPC_ERROR(errno);
561: } else {
562: /* execute RPC call */
563: schedEvent(TASK_ROOT(task), execCall, c, noreply, rpc, len);
564: }
565:
566: /* send RPC reply */
567: if (!noreply)
568: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
569: c, TASK_FD(task), rpc, len);
570: end:
571: if (buf)
572: e_free(buf);
573: schedReadSelf(task);
574: return NULL;
575: }
576:
577: /* ------------------------------------------------------ */
578:
579: void
580: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
581: {
582: rpc_srv_t *s = c->cli_parent;
583:
584: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
585:
586: /* free buffer(s) */
587: ait_freeVars(&c->cli_buf);
588:
589: array_Del(s->srv_blob.clients, c->cli_id, 0);
590: if (c)
591: e_free(c);
592: }
593:
594:
595: static void *
596: closeBLOBClient(sched_task_t *task)
597: {
598: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
599:
600: rpc_freeBLOBCli(TASK_ARG(task));
601:
602: /* close client socket */
603: shutdown(sock, SHUT_RDWR);
604: close(sock);
605: return NULL;
606: }
607:
608: static void *
609: txBLOB(sched_task_t *task)
610: {
611: rpc_cli_t *c = TASK_ARG(task);
612: u_char *buf = AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*));
613: int wlen = sizeof(struct tagBLOBHdr);
614:
615: /* send reply */
616: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
617: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
618: /* close blob connection */
619: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
620: }
621:
622: return NULL;
623: }
624:
/*
 * rxBLOB() - Read task of BLOB client: receives one BLOB header, executes
 * its command (get, set or unset) and schedules reply with updated header
 *
 * @task = Scheduler task; TASK_ARG(task) is BLOB client, TASK_FD(task) is
 *	client socket
 * return: always NULL
 */
static void *
rxBLOB(sched_task_t *task)
{
rpc_cli_t *c = TASK_ARG(task);
rpc_srv_t *s = c->cli_parent;
rpc_blob_t *b;
struct tagBLOBHdr blob;
int rlen;

memset(&blob, 0, sizeof blob);
rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
/* error or nothing received */
if (rlen < 1) {
/* close blob connection */
schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
return NULL;
}

/* check BLOB packet */
if (rlen < sizeof(struct tagBLOBHdr)) {
rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");

/* short header is dropped without reply, wait for next one */
schedReadSelf(task);
return NULL;
}

/* check RPC packet session info */
if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
blob.hdr_cmd = error;
goto end;
}

/* Go to proceed packet ... */
switch (blob.hdr_cmd) {
case get:
/* client asks for BLOB named by hdr_var */
if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
blob.hdr_cmd = no;
blob.hdr_ret = RPC_ERROR(-1);
break;
} else
blob.hdr_len = htonl(b->blob_len);

if (rpc_srv_blobMap(s, b) != -1) {
/* deliver BLOB variable to client */
blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
rpc_srv_blobUnmap(b);
} else {
blob.hdr_cmd = error;
blob.hdr_ret = RPC_ERROR(-1);
}
break;
case set:
/* NOTE(review): hdr_ret of request is handed over as 3rd argument - confirm its meaning */
if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
ntohl(blob.hdr_ret)))) {
/* set new BLOB variable for reply :) */
blob.hdr_var = htonl(b->blob_var);

/* receive BLOB from client */
blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
rpc_srv_blobUnmap(b);
} else {
blob.hdr_cmd = error;
blob.hdr_ret = RPC_ERROR(-1);
}
break;
case unset:
if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
blob.hdr_cmd = error;
blob.hdr_ret = RPC_ERROR(-1);
}
break;
default:
rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
blob.hdr_cmd = error;
blob.hdr_ret = RPC_ERROR(-1);
}

end:
/* updated header is stored into client's 1st buffer, txBLOB() sends it from there */
memcpy(AIT_ADDR(array(c->cli_buf, 0, ait_val_t*)), &blob, sizeof blob);
schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
schedReadSelf(task);
return NULL;
}
709:
710: static void *
711: flushBLOB(sched_task_t *task)
712: {
713: rpc_srv_t *srv = TASK_ARG(task);
714: rpc_blob_t *b, *tmp;
715:
716: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
717: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
718:
719: rpc_srv_blobFree(srv, b);
720: e_free(b);
721: }
722:
723: schedSignalSelf(task);
724: return NULL;
725: }
726:
727: static void *
728: acceptBLOBClients(sched_task_t *task)
729: {
730: rpc_srv_t *srv = TASK_ARG(task);
731: rpc_cli_t *c = NULL;
732: register int i;
733: socklen_t salen = sizeof(sockaddr_t);
734: #ifdef TCP_NOPUSH
735: int n = 1;
736: #endif
737:
738: /* check free slots for connect */
739: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
740: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
741: if (c) /* no more free slots! */
742: goto end;
743: c = e_malloc(sizeof(rpc_cli_t));
744: if (!c) {
745: LOGERR;
746: srv->srv_kill = srv->srv_blob.kill = 1;
747: return NULL;
748: } else {
749: memset(c, 0, sizeof(rpc_cli_t));
750: array_Set(srv->srv_blob.clients, i, c);
751: c->cli_id = i;
752: c->cli_parent = srv;
753: }
754:
755: /* init buffer(s) */
756: c->cli_buf = ait_allocVars(1);
757: if (!c->cli_buf) {
758: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
759: array_Del(srv->srv_blob.clients, i, 42);
760: goto end;
761: } else
762: AIT_SET_BUFSIZ(array(c->cli_buf, 0, ait_val_t*), 0, srv->srv_netbuf);
763:
764: /* accept client */
765: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
766: if (c->cli_sock == -1) {
767: LOGERR;
768: ait_freeVars(&c->cli_buf);
769: array_Del(srv->srv_blob.clients, i, 42);
770: goto end;
771: } else {
772: #ifdef TCP_NOPUSH
773: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
774: #endif
775: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
776: }
777:
778: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
779: end:
780: schedReadSelf(task);
781: return NULL;
782: }
783:
784: /* ------------------------------------------------------ */
785:
786: /*
787: * rpc_srv_initBLOBServer() - Init & create BLOB Server
788: *
789: * @srv = RPC server instance
790: * @Port = Port for bind server, if Port == 0 default port is selected
791: * @diskDir = Disk place for BLOB file objects
792: * return: -1 == error or 0 bind and created BLOB server instance
793: */
794: int
795: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
796: {
797: int n = 1;
798:
799: if (!srv || srv->srv_kill) {
800: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
801: return -1;
802: }
803:
804: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
805: if (access(diskDir, R_OK | W_OK) == -1) {
806: LOGERR;
807: return -1;
808: } else
809: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
810:
811: /* init blob list */
812: TAILQ_INIT(&srv->srv_blob.blobs);
813:
814: srv->srv_blob.server.cli_parent = srv;
815:
816: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
817: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
818: case AF_INET:
819: srv->srv_blob.server.cli_sa.sin.sin_port =
820: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
821: break;
822: case AF_INET6:
823: srv->srv_blob.server.cli_sa.sin6.sin6_port =
824: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
825: break;
826: case AF_LOCAL:
827: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
828: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
829: break;
830: default:
831: AIT_FREE_VAL(&srv->srv_blob.dir);
832: return -1;
833: }
834:
835: /* create BLOB server socket */
836: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
837: if (srv->srv_blob.server.cli_sock == -1) {
838: LOGERR;
839: AIT_FREE_VAL(&srv->srv_blob.dir);
840: return -1;
841: }
842: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
843: LOGERR;
844: close(srv->srv_blob.server.cli_sock);
845: AIT_FREE_VAL(&srv->srv_blob.dir);
846: return -1;
847: }
848: n = srv->srv_netbuf;
849: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
850: LOGERR;
851: close(srv->srv_blob.server.cli_sock);
852: AIT_FREE_VAL(&srv->srv_blob.dir);
853: return -1;
854: }
855: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
856: LOGERR;
857: close(srv->srv_blob.server.cli_sock);
858: AIT_FREE_VAL(&srv->srv_blob.dir);
859: return -1;
860: }
861: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
862: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
863: LOGERR;
864: close(srv->srv_blob.server.cli_sock);
865: AIT_FREE_VAL(&srv->srv_blob.dir);
866: return -1;
867: } else
868: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
869: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
870:
871:
872: /* allocate pool for concurent blob clients */
873: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
874: if (!srv->srv_blob.clients) {
875: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
876: close(srv->srv_blob.server.cli_sock);
877: AIT_FREE_VAL(&srv->srv_blob.dir);
878: return -1;
879: }
880:
881: /* init blob scheduler */
882: srv->srv_blob.root = schedBegin();
883: if (!srv->srv_blob.root) {
884: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
885: array_Destroy(&srv->srv_blob.clients);
886: close(srv->srv_blob.server.cli_sock);
887: AIT_FREE_VAL(&srv->srv_blob.dir);
888: return -1;
889: }
890:
891: return 0;
892: }
893:
894: /*
895: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
896: *
897: * @srv = RPC Server instance
898: * return: none
899: */
900: void
901: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
902: {
903: if (!srv)
904: return;
905:
906: srv->srv_blob.kill = 1;
907:
908: schedEnd(&srv->srv_blob.root);
909: }
910:
/*
 * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
 *
 * Starts listening on BLOB socket, registers flush signal & accept tasks and
 * runs BLOB scheduler until srv_blob.kill is raised. After that all BLOBs,
 * clients, server socket and BLOB directory value are released.
 *
 * @srv = RPC Server instance
 * return: -1 error (invalid or killed server, listen or accept task failure)
 *	or 0 after main loop ends
 */
int
rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
{
rpc_cli_t *c;
register int i;
rpc_blob_t *b, *tmp;
struct timespec ts = { RPC_SCHED_POLLING, 0 };

if (!srv || srv->srv_kill) {
rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
return -1;
}

/* backlog is size of BLOB clients pool */
if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
LOGERR;
return -1;
}

/* SIGFBLOB flushes all BLOBs, see flushBLOB() */
schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0);
if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
srv->srv_blob.server.cli_sock, NULL, 0)) {
rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
return -1;
}

schedPolling(srv->srv_blob.root, &ts, NULL);
/* main rpc loop */
schedRun(srv->srv_blob.root, &srv->srv_blob.kill);

/* detach blobs */
TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);

rpc_srv_blobFree(srv, b);
e_free(b);
}

/* close all clients connections & server socket */
for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
c = array(srv->srv_blob.clients, i, rpc_cli_t*);
if (c) {
shutdown(c->cli_sock, SHUT_RDWR);
close(c->cli_sock);

/* drop pending tasks of client before its buffers are freed */
schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
ait_freeVars(&c->cli_buf);
}
/* slot is cleared for empty entries too */
array_Del(srv->srv_blob.clients, i, 42);
}
array_Destroy(&srv->srv_blob.clients);

close(srv->srv_blob.server.cli_sock);

AIT_FREE_VAL(&srv->srv_blob.dir);
return 0;
}
973:
974:
975: /*
976: * rpc_srv_initServer() - Init & create RPC Server
977: *
978: * @InstID = Instance for authentication & recognition
979: * @concurentClients = Concurent clients at same time to this server
980: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
981: * @csHost = Host name or address for bind server, if NULL any address
982: * @Port = Port for bind server, if Port == 0 default port is selected
983: * @proto = Protocol, if == 0 choose SOCK_STREAM
984: * return: NULL == error or !=NULL bind and created RPC server instance
985: */
986: rpc_srv_t *
987: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
988: const char *csHost, u_short Port, int proto)
989: {
990: int n = 1;
991: rpc_srv_t *srv = NULL;
992: sockaddr_t sa = E_SOCKADDR_INIT;
993:
994: if (!concurentClients || (proto < 0 || proto > SOCK_DGRAM)) {
995: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
996: return NULL;
997: }
998: if (!e_gethostbyname(csHost, Port, &sa))
999: return NULL;
1000: if (!Port)
1001: Port = RPC_DEFPORT;
1002: if (!proto)
1003: proto = SOCK_STREAM;
1004: if (netBuf < RPC_MIN_BUFSIZ)
1005: netBuf = BUFSIZ;
1006: else
1007: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1008:
1009: #ifdef HAVE_SRANDOMDEV
1010: srandomdev();
1011: #else
1012: time_t tim;
1013:
1014: srandom((time(&tim) ^ getpid()));
1015: #endif
1016:
1017: srv = e_malloc(sizeof(rpc_srv_t));
1018: if (!srv) {
1019: LOGERR;
1020: return NULL;
1021: } else
1022: memset(srv, 0, sizeof(rpc_srv_t));
1023:
1024: srv->srv_proto = proto;
1025: srv->srv_netbuf = netBuf;
1026: srv->srv_session.sess_version = RPC_VERSION;
1027: srv->srv_session.sess_instance = InstID;
1028:
1029: srv->srv_server.cli_parent = srv;
1030: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1031:
1032: /* init functions */
1033: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1034: SLIST_INIT(&srv->srv_funcs);
1035: AVL_INIT(&srv->srv_funcs);
1036:
1037: /* init scheduler */
1038: srv->srv_root = schedBegin();
1039: if (!srv->srv_root) {
1040: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1041: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1042: e_free(srv);
1043: return NULL;
1044: }
1045:
1046: /* init pool for clients */
1047: srv->srv_clients = array_Init(concurentClients);
1048: if (!srv->srv_clients) {
1049: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1050: schedEnd(&srv->srv_root);
1051: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1052: e_free(srv);
1053: return NULL;
1054: }
1055:
1056: /* create server socket */
1057: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
1058: if (srv->srv_server.cli_sock == -1) {
1059: LOGERR;
1060: array_Destroy(&srv->srv_clients);
1061: schedEnd(&srv->srv_root);
1062: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1063: e_free(srv);
1064: return NULL;
1065: }
1066: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1067: LOGERR;
1068: goto err;
1069: }
1070: n = srv->srv_netbuf;
1071: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1072: LOGERR;
1073: goto err;
1074: }
1075: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1076: LOGERR;
1077: goto err;
1078: }
1079: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
1080: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1081: LOGERR;
1082: goto err;
1083: } else
1084: fcntl(srv->srv_server.cli_sock, F_SETFL,
1085: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1086:
1087: rpc_register_srvPing(srv);
1088:
1089: return srv;
1090: err: /* error condition */
1091: close(srv->srv_server.cli_sock);
1092: array_Destroy(&srv->srv_clients);
1093: schedEnd(&srv->srv_root);
1094: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1095: e_free(srv);
1096: return NULL;
1097: }
1098:
1099: /*
1100: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1101: *
1102: * @psrv = RPC Server instance
1103: * return: none
1104: */
1105: void
1106: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1107: {
1108: if (!psrv || !*psrv)
1109: return;
1110:
1111: /* if send kill to blob server */
1112: rpc_srv_endBLOBServer(*psrv);
1113:
1114: (*psrv)->srv_kill = 1;
1115: sleep(RPC_SCHED_POLLING);
1116:
1117: schedEnd(&(*psrv)->srv_root);
1118:
1119: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1120: e_free(*psrv);
1121: *psrv = NULL;
1122: }
1123:
1124: /*
1125: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1126: *
1127: * @srv = RPC Server instance
1128: * return: -1 error or 0 ok, infinite loop ...
1129: */
1130: int
1131: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1132: {
1133: rpc_cli_t *c;
1134: register int i;
1135: rpc_func_t *f;
1136: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1137:
1138: if (!srv) {
1139: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1140: return -1;
1141: }
1142:
1143: if (srv->srv_proto == SOCK_STREAM)
1144: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1145: LOGERR;
1146: return -1;
1147: }
1148:
1149: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1150: srv->srv_server.cli_sock, NULL, 0)) {
1151: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1152: return -1;
1153: }
1154:
1155: schedPolling(srv->srv_root, &ts, NULL);
1156: /* main rpc loop */
1157: schedRun(srv->srv_root, &srv->srv_kill);
1158:
1159: /* close all clients connections & server socket */
1160: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1161: c = array(srv->srv_clients, i, rpc_cli_t*);
1162: if (c) {
1163: shutdown(c->cli_sock, SHUT_RDWR);
1164: close(c->cli_sock);
1165:
1166: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1167: ait_freeVars(&RPC_RETVARS(c));
1168: ait_freeVars(&c->cli_buf);
1169: }
1170: array_Del(srv->srv_clients, i, 42);
1171: }
1172: array_Destroy(&srv->srv_clients);
1173:
1174: close(srv->srv_server.cli_sock);
1175:
1176: /* detach exported calls */
1177: RPC_FUNCS_LOCK(&srv->srv_funcs);
1178: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1179: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1180:
1181: AIT_FREE_VAL(&f->func_name);
1182: e_free(f);
1183: }
1184: srv->srv_funcs.avlh_root = NULL;
1185: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1186:
1187: return 0;
1188: }
1189:
1190:
1191: /*
1192: * rpc_srv_execCall() Execute registered call from RPC server
1193: *
1194: * @cli = RPC client
1195: * @rpc = IN RPC call structure
1196: * @funcname = Execute RPC function
1197: * @args = IN RPC calling arguments from RPC client
1198: * return: -1 error, !=-1 ok
1199: */
1200: int
1201: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1202: ait_val_t funcname, array_t * __restrict args)
1203: {
1204: rpc_callback_t func;
1205:
1206: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1207: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1208: return -1;
1209: }
1210:
1211: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1212: return func(cli, rpc, args);
1213: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>