Diff for /libaitrpc/src/srv.c between versions 1.3.2.9 and 1.23.6.2

version 1.3.2.9, 2011/08/19 12:51:50 version 1.23.6.2, 2014/11/17 23:51:26
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011Copyright 2004 - 2014
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 46  SUCH DAMAGE. Line 46  SUCH DAMAGE.
 #include "global.h"  #include "global.h"
   
   
static void */* SOCK_STREAM */
rpc_srv_dispatchCall(void *arg)static void *acceptClients(sched_task_t *);
 static void *closeClient(sched_task_t *);
 static void *rxPacket(sched_task_t *);
 static void *txPacket(sched_task_t *);
 
 /* SOCK_DGRAM */
 static void *freeClient(sched_task_t *);
 static void *rxUDPPacket(sched_task_t *);
 static void *txUDPPacket(sched_task_t *);
 
 /* SOCK_RAW */
 
 /* SOCK_BPF */
 static void *rxBPFPacket(sched_task_t *);
 static void *txBPFPacket(sched_task_t *);
 
 static sched_task_func_t cbProto[SOCK_BPF + 1][4] = {
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */
         { NULL, NULL, NULL, NULL },                             /* SOCK_RAW */
         { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket }   /* SOCK_BPF */
 };
 
 /* Global Signal Argument when kqueue support disabled */
 
 static volatile uintptr_t _glSigArg = 0;
 
 
 void
 rpc_freeCli(rpc_cli_t * __restrict c)
 {  {
        rpc_cli_t *c = arg;        rpc_srv_t *s = c->cli_parent;
        rpc_srv_t *s;
        rpc_val_t *vals = NULL, *v = NULL;        schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
        rpc_func_t *f = NULL;
        struct tagRPCCall *rpc;        /* free buffer */
        struct tagRPCRet *rrpc;        AIT_FREE_VAL(&c->cli_buf);
        rpc_sess_t ses = { 0 };
        fd_set fds;        array_Del(s->srv_clients, c->cli_id, 0);
        u_char buf[BUFSIZ], *data;        if (c)
        int ret, argc = 0, Limit = 0;                e_free(c);
 }
 
 
 static inline int
 _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
 {
         rpc_cli_t *c = NULL;
         register int i;          register int i;
   
        if (!arg) {        /* check free slots for connect */
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");        for (i = 0; i < array_Size(srv->srv_clients) && 
                return NULL;                        (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
        } else                /* check for duplicates */
                s = c->cli_parent;                if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
                         break;
         if (i >= array_Size(srv->srv_clients))
                 return -1;      /* no more free slots! */
   
        do {        return i;
                v = NULL;}
                FD_ZERO(&fds); 
                FD_SET(c->cli_sock, &fds); 
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL); 
                if (ret == -1) { 
                        if (errno == EINTR && s->srv_kill != kill) 
                                continue; 
   
   static rpc_cli_t *
   _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
   {
           rpc_cli_t *c = NULL;
           int n;
   
           n = _check4freeslot(srv, sa);
           if (n == -1)
                   return NULL;
           else
                   c = array(srv->srv_clients, n, rpc_cli_t*);
   
           if (!c) {
                   c = e_malloc(sizeof(rpc_cli_t));
                   if (!c) {
                         LOGERR;                          LOGERR;
                        ret = -2;                        srv->srv_kill = 1;
                        break;                        return NULL;
                 } else {
                         memset(c, 0, sizeof(rpc_cli_t));
                         array_Set(srv->srv_clients, n, c);
                         c->cli_id = n;
                         c->cli_parent = srv;
                 }                  }
                memset(buf, 0, BUFSIZ);
                ret = recv(c->cli_sock, buf, BUFSIZ, 0);                /* alloc empty buffer */
                 AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
         }
 
         return c;
 }
 
 
 static void *
 freeClient(sched_task_t *task)
 {
         rpc_freeCli(TASK_ARG(task));
 
         return NULL;
 }
 
 static void *
 closeClient(sched_task_t *task)
 {
         int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
 
         rpc_freeCli(TASK_ARG(task));
 
         /* close client socket */
         shutdown(sock, SHUT_RDWR);
         close(sock);
         return NULL;
 }
 
 static void *
 txPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, estlen, wlen = sizeof(struct tagRPCCall);
         struct pollfd pfd;
 #ifdef TCP_SESSION_TIMEOUT
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
 
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
 #endif
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
 
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         /* calc estimated length */
                         estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
                         if (estlen > AIT_LEN(&c->cli_buf))
                                 AIT_RE_BUF(&c->cli_buf, estlen);
                         buf = AIT_GET_BUF(&c->cli_buf);
                         rpc = (struct tagRPCCall*) buf;
 
                         rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                         /* Go Encapsulate variables */
                         ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                         RPC_RETVARS(c));
                         /* Free return values */
                         ait_freeVars(&c->cli_vars);
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
 
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }
         }
 
         rpc->call_len = htonl(wlen);
 
 #if 0
         /* calculate CRC */
         rpc->call_crc ^= rpc->call_crc;
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 #endif
 
         /* send reply */
         pfd.fd = TASK_FD(task);
         pfd.events = POLLOUT;
         for (; wlen > 0; wlen -= ret, buf += ret) {
                 if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         if (ret)
                                 LOGERR;
                         else
                                 rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }
                 ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
                 if (ret == -1) {                  if (ret == -1) {
                        LOGERR;                        /* close connection */
                        ret = -3;                        schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                        break;                                        TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }                  }
                if (!ret) {          // receive EOF        }
                        ret = 0;
                        break;        return NULL;
 }
 
 static void *
 execCall(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         array_t *arr = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int argc = ntohs(rpc->call_argc);
 
         /* Go decapsulate variables ... */
         if (argc) {
                 arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall), 
                                 AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
                 if (!arr) {
                         rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
 
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         return NULL;
                 }                  }
                if (ret < sizeof(struct tagRPCCall)) {        } else
                        rpc_SetErr(EMSGSIZE, "Error:: too short RPC packet ...\n");                arr = NULL;
                        ret = -4; 
                        break; 
                } else 
                        rpc = (struct tagRPCCall*) buf; 
                // check RPC packet session info 
                if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) { 
                        rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n"); 
                        ret = -5; 
                        goto makeReply; 
                } else 
                        Limit = sizeof(struct tagRPCCall); 
                // RPC is OK! Go decapsulate variables ... 
                if (rpc->call_argc) { 
                        v = (rpc_val_t*) (buf + Limit); 
                        // check RPC packet length 
                        if (rpc->call_argc * sizeof(rpc_val_t) > BUFSIZ - Limit) { 
                                rpc_SetErr(EMSGSIZE, "Error:: Too big RPC packet ...\n"); 
                                ret = -5; 
                                goto makeReply; 
                        } else 
                                Limit += rpc->call_argc * sizeof(rpc_val_t); 
                        // RPC received variables types OK! 
                        data = (u_char*) v + rpc->call_argc * sizeof(rpc_val_t); 
                        for (i = 0; i < rpc->call_argc; i++) { 
                                switch (v[i].val_type) { 
                                        case buffer: 
                                                if (v[i].val_len > BUFSIZ - Limit) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: Too big RPC packet ...\n"); 
                                                        ret = -5; 
                                                        goto makeReply; 
                                                } else 
                                                        Limit += v[i].val_len; 
   
                                                v[i].val.buffer = data;        if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
                                                data += v[i].val_len;                rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                                                break; 
                                        case string: 
                                                if (v[i].val_len > BUFSIZ - Limit) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: Too big RPC packet ...\n"); 
                                                        ret = -5; 
                                                        goto makeReply; 
                                                } else 
                                                        Limit += v[i].val_len; 
   
                                                v[i].val.string = (int8_t*) data;                rpc->call_argc ^= rpc->call_argc;
                                                data += v[i].val_len;                rpc->call_rep.ret = RPC_ERROR(-1);
                                                break;                rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                                        case blob:        } else {
                                                if (s->srv_blob.state == disable) {                /* if client doesn't want reply */
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");                argc = RPC_CHK_NOREPLY(rpc);
                                                        ret = -5;                rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
                                                        goto makeReply;                if (rpc->call_rep.ret == htonl(-1)) {
                                                }                        if (!rpc->call_rep.eno) {
                                                if (s->srv_blob.state == kill) {                                LOGERR;
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is killed\n");                                rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                                                        ret = -5; 
                                                        goto makeReply; 
                                                } 
                                        default: 
                                                break; 
                                } 
                         }                          }
                           rpc->call_argc ^= rpc->call_argc;
                   } else {
                           rpc->call_rep.eno ^= rpc->call_rep.eno;
                           if (argc) {
                                   /* without reply */
                                   ait_freeVars(&c->cli_vars);
                                   rpc->call_argc ^= rpc->call_argc;
                           } else {
                                   /* reply */
                                   rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                           }
                 }                  }
           }
   
                argc = 0;        array_Destroy(&arr);
                vals = NULL;        return NULL;
                memcpy(&ses, &rpc->call_session, sizeof ses);}
                if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) {
                        rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n");static void *
                        ret = -6;rxPacket(sched_task_t *task)
                } else{
                        if ((ret = rpc_srv_execCall(f, rpc, v)) == -1)        rpc_cli_t *c = TASK_ARG(task);
                                ret = -9;        rpc_srv_t *s = c->cli_parent;
         int len, rlen, noreply, estlen;
 #if 0
         u_short crc;
 #endif
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         struct pollfd pfd;
 #ifdef TCP_SESSION_TIMEOUT
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
 
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
 #endif
 
         memset(buf, 0, sizeof(struct tagRPCCall));
         rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
         if (rlen < sizeof(struct tagRPCCall)) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
                 return NULL;
         } else {
                 estlen = ntohl(rpc->call_len);
                 if (estlen > AIT_LEN(&c->cli_buf))
                         AIT_RE_BUF(&c->cli_buf, estlen);
                 rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
                 buf = AIT_GET_BUF(&c->cli_buf);
                 len = estlen;
         }
 
         /* get next part of packet */
         memset(buf, 0, len);
         pfd.fd = TASK_FD(task);
         pfd.events = POLLIN | POLLPRI;
         for (; len > 0; len -= rlen, buf += rlen) {
                 if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         if (rlen)
                                 LOGERR;
                         else                          else
                                argc = rpc_srv_getVars(f, &vals);                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
makeReply:                        schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                memset(buf, 0, BUFSIZ);                                        TASK_ARG(task), 0, NULL, 0);
                rrpc = (struct tagRPCRet*) buf;                        return NULL;
                Limit = sizeof(struct tagRPCRet);                }
                 rlen = recv(TASK_FD(task), buf, len, 0);
                 if (rlen == -1) {
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }
         }
         len = estlen;
   
                memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));#if 0
                rrpc->ret_tag = rpc->call_tag;        /* check integrity of packet */
                rrpc->ret_hash = rpc->call_hash;        crc = ntohs(rpc->call_crc);
                rrpc->ret_errno = rpc_Errno;        rpc->call_crc ^= rpc->call_crc;
                rrpc->ret_retcode = ret;        if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                rrpc->ret_argc = argc;                rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                 return NULL;
         }
 #endif
   
                if (argc && vals) {        noreply = RPC_CHK_NOREPLY(rpc);
                        v = (rpc_val_t*) (buf + Limit); 
                        if (argc * sizeof(rpc_val_t) > BUFSIZ - Limit) { 
                                for (i = 0; i < argc; i++) 
                                        RPC_FREE_VAL(&vals[i]); 
                                rpc_srv_freeVars(f); 
                                vals = NULL; 
                                argc = 0; 
                                ret = -7; 
                                rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet values (-7) ...\n"); 
                                goto makeReply; 
                        } else 
                                Limit += argc * sizeof(rpc_val_t); 
                        memcpy(v, vals, argc * sizeof(rpc_val_t)); 
                        data = (u_char*) v + argc * sizeof(rpc_val_t); 
                        for (ret = i = 0; i < argc; i++) { 
                                switch (vals[i].val_type) { 
                                        case buffer: 
                                                if (ret || Limit + vals[i].val_len > BUFSIZ) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc->ret_retcode = ret = -7; 
                                                        rrpc->ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.buffer, vals[i].val_len);        /* check RPC packet session info */
                                                data += vals[i].val_len;        if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                                                Limit += vals[i].val_len;                rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                                                break; 
                                        case string: 
                                                if (ret || Limit + vals[i].val_len > BUFSIZ) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc->ret_retcode = ret = -7; 
                                                        rrpc->ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.string, vals[i].val_len);                rpc->call_argc ^= rpc->call_argc;
                                                data += vals[i].val_len;                rpc->call_rep.ret = RPC_ERROR(-1);
                                                Limit += vals[i].val_len;                rpc->call_rep.eno = RPC_ERROR(errno);
                                                break;        } else {
                                        case blob:                /* execute RPC call */
                                                if (s->srv_blob.state == disable) {                schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");        }
                                                        rrpc->ret_retcode = ret = -5; 
                                                        rrpc->ret_argc = 0; 
                                                        break; 
                                                } 
                                                if (s->srv_blob.state == kill) { 
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is killed\n"); 
                                                        rrpc->ret_retcode = ret = -5; 
                                                        rrpc->ret_argc = 0; 
                                                        break; 
                                                } 
                                        default: 
                                                break; 
                                } 
   
                                RPC_FREE_VAL(&vals[i]);        /* send RPC reply */
                        }        if (!noreply)
                        rpc_srv_freeVars(f);                schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], 
                        vals = NULL;                                TASK_ARG(task), TASK_FD(task), rpc, len);
                        argc = 0;
         /* lets get next packet */
         schedReadSelf(task);
         return NULL;
 }
 
 static void *
 acceptClients(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         socklen_t salen = sizeof(sockaddr_t);
         int sock;
 #ifdef TCP_SESSION_TIMEOUT
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
 #endif
 
         c = _allocClient(srv, NULL);
         if (!c) {
                 EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                 if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
                         shutdown(sock, SHUT_RDWR);
                         close(sock);
                 }                  }
                   goto end;
           }
   
                ret = send(c->cli_sock, buf, Limit, 0);        /* accept client */
         c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
         if (c->cli_sock == -1) {
                 LOGERR;
                 AIT_FREE_VAL(&c->cli_buf);
                 array_Del(srv->srv_clients, c->cli_id, 42);
                 goto end;
         } else
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
 
 #ifdef TCP_SESSION_TIMEOUT
         /* armed timer for close stateless connection */
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
         schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, 
                         ts, c, 0);
 #endif
         schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c, 
                         c->cli_sock, NULL, 0);
 end:
         schedReadSelf(task);
         return NULL;
 }
 
 
 static void *
 txUDPPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, estlen, wlen = sizeof(struct tagRPCCall);
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         struct pollfd pfd;
 
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         /* calc estimated length */
                         estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
                         if (estlen > AIT_LEN(&c->cli_buf))
                                 AIT_RE_BUF(&c->cli_buf, estlen);
                         buf = AIT_GET_BUF(&c->cli_buf);
                         rpc = (struct tagRPCCall*) buf;
 
                         rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                         /* Go Encapsulate variables */
                         ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                         RPC_RETVARS(c));
                         /* Free return values */
                         ait_freeVars(&c->cli_vars);
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }
         }
 
         rpc->call_len = htonl(wlen);
 
         /* calculate CRC */
         rpc->call_crc ^= rpc->call_crc;
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         pfd.fd = TASK_FD(task);
         pfd.events = POLLOUT;
         for (; wlen > 0; wlen -= ret, buf += ret) {
                 if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         if (ret)
                                 LOGERR;
                         else
                                 rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }
                 ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL, 
                                 &c->cli_sa.sa, c->cli_sa.sa.sa_len);
                 if (ret == -1) {                  if (ret == -1) {
                        LOGERR;                        /* close connection */
                        ret = -8;                        schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                        break;                                        TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }                  }
                if (ret != Limit) {        }
                        rpc_SetErr(ECANCELED, "Error:: in send RPC request, should be send %d bytes, "
                                        "really is %d\n", Limit, ret);        return NULL;
                        ret = -9;}
                        break;
 static void *
 rxUDPPacket(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         int len, rlen, noreply, estlen;
         u_short crc;
         u_char *buf, b[sizeof(struct tagRPCCall)];
         struct tagRPCCall *rpc = (struct tagRPCCall*) b;
         sockaddr_t sa;
         socklen_t salen;
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
         struct pollfd pfd;
 
         /* receive connect packet */
         salen = sa.ss.ss_len = sizeof(sockaddr_t);
         rlen = recvfrom(TASK_FD(task), b, sizeof b, MSG_PEEK, &sa.sa, &salen);
         if (rlen < sizeof(struct tagRPCCall)) {
                 rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                 goto end;
         }
 
         c = _allocClient(srv, &sa);
         if (!c) {
                 EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                 usleep(2000);   /* blocked client delay */
                 goto end;
         } else {
                 estlen = ntohl(rpc->call_len);
                 if (estlen > AIT_LEN(&c->cli_buf))
                         AIT_RE_BUF(&c->cli_buf, estlen);
                 rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
                 buf = AIT_GET_BUF(&c->cli_buf);
                 len = estlen;
 
                 c->cli_sock = TASK_FD(task);
                 memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
 
                 /* armed timer for close stateless connection */
                 schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                 schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, ts, c, 0);
         }
 
         /* get next part of packet */
         memset(buf, 0, len);
         pfd.fd = TASK_FD(task);
         pfd.events = POLLIN | POLLPRI;
         for (; len > 0; len -= rlen, buf += rlen) {
                 if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         if (rlen)
                                 LOGERR;
                         else
                                 rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                         c, 0, NULL, 0);
                         return NULL;
                 }                  }
        } while (ret > -1 || s->srv_kill != kill);                salen = sa.ss.ss_len = sizeof(sockaddr_t);
                 rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);
                 if (rlen == -1) {
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                         c, 0, NULL, 0);
                         return NULL;
                 }
                 if (e_addrcmp(&c->cli_sa, &sa, 42))
                         rlen ^= rlen;   /* skip if arrive from different address */
         }
         len = estlen;
   
        shutdown(c->cli_sock, SHUT_RDWR);        /* check integrity of packet */
        close(c->cli_sock);        crc = ntohs(rpc->call_crc);
        memset(c, 0, sizeof(rpc_cli_t));        rpc->call_crc ^= rpc->call_crc;
        return (void*) (long)ret;        if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                 return NULL;
         }
 
         noreply = RPC_CHK_NOREPLY(rpc);
 
         /* check RPC packet session info */
         if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
 
                 rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(errno);
         } else {
                 /* execute RPC call */
                 schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
         }
 
         /* send RPC reply */
         if (!noreply)
                 schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                 c, TASK_FD(task), rpc, len);
 end:
         schedReadSelf(task);
         return NULL;
 }  }
   
   /* ------------------------------------------------------ */
   
   void
   rpc_freeBLOBCli(rpc_cli_t * __restrict c)
   {
           rpc_srv_t *s = c->cli_parent;
   
           schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
   
           /* free buffer */
           AIT_FREE_VAL(&c->cli_buf);
   
           array_Del(s->srv_blob.clients, c->cli_id, 0);
           if (c)
                   e_free(c);
   }
   
   
 static void *  static void *
