Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.22

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
1.2.2.19  misho       7: static inline struct tagPkt *
                      8: mkPkt(void * __restrict data, int dlen)
                      9: {
                     10:        struct tagPkt *p = NULL;
                     11: 
                     12:        p = io_malloc(sizeof(struct tagPkt));
                     13:        if (!p) {
                     14:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     15:                return NULL;
                     16:        } else
                     17:                memset(p, 0, sizeof(struct tagPkt));
                     18: 
                     19:        p->pkt_data = io_allocVar();
                     20:        if (!p->pkt_data) {
                     21:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     22:                io_free(p);
                     23:                return NULL;
                     24:        }
                     25: 
                     26:        if (data && dlen > 0)
                     27:                AIT_SET_BUF(p->pkt_data, data, dlen);
                     28: 
                     29:        return p;
                     30: }
                     31: 
                     32: static inline void
1.2.2.20  misho      33: freePkt(struct tagPkt * __restrict p)
1.2.2.19  misho      34: {
1.2.2.20  misho      35:        if (!p)
1.2.2.19  misho      36:                return;
                     37: 
1.2.2.20  misho      38:        io_freeVar(&p->pkt_data);
                     39:        io_free(p);
1.2.2.19  misho      40: }
                     41: 
                     42: static void *
                     43: sendPacket(sched_task_t *task)
                     44: {
                     45:        struct tagPkt *p = TASK_ARG(task);
                     46:        register int n, slen;
                     47:        u_char *pos;
                     48: 
                     49:        if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
                     50:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
                     51:                return NULL;
                     52:        }
                     53: 
                     54:        for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0; 
                     55:                        slen -= n, pos += n) {
                     56:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     57:                if (n == -1) {
                     58:                        ioSYSERR(0);
                     59:                        break;
                     60:                }
                     61:        }
                     62: 
1.2.2.20  misho      63:        freePkt(p);
1.2.2.19  misho      64:        return NULL;
                     65: }
                     66: 
1.2.2.22! misho      67: /* --------------------------------------------------- */
1.2.2.19  misho      68: 
1.2.2.17  misho      69: static int
1.2.2.22! misho      70: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho      71: {
1.2.2.22! misho      72:        struct tagPkt *p = NULL;
        !            73:        struct tagSession *s = NULL;
        !            74:        struct tagStore *st = NULL;
        !            75:        regex_t re;
        !            76:        regmatch_t match;
        !            77:        int ret;
        !            78:        char szStr[STRSIZ];
        !            79: 
        !            80:        TAILQ_FOREACH(s, &Sessions, sess_node) {
        !            81:                SLIST_FOREACH(st, &s->sess_subscr, st_node) {
        !            82:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
        !            83:                                regerror(ret, &re, szStr, sizeof szStr);
        !            84:                                regfree(&re);
        !            85:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
        !            86:                                                st->st_subscr.sub_topic.msg_base, szStr);
        !            87:                        }
        !            88:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
        !            89:                                /* MATCH */
        !            90:                                p = mkPkt(sess->sess_buf->msg_base, datlen);
        !            91:                                schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
        !            92:                        }
        !            93: 
        !            94:                        regfree(&re);
        !            95:                }
        !            96:        }
        !            97: 
1.2.2.17  misho      98:        return 0;
                     99: }
                    100: 
                    101: static int
