Annotation of libaitrpc/src/srv.c, revision 1.17.4.8

1.1       misho       1: /*************************************************************************
                      2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
                      3: *  by Michael Pounov <misho@openbsd-bg.org>
                      4: *
                      5: * $Author: misho $
1.17.4.8! misho       6: * $Id: srv.c,v 1.17.4.7 2013/08/21 13:02:33 misho Exp $
1.1       misho       7: *
1.2       misho       8: **************************************************************************
                      9: The ELWIX and AITNET software is distributed under the following
                     10: terms:
                     11: 
                     12: All of the documentation and software included in the ELWIX and AITNET
                     13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
                     14: 
1.14      misho      15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
1.2       misho      16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
                     17: 
                     18: Redistribution and use in source and binary forms, with or without
                     19: modification, are permitted provided that the following conditions
                     20: are met:
                     21: 1. Redistributions of source code must retain the above copyright
                     22:    notice, this list of conditions and the following disclaimer.
                     23: 2. Redistributions in binary form must reproduce the above copyright
                     24:    notice, this list of conditions and the following disclaimer in the
                     25:    documentation and/or other materials provided with the distribution.
                     26: 3. All advertising materials mentioning features or use of this software
                     27:    must display the following acknowledgement:
                     28: This product includes software developed by Michael Pounov <misho@elwix.org>
                     29: ELWIX - Embedded LightWeight unIX and its contributors.
                     30: 4. Neither the name of AITNET nor the names of its contributors
                     31:    may be used to endorse or promote products derived from this software
                     32:    without specific prior written permission.
                     33: 
                     34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
                     35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
                     36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
                     37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
                     38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
                     39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
                     40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
                     41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
                     42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
                     43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
                     44: SUCH DAMAGE.
                     45: */
1.1       misho      46: #include "global.h"
                     47: 
                     48: 
1.13      misho      49: /* SOCK_STREAM */
                     50: static void *acceptClients(sched_task_t *);
                     51: static void *closeClient(sched_task_t *);
                     52: static void *rxPacket(sched_task_t *);
                     53: static void *txPacket(sched_task_t *);
                     54: 
                     55: /* SOCK_DGRAM */
                     56: static void *freeClient(sched_task_t *);
                     57: static void *rxUDPPacket(sched_task_t *);
                     58: static void *txUDPPacket(sched_task_t *);
                     59: 
                     60: /* SOCK_RAW */
                     61: 
                     62: static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {
                     63:        { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
                     64:        { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
                     65:        { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */
                     66:        { NULL, NULL, NULL, NULL }                              /* SOCK_RAW */
                     67: };
                     68: 
                     69: 
1.17.4.8! misho      70: ait_val_t *
        !            71: rpc_getBufVar(rpc_cli_t * __restrict c)
        !            72: {
        !            73:        return array(c->cli_buf, RPC_ISNEXTBUF(c), ait_val_t*);
        !            74: }
        !            75: 
        !            76: u_char *
        !            77: rpc_getBuffer(rpc_cli_t * __restrict c)
1.17.4.3  misho      78: {
                     79:        u_char *b = NULL;
                     80: 
                     81:        assert(c);
                     82: 
                     83:        if (RPC_ISNEXTBUF(c))
                     84:                b = AIT_GET_BUF(array(c->cli_buf, 1, ait_val_t*));
                     85:        else
                     86:                b = AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*)) + 
                     87:                        sizeof(struct tagRPCCall);
                     88: 
                     89:        return b;
                     90: }
                     91: 
1.17.4.8! misho      92: struct tagRPCCall *
        !            93: rpc_getHeader(rpc_cli_t * __restrict c)
1.17.4.3  misho      94: {
                     95:        assert(c);
                     96: 
                     97:        return (struct tagRPCCall*) AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*));
                     98: }
                     99: 
1.16      misho     100: void
1.13      misho     101: rpc_freeCli(rpc_cli_t * __restrict c)
1.10      misho     102: {
                    103:        rpc_srv_t *s = c->cli_parent;
                    104: 
1.13      misho     105:        schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.10      misho     106: 
1.17.4.1  misho     107:        /* free buffer(s) */
                    108:        ait_freeVars(&c->cli_buf);
1.10      misho     109: 
1.14      misho     110:        array_Del(s->srv_clients, c->cli_id, 0);
1.10      misho     111:        if (c)
1.14      misho     112:                e_free(c);
1.13      misho     113: }
                    114: 
                    115: 
                    116: static inline int
