Diff for /libaitrpc/src/srv.c between versions 1.11 and 1.15

version 1.11, 2012/07/22 20:44:13 version 1.15, 2013/04/02 15:50:14
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 46  SUCH DAMAGE. Line 46  SUCH DAMAGE.
 #include "global.h"  #include "global.h"
   
   
static void */* SOCK_STREAM */
closeClient(sched_task_t *task)static void *acceptClients(sched_task_t *);
 static void *closeClient(sched_task_t *);
 static void *rxPacket(sched_task_t *);
 static void *txPacket(sched_task_t *);
 
 /* SOCK_DGRAM */
 static void *freeClient(sched_task_t *);
 static void *rxUDPPacket(sched_task_t *);
 static void *txUDPPacket(sched_task_t *);
 
 /* SOCK_RAW */
 
 static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */
         { NULL, NULL, NULL, NULL }                              /* SOCK_RAW */
 };
 
 
 inline void
 rpc_freeCli(rpc_cli_t * __restrict c)
 {  {
         rpc_cli_t *c = TASK_ARG(task);  
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
   
        schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);        schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
   
         /* close client socket */  
         if (TASK_VAL(task))  
                 shutdown(c->cli_sock, SHUT_RDWR);  
         close(c->cli_sock);  
   
         /* free buffer */          /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);          AIT_FREE_VAL(&c->cli_buf);
   
        io_arrayDel(s->srv_clients, c->cli_id, 0);        array_Del(s->srv_clients, c->cli_id, 0);
         if (c)          if (c)
                io_free(c);                e_free(c);
 }
 
 
 static inline int
 _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
 {
         rpc_cli_t *c = NULL;
         register int i;
 
         /* check free slots for connect */
         for (i = 0; i < array_Size(srv->srv_clients) && 
                         (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
                 /* check for duplicates */
                 if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
                         break;
         if (i >= array_Size(srv->srv_clients))
                 return -1;      /* no more free slots! */
 
         return i;
 }
 
 static rpc_cli_t *
 _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
 {
         rpc_cli_t *c = NULL;
         int n;
 
         n = _check4freeslot(srv, sa);
         if (n == -1)
                 return NULL;
         else
                 c = array(srv->srv_clients, n, rpc_cli_t*);
 
         if (!c) {
                 c = e_malloc(sizeof(rpc_cli_t));
                 if (!c) {
                         LOGERR;
                         srv->srv_kill = 1;
                         return NULL;
                 } else {
                         memset(c, 0, sizeof(rpc_cli_t));
                         array_Set(srv->srv_clients, n, c);
                         c->cli_id = n;
                         c->cli_parent = srv;
                 }
 
                 /* alloc empty buffer */
                 AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
         }
 
         return c;
 }
 
 
 static void *
 freeClient(sched_task_t *task)
 {
         rpc_freeCli(TASK_ARG(task));
 
         return NULL;          return NULL;
 }  }
   
 static void *  static void *
   closeClient(sched_task_t *task)
   {
           int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
   
           rpc_freeCli(TASK_ARG(task));
   
           /* close client socket */
           shutdown(sock, SHUT_RDWR);
           close(sock);
           return NULL;
   }
   
   static void *
 txPacket(sched_task_t *task)  txPacket(sched_task_t *task)
 {  {
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
Line 89  txPacket(sched_task_t *task) Line 176  txPacket(sched_task_t *task)
                         rpc->call_rep.ret = RPC_ERROR(-1);                          rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);                          rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {                  } else {
                        rpc->call_argc = htons(io_arraySize(RPC_RETVARS(c)));                        rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                         /* Go Encapsulate variables */                          /* Go Encapsulate variables */
                        ret = io_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c));                        ret = ait_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c));
                         /* Free return values */                          /* Free return values */
                        io_freeVars(&c->cli_vars);                        ait_freeVars(&c->cli_vars);
                         if (ret == -1) {                          if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");                                  rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;                                  rpc->call_argc ^= rpc->call_argc;
Line 106  txPacket(sched_task_t *task) Line 193  txPacket(sched_task_t *task)
   
         rpc->call_len = htons(wlen);          rpc->call_len = htons(wlen);
   
   #if 0
         /* calculate CRC */          /* calculate CRC */
         rpc->call_crc ^= rpc->call_crc;          rpc->call_crc ^= rpc->call_crc;
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));          rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
   #endif
   
         /* send reply */          /* send reply */
         ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);          ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
         if (ret == -1 || ret != wlen) {          if (ret == -1 || ret != wlen) {
                 /* close connection */                  /* close connection */
                schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
         }          }
   
         return NULL;          return NULL;
