Diff for /libaitrpc/src/srv.c between versions 1.4 and 1.11

version 1.4, 2011/08/29 22:37:06 version 1.11, 2012/07/22 20:44:13
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 47  SUCH DAMAGE. Line 47  SUCH DAMAGE.
   
   
 static void *  static void *
rpc_srv_dispatchCall(void *arg)closeClient(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        rpc_cli_t *c = TASK_ARG(task);
        rpc_srv_t *s;        rpc_srv_t *s = c->cli_parent;
        rpc_val_t *vals = NULL, *v = NULL;
         schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);
 
         /* close client socket */
         if (TASK_VAL(task))
                 shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);
 
         /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);
 
         io_arrayDel(s->srv_clients, c->cli_id, 0);
         if (c)
                 io_free(c);
         return NULL;
 }
 
 static void *
 txPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;          rpc_func_t *f = NULL;
           u_char buf[USHRT_MAX] = { 0 };
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           int ret, wlen = sizeof(struct tagRPCCall);
   
           /* copy RPC header */
           memcpy(buf, TASK_DATA(task), wlen);
   
           if (rpc->call_argc) {
                   f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                   if (!f) {
                           rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                           rpc->call_argc ^= rpc->call_argc;
                           rpc->call_rep.ret = RPC_ERROR(-1);
                           rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                   } else {
                           rpc->call_argc = htons(io_arraySize(RPC_RETVARS(c)));
                           /* Go Encapsulate variables */
                           ret = io_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c));
                           /* Free return values */
                           io_freeVars(&c->cli_vars);
                           if (ret == -1) {
                                   rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                   rpc->call_argc ^= rpc->call_argc;
                                   rpc->call_rep.ret = RPC_ERROR(-1);
                                   rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                           } else
                                   wlen += ret;
                   }
           }
   
           rpc->call_len = htons(wlen);
   
           /* calculate CRC */
           rpc->call_crc ^= rpc->call_crc;
           rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
   
           /* send reply */
           ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
           if (ret == -1 || ret != wlen) {
                   /* close connection */
                   schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);
           }
   
           return NULL;
   }
   
   static void *
   execCall(sched_task_t *task)
   {
           rpc_cli_t *c = TASK_ARG(task);
           rpc_srv_t *s = c->cli_parent;
           rpc_func_t *f = NULL;
           array_t *arr = NULL;
           u_char *buf = AIT_GET_BUF(&c->cli_buf) + TASK_VAL(task);
           struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
           int argc = ntohs(rpc->call_argc);
   
           /* Go decapsulate variables ... */
           if (argc) {
                   arr = io_buffer2vars(buf + sizeof(struct tagRPCCall), 
                                   AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall), 
                                   argc, 1);
                   if (!arr) {
                           rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
                           rpc->call_argc ^= rpc->call_argc;
                           rpc->call_rep.ret = RPC_ERROR(-1);
                           rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                           return NULL;
                   }
           } else
                   arr = NULL;
   
           if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
                   rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                   rpc->call_argc ^= rpc->call_argc;
                   rpc->call_rep.ret = RPC_ERROR(-1);
                   rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
           } else {
                   /* if client doesn't want reply */
                   argc = RPC_CHK_NOREPLY(rpc);
                   rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
                   if (rpc->call_rep.ret == htonl(-1)) {
                           rpc->call_rep.eno = RPC_ERROR(errno);
                           rpc->call_argc ^= rpc->call_argc;
                   } else {
                           rpc->call_rep.eno ^= rpc->call_rep.eno;
                           if (argc) {
                                   /* without reply */
                                   io_freeVars(&c->cli_vars);
                                   rpc->call_argc ^= rpc->call_argc;
                           } else {
                                   /* reply */
                                   rpc->call_argc = htons(io_arraySize(RPC_RETVARS(c)));
                           }
                   }
           }
   
           io_arrayDestroy(&arr);
           return NULL;
   }
   
   static void *
   rxPacket(sched_task_t *task)
   {
           rpc_cli_t *c = TASK_ARG(task);
           rpc_srv_t *s = c->cli_parent;
           int len, rlen, noreply;
           u_short crc, off = TASK_DATLEN(task);
           u_char *buf = AIT_GET_BUF(&c->cli_buf);
         struct tagRPCCall *rpc;          struct tagRPCCall *rpc;
         struct tagRPCRet *rrpc;  
         rpc_sess_t ses = { 0 };  
         fd_set fds;  
         u_char buf[BUFSIZ], *data;  
         int ret, argc = 0, Limit = 0;  
         register int i;  
   
        if (!arg) {        memset(buf, 0, AIT_LEN(&c->cli_buf));
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");        rlen = recv(TASK_FD(task), buf + off, AIT_LEN(&c->cli_buf) - off, 0);
         if (rlen < 1) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);
                 return NULL;                  return NULL;
        } else        } else {
                s = c->cli_parent;                rlen += off;    /* add reminded bytes from previous rxPacket, if exists! */
                 off = 0;        /* process buffer from start offset == 0 */
         }
   
         do {          do {
                v = NULL;                /* check RPC packet */
                FD_ZERO(&fds);                if (rlen < sizeof(struct tagRPCCall)) {
                FD_SET(c->cli_sock, &fds);                        rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL); 
                if (ret == -1) { 
                        if (errno == EINTR && s->srv_kill != kill) 
                                continue; 
   
                        LOGERR;                        /* reminder received previous bytes ;) */
                        ret = -2;                        schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), 
                                         TASK_FD(task), TASK_DATA(task), rlen);
                         return NULL;
                 } else
                         rpc = (struct tagRPCCall*) (buf + off);
 
                 len = ntohs(rpc->call_len);
                 rlen -= len;
 
                 /* check RPC packet lengths */
                 if (rlen < 0 || len < sizeof(struct tagRPCCall)) {
                         rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length");
                         /* skip entire packet */
                         break;                          break;
                 }                  }
                memset(buf, 0, sizeof buf);
                ret = recv(c->cli_sock, buf, sizeof buf, 0);                /* check integrity of packet */
                if (ret == -1) {                crc = ntohs(rpc->call_crc);
                        LOGERR;                rpc->call_crc ^= rpc->call_crc;
                        ret = -3;                if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                        break;                        rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
 
                         off += len;
                         /* try next packet remaining into buffer */
                         continue;
                 }                  }
                 if (!ret) {             /* receive EOF */  
                         ret = 0;  
                         break;  
                 }  
                 if (ret < sizeof(struct tagRPCCall)) {  
                         rpc_SetErr(EMSGSIZE, "Error:: too short RPC packet ...\n");  
                         ret = -4;  
                         break;  
                 } else  
                         rpc = (struct tagRPCCall*) buf;  
                 /* check RPC packet session info */  
                 if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) {  
                         rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n");  
                         ret = -5;  
                         goto makeReply;  
                 } else  
                         Limit = sizeof(struct tagRPCCall);  
                 /* RPC is OK! Go decapsulate variables ... */  
                 if (rpc->call_argc) {  
                         v = (rpc_val_t*) (buf + Limit);  
                         /* check RPC packet length */  
                         if (rpc->call_argc * sizeof(rpc_val_t) > sizeof buf - Limit) {  
                                 rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");  
                                 ret = -5;  
                                 goto makeReply;  
                         } else  
                                 Limit += rpc->call_argc * sizeof(rpc_val_t);  
                         /* RPC received variables types OK! */  
                         data = (u_char*) v + rpc->call_argc * sizeof(rpc_val_t);  
                         for (i = 0; i < rpc->call_argc; i++) {  
                                 switch (v[i].val_type) {  
                                         case buffer:  
                                                 if (v[i].val_len > sizeof buf - Limit) {  
                                                         rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n");  
                                                         ret = -5;  
                                                         goto makeReply;  
                                                 } else  
                                                         Limit += v[i].val_len;  
   
                                                v[i].val.buffer = data;                noreply = RPC_CHK_NOREPLY(rpc);
                                                data += v[i].val_len; 
                                                break; 
                                        case string: 
                                                if (v[i].val_len > sizeof buf - Limit) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: too long RPC packet ...\n"); 
                                                        ret = -5; 
                                                        goto makeReply; 
                                                } else 
                                                        Limit += v[i].val_len; 
   
                                                v[i].val.string = (int8_t*) data;                /* check RPC packet session info */
                                                data += v[i].val_len;                if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                                                break;                        rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                                        case blob:                        rpc->call_argc ^= rpc->call_argc;
                                                if (s->srv_blob.state == disable) {                        rpc->call_rep.ret = RPC_ERROR(-1);
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");                        rpc->call_rep.eno = RPC_ERROR(errno);
                                                        ret = -5;                } else {
                                                        goto makeReply;                        /* execute RPC call */
                                                }                        schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off, NULL, 0);
                                                if (s->srv_blob.state == kill) { 
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is gone.\n"); 
                                                        ret = -5; 
                                                        goto makeReply; 
                                                } 
                                        default: 
                                                break; 
                                } 
                        } 
                 }                  }
   
                /* execute call */                /* send RPC reply */
                argc = 0;                if (!noreply)
                vals = NULL;                        schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task), rpc, len);
                memcpy(&ses, &rpc->call_session, sizeof ses); 
                if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) { 
                        rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n"); 
                        ret = -6; 
                } else 
                        if ((ret = rpc_srv_execCall(f, rpc, v)) == -1) 
                                ret = -9; 
                        else 
                                argc = rpc_srv_getVars(f, &vals); 
