Annotation of libaitrpc/src/srv.c, revision 1.24.2.4

1.1       misho       1: /*************************************************************************
                      2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
                      3: *  by Michael Pounov <misho@openbsd-bg.org>
                      4: *
                      5: * $Author: misho $
1.24.2.4! misho       6: * $Id: srv.c,v 1.24.2.3 2015/01/18 00:03:01 misho Exp $
1.1       misho       7: *
1.2       misho       8: **************************************************************************
                      9: The ELWIX and AITNET software is distributed under the following
                     10: terms:
                     11: 
                     12: All of the documentation and software included in the ELWIX and AITNET
                     13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
                     14: 
1.24.2.2  misho      15: Copyright 2004 - 2015
1.2       misho      16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
                     17: 
                     18: Redistribution and use in source and binary forms, with or without
                     19: modification, are permitted provided that the following conditions
                     20: are met:
                     21: 1. Redistributions of source code must retain the above copyright
                     22:    notice, this list of conditions and the following disclaimer.
                     23: 2. Redistributions in binary form must reproduce the above copyright
                     24:    notice, this list of conditions and the following disclaimer in the
                     25:    documentation and/or other materials provided with the distribution.
                     26: 3. All advertising materials mentioning features or use of this software
                     27:    must display the following acknowledgement:
                     28: This product includes software developed by Michael Pounov <misho@elwix.org>
                     29: ELWIX - Embedded LightWeight unIX and its contributors.
                     30: 4. Neither the name of AITNET nor the names of its contributors
                     31:    may be used to endorse or promote products derived from this software
                     32:    without specific prior written permission.
                     33: 
                     34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
                     35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
                     36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
                     37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
                     38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
                     39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
                     40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
                     41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
                     42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
                     43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
                     44: SUCH DAMAGE.
                     45: */
1.1       misho      46: #include "global.h"
                     47: 
                     48: 
1.13      misho      49: /* SOCK_STREAM */
                     50: static void *acceptClients(sched_task_t *);
                     51: static void *closeClient(sched_task_t *);
                     52: static void *rxPacket(sched_task_t *);
                     53: static void *txPacket(sched_task_t *);
                     54: 
                     55: /* SOCK_DGRAM */
                     56: static void *freeClient(sched_task_t *);
                     57: static void *rxUDPPacket(sched_task_t *);
                     58: static void *txUDPPacket(sched_task_t *);
                     59: 
                     60: /* SOCK_RAW */
                     61: 
1.24      misho      62: /* SOCK_BPF */
                     63: static void *rxBPFPacket(sched_task_t *);
                     64: static void *txBPFPacket(sched_task_t *);
                     65: 
1.24.2.3  misho      66: /* SOCK_EXT */
1.24.2.1  misho      67: 
                     68: static sched_task_func_t cbProto[SOCK_MAX_SUPPORT][4] = {
1.13      misho      69:        { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
                     70:        { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
                     71:        { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */
1.24      misho      72:        { NULL, NULL, NULL, NULL },                             /* SOCK_RAW */
1.24.2.1  misho      73:        { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket },  /* SOCK_BPF */
1.24.2.3  misho      74:        { NULL, NULL, NULL, NULL }      /* SOCK_EXT */
1.13      misho      75: };
                     76: 
1.23      misho      77: /* Global Signal Argument when kqueue support disabled */
                     78: 
                     79: static volatile uintptr_t _glSigArg = 0;
                     80: 
1.13      misho      81: 
1.16      misho      82: void
1.13      misho      83: rpc_freeCli(rpc_cli_t * __restrict c)
1.10      misho      84: {
                     85:        rpc_srv_t *s = c->cli_parent;
                     86: 
1.13      misho      87:        schedCancelby(s->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.10      misho      88: 
                     89:        /* free buffer */
                     90:        AIT_FREE_VAL(&c->cli_buf);
                     91: 
1.14      misho      92:        array_Del(s->srv_clients, c->cli_id, 0);
1.10      misho      93:        if (c)
1.14      misho      94:                e_free(c);
1.13      misho      95: }
                     96: 
                     97: 
                     98: static inline int
1.14      misho      99: _check4freeslot(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13      misho     100: {
                    101:        rpc_cli_t *c = NULL;
                    102:        register int i;
                    103: 
                    104:        /* check free slots for connect */
1.14      misho     105:        for (i = 0; i < array_Size(srv->srv_clients) && 
                    106:                        (c = array(srv->srv_clients, i, rpc_cli_t*)); i++)
1.13      misho     107:                /* check for duplicates */
1.14      misho     108:                if (sa && !e_addrcmp(&c->cli_sa, sa, 42))
1.13      misho     109:                        break;
1.14      misho     110:        if (i >= array_Size(srv->srv_clients))
1.13      misho     111:                return -1;      /* no more free slots! */
                    112: 
                    113:        return i;
                    114: }
                    115: 
                    116: static rpc_cli_t *
1.14      misho     117: _allocClient(rpc_srv_t * __restrict srv, sockaddr_t * __restrict sa)
1.13      misho     118: {
                    119:        rpc_cli_t *c = NULL;
                    120:        int n;
                    121: 
                    122:        n = _check4freeslot(srv, sa);
                    123:        if (n == -1)
                    124:                return NULL;
                    125:        else
1.14      misho     126:                c = array(srv->srv_clients, n, rpc_cli_t*);
1.13      misho     127: 
                    128:        if (!c) {
1.14      misho     129:                c = e_malloc(sizeof(rpc_cli_t));
1.13      misho     130:                if (!c) {
                    131:                        LOGERR;
                    132:                        srv->srv_kill = 1;
                    133:                        return NULL;
                    134:                } else {
                    135:                        memset(c, 0, sizeof(rpc_cli_t));
1.14      misho     136:                        array_Set(srv->srv_clients, n, c);
1.13      misho     137:                        c->cli_id = n;
                    138:                        c->cli_parent = srv;
                    139:                }
                    140: 
                    141:                /* alloc empty buffer */
1.14      misho     142:                AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.13      misho     143:        }
                    144: 
                    145:        return c;
                    146: }
                    147: 
                    148: 
                    149: static void *
                    150: freeClient(sched_task_t *task)
                    151: {
                    152:        rpc_freeCli(TASK_ARG(task));
                    153: 
                    154:        return NULL;
                    155: }
                    156: 
                    157: static void *
                    158: closeClient(sched_task_t *task)
                    159: {
                    160:        int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
                    161: 
                    162:        rpc_freeCli(TASK_ARG(task));
                    163: 
                    164:        /* close client socket */
                    165:        shutdown(sock, SHUT_RDWR);
                    166:        close(sock);
1.10      misho     167:        return NULL;
                    168: }
1.7       misho     169: 
                    170: static void *
                    171: txPacket(sched_task_t *task)
                    172: {
                    173:        rpc_cli_t *c = TASK_ARG(task);
                    174:        rpc_srv_t *s = c->cli_parent;
                    175:        rpc_func_t *f = NULL;
1.18      misho     176:        u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7       misho     177:        struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.18      misho     178:        int ret, estlen, wlen = sizeof(struct tagRPCCall);
1.19      misho     179:        struct pollfd pfd;
1.21      misho     180: #ifdef TCP_SESSION_TIMEOUT
                    181:        struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
                    182: 
                    183:        schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
                    184:        schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    185:                         TASK_ARG(task), ts, TASK_ARG(task), 0);
                    186: #endif
1.7       misho     187: 
                    188:        if (rpc->call_argc) {
1.10      misho     189:                f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1.7       misho     190:                if (!f) {
1.10      misho     191:                        rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18      misho     192: 
1.7       misho     193:                        rpc->call_argc ^= rpc->call_argc;
                    194:                        rpc->call_rep.ret = RPC_ERROR(-1);
                    195:                        rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    196:                } else {
1.18      misho     197:                        /* calc estimated length */
                    198:                        estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
                    199:                        if (estlen > AIT_LEN(&c->cli_buf))
                    200:                                AIT_RE_BUF(&c->cli_buf, estlen);
                    201:                        buf = AIT_GET_BUF(&c->cli_buf);
1.19      misho     202:                        rpc = (struct tagRPCCall*) buf;
1.18      misho     203: 
1.14      misho     204:                        rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.7       misho     205:                        /* Go Encapsulate variables */
1.18      misho     206:                        ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                    207:                                        RPC_RETVARS(c));
1.10      misho     208:                        /* Free return values */
1.14      misho     209:                        ait_freeVars(&c->cli_vars);
1.7       misho     210:                        if (ret == -1) {
                    211:                                rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
1.19      misho     212: 
1.7       misho     213:                                rpc->call_argc ^= rpc->call_argc;
                    214:                                rpc->call_rep.ret = RPC_ERROR(-1);
                    215:                                rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    216:                        } else
                    217:                                wlen += ret;
                    218:                }
                    219:        }
                    220: 
1.18      misho     221:        rpc->call_len = htonl(wlen);
1.8       misho     222: 
1.15      misho     223: #if 0
1.7       misho     224:        /* calculate CRC */
                    225:        rpc->call_crc ^= rpc->call_crc;
1.8       misho     226:        rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.15      misho     227: #endif
1.7       misho     228: 
                    229:        /* send reply */
1.19      misho     230:        pfd.fd = TASK_FD(task);
                    231:        pfd.events = POLLOUT;
                    232:        for (; wlen > 0; wlen -= ret, buf += ret) {
                    233:                if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                    234:                                pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                    235:                        if (ret)
                    236:                                LOGERR;
                    237:                        else
1.22      misho     238:                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
1.19      misho     239:                        /* close connection */
                    240:                        schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    241:                                        TASK_ARG(task), 0, NULL, 0);
                    242:                        return NULL;
                    243:                }
                    244:                ret = send(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL);
                    245:                if (ret == -1) {
                    246:                        /* close connection */
                    247:                        schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    248:                                        TASK_ARG(task), 0, NULL, 0);
                    249:                        return NULL;
                    250:                }
1.10      misho     251:        }
1.7       misho     252: 
                    253:        return NULL;
                    254: }
                    255: 
                    256: static void *
                    257: execCall(sched_task_t *task)
                    258: {
                    259:        rpc_cli_t *c = TASK_ARG(task);
                    260:        rpc_srv_t *s = c->cli_parent;
                    261:        rpc_func_t *f = NULL;
                    262:        array_t *arr = NULL;
1.18      misho     263:        u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7       misho     264:        struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
                    265:        int argc = ntohs(rpc->call_argc);
                    266: 
                    267:        /* Go decapsulate variables ... */
1.8       misho     268:        if (argc) {
1.14      misho     269:                arr = ait_buffer2vars(buf + sizeof(struct tagRPCCall), 
1.18      misho     270:                                AIT_LEN(&c->cli_buf) - sizeof(struct tagRPCCall), argc, 42);
1.7       misho     271:                if (!arr) {
1.14      misho     272:                        rpc_SetErr(ERPCMISMATCH, "#%d - %s", elwix_GetErrno(), elwix_GetError());
1.18      misho     273: 
1.7       misho     274:                        rpc->call_argc ^= rpc->call_argc;
                    275:                        rpc->call_rep.ret = RPC_ERROR(-1);
                    276:                        rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    277:                        return NULL;
                    278:                }
1.10      misho     279:        } else
                    280:                arr = NULL;
1.7       misho     281: 
1.10      misho     282:        if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
1.7       misho     283:                rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.18      misho     284: 
1.7       misho     285:                rpc->call_argc ^= rpc->call_argc;
                    286:                rpc->call_rep.ret = RPC_ERROR(-1);
                    287:                rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    288:        } else {
1.8       misho     289:                /* if client doesn't want reply */
1.10      misho     290:                argc = RPC_CHK_NOREPLY(rpc);
                    291:                rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
1.7       misho     292:                if (rpc->call_rep.ret == htonl(-1)) {
1.20      misho     293:                        if (!rpc->call_rep.eno) {
                    294:                                LOGERR;
                    295:                                rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    296:                        }
1.7       misho     297:                        rpc->call_argc ^= rpc->call_argc;
                    298:                } else {
                    299:                        rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8       misho     300:                        if (argc) {
                    301:                                /* without reply */
1.14      misho     302:                                ait_freeVars(&c->cli_vars);
1.8       misho     303:                                rpc->call_argc ^= rpc->call_argc;
1.10      misho     304:                        } else {
                    305:                                /* reply */
1.14      misho     306:                                rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.10      misho     307:                        }
1.7       misho     308:                }
                    309:        }
                    310: 
