Diff for /libaitrpc/src/srv.c between versions 1.4.2.2 and 1.6.2.5

version 1.4.2.2, 2011/08/30 09:07:55 version 1.6.2.5, 2012/03/13 17:15:31
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 46  SUCH DAMAGE. Line 46  SUCH DAMAGE.
 #include "global.h"  #include "global.h"
   
   
   static void *rxPacket(sched_task_t*);
   
 static void *  static void *
rpc_srv_dispatchCall(void *arg)txPacket(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        rpc_cli_t *c = TASK_ARG(task);
        rpc_srv_t *s;        rpc_srv_t *s = c->cli_parent;
        ait_val_t *vals = NULL, *v = NULL; 
         rpc_func_t *f = NULL;          rpc_func_t *f = NULL;
        struct tagRPCCall *rpc;        u_char *buf = TASK_DATA(task);
        struct tagRPCRet *rrpc;        struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
        rpc_sess_t ses = { 0 };        int ret, wlen = sizeof(struct tagRPCCall);
        fd_set fds;        array_t *arr = NULL;
        u_char *buf, *data; 
        int ret, argc = 0, Limit = 0; 
        register int i; 
   
        if (!arg) {        if (rpc->call_argc) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");                f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash));
                return NULL;                if (!f) {
        } else                        rpc->call_argc ^= rpc->call_argc;
                s = c->cli_parent;                        rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         rpc->call_argc = htons(rpc_srv_getVars(f, &arr));
                         /* Go Encapsulate variables */
                         ret = io_vars2buffer(buf + wlen, TASK_DATLEN(task) - wlen, arr);
                         io_clrVars(f->func_vars);
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }
         }
   
        buf = malloc(s->srv_netbuf);        /* send reply */
        if (!buf) {        ret = send(TASK_FD(task), buf, wlen, 0);
         if (ret == -1)
                 LOGERR;                  LOGERR;
                return NULL;        else if (ret != wlen)
        }                rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
                                 "really sended %d bytes", wlen, ret);
   
        do {        /* lets get next packet */
                v = NULL;        schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), 
                FD_ZERO(&fds);                        TASK_DATA(task), TASK_DATLEN(task));
                FD_SET(c->cli_sock, &fds);        return NULL;
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);}
                if (ret == -1) { 
                        if (errno == EINTR && s->srv_kill != kill) 
                                continue; 
   
                        LOGERR;static void *
                        ret = -2;execCall(sched_task_t *task)
                        break;{
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         array_t *arr = NULL;
         u_char *buf = TASK_DATA(task);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int argc = ntohs(rpc->call_argc);
 
         /* Go decapsulate variables ... */
         if (!(rpc->call_req.flags & RPC_NOREPLY) && argc) {
                 arr = io_buffer2vars(buf + sizeof(struct tagRPCCall), 
                                 TASK_DATLEN(task) - sizeof(struct tagRPCCall), argc, 1);
                 if (!arr) {
                         rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         return NULL;
                 }                  }
                memset(buf, 0, s->srv_netbuf);        }
                ret = recv(c->cli_sock, buf, s->srv_netbuf, 0);
                if (ret == -1) {        if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
                        LOGERR;                rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                        ret = -3;                rpc->call_argc ^= rpc->call_argc;
                        break;                rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
         } else {
                 rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
                 if (rpc->call_rep.ret == htonl(-1)) {
                         rpc->call_rep.eno = RPC_ERROR(errno);
                         rpc->call_argc ^= rpc->call_argc;
                 } else {
                         rpc->call_rep.eno ^= rpc->call_rep.eno;
                         rpc->call_argc = htons(rpc_srv_getVars(f, NULL));
                 }                  }
                if (!ret) {             /* receive EOF */        }
                        ret = 0; 
                        break; 
                } 
                if (ret < sizeof(struct tagRPCCall)) { 
                        rpc_SetErr(EMSGSIZE, "Error:: too short RPC packet ...\n"); 
                        ret = -4; 
                        break; 
                } else 
                        rpc = (struct tagRPCCall*) buf; 
                /* check RPC packet session info */ 
                if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) { 
                        rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n"); 
                        ret = -5; 
                        goto makeReply; 
                } else 
                        Limit = sizeof(struct tagRPCCall); 
   
                /* RPC is OK! Go decapsulate variables ... */        if (arr)
                if (rpc->call_argc) {                io_arrayDestroy(&arr);
                        v = (ait_val_t*) (buf + Limit);        return NULL;
                        if (rpc->call_argc * sizeof(ait_val_t) > s->srv_netbuf - Limit) {}
                                rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n"); 
                                ret = -5; 
                                goto makeReply; 
                        } else 
                                Limit += rpc->call_argc * sizeof(ait_val_t); 
   
                        /* RPC received variables types are OK! */static void *
                        data = (u_char*) v + rpc->call_argc * sizeof(ait_val_t);rxPacket(sched_task_t *task)
                        for (i = 0; i < rpc->call_argc; i++) {{
                                switch (AIT_TYPE(&v[i])) {        rpc_cli_t *c = TASK_ARG(task);
                                        case buffer:        rpc_srv_t *s = c->cli_parent;
                                                if (AIT_LEN(&v[i]) > s->srv_netbuf - Limit) {        u_char *buf = TASK_DATA(task);
                                                        rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");        int rlen;
                                                        ret = -5;        u_short crc;
                                                        goto makeReply;        struct tagRPCCall *rpc;
                                                } else        struct timespec ts;
                                                        Limit += AIT_LEN(&v[i]); 
   
                                                v[i].val.buffer = data;        memset(buf, 0, TASK_DATLEN(task));
                                                data += AIT_LEN(&v[i]);        rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
                                                break;        if (rlen == -1) {
                                        case string:                LOGERR;
                                                if (AIT_LEN(&v[i]) > s->srv_netbuf - Limit) {                s->srv_kill = kill;
                                                        rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n"); 
                                                        ret = -5; 
                                                        goto makeReply; 
                                                } else 
                                                        Limit += AIT_LEN(&v[i]); 
   
                                                v[i].val.string = (int8_t*) data;                schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), 
                                                data += AIT_LEN(&v[i]);                                TASK_DATA(task), TASK_DATLEN(task));
                                                break;                return NULL;
                                        case blob:        } else if (!rlen) {        /* receive EOF */
                                                if (s->srv_blob.state == disable) {                s->srv_kill = kill;
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n"); 
                                                        ret = -5; 
                                                        goto makeReply; 
                                                } 
                                                if (s->srv_blob.state == kill) { 
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is gone.\n"); 
                                                        ret = -5; 
                                                        goto makeReply; 
                                                } 
                                        default: 
                                                break; 
                                } 
   
                                AIT_ADDZCOPY(&v[i]);                schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), 
                        }                                TASK_DATA(task), TASK_DATLEN(task));
                }                return NULL;
         }
         if (rlen < sizeof(struct tagRPCCall)) {
                 rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");
   
                /* execute call */                schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), 
                argc = 0;                                TASK_DATA(task), TASK_DATLEN(task));
                vals = NULL;                return NULL;
                memcpy(&ses, &rpc->call_session, sizeof ses);        } else
                if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) {                rpc = (struct tagRPCCall*) buf;
                        rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n"); 
                        ret = -6; 
                } else 
                        if ((ret = rpc_srv_execCall(f, rpc, v)) == -1) 
                                ret = -9; 
                        else 
                                argc = rpc_srv_getVars(f, &vals); 
