1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: srv.c,v 1.8.2.8 2012/05/14 08:36:58 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: static void *rxPacket(sched_task_t*);
50: static void *rxBLOB(sched_task_t*);
51:
52: static void *
53: txPacket(sched_task_t *task)
54: {
55: rpc_cli_t *c = TASK_ARG(task);
56: rpc_srv_t *s = c->cli_parent;
57: rpc_func_t *f = NULL;
58: u_char *buf = TASK_DATA(task);
59: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
60: int ret, wlen = sizeof(struct tagRPCCall);
61: array_t *arr = NULL;
62:
63: if (rpc->call_argc) {
64: f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash));
65: if (!f) {
66: rpc->call_argc ^= rpc->call_argc;
67: rpc->call_rep.ret = RPC_ERROR(-1);
68: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
69: } else {
70: rpc->call_argc = htons(rpc_srv_getVars(f, &arr));
71: /* Go Encapsulate variables */
72: ret = io_vars2buffer(buf + wlen, TASK_DATLEN(task) - wlen, arr);
73: io_clrVars(f->func_vars);
74: if (ret == -1) {
75: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
76: rpc->call_argc ^= rpc->call_argc;
77: rpc->call_rep.ret = RPC_ERROR(-1);
78: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
79: } else
80: wlen += ret;
81: }
82: }
83:
84: rpc->call_len = htons(wlen);
85:
86: /* calculate CRC */
87: rpc->call_crc ^= rpc->call_crc;
88: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
89:
90: /* send reply */
91: ret = send(TASK_FD(task), buf, wlen, 0);
92: if (ret == -1)
93: LOGERR;
94: else if (ret != wlen)
95: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
96: "really sended %d bytes", wlen, ret);
97:
98: return NULL;
99: }
100:
101: static void *
102: execCall(sched_task_t *task)
103: {
104: rpc_cli_t *c = TASK_ARG(task);
105: rpc_srv_t *s = c->cli_parent;
106: rpc_func_t *f = NULL;
107: array_t *arr = NULL;
108: u_char *buf = TASK_DATA(task) + TASK_VAL(task);
109: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
110: int argc = ntohs(rpc->call_argc);
111:
112: /* Go decapsulate variables ... */
113: if (argc) {
114: arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),
115: TASK_DATLEN(task) - TASK_VAL(task) - sizeof(struct tagRPCCall),
116: argc, 1);
117: if (!arr) {
118: rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
119: rpc->call_argc ^= rpc->call_argc;
120: rpc->call_rep.ret = RPC_ERROR(-1);
121: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
122: return NULL;
123: }
124: }
125:
126: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
127: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
128: rpc->call_argc ^= rpc->call_argc;
129: rpc->call_rep.ret = RPC_ERROR(-1);
130: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
131: } else {
132: /* if client doesn't want reply */
133: argc = rpc->call_req.flags & RPC_NOREPLY;
134: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
135: if (rpc->call_rep.ret == htonl(-1)) {
136: rpc->call_rep.eno = RPC_ERROR(errno);
137: rpc->call_argc ^= rpc->call_argc;
138: } else {
139: rpc->call_rep.eno ^= rpc->call_rep.eno;
140: if (argc) {
141: /* without reply */
142: io_clrVars(f->func_vars);
143: rpc->call_argc ^= rpc->call_argc;
144: } else
145: rpc->call_argc = htons(rpc_srv_getVars(f, NULL));
146: }
147: }
148:
149: if (arr)
150: io_arrayDestroy(&arr);
151: return NULL;
152: }
153:
154: static void *
155: rxPacket(sched_task_t *task)
156: {
157: rpc_cli_t *c = TASK_ARG(task);
158: rpc_srv_t *s = c->cli_parent;
159: u_char *buf = TASK_DATA(task);
160: int rlen, noreply;
161: u_short crc, off = 0;
162: struct tagRPCCall *rpc;
163: struct timespec ts;
164:
165: memset(buf, 0, TASK_DATLEN(task));
166: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
167: if (rlen == -1) {
168: LOGERR;
169: c->cli_kill = kill;
170: return NULL;
171: } else if (!rlen) { /* receive EOF */
172: c->cli_kill = kill;
173: return NULL;
174: }
175:
176: do {
177: if (rlen < sizeof(struct tagRPCCall)) {
178: rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");
179:
180: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
181: TASK_DATA(task), TASK_DATLEN(task));
182: return NULL;
183: } else
184: rpc = (struct tagRPCCall*) (buf + off);
185:
186: rlen -= ntohs(rpc->call_len);
187:
188: /* check integrity of packet */
189: crc = ntohs(rpc->call_crc);
190: rpc->call_crc ^= rpc->call_crc;
191: if (crc != crcFletcher16((u_short*) (buf + off), ntohs(rpc->call_len) / 2)) {
192: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
193:
194: off += ntohs(rpc->call_len);
195: if (rlen < 1)
196: break;
197: else
198: continue;
199: }
200:
201: noreply = rpc->call_req.flags & RPC_NOREPLY;
202:
203: /* check RPC packet session info */
204: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
205: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
206: rpc->call_argc ^= rpc->call_argc;
207: rpc->call_rep.ret = RPC_ERROR(-1);
208: rpc->call_rep.eno = RPC_ERROR(errno);
209: } else {
210: /* change socket timeout from last packet */
211: ts.tv_sec = rpc->call_session.sess_timeout;
212: ts.tv_nsec = 0;
213: schedPolling(TASK_ROOT(task), &ts, NULL);
214:
215: /* execute RPC call */
216: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off,
217: TASK_DATA(task), TASK_DATLEN(task));
218: }
219:
220: /* send RPC reply */
221: if (!noreply)
222: schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task),
223: buf + off, TASK_DATLEN(task) - off);
224:
225: off += ntohs(rpc->call_len);
226: } while (rlen > 0);
227:
228: /* lets get next packet */
229: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
230: TASK_DATA(task), TASK_DATLEN(task));
231: return NULL;
232: }
233:
234: static void *
235: rpc_srv_dispatchCall(void *arg)
236: {
237: rpc_cli_t *c = arg;
238: rpc_srv_t *s;
239: u_char *buf;
240: sched_root_task_t *root;
241: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
242:
243: if (!arg) {
244: rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client");
245: return NULL;
246: } else
247: s = c->cli_parent;
248:
249: /* allocate net buffer */
250: buf = malloc(s->srv_netbuf);
251: if (!buf) {
252: LOGERR;
253: return NULL;
254: }
255:
256: root = schedBegin();
257: if (!root) {
258: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
259: free(buf);
260: return NULL;
261: } else {
262: schedTermCondition(root, kill);
263: schedPolling(root, &ts, NULL);
264: }
265:
266: schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf);
267:
268: schedRun(root, (void*) &c->cli_kill);
269: schedEnd(&root);
270:
271: shutdown(c->cli_sock, SHUT_RDWR);
272: close(c->cli_sock);
273: memset(c, 0, sizeof(rpc_cli_t));
274: free(buf);
275: return NULL;
276: }
277:
278:
279: static void *
280: txBLOB(sched_task_t *task)
281: {
282: u_char *buf = TASK_DATA(task);
283: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
284: int wlen = sizeof(struct tagBLOBHdr);
285:
286: /* calculate CRC */
287: blob->hdr_crc ^= blob->hdr_crc;
288: blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
289:
290: /* send reply */
291: wlen = send(TASK_FD(task), buf, wlen, 0);
292: if (wlen == -1)
293: LOGERR;
294: else if (wlen != sizeof(struct tagBLOBHdr))
295: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
296: "really sended %d bytes", sizeof(struct tagBLOBHdr), wlen);
297:
298: return NULL;
299: }
300:
301: static void *
302: rxBLOB(sched_task_t *task)
303: {
304: rpc_cli_t *c = TASK_ARG(task);
305: rpc_srv_t *s = c->cli_parent;
306: rpc_blob_t *b;
307: u_char *buf = TASK_DATA(task);
308: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
309: int rlen;
310: u_short crc;
311: struct timespec ts;
312:
313: /* check for disable service at this moment? */
314: if (!s || s->srv_blob.state == disable) {
315: usleep(100000);
316: #ifdef HAVE_PTHREAD_YIELD
317: pthread_yield();
318: #endif
319: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
320: TASK_DATA(task), TASK_DATLEN(task));
321: return NULL;
322: }
323:
324: memset(buf, 0, TASK_DATLEN(task));
325: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
326: if (rlen == -1) {
327: LOGERR;
328: c->cli_kill = kill;
329: return NULL;
330: } else if (!rlen) { /* receive EOF */
331: c->cli_kill = kill;
332: return NULL;
333: }
334:
335: if (rlen < sizeof(struct tagBLOBHdr)) {
336: rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet");
337: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
338: TASK_DATA(task), TASK_DATLEN(task));
339: return NULL;
340: }
341:
342: /* check integrity of packet */
343: crc = ntohs(blob->hdr_crc);
344: blob->hdr_crc ^= blob->hdr_crc;
345: if (crc != crcFletcher16((u_short*) buf, rlen / 2)) {
346: rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
347: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
348: TASK_DATA(task), TASK_DATLEN(task));
349: return NULL;
350: }
351:
352: /* check RPC packet session info */
353: if ((crc = rpc_chkPktSession(&blob->hdr_session, &s->srv_session))) {
354: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
355: blob->hdr_cmd = error;
356: goto end;
357: } else {
358: /* change socket timeout from last packet */
359: ts.tv_sec = blob->hdr_session.sess_timeout;
360: ts.tv_nsec = 0;
361: schedPolling(TASK_ROOT(task), &ts, NULL);
362: }
363:
364: /* Go to proceed packet ... */
365: switch (blob->hdr_cmd) {
366: case get:
367: if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) {
368: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var));
369: blob->hdr_cmd = no;
370: blob->hdr_ret = RPC_ERROR(-1);
371: break;
372: } else
373: blob->hdr_len = htonl(b->blob_len);
374:
375: if (rpc_srv_blobMap(s, b) != -1) {
376: /* deliver BLOB variable to client */
377: blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
378: rpc_srv_blobUnmap(b);
379: } else {
380: blob->hdr_cmd = error;
381: blob->hdr_ret = RPC_ERROR(-1);
382: }
383: break;
384: case set:
385: if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) {
386: /* set new BLOB variable for reply :) */
387: blob->hdr_var = htonl(b->blob_var);
388:
389: /* receive BLOB from client */
390: blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
391: rpc_srv_blobUnmap(b);
392: } else {
393: blob->hdr_cmd = error;
394: blob->hdr_ret = RPC_ERROR(-1);
395: }
396: break;
397: case unset:
398: if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) {
399: blob->hdr_cmd = error;
400: blob->hdr_ret = RPC_ERROR(-1);
401: }
402: break;
403: default:
404: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd);
405: blob->hdr_cmd = error;
406: blob->hdr_ret = RPC_ERROR(-1);
407: }
408:
409: end:
410: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task),
411: TASK_DATA(task), TASK_DATLEN(task));
412: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
413: TASK_DATA(task), TASK_DATLEN(task));
414: return NULL;
415: }
416:
417: static void *
418: rpc_srv_dispatchVars(void *arg)
419: {
420: rpc_cli_t *c = arg;
421: rpc_srv_t *s;
422: sched_root_task_t *root;
423: u_char *buf;
424: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
425:
426: if (!arg) {
427: rpc_SetErr(EINVAL, "Invalid parameter can`t procced BLOB");
428: return NULL;
429: } else
430: s = c->cli_parent;
431:
432: /* allocate net buffer */
433: buf = malloc(sizeof(struct tagBLOBHdr));
434: if (!buf) {
435: LOGERR;
436: return NULL;
437: }
438:
439: root = schedBegin();
440: if (!root) {
441: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
442: free(buf);
443: return NULL;
444: } else {
445: schedTermCondition(root, kill);
446: schedPolling(root, &ts, NULL);
447: }
448:
449: schedRead(root, rxBLOB, c, c->cli_sock, buf, sizeof(struct tagBLOBHdr));
450:
451: schedRun(root, (void*) &c->cli_kill);
452: schedEnd(&root);
453:
454: shutdown(c->cli_sock, SHUT_RDWR);
455: close(c->cli_sock);
456: memset(c, 0, sizeof(rpc_cli_t));
457: free(buf);
458: return NULL;
459: }
460:
461: // -------------------------------------------------
462:
463: /*
464: * rpc_srv_initBLOBServer() - Init & create BLOB Server
465: *
466: * @srv = RPC server instance
467: * @Port = Port for bind server, if Port == 0 default port is selected
468: * @diskDir = Disk place for BLOB file objects
469: * return: -1 == error or 0 bind and created BLOB server instance
470: */
471: int
472: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
473: {
474: int n = 1;
475: io_sockaddr_t sa;
476:
477: if (!srv) {
478: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
479: return -1;
480: }
481: if (srv->srv_blob.state) {
482: rpc_SetErr(EPERM, "Already started BLOB server!");
483: return 0;
484: }
485:
486: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
487: if (access(diskDir, R_OK | W_OK) == -1) {
488: LOGERR;
489: return -1;
490: } else
491: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
492:
493: srv->srv_blob.server.cli_tid = pthread_self();
494: srv->srv_blob.server.cli_parent = srv;
495:
496: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
497: switch (sa.sa.sa_family) {
498: case AF_INET:
499: sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);
500: break;
501: case AF_INET6:
502: sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);
503: break;
504: case AF_LOCAL:
505: strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
506: break;
507: default:
508: AIT_FREE_VAL(&srv->srv_blob.dir);
509: return -1;
510: }
511: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
512:
513: /* create BLOB server socket */
514: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
515: if (srv->srv_blob.server.cli_sock == -1) {
516: LOGERR;
517: AIT_FREE_VAL(&srv->srv_blob.dir);
518: return -1;
519: }
520: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
521: LOGERR;
522: close(srv->srv_blob.server.cli_sock);
523: AIT_FREE_VAL(&srv->srv_blob.dir);
524: return -1;
525: }
526: n = srv->srv_netbuf;
527: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
528: LOGERR;
529: close(srv->srv_blob.server.cli_sock);
530: AIT_FREE_VAL(&srv->srv_blob.dir);
531: return -1;
532: }
533: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
534: LOGERR;
535: close(srv->srv_blob.server.cli_sock);
536: AIT_FREE_VAL(&srv->srv_blob.dir);
537: return -1;
538: }
539: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
540: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
541: LOGERR;
542: close(srv->srv_blob.server.cli_sock);
543: AIT_FREE_VAL(&srv->srv_blob.dir);
544: return -1;
545: }
546:
547: /* allocate pool for concurent clients */
548: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
549: if (!srv->srv_blob.clients) {
550: LOGERR;
551: close(srv->srv_blob.server.cli_sock);
552: AIT_FREE_VAL(&srv->srv_blob.dir);
553: return -1;
554: } else
555: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
556:
557: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
558: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
559: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
560: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
561:
562: srv->srv_blob.state = enable; /* enable BLOB */
563: return 0;
564: }
565:
566: /*
567: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
568: *
569: * @srv = RPC Server instance
570: * return: none
571: */
572: void
573: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
574: {
575: rpc_cli_t *c;
576: register int i;
577: rpc_blob_t *f;
578:
579: if (!srv || srv->srv_blob.state == disable)
580: return;
581: else
582: srv->srv_blob.state = kill;
583:
584: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
585: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
586: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
587: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
588:
589: /* close all clients connections & server socket */
590: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
591: if (c->cli_sa.sa.sa_family)
592: shutdown(c->cli_sock, SHUT_RDWR);
593: close(srv->srv_blob.server.cli_sock);
594:
595: if (srv->srv_blob.clients) {
596: free(srv->srv_blob.clients);
597: srv->srv_blob.clients = NULL;
598: }
599:
600: /* detach blobs */
601: while ((f = srv->srv_blob.blobs)) {
602: srv->srv_blob.blobs = f->blob_next;
603: rpc_srv_blobFree(srv, f);
604: free(f);
605: }
606:
607: AIT_FREE_VAL(&srv->srv_blob.dir);
608:
609: /* at final, save disable BLOB service state */
610: srv->srv_blob.state = disable;
611: }
612:
613: /*
614: * rpc_srv_loopBLOB() - Execute Main BLOB server loop and wait for clients requests
615: *
616: * @srv = RPC Server instance
617: * return: -1 error or 0 ok, infinite loop ...
618: */
619: int
620: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
621: {
622: socklen_t salen = sizeof(io_sockaddr_t);
623: register int i;
624: rpc_cli_t *c;
625: int ret;
626: pthread_attr_t attr;
627: struct pollfd pfd;
628:
629: if (!srv || srv->srv_blob.state == kill) {
630: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
631: return -1;
632: }
633:
634: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
635: LOGERR;
636: return -1;
637: } else
638: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
639: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
640:
641: pthread_attr_init(&attr);
642: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
643:
644: pfd.fd = srv->srv_blob.server.cli_sock;
645: pfd.events = POLLIN | POLLPRI;
646: /* main BLOB loop */
647: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
648: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
649: if (!c->cli_sa.sa.sa_family)
650: break;
651: if (i >= srv->srv_numcli) {
652: #ifdef HAVE_PTHREAD_YIELD
653: pthread_yield();
654: #else
655: usleep(1000000);
656: #endif
657: continue;
658: }
659:
660: if ((ret = poll(&pfd, 1, srv->srv_session.sess_timeout * 1000)) == -1 ||
661: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
662: LOGERR;
663: ret = 1;
664: break;
665: } else if (!ret)
666: continue;
667:
668: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
669: if (c->cli_sock == -1) {
670: LOGERR;
671: continue;
672: } else
673: c->cli_parent = srv;
674:
675: /* spawn dispatch thread for BLOB client */
676: c->cli_kill = enable;
677: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchVars, c)) {
678: LOGERR;
679: continue;
680: }
681: }
682:
683: srv->srv_blob.state = kill;
684:
685: pthread_attr_destroy(&attr);
686: return 0;
687: }
688:
689:
690: /*
691: * rpc_srv_initServer() - Init & create RPC Server
692: *
693: * @regProgID = ProgramID for authentication & recognition
694: * @regProcID = ProcessID for authentication & recognition
695: * @concurentClients = Concurent clients at same time to this server
696: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
697: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
698: * @csHost = Host name or address for bind server, if NULL any address
699: * @Port = Port for bind server, if Port == 0 default port is selected
700: * return: NULL == error or !=NULL bind and created RPC server instance
701: */
702: rpc_srv_t *
703: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
704: int netBuf, u_short family, const char *csHost, u_short Port)
705: {
706: rpc_srv_t *srv = NULL;
707: int n = 1;
708: struct hostent *host = NULL;
709: io_sockaddr_t sa;
710:
711: if (!concurentClients || !regProgID ||
712: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
713: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
714: return NULL;
715: }
716: if (!Port)
717: Port = RPC_DEFPORT;
718: if (!netBuf)
719: netBuf = BUFSIZ;
720: else
721: netBuf = io_align(netBuf, 1); /* align netBuf length */
722: if (csHost && family != AF_LOCAL) {
723: host = gethostbyname2(csHost, family);
724: if (!host) {
725: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
726: return NULL;
727: }
728: }
729: memset(&sa, 0, sizeof sa);
730: sa.sa.sa_family = family;
731: switch (family) {
732: case AF_INET:
733: sa.sin.sin_len = sizeof(struct sockaddr_in);
734: sa.sin.sin_port = htons(Port);
735: if (csHost)
736: memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length);
737: break;
738: case AF_INET6:
739: sa.sin6.sin6_len = sizeof(struct sockaddr_in6);
740: sa.sin6.sin6_port = htons(Port);
741: if (csHost)
742: memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length);
743: break;
744: case AF_LOCAL:
745: sa.sun.sun_len = sizeof(struct sockaddr_un);
746: if (csHost)
747: strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path);
748: unlink(sa.sun.sun_path);
749: break;
750: default:
751: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
752: return NULL;
753: }
754:
755: srv = malloc(sizeof(rpc_srv_t));
756: if (!srv) {
757: LOGERR;
758: return NULL;
759: } else
760: memset(srv, 0, sizeof(rpc_srv_t));
761:
762: srv->srv_netbuf = netBuf;
763: srv->srv_numcli = concurentClients;
764: srv->srv_session.sess_version = RPC_VERSION;
765: srv->srv_session.sess_timeout = DEF_RPC_TIMEOUT;
766: srv->srv_session.sess_program = regProgID;
767: srv->srv_session.sess_process = regProcID;
768:
769: srv->srv_server.cli_tid = pthread_self();
770: srv->srv_server.cli_parent = srv;
771: memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
772:
773: /* create server socket */
774: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
775: if (srv->srv_server.cli_sock == -1) {
776: LOGERR;
777: free(srv);
778: return NULL;
779: }
780: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
781: LOGERR;
782: close(srv->srv_server.cli_sock);
783: free(srv);
784: return NULL;
785: }
786: n = srv->srv_netbuf;
787: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
788: LOGERR;
789: close(srv->srv_server.cli_sock);
790: free(srv);
791: return NULL;
792: }
793: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
794: LOGERR;
795: close(srv->srv_server.cli_sock);
796: free(srv);
797: return NULL;
798: }
799: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
800: srv->srv_server.cli_sa.sa.sa_len) == -1) {
801: LOGERR;
802: close(srv->srv_server.cli_sock);
803: free(srv);
804: return NULL;
805: }
806:
807: /* allocate pool for concurent clients */
808: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
809: if (!srv->srv_clients) {
810: LOGERR;
811: close(srv->srv_server.cli_sock);
812: free(srv);
813: return NULL;
814: } else
815: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
816:
817: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
818: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
819: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
820: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
821:
822: return srv;
823: }
824:
825: /*
826: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
827: *
828: * @psrv = RPC Server instance
829: * return: none
830: */
831: void
832: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
833: {
834: rpc_cli_t *c;
835: register int i;
836: rpc_func_t *f;
837:
838: if (!psrv || !*psrv) {
839: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
840: return;
841: }
842:
843: rpc_srv_endBLOBServer(*psrv);
844:
845: /* close all clients connections & server socket */
846: for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
847: if (c->cli_sa.sa.sa_family) {
848: shutdown(c->cli_sock, SHUT_RDWR);
849: close(c->cli_sock);
850: }
851: close((*psrv)->srv_server.cli_sock);
852:
853: if ((*psrv)->srv_clients) {
854: free((*psrv)->srv_clients);
855: (*psrv)->srv_clients = NULL;
856: (*psrv)->srv_numcli = 0;
857: }
858:
859: /* detach exported calls */
860: while ((f = (*psrv)->srv_funcs)) {
861: (*psrv)->srv_funcs = f->func_next;
862: io_freeVars(&f->func_vars);
863: AIT_FREE_VAL(&f->func_name);
864: AIT_FREE_VAL(&f->func_file);
865: free(f);
866: }
867:
868: free(*psrv);
869: *psrv = NULL;
870: }
871:
872: /*
873: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
874: *
875: * @srv = RPC Server instance
876: * return: -1 error or 0 ok, infinite loop ...
877: */
878: int
879: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
880: {
881: socklen_t salen = sizeof(io_sockaddr_t);
882: register int i;
883: rpc_cli_t *c;
884: int ret;
885: pthread_attr_t attr;
886: struct pollfd pfd;
887:
888: if (!srv) {
889: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
890: return -1;
891: }
892:
893: /* activate BLOB server worker if srv->srv_blob.state == enable */
894: rpc_srv_execBLOBServer(srv);
895:
896: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
897: LOGERR;
898: return -1;
899: } else
900: fcntl(srv->srv_server.cli_sock, F_SETFL, fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
901:
902: pthread_attr_init(&attr);
903: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
904:
905: pfd.fd = srv->srv_server.cli_sock;
906: pfd.events = POLLIN | POLLPRI;
907: /* main rpc loop */
908: while (srv->srv_kill != kill) {
909: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
910: if (!c->cli_sa.sa.sa_family)
911: break;
912: if (i >= srv->srv_numcli) {
913: #ifdef HAVE_PTHREAD_YIELD
914: pthread_yield();
915: #else
916: usleep(1000000);
917: #endif
918: continue;
919: }
920:
921: if ((ret = poll(&pfd, 1, srv->srv_session.sess_timeout * 1000)) == -1 ||
922: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
923: LOGERR;
924: ret = 1;
925: break;
926: } else if (!ret)
927: continue;
928:
929: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
930: if (c->cli_sock == -1) {
931: LOGERR;
932: continue;
933: } else
934: c->cli_parent = srv;
935:
936: /* spawn rpc client dispatcher */
937: c->cli_kill = enable;
938: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchCall, c)) {
939: LOGERR;
940: continue;
941: }
942: }
943:
944: pthread_attr_destroy(&attr);
945: return 0;
946: }
947:
948: // ---------------------------------------------------------
949:
950: /*
951: * rpc_srv_execCall() Execute registered call from RPC server
952: *
953: * @call = Register RPC call
954: * @rpc = IN RPC call structure
955: * @args = IN RPC calling arguments from RPC client
956: * return: -1 error, !=-1 ok
957: */
958: int
959: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
960: array_t * __restrict args)
961: {
962: void *dl;
963: rpc_callback_t func;
964: int ret;
965:
966: if (!call || !rpc || !call->func_parent) {
967: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
968: return -1;
969: }
970:
971: dl = dlopen(AIT_ADDR(&call->func_file), RTLD_NOW);
972: if (!dl) {
973: rpc_SetErr(ENOENT, "Can`t attach module %s!", dlerror());
974: return -1;
975: }
976:
977: func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name));
978: if (func)
979: ret = func(call, ntohs(rpc->call_argc), args);
980: else {
981: rpc_SetErr(ENOEXEC, "Can`t find function %s!", dlerror());
982: ret = -1;
983: }
984:
985: dlclose(dl);
986: return ret;
987: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>