1.14      misho     311:        array_Destroy(&arr);
1.7       misho     312:        return NULL;
                    313: }
                    314: 
                    315: static void *
                    316: rxPacket(sched_task_t *task)
                    317: {
                    318:        rpc_cli_t *c = TASK_ARG(task);
                    319:        rpc_srv_t *s = c->cli_parent;
1.18      misho     320:        int len, rlen, noreply, estlen;
1.15      misho     321: #if 0
                    322:        u_short crc;
                    323: #endif
1.10      misho     324:        u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.18      misho     325:        struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.19      misho     326:        struct pollfd pfd;
1.21      misho     327: #ifdef TCP_SESSION_TIMEOUT
                    328:        struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
                    329: 
                    330:        schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
                    331:        schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    332:                         TASK_ARG(task), ts, TASK_ARG(task), 0);
                    333: #endif
1.7       misho     334: 
1.18      misho     335:        memset(buf, 0, sizeof(struct tagRPCCall));
1.19      misho     336:        rlen = recv(TASK_FD(task), rpc, sizeof(struct tagRPCCall), MSG_PEEK);
1.18      misho     337:        if (rlen < sizeof(struct tagRPCCall)) {
1.10      misho     338:                /* close connection */
1.13      misho     339:                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    340:                                TASK_ARG(task), 0, NULL, 0);
1.7       misho     341:                return NULL;
1.10      misho     342:        } else {
1.18      misho     343:                estlen = ntohl(rpc->call_len);
                    344:                if (estlen > AIT_LEN(&c->cli_buf))
                    345:                        AIT_RE_BUF(&c->cli_buf, estlen);
                    346:                rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
1.19      misho     347:                buf = AIT_GET_BUF(&c->cli_buf);
                    348:                len = estlen;
1.18      misho     349:        }
                    350: 
1.19      misho     351:        /* get next part of packet */
                    352:        memset(buf, 0, len);
                    353:        pfd.fd = TASK_FD(task);
                    354:        pfd.events = POLLIN | POLLPRI;
                    355:        for (; len > 0; len -= rlen, buf += rlen) {
                    356:                if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                    357:                                pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                    358:                        if (rlen)
                    359:                                LOGERR;
                    360:                        else
1.22      misho     361:                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
1.19      misho     362:                        schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    363:                                        TASK_ARG(task), 0, NULL, 0);
                    364:                        return NULL;
                    365:                }
1.18      misho     366:                rlen = recv(TASK_FD(task), buf, len, 0);
                    367:                if (rlen == -1) {
                    368:                        /* close connection */
                    369:                        schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    370:                                        TASK_ARG(task), 0, NULL, 0);
1.8       misho     371:                        return NULL;
1.10      misho     372:                }
1.18      misho     373:        }
1.22      misho     374:        len = estlen;
1.8       misho     375: 
1.15      misho     376: #if 0
1.18      misho     377:        /* check integrity of packet */
                    378:        crc = ntohs(rpc->call_crc);
                    379:        rpc->call_crc ^= rpc->call_crc;
                    380:        if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                    381:                rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                    382:                return NULL;
                    383:        }
1.15      misho     384: #endif
1.7       misho     385: 
1.18      misho     386:        noreply = RPC_CHK_NOREPLY(rpc);
1.7       misho     387: 
1.18      misho     388:        /* check RPC packet session info */
                    389:        if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
                    390:                rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.7       misho     391: 
1.18      misho     392:                rpc->call_argc ^= rpc->call_argc;
                    393:                rpc->call_rep.ret = RPC_ERROR(-1);
                    394:                rpc->call_rep.eno = RPC_ERROR(errno);
                    395:        } else {
                    396:                /* execute RPC call */
                    397:                schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), (int) noreply, rpc, len);
                    398:        }
1.7       misho     399: 
1.18      misho     400:        /* send RPC reply */
                    401:        if (!noreply)
                    402:                schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET], 
                    403:                                TASK_ARG(task), TASK_FD(task), rpc, len);
1.7       misho     404: 
                    405:        /* lets get next packet */
1.18      misho     406:        schedReadSelf(task);
1.7       misho     407:        return NULL;
                    408: }
                    409: 
1.1       misho     410: static void *
1.10      misho     411: acceptClients(sched_task_t *task)
1.1       misho     412: {
1.10      misho     413:        rpc_srv_t *srv = TASK_ARG(task);
                    414:        rpc_cli_t *c = NULL;
1.14      misho     415:        socklen_t salen = sizeof(sockaddr_t);
1.21      misho     416:        int sock;
                    417: #ifdef TCP_SESSION_TIMEOUT
                    418:        struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
                    419: #endif
1.7       misho     420: 
1.13      misho     421:        c = _allocClient(srv, NULL);
1.21      misho     422:        if (!c) {
                    423:                EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                    424:                if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
                    425:                        shutdown(sock, SHUT_RDWR);
                    426:                        close(sock);
                    427:                }
1.10      misho     428:                goto end;
1.21      misho     429:        }
1.10      misho     430: 
                    431:        /* accept client */
                    432:        c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
                    433:        if (c->cli_sock == -1) {
                    434:                LOGERR;
                    435:                AIT_FREE_VAL(&c->cli_buf);
1.14      misho     436:                array_Del(srv->srv_clients, c->cli_id, 42);
1.10      misho     437:                goto end;
1.1       misho     438:        } else
1.10      misho     439:                fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.1       misho     440: 
1.21      misho     441: #ifdef TCP_SESSION_TIMEOUT
                    442:        /* armed timer for close stateless connection */
                    443:        schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                    444:        schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], c, 
                    445:                        ts, c, 0);
                    446: #endif
