Diff for /libaitrpc/src/srv.c between versions 1.5.2.11 and 1.9.2.12

version 1.5.2.11, 2011/11/03 15:22:47 version 1.9.2.12, 2012/05/16 09:02:48
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 47  SUCH DAMAGE. Line 47  SUCH DAMAGE.
   
   
 static void *  static void *
rpc_srv_dispatchCall(void *arg)closeClient(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        rpc_cli_t *c = TASK_ARG(task);
        rpc_srv_t *s;        rpc_srv_t *s = c->cli_parent;
 
         schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);
 
         /* close client socket */
         if (TASK_VAL(task))
                 shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);
 
         /* free buffer */
         AIT_FREE_VAL(&c->cli_buf);
 
         io_arrayDel(s->srv_clients, c->cli_id, 42);
         return NULL;
 }
 
 static void *
 txPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;          rpc_func_t *f = NULL;
        array_t *arr;        u_char buf[USHRT_MAX] = { 0 };
        struct tagRPCCall *rpc;        struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
        struct tagRPCRet *rrpc;        int ret, wlen = sizeof(struct tagRPCCall);
        rpc_sess_t ses = { 0 }; 
        fd_set fds; 
        u_char *buf; 
        int ret, argc = 0, Limit = 0; 
        register int i; 
        uint16_t tag = 0; 
        uint32_t hash = 0; 
   
        if (!arg) {        /* copy RPC header */
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");        memcpy(buf, TASK_DATA(task), wlen);
                return NULL; 
        } else 
                s = c->cli_parent; 
   
        buf = malloc(s->srv_netbuf);        if (rpc->call_argc) {
        if (!buf) {                f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                LOGERR;                if (!f) {
                return NULL;                        rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         rpc->call_argc = htons(io_arraySize(f->func_vars));
                         /* Go Encapsulate variables */
                         ret = io_vars2buffer(buf + wlen, sizeof buf - wlen, f->func_vars);
                         /* Free return values */
                         io_clrVars(f->func_vars);
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }
         }          }
   
        do {        rpc->call_len = htons(wlen);
                FD_ZERO(&fds); 
                FD_SET(c->cli_sock, &fds); 
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL); 
                if (ret == -1) { 
                        if (errno == EINTR && s->srv_kill != kill) 
                                continue; 
   
                        LOGERR;        /* calculate CRC */
                        ret = -2;        rpc->call_crc ^= rpc->call_crc;
                        break;        rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
         if (ret == -1 || ret != wlen) {
                 /* close connection */
                 schedEvent(TASK_ROOT(task), closeClient, c, 42, NULL, 0);
         }
 
         return NULL;
 }
 
 static void *
 execCall(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         array_t *arr = NULL;
         u_char *buf = AIT_GET_BUF(&c->cli_buf) + TASK_VAL(task);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int argc = ntohs(rpc->call_argc);
 
         /* Go decapsulate variables ... */
         if (argc) {
                 arr = io_buffer2vars(buf + sizeof(struct tagRPCCall), 
                                 AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall), 
                                 argc, 1);
                 if (!arr) {
                         rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         return NULL;
                 }                  }
                memset(buf, 0, s->srv_netbuf);        } else
                ret = recv(c->cli_sock, buf, s->srv_netbuf, 0);                arr = NULL;
                if (ret == -1) {
                        LOGERR;        if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
                        ret = -3;                rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                        break;                rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
         } else {
                 /* if client doesn't want reply */
                 argc = rpc->call_req.flags & RPC_NOREPLY;
                 rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
                 if (rpc->call_rep.ret == htonl(-1)) {
                         rpc->call_rep.eno = RPC_ERROR(errno);
                         rpc->call_argc ^= rpc->call_argc;
                 } else {
                         rpc->call_rep.eno ^= rpc->call_rep.eno;
                         if (argc) {
                                 /* without reply */
                                 io_clrVars(f->func_vars);
                                 rpc->call_argc ^= rpc->call_argc;
                         } else {
                                 /* reply */
                                 rpc->call_argc = htons(io_arraySize(f->func_vars));
                         }
                 }                  }
                if (!ret) {             /* receive EOF */        }
                        ret = 0; 
                        break; 
                } 
                if (ret < sizeof(struct tagRPCCall)) { 
                        rpc_SetErr(ERPCMISMATCH, "Error:: too short RPC packet ...\n"); 
                        ret = -4; 
                        if (s->srv_kill != kill) 
                                continue; 
                        else 
                                break; 
                } else 
                        rpc = (struct tagRPCCall*) buf; 
                /* check RPC packet session info */ 
                if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) { 
                        rpc_SetErr(ERPCMISMATCH, "Error:: get invalid RPC session ...\n"); 
                        ret = -5; 
                        goto makeReply; 
                } else 
                        Limit = sizeof(struct tagRPCCall); 
   
                tag = rpc->call_tag;        io_arrayDestroy(&arr);
                hash = rpc->call_hash;        return NULL;
 }
   
                /* RPC is OK! Go decapsulate variables ... */static void *
                if (ntohs(rpc->call_argc)) {rxPacket(sched_task_t *task)
                        arr = io_buffer2vars(buf + Limit, s->srv_netbuf - Limit, {
                                        ntohs(rpc->call_argc), 1);        rpc_cli_t *c = TASK_ARG(task);
                        if (!arr) {        rpc_srv_t *s = c->cli_parent;
                                ret = -5;        u_char *buf = AIT_GET_BUF(&c->cli_buf);
                                goto makeReply;        int len, rlen, noreply;
                        }        u_short crc, off = 0;
                } else        struct tagRPCCall *rpc;
                        arr = NULL; 
   
                /* execute call */        memset(buf, 0, AIT_LEN(&c->cli_buf));
                argc = 0;        rlen = recv(TASK_FD(task), buf, AIT_LEN(&c->cli_buf), 0);
                memcpy(&ses, &rpc->call_session, sizeof ses);        if (rlen < 1) {
                if (!(f = rpc_srv_getCall(s, ntohs(tag), ntohl(hash)))) {                /* close connection */
                        rpc_SetErr(EPROGUNAVAIL, "Error:: call not found into RPC server ...\n");                schedEvent(TASK_ROOT(task), closeClient, c, 0, NULL, 0);
                        ret = -6;                return NULL;
                } else        }
                        if ((ret = rpc_srv_execCall(f, rpc, arr)) == -1) 
                                ret = -9; 
                        else { 
                                if (arr) 
                                        io_arrayDestroy(&arr); 
                                argc = rpc_srv_getVars(f, &arr); 
                                goto makeReply;         /* Call finish OK */ 
                        } 
   
                if (arr)        do {
                        io_arrayDestroy(&arr);                /* check RPC packet */
                 if (rlen < sizeof(struct tagRPCCall)) {
                         rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");
   
makeReply:                        schedReadSelf(task);
                /* Made reply */                        return NULL;
                memset(buf, 0, s->srv_netbuf);                } else
                rrpc = (struct tagRPCRet*) buf;                        rpc = (struct tagRPCCall*) (buf + off);
                Limit = sizeof(struct tagRPCRet); 
   
                memcpy(&rrpc->ret_session, &ses, sizeof(rpc_sess_t));                len = ntohs(rpc->call_len);
                rrpc->ret_tag = tag;                rlen -= len;
                rrpc->ret_hash = hash; 
                rrpc->ret_errno = htonl(rpc_Errno); 
                rrpc->ret_retcode = htonl(ret); 
                rrpc->ret_argc = htons(argc); 
   
                if (argc && arr) {                /* check integrity of packet */
                        /* Go Encapsulate variables ... */                crc = ntohs(rpc->call_crc);
                        if ((i = io_vars2buffer(buf + Limit, s->srv_netbuf - Limit, arr)) == -1) {                rpc->call_crc ^= rpc->call_crc;
                                rpc_srv_freeVars(f);                if (crc != crcFletcher16((u_short*) (buf + off), len / 2)) {
                                argc = 0;                        rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                                ret = -7; 
                                rpc_SetErr(EBADRPC, "Error:: in prepare RPC packet values (-7) ...\n"); 
                                goto makeReply; 
                        } else { 
                                Limit += i; 
   
                                rpc_srv_freeVars(f);                        off += len;
                        }                        if (rlen < 1)
                                 break;
                         else
                                 continue;
                 }                  }
   
                ret = send(c->cli_sock, buf, Limit, 0);                noreply = rpc->call_req.flags & RPC_NOREPLY;
                if (ret == -1) {
                        LOGERR;                /* check RPC packet session info */
                        ret = -8;                if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                        break;                        rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(errno);
                 } else {
                         /* execute RPC call */
                         schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off, NULL, 0);
                 }                  }
                 if (ret != Limit) {  
                         rpc_SetErr(EPROCUNAVAIL, "Error:: in send RPC request, should be send %d bytes, "  
                                         "really is %d\n", Limit, ret);  
                         ret = -9;  
                         if (s->srv_kill != kill)  
                                 continue;  
                         else  
                                 break;  
                 }  
         } while (ret > -1 || s->srv_kill != kill);  
   
        shutdown(c->cli_sock, SHUT_RDWR);                /* send RPC reply */
        close(c->cli_sock);                if (!noreply)
        memset(c, 0, sizeof(rpc_cli_t));                        schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task), rpc, len);
        free(buf);
        return (void*) (long)ret;                off += len;
         } while (rlen > 0);
 
         /* lets get next packet */
         schedReadSelf(task);
         return NULL;
 }  }
   
   static void *
   acceptClients(sched_task_t *task)
   {
           rpc_srv_t *srv = TASK_ARG(task);
           rpc_cli_t *c = NULL;
           register int i;
           socklen_t salen = sizeof(io_sockaddr_t);
   
           /* check free slots for connect */
           for (i = 0; i < io_arraySize(srv->srv_clients) && 
                           (c = io_array(srv->srv_clients, i, rpc_cli_t*)); i++);
           if (c)  /* no more free slots! */
                   goto end;
           c = malloc(sizeof(rpc_cli_t));
           if (!c) {
                   LOGERR;
                   srv->srv_kill = kill;
                   return NULL;
           } else {
                   memset(c, 0, sizeof(rpc_cli_t));
                   io_arraySet(srv->srv_clients, i, c);
                   c->cli_id = i;
                   c->cli_parent = srv;
           }
   
           /* alloc empty buffer */
           AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);
   
           /* accept client */
           c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
           if (c->cli_sock == -1) {
                   LOGERR;
                   AIT_FREE_VAL(&c->cli_buf);
                   io_arrayDel(srv->srv_clients, i, 42);
                   goto end;
           } else
                   fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
   
           schedRead(TASK_ROOT(task), rxPacket, c, c->cli_sock, NULL, 0);
   end:
           schedReadSelf(task);
           return NULL;
   }
   
   /* ------------------------------------------------------ */
   
 static void *  static void *
