Diff for /libaitrpc/src/srv.c between versions 1.12.2.4 and 1.12.2.5

version 1.12.2.4, 2012/11/16 13:30:05 version 1.12.2.5, 2012/11/19 10:29:02
Line 53  static void *rxPacket(sched_task_t *); Line 53  static void *rxPacket(sched_task_t *);
 static void *txPacket(sched_task_t *);  static void *txPacket(sched_task_t *);
   
 /* SOCK_DGRAM */  /* SOCK_DGRAM */
static void *connectClients(sched_task_t *);static void *freeClient(sched_task_t *);
static void *disconnectClient(sched_task_t *);static void *rxUDPPacket(sched_task_t *);
 static void *txUDPPacket(sched_task_t *);
   
 /* SOCK_RAW */  /* SOCK_RAW */
   
 static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {  static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */          { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */          { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
        { connectClients, disconnectClient, NULL, NULL },         { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },     /* SOCK_DGRAM */
         { NULL, NULL, NULL, NULL }                              /* SOCK_RAW */          { NULL, NULL, NULL, NULL }                              /* SOCK_RAW */
 };  };
   
Line 118  _allocClient(rpc_srv_t * __restrict srv, io_sockaddr_t Line 119  _allocClient(rpc_srv_t * __restrict srv, io_sockaddr_t
   
   
 static void *  static void *
connectClients(sched_task_t *task)freeClient(sched_task_t *task)
 {  {
         rpc_srv_t *srv = TASK_ARG(task);  
         rpc_cli_t *c = NULL;  
         socklen_t salen = sizeof(io_sockaddr_t);  
         int len, rlen;  
         u_short crc;  
         u_char buf[USHRT_MAX + 1];  
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;  
         io_sockaddr_t sa;  
   
 #if 0  
         c = _allocClient(srv);  
         if (!c) {  
                 schedReadSelf(task);  
                 return NULL;  
         } else  
                 buf = AIT_GET_BUF(&c->cli_buf);  
 #endif  
   
         /* receive connect packet */  
         rlen = recvfrom(TASK_FD(task), buf, sizeof buf, 0, &sa.sa, &salen);  
         if (rlen < 1)  
                 goto end;  
   
         /* check RPC packet */  
         if (rlen < sizeof(struct tagRPCCall)) {  
                 rpc_SetErr(ERPCMISMATCH, "Short RPC packet");  
                 goto end;  
         } else {  
                 len = ntohs(rpc->call_len);  
                 rlen -= len;  
         }  
         /* check RPC packet lengths */  
         if (rlen < 0 || len < sizeof(struct tagRPCCall)) {  
                 rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length");  
                 goto end;  
         }  
   
         /* check integrity of packet */  
         crc = ntohs(rpc->call_crc);  
         rpc->call_crc ^= rpc->call_crc;  
         if (crc != crcFletcher16((u_short*) rpc, len / 2)) {  
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");  
                 goto end;  
         }  
   
         /* check RPC packet session info */  
         if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {  
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");  
                 goto end;  
         }  
   
         /* RPC packet is OK */  
   
         c = _allocClient(srv, &sa);  
         if (!c)  
                 goto end;  
   
         if (0) {  
                 schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,   
                                 c->cli_sock, NULL, 0);  
                 schedReadSelf(task);  
                 return NULL;  
         }  
   
 end:  
 #if 0  
         /* close connection & dont send disconnect */  
         schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],   
                         c, 42, NULL, 42);  
 #endif  
         schedReadSelf(task);  
         return NULL;  
 }  
   
 static void *  
 disconnectClient(sched_task_t *task)  
 {  
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
   
         schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);          schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);
   
         if (!TASK_DATLEN(task)) {  
                 /* TODO: send disconnect packet */  
         }  
   
         /* free buffer */          /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);          AIT_FREE_VAL(&c->cli_buf);
   
