--- libaitrpc/src/srv.c 2014/11/17 23:51:26 1.23.6.2 +++ libaitrpc/src/srv.c 2014/12/18 00:50:06 1.23.6.6 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.23.6.2 2014/11/17 23:51:26 misho Exp $ +* $Id: srv.c,v 1.23.6.6 2014/12/18 00:50:06 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -587,7 +587,7 @@ rxUDPPacket(sched_task_t *task) rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond"); schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, 0, NULL, 0); - return NULL; + goto end; } salen = sa.ss.ss_len = sizeof(sockaddr_t); rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen); @@ -595,7 +595,7 @@ rxUDPPacket(sched_task_t *task) /* close connection */ schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, 0, NULL, 0); - return NULL; + goto end; } if (e_addrcmp(&c->cli_sa, &sa, 42)) rlen ^= rlen; /* skip if arrive from different address */ @@ -607,9 +607,175 @@ rxUDPPacket(sched_task_t *task) rpc->call_crc ^= rpc->call_crc; if (crc != crcFletcher16((u_short*) rpc, len / 2)) { rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet"); + /* close connection */ + schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], + c, 0, NULL, 0); + goto end; + } + + noreply = RPC_CHK_NOREPLY(rpc); + + /* check RPC packet session info */ + if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) { + rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session"); + + rpc->call_argc ^= rpc->call_argc; + rpc->call_rep.ret = RPC_ERROR(-1); + rpc->call_rep.eno = RPC_ERROR(errno); + } else { + /* execute RPC call */ + schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); + } + + /* send RPC reply */ + if (!noreply) + schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], + c, TASK_FD(task), rpc, len); +end: + schedReadSelf(task); + return NULL; +} + + +static void * +txBPFPacket(sched_task_t *task) +{ + rpc_cli_t *c = TASK_ARG(task); + rpc_srv_t *s = c->cli_parent; + rpc_func_t *f = NULL; + u_char *buf = AIT_GET_BUF(&c->cli_buf); + struct tagRPCCall *rpc = (struct tagRPCCall*) buf; + int ret, len, wlen = sizeof(struct tagRPCCall); + struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; + struct ether_header *eh; + ait_val_t b = AIT_VAL_INIT; + + schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL); + schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], + TASK_ARG(task), ts, TASK_ARG(task), 0); + + if (rpc->call_argc) { + f = rpc_srv_getCall(s, ntohs(rpc->call_tag)); + if (!f) { + rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); + rpc->call_argc ^= rpc->call_argc; + rpc->call_rep.ret = RPC_ERROR(-1); + rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + } else { + /* calc estimated length */ + len = ait_resideVars(RPC_RETVARS(c)) + wlen; + if (len > AIT_LEN(&c->cli_buf)) + AIT_RE_BUF(&c->cli_buf, len); + buf = AIT_GET_BUF(&c->cli_buf); + rpc = (struct tagRPCCall*) buf; + + rpc->call_argc = htons(array_Size(RPC_RETVARS(c))); + /* Go Encapsulate variables */ + ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, + RPC_RETVARS(c)); + /* Free return values */ + ait_freeVars(&RPC_RETVARS(c)); + if (ret == -1) { + rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); + rpc->call_argc ^= rpc->call_argc; + rpc->call_rep.ret = RPC_ERROR(-1); + rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + } else + wlen += ret; + } + } + + rpc->call_len = htonl(wlen); + + /* calculate CRC */ + rpc->call_crc ^= rpc->call_crc; + rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2)); + + /* send reply */ + AIT_SET_BUF(&b, NULL, MIN(wlen, s->srv_netbuf) + ETHER_HDR_LEN); + eh = (struct ether_header*) AIT_GET_BUF(&b); + memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN); + eh->ether_type = htons(RPC_DEFPORT); + memcpy(eh + 1, buf, MIN(wlen, s->srv_netbuf)); + + ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b)); + AIT_FREE_VAL(&b); + if (ret == -1) { + /* close connection */ + schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], + TASK_ARG(task), 0, NULL, 0); return NULL; } + return NULL; +} + +static void * +rxBPFPacket(sched_task_t *task) +{ + rpc_srv_t *srv = TASK_ARG(task); + rpc_cli_t *c = NULL; + int len, rlen, noreply; + u_short crc; + struct tagRPCCall *rpc; + sockaddr_t sa; + struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; + struct bpf_hdr *h; + struct ether_header *eh; + ait_val_t b = AIT_VAL_INIT; + + /* receive connect packet */ + AIT_SET_BUF(&b, NULL, srv->srv_netbuf); + rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b)); + h = (struct bpf_hdr*) AIT_GET_BUF(&b); + rlen -= h->bh_hdrlen; + if (rlen < h->bh_caplen || h->bh_caplen != h->bh_datalen || + rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) { + rpc_SetErr(ERPCMISMATCH, "Short RPC packet"); + goto end; + } else { + rlen = h->bh_caplen; + eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen); + rlen -= ETHER_HDR_LEN; + rpc = (struct tagRPCCall*) (eh + 1); + if (eh->ether_type != ntohs(RPC_DEFPORT)) + goto end; + else + e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa); + } + + c = _allocClient(srv, &sa); + if (!c) { + EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n"); + usleep(2000); /* blocked client delay */ + goto end; + } else { + len = ntohl(rpc->call_len); + if (len > AIT_LEN(&c->cli_buf)) + AIT_RE_BUF(&c->cli_buf, len); + memcpy(AIT_GET_BUF(&c->cli_buf), rpc, AIT_LEN(&c->cli_buf)); + rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf); + + c->cli_sock = TASK_FD(task); + memcpy(&c->cli_sa, &sa, sizeof c->cli_sa); + + /* armed timer for close stateless connection */ + schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL); + schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], + c, ts, c, 0); + } + + /* check integrity of packet */ + crc = ntohs(rpc->call_crc); + rpc->call_crc ^= rpc->call_crc; + if (crc != crcFletcher16((u_short*) rpc, len / 2)) { + rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet"); + /* close connection */ + schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], + c, 0, NULL, 0); + goto end; + } + noreply = RPC_CHK_NOREPLY(rpc); /* check RPC packet session info */ @@ -629,6 +795,7 @@ rxUDPPacket(sched_task_t *task) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: + AIT_FREE_VAL(&b); schedReadSelf(task); return NULL; } @@ -1244,8 +1411,10 @@ rpc_srv_loopServer(rpc_srv_t * __restrict srv) for (i = 0; i < array_Size(srv->srv_clients); i++) { c = array(srv->srv_clients, i, rpc_cli_t*); if (c) { - shutdown(c->cli_sock, SHUT_RDWR); - close(c->cli_sock); + if (srv->srv_proto == SOCK_STREAM) { + shutdown(c->cli_sock, SHUT_RDWR); + close(c->cli_sock); + } schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL); ait_freeVars(&RPC_RETVARS(c)); @@ -1315,6 +1484,16 @@ rpc_srv_initServer2(u_char InstID, int concurentClient char szIface[64], szStr[STRSIZ]; register int i; struct ifreq ifr; + struct bpf_insn insns[] = { + BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12), + BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1), + BPF_STMT(BPF_RET + BPF_K, -1), + BPF_STMT(BPF_RET + BPF_K, 0), + }; + struct bpf_program fcode = { + .bf_len = sizeof(insns) / sizeof(struct bpf_insn), + .bf_insns = insns + }; if (!concurentClients) { rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server"); @@ -1325,7 +1504,7 @@ rpc_srv_initServer2(u_char InstID, int concurentClient return NULL; } else strlcpy(szIface, csIface, sizeof szIface); - if (e_getifacebyname(szIface, &sa)) + if (!e_getifacebyname(szIface, &sa)) return NULL; #ifdef HAVE_SRANDOMDEV @@ -1396,6 +1575,10 @@ rpc_srv_initServer2(u_char InstID, int concurentClient LOGERR; goto err; } + if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) { + LOGERR; + goto err; + } n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2); if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) { LOGERR; @@ -1422,61 +1605,4 @@ err: /* error condition */ pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; -} - -/* - * rpc_srv_loopServer2() - Execute Main layer2 server loop and wait for clients requests - * - * @srv = RPC Server instance - * return: -1 error or 0 ok, infinite loop ... - */ -int -rpc_srv_loopServer2(rpc_srv_t * __restrict srv) -{ - rpc_cli_t *c; - register int i; - rpc_func_t *f; - struct timespec ts = { RPC_SCHED_POLLING, 0 }; - - if (!srv) { - rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server"); - return -1; - } - - if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, - srv->srv_server.cli_sock, NULL, 0)) { - rpc_SetErr(sched_GetErrno(), "%s", sched_GetError()); - return -1; - } - - schedPolling(srv->srv_root, &ts, NULL); - /* main rpc loop */ - schedRun(srv->srv_root, &srv->srv_kill); - - /* close all clients connections & server socket */ - for (i = 0; i < array_Size(srv->srv_clients); i++) { - c = array(srv->srv_clients, i, rpc_cli_t*); - if (c) { - schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL); - ait_freeVars(&RPC_RETVARS(c)); - AIT_FREE_VAL(&c->cli_buf); - } - array_Del(srv->srv_clients, i, 42); - } - array_Destroy(&srv->srv_clients); - - close(srv->srv_server.cli_sock); - - /* detach exported calls */ - RPC_FUNCS_LOCK(&srv->srv_funcs); - while ((f = SLIST_FIRST(&srv->srv_funcs))) { - SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next); - - AIT_FREE_VAL(&f->func_name); - e_free(f); - } - srv->srv_funcs.avlh_root = NULL; - RPC_FUNCS_UNLOCK(&srv->srv_funcs); - - return 0; }