/************************************************************************* * (C) 2010 AITNET ltd - Sofia/Bulgaria - * by Michael Pounov * * $Author: misho $ * $Id: srv.c,v 1.31 2024/03/20 17:32:31 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following terms: All of the documentation and software included in the ELWIX and AITNET Releases is copyrighted by ELWIX - Sofia/Bulgaria Copyright 2004 - 2024 by Michael Pounov . All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. All advertising materials mentioning features or use of this software must display the following acknowledgement: This product includes software developed by Michael Pounov ELWIX - Embedded LightWeight unIX and its contributors. 4. Neither the name of AITNET nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include "global.h" /* SOCK_STREAM */ static void *acceptClients(sched_task_t *); static void *closeClient(sched_task_t *); static void *rxPacket(sched_task_t *); static void *txPacket(sched_task_t *); /* SOCK_DGRAM */ static void *freeClient(sched_task_t *); static void *rxUDPPacket(sched_task_t *); static void *txUDPPacket(sched_task_t *); /* SOCK_RAW */ static void *rxRAWPacket(sched_task_t *); static void *txRAWPacket(sched_task_t *); /* SOCK_BPF */ static void *rxBPFPacket(sched_task_t *); static void *txBPFPacket(sched_task_t *); /* SOCK_EXT */ static void *rxEXTPacket(sched_task_t *); static void *txEXTPacket(sched_task_t *); static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = { { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */ { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */ { rxUDPPacket, freeClient, NULL /*rxUDPPacket*/, txUDPPacket }, /* SOCK_DGRAM */ { rxRAWPacket, freeClient, NULL /*rxRAWPacket*/, txRAWPacket }, /* SOCK_RAW */ { rxBPFPacket, freeClient, NULL /*rxBPFPacket*/, txBPFPacket }, /* SOCK_BPF */ { rxEXTPacket, freeClient, NULL /*rxEXTPacket*/, txEXTPacket } /* SOCK_EXT */ }; /* Global Signal Argument when kqueue support disabled */ static volatile uintptr_t _glSigArg = 0; void rpc_freeCli(rpc_cli_t * __restrict c) { rpc_srv_t *s = c->cli_parent; if (s->srv_proto == SOCK_STREAM) schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL); /* free buffer */ AIT_FREE_VAL(&c->cli_buf); array_Del(s->srv_clients, c->cli_id, 0); if (c) e_free(c); } static inline int _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa) { rpc_cli_t *c = NULL; register int i; /* check free slots for connect */ for (i = 0; i < array_Size(srv->srv_clients) && (c = array(srv->srv_clients, i, rpc_cli_t*)); i++) /* check for duplicates */ if (sa && !e_addrcmp(&c->cli_sa, sa, 42)) break; if (i >= array_Size(srv->srv_clients)) return -1; /* no more free slots! */ return i; } static rpc_cli_t * _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa) { rpc_cli_t *c = NULL; int n; if (srv->srv_proto != SOCK_EXT) n = _check4freeslot(srv, sa); else n = 0; if (n == -1) return NULL; else c = array(srv->srv_clients, n, rpc_cli_t*); if (!c) { c = e_malloc(sizeof(rpc_cli_t)); if (!c) { LOGERR; srv->srv_kill = 1; return NULL; } else { memset(c, 0, sizeof(rpc_cli_t)); array_Set(srv->srv_clients, n, c); c->cli_id = n; c->cli_parent = srv; } /* alloc empty buffer */ AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf); if (!AIT_GET_BUF(&c->cli_buf)) { array_Del(srv->srv_clients, n, 0); e_free(c); c = NULL; } } return c; } static void * freeClient(sched_task_t *task) { rpc_freeCli(TASK_ARG(task)); taskExit(task, NULL); } static void * closeClient(sched_task_t *task) { int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock; rpc_freeCli(TASK_ARG(task)); /* close client socket */ shutdown(sock, SHUT_RDWR); close(sock); taskExit(task, NULL); } static void * txPacket(sched_task_t *task) { rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; u_char *buf = AIT_GET_BUF(&c->cli_buf); struct tagRPCCall *rpc = (struct tagRPCCall*) buf; int ret, wlen = sizeof(struct tagRPCCall); #ifdef TCP_SESSION_TIMEOUT struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL); schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), ts, TASK_ARG(task), 0); #endif if (rpc->call_argc) { f = rpc_srv_getCall(s, ntohs(rpc->call_tag)); if (!f) { rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) { rpc_SetErr(EMSGSIZE, "Message too long"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else { rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); /* Go Encapsulate variables */ ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, RPC_RETVARS(c)); if (ret == -1) { rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else wlen += ret; } } /* Free return values */ ait_freeVars(&c->cli_vars); rpc->call_len = htonl(wlen); rpc->call_io = RPC_ACK; #if 0 /* calculate CRC */ rpc->call_crc ^= rpc->call_crc; rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2)); #endif /* send reply */ ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL); if (ret == -1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), 0, NULL, 0); } taskExit(task, NULL); } static void * execCall(sched_task_t *task) { rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; array_t *arr = NULL; u_char *buf = AIT_GET_BUF(&c->cli_buf); struct tagRPCCall *rpc = (struct tagRPCCall*) buf; int argc = rpc->call_argc; /* Go decapsulate variables ... */ if (argc) { arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall), AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42); if (!arr) { rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError()); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); taskExit(task, NULL); } } else arr = NULL; if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) { rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else { /* if client doesn't want reply */ RPC_SET_RETURN(rpc, rpc_srv_execCall(c, rpc, f->func_name, arr)); if (rpc->call_rep.ret == htonl(-1)) { if (!rpc->call_rep.eno) { LOGERR; RPC_SET_ERRNO(rpc, rpc_Errno); } rpc->call_argc ^= rpc->call_argc; ait_freeVars(&c->cli_vars); } else { rpc->call_rep.eno ^= rpc->call_rep.eno; rpc->call_argc ^= rpc->call_argc; if (TASK_VAL(task)) { /* without reply */ ait_freeVars(&c->cli_vars); } else if (rpc->call_io & RPC_REQ) { /* reply */ rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); } } } array_Destroy(&arr); taskExit(task, NULL); } static void * rxPacket(sched_task_t *task) { rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; int len, noreply = 0, rlen; #if 0 u_short crc; #endif u_char *buf = AIT_GET_BUF(&c->cli_buf); struct tagRPCCall b, *rpc = (struct tagRPCCall*) buf; #ifdef TCP_SESSION_TIMEOUT struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL); schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), ts, TASK_ARG(task), 0); #endif /* prepare rx */ len = recv(TASK_FD(task), &b, sizeof b, MSG_PEEK); if (len < 1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), 0, NULL, 0); taskExit(task, NULL); } else if (len == sizeof b) rlen = ntohl(b.call_len); else goto end; rlen = recv(TASK_FD(task), buf, rlen, 0); if (rlen == -1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), 0, NULL, 0); taskExit(task, NULL); } if (rlen < sizeof(struct tagRPCCall)) { rpc_SetErr(ERPCMISMATCH, "Short RPC packet"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); goto err; } else len = ntohl(rpc->call_len); if (rlen < len || len > AIT_LEN(&c->cli_buf)) { rpc_SetErr(ERPCMISMATCH, "Short RPC packet"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); goto err; } /* skip loop packet */ if (rpc->call_io & RPC_ACK) { schedReadSelf(task); taskExit(task, NULL); } #if 0 /* check integrity of packet */ crc = ntohs(rpc->call_crc); rpc->call_crc ^= rpc->call_crc; if (crc != crcFletcher16((u_short*) rpc, len / 2)) { rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); goto err; } #endif /* check RPC packet session info */ if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) { rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); goto err; } noreply = RPC_CHK_NOREPLY(rpc); /* execute RPC call */ schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len); err: /* send RPC reply */ if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], TASK_ARG(task), TASK_FD(task), rpc, len); end: /* lets get next packet */ schedReadSelf(task); taskExit(task, NULL); } static void * acceptClients(sched_task_t *task) { rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; socklen_t salen = E_SOCKADDR_MAX; int sock; #ifdef TCP_SESSION_TIMEOUT struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; #endif c = _allocClient(srv, NULL); if (!c) { EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n"); if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) { shutdown(sock, SHUT_RDWR); close(sock); } goto end; } /* accept client */ c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen); if (c->cli_sock == -1) { LOGERR; AIT_FREE_VAL(&c->cli_buf); array_Del(srv->srv_clients, c->cli_id, 42); goto end; } else { fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK); fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC); } #ifdef TCP_SESSION_TIMEOUT /* armed timer for close stateless connection */ schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL); schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, ts, c, 0); #endif schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c, c->cli_sock, NULL, 0); end: schedReadSelf(task); taskExit(task, NULL); } static void * txUDPPacket(sched_task_t *task) { rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; u_char *buf = AIT_GET_BUF(&c->cli_buf); struct tagRPCCall *rpc = (struct tagRPCCall*) buf; int ret, wlen = sizeof(struct tagRPCCall); struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL); schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), ts, TASK_ARG(task), 0); if (rpc->call_argc) { f = rpc_srv_getCall(s, ntohs(rpc->call_tag)); if (!f) { rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) { rpc_SetErr(EMSGSIZE, "Message too long"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else { rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); /* Go Encapsulate variables */ ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, RPC_RETVARS(c)); if (ret == -1) { rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else wlen += ret; } } /* Free return values */ ait_freeVars(&c->cli_vars); rpc->call_len = htonl(wlen); rpc->call_io = RPC_ACK; /* calculate CRC */ rpc->call_crc ^= rpc->call_crc; rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2)); /* send reply */ ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, &c->cli_sa.sa, e_addrlen(&c->cli_sa)); if (ret == -1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), 0, NULL, 0); } taskExit(task, NULL); } static void * rxUDPPacket(sched_task_t *task) { rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; int len, noreply = 0, rlen; u_short crc; struct tagRPCCall *rpc; sockaddr_t sa; socklen_t salen = E_SOCKADDR_MAX; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; ait_val_t b = AIT_VAL_INIT; /* receive connect packet */ AIT_SET_BUF(&b, NULL, srv->srv_netbuf); #ifndef __linux__ sa.ss.ss_len = salen; #endif rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen); rpc = (struct tagRPCCall*) AIT_GET_BUF(&b); if (rlen < sizeof(struct tagRPCCall)) goto end; else len = ntohl(rpc->call_len); if (rlen < len || len > srv->srv_netbuf) goto end; /* skip loop packet */ if (rpc->call_io & RPC_ACK) goto end; /* check integrity of packet */ crc = ntohs(rpc->call_crc); rpc->call_crc ^= rpc->call_crc; if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2)) goto end; /* check RPC packet session info */ if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) goto end; c = _allocClient(srv, &sa); if (!c) { EVERBOSE(1, "RPC client quota exceeded!"); usleep(2000); /* blocked client delay */ goto end; } else { memcpy(AIT_GET_BUF(&c->cli_buf), AIT_GET_BUF(&b), len); rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf); c->cli_sock = TASK_FD(task); memcpy(&c->cli_sa, &sa, sizeof c->cli_sa); /* armed timer for close stateless connection */ schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL); schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, ts, c, 0); } noreply = RPC_CHK_NOREPLY(rpc); /* execute RPC call */ schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: AIT_FREE_VAL(&b); schedReadSelf(task); taskExit(task, NULL); } static void * txRAWPacket(sched_task_t *task) { rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; u_char *buf = AIT_GET_BUF(&c->cli_buf); struct tagRPCCall *rpc = (struct tagRPCCall*) buf; int ret, wlen = sizeof(struct tagRPCCall); struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL); schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), ts, TASK_ARG(task), 0); if (rpc->call_argc) { f = rpc_srv_getCall(s, ntohs(rpc->call_tag)); if (!f) { rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) { rpc_SetErr(EMSGSIZE, "Message too long"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else { rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); /* Go Encapsulate variables */ ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, RPC_RETVARS(c)); if (ret == -1) { rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else wlen += ret; } } /* Free return values */ ait_freeVars(&c->cli_vars); rpc->call_len = htonl(wlen); rpc->call_io = RPC_ACK; /* calculate CRC */ rpc->call_crc ^= rpc->call_crc; rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2)); /* send reply */ ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, &c->cli_sa.sa, e_addrlen(&c->cli_sa)); if (ret == -1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), 0, NULL, 0); } taskExit(task, NULL); } static void * rxRAWPacket(sched_task_t *task) { rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; int len, noreply = 0, rlen; u_short crc; struct tagRPCCall *rpc; sockaddr_t sa; socklen_t salen = E_SOCKADDR_MAX; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; ait_val_t b = AIT_VAL_INIT; /* receive connect packet */ AIT_SET_BUF(&b, NULL, srv->srv_netbuf); #ifndef __linux__ sa.ss.ss_len = salen; #endif rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen); if (sa.sa.sa_family == AF_INET) { struct ip *h; h = (struct ip*) AIT_GET_BUF(&b); if (rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC) goto end; else { rlen -= sizeof(struct ip); rpc = (struct tagRPCCall*) (h + 1); } } else { #ifdef IPV6_REMOVE_HEADER struct ip6_hdr *h; h = (struct ip6_hdr*) AIT_GET_BUF(&b); if (rlen < ntohs(h->ip6_plen) || h->ip6_nxt != IPPROTO_ERPC) goto end; else { rlen -= sizeof(struct ip6_hdr); rpc = (struct tagRPCCall*) (h + 1); } #else rpc = (struct tagRPCCall*) AIT_GET_BUF(&b); #endif } if (rlen < sizeof(struct tagRPCCall)) goto end; else len = ntohl(rpc->call_len); if (rlen < len || len > srv->srv_netbuf) goto end; /* skip loop packet */ if (rpc->call_io & RPC_ACK) goto end; /* check integrity of packet */ crc = ntohs(rpc->call_crc); rpc->call_crc ^= rpc->call_crc; if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2)) goto end; /* check RPC packet session info */ if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) goto end; c = _allocClient(srv, &sa); if (!c) { EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n"); usleep(2000); /* blocked client delay */ goto end; } else { memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len); rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf); c->cli_sock = TASK_FD(task); memcpy(&c->cli_sa, &sa, sizeof c->cli_sa); /* armed timer for close stateless connection */ schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL); schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, ts, c, 0); } noreply = RPC_CHK_NOREPLY(rpc); /* execute RPC call */ schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: AIT_FREE_VAL(&b); schedReadSelf(task); taskExit(task, NULL); } static void * txBPFPacket(sched_task_t *task) { #ifndef __linux__ rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; u_char *buf = AIT_GET_BUF(&c->cli_buf); struct tagRPCCall *rpc = (struct tagRPCCall*) buf; int ret, wlen = sizeof(struct tagRPCCall); struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; struct ether_header *eh; ait_val_t b = AIT_VAL_INIT; schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL); schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), ts, TASK_ARG(task), 0); if (rpc->call_argc) { f = rpc_srv_getCall(s, ntohs(rpc->call_tag)); if (!f) { rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) { rpc_SetErr(EMSGSIZE, "Message too long"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else { rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); /* Go Encapsulate variables */ ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, RPC_RETVARS(c)); if (ret == -1) { rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else wlen += ret; } } /* Free return values */ ait_freeVars(&RPC_RETVARS(c)); rpc->call_len = htonl(wlen); rpc->call_io = RPC_ACK; /* calculate CRC */ rpc->call_crc ^= rpc->call_crc; rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2)); /* send reply */ AIT_SET_BUF(&b, NULL, wlen + ETHER_HDR_LEN); eh = (struct ether_header*) AIT_GET_BUF(&b); memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN); eh->ether_type = htons(RPC_DEFPORT); memcpy(eh + 1, buf, wlen); ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b)); AIT_FREE_VAL(&b); if (ret == -1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), 0, NULL, 0); } #else rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!"); #endif taskExit(task, NULL); } static void * rxBPFPacket(sched_task_t *task) { #ifndef __linux__ rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; int len, rlen, noreply; u_short crc; struct tagRPCCall *rpc; sockaddr_t sa; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; struct bpf_hdr *h; struct ether_header *eh; ait_val_t b = AIT_VAL_INIT; /* receive connect packet */ AIT_SET_BUF(&b, NULL, srv->srv_netbuf); rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b)); h = (struct bpf_hdr*) AIT_GET_BUF(&b); rlen -= h->bh_hdrlen; if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen || rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) goto end; else { rlen = h->bh_caplen; eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen); rlen -= ETHER_HDR_LEN; rpc = (struct tagRPCCall*) (eh + 1); if (eh->ether_type != ntohs(RPC_DEFPORT)) goto end; else e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa); } if (rlen < sizeof(struct tagRPCCall)) goto end; else len = ntohl(rpc->call_len); if (rlen < len || len > srv->srv_netbuf) goto end; #ifdef CHECK_ETHACK /* skip loop packet */ if (rpc->call_io & RPC_ACK) goto end; #endif /* check integrity of packet */ crc = ntohs(rpc->call_crc); rpc->call_crc ^= rpc->call_crc; if (crc != crcFletcher16((u_short*) rpc, len / 2)) goto end; /* check RPC packet session info */ if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) goto end; c = _allocClient(srv, &sa); if (!c) { EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n"); usleep(2000); /* blocked client delay */ goto end; } else { memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len); rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf); c->cli_sock = TASK_FD(task); memcpy(&c->cli_sa, &sa, sizeof c->cli_sa); /* armed timer for close stateless connection */ schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL); schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, ts, c, 0); } noreply = RPC_CHK_NOREPLY(rpc); /* execute RPC call */ schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ if (!noreply && (rpc->call_io & RPC_REQ)) schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: AIT_FREE_VAL(&b); schedReadSelf(task); #else rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!"); #endif taskExit(task, NULL); } static void * txEXTPacket(sched_task_t *task) { rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; u_char *buf = AIT_GET_BUF(&c->cli_buf); struct tagRPCCall *rpc = (struct tagRPCCall*) buf; int ret, wlen = sizeof(struct tagRPCCall); struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL); schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), ts, TASK_ARG(task), 0); if (rpc->call_argc) { f = rpc_srv_getCall(s, ntohs(rpc->call_tag)); if (!f) { rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) { rpc_SetErr(EMSGSIZE, "Message too long"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else { rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); /* Go Encapsulate variables */ ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, RPC_RETVARS(c)); if (ret == -1) { rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); rpc->call_argc ^= rpc->call_argc; RPC_SET_RETURN(rpc, -1); RPC_SET_ERRNO(rpc, rpc_Errno); } else wlen += ret; } } /* Free return values */ ait_freeVars(&RPC_RETVARS(c)); rpc->call_len = htonl(wlen); rpc->call_io = RPC_ACK; /* send reply */ ret = write(TASK_FD(task), buf, wlen); if (ret == -1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), 0, NULL, 0); } taskExit(task, NULL); } static void * rxEXTPacket(sched_task_t *task) { rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; int len, noreply = 0, rlen; struct tagRPCCall *rpc; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; sockaddr_t sa; ait_val_t b = AIT_VAL_INIT; memset(&sa, 0, sizeof sa); /* receive connect packet */ AIT_SET_BUF(&b, NULL, srv->srv_netbuf); rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b)); rpc = (struct tagRPCCall*) AIT_GET_BUF(&b); if (rlen < sizeof(struct tagRPCCall)) goto end; else len = ntohl(rpc->call_len); if (rlen < len || len > srv->srv_netbuf) goto end; /* skip loop packet */ if (rpc->call_io & RPC_ACK) goto end; /* check RPC packet session info */ if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) goto end; c = _allocClient(srv, &sa); if (!c) { EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n"); usleep(2000); /* blocked client delay */ goto end; } else { memcpy(AIT_GET_BUF(&c->cli_buf), AIT_GET_BUF(&b), len); rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf); c->cli_sock = TASK_FD(task); /* armed timer for close stateless connection */ schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL); schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, ts, c, 0); } noreply = RPC_CHK_NOREPLY(rpc); /* execute RPC call */ schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: AIT_FREE_VAL(&b); schedReadSelf(task); taskExit(task, NULL); } /* ------------------------------------------------------ */ void rpc_freeBLOBCli(rpc_cli_t * __restrict c) { rpc_srv_t *s = c->cli_parent; schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL); /* free buffer */ AIT_FREE_VAL(&c->cli_buf); array_Del(s->srv_blob.clients, c->cli_id, 0); if (c) e_free(c); } static void * closeBLOBClient(sched_task_t *task) { int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock; rpc_freeBLOBCli(TASK_ARG(task)); /* close client socket */ shutdown(sock, SHUT_RDWR); close(sock); taskExit(task, NULL); } static void * txBLOB(sched_task_t *task) { rpc_cli_t *c = TASK_ARG(task); u_char *buf = AIT_GET_BUF(&c->cli_buf); int wlen = sizeof(struct tagBLOBHdr); /* send reply */ wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL); if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) { /* close blob connection */ schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0); } taskExit(task, NULL); } static void * rxBLOB(sched_task_t *task) { rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; rpc_blob_t *b; struct tagBLOBHdr blob; int rlen; memset(&blob, 0, sizeof blob); rlen = recv(TASK_FD(task), &blob, sizeof blob, 0); if (rlen < 1) { /* close blob connection */ schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0); taskExit(task, NULL); } /* check BLOB packet */ if (rlen < sizeof(struct tagBLOBHdr)) { rpc_SetErr(ERPCMISMATCH, "Short BLOB packet"); schedReadSelf(task); taskExit(task, NULL); } /* check RPC packet session info */ if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) { rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session"); blob.hdr_cmd = error; goto end; } /* Go to proceed packet ... */ switch (blob.hdr_cmd) { case get: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) { rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var)); blob.hdr_cmd = no; RPC_SET_BLOB_RET(&blob, -1); break; } else blob.hdr_len = htonl(b->blob_len); if (rpc_srv_blobMap(s, b) != -1) { /* deliver BLOB variable to client */ blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b)); rpc_srv_blobUnmap(b); } else { blob.hdr_cmd = error; RPC_SET_BLOB_RET(&blob, -1); } break; case set: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len), ntohl(blob.hdr_ret)))) { /* set new BLOB variable for reply :) */ blob.hdr_var = htonl(b->blob_var); /* receive BLOB from client */ blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b)); rpc_srv_blobUnmap(b); } else { blob.hdr_cmd = error; RPC_SET_BLOB_RET(&blob, -1); } break; case unset: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) { blob.hdr_cmd = error; RPC_SET_BLOB_RET(&blob, -1); } break; default: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd); blob.hdr_cmd = error; RPC_SET_BLOB_RET(&blob, -1); } end: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob); schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0); schedReadSelf(task); taskExit(task, NULL); } static void * flushBLOB(sched_task_t *task) { #ifdef atomic_load_acq_ptr uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg); #else uintptr_t sigArg = *((volatile uintptr_t*) &_glSigArg); #endif rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task); rpc_blob_t *b, *tmp; TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) { TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node); rpc_srv_blobFree(srv, b); e_free(b); } if (sigArg) { /* disabled kqueue support in libaitsched */ struct sigaction sa; memset(&sa, 0, sizeof sa); sigemptyset(&sa.sa_mask); sa.sa_handler = (void (*)(int)) flushBLOB; sa.sa_flags = SA_RESTART | SA_RESETHAND; sigaction(SIGFBLOB, &sa, NULL); return NULL; } else { schedSignalSelf(task); taskExit(task, NULL); } } static void * acceptBLOBClients(sched_task_t *task) { rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; register int i; socklen_t salen = E_SOCKADDR_MAX; int sock; #ifdef TCP_NOPUSH int n = 1; #endif /* check free slots for connect */ for (i = 0; i < array_Size(srv->srv_blob.clients) && (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++); if (c) { /* no more free slots! */ EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n"); if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) { shutdown(sock, SHUT_RDWR); close(sock); } goto end; } c = e_malloc(sizeof(rpc_cli_t)); if (!c) { LOGERR; srv->srv_kill = srv->srv_blob.kill = 1; taskExit(task, NULL); } else { memset(c, 0, sizeof(rpc_cli_t)); array_Set(srv->srv_blob.clients, i, c); c->cli_id = i; c->cli_parent = srv; } /* alloc empty buffer */ AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf); /* accept client */ c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen); if (c->cli_sock == -1) { LOGERR; AIT_FREE_VAL(&c->cli_buf); array_Del(srv->srv_blob.clients, i, 42); goto end; } else { #ifdef TCP_NOPUSH setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n); #endif fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK); fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC); } schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0); end: schedReadSelf(task); taskExit(task, NULL); } /* ------------------------------------------------------ */ /* * rpc_srv_initBLOBServer() - Init & create BLOB Server * * @srv = RPC server instance * @Port = Port for bind server, if Port == 0 default port is selected * @diskDir = Disk place for BLOB file objects * return: -1 == error or 0 bind and created BLOB server instance */ int rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir) { int n = 1; socklen_t salen; if (!srv || srv->srv_kill) { rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server"); return -1; } memset(&srv->srv_blob, 0, sizeof srv->srv_blob); if (access(diskDir, R_OK | W_OK) == -1) { LOGERR; return -1; } else AIT_SET_STR(&srv->srv_blob.dir, diskDir); /* init blob list */ TAILQ_INIT(&srv->srv_blob.blobs); srv->srv_blob.server.cli_parent = srv; memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof srv->srv_blob.server.cli_sa); switch (srv->srv_blob.server.cli_sa.sa.sa_family) { case AF_INET: srv->srv_blob.server.cli_sa.sin.sin_port = htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1); salen = sizeof srv->srv_blob.server.cli_sa.sin; break; case AF_INET6: srv->srv_blob.server.cli_sa.sin6.sin6_port = htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1); salen = sizeof srv->srv_blob.server.cli_sa.sin6; break; case AF_LOCAL: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", sizeof srv->srv_blob.server.cli_sa.sun.sun_path); salen = sizeof srv->srv_blob.server.cli_sa.sun; break; default: AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } /* create BLOB server socket */ srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0); if (srv->srv_blob.server.cli_sock == -1) { LOGERR; AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) { LOGERR; close(srv->srv_blob.server.cli_sock); AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } n = srv->srv_netbuf; if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) { LOGERR; close(srv->srv_blob.server.cli_sock); AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) { LOGERR; close(srv->srv_blob.server.cli_sock); AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, salen) == -1) { LOGERR; close(srv->srv_blob.server.cli_sock); AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } else fcntl(srv->srv_blob.server.cli_sock, F_SETFL, fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK); /* allocate pool for concurent blob clients */ srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients)); if (!srv->srv_blob.clients) { rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); close(srv->srv_blob.server.cli_sock); AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } /* init blob scheduler */ srv->srv_blob.root = schedBegin(); if (!srv->srv_blob.root) { rpc_SetErr(sched_GetErrno(), "%s", sched_GetError()); array_Destroy(&srv->srv_blob.clients); close(srv->srv_blob.server.cli_sock); AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } return 0; } /* * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources * * @srv = RPC Server instance * return: none */ void rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) { if (!srv) return; srv->srv_blob.kill = 1; if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL) unlink(srv->srv_blob.server.cli_sa.sun.sun_path); schedEnd(&srv->srv_blob.root); } /* * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests * * @srv = RPC Server instance * return: -1 error or 0 ok, infinite loop ... */ int rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) { rpc_cli_t *c; register int i; rpc_blob_t *b, *tmp; struct timespec ts = { RPC_SCHED_POLLING, 0 }; if (!srv || srv->srv_kill) { rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server"); return -1; } if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) { LOGERR; return -1; } if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) { /* disabled kqueue support in libaitsched */ struct sigaction sa; #ifdef atomic_store_rel_ptr atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv); #else *((volatile uintptr_t*) &_glSigArg) = (uintptr_t) srv; #endif memset(&sa, 0, sizeof sa); sigemptyset(&sa.sa_mask); sa.sa_handler = (void (*)(int)) flushBLOB; sa.sa_flags = SA_RESTART | SA_RESETHAND; sigaction(SIGFBLOB, &sa, NULL); } if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, srv->srv_blob.server.cli_sock, NULL, 0)) { rpc_SetErr(sched_GetErrno(), "%s", sched_GetError()); return -1; } schedPolling(srv->srv_blob.root, &ts, NULL); /* main rpc loop */ schedRun(srv->srv_blob.root, &srv->srv_blob.kill); /* detach blobs */ TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) { TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node); rpc_srv_blobFree(srv, b); e_free(b); } /* close all clients connections & server socket */ for (i = 0; i < array_Size(srv->srv_blob.clients); i++) { c = array(srv->srv_blob.clients, i, rpc_cli_t*); if (c) { shutdown(c->cli_sock, SHUT_RDWR); close(c->cli_sock); schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL); AIT_FREE_VAL(&c->cli_buf); } array_Del(srv->srv_blob.clients, i, 42); } array_Destroy(&srv->srv_blob.clients); close(srv->srv_blob.server.cli_sock); AIT_FREE_VAL(&srv->srv_blob.dir); return 0; } /* * rpc_srv_initServer() - Init & create RPC Server * * @InstID = Instance for authentication & recognition * @concurentClients = Concurent clients at same time to this server * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet) * @csHost = Host name or address for bind server, if NULL any address * @Port = Port for bind server, if Port == 0 default port is selected * @proto = Protocol, if == 0 choose SOCK_STREAM * return: NULL == error or !=NULL bind and created RPC server instance */ rpc_srv_t * rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf, const char *csHost, u_short Port, int proto) { int n = 1; rpc_srv_t *srv = NULL; sockaddr_t sa = E_SOCKADDR_INIT; socklen_t salen; if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) { rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server"); return NULL; } if (!Port && proto < SOCK_RAW) Port = RPC_DEFPORT; if (!(salen = e_gethostbyname(csHost, Port, &sa))) return NULL; if (!proto) proto = SOCK_STREAM; if (netBuf < RPC_MIN_BUFSIZ) netBuf = BUFSIZ; else netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */ #ifdef HAVE_SRANDOMDEV srandomdev(); #else time_t tim; srandom((time(&tim) ^ getpid())); #endif srv = e_malloc(sizeof(rpc_srv_t)); if (!srv) { LOGERR; return NULL; } else memset(srv, 0, sizeof(rpc_srv_t)); srv->srv_proto = proto; srv->srv_netbuf = netBuf; srv->srv_session.sess_version = RPC_VERSION; srv->srv_session.sess_instance = InstID; srv->srv_server.cli_parent = srv; memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa); /* init functions */ pthread_mutex_init(&srv->srv_funcs.mtx, NULL); SLIST_INIT(&srv->srv_funcs); AVL_INIT(&srv->srv_funcs); /* init scheduler */ srv->srv_root = schedBegin(); if (!srv->srv_root) { rpc_SetErr(sched_GetErrno(), "%s", sched_GetError()); pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; } /* init pool for clients */ srv->srv_clients = array_Init(concurentClients); if (!srv->srv_clients) { rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); schedEnd(&srv->srv_root); pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; } /* create server socket */ srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, srv->srv_proto == SOCK_RAW ? IPPROTO_ERPC : 0); if (srv->srv_server.cli_sock == -1) { LOGERR; array_Destroy(&srv->srv_clients); schedEnd(&srv->srv_root); pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; } if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) { LOGERR; goto err; } if (srv->srv_proto == SOCK_STREAM) setsockopt(srv->srv_server.cli_sock, IPPROTO_TCP, TCP_NODELAY, &n, sizeof n); n = srv->srv_netbuf; if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) { LOGERR; goto err; } if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) { LOGERR; goto err; } if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, salen) == -1) { LOGERR; goto err; } else fcntl(srv->srv_server.cli_sock, F_SETFL, fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK); rpc_register_srvPing(srv); return srv; err: /* error condition */ close(srv->srv_server.cli_sock); array_Destroy(&srv->srv_clients); schedEnd(&srv->srv_root); pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; } /* * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources * * @psrv = RPC Server instance * return: none */ void rpc_srv_endServer(rpc_srv_t ** __restrict psrv) { if (!psrv || !*psrv) return; /* if send kill to blob server */ rpc_srv_endBLOBServer(*psrv); /* wait for BLOB server done */ while (*(&(*psrv)->srv_blob.root)) usleep(1000); (*psrv)->srv_kill = 1; sleep(RPC_SCHED_POLLING); if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL) unlink((*psrv)->srv_server.cli_sa.sun.sun_path); schedEnd(&(*psrv)->srv_root); pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx); e_free(*psrv); *psrv = NULL; } /* * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests * * @srv = RPC Server instance * return: -1 error or 0 ok, infinite loop ... */ int rpc_srv_loopServer(rpc_srv_t * __restrict srv) { rpc_cli_t *c; register int i; rpc_func_t *f; struct timespec ts = { RPC_SCHED_POLLING, 0 }; if (!srv) { rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server"); return -1; } if (srv->srv_proto == SOCK_STREAM) if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) { LOGERR; return -1; } if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, srv->srv_server.cli_sock, NULL, 0)) { rpc_SetErr(sched_GetErrno(), "%s", sched_GetError()); return -1; } schedPolling(srv->srv_root, &ts, NULL); /* main rpc loop */ schedRun(srv->srv_root, &srv->srv_kill); /* close all clients connections & server socket */ for (i = 0; i < array_Size(srv->srv_clients); i++) { c = array(srv->srv_clients, i, rpc_cli_t*); if (c) { if (srv->srv_proto == SOCK_STREAM) { shutdown(c->cli_sock, SHUT_RDWR); close(c->cli_sock); } schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL); ait_freeVars(&RPC_RETVARS(c)); AIT_FREE_VAL(&c->cli_buf); } array_Del(srv->srv_clients, i, 42); } array_Destroy(&srv->srv_clients); if (srv->srv_proto != SOCK_EXT) close(srv->srv_server.cli_sock); /* detach exported calls */ RPC_FUNCS_LOCK(&srv->srv_funcs); while ((f = SLIST_FIRST(&srv->srv_funcs))) { SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next); AIT_FREE_VAL(&f->func_name); e_free(f); } srv->srv_funcs.avlh_root = NULL; RPC_FUNCS_UNLOCK(&srv->srv_funcs); return 0; } /* * rpc_srv_execCall() Execute registered call from RPC server * * @cli = RPC client * @rpc = IN RPC call structure * @funcname = Execute RPC function * @args = IN RPC calling arguments from RPC client * return: -1 error, !=-1 ok */ int rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc, ait_val_t funcname, array_t * __restrict args) { rpc_callback_t func; if (!cli || !rpc || !AIT_ADDR(&funcname)) { rpc_SetErr(EINVAL, "Invalid parameter can`t exec function"); return -1; } func = AIT_GET_LIKE(&funcname, rpc_callback_t); return func(cli, rpc, args); } /* * rpc_srv_initServer2() - Init & create layer2 RPC Server * * @InstID = Instance for authentication & recognition * @concurentClients = Concurent clients at same time to this server * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet) * @csIface = Interface name for bind server, if NULL first interface on host * return: NULL == error or !=NULL bind and created RPC server instance */ rpc_srv_t * rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface) { #ifndef __linux__ int n = 1; rpc_srv_t *srv = NULL; sockaddr_t sa = E_SOCKADDR_INIT; char szIface[64], szStr[STRSIZ]; register int i; struct ifreq ifr; struct bpf_insn insns[] = { BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12), BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1), BPF_STMT(BPF_RET + BPF_K, -1), BPF_STMT(BPF_RET + BPF_K, 0), }; struct bpf_program fcode = { .bf_len = sizeof(insns) / sizeof(struct bpf_insn), .bf_insns = insns }; if (!concurentClients) { rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server"); return NULL; } if (!csIface) { if (e_get1stiface(szIface, sizeof szIface)) return NULL; } else strlcpy(szIface, csIface, sizeof szIface); if (!e_getifacebyname(szIface, &sa)) return NULL; #ifdef HAVE_SRANDOMDEV srandomdev(); #else time_t tim; srandom((time(&tim) ^ getpid())); #endif srv = e_malloc(sizeof(rpc_srv_t)); if (!srv) { LOGERR; return NULL; } else memset(srv, 0, sizeof(rpc_srv_t)); srv->srv_proto = SOCK_BPF; srv->srv_netbuf = netBuf; srv->srv_session.sess_version = RPC_VERSION; srv->srv_session.sess_instance = InstID; srv->srv_server.cli_parent = srv; memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa); /* init functions */ pthread_mutex_init(&srv->srv_funcs.mtx, NULL); SLIST_INIT(&srv->srv_funcs); AVL_INIT(&srv->srv_funcs); /* init scheduler */ srv->srv_root = schedBegin(); if (!srv->srv_root) { rpc_SetErr(sched_GetErrno(), "%s", sched_GetError()); pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; } /* init pool for clients */ srv->srv_clients = array_Init(concurentClients); if (!srv->srv_clients) { rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); schedEnd(&srv->srv_root); pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; } /* create server handler */ for (i = 0; i < 10; i++) { memset(szStr, 0, sizeof szStr); snprintf(szStr, sizeof szStr, "/dev/bpf%d", i); srv->srv_server.cli_sock = open(szStr, O_RDWR); if (srv->srv_server.cli_sock > STDERR_FILENO) break; } if (srv->srv_server.cli_sock < 3) { LOGERR; array_Destroy(&srv->srv_clients); schedEnd(&srv->srv_root); pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; } if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) { LOGERR; goto err; } if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) { LOGERR; goto err; } n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2); if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) { LOGERR; goto err; } else srv->srv_netbuf = n; memset(&ifr, 0, sizeof ifr); strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name); if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) { LOGERR; goto err; } else fcntl(srv->srv_server.cli_sock, F_SETFL, fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK); rpc_register_srvPing(srv); return srv; err: /* error condition */ close(srv->srv_server.cli_sock); array_Destroy(&srv->srv_clients); schedEnd(&srv->srv_root); pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); #else rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!"); #endif return NULL; } /* * rpc_srv_initServerExt() - Init & create pipe RPC Server * * @InstID = Instance for authentication & recognition * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet) * @fd = File descriptor * return: NULL == error or !=NULL bind and created RPC server instance */ rpc_srv_t * rpc_srv_initServerExt(u_char InstID, int netBuf, int fd) { rpc_srv_t *srv = NULL; #ifdef HAVE_SRANDOMDEV srandomdev(); #else time_t tim; srandom((time(&tim) ^ getpid())); #endif srv = e_malloc(sizeof(rpc_srv_t)); if (!srv) { LOGERR; return NULL; } else memset(srv, 0, sizeof(rpc_srv_t)); srv->srv_proto = SOCK_EXT; srv->srv_netbuf = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2); srv->srv_session.sess_version = RPC_VERSION; srv->srv_session.sess_instance = InstID; srv->srv_server.cli_parent = srv; srv->srv_server.cli_sock = fd; /* init functions */ pthread_mutex_init(&srv->srv_funcs.mtx, NULL); SLIST_INIT(&srv->srv_funcs); AVL_INIT(&srv->srv_funcs); /* init scheduler */ srv->srv_root = schedBegin(); if (!srv->srv_root) { rpc_SetErr(sched_GetErrno(), "%s", sched_GetError()); pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; } /* init pool for clients */ srv->srv_clients = array_Init(1); if (!srv->srv_clients) { rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); schedEnd(&srv->srv_root); pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; } fcntl(srv->srv_server.cli_sock, F_SETFL, fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK); rpc_register_srvPing(srv); return srv; } /* * rpc_srv_Return() - Prepare IPC return answer to RPC client * * @c = RPC client * return: number of arguments in response */ int rpc_srv_Return(rpc_cli_t *c) { rpc_srv_t *s = c->cli_parent; u_char *buf = AIT_GET_BUF(&c->cli_buf); struct tagRPCCall *rpc = (struct tagRPCCall*) buf; if (!RPC_CHK_NOREPLY(rpc)) { rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); schedWrite(s->srv_root, cbProto[s->srv_proto][CB_TXPACKET], c, c->cli_sock, rpc, 0); } else rpc->call_argc ^= rpc->call_argc; return rpc->call_argc; }