Diff for /libaitrpc/src/srv.c between versions 1.27 and 1.31

version 1.27, 2015/07/02 22:28:15 version 1.31, 2024/03/20 17:32:31
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004 - 2015Copyright 2004 - 2024
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 87  rpc_freeCli(rpc_cli_t * __restrict c) Line 87  rpc_freeCli(rpc_cli_t * __restrict c)
 {  {
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
   
        schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);        if (s->srv_proto == SOCK_STREAM)
                 schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
   
         /* free buffer */          /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);          AIT_FREE_VAL(&c->cli_buf);
Line 146  _allocClient(rpc_srv_t * __restrict srv, sockaddr_t *  Line 147  _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * 
   
                 /* alloc empty buffer */                  /* alloc empty buffer */
                 AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);                  AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
                   if (!AIT_GET_BUF(&c->cli_buf)) {
                           array_Del(srv->srv_clients, n, 0);
                           e_free(c);
                           c = NULL;
                   }
         }          }
   
         return c;          return c;
Line 291  execCall(sched_task_t *task) Line 297  execCall(sched_task_t *task)
                         if (TASK_VAL(task)) {                          if (TASK_VAL(task)) {
                                 /* without reply */                                  /* without reply */
                                 ait_freeVars(&c->cli_vars);                                  ait_freeVars(&c->cli_vars);
                        } else {                        } else if (rpc->call_io & RPC_REQ) {
                                 /* reply */                                  /* reply */
                                 rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));                                  rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
                         }                          }
Line 307  rxPacket(sched_task_t *task) Line 313  rxPacket(sched_task_t *task)
 {  {
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
        int len, noreply = 0, rlen = AIT_LEN(&c->cli_buf);        int len, noreply = 0, rlen;
 #if 0  #if 0
         u_short crc;          u_short crc;
 #endif  #endif
         u_char *buf = AIT_GET_BUF(&c->cli_buf);          u_char *buf = AIT_GET_BUF(&c->cli_buf);
        u_char b[sizeof(struct tagRPCCall)];        struct tagRPCCall b, *rpc = (struct tagRPCCall*) buf;
        struct tagRPCCall *rpc = (struct tagRPCCall*) buf; 
 #ifdef TCP_SESSION_TIMEOUT  #ifdef TCP_SESSION_TIMEOUT
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };          struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
Line 323  rxPacket(sched_task_t *task) Line 328  rxPacket(sched_task_t *task)
 #endif  #endif
   
         /* prepare rx */          /* prepare rx */
        len = recv(TASK_FD(task), b, sizeof b, MSG_PEEK);        len = recv(TASK_FD(task), &b, sizeof b, MSG_PEEK);
        if (len == sizeof b)        if (len < 1) {
                rlen = ntohl(((struct tagRPCCall*) b)->call_len);                /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
                 taskExit(task, NULL);
         } else if (len == sizeof b)
                 rlen = ntohl(b.call_len);
         else
                 goto end;
   
         rlen = recv(TASK_FD(task), buf, rlen, 0);          rlen = recv(TASK_FD(task), buf, rlen, 0);
         if (rlen == -1) {          if (rlen == -1) {
Line 388  rxPacket(sched_task_t *task) Line 400  rxPacket(sched_task_t *task)
         schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);          schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
 err:  err:
         /* send RPC reply */          /* send RPC reply */
        if (!noreply)        if (!noreply && (rpc->call_io & RPC_REQ))
                 schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],                   schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], 
                                 TASK_ARG(task), TASK_FD(task), rpc, len);                                  TASK_ARG(task), TASK_FD(task), rpc, len);
end:
         /* lets get next packet */          /* lets get next packet */
         schedReadSelf(task);          schedReadSelf(task);
         taskExit(task, NULL);          taskExit(task, NULL);
