Diff for /libaitrpc/src/srv.c between versions 1.1.1.1.2.2 and 1.9.2.25

version 1.1.1.1.2.2, 2010/06/18 15:47:34 version 1.9.2.25, 2012/05/17 15:48:45
Line 5 Line 5
 * $Author$  * $Author$
 * $Id$  * $Id$
 *  *
*************************************************************************/**************************************************************************
 The ELWIX and AITNET software is distributed under the following
 terms:
 
 All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
 
 Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
         by Michael Pounov <misho@elwix.org>.  All rights reserved.
 
 Redistribution and use in source and binary forms, with or without
 modification, are permitted provided that the following conditions
 are met:
 1. Redistributions of source code must retain the above copyright
    notice, this list of conditions and the following disclaimer.
 2. Redistributions in binary form must reproduce the above copyright
    notice, this list of conditions and the following disclaimer in the
    documentation and/or other materials provided with the distribution.
 3. All advertising materials mentioning features or use of this software
    must display the following acknowledgement:
 This product includes software developed by Michael Pounov <misho@elwix.org>
 ELWIX - Embedded LightWeight unIX and its contributors.
 4. Neither the name of AITNET nor the names of its contributors
    may be used to endorse or promote products derived from this software
    without specific prior written permission.
 
 THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 SUCH DAMAGE.
 */
 #include "global.h"  #include "global.h"
   
   
 static void *  static void *
