Diff for /libaitrpc/src/srv.c between versions 1.10 and 1.12.2.1

version 1.10, 2012/05/19 00:29:52 version 1.12.2.1, 2012/11/16 08:33:06
Line 46  SUCH DAMAGE. Line 46  SUCH DAMAGE.
 #include "global.h"  #include "global.h"
   
   
   static void *acceptClients(sched_task_t *);
   static void *closeClient(sched_task_t *);
   static sched_task_func_t cbProto[SOCK_RAW][2] = {
           { acceptClients, closeClient }, 
           { acceptClients, closeClient }, 
           { NULL, NULL }
   };
   
   
 static void *  static void *
 closeClient(sched_task_t *task)  closeClient(sched_task_t *task)
 {  {
Line 64  closeClient(sched_task_t *task) Line 73  closeClient(sched_task_t *task)
   
         io_arrayDel(s->srv_clients, c->cli_id, 0);          io_arrayDel(s->srv_clients, c->cli_id, 0);
         if (c)          if (c)
                free(c);                io_free(c);
         return NULL;          return NULL;
 }  }
   
Line 114  txPacket(sched_task_t *task) Line 123  txPacket(sched_task_t *task)
         ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);          ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
         if (ret == -1 || ret != wlen) {          if (ret == -1 || ret != wlen) {
                 /* close connection */                  /* close connection */
                schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 c, 42, NULL, 0);
         }          }
   
         return NULL;          return NULL;
Line 135  execCall(sched_task_t *task) Line 145  execCall(sched_task_t *task)
         if (argc) {          if (argc) {
                 arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),                   arr = io_buffer2vars(buf + sizeof(struct tagRPCCall), 
                                 AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall),                                   AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall), 
                                argc, 1);                                argc, 42);
                 if (!arr) {                  if (!arr) {
                         rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());                          rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
                         rpc->call_argc ^= rpc->call_argc;                          rpc->call_argc ^= rpc->call_argc;
Line 185  rxPacket(sched_task_t *task) Line 195  rxPacket(sched_task_t *task)
         u_char *buf = AIT_GET_BUF(&c->cli_buf);          u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc;          struct tagRPCCall *rpc;
   
        memset(buf, 0, AIT_LEN(&c->cli_buf));        if (!off)
                 memset(buf, 0, AIT_LEN(&c->cli_buf));
         else
                 memmove(buf, buf + off, AIT_LEN(&c->cli_buf) - off);
         rlen = recv(TASK_FD(task), buf + off, AIT_LEN(&c->cli_buf) - off, 0);          rlen = recv(TASK_FD(task), buf + off, AIT_LEN(&c->cli_buf) - off, 0);
         if (rlen < 1) {          if (rlen < 1) {
                 /* close connection */                  /* close connection */
                schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 c, 42, NULL, 0);
                 return NULL;                  return NULL;
         } else {          } else {
                 rlen += off;    /* add reminded bytes from previous rxPacket, if exists! */                  rlen += off;    /* add reminded bytes from previous rxPacket, if exists! */
Line 267  acceptClients(sched_task_t *task) Line 281  acceptClients(sched_task_t *task)
                         (c = io_array(srv->srv_clients, i, rpc_cli_t*)); i++);                          (c = io_array(srv->srv_clients, i, rpc_cli_t*)); i++);
         if (c)  /* no more free slots! */          if (c)  /* no more free slots! */
                 goto end;                  goto end;
        c = malloc(sizeof(rpc_cli_t));        c = io_malloc(sizeof(rpc_cli_t));
         if (!c) {          if (!c) {
                 LOGERR;                  LOGERR;
                 srv->srv_kill = 1;                  srv->srv_kill = 1;
Line 318  closeBLOBClient(sched_task_t *task) Line 332  closeBLOBClient(sched_task_t *task)
   
         io_arrayDel(s->srv_blob.clients, c->cli_id, 0);          io_arrayDel(s->srv_blob.clients, c->cli_id, 0);
         if (c)          if (c)
                free(c);                io_free(c);
         return NULL;          return NULL;
 }  }
   
Line 421  rxBLOB(sched_task_t *task) Line 435  rxBLOB(sched_task_t *task)
                         }                          }
                         break;                          break;
                 case unset:                  case unset:
                        if (rpc_srv_unregisterBLOB(s, blob.hdr_var) == -1) {                        if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
                                 blob.hdr_cmd = error;                                  blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);                                  blob.hdr_ret = RPC_ERROR(-1);
                         }                          }