makeReply: 
                /* made reply */ 
                memset(buf, 0, sizeof buf); 
                rrpc = (struct tagRPCRet*) buf; 
                Limit = sizeof(struct tagRPCRet); 
   
                memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));                off += len;
                rrpc->ret_tag = rpc->call_tag;        } while (rlen > 0);
                rrpc->ret_hash = rpc->call_hash; 
                rrpc->ret_errno = rpc_Errno; 
                rrpc->ret_retcode = ret; 
                rrpc->ret_argc = argc; 
   
                if (argc && vals) {        /* lets get next packet */
                        v = (rpc_val_t*) (buf + Limit);        schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), TASK_FD(task), TASK_DATA(task), 0);
                        if (argc * sizeof(rpc_val_t) > sizeof buf - Limit) {        return NULL;
                                for (i = 0; i < argc; i++)}
                                        RPC_FREE_VAL(&vals[i]); 
                                rpc_srv_freeVars(f); 
                                vals = NULL; 
                                argc = 0; 
                                ret = -7; 
                                rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet values (-7) ...\n"); 
                                goto makeReply; 
                        } else 
                                Limit += argc * sizeof(rpc_val_t); 
                        memcpy(v, vals, argc * sizeof(rpc_val_t)); 
                        data = (u_char*) v + argc * sizeof(rpc_val_t); 
                        for (ret = i = 0; i < argc; i++) { 
                                switch (vals[i].val_type) { 
                                        case buffer: 
                                                if (ret || Limit + vals[i].val_len > sizeof buf) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc->ret_retcode = ret = -7; 
                                                        rrpc->ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.buffer, vals[i].val_len);static void *
                                                data += vals[i].val_len;acceptClients(sched_task_t *task)
                                                Limit += vals[i].val_len;{
                                                break;        rpc_srv_t *srv = TASK_ARG(task);
                                        case string:        rpc_cli_t *c = NULL;
                                                if (ret || Limit + vals[i].val_len > sizeof buf) {        register int i;
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n");        socklen_t salen = sizeof(io_sockaddr_t);
                                                        rrpc->ret_retcode = ret = -7; 
                                                        rrpc->ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.string, vals[i].val_len);        /* check free slots for connect */
                                                data += vals[i].val_len;        for (i = 0; i < io_arraySize(srv->srv_clients) && 
                                                Limit += vals[i].val_len;                        (c = io_array(srv->srv_clients, i, rpc_cli_t*)); i++);
                                                break;        if (c)      /* no more free slots! */
                                        case blob:                goto end;
                                                if (s->srv_blob.state == disable) {        c = io_malloc(sizeof(rpc_cli_t));
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n");        if (!c) {
                                                        rrpc->ret_retcode = ret = -5;                LOGERR;
                                                        rrpc->ret_argc = 0;                srv->srv_kill = 1;
                                                        break;                return NULL;
                                                }        } else {
                                                if (s->srv_blob.state == kill) {                memset(c, 0, sizeof(rpc_cli_t));
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is gone.\n");                io_arraySet(srv->srv_clients, i, c);
                                                        rrpc->ret_retcode = ret = -5;                c->cli_id = i;
                                                        rrpc->ret_argc = 0;                c->cli_parent = srv;
                                                        break;        }
                                                } 
                                        default: 
                                                break; 
                                } 
   
                                RPC_FREE_VAL(&vals[i]);        /* alloc empty buffer */
                        }        AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);
                        rpc_srv_freeVars(f); 
                        vals = NULL; 
                        argc = 0; 
                } 
   
                ret = send(c->cli_sock, buf, Limit, 0);        /* accept client */
                if (ret == -1) {        c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
                        LOGERR;        if (c->cli_sock == -1) {
                        ret = -8;                LOGERR;
                        break;                AIT_FREE_VAL(&c->cli_buf);
                }                io_arrayDel(srv->srv_clients, i, 42);
                if (ret != Limit) {                goto end;
                        rpc_SetErr(ECANCELED, "Error:: in send RPC request, should be send %d bytes, "        } else
                                        "really is %d\n", Limit, ret);                fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
                        ret = -9; 
                        break; 
                } 
        } while (ret > -1 || s->srv_kill != kill); 
   
        shutdown(c->cli_sock, SHUT_RDWR);        schedRead(TASK_ROOT(task), rxPacket, c, c->cli_sock, NULL, 0);
 end:
         schedReadSelf(task);
         return NULL;
 }
 
 /* ------------------------------------------------------ */
 
 static void *
 closeBLOBClient(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
 
         schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);
 
         /* close client socket */
         if (TASK_VAL(task))
                 shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);          close(c->cli_sock);
        memset(c, 0, sizeof(rpc_cli_t));
        return (void*) (long)ret;        /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);
 
         io_arrayDel(s->srv_blob.clients, c->cli_id, 0);
         if (c)
                 io_free(c);
         return NULL;
 }  }
   
   static void *
   txBLOB(sched_task_t *task)
   {
           rpc_cli_t *c = TASK_ARG(task);
           u_char *buf = AIT_GET_BUF(&c->cli_buf);
           struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
           int wlen = sizeof(struct tagBLOBHdr);
   
           /* calculate CRC */
           blob->hdr_crc ^= blob->hdr_crc;
           blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
   
           /* send reply */
           wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
           if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
                   /* close blob connection */
                   schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
           }
   
           return NULL;
   }
   
 static void *  static void *