rpc_srv_dispatchCall(void *arg)closeClient(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        rpc_cli_t *c = TASK_ARG(task);
        rpc_srv_t *s;        rpc_srv_t *s = c->cli_parent;
        rpc_val_t *vals, *v = NULL; 
        rpc_func_t *f; 
        struct tagRPCCall *rpc; 
        struct tagRPCRet rrpc; 
        fd_set fds; 
        u_char buf[BUFSIZ], *data; 
        int ret, argc, Limit = 0; 
        register int i; 
   
        if (!arg) {        schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n"); 
                return NULL; 
        } else 
                s = c->cli_parent; 
   
        do {        /* close client socket */
                FD_ZERO(&fds);        if (TASK_VAL(task))
                FD_SET(c->cli_sock, &fds);                shutdown(c->cli_sock, SHUT_RDWR);
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);        close(c->cli_sock);
                if (ret == -1) {
                        ret = -2;        /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);
 
         io_arrayDel(s->srv_clients, c->cli_id, 42);
         return NULL;
 }
 
 static void *
 txPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char buf[USHRT_MAX] = { 0 };
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, wlen = sizeof(struct tagRPCCall);
 
         /* copy RPC header */
         memcpy(buf, TASK_DATA(task), wlen);
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         rpc->call_argc = htons(io_arraySize(f->func_vars));
                         /* Go Encapsulate variables */
                         ret = io_vars2buffer(buf + wlen, sizeof buf - wlen, f->func_vars);
                         /* Free return values */
                         io_clrVars(f->func_vars);
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }                  }
                memset(&rrpc, 0, sizeof rrpc);        }
                memset(buf, 0, BUFSIZ);
                if ((ret = recv(c->cli_sock, buf, BUFSIZ, 0)) == -1) {        rpc->call_len = htons(wlen);
                        LOGERR;
                        ret = -3;        /* calculate CRC */
                        break;        rpc->call_crc ^= rpc->call_crc;
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
         if (ret == -1 || ret != wlen) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);
         }
 
         return NULL;
 }
 
 static void *
 execCall(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         array_t *arr = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf) + TASK_VAL(task);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int argc = ntohs(rpc->call_argc);
 
         /* Go decapsulate variables ... */
         if (argc) {
                 arr = io_buffer2vars(buf + sizeof(struct tagRPCCall), 
                                 AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall), 
                                 argc, 1);
                 if (!arr) {
                         rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         return NULL;
                 }                  }
                if (!ret) {          // receive EOF        } else
                        ret = 0;                arr = NULL;
                        break;
                }        if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
                if (ret < sizeof(struct tagRPCCall)) {                rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                        rpc_SetErr(EMSGSIZE, "Error:: too short RPC packet ...\n");                rpc->call_argc ^= rpc->call_argc;
                        ret = -4;                rpc->call_rep.ret = RPC_ERROR(-1);
                        break;                rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                } else        } else {
                        rpc = (struct tagRPCCall*) buf;                /* if client doesn't want reply */
                // check RPC packet session info                argc = rpc->call_req.flags & RPC_NOREPLY;
                if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) {                rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
                        rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n");                if (rpc->call_rep.ret == htonl(-1)) {
                        ret = -5;                        rpc->call_rep.eno = RPC_ERROR(errno);
                        break;                        rpc->call_argc ^= rpc->call_argc;
                }                } else {
                // RPC is OK! Go decapsulate variables ...                        rpc->call_rep.eno ^= rpc->call_rep.eno;
                if (rpc->call_argc) {                        if (argc) {
                        v = (rpc_val_t*) (buf + sizeof(struct tagRPCCall));                                /* without reply */
                        // RPC received variables types OK!                                io_clrVars(f->func_vars);
                        data = (u_char*) v + rpc->call_argc * sizeof(rpc_val_t);                                rpc->call_argc ^= rpc->call_argc;
                        for (i = 0; i < rpc->call_argc; i++) {                        } else {
                                switch (v[i].val_type) {                                /* reply */
                                        case buffer:                                rpc->call_argc = htons(io_arraySize(f->func_vars));
                                                v[i].val.buffer = data; 
                                                data += v[i].val_len; 
                                                break; 
                                        case string: 
                                                v[i].val.string = (int8_t*) data; 
                                                data += v[i].val_len + 1; 
                                                break; 
                                        case array: 
                                                v[i].val.array = (int8_t**) data; 
                                                data += v[i].val_len; 
                                                break; 
                                        default: 
                                                break; 
                                } 
                         }                          }
                 }                  }
           }
   
                argc = 0;        io_arrayDestroy(&arr);
                vals = NULL;        return NULL;
                if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) {}
                        rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n"); 
                        ret = -6; 
                } else 
                        if ((ret = rpc_srv_execCall(s, f, rpc, v)) == -1) 
                                ret = -6; 
                        else 
                                argc = rpc_srv_getValsCall(f, &vals); 
   
                memcpy(&rrpc.ret_session, &rpc->call_session, sizeof rrpc.ret_session);static void *
                rrpc.ret_tag = rpc->call_tag;rxPacket(sched_task_t *task)
                rrpc.ret_hash = rpc->call_hash;{
                rrpc.ret_errno = rpc_Errno;        rpc_cli_t *c = TASK_ARG(task);
                rrpc.ret_retcode = ret;        rpc_srv_t *s = c->cli_parent;
                rrpc.ret_argc = argc;        int len, rlen, noreply;
         u_short crc, off = TASK_DATLEN(task);
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc;
   
                memset(buf, 0, BUFSIZ);        memset(buf, 0, AIT_LEN(&c->cli_buf));
                memcpy(buf, &rrpc, (Limit = sizeof rrpc));        rlen = recv(TASK_FD(task), buf + off, AIT_LEN(&c->cli_buf) - off, 0);
                if (argc && vals) {        if (rlen < 1) {
                        v = (rpc_val_t*) (buf + sizeof rrpc);                /* close connection */
                        memcpy(v, vals, argc * sizeof(rpc_val_t));                schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);
                        Limit += argc * sizeof(rpc_val_t);                return NULL;
                        data = (u_char*) v + argc * sizeof(rpc_val_t);        } else {
                        for (ret = i = 0; i < argc; i++) {                rlen += off;    /* add reminded bytes from previous rxPacket, if exists! */
                                switch (vals[i].val_type) {                off = 0;   /* process buffer from start offset == 0 */
                                        case buffer:        }
                                                if (ret || Limit + vals[i].val_len > BUFSIZ) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc.ret_retcode = ret = -7; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.buffer, vals[i].val_len);        do {
                                                data += vals[i].val_len;                /* check RPC packet */
                                                Limit += vals[i].val_len;                if (rlen < sizeof(struct tagRPCCall)) {
                                                break;                        rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                                        case string: 
                                                if (ret || Limit + vals[i].val_len + 1 > BUFSIZ) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc.ret_retcode = ret = -7; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.string, vals[i].val_len + 1);                        /* reminder received previous bytes ;) */
                                                data += vals[i].val_len + 1;                        schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), 
                                                Limit += vals[i].val_len + 1;                                        TASK_FD(task), TASK_DATA(task), rlen);
                                                break;                        return NULL;
                                        case array:                } else
                                                if (ret || Limit + vals[i].val_len > BUFSIZ) {                        rpc = (struct tagRPCCall*) (buf + off);
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc.ret_retcode = ret = -7; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.array, vals[i].val_len);                len = ntohs(rpc->call_len);
                                                data += vals[i].val_len;                rlen -= len;
                                                Limit += vals[i].val_len; 
                                                break; 
                                        default: 
                                                break; 
                                } 
   
                                RPC_FREE_VAL(&vals[i]);                /* check RPC packet lengths */
                        }                if (rlen < 0 || len < sizeof(struct tagRPCCall)) {
                         rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length");
                         /* skip entire packet */
                         break;
                 }                  }
   
                if ((ret = send(c->cli_sock, buf, Limit, 0)) == -1) {                /* check integrity of packet */
                        LOGERR;                crc = ntohs(rpc->call_crc);
                        ret = -8;                rpc->call_crc ^= rpc->call_crc;
                        break;                if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                         rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
 
                         off += len;
                         /* try next packet remaining into buffer */
                         continue;
                 }                  }
                if (ret != Limit) {
                        rpc_SetErr(EBADMSG, "Error:: in send RPC request, should be send %d bytes, "                noreply = rpc->call_req.flags & RPC_NOREPLY;
                                        "really is %d\n", Limit, ret);
                        ret = -9;                /* check RPC packet session info */
                        break;                if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                         rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(errno);
                 } else {
                         /* execute RPC call */
                         schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off, NULL, 0);
                 }                  }
         } while (ret > -1);  
   
        shutdown(c->cli_sock, SHUT_RDWR);                /* send RPC reply */
        close(c->cli_sock);                if (!noreply)
        memset(c, 0, sizeof(rpc_cli_t));                        schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task), rpc, len);
        return (void*) ret; 
} 
   