1.14      misho     117: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13      misho     118: {
                    119:        rpc_cli_t *c = NULL;
                    120:        register int i;
                    121: 
                    122:        /* check free slots for connect */
1.14      misho     123:        for (i = 0; i < array_Size(srv->srv_clients) && 
                    124:                        (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
1.13      misho     125:                /* check for duplicates */
1.14      misho     126:                if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
1.13      misho     127:                        break;
1.14      misho     128:        if (i >= array_Size(srv->srv_clients))
1.13      misho     129:                return -1;      /* no more free slots! */
                    130: 
                    131:        return i;
                    132: }
                    133: 
                    134: static rpc_cli_t *
1.14      misho     135: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13      misho     136: {
                    137:        rpc_cli_t *c = NULL;
                    138:        int n;
                    139: 
                    140:        n = _check4freeslot(srv, sa);
                    141:        if (n == -1)
                    142:                return NULL;
                    143:        else
1.14      misho     144:                c = array(srv->srv_clients, n, rpc_cli_t*);
1.13      misho     145: 
                    146:        if (!c) {
1.14      misho     147:                c = e_malloc(sizeof(rpc_cli_t));
1.13      misho     148:                if (!c) {
                    149:                        LOGERR;
                    150:                        srv->srv_kill = 1;
                    151:                        return NULL;
                    152:                } else {
                    153:                        memset(c, 0, sizeof(rpc_cli_t));
1.14      misho     154:                        array_Set(srv->srv_clients, n, c);
1.13      misho     155:                        c->cli_id = n;
                    156:                        c->cli_parent = srv;
                    157:                }
                    158: 
1.17.4.2  misho     159:                /* init buffer(s) */
1.17.4.5  misho     160:                c->cli_buf = ait_allocVars(2);
1.17.4.1  misho     161:                if (!c->cli_buf) {
                    162:                        rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                    163:                        array_Del(srv->srv_clients, n, 42);
                    164:                        return NULL;
                    165:                } else
                    166:                        AIT_SET_BUFSIZ(array(c->cli_buf, 0, ait_val_t*), 0, srv->srv_netbuf);
1.13      misho     167:        }
                    168: 
                    169:        return c;
                    170: }
                    171: 
                    172: 
                    173: static void *
                    174: freeClient(sched_task_t *task)
                    175: {
                    176:        rpc_freeCli(TASK_ARG(task));
                    177: 
                    178:        return NULL;
                    179: }
                    180: 
                    181: static void *
                    182: closeClient(sched_task_t *task)
                    183: {
                    184:        int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
                    185: 
                    186:        rpc_freeCli(TASK_ARG(task));
                    187: 
                    188:        /* close client socket */
                    189:        shutdown(sock, SHUT_RDWR);
                    190:        close(sock);
1.10      misho     191:        return NULL;
                    192: }
1.7       misho     193: 
                    194: static void *
                    195: txPacket(sched_task_t *task)
                    196: {
                    197:        rpc_cli_t *c = TASK_ARG(task);
                    198:        rpc_srv_t *s = c->cli_parent;
                    199:        rpc_func_t *f = NULL;
1.17.4.4  misho     200:        u_char *buf;
1.17.4.5  misho     201:        struct tagRPCCall *rpc = (struct tagRPCCall*) TASK_DATA(task);
1.7       misho     202:        int ret, wlen = sizeof(struct tagRPCCall);
1.17.4.4  misho     203:        int len = sizeof(struct tagRPCCall) + ntohl(rpc->call_len);
1.10      misho     204: 
1.17.4.4  misho     205:        buf = e_malloc(len);
                    206:        if (!buf) {
                    207:                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.17.4.5  misho     208:                /* close connection */
                    209:                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    210:                                TASK_ARG(task), 0, NULL, 0);
1.17.4.4  misho     211:                return NULL;
                    212:        } else {
                    213:                /* copy RPC header */
1.17.4.5  misho     214:                memcpy(buf, rpc, wlen);
1.17.4.4  misho     215:                rpc = (struct tagRPCCall*) buf;
                    216:        }
1.7       misho     217: 
                    218:        if (rpc->call_argc) {
1.10      misho     219:                f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1.7       misho     220:                if (!f) {
1.10      misho     221:                        rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.17.4.4  misho     222: 
1.7       misho     223:                        rpc->call_argc ^= rpc->call_argc;
                    224:                        rpc->call_rep.ret = RPC_ERROR(-1);
                    225:                        rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    226:                } else {
1.14      misho     227:                        rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.7       misho     228:                        /* Go Encapsulate variables */
1.17.4.4  misho     229:                        ret = ait_vars2buffer(buf + wlen, len - wlen, RPC_RETVARS(c));
1.10      misho     230:                        /* Free return values */
1.14      misho     231:                        ait_freeVars(&c->cli_vars);
1.7       misho     232:                        if (ret == -1) {
                    233:                                rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.17.4.4  misho     234: 
1.7       misho     235:                                rpc->call_argc ^= rpc->call_argc;
                    236:                                rpc->call_rep.ret = RPC_ERROR(-1);
                    237:                                rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    238:                        } else
                    239:                                wlen += ret;
                    240:                }
                    241:        }
                    242: 
1.17.4.4  misho     243:        rpc->call_len = htonl(wlen);
1.7       misho     244: 
                    245:        /* send reply */
1.10      misho     246:        ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
                    247:        if (ret == -1 || ret != wlen) {
                    248:                /* close connection */
1.13      misho     249:                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    250:                                TASK_ARG(task), 0, NULL, 0);
1.10      misho     251:        }
1.7       misho     252: 
1.17.4.5  misho     253:        e_free(buf);
1.7       misho     254:        return NULL;
                    255: }
                    256: 
                    257: static void *
                    258: execCall(sched_task_t *task)
                    259: {
                    260:        rpc_cli_t *c = TASK_ARG(task);
                    261:        rpc_srv_t *s = c->cli_parent;
                    262:        rpc_func_t *f = NULL;
                    263:        array_t *arr = NULL;
1.17.4.8! misho     264:        u_char *buf = rpc_getBuffer(c);
        !           265:        struct tagRPCCall *rpc = rpc_getHeader(c);
1.7       misho     266:        int argc = ntohs(rpc->call_argc);
                    267: 
                    268:        /* Go decapsulate variables ... */
1.8       misho     269:        if (argc) {
1.17.4.3  misho     270:                arr = ait_buffer2vars(buf, ntohl(rpc->call_len), argc, 42);
1.7       misho     271:                if (!arr) {
1.14      misho     272:                        rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
1.17.4.3  misho     273: 
1.7       misho     274:                        rpc->call_argc ^= rpc->call_argc;
                    275:                        rpc->call_rep.ret = RPC_ERROR(-1);
                    276:                        rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    277:                        return NULL;
                    278:                }
1.10      misho     279:        } else
                    280:                arr = NULL;
1.7       misho     281: 
1.10      misho     282:        if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
1.7       misho     283:                rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.17.4.3  misho     284: 
1.7       misho     285:                rpc->call_argc ^= rpc->call_argc;
                    286:                rpc->call_rep.ret = RPC_ERROR(-1);
                    287:                rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    288:        } else {
1.8       misho     289:                /* if client doesn't want reply */
1.10      misho     290:                argc = RPC_CHK_NOREPLY(rpc);
                    291:                rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
1.7       misho     292:                if (rpc->call_rep.ret == htonl(-1)) {
                    293:                        rpc->call_rep.eno = RPC_ERROR(errno);
                    294:                        rpc->call_argc ^= rpc->call_argc;
                    295:                } else {
                    296:                        rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8       misho     297:                        if (argc) {
                    298:                                /* without reply */
1.14      misho     299:                                ait_freeVars(&c->cli_vars);
1.8       misho     300:                                rpc->call_argc ^= rpc->call_argc;
1.10      misho     301:                        } else {
                    302:                                /* reply */
1.14      misho     303:                                rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.10      misho     304:                        }
1.7       misho     305:                }
                    306:        }
                    307: 
