Diff for /libaitrpc/src/srv.c between versions 1.9 and 1.14

version 1.9, 2012/05/14 08:39:06 version 1.14, 2013/03/07 23:10:50
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 46  SUCH DAMAGE. Line 46  SUCH DAMAGE.
 #include "global.h"  #include "global.h"
   
   
static void *rxPacket(sched_task_t*);/* SOCK_STREAM */
static void *rxBLOB(sched_task_t*);static void *acceptClients(sched_task_t *);
 static void *closeClient(sched_task_t *);
 static void *rxPacket(sched_task_t *);
 static void *txPacket(sched_task_t *);
   
   /* SOCK_DGRAM */
   static void *freeClient(sched_task_t *);
   static void *rxUDPPacket(sched_task_t *);
   static void *txUDPPacket(sched_task_t *);
   
   /* SOCK_RAW */
   
   static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {
           { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
           { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
           { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */
           { NULL, NULL, NULL, NULL }                              /* SOCK_RAW */
   };
   
   
   inline void
   rpc_freeCli(rpc_cli_t * __restrict c)
   {
           rpc_srv_t *s = c->cli_parent;
   
           schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
   
           /* free buffer */
           AIT_FREE_VAL(&c->cli_buf);
   
           array_Del(s->srv_clients, c->cli_id, 0);
           if (c)
                   e_free(c);
   }
   
   
   static inline int
   _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
   {
           rpc_cli_t *c = NULL;
           register int i;
   
           /* check free slots for connect */
           for (i = 0; i < array_Size(srv->srv_clients) && 
                           (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
                   /* check for duplicates */
                   if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
                           break;
           if (i >= array_Size(srv->srv_clients))
                   return -1;      /* no more free slots! */
   
           return i;
   }
   
   static rpc_cli_t *
   _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
   {
           rpc_cli_t *c = NULL;
           int n;
   
           n = _check4freeslot(srv, sa);
           if (n == -1)
                   return NULL;
           else
                   c = array(srv->srv_clients, n, rpc_cli_t*);
   
           if (!c) {
                   c = e_malloc(sizeof(rpc_cli_t));
                   if (!c) {
                           LOGERR;
                           srv->srv_kill = 1;
                           return NULL;
                   } else {
                           memset(c, 0, sizeof(rpc_cli_t));
                           array_Set(srv->srv_clients, n, c);
                           c->cli_id = n;
                           c->cli_parent = srv;
                   }
   
                   /* alloc empty buffer */
                   AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
           }
   
           return c;
   }
   
   
 static void *  static void *
   freeClient(sched_task_t *task)
   {
           rpc_freeCli(TASK_ARG(task));
   
           return NULL;
   }
   
   static void *
   closeClient(sched_task_t *task)
   {
           int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
   
           rpc_freeCli(TASK_ARG(task));
   
           /* close client socket */
           shutdown(sock, SHUT_RDWR);
           close(sock);
           return NULL;
   }
   
   static void *
 txPacket(sched_task_t *task)  txPacket(sched_task_t *task)
 {  {
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;          rpc_func_t *f = NULL;
        u_char *buf = TASK_DATA(task);        u_char buf[USHRT_MAX] = { 0 };
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;          struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, wlen = sizeof(struct tagRPCCall);          int ret, wlen = sizeof(struct tagRPCCall);
         array_t *arr = NULL;  
   
           /* copy RPC header */
           memcpy(buf, TASK_DATA(task), wlen);
   
         if (rpc->call_argc) {          if (rpc->call_argc) {
                f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash));                f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {                  if (!f) {
                           rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                         rpc->call_argc ^= rpc->call_argc;                          rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);                          rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);                          rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {                  } else {
                        rpc->call_argc = htons(rpc_srv_getVars(f, &arr));                        rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                         /* Go Encapsulate variables */                          /* Go Encapsulate variables */
                        ret = io_vars2buffer(buf + wlen, TASK_DATLEN(task) - wlen, arr);                        ret = ait_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c));
                        io_clrVars(f->func_vars);                        /* Free return values */
                         ait_freeVars(&c->cli_vars);
                         if (ret == -1) {                          if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");                                  rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;                                  rpc->call_argc ^= rpc->call_argc;
Line 88  txPacket(sched_task_t *task) Line 198  txPacket(sched_task_t *task)
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));          rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
   
         /* send reply */          /* send reply */
        ret = send(TASK_FD(task), buf, wlen, 0);        ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
        if (ret == -1)        if (ret == -1 || ret != wlen) {
                LOGERR;                /* close connection */
        else if (ret != wlen)                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "                                TASK_ARG(task), 0, NULL, 0);
                                "really sended %d bytes", wlen, ret);        }
   
         return NULL;          return NULL;
 }  }
Line 105  execCall(sched_task_t *task) Line 215  execCall(sched_task_t *task)
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;          rpc_func_t *f = NULL;
         array_t *arr = NULL;          array_t *arr = NULL;
        u_char *buf = TASK_DATA(task) + TASK_VAL(task);        u_char *buf = AIT_GET_BUF(&c->cli_buf) + TASK_VAL(task);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;          struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int argc = ntohs(rpc->call_argc);          int argc = ntohs(rpc->call_argc);
   
         /* Go decapsulate variables ... */          /* Go decapsulate variables ... */
         if (argc) {          if (argc) {
                arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),                 arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall), 
                                TASK_DATLEN(task) - TASK_VAL(task) - sizeof(struct tagRPCCall),                                 AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall), 
                                argc, 1);                                argc, 42);
                 if (!arr) {                  if (!arr) {
                        rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());                        rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
                         rpc->call_argc ^= rpc->call_argc;                          rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);                          rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);                          rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         return NULL;                          return NULL;
                 }                  }
        }        } else
                 arr = NULL;
   
        if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {        if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
                 rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");                  rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                 rpc->call_argc ^= rpc->call_argc;                  rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);                  rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);                  rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
         } else {          } else {
                 /* if client doesn't want reply */                  /* if client doesn't want reply */
                argc = rpc->call_req.flags & RPC_NOREPLY;                argc = RPC_CHK_NOREPLY(rpc);
                rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));                rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
                 if (rpc->call_rep.ret == htonl(-1)) {                  if (rpc->call_rep.ret == htonl(-1)) {
                         rpc->call_rep.eno = RPC_ERROR(errno);                          rpc->call_rep.eno = RPC_ERROR(errno);
                         rpc->call_argc ^= rpc->call_argc;                          rpc->call_argc ^= rpc->call_argc;
Line 139  execCall(sched_task_t *task) Line 250  execCall(sched_task_t *task)
                         rpc->call_rep.eno ^= rpc->call_rep.eno;                          rpc->call_rep.eno ^= rpc->call_rep.eno;
                         if (argc) {                          if (argc) {
                                 /* without reply */                                  /* without reply */
                                io_clrVars(f->func_vars);                                ait_freeVars(&c->cli_vars);
                                 rpc->call_argc ^= rpc->call_argc;                                  rpc->call_argc ^= rpc->call_argc;
                        } else                        } else {
                                rpc->call_argc = htons(rpc_srv_getVars(f, NULL));                                /* reply */
                                 rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                         }
                 }                  }
         }          }
   
        if (arr)        array_Destroy(&arr);
                io_arrayDestroy(&arr); 
         return NULL;          return NULL;
 }  }
   
