Annotation of libaitrpc/src/srv.c, revision 1.9.2.3
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.9.2.3 ! misho 6: * $Id: srv.c,v 1.9.2.2 2012/05/14 15:22:22 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.7 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.7 misho 49: static void *rxPacket(sched_task_t*);
50: static void *rxBLOB(sched_task_t*);
51:
52: static void *
53: txPacket(sched_task_t *task)
54: {
55: rpc_cli_t *c = TASK_ARG(task);
56: rpc_srv_t *s = c->cli_parent;
57: rpc_func_t *f = NULL;
58: u_char *buf = TASK_DATA(task);
59: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
60: int ret, wlen = sizeof(struct tagRPCCall);
61: array_t *arr = NULL;
62:
63: if (rpc->call_argc) {
64: f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash));
65: if (!f) {
66: rpc->call_argc ^= rpc->call_argc;
67: rpc->call_rep.ret = RPC_ERROR(-1);
68: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
69: } else {
70: rpc->call_argc = htons(rpc_srv_getVars(f, &arr));
71: /* Go Encapsulate variables */
72: ret = io_vars2buffer(buf + wlen, TASK_DATLEN(task) - wlen, arr);
73: io_clrVars(f->func_vars);
74: if (ret == -1) {
75: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
76: rpc->call_argc ^= rpc->call_argc;
77: rpc->call_rep.ret = RPC_ERROR(-1);
78: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
79: } else
80: wlen += ret;
81: }
82: }
83:
1.8 misho 84: rpc->call_len = htons(wlen);
85:
1.7 misho 86: /* calculate CRC */
87: rpc->call_crc ^= rpc->call_crc;
1.8 misho 88: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.7 misho 89:
90: /* send reply */
91: ret = send(TASK_FD(task), buf, wlen, 0);
92: if (ret == -1)
93: LOGERR;
94: else if (ret != wlen)
95: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
96: "really sended %d bytes", wlen, ret);
97:
98: return NULL;
99: }
100:
101: static void *
102: execCall(sched_task_t *task)
103: {
104: rpc_cli_t *c = TASK_ARG(task);
105: rpc_srv_t *s = c->cli_parent;
106: rpc_func_t *f = NULL;
107: array_t *arr = NULL;
1.8 misho 108: u_char *buf = TASK_DATA(task) + TASK_VAL(task);
1.7 misho 109: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
110: int argc = ntohs(rpc->call_argc);
111:
112: /* Go decapsulate variables ... */
1.8 misho 113: if (argc) {
1.7 misho 114: arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),
1.8 misho 115: TASK_DATLEN(task) - TASK_VAL(task) - sizeof(struct tagRPCCall),
116: argc, 1);
1.7 misho 117: if (!arr) {
118: rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
119: rpc->call_argc ^= rpc->call_argc;
120: rpc->call_rep.ret = RPC_ERROR(-1);
121: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
122: return NULL;
123: }
124: }
125:
126: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
127: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
128: rpc->call_argc ^= rpc->call_argc;
129: rpc->call_rep.ret = RPC_ERROR(-1);
130: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
131: } else {
1.8 misho 132: /* if client doesn't want reply */
133: argc = rpc->call_req.flags & RPC_NOREPLY;
1.7 misho 134: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
135: if (rpc->call_rep.ret == htonl(-1)) {
136: rpc->call_rep.eno = RPC_ERROR(errno);
137: rpc->call_argc ^= rpc->call_argc;
138: } else {
139: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8 misho 140: if (argc) {
141: /* without reply */
142: io_clrVars(f->func_vars);
143: rpc->call_argc ^= rpc->call_argc;
144: } else
145: rpc->call_argc = htons(rpc_srv_getVars(f, NULL));
1.7 misho 146: }
147: }
148:
149: if (arr)
150: io_arrayDestroy(&arr);
151: return NULL;
152: }
153:
154: static void *
155: rxPacket(sched_task_t *task)
156: {
157: rpc_cli_t *c = TASK_ARG(task);
158: rpc_srv_t *s = c->cli_parent;
159: u_char *buf = TASK_DATA(task);
1.8 misho 160: int rlen, noreply;
161: u_short crc, off = 0;
1.7 misho 162: struct tagRPCCall *rpc;
163: struct timespec ts;
164:
165: memset(buf, 0, TASK_DATLEN(task));
166: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
167: if (rlen == -1) {
168: LOGERR;
1.9 misho 169: c->cli_kill = kill;
1.7 misho 170: return NULL;
171: } else if (!rlen) { /* receive EOF */
1.9 misho 172: c->cli_kill = kill;
1.7 misho 173: return NULL;
1.9 misho 174: }
1.7 misho 175:
1.8 misho 176: do {
177: if (rlen < sizeof(struct tagRPCCall)) {
178: rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");
179:
180: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
181: TASK_DATA(task), TASK_DATLEN(task));
182: return NULL;
183: } else
184: rpc = (struct tagRPCCall*) (buf + off);
185:
186: rlen -= ntohs(rpc->call_len);
187:
188: /* check integrity of packet */
189: crc = ntohs(rpc->call_crc);
190: rpc->call_crc ^= rpc->call_crc;
191: if (crc != crcFletcher16((u_short*) (buf + off), ntohs(rpc->call_len) / 2)) {
192: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
1.7 misho 193:
1.8 misho 194: off += ntohs(rpc->call_len);
195: if (rlen < 1)
196: break;
197: else
198: continue;
199: }
1.7 misho 200:
1.8 misho 201: noreply = rpc->call_req.flags & RPC_NOREPLY;
1.7 misho 202:
1.8 misho 203: /* check RPC packet session info */
204: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
205: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
206: rpc->call_argc ^= rpc->call_argc;
207: rpc->call_rep.ret = RPC_ERROR(-1);
208: rpc->call_rep.eno = RPC_ERROR(errno);
209: } else {
210: /* change socket timeout from last packet */
211: ts.tv_sec = rpc->call_session.sess_timeout;
212: ts.tv_nsec = 0;
213: schedPolling(TASK_ROOT(task), &ts, NULL);
214:
215: /* execute RPC call */
216: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off,
217: TASK_DATA(task), TASK_DATLEN(task));
218: }
1.7 misho 219:
1.8 misho 220: /* send RPC reply */
221: if (!noreply)
222: schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task),
223: buf + off, TASK_DATLEN(task) - off);
1.7 misho 224:
1.8 misho 225: off += ntohs(rpc->call_len);
226: } while (rlen > 0);
1.7 misho 227:
228: /* lets get next packet */
229: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
230: TASK_DATA(task), TASK_DATLEN(task));
231: return NULL;
232: }
233:
1.1 misho 234: static void *
235: rpc_srv_dispatchCall(void *arg)
236: {
237: rpc_cli_t *c = arg;
238: rpc_srv_t *s;
1.5 misho 239: u_char *buf;
1.7 misho 240: sched_root_task_t *root;
241: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
242:
1.1 misho 243: if (!arg) {
1.7 misho 244: rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client");
1.1 misho 245: return NULL;
246: } else
247: s = c->cli_parent;
248:
1.7 misho 249: /* allocate net buffer */
1.5 misho 250: buf = malloc(s->srv_netbuf);
251: if (!buf) {
252: LOGERR;
253: return NULL;
254: }
255:
1.7 misho 256: root = schedBegin();
257: if (!root) {
258: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
259: free(buf);
260: return NULL;
261: } else {
262: schedTermCondition(root, kill);
263: schedPolling(root, &ts, NULL);
264: }
265:
266: schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf);
267:
1.9 misho 268: schedRun(root, (void*) &c->cli_kill);
1.7 misho 269: schedEnd(&root);
270:
271: shutdown(c->cli_sock, SHUT_RDWR);
272: close(c->cli_sock);
273: memset(c, 0, sizeof(rpc_cli_t));
274: free(buf);
275: return NULL;
276: }
277:
278:
279: static void *
280: txBLOB(sched_task_t *task)
281: {
282: u_char *buf = TASK_DATA(task);
283: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
284: int wlen = sizeof(struct tagBLOBHdr);
285:
286: /* calculate CRC */
287: blob->hdr_crc ^= blob->hdr_crc;
1.8 misho 288: blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.7 misho 289:
290: /* send reply */
291: wlen = send(TASK_FD(task), buf, wlen, 0);
292: if (wlen == -1)
293: LOGERR;
294: else if (wlen != sizeof(struct tagBLOBHdr))
295: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
296: "really sended %d bytes", sizeof(struct tagBLOBHdr), wlen);
297:
298: return NULL;
299: }
1.4 misho 300:
1.7 misho 301: static void *
302: rxBLOB(sched_task_t *task)
303: {
304: rpc_cli_t *c = TASK_ARG(task);
305: rpc_srv_t *s = c->cli_parent;
306: rpc_blob_t *b;
307: u_char *buf = TASK_DATA(task);
308: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
309: int rlen;
310: u_short crc;
311: struct timespec ts;
312:
313: /* check for disable service at this moment? */
1.8 misho 314: if (!s || s->srv_blob.state == disable) {
1.7 misho 315: usleep(100000);
316: #ifdef HAVE_PTHREAD_YIELD
317: pthread_yield();
318: #endif
319: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
320: TASK_DATA(task), TASK_DATLEN(task));
321: return NULL;
322: }
1.5 misho 323:
1.7 misho 324: memset(buf, 0, TASK_DATLEN(task));
325: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
326: if (rlen == -1) {
327: LOGERR;
1.9 misho 328: c->cli_kill = kill;
1.7 misho 329: return NULL;
1.9 misho 330: } else if (!rlen) { /* receive EOF */
331: c->cli_kill = kill;
1.7 misho 332: return NULL;
1.9 misho 333: }
1.6 misho 334:
1.7 misho 335: if (rlen < sizeof(struct tagBLOBHdr)) {
336: rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet");
337: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
338: TASK_DATA(task), TASK_DATLEN(task));
339: return NULL;
340: }
1.1 misho 341:
1.7 misho 342: /* check integrity of packet */
343: crc = ntohs(blob->hdr_crc);
344: blob->hdr_crc ^= blob->hdr_crc;
1.8 misho 345: if (crc != crcFletcher16((u_short*) buf, rlen / 2)) {
1.7 misho 346: rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
347: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
348: TASK_DATA(task), TASK_DATLEN(task));
349: return NULL;
350: }
1.5 misho 351:
1.7 misho 352: /* check RPC packet session info */
353: if ((crc = rpc_chkPktSession(&blob->hdr_session, &s->srv_session))) {
354: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
355: blob->hdr_cmd = error;
356: goto end;
357: } else {
358: /* change socket timeout from last packet */
359: ts.tv_sec = blob->hdr_session.sess_timeout;
360: ts.tv_nsec = 0;
361: schedPolling(TASK_ROOT(task), &ts, NULL);
362: }
363:
364: /* Go to proceed packet ... */
365: switch (blob->hdr_cmd) {
366: case get:
367: if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) {
368: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var));
369: blob->hdr_cmd = no;
370: blob->hdr_ret = RPC_ERROR(-1);
371: break;
372: } else
373: blob->hdr_len = htonl(b->blob_len);
1.5 misho 374:
1.7 misho 375: if (rpc_srv_blobMap(s, b) != -1) {
376: /* deliver BLOB variable to client */
377: blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
378: rpc_srv_blobUnmap(b);
379: } else {
380: blob->hdr_cmd = error;
381: blob->hdr_ret = RPC_ERROR(-1);
382: }
383: break;
384: case set:
385: if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) {
386: /* set new BLOB variable for reply :) */
387: blob->hdr_var = htonl(b->blob_var);
388:
389: /* receive BLOB from client */
390: blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
391: rpc_srv_blobUnmap(b);
1.5 misho 392: } else {
1.7 misho 393: blob->hdr_cmd = error;
394: blob->hdr_ret = RPC_ERROR(-1);
395: }
396: break;
397: case unset:
398: if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) {
399: blob->hdr_cmd = error;
400: blob->hdr_ret = RPC_ERROR(-1);
1.1 misho 401: }
402: break;
1.7 misho 403: default:
404: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd);
405: blob->hdr_cmd = error;
406: blob->hdr_ret = RPC_ERROR(-1);
407: }
1.1 misho 408:
1.7 misho 409: end:
410: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task),
411: TASK_DATA(task), TASK_DATLEN(task));
412: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
413: TASK_DATA(task), TASK_DATLEN(task));
414: return NULL;
1.2 misho 415: }
416:
417: static void *
418: rpc_srv_dispatchVars(void *arg)
419: {
420: rpc_cli_t *c = arg;
421: rpc_srv_t *s;
1.7 misho 422: sched_root_task_t *root;
423: u_char *buf;
424: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
425:
1.2 misho 426: if (!arg) {
1.7 misho 427: rpc_SetErr(EINVAL, "Invalid parameter can`t procced BLOB");
1.2 misho 428: return NULL;
429: } else
430: s = c->cli_parent;
431:
1.7 misho 432: /* allocate net buffer */
433: buf = malloc(sizeof(struct tagBLOBHdr));
434: if (!buf) {
435: LOGERR;
436: return NULL;
437: }
1.2 misho 438:
1.7 misho 439: root = schedBegin();
440: if (!root) {
441: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
442: free(buf);
443: return NULL;
444: } else {
445: schedTermCondition(root, kill);
446: schedPolling(root, &ts, NULL);
447: }
1.4 misho 448:
1.7 misho 449: schedRead(root, rxBLOB, c, c->cli_sock, buf, sizeof(struct tagBLOBHdr));
1.2 misho 450:
1.9 misho 451: schedRun(root, (void*) &c->cli_kill);
1.7 misho 452: schedEnd(&root);
1.2 misho 453:
454: shutdown(c->cli_sock, SHUT_RDWR);
455: close(c->cli_sock);
456: memset(c, 0, sizeof(rpc_cli_t));
1.7 misho 457: free(buf);
458: return NULL;
1.1 misho 459: }
460:
461: // -------------------------------------------------
462:
463: /*
1.7 misho 464: * rpc_srv_initBLOBServer() - Init & create BLOB Server
465: *
1.4 misho 466: * @srv = RPC server instance
1.2 misho 467: * @Port = Port for bind server, if Port == 0 default port is selected
468: * @diskDir = Disk place for BLOB file objects
469: * return: -1 == error or 0 bind and created BLOB server instance
470: */
471: int
472: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
473: {
474: int n = 1;
1.6 misho 475: io_sockaddr_t sa;
1.2 misho 476:
477: if (!srv) {
1.7 misho 478: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 479: return -1;
480: }
481: if (srv->srv_blob.state) {
1.7 misho 482: rpc_SetErr(EPERM, "Already started BLOB server!");
1.2 misho 483: return 0;
484: }
485:
486: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
487: if (access(diskDir, R_OK | W_OK) == -1) {
488: LOGERR;
489: return -1;
490: } else
1.7 misho 491: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 492:
493: srv->srv_blob.server.cli_tid = pthread_self();
494: srv->srv_blob.server.cli_parent = srv;
1.4 misho 495:
496: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
1.6 misho 497: switch (sa.sa.sa_family) {
1.4 misho 498: case AF_INET:
1.6 misho 499: sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);
1.4 misho 500: break;
501: case AF_INET6:
1.6 misho 502: sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);
1.4 misho 503: break;
504: case AF_LOCAL:
1.6 misho 505: strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
1.4 misho 506: break;
507: default:
1.7 misho 508: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 509: return -1;
1.2 misho 510: }
1.6 misho 511: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
1.2 misho 512:
1.4 misho 513: /* create BLOB server socket */
1.6 misho 514: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 515: if (srv->srv_blob.server.cli_sock == -1) {
516: LOGERR;
1.7 misho 517: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 518: return -1;
519: }
520: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
521: LOGERR;
522: close(srv->srv_blob.server.cli_sock);
1.7 misho 523: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 524: return -1;
525: }
1.5 misho 526: n = srv->srv_netbuf;
527: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
528: LOGERR;
529: close(srv->srv_blob.server.cli_sock);
1.7 misho 530: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 531: return -1;
532: }
533: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
534: LOGERR;
535: close(srv->srv_blob.server.cli_sock);
1.7 misho 536: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 537: return -1;
538: }
1.6 misho 539: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
540: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 541: LOGERR;
542: close(srv->srv_blob.server.cli_sock);
1.7 misho 543: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 544: return -1;
545: }
546:
1.4 misho 547: /* allocate pool for concurent clients */
1.2 misho 548: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
549: if (!srv->srv_blob.clients) {
550: LOGERR;
551: close(srv->srv_blob.server.cli_sock);
1.7 misho 552: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 553: return -1;
554: } else
555: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
556:
557: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
1.5 misho 558: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
559: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
560: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
1.2 misho 561:
1.4 misho 562: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 563: return 0;
564: }
565:
566: /*
1.7 misho 567: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
568: *
1.2 misho 569: * @srv = RPC Server instance
570: * return: none
571: */
572: void
573: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
574: {
575: rpc_cli_t *c;
576: register int i;
577: rpc_blob_t *f;
578:
1.8 misho 579: if (!srv || srv->srv_blob.state == disable)
1.2 misho 580: return;
1.8 misho 581: else
1.4 misho 582: srv->srv_blob.state = kill;
1.2 misho 583:
584: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
585: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
586: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
587: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
588:
1.4 misho 589: /* close all clients connections & server socket */
1.2 misho 590: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
1.6 misho 591: if (c->cli_sa.sa.sa_family)
1.2 misho 592: shutdown(c->cli_sock, SHUT_RDWR);
593: close(srv->srv_blob.server.cli_sock);
594:
595: if (srv->srv_blob.clients) {
596: free(srv->srv_blob.clients);
597: srv->srv_blob.clients = NULL;
598: }
599:
1.4 misho 600: /* detach blobs */
1.2 misho 601: while ((f = srv->srv_blob.blobs)) {
602: srv->srv_blob.blobs = f->blob_next;
603: rpc_srv_blobFree(srv, f);
604: free(f);
605: }
606:
1.7 misho 607: AIT_FREE_VAL(&srv->srv_blob.dir);
608:
1.8 misho 609: /* at final, save disable BLOB service state */
610: srv->srv_blob.state = disable;
1.2 misho 611: }
612:
613: /*
1.7 misho 614: * rpc_srv_loopBLOB() - Execute Main BLOB server loop and wait for clients requests
615: *
1.2 misho 616: * @srv = RPC Server instance
617: * return: -1 error or 0 ok, infinite loop ...
618: */
619: int
1.5 misho 620: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
1.2 misho 621: {
1.6 misho 622: socklen_t salen = sizeof(io_sockaddr_t);
1.2 misho 623: register int i;
624: rpc_cli_t *c;
625: int ret;
1.7 misho 626: pthread_attr_t attr;
1.9 misho 627: struct pollfd pfd;
1.2 misho 628:
1.4 misho 629: if (!srv || srv->srv_blob.state == kill) {
1.7 misho 630: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 631: return -1;
632: }
633:
634: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
635: LOGERR;
636: return -1;
1.9 misho 637: } else
638: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
639: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
1.2 misho 640:
1.7 misho 641: pthread_attr_init(&attr);
642: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
643:
1.9 misho 644: pfd.fd = srv->srv_blob.server.cli_sock;
645: pfd.events = POLLIN | POLLPRI;
1.7 misho 646: /* main BLOB loop */
1.4 misho 647: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 648: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 649: if (!c->cli_sa.sa.sa_family)
1.2 misho 650: break;
651: if (i >= srv->srv_numcli) {
1.5 misho 652: #ifdef HAVE_PTHREAD_YIELD
653: pthread_yield();
654: #endif
1.9.2.1 misho 655: usleep(1000000);
1.2 misho 656: continue;
657: }
658:
1.9 misho 659: if ((ret = poll(&pfd, 1, srv->srv_session.sess_timeout * 1000)) == -1 ||
660: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
1.2 misho 661: LOGERR;
662: ret = 1;
663: break;
1.9 misho 664: } else if (!ret)
1.2 misho 665: continue;
666:
1.6 misho 667: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
1.2 misho 668: if (c->cli_sock == -1) {
669: LOGERR;
670: continue;
671: } else
672: c->cli_parent = srv;
673:
1.7 misho 674: /* spawn dispatch thread for BLOB client */
1.9 misho 675: c->cli_kill = enable;
1.7 misho 676: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchVars, c)) {
1.2 misho 677: LOGERR;
678: continue;
1.7 misho 679: }
1.2 misho 680: }
681:
1.5 misho 682: srv->srv_blob.state = kill;
1.2 misho 683:
1.7 misho 684: pthread_attr_destroy(&attr);
1.2 misho 685: return 0;
686: }
687:
688:
689: /*
1.7 misho 690: * rpc_srv_initServer() - Init & create RPC Server
691: *
1.1 misho 692: * @regProgID = ProgramID for authentication & recognition
693: * @regProcID = ProcessID for authentication & recognition
694: * @concurentClients = Concurent clients at same time to this server
1.5 misho 695: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 696: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
697: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 698: * @Port = Port for bind server, if Port == 0 default port is selected
699: * return: NULL == error or !=NULL bind and created RPC server instance
700: */
701: rpc_srv_t *
702: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.5 misho 703: int netBuf, u_short family, const char *csHost, u_short Port)
1.1 misho 704: {
705: rpc_srv_t *srv = NULL;
706: int n = 1;
1.6 misho 707: io_sockaddr_t sa;
1.1 misho 708:
1.9.2.2 misho 709: if (!concurentClients || !regProgID) {
710: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1.1 misho 711: return NULL;
712: }
1.9.2.2 misho 713: if (!io_gethostbyname(csHost, Port, &sa))
714: return NULL;
1.1 misho 715: if (!Port)
716: Port = RPC_DEFPORT;
1.5 misho 717: if (!netBuf)
718: netBuf = BUFSIZ;
1.7 misho 719: else
720: netBuf = io_align(netBuf, 1); /* align netBuf length */
1.1 misho 721:
722: srv = malloc(sizeof(rpc_srv_t));
723: if (!srv) {
724: LOGERR;
725: return NULL;
726: } else
727: memset(srv, 0, sizeof(rpc_srv_t));
728:
1.5 misho 729: srv->srv_netbuf = netBuf;
1.1 misho 730: srv->srv_numcli = concurentClients;
731: srv->srv_session.sess_version = RPC_VERSION;
1.7 misho 732: srv->srv_session.sess_timeout = DEF_RPC_TIMEOUT;
1.1 misho 733: srv->srv_session.sess_program = regProgID;
734: srv->srv_session.sess_process = regProcID;
735:
1.2 misho 736: srv->srv_server.cli_tid = pthread_self();
1.1 misho 737: srv->srv_server.cli_parent = srv;
1.6 misho 738: memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
1.4 misho 739:
740: /* create server socket */
1.9.2.2 misho 741: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.1 misho 742: if (srv->srv_server.cli_sock == -1) {
743: LOGERR;
744: free(srv);
745: return NULL;
746: }
747: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
748: LOGERR;
749: close(srv->srv_server.cli_sock);
750: free(srv);
751: return NULL;
752: }
1.5 misho 753: n = srv->srv_netbuf;
754: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
755: LOGERR;
756: close(srv->srv_server.cli_sock);
757: free(srv);
758: return NULL;
759: }
760: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
761: LOGERR;
762: close(srv->srv_server.cli_sock);
763: free(srv);
764: return NULL;
765: }
1.6 misho 766: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
767: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 768: LOGERR;
769: close(srv->srv_server.cli_sock);
770: free(srv);
771: return NULL;
772: }
773:
1.4 misho 774: /* allocate pool for concurent clients */
1.1 misho 775: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
776: if (!srv->srv_clients) {
777: LOGERR;
778: close(srv->srv_server.cli_sock);
779: free(srv);
780: return NULL;
781: } else
782: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
783:
1.2 misho 784: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.5 misho 785: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
786: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
787: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
1.8 misho 788:
1.1 misho 789: return srv;
790: }
791:
792: /*
1.7 misho 793: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
794: *
1.6 misho 795: * @psrv = RPC Server instance
1.1 misho 796: * return: none
797: */
798: void
1.6 misho 799: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 800: {
801: rpc_cli_t *c;
802: register int i;
803: rpc_func_t *f;
804:
1.6 misho 805: if (!psrv || !*psrv) {
1.9.2.3 ! misho 806: rpc_SetErr(EINVAL, "Can`t destroy server because parameter is null!");
1.1 misho 807: return;
808: }
809:
1.6 misho 810: rpc_srv_endBLOBServer(*psrv);
1.1 misho 811:
1.4 misho 812: /* close all clients connections & server socket */
1.6 misho 813: for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
814: if (c->cli_sa.sa.sa_family) {
1.1 misho 815: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 816: close(c->cli_sock);
817: }
1.6 misho 818: close((*psrv)->srv_server.cli_sock);
1.1 misho 819:
1.6 misho 820: if ((*psrv)->srv_clients) {
821: free((*psrv)->srv_clients);
822: (*psrv)->srv_clients = NULL;
823: (*psrv)->srv_numcli = 0;
1.1 misho 824: }
825:
1.4 misho 826: /* detach exported calls */
1.6 misho 827: while ((f = (*psrv)->srv_funcs)) {
828: (*psrv)->srv_funcs = f->func_next;
829: io_freeVars(&f->func_vars);
1.7 misho 830: AIT_FREE_VAL(&f->func_name);
831: AIT_FREE_VAL(&f->func_file);
1.2 misho 832: free(f);
833: }
834:
1.6 misho 835: free(*psrv);
836: *psrv = NULL;
1.1 misho 837: }
838:
839: /*
1.7 misho 840: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
841: *
1.1 misho 842: * @srv = RPC Server instance
843: * return: -1 error or 0 ok, infinite loop ...
844: */
845: int
1.5 misho 846: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 847: {
1.6 misho 848: socklen_t salen = sizeof(io_sockaddr_t);
1.1 misho 849: register int i;
850: rpc_cli_t *c;
1.2 misho 851: int ret;
1.7 misho 852: pthread_attr_t attr;
1.9 misho 853: struct pollfd pfd;
1.1 misho 854:
855: if (!srv) {
1.9.2.3 ! misho 856: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1.1 misho 857: return -1;
858: }
859:
1.5 misho 860: /* activate BLOB server worker if srv->srv_blob.state == enable */
861: rpc_srv_execBLOBServer(srv);
862:
1.9.2.1 misho 863: if (listen(srv->srv_server.cli_sock, srv->srv_numcli) == -1) {
1.1 misho 864: LOGERR;
865: return -1;
1.9 misho 866: } else
867: fcntl(srv->srv_server.cli_sock, F_SETFL, fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 868:
1.7 misho 869: pthread_attr_init(&attr);
870: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
871:
1.9 misho 872: pfd.fd = srv->srv_server.cli_sock;
873: pfd.events = POLLIN | POLLPRI;
1.7 misho 874: /* main rpc loop */
1.4 misho 875: while (srv->srv_kill != kill) {
1.1 misho 876: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 877: if (!c->cli_sa.sa.sa_family)
1.1 misho 878: break;
1.2 misho 879: if (i >= srv->srv_numcli) {
1.5 misho 880: #ifdef HAVE_PTHREAD_YIELD
881: pthread_yield();
882: #endif
1.9.2.1 misho 883: usleep(1000000);
1.1 misho 884: continue;
885: }
1.2 misho 886:
1.9 misho 887: if ((ret = poll(&pfd, 1, srv->srv_session.sess_timeout * 1000)) == -1 ||
888: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
1.2 misho 889: LOGERR;
890: ret = 1;
891: break;
1.9 misho 892: } else if (!ret)
1.2 misho 893: continue;
894:
1.6 misho 895: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
1.1 misho 896: if (c->cli_sock == -1) {
897: LOGERR;
898: continue;
899: } else
900: c->cli_parent = srv;
901:
1.7 misho 902: /* spawn rpc client dispatcher */
1.9 misho 903: c->cli_kill = enable;
1.7 misho 904: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchCall, c)) {
1.1 misho 905: LOGERR;
906: continue;
1.7 misho 907: }
1.1 misho 908: }
909:
1.7 misho 910: pthread_attr_destroy(&attr);
1.1 misho 911: return 0;
912: }
913:
914: // ---------------------------------------------------------
915:
916: /*
917: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 918: *
1.1 misho 919: * @call = Register RPC call
920: * @rpc = IN RPC call structure
1.5 misho 921: * @args = IN RPC calling arguments from RPC client
1.1 misho 922: * return: -1 error, !=-1 ok
923: */
924: int
1.2 misho 925: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.5 misho 926: array_t * __restrict args)
1.1 misho 927: {
928: void *dl;
929: rpc_callback_t func;
930: int ret;
931:
1.2 misho 932: if (!call || !rpc || !call->func_parent) {
1.7 misho 933: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 934: return -1;
935: }
936:
1.9 misho 937: dl = dlopen(AIT_ADDR(&call->func_file), RTLD_NOW);
1.1 misho 938: if (!dl) {
1.7 misho 939: rpc_SetErr(ENOENT, "Can`t attach module %s!", dlerror());
1.1 misho 940: return -1;
941: }
942:
1.7 misho 943: func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name));
1.1 misho 944: if (func)
1.6 misho 945: ret = func(call, ntohs(rpc->call_argc), args);
1.1 misho 946: else {
1.7 misho 947: rpc_SetErr(ENOEXEC, "Can`t find function %s!", dlerror());
1.1 misho 948: ret = -1;
949: }
950:
951: dlclose(dl);
952: return ret;
953: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>