Line 446  acceptBLOBClients(sched_task_t *task) Line 460  acceptBLOBClients(sched_task_t *task)
         rpc_cli_t *c = NULL;          rpc_cli_t *c = NULL;
         register int i;          register int i;
         socklen_t salen = sizeof(io_sockaddr_t);          socklen_t salen = sizeof(io_sockaddr_t);
   #ifdef TCP_NOPUSH
           int n = 1;
   #endif
   
         /* check free slots for connect */          /* check free slots for connect */
         for (i = 0; i < io_arraySize(srv->srv_blob.clients) &&           for (i = 0; i < io_arraySize(srv->srv_blob.clients) && 
                         (c = io_array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);                          (c = io_array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
         if (c)  /* no more free slots! */          if (c)  /* no more free slots! */
                 goto end;                  goto end;
        c = malloc(sizeof(rpc_cli_t));        c = io_malloc(sizeof(rpc_cli_t));
         if (!c) {          if (!c) {
                 LOGERR;                  LOGERR;
                 srv->srv_kill = srv->srv_blob.kill = 1;                  srv->srv_kill = srv->srv_blob.kill = 1;
Line 474  acceptBLOBClients(sched_task_t *task) Line 491  acceptBLOBClients(sched_task_t *task)
                 AIT_FREE_VAL(&c->cli_buf);                  AIT_FREE_VAL(&c->cli_buf);
                 io_arrayDel(srv->srv_blob.clients, i, 42);                  io_arrayDel(srv->srv_blob.clients, i, 42);
                 goto end;                  goto end;
        } else        } else {
 #ifdef TCP_NOPUSH
                 setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
 #endif
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);                  fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
           }
   
         schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);          schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
 end:  end:
Line 663  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 684  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
                 TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);                  TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
   
                 rpc_srv_blobFree(srv, b);                  rpc_srv_blobFree(srv, b);
                free(b);                io_free(b);
         }          }
   
         schedEnd(&srv->srv_blob.root);          schedEnd(&srv->srv_blob.root);
