Annotation of libaitrpc/src/srv.c, revision 1.30.2.10
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.30.2.10! misho 6: * $Id: srv.c,v 1.30.2.9 2024/02/26 18:07:32 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.30.2.4 misho 15: Copyright 2004 - 2024
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.13 misho 49: /* SOCK_STREAM */
50: static void *acceptClients(sched_task_t *);
51: static void *closeClient(sched_task_t *);
52: static void *rxPacket(sched_task_t *);
53: static void *txPacket(sched_task_t *);
54:
55: /* SOCK_DGRAM */
56: static void *freeClient(sched_task_t *);
57: static void *rxUDPPacket(sched_task_t *);
58: static void *txUDPPacket(sched_task_t *);
59:
60: /* SOCK_RAW */
1.26 misho 61: static void *rxRAWPacket(sched_task_t *);
62: static void *txRAWPacket(sched_task_t *);
1.13 misho 63:
1.24 misho 64: /* SOCK_BPF */
65: static void *rxBPFPacket(sched_task_t *);
66: static void *txBPFPacket(sched_task_t *);
67:
1.25 misho 68: /* SOCK_EXT */
69: static void *rxEXTPacket(sched_task_t *);
70: static void *txEXTPacket(sched_task_t *);
71:
72: static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
1.27 misho 73: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
74: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
75: { rxUDPPacket, freeClient, NULL /*rxUDPPacket*/, txUDPPacket }, /* SOCK_DGRAM */
76: { rxRAWPacket, freeClient, NULL /*rxRAWPacket*/, txRAWPacket }, /* SOCK_RAW */
77: { rxBPFPacket, freeClient, NULL /*rxBPFPacket*/, txBPFPacket }, /* SOCK_BPF */
78: { rxEXTPacket, freeClient, NULL /*rxEXTPacket*/, txEXTPacket } /* SOCK_EXT */
1.13 misho 79: };
80:
1.23 misho 81: /* Global Signal Argument when kqueue support disabled */
82:
83: static volatile uintptr_t _glSigArg = 0;
84:
1.16 misho 85: void
1.13 misho 86: rpc_freeCli(rpc_cli_t * __restrict c)
1.10 misho 87: {
88: rpc_srv_t *s = c->cli_parent;
89:
1.30.2.8 misho 90: if (s->srv_proto == SOCK_STREAM)
91: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 92:
93: /* free buffer */
94: AIT_FREE_VAL(&c->cli_buf);
95:
1.14 misho 96: array_Del(s->srv_clients, c->cli_id, 0);
1.10 misho 97: if (c)
1.14 misho 98: e_free(c);
1.13 misho 99: }
100:
101:
102: static inline int
1.14 misho 103: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 104: {
105: rpc_cli_t *c = NULL;
106: register int i;
107:
108: /* check free slots for connect */
1.14 misho 109: for (i = 0; i < array_Size(srv->srv_clients) &&
110: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
1.13 misho 111: /* check for duplicates */
1.14 misho 112: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
1.13 misho 113: break;
1.14 misho 114: if (i >= array_Size(srv->srv_clients))
1.13 misho 115: return -1; /* no more free slots! */
116:
117: return i;
118: }
119:
120: static rpc_cli_t *
1.14 misho 121: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 122: {
123: rpc_cli_t *c = NULL;
124: int n;
125:
1.25 misho 126: if (srv->srv_proto != SOCK_EXT)
127: n = _check4freeslot(srv, sa);
128: else
129: n = 0;
1.13 misho 130: if (n == -1)
131: return NULL;
132: else
1.14 misho 133: c = array(srv->srv_clients, n, rpc_cli_t*);
1.13 misho 134:
135: if (!c) {
1.14 misho 136: c = e_malloc(sizeof(rpc_cli_t));
1.13 misho 137: if (!c) {
138: LOGERR;
139: srv->srv_kill = 1;
140: return NULL;
141: } else {
142: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 143: array_Set(srv->srv_clients, n, c);
1.13 misho 144: c->cli_id = n;
145: c->cli_parent = srv;
146: }
147:
148: /* alloc empty buffer */
1.14 misho 149: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.13 misho 150: }
151:
152: return c;
153: }
154:
155:
156: static void *
157: freeClient(sched_task_t *task)
158: {
159: rpc_freeCli(TASK_ARG(task));
160:
1.27 misho 161: taskExit(task, NULL);
1.13 misho 162: }
163:
164: static void *
165: closeClient(sched_task_t *task)
166: {
167: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
168:
169: rpc_freeCli(TASK_ARG(task));
170:
171: /* close client socket */
172: shutdown(sock, SHUT_RDWR);
173: close(sock);
1.27 misho 174: taskExit(task, NULL);
1.10 misho 175: }
1.7 misho 176:
177: static void *
178: txPacket(sched_task_t *task)
179: {
180: rpc_cli_t *c = TASK_ARG(task);
181: rpc_srv_t *s = c->cli_parent;
182: rpc_func_t *f = NULL;
1.18 misho 183: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 184: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.27 misho 185: int ret, wlen = sizeof(struct tagRPCCall);
1.21 misho 186: #ifdef TCP_SESSION_TIMEOUT
187: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
188:
189: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
190: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
191: TASK_ARG(task), ts, TASK_ARG(task), 0);
192: #endif
1.7 misho 193:
194: if (rpc->call_argc) {
1.10 misho 195: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1.7 misho 196: if (!f) {
1.10 misho 197: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18 misho 198:
1.7 misho 199: rpc->call_argc ^= rpc->call_argc;
1.27 misho 200: RPC_SET_RETURN(rpc, -1);
201: RPC_SET_ERRNO(rpc, rpc_Errno);
202: } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) {
203: rpc_SetErr(EMSGSIZE, "Message too long");
204:
205: rpc->call_argc ^= rpc->call_argc;
206: RPC_SET_RETURN(rpc, -1);
207: RPC_SET_ERRNO(rpc, rpc_Errno);
1.7 misho 208: } else {
1.25 misho 209: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.7 misho 210: /* Go Encapsulate variables */
1.18 misho 211: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
212: RPC_RETVARS(c));
1.7 misho 213: if (ret == -1) {
214: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.19 misho 215:
1.7 misho 216: rpc->call_argc ^= rpc->call_argc;
1.27 misho 217: RPC_SET_RETURN(rpc, -1);
218: RPC_SET_ERRNO(rpc, rpc_Errno);
1.7 misho 219: } else
220: wlen += ret;
221: }
222: }
223:
1.27 misho 224: /* Free return values */
225: ait_freeVars(&c->cli_vars);
226:
1.18 misho 227: rpc->call_len = htonl(wlen);
1.25 misho 228: rpc->call_io = RPC_ACK;
1.8 misho 229:
1.15 misho 230: #if 0
1.7 misho 231: /* calculate CRC */
232: rpc->call_crc ^= rpc->call_crc;
1.8 misho 233: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.15 misho 234: #endif
1.7 misho 235:
236: /* send reply */
1.27 misho 237: ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
238: if (ret == -1) {
239: /* close connection */
240: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
241: TASK_ARG(task), 0, NULL, 0);
1.10 misho 242: }
1.7 misho 243:
1.27 misho 244: taskExit(task, NULL);
1.7 misho 245: }
246:
247: static void *
248: execCall(sched_task_t *task)
249: {
250: rpc_cli_t *c = TASK_ARG(task);
251: rpc_srv_t *s = c->cli_parent;
252: rpc_func_t *f = NULL;
253: array_t *arr = NULL;
1.18 misho 254: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 255: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.25 misho 256: int argc = rpc->call_argc;
1.7 misho 257:
258: /* Go decapsulate variables ... */
1.8 misho 259: if (argc) {
1.14 misho 260: arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall),
1.18 misho 261: AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
1.7 misho 262: if (!arr) {
1.14 misho 263: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
1.18 misho 264:
1.7 misho 265: rpc->call_argc ^= rpc->call_argc;
1.27 misho 266: RPC_SET_RETURN(rpc, -1);
267: RPC_SET_ERRNO(rpc, rpc_Errno);
268: taskExit(task, NULL);
1.7 misho 269: }
1.10 misho 270: } else
271: arr = NULL;
1.7 misho 272:
1.10 misho 273: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
1.7 misho 274: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18 misho 275:
1.7 misho 276: rpc->call_argc ^= rpc->call_argc;
1.27 misho 277: RPC_SET_RETURN(rpc, -1);
278: RPC_SET_ERRNO(rpc, rpc_Errno);
1.7 misho 279: } else {
1.8 misho 280: /* if client doesn't want reply */
1.27 misho 281: RPC_SET_RETURN(rpc, rpc_srv_execCall(c, rpc, f->func_name, arr));
1.7 misho 282: if (rpc->call_rep.ret == htonl(-1)) {
1.20 misho 283: if (!rpc->call_rep.eno) {
284: LOGERR;
1.27 misho 285: RPC_SET_ERRNO(rpc, rpc_Errno);
1.20 misho 286: }
1.7 misho 287: rpc->call_argc ^= rpc->call_argc;
1.27 misho 288: ait_freeVars(&c->cli_vars);
1.7 misho 289: } else {
290: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.27 misho 291: rpc->call_argc ^= rpc->call_argc;
292: if (TASK_VAL(task)) {
1.8 misho 293: /* without reply */
1.14 misho 294: ait_freeVars(&c->cli_vars);
1.30.2.6 misho 295: } else if (rpc->call_io & RPC_REQ) {
1.10 misho 296: /* reply */
1.25 misho 297: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.10 misho 298: }
1.7 misho 299: }
300: }
301:
1.14 misho 302: array_Destroy(&arr);
1.27 misho 303: taskExit(task, NULL);
1.7 misho 304: }
305:
306: static void *
307: rxPacket(sched_task_t *task)
308: {
309: rpc_cli_t *c = TASK_ARG(task);
310: rpc_srv_t *s = c->cli_parent;
1.27 misho 311: int len, noreply = 0, rlen = AIT_LEN(&c->cli_buf);
1.15 misho 312: #if 0
313: u_short crc;
314: #endif
1.10 misho 315: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.29 misho 316: struct tagRPCCall b, *rpc = (struct tagRPCCall*) buf;
1.21 misho 317: #ifdef TCP_SESSION_TIMEOUT
318: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
319:
320: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
321: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
322: TASK_ARG(task), ts, TASK_ARG(task), 0);
323: #endif
1.7 misho 324:
1.27 misho 325: /* prepare rx */
1.29 misho 326: len = recv(TASK_FD(task), &b, sizeof b, MSG_PEEK);
1.30.2.7 misho 327: if (len < 1) {
328: /* close connection */
329: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
330: TASK_ARG(task), 0, NULL, 0);
331: taskExit(task, NULL);
332: } else if (len == sizeof b)
1.29 misho 333: rlen = ntohl(b.call_len);
1.30.2.7 misho 334: else
335: goto end;
1.27 misho 336:
337: rlen = recv(TASK_FD(task), buf, rlen, 0);
338: if (rlen == -1) {
1.10 misho 339: /* close connection */
1.13 misho 340: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
341: TASK_ARG(task), 0, NULL, 0);
1.27 misho 342: taskExit(task, NULL);
1.18 misho 343: }
1.27 misho 344: if (rlen < sizeof(struct tagRPCCall)) {
345: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1.18 misho 346:
1.27 misho 347: rpc->call_argc ^= rpc->call_argc;
348: RPC_SET_RETURN(rpc, -1);
349: RPC_SET_ERRNO(rpc, rpc_Errno);
350: goto err;
351: } else
352: len = ntohl(rpc->call_len);
353: if (rlen < len || len > AIT_LEN(&c->cli_buf)) {
354: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
355:
356: rpc->call_argc ^= rpc->call_argc;
357: RPC_SET_RETURN(rpc, -1);
358: RPC_SET_ERRNO(rpc, rpc_Errno);
359: goto err;
1.18 misho 360: }
1.8 misho 361:
1.25 misho 362: /* skip loop packet */
363: if (rpc->call_io & RPC_ACK) {
364: schedReadSelf(task);
1.27 misho 365: taskExit(task, NULL);
1.25 misho 366: }
367:
1.15 misho 368: #if 0
1.18 misho 369: /* check integrity of packet */
370: crc = ntohs(rpc->call_crc);
371: rpc->call_crc ^= rpc->call_crc;
372: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
373: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
1.27 misho 374:
375: rpc->call_argc ^= rpc->call_argc;
376: RPC_SET_RETURN(rpc, -1);
377: RPC_SET_ERRNO(rpc, rpc_Errno);
378: goto err;
1.18 misho 379: }
1.15 misho 380: #endif
1.7 misho 381:
1.18 misho 382: /* check RPC packet session info */
383: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
384: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.7 misho 385:
1.18 misho 386: rpc->call_argc ^= rpc->call_argc;
1.27 misho 387: RPC_SET_RETURN(rpc, -1);
388: RPC_SET_ERRNO(rpc, rpc_Errno);
389: goto err;
1.18 misho 390: }
1.7 misho 391:
1.27 misho 392: noreply = RPC_CHK_NOREPLY(rpc);
393:
394: /* execute RPC call */
395: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
396: err:
1.18 misho 397: /* send RPC reply */
1.30.2.3 misho 398: if (!noreply && (rpc->call_io & RPC_REQ))
1.18 misho 399: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
400: TASK_ARG(task), TASK_FD(task), rpc, len);
1.30.2.7 misho 401: end:
1.7 misho 402: /* lets get next packet */
1.18 misho 403: schedReadSelf(task);
1.27 misho 404: taskExit(task, NULL);
1.7 misho 405: }
406:
1.1 misho 407: static void *
1.10 misho 408: acceptClients(sched_task_t *task)
1.1 misho 409: {
1.10 misho 410: rpc_srv_t *srv = TASK_ARG(task);
411: rpc_cli_t *c = NULL;
1.29 misho 412: socklen_t salen = E_SOCKADDR_MAX;
1.21 misho 413: int sock;
414: #ifdef TCP_SESSION_TIMEOUT
415: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
416: #endif
1.7 misho 417:
1.13 misho 418: c = _allocClient(srv, NULL);
1.21 misho 419: if (!c) {
420: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
421: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
422: shutdown(sock, SHUT_RDWR);
423: close(sock);
424: }
1.10 misho 425: goto end;
1.21 misho 426: }
1.10 misho 427:
428: /* accept client */
429: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
430: if (c->cli_sock == -1) {
431: LOGERR;
432: AIT_FREE_VAL(&c->cli_buf);
1.14 misho 433: array_Del(srv->srv_clients, c->cli_id, 42);
1.10 misho 434: goto end;
1.30 misho 435: } else {
1.10 misho 436: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.30 misho 437: fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC);
438: }
1.1 misho 439:
1.21 misho 440: #ifdef TCP_SESSION_TIMEOUT
441: /* armed timer for close stateless connection */
442: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
443: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c,
444: ts, c, 0);
445: #endif
1.13 misho 446: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
447: c->cli_sock, NULL, 0);
1.10 misho 448: end:
449: schedReadSelf(task);
1.27 misho 450: taskExit(task, NULL);
1.10 misho 451: }
1.5 misho 452:
1.7 misho 453:
1.10 misho 454: static void *
1.13 misho 455: txUDPPacket(sched_task_t *task)
1.10 misho 456: {
457: rpc_cli_t *c = TASK_ARG(task);
458: rpc_srv_t *s = c->cli_parent;
1.13 misho 459: rpc_func_t *f = NULL;
1.18 misho 460: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.13 misho 461: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.27 misho 462: int ret, wlen = sizeof(struct tagRPCCall);
1.13 misho 463: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
464:
465: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
466: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
467: TASK_ARG(task), ts, TASK_ARG(task), 0);
468:
469: if (rpc->call_argc) {
470: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
471: if (!f) {
472: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.27 misho 473:
1.13 misho 474: rpc->call_argc ^= rpc->call_argc;
1.27 misho 475: RPC_SET_RETURN(rpc, -1);
476: RPC_SET_ERRNO(rpc, rpc_Errno);
477: } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) {
478: rpc_SetErr(EMSGSIZE, "Message too long");
479:
480: rpc->call_argc ^= rpc->call_argc;
481: RPC_SET_RETURN(rpc, -1);
482: RPC_SET_ERRNO(rpc, rpc_Errno);
1.13 misho 483: } else {
1.25 misho 484: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.13 misho 485: /* Go Encapsulate variables */
1.18 misho 486: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
487: RPC_RETVARS(c));
1.13 misho 488: if (ret == -1) {
489: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.27 misho 490:
1.13 misho 491: rpc->call_argc ^= rpc->call_argc;
1.27 misho 492: RPC_SET_RETURN(rpc, -1);
493: RPC_SET_ERRNO(rpc, rpc_Errno);
1.13 misho 494: } else
495: wlen += ret;
496: }
497: }
1.7 misho 498:
1.27 misho 499: /* Free return values */
500: ait_freeVars(&c->cli_vars);
501:
1.18 misho 502: rpc->call_len = htonl(wlen);
1.25 misho 503: rpc->call_io = RPC_ACK;
1.7 misho 504:
1.13 misho 505: /* calculate CRC */
506: rpc->call_crc ^= rpc->call_crc;
507: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
508:
509: /* send reply */
1.27 misho 510: ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL,
1.29 misho 511: &c->cli_sa.sa, e_addrlen(&c->cli_sa));
1.27 misho 512: if (ret == -1) {
513: /* close connection */
514: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
515: TASK_ARG(task), 0, NULL, 0);
1.13 misho 516: }
517:
1.27 misho 518: taskExit(task, NULL);
1.13 misho 519: }
520:
521: static void *
522: rxUDPPacket(sched_task_t *task)
523: {
524: rpc_srv_t *srv = TASK_ARG(task);
525: rpc_cli_t *c = NULL;
1.27 misho 526: int len, noreply = 0, rlen;
1.22 misho 527: u_short crc;
1.27 misho 528: struct tagRPCCall *rpc;
1.14 misho 529: sockaddr_t sa;
1.29 misho 530: socklen_t salen = E_SOCKADDR_MAX;
1.13 misho 531: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.27 misho 532: ait_val_t b = AIT_VAL_INIT;
1.13 misho 533:
534: /* receive connect packet */
1.27 misho 535: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1.29 misho 536: #ifndef __linux__
537: sa.ss.ss_len = salen;
538: #endif
1.27 misho 539: rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
540: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
541: if (rlen < sizeof(struct tagRPCCall))
542: goto end;
543: else
544: len = ntohl(rpc->call_len);
545: if (rlen < len || len > srv->srv_netbuf)
546: goto end;
547:
548: /* skip loop packet */
549: if (rpc->call_io & RPC_ACK)
550: goto end;
551:
552: /* check integrity of packet */
553: crc = ntohs(rpc->call_crc);
554: rpc->call_crc ^= rpc->call_crc;
1.30.2.2 misho 555: if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2))
1.27 misho 556: goto end;
557:
558: /* check RPC packet session info */
559: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session))
1.13 misho 560: goto end;
561:
562: c = _allocClient(srv, &sa);
1.22 misho 563: if (!c) {
1.30.2.9 misho 564: EVERBOSE(1, "RPC client quota exceeded!");
1.22 misho 565: usleep(2000); /* blocked client delay */
1.13 misho 566: goto end;
1.22 misho 567: } else {
1.27 misho 568: memcpy(AIT_GET_BUF(&c->cli_buf), AIT_GET_BUF(&b), len);
1.22 misho 569: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
570:
1.13 misho 571: c->cli_sock = TASK_FD(task);
572: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
1.22 misho 573:
1.13 misho 574: /* armed timer for close stateless connection */
575: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
576: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
577: c, ts, c, 0);
578: }
579:
1.24 misho 580: noreply = RPC_CHK_NOREPLY(rpc);
581:
1.27 misho 582: /* execute RPC call */
583: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
1.24 misho 584:
585: /* send RPC reply */
1.30.2.3 misho 586: if (!noreply && (rpc->call_io & RPC_REQ))
1.24 misho 587: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
588: c, TASK_FD(task), rpc, len);
589: end:
1.27 misho 590: AIT_FREE_VAL(&b);
1.24 misho 591: schedReadSelf(task);
1.27 misho 592: taskExit(task, NULL);
1.24 misho 593: }
594:
595:
596: static void *
1.26 misho 597: txRAWPacket(sched_task_t *task)
598: {
599: rpc_cli_t *c = TASK_ARG(task);
600: rpc_srv_t *s = c->cli_parent;
601: rpc_func_t *f = NULL;
602: u_char *buf = AIT_GET_BUF(&c->cli_buf);
603: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.27 misho 604: int ret, wlen = sizeof(struct tagRPCCall);
1.26 misho 605: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
606:
607: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
608: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
609: TASK_ARG(task), ts, TASK_ARG(task), 0);
610:
611: if (rpc->call_argc) {
612: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
613: if (!f) {
614: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.27 misho 615:
1.26 misho 616: rpc->call_argc ^= rpc->call_argc;
1.27 misho 617: RPC_SET_RETURN(rpc, -1);
618: RPC_SET_ERRNO(rpc, rpc_Errno);
619: } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) {
620: rpc_SetErr(EMSGSIZE, "Message too long");
621:
622: rpc->call_argc ^= rpc->call_argc;
623: RPC_SET_RETURN(rpc, -1);
624: RPC_SET_ERRNO(rpc, rpc_Errno);
1.26 misho 625: } else {
626: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
627: /* Go Encapsulate variables */
628: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
629: RPC_RETVARS(c));
630: if (ret == -1) {
631: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.27 misho 632:
1.26 misho 633: rpc->call_argc ^= rpc->call_argc;
1.27 misho 634: RPC_SET_RETURN(rpc, -1);
635: RPC_SET_ERRNO(rpc, rpc_Errno);
1.26 misho 636: } else
637: wlen += ret;
638: }
639: }
640:
1.27 misho 641: /* Free return values */
642: ait_freeVars(&c->cli_vars);
643:
1.26 misho 644: rpc->call_len = htonl(wlen);
645: rpc->call_io = RPC_ACK;
646:
647: /* calculate CRC */
648: rpc->call_crc ^= rpc->call_crc;
649: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
650:
651: /* send reply */
1.27 misho 652: ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL,
1.29 misho 653: &c->cli_sa.sa, e_addrlen(&c->cli_sa));
1.27 misho 654: if (ret == -1) {
655: /* close connection */
656: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
657: TASK_ARG(task), 0, NULL, 0);
1.26 misho 658: }
659:
1.27 misho 660: taskExit(task, NULL);
1.26 misho 661: }
662:
663: static void *
664: rxRAWPacket(sched_task_t *task)
665: {
666: rpc_srv_t *srv = TASK_ARG(task);
667: rpc_cli_t *c = NULL;
1.27 misho 668: int len, noreply = 0, rlen;
1.26 misho 669: u_short crc;
670: struct tagRPCCall *rpc;
671: sockaddr_t sa;
1.29 misho 672: socklen_t salen = E_SOCKADDR_MAX;
1.26 misho 673: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.27 misho 674: ait_val_t b = AIT_VAL_INIT;
1.26 misho 675:
676: /* receive connect packet */
1.27 misho 677: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1.29 misho 678: #ifndef __linux__
679: sa.ss.ss_len = salen;
680: #endif
1.27 misho 681: rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
682: if (sa.sa.sa_family == AF_INET) {
683: struct ip *h;
684: h = (struct ip*) AIT_GET_BUF(&b);
685: if (rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC)
686: goto end;
687: else {
688: rlen -= sizeof(struct ip);
689: rpc = (struct tagRPCCall*) (h + 1);
690: }
691: } else {
1.29 misho 692: #ifdef IPV6_REMOVE_HEADER
1.27 misho 693: struct ip6_hdr *h;
694: h = (struct ip6_hdr*) AIT_GET_BUF(&b);
1.29 misho 695: if (rlen < ntohs(h->ip6_plen) || h->ip6_nxt != IPPROTO_ERPC)
1.27 misho 696: goto end;
697: else {
698: rlen -= sizeof(struct ip6_hdr);
699: rpc = (struct tagRPCCall*) (h + 1);
700: }
1.29 misho 701: #else
702: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
703: #endif
1.27 misho 704: }
705: if (rlen < sizeof(struct tagRPCCall))
706: goto end;
707: else
708: len = ntohl(rpc->call_len);
709: if (rlen < len || len > srv->srv_netbuf)
710: goto end;
711:
712: /* skip loop packet */
713: if (rpc->call_io & RPC_ACK)
714: goto end;
715:
716: /* check integrity of packet */
717: crc = ntohs(rpc->call_crc);
718: rpc->call_crc ^= rpc->call_crc;
1.30.2.2 misho 719: if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2))
1.27 misho 720: goto end;
721:
722: /* check RPC packet session info */
723: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session))
1.26 misho 724: goto end;
725:
726: c = _allocClient(srv, &sa);
727: if (!c) {
728: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
729: usleep(2000); /* blocked client delay */
730: goto end;
731: } else {
1.27 misho 732: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
733: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1.26 misho 734:
735: c->cli_sock = TASK_FD(task);
736: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
737:
738: /* armed timer for close stateless connection */
739: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
740: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
741: c, ts, c, 0);
742: }
743:
744: noreply = RPC_CHK_NOREPLY(rpc);
745:
1.27 misho 746: /* execute RPC call */
747: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
1.26 misho 748:
749: /* send RPC reply */
1.30.2.3 misho 750: if (!noreply && (rpc->call_io & RPC_REQ))
1.26 misho 751: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
752: c, TASK_FD(task), rpc, len);
753: end:
1.27 misho 754: AIT_FREE_VAL(&b);
1.26 misho 755: schedReadSelf(task);
1.27 misho 756: taskExit(task, NULL);
1.26 misho 757: }
758:
759:
760: static void *
1.24 misho 761: txBPFPacket(sched_task_t *task)
762: {
1.29 misho 763: #ifndef __linux__
1.24 misho 764: rpc_cli_t *c = TASK_ARG(task);
765: rpc_srv_t *s = c->cli_parent;
766: rpc_func_t *f = NULL;
767: u_char *buf = AIT_GET_BUF(&c->cli_buf);
768: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.27 misho 769: int ret, wlen = sizeof(struct tagRPCCall);
1.24 misho 770: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
771: struct ether_header *eh;
772: ait_val_t b = AIT_VAL_INIT;
773:
774: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
775: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
776: TASK_ARG(task), ts, TASK_ARG(task), 0);
777:
778: if (rpc->call_argc) {
779: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
780: if (!f) {
781: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.27 misho 782:
1.24 misho 783: rpc->call_argc ^= rpc->call_argc;
1.27 misho 784: RPC_SET_RETURN(rpc, -1);
785: RPC_SET_ERRNO(rpc, rpc_Errno);
786: } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) {
787: rpc_SetErr(EMSGSIZE, "Message too long");
788:
789: rpc->call_argc ^= rpc->call_argc;
790: RPC_SET_RETURN(rpc, -1);
791: RPC_SET_ERRNO(rpc, rpc_Errno);
1.24 misho 792: } else {
1.25 misho 793: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.24 misho 794: /* Go Encapsulate variables */
795: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
796: RPC_RETVARS(c));
797: if (ret == -1) {
798: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.27 misho 799:
1.24 misho 800: rpc->call_argc ^= rpc->call_argc;
1.27 misho 801: RPC_SET_RETURN(rpc, -1);
802: RPC_SET_ERRNO(rpc, rpc_Errno);
1.24 misho 803: } else
804: wlen += ret;
805: }
806: }
807:
1.27 misho 808: /* Free return values */
809: ait_freeVars(&RPC_RETVARS(c));
810:
1.24 misho 811: rpc->call_len = htonl(wlen);
1.25 misho 812: rpc->call_io = RPC_ACK;
1.24 misho 813:
814: /* calculate CRC */
815: rpc->call_crc ^= rpc->call_crc;
816: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
817:
818: /* send reply */
1.27 misho 819: AIT_SET_BUF(&b, NULL, wlen + ETHER_HDR_LEN);
1.24 misho 820: eh = (struct ether_header*) AIT_GET_BUF(&b);
821: memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
822: eh->ether_type = htons(RPC_DEFPORT);
1.27 misho 823: memcpy(eh + 1, buf, wlen);
1.24 misho 824:
825: ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
826: AIT_FREE_VAL(&b);
827: if (ret == -1) {
828: /* close connection */
829: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
830: TASK_ARG(task), 0, NULL, 0);
1.22 misho 831: }
1.29 misho 832: #else
833: rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!");
834: #endif
1.13 misho 835:
1.27 misho 836: taskExit(task, NULL);
1.24 misho 837: }
838:
839: static void *
840: rxBPFPacket(sched_task_t *task)
841: {
1.29 misho 842: #ifndef __linux__
1.24 misho 843: rpc_srv_t *srv = TASK_ARG(task);
844: rpc_cli_t *c = NULL;
845: int len, rlen, noreply;
846: u_short crc;
847: struct tagRPCCall *rpc;
848: sockaddr_t sa;
849: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
850: struct bpf_hdr *h;
851: struct ether_header *eh;
852: ait_val_t b = AIT_VAL_INIT;
853:
854: /* receive connect packet */
855: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
856: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
857: h = (struct bpf_hdr*) AIT_GET_BUF(&b);
858: rlen -= h->bh_hdrlen;
859: if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen ||
1.27 misho 860: rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall))
1.24 misho 861: goto end;
1.27 misho 862: else {
1.24 misho 863: rlen = h->bh_caplen;
864: eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
865: rlen -= ETHER_HDR_LEN;
866: rpc = (struct tagRPCCall*) (eh + 1);
1.25 misho 867:
1.24 misho 868: if (eh->ether_type != ntohs(RPC_DEFPORT))
869: goto end;
870: else
871: e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
872: }
1.27 misho 873: if (rlen < sizeof(struct tagRPCCall))
874: goto end;
875: else
876: len = ntohl(rpc->call_len);
877: if (rlen < len || len > srv->srv_netbuf)
878: goto end;
879:
880: #ifdef CHECK_ETHACK
881: /* skip loop packet */
882: if (rpc->call_io & RPC_ACK)
883: goto end;
884: #endif
885:
886: /* check integrity of packet */
887: crc = ntohs(rpc->call_crc);
888: rpc->call_crc ^= rpc->call_crc;
889: if (crc != crcFletcher16((u_short*) rpc, len / 2))
890: goto end;
891:
892: /* check RPC packet session info */
893: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session))
894: goto end;
1.24 misho 895:
896: c = _allocClient(srv, &sa);
897: if (!c) {
898: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
899: usleep(2000); /* blocked client delay */
900: goto end;
901: } else {
1.27 misho 902: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
1.24 misho 903: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
904:
905: c->cli_sock = TASK_FD(task);
906: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
907:
908: /* armed timer for close stateless connection */
909: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
910: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
911: c, ts, c, 0);
912: }
913:
1.22 misho 914: noreply = RPC_CHK_NOREPLY(rpc);
1.18 misho 915:
1.27 misho 916: /* execute RPC call */
917: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
1.13 misho 918:
1.22 misho 919: /* send RPC reply */
1.30.2.3 misho 920: if (!noreply && (rpc->call_io & RPC_REQ))
1.24 misho 921: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1.22 misho 922: c, TASK_FD(task), rpc, len);
1.13 misho 923: end:
1.24 misho 924: AIT_FREE_VAL(&b);
1.13 misho 925: schedReadSelf(task);
1.29 misho 926: #else
927: rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!");
928: #endif
929:
1.27 misho 930: taskExit(task, NULL);
1.13 misho 931: }
932:
1.25 misho 933:
934: static void *
935: txEXTPacket(sched_task_t *task)
936: {
937: rpc_cli_t *c = TASK_ARG(task);
938: rpc_srv_t *s = c->cli_parent;
939: rpc_func_t *f = NULL;
940: u_char *buf = AIT_GET_BUF(&c->cli_buf);
941: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.27 misho 942: int ret, wlen = sizeof(struct tagRPCCall);
1.25 misho 943: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
944:
945: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
946: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
947: TASK_ARG(task), ts, TASK_ARG(task), 0);
948:
949: if (rpc->call_argc) {
950: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
951: if (!f) {
952: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.27 misho 953:
954: rpc->call_argc ^= rpc->call_argc;
955: RPC_SET_RETURN(rpc, -1);
956: RPC_SET_ERRNO(rpc, rpc_Errno);
957: } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) {
958: rpc_SetErr(EMSGSIZE, "Message too long");
959:
1.25 misho 960: rpc->call_argc ^= rpc->call_argc;
1.27 misho 961: RPC_SET_RETURN(rpc, -1);
962: RPC_SET_ERRNO(rpc, rpc_Errno);
1.25 misho 963: } else {
964: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
965: /* Go Encapsulate variables */
966: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
967: RPC_RETVARS(c));
968: if (ret == -1) {
969: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.27 misho 970:
1.25 misho 971: rpc->call_argc ^= rpc->call_argc;
1.27 misho 972: RPC_SET_RETURN(rpc, -1);
973: RPC_SET_ERRNO(rpc, rpc_Errno);
1.25 misho 974: } else
975: wlen += ret;
976: }
977: }
978:
1.27 misho 979: /* Free return values */
980: ait_freeVars(&RPC_RETVARS(c));
981:
1.25 misho 982: rpc->call_len = htonl(wlen);
983: rpc->call_io = RPC_ACK;
984:
985: /* send reply */
1.27 misho 986: ret = write(TASK_FD(task), buf, wlen);
1.25 misho 987: if (ret == -1) {
988: /* close connection */
989: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
990: TASK_ARG(task), 0, NULL, 0);
991: }
992:
1.27 misho 993: taskExit(task, NULL);
1.25 misho 994: }
995:
996: static void *
997: rxEXTPacket(sched_task_t *task)
998: {
999: rpc_srv_t *srv = TASK_ARG(task);
1000: rpc_cli_t *c = NULL;
1.27 misho 1001: int len, noreply = 0, rlen = AIT_LEN(&c->cli_buf);
1.25 misho 1002: struct tagRPCCall *rpc;
1003: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.27 misho 1004: sockaddr_t sa;
1.25 misho 1005: ait_val_t b = AIT_VAL_INIT;
1006:
1007: memset(&sa, 0, sizeof sa);
1008: /* receive connect packet */
1009: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1010: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
1.27 misho 1011: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
1012: if (rlen < sizeof(struct tagRPCCall))
1013: goto end;
1014: else
1015: len = ntohl(rpc->call_len);
1016: if (rlen < len || len > srv->srv_netbuf)
1017: goto end;
1018:
1019: /* skip loop packet */
1020: if (rpc->call_io & RPC_ACK)
1.25 misho 1021: goto end;
1022:
1.27 misho 1023: /* check RPC packet session info */
1024: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session))
1025: goto end;
1.25 misho 1026:
1027: c = _allocClient(srv, &sa);
1028: if (!c) {
1029: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
1030: usleep(2000); /* blocked client delay */
1031: goto end;
1032: } else {
1.27 misho 1033: memcpy(AIT_GET_BUF(&c->cli_buf), AIT_GET_BUF(&b), len);
1.25 misho 1034: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1035:
1036: c->cli_sock = TASK_FD(task);
1037:
1038: /* armed timer for close stateless connection */
1039: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
1040: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
1041: c, ts, c, 0);
1042: }
1043:
1044: noreply = RPC_CHK_NOREPLY(rpc);
1045:
1.27 misho 1046: /* execute RPC call */
1047: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
1.25 misho 1048:
1049: /* send RPC reply */
1.30.2.3 misho 1050: if (!noreply && (rpc->call_io & RPC_REQ))
1.25 misho 1051: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1052: c, TASK_FD(task), rpc, len);
1053: end:
1054: AIT_FREE_VAL(&b);
1055: schedReadSelf(task);
1.27 misho 1056: taskExit(task, NULL);
1.25 misho 1057: }
1058:
1.13 misho 1059: /* ------------------------------------------------------ */
1060:
1.16 misho 1061: void
1.13 misho 1062: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
1063: {
1064: rpc_srv_t *s = c->cli_parent;
1065:
1066: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 1067:
1068: /* free buffer */
1069: AIT_FREE_VAL(&c->cli_buf);
1070:
1.14 misho 1071: array_Del(s->srv_blob.clients, c->cli_id, 0);
1.10 misho 1072: if (c)
1.14 misho 1073: e_free(c);
1.13 misho 1074: }
1075:
1076:
1077: static void *
1078: closeBLOBClient(sched_task_t *task)
1079: {
1080: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
1081:
1082: rpc_freeBLOBCli(TASK_ARG(task));
1083:
1084: /* close client socket */
1085: shutdown(sock, SHUT_RDWR);
1086: close(sock);
1.27 misho 1087: taskExit(task, NULL);
1.7 misho 1088: }
1089:
1090: static void *
1091: txBLOB(sched_task_t *task)
1092: {
1.10 misho 1093: rpc_cli_t *c = TASK_ARG(task);
1094: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 1095: int wlen = sizeof(struct tagBLOBHdr);
1096:
1097: /* send reply */
1.10 misho 1098: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
1099: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
1100: /* close blob connection */
1101: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1102: }
1.7 misho 1103:
1.27 misho 1104: taskExit(task, NULL);
1.7 misho 1105: }
1.4 misho 1106:
1.7 misho 1107: static void *
1108: rxBLOB(sched_task_t *task)
1109: {
1110: rpc_cli_t *c = TASK_ARG(task);
1111: rpc_srv_t *s = c->cli_parent;
1112: rpc_blob_t *b;
1.10 misho 1113: struct tagBLOBHdr blob;
1.7 misho 1114: int rlen;
1115:
1.10 misho 1116: memset(&blob, 0, sizeof blob);
1117: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
1118: if (rlen < 1) {
1119: /* close blob connection */
1120: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1.27 misho 1121: taskExit(task, NULL);
1.7 misho 1122: }
1.5 misho 1123:
1.10 misho 1124: /* check BLOB packet */
1125: if (rlen < sizeof(struct tagBLOBHdr)) {
1126: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1.6 misho 1127:
1.10 misho 1128: schedReadSelf(task);
1.27 misho 1129: taskExit(task, NULL);
1.7 misho 1130: }
1.1 misho 1131:
1.7 misho 1132: /* check RPC packet session info */
1.15 misho 1133: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1.7 misho 1134: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.10 misho 1135: blob.hdr_cmd = error;
1.7 misho 1136: goto end;
1137: }
1138:
1139: /* Go to proceed packet ... */
1.10 misho 1140: switch (blob.hdr_cmd) {
1.7 misho 1141: case get:
1.10 misho 1142: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
1143: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
1144: blob.hdr_cmd = no;
1.27 misho 1145: RPC_SET_BLOB_RET(&blob, -1);
1.7 misho 1146: break;
1147: } else
1.10 misho 1148: blob.hdr_len = htonl(b->blob_len);
1.5 misho 1149:
1.7 misho 1150: if (rpc_srv_blobMap(s, b) != -1) {
1151: /* deliver BLOB variable to client */
1.10 misho 1152: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1.7 misho 1153: rpc_srv_blobUnmap(b);
1154: } else {
1.10 misho 1155: blob.hdr_cmd = error;
1.27 misho 1156: RPC_SET_BLOB_RET(&blob, -1);
1.7 misho 1157: }
1158: break;
1159: case set:
1.17 misho 1160: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
1161: ntohl(blob.hdr_ret)))) {
1.7 misho 1162: /* set new BLOB variable for reply :) */
1.10 misho 1163: blob.hdr_var = htonl(b->blob_var);
1.7 misho 1164:
1165: /* receive BLOB from client */
1.10 misho 1166: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1.7 misho 1167: rpc_srv_blobUnmap(b);
1.5 misho 1168: } else {
1.10 misho 1169: blob.hdr_cmd = error;
1.27 misho 1170: RPC_SET_BLOB_RET(&blob, -1);
1.7 misho 1171: }
1172: break;
1173: case unset:
1.11 misho 1174: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1.10 misho 1175: blob.hdr_cmd = error;
1.27 misho 1176: RPC_SET_BLOB_RET(&blob, -1);
1.1 misho 1177: }
1178: break;
1.7 misho 1179: default:
1.10 misho 1180: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
1181: blob.hdr_cmd = error;
1.27 misho 1182: RPC_SET_BLOB_RET(&blob, -1);
1.7 misho 1183: }
1.1 misho 1184:
1.7 misho 1185: end:
1.10 misho 1186: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
1187: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
1188: schedReadSelf(task);
1.27 misho 1189: taskExit(task, NULL);
1.2 misho 1190: }
1191:
1192: static void *
1.17 misho 1193: flushBLOB(sched_task_t *task)
1194: {
1.28 misho 1195: #ifdef atomic_load_acq_ptr
1.23 misho 1196: uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
1.28 misho 1197: #else
1198: uintptr_t sigArg = *((volatile uintptr_t*) &_glSigArg);
1199: #endif
1.23 misho 1200: rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
1.17 misho 1201: rpc_blob_t *b, *tmp;
1202:
1203: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1204: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1205:
1206: rpc_srv_blobFree(srv, b);
1207: e_free(b);
1208: }
1209:
1.29 misho 1210: if (sigArg) {
1.23 misho 1211: /* disabled kqueue support in libaitsched */
1212: struct sigaction sa;
1213:
1214: memset(&sa, 0, sizeof sa);
1215: sigemptyset(&sa.sa_mask);
1216: sa.sa_handler = (void (*)(int)) flushBLOB;
1217: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1218: sigaction(SIGFBLOB, &sa, NULL);
1.29 misho 1219: return NULL;
1220: } else {
1221: schedSignalSelf(task);
1222: taskExit(task, NULL);
1.23 misho 1223: }
1.17 misho 1224: }
1225:
1226: static void *
1.10 misho 1227: acceptBLOBClients(sched_task_t *task)
1.2 misho 1228: {
1.10 misho 1229: rpc_srv_t *srv = TASK_ARG(task);
1230: rpc_cli_t *c = NULL;
1231: register int i;
1.29 misho 1232: socklen_t salen = E_SOCKADDR_MAX;
1.21 misho 1233: int sock;
1.12 misho 1234: #ifdef TCP_NOPUSH
1235: int n = 1;
1236: #endif
1.7 misho 1237:
1.10 misho 1238: /* check free slots for connect */
1.14 misho 1239: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
1240: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1.21 misho 1241: if (c) { /* no more free slots! */
1242: EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
1243: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
1244: shutdown(sock, SHUT_RDWR);
1245: close(sock);
1246: }
1.10 misho 1247: goto end;
1.21 misho 1248: }
1249:
1.14 misho 1250: c = e_malloc(sizeof(rpc_cli_t));
1.10 misho 1251: if (!c) {
1.7 misho 1252: LOGERR;
1.10 misho 1253: srv->srv_kill = srv->srv_blob.kill = 1;
1.27 misho 1254: taskExit(task, NULL);
1.7 misho 1255: } else {
1.10 misho 1256: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 1257: array_Set(srv->srv_blob.clients, i, c);
1.10 misho 1258: c->cli_id = i;
1259: c->cli_parent = srv;
1.7 misho 1260: }
1.4 misho 1261:
1.10 misho 1262: /* alloc empty buffer */
1.14 misho 1263: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.2 misho 1264:
1.10 misho 1265: /* accept client */
1266: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
1267: if (c->cli_sock == -1) {
1268: LOGERR;
1269: AIT_FREE_VAL(&c->cli_buf);
1.14 misho 1270: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 1271: goto end;
1.12 misho 1272: } else {
1273: #ifdef TCP_NOPUSH
1274: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
1275: #endif
1.10 misho 1276: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.30 misho 1277: fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC);
1.12 misho 1278: }
1.2 misho 1279:
1.10 misho 1280: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
1281: end:
1282: schedReadSelf(task);
1.27 misho 1283: taskExit(task, NULL);
1.1 misho 1284: }
1285:
1.10 misho 1286: /* ------------------------------------------------------ */
1.1 misho 1287:
1288: /*
1.7 misho 1289: * rpc_srv_initBLOBServer() - Init & create BLOB Server
1290: *
1.4 misho 1291: * @srv = RPC server instance
1.2 misho 1292: * @Port = Port for bind server, if Port == 0 default port is selected
1293: * @diskDir = Disk place for BLOB file objects
1294: * return: -1 == error or 0 bind and created BLOB server instance
1295: */
1296: int
1297: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
1298: {
1299: int n = 1;
1.29 misho 1300: socklen_t salen;
1.2 misho 1301:
1.10 misho 1302: if (!srv || srv->srv_kill) {
1.7 misho 1303: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 1304: return -1;
1305: }
1306:
1307: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
1308: if (access(diskDir, R_OK | W_OK) == -1) {
1309: LOGERR;
1310: return -1;
1311: } else
1.7 misho 1312: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 1313:
1.10 misho 1314: /* init blob list */
1315: TAILQ_INIT(&srv->srv_blob.blobs);
1316:
1.2 misho 1317: srv->srv_blob.server.cli_parent = srv;
1.4 misho 1318:
1.29 misho 1319: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof srv->srv_blob.server.cli_sa);
1.10 misho 1320: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1.4 misho 1321: case AF_INET:
1.10 misho 1322: srv->srv_blob.server.cli_sa.sin.sin_port =
1323: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1.29 misho 1324: salen = sizeof srv->srv_blob.server.cli_sa.sin;
1.4 misho 1325: break;
1326: case AF_INET6:
1.10 misho 1327: srv->srv_blob.server.cli_sa.sin6.sin6_port =
1328: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1.29 misho 1329: salen = sizeof srv->srv_blob.server.cli_sa.sin6;
1.4 misho 1330: break;
1331: case AF_LOCAL:
1.10 misho 1332: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
1333: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1.29 misho 1334: salen = sizeof srv->srv_blob.server.cli_sa.sun;
1.4 misho 1335: break;
1336: default:
1.7 misho 1337: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 1338: return -1;
1.2 misho 1339: }
1340:
1.4 misho 1341: /* create BLOB server socket */
1.6 misho 1342: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 1343: if (srv->srv_blob.server.cli_sock == -1) {
1344: LOGERR;
1.7 misho 1345: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1346: return -1;
1347: }
1348: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1349: LOGERR;
1350: close(srv->srv_blob.server.cli_sock);
1.7 misho 1351: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1352: return -1;
1353: }
1.5 misho 1354: n = srv->srv_netbuf;
1355: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1356: LOGERR;
1357: close(srv->srv_blob.server.cli_sock);
1.7 misho 1358: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 1359: return -1;
1360: }
1361: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1362: LOGERR;
1363: close(srv->srv_blob.server.cli_sock);
1.7 misho 1364: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 1365: return -1;
1366: }
1.29 misho 1367: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, salen) == -1) {
1.2 misho 1368: LOGERR;
1369: close(srv->srv_blob.server.cli_sock);
1.7 misho 1370: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1371: return -1;
1.13 misho 1372: } else
1373: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
1374: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
1375:
1.2 misho 1376:
1.10 misho 1377: /* allocate pool for concurent blob clients */
1.14 misho 1378: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1.2 misho 1379: if (!srv->srv_blob.clients) {
1.14 misho 1380: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.2 misho 1381: close(srv->srv_blob.server.cli_sock);
1.7 misho 1382: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1383: return -1;
1.10 misho 1384: }
1.2 misho 1385:
1.10 misho 1386: /* init blob scheduler */
1387: srv->srv_blob.root = schedBegin();
1388: if (!srv->srv_blob.root) {
1389: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.14 misho 1390: array_Destroy(&srv->srv_blob.clients);
1.10 misho 1391: close(srv->srv_blob.server.cli_sock);
1392: AIT_FREE_VAL(&srv->srv_blob.dir);
1393: return -1;
1.30.2.5 misho 1394: }
1.2 misho 1395:
1396: return 0;
1397: }
1398:
1399: /*
1.7 misho 1400: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
1401: *
1.2 misho 1402: * @srv = RPC Server instance
1403: * return: none
1404: */
1.16 misho 1405: void
1.2 misho 1406: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
1407: {
1.10 misho 1408: if (!srv)
1.2 misho 1409: return;
1410:
1.10 misho 1411: srv->srv_blob.kill = 1;
1.17 misho 1412:
1.27 misho 1413: if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL)
1414: unlink(srv->srv_blob.server.cli_sa.sun.sun_path);
1.28 misho 1415:
1416: schedEnd(&srv->srv_blob.root);
1.2 misho 1417: }
1418:
1419: /*
1.10 misho 1420: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1.7 misho 1421: *
1.2 misho 1422: * @srv = RPC Server instance
1423: * return: -1 error or 0 ok, infinite loop ...
1424: */
1425: int
1.10 misho 1426: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1.2 misho 1427: {
1.10 misho 1428: rpc_cli_t *c;
1.2 misho 1429: register int i;
1.10 misho 1430: rpc_blob_t *b, *tmp;
1431: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.2 misho 1432:
1.10 misho 1433: if (!srv || srv->srv_kill) {
1.7 misho 1434: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 1435: return -1;
1436: }
1437:
1.14 misho 1438: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1.2 misho 1439: LOGERR;
1440: return -1;
1.10 misho 1441: }
1442:
1.23 misho 1443: if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
1444: /* disabled kqueue support in libaitsched */
1445: struct sigaction sa;
1446:
1.28 misho 1447: #ifdef atomic_store_rel_ptr
1.23 misho 1448: atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
1.28 misho 1449: #else
1450: *((volatile uintptr_t*) &_glSigArg) = (uintptr_t) srv;
1451: #endif
1.23 misho 1452:
1453: memset(&sa, 0, sizeof sa);
1454: sigemptyset(&sa.sa_mask);
1455: sa.sa_handler = (void (*)(int)) flushBLOB;
1456: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1457: sigaction(SIGFBLOB, &sa, NULL);
1458: }
1459:
1.10 misho 1460: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
1461: srv->srv_blob.server.cli_sock, NULL, 0)) {
1462: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1463: return -1;
1464: }
1.2 misho 1465:
1.10 misho 1466: schedPolling(srv->srv_blob.root, &ts, NULL);
1467: /* main rpc loop */
1468: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1.7 misho 1469:
1.17 misho 1470: /* detach blobs */
1471: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1472: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1473:
1474: rpc_srv_blobFree(srv, b);
1475: e_free(b);
1476: }
1477:
1.10 misho 1478: /* close all clients connections & server socket */
1.14 misho 1479: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
1480: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1.10 misho 1481: if (c) {
1482: shutdown(c->cli_sock, SHUT_RDWR);
1483: close(c->cli_sock);
1484:
1485: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1486: AIT_FREE_VAL(&c->cli_buf);
1.2 misho 1487: }
1.14 misho 1488: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 1489: }
1.14 misho 1490: array_Destroy(&srv->srv_blob.clients);
1.2 misho 1491:
1.10 misho 1492: close(srv->srv_blob.server.cli_sock);
1.2 misho 1493:
1.10 misho 1494: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1495: return 0;
1496: }
1497:
1498:
1499: /*
1.7 misho 1500: * rpc_srv_initServer() - Init & create RPC Server
1501: *
1.15 misho 1502: * @InstID = Instance for authentication & recognition
1.1 misho 1503: * @concurentClients = Concurent clients at same time to this server
1.10 misho 1504: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 1505: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 1506: * @Port = Port for bind server, if Port == 0 default port is selected
1.13 misho 1507: * @proto = Protocol, if == 0 choose SOCK_STREAM
1.1 misho 1508: * return: NULL == error or !=NULL bind and created RPC server instance
1509: */
1510: rpc_srv_t *
1.15 misho 1511: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
1512: const char *csHost, u_short Port, int proto)
1.1 misho 1513: {
1.10 misho 1514: int n = 1;
1.1 misho 1515: rpc_srv_t *srv = NULL;
1.14 misho 1516: sockaddr_t sa = E_SOCKADDR_INIT;
1.29 misho 1517: socklen_t salen;
1.1 misho 1518:
1.25 misho 1519: if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
1.10 misho 1520: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1.1 misho 1521: return NULL;
1522: }
1.27 misho 1523: if (!Port && proto < SOCK_RAW)
1524: Port = RPC_DEFPORT;
1.29 misho 1525: if (!(salen = e_gethostbyname(csHost, Port, &sa)))
1.10 misho 1526: return NULL;
1.13 misho 1527: if (!proto)
1528: proto = SOCK_STREAM;
1.10 misho 1529: if (netBuf < RPC_MIN_BUFSIZ)
1.5 misho 1530: netBuf = BUFSIZ;
1.7 misho 1531: else
1.14 misho 1532: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1.10 misho 1533:
1534: #ifdef HAVE_SRANDOMDEV
1535: srandomdev();
1536: #else
1537: time_t tim;
1538:
1539: srandom((time(&tim) ^ getpid()));
1540: #endif
1.1 misho 1541:
1.14 misho 1542: srv = e_malloc(sizeof(rpc_srv_t));
1.1 misho 1543: if (!srv) {
1544: LOGERR;
1545: return NULL;
1546: } else
1547: memset(srv, 0, sizeof(rpc_srv_t));
1548:
1.13 misho 1549: srv->srv_proto = proto;
1.5 misho 1550: srv->srv_netbuf = netBuf;
1.1 misho 1551: srv->srv_session.sess_version = RPC_VERSION;
1.15 misho 1552: srv->srv_session.sess_instance = InstID;
1.1 misho 1553:
1554: srv->srv_server.cli_parent = srv;
1.10 misho 1555: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1556:
1.12 misho 1557: /* init functions */
1558: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1559: SLIST_INIT(&srv->srv_funcs);
1560: AVL_INIT(&srv->srv_funcs);
1.10 misho 1561:
1562: /* init scheduler */
1563: srv->srv_root = schedBegin();
1564: if (!srv->srv_root) {
1565: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.12 misho 1566: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1567: e_free(srv);
1.10 misho 1568: return NULL;
1.30.2.3 misho 1569: } else
1570: schedSignalDispatch(srv->srv_root, 42);
1.10 misho 1571:
1572: /* init pool for clients */
1.14 misho 1573: srv->srv_clients = array_Init(concurentClients);
1.10 misho 1574: if (!srv->srv_clients) {
1.14 misho 1575: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.10 misho 1576: schedEnd(&srv->srv_root);
1.12 misho 1577: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1578: e_free(srv);
1.10 misho 1579: return NULL;
1580: }
1.4 misho 1581:
1582: /* create server socket */
1.26 misho 1583: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family,
1584: srv->srv_proto, srv->srv_proto == SOCK_RAW ? IPPROTO_ERPC : 0);
1.1 misho 1585: if (srv->srv_server.cli_sock == -1) {
1586: LOGERR;
1.14 misho 1587: array_Destroy(&srv->srv_clients);
1.10 misho 1588: schedEnd(&srv->srv_root);
1.12 misho 1589: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1590: e_free(srv);
1.1 misho 1591: return NULL;
1592: }
1593: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1594: LOGERR;
1.10 misho 1595: goto err;
1.1 misho 1596: }
1.27 misho 1597: if (srv->srv_proto == SOCK_STREAM)
1598: setsockopt(srv->srv_server.cli_sock, IPPROTO_TCP, TCP_NODELAY, &n, sizeof n);
1.5 misho 1599: n = srv->srv_netbuf;
1600: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1601: LOGERR;
1.10 misho 1602: goto err;
1.5 misho 1603: }
1604: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1605: LOGERR;
1.10 misho 1606: goto err;
1.5 misho 1607: }
1.29 misho 1608: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, salen) == -1) {
1.1 misho 1609: LOGERR;
1.10 misho 1610: goto err;
1.13 misho 1611: } else
1612: fcntl(srv->srv_server.cli_sock, F_SETFL,
1613: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 1614:
1.10 misho 1615: rpc_register_srvPing(srv);
1.8 misho 1616:
1.1 misho 1617: return srv;
1.10 misho 1618: err: /* error condition */
1619: close(srv->srv_server.cli_sock);
1.14 misho 1620: array_Destroy(&srv->srv_clients);
1.10 misho 1621: schedEnd(&srv->srv_root);
1.12 misho 1622: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1623: e_free(srv);
1.10 misho 1624: return NULL;
1.1 misho 1625: }
1626:
1627: /*
1.7 misho 1628: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1629: *
1.6 misho 1630: * @psrv = RPC Server instance
1.1 misho 1631: * return: none
1632: */
1.16 misho 1633: void
1.6 misho 1634: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 1635: {
1.10 misho 1636: if (!psrv || !*psrv)
1.1 misho 1637: return;
1638:
1.10 misho 1639: /* if send kill to blob server */
1.17 misho 1640: rpc_srv_endBLOBServer(*psrv);
1.28 misho 1641: /* wait for BLOB server done */
1642: while (*(&(*psrv)->srv_blob.root))
1643: usleep(1000);
1.1 misho 1644:
1.10 misho 1645: (*psrv)->srv_kill = 1;
1646: sleep(RPC_SCHED_POLLING);
1.2 misho 1647:
1.27 misho 1648: if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL)
1649: unlink((*psrv)->srv_server.cli_sa.sun.sun_path);
1650:
1.28 misho 1651: schedEnd(&(*psrv)->srv_root);
1652:
1.12 misho 1653: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1.14 misho 1654: e_free(*psrv);
1.6 misho 1655: *psrv = NULL;
1.1 misho 1656: }
1657:
1658: /*
1.7 misho 1659: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1660: *
1.1 misho 1661: * @srv = RPC Server instance
1662: * return: -1 error or 0 ok, infinite loop ...
1663: */
1664: int
1.5 misho 1665: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 1666: {
1.10 misho 1667: rpc_cli_t *c;
1.1 misho 1668: register int i;
1.12 misho 1669: rpc_func_t *f;
1.10 misho 1670: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.1 misho 1671:
1672: if (!srv) {
1.10 misho 1673: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1.1 misho 1674: return -1;
1675: }
1676:
1.13 misho 1677: if (srv->srv_proto == SOCK_STREAM)
1.14 misho 1678: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1.13 misho 1679: LOGERR;
1680: return -1;
1681: }
1.1 misho 1682:
1.13 misho 1683: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1684: srv->srv_server.cli_sock, NULL, 0)) {
1.10 misho 1685: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1686: return -1;
1687: }
1.7 misho 1688:
1.10 misho 1689: schedPolling(srv->srv_root, &ts, NULL);
1.7 misho 1690: /* main rpc loop */
1.10 misho 1691: schedRun(srv->srv_root, &srv->srv_kill);
1.30.2.3 misho 1692: schedSignalDispatch(srv->srv_root, 0);
1.10 misho 1693:
1694: /* close all clients connections & server socket */
1.14 misho 1695: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1696: c = array(srv->srv_clients, i, rpc_cli_t*);
1.10 misho 1697: if (c) {
1.24 misho 1698: if (srv->srv_proto == SOCK_STREAM) {
1699: shutdown(c->cli_sock, SHUT_RDWR);
1700: close(c->cli_sock);
1701: }
1.10 misho 1702:
1703: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.14 misho 1704: ait_freeVars(&RPC_RETVARS(c));
1.10 misho 1705: AIT_FREE_VAL(&c->cli_buf);
1.1 misho 1706: }
1.14 misho 1707: array_Del(srv->srv_clients, i, 42);
1.10 misho 1708: }
1.14 misho 1709: array_Destroy(&srv->srv_clients);
1.2 misho 1710:
1.25 misho 1711: if (srv->srv_proto != SOCK_EXT)
1712: close(srv->srv_server.cli_sock);
1.2 misho 1713:
1.10 misho 1714: /* detach exported calls */
1.12 misho 1715: RPC_FUNCS_LOCK(&srv->srv_funcs);
1716: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1717: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1.1 misho 1718:
1.10 misho 1719: AIT_FREE_VAL(&f->func_name);
1.14 misho 1720: e_free(f);
1.1 misho 1721: }
1.12 misho 1722: srv->srv_funcs.avlh_root = NULL;
1723: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1.1 misho 1724:
1725: return 0;
1726: }
1727:
1728:
1729: /*
1730: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 1731: *
1.10 misho 1732: * @cli = RPC client
1.1 misho 1733: * @rpc = IN RPC call structure
1.10 misho 1734: * @funcname = Execute RPC function
1.5 misho 1735: * @args = IN RPC calling arguments from RPC client
1.1 misho 1736: * return: -1 error, !=-1 ok
1737: */
1738: int
1.10 misho 1739: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1740: ait_val_t funcname, array_t * __restrict args)
1.1 misho 1741: {
1742: rpc_callback_t func;
1743:
1.10 misho 1744: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1.7 misho 1745: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 1746: return -1;
1747: }
1748:
1.10 misho 1749: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1750: return func(cli, rpc, args);
1.1 misho 1751: }
1.24 misho 1752:
1753:
1754: /*
1755: * rpc_srv_initServer2() - Init & create layer2 RPC Server
1756: *
1757: * @InstID = Instance for authentication & recognition
1758: * @concurentClients = Concurent clients at same time to this server
1759: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1760: * @csIface = Interface name for bind server, if NULL first interface on host
1761: * return: NULL == error or !=NULL bind and created RPC server instance
1762: */
1763: rpc_srv_t *
1764: rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
1765: {
1.29 misho 1766: #ifndef __linux__
1.24 misho 1767: int n = 1;
1768: rpc_srv_t *srv = NULL;
1769: sockaddr_t sa = E_SOCKADDR_INIT;
1770: char szIface[64], szStr[STRSIZ];
1771: register int i;
1772: struct ifreq ifr;
1773: struct bpf_insn insns[] = {
1774: BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
1775: BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
1776: BPF_STMT(BPF_RET + BPF_K, -1),
1777: BPF_STMT(BPF_RET + BPF_K, 0),
1778: };
1779: struct bpf_program fcode = {
1780: .bf_len = sizeof(insns) / sizeof(struct bpf_insn),
1781: .bf_insns = insns
1782: };
1783:
1784: if (!concurentClients) {
1785: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1786: return NULL;
1787: }
1788: if (!csIface) {
1789: if (e_get1stiface(szIface, sizeof szIface))
1790: return NULL;
1791: } else
1792: strlcpy(szIface, csIface, sizeof szIface);
1793: if (!e_getifacebyname(szIface, &sa))
1794: return NULL;
1795:
1796: #ifdef HAVE_SRANDOMDEV
1797: srandomdev();
1798: #else
1799: time_t tim;
1800:
1801: srandom((time(&tim) ^ getpid()));
1802: #endif
1803:
1804: srv = e_malloc(sizeof(rpc_srv_t));
1805: if (!srv) {
1806: LOGERR;
1807: return NULL;
1808: } else
1809: memset(srv, 0, sizeof(rpc_srv_t));
1810:
1811: srv->srv_proto = SOCK_BPF;
1812: srv->srv_netbuf = netBuf;
1813: srv->srv_session.sess_version = RPC_VERSION;
1814: srv->srv_session.sess_instance = InstID;
1815:
1816: srv->srv_server.cli_parent = srv;
1817: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1818:
1819: /* init functions */
1820: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1821: SLIST_INIT(&srv->srv_funcs);
1822: AVL_INIT(&srv->srv_funcs);
1823:
1824: /* init scheduler */
1825: srv->srv_root = schedBegin();
1826: if (!srv->srv_root) {
1827: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1828: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1829: e_free(srv);
1830: return NULL;
1.30.2.3 misho 1831: } else
1832: schedSignalDispatch(srv->srv_root, 42);
1.24 misho 1833:
1834: /* init pool for clients */
1835: srv->srv_clients = array_Init(concurentClients);
1836: if (!srv->srv_clients) {
1837: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1838: schedEnd(&srv->srv_root);
1839: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1840: e_free(srv);
1841: return NULL;
1842: }
1843:
1844: /* create server handler */
1845: for (i = 0; i < 10; i++) {
1846: memset(szStr, 0, sizeof szStr);
1847: snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
1848: srv->srv_server.cli_sock = open(szStr, O_RDWR);
1849: if (srv->srv_server.cli_sock > STDERR_FILENO)
1850: break;
1851: }
1852: if (srv->srv_server.cli_sock < 3) {
1853: LOGERR;
1854: array_Destroy(&srv->srv_clients);
1855: schedEnd(&srv->srv_root);
1856: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1857: e_free(srv);
1858: return NULL;
1859: }
1860:
1861: if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
1862: LOGERR;
1863: goto err;
1864: }
1865: if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
1866: LOGERR;
1867: goto err;
1868: }
1869: n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
1870: if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
1871: LOGERR;
1872: goto err;
1873: } else
1874: srv->srv_netbuf = n;
1875:
1876: memset(&ifr, 0, sizeof ifr);
1877: strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
1878: if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
1879: LOGERR;
1880: goto err;
1881: } else
1882: fcntl(srv->srv_server.cli_sock, F_SETFL,
1883: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1884:
1885: rpc_register_srvPing(srv);
1886:
1887: return srv;
1888: err: /* error condition */
1889: close(srv->srv_server.cli_sock);
1890: array_Destroy(&srv->srv_clients);
1891: schedEnd(&srv->srv_root);
1892: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1893: e_free(srv);
1.29 misho 1894: #else
1895: rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!");
1896: #endif
1897:
1.24 misho 1898: return NULL;
1899: }
1.25 misho 1900:
1901: /*
1902: * rpc_srv_initServerExt() - Init & create pipe RPC Server
1903: *
1904: * @InstID = Instance for authentication & recognition
1905: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1906: * @fd = File descriptor
1907: * return: NULL == error or !=NULL bind and created RPC server instance
1908: */
1909: rpc_srv_t *
1910: rpc_srv_initServerExt(u_char InstID, int netBuf, int fd)
1911: {
1912: rpc_srv_t *srv = NULL;
1913:
1914: #ifdef HAVE_SRANDOMDEV
1915: srandomdev();
1916: #else
1917: time_t tim;
1918:
1919: srandom((time(&tim) ^ getpid()));
1920: #endif
1921:
1922: srv = e_malloc(sizeof(rpc_srv_t));
1923: if (!srv) {
1924: LOGERR;
1925: return NULL;
1926: } else
1927: memset(srv, 0, sizeof(rpc_srv_t));
1928:
1929: srv->srv_proto = SOCK_EXT;
1930: srv->srv_netbuf = (netBuf < RPC_MIN_BUFSIZ) ?
1931: getpagesize() : E_ALIGN(netBuf, 2);
1932: srv->srv_session.sess_version = RPC_VERSION;
1933: srv->srv_session.sess_instance = InstID;
1934:
1935: srv->srv_server.cli_parent = srv;
1936: srv->srv_server.cli_sock = fd;
1937:
1938: /* init functions */
1939: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1940: SLIST_INIT(&srv->srv_funcs);
1941: AVL_INIT(&srv->srv_funcs);
1942:
1943: /* init scheduler */
1944: srv->srv_root = schedBegin();
1945: if (!srv->srv_root) {
1946: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1947: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1948: e_free(srv);
1949: return NULL;
1.30.2.3 misho 1950: } else
1951: schedSignalDispatch(srv->srv_root, 42);
1.25 misho 1952:
1953: /* init pool for clients */
1954: srv->srv_clients = array_Init(1);
1955: if (!srv->srv_clients) {
1956: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1957: schedEnd(&srv->srv_root);
1958: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1959: e_free(srv);
1960: return NULL;
1961: }
1962:
1963: fcntl(srv->srv_server.cli_sock, F_SETFL,
1964: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1965:
1966: rpc_register_srvPing(srv);
1967:
1968: return srv;
1969: }
1.30.2.10! misho 1970:
! 1971: /*
! 1972: * rpc_srv_Return() - Prepare IPC return answer to RPC client
! 1973: *
! 1974: * @c = RPC client
! 1975: * return: number of arguments in response
! 1976: */
! 1977: int
! 1978: rpc_srv_Return(rpc_cli_t *c)
! 1979: {
! 1980: rpc_srv_t *s = c->cli_parent;
! 1981: u_char *buf = AIT_GET_BUF(&c->cli_buf);
! 1982: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
! 1983:
! 1984: if (!RPC_CHK_NOREPLY(rpc)) {
! 1985: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
! 1986: schedWrite(s->srv_root, cbProto[s->srv_proto][CB_TXPACKET], c, c->cli_sock, rpc, 0);
! 1987: } else
! 1988: rpc->call_argc ^= rpc->call_argc;
! 1989:
! 1990: return rpc->call_argc;
! 1991: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>