Diff for /libaitrpc/src/srv.c between versions 1.1.1.1.2.6 and 1.2.2.2

version 1.1.1.1.2.6, 2010/06/23 16:23:31 version 1.2.2.2, 2011/07/14 01:25:11
Line 5 Line 5
 * $Author$  * $Author$
 * $Id$  * $Id$
 *  *
*************************************************************************/**************************************************************************
 The ELWIX and AITNET software is distributed under the following
 terms:
 
 All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
 
 Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
         by Michael Pounov <misho@elwix.org>.  All rights reserved.
 
 Redistribution and use in source and binary forms, with or without
 modification, are permitted provided that the following conditions
 are met:
 1. Redistributions of source code must retain the above copyright
    notice, this list of conditions and the following disclaimer.
 2. Redistributions in binary form must reproduce the above copyright
    notice, this list of conditions and the following disclaimer in the
    documentation and/or other materials provided with the distribution.
 3. All advertising materials mentioning features or use of this software
    must display the following acknowledgement:
 This product includes software developed by Michael Pounov <misho@elwix.org>
 ELWIX - Embedded LightWeight unIX and its contributors.
 4. Neither the name of AITNET nor the names of its contributors
    may be used to endorse or promote products derived from this software
    without specific prior written permission.
 
 THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 SUCH DAMAGE.
 */
 #include "global.h"  #include "global.h"
   
   
 static void *  static void *
 rpc_srv_dispatchCall(void *arg)  rpc_srv_dispatchCall(void *arg)
 {  {
        rpc_cli_t cli, *c = arg;        rpc_cli_t *c = arg;
         rpc_srv_t *s;          rpc_srv_t *s;
        rpc_val_t *vals, *v = NULL;        rpc_val_t *vals = NULL, *v = NULL;
         rpc_func_t *f;          rpc_func_t *f;
         struct tagRPCCall *rpc;          struct tagRPCCall *rpc;
        struct tagRPCRet rrpc;        struct tagRPCRet *rrpc;
         fd_set fds;          fd_set fds;
         u_char buf[BUFSIZ], *data;          u_char buf[BUFSIZ], *data;
        int ret, argc, Limit = 0;        int ret, argc = 0, Limit = 0;
         register int i;          register int i;
   
         if (!arg) {          if (!arg) {
Line 30  rpc_srv_dispatchCall(void *arg) Line 67  rpc_srv_dispatchCall(void *arg)
                 s = c->cli_parent;                  s = c->cli_parent;
   
         do {          do {
                   v = NULL;
                 FD_ZERO(&fds);                  FD_ZERO(&fds);
                 FD_SET(c->cli_sock, &fds);                  FD_SET(c->cli_sock, &fds);
                 ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);                  ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
                 if (ret == -1) {                  if (ret == -1) {
                         ret = -2;                          ret = -2;
                 }                  }
                 memset(&rrpc, 0, sizeof rrpc);  
                 memset(buf, 0, BUFSIZ);                  memset(buf, 0, BUFSIZ);
                 if ((ret = recv(c->cli_sock, buf, BUFSIZ, 0)) == -1) {                  if ((ret = recv(c->cli_sock, buf, BUFSIZ, 0)) == -1) {
                         LOGERR;                          LOGERR;
Line 57  rpc_srv_dispatchCall(void *arg) Line 94  rpc_srv_dispatchCall(void *arg)
                 if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) {                  if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) {
                         rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n");                          rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n");
                         ret = -5;                          ret = -5;
                        break;                        goto makeReply;
                }                } else
                         Limit = sizeof(struct tagRPCCall);
                 // RPC is OK! Go decapsulate variables ...                  // RPC is OK! Go decapsulate variables ...
                 if (rpc->call_argc) {                  if (rpc->call_argc) {
                        v = (rpc_val_t*) (buf + sizeof(struct tagRPCCall));                        v = (rpc_val_t*) (buf + Limit);
                         // check RPC packet length
                         if (rpc->call_argc * sizeof(rpc_val_t) > BUFSIZ - Limit) {
                                 rpc_SetErr(EMSGSIZE, "Error:: Too big RPC packet ...\n");
                                 ret = -5;
                                 goto makeReply;
                         } else
                                 Limit += rpc->call_argc * sizeof(rpc_val_t);
                         // RPC received variables types OK!                          // RPC received variables types OK!
                         data = (u_char*) v + rpc->call_argc * sizeof(rpc_val_t);                          data = (u_char*) v + rpc->call_argc * sizeof(rpc_val_t);
                         for (i = 0; i < rpc->call_argc; i++) {                          for (i = 0; i < rpc->call_argc; i++) {
                                 switch (v[i].val_type) {                                  switch (v[i].val_type) {
                                         case buffer:                                          case buffer:
                                                   if (v[i].val_len > BUFSIZ - Limit) {
                                                           rpc_SetErr(EMSGSIZE, "Error:: Too big RPC packet ...\n");
                                                           ret = -5;
                                                           goto makeReply;
                                                   } else
                                                           Limit += v[i].val_len;
   
                                                 v[i].val.buffer = data;                                                  v[i].val.buffer = data;
                                                 data += v[i].val_len;                                                  data += v[i].val_len;
                                                 break;                                                  break;
                                         case string:                                          case string:
                                                   if (v[i].val_len > BUFSIZ - Limit) {
                                                           rpc_SetErr(EMSGSIZE, "Error:: Too big RPC packet ...\n");
                                                           ret = -5;
                                                           goto makeReply;
                                                   } else
                                                           Limit += v[i].val_len;
   
                                                 v[i].val.string = (int8_t*) data;                                                  v[i].val.string = (int8_t*) data;
                                                data += v[i].val_len + 1;                                                data += v[i].val_len;
                                                 break;                                                  break;
                                         case blob:                                          case blob:
                                                rpc_srv_getBLOBVar(&v[i], data);                                                if (s->srv_blob.state == disable) {
                                                data += sizeof(rpc_cli_t);                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");
                                                break;                                                        ret = -5;
                                                         goto makeReply;
                                                 }
                                         default:                                          default:
                                                 break;                                                  break;
                                 }                                  }
Line 90  rpc_srv_dispatchCall(void *arg) Line 151  rpc_srv_dispatchCall(void *arg)
                         rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n");                          rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n");
                         ret = -6;                          ret = -6;
                 } else                  } else
                        if ((ret = rpc_srv_execCall(s, f, rpc, v)) == -1)                        if ((ret = rpc_srv_execCall(f, rpc, v)) == -1)
                                ret = -6;                                ret = -9;
                         else                          else
                                 argc = rpc_srv_getValsCall(f, &vals);                                  argc = rpc_srv_getValsCall(f, &vals);
   
                memcpy(&rrpc.ret_session, &rpc->call_session, sizeof rrpc.ret_session);makeReply:
                rrpc.ret_tag = rpc->call_tag; 
                rrpc.ret_hash = rpc->call_hash; 
                rrpc.ret_errno = rpc_Errno; 
                rrpc.ret_retcode = ret; 
                rrpc.ret_argc = argc; 
 
                 memset(buf, 0, BUFSIZ);                  memset(buf, 0, BUFSIZ);
                memcpy(buf, &rrpc, (Limit = sizeof rrpc));                rrpc = (struct tagRPCRet*) buf;
                 Limit = sizeof(struct tagRPCRet);
 
                 memcpy(&rrpc->ret_session, &rpc->call_session, sizeof rrpc->ret_session);
                 rrpc->ret_tag = rpc->call_tag;
                 rrpc->ret_hash = rpc->call_hash;
                 rrpc->ret_errno = rpc_Errno;
                 rrpc->ret_retcode = ret;
                 rrpc->ret_argc = argc;
 
                 if (argc && vals) {                  if (argc && vals) {
                        v = (rpc_val_t*) (buf + sizeof rrpc);                        v = (rpc_val_t*) (buf + Limit);
                         if (argc * sizeof(rpc_val_t) > BUFSIZ - Limit) {
                                 for (i = 0; i < argc; i++)
                                         RPC_FREE_VAL(&vals[i]);
                                 free(vals);
                                 vals = NULL;
                                 argc = 0;
                                 ret = -7;
                                 rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet values (-7) ...\n");
                                 goto makeReply;
                         } else
                                 Limit += argc * sizeof(rpc_val_t);
                         memcpy(v, vals, argc * sizeof(rpc_val_t));                          memcpy(v, vals, argc * sizeof(rpc_val_t));
                         Limit += argc * sizeof(rpc_val_t);  
                         data = (u_char*) v + argc * sizeof(rpc_val_t);                          data = (u_char*) v + argc * sizeof(rpc_val_t);
                         for (ret = i = 0; i < argc; i++) {                          for (ret = i = 0; i < argc; i++) {
                                 switch (vals[i].val_type) {                                  switch (vals[i].val_type) {
                                         case buffer:                                          case buffer:
                                                 if (ret || Limit + vals[i].val_len > BUFSIZ) {                                                  if (ret || Limit + vals[i].val_len > BUFSIZ) {
                                                         rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");                                                          rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");
                                                        rrpc.ret_retcode = ret = -7;                                                        rrpc->ret_retcode = ret = -7;
                                                        rrpc.ret_argc = 0;                                                        rrpc->ret_argc = 0;
                                                         break;                                                          break;
                                                 }                                                  }
   
Line 124  rpc_srv_dispatchCall(void *arg) Line 198  rpc_srv_dispatchCall(void *arg)
                                                 Limit += vals[i].val_len;                                                  Limit += vals[i].val_len;
                                                 break;                                                  break;
                                         case string:                                          case string:
                                                if (ret || Limit + vals[i].val_len + 1 > BUFSIZ) {                                                if (ret || Limit + vals[i].val_len > BUFSIZ) {
                                                         rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");                                                          rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");
                                                        rrpc.ret_retcode = ret = -7;                                                        rrpc->ret_retcode = ret = -7;
                                                        rrpc.ret_argc = 0;                                                        rrpc->ret_argc = 0;
                                                         break;                                                          break;
                                                 }                                                  }
   
                                                memcpy(data, vals[i].val.string, vals[i].val_len + 1);                                                memcpy(data, vals[i].val.string, vals[i].val_len);
                                                data += vals[i].val_len + 1;                                                data += vals[i].val_len;
                                                Limit += vals[i].val_len + 1;                                                Limit += vals[i].val_len;
                                                 break;                                                  break;
                                         case blob:                                          case blob:
                                                if (ret || Limit + sizeof(rpc_cli_t) > BUFSIZ) {                                                if (s->srv_blob.state == disable) {
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");
                                                        rrpc.ret_retcode = ret = -7;                                                        rrpc->ret_retcode = ret = -5;
                                                        rrpc.ret_argc = 0;                                                        rrpc->ret_argc = 0;
                                                         break;                                                          break;
                                                 }                                                  }
   
                                                 memcpy(data, &cli, sizeof(rpc_cli_t));  
                                                 data += sizeof(rpc_cli_t);  
                                                 Limit += sizeof(rpc_cli_t);  
   
                                                 rpc_srv_setBLOBVar(&vals[i], &cli);  
                                                 break;  
                                         default:                                          default:
                                                 break;                                                  break;
                                 }                                  }
   
                                 RPC_FREE_VAL(&vals[i]);                                  RPC_FREE_VAL(&vals[i]);
                                   free(vals);
                                   vals = NULL;
                         }                          }
                 }                  }
   
Line 163  rpc_srv_dispatchCall(void *arg) Line 232  rpc_srv_dispatchCall(void *arg)
                         break;                          break;
                 }                  }
                 if (ret != Limit) {                  if (ret != Limit) {
                        rpc_SetErr(EBADMSG, "Error:: in send RPC request, should be send %d bytes, "                        rpc_SetErr(ECANCELED, "Error:: in send RPC request, should be send %d bytes, "
                                         "really is %d\n", Limit, ret);                                          "really is %d\n", Limit, ret);
                         ret = -9;                          ret = -9;
                         break;                          break;
Line 173  rpc_srv_dispatchCall(void *arg) Line 242  rpc_srv_dispatchCall(void *arg)
         shutdown(c->cli_sock, SHUT_RDWR);          shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);          close(c->cli_sock);
         memset(c, 0, sizeof(rpc_cli_t));          memset(c, 0, sizeof(rpc_cli_t));
        return (void*) ret;        return (void*) (long)ret;
 }  }
   
   
