1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: srv.c,v 1.6.2.7 2012/03/14 13:29:11 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: static void *rxPacket(sched_task_t*);
50:
51: static void *
52: txPacket(sched_task_t *task)
53: {
54: rpc_cli_t *c = TASK_ARG(task);
55: rpc_srv_t *s = c->cli_parent;
56: rpc_func_t *f = NULL;
57: u_char *buf = TASK_DATA(task);
58: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
59: int ret, wlen = sizeof(struct tagRPCCall);
60: array_t *arr = NULL;
61:
62: FTRACE();
63:
64: if (rpc->call_argc) {
65: f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash));
66: if (!f) {
67: rpc->call_argc ^= rpc->call_argc;
68: rpc->call_rep.ret = RPC_ERROR(-1);
69: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
70: } else {
71: rpc->call_argc = htons(rpc_srv_getVars(f, &arr));
72: /* Go Encapsulate variables */
73: ret = io_vars2buffer(buf + wlen, TASK_DATLEN(task) - wlen, arr);
74: io_clrVars(f->func_vars);
75: if (ret == -1) {
76: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
77: rpc->call_argc ^= rpc->call_argc;
78: rpc->call_rep.ret = RPC_ERROR(-1);
79: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
80: } else
81: wlen += ret;
82: }
83: }
84:
85: /* calculate CRC */
86: rpc->call_crc ^= rpc->call_crc;
87: rpc->call_crc = htons(crcFletcher16((u_short*) buf, ((wlen + 1) & ~1) / 2));
88:
89: /* send reply */
90: ret = send(TASK_FD(task), buf, wlen, 0);
91: if (ret == -1)
92: LOGERR;
93: else if (ret != wlen)
94: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
95: "really sended %d bytes", wlen, ret);
96: else
97: LOGGER("Sended %d bytes", ret);
98:
99: /* lets get next packet */
100: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
101: TASK_DATA(task), TASK_DATLEN(task));
102: return NULL;
103: }
104:
105: static void *
106: execCall(sched_task_t *task)
107: {
108: rpc_cli_t *c = TASK_ARG(task);
109: rpc_srv_t *s = c->cli_parent;
110: rpc_func_t *f = NULL;
111: array_t *arr = NULL;
112: u_char *buf = TASK_DATA(task);
113: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
114: int argc = ntohs(rpc->call_argc);
115:
116: FTRACE();
117:
118: /* Go decapsulate variables ... */
119: if (!(rpc->call_req.flags & RPC_NOREPLY) && argc) {
120: arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),
121: TASK_DATLEN(task) - sizeof(struct tagRPCCall), argc, 1);
122: if (!arr) {
123: rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
124: rpc->call_argc ^= rpc->call_argc;
125: rpc->call_rep.ret = RPC_ERROR(-1);
126: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
127: return NULL;
128: }
129: }
130:
131: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
132: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
133: rpc->call_argc ^= rpc->call_argc;
134: rpc->call_rep.ret = RPC_ERROR(-1);
135: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
136: } else {
137: LOGGER("RPC function %s from module %s", AIT_GET_STR(&f->func_name),
138: AIT_GET_STR(&f->func_file));
139:
140: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
141: if (rpc->call_rep.ret == htonl(-1)) {
142: rpc->call_rep.eno = RPC_ERROR(errno);
143: rpc->call_argc ^= rpc->call_argc;
144: } else {
145: rpc->call_rep.eno ^= rpc->call_rep.eno;
146: rpc->call_argc = htons(rpc_srv_getVars(f, NULL));
147: }
148: }
149:
150: if (arr)
151: io_arrayDestroy(&arr);
152: return NULL;
153: }
154:
155: static void *
156: rxPacket(sched_task_t *task)
157: {
158: rpc_cli_t *c = TASK_ARG(task);
159: rpc_srv_t *s = c->cli_parent;
160: u_char *buf = TASK_DATA(task);
161: int rlen;
162: u_short crc;
163: struct tagRPCCall *rpc;
164: struct timespec ts;
165:
166: FTRACE();
167:
168: memset(buf, 0, TASK_DATLEN(task));
169: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
170: if (rlen == -1) {
171: LOGERR;
172: s->srv_kill = kill;
173:
174: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
175: TASK_DATA(task), TASK_DATLEN(task));
176: return NULL;
177: } else if (!rlen) { /* receive EOF */
178: s->srv_kill = kill;
179:
180: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
181: TASK_DATA(task), TASK_DATLEN(task));
182: return NULL;
183: } else
184: LOGGER("Readed %d bytes", rlen);
185:
186: if (rlen < sizeof(struct tagRPCCall)) {
187: rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");
188:
189: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
190: TASK_DATA(task), TASK_DATLEN(task));
191: return NULL;
192: } else
193: rpc = (struct tagRPCCall*) buf;
194:
195: /* check integrity of packet */
196: crc = ntohs(rpc->call_crc);
197: rpc->call_crc ^= rpc->call_crc;
198: if (crc != crcFletcher16((u_short*) buf, ((rlen + 1) & ~1) / 2)) {
199: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
200:
201: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
202: TASK_DATA(task), TASK_DATLEN(task));
203: return NULL;
204: }
205:
206: /* check RPC packet session info */
207: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
208: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
209: rpc->call_argc ^= rpc->call_argc;
210: rpc->call_rep.ret = RPC_ERROR(-1);
211: rpc->call_rep.eno = RPC_ERROR(errno);
212: goto end;
213: } else {
214: /* change socket timeout from last packet */
215: ts.tv_sec = rpc->call_session.sess_timeout;
216: ts.tv_nsec = 0;
217: schedPolling(TASK_ROOT(task), &ts, NULL);
218: }
219:
220: /* execute RPC call */
221: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), 0,
222: TASK_DATA(task), TASK_DATLEN(task));
223:
224: end:
225: /* send RPC reply */
226: if (!(rpc->call_req.flags & RPC_NOREPLY))
227: schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task),
228: TASK_DATA(task), TASK_DATLEN(task));
229: else
230: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
231: TASK_DATA(task), TASK_DATLEN(task));
232: return NULL;
233: }
234:
235: static void *
236: rpc_srv_dispatchCall(void *arg)
237: {
238: rpc_cli_t *c = arg;
239: rpc_srv_t *s;
240: u_char *buf;
241: sched_root_task_t *root;
242: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
243:
244: FTRACE();
245:
246: if (!arg) {
247: rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client");
248: return NULL;
249: } else
250: s = c->cli_parent;
251:
252: /* allocate net buffer */
253: buf = malloc(s->srv_netbuf);
254: if (!buf) {
255: LOGERR;
256: return NULL;
257: }
258:
259: root = schedBegin();
260: if (!root) {
261: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
262: free(buf);
263: return NULL;
264: } else {
265: schedTermCondition(root, kill);
266: schedPolling(root, &ts, NULL);
267: }
268:
269: schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf);
270:
271: schedRun(root, (void*) &s->srv_kill);
272: schedEnd(&root);
273:
274: shutdown(c->cli_sock, SHUT_RDWR);
275: close(c->cli_sock);
276: memset(c, 0, sizeof(rpc_cli_t));
277: free(buf);
278: return NULL;
279: }
280:
281:
282: static void *
283: rpc_srv_dispatchVars(void *arg)
284: {
285: rpc_cli_t *c = arg;
286: rpc_srv_t *s;
287: rpc_blob_t *b;
288: int ret = 0;
289: fd_set fds;
290: u_char buf[sizeof(struct tagBLOBHdr)];
291: struct tagBLOBHdr *blob;
292:
293: FTRACE();
294:
295: if (!arg) {
296: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");
297: return NULL;
298: } else
299: s = c->cli_parent;
300:
301: do {
302: /* check for disable service at this moment? */
303: if (s->srv_blob.state == disable && s->srv_kill != kill) {
304: usleep(100000);
305: #ifdef HAVE_PTHREAD_YIELD
306: pthread_yield();
307: #endif
308: continue;
309: }
310:
311: FD_ZERO(&fds);
312: FD_SET(c->cli_sock, &fds);
313: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
314: if (ret == -1) {
315: if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill)
316: continue;
317:
318: LOGERR;
319: ret = -2;
320: break;
321: }
322:
323: memset(buf, 0, sizeof buf);
324: ret = recv(c->cli_sock, buf, sizeof buf, 0);
325: if (ret == -1) {
326: LOGERR;
327: ret = -3;
328: break;
329: }
330: /* receive EOF, disable or kill service */
331: if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {
332: ret = 0;
333: break;
334: }
335: if (ret < sizeof(struct tagBLOBHdr)) {
336: rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n");
337: ret = -4;
338: if (s->srv_kill != kill && s->srv_blob.state != kill)
339: continue;
340: else
341: break;
342: } else
343: blob = (struct tagBLOBHdr*) buf;
344: /* check BLOB packet session info */
345: if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) {
346: rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n");
347: ret = -5;
348: goto makeReply;
349: }
350: /* Go to proceed packet ... */
351: switch (blob->hdr_cmd) {
352: case get:
353: if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) {
354: rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",
355: blob->hdr_var);
356: ret = -6;
357: break;
358: } else
359: blob->hdr_len = b->blob_len;
360:
361: if (rpc_srv_blobMap(s, b) != -1) {
362: ret = rpc_srv_sendBLOB(c, b);
363: rpc_srv_blobUnmap(b);
364: } else
365: ret = -7;
366: break;
367: case set:
368: if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) {
369: /* set new BLOB variable for reply :) */
370: blob->hdr_var = b->blob_var;
371:
372: ret = rpc_srv_recvBLOB(c, b);
373: rpc_srv_blobUnmap(b);
374: } else
375: ret = -7;
376: break;
377: case unset:
378: ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);
379: if (ret == -1)
380: ret = -7;
381: break;
382: default:
383: rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n",
384: blob->hdr_cmd);
385: ret = -7;
386: }
387:
388: makeReply:
389: /* Replay to client! */
390: blob->hdr_cmd = ret < 0 ? error : ok;
391: blob->hdr_ret = ret;
392: ret = send(c->cli_sock, buf, sizeof buf, 0);
393: if (ret == -1) {
394: LOGERR;
395: ret = -8;
396: break;
397: }
398: if (ret != sizeof buf) {
399: rpc_SetErr(EPROCUNAVAIL, "Error:: in send BLOB reply, should be send %d bytes, "
400: "really is %d\n", sizeof buf, ret);
401: ret = -9;
402: if (s->srv_kill != kill && s->srv_blob.state != kill)
403: continue;
404: else
405: break;
406: }
407: } while (ret > -1 || s->srv_kill != kill);
408:
409: shutdown(c->cli_sock, SHUT_RDWR);
410: close(c->cli_sock);
411: memset(c, 0, sizeof(rpc_cli_t));
412: return (void*) ((long)ret);
413: }
414:
415: // -------------------------------------------------
416:
417: /*
418: * rpc_srv_initBLOBServer() - Init & create BLOB Server
419: *
420: * @srv = RPC server instance
421: * @Port = Port for bind server, if Port == 0 default port is selected
422: * @diskDir = Disk place for BLOB file objects
423: * return: -1 == error or 0 bind and created BLOB server instance
424: */
425: int
426: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
427: {
428: int n = 1;
429: io_sockaddr_t sa;
430:
431: FTRACE();
432:
433: if (!srv) {
434: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
435: return -1;
436: }
437: if (srv->srv_blob.state) {
438: rpc_SetErr(EPERM, "Already started BLOB server!");
439: return 0;
440: }
441:
442: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
443: if (access(diskDir, R_OK | W_OK) == -1) {
444: LOGERR;
445: return -1;
446: } else
447: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
448:
449: srv->srv_blob.server.cli_tid = pthread_self();
450: srv->srv_blob.server.cli_parent = srv;
451:
452: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
453: switch (sa.sa.sa_family) {
454: case AF_INET:
455: sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);
456: break;
457: case AF_INET6:
458: sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);
459: break;
460: case AF_LOCAL:
461: strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
462: break;
463: default:
464: AIT_FREE_VAL(&srv->srv_blob.dir);
465: return -1;
466: }
467: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
468:
469: /* create BLOB server socket */
470: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
471: if (srv->srv_blob.server.cli_sock == -1) {
472: LOGERR;
473: AIT_FREE_VAL(&srv->srv_blob.dir);
474: return -1;
475: }
476: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
477: LOGERR;
478: close(srv->srv_blob.server.cli_sock);
479: AIT_FREE_VAL(&srv->srv_blob.dir);
480: return -1;
481: }
482: n = srv->srv_netbuf;
483: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
484: LOGERR;
485: close(srv->srv_blob.server.cli_sock);
486: AIT_FREE_VAL(&srv->srv_blob.dir);
487: return -1;
488: }
489: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
490: LOGERR;
491: close(srv->srv_blob.server.cli_sock);
492: AIT_FREE_VAL(&srv->srv_blob.dir);
493: return -1;
494: }
495: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
496: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
497: LOGERR;
498: close(srv->srv_blob.server.cli_sock);
499: AIT_FREE_VAL(&srv->srv_blob.dir);
500: return -1;
501: }
502:
503: /* allocate pool for concurent clients */
504: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
505: if (!srv->srv_blob.clients) {
506: LOGERR;
507: close(srv->srv_blob.server.cli_sock);
508: AIT_FREE_VAL(&srv->srv_blob.dir);
509: return -1;
510: } else
511: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
512:
513: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
514:
515: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
516: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
517: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
518: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
519:
520: srv->srv_blob.state = enable; /* enable BLOB */
521: return 0;
522: }
523:
524: /*
525: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
526: *
527: * @srv = RPC Server instance
528: * return: none
529: */
530: void
531: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
532: {
533: rpc_cli_t *c;
534: register int i;
535: rpc_blob_t *f;
536:
537: FTRACE();
538:
539: if (!srv) {
540: rpc_SetErr(EINVAL, "Can`t destroy server because parameter is null!");
541: return;
542: } else
543: srv->srv_blob.state = kill;
544:
545: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
546: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
547: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
548: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
549:
550: AIT_FREE_VAL(&srv->srv_blob.dir);
551:
552: /* close all clients connections & server socket */
553: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
554: if (c->cli_sa.sa.sa_family)
555: shutdown(c->cli_sock, SHUT_RDWR);
556: close(srv->srv_blob.server.cli_sock);
557:
558: pthread_mutex_lock(&srv->srv_blob.mtx);
559: if (srv->srv_blob.clients) {
560: free(srv->srv_blob.clients);
561: srv->srv_blob.clients = NULL;
562: }
563:
564: /* detach blobs */
565: while ((f = srv->srv_blob.blobs)) {
566: srv->srv_blob.blobs = f->blob_next;
567: rpc_srv_blobFree(srv, f);
568: free(f);
569: }
570: pthread_mutex_unlock(&srv->srv_blob.mtx);
571:
572: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
573: pthread_mutex_destroy(&srv->srv_blob.mtx);
574: }
575:
576: /*
577: * rpc_srv_loopBLOB() - Execute Main BLOB server loop and wait for clients requests
578: *
579: * @srv = RPC Server instance
580: * return: -1 error or 0 ok, infinite loop ...
581: */
582: int
583: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
584: {
585: socklen_t salen = sizeof(io_sockaddr_t);
586: register int i;
587: rpc_cli_t *c;
588: fd_set fds;
589: int ret;
590: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
591: pthread_attr_t attr;
592:
593: FTRACE();
594:
595: if (!srv || srv->srv_blob.state == kill) {
596: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
597: return -1;
598: }
599:
600: tv.tv_sec = srv->srv_session.sess_timeout;
601:
602: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
603: LOGERR;
604: return -1;
605: }
606:
607: pthread_attr_init(&attr);
608: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
609:
610: /* main BLOB loop */
611: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
612: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
613: if (!c->cli_sa.sa.sa_family)
614: break;
615: if (i >= srv->srv_numcli) {
616: #ifdef HAVE_PTHREAD_YIELD
617: pthread_yield();
618: #else
619: usleep(1000000);
620: #endif
621: continue;
622: }
623:
624: FD_ZERO(&fds);
625: FD_SET(srv->srv_blob.server.cli_sock, &fds);
626: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
627: if (ret == -1) {
628: LOGERR;
629: ret = 1;
630: break;
631: }
632: if (!ret)
633: continue;
634:
635: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
636: if (c->cli_sock == -1) {
637: LOGERR;
638: continue;
639: } else
640: c->cli_parent = srv;
641:
642: /* spawn dispatch thread for BLOB client */
643: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchVars, c)) {
644: LOGERR;
645: continue;
646: }
647: }
648:
649: srv->srv_blob.state = kill;
650:
651: pthread_attr_destroy(&attr);
652: return 0;
653: }
654:
655:
656: /*
657: * rpc_srv_initServer() - Init & create RPC Server
658: *
659: * @regProgID = ProgramID for authentication & recognition
660: * @regProcID = ProcessID for authentication & recognition
661: * @concurentClients = Concurent clients at same time to this server
662: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
663: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
664: * @csHost = Host name or address for bind server, if NULL any address
665: * @Port = Port for bind server, if Port == 0 default port is selected
666: * return: NULL == error or !=NULL bind and created RPC server instance
667: */
668: rpc_srv_t *
669: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
670: int netBuf, u_short family, const char *csHost, u_short Port)
671: {
672: rpc_srv_t *srv = NULL;
673: int n = 1;
674: struct hostent *host = NULL;
675: io_sockaddr_t sa;
676:
677: FTRACE();
678:
679: if (!concurentClients || !regProgID ||
680: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
681: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
682: return NULL;
683: }
684: if (!Port)
685: Port = RPC_DEFPORT;
686: if (!netBuf)
687: netBuf = BUFSIZ;
688: if (csHost && family != AF_LOCAL) {
689: host = gethostbyname2(csHost, family);
690: if (!host) {
691: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
692: return NULL;
693: }
694: }
695: memset(&sa, 0, sizeof sa);
696: sa.sa.sa_family = family;
697: switch (family) {
698: case AF_INET:
699: sa.sin.sin_len = sizeof(struct sockaddr_in);
700: sa.sin.sin_port = htons(Port);
701: if (csHost)
702: memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length);
703: break;
704: case AF_INET6:
705: sa.sin6.sin6_len = sizeof(struct sockaddr_in6);
706: sa.sin6.sin6_port = htons(Port);
707: if (csHost)
708: memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length);
709: break;
710: case AF_LOCAL:
711: sa.sun.sun_len = sizeof(struct sockaddr_un);
712: if (csHost)
713: strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path);
714: unlink(sa.sun.sun_path);
715: break;
716: default:
717: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
718: return NULL;
719: }
720:
721: srv = malloc(sizeof(rpc_srv_t));
722: if (!srv) {
723: LOGERR;
724: return NULL;
725: } else
726: memset(srv, 0, sizeof(rpc_srv_t));
727:
728: srv->srv_netbuf = netBuf;
729: srv->srv_numcli = concurentClients;
730: srv->srv_session.sess_version = RPC_VERSION;
731: srv->srv_session.sess_timeout = DEF_RPC_TIMEOUT;
732: srv->srv_session.sess_program = regProgID;
733: srv->srv_session.sess_process = regProcID;
734:
735: srv->srv_server.cli_tid = pthread_self();
736: srv->srv_server.cli_parent = srv;
737: memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
738:
739: /* create server socket */
740: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
741: if (srv->srv_server.cli_sock == -1) {
742: LOGERR;
743: free(srv);
744: return NULL;
745: }
746: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
747: LOGERR;
748: close(srv->srv_server.cli_sock);
749: free(srv);
750: return NULL;
751: }
752: n = srv->srv_netbuf;
753: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
754: LOGERR;
755: close(srv->srv_server.cli_sock);
756: free(srv);
757: return NULL;
758: }
759: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
760: LOGERR;
761: close(srv->srv_server.cli_sock);
762: free(srv);
763: return NULL;
764: }
765: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
766: srv->srv_server.cli_sa.sa.sa_len) == -1) {
767: LOGERR;
768: close(srv->srv_server.cli_sock);
769: free(srv);
770: return NULL;
771: }
772:
773: /* allocate pool for concurent clients */
774: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
775: if (!srv->srv_clients) {
776: LOGERR;
777: close(srv->srv_server.cli_sock);
778: free(srv);
779: return NULL;
780: } else
781: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
782:
783: pthread_mutex_init(&srv->srv_mtx, NULL);
784:
785: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
786: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
787: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
788: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
789: return srv;
790: }
791:
792: /*
793: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
794: *
795: * @psrv = RPC Server instance
796: * return: none
797: */
798: void
799: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
800: {
801: rpc_cli_t *c;
802: register int i;
803: rpc_func_t *f;
804:
805: FTRACE();
806:
807: if (!psrv || !*psrv) {
808: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
809: return;
810: }
811:
812: rpc_srv_endBLOBServer(*psrv);
813:
814: /* close all clients connections & server socket */
815: for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
816: if (c->cli_sa.sa.sa_family) {
817: shutdown(c->cli_sock, SHUT_RDWR);
818: close(c->cli_sock);
819: }
820: close((*psrv)->srv_server.cli_sock);
821:
822: if ((*psrv)->srv_clients) {
823: free((*psrv)->srv_clients);
824: (*psrv)->srv_clients = NULL;
825: (*psrv)->srv_numcli = 0;
826: }
827:
828: /* detach exported calls */
829: pthread_mutex_lock(&(*psrv)->srv_mtx);
830: while ((f = (*psrv)->srv_funcs)) {
831: (*psrv)->srv_funcs = f->func_next;
832: io_freeVars(&f->func_vars);
833: AIT_FREE_VAL(&f->func_name);
834: AIT_FREE_VAL(&f->func_file);
835: free(f);
836: }
837: pthread_mutex_unlock(&(*psrv)->srv_mtx);
838:
839: while (pthread_mutex_trylock(&(*psrv)->srv_mtx) == EBUSY);
840: pthread_mutex_destroy(&(*psrv)->srv_mtx);
841:
842: free(*psrv);
843: *psrv = NULL;
844: }
845:
846: /*
847: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
848: *
849: * @srv = RPC Server instance
850: * return: -1 error or 0 ok, infinite loop ...
851: */
852: int
853: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
854: {
855: socklen_t salen = sizeof(io_sockaddr_t);
856: register int i;
857: rpc_cli_t *c;
858: fd_set fds;
859: int ret;
860: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
861: pthread_attr_t attr;
862:
863: FTRACE();
864:
865: if (!srv) {
866: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
867: return -1;
868: }
869:
870: tv.tv_sec = srv->srv_session.sess_timeout;
871:
872: /* activate BLOB server worker if srv->srv_blob.state == enable */
873: rpc_srv_execBLOBServer(srv);
874:
875: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
876: LOGERR;
877: return -1;
878: }
879:
880: pthread_attr_init(&attr);
881: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
882:
883: /* main rpc loop */
884: while (srv->srv_kill != kill) {
885: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
886: if (!c->cli_sa.sa.sa_family)
887: break;
888: if (i >= srv->srv_numcli) {
889: #ifdef HAVE_PTHREAD_YIELD
890: pthread_yield();
891: #else
892: usleep(1000000);
893: #endif
894: continue;
895: }
896:
897: FD_ZERO(&fds);
898: FD_SET(srv->srv_server.cli_sock, &fds);
899: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
900: if (ret == -1) {
901: LOGERR;
902: ret = 1;
903: break;
904: }
905: if (!ret)
906: continue;
907:
908: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
909: if (c->cli_sock == -1) {
910: LOGERR;
911: continue;
912: } else
913: c->cli_parent = srv;
914:
915: /* spawn rpc client dispatcher */
916: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchCall, c)) {
917: LOGERR;
918: continue;
919: }
920: }
921:
922: pthread_attr_destroy(&attr);
923: return 0;
924: }
925:
926: // ---------------------------------------------------------
927:
928: /*
929: * rpc_srv_execCall() Execute registered call from RPC server
930: *
931: * @call = Register RPC call
932: * @rpc = IN RPC call structure
933: * @args = IN RPC calling arguments from RPC client
934: * return: -1 error, !=-1 ok
935: */
936: int
937: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
938: array_t * __restrict args)
939: {
940: void *dl;
941: rpc_callback_t func;
942: int ret;
943:
944: FTRACE();
945:
946: if (!call || !rpc || !call->func_parent) {
947: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
948: return -1;
949: }
950:
951: dl = dlopen(AIT_VOID(&call->func_file), RTLD_NOW);
952: if (!dl) {
953: rpc_SetErr(ENOENT, "Can`t attach module %s!", dlerror());
954: return -1;
955: }
956:
957: func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name));
958: if (func)
959: ret = func(call, ntohs(rpc->call_argc), args);
960: else {
961: rpc_SetErr(ENOEXEC, "Can`t find function %s!", dlerror());
962: ret = -1;
963: }
964:
965: dlclose(dl);
966: return ret;
967: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>