Annotation of libaitrpc/src/srv.c, revision 1.7.2.3
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.7.2.3 ! misho 6: * $Id: srv.c,v 1.7.2.2 2012/03/28 13:37:13 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.7 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.7 misho 49: static void *rxPacket(sched_task_t*);
50: static void *rxBLOB(sched_task_t*);
51:
52: static void *
53: txPacket(sched_task_t *task)
54: {
55: rpc_cli_t *c = TASK_ARG(task);
56: rpc_srv_t *s = c->cli_parent;
57: rpc_func_t *f = NULL;
58: u_char *buf = TASK_DATA(task);
59: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
60: int ret, wlen = sizeof(struct tagRPCCall);
61: array_t *arr = NULL;
62:
63: ioTRACE(RPC_TRACE_LEVEL);
64:
65: if (rpc->call_argc) {
66: f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash));
67: if (!f) {
68: rpc->call_argc ^= rpc->call_argc;
69: rpc->call_rep.ret = RPC_ERROR(-1);
70: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
71: } else {
72: rpc->call_argc = htons(rpc_srv_getVars(f, &arr));
73: /* Go Encapsulate variables */
74: ret = io_vars2buffer(buf + wlen, TASK_DATLEN(task) - wlen, arr);
75: io_clrVars(f->func_vars);
76: if (ret == -1) {
77: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
78: rpc->call_argc ^= rpc->call_argc;
79: rpc->call_rep.ret = RPC_ERROR(-1);
80: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
81: } else
82: wlen += ret;
83: }
84: }
85:
1.7.2.3 ! misho 86: rpc->call_len = htons(wlen);
! 87:
1.7 misho 88: /* calculate CRC */
89: rpc->call_crc ^= rpc->call_crc;
90: rpc->call_crc = htons(crcFletcher16((u_short*) buf, io_align(wlen, 1) / 2));
91:
92: /* send reply */
93: ret = send(TASK_FD(task), buf, wlen, 0);
94: if (ret == -1)
95: LOGERR;
96: else if (ret != wlen)
97: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
98: "really sended %d bytes", wlen, ret);
99: else
100: ioDEBUG(RPC_DEBUG_LEVEL, "Sended %d bytes", ret);
101:
102: return NULL;
103: }
104:
105: static void *
106: execCall(sched_task_t *task)
107: {
108: rpc_cli_t *c = TASK_ARG(task);
109: rpc_srv_t *s = c->cli_parent;
110: rpc_func_t *f = NULL;
111: array_t *arr = NULL;
1.7.2.3 ! misho 112: u_char *buf = TASK_DATA(task) + TASK_VAL(task);
1.7 misho 113: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
114: int argc = ntohs(rpc->call_argc);
115:
116: ioTRACE(RPC_TRACE_LEVEL);
117:
118: /* Go decapsulate variables ... */
1.7.2.1 misho 119: if (argc) {
1.7 misho 120: arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),
1.7.2.3 ! misho 121: TASK_DATLEN(task) - TASK_VAL(task) - sizeof(struct tagRPCCall),
! 122: argc, 1);
1.7 misho 123: if (!arr) {
124: rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
125: rpc->call_argc ^= rpc->call_argc;
126: rpc->call_rep.ret = RPC_ERROR(-1);
127: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
128: return NULL;
129: }
130: }
131:
132: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
133: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
134: rpc->call_argc ^= rpc->call_argc;
135: rpc->call_rep.ret = RPC_ERROR(-1);
136: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
137: } else {
138: ioDEBUG(RPC_DEBUG_LEVEL, "RPC function %s from module %s",
139: AIT_GET_STR(&f->func_name), AIT_GET_LIKE(&f->func_file, char*));
140:
1.7.2.1 misho 141: /* if client doesn't want reply */
142: argc = rpc->call_req.flags & RPC_NOREPLY;
1.7 misho 143: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
144: if (rpc->call_rep.ret == htonl(-1)) {
145: rpc->call_rep.eno = RPC_ERROR(errno);
146: rpc->call_argc ^= rpc->call_argc;
147: } else {
148: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.7.2.1 misho 149: if (argc) {
150: /* without reply */
151: io_clrVars(f->func_vars);
152: rpc->call_argc ^= rpc->call_argc;
153: } else
154: rpc->call_argc = htons(rpc_srv_getVars(f, NULL));
1.7 misho 155: }
156: }
157:
158: if (arr)
159: io_arrayDestroy(&arr);
160: return NULL;
161: }
162:
163: static void *
164: rxPacket(sched_task_t *task)
165: {
166: rpc_cli_t *c = TASK_ARG(task);
167: rpc_srv_t *s = c->cli_parent;
168: u_char *buf = TASK_DATA(task);
1.7.2.3 ! misho 169: int rlen, noreply;
! 170: u_short crc, off = 0;
1.7 misho 171: struct tagRPCCall *rpc;
172: struct timespec ts;
173:
174: ioTRACE(RPC_TRACE_LEVEL);
175:
176: memset(buf, 0, TASK_DATLEN(task));
177: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
178: if (rlen == -1) {
179: LOGERR;
180: s->srv_kill = s->srv_blob.state = kill;
181: return NULL;
182: } else if (!rlen) { /* receive EOF */
183: s->srv_kill = s->srv_blob.state = kill;
184: return NULL;
185: } else
186: ioDEBUG(RPC_DEBUG_LEVEL, "Readed %d bytes", rlen);
187:
1.7.2.3 ! misho 188: do {
! 189: if (rlen < sizeof(struct tagRPCCall)) {
! 190: rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");
1.7 misho 191:
1.7.2.3 ! misho 192: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
! 193: TASK_DATA(task), TASK_DATLEN(task));
! 194: return NULL;
! 195: } else
! 196: rpc = (struct tagRPCCall*) (buf + off);
1.7 misho 197:
1.7.2.3 ! misho 198: rlen -= ntohs(rpc->call_len);
1.7 misho 199:
1.7.2.3 ! misho 200: /* check integrity of packet */
! 201: crc = ntohs(rpc->call_crc);
! 202: rpc->call_crc ^= rpc->call_crc;
! 203: if (crc != crcFletcher16((u_short*) (buf + off),
! 204: io_align(ntohs(rpc->call_len), 1) / 2)) {
! 205: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
1.7 misho 206:
1.7.2.3 ! misho 207: off += ntohs(rpc->call_len);
! 208: if (rlen < 1)
! 209: break;
! 210: else
! 211: continue;
! 212: }
1.7 misho 213:
1.7.2.3 ! misho 214: noreply = rpc->call_req.flags & RPC_NOREPLY;
! 215:
! 216: /* check RPC packet session info */
! 217: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
! 218: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
! 219: rpc->call_argc ^= rpc->call_argc;
! 220: rpc->call_rep.ret = RPC_ERROR(-1);
! 221: rpc->call_rep.eno = RPC_ERROR(errno);
! 222: } else {
! 223: /* change socket timeout from last packet */
! 224: ts.tv_sec = rpc->call_session.sess_timeout;
! 225: ts.tv_nsec = 0;
! 226: schedPolling(TASK_ROOT(task), &ts, NULL);
! 227:
! 228: /* execute RPC call */
! 229: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off,
! 230: TASK_DATA(task), TASK_DATLEN(task));
! 231: }
! 232:
! 233: /* send RPC reply */
! 234: if (!noreply)
! 235: schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task),
! 236: buf + off, TASK_DATLEN(task) - off);
! 237:
! 238: off += ntohs(rpc->call_len);
! 239: } while (rlen > 0);
1.7 misho 240:
241: /* lets get next packet */
242: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
243: TASK_DATA(task), TASK_DATLEN(task));
244: return NULL;
245: }
246:
1.1 misho 247: static void *
248: rpc_srv_dispatchCall(void *arg)
249: {
250: rpc_cli_t *c = arg;
251: rpc_srv_t *s;
1.5 misho 252: u_char *buf;
1.7 misho 253: sched_root_task_t *root;
254: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
255:
256: ioTRACE(RPC_TRACE_LEVEL);
1.1 misho 257:
258: if (!arg) {
1.7 misho 259: rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client");
1.1 misho 260: return NULL;
261: } else
262: s = c->cli_parent;
263:
1.7 misho 264: /* allocate net buffer */
1.5 misho 265: buf = malloc(s->srv_netbuf);
266: if (!buf) {
267: LOGERR;
268: return NULL;
269: }
270:
1.7 misho 271: root = schedBegin();
272: if (!root) {
273: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
274: free(buf);
275: return NULL;
276: } else {
277: schedTermCondition(root, kill);
278: schedPolling(root, &ts, NULL);
279: }
280:
281: schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf);
282:
283: schedRun(root, (void*) &s->srv_kill);
284: schedEnd(&root);
285:
286: shutdown(c->cli_sock, SHUT_RDWR);
287: close(c->cli_sock);
288: memset(c, 0, sizeof(rpc_cli_t));
289: free(buf);
290: return NULL;
291: }
292:
293:
294: static void *
295: txBLOB(sched_task_t *task)
296: {
297: u_char *buf = TASK_DATA(task);
298: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
299: int wlen = sizeof(struct tagBLOBHdr);
300:
301: ioTRACE(RPC_TRACE_LEVEL);
302:
303: /* calculate CRC */
304: blob->hdr_crc ^= blob->hdr_crc;
305: blob->hdr_crc = htons(crcFletcher16((u_short*) buf, io_align(wlen, 1) / 2));
306:
307: /* send reply */
308: wlen = send(TASK_FD(task), buf, wlen, 0);
309: if (wlen == -1)
310: LOGERR;
311: else if (wlen != sizeof(struct tagBLOBHdr))
312: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
313: "really sended %d bytes", sizeof(struct tagBLOBHdr), wlen);
314: else
315: ioDEBUG(RPC_DEBUG_LEVEL, "Sended %d bytes", wlen);
316:
317: return NULL;
318: }
1.4 misho 319:
1.7 misho 320: static void *
321: rxBLOB(sched_task_t *task)
322: {
323: rpc_cli_t *c = TASK_ARG(task);
324: rpc_srv_t *s = c->cli_parent;
325: rpc_blob_t *b;
326: u_char *buf = TASK_DATA(task);
327: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
328: int rlen;
329: u_short crc;
330: struct timespec ts;
331:
332: ioTRACE(RPC_TRACE_LEVEL);
333:
334: /* check for disable service at this moment? */
1.7.2.1 misho 335: if (!s || s->srv_blob.state == disable) {
1.7 misho 336: usleep(100000);
337: #ifdef HAVE_PTHREAD_YIELD
338: pthread_yield();
339: #endif
340: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
341: TASK_DATA(task), TASK_DATLEN(task));
342: return NULL;
343: }
1.5 misho 344:
1.7 misho 345: memset(buf, 0, TASK_DATLEN(task));
346: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
347: if (rlen == -1) {
348: LOGERR;
349: s->srv_blob.state = kill;
350: return NULL;
351: } else if (!rlen || s->srv_kill == kill) { /* receive EOF */
352: s->srv_blob.state = kill;
353: return NULL;
354: } else
355: ioDEBUG(RPC_DEBUG_LEVEL, "Readed %d bytes", rlen);
1.6 misho 356:
1.7 misho 357: if (rlen < sizeof(struct tagBLOBHdr)) {
358: rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet");
359: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
360: TASK_DATA(task), TASK_DATLEN(task));
361: return NULL;
362: }
1.1 misho 363:
1.7 misho 364: /* check integrity of packet */
365: crc = ntohs(blob->hdr_crc);
366: blob->hdr_crc ^= blob->hdr_crc;
367: if (crc != crcFletcher16((u_short*) buf, io_align(rlen, 1) / 2)) {
368: rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
369: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
370: TASK_DATA(task), TASK_DATLEN(task));
371: return NULL;
372: }
1.5 misho 373:
1.7 misho 374: /* check RPC packet session info */
375: if ((crc = rpc_chkPktSession(&blob->hdr_session, &s->srv_session))) {
376: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
377: blob->hdr_cmd = error;
378: goto end;
379: } else {
380: /* change socket timeout from last packet */
381: ts.tv_sec = blob->hdr_session.sess_timeout;
382: ts.tv_nsec = 0;
383: schedPolling(TASK_ROOT(task), &ts, NULL);
384: }
385:
386: /* Go to proceed packet ... */
387: switch (blob->hdr_cmd) {
388: case get:
389: if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) {
390: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var));
391: blob->hdr_cmd = no;
392: blob->hdr_ret = RPC_ERROR(-1);
393: break;
394: } else
395: blob->hdr_len = htonl(b->blob_len);
1.5 misho 396:
1.7 misho 397: if (rpc_srv_blobMap(s, b) != -1) {
398: /* deliver BLOB variable to client */
399: blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
400: rpc_srv_blobUnmap(b);
401: } else {
402: blob->hdr_cmd = error;
403: blob->hdr_ret = RPC_ERROR(-1);
404: }
405: break;
406: case set:
407: if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) {
408: /* set new BLOB variable for reply :) */
409: blob->hdr_var = htonl(b->blob_var);
410:
411: /* receive BLOB from client */
412: blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
413: rpc_srv_blobUnmap(b);
1.5 misho 414: } else {
1.7 misho 415: blob->hdr_cmd = error;
416: blob->hdr_ret = RPC_ERROR(-1);
417: }
418: break;
419: case unset:
420: if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) {
421: blob->hdr_cmd = error;
422: blob->hdr_ret = RPC_ERROR(-1);
1.1 misho 423: }
424: break;
1.7 misho 425: default:
426: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd);
427: blob->hdr_cmd = error;
428: blob->hdr_ret = RPC_ERROR(-1);
429: }
1.1 misho 430:
1.7 misho 431: end:
432: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task),
433: TASK_DATA(task), TASK_DATLEN(task));
434: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
435: TASK_DATA(task), TASK_DATLEN(task));
436: return NULL;
1.2 misho 437: }
438:
439: static void *
440: rpc_srv_dispatchVars(void *arg)
441: {
442: rpc_cli_t *c = arg;
443: rpc_srv_t *s;
1.7 misho 444: sched_root_task_t *root;
445: u_char *buf;
446: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
447:
448: ioTRACE(RPC_TRACE_LEVEL);
1.2 misho 449:
450: if (!arg) {
1.7 misho 451: rpc_SetErr(EINVAL, "Invalid parameter can`t procced BLOB");
1.2 misho 452: return NULL;
453: } else
454: s = c->cli_parent;
455:
1.7 misho 456: /* allocate net buffer */
457: buf = malloc(sizeof(struct tagBLOBHdr));
458: if (!buf) {
459: LOGERR;
460: return NULL;
461: }
1.2 misho 462:
1.7 misho 463: root = schedBegin();
464: if (!root) {
465: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
466: free(buf);
467: return NULL;
468: } else {
469: schedTermCondition(root, kill);
470: schedPolling(root, &ts, NULL);
471: }
1.4 misho 472:
1.7 misho 473: schedRead(root, rxBLOB, c, c->cli_sock, buf, sizeof(struct tagBLOBHdr));
1.2 misho 474:
1.7 misho 475: schedRun(root, (void*) &s->srv_blob.state);
476: schedEnd(&root);
1.2 misho 477:
478: shutdown(c->cli_sock, SHUT_RDWR);
479: close(c->cli_sock);
480: memset(c, 0, sizeof(rpc_cli_t));
1.7 misho 481: free(buf);
482: return NULL;
1.1 misho 483: }
484:
485: // -------------------------------------------------
486:
487: /*
1.7 misho 488: * rpc_srv_initBLOBServer() - Init & create BLOB Server
489: *
1.4 misho 490: * @srv = RPC server instance
1.2 misho 491: * @Port = Port for bind server, if Port == 0 default port is selected
492: * @diskDir = Disk place for BLOB file objects
493: * return: -1 == error or 0 bind and created BLOB server instance
494: */
495: int
496: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
497: {
498: int n = 1;
1.6 misho 499: io_sockaddr_t sa;
1.2 misho 500:
1.7 misho 501: ioTRACE(RPC_TRACE_LEVEL);
502:
1.2 misho 503: if (!srv) {
1.7 misho 504: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 505: return -1;
506: }
507: if (srv->srv_blob.state) {
1.7 misho 508: rpc_SetErr(EPERM, "Already started BLOB server!");
1.2 misho 509: return 0;
510: }
511:
512: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
513: if (access(diskDir, R_OK | W_OK) == -1) {
514: LOGERR;
515: return -1;
516: } else
1.7 misho 517: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 518:
519: srv->srv_blob.server.cli_tid = pthread_self();
520: srv->srv_blob.server.cli_parent = srv;
1.4 misho 521:
522: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
1.6 misho 523: switch (sa.sa.sa_family) {
1.4 misho 524: case AF_INET:
1.6 misho 525: sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);
1.4 misho 526: break;
527: case AF_INET6:
1.6 misho 528: sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);
1.4 misho 529: break;
530: case AF_LOCAL:
1.6 misho 531: strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
1.4 misho 532: break;
533: default:
1.7 misho 534: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 535: return -1;
1.2 misho 536: }
1.6 misho 537: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
1.2 misho 538:
1.4 misho 539: /* create BLOB server socket */
1.6 misho 540: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 541: if (srv->srv_blob.server.cli_sock == -1) {
542: LOGERR;
1.7 misho 543: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 544: return -1;
545: }
546: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
547: LOGERR;
548: close(srv->srv_blob.server.cli_sock);
1.7 misho 549: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 550: return -1;
551: }
1.5 misho 552: n = srv->srv_netbuf;
553: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
554: LOGERR;
555: close(srv->srv_blob.server.cli_sock);
1.7 misho 556: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 557: return -1;
558: }
559: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
560: LOGERR;
561: close(srv->srv_blob.server.cli_sock);
1.7 misho 562: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 563: return -1;
564: }
1.6 misho 565: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
566: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 567: LOGERR;
568: close(srv->srv_blob.server.cli_sock);
1.7 misho 569: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 570: return -1;
571: }
572:
1.4 misho 573: /* allocate pool for concurent clients */
1.2 misho 574: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
575: if (!srv->srv_blob.clients) {
576: LOGERR;
577: close(srv->srv_blob.server.cli_sock);
1.7 misho 578: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 579: return -1;
580: } else
581: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
582:
583: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
584:
585: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
1.5 misho 586: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
587: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
588: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
1.2 misho 589:
1.4 misho 590: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 591: return 0;
592: }
593:
594: /*
1.7 misho 595: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
596: *
1.2 misho 597: * @srv = RPC Server instance
598: * return: none
599: */
600: void
601: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
602: {
603: rpc_cli_t *c;
604: register int i;
605: rpc_blob_t *f;
606:
1.7 misho 607: ioTRACE(RPC_TRACE_LEVEL);
608:
1.7.2.2 misho 609: if (!srv || srv->srv_blob.state == disable)
1.2 misho 610: return;
1.7.2.2 misho 611: else
1.4 misho 612: srv->srv_blob.state = kill;
1.2 misho 613:
614: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
615: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
616: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
617: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
618:
1.4 misho 619: /* close all clients connections & server socket */
1.2 misho 620: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
1.6 misho 621: if (c->cli_sa.sa.sa_family)
1.2 misho 622: shutdown(c->cli_sock, SHUT_RDWR);
623: close(srv->srv_blob.server.cli_sock);
624:
1.5 misho 625: pthread_mutex_lock(&srv->srv_blob.mtx);
1.2 misho 626: if (srv->srv_blob.clients) {
627: free(srv->srv_blob.clients);
628: srv->srv_blob.clients = NULL;
629: }
630:
1.4 misho 631: /* detach blobs */
1.2 misho 632: while ((f = srv->srv_blob.blobs)) {
633: srv->srv_blob.blobs = f->blob_next;
634: rpc_srv_blobFree(srv, f);
635: free(f);
636: }
637: pthread_mutex_unlock(&srv->srv_blob.mtx);
638:
1.7 misho 639: AIT_FREE_VAL(&srv->srv_blob.dir);
640:
1.2 misho 641: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
642: pthread_mutex_destroy(&srv->srv_blob.mtx);
1.7.2.2 misho 643:
644: /* at final, save disable BLOB service state */
645: srv->srv_blob.state = disable;
1.2 misho 646: }
647:
648: /*
1.7 misho 649: * rpc_srv_loopBLOB() - Execute Main BLOB server loop and wait for clients requests
650: *
1.2 misho 651: * @srv = RPC Server instance
652: * return: -1 error or 0 ok, infinite loop ...
653: */
654: int
1.5 misho 655: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
1.2 misho 656: {
1.6 misho 657: socklen_t salen = sizeof(io_sockaddr_t);
1.2 misho 658: register int i;
659: rpc_cli_t *c;
660: fd_set fds;
661: int ret;
662: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.7 misho 663: pthread_attr_t attr;
664:
665: ioTRACE(RPC_TRACE_LEVEL);
1.2 misho 666:
1.4 misho 667: if (!srv || srv->srv_blob.state == kill) {
1.7 misho 668: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 669: return -1;
670: }
671:
1.7 misho 672: tv.tv_sec = srv->srv_session.sess_timeout;
673:
1.2 misho 674: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
675: LOGERR;
676: return -1;
677: }
678:
1.7 misho 679: pthread_attr_init(&attr);
680: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
681:
682: /* main BLOB loop */
1.4 misho 683: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 684: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 685: if (!c->cli_sa.sa.sa_family)
1.2 misho 686: break;
687: if (i >= srv->srv_numcli) {
1.5 misho 688: #ifdef HAVE_PTHREAD_YIELD
689: pthread_yield();
690: #else
1.2 misho 691: usleep(1000000);
1.5 misho 692: #endif
1.2 misho 693: continue;
694: }
695:
696: FD_ZERO(&fds);
697: FD_SET(srv->srv_blob.server.cli_sock, &fds);
698: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
699: if (ret == -1) {
700: LOGERR;
701: ret = 1;
702: break;
703: }
704: if (!ret)
705: continue;
706:
1.6 misho 707: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
1.2 misho 708: if (c->cli_sock == -1) {
709: LOGERR;
710: continue;
711: } else
712: c->cli_parent = srv;
713:
1.7 misho 714: /* spawn dispatch thread for BLOB client */
715: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchVars, c)) {
1.2 misho 716: LOGERR;
717: continue;
1.7 misho 718: }
1.2 misho 719: }
720:
1.5 misho 721: srv->srv_blob.state = kill;
1.2 misho 722:
1.7 misho 723: pthread_attr_destroy(&attr);
1.2 misho 724: return 0;
725: }
726:
727:
728: /*
1.7 misho 729: * rpc_srv_initServer() - Init & create RPC Server
730: *
1.1 misho 731: * @regProgID = ProgramID for authentication & recognition
732: * @regProcID = ProcessID for authentication & recognition
733: * @concurentClients = Concurent clients at same time to this server
1.5 misho 734: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 735: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
736: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 737: * @Port = Port for bind server, if Port == 0 default port is selected
738: * return: NULL == error or !=NULL bind and created RPC server instance
739: */
740: rpc_srv_t *
741: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.5 misho 742: int netBuf, u_short family, const char *csHost, u_short Port)
1.1 misho 743: {
744: rpc_srv_t *srv = NULL;
745: int n = 1;
746: struct hostent *host = NULL;
1.6 misho 747: io_sockaddr_t sa;
1.1 misho 748:
1.7 misho 749: ioTRACE(RPC_TRACE_LEVEL);
750:
1.4 misho 751: if (!concurentClients || !regProgID ||
752: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 753: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
754: return NULL;
755: }
756: if (!Port)
757: Port = RPC_DEFPORT;
1.5 misho 758: if (!netBuf)
759: netBuf = BUFSIZ;
1.7 misho 760: else
761: netBuf = io_align(netBuf, 1); /* align netBuf length */
1.4 misho 762: if (csHost && family != AF_LOCAL) {
1.1 misho 763: host = gethostbyname2(csHost, family);
764: if (!host) {
765: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
766: return NULL;
767: }
768: }
1.4 misho 769: memset(&sa, 0, sizeof sa);
1.6 misho 770: sa.sa.sa_family = family;
1.1 misho 771: switch (family) {
772: case AF_INET:
1.6 misho 773: sa.sin.sin_len = sizeof(struct sockaddr_in);
774: sa.sin.sin_port = htons(Port);
1.1 misho 775: if (csHost)
1.6 misho 776: memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length);
1.1 misho 777: break;
778: case AF_INET6:
1.6 misho 779: sa.sin6.sin6_len = sizeof(struct sockaddr_in6);
780: sa.sin6.sin6_port = htons(Port);
1.4 misho 781: if (csHost)
1.6 misho 782: memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length);
1.4 misho 783: break;
784: case AF_LOCAL:
1.6 misho 785: sa.sun.sun_len = sizeof(struct sockaddr_un);
1.1 misho 786: if (csHost)
1.6 misho 787: strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path);
788: unlink(sa.sun.sun_path);
1.1 misho 789: break;
790: default:
791: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
792: return NULL;
793: }
794:
795: srv = malloc(sizeof(rpc_srv_t));
796: if (!srv) {
797: LOGERR;
798: return NULL;
799: } else
800: memset(srv, 0, sizeof(rpc_srv_t));
801:
1.5 misho 802: srv->srv_netbuf = netBuf;
1.1 misho 803: srv->srv_numcli = concurentClients;
804: srv->srv_session.sess_version = RPC_VERSION;
1.7 misho 805: srv->srv_session.sess_timeout = DEF_RPC_TIMEOUT;
1.1 misho 806: srv->srv_session.sess_program = regProgID;
807: srv->srv_session.sess_process = regProcID;
808:
1.2 misho 809: srv->srv_server.cli_tid = pthread_self();
1.1 misho 810: srv->srv_server.cli_parent = srv;
1.6 misho 811: memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
1.4 misho 812:
813: /* create server socket */
1.1 misho 814: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
815: if (srv->srv_server.cli_sock == -1) {
816: LOGERR;
817: free(srv);
818: return NULL;
819: }
820: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
821: LOGERR;
822: close(srv->srv_server.cli_sock);
823: free(srv);
824: return NULL;
825: }
1.5 misho 826: n = srv->srv_netbuf;
827: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
828: LOGERR;
829: close(srv->srv_server.cli_sock);
830: free(srv);
831: return NULL;
832: }
833: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
834: LOGERR;
835: close(srv->srv_server.cli_sock);
836: free(srv);
837: return NULL;
838: }
1.6 misho 839: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
840: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 841: LOGERR;
842: close(srv->srv_server.cli_sock);
843: free(srv);
844: return NULL;
845: }
846:
1.4 misho 847: /* allocate pool for concurent clients */
1.1 misho 848: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
849: if (!srv->srv_clients) {
850: LOGERR;
851: close(srv->srv_server.cli_sock);
852: free(srv);
853: return NULL;
854: } else
855: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
856:
1.2 misho 857: pthread_mutex_init(&srv->srv_mtx, NULL);
858:
859: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.5 misho 860: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
861: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
862: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
1.7.2.2 misho 863:
1.1 misho 864: return srv;
865: }
866:
867: /*
1.7 misho 868: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
869: *
1.6 misho 870: * @psrv = RPC Server instance
1.1 misho 871: * return: none
872: */
873: void
1.6 misho 874: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 875: {
876: rpc_cli_t *c;
877: register int i;
878: rpc_func_t *f;
879:
1.7 misho 880: ioTRACE(RPC_TRACE_LEVEL);
881:
1.6 misho 882: if (!psrv || !*psrv) {
1.1 misho 883: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
884: return;
885: }
886:
1.6 misho 887: rpc_srv_endBLOBServer(*psrv);
1.1 misho 888:
1.4 misho 889: /* close all clients connections & server socket */
1.6 misho 890: for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
891: if (c->cli_sa.sa.sa_family) {
1.1 misho 892: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 893: close(c->cli_sock);
894: }
1.6 misho 895: close((*psrv)->srv_server.cli_sock);
1.1 misho 896:
1.6 misho 897: if ((*psrv)->srv_clients) {
898: free((*psrv)->srv_clients);
899: (*psrv)->srv_clients = NULL;
900: (*psrv)->srv_numcli = 0;
1.1 misho 901: }
902:
1.4 misho 903: /* detach exported calls */
1.6 misho 904: pthread_mutex_lock(&(*psrv)->srv_mtx);
905: while ((f = (*psrv)->srv_funcs)) {
906: (*psrv)->srv_funcs = f->func_next;
907: io_freeVars(&f->func_vars);
1.7 misho 908: AIT_FREE_VAL(&f->func_name);
909: AIT_FREE_VAL(&f->func_file);
1.2 misho 910: free(f);
911: }
1.6 misho 912: pthread_mutex_unlock(&(*psrv)->srv_mtx);
1.2 misho 913:
1.6 misho 914: while (pthread_mutex_trylock(&(*psrv)->srv_mtx) == EBUSY);
915: pthread_mutex_destroy(&(*psrv)->srv_mtx);
1.2 misho 916:
1.6 misho 917: free(*psrv);
918: *psrv = NULL;
1.1 misho 919: }
920:
921: /*
1.7 misho 922: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
923: *
1.1 misho 924: * @srv = RPC Server instance
925: * return: -1 error or 0 ok, infinite loop ...
926: */
927: int
1.5 misho 928: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 929: {
1.6 misho 930: socklen_t salen = sizeof(io_sockaddr_t);
1.1 misho 931: register int i;
932: rpc_cli_t *c;
1.2 misho 933: fd_set fds;
934: int ret;
935: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.7 misho 936: pthread_attr_t attr;
937:
938: ioTRACE(RPC_TRACE_LEVEL);
1.1 misho 939:
940: if (!srv) {
941: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
942: return -1;
943: }
944:
1.7 misho 945: tv.tv_sec = srv->srv_session.sess_timeout;
946:
1.5 misho 947: /* activate BLOB server worker if srv->srv_blob.state == enable */
948: rpc_srv_execBLOBServer(srv);
949:
1.1 misho 950: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
951: LOGERR;
952: return -1;
953: }
954:
1.7 misho 955: pthread_attr_init(&attr);
956: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
957:
958: /* main rpc loop */
1.4 misho 959: while (srv->srv_kill != kill) {
1.1 misho 960: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 961: if (!c->cli_sa.sa.sa_family)
1.1 misho 962: break;
1.2 misho 963: if (i >= srv->srv_numcli) {
1.5 misho 964: #ifdef HAVE_PTHREAD_YIELD
965: pthread_yield();
966: #else
1.1 misho 967: usleep(1000000);
1.5 misho 968: #endif
1.1 misho 969: continue;
970: }
1.2 misho 971:
972: FD_ZERO(&fds);
973: FD_SET(srv->srv_server.cli_sock, &fds);
974: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
975: if (ret == -1) {
976: LOGERR;
977: ret = 1;
978: break;
979: }
980: if (!ret)
981: continue;
982:
1.6 misho 983: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
1.1 misho 984: if (c->cli_sock == -1) {
985: LOGERR;
986: continue;
987: } else
988: c->cli_parent = srv;
989:
1.7 misho 990: /* spawn rpc client dispatcher */
991: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchCall, c)) {
1.1 misho 992: LOGERR;
993: continue;
1.7 misho 994: }
1.1 misho 995: }
996:
1.7 misho 997: pthread_attr_destroy(&attr);
1.1 misho 998: return 0;
999: }
1000:
1001: // ---------------------------------------------------------
1002:
1003: /*
1004: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 1005: *
1.1 misho 1006: * @call = Register RPC call
1007: * @rpc = IN RPC call structure
1.5 misho 1008: * @args = IN RPC calling arguments from RPC client
1.1 misho 1009: * return: -1 error, !=-1 ok
1010: */
1011: int
1.2 misho 1012: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.5 misho 1013: array_t * __restrict args)
1.1 misho 1014: {
1015: void *dl;
1016: rpc_callback_t func;
1017: int ret;
1018:
1.7 misho 1019: ioTRACE(RPC_TRACE_LEVEL);
1020:
1.2 misho 1021: if (!call || !rpc || !call->func_parent) {
1.7 misho 1022: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 1023: return -1;
1024: }
1025:
1.7 misho 1026: dl = dlopen(AIT_VOID(&call->func_file), RTLD_NOW);
1.1 misho 1027: if (!dl) {
1.7 misho 1028: rpc_SetErr(ENOENT, "Can`t attach module %s!", dlerror());
1.1 misho 1029: return -1;
1030: }
1031:
1.7 misho 1032: func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name));
1.1 misho 1033: if (func)
1.6 misho 1034: ret = func(call, ntohs(rpc->call_argc), args);
1.1 misho 1035: else {
1.7 misho 1036: rpc_SetErr(ENOEXEC, "Can`t find function %s!", dlerror());
1.1 misho 1037: ret = -1;
1038: }
1039:
1040: dlclose(dl);
1041: return ret;
1042: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>