Annotation of mqtt/src/mqtt_subs.c, revision 1.4

1.4     ! misho       1: /*************************************************************************
        !             2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
        !             3: *  by Michael Pounov <misho@openbsd-bg.org>
        !             4: *
        !             5: * $Author: misho $
        !             6: * $Id: mqtt_subs.c,v 1.3.2.1 2012/07/03 12:22:56 misho Exp $
        !             7: *
        !             8: **************************************************************************
        !             9: The ELWIX and AITNET software is distributed under the following
        !            10: terms:
        !            11: 
        !            12: All of the documentation and software included in the ELWIX and AITNET
        !            13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
        !            14: 
        !            15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
        !            16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
        !            17: 
        !            18: Redistribution and use in source and binary forms, with or without
        !            19: modification, are permitted provided that the following conditions
        !            20: are met:
        !            21: 1. Redistributions of source code must retain the above copyright
        !            22:    notice, this list of conditions and the following disclaimer.
        !            23: 2. Redistributions in binary form must reproduce the above copyright
        !            24:    notice, this list of conditions and the following disclaimer in the
        !            25:    documentation and/or other materials provided with the distribution.
        !            26: 3. All advertising materials mentioning features or use of this software
        !            27:    must display the following acknowledgement:
        !            28: This product includes software developed by Michael Pounov <misho@elwix.org>
        !            29: ELWIX - Embedded LightWeight unIX and its contributors.
        !            30: 4. Neither the name of AITNET nor the names of its contributors
        !            31:    may be used to endorse or promote products derived from this software
        !            32:    without specific prior written permission.
        !            33: 
        !            34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
        !            35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
        !            36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
        !            37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
        !            38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
        !            39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
        !            40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
        !            41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
        !            42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
        !            43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
        !            44: SUCH DAMAGE.
        !            45: */
1.2       misho      46: #include "global.h"
                     47: #include "rtlm.h"
                     48: #include "mqtt.h"
                     49: #include "client.h"
                     50: 
                     51: 
                     52: io_enableDEBUG;
                     53: 
                     54: extern char compiled[], compiledby[], compilehost[];
1.3       misho      55: volatile intptr_t Kill;
                     56: sched_root_task_t *root;