// -------------------------------------------------                off += len;
         } while (rlen > 0);
   
/*        /* lets get next packet */
 * rpc_srv_initServer() Init & create RPC Server        schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), TASK_FD(task), TASK_DATA(task), 0);
 * @regProgID = ProgramID for authentication & recognition        return NULL;
 * @regProcID = ProcessID for authentication & recognition}
 * @concurentClients = Concurent clients at same time to this server
 * @family = Family socket type, AF_INET or AF_INET6static void *
 * @csHost = Host name or IP address for bind server, if NULL any addressacceptClients(sched_task_t *task)
 * @Port = Port for bind server, if Port == 0 default port is selected 
 * return: NULL == error or !=NULL bind and created RPC server instance 
 */ 
rpc_srv_t * 
rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,  
                u_short family, const char *csHost, u_short Port) 
 {  {
        rpc_srv_t *srv = NULL;        rpc_srv_t *srv = TASK_ARG(task);
        int n = 1;        rpc_cli_t *c = NULL;
        struct hostent *host = NULL;        register int i;
        struct sockaddr_in sin;        socklen_t salen = sizeof(io_sockaddr_t);
        struct sockaddr_in6 sin6; 
   
        if (!concurentClients || !regProgID || (family != AF_INET && family != AF_INET6)) {        /* check free slots for connect */
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");        for (i = 0; i < io_arraySize(srv->srv_clients) && 
                         (c = io_array(srv->srv_clients, i, rpc_cli_t*)); i++);
         if (c)  /* no more free slots! */
                 goto end;
         c = malloc(sizeof(rpc_cli_t));
         if (!c) {
                 LOGERR;
                 srv->srv_kill = 1;
                 return NULL;                  return NULL;
           } else {
                   memset(c, 0, sizeof(rpc_cli_t));
                   io_arraySet(srv->srv_clients, i, c);
                   c->cli_id = i;
                   c->cli_parent = srv;
         }          }
         if (!Port)  
                 Port = RPC_DEFPORT;  
         if (csHost) {  
                 host = gethostbyname2(csHost, family);  
                 if (!host) {  
                         rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));  
                         return NULL;  
                 }  
         }  
         switch (family) {  
                 case AF_INET:  
                         memset(&sin, 0, sizeof sin);  
                         sin.sin_len = sizeof sin;  
                         sin.sin_family = family;  
                         sin.sin_port = htons(Port);  
                         if (csHost)  
                                 memcpy(&sin.sin_addr, host->h_addr, host->h_length);  
                         break;  
                 case AF_INET6:  
                         memset(&sin6, 0, sizeof sin6);  
                         sin6.sin6_len = sizeof sin6;  
                         sin6.sin6_family = family;  
                         sin6.sin6_port = htons(Port);  
                         if (csHost)  
                                 memcpy(&sin6.sin6_addr, host->h_addr, host->h_length);  
                         break;  
                 default:  
                         rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");  
                         return NULL;  
         }  
   
        srv = malloc(sizeof(rpc_srv_t));        /* alloc empty buffer */
        if (!srv) {        AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);
 
         /* accept client */
         c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
         if (c->cli_sock == -1) {
                 LOGERR;                  LOGERR;
                return NULL;                AIT_FREE_VAL(&c->cli_buf);
                 io_arrayDel(srv->srv_clients, i, 42);
                 goto end;
         } else          } else
                memset(srv, 0, sizeof(rpc_srv_t));                fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
   
        srv->srv_numcli = concurentClients;        schedRead(TASK_ROOT(task), rxPacket, c, c->cli_sock, NULL, 0);
        srv->srv_session.sess_version = RPC_VERSION;end:
        srv->srv_session.sess_program = regProgID;        schedReadSelf(task);
        srv->srv_session.sess_process = regProcID;        return NULL;
 }
   
        srv->srv_server.cli_tid = pthread_self();/* ------------------------------------------------------ */
        srv->srv_server.cli_parent = srv;
        if (family == AF_INET)static void *
                memcpy(&srv->srv_server.cli_sa, &sin, sizeof srv->srv_server.cli_sa);closeBLOBClient(sched_task_t *task)
        else{
                memcpy(&srv->srv_server.cli_sa, &sin6, sizeof srv->srv_server.cli_sa);        rpc_cli_t *c = TASK_ARG(task);
        srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);        rpc_srv_t *s = c->cli_parent;
        if (srv->srv_server.cli_sock == -1) {
                LOGERR;        schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);
                free(srv);
                return NULL;        /* close client socket */
         if (TASK_VAL(task))
                 shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);
 
         /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);
 
         io_arrayDel(s->srv_blob.clients, c->cli_id, 42);
         return NULL;
 }
 
 static void *
 txBLOB(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
         int wlen = sizeof(struct tagBLOBHdr);
 
         /* calculate CRC */
         blob->hdr_crc ^= blob->hdr_crc;
         blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
         if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
                 /* close blob connection */
                 schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
         }          }
        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                LOGERR;        return NULL;
                close(srv->srv_server.cli_sock);}
                free(srv);
 static void *
 rxBLOB(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_blob_t *b;
         struct tagBLOBHdr blob;
         int rlen;
         u_short crc;
 
         memset(&blob, 0, sizeof blob);
         rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
         if (rlen < 1) {
                 /* close blob connection */
                 schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
                 return NULL;                  return NULL;
         }          }
        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {
                LOGERR;        /* check BLOB packet */
                close(srv->srv_server.cli_sock);        if (rlen < sizeof(struct tagBLOBHdr)) {
                free(srv);                rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
 
                 schedReadSelf(task);
                 return NULL;                  return NULL;
         }          }
   
        srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));        /* check integrity of packet */
        if (!srv->srv_clients) {        crc = ntohs(blob.hdr_crc);
                LOGERR;        blob.hdr_crc ^= blob.hdr_crc;
                close(srv->srv_server.cli_sock);        if (crc != crcFletcher16((u_short*) &blob, rlen / 2)) {
                free(srv);                rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
 
                 schedReadSelf(task);
                 return NULL;                  return NULL;
        } else        }
                memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t)); 
   
        rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);        /* check RPC packet session info */
        rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0);        if ((crc = rpc_chkPktSession(&blob.hdr_session, &s->srv_session))) {
        rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0);                rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
        rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0);                blob.hdr_cmd = error;
                 goto end;
         }
   
        pthread_mutex_init(&rpc_mtx, NULL);        /* Go to proceed packet ... */
        return srv;        switch (blob.hdr_cmd) {
                 case get:
                         if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
                                 rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
                                 blob.hdr_cmd = no;
                                 blob.hdr_ret = RPC_ERROR(-1);
                                 break;
                         } else
                                 blob.hdr_len = htonl(b->blob_len);
 
                         if (rpc_srv_blobMap(s, b) != -1) {
                                 /* deliver BLOB variable to client */
                                 blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
                                 rpc_srv_blobUnmap(b);
                         } else {
                                 blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;
                 case set:
                         if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len)))) {
                                 /* set new BLOB variable for reply :) */
                                 blob.hdr_var = htonl(b->blob_var);
 
                                 /* receive BLOB from client */
                                 blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
                                 rpc_srv_blobUnmap(b);
                         } else {
                                 blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;
                 case unset:
                         if (rpc_srv_unregisterBLOB(s, blob.hdr_var) == -1) {
                                 blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;
                 default:
                         rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
                         blob.hdr_cmd = error;
                         blob.hdr_ret = RPC_ERROR(-1);
         }
 
 end:
         memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
         schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
         schedReadSelf(task);
         return NULL;
 }  }
   