rpc_srv_dispatchVars(void *arg)acceptBLOBClients(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        rpc_srv_t *srv = TASK_ARG(task);
        rpc_srv_t *s;        rpc_cli_t *c = NULL;
        rpc_blob_t *b;        register int i;
        int ret = 0;        socklen_t salen = sizeof(io_sockaddr_t);
        fd_set fds; 
        u_char buf[sizeof(struct tagBLOBHdr)]; 
        struct tagBLOBHdr *blob; 
   
        if (!arg) {        /* check free slots for connect */
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");        for (i = 0; i < io_arraySize(srv->srv_clients) && 
                         (c = io_array(srv->srv_clients, i, rpc_cli_t*)); i++);
         if (c)  /* no more free slots! */
                 goto end;
         c = malloc(sizeof(rpc_cli_t));
         if (!c) {
                 LOGERR;
                 srv->srv_kill = kill;
                 return NULL;                  return NULL;
           } else {
                   memset(c, 0, sizeof(rpc_cli_t));
                   io_arraySet(srv->srv_clients, i, c);
                   c->cli_id = i;
                   c->cli_parent = srv;
           }
   
           /* alloc empty buffer */
           AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);
   
           /* accept client */
           c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
           if (c->cli_sock == -1) {
                   LOGERR;
                   AIT_FREE_VAL(&c->cli_buf);
                   io_arrayDel(srv->srv_clients, i, 42);
                   goto end;
         } else          } else
                s = c->cli_parent;                fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
   
        do {        schedRead(TASK_ROOT(task), rxPacket, c, c->cli_sock, NULL, 0);
                /* check for disable service at this moment? */end:
                if (s->srv_blob.state == disable && s->srv_kill != kill) {        schedReadSelf(task);
                        usleep(100000);        return NULL;
 }
 
 #if 0
 static void *
 txBLOB(sched_task_t *task)
 {
         u_char *buf = TASK_DATA(task);
         struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
         int wlen = sizeof(struct tagBLOBHdr);
 
         /* calculate CRC */
         blob->hdr_crc ^= blob->hdr_crc;
         blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         wlen = send(TASK_FD(task), buf, wlen, 0);
         if (wlen == -1)
                 LOGERR;
         else if (wlen != sizeof(struct tagBLOBHdr))
                 rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
                                 "really sended %d bytes", sizeof(struct tagBLOBHdr), wlen);
 
         return NULL;
 }
 
 static void *
 rxBLOB(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_blob_t *b;
         u_char *buf = TASK_DATA(task);
         struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
         int rlen;
         u_short crc;
         struct timespec ts;
 
         /* check for disable service at this moment? */
         if (!s || s->srv_blob.state == disable) {
                 usleep(100000);
 #ifdef HAVE_PTHREAD_YIELD  #ifdef HAVE_PTHREAD_YIELD
                        pthread_yield();                pthread_yield();
 #endif  #endif
                        continue;                schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
                }                                TASK_DATA(task), TASK_DATLEN(task));
                 return NULL;
         }
   
                FD_ZERO(&fds);        memset(buf, 0, TASK_DATLEN(task));
                FD_SET(c->cli_sock, &fds);        rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);        if (rlen == -1) {
                if (ret == -1) {                LOGERR;
                        if (errno == EINTR && s->srv_kill != kill && s->srv_blob.state != kill)                c->cli_kill = kill;
                                continue;                return NULL;
         } else if (!rlen) { /* receive EOF */
                 c->cli_kill = kill;
                 return NULL;
         }
   
                        LOGERR;        if (rlen < sizeof(struct tagBLOBHdr)) {
                        ret = -2;                rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet");
                 schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
                                 TASK_DATA(task), TASK_DATLEN(task));
                 return NULL;
         }
 
         /* check integrity of packet */
         crc = ntohs(blob->hdr_crc);
         blob->hdr_crc ^= blob->hdr_crc;
         if (crc != crcFletcher16((u_short*) buf, rlen / 2)) {
                 rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
                 schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
                                 TASK_DATA(task), TASK_DATLEN(task));
                 return NULL;
         }
 
         /* check RPC packet session info */
         if ((crc = rpc_chkPktSession(&blob->hdr_session, &s->srv_session))) {
                 rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                 blob->hdr_cmd = error;
                 goto end;
         } else {
                 /* change socket timeout from last packet */
                 ts.tv_sec = blob->hdr_session.sess_timeout;
                 ts.tv_nsec = 0;
                 schedPolling(TASK_ROOT(task), &ts, NULL);
         }
 
         /* Go to proceed packet ... */
         switch (blob->hdr_cmd) {
                 case get:
                         if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) {
                                 rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var));
                                 blob->hdr_cmd = no;
                                 blob->hdr_ret = RPC_ERROR(-1);
                                 break;
                         } else
                                 blob->hdr_len = htonl(b->blob_len);
 
                         if (rpc_srv_blobMap(s, b) != -1) {
                                 /* deliver BLOB variable to client */
                                 blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
                                 rpc_srv_blobUnmap(b);
                         } else {
                                 blob->hdr_cmd = error;
                                 blob->hdr_ret = RPC_ERROR(-1);
                         }
                         break;                          break;
                }                case set:
                         if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) {
                                 /* set new BLOB variable for reply :) */
                                 blob->hdr_var = htonl(b->blob_var);
   
                memset(buf, 0, sizeof buf);                                /* receive BLOB from client */
                ret = recv(c->cli_sock, buf, sizeof buf, 0);                                blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
                if (ret == -1) {                                rpc_srv_blobUnmap(b);
                        LOGERR;                        } else {
                        ret = -3;                                blob->hdr_cmd = error;
                                 blob->hdr_ret = RPC_ERROR(-1);
                         }
                         break;                          break;
                }                case unset:
                /* receive EOF, disable or kill service */                        if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) {
                if (!ret || s->srv_blob.state == kill || s->srv_kill == kill) {                                blob->hdr_cmd = error;
                        ret = 0;                                blob->hdr_ret = RPC_ERROR(-1);
                         }
                         break;                          break;
                }                default:
                if (ret < sizeof(struct tagBLOBHdr)) {                        rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd);
                        rpc_SetErr(ERPCMISMATCH, "Error:: too short BLOB packet ...\n");                        blob->hdr_cmd = error;
                        ret = -4;                        blob->hdr_ret = RPC_ERROR(-1);
                        if (s->srv_kill != kill && s->srv_blob.state != kill)        }
                                continue; 
                        else 
                                break; 
                } else 
                        blob = (struct tagBLOBHdr*) buf; 
                /* check BLOB packet session info */ 
                if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) { 
                        rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n"); 
                        ret = -5; 
                        goto makeReply; 
                } 
                /* Go to proceed packet ... */ 
                switch (blob->hdr_cmd) { 
                        case get: 
                                if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) { 
                                        rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",  
                                                        blob->hdr_var); 
                                        ret = -6; 
                                        break; 
                                } else 
                                        blob->hdr_len = b->blob_len; 
   
                                if (rpc_srv_blobMap(s, b) != -1) {end:
                                        ret = rpc_srv_sendBLOB(c, b);        schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), 
                                        rpc_srv_blobUnmap(b);                        TASK_DATA(task), TASK_DATLEN(task));
                                } else        schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
                                        ret = -7;                        TASK_DATA(task), TASK_DATLEN(task));
                                break;        return NULL;
                        case set:}
                                if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) { 
                                        /* set new BLOB variable for reply :) */ 
                                        blob->hdr_var = b->blob_var; 
   
                                        ret = rpc_srv_recvBLOB(c, b);static void *
                                        rpc_srv_blobUnmap(b);rpc_srv_dispatchVars(void *arg)
                                } else{
                                        ret = -7;        rpc_cli_t *c = arg;
                                break;        rpc_srv_t *s;
                        case unset:        sched_root_task_t *root;
                                ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);        u_char *buf;
                                if (ret == -1)        struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
                                        ret = -7; 
                                break; 
                        default: 
                                rpc_SetErr(EPROCUNAVAIL, "Error:: unsupported BLOB command (%d)...\n",  
                                                blob->hdr_cmd); 
                                ret = -7; 
                } 
   
