Diff for /libaitrpc/src/srv.c between versions 1.6 and 1.19

version 1.6, 2011/11/03 15:32:21 version 1.19, 2013/08/23 13:53:15
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 46  SUCH DAMAGE. Line 46  SUCH DAMAGE.
 #include "global.h"  #include "global.h"
   
   
static void */* SOCK_STREAM */
rpc_srv_dispatchCall(void *arg)static void *acceptClients(sched_task_t *);
 static void *closeClient(sched_task_t *);
 static void *rxPacket(sched_task_t *);
 static void *txPacket(sched_task_t *);
 
 /* SOCK_DGRAM */
 static void *freeClient(sched_task_t *);
 static void *rxUDPPacket(sched_task_t *);
 static void *txUDPPacket(sched_task_t *);
 
 /* SOCK_RAW */
 
 static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */
         { NULL, NULL, NULL, NULL }                              /* SOCK_RAW */
 };
 
 
 void
 rpc_freeCli(rpc_cli_t * __restrict c)
 {  {
        rpc_cli_t *c = arg;        rpc_srv_t *s = c->cli_parent;
        rpc_srv_t *s;
        rpc_func_t *f = NULL;        schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
        array_t *arr;
        struct tagRPCCall *rpc;        /* free buffer */
        struct tagRPCRet *rrpc;        AIT_FREE_VAL(&c->cli_buf);
        rpc_sess_t ses = { 0 };
        fd_set fds;        array_Del(s->srv_clients, c->cli_id, 0);
        u_char *buf;        if (c)
        int ret, argc = 0, Limit = 0;                e_free(c);
 }
 
 
 static inline int
 _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
 {
         rpc_cli_t *c = NULL;
         register int i;          register int i;
         uint16_t tag = 0;  
         uint32_t hash = 0;  
   
        if (!arg) {        /* check free slots for connect */
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");        for (i = 0; i < array_Size(srv->srv_clients) && 
                return NULL;                        (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
        } else                /* check for duplicates */
                s = c->cli_parent;                if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
                         break;
         if (i >= array_Size(srv->srv_clients))
                 return -1;      /* no more free slots! */
   
        buf = malloc(s->srv_netbuf);        return i;
        if (!buf) {}
                LOGERR;
 static rpc_cli_t *
 _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
 {
         rpc_cli_t *c = NULL;
         int n;
 
         n = _check4freeslot(srv, sa);
         if (n == -1)
                 return NULL;                  return NULL;
           else
                   c = array(srv->srv_clients, n, rpc_cli_t*);
   
           if (!c) {
                   c = e_malloc(sizeof(rpc_cli_t));
                   if (!c) {
                           LOGERR;
                           srv->srv_kill = 1;
                           return NULL;
                   } else {
                           memset(c, 0, sizeof(rpc_cli_t));
                           array_Set(srv->srv_clients, n, c);
                           c->cli_id = n;
                           c->cli_parent = srv;
                   }
   
                   /* alloc empty buffer */
                   AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
         }          }
   
        do {        return c;
                FD_ZERO(&fds);}
                FD_SET(c->cli_sock, &fds); 
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL); 
                if (ret == -1) { 
                        if (errno == EINTR && s->srv_kill != kill) 
                                continue; 
   
                        LOGERR;
                        ret = -2;static void *
                        break;freeClient(sched_task_t *task)
 {
         rpc_freeCli(TASK_ARG(task));
 
         return NULL;
 }
 
 static void *
 closeClient(sched_task_t *task)
 {
         int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
 
         rpc_freeCli(TASK_ARG(task));
 
         /* close client socket */
         shutdown(sock, SHUT_RDWR);
         close(sock);
         return NULL;
 }
 
 static void *
 txPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, estlen, wlen = sizeof(struct tagRPCCall);
         struct pollfd pfd;
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
 
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         /* calc estimated length */
                         estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
                         if (estlen > AIT_LEN(&c->cli_buf))
                                 AIT_RE_BUF(&c->cli_buf, estlen);
                         buf = AIT_GET_BUF(&c->cli_buf);
                         rpc = (struct tagRPCCall*) buf;
 
                         rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                         /* Go Encapsulate variables */
                         ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                         RPC_RETVARS(c));
                         /* Free return values */
                         ait_freeVars(&c->cli_vars);
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
 
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }                  }
                memset(buf, 0, s->srv_netbuf);        }
                ret = recv(c->cli_sock, buf, s->srv_netbuf, 0);
         rpc->call_len = htonl(wlen);
 
 #if 0
         /* calculate CRC */
         rpc->call_crc ^= rpc->call_crc;
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 #endif
 
         /* send reply */
         pfd.fd = TASK_FD(task);
         pfd.events = POLLOUT;
         for (; wlen > 0; wlen -= ret, buf += ret) {
                 if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         if (ret)
                                 LOGERR;
                         else
                                 rpc_SetErr(ETIMEDOUT, "Timeout reached! Server not respond");
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }
                 ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
                 if (ret == -1) {                  if (ret == -1) {
                        LOGERR;                        /* close connection */
                        ret = -3;                        schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                        break;                                        TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }                  }
                if (!ret) {             /* receive EOF */        }
                        ret = 0; 
                        break; 
                } 
                if (ret < sizeof(struct tagRPCCall)) { 
                        rpc_SetErr(ERPCMISMATCH, "Error:: too short RPC packet ...\n"); 
                        ret = -4; 
                        if (s->srv_kill != kill) 
                                continue; 
                        else 
                                break; 
                } else 
                        rpc = (struct tagRPCCall*) buf; 
                /* check RPC packet session info */ 
                if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) { 
                        rpc_SetErr(ERPCMISMATCH, "Error:: get invalid RPC session ...\n"); 
                        ret = -5; 
                        goto makeReply; 
                } else 
                        Limit = sizeof(struct tagRPCCall); 
   
                tag = rpc->call_tag;        return NULL;
                hash = rpc->call_hash;}
   
                /* RPC is OK! Go decapsulate variables ... */static void *
                if (ntohs(rpc->call_argc)) {execCall(sched_task_t *task)
                        arr = io_buffer2vars(buf + Limit, s->srv_netbuf - Limit, {
                                        ntohs(rpc->call_argc), 1);        rpc_cli_t *c = TASK_ARG(task);
                        if (!arr) {        rpc_srv_t *s = c->cli_parent;
                                ret = -5;        rpc_func_t *f = NULL;
                                goto makeReply;        array_t *arr = NULL;
                        }        u_char *buf = AIT_GET_BUF(&c->cli_buf);
                } else        struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
                        arr = NULL;        int argc = ntohs(rpc->call_argc);
   
                /* execute call */        /* Go decapsulate variables ... */
                argc = 0;        if (argc) {
                memcpy(&ses, &rpc->call_session, sizeof ses);                arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall), 
                if (!(f = rpc_srv_getCall(s, ntohs(tag), ntohl(hash)))) {                                AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
                        rpc_SetErr(EPROGUNAVAIL, "Error:: call not found into RPC server ...\n");                if (!arr) {
                        ret = -6;                        rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
                } else 
                        if ((ret = rpc_srv_execCall(f, rpc, arr)) == -1) 
                                ret = -9; 
                        else { 
                                if (arr) 
                                        io_arrayDestroy(&arr); 
                                argc = rpc_srv_getVars(f, &arr); 
                                goto makeReply;         /* Call finish OK */ 
                        } 
   
                if (arr)                        rpc->call_argc ^= rpc->call_argc;
                        io_arrayDestroy(&arr);                        rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         return NULL;
                 }
         } else
                 arr = NULL;
   
