Diff for /libaitrpc/src/srv.c between versions 1.1 and 1.24.2.5

version 1.1, 2010/06/18 01:48:06 version 1.24.2.5, 2015/01/18 00:55:05
Line 5 Line 5
 * $Author$  * $Author$
 * $Id$  * $Id$
 *  *
*************************************************************************/**************************************************************************
 The ELWIX and AITNET software is distributed under the following
 terms:
 
 All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
 
 Copyright 2004 - 2015
         by Michael Pounov <misho@elwix.org>.  All rights reserved.
 
 Redistribution and use in source and binary forms, with or without
 modification, are permitted provided that the following conditions
 are met:
 1. Redistributions of source code must retain the above copyright
    notice, this list of conditions and the following disclaimer.
 2. Redistributions in binary form must reproduce the above copyright
    notice, this list of conditions and the following disclaimer in the
    documentation and/or other materials provided with the distribution.
 3. All advertising materials mentioning features or use of this software
    must display the following acknowledgement:
 This product includes software developed by Michael Pounov <misho@elwix.org>
 ELWIX - Embedded LightWeight unIX and its contributors.
 4. Neither the name of AITNET nor the names of its contributors
    may be used to endorse or promote products derived from this software
    without specific prior written permission.
 
 THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 SUCH DAMAGE.
 */
 #include "global.h"  #include "global.h"
   
   
