Diff for /libaitrpc/src/srv.c between versions 1.24.2.4 and 1.24.2.5

version 1.24.2.4, 2015/01/18 00:06:55 version 1.24.2.5, 2015/01/18 00:55:05
Line 64  static void *rxBPFPacket(sched_task_t *); Line 64  static void *rxBPFPacket(sched_task_t *);
 static void *txBPFPacket(sched_task_t *);  static void *txBPFPacket(sched_task_t *);
   
 /* SOCK_EXT */  /* SOCK_EXT */
   static void *rxEXTPacket(sched_task_t *);
   static void *txEXTPacket(sched_task_t *);
   
 static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {  static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */          { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
Line 71  static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4]  Line 73  static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] 
         { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */          { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */
         { NULL, NULL, NULL, NULL },                             /* SOCK_RAW */          { NULL, NULL, NULL, NULL },                             /* SOCK_RAW */
         { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket },  /* SOCK_BPF */          { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket },  /* SOCK_BPF */
        { NULL, NULL, NULL, NULL }      /* SOCK_EXT */        { rxEXTPacket, freeClient, rxEXTPacket, txEXTPacket }      /* SOCK_EXT */
 };  };
   
 /* Global Signal Argument when kqueue support disabled */  /* Global Signal Argument when kqueue support disabled */
Line 119  _allocClient(rpc_srv_t * __restrict srv, sockaddr_t *  Line 121  _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * 
         rpc_cli_t *c = NULL;          rpc_cli_t *c = NULL;
         int n;          int n;
   
        n = _check4freeslot(srv, sa);        if (srv->srv_proto != SOCK_EXT)
                 n = _check4freeslot(srv, sa);
         else
                 n = 0;
         if (n == -1)          if (n == -1)
                 return NULL;                  return NULL;
         else          else
Line 799  rxBPFPacket(sched_task_t *task) Line 804  rxBPFPacket(sched_task_t *task)
                                 c, TASK_FD(task), rpc, len);                                  c, TASK_FD(task), rpc, len);
 end:  end:
         AIT_FREE_VAL(&b);          AIT_FREE_VAL(&b);
           schedReadSelf(task);
           return NULL;
   }
   
   
   static void *
   txEXTPacket(sched_task_t *task)
   {
           rpc_cli_t *c = TASK_ARG(task);
           rpc_srv_t *s = c->cli_parent;
           rpc_func_t *f = NULL;
           u_char *buf = AIT_GET_BUF(&c->cli_buf);
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           int ret, estlen, wlen = sizeof(struct tagRPCCall);
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
           struct pollfd pfd;
   
           schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
           schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                            TASK_ARG(task), ts, TASK_ARG(task), 0);
   
           if (rpc->call_argc) {
                   f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                   if (!f) {
                           rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                           rpc->call_argc ^= rpc->call_argc;
                           rpc->call_rep.ret = RPC_ERROR(-1);
                           rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                   } else {
                           /* calc estimated length */
                           estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
                           if (estlen > AIT_LEN(&c->cli_buf))
                                   AIT_RE_BUF(&c->cli_buf, estlen);
                           buf = AIT_GET_BUF(&c->cli_buf);
                           rpc = (struct tagRPCCall*) buf;
   
                           rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                           /* Go Encapsulate variables */
                           ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                           RPC_RETVARS(c));
                           /* Free return values */
                           ait_freeVars(&c->cli_vars);
                           if (ret == -1) {
                                   rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                   rpc->call_argc ^= rpc->call_argc;
                                   rpc->call_rep.ret = RPC_ERROR(-1);
                                   rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                           } else
                                   wlen += ret;
                   }
           }
   
           rpc->call_len = htonl(wlen);
   
           /* send reply */
           pfd.fd = TASK_FD(task);
           pfd.events = POLLOUT;
           for (; wlen > 0; wlen -= ret, buf += ret) {
                   if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                   pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                           if (ret)
                                   LOGERR;
                           else
                                   rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                           /* close connection */
                           schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                           TASK_ARG(task), 0, NULL, 0);
                           return NULL;
                   }
                   ret = write(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf));
                   if (ret == -1) {
                           /* close connection */
                           schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                           TASK_ARG(task), 0, NULL, 0);
                           return NULL;
                   }
           }
   
           return NULL;
   }
   
   static void *
   rxEXTPacket(sched_task_t *task)
   {
           rpc_srv_t *srv = TASK_ARG(task);
           rpc_cli_t *c = NULL;
           int len, rlen, noreply, estlen;
           u_char *buf, b[sizeof(struct tagRPCCall)];
           struct tagRPCCall *rpc = (struct tagRPCCall*) b;
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
           struct pollfd pfd;
   
           /* receive connect packet */
           rlen = read(TASK_FD(task), b, sizeof b);
           if (rlen < sizeof(struct tagRPCCall)) {
                   rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                   goto end;
           }
   
           c = _allocClient(srv, (sockaddr_t*) -1);
           if (!c) {
                   EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                   usleep(2000);   /* blocked client delay */
                   goto end;
           } else {
                   estlen = ntohl(rpc->call_len);
                   if (estlen > AIT_LEN(&c->cli_buf))
                           AIT_RE_BUF(&c->cli_buf, estlen);
                   rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
                   memcpy(rpc, b, sizeof b);
                   buf = (u_char*) (rpc + 1);
                   len = estlen;
   
                   c->cli_sock = TASK_FD(task);
   
                   /* armed timer for close stateless connection */
                   schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                   schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                   c, ts, c, 0);
           }
   
           /* get next part of packet */
           memset(buf, 0, len);
           pfd.fd = TASK_FD(task);
           pfd.events = POLLIN | POLLPRI;
           for (; len > 0; len -= rlen, buf += rlen) {
                   if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                   pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                           if (rlen)
                                   LOGERR;
                           else
                                   rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                           schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                           c, 0, NULL, 0);
                           goto end;
                   }
                   rlen = read(TASK_FD(task), buf, len);
                   if (rlen == -1) {
                           /* close connection */
                           schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                           c, 0, NULL, 0);
                           goto end;
                   }
           }
           len = estlen;
   
           noreply = RPC_CHK_NOREPLY(rpc);
   
           /* check RPC packet session info */
           if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                   rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
   
                   rpc->call_argc ^= rpc->call_argc;
                   rpc->call_rep.ret = RPC_ERROR(-1);
                   rpc->call_rep.eno = RPC_ERROR(errno);
           } else {
                   /* execute RPC call */
                   schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
           }
   
           /* send RPC reply */
           if (!noreply)
                   schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                   c, TASK_FD(task), rpc, len);
   end:
         schedReadSelf(task);          schedReadSelf(task);
         return NULL;          return NULL;
 }  }

Removed from v.1.24.2.4  
changed lines
  Added in v.1.24.2.5


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>