1.14      misho     308:        array_Destroy(&arr);
1.7       misho     309:        return NULL;
                    310: }
                    311: 
                    312: static void *
                    313: rxPacket(sched_task_t *task)
                    314: {
                    315:        rpc_cli_t *c = TASK_ARG(task);
                    316:        rpc_srv_t *s = c->cli_parent;
1.10      misho     317:        int len, rlen, noreply;
1.17.4.3  misho     318:        ait_val_t *bufz = array(c->cli_buf, 0, ait_val_t*);
                    319:        u_char *buf = (u_char*) AIT_GET_BUF(bufz);
                    320:        struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.7       misho     321: 
1.17.4.3  misho     322:        memset(buf, 0, AIT_LEN(bufz));
                    323:        /* 1st buffer is last */
                    324:        RPC_CLR_NEXTBUF(c);
                    325: 
                    326:        /* read rpc header */
                    327:        rlen = recv(TASK_FD(task), rpc, MIN(sizeof(struct tagRPCCall), AIT_LEN(bufz)), 0);
1.17.4.5  misho     328:        if (rlen < sizeof(struct tagRPCCall) || 
1.17.4.3  misho     329:                        ntohl(rpc->call_len) < sizeof(struct tagRPCCall)) {
                    330:                /* close connection */
                    331:                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    332:                                TASK_ARG(task), 0, NULL, 0);
                    333:                return NULL;
1.10      misho     334:        } else {
1.17.4.3  misho     335:                buf += sizeof(struct tagRPCCall);
                    336:                len = ntohl(rpc->call_len);
1.9       misho     337:        }
1.7       misho     338: 
1.17.4.3  misho     339:        if (len > (AIT_LEN(bufz) - sizeof(struct tagRPCCall))) {
                    340:                /* add extra buffer */
                    341:                if (!(bufz = ait_getVars(&c->cli_buf, 1))) {
                    342:                        /* close connection */
                    343:                        schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    344:                                        TASK_ARG(task), 0, NULL, 0);
1.8       misho     345:                        return NULL;
1.17.4.3  misho     346:                } else {
1.17.4.5  misho     347:                        AIT_FREE_VAL(bufz);
1.17.4.3  misho     348:                        AIT_SET_BUFSIZ(bufz, 0, len);
                    349:                        buf = AIT_GET_BUF(bufz);
1.10      misho     350:                }
1.17.4.3  misho     351:                /* buffer isnt last */
                    352:                RPC_SET_NEXTBUF(c);
                    353:        }
1.8       misho     354: 
1.17.4.3  misho     355:        /* read payload */
                    356:        rlen = recv(TASK_FD(task), buf, len, 0);
                    357:        if (rlen < len) {
                    358:                /* close connection */
                    359:                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    360:                                TASK_ARG(task), 0, NULL, 0);
                    361:                return NULL;
                    362:        }
1.7       misho     363: 
1.17.4.3  misho     364:        noreply = RPC_CHK_NOREPLY(rpc);
1.7       misho     365: 
1.17.4.3  misho     366:        /* check RPC packet session info */
                    367:        if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                    368:                rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.7       misho     369: 
1.17.4.3  misho     370:                rpc->call_argc ^= rpc->call_argc;
                    371:                rpc->call_rep.ret = RPC_ERROR(-1);
                    372:                rpc->call_rep.eno = RPC_ERROR(errno);
                    373:        } else {
                    374:                /* execute RPC call */
                    375:                schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), noreply, rpc, len);
                    376:        }
1.7       misho     377: 
1.17.4.3  misho     378:        /* send RPC reply */
                    379:        if (!noreply)
                    380:                schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], 
                    381:                                TASK_ARG(task), TASK_FD(task), rpc, len);
1.7       misho     382: 
                    383:        /* lets get next packet */
1.17.4.3  misho     384:        schedReadSelf(task);
1.7       misho     385:        return NULL;
                    386: }
                    387: 
1.1       misho     388: static void *
1.10      misho     389: acceptClients(sched_task_t *task)
1.1       misho     390: {
1.10      misho     391:        rpc_srv_t *srv = TASK_ARG(task);
                    392:        rpc_cli_t *c = NULL;
1.14      misho     393:        socklen_t salen = sizeof(sockaddr_t);
1.7       misho     394: 
1.13      misho     395:        c = _allocClient(srv, NULL);
                    396:        if (!c)
1.10      misho     397:                goto end;
                    398: 
                    399:        /* accept client */
                    400:        c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
                    401:        if (c->cli_sock == -1) {
                    402:                LOGERR;
1.17.4.2  misho     403:                ait_freeVars(&c->cli_buf);
1.14      misho     404:                array_Del(srv->srv_clients, c->cli_id, 42);
1.10      misho     405:                goto end;
1.1       misho     406:        } else
1.10      misho     407:                fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.1       misho     408: 
1.13      misho     409:        schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c, 
                    410:                        c->cli_sock, NULL, 0);
1.10      misho     411: end:
                    412:        schedReadSelf(task);
                    413:        return NULL;
                    414: }
1.5       misho     415: 
1.7       misho     416: 
1.10      misho     417: static void *
1.13      misho     418: txUDPPacket(sched_task_t *task)
1.10      misho     419: {
                    420:        rpc_cli_t *c = TASK_ARG(task);
                    421:        rpc_srv_t *s = c->cli_parent;
1.13      misho     422:        rpc_func_t *f = NULL;
1.17.4.5  misho     423:        u_char *buf;
                    424:        struct tagRPCCall *rpc = (struct tagRPCCall*) TASK_DATA(task);
1.13      misho     425:        int ret, wlen = sizeof(struct tagRPCCall);
1.17.4.5  misho     426:        int len = sizeof(struct tagRPCCall) + ntohl(rpc->call_len);
1.13      misho     427:        struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
                    428: 
                    429:        schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
                    430:        schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    431:                         TASK_ARG(task), ts, TASK_ARG(task), 0);
                    432: 
1.17.4.5  misho     433:        buf = e_malloc(len);
                    434:        if (!buf) {
                    435:                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                    436:                /* close connection */
                    437:                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    438:                                 TASK_ARG(task), 0, NULL, 0);
                    439:                return NULL;
                    440:        } else {
                    441:                /* copy RPC header */
                    442:                memcpy(buf, rpc, wlen);
                    443:                rpc = (struct tagRPCCall*) buf;
                    444:        }
