Diff for /libaitrpc/src/srv.c between versions 1.11.2.3 and 1.26.2.6

version 1.11.2.3, 2012/09/17 14:16:42 version 1.26.2.6, 2015/06/28 21:40:46
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012Copyright 2004 - 2015
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 46  SUCH DAMAGE. Line 46  SUCH DAMAGE.
 #include "global.h"  #include "global.h"
   
   
static void */* SOCK_STREAM */
closeClient(sched_task_t *task)static void *acceptClients(sched_task_t *);
 static void *closeClient(sched_task_t *);
 static void *rxPacket(sched_task_t *);
 static void *txPacket(sched_task_t *);
 
 /* SOCK_DGRAM */
 static void *freeClient(sched_task_t *);
 static void *rxUDPPacket(sched_task_t *);
 static void *txUDPPacket(sched_task_t *);
 
 /* SOCK_RAW */
 static void *rxRAWPacket(sched_task_t *);
 static void *txRAWPacket(sched_task_t *);
 
 /* SOCK_BPF */
 static void *rxBPFPacket(sched_task_t *);
 static void *txBPFPacket(sched_task_t *);
 
 /* SOCK_EXT */
 static void *rxEXTPacket(sched_task_t *);
 static void *txEXTPacket(sched_task_t *);
 
 static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */
         { rxRAWPacket, freeClient, rxRAWPacket, txRAWPacket },  /* SOCK_RAW */
         { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket },  /* SOCK_BPF */
         { rxEXTPacket, freeClient, rxEXTPacket, txEXTPacket }   /* SOCK_EXT */
 };
 
 /* Global Signal Argument when kqueue support disabled */
 
 static volatile uintptr_t _glSigArg = 0;
 
 
 void
 rpc_freeCli(rpc_cli_t * __restrict c)
 {  {
         rpc_cli_t *c = TASK_ARG(task);  
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
   
        schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);        schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
   
         /* close client socket */  
         if (TASK_VAL(task))  
                 shutdown(c->cli_sock, SHUT_RDWR);  
         close(c->cli_sock);  
   
         /* free buffer */          /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);          AIT_FREE_VAL(&c->cli_buf);
   
        io_arrayDel(s->srv_clients, c->cli_id, 0);        array_Del(s->srv_clients, c->cli_id, 0);
         if (c)          if (c)
                io_free(c);                e_free(c);
 }
 
 
 static inline int
 _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
 {
         rpc_cli_t *c = NULL;
         register int i;
 
         /* check free slots for connect */
         for (i = 0; i < array_Size(srv->srv_clients) && 
                         (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
                 /* check for duplicates */
                 if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
                         break;
         if (i >= array_Size(srv->srv_clients))
                 return -1;      /* no more free slots! */
 
         return i;
 }
 
 static rpc_cli_t *
 _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
 {
         rpc_cli_t *c = NULL;
         int n;
 
         if (srv->srv_proto != SOCK_EXT)
                 n = _check4freeslot(srv, sa);
         else
                 n = 0;
         if (n == -1)
                 return NULL;
         else
                 c = array(srv->srv_clients, n, rpc_cli_t*);
 
         if (!c) {
                 c = e_malloc(sizeof(rpc_cli_t));
                 if (!c) {
                         LOGERR;
                         srv->srv_kill = 1;
                         return NULL;
                 } else {
                         memset(c, 0, sizeof(rpc_cli_t));
                         array_Set(srv->srv_clients, n, c);
                         c->cli_id = n;
                         c->cli_parent = srv;
                 }
 
                 /* alloc empty buffer */
                 AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
         }
 
         return c;
 }
 
 
 static void *
 freeClient(sched_task_t *task)
 {
         rpc_freeCli(TASK_ARG(task));
 
         return NULL;          return NULL;
 }  }
   
 static void *  static void *
   closeClient(sched_task_t *task)
   {
           int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
   
           rpc_freeCli(TASK_ARG(task));
   
           /* close client socket */
           shutdown(sock, SHUT_RDWR);
           close(sock);
           return NULL;
   }
   
   static void *
 txPacket(sched_task_t *task)  txPacket(sched_task_t *task)
 {  {
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;          rpc_func_t *f = NULL;
        u_char buf[USHRT_MAX] = { 0 };        u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;          struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
        int ret, wlen = sizeof(struct tagRPCCall);        int ret, estlen, wlen = sizeof(struct tagRPCCall);
         struct pollfd pfd;
 #ifdef TCP_SESSION_TIMEOUT
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
        /* copy RPC header */        schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
        memcpy(buf, TASK_DATA(task), wlen);        schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
 #endif
   
         if (rpc->call_argc) {          if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));                  f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {                  if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");                          rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
   
                         rpc->call_argc ^= rpc->call_argc;                          rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);                          rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);                          rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {                  } else {
                        rpc->call_argc = htons(io_arraySize(RPC_RETVARS(c)));                        /* calc estimated length */
                         estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
                         if (estlen > AIT_LEN(&c->cli_buf))
                                 AIT_RE_BUF(&c->cli_buf, estlen);
                         buf = AIT_GET_BUF(&c->cli_buf);
                         rpc = (struct tagRPCCall*) buf;
 
                         rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
                         /* Go Encapsulate variables */                          /* Go Encapsulate variables */
                        ret = io_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c));                        ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                        /* Free return values */                                        RPC_RETVARS(c));
                        io_freeVars(&c->cli_vars); 
                         if (ret == -1) {                          if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");                                  rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
   
                                 rpc->call_argc ^= rpc->call_argc;                                  rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);                                  rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);                                  rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
Line 104  txPacket(sched_task_t *task) Line 223  txPacket(sched_task_t *task)
                 }                  }
         }          }
   
        rpc->call_len = htons(wlen);        /* Free return values */
         ait_freeVars(&c->cli_vars);
   
           rpc->call_len = htonl(wlen);
           rpc->call_io = RPC_ACK;
   
   #if 0
         /* calculate CRC */          /* calculate CRC */
         rpc->call_crc ^= rpc->call_crc;          rpc->call_crc ^= rpc->call_crc;
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));          rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
   #endif
   
         /* send reply */          /* send reply */
        ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);        pfd.fd = TASK_FD(task);
        if (ret == -1 || ret != wlen) {        pfd.events = POLLOUT;
                /* close connection */        for (; wlen > 0; wlen -= ret, buf += ret) {
                schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);                if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         if (ret)
                                 LOGERR;
                         else
                                 rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }
                 ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
                 if (ret == -1) {
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }
         }          }
   
         return NULL;          return NULL;
Line 127  execCall(sched_task_t *task) Line 269  execCall(sched_task_t *task)
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;          rpc_func_t *f = NULL;
         array_t *arr = NULL;          array_t *arr = NULL;
        u_char *buf = AIT_GET_BUF(&c->cli_buf) + TASK_VAL(task);        u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;          struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
        int argc = ntohs(rpc->call_argc);        int argc = rpc->call_argc;
   
         /* Go decapsulate variables ... */          /* Go decapsulate variables ... */
         if (argc) {          if (argc) {
                arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),                 arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall), 
                                AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall),                                 AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
                                argc, 1); 
                 if (!arr) {                  if (!arr) {
                        rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());                        rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
 
                         rpc->call_argc ^= rpc->call_argc;                          rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);                          rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);                          rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
