Diff for /libaitrpc/src/srv.c between versions 1.2 and 1.8

version 1.2, 2011/05/02 23:12:08 version 1.8, 2012/03/29 01:34:16
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 46  SUCH DAMAGE. Line 46  SUCH DAMAGE.
 #include "global.h"  #include "global.h"
   
   
   static void *rxPacket(sched_task_t*);
   static void *rxBLOB(sched_task_t*);
   
 static void *  static void *
rpc_srv_dispatchCall(void *arg)txPacket(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        rpc_cli_t *c = TASK_ARG(task);
        rpc_srv_t *s;        rpc_srv_t *s = c->cli_parent;
        rpc_val_t *vals = NULL, *v = NULL;        rpc_func_t *f = NULL;
        rpc_func_t *f;        u_char *buf = TASK_DATA(task);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int ret, wlen = sizeof(struct tagRPCCall);
         array_t *arr = NULL;
 
         ioTRACE(RPC_TRACE_LEVEL);
 
         if (rpc->call_argc) {
                 f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash));
                 if (!f) {
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                 } else {
                         rpc->call_argc = htons(rpc_srv_getVars(f, &arr));
                         /* Go Encapsulate variables */
                         ret = io_vars2buffer(buf + wlen, TASK_DATLEN(task) - wlen, arr);
                         io_clrVars(f->func_vars);
                         if (ret == -1) {
                                 rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                                 rpc->call_argc ^= rpc->call_argc;
                                 rpc->call_rep.ret = RPC_ERROR(-1);
                                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         } else
                                 wlen += ret;
                 }
         }
 
         rpc->call_len = htons(wlen);
 
         /* calculate CRC */
         rpc->call_crc ^= rpc->call_crc;
         rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         ret = send(TASK_FD(task), buf, wlen, 0);
         if (ret == -1)
                 LOGERR;
         else if (ret != wlen)
                 rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
                                 "really sended %d bytes", wlen, ret);
         else
                 ioDEBUG(RPC_DEBUG_LEVEL, "Sended %d bytes", ret);
 
         return NULL;
 }
 
 static void *
 execCall(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_func_t *f = NULL;
         array_t *arr = NULL;
         u_char *buf = TASK_DATA(task) + TASK_VAL(task);
         struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
         int argc = ntohs(rpc->call_argc);
 
         ioTRACE(RPC_TRACE_LEVEL);
 
         /* Go decapsulate variables ... */
         if (argc) {
                 arr = io_buffer2vars(buf + sizeof(struct tagRPCCall), 
                                 TASK_DATLEN(task) - TASK_VAL(task) - sizeof(struct tagRPCCall), 
                                 argc, 1);
                 if (!arr) {
                         rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
                         rpc->call_argc ^= rpc->call_argc;
                         rpc->call_rep.ret = RPC_ERROR(-1);
                         rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                         return NULL;
                 }
         }
 
         if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag), ntohl(rpc->call_hash)))) {
                 rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                 rpc->call_argc ^= rpc->call_argc;
                 rpc->call_rep.ret = RPC_ERROR(-1);
                 rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
         } else {
                 ioDEBUG(RPC_DEBUG_LEVEL, "RPC function %s from module %s", 
                                 AIT_GET_STR(&f->func_name), AIT_GET_LIKE(&f->func_file, char*));
 
                 /* if client doesn't want reply */
                 argc = rpc->call_req.flags & RPC_NOREPLY;
                 rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
                 if (rpc->call_rep.ret == htonl(-1)) {
                         rpc->call_rep.eno = RPC_ERROR(errno);
                         rpc->call_argc ^= rpc->call_argc;
                 } else {
                         rpc->call_rep.eno ^= rpc->call_rep.eno;
                         if (argc) {
                                 /* without reply */
                                 io_clrVars(f->func_vars);
                                 rpc->call_argc ^= rpc->call_argc;
                         } else
                                 rpc->call_argc = htons(rpc_srv_getVars(f, NULL));
                 }
         }
 
         if (arr)
                 io_arrayDestroy(&arr);
         return NULL;
 }
 
 static void *
 rxPacket(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         u_char *buf = TASK_DATA(task);
         int rlen, noreply;
         u_short crc, off = 0;
         struct tagRPCCall *rpc;          struct tagRPCCall *rpc;
        struct tagRPCRet rrpc;        struct timespec ts;
        fd_set fds; 
        u_char buf[BUFSIZ], *data; 
        int ret, argc = 0, Limit = 0; 
        register int i; 
   
        if (!arg) {        ioTRACE(RPC_TRACE_LEVEL);
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced RPC client ...\n");
         memset(buf, 0, TASK_DATLEN(task));
         rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
         if (rlen == -1) {
                 LOGERR;
                 s->srv_kill = s->srv_blob.state = kill;
                 return NULL;                  return NULL;
           } else if (!rlen) {     /* receive EOF */
                   s->srv_kill = s->srv_blob.state = kill;
                   return NULL;
         } else          } else
                s = c->cli_parent;                ioDEBUG(RPC_DEBUG_LEVEL, "Readed %d bytes", rlen);
   
         do {          do {
                FD_ZERO(&fds);                if (rlen < sizeof(struct tagRPCCall)) {
                FD_SET(c->cli_sock, &fds);                        rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);
                if (ret == -1) {                        schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), 
                        ret = -2;                                        TASK_DATA(task), TASK_DATLEN(task));
                }                        return NULL;
                memset(&rrpc, 0, sizeof rrpc); 
                memset(buf, 0, BUFSIZ); 
                if ((ret = recv(c->cli_sock, buf, BUFSIZ, 0)) == -1) { 
                        LOGERR; 
                        ret = -3; 
                        break; 
                } 
                if (!ret) {             // receive EOF 
                        ret = 0; 
                        break; 
                } 
                if (ret < sizeof(struct tagRPCCall)) { 
                        rpc_SetErr(EMSGSIZE, "Error:: too short RPC packet ...\n"); 
                        ret = -4; 
                        break; 
                 } else                  } else
                        rpc = (struct tagRPCCall*) buf;                        rpc = (struct tagRPCCall*) (buf + off);
                // check RPC packet session info 
                if (memcmp(&rpc->call_session, &s->srv_session, sizeof rpc->call_session)) { 
                        rpc_SetErr(EINVAL, "Error:: get invalid RPC session ...\n"); 
                        ret = -5; 
                        goto makeReply; 
                } else 
                        Limit = sizeof(struct tagRPCCall); 
                // RPC is OK! Go decapsulate variables ... 
                if (rpc->call_argc) { 
                        v = (rpc_val_t*) (buf + Limit); 
                        // check RPC packet length 
                        if (rpc->call_argc * sizeof(rpc_val_t) > BUFSIZ - Limit) { 
                                rpc_SetErr(EMSGSIZE, "Error:: Too big RPC packet ...\n"); 
                                ret = -5; 
                                goto makeReply; 
                        } else 
                                Limit += rpc->call_argc * sizeof(rpc_val_t); 
                        // RPC received variables types OK! 
                        data = (u_char*) v + rpc->call_argc * sizeof(rpc_val_t); 
                        for (i = 0; i < rpc->call_argc; i++) { 
                                switch (v[i].val_type) { 
                                        case buffer: 
                                                if (v[i].val_len > BUFSIZ - Limit) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: Too big RPC packet ...\n"); 
                                                        ret = -5; 
                                                        goto makeReply; 
                                                } else 
                                                        Limit += v[i].val_len; 
   
                                                v[i].val.buffer = data;                rlen -= ntohs(rpc->call_len);
                                                data += v[i].val_len; 
                                                break; 
                                        case string: 
                                                if (v[i].val_len + 1 > BUFSIZ - Limit) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: Too big RPC packet ...\n"); 
                                                        ret = -5; 
                                                        goto makeReply; 
                                                } else 
                                                        Limit += v[i].val_len; 
   
                                                v[i].val.string = (int8_t*) data;                /* check integrity of packet */
                                                data += v[i].val_len + 1;                crc = ntohs(rpc->call_crc);
                                                break;                rpc->call_crc ^= rpc->call_crc;
                                        case blob:                if (crc != crcFletcher16((u_short*) (buf + off), ntohs(rpc->call_len) / 2)) {
                                                if (s->srv_blob.state == disable) {                        rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n"); 
                                                        ret = -5; 
                                                        goto makeReply; 
                                                } 
                                        default: 
                                                break; 
                                } 
                        } 
                } 
   
                argc = 0;                        off += ntohs(rpc->call_len);
                vals = NULL;                        if (rlen < 1)
                if (!(f = rpc_srv_getCall(s, rpc->call_tag, rpc->call_hash))) {                                break;
                        rpc_SetErr(EINVAL, "Error:: call not found into RPC server ...\n"); 
                        ret = -6; 
                } else 
                        if ((ret = rpc_srv_execCall(f, rpc, v)) == -1) 
                                ret = -9; 
                         else                          else
                                argc = rpc_srv_getValsCall(f, &vals);                                continue;
                 }
   