1.2       misho      57: 
                     58: struct tagArgs *args;
                     59: 
                     60: 
                     61: static void
                     62: Usage(void)
                     63: {
                     64:        printf( " -= MQTT Subscriber Client =- Subscriber from ELWIX\n"
                     65:                "=== %s@%s === Compiled: %s ===\n\n"
1.3       misho      66:                " Syntax: mqtt_subs [options] <connect_to_broker[:port]> <ConnectID> [exec_script <value>]\n\n"
1.2       misho      67:                "\t-l <value2file>\t\tSave received values to file\n"
1.3       misho      68:                "\t-u\t\t\tUnsubscribe given topic(s)\n"
1.2       misho      69:                "\t-s <topic[|QoS]>\tSubscribe for this topic, if wish add different |QoS to topic\n"
                     70:                "\t-d\t\t\tSend duplicate message\n\n"
                     71:                "\t-C\t\t\tNot clear before connect!\n"
                     72:                "\t-p <port>\t\tDifferent port for connect (default: 1883)\n"
                     73:                "\t-T <timeout>\t\tKeep alive timeout in seconds\n"
                     74:                "\t-U <username>\t\tUsername\n"
                     75:                "\t-P <password>\t\tPassword\n"
                     76:                "\t-W <topic>\t\tWill Topic\n"
                     77:                "\t-M <message>\t\tWill Message\n\n"
                     78:                "\t-D\t\t\tDaemon mode\n"
                     79:                "\t-v\t\t\tVerbose (more -vvv, more verbose)\n"
                     80:                "\t-h\t\t\tHelp! This screen\n\n", 
                     81:                compiledby, compilehost, compiled);
                     82: }
                     83: 
                     84: static void
                     85: cleanArgs(struct tagArgs * __restrict args)
                     86: {
                     87:        mqtt_msgFree(&args->msg, 42);
1.3       misho      88:        mqtt_subFree(&args->subscr);
1.2       misho      89:        AIT_FREE_VAL(&args->Will.Msg);
                     90:        AIT_FREE_VAL(&args->Will.Topic);
                     91:        AIT_FREE_VAL(&args->User);
                     92:        AIT_FREE_VAL(&args->Pass);
                     93:        AIT_FREE_VAL(&args->Publish);
                     94:        AIT_FREE_VAL(&args->Value);
                     95:        AIT_FREE_VAL(&args->ConnID);
                     96: }
                     97: 
                     98: static int
                     99: Subscribe(int sock, FILE *lf)
                    100: {
1.3       misho     101:        u_char *qoses, *qos;
                    102:        u_short mid;
                    103:        mqtt_subscr_t *sub;
                    104: 
                    105: #ifdef __NetBSD__
                    106:        srandom(getpid() ^ time(NULL));
                    107: #else
                    108:        srandomdev();
                    109: #endif
                    110:        mid = random() % USHRT_MAX;
                    111: 
                    112:        printf(" > Execute SUBSCRIBE request #%d ... ", mid);
                    113:        qoses = mqtt_cli_Subscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK);
                    114:        if (!qoses) {
                    115:                printf("Error:: Subscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                    116:                return -1;
                    117:        } else
                    118:                printf("OK\n");
                    119: 
                    120:        for (sub = args->subscr, qos = qoses; sub->sub_topic.msg_base; sub++, qos++)
                    121:                printf("  + Topic %s with QoS %d subscribe %s\n", (char*)
                    122:                                sub->sub_topic.msg_base, sub->sub_ret, *qos ? "done" : "failed");
                    123: 
                    124:        free(qoses);
                    125:        return 0;
                    126: }
                    127: 
                    128: static int
                    129: Unsubscribe(int sock)
                    130: {
                    131:        u_short mid;
                    132: 
                    133: #ifdef __NetBSD__
                    134:        srandom(getpid() ^ time(NULL));
                    135: #else
                    136:        srandomdev();
                    137: #endif
                    138:        mid = random() % USHRT_MAX;
                    139: 
                    140:        printf(" > Execute UNSUBSCRIBE request #%d ... ", mid);
                    141:        if (mqtt_cli_Unsubscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK)) {
                    142:                printf("Error:: Unsubscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                    143:                return -1;
                    144:        } else
                    145:                printf("OK\n");
                    146: 
1.2       misho     147:        return 0;
                    148: }
                    149: 
