--- libaitrpc/src/srv.c 2015/06/29 22:39:33 1.26.2.9 +++ libaitrpc/src/srv.c 2016/08/02 12:00:39 1.28.2.3 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: srv.c,v 1.26.2.9 2015/06/29 22:39:33 misho Exp $ +* $Id: srv.c,v 1.28.2.3 2016/08/02 12:00:39 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -12,7 +12,7 @@ terms: All of the documentation and software included in the ELWIX and AITNET Releases is copyrighted by ELWIX - Sofia/Bulgaria -Copyright 2004 - 2015 +Copyright 2004 - 2016 by Michael Pounov . All rights reserved. Redistribution and use in source and binary forms, with or without @@ -82,7 +82,6 @@ static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] static volatile uintptr_t _glSigArg = 0; - void rpc_freeCli(rpc_cli_t * __restrict c) { @@ -197,14 +196,14 @@ txPacket(sched_task_t *task) rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) { rpc_SetErr(EMSGSIZE, "Message too long"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else { rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); /* Go Encapsulate variables */ @@ -214,8 +213,8 @@ txPacket(sched_task_t *task) rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else wlen += ret; } @@ -263,8 +262,8 @@ execCall(sched_task_t *task) rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError()); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); taskExit(task, NULL); } } else @@ -274,15 +273,15 @@ execCall(sched_task_t *task) rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else { /* if client doesn't want reply */ - rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr)); + RPC_SET_RETURN(rpc, rpc_srv_execCall(c, rpc, f->func_name, arr)); if (rpc->call_rep.ret == htonl(-1)) { if (!rpc->call_rep.eno) { LOGERR; - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_ERRNO(rpc, rpc_Errno); } rpc->call_argc ^= rpc->call_argc; ait_freeVars(&c->cli_vars); @@ -313,7 +312,7 @@ rxPacket(sched_task_t *task) u_short crc; #endif u_char *buf = AIT_GET_BUF(&c->cli_buf); - struct tagRPCCall *rpc = (struct tagRPCCall*) buf; + struct tagRPCCall b, *rpc = (struct tagRPCCall*) buf; #ifdef TCP_SESSION_TIMEOUT struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; @@ -322,6 +321,11 @@ rxPacket(sched_task_t *task) TASK_ARG(task), ts, TASK_ARG(task), 0); #endif + /* prepare rx */ + len = recv(TASK_FD(task), &b, sizeof b, MSG_PEEK); + if (len == sizeof b) + rlen = ntohl(b.call_len); + rlen = recv(TASK_FD(task), buf, rlen, 0); if (rlen == -1) { /* close connection */ @@ -333,8 +337,8 @@ rxPacket(sched_task_t *task) rpc_SetErr(ERPCMISMATCH, "Short RPC packet"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); goto err; } else len = ntohl(rpc->call_len); @@ -342,8 +346,8 @@ rxPacket(sched_task_t *task) rpc_SetErr(ERPCMISMATCH, "Short RPC packet"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); goto err; } @@ -361,8 +365,8 @@ rxPacket(sched_task_t *task) rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); goto err; } #endif @@ -372,8 +376,8 @@ rxPacket(sched_task_t *task) rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); goto err; } @@ -397,7 +401,7 @@ acceptClients(sched_task_t *task) { rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; - socklen_t salen = sizeof(sockaddr_t); + socklen_t salen = E_SOCKADDR_MAX; int sock; #ifdef TCP_SESSION_TIMEOUT struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; @@ -458,14 +462,14 @@ txUDPPacket(sched_task_t *task) rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) { rpc_SetErr(EMSGSIZE, "Message too long"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else { rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); /* Go Encapsulate variables */ @@ -475,8 +479,8 @@ txUDPPacket(sched_task_t *task) rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else wlen += ret; } @@ -494,7 +498,7 @@ txUDPPacket(sched_task_t *task) /* send reply */ ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, - &c->cli_sa.sa, c->cli_sa.sa.sa_len); + &c->cli_sa.sa, sizeof c->cli_sa.sa); if (ret == -1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], @@ -513,13 +517,15 @@ rxUDPPacket(sched_task_t *task) u_short crc; struct tagRPCCall *rpc; sockaddr_t sa; - socklen_t salen; + socklen_t salen = E_SOCKADDR_MAX; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; ait_val_t b = AIT_VAL_INIT; /* receive connect packet */ AIT_SET_BUF(&b, NULL, srv->srv_netbuf); - salen = sa.ss.ss_len = sizeof(sockaddr_t); +#ifndef __linux__ + sa.ss.ss_len = salen; +#endif rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen); rpc = (struct tagRPCCall*) AIT_GET_BUF(&b); if (rlen < sizeof(struct tagRPCCall)) @@ -598,14 +604,14 @@ txRAWPacket(sched_task_t *task) rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) { rpc_SetErr(EMSGSIZE, "Message too long"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else { rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); /* Go Encapsulate variables */ @@ -613,9 +619,10 @@ txRAWPacket(sched_task_t *task) RPC_RETVARS(c)); if (ret == -1) { rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); + rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else wlen += ret; } @@ -633,7 +640,7 @@ txRAWPacket(sched_task_t *task) /* send reply */ ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, - &c->cli_sa.sa, c->cli_sa.sa.sa_len); + &c->cli_sa.sa, sizeof c->cli_sa.sa); if (ret == -1) { /* close connection */ schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], @@ -652,13 +659,15 @@ rxRAWPacket(sched_task_t *task) u_short crc; struct tagRPCCall *rpc; sockaddr_t sa; - socklen_t salen; + socklen_t salen = E_SOCKADDR_MAX; struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; ait_val_t b = AIT_VAL_INIT; /* receive connect packet */ AIT_SET_BUF(&b, NULL, srv->srv_netbuf); - salen = sa.ss.ss_len = sizeof(sockaddr_t); +#ifndef __linux__ + sa.ss.ss_len = salen; +#endif rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen); if (sa.sa.sa_family == AF_INET) { struct ip *h; @@ -738,6 +747,7 @@ end: static void * txBPFPacket(sched_task_t *task) { +#ifndef __linux__ rpc_cli_t *c = TASK_ARG(task); rpc_srv_t *s = c->cli_parent; rpc_func_t *f = NULL; @@ -758,14 +768,14 @@ txBPFPacket(sched_task_t *task) rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) { rpc_SetErr(EMSGSIZE, "Message too long"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else { rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); /* Go Encapsulate variables */ @@ -775,8 +785,8 @@ txBPFPacket(sched_task_t *task) rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else wlen += ret; } @@ -806,6 +816,9 @@ txBPFPacket(sched_task_t *task) schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], TASK_ARG(task), 0, NULL, 0); } +#else + rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!"); +#endif taskExit(task, NULL); } @@ -813,6 +826,7 @@ txBPFPacket(sched_task_t *task) static void * rxBPFPacket(sched_task_t *task) { +#ifndef __linux__ rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; int len, rlen, noreply; @@ -896,6 +910,10 @@ rxBPFPacket(sched_task_t *task) end: AIT_FREE_VAL(&b); schedReadSelf(task); +#else + rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!"); +#endif + taskExit(task, NULL); } @@ -921,14 +939,14 @@ txEXTPacket(sched_task_t *task) rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else if (rpc_pktFreeSpace(c) > s->srv_netbuf) { rpc_SetErr(EMSGSIZE, "Message too long"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else { rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c)); /* Go Encapsulate variables */ @@ -938,8 +956,8 @@ txEXTPacket(sched_task_t *task) rpc_SetErr(EBADRPC, "Prepare RPC packet failed"); rpc->call_argc ^= rpc->call_argc; - rpc->call_rep.ret = RPC_ERROR(-1); - rpc->call_rep.eno = RPC_ERROR(rpc_Errno); + RPC_SET_RETURN(rpc, -1); + RPC_SET_ERRNO(rpc, rpc_Errno); } else wlen += ret; } @@ -1111,7 +1129,7 @@ rxBLOB(sched_task_t *task) if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) { rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var)); blob.hdr_cmd = no; - blob.hdr_ret = RPC_ERROR(-1); + RPC_SET_BLOB_RET(&blob, -1); break; } else blob.hdr_len = htonl(b->blob_len); @@ -1122,7 +1140,7 @@ rxBLOB(sched_task_t *task) rpc_srv_blobUnmap(b); } else { blob.hdr_cmd = error; - blob.hdr_ret = RPC_ERROR(-1); + RPC_SET_BLOB_RET(&blob, -1); } break; case set: @@ -1136,19 +1154,19 @@ rxBLOB(sched_task_t *task) rpc_srv_blobUnmap(b); } else { blob.hdr_cmd = error; - blob.hdr_ret = RPC_ERROR(-1); + RPC_SET_BLOB_RET(&blob, -1); } break; case unset: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) { blob.hdr_cmd = error; - blob.hdr_ret = RPC_ERROR(-1); + RPC_SET_BLOB_RET(&blob, -1); } break; default: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd); blob.hdr_cmd = error; - blob.hdr_ret = RPC_ERROR(-1); + RPC_SET_BLOB_RET(&blob, -1); } end: @@ -1161,7 +1179,11 @@ end: static void * flushBLOB(sched_task_t *task) { +#ifdef atomic_load_acq_ptr uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg); +#else + uintptr_t sigArg = *((volatile uintptr_t*) &_glSigArg); +#endif rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task); rpc_blob_t *b, *tmp; @@ -1192,7 +1214,7 @@ acceptBLOBClients(sched_task_t *task) rpc_srv_t *srv = TASK_ARG(task); rpc_cli_t *c = NULL; register int i; - socklen_t salen = sizeof(sockaddr_t); + socklen_t salen = E_SOCKADDR_MAX; int sock; #ifdef TCP_NOPUSH int n = 1; @@ -1259,6 +1281,7 @@ int rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir) { int n = 1; + socklen_t salen; if (!srv || srv->srv_kill) { rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server"); @@ -1282,14 +1305,17 @@ rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s case AF_INET: srv->srv_blob.server.cli_sa.sin.sin_port = htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1); + salen = sizeof srv->srv_blob.server.cli_sa.sin; break; case AF_INET6: srv->srv_blob.server.cli_sa.sin6.sin6_port = htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1); + salen = sizeof srv->srv_blob.server.cli_sa.sin6; break; case AF_LOCAL: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", sizeof srv->srv_blob.server.cli_sa.sun.sun_path); + salen = sizeof srv->srv_blob.server.cli_sa.sun; break; default: AIT_FREE_VAL(&srv->srv_blob.dir); @@ -1322,8 +1348,7 @@ rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s AIT_FREE_VAL(&srv->srv_blob.dir); return -1; } - if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, - srv->srv_blob.server.cli_sa.sa.sa_len) == -1) { + if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, salen) == -1) { LOGERR; close(srv->srv_blob.server.cli_sock); AIT_FREE_VAL(&srv->srv_blob.dir); @@ -1369,10 +1394,10 @@ rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) srv->srv_blob.kill = 1; - schedEnd(&srv->srv_blob.root); - if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL) unlink(srv->srv_blob.server.cli_sa.sun.sun_path); + + schedEnd(&srv->srv_blob.root); } /* @@ -1403,7 +1428,11 @@ rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) /* disabled kqueue support in libaitsched */ struct sigaction sa; +#ifdef atomic_store_rel_ptr atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv); +#else + *((volatile uintptr_t*) &_glSigArg) = (uintptr_t) srv; +#endif memset(&sa, 0, sizeof sa); sigemptyset(&sa.sa_mask); @@ -1469,6 +1498,7 @@ rpc_srv_initServer(u_char InstID, int concurentClients int n = 1; rpc_srv_t *srv = NULL; sockaddr_t sa = E_SOCKADDR_INIT; + socklen_t salen; if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) { rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server"); @@ -1476,7 +1506,7 @@ rpc_srv_initServer(u_char InstID, int concurentClients } if (!Port && proto < SOCK_RAW) Port = RPC_DEFPORT; - if (!e_gethostbyname(csHost, Port, &sa)) + if (!(salen = e_gethostbyname(csHost, Port, &sa))) return NULL; if (!proto) proto = SOCK_STREAM; @@ -1558,8 +1588,7 @@ rpc_srv_initServer(u_char InstID, int concurentClients LOGERR; goto err; } - if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, - srv->srv_server.cli_sa.sa.sa_len) == -1) { + if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, salen) == -1) { LOGERR; goto err; } else @@ -1592,15 +1621,18 @@ rpc_srv_endServer(rpc_srv_t ** __restrict psrv) /* if send kill to blob server */ rpc_srv_endBLOBServer(*psrv); + /* wait for BLOB server done */ + while (*(&(*psrv)->srv_blob.root)) + usleep(1000); (*psrv)->srv_kill = 1; sleep(RPC_SCHED_POLLING); - schedEnd(&(*psrv)->srv_root); - if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL) unlink((*psrv)->srv_server.cli_sa.sun.sun_path); + schedEnd(&(*psrv)->srv_root); + pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx); e_free(*psrv); *psrv = NULL; @@ -1713,6 +1745,7 @@ rpc_srv_execCall(rpc_cli_t * __restrict cli, struct ta rpc_srv_t * rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface) { +#ifndef __linux__ int n = 1; rpc_srv_t *srv = NULL; sockaddr_t sa = E_SOCKADDR_INIT; @@ -1839,6 +1872,10 @@ err: /* error condition */ schedEnd(&srv->srv_root); pthread_mutex_destroy(&srv->srv_funcs.mtx); e_free(srv); +#else + rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!"); +#endif + return NULL; }