static void */* SOCK_STREAM */
rpc_srv_dispatchCall(void *arg)static void *acceptClients(sched_task_t *);
 static void *closeClient(sched_task_t *);
 static void *rxPacket(sched_task_t *);
 static void *txPacket(sched_task_t *);
 
 /* SOCK_DGRAM */
 static void *freeClient(sched_task_t *);
 static void *rxUDPPacket(sched_task_t *);
 static void *txUDPPacket(sched_task_t *);
 
 /* SOCK_RAW */
 
 /* SOCK_BPF */
 static void *rxBPFPacket(sched_task_t *);
 static void *txBPFPacket(sched_task_t *);
 
 /* SOCK_EXT */
 static void *rxEXTPacket(sched_task_t *);
 static void *txEXTPacket(sched_task_t *);
 
 static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */
         { NULL, NULL, NULL, NULL },                             /* SOCK_RAW */
         { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket },  /* SOCK_BPF */
         { rxEXTPacket, freeClient, rxEXTPacket, txEXTPacket }   /* SOCK_EXT */
 };
 
 /* Global Signal Argument when kqueue support disabled */
 
 static volatile uintptr_t _glSigArg = 0;
 
 
 void
 rpc_freeCli(rpc_cli_t * __restrict c)
 {  {
        rpc_cli_t *c = arg;        rpc_srv_t *s = c->cli_parent;
        rpc_srv_t *s;
        rpc_val_t *vals, *v = NULL;        schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
        rpc_func_t *f;
        struct tagRPCCall *rpc;        /* free buffer */
        struct tagRPCRet rrpc;        AIT_FREE_VAL(&c->cli_buf);
        fd_set fds;
        u_char buf[BUFSIZ], *data;        array_Del(s->srv_clients, c->cli_id, 0);
        int ret, argc, Limit = 0;        if (c)
                 e_free(c);
 }
 
 
 static inline int
 _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
 {
         rpc_cli_t *c = NULL;
         register int i;          register int i;
   
        if (!arg) {        /* check free slots for connect */
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");        for (i = 0; i < array_Size(srv->srv_clients) && 
                         (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
                 /* check for duplicates */
                 if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
                         break;
         if (i >= array_Size(srv->srv_clients))
                 return -1;      /* no more free slots! */
 
         return i;
 }
 
 static rpc_cli_t *
 _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
 {
         rpc_cli_t *c = NULL;
         int n;
 
         if (srv->srv_proto != SOCK_EXT)
                 n = _check4freeslot(srv, sa);
         else
                 n = 0;
         if (n == -1)
                 return NULL;                  return NULL;
           else
                   c = array(srv->srv_clients, n, rpc_cli_t*);
   
           if (!c) {
                   c = e_malloc(sizeof(rpc_cli_t));
                   if (!c) {
                           LOGERR;
                           srv->srv_kill = 1;
                           return NULL;
                   } else {
                           memset(c, 0, sizeof(rpc_cli_t));
                           array_Set(srv->srv_clients, n, c);
                           c->cli_id = n;
                           c->cli_parent = srv;
                   }
   
                   /* alloc empty buffer */
                   AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
           }
   
           return c;
   }
   
   
   static void *
   freeClient(sched_task_t *task)
   {
           rpc_freeCli(TASK_ARG(task));
   
           return NULL;
   }
   
   static void *
   closeClient(sched_task_t *task)
   {
           int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
   
           rpc_freeCli(TASK_ARG(task));
   
           /* close client socket */
           shutdown(sock, SHUT_RDWR);
           close(sock);
           return NULL;
   }
   
   static void *
   txPacket(sched_task_t *task)
   {
           rpc_cli_t *c = TASK_ARG(task);
           rpc_srv_t *s = c->cli_parent;
           rpc_func_t *f = NULL;
           u_char *buf = AIT_GET_BUF(&c->cli_buf);
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           int ret, estlen, wlen = sizeof(struct tagRPCCall);
           struct pollfd pfd;
   #ifdef TCP_SESSION_TIMEOUT
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
           schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
           schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                            TASK_ARG(task), ts, TASK_ARG(task), 0);
   #endif
   
           if (rpc->call_argc) {
                   f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                   if (!f) {
                           rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
   
                           rpc->call_argc ^= rpc->call_argc;
                           rpc->call_rep.ret = RPC_ERROR(-1);
                           rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                   } else {
                           /* calc estimated length */
                           estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
                           if (estlen > AIT_LEN(&c->cli_buf))
                                   AIT_RE_BUF(&c->cli_buf, estlen);
                           buf = AIT_GET_BUF(&c->cli_buf);
                           rpc = (struct tagRPCCall*) buf;
   
                           rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                           /* Go Encapsulate variables */
                           ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                           RPC_RETVARS(c));
                           /* Free return values */
                           ait_freeVars(&c->cli_vars);
                           if (ret == -1) {
                                   rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
   
                                   rpc->call_argc ^= rpc->call_argc;
                                   rpc->call_rep.ret = RPC_ERROR(-1);
                                   rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                           } else
                                   wlen += ret;
                   }
           }
   
           rpc->call_len = htonl(wlen);
   
   #if 0
           /* calculate CRC */
           rpc->call_crc ^= rpc->call_crc;
           rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
   #endif
   
           /* send reply */
           pfd.fd = TASK_FD(task);
           pfd.events = POLLOUT;
           for (; wlen > 0; wlen -= ret, buf += ret) {
                   if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                   pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                           if (ret)
                                   LOGERR;
                           else
                                   rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                           /* close connection */
                           schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                           TASK_ARG(task), 0, NULL, 0);
                           return NULL;
                   }
                   ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
                   if (ret == -1) {
                           /* close connection */
                           schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                           TASK_ARG(task), 0, NULL, 0);
                           return NULL;
                   }
           }
   
           return NULL;
   }
   
   static void *
   execCall(sched_task_t *task)
   {
           rpc_cli_t *c = TASK_ARG(task);
           rpc_srv_t *s = c->cli_parent;
           rpc_func_t *f = NULL;
           array_t *arr = NULL;
           u_char *buf = AIT_GET_BUF(&c->cli_buf);
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           int argc = ntohs(rpc->call_argc);
   
           /* Go decapsulate variables ... */
           if (argc) {
                   arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall), 
                                   AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
                   if (!arr) {
                           rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
   
                           rpc->call_argc ^= rpc->call_argc;
                           rpc->call_rep.ret = RPC_ERROR(-1);
                           rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                           return NULL;
                   }
         } else          } else
                s = c->cli_parent;                arr = NULL;
   
        do {        if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
                FD_ZERO(&fds);                rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                FD_SET(c->cli_sock, &fds);
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);                rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
         } else {
                 /* if client doesn't want reply */
                 argc = RPC_CHK_NOREPLY(rpc);
                 rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
                 if (rpc->call_rep.ret == htonl(-1)) {
                         if (!rpc->call_rep.eno) {
                                 LOGERR;
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         }
                         rpc->call_argc ^= rpc->call_argc;
                 } else {
                         rpc->call_rep.eno ^= rpc->call_rep.eno;
                         if (argc) {
                                 /* without reply */
                                 ait_freeVars(&c->cli_vars);
                                 rpc->call_argc ^= rpc->call_argc;
                         } else {
                                 /* reply */
                                 rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                         }
                 }
         }
 
         array_Destroy(&arr);
         return NULL;
 }
 
 static void *
 rxPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         int len, rlen, noreply, estlen;
 #if 0
         u_short crc;
 #endif
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         struct pollfd pfd;
 #ifdef TCP_SESSION_TIMEOUT
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
 
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
 #endif
 
         memset(buf, 0, sizeof(struct tagRPCCall));
         rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
         if (rlen < sizeof(struct tagRPCCall)) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
                 return NULL;
         } else {
                 estlen = ntohl(rpc->call_len);
                 if (estlen > AIT_LEN(&c->cli_buf))
                         AIT_RE_BUF(&c->cli_buf, estlen);
                 rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
                 buf = AIT_GET_BUF(&c->cli_buf);
                 len = estlen;
         }
 
         /* get next part of packet */
         memset(buf, 0, len);
         pfd.fd = TASK_FD(task);
         pfd.events = POLLIN | POLLPRI;
         for (; len > 0; len -= rlen, buf += rlen) {
                 if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         if (rlen)
                                 LOGERR;
                         else
                                 rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }
                 rlen = recv(TASK_FD(task), buf, len, 0);
                 if (rlen == -1) {
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }
         }
         len = estlen;
 
 #if 0
         /* check integrity of packet */
         crc = ntohs(rpc->call_crc);
         rpc->call_crc ^= rpc->call_crc;
         if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                 return NULL;
         }
 #endif
 
         noreply = RPC_CHK_NOREPLY(rpc);
 
         /* check RPC packet session info */
         if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
 
                 rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(errno);
         } else {
                 /* execute RPC call */
                 schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
         }
 
         /* send RPC reply */
         if (!noreply)
                 schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], 
                                 TASK_ARG(task), TASK_FD(task), rpc, len);
 
         /* lets get next packet */
         schedReadSelf(task);
         return NULL;
 }
 
 static void *
 acceptClients(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         socklen_t salen = sizeof(sockaddr_t);
         int sock;
 #ifdef TCP_SESSION_TIMEOUT
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
 #endif
 
         c = _allocClient(srv, NULL);
         if (!c) {
                 EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                 if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
                         shutdown(sock, SHUT_RDWR);
                         close(sock);
                 }
                 goto end;
         }
 
         /* accept client */
         c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
         if (c->cli_sock == -1) {
                 LOGERR;
                 AIT_FREE_VAL(&c->cli_buf);
                 array_Del(srv->srv_clients, c->cli_id, 42);
                 goto end;
         } else
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
 
 #ifdef TCP_SESSION_TIMEOUT
         /* armed timer for close stateless connection */
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
         schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, 
                         ts, c, 0);
 #endif
         schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c, 
                         c->cli_sock, NULL, 0);
 end:
         schedReadSelf(task);
         return NULL;
 }
 
 
 static void *
 txUDPPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, estlen, wlen = sizeof(struct tagRPCCall);
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         struct pollfd pfd;
 
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         /* calc estimated length */
                         estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
                         if (estlen > AIT_LEN(&c->cli_buf))
                                 AIT_RE_BUF(&c->cli_buf, estlen);
                         buf = AIT_GET_BUF(&c->cli_buf);
                         rpc = (struct tagRPCCall*) buf;
 
                         rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                         /* Go Encapsulate variables */
                         ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                         RPC_RETVARS(c));
                         /* Free return values */
                         ait_freeVars(&c->cli_vars);
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }
         }
 
         rpc->call_len = htonl(wlen);
 
         /* calculate CRC */
         rpc->call_crc ^= rpc->call_crc;
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         pfd.fd = TASK_FD(task);
         pfd.events = POLLOUT;
         for (; wlen > 0; wlen -= ret, buf += ret) {
                 if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         if (ret)
                                 LOGERR;
                         else
                                 rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }
                 ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL, 
                                 &c->cli_sa.sa, c->cli_sa.sa.sa_len);
                 if (ret == -1) {                  if (ret == -1) {
                        ret = -2;                        /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }                  }
                memset(&rrpc, 0, sizeof rrpc);        }
                memset(buf, 0, BUFSIZ);
                if ((ret = recv(c->cli_sock, buf, BUFSIZ, 0)) == -1) {        return NULL;
                        LOGERR;}
                        ret = -3;
                        break;static void *
 rxUDPPacket(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         int len, rlen, noreply, estlen;
         u_short crc;
         u_char *buf, b[sizeof(struct tagRPCCall)];
         struct tagRPCCall *rpc = (struct tagRPCCall*) b;
         sockaddr_t sa;
         socklen_t salen;
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         struct pollfd pfd;
 
         /* receive connect packet */
         salen = sa.ss.ss_len = sizeof(sockaddr_t);
         rlen = recvfrom(TASK_FD(task), b, sizeof b, MSG_PEEK, &sa.sa, &salen);
         if (rlen < sizeof(struct tagRPCCall)) {
                 rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                 goto end;
         }
 
         c = _allocClient(srv, &sa);
         if (!c) {
                 EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                 usleep(2000);   /* blocked client delay */
                 goto end;
         } else {
                 estlen = ntohl(rpc->call_len);
                 if (estlen > AIT_LEN(&c->cli_buf))
                         AIT_RE_BUF(&c->cli_buf, estlen);
                 rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
                 buf = AIT_GET_BUF(&c->cli_buf);
                 len = estlen;
 
                 c->cli_sock = TASK_FD(task);
                 memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
 
                 /* armed timer for close stateless connection */
                 schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                 schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, ts, c, 0);
         }
 
         /* get next part of packet */
         memset(buf, 0, len);
         pfd.fd = TASK_FD(task);
         pfd.events = POLLIN | POLLPRI;
         for (; len > 0; len -= rlen, buf += rlen) {
                 if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         if (rlen)
                                 LOGERR;
                         else
                                 rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                         c, 0, NULL, 0);
                         goto end;
                 }                  }
                if (!ret) {         // receive EOF                salen = sa.ss.ss_len = sizeof(sockaddr_t);
                        ret = 0;                rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);
                        break;                if (rlen == -1) {
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                         c, 0, NULL, 0);
                         goto end;
                 }                  }
                if (ret < sizeof(struct tagRPCCall)) {                if (e_addrcmp(&c->cli_sa, &sa, 42))
                        rpc_SetErr(EMSGSIZE, "Error:: too short RPC packet ...\n");                        rlen ^= rlen;   /* skip if arrive from different address */
                        ret = -4;        }
                        break;        len = estlen;
                } else
         /* check integrity of packet */
         crc = ntohs(rpc->call_crc);
         rpc->call_crc ^= rpc->call_crc;
         if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, 0, NULL, 0);
                 goto end;
         }
 
         noreply = RPC_CHK_NOREPLY(rpc);
 
         /* check RPC packet session info */
         if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
 
                 rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(errno);
         } else {
                 /* execute RPC call */
                 schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
         }
 
         /* send RPC reply */
         if (!noreply)
                 schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);
 end:
         schedReadSelf(task);
         return NULL;
 }
 
 
 static void *
 txBPFPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, len, wlen = sizeof(struct tagRPCCall);
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         struct ether_header *eh;
         ait_val_t b = AIT_VAL_INIT;
 
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         /* calc estimated length */
                         len = ait_resideVars(RPC_RETVARS(c)) + wlen;
                         if (len > AIT_LEN(&c->cli_buf))
                                 AIT_RE_BUF(&c->cli_buf, len);
                         buf = AIT_GET_BUF(&c->cli_buf);
                         rpc = (struct tagRPCCall*) buf;                          rpc = (struct tagRPCCall*) buf;
                // check RPC packet session info
                if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) {                        rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                        rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n");                        /* Go Encapsulate variables */
                        ret = -5;                        ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                        break;                                        RPC_RETVARS(c));
                         /* Free return values */
                         ait_freeVars(&RPC_RETVARS(c));
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }                  }
                // RPC is OK! Go decapsulate variables ...        }
                if (rpc->call_argc) {
                        v = (rpc_val_t*) (buf + sizeof(struct tagRPCCall));        rpc->call_len = htonl(wlen);
                        // RPC received variables types OK!
                        data = (u_char*) v + rpc->call_argc * sizeof(rpc_val_t);        /* calculate CRC */
                        for (i = 0; i < rpc->call_argc; i++) {        rpc->call_crc ^= rpc->call_crc;
                                switch (v[i].val_type) {        rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
                                        case buffer:
                                                v[i].val.buffer = data;        /* send reply */
                                                data += v[i].val_len;        AIT_SET_BUF(&b, NULL, MIN(wlen, s->srv_netbuf) + ETHER_HDR_LEN);
                                                break;        eh = (struct ether_header*) AIT_GET_BUF(&b);
                                        case string:        memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
                                                v[i].val.string = (int8_t*) data;        eh->ether_type = htons(RPC_DEFPORT);
                                                data += v[i].val_len + 1;        memcpy(eh + 1, buf, MIN(wlen, s->srv_netbuf));
                                                break;
                                        case array:        ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
                                                v[i].val.array = (int8_t**) data;        AIT_FREE_VAL(&b);
                                                data += v[i].val_len;        if (ret == -1) {
                                                break;                /* close connection */
                                        default:                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                                break;                                TASK_ARG(task), 0, NULL, 0);
                                }                return NULL;
                        }        }
 
         return NULL;
 }
 
 static void *
 rxBPFPacket(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         int len, rlen, noreply;
         u_short crc;
         struct tagRPCCall *rpc;
         sockaddr_t sa;
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         struct bpf_hdr *h;
         struct ether_header *eh;
         ait_val_t b = AIT_VAL_INIT;
 
         /* receive connect packet */
         AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
         rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
         h = (struct bpf_hdr*) AIT_GET_BUF(&b);
         rlen -= h->bh_hdrlen;
         if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen || 
                         rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) {
                 rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                 goto end;
         } else {
                 rlen = h->bh_caplen;
                 eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
                 rlen -= ETHER_HDR_LEN;
                 rpc = (struct tagRPCCall*) (eh + 1);
                 if (eh->ether_type != ntohs(RPC_DEFPORT))
                         goto end;
                 else
                         e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
         }
 
         c = _allocClient(srv, &sa);
         if (!c) {
                 EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                 usleep(2000);   /* blocked client delay */
                 goto end;
         } else {
                 len = ntohl(rpc->call_len);
                 if (len > AIT_LEN(&c->cli_buf))
                         AIT_RE_BUF(&c->cli_buf, len);
                 memcpy(AIT_GET_BUF(&c->cli_buf), rpc, AIT_LEN(&c->cli_buf));
                 rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
 
                 c->cli_sock = TASK_FD(task);
                 memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
 
                 /* armed timer for close stateless connection */
                 schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                 schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, ts, c, 0);
         }
 
         /* check integrity of packet */
         crc = ntohs(rpc->call_crc);
         rpc->call_crc ^= rpc->call_crc;
         if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, 0, NULL, 0);
                 goto end;
         }
 
         noreply = RPC_CHK_NOREPLY(rpc);
 
         /* check RPC packet session info */
         if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
 
                 rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(errno);
         } else {
                 /* execute RPC call */
                 schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
         }
 
         /* send RPC reply */
         if (!noreply)
                 schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);
 end:
         AIT_FREE_VAL(&b);
         schedReadSelf(task);
         return NULL;
 }
 
 
 static void *
 txEXTPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, estlen, wlen = sizeof(struct tagRPCCall);
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         struct pollfd pfd;
 
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         /* calc estimated length */
                         estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
                         if (estlen > AIT_LEN(&c->cli_buf))
                                 AIT_RE_BUF(&c->cli_buf, estlen);
                         buf = AIT_GET_BUF(&c->cli_buf);
                         rpc = (struct tagRPCCall*) buf;
 
                         rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                         /* Go Encapsulate variables */
                         ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                         RPC_RETVARS(c));
                         /* Free return values */
                         ait_freeVars(&c->cli_vars);
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }                  }
           }
   
                argc = 0;        rpc->call_len = htonl(wlen);
                vals = NULL;
                if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) {        /* send reply */
                        rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n");        pfd.fd = TASK_FD(task);
                        ret = -6;        pfd.events = POLLOUT;
                } else        for (; wlen > 0; wlen -= ret, buf += ret) {
                        if ((ret = rpc_srv_execCall(s, f, rpc, v)) == -1)                if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                ret = -6;                                pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         if (ret)
                                 LOGERR;
                         else                          else
                                argc = rpc_srv_getValsCall(f, &vals);                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }
                 ret = write(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf));
                 if (ret == -1) {
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }
         }
   
                memcpy(&rrpc.ret_session, &rpc->call_session, sizeof rrpc.ret_session);        return NULL;
                rrpc.ret_tag = rpc->call_tag;}
                rrpc.ret_hash = rpc->call_hash; 
                rrpc.ret_errno = rpc_Errno; 
                rrpc.ret_retcode = ret; 
                rrpc.ret_argc = argc; 
   
                memset(buf, 0, BUFSIZ);static void *
                memcpy(buf, &rrpc, (Limit = sizeof rrpc));rxEXTPacket(sched_task_t *task)
                if (argc && vals) {{
                        v = (rpc_val_t*) (buf + sizeof rrpc);        rpc_srv_t *srv = TASK_ARG(task);
                        memcpy(v, vals, argc * sizeof(rpc_val_t));        rpc_cli_t *c = NULL;
                        Limit += argc * sizeof(rpc_val_t);        int len, rlen, noreply, estlen;
                        data = (u_char*) v + argc * sizeof(rpc_val_t);        u_char *buf, b[sizeof(struct tagRPCCall)];
                        for (ret = i = 0; i < argc; i++) {        struct tagRPCCall *rpc = (struct tagRPCCall*) b;
                                switch (vals[i].val_type) {        struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
                                        case buffer:        struct pollfd pfd;
                                                if (ret || Limit + vals[i].val_len > BUFSIZ) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc.ret_retcode = ret = -7; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.buffer, vals[i].val_len);        /* receive connect packet */
                                                data += vals[i].val_len;        rlen = read(TASK_FD(task), b, sizeof b);
                                                Limit += vals[i].val_len;        if (rlen < sizeof(struct tagRPCCall)) {
                                                break;                rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                                        case string:                goto end;
                                                if (ret || Limit + vals[i].val_len + 1 > BUFSIZ) {        }
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc.ret_retcode = ret = -7; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.string, vals[i].val_len + 1);        c = _allocClient(srv, (sockaddr_t*) -1);
                                                data += vals[i].val_len + 1;        if (!c) {
                                                Limit += vals[i].val_len + 1;                EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                                                break;                usleep(2000);   /* blocked client delay */
                                        case array:                goto end;
                                                if (ret || Limit + vals[i].val_len > BUFSIZ) {        } else {
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");                estlen = ntohl(rpc->call_len);
                                                        rrpc.ret_retcode = ret = -7;                if (estlen > AIT_LEN(&c->cli_buf))
                                                        rrpc.ret_argc = 0;                        AIT_RE_BUF(&c->cli_buf, estlen);
                                                        break;                rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
                                                }                memcpy(rpc, b, sizeof b);
                 buf = (u_char*) (rpc + 1);
                 len = estlen;
   
                                                memcpy(data, vals[i].val.array, vals[i].val_len);                c->cli_sock = TASK_FD(task);
                                                data += vals[i].val_len; 
                                                Limit += vals[i].val_len; 
                                                break; 
                                        default: 
                                                break; 
                                } 
   
                                RPC_FREE_VAL(&vals[i]);                /* armed timer for close stateless connection */
                        }                schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                 schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, ts, c, 0);
         }
 
         /* get next part of packet */
         memset(buf, 0, len);
         pfd.fd = TASK_FD(task);
         pfd.events = POLLIN | POLLPRI;
         for (; len > 0; len -= rlen, buf += rlen) {
                 if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         if (rlen)
                                 LOGERR;
                         else
                                 rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                         c, 0, NULL, 0);
                         goto end;
                 }                  }
                   rlen = read(TASK_FD(task), buf, len);
                   if (rlen == -1) {
                           /* close connection */
                           schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                           c, 0, NULL, 0);
                           goto end;
                   }
           }
           len = estlen;
   
                if ((ret = send(c->cli_sock, buf, Limit, 0)) == -1) {        noreply = RPC_CHK_NOREPLY(rpc);
                        LOGERR;
                        ret = -8;        /* check RPC packet session info */
         if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
 
                 rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(errno);
         } else {
                 /* execute RPC call */
                 schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
         }
 
         /* send RPC reply */
         if (!noreply)
                 schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);
 end:
         schedReadSelf(task);
         return NULL;
 }
 
 /* ------------------------------------------------------ */
 
 void
 rpc_freeBLOBCli(rpc_cli_t * __restrict c)
 {
         rpc_srv_t *s = c->cli_parent;
 
         schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
 
         /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);
 
         array_Del(s->srv_blob.clients, c->cli_id, 0);
         if (c)
                 e_free(c);
 }
 
 
 static void *
 closeBLOBClient(sched_task_t *task)
 {
         int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
 
         rpc_freeBLOBCli(TASK_ARG(task));
 
         /* close client socket */
         shutdown(sock, SHUT_RDWR);
         close(sock);
         return NULL;
 }
 
 static void *
 txBLOB(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         int wlen = sizeof(struct tagBLOBHdr);
 
         /* send reply */
         wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
         if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
                 /* close blob connection */
                 schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
         }
 
         return NULL;
 }
 
 static void *
 rxBLOB(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_blob_t *b;
         struct tagBLOBHdr blob;
         int rlen;
 
         memset(&blob, 0, sizeof blob);
         rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
         if (rlen < 1) {
                 /* close blob connection */
                 schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
                 return NULL;
         }
 
         /* check BLOB packet */
         if (rlen < sizeof(struct tagBLOBHdr)) {
                 rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
 
                 schedReadSelf(task);
                 return NULL;
         }
 
         /* check RPC packet session info */
         if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                 blob.hdr_cmd = error;
                 goto end;
         }
 
         /* Go to proceed packet ... */
         switch (blob.hdr_cmd) {
                 case get:
                         if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
                                 rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
                                 blob.hdr_cmd = no;
                                 blob.hdr_ret = RPC_ERROR(-1);
                                 break;
                         } else
                                 blob.hdr_len = htonl(b->blob_len);
 
                         if (rpc_srv_blobMap(s, b) != -1) {
                                 /* deliver BLOB variable to client */
                                 blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
                                 rpc_srv_blobUnmap(b);
                         } else {
                                 blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;                          break;
                }                case set:
                if (ret != Limit) {                        if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len), 
                        rpc_SetErr(EBADMSG, "Error:: in send RPC request, should be send %d bytes, "                                                        ntohl(blob.hdr_ret)))) {
                                        "really is %d\n", Limit, ret);                                /* set new BLOB variable for reply :) */
                        ret = -9;                                blob.hdr_var = htonl(b->blob_var);
 
                                 /* receive BLOB from client */
                                 blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
                                 rpc_srv_blobUnmap(b);
                         } else {
                                 blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;                          break;
                   case unset:
                           if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
                                   blob.hdr_cmd = error;
                                   blob.hdr_ret = RPC_ERROR(-1);
                           }
                           break;
                   default:
                           rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
                           blob.hdr_cmd = error;
                           blob.hdr_ret = RPC_ERROR(-1);
           }
   
   end:
           memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
           schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
           schedReadSelf(task);
           return NULL;
   }
   
   static void *
   flushBLOB(sched_task_t *task)
   {
           uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
           rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
           rpc_blob_t *b, *tmp;
   
           TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                   TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
   
                   rpc_srv_blobFree(srv, b);
                   e_free(b);
           }
   
           if (!schedSignalSelf(task)) {
                   /* disabled kqueue support in libaitsched */
                   struct sigaction sa;
   
                   memset(&sa, 0, sizeof sa);
                   sigemptyset(&sa.sa_mask);
                   sa.sa_handler = (void (*)(int)) flushBLOB;
                   sa.sa_flags = SA_RESTART | SA_RESETHAND;
                   sigaction(SIGFBLOB, &sa, NULL);
           }
   
           return NULL;
   }
   
   static void *
   acceptBLOBClients(sched_task_t *task)
   {
           rpc_srv_t *srv = TASK_ARG(task);
           rpc_cli_t *c = NULL;
           register int i;
           socklen_t salen = sizeof(sockaddr_t);
           int sock;
   #ifdef TCP_NOPUSH
           int n = 1;
   #endif
   
           /* check free slots for connect */
           for (i = 0; i < array_Size(srv->srv_blob.clients) && 
                           (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
           if (c) {        /* no more free slots! */
                   EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
                   if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
                           shutdown(sock, SHUT_RDWR);
                           close(sock);
                 }                  }
        } while (ret > -1);                goto end;
         }
   
        shutdown(c->cli_sock, SHUT_RDWR);        c = e_malloc(sizeof(rpc_cli_t));
        close(c->cli_sock);        if (!c) {
        memset(c, 0, sizeof(rpc_cli_t));                LOGERR;
        return (void*) ret;                srv->srv_kill = srv->srv_blob.kill = 1;
                 return NULL;
         } else {
                 memset(c, 0, sizeof(rpc_cli_t));
                 array_Set(srv->srv_blob.clients, i, c);
                 c->cli_id = i;
                 c->cli_parent = srv;
         }
 
         /* alloc empty buffer */
         AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
 
         /* accept client */
         c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
         if (c->cli_sock == -1) {
                 LOGERR;
                 AIT_FREE_VAL(&c->cli_buf);
                 array_Del(srv->srv_blob.clients, i, 42);
                 goto end;
         } else {
 #ifdef TCP_NOPUSH
                 setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
 #endif
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
         }
 
         schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
 end:
         schedReadSelf(task);
         return NULL;
 }  }
   