makeReply:        if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
                /* Made reply */                rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                memset(buf, 0, s->srv_netbuf); 
                rrpc = (struct tagRPCRet*) buf; 
                Limit = sizeof(struct tagRPCRet); 
   
                memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));                rpc->call_argc ^= rpc->call_argc;
                rrpc->ret_tag = tag;                rpc->call_rep.ret = RPC_ERROR(-1);
                rrpc->ret_hash = hash;                rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                rrpc->ret_errno = htonl(rpc_Errno);        } else {
                rrpc->ret_retcode = htonl(ret);                /* if client doesn't want reply */
                rrpc->ret_argc = htons(argc);                argc = RPC_CHK_NOREPLY(rpc);
                rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
                if (argc && arr) {                if (rpc->call_rep.ret == htonl(-1)) {
                        /* Go Encapsulate variables ... */                        rpc->call_rep.eno = RPC_ERROR(errno);
                        if ((i = io_vars2buffer(buf + Limit, s->srv_netbuf - Limit, arr)) == -1) {                        rpc->call_argc ^= rpc->call_argc;
                                io_clrVars(f->func_vars);                } else {
                                argc = 0;                        rpc->call_rep.eno ^= rpc->call_rep.eno;
                                ret = -7;                        if (argc) {
                                rpc_SetErr(EBADRPC, "Error:: in prepare RPC packet values (-7) ...\n");                                /* without reply */
                                goto makeReply;                                ait_freeVars(&c->cli_vars);
                                 rpc->call_argc ^= rpc->call_argc;
                         } else {                          } else {
                                Limit += i;                                /* reply */
                                rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                                io_clrVars(f->func_vars); 
                         }                          }
                 }                  }
           }
   
                ret = send(c->cli_sock, buf, Limit, 0);        array_Destroy(&arr);
                if (ret == -1) {        return NULL;
                        LOGERR;}
                        ret = -8;
                        break;static void *
                }rxPacket(sched_task_t *task)
                if (ret != Limit) {{
                        rpc_SetErr(EPROCUNAVAIL, "Error:: in send RPC request, should be send %d bytes, "        rpc_cli_t *c = TASK_ARG(task);
                                        "really is %d\n", Limit, ret);        rpc_srv_t *s = c->cli_parent;
                        ret = -9;        int len, rlen, noreply, estlen;
                        if (s->srv_kill != kill)#if 0
                                continue;        u_short crc;
 #endif
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         struct pollfd pfd;
 
         memset(buf, 0, sizeof(struct tagRPCCall));
         rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
         if (rlen < sizeof(struct tagRPCCall)) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                 TASK_ARG(task), 0, NULL, 0);
                 return NULL;
         } else {
                 estlen = ntohl(rpc->call_len);
                 if (estlen > AIT_LEN(&c->cli_buf))
                         AIT_RE_BUF(&c->cli_buf, estlen);
                 rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
                 buf = AIT_GET_BUF(&c->cli_buf);
                 len = estlen;
         }
 
         /* get next part of packet */
         memset(buf, 0, len);
         pfd.fd = TASK_FD(task);
         pfd.events = POLLIN | POLLPRI;
         for (; len > 0; len -= rlen, buf += rlen) {
                 if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         if (rlen)
                                 LOGERR;
                         else                          else
                                break;                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Server not respond");
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }                  }
        } while (ret > -1 || s->srv_kill != kill);                rlen = recv(TASK_FD(task), buf, len, 0);
                 if (rlen == -1) {
                         /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);
                         return NULL;
                 }
         }
   
        shutdown(c->cli_sock, SHUT_RDWR);#if 0
        close(c->cli_sock);        /* check integrity of packet */
        memset(c, 0, sizeof(rpc_cli_t));        crc = ntohs(rpc->call_crc);
        free(buf);        rpc->call_crc ^= rpc->call_crc;
        return (void*) (long)ret;        if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
}                rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                 return NULL;
         }
 #endif
   
           noreply = RPC_CHK_NOREPLY(rpc);
   
           /* check RPC packet session info */
           if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                   rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
   
                   rpc->call_argc ^= rpc->call_argc;
                   rpc->call_rep.ret = RPC_ERROR(-1);
                   rpc->call_rep.eno = RPC_ERROR(errno);
           } else {
                   /* execute RPC call */
                   schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
           }
   
           /* send RPC reply */
           if (!noreply)
                   schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], 
                                   TASK_ARG(task), TASK_FD(task), rpc, len);
   
           /* lets get next packet */
           schedReadSelf(task);
           return NULL;
   }
   
 static void *  static void *