rpc_srv_dispatchVars(void *arg)rxBLOB(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        rpc_cli_t *c = TASK_ARG(task);
        rpc_srv_t *s;        rpc_srv_t *s = c->cli_parent;
         rpc_blob_t *b;          rpc_blob_t *b;
        int ret = 0;        struct tagBLOBHdr blob;
        fd_set fds;        int rlen;
        u_char buf[sizeof(struct tagBLOBHdr)];        u_short crc;
        struct tagBLOBHdr *blob; 
   
        if (!arg) {        memset(&blob, 0, sizeof blob);
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");        rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
         if (rlen < 1) {
                 /* close blob connection */
                 schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
                 return NULL;                  return NULL;
        } else        }
                s = c->cli_parent; 
   
        do {        /* check BLOB packet */
                /* check for disable service at this moment? */        if (rlen < sizeof(struct tagBLOBHdr)) {
                if (s->srv_blob.state == disable && s->srv_kill != kill) {                rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
                        usleep(100000); 
#ifdef HAVE_PTHREAD_YIELD 
                        pthread_yield(); 
#endif 
                        continue; 
                } 
   
                FD_ZERO(&fds);                schedReadSelf(task);
                FD_SET(c->cli_sock, &fds);                return NULL;
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);        }
                if (ret == -1) { 
                        if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill) 
                                continue; 
   
                        LOGERR;        /* check integrity of packet */
                        ret = -2;        crc = ntohs(blob.hdr_crc);
                        break;        blob.hdr_crc ^= blob.hdr_crc;
                }        if (crc != crcFletcher16((u_short*) &blob, rlen / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
   
                memset(buf, 0, sizeof buf);                schedReadSelf(task);
                ret = recv(c->cli_sock, buf, sizeof buf, 0);                return NULL;
                if (ret == -1) {        }
                        LOGERR; 
                        ret = -3; 
                        break; 
                } 
                /* receive EOF, disable or kill service */ 
                if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) { 
                        ret = 0; 
                        break; 
                } 
                if (ret < sizeof(struct tagBLOBHdr)) { 
                        rpc_SetErr(EMSGSIZE, "Error:: too short BLOB packet ...\n"); 
                        ret = -4; 
                        break; 
                } else 
                        blob = (struct tagBLOBHdr*) buf; 
                /* check BLOB packet session info */ 
                if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) { 
                        rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n"); 
                        ret = -5; 
                        goto makeReply; 
                } 
                /* Go to proceed packet ... */ 
                switch (blob->hdr_cmd) { 
                        case get: 
                                if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) { 
                                        rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",  
                                                        blob->hdr_var); 
                                        ret = -6; 
                                        break; 
                                } else 
                                        blob->hdr_len = b->blob_len; 
   
                                if (rpc_srv_blobMap(s, b) != -1) {        /* check RPC packet session info */
                                        ret = rpc_srv_sendBLOB(c, b);        if ((crc = rpc_chkPktSession(&blob.hdr_session, &s->srv_session))) {
                                        rpc_srv_blobUnmap(b);                rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                                } else                blob.hdr_cmd = error;
                                        ret = -7;                goto end;
                                break;        }
                        case set: 
                                if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) { 
                                        /* set new BLOB variable for reply :) */ 
                                        blob->hdr_var = b->blob_var; 
   
                                        ret = rpc_srv_recvBLOB(c, b);        /* Go to proceed packet ... */
                                        rpc_srv_blobUnmap(b);        switch (blob.hdr_cmd) {
                                } else                case get:
                                        ret = -7;                        if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
                                 rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
                                 blob.hdr_cmd = no;
                                 blob.hdr_ret = RPC_ERROR(-1);
                                 break;                                  break;
                        case unset:                        } else
                                ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);                                blob.hdr_len = htonl(b->blob_len);
                                if (ret == -1) 
                                        ret = -7; 
                                break; 
                        default: 
                                rpc_SetErr(EINVAL, "Error:: unsupported BLOB command (%d)...\n",  
                                                blob->hdr_cmd); 
                                ret = -7; 
                } 
   
