Annotation of libaitrpc/src/srv.c, revision 1.8.2.7
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.8.2.7 ! misho 6: * $Id: srv.c,v 1.8.2.6 2012/05/11 13:23:12 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.7 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.7 misho 49: static void *rxPacket(sched_task_t*);
50: static void *rxBLOB(sched_task_t*);
51:
52: static void *
53: txPacket(sched_task_t *task)
54: {
55: rpc_cli_t *c = TASK_ARG(task);
56: rpc_srv_t *s = c->cli_parent;
57: rpc_func_t *f = NULL;
58: u_char *buf = TASK_DATA(task);
59: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
60: int ret, wlen = sizeof(struct tagRPCCall);
61: array_t *arr = NULL;
62:
63: if (rpc->call_argc) {
64: f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash));
65: if (!f) {
66: rpc->call_argc ^= rpc->call_argc;
67: rpc->call_rep.ret = RPC_ERROR(-1);
68: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
69: } else {
70: rpc->call_argc = htons(rpc_srv_getVars(f, &arr));
71: /* Go Encapsulate variables */
72: ret = io_vars2buffer(buf + wlen, TASK_DATLEN(task) - wlen, arr);
73: io_clrVars(f->func_vars);
74: if (ret == -1) {
75: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
76: rpc->call_argc ^= rpc->call_argc;
77: rpc->call_rep.ret = RPC_ERROR(-1);
78: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
79: } else
80: wlen += ret;
81: }
82: }
83:
1.8 misho 84: rpc->call_len = htons(wlen);
85:
1.7 misho 86: /* calculate CRC */
87: rpc->call_crc ^= rpc->call_crc;
1.8 misho 88: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.7 misho 89:
90: /* send reply */
91: ret = send(TASK_FD(task), buf, wlen, 0);
92: if (ret == -1)
93: LOGERR;
94: else if (ret != wlen)
95: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
96: "really sended %d bytes", wlen, ret);
97:
98: return NULL;
99: }
100:
101: static void *
102: execCall(sched_task_t *task)
103: {
104: rpc_cli_t *c = TASK_ARG(task);
105: rpc_srv_t *s = c->cli_parent;
106: rpc_func_t *f = NULL;
107: array_t *arr = NULL;
1.8 misho 108: u_char *buf = TASK_DATA(task) + TASK_VAL(task);
1.7 misho 109: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
110: int argc = ntohs(rpc->call_argc);
111:
112: /* Go decapsulate variables ... */
1.8 misho 113: if (argc) {
1.7 misho 114: arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),
1.8 misho 115: TASK_DATLEN(task) - TASK_VAL(task) - sizeof(struct tagRPCCall),
116: argc, 1);
1.7 misho 117: if (!arr) {
118: rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
119: rpc->call_argc ^= rpc->call_argc;
120: rpc->call_rep.ret = RPC_ERROR(-1);
121: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
122: return NULL;
123: }
124: }
125:
126: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
127: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
128: rpc->call_argc ^= rpc->call_argc;
129: rpc->call_rep.ret = RPC_ERROR(-1);
130: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
131: } else {
1.8 misho 132: /* if client doesn't want reply */
133: argc = rpc->call_req.flags & RPC_NOREPLY;
1.7 misho 134: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
135: if (rpc->call_rep.ret == htonl(-1)) {
136: rpc->call_rep.eno = RPC_ERROR(errno);
137: rpc->call_argc ^= rpc->call_argc;
138: } else {
139: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8 misho 140: if (argc) {
141: /* without reply */
142: io_clrVars(f->func_vars);
143: rpc->call_argc ^= rpc->call_argc;
144: } else
145: rpc->call_argc = htons(rpc_srv_getVars(f, NULL));
1.7 misho 146: }
147: }
148:
149: if (arr)
150: io_arrayDestroy(&arr);
151: return NULL;
152: }
153:
154: static void *
155: rxPacket(sched_task_t *task)
156: {
157: rpc_cli_t *c = TASK_ARG(task);
158: rpc_srv_t *s = c->cli_parent;
159: u_char *buf = TASK_DATA(task);
1.8 misho 160: int rlen, noreply;
161: u_short crc, off = 0;
1.7 misho 162: struct tagRPCCall *rpc;
163: struct timespec ts;
164:
165: memset(buf, 0, TASK_DATLEN(task));
166: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
167: if (rlen == -1) {
168: LOGERR;
169: s->srv_kill = s->srv_blob.state = kill;
170: return NULL;
171: } else if (!rlen) { /* receive EOF */
172: s->srv_kill = s->srv_blob.state = kill;
173: return NULL;
1.8.2.7 ! misho 174: }
1.7 misho 175:
1.8 misho 176: do {
177: if (rlen < sizeof(struct tagRPCCall)) {
178: rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");
179:
180: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
181: TASK_DATA(task), TASK_DATLEN(task));
182: return NULL;
183: } else
184: rpc = (struct tagRPCCall*) (buf + off);
185:
186: rlen -= ntohs(rpc->call_len);
187:
188: /* check integrity of packet */
189: crc = ntohs(rpc->call_crc);
190: rpc->call_crc ^= rpc->call_crc;
191: if (crc != crcFletcher16((u_short*) (buf + off), ntohs(rpc->call_len) / 2)) {
192: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
1.7 misho 193:
1.8 misho 194: off += ntohs(rpc->call_len);
195: if (rlen < 1)
196: break;
197: else
198: continue;
199: }
1.7 misho 200:
1.8 misho 201: noreply = rpc->call_req.flags & RPC_NOREPLY;
1.7 misho 202:
1.8 misho 203: /* check RPC packet session info */
204: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
205: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
206: rpc->call_argc ^= rpc->call_argc;
207: rpc->call_rep.ret = RPC_ERROR(-1);
208: rpc->call_rep.eno = RPC_ERROR(errno);
209: } else {
210: /* change socket timeout from last packet */
211: ts.tv_sec = rpc->call_session.sess_timeout;
212: ts.tv_nsec = 0;
213: schedPolling(TASK_ROOT(task), &ts, NULL);
214:
215: /* execute RPC call */
216: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off,
217: TASK_DATA(task), TASK_DATLEN(task));
218: }
1.7 misho 219:
1.8 misho 220: /* send RPC reply */
221: if (!noreply)
222: schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task),
223: buf + off, TASK_DATLEN(task) - off);
1.7 misho 224:
1.8 misho 225: off += ntohs(rpc->call_len);
226: } while (rlen > 0);
1.7 misho 227:
228: /* lets get next packet */
229: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
230: TASK_DATA(task), TASK_DATLEN(task));
231: return NULL;
232: }
233:
1.1 misho 234: static void *
235: rpc_srv_dispatchCall(void *arg)
236: {
237: rpc_cli_t *c = arg;
238: rpc_srv_t *s;
1.5 misho 239: u_char *buf;
1.7 misho 240: sched_root_task_t *root;
241: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
242:
1.1 misho 243: if (!arg) {
1.7 misho 244: rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client");
1.1 misho 245: return NULL;
246: } else
247: s = c->cli_parent;
248:
1.7 misho 249: /* allocate net buffer */
1.5 misho 250: buf = malloc(s->srv_netbuf);
251: if (!buf) {
252: LOGERR;
253: return NULL;
254: }
255:
1.7 misho 256: root = schedBegin();
257: if (!root) {
258: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
259: free(buf);
260: return NULL;
261: } else {
262: schedTermCondition(root, kill);
263: schedPolling(root, &ts, NULL);
264: }
265:
266: schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf);
267:
268: schedRun(root, (void*) &s->srv_kill);
269: schedEnd(&root);
270:
271: shutdown(c->cli_sock, SHUT_RDWR);
272: close(c->cli_sock);
273: memset(c, 0, sizeof(rpc_cli_t));
274: free(buf);
275: return NULL;
276: }
277:
278:
279: static void *
280: txBLOB(sched_task_t *task)
281: {
282: u_char *buf = TASK_DATA(task);
283: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
284: int wlen = sizeof(struct tagBLOBHdr);
285:
286: /* calculate CRC */
287: blob->hdr_crc ^= blob->hdr_crc;
1.8 misho 288: blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.7 misho 289:
290: /* send reply */
291: wlen = send(TASK_FD(task), buf, wlen, 0);
292: if (wlen == -1)
293: LOGERR;
294: else if (wlen != sizeof(struct tagBLOBHdr))
295: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
296: "really sended %d bytes", sizeof(struct tagBLOBHdr), wlen);
297:
298: return NULL;
299: }
1.4 misho 300:
1.7 misho 301: static void *
302: rxBLOB(sched_task_t *task)
303: {
304: rpc_cli_t *c = TASK_ARG(task);
305: rpc_srv_t *s = c->cli_parent;
306: rpc_blob_t *b;
307: u_char *buf = TASK_DATA(task);
308: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
309: int rlen;
310: u_short crc;
311: struct timespec ts;
312:
313: /* check for disable service at this moment? */
1.8 misho 314: if (!s || s->srv_blob.state == disable) {
1.7 misho 315: usleep(100000);
316: #ifdef HAVE_PTHREAD_YIELD
317: pthread_yield();
318: #endif
319: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
320: TASK_DATA(task), TASK_DATLEN(task));
321: return NULL;
322: }
1.5 misho 323:
1.7 misho 324: memset(buf, 0, TASK_DATLEN(task));
325: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
326: if (rlen == -1) {
327: LOGERR;
328: s->srv_blob.state = kill;
329: return NULL;
330: } else if (!rlen || s->srv_kill == kill) { /* receive EOF */
331: s->srv_blob.state = kill;
332: return NULL;
1.8.2.7 ! misho 333: }
1.6 misho 334:
1.7 misho 335: if (rlen < sizeof(struct tagBLOBHdr)) {
336: rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet");
337: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
338: TASK_DATA(task), TASK_DATLEN(task));
339: return NULL;
340: }
1.1 misho 341:
1.7 misho 342: /* check integrity of packet */
343: crc = ntohs(blob->hdr_crc);
344: blob->hdr_crc ^= blob->hdr_crc;
1.8 misho 345: if (crc != crcFletcher16((u_short*) buf, rlen / 2)) {
1.7 misho 346: rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
347: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
348: TASK_DATA(task), TASK_DATLEN(task));
349: return NULL;
350: }
1.5 misho 351:
1.7 misho 352: /* check RPC packet session info */
353: if ((crc = rpc_chkPktSession(&blob->hdr_session, &s->srv_session))) {
354: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
355: blob->hdr_cmd = error;
356: goto end;
357: } else {
358: /* change socket timeout from last packet */
359: ts.tv_sec = blob->hdr_session.sess_timeout;
360: ts.tv_nsec = 0;
361: schedPolling(TASK_ROOT(task), &ts, NULL);
362: }
363:
364: /* Go to proceed packet ... */
365: switch (blob->hdr_cmd) {
366: case get:
367: if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) {
368: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var));
369: blob->hdr_cmd = no;
370: blob->hdr_ret = RPC_ERROR(-1);
371: break;
372: } else
373: blob->hdr_len = htonl(b->blob_len);
1.5 misho 374:
1.7 misho 375: if (rpc_srv_blobMap(s, b) != -1) {
376: /* deliver BLOB variable to client */
377: blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
378: rpc_srv_blobUnmap(b);
379: } else {
380: blob->hdr_cmd = error;
381: blob->hdr_ret = RPC_ERROR(-1);
382: }
383: break;
384: case set:
385: if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) {
386: /* set new BLOB variable for reply :) */
387: blob->hdr_var = htonl(b->blob_var);
388:
389: /* receive BLOB from client */
390: blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
391: rpc_srv_blobUnmap(b);
1.5 misho 392: } else {
1.7 misho 393: blob->hdr_cmd = error;
394: blob->hdr_ret = RPC_ERROR(-1);
395: }
396: break;
397: case unset:
398: if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) {
399: blob->hdr_cmd = error;
400: blob->hdr_ret = RPC_ERROR(-1);
1.1 misho 401: }
402: break;
1.7 misho 403: default:
404: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd);
405: blob->hdr_cmd = error;
406: blob->hdr_ret = RPC_ERROR(-1);
407: }
1.1 misho 408:
1.7 misho 409: end:
410: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task),
411: TASK_DATA(task), TASK_DATLEN(task));
412: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
413: TASK_DATA(task), TASK_DATLEN(task));
414: return NULL;
1.2 misho 415: }
416:
417: static void *
418: rpc_srv_dispatchVars(void *arg)
419: {
420: rpc_cli_t *c = arg;
421: rpc_srv_t *s;
1.7 misho 422: sched_root_task_t *root;
423: u_char *buf;
424: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
425:
1.2 misho 426: if (!arg) {
1.7 misho 427: rpc_SetErr(EINVAL, "Invalid parameter can`t procced BLOB");
1.2 misho 428: return NULL;
429: } else
430: s = c->cli_parent;
431:
1.7 misho 432: /* allocate net buffer */
433: buf = malloc(sizeof(struct tagBLOBHdr));
434: if (!buf) {
435: LOGERR;
436: return NULL;
437: }
1.2 misho 438:
1.7 misho 439: root = schedBegin();
440: if (!root) {
441: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
442: free(buf);
443: return NULL;
444: } else {
445: schedTermCondition(root, kill);
446: schedPolling(root, &ts, NULL);
447: }
1.4 misho 448:
1.7 misho 449: schedRead(root, rxBLOB, c, c->cli_sock, buf, sizeof(struct tagBLOBHdr));
1.2 misho 450:
1.7 misho 451: schedRun(root, (void*) &s->srv_blob.state);
452: schedEnd(&root);
1.2 misho 453:
454: shutdown(c->cli_sock, SHUT_RDWR);
455: close(c->cli_sock);
456: memset(c, 0, sizeof(rpc_cli_t));
1.7 misho 457: free(buf);
458: return NULL;
1.1 misho 459: }
460:
461: // -------------------------------------------------
462:
463: /*
1.7 misho 464: * rpc_srv_initBLOBServer() - Init & create BLOB Server
465: *
1.4 misho 466: * @srv = RPC server instance
1.2 misho 467: * @Port = Port for bind server, if Port == 0 default port is selected
468: * @diskDir = Disk place for BLOB file objects
469: * return: -1 == error or 0 bind and created BLOB server instance
470: */
471: int
472: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
473: {
474: int n = 1;
1.6 misho 475: io_sockaddr_t sa;
1.2 misho 476:
477: if (!srv) {
1.7 misho 478: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 479: return -1;
480: }
481: if (srv->srv_blob.state) {
1.7 misho 482: rpc_SetErr(EPERM, "Already started BLOB server!");
1.2 misho 483: return 0;
484: }
485:
486: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
487: if (access(diskDir, R_OK | W_OK) == -1) {
488: LOGERR;
489: return -1;
490: } else
1.7 misho 491: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 492:
493: srv->srv_blob.server.cli_tid = pthread_self();
494: srv->srv_blob.server.cli_parent = srv;
1.4 misho 495:
496: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
1.6 misho 497: switch (sa.sa.sa_family) {
1.4 misho 498: case AF_INET:
1.6 misho 499: sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);
1.4 misho 500: break;
501: case AF_INET6:
1.6 misho 502: sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);
1.4 misho 503: break;
504: case AF_LOCAL:
1.6 misho 505: strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
1.4 misho 506: break;
507: default:
1.7 misho 508: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 509: return -1;
1.2 misho 510: }
1.6 misho 511: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
1.2 misho 512:
1.4 misho 513: /* create BLOB server socket */
1.6 misho 514: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 515: if (srv->srv_blob.server.cli_sock == -1) {
516: LOGERR;
1.7 misho 517: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 518: return -1;
519: }
520: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
521: LOGERR;
522: close(srv->srv_blob.server.cli_sock);
1.7 misho 523: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 524: return -1;
525: }
1.5 misho 526: n = srv->srv_netbuf;
527: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
528: LOGERR;
529: close(srv->srv_blob.server.cli_sock);
1.7 misho 530: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 531: return -1;
532: }
533: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
534: LOGERR;
535: close(srv->srv_blob.server.cli_sock);
1.7 misho 536: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 537: return -1;
538: }
1.6 misho 539: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
540: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 541: LOGERR;
542: close(srv->srv_blob.server.cli_sock);
1.7 misho 543: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 544: return -1;
545: }
546:
1.4 misho 547: /* allocate pool for concurent clients */
1.2 misho 548: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
549: if (!srv->srv_blob.clients) {
550: LOGERR;
551: close(srv->srv_blob.server.cli_sock);
1.7 misho 552: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 553: return -1;
554: } else
555: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
556:
557: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
1.5 misho 558: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
559: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
560: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
1.2 misho 561:
1.4 misho 562: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 563: return 0;
564: }
565:
566: /*
1.7 misho 567: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
568: *
1.2 misho 569: * @srv = RPC Server instance
570: * return: none
571: */
572: void
573: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
574: {
575: rpc_cli_t *c;
576: register int i;
577: rpc_blob_t *f;
578:
1.8 misho 579: if (!srv || srv->srv_blob.state == disable)
1.2 misho 580: return;
1.8 misho 581: else
1.4 misho 582: srv->srv_blob.state = kill;
1.2 misho 583:
584: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
585: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
586: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
587: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
588:
1.4 misho 589: /* close all clients connections & server socket */
1.2 misho 590: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
1.6 misho 591: if (c->cli_sa.sa.sa_family)
1.2 misho 592: shutdown(c->cli_sock, SHUT_RDWR);
593: close(srv->srv_blob.server.cli_sock);
594:
595: if (srv->srv_blob.clients) {
596: free(srv->srv_blob.clients);
597: srv->srv_blob.clients = NULL;
598: }
599:
1.4 misho 600: /* detach blobs */
1.2 misho 601: while ((f = srv->srv_blob.blobs)) {
602: srv->srv_blob.blobs = f->blob_next;
603: rpc_srv_blobFree(srv, f);
604: free(f);
605: }
606:
1.7 misho 607: AIT_FREE_VAL(&srv->srv_blob.dir);
608:
1.8 misho 609: /* at final, save disable BLOB service state */
610: srv->srv_blob.state = disable;
1.2 misho 611: }
612:
613: /*
1.7 misho 614: * rpc_srv_loopBLOB() - Execute Main BLOB server loop and wait for clients requests
615: *
1.2 misho 616: * @srv = RPC Server instance
617: * return: -1 error or 0 ok, infinite loop ...
618: */
619: int
1.5 misho 620: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
1.2 misho 621: {
1.6 misho 622: socklen_t salen = sizeof(io_sockaddr_t);
1.2 misho 623: register int i;
624: rpc_cli_t *c;
625: int ret;
1.7 misho 626: pthread_attr_t attr;
1.8.2.2 misho 627: struct pollfd pfd;
1.7 misho 628:
1.4 misho 629: if (!srv || srv->srv_blob.state == kill) {
1.7 misho 630: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 631: return -1;
632: }
633:
634: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
635: LOGERR;
636: return -1;
1.8.2.6 misho 637: } else
1.8.2.4 misho 638: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
639: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
1.2 misho 640:
1.7 misho 641: pthread_attr_init(&attr);
642: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
643:
1.8.2.4 misho 644: pfd.fd = srv->srv_blob.server.cli_sock;
1.8.2.2 misho 645: pfd.events = POLLIN | POLLPRI;
1.7 misho 646: /* main BLOB loop */
1.4 misho 647: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 648: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 649: if (!c->cli_sa.sa.sa_family)
1.2 misho 650: break;
651: if (i >= srv->srv_numcli) {
1.5 misho 652: #ifdef HAVE_PTHREAD_YIELD
653: pthread_yield();
654: #else
1.2 misho 655: usleep(1000000);
1.5 misho 656: #endif
1.2 misho 657: continue;
658: }
659:
1.8.2.2 misho 660: if ((ret = poll(&pfd, 1, srv->srv_session.sess_timeout * 1000)) == -1 ||
661: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
1.2 misho 662: LOGERR;
663: ret = 1;
664: break;
1.8.2.2 misho 665: } else if (!ret)
1.2 misho 666: continue;
667:
1.6 misho 668: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
1.2 misho 669: if (c->cli_sock == -1) {
670: LOGERR;
671: continue;
672: } else
673: c->cli_parent = srv;
674:
1.7 misho 675: /* spawn dispatch thread for BLOB client */
676: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchVars, c)) {
1.2 misho 677: LOGERR;
678: continue;
1.7 misho 679: }
1.2 misho 680: }
681:
1.5 misho 682: srv->srv_blob.state = kill;
1.2 misho 683:
1.7 misho 684: pthread_attr_destroy(&attr);
1.2 misho 685: return 0;
686: }
687:
688:
689: /*
1.7 misho 690: * rpc_srv_initServer() - Init & create RPC Server
691: *
1.1 misho 692: * @regProgID = ProgramID for authentication & recognition
693: * @regProcID = ProcessID for authentication & recognition
694: * @concurentClients = Concurent clients at same time to this server
1.5 misho 695: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 696: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
697: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 698: * @Port = Port for bind server, if Port == 0 default port is selected
699: * return: NULL == error or !=NULL bind and created RPC server instance
700: */
701: rpc_srv_t *
702: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.5 misho 703: int netBuf, u_short family, const char *csHost, u_short Port)
1.1 misho 704: {
705: rpc_srv_t *srv = NULL;
706: int n = 1;
707: struct hostent *host = NULL;
1.6 misho 708: io_sockaddr_t sa;
1.1 misho 709:
1.4 misho 710: if (!concurentClients || !regProgID ||
711: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 712: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
713: return NULL;
714: }
715: if (!Port)
716: Port = RPC_DEFPORT;
1.5 misho 717: if (!netBuf)
718: netBuf = BUFSIZ;
1.7 misho 719: else
720: netBuf = io_align(netBuf, 1); /* align netBuf length */
1.4 misho 721: if (csHost && family != AF_LOCAL) {
1.1 misho 722: host = gethostbyname2(csHost, family);
723: if (!host) {
724: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
725: return NULL;
726: }
727: }
1.4 misho 728: memset(&sa, 0, sizeof sa);
1.6 misho 729: sa.sa.sa_family = family;
1.1 misho 730: switch (family) {
731: case AF_INET:
1.6 misho 732: sa.sin.sin_len = sizeof(struct sockaddr_in);
733: sa.sin.sin_port = htons(Port);
1.1 misho 734: if (csHost)
1.6 misho 735: memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length);
1.1 misho 736: break;
737: case AF_INET6:
1.6 misho 738: sa.sin6.sin6_len = sizeof(struct sockaddr_in6);
739: sa.sin6.sin6_port = htons(Port);
1.4 misho 740: if (csHost)
1.6 misho 741: memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length);
1.4 misho 742: break;
743: case AF_LOCAL:
1.6 misho 744: sa.sun.sun_len = sizeof(struct sockaddr_un);
1.1 misho 745: if (csHost)
1.6 misho 746: strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path);
747: unlink(sa.sun.sun_path);
1.1 misho 748: break;
749: default:
750: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
751: return NULL;
752: }
753:
754: srv = malloc(sizeof(rpc_srv_t));
755: if (!srv) {
756: LOGERR;
757: return NULL;
758: } else
759: memset(srv, 0, sizeof(rpc_srv_t));
760:
1.5 misho 761: srv->srv_netbuf = netBuf;
1.1 misho 762: srv->srv_numcli = concurentClients;
763: srv->srv_session.sess_version = RPC_VERSION;
1.7 misho 764: srv->srv_session.sess_timeout = DEF_RPC_TIMEOUT;
1.1 misho 765: srv->srv_session.sess_program = regProgID;
766: srv->srv_session.sess_process = regProcID;
767:
1.2 misho 768: srv->srv_server.cli_tid = pthread_self();
1.1 misho 769: srv->srv_server.cli_parent = srv;
1.6 misho 770: memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
1.4 misho 771:
772: /* create server socket */
1.1 misho 773: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
774: if (srv->srv_server.cli_sock == -1) {
775: LOGERR;
776: free(srv);
777: return NULL;
778: }
779: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
780: LOGERR;
781: close(srv->srv_server.cli_sock);
782: free(srv);
783: return NULL;
784: }
1.5 misho 785: n = srv->srv_netbuf;
786: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
787: LOGERR;
788: close(srv->srv_server.cli_sock);
789: free(srv);
790: return NULL;
791: }
792: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
793: LOGERR;
794: close(srv->srv_server.cli_sock);
795: free(srv);
796: return NULL;
797: }
1.6 misho 798: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
799: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 800: LOGERR;
801: close(srv->srv_server.cli_sock);
802: free(srv);
803: return NULL;
804: }
805:
1.4 misho 806: /* allocate pool for concurent clients */
1.1 misho 807: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
808: if (!srv->srv_clients) {
809: LOGERR;
810: close(srv->srv_server.cli_sock);
811: free(srv);
812: return NULL;
813: } else
814: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
815:
1.2 misho 816: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.5 misho 817: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
818: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
819: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
1.8 misho 820:
1.1 misho 821: return srv;
822: }
823:
824: /*
1.7 misho 825: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
826: *
1.6 misho 827: * @psrv = RPC Server instance
1.1 misho 828: * return: none
829: */
830: void
1.6 misho 831: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 832: {
833: rpc_cli_t *c;
834: register int i;
835: rpc_func_t *f;
836:
1.6 misho 837: if (!psrv || !*psrv) {
1.1 misho 838: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
839: return;
840: }
841:
1.6 misho 842: rpc_srv_endBLOBServer(*psrv);
1.1 misho 843:
1.4 misho 844: /* close all clients connections & server socket */
1.6 misho 845: for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
846: if (c->cli_sa.sa.sa_family) {
1.1 misho 847: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 848: close(c->cli_sock);
849: }
1.6 misho 850: close((*psrv)->srv_server.cli_sock);
1.1 misho 851:
1.6 misho 852: if ((*psrv)->srv_clients) {
853: free((*psrv)->srv_clients);
854: (*psrv)->srv_clients = NULL;
855: (*psrv)->srv_numcli = 0;
1.1 misho 856: }
857:
1.4 misho 858: /* detach exported calls */
1.6 misho 859: while ((f = (*psrv)->srv_funcs)) {
860: (*psrv)->srv_funcs = f->func_next;
861: io_freeVars(&f->func_vars);
1.7 misho 862: AIT_FREE_VAL(&f->func_name);
863: AIT_FREE_VAL(&f->func_file);
1.2 misho 864: free(f);
865: }
866:
1.6 misho 867: free(*psrv);
868: *psrv = NULL;
1.1 misho 869: }
870:
871: /*
1.7 misho 872: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
873: *
1.1 misho 874: * @srv = RPC Server instance
875: * return: -1 error or 0 ok, infinite loop ...
876: */
877: int
1.5 misho 878: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 879: {
1.6 misho 880: socklen_t salen = sizeof(io_sockaddr_t);
1.1 misho 881: register int i;
882: rpc_cli_t *c;
1.2 misho 883: int ret;
1.7 misho 884: pthread_attr_t attr;
1.8.2.2 misho 885: struct pollfd pfd;
1.7 misho 886:
1.1 misho 887: if (!srv) {
888: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
889: return -1;
890: }
891:
1.5 misho 892: /* activate BLOB server worker if srv->srv_blob.state == enable */
893: rpc_srv_execBLOBServer(srv);
894:
1.1 misho 895: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
896: LOGERR;
897: return -1;
1.8.2.5 misho 898: } /*else
1.8.2.2 misho 899: fcntl(srv->srv_server.cli_sock, F_SETFL, fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.8.2.5 misho 900: */
1.1 misho 901:
1.7 misho 902: pthread_attr_init(&attr);
903: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
904:
1.8.2.2 misho 905: pfd.fd = srv->srv_server.cli_sock;
906: pfd.events = POLLIN | POLLPRI;
1.7 misho 907: /* main rpc loop */
1.4 misho 908: while (srv->srv_kill != kill) {
1.1 misho 909: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 910: if (!c->cli_sa.sa.sa_family)
1.1 misho 911: break;
1.2 misho 912: if (i >= srv->srv_numcli) {
1.5 misho 913: #ifdef HAVE_PTHREAD_YIELD
914: pthread_yield();
915: #else
1.1 misho 916: usleep(1000000);
1.5 misho 917: #endif
1.1 misho 918: continue;
919: }
1.2 misho 920:
1.8.2.2 misho 921: if ((ret = poll(&pfd, 1, srv->srv_session.sess_timeout * 1000)) == -1 ||
922: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
1.2 misho 923: LOGERR;
924: ret = 1;
925: break;
1.8.2.2 misho 926: } else if (!ret)
1.2 misho 927: continue;
928:
1.6 misho 929: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
1.1 misho 930: if (c->cli_sock == -1) {
931: LOGERR;
932: continue;
933: } else
934: c->cli_parent = srv;
935:
1.7 misho 936: /* spawn rpc client dispatcher */
937: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchCall, c)) {
1.1 misho 938: LOGERR;
939: continue;
1.7 misho 940: }
1.1 misho 941: }
942:
1.7 misho 943: pthread_attr_destroy(&attr);
1.1 misho 944: return 0;
945: }
946:
947: // ---------------------------------------------------------
948:
949: /*
950: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 951: *
1.1 misho 952: * @call = Register RPC call
953: * @rpc = IN RPC call structure
1.5 misho 954: * @args = IN RPC calling arguments from RPC client
1.1 misho 955: * return: -1 error, !=-1 ok
956: */
957: int
1.2 misho 958: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.5 misho 959: array_t * __restrict args)
1.1 misho 960: {
961: void *dl;
962: rpc_callback_t func;
963: int ret;
964:
1.2 misho 965: if (!call || !rpc || !call->func_parent) {
1.7 misho 966: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 967: return -1;
968: }
969:
1.8.2.1 misho 970: dl = dlopen(AIT_ADDR(&call->func_file), RTLD_NOW);
1.1 misho 971: if (!dl) {
1.7 misho 972: rpc_SetErr(ENOENT, "Can`t attach module %s!", dlerror());
1.1 misho 973: return -1;
974: }
975:
1.7 misho 976: func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name));
1.1 misho 977: if (func)
1.6 misho 978: ret = func(call, ntohs(rpc->call_argc), args);
1.1 misho 979: else {
1.7 misho 980: rpc_SetErr(ENOEXEC, "Can`t find function %s!", dlerror());
1.1 misho 981: ret = -1;
982: }
983:
984: dlclose(dl);
985: return ret;
986: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>