--- libaitrpc/src/srv.c 2012/03/14 13:29:11 1.6.2.7 +++ libaitrpc/src/srv.c 2012/03/14 15:08:03 1.6.2.8 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.6.2.7 2012/03/14 13:29:11 misho Exp $ +* $Id: srv.c,v 1.6.2.8 2012/03/14 15:08:03 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -47,6 +47,7 @@ SUCH DAMAGE. static void *rxPacket(sched_task_t*); +static void *rxBLOB(sched_task_t*); static void * txPacket(sched_task_t *task) @@ -96,9 +97,6 @@ txPacket(sched_task_t *task) else LOGGER("Sended %d bytes", ret); - /* lets get next packet */ - schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), - TASK_DATA(task), TASK_DATLEN(task)); return NULL; } @@ -135,7 +133,7 @@ execCall(sched_task_t *task) rpc->call_rep.eno = RPC_ERROR(rpc_Errno); } else { LOGGER("RPC function %s from module %s", AIT_GET_STR(&f->func_name), - AIT_GET_STR(&f->func_file)); + AIT_GET_LIKE(&f->func_file, char*)); rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr)); if (rpc->call_rep.ret == htonl(-1)) { @@ -169,16 +167,10 @@ rxPacket(sched_task_t *task) rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0); if (rlen == -1) { LOGERR; - s->srv_kill = kill; - - schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), - TASK_DATA(task), TASK_DATLEN(task)); + s->srv_kill = s->srv_blob.state = kill; return NULL; } else if (!rlen) { /* receive EOF */ - s->srv_kill = kill; - - schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), - TASK_DATA(task), TASK_DATLEN(task)); + s->srv_kill = s->srv_blob.state = kill; return NULL; } else LOGGER("Readed %d bytes", rlen); @@ -226,9 +218,9 @@ end: if (!(rpc->call_req.flags & RPC_NOREPLY)) schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task), TASK_DATA(task), TASK_DATLEN(task)); - else - schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), - TASK_DATA(task), TASK_DATLEN(task)); + /* lets get next packet */ + schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), + TASK_DATA(task), TASK_DATLEN(task)); return NULL; } @@ -280,115 +272,199 @@ rpc_srv_dispatchCall(void *arg) static void * -rpc_srv_dispatchVars(void *arg) +txBLOB(sched_task_t *task) { - rpc_cli_t *c = arg; - rpc_srv_t *s; + rpc_cli_t *c = TASK_ARG(task); + u_char *buf = TASK_DATA(task); + struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf; + int wlen = sizeof(struct tagBLOBHdr); + + FTRACE(); + + /* calculate CRC */ + blob->hdr_crc ^= blob->hdr_crc; + blob->hdr_crc = htons(crcFletcher16((u_short*) buf, ((wlen + 1) & ~1) / 2)); + + /* send reply */ + wlen = send(TASK_FD(task), buf, wlen, 0); + if (wlen == -1) + LOGERR; + else if (wlen != sizeof(struct tagBLOBHdr)) + rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, " + "really sended %d bytes", sizeof(struct tagBLOBHdr), wlen); + else + LOGGER("Sended %d bytes", wlen); + + return NULL; +} + +static void * +rxBLOB(sched_task_t *task) +{ + rpc_cli_t *c = TASK_ARG(task); + rpc_srv_t *s = c->cli_parent; rpc_blob_t *b; - int ret = 0; - fd_set fds; - u_char buf[sizeof(struct tagBLOBHdr)]; - struct tagBLOBHdr *blob; + u_char *buf = TASK_DATA(task); + struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf; + int rlen; + u_short crc; + struct timespec ts; FTRACE(); - if (!arg) { - rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n"); + /* check for disable service at this moment? */ + if (s->srv_blob.state == disable) { + usleep(100000); +#ifdef HAVE_PTHREAD_YIELD + pthread_yield(); +#endif + schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), + TASK_DATA(task), TASK_DATLEN(task)); return NULL; + } + + memset(buf, 0, TASK_DATLEN(task)); + rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0); + if (rlen == -1) { + LOGERR; + s->srv_blob.state = kill; + return NULL; + } else if (!rlen || s->srv_kill == kill) { /* receive EOF */ + s->srv_blob.state = kill; + return NULL; } else - s = c->cli_parent; + LOGGER("Readed %d bytes", rlen); - do { - /* check for disable service at this moment? */ - if (s->srv_blob.state == disable && s->srv_kill != kill) { - usleep(100000); -#ifdef HAVE_PTHREAD_YIELD - pthread_yield(); -#endif - continue; - } + if (rlen < sizeof(struct tagBLOBHdr)) { + rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet"); + schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), + TASK_DATA(task), TASK_DATLEN(task)); + return NULL; + } - FD_ZERO(&fds); - FD_SET(c->cli_sock, &fds); - ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL); - if (ret == -1) { - if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill) - continue; + /* check integrity of packet */ + crc = ntohs(blob->hdr_crc); + blob->hdr_crc ^= blob->hdr_crc; + if (crc != crcFletcher16((u_short*) buf, ((rlen + 1) & ~1) / 2)) { + rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet"); + schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), + TASK_DATA(task), TASK_DATLEN(task)); + return NULL; + } - LOGERR; - ret = -2; + /* check RPC packet session info */ + if (rpc_chkPktSession(&blob->hdr_session, &s->srv_session)) { + rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session"); + blob->hdr_cmd = error; + goto end; + } else { + /* change socket timeout from last packet */ + ts.tv_sec = blob->hdr_session.sess_timeout; + ts.tv_nsec = 0; + schedPolling(TASK_ROOT(task), &ts, NULL); + } + + /* Go to proceed packet ... */ + switch (blob->hdr_cmd) { + case get: + if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) { + rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var)); + blob->hdr_cmd = no; + blob->hdr_ret = RPC_ERROR(-1); + break; + } else + blob->hdr_len = htonl(b->blob_len); + + if (rpc_srv_blobMap(s, b) != -1) { + /* deliver BLOB variable to client */ + blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b)); + rpc_srv_blobUnmap(b); + } else { + blob->hdr_cmd = error; + blob->hdr_ret = RPC_ERROR(-1); + } break; - } + case set: + if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) { + /* set new BLOB variable for reply :) */ + blob->hdr_var = htonl(b->blob_var); - memset(buf, 0, sizeof buf); - ret = recv(c->cli_sock, buf, sizeof buf, 0); - if (ret == -1) { - LOGERR; - ret = -3; + /* receive BLOB from client */ + blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b)); + rpc_srv_blobUnmap(b); + } else { + blob->hdr_cmd = error; + blob->hdr_ret = RPC_ERROR(-1); + } break; - } - /* receive EOF, disable or kill service */ - if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) { - ret = 0; + case unset: + if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) { + blob->hdr_cmd = error; + blob->hdr_ret = RPC_ERROR(-1); + } break; - } - if (ret < sizeof(struct tagBLOBHdr)) { - rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n"); - ret = -4; - if (s->srv_kill != kill && s->srv_blob.state != kill) - continue; - else - break; - } else - blob = (struct tagBLOBHdr*) buf; - /* check BLOB packet session info */ - if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) { - rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n"); - ret = -5; - goto makeReply; - } - /* Go to proceed packet ... */ - switch (blob->hdr_cmd) { - case get: - if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) { - rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n", - blob->hdr_var); - ret = -6; - break; - } else - blob->hdr_len = b->blob_len; + default: + rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd); + blob->hdr_cmd = error; + blob->hdr_ret = RPC_ERROR(-1); + } - if (rpc_srv_blobMap(s, b) != -1) { - ret = rpc_srv_sendBLOB(c, b); - rpc_srv_blobUnmap(b); - } else - ret = -7; - break; - case set: - if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) { - /* set new BLOB variable for reply :) */ - blob->hdr_var = b->blob_var; +end: + schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), + TASK_DATA(task), TASK_DATLEN(task)); + schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), + TASK_DATA(task), TASK_DATLEN(task)); + return NULL; +} - ret = rpc_srv_recvBLOB(c, b); - rpc_srv_blobUnmap(b); - } else - ret = -7; - break; - case unset: - ret = rpc_srv_unregisterBLOB(s, blob->hdr_var); - if (ret == -1) - ret = -7; - break; - default: - rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n", - blob->hdr_cmd); - ret = -7; - } +static void * +rpc_srv_dispatchVars(void *arg) +{ + rpc_cli_t *c = arg; + rpc_srv_t *s; + sched_root_task_t *root; + u_char *buf; + struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; + FTRACE(); + + if (!arg) { + rpc_SetErr(EINVAL, "Invalid parameter can`t procced BLOB"); + return NULL; + } else + s = c->cli_parent; + + /* allocate net buffer */ + buf = malloc(sizeof(struct tagBLOBHdr)); + if (!buf) { + LOGERR; + return NULL; + } + + root = schedBegin(); + if (!root) { + rpc_SetErr(sched_GetErrno(), "%s", sched_GetError()); + free(buf); + return NULL; + } else { + schedTermCondition(root, kill); + schedPolling(root, &ts, NULL); + } + + schedRead(root, rxBLOB, c, c->cli_sock, buf, sizeof(struct tagBLOBHdr)); + + schedRun(root, (void*) &s->srv_blob.state); + schedEnd(&root); + + shutdown(c->cli_sock, SHUT_RDWR); + close(c->cli_sock); + memset(c, 0, sizeof(rpc_cli_t)); + free(buf); + return NULL; + +#if 0 makeReply: /* Replay to client! */ - blob->hdr_cmd = ret < 0 ? error : ok; - blob->hdr_ret = ret; ret = send(c->cli_sock, buf, sizeof buf, 0); if (ret == -1) { LOGERR; @@ -404,12 +480,7 @@ makeReply: else break; } - } while (ret > -1 || s->srv_kill != kill); - - shutdown(c->cli_sock, SHUT_RDWR); - close(c->cli_sock); - memset(c, 0, sizeof(rpc_cli_t)); - return (void*) ((long)ret); +#endif } // ------------------------------------------------- @@ -593,7 +664,7 @@ rpc_srv_loopBLOB(rpc_srv_t * __restrict srv) FTRACE(); if (!srv || srv->srv_blob.state == kill) { - rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n"); + rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server"); return -1; }