Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.25

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
1.2.2.19  misho       7: static inline struct tagPkt *
                      8: mkPkt(void * __restrict data, int dlen)
                      9: {
                     10:        struct tagPkt *p = NULL;
                     11: 
                     12:        p = io_malloc(sizeof(struct tagPkt));
                     13:        if (!p) {
                     14:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     15:                return NULL;
                     16:        } else
                     17:                memset(p, 0, sizeof(struct tagPkt));
                     18: 
                     19:        p->pkt_data = io_allocVar();
                     20:        if (!p->pkt_data) {
                     21:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     22:                io_free(p);
                     23:                return NULL;
                     24:        }
                     25: 
                     26:        if (data && dlen > 0)
                     27:                AIT_SET_BUF(p->pkt_data, data, dlen);
                     28: 
                     29:        return p;
                     30: }
                     31: 
                     32: static inline void
1.2.2.20  misho      33: freePkt(struct tagPkt * __restrict p)
1.2.2.19  misho      34: {
1.2.2.20  misho      35:        if (!p)
1.2.2.19  misho      36:                return;
                     37: 
1.2.2.20  misho      38:        io_freeVar(&p->pkt_data);
                     39:        io_free(p);
1.2.2.19  misho      40: }
                     41: 
                     42: static void *
                     43: sendPacket(sched_task_t *task)
                     44: {
                     45:        struct tagPkt *p = TASK_ARG(task);
                     46:        register int n, slen;
                     47:        u_char *pos;
                     48: 
                     49:        if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
                     50:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
                     51:                return NULL;
                     52:        }
                     53: 
                     54:        for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0; 
                     55:                        slen -= n, pos += n) {
                     56:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     57:                if (n == -1) {
                     58:                        ioSYSERR(0);
                     59:                        break;
                     60:                }
                     61:        }
                     62: 
1.2.2.20  misho      63:        freePkt(p);
1.2.2.19  misho      64:        return NULL;
                     65: }
                     66: 
1.2.2.22  misho      67: /* --------------------------------------------------- */
1.2.2.19  misho      68: 
1.2.2.17  misho      69: static int
1.2.2.22  misho      70: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho      71: {
1.2.2.22  misho      72:        struct tagPkt *p = NULL;
                     73:        struct tagSession *s = NULL;
                     74:        struct tagStore *st = NULL;
                     75:        regex_t re;
                     76:        regmatch_t match;
                     77:        int ret;
                     78:        char szStr[STRSIZ];
                     79: 
                     80:        TAILQ_FOREACH(s, &Sessions, sess_node) {
                     81:                SLIST_FOREACH(st, &s->sess_subscr, st_node) {
                     82:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                     83:                                regerror(ret, &re, szStr, sizeof szStr);
                     84:                                regfree(&re);
                     85:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                     86:                                                st->st_subscr.sub_topic.msg_base, szStr);
                     87:                        }
                     88:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
                     89:                                /* MATCH */
1.2.2.23  misho      90:                                ioDEBUG(1, "+++ dlen=%d\n", datlen);
1.2.2.22  misho      91:                                p = mkPkt(sess->sess_buf->msg_base, datlen);
1.2.2.23  misho      92:                                schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
1.2.2.22  misho      93:                        }
                     94: 
                     95:                        regfree(&re);
                     96:                }
                     97:        }
                     98: 
1.2.2.17  misho      99:        return 0;
                    100: }
                    101: 
                    102: static int