rpc_srv_dispatchVars(void *arg)acceptClients(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        rpc_srv_t *srv = TASK_ARG(task);
        rpc_srv_t *s;        rpc_cli_t *c = NULL;
        rpc_blob_t *b;        socklen_t salen = sizeof(sockaddr_t);
        int ret = 0; 
        fd_set fds; 
        u_char buf[sizeof(struct tagBLOBHdr)]; 
        struct tagBLOBHdr *blob; 
   
        if (!arg) {        c = _allocClient(srv, NULL);
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");        if (!c)
                return NULL;                goto end;
 
         /* accept client */
         c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
         if (c->cli_sock == -1) {
                 LOGERR;
                 AIT_FREE_VAL(&c->cli_buf);
                 array_Del(srv->srv_clients, c->cli_id, 42);
                 goto end;
         } else          } else
                s = c->cli_parent;                fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
   
        do {        schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c, 
                /* check for disable service at this moment? */                        c->cli_sock, NULL, 0);
                if (s->srv_blob.state == disable && s->srv_kill != kill) {end:
                        usleep(100000);        schedReadSelf(task);
#ifdef HAVE_PTHREAD_YIELD        return NULL;
                        pthread_yield();}
#endif
                        continue;
 static void *
 txUDPPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, wlen = sizeof(struct tagRPCCall);
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
 
         schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
         schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                          TASK_ARG(task), ts, TASK_ARG(task), 0);
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                 if (!f) {
                         rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                         /* Go Encapsulate variables */
                         ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                                         RPC_RETVARS(c));
                         /* Free return values */
                         ait_freeVars(&c->cli_vars);
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }                  }
           }
   
                FD_ZERO(&fds);        rpc->call_len = htonl(wlen);
                FD_SET(c->cli_sock, &fds); 
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL); 
                if (ret == -1) { 
                        if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill) 
                                continue; 
   
                        LOGERR;        /* calculate CRC */
                        ret = -2;        rpc->call_crc ^= rpc->call_crc;
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, 
                         &c->cli_sa.sa, c->cli_sa.sa.sa_len);
         if (ret == -1 || ret != wlen) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                  TASK_ARG(task), 0, NULL, 0);
         }
 
         return NULL;
 }
 
 static void *
 rxUDPPacket(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         int len, rlen, noreply;
         u_short crc, off = 0;
         u_char buf[USHRT_MAX + 1];
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         sockaddr_t sa;
         socklen_t salen;
         struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
 
         /* receive connect packet */
         salen = sa.ss.ss_len = sizeof(sockaddr_t);
         rlen = recvfrom(TASK_FD(task), buf, sizeof buf, 0, &sa.sa, &salen);
         if (rlen < 1)
                 goto end;
 
         c = _allocClient(srv, &sa);
         if (!c)
                 goto end;
         else {
                 c->cli_sock = TASK_FD(task);
                 memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
                 memcpy(AIT_GET_BUF(&c->cli_buf), buf, AIT_LEN(&c->cli_buf));
                 /* armed timer for close stateless connection */
                 schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                 schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                 c, ts, c, 0);
         }
 
         do {
                 /* check RPC packet */
                 if (rlen < sizeof(struct tagRPCCall)) {
                         rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                         break;                          break;
                }                } else
                         rpc = (struct tagRPCCall*) (AIT_GET_BUF(&c->cli_buf) + off);
   
                memset(buf, 0, sizeof buf);                len = ntohl(rpc->call_len);
                ret = recv(c->cli_sock, buf, sizeof buf, 0);                rlen -= len;
                if (ret == -1) {
                        LOGERR;                /* check RPC packet lengths */
                        ret = -3;                if (rlen < 0 || len < sizeof(struct tagRPCCall)) {
                         rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length");
                         /* skip entire packet */
                         break;                          break;
                 }                  }
                /* receive EOF, disable or kill service */
                if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {                /* check integrity of packet */
                        ret = 0;                crc = ntohs(rpc->call_crc);
                        break;                rpc->call_crc ^= rpc->call_crc;
                 if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                         rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
 
                         off += len;
                         /* try next packet remaining into buffer */
                         continue;
                 }                  }
                if (ret < sizeof(struct tagBLOBHdr)) {
                        rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n");                noreply = RPC_CHK_NOREPLY(rpc);
                        ret = -4;
                        if (s->srv_kill != kill && s->srv_blob.state != kill)                /* check RPC packet session info */
                                continue;                if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                        else                        rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                                break;
                } else                        rpc->call_argc ^= rpc->call_argc;
                        blob = (struct tagBLOBHdr*) buf;                        rpc->call_rep.ret = RPC_ERROR(-1);
                /* check BLOB packet session info */                        rpc->call_rep.eno = RPC_ERROR(errno);
                if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) {                } else {
                        rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n");                        /* execute RPC call */
                        ret = -5;                        schedEvent(TASK_ROOT(task), execCall, c, 
                        goto makeReply;                                        (int) noreply, rpc, len);
                 }                  }
                 /* Go to proceed packet ... */  
                 switch (blob->hdr_cmd) {  
                         case get:  
                                 if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) {  
                                         rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",   
                                                         blob->hdr_var);  
                                         ret = -6;  
                                         break;  
                                 } else  
                                         blob->hdr_len = b->blob_len;  
   
                                if (rpc_srv_blobMap(s, b) != -1) {                /* send RPC reply */
                                        ret = rpc_srv_sendBLOB(c, b);                if (!noreply)
                                        rpc_srv_blobUnmap(b);                        schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                                } else                                        c, TASK_FD(task), rpc, len);
                                        ret = -7; 
                                break; 
                        case set: 
                                if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) { 
                                        /* set new BLOB variable for reply :) */ 
                                        blob->hdr_var = b->blob_var; 
   
                                        ret = rpc_srv_recvBLOB(c, b);                off += len;
                                        rpc_srv_blobUnmap(b);        } while (rlen > 0);
                                } else
                                        ret = -7;end:
         schedReadSelf(task);
         return NULL;
 }
 
 /* ------------------------------------------------------ */
 
 void
 rpc_freeBLOBCli(rpc_cli_t * __restrict c)
 {
         rpc_srv_t *s = c->cli_parent;
 
         schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
 
         /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);
 
         array_Del(s->srv_blob.clients, c->cli_id, 0);
         if (c)
                 e_free(c);
 }
 
 
 static void *
 closeBLOBClient(sched_task_t *task)
 {
         int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
 
         rpc_freeBLOBCli(TASK_ARG(task));
 
         /* close client socket */
         shutdown(sock, SHUT_RDWR);
         close(sock);
         return NULL;
 }
 
 static void *
 txBLOB(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         u_char *buf = AIT_GET_BUF(&c->cli_buf);
         int wlen = sizeof(struct tagBLOBHdr);
 
         /* send reply */
         wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
         if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
                 /* close blob connection */
                 schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
         }
 
         return NULL;
 }
 
 static void *
 rxBLOB(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_blob_t *b;
         struct tagBLOBHdr blob;
         int rlen;
 
         memset(&blob, 0, sizeof blob);
         rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
         if (rlen < 1) {
                 /* close blob connection */
                 schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
                 return NULL;
         }
 
         /* check BLOB packet */
         if (rlen < sizeof(struct tagBLOBHdr)) {
                 rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
 
                 schedReadSelf(task);
                 return NULL;
         }
 
         /* check RPC packet session info */
         if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                 blob.hdr_cmd = error;
                 goto end;
         }
 
         /* Go to proceed packet ... */
         switch (blob.hdr_cmd) {
                 case get:
                         if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
                                 rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
                                 blob.hdr_cmd = no;
                                 blob.hdr_ret = RPC_ERROR(-1);
                                 break;                                  break;
                        case unset:                        } else
                                ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);                                blob.hdr_len = htonl(b->blob_len);
                                if (ret == -1) 
                                        ret = -7; 
                                break; 
                        default: 
                                rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n",  
                                                blob->hdr_cmd); 
                                ret = -7; 
                } 
   