Line 450  acceptClients(sched_task_t *task) Line 370  acceptClients(sched_task_t *task)
   
         schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,           schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c, 
                         c->cli_sock, NULL, 0);                          c->cli_sock, NULL, 0);
   end:
           schedReadSelf(task);
           return NULL;
   }
   
   
   static void *
   txUDPPacket(sched_task_t *task)
   {
           rpc_cli_t *c = TASK_ARG(task);
           rpc_srv_t *s = c->cli_parent;
           rpc_func_t *f = NULL;
           u_char buf[USHRT_MAX] = { 0 };
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           int ret, wlen = sizeof(struct tagRPCCall);
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
           schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
           schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                           c, ts, c, 0);
   
           /* copy RPC header */
           memcpy(buf, TASK_DATA(task), wlen);
   
           if (rpc->call_argc) {
                   f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                   if (!f) {
                           rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                           rpc->call_argc ^= rpc->call_argc;
                           rpc->call_rep.ret = RPC_ERROR(-1);
                           rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                   } else {
                           rpc->call_argc = htons(io_arraySize(RPC_RETVARS(c)));
                           /* Go Encapsulate variables */
                           ret = io_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c));
                           /* Free return values */
                           io_freeVars(&c->cli_vars);
                           if (ret == -1) {
                                   rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                   rpc->call_argc ^= rpc->call_argc;
                                   rpc->call_rep.ret = RPC_ERROR(-1);
                                   rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                           } else
                                   wlen += ret;
                   }
           }
   
           rpc->call_len = htons(wlen);
   
           /* calculate CRC */
           rpc->call_crc ^= rpc->call_crc;
           rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
   
           /* send reply */
           ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, 
                           &c->cli_sa.sa, c->cli_sa.sa.sa_len);
           if (ret == -1 || ret != wlen) {
                   /* close connection */
                   schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                   c, 42, NULL, 0);
           }
   
           return NULL;
   }
   
   static void *
   rxUDPPacket(sched_task_t *task)
   {
           rpc_srv_t *srv = TASK_ARG(task);
           rpc_cli_t *c = NULL;
           socklen_t salen = sizeof(io_sockaddr_t);
           int len, rlen, noreply;
           u_short crc, off = 0;
           u_char buf[USHRT_MAX + 1];
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           io_sockaddr_t sa;
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
           /* receive connect packet */
           rlen = recvfrom(TASK_FD(task), buf, sizeof buf, 0, &sa.sa, &salen);
           if (rlen < 1)
                   goto end;
   
           c = _allocClient(srv, &sa);
           if (!c)
                   goto end;
           else {
                   /* armed timer for close stateless connection */
                   memcpy(AIT_GET_BUF(&c->cli_buf), buf, AIT_LEN(&c->cli_buf));
                   schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                   schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                   c, ts, c, 0);
           }
   
           do {
                   /* check RPC packet */
                   if (rlen < sizeof(struct tagRPCCall)) {
                           rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                           break;
                   } else
                           rpc = (struct tagRPCCall*) (AIT_GET_BUF(&c->cli_buf) + off);
   
                   len = ntohs(rpc->call_len);
                   rlen -= len;
   
                   /* check RPC packet lengths */
                   if (rlen < 0 || len < sizeof(struct tagRPCCall)) {
                           rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length");
                           /* skip entire packet */
                           break;
                   }
   
                   /* check integrity of packet */
                   crc = ntohs(rpc->call_crc);
                   rpc->call_crc ^= rpc->call_crc;
                   if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                           rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
   
                           off += len;
                           /* try next packet remaining into buffer */
                           continue;
                   }
   
                   noreply = RPC_CHK_NOREPLY(rpc);
   
                   /* check RPC packet session info */
                   if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                           rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                           rpc->call_argc ^= rpc->call_argc;
                           rpc->call_rep.ret = RPC_ERROR(-1);
                           rpc->call_rep.eno = RPC_ERROR(errno);
                   } else {
                           /* execute RPC call */
                           schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off, NULL, 0);
                   }
   
                   /* send RPC reply */
                   if (!noreply)
                           schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                           TASK_ARG(task), TASK_FD(task), rpc, len);
   
                   off += len;
           } while (rlen > 0);
   
 end:  end:
         schedReadSelf(task);          schedReadSelf(task);
         return NULL;          return NULL;

Removed from v.1.12.2.4  
changed lines
  Added in v.1.12.2.5


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>