1.13      misho     445: 
                    446:        if (rpc->call_argc) {
                    447:                f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                    448:                if (!f) {
                    449:                        rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.17.4.5  misho     450: 
1.13      misho     451:                        rpc->call_argc ^= rpc->call_argc;
                    452:                        rpc->call_rep.ret = RPC_ERROR(-1);
                    453:                        rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    454:                } else {
1.14      misho     455:                        rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.13      misho     456:                        /* Go Encapsulate variables */
1.17.4.5  misho     457:                        ret = ait_vars2buffer(buf + wlen, len - wlen, RPC_RETVARS(c));
1.13      misho     458:                        /* Free return values */
1.14      misho     459:                        ait_freeVars(&c->cli_vars);
1.13      misho     460:                        if (ret == -1) {
                    461:                                rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.17.4.5  misho     462: 
1.13      misho     463:                                rpc->call_argc ^= rpc->call_argc;
                    464:                                rpc->call_rep.ret = RPC_ERROR(-1);
                    465:                                rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    466:                        } else
                    467:                                wlen += ret;
                    468:                }
                    469:        }
1.7       misho     470: 
1.17.4.5  misho     471:        rpc->call_len = htonl(wlen);
1.7       misho     472: 
1.13      misho     473:        /* calculate CRC */
                    474:        rpc->call_crc ^= rpc->call_crc;
                    475:        rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
                    476: 
                    477:        /* send reply */
                    478:        ret = sendto(TASK_FD(task), buf, wlen, MSG_NOSIGNAL, 
                    479:                        &c->cli_sa.sa, c->cli_sa.sa.sa_len);
                    480:        if (ret == -1 || ret != wlen) {
                    481:                /* close connection */
                    482:                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    483:                                 TASK_ARG(task), 0, NULL, 0);
                    484:        }
                    485: 
1.17.4.5  misho     486:        e_free(buf);
1.13      misho     487:        return NULL;
                    488: }
                    489: 
                    490: static void *
                    491: rxUDPPacket(sched_task_t *task)
                    492: {
                    493:        rpc_srv_t *srv = TASK_ARG(task);
                    494:        rpc_cli_t *c = NULL;
                    495:        int len, rlen, noreply;
1.17.4.5  misho     496:        ait_val_t *bufz;
                    497:        u_char *buf = NULL;
                    498:        struct tagRPCCall rpcbuf, *rpc;
                    499:        sockaddr_t sa[2];
1.14      misho     500:        socklen_t salen;
1.13      misho     501:        struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
                    502: 
1.17.4.5  misho     503:        memset(&rpcbuf, 0, sizeof rpcbuf);
                    504: 
1.13      misho     505:        /* receive connect packet */
1.17.4.5  misho     506:        salen = sa[0].ss.ss_len = sizeof(sockaddr_t);
                    507:        rlen = recvfrom(TASK_FD(task), &rpcbuf, sizeof rpcbuf, 0, &sa[0].sa, &salen);
                    508:        if (rlen < sizeof(struct tagRPCCall) || ntohl(rpcbuf.call_len) < sizeof(struct tagRPCCall))
                    509:                goto end;
                    510:        else
                    511:                len = ntohl(rpcbuf.call_len);
                    512: 
                    513:        buf = e_malloc(len);
                    514:        if (!buf)
1.13      misho     515:                goto end;
1.17.4.5  misho     516:        else
                    517:                memset(buf, 0, len);
1.13      misho     518: 
1.17.4.5  misho     519:        /* read payload */
                    520:        salen = sa[1].ss.ss_len = sizeof(sockaddr_t);
                    521:        rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa[1].sa, &salen);
                    522:        if (rlen < len || memcmp(&sa[0], &sa[1], sizeof sa[0]))
                    523:                goto end;
                    524: 
                    525:        c = _allocClient(srv, sa);
1.13      misho     526:        if (!c)
                    527:                goto end;
                    528:        else {
1.17.4.5  misho     529:                /* add extra buffer */
                    530:                if (!(bufz = ait_getVars(&c->cli_buf, 1)))
                    531:                        goto end;
                    532:                else {
                    533:                        AIT_FREE_VAL(bufz);
                    534:                        AIT_SET_BUFSIZ(bufz, 0, len);
                    535:                        /* buffer isnt last */
                    536:                        RPC_SET_NEXTBUF(c);
                    537:                }
                    538: 
1.17.4.8! misho     539:                rpc = rpc_getHeader(c);
1.17.4.5  misho     540:                memcpy(rpc, &rpcbuf, sizeof(struct tagRPCCall));
1.17.4.8! misho     541:                memcpy(rpc_getBuffer(c), buf, len);
1.17.4.5  misho     542: 
1.13      misho     543:                c->cli_sock = TASK_FD(task);
1.17.4.5  misho     544:                memcpy(&c->cli_sa, sa, sizeof c->cli_sa);
1.13      misho     545:                /* armed timer for close stateless connection */
                    546:                schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                    547:                schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                    548:                                c, ts, c, 0);
                    549:        }
                    550: 
1.17.4.5  misho     551:        /* check integrity of packet */
                    552:        if (ntohs(rpc->call_crc) != crcFletcher16((u_short*) buf, len / 2)) {
                    553:                rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                    554:                goto end;
                    555:        }
1.13      misho     556: 
1.17.4.5  misho     557:        noreply = RPC_CHK_NOREPLY(rpc);
1.13      misho     558: 
1.17.4.5  misho     559:        /* check RPC packet session info */
                    560:        if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                    561:                rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.13      misho     562: 
1.17.4.5  misho     563:                rpc->call_argc ^= rpc->call_argc;
                    564:                rpc->call_rep.ret = RPC_ERROR(-1);
                    565:                rpc->call_rep.eno = RPC_ERROR(errno);
                    566:        } else {
                    567:                /* execute RPC call */
                    568:                schedEvent(TASK_ROOT(task), execCall, c, noreply, rpc, len);
                    569:        }
1.13      misho     570: 
1.17.4.5  misho     571:        /* send RPC reply */
                    572:        if (!noreply)
                    573:                schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                    574:                                c, TASK_FD(task), rpc, len);
1.13      misho     575: end:
1.17.4.5  misho     576:        if (buf)
                    577:                e_free(buf);
1.13      misho     578:        schedReadSelf(task);
                    579:        return NULL;
                    580: }
                    581: 
                    582: /* ------------------------------------------------------ */
                    583: 