Line 156  rxPacket(sched_task_t *task) Line 268  rxPacket(sched_task_t *task)
 {  {
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
        u_char *buf = TASK_DATA(task);        int len, rlen, noreply;
        int rlen, noreply;        u_short crc, off = TASK_DATLEN(task);
        u_short crc, off = 0;        u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc;          struct tagRPCCall *rpc;
         struct timespec ts;  
   
        memset(buf, 0, TASK_DATLEN(task));        if (!off)
        rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);                memset(buf, 0, AIT_LEN(&c->cli_buf));
        if (rlen == -1) {        rlen = recv(TASK_FD(task), buf + off, AIT_LEN(&c->cli_buf) - off, 0);
                LOGERR;        if (rlen < 1) {
                c->cli_kill = kill;                /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
                 return NULL;                  return NULL;
        } else if (!rlen) {        /* receive EOF */        } else {
                c->cli_kill = kill;                rlen += off;        /* add reminded bytes from previous rxPacket, if exists! */
                return NULL;                off = 0;        /* process buffer from start offset == 0 */
         }          }
   
         do {          do {
                   /* check RPC packet */
                 if (rlen < sizeof(struct tagRPCCall)) {                  if (rlen < sizeof(struct tagRPCCall)) {
                        rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");                        rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
   
                        schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),                         /* reminder received previous bytes ;) */
                                        TASK_DATA(task), TASK_DATLEN(task));                        schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), 
                                         TASK_FD(task), TASK_DATA(task), rlen);
                         return NULL;                          return NULL;
                 } else                  } else
                         rpc = (struct tagRPCCall*) (buf + off);                          rpc = (struct tagRPCCall*) (buf + off);
   
                rlen -= ntohs(rpc->call_len);                len = ntohs(rpc->call_len);
                 rlen -= len;
   
                   /* check RPC packet lengths */
                   if (rlen < 0 || len < sizeof(struct tagRPCCall)) {
                           rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length");
                           /* skip entire packet */
                           break;
                   }
   
                 /* check integrity of packet */                  /* check integrity of packet */
                 crc = ntohs(rpc->call_crc);                  crc = ntohs(rpc->call_crc);
                 rpc->call_crc ^= rpc->call_crc;                  rpc->call_crc ^= rpc->call_crc;
                if (crc != crcFletcher16((u_short*) (buf + off), ntohs(rpc->call_len) / 2)) {                if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                         rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");                          rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
   
                        off += ntohs(rpc->call_len);                        off += len;
                        if (rlen < 1)                        /* try next packet remaining into buffer */
                                break;                        continue;
                        else 
                                continue; 
                 }                  }
   
                noreply = rpc->call_req.flags & RPC_NOREPLY;                noreply = RPC_CHK_NOREPLY(rpc);
   
                 /* check RPC packet session info */                  /* check RPC packet session info */
                 if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {                  if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
Line 207  rxPacket(sched_task_t *task) Line 328  rxPacket(sched_task_t *task)
                         rpc->call_rep.ret = RPC_ERROR(-1);                          rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(errno);                          rpc->call_rep.eno = RPC_ERROR(errno);
                 } else {                  } else {
                         /* change socket timeout from last packet */  
                         ts.tv_sec = rpc->call_session.sess_timeout;  
                         ts.tv_nsec = 0;  
                         schedPolling(TASK_ROOT(task), &ts, NULL);  
   
                         /* execute RPC call */                          /* execute RPC call */
                        schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off,                         schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off, NULL, 0);
                                        TASK_DATA(task), TASK_DATLEN(task)); 
                 }                  }
   
                 /* send RPC reply */                  /* send RPC reply */
                 if (!noreply)                  if (!noreply)
                        schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task),                         schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], 
                                        buf + off, TASK_DATLEN(task) - off);                                        TASK_ARG(task), TASK_FD(task), rpc, len);
   
                off += ntohs(rpc->call_len);                off += len;
         } while (rlen > 0);          } while (rlen > 0);
   
         /* lets get next packet */          /* lets get next packet */
        schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task),         schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), TASK_FD(task), TASK_DATA(task), 0);
                        TASK_DATA(task), TASK_DATLEN(task)); 
         return NULL;          return NULL;
 }  }
   
 static void *  static void *