rpc_srv_dispatchVars(void *arg)closeBLOBClient(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
        rpc_srv_t *s;
         rpc_freeBLOBCli(TASK_ARG(task));
 
         /* close client socket */
         shutdown(sock, SHUT_RDWR);
         close(sock);
         return NULL;
 }
 
 static void *
 txBLOB(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         int wlen = sizeof(struct tagBLOBHdr);
 
         /* send reply */
         wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
         if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
                 /* close blob connection */
                 schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
         }
 
         return NULL;
 }
 
 static void *
 rxBLOB(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_blob_t *b;          rpc_blob_t *b;
        int ret = 0;        struct tagBLOBHdr blob;
        fd_set fds;        int rlen;
        u_char buf[sizeof(struct tagBLOBHdr)]; 
        struct tagBLOBHdr *blob; 
   
        if (!arg) {        memset(&blob, 0, sizeof blob);
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");        rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
         if (rlen < 1) {
                 /* close blob connection */
                 schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
                 return NULL;                  return NULL;
        } else        }
                s = c->cli_parent; 
   
        do {        /* check BLOB packet */
                // check for disable service at this moment?        if (rlen < sizeof(struct tagBLOBHdr)) {
                if (s->srv_blob.state == disable && s->srv_kill != kill) {                rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
                        usleep(100000); 
                        pthread_yield(); 
                        continue; 
                } 
   
                FD_ZERO(&fds);                schedReadSelf(task);
                FD_SET(c->cli_sock, &fds);                return NULL;
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);        }
                if (ret == -1) { 
                        if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill) 
                                continue; 
   
                        LOGERR;        /* check RPC packet session info */
                        ret = -2;        if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                 blob.hdr_cmd = error;
                 goto end;
         }
 
         /* Go to proceed packet ... */
         switch (blob.hdr_cmd) {
                 case get:
                         if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
                                 rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
                                 blob.hdr_cmd = no;
                                 blob.hdr_ret = RPC_ERROR(-1);
                                 break;
                         } else
                                 blob.hdr_len = htonl(b->blob_len);
 
                         if (rpc_srv_blobMap(s, b) != -1) {
                                 /* deliver BLOB variable to client */
                                 blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
                                 rpc_srv_blobUnmap(b);
                         } else {
                                 blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;                          break;
                }                case set:
                         if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len), 
                                                         ntohl(blob.hdr_ret)))) {
                                 /* set new BLOB variable for reply :) */
                                 blob.hdr_var = htonl(b->blob_var);
   
                memset(buf, 0, sizeof buf);                                /* receive BLOB from client */
                ret = recv(c->cli_sock, buf, sizeof buf, 0);                                blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
                if (ret == -1) {                                rpc_srv_blobUnmap(b);
                        LOGERR;                        } else {
                        ret = -3;                                blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;                          break;
                }                case unset:
                /* receive EOF, disable or kill service */                        if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
                if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {                                blob.hdr_cmd = error;
                        ret = 0;                                blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;                          break;
                }                default:
                if (ret < sizeof(struct tagBLOBHdr)) {                        rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
                        rpc_SetErr(EMSGSIZE, "Error:: too short BLOB packet ...\n");                        blob.hdr_cmd = error;
                        ret = -4;                        blob.hdr_ret = RPC_ERROR(-1);
                        break;        }
                } else 
                        blob = (struct tagBLOBHdr*) buf; 
                // check BLOB packet session info 
                if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) { 
                        rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n"); 
                        ret = -5; 
                        goto makeReply; 
                } 
                // Go to proceed packet ... 
                switch (blob->hdr_cmd) { 
                        case get: 
                                if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) { 
                                        rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",  
                                                        blob->hdr_var); 
                                        ret = -6; 
                                        break; 
                                } else 
                                        blob->hdr_len = b->blob_len; 
   
                                if (rpc_srv_blobMap(s, b) != -1) {end:
                                        ret = rpc_srv_sendBLOB(c, b);        memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
                                        rpc_srv_blobUnmap(b);        schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
                                } else        schedReadSelf(task);
                                        ret = -7;        return NULL;
                                break;}
                        case set: 
                                if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) { 
                                        // set new BLOB variable for reply :) 
                                        blob->hdr_var = b->blob_var; 
   
                                        ret = rpc_srv_recvBLOB(c, b);static void *
                                        rpc_srv_blobUnmap(b);flushBLOB(sched_task_t *task)
                                } else{
                                        ret = -7;        uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
                                break;        rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
                        case unset:        rpc_blob_t *b, *tmp;
                                ret = rpc_srv_unregisterBLOB(s, blob->hdr_var); 
                                if (ret == -1) 
                                        ret = -7; 
                                break; 
                        default: 
                                rpc_SetErr(EINVAL, "Error:: unsupported BLOB command (%d)...\n",  
                                                blob->hdr_cmd); 
                                ret = -7; 
                } 
   
