--- libaitrpc/src/srv.c 2010/06/24 15:01:19 1.1.1.1.2.11 +++ libaitrpc/src/srv.c 2010/07/08 12:29:38 1.1.1.1.2.20 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.1.1.1.2.11 2010/06/24 15:01:19 misho Exp $ +* $Id: srv.c,v 1.1.1.1.2.20 2010/07/08 12:29:38 misho Exp $ * *************************************************************************/ #include "global.h" @@ -14,13 +14,13 @@ rpc_srv_dispatchCall(void *arg) { rpc_cli_t *c = arg; rpc_srv_t *s; - rpc_val_t *vals, *v = NULL; + rpc_val_t *vals = NULL, *v = NULL; rpc_func_t *f; struct tagRPCCall *rpc; struct tagRPCRet rrpc; fd_set fds; u_char buf[BUFSIZ], *data; - int ret, argc, Limit = 0; + int ret, argc = 0, Limit = 0; register int i; if (!arg) { @@ -57,7 +57,7 @@ rpc_srv_dispatchCall(void *arg) if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) { rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n"); ret = -5; - break; + goto makeReply; } // RPC is OK! Go decapsulate variables ... if (rpc->call_argc) { @@ -74,6 +74,12 @@ rpc_srv_dispatchCall(void *arg) v[i].val.string = (int8_t*) data; data += v[i].val_len + 1; break; + case blob: + if (s->srv_blob.state == disable) { + rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n"); + ret = -5; + goto makeReply; + } default: break; } @@ -87,10 +93,11 @@ rpc_srv_dispatchCall(void *arg) ret = -6; } else if ((ret = rpc_srv_execCall(f, rpc, v)) == -1) - ret = -6; + ret = -9; else argc = rpc_srv_getValsCall(f, &vals); +makeReply: memcpy(&rrpc.ret_session, &rpc->call_session, sizeof rrpc.ret_session); rrpc.ret_tag = rpc->call_tag; rrpc.ret_hash = rpc->call_hash; @@ -131,6 +138,13 @@ rpc_srv_dispatchCall(void *arg) data += vals[i].val_len + 1; Limit += vals[i].val_len + 1; break; + case blob: + if (s->srv_blob.state == disable) { + rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n"); + rrpc.ret_retcode = ret = -5; + rrpc.ret_argc = 0; + break; + } default: break; } @@ -165,7 +179,7 @@ rpc_srv_dispatchVars(void *arg) rpc_cli_t *c = arg; rpc_srv_t *s; rpc_blob_t *b; - int cx, ret; + int ret; fd_set fds; u_char buf[sizeof(struct tagBLOBHdr)]; struct tagBLOBHdr *blob; @@ -176,15 +190,20 @@ rpc_srv_dispatchVars(void *arg) } else s = c->cli_parent; - cx = -1; do { + // check for disable service at this moment? + if (s->srv_blob.state == disable) { + ret = 0; + break; + } + FD_ZERO(&fds); FD_SET(c->cli_sock, &fds); ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL); if (ret == -1) { ret = -2; - } else - cx++; + } + memset(buf, 0, sizeof buf); if ((ret = recv(c->cli_sock, buf, sizeof buf, 0)) == -1) { LOGERR; @@ -202,11 +221,10 @@ rpc_srv_dispatchVars(void *arg) } else blob = (struct tagBLOBHdr*) buf; // check BLOB packet session info - if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session) || - blob->hdr_seq != cx) { - rpc_SetErr(EINVAL, "Error:: get invalid BLOB session in seq=%d...\n", blob->hdr_seq); + if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) { + rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n"); ret = -5; - break; + goto makeReply; } // Go to proceed packet ... switch (blob->hdr_cmd) { @@ -216,7 +234,8 @@ rpc_srv_dispatchVars(void *arg) blob->hdr_var); ret = -6; break; - } + } else + blob->hdr_len = b->blob_len; if (rpc_srv_blobMap(s, b) != -1) { ret = rpc_srv_sendBLOB(c, b); @@ -245,9 +264,10 @@ rpc_srv_dispatchVars(void *arg) ret = -7; } +makeReply: // Replay to client! blob->hdr_cmd = ret < 0 ? error : ok; - blob->hdr_seq = ret; + blob->hdr_ret = ret; if ((ret = send(c->cli_sock, buf, sizeof buf, 0)) == -1) { LOGERR; ret = -8; @@ -343,6 +363,7 @@ rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0); rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 0); rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 0); + rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 1); pthread_mutex_unlock(&srv->srv_mtx); srv->srv_blob.state = enable; // enable BLOB @@ -370,22 +391,27 @@ rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN); rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS); rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS); + rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE); for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++) if (c->cli_sa.sa_family) shutdown(c->cli_sock, SHUT_RDWR); close(srv->srv_blob.server.cli_sock); - if (srv->srv_blob.clients) + if (srv->srv_blob.clients) { free(srv->srv_blob.clients); + srv->srv_blob.clients = NULL; + } pthread_mutex_lock(&srv->srv_blob.mtx); while ((f = srv->srv_blob.blobs)) { srv->srv_blob.blobs = f->blob_next; + rpc_srv_blobFree(srv, f); free(f); } pthread_mutex_unlock(&srv->srv_blob.mtx); + while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY); pthread_mutex_destroy(&srv->srv_blob.mtx); } @@ -404,7 +430,7 @@ rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) int ret; struct timeval tv = { DEF_RPC_TIMEOUT, 0 }; - if (!srv || !srv->srv_blob.state) { + if (!srv || srv->srv_blob.state == disable) { rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n"); return -1; } @@ -418,7 +444,7 @@ rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++) if (!c->cli_sa.sa_family) break; - if (c && c->cli_sa.sa_family && c->cli_parent) { + if (i >= srv->srv_numcli) { usleep(1000000); continue; } @@ -589,6 +615,7 @@ rpc_srv_endServer(rpc_srv_t * __restrict srv) if (srv->srv_clients) { free(srv->srv_clients); + srv->srv_clients = NULL; srv->srv_numcli = 0; } @@ -599,6 +626,7 @@ rpc_srv_endServer(rpc_srv_t * __restrict srv) } pthread_mutex_unlock(&srv->srv_mtx); + while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY); pthread_mutex_destroy(&srv->srv_mtx); free(srv); @@ -634,7 +662,7 @@ rpc_srv_execServer(rpc_srv_t * __restrict srv) for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++) if (!c->cli_sa.sa_family) break; - if (c && c->cli_sa.sa_family && c->cli_parent) { + if (i >= srv->srv_numcli) { usleep(1000000); continue; }