--- libaitrpc/src/srv.c 2010/06/23 16:23:31 1.1.1.1.2.6 +++ libaitrpc/src/srv.c 2010/06/24 15:23:38 1.1.1.1.2.12 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.1.1.1.2.6 2010/06/23 16:23:31 misho Exp $ +* $Id: srv.c,v 1.1.1.1.2.12 2010/06/24 15:23:38 misho Exp $ * *************************************************************************/ #include "global.h" @@ -12,15 +12,15 @@ static void * rpc_srv_dispatchCall(void *arg) { - rpc_cli_t cli, *c = arg; + rpc_cli_t *c = arg; rpc_srv_t *s; - rpc_val_t *vals, *v = NULL; + rpc_val_t *vals = NULL, *v = NULL; rpc_func_t *f; struct tagRPCCall *rpc; struct tagRPCRet rrpc; fd_set fds; u_char buf[BUFSIZ], *data; - int ret, argc, Limit = 0; + int ret, argc = 0, Limit = 0; register int i; if (!arg) { @@ -57,7 +57,7 @@ rpc_srv_dispatchCall(void *arg) if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) { rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n"); ret = -5; - break; + goto makeReply; } // RPC is OK! Go decapsulate variables ... if (rpc->call_argc) { @@ -75,9 +75,11 @@ rpc_srv_dispatchCall(void *arg) data += v[i].val_len + 1; break; case blob: - rpc_srv_getBLOBVar(&v[i], data); - data += sizeof(rpc_cli_t); - break; + if (s->srv_blob.state == disable) { + rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n"); + ret = -5; + goto makeReply; + } default: break; } @@ -90,11 +92,12 @@ rpc_srv_dispatchCall(void *arg) rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n"); ret = -6; } else - if ((ret = rpc_srv_execCall(s, f, rpc, v)) == -1) + if ((ret = rpc_srv_execCall(f, rpc, v)) == -1) ret = -6; else argc = rpc_srv_getValsCall(f, &vals); +makeReply: memcpy(&rrpc.ret_session, &rpc->call_session, sizeof rrpc.ret_session); rrpc.ret_tag = rpc->call_tag; rrpc.ret_hash = rpc->call_hash; @@ -136,19 +139,12 @@ rpc_srv_dispatchCall(void *arg) Limit += vals[i].val_len + 1; break; case blob: - if (ret || Limit + sizeof(rpc_cli_t) > BUFSIZ) { - rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); - rrpc.ret_retcode = ret = -7; + if (s->srv_blob.state == disable) { + rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n"); + rrpc.ret_retcode = ret = -5; rrpc.ret_argc = 0; break; } - - memcpy(data, &cli, sizeof(rpc_cli_t)); - data += sizeof(rpc_cli_t); - Limit += sizeof(rpc_cli_t); - - rpc_srv_setBLOBVar(&vals[i], &cli); - break; default: break; } @@ -196,21 +192,26 @@ rpc_srv_dispatchVars(void *arg) cx = -1; do { - cx++; + // check for disable service at this moment? + if (s->srv_blob.state == disable) { + ret = 0; + break; + } FD_ZERO(&fds); FD_SET(c->cli_sock, &fds); ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL); if (ret == -1) { ret = -2; - } + } else + cx++; memset(buf, 0, sizeof buf); if ((ret = recv(c->cli_sock, buf, sizeof buf, 0)) == -1) { LOGERR; ret = -3; break; } - if (!ret) { // receive EOF + if (!ret || s->srv_blob.state == disable) { // receive EOF or disable service ret = 0; break; } @@ -225,16 +226,18 @@ rpc_srv_dispatchVars(void *arg) blob->hdr_seq != cx) { rpc_SetErr(EINVAL, "Error:: get invalid BLOB session in seq=%d...\n", blob->hdr_seq); ret = -5; - break; + goto makeReply; } - // Go to decapsulate packet ... - if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) { - rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n", blob->hdr_var); - ret = -6; - break; - } + // Go to proceed packet ... switch (blob->hdr_cmd) { case get: + if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) { + rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n", + blob->hdr_var); + ret = -6; + break; + } + if (rpc_srv_blobMap(s, b) != -1) { ret = rpc_srv_sendBLOB(c, b); rpc_srv_blobUnmap(b); @@ -242,22 +245,41 @@ rpc_srv_dispatchVars(void *arg) ret = -7; break; case set: - ret = rpc_srv_recvBLOB(c, b); - if (ret == -1) + if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) { + // set new BLOB variable for reply :) + blob->hdr_var = b->blob_var; + + ret = rpc_srv_recvBLOB(c, b); + rpc_srv_blobUnmap(b); + } else ret = -7; break; case unset: - ret = rpc_srv_deleteBLOB(c, b); + ret = rpc_srv_unregisterBLOB(s, blob->hdr_var); if (ret == -1) ret = -7; break; default: rpc_SetErr(EINVAL, "Error:: unsupported BLOB command (%d)...\n", blob->hdr_cmd); - ret -7; + ret = -7; } +makeReply: // Replay to client! + blob->hdr_cmd = ret < 0 ? error : ok; + blob->hdr_seq = ret; + if ((ret = send(c->cli_sock, buf, sizeof buf, 0)) == -1) { + LOGERR; + ret = -8; + break; + } + if (ret != sizeof buf) { + rpc_SetErr(EBADMSG, "Error:: in send BLOB reply, should be send %d bytes, " + "really is %d\n", sizeof buf, ret); + ret = -9; + break; + } } while (ret > -1); shutdown(c->cli_sock, SHUT_RDWR); @@ -366,6 +388,10 @@ rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) } else srv->srv_blob.state = disable; + rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN); + rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS); + rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS); + for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++) if (c->cli_sa.sa_family) shutdown(c->cli_sock, SHUT_RDWR); @@ -399,7 +425,7 @@ rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) int ret; struct timeval tv = { DEF_RPC_TIMEOUT, 0 }; - if (!srv || !srv->srv_blob.state) { + if (!srv || srv->srv_blob.state == disable) { rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n"); return -1; } @@ -409,7 +435,7 @@ rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) return -1; } - while (!rpc_Kill) { + while (!blob_Kill && !rpc_Kill) { for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++) if (!c->cli_sa.sa_family) break; @@ -442,6 +468,8 @@ rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) } } + srv->srv_blob.state = disable; + return 0; } @@ -663,21 +691,20 @@ rpc_srv_execServer(rpc_srv_t * __restrict srv) /* * rpc_srv_execCall() Execute registered call from RPC server - * @data = RPC const data * @call = Register RPC call * @rpc = IN RPC call structure * @args = IN RPC call array of rpc values * return: -1 error, !=-1 ok */ int -rpc_srv_execCall(void * const data, rpc_func_t * __restrict call, - struct tagRPCCall * __restrict rpc, rpc_val_t * __restrict args) +rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, + rpc_val_t * __restrict args) { void *dl; rpc_callback_t func; int ret; - if (!data || !call || !rpc) { + if (!call || !rpc || !call->func_parent) { rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n"); return -1; } @@ -690,7 +717,7 @@ rpc_srv_execCall(void * const data, rpc_func_t * __res func = dlsym(dl, (char*) call->func_name); if (func) - ret = func(data, call, rpc->call_argc, args); + ret = func(call, rpc->call_argc, args); else { rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror()); ret = -1;