makeReply:        if (!arg) {
                /* Replay to client! */                rpc_SetErr(EINVAL, "Invalid parameter can`t procced BLOB");
                blob->hdr_cmd = ret < 0 ? error : ok;                return NULL;
                blob->hdr_ret = ret;        } else
                ret = send(c->cli_sock, buf, sizeof buf, 0);                s = c->cli_parent;
                if (ret == -1) { 
                        LOGERR; 
                        ret = -8; 
                        break; 
                } 
                if (ret != sizeof buf) { 
                        rpc_SetErr(EPROCUNAVAIL, "Error:: in send BLOB reply, should be send %d bytes, " 
                                        "really is %d\n", sizeof buf, ret); 
                        ret = -9; 
                        if (s->srv_kill != kill && s->srv_blob.state != kill) 
                                continue; 
                        else 
                                break; 
                } 
        } while (ret > -1 || s->srv_kill != kill); 
   
           /* allocate net buffer */
           buf = malloc(sizeof(struct tagBLOBHdr));
           if (!buf) {
                   LOGERR;
                   return NULL;
           }
   
           root = schedBegin();
           if (!root) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   free(buf);
                   return NULL;
           } else {
                   schedTermCondition(root, kill);
                   schedPolling(root, &ts, NULL);
           }
   
           schedRead(root, rxBLOB, c, c->cli_sock, buf, sizeof(struct tagBLOBHdr));
   
           schedRun(root, (void*) &c->cli_kill);
           schedEnd(&root);
   
         shutdown(c->cli_sock, SHUT_RDWR);          shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);          close(c->cli_sock);
         memset(c, 0, sizeof(rpc_cli_t));          memset(c, 0, sizeof(rpc_cli_t));
        return (void*) ((long)ret);        free(buf);
         return NULL;
 }  }
   #endif
   