rpc_srv_dispatchCall(void *arg)acceptClients(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        rpc_srv_t *srv = TASK_ARG(task);
        rpc_srv_t *s;        rpc_cli_t *c = NULL;
        u_char *buf;        socklen_t salen = sizeof(sockaddr_t);
        sched_root_task_t *root; 
        struct timespec ts = { DEF_RPC_TIMEOUT, 0 }; 
   
        if (!arg) {        c = _allocClient(srv, NULL);
                rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client");        if (!c)
                return NULL;                goto end;
        } else 
                s = c->cli_parent; 
   
        /* allocate net buffer */        /* accept client */
        buf = malloc(s->srv_netbuf);        c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
        if (!buf) {        if (c->cli_sock == -1) {
                 LOGERR;                  LOGERR;
                return NULL;                AIT_FREE_VAL(&c->cli_buf);
                 array_Del(srv->srv_clients, c->cli_id, 42);
                 goto end;
         } else
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
 
         schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c, 
                         c->cli_sock, NULL, 0);
 end:
         schedReadSelf(task);
         return NULL;
 }
 
 
 static void *
 txUDPPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char buf[USHRT_MAX] = { 0 };
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, wlen = sizeof(struct tagRPCCall);
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
 
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
 
         /* copy RPC header */
         memcpy(buf, TASK_DATA(task), wlen);
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                         /* Go Encapsulate variables */
                         ret = ait_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c));
                         /* Free return values */
                         ait_freeVars(&c->cli_vars);
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }
         }          }
   
        root = schedBegin();        rpc->call_len = htons(wlen);
        if (!root) {
                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());        /* calculate CRC */
                free(buf);        rpc->call_crc ^= rpc->call_crc;
                return NULL;        rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
        } else {
                schedTermCondition(root, kill);        /* send reply */
                schedPolling(root, &ts, NULL);        ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, 
                         &c->cli_sa.sa, c->cli_sa.sa.sa_len);
         if (ret == -1 || ret != wlen) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                  TASK_ARG(task), 0, NULL, 0);
         }          }
   
        schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf);        return NULL;
 }
   
        schedRun(root, (void*) &c->cli_kill);static void *
        schedEnd(&root);rxUDPPacket(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         int len, rlen, noreply;
         u_short crc, off = 0;
         u_char buf[USHRT_MAX + 1];
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         sockaddr_t sa;
         socklen_t salen;
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
        shutdown(c->cli_sock, SHUT_RDWR);        /* receive connect packet */
        close(c->cli_sock);        salen = sa.ss.ss_len = sizeof(sockaddr_t);
        memset(c, 0, sizeof(rpc_cli_t));        rlen = recvfrom(TASK_FD(task), buf, sizeof buf, 0, &sa.sa, &salen);
        free(buf);        if (rlen < 1)
                 goto end;
 
         c = _allocClient(srv, &sa);
         if (!c)
                 goto end;
         else {
                 c->cli_sock = TASK_FD(task);
                 memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
                 memcpy(AIT_GET_BUF(&c->cli_buf), buf, AIT_LEN(&c->cli_buf));
                 /* armed timer for close stateless connection */
                 schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                 schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, ts, c, 0);
         }
 
         do {
                 /* check RPC packet */
                 if (rlen < sizeof(struct tagRPCCall)) {
                         rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                         break;
                 } else
                         rpc = (struct tagRPCCall*) (AIT_GET_BUF(&c->cli_buf) + off);
 
                 len = ntohs(rpc->call_len);
                 rlen -= len;
 
                 /* check RPC packet lengths */
                 if (rlen < 0 || len < sizeof(struct tagRPCCall)) {
                         rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length");
                         /* skip entire packet */
                         break;
                 }
 
                 /* check integrity of packet */
                 crc = ntohs(rpc->call_crc);
                 rpc->call_crc ^= rpc->call_crc;
                 if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                         rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
 
                         off += len;
                         /* try next packet remaining into buffer */
                         continue;
                 }
 
                 noreply = RPC_CHK_NOREPLY(rpc);
 
                 /* check RPC packet session info */
                 if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                         rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(errno);
                 } else {
                         /* execute RPC call */
                         schedEvent(TASK_ROOT(task), execCall, c, off, NULL, 0);
                 }
 
                 /* send RPC reply */
                 if (!noreply)
                         schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                         c, TASK_FD(task), rpc, len);
 
                 off += len;
         } while (rlen > 0);
 
 end:
         schedReadSelf(task);
         return NULL;          return NULL;
 }  }
   
   /* ------------------------------------------------------ */
   
   inline void
   rpc_freeBLOBCli(rpc_cli_t * __restrict c)
   {
           rpc_srv_t *s = c->cli_parent;
   
           schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
   
           /* free buffer */
           AIT_FREE_VAL(&c->cli_buf);
   
           array_Del(s->srv_blob.clients, c->cli_id, 0);
           if (c)
                   e_free(c);
   }
   
   
 static void *  static void *
   closeBLOBClient(sched_task_t *task)
   {
           int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
   
           rpc_freeBLOBCli(TASK_ARG(task));
   
           /* close client socket */
           shutdown(sock, SHUT_RDWR);
           close(sock);
           return NULL;
   }
   
   static void *
 txBLOB(sched_task_t *task)  txBLOB(sched_task_t *task)
 {  {
        u_char *buf = TASK_DATA(task);        rpc_cli_t *c = TASK_ARG(task);
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;          struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
         int wlen = sizeof(struct tagBLOBHdr);          int wlen = sizeof(struct tagBLOBHdr);
   
Line 288  txBLOB(sched_task_t *task) Line 564  txBLOB(sched_task_t *task)
         blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));          blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
   
         /* send reply */          /* send reply */
        wlen = send(TASK_FD(task), buf, wlen, 0);        wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
        if (wlen == -1)        if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
                LOGERR;                /* close blob connection */
        else if (wlen != sizeof(struct tagBLOBHdr))                schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
                rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "        }
                                "really sended %d bytes", sizeof(struct tagBLOBHdr), wlen); 
   
         return NULL;          return NULL;
 }  }