makeReply:        TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                // Replay to client!                TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
                blob->hdr_cmd = ret < 0 ? error : ok;
                blob->hdr_ret = ret;                rpc_srv_blobFree(srv, b);
                ret = send(c->cli_sock, buf, sizeof buf, 0);                e_free(b);
                if (ret == -1) {        }
                        LOGERR;
                        ret = -8;        if (!schedSignalSelf(task)) {
                        break;                /* disabled kqueue support in libaitsched */
                 struct sigaction sa;
 
                 memset(&sa, 0, sizeof sa);
                 sigemptyset(&sa.sa_mask);
                 sa.sa_handler = (void (*)(int)) flushBLOB;
                 sa.sa_flags = SA_RESTART | SA_RESETHAND;
                 sigaction(SIGFBLOB, &sa, NULL);
         }
 
         return NULL;
 }
 
 static void *
 acceptBLOBClients(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         register int i;
         socklen_t salen = sizeof(sockaddr_t);
         int sock;
 #ifdef TCP_NOPUSH
         int n = 1;
 #endif
 
         /* check free slots for connect */
         for (i = 0; i < array_Size(srv->srv_blob.clients) && 
                         (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
         if (c) {    /* no more free slots! */
                 EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
                 if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
                         shutdown(sock, SHUT_RDWR);
                         close(sock);
                 }                  }
                if (ret != sizeof buf) {                goto end;
                        rpc_SetErr(ECANCELED, "Error:: in send BLOB reply, should be send %d bytes, "        }
                                        "really is %d\n", sizeof buf, ret); 
                        ret = -9; 
                        break; 
                } 
        } while (ret > -1 || s->srv_kill != kill); 
   
        shutdown(c->cli_sock, SHUT_RDWR);        c = e_malloc(sizeof(rpc_cli_t));
        close(c->cli_sock);        if (!c) {
        memset(c, 0, sizeof(rpc_cli_t));                LOGERR;
        return (void*) (long)ret;                srv->srv_kill = srv->srv_blob.kill = 1;
                 return NULL;
         } else {
                 memset(c, 0, sizeof(rpc_cli_t));
                 array_Set(srv->srv_blob.clients, i, c);
                 c->cli_id = i;
                 c->cli_parent = srv;
         }
 
         /* alloc empty buffer */
         AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
 
         /* accept client */
         c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
         if (c->cli_sock == -1) {
                 LOGERR;
                 AIT_FREE_VAL(&c->cli_buf);
                 array_Del(srv->srv_blob.clients, i, 42);
                 goto end;
         } else {
 #ifdef TCP_NOPUSH
                 setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
 #endif
                 fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
         }
 
         schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
 end:
         schedReadSelf(task);
         return NULL;
 }  }
   