makeReply:                        if (rpc_srv_blobMap(s, b) != -1) {
                /* Replay to client! */                                /* deliver BLOB variable to client */
                blob->hdr_cmd = ret < 0 ? error : ok;                                blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
                blob->hdr_ret = ret;                                rpc_srv_blobUnmap(b);
                ret = send(c->cli_sock, buf, sizeof buf, 0);                        } else {
                if (ret == -1) {                                blob.hdr_cmd = error;
                        LOGERR;                                blob.hdr_ret = RPC_ERROR(-1);
                        ret = -8;                        }
                         break;                          break;
                }                case set:
                if (ret != sizeof buf) {                        if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len), 
                        rpc_SetErr(EPROCUNAVAIL, "Error:: in send BLOB reply, should be send %d bytes, "                                                        ntohl(blob.hdr_ret)))) {
                                        "really is %d\n", sizeof buf, ret);                                /* set new BLOB variable for reply :) */
                        ret = -9;                                blob.hdr_var = htonl(b->blob_var);
                        if (s->srv_kill != kill && s->srv_blob.state != kill) 
                                continue; 
                        else 
                                break; 
                } 
        } while (ret > -1 || s->srv_kill != kill); 
   
        shutdown(c->cli_sock, SHUT_RDWR);                                /* receive BLOB from client */
        close(c->cli_sock);                                blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
        memset(c, 0, sizeof(rpc_cli_t));                                rpc_srv_blobUnmap(b);
        return (void*) ((long)ret);                        } else {
                                 blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;
                 case unset:
                         if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
                                 blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;
                 default:
                         rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
                         blob.hdr_cmd = error;
                         blob.hdr_ret = RPC_ERROR(-1);
         }
 
 end:
         memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
         schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
         schedReadSelf(task);
         return NULL;
 }  }
   
