Diff for /libaitrpc/src/srv.c between versions 1.1.1.1.2.2 and 1.12.2.4

version 1.1.1.1.2.2, 2010/06/18 15:47:34 version 1.12.2.4, 2012/11/16 13:30:05
Line 5 Line 5
 * $Author$  * $Author$
 * $Id$  * $Id$
 *  *
*************************************************************************/**************************************************************************
#include "global.h"The ELWIX and AITNET software is distributed under the following
 terms:
   
   All of the documentation and software included in the ELWIX and AITNET
   Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
static void *Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
rpc_srv_dispatchCall(void *arg)        by Michael Pounov <misho@elwix.org>.  All rights reserved.
{ 
        rpc_cli_t *c = arg; 
        rpc_srv_t *s; 
        rpc_val_t *vals, *v = NULL; 
        rpc_func_t *f; 
        struct tagRPCCall *rpc; 
        struct tagRPCRet rrpc; 
        fd_set fds; 
        u_char buf[BUFSIZ], *data; 
        int ret, argc, Limit = 0; 
        register int i; 
   
        if (!arg) {Redistribution and use in source and binary forms, with or without
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");modification, are permitted provided that the following conditions
                return NULL;are met:
        } else1. Redistributions of source code must retain the above copyright
                s = c->cli_parent;   notice, this list of conditions and the following disclaimer.
 2. Redistributions in binary form must reproduce the above copyright
    notice, this list of conditions and the following disclaimer in the
    documentation and/or other materials provided with the distribution.
 3. All advertising materials mentioning features or use of this software
    must display the following acknowledgement:
 This product includes software developed by Michael Pounov <misho@elwix.org>
 ELWIX - Embedded LightWeight unIX and its contributors.
 4. Neither the name of AITNET nor the names of its contributors
    may be used to endorse or promote products derived from this software
    without specific prior written permission.
   
        do {THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
                FD_ZERO(&fds);ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
                FD_SET(c->cli_sock, &fds);IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
                if (ret == -1) {FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
                        ret = -2;DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
                }OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
                memset(&rrpc, 0, sizeof rrpc);HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
                memset(buf, 0, BUFSIZ);LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
                if ((ret = recv(c->cli_sock, buf, BUFSIZ, 0)) == -1) {OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
                        LOGERR;SUCH DAMAGE.
                        ret = -3;*/
                        break;#include "global.h"
                } 
                if (!ret) {             // receive EOF 
                        ret = 0; 
                        break; 
                } 
                if (ret < sizeof(struct tagRPCCall)) { 
                        rpc_SetErr(EMSGSIZE, "Error:: too short RPC packet ...\n"); 
                        ret = -4; 
                        break; 
                } else 
                        rpc = (struct tagRPCCall*) buf; 
                // check RPC packet session info 
                if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) { 
                        rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n"); 
                        ret = -5; 
                        break; 
                } 
                // RPC is OK! Go decapsulate variables ... 
                if (rpc->call_argc) { 
                        v = (rpc_val_t*) (buf + sizeof(struct tagRPCCall)); 
                        // RPC received variables types OK! 
                        data = (u_char*) v + rpc->call_argc * sizeof(rpc_val_t); 
                        for (i = 0; i < rpc->call_argc; i++) { 
                                switch (v[i].val_type) { 
                                        case buffer: 
                                                v[i].val.buffer = data; 
                                                data += v[i].val_len; 
                                                break; 
                                        case string: 
                                                v[i].val.string = (int8_t*) data; 
                                                data += v[i].val_len + 1; 
                                                break; 
                                        case array: 
                                                v[i].val.array = (int8_t**) data; 
                                                data += v[i].val_len; 
                                                break; 
                                        default: 
                                                break; 
                                } 
                        } 
                } 
   
                 argc = 0;  
                 vals = NULL;  
                 if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) {  
                         rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n");  
                         ret = -6;  
                 } else  
                         if ((ret = rpc_srv_execCall(s, f, rpc, v)) == -1)  
                                 ret = -6;  
                         else  
                                 argc = rpc_srv_getValsCall(f, &vals);  
   
                memcpy(&rrpc.ret_session, &rpc->call_session, sizeof rrpc.ret_session);/* SOCK_STREAM */
                rrpc.ret_tag = rpc->call_tag;static void *acceptClients(sched_task_t *);
                rrpc.ret_hash = rpc->call_hash;static void *closeClient(sched_task_t *);
                rrpc.ret_errno = rpc_Errno;static void *rxPacket(sched_task_t *);
                rrpc.ret_retcode = ret;static void *txPacket(sched_task_t *);
                rrpc.ret_argc = argc; 
   
                memset(buf, 0, BUFSIZ);/* SOCK_DGRAM */
                memcpy(buf, &rrpc, (Limit = sizeof rrpc));static void *connectClients(sched_task_t *);
                if (argc && vals) {static void *disconnectClient(sched_task_t *);
                        v = (rpc_val_t*) (buf + sizeof rrpc); 
                        memcpy(v, vals, argc * sizeof(rpc_val_t)); 
                        Limit += argc * sizeof(rpc_val_t); 
                        data = (u_char*) v + argc * sizeof(rpc_val_t); 
                        for (ret = i = 0; i < argc; i++) { 
                                switch (vals[i].val_type) { 
                                        case buffer: 
                                                if (ret || Limit + vals[i].val_len > BUFSIZ) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc.ret_retcode = ret = -7; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.buffer, vals[i].val_len);/* SOCK_RAW */
                                                data += vals[i].val_len; 
                                                Limit += vals[i].val_len; 
                                                break; 
                                        case string: 
                                                if (ret || Limit + vals[i].val_len + 1 > BUFSIZ) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc.ret_retcode = ret = -7; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.string, vals[i].val_len + 1);static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {
                                                data += vals[i].val_len + 1;        { acceptClients, closeClient, rxPacket, txPacket },      /* SOCK_STREAM */
                                                Limit += vals[i].val_len + 1;        { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
                                                break;        { connectClients, disconnectClient, NULL, NULL }, 
                                        case array:        { NULL, NULL, NULL, NULL }                              /* SOCK_RAW */
                                                if (ret || Limit + vals[i].val_len > BUFSIZ) {};
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc.ret_retcode = ret = -7; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                 memcpy(data, vals[i].val.array, vals[i].val_len);  
                                                 data += vals[i].val_len;  
                                                 Limit += vals[i].val_len;  
                                                 break;  
                                         default:  
                                                 break;  
                                 }  
   
                                RPC_FREE_VAL(&vals[i]);static inline int
                        }_check4freeslot(rpc_srv_t * __restrict srv, io_sockaddr_t * __restrict sa)
                }{
         rpc_cli_t *c = NULL;
         register int i;
   
                if ((ret = send(c->cli_sock, buf, Limit, 0)) == -1) {        /* check free slots for connect */
                        LOGERR;        for (i = 0; i < io_arraySize(srv->srv_clients) && 
                        ret = -8;                        (c = io_array(srv->srv_clients, i, rpc_cli_t*)); i++)
                 /* check for duplicates */
                 if (sa && !io_addrcmp(&c->cli_sa, sa, 42))
                         break;                          break;
                }        if (i >= io_arraySize(srv->srv_clients))
                if (ret != Limit) {                return -1;      /* no more free slots! */
                        rpc_SetErr(EBADMSG, "Error:: in send RPC request, should be send %d bytes, " 
                                        "really is %d\n", Limit, ret); 
                        ret = -9; 
                        break; 
                } 
        } while (ret > -1); 
   
        shutdown(c->cli_sock, SHUT_RDWR);        return i;
        close(c->cli_sock); 
        memset(c, 0, sizeof(rpc_cli_t)); 
        return (void*) ret; 
 }  }
   
