Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.24

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
1.2.2.19  misho       7: static inline struct tagPkt *
                      8: mkPkt(void * __restrict data, int dlen)
                      9: {
                     10:        struct tagPkt *p = NULL;
                     11: 
                     12:        p = io_malloc(sizeof(struct tagPkt));
                     13:        if (!p) {
                     14:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     15:                return NULL;
                     16:        } else
                     17:                memset(p, 0, sizeof(struct tagPkt));
                     18: 
                     19:        p->pkt_data = io_allocVar();
                     20:        if (!p->pkt_data) {
                     21:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     22:                io_free(p);
                     23:                return NULL;
                     24:        }
                     25: 
                     26:        if (data && dlen > 0)
                     27:                AIT_SET_BUF(p->pkt_data, data, dlen);
                     28: 
                     29:        return p;
                     30: }
                     31: 
                     32: static inline void
1.2.2.20  misho      33: freePkt(struct tagPkt * __restrict p)
1.2.2.19  misho      34: {
1.2.2.20  misho      35:        if (!p)
1.2.2.19  misho      36:                return;
                     37: 
1.2.2.20  misho      38:        io_freeVar(&p->pkt_data);
                     39:        io_free(p);
1.2.2.19  misho      40: }
                     41: 
                     42: static void *
                     43: sendPacket(sched_task_t *task)
                     44: {
                     45:        struct tagPkt *p = TASK_ARG(task);
                     46:        register int n, slen;
                     47:        u_char *pos;
                     48: 
                     49:        if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
                     50:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
                     51:                return NULL;
                     52:        }
                     53: 
                     54:        for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0; 
                     55:                        slen -= n, pos += n) {
                     56:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     57:                if (n == -1) {
                     58:                        ioSYSERR(0);
                     59:                        break;
                     60:                }
                     61:        }
                     62: 
1.2.2.20  misho      63:        freePkt(p);
1.2.2.19  misho      64:        return NULL;
                     65: }
                     66: 
1.2.2.22  misho      67: /* --------------------------------------------------- */
1.2.2.19  misho      68: 
1.2.2.17  misho      69: static int
1.2.2.22  misho      70: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho      71: {
1.2.2.22  misho      72:        struct tagPkt *p = NULL;
                     73:        struct tagSession *s = NULL;
                     74:        struct tagStore *st = NULL;
                     75:        regex_t re;
                     76:        regmatch_t match;
                     77:        int ret;
                     78:        char szStr[STRSIZ];
                     79: 
                     80:        TAILQ_FOREACH(s, &Sessions, sess_node) {
                     81:                SLIST_FOREACH(st, &s->sess_subscr, st_node) {
                     82:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                     83:                                regerror(ret, &re, szStr, sizeof szStr);
                     84:                                regfree(&re);
                     85:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                     86:                                                st->st_subscr.sub_topic.msg_base, szStr);
                     87:                        }
                     88:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
                     89:                                /* MATCH */
1.2.2.23  misho      90:                                ioDEBUG(1, "+++ dlen=%d\n", datlen);
1.2.2.22  misho      91:                                p = mkPkt(sess->sess_buf->msg_base, datlen);
1.2.2.23  misho      92:                                schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
1.2.2.22  misho      93:                        }
                     94: 
                     95:                        regfree(&re);
                     96:                }
                     97:        }
                     98: 
1.2.2.17  misho      99:        return 0;
                    100: }
                    101: 
                    102: static int
