Annotation of libaitrpc/src/srv.c, revision 1.6.2.8
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.6.2.8 ! misho 6: * $Id: srv.c,v 1.6.2.7 2012/03/14 13:29:11 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.6.2.1 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.6.2.4 misho 49: static void *rxPacket(sched_task_t*);
1.6.2.8 ! misho 50: static void *rxBLOB(sched_task_t*);
1.6.2.4 misho 51:
52: static void *
53: txPacket(sched_task_t *task)
54: {
55: rpc_cli_t *c = TASK_ARG(task);
56: rpc_srv_t *s = c->cli_parent;
57: rpc_func_t *f = NULL;
58: u_char *buf = TASK_DATA(task);
59: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
60: int ret, wlen = sizeof(struct tagRPCCall);
61: array_t *arr = NULL;
62:
1.6.2.7 misho 63: FTRACE();
64:
1.6.2.4 misho 65: if (rpc->call_argc) {
66: f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash));
67: if (!f) {
68: rpc->call_argc ^= rpc->call_argc;
69: rpc->call_rep.ret = RPC_ERROR(-1);
70: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
71: } else {
72: rpc->call_argc = htons(rpc_srv_getVars(f, &arr));
73: /* Go Encapsulate variables */
74: ret = io_vars2buffer(buf + wlen, TASK_DATLEN(task) - wlen, arr);
75: io_clrVars(f->func_vars);
76: if (ret == -1) {
77: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
78: rpc->call_argc ^= rpc->call_argc;
79: rpc->call_rep.ret = RPC_ERROR(-1);
80: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
81: } else
82: wlen += ret;
83: }
84: }
85:
1.6.2.7 misho 86: /* calculate CRC */
87: rpc->call_crc ^= rpc->call_crc;
88: rpc->call_crc = htons(crcFletcher16((u_short*) buf, ((wlen + 1) & ~1) / 2));
89:
1.6.2.4 misho 90: /* send reply */
91: ret = send(TASK_FD(task), buf, wlen, 0);
92: if (ret == -1)
93: LOGERR;
94: else if (ret != wlen)
95: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
96: "really sended %d bytes", wlen, ret);
1.6.2.7 misho 97: else
98: LOGGER("Sended %d bytes", ret);
1.6.2.4 misho 99:
100: return NULL;
101: }
102:
103: static void *
104: execCall(sched_task_t *task)
105: {
106: rpc_cli_t *c = TASK_ARG(task);
107: rpc_srv_t *s = c->cli_parent;
108: rpc_func_t *f = NULL;
109: array_t *arr = NULL;
110: u_char *buf = TASK_DATA(task);
111: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
112: int argc = ntohs(rpc->call_argc);
113:
1.6.2.7 misho 114: FTRACE();
115:
1.6.2.4 misho 116: /* Go decapsulate variables ... */
117: if (!(rpc->call_req.flags & RPC_NOREPLY) && argc) {
118: arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),
119: TASK_DATLEN(task) - sizeof(struct tagRPCCall), argc, 1);
120: if (!arr) {
121: rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
122: rpc->call_argc ^= rpc->call_argc;
123: rpc->call_rep.ret = RPC_ERROR(-1);
124: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
125: return NULL;
126: }
127: }
128:
129: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
130: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
131: rpc->call_argc ^= rpc->call_argc;
132: rpc->call_rep.ret = RPC_ERROR(-1);
133: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
134: } else {
1.6.2.7 misho 135: LOGGER("RPC function %s from module %s", AIT_GET_STR(&f->func_name),
1.6.2.8 ! misho 136: AIT_GET_LIKE(&f->func_file, char*));
1.6.2.7 misho 137:
1.6.2.4 misho 138: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
139: if (rpc->call_rep.ret == htonl(-1)) {
140: rpc->call_rep.eno = RPC_ERROR(errno);
141: rpc->call_argc ^= rpc->call_argc;
142: } else {
143: rpc->call_rep.eno ^= rpc->call_rep.eno;
144: rpc->call_argc = htons(rpc_srv_getVars(f, NULL));
145: }
146: }
147:
148: if (arr)
149: io_arrayDestroy(&arr);
150: return NULL;
151: }
152:
153: static void *
154: rxPacket(sched_task_t *task)
155: {
156: rpc_cli_t *c = TASK_ARG(task);
157: rpc_srv_t *s = c->cli_parent;
158: u_char *buf = TASK_DATA(task);
159: int rlen;
160: u_short crc;
161: struct tagRPCCall *rpc;
162: struct timespec ts;
163:
1.6.2.7 misho 164: FTRACE();
165:
1.6.2.4 misho 166: memset(buf, 0, TASK_DATLEN(task));
167: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
168: if (rlen == -1) {
169: LOGERR;
1.6.2.8 ! misho 170: s->srv_kill = s->srv_blob.state = kill;
1.6.2.4 misho 171: return NULL;
172: } else if (!rlen) { /* receive EOF */
1.6.2.8 ! misho 173: s->srv_kill = s->srv_blob.state = kill;
1.6.2.4 misho 174: return NULL;
1.6.2.7 misho 175: } else
176: LOGGER("Readed %d bytes", rlen);
177:
1.6.2.4 misho 178: if (rlen < sizeof(struct tagRPCCall)) {
179: rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");
180:
181: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
182: TASK_DATA(task), TASK_DATLEN(task));
183: return NULL;
184: } else
185: rpc = (struct tagRPCCall*) buf;
186:
187: /* check integrity of packet */
188: crc = ntohs(rpc->call_crc);
189: rpc->call_crc ^= rpc->call_crc;
190: if (crc != crcFletcher16((u_short*) buf, ((rlen + 1) & ~1) / 2)) {
191: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
192:
193: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
194: TASK_DATA(task), TASK_DATLEN(task));
195: return NULL;
196: }
197:
198: /* check RPC packet session info */
199: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
200: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
201: rpc->call_argc ^= rpc->call_argc;
202: rpc->call_rep.ret = RPC_ERROR(-1);
203: rpc->call_rep.eno = RPC_ERROR(errno);
204: goto end;
205: } else {
206: /* change socket timeout from last packet */
207: ts.tv_sec = rpc->call_session.sess_timeout;
208: ts.tv_nsec = 0;
209: schedPolling(TASK_ROOT(task), &ts, NULL);
210: }
211:
212: /* execute RPC call */
213: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), 0,
214: TASK_DATA(task), TASK_DATLEN(task));
215:
216: end:
217: /* send RPC reply */
218: if (!(rpc->call_req.flags & RPC_NOREPLY))
219: schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task),
220: TASK_DATA(task), TASK_DATLEN(task));
1.6.2.8 ! misho 221: /* lets get next packet */
! 222: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
! 223: TASK_DATA(task), TASK_DATLEN(task));
1.6.2.4 misho 224: return NULL;
225: }
226:
1.1 misho 227: static void *
228: rpc_srv_dispatchCall(void *arg)
229: {
230: rpc_cli_t *c = arg;
231: rpc_srv_t *s;
1.5 misho 232: u_char *buf;
1.6.2.4 misho 233: sched_root_task_t *root;
234: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.1 misho 235:
1.6.2.7 misho 236: FTRACE();
237:
1.1 misho 238: if (!arg) {
1.6.2.4 misho 239: rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client");
1.1 misho 240: return NULL;
241: } else
242: s = c->cli_parent;
243:
1.6.2.4 misho 244: /* allocate net buffer */
1.5 misho 245: buf = malloc(s->srv_netbuf);
246: if (!buf) {
247: LOGERR;
248: return NULL;
249: }
250:
1.6.2.4 misho 251: root = schedBegin();
252: if (!root) {
253: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
254: free(buf);
255: return NULL;
256: } else {
257: schedTermCondition(root, kill);
258: schedPolling(root, &ts, NULL);
259: }
1.5 misho 260:
1.6.2.4 misho 261: schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf);
1.5 misho 262:
1.6.2.5 misho 263: schedRun(root, (void*) &s->srv_kill);
1.6.2.4 misho 264: schedEnd(&root);
1.1 misho 265:
266: shutdown(c->cli_sock, SHUT_RDWR);
267: close(c->cli_sock);
268: memset(c, 0, sizeof(rpc_cli_t));
1.5 misho 269: free(buf);
1.6.2.4 misho 270: return NULL;
1.2 misho 271: }
272:
273:
274: static void *
1.6.2.8 ! misho 275: txBLOB(sched_task_t *task)
! 276: {
! 277: rpc_cli_t *c = TASK_ARG(task);
! 278: u_char *buf = TASK_DATA(task);
! 279: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
! 280: int wlen = sizeof(struct tagBLOBHdr);
! 281:
! 282: FTRACE();
! 283:
! 284: /* calculate CRC */
! 285: blob->hdr_crc ^= blob->hdr_crc;
! 286: blob->hdr_crc = htons(crcFletcher16((u_short*) buf, ((wlen + 1) & ~1) / 2));
! 287:
! 288: /* send reply */
! 289: wlen = send(TASK_FD(task), buf, wlen, 0);
! 290: if (wlen == -1)
! 291: LOGERR;
! 292: else if (wlen != sizeof(struct tagBLOBHdr))
! 293: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
! 294: "really sended %d bytes", sizeof(struct tagBLOBHdr), wlen);
! 295: else
! 296: LOGGER("Sended %d bytes", wlen);
! 297:
! 298: return NULL;
! 299: }
! 300:
! 301: static void *
! 302: rxBLOB(sched_task_t *task)
! 303: {
! 304: rpc_cli_t *c = TASK_ARG(task);
! 305: rpc_srv_t *s = c->cli_parent;
! 306: rpc_blob_t *b;
! 307: u_char *buf = TASK_DATA(task);
! 308: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
! 309: int rlen;
! 310: u_short crc;
! 311: struct timespec ts;
! 312:
! 313: FTRACE();
! 314:
! 315: /* check for disable service at this moment? */
! 316: if (s->srv_blob.state == disable) {
! 317: usleep(100000);
! 318: #ifdef HAVE_PTHREAD_YIELD
! 319: pthread_yield();
! 320: #endif
! 321: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
! 322: TASK_DATA(task), TASK_DATLEN(task));
! 323: return NULL;
! 324: }
! 325:
! 326: memset(buf, 0, TASK_DATLEN(task));
! 327: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
! 328: if (rlen == -1) {
! 329: LOGERR;
! 330: s->srv_blob.state = kill;
! 331: return NULL;
! 332: } else if (!rlen || s->srv_kill == kill) { /* receive EOF */
! 333: s->srv_blob.state = kill;
! 334: return NULL;
! 335: } else
! 336: LOGGER("Readed %d bytes", rlen);
! 337:
! 338: if (rlen < sizeof(struct tagBLOBHdr)) {
! 339: rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet");
! 340: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
! 341: TASK_DATA(task), TASK_DATLEN(task));
! 342: return NULL;
! 343: }
! 344:
! 345: /* check integrity of packet */
! 346: crc = ntohs(blob->hdr_crc);
! 347: blob->hdr_crc ^= blob->hdr_crc;
! 348: if (crc != crcFletcher16((u_short*) buf, ((rlen + 1) & ~1) / 2)) {
! 349: rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
! 350: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
! 351: TASK_DATA(task), TASK_DATLEN(task));
! 352: return NULL;
! 353: }
! 354:
! 355: /* check RPC packet session info */
! 356: if (rpc_chkPktSession(&blob->hdr_session, &s->srv_session)) {
! 357: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
! 358: blob->hdr_cmd = error;
! 359: goto end;
! 360: } else {
! 361: /* change socket timeout from last packet */
! 362: ts.tv_sec = blob->hdr_session.sess_timeout;
! 363: ts.tv_nsec = 0;
! 364: schedPolling(TASK_ROOT(task), &ts, NULL);
! 365: }
! 366:
! 367: /* Go to proceed packet ... */
! 368: switch (blob->hdr_cmd) {
! 369: case get:
! 370: if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) {
! 371: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var));
! 372: blob->hdr_cmd = no;
! 373: blob->hdr_ret = RPC_ERROR(-1);
! 374: break;
! 375: } else
! 376: blob->hdr_len = htonl(b->blob_len);
! 377:
! 378: if (rpc_srv_blobMap(s, b) != -1) {
! 379: /* deliver BLOB variable to client */
! 380: blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
! 381: rpc_srv_blobUnmap(b);
! 382: } else {
! 383: blob->hdr_cmd = error;
! 384: blob->hdr_ret = RPC_ERROR(-1);
! 385: }
! 386: break;
! 387: case set:
! 388: if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) {
! 389: /* set new BLOB variable for reply :) */
! 390: blob->hdr_var = htonl(b->blob_var);
! 391:
! 392: /* receive BLOB from client */
! 393: blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
! 394: rpc_srv_blobUnmap(b);
! 395: } else {
! 396: blob->hdr_cmd = error;
! 397: blob->hdr_ret = RPC_ERROR(-1);
! 398: }
! 399: break;
! 400: case unset:
! 401: if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) {
! 402: blob->hdr_cmd = error;
! 403: blob->hdr_ret = RPC_ERROR(-1);
! 404: }
! 405: break;
! 406: default:
! 407: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd);
! 408: blob->hdr_cmd = error;
! 409: blob->hdr_ret = RPC_ERROR(-1);
! 410: }
! 411:
! 412: end:
! 413: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task),
! 414: TASK_DATA(task), TASK_DATLEN(task));
! 415: schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),
! 416: TASK_DATA(task), TASK_DATLEN(task));
! 417: return NULL;
! 418: }
! 419:
! 420: static void *
1.2 misho 421: rpc_srv_dispatchVars(void *arg)
422: {
423: rpc_cli_t *c = arg;
424: rpc_srv_t *s;
1.6.2.8 ! misho 425: sched_root_task_t *root;
! 426: u_char *buf;
! 427: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.2 misho 428:
1.6.2.7 misho 429: FTRACE();
430:
1.2 misho 431: if (!arg) {
1.6.2.8 ! misho 432: rpc_SetErr(EINVAL, "Invalid parameter can`t procced BLOB");
1.2 misho 433: return NULL;
434: } else
435: s = c->cli_parent;
436:
1.6.2.8 ! misho 437: /* allocate net buffer */
! 438: buf = malloc(sizeof(struct tagBLOBHdr));
! 439: if (!buf) {
! 440: LOGERR;
! 441: return NULL;
! 442: }
1.2 misho 443:
1.6.2.8 ! misho 444: root = schedBegin();
! 445: if (!root) {
! 446: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
! 447: free(buf);
! 448: return NULL;
! 449: } else {
! 450: schedTermCondition(root, kill);
! 451: schedPolling(root, &ts, NULL);
! 452: }
1.4 misho 453:
1.6.2.8 ! misho 454: schedRead(root, rxBLOB, c, c->cli_sock, buf, sizeof(struct tagBLOBHdr));
1.2 misho 455:
1.6.2.8 ! misho 456: schedRun(root, (void*) &s->srv_blob.state);
! 457: schedEnd(&root);
! 458:
! 459: shutdown(c->cli_sock, SHUT_RDWR);
! 460: close(c->cli_sock);
! 461: memset(c, 0, sizeof(rpc_cli_t));
! 462: free(buf);
! 463: return NULL;
1.2 misho 464:
1.6.2.8 ! misho 465: #if 0
1.2 misho 466: makeReply:
1.4 misho 467: /* Replay to client! */
468: ret = send(c->cli_sock, buf, sizeof buf, 0);
469: if (ret == -1) {
1.2 misho 470: LOGERR;
471: ret = -8;
472: break;
473: }
474: if (ret != sizeof buf) {
1.5 misho 475: rpc_SetErr(EPROCUNAVAIL, "Error:: in send BLOB reply, should be send %d bytes, "
1.2 misho 476: "really is %d\n", sizeof buf, ret);
477: ret = -9;
1.6 misho 478: if (s->srv_kill != kill && s->srv_blob.state != kill)
479: continue;
480: else
481: break;
1.2 misho 482: }
1.6.2.8 ! misho 483: #endif
1.1 misho 484: }
485:
486: // -------------------------------------------------
487:
488: /*
1.6.2.4 misho 489: * rpc_srv_initBLOBServer() - Init & create BLOB Server
490: *
1.4 misho 491: * @srv = RPC server instance
1.2 misho 492: * @Port = Port for bind server, if Port == 0 default port is selected
493: * @diskDir = Disk place for BLOB file objects
494: * return: -1 == error or 0 bind and created BLOB server instance
495: */
496: int
497: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
498: {
499: int n = 1;
1.6 misho 500: io_sockaddr_t sa;
1.2 misho 501:
1.6.2.7 misho 502: FTRACE();
503:
1.2 misho 504: if (!srv) {
1.6.2.1 misho 505: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 506: return -1;
507: }
508: if (srv->srv_blob.state) {
1.6.2.1 misho 509: rpc_SetErr(EPERM, "Already started BLOB server!");
1.2 misho 510: return 0;
511: }
512:
513: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
514: if (access(diskDir, R_OK | W_OK) == -1) {
515: LOGERR;
516: return -1;
517: } else
1.6.2.6 misho 518: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 519:
520: srv->srv_blob.server.cli_tid = pthread_self();
521: srv->srv_blob.server.cli_parent = srv;
1.4 misho 522:
523: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
1.6 misho 524: switch (sa.sa.sa_family) {
1.4 misho 525: case AF_INET:
1.6 misho 526: sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);
1.4 misho 527: break;
528: case AF_INET6:
1.6 misho 529: sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);
1.4 misho 530: break;
531: case AF_LOCAL:
1.6 misho 532: strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
1.4 misho 533: break;
534: default:
1.6.2.6 misho 535: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 536: return -1;
1.2 misho 537: }
1.6 misho 538: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
1.2 misho 539:
1.4 misho 540: /* create BLOB server socket */
1.6 misho 541: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 542: if (srv->srv_blob.server.cli_sock == -1) {
543: LOGERR;
1.6.2.6 misho 544: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 545: return -1;
546: }
547: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
548: LOGERR;
549: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 550: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 551: return -1;
552: }
1.5 misho 553: n = srv->srv_netbuf;
554: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
555: LOGERR;
556: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 557: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 558: return -1;
559: }
560: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
561: LOGERR;
562: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 563: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 564: return -1;
565: }
1.6 misho 566: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
567: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 568: LOGERR;
569: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 570: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 571: return -1;
572: }
573:
1.4 misho 574: /* allocate pool for concurent clients */
1.2 misho 575: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
576: if (!srv->srv_blob.clients) {
577: LOGERR;
578: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 579: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 580: return -1;
581: } else
582: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
583:
584: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
585:
586: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
1.5 misho 587: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
588: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
589: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
1.2 misho 590:
1.4 misho 591: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 592: return 0;
593: }
594:
595: /*
1.6.2.4 misho 596: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
597: *
1.2 misho 598: * @srv = RPC Server instance
599: * return: none
600: */
601: void
602: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
603: {
604: rpc_cli_t *c;
605: register int i;
606: rpc_blob_t *f;
607:
1.6.2.7 misho 608: FTRACE();
609:
1.2 misho 610: if (!srv) {
1.6.2.1 misho 611: rpc_SetErr(EINVAL, "Can`t destroy server because parameter is null!");
1.2 misho 612: return;
613: } else
1.4 misho 614: srv->srv_blob.state = kill;
1.2 misho 615:
616: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
617: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
618: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
619: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
620:
1.6.2.6 misho 621: AIT_FREE_VAL(&srv->srv_blob.dir);
1.6.2.1 misho 622:
1.4 misho 623: /* close all clients connections & server socket */
1.2 misho 624: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
1.6 misho 625: if (c->cli_sa.sa.sa_family)
1.2 misho 626: shutdown(c->cli_sock, SHUT_RDWR);
627: close(srv->srv_blob.server.cli_sock);
628:
1.5 misho 629: pthread_mutex_lock(&srv->srv_blob.mtx);
1.2 misho 630: if (srv->srv_blob.clients) {
631: free(srv->srv_blob.clients);
632: srv->srv_blob.clients = NULL;
633: }
634:
1.4 misho 635: /* detach blobs */
1.2 misho 636: while ((f = srv->srv_blob.blobs)) {
637: srv->srv_blob.blobs = f->blob_next;
638: rpc_srv_blobFree(srv, f);
639: free(f);
640: }
641: pthread_mutex_unlock(&srv->srv_blob.mtx);
642:
643: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
644: pthread_mutex_destroy(&srv->srv_blob.mtx);
645: }
646:
647: /*
1.6.2.4 misho 648: * rpc_srv_loopBLOB() - Execute Main BLOB server loop and wait for clients requests
649: *
1.2 misho 650: * @srv = RPC Server instance
651: * return: -1 error or 0 ok, infinite loop ...
652: */
653: int
1.5 misho 654: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
1.2 misho 655: {
1.6 misho 656: socklen_t salen = sizeof(io_sockaddr_t);
1.2 misho 657: register int i;
658: rpc_cli_t *c;
659: fd_set fds;
660: int ret;
661: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.6.2.1 misho 662: pthread_attr_t attr;
1.2 misho 663:
1.6.2.7 misho 664: FTRACE();
665:
1.4 misho 666: if (!srv || srv->srv_blob.state == kill) {
1.6.2.8 ! misho 667: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 668: return -1;
669: }
670:
1.6.2.1 misho 671: tv.tv_sec = srv->srv_session.sess_timeout;
672:
1.2 misho 673: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
674: LOGERR;
675: return -1;
676: }
677:
1.6.2.1 misho 678: pthread_attr_init(&attr);
679: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
680:
681: /* main BLOB loop */
1.4 misho 682: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 683: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 684: if (!c->cli_sa.sa.sa_family)
1.2 misho 685: break;
686: if (i >= srv->srv_numcli) {
1.5 misho 687: #ifdef HAVE_PTHREAD_YIELD
688: pthread_yield();
689: #else
1.2 misho 690: usleep(1000000);
1.5 misho 691: #endif
1.2 misho 692: continue;
693: }
694:
695: FD_ZERO(&fds);
696: FD_SET(srv->srv_blob.server.cli_sock, &fds);
697: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
698: if (ret == -1) {
699: LOGERR;
700: ret = 1;
701: break;
702: }
703: if (!ret)
704: continue;
705:
1.6 misho 706: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
1.2 misho 707: if (c->cli_sock == -1) {
708: LOGERR;
709: continue;
710: } else
711: c->cli_parent = srv;
712:
1.6.2.1 misho 713: /* spawn dispatch thread for BLOB client */
714: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchVars, c)) {
1.2 misho 715: LOGERR;
716: continue;
1.6.2.1 misho 717: }
1.2 misho 718: }
719:
1.5 misho 720: srv->srv_blob.state = kill;
1.2 misho 721:
1.6.2.1 misho 722: pthread_attr_destroy(&attr);
1.2 misho 723: return 0;
724: }
725:
726:
727: /*
1.6.2.4 misho 728: * rpc_srv_initServer() - Init & create RPC Server
729: *
1.1 misho 730: * @regProgID = ProgramID for authentication & recognition
731: * @regProcID = ProcessID for authentication & recognition
732: * @concurentClients = Concurent clients at same time to this server
1.5 misho 733: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 734: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
735: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 736: * @Port = Port for bind server, if Port == 0 default port is selected
737: * return: NULL == error or !=NULL bind and created RPC server instance
738: */
739: rpc_srv_t *
740: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.5 misho 741: int netBuf, u_short family, const char *csHost, u_short Port)
1.1 misho 742: {
743: rpc_srv_t *srv = NULL;
744: int n = 1;
745: struct hostent *host = NULL;
1.6 misho 746: io_sockaddr_t sa;
1.1 misho 747:
1.6.2.7 misho 748: FTRACE();
749:
1.4 misho 750: if (!concurentClients || !regProgID ||
751: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 752: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
753: return NULL;
754: }
755: if (!Port)
756: Port = RPC_DEFPORT;
1.5 misho 757: if (!netBuf)
758: netBuf = BUFSIZ;
1.4 misho 759: if (csHost && family != AF_LOCAL) {
1.1 misho 760: host = gethostbyname2(csHost, family);
761: if (!host) {
762: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
763: return NULL;
764: }
765: }
1.4 misho 766: memset(&sa, 0, sizeof sa);
1.6 misho 767: sa.sa.sa_family = family;
1.1 misho 768: switch (family) {
769: case AF_INET:
1.6 misho 770: sa.sin.sin_len = sizeof(struct sockaddr_in);
771: sa.sin.sin_port = htons(Port);
1.1 misho 772: if (csHost)
1.6 misho 773: memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length);
1.1 misho 774: break;
775: case AF_INET6:
1.6 misho 776: sa.sin6.sin6_len = sizeof(struct sockaddr_in6);
777: sa.sin6.sin6_port = htons(Port);
1.4 misho 778: if (csHost)
1.6 misho 779: memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length);
1.4 misho 780: break;
781: case AF_LOCAL:
1.6 misho 782: sa.sun.sun_len = sizeof(struct sockaddr_un);
1.1 misho 783: if (csHost)
1.6 misho 784: strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path);
785: unlink(sa.sun.sun_path);
1.1 misho 786: break;
787: default:
788: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
789: return NULL;
790: }
791:
792: srv = malloc(sizeof(rpc_srv_t));
793: if (!srv) {
794: LOGERR;
795: return NULL;
796: } else
797: memset(srv, 0, sizeof(rpc_srv_t));
798:
1.5 misho 799: srv->srv_netbuf = netBuf;
1.1 misho 800: srv->srv_numcli = concurentClients;
801: srv->srv_session.sess_version = RPC_VERSION;
1.6.2.1 misho 802: srv->srv_session.sess_timeout = DEF_RPC_TIMEOUT;
1.1 misho 803: srv->srv_session.sess_program = regProgID;
804: srv->srv_session.sess_process = regProcID;
805:
1.2 misho 806: srv->srv_server.cli_tid = pthread_self();
1.1 misho 807: srv->srv_server.cli_parent = srv;
1.6 misho 808: memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
1.4 misho 809:
810: /* create server socket */
1.1 misho 811: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
812: if (srv->srv_server.cli_sock == -1) {
813: LOGERR;
814: free(srv);
815: return NULL;
816: }
817: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
818: LOGERR;
819: close(srv->srv_server.cli_sock);
820: free(srv);
821: return NULL;
822: }
1.5 misho 823: n = srv->srv_netbuf;
824: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
825: LOGERR;
826: close(srv->srv_server.cli_sock);
827: free(srv);
828: return NULL;
829: }
830: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
831: LOGERR;
832: close(srv->srv_server.cli_sock);
833: free(srv);
834: return NULL;
835: }
1.6 misho 836: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
837: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 838: LOGERR;
839: close(srv->srv_server.cli_sock);
840: free(srv);
841: return NULL;
842: }
843:
1.4 misho 844: /* allocate pool for concurent clients */
1.1 misho 845: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
846: if (!srv->srv_clients) {
847: LOGERR;
848: close(srv->srv_server.cli_sock);
849: free(srv);
850: return NULL;
851: } else
852: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
853:
1.2 misho 854: pthread_mutex_init(&srv->srv_mtx, NULL);
855:
856: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.5 misho 857: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
858: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
859: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
1.1 misho 860: return srv;
861: }
862:
863: /*
1.6.2.4 misho 864: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
865: *
1.6 misho 866: * @psrv = RPC Server instance
1.1 misho 867: * return: none
868: */
869: void
1.6 misho 870: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 871: {
872: rpc_cli_t *c;
873: register int i;
874: rpc_func_t *f;
875:
1.6.2.7 misho 876: FTRACE();
877:
1.6 misho 878: if (!psrv || !*psrv) {
1.1 misho 879: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
880: return;
881: }
882:
1.6 misho 883: rpc_srv_endBLOBServer(*psrv);
1.1 misho 884:
1.4 misho 885: /* close all clients connections & server socket */
1.6 misho 886: for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
887: if (c->cli_sa.sa.sa_family) {
1.1 misho 888: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 889: close(c->cli_sock);
890: }
1.6 misho 891: close((*psrv)->srv_server.cli_sock);
1.1 misho 892:
1.6 misho 893: if ((*psrv)->srv_clients) {
894: free((*psrv)->srv_clients);
895: (*psrv)->srv_clients = NULL;
896: (*psrv)->srv_numcli = 0;
1.1 misho 897: }
898:
1.4 misho 899: /* detach exported calls */
1.6 misho 900: pthread_mutex_lock(&(*psrv)->srv_mtx);
901: while ((f = (*psrv)->srv_funcs)) {
902: (*psrv)->srv_funcs = f->func_next;
903: io_freeVars(&f->func_vars);
1.6.2.4 misho 904: AIT_FREE_VAL(&f->func_name);
905: AIT_FREE_VAL(&f->func_file);
1.2 misho 906: free(f);
907: }
1.6 misho 908: pthread_mutex_unlock(&(*psrv)->srv_mtx);
1.2 misho 909:
1.6 misho 910: while (pthread_mutex_trylock(&(*psrv)->srv_mtx) == EBUSY);
911: pthread_mutex_destroy(&(*psrv)->srv_mtx);
1.2 misho 912:
1.6 misho 913: free(*psrv);
914: *psrv = NULL;
1.1 misho 915: }
916:
917: /*
1.6.2.4 misho 918: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
919: *
1.1 misho 920: * @srv = RPC Server instance
921: * return: -1 error or 0 ok, infinite loop ...
922: */
923: int
1.5 misho 924: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 925: {
1.6 misho 926: socklen_t salen = sizeof(io_sockaddr_t);
1.1 misho 927: register int i;
928: rpc_cli_t *c;
1.2 misho 929: fd_set fds;
930: int ret;
931: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.6.2.2 misho 932: pthread_attr_t attr;
1.1 misho 933:
1.6.2.7 misho 934: FTRACE();
935:
1.1 misho 936: if (!srv) {
937: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
938: return -1;
939: }
940:
1.6.2.3 misho 941: tv.tv_sec = srv->srv_session.sess_timeout;
942:
1.5 misho 943: /* activate BLOB server worker if srv->srv_blob.state == enable */
944: rpc_srv_execBLOBServer(srv);
945:
1.1 misho 946: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
947: LOGERR;
948: return -1;
949: }
950:
1.6.2.2 misho 951: pthread_attr_init(&attr);
952: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
953:
1.6.2.3 misho 954: /* main rpc loop */
1.4 misho 955: while (srv->srv_kill != kill) {
1.1 misho 956: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 957: if (!c->cli_sa.sa.sa_family)
1.1 misho 958: break;
1.2 misho 959: if (i >= srv->srv_numcli) {
1.5 misho 960: #ifdef HAVE_PTHREAD_YIELD
961: pthread_yield();
962: #else
1.1 misho 963: usleep(1000000);
1.5 misho 964: #endif
1.1 misho 965: continue;
966: }
1.2 misho 967:
968: FD_ZERO(&fds);
969: FD_SET(srv->srv_server.cli_sock, &fds);
970: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
971: if (ret == -1) {
972: LOGERR;
973: ret = 1;
974: break;
975: }
976: if (!ret)
977: continue;
978:
1.6 misho 979: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
1.1 misho 980: if (c->cli_sock == -1) {
981: LOGERR;
982: continue;
983: } else
984: c->cli_parent = srv;
985:
1.6.2.3 misho 986: /* spawn rpc client dispatcher */
1.6.2.2 misho 987: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchCall, c)) {
1.1 misho 988: LOGERR;
989: continue;
1.6.2.2 misho 990: }
1.1 misho 991: }
992:
1.6.2.2 misho 993: pthread_attr_destroy(&attr);
1.1 misho 994: return 0;
995: }
996:
997: // ---------------------------------------------------------
998:
999: /*
1000: * rpc_srv_execCall() Execute registered call from RPC server
1.6.2.4 misho 1001: *
1.1 misho 1002: * @call = Register RPC call
1003: * @rpc = IN RPC call structure
1.5 misho 1004: * @args = IN RPC calling arguments from RPC client
1.1 misho 1005: * return: -1 error, !=-1 ok
1006: */
1007: int
1.2 misho 1008: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.5 misho 1009: array_t * __restrict args)
1.1 misho 1010: {
1011: void *dl;
1012: rpc_callback_t func;
1013: int ret;
1014:
1.6.2.7 misho 1015: FTRACE();
1016:
1.2 misho 1017: if (!call || !rpc || !call->func_parent) {
1.6.2.4 misho 1018: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 1019: return -1;
1020: }
1021:
1.6.2.4 misho 1022: dl = dlopen(AIT_VOID(&call->func_file), RTLD_NOW);
1.1 misho 1023: if (!dl) {
1.6.2.4 misho 1024: rpc_SetErr(ENOENT, "Can`t attach module %s!", dlerror());
1.1 misho 1025: return -1;
1026: }
1027:
1.6.2.5 misho 1028: func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name));
1.1 misho 1029: if (func)
1.6 misho 1030: ret = func(call, ntohs(rpc->call_argc), args);
1.1 misho 1031: else {
1.6.2.4 misho 1032: rpc_SetErr(ENOEXEC, "Can`t find function %s!", dlerror());
1.1 misho 1033: ret = -1;
1034: }
1035:
1036: dlclose(dl);
1037: return ret;
1038: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>