// -------------------------------------------------/* ------------------------------------------------------ */
   
 /*  /*
 * rpc_srv_initServer() Init & create RPC Server * rpc_srv_initBLOBServer() - Init & create BLOB Server
 * @regProgID = ProgramID for authentication & recognition *
 * @regProcID = ProcessID for authentication & recognition * @srv = RPC server instance
 * @concurentClients = Concurent clients at same time to this server 
 * @family = Family socket type, AF_INET or AF_INET6 
 * @csHost = Host name or IP address for bind server, if NULL any address 
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
 * return: NULL == error or !=NULL bind and created RPC server instance * @diskDir = Disk place for BLOB file objects
  * return: -1 == error or 0 bind and created BLOB server instance
  */   */
rpc_srv_t *int
rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients, rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
                u_short family, const char *csHost, u_short Port) 
 {  {
         rpc_srv_t *srv = NULL;  
         int n = 1;          int n = 1;
         struct hostent *host = NULL;  
         struct sockaddr_in sin;  
         struct sockaddr_in6 sin6;  
   
        if (!concurentClients || !regProgID || (family != AF_INET && family != AF_INET6)) {        if (!srv || srv->srv_kill) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
                return NULL;                return -1;
         }          }
        if (!Port)
                Port = RPC_DEFPORT;        memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
        if (csHost) {        if (access(diskDir, R_OK | W_OK) == -1) {
                host = gethostbyname2(csHost, family);                LOGERR;
                if (!host) {                return -1;
                        rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));        } else
                        return NULL;                AIT_SET_STR(&srv->srv_blob.dir, diskDir);
                }
        }        /* init blob list */
        switch (family) {        TAILQ_INIT(&srv->srv_blob.blobs);
 
         srv->srv_blob.server.cli_parent = srv;
 
         memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
         switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
                 case AF_INET:                  case AF_INET:
                        memset(&sin, 0, sizeof sin);                        srv->srv_blob.server.cli_sa.sin.sin_port = 
                        sin.sin_len = sizeof sin;                                htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
                        sin.sin_family = family; 
                        sin.sin_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sin.sin_addr, host->h_addr, host->h_length); 
                         break;                          break;
                 case AF_INET6:                  case AF_INET6:
                        memset(&sin6, 0, sizeof sin6);                        srv->srv_blob.server.cli_sa.sin6.sin6_port = 
                        sin6.sin6_len = sizeof sin6;                                htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
                        sin6.sin6_family = family; 
                        sin6.sin6_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sin6.sin6_addr, host->h_addr, host->h_length); 
                         break;                          break;
                   case AF_LOCAL:
                           strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", 
                                           sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
                           break;
                 default:                  default:
                        rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");                        AIT_FREE_VAL(&srv->srv_blob.dir);
                        return NULL;                        return -1;
         }          }
   
        srv = malloc(sizeof(rpc_srv_t));        /* create BLOB server socket */
        if (!srv) {        srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
         if (srv->srv_blob.server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                return NULL;                AIT_FREE_VAL(&srv->srv_blob.dir);
        } else                return -1;
                memset(srv, 0, sizeof(rpc_srv_t));        }
        if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
        srv->srv_numcli = concurentClients; 
        srv->srv_session.sess_version = RPC_VERSION; 
        srv->srv_session.sess_program = regProgID; 
        srv->srv_session.sess_process = regProcID; 
 
        srv->srv_server.cli_parent = srv; 
        if (family == AF_INET) 
                memcpy(&srv->srv_server.cli_sa, &sin, sizeof srv->srv_server.cli_sa); 
        else 
                memcpy(&srv->srv_server.cli_sa, &sin6, sizeof srv->srv_server.cli_sa); 
        srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0); 
        if (srv->srv_server.cli_sock == -1) { 
                 LOGERR;                  LOGERR;
                free(srv);                close(srv->srv_blob.server.cli_sock);
                return NULL;                AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }          }
        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {        n = srv->srv_netbuf;
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                close(srv->srv_blob.server.cli_sock);
                free(srv);                AIT_FREE_VAL(&srv->srv_blob.dir);
                return NULL;                return -1;
         }          }
        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {        if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                close(srv->srv_blob.server.cli_sock);
                free(srv);                AIT_FREE_VAL(&srv->srv_blob.dir);
                return NULL;                return -1;
         }          }
        if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, 
        srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));                                srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
        if (!srv->srv_clients) { 
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                close(srv->srv_blob.server.cli_sock);
                free(srv);                AIT_FREE_VAL(&srv->srv_blob.dir);
                return NULL;                return -1;
         } else          } else
                memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));                fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
   
        rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0);
        rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0);        /* allocate pool for concurent blob clients */
        rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0);        srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
        return srv;        if (!srv->srv_blob.clients) {
                 rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
 
         /* init blob scheduler */
         srv->srv_blob.root = schedBegin();
         if (!srv->srv_blob.root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 array_Destroy(&srv->srv_blob.clients);
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
 
         return 0;
 }  }
   
 /*  /*
 * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: none   * return: none
  */   */
 void  void