1.16      misho     584: void
1.13      misho     585: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
                    586: {
                    587:        rpc_srv_t *s = c->cli_parent;
                    588: 
                    589:        schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1.10      misho     590: 
1.17.4.2  misho     591:        /* free buffer(s) */
                    592:        ait_freeVars(&c->cli_buf);
1.10      misho     593: 
1.14      misho     594:        array_Del(s->srv_blob.clients, c->cli_id, 0);
1.10      misho     595:        if (c)
1.14      misho     596:                e_free(c);
1.13      misho     597: }
                    598: 
                    599: 
                    600: static void *
                    601: closeBLOBClient(sched_task_t *task)
                    602: {
                    603:        int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
                    604: 
                    605:        rpc_freeBLOBCli(TASK_ARG(task));
                    606: 
                    607:        /* close client socket */
                    608:        shutdown(sock, SHUT_RDWR);
                    609:        close(sock);
1.7       misho     610:        return NULL;
                    611: }
                    612: 
                    613: static void *
                    614: txBLOB(sched_task_t *task)
                    615: {
1.10      misho     616:        rpc_cli_t *c = TASK_ARG(task);
1.17.4.6  misho     617:        u_char *buf = AIT_GET_BUF(array(c->cli_buf, 0, ait_val_t*));
1.7       misho     618:        int wlen = sizeof(struct tagBLOBHdr);
                    619: 
                    620:        /* send reply */
1.10      misho     621:        wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
                    622:        if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
                    623:                /* close blob connection */
                    624:                schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
                    625:        }
1.7       misho     626: 
                    627:        return NULL;
                    628: }
1.4       misho     629: 
1.7       misho     630: static void *
                    631: rxBLOB(sched_task_t *task)
                    632: {
                    633:        rpc_cli_t *c = TASK_ARG(task);
                    634:        rpc_srv_t *s = c->cli_parent;
                    635:        rpc_blob_t *b;
1.10      misho     636:        struct tagBLOBHdr blob;
1.7       misho     637:        int rlen;
                    638: 
1.10      misho     639:        memset(&blob, 0, sizeof blob);
                    640:        rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
                    641:        if (rlen < 1) {
                    642:                /* close blob connection */
                    643:                schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1.7       misho     644:                return NULL;
                    645:        }
1.5       misho     646: 
1.10      misho     647:        /* check BLOB packet */
                    648:        if (rlen < sizeof(struct tagBLOBHdr)) {
                    649:                rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1.6       misho     650: 
1.10      misho     651:                schedReadSelf(task);
1.7       misho     652:                return NULL;
                    653:        }
1.1       misho     654: 
1.7       misho     655:        /* check RPC packet session info */
1.15      misho     656:        if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1.7       misho     657:                rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.10      misho     658:                blob.hdr_cmd = error;
1.7       misho     659:                goto end;
                    660:        }
                    661: 
                    662:        /* Go to proceed packet ... */
1.10      misho     663:        switch (blob.hdr_cmd) {
1.7       misho     664:                case get:
1.10      misho     665:                        if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
                    666:                                rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
                    667:                                blob.hdr_cmd = no;
                    668:                                blob.hdr_ret = RPC_ERROR(-1);
1.7       misho     669:                                break;
                    670:                        } else
1.10      misho     671:                                blob.hdr_len = htonl(b->blob_len);
1.5       misho     672: 
1.7       misho     673:                        if (rpc_srv_blobMap(s, b) != -1) {
                    674:                                /* deliver BLOB variable to client */
1.10      misho     675:                                blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1.7       misho     676:                                rpc_srv_blobUnmap(b);
                    677:                        } else {
1.10      misho     678:                                blob.hdr_cmd = error;
                    679:                                blob.hdr_ret = RPC_ERROR(-1);
1.7       misho     680:                        }
                    681:                        break;
                    682:                case set:
1.17      misho     683:                        if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len), 
                    684:                                                        ntohl(blob.hdr_ret)))) {
1.7       misho     685:                                /* set new BLOB variable for reply :) */
1.10      misho     686:                                blob.hdr_var = htonl(b->blob_var);
1.7       misho     687: 
                    688:                                /* receive BLOB from client */
1.10      misho     689:                                blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1.7       misho     690:                                rpc_srv_blobUnmap(b);
1.5       misho     691:                        } else {
1.10      misho     692:                                blob.hdr_cmd = error;
                    693:                                blob.hdr_ret = RPC_ERROR(-1);
1.7       misho     694:                        }
                    695:                        break;
                    696:                case unset:
1.11      misho     697:                        if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1.10      misho     698:                                blob.hdr_cmd = error;
                    699:                                blob.hdr_ret = RPC_ERROR(-1);
1.1       misho     700:                        }
                    701:                        break;
1.7       misho     702:                default:
1.10      misho     703:                        rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
                    704:                        blob.hdr_cmd = error;
                    705:                        blob.hdr_ret = RPC_ERROR(-1);
1.7       misho     706:        }
1.1       misho     707: 
1.7       misho     708: end:
1.17.4.6  misho     709:        memcpy(AIT_ADDR(array(c->cli_buf, 0, ait_val_t*)), &blob, sizeof blob);
1.10      misho     710:        schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
                    711:        schedReadSelf(task);
1.7       misho     712:        return NULL;
1.2       misho     713: }
                    714: 
                    715: static void *
1.17      misho     716: flushBLOB(sched_task_t *task)
                    717: {
                    718:        rpc_srv_t *srv = TASK_ARG(task);
                    719:        rpc_blob_t *b, *tmp;
                    720: 
                    721:        TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                    722:                TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
                    723: 
                    724:                rpc_srv_blobFree(srv, b);
                    725:                e_free(b);
                    726:        }
                    727: 
                    728:        schedSignalSelf(task);
                    729:        return NULL;
                    730: }
                    731: 
                    732: static void *