makeReply: 
                /* made reply */ 
                memset(buf, 0, s->srv_netbuf); 
                rrpc = (struct tagRPCRet*) buf; 
                Limit = sizeof(struct tagRPCRet); 
   
                memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));        /* check integrity of packet */
                rrpc->ret_tag = rpc->call_tag;        crc = ntohs(rpc->call_crc);
                rrpc->ret_hash = rpc->call_hash;        rpc->call_crc ^= rpc->call_crc;
                rrpc->ret_errno = rpc_Errno;        if (crc != crcFletcher16((u_short*) buf, ((rlen + 1) & ~1) / 2)) {
                rrpc->ret_retcode = ret;                rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                rrpc->ret_argc = argc; 
   
                if (argc && vals) {                schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), 
                        /* Go Encapsulate variables ... */                                TASK_DATA(task), TASK_DATLEN(task));
                        v = (ait_val_t*) (buf + Limit);                return NULL;
                        if (argc * sizeof(ait_val_t) > s->srv_netbuf - Limit) {        }
                                for (i = 0; i < argc; i++) 
                                        AIT_FREE_VAL(&vals[i]); 
                                rpc_srv_freeVars(f); 
                                vals = NULL; 
                                argc = 0; 
                                ret = -7; 
                                rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet values (-7) ...\n"); 
                                goto makeReply; 
                        } else 
                                Limit += argc * sizeof(ait_val_t); 
                        memcpy(v, vals, argc * sizeof(ait_val_t)); 
   
                        /* RPC send variables types are OK! */        /* check RPC packet session info */
                        data = (u_char*) v + argc * sizeof(ait_val_t);        if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                        for (ret = i = 0; i < argc; i++) {                rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                                switch (AIT_TYPE(&vals[i])) {                rpc->call_argc ^= rpc->call_argc;
                                        case buffer:                rpc->call_rep.ret = RPC_ERROR(-1);
                                                if (ret || Limit + vals[i].val_len > s->srv_netbuf) {                rpc->call_rep.eno = RPC_ERROR(errno);
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");                goto end;
                                                        rrpc->ret_retcode = ret = -7;        } else {
                                                        rrpc->ret_argc = 0;                /* change socket timeout from last packet */
                                                        break;                ts.tv_sec = rpc->call_session.sess_timeout;
                                                }                ts.tv_nsec = 0;
                 schedPolling(TASK_ROOT(task), &ts, NULL);
         }
   
                                                memcpy(data, vals[i].val.buffer, vals[i].val_len);        /* execute RPC call */
                                                data += AIT_LEN(&vals[i]);        schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), 0, 
                                                Limit += AIT_LEN(&vals[i]);                        TASK_DATA(task), TASK_DATLEN(task));
                                                break; 
                                        case string: 
                                                if (ret || Limit + vals[i].val_len > s->srv_netbuf) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc->ret_retcode = ret = -7; 
                                                        rrpc->ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.string, vals[i].val_len);end:
                                                data += AIT_LEN(&vals[i]);        /* send RPC reply */
                                                Limit += AIT_LEN(&vals[i]);        if (!(rpc->call_req.flags & RPC_NOREPLY))
                                                break;                schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task), 
                                        case blob:                                TASK_DATA(task), TASK_DATLEN(task));
                                                if (s->srv_blob.state == disable) {        else
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");                schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), 
                                                        rrpc->ret_retcode = ret = -5;                                TASK_DATA(task), TASK_DATLEN(task));
                                                        rrpc->ret_argc = 0;        return NULL;
                                                        break;}
                                                } 
                                                if (s->srv_blob.state == kill) { 
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is gone.\n"); 
                                                        rrpc->ret_retcode = ret = -5; 
                                                        rrpc->ret_argc = 0; 
                                                        break; 
                                                } 
                                        default: 
                                                break; 
                                } 
   
                                /* don't add zero copy at this position, because buffer/string must be freed! */static void *
                                AIT_FREE_VAL(&vals[i]);rpc_srv_dispatchCall(void *arg)
                        }{
                        rpc_srv_freeVars(f);        rpc_cli_t *c = arg;
                        vals = NULL;        rpc_srv_t *s;
                        argc = 0;        u_char *buf;
                }        sched_root_task_t *root;
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
                ret = send(c->cli_sock, buf, Limit, 0);        if (!arg) {
                if (ret == -1) {                rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client");
                        LOGERR;                return NULL;
                        ret = -8;        } else
                        break;                s = c->cli_parent;
                } 
                if (ret != Limit) { 
                        rpc_SetErr(ECANCELED, "Error:: in send RPC request, should be send %d bytes, " 
                                        "really is %d\n", Limit, ret); 
                        ret = -9; 
                        break; 
                } 
        } while (ret > -1 || s->srv_kill != kill); 
   
           /* allocate net buffer */
           buf = malloc(s->srv_netbuf);
           if (!buf) {
                   LOGERR;
                   return NULL;
           }
   
           root = schedBegin();
           if (!root) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   free(buf);
                   return NULL;
           } else {
                   schedTermCondition(root, kill);
                   schedPolling(root, &ts, NULL);
           }
   
           schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf);
   
           schedRun(root, (void*) &s->srv_kill);
           schedEnd(&root);
   
         shutdown(c->cli_sock, SHUT_RDWR);          shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);          close(c->cli_sock);
         memset(c, 0, sizeof(rpc_cli_t));          memset(c, 0, sizeof(rpc_cli_t));
         free(buf);          free(buf);
        return (void*) (long)ret;        return NULL;
 }  }
   
   