Line 183  rpc_srv_dispatchVars(void *arg) Line 252  rpc_srv_dispatchVars(void *arg)
         rpc_cli_t *c = arg;          rpc_cli_t *c = arg;
         rpc_srv_t *s;          rpc_srv_t *s;
         rpc_blob_t *b;          rpc_blob_t *b;
        int cx, ret;        int ret;
         fd_set fds;          fd_set fds;
         u_char buf[sizeof(struct tagBLOBHdr)];          u_char buf[sizeof(struct tagBLOBHdr)];
         struct tagBLOBHdr *blob;          struct tagBLOBHdr *blob;
Line 194  rpc_srv_dispatchVars(void *arg) Line 263  rpc_srv_dispatchVars(void *arg)
         } else          } else
                 s = c->cli_parent;                  s = c->cli_parent;
   
         cx = -1;  
         do {          do {
                cx++;                // check for disable service at this moment?
                 if (s->srv_blob.state == disable) {
                         ret = 0;
                         break;
                 }
   
                 FD_ZERO(&fds);                  FD_ZERO(&fds);
                 FD_SET(c->cli_sock, &fds);                  FD_SET(c->cli_sock, &fds);
Line 204  rpc_srv_dispatchVars(void *arg) Line 276  rpc_srv_dispatchVars(void *arg)
                 if (ret == -1) {                  if (ret == -1) {
                         ret = -2;                          ret = -2;
                 }                  }
   
                 memset(buf, 0, sizeof buf);                  memset(buf, 0, sizeof buf);
                 if ((ret = recv(c->cli_sock, buf, sizeof buf, 0)) == -1) {                  if ((ret = recv(c->cli_sock, buf, sizeof buf, 0)) == -1) {
                         LOGERR;                          LOGERR;
                         ret = -3;                          ret = -3;
                         break;                          break;
                 }                  }
                if (!ret) {             // receive EOF                if (!ret || s->srv_blob.state == disable) {        // receive EOF or disable service
                         ret = 0;                          ret = 0;
                         break;                          break;
                 }                  }
Line 221  rpc_srv_dispatchVars(void *arg) Line 294  rpc_srv_dispatchVars(void *arg)
                 } else                  } else
                         blob = (struct tagBLOBHdr*) buf;                          blob = (struct tagBLOBHdr*) buf;
                 // check BLOB packet session info                  // check BLOB packet session info
                if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session) ||                 if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) {
                                blob->hdr_seq != cx) {                        rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n");
                        rpc_SetErr(EINVAL, "Error:: get invalid BLOB session in seq=%d...\n", blob->hdr_seq); 
                         ret = -5;                          ret = -5;
                        break;                        goto makeReply;
                 }                  }
                // Go to decapsulate packet ...                // Go to proceed packet ...
                if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) { 
                        rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n", blob->hdr_var); 
                        ret = -6; 
                        break; 
                } 
                 switch (blob->hdr_cmd) {                  switch (blob->hdr_cmd) {
                         case get:                          case get:
                                   if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) {
                                           rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n", 
                                                           blob->hdr_var);
                                           ret = -6;
                                           break;
                                   } else
                                           blob->hdr_len = b->blob_len;
   
                                 if (rpc_srv_blobMap(s, b) != -1) {                                  if (rpc_srv_blobMap(s, b) != -1) {
                                         ret = rpc_srv_sendBLOB(c, b);                                          ret = rpc_srv_sendBLOB(c, b);
                                         rpc_srv_blobUnmap(b);                                          rpc_srv_blobUnmap(b);
Line 242  rpc_srv_dispatchVars(void *arg) Line 317  rpc_srv_dispatchVars(void *arg)
                                         ret = -7;                                          ret = -7;
                                 break;                                  break;
                         case set:                          case set:
                                ret = rpc_srv_recvBLOB(c, b);                                if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) {
                                if (ret == -1)                                        // set new BLOB variable for reply :)
                                         blob->hdr_var = b->blob_var;
 
                                         ret = rpc_srv_recvBLOB(c, b);
                                         rpc_srv_blobUnmap(b);
                                 } else
                                         ret = -7;                                          ret = -7;
                                 break;                                  break;
                         case unset:                          case unset:
                                ret = rpc_srv_deleteBLOB(c, b);                                ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);
                                 if (ret == -1)                                  if (ret == -1)
                                         ret = -7;                                          ret = -7;
                                 break;                                  break;
                         default:                          default:
                                 rpc_SetErr(EINVAL, "Error:: unsupported BLOB command (%d)...\n",                                   rpc_SetErr(EINVAL, "Error:: unsupported BLOB command (%d)...\n", 
                                                 blob->hdr_cmd);                                                  blob->hdr_cmd);
                                ret -7;                                ret = -7;
                 }                  }
   
   makeReply:
                 // Replay to client!                  // Replay to client!
                   blob->hdr_cmd = ret < 0 ? error : ok;
                   blob->hdr_ret = ret;
                   if ((ret = send(c->cli_sock, buf, sizeof buf, 0)) == -1) {
                           LOGERR;
                           ret = -8;
                           break;
                   }
                   if (ret != sizeof buf) {
                           rpc_SetErr(ECANCELED, "Error:: in send BLOB reply, should be send %d bytes, "
                                           "really is %d\n", sizeof buf, ret);
                           ret = -9;
                           break;
                   }
         } while (ret > -1);          } while (ret > -1);
   
         shutdown(c->cli_sock, SHUT_RDWR);          shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);          close(c->cli_sock);
         memset(c, 0, sizeof(rpc_cli_t));          memset(c, 0, sizeof(rpc_cli_t));
        return (void*) ret;        return (void*) (long)ret;
 }  }
   
 // -------------------------------------------------  // -------------------------------------------------