makeReply:                noreply = rpc->call_req.flags & RPC_NOREPLY;
                memcpy(&rrpc.ret_session, &rpc->call_session, sizeof rrpc.ret_session); 
                rrpc.ret_tag = rpc->call_tag; 
                rrpc.ret_hash = rpc->call_hash; 
                rrpc.ret_errno = rpc_Errno; 
                rrpc.ret_retcode = ret; 
                rrpc.ret_argc = argc; 
   
                memset(buf, 0, BUFSIZ);                /* check RPC packet session info */
                memcpy(buf, &rrpc, (Limit = sizeof rrpc));                if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                if (argc && vals) {                        rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                        v = (rpc_val_t*) (buf + sizeof rrpc);                        rpc->call_argc ^= rpc->call_argc;
                        memcpy(v, vals, argc * sizeof(rpc_val_t));                        rpc->call_rep.ret = RPC_ERROR(-1);
                        Limit += argc * sizeof(rpc_val_t);                        rpc->call_rep.eno = RPC_ERROR(errno);
                        data = (u_char*) v + argc * sizeof(rpc_val_t);                } else {
                        for (ret = i = 0; i < argc; i++) {                        /* change socket timeout from last packet */
                                switch (vals[i].val_type) {                        ts.tv_sec = rpc->call_session.sess_timeout;
                                        case buffer:                        ts.tv_nsec = 0;
                                                if (ret || Limit + vals[i].val_len > BUFSIZ) {                        schedPolling(TASK_ROOT(task), &ts, NULL);
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc.ret_retcode = ret = -7; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.buffer, vals[i].val_len);                        /* execute RPC call */
                                                data += vals[i].val_len;                        schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off, 
                                                Limit += vals[i].val_len;                                        TASK_DATA(task), TASK_DATLEN(task));
                                                break;                }
                                        case string: 
                                                if (ret || Limit + vals[i].val_len + 1 > BUFSIZ) { 
                                                        rpc_SetErr(EMSGSIZE, "Error:: in prepare RPC packet (-7) ...\n"); 
                                                        rrpc.ret_retcode = ret = -7; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
   
                                                memcpy(data, vals[i].val.string, vals[i].val_len + 1);                /* send RPC reply */
                                                data += vals[i].val_len + 1;                if (!noreply)
                                                Limit += vals[i].val_len + 1;                        schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task), 
                                                break;                                        buf + off, TASK_DATLEN(task) - off);
                                        case blob: 
                                                if (s->srv_blob.state == disable) { 
                                                        rpc_SetErr(ENOTSUP, "Error:: BLOB server is disabled\n"); 
                                                        rrpc.ret_retcode = ret = -5; 
                                                        rrpc.ret_argc = 0; 
                                                        break; 
                                                } 
                                        default: 
                                                break; 
                                } 
   
                                RPC_FREE_VAL(&vals[i]);                off += ntohs(rpc->call_len);
                        }        } while (rlen > 0);
                } 
   
                if ((ret = send(c->cli_sock, buf, Limit, 0)) == -1) {        /* lets get next packet */
                        LOGERR;        schedRead(TASK_ROOT(task), rxPacket, TASK_ARG(task), TASK_FD(task), 
                        ret = -8;                        TASK_DATA(task), TASK_DATLEN(task));
                        break;        return NULL;
                }}
                if (ret != Limit) { 
                        rpc_SetErr(ECANCELED, "Error:: in send RPC request, should be send %d bytes, " 
                                        "really is %d\n", Limit, ret); 
                        ret = -9; 
                        break; 
                } 
        } while (ret > -1); 
   
   static void *
   rpc_srv_dispatchCall(void *arg)
   {
           rpc_cli_t *c = arg;
           rpc_srv_t *s;
           u_char *buf;
           sched_root_task_t *root;
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
           ioTRACE(RPC_TRACE_LEVEL);
   
           if (!arg) {
                   rpc_SetErr(EINVAL, "Invalid parameter can`t procced RPC client");
                   return NULL;
           } else
                   s = c->cli_parent;
   
           /* allocate net buffer */
           buf = malloc(s->srv_netbuf);
           if (!buf) {
                   LOGERR;
                   return NULL;
           }
   
           root = schedBegin();
           if (!root) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   free(buf);
                   return NULL;
           } else {
                   schedTermCondition(root, kill);
                   schedPolling(root, &ts, NULL);
           }
   
           schedRead(root, rxPacket, c, c->cli_sock, buf, s->srv_netbuf);
   
           schedRun(root, (void*) &s->srv_kill);
           schedEnd(&root);
   
         shutdown(c->cli_sock, SHUT_RDWR);          shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);          close(c->cli_sock);
         memset(c, 0, sizeof(rpc_cli_t));          memset(c, 0, sizeof(rpc_cli_t));
        return (void*) (long)ret;        free(buf);
         return NULL;
 }  }
   
   
 static void *  static void *
