Diff for /libaitrpc/src/srv.c between versions 1.4.2.3 and 1.5.2.1

version 1.4.2.3, 2011/08/30 11:13:29 version 1.5.2.1, 2011/09/07 08:56:32
Line 51  rpc_srv_dispatchCall(void *arg) Line 51  rpc_srv_dispatchCall(void *arg)
 {  {
         rpc_cli_t *c = arg;          rpc_cli_t *c = arg;
         rpc_srv_t *s;          rpc_srv_t *s;
         ait_val_t *vals = NULL, *v = NULL;  
         rpc_func_t *f = NULL;          rpc_func_t *f = NULL;
           array_t *arr;
         struct tagRPCCall *rpc;          struct tagRPCCall *rpc;
         struct tagRPCRet *rrpc;          struct tagRPCRet *rrpc;
         rpc_sess_t ses = { 0 };          rpc_sess_t ses = { 0 };
         fd_set fds;          fd_set fds;
        u_char *buf, *data;        u_char *buf;
         int ret, argc = 0, Limit = 0;          int ret, argc = 0, Limit = 0;
         register int i;          register int i;
   
Line 74  rpc_srv_dispatchCall(void *arg) Line 74  rpc_srv_dispatchCall(void *arg)
         }          }
   
         do {          do {
                 v = NULL;  
                 FD_ZERO(&fds);                  FD_ZERO(&fds);
                 FD_SET(c->cli_sock, &fds);                  FD_SET(c->cli_sock, &fds);
                 ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);                  ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
Line 98  rpc_srv_dispatchCall(void *arg) Line 97  rpc_srv_dispatchCall(void *arg)
                         break;                          break;
                 }                  }
                 if (ret < sizeof(struct tagRPCCall)) {                  if (ret < sizeof(struct tagRPCCall)) {
                        rpc_SetErr(EMSGSIZE, "Error:: too short RPC packet ...\n");                        rpc_SetErr(ERPCMISMATCH, "Error:: too short RPC packet ...\n");
                         ret = -4;                          ret = -4;
                         break;                          break;
                 } else                  } else
                         rpc = (struct tagRPCCall*) buf;                          rpc = (struct tagRPCCall*) buf;
                 /* check RPC packet session info */                  /* check RPC packet session info */
                if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) {                if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                        rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n");                        rpc_SetErr(ERPCMISMATCH, "Error:: get invalid RPC session ...\n");
                         ret = -5;                          ret = -5;
                         goto makeReply;                          goto makeReply;
                 } else                  } else
Line 113  rpc_srv_dispatchCall(void *arg) Line 112  rpc_srv_dispatchCall(void *arg)
   
                 /* RPC is OK! Go decapsulate variables ... */                  /* RPC is OK! Go decapsulate variables ... */
                 if (rpc->call_argc) {                  if (rpc->call_argc) {
                        v = (ait_val_t*) (buf + Limit);                        arr = io_buffer2vals(buf + Limit, s->srv_netbuf - Limit, rpc->call_argc, 1);
                        if (rpc->call_argc * sizeof(ait_val_t) > s->srv_netbuf - Limit) {                        if (!arr) {
                                rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n"); 
                                 ret = -5;                                  ret = -5;
                                 goto makeReply;                                  goto makeReply;
                         } else  
                                 Limit += rpc->call_argc * sizeof(ait_val_t);  
   
                         /* RPC received variables types are OK! */  
                         data = (u_char*) v + rpc->call_argc * sizeof(ait_val_t);  
                         for (i = 0; i < rpc->call_argc; i++) {  
                                 switch (AIT_TYPE(&v[i])) {  
                                         case buffer:  
                                                 if (AIT_LEN(&v[i]) > s->srv_netbuf - Limit) {  
                                                         rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");  
                                                         ret = -5;  
                                                         goto makeReply;  
                                                 } else  
                                                         Limit += AIT_LEN(&v[i]);  
   
                                                 v[i].val.buffer = data;  
                                                 data += AIT_LEN(&v[i]);  
                                                 break;  
                                         case string:  
                                                 if (AIT_LEN(&v[i]) > s->srv_netbuf - Limit) {  
                                                         rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");  
                                                         ret = -5;  
                                                         goto makeReply;  
                                                 } else  
                                                         Limit += AIT_LEN(&v[i]);  
   
                                                 v[i].val.string = (int8_t*) data;  
                                                 data += AIT_LEN(&v[i]);  
                                                 break;  
                                         case blob:  
                                                 if (s->srv_blob.state == disable) {  
                                                         rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");  
                                                         ret = -5;  
                                                         goto makeReply;  
                                                 }  
                                                 if (s->srv_blob.state == kill) {  
                                                         rpc_SetErr(ENOTSUP, "Error:: BLOB server is gone.\n");  
                                                         ret = -5;  
                                                         goto makeReply;  
                                                 }  
                                         default:  
                                                 break;  
                                 }  
   
                                 AIT_ADDZCOPY(&v[i]);  
                         }                          }
                }                } else
                         arr = NULL;
   
                 /* execute call */                  /* execute call */
                 argc = 0;                  argc = 0;
                 vals = NULL;  
                 memcpy(&ses, &rpc->call_session, sizeof ses);                  memcpy(&ses, &rpc->call_session, sizeof ses);
                 if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) {                  if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) {
                        rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n");                        rpc_SetErr(EPROGUNAVAIL, "Error:: call not found into RPC server ...\n");
                         ret = -6;                          ret = -6;
                 } else                  } else
                        if ((ret = rpc_srv_execCall(f, rpc, v)) == -1)                        if ((ret = rpc_srv_execCall(f, rpc, arr)) == -1)
                                 ret = -9;                                  ret = -9;
                        else                        else {
                                argc = rpc_srv_getVars(f, &vals);                                if (arr)
                                         io_arrayDestroy(&arr);
                                 argc = rpc_srv_getVars(f, &arr);
                                 goto makeReply;         /* Call finish OK */
                         }
 
                 if (arr)
                         io_arrayDestroy(&arr);
 
 makeReply:  makeReply:
                /* made reply */                /* Made reply */
                 memset(buf, 0, s->srv_netbuf);                  memset(buf, 0, s->srv_netbuf);
                 rrpc = (struct tagRPCRet*) buf;                  rrpc = (struct tagRPCRet*) buf;
                 Limit = sizeof(struct tagRPCRet);                  Limit = sizeof(struct tagRPCRet);