Line 304  rxBLOB(sched_task_t *task) Line 579  rxBLOB(sched_task_t *task)
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
         rpc_blob_t *b;          rpc_blob_t *b;
        u_char *buf = TASK_DATA(task);        struct tagBLOBHdr blob;
        struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf; 
         int rlen;          int rlen;
         u_short crc;          u_short crc;
         struct timespec ts;  
   
        /* check for disable service at this moment? */        memset(&blob, 0, sizeof blob);
        if (!s || s->srv_blob.state == disable) {        rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
                usleep(100000);        if (rlen < 1) {
#ifdef HAVE_PTHREAD_YIELD                /* close blob connection */
                pthread_yield();                schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
#endif 
                schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),  
                                TASK_DATA(task), TASK_DATLEN(task)); 
                 return NULL;                  return NULL;
         }          }
   
        memset(buf, 0, TASK_DATLEN(task));        /* check BLOB packet */
        rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0); 
        if (rlen == -1) { 
                LOGERR; 
                c->cli_kill = kill; 
                return NULL; 
        } else if (!rlen) { /* receive EOF */ 
                c->cli_kill = kill; 
                return NULL; 
        } 
 
         if (rlen < sizeof(struct tagBLOBHdr)) {          if (rlen < sizeof(struct tagBLOBHdr)) {
                rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet");                rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
                schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
                                TASK_DATA(task), TASK_DATLEN(task));                schedReadSelf(task);
                 return NULL;                  return NULL;
         }          }
   
         /* check integrity of packet */          /* check integrity of packet */
        crc = ntohs(blob->hdr_crc);        crc = ntohs(blob.hdr_crc);
        blob->hdr_crc ^= blob->hdr_crc;        blob.hdr_crc ^= blob.hdr_crc;
        if (crc != crcFletcher16((u_short*) buf, rlen / 2)) {        if (crc != crcFletcher16((u_short*) &blob, rlen / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");                  rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
                schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
                                TASK_DATA(task), TASK_DATLEN(task));                schedReadSelf(task);
                 return NULL;                  return NULL;
         }          }
   
         /* check RPC packet session info */          /* check RPC packet session info */
        if ((crc = rpc_chkPktSession(&blob->hdr_session, &s->srv_session))) {        if ((crc = rpc_chkPktSession(&blob.hdr_session, &s->srv_session))) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");                  rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                blob->hdr_cmd = error;                blob.hdr_cmd = error;
                 goto end;                  goto end;
         } else {  
                 /* change socket timeout from last packet */  
                 ts.tv_sec = blob->hdr_session.sess_timeout;  
                 ts.tv_nsec = 0;  
                 schedPolling(TASK_ROOT(task), &ts, NULL);  
         }          }
   
         /* Go to proceed packet ... */          /* Go to proceed packet ... */
        switch (blob->hdr_cmd) {        switch (blob.hdr_cmd) {
                 case get:                  case get:
                        if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) {                        if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
                                rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var));                                rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
                                blob->hdr_cmd = no;                                blob.hdr_cmd = no;
                                blob->hdr_ret = RPC_ERROR(-1);                                blob.hdr_ret = RPC_ERROR(-1);
                                 break;                                  break;
                         } else                          } else
                                blob->hdr_len = htonl(b->blob_len);                                blob.hdr_len = htonl(b->blob_len);
   
                         if (rpc_srv_blobMap(s, b) != -1) {                          if (rpc_srv_blobMap(s, b) != -1) {
                                 /* deliver BLOB variable to client */                                  /* deliver BLOB variable to client */
                                blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b));                                blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
                                 rpc_srv_blobUnmap(b);                                  rpc_srv_blobUnmap(b);
                         } else {                          } else {
                                blob->hdr_cmd = error;                                blob.hdr_cmd = error;
                                blob->hdr_ret = RPC_ERROR(-1);                                blob.hdr_ret = RPC_ERROR(-1);
                         }                          }
                         break;                          break;
                 case set:                  case set:
                        if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) {                        if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len)))) {
                                 /* set new BLOB variable for reply :) */                                  /* set new BLOB variable for reply :) */
                                blob->hdr_var = htonl(b->blob_var);                                blob.hdr_var = htonl(b->blob_var);
   
                                 /* receive BLOB from client */                                  /* receive BLOB from client */
                                blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b));                                blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
                                 rpc_srv_blobUnmap(b);                                  rpc_srv_blobUnmap(b);
                         } else {                          } else {
                                blob->hdr_cmd = error;                                blob.hdr_cmd = error;
                                blob->hdr_ret = RPC_ERROR(-1);                                blob.hdr_ret = RPC_ERROR(-1);
                         }                          }
                         break;                          break;
                 case unset:                  case unset:
                        if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) {                        if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
                                blob->hdr_cmd = error;                                blob.hdr_cmd = error;
                                blob->hdr_ret = RPC_ERROR(-1);                                blob.hdr_ret = RPC_ERROR(-1);
                         }                          }
                         break;                          break;
                 default:                  default:
                        rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd);                        rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
                        blob->hdr_cmd = error;                        blob.hdr_cmd = error;
                        blob->hdr_ret = RPC_ERROR(-1);                        blob.hdr_ret = RPC_ERROR(-1);
         }          }
   
 end:  end:
        schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task),         memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
                        TASK_DATA(task), TASK_DATLEN(task));        schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
        schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task),         schedReadSelf(task);
                        TASK_DATA(task), TASK_DATLEN(task)); 
         return NULL;          return NULL;
 }  }
   
 static void *  static void *
