--- libaitrpc/src/srv.c 2013/08/21 11:41:15 1.17.4.4 +++ libaitrpc/src/srv.c 2013/08/21 12:59:12 1.17.4.5 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.17.4.4 2013/08/21 11:41:15 misho Exp $ +* $Id: srv.c,v 1.17.4.5 2013/08/21 12:59:12 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -151,7 +151,7 @@ _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * } /* init buffer(s) */ - c->cli_buf = ait_allocVars(1); + c->cli_buf = ait_allocVars(2); if (!c->cli_buf) { rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); array_Del(srv->srv_clients, n, 42); @@ -192,17 +192,20 @@ txPacket(sched_task_t *task) rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; u_char *buf; - struct tagRPCCall *rpc; + struct tagRPCCall *rpc = (struct tagRPCCall*) TASK_DATA(task); int ret, wlen = sizeof(struct tagRPCCall); int len = sizeof(struct tagRPCCall) + ntohl(rpc->call_len); buf = e_malloc(len); if (!buf) { rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + /* close connection */ + schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], + TASK_ARG(task), 0, NULL, 0); return NULL; } else { /* copy RPC header */ - memcpy(buf, TASK_DATA(task), wlen); + memcpy(buf, rpc, wlen); rpc = (struct tagRPCCall*) buf; } @@ -241,6 +244,7 @@ txPacket(sched_task_t *task) TASK_ARG(task), 0, NULL, 0); } + e_free(buf); return NULL; } @@ -315,12 +319,7 @@ rxPacket(sched_task_t *task) /* read rpc header */ rlen = recv(TASK_FD(task), rpc, MIN(sizeof(struct tagRPCCall), AIT_LEN(bufz)), 0); - if (rlen < 1) { - /* close connection */ - schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], - TASK_ARG(task), 0, NULL, 0); - return NULL; - } else if (rlen < sizeof(struct tagRPCCall) || + if (rlen < sizeof(struct tagRPCCall) || ntohl(rpc->call_len) < sizeof(struct tagRPCCall)) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], @@ -339,6 +338,7 @@ rxPacket(sched_task_t *task) TASK_ARG(task), 0, NULL, 0); return NULL; } else { + AIT_FREE_VAL(bufz); AIT_SET_BUFSIZ(bufz, 0, len); buf = AIT_GET_BUF(bufz); } @@ -414,33 +414,46 @@ txUDPPacket(sched_task_t *task) rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; - u_char buf[USHRT_MAX] = { 0 }; - struct tagRPCCall *rpc = (struct tagRPCCall*) buf; + u_char *buf; + struct tagRPCCall *rpc = (struct tagRPCCall*) TASK_DATA(task); int ret, wlen = sizeof(struct tagRPCCall); + int len = sizeof(struct tagRPCCall) + ntohl(rpc->call_len); struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL); schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), ts, TASK_ARG(task), 0); - /* copy RPC header */ - memcpy(buf, TASK_DATA(task), wlen); + buf = e_malloc(len); + if (!buf) { + rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + /* close connection */ + schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], + TASK_ARG(task), 0, NULL, 0); + return NULL; + } else { + /* copy RPC header */ + memcpy(buf, rpc, wlen); + rpc = (struct tagRPCCall*) buf; + } if (rpc->call_argc) { f = rpc_srv_getCall(s, ntohs(rpc->call_tag)); if (!f) { rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); + rpc->call_argc ^= rpc->call_argc; rpc->call_rep.ret = RPC_ERROR(-1); rpc->call_rep.eno = RPC_ERROR(rpc_Errno); } else { rpc->call_argc = htons(array_Size(RPC_RETVARS(c))); /* Go Encapsulate variables */ - ret = ait_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c)); + ret = ait_vars2buffer(buf + wlen, len - wlen, RPC_RETVARS(c)); /* Free return values */ ait_freeVars(&c->cli_vars); if (ret == -1) { rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); + rpc->call_argc ^= rpc->call_argc; rpc->call_rep.ret = RPC_ERROR(-1); rpc->call_rep.eno = RPC_ERROR(rpc_Errno); @@ -449,7 +462,7 @@ txUDPPacket(sched_task_t *task) } } - rpc->call_len = htons(wlen); + rpc->call_len = htonl(wlen); /* calculate CRC */ rpc->call_crc ^= rpc->call_crc; @@ -464,6 +477,7 @@ txUDPPacket(sched_task_t *task) TASK_ARG(task), 0, NULL, 0); } + e_free(buf); return NULL; } @@ -473,83 +487,89 @@ rxUDPPacket(sched_task_t *task) rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; int len, rlen, noreply; - u_short crc, off = 0; - u_char buf[USHRT_MAX + 1]; - struct tagRPCCall *rpc = (struct tagRPCCall*) buf; - sockaddr_t sa; + ait_val_t *bufz; + u_char *buf = NULL; + struct tagRPCCall rpcbuf, *rpc; + sockaddr_t sa[2]; socklen_t salen; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; + memset(buf, 0, AIT_LEN(bufz)); + memset(&rpcbuf, 0, sizeof rpcbuf); + /* receive connect packet */ - salen = sa.ss.ss_len = sizeof(sockaddr_t); - rlen = recvfrom(TASK_FD(task), buf, sizeof buf, 0, &sa.sa, &salen); - if (rlen < 1) + salen = sa[0].ss.ss_len = sizeof(sockaddr_t); + rlen = recvfrom(TASK_FD(task), &rpcbuf, sizeof rpcbuf, 0, &sa[0].sa, &salen); + if (rlen < sizeof(struct tagRPCCall) || ntohl(rpcbuf.call_len) < sizeof(struct tagRPCCall)) goto end; + else + len = ntohl(rpcbuf.call_len); - c = _allocClient(srv, &sa); + buf = e_malloc(len); + if (!buf) + goto end; + else + memset(buf, 0, len); + + /* read payload */ + salen = sa[1].ss.ss_len = sizeof(sockaddr_t); + rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa[1].sa, &salen); + if (rlen < len || memcmp(&sa[0], &sa[1], sizeof sa[0])) + goto end; + + c = _allocClient(srv, sa); if (!c) goto end; else { + /* add extra buffer */ + if (!(bufz = ait_getVars(&c->cli_buf, 1))) + goto end; + else { + AIT_FREE_VAL(bufz); + AIT_SET_BUFSIZ(bufz, 0, len); + /* buffer isnt last */ + RPC_SET_NEXTBUF(c); + } + + rpc = getHeader(c); + memcpy(rpc, &rpcbuf, sizeof(struct tagRPCCall)); + memcpy(getBuffer(c), buf, len); + c->cli_sock = TASK_FD(task); - memcpy(&c->cli_sa, &sa, sizeof c->cli_sa); - memcpy(AIT_GET_BUF(&c->cli_buf), buf, AIT_LEN(&c->cli_buf)); + memcpy(&c->cli_sa, sa, sizeof c->cli_sa); /* armed timer for close stateless connection */ schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL); schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, ts, c, 0); } - do { - /* check RPC packet */ - if (rlen < sizeof(struct tagRPCCall)) { - rpc_SetErr(ERPCMISMATCH, "Short RPC packet"); - break; - } else - rpc = (struct tagRPCCall*) (AIT_GET_BUF(&c->cli_buf) + off); + /* check integrity of packet */ + if (ntohs(rpc->call_crc) != crcFletcher16((u_short*) buf, len / 2)) { + rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet"); + goto end; + } - len = ntohs(rpc->call_len); - rlen -= len; + noreply = RPC_CHK_NOREPLY(rpc); - /* check RPC packet lengths */ - if (rlen < 0 || len < sizeof(struct tagRPCCall)) { - rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length"); - /* skip entire packet */ - break; - } + /* check RPC packet session info */ + if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) { + rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session"); - /* check integrity of packet */ - crc = ntohs(rpc->call_crc); - rpc->call_crc ^= rpc->call_crc; - if (crc != crcFletcher16((u_short*) rpc, len / 2)) { - rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet"); + rpc->call_argc ^= rpc->call_argc; + rpc->call_rep.ret = RPC_ERROR(-1); + rpc->call_rep.eno = RPC_ERROR(errno); + } else { + /* execute RPC call */ + schedEvent(TASK_ROOT(task), execCall, c, noreply, rpc, len); + } - off += len; - /* try next packet remaining into buffer */ - continue; - } - - noreply = RPC_CHK_NOREPLY(rpc); - - /* check RPC packet session info */ - if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) { - rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session"); - rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(errno); - } else { - /* execute RPC call */ - schedEvent(TASK_ROOT(task), execCall, c, off, NULL, 0); - } - - /* send RPC reply */ - if (!noreply) - schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], - c, TASK_FD(task), rpc, len); - - off += len; - } while (rlen > 0); - + /* send RPC reply */ + if (!noreply) + schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], + c, TASK_FD(task), rpc, len); end: + if (buf) + e_free(buf); schedReadSelf(task); return NULL; }