Line 148  execCall(sched_task_t *task) Line 290  execCall(sched_task_t *task)
   
         if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {          if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
                 rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");                  rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
   
                 rpc->call_argc ^= rpc->call_argc;                  rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);                  rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);                  rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
Line 156  execCall(sched_task_t *task) Line 299  execCall(sched_task_t *task)
                 argc = RPC_CHK_NOREPLY(rpc);                  argc = RPC_CHK_NOREPLY(rpc);
                 rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));                  rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
                 if (rpc->call_rep.ret == htonl(-1)) {                  if (rpc->call_rep.ret == htonl(-1)) {
                        rpc->call_rep.eno = RPC_ERROR(errno);                        if (!rpc->call_rep.eno) {
                                 LOGERR;
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         }
                         rpc->call_argc ^= rpc->call_argc;                          rpc->call_argc ^= rpc->call_argc;
                           ait_freeVars(&c->cli_vars);
                 } else {                  } else {
                         rpc->call_rep.eno ^= rpc->call_rep.eno;                          rpc->call_rep.eno ^= rpc->call_rep.eno;
                         if (argc) {                          if (argc) {
                                 /* without reply */                                  /* without reply */
                                io_freeVars(&c->cli_vars);                                ait_freeVars(&c->cli_vars);
                                 rpc->call_argc ^= rpc->call_argc;                                  rpc->call_argc ^= rpc->call_argc;
                         } else {                          } else {
                                 /* reply */                                  /* reply */
                                rpc->call_argc = htons(io_arraySize(RPC_RETVARS(c)));                                rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
                         }                          }
                 }                  }
         }          }
   
        io_arrayDestroy(&arr);        array_Destroy(&arr);
         return NULL;          return NULL;
 }  }
   
Line 180  rxPacket(sched_task_t *task) Line 327  rxPacket(sched_task_t *task)
 {  {
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
        int len, rlen, noreply;        int len, rlen, noreply, estlen;
        u_short crc, off = TASK_DATLEN(task);#if 0
         u_short crc;
 #endif
         u_char *buf = AIT_GET_BUF(&c->cli_buf);          u_char *buf = AIT_GET_BUF(&c->cli_buf);
        struct tagRPCCall *rpc;        struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         struct pollfd pfd;
 #ifdef TCP_SESSION_TIMEOUT
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
        memset(buf, 0, AIT_LEN(&c->cli_buf));        schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
        rlen = recv(TASK_FD(task), buf + off, AIT_LEN(&c->cli_buf) - off, 0);        schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
        if (rlen < 1) {                         TASK_ARG(task), ts, TASK_ARG(task), 0);
 #endif
 
         memset(buf, 0, sizeof(struct tagRPCCall));
         rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
         if (rlen < sizeof(struct tagRPCCall)) {
                 /* close connection */                  /* close connection */
                schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
                 return NULL;                  return NULL;
         } else {          } else {
                rlen += off; /* add reminded bytes from previous rxPacket, if exists! */                estlen = ntohl(rpc->call_len);
                off = 0;      /* process buffer from start offset == 0 */                if (estlen > AIT_LEN(&c->cli_buf))
                         AIT_RE_BUF(&c->cli_buf, estlen);
                 rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
                 buf = AIT_GET_BUF(&c->cli_buf);
                 len = estlen;
         }          }
   
        do {        /* get next part of packet */
                /* check RPC packet */        memset(buf, 0, len);
                if (rlen < sizeof(struct tagRPCCall)) {        pfd.fd = TASK_FD(task);
                        rpc_SetErr(ERPCMISMATCH, "Short RPC packet");        pfd.events = POLLIN | POLLPRI;
        for (; len > 0; len -= rlen, buf += rlen) {
                        /* reminder received previous bytes ;) */                if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                        schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task),                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                                        TASK_FD(task), TASK_DATA(task), rlen);                        if (rlen)
                                 LOGERR;
                         else
                                 rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;                          return NULL;
                } else                }
                        rpc = (struct tagRPCCall*) (buf + off);                rlen = recv(TASK_FD(task), buf, len, 0);
                 if (rlen == -1) {
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }
         }
         len = estlen;
   
                len = ntohs(rpc->call_len);        /* skip loop packet */
                rlen -= len;        if (rpc->call_io & RPC_ACK) {
                 schedReadSelf(task);
                 return NULL;
         }
   
                /* check RPC packet lengths */#if 0
                if (rlen < 0 || len < sizeof(struct tagRPCCall)) {        /* check integrity of packet */
                        rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length");        crc = ntohs(rpc->call_crc);
                        /* skip entire packet */        rpc->call_crc ^= rpc->call_crc;
                        break;        if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                 return NULL;
         }
 #endif
 
         noreply = RPC_CHK_NOREPLY(rpc);
 
         /* check RPC packet session info */
         if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
 
                 rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(errno);
         } else {
                 /* execute RPC call */
                 schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
         }
 
         /* send RPC reply */
         if (!noreply)
                 schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], 
                                 TASK_ARG(task), TASK_FD(task), rpc, len);
 
         /* lets get next packet */
         schedReadSelf(task);
         return NULL;
 }
 
 static void *
 acceptClients(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         socklen_t salen = sizeof(sockaddr_t);
         int sock;
 #ifdef TCP_SESSION_TIMEOUT
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
 #endif
 
         c = _allocClient(srv, NULL);
         if (!c) {
                 EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                 if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
                         shutdown(sock, SHUT_RDWR);
                         close(sock);
                 }                  }
                   goto end;
           }
   
                /* check integrity of packet */        /* accept client */
                crc = ntohs(rpc->call_crc);        c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
                rpc->call_crc ^= rpc->call_crc;        if (c->cli_sock == -1) {
                if (crc != crcFletcher16((u_short*) rpc, len / 2)) {                LOGERR;
                        rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");                AIT_FREE_VAL(&c->cli_buf);
                 array_Del(srv->srv_clients, c->cli_id, 42);
                 goto end;
         } else
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
   
                        off += len;#ifdef TCP_SESSION_TIMEOUT
                        /* try next packet remaining into buffer */        /* armed timer for close stateless connection */
                        continue;        schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
         schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, 
                         ts, c, 0);
 #endif
         schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c, 
                         c->cli_sock, NULL, 0);
 end:
         schedReadSelf(task);
         return NULL;
 }
 
 
 static void *
 txUDPPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, estlen, wlen = sizeof(struct tagRPCCall);
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
 
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         /* calc estimated length */
                         estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
                         if (estlen > AIT_LEN(&c->cli_buf))
                                 AIT_RE_BUF(&c->cli_buf, estlen);
                         buf = AIT_GET_BUF(&c->cli_buf);
                         rpc = (struct tagRPCCall*) buf;
 
                         rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
                         /* Go Encapsulate variables */
                         ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                         RPC_RETVARS(c));
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }                  }
           }
   
                noreply = RPC_CHK_NOREPLY(rpc);        /* Free return values */
         ait_freeVars(&c->cli_vars);
   
                /* check RPC packet session info */        rpc->call_len = htonl(wlen);
                if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {        rpc->call_io = RPC_ACK;
                        rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
         /* calculate CRC */
         rpc->call_crc ^= rpc->call_crc;
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL, 
                         &c->cli_sa.sa, c->cli_sa.sa.sa_len);
         if (ret == -1) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
                 return NULL;
         }
 
         return NULL;
 }
 
 static void *
 rxUDPPacket(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         int len, rlen, noreply;
         u_short crc;
         struct tagRPCCall *rpc;
         sockaddr_t sa;
         socklen_t salen;
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         ait_val_t b = AIT_VAL_INIT;
 
         /* receive connect packet */
         AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
         salen = sa.ss.ss_len = sizeof(sockaddr_t);
         rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
         rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
         if (rlen < sizeof(struct tagRPCCall) || rlen < ntohl(rpc->call_len)) {
                 rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                 goto end;
         }
 
         /* skip loop packet */
         if (rpc->call_io & RPC_ACK)
                 goto end;
 
         c = _allocClient(srv, &sa);
         if (!c) {
                 EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                 usleep(2000);   /* blocked client delay */
                 goto end;
         } else {
                 len = ntohl(rpc->call_len);
                 if (len > AIT_LEN(&c->cli_buf))
                         AIT_RE_BUF(&c->cli_buf, len);
                 memcpy(AIT_GET_BUF(&c->cli_buf), AIT_GET_BUF(&b), len);
                 rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
 
                 c->cli_sock = TASK_FD(task);
                 memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
 
                 /* armed timer for close stateless connection */
                 schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                 schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, ts, c, 0);
         }
 
         /* check integrity of packet */
         crc = ntohs(rpc->call_crc);
         rpc->call_crc ^= rpc->call_crc;
         if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, 0, NULL, 0);
                 goto end;
         }
 
         noreply = RPC_CHK_NOREPLY(rpc);
 
         /* check RPC packet session info */
         if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
 
                 rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(errno);
         } else {
                 /* execute RPC call */
                 schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
         }
 
         /* send RPC reply */
         if (!noreply)
                 schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);
 end:
         AIT_FREE_VAL(&b);
         schedReadSelf(task);
         return NULL;
 }
 
 
 static void *
 txRAWPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, estlen, wlen = sizeof(struct tagRPCCall);
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
 
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                         rpc->call_argc ^= rpc->call_argc;                          rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);                          rpc->call_rep.ret = RPC_ERROR(-1);
                        rpc->call_rep.eno = RPC_ERROR(errno);                        rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {                  } else {
                        /* execute RPC call */                        /* calc estimated length */
                        schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off, NULL, 0);                        estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
                         if (estlen > AIT_LEN(&c->cli_buf))
                                 AIT_RE_BUF(&c->cli_buf, estlen);
                         buf = AIT_GET_BUF(&c->cli_buf);
                         rpc = (struct tagRPCCall*) buf;
 
                         rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
                         /* Go Encapsulate variables */
                         ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                         RPC_RETVARS(c));
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }                  }
           }
   
                /* send RPC reply */        /* Free return values */
                if (!noreply)        ait_freeVars(&c->cli_vars);
                        schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task), rpc, len); 
   
                off += len;        rpc->call_len = htonl(wlen);
        } while (rlen > 0);        rpc->call_io = RPC_ACK;
   
        /* lets get next packet */        /* calculate CRC */
        schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), TASK_FD(task), TASK_DATA(task), 0);        rpc->call_crc ^= rpc->call_crc;
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL, 
                         &c->cli_sa.sa, c->cli_sa.sa.sa_len);
         if (ret == -1) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
                 return NULL;
         }
 
         return NULL;          return NULL;
 }  }
   
 static void *  static void *