rpc_srv_dispatchVars(void *arg)acceptBLOBClients(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        rpc_srv_t *srv = TASK_ARG(task);
        rpc_srv_t *s;        rpc_cli_t *c = NULL;
        sched_root_task_t *root;        register int i;
        u_char *buf;        socklen_t salen = sizeof(sockaddr_t);
        struct timespec ts = { DEF_RPC_TIMEOUT, 0 };#ifdef TCP_NOPUSH
         int n = 1;
 #endif
   
        if (!arg) {        /* check free slots for connect */
                rpc_SetErr(EINVAL, "Invalid parameter can`t procced BLOB");        for (i = 0; i < array_Size(srv->srv_blob.clients) && 
                return NULL;                        (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
        } else        if (c)  /* no more free slots! */
                s = c->cli_parent;                goto end;
        c = e_malloc(sizeof(rpc_cli_t));
        /* allocate net buffer */        if (!c) {
        buf = malloc(sizeof(struct tagBLOBHdr)); 
        if (!buf) { 
                 LOGERR;                  LOGERR;
                   srv->srv_kill = srv->srv_blob.kill = 1;
                 return NULL;                  return NULL;
           } else {
                   memset(c, 0, sizeof(rpc_cli_t));
                   array_Set(srv->srv_blob.clients, i, c);
                   c->cli_id = i;
                   c->cli_parent = srv;
         }          }
   
        root = schedBegin();        /* alloc empty buffer */
        if (!root) {        AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                free(buf);        /* accept client */
                return NULL;        c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
         if (c->cli_sock == -1) {
                 LOGERR;
                 AIT_FREE_VAL(&c->cli_buf);
                 array_Del(srv->srv_blob.clients, i, 42);
                 goto end;
         } else {          } else {
                schedTermCondition(root, kill);#ifdef TCP_NOPUSH
                schedPolling(root, &ts, NULL);                setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
 #endif
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
         }          }
   
        schedRead(root, rxBLOB, c, c->cli_sock, buf, sizeof(struct tagBLOBHdr));        schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
end:
        schedRun(root, (void*) &c->cli_kill);        schedReadSelf(task);
        schedEnd(&root); 
 
        shutdown(c->cli_sock, SHUT_RDWR); 
        close(c->cli_sock); 
        memset(c, 0, sizeof(rpc_cli_t)); 
        free(buf); 
         return NULL;          return NULL;
 }  }
   
// -------------------------------------------------/* ------------------------------------------------------ */
   
 /*  /*
  * rpc_srv_initBLOBServer() - Init & create BLOB Server   * rpc_srv_initBLOBServer() - Init & create BLOB Server
Line 472  int Line 733  int
 rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
 {  {
         int n = 1;          int n = 1;
         io_sockaddr_t sa;  
   
        if (!srv) {        if (!srv || srv->srv_kill) {
                 rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");                  rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
                 return -1;                  return -1;
         }          }
         if (srv->srv_blob.state) {  
                 rpc_SetErr(EPERM, "Already started BLOB server!");  
                 return 0;  
         }  
   
         memset(&srv->srv_blob, 0, sizeof srv->srv_blob);          memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
         if (access(diskDir, R_OK | W_OK) == -1) {          if (access(diskDir, R_OK | W_OK) == -1) {
Line 490  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 746  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
         } else          } else
                 AIT_SET_STR(&srv->srv_blob.dir, diskDir);                  AIT_SET_STR(&srv->srv_blob.dir, diskDir);
   
        srv->srv_blob.server.cli_tid = pthread_self();        /* init blob list */
         TAILQ_INIT(&srv->srv_blob.blobs);
 
         srv->srv_blob.server.cli_parent = srv;          srv->srv_blob.server.cli_parent = srv;
   
        memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
        switch (sa.sa.sa_family) {        switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
                 case AF_INET:                  case AF_INET:
                        sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);                        srv->srv_blob.server.cli_sa.sin.sin_port = 
                                 htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
                         break;                          break;
                 case AF_INET6:                  case AF_INET6:
                        sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);                        srv->srv_blob.server.cli_sa.sin6.sin6_port = 
                                 htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
                         break;                          break;
                 case AF_LOCAL:                  case AF_LOCAL:
                        strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);                        strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", 
                                         sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
                         break;                          break;
                 default:                  default:
                         AIT_FREE_VAL(&srv->srv_blob.dir);                          AIT_FREE_VAL(&srv->srv_blob.dir);
                         return -1;                          return -1;
         }          }
         memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);  
   
         /* create BLOB server socket */          /* create BLOB server socket */
         srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);          srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
Line 542  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 802  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);                  AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
        }        } else
                 fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
   
        /* allocate pool for concurent clients */
        srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));        /* allocate pool for concurent blob clients */
         srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
         if (!srv->srv_blob.clients) {          if (!srv->srv_blob.clients) {
                LOGERR;                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);                  AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
        } else        }
                memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t)); 
   
        rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);        /* init blob scheduler */
        rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);        srv->srv_blob.root = schedBegin();
        rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);        if (!srv->srv_blob.root) {
        rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 array_Destroy(&srv->srv_blob.clients);
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
   
         srv->srv_blob.state = enable;   /* enable BLOB */  
         return 0;          return 0;
 }  }
   
