Diff for /libaitrpc/src/srv.c between versions 1.1.1.1.2.12 and 1.9.2.16

version 1.1.1.1.2.12, 2010/06/24 15:23:38 version 1.9.2.16, 2012/05/16 13:17:51
Line 5 Line 5
 * $Author$  * $Author$
 * $Id$  * $Id$
 *  *
*************************************************************************/**************************************************************************
 The ELWIX and AITNET software is distributed under the following
 terms:
 
 All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
 
 Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
         by Michael Pounov <misho@elwix.org>.  All rights reserved.
 
 Redistribution and use in source and binary forms, with or without
 modification, are permitted provided that the following conditions
 are met:
 1. Redistributions of source code must retain the above copyright
    notice, this list of conditions and the following disclaimer.
 2. Redistributions in binary form must reproduce the above copyright
    notice, this list of conditions and the following disclaimer in the
    documentation and/or other materials provided with the distribution.
 3. All advertising materials mentioning features or use of this software
    must display the following acknowledgement:
 This product includes software developed by Michael Pounov <misho@elwix.org>
 ELWIX - Embedded LightWeight unIX and its contributors.
 4. Neither the name of AITNET nor the names of its contributors
    may be used to endorse or promote products derived from this software
    without specific prior written permission.
 
 THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 SUCH DAMAGE.
 */
 #include "global.h"  #include "global.h"
   
   
 static void *  static void *
rpc_srv_dispatchCall(void *arg)closeClient(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        rpc_cli_t *c = TASK_ARG(task);
        rpc_srv_t *s;        rpc_srv_t *s = c->cli_parent;
        rpc_val_t *vals = NULL, *v = NULL; 
        rpc_func_t *f; 
        struct tagRPCCall *rpc; 
        struct tagRPCRet rrpc; 
        fd_set fds; 
        u_char buf[BUFSIZ], *data; 
        int ret, argc = 0, Limit = 0; 
        register int i; 
   
        if (!arg) {        schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n"); 
                return NULL; 
        } else 
                s = c->cli_parent; 
   
        do {        /* close client socket */
                FD_ZERO(&fds);        if (TASK_VAL(task))
                FD_SET(c->cli_sock, &fds);                shutdown(c->cli_sock, SHUT_RDWR);
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);        close(c->cli_sock);
                if (ret == -1) {
                        ret = -2;        /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);
 
         io_arrayDel(s->srv_clients, c->cli_id, 42);
         return NULL;
 }
 
 static void *
 txPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char buf[USHRT_MAX] = { 0 };
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, wlen = sizeof(struct tagRPCCall);
 
         /* copy RPC header */
         memcpy(buf, TASK_DATA(task), wlen);
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         rpc->call_argc = htons(io_arraySize(f->func_vars));
                         /* Go Encapsulate variables */
                         ret = io_vars2buffer(buf + wlen, sizeof buf - wlen, f->func_vars);
                         /* Free return values */
                         io_clrVars(f->func_vars);
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }                  }
                memset(&rrpc, 0, sizeof rrpc);        }
                memset(buf, 0, BUFSIZ);
                if ((ret = recv(c->cli_sock, buf, BUFSIZ, 0)) == -1) {        rpc->call_len = htons(wlen);
                        LOGERR;
                        ret = -3;        /* calculate CRC */
                        break;        rpc->call_crc ^= rpc->call_crc;
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
         if (ret == -1 || ret != wlen) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);
         }
 
         return NULL;
 }
 
 static void *
 execCall(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         array_t *arr = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf) + TASK_VAL(task);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int argc = ntohs(rpc->call_argc);
 
         /* Go decapsulate variables ... */
         if (argc) {
                 arr = io_buffer2vars(buf + sizeof(struct tagRPCCall), 
                                 AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall), 
                                 argc, 1);
                 if (!arr) {
                         rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         return NULL;
                 }                  }
                if (!ret) {          // receive EOF        } else
                        ret = 0;                arr = NULL;
                        break;
                }        if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
                if (ret < sizeof(struct tagRPCCall)) {                rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                        rpc_SetErr(EMSGSIZE, "Error:: too short RPC packet ...\n");                rpc->call_argc ^= rpc->call_argc;
                        ret = -4;                rpc->call_rep.ret = RPC_ERROR(-1);
                        break;                rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                } else        } else {
                        rpc = (struct tagRPCCall*) buf;                /* if client doesn't want reply */
                // check RPC packet session info                argc = rpc->call_req.flags & RPC_NOREPLY;
                if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) {                rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
                        rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n");                if (rpc->call_rep.ret == htonl(-1)) {
                        ret = -5;                        rpc->call_rep.eno = RPC_ERROR(errno);
                        goto makeReply;                        rpc->call_argc ^= rpc->call_argc;
                }                } else {
                // RPC is OK! Go decapsulate variables ...                        rpc->call_rep.eno ^= rpc->call_rep.eno;
                if (rpc->call_argc) {                        if (argc) {
                        v = (rpc_val_t*) (buf + sizeof(struct tagRPCCall));                                /* without reply */
                        // RPC received variables types OK!                                io_clrVars(f->func_vars);
                        data = (u_char*) v + rpc->call_argc * sizeof(rpc_val_t);                                rpc->call_argc ^= rpc->call_argc;
                        for (i = 0; i < rpc->call_argc; i++) {                        } else {
                                switch (v[i].val_type) {                                /* reply */
                                        case buffer:                                rpc->call_argc = htons(io_arraySize(f->func_vars));
                                                v[i].val.buffer = data; 
                                                data += v[i].val_len; 
                                                break; 
                                        case string: 
                                                v[i].val.string = (int8_t*) data; 
                                                data += v[i].val_len + 1; 
                                                break; 
                                        case blob: 
                                                if (s->srv_blob.state == disable) { 
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n"); 
                                                        ret = -5; 
                                                        goto makeReply; 
                                                } 
                                        default: 
                                                break; 
                                } 
                         }                          }
                 }                  }
           }
   
                argc = 0;        io_arrayDestroy(&arr);
                vals = NULL;        return NULL;
                if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) {}
                        rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n"); 
                        ret = -6; 
                } else 
                        if ((ret = rpc_srv_execCall(f, rpc, v)) == -1) 
                                ret = -6; 
                        else 
                                argc = rpc_srv_getValsCall(f, &vals); 
   