rpc_srv_endServer(rpc_srv_t * __restrict srv)rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
 {  {
        rpc_cli_t *c;        if (!srv)
        register int i; 
        rpc_func_t *f; 
 
        if (!srv) { 
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n"); 
                 return;                  return;
         }  
   
        while ((f = srv->srv_funcs)) {        srv->srv_blob.kill = 1;
                srv->srv_funcs = f->func_next; 
                free(f); 
        } 
   
        for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)        schedEnd(&srv->srv_blob.root);
                if (c->cli_sa.sa_family) 
                        shutdown(c->cli_sock, SHUT_RDWR); 
        close(srv->srv_server.cli_sock); 
 
        if (srv->srv_clients) { 
                free(srv->srv_clients); 
                srv->srv_numcli = 0; 
        } 
 
        free(srv); 
        srv = NULL; 
 }  }
   
 /*  /*
 * rpc_srv_execServer() Execute Main server loop and wait for clients requests * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_execServer(rpc_srv_t * __restrict srv)rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
 {  {
         socklen_t salen = sizeof(struct sockaddr);  
         register int i;  
         rpc_cli_t *c;          rpc_cli_t *c;
           register int i;
           rpc_blob_t *b, *tmp;
           struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
        if (!srv) {        if (!srv || srv->srv_kill) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
                 return -1;                  return -1;
         }          }
   
        if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {        if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
        while (42) {        if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
                for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)                /* disabled kqueue support in libaitsched */
                        if (!c->cli_sa.sa_family)                struct sigaction sa;
                                break; 
                if (c && c->cli_sa.sa_family && c->cli_parent) { 
                        usleep(1000000); 
                        continue; 
                } 
                c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen); 
                if (c->cli_sock == -1) { 
                        printf("%s(%d): #%d - %s\n", __func__, __LINE__, errno, strerror(errno)); 
                        LOGERR; 
                        continue; 
                } else 
                        c->cli_parent = srv; 
   
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {                atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
                        LOGERR;
                        continue;                memset(&sa, 0, sizeof sa);
                }                sigemptyset(&sa.sa_mask);
                 sa.sa_handler = (void (*)(int)) flushBLOB;
                 sa.sa_flags = SA_RESTART | SA_RESETHAND;
                 sigaction(SIGFBLOB, &sa, NULL);
         }          }
   
        /* not reached !!! */        if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
        return 0;                                srv->srv_blob.server.cli_sock, NULL, 0)) {
}                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;
         }
   
