Annotation of libaitrpc/src/srv.c, revision 1.22.6.1
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.22.6.1! misho 6: * $Id: srv.c,v 1.22 2013/11/15 09:55:53 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.14 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.13 misho 49: /* SOCK_STREAM */
50: static void *acceptClients(sched_task_t *);
51: static void *closeClient(sched_task_t *);
52: static void *rxPacket(sched_task_t *);
53: static void *txPacket(sched_task_t *);
54:
55: /* SOCK_DGRAM */
56: static void *freeClient(sched_task_t *);
57: static void *rxUDPPacket(sched_task_t *);
58: static void *txUDPPacket(sched_task_t *);
59:
60: /* SOCK_RAW */
61:
62: static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {
63: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
64: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
65: { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket }, /* SOCK_DGRAM */
66: { NULL, NULL, NULL, NULL } /* SOCK_RAW */
67: };
68:
1.22.6.1! misho 69: /* Global Signal Argument when kqueue support disabled */
! 70:
! 71: static volatile uintptr_t _glSigArg = 0;
! 72:
1.13 misho 73:
1.16 misho 74: void
1.13 misho 75: rpc_freeCli(rpc_cli_t * __restrict c)
1.10 misho 76: {
77: rpc_srv_t *s = c->cli_parent;
78:
1.13 misho 79: schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 80:
81: /* free buffer */
82: AIT_FREE_VAL(&c->cli_buf);
83:
1.14 misho 84: array_Del(s->srv_clients, c->cli_id, 0);
1.10 misho 85: if (c)
1.14 misho 86: e_free(c);
1.13 misho 87: }
88:
89:
90: static inline int
1.14 misho 91: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 92: {
93: rpc_cli_t *c = NULL;
94: register int i;
95:
96: /* check free slots for connect */
1.14 misho 97: for (i = 0; i < array_Size(srv->srv_clients) &&
98: (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
1.13 misho 99: /* check for duplicates */
1.14 misho 100: if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
1.13 misho 101: break;
1.14 misho 102: if (i >= array_Size(srv->srv_clients))
1.13 misho 103: return -1; /* no more free slots! */
104:
105: return i;
106: }
107:
108: static rpc_cli_t *
1.14 misho 109: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13 misho 110: {
111: rpc_cli_t *c = NULL;
112: int n;
113:
114: n = _check4freeslot(srv, sa);
115: if (n == -1)
116: return NULL;
117: else
1.14 misho 118: c = array(srv->srv_clients, n, rpc_cli_t*);
1.13 misho 119:
120: if (!c) {
1.14 misho 121: c = e_malloc(sizeof(rpc_cli_t));
1.13 misho 122: if (!c) {
123: LOGERR;
124: srv->srv_kill = 1;
125: return NULL;
126: } else {
127: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 128: array_Set(srv->srv_clients, n, c);
1.13 misho 129: c->cli_id = n;
130: c->cli_parent = srv;
131: }
132:
133: /* alloc empty buffer */
1.14 misho 134: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.13 misho 135: }
136:
137: return c;
138: }
139:
140:
141: static void *
142: freeClient(sched_task_t *task)
143: {
144: rpc_freeCli(TASK_ARG(task));
145:
146: return NULL;
147: }
148:
149: static void *
150: closeClient(sched_task_t *task)
151: {
152: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
153:
154: rpc_freeCli(TASK_ARG(task));
155:
156: /* close client socket */
157: shutdown(sock, SHUT_RDWR);
158: close(sock);
1.10 misho 159: return NULL;
160: }
1.7 misho 161:
162: static void *
163: txPacket(sched_task_t *task)
164: {
165: rpc_cli_t *c = TASK_ARG(task);
166: rpc_srv_t *s = c->cli_parent;
167: rpc_func_t *f = NULL;
1.18 misho 168: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 169: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.18 misho 170: int ret, estlen, wlen = sizeof(struct tagRPCCall);
1.19 misho 171: struct pollfd pfd;
1.21 misho 172: #ifdef TCP_SESSION_TIMEOUT
173: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
174:
175: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
176: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
177: TASK_ARG(task), ts, TASK_ARG(task), 0);
178: #endif
1.7 misho 179:
180: if (rpc->call_argc) {
1.10 misho 181: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1.7 misho 182: if (!f) {
1.10 misho 183: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18 misho 184:
1.7 misho 185: rpc->call_argc ^= rpc->call_argc;
186: rpc->call_rep.ret = RPC_ERROR(-1);
187: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
188: } else {
1.18 misho 189: /* calc estimated length */
190: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
191: if (estlen > AIT_LEN(&c->cli_buf))
192: AIT_RE_BUF(&c->cli_buf, estlen);
193: buf = AIT_GET_BUF(&c->cli_buf);
1.19 misho 194: rpc = (struct tagRPCCall*) buf;
1.18 misho 195:
1.14 misho 196: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.7 misho 197: /* Go Encapsulate variables */
1.18 misho 198: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
199: RPC_RETVARS(c));
1.10 misho 200: /* Free return values */
1.14 misho 201: ait_freeVars(&c->cli_vars);
1.7 misho 202: if (ret == -1) {
203: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.19 misho 204:
1.7 misho 205: rpc->call_argc ^= rpc->call_argc;
206: rpc->call_rep.ret = RPC_ERROR(-1);
207: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
208: } else
209: wlen += ret;
210: }
211: }
212:
1.18 misho 213: rpc->call_len = htonl(wlen);
1.8 misho 214:
1.15 misho 215: #if 0
1.7 misho 216: /* calculate CRC */
217: rpc->call_crc ^= rpc->call_crc;
1.8 misho 218: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.15 misho 219: #endif
1.7 misho 220:
221: /* send reply */
1.19 misho 222: pfd.fd = TASK_FD(task);
223: pfd.events = POLLOUT;
224: for (; wlen > 0; wlen -= ret, buf += ret) {
225: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
226: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
227: if (ret)
228: LOGERR;
229: else
1.22 misho 230: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
1.19 misho 231: /* close connection */
232: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
233: TASK_ARG(task), 0, NULL, 0);
234: return NULL;
235: }
236: ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
237: if (ret == -1) {
238: /* close connection */
239: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
240: TASK_ARG(task), 0, NULL, 0);
241: return NULL;
242: }
1.10 misho 243: }
1.7 misho 244:
245: return NULL;
246: }
247:
248: static void *
249: execCall(sched_task_t *task)
250: {
251: rpc_cli_t *c = TASK_ARG(task);
252: rpc_srv_t *s = c->cli_parent;
253: rpc_func_t *f = NULL;
254: array_t *arr = NULL;
1.18 misho 255: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 256: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
257: int argc = ntohs(rpc->call_argc);
258:
259: /* Go decapsulate variables ... */
1.8 misho 260: if (argc) {
1.14 misho 261: arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall),
1.18 misho 262: AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
1.7 misho 263: if (!arr) {
1.14 misho 264: rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
1.18 misho 265:
1.7 misho 266: rpc->call_argc ^= rpc->call_argc;
267: rpc->call_rep.ret = RPC_ERROR(-1);
268: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
269: return NULL;
270: }
1.10 misho 271: } else
272: arr = NULL;
1.7 misho 273:
1.10 misho 274: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
1.7 misho 275: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18 misho 276:
1.7 misho 277: rpc->call_argc ^= rpc->call_argc;
278: rpc->call_rep.ret = RPC_ERROR(-1);
279: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
280: } else {
1.8 misho 281: /* if client doesn't want reply */
1.10 misho 282: argc = RPC_CHK_NOREPLY(rpc);
283: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
1.7 misho 284: if (rpc->call_rep.ret == htonl(-1)) {
1.20 misho 285: if (!rpc->call_rep.eno) {
286: LOGERR;
287: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
288: }
1.7 misho 289: rpc->call_argc ^= rpc->call_argc;
290: } else {
291: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8 misho 292: if (argc) {
293: /* without reply */
1.14 misho 294: ait_freeVars(&c->cli_vars);
1.8 misho 295: rpc->call_argc ^= rpc->call_argc;
1.10 misho 296: } else {
297: /* reply */
1.14 misho 298: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.10 misho 299: }
1.7 misho 300: }
301: }
302:
1.14 misho 303: array_Destroy(&arr);
1.7 misho 304: return NULL;
305: }
306:
307: static void *
308: rxPacket(sched_task_t *task)
309: {
310: rpc_cli_t *c = TASK_ARG(task);
311: rpc_srv_t *s = c->cli_parent;
1.18 misho 312: int len, rlen, noreply, estlen;
1.15 misho 313: #if 0
314: u_short crc;
315: #endif
1.10 misho 316: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.18 misho 317: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.19 misho 318: struct pollfd pfd;
1.21 misho 319: #ifdef TCP_SESSION_TIMEOUT
320: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
321:
322: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
323: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
324: TASK_ARG(task), ts, TASK_ARG(task), 0);
325: #endif
1.7 misho 326:
1.18 misho 327: memset(buf, 0, sizeof(struct tagRPCCall));
1.19 misho 328: rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
1.18 misho 329: if (rlen < sizeof(struct tagRPCCall)) {
1.10 misho 330: /* close connection */
1.13 misho 331: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
332: TASK_ARG(task), 0, NULL, 0);
1.7 misho 333: return NULL;
1.10 misho 334: } else {
1.18 misho 335: estlen = ntohl(rpc->call_len);
336: if (estlen > AIT_LEN(&c->cli_buf))
337: AIT_RE_BUF(&c->cli_buf, estlen);
338: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1.19 misho 339: buf = AIT_GET_BUF(&c->cli_buf);
340: len = estlen;
1.18 misho 341: }
342:
1.19 misho 343: /* get next part of packet */
344: memset(buf, 0, len);
345: pfd.fd = TASK_FD(task);
346: pfd.events = POLLIN | POLLPRI;
347: for (; len > 0; len -= rlen, buf += rlen) {
348: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
349: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
350: if (rlen)
351: LOGERR;
352: else
1.22 misho 353: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
1.19 misho 354: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
355: TASK_ARG(task), 0, NULL, 0);
356: return NULL;
357: }
1.18 misho 358: rlen = recv(TASK_FD(task), buf, len, 0);
359: if (rlen == -1) {
360: /* close connection */
361: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
362: TASK_ARG(task), 0, NULL, 0);
1.8 misho 363: return NULL;
1.10 misho 364: }
1.18 misho 365: }
1.22 misho 366: len = estlen;
1.8 misho 367:
1.15 misho 368: #if 0
1.18 misho 369: /* check integrity of packet */
370: crc = ntohs(rpc->call_crc);
371: rpc->call_crc ^= rpc->call_crc;
372: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
373: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
374: return NULL;
375: }
1.15 misho 376: #endif
1.7 misho 377:
1.18 misho 378: noreply = RPC_CHK_NOREPLY(rpc);
1.7 misho 379:
1.18 misho 380: /* check RPC packet session info */
381: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
382: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.7 misho 383:
1.18 misho 384: rpc->call_argc ^= rpc->call_argc;
385: rpc->call_rep.ret = RPC_ERROR(-1);
386: rpc->call_rep.eno = RPC_ERROR(errno);
387: } else {
388: /* execute RPC call */
389: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
390: }
1.7 misho 391:
1.18 misho 392: /* send RPC reply */
393: if (!noreply)
394: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
395: TASK_ARG(task), TASK_FD(task), rpc, len);
1.7 misho 396:
397: /* lets get next packet */
1.18 misho 398: schedReadSelf(task);
1.7 misho 399: return NULL;
400: }
401:
1.1 misho 402: static void *
1.10 misho 403: acceptClients(sched_task_t *task)
1.1 misho 404: {
1.10 misho 405: rpc_srv_t *srv = TASK_ARG(task);
406: rpc_cli_t *c = NULL;
1.14 misho 407: socklen_t salen = sizeof(sockaddr_t);
1.21 misho 408: int sock;
409: #ifdef TCP_SESSION_TIMEOUT
410: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
411: #endif
1.7 misho 412:
1.13 misho 413: c = _allocClient(srv, NULL);
1.21 misho 414: if (!c) {
415: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
416: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
417: shutdown(sock, SHUT_RDWR);
418: close(sock);
419: }
1.10 misho 420: goto end;
1.21 misho 421: }
1.10 misho 422:
423: /* accept client */
424: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
425: if (c->cli_sock == -1) {
426: LOGERR;
427: AIT_FREE_VAL(&c->cli_buf);
1.14 misho 428: array_Del(srv->srv_clients, c->cli_id, 42);
1.10 misho 429: goto end;
1.1 misho 430: } else
1.10 misho 431: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 432:
1.21 misho 433: #ifdef TCP_SESSION_TIMEOUT
434: /* armed timer for close stateless connection */
435: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
436: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c,
437: ts, c, 0);
438: #endif
1.13 misho 439: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
440: c->cli_sock, NULL, 0);
1.10 misho 441: end:
442: schedReadSelf(task);
443: return NULL;
444: }
1.5 misho 445:
1.7 misho 446:
1.10 misho 447: static void *
1.13 misho 448: txUDPPacket(sched_task_t *task)
1.10 misho 449: {
450: rpc_cli_t *c = TASK_ARG(task);
451: rpc_srv_t *s = c->cli_parent;
1.13 misho 452: rpc_func_t *f = NULL;
1.18 misho 453: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.13 misho 454: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.22 misho 455: int ret, estlen, wlen = sizeof(struct tagRPCCall);
1.13 misho 456: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.22 misho 457: struct pollfd pfd;
1.13 misho 458:
459: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
460: schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
461: TASK_ARG(task), ts, TASK_ARG(task), 0);
462:
463: if (rpc->call_argc) {
464: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
465: if (!f) {
466: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
467: rpc->call_argc ^= rpc->call_argc;
468: rpc->call_rep.ret = RPC_ERROR(-1);
469: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
470: } else {
1.22 misho 471: /* calc estimated length */
472: estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
473: if (estlen > AIT_LEN(&c->cli_buf))
474: AIT_RE_BUF(&c->cli_buf, estlen);
475: buf = AIT_GET_BUF(&c->cli_buf);
476: rpc = (struct tagRPCCall*) buf;
477:
1.14 misho 478: rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.13 misho 479: /* Go Encapsulate variables */
1.18 misho 480: ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen,
481: RPC_RETVARS(c));
1.13 misho 482: /* Free return values */
1.14 misho 483: ait_freeVars(&c->cli_vars);
1.13 misho 484: if (ret == -1) {
485: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
486: rpc->call_argc ^= rpc->call_argc;
487: rpc->call_rep.ret = RPC_ERROR(-1);
488: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
489: } else
490: wlen += ret;
491: }
492: }
1.7 misho 493:
1.18 misho 494: rpc->call_len = htonl(wlen);
1.7 misho 495:
1.13 misho 496: /* calculate CRC */
497: rpc->call_crc ^= rpc->call_crc;
498: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
499:
500: /* send reply */
1.22 misho 501: pfd.fd = TASK_FD(task);
502: pfd.events = POLLOUT;
503: for (; wlen > 0; wlen -= ret, buf += ret) {
504: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
505: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
506: if (ret)
507: LOGERR;
508: else
509: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
510: /* close connection */
511: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
512: TASK_ARG(task), 0, NULL, 0);
513: return NULL;
514: }
515: ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL,
516: &c->cli_sa.sa, c->cli_sa.sa.sa_len);
517: if (ret == -1) {
518: /* close connection */
519: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
520: TASK_ARG(task), 0, NULL, 0);
521: return NULL;
522: }
1.13 misho 523: }
524:
525: return NULL;
526: }
527:
528: static void *
529: rxUDPPacket(sched_task_t *task)
530: {
531: rpc_srv_t *srv = TASK_ARG(task);
532: rpc_cli_t *c = NULL;
1.22 misho 533: int len, rlen, noreply, estlen;
534: u_short crc;
535: u_char *buf, b[sizeof(struct tagRPCCall)];
536: struct tagRPCCall *rpc = (struct tagRPCCall*) b;
1.14 misho 537: sockaddr_t sa;
538: socklen_t salen;
1.13 misho 539: struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.22 misho 540: struct pollfd pfd;
1.13 misho 541:
542: /* receive connect packet */
1.14 misho 543: salen = sa.ss.ss_len = sizeof(sockaddr_t);
1.22 misho 544: rlen = recvfrom(TASK_FD(task), b, sizeof b, MSG_PEEK, &sa.sa, &salen);
545: if (rlen < sizeof(struct tagRPCCall)) {
546: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1.13 misho 547: goto end;
1.22 misho 548: }
1.13 misho 549:
550: c = _allocClient(srv, &sa);
1.22 misho 551: if (!c) {
552: EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
553: usleep(2000); /* blocked client delay */
1.13 misho 554: goto end;
1.22 misho 555: } else {
556: estlen = ntohl(rpc->call_len);
557: if (estlen > AIT_LEN(&c->cli_buf))
558: AIT_RE_BUF(&c->cli_buf, estlen);
559: rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
560: buf = AIT_GET_BUF(&c->cli_buf);
561: len = estlen;
562:
1.13 misho 563: c->cli_sock = TASK_FD(task);
564: memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
1.22 misho 565:
1.13 misho 566: /* armed timer for close stateless connection */
567: schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
568: schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
569: c, ts, c, 0);
570: }
571:
1.22 misho 572: /* get next part of packet */
573: memset(buf, 0, len);
574: pfd.fd = TASK_FD(task);
575: pfd.events = POLLIN | POLLPRI;
576: for (; len > 0; len -= rlen, buf += rlen) {
577: if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
578: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
579: if (rlen)
580: LOGERR;
581: else
582: rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
583: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
584: c, 0, NULL, 0);
585: return NULL;
1.13 misho 586: }
1.22 misho 587: salen = sa.ss.ss_len = sizeof(sockaddr_t);
588: rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);
589: if (rlen == -1) {
590: /* close connection */
591: schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],
592: c, 0, NULL, 0);
593: return NULL;
1.13 misho 594: }
1.22 misho 595: if (e_addrcmp(&c->cli_sa, &sa, 42))
596: rlen ^= rlen; /* skip if arrive from different address */
597: }
598: len = estlen;
1.13 misho 599:
1.22 misho 600: /* check integrity of packet */
601: crc = ntohs(rpc->call_crc);
602: rpc->call_crc ^= rpc->call_crc;
603: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
604: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
605: return NULL;
606: }
1.13 misho 607:
1.22 misho 608: noreply = RPC_CHK_NOREPLY(rpc);
1.18 misho 609:
1.22 misho 610: /* check RPC packet session info */
611: if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
612: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.13 misho 613:
1.22 misho 614: rpc->call_argc ^= rpc->call_argc;
615: rpc->call_rep.ret = RPC_ERROR(-1);
616: rpc->call_rep.eno = RPC_ERROR(errno);
617: } else {
618: /* execute RPC call */
619: schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
620: }
1.13 misho 621:
1.22 misho 622: /* send RPC reply */
623: if (!noreply)
624: schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],
625: c, TASK_FD(task), rpc, len);
1.13 misho 626: end:
627: schedReadSelf(task);
628: return NULL;
629: }
630:
631: /* ------------------------------------------------------ */
632:
1.16 misho 633: void
1.13 misho 634: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
635: {
636: rpc_srv_t *s = c->cli_parent;
637:
638: schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1.10 misho 639:
640: /* free buffer */
641: AIT_FREE_VAL(&c->cli_buf);
642:
1.14 misho 643: array_Del(s->srv_blob.clients, c->cli_id, 0);
1.10 misho 644: if (c)
1.14 misho 645: e_free(c);
1.13 misho 646: }
647:
648:
649: static void *
650: closeBLOBClient(sched_task_t *task)
651: {
652: int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
653:
654: rpc_freeBLOBCli(TASK_ARG(task));
655:
656: /* close client socket */
657: shutdown(sock, SHUT_RDWR);
658: close(sock);
1.7 misho 659: return NULL;
660: }
661:
662: static void *
663: txBLOB(sched_task_t *task)
664: {
1.10 misho 665: rpc_cli_t *c = TASK_ARG(task);
666: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 667: int wlen = sizeof(struct tagBLOBHdr);
668:
669: /* send reply */
1.10 misho 670: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
671: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
672: /* close blob connection */
673: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
674: }
1.7 misho 675:
676: return NULL;
677: }
1.4 misho 678:
1.7 misho 679: static void *
680: rxBLOB(sched_task_t *task)
681: {
682: rpc_cli_t *c = TASK_ARG(task);
683: rpc_srv_t *s = c->cli_parent;
684: rpc_blob_t *b;
1.10 misho 685: struct tagBLOBHdr blob;
1.7 misho 686: int rlen;
687:
1.10 misho 688: memset(&blob, 0, sizeof blob);
689: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
690: if (rlen < 1) {
691: /* close blob connection */
692: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1.7 misho 693: return NULL;
694: }
1.5 misho 695:
1.10 misho 696: /* check BLOB packet */
697: if (rlen < sizeof(struct tagBLOBHdr)) {
698: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1.6 misho 699:
1.10 misho 700: schedReadSelf(task);
1.7 misho 701: return NULL;
702: }
1.1 misho 703:
1.7 misho 704: /* check RPC packet session info */
1.15 misho 705: if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1.7 misho 706: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.10 misho 707: blob.hdr_cmd = error;
1.7 misho 708: goto end;
709: }
710:
711: /* Go to proceed packet ... */
1.10 misho 712: switch (blob.hdr_cmd) {
1.7 misho 713: case get:
1.10 misho 714: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
715: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
716: blob.hdr_cmd = no;
717: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 718: break;
719: } else
1.10 misho 720: blob.hdr_len = htonl(b->blob_len);
1.5 misho 721:
1.7 misho 722: if (rpc_srv_blobMap(s, b) != -1) {
723: /* deliver BLOB variable to client */
1.10 misho 724: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1.7 misho 725: rpc_srv_blobUnmap(b);
726: } else {
1.10 misho 727: blob.hdr_cmd = error;
728: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 729: }
730: break;
731: case set:
1.17 misho 732: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len),
733: ntohl(blob.hdr_ret)))) {
1.7 misho 734: /* set new BLOB variable for reply :) */
1.10 misho 735: blob.hdr_var = htonl(b->blob_var);
1.7 misho 736:
737: /* receive BLOB from client */
1.10 misho 738: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1.7 misho 739: rpc_srv_blobUnmap(b);
1.5 misho 740: } else {
1.10 misho 741: blob.hdr_cmd = error;
742: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 743: }
744: break;
745: case unset:
1.11 misho 746: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1.10 misho 747: blob.hdr_cmd = error;
748: blob.hdr_ret = RPC_ERROR(-1);
1.1 misho 749: }
750: break;
1.7 misho 751: default:
1.10 misho 752: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
753: blob.hdr_cmd = error;
754: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 755: }
1.1 misho 756:
1.7 misho 757: end:
1.10 misho 758: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
759: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
760: schedReadSelf(task);
1.7 misho 761: return NULL;
1.2 misho 762: }
763:
764: static void *
1.17 misho 765: flushBLOB(sched_task_t *task)
766: {
1.22.6.1! misho 767: uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
! 768: rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
1.17 misho 769: rpc_blob_t *b, *tmp;
770:
771: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
772: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
773:
774: rpc_srv_blobFree(srv, b);
775: e_free(b);
776: }
777:
1.22.6.1! misho 778: if (!schedSignalSelf(task)) {
! 779: /* disabled kqueue support in libaitsched */
! 780: struct sigaction sa;
! 781:
! 782: memset(&sa, 0, sizeof sa);
! 783: sigemptyset(&sa.sa_mask);
! 784: sa.sa_handler = (void (*)(int)) flushBLOB;
! 785: sa.sa_flags = SA_RESTART | SA_RESETHAND;
! 786: sigaction(SIGFBLOB, &sa, NULL);
! 787: }
! 788:
1.17 misho 789: return NULL;
790: }
791:
792: static void *
1.10 misho 793: acceptBLOBClients(sched_task_t *task)
1.2 misho 794: {
1.10 misho 795: rpc_srv_t *srv = TASK_ARG(task);
796: rpc_cli_t *c = NULL;
797: register int i;
1.14 misho 798: socklen_t salen = sizeof(sockaddr_t);
1.21 misho 799: int sock;
1.12 misho 800: #ifdef TCP_NOPUSH
801: int n = 1;
802: #endif
1.7 misho 803:
1.10 misho 804: /* check free slots for connect */
1.14 misho 805: for (i = 0; i < array_Size(srv->srv_blob.clients) &&
806: (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1.21 misho 807: if (c) { /* no more free slots! */
808: EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
809: if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
810: shutdown(sock, SHUT_RDWR);
811: close(sock);
812: }
1.10 misho 813: goto end;
1.21 misho 814: }
815:
1.14 misho 816: c = e_malloc(sizeof(rpc_cli_t));
1.10 misho 817: if (!c) {
1.7 misho 818: LOGERR;
1.10 misho 819: srv->srv_kill = srv->srv_blob.kill = 1;
1.7 misho 820: return NULL;
821: } else {
1.10 misho 822: memset(c, 0, sizeof(rpc_cli_t));
1.14 misho 823: array_Set(srv->srv_blob.clients, i, c);
1.10 misho 824: c->cli_id = i;
825: c->cli_parent = srv;
1.7 misho 826: }
1.4 misho 827:
1.10 misho 828: /* alloc empty buffer */
1.14 misho 829: AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.2 misho 830:
1.10 misho 831: /* accept client */
832: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
833: if (c->cli_sock == -1) {
834: LOGERR;
835: AIT_FREE_VAL(&c->cli_buf);
1.14 misho 836: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 837: goto end;
1.12 misho 838: } else {
839: #ifdef TCP_NOPUSH
840: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
841: #endif
1.10 misho 842: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.12 misho 843: }
1.2 misho 844:
1.10 misho 845: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
846: end:
847: schedReadSelf(task);
1.7 misho 848: return NULL;
1.1 misho 849: }
850:
1.10 misho 851: /* ------------------------------------------------------ */
1.1 misho 852:
853: /*
1.7 misho 854: * rpc_srv_initBLOBServer() - Init & create BLOB Server
855: *
1.4 misho 856: * @srv = RPC server instance
1.2 misho 857: * @Port = Port for bind server, if Port == 0 default port is selected
858: * @diskDir = Disk place for BLOB file objects
859: * return: -1 == error or 0 bind and created BLOB server instance
860: */
861: int
862: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
863: {
864: int n = 1;
865:
1.10 misho 866: if (!srv || srv->srv_kill) {
1.7 misho 867: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 868: return -1;
869: }
870:
871: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
872: if (access(diskDir, R_OK | W_OK) == -1) {
873: LOGERR;
874: return -1;
875: } else
1.7 misho 876: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 877:
1.10 misho 878: /* init blob list */
879: TAILQ_INIT(&srv->srv_blob.blobs);
880:
1.2 misho 881: srv->srv_blob.server.cli_parent = srv;
1.4 misho 882:
1.14 misho 883: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
1.10 misho 884: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1.4 misho 885: case AF_INET:
1.10 misho 886: srv->srv_blob.server.cli_sa.sin.sin_port =
887: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1.4 misho 888: break;
889: case AF_INET6:
1.10 misho 890: srv->srv_blob.server.cli_sa.sin6.sin6_port =
891: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1.4 misho 892: break;
893: case AF_LOCAL:
1.10 misho 894: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
895: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1.4 misho 896: break;
897: default:
1.7 misho 898: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 899: return -1;
1.2 misho 900: }
901:
1.4 misho 902: /* create BLOB server socket */
1.6 misho 903: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 904: if (srv->srv_blob.server.cli_sock == -1) {
905: LOGERR;
1.7 misho 906: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 907: return -1;
908: }
909: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
910: LOGERR;
911: close(srv->srv_blob.server.cli_sock);
1.7 misho 912: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 913: return -1;
914: }
1.5 misho 915: n = srv->srv_netbuf;
916: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
917: LOGERR;
918: close(srv->srv_blob.server.cli_sock);
1.7 misho 919: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 920: return -1;
921: }
922: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
923: LOGERR;
924: close(srv->srv_blob.server.cli_sock);
1.7 misho 925: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 926: return -1;
927: }
1.6 misho 928: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
929: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 930: LOGERR;
931: close(srv->srv_blob.server.cli_sock);
1.7 misho 932: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 933: return -1;
1.13 misho 934: } else
935: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
936: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
937:
1.2 misho 938:
1.10 misho 939: /* allocate pool for concurent blob clients */
1.14 misho 940: srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1.2 misho 941: if (!srv->srv_blob.clients) {
1.14 misho 942: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.2 misho 943: close(srv->srv_blob.server.cli_sock);
1.7 misho 944: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 945: return -1;
1.10 misho 946: }
1.2 misho 947:
1.10 misho 948: /* init blob scheduler */
949: srv->srv_blob.root = schedBegin();
950: if (!srv->srv_blob.root) {
951: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.14 misho 952: array_Destroy(&srv->srv_blob.clients);
1.10 misho 953: close(srv->srv_blob.server.cli_sock);
954: AIT_FREE_VAL(&srv->srv_blob.dir);
955: return -1;
956: }
1.2 misho 957:
958: return 0;
959: }
960:
961: /*
1.7 misho 962: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
963: *
1.2 misho 964: * @srv = RPC Server instance
965: * return: none
966: */
1.16 misho 967: void
1.2 misho 968: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
969: {
1.10 misho 970: if (!srv)
1.2 misho 971: return;
972:
1.10 misho 973: srv->srv_blob.kill = 1;
1.17 misho 974:
975: schedEnd(&srv->srv_blob.root);
1.2 misho 976: }
977:
978: /*
1.10 misho 979: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1.7 misho 980: *
1.2 misho 981: * @srv = RPC Server instance
982: * return: -1 error or 0 ok, infinite loop ...
983: */
984: int
1.10 misho 985: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1.2 misho 986: {
1.10 misho 987: rpc_cli_t *c;
1.2 misho 988: register int i;
1.10 misho 989: rpc_blob_t *b, *tmp;
990: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.2 misho 991:
1.10 misho 992: if (!srv || srv->srv_kill) {
1.7 misho 993: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 994: return -1;
995: }
996:
1.14 misho 997: if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1.2 misho 998: LOGERR;
999: return -1;
1.10 misho 1000: }
1001:
1.22.6.1! misho 1002: if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
! 1003: /* disabled kqueue support in libaitsched */
! 1004: struct sigaction sa;
! 1005:
! 1006: atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
! 1007:
! 1008: memset(&sa, 0, sizeof sa);
! 1009: sigemptyset(&sa.sa_mask);
! 1010: sa.sa_handler = (void (*)(int)) flushBLOB;
! 1011: sa.sa_flags = SA_RESTART | SA_RESETHAND;
! 1012: sigaction(SIGFBLOB, &sa, NULL);
! 1013: }
! 1014:
1.10 misho 1015: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
1016: srv->srv_blob.server.cli_sock, NULL, 0)) {
1017: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1018: return -1;
1019: }
1.2 misho 1020:
1.10 misho 1021: schedPolling(srv->srv_blob.root, &ts, NULL);
1022: /* main rpc loop */
1023: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1.7 misho 1024:
1.17 misho 1025: /* detach blobs */
1026: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
1027: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1028:
1029: rpc_srv_blobFree(srv, b);
1030: e_free(b);
1031: }
1032:
1.10 misho 1033: /* close all clients connections & server socket */
1.14 misho 1034: for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
1035: c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1.10 misho 1036: if (c) {
1037: shutdown(c->cli_sock, SHUT_RDWR);
1038: close(c->cli_sock);
1039:
1040: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1041: AIT_FREE_VAL(&c->cli_buf);
1.2 misho 1042: }
1.14 misho 1043: array_Del(srv->srv_blob.clients, i, 42);
1.10 misho 1044: }
1.14 misho 1045: array_Destroy(&srv->srv_blob.clients);
1.2 misho 1046:
1.10 misho 1047: close(srv->srv_blob.server.cli_sock);
1.2 misho 1048:
1.10 misho 1049: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 1050: return 0;
1051: }
1052:
1053:
1054: /*
1.7 misho 1055: * rpc_srv_initServer() - Init & create RPC Server
1056: *
1.15 misho 1057: * @InstID = Instance for authentication & recognition
1.1 misho 1058: * @concurentClients = Concurent clients at same time to this server
1.10 misho 1059: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 1060: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 1061: * @Port = Port for bind server, if Port == 0 default port is selected
1.13 misho 1062: * @proto = Protocol, if == 0 choose SOCK_STREAM
1.1 misho 1063: * return: NULL == error or !=NULL bind and created RPC server instance
1064: */
1065: rpc_srv_t *
1.15 misho 1066: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf,
1067: const char *csHost, u_short Port, int proto)
1.1 misho 1068: {
1.10 misho 1069: int n = 1;
1.1 misho 1070: rpc_srv_t *srv = NULL;
1.14 misho 1071: sockaddr_t sa = E_SOCKADDR_INIT;
1.1 misho 1072:
1.15 misho 1073: if (!concurentClients || (proto < 0 || proto > SOCK_DGRAM)) {
1.10 misho 1074: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1.1 misho 1075: return NULL;
1076: }
1.14 misho 1077: if (!e_gethostbyname(csHost, Port, &sa))
1.10 misho 1078: return NULL;
1.1 misho 1079: if (!Port)
1080: Port = RPC_DEFPORT;
1.13 misho 1081: if (!proto)
1082: proto = SOCK_STREAM;
1.10 misho 1083: if (netBuf < RPC_MIN_BUFSIZ)
1.5 misho 1084: netBuf = BUFSIZ;
1.7 misho 1085: else
1.14 misho 1086: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
1.10 misho 1087:
1088: #ifdef HAVE_SRANDOMDEV
1089: srandomdev();
1090: #else
1091: time_t tim;
1092:
1093: srandom((time(&tim) ^ getpid()));
1094: #endif
1.1 misho 1095:
1.14 misho 1096: srv = e_malloc(sizeof(rpc_srv_t));
1.1 misho 1097: if (!srv) {
1098: LOGERR;
1099: return NULL;
1100: } else
1101: memset(srv, 0, sizeof(rpc_srv_t));
1102:
1.13 misho 1103: srv->srv_proto = proto;
1.5 misho 1104: srv->srv_netbuf = netBuf;
1.1 misho 1105: srv->srv_session.sess_version = RPC_VERSION;
1.15 misho 1106: srv->srv_session.sess_instance = InstID;
1.1 misho 1107:
1108: srv->srv_server.cli_parent = srv;
1.10 misho 1109: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
1110:
1.12 misho 1111: /* init functions */
1112: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
1113: SLIST_INIT(&srv->srv_funcs);
1114: AVL_INIT(&srv->srv_funcs);
1.10 misho 1115:
1116: /* init scheduler */
1117: srv->srv_root = schedBegin();
1118: if (!srv->srv_root) {
1119: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.12 misho 1120: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1121: e_free(srv);
1.10 misho 1122: return NULL;
1123: }
1124:
1125: /* init pool for clients */
1.14 misho 1126: srv->srv_clients = array_Init(concurentClients);
1.10 misho 1127: if (!srv->srv_clients) {
1.14 misho 1128: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.10 misho 1129: schedEnd(&srv->srv_root);
1.12 misho 1130: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1131: e_free(srv);
1.10 misho 1132: return NULL;
1133: }
1.4 misho 1134:
1135: /* create server socket */
1.13 misho 1136: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
1.1 misho 1137: if (srv->srv_server.cli_sock == -1) {
1138: LOGERR;
1.14 misho 1139: array_Destroy(&srv->srv_clients);
1.10 misho 1140: schedEnd(&srv->srv_root);
1.12 misho 1141: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1142: e_free(srv);
1.1 misho 1143: return NULL;
1144: }
1145: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
1146: LOGERR;
1.10 misho 1147: goto err;
1.1 misho 1148: }
1.5 misho 1149: n = srv->srv_netbuf;
1150: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
1151: LOGERR;
1.10 misho 1152: goto err;
1.5 misho 1153: }
1154: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
1155: LOGERR;
1.10 misho 1156: goto err;
1.5 misho 1157: }
1.6 misho 1158: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
1159: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 1160: LOGERR;
1.10 misho 1161: goto err;
1.13 misho 1162: } else
1163: fcntl(srv->srv_server.cli_sock, F_SETFL,
1164: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 1165:
1.10 misho 1166: rpc_register_srvPing(srv);
1.8 misho 1167:
1.1 misho 1168: return srv;
1.10 misho 1169: err: /* error condition */
1170: close(srv->srv_server.cli_sock);
1.14 misho 1171: array_Destroy(&srv->srv_clients);
1.10 misho 1172: schedEnd(&srv->srv_root);
1.12 misho 1173: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14 misho 1174: e_free(srv);
1.10 misho 1175: return NULL;
1.1 misho 1176: }
1177:
1178: /*
1.7 misho 1179: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
1180: *
1.6 misho 1181: * @psrv = RPC Server instance
1.1 misho 1182: * return: none
1183: */
1.16 misho 1184: void
1.6 misho 1185: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 1186: {
1.10 misho 1187: if (!psrv || !*psrv)
1.1 misho 1188: return;
1189:
1.10 misho 1190: /* if send kill to blob server */
1.17 misho 1191: rpc_srv_endBLOBServer(*psrv);
1.1 misho 1192:
1.10 misho 1193: (*psrv)->srv_kill = 1;
1194: sleep(RPC_SCHED_POLLING);
1.2 misho 1195:
1.17 misho 1196: schedEnd(&(*psrv)->srv_root);
1197:
1.12 misho 1198: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1.14 misho 1199: e_free(*psrv);
1.6 misho 1200: *psrv = NULL;
1.1 misho 1201: }
1202:
1203: /*
1.7 misho 1204: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
1205: *
1.1 misho 1206: * @srv = RPC Server instance
1207: * return: -1 error or 0 ok, infinite loop ...
1208: */
1209: int
1.5 misho 1210: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 1211: {
1.10 misho 1212: rpc_cli_t *c;
1.1 misho 1213: register int i;
1.12 misho 1214: rpc_func_t *f;
1.10 misho 1215: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.1 misho 1216:
1217: if (!srv) {
1.10 misho 1218: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1.1 misho 1219: return -1;
1220: }
1221:
1.13 misho 1222: if (srv->srv_proto == SOCK_STREAM)
1.14 misho 1223: if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1.13 misho 1224: LOGERR;
1225: return -1;
1226: }
1.1 misho 1227:
1.13 misho 1228: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
1229: srv->srv_server.cli_sock, NULL, 0)) {
1.10 misho 1230: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1231: return -1;
1232: }
1.7 misho 1233:
1.10 misho 1234: schedPolling(srv->srv_root, &ts, NULL);
1.7 misho 1235: /* main rpc loop */
1.10 misho 1236: schedRun(srv->srv_root, &srv->srv_kill);
1237:
1238: /* close all clients connections & server socket */
1.14 misho 1239: for (i = 0; i < array_Size(srv->srv_clients); i++) {
1240: c = array(srv->srv_clients, i, rpc_cli_t*);
1.10 misho 1241: if (c) {
1242: shutdown(c->cli_sock, SHUT_RDWR);
1243: close(c->cli_sock);
1244:
1245: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.14 misho 1246: ait_freeVars(&RPC_RETVARS(c));
1.10 misho 1247: AIT_FREE_VAL(&c->cli_buf);
1.1 misho 1248: }
1.14 misho 1249: array_Del(srv->srv_clients, i, 42);
1.10 misho 1250: }
1.14 misho 1251: array_Destroy(&srv->srv_clients);
1.2 misho 1252:
1.10 misho 1253: close(srv->srv_server.cli_sock);
1.2 misho 1254:
1.10 misho 1255: /* detach exported calls */
1.12 misho 1256: RPC_FUNCS_LOCK(&srv->srv_funcs);
1257: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
1258: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1.1 misho 1259:
1.10 misho 1260: AIT_FREE_VAL(&f->func_name);
1.14 misho 1261: e_free(f);
1.1 misho 1262: }
1.12 misho 1263: srv->srv_funcs.avlh_root = NULL;
1264: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1.1 misho 1265:
1266: return 0;
1267: }
1268:
1269:
1270: /*
1271: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 1272: *
1.10 misho 1273: * @cli = RPC client
1.1 misho 1274: * @rpc = IN RPC call structure
1.10 misho 1275: * @funcname = Execute RPC function
1.5 misho 1276: * @args = IN RPC calling arguments from RPC client
1.1 misho 1277: * return: -1 error, !=-1 ok
1278: */
1279: int
1.10 misho 1280: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
1281: ait_val_t funcname, array_t * __restrict args)
1.1 misho 1282: {
1283: rpc_callback_t func;
1284:
1.10 misho 1285: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1.7 misho 1286: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 1287: return -1;
1288: }
1289:
1.10 misho 1290: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
1291: return func(cli, rpc, args);
1.1 misho 1292: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>