rpc_srv_dispatchVars(void *arg)txBLOB(sched_task_t *task)
 {  {
        rpc_cli_t *c = arg;        u_char *buf = TASK_DATA(task);
        rpc_srv_t *s;        struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
         int wlen = sizeof(struct tagBLOBHdr);
 
         ioTRACE(RPC_TRACE_LEVEL);
 
         /* calculate CRC */
         blob->hdr_crc ^= blob->hdr_crc;
         blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
 
         /* send reply */
         wlen = send(TASK_FD(task), buf, wlen, 0);
         if (wlen == -1)
                 LOGERR;
         else if (wlen != sizeof(struct tagBLOBHdr))
                 rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
                                 "really sended %d bytes", sizeof(struct tagBLOBHdr), wlen);
         else
                 ioDEBUG(RPC_DEBUG_LEVEL, "Sended %d bytes", wlen);
 
         return NULL;
 }
 
 static void *
 rxBLOB(sched_task_t *task)
 {
         rpc_cli_t *c = TASK_ARG(task);
         rpc_srv_t *s = c->cli_parent;
         rpc_blob_t *b;          rpc_blob_t *b;
        int ret;        u_char *buf = TASK_DATA(task);
        fd_set fds;        struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
        u_char buf[sizeof(struct tagBLOBHdr)];        int rlen;
        struct tagBLOBHdr *blob;        u_short crc;
         struct timespec ts;
   
        if (!arg) {        ioTRACE(RPC_TRACE_LEVEL);
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t procced BLOB client ...\n");
         /* check for disable service at this moment? */
         if (!s || s->srv_blob.state == disable) {
                 usleep(100000);
 #ifdef HAVE_PTHREAD_YIELD
                 pthread_yield();
 #endif
                 schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
                                 TASK_DATA(task), TASK_DATLEN(task));
                 return NULL;                  return NULL;
           }
   
           memset(buf, 0, TASK_DATLEN(task));
           rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
           if (rlen == -1) {
                   LOGERR;
                   s->srv_blob.state = kill;
                   return NULL;
           } else if (!rlen || s->srv_kill == kill) { /* receive EOF */
                   s->srv_blob.state = kill;
                   return NULL;
         } else          } else
                s = c->cli_parent;                ioDEBUG(RPC_DEBUG_LEVEL, "Readed %d bytes", rlen);
   
        do {        if (rlen < sizeof(struct tagBLOBHdr)) {
                // check for disable service at this moment?                rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet");
                if (s->srv_blob.state == disable) {                schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
                        ret = 0;                                TASK_DATA(task), TASK_DATLEN(task));
                        break;                return NULL;
                }        }
   
                FD_ZERO(&fds);        /* check integrity of packet */
                FD_SET(c->cli_sock, &fds);        crc = ntohs(blob->hdr_crc);
                ret = select(c->cli_sock + 1, &fds, NULL, NULL, NULL);        blob->hdr_crc ^= blob->hdr_crc;
                if (ret == -1) {        if (crc != crcFletcher16((u_short*) buf, rlen / 2)) {
                        ret = -2;                rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
                }                schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
                                 TASK_DATA(task), TASK_DATLEN(task));
                 return NULL;
         }
   
                memset(buf, 0, sizeof buf);        /* check RPC packet session info */
                if ((ret = recv(c->cli_sock, buf, sizeof buf, 0)) == -1) {        if ((crc = rpc_chkPktSession(&blob->hdr_session, &s->srv_session))) {
                        LOGERR;                rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                        ret = -3;                blob->hdr_cmd = error;
                        break;                goto end;
                }        } else {
                if (!ret || s->srv_blob.state == disable) {     // receive EOF or disable service                /* change socket timeout from last packet */
                        ret = 0;                ts.tv_sec = blob->hdr_session.sess_timeout;
                        break;                ts.tv_nsec = 0;
                }                schedPolling(TASK_ROOT(task), &ts, NULL);
                if (ret < sizeof(struct tagBLOBHdr)) {        }
                        rpc_SetErr(EMSGSIZE, "Error:: too short BLOB packet ...\n"); 
                        ret = -4; 
                        break; 
                } else 
                        blob = (struct tagBLOBHdr*) buf; 
                // check BLOB packet session info 
                if (memcmp(&blob->hdr_session, &s->srv_session, sizeof blob->hdr_session)) { 
                        rpc_SetErr(EINVAL, "Error:: get invalid BLOB session ...\n"); 
                        ret = -5; 
                        goto makeReply; 
                } 
                // Go to proceed packet ... 
                switch (blob->hdr_cmd) { 
                        case get: 
                                if (!(b = rpc_srv_getBLOB(s, blob->hdr_var))) { 
                                        rpc_SetErr(EINVAL, "Error:: var (%x) not found into BLOB server ...\n",  
                                                        blob->hdr_var); 
                                        ret = -6; 
                                        break; 
                                } else 
                                        blob->hdr_len = b->blob_len; 
   
                                if (rpc_srv_blobMap(s, b) != -1) {        /* Go to proceed packet ... */
                                        ret = rpc_srv_sendBLOB(c, b);        switch (blob->hdr_cmd) {
                                        rpc_srv_blobUnmap(b);                case get:
                                } else                        if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) {
                                        ret = -7;                                rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var));
                                 blob->hdr_cmd = no;
                                 blob->hdr_ret = RPC_ERROR(-1);
                                 break;                                  break;
                        case set:                        } else
                                if ((b = rpc_srv_registerBLOB(s, blob->hdr_len))) {                                blob->hdr_len = htonl(b->blob_len);
                                        // set new BLOB variable for reply :) 
                                        blob->hdr_var = b->blob_var; 
   
                                        ret = rpc_srv_recvBLOB(c, b);                        if (rpc_srv_blobMap(s, b) != -1) {
                                        rpc_srv_blobUnmap(b);                                /* deliver BLOB variable to client */
                                } else                                blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
                                        ret = -7;                                rpc_srv_blobUnmap(b);
                                break;                        } else {
                        case unset:                                blob->hdr_cmd = error;
                                ret = rpc_srv_unregisterBLOB(s, blob->hdr_var);                                blob->hdr_ret = RPC_ERROR(-1);
                                if (ret == -1)                        }
                                        ret = -7;                        break;
                                break;                case set:
                        default:                        if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) {
                                rpc_SetErr(EINVAL, "Error:: unsupported BLOB command (%d)...\n",                                 /* set new BLOB variable for reply :) */
                                                blob->hdr_cmd);                                blob->hdr_var = htonl(b->blob_var);
                                ret = -7; 
                } 
   