acceptClients(sched_task_t *task)rxRAWPacket(sched_task_t *task)
 {  {
         rpc_srv_t *srv = TASK_ARG(task);          rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;          rpc_cli_t *c = NULL;
        register int i;        int len, rlen, noreply;
        socklen_t salen = sizeof(io_sockaddr_t);        u_short crc;
         struct tagRPCCall *rpc;
         sockaddr_t sa;
         socklen_t salen;
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         ait_val_t b = AIT_VAL_INIT;
   
        /* check free slots for connect */        /* receive connect packet */
        for (i = 0; i < io_arraySize(srv->srv_clients) &&         AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
                        (c = io_array(srv->srv_clients, i, rpc_cli_t*)); i++);        salen = sa.ss.ss_len = sizeof(sockaddr_t);
        if (c)      /* no more free slots! */        rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
         if (sa.sa.sa_family == AF_INET) {
                 struct ip *h;
                 h = (struct ip*) AIT_GET_BUF(&b);
                 if (rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC)
                         goto end;
                 else {
                         rlen -= sizeof(struct ip);
                         rpc = (struct tagRPCCall*) 
                                 (AIT_GET_BUF(&b) + sizeof(struct ip));
                 }
         } else {
                 struct ip6_hdr *h;
                 h = (struct ip6_hdr*) AIT_GET_BUF(&b);
                 if (rlen < (ntohs(h->ip6_plen) + sizeof(struct ip6_hdr)) || 
                                 h->ip6_nxt != IPPROTO_ERPC)
                         goto end;
                 else {
                         rlen -= sizeof(struct ip6_hdr);
                         rpc = (struct tagRPCCall*) 
                                 (AIT_GET_BUF(&b) + sizeof(struct ip6_hdr));
                 }
         }
         if (rlen < sizeof(struct tagRPCCall) || rlen < ntohl(rpc->call_len)) {
                 rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                 goto end;                  goto end;
        c = io_malloc(sizeof(rpc_cli_t));        }
 
         /* skip loop packet */
         if (rpc->call_io & RPC_ACK)
                 goto end;
 
         c = _allocClient(srv, &sa);
         if (!c) {          if (!c) {
                LOGERR;                EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                srv->srv_kill = 1;                usleep(2000);   /* blocked client delay */
                 goto end;
         } else {
                 len = ntohl(rpc->call_len);
                 if (len > AIT_LEN(&c->cli_buf))
                         AIT_RE_BUF(&c->cli_buf, len);
                 memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
                 rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
 
                 c->cli_sock = TASK_FD(task);
                 memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
 
                 /* armed timer for close stateless connection */
                 schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                 schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, ts, c, 0);
         }
 
         /* check integrity of packet */
         crc = ntohs(rpc->call_crc);
         rpc->call_crc ^= rpc->call_crc;
         if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, 0, NULL, 0);
                 goto end;
         }
 
         noreply = RPC_CHK_NOREPLY(rpc);
 
         /* check RPC packet session info */
         if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
 
                 rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(errno);
         } else {
                 /* execute RPC call */
                 schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
         }
 
         /* send RPC reply */
         if (!noreply)
                 schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);
 end:
         AIT_FREE_VAL(&b);
         schedReadSelf(task);
         return NULL;
 }
 
 
 static void *
 txBPFPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, len, wlen = sizeof(struct tagRPCCall);
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         struct ether_header *eh;
         ait_val_t b = AIT_VAL_INIT;
 
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         /* calc estimated length */
                         len = ait_resideVars(RPC_RETVARS(c)) + wlen;
                         if (len > AIT_LEN(&c->cli_buf))
                                 AIT_RE_BUF(&c->cli_buf, len);
                         buf = AIT_GET_BUF(&c->cli_buf);
                         rpc = (struct tagRPCCall*) buf;
 
                         rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
                         /* Go Encapsulate variables */
                         ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                         RPC_RETVARS(c));
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }
         }
 
         /* Free return values */
         ait_freeVars(&RPC_RETVARS(c));
 
         rpc->call_len = htonl(wlen);
         rpc->call_io = RPC_ACK;
 
         /* calculate CRC */
         rpc->call_crc ^= rpc->call_crc;
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         AIT_SET_BUF(&b, NULL, MIN(wlen, s->srv_netbuf) + ETHER_HDR_LEN);
         eh = (struct ether_header*) AIT_GET_BUF(&b);
         memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
         eh->ether_type = htons(RPC_DEFPORT);
         memcpy(eh + 1, buf, MIN(wlen, s->srv_netbuf));
 
         ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
         AIT_FREE_VAL(&b);
         if (ret == -1) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
                 return NULL;                  return NULL;
           }
   
           return NULL;
   }
   
   static void *
   rxBPFPacket(sched_task_t *task)
   {
           rpc_srv_t *srv = TASK_ARG(task);
           rpc_cli_t *c = NULL;
           int len, rlen, noreply;
           u_short crc;
           struct tagRPCCall *rpc;
           sockaddr_t sa;
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
           struct bpf_hdr *h;
           struct ether_header *eh;
           ait_val_t b = AIT_VAL_INIT;
   
           /* receive connect packet */
           AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
           rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
           h = (struct bpf_hdr*) AIT_GET_BUF(&b);
           rlen -= h->bh_hdrlen;
           if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen || 
                           rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) {
                   rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                   goto end;
         } else {          } else {
                memset(c, 0, sizeof(rpc_cli_t));                rlen = h->bh_caplen;
                io_arraySet(srv->srv_clients, i, c);                eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
                c->cli_id = i;                rlen -= ETHER_HDR_LEN;
                c->cli_parent = srv;                rpc = (struct tagRPCCall*) (eh + 1);
 
 #if 0
                 /* skip loop packet */
                 if (rpc->call_io & RPC_ACK)
                         goto end;
 #endif
 
                 if (eh->ether_type != ntohs(RPC_DEFPORT))
                         goto end;
                 else
                         e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
         }          }
   
        /* alloc empty buffer */        c = _allocClient(srv, &sa);
        AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);        if (!c) {
                 EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                 usleep(2000);   /* blocked client delay */
                 goto end;
         } else {
                 len = ntohl(rpc->call_len);
                 if (len > AIT_LEN(&c->cli_buf))
                         AIT_RE_BUF(&c->cli_buf, len);
                 memcpy(AIT_GET_BUF(&c->cli_buf), rpc, len);
                 rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
   
        /* accept client */                c->cli_sock = TASK_FD(task);
        c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);                memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
        if (c->cli_sock == -1) {
                LOGERR;                /* armed timer for close stateless connection */
                AIT_FREE_VAL(&c->cli_buf);                schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                io_arrayDel(srv->srv_clients, i, 42);                schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, ts, c, 0);
         }
 
         /* check integrity of packet */
         crc = ntohs(rpc->call_crc);
         rpc->call_crc ^= rpc->call_crc;
         if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, 0, NULL, 0);
                 goto end;                  goto end;
        } else        }
                fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK); 
   
        schedRead(TASK_ROOT(task), rxPacket, c, c->cli_sock, NULL, 0);        noreply = RPC_CHK_NOREPLY(rpc);
 
         /* check RPC packet session info */
         if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
 
                 rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(errno);
         } else {
                 /* execute RPC call */
                 schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
         }
 
         /* send RPC reply */
         if (!noreply)
                 schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);
 end:  end:
           AIT_FREE_VAL(&b);
         schedReadSelf(task);          schedReadSelf(task);
         return NULL;          return NULL;
 }  }
   
 /* ------------------------------------------------------ */  
   
 static void *  static void *