Line 133  execCall(sched_task_t *task) Line 223  execCall(sched_task_t *task)
   
         /* Go decapsulate variables ... */          /* Go decapsulate variables ... */
         if (argc) {          if (argc) {
                arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),                 arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall), 
                                 AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall),                                   AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall), 
                                argc, 1);                                argc, 42);
                 if (!arr) {                  if (!arr) {
                        rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());                        rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
                         rpc->call_argc ^= rpc->call_argc;                          rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);                          rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);                          rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
Line 162  execCall(sched_task_t *task) Line 252  execCall(sched_task_t *task)
                         rpc->call_rep.eno ^= rpc->call_rep.eno;                          rpc->call_rep.eno ^= rpc->call_rep.eno;
                         if (argc) {                          if (argc) {
                                 /* without reply */                                  /* without reply */
                                io_freeVars(&c->cli_vars);                                ait_freeVars(&c->cli_vars);
                                 rpc->call_argc ^= rpc->call_argc;                                  rpc->call_argc ^= rpc->call_argc;
                         } else {                          } else {
                                 /* reply */                                  /* reply */
                                rpc->call_argc = htons(io_arraySize(RPC_RETVARS(c)));                                rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                         }                          }
                 }                  }
         }          }
   
        io_arrayDestroy(&arr);        array_Destroy(&arr);
         return NULL;          return NULL;
 }  }
   
Line 181  rxPacket(sched_task_t *task) Line 271  rxPacket(sched_task_t *task)
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
         int len, rlen, noreply;          int len, rlen, noreply;
        u_short crc, off = TASK_DATLEN(task);        u_short off = TASK_DATLEN(task);
 #if 0
         u_short crc;
 #endif
         u_char *buf = AIT_GET_BUF(&c->cli_buf);          u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc;          struct tagRPCCall *rpc;
   
        memset(buf, 0, AIT_LEN(&c->cli_buf));        if (!off)
                 memset(buf, 0, AIT_LEN(&c->cli_buf));
         rlen = recv(TASK_FD(task), buf + off, AIT_LEN(&c->cli_buf) - off, 0);          rlen = recv(TASK_FD(task), buf + off, AIT_LEN(&c->cli_buf) - off, 0);
         if (rlen < 1) {          if (rlen < 1) {
                 /* close connection */                  /* close connection */
                schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
                 return NULL;                  return NULL;
         } else {          } else {
                 rlen += off;    /* add reminded bytes from previous rxPacket, if exists! */                  rlen += off;    /* add reminded bytes from previous rxPacket, if exists! */
Line 218  rxPacket(sched_task_t *task) Line 313  rxPacket(sched_task_t *task)
                         break;                          break;
                 }                  }
   
   #if 0
                 /* check integrity of packet */                  /* check integrity of packet */
                 crc = ntohs(rpc->call_crc);                  crc = ntohs(rpc->call_crc);
                 rpc->call_crc ^= rpc->call_crc;                  rpc->call_crc ^= rpc->call_crc;
Line 228  rxPacket(sched_task_t *task) Line 324  rxPacket(sched_task_t *task)
                         /* try next packet remaining into buffer */                          /* try next packet remaining into buffer */
                         continue;                          continue;
                 }                  }
   #endif
   
                 noreply = RPC_CHK_NOREPLY(rpc);                  noreply = RPC_CHK_NOREPLY(rpc);
   