1.3       misho     150: static void
                    151: sigz(int sig)
                    152: {
                    153:        int stat;
                    154: 
                    155:        switch (sig) {
                    156:                case SIGINT:
                    157:                case SIGTERM:
                    158:                        Kill++;
                    159:                        break;
                    160:                case SIGCHLD:
                    161:                        while (waitpid(-1, &stat, WNOHANG) > 0);
                    162:                        break;
                    163:        }
                    164: }
                    165: 
                    166: 
                    167: static void *
                    168: execProc(sched_task_t *task)
                    169: {
                    170:        FILE *f;
                    171:        char szLine[MAXPATHLEN + BUFSIZ] = { 0 };
                    172: 
                    173:        snprintf(szLine, sizeof szLine, "%s '%s' %hu %u", AIT_GET_STR(&args->Value), 
                    174:                        (char*) TASK_ARG(task), (u_short) TASK_VAL(task), (u_int) TASK_DATLEN(task));
                    175:        if (TASK_ARG(task))
                    176:                io_free(TASK_ARG(task));
                    177: 
                    178:        f = popen(szLine, "w");
                    179:        if (!f) {
                    180:                ioSYSERR(0);
                    181:                return NULL;
                    182:        } else
                    183:                fputs(TASK_DATA(task), f);
                    184:        pclose(f);
                    185:        return NULL;
                    186: }
                    187: 
                    188: static void *
                    189: pubRX(sched_task_t *task)
                    190: {
                    191:        int siz, rlen;
                    192:        char szTime[STRSIZ] = { 0 }, szTopic[STRSIZ] = { 0 };
                    193:        void *data = NULL;
                    194:        u_short mid;
                    195:        time_t tim;
                    196:        struct mqtthdr *hdr;
                    197: 
                    198:        rlen = RecvFrom(TASK_FD(task));
                    199:        if (rlen == -1)
                    200:                goto end;
                    201:        if (!rlen) {
                    202:                Kill++;
                    203:                return NULL;
                    204:        }
                    205: 
                    206:        while (rlen > 0) {
                    207:                hdr = (struct mqtthdr*) args->msg->msg_base;
                    208: 
                    209:                switch (hdr->mqtt_msg.type) {
                    210:                        case MQTT_TYPE_PUBLISH:
                    211:                                siz = mqtt_readPUBLISH(args->msg, szTopic, sizeof szTopic, &mid, &data);
                    212:                                if (siz == -1)
                    213:                                        goto end;
                    214:                                else {
                    215:                                        siz = mqtt_pktLen(hdr);
                    216:                                        rlen -= siz;
                    217:                                        ioVERBOSE(4) printf("Remains %d bytes, packet %d bytes\n", 
                    218:                                                        rlen, siz);
                    219:                                }
                    220: 
                    221:                                /* send to output */
                    222:                                tim = time(NULL);
                    223:                                strftime(szTime, sizeof szTime, "%Y-%m-%d %H:%M:%S", localtime(&tim));
                    224:                                fprintf(TASK_ARG(task), "\n[%s] Message ID: %04hu, QoS: %hhu, "
                    225:                                                "Length: %u, Topic: %s\n", szTime, mid, hdr->mqtt_msg.qos, 
                    226:                                                siz, szTopic);
                    227: 
                    228:                                if (data) {
                    229:                                        fputs((const char*) data, TASK_ARG(task));
                    230:                                        free(data);
                    231:                                }
                    232: 
                    233:                                fprintf(TASK_ARG(task), "\n");
                    234:                                fflush(TASK_ARG(task));
                    235: 
                    236:                                /* if exists exec script */
                    237:                                if (!AIT_ISEMPTY(&args->Value))
                    238:                                        schedEvent(root, execProc, io_strdup(szTopic), mid, data, siz);
                    239: 
                    240:                                memmove(args->msg->msg_base, args->msg->msg_base + siz, rlen);
                    241:                                break;
                    242:                        case MQTT_TYPE_PINGREQ:
                    243:                                siz = mqtt_msgPINGRESP(args->msg);
                    244:                                if (siz == -1)
                    245:                                        goto end;
                    246:                                else
                    247:                                        rlen -= siz;
                    248: 
                    249:                                /* send ping reply */
                    250:                                if (SendTo(TASK_FD(task), siz) == -1)
                    251:                                        goto end;
                    252: 
                    253:                                memmove(args->msg->msg_base, args->msg->msg_base + siz, rlen);
                    254:                                break;
                    255:                        default:
                    256:                                ioVERBOSE(1) printf("Unwanted message type #%d ...\n", hdr->mqtt_msg.type);
                    257:                                goto end;
                    258:                }
                    259:        }
                    260: end:
                    261:        schedReadSelf(task);
                    262:        return NULL;
                    263: }
                    264: 