// -------------------------------------------------/* ------------------------------------------------------ */
   
 /*  /*
 * rpc_srv_initBLOBServer() Init & create BLOB Server * rpc_srv_initBLOBServer() - Init & create BLOB Server
  *
  * @srv = RPC server instance   * @srv = RPC server instance
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
  * @diskDir = Disk place for BLOB file objects   * @diskDir = Disk place for BLOB file objects
Line 347  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 531  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
         int n = 1;          int n = 1;
         io_sockaddr_t sa;          io_sockaddr_t sa;
   
        if (!srv) {        if (!srv || srv->srv_kill) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");                rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
                 return -1;                  return -1;
         }          }
         if (srv->srv_blob.state) {  
                 rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");  
                 return 0;  
         }  
   
         memset(&srv->srv_blob, 0, sizeof srv->srv_blob);          memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
         if (access(diskDir, R_OK | W_OK) == -1) {          if (access(diskDir, R_OK | W_OK) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         } else          } else
                strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);                AIT_SET_STR(&srv->srv_blob.dir, diskDir);
   
        srv->srv_blob.server.cli_tid = pthread_self();        /* init blob list */
         TAILQ_INIT(&srv->srv_blob.blobs);
 
         srv->srv_blob.server.cli_parent = srv;          srv->srv_blob.server.cli_parent = srv;
   
        memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(io_sockaddr_t));
         switch (sa.sa.sa_family) {          switch (sa.sa.sa_family) {
                 case AF_INET:                  case AF_INET:
                         sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);                          sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);