Line 244  rxPacket(sched_task_t *task) Line 341  rxPacket(sched_task_t *task)
   
                 /* send RPC reply */                  /* send RPC reply */
                 if (!noreply)                  if (!noreply)
                        schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task), rpc, len);                        schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], 
                                         TASK_ARG(task), TASK_FD(task), rpc, len);
   
                 off += len;                  off += len;
         } while (rlen > 0);          } while (rlen > 0);
Line 259  acceptClients(sched_task_t *task) Line 357  acceptClients(sched_task_t *task)
 {  {
         rpc_srv_t *srv = TASK_ARG(task);          rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;          rpc_cli_t *c = NULL;
        register int i;        socklen_t salen = sizeof(sockaddr_t);
        socklen_t salen = sizeof(io_sockaddr_t); 
   
        /* check free slots for connect */        c = _allocClient(srv, NULL);
        for (i = 0; i < io_arraySize(srv->srv_clients) &&         if (!c)
                        (c = io_array(srv->srv_clients, i, rpc_cli_t*)); i++); 
        if (c)      /* no more free slots! */ 
                 goto end;                  goto end;
         c = io_malloc(sizeof(rpc_cli_t));  
         if (!c) {  
                 LOGERR;  
                 srv->srv_kill = 1;  
                 return NULL;  
         } else {  
                 memset(c, 0, sizeof(rpc_cli_t));  
                 io_arraySet(srv->srv_clients, i, c);  
                 c->cli_id = i;  
                 c->cli_parent = srv;  
         }  
   
         /* alloc empty buffer */  
         AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);  
   
         /* accept client */          /* accept client */
         c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);          c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
         if (c->cli_sock == -1) {          if (c->cli_sock == -1) {
                 LOGERR;                  LOGERR;
                 AIT_FREE_VAL(&c->cli_buf);                  AIT_FREE_VAL(&c->cli_buf);
                io_arrayDel(srv->srv_clients, i, 42);                array_Del(srv->srv_clients, c->cli_id, 42);
                 goto end;                  goto end;
         } else          } else
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);                  fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
   
        schedRead(TASK_ROOT(task), rxPacket, c, c->cli_sock, NULL, 0);        schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c, 
                         c->cli_sock, NULL, 0);
 end:  end:
         schedReadSelf(task);          schedReadSelf(task);
         return NULL;          return NULL;
 }  }
   
 /* ------------------------------------------------------ */  
   
 static void *  static void *