1.2       misho     265: 
                    266: int
                    267: main(int argc, char **argv)
                    268: {
1.3       misho     269:        char ch, un = 0, idx = 0, batch = 1;
                    270:        ait_val_t val;
1.2       misho     271:        u_short port = atoi(MQTT_PORT);
1.3       misho     272:        mqtt_subscr_t *sub;
                    273:        int ret = 0;
                    274:        char *str, szStr[STRSIZ], szLogName[MAXPATHLEN] = { 0 };
1.2       misho     275:        FILE *lf;
1.3       misho     276:        struct sigaction sa;
1.2       misho     277: 
1.3       misho     278:        if (!(args = io_malloc(sizeof(struct tagArgs)))) {
1.2       misho     279:                printf("Error:: in arguments #%d - %s\n", errno, strerror(errno));
                    280:                return 1;
                    281:        } else
                    282:                memset(args, 0, sizeof(struct tagArgs));
1.3       misho     283:        if (!(args->subscr = mqtt_subAlloc(idx))) {
                    284:                printf("Error:: in subscribes array #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                    285:                io_free(args);
1.2       misho     286:                return 1;
                    287:        } else
                    288:                args->free = cleanArgs;
                    289: 
                    290:        if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {
                    291:                printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                    292:                args->free(args);
1.3       misho     293:                io_free(args);
1.2       misho     294:                return 1;
                    295:        }
                    296: 
                    297:        AIT_SET_STR(&args->ConnID, "");
                    298:        AIT_SET_STR(&args->User, "");
                    299:        AIT_SET_STR(&args->Pass, "");
                    300: 
                    301:        args->ka = MQTT_KEEPALIVE;
1.3       misho     302:        while ((ch = getopt(argc, argv, "T:U:P:p:s:q:dl:W:M:CDvuh")) != -1)
1.2       misho     303:                switch (ch) {
                    304:                        case 'T':
                    305:                                args->ka = (u_short) strtol(optarg, NULL, 0);
                    306:                                break;
                    307:                        case 'M':
                    308:                                AIT_FREE_VAL(&args->Will.Msg);
                    309:                                AIT_SET_STR(&args->Will.Msg, optarg);
                    310:                                break;
                    311:                        case 'W':
                    312:                                AIT_FREE_VAL(&args->Will.Topic);
                    313:                                AIT_SET_STR(&args->Will.Topic, optarg);
                    314:                                break;
                    315:                        case 'U':
                    316:                                AIT_FREE_VAL(&args->User);
                    317:                                AIT_SET_STR(&args->User, optarg);
                    318:                                break;
                    319:                        case 'P':
                    320:                                AIT_FREE_VAL(&args->Pass);
                    321:                                AIT_SET_STR(&args->Pass, optarg);
                    322:                                break;
                    323:                        case 'p':
                    324:                                port = (u_short) strtol(optarg, NULL, 0);
                    325:                                break;
                    326:                        case 's':
1.3       misho     327:                                sub = mqtt_subRealloc(&args->subscr, idx + 1);
                    328:                                if (!sub) {
                    329:                                        printf("Error:: #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.2       misho     330:                                        args->free(args);
1.3       misho     331:                                        io_free(args);
1.2       misho     332:                                        return 1;
                    333:                                } else
1.3       misho     334:                                        sub += idx++;
                    335: 
                    336:                                strlcpy(szStr, optarg, sizeof szStr);
                    337:                                if ((str = strchr(szStr, '|'))) {
                    338:                                        *str++ = 0;
                    339:                                        *str -= 0x30;
                    340:                                        if (*str < 0 || *str > MQTT_QOS_RESERVED)
                    341:                                                sub->sub_ret = (u_char) args->QoS;
                    342:                                        else
                    343:                                                sub->sub_ret = (u_char) *str;
                    344:                                } else
                    345:                                        sub->sub_ret = (u_char) args->QoS;
                    346:                                sub->sub_topic.msg_base = strdup(szStr);
                    347:                                sub->sub_topic.msg_len = strlen(szStr);
1.2       misho     348:                                break;
                    349:                        case 'q':
                    350:                                args->QoS = (char) strtol(optarg, NULL, 0);
                    351:                                if (args->QoS > MQTT_QOS_EXACTLY) {
                    352:                                        printf("Error:: invalid QoS level %d\n", args->QoS);
                    353:                                        args->free(args);
1.3       misho     354:                                        io_free(args);
1.2       misho     355:                                        return 1;
                    356:                                }
                    357:                                break;
                    358:                        case 'd':
                    359:                                args->Dup++;
                    360:                                break;
                    361:                        case 'C':
                    362:                                args->notClear++;
                    363:                                break;
                    364:                        case 'l':
                    365:                                strlcpy(szLogName, optarg, sizeof szLogName);
                    366:                                break;
                    367:                        case 'D':
                    368:                                batch = 0;
                    369:                                break;
                    370:                        case 'v':
                    371:                                io_incDebug;
                    372:                                break;
1.3       misho     373:                        case 'u':
                    374:                                un = 1;
                    375:                                break;
1.2       misho     376:                        case 'h':
                    377:                        default:
                    378:                                args->free(args);
1.3       misho     379:                                io_free(args);
1.2       misho     380:                                Usage();
                    381:                                return 1;
                    382:                }
                    383:        argc -= optind;
                    384:        argv += optind;
1.3       misho     385:        if (argc < 2) {
1.2       misho     386:                printf("Error:: host for connect not found, connection id or topic not supplied!\n\n");
                    387:                args->free(args);
1.3       misho     388:                io_free(args);
1.2       misho     389:                Usage();
                    390:                return 1;
                    391:        } else {
                    392:                AIT_FREE_VAL(&args->ConnID);
                    393:                AIT_SET_STR(&args->ConnID, argv[1]);
                    394:        }
1.3       misho     395:        if (argc > 2) {
1.2       misho     396:                AIT_FREE_VAL(&args->Value);
1.3       misho     397:                AIT_SET_STR(&args->Value, argv[2]);
1.2       misho     398:        }
                    399:        if (!io_gethostbyname(*argv, port, &args->addr)) {
                    400:                printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());
                    401:                args->free(args);
1.3       misho     402:                io_free(args);
1.2       misho     403:                Usage();
                    404:                return 1;
                    405:        }
1.3       misho     406:        printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr));
                    407:        AIT_FREE_VAL(&val);