// -------------------------------------------------/* ------------------------------------------------------ */
   
 /*  /*
 * rpc_srv_initBLOBServer() Init & create BLOB Server * rpc_srv_initBLOBServer() - Init & create BLOB Server
  *
  * @srv = RPC server instance   * @srv = RPC server instance
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
  * @diskDir = Disk place for BLOB file objects   * @diskDir = Disk place for BLOB file objects
Line 402  int Line 867  int
 rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
 {  {
         int n = 1;          int n = 1;
         struct sockaddr sa;  
         struct sockaddr_in *sin = (struct sockaddr_in*) &sa;  
         struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;  
         struct sockaddr_un *sun = (struct sockaddr_un*) &sa;  
   
        if (!srv) {        if (!srv || srv->srv_kill) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");                rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
                 return -1;                  return -1;
         }          }
         if (srv->srv_blob.state) {  
                 rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");  
                 return 0;  
         }  
         if (!Port)  
                 Port = RPC_DEFPORT + 1;  
   
         memset(&srv->srv_blob, 0, sizeof srv->srv_blob);          memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
         if (access(diskDir, R_OK | W_OK) == -1) {          if (access(diskDir, R_OK | W_OK) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         } else          } else
                strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);                AIT_SET_STR(&srv->srv_blob.dir, diskDir);
   
        srv->srv_blob.server.cli_tid = pthread_self();        /* init blob list */
         TAILQ_INIT(&srv->srv_blob.blobs);
 
         srv->srv_blob.server.cli_parent = srv;          srv->srv_blob.server.cli_parent = srv;
   
        memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
        switch (srv->srv_server.cli_sa.sa_family) {        switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
                 case AF_INET:                  case AF_INET:
                        sin->sin_port = htons(Port);                        srv->srv_blob.server.cli_sa.sin.sin_port = 
                        memcpy(&srv->srv_blob.server.cli_sa, sin, sizeof(struct sockaddr));                                htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
                         break;                          break;
                 case AF_INET6:                  case AF_INET6:
                        sin6->sin6_port = htons(Port);                        srv->srv_blob.server.cli_sa.sin6.sin6_port = 
                        memcpy(&srv->srv_blob.server.cli_sa, sin6, sizeof(struct sockaddr));                                htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
                         break;                          break;
                 case AF_LOCAL:                  case AF_LOCAL:
                        strlcat(sun->sun_path, ".blob", sizeof sun->sun_path);                        strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", 
                        memcpy(&srv->srv_blob.server.cli_sa, sun, sizeof(struct sockaddr));                                        sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
                         break;                          break;
                 default:                  default:
                           AIT_FREE_VAL(&srv->srv_blob.dir);
                         return -1;                          return -1;
         }          }
   
         /* create BLOB server socket */          /* create BLOB server socket */
        srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa_family, SOCK_STREAM, 0);        srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
         if (srv->srv_blob.server.cli_sock == -1) {          if (srv->srv_blob.server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
        if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa,         n = srv->srv_netbuf;
                                sizeof srv->srv_blob.server.cli_sa) == -1) {        if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
        if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
        /* allocate pool for concurent clients */ 
        srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t)); 
        if (!srv->srv_blob.clients) { 
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
           }
           if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, 
                                   srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
                   LOGERR;
                   close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                   return -1;
         } else          } else
                memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));                fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
   
         pthread_mutex_init(&srv->srv_blob.mtx, NULL);  
   
        pthread_mutex_lock(&srv->srv_mtx);        /* allocate pool for concurent blob clients */
        rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);        srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
        rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 0);        if (!srv->srv_blob.clients) {
        rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 0);                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
        rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 1);                close(srv->srv_blob.server.cli_sock);
        pthread_mutex_unlock(&srv->srv_mtx);                AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
   
        srv->srv_blob.state = enable;  // enable BLOB        /* init blob scheduler */
         srv->srv_blob.root = schedBegin();
         if (!srv->srv_blob.root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 array_Destroy(&srv->srv_blob.clients);
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
 
         return 0;          return 0;
 }  }
   
 /*  /*
 * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: none   * return: none
  */   */
 void  void
 rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
 {  {
        rpc_cli_t *c;        if (!srv)
        register int i; 
        rpc_blob_t *f; 
 
        if (!srv) { 
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n"); 
                 return;                  return;
         } else  
                 srv->srv_blob.state = kill;  
   
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);        srv->srv_blob.kill = 1;
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS); 
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS); 
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE); 
   
        /* close all clients connections & server socket */        schedEnd(&srv->srv_blob.root);
        for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++) 
                if (c->cli_sa.sa_family) 
                        shutdown(c->cli_sock, SHUT_RDWR); 
        close(srv->srv_blob.server.cli_sock); 
 
        if (srv->srv_blob.clients) { 
                free(srv->srv_blob.clients); 
                srv->srv_blob.clients = NULL; 
        } 
 
        /* detach blobs */ 
        pthread_mutex_lock(&srv->srv_blob.mtx); 
        while ((f = srv->srv_blob.blobs)) { 
                srv->srv_blob.blobs = f->blob_next; 
                rpc_srv_blobFree(srv, f); 
                free(f); 
        } 
        pthread_mutex_unlock(&srv->srv_blob.mtx); 
 
        while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY); 
        pthread_mutex_destroy(&srv->srv_blob.mtx); 
 }  }
   
 /*  /*
 * rpc_srv_execBLOBServer() Execute Main BLOB server loop and wait for clients requests * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
 {  {
         socklen_t salen = sizeof(struct sockaddr);  
         register int i;  
         rpc_cli_t *c;          rpc_cli_t *c;
        fd_set fds;        register int i;
        int ret;        rpc_blob_t *b, *tmp;
        struct timeval tv = { DEF_RPC_TIMEOUT, 0 };        struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
        if (!srv || srv->srv_blob.state == kill) {        if (!srv || srv->srv_kill) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
                 return -1;                  return -1;
         }          }
   
        if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {        if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
        while (srv->srv_blob.state != kill && srv->srv_kill != kill) {        if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
                for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)                /* disabled kqueue support in libaitsched */
                        if (!c->cli_sa.sa_family)                struct sigaction sa;
                                break; 
                if (i >= srv->srv_numcli) { 
                        usleep(1000000); 
                        continue; 
                } 
   
                FD_ZERO(&fds);                atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
                FD_SET(srv->srv_blob.server.cli_sock, &fds); 
                ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv); 
                if (ret == -1) { 
                        LOGERR; 
                        ret = 1; 
                        break; 
                } 
                if (!ret) 
                        continue; 
   
                c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa, &salen);                memset(&sa, 0, sizeof sa);
                if (c->cli_sock == -1) {                sigemptyset(&sa.sa_mask);
                        LOGERR;                sa.sa_handler = (void (*)(int)) flushBLOB;
                        continue;                sa.sa_flags = SA_RESTART | SA_RESETHAND;
                } else                sigaction(SIGFBLOB, &sa, NULL);
                        c->cli_parent = srv;        }
   
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {        if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
                        LOGERR;                                srv->srv_blob.server.cli_sock, NULL, 0)) {
                        continue;                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                } else                return -1;
                        pthread_detach(c->cli_tid); 
         }          }
   
        srv->srv_blob.state = disable;        schedPolling(srv->srv_blob.root, &ts, NULL);
         /* main rpc loop */
         schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
   
           /* detach blobs */
           TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                   TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
   
                   rpc_srv_blobFree(srv, b);
                   e_free(b);
           }
   
           /* close all clients connections & server socket */
           for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
                   c = array(srv->srv_blob.clients, i, rpc_cli_t*);
                   if (c) {
                           shutdown(c->cli_sock, SHUT_RDWR);
                           close(c->cli_sock);
   
                           schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
                           AIT_FREE_VAL(&c->cli_buf);
                   }
                   array_Del(srv->srv_blob.clients, i, 42);
           }
           array_Destroy(&srv->srv_blob.clients);
   
           close(srv->srv_blob.server.cli_sock);
   
           AIT_FREE_VAL(&srv->srv_blob.dir);
         return 0;          return 0;
 }  }
   
   
 /*  /*
 * rpc_srv_initServer() Init & create RPC Server * rpc_srv_initServer() - Init & create RPC Server
 * @regProgID = ProgramID for authentication & recognition *
 * @regProcID = ProcessID for authentication & recognition * @InstID = Instance for authentication & recognition
  * @concurentClients = Concurent clients at same time to this server   * @concurentClients = Concurent clients at same time to this server
 * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
  * @csHost = Host name or address for bind server, if NULL any address   * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
    * @proto = Protocol, if == 0 choose SOCK_STREAM
  * return: NULL == error or !=NULL bind and created RPC server instance   * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
 rpc_srv_t *  rpc_srv_t *
rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients, rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf, 
                u_short family, const char *csHost, u_short Port)                const char *csHost, u_short Port, int proto)
 {  {
         rpc_srv_t *srv = NULL;  
         int n = 1;          int n = 1;
        struct hostent *host = NULL;        rpc_srv_t *srv = NULL;
        struct sockaddr sa;        sockaddr_t sa = E_SOCKADDR_INIT;
        struct sockaddr_in *sin = (struct sockaddr_in*) &sa; 
        struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa; 
        struct sockaddr_un *sun = (struct sockaddr_un*) &sa; 
   
        if (!concurentClients || !regProgID ||         if (!concurentClients || (proto < 0 || proto > SOCK_DGRAM)) {
                        (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {                rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n"); 
                 return NULL;                  return NULL;
         }          }
           if (!e_gethostbyname(csHost, Port, &sa))
                   return NULL;
         if (!Port)          if (!Port)
                 Port = RPC_DEFPORT;                  Port = RPC_DEFPORT;
        if (csHost && family != AF_LOCAL) {        if (!proto)
                host = gethostbyname2(csHost, family);                proto = SOCK_STREAM;
                if (!host) {        if (netBuf < RPC_MIN_BUFSIZ)
                        rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));                netBuf = BUFSIZ;
                        return NULL;        else
                }                netBuf = E_ALIGN(netBuf, 2);    /* align netBuf length */
        } 
        memset(&sa, 0, sizeof sa); 
        sa.sa_family = family; 
        switch (family) { 
                case AF_INET: 
                        sin->sin_len = sizeof(struct sockaddr_in); 
                        sin->sin_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sin->sin_addr, host->h_addr, host->h_length); 
                        break; 
                case AF_INET6: 
                        sin6->sin6_len = sizeof(struct sockaddr_in6); 
                        sin6->sin6_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sin6->sin6_addr, host->h_addr, host->h_length); 
                        break; 
                case AF_LOCAL: 
                        sun->sun_len = sizeof(struct sockaddr_un); 
                        if (csHost) 
                                strlcpy(sun->sun_path, csHost, sizeof sun->sun_path); 
                        break; 
                default: 
                        rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n"); 
                        return NULL; 
        } 
   
        srv = malloc(sizeof(rpc_srv_t));#ifdef HAVE_SRANDOMDEV
         srandomdev();
 #else
         time_t tim;
 
         srandom((time(&tim) ^ getpid()));
 #endif
 
         srv = e_malloc(sizeof(rpc_srv_t));
         if (!srv) {          if (!srv) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
         } else          } else
                 memset(srv, 0, sizeof(rpc_srv_t));                  memset(srv, 0, sizeof(rpc_srv_t));
   
        srv->srv_numcli = concurentClients;        srv->srv_proto = proto;
         srv->srv_netbuf = netBuf;
         srv->srv_session.sess_version = RPC_VERSION;          srv->srv_session.sess_version = RPC_VERSION;
        srv->srv_session.sess_program = regProgID;        srv->srv_session.sess_instance = InstID;
        srv->srv_session.sess_process = regProcID; 
   
         srv->srv_server.cli_tid = pthread_self();  
         srv->srv_server.cli_parent = srv;          srv->srv_server.cli_parent = srv;
        switch (family) {        memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
                case AF_INET:
                        memcpy(&srv->srv_server.cli_sa, sin, sizeof srv->srv_server.cli_sa);        /* init functions */
                        break;        pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
                case AF_INET6:        SLIST_INIT(&srv->srv_funcs);
                        memcpy(&srv->srv_server.cli_sa, sin6, sizeof srv->srv_server.cli_sa);        AVL_INIT(&srv->srv_funcs);
                        break;
                case AF_LOCAL:        /* init scheduler */
                        memcpy(&srv->srv_server.cli_sa, sun, sizeof srv->srv_server.cli_sa);        srv->srv_root = schedBegin();
                        unlink(sun->sun_path);        if (!srv->srv_root) {
                        break;                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;
         }          }
   
           /* init pool for clients */
           srv->srv_clients = array_Init(concurentClients);
           if (!srv->srv_clients) {
                   rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                   schedEnd(&srv->srv_root);
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
         /* create server socket */          /* create server socket */
        srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
         if (srv->srv_server.cli_sock == -1) {          if (srv->srv_server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                free(srv);                array_Destroy(&srv->srv_clients);
                 schedEnd(&srv->srv_root);
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;                  return NULL;
         }          }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {        n = srv->srv_netbuf;
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
        /* allocate pool for concurent clients */ 
        srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t)); 
        if (!srv->srv_clients) { 
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv);        }
                return NULL;        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
                                 srv->srv_server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;
                 goto err;
         } else          } else
                memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));                fcntl(srv->srv_server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
   
        pthread_mutex_init(&srv->srv_mtx, NULL);        rpc_register_srvPing(srv);
   
         rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0);  
         return srv;          return srv;
   err:    /* error condition */
           close(srv->srv_server.cli_sock);
           array_Destroy(&srv->srv_clients);
           schedEnd(&srv->srv_root);
           pthread_mutex_destroy(&srv->srv_funcs.mtx);
           e_free(srv);
           return NULL;
 }  }
   
 /*  /*
 * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
 * @srv = RPC Server instance *
  * @psrv = RPC Server instance
  * return: none   * return: none
  */   */
 void  void