closeBLOBClient(sched_task_t *task)txUDPPacket(sched_task_t *task)
 {  {
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
           rpc_func_t *f = NULL;
           u_char buf[USHRT_MAX] = { 0 };
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           int ret, wlen = sizeof(struct tagRPCCall);
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
        schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);        schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
   
        /* close client socket */        /* copy RPC header */
        if (TASK_VAL(task))        memcpy(buf, TASK_DATA(task), wlen);
                shutdown(c->cli_sock, SHUT_RDWR); 
        close(c->cli_sock); 
   
           if (rpc->call_argc) {
                   f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                   if (!f) {
                           rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                           rpc->call_argc ^= rpc->call_argc;
                           rpc->call_rep.ret = RPC_ERROR(-1);
                           rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                   } else {
                           rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                           /* Go Encapsulate variables */
                           ret = ait_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c));
                           /* Free return values */
                           ait_freeVars(&c->cli_vars);
                           if (ret == -1) {
                                   rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                   rpc->call_argc ^= rpc->call_argc;
                                   rpc->call_rep.ret = RPC_ERROR(-1);
                                   rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                           } else
                                   wlen += ret;
                   }
           }
   
           rpc->call_len = htons(wlen);
   
           /* calculate CRC */
           rpc->call_crc ^= rpc->call_crc;
           rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
   
           /* send reply */
           ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, 
                           &c->cli_sa.sa, c->cli_sa.sa.sa_len);
           if (ret == -1 || ret != wlen) {
                   /* close connection */
                   schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                    TASK_ARG(task), 0, NULL, 0);
           }
   
           return NULL;
   }
   
   static void *
   rxUDPPacket(sched_task_t *task)
   {
           rpc_srv_t *srv = TASK_ARG(task);
           rpc_cli_t *c = NULL;
           int len, rlen, noreply;
           u_short crc, off = 0;
           u_char buf[USHRT_MAX + 1];
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           sockaddr_t sa;
           socklen_t salen;
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
           /* receive connect packet */
           salen = sa.ss.ss_len = sizeof(sockaddr_t);
           rlen = recvfrom(TASK_FD(task), buf, sizeof buf, 0, &sa.sa, &salen);
           if (rlen < 1)
                   goto end;
   
           c = _allocClient(srv, &sa);
           if (!c)
                   goto end;
           else {
                   c->cli_sock = TASK_FD(task);
                   memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
                   memcpy(AIT_GET_BUF(&c->cli_buf), buf, AIT_LEN(&c->cli_buf));
                   /* armed timer for close stateless connection */
                   schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                   schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                   c, ts, c, 0);
           }
   
           do {
                   /* check RPC packet */
                   if (rlen < sizeof(struct tagRPCCall)) {
                           rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                           break;
                   } else
                           rpc = (struct tagRPCCall*) (AIT_GET_BUF(&c->cli_buf) + off);
   
                   len = ntohs(rpc->call_len);
                   rlen -= len;
   
                   /* check RPC packet lengths */
                   if (rlen < 0 || len < sizeof(struct tagRPCCall)) {
                           rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length");
                           /* skip entire packet */
                           break;
                   }
   
                   /* check integrity of packet */
                   crc = ntohs(rpc->call_crc);
                   rpc->call_crc ^= rpc->call_crc;
                   if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                           rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
   
                           off += len;
                           /* try next packet remaining into buffer */
                           continue;
                   }
   
                   noreply = RPC_CHK_NOREPLY(rpc);
   
                   /* check RPC packet session info */
                   if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                           rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                           rpc->call_argc ^= rpc->call_argc;
                           rpc->call_rep.ret = RPC_ERROR(-1);
                           rpc->call_rep.eno = RPC_ERROR(errno);
                   } else {
                           /* execute RPC call */
                           schedEvent(TASK_ROOT(task), execCall, c, off, NULL, 0);
                   }
   
                   /* send RPC reply */
                   if (!noreply)
                           schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                           c, TASK_FD(task), rpc, len);
   
                   off += len;
           } while (rlen > 0);
   
   end:
           schedReadSelf(task);
           return NULL;
   }
   
   /* ------------------------------------------------------ */
   
   inline void
   rpc_freeBLOBCli(rpc_cli_t * __restrict c)
   {
           rpc_srv_t *s = c->cli_parent;
   
           schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
   
         /* free buffer */          /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);          AIT_FREE_VAL(&c->cli_buf);
   
        io_arrayDel(s->srv_blob.clients, c->cli_id, 0);        array_Del(s->srv_blob.clients, c->cli_id, 0);
         if (c)          if (c)
                io_free(c);                e_free(c);
 }
 
 
 static void *
 closeBLOBClient(sched_task_t *task)
 {
         int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
 
         rpc_freeBLOBCli(TASK_ARG(task));
 
         /* close client socket */
         shutdown(sock, SHUT_RDWR);
         close(sock);
         return NULL;          return NULL;
 }  }
   
Line 327  txBLOB(sched_task_t *task) Line 563  txBLOB(sched_task_t *task)
 {  {
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         u_char *buf = AIT_GET_BUF(&c->cli_buf);          u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;  
         int wlen = sizeof(struct tagBLOBHdr);          int wlen = sizeof(struct tagBLOBHdr);
   
         /* calculate CRC */  
         blob->hdr_crc ^= blob->hdr_crc;  
         blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));  
   
         /* send reply */          /* send reply */
         wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);          wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
         if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {          if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
Line 352  rxBLOB(sched_task_t *task) Line 583  rxBLOB(sched_task_t *task)
         rpc_blob_t *b;          rpc_blob_t *b;
         struct tagBLOBHdr blob;          struct tagBLOBHdr blob;
         int rlen;          int rlen;
         u_short crc;  
   
         memset(&blob, 0, sizeof blob);          memset(&blob, 0, sizeof blob);
         rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);          rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
