Diff for /libaitrpc/src/srv.c between versions 1.12 and 1.13

version 1.12, 2012/11/13 09:22:10 version 1.13, 2012/11/19 21:50:26
Line 46  SUCH DAMAGE. Line 46  SUCH DAMAGE.
 #include "global.h"  #include "global.h"
   
   
static void */* SOCK_STREAM */
closeClient(sched_task_t *task)static void *acceptClients(sched_task_t *);
 static void *closeClient(sched_task_t *);
 static void *rxPacket(sched_task_t *);
 static void *txPacket(sched_task_t *);
 
 /* SOCK_DGRAM */
 static void *freeClient(sched_task_t *);
 static void *rxUDPPacket(sched_task_t *);
 static void *txUDPPacket(sched_task_t *);
 
 /* SOCK_RAW */
 
 static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */
         { NULL, NULL, NULL, NULL }                              /* SOCK_RAW */
 };
 
 
 inline void
 rpc_freeCli(rpc_cli_t * __restrict c)
 {  {
         rpc_cli_t *c = TASK_ARG(task);  
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
   
        schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);        schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
   
         /* close client socket */  
         if (TASK_VAL(task))  
                 shutdown(c->cli_sock, SHUT_RDWR);  
         close(c->cli_sock);  
   
         /* free buffer */          /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);          AIT_FREE_VAL(&c->cli_buf);
   
         io_arrayDel(s->srv_clients, c->cli_id, 0);          io_arrayDel(s->srv_clients, c->cli_id, 0);
         if (c)          if (c)
                 io_free(c);                  io_free(c);
   }
   
   
   static inline int
   _check4freeslot(rpc_srv_t * __restrict srv, io_sockaddr_t * __restrict sa)
   {
           rpc_cli_t *c = NULL;
           register int i;
   
           /* check free slots for connect */
           for (i = 0; i < io_arraySize(srv->srv_clients) && 
                           (c = io_array(srv->srv_clients, i, rpc_cli_t*)); i++)
                   /* check for duplicates */
                   if (sa && !io_addrcmp(&c->cli_sa, sa, 42))
                           break;
           if (i >= io_arraySize(srv->srv_clients))
                   return -1;      /* no more free slots! */
   
           return i;
   }
   
   static rpc_cli_t *
   _allocClient(rpc_srv_t * __restrict srv, io_sockaddr_t * __restrict sa)
   {
           rpc_cli_t *c = NULL;
           int n;
   
           n = _check4freeslot(srv, sa);
           if (n == -1)
                   return NULL;
           else
                   c = io_array(srv->srv_clients, n, rpc_cli_t*);
   
           if (!c) {
                   c = io_malloc(sizeof(rpc_cli_t));
                   if (!c) {
                           LOGERR;
                           srv->srv_kill = 1;
                           return NULL;
                   } else {
                           memset(c, 0, sizeof(rpc_cli_t));
                           io_arraySet(srv->srv_clients, n, c);
                           c->cli_id = n;
                           c->cli_parent = srv;
                   }
   
                   /* alloc empty buffer */
                   AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);
           }
   
           return c;
   }
   
   
   static void *
   freeClient(sched_task_t *task)
   {
           rpc_freeCli(TASK_ARG(task));
   
         return NULL;          return NULL;
 }  }
   
 static void *  static void *
   closeClient(sched_task_t *task)
   {
           int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
   
           rpc_freeCli(TASK_ARG(task));
   
           /* close client socket */
           shutdown(sock, SHUT_RDWR);
           close(sock);
           return NULL;
   }
   
   static void *
 txPacket(sched_task_t *task)  txPacket(sched_task_t *task)
 {  {
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
Line 114  txPacket(sched_task_t *task) Line 201  txPacket(sched_task_t *task)
         ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);          ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
         if (ret == -1 || ret != wlen) {          if (ret == -1 || ret != wlen) {
                 /* close connection */                  /* close connection */
                schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
         }          }
   
         return NULL;          return NULL;
Line 187  rxPacket(sched_task_t *task) Line 275  rxPacket(sched_task_t *task)
   
         if (!off)          if (!off)
                 memset(buf, 0, AIT_LEN(&c->cli_buf));                  memset(buf, 0, AIT_LEN(&c->cli_buf));
         else  
                 memmove(buf, buf + off, AIT_LEN(&c->cli_buf) - off);  
         rlen = recv(TASK_FD(task), buf + off, AIT_LEN(&c->cli_buf) - off, 0);          rlen = recv(TASK_FD(task), buf + off, AIT_LEN(&c->cli_buf) - off, 0);
         if (rlen < 1) {          if (rlen < 1) {
                 /* close connection */                  /* close connection */
                schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
                 return NULL;                  return NULL;
         } else {          } else {
                 rlen += off;    /* add reminded bytes from previous rxPacket, if exists! */                  rlen += off;    /* add reminded bytes from previous rxPacket, if exists! */
Line 247  rxPacket(sched_task_t *task) Line 334  rxPacket(sched_task_t *task)
   
                 /* send RPC reply */                  /* send RPC reply */
                 if (!noreply)                  if (!noreply)
                        schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task), rpc, len);                        schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], 
                                         TASK_ARG(task), TASK_FD(task), rpc, len);
   
                 off += len;                  off += len;
         } while (rlen > 0);          } while (rlen > 0);
