--- libaitrpc/src/cli.c 2011/09/07 07:24:21 1.5 +++ libaitrpc/src/cli.c 2011/11/03 15:32:21 1.6 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: cli.c,v 1.5 2011/09/07 07:24:21 misho Exp $ +* $Id: cli.c,v 1.6 2011/11/03 15:32:21 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -56,20 +56,15 @@ rpc_cli_t * rpc_cli_openBLOBClient(rpc_cli_t * __restrict rpccli, u_short Port) { rpc_cli_t *cli = NULL; - struct sockaddr sa; - struct sockaddr_in *sin = (struct sockaddr_in*) &sa; - struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa; - struct sockaddr_un *sun = (struct sockaddr_un*) &sa; + io_sockaddr_t sa; int n; if (!rpccli || - (rpccli->cli_sa.sa_family != AF_INET && rpccli->cli_sa.sa_family != AF_INET6 && - rpccli->cli_sa.sa_family != AF_LOCAL)) { + (rpccli->cli_sa.sa.sa_family != AF_INET && rpccli->cli_sa.sa.sa_family != AF_INET6 && + rpccli->cli_sa.sa.sa_family != AF_LOCAL)) { rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t connect to BLOB server ...\n"); return NULL; } - if (!Port) - Port = RPC_DEFPORT + 1; cli = malloc(sizeof(rpc_cli_t)); if (!cli) { @@ -79,23 +74,21 @@ rpc_cli_openBLOBClient(rpc_cli_t * __restrict rpccli, memcpy(cli, rpccli, sizeof(rpc_cli_t)); memcpy(&sa, &rpccli->cli_sa, sizeof sa); - switch (rpccli->cli_sa.sa_family) { + switch (sa.sa.sa_family) { case AF_INET: - sin->sin_port = htons(Port); - memcpy(&cli->cli_sa, sin, sizeof(struct sockaddr)); + sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1); break; case AF_INET6: - sin6->sin6_port = htons(Port); - memcpy(&cli->cli_sa, sin6, sizeof(struct sockaddr)); + sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1); break; case AF_LOCAL: - strlcat(sun->sun_path, ".blob", sizeof sun->sun_path); - memcpy(&cli->cli_sa, sun, sizeof(struct sockaddr)); + strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path); break; } + memcpy(&cli->cli_sa, &sa, sizeof sa); /* connect to BLOB server */ - cli->cli_sock = socket(cli->cli_sa.sa_family, SOCK_STREAM, 0); + cli->cli_sock = socket(cli->cli_sa.sa.sa_family, SOCK_STREAM, 0); if (cli->cli_sock == -1) { LOGERR; free(cli); @@ -114,7 +107,7 @@ rpc_cli_openBLOBClient(rpc_cli_t * __restrict rpccli, free(cli); return NULL; } - if (connect(cli->cli_sock, &cli->cli_sa, sizeof cli->cli_sa) == -1) { + if (connect(cli->cli_sock, &cli->cli_sa.sa, cli->cli_sa.sa.sa_len) == -1) { LOGERR; close(cli->cli_sock); free(cli); @@ -161,10 +154,7 @@ rpc_cli_openClient(u_int ProgID, u_int ProcID, int net { rpc_cli_t *cli = NULL; struct hostent *host = NULL; - struct sockaddr sa; - struct sockaddr_in *sin = (struct sockaddr_in*) &sa; - struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa; - struct sockaddr_un *sun = (struct sockaddr_un*) &sa; + io_sockaddr_t sa; int n; if (!csHost || (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) { @@ -183,24 +173,24 @@ rpc_cli_openClient(u_int ProgID, u_int ProcID, int net } } memset(&sa, 0, sizeof sa); - sa.sa_family = family; + sa.sa.sa_family = family; switch (family) { case AF_INET: - sin->sin_len = sizeof(struct sockaddr_in); - sin->sin_port = htons(Port); + sa.sin.sin_len = sizeof(struct sockaddr_in); + sa.sin.sin_port = htons(Port); if (csHost) - memcpy(&sin->sin_addr, host->h_addr, host->h_length); + memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length); break; case AF_INET6: - sin6->sin6_len = sizeof(struct sockaddr_in6); - sin6->sin6_port = htons(Port); + sa.sin6.sin6_len = sizeof(struct sockaddr_in6); + sa.sin6.sin6_port = htons(Port); if (csHost) - memcpy(&sin6->sin6_addr, host->h_addr, host->h_length); + memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length); break; case AF_LOCAL: - sun->sun_len = sizeof(struct sockaddr_un); + sa.sun.sun_len = sizeof(struct sockaddr_un); if (csHost) - strlcpy(sun->sun_path, csHost, sizeof sun->sun_path); + strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path); break; default: rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t connect to RPC server ...\n"); @@ -226,17 +216,7 @@ rpc_cli_openClient(u_int ProgID, u_int ProcID, int net ((rpc_sess_t*) cli->cli_parent)->sess_process = ProcID; } - switch (family) { - case AF_INET: - memcpy(&cli->cli_sa, sin, sizeof cli->cli_sa); - break; - case AF_INET6: - memcpy(&cli->cli_sa, sin6, sizeof cli->cli_sa); - break; - case AF_LOCAL: - memcpy(&cli->cli_sa, sun, sizeof cli->cli_sa); - break; - } + memcpy(&cli->cli_sa, &sa, sizeof cli->cli_sa); /* connect to RPC server */ cli->cli_sock = socket(family, SOCK_STREAM, 0); @@ -261,7 +241,7 @@ rpc_cli_openClient(u_int ProgID, u_int ProcID, int net free(cli); return NULL; } - if (connect(cli->cli_sock, &cli->cli_sa, sizeof cli->cli_sa) == -1) { + if (connect(cli->cli_sock, &cli->cli_sa.sa, cli->cli_sa.sa.sa_len) == -1) { LOGERR; close(cli->cli_sock); free(cli->cli_parent); @@ -314,6 +294,8 @@ rpc_cli_execCall(rpc_cli_t *cli, const char *csModule, struct tagRPCRet *rrpc = NULL; int ret = 0, Limit = 0; struct timeval tv = { DEF_RPC_TIMEOUT, 0 }; + uint16_t tag; + uint32_t hash; if (!cli || !csFunc) { rpc_SetErr(EINVAL, "Error:: Can`t execute call because parameter is null or invalid!\n"); @@ -338,15 +320,15 @@ rpc_cli_execCall(rpc_cli_t *cli, const char *csModule, /* prepare RPC call */ rpc = (struct tagRPCCall*) buf; - memcpy(&rpc->call_session, cli->cli_parent, sizeof rpc->call_session); - rpc->call_argc = in_vars ? io_arraySize(in_vars) : 0; - rpc->call_tag = crcFletcher16((u_short*) str, sizeof str / 2); - rpc->call_hash = hash_fnv((char*) str, sizeof str); + rpc_addPktSession(&rpc->call_session, cli->cli_parent); + rpc->call_argc = htons(io_arraySize(in_vars)); + rpc->call_tag = tag = htons(crcFletcher16((u_short*) str, sizeof str / 2)); + rpc->call_hash = hash = htonl(hash_fnv((char*) str, sizeof str)); Limit = sizeof(struct tagRPCCall); - if (in_vars && io_arraySize(in_vars)) { + if (io_arraySize(in_vars)) { /* marshaling variables */ - ret = io_vals2buffer(buf + Limit, cli->cli_netbuf - Limit, in_vars); + ret = io_vars2buffer(buf + Limit, cli->cli_netbuf - Limit, in_vars); if (ret == -1) { rpc_SetErr(EBADRPC, "Error:: in prepare RPC packet values (-7) ...\n"); free(buf); @@ -396,88 +378,41 @@ rpc_cli_execCall(rpc_cli_t *cli, const char *csModule, } else rrpc = (struct tagRPCRet*) buf; /* check RPC packet session info */ - if (memcmp(&rrpc->ret_session, cli->cli_parent, sizeof rrpc->ret_session)) { + if (rpc_chkPktSession(&rrpc->ret_session, cli->cli_parent)) { rpc_SetErr(ERPCMISMATCH, "Error:: get invalid RPC session ...\n"); free(buf); return -5; } else Limit = sizeof(struct tagRPCRet); - if (rrpc->ret_retcode < 0 && rrpc->ret_errno) { - rpc_SetErr(rrpc->ret_errno, "Error::Server side: retcode=%d #%d %s\n", - rrpc->ret_retcode, rrpc->ret_errno, strerror(rrpc->ret_errno)); + if (rrpc->ret_tag != tag || rrpc->ret_hash != hash) { + rpc_SetErr(ERPCMISMATCH, "Error:: get wrong RPC reply ...\n"); free(buf); + return -5; + } + if (ntohl(rrpc->ret_retcode) < 0 && ntohl(rrpc->ret_errno)) { + rpc_SetErr(ntohl(rrpc->ret_errno), "Error::Server side: retcode=%d #%d %s\n", + ntohl(rrpc->ret_retcode), ntohl(rrpc->ret_errno), + strerror(ntohl(rrpc->ret_errno))); + free(buf); return -6; } - if (rrpc->ret_argc * sizeof(ait_val_t) > cli->cli_netbuf - Limit) { + if (ntohs(rrpc->ret_argc) * sizeof(ait_val_t) > cli->cli_netbuf - Limit) { rpc_SetErr(EMSGSIZE, "Error:: reply RPC packet is too long ...\n"); free(buf); return -7; } /* RPC is OK! Go de-marshaling variables ... */ - if (rrpc->ret_argc) { - *out_vars = io_buffer2vals(buf + Limit, cli->cli_netbuf - Limit, rrpc->ret_argc, 0); + if (ntohs(rrpc->ret_argc)) { + *out_vars = io_buffer2vars(buf + Limit, cli->cli_netbuf - Limit, + ntohs(rrpc->ret_argc), 0); if (!*out_vars) { free(buf); return -1; } } - ret = rrpc->ret_retcode; + ret = ntohl(rrpc->ret_retcode); free(buf); return ret; -} - -/* - * rpc_cli_freeVals() Free ait_val_t array returned from RPC call - * @vars = Variable array - * return: none - */ -inline void -rpc_cli_freeVals(array_t ** __restrict vars) -{ - register int i; - - if (!vars || !*vars) - return; - - for (i = 0; i < io_arraySize(*vars); i++) - if (io_arrayGet(*vars, i)) - AIT_FREE_VAL(io_array(*vars, i, ait_val_t*)); - - io_arrayFree(*vars); - io_arrayDestroy(vars); -} - -/* - * rpc_cli_allocVals() Allocate ait_val_t array for RPC call - * @args = Number of arguments - * return: =NULL error or !=NULL allocated array - */ -inline array_t * -rpc_cli_allocVals(u_short args) -{ - array_t *arr; - register int i; - ait_val_t *v; - - if (!args) - return NULL; - - if (!(arr = io_arrayInit(args))) - return NULL; - - for (i = 0; i < io_arraySize(arr); i++) { - v = malloc(sizeof(ait_val_t)); - if (!v) { - LOGERR; - rpc_cli_freeVals(&arr); - return NULL; - } else { - memset(v, 0, sizeof(ait_val_t)); - io_arraySet(arr, i, v); - } - } - - return arr; }