Annotation of libaitrpc/src/srv.c, revision 1.4.2.1
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.4.2.1 ! misho 6: * $Id: srv.c,v 1.4 2011/08/29 22:37:06 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
49: static void *
50: rpc_srv_dispatchCall(void *arg)
51: {
52: rpc_cli_t *c = arg;
53: rpc_srv_t *s;
1.4.2.1 ! misho 54: ait_val_t *vals = NULL, *v = NULL;
1.3 misho 55: rpc_func_t *f = NULL;
1.1 misho 56: struct tagRPCCall *rpc;
1.3 misho 57: struct tagRPCRet *rrpc;
58: rpc_sess_t ses = { 0 };
1.1 misho 59: fd_set fds;
60: u_char buf[BUFSIZ], *data;
1.2 misho 61: int ret, argc = 0, Limit = 0;
1.1 misho 62: register int i;
63:
64: if (!arg) {
65: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");
66: return NULL;
67: } else
68: s = c->cli_parent;
69:
70: do {
1.3 misho 71: v = NULL;
1.1 misho 72: FD_ZERO(&fds);
73: FD_SET(c->cli_sock, &fds);
74: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
75: if (ret == -1) {
1.4 misho 76: if (errno == EINTR && s->srv_kill != kill)
77: continue;
78:
79: LOGERR;
1.1 misho 80: ret = -2;
1.4 misho 81: break;
1.1 misho 82: }
1.4 misho 83: memset(buf, 0, sizeof buf);
84: ret = recv(c->cli_sock, buf, sizeof buf, 0);
85: if (ret == -1) {
1.1 misho 86: LOGERR;
87: ret = -3;
88: break;
89: }
1.4 misho 90: if (!ret) { /* receive EOF */
1.1 misho 91: ret = 0;
92: break;
93: }
94: if (ret < sizeof(struct tagRPCCall)) {
95: rpc_SetErr(EMSGSIZE, "Error:: too short RPC packet ...\n");
96: ret = -4;
97: break;
98: } else
99: rpc = (struct tagRPCCall*) buf;
1.4 misho 100: /* check RPC packet session info */
1.1 misho 101: if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) {
102: rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n");
103: ret = -5;
1.2 misho 104: goto makeReply;
105: } else
106: Limit = sizeof(struct tagRPCCall);
1.4 misho 107: /* RPC is OK! Go decapsulate variables ... */
1.1 misho 108: if (rpc->call_argc) {
1.4.2.1 ! misho 109: v = (ait_val_t*) (buf + Limit);
1.4 misho 110: /* check RPC packet length */
1.4.2.1 ! misho 111: if (rpc->call_argc * sizeof(ait_val_t) > sizeof buf - Limit) {
1.4 misho 112: rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");
1.2 misho 113: ret = -5;
114: goto makeReply;
115: } else
1.4.2.1 ! misho 116: Limit += rpc->call_argc * sizeof(ait_val_t);
1.4 misho 117: /* RPC received variables types OK! */
1.4.2.1 ! misho 118: data = (u_char*) v + rpc->call_argc * sizeof(ait_val_t);
1.1 misho 119: for (i = 0; i < rpc->call_argc; i++) {
120: switch (v[i].val_type) {
121: case buffer:
1.4 misho 122: if (v[i].val_len > sizeof buf - Limit) {
123: rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");
1.2 misho 124: ret = -5;
125: goto makeReply;
126: } else
127: Limit += v[i].val_len;
128:
1.1 misho 129: v[i].val.buffer = data;
130: data += v[i].val_len;
131: break;
132: case string:
1.4 misho 133: if (v[i].val_len > sizeof buf - Limit) {
134: rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");
1.2 misho 135: ret = -5;
136: goto makeReply;
137: } else
138: Limit += v[i].val_len;
139:
1.1 misho 140: v[i].val.string = (int8_t*) data;
1.3 misho 141: data += v[i].val_len;
1.1 misho 142: break;
1.2 misho 143: case blob:
144: if (s->srv_blob.state == disable) {
145: rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");
146: ret = -5;
147: goto makeReply;
148: }
1.4 misho 149: if (s->srv_blob.state == kill) {
150: rpc_SetErr(ENOTSUP, "Error:: BLOB server is gone.\n");
151: ret = -5;
152: goto makeReply;
153: }
1.1 misho 154: default:
155: break;
156: }
157: }
158: }
159:
1.4 misho 160: /* execute call */
1.1 misho 161: argc = 0;
162: vals = NULL;
1.3 misho 163: memcpy(&ses, &rpc->call_session, sizeof ses);
1.1 misho 164: if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) {
165: rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n");
166: ret = -6;
167: } else
1.2 misho 168: if ((ret = rpc_srv_execCall(f, rpc, v)) == -1)
169: ret = -9;
1.1 misho 170: else
1.4 misho 171: argc = rpc_srv_getVars(f, &vals);
1.3 misho 172: makeReply:
1.4 misho 173: /* made reply */
174: memset(buf, 0, sizeof buf);
1.3 misho 175: rrpc = (struct tagRPCRet*) buf;
176: Limit = sizeof(struct tagRPCRet);
1.1 misho 177:
1.3 misho 178: memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));
179: rrpc->ret_tag = rpc->call_tag;
180: rrpc->ret_hash = rpc->call_hash;
181: rrpc->ret_errno = rpc_Errno;
182: rrpc->ret_retcode = ret;
183: rrpc->ret_argc = argc;
1.1 misho 184:
185: if (argc && vals) {
1.4.2.1 ! misho 186: v = (ait_val_t*) (buf + Limit);
! 187: if (argc * sizeof(ait_val_t) > sizeof buf - Limit) {
1.3 misho 188: for (i = 0; i < argc; i++)
1.4.2.1 ! misho 189: AIT_FREE_VAL(&vals[i]);
1.4 misho 190: rpc_srv_freeVars(f);
1.3 misho 191: vals = NULL;
192: argc = 0;
193: ret = -7;
194: rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet values (-7) ...\n");
195: goto makeReply;
196: } else
1.4.2.1 ! misho 197: Limit += argc * sizeof(ait_val_t);
! 198: memcpy(v, vals, argc * sizeof(ait_val_t));
! 199: data = (u_char*) v + argc * sizeof(ait_val_t);
1.1 misho 200: for (ret = i = 0; i < argc; i++) {
201: switch (vals[i].val_type) {
202: case buffer:
1.4 misho 203: if (ret || Limit + vals[i].val_len > sizeof buf) {
1.1 misho 204: rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");
1.3 misho 205: rrpc->ret_retcode = ret = -7;
206: rrpc->ret_argc = 0;
1.1 misho 207: break;
208: }
209:
210: memcpy(data, vals[i].val.buffer, vals[i].val_len);
211: data += vals[i].val_len;
212: Limit += vals[i].val_len;
213: break;
214: case string:
1.4 misho 215: if (ret || Limit + vals[i].val_len > sizeof buf) {
1.1 misho 216: rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");
1.3 misho 217: rrpc->ret_retcode = ret = -7;
218: rrpc->ret_argc = 0;
1.1 misho 219: break;
220: }
221:
1.3 misho 222: memcpy(data, vals[i].val.string, vals[i].val_len);
223: data += vals[i].val_len;
224: Limit += vals[i].val_len;
1.1 misho 225: break;
1.2 misho 226: case blob:
227: if (s->srv_blob.state == disable) {
228: rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");
1.3 misho 229: rrpc->ret_retcode = ret = -5;
230: rrpc->ret_argc = 0;
1.1 misho 231: break;
232: }
1.4 misho 233: if (s->srv_blob.state == kill) {
234: rpc_SetErr(ENOTSUP, "Error:: BLOB server is gone.\n");
235: rrpc->ret_retcode = ret = -5;
236: rrpc->ret_argc = 0;
237: break;
238: }
1.1 misho 239: default:
240: break;
241: }
242:
1.4.2.1 ! misho 243: AIT_FREE_VAL(&vals[i]);
1.1 misho 244: }
1.4 misho 245: rpc_srv_freeVars(f);
1.3 misho 246: vals = NULL;
247: argc = 0;
1.1 misho 248: }
249:
1.4 misho 250: ret = send(c->cli_sock, buf, Limit, 0);
251: if (ret == -1) {
1.1 misho 252: LOGERR;
253: ret = -8;
254: break;
255: }
256: if (ret != Limit) {
1.2 misho 257: rpc_SetErr(ECANCELED, "Error:: in send RPC request, should be send %d bytes, "
1.1 misho 258: "really is %d\n", Limit, ret);
259: ret = -9;
260: break;
261: }
1.4 misho 262: } while (ret > -1 || s->srv_kill != kill);
1.1 misho 263:
264: shutdown(c->cli_sock, SHUT_RDWR);
265: close(c->cli_sock);
266: memset(c, 0, sizeof(rpc_cli_t));
1.2 misho 267: return (void*) (long)ret;
268: }
269:
270:
271: static void *
272: rpc_srv_dispatchVars(void *arg)
273: {
274: rpc_cli_t *c = arg;
275: rpc_srv_t *s;
276: rpc_blob_t *b;
1.4 misho 277: int ret = 0;
1.2 misho 278: fd_set fds;
279: u_char buf[sizeof(struct tagBLOBHdr)];
280: struct tagBLOBHdr *blob;
281:
282: if (!arg) {
283: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");
284: return NULL;
285: } else
286: s = c->cli_parent;
287:
288: do {
1.4 misho 289: /* check for disable service at this moment? */
290: if (s->srv_blob.state == disable && s->srv_kill != kill) {
291: usleep(100000);
292: #ifdef HAVE_PTHREAD_YIELD
293: pthread_yield();
294: #endif
295: continue;
1.2 misho 296: }
297:
298: FD_ZERO(&fds);
299: FD_SET(c->cli_sock, &fds);
300: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
301: if (ret == -1) {
1.4 misho 302: if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill)
303: continue;
304:
305: LOGERR;
1.2 misho 306: ret = -2;
1.4 misho 307: break;
1.2 misho 308: }
309:
310: memset(buf, 0, sizeof buf);
1.4 misho 311: ret = recv(c->cli_sock, buf, sizeof buf, 0);
312: if (ret == -1) {
1.2 misho 313: LOGERR;
314: ret = -3;
315: break;
316: }
1.4 misho 317: /* receive EOF, disable or kill service */
318: if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {
1.2 misho 319: ret = 0;
320: break;
321: }
322: if (ret < sizeof(struct tagBLOBHdr)) {
323: rpc_SetErr(EMSGSIZE, "Error:: too short BLOB packet ...\n");
324: ret = -4;
325: break;
326: } else
327: blob = (struct tagBLOBHdr*) buf;
1.4 misho 328: /* check BLOB packet session info */
1.2 misho 329: if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) {
330: rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n");
331: ret = -5;
332: goto makeReply;
333: }
1.4 misho 334: /* Go to proceed packet ... */
1.2 misho 335: switch (blob->hdr_cmd) {
336: case get:
337: if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) {
338: rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",
339: blob->hdr_var);
340: ret = -6;
341: break;
342: } else
343: blob->hdr_len = b->blob_len;
344:
345: if (rpc_srv_blobMap(s, b) != -1) {
346: ret = rpc_srv_sendBLOB(c, b);
347: rpc_srv_blobUnmap(b);
348: } else
349: ret = -7;
350: break;
351: case set:
352: if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) {
1.4 misho 353: /* set new BLOB variable for reply :) */
1.2 misho 354: blob->hdr_var = b->blob_var;
355:
356: ret = rpc_srv_recvBLOB(c, b);
357: rpc_srv_blobUnmap(b);
358: } else
359: ret = -7;
360: break;
361: case unset:
362: ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);
363: if (ret == -1)
364: ret = -7;
365: break;
366: default:
367: rpc_SetErr(EINVAL, "Error:: unsupported BLOB command (%d)...\n",
368: blob->hdr_cmd);
369: ret = -7;
370: }
371:
372: makeReply:
1.4 misho 373: /* Replay to client! */
1.2 misho 374: blob->hdr_cmd = ret < 0 ? error : ok;
375: blob->hdr_ret = ret;
1.4 misho 376: ret = send(c->cli_sock, buf, sizeof buf, 0);
377: if (ret == -1) {
1.2 misho 378: LOGERR;
379: ret = -8;
380: break;
381: }
382: if (ret != sizeof buf) {
383: rpc_SetErr(ECANCELED, "Error:: in send BLOB reply, should be send %d bytes, "
384: "really is %d\n", sizeof buf, ret);
385: ret = -9;
386: break;
387: }
1.4 misho 388: } while (ret > -1 || s->srv_kill != kill);
1.2 misho 389:
390: shutdown(c->cli_sock, SHUT_RDWR);
391: close(c->cli_sock);
392: memset(c, 0, sizeof(rpc_cli_t));
393: return (void*) (long)ret;
1.1 misho 394: }
395:
396: // -------------------------------------------------
397:
398: /*
1.2 misho 399: * rpc_srv_initBLOBServer() Init & create BLOB Server
1.4 misho 400: * @srv = RPC server instance
1.2 misho 401: * @Port = Port for bind server, if Port == 0 default port is selected
402: * @diskDir = Disk place for BLOB file objects
403: * return: -1 == error or 0 bind and created BLOB server instance
404: */
405: int
406: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
407: {
408: int n = 1;
1.4 misho 409: struct sockaddr sa;
410: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
411: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
412: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
1.2 misho 413:
414: if (!srv) {
415: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");
416: return -1;
417: }
418: if (srv->srv_blob.state) {
419: rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");
420: return 0;
421: }
422: if (!Port)
423: Port = RPC_DEFPORT + 1;
424:
425: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
426: if (access(diskDir, R_OK | W_OK) == -1) {
427: LOGERR;
428: return -1;
429: } else
430: strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);
431:
432: srv->srv_blob.server.cli_tid = pthread_self();
433: srv->srv_blob.server.cli_parent = srv;
1.4 misho 434:
435: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
436: switch (srv->srv_server.cli_sa.sa_family) {
437: case AF_INET:
438: sin->sin_port = htons(Port);
439: memcpy(&srv->srv_blob.server.cli_sa, sin, sizeof(struct sockaddr));
440: break;
441: case AF_INET6:
442: sin6->sin6_port = htons(Port);
443: memcpy(&srv->srv_blob.server.cli_sa, sin6, sizeof(struct sockaddr));
444: break;
445: case AF_LOCAL:
446: strlcat(sun->sun_path, ".blob", sizeof sun->sun_path);
447: memcpy(&srv->srv_blob.server.cli_sa, sun, sizeof(struct sockaddr));
448: break;
449: default:
450: return -1;
1.2 misho 451: }
452:
1.4 misho 453: /* create BLOB server socket */
1.2 misho 454: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa_family, SOCK_STREAM, 0);
455: if (srv->srv_blob.server.cli_sock == -1) {
456: LOGERR;
457: return -1;
458: }
459: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
460: LOGERR;
461: close(srv->srv_blob.server.cli_sock);
462: return -1;
463: }
464: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa,
465: sizeof srv->srv_blob.server.cli_sa) == -1) {
466: LOGERR;
467: close(srv->srv_blob.server.cli_sock);
468: return -1;
469: }
470:
1.4 misho 471: /* allocate pool for concurent clients */
1.2 misho 472: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
473: if (!srv->srv_blob.clients) {
474: LOGERR;
475: close(srv->srv_blob.server.cli_sock);
476: return -1;
477: } else
478: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
479:
480: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
481:
482: pthread_mutex_lock(&srv->srv_mtx);
483: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
484: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 0);
485: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 0);
486: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 1);
487: pthread_mutex_unlock(&srv->srv_mtx);
488:
1.4 misho 489: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 490: return 0;
491: }
492:
493: /*
494: * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources
495: * @srv = RPC Server instance
496: * return: none
497: */
498: void
499: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
500: {
501: rpc_cli_t *c;
502: register int i;
503: rpc_blob_t *f;
504:
505: if (!srv) {
506: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
507: return;
508: } else
1.4 misho 509: srv->srv_blob.state = kill;
1.2 misho 510:
511: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
512: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
513: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
514: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
515:
1.4 misho 516: /* close all clients connections & server socket */
1.2 misho 517: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
518: if (c->cli_sa.sa_family)
519: shutdown(c->cli_sock, SHUT_RDWR);
520: close(srv->srv_blob.server.cli_sock);
521:
522: if (srv->srv_blob.clients) {
523: free(srv->srv_blob.clients);
524: srv->srv_blob.clients = NULL;
525: }
526:
1.4 misho 527: /* detach blobs */
1.2 misho 528: pthread_mutex_lock(&srv->srv_blob.mtx);
529: while ((f = srv->srv_blob.blobs)) {
530: srv->srv_blob.blobs = f->blob_next;
531: rpc_srv_blobFree(srv, f);
532: free(f);
533: }
534: pthread_mutex_unlock(&srv->srv_blob.mtx);
535:
536: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
537: pthread_mutex_destroy(&srv->srv_blob.mtx);
538: }
539:
540: /*
541: * rpc_srv_execBLOBServer() Execute Main BLOB server loop and wait for clients requests
542: * @srv = RPC Server instance
543: * return: -1 error or 0 ok, infinite loop ...
544: */
545: int
546: rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)
547: {
548: socklen_t salen = sizeof(struct sockaddr);
549: register int i;
550: rpc_cli_t *c;
551: fd_set fds;
552: int ret;
553: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
554:
1.4 misho 555: if (!srv || srv->srv_blob.state == kill) {
1.2 misho 556: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
557: return -1;
558: }
559:
560: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
561: LOGERR;
562: return -1;
563: }
564:
1.4 misho 565: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 566: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
567: if (!c->cli_sa.sa_family)
568: break;
569: if (i >= srv->srv_numcli) {
570: usleep(1000000);
571: continue;
572: }
573:
574: FD_ZERO(&fds);
575: FD_SET(srv->srv_blob.server.cli_sock, &fds);
576: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
577: if (ret == -1) {
578: LOGERR;
579: ret = 1;
580: break;
581: }
582: if (!ret)
583: continue;
584:
585: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa, &salen);
586: if (c->cli_sock == -1) {
587: LOGERR;
588: continue;
589: } else
590: c->cli_parent = srv;
591:
592: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {
593: LOGERR;
594: continue;
1.4 misho 595: } else
596: pthread_detach(c->cli_tid);
1.2 misho 597: }
598:
599: srv->srv_blob.state = disable;
600:
601: return 0;
602: }
603:
604:
605: /*
1.1 misho 606: * rpc_srv_initServer() Init & create RPC Server
607: * @regProgID = ProgramID for authentication & recognition
608: * @regProcID = ProcessID for authentication & recognition
609: * @concurentClients = Concurent clients at same time to this server
1.4 misho 610: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
611: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 612: * @Port = Port for bind server, if Port == 0 default port is selected
613: * return: NULL == error or !=NULL bind and created RPC server instance
614: */
615: rpc_srv_t *
616: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
617: u_short family, const char *csHost, u_short Port)
618: {
619: rpc_srv_t *srv = NULL;
620: int n = 1;
621: struct hostent *host = NULL;
1.4 misho 622: struct sockaddr sa;
623: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
624: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
625: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
1.1 misho 626:
1.4 misho 627: if (!concurentClients || !regProgID ||
628: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 629: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
630: return NULL;
631: }
632: if (!Port)
633: Port = RPC_DEFPORT;
1.4 misho 634: if (csHost && family != AF_LOCAL) {
1.1 misho 635: host = gethostbyname2(csHost, family);
636: if (!host) {
637: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
638: return NULL;
639: }
640: }
1.4 misho 641: memset(&sa, 0, sizeof sa);
642: sa.sa_family = family;
1.1 misho 643: switch (family) {
644: case AF_INET:
1.4 misho 645: sin->sin_len = sizeof(struct sockaddr_in);
646: sin->sin_port = htons(Port);
1.1 misho 647: if (csHost)
1.4 misho 648: memcpy(&sin->sin_addr, host->h_addr, host->h_length);
1.1 misho 649: break;
650: case AF_INET6:
1.4 misho 651: sin6->sin6_len = sizeof(struct sockaddr_in6);
652: sin6->sin6_port = htons(Port);
653: if (csHost)
654: memcpy(&sin6->sin6_addr, host->h_addr, host->h_length);
655: break;
656: case AF_LOCAL:
657: sun->sun_len = sizeof(struct sockaddr_un);
1.1 misho 658: if (csHost)
1.4 misho 659: strlcpy(sun->sun_path, csHost, sizeof sun->sun_path);
1.1 misho 660: break;
661: default:
662: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
663: return NULL;
664: }
665:
666: srv = malloc(sizeof(rpc_srv_t));
667: if (!srv) {
668: LOGERR;
669: return NULL;
670: } else
671: memset(srv, 0, sizeof(rpc_srv_t));
672:
673: srv->srv_numcli = concurentClients;
674: srv->srv_session.sess_version = RPC_VERSION;
675: srv->srv_session.sess_program = regProgID;
676: srv->srv_session.sess_process = regProcID;
677:
1.2 misho 678: srv->srv_server.cli_tid = pthread_self();
1.1 misho 679: srv->srv_server.cli_parent = srv;
1.4 misho 680: switch (family) {
681: case AF_INET:
682: memcpy(&srv->srv_server.cli_sa, sin, sizeof srv->srv_server.cli_sa);
683: break;
684: case AF_INET6:
685: memcpy(&srv->srv_server.cli_sa, sin6, sizeof srv->srv_server.cli_sa);
686: break;
687: case AF_LOCAL:
688: memcpy(&srv->srv_server.cli_sa, sun, sizeof srv->srv_server.cli_sa);
689: unlink(sun->sun_path);
690: break;
691: }
692:
693: /* create server socket */
1.1 misho 694: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
695: if (srv->srv_server.cli_sock == -1) {
696: LOGERR;
697: free(srv);
698: return NULL;
699: }
700: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
701: LOGERR;
702: close(srv->srv_server.cli_sock);
703: free(srv);
704: return NULL;
705: }
706: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {
707: LOGERR;
708: close(srv->srv_server.cli_sock);
709: free(srv);
710: return NULL;
711: }
712:
1.4 misho 713: /* allocate pool for concurent clients */
1.1 misho 714: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
715: if (!srv->srv_clients) {
716: LOGERR;
717: close(srv->srv_server.cli_sock);
718: free(srv);
719: return NULL;
720: } else
721: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
722:
1.2 misho 723: pthread_mutex_init(&srv->srv_mtx, NULL);
724:
725: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.1 misho 726: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0);
727: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0);
728: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0);
729: return srv;
730: }
731:
732: /*
733: * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources
734: * @srv = RPC Server instance
735: * return: none
736: */
737: void
738: rpc_srv_endServer(rpc_srv_t * __restrict srv)
739: {
740: rpc_cli_t *c;
741: register int i;
742: rpc_func_t *f;
743:
744: if (!srv) {
745: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
746: return;
747: }
748:
1.2 misho 749: rpc_srv_endBLOBServer(srv);
1.1 misho 750:
1.4 misho 751: /* close all clients connections & server socket */
1.1 misho 752: for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)
1.2 misho 753: if (c->cli_sa.sa_family) {
1.1 misho 754: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 755: close(c->cli_sock);
756: }
1.1 misho 757: close(srv->srv_server.cli_sock);
758:
759: if (srv->srv_clients) {
760: free(srv->srv_clients);
1.2 misho 761: srv->srv_clients = NULL;
1.1 misho 762: srv->srv_numcli = 0;
763: }
764:
1.4 misho 765: /* detach exported calls */
1.2 misho 766: pthread_mutex_lock(&srv->srv_mtx);
767: while ((f = srv->srv_funcs)) {
768: srv->srv_funcs = f->func_next;
769: free(f);
770: }
771: pthread_mutex_unlock(&srv->srv_mtx);
772:
773: while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY);
774: pthread_mutex_destroy(&srv->srv_mtx);
775:
1.1 misho 776: free(srv);
777: srv = NULL;
778: }
779:
780: /*
781: * rpc_srv_execServer() Execute Main server loop and wait for clients requests
782: * @srv = RPC Server instance
783: * return: -1 error or 0 ok, infinite loop ...
784: */
785: int
786: rpc_srv_execServer(rpc_srv_t * __restrict srv)
787: {
788: socklen_t salen = sizeof(struct sockaddr);
789: register int i;
790: rpc_cli_t *c;
1.2 misho 791: fd_set fds;
792: int ret;
793: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.1 misho 794:
795: if (!srv) {
796: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
797: return -1;
798: }
799:
800: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
801: LOGERR;
802: return -1;
803: }
804:
1.4 misho 805: while (srv->srv_kill != kill) {
1.1 misho 806: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
807: if (!c->cli_sa.sa_family)
808: break;
1.2 misho 809: if (i >= srv->srv_numcli) {
1.1 misho 810: usleep(1000000);
811: continue;
812: }
1.2 misho 813:
814: FD_ZERO(&fds);
815: FD_SET(srv->srv_server.cli_sock, &fds);
816: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
817: if (ret == -1) {
818: LOGERR;
819: ret = 1;
820: break;
821: }
822: if (!ret)
823: continue;
824:
1.1 misho 825: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen);
826: if (c->cli_sock == -1) {
827: LOGERR;
828: continue;
829: } else
830: c->cli_parent = srv;
831:
832: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {
833: LOGERR;
834: continue;
1.4 misho 835: } else
836: pthread_detach(c->cli_tid);
1.1 misho 837: }
838:
839: return 0;
840: }
841:
842: // ---------------------------------------------------------
843:
844: /*
845: * rpc_srv_execCall() Execute registered call from RPC server
846: * @call = Register RPC call
847: * @rpc = IN RPC call structure
848: * @args = IN RPC call array of rpc values
849: * return: -1 error, !=-1 ok
850: */
851: int
1.2 misho 852: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.4.2.1 ! misho 853: ait_val_t * __restrict args)
1.1 misho 854: {
855: void *dl;
856: rpc_callback_t func;
857: int ret;
858:
1.2 misho 859: if (!call || !rpc || !call->func_parent) {
1.1 misho 860: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");
861: return -1;
862: }
863:
864: dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);
865: if (!dl) {
866: rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror());
867: return -1;
868: }
869:
870: func = dlsym(dl, (char*) call->func_name);
871: if (func)
1.2 misho 872: ret = func(call, rpc->call_argc, args);
1.1 misho 873: else {
874: rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());
875: ret = -1;
876: }
877:
878: dlclose(dl);
879: return ret;
880: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>