Line 402  acceptClients(sched_task_t *task) Line 414  acceptClients(sched_task_t *task)
 {  {
         rpc_srv_t *srv = TASK_ARG(task);          rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;          rpc_cli_t *c = NULL;
        socklen_t salen = sizeof(sockaddr_t);        socklen_t salen = E_SOCKADDR_MAX;
         int sock;          int sock;
 #ifdef TCP_SESSION_TIMEOUT  #ifdef TCP_SESSION_TIMEOUT
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };          struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
Line 425  acceptClients(sched_task_t *task) Line 437  acceptClients(sched_task_t *task)
                 AIT_FREE_VAL(&c->cli_buf);                  AIT_FREE_VAL(&c->cli_buf);
                 array_Del(srv->srv_clients, c->cli_id, 42);                  array_Del(srv->srv_clients, c->cli_id, 42);
                 goto end;                  goto end;
        } else        } else {
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);                  fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
                   fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC);
           }
   
 #ifdef TCP_SESSION_TIMEOUT  #ifdef TCP_SESSION_TIMEOUT
         /* armed timer for close stateless connection */          /* armed timer for close stateless connection */
Line 499  txUDPPacket(sched_task_t *task) Line 513  txUDPPacket(sched_task_t *task)
   
         /* send reply */          /* send reply */
         ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL,           ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, 
                        &c->cli_sa.sa, c->cli_sa.sa.sa_len);                        &c->cli_sa.sa, e_addrlen(&c->cli_sa));
         if (ret == -1) {          if (ret == -1) {
                 /* close connection */                  /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],                   schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
Line 518  rxUDPPacket(sched_task_t *task) Line 532  rxUDPPacket(sched_task_t *task)
         u_short crc;          u_short crc;
         struct tagRPCCall *rpc;          struct tagRPCCall *rpc;
         sockaddr_t sa;          sockaddr_t sa;
        socklen_t salen;        socklen_t salen = E_SOCKADDR_MAX;
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };          struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         ait_val_t b = AIT_VAL_INIT;          ait_val_t b = AIT_VAL_INIT;
   
         /* receive connect packet */          /* receive connect packet */
         AIT_SET_BUF(&b, NULL, srv->srv_netbuf);          AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
        salen = sa.ss.ss_len = sizeof(sockaddr_t);#ifndef __linux__
         sa.ss.ss_len = salen;
 #endif
         rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);          rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
         rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);          rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
         if (rlen < sizeof(struct tagRPCCall))          if (rlen < sizeof(struct tagRPCCall))
Line 541  rxUDPPacket(sched_task_t *task) Line 557  rxUDPPacket(sched_task_t *task)
         /* check integrity of packet */          /* check integrity of packet */
         crc = ntohs(rpc->call_crc);          crc = ntohs(rpc->call_crc);
         rpc->call_crc ^= rpc->call_crc;          rpc->call_crc ^= rpc->call_crc;
        if (crc != crcFletcher16((u_short*) rpc, len / 2))        if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2))
                 goto end;                  goto end;
   
         /* check RPC packet session info */          /* check RPC packet session info */
Line 550  rxUDPPacket(sched_task_t *task) Line 566  rxUDPPacket(sched_task_t *task)
   
         c = _allocClient(srv, &sa);          c = _allocClient(srv, &sa);
         if (!c) {          if (!c) {
                EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");                EVERBOSE(1, "RPC client quota exceeded!");
                 usleep(2000);   /* blocked client delay */                  usleep(2000);   /* blocked client delay */
                 goto end;                  goto end;
         } else {          } else {
Line 572  rxUDPPacket(sched_task_t *task) Line 588  rxUDPPacket(sched_task_t *task)
         schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);          schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
   
         /* send RPC reply */          /* send RPC reply */
        if (!noreply)        if (!noreply && (rpc->call_io & RPC_REQ))
                 schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],                   schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);                                  c, TASK_FD(task), rpc, len);
 end:  end:
Line 639  txRAWPacket(sched_task_t *task) Line 655  txRAWPacket(sched_task_t *task)
   
         /* send reply */          /* send reply */
         ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL,           ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, 
                        &c->cli_sa.sa, c->cli_sa.sa.sa_len);                        &c->cli_sa.sa, e_addrlen(&c->cli_sa));
         if (ret == -1) {          if (ret == -1) {
                 /* close connection */                  /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],                   schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
Line 658  rxRAWPacket(sched_task_t *task) Line 674  rxRAWPacket(sched_task_t *task)
         u_short crc;          u_short crc;
         struct tagRPCCall *rpc;          struct tagRPCCall *rpc;
         sockaddr_t sa;          sockaddr_t sa;
        socklen_t salen;        socklen_t salen = E_SOCKADDR_MAX;
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };          struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         ait_val_t b = AIT_VAL_INIT;          ait_val_t b = AIT_VAL_INIT;
   
         /* receive connect packet */          /* receive connect packet */
         AIT_SET_BUF(&b, NULL, srv->srv_netbuf);          AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
        salen = sa.ss.ss_len = sizeof(sockaddr_t);#ifndef __linux__
         sa.ss.ss_len = salen;
 #endif
         rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);          rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
         if (sa.sa.sa_family == AF_INET) {          if (sa.sa.sa_family == AF_INET) {
                 struct ip *h;                  struct ip *h;
Line 676  rxRAWPacket(sched_task_t *task) Line 694  rxRAWPacket(sched_task_t *task)
                         rpc = (struct tagRPCCall*) (h + 1);                          rpc = (struct tagRPCCall*) (h + 1);
                 }                  }
         } else {          } else {
   #ifdef IPV6_REMOVE_HEADER
                 struct ip6_hdr *h;                  struct ip6_hdr *h;
                 h = (struct ip6_hdr*) AIT_GET_BUF(&b);                  h = (struct ip6_hdr*) AIT_GET_BUF(&b);
                if (rlen < (ntohs(h->ip6_plen) + sizeof(struct ip6_hdr)) ||                 if (rlen < ntohs(h->ip6_plen) || h->ip6_nxt != IPPROTO_ERPC)
                                h->ip6_nxt != IPPROTO_ERPC) 
                         goto end;                          goto end;
                 else {                  else {
                         rlen -= sizeof(struct ip6_hdr);                          rlen -= sizeof(struct ip6_hdr);
                         rpc = (struct tagRPCCall*) (h + 1);                          rpc = (struct tagRPCCall*) (h + 1);
                 }                  }
   #else
                   rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
   #endif
         }          }
         if (rlen < sizeof(struct tagRPCCall))          if (rlen < sizeof(struct tagRPCCall))
                 goto end;                  goto end;
Line 700  rxRAWPacket(sched_task_t *task) Line 721  rxRAWPacket(sched_task_t *task)
         /* check integrity of packet */          /* check integrity of packet */
         crc = ntohs(rpc->call_crc);          crc = ntohs(rpc->call_crc);
         rpc->call_crc ^= rpc->call_crc;          rpc->call_crc ^= rpc->call_crc;
        if (crc != crcFletcher16((u_short*) rpc, len / 2))        if (crc != crcFletcher16((u_short*) AIT_GET_BUF(&b), len / 2))
                 goto end;                  goto end;
   
         /* check RPC packet session info */          /* check RPC packet session info */
Line 731  rxRAWPacket(sched_task_t *task) Line 752  rxRAWPacket(sched_task_t *task)
         schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);          schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
   
         /* send RPC reply */          /* send RPC reply */
        if (!noreply)        if (!noreply && (rpc->call_io & RPC_REQ))
                 schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],                   schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);                                  c, TASK_FD(task), rpc, len);
 end:  end:
Line 744  end: Line 765  end:
 static void *  static void *
 txBPFPacket(sched_task_t *task)  txBPFPacket(sched_task_t *task)
 {  {
   #ifndef __linux__
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;          rpc_func_t *f = NULL;
Line 812  txBPFPacket(sched_task_t *task) Line 834  txBPFPacket(sched_task_t *task)
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],                   schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);                                  TASK_ARG(task), 0, NULL, 0);
         }          }
   #else
           rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!");
   #endif
   
         taskExit(task, NULL);          taskExit(task, NULL);
 }  }