Line 334  rpc_srv_dispatchVars(void *arg) Line 312  rpc_srv_dispatchVars(void *arg)
                         break;                          break;
                 }                  }
                 if (ret < sizeof(struct tagBLOBHdr)) {                  if (ret < sizeof(struct tagBLOBHdr)) {
                        rpc_SetErr(EMSGSIZE, "Error:: too short BLOB packet ...\n");                        rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n");
                         ret = -4;                          ret = -4;
                        break;                        if (s->srv_kill != kill && s->srv_blob.state != kill)
                                 continue;
                         else
                                 break;
                 } else                  } else
                         blob = (struct tagBLOBHdr*) buf;                          blob = (struct tagBLOBHdr*) buf;
                 /* check BLOB packet session info */                  /* check BLOB packet session info */
Line 378  rpc_srv_dispatchVars(void *arg) Line 359  rpc_srv_dispatchVars(void *arg)
                                         ret = -7;                                          ret = -7;
                                 break;                                  break;
                         default:                          default:
                                rpc_SetErr(EINVAL, "Error:: unsupported BLOB command (%d)...\n",                                 rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n", 
                                                 blob->hdr_cmd);                                                  blob->hdr_cmd);
                                 ret = -7;                                  ret = -7;
                 }                  }
Line 394  makeReply: Line 375  makeReply:
                         break;                          break;
                 }                  }
                 if (ret != sizeof buf) {                  if (ret != sizeof buf) {
                        rpc_SetErr(ECANCELED, "Error:: in send BLOB reply, should be send %d bytes, "                        rpc_SetErr(EPROCUNAVAIL, "Error:: in send BLOB reply, should be send %d bytes, "
                                         "really is %d\n", sizeof buf, ret);                                          "really is %d\n", sizeof buf, ret);
                         ret = -9;                          ret = -9;
                        break;                        if (s->srv_kill != kill && s->srv_blob.state != kill)
                                 continue;
                         else
                                 break;
                 }                  }
         } while (ret > -1 || s->srv_kill != kill);          } while (ret > -1 || s->srv_kill != kill);
   
         shutdown(c->cli_sock, SHUT_RDWR);          shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);          close(c->cli_sock);
         memset(c, 0, sizeof(rpc_cli_t));          memset(c, 0, sizeof(rpc_cli_t));
        return (void*) (long)ret;        return (void*) ((long)ret);
 }  }
   
 // -------------------------------------------------  // -------------------------------------------------
   
 /*  /*
 * rpc_srv_initBLOBServer() Init & create BLOB Server * rpc_srv_initBLOBServer() - Init & create BLOB Server
  *
  * @srv = RPC server instance   * @srv = RPC server instance
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
  * @diskDir = Disk place for BLOB file objects   * @diskDir = Disk place for BLOB file objects
Line 420  int Line 405  int
 rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
 {  {
         int n = 1;          int n = 1;
        struct sockaddr sa;        io_sockaddr_t sa;
        struct sockaddr_in *sin = (struct sockaddr_in*) &sa; 
        struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa; 
        struct sockaddr_un *sun = (struct sockaddr_un*) &sa; 
   
         if (!srv) {          if (!srv) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");                rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
                 return -1;                  return -1;
         }          }
         if (srv->srv_blob.state) {          if (srv->srv_blob.state) {
                rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");                rpc_SetErr(EPERM, "Already started BLOB server!");
                 return 0;                  return 0;
         }          }
         if (!Port)  
                 Port = RPC_DEFPORT + 1;  
   
         memset(&srv->srv_blob, 0, sizeof srv->srv_blob);          memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
         if (access(diskDir, R_OK | W_OK) == -1) {          if (access(diskDir, R_OK | W_OK) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         } else          } else
                strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);                srv->srv_blob.dir = strdup(diskDir);
   
         srv->srv_blob.server.cli_tid = pthread_self();          srv->srv_blob.server.cli_tid = pthread_self();
         srv->srv_blob.server.cli_parent = srv;          srv->srv_blob.server.cli_parent = srv;
   
         memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);          memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
        switch (srv->srv_server.cli_sa.sa_family) {        switch (sa.sa.sa_family) {
                 case AF_INET:                  case AF_INET:
                        sin->sin_port = htons(Port);                        sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);
                        memcpy(&srv->srv_blob.server.cli_sa, sin, sizeof(struct sockaddr)); 
                         break;                          break;
                 case AF_INET6:                  case AF_INET6:
                        sin6->sin6_port = htons(Port);                        sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);
                        memcpy(&srv->srv_blob.server.cli_sa, sin6, sizeof(struct sockaddr)); 
                         break;                          break;
                 case AF_LOCAL:                  case AF_LOCAL:
                        strlcat(sun->sun_path, ".blob", sizeof sun->sun_path);                        strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
                        memcpy(&srv->srv_blob.server.cli_sa, sun, sizeof(struct sockaddr)); 
                         break;                          break;
                 default:                  default:
                           free(srv->srv_blob.dir);
                         return -1;                          return -1;
         }          }
           memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
   
         /* create BLOB server socket */          /* create BLOB server socket */
        srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa_family, SOCK_STREAM, 0);        srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
         if (srv->srv_blob.server.cli_sock == -1) {          if (srv->srv_blob.server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                   free(srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   free(srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         n = srv->srv_netbuf;          n = srv->srv_netbuf;
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   free(srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   free(srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
        if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa        if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, 
                                sizeof srv->srv_blob.server.cli_sa) == -1) {                                srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   free(srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
   
Line 498  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 482  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
         if (!srv->srv_blob.clients) {          if (!srv->srv_blob.clients) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   free(srv->srv_blob.dir);
                 return -1;                  return -1;
         } else          } else
                 memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));                  memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
   
         pthread_mutex_init(&srv->srv_blob.mtx, NULL);          pthread_mutex_init(&srv->srv_blob.mtx, NULL);
   
         pthread_mutex_lock(&srv->srv_mtx);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);          rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
        rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 0);        rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
        rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 0);        rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
        rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 1);        rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
        pthread_mutex_unlock(&srv->srv_mtx); 
   
         srv->srv_blob.state = enable;   /* enable BLOB */          srv->srv_blob.state = enable;   /* enable BLOB */
         return 0;          return 0;
 }  }
   
 /*  /*
 * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: none   * return: none
  */   */
