Annotation of libaitrpc/src/srv.c, revision 1.24.2.4
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.24.2.4! misho 6: * $Id: srv.c,v 1.24.2.3 2015/01/18 00:03:01 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.24.2.2 misho 15: Copyright 2004 - 2015
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.13 misho 49: /* SOCK_STREAM: callbacks used for stream sockets (accept, close, receive, transmit) */
50: static void *acceptClients(sched_task_t *);
51: static void *closeClient(sched_task_t *);
52: static void *rxPacket(sched_task_t *);
53: static void *txPacket(sched_task_t *);
54:
55: /* SOCK_DGRAM: callbacks used for datagram sockets (release slot, receive, transmit) */
56: static void *freeClient(sched_task_t *);
57: static void *rxUDPPacket(sched_task_t *);
58: static void *txUDPPacket(sched_task_t *);
59:
60: /* SOCK_RAW: no callbacks are declared for this transport */
61:
1.24 misho 62: /* SOCK_BPF: callbacks used for BPF descriptors (receive, transmit); shares freeClient */
63: static void *rxBPFPacket(sched_task_t *);
64: static void *txBPFPacket(sched_task_t *);
65:
1.24.2.3 misho 66: /* SOCK_EXT: no callbacks are declared for this transport */
/*
 * Per-protocol callback table, indexed as cbProto[srv_proto][column].
 * The code in this file selects columns with CB_CLOSECLIENT, CB_RXPACKET and
 * CB_TXPACKET; the first column holds the handler for the listening descriptor.
 * NOTE(review): the row labels assume srv_proto carries the SOCK_* value - confirm.
 */
1.24.2.1 misho 67:
68: static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
1.13 misho 69: { acceptClients, closeClient, rxPacket, txPacket }, /* row 0: same callbacks as SOCK_STREAM */
70: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
71: { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket }, /* SOCK_DGRAM */
1.24 misho 72: { NULL, NULL, NULL, NULL }, /* SOCK_RAW: not handled here */
1.24.2.1 misho 73: { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket }, /* SOCK_BPF */
1.24.2.3 misho 74: { NULL, NULL, NULL, NULL } /* SOCK_EXT: not handled here */
1.13 misho 75: };
76:
1.23 misho 77: /*
 * Global signal argument, used when kqueue support is disabled.
 * flushBLOB() reads it atomically and, when it is non-zero, uses it as the
 * server pointer instead of the task argument.
 */
