Annotation of libaitrpc/src/srv.c, revision 1.17.4.7
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.17.4.7! misho 6: * $Id: srv.c,v 1.17.4.6 2013/08/21 13:01:44 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.14 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.13 misho 49: /* SOCK_STREAM task callbacks */
50: static void *acceptClients(sched_task_t *);
51: static void *closeClient(sched_task_t *);
52: static void *rxPacket(sched_task_t *);
53: static void *txPacket(sched_task_t *);
54:
55: /* SOCK_DGRAM task callbacks */
56: static void *freeClient(sched_task_t *);
57: static void *rxUDPPacket(sched_task_t *);
58: static void *txUDPPacket(sched_task_t *);
59:
60: /* SOCK_RAW: no callbacks implemented, its table row below is all NULL */
61: /* Callback table: row is picked by srv_proto, column by CB_* constant (CB_CLOSECLIENT, CB_RXPACKET, CB_TXPACKET, ...) */
62: static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {
63: { acceptClients, closeClient, rxPacket, txPacket }, /* row 0: same handlers as SOCK_STREAM; presumably used when proto == 0 - TODO confirm */
64: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
65: { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket }, /* SOCK_DGRAM */
66: { NULL, NULL, NULL, NULL } /* SOCK_RAW */
67: };
68:
69:
1.17.4.3 misho 70: static inline u_char *
71: getBuffer(rpc_cli_t * __restrict c)
72: {
73: u_char *b = NULL;
74:
75: assert(c);
76:
77: if (RPC_ISNEXTBUF(c))
78: b = AIT_GET_BUF(array(c->cli_buf, 1, ait_val_t*));
79: else
80: b = AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*)) +
81: sizeof(struct tagRPCCall);
82:
83: return b;
84: }
85:
86: static inline struct tagRPCCall *
87: getHeader(rpc_cli_t * __restrict c)
88: {
89: assert(c);
90:
91: return (struct tagRPCCall*) AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*));
92: }
93:
1.16 misho 94: void
1.13 misho 95: rpc_freeCli(rpc_cli_t * __restrict c)
1.10 misho 96: {
97: rpc_srv_t *s = c->cli_parent;
98:
1.13 misho 99: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 100:
1.17.4.1 misho 101: /* free buffer(s) */
102: ait_freeVars(&c->cli_buf);
1.10 misho 103:
1.14 misho 104: array_Del(s->srv_clients, c->cli_id, 0);
1.10 misho 105: if (c)
1.14 misho 106: e_free(c);
1.13 misho 107: }
108:
109:
110: static inline int
1.14 misho 111: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 112: {
113: rpc_cli_t *c = NULL;
114: register int i;
115:
116: /* check free slots for connect */
1.14 misho 117: for (i = 0; i < array_Size(srv->srv_clients) &&
118: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
1.13 misho 119: /* check for duplicates */
1.14 misho 120: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
1.13 misho 121: break;
1.14 misho 122: if (i >= array_Size(srv->srv_clients))
1.13 misho 123: return -1; /* no more free slots! */
124:
125: return i;
126: }
127:
128: static rpc_cli_t *
1.14 misho 129: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 130: {
131: rpc_cli_t *c = NULL;
132: int n;
133:
134: n = _check4freeslot(srv, sa);
135: if (n == -1)
136: return NULL;
137: else
1.14 misho 138: c = array(srv->srv_clients, n, rpc_cli_t*);
1.13 misho 139:
140: if (!c) {
1.14 misho 141: c = e_malloc(sizeof(rpc_cli_t));
1.13 misho 142: if (!c) {
143: LOGERR;
144: srv->srv_kill = 1;
145: return NULL;
146: } else {
147: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 148: array_Set(srv->srv_clients, n, c);
1.13 misho 149: c->cli_id = n;
150: c->cli_parent = srv;
151: }
152:
1.17.4.2 misho 153: /* init buffer(s) */
1.17.4.5 misho 154: c->cli_buf = ait_allocVars(2);
1.17.4.1 misho 155: if (!c->cli_buf) {
156: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
157: array_Del(srv->srv_clients, n, 42);
158: return NULL;
159: } else
160: AIT_SET_BUFSIZ(array(c->cli_buf, 0, ait_val_t*), 0, srv->srv_netbuf);
1.13 misho 161: }
162:
163: return c;
164: }
165:
166:
167: static void *
168: freeClient(sched_task_t *task)
169: {
170: rpc_freeCli(TASK_ARG(task));
171:
172: return NULL;
173: }
174:
175: static void *
176: closeClient(sched_task_t *task)
177: {
178: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
179:
180: rpc_freeCli(TASK_ARG(task));
181:
182: /* close client socket */
183: shutdown(sock, SHUT_RDWR);
184: close(sock);
1.10 misho 185: return NULL;
186: }
1.7 misho 187:
188: static void *
189: txPacket(sched_task_t *task)
190: {
191: rpc_cli_t *c = TASK_ARG(task);
192: rpc_srv_t *s = c->cli_parent;
193: rpc_func_t *f = NULL;
1.17.4.4 misho 194: u_char *buf;
1.17.4.5 misho 195: struct tagRPCCall *rpc = (struct tagRPCCall*) TASK_DATA(task);
1.7 misho 196: int ret, wlen = sizeof(struct tagRPCCall);
1.17.4.4 misho 197: int len = sizeof(struct tagRPCCall) + ntohl(rpc->call_len);
1.10 misho 198:
1.17.4.4 misho 199: buf = e_malloc(len);
200: if (!buf) {
201: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.17.4.5 misho 202: /* close connection */
203: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
204: TASK_ARG(task), 0, NULL, 0);
1.17.4.4 misho 205: return NULL;
206: } else {
207: /* copy RPC header */
1.17.4.5 misho 208: memcpy(buf, rpc, wlen);
1.17.4.4 misho 209: rpc = (struct tagRPCCall*) buf;
210: }
1.7 misho 211:
212: if (rpc->call_argc) {
1.10 misho 213: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1.7 misho 214: if (!f) {
1.10 misho 215: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.17.4.4 misho 216:
1.7 misho 217: rpc->call_argc ^= rpc->call_argc;
218: rpc->call_rep.ret = RPC_ERROR(-1);
219: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
220: } else {
1.14 misho 221: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.7 misho 222: /* Go Encapsulate variables */
1.17.4.4 misho 223: ret = ait_vars2buffer(buf + wlen, len - wlen, RPC_RETVARS(c));
1.10 misho 224: /* Free return values */
1.14 misho 225: ait_freeVars(&c->cli_vars);
1.7 misho 226: if (ret == -1) {
227: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.17.4.4 misho 228:
1.7 misho 229: rpc->call_argc ^= rpc->call_argc;
230: rpc->call_rep.ret = RPC_ERROR(-1);
231: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
232: } else
233: wlen += ret;
234: }
235: }
236:
1.17.4.4 misho 237: rpc->call_len = htonl(wlen);
1.7 misho 238:
239: /* send reply */
1.10 misho 240: ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
241: if (ret == -1 || ret != wlen) {
242: /* close connection */
1.13 misho 243: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
244: TASK_ARG(task), 0, NULL, 0);
1.10 misho 245: }
1.7 misho 246:
1.17.4.5 misho 247: e_free(buf);
1.7 misho 248: return NULL;
249: }
250:
251: static void *
252: execCall(sched_task_t *task)
253: {
254: rpc_cli_t *c = TASK_ARG(task);
255: rpc_srv_t *s = c->cli_parent;
256: rpc_func_t *f = NULL;
257: array_t *arr = NULL;
1.17.4.3 misho 258: u_char *buf = getBuffer(c);
259: struct tagRPCCall *rpc = getHeader(c);
1.7 misho 260: int argc = ntohs(rpc->call_argc);
261:
262: /* Go decapsulate variables ... */
1.8 misho 263: if (argc) {
1.17.4.3 misho 264: arr = ait_buffer2vars(buf, ntohl(rpc->call_len), argc, 42);
1.7 misho 265: if (!arr) {
1.14 misho 266: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
1.17.4.3 misho 267:
1.7 misho 268: rpc->call_argc ^= rpc->call_argc;
269: rpc->call_rep.ret = RPC_ERROR(-1);
270: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
271: return NULL;
272: }
1.10 misho 273: } else
274: arr = NULL;
1.7 misho 275:
1.10 misho 276: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
1.7 misho 277: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.17.4.3 misho 278:
1.7 misho 279: rpc->call_argc ^= rpc->call_argc;
280: rpc->call_rep.ret = RPC_ERROR(-1);
281: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
282: } else {
1.8 misho 283: /* if client doesn't want reply */
1.10 misho 284: argc = RPC_CHK_NOREPLY(rpc);
285: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
1.7 misho 286: if (rpc->call_rep.ret == htonl(-1)) {
287: rpc->call_rep.eno = RPC_ERROR(errno);
288: rpc->call_argc ^= rpc->call_argc;
289: } else {
290: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8 misho 291: if (argc) {
292: /* without reply */
1.14 misho 293: ait_freeVars(&c->cli_vars);
1.8 misho 294: rpc->call_argc ^= rpc->call_argc;
1.10 misho 295: } else {
296: /* reply */
1.14 misho 297: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.10 misho 298: }
1.7 misho 299: }
300: }
301:
1.14 misho 302: array_Destroy(&arr);
1.7 misho 303: return NULL;
304: }
305:
306: static void *
307: rxPacket(sched_task_t *task)
308: {
309: rpc_cli_t *c = TASK_ARG(task);
310: rpc_srv_t *s = c->cli_parent;
1.10 misho 311: int len, rlen, noreply;
1.17.4.3 misho 312: ait_val_t *bufz = array(c->cli_buf, 0, ait_val_t*);
313: u_char *buf = (u_char*) AIT_GET_BUF(bufz);
314: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.7 misho 315:
1.17.4.3 misho 316: memset(buf, 0, AIT_LEN(bufz));
317: /* 1st buffer is last */
318: RPC_CLR_NEXTBUF(c);
319:
320: /* read rpc header */
321: rlen = recv(TASK_FD(task), rpc, MIN(sizeof(struct tagRPCCall), AIT_LEN(bufz)), 0);
1.17.4.5 misho 322: if (rlen < sizeof(struct tagRPCCall) ||
1.17.4.3 misho 323: ntohl(rpc->call_len) < sizeof(struct tagRPCCall)) {
324: /* close connection */
325: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
326: TASK_ARG(task), 0, NULL, 0);
327: return NULL;
1.10 misho 328: } else {
1.17.4.3 misho 329: buf += sizeof(struct tagRPCCall);
330: len = ntohl(rpc->call_len);
1.9 misho 331: }
1.7 misho 332:
1.17.4.3 misho 333: if (len > (AIT_LEN(bufz) - sizeof(struct tagRPCCall))) {
334: /* add extra buffer */
335: if (!(bufz = ait_getVars(&c->cli_buf, 1))) {
336: /* close connection */
337: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
338: TASK_ARG(task), 0, NULL, 0);
1.8 misho 339: return NULL;
1.17.4.3 misho 340: } else {
1.17.4.5 misho 341: AIT_FREE_VAL(bufz);
1.17.4.3 misho 342: AIT_SET_BUFSIZ(bufz, 0, len);
343: buf = AIT_GET_BUF(bufz);
1.10 misho 344: }
1.17.4.3 misho 345: /* buffer isnt last */
346: RPC_SET_NEXTBUF(c);
347: }
1.8 misho 348:
1.17.4.3 misho 349: /* read payload */
350: rlen = recv(TASK_FD(task), buf, len, 0);
351: if (rlen < len) {
352: /* close connection */
353: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
354: TASK_ARG(task), 0, NULL, 0);
355: return NULL;
356: }
1.7 misho 357:
1.17.4.3 misho 358: noreply = RPC_CHK_NOREPLY(rpc);
1.7 misho 359:
1.17.4.3 misho 360: /* check RPC packet session info */
361: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
362: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.7 misho 363:
1.17.4.3 misho 364: rpc->call_argc ^= rpc->call_argc;
365: rpc->call_rep.ret = RPC_ERROR(-1);
366: rpc->call_rep.eno = RPC_ERROR(errno);
367: } else {
368: /* execute RPC call */
369: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), noreply, rpc, len);
370: }
1.7 misho 371:
1.17.4.3 misho 372: /* send RPC reply */
373: if (!noreply)
374: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
375: TASK_ARG(task), TASK_FD(task), rpc, len);
1.7 misho 376:
377: /* lets get next packet */
1.17.4.3 misho 378: schedReadSelf(task);
1.7 misho 379: return NULL;
380: }
381:
1.1 misho 382: static void *
1.10 misho 383: acceptClients(sched_task_t *task)
1.1 misho 384: {
1.10 misho 385: rpc_srv_t *srv = TASK_ARG(task);
386: rpc_cli_t *c = NULL;
1.14 misho 387: socklen_t salen = sizeof(sockaddr_t);
1.7 misho 388:
1.13 misho 389: c = _allocClient(srv, NULL);
390: if (!c)
1.10 misho 391: goto end;
392:
393: /* accept client */
394: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
395: if (c->cli_sock == -1) {
396: LOGERR;
1.17.4.2 misho 397: ait_freeVars(&c->cli_buf);
1.14 misho 398: array_Del(srv->srv_clients, c->cli_id, 42);
1.10 misho 399: goto end;
1.1 misho 400: } else
1.10 misho 401: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 402:
1.13 misho 403: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
404: c->cli_sock, NULL, 0);
1.10 misho 405: end:
406: schedReadSelf(task);
407: return NULL;
408: }
1.5 misho 409:
1.7 misho 410:
1.10 misho 411: static void *
1.13 misho 412: txUDPPacket(sched_task_t *task)
1.10 misho 413: {
414: rpc_cli_t *c = TASK_ARG(task);
415: rpc_srv_t *s = c->cli_parent;
1.13 misho 416: rpc_func_t *f = NULL;
1.17.4.5 misho 417: u_char *buf;
418: struct tagRPCCall *rpc = (struct tagRPCCall*) TASK_DATA(task);
1.13 misho 419: int ret, wlen = sizeof(struct tagRPCCall);
1.17.4.5 misho 420: int len = sizeof(struct tagRPCCall) + ntohl(rpc->call_len);
1.13 misho 421: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
422:
423: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
424: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
425: TASK_ARG(task), ts, TASK_ARG(task), 0);
426:
1.17.4.5 misho 427: buf = e_malloc(len);
428: if (!buf) {
429: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
430: /* close connection */
431: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
432: TASK_ARG(task), 0, NULL, 0);
433: return NULL;
434: } else {
435: /* copy RPC header */
436: memcpy(buf, rpc, wlen);
437: rpc = (struct tagRPCCall*) buf;
438: }
1.13 misho 439:
440: if (rpc->call_argc) {
441: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
442: if (!f) {
443: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.17.4.5 misho 444:
1.13 misho 445: rpc->call_argc ^= rpc->call_argc;
446: rpc->call_rep.ret = RPC_ERROR(-1);
447: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
448: } else {
1.14 misho 449: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.13 misho 450: /* Go Encapsulate variables */
1.17.4.5 misho 451: ret = ait_vars2buffer(buf + wlen, len - wlen, RPC_RETVARS(c));
1.13 misho 452: /* Free return values */
1.14 misho 453: ait_freeVars(&c->cli_vars);
1.13 misho 454: if (ret == -1) {
455: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.17.4.5 misho 456:
1.13 misho 457: rpc->call_argc ^= rpc->call_argc;
458: rpc->call_rep.ret = RPC_ERROR(-1);
459: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
460: } else
461: wlen += ret;
462: }
463: }
1.7 misho 464:
1.17.4.5 misho 465: rpc->call_len = htonl(wlen);
1.7 misho 466:
1.13 misho 467: /* calculate CRC */
468: rpc->call_crc ^= rpc->call_crc;
469: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
470:
471: /* send reply */
472: ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL,
473: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
474: if (ret == -1 || ret != wlen) {
475: /* close connection */
476: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
477: TASK_ARG(task), 0, NULL, 0);
478: }
479:
1.17.4.5 misho 480: e_free(buf);
1.13 misho 481: return NULL;
482: }
483:
484: static void *
485: rxUDPPacket(sched_task_t *task)
486: {
487: rpc_srv_t *srv = TASK_ARG(task);
488: rpc_cli_t *c = NULL;
489: int len, rlen, noreply;
1.17.4.5 misho 490: ait_val_t *bufz;
491: u_char *buf = NULL;
492: struct tagRPCCall rpcbuf, *rpc;
493: sockaddr_t sa[2];
1.14 misho 494: socklen_t salen;
1.13 misho 495: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
496:
1.17.4.5 misho 497: memset(&rpcbuf, 0, sizeof rpcbuf);
498:
1.13 misho 499: /* receive connect packet */
1.17.4.5 misho 500: salen = sa[0].ss.ss_len = sizeof(sockaddr_t);
501: rlen = recvfrom(TASK_FD(task), &rpcbuf, sizeof rpcbuf, 0, &sa[0].sa, &salen);
502: if (rlen < sizeof(struct tagRPCCall) || ntohl(rpcbuf.call_len) < sizeof(struct tagRPCCall))
503: goto end;
504: else
505: len = ntohl(rpcbuf.call_len);
506:
507: buf = e_malloc(len);
508: if (!buf)
1.13 misho 509: goto end;
1.17.4.5 misho 510: else
511: memset(buf, 0, len);
1.13 misho 512:
1.17.4.5 misho 513: /* read payload */
514: salen = sa[1].ss.ss_len = sizeof(sockaddr_t);
515: rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa[1].sa, &salen);
516: if (rlen < len || memcmp(&sa[0], &sa[1], sizeof sa[0]))
517: goto end;
518:
519: c = _allocClient(srv, sa);
1.13 misho 520: if (!c)
521: goto end;
522: else {
1.17.4.5 misho 523: /* add extra buffer */
524: if (!(bufz = ait_getVars(&c->cli_buf, 1)))
525: goto end;
526: else {
527: AIT_FREE_VAL(bufz);
528: AIT_SET_BUFSIZ(bufz, 0, len);
529: /* buffer isnt last */
530: RPC_SET_NEXTBUF(c);
531: }
532:
533: rpc = getHeader(c);
534: memcpy(rpc, &rpcbuf, sizeof(struct tagRPCCall));
535: memcpy(getBuffer(c), buf, len);
536:
1.13 misho 537: c->cli_sock = TASK_FD(task);
1.17.4.5 misho 538: memcpy(&c->cli_sa, sa, sizeof c->cli_sa);
1.13 misho 539: /* armed timer for close stateless connection */
540: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
541: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
542: c, ts, c, 0);
543: }
544:
1.17.4.5 misho 545: /* check integrity of packet */
546: if (ntohs(rpc->call_crc) != crcFletcher16((u_short*) buf, len / 2)) {
547: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
548: goto end;
549: }
1.13 misho 550:
1.17.4.5 misho 551: noreply = RPC_CHK_NOREPLY(rpc);
1.13 misho 552:
1.17.4.5 misho 553: /* check RPC packet session info */
554: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
555: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.13 misho 556:
1.17.4.5 misho 557: rpc->call_argc ^= rpc->call_argc;
558: rpc->call_rep.ret = RPC_ERROR(-1);
559: rpc->call_rep.eno = RPC_ERROR(errno);
560: } else {
561: /* execute RPC call */
562: schedEvent(TASK_ROOT(task), execCall, c, noreply, rpc, len);
563: }
1.13 misho 564:
1.17.4.5 misho 565: /* send RPC reply */
566: if (!noreply)
567: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
568: c, TASK_FD(task), rpc, len);
1.13 misho 569: end:
1.17.4.5 misho 570: if (buf)
571: e_free(buf);
1.13 misho 572: schedReadSelf(task);
573: return NULL;
574: }
575:
576: /* ------------------------------------------------------ */
577:
1.16 misho 578: void
1.13 misho 579: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
580: {
581: rpc_srv_t *s = c->cli_parent;
582:
583: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 584:
1.17.4.2 misho 585: /* free buffer(s) */
586: ait_freeVars(&c->cli_buf);
1.10 misho 587:
1.14 misho 588: array_Del(s->srv_blob.clients, c->cli_id, 0);
1.10 misho 589: if (c)
1.14 misho 590: e_free(c);
1.13 misho 591: }
592:
593:
594: static void *
595: closeBLOBClient(sched_task_t *task)
596: {
597: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
598:
599: rpc_freeBLOBCli(TASK_ARG(task));
600:
601: /* close client socket */
602: shutdown(sock, SHUT_RDWR);
603: close(sock);
1.7 misho 604: return NULL;
605: }
606:
607: static void *
608: txBLOB(sched_task_t *task)
609: {
1.10 misho 610: rpc_cli_t *c = TASK_ARG(task);
1.17.4.6 misho 611: u_char *buf = AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*));
1.7 misho 612: int wlen = sizeof(struct tagBLOBHdr);
613:
614: /* send reply */
1.10 misho 615: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
616: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
617: /* close blob connection */
618: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
619: }
1.7 misho 620:
621: return NULL;
622: }
1.4 misho 623:
1.7 misho 624: static void *
625: rxBLOB(sched_task_t *task)
626: {
627: rpc_cli_t *c = TASK_ARG(task);
628: rpc_srv_t *s = c->cli_parent;
629: rpc_blob_t *b;
1.10 misho 630: struct tagBLOBHdr blob;
1.7 misho 631: int rlen;
632:
1.10 misho 633: memset(&blob, 0, sizeof blob);
634: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
635: if (rlen < 1) {
636: /* close blob connection */
637: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1.7 misho 638: return NULL;
639: }
1.5 misho 640:
1.10 misho 641: /* check BLOB packet */
642: if (rlen < sizeof(struct tagBLOBHdr)) {
643: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1.6 misho 644:
1.10 misho 645: schedReadSelf(task);
1.7 misho 646: return NULL;
647: }
1.1 misho 648:
1.7 misho 649: /* check RPC packet session info */
1.15 misho 650: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1.7 misho 651: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.10 misho 652: blob.hdr_cmd = error;
1.7 misho 653: goto end;
654: }
655:
656: /* Go to proceed packet ... */
1.10 misho 657: switch (blob.hdr_cmd) {
1.7 misho 658: case get:
1.10 misho 659: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
660: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
661: blob.hdr_cmd = no;
662: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 663: break;
664: } else
1.10 misho 665: blob.hdr_len = htonl(b->blob_len);
1.5 misho 666:
1.7 misho 667: if (rpc_srv_blobMap(s, b) != -1) {
668: /* deliver BLOB variable to client */
1.10 misho 669: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1.7 misho 670: rpc_srv_blobUnmap(b);
671: } else {
1.10 misho 672: blob.hdr_cmd = error;
673: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 674: }
675: break;
676: case set:
1.17 misho 677: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
678: ntohl(blob.hdr_ret)))) {
1.7 misho 679: /* set new BLOB variable for reply :) */
1.10 misho 680: blob.hdr_var = htonl(b->blob_var);
1.7 misho 681:
682: /* receive BLOB from client */
1.10 misho 683: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1.7 misho 684: rpc_srv_blobUnmap(b);
1.5 misho 685: } else {
1.10 misho 686: blob.hdr_cmd = error;
687: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 688: }
689: break;
690: case unset:
1.11 misho 691: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1.10 misho 692: blob.hdr_cmd = error;
693: blob.hdr_ret = RPC_ERROR(-1);
1.1 misho 694: }
695: break;
1.7 misho 696: default:
1.10 misho 697: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
698: blob.hdr_cmd = error;
699: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 700: }
1.1 misho 701:
1.7 misho 702: end:
1.17.4.6 misho 703: memcpy(AIT_ADDR(array(c->cli_buf, 0, ait_val_t*)), &blob, sizeof blob);
1.10 misho 704: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
705: schedReadSelf(task);
1.7 misho 706: return NULL;
1.2 misho 707: }
708:
709: static void *
1.17 misho 710: flushBLOB(sched_task_t *task)
711: {
712: rpc_srv_t *srv = TASK_ARG(task);
713: rpc_blob_t *b, *tmp;
714:
715: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
716: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
717:
718: rpc_srv_blobFree(srv, b);
719: e_free(b);
720: }
721:
722: schedSignalSelf(task);
723: return NULL;
724: }
725:
726: static void *
1.10 misho 727: acceptBLOBClients(sched_task_t *task)
1.2 misho 728: {
1.10 misho 729: rpc_srv_t *srv = TASK_ARG(task);
730: rpc_cli_t *c = NULL;
731: register int i;
1.14 misho 732: socklen_t salen = sizeof(sockaddr_t);
1.12 misho 733: #ifdef TCP_NOPUSH
734: int n = 1;
735: #endif
1.7 misho 736:
1.10 misho 737: /* check free slots for connect */
1.14 misho 738: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
739: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1.10 misho 740: if (c) /* no more free slots! */
741: goto end;
1.14 misho 742: c = e_malloc(sizeof(rpc_cli_t));
1.10 misho 743: if (!c) {
1.7 misho 744: LOGERR;
1.10 misho 745: srv->srv_kill = srv->srv_blob.kill = 1;
1.7 misho 746: return NULL;
747: } else {
1.10 misho 748: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 749: array_Set(srv->srv_blob.clients, i, c);
1.10 misho 750: c->cli_id = i;
751: c->cli_parent = srv;
1.7 misho 752: }
1.4 misho 753:
1.17.4.2 misho 754: /* init buffer(s) */
755: c->cli_buf = ait_allocVars(1);
756: if (!c->cli_buf) {
757: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
758: array_Del(srv->srv_blob.clients, i, 42);
759: goto end;
760: } else
761: AIT_SET_BUFSIZ(array(c->cli_buf, 0, ait_val_t*), 0, srv->srv_netbuf);
1.2 misho 762:
1.10 misho 763: /* accept client */
764: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
765: if (c->cli_sock == -1) {
766: LOGERR;
1.17.4.2 misho 767: ait_freeVars(&c->cli_buf);
1.14 misho 768: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 769: goto end;
1.12 misho 770: } else {
771: #ifdef TCP_NOPUSH
772: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
773: #endif
1.10 misho 774: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.12 misho 775: }
1.2 misho 776:
1.10 misho 777: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
778: end:
779: schedReadSelf(task);
1.7 misho 780: return NULL;
1.1 misho 781: }
782:
1.10 misho 783: /* ------------------------------------------------------ */
1.1 misho 784:
785: /*
1.7 misho 786: * rpc_srv_initBLOBServer() - Init & create BLOB Server
787: *
1.4 misho 788: * @srv = RPC server instance
1.2 misho 789: * @Port = Port for bind server, if Port == 0 default port is selected
790: * @diskDir = Disk place for BLOB file objects
791: * return: -1 == error or 0 bind and created BLOB server instance
792: */
793: int
794: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
795: {
796: int n = 1;
797:
1.10 misho 798: if (!srv || srv->srv_kill) {
1.7 misho 799: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 800: return -1;
801: }
802:
803: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
804: if (access(diskDir, R_OK | W_OK) == -1) {
805: LOGERR;
806: return -1;
807: } else
1.7 misho 808: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 809:
1.10 misho 810: /* init blob list */
811: TAILQ_INIT(&srv->srv_blob.blobs);
812:
1.2 misho 813: srv->srv_blob.server.cli_parent = srv;
1.4 misho 814:
1.14 misho 815: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
1.10 misho 816: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1.4 misho 817: case AF_INET:
1.10 misho 818: srv->srv_blob.server.cli_sa.sin.sin_port =
819: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1.4 misho 820: break;
821: case AF_INET6:
1.10 misho 822: srv->srv_blob.server.cli_sa.sin6.sin6_port =
823: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1.4 misho 824: break;
825: case AF_LOCAL:
1.10 misho 826: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
827: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1.4 misho 828: break;
829: default:
1.7 misho 830: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 831: return -1;
1.2 misho 832: }
833:
1.4 misho 834: /* create BLOB server socket */
1.6 misho 835: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 836: if (srv->srv_blob.server.cli_sock == -1) {
837: LOGERR;
1.7 misho 838: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 839: return -1;
840: }
841: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
842: LOGERR;
843: close(srv->srv_blob.server.cli_sock);
1.7 misho 844: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 845: return -1;
846: }
1.5 misho 847: n = srv->srv_netbuf;
848: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
849: LOGERR;
850: close(srv->srv_blob.server.cli_sock);
1.7 misho 851: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 852: return -1;
853: }
854: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
855: LOGERR;
856: close(srv->srv_blob.server.cli_sock);
1.7 misho 857: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 858: return -1;
859: }
1.6 misho 860: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
861: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 862: LOGERR;
863: close(srv->srv_blob.server.cli_sock);
1.7 misho 864: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 865: return -1;
1.13 misho 866: } else
867: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
868: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
869:
1.2 misho 870:
1.10 misho 871: /* allocate pool for concurent blob clients */
1.14 misho 872: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1.2 misho 873: if (!srv->srv_blob.clients) {
1.14 misho 874: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.2 misho 875: close(srv->srv_blob.server.cli_sock);
1.7 misho 876: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 877: return -1;
1.10 misho 878: }
1.2 misho 879:
1.10 misho 880: /* init blob scheduler */
881: srv->srv_blob.root = schedBegin();
882: if (!srv->srv_blob.root) {
883: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.14 misho 884: array_Destroy(&srv->srv_blob.clients);
1.10 misho 885: close(srv->srv_blob.server.cli_sock);
886: AIT_FREE_VAL(&srv->srv_blob.dir);
887: return -1;
888: }
1.2 misho 889:
890: return 0;
891: }
892:
893: /*
1.7 misho 894: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
895: *
1.2 misho 896: * @srv = RPC Server instance
897: * return: none
898: */
1.16 misho 899: void
1.2 misho 900: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
901: {
1.10 misho 902: if (!srv)
1.2 misho 903: return;
904:
1.10 misho 905: srv->srv_blob.kill = 1;
1.17 misho 906:
907: schedEnd(&srv->srv_blob.root);
1.2 misho 908: }
909:
910: /*
1.10 misho 911: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1.7 misho 912: *
1.2 misho 913: * @srv = RPC Server instance
914: * return: -1 error or 0 ok, infinite loop ...
915: */
916: int
1.10 misho 917: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1.2 misho 918: {
1.10 misho 919: rpc_cli_t *c;
1.2 misho 920: register int i;
1.10 misho 921: rpc_blob_t *b, *tmp;
922: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.2 misho 923:
1.10 misho 924: if (!srv || srv->srv_kill) {
1.7 misho 925: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 926: return -1;
927: }
928:
1.14 misho 929: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1.2 misho 930: LOGERR;
931: return -1;
1.10 misho 932: }
933:
1.17 misho 934: schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0);
1.10 misho 935: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
936: srv->srv_blob.server.cli_sock, NULL, 0)) {
937: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
938: return -1;
939: }
1.2 misho 940:
1.10 misho 941: schedPolling(srv->srv_blob.root, &ts, NULL);
942: /* main rpc loop */
943: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1.7 misho 944:
1.17 misho 945: /* detach blobs */
946: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
947: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
948:
949: rpc_srv_blobFree(srv, b);
950: e_free(b);
951: }
952:
1.10 misho 953: /* close all clients connections & server socket */
1.14 misho 954: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
955: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1.10 misho 956: if (c) {
957: shutdown(c->cli_sock, SHUT_RDWR);
958: close(c->cli_sock);
959:
960: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1.17.4.2 misho 961: ait_freeVars(&c->cli_buf);
1.2 misho 962: }
1.14 misho 963: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 964: }
1.14 misho 965: array_Destroy(&srv->srv_blob.clients);
1.2 misho 966:
1.10 misho 967: close(srv->srv_blob.server.cli_sock);
1.2 misho 968:
1.10 misho 969: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 970: return 0;
971: }
972:
973:
974: /*
1.7 misho 975: * rpc_srv_initServer() - Init & create RPC Server
976: *
1.15 misho 977: * @InstID = Instance for authentication & recognition
1.1 misho 978: * @concurentClients = Concurent clients at same time to this server
1.10 misho 979: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 980: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 981: * @Port = Port for bind server, if Port == 0 default port is selected
1.13 misho 982: * @proto = Protocol, if == 0 choose SOCK_STREAM
1.1 misho 983: * return: NULL == error or !=NULL bind and created RPC server instance
984: */
985: rpc_srv_t *
1.15 misho 986: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
987: const char *csHost, u_short Port, int proto)
1.1 misho 988: {
1.10 misho 989: int n = 1;
1.1 misho 990: rpc_srv_t *srv = NULL;
1.14 misho 991: sockaddr_t sa = E_SOCKADDR_INIT;
1.1 misho 992:
1.15 misho 993: if (!concurentClients || (proto < 0 || proto > SOCK_DGRAM)) {
1.10 misho 994: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1.1 misho 995: return NULL;
996: }
1.14 misho 997: if (!e_gethostbyname(csHost, Port, &sa))
1.10 misho 998: return NULL;
1.1 misho 999: if (!Port)
1000: Port = RPC_DEFPORT;
1.13 misho 1001: if (!proto)
1002: proto = SOCK_STREAM;
1.10 misho 1003: if (netBuf < RPC_MIN_BUFSIZ)
1.5 misho 1004: netBuf = BUFSIZ;
1.7 misho 1005: else
1.14 misho 1006: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1.10 misho 1007:
1008: #ifdef HAVE_SRANDOMDEV
1009: srandomdev();
1010: #else
1011: time_t tim;
1012:
1013: srandom((time(&tim) ^ getpid()));
1014: #endif
1.1 misho 1015:
1.14 misho 1016: srv = e_malloc(sizeof(rpc_srv_t));
1.1 misho 1017: if (!srv) {
1018: LOGERR;
1019: return NULL;
1020: } else
1021: memset(srv, 0, sizeof(rpc_srv_t));
1022:
1.13 misho 1023: srv->srv_proto = proto;
1.5 misho 1024: srv->srv_netbuf = netBuf;
1.1 misho 1025: srv->srv_session.sess_version = RPC_VERSION;
1.15 misho 1026: srv->srv_session.sess_instance = InstID;
1.1 misho 1027:
1028: srv->srv_server.cli_parent = srv;
1.10 misho 1029: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1030:
1.12 misho 1031: /* init functions */
1032: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1033: SLIST_INIT(&srv->srv_funcs);
1034: AVL_INIT(&srv->srv_funcs);
1.10 misho 1035:
1036: /* init scheduler */
1037: srv->srv_root = schedBegin();
1038: if (!srv->srv_root) {
1039: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.12 misho 1040: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1041: e_free(srv);
1.10 misho 1042: return NULL;
1043: }
1044:
1045: /* init pool for clients */
1.14 misho 1046: srv->srv_clients = array_Init(concurentClients);
1.10 misho 1047: if (!srv->srv_clients) {
1.14 misho 1048: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.10 misho 1049: schedEnd(&srv->srv_root);
1.12 misho 1050: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1051: e_free(srv);
1.10 misho 1052: return NULL;
1053: }
1.4 misho 1054:
1055: /* create server socket */
1.13 misho 1056: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
1.1 misho 1057: if (srv->srv_server.cli_sock == -1) {
1058: LOGERR;
1.14 misho 1059: array_Destroy(&srv->srv_clients);
1.10 misho 1060: schedEnd(&srv->srv_root);
1.12 misho 1061: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1062: e_free(srv);
1.1 misho 1063: return NULL;
1064: }
1065: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1066: LOGERR;
1.10 misho 1067: goto err;
1.1 misho 1068: }
1.5 misho 1069: n = srv->srv_netbuf;
1070: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1071: LOGERR;
1.10 misho 1072: goto err;
1.5 misho 1073: }
1074: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1075: LOGERR;
1.10 misho 1076: goto err;
1.5 misho 1077: }
1.6 misho 1078: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
1079: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 1080: LOGERR;
1.10 misho 1081: goto err;
1.13 misho 1082: } else
1083: fcntl(srv->srv_server.cli_sock, F_SETFL,
1084: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 1085:
1.10 misho 1086: rpc_register_srvPing(srv);
1.8 misho 1087:
1.1 misho 1088: return srv;
1.10 misho 1089: err: /* error condition */
1090: close(srv->srv_server.cli_sock);
1.14 misho 1091: array_Destroy(&srv->srv_clients);
1.10 misho 1092: schedEnd(&srv->srv_root);
1.12 misho 1093: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1094: e_free(srv);
1.10 misho 1095: return NULL;
1.1 misho 1096: }
1097:
1098: /*
1.7 misho 1099: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1100: *
1.6 misho 1101: * @psrv = RPC Server instance
1.1 misho 1102: * return: none
1103: */
1.16 misho 1104: void
1.6 misho 1105: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 1106: {
1.10 misho 1107: if (!psrv || !*psrv)
1.1 misho 1108: return;
1109:
1.10 misho 1110: /* if send kill to blob server */
1.17 misho 1111: rpc_srv_endBLOBServer(*psrv);
1.1 misho 1112:
1.10 misho 1113: (*psrv)->srv_kill = 1;
1114: sleep(RPC_SCHED_POLLING);
1.2 misho 1115:
1.17 misho 1116: schedEnd(&(*psrv)->srv_root);
1117:
1.12 misho 1118: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1.14 misho 1119: e_free(*psrv);
1.6 misho 1120: *psrv = NULL;
1.1 misho 1121: }
1122:
1123: /*
1.7 misho 1124: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1125: *
1.1 misho 1126: * @srv = RPC Server instance
1127: * return: -1 error or 0 ok, infinite loop ...
1128: */
1129: int
1.5 misho 1130: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 1131: {
1.10 misho 1132: rpc_cli_t *c;
1.1 misho 1133: register int i;
1.12 misho 1134: rpc_func_t *f;
1.10 misho 1135: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.1 misho 1136:
1137: if (!srv) {
1.10 misho 1138: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1.1 misho 1139: return -1;
1140: }
1141:
1.13 misho 1142: if (srv->srv_proto == SOCK_STREAM)
1.14 misho 1143: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1.13 misho 1144: LOGERR;
1145: return -1;
1146: }
1.1 misho 1147:
1.13 misho 1148: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1149: srv->srv_server.cli_sock, NULL, 0)) {
1.10 misho 1150: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1151: return -1;
1152: }
1.7 misho 1153:
1.10 misho 1154: schedPolling(srv->srv_root, &ts, NULL);
1.7 misho 1155: /* main rpc loop */
1.10 misho 1156: schedRun(srv->srv_root, &srv->srv_kill);
1157:
1158: /* close all clients connections & server socket */
1.14 misho 1159: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1160: c = array(srv->srv_clients, i, rpc_cli_t*);
1.10 misho 1161: if (c) {
1162: shutdown(c->cli_sock, SHUT_RDWR);
1163: close(c->cli_sock);
1164:
1165: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.14 misho 1166: ait_freeVars(&RPC_RETVARS(c));
1.17.4.2 misho 1167: ait_freeVars(&c->cli_buf);
1.1 misho 1168: }
1.14 misho 1169: array_Del(srv->srv_clients, i, 42);
1.10 misho 1170: }
1.14 misho 1171: array_Destroy(&srv->srv_clients);
1.2 misho 1172:
1.10 misho 1173: close(srv->srv_server.cli_sock);
1.2 misho 1174:
1.10 misho 1175: /* detach exported calls */
1.12 misho 1176: RPC_FUNCS_LOCK(&srv->srv_funcs);
1177: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1178: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1.1 misho 1179:
1.10 misho 1180: AIT_FREE_VAL(&f->func_name);
1.14 misho 1181: e_free(f);
1.1 misho 1182: }
1.12 misho 1183: srv->srv_funcs.avlh_root = NULL;
1184: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1.1 misho 1185:
1186: return 0;
1187: }
1188:
1189:
1190: /*
1191: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 1192: *
1.10 misho 1193: * @cli = RPC client
1.1 misho 1194: * @rpc = IN RPC call structure
1.10 misho 1195: * @funcname = Execute RPC function
1.5 misho 1196: * @args = IN RPC calling arguments from RPC client
1.1 misho 1197: * return: -1 error, !=-1 ok
1198: */
1199: int
1.10 misho 1200: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1201: ait_val_t funcname, array_t * __restrict args)
1.1 misho 1202: {
1203: rpc_callback_t func;
1204:
1.10 misho 1205: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1.7 misho 1206: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 1207: return -1;
1208: }
1209:
1.10 misho 1210: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1211: return func(cli, rpc, args);
1.1 misho 1212: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>