makeReply:static void *
                memcpy(&rrpc.ret_session, &rpc->call_session, sizeof rrpc.ret_session);rxPacket(sched_task_t *task)
                rrpc.ret_tag = rpc->call_tag;{
                rrpc.ret_hash = rpc->call_hash;        rpc_cli_t *c = TASK_ARG(task);
                rrpc.ret_errno = rpc_Errno;        rpc_srv_t *s = c->cli_parent;
                rrpc.ret_retcode = ret;        u_char *buf = AIT_GET_BUF(&c->cli_buf);
                rrpc.ret_argc = argc;        int len, rlen, noreply;
         u_short crc, off = 0;
         struct tagRPCCall *rpc;
   
                memset(buf, 0, BUFSIZ);        memset(buf, 0, AIT_LEN(&c->cli_buf));
                memcpy(buf, &rrpc, (Limit = sizeof rrpc));        rlen = recv(TASK_FD(task), buf, AIT_LEN(&c->cli_buf), 0);
                if (argc && vals) {        if (rlen < 1) {
                        v = (rpc_val_t*) (buf + sizeof rrpc);                /* close connection */
                        memcpy(v, vals, argc * sizeof(rpc_val_t));                schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);
                        Limit += argc * sizeof(rpc_val_t);                return NULL;
                        data = (u_char*) v + argc * sizeof(rpc_val_t);        }
                        for (ret = i = 0; i < argc; i++) { 
                                switch (vals[i].val_type) { 
                                        case buffer: 
                                                if (ret || Limit + vals[i].val_len > BUFSIZ) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc.ret_retcode = ret = -7; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.buffer, vals[i].val_len);        do {
                                                data += vals[i].val_len;                /* check RPC packet */
                                                Limit += vals[i].val_len;                if (rlen < sizeof(struct tagRPCCall)) {
                                                break;                        rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                                        case string: 
                                                if (ret || Limit + vals[i].val_len + 1 > BUFSIZ) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc.ret_retcode = ret = -7; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.string, vals[i].val_len + 1);                        schedReadSelf(task);
                                                data += vals[i].val_len + 1;                        return NULL;
                                                Limit += vals[i].val_len + 1;                } else
                                                break;                        rpc = (struct tagRPCCall*) (buf + off);
                                        case blob: 
                                                if (s->srv_blob.state == disable) { 
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n"); 
                                                        rrpc.ret_retcode = ret = -5; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
                                        default: 
                                                break; 
                                } 
   
                                RPC_FREE_VAL(&vals[i]);                len = ntohs(rpc->call_len);
                        }                rlen -= len;
 
                 /* check integrity of packet */
                 crc = ntohs(rpc->call_crc);
                 rpc->call_crc ^= rpc->call_crc;
                 if (crc != crcFletcher16((u_short*) (buf + off), len / 2)) {
                         rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
 
                         off += len;
                         if (rlen < 1)
                                 break;
                         else
                                 continue;
                 }                  }
   
                if ((ret = send(c->cli_sock, buf, Limit, 0)) == -1) {                noreply = rpc->call_req.flags & RPC_NOREPLY;
                        LOGERR;
                        ret = -8;                /* check RPC packet session info */
                        break;                if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                         rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(errno);
                 } else {
                         /* execute RPC call */
                         schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off, NULL, 0);
                 }                  }
                 if (ret != Limit) {  
                         rpc_SetErr(EBADMSG, "Error:: in send RPC request, should be send %d bytes, "  
                                         "really is %d\n", Limit, ret);  
                         ret = -9;  
                         break;  
                 }  
         } while (ret > -1);  
   
        shutdown(c->cli_sock, SHUT_RDWR);                /* send RPC reply */
                 if (!noreply)
                         schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task), rpc, len);
 
                 off += len;
         } while (rlen > 0);
 
         /* lets get next packet */
         schedReadSelf(task);
         return NULL;
 }
 
 static void *
 acceptClients(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         register int i;
         socklen_t salen = sizeof(io_sockaddr_t);
 
         /* check free slots for connect */
         for (i = 0; i < io_arraySize(srv->srv_clients) && 
                         (c = io_array(srv->srv_clients, i, rpc_cli_t*)); i++);
         if (c)  /* no more free slots! */
                 goto end;
         c = malloc(sizeof(rpc_cli_t));
         if (!c) {
                 LOGERR;
                 srv->srv_kill = 1;
                 return NULL;
         } else {
                 memset(c, 0, sizeof(rpc_cli_t));
                 io_arraySet(srv->srv_clients, i, c);
                 c->cli_id = i;
                 c->cli_parent = srv;
         }
 
         /* alloc empty buffer */
         AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);
 
         /* accept client */
         c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
         if (c->cli_sock == -1) {
                 LOGERR;
                 AIT_FREE_VAL(&c->cli_buf);
                 io_arrayDel(srv->srv_clients, i, 42);
                 goto end;
         } else
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
 
         schedRead(TASK_ROOT(task), rxPacket, c, c->cli_sock, NULL, 0);
 end:
         schedReadSelf(task);
         return NULL;
 }
 
 /* ------------------------------------------------------ */
 
 static void *
 closeBLOBClient(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
 
         schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);
 
         /* close client socket */
         if (TASK_VAL(task))
                 shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);          close(c->cli_sock);
        memset(c, 0, sizeof(rpc_cli_t));
        return (void*) ret;        /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);
 
         io_arrayDel(s->srv_blob.clients, c->cli_id, 42);
         return NULL;
 }  }
   
   static void *
   txBLOB(sched_task_t *task)
   {
           rpc_cli_t *c = TASK_ARG(task);
           u_char *buf = AIT_GET_BUF(&c->cli_buf);
           struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
           int wlen = sizeof(struct tagBLOBHdr);
   
           /* calculate CRC */
           blob->hdr_crc ^= blob->hdr_crc;
           blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
   
           /* send reply */
           wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
           if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
                   /* close blob connection */
                   schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
           }
   
           return NULL;
   }
   
 static void *  static void *