rpc_srv_endServer(rpc_srv_t * __restrict srv)rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
 {  {
           if (!psrv || !*psrv)
                   return;
   
           /* if send kill to blob server */
           rpc_srv_endBLOBServer(*psrv);
   
           (*psrv)->srv_kill = 1;
           sleep(RPC_SCHED_POLLING);
   
           schedEnd(&(*psrv)->srv_root);
   
           pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
           e_free(*psrv);
           *psrv = NULL;
   }
   
   /*
    * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
    *
    * @srv = RPC Server instance
    * return: -1 error or 0 ok, infinite loop ...
    */
   int
   rpc_srv_loopServer(rpc_srv_t * __restrict srv)
   {
         rpc_cli_t *c;          rpc_cli_t *c;
         register int i;          register int i;
         rpc_func_t *f;          rpc_func_t *f;
           struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
         if (!srv) {          if (!srv) {
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
                return;                return -1;
         }          }
   
        rpc_srv_endBLOBServer(srv);        if (srv->srv_proto == SOCK_STREAM)
                 if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
                         LOGERR;
                         return -1;
                 }
   
           if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, 
                                   srv->srv_server.cli_sock, NULL, 0)) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   return -1;
           }
   
           schedPolling(srv->srv_root, &ts, NULL);
           /* main rpc loop */
           schedRun(srv->srv_root, &srv->srv_kill);
   
         /* close all clients connections & server socket */          /* close all clients connections & server socket */
        for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)        for (i = 0; i < array_Size(srv->srv_clients); i++) {
                if (c->cli_sa.sa_family) {                c = array(srv->srv_clients, i, rpc_cli_t*);
                 if (c) {
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);                          close(c->cli_sock);
                 }  
         close(srv->srv_server.cli_sock);  
   
        if (srv->srv_clients) {                        schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
                free(srv->srv_clients);                        ait_freeVars(&RPC_RETVARS(c));
                srv->srv_clients = NULL;                        AIT_FREE_VAL(&c->cli_buf);
                srv->srv_numcli = 0;                }
                 array_Del(srv->srv_clients, i, 42);
         }          }
           array_Destroy(&srv->srv_clients);
   
           close(srv->srv_server.cli_sock);
   
         /* detach exported calls */          /* detach exported calls */
        pthread_mutex_lock(&srv->srv_mtx);        RPC_FUNCS_LOCK(&srv->srv_funcs);
        while ((f = srv->srv_funcs)) {        while ((f = SLIST_FIRST(&srv->srv_funcs))) {
                srv->srv_funcs = f->func_next;                SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
                free(f);
                 AIT_FREE_VAL(&f->func_name);
                 e_free(f);
         }          }
        pthread_mutex_unlock(&srv->srv_mtx);        srv->srv_funcs.avlh_root = NULL;
         RPC_FUNCS_UNLOCK(&srv->srv_funcs);
   
        while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY);        return 0;
        pthread_mutex_destroy(&srv->srv_mtx); 
 
        free(srv); 
        srv = NULL; 
 }  }
   
   
 /*  /*
 * rpc_srv_execServer() Execute Main server loop and wait for clients requests * rpc_srv_execCall() Execute registered call from RPC server
 * @srv = RPC Server instance *
 * return: -1 error or 0 ok, infinite loop ... * @cli = RPC client
  * @rpc = IN RPC call structure
  * @funcname = Execute RPC function
  * @args = IN RPC calling arguments from RPC client
  * return: -1 error, !=-1 ok
  */   */
 int  int
