--- libaitrpc/src/srv.c 2016/08/08 13:21:13 1.29 +++ libaitrpc/src/srv.c 2025/03/31 17:02:29 1.33.2.1 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.29 2016/08/08 13:21:13 misho Exp $ +* $Id: srv.c,v 1.33.2.1 2025/03/31 17:02:29 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -12,7 +12,7 @@ terms: All of the documentation and software included in the ELWIX and AITNET Releases is copyrighted by ELWIX - Sofia/Bulgaria -Copyright 2004 - 2016 +Copyright 2004 - 2025 by Michael Pounov . All rights reserved. Redistribution and use in source and binary forms, with or without @@ -69,6 +69,27 @@ static void *txBPFPacket(sched_task_t *); static void *rxEXTPacket(sched_task_t *); static void *txEXTPacket(sched_task_t *); +#ifdef __linux__ + #ifdef __mips__ + static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = { + { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */ + { rxUDPPacket, freeClient, NULL /*rxUDPPacket*/, txUDPPacket }, /* SOCK_DGRAM */ + { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */ + { rxRAWPacket, freeClient, NULL /*rxRAWPacket*/, txRAWPacket }, /* SOCK_RAW */ + { rxBPFPacket, freeClient, NULL /*rxBPFPacket*/, txBPFPacket }, /* SOCK_BPF */ + { rxEXTPacket, freeClient, NULL /*rxEXTPacket*/, txEXTPacket } /* SOCK_EXT */ + }; + #else + static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = { + { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */ + { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */ + { rxUDPPacket, freeClient, NULL /*rxUDPPacket*/, txUDPPacket }, /* SOCK_DGRAM */ + { rxRAWPacket, freeClient, NULL /*rxRAWPacket*/, txRAWPacket }, /* SOCK_RAW */ + { rxBPFPacket, freeClient, NULL /*rxBPFPacket*/, txBPFPacket }, /* SOCK_BPF */ + { rxEXTPacket, freeClient, NULL /*rxEXTPacket*/, txEXTPacket } /* SOCK_EXT */ + }; + #endif +#else static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = { { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */ { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */ @@ -77,6 +98,7 @@ static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] { rxBPFPacket, freeClient, NULL /*rxBPFPacket*/, txBPFPacket }, /* SOCK_BPF */ { rxEXTPacket, freeClient, NULL /*rxEXTPacket*/, txEXTPacket } /* SOCK_EXT */ }; +#endif /* Global Signal Argument when kqueue support disabled */ @@ -87,7 +109,8 @@ rpc_freeCli(rpc_cli_t * __restrict c) { rpc_srv_t *s = c->cli_parent; - schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL); + if (s->srv_proto == SOCK_STREAM) + schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL); /* free buffer */ AIT_FREE_VAL(&c->cli_buf); @@ -146,6 +169,11 @@ _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * /* alloc empty buffer */ AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf); + if (!AIT_GET_BUF(&c->cli_buf)) { + array_Del(srv->srv_clients, n, 0); + e_free(c); + c = NULL; + } } return c; @@ -291,7 +319,7 @@ execCall(sched_task_t *task) if (TASK_VAL(task)) { /* without reply */ ait_freeVars(&c->cli_vars); - } else { + } else if (rpc->call_io & RPC_REQ) { /* reply */ rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); } @@ -307,7 +335,7 @@ rxPacket(sched_task_t *task) { rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; - int len, noreply = 0, rlen = AIT_LEN(&c->cli_buf); + int len, noreply = 0, rlen; #if 0 u_short crc; #endif @@ -323,8 +351,15 @@ rxPacket(sched_task_t *task) /* prepare rx */ len = recv(TASK_FD(task), &b, sizeof b, MSG_PEEK); - if (len == sizeof b) + if (len < 1) { + /* close connection */ + schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], + TASK_ARG(task), 0, NULL, 0); + taskExit(task, NULL); + } else if (len == sizeof b) rlen = ntohl(b.call_len); + else + goto end; rlen = recv(TASK_FD(task), buf, rlen, 0); if (rlen == -1) { @@ -387,10 +422,10 @@ rxPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len); err: /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], TASK_ARG(task), TASK_FD(task), rpc, len); - +end: /* lets get next packet */ schedReadSelf(task); taskExit(task, NULL); @@ -424,8 +459,10 @@ acceptClients(sched_task_t *task) AIT_FREE_VAL(&c->cli_buf); array_Del(srv->srv_clients, c->cli_id, 42); goto end; - } else + } else { fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK); + fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC); + } #ifdef TCP_SESSION_TIMEOUT /* armed timer for close stateless connection */ @@ -528,7 +565,7 @@ rxUDPPacket(sched_task_t *task) #endif rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen); rpc = (struct tagRPCCall*) AIT_GET_BUF(&b); - if (rlen < sizeof(struct tagRPCCall)) + if (!rpc || rlen < sizeof(struct tagRPCCall)) goto end; else len = ntohl(rpc->call_len); @@ -542,7 +579,7 @@ rxUDPPacket(sched_task_t *task) /* check integrity of packet */ crc = ntohs(rpc->call_crc); rpc->call_crc ^= rpc->call_crc; - if (crc != crcFletcher16((u_short*) rpc, len / 2)) + if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2)) goto end; /* check RPC packet session info */ @@ -551,7 +588,7 @@ rxUDPPacket(sched_task_t *task) c = _allocClient(srv, &sa); if (!c) { - EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n"); + EVERBOSE(1, "RPC client quota exceeded!"); usleep(2000); /* blocked client delay */ goto end; } else { @@ -573,7 +610,7 @@ rxUDPPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -672,7 +709,7 @@ rxRAWPacket(sched_task_t *task) if (sa.sa.sa_family == AF_INET) { struct ip *h; h = (struct ip*) AIT_GET_BUF(&b); - if (rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC) + if (!h || rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC) goto end; else { rlen -= sizeof(struct ip); @@ -682,7 +719,7 @@ rxRAWPacket(sched_task_t *task) #ifdef IPV6_REMOVE_HEADER struct ip6_hdr *h; h = (struct ip6_hdr*) AIT_GET_BUF(&b); - if (rlen < ntohs(h->ip6_plen) || h->ip6_nxt != IPPROTO_ERPC) + if (!h || rlen < ntohs(h->ip6_plen) || h->ip6_nxt != IPPROTO_ERPC) goto end; else { rlen -= sizeof(struct ip6_hdr); @@ -692,7 +729,7 @@ rxRAWPacket(sched_task_t *task) rpc = (struct tagRPCCall*) AIT_GET_BUF(&b); #endif } - if (rlen < sizeof(struct tagRPCCall)) + if (!rpc || rlen < sizeof(struct tagRPCCall)) goto end; else len = ntohl(rpc->call_len); @@ -706,7 +743,7 @@ rxRAWPacket(sched_task_t *task) /* check integrity of packet */ crc = ntohs(rpc->call_crc); rpc->call_crc ^= rpc->call_crc; - if (crc != crcFletcher16((u_short*) rpc, len / 2)) + if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2)) goto end; /* check RPC packet session info */ @@ -737,7 +774,7 @@ rxRAWPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -907,7 +944,7 @@ rxBPFPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -988,7 +1025,7 @@ rxEXTPacket(sched_task_t *task) { rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; - int len, noreply = 0, rlen = AIT_LEN(&c->cli_buf); + int len, noreply = 0, rlen; struct tagRPCCall *rpc; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; sockaddr_t sa; @@ -999,7 +1036,7 @@ rxEXTPacket(sched_task_t *task) AIT_SET_BUF(&b, NULL, srv->srv_netbuf); rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b)); rpc = (struct tagRPCCall*) AIT_GET_BUF(&b); - if (rlen < sizeof(struct tagRPCCall)) + if (!rpc || rlen < sizeof(struct tagRPCCall)) goto end; else len = ntohl(rpc->call_len); @@ -1037,7 +1074,7 @@ rxEXTPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -1264,6 +1301,7 @@ acceptBLOBClients(sched_task_t *task) setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n); #endif fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK); + fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC); } schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0); @@ -1546,7 +1584,7 @@ rpc_srv_initServer(u_char InstID, int concurentClients /* init functions */ pthread_mutex_init(&srv->srv_funcs.mtx, NULL); SLIST_INIT(&srv->srv_funcs); - AVL_INIT(&srv->srv_funcs); + RB_INIT(&srv->srv_funcs); /* init scheduler */ srv->srv_root = schedBegin(); @@ -1706,7 +1744,7 @@ rpc_srv_loopServer(rpc_srv_t * __restrict srv) AIT_FREE_VAL(&f->func_name); e_free(f); } - srv->srv_funcs.avlh_root = NULL; + srv->srv_funcs.rbh_root = NULL; RPC_FUNCS_UNLOCK(&srv->srv_funcs); return 0; @@ -1806,7 +1844,7 @@ rpc_srv_initServer2(u_char InstID, int concurentClient /* init functions */ pthread_mutex_init(&srv->srv_funcs.mtx, NULL); SLIST_INIT(&srv->srv_funcs); - AVL_INIT(&srv->srv_funcs); + RB_INIT(&srv->srv_funcs); /* init scheduler */ srv->srv_root = schedBegin(); @@ -1924,7 +1962,7 @@ rpc_srv_initServerExt(u_char InstID, int netBuf, int f /* init functions */ pthread_mutex_init(&srv->srv_funcs.mtx, NULL); SLIST_INIT(&srv->srv_funcs); - AVL_INIT(&srv->srv_funcs); + RB_INIT(&srv->srv_funcs); /* init scheduler */ srv->srv_root = schedBegin(); @@ -1951,4 +1989,26 @@ rpc_srv_initServerExt(u_char InstID, int netBuf, int f rpc_register_srvPing(srv); return srv; +} + +/* + * rpc_srv_Return() - Prepare IPC return answer to RPC client + * + * @c = RPC client + * return: number of arguments in response + */ +int +rpc_srv_Return(rpc_cli_t *c) +{ + rpc_srv_t *s = c->cli_parent; + u_char *buf = AIT_GET_BUF(&c->cli_buf); + struct tagRPCCall *rpc = (struct tagRPCCall*) buf; + + if (!RPC_CHK_NOREPLY(rpc)) { + rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); + schedWrite(s->srv_root, cbProto[s->srv_proto][CB_TXPACKET], c, c->cli_sock, rpc, 0); + } else + rpc->call_argc ^= rpc->call_argc; + + return rpc->call_argc; }