rpc_srv_dispatchVars(void *arg)rxBLOB(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        rpc_cli_t *c = TASK_ARG(task);
        rpc_srv_t *s;        rpc_srv_t *s = c->cli_parent;
         rpc_blob_t *b;          rpc_blob_t *b;
        int cx, ret;        u_char *buf = AIT_GET_BUF(&c->cli_buf);
        fd_set fds;        struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
        u_char buf[sizeof(struct tagBLOBHdr)];        int rlen;
        struct tagBLOBHdr *blob;        u_short crc;
   
        if (!arg) {        memset(buf, 0, AIT_LEN(&c->cli_buf));
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");        rlen = recv(TASK_FD(task), buf, AIT_LEN(&c->cli_buf), 0);
         if (rlen < 1) {
                 /* close blob connection */
                 schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
                 return NULL;                  return NULL;
        } else        }
                s = c->cli_parent; 
   
        cx = -1;        /* check BLOB packet */
        do {        if (rlen < sizeof(struct tagBLOBHdr)) {
                // check for disable service at this moment?                rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
                if (s->srv_blob.state == disable) { 
                        ret = 0; 
                        break; 
                } 
   
                FD_ZERO(&fds);                schedReadSelf(task);
                FD_SET(c->cli_sock, &fds);                return NULL;
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);        }
                if (ret == -1) { 
                        ret = -2; 
                } else 
                        cx++; 
                memset(buf, 0, sizeof buf); 
                if ((ret = recv(c->cli_sock, buf, sizeof buf, 0)) == -1) { 
                        LOGERR; 
                        ret = -3; 
                        break; 
                } 
                if (!ret || s->srv_blob.state == disable) {     // receive EOF or disable service 
                        ret = 0; 
                        break; 
                } 
                if (ret < sizeof(struct tagBLOBHdr)) { 
                        rpc_SetErr(EMSGSIZE, "Error:: too short BLOB packet ...\n"); 
                        ret = -4; 
                        break; 
                } else 
                        blob = (struct tagBLOBHdr*) buf; 
                // check BLOB packet session info 
                if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session) ||  
                                blob->hdr_seq != cx) { 
                        rpc_SetErr(EINVAL, "Error:: get invalid BLOB session in seq=%d...\n", blob->hdr_seq); 
                        ret = -5; 
                        goto makeReply; 
                } 
                // Go to proceed packet ... 
                switch (blob->hdr_cmd) { 
                        case get: 
                                if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) { 
                                        rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",  
                                                        blob->hdr_var); 
                                        ret = -6; 
                                        break; 
                                } 
   
                                if (rpc_srv_blobMap(s, b) != -1) {        /* check integrity of packet */
                                        ret = rpc_srv_sendBLOB(c, b);        crc = ntohs(blob->hdr_crc);
                                        rpc_srv_blobUnmap(b);        blob->hdr_crc ^= blob->hdr_crc;
                                } else        if (crc != crcFletcher16((u_short*) buf, rlen / 2)) {
                                        ret = -7;                rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
                                break; 
                        case set: 
                                if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) { 
                                        // set new BLOB variable for reply :) 
                                        blob->hdr_var = b->blob_var; 
   
                                        ret = rpc_srv_recvBLOB(c, b);                schedReadSelf(task);
                                        rpc_srv_blobUnmap(b);                return NULL;
                                } else        }
                                        ret = -7;
         /* check RPC packet session info */
         if ((crc = rpc_chkPktSession(&blob->hdr_session, &s->srv_session))) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                 blob->hdr_cmd = error;
                 goto end;
         }
 
         /* Go to proceed packet ... */
         switch (blob->hdr_cmd) {
                 case get:
                         if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) {
                                 rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var));
                                 blob->hdr_cmd = no;
                                 blob->hdr_ret = RPC_ERROR(-1);
                                 break;                                  break;
                        case unset:                        } else
                                ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);                                blob->hdr_len = htonl(b->blob_len);
                                if (ret == -1) 
                                        ret = -7; 
                                break; 
                        default: 
                                rpc_SetErr(EINVAL, "Error:: unsupported BLOB command (%d)...\n",  
                                                blob->hdr_cmd); 
                                ret = -7; 
                } 
   