78:
79: static volatile uintptr_t _glSigArg = 0;
80:
1.13 misho 81:
1.16 misho 82: void
1.13 misho 83: rpc_freeCli(rpc_cli_t * __restrict c)
1.10 misho 84: {
85: rpc_srv_t *s = c->cli_parent;
86:
1.13 misho 87: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 88:
89: /* free buffer */
90: AIT_FREE_VAL(&c->cli_buf);
91:
1.14 misho 92: array_Del(s->srv_clients, c->cli_id, 0);
1.10 misho 93: if (c)
1.14 misho 94: e_free(c);
1.13 misho 95: }
96:
97:
98: static inline int
1.14 misho 99: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 100: {
101: rpc_cli_t *c = NULL;
102: register int i;
103:
104: /* check free slots for connect */
1.14 misho 105: for (i = 0; i < array_Size(srv->srv_clients) &&
106: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
1.13 misho 107: /* check for duplicates */
1.14 misho 108: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
1.13 misho 109: break;
1.14 misho 110: if (i >= array_Size(srv->srv_clients))
1.13 misho 111: return -1; /* no more free slots! */
112:
113: return i;
114: }
115:
116: static rpc_cli_t *
1.14 misho 117: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 118: {
119: rpc_cli_t *c = NULL;
120: int n;
121:
122: n = _check4freeslot(srv, sa);
123: if (n == -1)
124: return NULL;
125: else
1.14 misho 126: c = array(srv->srv_clients, n, rpc_cli_t*);
1.13 misho 127:
128: if (!c) {
1.14 misho 129: c = e_malloc(sizeof(rpc_cli_t));
1.13 misho 130: if (!c) {
131: LOGERR;
132: srv->srv_kill = 1;
133: return NULL;
134: } else {
135: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 136: array_Set(srv->srv_clients, n, c);
1.13 misho 137: c->cli_id = n;
138: c->cli_parent = srv;
139: }
140:
141: /* alloc empty buffer */
1.14 misho 142: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.13 misho 143: }
144:
145: return c;
146: }
147:
148:
149: static void *
150: freeClient(sched_task_t *task)
151: {
152: rpc_freeCli(TASK_ARG(task));
153:
154: return NULL;
155: }
156:
157: static void *
158: closeClient(sched_task_t *task)
159: {
160: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
161:
162: rpc_freeCli(TASK_ARG(task));
163:
164: /* close client socket */
165: shutdown(sock, SHUT_RDWR);
166: close(sock);
1.10 misho 167: return NULL;
168: }
1.7 misho 169:
170: static void *
171: txPacket(sched_task_t *task)
172: {
173: rpc_cli_t *c = TASK_ARG(task);
174: rpc_srv_t *s = c->cli_parent;
175: rpc_func_t *f = NULL;
1.18 misho 176: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 177: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.18 misho 178: int ret, estlen, wlen = sizeof(struct tagRPCCall);
1.19 misho 179: struct pollfd pfd;
1.21 misho 180: #ifdef TCP_SESSION_TIMEOUT
181: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
182:
183: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
184: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
185: TASK_ARG(task), ts, TASK_ARG(task), 0);
186: #endif
1.7 misho 187:
188: if (rpc->call_argc) {
1.10 misho 189: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1.7 misho 190: if (!f) {
1.10 misho 191: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18 misho 192:
1.7 misho 193: rpc->call_argc ^= rpc->call_argc;
194: rpc->call_rep.ret = RPC_ERROR(-1);
195: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
196: } else {
1.18 misho 197: /* calc estimated length */
198: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
199: if (estlen > AIT_LEN(&c->cli_buf))
200: AIT_RE_BUF(&c->cli_buf, estlen);
201: buf = AIT_GET_BUF(&c->cli_buf);
1.19 misho 202: rpc = (struct tagRPCCall*) buf;
1.18 misho 203:
1.14 misho 204: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.7 misho 205: /* Go Encapsulate variables */
1.18 misho 206: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
207: RPC_RETVARS(c));
1.10 misho 208: /* Free return values */
1.14 misho 209: ait_freeVars(&c->cli_vars);
1.7 misho 210: if (ret == -1) {
211: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.19 misho 212:
1.7 misho 213: rpc->call_argc ^= rpc->call_argc;
214: rpc->call_rep.ret = RPC_ERROR(-1);
215: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
216: } else
217: wlen += ret;
218: }
219: }
220:
1.18 misho 221: rpc->call_len = htonl(wlen);
1.8 misho 222:
1.15 misho 223: #if 0
1.7 misho 224: /* calculate CRC */
225: rpc->call_crc ^= rpc->call_crc;
1.8 misho 226: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.15 misho 227: #endif
1.7 misho 228:
229: /* send reply */
1.19 misho 230: pfd.fd = TASK_FD(task);
231: pfd.events = POLLOUT;
232: for (; wlen > 0; wlen -= ret, buf += ret) {
233: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
234: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
235: if (ret)
236: LOGERR;
237: else
1.22 misho 238: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
1.19 misho 239: /* close connection */
240: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
241: TASK_ARG(task), 0, NULL, 0);
242: return NULL;
243: }
244: ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
245: if (ret == -1) {
246: /* close connection */
247: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
248: TASK_ARG(task), 0, NULL, 0);
249: return NULL;
250: }
1.10 misho 251: }
1.7 misho 252:
253: return NULL;
254: }
255:
256: static void *
257: execCall(sched_task_t *task)
258: {
259: rpc_cli_t *c = TASK_ARG(task);
260: rpc_srv_t *s = c->cli_parent;
261: rpc_func_t *f = NULL;
262: array_t *arr = NULL;
1.18 misho 263: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 264: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
265: int argc = ntohs(rpc->call_argc);
266:
267: /* Go decapsulate variables ... */
1.8 misho 268: if (argc) {
1.14 misho 269: arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall),
1.18 misho 270: AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
1.7 misho 271: if (!arr) {
1.14 misho 272: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
1.18 misho 273:
1.7 misho 274: rpc->call_argc ^= rpc->call_argc;
275: rpc->call_rep.ret = RPC_ERROR(-1);
276: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
277: return NULL;
278: }
1.10 misho 279: } else
280: arr = NULL;
1.7 misho 281:
1.10 misho 282: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
1.7 misho 283: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18 misho 284:
1.7 misho 285: rpc->call_argc ^= rpc->call_argc;
286: rpc->call_rep.ret = RPC_ERROR(-1);
287: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
288: } else {
1.8 misho 289: /* if client doesn't want reply */
1.10 misho 290: argc = RPC_CHK_NOREPLY(rpc);
291: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
1.7 misho 292: if (rpc->call_rep.ret == htonl(-1)) {
1.20 misho 293: if (!rpc->call_rep.eno) {
294: LOGERR;
295: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
296: }
1.7 misho 297: rpc->call_argc ^= rpc->call_argc;
298: } else {
299: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8 misho 300: if (argc) {
301: /* without reply */
1.14 misho 302: ait_freeVars(&c->cli_vars);
1.8 misho 303: rpc->call_argc ^= rpc->call_argc;
1.10 misho 304: } else {
305: /* reply */
1.14 misho 306: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.10 misho 307: }
1.7 misho 308: }
309: }
310:
1.14 misho 311: array_Destroy(&arr);
1.7 misho 312: return NULL;
313: }
314:
315: static void *
316: rxPacket(sched_task_t *task)
317: {
318: rpc_cli_t *c = TASK_ARG(task);
319: rpc_srv_t *s = c->cli_parent;
1.18 misho 320: int len, rlen, noreply, estlen;
1.15 misho 321: #if 0
322: u_short crc;
323: #endif
1.10 misho 324: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.18 misho 325: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.19 misho 326: struct pollfd pfd;
1.21 misho 327: #ifdef TCP_SESSION_TIMEOUT
328: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
329:
330: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
331: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
332: TASK_ARG(task), ts, TASK_ARG(task), 0);
333: #endif
1.7 misho 334:
1.18 misho 335: memset(buf, 0, sizeof(struct tagRPCCall));
1.19 misho 336: rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
1.18 misho 337: if (rlen < sizeof(struct tagRPCCall)) {
1.10 misho 338: /* close connection */
1.13 misho 339: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
340: TASK_ARG(task), 0, NULL, 0);
1.7 misho 341: return NULL;
1.10 misho 342: } else {
1.18 misho 343: estlen = ntohl(rpc->call_len);
344: if (estlen > AIT_LEN(&c->cli_buf))
345: AIT_RE_BUF(&c->cli_buf, estlen);
346: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1.19 misho 347: buf = AIT_GET_BUF(&c->cli_buf);
348: len = estlen;
1.18 misho 349: }
350:
1.19 misho 351: /* get next part of packet */
352: memset(buf, 0, len);
353: pfd.fd = TASK_FD(task);
354: pfd.events = POLLIN | POLLPRI;
355: for (; len > 0; len -= rlen, buf += rlen) {
356: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
357: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
358: if (rlen)
359: LOGERR;
360: else
1.22 misho 361: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
1.19 misho 362: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
363: TASK_ARG(task), 0, NULL, 0);
364: return NULL;
365: }
1.18 misho 366: rlen = recv(TASK_FD(task), buf, len, 0);
367: if (rlen == -1) {
368: /* close connection */
369: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
370: TASK_ARG(task), 0, NULL, 0);
1.8 misho 371: return NULL;
1.10 misho 372: }
1.18 misho 373: }
1.22 misho 374: len = estlen;
1.8 misho 375:
1.15 misho 376: #if 0
1.18 misho 377: /* check integrity of packet */
378: crc = ntohs(rpc->call_crc);
379: rpc->call_crc ^= rpc->call_crc;
380: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
381: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
382: return NULL;
383: }
1.15 misho 384: #endif
1.7 misho 385:
1.18 misho 386: noreply = RPC_CHK_NOREPLY(rpc);
1.7 misho 387:
1.18 misho 388: /* check RPC packet session info */
389: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
390: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.7 misho 391:
1.18 misho 392: rpc->call_argc ^= rpc->call_argc;
393: rpc->call_rep.ret = RPC_ERROR(-1);
394: rpc->call_rep.eno = RPC_ERROR(errno);
395: } else {
396: /* execute RPC call */
397: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
398: }
1.7 misho 399:
1.18 misho 400: /* send RPC reply */
401: if (!noreply)
402: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
403: TASK_ARG(task), TASK_FD(task), rpc, len);
1.7 misho 404:
405: /* lets get next packet */
1.18 misho 406: schedReadSelf(task);
1.7 misho 407: return NULL;
408: }
409:
1.1 misho 410: static void *
1.10 misho 411: acceptClients(sched_task_t *task)
1.1 misho 412: {
1.10 misho 413: rpc_srv_t *srv = TASK_ARG(task);
414: rpc_cli_t *c = NULL;
1.14 misho 415: socklen_t salen = sizeof(sockaddr_t);
1.21 misho 416: int sock;
417: #ifdef TCP_SESSION_TIMEOUT
418: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
419: #endif
1.7 misho 420:
1.13 misho 421: c = _allocClient(srv, NULL);
1.21 misho 422: if (!c) {
423: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
424: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
425: shutdown(sock, SHUT_RDWR);
426: close(sock);
427: }
1.10 misho 428: goto end;
1.21 misho 429: }
1.10 misho 430:
431: /* accept client */
432: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
433: if (c->cli_sock == -1) {
434: LOGERR;
435: AIT_FREE_VAL(&c->cli_buf);
1.14 misho 436: array_Del(srv->srv_clients, c->cli_id, 42);
1.10 misho 437: goto end;
1.1 misho 438: } else
1.10 misho 439: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 440:
1.21 misho 441: #ifdef TCP_SESSION_TIMEOUT
442: /* armed timer for close stateless connection */
443: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
444: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c,
445: ts, c, 0);
446: #endif
1.13 misho 447: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
448: c->cli_sock, NULL, 0);
1.10 misho 449: end:
450: schedReadSelf(task);
451: return NULL;
452: }
1.5 misho 453:
1.7 misho 454:
1.10 misho 455: static void *
1.13 misho 456: txUDPPacket(sched_task_t *task)
1.10 misho 457: {
458: rpc_cli_t *c = TASK_ARG(task);
459: rpc_srv_t *s = c->cli_parent;
1.13 misho 460: rpc_func_t *f = NULL;
1.18 misho 461: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.13 misho 462: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.22 misho 463: int ret, estlen, wlen = sizeof(struct tagRPCCall);
1.13 misho 464: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.22 misho 465: struct pollfd pfd;
1.13 misho 466:
467: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
468: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
469: TASK_ARG(task), ts, TASK_ARG(task), 0);
470:
471: if (rpc->call_argc) {
472: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
473: if (!f) {
474: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
475: rpc->call_argc ^= rpc->call_argc;
476: rpc->call_rep.ret = RPC_ERROR(-1);
477: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
478: } else {
1.22 misho 479: /* calc estimated length */
480: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
481: if (estlen > AIT_LEN(&c->cli_buf))
482: AIT_RE_BUF(&c->cli_buf, estlen);
483: buf = AIT_GET_BUF(&c->cli_buf);
484: rpc = (struct tagRPCCall*) buf;
485:
1.14 misho 486: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.13 misho 487: /* Go Encapsulate variables */
1.18 misho 488: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
489: RPC_RETVARS(c));
1.13 misho 490: /* Free return values */
1.14 misho 491: ait_freeVars(&c->cli_vars);
1.13 misho 492: if (ret == -1) {
493: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
494: rpc->call_argc ^= rpc->call_argc;
495: rpc->call_rep.ret = RPC_ERROR(-1);
496: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
497: } else
498: wlen += ret;
499: }
500: }
1.7 misho 501:
1.18 misho 502: rpc->call_len = htonl(wlen);
1.7 misho 503:
1.13 misho 504: /* calculate CRC */
505: rpc->call_crc ^= rpc->call_crc;
506: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
507:
508: /* send reply */
1.22 misho 509: pfd.fd = TASK_FD(task);
510: pfd.events = POLLOUT;
511: for (; wlen > 0; wlen -= ret, buf += ret) {
512: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
513: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
514: if (ret)
515: LOGERR;
516: else
517: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
518: /* close connection */
519: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
520: TASK_ARG(task), 0, NULL, 0);
521: return NULL;
522: }
523: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
524: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
525: if (ret == -1) {
526: /* close connection */
527: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
528: TASK_ARG(task), 0, NULL, 0);
529: return NULL;
530: }
1.13 misho 531: }
532:
533: return NULL;
534: }
535:
536: static void *
537: rxUDPPacket(sched_task_t *task)
538: {
539: rpc_srv_t *srv = TASK_ARG(task);
540: rpc_cli_t *c = NULL;
1.22 misho 541: int len, rlen, noreply, estlen;
542: u_short crc;
543: u_char *buf, b[sizeof(struct tagRPCCall)];
544: struct tagRPCCall *rpc = (struct tagRPCCall*) b;
1.14 misho 545: sockaddr_t sa;
546: socklen_t salen;
1.13 misho 547: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.22 misho 548: struct pollfd pfd;
1.13 misho 549:
550: /* receive connect packet */
1.14 misho 551: salen = sa.ss.ss_len = sizeof(sockaddr_t);
1.22 misho 552: rlen = recvfrom(TASK_FD(task), b, sizeof b, MSG_PEEK, &sa.sa, &salen);
553: if (rlen < sizeof(struct tagRPCCall)) {
554: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1.13 misho 555: goto end;
1.22 misho 556: }
1.13 misho 557:
558: c = _allocClient(srv, &sa);
1.22 misho 559: if (!c) {
560: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
561: usleep(2000); /* blocked client delay */
1.13 misho 562: goto end;
1.22 misho 563: } else {
564: estlen = ntohl(rpc->call_len);
565: if (estlen > AIT_LEN(&c->cli_buf))
566: AIT_RE_BUF(&c->cli_buf, estlen);
567: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
568: buf = AIT_GET_BUF(&c->cli_buf);
569: len = estlen;
570:
1.13 misho 571: c->cli_sock = TASK_FD(task);
572: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
1.22 misho 573:
1.13 misho 574: /* armed timer for close stateless connection */
575: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
576: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
577: c, ts, c, 0);
578: }
579:
1.22 misho 580: /* get next part of packet */
581: memset(buf, 0, len);
582: pfd.fd = TASK_FD(task);
583: pfd.events = POLLIN | POLLPRI;
584: for (; len > 0; len -= rlen, buf += rlen) {
585: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
586: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
587: if (rlen)
588: LOGERR;
589: else
590: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
591: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
592: c, 0, NULL, 0);
1.24 misho 593: goto end;
1.13 misho 594: }
1.22 misho 595: salen = sa.ss.ss_len = sizeof(sockaddr_t);
596: rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);
597: if (rlen == -1) {
598: /* close connection */
599: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
600: c, 0, NULL, 0);
1.24 misho 601: goto end;
1.13 misho 602: }
1.22 misho 603: if (e_addrcmp(&c->cli_sa, &sa, 42))
604: rlen ^= rlen; /* skip if arrive from different address */
605: }
606: len = estlen;
1.13 misho 607:
1.22 misho 608: /* check integrity of packet */
609: crc = ntohs(rpc->call_crc);
610: rpc->call_crc ^= rpc->call_crc;
611: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
612: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
1.24 misho 613: /* close connection */
614: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
615: c, 0, NULL, 0);
616: goto end;
617: }
618:
619: noreply = RPC_CHK_NOREPLY(rpc);
620:
621: /* check RPC packet session info */
622: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
623: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
624:
625: rpc->call_argc ^= rpc->call_argc;
626: rpc->call_rep.ret = RPC_ERROR(-1);
627: rpc->call_rep.eno = RPC_ERROR(errno);
628: } else {
629: /* execute RPC call */
630: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
631: }
632:
633: /* send RPC reply */
634: if (!noreply)
635: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
636: c, TASK_FD(task), rpc, len);
637: end:
638: schedReadSelf(task);
639: return NULL;
640: }
641:
642:
643: static void *
644: txBPFPacket(sched_task_t *task)
645: {
646: rpc_cli_t *c = TASK_ARG(task);
647: rpc_srv_t *s = c->cli_parent;
648: rpc_func_t *f = NULL;
649: u_char *buf = AIT_GET_BUF(&c->cli_buf);
650: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
651: int ret, len, wlen = sizeof(struct tagRPCCall);
652: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
653: struct ether_header *eh;
654: ait_val_t b = AIT_VAL_INIT;
655:
656: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
657: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
658: TASK_ARG(task), ts, TASK_ARG(task), 0);
659:
660: if (rpc->call_argc) {
661: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
662: if (!f) {
663: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
664: rpc->call_argc ^= rpc->call_argc;
665: rpc->call_rep.ret = RPC_ERROR(-1);
666: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
667: } else {
668: /* calc estimated length */
669: len = ait_resideVars(RPC_RETVARS(c)) + wlen;
670: if (len > AIT_LEN(&c->cli_buf))
671: AIT_RE_BUF(&c->cli_buf, len);
672: buf = AIT_GET_BUF(&c->cli_buf);
673: rpc = (struct tagRPCCall*) buf;
674:
675: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
676: /* Go Encapsulate variables */
677: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
678: RPC_RETVARS(c));
679: /* Free return values */
680: ait_freeVars(&RPC_RETVARS(c));
681: if (ret == -1) {
682: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
683: rpc->call_argc ^= rpc->call_argc;
684: rpc->call_rep.ret = RPC_ERROR(-1);
685: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
686: } else
687: wlen += ret;
688: }
689: }
690:
691: rpc->call_len = htonl(wlen);
692:
693: /* calculate CRC */
694: rpc->call_crc ^= rpc->call_crc;
695: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
696:
697: /* send reply */
698: AIT_SET_BUF(&b, NULL, MIN(wlen, s->srv_netbuf) + ETHER_HDR_LEN);
699: eh = (struct ether_header*) AIT_GET_BUF(&b);
700: memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
701: eh->ether_type = htons(RPC_DEFPORT);
702: memcpy(eh + 1, buf, MIN(wlen, s->srv_netbuf));
703:
704: ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
705: AIT_FREE_VAL(&b);
706: if (ret == -1) {
707: /* close connection */
708: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
709: TASK_ARG(task), 0, NULL, 0);
1.22 misho 710: return NULL;
711: }
1.13 misho 712:
1.24 misho 713: return NULL;
714: }
715:
716: static void *
717: rxBPFPacket(sched_task_t *task)
718: {
719: rpc_srv_t *srv = TASK_ARG(task);
720: rpc_cli_t *c = NULL;
721: int len, rlen, noreply;
722: u_short crc;
723: struct tagRPCCall *rpc;
724: sockaddr_t sa;
725: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
726: struct bpf_hdr *h;
727: struct ether_header *eh;
728: ait_val_t b = AIT_VAL_INIT;
729:
730: /* receive connect packet */
731: AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
732: rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
733: h = (struct bpf_hdr*) AIT_GET_BUF(&b);
734: rlen -= h->bh_hdrlen;
735: if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen ||
736: rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) {
737: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
738: goto end;
739: } else {
740: rlen = h->bh_caplen;
741: eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
742: rlen -= ETHER_HDR_LEN;
743: rpc = (struct tagRPCCall*) (eh + 1);
744: if (eh->ether_type != ntohs(RPC_DEFPORT))
745: goto end;
746: else
747: e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
748: }
749:
750: c = _allocClient(srv, &sa);
751: if (!c) {
752: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
753: usleep(2000); /* blocked client delay */
754: goto end;
755: } else {
756: len = ntohl(rpc->call_len);
757: if (len > AIT_LEN(&c->cli_buf))
758: AIT_RE_BUF(&c->cli_buf, len);
759: memcpy(AIT_GET_BUF(&c->cli_buf), rpc, AIT_LEN(&c->cli_buf));
760: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
761:
762: c->cli_sock = TASK_FD(task);
763: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
764:
765: /* armed timer for close stateless connection */
766: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
767: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
768: c, ts, c, 0);
769: }
770:
771: /* check integrity of packet */
772: crc = ntohs(rpc->call_crc);
773: rpc->call_crc ^= rpc->call_crc;
774: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
775: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
776: /* close connection */
777: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
778: c, 0, NULL, 0);
779: goto end;
780: }
781:
1.22 misho 782: noreply = RPC_CHK_NOREPLY(rpc);
1.18 misho 783:
1.22 misho 784: /* check RPC packet session info */
785: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
786: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.13 misho 787:
1.22 misho 788: rpc->call_argc ^= rpc->call_argc;
789: rpc->call_rep.ret = RPC_ERROR(-1);
790: rpc->call_rep.eno = RPC_ERROR(errno);
791: } else {
792: /* execute RPC call */
793: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
794: }
1.13 misho 795:
1.22 misho 796: /* send RPC reply */
797: if (!noreply)
1.24 misho 798: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
1.22 misho 799: c, TASK_FD(task), rpc, len);
1.13 misho 800: end:
1.24 misho 801: AIT_FREE_VAL(&b);
1.13 misho 802: schedReadSelf(task);
803: return NULL;
804: }
805:
806: /* ------------------------------------------------------ */
807:
1.16 misho 808: void
1.13 misho 809: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
810: {
811: rpc_srv_t *s = c->cli_parent;
812:
813: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 814:
815: /* free buffer */
816: AIT_FREE_VAL(&c->cli_buf);
817:
1.14 misho 818: array_Del(s->srv_blob.clients, c->cli_id, 0);
1.10 misho 819: if (c)
1.14 misho 820: e_free(c);
1.13 misho 821: }
822:
823:
824: static void *
825: closeBLOBClient(sched_task_t *task)
826: {
827: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
828:
829: rpc_freeBLOBCli(TASK_ARG(task));
830:
831: /* close client socket */
832: shutdown(sock, SHUT_RDWR);
833: close(sock);
1.7 misho 834: return NULL;
835: }
836:
837: static void *
838: txBLOB(sched_task_t *task)
839: {
1.10 misho 840: rpc_cli_t *c = TASK_ARG(task);
841: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 842: int wlen = sizeof(struct tagBLOBHdr);
843:
844: /* send reply */
1.10 misho 845: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
846: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
847: /* close blob connection */
848: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
849: }
1.7 misho 850:
851: return NULL;
852: }
1.4 misho 853:
1.7 misho 854: static void *
855: rxBLOB(sched_task_t *task)
856: {
857: rpc_cli_t *c = TASK_ARG(task);
858: rpc_srv_t *s = c->cli_parent;
859: rpc_blob_t *b;
1.10 misho 860: struct tagBLOBHdr blob;
1.7 misho 861: int rlen;
862:
1.10 misho 863: memset(&blob, 0, sizeof blob);
864: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
865: if (rlen < 1) {
866: /* close blob connection */
867: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1.7 misho 868: return NULL;
869: }
1.5 misho 870:
1.10 misho 871: /* check BLOB packet */
872: if (rlen < sizeof(struct tagBLOBHdr)) {
873: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1.6 misho 874:
1.10 misho 875: schedReadSelf(task);
1.7 misho 876: return NULL;
877: }
1.1 misho 878:
1.7 misho 879: /* check RPC packet session info */
1.15 misho 880: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1.7 misho 881: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.10 misho 882: blob.hdr_cmd = error;
1.7 misho 883: goto end;
884: }
885:
886: /* Go to proceed packet ... */
1.10 misho 887: switch (blob.hdr_cmd) {
1.7 misho 888: case get:
1.10 misho 889: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
890: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
891: blob.hdr_cmd = no;
892: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 893: break;
894: } else
1.10 misho 895: blob.hdr_len = htonl(b->blob_len);
1.5 misho 896:
1.7 misho 897: if (rpc_srv_blobMap(s, b) != -1) {
898: /* deliver BLOB variable to client */
1.10 misho 899: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1.7 misho 900: rpc_srv_blobUnmap(b);
901: } else {
1.10 misho 902: blob.hdr_cmd = error;
903: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 904: }
905: break;
906: case set:
1.17 misho 907: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
908: ntohl(blob.hdr_ret)))) {
1.7 misho 909: /* set new BLOB variable for reply :) */
1.10 misho 910: blob.hdr_var = htonl(b->blob_var);
1.7 misho 911:
912: /* receive BLOB from client */
1.10 misho 913: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1.7 misho 914: rpc_srv_blobUnmap(b);
1.5 misho 915: } else {
1.10 misho 916: blob.hdr_cmd = error;
917: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 918: }
919: break;
920: case unset:
1.11 misho 921: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1.10 misho 922: blob.hdr_cmd = error;
923: blob.hdr_ret = RPC_ERROR(-1);
1.1 misho 924: }
925: break;
1.7 misho 926: default:
1.10 misho 927: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
928: blob.hdr_cmd = error;
929: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 930: }
1.1 misho 931:
1.7 misho 932: end:
1.10 misho 933: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
934: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
935: schedReadSelf(task);
1.7 misho 936: return NULL;
1.2 misho 937: }
938:
939: static void *
1.17 misho 940: flushBLOB(sched_task_t *task)
941: {
1.23 misho 942: uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
943: rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
1.17 misho 944: rpc_blob_t *b, *tmp;
945:
946: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
947: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
948:
949: rpc_srv_blobFree(srv, b);
950: e_free(b);
951: }
952:
1.23 misho 953: if (!schedSignalSelf(task)) {
954: /* disabled kqueue support in libaitsched */
955: struct sigaction sa;
956:
957: memset(&sa, 0, sizeof sa);
958: sigemptyset(&sa.sa_mask);
959: sa.sa_handler = (void (*)(int)) flushBLOB;
960: sa.sa_flags = SA_RESTART | SA_RESETHAND;
961: sigaction(SIGFBLOB, &sa, NULL);
962: }
963:
1.17 misho 964: return NULL;
965: }
966:
967: static void *
1.10 misho 968: acceptBLOBClients(sched_task_t *task)
1.2 misho 969: {
1.10 misho 970: rpc_srv_t *srv = TASK_ARG(task);
971: rpc_cli_t *c = NULL;
972: register int i;
1.14 misho 973: socklen_t salen = sizeof(sockaddr_t);
1.21 misho 974: int sock;
1.12 misho 975: #ifdef TCP_NOPUSH
976: int n = 1;
977: #endif
1.7 misho 978:
1.10 misho 979: /* check free slots for connect */
1.14 misho 980: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
981: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1.21 misho 982: if (c) { /* no more free slots! */
983: EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
984: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
985: shutdown(sock, SHUT_RDWR);
986: close(sock);
987: }
1.10 misho 988: goto end;
1.21 misho 989: }
990:
1.14 misho 991: c = e_malloc(sizeof(rpc_cli_t));
1.10 misho 992: if (!c) {
1.7 misho 993: LOGERR;
1.10 misho 994: srv->srv_kill = srv->srv_blob.kill = 1;
1.7 misho 995: return NULL;
996: } else {
1.10 misho 997: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 998: array_Set(srv->srv_blob.clients, i, c);
1.10 misho 999: c->cli_id = i;
1000: c->cli_parent = srv;
1.7 misho 1001: }
1.4 misho 1002:
1.10 misho 1003: /* alloc empty buffer */
1.14 misho 1004: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.2 misho 1005:
1.10 misho 1006: /* accept client */
1007: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
1008: if (c->cli_sock == -1) {
1009: LOGERR;
1010: AIT_FREE_VAL(&c->cli_buf);
1.14 misho 1011: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 1012: goto end;
1.12 misho 1013: } else {
1014: #ifdef TCP_NOPUSH
1015: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
1016: #endif
1.10 misho 1017: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.12 misho 1018: }
1.2 misho 1019:
1.10 misho 1020: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
1021: end:
1022: schedReadSelf(task);
1.7 misho 1023: return NULL;
1.1 misho 1024: }
1025:
1.10 misho 1026: /* ------------------------------------------------------ */
1.1 misho 1027:
1028: /*
1.7 misho 1029: * rpc_srv_initBLOBServer() - Init & create BLOB Server
1030: *
1.4 misho 1031: * @srv = RPC server instance
1.2 misho 1032: * @Port = Port for bind server, if Port == 0 default port is selected
1033: * @diskDir = Disk place for BLOB file objects
1034: * return: -1 == error or 0 bind and created BLOB server instance
1035: */
1036: int
1037: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
1038: {
1039: int n = 1;
1040:
1.10 misho 1041: if (!srv || srv->srv_kill) {
1.7 misho 1042: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 1043: return -1;
1044: }
1045:
1046: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
1047: if (access(diskDir, R_OK | W_OK) == -1) {
1048: LOGERR;
1049: return -1;
1050: } else
1.7 misho 1051: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 1052:
1.10 misho 1053: /* init blob list */
1054: TAILQ_INIT(&srv->srv_blob.blobs);
1055:
1.2 misho 1056: srv->srv_blob.server.cli_parent = srv;
1.4 misho 1057:
1.14 misho 1058: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
1.10 misho 1059: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1.4 misho 1060: case AF_INET:
1.10 misho 1061: srv->srv_blob.server.cli_sa.sin.sin_port =
1062: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1.4 misho 1063: break;
1064: case AF_INET6:
1.10 misho 1065: srv->srv_blob.server.cli_sa.sin6.sin6_port =
1066: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1.4 misho 1067: break;
1068: case AF_LOCAL:
1.10 misho 1069: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
1070: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1.4 misho 1071: break;
1072: default:
1.7 misho 1073: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 1074: return -1;
1.2 misho 1075: }
1076:
1.4 misho 1077: /* create BLOB server socket */
1.6 misho 1078: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 1079: if (srv->srv_blob.server.cli_sock == -1) {
1080: LOGERR;
1.7 misho 1081: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1082: return -1;
1083: }
1084: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1085: LOGERR;
1086: close(srv->srv_blob.server.cli_sock);
1.7 misho 1087: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1088: return -1;
1089: }
1.5 misho 1090: n = srv->srv_netbuf;
1091: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1092: LOGERR;
1093: close(srv->srv_blob.server.cli_sock);
1.7 misho 1094: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 1095: return -1;
1096: }
1097: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1098: LOGERR;
1099: close(srv->srv_blob.server.cli_sock);
1.7 misho 1100: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 1101: return -1;
1102: }
1.6 misho 1103: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
1104: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 1105: LOGERR;
1106: close(srv->srv_blob.server.cli_sock);
1.7 misho 1107: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1108: return -1;
1.13 misho 1109: } else
1110: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
1111: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
1112:
1.2 misho 1113:
1.10 misho 1114: /* allocate pool for concurent blob clients */
1.14 misho 1115: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1.2 misho 1116: if (!srv->srv_blob.clients) {
1.14 misho 1117: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.2 misho 1118: close(srv->srv_blob.server.cli_sock);
1.7 misho 1119: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1120: return -1;
1.10 misho 1121: }
1.2 misho 1122:
1.10 misho 1123: /* init blob scheduler */
1124: srv->srv_blob.root = schedBegin();
1125: if (!srv->srv_blob.root) {
1126: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.14 misho 1127: array_Destroy(&srv->srv_blob.clients);
1.10 misho 1128: close(srv->srv_blob.server.cli_sock);
1129: AIT_FREE_VAL(&srv->srv_blob.dir);
1130: return -1;
1131: }
1.2 misho 1132:
1133: return 0;
1134: }
1135:
1136: /*
1.7 misho 1137: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
1138: *
1.2 misho 1139: * @srv = RPC Server instance
1140: * return: none
1141: */
1.16 misho 1142: void
1.2 misho 1143: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
1144: {
1.10 misho 1145: if (!srv)
1.2 misho 1146: return;
1147:
1.10 misho 1148: srv->srv_blob.kill = 1;
1.17 misho 1149:
1150: schedEnd(&srv->srv_blob.root);
1.2 misho 1151: }
1152:
1153: /*
1.10 misho 1154: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1.7 misho 1155: *
1.2 misho 1156: * @srv = RPC Server instance
1157: * return: -1 error or 0 ok, infinite loop ...
1158: */
1159: int
1.10 misho 1160: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1.2 misho 1161: {
1.10 misho 1162: rpc_cli_t *c;
1.2 misho 1163: register int i;
1.10 misho 1164: rpc_blob_t *b, *tmp;
1165: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.2 misho 1166:
1.10 misho 1167: if (!srv || srv->srv_kill) {
1.7 misho 1168: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 1169: return -1;
1170: }
1171:
1.14 misho 1172: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1.2 misho 1173: LOGERR;
1174: return -1;
1.10 misho 1175: }
1176:
1.23 misho 1177: if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
1178: /* disabled kqueue support in libaitsched */
1179: struct sigaction sa;
1180:
1181: atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
1182:
1183: memset(&sa, 0, sizeof sa);
1184: sigemptyset(&sa.sa_mask);
1185: sa.sa_handler = (void (*)(int)) flushBLOB;
1186: sa.sa_flags = SA_RESTART | SA_RESETHAND;
1187: sigaction(SIGFBLOB, &sa, NULL);
1188: }
1189:
1.10 misho 1190: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
1191: srv->srv_blob.server.cli_sock, NULL, 0)) {
1192: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1193: return -1;
1194: }
1.2 misho 1195:
1.10 misho 1196: schedPolling(srv->srv_blob.root, &ts, NULL);
1197: /* main rpc loop */
1198: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1.7 misho 1199:
1.17 misho 1200: /* detach blobs */
1201: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1202: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1203:
1204: rpc_srv_blobFree(srv, b);
1205: e_free(b);
1206: }
1207:
1.10 misho 1208: /* close all clients connections & server socket */
1.14 misho 1209: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
1210: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1.10 misho 1211: if (c) {
1212: shutdown(c->cli_sock, SHUT_RDWR);
1213: close(c->cli_sock);
1214:
1215: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1216: AIT_FREE_VAL(&c->cli_buf);
1.2 misho 1217: }
1.14 misho 1218: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 1219: }
1.14 misho 1220: array_Destroy(&srv->srv_blob.clients);
1.2 misho 1221:
1.10 misho 1222: close(srv->srv_blob.server.cli_sock);
1.2 misho 1223:
1.10 misho 1224: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1225: return 0;
1226: }
1227:
1228:
1229: /*
1.7 misho 1230: * rpc_srv_initServer() - Init & create RPC Server
1231: *
1.15 misho 1232: * @InstID = Instance for authentication & recognition
1.1 misho 1233: * @concurentClients = Concurent clients at same time to this server
1.10 misho 1234: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 1235: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 1236: * @Port = Port for bind server, if Port == 0 default port is selected
1.13 misho 1237: * @proto = Protocol, if == 0 choose SOCK_STREAM
1.1 misho 1238: * return: NULL == error or !=NULL bind and created RPC server instance
1239: */
1240: rpc_srv_t *
1.15 misho 1241: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
1242: const char *csHost, u_short Port, int proto)
1.1 misho 1243: {
1.10 misho 1244: int n = 1;
1.1 misho 1245: rpc_srv_t *srv = NULL;
1.14 misho 1246: sockaddr_t sa = E_SOCKADDR_INIT;
1.1 misho 1247:
1.24.2.2 misho 1248: if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
1.10 misho 1249: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1.1 misho 1250: return NULL;
1251: }
1.14 misho 1252: if (!e_gethostbyname(csHost, Port, &sa))
1.10 misho 1253: return NULL;
1.1 misho 1254: if (!Port)
1255: Port = RPC_DEFPORT;
1.13 misho 1256: if (!proto)
1257: proto = SOCK_STREAM;
1.10 misho 1258: if (netBuf < RPC_MIN_BUFSIZ)
1.5 misho 1259: netBuf = BUFSIZ;
1.7 misho 1260: else
1.14 misho 1261: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1.10 misho 1262:
1263: #ifdef HAVE_SRANDOMDEV
1264: srandomdev();
1265: #else
1266: time_t tim;
1267:
1268: srandom((time(&tim) ^ getpid()));
1269: #endif
1.1 misho 1270:
1.14 misho 1271: srv = e_malloc(sizeof(rpc_srv_t));
1.1 misho 1272: if (!srv) {
1273: LOGERR;
1274: return NULL;
1275: } else
1276: memset(srv, 0, sizeof(rpc_srv_t));
1277:
1.13 misho 1278: srv->srv_proto = proto;
1.5 misho 1279: srv->srv_netbuf = netBuf;
1.1 misho 1280: srv->srv_session.sess_version = RPC_VERSION;
1.15 misho 1281: srv->srv_session.sess_instance = InstID;
1.1 misho 1282:
1283: srv->srv_server.cli_parent = srv;
1.10 misho 1284: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1285:
1.12 misho 1286: /* init functions */
1287: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1288: SLIST_INIT(&srv->srv_funcs);
1289: AVL_INIT(&srv->srv_funcs);
1.10 misho 1290:
1291: /* init scheduler */
1292: srv->srv_root = schedBegin();
1293: if (!srv->srv_root) {
1294: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.12 misho 1295: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1296: e_free(srv);
1.10 misho 1297: return NULL;
1298: }
1299:
1300: /* init pool for clients */
1.14 misho 1301: srv->srv_clients = array_Init(concurentClients);
1.10 misho 1302: if (!srv->srv_clients) {
1.14 misho 1303: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.10 misho 1304: schedEnd(&srv->srv_root);
1.12 misho 1305: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1306: e_free(srv);
1.10 misho 1307: return NULL;
1308: }
1.4 misho 1309:
1310: /* create server socket */
1.13 misho 1311: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
1.1 misho 1312: if (srv->srv_server.cli_sock == -1) {
1313: LOGERR;
1.14 misho 1314: array_Destroy(&srv->srv_clients);
1.10 misho 1315: schedEnd(&srv->srv_root);
1.12 misho 1316: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1317: e_free(srv);
1.1 misho 1318: return NULL;
1319: }
1320: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1321: LOGERR;
1.10 misho 1322: goto err;
1.1 misho 1323: }
1.5 misho 1324: n = srv->srv_netbuf;
1325: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1326: LOGERR;
1.10 misho 1327: goto err;
1.5 misho 1328: }
1329: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1330: LOGERR;
1.10 misho 1331: goto err;
1.5 misho 1332: }
1.6 misho 1333: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
1334: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 1335: LOGERR;
1.10 misho 1336: goto err;
1.13 misho 1337: } else
1338: fcntl(srv->srv_server.cli_sock, F_SETFL,
1339: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 1340:
1.10 misho 1341: rpc_register_srvPing(srv);
1.8 misho 1342:
1.1 misho 1343: return srv;
1.10 misho 1344: err: /* error condition */
1345: close(srv->srv_server.cli_sock);
1.14 misho 1346: array_Destroy(&srv->srv_clients);
1.10 misho 1347: schedEnd(&srv->srv_root);
1.12 misho 1348: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1349: e_free(srv);
1.10 misho 1350: return NULL;
1.1 misho 1351: }
1352:
1353: /*
1.7 misho 1354: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1355: *
1.6 misho 1356: * @psrv = RPC Server instance
1.1 misho 1357: * return: none
1358: */
1.16 misho 1359: void
1.6 misho 1360: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 1361: {
1.10 misho 1362: if (!psrv || !*psrv)
1.1 misho 1363: return;
1364:
1.10 misho 1365: /* if send kill to blob server */
1.17 misho 1366: rpc_srv_endBLOBServer(*psrv);
1.1 misho 1367:
1.10 misho 1368: (*psrv)->srv_kill = 1;
1369: sleep(RPC_SCHED_POLLING);
1.2 misho 1370:
1.17 misho 1371: schedEnd(&(*psrv)->srv_root);
1372:
1.12 misho 1373: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1.14 misho 1374: e_free(*psrv);
1.6 misho 1375: *psrv = NULL;
1.1 misho 1376: }
1377:
1378: /*
1.7 misho 1379: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1380: *
1.1 misho 1381: * @srv = RPC Server instance
1382: * return: -1 error or 0 ok, infinite loop ...
1383: */
1384: int
1.5 misho 1385: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 1386: {
1.10 misho 1387: rpc_cli_t *c;
1.1 misho 1388: register int i;
1.12 misho 1389: rpc_func_t *f;
1.10 misho 1390: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.1 misho 1391:
1392: if (!srv) {
1.10 misho 1393: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1.1 misho 1394: return -1;
1395: }
1396:
1.13 misho 1397: if (srv->srv_proto == SOCK_STREAM)
1.14 misho 1398: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1.13 misho 1399: LOGERR;
1400: return -1;
1401: }
1.1 misho 1402:
1.13 misho 1403: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1404: srv->srv_server.cli_sock, NULL, 0)) {
1.10 misho 1405: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1406: return -1;
1407: }
1.7 misho 1408:
1.10 misho 1409: schedPolling(srv->srv_root, &ts, NULL);
1.7 misho 1410: /* main rpc loop */
1.10 misho 1411: schedRun(srv->srv_root, &srv->srv_kill);
1412:
1413: /* close all clients connections & server socket */
1.14 misho 1414: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1415: c = array(srv->srv_clients, i, rpc_cli_t*);
1.10 misho 1416: if (c) {
1.24 misho 1417: if (srv->srv_proto == SOCK_STREAM) {
1418: shutdown(c->cli_sock, SHUT_RDWR);
1419: close(c->cli_sock);
1420: }
1.10 misho 1421:
1422: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.14 misho 1423: ait_freeVars(&RPC_RETVARS(c));
1.10 misho 1424: AIT_FREE_VAL(&c->cli_buf);
1.1 misho 1425: }
1.14 misho 1426: array_Del(srv->srv_clients, i, 42);
1.10 misho 1427: }
1.14 misho 1428: array_Destroy(&srv->srv_clients);
1.2 misho 1429:
1.24.2.4! misho 1430: if (srv->srv_proto != SOCK_EXT)
! 1431: close(srv->srv_server.cli_sock);
1.2 misho 1432:
1.10 misho 1433: /* detach exported calls */
1.12 misho 1434: RPC_FUNCS_LOCK(&srv->srv_funcs);
1435: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1436: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1.1 misho 1437:
1.10 misho 1438: AIT_FREE_VAL(&f->func_name);
1.14 misho 1439: e_free(f);
1.1 misho 1440: }
1.12 misho 1441: srv->srv_funcs.avlh_root = NULL;
1442: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1.1 misho 1443:
1444: return 0;
1445: }
1446:
1447:
1448: /*
1449: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 1450: *
1.10 misho 1451: * @cli = RPC client
1.1 misho 1452: * @rpc = IN RPC call structure
1.10 misho 1453: * @funcname = Execute RPC function
1.5 misho 1454: * @args = IN RPC calling arguments from RPC client
1.1 misho 1455: * return: -1 error, !=-1 ok
1456: */
1457: int
1.10 misho 1458: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1459: ait_val_t funcname, array_t * __restrict args)
1.1 misho 1460: {
1461: rpc_callback_t func;
1462:
1.10 misho 1463: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1.7 misho 1464: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 1465: return -1;
1466: }
1467:
1.10 misho 1468: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1469: return func(cli, rpc, args);
1.1 misho 1470: }
1.24 misho 1471:
1472:
1473: /*
1474: * rpc_srv_initServer2() - Init & create layer2 RPC Server
1475: *
1476: * @InstID = Instance for authentication & recognition
1477: * @concurentClients = Concurent clients at same time to this server
1478: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1479: * @csIface = Interface name for bind server, if NULL first interface on host
1480: * return: NULL == error or !=NULL bind and created RPC server instance
1481: */
1482: rpc_srv_t *
1483: rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
1484: {
1485: int n = 1;
1486: rpc_srv_t *srv = NULL;
1487: sockaddr_t sa = E_SOCKADDR_INIT;
1488: char szIface[64], szStr[STRSIZ];
1489: register int i;
1490: struct ifreq ifr;
1491: struct bpf_insn insns[] = {
1492: BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
1493: BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
1494: BPF_STMT(BPF_RET + BPF_K, -1),
1495: BPF_STMT(BPF_RET + BPF_K, 0),
1496: };
1497: struct bpf_program fcode = {
1498: .bf_len = sizeof(insns) / sizeof(struct bpf_insn),
1499: .bf_insns = insns
1500: };
1501:
1502: if (!concurentClients) {
1503: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1504: return NULL;
1505: }
1506: if (!csIface) {
1507: if (e_get1stiface(szIface, sizeof szIface))
1508: return NULL;
1509: } else
1510: strlcpy(szIface, csIface, sizeof szIface);
1511: if (!e_getifacebyname(szIface, &sa))
1512: return NULL;
1513:
1514: #ifdef HAVE_SRANDOMDEV
1515: srandomdev();
1516: #else
1517: time_t tim;
1518:
1519: srandom((time(&tim) ^ getpid()));
1520: #endif
1521:
1522: srv = e_malloc(sizeof(rpc_srv_t));
1523: if (!srv) {
1524: LOGERR;
1525: return NULL;
1526: } else
1527: memset(srv, 0, sizeof(rpc_srv_t));
1528:
1529: srv->srv_proto = SOCK_BPF;
1530: srv->srv_netbuf = netBuf;
1531: srv->srv_session.sess_version = RPC_VERSION;
1532: srv->srv_session.sess_instance = InstID;
1533:
1534: srv->srv_server.cli_parent = srv;
1535: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1536:
1537: /* init functions */
1538: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1539: SLIST_INIT(&srv->srv_funcs);
1540: AVL_INIT(&srv->srv_funcs);
1541:
1542: /* init scheduler */
1543: srv->srv_root = schedBegin();
1544: if (!srv->srv_root) {
1545: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1546: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1547: e_free(srv);
1548: return NULL;
1549: }
1550:
1551: /* init pool for clients */
1552: srv->srv_clients = array_Init(concurentClients);
1553: if (!srv->srv_clients) {
1554: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1555: schedEnd(&srv->srv_root);
1556: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1557: e_free(srv);
1558: return NULL;
1559: }
1560:
1561: /* create server handler */
1562: for (i = 0; i < 10; i++) {
1563: memset(szStr, 0, sizeof szStr);
1564: snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
1565: srv->srv_server.cli_sock = open(szStr, O_RDWR);
1566: if (srv->srv_server.cli_sock > STDERR_FILENO)
1567: break;
1568: }
1569: if (srv->srv_server.cli_sock < 3) {
1570: LOGERR;
1571: array_Destroy(&srv->srv_clients);
1572: schedEnd(&srv->srv_root);
1573: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1574: e_free(srv);
1575: return NULL;
1576: }
1577:
1578: if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
1579: LOGERR;
1580: goto err;
1581: }
1582: if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
1583: LOGERR;
1584: goto err;
1585: }
1586: n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
1587: if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
1588: LOGERR;
1589: goto err;
1590: } else
1591: srv->srv_netbuf = n;
1592:
1593: memset(&ifr, 0, sizeof ifr);
1594: strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
1595: if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
1596: LOGERR;
1597: goto err;
1598: } else
1599: fcntl(srv->srv_server.cli_sock, F_SETFL,
1600: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1601:
1602: rpc_register_srvPing(srv);
1603:
1604: return srv;
1605: err: /* error condition */
1606: close(srv->srv_server.cli_sock);
1607: array_Destroy(&srv->srv_clients);
1608: schedEnd(&srv->srv_root);
1609: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1610: e_free(srv);
1611: return NULL;
1612: }
1.24.2.3 misho 1613:
1614: /*
1615: * rpc_srv_initServerExt() - Init & create pipe RPC Server
1616: *
1617: * @InstID = Instance for authentication & recognition
1618: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1619: * @fd = File descriptor
1620: * return: NULL == error or !=NULL bind and created RPC server instance
1621: */
1622: rpc_srv_t *
1623: rpc_srv_initServerExt(u_char InstID, int netBuf, int fd)
1624: {
1625: rpc_srv_t *srv = NULL;
1626:
1627: #ifdef HAVE_SRANDOMDEV
1628: srandomdev();
1629: #else
1630: time_t tim;
1631:
1632: srandom((time(&tim) ^ getpid()));
1633: #endif
1634:
1635: srv = e_malloc(sizeof(rpc_srv_t));
1636: if (!srv) {
1637: LOGERR;
1638: return NULL;
1639: } else
1640: memset(srv, 0, sizeof(rpc_srv_t));
1641:
1642: srv->srv_proto = SOCK_EXT;
1643: srv->srv_netbuf = netBuf;
1644: srv->srv_session.sess_version = RPC_VERSION;
1645: srv->srv_session.sess_instance = InstID;
1646:
1647: srv->srv_server.cli_parent = srv;
1648: srv->srv_server.cli_sock = fd;
1649:
1650: /* init functions */
1651: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1652: SLIST_INIT(&srv->srv_funcs);
1653: AVL_INIT(&srv->srv_funcs);
1654:
1655: /* init scheduler */
1656: srv->srv_root = schedBegin();
1657: if (!srv->srv_root) {
1658: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1659: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1660: e_free(srv);
1661: return NULL;
1662: }
1663:
1664: /* init pool for clients */
1665: srv->srv_clients = array_Init(1);
1666: if (!srv->srv_clients) {
1667: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1668: schedEnd(&srv->srv_root);
1669: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1670: e_free(srv);
1671: return NULL;
1672: }
1673:
1674: fcntl(srv->srv_server.cli_sock, F_SETFL,
1675: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1676:
1677: rpc_register_srvPing(srv);
1678:
1679: return srv;
1680: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>