Line 378  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s Line 560  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_s
                         strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);                          strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
                         break;                          break;
                 default:                  default:
                           AIT_FREE_VAL(&srv->srv_blob.dir);
                         return -1;                          return -1;
         }          }
         memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);  
   
         /* create BLOB server socket */          /* create BLOB server socket */
         srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);          srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
         if (srv->srv_blob.server.cli_sock == -1) {          if (srv->srv_blob.server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         n = srv->srv_netbuf;          n = srv->srv_netbuf;
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,           if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, 
                                 srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {                                  srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
   
        /* allocate pool for concurent clients */        /* allocate pool for concurent blob clients */
        srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));        srv->srv_blob.clients = io_arrayInit(io_arraySize(srv->srv_clients));
         if (!srv->srv_blob.clients) {          if (!srv->srv_blob.clients) {
                LOGERR;                rpc_SetErr(io_GetErrno(), "%s", io_GetError());
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
        } else        }
                memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t)); 
   
        pthread_mutex_init(&srv->srv_blob.mtx, NULL);        /* init blob scheduler */
         srv->srv_blob.root = schedBegin();
         if (!srv->srv_blob.root) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 io_arrayDestroy(&srv->srv_blob.clients);
                 close(srv->srv_blob.server.cli_sock);
                 AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;
         }
   
         rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);  
   
         srv->srv_blob.state = enable;   /* enable BLOB */  
         return 0;          return 0;
 }  }
   
 /*  /*
 * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: none   * return: none
  */   */