Line 370  rxBLOB(sched_task_t *task) Line 600  rxBLOB(sched_task_t *task)
                 return NULL;                  return NULL;
         }          }
   
         /* check integrity of packet */  
         crc = ntohs(blob.hdr_crc);  
         blob.hdr_crc ^= blob.hdr_crc;  
         if (crc != crcFletcher16((u_short*) &blob, rlen / 2)) {  
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");  
   
                 schedReadSelf(task);  
                 return NULL;  
         }  
   
         /* check RPC packet session info */          /* check RPC packet session info */
        if ((crc = rpc_chkPktSession(&blob.hdr_session, &s->srv_session))) {        if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");                  rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                 blob.hdr_cmd = error;                  blob.hdr_cmd = error;
                 goto end;                  goto end;
Line 445  acceptBLOBClients(sched_task_t *task) Line 665  acceptBLOBClients(sched_task_t *task)
         rpc_srv_t *srv = TASK_ARG(task);          rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;          rpc_cli_t *c = NULL;
         register int i;          register int i;
        socklen_t salen = sizeof(io_sockaddr_t);        socklen_t salen = sizeof(sockaddr_t);
 #ifdef TCP_NOPUSH
         int n = 1;
 #endif
   
         /* check free slots for connect */          /* check free slots for connect */
        for (i = 0; i < io_arraySize(srv->srv_blob.clients) &&         for (i = 0; i < array_Size(srv->srv_blob.clients) && 
                        (c = io_array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);                        (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
         if (c)  /* no more free slots! */          if (c)  /* no more free slots! */
                 goto end;                  goto end;
        c = io_malloc(sizeof(rpc_cli_t));        c = e_malloc(sizeof(rpc_cli_t));
         if (!c) {          if (!c) {
                 LOGERR;                  LOGERR;
                 srv->srv_kill = srv->srv_blob.kill = 1;                  srv->srv_kill = srv->srv_blob.kill = 1;
                 return NULL;                  return NULL;
         } else {          } else {
                 memset(c, 0, sizeof(rpc_cli_t));                  memset(c, 0, sizeof(rpc_cli_t));
                io_arraySet(srv->srv_blob.clients, i, c);                array_Set(srv->srv_blob.clients, i, c);
                 c->cli_id = i;                  c->cli_id = i;
                 c->cli_parent = srv;                  c->cli_parent = srv;
         }          }
   
         /* alloc empty buffer */          /* alloc empty buffer */
        AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);        AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
   
         /* accept client */          /* accept client */
         c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);          c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
         if (c->cli_sock == -1) {          if (c->cli_sock == -1) {
                 LOGERR;                  LOGERR;
                 AIT_FREE_VAL(&c->cli_buf);                  AIT_FREE_VAL(&c->cli_buf);
                io_arrayDel(srv->srv_blob.clients, i, 42);                array_Del(srv->srv_blob.clients, i, 42);
                 goto end;                  goto end;
        } else        } else {
 #ifdef TCP_NOPUSH
                 setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
 #endif
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);                  fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
           }
   
         schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);          schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
 end:  end:
