--- libaitrpc/src/srv.c 2012/11/19 10:29:02 1.12.2.5 +++ libaitrpc/src/srv.c 2012/11/19 21:35:43 1.12.2.11 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.12.2.5 2012/11/19 10:29:02 misho Exp $ +* $Id: srv.c,v 1.12.2.11 2012/11/19 21:35:43 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -67,6 +67,22 @@ static sched_task_func_t cbProto[SOCK_RAW + 1][4] = { }; +inline void +rpc_freeCli(rpc_cli_t * __restrict c) +{ + rpc_srv_t *s = c->cli_parent; + + schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL); + + /* free buffer */ + AIT_FREE_VAL(&c->cli_buf); + + io_arrayDel(s->srv_clients, c->cli_id, 0); + if (c) + io_free(c); +} + + static inline int _check4freeslot(rpc_srv_t * __restrict srv, io_sockaddr_t * __restrict sa) { @@ -121,39 +137,21 @@ _allocClient(rpc_srv_t * __restrict srv, io_sockaddr_t static void * freeClient(sched_task_t *task) { - rpc_cli_t *c = TASK_ARG(task); - rpc_srv_t *s = c->cli_parent; + rpc_freeCli(TASK_ARG(task)); - schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL); - - /* free buffer */ - AIT_FREE_VAL(&c->cli_buf); - - io_arrayDel(s->srv_clients, c->cli_id, 0); - if (c) - io_free(c); return NULL; } static void * closeClient(sched_task_t *task) { - rpc_cli_t *c = TASK_ARG(task); - rpc_srv_t *s = c->cli_parent; + int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock; - schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL); + rpc_freeCli(TASK_ARG(task)); /* close client socket */ - if (TASK_VAL(task)) - shutdown(c->cli_sock, SHUT_RDWR); - close(c->cli_sock); - - /* free buffer */ - AIT_FREE_VAL(&c->cli_buf); - - io_arrayDel(s->srv_clients, c->cli_id, 0); - if (c) - io_free(c); + shutdown(sock, SHUT_RDWR); + close(sock); return NULL; } @@ -204,7 +202,7 @@ txPacket(sched_task_t *task) if (ret == -1 || ret != wlen) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], - c, 42, NULL, 0); + TASK_ARG(task), 0, NULL, 0); } return NULL; @@ -281,7 +279,7 @@ rxPacket(sched_task_t *task) if (rlen < 1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], - c, 42, NULL, 0); + TASK_ARG(task), 0, NULL, 0); return NULL; } else { rlen += off; /* add reminded bytes from previous rxPacket, if exists! */ @@ -387,9 +385,9 @@ txUDPPacket(sched_task_t *task) int ret, wlen = sizeof(struct tagRPCCall); struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; - schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL); + schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL); schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], - c, ts, c, 0); + TASK_ARG(task), ts, TASK_ARG(task), 0); /* copy RPC header */ memcpy(buf, TASK_DATA(task), wlen); @@ -429,7 +427,7 @@ txUDPPacket(sched_task_t *task) if (ret == -1 || ret != wlen) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], - c, 42, NULL, 0); + TASK_ARG(task), 0, NULL, 0); } return NULL; @@ -457,8 +455,10 @@ rxUDPPacket(sched_task_t *task) if (!c) goto end; else { - /* armed timer for close stateless connection */ + c->cli_sock = TASK_FD(task); + memcpy(&c->cli_sa, &sa, sizeof c->cli_sa); memcpy(AIT_GET_BUF(&c->cli_buf), buf, AIT_LEN(&c->cli_buf)); + /* armed timer for close stateless connection */ schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL); schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, ts, c, 0); @@ -503,13 +503,13 @@ rxUDPPacket(sched_task_t *task) rpc->call_rep.eno = RPC_ERROR(errno); } else { /* execute RPC call */ - schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off, NULL, 0); + schedEvent(TASK_ROOT(task), execCall, c, off, NULL, 0); } /* send RPC reply */ if (!noreply) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], - TASK_ARG(task), TASK_FD(task), rpc, len); + c, TASK_FD(task), rpc, len); off += len; } while (rlen > 0); @@ -521,25 +521,32 @@ end: /* ------------------------------------------------------ */ -static void * -closeBLOBClient(sched_task_t *task) +inline void +rpc_freeBLOBCli(rpc_cli_t * __restrict c) { - rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; - schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL); + schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL); - /* close client socket */ - if (TASK_VAL(task)) - shutdown(c->cli_sock, SHUT_RDWR); - close(c->cli_sock); - /* free buffer */ AIT_FREE_VAL(&c->cli_buf); io_arrayDel(s->srv_blob.clients, c->cli_id, 0); if (c) io_free(c); +} + + +static void * +closeBLOBClient(sched_task_t *task) +{ + int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock; + + rpc_freeBLOBCli(TASK_ARG(task)); + + /* close client socket */ + shutdown(sock, SHUT_RDWR); + close(sock); return NULL; } @@ -924,12 +931,12 @@ rpc_srv_initServer(u_int regProgID, u_char regProcID, rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server"); return NULL; } - if (!proto) - proto = SOCK_STREAM; if (!io_gethostbyname(csHost, Port, &sa)) return NULL; if (!Port) Port = RPC_DEFPORT; + if (!proto) + proto = SOCK_STREAM; if (netBuf < RPC_MIN_BUFSIZ) netBuf = BUFSIZ; else @@ -1069,10 +1076,12 @@ rpc_srv_loopServer(rpc_srv_t * __restrict srv) return -1; } - if (listen(srv->srv_server.cli_sock, io_arraySize(srv->srv_clients)) == -1) { - LOGERR; - return -1; - } + if (srv->srv_proto == SOCK_STREAM) + if (listen(srv->srv_server.cli_sock, + io_arraySize(srv->srv_clients)) == -1) { + LOGERR; + return -1; + } if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, srv->srv_server.cli_sock, NULL, 0)) {