// -------------------------------------------------static rpc_cli_t *
_allocClient(rpc_srv_t * __restrict srv, io_sockaddr_t * __restrict sa)
/* 
 * rpc_srv_initServer() Init & create RPC Server 
 * @regProgID = ProgramID for authentication & recognition 
 * @regProcID = ProcessID for authentication & recognition 
 * @concurentClients = Concurent clients at same time to this server 
 * @family = Family socket type, AF_INET or AF_INET6 
 * @csHost = Host name or IP address for bind server, if NULL any address 
 * @Port = Port for bind server, if Port == 0 default port is selected 
 * return: NULL == error or !=NULL bind and created RPC server instance 
 */ 
rpc_srv_t * 
rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,  
                u_short family, const char *csHost, u_short Port) 
 {  {
        rpc_srv_t *srv = NULL;        rpc_cli_t *c = NULL;
        int n = 1;        int n;
        struct hostent *host = NULL; 
        struct sockaddr_in sin; 
        struct sockaddr_in6 sin6; 
   
        if (!concurentClients || !regProgID || (family != AF_INET && family != AF_INET6)) {        n = _check4freeslot(srv, sa);
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");        if (n == -1)
                 return NULL;                  return NULL;
        }        else
        if (!Port)                c = io_array(srv->srv_clients, n, rpc_cli_t*);
                Port = RPC_DEFPORT;
        if (csHost) {        if (!c) {
                host = gethostbyname2(csHost, family);                c = io_malloc(sizeof(rpc_cli_t));
                if (!host) {                if (!c) {
                        rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));                        LOGERR;
                         srv->srv_kill = 1;
                         return NULL;                          return NULL;
                   } else {
                           memset(c, 0, sizeof(rpc_cli_t));
                           io_arraySet(srv->srv_clients, n, c);
                           c->cli_id = n;
                           c->cli_parent = srv;
                 }                  }
   
                   /* alloc empty buffer */
                   AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);
         }          }
         switch (family) {  
                 case AF_INET:  
                         memset(&sin, 0, sizeof sin);  
                         sin.sin_len = sizeof sin;  
                         sin.sin_family = family;  
                         sin.sin_port = htons(Port);  
                         if (csHost)  
                                 memcpy(&sin.sin_addr, host->h_addr, host->h_length);  
                         break;  
                 case AF_INET6:  
                         memset(&sin6, 0, sizeof sin6);  
                         sin6.sin6_len = sizeof sin6;  
                         sin6.sin6_family = family;  
                         sin6.sin6_port = htons(Port);  
                         if (csHost)  
                                 memcpy(&sin6.sin6_addr, host->h_addr, host->h_length);  
                         break;  
                 default:  
                         rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");  
                         return NULL;  
         }  
   
        srv = malloc(sizeof(rpc_srv_t));        return c;
        if (!srv) {}
                LOGERR;
 
 static void *
 connectClients(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         socklen_t salen = sizeof(io_sockaddr_t);
         int len, rlen;
         u_short crc;
         u_char buf[USHRT_MAX + 1];
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         io_sockaddr_t sa;
 
 #if 0
         c = _allocClient(srv);
         if (!c) {
                 schedReadSelf(task);
                 return NULL;                  return NULL;
         } else          } else
                memset(srv, 0, sizeof(rpc_srv_t));                buf = AIT_GET_BUF(&c->cli_buf);
 #endif
   
        srv->srv_numcli = concurentClients;        /* receive connect packet */
        srv->srv_session.sess_version = RPC_VERSION;        rlen = recvfrom(TASK_FD(task), buf, sizeof buf, 0, &sa.sa, &salen);
        srv->srv_session.sess_program = regProgID;        if (rlen < 1)
        srv->srv_session.sess_process = regProcID;                goto end;
   
        srv->srv_server.cli_tid = pthread_self();        /* check RPC packet */
        srv->srv_server.cli_parent = srv;        if (rlen < sizeof(struct tagRPCCall)) {
        if (family == AF_INET)                rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                memcpy(&srv->srv_server.cli_sa, &sin, sizeof srv->srv_server.cli_sa);                goto end;
        else        } else {
                memcpy(&srv->srv_server.cli_sa, &sin6, sizeof srv->srv_server.cli_sa);                len = ntohs(rpc->call_len);
        srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);                rlen -= len;
        if (srv->srv_server.cli_sock == -1) { 
                LOGERR; 
                free(srv); 
                return NULL; 
         }          }
        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {        /* check RPC packet lengths */
                LOGERR;        if (rlen < 0 || len < sizeof(struct tagRPCCall)) {
                close(srv->srv_server.cli_sock);                rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length");
                free(srv);                goto end;
                return NULL; 
         }          }
        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {
                LOGERR;        /* check integrity of packet */
                close(srv->srv_server.cli_sock);        crc = ntohs(rpc->call_crc);
                free(srv);        rpc->call_crc ^= rpc->call_crc;
                return NULL;        if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                 goto end;
         }          }
   
        srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));        /* check RPC packet session info */
        if (!srv->srv_clients) {        if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                LOGERR;                rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                close(srv->srv_server.cli_sock);                goto end;
                free(srv);        }
                return NULL; 
        } else 
                memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t)); 
   
        rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);        /* RPC packet is OK */
        rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0); 
        rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0); 
        rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0); 
   
        pthread_mutex_init(&rpc_mtx, NULL);        c = _allocClient(srv, &sa);
        return srv;        if (!c)
                 goto end;
 
         if (0) {
                 schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c, 
                                 c->cli_sock, NULL, 0);
                 schedReadSelf(task);
                 return NULL;
         }
 
 end:
 #if 0
         /* close connection & dont send disconnect */
         schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                         c, 42, NULL, 42);
 #endif
         schedReadSelf(task);
         return NULL;
 }  }
   