makeReply:                        if (rpc_srv_blobMap(s, b) != -1) {
                // Replay to client!                                /* deliver BLOB variable to client */
                blob->hdr_cmd = ret < 0 ? error : ok;                                blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
                blob->hdr_seq = ret;                                rpc_srv_blobUnmap(b);
                if ((ret = send(c->cli_sock, buf, sizeof buf, 0)) == -1) {                        } else {
                        LOGERR;                                blob->hdr_cmd = error;
                        ret = -8;                                blob->hdr_ret = RPC_ERROR(-1);
                         }
                         break;                          break;
                }                case set:
                if (ret != sizeof buf) {                        if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) {
                        rpc_SetErr(EBADMSG, "Error:: in send BLOB reply, should be send %d bytes, "                                /* set new BLOB variable for reply :) */
                                        "really is %d\n", sizeof buf, ret);                                blob->hdr_var = htonl(b->blob_var);
                        ret = -9;
                                 /* receive BLOB from client */
                                 blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
                                 rpc_srv_blobUnmap(b);
                         } else {
                                 blob->hdr_cmd = error;
                                 blob->hdr_ret = RPC_ERROR(-1);
                         }
                         break;                          break;
                }                case unset:
        } while (ret > -1);                        if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) {
                                 blob->hdr_cmd = error;
                                 blob->hdr_ret = RPC_ERROR(-1);
                         }
                         break;
                 default:
                         rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd);
                         blob->hdr_cmd = error;
                         blob->hdr_ret = RPC_ERROR(-1);
         }
   
        shutdown(c->cli_sock, SHUT_RDWR);end:
        close(c->cli_sock);        schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
        memset(c, 0, sizeof(rpc_cli_t));        schedReadSelf(task);
        return (void*) ret;        return NULL;
 }  }
   