makeReply:                                /* receive BLOB from client */
                // Replay to client!                                blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
                blob->hdr_cmd = ret < 0 ? error : ok;                                rpc_srv_blobUnmap(b);
                blob->hdr_ret = ret;                        } else {
                if ((ret = send(c->cli_sock, buf, sizeof buf, 0)) == -1) {                                blob->hdr_cmd = error;
                        LOGERR;                                blob->hdr_ret = RPC_ERROR(-1);
                        ret = -8;                        }
                         break;                          break;
                }                case unset:
                if (ret != sizeof buf) {                        if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) {
                        rpc_SetErr(ECANCELED, "Error:: in send BLOB reply, should be send %d bytes, "                                blob->hdr_cmd = error;
                                        "really is %d\n", sizeof buf, ret);                                blob->hdr_ret = RPC_ERROR(-1);
                        ret = -9;                        }
                         break;                          break;
                }                default:
        } while (ret > -1);                        rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd);
                         blob->hdr_cmd = error;
                         blob->hdr_ret = RPC_ERROR(-1);
         }
   
   end:
           schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), 
                           TASK_DATA(task), TASK_DATLEN(task));
           schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
                           TASK_DATA(task), TASK_DATLEN(task));
           return NULL;
   }
   
   static void *
   rpc_srv_dispatchVars(void *arg)
   {
           rpc_cli_t *c = arg;
           rpc_srv_t *s;
           sched_root_task_t *root;
           u_char *buf;
           struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
   
           ioTRACE(RPC_TRACE_LEVEL);
   
           if (!arg) {
                   rpc_SetErr(EINVAL, "Invalid parameter can`t procced BLOB");
                   return NULL;
           } else
                   s = c->cli_parent;
   
           /* allocate net buffer */
           buf = malloc(sizeof(struct tagBLOBHdr));
           if (!buf) {
                   LOGERR;
                   return NULL;
           }
   
           root = schedBegin();
           if (!root) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   free(buf);
                   return NULL;
           } else {
                   schedTermCondition(root, kill);
                   schedPolling(root, &ts, NULL);
           }
   
           schedRead(root, rxBLOB, c, c->cli_sock, buf, sizeof(struct tagBLOBHdr));
   
           schedRun(root, (void*) &s->srv_blob.state);
           schedEnd(&root);
   
         shutdown(c->cli_sock, SHUT_RDWR);          shutdown(c->cli_sock, SHUT_RDWR);
         close(c->cli_sock);          close(c->cli_sock);
         memset(c, 0, sizeof(rpc_cli_t));          memset(c, 0, sizeof(rpc_cli_t));
        return (void*) (long)ret;        free(buf);
         return NULL;
 }  }
   
 // -------------------------------------------------  // -------------------------------------------------
   
 /*  /*
 * rpc_srv_initBLOBServer() Init & create BLOB Server * rpc_srv_initBLOBServer() - Init & create BLOB Server
  *
  * @srv = RPC server instance
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
  * @diskDir = Disk place for BLOB file objects   * @diskDir = Disk place for BLOB file objects
  * return: -1 == error or 0 bind and created BLOB server instance   * return: -1 == error or 0 bind and created BLOB server instance
Line 358  int Line 495  int
 rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)  rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
 {  {
         int n = 1;          int n = 1;
        struct sockaddr_in sin;        io_sockaddr_t sa;
        struct sockaddr_in6 sin6; 
   
           ioTRACE(RPC_TRACE_LEVEL);
   
         if (!srv) {          if (!srv) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init BLOB server ...\n");                rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
                 return -1;                  return -1;
         }          }
         if (srv->srv_blob.state) {          if (srv->srv_blob.state) {
                rpc_SetErr(EPERM, "Warning:: Already started BLOB server!\n");                rpc_SetErr(EPERM, "Already started BLOB server!");
                 return 0;                  return 0;
         }          }
         if (!Port)  
                 Port = RPC_DEFPORT + 1;  
   
         memset(&srv->srv_blob, 0, sizeof srv->srv_blob);          memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
         if (access(diskDir, R_OK | W_OK) == -1) {          if (access(diskDir, R_OK | W_OK) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         } else          } else
                strlcpy(srv->srv_blob.dir, diskDir, UCHAR_MAX + 1);                AIT_SET_STR(&srv->srv_blob.dir, diskDir);
   
         srv->srv_blob.server.cli_tid = pthread_self();          srv->srv_blob.server.cli_tid = pthread_self();
         srv->srv_blob.server.cli_parent = srv;          srv->srv_blob.server.cli_parent = srv;
        if (srv->srv_server.cli_sa.sa_family == AF_INET) {
                memcpy(&sin, &srv->srv_server.cli_sa, sizeof sin);        memcpy(&sa, &srv->srv_server.cli_sa, sizeof sa);
                sin.sin_port = htons(Port);        switch (sa.sa.sa_family) {
                memcpy(&srv->srv_blob.server.cli_sa, &sin, sizeof(struct sockaddr));                case AF_INET:
        } else {                        sa.sin.sin_port = htons(Port ? Port : ntohs(sa.sin.sin_port) + 1);
                memcpy(&sin6, &srv->srv_server.cli_sa, sizeof sin6);                        break;
                sin6.sin6_port = htons(Port);                case AF_INET6:
                memcpy(&srv->srv_blob.server.cli_sa, &sin6, sizeof(struct sockaddr));                        sa.sin6.sin6_port = htons(Port ? Port : ntohs(sa.sin6.sin6_port) + 1);
                         break;
                 case AF_LOCAL:
                         strlcat(sa.sun.sun_path, ".blob", sizeof sa.sun.sun_path);
                         break;
                 default:
                         AIT_FREE_VAL(&srv->srv_blob.dir);
                         return -1;
         }          }
           memcpy(&srv->srv_blob.server.cli_sa, &sa, sizeof sa);
   
        srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa_family, SOCK_STREAM, 0);        /* create BLOB server socket */
         srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
         if (srv->srv_blob.server.cli_sock == -1) {          if (srv->srv_blob.server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
         if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {          if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
        if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa,         n = srv->srv_netbuf;
                                sizeof srv->srv_blob.server.cli_sa) == -1) {        if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         }          }
           if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                   LOGERR;
                   close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                   return -1;
           }
           if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, 
                                   srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
                   LOGERR;
                   close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                   return -1;
           }
   
           /* allocate pool for concurent clients */
         srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));          srv->srv_blob.clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
         if (!srv->srv_blob.clients) {          if (!srv->srv_blob.clients) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_blob.server.cli_sock);                  close(srv->srv_blob.server.cli_sock);
                   AIT_FREE_VAL(&srv->srv_blob.dir);
                 return -1;                  return -1;
         } else          } else
                 memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));                  memset(srv->srv_blob.clients, 0, srv->srv_numcli * sizeof(rpc_cli_t));
   
         pthread_mutex_init(&srv->srv_blob.mtx, NULL);          pthread_mutex_init(&srv->srv_blob.mtx, NULL);
   
         pthread_mutex_lock(&srv->srv_mtx);  
         rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);          rpc_srv_registerCall(srv, NULL, CALL_BLOBSHUTDOWN, 0);
        rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 0);        rpc_srv_registerCall(srv, NULL, CALL_BLOBCLIENTS, 1);
        rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 0);        rpc_srv_registerCall(srv, NULL, CALL_BLOBVARS, 1);
        rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 1);        rpc_srv_registerCall(srv, NULL, CALL_BLOBSTATE, 0);
        pthread_mutex_unlock(&srv->srv_mtx); 
   
        srv->srv_blob.state = enable;   // enable BLOB        srv->srv_blob.state = enable;   /* enable BLOB */
         return 0;          return 0;
 }  }
   
 /*  /*
 * rpc_srv_endBLOBServer() Destroy BLOB server, close all opened sockets and free resources * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: none   * return: none
  */   */
