Diff for /libaitrpc/src/srv.c between versions 1.26.2.12 and 1.28.2.2

version 1.26.2.12, 2015/07/02 21:52:29 version 1.28.2.2, 2016/08/02 10:39:50
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004 - 2015Copyright 2004 - 2016
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 82  static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4]  Line 82  static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] 
   
 static volatile uintptr_t _glSigArg = 0;  static volatile uintptr_t _glSigArg = 0;
   
 #pragma GCC visibility push(hidden)  
   
 static int  
 rpc_funcs_cmp(struct tagRPCFunc *a, struct tagRPCFunc *b)  
 {  
         int ret;  
   
         assert(a && b);  
   
         ret = AIT_KEY(&a->func_name) - AIT_KEY(&b->func_name);  
   
         if (ret < 0)  
                 return -1;  
         else if (ret > 0)  
                 return 1;  
   
         return ret;  
 }  
   
 AVL_GENERATE(tagRPCFuncs, tagRPCFunc, func_node, rpc_funcs_cmp);  
   
 #pragma GCC visibility pop  
   
 void  void
 rpc_freeCli(rpc_cli_t * __restrict c)  rpc_freeCli(rpc_cli_t * __restrict c)
 {  {
Line 335  rxPacket(sched_task_t *task) Line 312  rxPacket(sched_task_t *task)
         u_short crc;          u_short crc;
 #endif  #endif
         u_char *buf = AIT_GET_BUF(&c->cli_buf);          u_char *buf = AIT_GET_BUF(&c->cli_buf);
        u_char b[sizeof(struct tagRPCCall)];        struct tagRPCCall b, *rpc = (struct tagRPCCall*) buf;
        struct tagRPCCall *rpc = (struct tagRPCCall*) buf; 
 #ifdef TCP_SESSION_TIMEOUT  #ifdef TCP_SESSION_TIMEOUT
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };          struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
Line 346  rxPacket(sched_task_t *task) Line 322  rxPacket(sched_task_t *task)
 #endif  #endif
   
         /* prepare rx */          /* prepare rx */
        len = recv(TASK_FD(task), b, sizeof b, MSG_PEEK);        len = recv(TASK_FD(task), &b, sizeof b, MSG_PEEK);
         if (len == sizeof b)          if (len == sizeof b)
                rlen = ntohl(((struct tagRPCCall*) b)->call_len);                rlen = ntohl(b.call_len);
   
         rlen = recv(TASK_FD(task), buf, rlen, 0);          rlen = recv(TASK_FD(task), buf, rlen, 0);
         if (rlen == -1) {          if (rlen == -1) {
Line 522  txUDPPacket(sched_task_t *task) Line 498  txUDPPacket(sched_task_t *task)
   
         /* send reply */          /* send reply */
         ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL,           ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, 
                        &c->cli_sa.sa, c->cli_sa.sa.sa_len);                        &c->cli_sa.sa, sizeof c->cli_sa.sa);
         if (ret == -1) {          if (ret == -1) {
                 /* close connection */                  /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],                   schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
Line 541  rxUDPPacket(sched_task_t *task) Line 517  rxUDPPacket(sched_task_t *task)
         u_short crc;          u_short crc;
         struct tagRPCCall *rpc;          struct tagRPCCall *rpc;
         sockaddr_t sa;          sockaddr_t sa;
        socklen_t salen;        socklen_t salen = (u_char) MIN(sizeof(sockaddr_t), 0xff);
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };          struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         ait_val_t b = AIT_VAL_INIT;          ait_val_t b = AIT_VAL_INIT;
   
         /* receive connect packet */          /* receive connect packet */
         AIT_SET_BUF(&b, NULL, srv->srv_netbuf);          AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
        salen = sa.ss.ss_len = sizeof(sockaddr_t);#ifndef __linux__
         sa.ss.ss_len = salen;
 #endif
         rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);          rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
         rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);          rpc = (struct tagRPCCall*) AIT_GET_BUF(&b);
         if (rlen < sizeof(struct tagRPCCall))          if (rlen < sizeof(struct tagRPCCall))