1.10      misho     733: acceptBLOBClients(sched_task_t *task)
1.2       misho     734: {
1.10      misho     735:        rpc_srv_t *srv = TASK_ARG(task);
                    736:        rpc_cli_t *c = NULL;
                    737:        register int i;
1.14      misho     738:        socklen_t salen = sizeof(sockaddr_t);
1.12      misho     739: #ifdef TCP_NOPUSH
                    740:        int n = 1;
                    741: #endif
1.7       misho     742: 
1.10      misho     743:        /* check free slots for connect */
1.14      misho     744:        for (i = 0; i < array_Size(srv->srv_blob.clients) && 
                    745:                        (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1.10      misho     746:        if (c)  /* no more free slots! */
                    747:                goto end;
1.14      misho     748:        c = e_malloc(sizeof(rpc_cli_t));
1.10      misho     749:        if (!c) {
1.7       misho     750:                LOGERR;
1.10      misho     751:                srv->srv_kill = srv->srv_blob.kill = 1;
1.7       misho     752:                return NULL;
                    753:        } else {
1.10      misho     754:                memset(c, 0, sizeof(rpc_cli_t));
1.14      misho     755:                array_Set(srv->srv_blob.clients, i, c);
1.10      misho     756:                c->cli_id = i;
                    757:                c->cli_parent = srv;
1.7       misho     758:        }
1.4       misho     759: 
1.17.4.2  misho     760:        /* init buffer(s) */
                    761:        c->cli_buf = ait_allocVars(1);
                    762:        if (!c->cli_buf) {
                    763:                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                    764:                array_Del(srv->srv_blob.clients, i, 42);
                    765:                goto end;
                    766:        } else
                    767:                AIT_SET_BUFSIZ(array(c->cli_buf, 0, ait_val_t*), 0, srv->srv_netbuf);
1.2       misho     768: 
1.10      misho     769:        /* accept client */
                    770:        c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
                    771:        if (c->cli_sock == -1) {
                    772:                LOGERR;
1.17.4.2  misho     773:                ait_freeVars(&c->cli_buf);
1.14      misho     774:                array_Del(srv->srv_blob.clients, i, 42);
1.10      misho     775:                goto end;
1.12      misho     776:        } else {
                    777: #ifdef TCP_NOPUSH
                    778:                setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
                    779: #endif
1.10      misho     780:                fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.12      misho     781:        }
1.2       misho     782: 
1.10      misho     783:        schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
                    784: end:
                    785:        schedReadSelf(task);
1.7       misho     786:        return NULL;
1.1       misho     787: }
                    788: 
1.10      misho     789: /* ------------------------------------------------------ */
1.1       misho     790: 
                    791: /*
1.7       misho     792:  * rpc_srv_initBLOBServer() - Init & create BLOB Server
                    793:  *
1.4       misho     794:  * @srv = RPC server instance
1.2       misho     795:  * @Port = Port for bind server, if Port == 0 default port is selected
                    796:  * @diskDir = Disk place for BLOB file objects
                    797:  * return: -1 == error or 0 bind and created BLOB server instance
                    798:  */
                    799: int
                    800: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
                    801: {
                    802:        int n = 1;
                    803: 
1.10      misho     804:        if (!srv || srv->srv_kill) {
1.7       misho     805:                rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2       misho     806:                return -1;
                    807:        }
                    808: 
                    809:        memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
                    810:        if (access(diskDir, R_OK | W_OK) == -1) {
                    811:                LOGERR;
                    812:                return -1;
                    813:        } else
1.7       misho     814:                AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2       misho     815: 
1.10      misho     816:        /* init blob list */
                    817:        TAILQ_INIT(&srv->srv_blob.blobs);
                    818: 
1.2       misho     819:        srv->srv_blob.server.cli_parent = srv;
1.4       misho     820: 
1.14      misho     821:        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
1.10      misho     822:        switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1.4       misho     823:                case AF_INET:
1.10      misho     824:                        srv->srv_blob.server.cli_sa.sin.sin_port = 
                    825:                                htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1.4       misho     826:                        break;
                    827:                case AF_INET6:
1.10      misho     828:                        srv->srv_blob.server.cli_sa.sin6.sin6_port = 
                    829:                                htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1.4       misho     830:                        break;
                    831:                case AF_LOCAL:
1.10      misho     832:                        strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", 
                    833:                                        sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1.4       misho     834:                        break;
                    835:                default:
1.7       misho     836:                        AIT_FREE_VAL(&srv->srv_blob.dir);
1.4       misho     837:                        return -1;
1.2       misho     838:        }
                    839: 
1.4       misho     840:        /* create BLOB server socket */
1.6       misho     841:        srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2       misho     842:        if (srv->srv_blob.server.cli_sock == -1) {
                    843:                LOGERR;
1.7       misho     844:                AIT_FREE_VAL(&srv->srv_blob.dir);
1.2       misho     845:                return -1;
                    846:        }
                    847:        if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                    848:                LOGERR;
                    849:                close(srv->srv_blob.server.cli_sock);
1.7       misho     850:                AIT_FREE_VAL(&srv->srv_blob.dir);
1.2       misho     851:                return -1;
                    852:        }
1.5       misho     853:        n = srv->srv_netbuf;
                    854:        if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                    855:                LOGERR;
                    856:                close(srv->srv_blob.server.cli_sock);
1.7       misho     857:                AIT_FREE_VAL(&srv->srv_blob.dir);
1.5       misho     858:                return -1;
                    859:        }
                    860:        if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                    861:                LOGERR;
                    862:                close(srv->srv_blob.server.cli_sock);
1.7       misho     863:                AIT_FREE_VAL(&srv->srv_blob.dir);
1.5       misho     864:                return -1;
                    865:        }
1.6       misho     866:        if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, 
                    867:                                srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2       misho     868:                LOGERR;
                    869:                close(srv->srv_blob.server.cli_sock);
1.7       misho     870:                AIT_FREE_VAL(&srv->srv_blob.dir);
1.2       misho     871:                return -1;
1.13      misho     872:        } else
                    873:                fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
                    874:                                fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
                    875: 
1.2       misho     876: 
1.10      misho     877:        /* allocate pool for concurent blob clients */
1.14      misho     878:        srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1.2       misho     879:        if (!srv->srv_blob.clients) {
1.14      misho     880:                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.2       misho     881:                close(srv->srv_blob.server.cli_sock);
1.7       misho     882:                AIT_FREE_VAL(&srv->srv_blob.dir);
1.2       misho     883:                return -1;
1.10      misho     884:        }
1.2       misho     885: 
1.10      misho     886:        /* init blob scheduler */
                    887:        srv->srv_blob.root = schedBegin();
                    888:        if (!srv->srv_blob.root) {
                    889:                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.14      misho     890:                array_Destroy(&srv->srv_blob.clients);
1.10      misho     891:                close(srv->srv_blob.server.cli_sock);
                    892:                AIT_FREE_VAL(&srv->srv_blob.dir);
                    893:                return -1;
                    894:        }
1.2       misho     895: 
                    896:        return 0;
                    897: }
                    898: 
                    899: /*
1.7       misho     900:  * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
                    901:  *
1.2       misho     902:  * @srv = RPC Server instance
                    903:  * return: none
                    904:  */
1.16      misho     905: void
1.2       misho     906: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
                    907: {
1.10      misho     908:        if (!srv)
1.2       misho     909:                return;
                    910: 
1.10      misho     911:        srv->srv_blob.kill = 1;
1.17      misho     912: 
                    913:        schedEnd(&srv->srv_blob.root);
1.2       misho     914: }
                    915: 
                    916: /*
1.10      misho     917:  * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1.7       misho     918:  *
1.2       misho     919:  * @srv = RPC Server instance
                    920:  * return: -1 error or 0 ok, infinite loop ...
                    921:  */
                    922: int