1.2       misho     408: 
1.3       misho     409:        sa.sa_handler = sigz;
                    410:        sigemptyset(&sa.sa_mask);
                    411:        sigaction(SIGTERM, &sa, NULL);
                    412:        sigaction(SIGINT, &sa, NULL);
                    413:        sigaction(SIGCHLD, &sa, NULL);
                    414: 
                    415:        if (!batch)
                    416:                switch (fork()) {
                    417:                        case -1:        /* error */
                    418:                                printf("Error:: in fork() #%d - %s\n", errno, strerror(errno));
                    419:                                ret = 2;
                    420:                                goto end;
                    421:                        case 0:         /* child */
                    422:                                setsid();
                    423: 
                    424:                                ret = open("/dev/null", O_RDWR);
                    425:                                if (ret != -1) {
                    426:                                        dup2(ret, STDIN_FILENO);
                    427:                                        dup2(ret, STDOUT_FILENO);
                    428:                                        dup2(ret, STDERR_FILENO);
                    429:                                        close(ret);
                    430:                                }
                    431:                                break;
                    432:                        default:        /* parent */
                    433:                                printf(">> Service started\n");
                    434:                                ret = 0;
                    435:                                goto end;
                    436:                }
                    437: 
                    438:        if (!(args->cli = mqtt_cli_Open(&args->addr.sa, args->ka))) {
                    439:                ret = 2;
                    440:                goto end;
1.2       misho     441:        }
                    442: 
1.3       misho     443:        switch ((ret = ConnectClient(args->cli->sock))) {
1.2       misho     444:                case -1:
                    445:                        printf(">> FAILED!\n");
                    446:                        break;
                    447:                case MQTT_RETCODE_ACCEPTED:
                    448:                        printf(">> OK\n");
                    449:                        break;
                    450:                case MQTT_RETCODE_REFUSE_VER:
                    451:                        printf(">> Incorrect version\n");
                    452:                        break;
                    453:                case MQTT_RETCODE_REFUSE_ID:
                    454:                        printf(">> Incorrect connectID\n");
                    455:                        break;
                    456:                case MQTT_RETCODE_REFUSE_UNAVAIL:
                    457:                        printf(">> Service unavailable\n");
                    458:                        break;
                    459:                case MQTT_RETCODE_REFUSE_USERPASS:
                    460:                        printf(">> Refuse user/pass\n");
                    461:                        break;
                    462:                case MQTT_RETCODE_DENIED:
                    463:                        printf(">> DENIED.\n");
                    464:                        break;
                    465:        }
                    466: 
                    467:        if (ret == MQTT_RETCODE_ACCEPTED) {
                    468:                if (*szLogName)
                    469:                        lf = fopen(szLogName, "w");
                    470:                else
                    471:                        lf = stdout;
                    472:                if (lf) {
1.3       misho     473:                        ret = Subscribe(args->cli->sock, lf);
                    474: 
                    475:                        root = schedBegin();
                    476: 
                    477:                        schedRead(root, pubRX, lf, args->cli->sock, NULL, 0);
                    478:                        schedRun(root, &Kill);
                    479: 
                    480:                        schedEnd(&root);
                    481: 
                    482:                        if (un)
                    483:                                Unsubscribe(args->cli->sock);
1.2       misho     484:                        fclose(lf);
                    485:                } else
1.3       misho     486:                        printf("Error:: in output file #%d - %s\n", errno, strerror(errno));
                    487:        } else
1.2       misho     488:                ret = 3;
                    489: 
1.3       misho     490:        mqtt_cli_Close(&args->cli);
                    491: end:
1.2       misho     492:        args->free(args);
1.3       misho     493:        io_free(args);
1.2       misho     494:        return ret;
                    495: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>