Line 662  txRAWPacket(sched_task_t *task) Line 640  txRAWPacket(sched_task_t *task)
   
         /* send reply */          /* send reply */
         ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL,           ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, 
                        &c->cli_sa.sa, c->cli_sa.sa.sa_len);                        &c->cli_sa.sa, sizeof c->cli_sa.sa);
         if (ret == -1) {          if (ret == -1) {
                 /* close connection */                  /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],                   schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
Line 681  rxRAWPacket(sched_task_t *task) Line 659  rxRAWPacket(sched_task_t *task)
         u_short crc;          u_short crc;
         struct tagRPCCall *rpc;          struct tagRPCCall *rpc;
         sockaddr_t sa;          sockaddr_t sa;
        socklen_t salen;        socklen_t salen = (u_char) MIN(sizeof(sockaddr_t), 0xff);
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };          struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         ait_val_t b = AIT_VAL_INIT;          ait_val_t b = AIT_VAL_INIT;
   
         /* receive connect packet */          /* receive connect packet */
         AIT_SET_BUF(&b, NULL, srv->srv_netbuf);          AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
        salen = sa.ss.ss_len = sizeof(sockaddr_t);#ifndef __linux__
         sa.ss.ss_len = salen;
 #endif
         rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);          rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b), 0, &sa.sa, &salen);
         if (sa.sa.sa_family == AF_INET) {          if (sa.sa.sa_family == AF_INET) {
                 struct ip *h;                  struct ip *h;
Line 767  end: Line 747  end:
 static void *  static void *
 txBPFPacket(sched_task_t *task)  txBPFPacket(sched_task_t *task)
 {  {
   #ifndef __linux__
         rpc_cli_t *c = TASK_ARG(task);          rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;          rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;          rpc_func_t *f = NULL;
Line 835  txBPFPacket(sched_task_t *task) Line 816  txBPFPacket(sched_task_t *task)
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],                   schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);                                  TASK_ARG(task), 0, NULL, 0);
         }          }
   #else
           rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!");
   #endif
   
         taskExit(task, NULL);          taskExit(task, NULL);
 }  }
Line 842  txBPFPacket(sched_task_t *task) Line 826  txBPFPacket(sched_task_t *task)
 static void *  static void *
 rxBPFPacket(sched_task_t *task)  rxBPFPacket(sched_task_t *task)
 {  {
   #ifndef __linux__
         rpc_srv_t *srv = TASK_ARG(task);          rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;          rpc_cli_t *c = NULL;
         int len, rlen, noreply;          int len, rlen, noreply;
Line 925  rxBPFPacket(sched_task_t *task) Line 910  rxBPFPacket(sched_task_t *task)
 end:  end:
         AIT_FREE_VAL(&b);          AIT_FREE_VAL(&b);
         schedReadSelf(task);          schedReadSelf(task);
   #else
           rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!");
   #endif
   
         taskExit(task, NULL);          taskExit(task, NULL);
 }  }
   
Line 1190  end: Line 1179  end:
 static void *  static void *
 flushBLOB(sched_task_t *task)  flushBLOB(sched_task_t *task)
 {  {
   #ifdef atomic_load_acq_ptr
         uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);          uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
   #else
           uintptr_t sigArg = *((volatile uintptr_t*) &_glSigArg);
   #endif
         rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);          rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
         rpc_blob_t *b, *tmp;          rpc_blob_t *b, *tmp;
   
Line 1352  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 1345  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
                 return -1;                  return -1;
         }          }
         if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,           if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, 
                                srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {                                sizeof srv->srv_blob.server.cli_sa.sa) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);                  AIT_FREE_VAL(&srv->srv_blob.dir);
