Annotation of libaitrpc/src/srv.c, revision 1.26.2.4
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.26.2.4! misho 6: * $Id: srv.c,v 1.26.2.3 2015/06/24 22:44:24 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.25 misho 15: Copyright 2004 - 2015
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.13 misho 49: /* SOCK_STREAM */
50: static void *acceptClients(sched_task_t *);
51: static void *closeClient(sched_task_t *);
52: static void *rxPacket(sched_task_t *);
53: static void *txPacket(sched_task_t *);
54:
55: /* SOCK_DGRAM */
56: static void *freeClient(sched_task_t *);
57: static void *rxUDPPacket(sched_task_t *);
58: static void *txUDPPacket(sched_task_t *);
59:
60: /* SOCK_RAW */
1.26 misho 61: static void *rxRAWPacket(sched_task_t *);
62: static void *txRAWPacket(sched_task_t *);
1.13 misho 63:
1.24 misho 64: /* SOCK_BPF */
65: static void *rxBPFPacket(sched_task_t *);
66: static void *txBPFPacket(sched_task_t *);
67:
1.25 misho 68: /* SOCK_EXT */
69: static void *rxEXTPacket(sched_task_t *);
70: static void *txEXTPacket(sched_task_t *);
71:
72: static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
1.13 misho 73: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
74: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
75: { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket }, /* SOCK_DGRAM */
1.26 misho 76: { rxRAWPacket, freeClient, rxRAWPacket, txRAWPacket }, /* SOCK_RAW */
1.25 misho 77: { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket }, /* SOCK_BPF */
78: { rxEXTPacket, freeClient, rxEXTPacket, txEXTPacket } /* SOCK_EXT */
1.13 misho 79: };
80:
1.23 misho 81: /* Global Signal Argument when kqueue support disabled */
82:
83: static volatile uintptr_t _glSigArg = 0;
84:
1.13 misho 85:
1.16 misho 86: void
1.13 misho 87: rpc_freeCli(rpc_cli_t * __restrict c)
1.10 misho 88: {
89: rpc_srv_t *s = c->cli_parent;
90:
1.13 misho 91: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 92:
93: /* free buffer */
94: AIT_FREE_VAL(&c->cli_buf);
95:
1.14 misho 96: array_Del(s->srv_clients, c->cli_id, 0);
1.10 misho 97: if (c)
1.14 misho 98: e_free(c);
1.13 misho 99: }
100:
101:
102: static inline int
1.14 misho 103: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 104: {
105: rpc_cli_t *c = NULL;
106: register int i;
107:
108: /* check free slots for connect */
1.14 misho 109: for (i = 0; i < array_Size(srv->srv_clients) &&
110: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
1.13 misho 111: /* check for duplicates */
1.14 misho 112: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
1.13 misho 113: break;
1.14 misho 114: if (i >= array_Size(srv->srv_clients))
1.13 misho 115: return -1; /* no more free slots! */
116:
117: return i;
118: }
119:
120: static rpc_cli_t *
1.14 misho 121: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 122: {
123: rpc_cli_t *c = NULL;
124: int n;
125:
1.25 misho 126: if (srv->srv_proto != SOCK_EXT)
127: n = _check4freeslot(srv, sa);
128: else
129: n = 0;
1.13 misho 130: if (n == -1)
131: return NULL;
132: else
1.14 misho 133: c = array(srv->srv_clients, n, rpc_cli_t*);
1.13 misho 134:
135: if (!c) {
1.14 misho 136: c = e_malloc(sizeof(rpc_cli_t));
1.13 misho 137: if (!c) {
138: LOGERR;
139: srv->srv_kill = 1;
140: return NULL;
141: } else {
142: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 143: array_Set(srv->srv_clients, n, c);
1.13 misho 144: c->cli_id = n;
145: c->cli_parent = srv;
146: }
147:
148: /* alloc empty buffer */
1.14 misho 149: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.13 misho 150: }
151:
152: return c;
153: }
154:
155:
156: static void *
157: freeClient(sched_task_t *task)
158: {
159: rpc_freeCli(TASK_ARG(task));
160:
161: return NULL;
162: }
163:
164: static void *
165: closeClient(sched_task_t *task)
166: {
167: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
168:
169: rpc_freeCli(TASK_ARG(task));
170:
171: /* close client socket */
172: shutdown(sock, SHUT_RDWR);
173: close(sock);
1.10 misho 174: return NULL;
175: }
1.7 misho 176:
177: static void *
178: txPacket(sched_task_t *task)
179: {
180: rpc_cli_t *c = TASK_ARG(task);
181: rpc_srv_t *s = c->cli_parent;
182: rpc_func_t *f = NULL;
1.18 misho 183: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 184: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.18 misho 185: int ret, estlen, wlen = sizeof(struct tagRPCCall);
1.19 misho 186: struct pollfd pfd;
1.21 misho 187: #ifdef TCP_SESSION_TIMEOUT
188: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
189:
190: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
191: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
192: TASK_ARG(task), ts, TASK_ARG(task), 0);
193: #endif
1.7 misho 194:
195: if (rpc->call_argc) {
1.10 misho 196: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1.7 misho 197: if (!f) {
1.10 misho 198: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18 misho 199:
1.7 misho 200: rpc->call_argc ^= rpc->call_argc;
201: rpc->call_rep.ret = RPC_ERROR(-1);
202: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
203: } else {
1.18 misho 204: /* calc estimated length */
205: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
206: if (estlen > AIT_LEN(&c->cli_buf))
207: AIT_RE_BUF(&c->cli_buf, estlen);
208: buf = AIT_GET_BUF(&c->cli_buf);
1.19 misho 209: rpc = (struct tagRPCCall*) buf;
1.18 misho 210:
1.25 misho 211: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.7 misho 212: /* Go Encapsulate variables */
1.18 misho 213: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
214: RPC_RETVARS(c));
1.10 misho 215: /* Free return values */
1.14 misho 216: ait_freeVars(&c->cli_vars);
1.7 misho 217: if (ret == -1) {
218: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.19 misho 219:
1.7 misho 220: rpc->call_argc ^= rpc->call_argc;
221: rpc->call_rep.ret = RPC_ERROR(-1);
222: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
223: } else
224: wlen += ret;
225: }
226: }
227:
1.18 misho 228: rpc->call_len = htonl(wlen);
1.25 misho 229: rpc->call_io = RPC_ACK;
1.8 misho 230:
1.15 misho 231: #if 0
1.7 misho 232: /* calculate CRC */
233: rpc->call_crc ^= rpc->call_crc;
1.8 misho 234: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.15 misho 235: #endif
1.7 misho 236:
237: /* send reply */
1.19 misho 238: pfd.fd = TASK_FD(task);
239: pfd.events = POLLOUT;
240: for (; wlen > 0; wlen -= ret, buf += ret) {
241: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
242: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
243: if (ret)
244: LOGERR;
245: else
1.22 misho 246: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
1.19 misho 247: /* close connection */
248: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
249: TASK_ARG(task), 0, NULL, 0);
250: return NULL;
251: }
252: ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
253: if (ret == -1) {
254: /* close connection */
255: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
256: TASK_ARG(task), 0, NULL, 0);
257: return NULL;
258: }
1.10 misho 259: }
1.7 misho 260:
261: return NULL;
262: }
263:
264: static void *
265: execCall(sched_task_t *task)
266: {
267: rpc_cli_t *c = TASK_ARG(task);
268: rpc_srv_t *s = c->cli_parent;
269: rpc_func_t *f = NULL;
270: array_t *arr = NULL;
1.18 misho 271: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 272: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.25 misho 273: int argc = rpc->call_argc;
1.7 misho 274:
275: /* Go decapsulate variables ... */
1.8 misho 276: if (argc) {
1.14 misho 277: arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall),
1.18 misho 278: AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
1.7 misho 279: if (!arr) {
1.14 misho 280: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
1.18 misho 281:
1.7 misho 282: rpc->call_argc ^= rpc->call_argc;
283: rpc->call_rep.ret = RPC_ERROR(-1);
284: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
285: return NULL;
286: }
1.10 misho 287: } else
288: arr = NULL;
1.7 misho 289:
1.10 misho 290: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
1.7 misho 291: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18 misho 292:
1.7 misho 293: rpc->call_argc ^= rpc->call_argc;
294: rpc->call_rep.ret = RPC_ERROR(-1);
295: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
296: } else {
1.8 misho 297: /* if client doesn't want reply */
1.10 misho 298: argc = RPC_CHK_NOREPLY(rpc);
299: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
1.7 misho 300: if (rpc->call_rep.ret == htonl(-1)) {
1.20 misho 301: if (!rpc->call_rep.eno) {
302: LOGERR;
303: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
304: }
1.7 misho 305: rpc->call_argc ^= rpc->call_argc;
306: } else {
307: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8 misho 308: if (argc) {
309: /* without reply */
1.14 misho 310: ait_freeVars(&c->cli_vars);
1.8 misho 311: rpc->call_argc ^= rpc->call_argc;
1.10 misho 312: } else {
313: /* reply */
1.25 misho 314: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.10 misho 315: }
1.7 misho 316: }
317: }
318:
1.14 misho 319: array_Destroy(&arr);
1.7 misho 320: return NULL;
321: }
322:
323: static void *
324: rxPacket(sched_task_t *task)
325: {
326: rpc_cli_t *c = TASK_ARG(task);
327: rpc_srv_t *s = c->cli_parent;
1.18 misho 328: int len, rlen, noreply, estlen;
1.15 misho 329: #if 0
330: u_short crc;
331: #endif
1.10 misho 332: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.18 misho 333: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.19 misho 334: struct pollfd pfd;
1.21 misho 335: #ifdef TCP_SESSION_TIMEOUT
336: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
337:
338: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
339: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
340: TASK_ARG(task), ts, TASK_ARG(task), 0);
341: #endif
1.7 misho 342:
1.18 misho 343: memset(buf, 0, sizeof(struct tagRPCCall));
1.19 misho 344: rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
1.18 misho 345: if (rlen < sizeof(struct tagRPCCall)) {
1.10 misho 346: /* close connection */
1.13 misho 347: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
348: TASK_ARG(task), 0, NULL, 0);
1.7 misho 349: return NULL;
1.10 misho 350: } else {
1.18 misho 351: estlen = ntohl(rpc->call_len);
352: if (estlen > AIT_LEN(&c->cli_buf))
353: AIT_RE_BUF(&c->cli_buf, estlen);
354: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1.19 misho 355: buf = AIT_GET_BUF(&c->cli_buf);
356: len = estlen;
1.18 misho 357: }
358:
1.19 misho 359: /* get next part of packet */
360: memset(buf, 0, len);
361: pfd.fd = TASK_FD(task);
362: pfd.events = POLLIN | POLLPRI;
363: for (; len > 0; len -= rlen, buf += rlen) {
364: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
365: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
366: if (rlen)
367: LOGERR;
368: else
1.22 misho 369: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
1.19 misho 370: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
371: TASK_ARG(task), 0, NULL, 0);
372: return NULL;
373: }
1.18 misho 374: rlen = recv(TASK_FD(task), buf, len, 0);
375: if (rlen == -1) {
376: /* close connection */
377: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
378: TASK_ARG(task), 0, NULL, 0);
1.8 misho 379: return NULL;
1.10 misho 380: }
1.18 misho 381: }
1.22 misho 382: len = estlen;
1.8 misho 383:
1.25 misho 384: /* skip loop packet */
385: if (rpc->call_io & RPC_ACK) {
386: schedReadSelf(task);
387: return NULL;
388: }
389:
1.15 misho 390: #if 0
1.18 misho 391: /* check integrity of packet */
392: crc = ntohs(rpc->call_crc);
393: rpc->call_crc ^= rpc->call_crc;
394: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
395: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
396: return NULL;
397: }
1.15 misho 398: #endif
1.7 misho 399:
1.18 misho 400: noreply = RPC_CHK_NOREPLY(rpc);
1.7 misho 401:
1.18 misho 402: /* check RPC packet session info */
403: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
404: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.7 misho 405:
1.18 misho 406: rpc->call_argc ^= rpc->call_argc;
407: rpc->call_rep.ret = RPC_ERROR(-1);
408: rpc->call_rep.eno = RPC_ERROR(errno);
409: } else {
410: /* execute RPC call */
411: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
412: }
1.7 misho 413:
1.18 misho 414: /* send RPC reply */
415: if (!noreply)
416: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
417: TASK_ARG(task), TASK_FD(task), rpc, len);
1.7 misho 418:
419: /* lets get next packet */
1.18 misho 420: schedReadSelf(task);
1.7 misho 421: return NULL;
422: }
423:
1.1 misho 424: static void *
1.10 misho 425: acceptClients(sched_task_t *task)
1.1 misho 426: {
1.10 misho 427: rpc_srv_t *srv = TASK_ARG(task);
428: rpc_cli_t *c = NULL;
1.14 misho 429: socklen_t salen = sizeof(sockaddr_t);
1.21 misho 430: int sock;
431: #ifdef TCP_SESSION_TIMEOUT
432: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
433: #endif
1.7 misho 434:
1.13 misho 435: c = _allocClient(srv, NULL);
1.21 misho 436: if (!c) {
437: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
438: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
439: shutdown(sock, SHUT_RDWR);
440: close(sock);
441: }
1.10 misho 442: goto end;
1.21 misho 443: }
1.10 misho 444:
445: /* accept client */
446: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
447: if (c->cli_sock == -1) {
448: LOGERR;
449: AIT_FREE_VAL(&c->cli_buf);
1.14 misho 450: array_Del(srv->srv_clients, c->cli_id, 42);
1.10 misho 451: goto end;
1.1 misho 452: } else
1.10 misho 453: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 454:
1.21 misho 455: #ifdef TCP_SESSION_TIMEOUT
456: /* armed timer for close stateless connection */
457: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
458: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c,
459: ts, c, 0);
460: #endif
1.13 misho 461: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
462: c->cli_sock, NULL, 0);
1.10 misho 463: end:
464: schedReadSelf(task);
465: return NULL;
466: }
1.5 misho 467:
1.7 misho 468:
1.10 misho 469: static void *
1.13 misho 470: txUDPPacket(sched_task_t *task)
1.10 misho 471: {
472: rpc_cli_t *c = TASK_ARG(task);
473: rpc_srv_t *s = c->cli_parent;
1.13 misho 474: rpc_func_t *f = NULL;
1.18 misho 475: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.13 misho 476: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.22 misho 477: int ret, estlen, wlen = sizeof(struct tagRPCCall);
1.13 misho 478: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.22 misho 479: struct pollfd pfd;
1.13 misho 480:
481: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
482: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
483: TASK_ARG(task), ts, TASK_ARG(task), 0);
484:
485: if (rpc->call_argc) {
486: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
487: if (!f) {
488: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
489: rpc->call_argc ^= rpc->call_argc;
490: rpc->call_rep.ret = RPC_ERROR(-1);
491: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
492: } else {
1.22 misho 493: /* calc estimated length */
494: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
495: if (estlen > AIT_LEN(&c->cli_buf))
496: AIT_RE_BUF(&c->cli_buf, estlen);
497: buf = AIT_GET_BUF(&c->cli_buf);
498: rpc = (struct tagRPCCall*) buf;
499:
1.25 misho 500: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.13 misho 501: /* Go Encapsulate variables */
1.18 misho 502: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
503: RPC_RETVARS(c));
1.13 misho 504: /* Free return values */
1.14 misho 505: ait_freeVars(&c->cli_vars);
1.13 misho 506: if (ret == -1) {
507: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
508: rpc->call_argc ^= rpc->call_argc;
509: rpc->call_rep.ret = RPC_ERROR(-1);
510: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
511: } else
512: wlen += ret;
513: }
514: }
1.7 misho 515:
1.18 misho 516: rpc->call_len = htonl(wlen);
1.25 misho 517: rpc->call_io = RPC_ACK;
1.7 misho 518:
1.13 misho 519: /* calculate CRC */
520: rpc->call_crc ^= rpc->call_crc;
521: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
522:
523: /* send reply */
1.22 misho 524: pfd.fd = TASK_FD(task);
525: pfd.events = POLLOUT;
526: for (; wlen > 0; wlen -= ret, buf += ret) {
527: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
528: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
529: if (ret)
530: LOGERR;
531: else
532: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
533: /* close connection */
534: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
535: TASK_ARG(task), 0, NULL, 0);
536: return NULL;
537: }
538: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
539: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
540: if (ret == -1) {
541: /* close connection */
542: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
543: TASK_ARG(task), 0, NULL, 0);
544: return NULL;
545: }
1.13 misho 546: }
547:
548: return NULL;
549: }
550:
551: static void *
552: rxUDPPacket(sched_task_t *task)
553: {
554: rpc_srv_t *srv = TASK_ARG(task);
555: rpc_cli_t *c = NULL;
1.22 misho 556: int len, rlen, noreply, estlen;
557: u_short crc;
558: u_char *buf, b[sizeof(struct tagRPCCall)];
559: struct tagRPCCall *rpc = (struct tagRPCCall*) b;
1.14 misho 560: sockaddr_t sa;
561: socklen_t salen;
1.13 misho 562: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.22 misho 563: struct pollfd pfd;
1.13 misho 564:
565: /* receive connect packet */
1.14 misho 566: salen = sa.ss.ss_len = sizeof(sockaddr_t);
1.22 misho 567: rlen = recvfrom(TASK_FD(task), b, sizeof b, MSG_PEEK, &sa.sa, &salen);
568: if (rlen < sizeof(struct tagRPCCall)) {
569: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1.13 misho 570: goto end;
1.22 misho 571: }
1.13 misho 572:
573: c = _allocClient(srv, &sa);
1.22 misho 574: if (!c) {
575: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
576: usleep(2000); /* blocked client delay */
1.13 misho 577: goto end;
1.22 misho 578: } else {
579: estlen = ntohl(rpc->call_len);
580: if (estlen > AIT_LEN(&c->cli_buf))
581: AIT_RE_BUF(&c->cli_buf, estlen);
582: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
583: buf = AIT_GET_BUF(&c->cli_buf);
584: len = estlen;
585:
1.13 misho 586: c->cli_sock = TASK_FD(task);
587: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
1.22 misho 588:
1.13 misho 589: /* armed timer for close stateless connection */
590: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
591: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
592: c, ts, c, 0);
593: }
594:
1.22 misho 595: /* get next part of packet */
596: memset(buf, 0, len);
597: pfd.fd = TASK_FD(task);
598: pfd.events = POLLIN | POLLPRI;
599: for (; len > 0; len -= rlen, buf += rlen) {
600: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
601: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
602: if (rlen)
603: LOGERR;
604: else
605: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
606: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
607: c, 0, NULL, 0);
1.24 misho 608: goto end;
1.13 misho 609: }
1.22 misho 610: salen = sa.ss.ss_len = sizeof(sockaddr_t);
611: rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);
612: if (rlen == -1) {
613: /* close connection */
614: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
615: c, 0, NULL, 0);
1.24 misho 616: goto end;
1.13 misho 617: }
1.22 misho 618: if (e_addrcmp(&c->cli_sa, &sa, 42))
619: rlen ^= rlen; /* skip if arrive from different address */
620: }
621: len = estlen;
1.13 misho 622:
1.25 misho 623: /* skip loop packet */
624: if (rpc->call_io & RPC_ACK)
625: goto end;
626:
1.22 misho 627: /* check integrity of packet */
628: crc = ntohs(rpc->call_crc);
629: rpc->call_crc ^= rpc->call_crc;
630: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
631: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
1.24 misho 632: /* close connection */
633: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
634: c, 0, NULL, 0);
635: goto end;
636: }
637:
638: noreply = RPC_CHK_NOREPLY(rpc);
639:
640: /* check RPC packet session info */
641: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
642: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
643:
644: rpc->call_argc ^= rpc->call_argc;
645: rpc->call_rep.ret = RPC_ERROR(-1);
646: rpc->call_rep.eno = RPC_ERROR(errno);
647: } else {
648: /* execute RPC call */
649: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
650: }
651:
652: /* send RPC reply */
653: if (!noreply)
654: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
655: c, TASK_FD(task), rpc, len);
656: end:
657: schedReadSelf(task);
658: return NULL;
659: }
660:
661:
662: static void *
1.26 misho 663: txRAWPacket(sched_task_t *task)
664: {
665: rpc_cli_t *c = TASK_ARG(task);
666: rpc_srv_t *s = c->cli_parent;
667: rpc_func_t *f = NULL;
668: u_char *buf = AIT_GET_BUF(&c->cli_buf);
669: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
670: int ret, estlen, wlen = sizeof(struct tagRPCCall);
671: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
672:
673: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
674: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
675: TASK_ARG(task), ts, TASK_ARG(task), 0);
676:
677: if (rpc->call_argc) {
678: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
679: if (!f) {
680: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
681: rpc->call_argc ^= rpc->call_argc;
682: rpc->call_rep.ret = RPC_ERROR(-1);
683: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
684: } else {
685: /* calc estimated length */
686: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
687: if (estlen > AIT_LEN(&c->cli_buf))
688: AIT_RE_BUF(&c->cli_buf, estlen);
689: buf = AIT_GET_BUF(&c->cli_buf);
690: rpc = (struct tagRPCCall*) buf;
691:
692: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
693: /* Go Encapsulate variables */
694: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
695: RPC_RETVARS(c));
696: /* Free return values */
697: ait_freeVars(&c->cli_vars);
698: if (ret == -1) {
699: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
700: rpc->call_argc ^= rpc->call_argc;
701: rpc->call_rep.ret = RPC_ERROR(-1);
702: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
703: } else
704: wlen += ret;
705: }
706: }
707:
708: rpc->call_len = htonl(wlen);
709: rpc->call_io = RPC_ACK;
710:
711: /* calculate CRC */
712: rpc->call_crc ^= rpc->call_crc;
713: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
714:
715: /* send reply */
1.26.2.1 misho 716: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
717: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
718: if (ret == -1) {
719: /* close connection */
720: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
721: TASK_ARG(task), 0, NULL, 0);
722: return NULL;
1.26 misho 723: }
724:
725: return NULL;
726: }
727:
728: static void *
729: rxRAWPacket(sched_task_t *task)
730: {
731: rpc_srv_t *srv = TASK_ARG(task);
732: rpc_cli_t *c = NULL;
1.26.2.1 misho 733: int len, rlen, noreply;
1.26 misho 734: u_short crc;
735: struct tagRPCCall *rpc;
736: sockaddr_t sa;
737: socklen_t salen;
738: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.26.2.1 misho 739: ait_val_t b = AIT_VAL_INIT;
1.26 misho 740:
741: /* receive connect packet */
1.26.2.1 misho 742: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1.26 misho 743: salen = sa.ss.ss_len = sizeof(sockaddr_t);
1.26.2.1 misho 744: rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
745: if (sa.sa.sa_family == AF_INET) {
746: struct ip *h;
747: h = (struct ip*) AIT_GET_BUF(&b);
748: if (rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC)
749: goto end;
750: else {
751: rlen -= sizeof(struct ip);
752: rpc = (struct tagRPCCall*)
753: (AIT_GET_BUF(&b) + sizeof(struct ip));
754: }
755: } else {
756: struct ip6_hdr *h;
757: h = (struct ip6_hdr*) AIT_GET_BUF(&b);
758: if (rlen < (ntohs(h->ip6_plen) + sizeof(struct ip6_hdr)) ||
759: h->ip6_nxt != IPPROTO_ERPC)
760: goto end;
761: else {
762: rlen -= sizeof(struct ip6_hdr);
763: rpc = (struct tagRPCCall*)
764: (AIT_GET_BUF(&b) + sizeof(struct ip6_hdr));
765: }
766: }
767: if (rlen < sizeof(struct tagRPCCall)) {
1.26 misho 768: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
769: goto end;
770: }
771:
1.26.2.1 misho 772: /* skip loop packet */
773: if (rpc->call_io & RPC_ACK)
774: goto end;
775:
1.26 misho 776: c = _allocClient(srv, &sa);
777: if (!c) {
778: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
779: usleep(2000); /* blocked client delay */
780: goto end;
781: } else {
1.26.2.1 misho 782: len = ntohl(rpc->call_len);
783: if (len > AIT_LEN(&c->cli_buf))
784: AIT_RE_BUF(&c->cli_buf, len);
785: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
786: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1.26 misho 787:
788: c->cli_sock = TASK_FD(task);
789: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
790:
791: /* armed timer for close stateless connection */
792: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
793: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
794: c, ts, c, 0);
795: }
796:
797: /* check integrity of packet */
798: crc = ntohs(rpc->call_crc);
799: rpc->call_crc ^= rpc->call_crc;
800: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
801: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
802: /* close connection */
803: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
804: c, 0, NULL, 0);
805: goto end;
806: }
807:
808: noreply = RPC_CHK_NOREPLY(rpc);
809:
810: /* check RPC packet session info */
811: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
812: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
813:
814: rpc->call_argc ^= rpc->call_argc;
815: rpc->call_rep.ret = RPC_ERROR(-1);
816: rpc->call_rep.eno = RPC_ERROR(errno);
817: } else {
818: /* execute RPC call */
819: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
820: }
821:
822: /* send RPC reply */
823: if (!noreply)
824: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
825: c, TASK_FD(task), rpc, len);
826: end:
1.26.2.1 misho 827: AIT_FREE_VAL(&b);
1.26 misho 828: schedReadSelf(task);
829: return NULL;
830: }
831:
832:
833: static void *
1.24 misho 834: txBPFPacket(sched_task_t *task)
835: {
836: rpc_cli_t *c = TASK_ARG(task);
837: rpc_srv_t *s = c->cli_parent;
838: rpc_func_t *f = NULL;
839: u_char *buf = AIT_GET_BUF(&c->cli_buf);
840: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
841: int ret, len, wlen = sizeof(struct tagRPCCall);
842: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
843: struct ether_header *eh;
844: ait_val_t b = AIT_VAL_INIT;
845:
846: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
847: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
848: TASK_ARG(task), ts, TASK_ARG(task), 0);
849:
850: if (rpc->call_argc) {
851: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
852: if (!f) {
853: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
854: rpc->call_argc ^= rpc->call_argc;
855: rpc->call_rep.ret = RPC_ERROR(-1);
856: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
857: } else {
858: /* calc estimated length */
859: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
860: if (len > AIT_LEN(&c->cli_buf))
861: AIT_RE_BUF(&c->cli_buf, len);
862: buf = AIT_GET_BUF(&c->cli_buf);
863: rpc = (struct tagRPCCall*) buf;
864:
1.25 misho 865: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1.24 misho 866: /* Go Encapsulate variables */
867: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
868: RPC_RETVARS(c));
869: /* Free return values */
870: ait_freeVars(&RPC_RETVARS(c));
871: if (ret == -1) {
872: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
873: rpc->call_argc ^= rpc->call_argc;
874: rpc->call_rep.ret = RPC_ERROR(-1);
875: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
876: } else
877: wlen += ret;
878: }
879: }
880:
881: rpc->call_len = htonl(wlen);
1.25 misho 882: rpc->call_io = RPC_ACK;
1.24 misho 883:
884: /* calculate CRC */
885: rpc->call_crc ^= rpc->call_crc;
886: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
887:
888: /* send reply */
889: AIT_SET_BUF(&b, NULL, MIN(wlen, s->srv_netbuf) + ETHER_HDR_LEN);
890: eh = (struct ether_header*) AIT_GET_BUF(&b);
891: memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
892: eh->ether_type = htons(RPC_DEFPORT);
893: memcpy(eh + 1, buf, MIN(wlen, s->srv_netbuf));
894:
895: ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
896: AIT_FREE_VAL(&b);
897: if (ret == -1) {
898: /* close connection */
899: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
900: TASK_ARG(task), 0, NULL, 0);
1.22 misho 901: return NULL;
902: }
1.13 misho 903:
1.24 misho 904: return NULL;
905: }
906:
907: static void *
908: rxBPFPacket(sched_task_t *task)
909: {
910: rpc_srv_t *srv = TASK_ARG(task);
911: rpc_cli_t *c = NULL;
912: int len, rlen, noreply;
913: u_short crc;
914: struct tagRPCCall *rpc;
915: sockaddr_t sa;
916: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
917: struct bpf_hdr *h;
918: struct ether_header *eh;
919: ait_val_t b = AIT_VAL_INIT;
920:
921: /* receive connect packet */
922: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
923: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
924: h = (struct bpf_hdr*) AIT_GET_BUF(&b);
925: rlen -= h->bh_hdrlen;
926: if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen ||
927: rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) {
928: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
929: goto end;
930: } else {
931: rlen = h->bh_caplen;
932: eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
933: rlen -= ETHER_HDR_LEN;
934: rpc = (struct tagRPCCall*) (eh + 1);
1.25 misho 935:
936: #if 0
937: /* skip loop packet */
938: if (rpc->call_io & RPC_ACK)
939: goto end;
940: #endif
941:
1.24 misho 942: if (eh->ether_type != ntohs(RPC_DEFPORT))
943: goto end;
944: else
945: e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
946: }
947:
948: c = _allocClient(srv, &sa);
949: if (!c) {
950: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
951: usleep(2000); /* blocked client delay */
952: goto end;
953: } else {
954: len = ntohl(rpc->call_len);
955: if (len > AIT_LEN(&c->cli_buf))
956: AIT_RE_BUF(&c->cli_buf, len);
1.26.2.1 misho 957: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
1.24 misho 958: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
959:
960: c->cli_sock = TASK_FD(task);
961: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
962:
963: /* armed timer for close stateless connection */
964: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
965: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
966: c, ts, c, 0);
967: }
968:
969: /* check integrity of packet */
970: crc = ntohs(rpc->call_crc);
971: rpc->call_crc ^= rpc->call_crc;
972: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
973: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
974: /* close connection */
975: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
976: c, 0, NULL, 0);
977: goto end;
978: }
979:
1.22 misho 980: noreply = RPC_CHK_NOREPLY(rpc);
1.18 misho 981:
1.22 misho 982: /* check RPC packet session info */
983: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
984: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.13 misho 985:
1.22 misho 986: rpc->call_argc ^= rpc->call_argc;
987: rpc->call_rep.ret = RPC_ERROR(-1);
988: rpc->call_rep.eno = RPC_ERROR(errno);
989: } else {
990: /* execute RPC call */
991: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
992: }
1.13 misho 993:
1.22 misho 994: /* send RPC reply */
995: if (!noreply)
1.24 misho 996: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1.22 misho 997: c, TASK_FD(task), rpc, len);
1.13 misho 998: end:
1.24 misho 999: AIT_FREE_VAL(&b);
1.13 misho 1000: schedReadSelf(task);
1001: return NULL;
1002: }
1003:
1.25 misho 1004:
1005: static void *
1006: txEXTPacket(sched_task_t *task)
1007: {
1008: rpc_cli_t *c = TASK_ARG(task);
1009: rpc_srv_t *s = c->cli_parent;
1010: rpc_func_t *f = NULL;
1011: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1012: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1013: int ret, len, wlen = sizeof(struct tagRPCCall);
1014: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1015:
1016: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
1017: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
1018: TASK_ARG(task), ts, TASK_ARG(task), 0);
1019:
1020: if (rpc->call_argc) {
1021: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1022: if (!f) {
1023: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1024: rpc->call_argc ^= rpc->call_argc;
1025: rpc->call_rep.ret = RPC_ERROR(-1);
1026: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
1027: } else {
1028: /* calc estimated length */
1029: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
1030: if (len > AIT_LEN(&c->cli_buf))
1031: AIT_RE_BUF(&c->cli_buf, len);
1032: buf = AIT_GET_BUF(&c->cli_buf);
1033: rpc = (struct tagRPCCall*) buf;
1034:
1035: rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
1036: /* Go Encapsulate variables */
1037: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
1038: RPC_RETVARS(c));
1039: /* Free return values */
1040: ait_freeVars(&RPC_RETVARS(c));
1041: if (ret == -1) {
1042: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1043: rpc->call_argc ^= rpc->call_argc;
1044: rpc->call_rep.ret = RPC_ERROR(-1);
1045: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
1046: } else
1047: wlen += ret;
1048: }
1049: }
1050:
1051: rpc->call_len = htonl(wlen);
1052: rpc->call_io = RPC_ACK;
1053:
1054: /* send reply */
1055: ret = write(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf));
1056: if (ret == -1) {
1057: /* close connection */
1058: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
1059: TASK_ARG(task), 0, NULL, 0);
1060: return NULL;
1061: }
1062:
1063: return NULL;
1064: }
1065:
1066: static void *
1067: rxEXTPacket(sched_task_t *task)
1068: {
1069: rpc_srv_t *srv = TASK_ARG(task);
1070: rpc_cli_t *c = NULL;
1071: int len, rlen, noreply;
1072: struct tagRPCCall *rpc;
1073: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1074: ait_val_t b = AIT_VAL_INIT;
1075: sockaddr_t sa;
1076:
1077: memset(&sa, 0, sizeof sa);
1078: /* receive connect packet */
1079: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
1080: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
1081: if (rlen < sizeof(struct tagRPCCall)) {
1082: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1083: goto end;
1084: } else {
1085: rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
1086:
1087: /* skip loop packet */
1088: if (rpc->call_io & RPC_ACK)
1089: goto end;
1090: }
1091:
1092: c = _allocClient(srv, &sa);
1093: if (!c) {
1094: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
1095: usleep(2000); /* blocked client delay */
1096: goto end;
1097: } else {
1098: len = ntohl(rpc->call_len);
1099: if (len > AIT_LEN(&c->cli_buf))
1100: AIT_RE_BUF(&c->cli_buf, len);
1101: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, AIT_LEN(&c->cli_buf));
1102: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1103:
1104: c->cli_sock = TASK_FD(task);
1105:
1106: /* armed timer for close stateless connection */
1107: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
1108: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
1109: c, ts, c, 0);
1110: }
1111:
1112: noreply = RPC_CHK_NOREPLY(rpc);
1113:
1114: /* check RPC packet session info */
1115: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
1116: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1117:
1118: rpc->call_argc ^= rpc->call_argc;
1119: rpc->call_rep.ret = RPC_ERROR(-1);
1120: rpc->call_rep.eno = RPC_ERROR(errno);
1121: } else {
1122: /* execute RPC call */
1123: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
1124: }
1125:
1126: /* send RPC reply */
1127: if (!noreply)
1128: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1129: c, TASK_FD(task), rpc, len);
1130: end:
1131: AIT_FREE_VAL(&b);
1132: schedReadSelf(task);
1133: return NULL;
1134: }
1135:
1.13 misho 1136: /* ------------------------------------------------------ */
1137:
1.16 misho 1138: void
1.13 misho 1139: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
1140: {
1141: rpc_srv_t *s = c->cli_parent;
1142:
1143: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 1144:
1145: /* free buffer */
1146: AIT_FREE_VAL(&c->cli_buf);
1147:
1.14 misho 1148: array_Del(s->srv_blob.clients, c->cli_id, 0);
1.10 misho 1149: if (c)
1.14 misho 1150: e_free(c);
1.13 misho 1151: }
1152:
1153:
1154: static void *
1155: closeBLOBClient(sched_task_t *task)
1156: {
1157: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
1158:
1159: rpc_freeBLOBCli(TASK_ARG(task));
1160:
1161: /* close client socket */
1162: shutdown(sock, SHUT_RDWR);
1163: close(sock);
1.7 misho 1164: return NULL;
1165: }
1166:
1167: static void *
1168: txBLOB(sched_task_t *task)
1169: {
1.10 misho 1170: rpc_cli_t *c = TASK_ARG(task);
1171: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 1172: int wlen = sizeof(struct tagBLOBHdr);
1173:
1174: /* send reply */
1.10 misho 1175: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
1176: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
1177: /* close blob connection */
1178: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1179: }
1.7 misho 1180:
1181: return NULL;
1182: }
1.4 misho 1183:
1.7 misho 1184: static void *
1185: rxBLOB(sched_task_t *task)
1186: {
1187: rpc_cli_t *c = TASK_ARG(task);
1188: rpc_srv_t *s = c->cli_parent;
1189: rpc_blob_t *b;
1.10 misho 1190: struct tagBLOBHdr blob;
1.7 misho 1191: int rlen;
1192:
1.10 misho 1193: memset(&blob, 0, sizeof blob);
1194: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
1195: if (rlen < 1) {
1196: /* close blob connection */
1197: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1.7 misho 1198: return NULL;
1199: }
1.5 misho 1200:
1.10 misho 1201: /* check BLOB packet */
1202: if (rlen < sizeof(struct tagBLOBHdr)) {
1203: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1.6 misho 1204:
1.10 misho 1205: schedReadSelf(task);
1.7 misho 1206: return NULL;
1207: }
1.1 misho 1208:
1.7 misho 1209: /* check RPC packet session info */
1.15 misho 1210: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1.7 misho 1211: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.10 misho 1212: blob.hdr_cmd = error;
1.7 misho 1213: goto end;
1214: }
1215:
1216: /* Go to proceed packet ... */
1.10 misho 1217: switch (blob.hdr_cmd) {
1.7 misho 1218: case get:
1.10 misho 1219: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
1220: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
1221: blob.hdr_cmd = no;
1222: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 1223: break;
1224: } else
1.10 misho 1225: blob.hdr_len = htonl(b->blob_len);
1.5 misho 1226:
1.7 misho 1227: if (rpc_srv_blobMap(s, b) != -1) {
1228: /* deliver BLOB variable to client */
1.10 misho 1229: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1.7 misho 1230: rpc_srv_blobUnmap(b);
1231: } else {
1.10 misho 1232: blob.hdr_cmd = error;
1233: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 1234: }
1235: break;
1236: case set:
1.17 misho 1237: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
1238: ntohl(blob.hdr_ret)))) {
1.7 misho 1239: /* set new BLOB variable for reply :) */
1.10 misho 1240: blob.hdr_var = htonl(b->blob_var);
1.7 misho 1241:
1242: /* receive BLOB from client */
1.10 misho 1243: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1.7 misho 1244: rpc_srv_blobUnmap(b);
1.5 misho 1245: } else {
1.10 misho 1246: blob.hdr_cmd = error;
1247: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 1248: }
1249: break;
1250: case unset:
1.11 misho 1251: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1.10 misho 1252: blob.hdr_cmd = error;
1253: blob.hdr_ret = RPC_ERROR(-1);
1.1 misho 1254: }
1255: break;
1.7 misho 1256: default:
1.10 misho 1257: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
1258: blob.hdr_cmd = error;
1259: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 1260: }
1.1 misho 1261:
1.7 misho 1262: end:
1.10 misho 1263: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
1264: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
1265: schedReadSelf(task);
1.7 misho 1266: return NULL;
1.2 misho 1267: }
1268:
1269: static void *
1.17 misho 1270: flushBLOB(sched_task_t *task)
1271: {
1.23 misho 1272: uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
1273: rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
1.17 misho 1274: rpc_blob_t *b, *tmp;
1275:
1276: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1277: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1278:
1279: rpc_srv_blobFree(srv, b);
1280: e_free(b);
1281: }
1282:
1.23 misho 1283: if (!schedSignalSelf(task)) {
1284: /* disabled kqueue support in libaitsched */
1285: struct sigaction sa;
1286:
1287: memset(&sa, 0, sizeof sa);
1288: sigemptyset(&sa.sa_mask);
1289: sa.sa_handler = (void (*)(int)) flushBLOB;
1290: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1291: sigaction(SIGFBLOB, &sa, NULL);
1292: }
1293:
1.17 misho 1294: return NULL;
1295: }
1296:
1297: static void *
1.10 misho 1298: acceptBLOBClients(sched_task_t *task)
1.2 misho 1299: {
1.10 misho 1300: rpc_srv_t *srv = TASK_ARG(task);
1301: rpc_cli_t *c = NULL;
1302: register int i;
1.14 misho 1303: socklen_t salen = sizeof(sockaddr_t);
1.21 misho 1304: int sock;
1.12 misho 1305: #ifdef TCP_NOPUSH
1306: int n = 1;
1307: #endif
1.7 misho 1308:
1.10 misho 1309: /* check free slots for connect */
1.14 misho 1310: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
1311: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1.21 misho 1312: if (c) { /* no more free slots! */
1313: EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
1314: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
1315: shutdown(sock, SHUT_RDWR);
1316: close(sock);
1317: }
1.10 misho 1318: goto end;
1.21 misho 1319: }
1320:
1.14 misho 1321: c = e_malloc(sizeof(rpc_cli_t));
1.10 misho 1322: if (!c) {
1.7 misho 1323: LOGERR;
1.10 misho 1324: srv->srv_kill = srv->srv_blob.kill = 1;
1.7 misho 1325: return NULL;
1326: } else {
1.10 misho 1327: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 1328: array_Set(srv->srv_blob.clients, i, c);
1.10 misho 1329: c->cli_id = i;
1330: c->cli_parent = srv;
1.7 misho 1331: }
1.4 misho 1332:
1.10 misho 1333: /* alloc empty buffer */
1.14 misho 1334: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.2 misho 1335:
1.10 misho 1336: /* accept client */
1337: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
1338: if (c->cli_sock == -1) {
1339: LOGERR;
1340: AIT_FREE_VAL(&c->cli_buf);
1.14 misho 1341: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 1342: goto end;
1.12 misho 1343: } else {
1344: #ifdef TCP_NOPUSH
1345: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
1346: #endif
1.10 misho 1347: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.12 misho 1348: }
1.2 misho 1349:
1.10 misho 1350: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
1351: end:
1352: schedReadSelf(task);
1.7 misho 1353: return NULL;
1.1 misho 1354: }
1355:
1.10 misho 1356: /* ------------------------------------------------------ */
1.1 misho 1357:
1358: /*
1.7 misho 1359: * rpc_srv_initBLOBServer() - Init & create BLOB Server
1360: *
1.4 misho 1361: * @srv = RPC server instance
1.2 misho 1362: * @Port = Port for bind server, if Port == 0 default port is selected
1363: * @diskDir = Disk place for BLOB file objects
1364: * return: -1 == error or 0 bind and created BLOB server instance
1365: */
1366: int
1367: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
1368: {
1369: int n = 1;
1370:
1.10 misho 1371: if (!srv || srv->srv_kill) {
1.7 misho 1372: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 1373: return -1;
1374: }
1375:
1376: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
1377: if (access(diskDir, R_OK | W_OK) == -1) {
1378: LOGERR;
1379: return -1;
1380: } else
1.7 misho 1381: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 1382:
1.10 misho 1383: /* init blob list */
1384: TAILQ_INIT(&srv->srv_blob.blobs);
1385:
1.2 misho 1386: srv->srv_blob.server.cli_parent = srv;
1.4 misho 1387:
1.14 misho 1388: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
1.10 misho 1389: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1.4 misho 1390: case AF_INET:
1.10 misho 1391: srv->srv_blob.server.cli_sa.sin.sin_port =
1392: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1.4 misho 1393: break;
1394: case AF_INET6:
1.10 misho 1395: srv->srv_blob.server.cli_sa.sin6.sin6_port =
1396: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1.4 misho 1397: break;
1398: case AF_LOCAL:
1.10 misho 1399: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
1400: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1.4 misho 1401: break;
1402: default:
1.7 misho 1403: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 1404: return -1;
1.2 misho 1405: }
1406:
1.4 misho 1407: /* create BLOB server socket */
1.6 misho 1408: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 1409: if (srv->srv_blob.server.cli_sock == -1) {
1410: LOGERR;
1.7 misho 1411: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1412: return -1;
1413: }
1414: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1415: LOGERR;
1416: close(srv->srv_blob.server.cli_sock);
1.7 misho 1417: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1418: return -1;
1419: }
1.5 misho 1420: n = srv->srv_netbuf;
1421: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1422: LOGERR;
1423: close(srv->srv_blob.server.cli_sock);
1.7 misho 1424: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 1425: return -1;
1426: }
1427: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1428: LOGERR;
1429: close(srv->srv_blob.server.cli_sock);
1.7 misho 1430: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 1431: return -1;
1432: }
1.6 misho 1433: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
1434: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 1435: LOGERR;
1436: close(srv->srv_blob.server.cli_sock);
1.7 misho 1437: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1438: return -1;
1.13 misho 1439: } else
1440: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
1441: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
1442:
1.2 misho 1443:
1.10 misho 1444: /* allocate pool for concurent blob clients */
1.14 misho 1445: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1.2 misho 1446: if (!srv->srv_blob.clients) {
1.14 misho 1447: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.2 misho 1448: close(srv->srv_blob.server.cli_sock);
1.7 misho 1449: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1450: return -1;
1.10 misho 1451: }
1.2 misho 1452:
1.10 misho 1453: /* init blob scheduler */
1454: srv->srv_blob.root = schedBegin();
1455: if (!srv->srv_blob.root) {
1456: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.14 misho 1457: array_Destroy(&srv->srv_blob.clients);
1.10 misho 1458: close(srv->srv_blob.server.cli_sock);
1459: AIT_FREE_VAL(&srv->srv_blob.dir);
1460: return -1;
1461: }
1.2 misho 1462:
1463: return 0;
1464: }
1465:
1466: /*
1.7 misho 1467: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
1468: *
1.2 misho 1469: * @srv = RPC Server instance
1470: * return: none
1471: */
1.16 misho 1472: void
1.2 misho 1473: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
1474: {
1.10 misho 1475: if (!srv)
1.2 misho 1476: return;
1477:
1.10 misho 1478: srv->srv_blob.kill = 1;
1.17 misho 1479:
1480: schedEnd(&srv->srv_blob.root);
1.26.2.4! misho 1481:
! 1482: if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL)
! 1483: unlink(srv->srv_blob.server.cli_sa.sun.sun_path);
1.2 misho 1484: }
1485:
1486: /*
1.10 misho 1487: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1.7 misho 1488: *
1.2 misho 1489: * @srv = RPC Server instance
1490: * return: -1 error or 0 ok, infinite loop ...
1491: */
1492: int
1.10 misho 1493: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1.2 misho 1494: {
1.10 misho 1495: rpc_cli_t *c;
1.2 misho 1496: register int i;
1.10 misho 1497: rpc_blob_t *b, *tmp;
1498: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.2 misho 1499:
1.10 misho 1500: if (!srv || srv->srv_kill) {
1.7 misho 1501: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 1502: return -1;
1503: }
1504:
1.14 misho 1505: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1.2 misho 1506: LOGERR;
1507: return -1;
1.10 misho 1508: }
1509:
1.23 misho 1510: if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
1511: /* disabled kqueue support in libaitsched */
1512: struct sigaction sa;
1513:
1514: atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
1515:
1516: memset(&sa, 0, sizeof sa);
1517: sigemptyset(&sa.sa_mask);
1518: sa.sa_handler = (void (*)(int)) flushBLOB;
1519: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1520: sigaction(SIGFBLOB, &sa, NULL);
1521: }
1522:
1.10 misho 1523: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
1524: srv->srv_blob.server.cli_sock, NULL, 0)) {
1525: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1526: return -1;
1527: }
1.2 misho 1528:
1.10 misho 1529: schedPolling(srv->srv_blob.root, &ts, NULL);
1530: /* main rpc loop */
1531: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1.7 misho 1532:
1.17 misho 1533: /* detach blobs */
1534: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1535: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1536:
1537: rpc_srv_blobFree(srv, b);
1538: e_free(b);
1539: }
1540:
1.10 misho 1541: /* close all clients connections & server socket */
1.14 misho 1542: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
1543: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1.10 misho 1544: if (c) {
1545: shutdown(c->cli_sock, SHUT_RDWR);
1546: close(c->cli_sock);
1547:
1548: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1549: AIT_FREE_VAL(&c->cli_buf);
1.2 misho 1550: }
1.14 misho 1551: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 1552: }
1.14 misho 1553: array_Destroy(&srv->srv_blob.clients);
1.2 misho 1554:
1.10 misho 1555: close(srv->srv_blob.server.cli_sock);
1.2 misho 1556:
1.10 misho 1557: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1558: return 0;
1559: }
1560:
1561:
1562: /*
1.7 misho 1563: * rpc_srv_initServer() - Init & create RPC Server
1564: *
1.15 misho 1565: * @InstID = Instance for authentication & recognition
1.1 misho 1566: * @concurentClients = Concurent clients at same time to this server
1.10 misho 1567: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 1568: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 1569: * @Port = Port for bind server, if Port == 0 default port is selected
1.13 misho 1570: * @proto = Protocol, if == 0 choose SOCK_STREAM
1.1 misho 1571: * return: NULL == error or !=NULL bind and created RPC server instance
1572: */
1573: rpc_srv_t *
1.15 misho 1574: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
1575: const char *csHost, u_short Port, int proto)
1.1 misho 1576: {
1.10 misho 1577: int n = 1;
1.1 misho 1578: rpc_srv_t *srv = NULL;
1.14 misho 1579: sockaddr_t sa = E_SOCKADDR_INIT;
1.1 misho 1580:
1.25 misho 1581: if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
1.10 misho 1582: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1.1 misho 1583: return NULL;
1584: }
1.26.2.3 misho 1585: if (!Port && proto < SOCK_RAW)
1.1 misho 1586: Port = RPC_DEFPORT;
1.26.2.2 misho 1587: if (!e_gethostbyname(csHost, Port, &sa))
1588: return NULL;
1.13 misho 1589: if (!proto)
1590: proto = SOCK_STREAM;
1.10 misho 1591: if (netBuf < RPC_MIN_BUFSIZ)
1.5 misho 1592: netBuf = BUFSIZ;
1.7 misho 1593: else
1.14 misho 1594: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1.10 misho 1595:
1596: #ifdef HAVE_SRANDOMDEV
1597: srandomdev();
1598: #else
1599: time_t tim;
1600:
1601: srandom((time(&tim) ^ getpid()));
1602: #endif
1.1 misho 1603:
1.14 misho 1604: srv = e_malloc(sizeof(rpc_srv_t));
1.1 misho 1605: if (!srv) {
1606: LOGERR;
1607: return NULL;
1608: } else
1609: memset(srv, 0, sizeof(rpc_srv_t));
1610:
1.13 misho 1611: srv->srv_proto = proto;
1.5 misho 1612: srv->srv_netbuf = netBuf;
1.1 misho 1613: srv->srv_session.sess_version = RPC_VERSION;
1.15 misho 1614: srv->srv_session.sess_instance = InstID;
1.1 misho 1615:
1616: srv->srv_server.cli_parent = srv;
1.10 misho 1617: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1618:
1.12 misho 1619: /* init functions */
1620: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1621: SLIST_INIT(&srv->srv_funcs);
1622: AVL_INIT(&srv->srv_funcs);
1.10 misho 1623:
1624: /* init scheduler */
1625: srv->srv_root = schedBegin();
1626: if (!srv->srv_root) {
1627: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.12 misho 1628: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1629: e_free(srv);
1.10 misho 1630: return NULL;
1631: }
1632:
1633: /* init pool for clients */
1.14 misho 1634: srv->srv_clients = array_Init(concurentClients);
1.10 misho 1635: if (!srv->srv_clients) {
1.14 misho 1636: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.10 misho 1637: schedEnd(&srv->srv_root);
1.12 misho 1638: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1639: e_free(srv);
1.10 misho 1640: return NULL;
1641: }
1.4 misho 1642:
1643: /* create server socket */
1.26 misho 1644: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family,
1645: srv->srv_proto, srv->srv_proto == SOCK_RAW ? IPPROTO_ERPC : 0);
1.1 misho 1646: if (srv->srv_server.cli_sock == -1) {
1647: LOGERR;
1.14 misho 1648: array_Destroy(&srv->srv_clients);
1.10 misho 1649: schedEnd(&srv->srv_root);
1.12 misho 1650: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1651: e_free(srv);
1.1 misho 1652: return NULL;
1653: }
1654: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1655: LOGERR;
1.10 misho 1656: goto err;
1.1 misho 1657: }
1.5 misho 1658: n = srv->srv_netbuf;
1659: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1660: LOGERR;
1.10 misho 1661: goto err;
1.5 misho 1662: }
1663: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1664: LOGERR;
1.10 misho 1665: goto err;
1.5 misho 1666: }
1.6 misho 1667: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
1668: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 1669: LOGERR;
1.10 misho 1670: goto err;
1.13 misho 1671: } else
1672: fcntl(srv->srv_server.cli_sock, F_SETFL,
1673: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 1674:
1.10 misho 1675: rpc_register_srvPing(srv);
1.8 misho 1676:
1.1 misho 1677: return srv;
1.10 misho 1678: err: /* error condition */
1679: close(srv->srv_server.cli_sock);
1.14 misho 1680: array_Destroy(&srv->srv_clients);
1.10 misho 1681: schedEnd(&srv->srv_root);
1.12 misho 1682: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1683: e_free(srv);
1.10 misho 1684: return NULL;
1.1 misho 1685: }
1686:
1687: /*
1.7 misho 1688: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1689: *
1.6 misho 1690: * @psrv = RPC Server instance
1.1 misho 1691: * return: none
1692: */
1.16 misho 1693: void
1.6 misho 1694: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 1695: {
1.10 misho 1696: if (!psrv || !*psrv)
1.1 misho 1697: return;
1698:
1.10 misho 1699: /* if send kill to blob server */
1.17 misho 1700: rpc_srv_endBLOBServer(*psrv);
1.1 misho 1701:
1.10 misho 1702: (*psrv)->srv_kill = 1;
1703: sleep(RPC_SCHED_POLLING);
1.2 misho 1704:
1.17 misho 1705: schedEnd(&(*psrv)->srv_root);
1706:
1.26.2.4! misho 1707: if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL)
! 1708: unlink((*psrv)->srv_server.cli_sa.sun.sun_path);
! 1709:
1.12 misho 1710: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1.14 misho 1711: e_free(*psrv);
1.6 misho 1712: *psrv = NULL;
1.1 misho 1713: }
1714:
1715: /*
1.7 misho 1716: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1717: *
1.1 misho 1718: * @srv = RPC Server instance
1719: * return: -1 error or 0 ok, infinite loop ...
1720: */
1721: int
1.5 misho 1722: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 1723: {
1.10 misho 1724: rpc_cli_t *c;
1.1 misho 1725: register int i;
1.12 misho 1726: rpc_func_t *f;
1.10 misho 1727: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.1 misho 1728:
1729: if (!srv) {
1.10 misho 1730: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1.1 misho 1731: return -1;
1732: }
1733:
1.13 misho 1734: if (srv->srv_proto == SOCK_STREAM)
1.14 misho 1735: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1.13 misho 1736: LOGERR;
1737: return -1;
1738: }
1.1 misho 1739:
1.13 misho 1740: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1741: srv->srv_server.cli_sock, NULL, 0)) {
1.10 misho 1742: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1743: return -1;
1744: }
1.7 misho 1745:
1.10 misho 1746: schedPolling(srv->srv_root, &ts, NULL);
1.7 misho 1747: /* main rpc loop */
1.10 misho 1748: schedRun(srv->srv_root, &srv->srv_kill);
1749:
1750: /* close all clients connections & server socket */
1.14 misho 1751: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1752: c = array(srv->srv_clients, i, rpc_cli_t*);
1.10 misho 1753: if (c) {
1.24 misho 1754: if (srv->srv_proto == SOCK_STREAM) {
1755: shutdown(c->cli_sock, SHUT_RDWR);
1756: close(c->cli_sock);
1757: }
1.10 misho 1758:
1759: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.14 misho 1760: ait_freeVars(&RPC_RETVARS(c));
1.10 misho 1761: AIT_FREE_VAL(&c->cli_buf);
1.1 misho 1762: }
1.14 misho 1763: array_Del(srv->srv_clients, i, 42);
1.10 misho 1764: }
1.14 misho 1765: array_Destroy(&srv->srv_clients);
1.2 misho 1766:
1.25 misho 1767: if (srv->srv_proto != SOCK_EXT)
1768: close(srv->srv_server.cli_sock);
1.2 misho 1769:
1.10 misho 1770: /* detach exported calls */
1.12 misho 1771: RPC_FUNCS_LOCK(&srv->srv_funcs);
1772: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1773: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1.1 misho 1774:
1.10 misho 1775: AIT_FREE_VAL(&f->func_name);
1.14 misho 1776: e_free(f);
1.1 misho 1777: }
1.12 misho 1778: srv->srv_funcs.avlh_root = NULL;
1779: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1.1 misho 1780:
1781: return 0;
1782: }
1783:
1784:
1785: /*
1786: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 1787: *
1.10 misho 1788: * @cli = RPC client
1.1 misho 1789: * @rpc = IN RPC call structure
1.10 misho 1790: * @funcname = Execute RPC function
1.5 misho 1791: * @args = IN RPC calling arguments from RPC client
1.1 misho 1792: * return: -1 error, !=-1 ok
1793: */
1794: int
1.10 misho 1795: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1796: ait_val_t funcname, array_t * __restrict args)
1.1 misho 1797: {
1798: rpc_callback_t func;
1799:
1.10 misho 1800: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1.7 misho 1801: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 1802: return -1;
1803: }
1804:
1.10 misho 1805: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1806: return func(cli, rpc, args);
1.1 misho 1807: }
1.24 misho 1808:
1809:
1810: /*
1811: * rpc_srv_initServer2() - Init & create layer2 RPC Server
1812: *
1813: * @InstID = Instance for authentication & recognition
1814: * @concurentClients = Concurent clients at same time to this server
1815: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1816: * @csIface = Interface name for bind server, if NULL first interface on host
1817: * return: NULL == error or !=NULL bind and created RPC server instance
1818: */
1819: rpc_srv_t *
1820: rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
1821: {
1822: int n = 1;
1823: rpc_srv_t *srv = NULL;
1824: sockaddr_t sa = E_SOCKADDR_INIT;
1825: char szIface[64], szStr[STRSIZ];
1826: register int i;
1827: struct ifreq ifr;
1828: struct bpf_insn insns[] = {
1829: BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
1830: BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
1831: BPF_STMT(BPF_RET + BPF_K, -1),
1832: BPF_STMT(BPF_RET + BPF_K, 0),
1833: };
1834: struct bpf_program fcode = {
1835: .bf_len = sizeof(insns) / sizeof(struct bpf_insn),
1836: .bf_insns = insns
1837: };
1838:
1839: if (!concurentClients) {
1840: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1841: return NULL;
1842: }
1843: if (!csIface) {
1844: if (e_get1stiface(szIface, sizeof szIface))
1845: return NULL;
1846: } else
1847: strlcpy(szIface, csIface, sizeof szIface);
1848: if (!e_getifacebyname(szIface, &sa))
1849: return NULL;
1850:
1851: #ifdef HAVE_SRANDOMDEV
1852: srandomdev();
1853: #else
1854: time_t tim;
1855:
1856: srandom((time(&tim) ^ getpid()));
1857: #endif
1858:
1859: srv = e_malloc(sizeof(rpc_srv_t));
1860: if (!srv) {
1861: LOGERR;
1862: return NULL;
1863: } else
1864: memset(srv, 0, sizeof(rpc_srv_t));
1865:
1866: srv->srv_proto = SOCK_BPF;
1867: srv->srv_netbuf = netBuf;
1868: srv->srv_session.sess_version = RPC_VERSION;
1869: srv->srv_session.sess_instance = InstID;
1870:
1871: srv->srv_server.cli_parent = srv;
1872: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1873:
1874: /* init functions */
1875: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1876: SLIST_INIT(&srv->srv_funcs);
1877: AVL_INIT(&srv->srv_funcs);
1878:
1879: /* init scheduler */
1880: srv->srv_root = schedBegin();
1881: if (!srv->srv_root) {
1882: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1883: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1884: e_free(srv);
1885: return NULL;
1886: }
1887:
1888: /* init pool for clients */
1889: srv->srv_clients = array_Init(concurentClients);
1890: if (!srv->srv_clients) {
1891: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1892: schedEnd(&srv->srv_root);
1893: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1894: e_free(srv);
1895: return NULL;
1896: }
1897:
1898: /* create server handler */
1899: for (i = 0; i < 10; i++) {
1900: memset(szStr, 0, sizeof szStr);
1901: snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
1902: srv->srv_server.cli_sock = open(szStr, O_RDWR);
1903: if (srv->srv_server.cli_sock > STDERR_FILENO)
1904: break;
1905: }
1906: if (srv->srv_server.cli_sock < 3) {
1907: LOGERR;
1908: array_Destroy(&srv->srv_clients);
1909: schedEnd(&srv->srv_root);
1910: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1911: e_free(srv);
1912: return NULL;
1913: }
1914:
1915: if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
1916: LOGERR;
1917: goto err;
1918: }
1919: if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
1920: LOGERR;
1921: goto err;
1922: }
1923: n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
1924: if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
1925: LOGERR;
1926: goto err;
1927: } else
1928: srv->srv_netbuf = n;
1929:
1930: memset(&ifr, 0, sizeof ifr);
1931: strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
1932: if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
1933: LOGERR;
1934: goto err;
1935: } else
1936: fcntl(srv->srv_server.cli_sock, F_SETFL,
1937: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1938:
1939: rpc_register_srvPing(srv);
1940:
1941: return srv;
1942: err: /* error condition */
1943: close(srv->srv_server.cli_sock);
1944: array_Destroy(&srv->srv_clients);
1945: schedEnd(&srv->srv_root);
1946: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1947: e_free(srv);
1948: return NULL;
1949: }
1.25 misho 1950:
1951: /*
1952: * rpc_srv_initServerExt() - Init & create pipe RPC Server
1953: *
1954: * @InstID = Instance for authentication & recognition
1955: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1956: * @fd = File descriptor
1957: * return: NULL == error or !=NULL bind and created RPC server instance
1958: */
1959: rpc_srv_t *
1960: rpc_srv_initServerExt(u_char InstID, int netBuf, int fd)
1961: {
1962: rpc_srv_t *srv = NULL;
1963:
1964: #ifdef HAVE_SRANDOMDEV
1965: srandomdev();
1966: #else
1967: time_t tim;
1968:
1969: srandom((time(&tim) ^ getpid()));
1970: #endif
1971:
1972: srv = e_malloc(sizeof(rpc_srv_t));
1973: if (!srv) {
1974: LOGERR;
1975: return NULL;
1976: } else
1977: memset(srv, 0, sizeof(rpc_srv_t));
1978:
1979: srv->srv_proto = SOCK_EXT;
1980: srv->srv_netbuf = (netBuf < RPC_MIN_BUFSIZ) ?
1981: getpagesize() : E_ALIGN(netBuf, 2);
1982: srv->srv_session.sess_version = RPC_VERSION;
1983: srv->srv_session.sess_instance = InstID;
1984:
1985: srv->srv_server.cli_parent = srv;
1986: srv->srv_server.cli_sock = fd;
1987:
1988: /* init functions */
1989: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1990: SLIST_INIT(&srv->srv_funcs);
1991: AVL_INIT(&srv->srv_funcs);
1992:
1993: /* init scheduler */
1994: srv->srv_root = schedBegin();
1995: if (!srv->srv_root) {
1996: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1997: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1998: e_free(srv);
1999: return NULL;
2000: }
2001:
2002: /* init pool for clients */
2003: srv->srv_clients = array_Init(1);
2004: if (!srv->srv_clients) {
2005: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
2006: schedEnd(&srv->srv_root);
2007: pthread_mutex_destroy(&srv->srv_funcs.mtx);
2008: e_free(srv);
2009: return NULL;
2010: }
2011:
2012: fcntl(srv->srv_server.cli_sock, F_SETFL,
2013: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
2014:
2015: rpc_register_srvPing(srv);
2016:
2017: return srv;
2018: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>