1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: srv.c,v 1.5.2.7 2011/10/31 14:58:33 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: static void *
50: rpc_srv_dispatchCall(void *arg)
51: {
52: rpc_cli_t *c = arg;
53: rpc_srv_t *s;
54: rpc_func_t *f = NULL;
55: array_t *arr;
56: struct tagRPCCall *rpc;
57: struct tagRPCRet *rrpc;
58: rpc_sess_t ses = { 0 };
59: fd_set fds;
60: u_char *buf;
61: int ret, argc = 0, Limit = 0;
62: register int i;
63: uint16_t tag = 0;
64: uint32_t hash = 0;
65:
66: if (!arg) {
67: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");
68: return NULL;
69: } else
70: s = c->cli_parent;
71:
72: buf = malloc(s->srv_netbuf);
73: if (!buf) {
74: LOGERR;
75: return NULL;
76: }
77:
78: do {
79: FD_ZERO(&fds);
80: FD_SET(c->cli_sock, &fds);
81: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
82: if (ret == -1) {
83: if (errno == EINTR && s->srv_kill != kill)
84: continue;
85:
86: LOGERR;
87: ret = -2;
88: break;
89: }
90: memset(buf, 0, s->srv_netbuf);
91: ret = recv(c->cli_sock, buf, s->srv_netbuf, 0);
92: if (ret == -1) {
93: LOGERR;
94: ret = -3;
95: break;
96: }
97: if (!ret) { /* receive EOF */
98: ret = 0;
99: break;
100: }
101: if (ret < sizeof(struct tagRPCCall)) {
102: rpc_SetErr(ERPCMISMATCH, "Error:: too short RPC packet ...\n");
103: ret = -4;
104: break;
105: } else
106: rpc = (struct tagRPCCall*) buf;
107: /* check RPC packet session info */
108: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
109: rpc_SetErr(ERPCMISMATCH, "Error:: get invalid RPC session ...\n");
110: ret = -5;
111: goto makeReply;
112: } else
113: Limit = sizeof(struct tagRPCCall);
114:
115: tag = rpc->call_tag;
116: hash = rpc->call_hash;
117:
118: /* RPC is OK! Go decapsulate variables ... */
119: if (ntohs(rpc->call_argc)) {
120: arr = io_buffer2vals(buf + Limit, s->srv_netbuf - Limit,
121: ntohs(rpc->call_argc), 1);
122: if (!arr) {
123: ret = -5;
124: goto makeReply;
125: }
126: } else
127: arr = NULL;
128:
129: /* execute call */
130: argc = 0;
131: memcpy(&ses, &rpc->call_session, sizeof ses);
132: if (!(f = rpc_srv_getCall(s, ntohs(tag), ntohl(hash)))) {
133: rpc_SetErr(EPROGUNAVAIL, "Error:: call not found into RPC server ...\n");
134: ret = -6;
135: } else
136: if ((ret = rpc_srv_execCall(f, rpc, arr)) == -1)
137: ret = -9;
138: else {
139: if (arr)
140: io_arrayDestroy(&arr);
141: argc = rpc_srv_getVars(f, &arr);
142: goto makeReply; /* Call finish OK */
143: }
144:
145: if (arr)
146: io_arrayDestroy(&arr);
147:
148: makeReply:
149: /* Made reply */
150: memset(buf, 0, s->srv_netbuf);
151: rrpc = (struct tagRPCRet*) buf;
152: Limit = sizeof(struct tagRPCRet);
153:
154: memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));
155: rrpc->ret_tag = tag;
156: rrpc->ret_hash = hash;
157: rrpc->ret_errno = htonl(rpc_Errno);
158: rrpc->ret_retcode = htonl(ret);
159: rrpc->ret_argc = htons(argc);
160:
161: if (argc && arr) {
162: /* Go Encapsulate variables ... */
163: if ((i = io_vals2buffer(buf + Limit, s->srv_netbuf - Limit, arr)) == -1) {
164: rpc_srv_freeVals(f);
165: argc = 0;
166: ret = -7;
167: rpc_SetErr(EBADRPC, "Error:: in prepare RPC packet values (-7) ...\n");
168: goto makeReply;
169: } else {
170: Limit += i;
171:
172: rpc_srv_freeVals(f);
173: }
174: }
175:
176: ret = send(c->cli_sock, buf, Limit, 0);
177: if (ret == -1) {
178: LOGERR;
179: ret = -8;
180: break;
181: }
182: if (ret != Limit) {
183: rpc_SetErr(EPROCUNAVAIL, "Error:: in send RPC request, should be send %d bytes, "
184: "really is %d\n", Limit, ret);
185: ret = -9;
186: break;
187: }
188: } while (ret > -1 || s->srv_kill != kill);
189:
190: shutdown(c->cli_sock, SHUT_RDWR);
191: close(c->cli_sock);
192: memset(c, 0, sizeof(rpc_cli_t));
193: free(buf);
194: return (void*) (long)ret;
195: }
196:
197:
198: static void *
199: rpc_srv_dispatchVars(void *arg)
200: {
201: rpc_cli_t *c = arg;
202: rpc_srv_t *s;
203: rpc_blob_t *b;
204: int ret = 0;
205: fd_set fds;
206: u_char buf[sizeof(struct tagBLOBHdr)];
207: struct tagBLOBHdr *blob;
208:
209: if (!arg) {
210: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");
211: return NULL;
212: } else
213: s = c->cli_parent;
214:
215: do {
216: /* check for disable service at this moment? */
217: if (s->srv_blob.state == disable && s->srv_kill != kill) {
218: usleep(100000);
219: #ifdef HAVE_PTHREAD_YIELD
220: pthread_yield();
221: #endif
222: continue;
223: }
224:
225: FD_ZERO(&fds);
226: FD_SET(c->cli_sock, &fds);
227: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
228: if (ret == -1) {
229: if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill)
230: continue;
231:
232: LOGERR;
233: ret = -2;
234: break;
235: }
236:
237: memset(buf, 0, sizeof buf);
238: ret = recv(c->cli_sock, buf, sizeof buf, 0);
239: if (ret == -1) {
240: LOGERR;
241: ret = -3;
242: break;
243: }
244: /* receive EOF, disable or kill service */
245: if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {
246: ret = 0;
247: break;
248: }
249: if (ret < sizeof(struct tagBLOBHdr)) {
250: rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n");
251: ret = -4;
252: break;
253: } else
254: blob = (struct tagBLOBHdr*) buf;
255: /* check BLOB packet session info */
256: if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) {
257: rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n");
258: ret = -5;
259: goto makeReply;
260: }
261: /* Go to proceed packet ... */
262: switch (blob->hdr_cmd) {
263: case get:
264: if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) {
265: rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",
266: blob->hdr_var);
267: ret = -6;
268: break;
269: } else
270: blob->hdr_len = b->blob_len;
271:
272: if (rpc_srv_blobMap(s, b) != -1) {
273: ret = rpc_srv_sendBLOB(c, b);
274: rpc_srv_blobUnmap(b);
275: } else
276: ret = -7;
277: break;
278: case set:
279: if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) {
280: /* set new BLOB variable for reply :) */
281: blob->hdr_var = b->blob_var;
282:
283: ret = rpc_srv_recvBLOB(c, b);
284: rpc_srv_blobUnmap(b);
285: } else
286: ret = -7;
287: break;
288: case unset:
289: ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);
290: if (ret == -1)
291: ret = -7;
292: break;
293: default:
294: rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n",
295: blob->hdr_cmd);
296: ret = -7;
297: }
298:
299: makeReply:
300: /* Replay to client! */
301: blob->hdr_cmd = ret < 0 ? error : ok;
302: blob->hdr_ret = ret;
303: ret = send(c->cli_sock, buf, sizeof buf, 0);
304: if (ret == -1) {
305: LOGERR;
306: ret = -8;
307: break;
308: }
309: if (ret != sizeof buf) {
310: rpc_SetErr(EPROCUNAVAIL, "Error:: in send BLOB reply, should be send %d bytes, "
311: "really is %d\n", sizeof buf, ret);
312: ret = -9;
313: break;
314: }
315: } while (ret > -1 || s->srv_kill != kill);
316:
317: shutdown(c->cli_sock, SHUT_RDWR);
318: close(c->cli_sock);
319: memset(c, 0, sizeof(rpc_cli_t));
320: return (void*) ((long)ret);
321: }
322:
323: // -------------------------------------------------
324:
325: /*
326: * rpc_srv_initBLOBServer() Init & create BLOB Server
327: * @srv = RPC server instance
328: * @Port = Port for bind server, if Port == 0 default port is selected
329: * @diskDir = Disk place for BLOB file objects
330: * return: -1 == error or 0 bind and created BLOB server instance
331: */
332: int
333: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
334: {
335: int n = 1;
336: io_sockaddr_t sa;
337:
338: if (!srv) {
339: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");
340: return -1;
341: }
342: if (srv->srv_blob.state) {
343: rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");
344: return 0;
345: }
346: if (!Port)
347: Port = RPC_DEFPORT + 1;
348:
349: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
350: if (access(diskDir, R_OK | W_OK) == -1) {
351: LOGERR;
352: return -1;
353: } else
354: strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);
355:
356: srv->srv_blob.server.cli_tid = pthread_self();
357: srv->srv_blob.server.cli_parent = srv;
358:
359: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
360: switch (srv->srv_server.cli_sa.sa.sa_family) {
361: case AF_INET:
362: sa.sin.sin_port = htons(Port);
363: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
364: break;
365: case AF_INET6:
366: sa.sin6.sin6_port = htons(Port);
367: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
368: break;
369: case AF_LOCAL:
370: strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
371: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
372: break;
373: default:
374: return -1;
375: }
376:
377: /* create BLOB server socket */
378: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
379: if (srv->srv_blob.server.cli_sock == -1) {
380: LOGERR;
381: return -1;
382: }
383: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
384: LOGERR;
385: close(srv->srv_blob.server.cli_sock);
386: return -1;
387: }
388: n = srv->srv_netbuf;
389: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
390: LOGERR;
391: close(srv->srv_blob.server.cli_sock);
392: return -1;
393: }
394: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
395: LOGERR;
396: close(srv->srv_blob.server.cli_sock);
397: return -1;
398: }
399: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
400: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
401: LOGERR;
402: close(srv->srv_blob.server.cli_sock);
403: return -1;
404: }
405:
406: /* allocate pool for concurent clients */
407: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
408: if (!srv->srv_blob.clients) {
409: LOGERR;
410: close(srv->srv_blob.server.cli_sock);
411: return -1;
412: } else
413: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
414:
415: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
416:
417: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
418: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
419: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
420: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
421:
422: srv->srv_blob.state = enable; /* enable BLOB */
423: return 0;
424: }
425:
426: /*
427: * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources
428: * @srv = RPC Server instance
429: * return: none
430: */
431: void
432: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
433: {
434: rpc_cli_t *c;
435: register int i;
436: rpc_blob_t *f;
437:
438: if (!srv) {
439: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
440: return;
441: } else
442: srv->srv_blob.state = kill;
443:
444: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
445: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
446: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
447: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
448:
449: /* close all clients connections & server socket */
450: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
451: if (c->cli_sa.sa.sa_family)
452: shutdown(c->cli_sock, SHUT_RDWR);
453: close(srv->srv_blob.server.cli_sock);
454:
455: pthread_mutex_lock(&srv->srv_blob.mtx);
456: if (srv->srv_blob.clients) {
457: free(srv->srv_blob.clients);
458: srv->srv_blob.clients = NULL;
459: }
460:
461: /* detach blobs */
462: while ((f = srv->srv_blob.blobs)) {
463: srv->srv_blob.blobs = f->blob_next;
464: rpc_srv_blobFree(srv, f);
465: free(f);
466: }
467: pthread_mutex_unlock(&srv->srv_blob.mtx);
468:
469: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
470: pthread_mutex_destroy(&srv->srv_blob.mtx);
471: }
472:
473: /*
474: * rpc_srv_loopBLOB() Execute Main BLOB server loop and wait for clients requests
475: * @srv = RPC Server instance
476: * return: -1 error or 0 ok, infinite loop ...
477: */
478: int
479: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
480: {
481: socklen_t salen = sizeof(io_sockaddr_t);
482: register int i;
483: rpc_cli_t *c;
484: fd_set fds;
485: int ret;
486: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
487:
488: if (!srv || srv->srv_blob.state == kill) {
489: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
490: return -1;
491: }
492:
493: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
494: LOGERR;
495: return -1;
496: }
497:
498: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
499: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
500: if (!c->cli_sa.sa.sa_family)
501: break;
502: if (i >= srv->srv_numcli) {
503: #ifdef HAVE_PTHREAD_YIELD
504: pthread_yield();
505: #else
506: usleep(1000000);
507: #endif
508: continue;
509: }
510:
511: FD_ZERO(&fds);
512: FD_SET(srv->srv_blob.server.cli_sock, &fds);
513: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
514: if (ret == -1) {
515: LOGERR;
516: ret = 1;
517: break;
518: }
519: if (!ret)
520: continue;
521:
522: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
523: if (c->cli_sock == -1) {
524: LOGERR;
525: continue;
526: } else
527: c->cli_parent = srv;
528:
529: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {
530: LOGERR;
531: continue;
532: } else
533: pthread_detach(c->cli_tid);
534: }
535:
536: srv->srv_blob.state = kill;
537:
538: return 0;
539: }
540:
541:
542: /*
543: * rpc_srv_initServer() Init & create RPC Server
544: * @regProgID = ProgramID for authentication & recognition
545: * @regProcID = ProcessID for authentication & recognition
546: * @concurentClients = Concurent clients at same time to this server
547: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
548: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
549: * @csHost = Host name or address for bind server, if NULL any address
550: * @Port = Port for bind server, if Port == 0 default port is selected
551: * return: NULL == error or !=NULL bind and created RPC server instance
552: */
553: rpc_srv_t *
554: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
555: int netBuf, u_short family, const char *csHost, u_short Port)
556: {
557: rpc_srv_t *srv = NULL;
558: int n = 1;
559: struct hostent *host = NULL;
560: io_sockaddr_t sa;
561:
562: if (!concurentClients || !regProgID ||
563: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
564: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
565: return NULL;
566: }
567: if (!Port)
568: Port = RPC_DEFPORT;
569: if (!netBuf)
570: netBuf = BUFSIZ;
571: if (csHost && family != AF_LOCAL) {
572: host = gethostbyname2(csHost, family);
573: if (!host) {
574: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
575: return NULL;
576: }
577: }
578: memset(&sa, 0, sizeof sa);
579: sa.sa.sa_family = family;
580: switch (family) {
581: case AF_INET:
582: sa.sin.sin_len = sizeof(struct sockaddr_in);
583: sa.sin.sin_port = htons(Port);
584: if (csHost)
585: memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length);
586: break;
587: case AF_INET6:
588: sa.sin6.sin6_len = sizeof(struct sockaddr_in6);
589: sa.sin6.sin6_port = htons(Port);
590: if (csHost)
591: memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length);
592: break;
593: case AF_LOCAL:
594: sa.sun.sun_len = sizeof(struct sockaddr_un);
595: if (csHost)
596: strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path);
597: unlink(sa.sun.sun_path);
598: break;
599: default:
600: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
601: return NULL;
602: }
603:
604: srv = malloc(sizeof(rpc_srv_t));
605: if (!srv) {
606: LOGERR;
607: return NULL;
608: } else
609: memset(srv, 0, sizeof(rpc_srv_t));
610:
611: srv->srv_netbuf = netBuf;
612: srv->srv_numcli = concurentClients;
613: srv->srv_session.sess_version = RPC_VERSION;
614: srv->srv_session.sess_program = regProgID;
615: srv->srv_session.sess_process = regProcID;
616:
617: srv->srv_server.cli_tid = pthread_self();
618: srv->srv_server.cli_parent = srv;
619: memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
620:
621: /* create server socket */
622: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
623: if (srv->srv_server.cli_sock == -1) {
624: LOGERR;
625: free(srv);
626: return NULL;
627: }
628: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
629: LOGERR;
630: close(srv->srv_server.cli_sock);
631: free(srv);
632: return NULL;
633: }
634: n = srv->srv_netbuf;
635: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
636: LOGERR;
637: close(srv->srv_server.cli_sock);
638: free(srv);
639: return NULL;
640: }
641: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
642: LOGERR;
643: close(srv->srv_server.cli_sock);
644: free(srv);
645: return NULL;
646: }
647: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
648: srv->srv_server.cli_sa.sa.sa_len) == -1) {
649: LOGERR;
650: close(srv->srv_server.cli_sock);
651: free(srv);
652: return NULL;
653: }
654:
655: /* allocate pool for concurent clients */
656: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
657: if (!srv->srv_clients) {
658: LOGERR;
659: close(srv->srv_server.cli_sock);
660: free(srv);
661: return NULL;
662: } else
663: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
664:
665: pthread_mutex_init(&srv->srv_mtx, NULL);
666:
667: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
668: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
669: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
670: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
671: return srv;
672: }
673:
674: /*
675: * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources
676: * @psrv = RPC Server instance
677: * return: none
678: */
679: void
680: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
681: {
682: rpc_cli_t *c;
683: register int i;
684: rpc_func_t *f;
685:
686: if (!psrv || !*psrv) {
687: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
688: return;
689: }
690:
691: rpc_srv_endBLOBServer(*psrv);
692:
693: /* close all clients connections & server socket */
694: for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
695: if (c->cli_sa.sa.sa_family) {
696: shutdown(c->cli_sock, SHUT_RDWR);
697: close(c->cli_sock);
698: }
699: close((*psrv)->srv_server.cli_sock);
700:
701: if ((*psrv)->srv_clients) {
702: free((*psrv)->srv_clients);
703: (*psrv)->srv_clients = NULL;
704: (*psrv)->srv_numcli = 0;
705: }
706:
707: /* detach exported calls */
708: pthread_mutex_lock(&(*psrv)->srv_mtx);
709: while ((f = (*psrv)->srv_funcs)) {
710: (*psrv)->srv_funcs = f->func_next;
711: rpc_srv_destroyVars(f);
712: free(f);
713: }
714: pthread_mutex_unlock(&(*psrv)->srv_mtx);
715:
716: while (pthread_mutex_trylock(&(*psrv)->srv_mtx) == EBUSY);
717: pthread_mutex_destroy(&(*psrv)->srv_mtx);
718:
719: free(*psrv);
720: *psrv = NULL;
721: }
722:
723: /*
724: * rpc_srv_loopServer() Execute Main server loop and wait for clients requests
725: * @srv = RPC Server instance
726: * return: -1 error or 0 ok, infinite loop ...
727: */
728: int
729: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
730: {
731: socklen_t salen = sizeof(io_sockaddr_t);
732: register int i;
733: rpc_cli_t *c;
734: fd_set fds;
735: int ret;
736: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
737:
738: if (!srv) {
739: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
740: return -1;
741: }
742:
743: /* activate BLOB server worker if srv->srv_blob.state == enable */
744: rpc_srv_execBLOBServer(srv);
745:
746: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
747: LOGERR;
748: return -1;
749: }
750:
751: while (srv->srv_kill != kill) {
752: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
753: if (!c->cli_sa.sa.sa_family)
754: break;
755: if (i >= srv->srv_numcli) {
756: #ifdef HAVE_PTHREAD_YIELD
757: pthread_yield();
758: #else
759: usleep(1000000);
760: #endif
761: continue;
762: }
763:
764: FD_ZERO(&fds);
765: FD_SET(srv->srv_server.cli_sock, &fds);
766: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
767: if (ret == -1) {
768: LOGERR;
769: ret = 1;
770: break;
771: }
772: if (!ret)
773: continue;
774:
775: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
776: if (c->cli_sock == -1) {
777: LOGERR;
778: continue;
779: } else
780: c->cli_parent = srv;
781:
782: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {
783: LOGERR;
784: continue;
785: } else
786: pthread_detach(c->cli_tid);
787: }
788:
789: return 0;
790: }
791:
792: // ---------------------------------------------------------
793:
794: /*
795: * rpc_srv_execCall() Execute registered call from RPC server
796: * @call = Register RPC call
797: * @rpc = IN RPC call structure
798: * @args = IN RPC calling arguments from RPC client
799: * return: -1 error, !=-1 ok
800: */
801: int
802: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
803: array_t * __restrict args)
804: {
805: void *dl;
806: rpc_callback_t func;
807: int ret;
808:
809: if (!call || !rpc || !call->func_parent) {
810: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");
811: return -1;
812: }
813:
814: dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);
815: if (!dl) {
816: rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror());
817: return -1;
818: }
819:
820: func = dlsym(dl, (char*) call->func_name);
821: if (func)
822: ret = func(call, ntohs(rpc->call_argc), args);
823: else {
824: rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());
825: ret = -1;
826: }
827:
828: dlclose(dl);
829: return ret;
830: }
/* FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org> */