Annotation of libaitrpc/src/srv.c, revision 1.26.2.5
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.26.2.5! misho 6: * $Id: srv.c,v 1.26.2.4 2015/06/24 23:02:28 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.25 misho 15: Copyright 2004 - 2015
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.13 misho 49: /* SOCK_STREAM */
50: static void *acceptClients(sched_task_t *);
51: static void *closeClient(sched_task_t *);
52: static void *rxPacket(sched_task_t *);
53: static void *txPacket(sched_task_t *);
54:
55: /* SOCK_DGRAM */
56: static void *freeClient(sched_task_t *);
57: static void *rxUDPPacket(sched_task_t *);
58: static void *txUDPPacket(sched_task_t *);
59:
60: /* SOCK_RAW */
1.26 misho 61: static void *rxRAWPacket(sched_task_t *);
62: static void *txRAWPacket(sched_task_t *);
1.13 misho 63:
1.24 misho 64: /* SOCK_BPF */
65: static void *rxBPFPacket(sched_task_t *);
66: static void *txBPFPacket(sched_task_t *);
67:
1.25 misho 68: /* SOCK_EXT */
69: static void *rxEXTPacket(sched_task_t *);
70: static void *txEXTPacket(sched_task_t *);
71:
72: static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
1.13 misho 73: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
74: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
75: { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket }, /* SOCK_DGRAM */
1.26 misho 76: { rxRAWPacket, freeClient, rxRAWPacket, txRAWPacket }, /* SOCK_RAW */
1.25 misho 77: { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket }, /* SOCK_BPF */
78: { rxEXTPacket, freeClient, rxEXTPacket, txEXTPacket } /* SOCK_EXT */
1.13 misho 79: };
80:
1.23 misho 81: /* Global Signal Argument when kqueue support disabled */
82:
83: static volatile uintptr_t _glSigArg = 0;
84:
1.13 misho 85:
1.16 misho 86: void
1.13 misho 87: rpc_freeCli(rpc_cli_t * __restrict c)
1.10 misho 88: {
89: rpc_srv_t *s = c->cli_parent;
90:
1.13 misho 91: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 92:
93: /* free buffer */
94: AIT_FREE_VAL(&c->cli_buf);
95:
1.14 misho 96: array_Del(s->srv_clients, c->cli_id, 0);
1.10 misho 97: if (c)
1.14 misho 98: e_free(c);
1.13 misho 99: }
100:
101:
102: static inline int
1.14 misho 103: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 104: {
105: rpc_cli_t *c = NULL;
106: register int i;
107:
108: /* check free slots for connect */
1.14 misho 109: for (i = 0; i < array_Size(srv->srv_clients) &&
110: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
1.13 misho 111: /* check for duplicates */
1.14 misho 112: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
1.13 misho 113: break;
1.14 misho 114: if (i >= array_Size(srv->srv_clients))
1.13 misho 115: return -1; /* no more free slots! */
116:
117: return i;
118: }
119:
120: static rpc_cli_t *
1.14 misho 121: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 122: {
123: rpc_cli_t *c = NULL;
124: int n;
125:
1.25 misho 126: if (srv->srv_proto != SOCK_EXT)
127: n = _check4freeslot(srv, sa);
128: else
129: n = 0;
1.13 misho 130: if (n == -1)
131: return NULL;
132: else
1.14 misho 133: c = array(srv->srv_clients, n, rpc_cli_t*);
1.13 misho 134:
135: if (!c) {
1.14 misho 136: c = e_malloc(sizeof(rpc_cli_t));
1.13 misho 137: if (!c) {
138: LOGERR;
139: srv->srv_kill = 1;
140: return NULL;
141: } else {
142: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 143: array_Set(srv->srv_clients, n, c);
1.13 misho 144: c->cli_id = n;
145: c->cli_parent = srv;
146: }
147:
148: /* alloc empty buffer */
1.14 misho 149: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.13 misho 150: }
151:
152: return c;
153: }
154:
155:
156: static void *
157: freeClient(sched_task_t *task)
158: {
159: rpc_freeCli(TASK_ARG(task));
160:
161: return NULL;
162: }
163:
164: static void *
165: closeClient(sched_task_t *task)
166: {
167: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
168:
169: rpc_freeCli(TASK_ARG(task));
170:
171: /* close client socket */
172: shutdown(sock, SHUT_RDWR);
173: close(sock);
1.10 misho 174: return NULL;
175: }
1.7 misho 176:
177: static void *
178: txPacket(sched_task_t *task)
179: {
180: rpc_cli_t *c = TASK_ARG(task);
181: rpc_srv_t *s = c->cli_parent;
182: rpc_func_t *f = NULL;
1.18 misho 183: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 184: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.18 misho 185: int ret, estlen, wlen = sizeof(struct tagRPCCall);
1.19 misho 186: struct pollfd pfd;
1.21 misho 187: #ifdef TCP_SESSION_TIMEOUT
188: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
189:
190: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
191: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
192: TASK_ARG(task), ts, TASK_ARG(task), 0);
193: #endif
1.7 misho 194:
195: if (rpc->call_argc) {
1.10 misho 196: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1.7 misho 197: if (!f) {
1.10 misho 198: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18 misho 199:
1.7 misho 200: rpc->call_argc ^= rpc->call_argc;
201: rpc->call_rep.ret = RPC_ERROR(-1);
202: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
203: } else {
1.18 misho 204: /* calc estimated length */
205: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
206: if (estlen > AIT_LEN(&c->cli_buf))
207: AIT_RE_BUF(&c->cli_buf, estlen);
208: buf = AIT_GET_BUF(&c->cli_buf);
1.19 misho 209: rpc = (struct tagRPCCall*) buf;
1.18 misho 210:
1.25 misho 211: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.7 misho 212: /* Go Encapsulate variables */
1.18 misho 213: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
214: RPC_RETVARS(c));
1.7 misho 215: if (ret == -1) {
216: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.19 misho 217:
1.7 misho 218: rpc->call_argc ^= rpc->call_argc;
219: rpc->call_rep.ret = RPC_ERROR(-1);
220: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
221: } else
222: wlen += ret;
223: }
224: }
225:
1.26.2.5! misho 226: /* Free return values */
! 227: ait_freeVars(&c->cli_vars);
! 228:
1.18 misho 229: rpc->call_len = htonl(wlen);
1.25 misho 230: rpc->call_io = RPC_ACK;
1.8 misho 231:
1.15 misho 232: #if 0
1.7 misho 233: /* calculate CRC */
234: rpc->call_crc ^= rpc->call_crc;
1.8 misho 235: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.15 misho 236: #endif
1.7 misho 237:
238: /* send reply */
1.19 misho 239: pfd.fd = TASK_FD(task);
240: pfd.events = POLLOUT;
241: for (; wlen > 0; wlen -= ret, buf += ret) {
242: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
243: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
244: if (ret)
245: LOGERR;
246: else
1.22 misho 247: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
1.19 misho 248: /* close connection */
249: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
250: TASK_ARG(task), 0, NULL, 0);
251: return NULL;
252: }
253: ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
254: if (ret == -1) {
255: /* close connection */
256: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
257: TASK_ARG(task), 0, NULL, 0);
258: return NULL;
259: }
1.10 misho 260: }
1.7 misho 261:
262: return NULL;
263: }
264:
265: static void *
266: execCall(sched_task_t *task)
267: {
268: rpc_cli_t *c = TASK_ARG(task);
269: rpc_srv_t *s = c->cli_parent;
270: rpc_func_t *f = NULL;
271: array_t *arr = NULL;
1.18 misho 272: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 273: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.25 misho 274: int argc = rpc->call_argc;
1.7 misho 275:
276: /* Go decapsulate variables ... */
1.8 misho 277: if (argc) {
1.14 misho 278: arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall),
1.18 misho 279: AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
1.7 misho 280: if (!arr) {
1.14 misho 281: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
1.18 misho 282:
1.7 misho 283: rpc->call_argc ^= rpc->call_argc;
284: rpc->call_rep.ret = RPC_ERROR(-1);
285: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
286: return NULL;
287: }
1.10 misho 288: } else
289: arr = NULL;
1.7 misho 290:
1.10 misho 291: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
1.7 misho 292: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18 misho 293:
1.7 misho 294: rpc->call_argc ^= rpc->call_argc;
295: rpc->call_rep.ret = RPC_ERROR(-1);
296: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
297: } else {
1.8 misho 298: /* if client doesn't want reply */
1.10 misho 299: argc = RPC_CHK_NOREPLY(rpc);
300: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
1.7 misho 301: if (rpc->call_rep.ret == htonl(-1)) {
1.20 misho 302: if (!rpc->call_rep.eno) {
303: LOGERR;
304: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
305: }
1.7 misho 306: rpc->call_argc ^= rpc->call_argc;
1.26.2.5! misho 307: ait_freeVars(&c->cli_vars);
1.7 misho 308: } else {
309: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8 misho 310: if (argc) {
311: /* without reply */
1.14 misho 312: ait_freeVars(&c->cli_vars);
1.8 misho 313: rpc->call_argc ^= rpc->call_argc;
1.10 misho 314: } else {
315: /* reply */
1.25 misho 316: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.10 misho 317: }
1.7 misho 318: }
319: }
320:
1.14 misho 321: array_Destroy(&arr);
1.7 misho 322: return NULL;
323: }
324:
325: static void *
326: rxPacket(sched_task_t *task)
327: {
328: rpc_cli_t *c = TASK_ARG(task);
329: rpc_srv_t *s = c->cli_parent;
1.18 misho 330: int len, rlen, noreply, estlen;
1.15 misho 331: #if 0
332: u_short crc;
333: #endif
1.10 misho 334: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.18 misho 335: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.19 misho 336: struct pollfd pfd;
1.21 misho 337: #ifdef TCP_SESSION_TIMEOUT
338: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
339:
340: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
341: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
342: TASK_ARG(task), ts, TASK_ARG(task), 0);
343: #endif
1.7 misho 344:
1.18 misho 345: memset(buf, 0, sizeof(struct tagRPCCall));
1.19 misho 346: rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
1.18 misho 347: if (rlen < sizeof(struct tagRPCCall)) {
1.10 misho 348: /* close connection */
1.13 misho 349: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
350: TASK_ARG(task), 0, NULL, 0);
1.7 misho 351: return NULL;
1.10 misho 352: } else {
1.18 misho 353: estlen = ntohl(rpc->call_len);
354: if (estlen > AIT_LEN(&c->cli_buf))
355: AIT_RE_BUF(&c->cli_buf, estlen);
356: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1.19 misho 357: buf = AIT_GET_BUF(&c->cli_buf);
358: len = estlen;
1.18 misho 359: }
360:
1.19 misho 361: /* get next part of packet */
362: memset(buf, 0, len);
363: pfd.fd = TASK_FD(task);
364: pfd.events = POLLIN | POLLPRI;
365: for (; len > 0; len -= rlen, buf += rlen) {
366: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
367: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
368: if (rlen)
369: LOGERR;
370: else
1.22 misho 371: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
1.19 misho 372: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
373: TASK_ARG(task), 0, NULL, 0);
374: return NULL;
375: }
1.18 misho 376: rlen = recv(TASK_FD(task), buf, len, 0);
377: if (rlen == -1) {
378: /* close connection */
379: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
380: TASK_ARG(task), 0, NULL, 0);
1.8 misho 381: return NULL;
1.10 misho 382: }
1.18 misho 383: }
1.22 misho 384: len = estlen;
1.8 misho 385:
1.25 misho 386: /* skip loop packet */
387: if (rpc->call_io & RPC_ACK) {
388: schedReadSelf(task);
389: return NULL;
390: }
391:
1.15 misho 392: #if 0
1.18 misho 393: /* check integrity of packet */
394: crc = ntohs(rpc->call_crc);
395: rpc->call_crc ^= rpc->call_crc;
396: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
397: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
398: return NULL;
399: }
1.15 misho 400: #endif
1.7 misho 401:
1.18 misho 402: noreply = RPC_CHK_NOREPLY(rpc);
1.7 misho 403:
1.18 misho 404: /* check RPC packet session info */
405: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
406: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.7 misho 407:
1.18 misho 408: rpc->call_argc ^= rpc->call_argc;
409: rpc->call_rep.ret = RPC_ERROR(-1);
410: rpc->call_rep.eno = RPC_ERROR(errno);
411: } else {
412: /* execute RPC call */
413: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
414: }
1.7 misho 415:
1.18 misho 416: /* send RPC reply */
417: if (!noreply)
418: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
419: TASK_ARG(task), TASK_FD(task), rpc, len);
1.7 misho 420:
421: /* lets get next packet */
1.18 misho 422: schedReadSelf(task);
1.7 misho 423: return NULL;
424: }
425:
1.1 misho 426: static void *
1.10 misho 427: acceptClients(sched_task_t *task)
1.1 misho 428: {
1.10 misho 429: rpc_srv_t *srv = TASK_ARG(task);
430: rpc_cli_t *c = NULL;
1.14 misho 431: socklen_t salen = sizeof(sockaddr_t);
1.21 misho 432: int sock;
433: #ifdef TCP_SESSION_TIMEOUT
434: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
435: #endif
1.7 misho 436:
1.13 misho 437: c = _allocClient(srv, NULL);
1.21 misho 438: if (!c) {
439: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
440: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
441: shutdown(sock, SHUT_RDWR);
442: close(sock);
443: }
1.10 misho 444: goto end;
1.21 misho 445: }
1.10 misho 446:
447: /* accept client */
448: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
449: if (c->cli_sock == -1) {
450: LOGERR;
451: AIT_FREE_VAL(&c->cli_buf);
1.14 misho 452: array_Del(srv->srv_clients, c->cli_id, 42);
1.10 misho 453: goto end;
1.1 misho 454: } else
1.10 misho 455: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 456:
1.21 misho 457: #ifdef TCP_SESSION_TIMEOUT
458: /* armed timer for close stateless connection */
459: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
460: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c,
461: ts, c, 0);
462: #endif
1.13 misho 463: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
464: c->cli_sock, NULL, 0);
1.10 misho 465: end:
466: schedReadSelf(task);
467: return NULL;
468: }
1.5 misho 469:
1.7 misho 470:
1.10 misho 471: static void *
1.13 misho 472: txUDPPacket(sched_task_t *task)
1.10 misho 473: {
474: rpc_cli_t *c = TASK_ARG(task);
475: rpc_srv_t *s = c->cli_parent;
1.13 misho 476: rpc_func_t *f = NULL;
1.18 misho 477: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.13 misho 478: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.22 misho 479: int ret, estlen, wlen = sizeof(struct tagRPCCall);
1.13 misho 480: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.22 misho 481: struct pollfd pfd;
1.13 misho 482:
483: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
484: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
485: TASK_ARG(task), ts, TASK_ARG(task), 0);
486:
487: if (rpc->call_argc) {
488: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
489: if (!f) {
490: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
491: rpc->call_argc ^= rpc->call_argc;
492: rpc->call_rep.ret = RPC_ERROR(-1);
493: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
494: } else {
1.22 misho 495: /* calc estimated length */
496: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
497: if (estlen > AIT_LEN(&c->cli_buf))
498: AIT_RE_BUF(&c->cli_buf, estlen);
499: buf = AIT_GET_BUF(&c->cli_buf);
500: rpc = (struct tagRPCCall*) buf;
501:
1.25 misho 502: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.13 misho 503: /* Go Encapsulate variables */
1.18 misho 504: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
505: RPC_RETVARS(c));
1.13 misho 506: if (ret == -1) {
507: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
508: rpc->call_argc ^= rpc->call_argc;
509: rpc->call_rep.ret = RPC_ERROR(-1);
510: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
511: } else
512: wlen += ret;
513: }
514: }
1.7 misho 515:
1.26.2.5! misho 516: /* Free return values */
! 517: ait_freeVars(&c->cli_vars);
! 518:
1.18 misho 519: rpc->call_len = htonl(wlen);
1.25 misho 520: rpc->call_io = RPC_ACK;
1.7 misho 521:
1.13 misho 522: /* calculate CRC */
523: rpc->call_crc ^= rpc->call_crc;
524: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
525:
526: /* send reply */
1.22 misho 527: pfd.fd = TASK_FD(task);
528: pfd.events = POLLOUT;
529: for (; wlen > 0; wlen -= ret, buf += ret) {
530: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
531: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
532: if (ret)
533: LOGERR;
534: else
535: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
536: /* close connection */
537: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
538: TASK_ARG(task), 0, NULL, 0);
539: return NULL;
540: }
541: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
542: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
543: if (ret == -1) {
544: /* close connection */
545: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
546: TASK_ARG(task), 0, NULL, 0);
547: return NULL;
548: }
1.13 misho 549: }
550:
551: return NULL;
552: }
553:
554: static void *
555: rxUDPPacket(sched_task_t *task)
556: {
557: rpc_srv_t *srv = TASK_ARG(task);
558: rpc_cli_t *c = NULL;
1.22 misho 559: int len, rlen, noreply, estlen;
560: u_short crc;
561: u_char *buf, b[sizeof(struct tagRPCCall)];
562: struct tagRPCCall *rpc = (struct tagRPCCall*) b;
1.14 misho 563: sockaddr_t sa;
564: socklen_t salen;
1.13 misho 565: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.22 misho 566: struct pollfd pfd;
1.13 misho 567:
568: /* receive connect packet */
1.14 misho 569: salen = sa.ss.ss_len = sizeof(sockaddr_t);
1.22 misho 570: rlen = recvfrom(TASK_FD(task), b, sizeof b, MSG_PEEK, &sa.sa, &salen);
571: if (rlen < sizeof(struct tagRPCCall)) {
572: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1.13 misho 573: goto end;
1.22 misho 574: }
1.13 misho 575:
576: c = _allocClient(srv, &sa);
1.22 misho 577: if (!c) {
578: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
579: usleep(2000); /* blocked client delay */
1.13 misho 580: goto end;
1.22 misho 581: } else {
582: estlen = ntohl(rpc->call_len);
583: if (estlen > AIT_LEN(&c->cli_buf))
584: AIT_RE_BUF(&c->cli_buf, estlen);
585: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
586: buf = AIT_GET_BUF(&c->cli_buf);
587: len = estlen;
588:
1.13 misho 589: c->cli_sock = TASK_FD(task);
590: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
1.22 misho 591:
1.13 misho 592: /* armed timer for close stateless connection */
593: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
594: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
595: c, ts, c, 0);
596: }
597:
1.22 misho 598: /* get next part of packet */
599: memset(buf, 0, len);
600: pfd.fd = TASK_FD(task);
601: pfd.events = POLLIN | POLLPRI;
602: for (; len > 0; len -= rlen, buf += rlen) {
603: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
604: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
605: if (rlen)
606: LOGERR;
607: else
608: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
609: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
610: c, 0, NULL, 0);
1.24 misho 611: goto end;
1.13 misho 612: }
1.22 misho 613: salen = sa.ss.ss_len = sizeof(sockaddr_t);
614: rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);
615: if (rlen == -1) {
616: /* close connection */
617: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
618: c, 0, NULL, 0);
1.24 misho 619: goto end;
1.13 misho 620: }
1.22 misho 621: if (e_addrcmp(&c->cli_sa, &sa, 42))
622: rlen ^= rlen; /* skip if arrive from different address */
623: }
624: len = estlen;
1.13 misho 625:
1.25 misho 626: /* skip loop packet */
627: if (rpc->call_io & RPC_ACK)
628: goto end;
629:
1.22 misho 630: /* check integrity of packet */
631: crc = ntohs(rpc->call_crc);
632: rpc->call_crc ^= rpc->call_crc;
633: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
634: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
1.24 misho 635: /* close connection */
636: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
637: c, 0, NULL, 0);
638: goto end;
639: }
640:
641: noreply = RPC_CHK_NOREPLY(rpc);
642:
643: /* check RPC packet session info */
644: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
645: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
646:
647: rpc->call_argc ^= rpc->call_argc;
648: rpc->call_rep.ret = RPC_ERROR(-1);
649: rpc->call_rep.eno = RPC_ERROR(errno);
650: } else {
651: /* execute RPC call */
652: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
653: }
654:
655: /* send RPC reply */
656: if (!noreply)
657: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
658: c, TASK_FD(task), rpc, len);
659: end:
660: schedReadSelf(task);
661: return NULL;
662: }
663:
664:
665: static void *
1.26 misho 666: txRAWPacket(sched_task_t *task)
667: {
668: rpc_cli_t *c = TASK_ARG(task);
669: rpc_srv_t *s = c->cli_parent;
670: rpc_func_t *f = NULL;
671: u_char *buf = AIT_GET_BUF(&c->cli_buf);
672: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
673: int ret, estlen, wlen = sizeof(struct tagRPCCall);
674: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
675:
676: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
677: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
678: TASK_ARG(task), ts, TASK_ARG(task), 0);
679:
680: if (rpc->call_argc) {
681: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
682: if (!f) {
683: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
684: rpc->call_argc ^= rpc->call_argc;
685: rpc->call_rep.ret = RPC_ERROR(-1);
686: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
687: } else {
688: /* calc estimated length */
689: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
690: if (estlen > AIT_LEN(&c->cli_buf))
691: AIT_RE_BUF(&c->cli_buf, estlen);
692: buf = AIT_GET_BUF(&c->cli_buf);
693: rpc = (struct tagRPCCall*) buf;
694:
695: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
696: /* Go Encapsulate variables */
697: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
698: RPC_RETVARS(c));
699: if (ret == -1) {
700: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
701: rpc->call_argc ^= rpc->call_argc;
702: rpc->call_rep.ret = RPC_ERROR(-1);
703: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
704: } else
705: wlen += ret;
706: }
707: }
708:
1.26.2.5! misho 709: /* Free return values */
! 710: ait_freeVars(&c->cli_vars);
! 711:
1.26 misho 712: rpc->call_len = htonl(wlen);
713: rpc->call_io = RPC_ACK;
714:
715: /* calculate CRC */
716: rpc->call_crc ^= rpc->call_crc;
717: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
718:
719: /* send reply */
1.26.2.1 misho 720: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
721: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
722: if (ret == -1) {
723: /* close connection */
724: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
725: TASK_ARG(task), 0, NULL, 0);
726: return NULL;
1.26 misho 727: }
728:
729: return NULL;
730: }
731:
732: static void *
733: rxRAWPacket(sched_task_t *task)
734: {
735: rpc_srv_t *srv = TASK_ARG(task);
736: rpc_cli_t *c = NULL;
1.26.2.1 misho 737: int len, rlen, noreply;
1.26 misho 738: u_short crc;
739: struct tagRPCCall *rpc;
740: sockaddr_t sa;
741: socklen_t salen;
742: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.26.2.1 misho 743: ait_val_t b = AIT_VAL_INIT;
1.26 misho 744:
745: /* receive connect packet */
1.26.2.1 misho 746: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1.26 misho 747: salen = sa.ss.ss_len = sizeof(sockaddr_t);
1.26.2.1 misho 748: rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
749: if (sa.sa.sa_family == AF_INET) {
750: struct ip *h;
751: h = (struct ip*) AIT_GET_BUF(&b);
752: if (rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC)
753: goto end;
754: else {
755: rlen -= sizeof(struct ip);
756: rpc = (struct tagRPCCall*)
757: (AIT_GET_BUF(&b) + sizeof(struct ip));
758: }
759: } else {
760: struct ip6_hdr *h;
761: h = (struct ip6_hdr*) AIT_GET_BUF(&b);
762: if (rlen < (ntohs(h->ip6_plen) + sizeof(struct ip6_hdr)) ||
763: h->ip6_nxt != IPPROTO_ERPC)
764: goto end;
765: else {
766: rlen -= sizeof(struct ip6_hdr);
767: rpc = (struct tagRPCCall*)
768: (AIT_GET_BUF(&b) + sizeof(struct ip6_hdr));
769: }
770: }
771: if (rlen < sizeof(struct tagRPCCall)) {
1.26 misho 772: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
773: goto end;
774: }
775:
1.26.2.1 misho 776: /* skip loop packet */
777: if (rpc->call_io & RPC_ACK)
778: goto end;
779:
1.26 misho 780: c = _allocClient(srv, &sa);
781: if (!c) {
782: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
783: usleep(2000); /* blocked client delay */
784: goto end;
785: } else {
1.26.2.1 misho 786: len = ntohl(rpc->call_len);
787: if (len > AIT_LEN(&c->cli_buf))
788: AIT_RE_BUF(&c->cli_buf, len);
789: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
790: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1.26 misho 791:
792: c->cli_sock = TASK_FD(task);
793: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
794:
795: /* armed timer for close stateless connection */
796: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
797: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
798: c, ts, c, 0);
799: }
800:
801: /* check integrity of packet */
802: crc = ntohs(rpc->call_crc);
803: rpc->call_crc ^= rpc->call_crc;
804: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
805: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
806: /* close connection */
807: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
808: c, 0, NULL, 0);
809: goto end;
810: }
811:
812: noreply = RPC_CHK_NOREPLY(rpc);
813:
814: /* check RPC packet session info */
815: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
816: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
817:
818: rpc->call_argc ^= rpc->call_argc;
819: rpc->call_rep.ret = RPC_ERROR(-1);
820: rpc->call_rep.eno = RPC_ERROR(errno);
821: } else {
822: /* execute RPC call */
823: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
824: }
825:
826: /* send RPC reply */
827: if (!noreply)
828: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
829: c, TASK_FD(task), rpc, len);
830: end:
1.26.2.1 misho 831: AIT_FREE_VAL(&b);
1.26 misho 832: schedReadSelf(task);
833: return NULL;
834: }
835:
836:
837: static void *
1.24 misho 838: txBPFPacket(sched_task_t *task)
839: {
840: rpc_cli_t *c = TASK_ARG(task);
841: rpc_srv_t *s = c->cli_parent;
842: rpc_func_t *f = NULL;
843: u_char *buf = AIT_GET_BUF(&c->cli_buf);
844: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
845: int ret, len, wlen = sizeof(struct tagRPCCall);
846: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
847: struct ether_header *eh;
848: ait_val_t b = AIT_VAL_INIT;
849:
850: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
851: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
852: TASK_ARG(task), ts, TASK_ARG(task), 0);
853:
854: if (rpc->call_argc) {
855: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
856: if (!f) {
857: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
858: rpc->call_argc ^= rpc->call_argc;
859: rpc->call_rep.ret = RPC_ERROR(-1);
860: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
861: } else {
862: /* calc estimated length */
863: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
864: if (len > AIT_LEN(&c->cli_buf))
865: AIT_RE_BUF(&c->cli_buf, len);
866: buf = AIT_GET_BUF(&c->cli_buf);
867: rpc = (struct tagRPCCall*) buf;
868:
1.25 misho 869: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.24 misho 870: /* Go Encapsulate variables */
871: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
872: RPC_RETVARS(c));
873: if (ret == -1) {
874: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
875: rpc->call_argc ^= rpc->call_argc;
876: rpc->call_rep.ret = RPC_ERROR(-1);
877: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
878: } else
879: wlen += ret;
880: }
881: }
882:
1.26.2.5! misho 883: /* Free return values */
! 884: ait_freeVars(&RPC_RETVARS(c));
! 885:
1.24 misho 886: rpc->call_len = htonl(wlen);
1.25 misho 887: rpc->call_io = RPC_ACK;
1.24 misho 888:
889: /* calculate CRC */
890: rpc->call_crc ^= rpc->call_crc;
891: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
892:
893: /* send reply */
894: AIT_SET_BUF(&b, NULL, MIN(wlen, s->srv_netbuf) + ETHER_HDR_LEN);
895: eh = (struct ether_header*) AIT_GET_BUF(&b);
896: memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
897: eh->ether_type = htons(RPC_DEFPORT);
898: memcpy(eh + 1, buf, MIN(wlen, s->srv_netbuf));
899:
900: ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
901: AIT_FREE_VAL(&b);
902: if (ret == -1) {
903: /* close connection */
904: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
905: TASK_ARG(task), 0, NULL, 0);
1.22 misho 906: return NULL;
907: }
1.13 misho 908:
1.24 misho 909: return NULL;
910: }
911:
912: static void *
913: rxBPFPacket(sched_task_t *task)
914: {
915: rpc_srv_t *srv = TASK_ARG(task);
916: rpc_cli_t *c = NULL;
917: int len, rlen, noreply;
918: u_short crc;
919: struct tagRPCCall *rpc;
920: sockaddr_t sa;
921: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
922: struct bpf_hdr *h;
923: struct ether_header *eh;
924: ait_val_t b = AIT_VAL_INIT;
925:
926: /* receive connect packet */
927: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
928: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
929: h = (struct bpf_hdr*) AIT_GET_BUF(&b);
930: rlen -= h->bh_hdrlen;
931: if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen ||
932: rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) {
933: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
934: goto end;
935: } else {
936: rlen = h->bh_caplen;
937: eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
938: rlen -= ETHER_HDR_LEN;
939: rpc = (struct tagRPCCall*) (eh + 1);
1.25 misho 940:
941: #if 0
942: /* skip loop packet */
943: if (rpc->call_io & RPC_ACK)
944: goto end;
945: #endif
946:
1.24 misho 947: if (eh->ether_type != ntohs(RPC_DEFPORT))
948: goto end;
949: else
950: e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
951: }
952:
953: c = _allocClient(srv, &sa);
954: if (!c) {
955: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
956: usleep(2000); /* blocked client delay */
957: goto end;
958: } else {
959: len = ntohl(rpc->call_len);
960: if (len > AIT_LEN(&c->cli_buf))
961: AIT_RE_BUF(&c->cli_buf, len);
1.26.2.1 misho 962: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
1.24 misho 963: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
964:
965: c->cli_sock = TASK_FD(task);
966: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
967:
968: /* armed timer for close stateless connection */
969: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
970: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
971: c, ts, c, 0);
972: }
973:
974: /* check integrity of packet */
975: crc = ntohs(rpc->call_crc);
976: rpc->call_crc ^= rpc->call_crc;
977: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
978: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
979: /* close connection */
980: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
981: c, 0, NULL, 0);
982: goto end;
983: }
984:
1.22 misho 985: noreply = RPC_CHK_NOREPLY(rpc);
1.18 misho 986:
1.22 misho 987: /* check RPC packet session info */
988: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
989: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.13 misho 990:
1.22 misho 991: rpc->call_argc ^= rpc->call_argc;
992: rpc->call_rep.ret = RPC_ERROR(-1);
993: rpc->call_rep.eno = RPC_ERROR(errno);
994: } else {
995: /* execute RPC call */
996: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
997: }
1.13 misho 998:
1.22 misho 999: /* send RPC reply */
1000: if (!noreply)
1.24 misho 1001: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1.22 misho 1002: c, TASK_FD(task), rpc, len);
1.13 misho 1003: end:
1.24 misho 1004: AIT_FREE_VAL(&b);
1.13 misho 1005: schedReadSelf(task);
1006: return NULL;
1007: }
1008:
1.25 misho 1009:
1010: static void *
1011: txEXTPacket(sched_task_t *task)
1012: {
1013: rpc_cli_t *c = TASK_ARG(task);
1014: rpc_srv_t *s = c->cli_parent;
1015: rpc_func_t *f = NULL;
1016: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1017: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1018: int ret, len, wlen = sizeof(struct tagRPCCall);
1019: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1020:
1021: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
1022: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
1023: TASK_ARG(task), ts, TASK_ARG(task), 0);
1024:
1025: if (rpc->call_argc) {
1026: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1027: if (!f) {
1028: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1029: rpc->call_argc ^= rpc->call_argc;
1030: rpc->call_rep.ret = RPC_ERROR(-1);
1031: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
1032: } else {
1033: /* calc estimated length */
1034: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
1035: if (len > AIT_LEN(&c->cli_buf))
1036: AIT_RE_BUF(&c->cli_buf, len);
1037: buf = AIT_GET_BUF(&c->cli_buf);
1038: rpc = (struct tagRPCCall*) buf;
1039:
1040: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1041: /* Go Encapsulate variables */
1042: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
1043: RPC_RETVARS(c));
1044: if (ret == -1) {
1045: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1046: rpc->call_argc ^= rpc->call_argc;
1047: rpc->call_rep.ret = RPC_ERROR(-1);
1048: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
1049: } else
1050: wlen += ret;
1051: }
1052: }
1053:
1.26.2.5! misho 1054: /* Free return values */
! 1055: ait_freeVars(&RPC_RETVARS(c));
! 1056:
1.25 misho 1057: rpc->call_len = htonl(wlen);
1058: rpc->call_io = RPC_ACK;
1059:
1060: /* send reply */
1061: ret = write(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf));
1062: if (ret == -1) {
1063: /* close connection */
1064: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
1065: TASK_ARG(task), 0, NULL, 0);
1066: return NULL;
1067: }
1068:
1069: return NULL;
1070: }
1071:
1072: static void *
1073: rxEXTPacket(sched_task_t *task)
1074: {
1075: rpc_srv_t *srv = TASK_ARG(task);
1076: rpc_cli_t *c = NULL;
1077: int len, rlen, noreply;
1078: struct tagRPCCall *rpc;
1079: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1080: ait_val_t b = AIT_VAL_INIT;
1081: sockaddr_t sa;
1082:
1083: memset(&sa, 0, sizeof sa);
1084: /* receive connect packet */
1085: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1086: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
1087: if (rlen < sizeof(struct tagRPCCall)) {
1088: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1089: goto end;
1090: } else {
1091: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
1092:
1093: /* skip loop packet */
1094: if (rpc->call_io & RPC_ACK)
1095: goto end;
1096: }
1097:
1098: c = _allocClient(srv, &sa);
1099: if (!c) {
1100: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
1101: usleep(2000); /* blocked client delay */
1102: goto end;
1103: } else {
1104: len = ntohl(rpc->call_len);
1105: if (len > AIT_LEN(&c->cli_buf))
1106: AIT_RE_BUF(&c->cli_buf, len);
1107: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, AIT_LEN(&c->cli_buf));
1108: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1109:
1110: c->cli_sock = TASK_FD(task);
1111:
1112: /* armed timer for close stateless connection */
1113: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
1114: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
1115: c, ts, c, 0);
1116: }
1117:
1118: noreply = RPC_CHK_NOREPLY(rpc);
1119:
1120: /* check RPC packet session info */
1121: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
1122: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1123:
1124: rpc->call_argc ^= rpc->call_argc;
1125: rpc->call_rep.ret = RPC_ERROR(-1);
1126: rpc->call_rep.eno = RPC_ERROR(errno);
1127: } else {
1128: /* execute RPC call */
1129: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
1130: }
1131:
1132: /* send RPC reply */
1133: if (!noreply)
1134: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1135: c, TASK_FD(task), rpc, len);
1136: end:
1137: AIT_FREE_VAL(&b);
1138: schedReadSelf(task);
1139: return NULL;
1140: }
1141:
1.13 misho 1142: /* ------------------------------------------------------ */
1143:
1.16 misho 1144: void
1.13 misho 1145: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
1146: {
1147: rpc_srv_t *s = c->cli_parent;
1148:
1149: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 1150:
1151: /* free buffer */
1152: AIT_FREE_VAL(&c->cli_buf);
1153:
1.14 misho 1154: array_Del(s->srv_blob.clients, c->cli_id, 0);
1.10 misho 1155: if (c)
1.14 misho 1156: e_free(c);
1.13 misho 1157: }
1158:
1159:
1160: static void *
1161: closeBLOBClient(sched_task_t *task)
1162: {
1163: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
1164:
1165: rpc_freeBLOBCli(TASK_ARG(task));
1166:
1167: /* close client socket */
1168: shutdown(sock, SHUT_RDWR);
1169: close(sock);
1.7 misho 1170: return NULL;
1171: }
1172:
1173: static void *
1174: txBLOB(sched_task_t *task)
1175: {
1.10 misho 1176: rpc_cli_t *c = TASK_ARG(task);
1177: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 1178: int wlen = sizeof(struct tagBLOBHdr);
1179:
1180: /* send reply */
1.10 misho 1181: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
1182: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
1183: /* close blob connection */
1184: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1185: }
1.7 misho 1186:
1187: return NULL;
1188: }
1.4 misho 1189:
1.7 misho 1190: static void *
1191: rxBLOB(sched_task_t *task)
1192: {
1193: rpc_cli_t *c = TASK_ARG(task);
1194: rpc_srv_t *s = c->cli_parent;
1195: rpc_blob_t *b;
1.10 misho 1196: struct tagBLOBHdr blob;
1.7 misho 1197: int rlen;
1198:
1.10 misho 1199: memset(&blob, 0, sizeof blob);
1200: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
1201: if (rlen < 1) {
1202: /* close blob connection */
1203: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1.7 misho 1204: return NULL;
1205: }
1.5 misho 1206:
1.10 misho 1207: /* check BLOB packet */
1208: if (rlen < sizeof(struct tagBLOBHdr)) {
1209: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1.6 misho 1210:
1.10 misho 1211: schedReadSelf(task);
1.7 misho 1212: return NULL;
1213: }
1.1 misho 1214:
1.7 misho 1215: /* check RPC packet session info */
1.15 misho 1216: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1.7 misho 1217: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.10 misho 1218: blob.hdr_cmd = error;
1.7 misho 1219: goto end;
1220: }
1221:
1222: /* Go to proceed packet ... */
1.10 misho 1223: switch (blob.hdr_cmd) {
1.7 misho 1224: case get:
1.10 misho 1225: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
1226: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
1227: blob.hdr_cmd = no;
1228: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 1229: break;
1230: } else
1.10 misho 1231: blob.hdr_len = htonl(b->blob_len);
1.5 misho 1232:
1.7 misho 1233: if (rpc_srv_blobMap(s, b) != -1) {
1234: /* deliver BLOB variable to client */
1.10 misho 1235: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1.7 misho 1236: rpc_srv_blobUnmap(b);
1237: } else {
1.10 misho 1238: blob.hdr_cmd = error;
1239: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 1240: }
1241: break;
1242: case set:
1.17 misho 1243: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
1244: ntohl(blob.hdr_ret)))) {
1.7 misho 1245: /* set new BLOB variable for reply :) */
1.10 misho 1246: blob.hdr_var = htonl(b->blob_var);
1.7 misho 1247:
1248: /* receive BLOB from client */
1.10 misho 1249: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1.7 misho 1250: rpc_srv_blobUnmap(b);
1.5 misho 1251: } else {
1.10 misho 1252: blob.hdr_cmd = error;
1253: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 1254: }
1255: break;
1256: case unset:
1.11 misho 1257: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1.10 misho 1258: blob.hdr_cmd = error;
1259: blob.hdr_ret = RPC_ERROR(-1);
1.1 misho 1260: }
1261: break;
1.7 misho 1262: default:
1.10 misho 1263: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
1264: blob.hdr_cmd = error;
1265: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 1266: }
1.1 misho 1267:
1.7 misho 1268: end:
1.10 misho 1269: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
1270: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
1271: schedReadSelf(task);
1.7 misho 1272: return NULL;
1.2 misho 1273: }
1274:
1275: static void *
1.17 misho 1276: flushBLOB(sched_task_t *task)
1277: {
1.23 misho 1278: uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
1279: rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
1.17 misho 1280: rpc_blob_t *b, *tmp;
1281:
1282: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1283: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1284:
1285: rpc_srv_blobFree(srv, b);
1286: e_free(b);
1287: }
1288:
1.23 misho 1289: if (!schedSignalSelf(task)) {
1290: /* disabled kqueue support in libaitsched */
1291: struct sigaction sa;
1292:
1293: memset(&sa, 0, sizeof sa);
1294: sigemptyset(&sa.sa_mask);
1295: sa.sa_handler = (void (*)(int)) flushBLOB;
1296: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1297: sigaction(SIGFBLOB, &sa, NULL);
1298: }
1299:
1.17 misho 1300: return NULL;
1301: }
1302:
1303: static void *
1.10 misho 1304: acceptBLOBClients(sched_task_t *task)
1.2 misho 1305: {
1.10 misho 1306: rpc_srv_t *srv = TASK_ARG(task);
1307: rpc_cli_t *c = NULL;
1308: register int i;
1.14 misho 1309: socklen_t salen = sizeof(sockaddr_t);
1.21 misho 1310: int sock;
1.12 misho 1311: #ifdef TCP_NOPUSH
1312: int n = 1;
1313: #endif
1.7 misho 1314:
1.10 misho 1315: /* check free slots for connect */
1.14 misho 1316: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
1317: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1.21 misho 1318: if (c) { /* no more free slots! */
1319: EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
1320: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
1321: shutdown(sock, SHUT_RDWR);
1322: close(sock);
1323: }
1.10 misho 1324: goto end;
1.21 misho 1325: }
1326:
1.14 misho 1327: c = e_malloc(sizeof(rpc_cli_t));
1.10 misho 1328: if (!c) {
1.7 misho 1329: LOGERR;
1.10 misho 1330: srv->srv_kill = srv->srv_blob.kill = 1;
1.7 misho 1331: return NULL;
1332: } else {
1.10 misho 1333: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 1334: array_Set(srv->srv_blob.clients, i, c);
1.10 misho 1335: c->cli_id = i;
1336: c->cli_parent = srv;
1.7 misho 1337: }
1.4 misho 1338:
1.10 misho 1339: /* alloc empty buffer */
1.14 misho 1340: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.2 misho 1341:
1.10 misho 1342: /* accept client */
1343: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
1344: if (c->cli_sock == -1) {
1345: LOGERR;
1346: AIT_FREE_VAL(&c->cli_buf);
1.14 misho 1347: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 1348: goto end;
1.12 misho 1349: } else {
1350: #ifdef TCP_NOPUSH
1351: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
1352: #endif
1.10 misho 1353: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.12 misho 1354: }
1.2 misho 1355:
1.10 misho 1356: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
1357: end:
1358: schedReadSelf(task);
1.7 misho 1359: return NULL;
1.1 misho 1360: }
1361:
1.10 misho 1362: /* ------------------------------------------------------ */
1.1 misho 1363:
1364: /*
1.7 misho 1365: * rpc_srv_initBLOBServer() - Init & create BLOB Server
1366: *
1.4 misho 1367: * @srv = RPC server instance
1.2 misho 1368: * @Port = Port for bind server, if Port == 0 default port is selected
1369: * @diskDir = Disk place for BLOB file objects
1370: * return: -1 == error or 0 bind and created BLOB server instance
1371: */
1372: int
1373: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
1374: {
1375: int n = 1;
1376:
1.10 misho 1377: if (!srv || srv->srv_kill) {
1.7 misho 1378: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 1379: return -1;
1380: }
1381:
1382: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
1383: if (access(diskDir, R_OK | W_OK) == -1) {
1384: LOGERR;
1385: return -1;
1386: } else
1.7 misho 1387: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 1388:
1.10 misho 1389: /* init blob list */
1390: TAILQ_INIT(&srv->srv_blob.blobs);
1391:
1.2 misho 1392: srv->srv_blob.server.cli_parent = srv;
1.4 misho 1393:
1.14 misho 1394: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
1.10 misho 1395: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1.4 misho 1396: case AF_INET:
1.10 misho 1397: srv->srv_blob.server.cli_sa.sin.sin_port =
1398: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1.4 misho 1399: break;
1400: case AF_INET6:
1.10 misho 1401: srv->srv_blob.server.cli_sa.sin6.sin6_port =
1402: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1.4 misho 1403: break;
1404: case AF_LOCAL:
1.10 misho 1405: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
1406: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1.4 misho 1407: break;
1408: default:
1.7 misho 1409: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 1410: return -1;
1.2 misho 1411: }
1412:
1.4 misho 1413: /* create BLOB server socket */
1.6 misho 1414: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 1415: if (srv->srv_blob.server.cli_sock == -1) {
1416: LOGERR;
1.7 misho 1417: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1418: return -1;
1419: }
1420: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1421: LOGERR;
1422: close(srv->srv_blob.server.cli_sock);
1.7 misho 1423: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1424: return -1;
1425: }
1.5 misho 1426: n = srv->srv_netbuf;
1427: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1428: LOGERR;
1429: close(srv->srv_blob.server.cli_sock);
1.7 misho 1430: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 1431: return -1;
1432: }
1433: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1434: LOGERR;
1435: close(srv->srv_blob.server.cli_sock);
1.7 misho 1436: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 1437: return -1;
1438: }
1.6 misho 1439: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
1440: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 1441: LOGERR;
1442: close(srv->srv_blob.server.cli_sock);
1.7 misho 1443: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1444: return -1;
1.13 misho 1445: } else
1446: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
1447: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
1448:
1.2 misho 1449:
1.10 misho 1450: /* allocate pool for concurent blob clients */
1.14 misho 1451: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1.2 misho 1452: if (!srv->srv_blob.clients) {
1.14 misho 1453: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.2 misho 1454: close(srv->srv_blob.server.cli_sock);
1.7 misho 1455: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1456: return -1;
1.10 misho 1457: }
1.2 misho 1458:
1.10 misho 1459: /* init blob scheduler */
1460: srv->srv_blob.root = schedBegin();
1461: if (!srv->srv_blob.root) {
1462: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.14 misho 1463: array_Destroy(&srv->srv_blob.clients);
1.10 misho 1464: close(srv->srv_blob.server.cli_sock);
1465: AIT_FREE_VAL(&srv->srv_blob.dir);
1466: return -1;
1467: }
1.2 misho 1468:
1469: return 0;
1470: }
1471:
1472: /*
1.7 misho 1473: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
1474: *
1.2 misho 1475: * @srv = RPC Server instance
1476: * return: none
1477: */
1.16 misho 1478: void
1.2 misho 1479: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
1480: {
1.10 misho 1481: if (!srv)
1.2 misho 1482: return;
1483:
1.10 misho 1484: srv->srv_blob.kill = 1;
1.17 misho 1485:
1486: schedEnd(&srv->srv_blob.root);
1.26.2.4 misho 1487:
1488: if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL)
1489: unlink(srv->srv_blob.server.cli_sa.sun.sun_path);
1.2 misho 1490: }
1491:
1492: /*
1.10 misho 1493: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1.7 misho 1494: *
1.2 misho 1495: * @srv = RPC Server instance
1496: * return: -1 error or 0 ok, infinite loop ...
1497: */
1498: int
1.10 misho 1499: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1.2 misho 1500: {
1.10 misho 1501: rpc_cli_t *c;
1.2 misho 1502: register int i;
1.10 misho 1503: rpc_blob_t *b, *tmp;
1504: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.2 misho 1505:
1.10 misho 1506: if (!srv || srv->srv_kill) {
1.7 misho 1507: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 1508: return -1;
1509: }
1510:
1.14 misho 1511: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1.2 misho 1512: LOGERR;
1513: return -1;
1.10 misho 1514: }
1515:
1.23 misho 1516: if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
1517: /* disabled kqueue support in libaitsched */
1518: struct sigaction sa;
1519:
1520: atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
1521:
1522: memset(&sa, 0, sizeof sa);
1523: sigemptyset(&sa.sa_mask);
1524: sa.sa_handler = (void (*)(int)) flushBLOB;
1525: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1526: sigaction(SIGFBLOB, &sa, NULL);
1527: }
1528:
1.10 misho 1529: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
1530: srv->srv_blob.server.cli_sock, NULL, 0)) {
1531: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1532: return -1;
1533: }
1.2 misho 1534:
1.10 misho 1535: schedPolling(srv->srv_blob.root, &ts, NULL);
1536: /* main rpc loop */
1537: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1.7 misho 1538:
1.17 misho 1539: /* detach blobs */
1540: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1541: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1542:
1543: rpc_srv_blobFree(srv, b);
1544: e_free(b);
1545: }
1546:
1.10 misho 1547: /* close all clients connections & server socket */
1.14 misho 1548: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
1549: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1.10 misho 1550: if (c) {
1551: shutdown(c->cli_sock, SHUT_RDWR);
1552: close(c->cli_sock);
1553:
1554: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1555: AIT_FREE_VAL(&c->cli_buf);
1.2 misho 1556: }
1.14 misho 1557: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 1558: }
1.14 misho 1559: array_Destroy(&srv->srv_blob.clients);
1.2 misho 1560:
1.10 misho 1561: close(srv->srv_blob.server.cli_sock);
1.2 misho 1562:
1.10 misho 1563: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1564: return 0;
1565: }
1566:
1567:
1568: /*
1.7 misho 1569: * rpc_srv_initServer() - Init & create RPC Server
1570: *
1.15 misho 1571: * @InstID = Instance for authentication & recognition
1.1 misho 1572: * @concurentClients = Concurent clients at same time to this server
1.10 misho 1573: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 1574: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 1575: * @Port = Port for bind server, if Port == 0 default port is selected
1.13 misho 1576: * @proto = Protocol, if == 0 choose SOCK_STREAM
1.1 misho 1577: * return: NULL == error or !=NULL bind and created RPC server instance
1578: */
1579: rpc_srv_t *
1.15 misho 1580: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
1581: const char *csHost, u_short Port, int proto)
1.1 misho 1582: {
1.10 misho 1583: int n = 1;
1.1 misho 1584: rpc_srv_t *srv = NULL;
1.14 misho 1585: sockaddr_t sa = E_SOCKADDR_INIT;
1.1 misho 1586:
1.25 misho 1587: if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
1.10 misho 1588: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1.1 misho 1589: return NULL;
1590: }
1.26.2.3 misho 1591: if (!Port && proto < SOCK_RAW)
1.1 misho 1592: Port = RPC_DEFPORT;
1.26.2.2 misho 1593: if (!e_gethostbyname(csHost, Port, &sa))
1594: return NULL;
1.13 misho 1595: if (!proto)
1596: proto = SOCK_STREAM;
1.10 misho 1597: if (netBuf < RPC_MIN_BUFSIZ)
1.5 misho 1598: netBuf = BUFSIZ;
1.7 misho 1599: else
1.14 misho 1600: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1.10 misho 1601:
1602: #ifdef HAVE_SRANDOMDEV
1603: srandomdev();
1604: #else
1605: time_t tim;
1606:
1607: srandom((time(&tim) ^ getpid()));
1608: #endif
1.1 misho 1609:
1.14 misho 1610: srv = e_malloc(sizeof(rpc_srv_t));
1.1 misho 1611: if (!srv) {
1612: LOGERR;
1613: return NULL;
1614: } else
1615: memset(srv, 0, sizeof(rpc_srv_t));
1616:
1.13 misho 1617: srv->srv_proto = proto;
1.5 misho 1618: srv->srv_netbuf = netBuf;
1.1 misho 1619: srv->srv_session.sess_version = RPC_VERSION;
1.15 misho 1620: srv->srv_session.sess_instance = InstID;
1.1 misho 1621:
1622: srv->srv_server.cli_parent = srv;
1.10 misho 1623: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1624:
1.12 misho 1625: /* init functions */
1626: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1627: SLIST_INIT(&srv->srv_funcs);
1628: AVL_INIT(&srv->srv_funcs);
1.10 misho 1629:
1630: /* init scheduler */
1631: srv->srv_root = schedBegin();
1632: if (!srv->srv_root) {
1633: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.12 misho 1634: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1635: e_free(srv);
1.10 misho 1636: return NULL;
1637: }
1638:
1639: /* init pool for clients */
1.14 misho 1640: srv->srv_clients = array_Init(concurentClients);
1.10 misho 1641: if (!srv->srv_clients) {
1.14 misho 1642: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.10 misho 1643: schedEnd(&srv->srv_root);
1.12 misho 1644: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1645: e_free(srv);
1.10 misho 1646: return NULL;
1647: }
1.4 misho 1648:
1649: /* create server socket */
1.26 misho 1650: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family,
1651: srv->srv_proto, srv->srv_proto == SOCK_RAW ? IPPROTO_ERPC : 0);
1.1 misho 1652: if (srv->srv_server.cli_sock == -1) {
1653: LOGERR;
1.14 misho 1654: array_Destroy(&srv->srv_clients);
1.10 misho 1655: schedEnd(&srv->srv_root);
1.12 misho 1656: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1657: e_free(srv);
1.1 misho 1658: return NULL;
1659: }
1660: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1661: LOGERR;
1.10 misho 1662: goto err;
1.1 misho 1663: }
1.5 misho 1664: n = srv->srv_netbuf;
1665: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1666: LOGERR;
1.10 misho 1667: goto err;
1.5 misho 1668: }
1669: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1670: LOGERR;
1.10 misho 1671: goto err;
1.5 misho 1672: }
1.6 misho 1673: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
1674: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 1675: LOGERR;
1.10 misho 1676: goto err;
1.13 misho 1677: } else
1678: fcntl(srv->srv_server.cli_sock, F_SETFL,
1679: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 1680:
1.10 misho 1681: rpc_register_srvPing(srv);
1.8 misho 1682:
1.1 misho 1683: return srv;
1.10 misho 1684: err: /* error condition */
1685: close(srv->srv_server.cli_sock);
1.14 misho 1686: array_Destroy(&srv->srv_clients);
1.10 misho 1687: schedEnd(&srv->srv_root);
1.12 misho 1688: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1689: e_free(srv);
1.10 misho 1690: return NULL;
1.1 misho 1691: }
1692:
1693: /*
1.7 misho 1694: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1695: *
1.6 misho 1696: * @psrv = RPC Server instance
1.1 misho 1697: * return: none
1698: */
1.16 misho 1699: void
1.6 misho 1700: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 1701: {
1.10 misho 1702: if (!psrv || !*psrv)
1.1 misho 1703: return;
1704:
1.10 misho 1705: /* if send kill to blob server */
1.17 misho 1706: rpc_srv_endBLOBServer(*psrv);
1.1 misho 1707:
1.10 misho 1708: (*psrv)->srv_kill = 1;
1709: sleep(RPC_SCHED_POLLING);
1.2 misho 1710:
1.17 misho 1711: schedEnd(&(*psrv)->srv_root);
1712:
1.26.2.4 misho 1713: if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL)
1714: unlink((*psrv)->srv_server.cli_sa.sun.sun_path);
1715:
1.12 misho 1716: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1.14 misho 1717: e_free(*psrv);
1.6 misho 1718: *psrv = NULL;
1.1 misho 1719: }
1720:
1721: /*
1.7 misho 1722: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1723: *
1.1 misho 1724: * @srv = RPC Server instance
1725: * return: -1 error or 0 ok, infinite loop ...
1726: */
1727: int
1.5 misho 1728: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 1729: {
1.10 misho 1730: rpc_cli_t *c;
1.1 misho 1731: register int i;
1.12 misho 1732: rpc_func_t *f;
1.10 misho 1733: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.1 misho 1734:
1735: if (!srv) {
1.10 misho 1736: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1.1 misho 1737: return -1;
1738: }
1739:
1.13 misho 1740: if (srv->srv_proto == SOCK_STREAM)
1.14 misho 1741: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1.13 misho 1742: LOGERR;
1743: return -1;
1744: }
1.1 misho 1745:
1.13 misho 1746: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1747: srv->srv_server.cli_sock, NULL, 0)) {
1.10 misho 1748: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1749: return -1;
1750: }
1.7 misho 1751:
1.10 misho 1752: schedPolling(srv->srv_root, &ts, NULL);
1.7 misho 1753: /* main rpc loop */
1.10 misho 1754: schedRun(srv->srv_root, &srv->srv_kill);
1755:
1756: /* close all clients connections & server socket */
1.14 misho 1757: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1758: c = array(srv->srv_clients, i, rpc_cli_t*);
1.10 misho 1759: if (c) {
1.24 misho 1760: if (srv->srv_proto == SOCK_STREAM) {
1761: shutdown(c->cli_sock, SHUT_RDWR);
1762: close(c->cli_sock);
1763: }
1.10 misho 1764:
1765: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.14 misho 1766: ait_freeVars(&RPC_RETVARS(c));
1.10 misho 1767: AIT_FREE_VAL(&c->cli_buf);
1.1 misho 1768: }
1.14 misho 1769: array_Del(srv->srv_clients, i, 42);
1.10 misho 1770: }
1.14 misho 1771: array_Destroy(&srv->srv_clients);
1.2 misho 1772:
1.25 misho 1773: if (srv->srv_proto != SOCK_EXT)
1774: close(srv->srv_server.cli_sock);
1.2 misho 1775:
1.10 misho 1776: /* detach exported calls */
1.12 misho 1777: RPC_FUNCS_LOCK(&srv->srv_funcs);
1778: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1779: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1.1 misho 1780:
1.10 misho 1781: AIT_FREE_VAL(&f->func_name);
1.14 misho 1782: e_free(f);
1.1 misho 1783: }
1.12 misho 1784: srv->srv_funcs.avlh_root = NULL;
1785: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1.1 misho 1786:
1787: return 0;
1788: }
1789:
1790:
1791: /*
1792: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 1793: *
1.10 misho 1794: * @cli = RPC client
1.1 misho 1795: * @rpc = IN RPC call structure
1.10 misho 1796: * @funcname = Execute RPC function
1.5 misho 1797: * @args = IN RPC calling arguments from RPC client
1.1 misho 1798: * return: -1 error, !=-1 ok
1799: */
1800: int
1.10 misho 1801: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1802: ait_val_t funcname, array_t * __restrict args)
1.1 misho 1803: {
1804: rpc_callback_t func;
1805:
1.10 misho 1806: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1.7 misho 1807: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 1808: return -1;
1809: }
1810:
1.10 misho 1811: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1812: return func(cli, rpc, args);
1.1 misho 1813: }
1.24 misho 1814:
1815:
1816: /*
1817: * rpc_srv_initServer2() - Init & create layer2 RPC Server
1818: *
1819: * @InstID = Instance for authentication & recognition
1820: * @concurentClients = Concurent clients at same time to this server
1821: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1822: * @csIface = Interface name for bind server, if NULL first interface on host
1823: * return: NULL == error or !=NULL bind and created RPC server instance
1824: */
1825: rpc_srv_t *
1826: rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
1827: {
1828: int n = 1;
1829: rpc_srv_t *srv = NULL;
1830: sockaddr_t sa = E_SOCKADDR_INIT;
1831: char szIface[64], szStr[STRSIZ];
1832: register int i;
1833: struct ifreq ifr;
1834: struct bpf_insn insns[] = {
1835: BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
1836: BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
1837: BPF_STMT(BPF_RET + BPF_K, -1),
1838: BPF_STMT(BPF_RET + BPF_K, 0),
1839: };
1840: struct bpf_program fcode = {
1841: .bf_len = sizeof(insns) / sizeof(struct bpf_insn),
1842: .bf_insns = insns
1843: };
1844:
1845: if (!concurentClients) {
1846: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1847: return NULL;
1848: }
1849: if (!csIface) {
1850: if (e_get1stiface(szIface, sizeof szIface))
1851: return NULL;
1852: } else
1853: strlcpy(szIface, csIface, sizeof szIface);
1854: if (!e_getifacebyname(szIface, &sa))
1855: return NULL;
1856:
1857: #ifdef HAVE_SRANDOMDEV
1858: srandomdev();
1859: #else
1860: time_t tim;
1861:
1862: srandom((time(&tim) ^ getpid()));
1863: #endif
1864:
1865: srv = e_malloc(sizeof(rpc_srv_t));
1866: if (!srv) {
1867: LOGERR;
1868: return NULL;
1869: } else
1870: memset(srv, 0, sizeof(rpc_srv_t));
1871:
1872: srv->srv_proto = SOCK_BPF;
1873: srv->srv_netbuf = netBuf;
1874: srv->srv_session.sess_version = RPC_VERSION;
1875: srv->srv_session.sess_instance = InstID;
1876:
1877: srv->srv_server.cli_parent = srv;
1878: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1879:
1880: /* init functions */
1881: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1882: SLIST_INIT(&srv->srv_funcs);
1883: AVL_INIT(&srv->srv_funcs);
1884:
1885: /* init scheduler */
1886: srv->srv_root = schedBegin();
1887: if (!srv->srv_root) {
1888: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1889: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1890: e_free(srv);
1891: return NULL;
1892: }
1893:
1894: /* init pool for clients */
1895: srv->srv_clients = array_Init(concurentClients);
1896: if (!srv->srv_clients) {
1897: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1898: schedEnd(&srv->srv_root);
1899: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1900: e_free(srv);
1901: return NULL;
1902: }
1903:
1904: /* create server handler */
1905: for (i = 0; i < 10; i++) {
1906: memset(szStr, 0, sizeof szStr);
1907: snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
1908: srv->srv_server.cli_sock = open(szStr, O_RDWR);
1909: if (srv->srv_server.cli_sock > STDERR_FILENO)
1910: break;
1911: }
1912: if (srv->srv_server.cli_sock < 3) {
1913: LOGERR;
1914: array_Destroy(&srv->srv_clients);
1915: schedEnd(&srv->srv_root);
1916: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1917: e_free(srv);
1918: return NULL;
1919: }
1920:
1921: if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
1922: LOGERR;
1923: goto err;
1924: }
1925: if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
1926: LOGERR;
1927: goto err;
1928: }
1929: n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
1930: if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
1931: LOGERR;
1932: goto err;
1933: } else
1934: srv->srv_netbuf = n;
1935:
1936: memset(&ifr, 0, sizeof ifr);
1937: strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
1938: if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
1939: LOGERR;
1940: goto err;
1941: } else
1942: fcntl(srv->srv_server.cli_sock, F_SETFL,
1943: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1944:
1945: rpc_register_srvPing(srv);
1946:
1947: return srv;
1948: err: /* error condition */
1949: close(srv->srv_server.cli_sock);
1950: array_Destroy(&srv->srv_clients);
1951: schedEnd(&srv->srv_root);
1952: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1953: e_free(srv);
1954: return NULL;
1955: }
1.25 misho 1956:
1957: /*
1958: * rpc_srv_initServerExt() - Init & create pipe RPC Server
1959: *
1960: * @InstID = Instance for authentication & recognition
1961: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1962: * @fd = File descriptor
1963: * return: NULL == error or !=NULL bind and created RPC server instance
1964: */
1965: rpc_srv_t *
1966: rpc_srv_initServerExt(u_char InstID, int netBuf, int fd)
1967: {
1968: rpc_srv_t *srv = NULL;
1969:
1970: #ifdef HAVE_SRANDOMDEV
1971: srandomdev();
1972: #else
1973: time_t tim;
1974:
1975: srandom((time(&tim) ^ getpid()));
1976: #endif
1977:
1978: srv = e_malloc(sizeof(rpc_srv_t));
1979: if (!srv) {
1980: LOGERR;
1981: return NULL;
1982: } else
1983: memset(srv, 0, sizeof(rpc_srv_t));
1984:
1985: srv->srv_proto = SOCK_EXT;
1986: srv->srv_netbuf = (netBuf < RPC_MIN_BUFSIZ) ?
1987: getpagesize() : E_ALIGN(netBuf, 2);
1988: srv->srv_session.sess_version = RPC_VERSION;
1989: srv->srv_session.sess_instance = InstID;
1990:
1991: srv->srv_server.cli_parent = srv;
1992: srv->srv_server.cli_sock = fd;
1993:
1994: /* init functions */
1995: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1996: SLIST_INIT(&srv->srv_funcs);
1997: AVL_INIT(&srv->srv_funcs);
1998:
1999: /* init scheduler */
2000: srv->srv_root = schedBegin();
2001: if (!srv->srv_root) {
2002: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
2003: pthread_mutex_destroy(&srv->srv_funcs.mtx);
2004: e_free(srv);
2005: return NULL;
2006: }
2007:
2008: /* init pool for clients */
2009: srv->srv_clients = array_Init(1);
2010: if (!srv->srv_clients) {
2011: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
2012: schedEnd(&srv->srv_root);
2013: pthread_mutex_destroy(&srv->srv_funcs.mtx);
2014: e_free(srv);
2015: return NULL;
2016: }
2017:
2018: fcntl(srv->srv_server.cli_sock, F_SETFL,
2019: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
2020:
2021: rpc_register_srvPing(srv);
2022:
2023: return srv;
2024: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>