// ---------------------------------------------------------        schedPolling(srv->srv_blob.root, &ts, NULL);
         /* main rpc loop */
         schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
   
/*        /* detach blobs */
 * rpc_srv_freeValsCall() Free return variables for RPC call        TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
 * @call = RPC function call                TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
 * return: none 
 */ 
inline void 
rpc_srv_freeValsCall(rpc_func_t * __restrict call) 
{ 
        rpc_srv_declValsCall(call, 0); 
} 
   
/*                rpc_srv_blobFree(srv, b);
 * rpc_srv_declValsCall() Declare return variables for RPC call                e_free(b);
 * @call = RPC function call        }
 * @return_vals = Number of return variables 
 * return: -1 error, !=-1 ok 
 */ 
inline int 
rpc_srv_declValsCall(rpc_func_t * __restrict call, int return_vals) 
{ 
        void *ptr; 
   
        if (!call || return_vals < 0) {        /* close all clients connections & server socket */
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t declare return variables for RPC call...\n");        for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
                return -1;                c = array(srv->srv_blob.clients, i, rpc_cli_t*);
        } else                if (c) {
                call->func_args = return_vals;                        shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);
   
        if (!return_vals) {                        schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
                if (call->func_vals) {                        AIT_FREE_VAL(&c->cli_buf);
                        free(call->func_vals); 
                        call->func_vals = NULL; 
                 }                  }
        } else {                array_Del(srv->srv_blob.clients, i, 42);
                ptr = realloc(call->func_vals, return_vals * sizeof(rpc_val_t)); 
                if (!ptr) { 
                        LOGERR; 
                        return -1; 
                } else 
                        call->func_vals = ptr; 
         }          }
           array_Destroy(&srv->srv_blob.clients);
   
        return call->func_args;        close(srv->srv_blob.server.cli_sock);
 
         AIT_FREE_VAL(&srv->srv_blob.dir);
         return 0;
 }  }
   
   
 /*  /*
 * rpc_srv_delValsCall() Clean values from return variables of RPC call * rpc_srv_initServer() - Init & create RPC Server
 * @call = RPC function call *
 * return: -1 error, !=-1 Returned number of cleaned RPC variables * @InstID = Instance for authentication & recognition
  * @concurentClients = Concurent clients at same time to this server
  * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
  * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected
  * @proto = Protocol, if == 0 choose SOCK_STREAM
  * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
inline intrpc_srv_t *
rpc_srv_delValsCall(rpc_func_t * __restrict call)rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf, 
                 const char *csHost, u_short Port, int proto)
 {  {
        if (!call) {        int n = 1;
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t delete return variables ...\n");        rpc_srv_t *srv = NULL;
                return -1;        sockaddr_t sa = E_SOCKADDR_INIT;
 
         if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
                 rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                 return NULL;
         }          }
           if (!e_gethostbyname(csHost, Port, &sa))
                   return NULL;
           if (!Port)
                   Port = RPC_DEFPORT;
           if (!proto)
                   proto = SOCK_STREAM;
           if (netBuf < RPC_MIN_BUFSIZ)
                   netBuf = BUFSIZ;
           else
                   netBuf = E_ALIGN(netBuf, 2);    /* align netBuf length */
   
        memset(call->func_vals, 0, call->func_args * sizeof(rpc_val_t));#ifdef HAVE_SRANDOMDEV
        return call->func_args;        srandomdev();
}#else
         time_t tim;
   
