Diff for /libaitrpc/src/srv.c between versions 1.23.6.1 and 1.23.6.6

version 1.23.6.1, 2014/11/17 23:28:55 version 1.23.6.6, 2014/12/18 00:50:06
Line 59  static void *txUDPPacket(sched_task_t *); Line 59  static void *txUDPPacket(sched_task_t *);
   
 /* SOCK_RAW */  /* SOCK_RAW */
   
static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {/* SOCK_BPF */
 static void *rxBPFPacket(sched_task_t *);
 static void *txBPFPacket(sched_task_t *);
 
 static sched_task_func_t cbProto[SOCK_BPF + 1][4] = {
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */          { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */          { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */          { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */
        { NULL, NULL, NULL, NULL }                              /* SOCK_RAW */        { NULL, NULL, NULL, NULL },                                /* SOCK_RAW */
         { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket }   /* SOCK_BPF */
 };  };
   
 /* Global Signal Argument when kqueue support disabled */  /* Global Signal Argument when kqueue support disabled */
Line 582  rxUDPPacket(sched_task_t *task) Line 587  rxUDPPacket(sched_task_t *task)
                                 rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");                                  rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],                           schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                         c, 0, NULL, 0);                                          c, 0, NULL, 0);
                        return NULL;                        goto end;
                 }                  }
                 salen = sa.ss.ss_len = sizeof(sockaddr_t);                  salen = sa.ss.ss_len = sizeof(sockaddr_t);
                 rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);                  rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);
Line 590  rxUDPPacket(sched_task_t *task) Line 595  rxUDPPacket(sched_task_t *task)
                         /* close connection */                          /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],                           schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                         c, 0, NULL, 0);                                          c, 0, NULL, 0);
                        return NULL;                        goto end;
                 }                  }
                 if (e_addrcmp(&c->cli_sa, &sa, 42))                  if (e_addrcmp(&c->cli_sa, &sa, 42))
                         rlen ^= rlen;   /* skip if arrive from different address */                          rlen ^= rlen;   /* skip if arrive from different address */
