Annotation of libaitrpc/src/srv.c, revision 1.4.2.2
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.4.2.2 ! misho 6: * $Id: srv.c,v 1.4.2.1 2011/08/29 23:26:56 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
49: static void *
50: rpc_srv_dispatchCall(void *arg)
51: {
52: rpc_cli_t *c = arg;
53: rpc_srv_t *s;
1.4.2.1 misho 54: ait_val_t *vals = NULL, *v = NULL;
1.3 misho 55: rpc_func_t *f = NULL;
1.1 misho 56: struct tagRPCCall *rpc;
1.3 misho 57: struct tagRPCRet *rrpc;
58: rpc_sess_t ses = { 0 };
1.1 misho 59: fd_set fds;
1.4.2.2 ! misho 60: u_char *buf, *data;
1.2 misho 61: int ret, argc = 0, Limit = 0;
1.1 misho 62: register int i;
63:
64: if (!arg) {
65: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");
66: return NULL;
67: } else
68: s = c->cli_parent;
69:
1.4.2.2 ! misho 70: buf = malloc(s->srv_netbuf);
! 71: if (!buf) {
! 72: LOGERR;
! 73: return NULL;
! 74: }
! 75:
1.1 misho 76: do {
1.3 misho 77: v = NULL;
1.1 misho 78: FD_ZERO(&fds);
79: FD_SET(c->cli_sock, &fds);
80: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
81: if (ret == -1) {
1.4 misho 82: if (errno == EINTR && s->srv_kill != kill)
83: continue;
84:
85: LOGERR;
1.1 misho 86: ret = -2;
1.4 misho 87: break;
1.1 misho 88: }
1.4.2.2 ! misho 89: memset(buf, 0, s->srv_netbuf);
! 90: ret = recv(c->cli_sock, buf, s->srv_netbuf, 0);
1.4 misho 91: if (ret == -1) {
1.1 misho 92: LOGERR;
93: ret = -3;
94: break;
95: }
1.4 misho 96: if (!ret) { /* receive EOF */
1.1 misho 97: ret = 0;
98: break;
99: }
100: if (ret < sizeof(struct tagRPCCall)) {
101: rpc_SetErr(EMSGSIZE, "Error:: too short RPC packet ...\n");
102: ret = -4;
103: break;
104: } else
105: rpc = (struct tagRPCCall*) buf;
1.4 misho 106: /* check RPC packet session info */
1.1 misho 107: if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) {
108: rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n");
109: ret = -5;
1.2 misho 110: goto makeReply;
111: } else
112: Limit = sizeof(struct tagRPCCall);
1.4.2.2 ! misho 113:
1.4 misho 114: /* RPC is OK! Go decapsulate variables ... */
1.1 misho 115: if (rpc->call_argc) {
1.4.2.1 misho 116: v = (ait_val_t*) (buf + Limit);
1.4.2.2 ! misho 117: if (rpc->call_argc * sizeof(ait_val_t) > s->srv_netbuf - Limit) {
1.4 misho 118: rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");
1.2 misho 119: ret = -5;
120: goto makeReply;
121: } else
1.4.2.1 misho 122: Limit += rpc->call_argc * sizeof(ait_val_t);
1.4.2.2 ! misho 123:
! 124: /* RPC received variables types are OK! */
1.4.2.1 misho 125: data = (u_char*) v + rpc->call_argc * sizeof(ait_val_t);
1.1 misho 126: for (i = 0; i < rpc->call_argc; i++) {
1.4.2.2 ! misho 127: switch (AIT_TYPE(&v[i])) {
1.1 misho 128: case buffer:
1.4.2.2 ! misho 129: if (AIT_LEN(&v[i]) > s->srv_netbuf - Limit) {
1.4 misho 130: rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");
1.2 misho 131: ret = -5;
132: goto makeReply;
133: } else
1.4.2.2 ! misho 134: Limit += AIT_LEN(&v[i]);
1.2 misho 135:
1.1 misho 136: v[i].val.buffer = data;
1.4.2.2 ! misho 137: data += AIT_LEN(&v[i]);
1.1 misho 138: break;
139: case string:
1.4.2.2 ! misho 140: if (AIT_LEN(&v[i]) > s->srv_netbuf - Limit) {
1.4 misho 141: rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");
1.2 misho 142: ret = -5;
143: goto makeReply;
144: } else
1.4.2.2 ! misho 145: Limit += AIT_LEN(&v[i]);
1.2 misho 146:
1.1 misho 147: v[i].val.string = (int8_t*) data;
1.4.2.2 ! misho 148: data += AIT_LEN(&v[i]);
1.1 misho 149: break;
1.2 misho 150: case blob:
151: if (s->srv_blob.state == disable) {
152: rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");
153: ret = -5;
154: goto makeReply;
155: }
1.4 misho 156: if (s->srv_blob.state == kill) {
157: rpc_SetErr(ENOTSUP, "Error:: BLOB server is gone.\n");
158: ret = -5;
159: goto makeReply;
160: }
1.1 misho 161: default:
162: break;
163: }
1.4.2.2 ! misho 164:
! 165: AIT_ADDZCOPY(&v[i]);
1.1 misho 166: }
167: }
168:
1.4 misho 169: /* execute call */
1.1 misho 170: argc = 0;
171: vals = NULL;
1.3 misho 172: memcpy(&ses, &rpc->call_session, sizeof ses);
1.1 misho 173: if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) {
174: rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n");
175: ret = -6;
176: } else
1.2 misho 177: if ((ret = rpc_srv_execCall(f, rpc, v)) == -1)
178: ret = -9;
1.1 misho 179: else
1.4 misho 180: argc = rpc_srv_getVars(f, &vals);
1.3 misho 181: makeReply:
1.4 misho 182: /* made reply */
1.4.2.2 ! misho 183: memset(buf, 0, s->srv_netbuf);
1.3 misho 184: rrpc = (struct tagRPCRet*) buf;
185: Limit = sizeof(struct tagRPCRet);
1.1 misho 186:
1.3 misho 187: memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));
188: rrpc->ret_tag = rpc->call_tag;
189: rrpc->ret_hash = rpc->call_hash;
190: rrpc->ret_errno = rpc_Errno;
191: rrpc->ret_retcode = ret;
192: rrpc->ret_argc = argc;
1.1 misho 193:
194: if (argc && vals) {
1.4.2.2 ! misho 195: /* Go Encapsulate variables ... */
1.4.2.1 misho 196: v = (ait_val_t*) (buf + Limit);
1.4.2.2 ! misho 197: if (argc * sizeof(ait_val_t) > s->srv_netbuf - Limit) {
1.3 misho 198: for (i = 0; i < argc; i++)
1.4.2.1 misho 199: AIT_FREE_VAL(&vals[i]);
1.4 misho 200: rpc_srv_freeVars(f);
1.3 misho 201: vals = NULL;
202: argc = 0;
203: ret = -7;
204: rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet values (-7) ...\n");
205: goto makeReply;
206: } else
1.4.2.1 misho 207: Limit += argc * sizeof(ait_val_t);
208: memcpy(v, vals, argc * sizeof(ait_val_t));
1.4.2.2 ! misho 209:
! 210: /* RPC send variables types are OK! */
1.4.2.1 misho 211: data = (u_char*) v + argc * sizeof(ait_val_t);
1.1 misho 212: for (ret = i = 0; i < argc; i++) {
1.4.2.2 ! misho 213: switch (AIT_TYPE(&vals[i])) {
1.1 misho 214: case buffer:
1.4.2.2 ! misho 215: if (ret || Limit + vals[i].val_len > s->srv_netbuf) {
1.1 misho 216: rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");
1.3 misho 217: rrpc->ret_retcode = ret = -7;
218: rrpc->ret_argc = 0;
1.1 misho 219: break;
220: }
221:
222: memcpy(data, vals[i].val.buffer, vals[i].val_len);
1.4.2.2 ! misho 223: data += AIT_LEN(&vals[i]);
! 224: Limit += AIT_LEN(&vals[i]);
1.1 misho 225: break;
226: case string:
1.4.2.2 ! misho 227: if (ret || Limit + vals[i].val_len > s->srv_netbuf) {
1.1 misho 228: rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");
1.3 misho 229: rrpc->ret_retcode = ret = -7;
230: rrpc->ret_argc = 0;
1.1 misho 231: break;
232: }
233:
1.3 misho 234: memcpy(data, vals[i].val.string, vals[i].val_len);
1.4.2.2 ! misho 235: data += AIT_LEN(&vals[i]);
! 236: Limit += AIT_LEN(&vals[i]);
1.1 misho 237: break;
1.2 misho 238: case blob:
239: if (s->srv_blob.state == disable) {
240: rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");
1.3 misho 241: rrpc->ret_retcode = ret = -5;
242: rrpc->ret_argc = 0;
1.1 misho 243: break;
244: }
1.4 misho 245: if (s->srv_blob.state == kill) {
246: rpc_SetErr(ENOTSUP, "Error:: BLOB server is gone.\n");
247: rrpc->ret_retcode = ret = -5;
248: rrpc->ret_argc = 0;
249: break;
250: }
1.1 misho 251: default:
252: break;
253: }
254:
1.4.2.2 ! misho 255: /* don't add zero copy at this position, because buffer/string must be freed! */
1.4.2.1 misho 256: AIT_FREE_VAL(&vals[i]);
1.1 misho 257: }
1.4 misho 258: rpc_srv_freeVars(f);
1.3 misho 259: vals = NULL;
260: argc = 0;
1.1 misho 261: }
262:
1.4 misho 263: ret = send(c->cli_sock, buf, Limit, 0);
264: if (ret == -1) {
1.1 misho 265: LOGERR;
266: ret = -8;
267: break;
268: }
269: if (ret != Limit) {
1.2 misho 270: rpc_SetErr(ECANCELED, "Error:: in send RPC request, should be send %d bytes, "
1.1 misho 271: "really is %d\n", Limit, ret);
272: ret = -9;
273: break;
274: }
1.4 misho 275: } while (ret > -1 || s->srv_kill != kill);
1.1 misho 276:
277: shutdown(c->cli_sock, SHUT_RDWR);
278: close(c->cli_sock);
279: memset(c, 0, sizeof(rpc_cli_t));
1.4.2.2 ! misho 280: free(buf);
1.2 misho 281: return (void*) (long)ret;
282: }
283:
284:
285: static void *
286: rpc_srv_dispatchVars(void *arg)
287: {
288: rpc_cli_t *c = arg;
289: rpc_srv_t *s;
290: rpc_blob_t *b;
1.4 misho 291: int ret = 0;
1.2 misho 292: fd_set fds;
293: u_char buf[sizeof(struct tagBLOBHdr)];
294: struct tagBLOBHdr *blob;
295:
296: if (!arg) {
297: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");
298: return NULL;
299: } else
300: s = c->cli_parent;
301:
302: do {
1.4 misho 303: /* check for disable service at this moment? */
304: if (s->srv_blob.state == disable && s->srv_kill != kill) {
305: usleep(100000);
306: #ifdef HAVE_PTHREAD_YIELD
307: pthread_yield();
308: #endif
309: continue;
1.2 misho 310: }
311:
312: FD_ZERO(&fds);
313: FD_SET(c->cli_sock, &fds);
314: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
315: if (ret == -1) {
1.4 misho 316: if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill)
317: continue;
318:
319: LOGERR;
1.2 misho 320: ret = -2;
1.4 misho 321: break;
1.2 misho 322: }
323:
324: memset(buf, 0, sizeof buf);
1.4 misho 325: ret = recv(c->cli_sock, buf, sizeof buf, 0);
326: if (ret == -1) {
1.2 misho 327: LOGERR;
328: ret = -3;
329: break;
330: }
1.4 misho 331: /* receive EOF, disable or kill service */
332: if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {
1.2 misho 333: ret = 0;
334: break;
335: }
336: if (ret < sizeof(struct tagBLOBHdr)) {
337: rpc_SetErr(EMSGSIZE, "Error:: too short BLOB packet ...\n");
338: ret = -4;
339: break;
340: } else
341: blob = (struct tagBLOBHdr*) buf;
1.4 misho 342: /* check BLOB packet session info */
1.2 misho 343: if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) {
344: rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n");
345: ret = -5;
346: goto makeReply;
347: }
1.4 misho 348: /* Go to proceed packet ... */
1.2 misho 349: switch (blob->hdr_cmd) {
350: case get:
351: if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) {
352: rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",
353: blob->hdr_var);
354: ret = -6;
355: break;
356: } else
357: blob->hdr_len = b->blob_len;
358:
359: if (rpc_srv_blobMap(s, b) != -1) {
360: ret = rpc_srv_sendBLOB(c, b);
361: rpc_srv_blobUnmap(b);
362: } else
363: ret = -7;
364: break;
365: case set:
366: if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) {
1.4 misho 367: /* set new BLOB variable for reply :) */
1.2 misho 368: blob->hdr_var = b->blob_var;
369:
370: ret = rpc_srv_recvBLOB(c, b);
371: rpc_srv_blobUnmap(b);
372: } else
373: ret = -7;
374: break;
375: case unset:
376: ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);
377: if (ret == -1)
378: ret = -7;
379: break;
380: default:
381: rpc_SetErr(EINVAL, "Error:: unsupported BLOB command (%d)...\n",
382: blob->hdr_cmd);
383: ret = -7;
384: }
385:
386: makeReply:
1.4 misho 387: /* Replay to client! */
1.2 misho 388: blob->hdr_cmd = ret < 0 ? error : ok;
389: blob->hdr_ret = ret;
1.4 misho 390: ret = send(c->cli_sock, buf, sizeof buf, 0);
391: if (ret == -1) {
1.2 misho 392: LOGERR;
393: ret = -8;
394: break;
395: }
396: if (ret != sizeof buf) {
397: rpc_SetErr(ECANCELED, "Error:: in send BLOB reply, should be send %d bytes, "
398: "really is %d\n", sizeof buf, ret);
399: ret = -9;
400: break;
401: }
1.4 misho 402: } while (ret > -1 || s->srv_kill != kill);
1.2 misho 403:
404: shutdown(c->cli_sock, SHUT_RDWR);
405: close(c->cli_sock);
406: memset(c, 0, sizeof(rpc_cli_t));
407: return (void*) (long)ret;
1.1 misho 408: }
409:
410: // -------------------------------------------------
411:
412: /*
1.2 misho 413: * rpc_srv_initBLOBServer() Init & create BLOB Server
1.4 misho 414: * @srv = RPC server instance
1.2 misho 415: * @Port = Port for bind server, if Port == 0 default port is selected
416: * @diskDir = Disk place for BLOB file objects
417: * return: -1 == error or 0 bind and created BLOB server instance
418: */
419: int
420: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
421: {
422: int n = 1;
1.4 misho 423: struct sockaddr sa;
424: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
425: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
426: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
1.2 misho 427:
428: if (!srv) {
429: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");
430: return -1;
431: }
432: if (srv->srv_blob.state) {
433: rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");
434: return 0;
435: }
436: if (!Port)
437: Port = RPC_DEFPORT + 1;
438:
439: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
440: if (access(diskDir, R_OK | W_OK) == -1) {
441: LOGERR;
442: return -1;
443: } else
444: strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);
445:
446: srv->srv_blob.server.cli_tid = pthread_self();
447: srv->srv_blob.server.cli_parent = srv;
1.4 misho 448:
449: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
450: switch (srv->srv_server.cli_sa.sa_family) {
451: case AF_INET:
452: sin->sin_port = htons(Port);
453: memcpy(&srv->srv_blob.server.cli_sa, sin, sizeof(struct sockaddr));
454: break;
455: case AF_INET6:
456: sin6->sin6_port = htons(Port);
457: memcpy(&srv->srv_blob.server.cli_sa, sin6, sizeof(struct sockaddr));
458: break;
459: case AF_LOCAL:
460: strlcat(sun->sun_path, ".blob", sizeof sun->sun_path);
461: memcpy(&srv->srv_blob.server.cli_sa, sun, sizeof(struct sockaddr));
462: break;
463: default:
464: return -1;
1.2 misho 465: }
466:
1.4 misho 467: /* create BLOB server socket */
1.2 misho 468: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa_family, SOCK_STREAM, 0);
469: if (srv->srv_blob.server.cli_sock == -1) {
470: LOGERR;
471: return -1;
472: }
473: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
474: LOGERR;
475: close(srv->srv_blob.server.cli_sock);
476: return -1;
477: }
1.4.2.2 ! misho 478: n = srv->srv_netbuf;
! 479: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
! 480: LOGERR;
! 481: close(srv->srv_blob.server.cli_sock);
! 482: return -1;
! 483: }
! 484: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
! 485: LOGERR;
! 486: close(srv->srv_blob.server.cli_sock);
! 487: return -1;
! 488: }
1.2 misho 489: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa,
490: sizeof srv->srv_blob.server.cli_sa) == -1) {
491: LOGERR;
492: close(srv->srv_blob.server.cli_sock);
493: return -1;
494: }
495:
1.4 misho 496: /* allocate pool for concurent clients */
1.2 misho 497: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
498: if (!srv->srv_blob.clients) {
499: LOGERR;
500: close(srv->srv_blob.server.cli_sock);
501: return -1;
502: } else
503: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
504:
505: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
506:
507: pthread_mutex_lock(&srv->srv_mtx);
508: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
509: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 0);
510: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 0);
511: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 1);
512: pthread_mutex_unlock(&srv->srv_mtx);
513:
1.4 misho 514: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 515: return 0;
516: }
517:
518: /*
519: * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources
520: * @srv = RPC Server instance
521: * return: none
522: */
523: void
524: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
525: {
526: rpc_cli_t *c;
527: register int i;
528: rpc_blob_t *f;
529:
530: if (!srv) {
531: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
532: return;
533: } else
1.4 misho 534: srv->srv_blob.state = kill;
1.2 misho 535:
536: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
537: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
538: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
539: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
540:
1.4 misho 541: /* close all clients connections & server socket */
1.2 misho 542: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
543: if (c->cli_sa.sa_family)
544: shutdown(c->cli_sock, SHUT_RDWR);
545: close(srv->srv_blob.server.cli_sock);
546:
547: if (srv->srv_blob.clients) {
548: free(srv->srv_blob.clients);
549: srv->srv_blob.clients = NULL;
550: }
551:
1.4 misho 552: /* detach blobs */
1.2 misho 553: pthread_mutex_lock(&srv->srv_blob.mtx);
554: while ((f = srv->srv_blob.blobs)) {
555: srv->srv_blob.blobs = f->blob_next;
556: rpc_srv_blobFree(srv, f);
557: free(f);
558: }
559: pthread_mutex_unlock(&srv->srv_blob.mtx);
560:
561: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
562: pthread_mutex_destroy(&srv->srv_blob.mtx);
563: }
564:
565: /*
566: * rpc_srv_execBLOBServer() Execute Main BLOB server loop and wait for clients requests
567: * @srv = RPC Server instance
568: * return: -1 error or 0 ok, infinite loop ...
569: */
570: int
571: rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)
572: {
573: socklen_t salen = sizeof(struct sockaddr);
574: register int i;
575: rpc_cli_t *c;
576: fd_set fds;
577: int ret;
578: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
579:
1.4 misho 580: if (!srv || srv->srv_blob.state == kill) {
1.2 misho 581: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
582: return -1;
583: }
584:
585: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
586: LOGERR;
587: return -1;
588: }
589:
1.4 misho 590: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 591: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
592: if (!c->cli_sa.sa_family)
593: break;
594: if (i >= srv->srv_numcli) {
595: usleep(1000000);
1.4.2.2 ! misho 596: #ifdef HAVE_PTHREAD_YIELD
! 597: pthread_yield();
! 598: #endif
1.2 misho 599: continue;
600: }
601:
602: FD_ZERO(&fds);
603: FD_SET(srv->srv_blob.server.cli_sock, &fds);
604: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
605: if (ret == -1) {
606: LOGERR;
607: ret = 1;
608: break;
609: }
610: if (!ret)
611: continue;
612:
613: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa, &salen);
614: if (c->cli_sock == -1) {
615: LOGERR;
616: continue;
617: } else
618: c->cli_parent = srv;
619:
620: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {
621: LOGERR;
622: continue;
1.4 misho 623: } else
624: pthread_detach(c->cli_tid);
1.2 misho 625: }
626:
627: srv->srv_blob.state = disable;
628:
629: return 0;
630: }
631:
632:
633: /*
1.1 misho 634: * rpc_srv_initServer() Init & create RPC Server
635: * @regProgID = ProgramID for authentication & recognition
636: * @regProcID = ProcessID for authentication & recognition
637: * @concurentClients = Concurent clients at same time to this server
1.4.2.2 ! misho 638: * @netBuf = Network buffer length, if =0 == BUFSIZ
1.4 misho 639: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
640: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 641: * @Port = Port for bind server, if Port == 0 default port is selected
642: * return: NULL == error or !=NULL bind and created RPC server instance
643: */
644: rpc_srv_t *
645: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.4.2.2 ! misho 646: int netBuf, u_short family, const char *csHost, u_short Port)
1.1 misho 647: {
648: rpc_srv_t *srv = NULL;
649: int n = 1;
650: struct hostent *host = NULL;
1.4 misho 651: struct sockaddr sa;
652: struct sockaddr_in *sin = (struct sockaddr_in*) &sa;
653: struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;
654: struct sockaddr_un *sun = (struct sockaddr_un*) &sa;
1.1 misho 655:
1.4 misho 656: if (!concurentClients || !regProgID ||
657: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 658: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
659: return NULL;
660: }
661: if (!Port)
662: Port = RPC_DEFPORT;
1.4.2.2 ! misho 663: if (!netBuf)
! 664: netBuf = BUFSIZ;
1.4 misho 665: if (csHost && family != AF_LOCAL) {
1.1 misho 666: host = gethostbyname2(csHost, family);
667: if (!host) {
668: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
669: return NULL;
670: }
671: }
1.4 misho 672: memset(&sa, 0, sizeof sa);
673: sa.sa_family = family;
1.1 misho 674: switch (family) {
675: case AF_INET:
1.4 misho 676: sin->sin_len = sizeof(struct sockaddr_in);
677: sin->sin_port = htons(Port);
1.1 misho 678: if (csHost)
1.4 misho 679: memcpy(&sin->sin_addr, host->h_addr, host->h_length);
1.1 misho 680: break;
681: case AF_INET6:
1.4 misho 682: sin6->sin6_len = sizeof(struct sockaddr_in6);
683: sin6->sin6_port = htons(Port);
684: if (csHost)
685: memcpy(&sin6->sin6_addr, host->h_addr, host->h_length);
686: break;
687: case AF_LOCAL:
688: sun->sun_len = sizeof(struct sockaddr_un);
1.1 misho 689: if (csHost)
1.4 misho 690: strlcpy(sun->sun_path, csHost, sizeof sun->sun_path);
1.1 misho 691: break;
692: default:
693: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
694: return NULL;
695: }
696:
697: srv = malloc(sizeof(rpc_srv_t));
698: if (!srv) {
699: LOGERR;
700: return NULL;
701: } else
702: memset(srv, 0, sizeof(rpc_srv_t));
703:
1.4.2.2 ! misho 704: srv->srv_netbuf = netBuf;
1.1 misho 705: srv->srv_numcli = concurentClients;
706: srv->srv_session.sess_version = RPC_VERSION;
707: srv->srv_session.sess_program = regProgID;
708: srv->srv_session.sess_process = regProcID;
709:
1.2 misho 710: srv->srv_server.cli_tid = pthread_self();
1.1 misho 711: srv->srv_server.cli_parent = srv;
1.4 misho 712: switch (family) {
713: case AF_INET:
714: memcpy(&srv->srv_server.cli_sa, sin, sizeof srv->srv_server.cli_sa);
715: break;
716: case AF_INET6:
717: memcpy(&srv->srv_server.cli_sa, sin6, sizeof srv->srv_server.cli_sa);
718: break;
719: case AF_LOCAL:
720: memcpy(&srv->srv_server.cli_sa, sun, sizeof srv->srv_server.cli_sa);
721: unlink(sun->sun_path);
722: break;
723: }
724:
725: /* create server socket */
1.1 misho 726: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
727: if (srv->srv_server.cli_sock == -1) {
728: LOGERR;
729: free(srv);
730: return NULL;
731: }
732: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
733: LOGERR;
734: close(srv->srv_server.cli_sock);
735: free(srv);
736: return NULL;
737: }
1.4.2.2 ! misho 738: n = srv->srv_netbuf;
! 739: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
! 740: LOGERR;
! 741: close(srv->srv_server.cli_sock);
! 742: free(srv);
! 743: return NULL;
! 744: }
! 745: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
! 746: LOGERR;
! 747: close(srv->srv_server.cli_sock);
! 748: free(srv);
! 749: return NULL;
! 750: }
1.1 misho 751: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {
752: LOGERR;
753: close(srv->srv_server.cli_sock);
754: free(srv);
755: return NULL;
756: }
757:
1.4 misho 758: /* allocate pool for concurent clients */
1.1 misho 759: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
760: if (!srv->srv_clients) {
761: LOGERR;
762: close(srv->srv_server.cli_sock);
763: free(srv);
764: return NULL;
765: } else
766: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
767:
1.2 misho 768: pthread_mutex_init(&srv->srv_mtx, NULL);
769:
770: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.1 misho 771: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0);
772: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0);
773: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0);
774: return srv;
775: }
776:
777: /*
778: * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources
779: * @srv = RPC Server instance
780: * return: none
781: */
782: void
783: rpc_srv_endServer(rpc_srv_t * __restrict srv)
784: {
785: rpc_cli_t *c;
786: register int i;
787: rpc_func_t *f;
788:
789: if (!srv) {
790: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
791: return;
792: }
793:
1.2 misho 794: rpc_srv_endBLOBServer(srv);
1.1 misho 795:
1.4 misho 796: /* close all clients connections & server socket */
1.1 misho 797: for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)
1.2 misho 798: if (c->cli_sa.sa_family) {
1.1 misho 799: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 800: close(c->cli_sock);
801: }
1.1 misho 802: close(srv->srv_server.cli_sock);
803:
804: if (srv->srv_clients) {
805: free(srv->srv_clients);
1.2 misho 806: srv->srv_clients = NULL;
1.1 misho 807: srv->srv_numcli = 0;
808: }
809:
1.4 misho 810: /* detach exported calls */
1.2 misho 811: pthread_mutex_lock(&srv->srv_mtx);
812: while ((f = srv->srv_funcs)) {
813: srv->srv_funcs = f->func_next;
814: free(f);
815: }
816: pthread_mutex_unlock(&srv->srv_mtx);
817:
818: while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY);
819: pthread_mutex_destroy(&srv->srv_mtx);
820:
1.1 misho 821: free(srv);
822: srv = NULL;
823: }
824:
825: /*
826: * rpc_srv_execServer() Execute Main server loop and wait for clients requests
827: * @srv = RPC Server instance
828: * return: -1 error or 0 ok, infinite loop ...
829: */
830: int
831: rpc_srv_execServer(rpc_srv_t * __restrict srv)
832: {
833: socklen_t salen = sizeof(struct sockaddr);
834: register int i;
835: rpc_cli_t *c;
1.2 misho 836: fd_set fds;
837: int ret;
838: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.1 misho 839:
840: if (!srv) {
841: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
842: return -1;
843: }
844:
845: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
846: LOGERR;
847: return -1;
848: }
849:
1.4 misho 850: while (srv->srv_kill != kill) {
1.1 misho 851: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
852: if (!c->cli_sa.sa_family)
853: break;
1.2 misho 854: if (i >= srv->srv_numcli) {
1.1 misho 855: usleep(1000000);
1.4.2.2 ! misho 856: #ifdef HAVE_PTHREAD_YIELD
! 857: pthread_yield();
! 858: #endif
1.1 misho 859: continue;
860: }
1.2 misho 861:
862: FD_ZERO(&fds);
863: FD_SET(srv->srv_server.cli_sock, &fds);
864: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
865: if (ret == -1) {
866: LOGERR;
867: ret = 1;
868: break;
869: }
870: if (!ret)
871: continue;
872:
1.1 misho 873: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen);
874: if (c->cli_sock == -1) {
875: LOGERR;
876: continue;
877: } else
878: c->cli_parent = srv;
879:
880: if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {
881: LOGERR;
882: continue;
1.4 misho 883: } else
884: pthread_detach(c->cli_tid);
1.1 misho 885: }
886:
887: return 0;
888: }
889:
890: // ---------------------------------------------------------
891:
892: /*
893: * rpc_srv_execCall() Execute registered call from RPC server
894: * @call = Register RPC call
895: * @rpc = IN RPC call structure
896: * @args = IN RPC call array of rpc values
897: * return: -1 error, !=-1 ok
898: */
899: int
1.2 misho 900: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.4.2.1 misho 901: ait_val_t * __restrict args)
1.1 misho 902: {
903: void *dl;
904: rpc_callback_t func;
905: int ret;
906:
1.2 misho 907: if (!call || !rpc || !call->func_parent) {
1.1 misho 908: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");
909: return -1;
910: }
911:
912: dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);
913: if (!dl) {
914: rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror());
915: return -1;
916: }
917:
918: func = dlsym(dl, (char*) call->func_name);
919: if (func)
1.2 misho 920: ret = func(call, rpc->call_argc, args);
1.1 misho 921: else {
922: rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());
923: ret = -1;
924: }
925:
926: dlclose(dl);
927: return ret;
928: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>