Annotation of libaitrpc/src/srv.c, revision 1.6.2.7
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.6.2.7 ! misho 6: * $Id: srv.c,v 1.6.2.6 2012/03/13 17:21:52 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.6.2.1 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.6.2.4 misho 49: static void *rxPacket(sched_task_t*);
50:
51: static void *
52: txPacket(sched_task_t *task)
53: {
54: rpc_cli_t *c = TASK_ARG(task);
55: rpc_srv_t *s = c->cli_parent;
56: rpc_func_t *f = NULL;
57: u_char *buf = TASK_DATA(task);
58: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
59: int ret, wlen = sizeof(struct tagRPCCall);
60: array_t *arr = NULL;
61:
1.6.2.7 ! misho 62: FTRACE();
! 63:
1.6.2.4 misho 64: if (rpc->call_argc) {
65: f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash));
66: if (!f) {
67: rpc->call_argc ^= rpc->call_argc;
68: rpc->call_rep.ret = RPC_ERROR(-1);
69: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
70: } else {
71: rpc->call_argc = htons(rpc_srv_getVars(f, &arr));
72: /* Go Encapsulate variables */
73: ret = io_vars2buffer(buf + wlen, TASK_DATLEN(task) - wlen, arr);
74: io_clrVars(f->func_vars);
75: if (ret == -1) {
76: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
77: rpc->call_argc ^= rpc->call_argc;
78: rpc->call_rep.ret = RPC_ERROR(-1);
79: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
80: } else
81: wlen += ret;
82: }
83: }
84:
1.6.2.7 ! misho 85: /* calculate CRC */
! 86: rpc->call_crc ^= rpc->call_crc;
! 87: rpc->call_crc = htons(crcFletcher16((u_short*) buf, ((wlen + 1) & ~1) / 2));
! 88:
1.6.2.4 misho 89: /* send reply */
90: ret = send(TASK_FD(task), buf, wlen, 0);
91: if (ret == -1)
92: LOGERR;
93: else if (ret != wlen)
94: rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
95: "really sended %d bytes", wlen, ret);
1.6.2.7 ! misho 96: else
! 97: LOGGER("Sended %d bytes", ret);
1.6.2.4 misho 98:
99: /* lets get next packet */
100: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
101: TASK_DATA(task), TASK_DATLEN(task));
102: return NULL;
103: }
104:
105: static void *
106: execCall(sched_task_t *task)
107: {
108: rpc_cli_t *c = TASK_ARG(task);
109: rpc_srv_t *s = c->cli_parent;
110: rpc_func_t *f = NULL;
111: array_t *arr = NULL;
112: u_char *buf = TASK_DATA(task);
113: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
114: int argc = ntohs(rpc->call_argc);
115:
1.6.2.7 ! misho 116: FTRACE();
! 117:
1.6.2.4 misho 118: /* Go decapsulate variables ... */
119: if (!(rpc->call_req.flags & RPC_NOREPLY) && argc) {
120: arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),
121: TASK_DATLEN(task) - sizeof(struct tagRPCCall), argc, 1);
122: if (!arr) {
123: rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
124: rpc->call_argc ^= rpc->call_argc;
125: rpc->call_rep.ret = RPC_ERROR(-1);
126: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
127: return NULL;
128: }
129: }
130:
131: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
132: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
133: rpc->call_argc ^= rpc->call_argc;
134: rpc->call_rep.ret = RPC_ERROR(-1);
135: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
136: } else {
1.6.2.7 ! misho 137: LOGGER("RPC function %s from module %s", AIT_GET_STR(&f->func_name),
! 138: AIT_GET_STR(&f->func_file));
! 139:
1.6.2.4 misho 140: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
141: if (rpc->call_rep.ret == htonl(-1)) {
142: rpc->call_rep.eno = RPC_ERROR(errno);
143: rpc->call_argc ^= rpc->call_argc;
144: } else {
145: rpc->call_rep.eno ^= rpc->call_rep.eno;
146: rpc->call_argc = htons(rpc_srv_getVars(f, NULL));
147: }
148: }
149:
150: if (arr)
151: io_arrayDestroy(&arr);
152: return NULL;
153: }
154:
155: static void *
156: rxPacket(sched_task_t *task)
157: {
158: rpc_cli_t *c = TASK_ARG(task);
159: rpc_srv_t *s = c->cli_parent;
160: u_char *buf = TASK_DATA(task);
161: int rlen;
162: u_short crc;
163: struct tagRPCCall *rpc;
164: struct timespec ts;
165:
1.6.2.7 ! misho 166: FTRACE();
! 167:
1.6.2.4 misho 168: memset(buf, 0, TASK_DATLEN(task));
169: rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
170: if (rlen == -1) {
171: LOGERR;
172: s->srv_kill = kill;
173:
174: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
175: TASK_DATA(task), TASK_DATLEN(task));
176: return NULL;
177: } else if (!rlen) { /* receive EOF */
178: s->srv_kill = kill;
179:
180: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
181: TASK_DATA(task), TASK_DATLEN(task));
182: return NULL;
1.6.2.7 ! misho 183: } else
! 184: LOGGER("Readed %d bytes", rlen);
! 185:
1.6.2.4 misho 186: if (rlen < sizeof(struct tagRPCCall)) {
187: rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");
188:
189: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
190: TASK_DATA(task), TASK_DATLEN(task));
191: return NULL;
192: } else
193: rpc = (struct tagRPCCall*) buf;
194:
195: /* check integrity of packet */
196: crc = ntohs(rpc->call_crc);
197: rpc->call_crc ^= rpc->call_crc;
198: if (crc != crcFletcher16((u_short*) buf, ((rlen + 1) & ~1) / 2)) {
199: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
200:
201: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
202: TASK_DATA(task), TASK_DATLEN(task));
203: return NULL;
204: }
205:
206: /* check RPC packet session info */
207: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
208: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
209: rpc->call_argc ^= rpc->call_argc;
210: rpc->call_rep.ret = RPC_ERROR(-1);
211: rpc->call_rep.eno = RPC_ERROR(errno);
212: goto end;
213: } else {
214: /* change socket timeout from last packet */
215: ts.tv_sec = rpc->call_session.sess_timeout;
216: ts.tv_nsec = 0;
217: schedPolling(TASK_ROOT(task), &ts, NULL);
218: }
219:
220: /* execute RPC call */
221: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), 0,
222: TASK_DATA(task), TASK_DATLEN(task));
223:
224: end:
225: /* send RPC reply */
226: if (!(rpc->call_req.flags & RPC_NOREPLY))
227: schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task),
228: TASK_DATA(task), TASK_DATLEN(task));
229: else
230: schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),
231: TASK_DATA(task), TASK_DATLEN(task));
232: return NULL;
233: }
234:
1.1 misho 235: static void *
236: rpc_srv_dispatchCall(void *arg)
237: {
238: rpc_cli_t *c = arg;
239: rpc_srv_t *s;
1.5 misho 240: u_char *buf;
1.6.2.4 misho 241: sched_root_task_t *root;
242: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.1 misho 243:
1.6.2.7 ! misho 244: FTRACE();
! 245:
1.1 misho 246: if (!arg) {
1.6.2.4 misho 247: rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client");
1.1 misho 248: return NULL;
249: } else
250: s = c->cli_parent;
251:
1.6.2.4 misho 252: /* allocate net buffer */
1.5 misho 253: buf = malloc(s->srv_netbuf);
254: if (!buf) {
255: LOGERR;
256: return NULL;
257: }
258:
1.6.2.4 misho 259: root = schedBegin();
260: if (!root) {
261: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
262: free(buf);
263: return NULL;
264: } else {
265: schedTermCondition(root, kill);
266: schedPolling(root, &ts, NULL);
267: }
1.5 misho 268:
1.6.2.4 misho 269: schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf);
1.5 misho 270:
1.6.2.5 misho 271: schedRun(root, (void*) &s->srv_kill);
1.6.2.4 misho 272: schedEnd(&root);
1.1 misho 273:
274: shutdown(c->cli_sock, SHUT_RDWR);
275: close(c->cli_sock);
276: memset(c, 0, sizeof(rpc_cli_t));
1.5 misho 277: free(buf);
1.6.2.4 misho 278: return NULL;
1.2 misho 279: }
280:
281:
282: static void *
283: rpc_srv_dispatchVars(void *arg)
284: {
285: rpc_cli_t *c = arg;
286: rpc_srv_t *s;
287: rpc_blob_t *b;
1.4 misho 288: int ret = 0;
1.2 misho 289: fd_set fds;
290: u_char buf[sizeof(struct tagBLOBHdr)];
291: struct tagBLOBHdr *blob;
292:
1.6.2.7 ! misho 293: FTRACE();
! 294:
1.2 misho 295: if (!arg) {
296: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");
297: return NULL;
298: } else
299: s = c->cli_parent;
300:
301: do {
1.4 misho 302: /* check for disable service at this moment? */
303: if (s->srv_blob.state == disable && s->srv_kill != kill) {
304: usleep(100000);
305: #ifdef HAVE_PTHREAD_YIELD
306: pthread_yield();
307: #endif
308: continue;
1.2 misho 309: }
310:
311: FD_ZERO(&fds);
312: FD_SET(c->cli_sock, &fds);
313: ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
314: if (ret == -1) {
1.4 misho 315: if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill)
316: continue;
317:
318: LOGERR;
1.2 misho 319: ret = -2;
1.4 misho 320: break;
1.2 misho 321: }
322:
323: memset(buf, 0, sizeof buf);
1.4 misho 324: ret = recv(c->cli_sock, buf, sizeof buf, 0);
325: if (ret == -1) {
1.2 misho 326: LOGERR;
327: ret = -3;
328: break;
329: }
1.4 misho 330: /* receive EOF, disable or kill service */
331: if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {
1.2 misho 332: ret = 0;
333: break;
334: }
335: if (ret < sizeof(struct tagBLOBHdr)) {
1.5 misho 336: rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n");
1.2 misho 337: ret = -4;
1.6 misho 338: if (s->srv_kill != kill && s->srv_blob.state != kill)
339: continue;
340: else
341: break;
1.2 misho 342: } else
343: blob = (struct tagBLOBHdr*) buf;
1.4 misho 344: /* check BLOB packet session info */
1.2 misho 345: if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) {
346: rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n");
347: ret = -5;
348: goto makeReply;
349: }
1.4 misho 350: /* Go to proceed packet ... */
1.2 misho 351: switch (blob->hdr_cmd) {
352: case get:
353: if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) {
354: rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",
355: blob->hdr_var);
356: ret = -6;
357: break;
358: } else
359: blob->hdr_len = b->blob_len;
360:
361: if (rpc_srv_blobMap(s, b) != -1) {
362: ret = rpc_srv_sendBLOB(c, b);
363: rpc_srv_blobUnmap(b);
364: } else
365: ret = -7;
366: break;
367: case set:
368: if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) {
1.4 misho 369: /* set new BLOB variable for reply :) */
1.2 misho 370: blob->hdr_var = b->blob_var;
371:
372: ret = rpc_srv_recvBLOB(c, b);
373: rpc_srv_blobUnmap(b);
374: } else
375: ret = -7;
376: break;
377: case unset:
378: ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);
379: if (ret == -1)
380: ret = -7;
381: break;
382: default:
1.5 misho 383: rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n",
1.2 misho 384: blob->hdr_cmd);
385: ret = -7;
386: }
387:
388: makeReply:
1.4 misho 389: /* Replay to client! */
1.2 misho 390: blob->hdr_cmd = ret < 0 ? error : ok;
391: blob->hdr_ret = ret;
1.4 misho 392: ret = send(c->cli_sock, buf, sizeof buf, 0);
393: if (ret == -1) {
1.2 misho 394: LOGERR;
395: ret = -8;
396: break;
397: }
398: if (ret != sizeof buf) {
1.5 misho 399: rpc_SetErr(EPROCUNAVAIL, "Error:: in send BLOB reply, should be send %d bytes, "
1.2 misho 400: "really is %d\n", sizeof buf, ret);
401: ret = -9;
1.6 misho 402: if (s->srv_kill != kill && s->srv_blob.state != kill)
403: continue;
404: else
405: break;
1.2 misho 406: }
1.4 misho 407: } while (ret > -1 || s->srv_kill != kill);
1.2 misho 408:
409: shutdown(c->cli_sock, SHUT_RDWR);
410: close(c->cli_sock);
411: memset(c, 0, sizeof(rpc_cli_t));
1.5 misho 412: return (void*) ((long)ret);
1.1 misho 413: }
414:
415: // -------------------------------------------------
416:
417: /*
1.6.2.4 misho 418: * rpc_srv_initBLOBServer() - Init & create BLOB Server
419: *
1.4 misho 420: * @srv = RPC server instance
1.2 misho 421: * @Port = Port for bind server, if Port == 0 default port is selected
422: * @diskDir = Disk place for BLOB file objects
423: * return: -1 == error or 0 bind and created BLOB server instance
424: */
425: int
426: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
427: {
428: int n = 1;
1.6 misho 429: io_sockaddr_t sa;
1.2 misho 430:
1.6.2.7 ! misho 431: FTRACE();
! 432:
1.2 misho 433: if (!srv) {
1.6.2.1 misho 434: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 435: return -1;
436: }
437: if (srv->srv_blob.state) {
1.6.2.1 misho 438: rpc_SetErr(EPERM, "Already started BLOB server!");
1.2 misho 439: return 0;
440: }
441:
442: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
443: if (access(diskDir, R_OK | W_OK) == -1) {
444: LOGERR;
445: return -1;
446: } else
1.6.2.6 misho 447: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 448:
449: srv->srv_blob.server.cli_tid = pthread_self();
450: srv->srv_blob.server.cli_parent = srv;
1.4 misho 451:
452: memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
1.6 misho 453: switch (sa.sa.sa_family) {
1.4 misho 454: case AF_INET:
1.6 misho 455: sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);
1.4 misho 456: break;
457: case AF_INET6:
1.6 misho 458: sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);
1.4 misho 459: break;
460: case AF_LOCAL:
1.6 misho 461: strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
1.4 misho 462: break;
463: default:
1.6.2.6 misho 464: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 465: return -1;
1.2 misho 466: }
1.6 misho 467: memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
1.2 misho 468:
1.4 misho 469: /* create BLOB server socket */
1.6 misho 470: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 471: if (srv->srv_blob.server.cli_sock == -1) {
472: LOGERR;
1.6.2.6 misho 473: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 474: return -1;
475: }
476: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
477: LOGERR;
478: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 479: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 480: return -1;
481: }
1.5 misho 482: n = srv->srv_netbuf;
483: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
484: LOGERR;
485: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 486: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 487: return -1;
488: }
489: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
490: LOGERR;
491: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 492: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 493: return -1;
494: }
1.6 misho 495: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
496: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 497: LOGERR;
498: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 499: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 500: return -1;
501: }
502:
1.4 misho 503: /* allocate pool for concurent clients */
1.2 misho 504: srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
505: if (!srv->srv_blob.clients) {
506: LOGERR;
507: close(srv->srv_blob.server.cli_sock);
1.6.2.6 misho 508: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 509: return -1;
510: } else
511: memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
512:
513: pthread_mutex_init(&srv->srv_blob.mtx, NULL);
514:
515: rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
1.5 misho 516: rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
517: rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
518: rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
1.2 misho 519:
1.4 misho 520: srv->srv_blob.state = enable; /* enable BLOB */
1.2 misho 521: return 0;
522: }
523:
524: /*
1.6.2.4 misho 525: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
526: *
1.2 misho 527: * @srv = RPC Server instance
528: * return: none
529: */
530: void
531: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
532: {
533: rpc_cli_t *c;
534: register int i;
535: rpc_blob_t *f;
536:
1.6.2.7 ! misho 537: FTRACE();
! 538:
1.2 misho 539: if (!srv) {
1.6.2.1 misho 540: rpc_SetErr(EINVAL, "Can`t destroy server because parameter is null!");
1.2 misho 541: return;
542: } else
1.4 misho 543: srv->srv_blob.state = kill;
1.2 misho 544:
545: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
546: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
547: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
548: rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
549:
1.6.2.6 misho 550: AIT_FREE_VAL(&srv->srv_blob.dir);
1.6.2.1 misho 551:
1.4 misho 552: /* close all clients connections & server socket */
1.2 misho 553: for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
1.6 misho 554: if (c->cli_sa.sa.sa_family)
1.2 misho 555: shutdown(c->cli_sock, SHUT_RDWR);
556: close(srv->srv_blob.server.cli_sock);
557:
1.5 misho 558: pthread_mutex_lock(&srv->srv_blob.mtx);
1.2 misho 559: if (srv->srv_blob.clients) {
560: free(srv->srv_blob.clients);
561: srv->srv_blob.clients = NULL;
562: }
563:
1.4 misho 564: /* detach blobs */
1.2 misho 565: while ((f = srv->srv_blob.blobs)) {
566: srv->srv_blob.blobs = f->blob_next;
567: rpc_srv_blobFree(srv, f);
568: free(f);
569: }
570: pthread_mutex_unlock(&srv->srv_blob.mtx);
571:
572: while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
573: pthread_mutex_destroy(&srv->srv_blob.mtx);
574: }
575:
576: /*
1.6.2.4 misho 577: * rpc_srv_loopBLOB() - Execute Main BLOB server loop and wait for clients requests
578: *
1.2 misho 579: * @srv = RPC Server instance
580: * return: -1 error or 0 ok, infinite loop ...
581: */
582: int
1.5 misho 583: rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
1.2 misho 584: {
1.6 misho 585: socklen_t salen = sizeof(io_sockaddr_t);
1.2 misho 586: register int i;
587: rpc_cli_t *c;
588: fd_set fds;
589: int ret;
590: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.6.2.1 misho 591: pthread_attr_t attr;
1.2 misho 592:
1.6.2.7 ! misho 593: FTRACE();
! 594:
1.4 misho 595: if (!srv || srv->srv_blob.state == kill) {
1.2 misho 596: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
597: return -1;
598: }
599:
1.6.2.1 misho 600: tv.tv_sec = srv->srv_session.sess_timeout;
601:
1.2 misho 602: if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
603: LOGERR;
604: return -1;
605: }
606:
1.6.2.1 misho 607: pthread_attr_init(&attr);
608: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
609:
610: /* main BLOB loop */
1.4 misho 611: while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
1.2 misho 612: for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 613: if (!c->cli_sa.sa.sa_family)
1.2 misho 614: break;
615: if (i >= srv->srv_numcli) {
1.5 misho 616: #ifdef HAVE_PTHREAD_YIELD
617: pthread_yield();
618: #else
1.2 misho 619: usleep(1000000);
1.5 misho 620: #endif
1.2 misho 621: continue;
622: }
623:
624: FD_ZERO(&fds);
625: FD_SET(srv->srv_blob.server.cli_sock, &fds);
626: ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);
627: if (ret == -1) {
628: LOGERR;
629: ret = 1;
630: break;
631: }
632: if (!ret)
633: continue;
634:
1.6 misho 635: c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
1.2 misho 636: if (c->cli_sock == -1) {
637: LOGERR;
638: continue;
639: } else
640: c->cli_parent = srv;
641:
1.6.2.1 misho 642: /* spawn dispatch thread for BLOB client */
643: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchVars, c)) {
1.2 misho 644: LOGERR;
645: continue;
1.6.2.1 misho 646: }
1.2 misho 647: }
648:
1.5 misho 649: srv->srv_blob.state = kill;
1.2 misho 650:
1.6.2.1 misho 651: pthread_attr_destroy(&attr);
1.2 misho 652: return 0;
653: }
654:
655:
656: /*
1.6.2.4 misho 657: * rpc_srv_initServer() - Init & create RPC Server
658: *
1.1 misho 659: * @regProgID = ProgramID for authentication & recognition
660: * @regProcID = ProcessID for authentication & recognition
661: * @concurentClients = Concurent clients at same time to this server
1.5 misho 662: * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 663: * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
664: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 665: * @Port = Port for bind server, if Port == 0 default port is selected
666: * return: NULL == error or !=NULL bind and created RPC server instance
667: */
668: rpc_srv_t *
669: rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,
1.5 misho 670: int netBuf, u_short family, const char *csHost, u_short Port)
1.1 misho 671: {
672: rpc_srv_t *srv = NULL;
673: int n = 1;
674: struct hostent *host = NULL;
1.6 misho 675: io_sockaddr_t sa;
1.1 misho 676:
1.6.2.7 ! misho 677: FTRACE();
! 678:
1.4 misho 679: if (!concurentClients || !regProgID ||
680: (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
1.1 misho 681: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
682: return NULL;
683: }
684: if (!Port)
685: Port = RPC_DEFPORT;
1.5 misho 686: if (!netBuf)
687: netBuf = BUFSIZ;
1.4 misho 688: if (csHost && family != AF_LOCAL) {
1.1 misho 689: host = gethostbyname2(csHost, family);
690: if (!host) {
691: rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
692: return NULL;
693: }
694: }
1.4 misho 695: memset(&sa, 0, sizeof sa);
1.6 misho 696: sa.sa.sa_family = family;
1.1 misho 697: switch (family) {
698: case AF_INET:
1.6 misho 699: sa.sin.sin_len = sizeof(struct sockaddr_in);
700: sa.sin.sin_port = htons(Port);
1.1 misho 701: if (csHost)
1.6 misho 702: memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length);
1.1 misho 703: break;
704: case AF_INET6:
1.6 misho 705: sa.sin6.sin6_len = sizeof(struct sockaddr_in6);
706: sa.sin6.sin6_port = htons(Port);
1.4 misho 707: if (csHost)
1.6 misho 708: memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length);
1.4 misho 709: break;
710: case AF_LOCAL:
1.6 misho 711: sa.sun.sun_len = sizeof(struct sockaddr_un);
1.1 misho 712: if (csHost)
1.6 misho 713: strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path);
714: unlink(sa.sun.sun_path);
1.1 misho 715: break;
716: default:
717: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
718: return NULL;
719: }
720:
721: srv = malloc(sizeof(rpc_srv_t));
722: if (!srv) {
723: LOGERR;
724: return NULL;
725: } else
726: memset(srv, 0, sizeof(rpc_srv_t));
727:
1.5 misho 728: srv->srv_netbuf = netBuf;
1.1 misho 729: srv->srv_numcli = concurentClients;
730: srv->srv_session.sess_version = RPC_VERSION;
1.6.2.1 misho 731: srv->srv_session.sess_timeout = DEF_RPC_TIMEOUT;
1.1 misho 732: srv->srv_session.sess_program = regProgID;
733: srv->srv_session.sess_process = regProcID;
734:
1.2 misho 735: srv->srv_server.cli_tid = pthread_self();
1.1 misho 736: srv->srv_server.cli_parent = srv;
1.6 misho 737: memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
1.4 misho 738:
739: /* create server socket */
1.1 misho 740: srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
741: if (srv->srv_server.cli_sock == -1) {
742: LOGERR;
743: free(srv);
744: return NULL;
745: }
746: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
747: LOGERR;
748: close(srv->srv_server.cli_sock);
749: free(srv);
750: return NULL;
751: }
1.5 misho 752: n = srv->srv_netbuf;
753: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
754: LOGERR;
755: close(srv->srv_server.cli_sock);
756: free(srv);
757: return NULL;
758: }
759: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
760: LOGERR;
761: close(srv->srv_server.cli_sock);
762: free(srv);
763: return NULL;
764: }
1.6 misho 765: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
766: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 767: LOGERR;
768: close(srv->srv_server.cli_sock);
769: free(srv);
770: return NULL;
771: }
772:
1.4 misho 773: /* allocate pool for concurent clients */
1.1 misho 774: srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
775: if (!srv->srv_clients) {
776: LOGERR;
777: close(srv->srv_server.cli_sock);
778: free(srv);
779: return NULL;
780: } else
781: memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
782:
1.2 misho 783: pthread_mutex_init(&srv->srv_mtx, NULL);
784:
785: rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
1.5 misho 786: rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
787: rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
788: rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
1.1 misho 789: return srv;
790: }
791:
792: /*
1.6.2.4 misho 793: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
794: *
1.6 misho 795: * @psrv = RPC Server instance
1.1 misho 796: * return: none
797: */
798: void
1.6 misho 799: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 800: {
801: rpc_cli_t *c;
802: register int i;
803: rpc_func_t *f;
804:
1.6.2.7 ! misho 805: FTRACE();
! 806:
1.6 misho 807: if (!psrv || !*psrv) {
1.1 misho 808: rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
809: return;
810: }
811:
1.6 misho 812: rpc_srv_endBLOBServer(*psrv);
1.1 misho 813:
1.4 misho 814: /* close all clients connections & server socket */
1.6 misho 815: for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
816: if (c->cli_sa.sa.sa_family) {
1.1 misho 817: shutdown(c->cli_sock, SHUT_RDWR);
1.2 misho 818: close(c->cli_sock);
819: }
1.6 misho 820: close((*psrv)->srv_server.cli_sock);
1.1 misho 821:
1.6 misho 822: if ((*psrv)->srv_clients) {
823: free((*psrv)->srv_clients);
824: (*psrv)->srv_clients = NULL;
825: (*psrv)->srv_numcli = 0;
1.1 misho 826: }
827:
1.4 misho 828: /* detach exported calls */
1.6 misho 829: pthread_mutex_lock(&(*psrv)->srv_mtx);
830: while ((f = (*psrv)->srv_funcs)) {
831: (*psrv)->srv_funcs = f->func_next;
832: io_freeVars(&f->func_vars);
1.6.2.4 misho 833: AIT_FREE_VAL(&f->func_name);
834: AIT_FREE_VAL(&f->func_file);
1.2 misho 835: free(f);
836: }
1.6 misho 837: pthread_mutex_unlock(&(*psrv)->srv_mtx);
1.2 misho 838:
1.6 misho 839: while (pthread_mutex_trylock(&(*psrv)->srv_mtx) == EBUSY);
840: pthread_mutex_destroy(&(*psrv)->srv_mtx);
1.2 misho 841:
1.6 misho 842: free(*psrv);
843: *psrv = NULL;
1.1 misho 844: }
845:
846: /*
1.6.2.4 misho 847: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
848: *
1.1 misho 849: * @srv = RPC Server instance
850: * return: -1 error or 0 ok, infinite loop ...
851: */
852: int
1.5 misho 853: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 854: {
1.6 misho 855: socklen_t salen = sizeof(io_sockaddr_t);
1.1 misho 856: register int i;
857: rpc_cli_t *c;
1.2 misho 858: fd_set fds;
859: int ret;
860: struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
1.6.2.2 misho 861: pthread_attr_t attr;
1.1 misho 862:
1.6.2.7 ! misho 863: FTRACE();
! 864:
1.1 misho 865: if (!srv) {
866: rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
867: return -1;
868: }
869:
1.6.2.3 misho 870: tv.tv_sec = srv->srv_session.sess_timeout;
871:
1.5 misho 872: /* activate BLOB server worker if srv->srv_blob.state == enable */
873: rpc_srv_execBLOBServer(srv);
874:
1.1 misho 875: if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
876: LOGERR;
877: return -1;
878: }
879:
1.6.2.2 misho 880: pthread_attr_init(&attr);
881: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
882:
1.6.2.3 misho 883: /* main rpc loop */
1.4 misho 884: while (srv->srv_kill != kill) {
1.1 misho 885: for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
1.6 misho 886: if (!c->cli_sa.sa.sa_family)
1.1 misho 887: break;
1.2 misho 888: if (i >= srv->srv_numcli) {
1.5 misho 889: #ifdef HAVE_PTHREAD_YIELD
890: pthread_yield();
891: #else
1.1 misho 892: usleep(1000000);
1.5 misho 893: #endif
1.1 misho 894: continue;
895: }
1.2 misho 896:
897: FD_ZERO(&fds);
898: FD_SET(srv->srv_server.cli_sock, &fds);
899: ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);
900: if (ret == -1) {
901: LOGERR;
902: ret = 1;
903: break;
904: }
905: if (!ret)
906: continue;
907:
1.6 misho 908: c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
1.1 misho 909: if (c->cli_sock == -1) {
910: LOGERR;
911: continue;
912: } else
913: c->cli_parent = srv;
914:
1.6.2.3 misho 915: /* spawn rpc client dispatcher */
1.6.2.2 misho 916: if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchCall, c)) {
1.1 misho 917: LOGERR;
918: continue;
1.6.2.2 misho 919: }
1.1 misho 920: }
921:
1.6.2.2 misho 922: pthread_attr_destroy(&attr);
1.1 misho 923: return 0;
924: }
925:
926: // ---------------------------------------------------------
927:
928: /*
929: * rpc_srv_execCall() Execute registered call from RPC server
1.6.2.4 misho 930: *
1.1 misho 931: * @call = Register RPC call
932: * @rpc = IN RPC call structure
1.5 misho 933: * @args = IN RPC calling arguments from RPC client
1.1 misho 934: * return: -1 error, !=-1 ok
935: */
936: int
1.2 misho 937: rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,
1.5 misho 938: array_t * __restrict args)
1.1 misho 939: {
940: void *dl;
941: rpc_callback_t func;
942: int ret;
943:
1.6.2.7 ! misho 944: FTRACE();
! 945:
1.2 misho 946: if (!call || !rpc || !call->func_parent) {
1.6.2.4 misho 947: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 948: return -1;
949: }
950:
1.6.2.4 misho 951: dl = dlopen(AIT_VOID(&call->func_file), RTLD_NOW);
1.1 misho 952: if (!dl) {
1.6.2.4 misho 953: rpc_SetErr(ENOENT, "Can`t attach module %s!", dlerror());
1.1 misho 954: return -1;
955: }
956:
1.6.2.5 misho 957: func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name));
1.1 misho 958: if (func)
1.6 misho 959: ret = func(call, ntohs(rpc->call_argc), args);
1.1 misho 960: else {
1.6.2.4 misho 961: rpc_SetErr(ENOEXEC, "Can`t find function %s!", dlerror());
1.1 misho 962: ret = -1;
963: }
964:
965: dlclose(dl);
966: return ret;
967: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>