Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.23

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
1.2.2.19  misho       7: static inline struct tagPkt *
                      8: mkPkt(void * __restrict data, int dlen)
                      9: {
                     10:        struct tagPkt *p = NULL;
                     11: 
                     12:        p = io_malloc(sizeof(struct tagPkt));
                     13:        if (!p) {
                     14:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     15:                return NULL;
                     16:        } else
                     17:                memset(p, 0, sizeof(struct tagPkt));
                     18: 
                     19:        p->pkt_data = io_allocVar();
                     20:        if (!p->pkt_data) {
                     21:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     22:                io_free(p);
                     23:                return NULL;
                     24:        }
                     25: 
                     26:        if (data && dlen > 0)
                     27:                AIT_SET_BUF(p->pkt_data, data, dlen);
                     28: 
                     29:        return p;
                     30: }
                     31: 
                     32: static inline void
1.2.2.20  misho      33: freePkt(struct tagPkt * __restrict p)
1.2.2.19  misho      34: {
1.2.2.20  misho      35:        if (!p)
1.2.2.19  misho      36:                return;
                     37: 
1.2.2.20  misho      38:        io_freeVar(&p->pkt_data);
                     39:        io_free(p);
1.2.2.19  misho      40: }
                     41: 
                     42: static void *
                     43: sendPacket(sched_task_t *task)
                     44: {
                     45:        struct tagPkt *p = TASK_ARG(task);
                     46:        register int n, slen;
                     47:        u_char *pos;
                     48: 
                     49:        if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
                     50:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
                     51:                return NULL;
                     52:        }
                     53: 
                     54:        for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0; 
                     55:                        slen -= n, pos += n) {
                     56:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     57:                if (n == -1) {
                     58:                        ioSYSERR(0);
                     59:                        break;
                     60:                }
                     61:        }
                     62: 
1.2.2.20  misho      63:        freePkt(p);
1.2.2.19  misho      64:        return NULL;
                     65: }
                     66: 
1.2.2.22  misho      67: /* --------------------------------------------------- */
1.2.2.19  misho      68: 
1.2.2.17  misho      69: static int
1.2.2.22  misho      70: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho      71: {
1.2.2.22  misho      72:        struct tagPkt *p = NULL;
                     73:        struct tagSession *s = NULL;
                     74:        struct tagStore *st = NULL;
                     75:        regex_t re;
                     76:        regmatch_t match;
                     77:        int ret;
                     78:        char szStr[STRSIZ];
                     79: 
                     80:        TAILQ_FOREACH(s, &Sessions, sess_node) {
                     81:                SLIST_FOREACH(st, &s->sess_subscr, st_node) {
                     82:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                     83:                                regerror(ret, &re, szStr, sizeof szStr);
                     84:                                regfree(&re);
                     85:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                     86:                                                st->st_subscr.sub_topic.msg_base, szStr);
                     87:                        }
                     88:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
                     89:                                /* MATCH */
1.2.2.23! misho      90:                                ioDEBUG(1, "+++ dlen=%d\n", datlen);
1.2.2.22  misho      91:                                p = mkPkt(sess->sess_buf->msg_base, datlen);
1.2.2.23! misho      92:                                schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
1.2.2.22  misho      93:                        }
                     94: 
                     95:                        regfree(&re);
                     96:                }
                     97:        }
                     98: 
1.2.2.17  misho      99:        return 0;
                    100: }
                    101: 
                    102: static int