/*static void *
 * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resourcesacceptBLOBClients(sched_task_t *task)
 * @srv = RPC Server instance 
 * return: none 
 */ 
void 
rpc_srv_endServer(rpc_srv_t * __restrict srv) 
 {  {
        rpc_cli_t *c;        rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         register int i;          register int i;
        rpc_func_t *f;        socklen_t salen = sizeof(io_sockaddr_t);
   
        if (!srv) {        /* check free slots for connect */
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");        for (i = 0; i < io_arraySize(srv->srv_blob.clients) && 
                return;                        (c = io_array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
         if (c)  /* no more free slots! */
                 goto end;
         c = malloc(sizeof(rpc_cli_t));
         if (!c) {
                 LOGERR;
                 srv->srv_kill = srv->srv_blob.kill = 1;
                 return NULL;
         } else {
                 memset(c, 0, sizeof(rpc_cli_t));
                 io_arraySet(srv->srv_blob.clients, i, c);
                 c->cli_id = i;
                 c->cli_parent = srv;
         }          }
   
        pthread_mutex_destroy(&rpc_mtx);        /* alloc empty buffer */
         AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);
   
        while ((f = srv->srv_funcs)) {        /* accept client */
                srv->srv_funcs = f->func_next;        c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
                free(f);        if (c->cli_sock == -1) {
        }                LOGERR;
                 AIT_FREE_VAL(&c->cli_buf);
                 io_arrayDel(srv->srv_blob.clients, i, 42);
                 goto end;
         } else
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
   
        for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)        schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
                if (c->cli_sa.sa_family)end:
                        shutdown(c->cli_sock, SHUT_RDWR);        schedReadSelf(task);
        close(srv->srv_server.cli_sock);        return NULL;
 
        if (srv->srv_clients) { 
                free(srv->srv_clients); 
                srv->srv_numcli = 0; 
        } 
 
        free(srv); 
        srv = NULL; 
 }  }
   
   /* ------------------------------------------------------ */
   
 /*  /*
 * rpc_srv_execServer() Execute Main server loop and wait for clients requests * rpc_srv_initBLOBServer() - Init & create BLOB Server
 * @srv = RPC Server instance *
 * return: -1 error or 0 ok, infinite loop ... * @srv = RPC server instance
  * @Port = Port for bind server, if Port == 0 default port is selected
  * @diskDir = Disk place for BLOB file objects
  * return: -1 == error or 0 bind and created BLOB server instance
  */   */
 int  int