Line 602  rxUDPPacket(sched_task_t *task) Line 607  rxUDPPacket(sched_task_t *task)
         rpc->call_crc ^= rpc->call_crc;          rpc->call_crc ^= rpc->call_crc;
         if (crc != crcFletcher16((u_short*) rpc, len / 2)) {          if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");                  rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                   /* close connection */
                   schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                   c, 0, NULL, 0);
                   goto end;
           }
   
           noreply = RPC_CHK_NOREPLY(rpc);
   
           /* check RPC packet session info */
           if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                   rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
   
                   rpc->call_argc ^= rpc->call_argc;
                   rpc->call_rep.ret = RPC_ERROR(-1);
                   rpc->call_rep.eno = RPC_ERROR(errno);
           } else {
                   /* execute RPC call */
                   schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
           }
   
           /* send RPC reply */
           if (!noreply)
                   schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                   c, TASK_FD(task), rpc, len);
   end:
           schedReadSelf(task);
           return NULL;
   }
   
   
   static void *
   txBPFPacket(sched_task_t *task)
   {
           rpc_cli_t *c = TASK_ARG(task);
           rpc_srv_t *s = c->cli_parent;
           rpc_func_t *f = NULL;
           u_char *buf = AIT_GET_BUF(&c->cli_buf);
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           int ret, len, wlen = sizeof(struct tagRPCCall);
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
           struct ether_header *eh;
           ait_val_t b = AIT_VAL_INIT;
   
           schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
           schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                            TASK_ARG(task), ts, TASK_ARG(task), 0);
   
           if (rpc->call_argc) {
                   f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                   if (!f) {
                           rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                           rpc->call_argc ^= rpc->call_argc;
                           rpc->call_rep.ret = RPC_ERROR(-1);
                           rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                   } else {
                           /* calc estimated length */
                           len = ait_resideVars(RPC_RETVARS(c)) + wlen;
                           if (len > AIT_LEN(&c->cli_buf))
                                   AIT_RE_BUF(&c->cli_buf, len);
                           buf = AIT_GET_BUF(&c->cli_buf);
                           rpc = (struct tagRPCCall*) buf;
   
                           rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                           /* Go Encapsulate variables */
                           ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                           RPC_RETVARS(c));
                           /* Free return values */
                           ait_freeVars(&RPC_RETVARS(c));
                           if (ret == -1) {
                                   rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                   rpc->call_argc ^= rpc->call_argc;
                                   rpc->call_rep.ret = RPC_ERROR(-1);
                                   rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                           } else
                                   wlen += ret;
                   }
           }
   
           rpc->call_len = htonl(wlen);
   
           /* calculate CRC */
           rpc->call_crc ^= rpc->call_crc;
           rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
   
           /* send reply */
           AIT_SET_BUF(&b, NULL, MIN(wlen, s->srv_netbuf) + ETHER_HDR_LEN);
           eh = (struct ether_header*) AIT_GET_BUF(&b);
           memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
           eh->ether_type = htons(RPC_DEFPORT);
           memcpy(eh + 1, buf, MIN(wlen, s->srv_netbuf));
   
           ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
           AIT_FREE_VAL(&b);
           if (ret == -1) {
                   /* close connection */
                   schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                   TASK_ARG(task), 0, NULL, 0);
                 return NULL;                  return NULL;
         }          }
   
           return NULL;
   }
   
   static void *
   rxBPFPacket(sched_task_t *task)
   {
           rpc_srv_t *srv = TASK_ARG(task);
           rpc_cli_t *c = NULL;
           int len, rlen, noreply;
           u_short crc;
           struct tagRPCCall *rpc;
           sockaddr_t sa;
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
           struct bpf_hdr *h;
           struct ether_header *eh;
           ait_val_t b = AIT_VAL_INIT;
   
           /* receive connect packet */
           AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
           rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
           h = (struct bpf_hdr*) AIT_GET_BUF(&b);
           rlen -= h->bh_hdrlen;
           if (rlen < h->bh_caplen || h->bh_caplen != h->bh_datalen || 
                           rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) {
                   rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                   goto end;
           } else {
                   rlen = h->bh_caplen;
                   eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
                   rlen -= ETHER_HDR_LEN;
                   rpc = (struct tagRPCCall*) (eh + 1);
                   if (eh->ether_type != ntohs(RPC_DEFPORT))
                           goto end;
                   else
                           e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
           }
   
           c = _allocClient(srv, &sa);
           if (!c) {
                   EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                   usleep(2000);   /* blocked client delay */
                   goto end;
           } else {
                   len = ntohl(rpc->call_len);
                   if (len > AIT_LEN(&c->cli_buf))
                           AIT_RE_BUF(&c->cli_buf, len);
                   memcpy(AIT_GET_BUF(&c->cli_buf), rpc, AIT_LEN(&c->cli_buf));
                   rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
   
                   c->cli_sock = TASK_FD(task);
                   memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
   
                   /* armed timer for close stateless connection */
                   schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                   schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                   c, ts, c, 0);
           }
   
           /* check integrity of packet */
           crc = ntohs(rpc->call_crc);
           rpc->call_crc ^= rpc->call_crc;
           if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                   rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                   /* close connection */
                   schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                   c, 0, NULL, 0);
                   goto end;
           }
   
         noreply = RPC_CHK_NOREPLY(rpc);          noreply = RPC_CHK_NOREPLY(rpc);
   
         /* check RPC packet session info */          /* check RPC packet session info */
Line 624  rxUDPPacket(sched_task_t *task) Line 795  rxUDPPacket(sched_task_t *task)
                 schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],                   schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);                                  c, TASK_FD(task), rpc, len);
 end:  end:
           AIT_FREE_VAL(&b);
         schedReadSelf(task);          schedReadSelf(task);
         return NULL;          return NULL;
 }  }