closeBLOBClient(sched_task_t *task)txEXTPacket(sched_task_t *task)
 {  {
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
           rpc_func_t *f = NULL;
           u_char *buf = AIT_GET_BUF(&c->cli_buf);
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           int ret, len, wlen = sizeof(struct tagRPCCall);
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
        schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);        schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
   
        /* close client socket */        if (rpc->call_argc) {
        if (TASK_VAL(task))                f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                shutdown(c->cli_sock, SHUT_RDWR);                if (!f) {
        close(c->cli_sock);                        rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         /* calc estimated length */
                         len = ait_resideVars(RPC_RETVARS(c)) + wlen;
                         if (len > AIT_LEN(&c->cli_buf))
                                 AIT_RE_BUF(&c->cli_buf, len);
                         buf = AIT_GET_BUF(&c->cli_buf);
                         rpc = (struct tagRPCCall*) buf;
   
                           rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
                           /* Go Encapsulate variables */
                           ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                           RPC_RETVARS(c));
                           if (ret == -1) {
                                   rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                   rpc->call_argc ^= rpc->call_argc;
                                   rpc->call_rep.ret = RPC_ERROR(-1);
                                   rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                           } else
                                   wlen += ret;
                   }
           }
   
           /* Free return values */
           ait_freeVars(&RPC_RETVARS(c));
   
           rpc->call_len = htonl(wlen);
           rpc->call_io = RPC_ACK;
   
           /* send reply */
           ret = write(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf));
           if (ret == -1) {
                   /* close connection */
                   schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                   TASK_ARG(task), 0, NULL, 0);
                   return NULL;
           }
   
           return NULL;
   }
   
   static void *
   rxEXTPacket(sched_task_t *task)
   {
           rpc_srv_t *srv = TASK_ARG(task);
           rpc_cli_t *c = NULL;
           int len, rlen, noreply;
           struct tagRPCCall *rpc;
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
           ait_val_t b = AIT_VAL_INIT;
           sockaddr_t sa;
   
           memset(&sa, 0, sizeof sa);
           /* receive connect packet */
           AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
           rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
           rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
           if (rlen < sizeof(struct tagRPCCall) || rlen < ntohl(rpc->call_len)) {
                   rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                   goto end;
           }
   
           /* skip loop packet */
           if (rpc->call_io & RPC_ACK)
                   goto end;
   
           c = _allocClient(srv, &sa);
           if (!c) {
                   EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                   usleep(2000);   /* blocked client delay */
                   goto end;
           } else {
                   len = ntohl(rpc->call_len);
                   if (len > AIT_LEN(&c->cli_buf))
                           AIT_RE_BUF(&c->cli_buf, len);
                   memcpy(AIT_GET_BUF(&c->cli_buf), rpc, AIT_LEN(&c->cli_buf));
                   rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
   
                   c->cli_sock = TASK_FD(task);
   
                   /* armed timer for close stateless connection */
                   schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                   schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                   c, ts, c, 0);
           }
   
           noreply = RPC_CHK_NOREPLY(rpc);
   
           /* check RPC packet session info */
           if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                   rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
   
                   rpc->call_argc ^= rpc->call_argc;
                   rpc->call_rep.ret = RPC_ERROR(-1);
                   rpc->call_rep.eno = RPC_ERROR(errno);
           } else {
                   /* execute RPC call */
                   schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
           }
   
           /* send RPC reply */
           if (!noreply)
                   schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                   c, TASK_FD(task), rpc, len);
   end:
           AIT_FREE_VAL(&b);
           schedReadSelf(task);
           return NULL;
   }
   
   /* ------------------------------------------------------ */
   
   void
   rpc_freeBLOBCli(rpc_cli_t * __restrict c)
   {
           rpc_srv_t *s = c->cli_parent;
   
           schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
   
         /* free buffer */          /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);          AIT_FREE_VAL(&c->cli_buf);
   
        io_arrayDel(s->srv_blob.clients, c->cli_id, 0);        array_Del(s->srv_blob.clients, c->cli_id, 0);
         if (c)          if (c)
                io_free(c);                e_free(c);
 }
 
 
 static void *
 closeBLOBClient(sched_task_t *task)
 {
         int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
 
         rpc_freeBLOBCli(TASK_ARG(task));
 
         /* close client socket */
         shutdown(sock, SHUT_RDWR);
         close(sock);
         return NULL;          return NULL;
 }  }
   