Line 191  makeReply: Line 152  makeReply:
                 rrpc->ret_retcode = ret;                  rrpc->ret_retcode = ret;
                 rrpc->ret_argc = argc;                  rrpc->ret_argc = argc;
   
                if (argc && vals) {                if (argc && arr) {
                         /* Go Encapsulate variables ... */                          /* Go Encapsulate variables ... */
                        v = (ait_val_t*) (buf + Limit);                        if ((i = io_vals2buffer(buf + Limit, s->srv_netbuf - Limit, arr)) == -1) {
                        if (argc * sizeof(ait_val_t) > s->srv_netbuf - Limit) {                                rpc_srv_freeVals(f);
                                for (i = 0; i < argc; i++) 
                                        AIT_FREE_VAL(&vals[i]); 
                                rpc_srv_freeVars(f); 
                                vals = NULL; 
                                 argc = 0;                                  argc = 0;
                                 ret = -7;                                  ret = -7;
                                rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet values (-7) ...\n");                                rpc_SetErr(EBADRPC, "Error:: in prepare RPC packet values (-7) ...\n");
                                 goto makeReply;                                  goto makeReply;
                        } else                        } else {
                                Limit += argc * sizeof(ait_val_t);                                Limit += i;
                        memcpy(v, vals, argc * sizeof(ait_val_t)); 
   
                        /* RPC send variables types are OK! */                                rpc_srv_freeVals(f);
                        data = (u_char*) v + argc * sizeof(ait_val_t); 
                        for (ret = i = 0; i < argc; i++) { 
                                switch (AIT_TYPE(&vals[i])) { 
                                        case buffer: 
                                                if (ret || Limit + vals[i].val_len > s->srv_netbuf) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc->ret_retcode = ret = -7; 
                                                        rrpc->ret_argc = 0; 
                                                        break; 
                                                } 
 
                                                memcpy(data, vals[i].val.buffer, vals[i].val_len); 
                                                data += AIT_LEN(&vals[i]); 
                                                Limit += AIT_LEN(&vals[i]); 
                                                break; 
                                        case string: 
                                                if (ret || Limit + vals[i].val_len > s->srv_netbuf) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc->ret_retcode = ret = -7; 
                                                        rrpc->ret_argc = 0; 
                                                        break; 
                                                } 
 
                                                memcpy(data, vals[i].val.string, vals[i].val_len); 
                                                data += AIT_LEN(&vals[i]); 
                                                Limit += AIT_LEN(&vals[i]); 
                                                break; 
                                        case blob: 
                                                if (s->srv_blob.state == disable) { 
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n"); 
                                                        rrpc->ret_retcode = ret = -5; 
                                                        rrpc->ret_argc = 0; 
                                                        break; 
                                                } 
                                                if (s->srv_blob.state == kill) { 
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is gone.\n"); 
                                                        rrpc->ret_retcode = ret = -5; 
                                                        rrpc->ret_argc = 0; 
                                                        break; 
                                                } 
                                        default: 
                                                break; 
                                } 
 
                                /* don't add zero copy at this position, because buffer/string must be freed! */ 
                                AIT_FREE_VAL(&vals[i]); 
                         }                          }
                         rpc_srv_freeVars(f);  
                         vals = NULL;  
                         argc = 0;  
                 }                  }
   
                 ret = send(c->cli_sock, buf, Limit, 0);                  ret = send(c->cli_sock, buf, Limit, 0);