Line 441  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) Line 631  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
 {  {
         rpc_cli_t *c;          rpc_cli_t *c;
         register int i;          register int i;
        rpc_blob_t *f;        rpc_blob_t *b, *tmp;
   
        if (!srv) {        if (!srv)
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n"); 
                 return;                  return;
         } else  
                 srv->srv_blob.state = kill;  
   
         rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);  
         rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);  
         rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);  
         rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);  
   
         /* close all clients connections & server socket */          /* close all clients connections & server socket */
        for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)        for (i = 0; i < io_arraySize(srv->srv_blob.clients); i++) {
                if (c->cli_sa.sa.sa_family)                c = io_array(srv->srv_blob.clients, i, rpc_cli_t*);
                 if (c) {
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
        close(srv->srv_blob.server.cli_sock);                        close(c->cli_sock);
   
        pthread_mutex_lock(&srv->srv_blob.mtx);                        io_arrayDel(srv->srv_blob.clients, i, 42);
        if (srv->srv_blob.clients) {                }
                free(srv->srv_blob.clients); 
                srv->srv_blob.clients = NULL; 
         }          }
           io_arrayDestroy(&srv->srv_blob.clients);
   
           close(srv->srv_blob.server.cli_sock);
   
         /* detach blobs */          /* detach blobs */
        while ((f = srv->srv_blob.blobs)) {        TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                srv->srv_blob.blobs = f->blob_next;                TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
                rpc_srv_blobFree(srv, f);
                free(f);                rpc_srv_blobFree(srv, b);
                 free(b);
         }          }
         pthread_mutex_unlock(&srv->srv_blob.mtx);  
   
        while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);        schedEnd(&srv->srv_blob.root);
        pthread_mutex_destroy(&srv->srv_blob.mtx);        AIT_FREE_VAL(&srv->srv_blob.dir);
 
         srv->srv_blob.kill = 1;
 }  }
   
 /*  /*
 * rpc_srv_loopBLOB() Execute Main BLOB server loop and wait for clients requests * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
 {  {
        socklen_t salen = sizeof(io_sockaddr_t);        if (!srv || srv->srv_kill) {
        register int i;                rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
        rpc_cli_t *c; 
        fd_set fds; 
        int ret; 
        struct timeval tv = { DEF_RPC_TIMEOUT, 0 }; 
 
        if (!srv || srv->srv_blob.state == kill) { 
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n"); 
                 return -1;                  return -1;
         }          }
   
        if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {        fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
                         fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
 
         if (listen(srv->srv_blob.server.cli_sock, io_arraySize(srv->srv_blob.clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
        while (srv->srv_blob.state != kill && srv->srv_kill != kill) {        if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
                                 srv->srv_blob.server.cli_sock, NULL, 0)) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                 return -1;
         }
 
 #if 0
                 for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)                  for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
                         if (!c->cli_sa.sa.sa_family)                          if (!c->cli_sa.sa.sa_family)
                                 break;                                  break;
                 if (i >= srv->srv_numcli) {                  if (i >= srv->srv_numcli) {
 #ifdef HAVE_PTHREAD_YIELD  #ifdef HAVE_PTHREAD_YIELD
                         pthread_yield();                          pthread_yield();
 #else  
                         usleep(1000000);  
 #endif  #endif
                           usleep(1000000);
                         continue;                          continue;
                 }                  }
   
                 FD_ZERO(&fds);  
                 FD_SET(srv->srv_blob.server.cli_sock, &fds);  
                 ret = select(srv->srv_blob.server.cli_sock + 1, &fds, NULL, NULL, &tv);  
                 if (ret == -1) {  
                         LOGERR;  
                         ret = 1;  
                         break;  
                 }  
                 if (!ret)  
                         continue;  
   
                 c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);                  c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
                 if (c->cli_sock == -1) {                  if (c->cli_sock == -1) {
                         LOGERR;                          LOGERR;
                         continue;                          continue;
                 } else                  } else
                         c->cli_parent = srv;                          c->cli_parent = srv;
   
                 if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {  
                         LOGERR;  
                         continue;  
                 } else  
                         pthread_detach(c->cli_tid);  
         }          }
   #endif
   
        srv->srv_blob.state = kill;        /* main rpc loop */
        schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
         return 0;          return 0;
 }  }
   
   
 /*  /*
 * rpc_srv_initServer() Init & create RPC Server * rpc_srv_initServer() - Init & create RPC Server
  *
  * @regProgID = ProgramID for authentication & recognition   * @regProgID = ProgramID for authentication & recognition
  * @regProcID = ProcessID for authentication & recognition   * @regProcID = ProcessID for authentication & recognition
  * @concurentClients = Concurent clients at same time to this server   * @concurentClients = Concurent clients at same time to this server
  * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)   * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
  * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL  
  * @csHost = Host name or address for bind server, if NULL any address   * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
  * return: NULL == error or !=NULL bind and created RPC server instance   * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
 rpc_srv_t *  rpc_srv_t *
rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients, rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients, 
                int netBuf, u_short family, const char *csHost, u_short Port)                int netBuf, const char *csHost, u_short Port)
 {  {
         rpc_srv_t *srv = NULL;  
         int n = 1;          int n = 1;
        struct hostent *host = NULL;        rpc_srv_t *srv = NULL;
         io_sockaddr_t sa;          io_sockaddr_t sa;
   
        if (!concurentClients || !regProgID ||         if (!concurentClients || !regProgID) {
                        (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {                rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n"); 
                 return NULL;                  return NULL;
         }          }
           if (!io_gethostbyname(csHost, Port, &sa))
                   return NULL;
         if (!Port)          if (!Port)
                 Port = RPC_DEFPORT;                  Port = RPC_DEFPORT;
         if (!netBuf)          if (!netBuf)
                 netBuf = BUFSIZ;                  netBuf = BUFSIZ;
        if (csHost && family != AF_LOCAL) {        else
                host = gethostbyname2(csHost, family);                netBuf = io_align(netBuf, 1);   /* align netBuf length */
                if (!host) { 
                        rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno)); 
                        return NULL; 
                } 
        } 
        memset(&sa, 0, sizeof sa); 
        sa.sa.sa_family = family; 
        switch (family) { 
                case AF_INET: 
                        sa.sin.sin_len = sizeof(struct sockaddr_in); 
                        sa.sin.sin_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length); 
                        break; 
                case AF_INET6: 
                        sa.sin6.sin6_len = sizeof(struct sockaddr_in6); 
                        sa.sin6.sin6_port = htons(Port); 
                        if (csHost) 
                                memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length); 
                        break; 
                case AF_LOCAL: 
                        sa.sun.sun_len = sizeof(struct sockaddr_un); 
                        if (csHost) 
                                strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path); 
                        unlink(sa.sun.sun_path); 
                        break; 
                default: 
                        rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n"); 
                        return NULL; 
        } 
   
         srv = malloc(sizeof(rpc_srv_t));          srv = malloc(sizeof(rpc_srv_t));
         if (!srv) {          if (!srv) {
Line 617  rpc_srv_initServer(u_int regProgID, u_int regProcID, i Line 759  rpc_srv_initServer(u_int regProgID, u_int regProcID, i
                 memset(srv, 0, sizeof(rpc_srv_t));                  memset(srv, 0, sizeof(rpc_srv_t));
   
         srv->srv_netbuf = netBuf;          srv->srv_netbuf = netBuf;
         srv->srv_numcli = concurentClients;  
         srv->srv_session.sess_version = RPC_VERSION;          srv->srv_session.sess_version = RPC_VERSION;
         srv->srv_session.sess_program = regProgID;          srv->srv_session.sess_program = regProgID;
         srv->srv_session.sess_process = regProcID;          srv->srv_session.sess_process = regProcID;
   
         srv->srv_server.cli_tid = pthread_self();  
         srv->srv_server.cli_parent = srv;          srv->srv_server.cli_parent = srv;
        memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);        memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
   
           /* init functions list */
           TAILQ_INIT(&srv->srv_funcs);
   
           /* init scheduler */
           srv->srv_root = schedBegin();
           if (!srv->srv_root) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   free(srv);
                   return NULL;
           }
   
           /* init pool for clients */
           srv->srv_clients = io_arrayInit(concurentClients);
           if (!srv->srv_clients) {
                   rpc_SetErr(io_GetErrno(), "%s", io_GetError());
                   schedEnd(&srv->srv_root);
                   free(srv);
                   return NULL;
           }
   
         /* create server socket */          /* create server socket */
        srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
         if (srv->srv_server.cli_sock == -1) {          if (srv->srv_server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                   io_arrayDestroy(&srv->srv_clients);
                   schedEnd(&srv->srv_root);
                 free(srv);                  free(srv);
                 return NULL;                  return NULL;
         }          }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
         n = srv->srv_netbuf;          n = srv->srv_netbuf;
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
         if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,           if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
                                 srv->srv_server.cli_sa.sa.sa_len) == -1) {                                  srv->srv_server.cli_sa.sa.sa_len) == -1) {
                 LOGERR;                  LOGERR;
                close(srv->srv_server.cli_sock);                goto err;
                free(srv); 
                return NULL; 
         }          }
   
         /* allocate pool for concurent clients */  
         srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));  
         if (!srv->srv_clients) {  
                 LOGERR;  
                 close(srv->srv_server.cli_sock);  
                 free(srv);  
                 return NULL;  
         } else  
                 memset(srv->srv_clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));  
   
         pthread_mutex_init(&srv->srv_mtx, NULL);  
   
         rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);  
         rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);  
         return srv;          return srv;
   err:    /* error condition */
           close(srv->srv_server.cli_sock);
           io_arrayDestroy(&srv->srv_clients);
           schedEnd(&srv->srv_root);
           free(srv);
           return NULL;
 }  }
   
 /*  /*
 * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
  *
  * @psrv = RPC Server instance   * @psrv = RPC Server instance
  * return: none   * return: none
  */   */
