Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.17

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
1.2.2.17! misho       7: static int
        !             8: pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic, 
        !             9:                int topicLen, char * __restrict data, int datlen)
        !            10: {
        !            11:        return 0;
        !            12: }
        !            13: 
        !            14: static int
        !            15: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, 
        !            16:                int topicLen, char * __restrict data, int datlen)
        !            17: {
        !            18:        return 0;
        !            19: }
        !            20: 
        !            21: static int
        !            22: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, 
        !            23:                int topicLen, char * __restrict data, int datlen)
        !            24: {
        !            25:        return 0;
        !            26: }
        !            27: 
        !            28: 
1.2       misho      29: int
1.2.2.9   misho      30: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho      31: {
                     32:        struct mqtthdr *hdr;
1.2.2.1   misho      33:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17! misho      34:        void *data = NULL;
        !            35:        char szTopic[STRSIZ] = { 0 };
        !            36:        int siz = 0;
        !            37:        u_short mid = 0;
1.2       misho      38: 
                     39:        ioTRACE(2);
                     40: 
                     41:        if (!sess)
                     42:                return -1;
                     43: 
1.2.2.17! misho      44:        ioDEBUG(5, "Exec PUBLISH session");
        !            45:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, &data);
        !            46:        if (siz == -1) {
        !            47:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
        !            48:                return 0;
        !            49:        }
        !            50: 
1.2       misho      51:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                     52:        switch (hdr->mqtt_msg.qos) {
                     53:                case MQTT_QOS_ACK:
1.2.2.17! misho      54:                        pubAck(sess, mid, szTopic, sizeof szTopic, data, siz);
        !            55:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
        !            56:                        if (siz == -1) {
        !            57:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
        !            58:                                                mqtt_GetErrno(), mqtt_GetError());
        !            59:                                goto end;
        !            60:                        }
1.2       misho      61:                        break;
                     62:                case MQTT_QOS_EXACTLY:
1.2.2.17! misho      63:                        pubExactly(sess, mid, szTopic, sizeof szTopic, data, siz);
        !            64:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
        !            65:                        if (siz == -1) {
        !            66:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
        !            67:                                                mqtt_GetErrno(), mqtt_GetError());
        !            68:                                goto end;
        !            69:                        }
1.2       misho      70:                        break;
1.2.2.17! misho      71:                case MQTT_QOS_ONCE:
        !            72:                        pubOnce(sess, mid, szTopic, sizeof szTopic, data, siz);
1.2       misho      73:                default:
1.2.2.17! misho      74:                        goto end;
1.2       misho      75:        }
                     76: 
1.2.2.17! misho      77:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
        !            78:                ioSYSERR(0);
        !            79:        else {
        !            80:                ioDEBUG(5, "Sended %d bytes.", siz);
        !            81:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
        !            82:        }
        !            83: end:
        !            84:        if (data)
        !            85:                free(data);
1.2       misho      86:        return 0;
                     87: }
1.2.2.1   misho      88: 
                     89: int
1.2.2.9   misho      90: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho      91: {
                     92:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17! misho      93:        int siz = 0;
        !            94:        u_short mid = 0;
1.2.2.1   misho      95: 
                     96:        ioTRACE(2);
                     97: 
                     98:        if (!sess)
                     99:                return -1;
                    100: 
1.2.2.17! misho     101:        ioDEBUG(5, "Exec PUBREL session");
        !           102:        mid = mqtt_readPUBREL(sess->sess_buf);
        !           103:        if (mid == (u_short) -1) {
        !           104:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
        !           105:                return 0;
        !           106:        }
        !           107: 
        !           108:        // TODO:: Delete from database topic
        !           109: 
        !           110:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
        !           111:        if (siz == -1) {
        !           112:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
        !           113:                return 0;
        !           114:        }
        !           115:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
        !           116:                ioSYSERR(0);
        !           117:        else {
        !           118:                ioDEBUG(5, "Sended %d bytes.", siz);
        !           119:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
        !           120:        }
1.2.2.1   misho     121: 
                    122:        return 0;
                    123: }
                    124: 
                    125: int
1.2.2.9   misho     126: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     127: {
                    128:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     129:        mqtt_subscr_t *subs = NULL;
                    130:        int siz = 0;
                    131:        u_short mid = 0;
                    132:        register int i;
                    133:        struct tagStore *store;
1.2.2.17! misho     134:        char buf[BUFSIZ];
        !           135:        void *ptr;
1.2.2.1   misho     136: 
                    137:        ioTRACE(2);
                    138: 
                    139:        if (!sess)
                    140:                return -1;
                    141: 
1.2.2.7   misho     142:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    143:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    144:        if (siz == -1) {
                    145:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    146:                return 0;
                    147:        }
                    148: 
                    149:        /* add to db */
                    150:        for (i = 0; i < siz; i++) {
1.2.2.17! misho     151:                /* convert topic to sql search statement */
        !           152:                if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
        !           153:                        ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
        !           154:                        goto end;
        !           155:                }
        !           156:                if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
1.2.2.8   misho     157:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.7   misho     158:                        store = malloc(sizeof(struct tagStore));
                    159:                        if (!store) {
                    160:                                ioSYSERR(0);
                    161:                                goto end;
                    162:                        } else {
                    163:                                store->st_msgid = mid;
1.2.2.8   misho     164:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho     165:                        }
                    166: 
                    167:                        /* add to cache */
                    168:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    169: 
1.2.2.17! misho     170:                        /* convert topic to regexp */
        !           171:                        if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
        !           172:                                ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
        !           173:                                goto end;
        !           174:                        } else {
        !           175:                                ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
        !           176:                                if (!ptr) {
        !           177:                                        ioSYSERR(0);
        !           178:                                        goto end;
        !           179:                                } else {
        !           180:                                        store->st_subscr.sub_topic.msg_base = ptr;
        !           181:                                        store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
        !           182:                                        memcpy(store->st_subscr.sub_topic.msg_base, buf, 
        !           183:                                                        store->st_subscr.sub_topic.msg_len);
        !           184:                                }
        !           185:                        }
        !           186: 
        !           187:                        call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 
        !           188:                                        store->st_subscr.sub_topic.msg_base, 
        !           189:                                        store->st_subscr.sub_topic.msg_len, sess->sess_addr);
        !           190: 