Line 342  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 436  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
         rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);          rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
         rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 0);          rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 0);
         rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 0);          rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 0);
           rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 1);
         pthread_mutex_unlock(&srv->srv_mtx);          pthread_mutex_unlock(&srv->srv_mtx);
   
         srv->srv_blob.state = enable;   // enable BLOB          srv->srv_blob.state = enable;   // enable BLOB
Line 366  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) Line 461  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
         } else          } else
                 srv->srv_blob.state = disable;                  srv->srv_blob.state = disable;
   
           rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
           rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
           rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
           rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
   
         for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)          for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
                 if (c->cli_sa.sa_family)                  if (c->cli_sa.sa_family)
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
         close(srv->srv_blob.server.cli_sock);          close(srv->srv_blob.server.cli_sock);
   
        if (srv->srv_blob.clients)        if (srv->srv_blob.clients) {
                 free(srv->srv_blob.clients);                  free(srv->srv_blob.clients);
                   srv->srv_blob.clients = NULL;
           }
   
         pthread_mutex_lock(&srv->srv_blob.mtx);          pthread_mutex_lock(&srv->srv_blob.mtx);
         while ((f = srv->srv_blob.blobs)) {          while ((f = srv->srv_blob.blobs)) {
                 srv->srv_blob.blobs = f->blob_next;                  srv->srv_blob.blobs = f->blob_next;
                   rpc_srv_blobFree(srv, f);
                 free(f);                  free(f);
         }          }
         pthread_mutex_unlock(&srv->srv_blob.mtx);          pthread_mutex_unlock(&srv->srv_blob.mtx);
   
           while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
         pthread_mutex_destroy(&srv->srv_blob.mtx);          pthread_mutex_destroy(&srv->srv_blob.mtx);
 }  }
   
