Annotation of libaitrpc/src/srv.c, revision 1.5
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.5 ! misho 6: * $Id: srv.c,v 1.4.2.12 2011/09/07 00:47:14 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
49: static void *
50: rpc_srv_dispatchCall(void *arg)
51: {
52: rpc_cli_t *c = arg;
53: rpc_srv_t *s;
1.3 misho 54: rpc_func_t *f = NULL;
1.5 ! misho 55: array_t *arr;
1.1 misho 56: struct tagRPCCall *rpc;
1.3 misho 57: struct tagRPCRet *rrpc;
58: rpc_sess_t ses = { 0 };
1.1 misho 59: fd_set fds;
1.5 ! misho 60: u_char *buf;
1.2 misho 61: int ret, argc = 0, Limit = 0;
1.1 misho 62: register int i;
63:
64: if (!arg) {
65: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");
66: return NULL;
67: } else
68: s = c->cli_parent;
69:
1.5 ! misho 70: buf = malloc(s->srv_netbuf);
! 71: if (!buf) {
! 72: LOGERR;
! 73: return NULL;
! 74: }
! 75:
1.1 misho 76: do {
77: FD_ZERO(&fds);
78: FD_SET(c->cli_sock, &fds);
79: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
80: if (ret == -1) {
1.4 misho 81: if (errno == EINTR && s->srv_kill != kill)
82: continue;
83:
84: LOGERR;
1.1 misho 85: ret = -2;
1.4 misho 86: break;
1.1 misho 87: }
1.5 ! misho 88: memset(buf, 0, s->srv_netbuf);
! 89: ret = recv(c->cli_sock, buf, s->srv_netbuf, 0);
1.4 misho 90: if (ret == -1) {
1.1 misho 91: LOGERR;
92: ret = -3;
93: break;
94: }
1.4 misho 95: if (!ret) { /* receive EOF */
1.1 misho 96: ret = 0;
97: break;
98: }
99: if (ret < sizeof(struct tagRPCCall)) {
1.5 ! misho 100: rpc_SetErr(ERPCMISMATCH, "Error:: too short RPC packet ...\n");
1.1 misho 101: ret = -4;
102: break;
103: } else
104: rpc = (struct tagRPCCall*) buf;
1.4 misho 105: /* check RPC packet session info */
1.1 misho 106: if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) {
1.5 ! misho 107: rpc_SetErr(ERPCMISMATCH, "Error:: get invalid RPC session ...\n");
1.1 misho 108: ret = -5;
1.2 misho 109: goto makeReply;
110: } else
111: Limit = sizeof(struct tagRPCCall);
1.5 ! misho 112:
1.4 misho 113: /* RPC is OK! Go decapsulate variables ... */
1.1 misho 114: if (rpc->call_argc) {
1.5 ! misho 115: arr = io_buffer2vals(buf + Limit, s->srv_netbuf - Limit, rpc->call_argc, 1);
! 116: if (!arr) {
1.2 misho 117: ret = -5;
118: goto makeReply;
1.1 misho 119: }
1.5 ! misho 120: } else
! 121: arr = NULL;
1.1 misho 122:
1.4 misho 123: /* execute call */
1.1 misho 124: argc = 0;
1.3 misho 125: memcpy(&ses, &rpc->call_session, sizeof ses);
1.1 misho 126: if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) {
1.5 ! misho 127: rpc_SetErr(EPROGUNAVAIL, "Error:: call not found into RPC server ...\n");
1.1 misho 128: ret = -6;
129: } else
1.5 ! misho 130: if ((ret = rpc_srv_execCall(f, rpc, arr)) == -1)
1.2 misho 131: ret = -9;
1.5 ! misho 132: else {
! 133: if (arr)
! 134: io_arrayDestroy(&arr);
! 135: argc = rpc_srv_getVars(f, &arr);
! 136: goto makeReply; /* Call finish OK */
! 137: }
! 138:
! 139: if (arr)
! 140: io_arrayDestroy(&arr);
! 141:
1.3 misho 142: makeReply:
1.5 ! misho 143: /* Made reply */
! 144: memset(buf, 0, s->srv_netbuf);
1.3 misho 145: rrpc = (struct tagRPCRet*) buf;
146: Limit = sizeof(struct tagRPCRet);
1.1 misho 147:
1.3 misho 148: memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));
149: rrpc->ret_tag = rpc->call_tag;
150: rrpc->ret_hash = rpc->call_hash;
151: rrpc->ret_errno = rpc_Errno;
152: rrpc->ret_retcode = ret;
153: rrpc->ret_argc = argc;
1.1 misho 154:
1.5 ! misho 155: if (argc && arr) {
! 156: /* Go Encapsulate variables ... */
! 157: if ((i = io_vals2buffer(buf + Limit, s->srv_netbuf - Limit, arr)) == -1) {
! 158: rpc_srv_freeVals(f);
1.3 misho 159: argc = 0;
160: ret = -7;
1.5 ! misho 161: rpc_SetErr(EBADRPC, "Error:: in prepare RPC packet values (-7) ...\n");
1.3 misho 162: goto makeReply;
1.5 ! misho 163: } else {
! 164: Limit += i;
1.1 misho 165:
1.5 ! misho 166: rpc_srv_freeVals(f);
1.1 misho 167: }
168: }
169:
1.4 misho 170: ret = send(c->cli_sock, buf, Limit, 0);
171: if (ret == -1) {
1.1 misho 172: LOGERR;
173: ret = -8;
174: break;
175: }
176: if (ret != Limit) {
1.5 ! misho 177: rpc_SetErr(EPROCUNAVAIL, "Error:: in send RPC request, should be send %d bytes, "
1.1 misho 178: "really is %d\n", Limit, ret);
179: ret = -9;
180: break;
181: }
1.4 misho 182: } while (ret > -1 || s->srv_kill != kill);
1.1 misho 183:
184: shutdown(c->cli_sock, SHUT_RDWR);
185: close(c->cli_sock);
186: memset(c, 0, sizeof(rpc_cli_t));
1.5 ! misho 187: free(buf);
1.2 misho 188: return (void*) (long)ret;
189: }
190:
191:
192: static void *
193: rpc_srv_dispatchVars(void *arg)
194: {
195: rpc_cli_t *c = arg;
196: rpc_srv_t *s;
197: rpc_blob_t *b;
1.4 misho 198: int ret = 0;
1.2 misho 199: fd_set fds;
200: u_char buf[sizeof(struct tagBLOBHdr)];
201: struct tagBLOBHdr *blob;
202:
203: if (!arg) {
204: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");
205: return NULL;
206: } else
207: s = c->cli_parent;
208:
209: do {
1.4 misho 210: /* check for disable service at this moment? */
211: if (s->srv_blob.state == disable && s->srv_kill != kill) {
212: usleep(100000);
213: #ifdef HAVE_PTHREAD_YIELD
214: pthread_yield();
215: #endif
216: continue;
1.2 misho 217: }
218:
219: FD_ZERO(&fds);
220: FD_SET(c->cli_sock, &fds);
221: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
222: if (ret == -1) {
1.4 misho 223: if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill)
224: continue;
225:
226: LOGERR;
1.2 misho 227: ret = -2;
1.4 misho 228: break;
1.2 misho 229: }
230:
231: memset(buf, 0, sizeof buf);
1.4 misho 232: ret = recv(c->cli_sock, buf, sizeof buf, 0);
233: if (ret == -1) {
1.2 misho 234: LOGERR;
235: ret = -3;
236: break;
237: }
1.4 misho 238: /* receive EOF, disable or kill service */
239: if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {
1.2 misho 240: ret = 0;
241: break;
242: }
243: if (ret < sizeof(struct tagBLOBHdr)) {
1.5 ! misho 244: rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n");
1.2 misho 245: ret = -4;
246: break;
247: } else
248: blob = (struct tagBLOBHdr*) buf;
1.4 misho 249: /* check BLOB packet session info */
1.2 misho 250: if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) {
251: rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n");
252: ret = -5;
253: goto makeReply;
254: }
1.4 misho 255: /* Go to proceed packet ... */
1.2 misho 256: switch (blob->hdr_cmd) {
257: case get:
258: if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) {
259: rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",
260: blob->hdr_var);
261: ret = -6;
262: break;
263: } else
264: blob->hdr_len = b->blob_len;
265:
266: if (rpc_srv_blobMap(s, b) != -1) {
267: ret = rpc_srv_sendBLOB(c, b);
268: rpc_srv_blobUnmap(b);
269: } else
270: ret = -7;
271: break;
272: case set:
273: if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) {
1.4 misho 274: /* set new BLOB variable for reply :) */
1.2 misho 275: blob->hdr_var = b->blob_var;
276:
277: ret = rpc_srv_recvBLOB(c, b);
278: rpc_srv_blobUnmap(b);
279: } else
280: ret = -7;
281: break;
282: case unset:
283: ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);
284: if (ret == -1)
285: ret = -7;
286: break;
287: default:
1.5 ! misho 288: rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n",
1.2 misho 289: blob->hdr_cmd);
290: ret = -7;
291: }
292:
293: makeReply:
1.4 misho 294: /* Replay to client! */
1.2 misho 295: blob->hdr_cmd = ret < 0 ? error : ok;
296: blob->hdr_ret = ret;
1.4 misho 297: ret = send(c->cli_sock, buf, sizeof buf, 0);
298: if (ret == -1) {
1.2 misho 299: LOGERR;
300: ret = -8;
301: break;
302: }
303: if (ret != sizeof buf) {
1.5 ! misho 304: rpc_SetErr(EPROCUNAVAIL, "Error:: in send BLOB reply, should be send %d bytes, "
1.2 misho 305: "really is %d\n", sizeof buf, ret);
306: ret = -9;
307: break;
308: }
1.4 misho 309: } while (ret > -1 || s->srv_kill != kill);
1.2 misho 310:
311: shutdown(c->cli_sock, SHUT_RDWR);
312: close(c->cli_sock);
313: memset(c, 0, sizeof(rpc_cli_t));
1.5 ! misho 314: return (void*) ((long)ret);
1.1 misho 315: }
316:
317: // -------------------------------------------------
318:
319: /*
1.2 misho 320: * rpc_srv_initBLOBServer() Init & create BLOB Server
1.4 misho 321: * @srv = RPC server instance
1.2 misho 322: * @Port = Port for bind server, if Port == 0 default port is selected
323: * @diskDir = Disk place for BLOB file objects
324: * return: -1 == error or 0 bind and created BLOB server instance
325: */
326: int
327: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
328: {
329: int n = 1;
1.4 misho 330: struct sockaddr sa;
331: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
332: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
333: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
1.2 misho 334:
335: if (!srv) {
336: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");
337: return -1;
338: }
339: if (srv->srv_blob.state) {
340: rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");
341: return 0;
342: }
343: if (!Port)
344: Port = RPC_DEFPORT + 1;
345:
346: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
347: if (access(diskDir, R_OK | W_OK) == -1) {
348: LOGERR;
349: return -1;
350: } else
351: strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);
352:
353: srv->srv_blob.server.cli_tid = pthread_self();
354: srv->srv_blob.server.cli_parent = srv;
1.4 misho 355:
356: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
357: switch (srv->srv_server.cli_sa.sa_family) {
358: case AF_INET:
359: sin->sin_port = htons(Port);
360: memcpy(&srv->srv_blob.server.cli_sa, sin, sizeof(struct sockaddr));
361: break;
362: case AF_INET6:
363: sin6->sin6_port = htons(Port);
364: memcpy(&srv->srv_blob.server.cli_sa, sin6, sizeof(struct sockaddr));
365: break;
366: case AF_LOCAL:
367: strlcat(sun->sun_path, ".blob", sizeof sun->sun_path);
368: memcpy(&srv->srv_blob.server.cli_sa, sun, sizeof(struct sockaddr));
369: break;
370: default:
371: return -1;
1.2 misho 372: }
373:
1.4 misho 374: /* create BLOB server socket */
1.2 misho 375: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa_family, SOCK_STREAM, 0);
376: if (srv->srv_blob.server.cli_sock == -1) {
377: LOGERR;
378: return -1;
379: }
380: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
381: LOGERR;
382: close(srv->srv_blob.server.cli_sock);
383: return -1;
384: }
1.5 ! misho 385: n = srv->srv_netbuf;
! 386: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
! 387: LOGERR;
! 388: close(srv->srv_blob.server.cli_sock);
! 389: return -1;
! 390: }
! 391: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
! 392: LOGERR;
! 393: close(srv->srv_blob.server.cli_sock);
! 394: return -1;
! 395: }
1.2 misho 396: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa,
397: sizeof srv->srv_blob.server.cli_sa) == -1) {
398: LOGERR;
399: close(srv->srv_blob.server.cli_sock);
400: return -1;
401: }
402:
1.4 misho 403: /* allocate pool for concurent clients */
1.2 misho 404: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
405: if (!srv->srv_blob.clients) {
406: LOGERR;
407: close(srv->srv_blob.server.cli_sock);
408: return -1;
409: } else
410: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
411:
412: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
413:
414: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
1.5 ! misho 415: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
! 416: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
! 417: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
1.2 misho 418:
1.4 misho 419: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 420: return 0;
421: }
422:
423: /*
424: * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources
425: * @srv = RPC Server instance
426: * return: none
427: */
428: void
429: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
430: {
431: rpc_cli_t *c;
432: register int i;
433: rpc_blob_t *f;
434:
435: if (!srv) {
436: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
437: return;
438: } else
1.4 misho 439: srv->srv_blob.state = kill;
1.2 misho 440:
441: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
442: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
443: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
444: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
445:
1.4 misho 446: /* close all clients connections & server socket */
1.2 misho 447: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
448: if (c->cli_sa.sa_family)
449: shutdown(c->cli_sock, SHUT_RDWR);
450: close(srv->srv_blob.server.cli_sock);
451:
1.5 ! misho 452: pthread_mutex_lock(&srv->srv_blob.mtx);
1.2 misho 453: if (srv->srv_blob.clients) {
454: free(srv->srv_blob.clients);
455: srv->srv_blob.clients = NULL;
456: }
457:
1.4 misho 458: /* detach blobs */
1.2 misho 459: while ((f = srv->srv_blob.blobs)) {
460: srv->srv_blob.blobs = f->blob_next;
461: rpc_srv_blobFree(srv, f);
462: free(f);
463: }
464: pthread_mutex_unlock(&srv->srv_blob.mtx);
465:
466: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
467: pthread_mutex_destroy(&srv->srv_blob.mtx);
468: }
469:
470: /*
1.5 ! misho 471: * rpc_srv_loopBLOB() Execute Main BLOB server loop and wait for clients requests
1.2 misho 472: * @srv = RPC Server instance
473: * return: -1 error or 0 ok, infinite loop ...
474: */
475: int
1.5 ! misho 476: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
1.2 misho 477: {
478: socklen_t salen = sizeof(struct sockaddr);
479: register int i;
480: rpc_cli_t *c;
481: fd_set fds;
482: int ret;
483: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
484:
1.4 misho 485: if (!srv || srv->srv_blob.state == kill) {
1.2 misho 486: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
487: return -1;
488: }
489:
490: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
491: LOGERR;
492: return -1;
493: }
494:
1.4 misho 495: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 496: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
497: if (!c->cli_sa.sa_family)
498: break;
499: if (i >= srv->srv_numcli) {
1.5 ! misho 500: #ifdef HAVE_PTHREAD_YIELD
! 501: pthread_yield();
! 502: #else
1.2 misho 503: usleep(1000000);
1.5 ! misho 504: #endif
1.2 misho 505: continue;
506: }
507:
508: FD_ZERO(&fds);
509: FD_SET(srv->srv_blob.server.cli_sock, &fds);
510: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
511: if (ret == -1) {
512: LOGERR;
513: ret = 1;
514: break;
515: }
516: if (!ret)
517: continue;
518:
519: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa, &salen);
520: if (c->cli_sock == -1) {
521: LOGERR;
522: continue;
523: } else
524: c->cli_parent = srv;
525:
526: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {
527: LOGERR;
528: continue;
1.4 misho 529: } else
530: pthread_detach(c->cli_tid);
1.2 misho 531: }
532:
1.5 ! misho 533: srv->srv_blob.state = kill;
1.2 misho 534:
535: return 0;
536: }
537:
538:
539: /*
1.1 misho 540: * rpc_srv_initServer() Init & create RPC Server
541: * @regProgID = ProgramID for authentication & recognition
542: * @regProcID = ProcessID for authentication & recognition
543: * @concurentClients = Concurent clients at same time to this server
1.5 ! misho 544: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 545: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
546: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 547: * @Port = Port for bind server, if Port == 0 default port is selected
548: * return: NULL == error or !=NULL bind and created RPC server instance
549: */
550: rpc_srv_t *
551: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.5 ! misho 552: int netBuf, u_short family, const char *csHost, u_short Port)
1.1 misho 553: {
554: rpc_srv_t *srv = NULL;
555: int n = 1;
556: struct hostent *host = NULL;
1.4 misho 557: struct sockaddr sa;
558: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
559: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
560: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
1.1 misho 561:
1.4 misho 562: if (!concurentClients || !regProgID ||
563: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 564: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
565: return NULL;
566: }
567: if (!Port)
568: Port = RPC_DEFPORT;
1.5 ! misho 569: if (!netBuf)
! 570: netBuf = BUFSIZ;
1.4 misho 571: if (csHost && family != AF_LOCAL) {
1.1 misho 572: host = gethostbyname2(csHost, family);
573: if (!host) {
574: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
575: return NULL;
576: }
577: }
1.4 misho 578: memset(&sa, 0, sizeof sa);
579: sa.sa_family = family;
1.1 misho 580: switch (family) {
581: case AF_INET:
1.4 misho 582: sin->sin_len = sizeof(struct sockaddr_in);
583: sin->sin_port = htons(Port);
1.1 misho 584: if (csHost)
1.4 misho 585: memcpy(&sin->sin_addr, host->h_addr, host->h_length);
1.1 misho 586: break;
587: case AF_INET6:
1.4 misho 588: sin6->sin6_len = sizeof(struct sockaddr_in6);
589: sin6->sin6_port = htons(Port);
590: if (csHost)
591: memcpy(&sin6->sin6_addr, host->h_addr, host->h_length);
592: break;
593: case AF_LOCAL:
594: sun->sun_len = sizeof(struct sockaddr_un);
1.1 misho 595: if (csHost)
1.4 misho 596: strlcpy(sun->sun_path, csHost, sizeof sun->sun_path);
1.1 misho 597: break;
598: default:
599: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
600: return NULL;
601: }
602:
603: srv = malloc(sizeof(rpc_srv_t));
604: if (!srv) {
605: LOGERR;
606: return NULL;
607: } else
608: memset(srv, 0, sizeof(rpc_srv_t));
609:
1.5 ! misho 610: srv->srv_netbuf = netBuf;
1.1 misho 611: srv->srv_numcli = concurentClients;
612: srv->srv_session.sess_version = RPC_VERSION;
613: srv->srv_session.sess_program = regProgID;
614: srv->srv_session.sess_process = regProcID;
615:
1.2 misho 616: srv->srv_server.cli_tid = pthread_self();
1.1 misho 617: srv->srv_server.cli_parent = srv;
1.4 misho 618: switch (family) {
619: case AF_INET:
620: memcpy(&srv->srv_server.cli_sa, sin, sizeof srv->srv_server.cli_sa);
621: break;
622: case AF_INET6:
623: memcpy(&srv->srv_server.cli_sa, sin6, sizeof srv->srv_server.cli_sa);
624: break;
625: case AF_LOCAL:
626: memcpy(&srv->srv_server.cli_sa, sun, sizeof srv->srv_server.cli_sa);
627: unlink(sun->sun_path);
628: break;
629: }
630:
631: /* create server socket */
1.1 misho 632: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
633: if (srv->srv_server.cli_sock == -1) {
634: LOGERR;
635: free(srv);
636: return NULL;
637: }
638: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
639: LOGERR;
640: close(srv->srv_server.cli_sock);
641: free(srv);
642: return NULL;
643: }
1.5 ! misho 644: n = srv->srv_netbuf;
! 645: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
! 646: LOGERR;
! 647: close(srv->srv_server.cli_sock);
! 648: free(srv);
! 649: return NULL;
! 650: }
! 651: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
! 652: LOGERR;
! 653: close(srv->srv_server.cli_sock);
! 654: free(srv);
! 655: return NULL;
! 656: }
1.1 misho 657: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {
658: LOGERR;
659: close(srv->srv_server.cli_sock);
660: free(srv);
661: return NULL;
662: }
663:
1.4 misho 664: /* allocate pool for concurent clients */
1.1 misho 665: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
666: if (!srv->srv_clients) {
667: LOGERR;
668: close(srv->srv_server.cli_sock);
669: free(srv);
670: return NULL;
671: } else
672: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
673:
1.2 misho 674: pthread_mutex_init(&srv->srv_mtx, NULL);
675:
676: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.5 ! misho 677: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
! 678: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
! 679: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
1.1 misho 680: return srv;
681: }
682:
683: /*
684: * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources
685: * @srv = RPC Server instance
686: * return: none
687: */
688: void
689: rpc_srv_endServer(rpc_srv_t * __restrict srv)
690: {
691: rpc_cli_t *c;
692: register int i;
693: rpc_func_t *f;
694:
695: if (!srv) {
696: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
697: return;
698: }
699:
1.2 misho 700: rpc_srv_endBLOBServer(srv);
1.1 misho 701:
1.4 misho 702: /* close all clients connections & server socket */
1.1 misho 703: for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)
1.2 misho 704: if (c->cli_sa.sa_family) {
1.1 misho 705: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 706: close(c->cli_sock);
707: }
1.1 misho 708: close(srv->srv_server.cli_sock);
709:
710: if (srv->srv_clients) {
711: free(srv->srv_clients);
1.2 misho 712: srv->srv_clients = NULL;
1.1 misho 713: srv->srv_numcli = 0;
714: }
715:
1.4 misho 716: /* detach exported calls */
1.2 misho 717: pthread_mutex_lock(&srv->srv_mtx);
718: while ((f = srv->srv_funcs)) {
719: srv->srv_funcs = f->func_next;
720: free(f);
721: }
722: pthread_mutex_unlock(&srv->srv_mtx);
723:
724: while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY);
725: pthread_mutex_destroy(&srv->srv_mtx);
726:
1.1 misho 727: free(srv);
728: srv = NULL;
729: }
730:
731: /*
1.5 ! misho 732: * rpc_srv_loopServer() Execute Main server loop and wait for clients requests
1.1 misho 733: * @srv = RPC Server instance
734: * return: -1 error or 0 ok, infinite loop ...
735: */
736: int
1.5 ! misho 737: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 738: {
739: socklen_t salen = sizeof(struct sockaddr);
740: register int i;
741: rpc_cli_t *c;
1.2 misho 742: fd_set fds;
743: int ret;
744: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.1 misho 745:
746: if (!srv) {
747: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
748: return -1;
749: }
750:
1.5 ! misho 751: /* activate BLOB server worker if srv->srv_blob.state == enable */
! 752: rpc_srv_execBLOBServer(srv);
! 753:
1.1 misho 754: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
755: LOGERR;
756: return -1;
757: }
758:
1.4 misho 759: while (srv->srv_kill != kill) {
1.1 misho 760: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
761: if (!c->cli_sa.sa_family)
762: break;
1.2 misho 763: if (i >= srv->srv_numcli) {
1.5 ! misho 764: #ifdef HAVE_PTHREAD_YIELD
! 765: pthread_yield();
! 766: #else
1.1 misho 767: usleep(1000000);
1.5 ! misho 768: #endif
1.1 misho 769: continue;
770: }
1.2 misho 771:
772: FD_ZERO(&fds);
773: FD_SET(srv->srv_server.cli_sock, &fds);
774: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
775: if (ret == -1) {
776: LOGERR;
777: ret = 1;
778: break;
779: }
780: if (!ret)
781: continue;
782:
1.1 misho 783: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen);
784: if (c->cli_sock == -1) {
785: LOGERR;
786: continue;
787: } else
788: c->cli_parent = srv;
789:
790: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {
791: LOGERR;
792: continue;
1.4 misho 793: } else
794: pthread_detach(c->cli_tid);
1.1 misho 795: }
796:
797: return 0;
798: }
799:
800: // ---------------------------------------------------------
801:
802: /*
803: * rpc_srv_execCall() Execute registered call from RPC server
804: * @call = Register RPC call
805: * @rpc = IN RPC call structure
1.5 ! misho 806: * @args = IN RPC calling arguments from RPC client
1.1 misho 807: * return: -1 error, !=-1 ok
808: */
809: int
1.2 misho 810: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.5 ! misho 811: array_t * __restrict args)
1.1 misho 812: {
813: void *dl;
814: rpc_callback_t func;
815: int ret;
816:
1.2 misho 817: if (!call || !rpc || !call->func_parent) {
1.1 misho 818: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");
819: return -1;
820: }
821:
822: dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);
823: if (!dl) {
824: rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror());
825: return -1;
826: }
827:
828: func = dlsym(dl, (char*) call->func_name);
829: if (func)
1.2 misho 830: ret = func(call, rpc->call_argc, args);
1.1 misho 831: else {
832: rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());
833: ret = -1;
834: }
835:
836: dlclose(dl);
837: return ret;
838: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>