rpc_srv_execServer(rpc_srv_t * __restrict srv)rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
 {  {
        socklen_t salen = sizeof(struct sockaddr);        int n = 1;
        register int i; 
        rpc_cli_t *c; 
        fd_set fds; 
        int ret; 
        struct timeval tv = { DEF_RPC_TIMEOUT, 0 }; 
   
        if (!srv) {        if (!srv || srv->srv_kill) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
                 return -1;                  return -1;
         }          }
   
        if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {        memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
         if (access(diskDir, R_OK | W_OK) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
        }        } else
                 AIT_SET_STR(&srv->srv_blob.dir, diskDir);
   
        while (!rpc_Kill) {        /* init blob list */
                for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)        TAILQ_INIT(&srv->srv_blob.blobs);
                        if (!c->cli_sa.sa_family) 
                                break; 
                if (c && c->cli_sa.sa_family && c->cli_parent) { 
                        usleep(1000000); 
                        continue; 
                } 
   
                FD_ZERO(&fds);        srv->srv_blob.server.cli_parent = srv;
                FD_SET(srv->srv_server.cli_sock, &fds);
                ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(io_sockaddr_t));
                if (ret == -1) {        switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
                        LOGERR;                case AF_INET:
                        ret = 1;                        srv->srv_blob.server.cli_sa.sin.sin_port = 
                                 htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
                         break;                          break;
                }                case AF_INET6:
                if (!ret)                        srv->srv_blob.server.cli_sa.sin6.sin6_port = 
                        continue;                                htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
                         break;
                 case AF_LOCAL:
                         strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", 
                                         sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
                         break;
                 default:
                         AIT_FREE_VAL(&srv->srv_blob.dir);
                         return -1;
         }
   
                c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen);        /* create BLOB server socket */
                if (c->cli_sock == -1) {        srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
                        LOGERR;        if (srv->srv_blob.server.cli_sock == -1) {
                        continue;                LOGERR;
                } else                AIT_FREE_VAL(&srv->srv_blob.dir);
                        c->cli_parent = srv;                return -1;
         }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
         n = srv->srv_netbuf;
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                 LOGERR;
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
         if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, 
                                 srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
   
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {        /* allocate pool for concurent blob clients */
                        LOGERR;        srv->srv_blob.clients = io_arrayInit(io_arraySize(srv->srv_clients));
                        continue;        if (!srv->srv_blob.clients) {
                }                rpc_SetErr(io_GetErrno(), "%s", io_GetError());
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }          }
   
           /* init blob scheduler */
           srv->srv_blob.root = schedBegin();
           if (!srv->srv_blob.root) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   io_arrayDestroy(&srv->srv_blob.clients);
                   close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                   return -1;
           }
   
         return 0;          return 0;
 }  }
   
 // ---------------------------------------------------------  
   
 /*  /*
 * rpc_srv_freeValsCall() Free return variables for RPC call * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
 * @call = RPC function call *
  * @srv = RPC Server instance
  * return: none   * return: none
  */   */
inline voidvoid
rpc_srv_freeValsCall(rpc_func_t * __restrict call)rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
 {  {
        rpc_srv_declValsCall(call, 0);        rpc_cli_t *c;
}        register int i;
         rpc_blob_t *b, *tmp;
   
/*        if (!srv)
 * rpc_srv_declValsCall() Declare return variables for RPC call                return;
 * @call = RPC function call 
 * @return_vals = Number of return variables 
 * return: -1 error, !=-1 ok 
 */ 
inline int 
rpc_srv_declValsCall(rpc_func_t * __restrict call, int return_vals) 
{ 
        void *ptr; 
   
        if (!call || return_vals < 0) {        /* close all clients connections & server socket */
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t declare return variables for RPC call...\n");        for (i = 0; i < io_arraySize(srv->srv_blob.clients); i++) {
                return -1;                c = io_array(srv->srv_blob.clients, i, rpc_cli_t*);
        } else                if (c) {
                call->func_args = return_vals;                        shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);
   
        if (!return_vals) {                        schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
                if (call->func_vals) {                        AIT_FREE_VAL(&c->cli_buf);
                        free(call->func_vals); 
                        call->func_vals = NULL; 
                 }                  }
        } else {                io_arrayDel(srv->srv_blob.clients, i, 42);
                ptr = realloc(call->func_vals, return_vals * sizeof(rpc_val_t)); 
                if (!ptr) { 
                        LOGERR; 
                        return -1; 
                } else 
                        call->func_vals = ptr; 
         }          }
           io_arrayDestroy(&srv->srv_blob.clients);
   
        return call->func_args;        srv->srv_blob.kill = 1;
}        if (srv->srv_blob.tid)
                 pthread_cancel(srv->srv_blob.tid);
   