Line 327  txBLOB(sched_task_t *task) Line 1131  txBLOB(sched_task_t *task)
 {  {
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         u_char *buf = AIT_GET_BUF(&c->cli_buf);          u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;  
         int wlen = sizeof(struct tagBLOBHdr);          int wlen = sizeof(struct tagBLOBHdr);
   
         /* calculate CRC */  
         blob->hdr_crc ^= blob->hdr_crc;  
         blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));  
   
         /* send reply */          /* send reply */
         wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);          wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
         if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {          if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
Line 352  rxBLOB(sched_task_t *task) Line 1151  rxBLOB(sched_task_t *task)
         rpc_blob_t *b;          rpc_blob_t *b;
         struct tagBLOBHdr blob;          struct tagBLOBHdr blob;
         int rlen;          int rlen;
         u_short crc;  
   
         memset(&blob, 0, sizeof blob);          memset(&blob, 0, sizeof blob);
         rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);          rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
Line 370  rxBLOB(sched_task_t *task) Line 1168  rxBLOB(sched_task_t *task)
                 return NULL;                  return NULL;
         }          }
   
         /* check integrity of packet */  
         crc = ntohs(blob.hdr_crc);  
         blob.hdr_crc ^= blob.hdr_crc;  
         if (crc != crcFletcher16((u_short*) &blob, rlen / 2)) {  
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");  
   
                 schedReadSelf(task);  
                 return NULL;  
         }  
   
         /* check RPC packet session info */          /* check RPC packet session info */
        if ((crc = rpc_chkPktSession(&blob.hdr_session, &s->srv_session))) {        if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");                  rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                 blob.hdr_cmd = error;                  blob.hdr_cmd = error;
                 goto end;                  goto end;
Line 408  rxBLOB(sched_task_t *task) Line 1196  rxBLOB(sched_task_t *task)
                         }                          }
                         break;                          break;
                 case set:                  case set:
                        if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len)))) {                        if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len)
                                                         ntohl(blob.hdr_ret)))) {
                                 /* set new BLOB variable for reply :) */                                  /* set new BLOB variable for reply :) */
                                 blob.hdr_var = htonl(b->blob_var);                                  blob.hdr_var = htonl(b->blob_var);
   
Line 440  end: Line 1229  end:
 }  }
   
 static void *  static void *
   flushBLOB(sched_task_t *task)
   {
           uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
           rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
           rpc_blob_t *b, *tmp;
   
           TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                   TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
   
                   rpc_srv_blobFree(srv, b);
                   e_free(b);
           }
   
           if (!schedSignalSelf(task)) {
                   /* disabled kqueue support in libaitsched */
                   struct sigaction sa;
   
                   memset(&sa, 0, sizeof sa);
                   sigemptyset(&sa.sa_mask);
                   sa.sa_handler = (void (*)(int)) flushBLOB;
                   sa.sa_flags = SA_RESTART | SA_RESETHAND;
                   sigaction(SIGFBLOB, &sa, NULL);
           }
   
           return NULL;
   }
   
   static void *
 acceptBLOBClients(sched_task_t *task)  acceptBLOBClients(sched_task_t *task)
 {  {
         rpc_srv_t *srv = TASK_ARG(task);          rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;          rpc_cli_t *c = NULL;
         register int i;          register int i;
        socklen_t salen = sizeof(io_sockaddr_t);        socklen_t salen = sizeof(sockaddr_t);
         int sock;
 #ifdef TCP_NOPUSH  #ifdef TCP_NOPUSH
         int n = 1;          int n = 1;
 #endif  #endif
   
         /* check free slots for connect */          /* check free slots for connect */
        for (i = 0; i < io_arraySize(srv->srv_blob.clients) &&         for (i = 0; i < array_Size(srv->srv_blob.clients) && 
                        (c = io_array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);                        (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
        if (c)  /* no more free slots! */        if (c) {        /* no more free slots! */
                 EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
                 if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
                         shutdown(sock, SHUT_RDWR);
                         close(sock);
                 }
                 goto end;                  goto end;
        c = io_malloc(sizeof(rpc_cli_t));        }
 
         c = e_malloc(sizeof(rpc_cli_t));
         if (!c) {          if (!c) {
                 LOGERR;                  LOGERR;
                 srv->srv_kill = srv->srv_blob.kill = 1;                  srv->srv_kill = srv->srv_blob.kill = 1;
                 return NULL;                  return NULL;
         } else {          } else {
                 memset(c, 0, sizeof(rpc_cli_t));                  memset(c, 0, sizeof(rpc_cli_t));
                io_arraySet(srv->srv_blob.clients, i, c);                array_Set(srv->srv_blob.clients, i, c);
                 c->cli_id = i;                  c->cli_id = i;
                 c->cli_parent = srv;                  c->cli_parent = srv;
         }          }
   
         /* alloc empty buffer */          /* alloc empty buffer */
        AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);        AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
   
         /* accept client */          /* accept client */
         c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);          c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
         if (c->cli_sock == -1) {          if (c->cli_sock == -1) {
                 LOGERR;                  LOGERR;
                 AIT_FREE_VAL(&c->cli_buf);                  AIT_FREE_VAL(&c->cli_buf);
                io_arrayDel(srv->srv_blob.clients, i, 42);                array_Del(srv->srv_blob.clients, i, 42);
                 goto end;                  goto end;
         } else {          } else {
 #ifdef TCP_NOPUSH  #ifdef TCP_NOPUSH
Line 522  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 1347  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
   
         srv->srv_blob.server.cli_parent = srv;          srv->srv_blob.server.cli_parent = srv;
   
        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(io_sockaddr_t));        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
         switch (srv->srv_blob.server.cli_sa.sa.sa_family) {          switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
                 case AF_INET:                  case AF_INET:
                         srv->srv_blob.server.cli_sa.sin.sin_port =                           srv->srv_blob.server.cli_sa.sin.sin_port = 
Line 573  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 1398  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);                  AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
        }        } else
                 fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
   
   
         /* allocate pool for concurent blob clients */          /* allocate pool for concurent blob clients */
        srv->srv_blob.clients = io_arrayInit(io_arraySize(srv->srv_clients));        srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
         if (!srv->srv_blob.clients) {          if (!srv->srv_blob.clients) {
                rpc_SetErr(io_GetErrno(), "%s", io_GetError());                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);                  AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
Line 588  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 1416  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
         srv->srv_blob.root = schedBegin();          srv->srv_blob.root = schedBegin();
         if (!srv->srv_blob.root) {          if (!srv->srv_blob.root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());                  rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                io_arrayDestroy(&srv->srv_blob.clients);                array_Destroy(&srv->srv_blob.clients);
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);                  AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
Line 603  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 1431  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: none   * return: none
  */   */
inline voidvoid
 rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
 {  {
         if (!srv)          if (!srv)
                 return;                  return;
   
         srv->srv_blob.kill = 1;          srv->srv_blob.kill = 1;
   
           schedEnd(&srv->srv_blob.root);
   
           if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL)
                   unlink(srv->srv_blob.server.cli_sa.sun.sun_path);
 }  }
   
 /*  /*
Line 631  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 1464  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
                 return -1;                  return -1;
         }          }
   
        fcntl(srv->srv_blob.server.cli_sock, F_SETFL,         if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
                        fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK); 
 
        if (listen(srv->srv_blob.server.cli_sock, io_arraySize(srv->srv_blob.clients)) == -1) { 
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
           if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
                   /* disabled kqueue support in libaitsched */
                   struct sigaction sa;
   
                   atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
   
                   memset(&sa, 0, sizeof sa);
                   sigemptyset(&sa.sa_mask);
                   sa.sa_handler = (void (*)(int)) flushBLOB;
                   sa.sa_flags = SA_RESTART | SA_RESETHAND;
                   sigaction(SIGFBLOB, &sa, NULL);
           }
   
         if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,           if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
                                 srv->srv_blob.server.cli_sock, NULL, 0)) {                                  srv->srv_blob.server.cli_sock, NULL, 0)) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());                  rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