1.10      misho     923: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1.2       misho     924: {
1.10      misho     925:        rpc_cli_t *c;
1.2       misho     926:        register int i;
1.10      misho     927:        rpc_blob_t *b, *tmp;
                    928:        struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.2       misho     929: 
1.10      misho     930:        if (!srv || srv->srv_kill) {
1.7       misho     931:                rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2       misho     932:                return -1;
                    933:        }
                    934: 
1.14      misho     935:        if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1.2       misho     936:                LOGERR;
                    937:                return -1;
1.10      misho     938:        }
                    939: 
1.17      misho     940:        schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0);
1.10      misho     941:        if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
                    942:                                srv->srv_blob.server.cli_sock, NULL, 0)) {
                    943:                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                    944:                return -1;
                    945:        }
1.2       misho     946: 
1.10      misho     947:        schedPolling(srv->srv_blob.root, &ts, NULL);
                    948:        /* main rpc loop */
                    949:        schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1.7       misho     950: 
1.17      misho     951:        /* detach blobs */
                    952:        TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                    953:                TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
                    954: 
                    955:                rpc_srv_blobFree(srv, b);
                    956:                e_free(b);
                    957:        }
                    958: 
1.10      misho     959:        /* close all clients connections & server socket */
1.14      misho     960:        for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
                    961:                c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1.10      misho     962:                if (c) {
                    963:                        shutdown(c->cli_sock, SHUT_RDWR);
                    964:                        close(c->cli_sock);
                    965: 
                    966:                        schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1.17.4.2  misho     967:                        ait_freeVars(&c->cli_buf);
1.2       misho     968:                }
1.14      misho     969:                array_Del(srv->srv_blob.clients, i, 42);
1.10      misho     970:        }
1.14      misho     971:        array_Destroy(&srv->srv_blob.clients);
1.2       misho     972: 
1.10      misho     973:        close(srv->srv_blob.server.cli_sock);
1.2       misho     974: 
1.10      misho     975:        AIT_FREE_VAL(&srv->srv_blob.dir);
1.2       misho     976:        return 0;
                    977: }
                    978: 
                    979: 
                    980: /*
1.7       misho     981:  * rpc_srv_initServer() - Init & create RPC Server
                    982:  *
1.15      misho     983:  * @InstID = Instance for authentication & recognition
1.1       misho     984:  * @concurentClients = Concurent clients at same time to this server
1.10      misho     985:  * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1.4       misho     986:  * @csHost = Host name or address for bind server, if NULL any address
1.1       misho     987:  * @Port = Port for bind server, if Port == 0 default port is selected
1.13      misho     988:  * @proto = Protocol, if == 0 choose SOCK_STREAM
1.1       misho     989:  * return: NULL == error or !=NULL bind and created RPC server instance
                    990:  */
                    991: rpc_srv_t *
1.15      misho     992: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf, 
                    993:                const char *csHost, u_short Port, int proto)
1.1       misho     994: {
1.10      misho     995:        int n = 1;
1.1       misho     996:        rpc_srv_t *srv = NULL;
1.14      misho     997:        sockaddr_t sa = E_SOCKADDR_INIT;
1.1       misho     998: 
1.15      misho     999:        if (!concurentClients || (proto < 0 || proto > SOCK_DGRAM)) {
1.10      misho    1000:                rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1.1       misho    1001:                return NULL;
                   1002:        }
1.14      misho    1003:        if (!e_gethostbyname(csHost, Port, &sa))
1.10      misho    1004:                return NULL;
1.1       misho    1005:        if (!Port)
                   1006:                Port = RPC_DEFPORT;
1.13      misho    1007:        if (!proto)
                   1008:                proto = SOCK_STREAM;
1.10      misho    1009:        if (netBuf < RPC_MIN_BUFSIZ)
1.5       misho    1010:                netBuf = BUFSIZ;
1.7       misho    1011:        else
1.14      misho    1012:                netBuf = E_ALIGN(netBuf, 2);    /* align netBuf length */
1.10      misho    1013: 
                   1014: #ifdef HAVE_SRANDOMDEV
                   1015:        srandomdev();
                   1016: #else
                   1017:        time_t tim;
                   1018: 
                   1019:        srandom((time(&tim) ^ getpid()));
                   1020: #endif
1.1       misho    1021: 
1.14      misho    1022:        srv = e_malloc(sizeof(rpc_srv_t));
1.1       misho    1023:        if (!srv) {
                   1024:                LOGERR;
                   1025:                return NULL;
                   1026:        } else
                   1027:                memset(srv, 0, sizeof(rpc_srv_t));
                   1028: 
1.13      misho    1029:        srv->srv_proto = proto;
1.5       misho    1030:        srv->srv_netbuf = netBuf;
1.1       misho    1031:        srv->srv_session.sess_version = RPC_VERSION;
1.15      misho    1032:        srv->srv_session.sess_instance = InstID;
1.1       misho    1033: 
                   1034:        srv->srv_server.cli_parent = srv;
1.10      misho    1035:        memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
                   1036: 
1.12      misho    1037:        /* init functions */
                   1038:        pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
                   1039:        SLIST_INIT(&srv->srv_funcs);
                   1040:        AVL_INIT(&srv->srv_funcs);
1.10      misho    1041: 
                   1042:        /* init scheduler */
                   1043:        srv->srv_root = schedBegin();
                   1044:        if (!srv->srv_root) {
                   1045:                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.12      misho    1046:                pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14      misho    1047:                e_free(srv);
1.10      misho    1048:                return NULL;
                   1049:        }
                   1050: 
                   1051:        /* init pool for clients */
1.14      misho    1052:        srv->srv_clients = array_Init(concurentClients);
1.10      misho    1053:        if (!srv->srv_clients) {
1.14      misho    1054:                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.10      misho    1055:                schedEnd(&srv->srv_root);
1.12      misho    1056:                pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14      misho    1057:                e_free(srv);
1.10      misho    1058:                return NULL;
                   1059:        }
1.4       misho    1060: 
                   1061:        /* create server socket */
1.13      misho    1062:        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
1.1       misho    1063:        if (srv->srv_server.cli_sock == -1) {
                   1064:                LOGERR;
1.14      misho    1065:                array_Destroy(&srv->srv_clients);
1.10      misho    1066:                schedEnd(&srv->srv_root);
1.12      misho    1067:                pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14      misho    1068:                e_free(srv);
1.1       misho    1069:                return NULL;
                   1070:        }
                   1071:        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                   1072:                LOGERR;
1.10      misho    1073:                goto err;
1.1       misho    1074:        }
1.5       misho    1075:        n = srv->srv_netbuf;
                   1076:        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                   1077:                LOGERR;