/*        close(srv->srv_blob.server.cli_sock);
 * rpc_srv_delValsCall() Clean values from return variables of RPC call
 * @call = RPC function call        /* detach blobs */
 * return: -1 error, !=-1 Returned number of cleaned RPC variables        TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
 */                TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
inline int
rpc_srv_delValsCall(rpc_func_t * __restrict call)                rpc_srv_blobFree(srv, b);
{                free(b);
        if (!call) { 
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t delete return variables ...\n"); 
                return -1; 
         }          }
   
        memset(call->func_vals, 0, call->func_args * sizeof(rpc_val_t));        schedEnd(&srv->srv_blob.root);
        return call->func_args;        AIT_FREE_VAL(&srv->srv_blob.dir);
 }  }
   
 /*  /*
 * rpc_srv_copyValsCall() Copy return variables for RPC call to new variable * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
 * @call = RPC function call *
 * @newvals = New allocated variables array, must be free after use * @srv = RPC Server instance
 * return: -1 error, !=-1 Returned number of copied RPC variables * return: -1 error or 0 ok, infinite loop ...
  */   */
inline intint
rpc_srv_copyValsCall(rpc_func_t * __restrict call, rpc_val_t ** __restrict newvals)rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
 {  {
        if (!call || !newvals) {        if (!srv || srv->srv_kill) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t copy return variables to new array\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
                 return -1;                  return -1;
         }          }
   
        *newvals = calloc(call->func_args, sizeof(rpc_val_t));        fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
        if (!*newvals) {                        fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
 
         if (listen(srv->srv_blob.server.cli_sock, io_arraySize(srv->srv_blob.clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
        } else        }
                memcpy(*newvals, call->func_vals, call->func_args * sizeof(rpc_val_t)); 
   
        return call->func_args;        if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
}                                srv->srv_blob.server.cli_sock, NULL, 0)) {
                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
/* 
 * rpc_srv_getValsCall() Get return variables for RPC call 
 * @call = RPC function call 
 * @vals = Returned variables, may be NULL 
 * return: -1 error, !=-1 Number of returned variables 
 */ 
inline int 
rpc_srv_getValsCall(rpc_func_t * __restrict call, rpc_val_t ** __restrict vals) 
{ 
        if (!call) { 
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t get return variables ...\n"); 
                 return -1;                  return -1;
         }          }
   
        if (vals)        /* main rpc loop */
                *vals = call->func_vals;        schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
        return call->func_args;        return 0;
 }  }
   
 // ---------------------------------------------------------  
   
 /*  /*
 * rpc_srv_registerCall() Register call to RPC server * rpc_srv_initServer() - Init & create RPC Server
 * @srv = RPC Server instance *
 * @csModule = Module name, if NULL self binary * @regProgID = ProgramID for authentication & recognition
 * @csFunc = Function name * @regProcID = ProcessID for authentication & recognition
 * @args = Number of function arguments * @concurentClients = Concurent clients at same time to this server
 * return: -1 error or 0 register ok * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
  * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected
  * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
intrpc_srv_t *
rpc_srv_registerCall(rpc_srv_t * __restrict srv, const char *csModule, const char *csFunc, u_char args)rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients, 
                 int netBuf, const char *csHost, u_short Port)
 {  {
        rpc_func_t *func;        int n = 1;
        u_char str[MAXPATHLEN + UCHAR_MAX + 1];        rpc_srv_t *srv = NULL;
         io_sockaddr_t sa;
   
        memset(str, 0, MAXPATHLEN + UCHAR_MAX + 1);        if (!concurentClients || !regProgID) {
        if (!srv || !csFunc) {                rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t register function to RPC server ...\n");                return NULL;
                return -1; 
         }          }
        if (!(func = malloc(sizeof(rpc_func_t)))) {        if (!io_gethostbyname(csHost, Port, &sa))
                 return NULL;
         if (!Port)
                 Port = RPC_DEFPORT;
         if (netBuf < RPC_MIN_BUFSIZ)
                 netBuf = BUFSIZ;
         else
                 netBuf = io_align(netBuf, 1);   /* align netBuf length */
 
 #ifdef HAVE_SRANDOMDEV
         srandomdev();
 #else
         time_t tim;
 
         srandom((time(&tim) ^ getpid()));
 #endif
 
         srv = malloc(sizeof(rpc_srv_t));
         if (!srv) {
                 LOGERR;                  LOGERR;
                return -1;                return NULL;
        } else {        } else
                memset(func, 0, sizeof(rpc_func_t));                memset(srv, 0, sizeof(rpc_srv_t));
                strlcpy((char*) func->func_name, csFunc, UCHAR_MAX + 1);
         srv->srv_netbuf = netBuf;
         srv->srv_session.sess_version = RPC_VERSION;
         srv->srv_session.sess_program = regProgID;
         srv->srv_session.sess_process = regProcID;
 
         srv->srv_server.cli_parent = srv;
         memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
 
         /* init functions list */
         TAILQ_INIT(&srv->srv_funcs);
 
         /* init scheduler */
         srv->srv_root = schedBegin();
         if (!srv->srv_root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 free(srv);
                 return NULL;
         }          }
        if (csModule) {
                strlcpy((char*) func->func_file, csModule, MAXPATHLEN);        /* init pool for clients */
                strlcpy((char*) str, csModule, MAXPATHLEN + UCHAR_MAX + 1);        srv->srv_clients = io_arrayInit(concurentClients);
         if (!srv->srv_clients) {
                 rpc_SetErr(io_GetErrno(), "%s", io_GetError());
                 schedEnd(&srv->srv_root);
                 free(srv);
                 return NULL;
         }          }
         strlcat((char*) str, "__", MAXPATHLEN + UCHAR_MAX + 1);  
         strlcat((char*) str, csFunc, MAXPATHLEN + UCHAR_MAX + 1);  
   
        func->func_tag = crcFletcher16((u_short*) str, (MAXPATHLEN + UCHAR_MAX + 1) / 2);        /* create server socket */
        func->func_hash = hash_fnv((char*) str, MAXPATHLEN + UCHAR_MAX + 1);        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
        if (srv->srv_server.cli_sock == -1) {
        if (rpc_srv_declValsCall(func, args) == -1) {                LOGERR;
                free(func);                io_arrayDestroy(&srv->srv_clients);
                return -1;                schedEnd(&srv->srv_root);
                 free(srv);
                 return NULL;
         }          }
           if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                   LOGERR;
                   goto err;
           }
           n = srv->srv_netbuf;
           if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                   LOGERR;
                   goto err;
           }
           if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                   LOGERR;
                   goto err;
           }
           if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
                                   srv->srv_server.cli_sa.sa.sa_len) == -1) {
                   LOGERR;
                   goto err;
           }
   
        func->func_next = srv->srv_funcs;        rpc_register_srvPing(srv);
        srv->srv_funcs = func;
        return 0;        return srv;
 err:    /* error condition */
         close(srv->srv_server.cli_sock);
         io_arrayDestroy(&srv->srv_clients);
         schedEnd(&srv->srv_root);
         free(srv);
         return NULL;
 }  }
   
 /*  /*
 * rpc_srv_unregisterCall() Unregister call from RPC server * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
 * @srv = RPC Server instance *
 * @csModule = Module name, if NULL self binary * @psrv = RPC Server instance
 * @csFunc = Function name * return: none
 * return: -1 error, 0 not found call, 1 unregister ok 
  */   */