Line 569  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 835  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: none   * return: none
  */   */
voidinline void
 rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
 {  {
        rpc_cli_t *c;        if (!srv)
        register int i; 
        rpc_blob_t *f; 
 
        if (!srv || srv->srv_blob.state == disable) 
                 return;                  return;
         else  
                 srv->srv_blob.state = kill;  
   
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);        srv->srv_blob.kill = 1;
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS); 
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS); 
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE); 
 
        /* close all clients connections & server socket */ 
        for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++) 
                if (c->cli_sa.sa.sa_family) 
                        shutdown(c->cli_sock, SHUT_RDWR); 
        close(srv->srv_blob.server.cli_sock); 
 
        if (srv->srv_blob.clients) { 
                free(srv->srv_blob.clients); 
                srv->srv_blob.clients = NULL; 
        } 
 
        /* detach blobs */ 
        while ((f = srv->srv_blob.blobs)) { 
                srv->srv_blob.blobs = f->blob_next; 
                rpc_srv_blobFree(srv, f); 
                free(f); 
        } 
 
        AIT_FREE_VAL(&srv->srv_blob.dir); 
 
        /* at final, save disable BLOB service state */ 
        srv->srv_blob.state = disable; 
 }  }
   
 /*  /*
 * rpc_srv_loopBLOB() - Execute Main BLOB server loop and wait for clients requests * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
  *   *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
 {  {
         socklen_t salen = sizeof(io_sockaddr_t);  
         register int i;  
         rpc_cli_t *c;          rpc_cli_t *c;
        int ret;        register int i;
        pthread_attr_t attr;        rpc_blob_t *b, *tmp;
        struct pollfd pfd;        struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
        if (!srv || srv->srv_blob.state == kill) {        if (!srv || srv->srv_kill) {
                 rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");                  rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
                 return -1;                  return -1;
         }          }
   
        if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {        if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
        } else        }
                fcntl(srv->srv_blob.server.cli_sock, F_SETFL,  
                                fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK); 
   
        pthread_attr_init(&attr);        if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);                                srv->srv_blob.server.cli_sock, NULL, 0)) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;
         }
   
        pfd.fd = srv->srv_blob.server.cli_sock;        schedPolling(srv->srv_blob.root, &ts, NULL);
        pfd.events = POLLIN | POLLPRI;        /* main rpc loop */
        /* main BLOB loop */        schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
        while (srv->srv_blob.state != kill && srv->srv_kill != kill) { 
                for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++) 
                        if (!c->cli_sa.sa.sa_family) 
                                break; 
                if (i >= srv->srv_numcli) { 
#ifdef HAVE_PTHREAD_YIELD 
                        pthread_yield(); 
#else 
                        usleep(1000000); 
#endif 
                        continue; 
                } 
   
                if ((ret = poll(&pfd, 1, srv->srv_session.sess_timeout * 1000)) == -1 ||         /* close all clients connections & server socket */
                                pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {        for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
                        LOGERR;                c = array(srv->srv_blob.clients, i, rpc_cli_t*);
                        ret = 1;                if (c) {
                        break;                        shutdown(c->cli_sock, SHUT_RDWR);
                } else if (!ret)                        close(c->cli_sock);
                        continue; 
   
                c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);                        schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
                if (c->cli_sock == -1) {                        AIT_FREE_VAL(&c->cli_buf);
                        LOGERR; 
                        continue; 
                } else 
                        c->cli_parent = srv; 
 
                /* spawn dispatch thread for BLOB client */ 
                c->cli_kill = enable; 
                if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchVars, c)) { 
                        LOGERR; 
                        continue; 
                 }                  }
                   array_Del(srv->srv_blob.clients, i, 42);
         }          }
           array_Destroy(&srv->srv_blob.clients);
   
        srv->srv_blob.state = kill;        close(srv->srv_blob.server.cli_sock);
   
        pthread_attr_destroy(&attr);        /* detach blobs */
         TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                 TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
 
                 rpc_srv_blobFree(srv, b);
                 e_free(b);
         }
 
         schedEnd(&srv->srv_blob.root);
         AIT_FREE_VAL(&srv->srv_blob.dir);
         return 0;          return 0;
 }  }
   
Line 693  rpc_srv_loopBLOB(rpc_srv_t * __restrict srv) Line 914  rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
  * @regProgID = ProgramID for authentication & recognition   * @regProgID = ProgramID for authentication & recognition
  * @regProcID = ProcessID for authentication & recognition   * @regProcID = ProcessID for authentication & recognition
  * @concurentClients = Concurent clients at same time to this server   * @concurentClients = Concurent clients at same time to this server
 * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet) * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
 * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL 
  * @csHost = Host name or address for bind server, if NULL any address   * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
    * @proto = Protocol, if == 0 choose SOCK_STREAM
  * return: NULL == error or !=NULL bind and created RPC server instance   * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
 rpc_srv_t *  rpc_srv_t *
rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients, rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients, 
                int netBuf, u_short family, const char *csHost, u_short Port)                int netBuf, const char *csHost, u_short Port, int proto)
 {  {
         rpc_srv_t *srv = NULL;  
         int n = 1;          int n = 1;
        struct hostent *host = NULL;        rpc_srv_t *srv = NULL;
        io_sockaddr_t sa;        sockaddr_t sa = E_SOCKADDR_INIT;
   
        if (!concurentClients || !regProgID ||         if (!concurentClients || !regProgID || (proto < 0 || proto > SOCK_DGRAM)) {
                        (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {                rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n"); 
                 return NULL;                  return NULL;
         }          }
           if (!e_gethostbyname(csHost, Port, &sa))
                   return NULL;
         if (!Port)          if (!Port)
                 Port = RPC_DEFPORT;                  Port = RPC_DEFPORT;
        if (!netBuf)        if (!proto)
                 proto = SOCK_STREAM;
         if (netBuf < RPC_MIN_BUFSIZ)
                 netBuf = BUFSIZ;                  netBuf = BUFSIZ;
         else          else
                netBuf = io_align(netBuf, 1);      /* align netBuf length */                netBuf = E_ALIGN(netBuf, 2);      /* align netBuf length */
        if (csHost && family != AF_LOCAL) { 
                host = gethostbyname2(csHost, family); 
                if (!host) { 
                        rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno)); 
                        return NULL; 
                } 
        } 
        memset(&sa, 0, sizeof sa); 
        sa.sa.sa_family = family; 
        switch (family) { 
                case AF_INET: 
                        sa.sin.sin_len = sizeof(struct sockaddr_in); 
                        sa.sin.sin_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length); 
                        break; 
                case AF_INET6: 
                        sa.sin6.sin6_len = sizeof(struct sockaddr_in6); 
                        sa.sin6.sin6_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length); 
                        break; 
                case AF_LOCAL: 
                        sa.sun.sun_len = sizeof(struct sockaddr_un); 
                        if (csHost) 
                                strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path); 
                        unlink(sa.sun.sun_path); 
                        break; 
                default: 
                        rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n"); 
                        return NULL; 
        } 
   
        srv = malloc(sizeof(rpc_srv_t));#ifdef HAVE_SRANDOMDEV
         srandomdev();
 #else
         time_t tim;
 
         srandom((time(&tim) ^ getpid()));
 #endif
 
         srv = e_malloc(sizeof(rpc_srv_t));
         if (!srv) {          if (!srv) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
         } else          } else
                 memset(srv, 0, sizeof(rpc_srv_t));                  memset(srv, 0, sizeof(rpc_srv_t));
   
           srv->srv_proto = proto;
         srv->srv_netbuf = netBuf;          srv->srv_netbuf = netBuf;
         srv->srv_numcli = concurentClients;  
         srv->srv_session.sess_version = RPC_VERSION;          srv->srv_session.sess_version = RPC_VERSION;
         srv->srv_session.sess_timeout = DEF_RPC_TIMEOUT;  
         srv->srv_session.sess_program = regProgID;          srv->srv_session.sess_program = regProgID;
         srv->srv_session.sess_process = regProcID;          srv->srv_session.sess_process = regProcID;
   
         srv->srv_server.cli_tid = pthread_self();  
         srv->srv_server.cli_parent = srv;          srv->srv_server.cli_parent = srv;
        memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);        memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
   
           /* init functions */
           pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
           SLIST_INIT(&srv->srv_funcs);
           AVL_INIT(&srv->srv_funcs);
   
           /* init scheduler */
           srv->srv_root = schedBegin();
           if (!srv->srv_root) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
           /* init pool for clients */
           srv->srv_clients = array_Init(concurentClients);
           if (!srv->srv_clients) {
                   rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                   schedEnd(&srv->srv_root);
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
         /* create server socket */          /* create server socket */
        srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
         if (srv->srv_server.cli_sock == -1) {          if (srv->srv_server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                free(srv);                array_Destroy(&srv->srv_clients);
                 schedEnd(&srv->srv_root);
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;                  return NULL;
         }          }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
         n = srv->srv_netbuf;          n = srv->srv_netbuf;
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
         if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,           if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
                                 srv->srv_server.cli_sa.sa.sa_len) == -1) {                                  srv->srv_server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
        } 
 
        /* allocate pool for concurent clients */ 
        srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t)); 
        if (!srv->srv_clients) { 
                LOGERR; 
                close(srv->srv_server.cli_sock); 
                free(srv); 
                return NULL; 
         } else          } else
                memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));                fcntl(srv->srv_server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
   
        rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);        rpc_register_srvPing(srv);
        rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1); 
        rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4); 
        rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1); 
   
         return srv;          return srv;
   err:    /* error condition */
           close(srv->srv_server.cli_sock);
           array_Destroy(&srv->srv_clients);
           schedEnd(&srv->srv_root);
           pthread_mutex_destroy(&srv->srv_funcs.mtx);
           e_free(srv);
           return NULL;
 }  }
   
 /*  /*
Line 828  rpc_srv_initServer(u_int regProgID, u_int regProcID, i Line 1040  rpc_srv_initServer(u_int regProgID, u_int regProcID, i
  * @psrv = RPC Server instance   * @psrv = RPC Server instance
  * return: none   * return: none
  */   */
voidinline void
 rpc_srv_endServer(rpc_srv_t ** __restrict psrv)  rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
 {  {
        rpc_cli_t *c;        if (!psrv || !*psrv)
        register int i; 
        rpc_func_t *f; 
 
        if (!psrv || !*psrv) { 
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n"); 
                 return;                  return;
         }  
   
        rpc_srv_endBLOBServer(*psrv);        /* if send kill to blob server */
         if (!(*psrv)->srv_blob.kill)
                 rpc_srv_endBLOBServer(*psrv);
   
        /* close all clients connections & server socket */        (*psrv)->srv_kill = 1;
        for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)        sleep(RPC_SCHED_POLLING);
                if (c->cli_sa.sa.sa_family) { 
                        shutdown(c->cli_sock, SHUT_RDWR); 
                        close(c->cli_sock); 
                } 
        close((*psrv)->srv_server.cli_sock); 
   
        if ((*psrv)->srv_clients) {        pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
                free((*psrv)->srv_clients);        e_free(*psrv);
                (*psrv)->srv_clients = NULL; 
                (*psrv)->srv_numcli = 0; 
        } 
 
        /* detach exported calls */ 
        while ((f = (*psrv)->srv_funcs)) { 
                (*psrv)->srv_funcs = f->func_next; 
                io_freeVars(&f->func_vars); 
                AIT_FREE_VAL(&f->func_name); 
                AIT_FREE_VAL(&f->func_file); 
                free(f); 
        } 
 
        free(*psrv); 
         *psrv = NULL;          *psrv = NULL;
 }  }
   
