--- libaitrpc/src/srv.c 2016/08/02 15:18:41 1.28.2.4 +++ libaitrpc/src/srv.c 2024/02/26 18:25:32 1.30.2.10 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.28.2.4 2016/08/02 15:18:41 misho Exp $ +* $Id: srv.c,v 1.30.2.10 2024/02/26 18:25:32 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -12,7 +12,7 @@ terms: All of the documentation and software included in the ELWIX and AITNET Releases is copyrighted by ELWIX - Sofia/Bulgaria -Copyright 2004 - 2016 +Copyright 2004 - 2024 by Michael Pounov . All rights reserved. Redistribution and use in source and binary forms, with or without @@ -87,7 +87,8 @@ rpc_freeCli(rpc_cli_t * __restrict c) { rpc_srv_t *s = c->cli_parent; - schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL); + if (s->srv_proto == SOCK_STREAM) + schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL); /* free buffer */ AIT_FREE_VAL(&c->cli_buf); @@ -291,7 +292,7 @@ execCall(sched_task_t *task) if (TASK_VAL(task)) { /* without reply */ ait_freeVars(&c->cli_vars); - } else { + } else if (rpc->call_io & RPC_REQ) { /* reply */ rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); } @@ -323,8 +324,15 @@ rxPacket(sched_task_t *task) /* prepare rx */ len = recv(TASK_FD(task), &b, sizeof b, MSG_PEEK); - if (len == sizeof b) + if (len < 1) { + /* close connection */ + schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], + TASK_ARG(task), 0, NULL, 0); + taskExit(task, NULL); + } else if (len == sizeof b) rlen = ntohl(b.call_len); + else + goto end; rlen = recv(TASK_FD(task), buf, rlen, 0); if (rlen == -1) { @@ -387,10 +395,10 @@ rxPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len); err: /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], TASK_ARG(task), TASK_FD(task), rpc, len); - +end: /* lets get next packet */ schedReadSelf(task); taskExit(task, NULL); @@ -424,8 +432,10 @@ acceptClients(sched_task_t *task) AIT_FREE_VAL(&c->cli_buf); array_Del(srv->srv_clients, c->cli_id, 42); goto end; - } else + } else { fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK); + fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC); + } #ifdef TCP_SESSION_TIMEOUT /* armed timer for close stateless connection */ @@ -542,7 +552,7 @@ rxUDPPacket(sched_task_t *task) /* check integrity of packet */ crc = ntohs(rpc->call_crc); rpc->call_crc ^= rpc->call_crc; - if (crc != crcFletcher16((u_short*) rpc, len / 2)) + if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2)) goto end; /* check RPC packet session info */ @@ -551,7 +561,7 @@ rxUDPPacket(sched_task_t *task) c = _allocClient(srv, &sa); if (!c) { - EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n"); + EVERBOSE(1, "RPC client quota exceeded!"); usleep(2000); /* blocked client delay */ goto end; } else { @@ -573,7 +583,7 @@ rxUDPPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -706,7 +716,7 @@ rxRAWPacket(sched_task_t *task) /* check integrity of packet */ crc = ntohs(rpc->call_crc); rpc->call_crc ^= rpc->call_crc; - if (crc != crcFletcher16((u_short*) rpc, len / 2)) + if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2)) goto end; /* check RPC packet session info */ @@ -737,7 +747,7 @@ rxRAWPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -907,7 +917,7 @@ rxBPFPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -1037,7 +1047,7 @@ rxEXTPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len); /* send RPC reply */ - if (!noreply) + if (!noreply && (rpc->call_io & RPC_REQ)) schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], c, TASK_FD(task), rpc, len); end: @@ -1197,7 +1207,7 @@ flushBLOB(sched_task_t *task) e_free(b); } - if (!schedSignalSelf(task)) { + if (sigArg) { /* disabled kqueue support in libaitsched */ struct sigaction sa; @@ -1206,9 +1216,11 @@ flushBLOB(sched_task_t *task) sa.sa_handler = (void (*)(int)) flushBLOB; sa.sa_flags = SA_RESTART | SA_RESETHAND; sigaction(SIGFBLOB, &sa, NULL); + return NULL; + } else { + schedSignalSelf(task); + taskExit(task, NULL); } - - taskExit(task, NULL); } static void * @@ -1262,6 +1274,7 @@ acceptBLOBClients(sched_task_t *task) setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n); #endif fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK); + fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC); } schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0); @@ -1553,7 +1566,8 @@ rpc_srv_initServer(u_char InstID, int concurentClients pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; - } + } else + schedSignalDispatch(srv->srv_root, 42); /* init pool for clients */ srv->srv_clients = array_Init(concurentClients); @@ -1675,6 +1689,7 @@ rpc_srv_loopServer(rpc_srv_t * __restrict srv) schedPolling(srv->srv_root, &ts, NULL); /* main rpc loop */ schedRun(srv->srv_root, &srv->srv_kill); + schedSignalDispatch(srv->srv_root, 0); /* close all clients connections & server socket */ for (i = 0; i < array_Size(srv->srv_clients); i++) { @@ -1813,7 +1828,8 @@ rpc_srv_initServer2(u_char InstID, int concurentClient pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; - } + } else + schedSignalDispatch(srv->srv_root, 42); /* init pool for clients */ srv->srv_clients = array_Init(concurentClients); @@ -1931,7 +1947,8 @@ rpc_srv_initServerExt(u_char InstID, int netBuf, int f pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); return NULL; - } + } else + schedSignalDispatch(srv->srv_root, 42); /* init pool for clients */ srv->srv_clients = array_Init(1); @@ -1949,4 +1966,26 @@ rpc_srv_initServerExt(u_char InstID, int netBuf, int f rpc_register_srvPing(srv); return srv; +} + +/* + * rpc_srv_Return() - Prepare IPC return answer to RPC client + * + * @c = RPC client + * return: number of arguments in response + */ +int +rpc_srv_Return(rpc_cli_t *c) +{ + rpc_srv_t *s = c->cli_parent; + u_char *buf = AIT_GET_BUF(&c->cli_buf); + struct tagRPCCall *rpc = (struct tagRPCCall*) buf; + + if (!RPC_CHK_NOREPLY(rpc)) { + rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); + schedWrite(s->srv_root, cbProto[s->srv_proto][CB_TXPACKET], c, c->cli_sock, rpc, 0); + } else + rpc->call_argc ^= rpc->call_argc; + + return rpc->call_argc; }