1.2.2.22! misho     102: pubAck(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho     103: {
                    104:        return 0;
                    105: }
                    106: 
                    107: static int
1.2.2.22! misho     108: pubExactly(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho     109: {
                    110:        return 0;
                    111: }
                    112: 
                    113: 
1.2       misho     114: int
1.2.2.9   misho     115: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho     116: {
                    117:        struct mqtthdr *hdr;
1.2.2.1   misho     118:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     119:        char szTopic[STRSIZ] = { 0 };
                    120:        int siz = 0;
                    121:        u_short mid = 0;
1.2.2.22! misho     122:        struct tagPkt *p = NULL;
1.2       misho     123: 
                    124:        ioTRACE(2);
                    125: 
                    126:        if (!sess)
                    127:                return -1;
                    128: 
1.2.2.17  misho     129:        ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22! misho     130:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17  misho     131:        if (siz == -1) {
                    132:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    133:                return 0;
                    134:        }
                    135: 
1.2.2.22! misho     136:        /* duplicate packet for retransmit to subscribers */
        !           137:        /*
        !           138:        pubpkt = mqtt_msgDup(sess->sess_buf);
        !           139:        if (!pubpkt) {
        !           140:                ioDEBUG(5, "Error:: in duplicate packet #%d - %s", mqtt_GetErrno(), mqtt_GetError());
        !           141:                return 0;
        !           142:        } else
        !           143:        */
        !           144: 
1.2       misho     145:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    146:        switch (hdr->mqtt_msg.qos) {
                    147:                case MQTT_QOS_ACK:
1.2.2.22! misho     148:                        pubAck(sess, szTopic, siz);
1.2.2.17  misho     149:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    150:                        if (siz == -1) {
                    151:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    152:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22! misho     153:                                return 0;
1.2.2.17  misho     154:                        }
1.2       misho     155:                        break;
                    156:                case MQTT_QOS_EXACTLY:
1.2.2.22! misho     157:                        pubExactly(sess, szTopic, siz);
1.2.2.17  misho     158:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    159:                        if (siz == -1) {
                    160:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    161:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22! misho     162:                                return 0;
1.2.2.17  misho     163:                        }
1.2       misho     164:                        break;
1.2.2.17  misho     165:                case MQTT_QOS_ONCE:
1.2.2.22! misho     166:                        pubOnce(sess, szTopic, siz);
1.2       misho     167:                default:
1.2.2.22! misho     168:                        return 0;
1.2       misho     169:        }
                    170: 
1.2.2.22! misho     171:        p = mkPkt(sess->sess_buf->msg_base, siz);
        !           172:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
        !           173:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2       misho     174:        return 0;
                    175: }
1.2.2.1   misho     176: 
                    177: int
1.2.2.9   misho     178: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho     179: {
                    180:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     181:        int siz = 0;
                    182:        u_short mid = 0;
1.2.2.22! misho     183:        struct tagPkt *p = NULL;
1.2.2.1   misho     184: 
                    185:        ioTRACE(2);
                    186: 
                    187:        if (!sess)
                    188:                return -1;
                    189: 
1.2.2.17  misho     190:        ioDEBUG(5, "Exec PUBREL session");
                    191:        mid = mqtt_readPUBREL(sess->sess_buf);
                    192:        if (mid == (u_short) -1) {
                    193:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    194:                return 0;
                    195:        }
                    196: 
                    197:        // TODO:: Delete from database topic
                    198: 
                    199:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    200:        if (siz == -1) {
                    201:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    202:                return 0;
1.2.2.22! misho     203:        } else {
        !           204:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.17  misho     205:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    206:        }
1.2.2.1   misho     207: 
1.2.2.22! misho     208:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     209:        return 0;
                    210: }
                    211: 
                    212: int
1.2.2.9   misho     213: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     214: {
                    215:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     216:        mqtt_subscr_t *subs = NULL;
                    217:        int siz = 0;
                    218:        u_short mid = 0;
                    219:        register int i;
                    220:        struct tagStore *store;
1.2.2.17  misho     221:        char buf[BUFSIZ];
                    222:        void *ptr;
1.2.2.21  misho     223:        struct tagPkt *p = NULL;
1.2.2.1   misho     224: 
                    225:        ioTRACE(2);
                    226: 
                    227:        if (!sess)
                    228:                return -1;
                    229: 
1.2.2.7   misho     230:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    231:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    232:        if (siz == -1) {
                    233:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    234:                return 0;
                    235:        }
                    236: 
                    237:        /* add to db */
                    238:        for (i = 0; i < siz; i++) {
1.2.2.17  misho     239:                /* convert topic to sql search statement */
                    240:                if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
                    241:                        ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    242:                        goto end;
                    243:                }
                    244:                if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
1.2.2.8   misho     245:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18  misho     246:                        store = io_malloc(sizeof(struct tagStore));
1.2.2.7   misho     247:                        if (!store) {
                    248:                                ioSYSERR(0);
                    249:                                goto end;
                    250:                        } else {
                    251:                                store->st_msgid = mid;
1.2.2.8   misho     252:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho     253:                        }
                    254: 
                    255:                        /* add to cache */
                    256:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    257: 
1.2.2.17  misho     258:                        /* convert topic to regexp */
                    259:                        if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
                    260:                                ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    261:                                goto end;
                    262:                        } else {
                    263:                                ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
                    264:                                if (!ptr) {
                    265:                                        ioSYSERR(0);
                    266:                                        goto end;
                    267:                                } else {
                    268:                                        store->st_subscr.sub_topic.msg_base = ptr;
                    269:                                        store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    270:                                        memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    271:                                                        store->st_subscr.sub_topic.msg_len);
                    272:                                }
                    273:                        }
                    274: 
                    275:                        call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 
                    276:                                        store->st_subscr.sub_topic.msg_base, 
                    277:                                        store->st_subscr.sub_topic.msg_len, sess->sess_addr);
                    278: 
1.2.2.7   misho     279:                        subs[i].sub_ret = MQTT_QOS_PASS;
                    280:                } else
                    281:                        subs[i].sub_ret = MQTT_QOS_DENY;
                    282:        }
1.2.2.1   misho     283: 
1.2.2.7   misho     284:        /* send acknowledge */
                    285:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    286:        if (siz == -1) {
                    287:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    288:                goto end;
1.2.2.21  misho     289:        } else {
                    290:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     291:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    292:        }
1.2.2.21  misho     293: 
                    294:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7   misho     295: end:
                    296:        mqtt_subFree(&subs);
