Annotation of libaitrpc/src/srv.c, revision 1.5.2.3
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.5.2.3 ! misho 6: * $Id: srv.c,v 1.5.2.2 2011/09/07 09:10:55 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
49: static void *
50: rpc_srv_dispatchCall(void *arg)
51: {
52: rpc_cli_t *c = arg;
53: rpc_srv_t *s;
1.3 misho 54: rpc_func_t *f = NULL;
1.5 misho 55: array_t *arr;
1.1 misho 56: struct tagRPCCall *rpc;
1.3 misho 57: struct tagRPCRet *rrpc;
58: rpc_sess_t ses = { 0 };
1.1 misho 59: fd_set fds;
1.5 misho 60: u_char *buf;
1.2 misho 61: int ret, argc = 0, Limit = 0;
1.1 misho 62: register int i;
1.5.2.3 ! misho 63: uint16_t tag = 0;
! 64: uint32_t hash = 0;
1.1 misho 65:
66: if (!arg) {
67: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");
68: return NULL;
69: } else
70: s = c->cli_parent;
71:
1.5 misho 72: buf = malloc(s->srv_netbuf);
73: if (!buf) {
74: LOGERR;
75: return NULL;
76: }
77:
1.1 misho 78: do {
79: FD_ZERO(&fds);
80: FD_SET(c->cli_sock, &fds);
81: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
82: if (ret == -1) {
1.4 misho 83: if (errno == EINTR && s->srv_kill != kill)
84: continue;
85:
86: LOGERR;
1.1 misho 87: ret = -2;
1.4 misho 88: break;
1.1 misho 89: }
1.5 misho 90: memset(buf, 0, s->srv_netbuf);
91: ret = recv(c->cli_sock, buf, s->srv_netbuf, 0);
1.4 misho 92: if (ret == -1) {
1.1 misho 93: LOGERR;
94: ret = -3;
95: break;
96: }
1.4 misho 97: if (!ret) { /* receive EOF */
1.1 misho 98: ret = 0;
99: break;
100: }
101: if (ret < sizeof(struct tagRPCCall)) {
1.5 misho 102: rpc_SetErr(ERPCMISMATCH, "Error:: too short RPC packet ...\n");
1.1 misho 103: ret = -4;
104: break;
105: } else
106: rpc = (struct tagRPCCall*) buf;
1.4 misho 107: /* check RPC packet session info */
1.5.2.1 misho 108: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
1.5 misho 109: rpc_SetErr(ERPCMISMATCH, "Error:: get invalid RPC session ...\n");
1.1 misho 110: ret = -5;
1.2 misho 111: goto makeReply;
112: } else
113: Limit = sizeof(struct tagRPCCall);
1.5 misho 114:
1.5.2.3 ! misho 115: tag = rpc->call_tag;
! 116: hash = rpc->call_hash;
! 117:
1.4 misho 118: /* RPC is OK! Go decapsulate variables ... */
1.5.2.2 misho 119: if (ntohs(rpc->call_argc)) {
120: arr = io_buffer2vals(buf + Limit, s->srv_netbuf - Limit,
121: ntohs(rpc->call_argc), 1);
1.5 misho 122: if (!arr) {
1.2 misho 123: ret = -5;
124: goto makeReply;
1.1 misho 125: }
1.5 misho 126: } else
127: arr = NULL;
1.1 misho 128:
1.4 misho 129: /* execute call */
1.1 misho 130: argc = 0;
1.3 misho 131: memcpy(&ses, &rpc->call_session, sizeof ses);
1.5.2.3 ! misho 132: if (!(f = rpc_srv_getCall(s, ntohs(tag), ntohl(hash)))) {
1.5 misho 133: rpc_SetErr(EPROGUNAVAIL, "Error:: call not found into RPC server ...\n");
1.1 misho 134: ret = -6;
135: } else
1.5 misho 136: if ((ret = rpc_srv_execCall(f, rpc, arr)) == -1)
1.2 misho 137: ret = -9;
1.5 misho 138: else {
139: if (arr)
140: io_arrayDestroy(&arr);
141: argc = rpc_srv_getVars(f, &arr);
142: goto makeReply; /* Call finish OK */
143: }
144:
145: if (arr)
146: io_arrayDestroy(&arr);
147:
1.3 misho 148: makeReply:
1.5 misho 149: /* Made reply */
150: memset(buf, 0, s->srv_netbuf);
1.3 misho 151: rrpc = (struct tagRPCRet*) buf;
152: Limit = sizeof(struct tagRPCRet);
1.1 misho 153:
1.3 misho 154: memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));
1.5.2.3 ! misho 155: rrpc->ret_tag = tag;
! 156: rrpc->ret_hash = hash;
1.5.2.2 misho 157: rrpc->ret_errno = htonl(rpc_Errno);
158: rrpc->ret_retcode = htonl(ret);
159: rrpc->ret_argc = htons(argc);
1.1 misho 160:
1.5 misho 161: if (argc && arr) {
162: /* Go Encapsulate variables ... */
163: if ((i = io_vals2buffer(buf + Limit, s->srv_netbuf - Limit, arr)) == -1) {
164: rpc_srv_freeVals(f);
1.3 misho 165: argc = 0;
166: ret = -7;
1.5 misho 167: rpc_SetErr(EBADRPC, "Error:: in prepare RPC packet values (-7) ...\n");
1.3 misho 168: goto makeReply;
1.5 misho 169: } else {
170: Limit += i;
1.1 misho 171:
1.5 misho 172: rpc_srv_freeVals(f);
1.1 misho 173: }
174: }
175:
1.4 misho 176: ret = send(c->cli_sock, buf, Limit, 0);
177: if (ret == -1) {
1.1 misho 178: LOGERR;
179: ret = -8;
180: break;
181: }
182: if (ret != Limit) {
1.5 misho 183: rpc_SetErr(EPROCUNAVAIL, "Error:: in send RPC request, should be send %d bytes, "
1.1 misho 184: "really is %d\n", Limit, ret);
185: ret = -9;
186: break;
187: }
1.4 misho 188: } while (ret > -1 || s->srv_kill != kill);
1.1 misho 189:
190: shutdown(c->cli_sock, SHUT_RDWR);
191: close(c->cli_sock);
192: memset(c, 0, sizeof(rpc_cli_t));
1.5 misho 193: free(buf);
1.2 misho 194: return (void*) (long)ret;
195: }
196:
197:
198: static void *
199: rpc_srv_dispatchVars(void *arg)
200: {
201: rpc_cli_t *c = arg;
202: rpc_srv_t *s;
203: rpc_blob_t *b;
1.4 misho 204: int ret = 0;
1.2 misho 205: fd_set fds;
206: u_char buf[sizeof(struct tagBLOBHdr)];
207: struct tagBLOBHdr *blob;
208:
209: if (!arg) {
210: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");
211: return NULL;
212: } else
213: s = c->cli_parent;
214:
215: do {
1.4 misho 216: /* check for disable service at this moment? */
217: if (s->srv_blob.state == disable && s->srv_kill != kill) {
218: usleep(100000);
219: #ifdef HAVE_PTHREAD_YIELD
220: pthread_yield();
221: #endif
222: continue;
1.2 misho 223: }
224:
225: FD_ZERO(&fds);
226: FD_SET(c->cli_sock, &fds);
227: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
228: if (ret == -1) {
1.4 misho 229: if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill)
230: continue;
231:
232: LOGERR;
1.2 misho 233: ret = -2;
1.4 misho 234: break;
1.2 misho 235: }
236:
237: memset(buf, 0, sizeof buf);
1.4 misho 238: ret = recv(c->cli_sock, buf, sizeof buf, 0);
239: if (ret == -1) {
1.2 misho 240: LOGERR;
241: ret = -3;
242: break;
243: }
1.4 misho 244: /* receive EOF, disable or kill service */
245: if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {
1.2 misho 246: ret = 0;
247: break;
248: }
249: if (ret < sizeof(struct tagBLOBHdr)) {
1.5 misho 250: rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n");
1.2 misho 251: ret = -4;
252: break;
253: } else
254: blob = (struct tagBLOBHdr*) buf;
1.4 misho 255: /* check BLOB packet session info */
1.2 misho 256: if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) {
257: rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n");
258: ret = -5;
259: goto makeReply;
260: }
1.4 misho 261: /* Go to proceed packet ... */
1.2 misho 262: switch (blob->hdr_cmd) {
263: case get:
264: if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) {
265: rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",
266: blob->hdr_var);
267: ret = -6;
268: break;
269: } else
270: blob->hdr_len = b->blob_len;
271:
272: if (rpc_srv_blobMap(s, b) != -1) {
273: ret = rpc_srv_sendBLOB(c, b);
274: rpc_srv_blobUnmap(b);
275: } else
276: ret = -7;
277: break;
278: case set:
279: if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) {
1.4 misho 280: /* set new BLOB variable for reply :) */
1.2 misho 281: blob->hdr_var = b->blob_var;
282:
283: ret = rpc_srv_recvBLOB(c, b);
284: rpc_srv_blobUnmap(b);
285: } else
286: ret = -7;
287: break;
288: case unset:
289: ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);
290: if (ret == -1)
291: ret = -7;
292: break;
293: default:
1.5 misho 294: rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n",
1.2 misho 295: blob->hdr_cmd);
296: ret = -7;
297: }
298:
299: makeReply:
1.4 misho 300: /* Replay to client! */
1.2 misho 301: blob->hdr_cmd = ret < 0 ? error : ok;
302: blob->hdr_ret = ret;
1.4 misho 303: ret = send(c->cli_sock, buf, sizeof buf, 0);
304: if (ret == -1) {
1.2 misho 305: LOGERR;
306: ret = -8;
307: break;
308: }
309: if (ret != sizeof buf) {
1.5 misho 310: rpc_SetErr(EPROCUNAVAIL, "Error:: in send BLOB reply, should be send %d bytes, "
1.2 misho 311: "really is %d\n", sizeof buf, ret);
312: ret = -9;
313: break;
314: }
1.4 misho 315: } while (ret > -1 || s->srv_kill != kill);
1.2 misho 316:
317: shutdown(c->cli_sock, SHUT_RDWR);
318: close(c->cli_sock);
319: memset(c, 0, sizeof(rpc_cli_t));
1.5 misho 320: return (void*) ((long)ret);
1.1 misho 321: }
322:
323: // -------------------------------------------------
324:
325: /*
1.2 misho 326: * rpc_srv_initBLOBServer() Init & create BLOB Server
1.4 misho 327: * @srv = RPC server instance
1.2 misho 328: * @Port = Port for bind server, if Port == 0 default port is selected
329: * @diskDir = Disk place for BLOB file objects
330: * return: -1 == error or 0 bind and created BLOB server instance
331: */
332: int
333: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
334: {
335: int n = 1;
1.4 misho 336: struct sockaddr sa;
337: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
338: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
339: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
1.2 misho 340:
341: if (!srv) {
342: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");
343: return -1;
344: }
345: if (srv->srv_blob.state) {
346: rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");
347: return 0;
348: }
349: if (!Port)
350: Port = RPC_DEFPORT + 1;
351:
352: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
353: if (access(diskDir, R_OK | W_OK) == -1) {
354: LOGERR;
355: return -1;
356: } else
357: strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);
358:
359: srv->srv_blob.server.cli_tid = pthread_self();
360: srv->srv_blob.server.cli_parent = srv;
1.4 misho 361:
362: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
363: switch (srv->srv_server.cli_sa.sa_family) {
364: case AF_INET:
365: sin->sin_port = htons(Port);
366: memcpy(&srv->srv_blob.server.cli_sa, sin, sizeof(struct sockaddr));
367: break;
368: case AF_INET6:
369: sin6->sin6_port = htons(Port);
370: memcpy(&srv->srv_blob.server.cli_sa, sin6, sizeof(struct sockaddr));
371: break;
372: case AF_LOCAL:
373: strlcat(sun->sun_path, ".blob", sizeof sun->sun_path);
374: memcpy(&srv->srv_blob.server.cli_sa, sun, sizeof(struct sockaddr));
375: break;
376: default:
377: return -1;
1.2 misho 378: }
379:
1.4 misho 380: /* create BLOB server socket */
1.2 misho 381: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa_family, SOCK_STREAM, 0);
382: if (srv->srv_blob.server.cli_sock == -1) {
383: LOGERR;
384: return -1;
385: }
386: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
387: LOGERR;
388: close(srv->srv_blob.server.cli_sock);
389: return -1;
390: }
1.5 misho 391: n = srv->srv_netbuf;
392: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
393: LOGERR;
394: close(srv->srv_blob.server.cli_sock);
395: return -1;
396: }
397: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
398: LOGERR;
399: close(srv->srv_blob.server.cli_sock);
400: return -1;
401: }
1.2 misho 402: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa,
403: sizeof srv->srv_blob.server.cli_sa) == -1) {
404: LOGERR;
405: close(srv->srv_blob.server.cli_sock);
406: return -1;
407: }
408:
1.4 misho 409: /* allocate pool for concurent clients */
1.2 misho 410: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
411: if (!srv->srv_blob.clients) {
412: LOGERR;
413: close(srv->srv_blob.server.cli_sock);
414: return -1;
415: } else
416: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
417:
418: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
419:
420: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
1.5 misho 421: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
422: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
423: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
1.2 misho 424:
1.4 misho 425: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 426: return 0;
427: }
428:
429: /*
430: * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources
431: * @srv = RPC Server instance
432: * return: none
433: */
434: void
435: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
436: {
437: rpc_cli_t *c;
438: register int i;
439: rpc_blob_t *f;
440:
441: if (!srv) {
442: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
443: return;
444: } else
1.4 misho 445: srv->srv_blob.state = kill;
1.2 misho 446:
447: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
448: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
449: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
450: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
451:
1.4 misho 452: /* close all clients connections & server socket */
1.2 misho 453: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
454: if (c->cli_sa.sa_family)
455: shutdown(c->cli_sock, SHUT_RDWR);
456: close(srv->srv_blob.server.cli_sock);
457:
1.5 misho 458: pthread_mutex_lock(&srv->srv_blob.mtx);
1.2 misho 459: if (srv->srv_blob.clients) {
460: free(srv->srv_blob.clients);
461: srv->srv_blob.clients = NULL;
462: }
463:
1.4 misho 464: /* detach blobs */
1.2 misho 465: while ((f = srv->srv_blob.blobs)) {
466: srv->srv_blob.blobs = f->blob_next;
467: rpc_srv_blobFree(srv, f);
468: free(f);
469: }
470: pthread_mutex_unlock(&srv->srv_blob.mtx);
471:
472: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
473: pthread_mutex_destroy(&srv->srv_blob.mtx);
474: }
475:
476: /*
1.5 misho 477: * rpc_srv_loopBLOB() Execute Main BLOB server loop and wait for clients requests
1.2 misho 478: * @srv = RPC Server instance
479: * return: -1 error or 0 ok, infinite loop ...
480: */
481: int
1.5 misho 482: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
1.2 misho 483: {
484: socklen_t salen = sizeof(struct sockaddr);
485: register int i;
486: rpc_cli_t *c;
487: fd_set fds;
488: int ret;
489: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
490:
1.4 misho 491: if (!srv || srv->srv_blob.state == kill) {
1.2 misho 492: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
493: return -1;
494: }
495:
496: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
497: LOGERR;
498: return -1;
499: }
500:
1.4 misho 501: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 502: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
503: if (!c->cli_sa.sa_family)
504: break;
505: if (i >= srv->srv_numcli) {
1.5 misho 506: #ifdef HAVE_PTHREAD_YIELD
507: pthread_yield();
508: #else
1.2 misho 509: usleep(1000000);
1.5 misho 510: #endif
1.2 misho 511: continue;
512: }
513:
514: FD_ZERO(&fds);
515: FD_SET(srv->srv_blob.server.cli_sock, &fds);
516: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
517: if (ret == -1) {
518: LOGERR;
519: ret = 1;
520: break;
521: }
522: if (!ret)
523: continue;
524:
525: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa, &salen);
526: if (c->cli_sock == -1) {
527: LOGERR;
528: continue;
529: } else
530: c->cli_parent = srv;
531:
532: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {
533: LOGERR;
534: continue;
1.4 misho 535: } else
536: pthread_detach(c->cli_tid);
1.2 misho 537: }
538:
1.5 misho 539: srv->srv_blob.state = kill;
1.2 misho 540:
541: return 0;
542: }
543:
544:
545: /*
1.1 misho 546: * rpc_srv_initServer() Init & create RPC Server
547: * @regProgID = ProgramID for authentication & recognition
548: * @regProcID = ProcessID for authentication & recognition
549: * @concurentClients = Concurent clients at same time to this server
1.5 misho 550: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 551: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
552: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 553: * @Port = Port for bind server, if Port == 0 default port is selected
554: * return: NULL == error or !=NULL bind and created RPC server instance
555: */
556: rpc_srv_t *
557: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.5 misho 558: int netBuf, u_short family, const char *csHost, u_short Port)
1.1 misho 559: {
560: rpc_srv_t *srv = NULL;
561: int n = 1;
562: struct hostent *host = NULL;
1.4 misho 563: struct sockaddr sa;
564: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
565: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
566: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
1.1 misho 567:
1.4 misho 568: if (!concurentClients || !regProgID ||
569: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 570: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
571: return NULL;
572: }
573: if (!Port)
574: Port = RPC_DEFPORT;
1.5 misho 575: if (!netBuf)
576: netBuf = BUFSIZ;
1.4 misho 577: if (csHost && family != AF_LOCAL) {
1.1 misho 578: host = gethostbyname2(csHost, family);
579: if (!host) {
580: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
581: return NULL;
582: }
583: }
1.4 misho 584: memset(&sa, 0, sizeof sa);
585: sa.sa_family = family;
1.1 misho 586: switch (family) {
587: case AF_INET:
1.4 misho 588: sin->sin_len = sizeof(struct sockaddr_in);
589: sin->sin_port = htons(Port);
1.1 misho 590: if (csHost)
1.4 misho 591: memcpy(&sin->sin_addr, host->h_addr, host->h_length);
1.1 misho 592: break;
593: case AF_INET6:
1.4 misho 594: sin6->sin6_len = sizeof(struct sockaddr_in6);
595: sin6->sin6_port = htons(Port);
596: if (csHost)
597: memcpy(&sin6->sin6_addr, host->h_addr, host->h_length);
598: break;
599: case AF_LOCAL:
600: sun->sun_len = sizeof(struct sockaddr_un);
1.1 misho 601: if (csHost)
1.4 misho 602: strlcpy(sun->sun_path, csHost, sizeof sun->sun_path);
1.1 misho 603: break;
604: default:
605: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
606: return NULL;
607: }
608:
609: srv = malloc(sizeof(rpc_srv_t));
610: if (!srv) {
611: LOGERR;
612: return NULL;
613: } else
614: memset(srv, 0, sizeof(rpc_srv_t));
615:
1.5 misho 616: srv->srv_netbuf = netBuf;
1.1 misho 617: srv->srv_numcli = concurentClients;
618: srv->srv_session.sess_version = RPC_VERSION;
619: srv->srv_session.sess_program = regProgID;
620: srv->srv_session.sess_process = regProcID;
621:
1.2 misho 622: srv->srv_server.cli_tid = pthread_self();
1.1 misho 623: srv->srv_server.cli_parent = srv;
1.4 misho 624: switch (family) {
625: case AF_INET:
626: memcpy(&srv->srv_server.cli_sa, sin, sizeof srv->srv_server.cli_sa);
627: break;
628: case AF_INET6:
629: memcpy(&srv->srv_server.cli_sa, sin6, sizeof srv->srv_server.cli_sa);
630: break;
631: case AF_LOCAL:
632: memcpy(&srv->srv_server.cli_sa, sun, sizeof srv->srv_server.cli_sa);
633: unlink(sun->sun_path);
634: break;
635: }
636:
637: /* create server socket */
1.1 misho 638: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
639: if (srv->srv_server.cli_sock == -1) {
640: LOGERR;
641: free(srv);
642: return NULL;
643: }
644: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
645: LOGERR;
646: close(srv->srv_server.cli_sock);
647: free(srv);
648: return NULL;
649: }
1.5 misho 650: n = srv->srv_netbuf;
651: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
652: LOGERR;
653: close(srv->srv_server.cli_sock);
654: free(srv);
655: return NULL;
656: }
657: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
658: LOGERR;
659: close(srv->srv_server.cli_sock);
660: free(srv);
661: return NULL;
662: }
1.1 misho 663: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {
664: LOGERR;
665: close(srv->srv_server.cli_sock);
666: free(srv);
667: return NULL;
668: }
669:
1.4 misho 670: /* allocate pool for concurent clients */
1.1 misho 671: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
672: if (!srv->srv_clients) {
673: LOGERR;
674: close(srv->srv_server.cli_sock);
675: free(srv);
676: return NULL;
677: } else
678: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
679:
1.2 misho 680: pthread_mutex_init(&srv->srv_mtx, NULL);
681:
682: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.5 misho 683: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
684: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
685: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
1.1 misho 686: return srv;
687: }
688:
689: /*
690: * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources
691: * @srv = RPC Server instance
692: * return: none
693: */
694: void
695: rpc_srv_endServer(rpc_srv_t * __restrict srv)
696: {
697: rpc_cli_t *c;
698: register int i;
699: rpc_func_t *f;
700:
701: if (!srv) {
702: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
703: return;
704: }
705:
1.2 misho 706: rpc_srv_endBLOBServer(srv);
1.1 misho 707:
1.4 misho 708: /* close all clients connections & server socket */
1.1 misho 709: for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)
1.2 misho 710: if (c->cli_sa.sa_family) {
1.1 misho 711: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 712: close(c->cli_sock);
713: }
1.1 misho 714: close(srv->srv_server.cli_sock);
715:
716: if (srv->srv_clients) {
717: free(srv->srv_clients);
1.2 misho 718: srv->srv_clients = NULL;
1.1 misho 719: srv->srv_numcli = 0;
720: }
721:
1.4 misho 722: /* detach exported calls */
1.2 misho 723: pthread_mutex_lock(&srv->srv_mtx);
724: while ((f = srv->srv_funcs)) {
725: srv->srv_funcs = f->func_next;
726: free(f);
727: }
728: pthread_mutex_unlock(&srv->srv_mtx);
729:
730: while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY);
731: pthread_mutex_destroy(&srv->srv_mtx);
732:
1.1 misho 733: free(srv);
734: srv = NULL;
735: }
736:
737: /*
1.5 misho 738: * rpc_srv_loopServer() Execute Main server loop and wait for clients requests
1.1 misho 739: * @srv = RPC Server instance
740: * return: -1 error or 0 ok, infinite loop ...
741: */
742: int
1.5 misho 743: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 744: {
745: socklen_t salen = sizeof(struct sockaddr);
746: register int i;
747: rpc_cli_t *c;
1.2 misho 748: fd_set fds;
749: int ret;
750: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.1 misho 751:
752: if (!srv) {
753: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
754: return -1;
755: }
756:
1.5 misho 757: /* activate BLOB server worker if srv->srv_blob.state == enable */
758: rpc_srv_execBLOBServer(srv);
759:
1.1 misho 760: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
761: LOGERR;
762: return -1;
763: }
764:
1.4 misho 765: while (srv->srv_kill != kill) {
1.1 misho 766: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
767: if (!c->cli_sa.sa_family)
768: break;
1.2 misho 769: if (i >= srv->srv_numcli) {
1.5 misho 770: #ifdef HAVE_PTHREAD_YIELD
771: pthread_yield();
772: #else
1.1 misho 773: usleep(1000000);
1.5 misho 774: #endif
1.1 misho 775: continue;
776: }
1.2 misho 777:
778: FD_ZERO(&fds);
779: FD_SET(srv->srv_server.cli_sock, &fds);
780: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
781: if (ret == -1) {
782: LOGERR;
783: ret = 1;
784: break;
785: }
786: if (!ret)
787: continue;
788:
1.1 misho 789: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen);
790: if (c->cli_sock == -1) {
791: LOGERR;
792: continue;
793: } else
794: c->cli_parent = srv;
795:
796: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {
797: LOGERR;
798: continue;
1.4 misho 799: } else
800: pthread_detach(c->cli_tid);
1.1 misho 801: }
802:
803: return 0;
804: }
805:
806: // ---------------------------------------------------------
807:
808: /*
809: * rpc_srv_execCall() Execute registered call from RPC server
810: * @call = Register RPC call
811: * @rpc = IN RPC call structure
1.5 misho 812: * @args = IN RPC calling arguments from RPC client
1.1 misho 813: * return: -1 error, !=-1 ok
814: */
815: int
1.2 misho 816: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.5 misho 817: array_t * __restrict args)
1.1 misho 818: {
819: void *dl;
820: rpc_callback_t func;
821: int ret;
822:
1.2 misho 823: if (!call || !rpc || !call->func_parent) {
1.1 misho 824: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");
825: return -1;
826: }
827:
828: dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);
829: if (!dl) {
830: rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror());
831: return -1;
832: }
833:
834: func = dlsym(dl, (char*) call->func_name);
835: if (func)
1.5.2.2 misho 836: ret = func(call, ntohs(rpc->call_argc), args);
1.1 misho 837: else {
838: rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());
839: ret = -1;
840: }
841:
842: dlclose(dl);
843: return ret;
844: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>