1.2.2.22  misho     103: pubAck(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho     104: {
                    105:        return 0;
                    106: }
                    107: 
                    108: static int
1.2.2.22  misho     109: pubExactly(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho     110: {
                    111:        return 0;
                    112: }
                    113: 
                    114: 
1.2       misho     115: int
1.2.2.9   misho     116: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho     117: {
                    118:        struct mqtthdr *hdr;
1.2.2.1   misho     119:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     120:        char szTopic[STRSIZ] = { 0 };
                    121:        int siz = 0;
                    122:        u_short mid = 0;
1.2.2.22  misho     123:        struct tagPkt *p = NULL;
1.2       misho     124: 
                    125:        ioTRACE(2);
                    126: 
                    127:        if (!sess)
                    128:                return -1;
                    129: 
1.2.2.17  misho     130:        ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22  misho     131:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17  misho     132:        if (siz == -1) {
                    133:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    134:                return 0;
                    135:        }
                    136: 
1.2.2.22  misho     137:        /* duplicate packet for retransmit to subscribers */
                    138:        /*
                    139:        pubpkt = mqtt_msgDup(sess->sess_buf);
                    140:        if (!pubpkt) {
                    141:                ioDEBUG(5, "Error:: in duplicate packet #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    142:                return 0;
                    143:        } else
                    144:        */
                    145: 
1.2       misho     146:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    147:        switch (hdr->mqtt_msg.qos) {
                    148:                case MQTT_QOS_ACK:
1.2.2.23  misho     149:                        pubAck(sess, szTopic, mqtt_pktLen(hdr));
1.2.2.17  misho     150:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    151:                        if (siz == -1) {
                    152:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    153:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     154:                                return 0;
1.2.2.17  misho     155:                        }
1.2       misho     156:                        break;
                    157:                case MQTT_QOS_EXACTLY:
1.2.2.23  misho     158:                        pubExactly(sess, szTopic, mqtt_pktLen(hdr));
1.2.2.17  misho     159:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    160:                        if (siz == -1) {
                    161:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    162:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     163:                                return 0;
1.2.2.17  misho     164:                        }
1.2       misho     165:                        break;
1.2.2.17  misho     166:                case MQTT_QOS_ONCE:
1.2.2.23  misho     167:                        pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2       misho     168:                default:
1.2.2.22  misho     169:                        return 0;
1.2       misho     170:        }
                    171: 
1.2.2.22  misho     172:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    173:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    174:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2       misho     175:        return 0;
                    176: }
1.2.2.1   misho     177: 
                    178: int
1.2.2.9   misho     179: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho     180: {
                    181:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     182:        int siz = 0;
                    183:        u_short mid = 0;
1.2.2.22  misho     184:        struct tagPkt *p = NULL;
1.2.2.1   misho     185: 
                    186:        ioTRACE(2);
                    187: 
                    188:        if (!sess)
                    189:                return -1;
                    190: 
1.2.2.17  misho     191:        ioDEBUG(5, "Exec PUBREL session");
                    192:        mid = mqtt_readPUBREL(sess->sess_buf);
                    193:        if (mid == (u_short) -1) {
                    194:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    195:                return 0;
                    196:        }
                    197: 
                    198:        // TODO:: Delete from database topic
                    199: 
                    200:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    201:        if (siz == -1) {
                    202:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    203:                return 0;
1.2.2.22  misho     204:        } else {
                    205:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.17  misho     206:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    207:        }
1.2.2.1   misho     208: 
1.2.2.22  misho     209:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     210:        return 0;
                    211: }
                    212: 
                    213: int
1.2.2.9   misho     214: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     215: {
                    216:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     217:        mqtt_subscr_t *subs = NULL;
                    218:        int siz = 0;
                    219:        u_short mid = 0;
                    220:        register int i;
                    221:        struct tagStore *store;
1.2.2.17  misho     222:        char buf[BUFSIZ];
                    223:        void *ptr;
1.2.2.21  misho     224:        struct tagPkt *p = NULL;
1.2.2.1   misho     225: 
                    226:        ioTRACE(2);
                    227: 
                    228:        if (!sess)
                    229:                return -1;
                    230: 
1.2.2.7   misho     231:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    232:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    233:        if (siz == -1) {
                    234:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    235:                return 0;
                    236:        }
                    237: 
                    238:        /* add to db */
                    239:        for (i = 0; i < siz; i++) {
1.2.2.17  misho     240:                /* convert topic to sql search statement */
                    241:                if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
                    242:                        ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    243:                        goto end;
                    244:                }
                    245:                if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
1.2.2.8   misho     246:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18  misho     247:                        store = io_malloc(sizeof(struct tagStore));
1.2.2.7   misho     248:                        if (!store) {
                    249:                                ioSYSERR(0);
                    250:                                goto end;
                    251:                        } else {
                    252:                                store->st_msgid = mid;
1.2.2.8   misho     253:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho     254:                        }
                    255: 
                    256:                        /* add to cache */
                    257:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    258: 
1.2.2.17  misho     259:                        /* convert topic to regexp */
1.2.2.24! misho     260:                        if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
1.2.2.17  misho     261:                                ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.24! misho     262: 
        !           263:                                subs[i].sub_ret = MQTT_QOS_DENY;
1.2.2.17  misho     264:                        } else {
                    265:                                ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
                    266:                                if (!ptr) {
                    267:                                        ioSYSERR(0);
1.2.2.24! misho     268: 
        !           269:                                        subs[i].sub_ret = MQTT_QOS_DENY;
1.2.2.17  misho     270:                                } else {
                    271:                                        store->st_subscr.sub_topic.msg_base = ptr;
                    272:                                        store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    273:                                        memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    274:                                                        store->st_subscr.sub_topic.msg_len);
                    275:                                }
                    276: 
1.2.2.24! misho     277:                                call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 
        !           278:                                                store->st_subscr.sub_topic.msg_base, 
        !           279:                                                store->st_subscr.sub_topic.msg_len, sess->sess_addr);
1.2.2.17  misho     280: 
1.2.2.24! misho     281:                                subs[i].sub_ret = MQTT_QOS_PASS;
        !           282:                        }
1.2.2.7   misho     283:                } else
                    284:                        subs[i].sub_ret = MQTT_QOS_DENY;
                    285:        }
1.2.2.1   misho     286: 
1.2.2.7   misho     287:        /* send acknowledge */
                    288:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    289:        if (siz == -1) {
                    290:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    291:                goto end;
1.2.2.21  misho     292:        } else {
                    293:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     294:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    295:        }
