Diff for /libaitrpc/src/srv.c between versions 1.30.2.1 and 1.34

version 1.30.2.1, 2020/06/25 19:16:43 version 1.34, 2025/03/31 17:06:03
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004 - 2020Copyright 2004 - 2025
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 69  static void *txBPFPacket(sched_task_t *); Line 69  static void *txBPFPacket(sched_task_t *);
 static void *rxEXTPacket(sched_task_t *);  static void *rxEXTPacket(sched_task_t *);
 static void *txEXTPacket(sched_task_t *);  static void *txEXTPacket(sched_task_t *);
   
   #ifdef __linux__
           #ifdef __mips__
                   static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
                           { acceptClients, closeClient, rxPacket, txPacket },             /* SOCK_STREAM */
                           { rxUDPPacket, freeClient, NULL /*rxUDPPacket*/, txUDPPacket }, /* SOCK_DGRAM */
                           { acceptClients, closeClient, rxPacket, txPacket },             /* SOCK_STREAM */
                           { rxRAWPacket, freeClient, NULL /*rxRAWPacket*/, txRAWPacket }, /* SOCK_RAW */
                           { rxBPFPacket, freeClient, NULL /*rxBPFPacket*/, txBPFPacket }, /* SOCK_BPF */
                           { rxEXTPacket, freeClient, NULL /*rxEXTPacket*/, txEXTPacket }  /* SOCK_EXT */
                   };
           #else
                   static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
                           { acceptClients, closeClient, rxPacket, txPacket },             /* SOCK_STREAM */
                           { acceptClients, closeClient, rxPacket, txPacket },             /* SOCK_STREAM */
                           { rxUDPPacket, freeClient, NULL /*rxUDPPacket*/, txUDPPacket }, /* SOCK_DGRAM */
                           { rxRAWPacket, freeClient, NULL /*rxRAWPacket*/, txRAWPacket }, /* SOCK_RAW */
                           { rxBPFPacket, freeClient, NULL /*rxBPFPacket*/, txBPFPacket }, /* SOCK_BPF */
                           { rxEXTPacket, freeClient, NULL /*rxEXTPacket*/, txEXTPacket }  /* SOCK_EXT */
                   };
           #endif
   #else
 static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {  static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
         { acceptClients, closeClient, rxPacket, txPacket },             /* SOCK_STREAM */          { acceptClients, closeClient, rxPacket, txPacket },             /* SOCK_STREAM */
         { acceptClients, closeClient, rxPacket, txPacket },             /* SOCK_STREAM */          { acceptClients, closeClient, rxPacket, txPacket },             /* SOCK_STREAM */
Line 77  static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4]  Line 98  static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] 
         { rxBPFPacket, freeClient, NULL /*rxBPFPacket*/, txBPFPacket }, /* SOCK_BPF */          { rxBPFPacket, freeClient, NULL /*rxBPFPacket*/, txBPFPacket }, /* SOCK_BPF */
         { rxEXTPacket, freeClient, NULL /*rxEXTPacket*/, txEXTPacket }  /* SOCK_EXT */          { rxEXTPacket, freeClient, NULL /*rxEXTPacket*/, txEXTPacket }  /* SOCK_EXT */
 };  };
   #endif
   
 /* Global Signal Argument when kqueue support disabled */  /* Global Signal Argument when kqueue support disabled */
   
Line 87  rpc_freeCli(rpc_cli_t * __restrict c) Line 109  rpc_freeCli(rpc_cli_t * __restrict c)
 {  {
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
   
        schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);        if (s->srv_proto == SOCK_STREAM)
                 schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
   
         /* free buffer */          /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);          AIT_FREE_VAL(&c->cli_buf);
Line 146  _allocClient(rpc_srv_t * __restrict srv, sockaddr_t *  Line 169  _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * 
   
                 /* alloc empty buffer */                  /* alloc empty buffer */
                 AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);                  AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
                   if (!AIT_GET_BUF(&c->cli_buf)) {
                           array_Del(srv->srv_clients, n, 0);
                           e_free(c);
                           c = NULL;
                   }
         }          }
   
         return c;          return c;