Line 515  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 742  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
   
         srv->srv_blob.server.cli_parent = srv;          srv->srv_blob.server.cli_parent = srv;
   
        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(io_sockaddr_t));        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
         switch (srv->srv_blob.server.cli_sa.sa.sa_family) {          switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
                 case AF_INET:                  case AF_INET:
                         srv->srv_blob.server.cli_sa.sin.sin_port =                           srv->srv_blob.server.cli_sa.sin.sin_port = 
Line 566  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 793  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);                  AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
        }        } else
                 fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
   
   
         /* allocate pool for concurent blob clients */          /* allocate pool for concurent blob clients */
        srv->srv_blob.clients = io_arrayInit(io_arraySize(srv->srv_clients));        srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
         if (!srv->srv_blob.clients) {          if (!srv->srv_blob.clients) {
                rpc_SetErr(io_GetErrno(), "%s", io_GetError());                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);                  AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
Line 581  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 811  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
         srv->srv_blob.root = schedBegin();          srv->srv_blob.root = schedBegin();
         if (!srv->srv_blob.root) {          if (!srv->srv_blob.root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());                  rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                io_arrayDestroy(&srv->srv_blob.clients);                array_Destroy(&srv->srv_blob.clients);
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);                  AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
Line 624  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 854  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
                 return -1;                  return -1;
         }          }
   
        fcntl(srv->srv_blob.server.cli_sock, F_SETFL,         if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
                        fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK); 
 
        if (listen(srv->srv_blob.server.cli_sock, io_arraySize(srv->srv_blob.clients)) == -1) { 
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
Line 643  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 870  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
         schedRun(srv->srv_blob.root, &srv->srv_blob.kill);          schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
   
         /* close all clients connections & server socket */          /* close all clients connections & server socket */
        for (i = 0; i < io_arraySize(srv->srv_blob.clients); i++) {        for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
                c = io_array(srv->srv_blob.clients, i, rpc_cli_t*);                c = array(srv->srv_blob.clients, i, rpc_cli_t*);
                 if (c) {                  if (c) {
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);                          close(c->cli_sock);
Line 652  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 879  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
                         schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);                          schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
                         AIT_FREE_VAL(&c->cli_buf);                          AIT_FREE_VAL(&c->cli_buf);
                 }                  }
                io_arrayDel(srv->srv_blob.clients, i, 42);                array_Del(srv->srv_blob.clients, i, 42);
         }          }
        io_arrayDestroy(&srv->srv_blob.clients);        array_Destroy(&srv->srv_blob.clients);
   
         close(srv->srv_blob.server.cli_sock);          close(srv->srv_blob.server.cli_sock);
   
Line 663  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 890  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
                 TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);                  TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
   
                 rpc_srv_blobFree(srv, b);                  rpc_srv_blobFree(srv, b);
                io_free(b);                e_free(b);
         }          }
   
         schedEnd(&srv->srv_blob.root);          schedEnd(&srv->srv_blob.root);
Line 675  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 902  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
 /*  /*
  * rpc_srv_initServer() - Init & create RPC Server   * rpc_srv_initServer() - Init & create RPC Server
  *   *
 * @regProgID = ProgramID for authentication & recognition * @InstID = Instance for authentication & recognition
 * @regProcID = ProcessID for authentication & recognition 
  * @concurentClients = Concurent clients at same time to this server   * @concurentClients = Concurent clients at same time to this server
  * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)   * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
  * @csHost = Host name or address for bind server, if NULL any address   * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
    * @proto = Protocol, if == 0 choose SOCK_STREAM
  * return: NULL == error or !=NULL bind and created RPC server instance   * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
 rpc_srv_t *  rpc_srv_t *
rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients, rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf, 
                int netBuf, const char *csHost, u_short Port)                const char *csHost, u_short Port, int proto)
 {  {
         int n = 1;          int n = 1;
         rpc_srv_t *srv = NULL;          rpc_srv_t *srv = NULL;
        io_sockaddr_t sa = IO_SOCKADDR_INIT;        sockaddr_t sa = E_SOCKADDR_INIT;
   
        if (!concurentClients || !regProgID) {        if (!concurentClients || (proto < 0 || proto > SOCK_DGRAM)) {
                 rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");                  rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                 return NULL;                  return NULL;
         }          }
        if (!io_gethostbyname(csHost, Port, &sa))        if (!e_gethostbyname(csHost, Port, &sa))
                 return NULL;                  return NULL;
         if (!Port)          if (!Port)
                 Port = RPC_DEFPORT;                  Port = RPC_DEFPORT;
           if (!proto)
                   proto = SOCK_STREAM;
         if (netBuf < RPC_MIN_BUFSIZ)          if (netBuf < RPC_MIN_BUFSIZ)
                 netBuf = BUFSIZ;                  netBuf = BUFSIZ;
         else          else
                netBuf = io_align(netBuf, 1);      /* align netBuf length */                netBuf = E_ALIGN(netBuf, 2);      /* align netBuf length */
   
 #ifdef HAVE_SRANDOMDEV  #ifdef HAVE_SRANDOMDEV
         srandomdev();          srandomdev();