1.2.2.22  misho     103: pubAck(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho     104: {
                    105:        return 0;
                    106: }
                    107: 
                    108: static int
1.2.2.22  misho     109: pubExactly(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho     110: {
                    111:        return 0;
                    112: }
                    113: 
                    114: 
1.2       misho     115: int
1.2.2.9   misho     116: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho     117: {
                    118:        struct mqtthdr *hdr;
1.2.2.1   misho     119:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     120:        char szTopic[STRSIZ] = { 0 };
                    121:        int siz = 0;
                    122:        u_short mid = 0;
1.2.2.22  misho     123:        struct tagPkt *p = NULL;
1.2       misho     124: 
                    125:        ioTRACE(2);
                    126: 
                    127:        if (!sess)
                    128:                return -1;
                    129: 
1.2.2.17  misho     130:        ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22  misho     131:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17  misho     132:        if (siz == -1) {
                    133:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    134:                return 0;
                    135:        }
                    136: 
1.2.2.22  misho     137:        /* duplicate packet for retransmit to subscribers */
                    138:        /*
                    139:        pubpkt = mqtt_msgDup(sess->sess_buf);
                    140:        if (!pubpkt) {
                    141:                ioDEBUG(5, "Error:: in duplicate packet #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    142:                return 0;
                    143:        } else
                    144:        */
                    145: 
1.2       misho     146:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    147:        switch (hdr->mqtt_msg.qos) {
                    148:                case MQTT_QOS_ACK:
1.2.2.23! misho     149:                        pubAck(sess, szTopic, mqtt_pktLen(hdr));
1.2.2.17  misho     150:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    151:                        if (siz == -1) {
                    152:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    153:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     154:                                return 0;
1.2.2.17  misho     155:                        }
1.2       misho     156:                        break;
                    157:                case MQTT_QOS_EXACTLY:
1.2.2.23! misho     158:                        pubExactly(sess, szTopic, mqtt_pktLen(hdr));
1.2.2.17  misho     159:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    160:                        if (siz == -1) {
                    161:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    162:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     163:                                return 0;
1.2.2.17  misho     164:                        }
1.2       misho     165:                        break;
1.2.2.17  misho     166:                case MQTT_QOS_ONCE:
1.2.2.23! misho     167:                        pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2       misho     168:                default:
1.2.2.22  misho     169:                        return 0;
1.2       misho     170:        }
                    171: 
1.2.2.22  misho     172:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    173:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    174:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2       misho     175:        return 0;
                    176: }
1.2.2.1   misho     177: 
                    178: int
1.2.2.9   misho     179: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho     180: {
                    181:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     182:        int siz = 0;
                    183:        u_short mid = 0;
1.2.2.22  misho     184:        struct tagPkt *p = NULL;
1.2.2.1   misho     185: 
                    186:        ioTRACE(2);
                    187: 
                    188:        if (!sess)
                    189:                return -1;
                    190: 
1.2.2.17  misho     191:        ioDEBUG(5, "Exec PUBREL session");
                    192:        mid = mqtt_readPUBREL(sess->sess_buf);
                    193:        if (mid == (u_short) -1) {
                    194:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    195:                return 0;
                    196:        }
                    197: 
                    198:        // TODO:: Delete from database topic
                    199: 
                    200:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    201:        if (siz == -1) {
                    202:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    203:                return 0;
1.2.2.22  misho     204:        } else {
                    205:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.17  misho     206:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    207:        }
1.2.2.1   misho     208: 
1.2.2.22  misho     209:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     210:        return 0;
                    211: }
                    212: 
                    213: int
1.2.2.9   misho     214: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     215: {
                    216:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     217:        mqtt_subscr_t *subs = NULL;
                    218:        int siz = 0;
                    219:        u_short mid = 0;
                    220:        register int i;
                    221:        struct tagStore *store;
1.2.2.17  misho     222:        char buf[BUFSIZ];
                    223:        void *ptr;
1.2.2.21  misho     224:        struct tagPkt *p = NULL;
1.2.2.1   misho     225: 
                    226:        ioTRACE(2);
                    227: 
                    228:        if (!sess)
                    229:                return -1;
                    230: 
1.2.2.7   misho     231:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    232:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    233:        if (siz == -1) {
                    234:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    235:                return 0;
                    236:        }
                    237: 
                    238:        /* add to db */
                    239:        for (i = 0; i < siz; i++) {
1.2.2.17  misho     240:                /* convert topic to sql search statement */
                    241:                if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
                    242:                        ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    243:                        goto end;
                    244:                }
                    245:                if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
1.2.2.8   misho     246:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18  misho     247:                        store = io_malloc(sizeof(struct tagStore));
1.2.2.7   misho     248:                        if (!store) {
                    249:                                ioSYSERR(0);
                    250:                                goto end;
                    251:                        } else {
                    252:                                store->st_msgid = mid;
1.2.2.8   misho     253:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho     254:                        }
                    255: 
                    256:                        /* add to cache */
                    257:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    258: 
1.2.2.17  misho     259:                        /* convert topic to regexp */
                    260:                        if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
                    261:                                ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    262:                                goto end;
                    263:                        } else {
                    264:                                ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
                    265:                                if (!ptr) {
                    266:                                        ioSYSERR(0);
                    267:                                        goto end;
                    268:                                } else {
                    269:                                        store->st_subscr.sub_topic.msg_base = ptr;
                    270:                                        store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    271:                                        memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    272:                                                        store->st_subscr.sub_topic.msg_len);
                    273:                                }
                    274:                        }
                    275: 
                    276:                        call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 
                    277:                                        store->st_subscr.sub_topic.msg_base, 
                    278:                                        store->st_subscr.sub_topic.msg_len, sess->sess_addr);
                    279: 
1.2.2.7   misho     280:                        subs[i].sub_ret = MQTT_QOS_PASS;
                    281:                } else
                    282:                        subs[i].sub_ret = MQTT_QOS_DENY;
                    283:        }
1.2.2.1   misho     284: 
1.2.2.7   misho     285:        /* send acknowledge */
                    286:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    287:        if (siz == -1) {
                    288:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    289:                goto end;
1.2.2.21  misho     290:        } else {
                    291:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     292:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    293:        }
1.2.2.21  misho     294: 
                    295:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7   misho     296: end:
                    297:        mqtt_subFree(&subs);