Line 1398  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) Line 1391  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
   
         srv->srv_blob.kill = 1;          srv->srv_blob.kill = 1;
   
         schedEnd(&srv->srv_blob.root);  
   
         if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL)          if (srv->srv_blob.server.cli_sa.sa.sa_family == AF_LOCAL)
                 unlink(srv->srv_blob.server.cli_sa.sun.sun_path);                  unlink(srv->srv_blob.server.cli_sa.sun.sun_path);
   
           schedEnd(&srv->srv_blob.root);
 }  }
   
 /*  /*
Line 1432  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 1425  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
                 /* disabled kqueue support in libaitsched */                  /* disabled kqueue support in libaitsched */
                 struct sigaction sa;                  struct sigaction sa;
   
   #ifdef atomic_store_rel_ptr
                 atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);                  atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
   #else
                   *((volatile uintptr_t*) &_glSigArg) = (uintptr_t) srv;
   #endif
   
                 memset(&sa, 0, sizeof sa);                  memset(&sa, 0, sizeof sa);
                 sigemptyset(&sa.sa_mask);                  sigemptyset(&sa.sa_mask);
Line 1588  rpc_srv_initServer(u_char InstID, int concurentClients Line 1585  rpc_srv_initServer(u_char InstID, int concurentClients
                 goto err;                  goto err;
         }          }
         if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,           if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
                                srv->srv_server.cli_sa.sa.sa_len) == -1) {                                sizeof srv->srv_server.cli_sa.sa) == -1) {
                 LOGERR;                  LOGERR;
                 goto err;                  goto err;
         } else          } else
Line 1621  rpc_srv_endServer(rpc_srv_t ** __restrict psrv) Line 1618  rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
   
         /* if send kill to blob server */          /* if send kill to blob server */
         rpc_srv_endBLOBServer(*psrv);          rpc_srv_endBLOBServer(*psrv);
           /* wait for BLOB server done */
           while (*(&(*psrv)->srv_blob.root))
                   usleep(1000);
   
         (*psrv)->srv_kill = 1;          (*psrv)->srv_kill = 1;
         sleep(RPC_SCHED_POLLING);          sleep(RPC_SCHED_POLLING);
   
         schedEnd(&(*psrv)->srv_root);  
   
         if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL)          if ((*psrv)->srv_server.cli_sa.sa.sa_family == AF_LOCAL)
                 unlink((*psrv)->srv_server.cli_sa.sun.sun_path);                  unlink((*psrv)->srv_server.cli_sa.sun.sun_path);
   
           schedEnd(&(*psrv)->srv_root);
   
         pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);          pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
         e_free(*psrv);          e_free(*psrv);
         *psrv = NULL;          *psrv = NULL;
Line 1742  rpc_srv_execCall(rpc_cli_t * __restrict cli, struct ta Line 1742  rpc_srv_execCall(rpc_cli_t * __restrict cli, struct ta
 rpc_srv_t *  rpc_srv_t *
 rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)  rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
 {  {
   #ifndef __linux__
         int n = 1;          int n = 1;
         rpc_srv_t *srv = NULL;          rpc_srv_t *srv = NULL;
         sockaddr_t sa = E_SOCKADDR_INIT;          sockaddr_t sa = E_SOCKADDR_INIT;
Line 1868  err: /* error condition */ Line 1869  err: /* error condition */
         schedEnd(&srv->srv_root);          schedEnd(&srv->srv_root);
         pthread_mutex_destroy(&srv->srv_funcs.mtx);          pthread_mutex_destroy(&srv->srv_funcs.mtx);
         e_free(srv);          e_free(srv);
   #else
           rpc_SetErr(ENOTSUP, "Feature isn't supported on Linux!");
   #endif
   
         return NULL;          return NULL;
 }  }
   

Removed from v.1.26.2.12  
changed lines
  Added in v.1.28.2.2


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>