/*        srandom((time(&tim) ^ getpid()));
 * rpc_srv_copyValsCall() Copy return variables for RPC call to new variable#endif
 * @call = RPC function call
 * @newvals = New allocated variables array, must be free after use        srv = e_malloc(sizeof(rpc_srv_t));
 * return: -1 error, !=-1 Returned number of copied RPC variables        if (!srv) {
 */                LOGERR;
inline int                return NULL;
rpc_srv_copyValsCall(rpc_func_t * __restrict call, rpc_val_t ** __restrict newvals)        } else
{                memset(srv, 0, sizeof(rpc_srv_t));
        if (!call || !newvals) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t copy return variables to new array\n");        srv->srv_proto = proto;
                return -1;        srv->srv_netbuf = netBuf;
         srv->srv_session.sess_version = RPC_VERSION;
         srv->srv_session.sess_instance = InstID;
 
         srv->srv_server.cli_parent = srv;
         memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
 
         /* init functions */
         pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
         SLIST_INIT(&srv->srv_funcs);
         AVL_INIT(&srv->srv_funcs);
 
         /* init scheduler */
         srv->srv_root = schedBegin();
         if (!srv->srv_root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;
         }          }
   
        *newvals = calloc(call->func_args, sizeof(rpc_val_t));        /* init pool for clients */
        if (!*newvals) {        srv->srv_clients = array_Init(concurentClients);
         if (!srv->srv_clients) {
                 rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                 schedEnd(&srv->srv_root);
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;
         }
 
         /* create server socket */
         srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
         if (srv->srv_server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                return -1;                array_Destroy(&srv->srv_clients);
                 schedEnd(&srv->srv_root);
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;
         }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;
                 goto err;
         }
         n = srv->srv_netbuf;
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;
                 goto err;
         }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                 LOGERR;
                 goto err;
         }
         if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
                                 srv->srv_server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;
                 goto err;
         } else          } else
                memcpy(*newvals, call->func_vals, call->func_args * sizeof(rpc_val_t));                fcntl(srv->srv_server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
   
        return call->func_args;        rpc_register_srvPing(srv);
 
         return srv;
 err:    /* error condition */
         close(srv->srv_server.cli_sock);
         array_Destroy(&srv->srv_clients);
         schedEnd(&srv->srv_root);
         pthread_mutex_destroy(&srv->srv_funcs.mtx);
         e_free(srv);
         return NULL;
 }  }
   
 /*  /*
 * rpc_srv_getValsCall() Get return variables for RPC call * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
 * @call = RPC function call *
 * @vals = Returned variables, may be NULL * @psrv = RPC Server instance
 * return: -1 error, !=-1 Number of returned variables * return: none
  */   */