1.13      misho     447:        schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c, 
                    448:                        c->cli_sock, NULL, 0);
1.10      misho     449: end:
                    450:        schedReadSelf(task);
                    451:        return NULL;
                    452: }
1.5       misho     453: 
1.7       misho     454: 
1.10      misho     455: static void *
1.13      misho     456: txUDPPacket(sched_task_t *task)
1.10      misho     457: {
                    458:        rpc_cli_t *c = TASK_ARG(task);
                    459:        rpc_srv_t *s = c->cli_parent;
1.13      misho     460:        rpc_func_t *f = NULL;
1.18      misho     461:        u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.13      misho     462:        struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
1.22      misho     463:        int ret, estlen, wlen = sizeof(struct tagRPCCall);
1.13      misho     464:        struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.22      misho     465:        struct pollfd pfd;
1.13      misho     466: 
                    467:        schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
                    468:        schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    469:                         TASK_ARG(task), ts, TASK_ARG(task), 0);
                    470: 
                    471:        if (rpc->call_argc) {
                    472:                f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                    473:                if (!f) {
                    474:                        rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                    475:                        rpc->call_argc ^= rpc->call_argc;
                    476:                        rpc->call_rep.ret = RPC_ERROR(-1);
                    477:                        rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    478:                } else {
1.22      misho     479:                        /* calc estimated length */
                    480:                        estlen = ait_resideVars(RPC_RETVARS(c)) + wlen;
                    481:                        if (estlen > AIT_LEN(&c->cli_buf))
                    482:                                AIT_RE_BUF(&c->cli_buf, estlen);
                    483:                        buf = AIT_GET_BUF(&c->cli_buf);
                    484:                        rpc = (struct tagRPCCall*) buf;
                    485: 
1.14      misho     486:                        rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
1.13      misho     487:                        /* Go Encapsulate variables */
1.18      misho     488:                        ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                    489:                                        RPC_RETVARS(c));
1.13      misho     490:                        /* Free return values */
1.14      misho     491:                        ait_freeVars(&c->cli_vars);
1.13      misho     492:                        if (ret == -1) {
                    493:                                rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                    494:                                rpc->call_argc ^= rpc->call_argc;
                    495:                                rpc->call_rep.ret = RPC_ERROR(-1);
                    496:                                rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    497:                        } else
                    498:                                wlen += ret;
                    499:                }
                    500:        }
1.7       misho     501: 
1.18      misho     502:        rpc->call_len = htonl(wlen);
1.7       misho     503: 
1.13      misho     504:        /* calculate CRC */
                    505:        rpc->call_crc ^= rpc->call_crc;
                    506:        rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
                    507: 
                    508:        /* send reply */
1.22      misho     509:        pfd.fd = TASK_FD(task);
                    510:        pfd.events = POLLOUT;
                    511:        for (; wlen > 0; wlen -= ret, buf += ret) {
                    512:                if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                    513:                                pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                    514:                        if (ret)
                    515:                                LOGERR;
                    516:                        else
                    517:                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                    518:                        /* close connection */
                    519:                        schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    520:                                        TASK_ARG(task), 0, NULL, 0);
                    521:                        return NULL;
                    522:                }
                    523:                ret = sendto(TASK_FD(task), buf, MIN(wlen, s->srv_netbuf), MSG_NOSIGNAL, 
                    524:                                &c->cli_sa.sa, c->cli_sa.sa.sa_len);
                    525:                if (ret == -1) {
                    526:                        /* close connection */
                    527:                        schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    528:                                        TASK_ARG(task), 0, NULL, 0);
                    529:                        return NULL;
                    530:                }
1.13      misho     531:        }
                    532: 
                    533:        return NULL;
                    534: }
                    535: 
                    536: static void *
                    537: rxUDPPacket(sched_task_t *task)
                    538: {
                    539:        rpc_srv_t *srv = TASK_ARG(task);
                    540:        rpc_cli_t *c = NULL;
1.22      misho     541:        int len, rlen, noreply, estlen;
                    542:        u_short crc;
                    543:        u_char *buf, b[sizeof(struct tagRPCCall)];
                    544:        struct tagRPCCall *rpc = (struct tagRPCCall*) b;
1.14      misho     545:        sockaddr_t sa;
                    546:        socklen_t salen;
1.13      misho     547:        struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
1.22      misho     548:        struct pollfd pfd;
1.13      misho     549: 
                    550:        /* receive connect packet */
1.14      misho     551:        salen = sa.ss.ss_len = sizeof(sockaddr_t);
1.22      misho     552:        rlen = recvfrom(TASK_FD(task), b, sizeof b, MSG_PEEK, &sa.sa, &salen);
                    553:        if (rlen < sizeof(struct tagRPCCall)) {
                    554:                rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1.13      misho     555:                goto end;
1.22      misho     556:        }
1.13      misho     557: 
                    558:        c = _allocClient(srv, &sa);
1.22      misho     559:        if (!c) {
                    560:                EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                    561:                usleep(2000);   /* blocked client delay */
1.13      misho     562:                goto end;
1.22      misho     563:        } else {
                    564:                estlen = ntohl(rpc->call_len);
                    565:                if (estlen > AIT_LEN(&c->cli_buf))
                    566:                        AIT_RE_BUF(&c->cli_buf, estlen);
                    567:                rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
                    568:                buf = AIT_GET_BUF(&c->cli_buf);
                    569:                len = estlen;
                    570: 
1.13      misho     571:                c->cli_sock = TASK_FD(task);
                    572:                memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
1.22      misho     573: 
1.13      misho     574:                /* armed timer for close stateless connection */
                    575:                schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                    576:                schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                    577:                                c, ts, c, 0);
                    578:        }
                    579: 
1.22      misho     580:        /* get next part of packet */
                    581:        memset(buf, 0, len);
                    582:        pfd.fd = TASK_FD(task);
                    583:        pfd.events = POLLIN | POLLPRI;
                    584:        for (; len > 0; len -= rlen, buf += rlen) {
                    585:                if ((rlen = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || 
                    586:                                pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                    587:                        if (rlen)
                    588:                                LOGERR;
                    589:                        else
                    590:                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                    591:                        schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                    592:                                        c, 0, NULL, 0);
1.24      misho     593:                        goto end;
1.13      misho     594:                }
1.22      misho     595:                salen = sa.ss.ss_len = sizeof(sockaddr_t);
                    596:                rlen = recvfrom(TASK_FD(task), buf, len, 0, &sa.sa, &salen);
                    597:                if (rlen == -1) {
                    598:                        /* close connection */
                    599:                        schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                    600:                                        c, 0, NULL, 0);
1.24      misho     601:                        goto end;
1.13      misho     602:                }
1.22      misho     603:                if (e_addrcmp(&c->cli_sa, &sa, 42))
                    604:                        rlen ^= rlen;   /* skip if arrive from different address */
                    605:        }
                    606:        len = estlen;
1.13      misho     607: 
1.22      misho     608:        /* check integrity of packet */
                    609:        crc = ntohs(rpc->call_crc);
                    610:        rpc->call_crc ^= rpc->call_crc;
                    611:        if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                    612:                rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