Line 399  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) Line 503  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)
         int ret;          int ret;
         struct timeval tv = { DEF_RPC_TIMEOUT, 0 };          struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
   
        if (!srv || !srv->srv_blob.state) {        if (!srv || srv->srv_blob.state == disable) {
                 rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");                  rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
                 return -1;                  return -1;
         }          }
Line 409  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) Line 513  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)
                 return -1;                  return -1;
         }          }
   
        while (!rpc_Kill) {        while (!blob_Kill && !rpc_Kill) {
                 for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)                  for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
                         if (!c->cli_sa.sa_family)                          if (!c->cli_sa.sa_family)
                                 break;                                  break;
                if (c && c->cli_sa.sa_family && c->cli_parent) {                if (i >= srv->srv_numcli) {
                         usleep(1000000);                          usleep(1000000);
                         continue;                          continue;
                 }                  }
Line 442  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) Line 546  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)
                 }                  }
         }          }
   
           srv->srv_blob.state = disable;
   
         return 0;          return 0;
 }  }
   
Line 547  rpc_srv_initServer(u_int regProgID, u_int regProcID, i Line 653  rpc_srv_initServer(u_int regProgID, u_int regProcID, i
         } else          } else
                 memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));                  memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
   
           pthread_mutex_init(&srv->srv_mtx, NULL);
   
         rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);          rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
         rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0);          rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0);
         rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0);          rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0);
         rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0);          rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0);
   
         pthread_mutex_init(&srv->srv_mtx, NULL);  
         return srv;          return srv;
 }  }
   