inline intvoid
rpc_srv_getValsCall(rpc_func_t * __restrict call, rpc_val_t ** __restrict vals)rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
 {  {
        if (!call) {        if (!psrv || !*psrv)
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t get return variables ...\n");                return;
                return -1; 
        } 
   
        if (vals)        /* if send kill to blob server */
                *vals = call->func_vals;        rpc_srv_endBLOBServer(*psrv);
        return call->func_args; 
} 
   
// ---------------------------------------------------------        (*psrv)->srv_kill = 1;
         sleep(RPC_SCHED_POLLING);
   
           schedEnd(&(*psrv)->srv_root);
   
           pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
           e_free(*psrv);
           *psrv = NULL;
   }
   
 /*  /*
 * rpc_srv_registerCall() Register call to RPC server * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
 * @csModule = Module name, if NULL self binary * return: -1 error or 0 ok, infinite loop ...
 * @csFunc = Function name 
 * @args = Number of function arguments 
 * return: -1 error or 0 register ok 
  */   */
 int  int
rpc_srv_registerCall(rpc_srv_t * __restrict srv, const char *csModule, const char *csFunc, u_char args)rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
        rpc_func_t *func;        rpc_cli_t *c;
        u_char str[MAXPATHLEN + UCHAR_MAX + 1];        register int i;
         rpc_func_t *f;
         struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
        memset(str, 0, MAXPATHLEN + UCHAR_MAX + 1);        if (!srv) {
        if (!srv || !csFunc) {                rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t register function to RPC server ...\n"); 
                 return -1;                  return -1;
         }          }
        if (!(func = malloc(sizeof(rpc_func_t)))) {
                LOGERR;        if (srv->srv_proto == SOCK_STREAM)
                 if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
                         LOGERR;
                         return -1;
                 }
 
         if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, 
                                 srv->srv_server.cli_sock, NULL, 0)) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;                  return -1;
         } else {  
                 memset(func, 0, sizeof(rpc_func_t));  
                 strlcpy((char*) func->func_name, csFunc, UCHAR_MAX + 1);  
         }          }
        if (csModule) {
                strlcpy((char*) func->func_file, csModule, MAXPATHLEN);        schedPolling(srv->srv_root, &ts, NULL);
                strlcpy((char*) str, csModule, MAXPATHLEN + UCHAR_MAX + 1);        /* main rpc loop */
         schedRun(srv->srv_root, &srv->srv_kill);
 
         /* close all clients connections & server socket */
         for (i = 0; i < array_Size(srv->srv_clients); i++) {
                 c = array(srv->srv_clients, i, rpc_cli_t*);
                 if (c) {
                         if (srv->srv_proto == SOCK_STREAM) {
                                 shutdown(c->cli_sock, SHUT_RDWR);
                                 close(c->cli_sock);
                         }
 
                         schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
                         ait_freeVars(&RPC_RETVARS(c));
                         AIT_FREE_VAL(&c->cli_buf);
                 }
                 array_Del(srv->srv_clients, i, 42);
         }          }
        strlcat((char*) str, "__", MAXPATHLEN + UCHAR_MAX + 1);        array_Destroy(&srv->srv_clients);
        strlcat((char*) str, csFunc, MAXPATHLEN + UCHAR_MAX + 1); 
   
        func->func_tag = crcFletcher16((u_short*) str, (MAXPATHLEN + UCHAR_MAX + 1) / 2);        if (srv->srv_proto != SOCK_EXT)
        func->func_hash = hash_fnv((char*) str, MAXPATHLEN + UCHAR_MAX + 1);                close(srv->srv_server.cli_sock);
   
        if (rpc_srv_declValsCall(func, args) == -1) {        /* detach exported calls */
                free(func);        RPC_FUNCS_LOCK(&srv->srv_funcs);
                return -1;        while ((f = SLIST_FIRST(&srv->srv_funcs))) {
                 SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
 
                 AIT_FREE_VAL(&f->func_name);
                 e_free(f);
         }          }
           srv->srv_funcs.avlh_root = NULL;
           RPC_FUNCS_UNLOCK(&srv->srv_funcs);
   
         func->func_next = srv->srv_funcs;  
         srv->srv_funcs = func;  
         return 0;          return 0;
 }  }
   
   
 /*  /*
 * rpc_srv_unregisterCall() Unregister call from RPC server * rpc_srv_execCall() Execute registered call from RPC server
 * @srv = RPC Server instance *
 * @csModule = Module name, if NULL self binary * @cli = RPC client
 * @csFunc = Function name * @rpc = IN RPC call structure
 * return: -1 error, 0 not found call, 1 unregister ok * @funcname = Execute RPC function
  * @args = IN RPC calling arguments from RPC client
  * return: -1 error, !=-1 ok
  */   */
 int  int