Line 528  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) Line 512  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
         rpc_blob_t *f;          rpc_blob_t *f;
   
         if (!srv) {          if (!srv) {
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");                rpc_SetErr(EINVAL, "Can`t destroy server because parameter is null!");
                 return;                  return;
         } else          } else
                 srv->srv_blob.state = kill;                  srv->srv_blob.state = kill;
Line 538  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) Line 522  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
         rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);          rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
         rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);          rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
   
           if (srv->srv_blob.dir)
                   free(srv->srv_blob.dir);
   
         /* close all clients connections & server socket */          /* close all clients connections & server socket */
         for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)          for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
                if (c->cli_sa.sa_family)                if (c->cli_sa.sa.sa_family)
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
         close(srv->srv_blob.server.cli_sock);          close(srv->srv_blob.server.cli_sock);
   
           pthread_mutex_lock(&srv->srv_blob.mtx);
         if (srv->srv_blob.clients) {          if (srv->srv_blob.clients) {
                 free(srv->srv_blob.clients);                  free(srv->srv_blob.clients);
                 srv->srv_blob.clients = NULL;                  srv->srv_blob.clients = NULL;
         }          }
   
         /* detach blobs */          /* detach blobs */
         pthread_mutex_lock(&srv->srv_blob.mtx);  
         while ((f = srv->srv_blob.blobs)) {          while ((f = srv->srv_blob.blobs)) {
                 srv->srv_blob.blobs = f->blob_next;                  srv->srv_blob.blobs = f->blob_next;
                 rpc_srv_blobFree(srv, f);                  rpc_srv_blobFree(srv, f);
Line 563  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) Line 550  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
 }  }
   
 /*  /*
 * rpc_srv_execBLOBServer() Execute Main BLOB server loop and wait for clients requests * rpc_srv_loopBLOB() - Execute Main BLOB server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
 {  {
        socklen_t salen = sizeof(struct sockaddr);        socklen_t salen = sizeof(io_sockaddr_t);
         register int i;          register int i;
         rpc_cli_t *c;          rpc_cli_t *c;
         fd_set fds;          fd_set fds;
         int ret;          int ret;
         struct timeval tv = { DEF_RPC_TIMEOUT, 0 };          struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
           pthread_attr_t attr;
   
         if (!srv || srv->srv_blob.state == kill) {          if (!srv || srv->srv_blob.state == kill) {
                 rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");                  rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
                 return -1;                  return -1;
         }          }
   
           tv.tv_sec = srv->srv_session.sess_timeout;
   
         if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {          if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
           pthread_attr_init(&attr);
           pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
   
           /* main BLOB loop */
         while (srv->srv_blob.state != kill && srv->srv_kill != kill) {          while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
                 for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)                  for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
                        if (!c->cli_sa.sa_family)                        if (!c->cli_sa.sa.sa_family)
                                 break;                                  break;
                 if (i >= srv->srv_numcli) {                  if (i >= srv->srv_numcli) {
                         usleep(1000000);  
 #ifdef HAVE_PTHREAD_YIELD  #ifdef HAVE_PTHREAD_YIELD
                         pthread_yield();                          pthread_yield();
   #else
                           usleep(1000000);
 #endif  #endif
                         continue;                          continue;
                 }                  }
Line 610  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) Line 606  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)
                 if (!ret)                  if (!ret)
                         continue;                          continue;
   
                c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa, &salen);                c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
                 if (c->cli_sock == -1) {                  if (c->cli_sock == -1) {
                         LOGERR;                          LOGERR;
                         continue;                          continue;
                 } else                  } else
                         c->cli_parent = srv;                          c->cli_parent = srv;
   
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {                /* spawn dispatch thread for BLOB client */
                 if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchVars, c)) {
                         LOGERR;                          LOGERR;
                         continue;                          continue;
                } else                }
                        pthread_detach(c->cli_tid); 
         }          }
   
        srv->srv_blob.state = disable;        srv->srv_blob.state = kill;
   
           pthread_attr_destroy(&attr);
         return 0;          return 0;
 }  }
   
   
 /*  /*
 * rpc_srv_initServer() Init & create RPC Server * rpc_srv_initServer() - Init & create RPC Server
  *
  * @regProgID = ProgramID for authentication & recognition   * @regProgID = ProgramID for authentication & recognition
  * @regProcID = ProcessID for authentication & recognition   * @regProcID = ProcessID for authentication & recognition
  * @concurentClients = Concurent clients at same time to this server   * @concurentClients = Concurent clients at same time to this server
 * @netBuf = Network buffer length, if =0 == BUFSIZ * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
  * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL   * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
  * @csHost = Host name or address for bind server, if NULL any address   * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
Line 648  rpc_srv_initServer(u_int regProgID, u_int regProcID, i Line 646  rpc_srv_initServer(u_int regProgID, u_int regProcID, i
         rpc_srv_t *srv = NULL;          rpc_srv_t *srv = NULL;
         int n = 1;          int n = 1;
         struct hostent *host = NULL;          struct hostent *host = NULL;
        struct sockaddr sa;        io_sockaddr_t sa;
        struct sockaddr_in *sin = (struct sockaddr_in*) &sa; 
        struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa; 
        struct sockaddr_un *sun = (struct sockaddr_un*) &sa; 
   
         if (!concurentClients || !regProgID ||           if (!concurentClients || !regProgID || 
                         (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {                          (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
Line 670  rpc_srv_initServer(u_int regProgID, u_int regProcID, i Line 665  rpc_srv_initServer(u_int regProgID, u_int regProcID, i
                 }                  }
         }          }
         memset(&sa, 0, sizeof sa);          memset(&sa, 0, sizeof sa);
        sa.sa_family = family;        sa.sa.sa_family = family;
         switch (family) {          switch (family) {
                 case AF_INET:                  case AF_INET:
                        sin->sin_len = sizeof(struct sockaddr_in);                        sa.sin.sin_len = sizeof(struct sockaddr_in);
                        sin->sin_port = htons(Port);                        sa.sin.sin_port = htons(Port);
                         if (csHost)                          if (csHost)
                                memcpy(&sin->sin_addr, host->h_addr, host->h_length);                                memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length);
                         break;                          break;
                 case AF_INET6:                  case AF_INET6:
                        sin6->sin6_len = sizeof(struct sockaddr_in6);                        sa.sin6.sin6_len = sizeof(struct sockaddr_in6);
                        sin6->sin6_port = htons(Port);                        sa.sin6.sin6_port = htons(Port);
                         if (csHost)                          if (csHost)
                                memcpy(&sin6->sin6_addr, host->h_addr, host->h_length);                                memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length);
                         break;                          break;
                 case AF_LOCAL:                  case AF_LOCAL:
                        sun->sun_len = sizeof(struct sockaddr_un);                        sa.sun.sun_len = sizeof(struct sockaddr_un);
                         if (csHost)                          if (csHost)
                                strlcpy(sun->sun_path, csHost, sizeof sun->sun_path);                                strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path);
                         unlink(sa.sun.sun_path);
                         break;                          break;
                 default:                  default:
                         rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");                          rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
Line 704  rpc_srv_initServer(u_int regProgID, u_int regProcID, i Line 700  rpc_srv_initServer(u_int regProgID, u_int regProcID, i
         srv->srv_netbuf = netBuf;          srv->srv_netbuf = netBuf;
         srv->srv_numcli = concurentClients;          srv->srv_numcli = concurentClients;
         srv->srv_session.sess_version = RPC_VERSION;          srv->srv_session.sess_version = RPC_VERSION;
           srv->srv_session.sess_timeout = DEF_RPC_TIMEOUT;
         srv->srv_session.sess_program = regProgID;          srv->srv_session.sess_program = regProgID;
         srv->srv_session.sess_process = regProcID;          srv->srv_session.sess_process = regProcID;
   
         srv->srv_server.cli_tid = pthread_self();          srv->srv_server.cli_tid = pthread_self();
         srv->srv_server.cli_parent = srv;          srv->srv_server.cli_parent = srv;
        switch (family) {        memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
                case AF_INET: 
                        memcpy(&srv->srv_server.cli_sa, sin, sizeof srv->srv_server.cli_sa); 
                        break; 
                case AF_INET6: 
                        memcpy(&srv->srv_server.cli_sa, sin6, sizeof srv->srv_server.cli_sa); 
                        break; 
                case AF_LOCAL: 
                        memcpy(&srv->srv_server.cli_sa, sun, sizeof srv->srv_server.cli_sa); 
                        unlink(sun->sun_path); 
                        break; 
        } 
   
         /* create server socket */          /* create server socket */
         srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);          srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
Line 748  rpc_srv_initServer(u_int regProgID, u_int regProcID, i Line 734  rpc_srv_initServer(u_int regProgID, u_int regProcID, i
                 free(srv);                  free(srv);
                 return NULL;                  return NULL;
         }          }
        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
                                 srv->srv_server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_server.cli_sock);                  close(srv->srv_server.cli_sock);
                 free(srv);                  free(srv);
Line 768  rpc_srv_initServer(u_int regProgID, u_int regProcID, i Line 755  rpc_srv_initServer(u_int regProgID, u_int regProcID, i
         pthread_mutex_init(&srv->srv_mtx, NULL);          pthread_mutex_init(&srv->srv_mtx, NULL);
   
         rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);          rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
        rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0);        rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
        rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0);        rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
        rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0);        rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
         return srv;          return srv;
 }  }
   
 /*  /*
 * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
 * @srv = RPC Server instance *
  * @psrv = RPC Server instance
  * return: none   * return: none
  */   */
 void  void
rpc_srv_endServer(rpc_srv_t * __restrict srv)rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
 {  {
         rpc_cli_t *c;          rpc_cli_t *c;
         register int i;          register int i;
         rpc_func_t *f;          rpc_func_t *f;
   
        if (!srv) {        if (!psrv || !*psrv) {
                 rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");                  rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
                 return;                  return;
         }          }
   
        rpc_srv_endBLOBServer(srv);        rpc_srv_endBLOBServer(*psrv);
   
         /* close all clients connections & server socket */          /* close all clients connections & server socket */
        for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)        for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
                if (c->cli_sa.sa_family) {                if (c->cli_sa.sa.sa_family) {
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);                          close(c->cli_sock);
                 }                  }
        close(srv->srv_server.cli_sock);        close((*psrv)->srv_server.cli_sock);
   
        if (srv->srv_clients) {        if ((*psrv)->srv_clients) {
                free(srv->srv_clients);                free((*psrv)->srv_clients);
                srv->srv_clients = NULL;                (*psrv)->srv_clients = NULL;
                srv->srv_numcli = 0;                (*psrv)->srv_numcli = 0;
         }          }
   
         /* detach exported calls */          /* detach exported calls */
        pthread_mutex_lock(&srv->srv_mtx);        pthread_mutex_lock(&(*psrv)->srv_mtx);
        while ((f = srv->srv_funcs)) {        while ((f = (*psrv)->srv_funcs)) {
                srv->srv_funcs = f->func_next;                (*psrv)->srv_funcs = f->func_next;
                 io_freeVars(&f->func_vars);
                 AIT_FREE_VAL(&f->func_name);
                 AIT_FREE_VAL(&f->func_file);
                 free(f);                  free(f);
         }          }
        pthread_mutex_unlock(&srv->srv_mtx);        pthread_mutex_unlock(&(*psrv)->srv_mtx);
   
        while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY);        while (pthread_mutex_trylock(&(*psrv)->srv_mtx) == EBUSY);
        pthread_mutex_destroy(&srv->srv_mtx);        pthread_mutex_destroy(&(*psrv)->srv_mtx);
   
        free(srv);        free(*psrv);
        srv = NULL;        *psrv = NULL;
 }  }
   
 /*  /*
 * rpc_srv_execServer() Execute Main server loop and wait for clients requests * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_execServer(rpc_srv_t * __restrict srv)rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
        socklen_t salen = sizeof(struct sockaddr);        socklen_t salen = sizeof(io_sockaddr_t);
         register int i;          register int i;
         rpc_cli_t *c;          rpc_cli_t *c;
         fd_set fds;          fd_set fds;
         int ret;          int ret;
         struct timeval tv = { DEF_RPC_TIMEOUT, 0 };          struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
           pthread_attr_t attr;
   
         if (!srv) {          if (!srv) {
                 rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");                  rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
                 return -1;                  return -1;
         }          }
   
           tv.tv_sec = srv->srv_session.sess_timeout;
   
           /* activate BLOB server worker if srv->srv_blob.state == enable */
           rpc_srv_execBLOBServer(srv);
   
         if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {          if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
           pthread_attr_init(&attr);
           pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
   
           /* main rpc loop */
         while (srv->srv_kill != kill) {          while (srv->srv_kill != kill) {
                 for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)                  for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
                        if (!c->cli_sa.sa_family)                        if (!c->cli_sa.sa.sa_family)
                                 break;                                  break;
                 if (i >= srv->srv_numcli) {                  if (i >= srv->srv_numcli) {
                         usleep(1000000);  
 #ifdef HAVE_PTHREAD_YIELD  #ifdef HAVE_PTHREAD_YIELD
                         pthread_yield();                          pthread_yield();
   #else
                           usleep(1000000);
 #endif  #endif
                         continue;                          continue;
                 }                  }
Line 870  rpc_srv_execServer(rpc_srv_t * __restrict srv) Line 873  rpc_srv_execServer(rpc_srv_t * __restrict srv)
                 if (!ret)                  if (!ret)
                         continue;                          continue;
   
                c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen);                c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
                 if (c->cli_sock == -1) {                  if (c->cli_sock == -1) {
                         LOGERR;                          LOGERR;
                         continue;                          continue;
                 } else                  } else
                         c->cli_parent = srv;                          c->cli_parent = srv;
   
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {                /* spawn rpc client dispatcher */
                 if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchCall, c)) {
                         LOGERR;                          LOGERR;
                         continue;                          continue;
                } else                }
                        pthread_detach(c->cli_tid); 
         }          }
   
           pthread_attr_destroy(&attr);
         return 0;          return 0;
 }  }
   
Line 891  rpc_srv_execServer(rpc_srv_t * __restrict srv) Line 895  rpc_srv_execServer(rpc_srv_t * __restrict srv)
   
 /*  /*
  * rpc_srv_execCall() Execute registered call from RPC server   * rpc_srv_execCall() Execute registered call from RPC server
    *
  * @call = Register RPC call   * @call = Register RPC call
  * @rpc = IN RPC call structure   * @rpc = IN RPC call structure
 * @args = IN RPC call array of rpc values * @args = IN RPC calling arguments from RPC client
  * return: -1 error, !=-1 ok   * return: -1 error, !=-1 ok
  */   */
 int  int
 rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,   rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, 
                ait_val_t * __restrict args)                array_t * __restrict args)
 {  {
         void *dl;          void *dl;
         rpc_callback_t func;          rpc_callback_t func;
         int ret;          int ret;
   
         if (!call || !rpc || !call->func_parent) {          if (!call || !rpc || !call->func_parent) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
                 return -1;                  return -1;
         }          }
   
        dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);        dl = dlopen(AIT_VOID(&call->func_file), RTLD_NOW);
         if (!dl) {          if (!dl) {
                rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror());                rpc_SetErr(ENOENT, "Can`t attach module %s!", dlerror());
                 return -1;                  return -1;
         }          }
   
        func = dlsym(dl, (char*) call->func_name);        func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name));
         if (func)          if (func)
                ret = func(call, rpc->call_argc, args);                ret = func(call, ntohs(rpc->call_argc), args);
         else {          else {
                rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());                rpc_SetErr(ENOEXEC, "Can`t find function %s!", dlerror());
                 ret = -1;                  ret = -1;
         }          }
   

Removed from v.1.4.2.2  
changed lines
  Added in v.1.6.2.5


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>