// -------------------------------------------------static void *
 acceptBLOBClients(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         register int i;
         socklen_t salen = sizeof(io_sockaddr_t);
   
           /* check free slots for connect */
           for (i = 0; i < io_arraySize(srv->srv_blob.clients) && 
                           (c = io_array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
           if (c)  /* no more free slots! */
                   goto end;
           c = malloc(sizeof(rpc_cli_t));
           if (!c) {
                   LOGERR;
                   srv->srv_kill = srv->srv_blob.kill = 1;
                   return NULL;
           } else {
                   memset(c, 0, sizeof(rpc_cli_t));
                   io_arraySet(srv->srv_blob.clients, i, c);
                   c->cli_id = i;
                   c->cli_parent = srv;
           }
   
           /* alloc empty buffer */
           AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);
   
           /* accept client */
           c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
           if (c->cli_sock == -1) {
                   LOGERR;
                   AIT_FREE_VAL(&c->cli_buf);
                   io_arrayDel(srv->srv_blob.clients, i, 42);
                   goto end;
           } else
                   fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
   
           schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
   end:
           schedReadSelf(task);
           return NULL;
   }
   
   /* ------------------------------------------------------ */
   
 /*  /*
 * rpc_srv_initBLOBServer() Init & create BLOB Server * rpc_srv_initBLOBServer() - Init & create BLOB Server
  *
  * @srv = RPC server instance
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
  * @diskDir = Disk place for BLOB file objects   * @diskDir = Disk place for BLOB file objects
  * return: -1 == error or 0 bind and created BLOB server instance   * return: -1 == error or 0 bind and created BLOB server instance
Line 300  int Line 482  int
 rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
 {  {
         int n = 1;          int n = 1;
         struct sockaddr_in sin;  
         struct sockaddr_in6 sin6;  
   
        if (!srv) {        if (!srv || srv->srv_kill) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");                rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
                 return -1;                  return -1;
         }          }
         if (srv->srv_blob.state) {  
                 rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");  
                 return 0;  
         }  
         if (!Port)  
                 Port = RPC_DEFPORT + 1;  
   
         memset(&srv->srv_blob, 0, sizeof srv->srv_blob);          memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
         if (access(diskDir, R_OK | W_OK) == -1) {          if (access(diskDir, R_OK | W_OK) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         } else          } else
                strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);                AIT_SET_STR(&srv->srv_blob.dir, diskDir);
   
        srv->srv_blob.server.cli_tid = pthread_self();        /* init blob list */
         TAILQ_INIT(&srv->srv_blob.blobs);
 
         srv->srv_blob.server.cli_parent = srv;          srv->srv_blob.server.cli_parent = srv;
        if (srv->srv_server.cli_sa.sa_family == AF_INET) {
                memcpy(&sin, &srv->srv_server.cli_sa, sizeof sin);        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(io_sockaddr_t));
                sin.sin_port = htons(Port);        switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
                memcpy(&srv->srv_blob.server.cli_sa, &sin, sizeof(struct sockaddr));                case AF_INET:
        } else {                        srv->srv_blob.server.cli_sa.sin.sin_port = 
                memcpy(&sin6, &srv->srv_server.cli_sa, sizeof sin6);                                htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
                sin6.sin6_port = htons(Port);                        break;
                memcpy(&srv->srv_blob.server.cli_sa, &sin6, sizeof(struct sockaddr));                case AF_INET6:
                         srv->srv_blob.server.cli_sa.sin6.sin6_port = 
                                 htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
                         break;
                 case AF_LOCAL:
                         strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", 
                                         sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
                         break;
                 default:
                         AIT_FREE_VAL(&srv->srv_blob.dir);
                         return -1;
         }          }
   
        srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa_family, SOCK_STREAM, 0);        /* create BLOB server socket */
         srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
         if (srv->srv_blob.server.cli_sock == -1) {          if (srv->srv_blob.server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
        if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa,         n = srv->srv_netbuf;
                                sizeof srv->srv_blob.server.cli_sa) == -1) {        if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
           if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                   LOGERR;
                   close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                   return -1;
           }
           if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, 
                                   srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
                   LOGERR;
                   close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                   return -1;
           }
   
        srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));        /* allocate pool for concurent blob clients */
         srv->srv_blob.clients = io_arrayInit(io_arraySize(srv->srv_clients));
         if (!srv->srv_blob.clients) {          if (!srv->srv_blob.clients) {
                LOGERR;                rpc_SetErr(io_GetErrno(), "%s", io_GetError());
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
        } else        }
                memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t)); 
   
        pthread_mutex_init(&srv->srv_blob.mtx, NULL);        /* init blob scheduler */
         srv->srv_blob.root = schedBegin();
         if (!srv->srv_blob.root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 io_arrayDestroy(&srv->srv_blob.clients);
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
   
         pthread_mutex_lock(&srv->srv_mtx);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 0);  
         pthread_mutex_unlock(&srv->srv_mtx);  
   
         srv->srv_blob.state = enable;   // enable BLOB  
         return 0;          return 0;
 }  }
   
 /*  /*
 * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: none   * return: none
  */   */