rpc_srv_unregisterCall(rpc_srv_t * __restrict srv, const char *csModule, const char *csFunc)rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc, 
                 ait_val_t funcname, array_t * __restrict args)
 {  {
        rpc_func_t func, *f, *curr;        rpc_callback_t func;
        u_char str[MAXPATHLEN + UCHAR_MAX + 1]; 
   
        memset(&func, 0, sizeof(rpc_func_t));        if (!cli || !rpc || !AIT_ADDR(&funcname)) {
        memset(str, 0, MAXPATHLEN + UCHAR_MAX + 1);                rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
        if (!srv || !csFunc) { 
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t unregister function from RPC server ...\n"); 
                 return -1;                  return -1;
         } else  
                 strlcpy((char*) func.func_name, csFunc, UCHAR_MAX + 1);  
         if (csModule) {  
                 strlcpy((char*) func.func_file, csModule, MAXPATHLEN);  
                 strlcpy((char*) str, csModule, MAXPATHLEN + UCHAR_MAX + 1);  
         }          }
         strlcat((char*) str, "__", MAXPATHLEN + UCHAR_MAX + 1);  
         strlcat((char*) str, csFunc, MAXPATHLEN + UCHAR_MAX + 1);  
   
        func.func_tag = crcFletcher16((u_short*) str, (MAXPATHLEN + UCHAR_MAX + 1) / 2);        func = AIT_GET_LIKE(&funcname, rpc_callback_t);
        func.func_hash = hash_fnv((char*) str, MAXPATHLEN + UCHAR_MAX + 1);        return func(cli, rpc, args);
 }
   
         f = rpc_srv_getCall(srv, func.func_tag, func.func_hash);  
         if (!f)         // not found element for unregister  
                 return 0;  
   
        if (srv->srv_funcs == f) {   // if is 1st element/*
                srv->srv_funcs = srv->srv_funcs->func_next; * rpc_srv_initServer2() - Init & create layer2 RPC Server
  *
  * @InstID = Instance for authentication & recognition
  * @concurentClients = Concurent clients at same time to this server
  * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
  * @csIface = Interface name for bind server, if NULL first interface on host
  * return: NULL == error or !=NULL bind and created RPC server instance
  */
 rpc_srv_t *
 rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
 {
         int n = 1;
         rpc_srv_t *srv = NULL;
         sockaddr_t sa = E_SOCKADDR_INIT;
         char szIface[64], szStr[STRSIZ];
         register int i;
         struct ifreq ifr;
         struct bpf_insn insns[] = {
                 BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
                 BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
                 BPF_STMT(BPF_RET + BPF_K, -1),
                 BPF_STMT(BPF_RET + BPF_K, 0),
         };
         struct bpf_program fcode = { 
                 .bf_len = sizeof(insns) / sizeof(struct bpf_insn), 
                 .bf_insns = insns
         };
   
                if (f->func_args && f->func_vals)        if (!concurentClients) {
                        free(f->func_vals);                rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                free(f);                return NULL;
        } else { 
                for (curr = srv->srv_funcs; curr->func_next != f; curr = curr->func_next); 
                curr->func_next = curr->func_next->func_next; 
 
                if (f->func_args && f->func_vals) 
                        free(f->func_vals); 
                free(f); 
         }          }
           if (!csIface) {
                   if (e_get1stiface(szIface, sizeof szIface))
                           return NULL;
           } else
                   strlcpy(szIface, csIface, sizeof szIface);
           if (!e_getifacebyname(szIface, &sa))
                   return NULL;
   
        return 1;#ifdef HAVE_SRANDOMDEV
}        srandomdev();
 #else
         time_t tim;
   
/*        srandom((time(&tim) ^ getpid()));
 * rpc_srv_getCall() Get registered call from RPC server#endif
 * @srv = RPC Server instance 
 * @tag = tag for function 
 * @hash = hash for function 
 * return: NULL not found call, !=NULL return call 
 */ 
inline rpc_func_t * 
rpc_srv_getCall(rpc_srv_t * __restrict srv, uint16_t tag, uint32_t hash) 
{ 
        rpc_func_t *f; 
   
           srv = e_malloc(sizeof(rpc_srv_t));
         if (!srv) {          if (!srv) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t get function from RPC server ...\n");                LOGERR;
                 return NULL;                  return NULL;
           } else
                   memset(srv, 0, sizeof(rpc_srv_t));
   
           srv->srv_proto = SOCK_BPF;
           srv->srv_netbuf = netBuf;
           srv->srv_session.sess_version = RPC_VERSION;
           srv->srv_session.sess_instance = InstID;
   
           srv->srv_server.cli_parent = srv;
           memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
   
           /* init functions */
           pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
           SLIST_INIT(&srv->srv_funcs);
           AVL_INIT(&srv->srv_funcs);
   
           /* init scheduler */
           srv->srv_root = schedBegin();
           if (!srv->srv_root) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
         }          }
   
        for (f = srv->srv_funcs; f; f = f->func_next)        /* init pool for clients */
                if (f->func_tag == tag && f->func_hash == hash)        srv->srv_clients = array_Init(concurentClients);
         if (!srv->srv_clients) {
                 rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                 schedEnd(&srv->srv_root);
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;
         }
 
         /* create server handler */
         for (i = 0; i < 10; i++) {
                 memset(szStr, 0, sizeof szStr);
                 snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
                 srv->srv_server.cli_sock = open(szStr, O_RDWR);
                 if (srv->srv_server.cli_sock > STDERR_FILENO)
                         break;                          break;
           }
           if (srv->srv_server.cli_sock < 3) {
                   LOGERR;
                   array_Destroy(&srv->srv_clients);
                   schedEnd(&srv->srv_root);
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
        return f;        if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
                 LOGERR;
                 goto err;
         }
         if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
                 LOGERR;
                 goto err;
         }
         n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
         if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
                 LOGERR;
                 goto err;
         } else
                 srv->srv_netbuf = n;
 
         memset(&ifr, 0, sizeof ifr);
         strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
         if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
                 LOGERR;
                 goto err;
         } else
                 fcntl(srv->srv_server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
 
         rpc_register_srvPing(srv);
 
         return srv;
 err:    /* error condition */
         close(srv->srv_server.cli_sock);
         array_Destroy(&srv->srv_clients);
         schedEnd(&srv->srv_root);
         pthread_mutex_destroy(&srv->srv_funcs.mtx);
         e_free(srv);
         return NULL;
 }  }
   
 /*  /*
 * rpc_srv_getFunc() Get registered call from RPC server by Name * rpc_srv_initServerExt() - Init & create pipe RPC Server
 * @srv = RPC Server instance *
 * @csModule = Module name, if NULL self binary * @InstID = Instance for authentication & recognition
 * @csFunc = Function name * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
 * return: NULL not found call, !=NULL return call * @fd = File descriptor
  * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
rpc_func_t *rpc_srv_t *
rpc_srv_getFunc(rpc_srv_t * __restrict srv, const char *csModule, const char *csFunc)rpc_srv_initServerExt(u_char InstID, int netBuf, int fd)
 {  {
        rpc_func_t func;        rpc_srv_t *srv = NULL;
        u_char str[MAXPATHLEN + UCHAR_MAX + 1]; 
   
        memset(&func, 0, sizeof(rpc_func_t));#ifdef HAVE_SRANDOMDEV
        memset(str, 0, MAXPATHLEN + UCHAR_MAX + 1);        srandomdev();
        if (!srv || !csFunc) {#else
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t get function from RPC server ...\n");        time_t tim;
 
         srandom((time(&tim) ^ getpid()));
 #endif
 
         srv = e_malloc(sizeof(rpc_srv_t));
         if (!srv) {
                 LOGERR;
                 return NULL;                  return NULL;
         } else          } else
                strlcpy((char*) func.func_name, csFunc, UCHAR_MAX + 1);                memset(srv, 0, sizeof(rpc_srv_t));
        if (csModule) { 
                strlcpy((char*) func.func_file, csModule, MAXPATHLEN); 
                strlcpy((char*) str, csModule, MAXPATHLEN + UCHAR_MAX + 1); 
        } 
        strlcat((char*) str, "__", MAXPATHLEN + UCHAR_MAX + 1); 
        strlcat((char*) str, csFunc, MAXPATHLEN + UCHAR_MAX + 1); 
   
        func.func_tag = crcFletcher16((u_short*) str, (MAXPATHLEN + UCHAR_MAX + 1) / 2);        srv->srv_proto = SOCK_EXT;
        func.func_hash = hash_fnv((char*) str, MAXPATHLEN + UCHAR_MAX + 1);        srv->srv_netbuf = netBuf;
         srv->srv_session.sess_version = RPC_VERSION;
         srv->srv_session.sess_instance = InstID;
   
        return rpc_srv_getCall(srv, func.func_tag, func.func_hash);        srv->srv_server.cli_parent = srv;
}        srv->srv_server.cli_sock = fd;
   
// ---------------------------------------------------------        /* init functions */
         pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
         SLIST_INIT(&srv->srv_funcs);
         AVL_INIT(&srv->srv_funcs);
   
/*        /* init scheduler */
 * rpc_srv_execCall() Execute registered call from RPC server        srv->srv_root = schedBegin();
 * @data = RPC const data        if (!srv->srv_root) {
 * @call = Register RPC call                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
 * @rpc = IN RPC call structure                pthread_mutex_destroy(&srv->srv_funcs.mtx);
 * @args = IN RPC call array of rpc values                e_free(srv);
 * return: -1 error, !=-1 ok                return NULL;
 */ 
int 
rpc_srv_execCall(void * const data, rpc_func_t * __restrict call,  
                struct tagRPCCall * __restrict rpc, rpc_val_t * __restrict args) 
{ 
        void *dl; 
        rpc_callback_t func; 
        int ret; 
 
        if (!data || !call || !rpc) { 
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n"); 
                return -1; 
         }          }
   
        dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);        /* init pool for clients */
        if (!dl) {        srv->srv_clients = array_Init(1);
                rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror());        if (!srv->srv_clients) {
                return -1;                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                 schedEnd(&srv->srv_root);
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;
         }          }
   
        func = dlsym(dl, (char*) call->func_name);        fcntl(srv->srv_server.cli_sock, F_SETFL, 
        if (func)                        fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
                ret = func(data, call, rpc->call_argc, args); 
        else { 
                rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror()); 
                ret = -1; 
        } 
   
        dlclose(dl);        rpc_register_srvPing(srv);
        return ret;
         return srv;
 }  }

Removed from v.1.1  
changed lines
  Added in v.1.24.2.5


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>