intvoid
rpc_srv_unregisterCall(rpc_srv_t * __restrict srv, const char *csModule, const char *csFunc)rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
 {  {
        rpc_func_t func, *f, *curr;        rpc_cli_t *c;
        u_char str[MAXPATHLEN + UCHAR_MAX + 1];        register int i;
         rpc_func_t *f, *tmp;
   
        memset(&func, 0, sizeof(rpc_func_t));        if (!psrv || !*psrv)
        memset(str, 0, MAXPATHLEN + UCHAR_MAX + 1);                return;
        if (!srv || !csFunc) { 
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t unregister function from RPC server ...\n"); 
                return -1; 
        } else 
                strlcpy((char*) func.func_name, csFunc, UCHAR_MAX + 1); 
        if (csModule) { 
                strlcpy((char*) func.func_file, csModule, MAXPATHLEN); 
                strlcpy((char*) str, csModule, MAXPATHLEN + UCHAR_MAX + 1); 
        } 
        strlcat((char*) str, "__", MAXPATHLEN + UCHAR_MAX + 1); 
        strlcat((char*) str, csFunc, MAXPATHLEN + UCHAR_MAX + 1); 
   
        func.func_tag = crcFletcher16((u_short*) str, (MAXPATHLEN + UCHAR_MAX + 1) / 2);        if (!(*psrv)->srv_blob.kill)
        func.func_hash = hash_fnv((char*) str, MAXPATHLEN + UCHAR_MAX + 1);                rpc_srv_endBLOBServer(*psrv);
   
        f = rpc_srv_getCall(srv, func.func_tag, func.func_hash);        /* close all clients connections & server socket */
        if (!f)             // not found element for unregister        for (i = 0; i < io_arraySize((*psrv)->srv_clients); i++) {
                return 0;                c = io_array((*psrv)->srv_clients, i, rpc_cli_t*);
                 if (c) {
                         shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);
   
        if (srv->srv_funcs == f) {   // if is 1st element                        schedCancelby((*psrv)->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
                srv->srv_funcs = srv->srv_funcs->func_next;                        AIT_FREE_VAL(&c->cli_buf);
                 }
                 io_arrayDel((*psrv)->srv_clients, i, 42);
         }
         io_arrayDestroy(&(*psrv)->srv_clients);
   
                if (f->func_args && f->func_vals)        close((*psrv)->srv_server.cli_sock);
                        free(f->func_vals); 
                free(f); 
        } else { 
                for (curr = srv->srv_funcs; curr->func_next != f; curr = curr->func_next); 
                curr->func_next = curr->func_next->func_next; 
   
                if (f->func_args && f->func_vals)        /* detach exported calls */
                        free(f->func_vals);        TAILQ_FOREACH_SAFE(f, &(*psrv)->srv_funcs, func_node, tmp) {
                 TAILQ_REMOVE(&(*psrv)->srv_funcs, f, func_node);
 
                 io_freeVars(&f->func_vars);
                 AIT_FREE_VAL(&f->func_name);
                 free(f);                  free(f);
         }          }
   
        return 1;        schedEnd(&(*psrv)->srv_root);
         free(*psrv);
         *psrv = NULL;
 }  }
   
 /*  /*
 * rpc_srv_getCall() Get registered call from RPC server * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
 * @tag = tag for function * return: -1 error or 0 ok, infinite loop ...
 * @hash = hash for function 
 * return: NULL not found call, !=NULL return call 
  */   */