Line 441  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) Line 603  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
         register int i;          register int i;
         rpc_blob_t *f;          rpc_blob_t *f;
   
        if (!srv) {        ioTRACE(RPC_TRACE_LEVEL);
                rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
         if (!srv || srv->srv_blob.state == disable)
                 return;                  return;
        } else        else
                srv->srv_blob.state = disable;                srv->srv_blob.state = kill;
   
         rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);          rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSHUTDOWN);
         rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);          rpc_srv_unregisterCall(srv, NULL, CALL_BLOBCLIENTS);
         rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);          rpc_srv_unregisterCall(srv, NULL, CALL_BLOBVARS);
         rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);          rpc_srv_unregisterCall(srv, NULL, CALL_BLOBSTATE);
   
           /* close all clients connections & server socket */
         for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)          for (i = 0, c = srv->srv_blob.clients; i < srv->srv_numcli && c; i++, c++)
                if (c->cli_sa.sa_family)                if (c->cli_sa.sa.sa_family)
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
         close(srv->srv_blob.server.cli_sock);          close(srv->srv_blob.server.cli_sock);
   
           pthread_mutex_lock(&srv->srv_blob.mtx);
         if (srv->srv_blob.clients) {          if (srv->srv_blob.clients) {
                 free(srv->srv_blob.clients);                  free(srv->srv_blob.clients);
                 srv->srv_blob.clients = NULL;                  srv->srv_blob.clients = NULL;
         }          }
   
        pthread_mutex_lock(&srv->srv_blob.mtx);        /* detach blobs */
         while ((f = srv->srv_blob.blobs)) {          while ((f = srv->srv_blob.blobs)) {
                 srv->srv_blob.blobs = f->blob_next;                  srv->srv_blob.blobs = f->blob_next;
                 rpc_srv_blobFree(srv, f);                  rpc_srv_blobFree(srv, f);
Line 470  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv) Line 635  rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
         }          }
         pthread_mutex_unlock(&srv->srv_blob.mtx);          pthread_mutex_unlock(&srv->srv_blob.mtx);
   
           AIT_FREE_VAL(&srv->srv_blob.dir);
   
         while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);          while (pthread_mutex_trylock(&srv->srv_blob.mtx) == EBUSY);
         pthread_mutex_destroy(&srv->srv_blob.mtx);          pthread_mutex_destroy(&srv->srv_blob.mtx);
   
           /* at final, save disable BLOB service state */
           srv->srv_blob.state = disable;
 }  }
   
 /*  /*
 * rpc_srv_execBLOBServer() Execute Main BLOB server loop and wait for clients requests * rpc_srv_loopBLOB() - Execute Main BLOB server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)rpc_srv_loopBLOB(rpc_srv_t * __restrict srv)
 {  {
        socklen_t salen = sizeof(struct sockaddr);        socklen_t salen = sizeof(io_sockaddr_t);
         register int i;          register int i;
         rpc_cli_t *c;          rpc_cli_t *c;
         fd_set fds;          fd_set fds;
         int ret;          int ret;
         struct timeval tv = { DEF_RPC_TIMEOUT, 0 };          struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
           pthread_attr_t attr;
   
        if (!srv || srv->srv_blob.state == disable) {        ioTRACE(RPC_TRACE_LEVEL);
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start BLOB server ...\n");
         if (!srv || srv->srv_blob.state == kill) {
                 rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
                 return -1;                  return -1;
         }          }
   
           tv.tv_sec = srv->srv_session.sess_timeout;
   
         if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {          if (listen(srv->srv_blob.server.cli_sock, SOMAXCONN) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
        while (!blob_Kill && !rpc_Kill) {        pthread_attr_init(&attr);
         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 
         /* main BLOB loop */
         while (srv->srv_blob.state != kill && srv->srv_kill != kill) {
                 for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)                  for (c = srv->srv_blob.clients, i = 0; i < srv->srv_numcli && c; i++, c++)
                        if (!c->cli_sa.sa_family)                        if (!c->cli_sa.sa.sa_family)
                                 break;                                  break;
                 if (i >= srv->srv_numcli) {                  if (i >= srv->srv_numcli) {
   #ifdef HAVE_PTHREAD_YIELD
                           pthread_yield();
   #else
                         usleep(1000000);                          usleep(1000000);
   #endif
                         continue;                          continue;
                 }                  }
   