/*static void *
 * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resourcesdisconnectClient(sched_task_t *task)
 * @srv = RPC Server instance 
 * return: none 
 */ 
void 
rpc_srv_endServer(rpc_srv_t * __restrict srv) 
 {  {
        rpc_cli_t *c;        rpc_cli_t *c = TASK_ARG(task);
        register int i;        rpc_srv_t *s = c->cli_parent;
        rpc_func_t *f; 
   
        if (!srv) {        schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
                return;        if (!TASK_DATLEN(task)) {
                 /* TODO: send disconnect packet */
         }          }
   
        pthread_mutex_destroy(&rpc_mtx);        /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);
   
        while ((f = srv->srv_funcs)) {        io_arrayDel(s->srv_clients, c->cli_id, 0);
                srv->srv_funcs = f->func_next;        if (c)
                free(f);                io_free(c);
         return NULL;
 }
 
 static void *
 closeClient(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
 
         schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);
 
         /* close client socket */
         if (TASK_VAL(task))
                 shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);
 
         /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);
 
         io_arrayDel(s->srv_clients, c->cli_id, 0);
         if (c)
                 io_free(c);
         return NULL;
 }
 
 static void *
 txPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char buf[USHRT_MAX] = { 0 };
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, wlen = sizeof(struct tagRPCCall);
 
         /* copy RPC header */
         memcpy(buf, TASK_DATA(task), wlen);
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         rpc->call_argc = htons(io_arraySize(RPC_RETVARS(c)));
                         /* Go Encapsulate variables */
                         ret = io_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c));
                         /* Free return values */
                         io_freeVars(&c->cli_vars);
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }
         }          }
   
        for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)        rpc->call_len = htons(wlen);
                if (c->cli_sa.sa_family) 
                        shutdown(c->cli_sock, SHUT_RDWR); 
        close(srv->srv_server.cli_sock); 
   
        if (srv->srv_clients) {        /* calculate CRC */
                free(srv->srv_clients);        rpc->call_crc ^= rpc->call_crc;
                srv->srv_numcli = 0;        rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
         if (ret == -1 || ret != wlen) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 c, 42, NULL, 0);
         }          }
   
        free(srv);        return NULL;
        srv = NULL; 
 }  }
   
/*static void *
 * rpc_srv_execServer() Execute Main server loop and wait for clients requestsexecCall(sched_task_t *task)
 * @srv = RPC Server instance 
 * return: -1 error or 0 ok, infinite loop ... 
 */ 
