--- libaitrpc/src/srv.c 2015/07/22 12:49:10 1.27.2.5 +++ libaitrpc/src/srv.c 2024/02/26 18:25:32 1.30.2.10 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.27.2.5 2015/07/22 12:49:10 misho Exp $ +* $Id: srv.c,v 1.30.2.10 2024/02/26 18:25:32 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -12,7 +12,7 @@ terms: All of the documentation and software included in the ELWIX and AITNET Releases is copyrighted by ELWIX - Sofia/Bulgaria -Copyright 2004 - 2015 +Copyright 2004 - 2024 by Michael Pounov . All rights reserved. Redistribution and use in source and binary forms, with or without @@ -87,7 +87,8 @@ rpc_freeCli(rpc_cli_t * __restrict c) { rpc_srv_t *s = c->cli_parent; - schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL); + if (s->srv_proto == SOCK_STREAM) + schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL); /* free buffer */ AIT_FREE_VAL(&c->cli_buf); @@ -291,7 +292,7 @@ execCall(sched_task_t *task) if (TASK_VAL(task)) { /* without reply */ ait_freeVars(&c->cli_vars); - } else { + } else if (rpc->call_io & RPC_REQ) { /* reply */ rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); } @@ -312,8 +313,7 @@ rxPacket(sched_task_t *task) u_short crc; #endif u_char *buf = AIT_GET_BUF(&c->cli_buf); - u_char b[sizeof(struct tagRPCCall)]; - struct tagRPCCall *rpc = (struct tagRPCCall*) buf; + struct tagRPCCall b, *rpc = (struct tagRPCCall*) buf; #ifdef TCP_SESSION_TIMEOUT struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; @@ -323,9 +323,16 @@ rxPacket(sched_task_t *task) #endif /* prepare rx */ - len = recv(TASK_FD(task), b, sizeof b, MSG_PEEK); - if (len == sizeof b) - rlen = ntohl(((struct tagRPCCall*) b)->call_len); + len = recv(TASK_FD(task), &b, sizeof b, MSG_PEEK); + if (len < 1) { + /* close connection */ + schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], + TASK_ARG(task), 0, NULL, 0); + taskExit(task, NULL); + } else if (len == sizeof b) + rlen = ntohl(b.call_len); + else + goto end; rlen = recv(TASK_FD(task), buf, rlen, 0); if (rlen == -1) { @@ -388,10 +395,10 @@ rxPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len); err: /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], TASK_ARG(task), TASK_FD(task), rpc, len); - +end: /* lets get next packet */ schedReadSelf(task); taskExit(task, NULL); @@ -402,7 +409,7 @@ acceptClients(sched_task_t *task) { rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; - socklen_t salen = sizeof(sockaddr_t); + socklen_t salen = E_SOCKADDR_MAX; int sock; #ifdef TCP_SESSION_TIMEOUT struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; @@ -425,8 +432,10 @@ acceptClients(sched_task_t *task) AIT_FREE_VAL(&c->cli_buf); array_Del(srv->srv_clients, c->cli_id, 42); goto end; - } else + } else { fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK); + fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC); + } #ifdef TCP_SESSION_TIMEOUT /* armed timer for close stateless connection */ @@ -499,7 +508,7 @@ txUDPPacket(sched_task_t *task) /* send reply */ ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, - &c->cli_sa.sa, c->cli_sa.sa.sa_len); + &c->cli_sa.sa, e_addrlen(&c->cli_sa)); if (ret == -1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], @@ -518,13 +527,15 @@ rxUDPPacket(sched_task_t *task) u_short crc; struct tagRPCCall *rpc; sockaddr_t sa; - socklen_t salen; + socklen_t salen = E_SOCKADDR_MAX; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; ait_val_t b = AIT_VAL_INIT; /* receive connect packet */ AIT_SET_BUF(&b, NULL, srv->srv_netbuf); - salen = sa.ss.ss_len = (u_char) sizeof(sockaddr_t); +#ifndef __linux__ + sa.ss.ss_len = salen; +#endif rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen); rpc = (struct tagRPCCall*) AIT_GET_BUF(&b); if (rlen < sizeof(struct tagRPCCall)) @@ -541,7 +552,7 @@ rxUDPPacket(sched_task_t *task) /* check integrity of packet */ crc = ntohs(rpc->call_crc); rpc->call_crc ^= rpc->call_crc; - if (crc != crcFletcher16((u_short*) rpc, len / 2)) + if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2)) goto end; /* check RPC packet session info */ @@ -550,7 +561,7 @@ rxUDPPacket(sched_task_t *task) c = _allocClient(srv, &sa); if (!c) { - EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n"); + EVERBOSE(1, "RPC client quota exceeded!"); usleep(2000); /* blocked client delay */ goto end; } else { @@ -572,7 +583,7 @@ rxUDPPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -639,7 +650,7 @@ txRAWPacket(sched_task_t *task) /* send reply */ ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, - &c->cli_sa.sa, c->cli_sa.sa.sa_len); + &c->cli_sa.sa, e_addrlen(&c->cli_sa)); if (ret == -1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], @@ -658,13 +669,15 @@ rxRAWPacket(sched_task_t *task) u_short crc; struct tagRPCCall *rpc; sockaddr_t sa; - socklen_t salen; + socklen_t salen = E_SOCKADDR_MAX; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; ait_val_t b = AIT_VAL_INIT; /* receive connect packet */ AIT_SET_BUF(&b, NULL, srv->srv_netbuf); - salen = sa.ss.ss_len = (u_char) sizeof(sockaddr_t); +#ifndef __linux__ + sa.ss.ss_len = salen; +#endif rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen); if (sa.sa.sa_family == AF_INET) { struct ip *h; @@ -676,15 +689,18 @@ rxRAWPacket(sched_task_t *task) rpc = (struct tagRPCCall*) (h + 1); } } else { +#ifdef IPV6_REMOVE_HEADER struct ip6_hdr *h; h = (struct ip6_hdr*) AIT_GET_BUF(&b); - if (rlen < (ntohs(h->ip6_plen) + sizeof(struct ip6_hdr)) || - h->ip6_nxt != IPPROTO_ERPC) + if (rlen < ntohs(h->ip6_plen) || h->ip6_nxt != IPPROTO_ERPC) goto end; else { rlen -= sizeof(struct ip6_hdr); rpc = (struct tagRPCCall*) (h + 1); } +#else + rpc = (struct tagRPCCall*) AIT_GET_BUF(&b); +#endif } if (rlen < sizeof(struct tagRPCCall)) goto end; @@ -700,7 +716,7 @@ rxRAWPacket(sched_task_t *task) /* check integrity of packet */ crc = ntohs(rpc->call_crc); rpc->call_crc ^= rpc->call_crc; - if (crc != crcFletcher16((u_short*) rpc, len / 2)) + if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2)) goto end; /* check RPC packet session info */ @@ -731,7 +747,7 @@ rxRAWPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -744,6 +760,7 @@ end: static void * txBPFPacket(sched_task_t *task) { +#ifndef __linux__ rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; @@ -812,6 +829,9 @@ txBPFPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), 0, NULL, 0); } +#else + rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!"); +#endif taskExit(task, NULL); } @@ -819,6 +839,7 @@ txBPFPacket(sched_task_t *task) static void * rxBPFPacket(sched_task_t *task) { +#ifndef __linux__ rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; int len, rlen, noreply; @@ -896,12 +917,16 @@ rxBPFPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: AIT_FREE_VAL(&b); schedReadSelf(task); +#else + rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!"); +#endif + taskExit(task, NULL); } @@ -1022,7 +1047,7 @@ rxEXTPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -1182,7 +1207,7 @@ flushBLOB(sched_task_t *task) e_free(b); } - if (!schedSignalSelf(task)) { + if (sigArg) { /* disabled kqueue support in libaitsched */ struct sigaction sa; @@ -1191,9 +1216,11 @@ flushBLOB(sched_task_t *task) sa.sa_handler = (void (*)(int)) flushBLOB; sa.sa_flags = SA_RESTART | SA_RESETHAND; sigaction(SIGFBLOB, &sa, NULL); + return NULL; + } else { + schedSignalSelf(task); + taskExit(task, NULL); } - - taskExit(task, NULL); } static void * @@ -1202,7 +1229,7 @@ acceptBLOBClients(sched_task_t *task) rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; register int i; - socklen_t salen = sizeof(sockaddr_t); + socklen_t salen = E_SOCKADDR_MAX; int sock; #ifdef TCP_NOPUSH int n = 1; @@ -1247,6 +1274,7 @@ acceptBLOBClients(sched_task_t *task) setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n); #endif fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK); + fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC); } schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0); @@ -1269,6 +1297,7 @@ int rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir) { int n = 1; + socklen_t salen; if (!srv || srv->srv_kill) { rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server"); @@ -1287,19 +1316,22 @@ rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s srv->srv_blob.server.cli_parent = srv; - memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t)); + memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof srv->srv_blob.server.cli_sa); switch (srv->srv_blob.server.cli_sa.sa.sa_family) { case AF_INET: srv->srv_blob.server.cli_sa.sin.sin_port = htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1); + salen = sizeof srv->srv_blob.server.cli_sa.sin; break; case AF_INET6: srv->srv_blob.server.cli_sa.sin6.sin6_port = htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1); + salen = sizeof srv->srv_blob.server.cli_sa.sin6; break; case AF_LOCAL: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", sizeof srv->srv_blob.server.cli_sa.sun.sun_path); + salen = sizeof srv->srv_blob.server.cli_sa.sun; break; default: AIT_FREE_VAL(&srv->srv_blob.dir); @@ -1332,8 +1364,7 @@ rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } - if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, - srv->srv_blob.server.cli_sa.sa.sa_len) == -1) { + if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, salen) == -1) { LOGERR; close(srv->srv_blob.server.cli_sock); AIT_FREE_VAL(&srv->srv_blob.dir); @@ -1483,6 +1514,7 @@ rpc_srv_initServer(u_char InstID, int concurentClients int n = 1; rpc_srv_t *srv = NULL; sockaddr_t sa = E_SOCKADDR_INIT; + socklen_t salen; if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) { rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server"); @@ -1490,7 +1522,7 @@ rpc_srv_initServer(u_char InstID, int concurentClients } if (!Port && proto < SOCK_RAW) Port = RPC_DEFPORT; - if (!e_gethostbyname(csHost, Port, &sa)) + if (!(salen = e_gethostbyname(csHost, Port, &sa))) return NULL; if (!proto) proto = SOCK_STREAM; @@ -1534,7 +1566,8 @@ rpc_srv_initServer(u_char InstID, int concurentClients pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; - } + } else + schedSignalDispatch(srv->srv_root, 42); /* init pool for clients */ srv->srv_clients = array_Init(concurentClients); @@ -1572,8 +1605,7 @@ rpc_srv_initServer(u_char InstID, int concurentClients LOGERR; goto err; } - if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, - srv->srv_server.cli_sa.sa.sa_len) == -1) { + if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, salen) == -1) { LOGERR; goto err; } else @@ -1657,6 +1689,7 @@ rpc_srv_loopServer(rpc_srv_t * __restrict srv) schedPolling(srv->srv_root, &ts, NULL); /* main rpc loop */ schedRun(srv->srv_root, &srv->srv_kill); + schedSignalDispatch(srv->srv_root, 0); /* close all clients connections & server socket */ for (i = 0; i < array_Size(srv->srv_clients); i++) { @@ -1730,6 +1763,7 @@ rpc_srv_execCall(rpc_cli_t * __restrict cli, struct ta rpc_srv_t * rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface) { +#ifndef __linux__ int n = 1; rpc_srv_t *srv = NULL; sockaddr_t sa = E_SOCKADDR_INIT; @@ -1794,7 +1828,8 @@ rpc_srv_initServer2(u_char InstID, int concurentClient pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; - } + } else + schedSignalDispatch(srv->srv_root, 42); /* init pool for clients */ srv->srv_clients = array_Init(concurentClients); @@ -1856,6 +1891,10 @@ err: /* error condition */ schedEnd(&srv->srv_root); pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); +#else + rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!"); +#endif + return NULL; } @@ -1908,7 +1947,8 @@ rpc_srv_initServerExt(u_char InstID, int netBuf, int f pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; - } + } else + schedSignalDispatch(srv->srv_root, 42); /* init pool for clients */ srv->srv_clients = array_Init(1); @@ -1926,4 +1966,26 @@ rpc_srv_initServerExt(u_char InstID, int netBuf, int f rpc_register_srvPing(srv); return srv; +} + +/* + * rpc_srv_Return() - Prepare IPC return answer to RPC client + * + * @c = RPC client + * return: number of arguments in response + */ +int +rpc_srv_Return(rpc_cli_t *c) +{ + rpc_srv_t *s = c->cli_parent; + u_char *buf = AIT_GET_BUF(&c->cli_buf); + struct tagRPCCall *rpc = (struct tagRPCCall*) buf; + + if (!RPC_CHK_NOREPLY(rpc)) { + rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); + schedWrite(s->srv_root, cbProto[s->srv_proto][CB_TXPACKET], c, c->cli_sock, rpc, 0); + } else + rpc->call_argc ^= rpc->call_argc; + + return rpc->call_argc; }