Line 291  execCall(sched_task_t *task) Line 319  execCall(sched_task_t *task)
                         if (TASK_VAL(task)) {                          if (TASK_VAL(task)) {
                                 /* without reply */                                  /* without reply */
                                 ait_freeVars(&c->cli_vars);                                  ait_freeVars(&c->cli_vars);
                        } else {                        } else if (rpc->call_io & RPC_REQ) {
                                 /* reply */                                  /* reply */
                                 rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));                                  rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
                         }                          }
Line 307  rxPacket(sched_task_t *task) Line 335  rxPacket(sched_task_t *task)
 {  {
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
        int len, noreply = 0, rlen = AIT_LEN(&c->cli_buf);        int len, noreply = 0, rlen;
 #if 0  #if 0
         u_short crc;          u_short crc;
 #endif  #endif
Line 323  rxPacket(sched_task_t *task) Line 351  rxPacket(sched_task_t *task)
   
         /* prepare rx */          /* prepare rx */
         len = recv(TASK_FD(task), &b, sizeof b, MSG_PEEK);          len = recv(TASK_FD(task), &b, sizeof b, MSG_PEEK);
        if (len == sizeof b)        if (len < 1) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
                 taskExit(task, NULL);
         } else if (len == sizeof b)
                 rlen = ntohl(b.call_len);                  rlen = ntohl(b.call_len);
           else
                   goto end;
   
         rlen = recv(TASK_FD(task), buf, rlen, 0);          rlen = recv(TASK_FD(task), buf, rlen, 0);
         if (rlen == -1) {          if (rlen == -1) {
Line 387  rxPacket(sched_task_t *task) Line 422  rxPacket(sched_task_t *task)
         schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);          schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
 err:  err:
         /* send RPC reply */          /* send RPC reply */
        if (!noreply)        if (!noreply && (rpc->call_io & RPC_REQ))
                 schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],                   schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], 
                                 TASK_ARG(task), TASK_FD(task), rpc, len);                                  TASK_ARG(task), TASK_FD(task), rpc, len);
end:
         /* lets get next packet */          /* lets get next packet */
         schedReadSelf(task);          schedReadSelf(task);
         taskExit(task, NULL);          taskExit(task, NULL);
Line 530  rxUDPPacket(sched_task_t *task) Line 565  rxUDPPacket(sched_task_t *task)
 #endif  #endif
         rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);          rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
         rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);          rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
        if (rlen < sizeof(struct tagRPCCall))        if (!rpc || rlen < sizeof(struct tagRPCCall))
                 goto end;                  goto end;
         else          else
                 len = ntohl(rpc->call_len);                  len = ntohl(rpc->call_len);
Line 544  rxUDPPacket(sched_task_t *task) Line 579  rxUDPPacket(sched_task_t *task)
         /* check integrity of packet */          /* check integrity of packet */
         crc = ntohs(rpc->call_crc);          crc = ntohs(rpc->call_crc);
         rpc->call_crc ^= rpc->call_crc;          rpc->call_crc ^= rpc->call_crc;
        if (crc != crcFletcher16((u_short*) rpc, len / 2))        if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2))
                 goto end;                  goto end;
   
         /* check RPC packet session info */          /* check RPC packet session info */
Line 553  rxUDPPacket(sched_task_t *task) Line 588  rxUDPPacket(sched_task_t *task)
   
         c = _allocClient(srv, &sa);          c = _allocClient(srv, &sa);
         if (!c) {          if (!c) {
                EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");                EVERBOSE(1, "RPC client quota exceeded!");
                 usleep(2000);   /* blocked client delay */                  usleep(2000);   /* blocked client delay */
                 goto end;                  goto end;
         } else {          } else {
Line 575  rxUDPPacket(sched_task_t *task) Line 610  rxUDPPacket(sched_task_t *task)
         schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);          schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
   
         /* send RPC reply */          /* send RPC reply */
        if (!noreply)        if (!noreply && (rpc->call_io & RPC_REQ))
                 schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],                   schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);                                  c, TASK_FD(task), rpc, len);
 end:  end:
Line 674  rxRAWPacket(sched_task_t *task) Line 709  rxRAWPacket(sched_task_t *task)
         if (sa.sa.sa_family == AF_INET) {          if (sa.sa.sa_family == AF_INET) {
                 struct ip *h;                  struct ip *h;
                 h = (struct ip*) AIT_GET_BUF(&b);                  h = (struct ip*) AIT_GET_BUF(&b);
                if (rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC)                if (!h || rlen < ntohs(h->ip_len) || h->ip_p != IPPROTO_ERPC)
                         goto end;                          goto end;
                 else {                  else {
                         rlen -= sizeof(struct ip);                          rlen -= sizeof(struct ip);
Line 684  rxRAWPacket(sched_task_t *task) Line 719  rxRAWPacket(sched_task_t *task)
 #ifdef IPV6_REMOVE_HEADER  #ifdef IPV6_REMOVE_HEADER
                 struct ip6_hdr *h;                  struct ip6_hdr *h;
                 h = (struct ip6_hdr*) AIT_GET_BUF(&b);                  h = (struct ip6_hdr*) AIT_GET_BUF(&b);
                if (rlen < ntohs(h->ip6_plen) || h->ip6_nxt != IPPROTO_ERPC)                if (!h || rlen < ntohs(h->ip6_plen) || h->ip6_nxt != IPPROTO_ERPC)
                         goto end;                          goto end;
                 else {                  else {
                         rlen -= sizeof(struct ip6_hdr);                          rlen -= sizeof(struct ip6_hdr);
Line 694  rxRAWPacket(sched_task_t *task) Line 729  rxRAWPacket(sched_task_t *task)
                 rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);                  rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
 #endif  #endif
         }          }
        if (rlen < sizeof(struct tagRPCCall))        if (!rpc || rlen < sizeof(struct tagRPCCall))
                 goto end;                  goto end;
         else          else
                 len = ntohl(rpc->call_len);                  len = ntohl(rpc->call_len);
Line 708  rxRAWPacket(sched_task_t *task) Line 743  rxRAWPacket(sched_task_t *task)
         /* check integrity of packet */          /* check integrity of packet */
         crc = ntohs(rpc->call_crc);          crc = ntohs(rpc->call_crc);
         rpc->call_crc ^= rpc->call_crc;          rpc->call_crc ^= rpc->call_crc;
        if (crc != crcFletcher16((u_short*) rpc, len / 2))        if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2))
                 goto end;                  goto end;
   
         /* check RPC packet session info */          /* check RPC packet session info */
Line 739  rxRAWPacket(sched_task_t *task) Line 774  rxRAWPacket(sched_task_t *task)
         schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);          schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
   
         /* send RPC reply */          /* send RPC reply */
        if (!noreply)        if (!noreply && (rpc->call_io & RPC_REQ))
                 schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],                   schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);                                  c, TASK_FD(task), rpc, len);
 end:  end:
Line 909  rxBPFPacket(sched_task_t *task) Line 944  rxBPFPacket(sched_task_t *task)
         schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);          schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
   
         /* send RPC reply */          /* send RPC reply */
        if (!noreply)        if (!noreply && (rpc->call_io & RPC_REQ))
                 schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],                   schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);                                  c, TASK_FD(task), rpc, len);
 end:  end:
Line 990  rxEXTPacket(sched_task_t *task) Line 1025  rxEXTPacket(sched_task_t *task)
 {  {
         rpc_srv_t *srv = TASK_ARG(task);          rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;          rpc_cli_t *c = NULL;
        int len, noreply = 0, rlen = AIT_LEN(&c->cli_buf);        int len, noreply = 0, rlen;
         struct tagRPCCall *rpc;          struct tagRPCCall *rpc;
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };          struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         sockaddr_t sa;          sockaddr_t sa;
Line 1001  rxEXTPacket(sched_task_t *task) Line 1036  rxEXTPacket(sched_task_t *task)
         AIT_SET_BUF(&b, NULL, srv->srv_netbuf);          AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
         rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));          rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
         rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);          rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
        if (rlen < sizeof(struct tagRPCCall))        if (!rpc || rlen < sizeof(struct tagRPCCall))
                 goto end;                  goto end;
         else          else
                 len = ntohl(rpc->call_len);                  len = ntohl(rpc->call_len);