Line 819  txBPFPacket(sched_task_t *task) Line 844  txBPFPacket(sched_task_t *task)
 static void *  static void *
 rxBPFPacket(sched_task_t *task)  rxBPFPacket(sched_task_t *task)
 {  {
   #ifndef __linux__
         rpc_srv_t *srv = TASK_ARG(task);          rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;          rpc_cli_t *c = NULL;
         int len, rlen, noreply;          int len, rlen, noreply;
Line 896  rxBPFPacket(sched_task_t *task) Line 922  rxBPFPacket(sched_task_t *task)
         schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);          schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
   
         /* send RPC reply */          /* send RPC reply */
        if (!noreply)        if (!noreply && (rpc->call_io & RPC_REQ))
                 schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],                   schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);                                  c, TASK_FD(task), rpc, len);
 end:  end:
         AIT_FREE_VAL(&b);          AIT_FREE_VAL(&b);
         schedReadSelf(task);          schedReadSelf(task);
   #else
           rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!");
   #endif
   
         taskExit(task, NULL);          taskExit(task, NULL);
 }  }
   
Line 973  rxEXTPacket(sched_task_t *task) Line 1003  rxEXTPacket(sched_task_t *task)
 {  {
         rpc_srv_t *srv = TASK_ARG(task);          rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;          rpc_cli_t *c = NULL;
        int len, noreply = 0, rlen = AIT_LEN(&c->cli_buf);        int len, noreply = 0, rlen;
         struct tagRPCCall *rpc;          struct tagRPCCall *rpc;
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };          struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         sockaddr_t sa;          sockaddr_t sa;
Line 1022  rxEXTPacket(sched_task_t *task) Line 1052  rxEXTPacket(sched_task_t *task)
         schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);          schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
   
         /* send RPC reply */          /* send RPC reply */
        if (!noreply)        if (!noreply && (rpc->call_io & RPC_REQ))
                 schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET],                   schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);                                  c, TASK_FD(task), rpc, len);
 end:  end:
Line 1167  end: Line 1197  end:
 static void *  static void *
 flushBLOB(sched_task_t *task)  flushBLOB(sched_task_t *task)
 {  {
   #ifdef atomic_load_acq_ptr
         uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);          uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
   #else
           uintptr_t sigArg = *((volatile uintptr_t*) &_glSigArg);
   #endif
         rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);          rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
         rpc_blob_t *b, *tmp;          rpc_blob_t *b, *tmp;
   
Line 1178  flushBLOB(sched_task_t *task) Line 1212  flushBLOB(sched_task_t *task)
                 e_free(b);                  e_free(b);
         }          }
   
        if (!schedSignalSelf(task)) {        if (sigArg) {
                 /* disabled kqueue support in libaitsched */                  /* disabled kqueue support in libaitsched */
                 struct sigaction sa;                  struct sigaction sa;
   
Line 1187  flushBLOB(sched_task_t *task) Line 1221  flushBLOB(sched_task_t *task)
                 sa.sa_handler = (void (*)(int)) flushBLOB;                  sa.sa_handler = (void (*)(int)) flushBLOB;
                 sa.sa_flags = SA_RESTART | SA_RESETHAND;                  sa.sa_flags = SA_RESTART | SA_RESETHAND;
                 sigaction(SIGFBLOB, &sa, NULL);                  sigaction(SIGFBLOB, &sa, NULL);
                   return NULL;
           } else {
                   schedSignalSelf(task);
                   taskExit(task, NULL);
         }          }
   
         taskExit(task, NULL);  
 }  }
   
 static void *  static void *
Line 1198  acceptBLOBClients(sched_task_t *task) Line 1234  acceptBLOBClients(sched_task_t *task)
         rpc_srv_t *srv = TASK_ARG(task);          rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;          rpc_cli_t *c = NULL;
         register int i;          register int i;
        socklen_t salen = sizeof(sockaddr_t);        socklen_t salen = E_SOCKADDR_MAX;
         int sock;          int sock;
 #ifdef TCP_NOPUSH  #ifdef TCP_NOPUSH
         int n = 1;          int n = 1;