makeReply:                        if (rpc_srv_blobMap(s, b) != -1) {
                /* Replay to client! */                                /* deliver BLOB variable to client */
                blob->hdr_cmd = ret < 0 ? error : ok;                                blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
                blob->hdr_ret = ret;                                rpc_srv_blobUnmap(b);
                ret = send(c->cli_sock, buf, sizeof buf, 0);                        } else {
                if (ret == -1) {                                blob.hdr_cmd = error;
                        LOGERR;                                blob.hdr_ret = RPC_ERROR(-1);
                        ret = -8;                        }
                         break;                          break;
                }                case set:
                if (ret != sizeof buf) {                        if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len)))) {
                        rpc_SetErr(ECANCELED, "Error:: in send BLOB reply, should be send %d bytes, "                                /* set new BLOB variable for reply :) */
                                        "really is %d\n", sizeof buf, ret);                                blob.hdr_var = htonl(b->blob_var);
                        ret = -9;
                                 /* receive BLOB from client */
                                 blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
                                 rpc_srv_blobUnmap(b);
                         } else {
                                 blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;                          break;
                }                case unset:
        } while (ret > -1 || s->srv_kill != kill);                        if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
                                 blob.hdr_cmd = error;
                                 blob.hdr_ret = RPC_ERROR(-1);
                         }
                         break;
                 default:
                         rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
                         blob.hdr_cmd = error;
                         blob.hdr_ret = RPC_ERROR(-1);
         }
   
        shutdown(c->cli_sock, SHUT_RDWR);end:
        close(c->cli_sock);        memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
        memset(c, 0, sizeof(rpc_cli_t));        schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
        return (void*) (long)ret;        schedReadSelf(task);
         return NULL;
 }  }
   