Line 649  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 1492  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
         /* main rpc loop */          /* main rpc loop */
         schedRun(srv->srv_blob.root, &srv->srv_blob.kill);          schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
   
           /* detach blobs */
           TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                   TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
   
                   rpc_srv_blobFree(srv, b);
                   e_free(b);
           }
   
         /* close all clients connections & server socket */          /* close all clients connections & server socket */
        for (i = 0; i < io_arraySize(srv->srv_blob.clients); i++) {        for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
                c = io_array(srv->srv_blob.clients, i, rpc_cli_t*);                c = array(srv->srv_blob.clients, i, rpc_cli_t*);
                 if (c) {                  if (c) {
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);                          close(c->cli_sock);
Line 659  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 1510  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
                         schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);                          schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
                         AIT_FREE_VAL(&c->cli_buf);                          AIT_FREE_VAL(&c->cli_buf);
                 }                  }
                io_arrayDel(srv->srv_blob.clients, i, 42);                array_Del(srv->srv_blob.clients, i, 42);
         }          }
        io_arrayDestroy(&srv->srv_blob.clients);        array_Destroy(&srv->srv_blob.clients);
   
         close(srv->srv_blob.server.cli_sock);          close(srv->srv_blob.server.cli_sock);
   
         /* detach blobs */  
         TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {  
                 TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);  
   
                 rpc_srv_blobFree(srv, b);  
                 io_free(b);  
         }  
   
         schedEnd(&srv->srv_blob.root);  
         AIT_FREE_VAL(&srv->srv_blob.dir);          AIT_FREE_VAL(&srv->srv_blob.dir);
         return 0;          return 0;
 }  }
Line 682  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 1524  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
 /*  /*
  * rpc_srv_initServer() - Init & create RPC Server   * rpc_srv_initServer() - Init & create RPC Server
  *   *
 * @regProgID = ProgramID for authentication & recognition * @InstID = Instance for authentication & recognition
 * @regProcID = ProcessID for authentication & recognition 
  * @concurentClients = Concurent clients at same time to this server   * @concurentClients = Concurent clients at same time to this server
  * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)   * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
  * @csHost = Host name or address for bind server, if NULL any address   * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
    * @proto = Protocol, if == 0 choose SOCK_STREAM
  * return: NULL == error or !=NULL bind and created RPC server instance   * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
 rpc_srv_t *  rpc_srv_t *
rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients, rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf, 
                int netBuf, const char *csHost, u_short Port)                const char *csHost, u_short Port, int proto)
 {  {
         int n = 1;          int n = 1;
         rpc_srv_t *srv = NULL;          rpc_srv_t *srv = NULL;
        io_sockaddr_t sa = IO_SOCKADDR_INIT;        sockaddr_t sa = E_SOCKADDR_INIT;
   
        if (!concurentClients || !regProgID) {        if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
                 rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");                  rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                 return NULL;                  return NULL;
         }          }
        if (!io_gethostbyname(csHost, Port, &sa))        if (!Port && proto < SOCK_RAW)
                return NULL; 
        if (!Port) 
                 Port = RPC_DEFPORT;                  Port = RPC_DEFPORT;
           if (!e_gethostbyname(csHost, Port, &sa))
                   return NULL;
           if (!proto)
                   proto = SOCK_STREAM;
         if (netBuf < RPC_MIN_BUFSIZ)          if (netBuf < RPC_MIN_BUFSIZ)
                 netBuf = BUFSIZ;                  netBuf = BUFSIZ;
         else          else
                netBuf = io_align(netBuf, 2);    /* align netBuf length */                netBuf = E_ALIGN(netBuf, 2);    /* align netBuf length */
   
 #ifdef HAVE_SRANDOMDEV  #ifdef HAVE_SRANDOMDEV
         srandomdev();          srandomdev();
