--- libaitrpc/src/srv.c 2013/07/16 13:04:20 1.17 +++ libaitrpc/src/srv.c 2013/08/21 15:28:16 1.17.4.8 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.17 2013/07/16 13:04:20 misho Exp $ +* $Id: srv.c,v 1.17.4.8 2013/08/21 15:28:16 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -67,6 +67,36 @@ static sched_task_func_t cbProto[SOCK_RAW + 1][4] = { }; +ait_val_t * +rpc_getBufVar(rpc_cli_t * __restrict c) +{ + return array(c->cli_buf, RPC_ISNEXTBUF(c), ait_val_t*); +} + +u_char * +rpc_getBuffer(rpc_cli_t * __restrict c) +{ + u_char *b = NULL; + + assert(c); + + if (RPC_ISNEXTBUF(c)) + b = AIT_GET_BUF(array(c->cli_buf, 1, ait_val_t*)); + else + b = AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*)) + + sizeof(struct tagRPCCall); + + return b; +} + +struct tagRPCCall * +rpc_getHeader(rpc_cli_t * __restrict c) +{ + assert(c); + + return (struct tagRPCCall*) AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*)); +} + void rpc_freeCli(rpc_cli_t * __restrict c) { @@ -74,8 +104,8 @@ rpc_freeCli(rpc_cli_t * __restrict c) schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL); - /* free buffer */ - AIT_FREE_VAL(&c->cli_buf); + /* free buffer(s) */ + ait_freeVars(&c->cli_buf); array_Del(s->srv_clients, c->cli_id, 0); if (c) @@ -126,8 +156,14 @@ _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * c->cli_parent = srv; } - /* alloc empty buffer */ - AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf); + /* init buffer(s) */ + c->cli_buf = ait_allocVars(2); + if (!c->cli_buf) { + rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + array_Del(srv->srv_clients, n, 42); + return NULL; + } else + AIT_SET_BUFSIZ(array(c->cli_buf, 0, ait_val_t*), 0, srv->srv_netbuf); } return c; @@ -161,28 +197,41 @@ txPacket(sched_task_t *task) rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; - u_char buf[USHRT_MAX] = { 0 }; - struct tagRPCCall *rpc = (struct tagRPCCall*) buf; + u_char *buf; + struct tagRPCCall *rpc = (struct tagRPCCall*) TASK_DATA(task); int ret, wlen = sizeof(struct tagRPCCall); + int len = sizeof(struct tagRPCCall) + ntohl(rpc->call_len); - /* copy RPC header */ - memcpy(buf, TASK_DATA(task), wlen); + buf = e_malloc(len); + if (!buf) { + rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + /* close connection */ + schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], + TASK_ARG(task), 0, NULL, 0); + return NULL; + } else { + /* copy RPC header */ + memcpy(buf, rpc, wlen); + rpc = (struct tagRPCCall*) buf; + } if (rpc->call_argc) { f = rpc_srv_getCall(s, ntohs(rpc->call_tag)); if (!f) { rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); + rpc->call_argc ^= rpc->call_argc; rpc->call_rep.ret = RPC_ERROR(-1); rpc->call_rep.eno = RPC_ERROR(rpc_Errno); } else { rpc->call_argc = htons(array_Size(RPC_RETVARS(c))); /* Go Encapsulate variables */ - ret = ait_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c)); + ret = ait_vars2buffer(buf + wlen, len - wlen, RPC_RETVARS(c)); /* Free return values */ ait_freeVars(&c->cli_vars); if (ret == -1) { rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); + rpc->call_argc ^= rpc->call_argc; rpc->call_rep.ret = RPC_ERROR(-1); rpc->call_rep.eno = RPC_ERROR(rpc_Errno); @@ -191,14 +240,8 @@ txPacket(sched_task_t *task) } } - rpc->call_len = htons(wlen); + rpc->call_len = htonl(wlen); -#if 0 - /* calculate CRC */ - rpc->call_crc ^= rpc->call_crc; - rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2)); -#endif - /* send reply */ ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL); if (ret == -1 || ret != wlen) { @@ -207,6 +250,7 @@ txPacket(sched_task_t *task) TASK_ARG(task), 0, NULL, 0); } + e_free(buf); return NULL; } @@ -217,17 +261,16 @@ execCall(sched_task_t *task) rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; array_t *arr = NULL; - u_char *buf = AIT_GET_BUF(&c->cli_buf) + TASK_VAL(task); - struct tagRPCCall *rpc = (struct tagRPCCall*) buf; + u_char *buf = rpc_getBuffer(c); + struct tagRPCCall *rpc = rpc_getHeader(c); int argc = ntohs(rpc->call_argc); /* Go decapsulate variables ... */ if (argc) { - arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall), - AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall), - argc, 42); + arr = ait_buffer2vars(buf, ntohl(rpc->call_len), argc, 42); if (!arr) { rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError()); + rpc->call_argc ^= rpc->call_argc; rpc->call_rep.ret = RPC_ERROR(-1); rpc->call_rep.eno = RPC_ERROR(rpc_Errno); @@ -238,6 +281,7 @@ execCall(sched_task_t *task) if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) { rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); + rpc->call_argc ^= rpc->call_argc; rpc->call_rep.ret = RPC_ERROR(-1); rpc->call_rep.eno = RPC_ERROR(rpc_Errno); @@ -271,84 +315,73 @@ rxPacket(sched_task_t *task) rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; int len, rlen, noreply; - u_short off = TASK_DATLEN(task); -#if 0 - u_short crc; -#endif - u_char *buf = AIT_GET_BUF(&c->cli_buf); - struct tagRPCCall *rpc; + ait_val_t *bufz = array(c->cli_buf, 0, ait_val_t*); + u_char *buf = (u_char*) AIT_GET_BUF(bufz); + struct tagRPCCall *rpc = (struct tagRPCCall*) buf; - if (!off) - memset(buf, 0, AIT_LEN(&c->cli_buf)); - rlen = recv(TASK_FD(task), buf + off, AIT_LEN(&c->cli_buf) - off, 0); - if (rlen < 1) { + memset(buf, 0, AIT_LEN(bufz)); + /* 1st buffer is last */ + RPC_CLR_NEXTBUF(c); + + /* read rpc header */ + rlen = recv(TASK_FD(task), rpc, MIN(sizeof(struct tagRPCCall), AIT_LEN(bufz)), 0); + if (rlen < sizeof(struct tagRPCCall) || + ntohl(rpc->call_len) < sizeof(struct tagRPCCall)) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), 0, NULL, 0); return NULL; } else { - rlen += off; /* add reminded bytes from previous rxPacket, if exists! */ - off = 0; /* process buffer from start offset == 0 */ + buf += sizeof(struct tagRPCCall); + len = ntohl(rpc->call_len); } - do { - /* check RPC packet */ - if (rlen < sizeof(struct tagRPCCall)) { - rpc_SetErr(ERPCMISMATCH, "Short RPC packet"); - - /* reminder received previous bytes ;) */ - schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), - TASK_FD(task), TASK_DATA(task), rlen); + if (len > (AIT_LEN(bufz) - sizeof(struct tagRPCCall))) { + /* add extra buffer */ + if (!(bufz = ait_getVars(&c->cli_buf, 1))) { + /* close connection */ + schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], + TASK_ARG(task), 0, NULL, 0); return NULL; - } else - rpc = (struct tagRPCCall*) (buf + off); - - len = ntohs(rpc->call_len); - rlen -= len; - - /* check RPC packet lengths */ - if (rlen < 0 || len < sizeof(struct tagRPCCall)) { - rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length"); - /* skip entire packet */ - break; + } else { + AIT_FREE_VAL(bufz); + AIT_SET_BUFSIZ(bufz, 0, len); + buf = AIT_GET_BUF(bufz); } + /* buffer isnt last */ + RPC_SET_NEXTBUF(c); + } -#if 0 - /* check integrity of packet */ - crc = ntohs(rpc->call_crc); - rpc->call_crc ^= rpc->call_crc; - if (crc != crcFletcher16((u_short*) rpc, len / 2)) { - rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet"); + /* read payload */ + rlen = recv(TASK_FD(task), buf, len, 0); + if (rlen < len) { + /* close connection */ + schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], + TASK_ARG(task), 0, NULL, 0); + return NULL; + } - off += len; - /* try next packet remaining into buffer */ - continue; - } -#endif + noreply = RPC_CHK_NOREPLY(rpc); - noreply = RPC_CHK_NOREPLY(rpc); + /* check RPC packet session info */ + if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) { + rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session"); - /* check RPC packet session info */ - if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) { - rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session"); - rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(errno); - } else { - /* execute RPC call */ - schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off, NULL, 0); - } + rpc->call_argc ^= rpc->call_argc; + rpc->call_rep.ret = RPC_ERROR(-1); + rpc->call_rep.eno = RPC_ERROR(errno); + } else { + /* execute RPC call */ + schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), noreply, rpc, len); + } - /* send RPC reply */ - if (!noreply) - schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], - TASK_ARG(task), TASK_FD(task), rpc, len); + /* send RPC reply */ + if (!noreply) + schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], + TASK_ARG(task), TASK_FD(task), rpc, len); - off += len; - } while (rlen > 0); - /* lets get next packet */ - schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), TASK_FD(task), TASK_DATA(task), 0); + schedReadSelf(task); return NULL; } @@ -367,7 +400,7 @@ acceptClients(sched_task_t *task) c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen); if (c->cli_sock == -1) { LOGERR; - AIT_FREE_VAL(&c->cli_buf); + ait_freeVars(&c->cli_buf); array_Del(srv->srv_clients, c->cli_id, 42); goto end; } else @@ -387,33 +420,46 @@ txUDPPacket(sched_task_t *task) rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; - u_char buf[USHRT_MAX] = { 0 }; - struct tagRPCCall *rpc = (struct tagRPCCall*) buf; + u_char *buf; + struct tagRPCCall *rpc = (struct tagRPCCall*) TASK_DATA(task); int ret, wlen = sizeof(struct tagRPCCall); + int len = sizeof(struct tagRPCCall) + ntohl(rpc->call_len); struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL); schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), ts, TASK_ARG(task), 0); - /* copy RPC header */ - memcpy(buf, TASK_DATA(task), wlen); + buf = e_malloc(len); + if (!buf) { + rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + /* close connection */ + schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], + TASK_ARG(task), 0, NULL, 0); + return NULL; + } else { + /* copy RPC header */ + memcpy(buf, rpc, wlen); + rpc = (struct tagRPCCall*) buf; + } if (rpc->call_argc) { f = rpc_srv_getCall(s, ntohs(rpc->call_tag)); if (!f) { rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); + rpc->call_argc ^= rpc->call_argc; rpc->call_rep.ret = RPC_ERROR(-1); rpc->call_rep.eno = RPC_ERROR(rpc_Errno); } else { rpc->call_argc = htons(array_Size(RPC_RETVARS(c))); /* Go Encapsulate variables */ - ret = ait_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c)); + ret = ait_vars2buffer(buf + wlen, len - wlen, RPC_RETVARS(c)); /* Free return values */ ait_freeVars(&c->cli_vars); if (ret == -1) { rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); + rpc->call_argc ^= rpc->call_argc; rpc->call_rep.ret = RPC_ERROR(-1); rpc->call_rep.eno = RPC_ERROR(rpc_Errno); @@ -422,7 +468,7 @@ txUDPPacket(sched_task_t *task) } } - rpc->call_len = htons(wlen); + rpc->call_len = htonl(wlen); /* calculate CRC */ rpc->call_crc ^= rpc->call_crc; @@ -437,6 +483,7 @@ txUDPPacket(sched_task_t *task) TASK_ARG(task), 0, NULL, 0); } + e_free(buf); return NULL; } @@ -446,83 +493,88 @@ rxUDPPacket(sched_task_t *task) rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; int len, rlen, noreply; - u_short crc, off = 0; - u_char buf[USHRT_MAX + 1]; - struct tagRPCCall *rpc = (struct tagRPCCall*) buf; - sockaddr_t sa; + ait_val_t *bufz; + u_char *buf = NULL; + struct tagRPCCall rpcbuf, *rpc; + sockaddr_t sa[2]; socklen_t salen; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; + memset(&rpcbuf, 0, sizeof rpcbuf); + /* receive connect packet */ - salen = sa.ss.ss_len = sizeof(sockaddr_t); - rlen = recvfrom(TASK_FD(task), buf, sizeof buf, 0, &sa.sa, &salen); - if (rlen < 1) + salen = sa[0].ss.ss_len = sizeof(sockaddr_t); + rlen = recvfrom(TASK_FD(task), &rpcbuf, sizeof rpcbuf, 0, &sa[0].sa, &salen); + if (rlen < sizeof(struct tagRPCCall) || ntohl(rpcbuf.call_len) < sizeof(struct tagRPCCall)) goto end; + else + len = ntohl(rpcbuf.call_len); - c = _allocClient(srv, &sa); + buf = e_malloc(len); + if (!buf) + goto end; + else + memset(buf, 0, len); + + /* read payload */ + salen = sa[1].ss.ss_len = sizeof(sockaddr_t); + rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa[1].sa, &salen); + if (rlen < len || memcmp(&sa[0], &sa[1], sizeof sa[0])) + goto end; + + c = _allocClient(srv, sa); if (!c) goto end; else { + /* add extra buffer */ + if (!(bufz = ait_getVars(&c->cli_buf, 1))) + goto end; + else { + AIT_FREE_VAL(bufz); + AIT_SET_BUFSIZ(bufz, 0, len); + /* buffer isnt last */ + RPC_SET_NEXTBUF(c); + } + + rpc = rpc_getHeader(c); + memcpy(rpc, &rpcbuf, sizeof(struct tagRPCCall)); + memcpy(rpc_getBuffer(c), buf, len); + c->cli_sock = TASK_FD(task); - memcpy(&c->cli_sa, &sa, sizeof c->cli_sa); - memcpy(AIT_GET_BUF(&c->cli_buf), buf, AIT_LEN(&c->cli_buf)); + memcpy(&c->cli_sa, sa, sizeof c->cli_sa); /* armed timer for close stateless connection */ schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL); schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, ts, c, 0); } - do { - /* check RPC packet */ - if (rlen < sizeof(struct tagRPCCall)) { - rpc_SetErr(ERPCMISMATCH, "Short RPC packet"); - break; - } else - rpc = (struct tagRPCCall*) (AIT_GET_BUF(&c->cli_buf) + off); + /* check integrity of packet */ + if (ntohs(rpc->call_crc) != crcFletcher16((u_short*) buf, len / 2)) { + rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet"); + goto end; + } - len = ntohs(rpc->call_len); - rlen -= len; + noreply = RPC_CHK_NOREPLY(rpc); - /* check RPC packet lengths */ - if (rlen < 0 || len < sizeof(struct tagRPCCall)) { - rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length"); - /* skip entire packet */ - break; - } + /* check RPC packet session info */ + if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) { + rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session"); - /* check integrity of packet */ - crc = ntohs(rpc->call_crc); - rpc->call_crc ^= rpc->call_crc; - if (crc != crcFletcher16((u_short*) rpc, len / 2)) { - rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet"); + rpc->call_argc ^= rpc->call_argc; + rpc->call_rep.ret = RPC_ERROR(-1); + rpc->call_rep.eno = RPC_ERROR(errno); + } else { + /* execute RPC call */ + schedEvent(TASK_ROOT(task), execCall, c, noreply, rpc, len); + } - off += len; - /* try next packet remaining into buffer */ - continue; - } - - noreply = RPC_CHK_NOREPLY(rpc); - - /* check RPC packet session info */ - if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) { - rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session"); - rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(errno); - } else { - /* execute RPC call */ - schedEvent(TASK_ROOT(task), execCall, c, off, NULL, 0); - } - - /* send RPC reply */ - if (!noreply) - schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], - c, TASK_FD(task), rpc, len); - - off += len; - } while (rlen > 0); - + /* send RPC reply */ + if (!noreply) + schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], + c, TASK_FD(task), rpc, len); end: + if (buf) + e_free(buf); schedReadSelf(task); return NULL; } @@ -536,8 +588,8 @@ rpc_freeBLOBCli(rpc_cli_t * __restrict c) schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL); - /* free buffer */ - AIT_FREE_VAL(&c->cli_buf); + /* free buffer(s) */ + ait_freeVars(&c->cli_buf); array_Del(s->srv_blob.clients, c->cli_id, 0); if (c) @@ -562,7 +614,7 @@ static void * txBLOB(sched_task_t *task) { rpc_cli_t *c = TASK_ARG(task); - u_char *buf = AIT_GET_BUF(&c->cli_buf); + u_char *buf = AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*)); int wlen = sizeof(struct tagBLOBHdr); /* send reply */ @@ -654,7 +706,7 @@ rxBLOB(sched_task_t *task) } end: - memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob); + memcpy(AIT_ADDR(array(c->cli_buf, 0, ait_val_t*)), &blob, sizeof blob); schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0); schedReadSelf(task); return NULL; @@ -705,14 +757,20 @@ acceptBLOBClients(sched_task_t *task) c->cli_parent = srv; } - /* alloc empty buffer */ - AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf); + /* init buffer(s) */ + c->cli_buf = ait_allocVars(1); + if (!c->cli_buf) { + rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + array_Del(srv->srv_blob.clients, i, 42); + goto end; + } else + AIT_SET_BUFSIZ(array(c->cli_buf, 0, ait_val_t*), 0, srv->srv_netbuf); /* accept client */ c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen); if (c->cli_sock == -1) { LOGERR; - AIT_FREE_VAL(&c->cli_buf); + ait_freeVars(&c->cli_buf); array_Del(srv->srv_blob.clients, i, 42); goto end; } else { @@ -906,7 +964,7 @@ rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) close(c->cli_sock); schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL); - AIT_FREE_VAL(&c->cli_buf); + ait_freeVars(&c->cli_buf); } array_Del(srv->srv_blob.clients, i, 42); } @@ -1112,7 +1170,7 @@ rpc_srv_loopServer(rpc_srv_t * __restrict srv) schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL); ait_freeVars(&RPC_RETVARS(c)); - AIT_FREE_VAL(&c->cli_buf); + ait_freeVars(&c->cli_buf); } array_Del(srv->srv_clients, i, 42); }