--- libaitrpc/src/srv.c 2015/07/22 20:01:46 1.28 +++ libaitrpc/src/srv.c 2024/03/20 17:32:31 1.31 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.28 2015/07/22 20:01:46 misho Exp $ +* $Id: srv.c,v 1.31 2024/03/20 17:32:31 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -12,7 +12,7 @@ terms: All of the documentation and software included in the ELWIX and AITNET Releases is copyrighted by ELWIX - Sofia/Bulgaria -Copyright 2004 - 2015 +Copyright 2004 - 2024 by Michael Pounov . All rights reserved. Redistribution and use in source and binary forms, with or without @@ -87,7 +87,8 @@ rpc_freeCli(rpc_cli_t * __restrict c) { rpc_srv_t *s = c->cli_parent; - schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL); + if (s->srv_proto == SOCK_STREAM) + schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL); /* free buffer */ AIT_FREE_VAL(&c->cli_buf); @@ -146,6 +147,11 @@ _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * /* alloc empty buffer */ AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf); + if (!AIT_GET_BUF(&c->cli_buf)) { + array_Del(srv->srv_clients, n, 0); + e_free(c); + c = NULL; + } } return c; @@ -291,7 +297,7 @@ execCall(sched_task_t *task) if (TASK_VAL(task)) { /* without reply */ ait_freeVars(&c->cli_vars); - } else { + } else if (rpc->call_io & RPC_REQ) { /* reply */ rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); } @@ -307,13 +313,12 @@ rxPacket(sched_task_t *task) { rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; - int len, noreply = 0, rlen = AIT_LEN(&c->cli_buf); + int len, noreply = 0, rlen; #if 0 u_short crc; #endif u_char *buf = AIT_GET_BUF(&c->cli_buf); - u_char b[sizeof(struct tagRPCCall)]; - struct tagRPCCall *rpc = (struct tagRPCCall*) buf; + struct tagRPCCall b, *rpc = (struct tagRPCCall*) buf; #ifdef TCP_SESSION_TIMEOUT struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; @@ -323,9 +328,16 @@ rxPacket(sched_task_t *task) #endif /* prepare rx */ - len = recv(TASK_FD(task), b, sizeof b, MSG_PEEK); - if (len == sizeof b) - rlen = ntohl(((struct tagRPCCall*) b)->call_len); + len = recv(TASK_FD(task), &b, sizeof b, MSG_PEEK); + if (len < 1) { + /* close connection */ + schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], + TASK_ARG(task), 0, NULL, 0); + taskExit(task, NULL); + } else if (len == sizeof b) + rlen = ntohl(b.call_len); + else + goto end; rlen = recv(TASK_FD(task), buf, rlen, 0); if (rlen == -1) { @@ -388,10 +400,10 @@ rxPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len); err: /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], TASK_ARG(task), TASK_FD(task), rpc, len); - +end: /* lets get next packet */ schedReadSelf(task); taskExit(task, NULL); @@ -402,7 +414,7 @@ acceptClients(sched_task_t *task) { rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; - socklen_t salen = sizeof(sockaddr_t); + socklen_t salen = E_SOCKADDR_MAX; int sock; #ifdef TCP_SESSION_TIMEOUT struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; @@ -425,8 +437,10 @@ acceptClients(sched_task_t *task) AIT_FREE_VAL(&c->cli_buf); array_Del(srv->srv_clients, c->cli_id, 42); goto end; - } else + } else { fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK); + fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC); + } #ifdef TCP_SESSION_TIMEOUT /* armed timer for close stateless connection */ @@ -499,7 +513,7 @@ txUDPPacket(sched_task_t *task) /* send reply */ ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, - &c->cli_sa.sa, c->cli_sa.sa.sa_len); + &c->cli_sa.sa, e_addrlen(&c->cli_sa)); if (ret == -1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], @@ -518,13 +532,15 @@ rxUDPPacket(sched_task_t *task) u_short crc; struct tagRPCCall *rpc; sockaddr_t sa; - socklen_t salen; + socklen_t salen = E_SOCKADDR_MAX; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; ait_val_t b = AIT_VAL_INIT; /* receive connect packet */ AIT_SET_BUF(&b, NULL, srv->srv_netbuf); - salen = sa.ss.ss_len = (u_char) MIN(sizeof(sockaddr_t), 0xff); +#ifndef __linux__ + sa.ss.ss_len = salen; +#endif rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen); rpc = (struct tagRPCCall*) AIT_GET_BUF(&b); if (rlen < sizeof(struct tagRPCCall)) @@ -541,7 +557,7 @@ rxUDPPacket(sched_task_t *task) /* check integrity of packet */ crc = ntohs(rpc->call_crc); rpc->call_crc ^= rpc->call_crc; - if (crc != crcFletcher16((u_short*) rpc, len / 2)) + if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2)) goto end; /* check RPC packet session info */ @@ -550,7 +566,7 @@ rxUDPPacket(sched_task_t *task) c = _allocClient(srv, &sa); if (!c) { - EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n"); + EVERBOSE(1, "RPC client quota exceeded!"); usleep(2000); /* blocked client delay */ goto end; } else { @@ -572,7 +588,7 @@ rxUDPPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -639,7 +655,7 @@ txRAWPacket(sched_task_t *task) /* send reply */ ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, - &c->cli_sa.sa, c->cli_sa.sa.sa_len); + &c->cli_sa.sa, e_addrlen(&c->cli_sa)); if (ret == -1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], @@ -658,13 +674,15 @@ rxRAWPacket(sched_task_t *task) u_short crc; struct tagRPCCall *rpc; sockaddr_t sa; - socklen_t salen; + socklen_t salen = E_SOCKADDR_MAX; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; ait_val_t b = AIT_VAL_INIT; /* receive connect packet */ AIT_SET_BUF(&b, NULL, srv->srv_netbuf); - salen = sa.ss.ss_len = (u_char) MIN(sizeof(sockaddr_t), 0xff); +#ifndef __linux__ + sa.ss.ss_len = salen; +#endif rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen); if (sa.sa.sa_family == AF_INET) { struct ip *h; @@ -676,15 +694,18 @@ rxRAWPacket(sched_task_t *task) rpc = (struct tagRPCCall*) (h + 1); } } else { +#ifdef IPV6_REMOVE_HEADER struct ip6_hdr *h; h = (struct ip6_hdr*) AIT_GET_BUF(&b); - if (rlen < (ntohs(h->ip6_plen) + sizeof(struct ip6_hdr)) || - h->ip6_nxt != IPPROTO_ERPC) + if (rlen < ntohs(h->ip6_plen) || h->ip6_nxt != IPPROTO_ERPC) goto end; else { rlen -= sizeof(struct ip6_hdr); rpc = (struct tagRPCCall*) (h + 1); } +#else + rpc = (struct tagRPCCall*) AIT_GET_BUF(&b); +#endif } if (rlen < sizeof(struct tagRPCCall)) goto end; @@ -700,7 +721,7 @@ rxRAWPacket(sched_task_t *task) /* check integrity of packet */ crc = ntohs(rpc->call_crc); rpc->call_crc ^= rpc->call_crc; - if (crc != crcFletcher16((u_short*) rpc, len / 2)) + if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2)) goto end; /* check RPC packet session info */ @@ -731,7 +752,7 @@ rxRAWPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -744,6 +765,7 @@ end: static void * txBPFPacket(sched_task_t *task) { +#ifndef __linux__ rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; @@ -812,6 +834,9 @@ txBPFPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), 0, NULL, 0); } +#else + rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!"); +#endif taskExit(task, NULL); } @@ -819,6 +844,7 @@ txBPFPacket(sched_task_t *task) static void * rxBPFPacket(sched_task_t *task) { +#ifndef __linux__ rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; int len, rlen, noreply; @@ -896,12 +922,16 @@ rxBPFPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: AIT_FREE_VAL(&b); schedReadSelf(task); +#else + rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!"); +#endif + taskExit(task, NULL); } @@ -973,7 +1003,7 @@ rxEXTPacket(sched_task_t *task) { rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; - int len, noreply = 0, rlen = AIT_LEN(&c->cli_buf); + int len, noreply = 0, rlen; struct tagRPCCall *rpc; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; sockaddr_t sa; @@ -1022,7 +1052,7 @@ rxEXTPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -1182,7 +1212,7 @@ flushBLOB(sched_task_t *task) e_free(b); } - if (!schedSignalSelf(task)) { + if (sigArg) { /* disabled kqueue support in libaitsched */ struct sigaction sa; @@ -1191,9 +1221,11 @@ flushBLOB(sched_task_t *task) sa.sa_handler = (void (*)(int)) flushBLOB; sa.sa_flags = SA_RESTART | SA_RESETHAND; sigaction(SIGFBLOB, &sa, NULL); + return NULL; + } else { + schedSignalSelf(task); + taskExit(task, NULL); } - - taskExit(task, NULL); } static void * @@ -1202,7 +1234,7 @@ acceptBLOBClients(sched_task_t *task) rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; register int i; - socklen_t salen = sizeof(sockaddr_t); + socklen_t salen = E_SOCKADDR_MAX; int sock; #ifdef TCP_NOPUSH int n = 1; @@ -1247,6 +1279,7 @@ acceptBLOBClients(sched_task_t *task) setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n); #endif fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK); + fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC); } schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0); @@ -1269,6 +1302,7 @@ int rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir) { int n = 1; + socklen_t salen; if (!srv || srv->srv_kill) { rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server"); @@ -1287,19 +1321,22 @@ rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s srv->srv_blob.server.cli_parent = srv; - memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t)); + memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof srv->srv_blob.server.cli_sa); switch (srv->srv_blob.server.cli_sa.sa.sa_family) { case AF_INET: srv->srv_blob.server.cli_sa.sin.sin_port = htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1); + salen = sizeof srv->srv_blob.server.cli_sa.sin; break; case AF_INET6: srv->srv_blob.server.cli_sa.sin6.sin6_port = htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1); + salen = sizeof srv->srv_blob.server.cli_sa.sin6; break; case AF_LOCAL: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", sizeof srv->srv_blob.server.cli_sa.sun.sun_path); + salen = sizeof srv->srv_blob.server.cli_sa.sun; break; default: AIT_FREE_VAL(&srv->srv_blob.dir); @@ -1332,8 +1369,7 @@ rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } - if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, - srv->srv_blob.server.cli_sa.sa.sa_len) == -1) { + if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, salen) == -1) { LOGERR; close(srv->srv_blob.server.cli_sock); AIT_FREE_VAL(&srv->srv_blob.dir); @@ -1483,6 +1519,7 @@ rpc_srv_initServer(u_char InstID, int concurentClients int n = 1; rpc_srv_t *srv = NULL; sockaddr_t sa = E_SOCKADDR_INIT; + socklen_t salen; if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) { rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server"); @@ -1490,7 +1527,7 @@ rpc_srv_initServer(u_char InstID, int concurentClients } if (!Port && proto < SOCK_RAW) Port = RPC_DEFPORT; - if (!e_gethostbyname(csHost, Port, &sa)) + if (!(salen = e_gethostbyname(csHost, Port, &sa))) return NULL; if (!proto) proto = SOCK_STREAM; @@ -1572,8 +1609,7 @@ rpc_srv_initServer(u_char InstID, int concurentClients LOGERR; goto err; } - if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, - srv->srv_server.cli_sa.sa.sa_len) == -1) { + if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, salen) == -1) { LOGERR; goto err; } else @@ -1730,6 +1766,7 @@ rpc_srv_execCall(rpc_cli_t * __restrict cli, struct ta rpc_srv_t * rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface) { +#ifndef __linux__ int n = 1; rpc_srv_t *srv = NULL; sockaddr_t sa = E_SOCKADDR_INIT; @@ -1856,6 +1893,10 @@ err: /* error condition */ schedEnd(&srv->srv_root); pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); +#else + rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!"); +#endif + return NULL; } @@ -1926,4 +1967,26 @@ rpc_srv_initServerExt(u_char InstID, int netBuf, int f rpc_register_srvPing(srv); return srv; +} + +/* + * rpc_srv_Return() - Prepare IPC return answer to RPC client + * + * @c = RPC client + * return: number of arguments in response + */ +int +rpc_srv_Return(rpc_cli_t *c) +{ + rpc_srv_t *s = c->cli_parent; + u_char *buf = AIT_GET_BUF(&c->cli_buf); + struct tagRPCCall *rpc = (struct tagRPCCall*) buf; + + if (!RPC_CHK_NOREPLY(rpc)) { + rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); + schedWrite(s->srv_root, cbProto[s->srv_proto][CB_TXPACKET], c, c->cli_sock, rpc, 0); + } else + rpc->call_argc ^= rpc->call_argc; + + return rpc->call_argc; }