// -------------------------------------------------static void *
 acceptBLOBClients(sched_task_t *task)
 {
         rpc_srv_t *srv = TASK_ARG(task);
         rpc_cli_t *c = NULL;
         register int i;
         socklen_t salen = sizeof(io_sockaddr_t);
   
           /* check free slots for connect */
           for (i = 0; i < io_arraySize(srv->srv_blob.clients) && 
                           (c = io_array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
           if (c)  /* no more free slots! */
                   goto end;
           c = io_malloc(sizeof(rpc_cli_t));
           if (!c) {
                   LOGERR;
                   srv->srv_kill = srv->srv_blob.kill = 1;
                   return NULL;
           } else {
                   memset(c, 0, sizeof(rpc_cli_t));
                   io_arraySet(srv->srv_blob.clients, i, c);
                   c->cli_id = i;
                   c->cli_parent = srv;
           }
   
           /* alloc empty buffer */
           AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);
   
           /* accept client */
           c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
           if (c->cli_sock == -1) {
                   LOGERR;
                   AIT_FREE_VAL(&c->cli_buf);
                   io_arrayDel(srv->srv_blob.clients, i, 42);
                   goto end;
           } else
                   fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
   
           schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
   end:
           schedReadSelf(task);
           return NULL;
   }
   
   /* ------------------------------------------------------ */
   
 /*  /*
 * rpc_srv_initBLOBServer() Init & create BLOB Server * rpc_srv_initBLOBServer() - Init & create BLOB Server
  *
  * @srv = RPC server instance   * @srv = RPC server instance
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
  * @diskDir = Disk place for BLOB file objects   * @diskDir = Disk place for BLOB file objects
Line 406  int Line 497  int
 rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
 {  {
         int n = 1;          int n = 1;
         struct sockaddr sa;  
         struct sockaddr_in *sin = (struct sockaddr_in*) &sa;  
         struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa;  
         struct sockaddr_un *sun = (struct sockaddr_un*) &sa;  
   
        if (!srv) {        if (!srv || srv->srv_kill) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");                rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
                 return -1;                  return -1;
         }          }
         if (srv->srv_blob.state) {  
                 rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");  
                 return 0;  
         }  
         if (!Port)  
                 Port = RPC_DEFPORT + 1;  
   
         memset(&srv->srv_blob, 0, sizeof srv->srv_blob);          memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
         if (access(diskDir, R_OK | W_OK) == -1) {          if (access(diskDir, R_OK | W_OK) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         } else          } else
                strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);                AIT_SET_STR(&srv->srv_blob.dir, diskDir);
   
        srv->srv_blob.server.cli_tid = pthread_self();        /* init blob list */
         TAILQ_INIT(&srv->srv_blob.blobs);
 
         srv->srv_blob.server.cli_parent = srv;          srv->srv_blob.server.cli_parent = srv;
   
        memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(io_sockaddr_t));
        switch (srv->srv_server.cli_sa.sa_family) {        switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
                 case AF_INET:                  case AF_INET:
                        sin->sin_port = htons(Port);                        srv->srv_blob.server.cli_sa.sin.sin_port = 
                        memcpy(&srv->srv_blob.server.cli_sa, sin, sizeof(struct sockaddr));                                htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
                         break;                          break;
                 case AF_INET6:                  case AF_INET6:
                        sin6->sin6_port = htons(Port);                        srv->srv_blob.server.cli_sa.sin6.sin6_port = 
                        memcpy(&srv->srv_blob.server.cli_sa, sin6, sizeof(struct sockaddr));                                htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
                         break;                          break;
                 case AF_LOCAL:                  case AF_LOCAL:
                        strlcat(sun->sun_path, ".blob", sizeof sun->sun_path);                        strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", 
                        memcpy(&srv->srv_blob.server.cli_sa, sun, sizeof(struct sockaddr));                                        sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
                         break;                          break;
                 default:                  default:
                           AIT_FREE_VAL(&srv->srv_blob.dir);
                         return -1;                          return -1;
         }          }
   
         /* create BLOB server socket */          /* create BLOB server socket */
        srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa_family, SOCK_STREAM, 0);        srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
         if (srv->srv_blob.server.cli_sock == -1) {          if (srv->srv_blob.server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
        if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa,         n = srv->srv_netbuf;
                                sizeof srv->srv_blob.server.cli_sa) == -1) {        if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
           if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                   LOGERR;
                   close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                   return -1;
           }
           if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, 
                                   srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
                   LOGERR;
                   close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                   return -1;
           }
   
        /* allocate pool for concurent clients */        /* allocate pool for concurent blob clients */
        srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));        srv->srv_blob.clients = io_arrayInit(io_arraySize(srv->srv_clients));
         if (!srv->srv_blob.clients) {          if (!srv->srv_blob.clients) {
                LOGERR;                rpc_SetErr(io_GetErrno(), "%s", io_GetError());
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
        } else        }
                memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t)); 
   
        pthread_mutex_init(&srv->srv_blob.mtx, NULL);        /* init blob scheduler */
         srv->srv_blob.root = schedBegin();
         if (!srv->srv_blob.root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 io_arrayDestroy(&srv->srv_blob.clients);
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
   
         pthread_mutex_lock(&srv->srv_mtx);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 1);  
         pthread_mutex_unlock(&srv->srv_mtx);  
   
         srv->srv_blob.state = enable;   /* enable BLOB */  
         return 0;          return 0;
 }  }
   
 /*  /*
 * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: none   * return: none
  */   */