1.24      misho     613:                /* close connection */
                    614:                schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                    615:                                c, 0, NULL, 0);
                    616:                goto end;
                    617:        }
                    618: 
                    619:        noreply = RPC_CHK_NOREPLY(rpc);
                    620: 
                    621:        /* check RPC packet session info */
                    622:        if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                    623:                rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
                    624: 
                    625:                rpc->call_argc ^= rpc->call_argc;
                    626:                rpc->call_rep.ret = RPC_ERROR(-1);
                    627:                rpc->call_rep.eno = RPC_ERROR(errno);
                    628:        } else {
                    629:                /* execute RPC call */
                    630:                schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
                    631:        }
                    632: 
                    633:        /* send RPC reply */
                    634:        if (!noreply)
                    635:                schedWrite(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
                    636:                                c, TASK_FD(task), rpc, len);
                    637: end:
                    638:        schedReadSelf(task);
                    639:        return NULL;
                    640: }
                    641: 
                    642: 
                    643: static void *
                    644: txBPFPacket(sched_task_t *task)
                    645: {
                    646:        rpc_cli_t *c = TASK_ARG(task);
                    647:        rpc_srv_t *s = c->cli_parent;
                    648:        rpc_func_t *f = NULL;
                    649:        u_char *buf = AIT_GET_BUF(&c->cli_buf);
                    650:        struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
                    651:        int ret, len, wlen = sizeof(struct tagRPCCall);
                    652:        struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
                    653:        struct ether_header *eh;
                    654:        ait_val_t b = AIT_VAL_INIT;
                    655: 
                    656:        schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, TASK_ARG(task), NULL);
                    657:        schedTimer(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    658:                         TASK_ARG(task), ts, TASK_ARG(task), 0);
                    659: 
                    660:        if (rpc->call_argc) {
                    661:                f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
                    662:                if (!f) {
                    663:                        rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
                    664:                        rpc->call_argc ^= rpc->call_argc;
                    665:                        rpc->call_rep.ret = RPC_ERROR(-1);
                    666:                        rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    667:                } else {
                    668:                        /* calc estimated length */
                    669:                        len = ait_resideVars(RPC_RETVARS(c)) + wlen;
                    670:                        if (len > AIT_LEN(&c->cli_buf))
                    671:                                AIT_RE_BUF(&c->cli_buf, len);
                    672:                        buf = AIT_GET_BUF(&c->cli_buf);
                    673:                        rpc = (struct tagRPCCall*) buf;
                    674: 
                    675:                        rpc->call_argc = htons(array_Size(RPC_RETVARS(c)));
                    676:                        /* Go Encapsulate variables */
                    677:                        ret = ait_vars2buffer(buf + wlen, AIT_LEN(&c->cli_buf) - wlen, 
                    678:                                        RPC_RETVARS(c));
                    679:                        /* Free return values */
                    680:                        ait_freeVars(&RPC_RETVARS(c));
                    681:                        if (ret == -1) {
                    682:                                rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
                    683:                                rpc->call_argc ^= rpc->call_argc;
                    684:                                rpc->call_rep.ret = RPC_ERROR(-1);
                    685:                                rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
                    686:                        } else
                    687:                                wlen += ret;
                    688:                }
                    689:        }
                    690: 
                    691:        rpc->call_len = htonl(wlen);
                    692: 
                    693:        /* calculate CRC */
                    694:        rpc->call_crc ^= rpc->call_crc;
                    695:        rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
                    696: 
                    697:        /* send reply */
                    698:        AIT_SET_BUF(&b, NULL, MIN(wlen, s->srv_netbuf) + ETHER_HDR_LEN);
                    699:        eh = (struct ether_header*) AIT_GET_BUF(&b);
                    700:        memcpy(eh->ether_dhost, LLADDR(&c->cli_sa.sdl), ETHER_ADDR_LEN);
                    701:        eh->ether_type = htons(RPC_DEFPORT);
                    702:        memcpy(eh + 1, buf, MIN(wlen, s->srv_netbuf));
                    703: 
                    704:        ret = write(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
                    705:        AIT_FREE_VAL(&b);
                    706:        if (ret == -1) {
                    707:                /* close connection */
                    708:                schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                    709:                                TASK_ARG(task), 0, NULL, 0);
1.22      misho     710:                return NULL;
                    711:        }
1.13      misho     712: 
1.24      misho     713:        return NULL;
                    714: }
                    715: 
                    716: static void *
                    717: rxBPFPacket(sched_task_t *task)
                    718: {
                    719:        rpc_srv_t *srv = TASK_ARG(task);
                    720:        rpc_cli_t *c = NULL;
                    721:        int len, rlen, noreply;
                    722:        u_short crc;
                    723:        struct tagRPCCall *rpc;
                    724:        sockaddr_t sa;
                    725:        struct timespec ts = { DEF_RPC_TIMEOUT, 0 };
                    726:        struct bpf_hdr *h;
                    727:        struct ether_header *eh;
                    728:        ait_val_t b = AIT_VAL_INIT;
                    729: 
                    730:        /* receive connect packet */
                    731:        AIT_SET_BUF(&b, NULL, srv->srv_netbuf);
                    732:        rlen = read(TASK_FD(task), AIT_GET_BUF(&b), AIT_LEN(&b));
                    733:        h = (struct bpf_hdr*) AIT_GET_BUF(&b);
                    734:        rlen -= h->bh_hdrlen;
                    735:        if (rlen < h->bh_datalen || h->bh_caplen != h->bh_datalen || 
                    736:                        rlen < ETHER_HDR_LEN + sizeof(struct tagRPCCall)) {
                    737:                rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
                    738:                goto end;
                    739:        } else {
                    740:                rlen = h->bh_caplen;
                    741:                eh = (struct ether_header*) (AIT_GET_BUF(&b) + h->bh_hdrlen);
                    742:                rlen -= ETHER_HDR_LEN;
                    743:                rpc = (struct tagRPCCall*) (eh + 1);
                    744:                if (eh->ether_type != ntohs(RPC_DEFPORT))
                    745:                        goto end;
                    746:                else
                    747:                        e_getlinkbymac((const ether_addr_t*) eh->ether_shost, &sa);
                    748:        }
                    749: 
                    750:        c = _allocClient(srv, &sa);
                    751:        if (!c) {
                    752:                EVERBOSE(1, "RPC client quota exceeded! Connection will be shutdown!\n");
                    753:                usleep(2000);   /* blocked client delay */
                    754:                goto end;
                    755:        } else {
                    756:                len = ntohl(rpc->call_len);
                    757:                if (len > AIT_LEN(&c->cli_buf))
                    758:                        AIT_RE_BUF(&c->cli_buf, len);
                    759:                memcpy(AIT_GET_BUF(&c->cli_buf), rpc, AIT_LEN(&c->cli_buf));
                    760:                rpc = (struct tagRPCCall*) AIT_GET_BUF(&c->cli_buf);
                    761: 
                    762:                c->cli_sock = TASK_FD(task);
                    763:                memcpy(&c->cli_sa, &sa, sizeof c->cli_sa);
                    764: 
                    765:                /* armed timer for close stateless connection */
                    766:                schedCancelby(TASK_ROOT(task), taskTIMER, CRITERIA_DATA, c, NULL);
                    767:                schedTimer(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                    768:                                c, ts, c, 0);
                    769:        }
                    770: 
                    771:        /* check integrity of packet */
                    772:        crc = ntohs(rpc->call_crc);
                    773:        rpc->call_crc ^= rpc->call_crc;
                    774:        if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
                    775:                rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
                    776:                /* close connection */
                    777:                schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                    778:                                c, 0, NULL, 0);
                    779:                goto end;
                    780:        }
                    781: 
1.22      misho     782:        noreply = RPC_CHK_NOREPLY(rpc);
1.18      misho     783: 
1.22      misho     784:        /* check RPC packet session info */
                    785:        if (rpc_chkPktSession(&rpc->call_session, &srv->srv_session)) {
                    786:                rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.13      misho     787: 
1.22      misho     788:                rpc->call_argc ^= rpc->call_argc;
                    789:                rpc->call_rep.ret = RPC_ERROR(-1);
                    790:                rpc->call_rep.eno = RPC_ERROR(errno);
                    791:        } else {
                    792:                /* execute RPC call */
                    793:                schedEvent(TASK_ROOT(task), execCall, c, (int) noreply, rpc, len);
                    794:        }
1.13      misho     795: 
1.22      misho     796:        /* send RPC reply */
                    797:        if (!noreply)
1.24      misho     798:                schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_TXPACKET], 
1.22      misho     799:                                c, TASK_FD(task), rpc, len);
1.13      misho     800: end:
1.24      misho     801:        AIT_FREE_VAL(&b);
1.13      misho     802:        schedReadSelf(task);
                    803:        return NULL;
                    804: }
                    805: 
                    806: /* ------------------------------------------------------ */
                    807: 
1.16      misho     808: void
1.13      misho     809: rpc_freeBLOBCli(rpc_cli_t * __restrict c)
                    810: {
                    811:        rpc_srv_t *s = c->cli_parent;
                    812: 
                    813:        schedCancelby(s->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
1.10      misho     814: 
                    815:        /* free buffer */
                    816:        AIT_FREE_VAL(&c->cli_buf);
                    817: 
1.14      misho     818:        array_Del(s->srv_blob.clients, c->cli_id, 0);
1.10      misho     819:        if (c)
1.14      misho     820:                e_free(c);
1.13      misho     821: }
                    822: 
                    823: 
                    824: static void *
                    825: closeBLOBClient(sched_task_t *task)
                    826: {
                    827:        int sock = ((rpc_cli_t*) TASK_ARG(task))->cli_sock;
                    828: 
                    829:        rpc_freeBLOBCli(TASK_ARG(task));
                    830: 
                    831:        /* close client socket */
                    832:        shutdown(sock, SHUT_RDWR);
                    833:        close(sock);
1.7       misho     834:        return NULL;
                    835: }
                    836: 
                    837: static void *
                    838: txBLOB(sched_task_t *task)
                    839: {
1.10      misho     840:        rpc_cli_t *c = TASK_ARG(task);
                    841:        u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7       misho     842:        int wlen = sizeof(struct tagBLOBHdr);
                    843: 
                    844:        /* send reply */
1.10      misho     845:        wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
                    846:        if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
                    847:                /* close blob connection */
                    848:                schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
                    849:        }
1.7       misho     850: 
                    851:        return NULL;
                    852: }