Line 267  makeReply: Line 174  makeReply:
                         break;                          break;
                 }                  }
                 if (ret != Limit) {                  if (ret != Limit) {
                        rpc_SetErr(ECANCELED, "Error:: in send RPC request, should be send %d bytes, "                        rpc_SetErr(EPROCUNAVAIL, "Error:: in send RPC request, should be send %d bytes, "
                                         "really is %d\n", Limit, ret);                                          "really is %d\n", Limit, ret);
                         ret = -9;                          ret = -9;
                         break;                          break;
Line 334  rpc_srv_dispatchVars(void *arg) Line 241  rpc_srv_dispatchVars(void *arg)
                         break;                          break;
                 }                  }
                 if (ret < sizeof(struct tagBLOBHdr)) {                  if (ret < sizeof(struct tagBLOBHdr)) {
                        rpc_SetErr(EMSGSIZE, "Error:: too short BLOB packet ...\n");                        rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n");
                         ret = -4;                          ret = -4;
                         break;                          break;
                 } else                  } else
Line 378  rpc_srv_dispatchVars(void *arg) Line 285  rpc_srv_dispatchVars(void *arg)
                                         ret = -7;                                          ret = -7;
                                 break;                                  break;
                         default:                          default:
                                rpc_SetErr(EINVAL, "Error:: unsupported BLOB command (%d)...\n",                                 rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n", 
                                                 blob->hdr_cmd);                                                  blob->hdr_cmd);
                                 ret = -7;                                  ret = -7;
                 }                  }
Line 394  makeReply: Line 301  makeReply:
                         break;                          break;
                 }                  }
                 if (ret != sizeof buf) {                  if (ret != sizeof buf) {
                        rpc_SetErr(ECANCELED, "Error:: in send BLOB reply, should be send %d bytes, "                        rpc_SetErr(EPROCUNAVAIL, "Error:: in send BLOB reply, should be send %d bytes, "
                                         "really is %d\n", sizeof buf, ret);                                          "really is %d\n", sizeof buf, ret);
                         ret = -9;                          ret = -9;
                         break;                          break;
Line 404  makeReply: Line 311  makeReply:
         shutdown(c->cli_sock, SHUT_RDWR);          shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);          close(c->cli_sock);
         memset(c, 0, sizeof(rpc_cli_t));          memset(c, 0, sizeof(rpc_cli_t));
        return (void*) (long)ret;        return (void*) ((long)ret);
 }  }
   
 // -------------------------------------------------  // -------------------------------------------------
Line 504  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 411  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
   
         pthread_mutex_init(&srv->srv_blob.mtx, NULL);          pthread_mutex_init(&srv->srv_blob.mtx, NULL);
   
         pthread_mutex_lock(&srv->srv_mtx);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);          rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
        rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 0);        rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
        rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 0);        rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
        rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 1);        rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
        pthread_mutex_unlock(&srv->srv_mtx); 
   
         srv->srv_blob.state = enable;   /* enable BLOB */          srv->srv_blob.state = enable;   /* enable BLOB */
         return 0;          return 0;