Line 712  rpc_srv_initServer(u_int regProgID, u_char regProcID,  Line 941  rpc_srv_initServer(u_int regProgID, u_char regProcID, 
         srandom((time(&tim) ^ getpid()));          srandom((time(&tim) ^ getpid()));
 #endif  #endif
   
        srv = io_malloc(sizeof(rpc_srv_t));        srv = e_malloc(sizeof(rpc_srv_t));
         if (!srv) {          if (!srv) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
         } else          } else
                 memset(srv, 0, sizeof(rpc_srv_t));                  memset(srv, 0, sizeof(rpc_srv_t));
   
           srv->srv_proto = proto;
         srv->srv_netbuf = netBuf;          srv->srv_netbuf = netBuf;
         srv->srv_session.sess_version = RPC_VERSION;          srv->srv_session.sess_version = RPC_VERSION;
        srv->srv_session.sess_program = regProgID;        srv->srv_session.sess_instance = InstID;
        srv->srv_session.sess_process = regProcID; 
   
         srv->srv_server.cli_parent = srv;          srv->srv_server.cli_parent = srv;
         memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);          memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
   
        /* init functions list */        /* init functions */
        TAILQ_INIT(&srv->srv_funcs);        pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
         SLIST_INIT(&srv->srv_funcs);
         AVL_INIT(&srv->srv_funcs);
   
         /* init scheduler */          /* init scheduler */
         srv->srv_root = schedBegin();          srv->srv_root = schedBegin();
         if (!srv->srv_root) {          if (!srv->srv_root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());                  rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                io_free(srv);                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;                  return NULL;
         }          }
   
         /* init pool for clients */          /* init pool for clients */
        srv->srv_clients = io_arrayInit(concurentClients);        srv->srv_clients = array_Init(concurentClients);
         if (!srv->srv_clients) {          if (!srv->srv_clients) {
                rpc_SetErr(io_GetErrno(), "%s", io_GetError());                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                 schedEnd(&srv->srv_root);                  schedEnd(&srv->srv_root);
                io_free(srv);                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;                  return NULL;
         }          }
   
         /* create server socket */          /* create server socket */
        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
         if (srv->srv_server.cli_sock == -1) {          if (srv->srv_server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                io_arrayDestroy(&srv->srv_clients);                array_Destroy(&srv->srv_clients);
                 schedEnd(&srv->srv_root);                  schedEnd(&srv->srv_root);
                io_free(srv);                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;                  return NULL;
         }          }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
Line 773  rpc_srv_initServer(u_int regProgID, u_char regProcID,  Line 1007  rpc_srv_initServer(u_int regProgID, u_char regProcID, 
                                 srv->srv_server.cli_sa.sa.sa_len) == -1) {                                  srv->srv_server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;                  LOGERR;
                 goto err;                  goto err;
        }        } else
                 fcntl(srv->srv_server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
   
         rpc_register_srvPing(srv);          rpc_register_srvPing(srv);
   
         return srv;          return srv;
 err:    /* error condition */  err:    /* error condition */
         close(srv->srv_server.cli_sock);          close(srv->srv_server.cli_sock);
        io_arrayDestroy(&srv->srv_clients);        array_Destroy(&srv->srv_clients);
         schedEnd(&srv->srv_root);          schedEnd(&srv->srv_root);
        io_free(srv);        pthread_mutex_destroy(&srv->srv_funcs.mtx);
         e_free(srv);
         return NULL;          return NULL;
 }  }
   