Line 719  rpc_srv_initServer(u_int regProgID, u_char regProcID,  Line 1563  rpc_srv_initServer(u_int regProgID, u_char regProcID, 
         srandom((time(&tim) ^ getpid()));          srandom((time(&tim) ^ getpid()));
 #endif  #endif
   
        srv = io_malloc(sizeof(rpc_srv_t));        srv = e_malloc(sizeof(rpc_srv_t));
         if (!srv) {          if (!srv) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
         } else          } else
                 memset(srv, 0, sizeof(rpc_srv_t));                  memset(srv, 0, sizeof(rpc_srv_t));
   
           srv->srv_proto = proto;
         srv->srv_netbuf = netBuf;          srv->srv_netbuf = netBuf;
         srv->srv_session.sess_version = RPC_VERSION;          srv->srv_session.sess_version = RPC_VERSION;
        srv->srv_session.sess_program = regProgID;        srv->srv_session.sess_instance = InstID;
        srv->srv_session.sess_process = regProcID; 
   
         srv->srv_server.cli_parent = srv;          srv->srv_server.cli_parent = srv;
         memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);          memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
   
        /* init functions list */        /* init functions */
        TAILQ_INIT(&srv->srv_funcs);        pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
         SLIST_INIT(&srv->srv_funcs);
         AVL_INIT(&srv->srv_funcs);
   
         /* init scheduler */          /* init scheduler */
         srv->srv_root = schedBegin();          srv->srv_root = schedBegin();
         if (!srv->srv_root) {          if (!srv->srv_root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());                  rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                io_free(srv);                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;                  return NULL;
         }          }
   
         /* init pool for clients */          /* init pool for clients */
        srv->srv_clients = io_arrayInit(concurentClients);        srv->srv_clients = array_Init(concurentClients);
         if (!srv->srv_clients) {          if (!srv->srv_clients) {
                rpc_SetErr(io_GetErrno(), "%s", io_GetError());                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                 schedEnd(&srv->srv_root);                  schedEnd(&srv->srv_root);
                io_free(srv);                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;                  return NULL;
         }          }
   
         /* create server socket */          /* create server socket */
        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, 
                         srv->srv_proto, srv->srv_proto == SOCK_RAW ? IPPROTO_ERPC : 0);
         if (srv->srv_server.cli_sock == -1) {          if (srv->srv_server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                io_arrayDestroy(&srv->srv_clients);                array_Destroy(&srv->srv_clients);
                 schedEnd(&srv->srv_root);                  schedEnd(&srv->srv_root);
                io_free(srv);                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;                  return NULL;
         }          }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
Line 780  rpc_srv_initServer(u_int regProgID, u_char regProcID,  Line 1630  rpc_srv_initServer(u_int regProgID, u_char regProcID, 
                                 srv->srv_server.cli_sa.sa.sa_len) == -1) {                                  srv->srv_server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;                  LOGERR;
                 goto err;                  goto err;
        }        } else
                 fcntl(srv->srv_server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
   
         rpc_register_srvPing(srv);          rpc_register_srvPing(srv);
   
         return srv;          return srv;
 err:    /* error condition */  err:    /* error condition */
         close(srv->srv_server.cli_sock);          close(srv->srv_server.cli_sock);
        io_arrayDestroy(&srv->srv_clients);        array_Destroy(&srv->srv_clients);
         schedEnd(&srv->srv_root);          schedEnd(&srv->srv_root);
        io_free(srv);        pthread_mutex_destroy(&srv->srv_funcs.mtx);
         e_free(srv);
         return NULL;          return NULL;
 }  }
   
Line 799  err: /* error condition */ Line 1652  err: /* error condition */
  * @psrv = RPC Server instance   * @psrv = RPC Server instance
  * return: none   * return: none
  */   */
inline voidvoid
 rpc_srv_endServer(rpc_srv_t ** __restrict psrv)  rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
 {  {
         if (!psrv || !*psrv)          if (!psrv || !*psrv)
                 return;                  return;
   
         /* if send kill to blob server */          /* if send kill to blob server */
        if (!(*psrv)->srv_blob.kill)        rpc_srv_endBLOBServer(*psrv);
                rpc_srv_endBLOBServer(*psrv); 
   
         (*psrv)->srv_kill = 1;          (*psrv)->srv_kill = 1;
         sleep(RPC_SCHED_POLLING);          sleep(RPC_SCHED_POLLING);
   
        io_free(*psrv);        schedEnd(&(*psrv)->srv_root);
 
         if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL)
                 unlink((*psrv)->srv_server.cli_sa.sun.sun_path);
 
         pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
         e_free(*psrv);
         *psrv = NULL;          *psrv = NULL;
 }  }
   
Line 827  rpc_srv_loopServer(rpc_srv_t * __restrict srv) Line 1685  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
         rpc_cli_t *c;          rpc_cli_t *c;
         register int i;          register int i;
        rpc_func_t *f, *tmp;        rpc_func_t *f;
         struct timespec ts = { RPC_SCHED_POLLING, 0 };          struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
         if (!srv) {          if (!srv) {
Line 835  rpc_srv_loopServer(rpc_srv_t * __restrict srv) Line 1693  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
                 return -1;                  return -1;
         }          }
   
        fcntl(srv->srv_server.cli_sock, F_SETFL,         if (srv->srv_proto == SOCK_STREAM)
                        fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);                if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
                         LOGERR;
                         return -1;
                 }
   
        if (listen(srv->srv_server.cli_sock, io_arraySize(srv->srv_clients)) == -1) {        if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, 
                LOGERR;                                srv->srv_server.cli_sock, NULL, 0)) {
                return -1; 
        } 
 
        if (!schedRead(srv->srv_root, acceptClients, srv, srv->srv_server.cli_sock, NULL, 0)) { 
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());                  rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;                  return -1;
         }          }
Line 853  rpc_srv_loopServer(rpc_srv_t * __restrict srv) Line 1710  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
         schedRun(srv->srv_root, &srv->srv_kill);          schedRun(srv->srv_root, &srv->srv_kill);
   
         /* close all clients connections & server socket */          /* close all clients connections & server socket */
        for (i = 0; i < io_arraySize(srv->srv_clients); i++) {        for (i = 0; i < array_Size(srv->srv_clients); i++) {
                c = io_array(srv->srv_clients, i, rpc_cli_t*);                c = array(srv->srv_clients, i, rpc_cli_t*);
                 if (c) {                  if (c) {
                        shutdown(c->cli_sock, SHUT_RDWR);                        if (srv->srv_proto == SOCK_STREAM) {
                        close(c->cli_sock);                                shutdown(c->cli_sock, SHUT_RDWR);
                                 close(c->cli_sock);
                         }
   
                         schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);                          schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
                        io_freeVars(&RPC_RETVARS(c));                        ait_freeVars(&RPC_RETVARS(c));
                         AIT_FREE_VAL(&c->cli_buf);                          AIT_FREE_VAL(&c->cli_buf);
                 }                  }
                io_arrayDel(srv->srv_clients, i, 42);                array_Del(srv->srv_clients, i, 42);
         }          }
        io_arrayDestroy(&srv->srv_clients);        array_Destroy(&srv->srv_clients);
   
        close(srv->srv_server.cli_sock);        if (srv->srv_proto != SOCK_EXT)
                 close(srv->srv_server.cli_sock);
   
         /* detach exported calls */          /* detach exported calls */
        TAILQ_FOREACH_SAFE(f, &srv->srv_funcs, func_node, tmp) {        RPC_FUNCS_LOCK(&srv->srv_funcs);
                TAILQ_REMOVE(&srv->srv_funcs, f, func_node);        while ((f = SLIST_FIRST(&srv->srv_funcs))) {
                 SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
   
                 AIT_FREE_VAL(&f->func_name);                  AIT_FREE_VAL(&f->func_name);
                io_free(f);                e_free(f);
         }          }
           srv->srv_funcs.avlh_root = NULL;
           RPC_FUNCS_UNLOCK(&srv->srv_funcs);
   
         schedEnd(&srv->srv_root);  
         return 0;          return 0;
 }  }
   