Line 689  rpc_srv_endServer(rpc_srv_t ** __restrict psrv) Line 834  rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
 {  {
         rpc_cli_t *c;          rpc_cli_t *c;
         register int i;          register int i;
        rpc_func_t *f;        rpc_func_t *f, *tmp;
   
         if (!psrv || !*psrv) {          if (!psrv || !*psrv) {
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");                rpc_SetErr(EINVAL, "Can`t destroy server because parameter is null!");
                 return;                  return;
         }          }
   
        rpc_srv_endBLOBServer(*psrv);        if (!(*psrv)->srv_blob.kill)
                 rpc_srv_endBLOBServer(*psrv);
   
         /* close all clients connections & server socket */          /* close all clients connections & server socket */
        for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)        for (i = 0; i < io_arraySize((*psrv)->srv_clients); i++) {
                if (c->cli_sa.sa.sa_family) {                c = io_array((*psrv)->srv_clients, i, rpc_cli_t*);
                 if (c) {
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);                          close(c->cli_sock);
                 }  
         close((*psrv)->srv_server.cli_sock);  
   
        if ((*psrv)->srv_clients) {                        io_arrayDel((*psrv)->srv_clients, i, 42);
                free((*psrv)->srv_clients);                }
                (*psrv)->srv_clients = NULL; 
                (*psrv)->srv_numcli = 0; 
         }          }
           io_arrayDestroy(&(*psrv)->srv_clients);
   
           close((*psrv)->srv_server.cli_sock);
   
         /* detach exported calls */          /* detach exported calls */
        pthread_mutex_lock(&(*psrv)->srv_mtx);        TAILQ_FOREACH_SAFE(f, &(*psrv)->srv_funcs, func_node, tmp) {
        while ((f = (*psrv)->srv_funcs)) {                TAILQ_REMOVE(&(*psrv)->srv_funcs, f, func_node);
                (*psrv)->srv_funcs = f->func_next;
                 io_freeVars(&f->func_vars);                  io_freeVars(&f->func_vars);
                   AIT_FREE_VAL(&f->func_name);
                 free(f);                  free(f);
         }          }
         pthread_mutex_unlock(&(*psrv)->srv_mtx);  
   
        while (pthread_mutex_trylock(&(*psrv)->srv_mtx) == EBUSY);        schedEnd(&(*psrv)->srv_root);
        pthread_mutex_destroy(&(*psrv)->srv_mtx); 
 
         free(*psrv);          free(*psrv);
         *psrv = NULL;          *psrv = NULL;
 }  }
   
 /*  /*
 * rpc_srv_loopServer() Execute Main server loop and wait for clients requests * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
 rpc_srv_loopServer(rpc_srv_t * __restrict srv)  rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
         socklen_t salen = sizeof(io_sockaddr_t);  
         register int i;  
         rpc_cli_t *c;  
         fd_set fds;  
         int ret;  
         struct timeval tv = { DEF_RPC_TIMEOUT, 0 };  
   
         if (!srv) {          if (!srv) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
                 return -1;                  return -1;
         }          }
   
        /* activate BLOB server worker if srv->srv_blob.state == enable */        fcntl(srv->srv_server.cli_sock, F_SETFL, 
        rpc_srv_execBLOBServer(srv);                        fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
   
        if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {        if (listen(srv->srv_server.cli_sock, io_arraySize(srv->srv_clients)) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
        while (srv->srv_kill != kill) {        if (!schedRead(srv->srv_root, acceptClients, srv, srv->srv_server.cli_sock, NULL, 0)) {
                for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                        if (!c->cli_sa.sa.sa_family)                return -1;
                                break; 
                if (i >= srv->srv_numcli) { 
#ifdef HAVE_PTHREAD_YIELD 
                        pthread_yield(); 
#else 
                        usleep(1000000); 
#endif 
                        continue; 
                } 
 
                FD_ZERO(&fds); 
                FD_SET(srv->srv_server.cli_sock, &fds); 
                ret = select(srv->srv_server.cli_sock + 1, &fds, NULL, NULL, &tv); 
                if (ret == -1) { 
                        LOGERR; 
                        ret = 1; 
                        break; 
                } 
                if (!ret) 
                        continue; 
 
                c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen); 
                if (c->cli_sock == -1) { 
                        LOGERR; 
                        continue; 
                } else 
                        c->cli_parent = srv; 
 
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) { 
                        LOGERR; 
                        continue; 
                } else 
                        pthread_detach(c->cli_tid); 
         }          }
   
           /* main rpc loop */
           schedRun(srv->srv_root, &srv->srv_kill);
         return 0;          return 0;
 }  }
   
 // ---------------------------------------------------------  
   
 /*  /*
  * rpc_srv_execCall() Execute registered call from RPC server   * rpc_srv_execCall() Execute registered call from RPC server
    *
  * @call = Register RPC call   * @call = Register RPC call
  * @rpc = IN RPC call structure   * @rpc = IN RPC call structure
  * @args = IN RPC calling arguments from RPC client   * @args = IN RPC calling arguments from RPC client
Line 810  int Line 917  int
 rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,   rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, 
                 array_t * __restrict args)                  array_t * __restrict args)
 {  {
         void *dl;  
         rpc_callback_t func;          rpc_callback_t func;
         int ret;  
   
        if (!call || !rpc || !call->func_parent) {        if (!call || !rpc || !call->func_parent || !AIT_ADDR(&call->func_name)) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
                 return -1;                  return -1;
         }          }
   
        dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);        func = AIT_GET_LIKE(&call->func_name, rpc_callback_t);
        if (!dl) {        return func(call, ntohs(rpc->call_argc), args);
                rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror()); 
                return -1; 
        } 
 
        func = dlsym(dl, (char*) call->func_name); 
        if (func) 
                ret = func(call, ntohs(rpc->call_argc), args); 
        else { 
                rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror()); 
                ret = -1; 
        } 
 
        dlclose(dl); 
        return ret; 
 }  }

Removed from v.1.5.2.11  
changed lines
  Added in v.1.9.2.12


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>