int 
rpc_srv_execServer(rpc_srv_t * __restrict srv) 
 {  {
        socklen_t salen = sizeof(struct sockaddr);        rpc_cli_t *c = TASK_ARG(task);
        register int i;        rpc_srv_t *s = c->cli_parent;
        rpc_cli_t *c;        rpc_func_t *f = NULL;
        fd_set fds;        array_t *arr = NULL;
        int ret;        u_char *buf = AIT_GET_BUF(&c->cli_buf) + TASK_VAL(task);
        struct timeval tv = { DEF_RPC_TIMEOUT, 0 };        struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int argc = ntohs(rpc->call_argc);
   
        if (!srv) {        /* Go decapsulate variables ... */
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");        if (argc) {
                return -1;                arr = io_buffer2vars(buf + sizeof(struct tagRPCCall), 
                                 AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall), 
                                 argc, 42);
                 if (!arr) {
                         rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         return NULL;
                 }
         } else
                 arr = NULL;
 
         if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
                 rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                 rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
         } else {
                 /* if client doesn't want reply */
                 argc = RPC_CHK_NOREPLY(rpc);
                 rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
                 if (rpc->call_rep.ret == htonl(-1)) {
                         rpc->call_rep.eno = RPC_ERROR(errno);
                         rpc->call_argc ^= rpc->call_argc;
                 } else {
                         rpc->call_rep.eno ^= rpc->call_rep.eno;
                         if (argc) {
                                 /* without reply */
                                 io_freeVars(&c->cli_vars);
                                 rpc->call_argc ^= rpc->call_argc;
                         } else {
                                 /* reply */
                                 rpc->call_argc = htons(io_arraySize(RPC_RETVARS(c)));
                         }
                 }
         }          }
   
        if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {        io_arrayDestroy(&arr);
                LOGERR;        return NULL;
                return -1;}
 
 static void *
 rxPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         int len, rlen, noreply;
         u_short crc, off = TASK_DATLEN(task);
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc;
 
         if (!off)
                 memset(buf, 0, AIT_LEN(&c->cli_buf));
         rlen = recv(TASK_FD(task), buf + off, AIT_LEN(&c->cli_buf) - off, 0);
         if (rlen < 1) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 c, 42, NULL, 0);
                 return NULL;
         } else {
                 rlen += off;    /* add reminded bytes from previous rxPacket, if exists! */
                 off = 0;        /* process buffer from start offset == 0 */
         }          }
   
        while (!rpc_Kill) {        do {
                for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)                /* check RPC packet */
                        if (!c->cli_sa.sa_family)                if (rlen < sizeof(struct tagRPCCall)) {
                                break;                        rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                if (c && c->cli_sa.sa_family && c->cli_parent) { 
                        usleep(1000000); 
                        continue; 
                } 
   
                FD_ZERO(&fds);                        /* reminder received previous bytes ;) */
                FD_SET(srv->srv_server.cli_sock, &fds);                        schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), 
                ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);                                        TASK_FD(task), TASK_DATA(task), rlen);
                if (ret == -1) {                        return NULL;
                        LOGERR;                } else
                        ret = 1;                        rpc = (struct tagRPCCall*) (buf + off);
 
                 len = ntohs(rpc->call_len);
                 rlen -= len;
 
                 /* check RPC packet lengths */
                 if (rlen < 0 || len < sizeof(struct tagRPCCall)) {
                         rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length");
                         /* skip entire packet */
                         break;                          break;
                 }                  }
                 if (!ret)  
                         continue;  
   
                c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen);                /* check integrity of packet */
                if (c->cli_sock == -1) {                crc = ntohs(rpc->call_crc);
                        LOGERR;                rpc->call_crc ^= rpc->call_crc;
                        continue;                if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                } else                        rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                        c->cli_parent = srv; 
   
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {                        off += len;
                        LOGERR;                        /* try next packet remaining into buffer */
                         continue;                          continue;
                 }                  }
         }  
   
        return 0;                noreply = RPC_CHK_NOREPLY(rpc);
 
                 /* check RPC packet session info */
                 if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                         rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(errno);
                 } else {
                         /* execute RPC call */
                         schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off, NULL, 0);
                 }
 
                 /* send RPC reply */
                 if (!noreply)
                         schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], 
                                         TASK_ARG(task), TASK_FD(task), rpc, len);
 
                 off += len;
         } while (rlen > 0);
 
         /* lets get next packet */
         schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), TASK_FD(task), TASK_DATA(task), 0);
         return NULL;
 }  }
   
// ---------------------------------------------------------static void *
 acceptClients(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         socklen_t salen = sizeof(io_sockaddr_t);
   
/*        c = _allocClient(srv, NULL);
 * rpc_srv_freeValsCall() Free return variables for RPC call        if (!c)
 * @call = RPC function call                goto end;
 * return: none
 */        /* accept client */
inline void        c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
rpc_srv_freeValsCall(rpc_func_t * __restrict call)        if (c->cli_sock == -1) {
                 LOGERR;
                 AIT_FREE_VAL(&c->cli_buf);
                 io_arrayDel(srv->srv_clients, c->cli_id, 42);
                 goto end;
         } else
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
 
         schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c, 
                         c->cli_sock, NULL, 0);
 end:
         schedReadSelf(task);
         return NULL;
 }
 
 /* ------------------------------------------------------ */
 
 static void *
 closeBLOBClient(sched_task_t *task)
 {  {
        rpc_srv_declValsCall(call, 0);        rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
 
         schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);
 
         /* close client socket */
         if (TASK_VAL(task))
                 shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);
 
         /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);
 
         io_arrayDel(s->srv_blob.clients, c->cli_id, 0);
         if (c)
                 io_free(c);
         return NULL;
 }  }
   
/*static void *
 * rpc_srv_declValsCall() Declare return variables for RPC calltxBLOB(sched_task_t *task)
 * @call = RPC function call 
 * @return_vals = Number of return variables 
 * return: -1 error, !=-1 ok 
 */ 