Line 519  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv) Line 703  rpc_srv_execBLOBServer(rpc_srv_t * __restrict srv)
                 if (!ret)                  if (!ret)
                         continue;                          continue;
   
                c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa, &salen);                c->cli_sock = accept(srv->srv_blob.server.cli_sock, &c->cli_sa.sa, &salen);
                 if (c->cli_sock == -1) {                  if (c->cli_sock == -1) {
                         LOGERR;                          LOGERR;
                         continue;                          continue;
                 } else                  } else
                         c->cli_parent = srv;                          c->cli_parent = srv;
   
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchVars, c)) {                /* spawn dispatch thread for BLOB client */
                 if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchVars, c)) {
                         LOGERR;                          LOGERR;
                         continue;                          continue;
                 }                  }
         }          }
   
        srv->srv_blob.state = disable;        srv->srv_blob.state = kill;
   
           pthread_attr_destroy(&attr);
         return 0;          return 0;
 }  }
   
   
 /*  /*
 * rpc_srv_initServer() Init & create RPC Server * rpc_srv_initServer() - Init & create RPC Server
  *
  * @regProgID = ProgramID for authentication & recognition   * @regProgID = ProgramID for authentication & recognition
  * @regProcID = ProcessID for authentication & recognition   * @regProcID = ProcessID for authentication & recognition
  * @concurentClients = Concurent clients at same time to this server   * @concurentClients = Concurent clients at same time to this server
 * @family = Family socket type, AF_INET or AF_INET6 * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
 * @csHost = Host name or IP address for bind server, if NULL any address * @family = Family type, AF_INET, AF_INET6 or AF_LOCAL
  * @csHost = Host name or address for bind server, if NULL any address
  * @Port = Port for bind server, if Port == 0 default port is selected   * @Port = Port for bind server, if Port == 0 default port is selected
  * return: NULL == error or !=NULL bind and created RPC server instance   * return: NULL == error or !=NULL bind and created RPC server instance
  */   */
 rpc_srv_t *  rpc_srv_t *
 rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients,   rpc_srv_initServer(u_int regProgID, u_int regProcID, int concurentClients, 
                u_short family, const char *csHost, u_short Port)                int netBuf, u_short family, const char *csHost, u_short Port)
 {  {
         rpc_srv_t *srv = NULL;          rpc_srv_t *srv = NULL;
         int n = 1;          int n = 1;
         struct hostent *host = NULL;          struct hostent *host = NULL;
        struct sockaddr_in sin;        io_sockaddr_t sa;
        struct sockaddr_in6 sin6; 
   
        if (!concurentClients || !regProgID || (family != AF_INET && family != AF_INET6)) {        ioTRACE(RPC_TRACE_LEVEL);
 
         if (!concurentClients || !regProgID || 
                         (family != AF_INET && family != AF_INET6 && family != AF_LOCAL)) {
                 rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");                  rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t init RPC server ...\n");
                 return NULL;                  return NULL;
         }          }
         if (!Port)          if (!Port)
                 Port = RPC_DEFPORT;                  Port = RPC_DEFPORT;
        if (csHost) {        if (!netBuf)
                 netBuf = BUFSIZ;
         else
                 netBuf = io_align(netBuf, 1);   /* align netBuf length */
         if (csHost && family != AF_LOCAL) {
                 host = gethostbyname2(csHost, family);                  host = gethostbyname2(csHost, family);
                 if (!host) {                  if (!host) {
                         rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));                          rpc_SetErr(h_errno, "Error:: %s\n", hstrerror(h_errno));
                         return NULL;                          return NULL;
                 }                  }
         }          }
           memset(&sa, 0, sizeof sa);
           sa.sa.sa_family = family;
         switch (family) {          switch (family) {
                 case AF_INET:                  case AF_INET:
                        memset(&sin, 0, sizeof sin);                        sa.sin.sin_len = sizeof(struct sockaddr_in);
                        sin.sin_len = sizeof sin;                        sa.sin.sin_port = htons(Port);
                        sin.sin_family = family; 
                        sin.sin_port = htons(Port); 
                         if (csHost)                          if (csHost)
                                memcpy(&sin.sin_addr, host->h_addr, host->h_length);                                memcpy(&sa.sin.sin_addr, host->h_addr, host->h_length);
                         break;                          break;
                 case AF_INET6:                  case AF_INET6:
                        memset(&sin6, 0, sizeof sin6);                        sa.sin6.sin6_len = sizeof(struct sockaddr_in6);
                        sin6.sin6_len = sizeof sin6;                        sa.sin6.sin6_port = htons(Port);
                        sin6.sin6_family = family; 
                        sin6.sin6_port = htons(Port); 
                         if (csHost)                          if (csHost)
                                memcpy(&sin6.sin6_addr, host->h_addr, host->h_length);                                memcpy(&sa.sin6.sin6_addr, host->h_addr, host->h_length);
                         break;                          break;
                   case AF_LOCAL:
                           sa.sun.sun_len = sizeof(struct sockaddr_un);
                           if (csHost)
                                   strlcpy(sa.sun.sun_path, csHost, sizeof sa.sun.sun_path);
                           unlink(sa.sun.sun_path);
                           break;
                 default:                  default:
                         rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");                          rpc_SetErr(EINVAL, "Error:: Invalid parameters can`t start RPC server ...\n");
                         return NULL;                          return NULL;
Line 600  rpc_srv_initServer(u_int regProgID, u_int regProcID, i Line 798  rpc_srv_initServer(u_int regProgID, u_int regProcID, i
         } else          } else
                 memset(srv, 0, sizeof(rpc_srv_t));                  memset(srv, 0, sizeof(rpc_srv_t));
   
           srv->srv_netbuf = netBuf;
         srv->srv_numcli = concurentClients;          srv->srv_numcli = concurentClients;
         srv->srv_session.sess_version = RPC_VERSION;          srv->srv_session.sess_version = RPC_VERSION;
           srv->srv_session.sess_timeout = DEF_RPC_TIMEOUT;
         srv->srv_session.sess_program = regProgID;          srv->srv_session.sess_program = regProgID;
         srv->srv_session.sess_process = regProcID;          srv->srv_session.sess_process = regProcID;
   
         srv->srv_server.cli_tid = pthread_self();          srv->srv_server.cli_tid = pthread_self();
         srv->srv_server.cli_parent = srv;          srv->srv_server.cli_parent = srv;
        if (family == AF_INET)        memcpy(&srv->srv_server.cli_sa, &sa, sizeof sa);
                memcpy(&srv->srv_server.cli_sa, &sin, sizeof srv->srv_server.cli_sa);
        else        /* create server socket */
                memcpy(&srv->srv_server.cli_sa, &sin6, sizeof srv->srv_server.cli_sa); 
         srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);          srv->srv_server.cli_sock = socket(family, SOCK_STREAM, 0);
         if (srv->srv_server.cli_sock == -1) {          if (srv->srv_server.cli_sock == -1) {
                 LOGERR;                  LOGERR;
Line 623  rpc_srv_initServer(u_int regProgID, u_int regProcID, i Line 822  rpc_srv_initServer(u_int regProgID, u_int regProcID, i
                 free(srv);                  free(srv);
                 return NULL;                  return NULL;
         }          }
        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa, sizeof srv->srv_server.cli_sa) == -1) {        n = srv->srv_netbuf;
         if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                 LOGERR;                  LOGERR;
                 close(srv->srv_server.cli_sock);                  close(srv->srv_server.cli_sock);
                 free(srv);                  free(srv);
                 return NULL;                  return NULL;
         }          }
           if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                   LOGERR;
                   close(srv->srv_server.cli_sock);
                   free(srv);
                   return NULL;
           }
           if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
                                   srv->srv_server.cli_sa.sa.sa_len) == -1) {
                   LOGERR;
                   close(srv->srv_server.cli_sock);
                   free(srv);
                   return NULL;
           }
   
           /* allocate pool for concurent clients */
         srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));          srv->srv_clients = calloc(srv->srv_numcli, sizeof(rpc_cli_t));
         if (!srv->srv_clients) {          if (!srv->srv_clients) {
                 LOGERR;                  LOGERR;
Line 642  rpc_srv_initServer(u_int regProgID, u_int regProcID, i Line 856  rpc_srv_initServer(u_int regProgID, u_int regProcID, i
         pthread_mutex_init(&srv->srv_mtx, NULL);          pthread_mutex_init(&srv->srv_mtx, NULL);
   
         rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);          rpc_srv_registerCall(srv, NULL, CALL_SRVSHUTDOWN, 0);
        rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 0);        rpc_srv_registerCall(srv, NULL, CALL_SRVCLIENTS, 1);
        rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 0);        rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 4);
        rpc_srv_registerCall(srv, NULL, CALL_SRVSESSIONS, 0);        rpc_srv_registerCall(srv, NULL, CALL_SRVCALLS, 1);
 
         return srv;          return srv;
 }  }
   
 /*  /*
 * rpc_srv_endServer() Destroy RPC server, close all opened sockets and free resources * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
 * @srv = RPC Server instance *
  * @psrv = RPC Server instance
  * return: none   * return: none
  */   */
 void  void
rpc_srv_endServer(rpc_srv_t * __restrict srv)rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
 {  {
         rpc_cli_t *c;          rpc_cli_t *c;
         register int i;          register int i;
         rpc_func_t *f;          rpc_func_t *f;
   
        if (!srv) {        ioTRACE(RPC_TRACE_LEVEL);
 
         if (!psrv || !*psrv) {
                 rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");                  rpc_SetErr(EINVAL, "Error:: Can`t destroy server because parameter is null!\n");
                 return;                  return;
         }          }
   
        rpc_srv_endBLOBServer(srv);        rpc_srv_endBLOBServer(*psrv);
   
        for (i = 0, c = srv->srv_clients; i < srv->srv_numcli && c; i++, c++)        /* close all clients connections & server socket */
                if (c->cli_sa.sa_family) {        for (i = 0, c = (*psrv)->srv_clients; i < (*psrv)->srv_numcli && c; i++, c++)
                 if (c->cli_sa.sa.sa_family) {
                         shutdown(c->cli_sock, SHUT_RDWR);                          shutdown(c->cli_sock, SHUT_RDWR);
                         close(c->cli_sock);                          close(c->cli_sock);
                 }                  }
        close(srv->srv_server.cli_sock);        close((*psrv)->srv_server.cli_sock);
   
        if (srv->srv_clients) {        if ((*psrv)->srv_clients) {
                free(srv->srv_clients);                free((*psrv)->srv_clients);
                srv->srv_clients = NULL;                (*psrv)->srv_clients = NULL;
                srv->srv_numcli = 0;                (*psrv)->srv_numcli = 0;
         }          }
   
        pthread_mutex_lock(&srv->srv_mtx);        /* detach exported calls */
        while ((f = srv->srv_funcs)) {        pthread_mutex_lock(&(*psrv)->srv_mtx);
                srv->srv_funcs = f->func_next;        while ((f = (*psrv)->srv_funcs)) {
                 (*psrv)->srv_funcs = f->func_next;
                 io_freeVars(&f->func_vars);
                 AIT_FREE_VAL(&f->func_name);
                 AIT_FREE_VAL(&f->func_file);
                 free(f);                  free(f);
         }          }
        pthread_mutex_unlock(&srv->srv_mtx);        pthread_mutex_unlock(&(*psrv)->srv_mtx);
   
        while (pthread_mutex_trylock(&srv->srv_mtx) == EBUSY);        while (pthread_mutex_trylock(&(*psrv)->srv_mtx) == EBUSY);
        pthread_mutex_destroy(&srv->srv_mtx);        pthread_mutex_destroy(&(*psrv)->srv_mtx);
   
        free(srv);        free(*psrv);
        srv = NULL;        *psrv = NULL;
 }  }
   
 /*  /*
 * rpc_srv_execServer() Execute Main server loop and wait for clients requests * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
  *
  * @srv = RPC Server instance   * @srv = RPC Server instance
  * return: -1 error or 0 ok, infinite loop ...   * return: -1 error or 0 ok, infinite loop ...
  */   */
 int  int