voidinline void
 rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
 {  {
        rpc_cli_t *c;        if (!srv)
        register int i; 
        rpc_blob_t *f; 
 
        if (!srv) { 
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n"); 
                 return;                  return;
         } else  
                 srv->srv_blob.state = kill;  
   
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);        srv->srv_blob.kill = 1;
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS); 
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS); 
        rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE); 
 
        /* close all clients connections & server socket */ 
        for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++) 
                if (c->cli_sa.sa_family) 
                        shutdown(c->cli_sock, SHUT_RDWR); 
        close(srv->srv_blob.server.cli_sock); 
 
        if (srv->srv_blob.clients) { 
                free(srv->srv_blob.clients); 
                srv->srv_blob.clients = NULL; 
        } 
 
        /* detach blobs */ 
        pthread_mutex_lock(&srv->srv_blob.mtx); 
        while ((f = srv->srv_blob.blobs)) { 
                srv->srv_blob.blobs = f->blob_next; 
                rpc_srv_blobFree(srv, f); 
                free(f); 
        } 
        pthread_mutex_unlock(&srv->srv_blob.mtx); 
 
        while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY); 
        pthread_mutex_destroy(&srv->srv_blob.mtx); 
 }  }
   
 /*  /*
 * rpc_srv_execBLOBServer() Execute Main BLOB server loop and wait for clients requests * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
 {  {
         socklen_t salen = sizeof(struct sockaddr);  
         register int i;  
         rpc_cli_t *c;          rpc_cli_t *c;
        fd_set fds;        register int i;
        int ret;        rpc_blob_t *b, *tmp;
        struct timeval tv = { DEF_RPC_TIMEOUT, 0 };        struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
        if (!srv || srv->srv_blob.state == kill) {        if (!srv || srv->srv_kill) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
                 return -1;                  return -1;
         }          }
   
        if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {        fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
                         fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
 
         if (listen(srv->srv_blob.server.cli_sock, io_arraySize(srv->srv_blob.clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
        while (srv->srv_blob.state != kill && srv->srv_kill != kill) {        if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
                for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)                                srv->srv_blob.server.cli_sock, NULL, 0)) {
                        if (!c->cli_sa.sa_family)                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                                break;                return -1;
                if (i >= srv->srv_numcli) {        }
                        usleep(1000000); 
                        continue; 
                } 
   
                FD_ZERO(&fds);        schedPolling(srv->srv_blob.root, &ts, NULL);
                FD_SET(srv->srv_blob.server.cli_sock, &fds);        /* main rpc loop */
                ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);        schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
                if (ret == -1) { 
                        LOGERR; 
                        ret = 1; 
                        break; 
                } 
                if (!ret) 
                        continue; 
   
                c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa, &salen);        /* close all clients connections & server socket */
                if (c->cli_sock == -1) {        for (i = 0; i < io_arraySize(srv->srv_blob.clients); i++) {
                        LOGERR;                c = io_array(srv->srv_blob.clients, i, rpc_cli_t*);
                        continue;                if (c) {
                } else                        shutdown(c->cli_sock, SHUT_RDWR);
                        c->cli_parent = srv;                        close(c->cli_sock);
   
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {                        schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
                        LOGERR;                        AIT_FREE_VAL(&c->cli_buf);
                        continue;                }
                } else                io_arrayDel(srv->srv_blob.clients, i, 42);
                        pthread_detach(c->cli_tid); 
         }          }
           io_arrayDestroy(&srv->srv_blob.clients);
   
        srv->srv_blob.state = disable;        close(srv->srv_blob.server.cli_sock);
   
           /* detach blobs */
           TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                   TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
   
                   rpc_srv_blobFree(srv, b);
                   io_free(b);
           }
   
           schedEnd(&srv->srv_blob.root);
           AIT_FREE_VAL(&srv->srv_blob.dir);
         return 0;          return 0;
 }  }
   
   
 /*  /*
 * rpc_srv_initServer() Init & create RPC Server * rpc_srv_initServer() - Init & create RPC Server
  *
  * @regProgID = ProgramID for authentication & recognition   * @regProgID = ProgramID for authentication & recognition
  * @regProcID = ProcessID for authentication & recognition   * @regProcID = ProcessID for authentication & recognition
  * @concurentClients = Concurent clients at same time to this server   * @concurentClients = Concurent clients at same time to this server
 * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
  * @csHost = Host name or address for bind server, if NULL any address   * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
  * return: NULL == error or !=NULL bind and created RPC server instance   * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
 rpc_srv_t *  rpc_srv_t *
rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients, rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients, 
                u_short family, const char *csHost, u_short Port)                int netBuf, const char *csHost, u_short Port)
 {  {
         rpc_srv_t *srv = NULL;  
         int n = 1;          int n = 1;
        struct hostent *host = NULL;        rpc_srv_t *srv = NULL;
        struct sockaddr sa;        io_sockaddr_t sa = IO_SOCKADDR_INIT;
        struct sockaddr_in *sin = (struct sockaddr_in*) &sa; 
        struct sockaddr_in6 *sin6 = (struct sockaddr_in6*) &sa; 
        struct sockaddr_un *sun = (struct sockaddr_un*) &sa; 
   
        if (!concurentClients || !regProgID ||         if (!concurentClients || !regProgID) {
                        (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {                rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n"); 
                 return NULL;                  return NULL;
         }          }
           if (!io_gethostbyname(csHost, Port, &sa))
                   return NULL;
         if (!Port)          if (!Port)
                 Port = RPC_DEFPORT;                  Port = RPC_DEFPORT;
        if (csHost && family != AF_LOCAL) {        if (netBuf < RPC_MIN_BUFSIZ)
                host = gethostbyname2(csHost, family);                netBuf = BUFSIZ;
                if (!host) {        else
                        rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));                netBuf = io_align(netBuf, 1);   /* align netBuf length */
                        return NULL; 
                } 
        } 
        memset(&sa, 0, sizeof sa); 
        sa.sa_family = family; 
        switch (family) { 
                case AF_INET: 
                        sin->sin_len = sizeof(struct sockaddr_in); 
                        sin->sin_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sin->sin_addr, host->h_addr, host->h_length); 
                        break; 
                case AF_INET6: 
                        sin6->sin6_len = sizeof(struct sockaddr_in6); 
                        sin6->sin6_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sin6->sin6_addr, host->h_addr, host->h_length); 
                        break; 
                case AF_LOCAL: 
                        sun->sun_len = sizeof(struct sockaddr_un); 
                        if (csHost) 
                                strlcpy(sun->sun_path, csHost, sizeof sun->sun_path); 
                        break; 
                default: 
                        rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n"); 
                        return NULL; 
        } 
   
        srv = malloc(sizeof(rpc_srv_t));#ifdef HAVE_SRANDOMDEV
         srandomdev();
 #else
         time_t tim;
 
         srandom((time(&tim) ^ getpid()));
 #endif
 
         srv = io_malloc(sizeof(rpc_srv_t));
         if (!srv) {          if (!srv) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
         } else          } else
                 memset(srv, 0, sizeof(rpc_srv_t));                  memset(srv, 0, sizeof(rpc_srv_t));
   
        srv->srv_numcli = concurentClients;        srv->srv_netbuf = netBuf;
         srv->srv_session.sess_version = RPC_VERSION;          srv->srv_session.sess_version = RPC_VERSION;
         srv->srv_session.sess_program = regProgID;          srv->srv_session.sess_program = regProgID;
         srv->srv_session.sess_process = regProcID;          srv->srv_session.sess_process = regProcID;
   
         srv->srv_server.cli_tid = pthread_self();  
         srv->srv_server.cli_parent = srv;          srv->srv_server.cli_parent = srv;
        switch (family) {        memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
                case AF_INET:
                        memcpy(&srv->srv_server.cli_sa, sin, sizeof srv->srv_server.cli_sa);        /* init functions list */
                        break;        TAILQ_INIT(&srv->srv_funcs);
                case AF_INET6:
                        memcpy(&srv->srv_server.cli_sa, sin6, sizeof srv->srv_server.cli_sa);        /* init scheduler */
                        break;        srv->srv_root = schedBegin();
                case AF_LOCAL:        if (!srv->srv_root) {
                        memcpy(&srv->srv_server.cli_sa, sun, sizeof srv->srv_server.cli_sa);                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                        unlink(sun->sun_path);                io_free(srv);
                        break;                return NULL;
         }          }
   
           /* init pool for clients */
           srv->srv_clients = io_arrayInit(concurentClients);
           if (!srv->srv_clients) {
                   rpc_SetErr(io_GetErrno(), "%s", io_GetError());
                   schedEnd(&srv->srv_root);
                   io_free(srv);
                   return NULL;
           }
   
         /* create server socket */          /* create server socket */
        srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
         if (srv->srv_server.cli_sock == -1) {          if (srv->srv_server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                free(srv);                io_arrayDestroy(&srv->srv_clients);
                 schedEnd(&srv->srv_root);
                 io_free(srv);
                 return NULL;                  return NULL;
         }          }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {        n = srv->srv_netbuf;
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
        /* allocate pool for concurent clients */ 
        srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t)); 
        if (!srv->srv_clients) { 
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv);        }
                return NULL;        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
        } else                                srv->srv_server.cli_sa.sa.sa_len) == -1) {
                memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));                LOGERR;
                 goto err;
         }
   
        pthread_mutex_init(&srv->srv_mtx, NULL);        rpc_register_srvPing(srv);
   
         rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0);  
         return srv;          return srv;
   err:    /* error condition */
           close(srv->srv_server.cli_sock);
           io_arrayDestroy(&srv->srv_clients);
           schedEnd(&srv->srv_root);
           io_free(srv);
           return NULL;
 }  }
   
 /*  /*
 * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
 * @srv = RPC Server instance *
  * @psrv = RPC Server instance
  * return: none   * return: none
  */   */