1.2.2.7   misho     191:                        subs[i].sub_ret = MQTT_QOS_PASS;
                    192:                } else
                    193:                        subs[i].sub_ret = MQTT_QOS_DENY;
                    194:        }
1.2.2.1   misho     195: 
1.2.2.7   misho     196:        /* send acknowledge */
                    197:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    198:        if (siz == -1) {
                    199:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    200:                goto end;
                    201:        }
1.2.2.13  misho     202:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
1.2.2.7   misho     203:                ioSYSERR(0);
1.2.2.8   misho     204:        else {
1.2.2.7   misho     205:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8   misho     206:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    207:        }
1.2.2.7   misho     208: end:
                    209:        mqtt_subFree(&subs);
1.2.2.1   misho     210:        return 0;
                    211: }
                    212: 
                    213: int
1.2.2.9   misho     214: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     215: {
                    216:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     217:        mqtt_subscr_t *subs = NULL;
                    218:        int siz = 0;
                    219:        u_short mid = 0;
                    220:        register int i;
                    221:        struct tagStore *store, *tmp;
1.2.2.1   misho     222: 
                    223:        ioTRACE(2);
                    224: 
                    225:        if (!sess)
                    226:                return -1;
                    227: 
1.2.2.13  misho     228:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    229:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    230:        if (siz == -1) {
                    231:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    232:                return 0;
                    233:        }
                    234: 
                    235:        /* del from db */
                    236:        for (i = 0; i < siz; i++) {
                    237:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    238:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    239:                                        store->st_subscr.sub_topic.msg_base && 
                    240:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    241:                                                subs[i].sub_topic.msg_base)) {
                    242:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     243: 
                    244:                                if (store->st_subscr.sub_topic.msg_base)
                    245:                                        free(store->st_subscr.sub_topic.msg_base);
                    246:                                if (store->st_subscr.sub_value.msg_base)
                    247:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.13  misho     248:                                free(store);
                    249:                        }
                    250:                }
                    251: 
1.2.2.15  misho     252:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    253:                                sess->sess_user, "%");
1.2.2.13  misho     254:        }
                    255: 
                    256:        /* send acknowledge */
                    257:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    258:        if (siz == -1) {
                    259:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    260:                goto end;
                    261:        }
                    262:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
                    263:                ioSYSERR(0);
                    264:        else {
                    265:                ioDEBUG(5, "Sended %d bytes.", siz);
                    266:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    267:        }
                    268: end:
                    269:        mqtt_subFree(&subs);
1.2.2.1   misho     270:        return 0;
                    271: }
                    272: 
                    273: int
1.2.2.9   misho     274: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     275: {
                    276:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     277:        int siz = 0;
1.2.2.1   misho     278: 
                    279:        ioTRACE(2);
                    280: 
                    281:        if (!sess)
                    282:                return -1;
                    283: 
1.2.2.7   misho     284:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     285:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    286:        if (siz == -1) {
                    287:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    288:                return 0;
                    289:        }
1.2.2.13  misho     290:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) {
1.2.2.2   misho     291:                ioSYSERR(0);
                    292:                return 0;
1.2.2.8   misho     293:        } else {
1.2.2.2   misho     294:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8   misho     295:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    296:        }
1.2.2.1   misho     297: 
                    298:        return 0;
                    299: }
                    300: 
                    301: int
1.2.2.9   misho     302: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     303: {
                    304:        struct tagStore *store;
                    305:        struct tagSession *sess = (struct tagSession*) arg;
                    306: 
                    307:        ioTRACE(2);
                    308: 
                    309:        if (!sess)
                    310:                return -1;
                    311: 
1.2.2.6   misho     312:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     313:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    314: 
1.2.2.16  misho     315:        if (sess->sess_clean) {
                    316:                if (call.FiniSessPUB)
                    317:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    318:                if (call.DeletePUB_subscribe)
                    319:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    320:                if (call.WipePUB_topic)
                    321:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    322:        }
1.2.2.1   misho     323: 
1.2.2.6   misho     324:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    325:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     326: 
1.2.2.6   misho     327:                if (store->st_subscr.sub_topic.msg_base)
                    328:                        free(store->st_subscr.sub_topic.msg_base);
                    329:                if (store->st_subscr.sub_value.msg_base)
                    330:                        free(store->st_subscr.sub_value.msg_base);
                    331: 
                    332:                free(store);
                    333:        }
1.2.2.1   misho     334: 
                    335:        if (sess->sess_will.msg)
                    336:                free(sess->sess_will.msg);
                    337:        if (sess->sess_will.topic)
                    338:                free(sess->sess_will.topic);
                    339: 
                    340:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    341:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     342: 
1.2.2.12  misho     343:        return -3;      /* reconnect client */
1.2.2.1   misho     344: }
                    345: 
                    346: int
1.2.2.9   misho     347: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     348: {
                    349:        struct tagSession *sess = (struct tagSession*) arg;
                    350: 
                    351:        ioTRACE(2);
                    352: 
                    353:        if (!sess)
                    354:                return -1;
                    355: 
1.2.2.5   misho     356:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     357: 
1.2.2.1   misho     358:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    359:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     360: 
1.2.2.10  misho     361:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     362: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>