rpc_srv_execServer(rpc_srv_t * __restrict srv)rpc_srv_loopServer(rpc_srv_t * __restrict srv)
 {  {
        socklen_t salen = sizeof(struct sockaddr);        socklen_t salen = sizeof(io_sockaddr_t);
         register int i;          register int i;
         rpc_cli_t *c;          rpc_cli_t *c;
         fd_set fds;          fd_set fds;
         int ret;          int ret;
         struct timeval tv = { DEF_RPC_TIMEOUT, 0 };          struct timeval tv = { DEF_RPC_TIMEOUT, 0 };
           pthread_attr_t attr;
   
           ioTRACE(RPC_TRACE_LEVEL);
   
         if (!srv) {          if (!srv) {
                 rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");                  rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t start RPC server ...\n");
                 return -1;                  return -1;
         }          }
   
           tv.tv_sec = srv->srv_session.sess_timeout;
   
           /* activate BLOB server worker if srv->srv_blob.state == enable */
           rpc_srv_execBLOBServer(srv);
   
         if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {          if (listen(srv->srv_server.cli_sock, SOMAXCONN) == -1) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
         }          }
   
        while (!rpc_Kill) {        pthread_attr_init(&attr);
         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 
         /* main rpc loop */
         while (srv->srv_kill != kill) {
                 for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)                  for (c = srv->srv_clients, i = 0; i < srv->srv_numcli && c; i++, c++)
                        if (!c->cli_sa.sa_family)                        if (!c->cli_sa.sa.sa_family)
                                 break;                                  break;
                 if (i >= srv->srv_numcli) {                  if (i >= srv->srv_numcli) {
   #ifdef HAVE_PTHREAD_YIELD
                           pthread_yield();
   #else
                         usleep(1000000);                          usleep(1000000);
   #endif
                         continue;                          continue;
                 }                  }
   
Line 739  rpc_srv_execServer(rpc_srv_t * __restrict srv) Line 979  rpc_srv_execServer(rpc_srv_t * __restrict srv)
                 if (!ret)                  if (!ret)
                         continue;                          continue;
   
                c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa, &salen);                c->cli_sock = accept(srv->srv_server.cli_sock, &c->cli_sa.sa, &salen);
                 if (c->cli_sock == -1) {                  if (c->cli_sock == -1) {
                         LOGERR;                          LOGERR;
                         continue;                          continue;
                 } else                  } else
                         c->cli_parent = srv;                          c->cli_parent = srv;
   
                if (pthread_create(&c->cli_tid, NULL, rpc_srv_dispatchCall, c)) {                /* spawn rpc client dispatcher */
                 if (pthread_create(&c->cli_tid, &attr, rpc_srv_dispatchCall, c)) {
                         LOGERR;                          LOGERR;
                         continue;                          continue;
                 }                  }
         }          }
   
           pthread_attr_destroy(&attr);
         return 0;          return 0;
 }  }
   