1.2.2.21  misho     296: 
                    297:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7   misho     298: end:
                    299:        mqtt_subFree(&subs);
1.2.2.1   misho     300:        return 0;
                    301: }
                    302: 
                    303: int
1.2.2.9   misho     304: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     305: {
                    306:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     307:        mqtt_subscr_t *subs = NULL;
                    308:        int siz = 0;
                    309:        u_short mid = 0;
                    310:        register int i;
                    311:        struct tagStore *store, *tmp;
1.2.2.21  misho     312:        struct tagPkt *p = NULL;
1.2.2.1   misho     313: 
                    314:        ioTRACE(2);
                    315: 
                    316:        if (!sess)
                    317:                return -1;
                    318: 
1.2.2.13  misho     319:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    320:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    321:        if (siz == -1) {
                    322:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    323:                return 0;
                    324:        }
                    325: 
                    326:        /* del from db */
                    327:        for (i = 0; i < siz; i++) {
                    328:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    329:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    330:                                        store->st_subscr.sub_topic.msg_base && 
                    331:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    332:                                                subs[i].sub_topic.msg_base)) {
                    333:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     334: 
                    335:                                if (store->st_subscr.sub_topic.msg_base)
                    336:                                        free(store->st_subscr.sub_topic.msg_base);
                    337:                                if (store->st_subscr.sub_value.msg_base)
                    338:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18  misho     339:                                io_free(store);
1.2.2.13  misho     340:                        }
                    341:                }
                    342: 
1.2.2.15  misho     343:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    344:                                sess->sess_user, "%");
1.2.2.13  misho     345:        }
                    346: 
                    347:        /* send acknowledge */
                    348:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    349:        if (siz == -1) {
                    350:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    351:                goto end;
1.2.2.21  misho     352:        } else {
                    353:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13  misho     354:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    355:        }
1.2.2.21  misho     356: 
                    357:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13  misho     358: end:
                    359:        mqtt_subFree(&subs);
1.2.2.1   misho     360:        return 0;
                    361: }
                    362: 
                    363: int
1.2.2.9   misho     364: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     365: {
                    366:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     367:        int siz = 0;
1.2.2.19  misho     368:        struct tagPkt *p = NULL;
1.2.2.1   misho     369: 
                    370:        ioTRACE(2);
                    371: 
                    372:        if (!sess)
                    373:                return -1;
                    374: 
1.2.2.7   misho     375:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     376:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    377:        if (siz == -1) {
                    378:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    379:                return 0;
1.2.2.8   misho     380:        } else {
1.2.2.19  misho     381:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     382:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    383:        }
1.2.2.1   misho     384: 
1.2.2.19  misho     385:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     386:        return 0;
                    387: }
                    388: 
                    389: int
1.2.2.9   misho     390: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     391: {
                    392:        struct tagStore *store;
1.2.2.19  misho     393:        struct tagPkt *p;
1.2.2.1   misho     394:        struct tagSession *sess = (struct tagSession*) arg;
                    395: 
                    396:        ioTRACE(2);
                    397: 
                    398:        if (!sess)
                    399:                return -1;
                    400: 
1.2.2.6   misho     401:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     402:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    403: 
1.2.2.16  misho     404:        if (sess->sess_clean) {
                    405:                if (call.FiniSessPUB)
                    406:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    407:                if (call.DeletePUB_subscribe)
                    408:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    409:                if (call.WipePUB_topic)
                    410:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    411:        }
1.2.2.1   misho     412: 
1.2.2.6   misho     413:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    414:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     415: 
1.2.2.6   misho     416:                if (store->st_subscr.sub_topic.msg_base)
                    417:                        free(store->st_subscr.sub_topic.msg_base);
                    418:                if (store->st_subscr.sub_value.msg_base)
                    419:                        free(store->st_subscr.sub_value.msg_base);
                    420: 
1.2.2.18  misho     421:                io_free(store);
1.2.2.6   misho     422:        }
1.2.2.1   misho     423: 
1.2.2.19  misho     424:        while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
                    425:                SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
                    426: 
                    427:                io_freeVar(&p->pkt_data);
                    428:                io_free(p);
                    429:        }
                    430: 
1.2.2.1   misho     431:        if (sess->sess_will.msg)
                    432:                free(sess->sess_will.msg);
                    433:        if (sess->sess_will.topic)
                    434:                free(sess->sess_will.topic);
                    435: 
                    436:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    437:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     438: 
1.2.2.12  misho     439:        return -3;      /* reconnect client */
1.2.2.1   misho     440: }
                    441: 
                    442: int
1.2.2.9   misho     443: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     444: {
                    445:        struct tagSession *sess = (struct tagSession*) arg;
                    446: 
                    447:        ioTRACE(2);
                    448: 
                    449:        if (!sess)
                    450:                return -1;
                    451: 
1.2.2.5   misho     452:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     453: 
1.2.2.1   misho     454:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    455:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     456: 
1.2.2.10  misho     457:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     458: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>