1.4       misho     853: 
1.7       misho     854: static void *
                    855: rxBLOB(sched_task_t *task)
                    856: {
                    857:        rpc_cli_t *c = TASK_ARG(task);
                    858:        rpc_srv_t *s = c->cli_parent;
                    859:        rpc_blob_t *b;
1.10      misho     860:        struct tagBLOBHdr blob;
1.7       misho     861:        int rlen;
                    862: 
1.10      misho     863:        memset(&blob, 0, sizeof blob);
                    864:        rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
                    865:        if (rlen < 1) {
                    866:                /* close blob connection */
                    867:                schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1.7       misho     868:                return NULL;
                    869:        }
1.5       misho     870: 
1.10      misho     871:        /* check BLOB packet */
                    872:        if (rlen < sizeof(struct tagBLOBHdr)) {
                    873:                rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1.6       misho     874: 
1.10      misho     875:                schedReadSelf(task);
1.7       misho     876:                return NULL;
                    877:        }
1.1       misho     878: 
1.7       misho     879:        /* check RPC packet session info */
1.15      misho     880:        if (rpc_chkPktSession(&blob.hdr_session, &s->srv_session)) {
1.7       misho     881:                rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.10      misho     882:                blob.hdr_cmd = error;
1.7       misho     883:                goto end;
                    884:        }
                    885: 
                    886:        /* Go to proceed packet ... */
1.10      misho     887:        switch (blob.hdr_cmd) {
1.7       misho     888:                case get:
1.10      misho     889:                        if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
                    890:                                rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
                    891:                                blob.hdr_cmd = no;
                    892:                                blob.hdr_ret = RPC_ERROR(-1);
1.7       misho     893:                                break;
                    894:                        } else
1.10      misho     895:                                blob.hdr_len = htonl(b->blob_len);
1.5       misho     896: 
1.7       misho     897:                        if (rpc_srv_blobMap(s, b) != -1) {
                    898:                                /* deliver BLOB variable to client */
1.10      misho     899:                                blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1.7       misho     900:                                rpc_srv_blobUnmap(b);
                    901:                        } else {
1.10      misho     902:                                blob.hdr_cmd = error;
                    903:                                blob.hdr_ret = RPC_ERROR(-1);
1.7       misho     904:                        }
                    905:                        break;
                    906:                case set:
1.17      misho     907:                        if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len), 
                    908:                                                        ntohl(blob.hdr_ret)))) {
1.7       misho     909:                                /* set new BLOB variable for reply :) */
1.10      misho     910:                                blob.hdr_var = htonl(b->blob_var);
1.7       misho     911: 
                    912:                                /* receive BLOB from client */
1.10      misho     913:                                blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1.7       misho     914:                                rpc_srv_blobUnmap(b);
1.5       misho     915:                        } else {
1.10      misho     916:                                blob.hdr_cmd = error;
                    917:                                blob.hdr_ret = RPC_ERROR(-1);
1.7       misho     918:                        }
                    919:                        break;
                    920:                case unset:
1.11      misho     921:                        if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1.10      misho     922:                                blob.hdr_cmd = error;
                    923:                                blob.hdr_ret = RPC_ERROR(-1);
1.1       misho     924:                        }
                    925:                        break;
1.7       misho     926:                default:
1.10      misho     927:                        rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
                    928:                        blob.hdr_cmd = error;
                    929:                        blob.hdr_ret = RPC_ERROR(-1);
1.7       misho     930:        }
1.1       misho     931: 
1.7       misho     932: end:
1.10      misho     933:        memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
                    934:        schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
                    935:        schedReadSelf(task);
1.7       misho     936:        return NULL;
1.2       misho     937: }
                    938: 
                    939: static void *
1.17      misho     940: flushBLOB(sched_task_t *task)
                    941: {
1.23      misho     942:        uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
                    943:        rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
1.17      misho     944:        rpc_blob_t *b, *tmp;
                    945: 
                    946:        TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                    947:                TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
                    948: 
                    949:                rpc_srv_blobFree(srv, b);
                    950:                e_free(b);
                    951:        }
                    952: 
1.23      misho     953:        if (!schedSignalSelf(task)) {
                    954:                /* disabled kqueue support in libaitsched */
                    955:                struct sigaction sa;
                    956: 
                    957:                memset(&sa, 0, sizeof sa);
                    958:                sigemptyset(&sa.sa_mask);
                    959:                sa.sa_handler = (void (*)(int)) flushBLOB;
                    960:                sa.sa_flags = SA_RESTART | SA_RESETHAND;
                    961:                sigaction(SIGFBLOB, &sa, NULL);
                    962:        }
                    963: 
1.17      misho     964:        return NULL;
                    965: }
                    966: 
                    967: static void *
1.10      misho     968: acceptBLOBClients(sched_task_t *task)
1.2       misho     969: {
1.10      misho     970:        rpc_srv_t *srv = TASK_ARG(task);
                    971:        rpc_cli_t *c = NULL;
                    972:        register int i;
1.14      misho     973:        socklen_t salen = sizeof(sockaddr_t);
1.21      misho     974:        int sock;
1.12      misho     975: #ifdef TCP_NOPUSH
                    976:        int n = 1;
                    977: #endif
1.7       misho     978: 
1.10      misho     979:        /* check free slots for connect */
1.14      misho     980:        for (i = 0; i < array_Size(srv->srv_blob.clients) && 
                    981:                        (c = array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
1.21      misho     982:        if (c) {        /* no more free slots! */
                    983:                EVERBOSE(1, "BLOB client quota exceeded! Connection will be shutdown!\n");
                    984:                if ((sock = accept(TASK_FD(task), NULL, NULL)) != -1) {
                    985:                        shutdown(sock, SHUT_RDWR);
                    986:                        close(sock);
                    987:                }
1.10      misho     988:                goto end;
1.21      misho     989:        }
                    990: 
1.14      misho     991:        c = e_malloc(sizeof(rpc_cli_t));
1.10      misho     992:        if (!c) {
1.7       misho     993:                LOGERR;
1.10      misho     994:                srv->srv_kill = srv->srv_blob.kill = 1;
1.7       misho     995:                return NULL;
                    996:        } else {
1.10      misho     997:                memset(c, 0, sizeof(rpc_cli_t));
1.14      misho     998:                array_Set(srv->srv_blob.clients, i, c);
1.10      misho     999:                c->cli_id = i;
                   1000:                c->cli_parent = srv;
1.7       misho    1001:        }
1.4       misho    1002: 
1.10      misho    1003:        /* alloc empty buffer */
1.14      misho    1004:        AIT_SET_BUFSIZ(&c->cli_buf, 0, srv->srv_netbuf);
1.2       misho    1005: 
1.10      misho    1006:        /* accept client */
                   1007:        c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
                   1008:        if (c->cli_sock == -1) {
                   1009:                LOGERR;
                   1010:                AIT_FREE_VAL(&c->cli_buf);
1.14      misho    1011:                array_Del(srv->srv_blob.clients, i, 42);
1.10      misho    1012:                goto end;
1.12      misho    1013:        } else {
                   1014: #ifdef TCP_NOPUSH
                   1015:                setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
                   1016: #endif
1.10      misho    1017:                fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.12      misho    1018:        }
1.2       misho    1019: 
1.10      misho    1020:        schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
                   1021: end:
                   1022:        schedReadSelf(task);
1.7       misho    1023:        return NULL;
1.1       misho    1024: }
                   1025: 
1.10      misho    1026: /* ------------------------------------------------------ */
1.1       misho    1027: 
                   1028: /*
1.7       misho    1029:  * rpc_srv_initBLOBServer() - Init & create BLOB Server
                   1030:  *
1.4       misho    1031:  * @srv = RPC server instance
1.2       misho    1032:  * @Port = Port for bind server, if Port == 0 default port is selected
                   1033:  * @diskDir = Disk place for BLOB file objects
                   1034:  * return: -1 == error or 0 bind and created BLOB server instance
                   1035:  */
                   1036: int
                   1037: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
                   1038: {
                   1039:        int n = 1;
                   1040: 
1.10      misho    1041:        if (!srv || srv->srv_kill) {
1.7       misho    1042:                rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2       misho    1043:                return -1;
                   1044:        }
                   1045: 
                   1046:        memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
                   1047:        if (access(diskDir, R_OK | W_OK) == -1) {
                   1048:                LOGERR;
                   1049:                return -1;
                   1050:        } else
1.7       misho    1051:                AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2       misho    1052: 
1.10      misho    1053:        /* init blob list */
                   1054:        TAILQ_INIT(&srv->srv_blob.blobs);
                   1055: 
1.2       misho    1056:        srv->srv_blob.server.cli_parent = srv;
1.4       misho    1057: 
1.14      misho    1058:        memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(sockaddr_t));
1.10      misho    1059:        switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1.4       misho    1060:                case AF_INET:
1.10      misho    1061:                        srv->srv_blob.server.cli_sa.sin.sin_port = 
                   1062:                                htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1.4       misho    1063:                        break;
                   1064:                case AF_INET6:
1.10      misho    1065:                        srv->srv_blob.server.cli_sa.sin6.sin6_port = 
                   1066:                                htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1.4       misho    1067:                        break;
                   1068:                case AF_LOCAL:
1.10      misho    1069:                        strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob", 
                   1070:                                        sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1.4       misho    1071:                        break;
                   1072:                default:
1.7       misho    1073:                        AIT_FREE_VAL(&srv->srv_blob.dir);
1.4       misho    1074:                        return -1;
1.2       misho    1075:        }
                   1076: 
1.4       misho    1077:        /* create BLOB server socket */
1.6       misho    1078:        srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2       misho    1079:        if (srv->srv_blob.server.cli_sock == -1) {
                   1080:                LOGERR;
1.7       misho    1081:                AIT_FREE_VAL(&srv->srv_blob.dir);
1.2       misho    1082:                return -1;
                   1083:        }
                   1084:        if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                   1085:                LOGERR;
                   1086:                close(srv->srv_blob.server.cli_sock);
1.7       misho    1087:                AIT_FREE_VAL(&srv->srv_blob.dir);
1.2       misho    1088:                return -1;
                   1089:        }