rpc_srv_execServer(rpc_srv_t * __restrict srv)rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc, 
                 ait_val_t funcname, array_t * __restrict args)
 {  {
        socklen_t salen = sizeof(struct sockaddr);        rpc_callback_t func;
        register int i; 
        rpc_cli_t *c; 
        fd_set fds; 
        int ret; 
        struct timeval tv = { DEF_RPC_TIMEOUT, 0 }; 
   
        if (!srv) {        if (!cli || !rpc || !AIT_ADDR(&funcname)) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
                 return -1;                  return -1;
         }          }
   
        if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {        func = AIT_GET_LIKE(&funcname, rpc_callback_t);
         return func(cli, rpc, args);
 }
 
 
 /*
  * rpc_srv_initServer2() - Init & create layer2 RPC Server
  *
  * @InstID = Instance for authentication & recognition
  * @concurentClients = Concurent clients at same time to this server
  * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
  * @csIface = Interface name for bind server, if NULL first interface on host
  * return: NULL == error or !=NULL bind and created RPC server instance
  */
 rpc_srv_t *
 rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
 {
         int n = 1;
         rpc_srv_t *srv = NULL;
         sockaddr_t sa = E_SOCKADDR_INIT;
         char szIface[64], szStr[STRSIZ];
         register int i;
         struct ifreq ifr;
 
         if (!concurentClients) {
                 rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                 return NULL;
         }
         if (!csIface) {
                 if (e_get1stiface(szIface, sizeof szIface))
                         return NULL;
         } else
                 strlcpy(szIface, csIface, sizeof szIface);
         if (e_getifacebyname(szIface, &sa))
                 return NULL;
 
 #ifdef HAVE_SRANDOMDEV
         srandomdev();
 #else
         time_t tim;
 
         srandom((time(&tim) ^ getpid()));
 #endif
 
         srv = e_malloc(sizeof(rpc_srv_t));
         if (!srv) {
                 LOGERR;                  LOGERR;
                return -1;                return NULL;
         } else
                 memset(srv, 0, sizeof(rpc_srv_t));
 
         srv->srv_proto = SOCK_BPF;
         srv->srv_netbuf = netBuf;
         srv->srv_session.sess_version = RPC_VERSION;
         srv->srv_session.sess_instance = InstID;
 
         srv->srv_server.cli_parent = srv;
         memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
 
         /* init functions */
         pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
         SLIST_INIT(&srv->srv_funcs);
         AVL_INIT(&srv->srv_funcs);
 
         /* init scheduler */
         srv->srv_root = schedBegin();
         if (!srv->srv_root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;
         }          }
   
        while (srv->srv_kill != kill) {        /* init pool for clients */
                for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)        srv->srv_clients = array_Init(concurentClients);
                        if (!c->cli_sa.sa_family)        if (!srv->srv_clients) {
                                break;                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                if (i >= srv->srv_numcli) {                schedEnd(&srv->srv_root);
                        usleep(1000000);                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                        continue;                e_free(srv);
                }                return NULL;
         }
   
                FD_ZERO(&fds);        /* create server handler */
                FD_SET(srv->srv_server.cli_sock, &fds);        for (i = 0; i < 10; i++) {
                ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);                memset(szStr, 0, sizeof szStr);
                if (ret == -1) {                snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
                        LOGERR;                srv->srv_server.cli_sock = open(szStr, O_RDWR);
                        ret = 1;                if (srv->srv_server.cli_sock > STDERR_FILENO)
                         break;                          break;
                }        }
                if (!ret)        if (srv->srv_server.cli_sock < 3) {
                        continue;                LOGERR;
                 array_Destroy(&srv->srv_clients);
                 schedEnd(&srv->srv_root);
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;
         }
   
                c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen);        if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
                if (c->cli_sock == -1) {                LOGERR;
                        LOGERR;                goto err;
                        continue; 
                } else 
                        c->cli_parent = srv; 
 
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) { 
                        LOGERR; 
                        continue; 
                } else 
                        pthread_detach(c->cli_tid); 
         }          }
           n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
           if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
                   LOGERR;
                   goto err;
           } else
                   srv->srv_netbuf = n;
   
        return 0;        memset(&ifr, 0, sizeof ifr);
}        strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
         if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
                 LOGERR;
                 goto err;
         } else
                 fcntl(srv->srv_server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
   
// ---------------------------------------------------------        rpc_register_srvPing(srv);
   
           return srv;
   err:    /* error condition */
           close(srv->srv_server.cli_sock);
           array_Destroy(&srv->srv_clients);
           schedEnd(&srv->srv_root);
           pthread_mutex_destroy(&srv->srv_funcs.mtx);
           e_free(srv);
           return NULL;
   }
   
 /*  /*
 * rpc_srv_execCall() Execute registered call from RPC server * rpc_srv_loopServer2() - Execute Main layer2 server loop and wait for clients requests
 * @call = Register RPC call *
 * @rpc = IN RPC call structure * @srv = RPC Server instance
 * @args = IN RPC call array of rpc values * return: -1 error or 0 ok, infinite loop ...
 * return: -1 error, !=-1 ok 
  */   */
 int  int
rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, rpc_srv_loopServer2(rpc_srv_t * __restrict srv)
                rpc_val_t * __restrict args) 
 {  {
        void *dl;        rpc_cli_t *c;
        rpc_callback_t func;        register int i;
        int ret;        rpc_func_t *f;
         struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
        if (!call || !rpc || !call->func_parent) {        if (!srv) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
                 return -1;                  return -1;
         }          }
   
        dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);        if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, 
        if (!dl) {                                srv->srv_server.cli_sock, NULL, 0)) {
                rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror());                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;                  return -1;
         }          }
   
        func = dlsym(dl, (char*) call->func_name);        schedPolling(srv->srv_root, &ts, NULL);
        if (func)        /* main rpc loop */
                ret = func(call, rpc->call_argc, args);        schedRun(srv->srv_root, &srv->srv_kill);
        else {
                rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());        /* close all clients connections & server socket */
                ret = -1;        for (i = 0; i < array_Size(srv->srv_clients); i++) {
                 c = array(srv->srv_clients, i, rpc_cli_t*);
                 if (c) {
                         schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
                         ait_freeVars(&RPC_RETVARS(c));
                         AIT_FREE_VAL(&c->cli_buf);
                 }
                 array_Del(srv->srv_clients, i, 42);
         }          }
           array_Destroy(&srv->srv_clients);
   
        dlclose(dl);        close(srv->srv_server.cli_sock);
        return ret;
         /* detach exported calls */
         RPC_FUNCS_LOCK(&srv->srv_funcs);
         while ((f = SLIST_FIRST(&srv->srv_funcs))) {
                 SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
 
                 AIT_FREE_VAL(&f->func_name);
                 e_free(f);
         }
         srv->srv_funcs.avlh_root = NULL;
         RPC_FUNCS_UNLOCK(&srv->srv_funcs);
 
         return 0;
 }  }

Removed from v.1.3.2.9  
changed lines
  Added in v.1.23.6.2


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>