Annotation of libaitrpc/src/srv.c, revision 1.5.2.2
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.5.2.2 ! misho 6: * $Id: srv.c,v 1.5.2.1 2011/09/07 08:56:32 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
49: static void *
50: rpc_srv_dispatchCall(void *arg)
51: {
52: rpc_cli_t *c = arg;
53: rpc_srv_t *s;
1.3 misho 54: rpc_func_t *f = NULL;
1.5 misho 55: array_t *arr;
1.1 misho 56: struct tagRPCCall *rpc;
1.3 misho 57: struct tagRPCRet *rrpc;
58: rpc_sess_t ses = { 0 };
1.1 misho 59: fd_set fds;
1.5 misho 60: u_char *buf;
1.2 misho 61: int ret, argc = 0, Limit = 0;
1.1 misho 62: register int i;
63:
64: if (!arg) {
65: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");
66: return NULL;
67: } else
68: s = c->cli_parent;
69:
1.5 misho 70: buf = malloc(s->srv_netbuf);
71: if (!buf) {
72: LOGERR;
73: return NULL;
74: }
75:
1.1 misho 76: do {
77: FD_ZERO(&fds);
78: FD_SET(c->cli_sock, &fds);
79: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
80: if (ret == -1) {
1.4 misho 81: if (errno == EINTR && s->srv_kill != kill)
82: continue;
83:
84: LOGERR;
1.1 misho 85: ret = -2;
1.4 misho 86: break;
1.1 misho 87: }
1.5 misho 88: memset(buf, 0, s->srv_netbuf);
89: ret = recv(c->cli_sock, buf, s->srv_netbuf, 0);
1.4 misho 90: if (ret == -1) {
1.1 misho 91: LOGERR;
92: ret = -3;
93: break;
94: }
1.4 misho 95: if (!ret) { /* receive EOF */
1.1 misho 96: ret = 0;
97: break;
98: }
99: if (ret < sizeof(struct tagRPCCall)) {
1.5 misho 100: rpc_SetErr(ERPCMISMATCH, "Error:: too short RPC packet ...\n");
1.1 misho 101: ret = -4;
102: break;
103: } else
104: rpc = (struct tagRPCCall*) buf;
1.4 misho 105: /* check RPC packet session info */
1.5.2.1 misho 106: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
1.5 misho 107: rpc_SetErr(ERPCMISMATCH, "Error:: get invalid RPC session ...\n");
1.1 misho 108: ret = -5;
1.2 misho 109: goto makeReply;
110: } else
111: Limit = sizeof(struct tagRPCCall);
1.5 misho 112:
1.4 misho 113: /* RPC is OK! Go decapsulate variables ... */
1.5.2.2 ! misho 114: if (ntohs(rpc->call_argc)) {
! 115: arr = io_buffer2vals(buf + Limit, s->srv_netbuf - Limit,
! 116: ntohs(rpc->call_argc), 1);
1.5 misho 117: if (!arr) {
1.2 misho 118: ret = -5;
119: goto makeReply;
1.1 misho 120: }
1.5 misho 121: } else
122: arr = NULL;
1.1 misho 123:
1.4 misho 124: /* execute call */
1.1 misho 125: argc = 0;
1.3 misho 126: memcpy(&ses, &rpc->call_session, sizeof ses);
1.5.2.2 ! misho 127: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
1.5 misho 128: rpc_SetErr(EPROGUNAVAIL, "Error:: call not found into RPC server ...\n");
1.1 misho 129: ret = -6;
130: } else
1.5 misho 131: if ((ret = rpc_srv_execCall(f, rpc, arr)) == -1)
1.2 misho 132: ret = -9;
1.5 misho 133: else {
134: if (arr)
135: io_arrayDestroy(&arr);
136: argc = rpc_srv_getVars(f, &arr);
137: goto makeReply; /* Call finish OK */
138: }
139:
140: if (arr)
141: io_arrayDestroy(&arr);
142:
1.3 misho 143: makeReply:
1.5 misho 144: /* Made reply */
145: memset(buf, 0, s->srv_netbuf);
1.3 misho 146: rrpc = (struct tagRPCRet*) buf;
147: Limit = sizeof(struct tagRPCRet);
1.1 misho 148:
1.3 misho 149: memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));
150: rrpc->ret_tag = rpc->call_tag;
151: rrpc->ret_hash = rpc->call_hash;
1.5.2.2 ! misho 152: rrpc->ret_errno = htonl(rpc_Errno);
! 153: rrpc->ret_retcode = htonl(ret);
! 154: rrpc->ret_argc = htons(argc);
1.1 misho 155:
1.5 misho 156: if (argc && arr) {
157: /* Go Encapsulate variables ... */
158: if ((i = io_vals2buffer(buf + Limit, s->srv_netbuf - Limit, arr)) == -1) {
159: rpc_srv_freeVals(f);
1.3 misho 160: argc = 0;
161: ret = -7;
1.5 misho 162: rpc_SetErr(EBADRPC, "Error:: in prepare RPC packet values (-7) ...\n");
1.3 misho 163: goto makeReply;
1.5 misho 164: } else {
165: Limit += i;
1.1 misho 166:
1.5 misho 167: rpc_srv_freeVals(f);
1.1 misho 168: }
169: }
170:
1.4 misho 171: ret = send(c->cli_sock, buf, Limit, 0);
172: if (ret == -1) {
1.1 misho 173: LOGERR;
174: ret = -8;
175: break;
176: }
177: if (ret != Limit) {
1.5 misho 178: rpc_SetErr(EPROCUNAVAIL, "Error:: in send RPC request, should be send %d bytes, "
1.1 misho 179: "really is %d\n", Limit, ret);
180: ret = -9;
181: break;
182: }
1.4 misho 183: } while (ret > -1 || s->srv_kill != kill);
1.1 misho 184:
185: shutdown(c->cli_sock, SHUT_RDWR);
186: close(c->cli_sock);
187: memset(c, 0, sizeof(rpc_cli_t));
1.5 misho 188: free(buf);
1.2 misho 189: return (void*) (long)ret;
190: }
191:
192:
193: static void *
194: rpc_srv_dispatchVars(void *arg)
195: {
196: rpc_cli_t *c = arg;
197: rpc_srv_t *s;
198: rpc_blob_t *b;
1.4 misho 199: int ret = 0;
1.2 misho 200: fd_set fds;
201: u_char buf[sizeof(struct tagBLOBHdr)];
202: struct tagBLOBHdr *blob;
203:
204: if (!arg) {
205: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");
206: return NULL;
207: } else
208: s = c->cli_parent;
209:
210: do {
1.4 misho 211: /* check for disable service at this moment? */
212: if (s->srv_blob.state == disable && s->srv_kill != kill) {
213: usleep(100000);
214: #ifdef HAVE_PTHREAD_YIELD
215: pthread_yield();
216: #endif
217: continue;
1.2 misho 218: }
219:
220: FD_ZERO(&fds);
221: FD_SET(c->cli_sock, &fds);
222: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
223: if (ret == -1) {
1.4 misho 224: if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill)
225: continue;
226:
227: LOGERR;
1.2 misho 228: ret = -2;
1.4 misho 229: break;
1.2 misho 230: }
231:
232: memset(buf, 0, sizeof buf);
1.4 misho 233: ret = recv(c->cli_sock, buf, sizeof buf, 0);
234: if (ret == -1) {
1.2 misho 235: LOGERR;
236: ret = -3;
237: break;
238: }
1.4 misho 239: /* receive EOF, disable or kill service */
240: if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {
1.2 misho 241: ret = 0;
242: break;
243: }
244: if (ret < sizeof(struct tagBLOBHdr)) {
1.5 misho 245: rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n");
1.2 misho 246: ret = -4;
247: break;
248: } else
249: blob = (struct tagBLOBHdr*) buf;
1.4 misho 250: /* check BLOB packet session info */
1.2 misho 251: if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) {
252: rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n");
253: ret = -5;
254: goto makeReply;
255: }
1.4 misho 256: /* Go to proceed packet ... */
1.2 misho 257: switch (blob->hdr_cmd) {
258: case get:
259: if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) {
260: rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",
261: blob->hdr_var);
262: ret = -6;
263: break;
264: } else
265: blob->hdr_len = b->blob_len;
266:
267: if (rpc_srv_blobMap(s, b) != -1) {
268: ret = rpc_srv_sendBLOB(c, b);
269: rpc_srv_blobUnmap(b);
270: } else
271: ret = -7;
272: break;
273: case set:
274: if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) {
1.4 misho 275: /* set new BLOB variable for reply :) */
1.2 misho 276: blob->hdr_var = b->blob_var;
277:
278: ret = rpc_srv_recvBLOB(c, b);
279: rpc_srv_blobUnmap(b);
280: } else
281: ret = -7;
282: break;
283: case unset:
284: ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);
285: if (ret == -1)
286: ret = -7;
287: break;
288: default:
1.5 misho 289: rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n",
1.2 misho 290: blob->hdr_cmd);
291: ret = -7;
292: }
293:
294: makeReply:
1.4 misho 295: /* Replay to client! */
1.2 misho 296: blob->hdr_cmd = ret < 0 ? error : ok;
297: blob->hdr_ret = ret;
1.4 misho 298: ret = send(c->cli_sock, buf, sizeof buf, 0);
299: if (ret == -1) {
1.2 misho 300: LOGERR;
301: ret = -8;
302: break;
303: }
304: if (ret != sizeof buf) {
1.5 misho 305: rpc_SetErr(EPROCUNAVAIL, "Error:: in send BLOB reply, should be send %d bytes, "
1.2 misho 306: "really is %d\n", sizeof buf, ret);
307: ret = -9;
308: break;
309: }
1.4 misho 310: } while (ret > -1 || s->srv_kill != kill);
1.2 misho 311:
312: shutdown(c->cli_sock, SHUT_RDWR);
313: close(c->cli_sock);
314: memset(c, 0, sizeof(rpc_cli_t));
1.5 misho 315: return (void*) ((long)ret);
1.1 misho 316: }
317:
318: // -------------------------------------------------
319:
320: /*
1.2 misho 321: * rpc_srv_initBLOBServer() Init & create BLOB Server
1.4 misho 322: * @srv = RPC server instance
1.2 misho 323: * @Port = Port for bind server, if Port == 0 default port is selected
324: * @diskDir = Disk place for BLOB file objects
325: * return: -1 == error or 0 bind and created BLOB server instance
326: */
327: int
328: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
329: {
330: int n = 1;
1.4 misho 331: struct sockaddr sa;
332: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
333: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
334: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
1.2 misho 335:
336: if (!srv) {
337: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");
338: return -1;
339: }
340: if (srv->srv_blob.state) {
341: rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");
342: return 0;
343: }
344: if (!Port)
345: Port = RPC_DEFPORT + 1;
346:
347: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
348: if (access(diskDir, R_OK | W_OK) == -1) {
349: LOGERR;
350: return -1;
351: } else
352: strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);
353:
354: srv->srv_blob.server.cli_tid = pthread_self();
355: srv->srv_blob.server.cli_parent = srv;
1.4 misho 356:
357: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
358: switch (srv->srv_server.cli_sa.sa_family) {
359: case AF_INET:
360: sin->sin_port = htons(Port);
361: memcpy(&srv->srv_blob.server.cli_sa, sin, sizeof(struct sockaddr));
362: break;
363: case AF_INET6:
364: sin6->sin6_port = htons(Port);
365: memcpy(&srv->srv_blob.server.cli_sa, sin6, sizeof(struct sockaddr));
366: break;
367: case AF_LOCAL:
368: strlcat(sun->sun_path, ".blob", sizeof sun->sun_path);
369: memcpy(&srv->srv_blob.server.cli_sa, sun, sizeof(struct sockaddr));
370: break;
371: default:
372: return -1;
1.2 misho 373: }
374:
1.4 misho 375: /* create BLOB server socket */
1.2 misho 376: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa_family, SOCK_STREAM, 0);
377: if (srv->srv_blob.server.cli_sock == -1) {
378: LOGERR;
379: return -1;
380: }
381: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
382: LOGERR;
383: close(srv->srv_blob.server.cli_sock);
384: return -1;
385: }
1.5 misho 386: n = srv->srv_netbuf;
387: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
388: LOGERR;
389: close(srv->srv_blob.server.cli_sock);
390: return -1;
391: }
392: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
393: LOGERR;
394: close(srv->srv_blob.server.cli_sock);
395: return -1;
396: }
1.2 misho 397: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa,
398: sizeof srv->srv_blob.server.cli_sa) == -1) {
399: LOGERR;
400: close(srv->srv_blob.server.cli_sock);
401: return -1;
402: }
403:
1.4 misho 404: /* allocate pool for concurent clients */
1.2 misho 405: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
406: if (!srv->srv_blob.clients) {
407: LOGERR;
408: close(srv->srv_blob.server.cli_sock);
409: return -1;
410: } else
411: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
412:
413: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
414:
415: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
1.5 misho 416: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
417: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
418: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
1.2 misho 419:
1.4 misho 420: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 421: return 0;
422: }
423:
424: /*
425: * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources
426: * @srv = RPC Server instance
427: * return: none
428: */
429: void
430: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
431: {
432: rpc_cli_t *c;
433: register int i;
434: rpc_blob_t *f;
435:
436: if (!srv) {
437: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
438: return;
439: } else
1.4 misho 440: srv->srv_blob.state = kill;
1.2 misho 441:
442: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
443: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
444: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
445: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
446:
1.4 misho 447: /* close all clients connections & server socket */
1.2 misho 448: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
449: if (c->cli_sa.sa_family)
450: shutdown(c->cli_sock, SHUT_RDWR);
451: close(srv->srv_blob.server.cli_sock);
452:
1.5 misho 453: pthread_mutex_lock(&srv->srv_blob.mtx);
1.2 misho 454: if (srv->srv_blob.clients) {
455: free(srv->srv_blob.clients);
456: srv->srv_blob.clients = NULL;
457: }
458:
1.4 misho 459: /* detach blobs */
1.2 misho 460: while ((f = srv->srv_blob.blobs)) {
461: srv->srv_blob.blobs = f->blob_next;
462: rpc_srv_blobFree(srv, f);
463: free(f);
464: }
465: pthread_mutex_unlock(&srv->srv_blob.mtx);
466:
467: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
468: pthread_mutex_destroy(&srv->srv_blob.mtx);
469: }
470:
471: /*
1.5 misho 472: * rpc_srv_loopBLOB() Execute Main BLOB server loop and wait for clients requests
1.2 misho 473: * @srv = RPC Server instance
474: * return: -1 error or 0 ok, infinite loop ...
475: */
476: int
1.5 misho 477: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
1.2 misho 478: {
479: socklen_t salen = sizeof(struct sockaddr);
480: register int i;
481: rpc_cli_t *c;
482: fd_set fds;
483: int ret;
484: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
485:
1.4 misho 486: if (!srv || srv->srv_blob.state == kill) {
1.2 misho 487: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
488: return -1;
489: }
490:
491: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
492: LOGERR;
493: return -1;
494: }
495:
1.4 misho 496: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 497: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
498: if (!c->cli_sa.sa_family)
499: break;
500: if (i >= srv->srv_numcli) {
1.5 misho 501: #ifdef HAVE_PTHREAD_YIELD
502: pthread_yield();
503: #else
1.2 misho 504: usleep(1000000);
1.5 misho 505: #endif
1.2 misho 506: continue;
507: }
508:
509: FD_ZERO(&fds);
510: FD_SET(srv->srv_blob.server.cli_sock, &fds);
511: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
512: if (ret == -1) {
513: LOGERR;
514: ret = 1;
515: break;
516: }
517: if (!ret)
518: continue;
519:
520: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa, &salen);
521: if (c->cli_sock == -1) {
522: LOGERR;
523: continue;
524: } else
525: c->cli_parent = srv;
526:
527: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {
528: LOGERR;
529: continue;
1.4 misho 530: } else
531: pthread_detach(c->cli_tid);
1.2 misho 532: }
533:
1.5 misho 534: srv->srv_blob.state = kill;
1.2 misho 535:
536: return 0;
537: }
538:
539:
540: /*
1.1 misho 541: * rpc_srv_initServer() Init & create RPC Server
542: * @regProgID = ProgramID for authentication & recognition
543: * @regProcID = ProcessID for authentication & recognition
544: * @concurentClients = Concurent clients at same time to this server
1.5 misho 545: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 546: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
547: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 548: * @Port = Port for bind server, if Port == 0 default port is selected
549: * return: NULL == error or !=NULL bind and created RPC server instance
550: */
551: rpc_srv_t *
552: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.5 misho 553: int netBuf, u_short family, const char *csHost, u_short Port)
1.1 misho 554: {
555: rpc_srv_t *srv = NULL;
556: int n = 1;
557: struct hostent *host = NULL;
1.4 misho 558: struct sockaddr sa;
559: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
560: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
561: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
1.1 misho 562:
1.4 misho 563: if (!concurentClients || !regProgID ||
564: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 565: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
566: return NULL;
567: }
568: if (!Port)
569: Port = RPC_DEFPORT;
1.5 misho 570: if (!netBuf)
571: netBuf = BUFSIZ;
1.4 misho 572: if (csHost && family != AF_LOCAL) {
1.1 misho 573: host = gethostbyname2(csHost, family);
574: if (!host) {
575: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
576: return NULL;
577: }
578: }
1.4 misho 579: memset(&sa, 0, sizeof sa);
580: sa.sa_family = family;
1.1 misho 581: switch (family) {
582: case AF_INET:
1.4 misho 583: sin->sin_len = sizeof(struct sockaddr_in);
584: sin->sin_port = htons(Port);
1.1 misho 585: if (csHost)
1.4 misho 586: memcpy(&sin->sin_addr, host->h_addr, host->h_length);
1.1 misho 587: break;
588: case AF_INET6:
1.4 misho 589: sin6->sin6_len = sizeof(struct sockaddr_in6);
590: sin6->sin6_port = htons(Port);
591: if (csHost)
592: memcpy(&sin6->sin6_addr, host->h_addr, host->h_length);
593: break;
594: case AF_LOCAL:
595: sun->sun_len = sizeof(struct sockaddr_un);
1.1 misho 596: if (csHost)
1.4 misho 597: strlcpy(sun->sun_path, csHost, sizeof sun->sun_path);
1.1 misho 598: break;
599: default:
600: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
601: return NULL;
602: }
603:
604: srv = malloc(sizeof(rpc_srv_t));
605: if (!srv) {
606: LOGERR;
607: return NULL;
608: } else
609: memset(srv, 0, sizeof(rpc_srv_t));
610:
1.5 misho 611: srv->srv_netbuf = netBuf;
1.1 misho 612: srv->srv_numcli = concurentClients;
613: srv->srv_session.sess_version = RPC_VERSION;
614: srv->srv_session.sess_program = regProgID;
615: srv->srv_session.sess_process = regProcID;
616:
1.2 misho 617: srv->srv_server.cli_tid = pthread_self();
1.1 misho 618: srv->srv_server.cli_parent = srv;
1.4 misho 619: switch (family) {
620: case AF_INET:
621: memcpy(&srv->srv_server.cli_sa, sin, sizeof srv->srv_server.cli_sa);
622: break;
623: case AF_INET6:
624: memcpy(&srv->srv_server.cli_sa, sin6, sizeof srv->srv_server.cli_sa);
625: break;
626: case AF_LOCAL:
627: memcpy(&srv->srv_server.cli_sa, sun, sizeof srv->srv_server.cli_sa);
628: unlink(sun->sun_path);
629: break;
630: }
631:
632: /* create server socket */
1.1 misho 633: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
634: if (srv->srv_server.cli_sock == -1) {
635: LOGERR;
636: free(srv);
637: return NULL;
638: }
639: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
640: LOGERR;
641: close(srv->srv_server.cli_sock);
642: free(srv);
643: return NULL;
644: }
1.5 misho 645: n = srv->srv_netbuf;
646: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
647: LOGERR;
648: close(srv->srv_server.cli_sock);
649: free(srv);
650: return NULL;
651: }
652: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
653: LOGERR;
654: close(srv->srv_server.cli_sock);
655: free(srv);
656: return NULL;
657: }
1.1 misho 658: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {
659: LOGERR;
660: close(srv->srv_server.cli_sock);
661: free(srv);
662: return NULL;
663: }
664:
1.4 misho 665: /* allocate pool for concurent clients */
1.1 misho 666: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
667: if (!srv->srv_clients) {
668: LOGERR;
669: close(srv->srv_server.cli_sock);
670: free(srv);
671: return NULL;
672: } else
673: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
674:
1.2 misho 675: pthread_mutex_init(&srv->srv_mtx, NULL);
676:
677: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.5 misho 678: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
679: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
680: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
1.1 misho 681: return srv;
682: }
683:
684: /*
685: * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources
686: * @srv = RPC Server instance
687: * return: none
688: */
689: void
690: rpc_srv_endServer(rpc_srv_t * __restrict srv)
691: {
692: rpc_cli_t *c;
693: register int i;
694: rpc_func_t *f;
695:
696: if (!srv) {
697: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
698: return;
699: }
700:
1.2 misho 701: rpc_srv_endBLOBServer(srv);
1.1 misho 702:
1.4 misho 703: /* close all clients connections & server socket */
1.1 misho 704: for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)
1.2 misho 705: if (c->cli_sa.sa_family) {
1.1 misho 706: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 707: close(c->cli_sock);
708: }
1.1 misho 709: close(srv->srv_server.cli_sock);
710:
711: if (srv->srv_clients) {
712: free(srv->srv_clients);
1.2 misho 713: srv->srv_clients = NULL;
1.1 misho 714: srv->srv_numcli = 0;
715: }
716:
1.4 misho 717: /* detach exported calls */
1.2 misho 718: pthread_mutex_lock(&srv->srv_mtx);
719: while ((f = srv->srv_funcs)) {
720: srv->srv_funcs = f->func_next;
721: free(f);
722: }
723: pthread_mutex_unlock(&srv->srv_mtx);
724:
725: while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY);
726: pthread_mutex_destroy(&srv->srv_mtx);
727:
1.1 misho 728: free(srv);
729: srv = NULL;
730: }
731:
732: /*
1.5 misho 733: * rpc_srv_loopServer() Execute Main server loop and wait for clients requests
1.1 misho 734: * @srv = RPC Server instance
735: * return: -1 error or 0 ok, infinite loop ...
736: */
737: int
1.5 misho 738: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 739: {
740: socklen_t salen = sizeof(struct sockaddr);
741: register int i;
742: rpc_cli_t *c;
1.2 misho 743: fd_set fds;
744: int ret;
745: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.1 misho 746:
747: if (!srv) {
748: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
749: return -1;
750: }
751:
1.5 misho 752: /* activate BLOB server worker if srv->srv_blob.state == enable */
753: rpc_srv_execBLOBServer(srv);
754:
1.1 misho 755: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
756: LOGERR;
757: return -1;
758: }
759:
1.4 misho 760: while (srv->srv_kill != kill) {
1.1 misho 761: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
762: if (!c->cli_sa.sa_family)
763: break;
1.2 misho 764: if (i >= srv->srv_numcli) {
1.5 misho 765: #ifdef HAVE_PTHREAD_YIELD
766: pthread_yield();
767: #else
1.1 misho 768: usleep(1000000);
1.5 misho 769: #endif
1.1 misho 770: continue;
771: }
1.2 misho 772:
773: FD_ZERO(&fds);
774: FD_SET(srv->srv_server.cli_sock, &fds);
775: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
776: if (ret == -1) {
777: LOGERR;
778: ret = 1;
779: break;
780: }
781: if (!ret)
782: continue;
783:
1.1 misho 784: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen);
785: if (c->cli_sock == -1) {
786: LOGERR;
787: continue;
788: } else
789: c->cli_parent = srv;
790:
791: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {
792: LOGERR;
793: continue;
1.4 misho 794: } else
795: pthread_detach(c->cli_tid);
1.1 misho 796: }
797:
798: return 0;
799: }
800:
801: // ---------------------------------------------------------
802:
803: /*
804: * rpc_srv_execCall() Execute registered call from RPC server
805: * @call = Register RPC call
806: * @rpc = IN RPC call structure
1.5 misho 807: * @args = IN RPC calling arguments from RPC client
1.1 misho 808: * return: -1 error, !=-1 ok
809: */
810: int
1.2 misho 811: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.5 misho 812: array_t * __restrict args)
1.1 misho 813: {
814: void *dl;
815: rpc_callback_t func;
816: int ret;
817:
1.2 misho 818: if (!call || !rpc || !call->func_parent) {
1.1 misho 819: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");
820: return -1;
821: }
822:
823: dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);
824: if (!dl) {
825: rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror());
826: return -1;
827: }
828:
829: func = dlsym(dl, (char*) call->func_name);
830: if (func)
1.5.2.2 ! misho 831: ret = func(call, ntohs(rpc->call_argc), args);
1.1 misho 832: else {
833: rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());
834: ret = -1;
835: }
836:
837: dlclose(dl);
838: return ret;
839: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>