Line 576  rpc_srv_endServer(rpc_srv_t * __restrict srv) Line 682  rpc_srv_endServer(rpc_srv_t * __restrict srv)
         rpc_srv_endBLOBServer(srv);          rpc_srv_endBLOBServer(srv);
   
         for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)          for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)
                if (c->cli_sa.sa_family)                if (c->cli_sa.sa_family) {
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
                           close(c->cli_sock);
                   }
         close(srv->srv_server.cli_sock);          close(srv->srv_server.cli_sock);
   
         if (srv->srv_clients) {          if (srv->srv_clients) {
                 free(srv->srv_clients);                  free(srv->srv_clients);
                   srv->srv_clients = NULL;
                 srv->srv_numcli = 0;                  srv->srv_numcli = 0;
         }          }
   
Line 592  rpc_srv_endServer(rpc_srv_t * __restrict srv) Line 701  rpc_srv_endServer(rpc_srv_t * __restrict srv)
         }          }
         pthread_mutex_unlock(&srv->srv_mtx);          pthread_mutex_unlock(&srv->srv_mtx);
   
           while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY);
         pthread_mutex_destroy(&srv->srv_mtx);          pthread_mutex_destroy(&srv->srv_mtx);
   
         free(srv);          free(srv);
Line 627  rpc_srv_execServer(rpc_srv_t * __restrict srv) Line 737  rpc_srv_execServer(rpc_srv_t * __restrict srv)
                 for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)                  for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
                         if (!c->cli_sa.sa_family)                          if (!c->cli_sa.sa_family)
                                 break;                                  break;
                if (c && c->cli_sa.sa_family && c->cli_parent) {                if (i >= srv->srv_numcli) {
                         usleep(1000000);                          usleep(1000000);
                         continue;                          continue;
                 }                  }