inline rpc_func_t *int
rpc_srv_getCall(rpc_srv_t * __restrict srv, uint16_t tag, uint32_t hash)rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
         rpc_func_t *f;  
   
         if (!srv) {          if (!srv) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t get function from RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
                return NULL;                return -1;
         }          }
   
        for (f = srv->srv_funcs; f; f = f->func_next)        fcntl(srv->srv_server.cli_sock, F_SETFL, 
                if (f->func_tag == tag && f->func_hash == hash)                        fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
                        break; 
   
        return f;        if (listen(srv->srv_server.cli_sock, io_arraySize(srv->srv_clients)) == -1) {
}                LOGERR;
                 return -1;
         }
   
/*        if (!schedRead(srv->srv_root, acceptClients, srv, srv->srv_server.cli_sock, NULL, 0)) {
 * rpc_srv_getFunc() Get registered call from RPC server by Name                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
 * @srv = RPC Server instance                return -1;
 * @csModule = Module name, if NULL self binary 
 * @csFunc = Function name 
 * return: NULL not found call, !=NULL return call 
 */ 
rpc_func_t * 
rpc_srv_getFunc(rpc_srv_t * __restrict srv, const char *csModule, const char *csFunc) 
{ 
        rpc_func_t func; 
        u_char str[MAXPATHLEN + UCHAR_MAX + 1]; 
 
        memset(&func, 0, sizeof(rpc_func_t)); 
        memset(str, 0, MAXPATHLEN + UCHAR_MAX + 1); 
        if (!srv || !csFunc) { 
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t get function from RPC server ...\n"); 
                return NULL; 
        } else 
                strlcpy((char*) func.func_name, csFunc, UCHAR_MAX + 1); 
        if (csModule) { 
                strlcpy((char*) func.func_file, csModule, MAXPATHLEN); 
                strlcpy((char*) str, csModule, MAXPATHLEN + UCHAR_MAX + 1); 
         }          }
         strlcat((char*) str, "__", MAXPATHLEN + UCHAR_MAX + 1);  
         strlcat((char*) str, csFunc, MAXPATHLEN + UCHAR_MAX + 1);  
   
        func.func_tag = crcFletcher16((u_short*) str, (MAXPATHLEN + UCHAR_MAX + 1) / 2);        /* main rpc loop */
        func.func_hash = hash_fnv((char*) str, MAXPATHLEN + UCHAR_MAX + 1);        schedRun(srv->srv_root, &srv->srv_kill);
        return 0;
        return rpc_srv_getCall(srv, func.func_tag, func.func_hash); 
 }  }
   
 // ---------------------------------------------------------  
   
 /*  /*
  * rpc_srv_execCall() Execute registered call from RPC server   * rpc_srv_execCall() Execute registered call from RPC server
 * @data = RPC const data *
  * @call = Register RPC call   * @call = Register RPC call
  * @rpc = IN RPC call structure   * @rpc = IN RPC call structure
 * @args = IN RPC call array of rpc values * @args = IN RPC calling arguments from RPC client
  * return: -1 error, !=-1 ok   * return: -1 error, !=-1 ok
  */   */
 int  int
rpc_srv_execCall(void * const data, rpc_func_t * __restrict call, rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, 
                struct tagRPCCall * __restrict rpc, rpc_val_t * __restrict args)                array_t * __restrict args)
 {  {
         void *dl;  
         rpc_callback_t func;          rpc_callback_t func;
         int ret;  
   
        if (!data || !call || !rpc) {        if (!call || !rpc || !call->func_parent || !AIT_ADDR(&call->func_name)) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
                 return -1;                  return -1;
         }          }
   
        dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);        func = AIT_GET_LIKE(&call->func_name, rpc_callback_t);
        if (!dl) {        return func(call, rpc, args);
                rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror()); 
                return -1; 
        } 
 
        func = dlsym(dl, (char*) call->func_name); 
        if (func) 
                ret = func(data, call, rpc->call_argc, args); 
        else { 
                rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror()); 
                ret = -1; 
        } 
 
        dlclose(dl); 
        return ret; 
 }  }

Removed from v.1.1.1.1.2.2  
changed lines
  Added in v.1.9.2.25


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>