Annotation of libaitrpc/src/srv.c, revision 1.5.2.7
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.5.2.7 ! misho 6: * $Id: srv.c,v 1.5.2.6 2011/10/31 14:45:26 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
49: static void *
50: rpc_srv_dispatchCall(void *arg)
51: {
52: rpc_cli_t *c = arg;
53: rpc_srv_t *s;
1.3 misho 54: rpc_func_t *f = NULL;
1.5 misho 55: array_t *arr;
1.1 misho 56: struct tagRPCCall *rpc;
1.3 misho 57: struct tagRPCRet *rrpc;
58: rpc_sess_t ses = { 0 };
1.1 misho 59: fd_set fds;
1.5 misho 60: u_char *buf;
1.2 misho 61: int ret, argc = 0, Limit = 0;
1.1 misho 62: register int i;
1.5.2.3 misho 63: uint16_t tag = 0;
64: uint32_t hash = 0;
1.1 misho 65:
66: if (!arg) {
67: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");
68: return NULL;
69: } else
70: s = c->cli_parent;
71:
1.5 misho 72: buf = malloc(s->srv_netbuf);
73: if (!buf) {
74: LOGERR;
75: return NULL;
76: }
77:
1.1 misho 78: do {
79: FD_ZERO(&fds);
80: FD_SET(c->cli_sock, &fds);
81: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
82: if (ret == -1) {
1.4 misho 83: if (errno == EINTR && s->srv_kill != kill)
84: continue;
85:
86: LOGERR;
1.1 misho 87: ret = -2;
1.4 misho 88: break;
1.1 misho 89: }
1.5 misho 90: memset(buf, 0, s->srv_netbuf);
91: ret = recv(c->cli_sock, buf, s->srv_netbuf, 0);
1.4 misho 92: if (ret == -1) {
1.1 misho 93: LOGERR;
94: ret = -3;
95: break;
96: }
1.4 misho 97: if (!ret) { /* receive EOF */
1.1 misho 98: ret = 0;
99: break;
100: }
101: if (ret < sizeof(struct tagRPCCall)) {
1.5 misho 102: rpc_SetErr(ERPCMISMATCH, "Error:: too short RPC packet ...\n");
1.1 misho 103: ret = -4;
104: break;
105: } else
106: rpc = (struct tagRPCCall*) buf;
1.4 misho 107: /* check RPC packet session info */
1.5.2.1 misho 108: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
1.5 misho 109: rpc_SetErr(ERPCMISMATCH, "Error:: get invalid RPC session ...\n");
1.1 misho 110: ret = -5;
1.2 misho 111: goto makeReply;
112: } else
113: Limit = sizeof(struct tagRPCCall);
1.5 misho 114:
1.5.2.3 misho 115: tag = rpc->call_tag;
116: hash = rpc->call_hash;
117:
1.4 misho 118: /* RPC is OK! Go decapsulate variables ... */
1.5.2.2 misho 119: if (ntohs(rpc->call_argc)) {
120: arr = io_buffer2vals(buf + Limit, s->srv_netbuf - Limit,
121: ntohs(rpc->call_argc), 1);
1.5 misho 122: if (!arr) {
1.2 misho 123: ret = -5;
124: goto makeReply;
1.1 misho 125: }
1.5 misho 126: } else
127: arr = NULL;
1.1 misho 128:
1.4 misho 129: /* execute call */
1.1 misho 130: argc = 0;
1.3 misho 131: memcpy(&ses, &rpc->call_session, sizeof ses);
1.5.2.3 misho 132: if (!(f = rpc_srv_getCall(s, ntohs(tag), ntohl(hash)))) {
1.5 misho 133: rpc_SetErr(EPROGUNAVAIL, "Error:: call not found into RPC server ...\n");
1.1 misho 134: ret = -6;
135: } else
1.5 misho 136: if ((ret = rpc_srv_execCall(f, rpc, arr)) == -1)
1.2 misho 137: ret = -9;
1.5 misho 138: else {
139: if (arr)
140: io_arrayDestroy(&arr);
141: argc = rpc_srv_getVars(f, &arr);
142: goto makeReply; /* Call finish OK */
143: }
144:
145: if (arr)
146: io_arrayDestroy(&arr);
147:
1.3 misho 148: makeReply:
1.5 misho 149: /* Made reply */
150: memset(buf, 0, s->srv_netbuf);
1.3 misho 151: rrpc = (struct tagRPCRet*) buf;
152: Limit = sizeof(struct tagRPCRet);
1.1 misho 153:
1.3 misho 154: memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));
1.5.2.3 misho 155: rrpc->ret_tag = tag;
156: rrpc->ret_hash = hash;
1.5.2.2 misho 157: rrpc->ret_errno = htonl(rpc_Errno);
158: rrpc->ret_retcode = htonl(ret);
159: rrpc->ret_argc = htons(argc);
1.1 misho 160:
1.5 misho 161: if (argc && arr) {
162: /* Go Encapsulate variables ... */
163: if ((i = io_vals2buffer(buf + Limit, s->srv_netbuf - Limit, arr)) == -1) {
164: rpc_srv_freeVals(f);
1.3 misho 165: argc = 0;
166: ret = -7;
1.5 misho 167: rpc_SetErr(EBADRPC, "Error:: in prepare RPC packet values (-7) ...\n");
1.3 misho 168: goto makeReply;
1.5 misho 169: } else {
170: Limit += i;
1.1 misho 171:
1.5 misho 172: rpc_srv_freeVals(f);
1.1 misho 173: }
174: }
175:
1.4 misho 176: ret = send(c->cli_sock, buf, Limit, 0);
177: if (ret == -1) {
1.1 misho 178: LOGERR;
179: ret = -8;
180: break;
181: }
182: if (ret != Limit) {
1.5 misho 183: rpc_SetErr(EPROCUNAVAIL, "Error:: in send RPC request, should be send %d bytes, "
1.1 misho 184: "really is %d\n", Limit, ret);
185: ret = -9;
186: break;
187: }
1.4 misho 188: } while (ret > -1 || s->srv_kill != kill);
1.1 misho 189:
190: shutdown(c->cli_sock, SHUT_RDWR);
191: close(c->cli_sock);
192: memset(c, 0, sizeof(rpc_cli_t));
1.5 misho 193: free(buf);
1.2 misho 194: return (void*) (long)ret;
195: }
196:
197:
198: static void *
199: rpc_srv_dispatchVars(void *arg)
200: {
201: rpc_cli_t *c = arg;
202: rpc_srv_t *s;
203: rpc_blob_t *b;
1.4 misho 204: int ret = 0;
1.2 misho 205: fd_set fds;
206: u_char buf[sizeof(struct tagBLOBHdr)];
207: struct tagBLOBHdr *blob;
208:
209: if (!arg) {
210: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");
211: return NULL;
212: } else
213: s = c->cli_parent;
214:
215: do {
1.4 misho 216: /* check for disable service at this moment? */
217: if (s->srv_blob.state == disable && s->srv_kill != kill) {
218: usleep(100000);
219: #ifdef HAVE_PTHREAD_YIELD
220: pthread_yield();
221: #endif
222: continue;
1.2 misho 223: }
224:
225: FD_ZERO(&fds);
226: FD_SET(c->cli_sock, &fds);
227: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
228: if (ret == -1) {
1.4 misho 229: if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill)
230: continue;
231:
232: LOGERR;
1.2 misho 233: ret = -2;
1.4 misho 234: break;
1.2 misho 235: }
236:
237: memset(buf, 0, sizeof buf);
1.4 misho 238: ret = recv(c->cli_sock, buf, sizeof buf, 0);
239: if (ret == -1) {
1.2 misho 240: LOGERR;
241: ret = -3;
242: break;
243: }
1.4 misho 244: /* receive EOF, disable or kill service */
245: if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {
1.2 misho 246: ret = 0;
247: break;
248: }
249: if (ret < sizeof(struct tagBLOBHdr)) {
1.5 misho 250: rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n");
1.2 misho 251: ret = -4;
252: break;
253: } else
254: blob = (struct tagBLOBHdr*) buf;
1.4 misho 255: /* check BLOB packet session info */
1.2 misho 256: if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) {
257: rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n");
258: ret = -5;
259: goto makeReply;
260: }
1.4 misho 261: /* Go to proceed packet ... */
1.2 misho 262: switch (blob->hdr_cmd) {
263: case get:
264: if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) {
265: rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",
266: blob->hdr_var);
267: ret = -6;
268: break;
269: } else
270: blob->hdr_len = b->blob_len;
271:
272: if (rpc_srv_blobMap(s, b) != -1) {
273: ret = rpc_srv_sendBLOB(c, b);
274: rpc_srv_blobUnmap(b);
275: } else
276: ret = -7;
277: break;
278: case set:
279: if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) {
1.4 misho 280: /* set new BLOB variable for reply :) */
1.2 misho 281: blob->hdr_var = b->blob_var;
282:
283: ret = rpc_srv_recvBLOB(c, b);
284: rpc_srv_blobUnmap(b);
285: } else
286: ret = -7;
287: break;
288: case unset:
289: ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);
290: if (ret == -1)
291: ret = -7;
292: break;
293: default:
1.5 misho 294: rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n",
1.2 misho 295: blob->hdr_cmd);
296: ret = -7;
297: }
298:
299: makeReply:
1.4 misho 300: /* Replay to client! */
1.2 misho 301: blob->hdr_cmd = ret < 0 ? error : ok;
302: blob->hdr_ret = ret;
1.4 misho 303: ret = send(c->cli_sock, buf, sizeof buf, 0);
304: if (ret == -1) {
1.2 misho 305: LOGERR;
306: ret = -8;
307: break;
308: }
309: if (ret != sizeof buf) {
1.5 misho 310: rpc_SetErr(EPROCUNAVAIL, "Error:: in send BLOB reply, should be send %d bytes, "
1.2 misho 311: "really is %d\n", sizeof buf, ret);
312: ret = -9;
313: break;
314: }
1.4 misho 315: } while (ret > -1 || s->srv_kill != kill);
1.2 misho 316:
317: shutdown(c->cli_sock, SHUT_RDWR);
318: close(c->cli_sock);
319: memset(c, 0, sizeof(rpc_cli_t));
1.5 misho 320: return (void*) ((long)ret);
1.1 misho 321: }
322:
323: // -------------------------------------------------
324:
325: /*
1.2 misho 326: * rpc_srv_initBLOBServer() Init & create BLOB Server
1.4 misho 327: * @srv = RPC server instance
1.2 misho 328: * @Port = Port for bind server, if Port == 0 default port is selected
329: * @diskDir = Disk place for BLOB file objects
330: * return: -1 == error or 0 bind and created BLOB server instance
331: */
332: int
333: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
334: {
335: int n = 1;
1.5.2.6 misho 336: io_sockaddr_t sa;
1.2 misho 337:
338: if (!srv) {
339: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");
340: return -1;
341: }
342: if (srv->srv_blob.state) {
343: rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");
344: return 0;
345: }
346: if (!Port)
347: Port = RPC_DEFPORT + 1;
348:
349: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
350: if (access(diskDir, R_OK | W_OK) == -1) {
351: LOGERR;
352: return -1;
353: } else
354: strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);
355:
356: srv->srv_blob.server.cli_tid = pthread_self();
357: srv->srv_blob.server.cli_parent = srv;
1.4 misho 358:
359: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
1.5.2.6 misho 360: switch (srv->srv_server.cli_sa.sa.sa_family) {
1.4 misho 361: case AF_INET:
1.5.2.6 misho 362: sa.sin.sin_port = htons(Port);
363: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
1.4 misho 364: break;
365: case AF_INET6:
1.5.2.6 misho 366: sa.sin6.sin6_port = htons(Port);
367: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
1.4 misho 368: break;
369: case AF_LOCAL:
1.5.2.6 misho 370: strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
371: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
1.4 misho 372: break;
373: default:
374: return -1;
1.2 misho 375: }
376:
1.4 misho 377: /* create BLOB server socket */
1.5.2.6 misho 378: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 379: if (srv->srv_blob.server.cli_sock == -1) {
380: LOGERR;
381: return -1;
382: }
383: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
384: LOGERR;
385: close(srv->srv_blob.server.cli_sock);
386: return -1;
387: }
1.5 misho 388: n = srv->srv_netbuf;
389: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
390: LOGERR;
391: close(srv->srv_blob.server.cli_sock);
392: return -1;
393: }
394: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
395: LOGERR;
396: close(srv->srv_blob.server.cli_sock);
397: return -1;
398: }
1.5.2.6 misho 399: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
1.5.2.7 ! misho 400: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 401: LOGERR;
402: close(srv->srv_blob.server.cli_sock);
403: return -1;
404: }
405:
1.4 misho 406: /* allocate pool for concurent clients */
1.2 misho 407: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
408: if (!srv->srv_blob.clients) {
409: LOGERR;
410: close(srv->srv_blob.server.cli_sock);
411: return -1;
412: } else
413: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
414:
415: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
416:
417: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
1.5 misho 418: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
419: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
420: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
1.2 misho 421:
1.4 misho 422: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 423: return 0;
424: }
425:
426: /*
427: * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources
428: * @srv = RPC Server instance
429: * return: none
430: */
431: void
432: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
433: {
434: rpc_cli_t *c;
435: register int i;
436: rpc_blob_t *f;
437:
438: if (!srv) {
439: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
440: return;
441: } else
1.4 misho 442: srv->srv_blob.state = kill;
1.2 misho 443:
444: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
445: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
446: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
447: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
448:
1.4 misho 449: /* close all clients connections & server socket */
1.2 misho 450: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
1.5.2.6 misho 451: if (c->cli_sa.sa.sa_family)
1.2 misho 452: shutdown(c->cli_sock, SHUT_RDWR);
453: close(srv->srv_blob.server.cli_sock);
454:
1.5 misho 455: pthread_mutex_lock(&srv->srv_blob.mtx);
1.2 misho 456: if (srv->srv_blob.clients) {
457: free(srv->srv_blob.clients);
458: srv->srv_blob.clients = NULL;
459: }
460:
1.4 misho 461: /* detach blobs */
1.2 misho 462: while ((f = srv->srv_blob.blobs)) {
463: srv->srv_blob.blobs = f->blob_next;
464: rpc_srv_blobFree(srv, f);
465: free(f);
466: }
467: pthread_mutex_unlock(&srv->srv_blob.mtx);
468:
469: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
470: pthread_mutex_destroy(&srv->srv_blob.mtx);
471: }
472:
473: /*
1.5 misho 474: * rpc_srv_loopBLOB() Execute Main BLOB server loop and wait for clients requests
1.2 misho 475: * @srv = RPC Server instance
476: * return: -1 error or 0 ok, infinite loop ...
477: */
478: int
1.5 misho 479: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
1.2 misho 480: {
1.5.2.6 misho 481: socklen_t salen = sizeof(io_sockaddr_t);
1.2 misho 482: register int i;
483: rpc_cli_t *c;
484: fd_set fds;
485: int ret;
486: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
487:
1.4 misho 488: if (!srv || srv->srv_blob.state == kill) {
1.2 misho 489: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
490: return -1;
491: }
492:
493: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
494: LOGERR;
495: return -1;
496: }
497:
1.4 misho 498: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 499: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.5.2.6 misho 500: if (!c->cli_sa.sa.sa_family)
1.2 misho 501: break;
502: if (i >= srv->srv_numcli) {
1.5 misho 503: #ifdef HAVE_PTHREAD_YIELD
504: pthread_yield();
505: #else
1.2 misho 506: usleep(1000000);
1.5 misho 507: #endif
1.2 misho 508: continue;
509: }
510:
511: FD_ZERO(&fds);
512: FD_SET(srv->srv_blob.server.cli_sock, &fds);
513: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
514: if (ret == -1) {
515: LOGERR;
516: ret = 1;
517: break;
518: }
519: if (!ret)
520: continue;
521:
1.5.2.6 misho 522: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
1.2 misho 523: if (c->cli_sock == -1) {
524: LOGERR;
525: continue;
526: } else
527: c->cli_parent = srv;
528:
529: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {
530: LOGERR;
531: continue;
1.4 misho 532: } else
533: pthread_detach(c->cli_tid);
1.2 misho 534: }
535:
1.5 misho 536: srv->srv_blob.state = kill;
1.2 misho 537:
538: return 0;
539: }
540:
541:
542: /*
1.1 misho 543: * rpc_srv_initServer() Init & create RPC Server
544: * @regProgID = ProgramID for authentication & recognition
545: * @regProcID = ProcessID for authentication & recognition
546: * @concurentClients = Concurent clients at same time to this server
1.5 misho 547: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 548: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
549: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 550: * @Port = Port for bind server, if Port == 0 default port is selected
551: * return: NULL == error or !=NULL bind and created RPC server instance
552: */
553: rpc_srv_t *
554: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.5 misho 555: int netBuf, u_short family, const char *csHost, u_short Port)
1.1 misho 556: {
557: rpc_srv_t *srv = NULL;
558: int n = 1;
559: struct hostent *host = NULL;
1.5.2.6 misho 560: io_sockaddr_t sa;
1.1 misho 561:
1.4 misho 562: if (!concurentClients || !regProgID ||
563: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 564: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
565: return NULL;
566: }
567: if (!Port)
568: Port = RPC_DEFPORT;
1.5 misho 569: if (!netBuf)
570: netBuf = BUFSIZ;
1.4 misho 571: if (csHost && family != AF_LOCAL) {
1.1 misho 572: host = gethostbyname2(csHost, family);
573: if (!host) {
574: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
575: return NULL;
576: }
577: }
1.4 misho 578: memset(&sa, 0, sizeof sa);
1.5.2.6 misho 579: sa.sa.sa_family = family;
1.1 misho 580: switch (family) {
581: case AF_INET:
1.5.2.6 misho 582: sa.sin.sin_len = sizeof(struct sockaddr_in);
583: sa.sin.sin_port = htons(Port);
1.1 misho 584: if (csHost)
1.5.2.6 misho 585: memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length);
1.1 misho 586: break;
587: case AF_INET6:
1.5.2.6 misho 588: sa.sin6.sin6_len = sizeof(struct sockaddr_in6);
589: sa.sin6.sin6_port = htons(Port);
1.4 misho 590: if (csHost)
1.5.2.6 misho 591: memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length);
1.4 misho 592: break;
593: case AF_LOCAL:
1.5.2.6 misho 594: sa.sun.sun_len = sizeof(struct sockaddr_un);
1.1 misho 595: if (csHost)
1.5.2.6 misho 596: strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path);
597: unlink(sa.sun.sun_path);
1.1 misho 598: break;
599: default:
600: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
601: return NULL;
602: }
603:
604: srv = malloc(sizeof(rpc_srv_t));
605: if (!srv) {
606: LOGERR;
607: return NULL;
608: } else
609: memset(srv, 0, sizeof(rpc_srv_t));
610:
1.5 misho 611: srv->srv_netbuf = netBuf;
1.1 misho 612: srv->srv_numcli = concurentClients;
613: srv->srv_session.sess_version = RPC_VERSION;
614: srv->srv_session.sess_program = regProgID;
615: srv->srv_session.sess_process = regProcID;
616:
1.2 misho 617: srv->srv_server.cli_tid = pthread_self();
1.1 misho 618: srv->srv_server.cli_parent = srv;
1.5.2.6 misho 619: memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
1.4 misho 620:
621: /* create server socket */
1.1 misho 622: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
623: if (srv->srv_server.cli_sock == -1) {
624: LOGERR;
625: free(srv);
626: return NULL;
627: }
628: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
629: LOGERR;
630: close(srv->srv_server.cli_sock);
631: free(srv);
632: return NULL;
633: }
1.5 misho 634: n = srv->srv_netbuf;
635: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
636: LOGERR;
637: close(srv->srv_server.cli_sock);
638: free(srv);
639: return NULL;
640: }
641: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
642: LOGERR;
643: close(srv->srv_server.cli_sock);
644: free(srv);
645: return NULL;
646: }
1.5.2.7 ! misho 647: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
! 648: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 649: LOGERR;
650: close(srv->srv_server.cli_sock);
651: free(srv);
652: return NULL;
653: }
654:
1.4 misho 655: /* allocate pool for concurent clients */
1.1 misho 656: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
657: if (!srv->srv_clients) {
658: LOGERR;
659: close(srv->srv_server.cli_sock);
660: free(srv);
661: return NULL;
662: } else
663: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
664:
1.2 misho 665: pthread_mutex_init(&srv->srv_mtx, NULL);
666:
667: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.5 misho 668: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
669: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
670: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
1.1 misho 671: return srv;
672: }
673:
674: /*
675: * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources
1.5.2.4 misho 676: * @psrv = RPC Server instance
1.1 misho 677: * return: none
678: */
679: void
1.5.2.4 misho 680: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 681: {
682: rpc_cli_t *c;
683: register int i;
684: rpc_func_t *f;
685:
1.5.2.4 misho 686: if (!psrv || !*psrv) {
1.1 misho 687: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
688: return;
689: }
690:
1.5.2.4 misho 691: rpc_srv_endBLOBServer(*psrv);
1.1 misho 692:
1.4 misho 693: /* close all clients connections & server socket */
1.5.2.4 misho 694: for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
1.5.2.6 misho 695: if (c->cli_sa.sa.sa_family) {
1.1 misho 696: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 697: close(c->cli_sock);
698: }
1.5.2.4 misho 699: close((*psrv)->srv_server.cli_sock);
1.1 misho 700:
1.5.2.4 misho 701: if ((*psrv)->srv_clients) {
702: free((*psrv)->srv_clients);
703: (*psrv)->srv_clients = NULL;
704: (*psrv)->srv_numcli = 0;
1.1 misho 705: }
706:
1.4 misho 707: /* detach exported calls */
1.5.2.4 misho 708: pthread_mutex_lock(&(*psrv)->srv_mtx);
709: while ((f = (*psrv)->srv_funcs)) {
710: (*psrv)->srv_funcs = f->func_next;
1.5.2.5 misho 711: rpc_srv_destroyVars(f);
1.2 misho 712: free(f);
713: }
1.5.2.4 misho 714: pthread_mutex_unlock(&(*psrv)->srv_mtx);
1.2 misho 715:
1.5.2.4 misho 716: while (pthread_mutex_trylock(&(*psrv)->srv_mtx) == EBUSY);
717: pthread_mutex_destroy(&(*psrv)->srv_mtx);
1.2 misho 718:
1.5.2.4 misho 719: free(*psrv);
720: *psrv = NULL;
1.1 misho 721: }
722:
723: /*
1.5 misho 724: * rpc_srv_loopServer() Execute Main server loop and wait for clients requests
1.1 misho 725: * @srv = RPC Server instance
726: * return: -1 error or 0 ok, infinite loop ...
727: */
728: int
1.5 misho 729: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 730: {
1.5.2.6 misho 731: socklen_t salen = sizeof(io_sockaddr_t);
1.1 misho 732: register int i;
733: rpc_cli_t *c;
1.2 misho 734: fd_set fds;
735: int ret;
736: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.1 misho 737:
738: if (!srv) {
739: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
740: return -1;
741: }
742:
1.5 misho 743: /* activate BLOB server worker if srv->srv_blob.state == enable */
744: rpc_srv_execBLOBServer(srv);
745:
1.1 misho 746: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
747: LOGERR;
748: return -1;
749: }
750:
1.4 misho 751: while (srv->srv_kill != kill) {
1.1 misho 752: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.5.2.6 misho 753: if (!c->cli_sa.sa.sa_family)
1.1 misho 754: break;
1.2 misho 755: if (i >= srv->srv_numcli) {
1.5 misho 756: #ifdef HAVE_PTHREAD_YIELD
757: pthread_yield();
758: #else
1.1 misho 759: usleep(1000000);
1.5 misho 760: #endif
1.1 misho 761: continue;
762: }
1.2 misho 763:
764: FD_ZERO(&fds);
765: FD_SET(srv->srv_server.cli_sock, &fds);
766: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
767: if (ret == -1) {
768: LOGERR;
769: ret = 1;
770: break;
771: }
772: if (!ret)
773: continue;
774:
1.5.2.6 misho 775: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
1.1 misho 776: if (c->cli_sock == -1) {
777: LOGERR;
778: continue;
779: } else
780: c->cli_parent = srv;
781:
782: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {
783: LOGERR;
784: continue;
1.4 misho 785: } else
786: pthread_detach(c->cli_tid);
1.1 misho 787: }
788:
789: return 0;
790: }
791:
792: // ---------------------------------------------------------
793:
794: /*
795: * rpc_srv_execCall() Execute registered call from RPC server
796: * @call = Register RPC call
797: * @rpc = IN RPC call structure
1.5 misho 798: * @args = IN RPC calling arguments from RPC client
1.1 misho 799: * return: -1 error, !=-1 ok
800: */
801: int
1.2 misho 802: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.5 misho 803: array_t * __restrict args)
1.1 misho 804: {
805: void *dl;
806: rpc_callback_t func;
807: int ret;
808:
1.2 misho 809: if (!call || !rpc || !call->func_parent) {
1.1 misho 810: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");
811: return -1;
812: }
813:
814: dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);
815: if (!dl) {
816: rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror());
817: return -1;
818: }
819:
820: func = dlsym(dl, (char*) call->func_name);
821: if (func)
1.5.2.2 misho 822: ret = func(call, ntohs(rpc->call_argc), args);
1.1 misho 823: else {
824: rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());
825: ret = -1;
826: }
827:
828: dlclose(dl);
829: return ret;
830: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>