Annotation of libaitrpc/src/srv.c, revision 1.8
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.8 ! misho 6: * $Id: srv.c,v 1.7.2.4 2012/03/29 01:23:59 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.7 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.7 misho 49: static void *rxPacket(sched_task_t*);
50: static void *rxBLOB(sched_task_t*);
51:
52: static void *
53: txPacket(sched_task_t *task)
54: {
55: rpc_cli_t *c = TASK_ARG(task);
56: rpc_srv_t *s = c->cli_parent;
57: rpc_func_t *f = NULL;
58: u_char *buf = TASK_DATA(task);
59: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
60: int ret, wlen = sizeof(struct tagRPCCall);
61: array_t *arr = NULL;
62:
63: ioTRACE(RPC_TRACE_LEVEL);
64:
65: if (rpc->call_argc) {
66: f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash));
67: if (!f) {
68: rpc->call_argc ^= rpc->call_argc;
69: rpc->call_rep.ret = RPC_ERROR(-1);
70: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
71: } else {
72: rpc->call_argc = htons(rpc_srv_getVars(f, &arr));
73: /* Go Encapsulate variables */
74: ret = io_vars2buffer(buf + wlen, TASK_DATLEN(task) - wlen, arr);
75: io_clrVars(f->func_vars);
76: if (ret == -1) {
77: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
78: rpc->call_argc ^= rpc->call_argc;
79: rpc->call_rep.ret = RPC_ERROR(-1);
80: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
81: } else
82: wlen += ret;
83: }
84: }
85:
1.8 ! misho 86: rpc->call_len = htons(wlen);
! 87:
1.7 misho 88: /* calculate CRC */
89: rpc->call_crc ^= rpc->call_crc;
1.8 ! misho 90: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.7 misho 91:
92: /* send reply */
93: ret = send(TASK_FD(task), buf, wlen, 0);
94: if (ret == -1)
95: LOGERR;
96: else if (ret != wlen)
97: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
98: "really sended %d bytes", wlen, ret);
99: else
100: ioDEBUG(RPC_DEBUG_LEVEL, "Sended %d bytes", ret);
101:
102: return NULL;
103: }
104:
105: static void *
106: execCall(sched_task_t *task)
107: {
108: rpc_cli_t *c = TASK_ARG(task);
109: rpc_srv_t *s = c->cli_parent;
110: rpc_func_t *f = NULL;
111: array_t *arr = NULL;
1.8 ! misho 112: u_char *buf = TASK_DATA(task) + TASK_VAL(task);
1.7 misho 113: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
114: int argc = ntohs(rpc->call_argc);
115:
116: ioTRACE(RPC_TRACE_LEVEL);
117:
118: /* Go decapsulate variables ... */
1.8 ! misho 119: if (argc) {
1.7 misho 120: arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),
1.8 ! misho 121: TASK_DATLEN(task) - TASK_VAL(task) - sizeof(struct tagRPCCall),
! 122: argc, 1);
1.7 misho 123: if (!arr) {
124: rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
125: rpc->call_argc ^= rpc->call_argc;
126: rpc->call_rep.ret = RPC_ERROR(-1);
127: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
128: return NULL;
129: }
130: }
131:
132: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
133: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
134: rpc->call_argc ^= rpc->call_argc;
135: rpc->call_rep.ret = RPC_ERROR(-1);
136: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
137: } else {
138: ioDEBUG(RPC_DEBUG_LEVEL, "RPC function %s from module %s",
139: AIT_GET_STR(&f->func_name), AIT_GET_LIKE(&f->func_file, char*));
140:
1.8 ! misho 141: /* if client doesn't want reply */
! 142: argc = rpc->call_req.flags & RPC_NOREPLY;
1.7 misho 143: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
144: if (rpc->call_rep.ret == htonl(-1)) {
145: rpc->call_rep.eno = RPC_ERROR(errno);
146: rpc->call_argc ^= rpc->call_argc;
147: } else {
148: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8 ! misho 149: if (argc) {
! 150: /* without reply */
! 151: io_clrVars(f->func_vars);
! 152: rpc->call_argc ^= rpc->call_argc;
! 153: } else
! 154: rpc->call_argc = htons(rpc_srv_getVars(f, NULL));
1.7 misho 155: }
156: }
157:
158: if (arr)
159: io_arrayDestroy(&arr);
160: return NULL;
161: }
162:
163: static void *
164: rxPacket(sched_task_t *task)
165: {
166: rpc_cli_t *c = TASK_ARG(task);
167: rpc_srv_t *s = c->cli_parent;
168: u_char *buf = TASK_DATA(task);
1.8 ! misho 169: int rlen, noreply;
! 170: u_short crc, off = 0;
1.7 misho 171: struct tagRPCCall *rpc;
172: struct timespec ts;
173:
174: ioTRACE(RPC_TRACE_LEVEL);
175:
176: memset(buf, 0, TASK_DATLEN(task));
177: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
178: if (rlen == -1) {
179: LOGERR;
180: s->srv_kill = s->srv_blob.state = kill;
181: return NULL;
182: } else if (!rlen) { /* receive EOF */
183: s->srv_kill = s->srv_blob.state = kill;
184: return NULL;
185: } else
186: ioDEBUG(RPC_DEBUG_LEVEL, "Readed %d bytes", rlen);
187:
1.8 ! misho 188: do {
! 189: if (rlen < sizeof(struct tagRPCCall)) {
! 190: rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");
! 191:
! 192: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
! 193: TASK_DATA(task), TASK_DATLEN(task));
! 194: return NULL;
! 195: } else
! 196: rpc = (struct tagRPCCall*) (buf + off);
! 197:
! 198: rlen -= ntohs(rpc->call_len);
! 199:
! 200: /* check integrity of packet */
! 201: crc = ntohs(rpc->call_crc);
! 202: rpc->call_crc ^= rpc->call_crc;
! 203: if (crc != crcFletcher16((u_short*) (buf + off), ntohs(rpc->call_len) / 2)) {
! 204: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
1.7 misho 205:
1.8 ! misho 206: off += ntohs(rpc->call_len);
! 207: if (rlen < 1)
! 208: break;
! 209: else
! 210: continue;
! 211: }
1.7 misho 212:
1.8 ! misho 213: noreply = rpc->call_req.flags & RPC_NOREPLY;
1.7 misho 214:
1.8 ! misho 215: /* check RPC packet session info */
! 216: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
! 217: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
! 218: rpc->call_argc ^= rpc->call_argc;
! 219: rpc->call_rep.ret = RPC_ERROR(-1);
! 220: rpc->call_rep.eno = RPC_ERROR(errno);
! 221: } else {
! 222: /* change socket timeout from last packet */
! 223: ts.tv_sec = rpc->call_session.sess_timeout;
! 224: ts.tv_nsec = 0;
! 225: schedPolling(TASK_ROOT(task), &ts, NULL);
! 226:
! 227: /* execute RPC call */
! 228: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off,
! 229: TASK_DATA(task), TASK_DATLEN(task));
! 230: }
1.7 misho 231:
1.8 ! misho 232: /* send RPC reply */
! 233: if (!noreply)
! 234: schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task),
! 235: buf + off, TASK_DATLEN(task) - off);
1.7 misho 236:
1.8 ! misho 237: off += ntohs(rpc->call_len);
! 238: } while (rlen > 0);
1.7 misho 239:
240: /* lets get next packet */
241: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
242: TASK_DATA(task), TASK_DATLEN(task));
243: return NULL;
244: }
245:
1.1 misho 246: static void *
247: rpc_srv_dispatchCall(void *arg)
248: {
249: rpc_cli_t *c = arg;
250: rpc_srv_t *s;
1.5 misho 251: u_char *buf;
1.7 misho 252: sched_root_task_t *root;
253: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
254:
255: ioTRACE(RPC_TRACE_LEVEL);
1.1 misho 256:
257: if (!arg) {
1.7 misho 258: rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client");
1.1 misho 259: return NULL;
260: } else
261: s = c->cli_parent;
262:
1.7 misho 263: /* allocate net buffer */
1.5 misho 264: buf = malloc(s->srv_netbuf);
265: if (!buf) {
266: LOGERR;
267: return NULL;
268: }
269:
1.7 misho 270: root = schedBegin();
271: if (!root) {
272: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
273: free(buf);
274: return NULL;
275: } else {
276: schedTermCondition(root, kill);
277: schedPolling(root, &ts, NULL);
278: }
279:
280: schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf);
281:
282: schedRun(root, (void*) &s->srv_kill);
283: schedEnd(&root);
284:
285: shutdown(c->cli_sock, SHUT_RDWR);
286: close(c->cli_sock);
287: memset(c, 0, sizeof(rpc_cli_t));
288: free(buf);
289: return NULL;
290: }
291:
292:
293: static void *
294: txBLOB(sched_task_t *task)
295: {
296: u_char *buf = TASK_DATA(task);
297: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
298: int wlen = sizeof(struct tagBLOBHdr);
299:
300: ioTRACE(RPC_TRACE_LEVEL);
301:
302: /* calculate CRC */
303: blob->hdr_crc ^= blob->hdr_crc;
1.8 ! misho 304: blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.7 misho 305:
306: /* send reply */
307: wlen = send(TASK_FD(task), buf, wlen, 0);
308: if (wlen == -1)
309: LOGERR;
310: else if (wlen != sizeof(struct tagBLOBHdr))
311: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
312: "really sended %d bytes", sizeof(struct tagBLOBHdr), wlen);
313: else
314: ioDEBUG(RPC_DEBUG_LEVEL, "Sended %d bytes", wlen);
315:
316: return NULL;
317: }
1.4 misho 318:
1.7 misho 319: static void *
320: rxBLOB(sched_task_t *task)
321: {
322: rpc_cli_t *c = TASK_ARG(task);
323: rpc_srv_t *s = c->cli_parent;
324: rpc_blob_t *b;
325: u_char *buf = TASK_DATA(task);
326: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
327: int rlen;
328: u_short crc;
329: struct timespec ts;
330:
331: ioTRACE(RPC_TRACE_LEVEL);
332:
333: /* check for disable service at this moment? */
1.8 ! misho 334: if (!s || s->srv_blob.state == disable) {
1.7 misho 335: usleep(100000);
336: #ifdef HAVE_PTHREAD_YIELD
337: pthread_yield();
338: #endif
339: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
340: TASK_DATA(task), TASK_DATLEN(task));
341: return NULL;
342: }
1.5 misho 343:
1.7 misho 344: memset(buf, 0, TASK_DATLEN(task));
345: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
346: if (rlen == -1) {
347: LOGERR;
348: s->srv_blob.state = kill;
349: return NULL;
350: } else if (!rlen || s->srv_kill == kill) { /* receive EOF */
351: s->srv_blob.state = kill;
352: return NULL;
353: } else
354: ioDEBUG(RPC_DEBUG_LEVEL, "Readed %d bytes", rlen);
1.6 misho 355:
1.7 misho 356: if (rlen < sizeof(struct tagBLOBHdr)) {
357: rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet");
358: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
359: TASK_DATA(task), TASK_DATLEN(task));
360: return NULL;
361: }
1.1 misho 362:
1.7 misho 363: /* check integrity of packet */
364: crc = ntohs(blob->hdr_crc);
365: blob->hdr_crc ^= blob->hdr_crc;
1.8 ! misho 366: if (crc != crcFletcher16((u_short*) buf, rlen / 2)) {
1.7 misho 367: rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
368: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
369: TASK_DATA(task), TASK_DATLEN(task));
370: return NULL;
371: }
1.5 misho 372:
1.7 misho 373: /* check RPC packet session info */
374: if ((crc = rpc_chkPktSession(&blob->hdr_session, &s->srv_session))) {
375: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
376: blob->hdr_cmd = error;
377: goto end;
378: } else {
379: /* change socket timeout from last packet */
380: ts.tv_sec = blob->hdr_session.sess_timeout;
381: ts.tv_nsec = 0;
382: schedPolling(TASK_ROOT(task), &ts, NULL);
383: }
384:
385: /* Go to proceed packet ... */
386: switch (blob->hdr_cmd) {
387: case get:
388: if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) {
389: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var));
390: blob->hdr_cmd = no;
391: blob->hdr_ret = RPC_ERROR(-1);
392: break;
393: } else
394: blob->hdr_len = htonl(b->blob_len);
1.5 misho 395:
1.7 misho 396: if (rpc_srv_blobMap(s, b) != -1) {
397: /* deliver BLOB variable to client */
398: blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
399: rpc_srv_blobUnmap(b);
400: } else {
401: blob->hdr_cmd = error;
402: blob->hdr_ret = RPC_ERROR(-1);
403: }
404: break;
405: case set:
406: if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) {
407: /* set new BLOB variable for reply :) */
408: blob->hdr_var = htonl(b->blob_var);
409:
410: /* receive BLOB from client */
411: blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
412: rpc_srv_blobUnmap(b);
1.5 misho 413: } else {
1.7 misho 414: blob->hdr_cmd = error;
415: blob->hdr_ret = RPC_ERROR(-1);
416: }
417: break;
418: case unset:
419: if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) {
420: blob->hdr_cmd = error;
421: blob->hdr_ret = RPC_ERROR(-1);
1.1 misho 422: }
423: break;
1.7 misho 424: default:
425: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd);
426: blob->hdr_cmd = error;
427: blob->hdr_ret = RPC_ERROR(-1);
428: }
1.1 misho 429:
1.7 misho 430: end:
431: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task),
432: TASK_DATA(task), TASK_DATLEN(task));
433: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
434: TASK_DATA(task), TASK_DATLEN(task));
435: return NULL;
1.2 misho 436: }
437:
438: static void *
439: rpc_srv_dispatchVars(void *arg)
440: {
441: rpc_cli_t *c = arg;
442: rpc_srv_t *s;
1.7 misho 443: sched_root_task_t *root;
444: u_char *buf;
445: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
446:
447: ioTRACE(RPC_TRACE_LEVEL);
1.2 misho 448:
449: if (!arg) {
1.7 misho 450: rpc_SetErr(EINVAL, "Invalid parameter can`t procced BLOB");
1.2 misho 451: return NULL;
452: } else
453: s = c->cli_parent;
454:
1.7 misho 455: /* allocate net buffer */
456: buf = malloc(sizeof(struct tagBLOBHdr));
457: if (!buf) {
458: LOGERR;
459: return NULL;
460: }
1.2 misho 461:
1.7 misho 462: root = schedBegin();
463: if (!root) {
464: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
465: free(buf);
466: return NULL;
467: } else {
468: schedTermCondition(root, kill);
469: schedPolling(root, &ts, NULL);
470: }
1.4 misho 471:
1.7 misho 472: schedRead(root, rxBLOB, c, c->cli_sock, buf, sizeof(struct tagBLOBHdr));
1.2 misho 473:
1.7 misho 474: schedRun(root, (void*) &s->srv_blob.state);
475: schedEnd(&root);
1.2 misho 476:
477: shutdown(c->cli_sock, SHUT_RDWR);
478: close(c->cli_sock);
479: memset(c, 0, sizeof(rpc_cli_t));
1.7 misho 480: free(buf);
481: return NULL;
1.1 misho 482: }
483:
484: // -------------------------------------------------
485:
486: /*
1.7 misho 487: * rpc_srv_initBLOBServer() - Init & create BLOB Server
488: *
1.4 misho 489: * @srv = RPC server instance
1.2 misho 490: * @Port = Port for bind server, if Port == 0 default port is selected
491: * @diskDir = Disk place for BLOB file objects
492: * return: -1 == error or 0 bind and created BLOB server instance
493: */
494: int
495: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
496: {
497: int n = 1;
1.6 misho 498: io_sockaddr_t sa;
1.2 misho 499:
1.7 misho 500: ioTRACE(RPC_TRACE_LEVEL);
501:
1.2 misho 502: if (!srv) {
1.7 misho 503: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 504: return -1;
505: }
506: if (srv->srv_blob.state) {
1.7 misho 507: rpc_SetErr(EPERM, "Already started BLOB server!");
1.2 misho 508: return 0;
509: }
510:
511: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
512: if (access(diskDir, R_OK | W_OK) == -1) {
513: LOGERR;
514: return -1;
515: } else
1.7 misho 516: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 517:
518: srv->srv_blob.server.cli_tid = pthread_self();
519: srv->srv_blob.server.cli_parent = srv;
1.4 misho 520:
521: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
1.6 misho 522: switch (sa.sa.sa_family) {
1.4 misho 523: case AF_INET:
1.6 misho 524: sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);
1.4 misho 525: break;
526: case AF_INET6:
1.6 misho 527: sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);
1.4 misho 528: break;
529: case AF_LOCAL:
1.6 misho 530: strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
1.4 misho 531: break;
532: default:
1.7 misho 533: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 534: return -1;
1.2 misho 535: }
1.6 misho 536: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
1.2 misho 537:
1.4 misho 538: /* create BLOB server socket */
1.6 misho 539: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 540: if (srv->srv_blob.server.cli_sock == -1) {
541: LOGERR;
1.7 misho 542: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 543: return -1;
544: }
545: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
546: LOGERR;
547: close(srv->srv_blob.server.cli_sock);
1.7 misho 548: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 549: return -1;
550: }
1.5 misho 551: n = srv->srv_netbuf;
552: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
553: LOGERR;
554: close(srv->srv_blob.server.cli_sock);
1.7 misho 555: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 556: return -1;
557: }
558: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
559: LOGERR;
560: close(srv->srv_blob.server.cli_sock);
1.7 misho 561: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 562: return -1;
563: }
1.6 misho 564: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
565: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 566: LOGERR;
567: close(srv->srv_blob.server.cli_sock);
1.7 misho 568: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 569: return -1;
570: }
571:
1.4 misho 572: /* allocate pool for concurent clients */
1.2 misho 573: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
574: if (!srv->srv_blob.clients) {
575: LOGERR;
576: close(srv->srv_blob.server.cli_sock);
1.7 misho 577: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 578: return -1;
579: } else
580: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
581:
582: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
583:
584: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
1.5 misho 585: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
586: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
587: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
1.2 misho 588:
1.4 misho 589: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 590: return 0;
591: }
592:
593: /*
1.7 misho 594: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
595: *
1.2 misho 596: * @srv = RPC Server instance
597: * return: none
598: */
599: void
600: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
601: {
602: rpc_cli_t *c;
603: register int i;
604: rpc_blob_t *f;
605:
1.7 misho 606: ioTRACE(RPC_TRACE_LEVEL);
607:
1.8 ! misho 608: if (!srv || srv->srv_blob.state == disable)
1.2 misho 609: return;
1.8 ! misho 610: else
1.4 misho 611: srv->srv_blob.state = kill;
1.2 misho 612:
613: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
614: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
615: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
616: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
617:
1.4 misho 618: /* close all clients connections & server socket */
1.2 misho 619: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
1.6 misho 620: if (c->cli_sa.sa.sa_family)
1.2 misho 621: shutdown(c->cli_sock, SHUT_RDWR);
622: close(srv->srv_blob.server.cli_sock);
623:
1.5 misho 624: pthread_mutex_lock(&srv->srv_blob.mtx);
1.2 misho 625: if (srv->srv_blob.clients) {
626: free(srv->srv_blob.clients);
627: srv->srv_blob.clients = NULL;
628: }
629:
1.4 misho 630: /* detach blobs */
1.2 misho 631: while ((f = srv->srv_blob.blobs)) {
632: srv->srv_blob.blobs = f->blob_next;
633: rpc_srv_blobFree(srv, f);
634: free(f);
635: }
636: pthread_mutex_unlock(&srv->srv_blob.mtx);
637:
1.7 misho 638: AIT_FREE_VAL(&srv->srv_blob.dir);
639:
1.2 misho 640: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
641: pthread_mutex_destroy(&srv->srv_blob.mtx);
1.8 ! misho 642:
! 643: /* at final, save disable BLOB service state */
! 644: srv->srv_blob.state = disable;
1.2 misho 645: }
646:
647: /*
1.7 misho 648: * rpc_srv_loopBLOB() - Execute Main BLOB server loop and wait for clients requests
649: *
1.2 misho 650: * @srv = RPC Server instance
651: * return: -1 error or 0 ok, infinite loop ...
652: */
653: int
1.5 misho 654: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
1.2 misho 655: {
1.6 misho 656: socklen_t salen = sizeof(io_sockaddr_t);
1.2 misho 657: register int i;
658: rpc_cli_t *c;
659: fd_set fds;
660: int ret;
661: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.7 misho 662: pthread_attr_t attr;
663:
664: ioTRACE(RPC_TRACE_LEVEL);
1.2 misho 665:
1.4 misho 666: if (!srv || srv->srv_blob.state == kill) {
1.7 misho 667: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 668: return -1;
669: }
670:
1.7 misho 671: tv.tv_sec = srv->srv_session.sess_timeout;
672:
1.2 misho 673: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
674: LOGERR;
675: return -1;
676: }
677:
1.7 misho 678: pthread_attr_init(&attr);
679: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
680:
681: /* main BLOB loop */
1.4 misho 682: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 683: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 684: if (!c->cli_sa.sa.sa_family)
1.2 misho 685: break;
686: if (i >= srv->srv_numcli) {
1.5 misho 687: #ifdef HAVE_PTHREAD_YIELD
688: pthread_yield();
689: #else
1.2 misho 690: usleep(1000000);
1.5 misho 691: #endif
1.2 misho 692: continue;
693: }
694:
695: FD_ZERO(&fds);
696: FD_SET(srv->srv_blob.server.cli_sock, &fds);
697: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
698: if (ret == -1) {
699: LOGERR;
700: ret = 1;
701: break;
702: }
703: if (!ret)
704: continue;
705:
1.6 misho 706: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
1.2 misho 707: if (c->cli_sock == -1) {
708: LOGERR;
709: continue;
710: } else
711: c->cli_parent = srv;
712:
1.7 misho 713: /* spawn dispatch thread for BLOB client */
714: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchVars, c)) {
1.2 misho 715: LOGERR;
716: continue;
1.7 misho 717: }
1.2 misho 718: }
719:
1.5 misho 720: srv->srv_blob.state = kill;
1.2 misho 721:
1.7 misho 722: pthread_attr_destroy(&attr);
1.2 misho 723: return 0;
724: }
725:
726:
727: /*
1.7 misho 728: * rpc_srv_initServer() - Init & create RPC Server
729: *
1.1 misho 730: * @regProgID = ProgramID for authentication & recognition
731: * @regProcID = ProcessID for authentication & recognition
732: * @concurentClients = Concurent clients at same time to this server
1.5 misho 733: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 734: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
735: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 736: * @Port = Port for bind server, if Port == 0 default port is selected
737: * return: NULL == error or !=NULL bind and created RPC server instance
738: */
739: rpc_srv_t *
740: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.5 misho 741: int netBuf, u_short family, const char *csHost, u_short Port)
1.1 misho 742: {
743: rpc_srv_t *srv = NULL;
744: int n = 1;
745: struct hostent *host = NULL;
1.6 misho 746: io_sockaddr_t sa;
1.1 misho 747:
1.7 misho 748: ioTRACE(RPC_TRACE_LEVEL);
749:
1.4 misho 750: if (!concurentClients || !regProgID ||
751: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 752: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
753: return NULL;
754: }
755: if (!Port)
756: Port = RPC_DEFPORT;
1.5 misho 757: if (!netBuf)
758: netBuf = BUFSIZ;
1.7 misho 759: else
760: netBuf = io_align(netBuf, 1); /* align netBuf length */
1.4 misho 761: if (csHost && family != AF_LOCAL) {
1.1 misho 762: host = gethostbyname2(csHost, family);
763: if (!host) {
764: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
765: return NULL;
766: }
767: }
1.4 misho 768: memset(&sa, 0, sizeof sa);
1.6 misho 769: sa.sa.sa_family = family;
1.1 misho 770: switch (family) {
771: case AF_INET:
1.6 misho 772: sa.sin.sin_len = sizeof(struct sockaddr_in);
773: sa.sin.sin_port = htons(Port);
1.1 misho 774: if (csHost)
1.6 misho 775: memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length);
1.1 misho 776: break;
777: case AF_INET6:
1.6 misho 778: sa.sin6.sin6_len = sizeof(struct sockaddr_in6);
779: sa.sin6.sin6_port = htons(Port);
1.4 misho 780: if (csHost)
1.6 misho 781: memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length);
1.4 misho 782: break;
783: case AF_LOCAL:
1.6 misho 784: sa.sun.sun_len = sizeof(struct sockaddr_un);
1.1 misho 785: if (csHost)
1.6 misho 786: strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path);
787: unlink(sa.sun.sun_path);
1.1 misho 788: break;
789: default:
790: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
791: return NULL;
792: }
793:
794: srv = malloc(sizeof(rpc_srv_t));
795: if (!srv) {
796: LOGERR;
797: return NULL;
798: } else
799: memset(srv, 0, sizeof(rpc_srv_t));
800:
1.5 misho 801: srv->srv_netbuf = netBuf;
1.1 misho 802: srv->srv_numcli = concurentClients;
803: srv->srv_session.sess_version = RPC_VERSION;
1.7 misho 804: srv->srv_session.sess_timeout = DEF_RPC_TIMEOUT;
1.1 misho 805: srv->srv_session.sess_program = regProgID;
806: srv->srv_session.sess_process = regProcID;
807:
1.2 misho 808: srv->srv_server.cli_tid = pthread_self();
1.1 misho 809: srv->srv_server.cli_parent = srv;
1.6 misho 810: memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
1.4 misho 811:
812: /* create server socket */
1.1 misho 813: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
814: if (srv->srv_server.cli_sock == -1) {
815: LOGERR;
816: free(srv);
817: return NULL;
818: }
819: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
820: LOGERR;
821: close(srv->srv_server.cli_sock);
822: free(srv);
823: return NULL;
824: }
1.5 misho 825: n = srv->srv_netbuf;
826: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
827: LOGERR;
828: close(srv->srv_server.cli_sock);
829: free(srv);
830: return NULL;
831: }
832: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
833: LOGERR;
834: close(srv->srv_server.cli_sock);
835: free(srv);
836: return NULL;
837: }
1.6 misho 838: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
839: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 840: LOGERR;
841: close(srv->srv_server.cli_sock);
842: free(srv);
843: return NULL;
844: }
845:
1.4 misho 846: /* allocate pool for concurent clients */
1.1 misho 847: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
848: if (!srv->srv_clients) {
849: LOGERR;
850: close(srv->srv_server.cli_sock);
851: free(srv);
852: return NULL;
853: } else
854: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
855:
1.2 misho 856: pthread_mutex_init(&srv->srv_mtx, NULL);
857:
858: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.5 misho 859: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
860: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
861: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
1.8 ! misho 862:
1.1 misho 863: return srv;
864: }
865:
866: /*
1.7 misho 867: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
868: *
1.6 misho 869: * @psrv = RPC Server instance
1.1 misho 870: * return: none
871: */
872: void
1.6 misho 873: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 874: {
875: rpc_cli_t *c;
876: register int i;
877: rpc_func_t *f;
878:
1.7 misho 879: ioTRACE(RPC_TRACE_LEVEL);
880:
1.6 misho 881: if (!psrv || !*psrv) {
1.1 misho 882: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
883: return;
884: }
885:
1.6 misho 886: rpc_srv_endBLOBServer(*psrv);
1.1 misho 887:
1.4 misho 888: /* close all clients connections & server socket */
1.6 misho 889: for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
890: if (c->cli_sa.sa.sa_family) {
1.1 misho 891: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 892: close(c->cli_sock);
893: }
1.6 misho 894: close((*psrv)->srv_server.cli_sock);
1.1 misho 895:
1.6 misho 896: if ((*psrv)->srv_clients) {
897: free((*psrv)->srv_clients);
898: (*psrv)->srv_clients = NULL;
899: (*psrv)->srv_numcli = 0;
1.1 misho 900: }
901:
1.4 misho 902: /* detach exported calls */
1.6 misho 903: pthread_mutex_lock(&(*psrv)->srv_mtx);
904: while ((f = (*psrv)->srv_funcs)) {
905: (*psrv)->srv_funcs = f->func_next;
906: io_freeVars(&f->func_vars);
1.7 misho 907: AIT_FREE_VAL(&f->func_name);
908: AIT_FREE_VAL(&f->func_file);
1.2 misho 909: free(f);
910: }
1.6 misho 911: pthread_mutex_unlock(&(*psrv)->srv_mtx);
1.2 misho 912:
1.6 misho 913: while (pthread_mutex_trylock(&(*psrv)->srv_mtx) == EBUSY);
914: pthread_mutex_destroy(&(*psrv)->srv_mtx);
1.2 misho 915:
1.6 misho 916: free(*psrv);
917: *psrv = NULL;
1.1 misho 918: }
919:
920: /*
1.7 misho 921: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
922: *
1.1 misho 923: * @srv = RPC Server instance
924: * return: -1 error or 0 ok, infinite loop ...
925: */
926: int
1.5 misho 927: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 928: {
1.6 misho 929: socklen_t salen = sizeof(io_sockaddr_t);
1.1 misho 930: register int i;
931: rpc_cli_t *c;
1.2 misho 932: fd_set fds;
933: int ret;
934: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.7 misho 935: pthread_attr_t attr;
936:
937: ioTRACE(RPC_TRACE_LEVEL);
1.1 misho 938:
939: if (!srv) {
940: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
941: return -1;
942: }
943:
1.7 misho 944: tv.tv_sec = srv->srv_session.sess_timeout;
945:
1.5 misho 946: /* activate BLOB server worker if srv->srv_blob.state == enable */
947: rpc_srv_execBLOBServer(srv);
948:
1.1 misho 949: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
950: LOGERR;
951: return -1;
952: }
953:
1.7 misho 954: pthread_attr_init(&attr);
955: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
956:
957: /* main rpc loop */
1.4 misho 958: while (srv->srv_kill != kill) {
1.1 misho 959: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 960: if (!c->cli_sa.sa.sa_family)
1.1 misho 961: break;
1.2 misho 962: if (i >= srv->srv_numcli) {
1.5 misho 963: #ifdef HAVE_PTHREAD_YIELD
964: pthread_yield();
965: #else
1.1 misho 966: usleep(1000000);
1.5 misho 967: #endif
1.1 misho 968: continue;
969: }
1.2 misho 970:
971: FD_ZERO(&fds);
972: FD_SET(srv->srv_server.cli_sock, &fds);
973: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
974: if (ret == -1) {
975: LOGERR;
976: ret = 1;
977: break;
978: }
979: if (!ret)
980: continue;
981:
1.6 misho 982: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
1.1 misho 983: if (c->cli_sock == -1) {
984: LOGERR;
985: continue;
986: } else
987: c->cli_parent = srv;
988:
1.7 misho 989: /* spawn rpc client dispatcher */
990: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchCall, c)) {
1.1 misho 991: LOGERR;
992: continue;
1.7 misho 993: }
1.1 misho 994: }
995:
1.7 misho 996: pthread_attr_destroy(&attr);
1.1 misho 997: return 0;
998: }
999:
1000: // ---------------------------------------------------------
1001:
1002: /*
1003: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 1004: *
1.1 misho 1005: * @call = Register RPC call
1006: * @rpc = IN RPC call structure
1.5 misho 1007: * @args = IN RPC calling arguments from RPC client
1.1 misho 1008: * return: -1 error, !=-1 ok
1009: */
1010: int
1.2 misho 1011: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.5 misho 1012: array_t * __restrict args)
1.1 misho 1013: {
1014: void *dl;
1015: rpc_callback_t func;
1016: int ret;
1017:
1.7 misho 1018: ioTRACE(RPC_TRACE_LEVEL);
1019:
1.2 misho 1020: if (!call || !rpc || !call->func_parent) {
1.7 misho 1021: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 1022: return -1;
1023: }
1024:
1.7 misho 1025: dl = dlopen(AIT_VOID(&call->func_file), RTLD_NOW);
1.1 misho 1026: if (!dl) {
1.7 misho 1027: rpc_SetErr(ENOENT, "Can`t attach module %s!", dlerror());
1.1 misho 1028: return -1;
1029: }
1030:
1.7 misho 1031: func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name));
1.1 misho 1032: if (func)
1.6 misho 1033: ret = func(call, ntohs(rpc->call_argc), args);
1.1 misho 1034: else {
1.7 misho 1035: rpc_SetErr(ENOEXEC, "Can`t find function %s!", dlerror());
1.1 misho 1036: ret = -1;
1037: }
1038:
1039: dlclose(dl);
1040: return ret;
1041: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>