Line 878  rpc_srv_endServer(rpc_srv_t ** __restrict psrv) Line 1067  rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
 int  int
 rpc_srv_loopServer(rpc_srv_t * __restrict srv)  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
         socklen_t salen = sizeof(io_sockaddr_t);  
         register int i;  
         rpc_cli_t *c;          rpc_cli_t *c;
        int ret;        register int i;
        pthread_attr_t attr;        rpc_func_t *f;
        struct pollfd pfd;        struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
         if (!srv) {          if (!srv) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
                 return -1;                  return -1;
         }          }
   
        /* activate BLOB server worker if srv->srv_blob.state == enable */        if (srv->srv_proto == SOCK_STREAM)
        rpc_srv_execBLOBServer(srv);                if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
                         LOGERR;
                         return -1;
                 }
   
        if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {        if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, 
                LOGERR;                                srv->srv_server.cli_sock, NULL, 0)) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;                  return -1;
        } else        }
                fcntl(srv->srv_server.cli_sock, F_SETFL, fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK); 
   
        pthread_attr_init(&attr);        schedPolling(srv->srv_root, &ts, NULL);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 
 
        pfd.fd = srv->srv_server.cli_sock; 
        pfd.events = POLLIN | POLLPRI; 
         /* main rpc loop */          /* main rpc loop */
        while (srv->srv_kill != kill) {        schedRun(srv->srv_root, &srv->srv_kill);
                for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
                        if (!c->cli_sa.sa.sa_family)        /* close all clients connections & server socket */
                                break;        for (i = 0; i < array_Size(srv->srv_clients); i++) {
                if (i >= srv->srv_numcli) {                c = array(srv->srv_clients, i, rpc_cli_t*);
#ifdef HAVE_PTHREAD_YIELD                if (c) {
                        pthread_yield();                        shutdown(c->cli_sock, SHUT_RDWR);
#else                        close(c->cli_sock);
                        usleep(1000000);
#endif                        schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
                        continue;                        ait_freeVars(&RPC_RETVARS(c));
                         AIT_FREE_VAL(&c->cli_buf);
                 }                  }
                   array_Del(srv->srv_clients, i, 42);
           }
           array_Destroy(&srv->srv_clients);
   
                if ((ret = poll(&pfd, 1, srv->srv_session.sess_timeout * 1000)) == -1 ||         close(srv->srv_server.cli_sock);
                                pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { 
                        LOGERR; 
                        ret = 1; 
                        break; 
                } else if (!ret) 
                        continue; 
   
                c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);        /* detach exported calls */
                if (c->cli_sock == -1) {        RPC_FUNCS_LOCK(&srv->srv_funcs);
                        LOGERR;        while ((f = SLIST_FIRST(&srv->srv_funcs))) {
                        continue;                SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
                } else 
                        c->cli_parent = srv; 
   
                /* spawn rpc client dispatcher */                AIT_FREE_VAL(&f->func_name);
                c->cli_kill = enable;                e_free(f);
                if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchCall, c)) { 
                        LOGERR; 
                        continue; 
                } 
         }          }
           srv->srv_funcs.avlh_root = NULL;
           RPC_FUNCS_UNLOCK(&srv->srv_funcs);
   
        pthread_attr_destroy(&attr);        schedEnd(&srv->srv_root);
         return 0;          return 0;
 }  }
   
 // ---------------------------------------------------------  
   
 /*  /*
  * rpc_srv_execCall() Execute registered call from RPC server   * rpc_srv_execCall() Execute registered call from RPC server
  *   *
 * @call = Register RPC call * @cli = RPC client
  * @rpc = IN RPC call structure   * @rpc = IN RPC call structure
    * @funcname = Execute RPC function
  * @args = IN RPC calling arguments from RPC client   * @args = IN RPC calling arguments from RPC client
  * return: -1 error, !=-1 ok   * return: -1 error, !=-1 ok
  */   */
 int  int
rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc, 
                array_t * __restrict args)                ait_val_t funcname, array_t * __restrict args)
 {  {
         void *dl;  
         rpc_callback_t func;          rpc_callback_t func;
         int ret;  
   
        if (!call || !rpc || !call->func_parent) {        if (!cli || !rpc || !AIT_ADDR(&funcname)) {
                 rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");                  rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
                 return -1;                  return -1;
         }          }
   
        dl = dlopen(AIT_ADDR(&call->func_file), RTLD_NOW);        func = AIT_GET_LIKE(&funcname, rpc_callback_t);
        if (!dl) {        return func(cli, rpc, args);
                rpc_SetErr(ENOENT, "Can`t attach module %s!", dlerror()); 
                return -1; 
        } 
 
        func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name)); 
        if (func) 
                ret = func(call, ntohs(rpc->call_argc), args); 
        else { 
                rpc_SetErr(ENOEXEC, "Can`t find function %s!", dlerror()); 
                ret = -1; 
        } 
 
        dlclose(dl); 
        return ret; 
 }  }

Removed from v.1.9  
changed lines
  Added in v.1.14


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>