Line 663  rpc_srv_execServer(rpc_srv_t * __restrict srv) Line 773  rpc_srv_execServer(rpc_srv_t * __restrict srv)
   
 /*  /*
  * rpc_srv_execCall() Execute registered call from RPC server   * rpc_srv_execCall() Execute registered call from RPC server
  * @data = RPC const data  
  * @call = Register RPC call   * @call = Register RPC call
  * @rpc = IN RPC call structure   * @rpc = IN RPC call structure
  * @args = IN RPC call array of rpc values   * @args = IN RPC call array of rpc values
  * return: -1 error, !=-1 ok   * return: -1 error, !=-1 ok
  */   */
 int  int
rpc_srv_execCall(void * const data, rpc_func_t * __restrict call, rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, 
                struct tagRPCCall * __restrict rpc, rpc_val_t * __restrict args)                rpc_val_t * __restrict args)
 {  {
         void *dl;          void *dl;
         rpc_callback_t func;          rpc_callback_t func;
         int ret;          int ret;
   
        if (!data || !call || !rpc) {        if (!call || !rpc || !call->func_parent) {
                 rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");                  rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");
                 return -1;                  return -1;
         }          }
Line 690  rpc_srv_execCall(void * const data, rpc_func_t * __res Line 799  rpc_srv_execCall(void * const data, rpc_func_t * __res
   
         func = dlsym(dl, (char*) call->func_name);          func = dlsym(dl, (char*) call->func_name);
         if (func)          if (func)
                ret = func(data, call, rpc->call_argc, args);                ret = func(call, rpc->call_argc, args);
         else {          else {
                 rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());                  rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());
                 ret = -1;                  ret = -1;

Removed from v.1.1.1.1.2.6  
changed lines
  Added in v.1.2.2.2


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>