Diff for /libaitrpc/src/srv.c between versions 1.23.6.2 and 1.23.6.3

version 1.23.6.2, 2014/11/17 23:51:26 version 1.23.6.3, 2014/12/17 01:30:01
Line 633  end: Line 633  end:
         return NULL;          return NULL;
 }  }
   
   
   static void *
   txBPFPacket(sched_task_t *task)
   {
           rpc_cli_t *c = TASK_ARG(task);
           rpc_srv_t *s = c->cli_parent;
           rpc_func_t *f = NULL;
           u_char *buf = AIT_GET_BUF(&c->cli_buf);
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           int ret, estlen, wlen = sizeof(struct tagRPCCall);
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
           struct pollfd pfd;
   
           schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
           schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                            TASK_ARG(task), ts, TASK_ARG(task), 0);
   
           if (rpc->call_argc) {
                   f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                   if (!f) {
                           rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                           rpc->call_argc ^= rpc->call_argc;
                           rpc->call_rep.ret = RPC_ERROR(-1);
                           rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                   } else {
                           /* calc estimated length */
                           estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
                           if (estlen > AIT_LEN(&c->cli_buf))
                                   AIT_RE_BUF(&c->cli_buf, estlen);
                           buf = AIT_GET_BUF(&c->cli_buf);
                           rpc = (struct tagRPCCall*) buf;
   
                           rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                           /* Go Encapsulate variables */
                           ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                           RPC_RETVARS(c));
                           /* Free return values */
                           ait_freeVars(&c->cli_vars);
                           if (ret == -1) {
                                   rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                   rpc->call_argc ^= rpc->call_argc;
                                   rpc->call_rep.ret = RPC_ERROR(-1);
                                   rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                           } else
                                   wlen += ret;
                   }
           }
   
           rpc->call_len = htonl(wlen);
   
           /* calculate CRC */
           rpc->call_crc ^= rpc->call_crc;
           rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
   
           /* send reply */
           pfd.fd = TASK_FD(task);
           pfd.events = POLLOUT;
           for (; wlen > 0; wlen -= ret, buf += ret) {
                   if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                   pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                           if (ret)
                                   LOGERR;
                           else
                                   rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                           /* close connection */
                           schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                           TASK_ARG(task), 0, NULL, 0);
                           return NULL;
                   }
                   ret = write(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf));
                   if (ret == -1) {
                           /* close connection */
                           schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                           TASK_ARG(task), 0, NULL, 0);
                           return NULL;
                   }
           }
   
           return NULL;
   }
   
   static void *
   rxBPFPacket(sched_task_t *task)
   {
           rpc_srv_t *srv = TASK_ARG(task);
           rpc_cli_t *c = NULL;
           int len, rlen, noreply, estlen;
           u_short crc;
           u_char *buf, b[sizeof(struct tagRPCCall)];
           struct tagRPCCall *rpc = (struct tagRPCCall*) b;
           sockaddr_t sa;
           socklen_t salen;
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
           struct pollfd pfd;
           struct bpf_hdr *h;
           struct ether_header *eh;
           ait_val_t b = AIT_VAL_INIT;
   
           /* receive connect packet */
           salen = sa.ss.ss_len = sizeof(sockaddr_t);
           AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
           rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
           if (rlen < sizeof(struct bpf_hdr) + ETHER_HDR_LEN + sizeof(struct tagRPCCall)) {
                   rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                   goto end;
           } else {
                   h = (struct bpf_hdr*) AIT_GET_BUF(&b);
                   eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
                   buf = (u_char*) (eh + 1);
           }
           if ()
   
           c = _allocClient(srv, &sa);
           if (!c) {
                   EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                   usleep(2000);   /* blocked client delay */
                   goto end;
           } else {
                   estlen = ntohl(rpc->call_len);
                   if (estlen > AIT_LEN(&c->cli_buf))
                           AIT_RE_BUF(&c->cli_buf, estlen);
                   rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
                   buf = AIT_GET_BUF(&c->cli_buf);
                   len = estlen;
   
                   c->cli_sock = TASK_FD(task);
                   memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
   
                   /* armed timer for close stateless connection */
                   schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                   schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                   c, ts, c, 0);
           }
   
           /* get next part of packet */
           memset(buf, 0, len);
           pfd.fd = TASK_FD(task);
           pfd.events = POLLIN | POLLPRI;
           for (; len > 0; len -= rlen, buf += rlen) {
                   if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                   pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                           if (rlen)
                                   LOGERR;
                           else
                                   rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                           schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                           c, 0, NULL, 0);
                           return NULL;
                   }
                   salen = sa.ss.ss_len = sizeof(sockaddr_t);
                   rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);
                   if (rlen == -1) {
                           /* close connection */
                           schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                           c, 0, NULL, 0);
                           return NULL;
                   }
                   if (e_addrcmp(&c->cli_sa, &sa, 42))
                           rlen ^= rlen;   /* skip if arrive from different address */
           }
           len = estlen;
   
           /* check integrity of packet */
           crc = ntohs(rpc->call_crc);
           rpc->call_crc ^= rpc->call_crc;
           if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                   rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                   return NULL;
           }
   
           noreply = RPC_CHK_NOREPLY(rpc);
   
           /* check RPC packet session info */
           if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                   rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
   
                   rpc->call_argc ^= rpc->call_argc;
                   rpc->call_rep.ret = RPC_ERROR(-1);
                   rpc->call_rep.eno = RPC_ERROR(errno);
           } else {
                   /* execute RPC call */
                   schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
           }
   
           /* send RPC reply */
           if (!noreply)
                   schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                   c, TASK_FD(task), rpc, len);
   end:
           AIT_FREE_VAL(&b);
           schedReadSelf(task);
           return NULL;
   }
   
 /* ------------------------------------------------------ */  /* ------------------------------------------------------ */
   
 void  void