// -------------------------------------------------static void *
 flushBLOB(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_blob_t *b, *tmp;
   
           TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                   TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
   
                   rpc_srv_blobFree(srv, b);
                   e_free(b);
           }
   
           schedSignalSelf(task);
           return NULL;
   }
   
   static void *
   acceptBLOBClients(sched_task_t *task)
   {
           rpc_srv_t *srv = TASK_ARG(task);
           rpc_cli_t *c = NULL;
           register int i;
           socklen_t salen = sizeof(sockaddr_t);
   #ifdef TCP_NOPUSH
           int n = 1;
   #endif
   
           /* check free slots for connect */
           for (i = 0; i < array_Size(srv->srv_blob.clients) && 
                           (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
           if (c)  /* no more free slots! */
                   goto end;
           c = e_malloc(sizeof(rpc_cli_t));
           if (!c) {
                   LOGERR;
                   srv->srv_kill = srv->srv_blob.kill = 1;
                   return NULL;
           } else {
                   memset(c, 0, sizeof(rpc_cli_t));
                   array_Set(srv->srv_blob.clients, i, c);
                   c->cli_id = i;
                   c->cli_parent = srv;
           }
   
           /* alloc empty buffer */
           AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
   
           /* accept client */
           c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
           if (c->cli_sock == -1) {
                   LOGERR;
                   AIT_FREE_VAL(&c->cli_buf);
                   array_Del(srv->srv_blob.clients, i, 42);
                   goto end;
           } else {
   #ifdef TCP_NOPUSH
                   setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
   #endif
                   fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
           }
   
           schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
   end:
           schedReadSelf(task);
           return NULL;
   }
   
   /* ------------------------------------------------------ */
   
 /*  /*
 * rpc_srv_initBLOBServer() Init & create BLOB Server * rpc_srv_initBLOBServer() - Init & create BLOB Server
  *
  * @srv = RPC server instance   * @srv = RPC server instance
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
  * @diskDir = Disk place for BLOB file objects   * @diskDir = Disk place for BLOB file objects
Line 345  int Line 767  int
 rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
 {  {
         int n = 1;          int n = 1;
         io_sockaddr_t sa;  
   
        if (!srv) {        if (!srv || srv->srv_kill) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");                rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
                 return -1;                  return -1;
         }          }
         if (srv->srv_blob.state) {  
                 rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");  
                 return 0;  
         }  
   
         memset(&srv->srv_blob, 0, sizeof srv->srv_blob);          memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
         if (access(diskDir, R_OK | W_OK) == -1) {          if (access(diskDir, R_OK | W_OK) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         } else          } else
                strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);                AIT_SET_STR(&srv->srv_blob.dir, diskDir);
   
        srv->srv_blob.server.cli_tid = pthread_self();        /* init blob list */
         TAILQ_INIT(&srv->srv_blob.blobs);
 
         srv->srv_blob.server.cli_parent = srv;          srv->srv_blob.server.cli_parent = srv;
   
        memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
        switch (sa.sa.sa_family) {        switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
                 case AF_INET:                  case AF_INET:
                        sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);                        srv->srv_blob.server.cli_sa.sin.sin_port = 
                                 htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
                         break;                          break;
                 case AF_INET6:                  case AF_INET6:
                        sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);                        srv->srv_blob.server.cli_sa.sin6.sin6_port = 
                                 htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
                         break;                          break;
                 case AF_LOCAL:                  case AF_LOCAL:
                        strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);                        strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", 
                                         sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
                         break;                          break;
                 default:                  default:
                           AIT_FREE_VAL(&srv->srv_blob.dir);
                         return -1;                          return -1;
         }          }
         memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);  
   
         /* create BLOB server socket */          /* create BLOB server socket */
         srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);          srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
         if (srv->srv_blob.server.cli_sock == -1) {          if (srv->srv_blob.server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         n = srv->srv_netbuf;          n = srv->srv_netbuf;
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,           if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, 
                                 srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {                                  srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
        }        } else
                 fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
   
        /* allocate pool for concurent clients */
        srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));        /* allocate pool for concurent blob clients */
         srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
         if (!srv->srv_blob.clients) {          if (!srv->srv_blob.clients) {
                LOGERR;                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
        } else        }
                memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t)); 
   
        pthread_mutex_init(&srv->srv_blob.mtx, NULL);        /* init blob scheduler */
         srv->srv_blob.root = schedBegin();
         if (!srv->srv_blob.root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 array_Destroy(&srv->srv_blob.clients);
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
   
         rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);  
   
         srv->srv_blob.state = enable;   /* enable BLOB */  
         return 0;          return 0;
 }  }
   
 /*  /*
 * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: none   * return: none
  */   */
 void  void
 rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
 {  {
        rpc_cli_t *c;        if (!srv)
        register int i; 
        rpc_blob_t *f; 
 
        if (!srv) { 
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n"); 
                 return;                  return;
         } else  
                 srv->srv_blob.state = kill;  
   
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);        srv->srv_blob.kill = 1;
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS); 
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS); 
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE); 
   
        /* close all clients connections & server socket */        schedEnd(&srv->srv_blob.root);
        for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++) 
                if (c->cli_sa.sa.sa_family) 
                        shutdown(c->cli_sock, SHUT_RDWR); 
        close(srv->srv_blob.server.cli_sock); 
 
        pthread_mutex_lock(&srv->srv_blob.mtx); 
        if (srv->srv_blob.clients) { 
                free(srv->srv_blob.clients); 
                srv->srv_blob.clients = NULL; 
        } 
 
        /* detach blobs */ 
        while ((f = srv->srv_blob.blobs)) { 
                srv->srv_blob.blobs = f->blob_next; 
                rpc_srv_blobFree(srv, f); 
                free(f); 
        } 
        pthread_mutex_unlock(&srv->srv_blob.mtx); 
 
        while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY); 
        pthread_mutex_destroy(&srv->srv_blob.mtx); 
 }  }
   
 /*  /*
 * rpc_srv_loopBLOB() Execute Main BLOB server loop and wait for clients requests * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
 {  {
         socklen_t salen = sizeof(io_sockaddr_t);  
         register int i;  
         rpc_cli_t *c;          rpc_cli_t *c;
        fd_set fds;        register int i;
        int ret;        rpc_blob_t *b, *tmp;
        struct timeval tv = { DEF_RPC_TIMEOUT, 0 };        struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
        if (!srv || srv->srv_blob.state == kill) {        if (!srv || srv->srv_kill) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
                 return -1;                  return -1;
         }          }
   
        if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {        if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
        while (srv->srv_blob.state != kill && srv->srv_kill != kill) {        schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0);
                for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)        if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
                        if (!c->cli_sa.sa.sa_family)                                srv->srv_blob.server.cli_sock, NULL, 0)) {
                                break;                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                if (i >= srv->srv_numcli) {                return -1;
#ifdef HAVE_PTHREAD_YIELD        }
                        pthread_yield(); 
#else 
                        usleep(1000000); 
#endif 
                        continue; 
                } 
   
                FD_ZERO(&fds);        schedPolling(srv->srv_blob.root, &ts, NULL);
                FD_SET(srv->srv_blob.server.cli_sock, &fds);        /* main rpc loop */
                ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);        schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
                if (ret == -1) { 
                        LOGERR; 
                        ret = 1; 
                        break; 
                } 
                if (!ret) 
                        continue; 
   
                c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);        /* detach blobs */
                if (c->cli_sock == -1) {        TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                        LOGERR;                TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
                        continue; 
                } else 
                        c->cli_parent = srv; 
   
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {                rpc_srv_blobFree(srv, b);
                        LOGERR;                e_free(b);
                        continue; 
                } else 
                        pthread_detach(c->cli_tid); 
         }          }
   
        srv->srv_blob.state = kill;        /* close all clients connections & server socket */
         for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
                 c = array(srv->srv_blob.clients, i, rpc_cli_t*);
                 if (c) {
                         shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);
   
                           schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
                           AIT_FREE_VAL(&c->cli_buf);
                   }
                   array_Del(srv->srv_blob.clients, i, 42);
           }
           array_Destroy(&srv->srv_blob.clients);
   
           close(srv->srv_blob.server.cli_sock);
   
           AIT_FREE_VAL(&srv->srv_blob.dir);
         return 0;          return 0;
 }  }
   
   
 /*  /*
 * rpc_srv_initServer() Init & create RPC Server * rpc_srv_initServer() - Init & create RPC Server
 * @regProgID = ProgramID for authentication & recognition *
 * @regProcID = ProcessID for authentication & recognition * @InstID = Instance for authentication & recognition
  * @concurentClients = Concurent clients at same time to this server   * @concurentClients = Concurent clients at same time to this server
 * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet) * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
 * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL 
  * @csHost = Host name or address for bind server, if NULL any address   * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
    * @proto = Protocol, if == 0 choose SOCK_STREAM
  * return: NULL == error or !=NULL bind and created RPC server instance   * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
 rpc_srv_t *  rpc_srv_t *
rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients, rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf, 
                int netBuf, u_short family, const char *csHost, u_short Port)                const char *csHost, u_short Port, int proto)
 {  {
         rpc_srv_t *srv = NULL;  
         int n = 1;          int n = 1;
        struct hostent *host = NULL;        rpc_srv_t *srv = NULL;
        io_sockaddr_t sa;        sockaddr_t sa = E_SOCKADDR_INIT;
   
        if (!concurentClients || !regProgID ||         if (!concurentClients || (proto < 0 || proto > SOCK_DGRAM)) {
                        (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {                rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n"); 
                 return NULL;                  return NULL;
         }          }
           if (!e_gethostbyname(csHost, Port, &sa))
                   return NULL;
         if (!Port)          if (!Port)
                 Port = RPC_DEFPORT;                  Port = RPC_DEFPORT;
        if (!netBuf)        if (!proto)
                 proto = SOCK_STREAM;
         if (netBuf < RPC_MIN_BUFSIZ)
                 netBuf = BUFSIZ;                  netBuf = BUFSIZ;
        if (csHost && family != AF_LOCAL) {        else
                host = gethostbyname2(csHost, family);                netBuf = E_ALIGN(netBuf, 2);    /* align netBuf length */
                if (!host) { 
                        rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno)); 
                        return NULL; 
                } 
        } 
        memset(&sa, 0, sizeof sa); 
        sa.sa.sa_family = family; 
        switch (family) { 
                case AF_INET: 
                        sa.sin.sin_len = sizeof(struct sockaddr_in); 
                        sa.sin.sin_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length); 
                        break; 
                case AF_INET6: 
                        sa.sin6.sin6_len = sizeof(struct sockaddr_in6); 
                        sa.sin6.sin6_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length); 
                        break; 
                case AF_LOCAL: 
                        sa.sun.sun_len = sizeof(struct sockaddr_un); 
                        if (csHost) 
                                strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path); 
                        unlink(sa.sun.sun_path); 
                        break; 
                default: 
                        rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n"); 
                        return NULL; 
        } 
   
        srv = malloc(sizeof(rpc_srv_t));#ifdef HAVE_SRANDOMDEV
         srandomdev();
 #else
         time_t tim;
 
         srandom((time(&tim) ^ getpid()));
 #endif
 
         srv = e_malloc(sizeof(rpc_srv_t));
         if (!srv) {          if (!srv) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
         } else          } else
                 memset(srv, 0, sizeof(rpc_srv_t));                  memset(srv, 0, sizeof(rpc_srv_t));
   
           srv->srv_proto = proto;
         srv->srv_netbuf = netBuf;          srv->srv_netbuf = netBuf;
         srv->srv_numcli = concurentClients;  
         srv->srv_session.sess_version = RPC_VERSION;          srv->srv_session.sess_version = RPC_VERSION;
        srv->srv_session.sess_program = regProgID;        srv->srv_session.sess_instance = InstID;
        srv->srv_session.sess_process = regProcID; 
   
         srv->srv_server.cli_tid = pthread_self();  
         srv->srv_server.cli_parent = srv;          srv->srv_server.cli_parent = srv;
        memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);        memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
   
           /* init functions */
           pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
           SLIST_INIT(&srv->srv_funcs);
           AVL_INIT(&srv->srv_funcs);
   
           /* init scheduler */
           srv->srv_root = schedBegin();
           if (!srv->srv_root) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
           /* init pool for clients */
           srv->srv_clients = array_Init(concurentClients);
           if (!srv->srv_clients) {
                   rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                   schedEnd(&srv->srv_root);
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
         /* create server socket */          /* create server socket */
        srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
         if (srv->srv_server.cli_sock == -1) {          if (srv->srv_server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                free(srv);                array_Destroy(&srv->srv_clients);
                 schedEnd(&srv->srv_root);
                 pthread_mutex_destroy(&srv->srv_funcs.mtx);
                 e_free(srv);
                 return NULL;                  return NULL;
         }          }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
         n = srv->srv_netbuf;          n = srv->srv_netbuf;
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
         if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,           if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
                                 srv->srv_server.cli_sa.sa.sa_len) == -1) {                                  srv->srv_server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
        } 
 
        /* allocate pool for concurent clients */ 
        srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t)); 
        if (!srv->srv_clients) { 
                LOGERR; 
                close(srv->srv_server.cli_sock); 
                free(srv); 
                return NULL; 
         } else          } else
                memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));                fcntl(srv->srv_server.cli_sock, F_SETFL, 
                                 fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
   
        pthread_mutex_init(&srv->srv_mtx, NULL);        rpc_register_srvPing(srv);
   
         rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);  
         return srv;          return srv;
   err:    /* error condition */
           close(srv->srv_server.cli_sock);
           array_Destroy(&srv->srv_clients);
           schedEnd(&srv->srv_root);
           pthread_mutex_destroy(&srv->srv_funcs.mtx);
           e_free(srv);
           return NULL;
 }  }
   
 /*  /*
 * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
  *
  * @psrv = RPC Server instance   * @psrv = RPC Server instance
  * return: none   * return: none
  */   */
 void  void
 rpc_srv_endServer(rpc_srv_t ** __restrict psrv)  rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
 {  {
        rpc_cli_t *c;        if (!psrv || !*psrv)
        register int i; 
        rpc_func_t *f; 
 
        if (!psrv || !*psrv) { 
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n"); 
                 return;                  return;
         }  
   
           /* if send kill to blob server */
         rpc_srv_endBLOBServer(*psrv);          rpc_srv_endBLOBServer(*psrv);
   
        /* close all clients connections & server socket */        (*psrv)->srv_kill = 1;
        for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)        sleep(RPC_SCHED_POLLING);
                if (c->cli_sa.sa.sa_family) { 
                        shutdown(c->cli_sock, SHUT_RDWR); 
                        close(c->cli_sock); 
                } 
        close((*psrv)->srv_server.cli_sock); 
   
        if ((*psrv)->srv_clients) {        schedEnd(&(*psrv)->srv_root);
                free((*psrv)->srv_clients); 
                (*psrv)->srv_clients = NULL; 
                (*psrv)->srv_numcli = 0; 
        } 
   
        /* detach exported calls */        pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
        pthread_mutex_lock(&(*psrv)->srv_mtx);        e_free(*psrv);
        while ((f = (*psrv)->srv_funcs)) { 
                (*psrv)->srv_funcs = f->func_next; 
                io_freeVars(&f->func_vars); 
                free(f); 
        } 
        pthread_mutex_unlock(&(*psrv)->srv_mtx); 
 
        while (pthread_mutex_trylock(&(*psrv)->srv_mtx) == EBUSY); 
        pthread_mutex_destroy(&(*psrv)->srv_mtx); 
 
        free(*psrv); 
         *psrv = NULL;          *psrv = NULL;
 }  }
   
 /*  /*
 * rpc_srv_loopServer() Execute Main server loop and wait for clients requests * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
 rpc_srv_loopServer(rpc_srv_t * __restrict srv)  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
         socklen_t salen = sizeof(io_sockaddr_t);  
         register int i;  
         rpc_cli_t *c;          rpc_cli_t *c;
        fd_set fds;        register int i;
        int ret;        rpc_func_t *f;
        struct timeval tv = { DEF_RPC_TIMEOUT, 0 };        struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
         if (!srv) {          if (!srv) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
                 return -1;                  return -1;
         }          }
   
        /* activate BLOB server worker if srv->srv_blob.state == enable */        if (srv->srv_proto == SOCK_STREAM)
        rpc_srv_execBLOBServer(srv);                if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
                         LOGERR;
                         return -1;
                 }
   
        if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {        if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, 
                LOGERR;                                srv->srv_server.cli_sock, NULL, 0)) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;                  return -1;
         }          }
   
        while (srv->srv_kill != kill) {        schedPolling(srv->srv_root, &ts, NULL);
                for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)        /* main rpc loop */
                        if (!c->cli_sa.sa.sa_family)        schedRun(srv->srv_root, &srv->srv_kill);
                                break; 
                if (i >= srv->srv_numcli) { 
#ifdef HAVE_PTHREAD_YIELD 
                        pthread_yield(); 
#else 
                        usleep(1000000); 
#endif 
                        continue; 
                } 
   
                FD_ZERO(&fds);        /* close all clients connections & server socket */
                FD_SET(srv->srv_server.cli_sock, &fds);        for (i = 0; i < array_Size(srv->srv_clients); i++) {
                ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);                c = array(srv->srv_clients, i, rpc_cli_t*);
                if (ret == -1) {                if (c) {
                        LOGERR;                        shutdown(c->cli_sock, SHUT_RDWR);
                        ret = 1;                        close(c->cli_sock);
                        break;
                         schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
                         ait_freeVars(&RPC_RETVARS(c));
                         AIT_FREE_VAL(&c->cli_buf);
                 }                  }
                if (!ret)                array_Del(srv->srv_clients, i, 42);
                        continue;        }
         array_Destroy(&srv->srv_clients);
   
                c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);        close(srv->srv_server.cli_sock);
                if (c->cli_sock == -1) { 
                        LOGERR; 
                        continue; 
                } else 
                        c->cli_parent = srv; 
   
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {        /* detach exported calls */
                        LOGERR;        RPC_FUNCS_LOCK(&srv->srv_funcs);
                        continue;        while ((f = SLIST_FIRST(&srv->srv_funcs))) {
                } else                SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
                        pthread_detach(c->cli_tid);
                 AIT_FREE_VAL(&f->func_name);
                 e_free(f);
         }          }
           srv->srv_funcs.avlh_root = NULL;
           RPC_FUNCS_UNLOCK(&srv->srv_funcs);
   
         return 0;          return 0;
 }  }
   
 // ---------------------------------------------------------  
   
 /*  /*
  * rpc_srv_execCall() Execute registered call from RPC server   * rpc_srv_execCall() Execute registered call from RPC server
 * @call = Register RPC call *
  * @cli = RPC client
  * @rpc = IN RPC call structure   * @rpc = IN RPC call structure
    * @funcname = Execute RPC function
  * @args = IN RPC calling arguments from RPC client   * @args = IN RPC calling arguments from RPC client
  * return: -1 error, !=-1 ok   * return: -1 error, !=-1 ok
  */   */
 int  int
rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc, 
                array_t * __restrict args)                ait_val_t funcname, array_t * __restrict args)
 {  {
         void *dl;  
         rpc_callback_t func;          rpc_callback_t func;
         int ret;  
   
        if (!call || !rpc || !call->func_parent) {        if (!cli || !rpc || !AIT_ADDR(&funcname)) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
                 return -1;                  return -1;
         }          }
   
        dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);        func = AIT_GET_LIKE(&funcname, rpc_callback_t);
        if (!dl) {        return func(cli, rpc, args);
                rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror()); 
                return -1; 
        } 
 
        func = dlsym(dl, (char*) call->func_name); 
        if (func) 
                ret = func(call, ntohs(rpc->call_argc), args); 
        else { 
                rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror()); 
                ret = -1; 
        } 
 
        dlclose(dl); 
        return ret; 
 }  }

Removed from v.1.6  
changed lines
  Added in v.1.19


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>