Line 1039  rxEXTPacket(sched_task_t *task) Line 1074  rxEXTPacket(sched_task_t *task)
         schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);          schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
   
         /* send RPC reply */          /* send RPC reply */
        if (!noreply)        if (!noreply && (rpc->call_io & RPC_REQ))
                 schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],                   schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);                                  c, TASK_FD(task), rpc, len);
 end:  end:
Line 1549  rpc_srv_initServer(u_char InstID, int concurentClients Line 1584  rpc_srv_initServer(u_char InstID, int concurentClients
         /* init functions */          /* init functions */
         pthread_mutex_init(&srv->srv_funcs.mtx, NULL);          pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
         SLIST_INIT(&srv->srv_funcs);          SLIST_INIT(&srv->srv_funcs);
        AVL_INIT(&srv->srv_funcs);        RB_INIT(&srv->srv_funcs);
   
         /* init scheduler */          /* init scheduler */
         srv->srv_root = schedBegin();          srv->srv_root = schedBegin();
Line 1709  rpc_srv_loopServer(rpc_srv_t * __restrict srv) Line 1744  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
                 AIT_FREE_VAL(&f->func_name);                  AIT_FREE_VAL(&f->func_name);
                 e_free(f);                  e_free(f);
         }          }
        srv->srv_funcs.avlh_root = NULL;        srv->srv_funcs.rbh_root = NULL;
         RPC_FUNCS_UNLOCK(&srv->srv_funcs);          RPC_FUNCS_UNLOCK(&srv->srv_funcs);
   
         return 0;          return 0;
Line 1809  rpc_srv_initServer2(u_char InstID, int concurentClient Line 1844  rpc_srv_initServer2(u_char InstID, int concurentClient
         /* init functions */          /* init functions */
         pthread_mutex_init(&srv->srv_funcs.mtx, NULL);          pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
         SLIST_INIT(&srv->srv_funcs);          SLIST_INIT(&srv->srv_funcs);
        AVL_INIT(&srv->srv_funcs);        RB_INIT(&srv->srv_funcs);
   
         /* init scheduler */          /* init scheduler */
         srv->srv_root = schedBegin();          srv->srv_root = schedBegin();
Line 1927  rpc_srv_initServerExt(u_char InstID, int netBuf, int f Line 1962  rpc_srv_initServerExt(u_char InstID, int netBuf, int f
         /* init functions */          /* init functions */
         pthread_mutex_init(&srv->srv_funcs.mtx, NULL);          pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
         SLIST_INIT(&srv->srv_funcs);          SLIST_INIT(&srv->srv_funcs);
        AVL_INIT(&srv->srv_funcs);        RB_INIT(&srv->srv_funcs);
   
         /* init scheduler */          /* init scheduler */
         srv->srv_root = schedBegin();          srv->srv_root = schedBegin();
Line 1954  rpc_srv_initServerExt(u_char InstID, int netBuf, int f Line 1989  rpc_srv_initServerExt(u_char InstID, int netBuf, int f
         rpc_register_srvPing(srv);          rpc_register_srvPing(srv);
   
         return srv;          return srv;
   }
   
   /*
    * rpc_srv_Return() - Prepare IPC return answer to RPC client
    *
    * @c = RPC client 
    * return: number of arguments in response
    */
   int
   rpc_srv_Return(rpc_cli_t *c)
   {
           rpc_srv_t *s = c->cli_parent;
           u_char *buf = AIT_GET_BUF(&c->cli_buf);
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
   
           if (!RPC_CHK_NOREPLY(rpc)) {
                   rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
                   schedWrite(s->srv_root, cbProto[s->srv_proto][CB_TXPACKET], c, c->cli_sock, rpc, 0);
           } else
                   rpc->call_argc ^= rpc->call_argc;
   
           return rpc->call_argc;
 }  }

Removed from v.1.30.2.1  
changed lines
  Added in v.1.34


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>