Line 1243  acceptBLOBClients(sched_task_t *task) Line 1279  acceptBLOBClients(sched_task_t *task)
                 setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);                  setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
 #endif  #endif
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);                  fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
                   fcntl(c->cli_sock, F_SETFD, FD_CLOEXEC);
         }          }
   
         schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);          schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
Line 1265  int Line 1302  int
 rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
 {  {
         int n = 1;          int n = 1;
           socklen_t salen;
   
         if (!srv || srv->srv_kill) {          if (!srv || srv->srv_kill) {
                 rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");                  rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
Line 1283  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 1321  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
   
         srv->srv_blob.server.cli_parent = srv;          srv->srv_blob.server.cli_parent = srv;
   
        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof srv->srv_blob.server.cli_sa);
         switch (srv->srv_blob.server.cli_sa.sa.sa_family) {          switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
                 case AF_INET:                  case AF_INET:
                         srv->srv_blob.server.cli_sa.sin.sin_port =                           srv->srv_blob.server.cli_sa.sin.sin_port = 
                                 htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);                                  htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
                           salen = sizeof srv->srv_blob.server.cli_sa.sin;
                         break;                          break;
                 case AF_INET6:                  case AF_INET6:
                         srv->srv_blob.server.cli_sa.sin6.sin6_port =                           srv->srv_blob.server.cli_sa.sin6.sin6_port = 
                                 htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);                                  htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
                           salen = sizeof srv->srv_blob.server.cli_sa.sin6;
                         break;                          break;
                 case AF_LOCAL:                  case AF_LOCAL:
                         strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",                           strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", 
                                         sizeof srv->srv_blob.server.cli_sa.sun.sun_path);                                          sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
                           salen = sizeof srv->srv_blob.server.cli_sa.sun;
                         break;                          break;
                 default:                  default:
                         AIT_FREE_VAL(&srv->srv_blob.dir);                          AIT_FREE_VAL(&srv->srv_blob.dir);
Line 1328  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 1369  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
                 AIT_FREE_VAL(&srv->srv_blob.dir);                  AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
        if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,         if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, salen) == -1) {
                                srv->srv_blob.server.cli_sa.sa.sa_len) == -1) { 
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);                  AIT_FREE_VAL(&srv->srv_blob.dir);
Line 1375  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) Line 1415  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
   
         srv->srv_blob.kill = 1;          srv->srv_blob.kill = 1;
   
         schedEnd(&srv->srv_blob.root);  
   
         if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL)          if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL)
                 unlink(srv->srv_blob.server.cli_sa.sun.sun_path);                  unlink(srv->srv_blob.server.cli_sa.sun.sun_path);
   
           schedEnd(&srv->srv_blob.root);
 }  }
   
 /*  /*
Line 1409  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 1449  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
                 /* disabled kqueue support in libaitsched */                  /* disabled kqueue support in libaitsched */
                 struct sigaction sa;                  struct sigaction sa;
   
   #ifdef atomic_store_rel_ptr
                 atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);                  atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
   #else
                   *((volatile uintptr_t*) &_glSigArg) = (uintptr_t) srv;
   #endif
   
                 memset(&sa, 0, sizeof sa);                  memset(&sa, 0, sizeof sa);
                 sigemptyset(&sa.sa_mask);                  sigemptyset(&sa.sa_mask);