Line 262  acceptClients(sched_task_t *task) Line 350  acceptClients(sched_task_t *task)
 {  {
         rpc_srv_t *srv = TASK_ARG(task);          rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;          rpc_cli_t *c = NULL;
         register int i;  
         socklen_t salen = sizeof(io_sockaddr_t);          socklen_t salen = sizeof(io_sockaddr_t);
   
        /* check free slots for connect */        c = _allocClient(srv, NULL);
        for (i = 0; i < io_arraySize(srv->srv_clients) &&         if (!c)
                        (c = io_array(srv->srv_clients, i, rpc_cli_t*)); i++); 
        if (c)      /* no more free slots! */ 
                 goto end;                  goto end;
         c = io_malloc(sizeof(rpc_cli_t));  
         if (!c) {  
                 LOGERR;  
                 srv->srv_kill = 1;  
                 return NULL;  
         } else {  
                 memset(c, 0, sizeof(rpc_cli_t));  
                 io_arraySet(srv->srv_clients, i, c);  
                 c->cli_id = i;  
                 c->cli_parent = srv;  
         }  
   
         /* alloc empty buffer */  
         AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);  
   
         /* accept client */          /* accept client */
         c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);          c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
         if (c->cli_sock == -1) {          if (c->cli_sock == -1) {
                 LOGERR;                  LOGERR;
                 AIT_FREE_VAL(&c->cli_buf);                  AIT_FREE_VAL(&c->cli_buf);
                io_arrayDel(srv->srv_clients, i, 42);                io_arrayDel(srv->srv_clients, c->cli_id, 42);
                 goto end;                  goto end;
         } else          } else
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);                  fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
   
        schedRead(TASK_ROOT(task), rxPacket, c, c->cli_sock, NULL, 0);        schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c, 
                         c->cli_sock, NULL, 0);
 end:  end:
         schedReadSelf(task);          schedReadSelf(task);
         return NULL;          return NULL;
 }  }
   
 /* ------------------------------------------------------ */  
   
 static void *  static void *