1.10      misho    1078:                goto err;
1.5       misho    1079:        }
                   1080:        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                   1081:                LOGERR;
1.10      misho    1082:                goto err;
1.5       misho    1083:        }
1.6       misho    1084:        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
                   1085:                                srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1       misho    1086:                LOGERR;
1.10      misho    1087:                goto err;
1.13      misho    1088:        } else
                   1089:                fcntl(srv->srv_server.cli_sock, F_SETFL, 
                   1090:                                fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.1       misho    1091: 
1.10      misho    1092:        rpc_register_srvPing(srv);
1.8       misho    1093: 
1.1       misho    1094:        return srv;
1.10      misho    1095: err:   /* error condition */
                   1096:        close(srv->srv_server.cli_sock);
1.14      misho    1097:        array_Destroy(&srv->srv_clients);
1.10      misho    1098:        schedEnd(&srv->srv_root);
1.12      misho    1099:        pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14      misho    1100:        e_free(srv);
1.10      misho    1101:        return NULL;
1.1       misho    1102: }
                   1103: 
                   1104: /*
1.7       misho    1105:  * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
                   1106:  *
1.6       misho    1107:  * @psrv = RPC Server instance
1.1       misho    1108:  * return: none
                   1109:  */
1.16      misho    1110: void
1.6       misho    1111: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1       misho    1112: {
1.10      misho    1113:        if (!psrv || !*psrv)
1.1       misho    1114:                return;
                   1115: 
1.10      misho    1116:        /* if send kill to blob server */
1.17      misho    1117:        rpc_srv_endBLOBServer(*psrv);
1.1       misho    1118: 
1.10      misho    1119:        (*psrv)->srv_kill = 1;
                   1120:        sleep(RPC_SCHED_POLLING);
1.2       misho    1121: 
1.17      misho    1122:        schedEnd(&(*psrv)->srv_root);
                   1123: 
1.12      misho    1124:        pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1.14      misho    1125:        e_free(*psrv);
1.6       misho    1126:        *psrv = NULL;
1.1       misho    1127: }
                   1128: 
                   1129: /*
1.7       misho    1130:  * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
                   1131:  *
1.1       misho    1132:  * @srv = RPC Server instance
                   1133:  * return: -1 error or 0 ok, infinite loop ...
                   1134:  */
                   1135: int
1.5       misho    1136: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1       misho    1137: {
1.10      misho    1138:        rpc_cli_t *c;
1.1       misho    1139:        register int i;
1.12      misho    1140:        rpc_func_t *f;
1.10      misho    1141:        struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.1       misho    1142: 
                   1143:        if (!srv) {
1.10      misho    1144:                rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1.1       misho    1145:                return -1;
                   1146:        }
                   1147: 
1.13      misho    1148:        if (srv->srv_proto == SOCK_STREAM)
1.14      misho    1149:                if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1.13      misho    1150:                        LOGERR;
                   1151:                        return -1;
                   1152:                }
1.1       misho    1153: 
1.13      misho    1154:        if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, 
                   1155:                                srv->srv_server.cli_sock, NULL, 0)) {
1.10      misho    1156:                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   1157:                return -1;
                   1158:        }
1.7       misho    1159: 
1.10      misho    1160:        schedPolling(srv->srv_root, &ts, NULL);
1.7       misho    1161:        /* main rpc loop */
1.10      misho    1162:        schedRun(srv->srv_root, &srv->srv_kill);
                   1163: 
                   1164:        /* close all clients connections & server socket */
1.14      misho    1165:        for (i = 0; i < array_Size(srv->srv_clients); i++) {
                   1166:                c = array(srv->srv_clients, i, rpc_cli_t*);
1.10      misho    1167:                if (c) {
                   1168:                        shutdown(c->cli_sock, SHUT_RDWR);
                   1169:                        close(c->cli_sock);
                   1170: 
                   1171:                        schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.14      misho    1172:                        ait_freeVars(&RPC_RETVARS(c));
1.17.4.2  misho    1173:                        ait_freeVars(&c->cli_buf);
1.1       misho    1174:                }
1.14      misho    1175:                array_Del(srv->srv_clients, i, 42);
1.10      misho    1176:        }
1.14      misho    1177:        array_Destroy(&srv->srv_clients);
1.2       misho    1178: 
1.10      misho    1179:        close(srv->srv_server.cli_sock);
1.2       misho    1180: 
1.10      misho    1181:        /* detach exported calls */
1.12      misho    1182:        RPC_FUNCS_LOCK(&srv->srv_funcs);
                   1183:        while ((f = SLIST_FIRST(&srv->srv_funcs))) {
                   1184:                SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1.1       misho    1185: 
1.10      misho    1186:                AIT_FREE_VAL(&f->func_name);
1.14      misho    1187:                e_free(f);
1.1       misho    1188:        }
1.12      misho    1189:        srv->srv_funcs.avlh_root = NULL;
                   1190:        RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1.1       misho    1191: 
                   1192:        return 0;
                   1193: }
                   1194: 
                   1195: 
                   1196: /*
                   1197:  * rpc_srv_execCall() Execute registered call from RPC server
1.7       misho    1198:  *
1.10      misho    1199:  * @cli = RPC client
1.1       misho    1200:  * @rpc = IN RPC call structure
1.10      misho    1201:  * @funcname = Execute RPC function
1.5       misho    1202:  * @args = IN RPC calling arguments from RPC client
1.1       misho    1203:  * return: -1 error, !=-1 ok
                   1204:  */
                   1205: int
1.10      misho    1206: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc, 
                   1207:                ait_val_t funcname, array_t * __restrict args)
1.1       misho    1208: {
                   1209:        rpc_callback_t func;
                   1210: 
1.10      misho    1211:        if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1.7       misho    1212:                rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1       misho    1213:                return -1;
                   1214:        }
                   1215: 
1.10      misho    1216:        func = AIT_GET_LIKE(&funcname, rpc_callback_t);
                   1217:        return func(cli, rpc, args);
1.1       misho    1218: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>