Line 759  rpc_srv_execServer(rpc_srv_t * __restrict srv) Line 1001  rpc_srv_execServer(rpc_srv_t * __restrict srv)
   
 /*  /*
  * rpc_srv_execCall() Execute registered call from RPC server   * rpc_srv_execCall() Execute registered call from RPC server
    *
  * @call = Register RPC call   * @call = Register RPC call
  * @rpc = IN RPC call structure   * @rpc = IN RPC call structure
 * @args = IN RPC call array of rpc values * @args = IN RPC calling arguments from RPC client
  * return: -1 error, !=-1 ok   * return: -1 error, !=-1 ok
  */   */
 int  int
 rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc,   rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, 
                rpc_val_t * __restrict args)                array_t * __restrict args)
 {  {
         void *dl;          void *dl;
         rpc_callback_t func;          rpc_callback_t func;
         int ret;          int ret;
   
           ioTRACE(RPC_TRACE_LEVEL);
   
         if (!call || !rpc || !call->func_parent) {          if (!call || !rpc || !call->func_parent) {
                rpc_SetErr(EINVAL, "Error:: Invalid parameter can`t exec call from RPC server ...\n");                rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
                 return -1;                  return -1;
         }          }
   
        dl = dlopen((char*) (*call->func_file ? call->func_file : NULL), RTLD_NOW);        dl = dlopen(AIT_VOID(&call->func_file), RTLD_NOW);
         if (!dl) {          if (!dl) {
                rpc_SetErr(ENOENT, "Error:: Can`t attach module %s!\n", dlerror());                rpc_SetErr(ENOENT, "Can`t attach module %s!", dlerror());
                 return -1;                  return -1;
         }          }
   
        func = dlsym(dl, (char*) call->func_name);        func = dlsym(dl, (const char*) AIT_GET_STR(&call->func_name));
         if (func)          if (func)
                ret = func(call, rpc->call_argc, args);                ret = func(call, ntohs(rpc->call_argc), args);
         else {          else {
                rpc_SetErr(ENOEXEC, "Error:: Can`t find function %s!\n", dlerror());                rpc_SetErr(ENOEXEC, "Can`t find function %s!", dlerror());
                 ret = -1;                  ret = -1;
         }          }
   

Removed from v.1.2  
changed lines
  Added in v.1.8


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>