voidinline void
rpc_srv_endServer(rpc_srv_t * __restrict srv)rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
 {  {
        rpc_cli_t *c;        if (!psrv || !*psrv)
        register int i; 
        rpc_func_t *f; 
 
        if (!srv) { 
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n"); 
                 return;                  return;
         }  
   
        rpc_srv_endBLOBServer(srv);        /* if send kill to blob server */
         if (!(*psrv)->srv_blob.kill)
                 rpc_srv_endBLOBServer(*psrv);
   
        /* close all clients connections & server socket */        (*psrv)->srv_kill = 1;
        for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)        sleep(RPC_SCHED_POLLING);
                if (c->cli_sa.sa_family) { 
                        shutdown(c->cli_sock, SHUT_RDWR); 
                        close(c->cli_sock); 
                } 
        close(srv->srv_server.cli_sock); 
   
        if (srv->srv_clients) {        io_free(*psrv);
                free(srv->srv_clients);        *psrv = NULL;
                srv->srv_clients = NULL; 
                srv->srv_numcli = 0; 
        } 
 
        /* detach exported calls */ 
        pthread_mutex_lock(&srv->srv_mtx); 
        while ((f = srv->srv_funcs)) { 
                srv->srv_funcs = f->func_next; 
                free(f); 
        } 
        pthread_mutex_unlock(&srv->srv_mtx); 
 
        while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY); 
        pthread_mutex_destroy(&srv->srv_mtx); 
 
        free(srv); 
        srv = NULL; 
 }  }
   
 /*  /*
 * rpc_srv_execServer() Execute Main server loop and wait for clients requests * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_execServer(rpc_srv_t * __restrict srv)rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
         socklen_t salen = sizeof(struct sockaddr);  
         register int i;  
         rpc_cli_t *c;          rpc_cli_t *c;
        fd_set fds;        register int i;
        int ret;        rpc_func_t *f, *tmp;
        struct timeval tv = { DEF_RPC_TIMEOUT, 0 };        struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
         if (!srv) {          if (!srv) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
                 return -1;                  return -1;
         }          }
   
        if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {        fcntl(srv->srv_server.cli_sock, F_SETFL, 
                         fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
 
         if (listen(srv->srv_server.cli_sock, io_arraySize(srv->srv_clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
        while (srv->srv_kill != kill) {        if (!schedRead(srv->srv_root, acceptClients, srv, srv->srv_server.cli_sock, NULL, 0)) {
                for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                        if (!c->cli_sa.sa_family)                return -1;
                                break;        }
                if (i >= srv->srv_numcli) { 
                        usleep(1000000); 
                        continue; 
                } 
   
                FD_ZERO(&fds);        schedPolling(srv->srv_root, &ts, NULL);
                FD_SET(srv->srv_server.cli_sock, &fds);        /* main rpc loop */
                ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv);        schedRun(srv->srv_root, &srv->srv_kill);
                if (ret == -1) {
                        LOGERR;        /* close all clients connections & server socket */
                        ret = 1;        for (i = 0; i < io_arraySize(srv->srv_clients); i++) {
                        break;                c = io_array(srv->srv_clients, i, rpc_cli_t*);
                 if (c) {
                         shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);
 
                         schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
                         io_freeVars(&RPC_RETVARS(c));
                         AIT_FREE_VAL(&c->cli_buf);
                 }                  }
                if (!ret)                io_arrayDel(srv->srv_clients, i, 42);
                        continue;        }
         io_arrayDestroy(&srv->srv_clients);
   
                c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen);        close(srv->srv_server.cli_sock);
                if (c->cli_sock == -1) { 
                        LOGERR; 
                        continue; 
                } else 
                        c->cli_parent = srv; 
   
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {        /* detach exported calls */
                        LOGERR;        TAILQ_FOREACH_SAFE(f, &srv->srv_funcs, func_node, tmp) {
                        continue;                TAILQ_REMOVE(&srv->srv_funcs, f, func_node);
                } else
                        pthread_detach(c->cli_tid);                AIT_FREE_VAL(&f->func_name);
                 io_free(f);
         }          }
   
           schedEnd(&srv->srv_root);
         return 0;          return 0;
 }  }
   
 // ---------------------------------------------------------  
   
 /*  /*
  * rpc_srv_execCall() Execute registered call from RPC server   * rpc_srv_execCall() Execute registered call from RPC server
 * @call = Register RPC call *
  * @cli = RPC client
  * @rpc = IN RPC call structure   * @rpc = IN RPC call structure
 * @args = IN RPC call array of rpc values * @funcname = Execute RPC function
  * @args = IN RPC calling arguments from RPC client
  * return: -1 error, !=-1 ok   * return: -1 error, !=-1 ok
  */   */
 int  int
rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc, 
                rpc_val_t * __restrict args)                ait_val_t funcname, array_t * __restrict args)
 {  {
         void *dl;  
         rpc_callback_t func;          rpc_callback_t func;
         int ret;  
   
        if (!call || !rpc || !call->func_parent) {        if (!cli || !rpc || !AIT_ADDR(&funcname)) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
                 return -1;                  return -1;
         }          }
   
        dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);        func = AIT_GET_LIKE(&funcname, rpc_callback_t);
        if (!dl) {        return func(cli, rpc, args);
                rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror()); 
                return -1; 
        } 
 
        func = dlsym(dl, (char*) call->func_name); 
        if (func) 
                ret = func(call, rpc->call_argc, args); 
        else { 
                rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror()); 
                ret = -1; 
        } 
 
        dlclose(dl); 
        return ret; 
 }  }

Removed from v.1.4  
changed lines
  Added in v.1.11


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>