1.2.2.22  misho     103: pubAck(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho     104: {
                    105:        return 0;
                    106: }
                    107: 
                    108: static int
1.2.2.22  misho     109: pubExactly(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho     110: {
                    111:        return 0;
                    112: }
                    113: 
                    114: 
1.2       misho     115: int
1.2.2.9   misho     116: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho     117: {
                    118:        struct mqtthdr *hdr;
1.2.2.1   misho     119:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     120:        char szTopic[STRSIZ] = { 0 };
                    121:        int siz = 0;
                    122:        u_short mid = 0;
1.2.2.22  misho     123:        struct tagPkt *p = NULL;
1.2       misho     124: 
                    125:        ioTRACE(2);
                    126: 
                    127:        if (!sess)
                    128:                return -1;
                    129: 
1.2.2.17  misho     130:        ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22  misho     131:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17  misho     132:        if (siz == -1) {
                    133:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    134:                return 0;
                    135:        }
                    136: 
1.2.2.22  misho     137:        /* duplicate packet for retransmit to subscribers */
                    138:        /*
                    139:        pubpkt = mqtt_msgDup(sess->sess_buf);
                    140:        if (!pubpkt) {
                    141:                ioDEBUG(5, "Error:: in duplicate packet #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    142:                return 0;
                    143:        } else
                    144:        */
                    145: 
1.2       misho     146:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    147:        switch (hdr->mqtt_msg.qos) {
                    148:                case MQTT_QOS_ACK:
1.2.2.23  misho     149:                        pubAck(sess, szTopic, mqtt_pktLen(hdr));
1.2.2.17  misho     150:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    151:                        if (siz == -1) {
                    152:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    153:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     154:                                return 0;
1.2.2.17  misho     155:                        }
1.2       misho     156:                        break;
                    157:                case MQTT_QOS_EXACTLY:
1.2.2.23  misho     158:                        pubExactly(sess, szTopic, mqtt_pktLen(hdr));
1.2.2.17  misho     159:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    160:                        if (siz == -1) {
                    161:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    162:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     163:                                return 0;
1.2.2.17  misho     164:                        }
1.2       misho     165:                        break;
1.2.2.17  misho     166:                case MQTT_QOS_ONCE:
1.2.2.23  misho     167:                        pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2       misho     168:                default:
1.2.2.22  misho     169:                        return 0;
1.2       misho     170:        }
                    171: 
1.2.2.22  misho     172:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    173:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    174:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2       misho     175:        return 0;
                    176: }
1.2.2.1   misho     177: 
                    178: int
1.2.2.9   misho     179: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho     180: {
                    181:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     182:        int siz = 0;
                    183:        u_short mid = 0;
1.2.2.22  misho     184:        struct tagPkt *p = NULL;
1.2.2.1   misho     185: 
                    186:        ioTRACE(2);
                    187: 
                    188:        if (!sess)
                    189:                return -1;
                    190: 
1.2.2.17  misho     191:        ioDEBUG(5, "Exec PUBREL session");
                    192:        mid = mqtt_readPUBREL(sess->sess_buf);
                    193:        if (mid == (u_short) -1) {
                    194:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    195:                return 0;
                    196:        }
                    197: 
                    198:        // TODO:: Delete from database topic
                    199: 
                    200:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    201:        if (siz == -1) {
                    202:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    203:                return 0;
1.2.2.22  misho     204:        } else {
                    205:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.17  misho     206:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    207:        }
1.2.2.1   misho     208: 
1.2.2.22  misho     209:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     210:        return 0;
                    211: }
                    212: 
                    213: int
1.2.2.9   misho     214: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     215: {
                    216:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     217:        mqtt_subscr_t *subs = NULL;
                    218:        int siz = 0;
                    219:        u_short mid = 0;
                    220:        register int i;
                    221:        struct tagStore *store;
1.2.2.17  misho     222:        char buf[BUFSIZ];
                    223:        void *ptr;
1.2.2.21  misho     224:        struct tagPkt *p = NULL;
1.2.2.1   misho     225: 
                    226:        ioTRACE(2);
                    227: 
                    228:        if (!sess)
                    229:                return -1;
                    230: 
1.2.2.7   misho     231:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    232:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    233:        if (siz == -1) {
                    234:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    235:                return 0;
                    236:        }
                    237: 
                    238:        /* add to db */
1.2.2.25! misho     239:        for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
1.2.2.17  misho     240:                /* convert topic to sql search statement */
                    241:                if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
                    242:                        ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.25! misho     243:                        continue;
1.2.2.17  misho     244:                }
                    245:                if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
1.2.2.8   misho     246:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18  misho     247:                        store = io_malloc(sizeof(struct tagStore));
1.2.2.7   misho     248:                        if (!store) {
                    249:                                ioSYSERR(0);
1.2.2.25! misho     250:                                continue;
1.2.2.7   misho     251:                        } else {
                    252:                                store->st_msgid = mid;
1.2.2.8   misho     253:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho     254:                        }
                    255: 
                    256:                        /* add to cache */
                    257:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    258: 
1.2.2.17  misho     259:                        /* convert topic to regexp */
1.2.2.24  misho     260:                        if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
1.2.2.17  misho     261:                                ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.24  misho     262: 
                    263:                                subs[i].sub_ret = MQTT_QOS_DENY;
1.2.2.17  misho     264:                        } else {
                    265:                                ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
1.2.2.25! misho     266:                                if (!ptr)
1.2.2.17  misho     267:                                        ioSYSERR(0);
1.2.2.25! misho     268:                                else {
1.2.2.17  misho     269:                                        store->st_subscr.sub_topic.msg_base = ptr;
                    270:                                        store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    271:                                        memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    272:                                                        store->st_subscr.sub_topic.msg_len);
                    273:                                }
                    274: 
1.2.2.24  misho     275:                                call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 
                    276:                                                store->st_subscr.sub_topic.msg_base, 
                    277:                                                store->st_subscr.sub_topic.msg_len, sess->sess_addr);
1.2.2.17  misho     278: 
1.2.2.24  misho     279:                                subs[i].sub_ret = MQTT_QOS_PASS;
                    280:                        }
1.2.2.25! misho     281:                }
1.2.2.7   misho     282:        }
1.2.2.1   misho     283: 
1.2.2.7   misho     284:        /* send acknowledge */
                    285:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    286:        if (siz == -1) {
                    287:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    288:                goto end;
1.2.2.21  misho     289:        } else {
                    290:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     291:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    292:        }