Line 380  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) Line 586  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
 {  {
         rpc_cli_t *c;          rpc_cli_t *c;
         register int i;          register int i;
        rpc_blob_t *f;        rpc_blob_t *b, *tmp;
   
        if (!srv) {        if (!srv)
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n"); 
                 return;                  return;
         } else  
                 srv->srv_blob.state = disable;  
   
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);        /* close all clients connections & server socket */
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);        for (i = 0; i < io_arraySize(srv->srv_blob.clients); i++) {
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);                c = io_array(srv->srv_blob.clients, i, rpc_cli_t*);
                if (c) {
        for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++) 
                if (c->cli_sa.sa_family) 
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
                           close(c->cli_sock);
   
                           io_arrayDel(srv->srv_blob.clients, i, 42);
                   }
           }
           io_arrayDestroy(&srv->srv_blob.clients);
   
         close(srv->srv_blob.server.cli_sock);          close(srv->srv_blob.server.cli_sock);
   
        if (srv->srv_blob.clients)        /* detach blobs */
                free(srv->srv_blob.clients);        TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                 TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
   
        pthread_mutex_lock(&srv->srv_blob.mtx);                rpc_srv_blobFree(srv, b);
        while ((f = srv->srv_blob.blobs)) {                free(b);
                srv->srv_blob.blobs = f->blob_next; 
                free(f); 
         }          }
         pthread_mutex_unlock(&srv->srv_blob.mtx);  
   
        pthread_mutex_destroy(&srv->srv_blob.mtx);        schedEnd(&srv->srv_blob.root);
         AIT_FREE_VAL(&srv->srv_blob.dir);
 
         srv->srv_blob.kill = 1;
 }  }
   
 /*  /*
 * rpc_srv_execBLOBServer() Execute Main BLOB server loop and wait for clients requests * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
 {  {
        socklen_t salen = sizeof(struct sockaddr);        if (!srv || srv->srv_kill) {
        register int i;                rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
        rpc_cli_t *c; 
        fd_set fds; 
        int ret; 
        struct timeval tv = { DEF_RPC_TIMEOUT, 0 }; 
 
        if (!srv || srv->srv_blob.state == disable) { 
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n"); 
                 return -1;                  return -1;
         }          }
   
        if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {        fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
                         fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
 
         if (listen(srv->srv_blob.server.cli_sock, io_arraySize(srv->srv_blob.clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
        while (!blob_Kill && !rpc_Kill) {        if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
                                 srv->srv_blob.server.cli_sock, NULL, 0)) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;
         }
 
 #if 0
                 for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)                  for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
                        if (!c->cli_sa.sa_family)                        if (!c->cli_sa.sa.sa_family)
                                 break;                                  break;
                if (c && c->cli_sa.sa_family && c->cli_parent) {                if (i >= srv->srv_numcli) {
 #ifdef HAVE_PTHREAD_YIELD
                         pthread_yield();
 #endif
                         usleep(1000000);                          usleep(1000000);
                         continue;                          continue;
                 }                  }
   
                FD_ZERO(&fds);                c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
                FD_SET(srv->srv_blob.server.cli_sock, &fds); 
                ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv); 
                if (ret == -1) { 
                        LOGERR; 
                        ret = 1; 
                        break; 
                } 
                if (!ret) 
                        continue; 
 
                c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa, &salen); 
                 if (c->cli_sock == -1) {                  if (c->cli_sock == -1) {
                         LOGERR;                          LOGERR;
                         continue;                          continue;
                 } else                  } else
                         c->cli_parent = srv;                          c->cli_parent = srv;
   
                 if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {  
                         LOGERR;  
                         continue;  
                 }  
         }          }
   #endif
   
        srv->srv_blob.state = disable;        /* main rpc loop */
        schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
         return 0;          return 0;
 }  }
   
   
 /*  /*
 * rpc_srv_initServer() Init & create RPC Server * rpc_srv_initServer() - Init & create RPC Server
  *
  * @regProgID = ProgramID for authentication & recognition   * @regProgID = ProgramID for authentication & recognition
  * @regProcID = ProcessID for authentication & recognition   * @regProcID = ProcessID for authentication & recognition
  * @concurentClients = Concurent clients at same time to this server   * @concurentClients = Concurent clients at same time to this server
 * @family = Family socket type, AF_INET or AF_INET6 * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
 * @csHost = Host name or IP address for bind server, if NULL any address * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
  * return: NULL == error or !=NULL bind and created RPC server instance   * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
 rpc_srv_t *  rpc_srv_t *
rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients, rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients, 
                u_short family, const char *csHost, u_short Port)                int netBuf, const char *csHost, u_short Port)
 {  {
         rpc_srv_t *srv = NULL;  
         int n = 1;          int n = 1;
        struct hostent *host = NULL;        rpc_srv_t *srv = NULL;
        struct sockaddr_in sin;        io_sockaddr_t sa;
        struct sockaddr_in6 sin6; 
   
        if (!concurentClients || !regProgID || (family != AF_INET && family != AF_INET6)) {        if (!concurentClients || !regProgID) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                 return NULL;                  return NULL;
         }          }
           if (!io_gethostbyname(csHost, Port, &sa))
                   return NULL;
         if (!Port)          if (!Port)
                 Port = RPC_DEFPORT;                  Port = RPC_DEFPORT;
        if (csHost) {        if (!netBuf)
                host = gethostbyname2(csHost, family);                netBuf = BUFSIZ;
                if (!host) {        else
                        rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));                netBuf = io_align(netBuf, 1);   /* align netBuf length */
                        return NULL; 
                } 
        } 
        switch (family) { 
                case AF_INET: 
                        memset(&sin, 0, sizeof sin); 
                        sin.sin_len = sizeof sin; 
                        sin.sin_family = family; 
                        sin.sin_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sin.sin_addr, host->h_addr, host->h_length); 
                        break; 
                case AF_INET6: 
                        memset(&sin6, 0, sizeof sin6); 
                        sin6.sin6_len = sizeof sin6; 
                        sin6.sin6_family = family; 
                        sin6.sin6_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sin6.sin6_addr, host->h_addr, host->h_length); 
                        break; 
                default: 
                        rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n"); 
                        return NULL; 
        } 
   
   #ifdef HAVE_SRANDOMDEV
           srandomdev();
   #else
           time_t tim;
   
           srandom((time(&tim) ^ getpid()));
   #endif
   
         srv = malloc(sizeof(rpc_srv_t));          srv = malloc(sizeof(rpc_srv_t));
         if (!srv) {          if (!srv) {
                 LOGERR;                  LOGERR;
Line 536  rpc_srv_initServer(u_int regProgID, u_int regProcID, i Line 721  rpc_srv_initServer(u_int regProgID, u_int regProcID, i
         } else          } else
                 memset(srv, 0, sizeof(rpc_srv_t));                  memset(srv, 0, sizeof(rpc_srv_t));
   
        srv->srv_numcli = concurentClients;        srv->srv_netbuf = netBuf;
         srv->srv_session.sess_version = RPC_VERSION;          srv->srv_session.sess_version = RPC_VERSION;
         srv->srv_session.sess_program = regProgID;          srv->srv_session.sess_program = regProgID;
         srv->srv_session.sess_process = regProcID;          srv->srv_session.sess_process = regProcID;
   
         srv->srv_server.cli_tid = pthread_self();  
         srv->srv_server.cli_parent = srv;          srv->srv_server.cli_parent = srv;
        if (family == AF_INET)        memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
                memcpy(&srv->srv_server.cli_sa, &sin, sizeof srv->srv_server.cli_sa);
        else        /* init functions list */
                memcpy(&srv->srv_server.cli_sa, &sin6, sizeof srv->srv_server.cli_sa);        TAILQ_INIT(&srv->srv_funcs);
        srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
        if (srv->srv_server.cli_sock == -1) {        /* init scheduler */
                LOGERR;        srv->srv_root = schedBegin();
         if (!srv->srv_root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 free(srv);                  free(srv);
                 return NULL;                  return NULL;
         }          }
        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                LOGERR;        /* init pool for clients */
                close(srv->srv_server.cli_sock);        srv->srv_clients = io_arrayInit(concurentClients);
         if (!srv->srv_clients) {
                 rpc_SetErr(io_GetErrno(), "%s", io_GetError());
                 schedEnd(&srv->srv_root);
                 free(srv);                  free(srv);
                 return NULL;                  return NULL;
         }          }
        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {
         /* create server socket */
         srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
         if (srv->srv_server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                io_arrayDestroy(&srv->srv_clients);
                 schedEnd(&srv->srv_root);
                 free(srv);                  free(srv);
                 return NULL;                  return NULL;
         }          }
        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
        srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t)); 
        if (!srv->srv_clients) { 
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv);        }
                return NULL;        n = srv->srv_netbuf;
        } else        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));                LOGERR;
                 goto err;
         }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                 LOGERR;
                 goto err;
         }
         if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
                                 srv->srv_server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;
                 goto err;
         }
   
         rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0);  
   
         pthread_mutex_init(&srv->srv_mtx, NULL);  
         return srv;          return srv;
   err:    /* error condition */
           close(srv->srv_server.cli_sock);
           io_arrayDestroy(&srv->srv_clients);
           schedEnd(&srv->srv_root);
           free(srv);
           return NULL;
 }  }
   
 /*  /*
 * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
 * @srv = RPC Server instance *
  * @psrv = RPC Server instance
  * return: none   * return: none
  */   */
 void  void
