Annotation of libaitrpc/src/srv.c, revision 1.17.4.6
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.17.4.6! misho 6: * $Id: srv.c,v 1.17.4.5 2013/08/21 12:59:12 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.14 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.13 misho 49: /* SOCK_STREAM */
50: static void *acceptClients(sched_task_t *);
51: static void *closeClient(sched_task_t *);
52: static void *rxPacket(sched_task_t *);
53: static void *txPacket(sched_task_t *);
54:
55: /* SOCK_DGRAM */
56: static void *freeClient(sched_task_t *);
57: static void *rxUDPPacket(sched_task_t *);
58: static void *txUDPPacket(sched_task_t *);
59:
60: /* SOCK_RAW */
61:
62: static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {
63: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
64: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
65: { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket }, /* SOCK_DGRAM */
66: { NULL, NULL, NULL, NULL } /* SOCK_RAW */
67: };
68:
69:
1.17.4.3 misho 70: static inline u_char *
71: getBuffer(rpc_cli_t * __restrict c)
72: {
73: u_char *b = NULL;
74:
75: assert(c);
76:
77: if (RPC_ISNEXTBUF(c))
78: b = AIT_GET_BUF(array(c->cli_buf, 1, ait_val_t*));
79: else
80: b = AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*)) +
81: sizeof(struct tagRPCCall);
82:
83: return b;
84: }
85:
86: static inline struct tagRPCCall *
87: getHeader(rpc_cli_t * __restrict c)
88: {
89: assert(c);
90:
91: return (struct tagRPCCall*) AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*));
92: }
93:
1.16 misho 94: void
1.13 misho 95: rpc_freeCli(rpc_cli_t * __restrict c)
1.10 misho 96: {
97: rpc_srv_t *s = c->cli_parent;
98:
1.13 misho 99: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 100:
1.17.4.1 misho 101: /* free buffer(s) */
102: ait_freeVars(&c->cli_buf);
1.10 misho 103:
1.14 misho 104: array_Del(s->srv_clients, c->cli_id, 0);
1.10 misho 105: if (c)
1.14 misho 106: e_free(c);
1.13 misho 107: }
108:
109:
110: static inline int
1.14 misho 111: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 112: {
113: rpc_cli_t *c = NULL;
114: register int i;
115:
116: /* check free slots for connect */
1.14 misho 117: for (i = 0; i < array_Size(srv->srv_clients) &&
118: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
1.13 misho 119: /* check for duplicates */
1.14 misho 120: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
1.13 misho 121: break;
1.14 misho 122: if (i >= array_Size(srv->srv_clients))
1.13 misho 123: return -1; /* no more free slots! */
124:
125: return i;
126: }
127:
128: static rpc_cli_t *
1.14 misho 129: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 130: {
131: rpc_cli_t *c = NULL;
132: int n;
133:
134: n = _check4freeslot(srv, sa);
135: if (n == -1)
136: return NULL;
137: else
1.14 misho 138: c = array(srv->srv_clients, n, rpc_cli_t*);
1.13 misho 139:
140: if (!c) {
1.14 misho 141: c = e_malloc(sizeof(rpc_cli_t));
1.13 misho 142: if (!c) {
143: LOGERR;
144: srv->srv_kill = 1;
145: return NULL;
146: } else {
147: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 148: array_Set(srv->srv_clients, n, c);
1.13 misho 149: c->cli_id = n;
150: c->cli_parent = srv;
151: }
152:
1.17.4.2 misho 153: /* init buffer(s) */
1.17.4.5 misho 154: c->cli_buf = ait_allocVars(2);
1.17.4.1 misho 155: if (!c->cli_buf) {
156: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
157: array_Del(srv->srv_clients, n, 42);
158: return NULL;
159: } else
160: AIT_SET_BUFSIZ(array(c->cli_buf, 0, ait_val_t*), 0, srv->srv_netbuf);
1.13 misho 161: }
162:
163: return c;
164: }
165:
166:
167: static void *
168: freeClient(sched_task_t *task)
169: {
170: rpc_freeCli(TASK_ARG(task));
171:
172: return NULL;
173: }
174:
175: static void *
176: closeClient(sched_task_t *task)
177: {
178: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
179:
180: rpc_freeCli(TASK_ARG(task));
181:
182: /* close client socket */
183: shutdown(sock, SHUT_RDWR);
184: close(sock);
1.10 misho 185: return NULL;
186: }
1.7 misho 187:
188: static void *
189: txPacket(sched_task_t *task)
190: {
191: rpc_cli_t *c = TASK_ARG(task);
192: rpc_srv_t *s = c->cli_parent;
193: rpc_func_t *f = NULL;
1.17.4.4 misho 194: u_char *buf;
1.17.4.5 misho 195: struct tagRPCCall *rpc = (struct tagRPCCall*) TASK_DATA(task);
1.7 misho 196: int ret, wlen = sizeof(struct tagRPCCall);
1.17.4.4 misho 197: int len = sizeof(struct tagRPCCall) + ntohl(rpc->call_len);
1.10 misho 198:
1.17.4.4 misho 199: buf = e_malloc(len);
200: if (!buf) {
201: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.17.4.5 misho 202: /* close connection */
203: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
204: TASK_ARG(task), 0, NULL, 0);
1.17.4.4 misho 205: return NULL;
206: } else {
207: /* copy RPC header */
1.17.4.5 misho 208: memcpy(buf, rpc, wlen);
1.17.4.4 misho 209: rpc = (struct tagRPCCall*) buf;
210: }
1.7 misho 211:
212: if (rpc->call_argc) {
1.10 misho 213: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1.7 misho 214: if (!f) {
1.10 misho 215: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.17.4.4 misho 216:
1.7 misho 217: rpc->call_argc ^= rpc->call_argc;
218: rpc->call_rep.ret = RPC_ERROR(-1);
219: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
220: } else {
1.14 misho 221: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.7 misho 222: /* Go Encapsulate variables */
1.17.4.4 misho 223: ret = ait_vars2buffer(buf + wlen, len - wlen, RPC_RETVARS(c));
1.10 misho 224: /* Free return values */
1.14 misho 225: ait_freeVars(&c->cli_vars);
1.7 misho 226: if (ret == -1) {
227: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.17.4.4 misho 228:
1.7 misho 229: rpc->call_argc ^= rpc->call_argc;
230: rpc->call_rep.ret = RPC_ERROR(-1);
231: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
232: } else
233: wlen += ret;
234: }
235: }
236:
1.17.4.4 misho 237: rpc->call_len = htonl(wlen);
1.7 misho 238:
239: /* send reply */
1.10 misho 240: ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
241: if (ret == -1 || ret != wlen) {
242: /* close connection */
1.13 misho 243: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
244: TASK_ARG(task), 0, NULL, 0);
1.10 misho 245: }
1.7 misho 246:
1.17.4.5 misho 247: e_free(buf);
1.7 misho 248: return NULL;
249: }
250:
251: static void *
252: execCall(sched_task_t *task)
253: {
254: rpc_cli_t *c = TASK_ARG(task);
255: rpc_srv_t *s = c->cli_parent;
256: rpc_func_t *f = NULL;
257: array_t *arr = NULL;
1.17.4.3 misho 258: u_char *buf = getBuffer(c);
259: struct tagRPCCall *rpc = getHeader(c);
1.7 misho 260: int argc = ntohs(rpc->call_argc);
261:
262: /* Go decapsulate variables ... */
1.8 misho 263: if (argc) {
1.17.4.3 misho 264: arr = ait_buffer2vars(buf, ntohl(rpc->call_len), argc, 42);
1.7 misho 265: if (!arr) {
1.14 misho 266: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
1.17.4.3 misho 267:
1.7 misho 268: rpc->call_argc ^= rpc->call_argc;
269: rpc->call_rep.ret = RPC_ERROR(-1);
270: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
271: return NULL;
272: }
1.10 misho 273: } else
274: arr = NULL;
1.7 misho 275:
1.10 misho 276: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
1.7 misho 277: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.17.4.3 misho 278:
1.7 misho 279: rpc->call_argc ^= rpc->call_argc;
280: rpc->call_rep.ret = RPC_ERROR(-1);
281: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
282: } else {
1.8 misho 283: /* if client doesn't want reply */
1.10 misho 284: argc = RPC_CHK_NOREPLY(rpc);
285: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
1.7 misho 286: if (rpc->call_rep.ret == htonl(-1)) {
287: rpc->call_rep.eno = RPC_ERROR(errno);
288: rpc->call_argc ^= rpc->call_argc;
289: } else {
290: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8 misho 291: if (argc) {
292: /* without reply */
1.14 misho 293: ait_freeVars(&c->cli_vars);
1.8 misho 294: rpc->call_argc ^= rpc->call_argc;
1.10 misho 295: } else {
296: /* reply */
1.14 misho 297: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.10 misho 298: }
1.7 misho 299: }
300: }
301:
1.14 misho 302: array_Destroy(&arr);
1.7 misho 303: return NULL;
304: }
305:
306: static void *
307: rxPacket(sched_task_t *task)
308: {
309: rpc_cli_t *c = TASK_ARG(task);
310: rpc_srv_t *s = c->cli_parent;
1.10 misho 311: int len, rlen, noreply;
1.17.4.3 misho 312: ait_val_t *bufz = array(c->cli_buf, 0, ait_val_t*);
313: u_char *buf = (u_char*) AIT_GET_BUF(bufz);
314: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.7 misho 315:
1.17.4.3 misho 316: memset(buf, 0, AIT_LEN(bufz));
317: /* 1st buffer is last */
318: RPC_CLR_NEXTBUF(c);
319:
320: /* read rpc header */
321: rlen = recv(TASK_FD(task), rpc, MIN(sizeof(struct tagRPCCall), AIT_LEN(bufz)), 0);
1.17.4.5 misho 322: if (rlen < sizeof(struct tagRPCCall) ||
1.17.4.3 misho 323: ntohl(rpc->call_len) < sizeof(struct tagRPCCall)) {
324: /* close connection */
325: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
326: TASK_ARG(task), 0, NULL, 0);
327: return NULL;
1.10 misho 328: } else {
1.17.4.3 misho 329: buf += sizeof(struct tagRPCCall);
330: len = ntohl(rpc->call_len);
1.9 misho 331: }
1.7 misho 332:
1.17.4.3 misho 333: if (len > (AIT_LEN(bufz) - sizeof(struct tagRPCCall))) {
334: /* add extra buffer */
335: if (!(bufz = ait_getVars(&c->cli_buf, 1))) {
336: /* close connection */
337: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
338: TASK_ARG(task), 0, NULL, 0);
1.8 misho 339: return NULL;
1.17.4.3 misho 340: } else {
1.17.4.5 misho 341: AIT_FREE_VAL(bufz);
1.17.4.3 misho 342: AIT_SET_BUFSIZ(bufz, 0, len);
343: buf = AIT_GET_BUF(bufz);
1.10 misho 344: }
1.17.4.3 misho 345: /* buffer isnt last */
346: RPC_SET_NEXTBUF(c);
347: }
1.8 misho 348:
1.17.4.3 misho 349: /* read payload */
350: rlen = recv(TASK_FD(task), buf, len, 0);
351: if (rlen < len) {
352: /* close connection */
353: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
354: TASK_ARG(task), 0, NULL, 0);
355: return NULL;
356: }
1.7 misho 357:
1.17.4.3 misho 358: noreply = RPC_CHK_NOREPLY(rpc);
1.7 misho 359:
1.17.4.3 misho 360: /* check RPC packet session info */
361: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
362: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.7 misho 363:
1.17.4.3 misho 364: rpc->call_argc ^= rpc->call_argc;
365: rpc->call_rep.ret = RPC_ERROR(-1);
366: rpc->call_rep.eno = RPC_ERROR(errno);
367: } else {
368: /* execute RPC call */
369: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), noreply, rpc, len);
370: }
1.7 misho 371:
1.17.4.3 misho 372: /* send RPC reply */
373: if (!noreply)
374: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
375: TASK_ARG(task), TASK_FD(task), rpc, len);
1.7 misho 376:
377: /* lets get next packet */
1.17.4.3 misho 378: schedReadSelf(task);
1.7 misho 379: return NULL;
380: }
381:
1.1 misho 382: static void *
1.10 misho 383: acceptClients(sched_task_t *task)
1.1 misho 384: {
1.10 misho 385: rpc_srv_t *srv = TASK_ARG(task);
386: rpc_cli_t *c = NULL;
1.14 misho 387: socklen_t salen = sizeof(sockaddr_t);
1.7 misho 388:
1.13 misho 389: c = _allocClient(srv, NULL);
390: if (!c)
1.10 misho 391: goto end;
392:
393: /* accept client */
394: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
395: if (c->cli_sock == -1) {
396: LOGERR;
1.17.4.2 misho 397: ait_freeVars(&c->cli_buf);
1.14 misho 398: array_Del(srv->srv_clients, c->cli_id, 42);
1.10 misho 399: goto end;
1.1 misho 400: } else
1.10 misho 401: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 402:
1.13 misho 403: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
404: c->cli_sock, NULL, 0);
1.10 misho 405: end:
406: schedReadSelf(task);
407: return NULL;
408: }
1.5 misho 409:
1.7 misho 410:
1.10 misho 411: static void *
1.13 misho 412: txUDPPacket(sched_task_t *task)
1.10 misho 413: {
414: rpc_cli_t *c = TASK_ARG(task);
415: rpc_srv_t *s = c->cli_parent;
1.13 misho 416: rpc_func_t *f = NULL;
1.17.4.5 misho 417: u_char *buf;
418: struct tagRPCCall *rpc = (struct tagRPCCall*) TASK_DATA(task);
1.13 misho 419: int ret, wlen = sizeof(struct tagRPCCall);
1.17.4.5 misho 420: int len = sizeof(struct tagRPCCall) + ntohl(rpc->call_len);
1.13 misho 421: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
422:
423: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
424: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
425: TASK_ARG(task), ts, TASK_ARG(task), 0);
426:
1.17.4.5 misho 427: buf = e_malloc(len);
428: if (!buf) {
429: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
430: /* close connection */
431: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
432: TASK_ARG(task), 0, NULL, 0);
433: return NULL;
434: } else {
435: /* copy RPC header */
436: memcpy(buf, rpc, wlen);
437: rpc = (struct tagRPCCall*) buf;
438: }
1.13 misho 439:
440: if (rpc->call_argc) {
441: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
442: if (!f) {
443: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.17.4.5 misho 444:
1.13 misho 445: rpc->call_argc ^= rpc->call_argc;
446: rpc->call_rep.ret = RPC_ERROR(-1);
447: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
448: } else {
1.14 misho 449: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.13 misho 450: /* Go Encapsulate variables */
1.17.4.5 misho 451: ret = ait_vars2buffer(buf + wlen, len - wlen, RPC_RETVARS(c));
1.13 misho 452: /* Free return values */
1.14 misho 453: ait_freeVars(&c->cli_vars);
1.13 misho 454: if (ret == -1) {
455: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.17.4.5 misho 456:
1.13 misho 457: rpc->call_argc ^= rpc->call_argc;
458: rpc->call_rep.ret = RPC_ERROR(-1);
459: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
460: } else
461: wlen += ret;
462: }
463: }
1.7 misho 464:
1.17.4.5 misho 465: rpc->call_len = htonl(wlen);
1.7 misho 466:
1.13 misho 467: /* calculate CRC */
468: rpc->call_crc ^= rpc->call_crc;
469: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
470:
471: /* send reply */
472: ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL,
473: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
474: if (ret == -1 || ret != wlen) {
475: /* close connection */
476: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
477: TASK_ARG(task), 0, NULL, 0);
478: }
479:
1.17.4.5 misho 480: e_free(buf);
1.13 misho 481: return NULL;
482: }
483:
484: static void *
485: rxUDPPacket(sched_task_t *task)
486: {
487: rpc_srv_t *srv = TASK_ARG(task);
488: rpc_cli_t *c = NULL;
489: int len, rlen, noreply;
1.17.4.5 misho 490: ait_val_t *bufz;
491: u_char *buf = NULL;
492: struct tagRPCCall rpcbuf, *rpc;
493: sockaddr_t sa[2];
1.14 misho 494: socklen_t salen;
1.13 misho 495: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
496:
1.17.4.5 misho 497: memset(buf, 0, AIT_LEN(bufz));
498: memset(&rpcbuf, 0, sizeof rpcbuf);
499:
1.13 misho 500: /* receive connect packet */
1.17.4.5 misho 501: salen = sa[0].ss.ss_len = sizeof(sockaddr_t);
502: rlen = recvfrom(TASK_FD(task), &rpcbuf, sizeof rpcbuf, 0, &sa[0].sa, &salen);
503: if (rlen < sizeof(struct tagRPCCall) || ntohl(rpcbuf.call_len) < sizeof(struct tagRPCCall))
504: goto end;
505: else
506: len = ntohl(rpcbuf.call_len);
507:
508: buf = e_malloc(len);
509: if (!buf)
1.13 misho 510: goto end;
1.17.4.5 misho 511: else
512: memset(buf, 0, len);
1.13 misho 513:
1.17.4.5 misho 514: /* read payload */
515: salen = sa[1].ss.ss_len = sizeof(sockaddr_t);
516: rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa[1].sa, &salen);
517: if (rlen < len || memcmp(&sa[0], &sa[1], sizeof sa[0]))
518: goto end;
519:
520: c = _allocClient(srv, sa);
1.13 misho 521: if (!c)
522: goto end;
523: else {
1.17.4.5 misho 524: /* add extra buffer */
525: if (!(bufz = ait_getVars(&c->cli_buf, 1)))
526: goto end;
527: else {
528: AIT_FREE_VAL(bufz);
529: AIT_SET_BUFSIZ(bufz, 0, len);
530: /* buffer isnt last */
531: RPC_SET_NEXTBUF(c);
532: }
533:
534: rpc = getHeader(c);
535: memcpy(rpc, &rpcbuf, sizeof(struct tagRPCCall));
536: memcpy(getBuffer(c), buf, len);
537:
1.13 misho 538: c->cli_sock = TASK_FD(task);
1.17.4.5 misho 539: memcpy(&c->cli_sa, sa, sizeof c->cli_sa);
1.13 misho 540: /* armed timer for close stateless connection */
541: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
542: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
543: c, ts, c, 0);
544: }
545:
1.17.4.5 misho 546: /* check integrity of packet */
547: if (ntohs(rpc->call_crc) != crcFletcher16((u_short*) buf, len / 2)) {
548: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
549: goto end;
550: }
1.13 misho 551:
1.17.4.5 misho 552: noreply = RPC_CHK_NOREPLY(rpc);
1.13 misho 553:
1.17.4.5 misho 554: /* check RPC packet session info */
555: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
556: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.13 misho 557:
1.17.4.5 misho 558: rpc->call_argc ^= rpc->call_argc;
559: rpc->call_rep.ret = RPC_ERROR(-1);
560: rpc->call_rep.eno = RPC_ERROR(errno);
561: } else {
562: /* execute RPC call */
563: schedEvent(TASK_ROOT(task), execCall, c, noreply, rpc, len);
564: }
1.13 misho 565:
1.17.4.5 misho 566: /* send RPC reply */
567: if (!noreply)
568: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
569: c, TASK_FD(task), rpc, len);
1.13 misho 570: end:
1.17.4.5 misho 571: if (buf)
572: e_free(buf);
1.13 misho 573: schedReadSelf(task);
574: return NULL;
575: }
576:
577: /* ------------------------------------------------------ */
578:
1.16 misho 579: void
1.13 misho 580: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
581: {
582: rpc_srv_t *s = c->cli_parent;
583:
584: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 585:
1.17.4.2 misho 586: /* free buffer(s) */
587: ait_freeVars(&c->cli_buf);
1.10 misho 588:
1.14 misho 589: array_Del(s->srv_blob.clients, c->cli_id, 0);
1.10 misho 590: if (c)
1.14 misho 591: e_free(c);
1.13 misho 592: }
593:
594:
595: static void *
596: closeBLOBClient(sched_task_t *task)
597: {
598: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
599:
600: rpc_freeBLOBCli(TASK_ARG(task));
601:
602: /* close client socket */
603: shutdown(sock, SHUT_RDWR);
604: close(sock);
1.7 misho 605: return NULL;
606: }
607:
608: static void *
609: txBLOB(sched_task_t *task)
610: {
1.10 misho 611: rpc_cli_t *c = TASK_ARG(task);
1.17.4.6! misho 612: u_char *buf = AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*));
1.7 misho 613: int wlen = sizeof(struct tagBLOBHdr);
614:
615: /* send reply */
1.10 misho 616: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
617: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
618: /* close blob connection */
619: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
620: }
1.7 misho 621:
622: return NULL;
623: }
1.4 misho 624:
1.7 misho 625: static void *
626: rxBLOB(sched_task_t *task)
627: {
628: rpc_cli_t *c = TASK_ARG(task);
629: rpc_srv_t *s = c->cli_parent;
630: rpc_blob_t *b;
1.10 misho 631: struct tagBLOBHdr blob;
1.7 misho 632: int rlen;
633:
1.10 misho 634: memset(&blob, 0, sizeof blob);
635: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
636: if (rlen < 1) {
637: /* close blob connection */
638: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1.7 misho 639: return NULL;
640: }
1.5 misho 641:
1.10 misho 642: /* check BLOB packet */
643: if (rlen < sizeof(struct tagBLOBHdr)) {
644: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1.6 misho 645:
1.10 misho 646: schedReadSelf(task);
1.7 misho 647: return NULL;
648: }
1.1 misho 649:
1.7 misho 650: /* check RPC packet session info */
1.15 misho 651: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1.7 misho 652: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.10 misho 653: blob.hdr_cmd = error;
1.7 misho 654: goto end;
655: }
656:
657: /* Go to proceed packet ... */
1.10 misho 658: switch (blob.hdr_cmd) {
1.7 misho 659: case get:
1.10 misho 660: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
661: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
662: blob.hdr_cmd = no;
663: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 664: break;
665: } else
1.10 misho 666: blob.hdr_len = htonl(b->blob_len);
1.5 misho 667:
1.7 misho 668: if (rpc_srv_blobMap(s, b) != -1) {
669: /* deliver BLOB variable to client */
1.10 misho 670: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1.7 misho 671: rpc_srv_blobUnmap(b);
672: } else {
1.10 misho 673: blob.hdr_cmd = error;
674: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 675: }
676: break;
677: case set:
1.17 misho 678: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
679: ntohl(blob.hdr_ret)))) {
1.7 misho 680: /* set new BLOB variable for reply :) */
1.10 misho 681: blob.hdr_var = htonl(b->blob_var);
1.7 misho 682:
683: /* receive BLOB from client */
1.10 misho 684: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1.7 misho 685: rpc_srv_blobUnmap(b);
1.5 misho 686: } else {
1.10 misho 687: blob.hdr_cmd = error;
688: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 689: }
690: break;
691: case unset:
1.11 misho 692: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1.10 misho 693: blob.hdr_cmd = error;
694: blob.hdr_ret = RPC_ERROR(-1);
1.1 misho 695: }
696: break;
1.7 misho 697: default:
1.10 misho 698: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
699: blob.hdr_cmd = error;
700: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 701: }
1.1 misho 702:
1.7 misho 703: end:
1.17.4.6! misho 704: memcpy(AIT_ADDR(array(c->cli_buf, 0, ait_val_t*)), &blob, sizeof blob);
1.10 misho 705: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
706: schedReadSelf(task);
1.7 misho 707: return NULL;
1.2 misho 708: }
709:
710: static void *
1.17 misho 711: flushBLOB(sched_task_t *task)
712: {
713: rpc_srv_t *srv = TASK_ARG(task);
714: rpc_blob_t *b, *tmp;
715:
716: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
717: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
718:
719: rpc_srv_blobFree(srv, b);
720: e_free(b);
721: }
722:
723: schedSignalSelf(task);
724: return NULL;
725: }
726:
727: static void *
1.10 misho 728: acceptBLOBClients(sched_task_t *task)
1.2 misho 729: {
1.10 misho 730: rpc_srv_t *srv = TASK_ARG(task);
731: rpc_cli_t *c = NULL;
732: register int i;
1.14 misho 733: socklen_t salen = sizeof(sockaddr_t);
1.12 misho 734: #ifdef TCP_NOPUSH
735: int n = 1;
736: #endif
1.7 misho 737:
1.10 misho 738: /* check free slots for connect */
1.14 misho 739: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
740: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1.10 misho 741: if (c) /* no more free slots! */
742: goto end;
1.14 misho 743: c = e_malloc(sizeof(rpc_cli_t));
1.10 misho 744: if (!c) {
1.7 misho 745: LOGERR;
1.10 misho 746: srv->srv_kill = srv->srv_blob.kill = 1;
1.7 misho 747: return NULL;
748: } else {
1.10 misho 749: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 750: array_Set(srv->srv_blob.clients, i, c);
1.10 misho 751: c->cli_id = i;
752: c->cli_parent = srv;
1.7 misho 753: }
1.4 misho 754:
1.17.4.2 misho 755: /* init buffer(s) */
756: c->cli_buf = ait_allocVars(1);
757: if (!c->cli_buf) {
758: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
759: array_Del(srv->srv_blob.clients, i, 42);
760: goto end;
761: } else
762: AIT_SET_BUFSIZ(array(c->cli_buf, 0, ait_val_t*), 0, srv->srv_netbuf);
1.2 misho 763:
1.10 misho 764: /* accept client */
765: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
766: if (c->cli_sock == -1) {
767: LOGERR;
1.17.4.2 misho 768: ait_freeVars(&c->cli_buf);
1.14 misho 769: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 770: goto end;
1.12 misho 771: } else {
772: #ifdef TCP_NOPUSH
773: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
774: #endif
1.10 misho 775: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.12 misho 776: }
1.2 misho 777:
1.10 misho 778: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
779: end:
780: schedReadSelf(task);
1.7 misho 781: return NULL;
1.1 misho 782: }
783:
1.10 misho 784: /* ------------------------------------------------------ */
1.1 misho 785:
786: /*
1.7 misho 787: * rpc_srv_initBLOBServer() - Init & create BLOB Server
788: *
1.4 misho 789: * @srv = RPC server instance
1.2 misho 790: * @Port = Port for bind server, if Port == 0 default port is selected
791: * @diskDir = Disk place for BLOB file objects
792: * return: -1 == error or 0 bind and created BLOB server instance
793: */
794: int
795: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
796: {
797: int n = 1;
798:
1.10 misho 799: if (!srv || srv->srv_kill) {
1.7 misho 800: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 801: return -1;
802: }
803:
804: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
805: if (access(diskDir, R_OK | W_OK) == -1) {
806: LOGERR;
807: return -1;
808: } else
1.7 misho 809: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 810:
1.10 misho 811: /* init blob list */
812: TAILQ_INIT(&srv->srv_blob.blobs);
813:
1.2 misho 814: srv->srv_blob.server.cli_parent = srv;
1.4 misho 815:
1.14 misho 816: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
1.10 misho 817: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1.4 misho 818: case AF_INET:
1.10 misho 819: srv->srv_blob.server.cli_sa.sin.sin_port =
820: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1.4 misho 821: break;
822: case AF_INET6:
1.10 misho 823: srv->srv_blob.server.cli_sa.sin6.sin6_port =
824: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1.4 misho 825: break;
826: case AF_LOCAL:
1.10 misho 827: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
828: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1.4 misho 829: break;
830: default:
1.7 misho 831: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 832: return -1;
1.2 misho 833: }
834:
1.4 misho 835: /* create BLOB server socket */
1.6 misho 836: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 837: if (srv->srv_blob.server.cli_sock == -1) {
838: LOGERR;
1.7 misho 839: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 840: return -1;
841: }
842: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
843: LOGERR;
844: close(srv->srv_blob.server.cli_sock);
1.7 misho 845: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 846: return -1;
847: }
1.5 misho 848: n = srv->srv_netbuf;
849: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
850: LOGERR;
851: close(srv->srv_blob.server.cli_sock);
1.7 misho 852: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 853: return -1;
854: }
855: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
856: LOGERR;
857: close(srv->srv_blob.server.cli_sock);
1.7 misho 858: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 859: return -1;
860: }
1.6 misho 861: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
862: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 863: LOGERR;
864: close(srv->srv_blob.server.cli_sock);
1.7 misho 865: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 866: return -1;
1.13 misho 867: } else
868: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
869: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
870:
1.2 misho 871:
1.10 misho 872: /* allocate pool for concurent blob clients */
1.14 misho 873: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1.2 misho 874: if (!srv->srv_blob.clients) {
1.14 misho 875: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.2 misho 876: close(srv->srv_blob.server.cli_sock);
1.7 misho 877: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 878: return -1;
1.10 misho 879: }
1.2 misho 880:
1.10 misho 881: /* init blob scheduler */
882: srv->srv_blob.root = schedBegin();
883: if (!srv->srv_blob.root) {
884: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.14 misho 885: array_Destroy(&srv->srv_blob.clients);
1.10 misho 886: close(srv->srv_blob.server.cli_sock);
887: AIT_FREE_VAL(&srv->srv_blob.dir);
888: return -1;
889: }
1.2 misho 890:
891: return 0;
892: }
893:
894: /*
1.7 misho 895: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
896: *
1.2 misho 897: * @srv = RPC Server instance
898: * return: none
899: */
1.16 misho 900: void
1.2 misho 901: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
902: {
1.10 misho 903: if (!srv)
1.2 misho 904: return;
905:
1.10 misho 906: srv->srv_blob.kill = 1;
1.17 misho 907:
908: schedEnd(&srv->srv_blob.root);
1.2 misho 909: }
910:
911: /*
1.10 misho 912: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1.7 misho 913: *
1.2 misho 914: * @srv = RPC Server instance
915: * return: -1 error or 0 ok, infinite loop ...
916: */
917: int
1.10 misho 918: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1.2 misho 919: {
1.10 misho 920: rpc_cli_t *c;
1.2 misho 921: register int i;
1.10 misho 922: rpc_blob_t *b, *tmp;
923: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.2 misho 924:
1.10 misho 925: if (!srv || srv->srv_kill) {
1.7 misho 926: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 927: return -1;
928: }
929:
1.14 misho 930: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1.2 misho 931: LOGERR;
932: return -1;
1.10 misho 933: }
934:
1.17 misho 935: schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0);
1.10 misho 936: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
937: srv->srv_blob.server.cli_sock, NULL, 0)) {
938: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
939: return -1;
940: }
1.2 misho 941:
1.10 misho 942: schedPolling(srv->srv_blob.root, &ts, NULL);
943: /* main rpc loop */
944: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1.7 misho 945:
1.17 misho 946: /* detach blobs */
947: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
948: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
949:
950: rpc_srv_blobFree(srv, b);
951: e_free(b);
952: }
953:
1.10 misho 954: /* close all clients connections & server socket */
1.14 misho 955: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
956: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1.10 misho 957: if (c) {
958: shutdown(c->cli_sock, SHUT_RDWR);
959: close(c->cli_sock);
960:
961: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1.17.4.2 misho 962: ait_freeVars(&c->cli_buf);
1.2 misho 963: }
1.14 misho 964: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 965: }
1.14 misho 966: array_Destroy(&srv->srv_blob.clients);
1.2 misho 967:
1.10 misho 968: close(srv->srv_blob.server.cli_sock);
1.2 misho 969:
1.10 misho 970: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 971: return 0;
972: }
973:
974:
975: /*
1.7 misho 976: * rpc_srv_initServer() - Init & create RPC Server
977: *
1.15 misho 978: * @InstID = Instance for authentication & recognition
1.1 misho 979: * @concurentClients = Concurent clients at same time to this server
1.10 misho 980: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 981: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 982: * @Port = Port for bind server, if Port == 0 default port is selected
1.13 misho 983: * @proto = Protocol, if == 0 choose SOCK_STREAM
1.1 misho 984: * return: NULL == error or !=NULL bind and created RPC server instance
985: */
986: rpc_srv_t *
1.15 misho 987: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
988: const char *csHost, u_short Port, int proto)
1.1 misho 989: {
1.10 misho 990: int n = 1;
1.1 misho 991: rpc_srv_t *srv = NULL;
1.14 misho 992: sockaddr_t sa = E_SOCKADDR_INIT;
1.1 misho 993:
1.15 misho 994: if (!concurentClients || (proto < 0 || proto > SOCK_DGRAM)) {
1.10 misho 995: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1.1 misho 996: return NULL;
997: }
1.14 misho 998: if (!e_gethostbyname(csHost, Port, &sa))
1.10 misho 999: return NULL;
1.1 misho 1000: if (!Port)
1001: Port = RPC_DEFPORT;
1.13 misho 1002: if (!proto)
1003: proto = SOCK_STREAM;
1.10 misho 1004: if (netBuf < RPC_MIN_BUFSIZ)
1.5 misho 1005: netBuf = BUFSIZ;
1.7 misho 1006: else
1.14 misho 1007: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1.10 misho 1008:
1009: #ifdef HAVE_SRANDOMDEV
1010: srandomdev();
1011: #else
1012: time_t tim;
1013:
1014: srandom((time(&tim) ^ getpid()));
1015: #endif
1.1 misho 1016:
1.14 misho 1017: srv = e_malloc(sizeof(rpc_srv_t));
1.1 misho 1018: if (!srv) {
1019: LOGERR;
1020: return NULL;
1021: } else
1022: memset(srv, 0, sizeof(rpc_srv_t));
1023:
1.13 misho 1024: srv->srv_proto = proto;
1.5 misho 1025: srv->srv_netbuf = netBuf;
1.1 misho 1026: srv->srv_session.sess_version = RPC_VERSION;
1.15 misho 1027: srv->srv_session.sess_instance = InstID;
1.1 misho 1028:
1029: srv->srv_server.cli_parent = srv;
1.10 misho 1030: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1031:
1.12 misho 1032: /* init functions */
1033: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1034: SLIST_INIT(&srv->srv_funcs);
1035: AVL_INIT(&srv->srv_funcs);
1.10 misho 1036:
1037: /* init scheduler */
1038: srv->srv_root = schedBegin();
1039: if (!srv->srv_root) {
1040: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.12 misho 1041: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1042: e_free(srv);
1.10 misho 1043: return NULL;
1044: }
1045:
1046: /* init pool for clients */
1.14 misho 1047: srv->srv_clients = array_Init(concurentClients);
1.10 misho 1048: if (!srv->srv_clients) {
1.14 misho 1049: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.10 misho 1050: schedEnd(&srv->srv_root);
1.12 misho 1051: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1052: e_free(srv);
1.10 misho 1053: return NULL;
1054: }
1.4 misho 1055:
1056: /* create server socket */
1.13 misho 1057: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
1.1 misho 1058: if (srv->srv_server.cli_sock == -1) {
1059: LOGERR;
1.14 misho 1060: array_Destroy(&srv->srv_clients);
1.10 misho 1061: schedEnd(&srv->srv_root);
1.12 misho 1062: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1063: e_free(srv);
1.1 misho 1064: return NULL;
1065: }
1066: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1067: LOGERR;
1.10 misho 1068: goto err;
1.1 misho 1069: }
1.5 misho 1070: n = srv->srv_netbuf;
1071: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1072: LOGERR;
1.10 misho 1073: goto err;
1.5 misho 1074: }
1075: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1076: LOGERR;
1.10 misho 1077: goto err;
1.5 misho 1078: }
1.6 misho 1079: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
1080: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 1081: LOGERR;
1.10 misho 1082: goto err;
1.13 misho 1083: } else
1084: fcntl(srv->srv_server.cli_sock, F_SETFL,
1085: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 1086:
1.10 misho 1087: rpc_register_srvPing(srv);
1.8 misho 1088:
1.1 misho 1089: return srv;
1.10 misho 1090: err: /* error condition */
1091: close(srv->srv_server.cli_sock);
1.14 misho 1092: array_Destroy(&srv->srv_clients);
1.10 misho 1093: schedEnd(&srv->srv_root);
1.12 misho 1094: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1095: e_free(srv);
1.10 misho 1096: return NULL;
1.1 misho 1097: }
1098:
1099: /*
1.7 misho 1100: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1101: *
1.6 misho 1102: * @psrv = RPC Server instance
1.1 misho 1103: * return: none
1104: */
1.16 misho 1105: void
1.6 misho 1106: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 1107: {
1.10 misho 1108: if (!psrv || !*psrv)
1.1 misho 1109: return;
1110:
1.10 misho 1111: /* if send kill to blob server */
1.17 misho 1112: rpc_srv_endBLOBServer(*psrv);
1.1 misho 1113:
1.10 misho 1114: (*psrv)->srv_kill = 1;
1115: sleep(RPC_SCHED_POLLING);
1.2 misho 1116:
1.17 misho 1117: schedEnd(&(*psrv)->srv_root);
1118:
1.12 misho 1119: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1.14 misho 1120: e_free(*psrv);
1.6 misho 1121: *psrv = NULL;
1.1 misho 1122: }
1123:
1124: /*
1.7 misho 1125: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1126: *
1.1 misho 1127: * @srv = RPC Server instance
1128: * return: -1 error or 0 ok, infinite loop ...
1129: */
1130: int
1.5 misho 1131: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 1132: {
1.10 misho 1133: rpc_cli_t *c;
1.1 misho 1134: register int i;
1.12 misho 1135: rpc_func_t *f;
1.10 misho 1136: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.1 misho 1137:
1138: if (!srv) {
1.10 misho 1139: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1.1 misho 1140: return -1;
1141: }
1142:
1.13 misho 1143: if (srv->srv_proto == SOCK_STREAM)
1.14 misho 1144: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1.13 misho 1145: LOGERR;
1146: return -1;
1147: }
1.1 misho 1148:
1.13 misho 1149: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1150: srv->srv_server.cli_sock, NULL, 0)) {
1.10 misho 1151: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1152: return -1;
1153: }
1.7 misho 1154:
1.10 misho 1155: schedPolling(srv->srv_root, &ts, NULL);
1.7 misho 1156: /* main rpc loop */
1.10 misho 1157: schedRun(srv->srv_root, &srv->srv_kill);
1158:
1159: /* close all clients connections & server socket */
1.14 misho 1160: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1161: c = array(srv->srv_clients, i, rpc_cli_t*);
1.10 misho 1162: if (c) {
1163: shutdown(c->cli_sock, SHUT_RDWR);
1164: close(c->cli_sock);
1165:
1166: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.14 misho 1167: ait_freeVars(&RPC_RETVARS(c));
1.17.4.2 misho 1168: ait_freeVars(&c->cli_buf);
1.1 misho 1169: }
1.14 misho 1170: array_Del(srv->srv_clients, i, 42);
1.10 misho 1171: }
1.14 misho 1172: array_Destroy(&srv->srv_clients);
1.2 misho 1173:
1.10 misho 1174: close(srv->srv_server.cli_sock);
1.2 misho 1175:
1.10 misho 1176: /* detach exported calls */
1.12 misho 1177: RPC_FUNCS_LOCK(&srv->srv_funcs);
1178: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1179: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1.1 misho 1180:
1.10 misho 1181: AIT_FREE_VAL(&f->func_name);
1.14 misho 1182: e_free(f);
1.1 misho 1183: }
1.12 misho 1184: srv->srv_funcs.avlh_root = NULL;
1185: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1.1 misho 1186:
1187: return 0;
1188: }
1189:
1190:
1191: /*
1192: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 1193: *
1.10 misho 1194: * @cli = RPC client
1.1 misho 1195: * @rpc = IN RPC call structure
1.10 misho 1196: * @funcname = Execute RPC function
1.5 misho 1197: * @args = IN RPC calling arguments from RPC client
1.1 misho 1198: * return: -1 error, !=-1 ok
1199: */
1200: int
1.10 misho 1201: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1202: ait_val_t funcname, array_t * __restrict args)
1.1 misho 1203: {
1204: rpc_callback_t func;
1205:
1.10 misho 1206: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1.7 misho 1207: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 1208: return -1;
1209: }
1210:
1.10 misho 1211: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1212: return func(cli, rpc, args);
1.1 misho 1213: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>