Line 1239  rpc_srv_loopServer(rpc_srv_t * __restrict srv) Line 1411  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
         for (i = 0; i < array_Size(srv->srv_clients); i++) {          for (i = 0; i < array_Size(srv->srv_clients); i++) {
                 c = array(srv->srv_clients, i, rpc_cli_t*);                  c = array(srv->srv_clients, i, rpc_cli_t*);
                 if (c) {                  if (c) {
                        shutdown(c->cli_sock, SHUT_RDWR);                        if (srv->srv_proto == SOCK_STREAM) {
                        close(c->cli_sock);                                shutdown(c->cli_sock, SHUT_RDWR);
                                 close(c->cli_sock);
                         }
   
                         schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);                          schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
                         ait_freeVars(&RPC_RETVARS(c));                          ait_freeVars(&RPC_RETVARS(c));
Line 1299  rpc_srv_execCall(rpc_cli_t * __restrict cli, struct ta Line 1473  rpc_srv_execCall(rpc_cli_t * __restrict cli, struct ta
  * @concurentClients = Concurent clients at same time to this server   * @concurentClients = Concurent clients at same time to this server
  * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)   * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
  * @csIface = Interface name for bind server, if NULL first interface on host   * @csIface = Interface name for bind server, if NULL first interface on host
  * @protoNum = Protocol ethernet number for bind server, if Port == 0 default port is selected  
  * return: NULL == error or !=NULL bind and created RPC server instance   * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
 rpc_srv_t *  rpc_srv_t *
rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
                const char *csIface, u_short protoNum) 
 {  {
         int n = 1;          int n = 1;
         rpc_srv_t *srv = NULL;          rpc_srv_t *srv = NULL;
Line 1312  rpc_srv_initServer2(u_char InstID, int concurentClient Line 1484  rpc_srv_initServer2(u_char InstID, int concurentClient
         char szIface[64], szStr[STRSIZ];          char szIface[64], szStr[STRSIZ];
         register int i;          register int i;
         struct ifreq ifr;          struct ifreq ifr;
           struct bpf_insn insns[] = {
                   BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
                   BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
                   BPF_STMT(BPF_RET + BPF_K, -1),
                   BPF_STMT(BPF_RET + BPF_K, 0),
           };
           struct bpf_program fcode = { 
                   .bf_len = sizeof(insns) / sizeof(struct bpf_insn), 
                   .bf_insns = insns
           };
   
         if (!concurentClients) {          if (!concurentClients) {
                 rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");                  rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
Line 1322  rpc_srv_initServer2(u_char InstID, int concurentClient Line 1504  rpc_srv_initServer2(u_char InstID, int concurentClient
                         return NULL;                          return NULL;
         } else          } else
                 strlcpy(szIface, csIface, sizeof szIface);                  strlcpy(szIface, csIface, sizeof szIface);
        if (e_getifacebyname(szIface, &sa))        if (!e_getifacebyname(szIface, &sa))
                 return NULL;                  return NULL;
         if (!protoNum)  
                 protoNum = RPC_DEFPORT;  
   
 #ifdef HAVE_SRANDOMDEV  #ifdef HAVE_SRANDOMDEV
         srandomdev();          srandomdev();
Line 1342  rpc_srv_initServer2(u_char InstID, int concurentClient Line 1522  rpc_srv_initServer2(u_char InstID, int concurentClient
         } else          } else
                 memset(srv, 0, sizeof(rpc_srv_t));                  memset(srv, 0, sizeof(rpc_srv_t));
   
        srv->srv_proto = protoNum;        srv->srv_proto = SOCK_BPF;
         srv->srv_netbuf = netBuf;          srv->srv_netbuf = netBuf;
         srv->srv_session.sess_version = RPC_VERSION;          srv->srv_session.sess_version = RPC_VERSION;
         srv->srv_session.sess_instance = InstID;          srv->srv_session.sess_instance = InstID;
Line 1395  rpc_srv_initServer2(u_char InstID, int concurentClient Line 1575  rpc_srv_initServer2(u_char InstID, int concurentClient
                 LOGERR;                  LOGERR;
                 goto err;                  goto err;
         }          }
           if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
                   LOGERR;
                   goto err;
           }
         n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);          n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
         if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {          if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
                 LOGERR;                  LOGERR;
Line 1422  err: /* error condition */ Line 1606  err: /* error condition */
         e_free(srv);          e_free(srv);
         return NULL;          return NULL;
 }  }
   

Removed from v.1.23.6.1  
changed lines
  Added in v.1.23.6.6


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>