Annotation of libaitrpc/src/srv.c, revision 1.8.2.8
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.8.2.8 ! misho 6: * $Id: srv.c,v 1.8.2.7 2012/05/11 13:35:22 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.7 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.7 misho 49: static void *rxPacket(sched_task_t*);
50: static void *rxBLOB(sched_task_t*);
51:
52: static void *
53: txPacket(sched_task_t *task)
54: {
55: rpc_cli_t *c = TASK_ARG(task);
56: rpc_srv_t *s = c->cli_parent;
57: rpc_func_t *f = NULL;
58: u_char *buf = TASK_DATA(task);
59: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
60: int ret, wlen = sizeof(struct tagRPCCall);
61: array_t *arr = NULL;
62:
63: if (rpc->call_argc) {
64: f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash));
65: if (!f) {
66: rpc->call_argc ^= rpc->call_argc;
67: rpc->call_rep.ret = RPC_ERROR(-1);
68: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
69: } else {
70: rpc->call_argc = htons(rpc_srv_getVars(f, &arr));
71: /* Go Encapsulate variables */
72: ret = io_vars2buffer(buf + wlen, TASK_DATLEN(task) - wlen, arr);
73: io_clrVars(f->func_vars);
74: if (ret == -1) {
75: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
76: rpc->call_argc ^= rpc->call_argc;
77: rpc->call_rep.ret = RPC_ERROR(-1);
78: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
79: } else
80: wlen += ret;
81: }
82: }
83:
1.8 misho 84: rpc->call_len = htons(wlen);
85:
1.7 misho 86: /* calculate CRC */
87: rpc->call_crc ^= rpc->call_crc;
1.8 misho 88: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.7 misho 89:
90: /* send reply */
91: ret = send(TASK_FD(task), buf, wlen, 0);
92: if (ret == -1)
93: LOGERR;
94: else if (ret != wlen)
95: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
96: "really sended %d bytes", wlen, ret);
97:
98: return NULL;
99: }
100:
101: static void *
102: execCall(sched_task_t *task)
103: {
104: rpc_cli_t *c = TASK_ARG(task);
105: rpc_srv_t *s = c->cli_parent;
106: rpc_func_t *f = NULL;
107: array_t *arr = NULL;
1.8 misho 108: u_char *buf = TASK_DATA(task) + TASK_VAL(task);
1.7 misho 109: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
110: int argc = ntohs(rpc->call_argc);
111:
112: /* Go decapsulate variables ... */
1.8 misho 113: if (argc) {
1.7 misho 114: arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),
1.8 misho 115: TASK_DATLEN(task) - TASK_VAL(task) - sizeof(struct tagRPCCall),
116: argc, 1);
1.7 misho 117: if (!arr) {
118: rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
119: rpc->call_argc ^= rpc->call_argc;
120: rpc->call_rep.ret = RPC_ERROR(-1);
121: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
122: return NULL;
123: }
124: }
125:
126: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
127: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
128: rpc->call_argc ^= rpc->call_argc;
129: rpc->call_rep.ret = RPC_ERROR(-1);
130: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
131: } else {
1.8 misho 132: /* if client doesn't want reply */
133: argc = rpc->call_req.flags & RPC_NOREPLY;
1.7 misho 134: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
135: if (rpc->call_rep.ret == htonl(-1)) {
136: rpc->call_rep.eno = RPC_ERROR(errno);
137: rpc->call_argc ^= rpc->call_argc;
138: } else {
139: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8 misho 140: if (argc) {
141: /* without reply */
142: io_clrVars(f->func_vars);
143: rpc->call_argc ^= rpc->call_argc;
144: } else
145: rpc->call_argc = htons(rpc_srv_getVars(f, NULL));
1.7 misho 146: }
147: }
148:
149: if (arr)
150: io_arrayDestroy(&arr);
151: return NULL;
152: }
153:
154: static void *
155: rxPacket(sched_task_t *task)
156: {
157: rpc_cli_t *c = TASK_ARG(task);
158: rpc_srv_t *s = c->cli_parent;
159: u_char *buf = TASK_DATA(task);
1.8 misho 160: int rlen, noreply;
161: u_short crc, off = 0;
1.7 misho 162: struct tagRPCCall *rpc;
163: struct timespec ts;
164:
165: memset(buf, 0, TASK_DATLEN(task));
166: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
167: if (rlen == -1) {
168: LOGERR;
1.8.2.8 ! misho 169: c->cli_kill = kill;
1.7 misho 170: return NULL;
171: } else if (!rlen) { /* receive EOF */
1.8.2.8 ! misho 172: c->cli_kill = kill;
1.7 misho 173: return NULL;
1.8.2.7 misho 174: }
1.7 misho 175:
1.8 misho 176: do {
177: if (rlen < sizeof(struct tagRPCCall)) {
178: rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");
179:
180: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
181: TASK_DATA(task), TASK_DATLEN(task));
182: return NULL;
183: } else
184: rpc = (struct tagRPCCall*) (buf + off);
185:
186: rlen -= ntohs(rpc->call_len);
187:
188: /* check integrity of packet */
189: crc = ntohs(rpc->call_crc);
190: rpc->call_crc ^= rpc->call_crc;
191: if (crc != crcFletcher16((u_short*) (buf + off), ntohs(rpc->call_len) / 2)) {
192: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
1.7 misho 193:
1.8 misho 194: off += ntohs(rpc->call_len);
195: if (rlen < 1)
196: break;
197: else
198: continue;
199: }
1.7 misho 200:
1.8 misho 201: noreply = rpc->call_req.flags & RPC_NOREPLY;
1.7 misho 202:
1.8 misho 203: /* check RPC packet session info */
204: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
205: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
206: rpc->call_argc ^= rpc->call_argc;
207: rpc->call_rep.ret = RPC_ERROR(-1);
208: rpc->call_rep.eno = RPC_ERROR(errno);
209: } else {
210: /* change socket timeout from last packet */
211: ts.tv_sec = rpc->call_session.sess_timeout;
212: ts.tv_nsec = 0;
213: schedPolling(TASK_ROOT(task), &ts, NULL);
214:
215: /* execute RPC call */
216: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off,
217: TASK_DATA(task), TASK_DATLEN(task));
218: }
1.7 misho 219:
1.8 misho 220: /* send RPC reply */
221: if (!noreply)
222: schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task),
223: buf + off, TASK_DATLEN(task) - off);
1.7 misho 224:
1.8 misho 225: off += ntohs(rpc->call_len);
226: } while (rlen > 0);
1.7 misho 227:
228: /* lets get next packet */
229: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
230: TASK_DATA(task), TASK_DATLEN(task));
231: return NULL;
232: }
233:
1.1 misho 234: static void *
235: rpc_srv_dispatchCall(void *arg)
236: {
237: rpc_cli_t *c = arg;
238: rpc_srv_t *s;
1.5 misho 239: u_char *buf;
1.7 misho 240: sched_root_task_t *root;
241: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
242:
1.1 misho 243: if (!arg) {
1.7 misho 244: rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client");
1.1 misho 245: return NULL;
246: } else
247: s = c->cli_parent;
248:
1.7 misho 249: /* allocate net buffer */
1.5 misho 250: buf = malloc(s->srv_netbuf);
251: if (!buf) {
252: LOGERR;
253: return NULL;
254: }
255:
1.7 misho 256: root = schedBegin();
257: if (!root) {
258: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
259: free(buf);
260: return NULL;
261: } else {
262: schedTermCondition(root, kill);
263: schedPolling(root, &ts, NULL);
264: }
265:
266: schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf);
267:
1.8.2.8 ! misho 268: schedRun(root, (void*) &c->cli_kill);
1.7 misho 269: schedEnd(&root);
270:
271: shutdown(c->cli_sock, SHUT_RDWR);
272: close(c->cli_sock);
273: memset(c, 0, sizeof(rpc_cli_t));
274: free(buf);
275: return NULL;
276: }
277:
278:
279: static void *
280: txBLOB(sched_task_t *task)
281: {
282: u_char *buf = TASK_DATA(task);
283: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
284: int wlen = sizeof(struct tagBLOBHdr);
285:
286: /* calculate CRC */
287: blob->hdr_crc ^= blob->hdr_crc;
1.8 misho 288: blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.7 misho 289:
290: /* send reply */
291: wlen = send(TASK_FD(task), buf, wlen, 0);
292: if (wlen == -1)
293: LOGERR;
294: else if (wlen != sizeof(struct tagBLOBHdr))
295: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
296: "really sended %d bytes", sizeof(struct tagBLOBHdr), wlen);
297:
298: return NULL;
299: }
1.4 misho 300:
1.7 misho 301: static void *
302: rxBLOB(sched_task_t *task)
303: {
304: rpc_cli_t *c = TASK_ARG(task);
305: rpc_srv_t *s = c->cli_parent;
306: rpc_blob_t *b;
307: u_char *buf = TASK_DATA(task);
308: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
309: int rlen;
310: u_short crc;
311: struct timespec ts;
312:
313: /* check for disable service at this moment? */
1.8 misho 314: if (!s || s->srv_blob.state == disable) {
1.7 misho 315: usleep(100000);
316: #ifdef HAVE_PTHREAD_YIELD
317: pthread_yield();
318: #endif
319: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
320: TASK_DATA(task), TASK_DATLEN(task));
321: return NULL;
322: }
1.5 misho 323:
1.7 misho 324: memset(buf, 0, TASK_DATLEN(task));
325: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
326: if (rlen == -1) {
327: LOGERR;
1.8.2.8 ! misho 328: c->cli_kill = kill;
1.7 misho 329: return NULL;
1.8.2.8 ! misho 330: } else if (!rlen) { /* receive EOF */
! 331: c->cli_kill = kill;
1.7 misho 332: return NULL;
1.8.2.7 misho 333: }
1.6 misho 334:
1.7 misho 335: if (rlen < sizeof(struct tagBLOBHdr)) {
336: rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet");
337: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
338: TASK_DATA(task), TASK_DATLEN(task));
339: return NULL;
340: }
1.1 misho 341:
1.7 misho 342: /* check integrity of packet */
343: crc = ntohs(blob->hdr_crc);
344: blob->hdr_crc ^= blob->hdr_crc;
1.8 misho 345: if (crc != crcFletcher16((u_short*) buf, rlen / 2)) {
1.7 misho 346: rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
347: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
348: TASK_DATA(task), TASK_DATLEN(task));
349: return NULL;
350: }
1.5 misho 351:
1.7 misho 352: /* check RPC packet session info */
353: if ((crc = rpc_chkPktSession(&blob->hdr_session, &s->srv_session))) {
354: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
355: blob->hdr_cmd = error;
356: goto end;
357: } else {
358: /* change socket timeout from last packet */
359: ts.tv_sec = blob->hdr_session.sess_timeout;
360: ts.tv_nsec = 0;
361: schedPolling(TASK_ROOT(task), &ts, NULL);
362: }
363:
364: /* Go to proceed packet ... */
365: switch (blob->hdr_cmd) {
366: case get:
367: if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) {
368: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var));
369: blob->hdr_cmd = no;
370: blob->hdr_ret = RPC_ERROR(-1);
371: break;
372: } else
373: blob->hdr_len = htonl(b->blob_len);
1.5 misho 374:
1.7 misho 375: if (rpc_srv_blobMap(s, b) != -1) {
376: /* deliver BLOB variable to client */
377: blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
378: rpc_srv_blobUnmap(b);
379: } else {
380: blob->hdr_cmd = error;
381: blob->hdr_ret = RPC_ERROR(-1);
382: }
383: break;
384: case set:
385: if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) {
386: /* set new BLOB variable for reply :) */
387: blob->hdr_var = htonl(b->blob_var);
388:
389: /* receive BLOB from client */
390: blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
391: rpc_srv_blobUnmap(b);
1.5 misho 392: } else {
1.7 misho 393: blob->hdr_cmd = error;
394: blob->hdr_ret = RPC_ERROR(-1);
395: }
396: break;
397: case unset:
398: if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) {
399: blob->hdr_cmd = error;
400: blob->hdr_ret = RPC_ERROR(-1);
1.1 misho 401: }
402: break;
1.7 misho 403: default:
404: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd);
405: blob->hdr_cmd = error;
406: blob->hdr_ret = RPC_ERROR(-1);
407: }
1.1 misho 408:
1.7 misho 409: end:
410: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task),
411: TASK_DATA(task), TASK_DATLEN(task));
412: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
413: TASK_DATA(task), TASK_DATLEN(task));
414: return NULL;
1.2 misho 415: }
416:
417: static void *
418: rpc_srv_dispatchVars(void *arg)
419: {
420: rpc_cli_t *c = arg;
421: rpc_srv_t *s;
1.7 misho 422: sched_root_task_t *root;
423: u_char *buf;
424: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
425:
1.2 misho 426: if (!arg) {
1.7 misho 427: rpc_SetErr(EINVAL, "Invalid parameter can`t procced BLOB");
1.2 misho 428: return NULL;
429: } else
430: s = c->cli_parent;
431:
1.7 misho 432: /* allocate net buffer */
433: buf = malloc(sizeof(struct tagBLOBHdr));
434: if (!buf) {
435: LOGERR;
436: return NULL;
437: }
1.2 misho 438:
1.7 misho 439: root = schedBegin();
440: if (!root) {
441: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
442: free(buf);
443: return NULL;
444: } else {
445: schedTermCondition(root, kill);
446: schedPolling(root, &ts, NULL);
447: }
1.4 misho 448:
1.7 misho 449: schedRead(root, rxBLOB, c, c->cli_sock, buf, sizeof(struct tagBLOBHdr));
1.2 misho 450:
1.8.2.8 ! misho 451: schedRun(root, (void*) &c->cli_kill);
1.7 misho 452: schedEnd(&root);
1.2 misho 453:
454: shutdown(c->cli_sock, SHUT_RDWR);
455: close(c->cli_sock);
456: memset(c, 0, sizeof(rpc_cli_t));
1.7 misho 457: free(buf);
458: return NULL;
1.1 misho 459: }
460:
461: // -------------------------------------------------
462:
463: /*
1.7 misho 464: * rpc_srv_initBLOBServer() - Init & create BLOB Server
465: *
1.4 misho 466: * @srv = RPC server instance
1.2 misho 467: * @Port = Port for bind server, if Port == 0 default port is selected
468: * @diskDir = Disk place for BLOB file objects
469: * return: -1 == error or 0 bind and created BLOB server instance
470: */
471: int
472: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
473: {
474: int n = 1;
1.6 misho 475: io_sockaddr_t sa;
1.2 misho 476:
477: if (!srv) {
1.7 misho 478: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 479: return -1;
480: }
481: if (srv->srv_blob.state) {
1.7 misho 482: rpc_SetErr(EPERM, "Already started BLOB server!");
1.2 misho 483: return 0;
484: }
485:
486: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
487: if (access(diskDir, R_OK | W_OK) == -1) {
488: LOGERR;
489: return -1;
490: } else
1.7 misho 491: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 492:
493: srv->srv_blob.server.cli_tid = pthread_self();
494: srv->srv_blob.server.cli_parent = srv;
1.4 misho 495:
496: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
1.6 misho 497: switch (sa.sa.sa_family) {
1.4 misho 498: case AF_INET:
1.6 misho 499: sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);
1.4 misho 500: break;
501: case AF_INET6:
1.6 misho 502: sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);
1.4 misho 503: break;
504: case AF_LOCAL:
1.6 misho 505: strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
1.4 misho 506: break;
507: default:
1.7 misho 508: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 509: return -1;
1.2 misho 510: }
1.6 misho 511: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
1.2 misho 512:
1.4 misho 513: /* create BLOB server socket */
1.6 misho 514: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 515: if (srv->srv_blob.server.cli_sock == -1) {
516: LOGERR;
1.7 misho 517: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 518: return -1;
519: }
520: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
521: LOGERR;
522: close(srv->srv_blob.server.cli_sock);
1.7 misho 523: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 524: return -1;
525: }
1.5 misho 526: n = srv->srv_netbuf;
527: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
528: LOGERR;
529: close(srv->srv_blob.server.cli_sock);
1.7 misho 530: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 531: return -1;
532: }
533: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
534: LOGERR;
535: close(srv->srv_blob.server.cli_sock);
1.7 misho 536: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 537: return -1;
538: }
1.6 misho 539: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
540: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 541: LOGERR;
542: close(srv->srv_blob.server.cli_sock);
1.7 misho 543: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 544: return -1;
545: }
546:
1.4 misho 547: /* allocate pool for concurent clients */
1.2 misho 548: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
549: if (!srv->srv_blob.clients) {
550: LOGERR;
551: close(srv->srv_blob.server.cli_sock);
1.7 misho 552: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 553: return -1;
554: } else
555: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
556:
557: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
1.5 misho 558: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
559: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
560: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
1.2 misho 561:
1.4 misho 562: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 563: return 0;
564: }
565:
566: /*
1.7 misho 567: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
568: *
1.2 misho 569: * @srv = RPC Server instance
570: * return: none
571: */
572: void
573: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
574: {
575: rpc_cli_t *c;
576: register int i;
577: rpc_blob_t *f;
578:
1.8 misho 579: if (!srv || srv->srv_blob.state == disable)
1.2 misho 580: return;
1.8 misho 581: else
1.4 misho 582: srv->srv_blob.state = kill;
1.2 misho 583:
584: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
585: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
586: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
587: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
588:
1.4 misho 589: /* close all clients connections & server socket */
1.2 misho 590: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
1.6 misho 591: if (c->cli_sa.sa.sa_family)
1.2 misho 592: shutdown(c->cli_sock, SHUT_RDWR);
593: close(srv->srv_blob.server.cli_sock);
594:
595: if (srv->srv_blob.clients) {
596: free(srv->srv_blob.clients);
597: srv->srv_blob.clients = NULL;
598: }
599:
1.4 misho 600: /* detach blobs */
1.2 misho 601: while ((f = srv->srv_blob.blobs)) {
602: srv->srv_blob.blobs = f->blob_next;
603: rpc_srv_blobFree(srv, f);
604: free(f);
605: }
606:
1.7 misho 607: AIT_FREE_VAL(&srv->srv_blob.dir);
608:
1.8 misho 609: /* at final, save disable BLOB service state */
610: srv->srv_blob.state = disable;
1.2 misho 611: }
612:
613: /*
1.7 misho 614: * rpc_srv_loopBLOB() - Execute Main BLOB server loop and wait for clients requests
615: *
1.2 misho 616: * @srv = RPC Server instance
617: * return: -1 error or 0 ok, infinite loop ...
618: */
619: int
1.5 misho 620: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
1.2 misho 621: {
1.6 misho 622: socklen_t salen = sizeof(io_sockaddr_t);
1.2 misho 623: register int i;
624: rpc_cli_t *c;
625: int ret;
1.7 misho 626: pthread_attr_t attr;
1.8.2.2 misho 627: struct pollfd pfd;
1.7 misho 628:
1.4 misho 629: if (!srv || srv->srv_blob.state == kill) {
1.7 misho 630: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 631: return -1;
632: }
633:
634: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
635: LOGERR;
636: return -1;
1.8.2.6 misho 637: } else
1.8.2.4 misho 638: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
639: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
1.2 misho 640:
1.7 misho 641: pthread_attr_init(&attr);
642: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
643:
1.8.2.4 misho 644: pfd.fd = srv->srv_blob.server.cli_sock;
1.8.2.2 misho 645: pfd.events = POLLIN | POLLPRI;
1.7 misho 646: /* main BLOB loop */
1.4 misho 647: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 648: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 649: if (!c->cli_sa.sa.sa_family)
1.2 misho 650: break;
651: if (i >= srv->srv_numcli) {
1.5 misho 652: #ifdef HAVE_PTHREAD_YIELD
653: pthread_yield();
654: #else
1.2 misho 655: usleep(1000000);
1.5 misho 656: #endif
1.2 misho 657: continue;
658: }
659:
1.8.2.2 misho 660: if ((ret = poll(&pfd, 1, srv->srv_session.sess_timeout * 1000)) == -1 ||
661: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
1.2 misho 662: LOGERR;
663: ret = 1;
664: break;
1.8.2.2 misho 665: } else if (!ret)
1.2 misho 666: continue;
667:
1.6 misho 668: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
1.2 misho 669: if (c->cli_sock == -1) {
670: LOGERR;
671: continue;
672: } else
673: c->cli_parent = srv;
674:
1.7 misho 675: /* spawn dispatch thread for BLOB client */
1.8.2.8 ! misho 676: c->cli_kill = enable;
1.7 misho 677: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchVars, c)) {
1.2 misho 678: LOGERR;
679: continue;
1.7 misho 680: }
1.2 misho 681: }
682:
1.5 misho 683: srv->srv_blob.state = kill;
1.2 misho 684:
1.7 misho 685: pthread_attr_destroy(&attr);
1.2 misho 686: return 0;
687: }
688:
689:
690: /*
1.7 misho 691: * rpc_srv_initServer() - Init & create RPC Server
692: *
1.1 misho 693: * @regProgID = ProgramID for authentication & recognition
694: * @regProcID = ProcessID for authentication & recognition
695: * @concurentClients = Concurent clients at same time to this server
1.5 misho 696: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 697: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
698: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 699: * @Port = Port for bind server, if Port == 0 default port is selected
700: * return: NULL == error or !=NULL bind and created RPC server instance
701: */
702: rpc_srv_t *
703: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.5 misho 704: int netBuf, u_short family, const char *csHost, u_short Port)
1.1 misho 705: {
706: rpc_srv_t *srv = NULL;
707: int n = 1;
708: struct hostent *host = NULL;
1.6 misho 709: io_sockaddr_t sa;
1.1 misho 710:
1.4 misho 711: if (!concurentClients || !regProgID ||
712: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 713: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
714: return NULL;
715: }
716: if (!Port)
717: Port = RPC_DEFPORT;
1.5 misho 718: if (!netBuf)
719: netBuf = BUFSIZ;
1.7 misho 720: else
721: netBuf = io_align(netBuf, 1); /* align netBuf length */
1.4 misho 722: if (csHost && family != AF_LOCAL) {
1.1 misho 723: host = gethostbyname2(csHost, family);
724: if (!host) {
725: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
726: return NULL;
727: }
728: }
1.4 misho 729: memset(&sa, 0, sizeof sa);
1.6 misho 730: sa.sa.sa_family = family;
1.1 misho 731: switch (family) {
732: case AF_INET:
1.6 misho 733: sa.sin.sin_len = sizeof(struct sockaddr_in);
734: sa.sin.sin_port = htons(Port);
1.1 misho 735: if (csHost)
1.6 misho 736: memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length);
1.1 misho 737: break;
738: case AF_INET6:
1.6 misho 739: sa.sin6.sin6_len = sizeof(struct sockaddr_in6);
740: sa.sin6.sin6_port = htons(Port);
1.4 misho 741: if (csHost)
1.6 misho 742: memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length);
1.4 misho 743: break;
744: case AF_LOCAL:
1.6 misho 745: sa.sun.sun_len = sizeof(struct sockaddr_un);
1.1 misho 746: if (csHost)
1.6 misho 747: strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path);
748: unlink(sa.sun.sun_path);
1.1 misho 749: break;
750: default:
751: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
752: return NULL;
753: }
754:
755: srv = malloc(sizeof(rpc_srv_t));
756: if (!srv) {
757: LOGERR;
758: return NULL;
759: } else
760: memset(srv, 0, sizeof(rpc_srv_t));
761:
1.5 misho 762: srv->srv_netbuf = netBuf;
1.1 misho 763: srv->srv_numcli = concurentClients;
764: srv->srv_session.sess_version = RPC_VERSION;
1.7 misho 765: srv->srv_session.sess_timeout = DEF_RPC_TIMEOUT;
1.1 misho 766: srv->srv_session.sess_program = regProgID;
767: srv->srv_session.sess_process = regProcID;
768:
1.2 misho 769: srv->srv_server.cli_tid = pthread_self();
1.1 misho 770: srv->srv_server.cli_parent = srv;
1.6 misho 771: memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
1.4 misho 772:
773: /* create server socket */
1.1 misho 774: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
775: if (srv->srv_server.cli_sock == -1) {
776: LOGERR;
777: free(srv);
778: return NULL;
779: }
780: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
781: LOGERR;
782: close(srv->srv_server.cli_sock);
783: free(srv);
784: return NULL;
785: }
1.5 misho 786: n = srv->srv_netbuf;
787: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
788: LOGERR;
789: close(srv->srv_server.cli_sock);
790: free(srv);
791: return NULL;
792: }
793: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
794: LOGERR;
795: close(srv->srv_server.cli_sock);
796: free(srv);
797: return NULL;
798: }
1.6 misho 799: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
800: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 801: LOGERR;
802: close(srv->srv_server.cli_sock);
803: free(srv);
804: return NULL;
805: }
806:
1.4 misho 807: /* allocate pool for concurent clients */
1.1 misho 808: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
809: if (!srv->srv_clients) {
810: LOGERR;
811: close(srv->srv_server.cli_sock);
812: free(srv);
813: return NULL;
814: } else
815: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
816:
1.2 misho 817: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.5 misho 818: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
819: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
820: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
1.8 misho 821:
1.1 misho 822: return srv;
823: }
824:
825: /*
1.7 misho 826: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
827: *
1.6 misho 828: * @psrv = RPC Server instance
1.1 misho 829: * return: none
830: */
831: void
1.6 misho 832: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 833: {
834: rpc_cli_t *c;
835: register int i;
836: rpc_func_t *f;
837:
1.6 misho 838: if (!psrv || !*psrv) {
1.1 misho 839: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
840: return;
841: }
842:
1.6 misho 843: rpc_srv_endBLOBServer(*psrv);
1.1 misho 844:
1.4 misho 845: /* close all clients connections & server socket */
1.6 misho 846: for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
847: if (c->cli_sa.sa.sa_family) {
1.1 misho 848: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 849: close(c->cli_sock);
850: }
1.6 misho 851: close((*psrv)->srv_server.cli_sock);
1.1 misho 852:
1.6 misho 853: if ((*psrv)->srv_clients) {
854: free((*psrv)->srv_clients);
855: (*psrv)->srv_clients = NULL;
856: (*psrv)->srv_numcli = 0;
1.1 misho 857: }
858:
1.4 misho 859: /* detach exported calls */
1.6 misho 860: while ((f = (*psrv)->srv_funcs)) {
861: (*psrv)->srv_funcs = f->func_next;
862: io_freeVars(&f->func_vars);
1.7 misho 863: AIT_FREE_VAL(&f->func_name);
864: AIT_FREE_VAL(&f->func_file);
1.2 misho 865: free(f);
866: }
867:
1.6 misho 868: free(*psrv);
869: *psrv = NULL;
1.1 misho 870: }
871:
872: /*
1.7 misho 873: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
874: *
1.1 misho 875: * @srv = RPC Server instance
876: * return: -1 error or 0 ok, infinite loop ...
877: */
878: int
1.5 misho 879: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 880: {
1.6 misho 881: socklen_t salen = sizeof(io_sockaddr_t);
1.1 misho 882: register int i;
883: rpc_cli_t *c;
1.2 misho 884: int ret;
1.7 misho 885: pthread_attr_t attr;
1.8.2.2 misho 886: struct pollfd pfd;
1.7 misho 887:
1.1 misho 888: if (!srv) {
889: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
890: return -1;
891: }
892:
1.5 misho 893: /* activate BLOB server worker if srv->srv_blob.state == enable */
894: rpc_srv_execBLOBServer(srv);
895:
1.1 misho 896: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
897: LOGERR;
898: return -1;
1.8.2.8 ! misho 899: } else
1.8.2.2 misho 900: fcntl(srv->srv_server.cli_sock, F_SETFL, fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 901:
1.7 misho 902: pthread_attr_init(&attr);
903: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
904:
1.8.2.2 misho 905: pfd.fd = srv->srv_server.cli_sock;
906: pfd.events = POLLIN | POLLPRI;
1.7 misho 907: /* main rpc loop */
1.4 misho 908: while (srv->srv_kill != kill) {
1.1 misho 909: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 910: if (!c->cli_sa.sa.sa_family)
1.1 misho 911: break;
1.2 misho 912: if (i >= srv->srv_numcli) {
1.5 misho 913: #ifdef HAVE_PTHREAD_YIELD
914: pthread_yield();
915: #else
1.1 misho 916: usleep(1000000);
1.5 misho 917: #endif
1.1 misho 918: continue;
919: }
1.2 misho 920:
1.8.2.2 misho 921: if ((ret = poll(&pfd, 1, srv->srv_session.sess_timeout * 1000)) == -1 ||
922: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
1.2 misho 923: LOGERR;
924: ret = 1;
925: break;
1.8.2.2 misho 926: } else if (!ret)
1.2 misho 927: continue;
928:
1.6 misho 929: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
1.1 misho 930: if (c->cli_sock == -1) {
931: LOGERR;
932: continue;
933: } else
934: c->cli_parent = srv;
935:
1.7 misho 936: /* spawn rpc client dispatcher */
1.8.2.8 ! misho 937: c->cli_kill = enable;
1.7 misho 938: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchCall, c)) {
1.1 misho 939: LOGERR;
940: continue;
1.7 misho 941: }
1.1 misho 942: }
943:
1.7 misho 944: pthread_attr_destroy(&attr);
1.1 misho 945: return 0;
946: }
947:
948: // ---------------------------------------------------------
949:
950: /*
951: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 952: *
1.1 misho 953: * @call = Register RPC call
954: * @rpc = IN RPC call structure
1.5 misho 955: * @args = IN RPC calling arguments from RPC client
1.1 misho 956: * return: -1 error, !=-1 ok
957: */
958: int
1.2 misho 959: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.5 misho 960: array_t * __restrict args)
1.1 misho 961: {
962: void *dl;
963: rpc_callback_t func;
964: int ret;
965:
1.2 misho 966: if (!call || !rpc || !call->func_parent) {
1.7 misho 967: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 968: return -1;
969: }
970:
1.8.2.1 misho 971: dl = dlopen(AIT_ADDR(&call->func_file), RTLD_NOW);
1.1 misho 972: if (!dl) {
1.7 misho 973: rpc_SetErr(ENOENT, "Can`t attach module %s!", dlerror());
1.1 misho 974: return -1;
975: }
976:
1.7 misho 977: func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name));
1.1 misho 978: if (func)
1.6 misho 979: ret = func(call, ntohs(rpc->call_argc), args);
1.1 misho 980: else {
1.7 misho 981: rpc_SetErr(ENOEXEC, "Can`t find function %s!", dlerror());
1.1 misho 982: ret = -1;
983: }
984:
985: dlclose(dl);
986: return ret;
987: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>