closeBLOBClient(sched_task_t *task)txUDPPacket(sched_task_t *task)
 {  {
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
           rpc_func_t *f = NULL;
           u_char buf[USHRT_MAX] = { 0 };
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           int ret, wlen = sizeof(struct tagRPCCall);
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
        schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);        schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
   
        /* close client socket */        /* copy RPC header */
        if (TASK_VAL(task))        memcpy(buf, TASK_DATA(task), wlen);
                shutdown(c->cli_sock, SHUT_RDWR); 
        close(c->cli_sock); 
   
           if (rpc->call_argc) {
                   f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                   if (!f) {
                           rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                           rpc->call_argc ^= rpc->call_argc;
                           rpc->call_rep.ret = RPC_ERROR(-1);
                           rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                   } else {
                           rpc->call_argc = htons(io_arraySize(RPC_RETVARS(c)));
                           /* Go Encapsulate variables */
                           ret = io_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c));
                           /* Free return values */
                           io_freeVars(&c->cli_vars);
                           if (ret == -1) {
                                   rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                   rpc->call_argc ^= rpc->call_argc;
                                   rpc->call_rep.ret = RPC_ERROR(-1);
                                   rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                           } else
                                   wlen += ret;
                   }
           }
   
           rpc->call_len = htons(wlen);
   
           /* calculate CRC */
           rpc->call_crc ^= rpc->call_crc;
           rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
   
           /* send reply */
           ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, 
                           &c->cli_sa.sa, c->cli_sa.sa.sa_len);
           if (ret == -1 || ret != wlen) {
                   /* close connection */
                   schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                    TASK_ARG(task), 0, NULL, 0);
           }
   
           return NULL;
   }
   
   static void *
   rxUDPPacket(sched_task_t *task)
   {
           rpc_srv_t *srv = TASK_ARG(task);
           rpc_cli_t *c = NULL;
           socklen_t salen = sizeof(io_sockaddr_t);
           int len, rlen, noreply;
           u_short crc, off = 0;
           u_char buf[USHRT_MAX + 1];
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           io_sockaddr_t sa;
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
           /* receive connect packet */
           rlen = recvfrom(TASK_FD(task), buf, sizeof buf, 0, &sa.sa, &salen);
           if (rlen < 1)
                   goto end;
   
           c = _allocClient(srv, &sa);
           if (!c)
                   goto end;
           else {
                   c->cli_sock = TASK_FD(task);
                   memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
                   memcpy(AIT_GET_BUF(&c->cli_buf), buf, AIT_LEN(&c->cli_buf));
                   /* armed timer for close stateless connection */
                   schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                   schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                   c, ts, c, 0);
           }
   
           do {
                   /* check RPC packet */
                   if (rlen < sizeof(struct tagRPCCall)) {
                           rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                           break;
                   } else
                           rpc = (struct tagRPCCall*) (AIT_GET_BUF(&c->cli_buf) + off);
   
                   len = ntohs(rpc->call_len);
                   rlen -= len;
   
                   /* check RPC packet lengths */
                   if (rlen < 0 || len < sizeof(struct tagRPCCall)) {
                           rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length");
                           /* skip entire packet */
                           break;
                   }
   
                   /* check integrity of packet */
                   crc = ntohs(rpc->call_crc);
                   rpc->call_crc ^= rpc->call_crc;
                   if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                           rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
   
                           off += len;
                           /* try next packet remaining into buffer */
                           continue;
                   }
   
                   noreply = RPC_CHK_NOREPLY(rpc);
   
                   /* check RPC packet session info */
                   if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                           rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                           rpc->call_argc ^= rpc->call_argc;
                           rpc->call_rep.ret = RPC_ERROR(-1);
                           rpc->call_rep.eno = RPC_ERROR(errno);
                   } else {
                           /* execute RPC call */
                           schedEvent(TASK_ROOT(task), execCall, c, off, NULL, 0);
                   }
   
                   /* send RPC reply */
                   if (!noreply)
                           schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                           c, TASK_FD(task), rpc, len);
   
                   off += len;
           } while (rlen > 0);
   
   end:
           schedReadSelf(task);
           return NULL;
   }
   
   /* ------------------------------------------------------ */
   
   inline void
   rpc_freeBLOBCli(rpc_cli_t * __restrict c)
   {
           rpc_srv_t *s = c->cli_parent;
   
           schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
   
         /* free buffer */          /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);          AIT_FREE_VAL(&c->cli_buf);
   
         io_arrayDel(s->srv_blob.clients, c->cli_id, 0);          io_arrayDel(s->srv_blob.clients, c->cli_id, 0);
         if (c)          if (c)
                 io_free(c);                  io_free(c);
   }
   
   
   static void *
   closeBLOBClient(sched_task_t *task)
   {
           int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
   
           rpc_freeBLOBCli(TASK_ARG(task));
   
           /* close client socket */
           shutdown(sock, SHUT_RDWR);
           close(sock);
         return NULL;          return NULL;
 }  }
   