1.2.2.21  misho     293: 
                    294:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7   misho     295: end:
                    296:        mqtt_subFree(&subs);
1.2.2.1   misho     297:        return 0;
                    298: }
                    299: 
                    300: int
1.2.2.9   misho     301: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     302: {
                    303:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     304:        mqtt_subscr_t *subs = NULL;
                    305:        int siz = 0;
                    306:        u_short mid = 0;
                    307:        register int i;
                    308:        struct tagStore *store, *tmp;
1.2.2.21  misho     309:        struct tagPkt *p = NULL;
1.2.2.1   misho     310: 
                    311:        ioTRACE(2);
                    312: 
                    313:        if (!sess)
                    314:                return -1;
                    315: 
1.2.2.13  misho     316:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    317:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    318:        if (siz == -1) {
                    319:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    320:                return 0;
                    321:        }
                    322: 
                    323:        /* del from db */
                    324:        for (i = 0; i < siz; i++) {
                    325:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    326:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    327:                                        store->st_subscr.sub_topic.msg_base && 
                    328:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    329:                                                subs[i].sub_topic.msg_base)) {
                    330:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     331: 
                    332:                                if (store->st_subscr.sub_topic.msg_base)
                    333:                                        free(store->st_subscr.sub_topic.msg_base);
                    334:                                if (store->st_subscr.sub_value.msg_base)
                    335:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18  misho     336:                                io_free(store);
1.2.2.13  misho     337:                        }
                    338:                }
                    339: 
1.2.2.15  misho     340:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    341:                                sess->sess_user, "%");
1.2.2.13  misho     342:        }
                    343: 
                    344:        /* send acknowledge */
                    345:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    346:        if (siz == -1) {
                    347:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    348:                goto end;
1.2.2.21  misho     349:        } else {
                    350:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13  misho     351:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    352:        }
1.2.2.21  misho     353: 
                    354:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13  misho     355: end:
                    356:        mqtt_subFree(&subs);
1.2.2.1   misho     357:        return 0;
                    358: }
                    359: 
                    360: int
1.2.2.9   misho     361: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     362: {
                    363:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     364:        int siz = 0;
1.2.2.19  misho     365:        struct tagPkt *p = NULL;
1.2.2.1   misho     366: 
                    367:        ioTRACE(2);
                    368: 
                    369:        if (!sess)
                    370:                return -1;
                    371: 
1.2.2.7   misho     372:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     373:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    374:        if (siz == -1) {
                    375:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    376:                return 0;
1.2.2.8   misho     377:        } else {
1.2.2.19  misho     378:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     379:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    380:        }
1.2.2.1   misho     381: 
1.2.2.19  misho     382:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     383:        return 0;
                    384: }
                    385: 
                    386: int
1.2.2.9   misho     387: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     388: {
                    389:        struct tagStore *store;
1.2.2.19  misho     390:        struct tagPkt *p;
1.2.2.1   misho     391:        struct tagSession *sess = (struct tagSession*) arg;
                    392: 
                    393:        ioTRACE(2);
                    394: 
                    395:        if (!sess)
                    396:                return -1;
                    397: 
1.2.2.6   misho     398:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     399:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    400: 
1.2.2.16  misho     401:        if (sess->sess_clean) {
                    402:                if (call.FiniSessPUB)
                    403:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    404:                if (call.DeletePUB_subscribe)
                    405:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    406:                if (call.WipePUB_topic)
                    407:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    408:        }
1.2.2.1   misho     409: 
1.2.2.6   misho     410:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    411:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     412: 
1.2.2.6   misho     413:                if (store->st_subscr.sub_topic.msg_base)
                    414:                        free(store->st_subscr.sub_topic.msg_base);
                    415:                if (store->st_subscr.sub_value.msg_base)
                    416:                        free(store->st_subscr.sub_value.msg_base);
                    417: 
1.2.2.18  misho     418:                io_free(store);
1.2.2.6   misho     419:        }
1.2.2.1   misho     420: 
1.2.2.19  misho     421:        while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
                    422:                SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
                    423: 
                    424:                io_freeVar(&p->pkt_data);
                    425:                io_free(p);
                    426:        }
                    427: 
1.2.2.1   misho     428:        if (sess->sess_will.msg)
                    429:                free(sess->sess_will.msg);
                    430:        if (sess->sess_will.topic)
                    431:                free(sess->sess_will.topic);
                    432: 
                    433:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    434:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     435: 
1.2.2.12  misho     436:        return -3;      /* reconnect client */
1.2.2.1   misho     437: }
                    438: 
                    439: int
1.2.2.9   misho     440: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     441: {
                    442:        struct tagSession *sess = (struct tagSession*) arg;
                    443: 
                    444:        ioTRACE(2);
                    445: 
                    446:        if (!sess)
                    447:                return -1;
                    448: 
1.2.2.5   misho     449:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     450: 
1.2.2.1   misho     451:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    452:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     453: 
1.2.2.10  misho     454:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     455: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>