Line 1475  rpc_srv_initServer(u_char InstID, int concurentClients Line 1519  rpc_srv_initServer(u_char InstID, int concurentClients
         int n = 1;          int n = 1;
         rpc_srv_t *srv = NULL;          rpc_srv_t *srv = NULL;
         sockaddr_t sa = E_SOCKADDR_INIT;          sockaddr_t sa = E_SOCKADDR_INIT;
           socklen_t salen;
   
         if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {          if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
                 rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");                  rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
Line 1482  rpc_srv_initServer(u_char InstID, int concurentClients Line 1527  rpc_srv_initServer(u_char InstID, int concurentClients
         }          }
         if (!Port && proto < SOCK_RAW)          if (!Port && proto < SOCK_RAW)
                 Port = RPC_DEFPORT;                  Port = RPC_DEFPORT;
        if (!e_gethostbyname(csHost, Port, &sa))        if (!(salen = e_gethostbyname(csHost, Port, &sa)))
                 return NULL;                  return NULL;
         if (!proto)          if (!proto)
                 proto = SOCK_STREAM;                  proto = SOCK_STREAM;
Line 1564  rpc_srv_initServer(u_char InstID, int concurentClients Line 1609  rpc_srv_initServer(u_char InstID, int concurentClients
                 LOGERR;                  LOGERR;
                 goto err;                  goto err;
         }          }
        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,         if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, salen) == -1) {
                                srv->srv_server.cli_sa.sa.sa_len) == -1) { 
                 LOGERR;                  LOGERR;
                 goto err;                  goto err;
         } else          } else
Line 1598  rpc_srv_endServer(rpc_srv_t ** __restrict psrv) Line 1642  rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
   
         /* if send kill to blob server */          /* if send kill to blob server */
         rpc_srv_endBLOBServer(*psrv);          rpc_srv_endBLOBServer(*psrv);
           /* wait for BLOB server done */
           while (*(&(*psrv)->srv_blob.root))
                   usleep(1000);
   
         (*psrv)->srv_kill = 1;          (*psrv)->srv_kill = 1;
         sleep(RPC_SCHED_POLLING);          sleep(RPC_SCHED_POLLING);
   
         schedEnd(&(*psrv)->srv_root);  
   
         if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL)          if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL)
                 unlink((*psrv)->srv_server.cli_sa.sun.sun_path);                  unlink((*psrv)->srv_server.cli_sa.sun.sun_path);
   
           schedEnd(&(*psrv)->srv_root);
   
         pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);          pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
         e_free(*psrv);          e_free(*psrv);
         *psrv = NULL;          *psrv = NULL;
Line 1719  rpc_srv_execCall(rpc_cli_t * __restrict cli, struct ta Line 1766  rpc_srv_execCall(rpc_cli_t * __restrict cli, struct ta
 rpc_srv_t *  rpc_srv_t *
 rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)  rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
 {  {
   #ifndef __linux__
         int n = 1;          int n = 1;
         rpc_srv_t *srv = NULL;          rpc_srv_t *srv = NULL;
         sockaddr_t sa = E_SOCKADDR_INIT;          sockaddr_t sa = E_SOCKADDR_INIT;
Line 1845  err: /* error condition */ Line 1893  err: /* error condition */
         schedEnd(&srv->srv_root);          schedEnd(&srv->srv_root);
         pthread_mutex_destroy(&srv->srv_funcs.mtx);          pthread_mutex_destroy(&srv->srv_funcs.mtx);
         e_free(srv);          e_free(srv);
   #else
           rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!");
   #endif
   
         return NULL;          return NULL;
 }  }
   
Line 1915  rpc_srv_initServerExt(u_char InstID, int netBuf, int f Line 1967  rpc_srv_initServerExt(u_char InstID, int netBuf, int f
         rpc_register_srvPing(srv);          rpc_register_srvPing(srv);
   
         return srv;          return srv;
   }
   
   /*
    * rpc_srv_Return() - Prepare IPC return answer to RPC client
    *
    * @c = RPC client 
    * return: number of arguments in response
    */
   int
   rpc_srv_Return(rpc_cli_t *c)
   {
           rpc_srv_t *s = c->cli_parent;
           u_char *buf = AIT_GET_BUF(&c->cli_buf);
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
   
           if (!RPC_CHK_NOREPLY(rpc)) {
                   rpc->call_argc = (u_char) array_Size(RPC_RETVARS(c));
                   schedWrite(s->srv_root, cbProto[s->srv_proto][CB_TXPACKET], c, c->cli_sock, rpc, 0);
           } else
                   rpc->call_argc ^= rpc->call_argc;
   
           return rpc->call_argc;
 }  }

Removed from v.1.27  
changed lines
  Added in v.1.31


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>