Line 544  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) Line 449  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
         close(srv->srv_blob.server.cli_sock);          close(srv->srv_blob.server.cli_sock);
   
           pthread_mutex_lock(&srv->srv_blob.mtx);
         if (srv->srv_blob.clients) {          if (srv->srv_blob.clients) {
                 free(srv->srv_blob.clients);                  free(srv->srv_blob.clients);
                 srv->srv_blob.clients = NULL;                  srv->srv_blob.clients = NULL;
         }          }
   
         /* detach blobs */          /* detach blobs */
         pthread_mutex_lock(&srv->srv_blob.mtx);  
         while ((f = srv->srv_blob.blobs)) {          while ((f = srv->srv_blob.blobs)) {
                 srv->srv_blob.blobs = f->blob_next;                  srv->srv_blob.blobs = f->blob_next;
                 rpc_srv_blobFree(srv, f);                  rpc_srv_blobFree(srv, f);
Line 563  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) Line 468  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
 }  }
   
 /*  /*
 * rpc_srv_execBLOBServer() Execute Main BLOB server loop and wait for clients requests * rpc_srv_loopBLOB() Execute Main BLOB server loop and wait for clients requests
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
 {  {
         socklen_t salen = sizeof(struct sockaddr);          socklen_t salen = sizeof(struct sockaddr);
         register int i;          register int i;
Line 592  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) Line 497  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)
                         if (!c->cli_sa.sa_family)                          if (!c->cli_sa.sa_family)
                                 break;                                  break;
                 if (i >= srv->srv_numcli) {                  if (i >= srv->srv_numcli) {
                         usleep(1000000);  
 #ifdef HAVE_PTHREAD_YIELD  #ifdef HAVE_PTHREAD_YIELD
                         pthread_yield();                          pthread_yield();
   #else
                           usleep(1000000);
 #endif  #endif
                         continue;                          continue;
                 }                  }
Line 624  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) Line 530  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)
                         pthread_detach(c->cli_tid);                          pthread_detach(c->cli_tid);
         }          }
   
        srv->srv_blob.state = disable;        srv->srv_blob.state = kill;
   
         return 0;          return 0;
 }  }
Line 768  rpc_srv_initServer(u_int regProgID, u_int regProcID, i Line 674  rpc_srv_initServer(u_int regProgID, u_int regProcID, i
         pthread_mutex_init(&srv->srv_mtx, NULL);          pthread_mutex_init(&srv->srv_mtx, NULL);
   
         rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);          rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
        rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0);        rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
        rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0);        rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
        rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0);        rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
         return srv;          return srv;
 }  }
   
Line 823  rpc_srv_endServer(rpc_srv_t * __restrict srv) Line 729  rpc_srv_endServer(rpc_srv_t * __restrict srv)
 }  }
   
 /*  /*
 * rpc_srv_execServer() Execute Main server loop and wait for clients requests * rpc_srv_loopServer() Execute Main server loop and wait for clients requests
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_execServer(rpc_srv_t * __restrict srv)rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
         socklen_t salen = sizeof(struct sockaddr);          socklen_t salen = sizeof(struct sockaddr);
         register int i;          register int i;
Line 842  rpc_srv_execServer(rpc_srv_t * __restrict srv) Line 748  rpc_srv_execServer(rpc_srv_t * __restrict srv)
                 return -1;                  return -1;
         }          }
   
           /* activate BLOB server worker if srv->srv_blob.state == enable */
           rpc_srv_execBLOBServer(srv);
   
         if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {          if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
Line 852  rpc_srv_execServer(rpc_srv_t * __restrict srv) Line 761  rpc_srv_execServer(rpc_srv_t * __restrict srv)
                         if (!c->cli_sa.sa_family)                          if (!c->cli_sa.sa_family)
                                 break;                                  break;
                 if (i >= srv->srv_numcli) {                  if (i >= srv->srv_numcli) {
                         usleep(1000000);  
 #ifdef HAVE_PTHREAD_YIELD  #ifdef HAVE_PTHREAD_YIELD
                         pthread_yield();                          pthread_yield();
   #else
                           usleep(1000000);
 #endif  #endif
                         continue;                          continue;
                 }                  }
Line 893  rpc_srv_execServer(rpc_srv_t * __restrict srv) Line 803  rpc_srv_execServer(rpc_srv_t * __restrict srv)
  * rpc_srv_execCall() Execute registered call from RPC server   * rpc_srv_execCall() Execute registered call from RPC server
  * @call = Register RPC call   * @call = Register RPC call
  * @rpc = IN RPC call structure   * @rpc = IN RPC call structure
 * @args = IN RPC call array of rpc values * @args = IN RPC calling arguments from RPC client
  * return: -1 error, !=-1 ok   * return: -1 error, !=-1 ok
  */   */
 int  int
 rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,   rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, 
                ait_val_t * __restrict args)                array_t * __restrict args)
 {  {
         void *dl;          void *dl;
         rpc_callback_t func;          rpc_callback_t func;

Removed from v.1.4.2.3  
changed lines
  Added in v.1.5.2.1


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>