Line 1244  rpc_srv_loopServer(rpc_srv_t * __restrict srv) Line 1438  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
         for (i = 0; i < array_Size(srv->srv_clients); i++) {          for (i = 0; i < array_Size(srv->srv_clients); i++) {
                 c = array(srv->srv_clients, i, rpc_cli_t*);                  c = array(srv->srv_clients, i, rpc_cli_t*);
                 if (c) {                  if (c) {
                        shutdown(c->cli_sock, SHUT_RDWR);                        if (srv->srv_proto == SOCK_STREAM) {
                        close(c->cli_sock);                                shutdown(c->cli_sock, SHUT_RDWR);
                                 close(c->cli_sock);
                         }
   
                         schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);                          schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
                         ait_freeVars(&RPC_RETVARS(c));                          ait_freeVars(&RPC_RETVARS(c));
Line 1315  rpc_srv_initServer2(u_char InstID, int concurentClient Line 1511  rpc_srv_initServer2(u_char InstID, int concurentClient
         char szIface[64], szStr[STRSIZ];          char szIface[64], szStr[STRSIZ];
         register int i;          register int i;
         struct ifreq ifr;          struct ifreq ifr;
           struct bpf_insn insns[] = {
                   BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
                   BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
                   BPF_STMT(BPF_RET + BPF_K, -1),
                   BPF_STMT(BPF_RET + BPF_K, 0),
           };
           struct bpf_program fcode = { 
                   .bf_len = sizeof(insns) / sizeof(struct bpf_insn), 
                   .bf_insns = insns
           };
   
         if (!concurentClients) {          if (!concurentClients) {
                 rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");                  rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
Line 1325  rpc_srv_initServer2(u_char InstID, int concurentClient Line 1531  rpc_srv_initServer2(u_char InstID, int concurentClient
                         return NULL;                          return NULL;
         } else          } else
                 strlcpy(szIface, csIface, sizeof szIface);                  strlcpy(szIface, csIface, sizeof szIface);
        if (e_getifacebyname(szIface, &sa))        if (!e_getifacebyname(szIface, &sa))
                 return NULL;                  return NULL;
   
 #ifdef HAVE_SRANDOMDEV  #ifdef HAVE_SRANDOMDEV
Line 1396  rpc_srv_initServer2(u_char InstID, int concurentClient Line 1602  rpc_srv_initServer2(u_char InstID, int concurentClient
                 LOGERR;                  LOGERR;
                 goto err;                  goto err;
         }          }
           if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
                   LOGERR;
                   goto err;
           }
         n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);          n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
         if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {          if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
                 LOGERR;                  LOGERR;
Line 1422  err: /* error condition */ Line 1632  err: /* error condition */
         pthread_mutex_destroy(&srv->srv_funcs.mtx);          pthread_mutex_destroy(&srv->srv_funcs.mtx);
         e_free(srv);          e_free(srv);
         return NULL;          return NULL;
 }  
   
 /*  
  * rpc_srv_loopServer2() - Execute Main layer2 server loop and wait for clients requests  
  *  
  * @srv = RPC Server instance  
  * return: -1 error or 0 ok, infinite loop ...  
  */  
 int  
 rpc_srv_loopServer2(rpc_srv_t * __restrict srv)  
 {  
         rpc_cli_t *c;  
         register int i;  
         rpc_func_t *f;  
         struct timespec ts = { RPC_SCHED_POLLING, 0 };  
   
         if (!srv) {  
                 rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");  
                 return -1;  
         }  
   
         if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,   
                                 srv->srv_server.cli_sock, NULL, 0)) {  
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());  
                 return -1;  
         }  
   
         schedPolling(srv->srv_root, &ts, NULL);  
         /* main rpc loop */  
         schedRun(srv->srv_root, &srv->srv_kill);  
   
         /* close all clients connections & server socket */  
         for (i = 0; i < array_Size(srv->srv_clients); i++) {  
                 c = array(srv->srv_clients, i, rpc_cli_t*);  
                 if (c) {  
                         schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);  
                         ait_freeVars(&RPC_RETVARS(c));  
                         AIT_FREE_VAL(&c->cli_buf);  
                 }  
                 array_Del(srv->srv_clients, i, 42);  
         }  
         array_Destroy(&srv->srv_clients);  
   
         close(srv->srv_server.cli_sock);  
   
         /* detach exported calls */  
         RPC_FUNCS_LOCK(&srv->srv_funcs);  
         while ((f = SLIST_FIRST(&srv->srv_funcs))) {  
                 SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);  
   
                 AIT_FREE_VAL(&f->func_name);  
                 e_free(f);  
         }  
         srv->srv_funcs.avlh_root = NULL;  
         RPC_FUNCS_UNLOCK(&srv->srv_funcs);  
   
         return 0;  
 }  }

Removed from v.1.23.6.2  
changed lines
  Added in v.1.23.6.3


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>