1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: srv.c,v 1.5.2.2 2011/09/07 09:10:55 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: static void *
50: rpc_srv_dispatchCall(void *arg)
51: {
52: rpc_cli_t *c = arg;
53: rpc_srv_t *s;
54: rpc_func_t *f = NULL;
55: array_t *arr;
56: struct tagRPCCall *rpc;
57: struct tagRPCRet *rrpc;
58: rpc_sess_t ses = { 0 };
59: fd_set fds;
60: u_char *buf;
61: int ret, argc = 0, Limit = 0;
62: register int i;
63:
64: if (!arg) {
65: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");
66: return NULL;
67: } else
68: s = c->cli_parent;
69:
70: buf = malloc(s->srv_netbuf);
71: if (!buf) {
72: LOGERR;
73: return NULL;
74: }
75:
76: do {
77: FD_ZERO(&fds);
78: FD_SET(c->cli_sock, &fds);
79: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
80: if (ret == -1) {
81: if (errno == EINTR && s->srv_kill != kill)
82: continue;
83:
84: LOGERR;
85: ret = -2;
86: break;
87: }
88: memset(buf, 0, s->srv_netbuf);
89: ret = recv(c->cli_sock, buf, s->srv_netbuf, 0);
90: if (ret == -1) {
91: LOGERR;
92: ret = -3;
93: break;
94: }
95: if (!ret) { /* receive EOF */
96: ret = 0;
97: break;
98: }
99: if (ret < sizeof(struct tagRPCCall)) {
100: rpc_SetErr(ERPCMISMATCH, "Error:: too short RPC packet ...\n");
101: ret = -4;
102: break;
103: } else
104: rpc = (struct tagRPCCall*) buf;
105: /* check RPC packet session info */
106: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
107: rpc_SetErr(ERPCMISMATCH, "Error:: get invalid RPC session ...\n");
108: ret = -5;
109: goto makeReply;
110: } else
111: Limit = sizeof(struct tagRPCCall);
112:
113: /* RPC is OK! Go decapsulate variables ... */
114: if (ntohs(rpc->call_argc)) {
115: arr = io_buffer2vals(buf + Limit, s->srv_netbuf - Limit,
116: ntohs(rpc->call_argc), 1);
117: if (!arr) {
118: ret = -5;
119: goto makeReply;
120: }
121: } else
122: arr = NULL;
123:
124: /* execute call */
125: argc = 0;
126: memcpy(&ses, &rpc->call_session, sizeof ses);
127: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
128: rpc_SetErr(EPROGUNAVAIL, "Error:: call not found into RPC server ...\n");
129: ret = -6;
130: } else
131: if ((ret = rpc_srv_execCall(f, rpc, arr)) == -1)
132: ret = -9;
133: else {
134: if (arr)
135: io_arrayDestroy(&arr);
136: argc = rpc_srv_getVars(f, &arr);
137: goto makeReply; /* Call finish OK */
138: }
139:
140: if (arr)
141: io_arrayDestroy(&arr);
142:
143: makeReply:
144: /* Made reply */
145: memset(buf, 0, s->srv_netbuf);
146: rrpc = (struct tagRPCRet*) buf;
147: Limit = sizeof(struct tagRPCRet);
148:
149: memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));
150: rrpc->ret_tag = rpc->call_tag;
151: rrpc->ret_hash = rpc->call_hash;
152: rrpc->ret_errno = htonl(rpc_Errno);
153: rrpc->ret_retcode = htonl(ret);
154: rrpc->ret_argc = htons(argc);
155:
156: if (argc && arr) {
157: /* Go Encapsulate variables ... */
158: if ((i = io_vals2buffer(buf + Limit, s->srv_netbuf - Limit, arr)) == -1) {
159: rpc_srv_freeVals(f);
160: argc = 0;
161: ret = -7;
162: rpc_SetErr(EBADRPC, "Error:: in prepare RPC packet values (-7) ...\n");
163: goto makeReply;
164: } else {
165: Limit += i;
166:
167: rpc_srv_freeVals(f);
168: }
169: }
170:
171: ret = send(c->cli_sock, buf, Limit, 0);
172: if (ret == -1) {
173: LOGERR;
174: ret = -8;
175: break;
176: }
177: if (ret != Limit) {
178: rpc_SetErr(EPROCUNAVAIL, "Error:: in send RPC request, should be send %d bytes, "
179: "really is %d\n", Limit, ret);
180: ret = -9;
181: break;
182: }
183: } while (ret > -1 || s->srv_kill != kill);
184:
185: shutdown(c->cli_sock, SHUT_RDWR);
186: close(c->cli_sock);
187: memset(c, 0, sizeof(rpc_cli_t));
188: free(buf);
189: return (void*) (long)ret;
190: }
191:
192:
193: static void *
194: rpc_srv_dispatchVars(void *arg)
195: {
196: rpc_cli_t *c = arg;
197: rpc_srv_t *s;
198: rpc_blob_t *b;
199: int ret = 0;
200: fd_set fds;
201: u_char buf[sizeof(struct tagBLOBHdr)];
202: struct tagBLOBHdr *blob;
203:
204: if (!arg) {
205: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");
206: return NULL;
207: } else
208: s = c->cli_parent;
209:
210: do {
211: /* check for disable service at this moment? */
212: if (s->srv_blob.state == disable && s->srv_kill != kill) {
213: usleep(100000);
214: #ifdef HAVE_PTHREAD_YIELD
215: pthread_yield();
216: #endif
217: continue;
218: }
219:
220: FD_ZERO(&fds);
221: FD_SET(c->cli_sock, &fds);
222: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
223: if (ret == -1) {
224: if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill)
225: continue;
226:
227: LOGERR;
228: ret = -2;
229: break;
230: }
231:
232: memset(buf, 0, sizeof buf);
233: ret = recv(c->cli_sock, buf, sizeof buf, 0);
234: if (ret == -1) {
235: LOGERR;
236: ret = -3;
237: break;
238: }
239: /* receive EOF, disable or kill service */
240: if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {
241: ret = 0;
242: break;
243: }
244: if (ret < sizeof(struct tagBLOBHdr)) {
245: rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n");
246: ret = -4;
247: break;
248: } else
249: blob = (struct tagBLOBHdr*) buf;
250: /* check BLOB packet session info */
251: if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) {
252: rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n");
253: ret = -5;
254: goto makeReply;
255: }
256: /* Go to proceed packet ... */
257: switch (blob->hdr_cmd) {
258: case get:
259: if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) {
260: rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",
261: blob->hdr_var);
262: ret = -6;
263: break;
264: } else
265: blob->hdr_len = b->blob_len;
266:
267: if (rpc_srv_blobMap(s, b) != -1) {
268: ret = rpc_srv_sendBLOB(c, b);
269: rpc_srv_blobUnmap(b);
270: } else
271: ret = -7;
272: break;
273: case set:
274: if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) {
275: /* set new BLOB variable for reply :) */
276: blob->hdr_var = b->blob_var;
277:
278: ret = rpc_srv_recvBLOB(c, b);
279: rpc_srv_blobUnmap(b);
280: } else
281: ret = -7;
282: break;
283: case unset:
284: ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);
285: if (ret == -1)
286: ret = -7;
287: break;
288: default:
289: rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n",
290: blob->hdr_cmd);
291: ret = -7;
292: }
293:
294: makeReply:
295: /* Replay to client! */
296: blob->hdr_cmd = ret < 0 ? error : ok;
297: blob->hdr_ret = ret;
298: ret = send(c->cli_sock, buf, sizeof buf, 0);
299: if (ret == -1) {
300: LOGERR;
301: ret = -8;
302: break;
303: }
304: if (ret != sizeof buf) {
305: rpc_SetErr(EPROCUNAVAIL, "Error:: in send BLOB reply, should be send %d bytes, "
306: "really is %d\n", sizeof buf, ret);
307: ret = -9;
308: break;
309: }
310: } while (ret > -1 || s->srv_kill != kill);
311:
312: shutdown(c->cli_sock, SHUT_RDWR);
313: close(c->cli_sock);
314: memset(c, 0, sizeof(rpc_cli_t));
315: return (void*) ((long)ret);
316: }
317:
318: // -------------------------------------------------
319:
320: /*
321: * rpc_srv_initBLOBServer() Init & create BLOB Server
322: * @srv = RPC server instance
323: * @Port = Port for bind server, if Port == 0 default port is selected
324: * @diskDir = Disk place for BLOB file objects
325: * return: -1 == error or 0 bind and created BLOB server instance
326: */
327: int
328: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
329: {
330: int n = 1;
331: struct sockaddr sa;
332: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
333: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
334: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
335:
336: if (!srv) {
337: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");
338: return -1;
339: }
340: if (srv->srv_blob.state) {
341: rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");
342: return 0;
343: }
344: if (!Port)
345: Port = RPC_DEFPORT + 1;
346:
347: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
348: if (access(diskDir, R_OK | W_OK) == -1) {
349: LOGERR;
350: return -1;
351: } else
352: strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);
353:
354: srv->srv_blob.server.cli_tid = pthread_self();
355: srv->srv_blob.server.cli_parent = srv;
356:
357: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
358: switch (srv->srv_server.cli_sa.sa_family) {
359: case AF_INET:
360: sin->sin_port = htons(Port);
361: memcpy(&srv->srv_blob.server.cli_sa, sin, sizeof(struct sockaddr));
362: break;
363: case AF_INET6:
364: sin6->sin6_port = htons(Port);
365: memcpy(&srv->srv_blob.server.cli_sa, sin6, sizeof(struct sockaddr));
366: break;
367: case AF_LOCAL:
368: strlcat(sun->sun_path, ".blob", sizeof sun->sun_path);
369: memcpy(&srv->srv_blob.server.cli_sa, sun, sizeof(struct sockaddr));
370: break;
371: default:
372: return -1;
373: }
374:
375: /* create BLOB server socket */
376: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa_family, SOCK_STREAM, 0);
377: if (srv->srv_blob.server.cli_sock == -1) {
378: LOGERR;
379: return -1;
380: }
381: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
382: LOGERR;
383: close(srv->srv_blob.server.cli_sock);
384: return -1;
385: }
386: n = srv->srv_netbuf;
387: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
388: LOGERR;
389: close(srv->srv_blob.server.cli_sock);
390: return -1;
391: }
392: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
393: LOGERR;
394: close(srv->srv_blob.server.cli_sock);
395: return -1;
396: }
397: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa,
398: sizeof srv->srv_blob.server.cli_sa) == -1) {
399: LOGERR;
400: close(srv->srv_blob.server.cli_sock);
401: return -1;
402: }
403:
404: /* allocate pool for concurent clients */
405: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
406: if (!srv->srv_blob.clients) {
407: LOGERR;
408: close(srv->srv_blob.server.cli_sock);
409: return -1;
410: } else
411: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
412:
413: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
414:
415: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
416: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
417: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
418: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
419:
420: srv->srv_blob.state = enable; /* enable BLOB */
421: return 0;
422: }
423:
424: /*
425: * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources
426: * @srv = RPC Server instance
427: * return: none
428: */
429: void
430: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
431: {
432: rpc_cli_t *c;
433: register int i;
434: rpc_blob_t *f;
435:
436: if (!srv) {
437: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
438: return;
439: } else
440: srv->srv_blob.state = kill;
441:
442: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
443: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
444: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
445: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
446:
447: /* close all clients connections & server socket */
448: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
449: if (c->cli_sa.sa_family)
450: shutdown(c->cli_sock, SHUT_RDWR);
451: close(srv->srv_blob.server.cli_sock);
452:
453: pthread_mutex_lock(&srv->srv_blob.mtx);
454: if (srv->srv_blob.clients) {
455: free(srv->srv_blob.clients);
456: srv->srv_blob.clients = NULL;
457: }
458:
459: /* detach blobs */
460: while ((f = srv->srv_blob.blobs)) {
461: srv->srv_blob.blobs = f->blob_next;
462: rpc_srv_blobFree(srv, f);
463: free(f);
464: }
465: pthread_mutex_unlock(&srv->srv_blob.mtx);
466:
467: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
468: pthread_mutex_destroy(&srv->srv_blob.mtx);
469: }
470:
471: /*
472: * rpc_srv_loopBLOB() Execute Main BLOB server loop and wait for clients requests
473: * @srv = RPC Server instance
474: * return: -1 error or 0 ok, infinite loop ...
475: */
476: int
477: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
478: {
479: socklen_t salen = sizeof(struct sockaddr);
480: register int i;
481: rpc_cli_t *c;
482: fd_set fds;
483: int ret;
484: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
485:
486: if (!srv || srv->srv_blob.state == kill) {
487: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
488: return -1;
489: }
490:
491: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
492: LOGERR;
493: return -1;
494: }
495:
496: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
497: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
498: if (!c->cli_sa.sa_family)
499: break;
500: if (i >= srv->srv_numcli) {
501: #ifdef HAVE_PTHREAD_YIELD
502: pthread_yield();
503: #else
504: usleep(1000000);
505: #endif
506: continue;
507: }
508:
509: FD_ZERO(&fds);
510: FD_SET(srv->srv_blob.server.cli_sock, &fds);
511: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
512: if (ret == -1) {
513: LOGERR;
514: ret = 1;
515: break;
516: }
517: if (!ret)
518: continue;
519:
520: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa, &salen);
521: if (c->cli_sock == -1) {
522: LOGERR;
523: continue;
524: } else
525: c->cli_parent = srv;
526:
527: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {
528: LOGERR;
529: continue;
530: } else
531: pthread_detach(c->cli_tid);
532: }
533:
534: srv->srv_blob.state = kill;
535:
536: return 0;
537: }
538:
539:
540: /*
541: * rpc_srv_initServer() Init & create RPC Server
542: * @regProgID = ProgramID for authentication & recognition
543: * @regProcID = ProcessID for authentication & recognition
544: * @concurentClients = Concurent clients at same time to this server
545: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
546: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
547: * @csHost = Host name or address for bind server, if NULL any address
548: * @Port = Port for bind server, if Port == 0 default port is selected
549: * return: NULL == error or !=NULL bind and created RPC server instance
550: */
551: rpc_srv_t *
552: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
553: int netBuf, u_short family, const char *csHost, u_short Port)
554: {
555: rpc_srv_t *srv = NULL;
556: int n = 1;
557: struct hostent *host = NULL;
558: struct sockaddr sa;
559: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
560: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
561: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
562:
563: if (!concurentClients || !regProgID ||
564: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
565: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
566: return NULL;
567: }
568: if (!Port)
569: Port = RPC_DEFPORT;
570: if (!netBuf)
571: netBuf = BUFSIZ;
572: if (csHost && family != AF_LOCAL) {
573: host = gethostbyname2(csHost, family);
574: if (!host) {
575: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
576: return NULL;
577: }
578: }
579: memset(&sa, 0, sizeof sa);
580: sa.sa_family = family;
581: switch (family) {
582: case AF_INET:
583: sin->sin_len = sizeof(struct sockaddr_in);
584: sin->sin_port = htons(Port);
585: if (csHost)
586: memcpy(&sin->sin_addr, host->h_addr, host->h_length);
587: break;
588: case AF_INET6:
589: sin6->sin6_len = sizeof(struct sockaddr_in6);
590: sin6->sin6_port = htons(Port);
591: if (csHost)
592: memcpy(&sin6->sin6_addr, host->h_addr, host->h_length);
593: break;
594: case AF_LOCAL:
595: sun->sun_len = sizeof(struct sockaddr_un);
596: if (csHost)
597: strlcpy(sun->sun_path, csHost, sizeof sun->sun_path);
598: break;
599: default:
600: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
601: return NULL;
602: }
603:
604: srv = malloc(sizeof(rpc_srv_t));
605: if (!srv) {
606: LOGERR;
607: return NULL;
608: } else
609: memset(srv, 0, sizeof(rpc_srv_t));
610:
611: srv->srv_netbuf = netBuf;
612: srv->srv_numcli = concurentClients;
613: srv->srv_session.sess_version = RPC_VERSION;
614: srv->srv_session.sess_program = regProgID;
615: srv->srv_session.sess_process = regProcID;
616:
617: srv->srv_server.cli_tid = pthread_self();
618: srv->srv_server.cli_parent = srv;
619: switch (family) {
620: case AF_INET:
621: memcpy(&srv->srv_server.cli_sa, sin, sizeof srv->srv_server.cli_sa);
622: break;
623: case AF_INET6:
624: memcpy(&srv->srv_server.cli_sa, sin6, sizeof srv->srv_server.cli_sa);
625: break;
626: case AF_LOCAL:
627: memcpy(&srv->srv_server.cli_sa, sun, sizeof srv->srv_server.cli_sa);
628: unlink(sun->sun_path);
629: break;
630: }
631:
632: /* create server socket */
633: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
634: if (srv->srv_server.cli_sock == -1) {
635: LOGERR;
636: free(srv);
637: return NULL;
638: }
639: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
640: LOGERR;
641: close(srv->srv_server.cli_sock);
642: free(srv);
643: return NULL;
644: }
645: n = srv->srv_netbuf;
646: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
647: LOGERR;
648: close(srv->srv_server.cli_sock);
649: free(srv);
650: return NULL;
651: }
652: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
653: LOGERR;
654: close(srv->srv_server.cli_sock);
655: free(srv);
656: return NULL;
657: }
658: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {
659: LOGERR;
660: close(srv->srv_server.cli_sock);
661: free(srv);
662: return NULL;
663: }
664:
665: /* allocate pool for concurent clients */
666: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
667: if (!srv->srv_clients) {
668: LOGERR;
669: close(srv->srv_server.cli_sock);
670: free(srv);
671: return NULL;
672: } else
673: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
674:
675: pthread_mutex_init(&srv->srv_mtx, NULL);
676:
677: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
678: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
679: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
680: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
681: return srv;
682: }
683:
684: /*
685: * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources
686: * @srv = RPC Server instance
687: * return: none
688: */
689: void
690: rpc_srv_endServer(rpc_srv_t * __restrict srv)
691: {
692: rpc_cli_t *c;
693: register int i;
694: rpc_func_t *f;
695:
696: if (!srv) {
697: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
698: return;
699: }
700:
701: rpc_srv_endBLOBServer(srv);
702:
703: /* close all clients connections & server socket */
704: for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)
705: if (c->cli_sa.sa_family) {
706: shutdown(c->cli_sock, SHUT_RDWR);
707: close(c->cli_sock);
708: }
709: close(srv->srv_server.cli_sock);
710:
711: if (srv->srv_clients) {
712: free(srv->srv_clients);
713: srv->srv_clients = NULL;
714: srv->srv_numcli = 0;
715: }
716:
717: /* detach exported calls */
718: pthread_mutex_lock(&srv->srv_mtx);
719: while ((f = srv->srv_funcs)) {
720: srv->srv_funcs = f->func_next;
721: free(f);
722: }
723: pthread_mutex_unlock(&srv->srv_mtx);
724:
725: while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY);
726: pthread_mutex_destroy(&srv->srv_mtx);
727:
728: free(srv);
729: srv = NULL;
730: }
731:
732: /*
733: * rpc_srv_loopServer() Execute Main server loop and wait for clients requests
734: * @srv = RPC Server instance
735: * return: -1 error or 0 ok, infinite loop ...
736: */
737: int
738: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
739: {
740: socklen_t salen = sizeof(struct sockaddr);
741: register int i;
742: rpc_cli_t *c;
743: fd_set fds;
744: int ret;
745: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
746:
747: if (!srv) {
748: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
749: return -1;
750: }
751:
752: /* activate BLOB server worker if srv->srv_blob.state == enable */
753: rpc_srv_execBLOBServer(srv);
754:
755: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
756: LOGERR;
757: return -1;
758: }
759:
760: while (srv->srv_kill != kill) {
761: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
762: if (!c->cli_sa.sa_family)
763: break;
764: if (i >= srv->srv_numcli) {
765: #ifdef HAVE_PTHREAD_YIELD
766: pthread_yield();
767: #else
768: usleep(1000000);
769: #endif
770: continue;
771: }
772:
773: FD_ZERO(&fds);
774: FD_SET(srv->srv_server.cli_sock, &fds);
775: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
776: if (ret == -1) {
777: LOGERR;
778: ret = 1;
779: break;
780: }
781: if (!ret)
782: continue;
783:
784: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen);
785: if (c->cli_sock == -1) {
786: LOGERR;
787: continue;
788: } else
789: c->cli_parent = srv;
790:
791: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {
792: LOGERR;
793: continue;
794: } else
795: pthread_detach(c->cli_tid);
796: }
797:
798: return 0;
799: }
800:
801: // ---------------------------------------------------------
802:
803: /*
804: * rpc_srv_execCall() Execute registered call from RPC server
805: * @call = Register RPC call
806: * @rpc = IN RPC call structure
807: * @args = IN RPC calling arguments from RPC client
808: * return: -1 error, !=-1 ok
809: */
810: int
811: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
812: array_t * __restrict args)
813: {
814: void *dl;
815: rpc_callback_t func;
816: int ret;
817:
818: if (!call || !rpc || !call->func_parent) {
819: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");
820: return -1;
821: }
822:
823: dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);
824: if (!dl) {
825: rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror());
826: return -1;
827: }
828:
829: func = dlsym(dl, (char*) call->func_name);
830: if (func)
831: ret = func(call, ntohs(rpc->call_argc), args);
832: else {
833: rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());
834: ret = -1;
835: }
836:
837: dlclose(dl);
838: return ret;
839: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>