1.5       misho    1090:        n = srv->srv_netbuf;
                   1091:        if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                   1092:                LOGERR;
                   1093:                close(srv->srv_blob.server.cli_sock);
1.7       misho    1094:                AIT_FREE_VAL(&srv->srv_blob.dir);
1.5       misho    1095:                return -1;
                   1096:        }
                   1097:        if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                   1098:                LOGERR;
                   1099:                close(srv->srv_blob.server.cli_sock);
1.7       misho    1100:                AIT_FREE_VAL(&srv->srv_blob.dir);
1.5       misho    1101:                return -1;
                   1102:        }
1.6       misho    1103:        if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa, 
                   1104:                                srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2       misho    1105:                LOGERR;
                   1106:                close(srv->srv_blob.server.cli_sock);
1.7       misho    1107:                AIT_FREE_VAL(&srv->srv_blob.dir);
1.2       misho    1108:                return -1;
1.13      misho    1109:        } else
                   1110:                fcntl(srv->srv_blob.server.cli_sock, F_SETFL, 
                   1111:                                fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
                   1112: 
1.2       misho    1113: 
1.10      misho    1114:        /* allocate pool for concurent blob clients */
1.14      misho    1115:        srv->srv_blob.clients = array_Init(array_Size(srv->srv_clients));
1.2       misho    1116:        if (!srv->srv_blob.clients) {
1.14      misho    1117:                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.2       misho    1118:                close(srv->srv_blob.server.cli_sock);
1.7       misho    1119:                AIT_FREE_VAL(&srv->srv_blob.dir);
1.2       misho    1120:                return -1;
1.10      misho    1121:        }
1.2       misho    1122: 
1.10      misho    1123:        /* init blob scheduler */
                   1124:        srv->srv_blob.root = schedBegin();
                   1125:        if (!srv->srv_blob.root) {
                   1126:                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.14      misho    1127:                array_Destroy(&srv->srv_blob.clients);
1.10      misho    1128:                close(srv->srv_blob.server.cli_sock);
                   1129:                AIT_FREE_VAL(&srv->srv_blob.dir);
                   1130:                return -1;
                   1131:        }
1.2       misho    1132: 
                   1133:        return 0;
                   1134: }
                   1135: 
                   1136: /*
1.7       misho    1137:  * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
                   1138:  *
1.2       misho    1139:  * @srv = RPC Server instance
                   1140:  * return: none
                   1141:  */
1.16      misho    1142: void
1.2       misho    1143: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
                   1144: {
1.10      misho    1145:        if (!srv)
1.2       misho    1146:                return;
                   1147: 
1.10      misho    1148:        srv->srv_blob.kill = 1;
1.17      misho    1149: 
                   1150:        schedEnd(&srv->srv_blob.root);
1.2       misho    1151: }
                   1152: 
                   1153: /*
1.10      misho    1154:  * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1.7       misho    1155:  *
1.2       misho    1156:  * @srv = RPC Server instance
                   1157:  * return: -1 error or 0 ok, infinite loop ...
                   1158:  */
                   1159: int
1.10      misho    1160: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1.2       misho    1161: {
1.10      misho    1162:        rpc_cli_t *c;
1.2       misho    1163:        register int i;
1.10      misho    1164:        rpc_blob_t *b, *tmp;
                   1165:        struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.2       misho    1166: 
1.10      misho    1167:        if (!srv || srv->srv_kill) {
1.7       misho    1168:                rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2       misho    1169:                return -1;
                   1170:        }
                   1171: 
1.14      misho    1172:        if (listen(srv->srv_blob.server.cli_sock, array_Size(srv->srv_blob.clients)) == -1) {
1.2       misho    1173:                LOGERR;
                   1174:                return -1;
1.10      misho    1175:        }
                   1176: 
1.23      misho    1177:        if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
                   1178:                /* disabled kqueue support in libaitsched */
                   1179:                struct sigaction sa;
                   1180: 
                   1181:                atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
                   1182: 
                   1183:                memset(&sa, 0, sizeof sa);
                   1184:                sigemptyset(&sa.sa_mask);
                   1185:                sa.sa_handler = (void (*)(int)) flushBLOB;
                   1186:                sa.sa_flags = SA_RESTART | SA_RESETHAND;
                   1187:                sigaction(SIGFBLOB, &sa, NULL);
                   1188:        }
                   1189: 
1.10      misho    1190:        if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
                   1191:                                srv->srv_blob.server.cli_sock, NULL, 0)) {
                   1192:                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   1193:                return -1;
                   1194:        }
1.2       misho    1195: 
1.10      misho    1196:        schedPolling(srv->srv_blob.root, &ts, NULL);
                   1197:        /* main rpc loop */
                   1198:        schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1.7       misho    1199: 
1.17      misho    1200:        /* detach blobs */
                   1201:        TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
                   1202:                TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
                   1203: 
                   1204:                rpc_srv_blobFree(srv, b);
                   1205:                e_free(b);
                   1206:        }
                   1207: 
1.10      misho    1208:        /* close all clients connections & server socket */
1.14      misho    1209:        for (i = 0; i < array_Size(srv->srv_blob.clients); i++) {
                   1210:                c = array(srv->srv_blob.clients, i, rpc_cli_t*);
1.10      misho    1211:                if (c) {
                   1212:                        shutdown(c->cli_sock, SHUT_RDWR);
                   1213:                        close(c->cli_sock);
                   1214: 
                   1215:                        schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
                   1216:                        AIT_FREE_VAL(&c->cli_buf);
1.2       misho    1217:                }
1.14      misho    1218:                array_Del(srv->srv_blob.clients, i, 42);
1.10      misho    1219:        }
1.14      misho    1220:        array_Destroy(&srv->srv_blob.clients);
1.2       misho    1221: 
1.10      misho    1222:        close(srv->srv_blob.server.cli_sock);
1.2       misho    1223: 
1.10      misho    1224:        AIT_FREE_VAL(&srv->srv_blob.dir);
1.2       misho    1225:        return 0;
                   1226: }
                   1227: 
                   1228: 
                   1229: /*
1.7       misho    1230:  * rpc_srv_initServer() - Init & create RPC Server
                   1231:  *
1.15      misho    1232:  * @InstID = Instance for authentication & recognition
1.1       misho    1233:  * @concurentClients = Concurent clients at same time to this server
1.10      misho    1234:  * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1.4       misho    1235:  * @csHost = Host name or address for bind server, if NULL any address
1.1       misho    1236:  * @Port = Port for bind server, if Port == 0 default port is selected
1.13      misho    1237:  * @proto = Protocol, if == 0 choose SOCK_STREAM
1.1       misho    1238:  * return: NULL == error or !=NULL bind and created RPC server instance
                   1239:  */
                   1240: rpc_srv_t *
1.15      misho    1241: rpc_srv_initServer(u_char InstID, int concurentClients, int netBuf, 
                   1242:                const char *csHost, u_short Port, int proto)