inline int 
rpc_srv_declValsCall(rpc_func_t * __restrict call, int return_vals) 
 {  {
        void *ptr;        rpc_cli_t *c = TASK_ARG(task);
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
         int wlen = sizeof(struct tagBLOBHdr);
   
        if (!call || return_vals < 0) {        /* calculate CRC */
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t declare return variables for RPC call...\n");        blob->hdr_crc ^= blob->hdr_crc;
                return -1;        blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
        } else 
                call->func_args = return_vals; 
   
        if (!return_vals) {        /* send reply */
                if (call->func_vals) {        wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
                        free(call->func_vals);        if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
                        call->func_vals = NULL;                /* close blob connection */
                }                schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
        } else { 
                ptr = realloc(call->func_vals, return_vals * sizeof(rpc_val_t)); 
                if (!ptr) { 
                        LOGERR; 
                        return -1; 
                } else 
                        call->func_vals = ptr; 
         }          }
   
        return call->func_args;        return NULL;
 }  }
   
/*static void *
 * rpc_srv_delValsCall() Clean values from return variables of RPC callrxBLOB(sched_task_t *task)
 * @call = RPC function call 
 * return: -1 error, !=-1 Returned number of cleaned RPC variables 
 */ 
inline int 
rpc_srv_delValsCall(rpc_func_t * __restrict call) 
 {  {
        if (!call) {        rpc_cli_t *c = TASK_ARG(task);
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t delete return variables ...\n");        rpc_srv_t *s = c->cli_parent;
                return -1;        rpc_blob_t *b;
         struct tagBLOBHdr blob;
         int rlen;
         u_short crc;
 
         memset(&blob, 0, sizeof blob);
         rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
         if (rlen < 1) {
                 /* close blob connection */
                 schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
                 return NULL;
         }          }
   
        memset(call->func_vals, 0, call->func_args * sizeof(rpc_val_t));        /* check BLOB packet */
        return call->func_args;        if (rlen < sizeof(struct tagBLOBHdr)) {
                 rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
 
                 schedReadSelf(task);
                 return NULL;
         }
 
         /* check integrity of packet */
         crc = ntohs(blob.hdr_crc);
         blob.hdr_crc ^= blob.hdr_crc;
         if (crc != crcFletcher16((u_short*) &blob, rlen / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
 
                 schedReadSelf(task);
                 return NULL;
         }
 
         /* check RPC packet session info */
         if ((crc = rpc_chkPktSession(&blob.hdr_session, &s->srv_session))) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                 blob.hdr_cmd = error;
                 goto end;
         }
 
         /* Go to proceed packet ... */
         switch (blob.hdr_cmd) {
                 case get:
                         if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
                                 rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
                                 blob.hdr_cmd = no;
                                 blob.hdr_ret = RPC_ERROR(-1);
                                 break;
                         } else
                                 blob.hdr_len = htonl(b->blob_len);
 
                         if (rpc_srv_blobMap(s, b) != -1) {
                                 /* deliver BLOB variable to client */
                                 blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
                                 rpc_srv_blobUnmap(b);
                         } else {
                                 blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;
                 case set:
                         if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len)))) {
                                 /* set new BLOB variable for reply :) */
                                 blob.hdr_var = htonl(b->blob_var);
 
                                 /* receive BLOB from client */
                                 blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
                                 rpc_srv_blobUnmap(b);
                         } else {
                                 blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;
                 case unset:
                         if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
                                 blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;
                 default:
                         rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
                         blob.hdr_cmd = error;
                         blob.hdr_ret = RPC_ERROR(-1);
         }
 
 end:
         memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
         schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
         schedReadSelf(task);
         return NULL;
 }  }
   
   static void *
   acceptBLOBClients(sched_task_t *task)
   {
           rpc_srv_t *srv = TASK_ARG(task);
           rpc_cli_t *c = NULL;
           register int i;
           socklen_t salen = sizeof(io_sockaddr_t);
   #ifdef TCP_NOPUSH
           int n = 1;
   #endif
   
           /* check free slots for connect */
           for (i = 0; i < io_arraySize(srv->srv_blob.clients) && 
                           (c = io_array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
           if (c)  /* no more free slots! */
                   goto end;
           c = io_malloc(sizeof(rpc_cli_t));
           if (!c) {
                   LOGERR;
                   srv->srv_kill = srv->srv_blob.kill = 1;
                   return NULL;
           } else {
                   memset(c, 0, sizeof(rpc_cli_t));
                   io_arraySet(srv->srv_blob.clients, i, c);
                   c->cli_id = i;
                   c->cli_parent = srv;
           }
   
           /* alloc empty buffer */
           AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);
   
           /* accept client */
           c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
           if (c->cli_sock == -1) {
                   LOGERR;
                   AIT_FREE_VAL(&c->cli_buf);
                   io_arrayDel(srv->srv_blob.clients, i, 42);
                   goto end;
           } else {
   #ifdef TCP_NOPUSH
                   setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
   #endif
                   fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
           }
   
           schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
   end:
           schedReadSelf(task);
           return NULL;
   }
   
   /* ------------------------------------------------------ */
   
 /*  /*
 * rpc_srv_copyValsCall() Copy return variables for RPC call to new variable * rpc_srv_initBLOBServer() - Init & create BLOB Server
 * @call = RPC function call *
 * @newvals = New allocated variables array, must be free after use * @srv = RPC server instance
 * return: -1 error, !=-1 Returned number of copied RPC variables * @Port = Port for bind server, if Port == 0 default port is selected
  * @diskDir = Disk place for BLOB file objects
  * return: -1 == error or 0 bind and created BLOB server instance
  */   */
inline intint
rpc_srv_copyValsCall(rpc_func_t * __restrict call, rpc_val_t ** __restrict newvals)rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
 {  {
        if (!call || !newvals) {        int n = 1;
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t copy return variables to new array\n");
         if (!srv || srv->srv_kill) {
                 rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
                 return -1;                  return -1;
         }          }
   
        *newvals = calloc(call->func_args, sizeof(rpc_val_t));        memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
        if (!*newvals) {        if (access(diskDir, R_OK | W_OK) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         } else          } else
                memcpy(*newvals, call->func_vals, call->func_args * sizeof(rpc_val_t));                AIT_SET_STR(&srv->srv_blob.dir, diskDir);
   
        return call->func_args;        /* init blob list */
         TAILQ_INIT(&srv->srv_blob.blobs);
 
         srv->srv_blob.server.cli_parent = srv;
 
         memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(io_sockaddr_t));
         switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
                 case AF_INET:
                         srv->srv_blob.server.cli_sa.sin.sin_port = 
                                 htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
                         break;
                 case AF_INET6:
                         srv->srv_blob.server.cli_sa.sin6.sin6_port = 
                                 htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
                         break;
                 case AF_LOCAL:
                         strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", 
                                         sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
                         break;
                 default:
                         AIT_FREE_VAL(&srv->srv_blob.dir);
                         return -1;
         }
 
         /* create BLOB server socket */
         srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
         if (srv->srv_blob.server.cli_sock == -1) {
                 LOGERR;
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
         n = srv->srv_netbuf;
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                 LOGERR;
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
         if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, 
                                 srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         } else
                 fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
 
 
         /* allocate pool for concurent blob clients */
         srv->srv_blob.clients = io_arrayInit(io_arraySize(srv->srv_clients));
         if (!srv->srv_blob.clients) {
                 rpc_SetErr(io_GetErrno(), "%s", io_GetError());
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
 
         /* init blob scheduler */
         srv->srv_blob.root = schedBegin();
         if (!srv->srv_blob.root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 io_arrayDestroy(&srv->srv_blob.clients);
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
 
         return 0;
 }  }
   
 /*  /*
 * rpc_srv_getValsCall() Get return variables for RPC call * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
 * @call = RPC function call *
 * @vals = Returned variables, may be NULL * @srv = RPC Server instance
 * return: -1 error, !=-1 Number of returned variables * return: none
  */   */
inline intinline void
rpc_srv_getValsCall(rpc_func_t * __restrict call, rpc_val_t ** __restrict vals)rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
 {  {
        if (!call) {        if (!srv)
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t get return variables ...\n");                return;
                return -1; 
        } 
   
        if (vals)        srv->srv_blob.kill = 1;
                *vals = call->func_vals; 
        return call->func_args; 
 }  }
   
 // ---------------------------------------------------------  
   
 /*  /*
 * rpc_srv_registerCall() Register call to RPC server * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
 * @csModule = Module name, if NULL self binary * return: -1 error or 0 ok, infinite loop ...
 * @csFunc = Function name 
 * @args = Number of function arguments 
 * return: -1 error or 0 register ok 
  */   */
 int  int
rpc_srv_registerCall(rpc_srv_t * __restrict srv, const char *csModule, const char *csFunc, u_char args)rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
 {  {
        rpc_func_t *func;        rpc_cli_t *c;
        u_char str[MAXPATHLEN + UCHAR_MAX + 1];        register int i;
         rpc_blob_t *b, *tmp;
         struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
        memset(str, 0, MAXPATHLEN + UCHAR_MAX + 1);        if (!srv || srv->srv_kill) {
        if (!srv || !csFunc) {                rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t register function to RPC server ...\n"); 
                 return -1;                  return -1;
         }          }
        if (!(func = malloc(sizeof(rpc_func_t)))) {
         if (listen(srv->srv_blob.server.cli_sock, io_arraySize(srv->srv_blob.clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         } else {  
                 memset(func, 0, sizeof(rpc_func_t));  
                 strlcpy((char*) func->func_name, csFunc, UCHAR_MAX + 1);  
         }          }
        if (csModule) {
                strlcpy((char*) func->func_file, csModule, MAXPATHLEN);        if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
                strlcpy((char*) str, csModule, MAXPATHLEN + UCHAR_MAX + 1);                                srv->srv_blob.server.cli_sock, NULL, 0)) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;
         }          }
         strlcat((char*) str, "__", MAXPATHLEN + UCHAR_MAX + 1);  
         strlcat((char*) str, csFunc, MAXPATHLEN + UCHAR_MAX + 1);  
   
        func->func_tag = crcFletcher16((u_short*) str, (MAXPATHLEN + UCHAR_MAX + 1) / 2);        schedPolling(srv->srv_blob.root, &ts, NULL);
        func->func_hash = hash_fnv((char*) str, MAXPATHLEN + UCHAR_MAX + 1);        /* main rpc loop */
         schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
   
        if (rpc_srv_declValsCall(func, args) == -1) {        /* close all clients connections & server socket */
                free(func);        for (i = 0; i < io_arraySize(srv->srv_blob.clients); i++) {
                return -1;                c = io_array(srv->srv_blob.clients, i, rpc_cli_t*);
                 if (c) {
                         shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);
 
                         schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
                         AIT_FREE_VAL(&c->cli_buf);
                 }
                 io_arrayDel(srv->srv_blob.clients, i, 42);
         }          }
           io_arrayDestroy(&srv->srv_blob.clients);
   
        func->func_next = srv->srv_funcs;        close(srv->srv_blob.server.cli_sock);
        srv->srv_funcs = func;
         /* detach blobs */
         TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                 TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
 
                 rpc_srv_blobFree(srv, b);
                 io_free(b);
         }
 
         schedEnd(&srv->srv_blob.root);
         AIT_FREE_VAL(&srv->srv_blob.dir);
         return 0;          return 0;
 }  }
   
   
 /*  /*
 * rpc_srv_unregisterCall() Unregister call from RPC server * rpc_srv_initServer() - Init & create RPC Server
 * @srv = RPC Server instance *
 * @csModule = Module name, if NULL self binary * @regProgID = ProgramID for authentication & recognition
 * @csFunc = Function name * @regProcID = ProcessID for authentication & recognition
 * return: -1 error, 0 not found call, 1 unregister ok * @concurentClients = Concurent clients at same time to this server
  * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
  * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected
  * @proto = Protocol, if == 0 choose SOCK_STREAM
  * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
intrpc_srv_t *
rpc_srv_unregisterCall(rpc_srv_t * __restrict srv, const char *csModule, const char *csFunc)rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients, 
                 int netBuf, const char *csHost, u_short Port, int proto)
 {  {
        rpc_func_t func, *f, *curr;        int n = 1;
        u_char str[MAXPATHLEN + UCHAR_MAX + 1];        rpc_srv_t *srv = NULL;
         io_sockaddr_t sa = IO_SOCKADDR_INIT;
   
        memset(&func, 0, sizeof(rpc_func_t));        if (!concurentClients || !regProgID || (proto < 0 || proto > SOCK_DGRAM)) {
        memset(str, 0, MAXPATHLEN + UCHAR_MAX + 1);                rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
        if (!srv || !csFunc) {                return NULL;
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t unregister function from RPC server ...\n"); 
                return -1; 
        } else 
                strlcpy((char*) func.func_name, csFunc, UCHAR_MAX + 1); 
        if (csModule) { 
                strlcpy((char*) func.func_file, csModule, MAXPATHLEN); 
                strlcpy((char*) str, csModule, MAXPATHLEN + UCHAR_MAX + 1); 
         }          }
        strlcat((char*) str, "__", MAXPATHLEN + UCHAR_MAX + 1);        if (!proto)
        strlcat((char*) str, csFunc, MAXPATHLEN + UCHAR_MAX + 1);                proto = SOCK_STREAM;
         if (!io_gethostbyname(csHost, Port, &sa))
                 return NULL;
         if (!Port)
                 Port = RPC_DEFPORT;
         if (netBuf < RPC_MIN_BUFSIZ)
                 netBuf = BUFSIZ;
         else
                 netBuf = io_align(netBuf, 2);   /* align netBuf length */
   
        func.func_tag = crcFletcher16((u_short*) str, (MAXPATHLEN + UCHAR_MAX + 1) / 2);#ifdef HAVE_SRANDOMDEV
        func.func_hash = hash_fnv((char*) str, MAXPATHLEN + UCHAR_MAX + 1);        srandomdev();
 #else
         time_t tim;
   
        f = rpc_srv_getCall(srv, func.func_tag, func.func_hash);        srandom((time(&tim) ^ getpid()));
        if (!f)         // not found element for unregister#endif
                return 0; 
   
        if (srv->srv_funcs == f) {   // if is 1st element        srv = io_malloc(sizeof(rpc_srv_t));
                srv->srv_funcs = srv->srv_funcs->func_next;        if (!srv) {
                 LOGERR;
                 return NULL;
         } else
                 memset(srv, 0, sizeof(rpc_srv_t));
   
                if (f->func_args && f->func_vals)        srv->srv_proto = proto;
                        free(f->func_vals);        srv->srv_netbuf = netBuf;
                free(f);        srv->srv_session.sess_version = RPC_VERSION;
        } else {        srv->srv_session.sess_program = regProgID;
                for (curr = srv->srv_funcs; curr->func_next != f; curr = curr->func_next);        srv->srv_session.sess_process = regProcID;
                curr->func_next = curr->func_next->func_next; 
   
                if (f->func_args && f->func_vals)        srv->srv_server.cli_parent = srv;
                        free(f->func_vals);        memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
                free(f);
         /* init functions */
         pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
         SLIST_INIT(&srv->srv_funcs);
         AVL_INIT(&srv->srv_funcs);
 
         /* init scheduler */
         srv->srv_root = schedBegin();
         if (!srv->srv_root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 io_free(srv);
                 return NULL;
         }          }
   
        return 1;        /* init pool for clients */
         srv->srv_clients = io_arrayInit(concurentClients);
         if (!srv->srv_clients) {
                 rpc_SetErr(io_GetErrno(), "%s", io_GetError());
                 schedEnd(&srv->srv_root);
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 io_free(srv);
                 return NULL;
         }
 
         /* create server socket */
         srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
         if (srv->srv_server.cli_sock == -1) {
                 LOGERR;
                 io_arrayDestroy(&srv->srv_clients);
                 schedEnd(&srv->srv_root);
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 io_free(srv);
                 return NULL;
         }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;
                 goto err;
         }
         n = srv->srv_netbuf;
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;
                 goto err;
         }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                 LOGERR;
                 goto err;
         }
         if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
                                 srv->srv_server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;
                 goto err;
         } else
                 fcntl(srv->srv_server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
 
         rpc_register_srvPing(srv);
 
         return srv;
 err:    /* error condition */
         close(srv->srv_server.cli_sock);
         io_arrayDestroy(&srv->srv_clients);
         schedEnd(&srv->srv_root);
         pthread_mutex_destroy(&srv->srv_funcs.mtx);
         io_free(srv);
         return NULL;
 }  }
   
 /*  /*
 * rpc_srv_getCall() Get registered call from RPC server * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
 * @srv = RPC Server instance *
 * @tag = tag for function * @psrv = RPC Server instance
 * @hash = hash for function * return: none
 * return: NULL not found call, !=NULL return call 
  */   */
