Annotation of libaitrpc/src/srv.c, revision 1.6.2.11
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.6.2.11! misho 6: * $Id: srv.c,v 1.6.2.10 2012/03/15 01:22:55 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.6.2.1 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.6.2.4 misho 49: static void *rxPacket(sched_task_t*);
1.6.2.8 misho 50: static void *rxBLOB(sched_task_t*);
1.6.2.4 misho 51:
52: static void *
53: txPacket(sched_task_t *task)
54: {
55: rpc_cli_t *c = TASK_ARG(task);
56: rpc_srv_t *s = c->cli_parent;
57: rpc_func_t *f = NULL;
58: u_char *buf = TASK_DATA(task);
59: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
60: int ret, wlen = sizeof(struct tagRPCCall);
61: array_t *arr = NULL;
62:
1.6.2.11! misho 63: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.7 misho 64:
1.6.2.4 misho 65: if (rpc->call_argc) {
66: f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash));
67: if (!f) {
68: rpc->call_argc ^= rpc->call_argc;
69: rpc->call_rep.ret = RPC_ERROR(-1);
70: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
71: } else {
72: rpc->call_argc = htons(rpc_srv_getVars(f, &arr));
73: /* Go Encapsulate variables */
74: ret = io_vars2buffer(buf + wlen, TASK_DATLEN(task) - wlen, arr);
75: io_clrVars(f->func_vars);
76: if (ret == -1) {
77: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
78: rpc->call_argc ^= rpc->call_argc;
79: rpc->call_rep.ret = RPC_ERROR(-1);
80: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
81: } else
82: wlen += ret;
83: }
84: }
85:
1.6.2.7 misho 86: /* calculate CRC */
87: rpc->call_crc ^= rpc->call_crc;
1.6.2.9 misho 88: rpc->call_crc = htons(crcFletcher16((u_short*) buf, io_align(wlen, 1) / 2));
1.6.2.7 misho 89:
1.6.2.4 misho 90: /* send reply */
91: ret = send(TASK_FD(task), buf, wlen, 0);
92: if (ret == -1)
93: LOGERR;
94: else if (ret != wlen)
95: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
96: "really sended %d bytes", wlen, ret);
1.6.2.7 misho 97: else
1.6.2.11! misho 98: ioDEBUG(RPC_DEBUG_LEVEL, "Sended %d bytes", ret);
1.6.2.4 misho 99:
100: return NULL;
101: }
102:
103: static void *
104: execCall(sched_task_t *task)
105: {
106: rpc_cli_t *c = TASK_ARG(task);
107: rpc_srv_t *s = c->cli_parent;
108: rpc_func_t *f = NULL;
109: array_t *arr = NULL;
110: u_char *buf = TASK_DATA(task);
111: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
112: int argc = ntohs(rpc->call_argc);
113:
1.6.2.11! misho 114: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.7 misho 115:
1.6.2.4 misho 116: /* Go decapsulate variables ... */
117: if (!(rpc->call_req.flags & RPC_NOREPLY) && argc) {
118: arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),
119: TASK_DATLEN(task) - sizeof(struct tagRPCCall), argc, 1);
120: if (!arr) {
121: rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
122: rpc->call_argc ^= rpc->call_argc;
123: rpc->call_rep.ret = RPC_ERROR(-1);
124: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
125: return NULL;
126: }
127: }
128:
129: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
130: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
131: rpc->call_argc ^= rpc->call_argc;
132: rpc->call_rep.ret = RPC_ERROR(-1);
133: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
134: } else {
1.6.2.11! misho 135: ioDEBUG(RPC_DEBUG_LEVEL, "RPC function %s from module %s",
! 136: AIT_GET_STR(&f->func_name), AIT_GET_LIKE(&f->func_file, char*));
1.6.2.7 misho 137:
1.6.2.4 misho 138: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
139: if (rpc->call_rep.ret == htonl(-1)) {
140: rpc->call_rep.eno = RPC_ERROR(errno);
141: rpc->call_argc ^= rpc->call_argc;
142: } else {
143: rpc->call_rep.eno ^= rpc->call_rep.eno;
144: rpc->call_argc = htons(rpc_srv_getVars(f, NULL));
145: }
146: }
147:
148: if (arr)
149: io_arrayDestroy(&arr);
150: return NULL;
151: }
152:
153: static void *
154: rxPacket(sched_task_t *task)
155: {
156: rpc_cli_t *c = TASK_ARG(task);
157: rpc_srv_t *s = c->cli_parent;
158: u_char *buf = TASK_DATA(task);
159: int rlen;
160: u_short crc;
161: struct tagRPCCall *rpc;
162: struct timespec ts;
163:
1.6.2.11! misho 164: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.7 misho 165:
1.6.2.4 misho 166: memset(buf, 0, TASK_DATLEN(task));
167: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
168: if (rlen == -1) {
169: LOGERR;
1.6.2.8 misho 170: s->srv_kill = s->srv_blob.state = kill;
1.6.2.4 misho 171: return NULL;
172: } else if (!rlen) { /* receive EOF */
1.6.2.8 misho 173: s->srv_kill = s->srv_blob.state = kill;
1.6.2.4 misho 174: return NULL;
1.6.2.7 misho 175: } else
1.6.2.11! misho 176: ioDEBUG(RPC_DEBUG_LEVEL, "Readed %d bytes", rlen);
1.6.2.7 misho 177:
1.6.2.4 misho 178: if (rlen < sizeof(struct tagRPCCall)) {
179: rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");
180:
181: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
182: TASK_DATA(task), TASK_DATLEN(task));
183: return NULL;
184: } else
185: rpc = (struct tagRPCCall*) buf;
186:
187: /* check integrity of packet */
188: crc = ntohs(rpc->call_crc);
189: rpc->call_crc ^= rpc->call_crc;
1.6.2.9 misho 190: if (crc != crcFletcher16((u_short*) buf, io_align(rlen, 1) / 2)) {
1.6.2.4 misho 191: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
192:
193: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
194: TASK_DATA(task), TASK_DATLEN(task));
195: return NULL;
196: }
197:
198: /* check RPC packet session info */
199: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
200: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
201: rpc->call_argc ^= rpc->call_argc;
202: rpc->call_rep.ret = RPC_ERROR(-1);
203: rpc->call_rep.eno = RPC_ERROR(errno);
204: goto end;
205: } else {
206: /* change socket timeout from last packet */
207: ts.tv_sec = rpc->call_session.sess_timeout;
208: ts.tv_nsec = 0;
209: schedPolling(TASK_ROOT(task), &ts, NULL);
210: }
211:
212: /* execute RPC call */
213: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), 0,
214: TASK_DATA(task), TASK_DATLEN(task));
215:
216: end:
217: /* send RPC reply */
218: if (!(rpc->call_req.flags & RPC_NOREPLY))
219: schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task),
220: TASK_DATA(task), TASK_DATLEN(task));
1.6.2.8 misho 221: /* lets get next packet */
222: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
223: TASK_DATA(task), TASK_DATLEN(task));
1.6.2.4 misho 224: return NULL;
225: }
226:
1.1 misho 227: static void *
228: rpc_srv_dispatchCall(void *arg)
229: {
230: rpc_cli_t *c = arg;
231: rpc_srv_t *s;
1.5 misho 232: u_char *buf;
1.6.2.4 misho 233: sched_root_task_t *root;
234: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.1 misho 235:
1.6.2.11! misho 236: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.7 misho 237:
1.1 misho 238: if (!arg) {
1.6.2.4 misho 239: rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client");
1.1 misho 240: return NULL;
241: } else
242: s = c->cli_parent;
243:
1.6.2.4 misho 244: /* allocate net buffer */
1.5 misho 245: buf = malloc(s->srv_netbuf);
246: if (!buf) {
247: LOGERR;
248: return NULL;
249: }
250:
1.6.2.4 misho 251: root = schedBegin();
252: if (!root) {
253: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
254: free(buf);
255: return NULL;
256: } else {
257: schedTermCondition(root, kill);
258: schedPolling(root, &ts, NULL);
259: }
1.5 misho 260:
1.6.2.4 misho 261: schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf);
1.5 misho 262:
1.6.2.5 misho 263: schedRun(root, (void*) &s->srv_kill);
1.6.2.4 misho 264: schedEnd(&root);
1.1 misho 265:
266: shutdown(c->cli_sock, SHUT_RDWR);
267: close(c->cli_sock);
268: memset(c, 0, sizeof(rpc_cli_t));
1.5 misho 269: free(buf);
1.6.2.4 misho 270: return NULL;
1.2 misho 271: }
272:
273:
274: static void *
1.6.2.8 misho 275: txBLOB(sched_task_t *task)
276: {
277: u_char *buf = TASK_DATA(task);
278: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
279: int wlen = sizeof(struct tagBLOBHdr);
280:
1.6.2.11! misho 281: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.8 misho 282:
283: /* calculate CRC */
284: blob->hdr_crc ^= blob->hdr_crc;
1.6.2.9 misho 285: blob->hdr_crc = htons(crcFletcher16((u_short*) buf, io_align(wlen, 1) / 2));
1.6.2.8 misho 286:
287: /* send reply */
288: wlen = send(TASK_FD(task), buf, wlen, 0);
289: if (wlen == -1)
290: LOGERR;
291: else if (wlen != sizeof(struct tagBLOBHdr))
292: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
293: "really sended %d bytes", sizeof(struct tagBLOBHdr), wlen);
294: else
1.6.2.11! misho 295: ioDEBUG(RPC_DEBUG_LEVEL, "Sended %d bytes", wlen);
1.6.2.8 misho 296:
297: return NULL;
298: }
299:
300: static void *
301: rxBLOB(sched_task_t *task)
302: {
303: rpc_cli_t *c = TASK_ARG(task);
304: rpc_srv_t *s = c->cli_parent;
305: rpc_blob_t *b;
306: u_char *buf = TASK_DATA(task);
307: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
308: int rlen;
309: u_short crc;
310: struct timespec ts;
311:
1.6.2.11! misho 312: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.8 misho 313:
314: /* check for disable service at this moment? */
315: if (s->srv_blob.state == disable) {
316: usleep(100000);
317: #ifdef HAVE_PTHREAD_YIELD
318: pthread_yield();
319: #endif
320: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
321: TASK_DATA(task), TASK_DATLEN(task));
322: return NULL;
323: }
324:
325: memset(buf, 0, TASK_DATLEN(task));
326: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
327: if (rlen == -1) {
328: LOGERR;
329: s->srv_blob.state = kill;
330: return NULL;
331: } else if (!rlen || s->srv_kill == kill) { /* receive EOF */
332: s->srv_blob.state = kill;
333: return NULL;
334: } else
1.6.2.11! misho 335: ioDEBUG(RPC_DEBUG_LEVEL, "Readed %d bytes", rlen);
1.6.2.8 misho 336:
337: if (rlen < sizeof(struct tagBLOBHdr)) {
338: rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet");
339: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
340: TASK_DATA(task), TASK_DATLEN(task));
341: return NULL;
342: }
343:
344: /* check integrity of packet */
345: crc = ntohs(blob->hdr_crc);
346: blob->hdr_crc ^= blob->hdr_crc;
1.6.2.9 misho 347: if (crc != crcFletcher16((u_short*) buf, io_align(rlen, 1) / 2)) {
1.6.2.8 misho 348: rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
349: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
350: TASK_DATA(task), TASK_DATLEN(task));
351: return NULL;
352: }
353:
354: /* check RPC packet session info */
1.6.2.9 misho 355: if ((crc = rpc_chkPktSession(&blob->hdr_session, &s->srv_session))) {
1.6.2.8 misho 356: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
357: blob->hdr_cmd = error;
358: goto end;
359: } else {
360: /* change socket timeout from last packet */
361: ts.tv_sec = blob->hdr_session.sess_timeout;
362: ts.tv_nsec = 0;
363: schedPolling(TASK_ROOT(task), &ts, NULL);
364: }
365:
366: /* Go to proceed packet ... */
367: switch (blob->hdr_cmd) {
368: case get:
369: if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) {
370: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var));
371: blob->hdr_cmd = no;
372: blob->hdr_ret = RPC_ERROR(-1);
373: break;
374: } else
375: blob->hdr_len = htonl(b->blob_len);
376:
377: if (rpc_srv_blobMap(s, b) != -1) {
378: /* deliver BLOB variable to client */
379: blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
380: rpc_srv_blobUnmap(b);
381: } else {
382: blob->hdr_cmd = error;
383: blob->hdr_ret = RPC_ERROR(-1);
384: }
385: break;
386: case set:
387: if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) {
388: /* set new BLOB variable for reply :) */
389: blob->hdr_var = htonl(b->blob_var);
390:
391: /* receive BLOB from client */
392: blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
393: rpc_srv_blobUnmap(b);
394: } else {
395: blob->hdr_cmd = error;
396: blob->hdr_ret = RPC_ERROR(-1);
397: }
398: break;
399: case unset:
400: if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) {
401: blob->hdr_cmd = error;
402: blob->hdr_ret = RPC_ERROR(-1);
403: }
404: break;
405: default:
406: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd);
407: blob->hdr_cmd = error;
408: blob->hdr_ret = RPC_ERROR(-1);
409: }
410:
411: end:
412: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task),
413: TASK_DATA(task), TASK_DATLEN(task));
414: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
415: TASK_DATA(task), TASK_DATLEN(task));
416: return NULL;
417: }
418:
419: static void *
1.2 misho 420: rpc_srv_dispatchVars(void *arg)
421: {
422: rpc_cli_t *c = arg;
423: rpc_srv_t *s;
1.6.2.8 misho 424: sched_root_task_t *root;
425: u_char *buf;
426: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.2 misho 427:
1.6.2.11! misho 428: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.7 misho 429:
1.2 misho 430: if (!arg) {
1.6.2.8 misho 431: rpc_SetErr(EINVAL, "Invalid parameter can`t procced BLOB");
1.2 misho 432: return NULL;
433: } else
434: s = c->cli_parent;
435:
1.6.2.8 misho 436: /* allocate net buffer */
437: buf = malloc(sizeof(struct tagBLOBHdr));
438: if (!buf) {
439: LOGERR;
440: return NULL;
441: }
1.2 misho 442:
1.6.2.8 misho 443: root = schedBegin();
444: if (!root) {
445: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
446: free(buf);
447: return NULL;
448: } else {
449: schedTermCondition(root, kill);
450: schedPolling(root, &ts, NULL);
451: }
1.4 misho 452:
1.6.2.8 misho 453: schedRead(root, rxBLOB, c, c->cli_sock, buf, sizeof(struct tagBLOBHdr));
1.2 misho 454:
1.6.2.8 misho 455: schedRun(root, (void*) &s->srv_blob.state);
456: schedEnd(&root);
457:
458: shutdown(c->cli_sock, SHUT_RDWR);
459: close(c->cli_sock);
460: memset(c, 0, sizeof(rpc_cli_t));
461: free(buf);
462: return NULL;
1.1 misho 463: }
464:
465: // -------------------------------------------------
466:
467: /*
1.6.2.4 misho 468: * rpc_srv_initBLOBServer() - Init & create BLOB Server
469: *
1.4 misho 470: * @srv = RPC server instance
1.2 misho 471: * @Port = Port for bind server, if Port == 0 default port is selected
472: * @diskDir = Disk place for BLOB file objects
473: * return: -1 == error or 0 bind and created BLOB server instance
474: */
475: int
476: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
477: {
478: int n = 1;
1.6 misho 479: io_sockaddr_t sa;
1.2 misho 480:
1.6.2.11! misho 481: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.7 misho 482:
1.2 misho 483: if (!srv) {
1.6.2.1 misho 484: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 485: return -1;
486: }
487: if (srv->srv_blob.state) {
1.6.2.1 misho 488: rpc_SetErr(EPERM, "Already started BLOB server!");
1.2 misho 489: return 0;
490: }
491:
492: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
493: if (access(diskDir, R_OK | W_OK) == -1) {
494: LOGERR;
495: return -1;
496: } else
1.6.2.6 misho 497: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 498:
499: srv->srv_blob.server.cli_tid = pthread_self();
500: srv->srv_blob.server.cli_parent = srv;
1.4 misho 501:
502: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
1.6 misho 503: switch (sa.sa.sa_family) {
1.4 misho 504: case AF_INET:
1.6 misho 505: sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);
1.4 misho 506: break;
507: case AF_INET6:
1.6 misho 508: sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);
1.4 misho 509: break;
510: case AF_LOCAL:
1.6 misho 511: strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
1.4 misho 512: break;
513: default:
1.6.2.6 misho 514: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 515: return -1;
1.2 misho 516: }
1.6 misho 517: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
1.2 misho 518:
1.4 misho 519: /* create BLOB server socket */
1.6 misho 520: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 521: if (srv->srv_blob.server.cli_sock == -1) {
522: LOGERR;
1.6.2.6 misho 523: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 524: return -1;
525: }
526: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
527: LOGERR;
528: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 529: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 530: return -1;
531: }
1.5 misho 532: n = srv->srv_netbuf;
533: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
534: LOGERR;
535: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 536: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 537: return -1;
538: }
539: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
540: LOGERR;
541: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 542: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 543: return -1;
544: }
1.6 misho 545: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
546: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 547: LOGERR;
548: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 549: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 550: return -1;
551: }
552:
1.4 misho 553: /* allocate pool for concurent clients */
1.2 misho 554: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
555: if (!srv->srv_blob.clients) {
556: LOGERR;
557: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 558: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 559: return -1;
560: } else
561: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
562:
563: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
564:
565: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
1.5 misho 566: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
567: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
568: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
1.2 misho 569:
1.4 misho 570: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 571: return 0;
572: }
573:
574: /*
1.6.2.4 misho 575: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
576: *
1.2 misho 577: * @srv = RPC Server instance
578: * return: none
579: */
580: void
581: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
582: {
583: rpc_cli_t *c;
584: register int i;
585: rpc_blob_t *f;
586:
1.6.2.11! misho 587: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.7 misho 588:
1.2 misho 589: if (!srv) {
1.6.2.1 misho 590: rpc_SetErr(EINVAL, "Can`t destroy server because parameter is null!");
1.2 misho 591: return;
592: } else
1.4 misho 593: srv->srv_blob.state = kill;
1.2 misho 594:
595: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
596: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
597: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
598: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
599:
1.4 misho 600: /* close all clients connections & server socket */
1.2 misho 601: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
1.6 misho 602: if (c->cli_sa.sa.sa_family)
1.2 misho 603: shutdown(c->cli_sock, SHUT_RDWR);
604: close(srv->srv_blob.server.cli_sock);
605:
1.5 misho 606: pthread_mutex_lock(&srv->srv_blob.mtx);
1.2 misho 607: if (srv->srv_blob.clients) {
608: free(srv->srv_blob.clients);
609: srv->srv_blob.clients = NULL;
610: }
611:
1.4 misho 612: /* detach blobs */
1.2 misho 613: while ((f = srv->srv_blob.blobs)) {
614: srv->srv_blob.blobs = f->blob_next;
615: rpc_srv_blobFree(srv, f);
616: free(f);
617: }
618: pthread_mutex_unlock(&srv->srv_blob.mtx);
619:
1.6.2.10 misho 620: AIT_FREE_VAL(&srv->srv_blob.dir);
621:
1.2 misho 622: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
623: pthread_mutex_destroy(&srv->srv_blob.mtx);
624: }
625:
626: /*
1.6.2.4 misho 627: * rpc_srv_loopBLOB() - Execute Main BLOB server loop and wait for clients requests
628: *
1.2 misho 629: * @srv = RPC Server instance
630: * return: -1 error or 0 ok, infinite loop ...
631: */
632: int
1.5 misho 633: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
1.2 misho 634: {
1.6 misho 635: socklen_t salen = sizeof(io_sockaddr_t);
1.2 misho 636: register int i;
637: rpc_cli_t *c;
638: fd_set fds;
639: int ret;
640: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.6.2.1 misho 641: pthread_attr_t attr;
1.2 misho 642:
1.6.2.11! misho 643: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.7 misho 644:
1.4 misho 645: if (!srv || srv->srv_blob.state == kill) {
1.6.2.8 misho 646: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 647: return -1;
648: }
649:
1.6.2.1 misho 650: tv.tv_sec = srv->srv_session.sess_timeout;
651:
1.2 misho 652: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
653: LOGERR;
654: return -1;
655: }
656:
1.6.2.1 misho 657: pthread_attr_init(&attr);
658: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
659:
660: /* main BLOB loop */
1.4 misho 661: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 662: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 663: if (!c->cli_sa.sa.sa_family)
1.2 misho 664: break;
665: if (i >= srv->srv_numcli) {
1.5 misho 666: #ifdef HAVE_PTHREAD_YIELD
667: pthread_yield();
668: #else
1.2 misho 669: usleep(1000000);
1.5 misho 670: #endif
1.2 misho 671: continue;
672: }
673:
674: FD_ZERO(&fds);
675: FD_SET(srv->srv_blob.server.cli_sock, &fds);
676: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
677: if (ret == -1) {
678: LOGERR;
679: ret = 1;
680: break;
681: }
682: if (!ret)
683: continue;
684:
1.6 misho 685: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
1.2 misho 686: if (c->cli_sock == -1) {
687: LOGERR;
688: continue;
689: } else
690: c->cli_parent = srv;
691:
1.6.2.1 misho 692: /* spawn dispatch thread for BLOB client */
693: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchVars, c)) {
1.2 misho 694: LOGERR;
695: continue;
1.6.2.1 misho 696: }
1.2 misho 697: }
698:
1.5 misho 699: srv->srv_blob.state = kill;
1.2 misho 700:
1.6.2.1 misho 701: pthread_attr_destroy(&attr);
1.2 misho 702: return 0;
703: }
704:
705:
706: /*
1.6.2.4 misho 707: * rpc_srv_initServer() - Init & create RPC Server
708: *
1.1 misho 709: * @regProgID = ProgramID for authentication & recognition
710: * @regProcID = ProcessID for authentication & recognition
711: * @concurentClients = Concurent clients at same time to this server
1.5 misho 712: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 713: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
714: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 715: * @Port = Port for bind server, if Port == 0 default port is selected
716: * return: NULL == error or !=NULL bind and created RPC server instance
717: */
718: rpc_srv_t *
719: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.5 misho 720: int netBuf, u_short family, const char *csHost, u_short Port)
1.1 misho 721: {
722: rpc_srv_t *srv = NULL;
723: int n = 1;
724: struct hostent *host = NULL;
1.6 misho 725: io_sockaddr_t sa;
1.1 misho 726:
1.6.2.11! misho 727: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.7 misho 728:
1.4 misho 729: if (!concurentClients || !regProgID ||
730: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 731: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
732: return NULL;
733: }
734: if (!Port)
735: Port = RPC_DEFPORT;
1.5 misho 736: if (!netBuf)
737: netBuf = BUFSIZ;
1.6.2.9 misho 738: else
739: netBuf = io_align(netBuf, 1); /* align netBuf length */
1.4 misho 740: if (csHost && family != AF_LOCAL) {
1.1 misho 741: host = gethostbyname2(csHost, family);
742: if (!host) {
743: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
744: return NULL;
745: }
746: }
1.4 misho 747: memset(&sa, 0, sizeof sa);
1.6 misho 748: sa.sa.sa_family = family;
1.1 misho 749: switch (family) {
750: case AF_INET:
1.6 misho 751: sa.sin.sin_len = sizeof(struct sockaddr_in);
752: sa.sin.sin_port = htons(Port);
1.1 misho 753: if (csHost)
1.6 misho 754: memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length);
1.1 misho 755: break;
756: case AF_INET6:
1.6 misho 757: sa.sin6.sin6_len = sizeof(struct sockaddr_in6);
758: sa.sin6.sin6_port = htons(Port);
1.4 misho 759: if (csHost)
1.6 misho 760: memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length);
1.4 misho 761: break;
762: case AF_LOCAL:
1.6 misho 763: sa.sun.sun_len = sizeof(struct sockaddr_un);
1.1 misho 764: if (csHost)
1.6 misho 765: strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path);
766: unlink(sa.sun.sun_path);
1.1 misho 767: break;
768: default:
769: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
770: return NULL;
771: }
772:
773: srv = malloc(sizeof(rpc_srv_t));
774: if (!srv) {
775: LOGERR;
776: return NULL;
777: } else
778: memset(srv, 0, sizeof(rpc_srv_t));
779:
1.5 misho 780: srv->srv_netbuf = netBuf;
1.1 misho 781: srv->srv_numcli = concurentClients;
782: srv->srv_session.sess_version = RPC_VERSION;
1.6.2.1 misho 783: srv->srv_session.sess_timeout = DEF_RPC_TIMEOUT;
1.1 misho 784: srv->srv_session.sess_program = regProgID;
785: srv->srv_session.sess_process = regProcID;
786:
1.2 misho 787: srv->srv_server.cli_tid = pthread_self();
1.1 misho 788: srv->srv_server.cli_parent = srv;
1.6 misho 789: memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
1.4 misho 790:
791: /* create server socket */
1.1 misho 792: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
793: if (srv->srv_server.cli_sock == -1) {
794: LOGERR;
795: free(srv);
796: return NULL;
797: }
798: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
799: LOGERR;
800: close(srv->srv_server.cli_sock);
801: free(srv);
802: return NULL;
803: }
1.5 misho 804: n = srv->srv_netbuf;
805: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
806: LOGERR;
807: close(srv->srv_server.cli_sock);
808: free(srv);
809: return NULL;
810: }
811: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
812: LOGERR;
813: close(srv->srv_server.cli_sock);
814: free(srv);
815: return NULL;
816: }
1.6 misho 817: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
818: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 819: LOGERR;
820: close(srv->srv_server.cli_sock);
821: free(srv);
822: return NULL;
823: }
824:
1.4 misho 825: /* allocate pool for concurent clients */
1.1 misho 826: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
827: if (!srv->srv_clients) {
828: LOGERR;
829: close(srv->srv_server.cli_sock);
830: free(srv);
831: return NULL;
832: } else
833: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
834:
1.2 misho 835: pthread_mutex_init(&srv->srv_mtx, NULL);
836:
837: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.5 misho 838: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
839: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
840: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
1.1 misho 841: return srv;
842: }
843:
844: /*
1.6.2.4 misho 845: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
846: *
1.6 misho 847: * @psrv = RPC Server instance
1.1 misho 848: * return: none
849: */
850: void
1.6 misho 851: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 852: {
853: rpc_cli_t *c;
854: register int i;
855: rpc_func_t *f;
856:
1.6.2.11! misho 857: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.7 misho 858:
1.6 misho 859: if (!psrv || !*psrv) {
1.1 misho 860: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
861: return;
862: }
863:
1.6 misho 864: rpc_srv_endBLOBServer(*psrv);
1.1 misho 865:
1.4 misho 866: /* close all clients connections & server socket */
1.6 misho 867: for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
868: if (c->cli_sa.sa.sa_family) {
1.1 misho 869: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 870: close(c->cli_sock);
871: }
1.6 misho 872: close((*psrv)->srv_server.cli_sock);
1.1 misho 873:
1.6 misho 874: if ((*psrv)->srv_clients) {
875: free((*psrv)->srv_clients);
876: (*psrv)->srv_clients = NULL;
877: (*psrv)->srv_numcli = 0;
1.1 misho 878: }
879:
1.4 misho 880: /* detach exported calls */
1.6 misho 881: pthread_mutex_lock(&(*psrv)->srv_mtx);
882: while ((f = (*psrv)->srv_funcs)) {
883: (*psrv)->srv_funcs = f->func_next;
884: io_freeVars(&f->func_vars);
1.6.2.4 misho 885: AIT_FREE_VAL(&f->func_name);
886: AIT_FREE_VAL(&f->func_file);
1.2 misho 887: free(f);
888: }
1.6 misho 889: pthread_mutex_unlock(&(*psrv)->srv_mtx);
1.2 misho 890:
1.6 misho 891: while (pthread_mutex_trylock(&(*psrv)->srv_mtx) == EBUSY);
892: pthread_mutex_destroy(&(*psrv)->srv_mtx);
1.2 misho 893:
1.6 misho 894: free(*psrv);
895: *psrv = NULL;
1.1 misho 896: }
897:
898: /*
1.6.2.4 misho 899: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
900: *
1.1 misho 901: * @srv = RPC Server instance
902: * return: -1 error or 0 ok, infinite loop ...
903: */
904: int
1.5 misho 905: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 906: {
1.6 misho 907: socklen_t salen = sizeof(io_sockaddr_t);
1.1 misho 908: register int i;
909: rpc_cli_t *c;
1.2 misho 910: fd_set fds;
911: int ret;
912: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.6.2.2 misho 913: pthread_attr_t attr;
1.1 misho 914:
1.6.2.11! misho 915: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.7 misho 916:
1.1 misho 917: if (!srv) {
918: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
919: return -1;
920: }
921:
1.6.2.3 misho 922: tv.tv_sec = srv->srv_session.sess_timeout;
923:
1.5 misho 924: /* activate BLOB server worker if srv->srv_blob.state == enable */
925: rpc_srv_execBLOBServer(srv);
926:
1.1 misho 927: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
928: LOGERR;
929: return -1;
930: }
931:
1.6.2.2 misho 932: pthread_attr_init(&attr);
933: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
934:
1.6.2.3 misho 935: /* main rpc loop */
1.4 misho 936: while (srv->srv_kill != kill) {
1.1 misho 937: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 938: if (!c->cli_sa.sa.sa_family)
1.1 misho 939: break;
1.2 misho 940: if (i >= srv->srv_numcli) {
1.5 misho 941: #ifdef HAVE_PTHREAD_YIELD
942: pthread_yield();
943: #else
1.1 misho 944: usleep(1000000);
1.5 misho 945: #endif
1.1 misho 946: continue;
947: }
1.2 misho 948:
949: FD_ZERO(&fds);
950: FD_SET(srv->srv_server.cli_sock, &fds);
951: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
952: if (ret == -1) {
953: LOGERR;
954: ret = 1;
955: break;
956: }
957: if (!ret)
958: continue;
959:
1.6 misho 960: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
1.1 misho 961: if (c->cli_sock == -1) {
962: LOGERR;
963: continue;
964: } else
965: c->cli_parent = srv;
966:
1.6.2.3 misho 967: /* spawn rpc client dispatcher */
1.6.2.2 misho 968: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchCall, c)) {
1.1 misho 969: LOGERR;
970: continue;
1.6.2.2 misho 971: }
1.1 misho 972: }
973:
1.6.2.2 misho 974: pthread_attr_destroy(&attr);
1.1 misho 975: return 0;
976: }
977:
978: // ---------------------------------------------------------
979:
980: /*
981: * rpc_srv_execCall() Execute registered call from RPC server
1.6.2.4 misho 982: *
1.1 misho 983: * @call = Register RPC call
984: * @rpc = IN RPC call structure
1.5 misho 985: * @args = IN RPC calling arguments from RPC client
1.1 misho 986: * return: -1 error, !=-1 ok
987: */
988: int
1.2 misho 989: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.5 misho 990: array_t * __restrict args)
1.1 misho 991: {
992: void *dl;
993: rpc_callback_t func;
994: int ret;
995:
1.6.2.11! misho 996: ioTRACE(RPC_TRACE_LEVEL);
1.6.2.7 misho 997:
1.2 misho 998: if (!call || !rpc || !call->func_parent) {
1.6.2.4 misho 999: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 1000: return -1;
1001: }
1002:
1.6.2.4 misho 1003: dl = dlopen(AIT_VOID(&call->func_file), RTLD_NOW);
1.1 misho 1004: if (!dl) {
1.6.2.4 misho 1005: rpc_SetErr(ENOENT, "Can`t attach module %s!", dlerror());
1.1 misho 1006: return -1;
1007: }
1008:
1.6.2.5 misho 1009: func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name));
1.1 misho 1010: if (func)
1.6 misho 1011: ret = func(call, ntohs(rpc->call_argc), args);
1.1 misho 1012: else {
1.6.2.4 misho 1013: rpc_SetErr(ENOEXEC, "Can`t find function %s!", dlerror());
1.1 misho 1014: ret = -1;
1015: }
1016:
1017: dlclose(dl);
1018: return ret;
1019: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>