Annotation of libaitrpc/src/srv.c, revision 1.3.2.11
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.3.2.11! misho 6: * $Id: srv.c,v 1.3.2.10 2011/08/19 14:24:54 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
49: static void *
50: rpc_srv_dispatchCall(void *arg)
51: {
52: rpc_cli_t *c = arg;
53: rpc_srv_t *s;
1.2 misho 54: rpc_val_t *vals = NULL, *v = NULL;
1.3 misho 55: rpc_func_t *f = NULL;
1.1 misho 56: struct tagRPCCall *rpc;
1.3 misho 57: struct tagRPCRet *rrpc;
58: rpc_sess_t ses = { 0 };
1.1 misho 59: fd_set fds;
60: u_char buf[BUFSIZ], *data;
1.2 misho 61: int ret, argc = 0, Limit = 0;
1.1 misho 62: register int i;
63:
64: if (!arg) {
65: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");
66: return NULL;
67: } else
68: s = c->cli_parent;
69:
70: do {
1.3 misho 71: v = NULL;
1.1 misho 72: FD_ZERO(&fds);
73: FD_SET(c->cli_sock, &fds);
74: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
75: if (ret == -1) {
1.3.2.9 misho 76: if (errno == EINTR && s->srv_kill != kill)
77: continue;
78:
79: LOGERR;
1.1 misho 80: ret = -2;
1.3.2.9 misho 81: break;
1.1 misho 82: }
1.3.2.10 misho 83: memset(buf, 0, sizeof buf);
84: ret = recv(c->cli_sock, buf, sizeof buf, 0);
1.3.2.9 misho 85: if (ret == -1) {
1.1 misho 86: LOGERR;
87: ret = -3;
88: break;
89: }
1.3.2.11! misho 90: if (!ret) { /* receive EOF */
1.1 misho 91: ret = 0;
92: break;
93: }
94: if (ret < sizeof(struct tagRPCCall)) {
95: rpc_SetErr(EMSGSIZE, "Error:: too short RPC packet ...\n");
96: ret = -4;
97: break;
98: } else
99: rpc = (struct tagRPCCall*) buf;
1.3.2.11! misho 100: /* check RPC packet session info */
1.1 misho 101: if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) {
102: rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n");
103: ret = -5;
1.2 misho 104: goto makeReply;
105: } else
106: Limit = sizeof(struct tagRPCCall);
1.3.2.11! misho 107: /* RPC is OK! Go decapsulate variables ... */
1.1 misho 108: if (rpc->call_argc) {
1.2 misho 109: v = (rpc_val_t*) (buf + Limit);
1.3.2.11! misho 110: /* check RPC packet length */
1.3.2.10 misho 111: if (rpc->call_argc * sizeof(rpc_val_t) > sizeof buf - Limit) {
1.3.2.11! misho 112: rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");
1.2 misho 113: ret = -5;
114: goto makeReply;
115: } else
116: Limit += rpc->call_argc * sizeof(rpc_val_t);
1.3.2.11! misho 117: /* RPC received variables types OK! */
1.1 misho 118: data = (u_char*) v + rpc->call_argc * sizeof(rpc_val_t);
119: for (i = 0; i < rpc->call_argc; i++) {
120: switch (v[i].val_type) {
121: case buffer:
1.3.2.10 misho 122: if (v[i].val_len > sizeof buf - Limit) {
1.3.2.11! misho 123: rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");
1.2 misho 124: ret = -5;
125: goto makeReply;
126: } else
127: Limit += v[i].val_len;
128:
1.1 misho 129: v[i].val.buffer = data;
130: data += v[i].val_len;
131: break;
132: case string:
1.3.2.10 misho 133: if (v[i].val_len > sizeof buf - Limit) {
1.3.2.11! misho 134: rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");
1.2 misho 135: ret = -5;
136: goto makeReply;
137: } else
138: Limit += v[i].val_len;
139:
1.1 misho 140: v[i].val.string = (int8_t*) data;
1.3 misho 141: data += v[i].val_len;
1.1 misho 142: break;
1.2 misho 143: case blob:
144: if (s->srv_blob.state == disable) {
145: rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");
146: ret = -5;
147: goto makeReply;
148: }
1.3.2.9 misho 149: if (s->srv_blob.state == kill) {
1.3.2.11! misho 150: rpc_SetErr(ENOTSUP, "Error:: BLOB server is gone.\n");
1.3.2.9 misho 151: ret = -5;
152: goto makeReply;
153: }
1.1 misho 154: default:
155: break;
156: }
157: }
158: }
159:
1.3.2.11! misho 160: /* execute call */
1.1 misho 161: argc = 0;
162: vals = NULL;
1.3 misho 163: memcpy(&ses, &rpc->call_session, sizeof ses);
1.1 misho 164: if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) {
165: rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n");
166: ret = -6;
167: } else
1.2 misho 168: if ((ret = rpc_srv_execCall(f, rpc, v)) == -1)
169: ret = -9;
1.1 misho 170: else
1.3.2.7 misho 171: argc = rpc_srv_getVars(f, &vals);
1.3 misho 172: makeReply:
1.3.2.11! misho 173: /* made reply */
1.3.2.10 misho 174: memset(buf, 0, sizeof buf);
1.3 misho 175: rrpc = (struct tagRPCRet*) buf;
176: Limit = sizeof(struct tagRPCRet);
1.1 misho 177:
1.3 misho 178: memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));
179: rrpc->ret_tag = rpc->call_tag;
180: rrpc->ret_hash = rpc->call_hash;
181: rrpc->ret_errno = rpc_Errno;
182: rrpc->ret_retcode = ret;
183: rrpc->ret_argc = argc;
1.1 misho 184:
185: if (argc && vals) {
1.3 misho 186: v = (rpc_val_t*) (buf + Limit);
1.3.2.10 misho 187: if (argc * sizeof(rpc_val_t) > sizeof buf - Limit) {
1.3 misho 188: for (i = 0; i < argc; i++)
189: RPC_FREE_VAL(&vals[i]);
1.3.2.7 misho 190: rpc_srv_freeVars(f);
1.3 misho 191: vals = NULL;
192: argc = 0;
193: ret = -7;
194: rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet values (-7) ...\n");
195: goto makeReply;
196: } else
197: Limit += argc * sizeof(rpc_val_t);
1.1 misho 198: memcpy(v, vals, argc * sizeof(rpc_val_t));
199: data = (u_char*) v + argc * sizeof(rpc_val_t);
200: for (ret = i = 0; i < argc; i++) {
201: switch (vals[i].val_type) {
202: case buffer:
1.3.2.10 misho 203: if (ret || Limit + vals[i].val_len > sizeof buf) {
1.1 misho 204: rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");
1.3 misho 205: rrpc->ret_retcode = ret = -7;
206: rrpc->ret_argc = 0;
1.1 misho 207: break;
208: }
209:
210: memcpy(data, vals[i].val.buffer, vals[i].val_len);
211: data += vals[i].val_len;
212: Limit += vals[i].val_len;
213: break;
214: case string:
1.3.2.10 misho 215: if (ret || Limit + vals[i].val_len > sizeof buf) {
1.1 misho 216: rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");
1.3 misho 217: rrpc->ret_retcode = ret = -7;
218: rrpc->ret_argc = 0;
1.1 misho 219: break;
220: }
221:
1.3 misho 222: memcpy(data, vals[i].val.string, vals[i].val_len);
223: data += vals[i].val_len;
224: Limit += vals[i].val_len;
1.1 misho 225: break;
1.2 misho 226: case blob:
227: if (s->srv_blob.state == disable) {
228: rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");
1.3 misho 229: rrpc->ret_retcode = ret = -5;
230: rrpc->ret_argc = 0;
1.1 misho 231: break;
232: }
1.3.2.9 misho 233: if (s->srv_blob.state == kill) {
1.3.2.11! misho 234: rpc_SetErr(ENOTSUP, "Error:: BLOB server is gone.\n");
1.3.2.9 misho 235: rrpc->ret_retcode = ret = -5;
236: rrpc->ret_argc = 0;
237: break;
238: }
1.1 misho 239: default:
240: break;
241: }
242:
243: RPC_FREE_VAL(&vals[i]);
244: }
1.3.2.7 misho 245: rpc_srv_freeVars(f);
1.3 misho 246: vals = NULL;
247: argc = 0;
1.1 misho 248: }
249:
1.3.2.9 misho 250: ret = send(c->cli_sock, buf, Limit, 0);
251: if (ret == -1) {
1.1 misho 252: LOGERR;
253: ret = -8;
254: break;
255: }
256: if (ret != Limit) {
1.2 misho 257: rpc_SetErr(ECANCELED, "Error:: in send RPC request, should be send %d bytes, "
1.1 misho 258: "really is %d\n", Limit, ret);
259: ret = -9;
260: break;
261: }
1.3.2.9 misho 262: } while (ret > -1 || s->srv_kill != kill);
1.1 misho 263:
264: shutdown(c->cli_sock, SHUT_RDWR);
265: close(c->cli_sock);
266: memset(c, 0, sizeof(rpc_cli_t));
1.2 misho 267: return (void*) (long)ret;
268: }
269:
270:
271: static void *
272: rpc_srv_dispatchVars(void *arg)
273: {
274: rpc_cli_t *c = arg;
275: rpc_srv_t *s;
276: rpc_blob_t *b;
1.3.2.9 misho 277: int ret = 0;
1.2 misho 278: fd_set fds;
279: u_char buf[sizeof(struct tagBLOBHdr)];
280: struct tagBLOBHdr *blob;
281:
282: if (!arg) {
283: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");
284: return NULL;
285: } else
286: s = c->cli_parent;
287:
288: do {
1.3.2.11! misho 289: /* check for disable service at this moment? */
1.3.2.9 misho 290: if (s->srv_blob.state == disable && s->srv_kill != kill) {
291: usleep(100000);
292: pthread_yield();
293: continue;
1.2 misho 294: }
295:
296: FD_ZERO(&fds);
297: FD_SET(c->cli_sock, &fds);
298: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
299: if (ret == -1) {
1.3.2.9 misho 300: if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill)
301: continue;
302:
303: LOGERR;
1.2 misho 304: ret = -2;
1.3.2.9 misho 305: break;
1.2 misho 306: }
307:
308: memset(buf, 0, sizeof buf);
1.3.2.9 misho 309: ret = recv(c->cli_sock, buf, sizeof buf, 0);
310: if (ret == -1) {
1.2 misho 311: LOGERR;
312: ret = -3;
313: break;
314: }
1.3.2.9 misho 315: /* receive EOF, disable or kill service */
316: if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {
1.2 misho 317: ret = 0;
318: break;
319: }
320: if (ret < sizeof(struct tagBLOBHdr)) {
321: rpc_SetErr(EMSGSIZE, "Error:: too short BLOB packet ...\n");
322: ret = -4;
323: break;
324: } else
325: blob = (struct tagBLOBHdr*) buf;
1.3.2.11! misho 326: /* check BLOB packet session info */
1.2 misho 327: if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) {
328: rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n");
329: ret = -5;
330: goto makeReply;
331: }
1.3.2.11! misho 332: /* Go to proceed packet ... */
1.2 misho 333: switch (blob->hdr_cmd) {
334: case get:
335: if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) {
336: rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",
337: blob->hdr_var);
338: ret = -6;
339: break;
340: } else
341: blob->hdr_len = b->blob_len;
342:
343: if (rpc_srv_blobMap(s, b) != -1) {
344: ret = rpc_srv_sendBLOB(c, b);
345: rpc_srv_blobUnmap(b);
346: } else
347: ret = -7;
348: break;
349: case set:
350: if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) {
1.3.2.11! misho 351: /* set new BLOB variable for reply :) */
1.2 misho 352: blob->hdr_var = b->blob_var;
353:
354: ret = rpc_srv_recvBLOB(c, b);
355: rpc_srv_blobUnmap(b);
356: } else
357: ret = -7;
358: break;
359: case unset:
360: ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);
361: if (ret == -1)
362: ret = -7;
363: break;
364: default:
365: rpc_SetErr(EINVAL, "Error:: unsupported BLOB command (%d)...\n",
366: blob->hdr_cmd);
367: ret = -7;
368: }
369:
370: makeReply:
1.3.2.11! misho 371: /* Replay to client! */
1.2 misho 372: blob->hdr_cmd = ret < 0 ? error : ok;
373: blob->hdr_ret = ret;
1.3.2.9 misho 374: ret = send(c->cli_sock, buf, sizeof buf, 0);
375: if (ret == -1) {
1.2 misho 376: LOGERR;
377: ret = -8;
378: break;
379: }
380: if (ret != sizeof buf) {
381: rpc_SetErr(ECANCELED, "Error:: in send BLOB reply, should be send %d bytes, "
382: "really is %d\n", sizeof buf, ret);
383: ret = -9;
384: break;
385: }
1.3.2.9 misho 386: } while (ret > -1 || s->srv_kill != kill);
1.2 misho 387:
388: shutdown(c->cli_sock, SHUT_RDWR);
389: close(c->cli_sock);
390: memset(c, 0, sizeof(rpc_cli_t));
391: return (void*) (long)ret;
1.1 misho 392: }
393:
394: // -------------------------------------------------
395:
396: /*
1.2 misho 397: * rpc_srv_initBLOBServer() Init & create BLOB Server
1.3.2.3 misho 398: * @srv = RPC server instance
1.2 misho 399: * @Port = Port for bind server, if Port == 0 default port is selected
400: * @diskDir = Disk place for BLOB file objects
401: * return: -1 == error or 0 bind and created BLOB server instance
402: */
403: int
1.3.2.9 misho 404: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
1.2 misho 405: {
406: int n = 1;
1.3.2.3 misho 407: struct sockaddr sa;
408: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
409: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
410: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
1.2 misho 411:
412: if (!srv) {
413: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");
414: return -1;
415: }
416: if (srv->srv_blob.state) {
417: rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");
418: return 0;
419: }
420: if (!Port)
421: Port = RPC_DEFPORT + 1;
422:
423: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
424: if (access(diskDir, R_OK | W_OK) == -1) {
425: LOGERR;
426: return -1;
427: } else
428: strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);
429:
430: srv->srv_blob.server.cli_tid = pthread_self();
431: srv->srv_blob.server.cli_parent = srv;
1.3.2.3 misho 432:
433: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
434: switch (srv->srv_server.cli_sa.sa_family) {
435: case AF_INET:
436: sin->sin_port = htons(Port);
437: memcpy(&srv->srv_blob.server.cli_sa, sin, sizeof(struct sockaddr));
438: break;
439: case AF_INET6:
440: sin6->sin6_port = htons(Port);
441: memcpy(&srv->srv_blob.server.cli_sa, sin6, sizeof(struct sockaddr));
442: break;
443: case AF_LOCAL:
444: strlcat(sun->sun_path, ".blob", sizeof sun->sun_path);
445: memcpy(&srv->srv_blob.server.cli_sa, sun, sizeof(struct sockaddr));
446: break;
447: default:
448: return -1;
1.2 misho 449: }
450:
1.3.2.3 misho 451: /* create BLOB server socket */
1.3.2.9 misho 452: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa_family, SOCK_STREAM, 0);
1.2 misho 453: if (srv->srv_blob.server.cli_sock == -1) {
454: LOGERR;
455: return -1;
456: }
457: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
458: LOGERR;
459: close(srv->srv_blob.server.cli_sock);
460: return -1;
461: }
462: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa,
463: sizeof srv->srv_blob.server.cli_sa) == -1) {
464: LOGERR;
465: close(srv->srv_blob.server.cli_sock);
466: return -1;
467: }
468:
1.3.2.3 misho 469: /* allocate pool for concurent clients */
1.2 misho 470: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
471: if (!srv->srv_blob.clients) {
472: LOGERR;
473: close(srv->srv_blob.server.cli_sock);
474: return -1;
475: } else
476: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
477:
478: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
479:
480: pthread_mutex_lock(&srv->srv_mtx);
481: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
482: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 0);
483: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 0);
484: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 1);
485: pthread_mutex_unlock(&srv->srv_mtx);
486:
487: srv->srv_blob.state = enable; // enable BLOB
488: return 0;
489: }
490:
491: /*
492: * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources
493: * @srv = RPC Server instance
494: * return: none
495: */
496: void
497: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
498: {
499: rpc_cli_t *c;
500: register int i;
501: rpc_blob_t *f;
502:
503: if (!srv) {
504: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
505: return;
506: } else
1.3.2.9 misho 507: srv->srv_blob.state = kill;
1.2 misho 508:
509: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
510: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
511: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
512: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
513:
1.3.2.3 misho 514: /* close all clients connections & server socket */
1.2 misho 515: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
516: if (c->cli_sa.sa_family)
517: shutdown(c->cli_sock, SHUT_RDWR);
518: close(srv->srv_blob.server.cli_sock);
519:
520: if (srv->srv_blob.clients) {
521: free(srv->srv_blob.clients);
522: srv->srv_blob.clients = NULL;
523: }
524:
1.3.2.3 misho 525: /* detach blobs */
1.2 misho 526: pthread_mutex_lock(&srv->srv_blob.mtx);
527: while ((f = srv->srv_blob.blobs)) {
528: srv->srv_blob.blobs = f->blob_next;
529: rpc_srv_blobFree(srv, f);
530: free(f);
531: }
532: pthread_mutex_unlock(&srv->srv_blob.mtx);
533:
534: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
535: pthread_mutex_destroy(&srv->srv_blob.mtx);
536: }
537:
538: /*
539: * rpc_srv_execBLOBServer() Execute Main BLOB server loop and wait for clients requests
540: * @srv = RPC Server instance
541: * return: -1 error or 0 ok, infinite loop ...
542: */
543: int
544: rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)
545: {
546: socklen_t salen = sizeof(struct sockaddr);
547: register int i;
548: rpc_cli_t *c;
549: fd_set fds;
550: int ret;
551: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
552:
1.3.2.9 misho 553: if (!srv || srv->srv_blob.state == kill) {
1.2 misho 554: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
555: return -1;
556: }
557:
1.3.2.9 misho 558: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
559: LOGERR;
560: return -1;
561: }
1.2 misho 562:
1.3.2.6 misho 563: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 564: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
565: if (!c->cli_sa.sa_family)
566: break;
567: if (i >= srv->srv_numcli) {
568: usleep(1000000);
569: continue;
570: }
571:
572: FD_ZERO(&fds);
573: FD_SET(srv->srv_blob.server.cli_sock, &fds);
574: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
575: if (ret == -1) {
576: LOGERR;
577: ret = 1;
578: break;
579: }
580: if (!ret)
581: continue;
582:
1.3.2.9 misho 583: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa, &salen);
1.2 misho 584: if (c->cli_sock == -1) {
585: LOGERR;
586: continue;
587: } else
588: c->cli_parent = srv;
589:
590: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {
591: LOGERR;
592: continue;
1.3.2.3 misho 593: } else
1.3.2.9 misho 594: pthread_detach(c->cli_tid);
1.2 misho 595: }
596:
597: srv->srv_blob.state = disable;
598:
599: return 0;
600: }
601:
602:
603: /*
1.1 misho 604: * rpc_srv_initServer() Init & create RPC Server
605: * @regProgID = ProgramID for authentication & recognition
606: * @regProcID = ProcessID for authentication & recognition
607: * @concurentClients = Concurent clients at same time to this server
1.3.2.1 misho 608: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
609: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 610: * @Port = Port for bind server, if Port == 0 default port is selected
611: * return: NULL == error or !=NULL bind and created RPC server instance
612: */
613: rpc_srv_t *
614: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.3.2.9 misho 615: u_short family, const char *csHost, u_short Port)
1.1 misho 616: {
617: rpc_srv_t *srv = NULL;
618: int n = 1;
619: struct hostent *host = NULL;
1.3.2.1 misho 620: struct sockaddr sa;
621: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
622: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
623: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
1.1 misho 624:
1.3.2.1 misho 625: if (!concurentClients || !regProgID ||
626: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 627: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
628: return NULL;
629: }
630: if (!Port)
631: Port = RPC_DEFPORT;
1.3.2.1 misho 632: if (csHost && family != AF_LOCAL) {
1.1 misho 633: host = gethostbyname2(csHost, family);
634: if (!host) {
635: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
636: return NULL;
637: }
638: }
1.3.2.1 misho 639: memset(&sa, 0, sizeof sa);
640: sa.sa_family = family;
1.1 misho 641: switch (family) {
642: case AF_INET:
1.3.2.1 misho 643: sin->sin_len = sizeof(struct sockaddr_in);
644: sin->sin_port = htons(Port);
1.1 misho 645: if (csHost)
1.3.2.1 misho 646: memcpy(&sin->sin_addr, host->h_addr, host->h_length);
1.1 misho 647: break;
648: case AF_INET6:
1.3.2.1 misho 649: sin6->sin6_len = sizeof(struct sockaddr_in6);
650: sin6->sin6_port = htons(Port);
651: if (csHost)
652: memcpy(&sin6->sin6_addr, host->h_addr, host->h_length);
653: break;
654: case AF_LOCAL:
655: sun->sun_len = sizeof(struct sockaddr_un);
1.1 misho 656: if (csHost)
1.3.2.1 misho 657: strlcpy(sun->sun_path, csHost, sizeof sun->sun_path);
1.1 misho 658: break;
659: default:
660: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
661: return NULL;
662: }
663:
664: srv = malloc(sizeof(rpc_srv_t));
665: if (!srv) {
666: LOGERR;
667: return NULL;
668: } else
669: memset(srv, 0, sizeof(rpc_srv_t));
670:
671: srv->srv_numcli = concurentClients;
672: srv->srv_session.sess_version = RPC_VERSION;
673: srv->srv_session.sess_program = regProgID;
674: srv->srv_session.sess_process = regProcID;
675:
1.2 misho 676: srv->srv_server.cli_tid = pthread_self();
1.1 misho 677: srv->srv_server.cli_parent = srv;
1.3.2.1 misho 678: switch (family) {
679: case AF_INET:
1.3.2.3 misho 680: memcpy(&srv->srv_server.cli_sa, sin, sizeof srv->srv_server.cli_sa);
1.3.2.1 misho 681: break;
682: case AF_INET6:
1.3.2.3 misho 683: memcpy(&srv->srv_server.cli_sa, sin6, sizeof srv->srv_server.cli_sa);
1.3.2.1 misho 684: break;
685: case AF_LOCAL:
1.3.2.3 misho 686: memcpy(&srv->srv_server.cli_sa, sun, sizeof srv->srv_server.cli_sa);
1.3.2.1 misho 687: unlink(sun->sun_path);
688: break;
689: }
690:
691: /* create server socket */
1.3.2.9 misho 692: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
1.1 misho 693: if (srv->srv_server.cli_sock == -1) {
694: LOGERR;
695: free(srv);
696: return NULL;
1.3.2.4 misho 697: }
1.1 misho 698: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
699: LOGERR;
700: close(srv->srv_server.cli_sock);
701: free(srv);
702: return NULL;
703: }
704: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {
705: LOGERR;
706: close(srv->srv_server.cli_sock);
707: free(srv);
708: return NULL;
709: }
710:
1.3.2.1 misho 711: /* allocate pool for concurent clients */
1.1 misho 712: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
713: if (!srv->srv_clients) {
714: LOGERR;
715: close(srv->srv_server.cli_sock);
716: free(srv);
717: return NULL;
718: } else
719: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
720:
1.2 misho 721: pthread_mutex_init(&srv->srv_mtx, NULL);
722:
723: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.1 misho 724: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0);
725: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0);
726: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0);
727: return srv;
728: }
729:
730: /*
731: * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources
732: * @srv = RPC Server instance
733: * return: none
734: */
735: void
736: rpc_srv_endServer(rpc_srv_t * __restrict srv)
737: {
738: rpc_cli_t *c;
739: register int i;
740: rpc_func_t *f;
741:
742: if (!srv) {
743: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
744: return;
745: }
746:
1.2 misho 747: rpc_srv_endBLOBServer(srv);
1.1 misho 748:
1.3.2.1 misho 749: /* close all clients connections & server socket */
1.1 misho 750: for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)
1.2 misho 751: if (c->cli_sa.sa_family) {
1.1 misho 752: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 753: close(c->cli_sock);
754: }
1.1 misho 755: close(srv->srv_server.cli_sock);
756:
757: if (srv->srv_clients) {
758: free(srv->srv_clients);
1.2 misho 759: srv->srv_clients = NULL;
1.1 misho 760: srv->srv_numcli = 0;
761: }
762:
1.3.2.1 misho 763: /* detach exported calls */
1.2 misho 764: pthread_mutex_lock(&srv->srv_mtx);
765: while ((f = srv->srv_funcs)) {
766: srv->srv_funcs = f->func_next;
767: free(f);
768: }
769: pthread_mutex_unlock(&srv->srv_mtx);
770:
771: while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY);
772: pthread_mutex_destroy(&srv->srv_mtx);
773:
1.1 misho 774: free(srv);
775: srv = NULL;
776: }
777:
778: /*
779: * rpc_srv_execServer() Execute Main server loop and wait for clients requests
780: * @srv = RPC Server instance
781: * return: -1 error or 0 ok, infinite loop ...
782: */
783: int
784: rpc_srv_execServer(rpc_srv_t * __restrict srv)
785: {
786: socklen_t salen = sizeof(struct sockaddr);
787: register int i;
788: rpc_cli_t *c;
1.2 misho 789: fd_set fds;
790: int ret;
791: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.1 misho 792:
793: if (!srv) {
794: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
795: return -1;
796: }
797:
1.3.2.9 misho 798: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
799: LOGERR;
800: return -1;
801: }
1.1 misho 802:
1.3.2.6 misho 803: while (srv->srv_kill != kill) {
1.1 misho 804: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
805: if (!c->cli_sa.sa_family)
806: break;
1.2 misho 807: if (i >= srv->srv_numcli) {
1.1 misho 808: usleep(1000000);
809: continue;
810: }
1.2 misho 811:
812: FD_ZERO(&fds);
813: FD_SET(srv->srv_server.cli_sock, &fds);
814: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
815: if (ret == -1) {
816: LOGERR;
817: ret = 1;
818: break;
819: }
820: if (!ret)
821: continue;
822:
1.3.2.9 misho 823: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen);
1.1 misho 824: if (c->cli_sock == -1) {
825: LOGERR;
826: continue;
827: } else
828: c->cli_parent = srv;
829:
830: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {
831: LOGERR;
832: continue;
1.3.2.9 misho 833: } else
834: pthread_detach(c->cli_tid);
1.1 misho 835: }
836:
837: return 0;
838: }
839:
840: // ---------------------------------------------------------
841:
842: /*
843: * rpc_srv_execCall() Execute registered call from RPC server
844: * @call = Register RPC call
845: * @rpc = IN RPC call structure
846: * @args = IN RPC call array of rpc values
847: * return: -1 error, !=-1 ok
848: */
849: int
1.2 misho 850: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
851: rpc_val_t * __restrict args)
1.1 misho 852: {
853: void *dl;
854: rpc_callback_t func;
855: int ret;
856:
1.2 misho 857: if (!call || !rpc || !call->func_parent) {
1.1 misho 858: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");
859: return -1;
860: }
861:
862: dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);
863: if (!dl) {
864: rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror());
865: return -1;
866: }
867:
868: func = dlsym(dl, (char*) call->func_name);
869: if (func)
1.2 misho 870: ret = func(call, rpc->call_argc, args);
1.1 misho 871: else {
872: rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());
873: ret = -1;
874: }
875:
876: dlclose(dl);
877: return ret;
878: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>