|
|
| version 1.2.2.30, 2012/06/29 13:46:37 | version 1.2.2.32, 2012/06/29 15:43:13 |
|---|---|
| Line 1 | Line 1 |
| #include "global.h" | #include "global.h" |
| #include "mqttd.h" | #include "mqttd.h" |
| #include "utils.h" | |
| #include "rtlm.h" | #include "rtlm.h" |
| #include "mqttd_calls.h" | #include "mqttd_calls.h" |
| Line 306 cmdSUBSCRIBE(void *srv, int len, void *arg) | Line 307 cmdSUBSCRIBE(void *srv, int len, void *arg) |
| /* add to db */ | /* add to db */ |
| for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) { | for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) { |
| /* convert topic to sql search statement */ | store = io_malloc(sizeof(struct tagStore)); |
| if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) { | if (!store) { |
| ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | ioSYSERR(0); |
| continue; | continue; |
| } else { | |
| store->st_msgid = mid; | |
| mqtt_subCopy(&store->st_subscr, &subs[i]); | |
| } | } |
| if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, | |
| sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) { | /* add to cache */ |
| store = io_malloc(sizeof(struct tagStore)); | SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node); |
| if (!store) { | |
| /* convert topic to regexp */ | |
| if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) { | |
| ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | |
| subs[i].sub_ret = MQTT_QOS_DENY; | |
| } else { | |
| ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1); | |
| if (!ptr) { | |
| ioSYSERR(0); | ioSYSERR(0); |
| continue; | continue; |
| } else { | } else { |
| store->st_msgid = mid; | store->st_subscr.sub_topic.msg_base = ptr; |
| mqtt_subCopy(&store->st_subscr, &subs[i]); | store->st_subscr.sub_topic.msg_len = strlen(buf) + 1; |
| memcpy(store->st_subscr.sub_topic.msg_base, buf, | |
| store->st_subscr.sub_topic.msg_len); | |
| } | } |
| /* add to cache */ | call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, |
| SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node); | store->st_subscr.sub_topic.msg_base, |
| store->st_subscr.sub_topic.msg_len, sess->sess_addr); | |
| /* convert topic to regexp */ | subs[i].sub_ret = MQTT_QOS_PASS; |
| if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) { | |
| ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | |
| subs[i].sub_ret = MQTT_QOS_DENY; | |
| } else { | |
| ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1); | |
| if (!ptr) | |
| ioSYSERR(0); | |
| else { | |
| store->st_subscr.sub_topic.msg_base = ptr; | |
| store->st_subscr.sub_topic.msg_len = strlen(buf) + 1; | |
| memcpy(store->st_subscr.sub_topic.msg_base, buf, | |
| store->st_subscr.sub_topic.msg_len); | |
| } | |
| call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, | |
| store->st_subscr.sub_topic.msg_base, | |
| store->st_subscr.sub_topic.msg_len, sess->sess_addr); | |
| subs[i].sub_ret = MQTT_QOS_PASS; | |
| } | |
| } | } |
| call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, | |
| sess->sess_user, sess->sess_addr, subs[i].sub_ret); | |
| } | } |
| /* send acknowledge */ | /* send acknowledge */ |
| Line 485 cmdCONNECT(void *srv, int len, void *arg) | Line 482 cmdCONNECT(void *srv, int len, void *arg) |
| io_free(store); | io_free(store); |
| } | } |
| if (sess->sess_will.flag) | |
| srv_Will(sess); | |
| if (sess->sess_will.msg) | if (sess->sess_will.msg) |
| free(sess->sess_will.msg); | free(sess->sess_will.msg); |