1.1       misho    1243: {
1.10      misho    1244:        int n = 1;
1.1       misho    1245:        rpc_srv_t *srv = NULL;
1.14      misho    1246:        sockaddr_t sa = E_SOCKADDR_INIT;
1.1       misho    1247: 
1.24.2.2  misho    1248:        if (!concurentClients || (proto < 0 || proto > SOCK_RAW)) {
1.10      misho    1249:                rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1.1       misho    1250:                return NULL;
                   1251:        }
1.14      misho    1252:        if (!e_gethostbyname(csHost, Port, &sa))
1.10      misho    1253:                return NULL;
1.1       misho    1254:        if (!Port)
                   1255:                Port = RPC_DEFPORT;
1.13      misho    1256:        if (!proto)
                   1257:                proto = SOCK_STREAM;
1.10      misho    1258:        if (netBuf < RPC_MIN_BUFSIZ)
1.5       misho    1259:                netBuf = BUFSIZ;
1.7       misho    1260:        else
1.14      misho    1261:                netBuf = E_ALIGN(netBuf, 2);    /* align netBuf length */
1.10      misho    1262: 
                   1263: #ifdef HAVE_SRANDOMDEV
                   1264:        srandomdev();
                   1265: #else
                   1266:        time_t tim;
                   1267: 
                   1268:        srandom((time(&tim) ^ getpid()));
                   1269: #endif
1.1       misho    1270: 
1.14      misho    1271:        srv = e_malloc(sizeof(rpc_srv_t));
1.1       misho    1272:        if (!srv) {
                   1273:                LOGERR;
                   1274:                return NULL;
                   1275:        } else
                   1276:                memset(srv, 0, sizeof(rpc_srv_t));
                   1277: 
1.13      misho    1278:        srv->srv_proto = proto;
1.5       misho    1279:        srv->srv_netbuf = netBuf;
1.1       misho    1280:        srv->srv_session.sess_version = RPC_VERSION;
1.15      misho    1281:        srv->srv_session.sess_instance = InstID;
1.1       misho    1282: 
                   1283:        srv->srv_server.cli_parent = srv;
1.10      misho    1284:        memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
                   1285: 
1.12      misho    1286:        /* init functions */
                   1287:        pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
                   1288:        SLIST_INIT(&srv->srv_funcs);
                   1289:        AVL_INIT(&srv->srv_funcs);
1.10      misho    1290: 
                   1291:        /* init scheduler */
                   1292:        srv->srv_root = schedBegin();
                   1293:        if (!srv->srv_root) {
                   1294:                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.12      misho    1295:                pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14      misho    1296:                e_free(srv);
1.10      misho    1297:                return NULL;
                   1298:        }
                   1299: 
                   1300:        /* init pool for clients */
1.14      misho    1301:        srv->srv_clients = array_Init(concurentClients);
1.10      misho    1302:        if (!srv->srv_clients) {
1.14      misho    1303:                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
1.10      misho    1304:                schedEnd(&srv->srv_root);
1.12      misho    1305:                pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14      misho    1306:                e_free(srv);
1.10      misho    1307:                return NULL;
                   1308:        }
1.4       misho    1309: 
                   1310:        /* create server socket */
1.13      misho    1311:        srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
1.1       misho    1312:        if (srv->srv_server.cli_sock == -1) {
                   1313:                LOGERR;
1.14      misho    1314:                array_Destroy(&srv->srv_clients);
1.10      misho    1315:                schedEnd(&srv->srv_root);
1.12      misho    1316:                pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14      misho    1317:                e_free(srv);
1.1       misho    1318:                return NULL;
                   1319:        }
                   1320:        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
                   1321:                LOGERR;
1.10      misho    1322:                goto err;
1.1       misho    1323:        }
1.5       misho    1324:        n = srv->srv_netbuf;
                   1325:        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
                   1326:                LOGERR;
1.10      misho    1327:                goto err;
1.5       misho    1328:        }
                   1329:        if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
                   1330:                LOGERR;
1.10      misho    1331:                goto err;
1.5       misho    1332:        }
1.6       misho    1333:        if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
                   1334:                                srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1       misho    1335:                LOGERR;
1.10      misho    1336:                goto err;
1.13      misho    1337:        } else
                   1338:                fcntl(srv->srv_server.cli_sock, F_SETFL, 
                   1339:                                fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.1       misho    1340: 
1.10      misho    1341:        rpc_register_srvPing(srv);
1.8       misho    1342: 
1.1       misho    1343:        return srv;
1.10      misho    1344: err:   /* error condition */
                   1345:        close(srv->srv_server.cli_sock);
1.14      misho    1346:        array_Destroy(&srv->srv_clients);
1.10      misho    1347:        schedEnd(&srv->srv_root);
1.12      misho    1348:        pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.14      misho    1349:        e_free(srv);
1.10      misho    1350:        return NULL;
1.1       misho    1351: }
                   1352: 
                   1353: /*
1.7       misho    1354:  * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
                   1355:  *
1.6       misho    1356:  * @psrv = RPC Server instance
1.1       misho    1357:  * return: none
                   1358:  */
1.16      misho    1359: void
1.6       misho    1360: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1       misho    1361: {
1.10      misho    1362:        if (!psrv || !*psrv)
1.1       misho    1363:                return;
                   1364: 
1.10      misho    1365:        /* if send kill to blob server */
1.17      misho    1366:        rpc_srv_endBLOBServer(*psrv);
1.1       misho    1367: 
1.10      misho    1368:        (*psrv)->srv_kill = 1;
                   1369:        sleep(RPC_SCHED_POLLING);
1.2       misho    1370: 
1.17      misho    1371:        schedEnd(&(*psrv)->srv_root);
                   1372: 
1.12      misho    1373:        pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1.14      misho    1374:        e_free(*psrv);
1.6       misho    1375:        *psrv = NULL;
1.1       misho    1376: }
                   1377: 
                   1378: /*
1.7       misho    1379:  * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
                   1380:  *
1.1       misho    1381:  * @srv = RPC Server instance
                   1382:  * return: -1 error or 0 ok, infinite loop ...
                   1383:  */
                   1384: int
1.5       misho    1385: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1       misho    1386: {
1.10      misho    1387:        rpc_cli_t *c;
1.1       misho    1388:        register int i;
1.12      misho    1389:        rpc_func_t *f;
1.10      misho    1390:        struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.1       misho    1391: 
                   1392:        if (!srv) {
1.10      misho    1393:                rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1.1       misho    1394:                return -1;
                   1395:        }
                   1396: 
1.13      misho    1397:        if (srv->srv_proto == SOCK_STREAM)
1.14      misho    1398:                if (listen(srv->srv_server.cli_sock, array_Size(srv->srv_clients)) == -1) {
1.13      misho    1399:                        LOGERR;
                   1400:                        return -1;
                   1401:                }
1.1       misho    1402: 
1.13      misho    1403:        if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, 
                   1404:                                srv->srv_server.cli_sock, NULL, 0)) {
1.10      misho    1405:                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   1406:                return -1;
                   1407:        }
1.7       misho    1408: 
1.10      misho    1409:        schedPolling(srv->srv_root, &ts, NULL);
1.7       misho    1410:        /* main rpc loop */
1.10      misho    1411:        schedRun(srv->srv_root, &srv->srv_kill);
                   1412: 
                   1413:        /* close all clients connections & server socket */
1.14      misho    1414:        for (i = 0; i < array_Size(srv->srv_clients); i++) {
                   1415:                c = array(srv->srv_clients, i, rpc_cli_t*);
1.10      misho    1416:                if (c) {
1.24      misho    1417:                        if (srv->srv_proto == SOCK_STREAM) {
                   1418:                                shutdown(c->cli_sock, SHUT_RDWR);
                   1419:                                close(c->cli_sock);
                   1420:                        }
1.10      misho    1421: 
                   1422:                        schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
1.14      misho    1423:                        ait_freeVars(&RPC_RETVARS(c));
1.10      misho    1424:                        AIT_FREE_VAL(&c->cli_buf);
1.1       misho    1425:                }
1.14      misho    1426:                array_Del(srv->srv_clients, i, 42);
1.10      misho    1427:        }
1.14      misho    1428:        array_Destroy(&srv->srv_clients);
1.2       misho    1429: 
1.24.2.4! misho    1430:        if (srv->srv_proto != SOCK_EXT)
        !          1431:                close(srv->srv_server.cli_sock);
1.2       misho    1432: 
1.10      misho    1433:        /* detach exported calls */
1.12      misho    1434:        RPC_FUNCS_LOCK(&srv->srv_funcs);
                   1435:        while ((f = SLIST_FIRST(&srv->srv_funcs))) {
                   1436:                SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1.1       misho    1437: 
1.10      misho    1438:                AIT_FREE_VAL(&f->func_name);
1.14      misho    1439:                e_free(f);
1.1       misho    1440:        }
1.12      misho    1441:        srv->srv_funcs.avlh_root = NULL;
                   1442:        RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1.1       misho    1443: 
                   1444:        return 0;
                   1445: }
                   1446: 
                   1447: 
                   1448: /*
                   1449:  * rpc_srv_execCall() Execute registered call from RPC server
1.7       misho    1450:  *
1.10      misho    1451:  * @cli = RPC client
1.1       misho    1452:  * @rpc = IN RPC call structure
1.10      misho    1453:  * @funcname = Execute RPC function
1.5       misho    1454:  * @args = IN RPC calling arguments from RPC client
1.1       misho    1455:  * return: -1 error, !=-1 ok
                   1456:  */
                   1457: int