1.2.2.1   misho     298:        return 0;
                    299: }
                    300: 
                    301: int
1.2.2.9   misho     302: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     303: {
                    304:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     305:        mqtt_subscr_t *subs = NULL;
                    306:        int siz = 0;
                    307:        u_short mid = 0;
                    308:        register int i;
                    309:        struct tagStore *store, *tmp;
1.2.2.21  misho     310:        struct tagPkt *p = NULL;
1.2.2.1   misho     311: 
                    312:        ioTRACE(2);
                    313: 
                    314:        if (!sess)
                    315:                return -1;
                    316: 
1.2.2.13  misho     317:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    318:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    319:        if (siz == -1) {
                    320:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    321:                return 0;
                    322:        }
                    323: 
                    324:        /* del from db */
                    325:        for (i = 0; i < siz; i++) {
                    326:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    327:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    328:                                        store->st_subscr.sub_topic.msg_base && 
                    329:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    330:                                                subs[i].sub_topic.msg_base)) {
                    331:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     332: 
                    333:                                if (store->st_subscr.sub_topic.msg_base)
                    334:                                        free(store->st_subscr.sub_topic.msg_base);
                    335:                                if (store->st_subscr.sub_value.msg_base)
                    336:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18  misho     337:                                io_free(store);
1.2.2.13  misho     338:                        }
                    339:                }
                    340: 
1.2.2.15  misho     341:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    342:                                sess->sess_user, "%");
1.2.2.13  misho     343:        }
                    344: 
                    345:        /* send acknowledge */
                    346:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    347:        if (siz == -1) {
                    348:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    349:                goto end;
1.2.2.21  misho     350:        } else {
                    351:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13  misho     352:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    353:        }
1.2.2.21  misho     354: 
                    355:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13  misho     356: end:
                    357:        mqtt_subFree(&subs);
1.2.2.1   misho     358:        return 0;
                    359: }
                    360: 
                    361: int
1.2.2.9   misho     362: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     363: {
                    364:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     365:        int siz = 0;
1.2.2.19  misho     366:        struct tagPkt *p = NULL;
1.2.2.1   misho     367: 
                    368:        ioTRACE(2);
                    369: 
                    370:        if (!sess)
                    371:                return -1;
                    372: 
1.2.2.7   misho     373:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     374:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    375:        if (siz == -1) {
                    376:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    377:                return 0;
1.2.2.8   misho     378:        } else {
1.2.2.19  misho     379:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     380:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    381:        }
1.2.2.1   misho     382: 
1.2.2.19  misho     383:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     384:        return 0;
                    385: }
                    386: 
                    387: int
1.2.2.9   misho     388: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     389: {
                    390:        struct tagStore *store;
1.2.2.19  misho     391:        struct tagPkt *p;
1.2.2.1   misho     392:        struct tagSession *sess = (struct tagSession*) arg;
                    393: 
                    394:        ioTRACE(2);
                    395: 
                    396:        if (!sess)
                    397:                return -1;
                    398: 
1.2.2.6   misho     399:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     400:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    401: 
1.2.2.16  misho     402:        if (sess->sess_clean) {
                    403:                if (call.FiniSessPUB)
                    404:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    405:                if (call.DeletePUB_subscribe)
                    406:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    407:                if (call.WipePUB_topic)
                    408:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    409:        }
1.2.2.1   misho     410: 
1.2.2.6   misho     411:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    412:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     413: 
1.2.2.6   misho     414:                if (store->st_subscr.sub_topic.msg_base)
                    415:                        free(store->st_subscr.sub_topic.msg_base);
                    416:                if (store->st_subscr.sub_value.msg_base)
                    417:                        free(store->st_subscr.sub_value.msg_base);
                    418: 
1.2.2.18  misho     419:                io_free(store);
1.2.2.6   misho     420:        }
1.2.2.1   misho     421: 
1.2.2.19  misho     422:        while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
                    423:                SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
                    424: 
                    425:                io_freeVar(&p->pkt_data);
                    426:                io_free(p);
                    427:        }
                    428: 
1.2.2.1   misho     429:        if (sess->sess_will.msg)
                    430:                free(sess->sess_will.msg);
                    431:        if (sess->sess_will.topic)
                    432:                free(sess->sess_will.topic);
                    433: 
                    434:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    435:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     436: 
1.2.2.12  misho     437:        return -3;      /* reconnect client */
1.2.2.1   misho     438: }
                    439: 
                    440: int
1.2.2.9   misho     441: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     442: {
                    443:        struct tagSession *sess = (struct tagSession*) arg;
                    444: 
                    445:        ioTRACE(2);
                    446: 
                    447:        if (!sess)
                    448:                return -1;
                    449: 
1.2.2.5   misho     450:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     451: 
1.2.2.1   misho     452:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    453:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     454: 
1.2.2.10  misho     455:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     456: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>