Line 805  rpc_srv_endServer(rpc_srv_t ** __restrict psrv) Line 1042  rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
         (*psrv)->srv_kill = 1;          (*psrv)->srv_kill = 1;
         sleep(RPC_SCHED_POLLING);          sleep(RPC_SCHED_POLLING);
   
        io_free(*psrv);        pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
         e_free(*psrv);
         *psrv = NULL;          *psrv = NULL;
 }  }
   
Line 820  rpc_srv_loopServer(rpc_srv_t * __restrict srv) Line 1058  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
         rpc_cli_t *c;          rpc_cli_t *c;
         register int i;          register int i;
        rpc_func_t *f, *tmp;        rpc_func_t *f;
         struct timespec ts = { RPC_SCHED_POLLING, 0 };          struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
         if (!srv) {          if (!srv) {
Line 828  rpc_srv_loopServer(rpc_srv_t * __restrict srv) Line 1066  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
                 return -1;                  return -1;
         }          }
   
        fcntl(srv->srv_server.cli_sock, F_SETFL,         if (srv->srv_proto == SOCK_STREAM)
                        fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);                if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
                         LOGERR;
                         return -1;
                 }
   
        if (listen(srv->srv_server.cli_sock, io_arraySize(srv->srv_clients)) == -1) {        if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, 
                LOGERR;                                srv->srv_server.cli_sock, NULL, 0)) {
                return -1; 
        } 
 
        if (!schedRead(srv->srv_root, acceptClients, srv, srv->srv_server.cli_sock, NULL, 0)) { 
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());                  rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;                  return -1;
         }          }
Line 846  rpc_srv_loopServer(rpc_srv_t * __restrict srv) Line 1083  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
         schedRun(srv->srv_root, &srv->srv_kill);          schedRun(srv->srv_root, &srv->srv_kill);
   
         /* close all clients connections & server socket */          /* close all clients connections & server socket */
        for (i = 0; i < io_arraySize(srv->srv_clients); i++) {        for (i = 0; i < array_Size(srv->srv_clients); i++) {
                c = io_array(srv->srv_clients, i, rpc_cli_t*);                c = array(srv->srv_clients, i, rpc_cli_t*);
                 if (c) {                  if (c) {
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);                          close(c->cli_sock);
   
                         schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);                          schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
                        io_freeVars(&RPC_RETVARS(c));                        ait_freeVars(&RPC_RETVARS(c));
                         AIT_FREE_VAL(&c->cli_buf);                          AIT_FREE_VAL(&c->cli_buf);
                 }                  }
                io_arrayDel(srv->srv_clients, i, 42);                array_Del(srv->srv_clients, i, 42);
         }          }
        io_arrayDestroy(&srv->srv_clients);        array_Destroy(&srv->srv_clients);
   
         close(srv->srv_server.cli_sock);          close(srv->srv_server.cli_sock);
   
         /* detach exported calls */          /* detach exported calls */
        TAILQ_FOREACH_SAFE(f, &srv->srv_funcs, func_node, tmp) {        RPC_FUNCS_LOCK(&srv->srv_funcs);
                TAILQ_REMOVE(&srv->srv_funcs, f, func_node);        while ((f = SLIST_FIRST(&srv->srv_funcs))) {
                 SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
   
                 AIT_FREE_VAL(&f->func_name);                  AIT_FREE_VAL(&f->func_name);
                io_free(f);                e_free(f);
         }          }
           srv->srv_funcs.avlh_root = NULL;
           RPC_FUNCS_UNLOCK(&srv->srv_funcs);
   
         schedEnd(&srv->srv_root);          schedEnd(&srv->srv_root);
         return 0;          return 0;

Removed from v.1.11  
changed lines
  Added in v.1.15


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>