Line 576  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 801  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);                  AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
        }        } else
                 fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
   
   
         /* allocate pool for concurent blob clients */          /* allocate pool for concurent blob clients */
         srv->srv_blob.clients = io_arrayInit(io_arraySize(srv->srv_clients));          srv->srv_blob.clients = io_arrayInit(io_arraySize(srv->srv_clients));
         if (!srv->srv_blob.clients) {          if (!srv->srv_blob.clients) {
Line 634  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 862  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
                 return -1;                  return -1;
         }          }
   
         fcntl(srv->srv_blob.server.cli_sock, F_SETFL,   
                         fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);  
   
         if (listen(srv->srv_blob.server.cli_sock, io_arraySize(srv->srv_blob.clients)) == -1) {          if (listen(srv->srv_blob.server.cli_sock, io_arraySize(srv->srv_blob.clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
Line 691  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 916  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
  * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)   * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
  * @csHost = Host name or address for bind server, if NULL any address   * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
    * @proto = Protocol, if == 0 choose SOCK_STREAM
  * return: NULL == error or !=NULL bind and created RPC server instance   * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
 rpc_srv_t *  rpc_srv_t *
 rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients,   rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients, 
                int netBuf, const char *csHost, u_short Port)                int netBuf, const char *csHost, u_short Port, int proto)
 {  {
         int n = 1;          int n = 1;
         rpc_srv_t *srv = NULL;          rpc_srv_t *srv = NULL;
         io_sockaddr_t sa = IO_SOCKADDR_INIT;          io_sockaddr_t sa = IO_SOCKADDR_INIT;
   
        if (!concurentClients || !regProgID) {        if (!concurentClients || !regProgID || (proto < 0 || proto > SOCK_DGRAM)) {
                 rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");                  rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                 return NULL;                  return NULL;
         }          }
Line 709  rpc_srv_initServer(u_int regProgID, u_char regProcID,  Line 935  rpc_srv_initServer(u_int regProgID, u_char regProcID, 
                 return NULL;                  return NULL;
         if (!Port)          if (!Port)
                 Port = RPC_DEFPORT;                  Port = RPC_DEFPORT;
           if (!proto)
                   proto = SOCK_STREAM;
         if (netBuf < RPC_MIN_BUFSIZ)          if (netBuf < RPC_MIN_BUFSIZ)
                 netBuf = BUFSIZ;                  netBuf = BUFSIZ;
         else          else
Line 729  rpc_srv_initServer(u_int regProgID, u_char regProcID,  Line 957  rpc_srv_initServer(u_int regProgID, u_char regProcID, 
         } else          } else
                 memset(srv, 0, sizeof(rpc_srv_t));                  memset(srv, 0, sizeof(rpc_srv_t));
   
           srv->srv_proto = proto;
         srv->srv_netbuf = netBuf;          srv->srv_netbuf = netBuf;
         srv->srv_session.sess_version = RPC_VERSION;          srv->srv_session.sess_version = RPC_VERSION;
         srv->srv_session.sess_program = regProgID;          srv->srv_session.sess_program = regProgID;
Line 762  rpc_srv_initServer(u_int regProgID, u_char regProcID,  Line 991  rpc_srv_initServer(u_int regProgID, u_char regProcID, 
         }          }
   
         /* create server socket */          /* create server socket */
        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
         if (srv->srv_server.cli_sock == -1) {          if (srv->srv_server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                 io_arrayDestroy(&srv->srv_clients);                  io_arrayDestroy(&srv->srv_clients);
Line 788  rpc_srv_initServer(u_int regProgID, u_char regProcID,  Line 1017  rpc_srv_initServer(u_int regProgID, u_char regProcID, 
                                 srv->srv_server.cli_sa.sa.sa_len) == -1) {                                  srv->srv_server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;                  LOGERR;
                 goto err;                  goto err;
        }        } else
                 fcntl(srv->srv_server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
   
         rpc_register_srvPing(srv);          rpc_register_srvPing(srv);
   
Line 845  rpc_srv_loopServer(rpc_srv_t * __restrict srv) Line 1076  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
                 return -1;                  return -1;
         }          }
   
        if (listen(srv->srv_server.cli_sock, io_arraySize(srv->srv_clients)) == -1) {        if (srv->srv_proto == SOCK_STREAM)
                LOGERR;                if (listen(srv->srv_server.cli_sock, 
                return -1;                                        io_arraySize(srv->srv_clients)) == -1) {
        } else                        LOGERR;
                fcntl(srv->srv_server.cli_sock, F_SETFL,                         return -1;
                                fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);                }
   
        if (!schedRead(srv->srv_root, acceptClients, srv, srv->srv_server.cli_sock, NULL, 0)) {        if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, 
                                 srv->srv_server.cli_sock, NULL, 0)) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());                  rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;                  return -1;
         }          }

Removed from v.1.12  
changed lines
  Added in v.1.13


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>