--- libaitrpc/src/srv.c 2012/03/13 17:10:13 1.6.2.4 +++ libaitrpc/src/srv.c 2012/03/14 15:08:03 1.6.2.8 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.6.2.4 2012/03/13 17:10:13 misho Exp $ +* $Id: srv.c,v 1.6.2.8 2012/03/14 15:08:03 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -47,6 +47,7 @@ SUCH DAMAGE. static void *rxPacket(sched_task_t*); +static void *rxBLOB(sched_task_t*); static void * txPacket(sched_task_t *task) @@ -59,6 +60,8 @@ txPacket(sched_task_t *task) int ret, wlen = sizeof(struct tagRPCCall); array_t *arr = NULL; + FTRACE(); + if (rpc->call_argc) { f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)); if (!f) { @@ -80,6 +83,10 @@ txPacket(sched_task_t *task) } } + /* calculate CRC */ + rpc->call_crc ^= rpc->call_crc; + rpc->call_crc = htons(crcFletcher16((u_short*) buf, ((wlen + 1) & ~1) / 2)); + /* send reply */ ret = send(TASK_FD(task), buf, wlen, 0); if (ret == -1) @@ -87,10 +94,9 @@ txPacket(sched_task_t *task) else if (ret != wlen) rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, " "really sended %d bytes", wlen, ret); + else + LOGGER("Sended %d bytes", ret); - /* lets get next packet */ - schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), - TASK_DATA(task), TASK_DATLEN(task)); return NULL; } @@ -105,6 +111,8 @@ execCall(sched_task_t *task) struct tagRPCCall *rpc = (struct tagRPCCall*) buf; int argc = ntohs(rpc->call_argc); + FTRACE(); + /* Go decapsulate variables ... */ if (!(rpc->call_req.flags & RPC_NOREPLY) && argc) { arr = io_buffer2vars(buf + sizeof(struct tagRPCCall), @@ -124,6 +132,9 @@ execCall(sched_task_t *task) rpc->call_rep.ret = RPC_ERROR(-1); rpc->call_rep.eno = RPC_ERROR(rpc_Errno); } else { + LOGGER("RPC function %s from module %s", AIT_GET_STR(&f->func_name), + AIT_GET_LIKE(&f->func_file, char*)); + rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr)); if (rpc->call_rep.ret == htonl(-1)) { rpc->call_rep.eno = RPC_ERROR(errno); @@ -150,22 +161,20 @@ rxPacket(sched_task_t *task) struct tagRPCCall *rpc; struct timespec ts; + FTRACE(); + memset(buf, 0, TASK_DATLEN(task)); rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0); if (rlen == -1) { LOGERR; - s->srv_kill = kill; - - schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), - TASK_DATA(task), TASK_DATLEN(task)); + s->srv_kill = s->srv_blob.state = kill; return NULL; } else if (!rlen) { /* receive EOF */ - s->srv_kill = kill; - - schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), - TASK_DATA(task), TASK_DATLEN(task)); + s->srv_kill = s->srv_blob.state = kill; return NULL; - } + } else + LOGGER("Readed %d bytes", rlen); + if (rlen < sizeof(struct tagRPCCall)) { rpc_SetErr(ERPCMISMATCH, "Too short RPC packet"); @@ -209,9 +218,9 @@ end: if (!(rpc->call_req.flags & RPC_NOREPLY)) schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task), TASK_DATA(task), TASK_DATLEN(task)); - else - schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), - TASK_DATA(task), TASK_DATLEN(task)); + /* lets get next packet */ + schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), + TASK_DATA(task), TASK_DATLEN(task)); return NULL; } @@ -224,6 +233,8 @@ rpc_srv_dispatchCall(void *arg) sched_root_task_t *root; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; + FTRACE(); + if (!arg) { rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client"); return NULL; @@ -249,7 +260,7 @@ rpc_srv_dispatchCall(void *arg) schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf); - schedRun(root, (intptr_t*) &s->srv_kill); + schedRun(root, (void*) &s->srv_kill); schedEnd(&root); shutdown(c->cli_sock, SHUT_RDWR); @@ -261,113 +272,199 @@ rpc_srv_dispatchCall(void *arg) static void * -rpc_srv_dispatchVars(void *arg) +txBLOB(sched_task_t *task) { - rpc_cli_t *c = arg; - rpc_srv_t *s; + rpc_cli_t *c = TASK_ARG(task); + u_char *buf = TASK_DATA(task); + struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf; + int wlen = sizeof(struct tagBLOBHdr); + + FTRACE(); + + /* calculate CRC */ + blob->hdr_crc ^= blob->hdr_crc; + blob->hdr_crc = htons(crcFletcher16((u_short*) buf, ((wlen + 1) & ~1) / 2)); + + /* send reply */ + wlen = send(TASK_FD(task), buf, wlen, 0); + if (wlen == -1) + LOGERR; + else if (wlen != sizeof(struct tagBLOBHdr)) + rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, " + "really sended %d bytes", sizeof(struct tagBLOBHdr), wlen); + else + LOGGER("Sended %d bytes", wlen); + + return NULL; +} + +static void * +rxBLOB(sched_task_t *task) +{ + rpc_cli_t *c = TASK_ARG(task); + rpc_srv_t *s = c->cli_parent; rpc_blob_t *b; - int ret = 0; - fd_set fds; - u_char buf[sizeof(struct tagBLOBHdr)]; - struct tagBLOBHdr *blob; + u_char *buf = TASK_DATA(task); + struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf; + int rlen; + u_short crc; + struct timespec ts; - if (!arg) { - rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n"); - return NULL; - } else - s = c->cli_parent; + FTRACE(); - do { - /* check for disable service at this moment? */ - if (s->srv_blob.state == disable && s->srv_kill != kill) { - usleep(100000); + /* check for disable service at this moment? */ + if (s->srv_blob.state == disable) { + usleep(100000); #ifdef HAVE_PTHREAD_YIELD - pthread_yield(); + pthread_yield(); #endif - continue; - } + schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), + TASK_DATA(task), TASK_DATLEN(task)); + return NULL; + } - FD_ZERO(&fds); - FD_SET(c->cli_sock, &fds); - ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL); - if (ret == -1) { - if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill) - continue; + memset(buf, 0, TASK_DATLEN(task)); + rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0); + if (rlen == -1) { + LOGERR; + s->srv_blob.state = kill; + return NULL; + } else if (!rlen || s->srv_kill == kill) { /* receive EOF */ + s->srv_blob.state = kill; + return NULL; + } else + LOGGER("Readed %d bytes", rlen); - LOGERR; - ret = -2; + if (rlen < sizeof(struct tagBLOBHdr)) { + rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet"); + schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), + TASK_DATA(task), TASK_DATLEN(task)); + return NULL; + } + + /* check integrity of packet */ + crc = ntohs(blob->hdr_crc); + blob->hdr_crc ^= blob->hdr_crc; + if (crc != crcFletcher16((u_short*) buf, ((rlen + 1) & ~1) / 2)) { + rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet"); + schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), + TASK_DATA(task), TASK_DATLEN(task)); + return NULL; + } + + /* check RPC packet session info */ + if (rpc_chkPktSession(&blob->hdr_session, &s->srv_session)) { + rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session"); + blob->hdr_cmd = error; + goto end; + } else { + /* change socket timeout from last packet */ + ts.tv_sec = blob->hdr_session.sess_timeout; + ts.tv_nsec = 0; + schedPolling(TASK_ROOT(task), &ts, NULL); + } + + /* Go to proceed packet ... */ + switch (blob->hdr_cmd) { + case get: + if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) { + rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var)); + blob->hdr_cmd = no; + blob->hdr_ret = RPC_ERROR(-1); + break; + } else + blob->hdr_len = htonl(b->blob_len); + + if (rpc_srv_blobMap(s, b) != -1) { + /* deliver BLOB variable to client */ + blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b)); + rpc_srv_blobUnmap(b); + } else { + blob->hdr_cmd = error; + blob->hdr_ret = RPC_ERROR(-1); + } break; - } + case set: + if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) { + /* set new BLOB variable for reply :) */ + blob->hdr_var = htonl(b->blob_var); - memset(buf, 0, sizeof buf); - ret = recv(c->cli_sock, buf, sizeof buf, 0); - if (ret == -1) { - LOGERR; - ret = -3; + /* receive BLOB from client */ + blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b)); + rpc_srv_blobUnmap(b); + } else { + blob->hdr_cmd = error; + blob->hdr_ret = RPC_ERROR(-1); + } break; - } - /* receive EOF, disable or kill service */ - if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) { - ret = 0; + case unset: + if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) { + blob->hdr_cmd = error; + blob->hdr_ret = RPC_ERROR(-1); + } break; - } - if (ret < sizeof(struct tagBLOBHdr)) { - rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n"); - ret = -4; - if (s->srv_kill != kill && s->srv_blob.state != kill) - continue; - else - break; - } else - blob = (struct tagBLOBHdr*) buf; - /* check BLOB packet session info */ - if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) { - rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n"); - ret = -5; - goto makeReply; - } - /* Go to proceed packet ... */ - switch (blob->hdr_cmd) { - case get: - if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) { - rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n", - blob->hdr_var); - ret = -6; - break; - } else - blob->hdr_len = b->blob_len; + default: + rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd); + blob->hdr_cmd = error; + blob->hdr_ret = RPC_ERROR(-1); + } - if (rpc_srv_blobMap(s, b) != -1) { - ret = rpc_srv_sendBLOB(c, b); - rpc_srv_blobUnmap(b); - } else - ret = -7; - break; - case set: - if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) { - /* set new BLOB variable for reply :) */ - blob->hdr_var = b->blob_var; +end: + schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), + TASK_DATA(task), TASK_DATLEN(task)); + schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), + TASK_DATA(task), TASK_DATLEN(task)); + return NULL; +} - ret = rpc_srv_recvBLOB(c, b); - rpc_srv_blobUnmap(b); - } else - ret = -7; - break; - case unset: - ret = rpc_srv_unregisterBLOB(s, blob->hdr_var); - if (ret == -1) - ret = -7; - break; - default: - rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n", - blob->hdr_cmd); - ret = -7; - } +static void * +rpc_srv_dispatchVars(void *arg) +{ + rpc_cli_t *c = arg; + rpc_srv_t *s; + sched_root_task_t *root; + u_char *buf; + struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; + FTRACE(); + + if (!arg) { + rpc_SetErr(EINVAL, "Invalid parameter can`t procced BLOB"); + return NULL; + } else + s = c->cli_parent; + + /* allocate net buffer */ + buf = malloc(sizeof(struct tagBLOBHdr)); + if (!buf) { + LOGERR; + return NULL; + } + + root = schedBegin(); + if (!root) { + rpc_SetErr(sched_GetErrno(), "%s", sched_GetError()); + free(buf); + return NULL; + } else { + schedTermCondition(root, kill); + schedPolling(root, &ts, NULL); + } + + schedRead(root, rxBLOB, c, c->cli_sock, buf, sizeof(struct tagBLOBHdr)); + + schedRun(root, (void*) &s->srv_blob.state); + schedEnd(&root); + + shutdown(c->cli_sock, SHUT_RDWR); + close(c->cli_sock); + memset(c, 0, sizeof(rpc_cli_t)); + free(buf); + return NULL; + +#if 0 makeReply: /* Replay to client! */ - blob->hdr_cmd = ret < 0 ? error : ok; - blob->hdr_ret = ret; ret = send(c->cli_sock, buf, sizeof buf, 0); if (ret == -1) { LOGERR; @@ -383,12 +480,7 @@ makeReply: else break; } - } while (ret > -1 || s->srv_kill != kill); - - shutdown(c->cli_sock, SHUT_RDWR); - close(c->cli_sock); - memset(c, 0, sizeof(rpc_cli_t)); - return (void*) ((long)ret); +#endif } // ------------------------------------------------- @@ -407,6 +499,8 @@ rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s int n = 1; io_sockaddr_t sa; + FTRACE(); + if (!srv) { rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server"); return -1; @@ -421,7 +515,7 @@ rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s LOGERR; return -1; } else - srv->srv_blob.dir = strdup(diskDir); + AIT_SET_STR(&srv->srv_blob.dir, diskDir); srv->srv_blob.server.cli_tid = pthread_self(); srv->srv_blob.server.cli_parent = srv; @@ -438,7 +532,7 @@ rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path); break; default: - free(srv->srv_blob.dir); + AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa); @@ -447,33 +541,33 @@ rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0); if (srv->srv_blob.server.cli_sock == -1) { LOGERR; - free(srv->srv_blob.dir); + AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) { LOGERR; close(srv->srv_blob.server.cli_sock); - free(srv->srv_blob.dir); + AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } n = srv->srv_netbuf; if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) { LOGERR; close(srv->srv_blob.server.cli_sock); - free(srv->srv_blob.dir); + AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) { LOGERR; close(srv->srv_blob.server.cli_sock); - free(srv->srv_blob.dir); + AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, srv->srv_blob.server.cli_sa.sa.sa_len) == -1) { LOGERR; close(srv->srv_blob.server.cli_sock); - free(srv->srv_blob.dir); + AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } @@ -482,7 +576,7 @@ rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s if (!srv->srv_blob.clients) { LOGERR; close(srv->srv_blob.server.cli_sock); - free(srv->srv_blob.dir); + AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } else memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t)); @@ -511,6 +605,8 @@ rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) register int i; rpc_blob_t *f; + FTRACE(); + if (!srv) { rpc_SetErr(EINVAL, "Can`t destroy server because parameter is null!"); return; @@ -522,8 +618,7 @@ rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS); rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE); - if (srv->srv_blob.dir) - free(srv->srv_blob.dir); + AIT_FREE_VAL(&srv->srv_blob.dir); /* close all clients connections & server socket */ for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++) @@ -566,8 +661,10 @@ rpc_srv_loopBLOB(rpc_srv_t * __restrict srv) struct timeval tv = { DEF_RPC_TIMEOUT, 0 }; pthread_attr_t attr; + FTRACE(); + if (!srv || srv->srv_blob.state == kill) { - rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n"); + rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server"); return -1; } @@ -648,6 +745,8 @@ rpc_srv_initServer(u_int regProgID, u_int regProcID, i struct hostent *host = NULL; io_sockaddr_t sa; + FTRACE(); + if (!concurentClients || !regProgID || (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) { rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n"); @@ -774,6 +873,8 @@ rpc_srv_endServer(rpc_srv_t ** __restrict psrv) register int i; rpc_func_t *f; + FTRACE(); + if (!psrv || !*psrv) { rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n"); return; @@ -830,6 +931,8 @@ rpc_srv_loopServer(rpc_srv_t * __restrict srv) struct timeval tv = { DEF_RPC_TIMEOUT, 0 }; pthread_attr_t attr; + FTRACE(); + if (!srv) { rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n"); return -1; @@ -909,6 +1012,8 @@ rpc_srv_execCall(rpc_func_t * __restrict call, struct rpc_callback_t func; int ret; + FTRACE(); + if (!call || !rpc || !call->func_parent) { rpc_SetErr(EINVAL, "Invalid parameter can`t exec function"); return -1; @@ -920,7 +1025,7 @@ rpc_srv_execCall(rpc_func_t * __restrict call, struct return -1; } - func = dlsym(dl, AIT_GET_STR(&call->func_name)); + func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name)); if (func) ret = func(call, ntohs(rpc->call_argc), args); else {