1.2.2.1   misho     297:        return 0;
                    298: }
                    299: 
                    300: int
1.2.2.9   misho     301: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     302: {
                    303:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     304:        mqtt_subscr_t *subs = NULL;
                    305:        int siz = 0;
                    306:        u_short mid = 0;
                    307:        register int i;
                    308:        struct tagStore *store, *tmp;
1.2.2.21  misho     309:        struct tagPkt *p = NULL;
1.2.2.1   misho     310: 
                    311:        ioTRACE(2);
                    312: 
                    313:        if (!sess)
                    314:                return -1;
                    315: 
1.2.2.13  misho     316:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    317:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    318:        if (siz == -1) {
                    319:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    320:                return 0;
                    321:        }
                    322: 
                    323:        /* del from db */
                    324:        for (i = 0; i < siz; i++) {
                    325:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    326:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    327:                                        store->st_subscr.sub_topic.msg_base && 
                    328:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    329:                                                subs[i].sub_topic.msg_base)) {
                    330:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     331: 
                    332:                                if (store->st_subscr.sub_topic.msg_base)
                    333:                                        free(store->st_subscr.sub_topic.msg_base);
                    334:                                if (store->st_subscr.sub_value.msg_base)
                    335:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18  misho     336:                                io_free(store);
1.2.2.13  misho     337:                        }
                    338:                }
                    339: 
1.2.2.15  misho     340:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    341:                                sess->sess_user, "%");
1.2.2.13  misho     342:        }
                    343: 
                    344:        /* send acknowledge */
                    345:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    346:        if (siz == -1) {
                    347:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    348:                goto end;
1.2.2.21  misho     349:        } else {
                    350:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13  misho     351:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    352:        }
1.2.2.21  misho     353: 
                    354:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13  misho     355: end:
                    356:        mqtt_subFree(&subs);
1.2.2.1   misho     357:        return 0;
                    358: }
                    359: 
                    360: int
1.2.2.9   misho     361: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     362: {
                    363:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     364:        int siz = 0;
1.2.2.19  misho     365:        struct tagPkt *p = NULL;
1.2.2.1   misho     366: 
                    367:        ioTRACE(2);
                    368: 
                    369:        if (!sess)
                    370:                return -1;
                    371: 
1.2.2.7   misho     372:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     373:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    374:        if (siz == -1) {
                    375:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    376:                return 0;
1.2.2.8   misho     377:        } else {
1.2.2.19  misho     378:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     379:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    380:        }
1.2.2.1   misho     381: 
1.2.2.19  misho     382:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     383:        return 0;
                    384: }
                    385: 
                    386: int
1.2.2.9   misho     387: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     388: {
                    389:        struct tagStore *store;
1.2.2.19  misho     390:        struct tagPkt *p;
1.2.2.1   misho     391:        struct tagSession *sess = (struct tagSession*) arg;
                    392: 
                    393:        ioTRACE(2);
                    394: 
                    395:        if (!sess)
                    396:                return -1;
                    397: 
1.2.2.6   misho     398:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     399:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    400: 
1.2.2.16  misho     401:        if (sess->sess_clean) {
                    402:                if (call.FiniSessPUB)
                    403:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    404:                if (call.DeletePUB_subscribe)
                    405:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    406:                if (call.WipePUB_topic)
                    407:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    408:        }
1.2.2.1   misho     409: 
1.2.2.6   misho     410:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    411:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     412: 
1.2.2.6   misho     413:                if (store->st_subscr.sub_topic.msg_base)
                    414:                        free(store->st_subscr.sub_topic.msg_base);
                    415:                if (store->st_subscr.sub_value.msg_base)
                    416:                        free(store->st_subscr.sub_value.msg_base);
                    417: 
1.2.2.18  misho     418:                io_free(store);
1.2.2.6   misho     419:        }
1.2.2.1   misho     420: 
1.2.2.19  misho     421:        while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
                    422:                SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
                    423: 
                    424:                io_freeVar(&p->pkt_data);
                    425:                io_free(p);
                    426:        }
                    427: 
1.2.2.1   misho     428:        if (sess->sess_will.msg)
                    429:                free(sess->sess_will.msg);
                    430:        if (sess->sess_will.topic)
                    431:                free(sess->sess_will.topic);
                    432: 
                    433:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    434:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     435: 
1.2.2.12  misho     436:        return -3;      /* reconnect client */
1.2.2.1   misho     437: }
                    438: 
                    439: int
1.2.2.9   misho     440: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     441: {
                    442:        struct tagSession *sess = (struct tagSession*) arg;
                    443: 
                    444:        ioTRACE(2);
                    445: 
                    446:        if (!sess)
                    447:                return -1;
                    448: 
1.2.2.5   misho     449:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     450: 
1.2.2.1   misho     451:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    452:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     453: 
1.2.2.10  misho     454:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     455: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>