rpc_srv_endServer(rpc_srv_t * __restrict srv)rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
 {  {
         rpc_cli_t *c;          rpc_cli_t *c;
         register int i;          register int i;
        rpc_func_t *f;        rpc_func_t *f, *tmp;
   
        if (!srv) {        if (!psrv || !*psrv)
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n"); 
                 return;                  return;
         }  
   
        rpc_srv_endBLOBServer(srv);        if (!(*psrv)->srv_blob.kill)
                 rpc_srv_endBLOBServer(*psrv);
   
        for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)        /* close all clients connections & server socket */
                if (c->cli_sa.sa_family)        for (i = 0; i < io_arraySize((*psrv)->srv_clients); i++) {
                 c = io_array((*psrv)->srv_clients, i, rpc_cli_t*);
                 if (c) {
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
        close(srv->srv_server.cli_sock);                        close(c->cli_sock);
   
        if (srv->srv_clients) {                        io_arrayDel((*psrv)->srv_clients, i, 42);
                free(srv->srv_clients);                }
                srv->srv_numcli = 0; 
         }          }
           io_arrayDestroy(&(*psrv)->srv_clients);
   
        pthread_mutex_lock(&srv->srv_mtx);        close((*psrv)->srv_server.cli_sock);
        while ((f = srv->srv_funcs)) {
                srv->srv_funcs = f->func_next;        /* detach exported calls */
         TAILQ_FOREACH_SAFE(f, &(*psrv)->srv_funcs, func_node, tmp) {
                 TAILQ_REMOVE(&(*psrv)->srv_funcs, f, func_node);
 
                 io_freeVars(&f->func_vars);
                 AIT_FREE_VAL(&f->func_name);
                 free(f);                  free(f);
         }          }
         pthread_mutex_unlock(&srv->srv_mtx);  
   
        pthread_mutex_destroy(&srv->srv_mtx);        schedEnd(&(*psrv)->srv_root);
        free(*psrv);
        free(srv);        *psrv = NULL;
        srv = NULL; 
 }  }
   
 /*  /*
 * rpc_srv_execServer() Execute Main server loop and wait for clients requests * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_execServer(rpc_srv_t * __restrict srv)rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
         socklen_t salen = sizeof(struct sockaddr);  
         register int i;  
         rpc_cli_t *c;  
         fd_set fds;  
         int ret;  
         struct timeval tv = { DEF_RPC_TIMEOUT, 0 };  
   
         if (!srv) {          if (!srv) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
                 return -1;                  return -1;
         }          }
   
        if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {        fcntl(srv->srv_server.cli_sock, F_SETFL, 
                         fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
 
         if (listen(srv->srv_server.cli_sock, io_arraySize(srv->srv_clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
        while (!rpc_Kill) {        if (!schedRead(srv->srv_root, acceptClients, srv, srv->srv_server.cli_sock, NULL, 0)) {
                for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                        if (!c->cli_sa.sa_family)                return -1;
                                break; 
                if (c && c->cli_sa.sa_family && c->cli_parent) { 
                        usleep(1000000); 
                        continue; 
                } 
 
                FD_ZERO(&fds); 
                FD_SET(srv->srv_server.cli_sock, &fds); 
                ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv); 
                if (ret == -1) { 
                        LOGERR; 
                        ret = 1; 
                        break; 
                } 
                if (!ret) 
                        continue; 
 
                c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen); 
                if (c->cli_sock == -1) { 
                        LOGERR; 
                        continue; 
                } else 
                        c->cli_parent = srv; 
 
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) { 
                        LOGERR; 
                        continue; 
                } 
         }          }
   
           /* main rpc loop */
           schedRun(srv->srv_root, &srv->srv_kill);
         return 0;          return 0;
 }  }
   
 // ---------------------------------------------------------  
   
 /*  /*
  * rpc_srv_execCall() Execute registered call from RPC server   * rpc_srv_execCall() Execute registered call from RPC server
    *
  * @call = Register RPC call   * @call = Register RPC call
  * @rpc = IN RPC call structure   * @rpc = IN RPC call structure
 * @args = IN RPC call array of rpc values * @args = IN RPC calling arguments from RPC client
  * return: -1 error, !=-1 ok   * return: -1 error, !=-1 ok
  */   */
 int  int
 rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,   rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, 
                rpc_val_t * __restrict args)                array_t * __restrict args)
 {  {
         void *dl;  
         rpc_callback_t func;          rpc_callback_t func;
         int ret;  
   
        if (!call || !rpc || !call->func_parent) {        if (!call || !rpc || !call->func_parent || !AIT_ADDR(&call->func_name)) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
                 return -1;                  return -1;
         }          }
   
        dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);        func = AIT_GET_LIKE(&call->func_name, rpc_callback_t);
        if (!dl) {        return func(call, ntohs(rpc->call_argc), args);
                rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror()); 
                return -1; 
        } 
 
        func = dlsym(dl, (char*) call->func_name); 
        if (func) 
                ret = func(call, rpc->call_argc, args); 
        else { 
                rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror()); 
                ret = -1; 
        } 
 
        dlclose(dl); 
        return ret; 
 }  }

Removed from v.1.1.1.1.2.12  
changed lines
  Added in v.1.9.2.16


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>