--- libaitrpc/src/srv.c 2010/06/23 15:07:15 1.1.1.1.2.5 +++ libaitrpc/src/srv.c 2011/07/14 01:25:11 1.2.2.2 @@ -3,24 +3,61 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.1.1.1.2.5 2010/06/23 15:07:15 misho Exp $ +* $Id: srv.c,v 1.2.2.2 2011/07/14 01:25:11 misho Exp $ * -*************************************************************************/ +************************************************************************** +The ELWIX and AITNET software is distributed under the following +terms: + +All of the documentation and software included in the ELWIX and AITNET +Releases is copyrighted by ELWIX - Sofia/Bulgaria + +Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011 + by Michael Pounov . All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: +1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. +3. All advertising materials mentioning features or use of this software + must display the following acknowledgement: +This product includes software developed by Michael Pounov +ELWIX - Embedded LightWeight unIX and its contributors. +4. Neither the name of AITNET nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +SUCH DAMAGE. +*/ #include "global.h" static void * rpc_srv_dispatchCall(void *arg) { - rpc_cli_t cli, *c = arg; + rpc_cli_t *c = arg; rpc_srv_t *s; - rpc_val_t *vals, *v = NULL; + rpc_val_t *vals = NULL, *v = NULL; rpc_func_t *f; struct tagRPCCall *rpc; - struct tagRPCRet rrpc; + struct tagRPCRet *rrpc; fd_set fds; u_char buf[BUFSIZ], *data; - int ret, argc, Limit = 0; + int ret, argc = 0, Limit = 0; register int i; if (!arg) { @@ -30,13 +67,13 @@ rpc_srv_dispatchCall(void *arg) s = c->cli_parent; do { + v = NULL; FD_ZERO(&fds); FD_SET(c->cli_sock, &fds); ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL); if (ret == -1) { ret = -2; } - memset(&rrpc, 0, sizeof rrpc); memset(buf, 0, BUFSIZ); if ((ret = recv(c->cli_sock, buf, BUFSIZ, 0)) == -1) { LOGERR; @@ -57,27 +94,51 @@ rpc_srv_dispatchCall(void *arg) if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) { rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n"); ret = -5; - break; - } + goto makeReply; + } else + Limit = sizeof(struct tagRPCCall); // RPC is OK! Go decapsulate variables ... if (rpc->call_argc) { - v = (rpc_val_t*) (buf + sizeof(struct tagRPCCall)); + v = (rpc_val_t*) (buf + Limit); + // check RPC packet length + if (rpc->call_argc * sizeof(rpc_val_t) > BUFSIZ - Limit) { + rpc_SetErr(EMSGSIZE, "Error:: Too big RPC packet ...\n"); + ret = -5; + goto makeReply; + } else + Limit += rpc->call_argc * sizeof(rpc_val_t); // RPC received variables types OK! data = (u_char*) v + rpc->call_argc * sizeof(rpc_val_t); for (i = 0; i < rpc->call_argc; i++) { switch (v[i].val_type) { case buffer: + if (v[i].val_len > BUFSIZ - Limit) { + rpc_SetErr(EMSGSIZE, "Error:: Too big RPC packet ...\n"); + ret = -5; + goto makeReply; + } else + Limit += v[i].val_len; + v[i].val.buffer = data; data += v[i].val_len; break; case string: + if (v[i].val_len > BUFSIZ - Limit) { + rpc_SetErr(EMSGSIZE, "Error:: Too big RPC packet ...\n"); + ret = -5; + goto makeReply; + } else + Limit += v[i].val_len; + v[i].val.string = (int8_t*) data; - data += v[i].val_len + 1; + data += v[i].val_len; break; case blob: - rpc_srv_getBLOBVar(&v[i], data); - data += sizeof(rpc_cli_t); - break; + if (s->srv_blob.state == disable) { + rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n"); + ret = -5; + goto makeReply; + } default: break; } @@ -90,32 +151,45 @@ rpc_srv_dispatchCall(void *arg) rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n"); ret = -6; } else - if ((ret = rpc_srv_execCall(s, f, rpc, v)) == -1) - ret = -6; + if ((ret = rpc_srv_execCall(f, rpc, v)) == -1) + ret = -9; else argc = rpc_srv_getValsCall(f, &vals); - memcpy(&rrpc.ret_session, &rpc->call_session, sizeof rrpc.ret_session); - rrpc.ret_tag = rpc->call_tag; - rrpc.ret_hash = rpc->call_hash; - rrpc.ret_errno = rpc_Errno; - rrpc.ret_retcode = ret; - rrpc.ret_argc = argc; - +makeReply: memset(buf, 0, BUFSIZ); - memcpy(buf, &rrpc, (Limit = sizeof rrpc)); + rrpc = (struct tagRPCRet*) buf; + Limit = sizeof(struct tagRPCRet); + + memcpy(&rrpc->ret_session, &rpc->call_session, sizeof rrpc->ret_session); + rrpc->ret_tag = rpc->call_tag; + rrpc->ret_hash = rpc->call_hash; + rrpc->ret_errno = rpc_Errno; + rrpc->ret_retcode = ret; + rrpc->ret_argc = argc; + if (argc && vals) { - v = (rpc_val_t*) (buf + sizeof rrpc); + v = (rpc_val_t*) (buf + Limit); + if (argc * sizeof(rpc_val_t) > BUFSIZ - Limit) { + for (i = 0; i < argc; i++) + RPC_FREE_VAL(&vals[i]); + free(vals); + vals = NULL; + argc = 0; + ret = -7; + rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet values (-7) ...\n"); + goto makeReply; + } else + Limit += argc * sizeof(rpc_val_t); memcpy(v, vals, argc * sizeof(rpc_val_t)); - Limit += argc * sizeof(rpc_val_t); data = (u_char*) v + argc * sizeof(rpc_val_t); for (ret = i = 0; i < argc; i++) { switch (vals[i].val_type) { case buffer: if (ret || Limit + vals[i].val_len > BUFSIZ) { rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); - rrpc.ret_retcode = ret = -7; - rrpc.ret_argc = 0; + rrpc->ret_retcode = ret = -7; + rrpc->ret_argc = 0; break; } @@ -124,36 +198,31 @@ rpc_srv_dispatchCall(void *arg) Limit += vals[i].val_len; break; case string: - if (ret || Limit + vals[i].val_len + 1 > BUFSIZ) { + if (ret || Limit + vals[i].val_len > BUFSIZ) { rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); - rrpc.ret_retcode = ret = -7; - rrpc.ret_argc = 0; + rrpc->ret_retcode = ret = -7; + rrpc->ret_argc = 0; break; } - memcpy(data, vals[i].val.string, vals[i].val_len + 1); - data += vals[i].val_len + 1; - Limit += vals[i].val_len + 1; + memcpy(data, vals[i].val.string, vals[i].val_len); + data += vals[i].val_len; + Limit += vals[i].val_len; break; case blob: - if (ret || Limit + sizeof(rpc_cli_t) > BUFSIZ) { - rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); - rrpc.ret_retcode = ret = -7; - rrpc.ret_argc = 0; + if (s->srv_blob.state == disable) { + rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n"); + rrpc->ret_retcode = ret = -5; + rrpc->ret_argc = 0; break; } - - memcpy(data, &cli, sizeof(rpc_cli_t)); - data += sizeof(rpc_cli_t); - Limit += sizeof(rpc_cli_t); - - rpc_srv_setBLOBVar(&vals[i], &cli); - break; default: break; } RPC_FREE_VAL(&vals[i]); + free(vals); + vals = NULL; } } @@ -163,7 +232,7 @@ rpc_srv_dispatchCall(void *arg) break; } if (ret != Limit) { - rpc_SetErr(EBADMSG, "Error:: in send RPC request, should be send %d bytes, " + rpc_SetErr(ECANCELED, "Error:: in send RPC request, should be send %d bytes, " "really is %d\n", Limit, ret); ret = -9; break; @@ -173,7 +242,7 @@ rpc_srv_dispatchCall(void *arg) shutdown(c->cli_sock, SHUT_RDWR); close(c->cli_sock); memset(c, 0, sizeof(rpc_cli_t)); - return (void*) ret; + return (void*) (long)ret; } @@ -183,9 +252,9 @@ rpc_srv_dispatchVars(void *arg) rpc_cli_t *c = arg; rpc_srv_t *s; rpc_blob_t *b; - int cx, ret; + int ret; fd_set fds; - u_char buf[BLOBSIZ]; + u_char buf[sizeof(struct tagBLOBHdr)]; struct tagBLOBHdr *blob; if (!arg) { @@ -194,9 +263,12 @@ rpc_srv_dispatchVars(void *arg) } else s = c->cli_parent; - cx = -1; do { - cx++; + // check for disable service at this moment? + if (s->srv_blob.state == disable) { + ret = 0; + break; + } FD_ZERO(&fds); FD_SET(c->cli_sock, &fds); @@ -204,13 +276,14 @@ rpc_srv_dispatchVars(void *arg) if (ret == -1) { ret = -2; } - memset(buf, 0, BLOBSIZ); - if ((ret = recv(c->cli_sock, buf, BLOBSIZ, 0)) == -1) { + + memset(buf, 0, sizeof buf); + if ((ret = recv(c->cli_sock, buf, sizeof buf, 0)) == -1) { LOGERR; ret = -3; break; } - if (!ret) { // receive EOF + if (!ret || s->srv_blob.state == disable) { // receive EOF or disable service ret = 0; break; } @@ -221,39 +294,70 @@ rpc_srv_dispatchVars(void *arg) } else blob = (struct tagBLOBHdr*) buf; // check BLOB packet session info - if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session) || - blob->hdr_seq != cx) { - rpc_SetErr(EINVAL, "Error:: get invalid BLOB session in seq=%d...\n", blob->hdr_seq); + if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) { + rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n"); ret = -5; - break; + goto makeReply; } - // Go to decapsulate packet ... - if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) { - rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n", blob->hdr_var); - ret = -6; - break; - } + // Go to proceed packet ... switch (blob->hdr_cmd) { case get: - ret = rpc_srv_sendBLOB(c, b); + if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) { + rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n", + blob->hdr_var); + ret = -6; + break; + } else + blob->hdr_len = b->blob_len; + + if (rpc_srv_blobMap(s, b) != -1) { + ret = rpc_srv_sendBLOB(c, b); + rpc_srv_blobUnmap(b); + } else + ret = -7; break; case set: - ret = rpc_srv_recvBLOB(c, b); + if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) { + // set new BLOB variable for reply :) + blob->hdr_var = b->blob_var; + + ret = rpc_srv_recvBLOB(c, b); + rpc_srv_blobUnmap(b); + } else + ret = -7; break; case unset: - ret = rpc_srv_freeBLOB(b); + ret = rpc_srv_unregisterBLOB(s, blob->hdr_var); + if (ret == -1) + ret = -7; break; default: rpc_SetErr(EINVAL, "Error:: unsupported BLOB command (%d)...\n", blob->hdr_cmd); - ret -7; + ret = -7; } + +makeReply: + // Replay to client! + blob->hdr_cmd = ret < 0 ? error : ok; + blob->hdr_ret = ret; + if ((ret = send(c->cli_sock, buf, sizeof buf, 0)) == -1) { + LOGERR; + ret = -8; + break; + } + if (ret != sizeof buf) { + rpc_SetErr(ECANCELED, "Error:: in send BLOB reply, should be send %d bytes, " + "really is %d\n", sizeof buf, ret); + ret = -9; + break; + } } while (ret > -1); shutdown(c->cli_sock, SHUT_RDWR); close(c->cli_sock); memset(c, 0, sizeof(rpc_cli_t)); - return (void*) ret; + return (void*) (long)ret; } // ------------------------------------------------- @@ -332,6 +436,7 @@ rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0); rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 0); rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 0); + rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 1); pthread_mutex_unlock(&srv->srv_mtx); srv->srv_blob.state = enable; // enable BLOB @@ -356,21 +461,30 @@ rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) } else srv->srv_blob.state = disable; + rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN); + rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS); + rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS); + rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE); + for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++) if (c->cli_sa.sa_family) shutdown(c->cli_sock, SHUT_RDWR); close(srv->srv_blob.server.cli_sock); - if (srv->srv_blob.clients) + if (srv->srv_blob.clients) { free(srv->srv_blob.clients); + srv->srv_blob.clients = NULL; + } pthread_mutex_lock(&srv->srv_blob.mtx); while ((f = srv->srv_blob.blobs)) { srv->srv_blob.blobs = f->blob_next; + rpc_srv_blobFree(srv, f); free(f); } pthread_mutex_unlock(&srv->srv_blob.mtx); + while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY); pthread_mutex_destroy(&srv->srv_blob.mtx); } @@ -389,7 +503,7 @@ rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) int ret; struct timeval tv = { DEF_RPC_TIMEOUT, 0 }; - if (!srv || !srv->srv_blob.state) { + if (!srv || srv->srv_blob.state == disable) { rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n"); return -1; } @@ -399,11 +513,11 @@ rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) return -1; } - while (!rpc_Kill) { + while (!blob_Kill && !rpc_Kill) { for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++) if (!c->cli_sa.sa_family) break; - if (c && c->cli_sa.sa_family && c->cli_parent) { + if (i >= srv->srv_numcli) { usleep(1000000); continue; } @@ -432,6 +546,8 @@ rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) } } + srv->srv_blob.state = disable; + return 0; } @@ -537,12 +653,12 @@ rpc_srv_initServer(u_int regProgID, u_int regProcID, i } else memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t)); + pthread_mutex_init(&srv->srv_mtx, NULL); + rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0); rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0); rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0); rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0); - - pthread_mutex_init(&srv->srv_mtx, NULL); return srv; } @@ -566,12 +682,15 @@ rpc_srv_endServer(rpc_srv_t * __restrict srv) rpc_srv_endBLOBServer(srv); for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++) - if (c->cli_sa.sa_family) + if (c->cli_sa.sa_family) { shutdown(c->cli_sock, SHUT_RDWR); + close(c->cli_sock); + } close(srv->srv_server.cli_sock); if (srv->srv_clients) { free(srv->srv_clients); + srv->srv_clients = NULL; srv->srv_numcli = 0; } @@ -582,6 +701,7 @@ rpc_srv_endServer(rpc_srv_t * __restrict srv) } pthread_mutex_unlock(&srv->srv_mtx); + while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY); pthread_mutex_destroy(&srv->srv_mtx); free(srv); @@ -617,7 +737,7 @@ rpc_srv_execServer(rpc_srv_t * __restrict srv) for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++) if (!c->cli_sa.sa_family) break; - if (c && c->cli_sa.sa_family && c->cli_parent) { + if (i >= srv->srv_numcli) { usleep(1000000); continue; } @@ -653,21 +773,20 @@ rpc_srv_execServer(rpc_srv_t * __restrict srv) /* * rpc_srv_execCall() Execute registered call from RPC server - * @data = RPC const data * @call = Register RPC call * @rpc = IN RPC call structure * @args = IN RPC call array of rpc values * return: -1 error, !=-1 ok */ int -rpc_srv_execCall(void * const data, rpc_func_t * __restrict call, - struct tagRPCCall * __restrict rpc, rpc_val_t * __restrict args) +rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, + rpc_val_t * __restrict args) { void *dl; rpc_callback_t func; int ret; - if (!data || !call || !rpc) { + if (!call || !rpc || !call->func_parent) { rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n"); return -1; } @@ -680,7 +799,7 @@ rpc_srv_execCall(void * const data, rpc_func_t * __res func = dlsym(dl, (char*) call->func_name); if (func) - ret = func(data, call, rpc->call_argc, args); + ret = func(call, rpc->call_argc, args); else { rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror()); ret = -1;