inline rpc_func_t *inline void
rpc_srv_getCall(rpc_srv_t * __restrict srv, uint16_t tag, uint32_t hash)rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
 {  {
        rpc_func_t *f;        if (!psrv || !*psrv)
                 return;
   
        if (!srv) {        /* if send kill to blob server */
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t get function from RPC server ...\n");        if (!(*psrv)->srv_blob.kill)
                return NULL;                rpc_srv_endBLOBServer(*psrv);
        } 
   
        for (f = srv->srv_funcs; f; f = f->func_next)        (*psrv)->srv_kill = 1;
                if (f->func_tag == tag && f->func_hash == hash)        sleep(RPC_SCHED_POLLING);
                        break; 
   
        return f;        pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
         io_free(*psrv);
         *psrv = NULL;
 }  }
   
 /*  /*
 * rpc_srv_getFunc() Get registered call from RPC server by Name * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
 * @csModule = Module name, if NULL self binary * return: -1 error or 0 ok, infinite loop ...
 * @csFunc = Function name 
 * return: NULL not found call, !=NULL return call 
  */   */
rpc_func_t *int
rpc_srv_getFunc(rpc_srv_t * __restrict srv, const char *csModule, const char *csFunc)rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
        rpc_func_t func;        rpc_cli_t *c;
        u_char str[MAXPATHLEN + UCHAR_MAX + 1];        register int i;
         rpc_func_t *f;
         struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
        memset(&func, 0, sizeof(rpc_func_t));        if (!srv) {
        memset(str, 0, MAXPATHLEN + UCHAR_MAX + 1);                rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
        if (!srv || !csFunc) {                return -1;
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t get function from RPC server ...\n"); 
                return NULL; 
        } else 
                strlcpy((char*) func.func_name, csFunc, UCHAR_MAX + 1); 
        if (csModule) { 
                strlcpy((char*) func.func_file, csModule, MAXPATHLEN); 
                strlcpy((char*) str, csModule, MAXPATHLEN + UCHAR_MAX + 1); 
         }          }
         strlcat((char*) str, "__", MAXPATHLEN + UCHAR_MAX + 1);  
         strlcat((char*) str, csFunc, MAXPATHLEN + UCHAR_MAX + 1);  
   
        func.func_tag = crcFletcher16((u_short*) str, (MAXPATHLEN + UCHAR_MAX + 1) / 2);        if (listen(srv->srv_server.cli_sock, io_arraySize(srv->srv_clients)) == -1) {
        func.func_hash = hash_fnv((char*) str, MAXPATHLEN + UCHAR_MAX + 1);                LOGERR;
                 return -1;
         }
   
        return rpc_srv_getCall(srv, func.func_tag, func.func_hash);        if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, 
                                 srv->srv_server.cli_sock, NULL, 0)) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;
         }
 
         schedPolling(srv->srv_root, &ts, NULL);
         /* main rpc loop */
         schedRun(srv->srv_root, &srv->srv_kill);
 
         /* close all clients connections & server socket */
         for (i = 0; i < io_arraySize(srv->srv_clients); i++) {
                 c = io_array(srv->srv_clients, i, rpc_cli_t*);
                 if (c) {
                         shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);
 
                         schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
                         io_freeVars(&RPC_RETVARS(c));
                         AIT_FREE_VAL(&c->cli_buf);
                 }
                 io_arrayDel(srv->srv_clients, i, 42);
         }
         io_arrayDestroy(&srv->srv_clients);
 
         close(srv->srv_server.cli_sock);
 
         /* detach exported calls */
         RPC_FUNCS_LOCK(&srv->srv_funcs);
         while ((f = SLIST_FIRST(&srv->srv_funcs))) {
                 SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
 
                 AIT_FREE_VAL(&f->func_name);
                 io_free(f);
         }
         srv->srv_funcs.avlh_root = NULL;
         RPC_FUNCS_UNLOCK(&srv->srv_funcs);
 
         schedEnd(&srv->srv_root);
         return 0;
 }  }
   
 // ---------------------------------------------------------  
   
 /*  /*
  * rpc_srv_execCall() Execute registered call from RPC server   * rpc_srv_execCall() Execute registered call from RPC server
 * @data = RPC const data *
 * @call = Register RPC call * @cli = RPC client
  * @rpc = IN RPC call structure   * @rpc = IN RPC call structure
 * @args = IN RPC call array of rpc values * @funcname = Execute RPC function
  * @args = IN RPC calling arguments from RPC client
  * return: -1 error, !=-1 ok   * return: -1 error, !=-1 ok
  */   */
 int  int
rpc_srv_execCall(void * const data, rpc_func_t * __restrict call, rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc, 
                struct tagRPCCall * __restrict rpc, rpc_val_t * __restrict args)                ait_val_t funcname, array_t * __restrict args)
 {  {
         void *dl;  
         rpc_callback_t func;          rpc_callback_t func;
         int ret;  
   
        if (!data || !call || !rpc) {        if (!cli || !rpc || !AIT_ADDR(&funcname)) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
                 return -1;                  return -1;
         }          }
   
        dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);        func = AIT_GET_LIKE(&funcname, rpc_callback_t);
        if (!dl) {        return func(cli, rpc, args);
                rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror()); 
                return -1; 
        } 
 
        func = dlsym(dl, (char*) call->func_name); 
        if (func) 
                ret = func(data, call, rpc->call_argc, args); 
        else { 
                rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror()); 
                ret = -1; 
        } 
 
        dlclose(dl); 
        return ret; 
 }  }

Removed from v.1.1.1.1.2.2  
changed lines
  Added in v.1.12.2.4


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>