1.10      misho    1458: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc, 
                   1459:                ait_val_t funcname, array_t * __restrict args)
1.1       misho    1460: {
                   1461:        rpc_callback_t func;
                   1462: 
1.10      misho    1463:        if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1.7       misho    1464:                rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1       misho    1465:                return -1;
                   1466:        }
                   1467: 
1.10      misho    1468:        func = AIT_GET_LIKE(&funcname, rpc_callback_t);
                   1469:        return func(cli, rpc, args);
1.1       misho    1470: }
1.24      misho    1471: 
                   1472: 
                   1473: /*
                   1474:  * rpc_srv_initServer2() - Init & create layer2 RPC Server
                   1475:  *
                   1476:  * @InstID = Instance for authentication & recognition
                   1477:  * @concurentClients = Concurent clients at same time to this server
                   1478:  * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
                   1479:  * @csIface = Interface name for bind server, if NULL first interface on host
                   1480:  * return: NULL == error or !=NULL bind and created RPC server instance
                   1481:  */
                   1482: rpc_srv_t *
                   1483: rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
                   1484: {
                   1485:        int n = 1;
                   1486:        rpc_srv_t *srv = NULL;
                   1487:        sockaddr_t sa = E_SOCKADDR_INIT;
                   1488:        char szIface[64], szStr[STRSIZ];
                   1489:        register int i;
                   1490:        struct ifreq ifr;
                   1491:        struct bpf_insn insns[] = {
                   1492:                BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
                   1493:                BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, RPC_DEFPORT, 0, 1),
                   1494:                BPF_STMT(BPF_RET + BPF_K, -1),
                   1495:                BPF_STMT(BPF_RET + BPF_K, 0),
                   1496:        };
                   1497:        struct bpf_program fcode = { 
                   1498:                .bf_len = sizeof(insns) / sizeof(struct bpf_insn), 
                   1499:                        .bf_insns = insns
                   1500:        };
                   1501: 
                   1502:        if (!concurentClients) {
                   1503:                rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                   1504:                return NULL;
                   1505:        }
                   1506:        if (!csIface) {
                   1507:                if (e_get1stiface(szIface, sizeof szIface))
                   1508:                        return NULL;
                   1509:        } else
                   1510:                strlcpy(szIface, csIface, sizeof szIface);
                   1511:        if (!e_getifacebyname(szIface, &sa))
                   1512:                return NULL;
                   1513: 
                   1514: #ifdef HAVE_SRANDOMDEV
                   1515:        srandomdev();
                   1516: #else
                   1517:        time_t tim;
                   1518: 
                   1519:        srandom((time(&tim) ^ getpid()));
                   1520: #endif
                   1521: 
                   1522:        srv = e_malloc(sizeof(rpc_srv_t));
                   1523:        if (!srv) {
                   1524:                LOGERR;
                   1525:                return NULL;
                   1526:        } else
                   1527:                memset(srv, 0, sizeof(rpc_srv_t));
                   1528: 
                   1529:        srv->srv_proto = SOCK_BPF;
                   1530:        srv->srv_netbuf = netBuf;
                   1531:        srv->srv_session.sess_version = RPC_VERSION;
                   1532:        srv->srv_session.sess_instance = InstID;
                   1533: 
                   1534:        srv->srv_server.cli_parent = srv;
                   1535:        memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
                   1536: 
                   1537:        /* init functions */
                   1538:        pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
                   1539:        SLIST_INIT(&srv->srv_funcs);
                   1540:        AVL_INIT(&srv->srv_funcs);
                   1541: 
                   1542:        /* init scheduler */
                   1543:        srv->srv_root = schedBegin();
                   1544:        if (!srv->srv_root) {
                   1545:                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   1546:                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   1547:                e_free(srv);
                   1548:                return NULL;
                   1549:        }
                   1550: 
                   1551:        /* init pool for clients */
                   1552:        srv->srv_clients = array_Init(concurentClients);
                   1553:        if (!srv->srv_clients) {
                   1554:                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                   1555:                schedEnd(&srv->srv_root);
                   1556:                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   1557:                e_free(srv);
                   1558:                return NULL;
                   1559:        }
                   1560: 
                   1561:        /* create server handler */
                   1562:        for (i = 0; i < 10; i++) {
                   1563:                memset(szStr, 0, sizeof szStr);
                   1564:                snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
                   1565:                srv->srv_server.cli_sock = open(szStr, O_RDWR);
                   1566:                if (srv->srv_server.cli_sock > STDERR_FILENO)
                   1567:                        break;
                   1568:        }
                   1569:        if (srv->srv_server.cli_sock < 3) {
                   1570:                LOGERR;
                   1571:                array_Destroy(&srv->srv_clients);
                   1572:                schedEnd(&srv->srv_root);
                   1573:                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   1574:                e_free(srv);
                   1575:                return NULL;
                   1576:        }
                   1577: 
                   1578:        if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
                   1579:                LOGERR;
                   1580:                goto err;
                   1581:        }
                   1582:        if (ioctl(srv->srv_server.cli_sock, BIOCSETF, &fcode) == -1) {
                   1583:                LOGERR;
                   1584:                goto err;
                   1585:        }
                   1586:        n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
                   1587:        if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
                   1588:                LOGERR;
                   1589:                goto err;
                   1590:        } else
                   1591:                srv->srv_netbuf = n;
                   1592: 
                   1593:        memset(&ifr, 0, sizeof ifr);
                   1594:        strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
                   1595:        if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
                   1596:                LOGERR;
                   1597:                goto err;
                   1598:        } else
                   1599:                fcntl(srv->srv_server.cli_sock, F_SETFL, 
                   1600:                                fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
                   1601: 
                   1602:        rpc_register_srvPing(srv);
                   1603: 
                   1604:        return srv;
                   1605: err:   /* error condition */
                   1606:        close(srv->srv_server.cli_sock);
                   1607:        array_Destroy(&srv->srv_clients);
                   1608:        schedEnd(&srv->srv_root);
                   1609:        pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   1610:        e_free(srv);
                   1611:        return NULL;
                   1612: }
1.24.2.3  misho    1613: 
                   1614: /*
                   1615:  * rpc_srv_initServerExt() - Init & create pipe RPC Server
                   1616:  *
                   1617:  * @InstID = Instance for authentication & recognition
                   1618:  * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
                   1619:  * @fd = File descriptor
                   1620:  * return: NULL == error or !=NULL bind and created RPC server instance
                   1621:  */
                   1622: rpc_srv_t *
                   1623: rpc_srv_initServerExt(u_char InstID, int netBuf, int fd)
                   1624: {
                   1625:        rpc_srv_t *srv = NULL;
                   1626: 
                   1627: #ifdef HAVE_SRANDOMDEV
                   1628:        srandomdev();
                   1629: #else
                   1630:        time_t tim;
                   1631: 
                   1632:        srandom((time(&tim) ^ getpid()));
                   1633: #endif
                   1634: 
                   1635:        srv = e_malloc(sizeof(rpc_srv_t));
                   1636:        if (!srv) {
                   1637:                LOGERR;
                   1638:                return NULL;
                   1639:        } else
                   1640:                memset(srv, 0, sizeof(rpc_srv_t));
                   1641: 
                   1642:        srv->srv_proto = SOCK_EXT;
                   1643:        srv->srv_netbuf = netBuf;
                   1644:        srv->srv_session.sess_version = RPC_VERSION;
                   1645:        srv->srv_session.sess_instance = InstID;
                   1646: 
                   1647:        srv->srv_server.cli_parent = srv;
                   1648:        srv->srv_server.cli_sock = fd;
                   1649: 
                   1650:        /* init functions */
                   1651:        pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
                   1652:        SLIST_INIT(&srv->srv_funcs);
                   1653:        AVL_INIT(&srv->srv_funcs);
                   1654: 
                   1655:        /* init scheduler */
                   1656:        srv->srv_root = schedBegin();
                   1657:        if (!srv->srv_root) {
                   1658:                rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   1659:                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   1660:                e_free(srv);
                   1661:                return NULL;
                   1662:        }
                   1663: 
                   1664:        /* init pool for clients */
                   1665:        srv->srv_clients = array_Init(1);
                   1666:        if (!srv->srv_clients) {
                   1667:                rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                   1668:                schedEnd(&srv->srv_root);
                   1669:                pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   1670:                e_free(srv);
                   1671:                return NULL;
                   1672:        }
                   1673: 
                   1674:        fcntl(srv->srv_server.cli_sock, F_SETFL, 
                   1675:                        fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
                   1676: 
                   1677:        rpc_register_srvPing(srv);
                   1678: 
                   1679:        return srv;
                   1680: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>