Line 904  rpc_srv_execCall(rpc_cli_t * __restrict cli, struct ta Line 1766  rpc_srv_execCall(rpc_cli_t * __restrict cli, struct ta
   
         func = AIT_GET_LIKE(&funcname, rpc_callback_t);          func = AIT_GET_LIKE(&funcname, rpc_callback_t);
         return func(cli, rpc, args);          return func(cli, rpc, args);
   }
   
   
   /*
    * rpc_srv_initServer2() - Init & create layer2 RPC Server
    *
    * @InstID = Instance for authentication & recognition
    * @concurentClients = Concurent clients at same time to this server
    * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
    * @csIface = Interface name for bind server, if NULL first interface on host
    * return: NULL == error or !=NULL bind and created RPC server instance
    */
   rpc_srv_t *
   rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
   {
           int n = 1;
           rpc_srv_t *srv = NULL;
           sockaddr_t sa = E_SOCKADDR_INIT;
           char szIface[64], szStr[STRSIZ];
           register int i;
           struct ifreq ifr;
           struct bpf_insn insns[] = {
                   BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
                   BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
                   BPF_STMT(BPF_RET + BPF_K, -1),
                   BPF_STMT(BPF_RET + BPF_K, 0),
           };
           struct bpf_program fcode = { 
                   .bf_len = sizeof(insns) / sizeof(struct bpf_insn), 
                   .bf_insns = insns
           };
   
           if (!concurentClients) {
                   rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                   return NULL;
           }
           if (!csIface) {
                   if (e_get1stiface(szIface, sizeof szIface))
                           return NULL;
           } else
                   strlcpy(szIface, csIface, sizeof szIface);
           if (!e_getifacebyname(szIface, &sa))
                   return NULL;
   
   #ifdef HAVE_SRANDOMDEV
           srandomdev();
   #else
           time_t tim;
   
           srandom((time(&tim) ^ getpid()));
   #endif
   
           srv = e_malloc(sizeof(rpc_srv_t));
           if (!srv) {
                   LOGERR;
                   return NULL;
           } else
                   memset(srv, 0, sizeof(rpc_srv_t));
   
           srv->srv_proto = SOCK_BPF;
           srv->srv_netbuf = netBuf;
           srv->srv_session.sess_version = RPC_VERSION;
           srv->srv_session.sess_instance = InstID;
   
           srv->srv_server.cli_parent = srv;
           memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
   
           /* init functions */
           pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
           SLIST_INIT(&srv->srv_funcs);
           AVL_INIT(&srv->srv_funcs);
   
           /* init scheduler */
           srv->srv_root = schedBegin();
           if (!srv->srv_root) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
           /* init pool for clients */
           srv->srv_clients = array_Init(concurentClients);
           if (!srv->srv_clients) {
                   rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                   schedEnd(&srv->srv_root);
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
           /* create server handler */
           for (i = 0; i < 10; i++) {
                   memset(szStr, 0, sizeof szStr);
                   snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
                   srv->srv_server.cli_sock = open(szStr, O_RDWR);
                   if (srv->srv_server.cli_sock > STDERR_FILENO)
                           break;
           }
           if (srv->srv_server.cli_sock < 3) {
                   LOGERR;
                   array_Destroy(&srv->srv_clients);
                   schedEnd(&srv->srv_root);
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
           if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
                   LOGERR;
                   goto err;
           }
           if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
                   LOGERR;
                   goto err;
           }
           n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
           if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
                   LOGERR;
                   goto err;
           } else
                   srv->srv_netbuf = n;
   
           memset(&ifr, 0, sizeof ifr);
           strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
           if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
                   LOGERR;
                   goto err;
           } else
                   fcntl(srv->srv_server.cli_sock, F_SETFL, 
                                   fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
   
           rpc_register_srvPing(srv);
   
           return srv;
   err:    /* error condition */
           close(srv->srv_server.cli_sock);
           array_Destroy(&srv->srv_clients);
           schedEnd(&srv->srv_root);
           pthread_mutex_destroy(&srv->srv_funcs.mtx);
           e_free(srv);
           return NULL;
   }
   
   /*
    * rpc_srv_initServerExt() - Init & create pipe RPC Server
    *
    * @InstID = Instance for authentication & recognition
    * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
    * @fd = File descriptor
    * return: NULL == error or !=NULL bind and created RPC server instance
    */
   rpc_srv_t *
   rpc_srv_initServerExt(u_char InstID, int netBuf, int fd)
   {
           rpc_srv_t *srv = NULL;
   
   #ifdef HAVE_SRANDOMDEV
           srandomdev();
   #else
           time_t tim;
   
           srandom((time(&tim) ^ getpid()));
   #endif
   
           srv = e_malloc(sizeof(rpc_srv_t));
           if (!srv) {
                   LOGERR;
                   return NULL;
           } else
                   memset(srv, 0, sizeof(rpc_srv_t));
   
           srv->srv_proto = SOCK_EXT;
           srv->srv_netbuf = (netBuf < RPC_MIN_BUFSIZ) ? 
                   getpagesize() : E_ALIGN(netBuf, 2);
           srv->srv_session.sess_version = RPC_VERSION;
           srv->srv_session.sess_instance = InstID;
   
           srv->srv_server.cli_parent = srv;
           srv->srv_server.cli_sock = fd;
   
           /* init functions */
           pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
           SLIST_INIT(&srv->srv_funcs);
           AVL_INIT(&srv->srv_funcs);
   
           /* init scheduler */
           srv->srv_root = schedBegin();
           if (!srv->srv_root) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
           /* init pool for clients */
           srv->srv_clients = array_Init(1);
           if (!srv->srv_clients) {
                   rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                   schedEnd(&srv->srv_root);
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
           fcntl(srv->srv_server.cli_sock, F_SETFL, 
                           fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
   
           rpc_register_srvPing(srv);
   
           return srv;
 }  }

Removed from v.1.11.2.3  
changed lines
  Added in v.1.26.2.6


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>