Line 681  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 702  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
  * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)   * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
  * @csHost = Host name or address for bind server, if NULL any address   * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
    * @proto = Protocol, if == 0 choose SOCK_STREAM
  * return: NULL == error or !=NULL bind and created RPC server instance   * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
 rpc_srv_t *  rpc_srv_t *
 rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients,   rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients, 
                int netBuf, const char *csHost, u_short Port)                int netBuf, const char *csHost, u_short Port, int proto)
 {  {
         int n = 1;          int n = 1;
         rpc_srv_t *srv = NULL;          rpc_srv_t *srv = NULL;
        io_sockaddr_t sa;        io_sockaddr_t sa = IO_SOCKADDR_INIT;
   
        if (!concurentClients || !regProgID) {        if (!concurentClients || !regProgID || (proto < 0 || proto > SOCK_DGRAM)) {
                 rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");                  rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                 return NULL;                  return NULL;
         }          }
           if (!proto)
                   proto = SOCK_STREAM;
         if (!io_gethostbyname(csHost, Port, &sa))          if (!io_gethostbyname(csHost, Port, &sa))
                 return NULL;                  return NULL;
         if (!Port)          if (!Port)
Line 702  rpc_srv_initServer(u_int regProgID, u_char regProcID,  Line 726  rpc_srv_initServer(u_int regProgID, u_char regProcID, 
         if (netBuf < RPC_MIN_BUFSIZ)          if (netBuf < RPC_MIN_BUFSIZ)
                 netBuf = BUFSIZ;                  netBuf = BUFSIZ;
         else          else
                netBuf = io_align(netBuf, 1);      /* align netBuf length */                netBuf = io_align(netBuf, 2);      /* align netBuf length */
   
 #ifdef HAVE_SRANDOMDEV  #ifdef HAVE_SRANDOMDEV
         srandomdev();          srandomdev();
Line 712  rpc_srv_initServer(u_int regProgID, u_char regProcID,  Line 736  rpc_srv_initServer(u_int regProgID, u_char regProcID, 
         srandom((time(&tim) ^ getpid()));          srandom((time(&tim) ^ getpid()));
 #endif  #endif
   
        srv = malloc(sizeof(rpc_srv_t));        srv = io_malloc(sizeof(rpc_srv_t));
         if (!srv) {          if (!srv) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
         } else          } else
                 memset(srv, 0, sizeof(rpc_srv_t));                  memset(srv, 0, sizeof(rpc_srv_t));
   
           srv->srv_proto = proto;
         srv->srv_netbuf = netBuf;          srv->srv_netbuf = netBuf;
         srv->srv_session.sess_version = RPC_VERSION;          srv->srv_session.sess_version = RPC_VERSION;
         srv->srv_session.sess_program = regProgID;          srv->srv_session.sess_program = regProgID;
Line 727  rpc_srv_initServer(u_int regProgID, u_char regProcID,  Line 752  rpc_srv_initServer(u_int regProgID, u_char regProcID, 
         srv->srv_server.cli_parent = srv;          srv->srv_server.cli_parent = srv;
         memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);          memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
   
        /* init functions list */        /* init functions */
        TAILQ_INIT(&srv->srv_funcs);        pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
         SLIST_INIT(&srv->srv_funcs);
         AVL_INIT(&srv->srv_funcs);
   
         /* init scheduler */          /* init scheduler */
         srv->srv_root = schedBegin();          srv->srv_root = schedBegin();
         if (!srv->srv_root) {          if (!srv->srv_root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());                  rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                free(srv);                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 io_free(srv);
                 return NULL;                  return NULL;
         }          }
   
Line 743  rpc_srv_initServer(u_int regProgID, u_char regProcID,  Line 771  rpc_srv_initServer(u_int regProgID, u_char regProcID, 
         if (!srv->srv_clients) {          if (!srv->srv_clients) {
                 rpc_SetErr(io_GetErrno(), "%s", io_GetError());                  rpc_SetErr(io_GetErrno(), "%s", io_GetError());
                 schedEnd(&srv->srv_root);                  schedEnd(&srv->srv_root);
                free(srv);                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 io_free(srv);
                 return NULL;                  return NULL;
         }          }
   
         /* create server socket */          /* create server socket */
        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
         if (srv->srv_server.cli_sock == -1) {          if (srv->srv_server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                 io_arrayDestroy(&srv->srv_clients);                  io_arrayDestroy(&srv->srv_clients);
                 schedEnd(&srv->srv_root);                  schedEnd(&srv->srv_root);
                free(srv);                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 io_free(srv);
                 return NULL;                  return NULL;
         }          }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
Line 782  err: /* error condition */ Line 812  err: /* error condition */
         close(srv->srv_server.cli_sock);          close(srv->srv_server.cli_sock);
         io_arrayDestroy(&srv->srv_clients);          io_arrayDestroy(&srv->srv_clients);
         schedEnd(&srv->srv_root);          schedEnd(&srv->srv_root);
        free(srv);        pthread_mutex_destroy(&srv->srv_funcs.mtx);
         io_free(srv);
         return NULL;          return NULL;
 }  }
   
Line 805  rpc_srv_endServer(rpc_srv_t ** __restrict psrv) Line 836  rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
         (*psrv)->srv_kill = 1;          (*psrv)->srv_kill = 1;
         sleep(RPC_SCHED_POLLING);          sleep(RPC_SCHED_POLLING);
   
        free(*psrv);        pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
         io_free(*psrv);
         *psrv = NULL;          *psrv = NULL;
 }  }
   
Line 820  rpc_srv_loopServer(rpc_srv_t * __restrict srv) Line 852  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
         rpc_cli_t *c;          rpc_cli_t *c;
         register int i;          register int i;
        rpc_func_t *f, *tmp;        rpc_func_t *f;
         struct timespec ts = { RPC_SCHED_POLLING, 0 };          struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
         if (!srv) {          if (!srv) {
Line 828  rpc_srv_loopServer(rpc_srv_t * __restrict srv) Line 860  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
                 return -1;                  return -1;
         }          }
   
         fcntl(srv->srv_server.cli_sock, F_SETFL,   
                         fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);  
   
         if (listen(srv->srv_server.cli_sock, io_arraySize(srv->srv_clients)) == -1) {          if (listen(srv->srv_server.cli_sock, io_arraySize(srv->srv_clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
        }        } else
                 fcntl(srv->srv_server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
   
        if (!schedRead(srv->srv_root, acceptClients, srv, srv->srv_server.cli_sock, NULL, 0)) {        if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, 
                                 srv->srv_server.cli_sock, NULL, 0)) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());                  rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;                  return -1;
         }          }
Line 863  rpc_srv_loopServer(rpc_srv_t * __restrict srv) Line 895  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
         close(srv->srv_server.cli_sock);          close(srv->srv_server.cli_sock);
   
         /* detach exported calls */          /* detach exported calls */
        TAILQ_FOREACH_SAFE(f, &srv->srv_funcs, func_node, tmp) {        RPC_FUNCS_LOCK(&srv->srv_funcs);
                TAILQ_REMOVE(&srv->srv_funcs, f, func_node);        while ((f = SLIST_FIRST(&srv->srv_funcs))) {
                 SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
   
                 AIT_FREE_VAL(&f->func_name);                  AIT_FREE_VAL(&f->func_name);
                free(f);                io_free(f);
         }          }
           srv->srv_funcs.avlh_root = NULL;
           RPC_FUNCS_UNLOCK(&srv->srv_funcs);
   
         schedEnd(&srv->srv_root);          schedEnd(&srv->srv_root);
         return 0;          return 0;

Removed from v.1.10  
changed lines
  Added in v.1.12.2.1


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>