Annotation of libaitrpc/src/srv.c, revision 1.26.2.6
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.26.2.6! misho 6: * $Id: srv.c,v 1.26.2.5 2015/06/25 22:39:05 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.25 misho 15: Copyright 2004 - 2015
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.13 misho 49: /* SOCK_STREAM */
50: static void *acceptClients(sched_task_t *);
51: static void *closeClient(sched_task_t *);
52: static void *rxPacket(sched_task_t *);
53: static void *txPacket(sched_task_t *);
54:
55: /* SOCK_DGRAM */
56: static void *freeClient(sched_task_t *);
57: static void *rxUDPPacket(sched_task_t *);
58: static void *txUDPPacket(sched_task_t *);
59:
60: /* SOCK_RAW */
1.26 misho 61: static void *rxRAWPacket(sched_task_t *);
62: static void *txRAWPacket(sched_task_t *);
1.13 misho 63:
1.24 misho 64: /* SOCK_BPF */
65: static void *rxBPFPacket(sched_task_t *);
66: static void *txBPFPacket(sched_task_t *);
67:
1.25 misho 68: /* SOCK_EXT */
69: static void *rxEXTPacket(sched_task_t *);
70: static void *txEXTPacket(sched_task_t *);
71:
72: static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
1.13 misho 73: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
74: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
75: { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket }, /* SOCK_DGRAM */
1.26 misho 76: { rxRAWPacket, freeClient, rxRAWPacket, txRAWPacket }, /* SOCK_RAW */
1.25 misho 77: { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket }, /* SOCK_BPF */
78: { rxEXTPacket, freeClient, rxEXTPacket, txEXTPacket } /* SOCK_EXT */
1.13 misho 79: };
80:
1.23 misho 81: /* Global Signal Argument when kqueue support disabled */
82:
83: static volatile uintptr_t _glSigArg = 0;
84:
1.13 misho 85:
1.16 misho 86: void
1.13 misho 87: rpc_freeCli(rpc_cli_t * __restrict c)
1.10 misho 88: {
89: rpc_srv_t *s = c->cli_parent;
90:
1.13 misho 91: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 92:
93: /* free buffer */
94: AIT_FREE_VAL(&c->cli_buf);
95:
1.14 misho 96: array_Del(s->srv_clients, c->cli_id, 0);
1.10 misho 97: if (c)
1.14 misho 98: e_free(c);
1.13 misho 99: }
100:
101:
102: static inline int
1.14 misho 103: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 104: {
105: rpc_cli_t *c = NULL;
106: register int i;
107:
108: /* check free slots for connect */
1.14 misho 109: for (i = 0; i < array_Size(srv->srv_clients) &&
110: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
1.13 misho 111: /* check for duplicates */
1.14 misho 112: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
1.13 misho 113: break;
1.14 misho 114: if (i >= array_Size(srv->srv_clients))
1.13 misho 115: return -1; /* no more free slots! */
116:
117: return i;
118: }
119:
120: static rpc_cli_t *
1.14 misho 121: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 122: {
123: rpc_cli_t *c = NULL;
124: int n;
125:
1.25 misho 126: if (srv->srv_proto != SOCK_EXT)
127: n = _check4freeslot(srv, sa);
128: else
129: n = 0;
1.13 misho 130: if (n == -1)
131: return NULL;
132: else
1.14 misho 133: c = array(srv->srv_clients, n, rpc_cli_t*);
1.13 misho 134:
135: if (!c) {
1.14 misho 136: c = e_malloc(sizeof(rpc_cli_t));
1.13 misho 137: if (!c) {
138: LOGERR;
139: srv->srv_kill = 1;
140: return NULL;
141: } else {
142: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 143: array_Set(srv->srv_clients, n, c);
1.13 misho 144: c->cli_id = n;
145: c->cli_parent = srv;
146: }
147:
148: /* alloc empty buffer */
1.14 misho 149: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.13 misho 150: }
151:
152: return c;
153: }
154:
155:
156: static void *
157: freeClient(sched_task_t *task)
158: {
159: rpc_freeCli(TASK_ARG(task));
160:
161: return NULL;
162: }
163:
164: static void *
165: closeClient(sched_task_t *task)
166: {
167: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
168:
169: rpc_freeCli(TASK_ARG(task));
170:
171: /* close client socket */
172: shutdown(sock, SHUT_RDWR);
173: close(sock);
1.10 misho 174: return NULL;
175: }
1.7 misho 176:
177: static void *
178: txPacket(sched_task_t *task)
179: {
180: rpc_cli_t *c = TASK_ARG(task);
181: rpc_srv_t *s = c->cli_parent;
182: rpc_func_t *f = NULL;
1.18 misho 183: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 184: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.18 misho 185: int ret, estlen, wlen = sizeof(struct tagRPCCall);
1.19 misho 186: struct pollfd pfd;
1.21 misho 187: #ifdef TCP_SESSION_TIMEOUT
188: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
189:
190: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
191: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
192: TASK_ARG(task), ts, TASK_ARG(task), 0);
193: #endif
1.7 misho 194:
195: if (rpc->call_argc) {
1.10 misho 196: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1.7 misho 197: if (!f) {
1.10 misho 198: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18 misho 199:
1.7 misho 200: rpc->call_argc ^= rpc->call_argc;
201: rpc->call_rep.ret = RPC_ERROR(-1);
202: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
203: } else {
1.18 misho 204: /* calc estimated length */
205: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
206: if (estlen > AIT_LEN(&c->cli_buf))
207: AIT_RE_BUF(&c->cli_buf, estlen);
208: buf = AIT_GET_BUF(&c->cli_buf);
1.19 misho 209: rpc = (struct tagRPCCall*) buf;
1.18 misho 210:
1.25 misho 211: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.7 misho 212: /* Go Encapsulate variables */
1.18 misho 213: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
214: RPC_RETVARS(c));
1.7 misho 215: if (ret == -1) {
216: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.19 misho 217:
1.7 misho 218: rpc->call_argc ^= rpc->call_argc;
219: rpc->call_rep.ret = RPC_ERROR(-1);
220: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
221: } else
222: wlen += ret;
223: }
224: }
225:
1.26.2.5 misho 226: /* Free return values */
227: ait_freeVars(&c->cli_vars);
228:
1.18 misho 229: rpc->call_len = htonl(wlen);
1.25 misho 230: rpc->call_io = RPC_ACK;
1.8 misho 231:
1.15 misho 232: #if 0
1.7 misho 233: /* calculate CRC */
234: rpc->call_crc ^= rpc->call_crc;
1.8 misho 235: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.15 misho 236: #endif
1.7 misho 237:
238: /* send reply */
1.19 misho 239: pfd.fd = TASK_FD(task);
240: pfd.events = POLLOUT;
241: for (; wlen > 0; wlen -= ret, buf += ret) {
242: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
243: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
244: if (ret)
245: LOGERR;
246: else
1.22 misho 247: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
1.19 misho 248: /* close connection */
249: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
250: TASK_ARG(task), 0, NULL, 0);
251: return NULL;
252: }
253: ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
254: if (ret == -1) {
255: /* close connection */
256: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
257: TASK_ARG(task), 0, NULL, 0);
258: return NULL;
259: }
1.10 misho 260: }
1.7 misho 261:
262: return NULL;
263: }
264:
265: static void *
266: execCall(sched_task_t *task)
267: {
268: rpc_cli_t *c = TASK_ARG(task);
269: rpc_srv_t *s = c->cli_parent;
270: rpc_func_t *f = NULL;
271: array_t *arr = NULL;
1.18 misho 272: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 273: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.25 misho 274: int argc = rpc->call_argc;
1.7 misho 275:
276: /* Go decapsulate variables ... */
1.8 misho 277: if (argc) {
1.14 misho 278: arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall),
1.18 misho 279: AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
1.7 misho 280: if (!arr) {
1.14 misho 281: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
1.18 misho 282:
1.7 misho 283: rpc->call_argc ^= rpc->call_argc;
284: rpc->call_rep.ret = RPC_ERROR(-1);
285: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
286: return NULL;
287: }
1.10 misho 288: } else
289: arr = NULL;
1.7 misho 290:
1.10 misho 291: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
1.7 misho 292: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18 misho 293:
1.7 misho 294: rpc->call_argc ^= rpc->call_argc;
295: rpc->call_rep.ret = RPC_ERROR(-1);
296: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
297: } else {
1.8 misho 298: /* if client doesn't want reply */
1.10 misho 299: argc = RPC_CHK_NOREPLY(rpc);
300: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
1.7 misho 301: if (rpc->call_rep.ret == htonl(-1)) {
1.20 misho 302: if (!rpc->call_rep.eno) {
303: LOGERR;
304: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
305: }
1.7 misho 306: rpc->call_argc ^= rpc->call_argc;
1.26.2.5 misho 307: ait_freeVars(&c->cli_vars);
1.7 misho 308: } else {
309: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8 misho 310: if (argc) {
311: /* without reply */
1.14 misho 312: ait_freeVars(&c->cli_vars);
1.8 misho 313: rpc->call_argc ^= rpc->call_argc;
1.10 misho 314: } else {
315: /* reply */
1.25 misho 316: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.10 misho 317: }
1.7 misho 318: }
319: }
320:
1.14 misho 321: array_Destroy(&arr);
1.7 misho 322: return NULL;
323: }
324:
325: static void *
326: rxPacket(sched_task_t *task)
327: {
328: rpc_cli_t *c = TASK_ARG(task);
329: rpc_srv_t *s = c->cli_parent;
1.18 misho 330: int len, rlen, noreply, estlen;
1.15 misho 331: #if 0
332: u_short crc;
333: #endif
1.10 misho 334: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.18 misho 335: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.19 misho 336: struct pollfd pfd;
1.21 misho 337: #ifdef TCP_SESSION_TIMEOUT
338: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
339:
340: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
341: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
342: TASK_ARG(task), ts, TASK_ARG(task), 0);
343: #endif
1.7 misho 344:
1.18 misho 345: memset(buf, 0, sizeof(struct tagRPCCall));
1.19 misho 346: rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
1.18 misho 347: if (rlen < sizeof(struct tagRPCCall)) {
1.10 misho 348: /* close connection */
1.13 misho 349: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
350: TASK_ARG(task), 0, NULL, 0);
1.7 misho 351: return NULL;
1.10 misho 352: } else {
1.18 misho 353: estlen = ntohl(rpc->call_len);
354: if (estlen > AIT_LEN(&c->cli_buf))
355: AIT_RE_BUF(&c->cli_buf, estlen);
356: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1.19 misho 357: buf = AIT_GET_BUF(&c->cli_buf);
358: len = estlen;
1.18 misho 359: }
360:
1.19 misho 361: /* get next part of packet */
362: memset(buf, 0, len);
363: pfd.fd = TASK_FD(task);
364: pfd.events = POLLIN | POLLPRI;
365: for (; len > 0; len -= rlen, buf += rlen) {
366: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
367: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
368: if (rlen)
369: LOGERR;
370: else
1.22 misho 371: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
1.19 misho 372: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
373: TASK_ARG(task), 0, NULL, 0);
374: return NULL;
375: }
1.18 misho 376: rlen = recv(TASK_FD(task), buf, len, 0);
377: if (rlen == -1) {
378: /* close connection */
379: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
380: TASK_ARG(task), 0, NULL, 0);
1.8 misho 381: return NULL;
1.10 misho 382: }
1.18 misho 383: }
1.22 misho 384: len = estlen;
1.8 misho 385:
1.25 misho 386: /* skip loop packet */
387: if (rpc->call_io & RPC_ACK) {
388: schedReadSelf(task);
389: return NULL;
390: }
391:
1.15 misho 392: #if 0
1.18 misho 393: /* check integrity of packet */
394: crc = ntohs(rpc->call_crc);
395: rpc->call_crc ^= rpc->call_crc;
396: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
397: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
398: return NULL;
399: }
1.15 misho 400: #endif
1.7 misho 401:
1.18 misho 402: noreply = RPC_CHK_NOREPLY(rpc);
1.7 misho 403:
1.18 misho 404: /* check RPC packet session info */
405: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
406: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.7 misho 407:
1.18 misho 408: rpc->call_argc ^= rpc->call_argc;
409: rpc->call_rep.ret = RPC_ERROR(-1);
410: rpc->call_rep.eno = RPC_ERROR(errno);
411: } else {
412: /* execute RPC call */
413: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
414: }
1.7 misho 415:
1.18 misho 416: /* send RPC reply */
417: if (!noreply)
418: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
419: TASK_ARG(task), TASK_FD(task), rpc, len);
1.7 misho 420:
421: /* lets get next packet */
1.18 misho 422: schedReadSelf(task);
1.7 misho 423: return NULL;
424: }
425:
1.1 misho 426: static void *
1.10 misho 427: acceptClients(sched_task_t *task)
1.1 misho 428: {
1.10 misho 429: rpc_srv_t *srv = TASK_ARG(task);
430: rpc_cli_t *c = NULL;
1.14 misho 431: socklen_t salen = sizeof(sockaddr_t);
1.21 misho 432: int sock;
433: #ifdef TCP_SESSION_TIMEOUT
434: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
435: #endif
1.7 misho 436:
1.13 misho 437: c = _allocClient(srv, NULL);
1.21 misho 438: if (!c) {
439: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
440: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
441: shutdown(sock, SHUT_RDWR);
442: close(sock);
443: }
1.10 misho 444: goto end;
1.21 misho 445: }
1.10 misho 446:
447: /* accept client */
448: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
449: if (c->cli_sock == -1) {
450: LOGERR;
451: AIT_FREE_VAL(&c->cli_buf);
1.14 misho 452: array_Del(srv->srv_clients, c->cli_id, 42);
1.10 misho 453: goto end;
1.1 misho 454: } else
1.10 misho 455: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 456:
1.21 misho 457: #ifdef TCP_SESSION_TIMEOUT
458: /* armed timer for close stateless connection */
459: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
460: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c,
461: ts, c, 0);
462: #endif
1.13 misho 463: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
464: c->cli_sock, NULL, 0);
1.10 misho 465: end:
466: schedReadSelf(task);
467: return NULL;
468: }
1.5 misho 469:
1.7 misho 470:
1.10 misho 471: static void *
1.13 misho 472: txUDPPacket(sched_task_t *task)
1.10 misho 473: {
474: rpc_cli_t *c = TASK_ARG(task);
475: rpc_srv_t *s = c->cli_parent;
1.13 misho 476: rpc_func_t *f = NULL;
1.18 misho 477: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.13 misho 478: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.22 misho 479: int ret, estlen, wlen = sizeof(struct tagRPCCall);
1.13 misho 480: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
481:
482: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
483: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
484: TASK_ARG(task), ts, TASK_ARG(task), 0);
485:
486: if (rpc->call_argc) {
487: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
488: if (!f) {
489: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
490: rpc->call_argc ^= rpc->call_argc;
491: rpc->call_rep.ret = RPC_ERROR(-1);
492: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
493: } else {
1.22 misho 494: /* calc estimated length */
495: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
496: if (estlen > AIT_LEN(&c->cli_buf))
497: AIT_RE_BUF(&c->cli_buf, estlen);
498: buf = AIT_GET_BUF(&c->cli_buf);
499: rpc = (struct tagRPCCall*) buf;
500:
1.25 misho 501: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.13 misho 502: /* Go Encapsulate variables */
1.18 misho 503: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
504: RPC_RETVARS(c));
1.13 misho 505: if (ret == -1) {
506: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
507: rpc->call_argc ^= rpc->call_argc;
508: rpc->call_rep.ret = RPC_ERROR(-1);
509: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
510: } else
511: wlen += ret;
512: }
513: }
1.7 misho 514:
1.26.2.5 misho 515: /* Free return values */
516: ait_freeVars(&c->cli_vars);
517:
1.18 misho 518: rpc->call_len = htonl(wlen);
1.25 misho 519: rpc->call_io = RPC_ACK;
1.7 misho 520:
1.13 misho 521: /* calculate CRC */
522: rpc->call_crc ^= rpc->call_crc;
523: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
524:
525: /* send reply */
1.26.2.6! misho 526: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
! 527: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
! 528: if (ret == -1) {
! 529: /* close connection */
! 530: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
! 531: TASK_ARG(task), 0, NULL, 0);
! 532: return NULL;
1.13 misho 533: }
534:
535: return NULL;
536: }
537:
538: static void *
539: rxUDPPacket(sched_task_t *task)
540: {
541: rpc_srv_t *srv = TASK_ARG(task);
542: rpc_cli_t *c = NULL;
1.26.2.6! misho 543: int len, rlen, noreply;
1.22 misho 544: u_short crc;
1.26.2.6! misho 545: struct tagRPCCall *rpc;
1.14 misho 546: sockaddr_t sa;
547: socklen_t salen;
1.13 misho 548: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.26.2.6! misho 549: ait_val_t b = AIT_VAL_INIT;
1.13 misho 550:
551: /* receive connect packet */
1.26.2.6! misho 552: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1.14 misho 553: salen = sa.ss.ss_len = sizeof(sockaddr_t);
1.26.2.6! misho 554: rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
! 555: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
! 556: if (rlen < sizeof(struct tagRPCCall) || rlen < ntohl(rpc->call_len)) {
1.22 misho 557: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1.13 misho 558: goto end;
1.22 misho 559: }
1.13 misho 560:
1.26.2.6! misho 561: /* skip loop packet */
! 562: if (rpc->call_io & RPC_ACK)
! 563: goto end;
! 564:
1.13 misho 565: c = _allocClient(srv, &sa);
1.22 misho 566: if (!c) {
567: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
568: usleep(2000); /* blocked client delay */
1.13 misho 569: goto end;
1.22 misho 570: } else {
1.26.2.6! misho 571: len = ntohl(rpc->call_len);
! 572: if (len > AIT_LEN(&c->cli_buf))
! 573: AIT_RE_BUF(&c->cli_buf, len);
! 574: memcpy(AIT_GET_BUF(&c->cli_buf), AIT_GET_BUF(&b), len);
1.22 misho 575: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
576:
1.13 misho 577: c->cli_sock = TASK_FD(task);
578: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
1.22 misho 579:
1.13 misho 580: /* armed timer for close stateless connection */
581: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
582: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
583: c, ts, c, 0);
584: }
585:
1.22 misho 586: /* check integrity of packet */
587: crc = ntohs(rpc->call_crc);
588: rpc->call_crc ^= rpc->call_crc;
589: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
590: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
1.24 misho 591: /* close connection */
592: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
593: c, 0, NULL, 0);
594: goto end;
595: }
596:
597: noreply = RPC_CHK_NOREPLY(rpc);
598:
599: /* check RPC packet session info */
600: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
601: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
602:
603: rpc->call_argc ^= rpc->call_argc;
604: rpc->call_rep.ret = RPC_ERROR(-1);
605: rpc->call_rep.eno = RPC_ERROR(errno);
606: } else {
607: /* execute RPC call */
608: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
609: }
610:
611: /* send RPC reply */
612: if (!noreply)
613: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
614: c, TASK_FD(task), rpc, len);
615: end:
1.26.2.6! misho 616: AIT_FREE_VAL(&b);
1.24 misho 617: schedReadSelf(task);
618: return NULL;
619: }
620:
621:
622: static void *
1.26 misho 623: txRAWPacket(sched_task_t *task)
624: {
625: rpc_cli_t *c = TASK_ARG(task);
626: rpc_srv_t *s = c->cli_parent;
627: rpc_func_t *f = NULL;
628: u_char *buf = AIT_GET_BUF(&c->cli_buf);
629: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
630: int ret, estlen, wlen = sizeof(struct tagRPCCall);
631: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
632:
633: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
634: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
635: TASK_ARG(task), ts, TASK_ARG(task), 0);
636:
637: if (rpc->call_argc) {
638: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
639: if (!f) {
640: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
641: rpc->call_argc ^= rpc->call_argc;
642: rpc->call_rep.ret = RPC_ERROR(-1);
643: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
644: } else {
645: /* calc estimated length */
646: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
647: if (estlen > AIT_LEN(&c->cli_buf))
648: AIT_RE_BUF(&c->cli_buf, estlen);
649: buf = AIT_GET_BUF(&c->cli_buf);
650: rpc = (struct tagRPCCall*) buf;
651:
652: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
653: /* Go Encapsulate variables */
654: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
655: RPC_RETVARS(c));
656: if (ret == -1) {
657: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
658: rpc->call_argc ^= rpc->call_argc;
659: rpc->call_rep.ret = RPC_ERROR(-1);
660: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
661: } else
662: wlen += ret;
663: }
664: }
665:
1.26.2.5 misho 666: /* Free return values */
667: ait_freeVars(&c->cli_vars);
668:
1.26 misho 669: rpc->call_len = htonl(wlen);
670: rpc->call_io = RPC_ACK;
671:
672: /* calculate CRC */
673: rpc->call_crc ^= rpc->call_crc;
674: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
675:
676: /* send reply */
1.26.2.1 misho 677: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
678: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
679: if (ret == -1) {
680: /* close connection */
681: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
682: TASK_ARG(task), 0, NULL, 0);
683: return NULL;
1.26 misho 684: }
685:
686: return NULL;
687: }
688:
689: static void *
690: rxRAWPacket(sched_task_t *task)
691: {
692: rpc_srv_t *srv = TASK_ARG(task);
693: rpc_cli_t *c = NULL;
1.26.2.1 misho 694: int len, rlen, noreply;
1.26 misho 695: u_short crc;
696: struct tagRPCCall *rpc;
697: sockaddr_t sa;
698: socklen_t salen;
699: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.26.2.1 misho 700: ait_val_t b = AIT_VAL_INIT;
1.26 misho 701:
702: /* receive connect packet */
1.26.2.1 misho 703: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1.26 misho 704: salen = sa.ss.ss_len = sizeof(sockaddr_t);
1.26.2.1 misho 705: rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
706: if (sa.sa.sa_family == AF_INET) {
707: struct ip *h;
708: h = (struct ip*) AIT_GET_BUF(&b);
709: if (rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC)
710: goto end;
711: else {
712: rlen -= sizeof(struct ip);
713: rpc = (struct tagRPCCall*)
714: (AIT_GET_BUF(&b) + sizeof(struct ip));
715: }
716: } else {
717: struct ip6_hdr *h;
718: h = (struct ip6_hdr*) AIT_GET_BUF(&b);
719: if (rlen < (ntohs(h->ip6_plen) + sizeof(struct ip6_hdr)) ||
720: h->ip6_nxt != IPPROTO_ERPC)
721: goto end;
722: else {
723: rlen -= sizeof(struct ip6_hdr);
724: rpc = (struct tagRPCCall*)
725: (AIT_GET_BUF(&b) + sizeof(struct ip6_hdr));
726: }
727: }
1.26.2.6! misho 728: if (rlen < sizeof(struct tagRPCCall) || rlen < ntohl(rpc->call_len)) {
1.26 misho 729: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
730: goto end;
731: }
732:
1.26.2.1 misho 733: /* skip loop packet */
734: if (rpc->call_io & RPC_ACK)
735: goto end;
736:
1.26 misho 737: c = _allocClient(srv, &sa);
738: if (!c) {
739: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
740: usleep(2000); /* blocked client delay */
741: goto end;
742: } else {
1.26.2.1 misho 743: len = ntohl(rpc->call_len);
744: if (len > AIT_LEN(&c->cli_buf))
745: AIT_RE_BUF(&c->cli_buf, len);
746: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
747: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1.26 misho 748:
749: c->cli_sock = TASK_FD(task);
750: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
751:
752: /* armed timer for close stateless connection */
753: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
754: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
755: c, ts, c, 0);
756: }
757:
758: /* check integrity of packet */
759: crc = ntohs(rpc->call_crc);
760: rpc->call_crc ^= rpc->call_crc;
761: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
762: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
763: /* close connection */
764: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
765: c, 0, NULL, 0);
766: goto end;
767: }
768:
769: noreply = RPC_CHK_NOREPLY(rpc);
770:
771: /* check RPC packet session info */
772: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
773: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
774:
775: rpc->call_argc ^= rpc->call_argc;
776: rpc->call_rep.ret = RPC_ERROR(-1);
777: rpc->call_rep.eno = RPC_ERROR(errno);
778: } else {
779: /* execute RPC call */
780: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
781: }
782:
783: /* send RPC reply */
784: if (!noreply)
785: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
786: c, TASK_FD(task), rpc, len);
787: end:
1.26.2.1 misho 788: AIT_FREE_VAL(&b);
1.26 misho 789: schedReadSelf(task);
790: return NULL;
791: }
792:
793:
794: static void *
1.24 misho 795: txBPFPacket(sched_task_t *task)
796: {
797: rpc_cli_t *c = TASK_ARG(task);
798: rpc_srv_t *s = c->cli_parent;
799: rpc_func_t *f = NULL;
800: u_char *buf = AIT_GET_BUF(&c->cli_buf);
801: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
802: int ret, len, wlen = sizeof(struct tagRPCCall);
803: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
804: struct ether_header *eh;
805: ait_val_t b = AIT_VAL_INIT;
806:
807: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
808: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
809: TASK_ARG(task), ts, TASK_ARG(task), 0);
810:
811: if (rpc->call_argc) {
812: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
813: if (!f) {
814: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
815: rpc->call_argc ^= rpc->call_argc;
816: rpc->call_rep.ret = RPC_ERROR(-1);
817: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
818: } else {
819: /* calc estimated length */
820: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
821: if (len > AIT_LEN(&c->cli_buf))
822: AIT_RE_BUF(&c->cli_buf, len);
823: buf = AIT_GET_BUF(&c->cli_buf);
824: rpc = (struct tagRPCCall*) buf;
825:
1.25 misho 826: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.24 misho 827: /* Go Encapsulate variables */
828: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
829: RPC_RETVARS(c));
830: if (ret == -1) {
831: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
832: rpc->call_argc ^= rpc->call_argc;
833: rpc->call_rep.ret = RPC_ERROR(-1);
834: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
835: } else
836: wlen += ret;
837: }
838: }
839:
1.26.2.5 misho 840: /* Free return values */
841: ait_freeVars(&RPC_RETVARS(c));
842:
1.24 misho 843: rpc->call_len = htonl(wlen);
1.25 misho 844: rpc->call_io = RPC_ACK;
1.24 misho 845:
846: /* calculate CRC */
847: rpc->call_crc ^= rpc->call_crc;
848: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
849:
850: /* send reply */
851: AIT_SET_BUF(&b, NULL, MIN(wlen, s->srv_netbuf) + ETHER_HDR_LEN);
852: eh = (struct ether_header*) AIT_GET_BUF(&b);
853: memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
854: eh->ether_type = htons(RPC_DEFPORT);
855: memcpy(eh + 1, buf, MIN(wlen, s->srv_netbuf));
856:
857: ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
858: AIT_FREE_VAL(&b);
859: if (ret == -1) {
860: /* close connection */
861: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
862: TASK_ARG(task), 0, NULL, 0);
1.22 misho 863: return NULL;
864: }
1.13 misho 865:
1.24 misho 866: return NULL;
867: }
868:
869: static void *
870: rxBPFPacket(sched_task_t *task)
871: {
872: rpc_srv_t *srv = TASK_ARG(task);
873: rpc_cli_t *c = NULL;
874: int len, rlen, noreply;
875: u_short crc;
876: struct tagRPCCall *rpc;
877: sockaddr_t sa;
878: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
879: struct bpf_hdr *h;
880: struct ether_header *eh;
881: ait_val_t b = AIT_VAL_INIT;
882:
883: /* receive connect packet */
884: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
885: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
886: h = (struct bpf_hdr*) AIT_GET_BUF(&b);
887: rlen -= h->bh_hdrlen;
888: if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen ||
889: rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) {
890: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
891: goto end;
892: } else {
893: rlen = h->bh_caplen;
894: eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
895: rlen -= ETHER_HDR_LEN;
896: rpc = (struct tagRPCCall*) (eh + 1);
1.25 misho 897:
898: #if 0
899: /* skip loop packet */
900: if (rpc->call_io & RPC_ACK)
901: goto end;
902: #endif
903:
1.24 misho 904: if (eh->ether_type != ntohs(RPC_DEFPORT))
905: goto end;
906: else
907: e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
908: }
909:
910: c = _allocClient(srv, &sa);
911: if (!c) {
912: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
913: usleep(2000); /* blocked client delay */
914: goto end;
915: } else {
916: len = ntohl(rpc->call_len);
917: if (len > AIT_LEN(&c->cli_buf))
918: AIT_RE_BUF(&c->cli_buf, len);
1.26.2.1 misho 919: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
1.24 misho 920: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
921:
922: c->cli_sock = TASK_FD(task);
923: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
924:
925: /* armed timer for close stateless connection */
926: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
927: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
928: c, ts, c, 0);
929: }
930:
931: /* check integrity of packet */
932: crc = ntohs(rpc->call_crc);
933: rpc->call_crc ^= rpc->call_crc;
934: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
935: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
936: /* close connection */
937: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
938: c, 0, NULL, 0);
939: goto end;
940: }
941:
1.22 misho 942: noreply = RPC_CHK_NOREPLY(rpc);
1.18 misho 943:
1.22 misho 944: /* check RPC packet session info */
945: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
946: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.13 misho 947:
1.22 misho 948: rpc->call_argc ^= rpc->call_argc;
949: rpc->call_rep.ret = RPC_ERROR(-1);
950: rpc->call_rep.eno = RPC_ERROR(errno);
951: } else {
952: /* execute RPC call */
953: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
954: }
1.13 misho 955:
1.22 misho 956: /* send RPC reply */
957: if (!noreply)
1.24 misho 958: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1.22 misho 959: c, TASK_FD(task), rpc, len);
1.13 misho 960: end:
1.24 misho 961: AIT_FREE_VAL(&b);
1.13 misho 962: schedReadSelf(task);
963: return NULL;
964: }
965:
1.25 misho 966:
967: static void *
968: txEXTPacket(sched_task_t *task)
969: {
970: rpc_cli_t *c = TASK_ARG(task);
971: rpc_srv_t *s = c->cli_parent;
972: rpc_func_t *f = NULL;
973: u_char *buf = AIT_GET_BUF(&c->cli_buf);
974: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
975: int ret, len, wlen = sizeof(struct tagRPCCall);
976: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
977:
978: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
979: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
980: TASK_ARG(task), ts, TASK_ARG(task), 0);
981:
982: if (rpc->call_argc) {
983: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
984: if (!f) {
985: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
986: rpc->call_argc ^= rpc->call_argc;
987: rpc->call_rep.ret = RPC_ERROR(-1);
988: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
989: } else {
990: /* calc estimated length */
991: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
992: if (len > AIT_LEN(&c->cli_buf))
993: AIT_RE_BUF(&c->cli_buf, len);
994: buf = AIT_GET_BUF(&c->cli_buf);
995: rpc = (struct tagRPCCall*) buf;
996:
997: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
998: /* Go Encapsulate variables */
999: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
1000: RPC_RETVARS(c));
1001: if (ret == -1) {
1002: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1003: rpc->call_argc ^= rpc->call_argc;
1004: rpc->call_rep.ret = RPC_ERROR(-1);
1005: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
1006: } else
1007: wlen += ret;
1008: }
1009: }
1010:
1.26.2.5 misho 1011: /* Free return values */
1012: ait_freeVars(&RPC_RETVARS(c));
1013:
1.25 misho 1014: rpc->call_len = htonl(wlen);
1015: rpc->call_io = RPC_ACK;
1016:
1017: /* send reply */
1018: ret = write(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf));
1019: if (ret == -1) {
1020: /* close connection */
1021: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
1022: TASK_ARG(task), 0, NULL, 0);
1023: return NULL;
1024: }
1025:
1026: return NULL;
1027: }
1028:
1029: static void *
1030: rxEXTPacket(sched_task_t *task)
1031: {
1032: rpc_srv_t *srv = TASK_ARG(task);
1033: rpc_cli_t *c = NULL;
1034: int len, rlen, noreply;
1035: struct tagRPCCall *rpc;
1036: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1037: ait_val_t b = AIT_VAL_INIT;
1038: sockaddr_t sa;
1039:
1040: memset(&sa, 0, sizeof sa);
1041: /* receive connect packet */
1042: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1043: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
1.26.2.6! misho 1044: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
! 1045: if (rlen < sizeof(struct tagRPCCall) || rlen < ntohl(rpc->call_len)) {
1.25 misho 1046: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1047: goto end;
1048: }
1049:
1.26.2.6! misho 1050: /* skip loop packet */
! 1051: if (rpc->call_io & RPC_ACK)
! 1052: goto end;
! 1053:
1.25 misho 1054: c = _allocClient(srv, &sa);
1055: if (!c) {
1056: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
1057: usleep(2000); /* blocked client delay */
1058: goto end;
1059: } else {
1060: len = ntohl(rpc->call_len);
1061: if (len > AIT_LEN(&c->cli_buf))
1062: AIT_RE_BUF(&c->cli_buf, len);
1063: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, AIT_LEN(&c->cli_buf));
1064: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1065:
1066: c->cli_sock = TASK_FD(task);
1067:
1068: /* armed timer for close stateless connection */
1069: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
1070: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
1071: c, ts, c, 0);
1072: }
1073:
1074: noreply = RPC_CHK_NOREPLY(rpc);
1075:
1076: /* check RPC packet session info */
1077: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
1078: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1079:
1080: rpc->call_argc ^= rpc->call_argc;
1081: rpc->call_rep.ret = RPC_ERROR(-1);
1082: rpc->call_rep.eno = RPC_ERROR(errno);
1083: } else {
1084: /* execute RPC call */
1085: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
1086: }
1087:
1088: /* send RPC reply */
1089: if (!noreply)
1090: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1091: c, TASK_FD(task), rpc, len);
1092: end:
1093: AIT_FREE_VAL(&b);
1094: schedReadSelf(task);
1095: return NULL;
1096: }
1097:
1.13 misho 1098: /* ------------------------------------------------------ */
1099:
1.16 misho 1100: void
1.13 misho 1101: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
1102: {
1103: rpc_srv_t *s = c->cli_parent;
1104:
1105: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 1106:
1107: /* free buffer */
1108: AIT_FREE_VAL(&c->cli_buf);
1109:
1.14 misho 1110: array_Del(s->srv_blob.clients, c->cli_id, 0);
1.10 misho 1111: if (c)
1.14 misho 1112: e_free(c);
1.13 misho 1113: }
1114:
1115:
1116: static void *
1117: closeBLOBClient(sched_task_t *task)
1118: {
1119: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
1120:
1121: rpc_freeBLOBCli(TASK_ARG(task));
1122:
1123: /* close client socket */
1124: shutdown(sock, SHUT_RDWR);
1125: close(sock);
1.7 misho 1126: return NULL;
1127: }
1128:
1129: static void *
1130: txBLOB(sched_task_t *task)
1131: {
1.10 misho 1132: rpc_cli_t *c = TASK_ARG(task);
1133: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 1134: int wlen = sizeof(struct tagBLOBHdr);
1135:
1136: /* send reply */
1.10 misho 1137: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
1138: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
1139: /* close blob connection */
1140: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1141: }
1.7 misho 1142:
1143: return NULL;
1144: }
1.4 misho 1145:
1.7 misho 1146: static void *
1147: rxBLOB(sched_task_t *task)
1148: {
1149: rpc_cli_t *c = TASK_ARG(task);
1150: rpc_srv_t *s = c->cli_parent;
1151: rpc_blob_t *b;
1.10 misho 1152: struct tagBLOBHdr blob;
1.7 misho 1153: int rlen;
1154:
1.10 misho 1155: memset(&blob, 0, sizeof blob);
1156: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
1157: if (rlen < 1) {
1158: /* close blob connection */
1159: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1.7 misho 1160: return NULL;
1161: }
1.5 misho 1162:
1.10 misho 1163: /* check BLOB packet */
1164: if (rlen < sizeof(struct tagBLOBHdr)) {
1165: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1.6 misho 1166:
1.10 misho 1167: schedReadSelf(task);
1.7 misho 1168: return NULL;
1169: }
1.1 misho 1170:
1.7 misho 1171: /* check RPC packet session info */
1.15 misho 1172: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1.7 misho 1173: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.10 misho 1174: blob.hdr_cmd = error;
1.7 misho 1175: goto end;
1176: }
1177:
1178: /* Go to proceed packet ... */
1.10 misho 1179: switch (blob.hdr_cmd) {
1.7 misho 1180: case get:
1.10 misho 1181: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
1182: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
1183: blob.hdr_cmd = no;
1184: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 1185: break;
1186: } else
1.10 misho 1187: blob.hdr_len = htonl(b->blob_len);
1.5 misho 1188:
1.7 misho 1189: if (rpc_srv_blobMap(s, b) != -1) {
1190: /* deliver BLOB variable to client */
1.10 misho 1191: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1.7 misho 1192: rpc_srv_blobUnmap(b);
1193: } else {
1.10 misho 1194: blob.hdr_cmd = error;
1195: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 1196: }
1197: break;
1198: case set:
1.17 misho 1199: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
1200: ntohl(blob.hdr_ret)))) {
1.7 misho 1201: /* set new BLOB variable for reply :) */
1.10 misho 1202: blob.hdr_var = htonl(b->blob_var);
1.7 misho 1203:
1204: /* receive BLOB from client */
1.10 misho 1205: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1.7 misho 1206: rpc_srv_blobUnmap(b);
1.5 misho 1207: } else {
1.10 misho 1208: blob.hdr_cmd = error;
1209: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 1210: }
1211: break;
1212: case unset:
1.11 misho 1213: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1.10 misho 1214: blob.hdr_cmd = error;
1215: blob.hdr_ret = RPC_ERROR(-1);
1.1 misho 1216: }
1217: break;
1.7 misho 1218: default:
1.10 misho 1219: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
1220: blob.hdr_cmd = error;
1221: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 1222: }
1.1 misho 1223:
1.7 misho 1224: end:
1.10 misho 1225: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
1226: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
1227: schedReadSelf(task);
1.7 misho 1228: return NULL;
1.2 misho 1229: }
1230:
1231: static void *
1.17 misho 1232: flushBLOB(sched_task_t *task)
1233: {
1.23 misho 1234: uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
1235: rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
1.17 misho 1236: rpc_blob_t *b, *tmp;
1237:
1238: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1239: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1240:
1241: rpc_srv_blobFree(srv, b);
1242: e_free(b);
1243: }
1244:
1.23 misho 1245: if (!schedSignalSelf(task)) {
1246: /* disabled kqueue support in libaitsched */
1247: struct sigaction sa;
1248:
1249: memset(&sa, 0, sizeof sa);
1250: sigemptyset(&sa.sa_mask);
1251: sa.sa_handler = (void (*)(int)) flushBLOB;
1252: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1253: sigaction(SIGFBLOB, &sa, NULL);
1254: }
1255:
1.17 misho 1256: return NULL;
1257: }
1258:
1259: static void *
1.10 misho 1260: acceptBLOBClients(sched_task_t *task)
1.2 misho 1261: {
1.10 misho 1262: rpc_srv_t *srv = TASK_ARG(task);
1263: rpc_cli_t *c = NULL;
1264: register int i;
1.14 misho 1265: socklen_t salen = sizeof(sockaddr_t);
1.21 misho 1266: int sock;
1.12 misho 1267: #ifdef TCP_NOPUSH
1268: int n = 1;
1269: #endif
1.7 misho 1270:
1.10 misho 1271: /* check free slots for connect */
1.14 misho 1272: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
1273: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1.21 misho 1274: if (c) { /* no more free slots! */
1275: EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
1276: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
1277: shutdown(sock, SHUT_RDWR);
1278: close(sock);
1279: }
1.10 misho 1280: goto end;
1.21 misho 1281: }
1282:
1.14 misho 1283: c = e_malloc(sizeof(rpc_cli_t));
1.10 misho 1284: if (!c) {
1.7 misho 1285: LOGERR;
1.10 misho 1286: srv->srv_kill = srv->srv_blob.kill = 1;
1.7 misho 1287: return NULL;
1288: } else {
1.10 misho 1289: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 1290: array_Set(srv->srv_blob.clients, i, c);
1.10 misho 1291: c->cli_id = i;
1292: c->cli_parent = srv;
1.7 misho 1293: }
1.4 misho 1294:
1.10 misho 1295: /* alloc empty buffer */
1.14 misho 1296: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.2 misho 1297:
1.10 misho 1298: /* accept client */
1299: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
1300: if (c->cli_sock == -1) {
1301: LOGERR;
1302: AIT_FREE_VAL(&c->cli_buf);
1.14 misho 1303: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 1304: goto end;
1.12 misho 1305: } else {
1306: #ifdef TCP_NOPUSH
1307: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
1308: #endif
1.10 misho 1309: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.12 misho 1310: }
1.2 misho 1311:
1.10 misho 1312: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
1313: end:
1314: schedReadSelf(task);
1.7 misho 1315: return NULL;
1.1 misho 1316: }
1317:
1.10 misho 1318: /* ------------------------------------------------------ */
1.1 misho 1319:
1320: /*
1.7 misho 1321: * rpc_srv_initBLOBServer() - Init & create BLOB Server
1322: *
1.4 misho 1323: * @srv = RPC server instance
1.2 misho 1324: * @Port = Port for bind server, if Port == 0 default port is selected
1325: * @diskDir = Disk place for BLOB file objects
1326: * return: -1 == error or 0 bind and created BLOB server instance
1327: */
1328: int
1329: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
1330: {
1331: int n = 1;
1332:
1.10 misho 1333: if (!srv || srv->srv_kill) {
1.7 misho 1334: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 1335: return -1;
1336: }
1337:
1338: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
1339: if (access(diskDir, R_OK | W_OK) == -1) {
1340: LOGERR;
1341: return -1;
1342: } else
1.7 misho 1343: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 1344:
1.10 misho 1345: /* init blob list */
1346: TAILQ_INIT(&srv->srv_blob.blobs);
1347:
1.2 misho 1348: srv->srv_blob.server.cli_parent = srv;
1.4 misho 1349:
1.14 misho 1350: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
1.10 misho 1351: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1.4 misho 1352: case AF_INET:
1.10 misho 1353: srv->srv_blob.server.cli_sa.sin.sin_port =
1354: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1.4 misho 1355: break;
1356: case AF_INET6:
1.10 misho 1357: srv->srv_blob.server.cli_sa.sin6.sin6_port =
1358: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1.4 misho 1359: break;
1360: case AF_LOCAL:
1.10 misho 1361: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
1362: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1.4 misho 1363: break;
1364: default:
1.7 misho 1365: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 1366: return -1;
1.2 misho 1367: }
1368:
1.4 misho 1369: /* create BLOB server socket */
1.6 misho 1370: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 1371: if (srv->srv_blob.server.cli_sock == -1) {
1372: LOGERR;
1.7 misho 1373: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1374: return -1;
1375: }
1376: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1377: LOGERR;
1378: close(srv->srv_blob.server.cli_sock);
1.7 misho 1379: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1380: return -1;
1381: }
1.5 misho 1382: n = srv->srv_netbuf;
1383: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1384: LOGERR;
1385: close(srv->srv_blob.server.cli_sock);
1.7 misho 1386: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 1387: return -1;
1388: }
1389: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1390: LOGERR;
1391: close(srv->srv_blob.server.cli_sock);
1.7 misho 1392: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 1393: return -1;
1394: }
1.6 misho 1395: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
1396: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 1397: LOGERR;
1398: close(srv->srv_blob.server.cli_sock);
1.7 misho 1399: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1400: return -1;
1.13 misho 1401: } else
1402: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
1403: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
1404:
1.2 misho 1405:
1.10 misho 1406: /* allocate pool for concurent blob clients */
1.14 misho 1407: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1.2 misho 1408: if (!srv->srv_blob.clients) {
1.14 misho 1409: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.2 misho 1410: close(srv->srv_blob.server.cli_sock);
1.7 misho 1411: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1412: return -1;
1.10 misho 1413: }
1.2 misho 1414:
1.10 misho 1415: /* init blob scheduler */
1416: srv->srv_blob.root = schedBegin();
1417: if (!srv->srv_blob.root) {
1418: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.14 misho 1419: array_Destroy(&srv->srv_blob.clients);
1.10 misho 1420: close(srv->srv_blob.server.cli_sock);
1421: AIT_FREE_VAL(&srv->srv_blob.dir);
1422: return -1;
1423: }
1.2 misho 1424:
1425: return 0;
1426: }
1427:
1428: /*
1.7 misho 1429: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
1430: *
1.2 misho 1431: * @srv = RPC Server instance
1432: * return: none
1433: */
1.16 misho 1434: void
1.2 misho 1435: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
1436: {
1.10 misho 1437: if (!srv)
1.2 misho 1438: return;
1439:
1.10 misho 1440: srv->srv_blob.kill = 1;
1.17 misho 1441:
1442: schedEnd(&srv->srv_blob.root);
1.26.2.4 misho 1443:
1444: if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL)
1445: unlink(srv->srv_blob.server.cli_sa.sun.sun_path);
1.2 misho 1446: }
1447:
1448: /*
1.10 misho 1449: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1.7 misho 1450: *
1.2 misho 1451: * @srv = RPC Server instance
1452: * return: -1 error or 0 ok, infinite loop ...
1453: */
1454: int
1.10 misho 1455: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1.2 misho 1456: {
1.10 misho 1457: rpc_cli_t *c;
1.2 misho 1458: register int i;
1.10 misho 1459: rpc_blob_t *b, *tmp;
1460: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.2 misho 1461:
1.10 misho 1462: if (!srv || srv->srv_kill) {
1.7 misho 1463: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 1464: return -1;
1465: }
1466:
1.14 misho 1467: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1.2 misho 1468: LOGERR;
1469: return -1;
1.10 misho 1470: }
1471:
1.23 misho 1472: if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
1473: /* disabled kqueue support in libaitsched */
1474: struct sigaction sa;
1475:
1476: atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
1477:
1478: memset(&sa, 0, sizeof sa);
1479: sigemptyset(&sa.sa_mask);
1480: sa.sa_handler = (void (*)(int)) flushBLOB;
1481: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1482: sigaction(SIGFBLOB, &sa, NULL);
1483: }
1484:
1.10 misho 1485: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
1486: srv->srv_blob.server.cli_sock, NULL, 0)) {
1487: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1488: return -1;
1489: }
1.2 misho 1490:
1.10 misho 1491: schedPolling(srv->srv_blob.root, &ts, NULL);
1492: /* main rpc loop */
1493: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1.7 misho 1494:
1.17 misho 1495: /* detach blobs */
1496: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1497: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1498:
1499: rpc_srv_blobFree(srv, b);
1500: e_free(b);
1501: }
1502:
1.10 misho 1503: /* close all clients connections & server socket */
1.14 misho 1504: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
1505: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1.10 misho 1506: if (c) {
1507: shutdown(c->cli_sock, SHUT_RDWR);
1508: close(c->cli_sock);
1509:
1510: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1511: AIT_FREE_VAL(&c->cli_buf);
1.2 misho 1512: }
1.14 misho 1513: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 1514: }
1.14 misho 1515: array_Destroy(&srv->srv_blob.clients);
1.2 misho 1516:
1.10 misho 1517: close(srv->srv_blob.server.cli_sock);
1.2 misho 1518:
1.10 misho 1519: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1520: return 0;
1521: }
1522:
1523:
1524: /*
1.7 misho 1525: * rpc_srv_initServer() - Init & create RPC Server
1526: *
1.15 misho 1527: * @InstID = Instance for authentication & recognition
1.1 misho 1528: * @concurentClients = Concurent clients at same time to this server
1.10 misho 1529: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 1530: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 1531: * @Port = Port for bind server, if Port == 0 default port is selected
1.13 misho 1532: * @proto = Protocol, if == 0 choose SOCK_STREAM
1.1 misho 1533: * return: NULL == error or !=NULL bind and created RPC server instance
1534: */
1535: rpc_srv_t *
1.15 misho 1536: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
1537: const char *csHost, u_short Port, int proto)
1.1 misho 1538: {
1.10 misho 1539: int n = 1;
1.1 misho 1540: rpc_srv_t *srv = NULL;
1.14 misho 1541: sockaddr_t sa = E_SOCKADDR_INIT;
1.1 misho 1542:
1.25 misho 1543: if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
1.10 misho 1544: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1.1 misho 1545: return NULL;
1546: }
1.26.2.3 misho 1547: if (!Port && proto < SOCK_RAW)
1.1 misho 1548: Port = RPC_DEFPORT;
1.26.2.2 misho 1549: if (!e_gethostbyname(csHost, Port, &sa))
1550: return NULL;
1.13 misho 1551: if (!proto)
1552: proto = SOCK_STREAM;
1.10 misho 1553: if (netBuf < RPC_MIN_BUFSIZ)
1.5 misho 1554: netBuf = BUFSIZ;
1.7 misho 1555: else
1.14 misho 1556: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1.10 misho 1557:
1558: #ifdef HAVE_SRANDOMDEV
1559: srandomdev();
1560: #else
1561: time_t tim;
1562:
1563: srandom((time(&tim) ^ getpid()));
1564: #endif
1.1 misho 1565:
1.14 misho 1566: srv = e_malloc(sizeof(rpc_srv_t));
1.1 misho 1567: if (!srv) {
1568: LOGERR;
1569: return NULL;
1570: } else
1571: memset(srv, 0, sizeof(rpc_srv_t));
1572:
1.13 misho 1573: srv->srv_proto = proto;
1.5 misho 1574: srv->srv_netbuf = netBuf;
1.1 misho 1575: srv->srv_session.sess_version = RPC_VERSION;
1.15 misho 1576: srv->srv_session.sess_instance = InstID;
1.1 misho 1577:
1578: srv->srv_server.cli_parent = srv;
1.10 misho 1579: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1580:
1.12 misho 1581: /* init functions */
1582: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1583: SLIST_INIT(&srv->srv_funcs);
1584: AVL_INIT(&srv->srv_funcs);
1.10 misho 1585:
1586: /* init scheduler */
1587: srv->srv_root = schedBegin();
1588: if (!srv->srv_root) {
1589: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.12 misho 1590: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1591: e_free(srv);
1.10 misho 1592: return NULL;
1593: }
1594:
1595: /* init pool for clients */
1.14 misho 1596: srv->srv_clients = array_Init(concurentClients);
1.10 misho 1597: if (!srv->srv_clients) {
1.14 misho 1598: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.10 misho 1599: schedEnd(&srv->srv_root);
1.12 misho 1600: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1601: e_free(srv);
1.10 misho 1602: return NULL;
1603: }
1.4 misho 1604:
1605: /* create server socket */
1.26 misho 1606: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family,
1607: srv->srv_proto, srv->srv_proto == SOCK_RAW ? IPPROTO_ERPC : 0);
1.1 misho 1608: if (srv->srv_server.cli_sock == -1) {
1609: LOGERR;
1.14 misho 1610: array_Destroy(&srv->srv_clients);
1.10 misho 1611: schedEnd(&srv->srv_root);
1.12 misho 1612: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1613: e_free(srv);
1.1 misho 1614: return NULL;
1615: }
1616: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1617: LOGERR;
1.10 misho 1618: goto err;
1.1 misho 1619: }
1.5 misho 1620: n = srv->srv_netbuf;
1621: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1622: LOGERR;
1.10 misho 1623: goto err;
1.5 misho 1624: }
1625: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1626: LOGERR;
1.10 misho 1627: goto err;
1.5 misho 1628: }
1.6 misho 1629: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
1630: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 1631: LOGERR;
1.10 misho 1632: goto err;
1.13 misho 1633: } else
1634: fcntl(srv->srv_server.cli_sock, F_SETFL,
1635: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 1636:
1.10 misho 1637: rpc_register_srvPing(srv);
1.8 misho 1638:
1.1 misho 1639: return srv;
1.10 misho 1640: err: /* error condition */
1641: close(srv->srv_server.cli_sock);
1.14 misho 1642: array_Destroy(&srv->srv_clients);
1.10 misho 1643: schedEnd(&srv->srv_root);
1.12 misho 1644: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1645: e_free(srv);
1.10 misho 1646: return NULL;
1.1 misho 1647: }
1648:
1649: /*
1.7 misho 1650: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1651: *
1.6 misho 1652: * @psrv = RPC Server instance
1.1 misho 1653: * return: none
1654: */
1.16 misho 1655: void
1.6 misho 1656: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 1657: {
1.10 misho 1658: if (!psrv || !*psrv)
1.1 misho 1659: return;
1660:
1.10 misho 1661: /* if send kill to blob server */
1.17 misho 1662: rpc_srv_endBLOBServer(*psrv);
1.1 misho 1663:
1.10 misho 1664: (*psrv)->srv_kill = 1;
1665: sleep(RPC_SCHED_POLLING);
1.2 misho 1666:
1.17 misho 1667: schedEnd(&(*psrv)->srv_root);
1668:
1.26.2.4 misho 1669: if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL)
1670: unlink((*psrv)->srv_server.cli_sa.sun.sun_path);
1671:
1.12 misho 1672: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1.14 misho 1673: e_free(*psrv);
1.6 misho 1674: *psrv = NULL;
1.1 misho 1675: }
1676:
1677: /*
1.7 misho 1678: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1679: *
1.1 misho 1680: * @srv = RPC Server instance
1681: * return: -1 error or 0 ok, infinite loop ...
1682: */
1683: int
1.5 misho 1684: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 1685: {
1.10 misho 1686: rpc_cli_t *c;
1.1 misho 1687: register int i;
1.12 misho 1688: rpc_func_t *f;
1.10 misho 1689: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.1 misho 1690:
1691: if (!srv) {
1.10 misho 1692: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1.1 misho 1693: return -1;
1694: }
1695:
1.13 misho 1696: if (srv->srv_proto == SOCK_STREAM)
1.14 misho 1697: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1.13 misho 1698: LOGERR;
1699: return -1;
1700: }
1.1 misho 1701:
1.13 misho 1702: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1703: srv->srv_server.cli_sock, NULL, 0)) {
1.10 misho 1704: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1705: return -1;
1706: }
1.7 misho 1707:
1.10 misho 1708: schedPolling(srv->srv_root, &ts, NULL);
1.7 misho 1709: /* main rpc loop */
1.10 misho 1710: schedRun(srv->srv_root, &srv->srv_kill);
1711:
1712: /* close all clients connections & server socket */
1.14 misho 1713: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1714: c = array(srv->srv_clients, i, rpc_cli_t*);
1.10 misho 1715: if (c) {
1.24 misho 1716: if (srv->srv_proto == SOCK_STREAM) {
1717: shutdown(c->cli_sock, SHUT_RDWR);
1718: close(c->cli_sock);
1719: }
1.10 misho 1720:
1721: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.14 misho 1722: ait_freeVars(&RPC_RETVARS(c));
1.10 misho 1723: AIT_FREE_VAL(&c->cli_buf);
1.1 misho 1724: }
1.14 misho 1725: array_Del(srv->srv_clients, i, 42);
1.10 misho 1726: }
1.14 misho 1727: array_Destroy(&srv->srv_clients);
1.2 misho 1728:
1.25 misho 1729: if (srv->srv_proto != SOCK_EXT)
1730: close(srv->srv_server.cli_sock);
1.2 misho 1731:
1.10 misho 1732: /* detach exported calls */
1.12 misho 1733: RPC_FUNCS_LOCK(&srv->srv_funcs);
1734: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1735: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1.1 misho 1736:
1.10 misho 1737: AIT_FREE_VAL(&f->func_name);
1.14 misho 1738: e_free(f);
1.1 misho 1739: }
1.12 misho 1740: srv->srv_funcs.avlh_root = NULL;
1741: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1.1 misho 1742:
1743: return 0;
1744: }
1745:
1746:
1747: /*
1748: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 1749: *
1.10 misho 1750: * @cli = RPC client
1.1 misho 1751: * @rpc = IN RPC call structure
1.10 misho 1752: * @funcname = Execute RPC function
1.5 misho 1753: * @args = IN RPC calling arguments from RPC client
1.1 misho 1754: * return: -1 error, !=-1 ok
1755: */
1756: int
1.10 misho 1757: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1758: ait_val_t funcname, array_t * __restrict args)
1.1 misho 1759: {
1760: rpc_callback_t func;
1761:
1.10 misho 1762: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1.7 misho 1763: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 1764: return -1;
1765: }
1766:
1.10 misho 1767: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1768: return func(cli, rpc, args);
1.1 misho 1769: }
1.24 misho 1770:
1771:
1772: /*
1773: * rpc_srv_initServer2() - Init & create layer2 RPC Server
1774: *
1775: * @InstID = Instance for authentication & recognition
1776: * @concurentClients = Concurent clients at same time to this server
1777: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1778: * @csIface = Interface name for bind server, if NULL first interface on host
1779: * return: NULL == error or !=NULL bind and created RPC server instance
1780: */
1781: rpc_srv_t *
1782: rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
1783: {
1784: int n = 1;
1785: rpc_srv_t *srv = NULL;
1786: sockaddr_t sa = E_SOCKADDR_INIT;
1787: char szIface[64], szStr[STRSIZ];
1788: register int i;
1789: struct ifreq ifr;
1790: struct bpf_insn insns[] = {
1791: BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
1792: BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
1793: BPF_STMT(BPF_RET + BPF_K, -1),
1794: BPF_STMT(BPF_RET + BPF_K, 0),
1795: };
1796: struct bpf_program fcode = {
1797: .bf_len = sizeof(insns) / sizeof(struct bpf_insn),
1798: .bf_insns = insns
1799: };
1800:
1801: if (!concurentClients) {
1802: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1803: return NULL;
1804: }
1805: if (!csIface) {
1806: if (e_get1stiface(szIface, sizeof szIface))
1807: return NULL;
1808: } else
1809: strlcpy(szIface, csIface, sizeof szIface);
1810: if (!e_getifacebyname(szIface, &sa))
1811: return NULL;
1812:
1813: #ifdef HAVE_SRANDOMDEV
1814: srandomdev();
1815: #else
1816: time_t tim;
1817:
1818: srandom((time(&tim) ^ getpid()));
1819: #endif
1820:
1821: srv = e_malloc(sizeof(rpc_srv_t));
1822: if (!srv) {
1823: LOGERR;
1824: return NULL;
1825: } else
1826: memset(srv, 0, sizeof(rpc_srv_t));
1827:
1828: srv->srv_proto = SOCK_BPF;
1829: srv->srv_netbuf = netBuf;
1830: srv->srv_session.sess_version = RPC_VERSION;
1831: srv->srv_session.sess_instance = InstID;
1832:
1833: srv->srv_server.cli_parent = srv;
1834: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1835:
1836: /* init functions */
1837: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1838: SLIST_INIT(&srv->srv_funcs);
1839: AVL_INIT(&srv->srv_funcs);
1840:
1841: /* init scheduler */
1842: srv->srv_root = schedBegin();
1843: if (!srv->srv_root) {
1844: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1845: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1846: e_free(srv);
1847: return NULL;
1848: }
1849:
1850: /* init pool for clients */
1851: srv->srv_clients = array_Init(concurentClients);
1852: if (!srv->srv_clients) {
1853: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1854: schedEnd(&srv->srv_root);
1855: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1856: e_free(srv);
1857: return NULL;
1858: }
1859:
1860: /* create server handler */
1861: for (i = 0; i < 10; i++) {
1862: memset(szStr, 0, sizeof szStr);
1863: snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
1864: srv->srv_server.cli_sock = open(szStr, O_RDWR);
1865: if (srv->srv_server.cli_sock > STDERR_FILENO)
1866: break;
1867: }
1868: if (srv->srv_server.cli_sock < 3) {
1869: LOGERR;
1870: array_Destroy(&srv->srv_clients);
1871: schedEnd(&srv->srv_root);
1872: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1873: e_free(srv);
1874: return NULL;
1875: }
1876:
1877: if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
1878: LOGERR;
1879: goto err;
1880: }
1881: if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
1882: LOGERR;
1883: goto err;
1884: }
1885: n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
1886: if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
1887: LOGERR;
1888: goto err;
1889: } else
1890: srv->srv_netbuf = n;
1891:
1892: memset(&ifr, 0, sizeof ifr);
1893: strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
1894: if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
1895: LOGERR;
1896: goto err;
1897: } else
1898: fcntl(srv->srv_server.cli_sock, F_SETFL,
1899: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1900:
1901: rpc_register_srvPing(srv);
1902:
1903: return srv;
1904: err: /* error condition */
1905: close(srv->srv_server.cli_sock);
1906: array_Destroy(&srv->srv_clients);
1907: schedEnd(&srv->srv_root);
1908: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1909: e_free(srv);
1910: return NULL;
1911: }
1.25 misho 1912:
1913: /*
1914: * rpc_srv_initServerExt() - Init & create pipe RPC Server
1915: *
1916: * @InstID = Instance for authentication & recognition
1917: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1918: * @fd = File descriptor
1919: * return: NULL == error or !=NULL bind and created RPC server instance
1920: */
1921: rpc_srv_t *
1922: rpc_srv_initServerExt(u_char InstID, int netBuf, int fd)
1923: {
1924: rpc_srv_t *srv = NULL;
1925:
1926: #ifdef HAVE_SRANDOMDEV
1927: srandomdev();
1928: #else
1929: time_t tim;
1930:
1931: srandom((time(&tim) ^ getpid()));
1932: #endif
1933:
1934: srv = e_malloc(sizeof(rpc_srv_t));
1935: if (!srv) {
1936: LOGERR;
1937: return NULL;
1938: } else
1939: memset(srv, 0, sizeof(rpc_srv_t));
1940:
1941: srv->srv_proto = SOCK_EXT;
1942: srv->srv_netbuf = (netBuf < RPC_MIN_BUFSIZ) ?
1943: getpagesize() : E_ALIGN(netBuf, 2);
1944: srv->srv_session.sess_version = RPC_VERSION;
1945: srv->srv_session.sess_instance = InstID;
1946:
1947: srv->srv_server.cli_parent = srv;
1948: srv->srv_server.cli_sock = fd;
1949:
1950: /* init functions */
1951: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1952: SLIST_INIT(&srv->srv_funcs);
1953: AVL_INIT(&srv->srv_funcs);
1954:
1955: /* init scheduler */
1956: srv->srv_root = schedBegin();
1957: if (!srv->srv_root) {
1958: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1959: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1960: e_free(srv);
1961: return NULL;
1962: }
1963:
1964: /* init pool for clients */
1965: srv->srv_clients = array_Init(1);
1966: if (!srv->srv_clients) {
1967: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1968: schedEnd(&srv->srv_root);
1969: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1970: e_free(srv);
1971: return NULL;
1972: }
1973:
1974: fcntl(srv->srv_server.cli_sock, F_SETFL,
1975: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1976:
1977: rpc_register_srvPing(srv);
1978:
1979: return srv;
1980: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>