--- mqtt/src/mqttd_calls.c 2012/05/08 14:36:10 1.2.2.16 +++ mqtt/src/mqttd_calls.c 2012/05/22 14:15:04 1.2.2.17 @@ -4,47 +4,121 @@ #include "mqttd_calls.h" +static int +pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic, + int topicLen, char * __restrict data, int datlen) +{ + return 0; +} + +static int +pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, + int topicLen, char * __restrict data, int datlen) +{ + return 0; +} + +static int +pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, + int topicLen, char * __restrict data, int datlen) +{ + return 0; +} + + int cmdPUBLISH(void *srv, int len, void *arg) { struct mqtthdr *hdr; struct tagSession *sess = (struct tagSession*) arg; + void *data = NULL; + char szTopic[STRSIZ] = { 0 }; + int siz = 0; + u_short mid = 0; ioTRACE(2); if (!sess) return -1; + ioDEBUG(5, "Exec PUBLISH session"); + siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, &data); + if (siz == -1) { + ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError()); + return 0; + } + hdr = (struct mqtthdr*) sess->sess_buf->msg_base; switch (hdr->mqtt_msg.qos) { - case MQTT_QOS_ONCE: - break; case MQTT_QOS_ACK: + pubAck(sess, mid, szTopic, sizeof szTopic, data, siz); + siz = mqtt_msgPUBACK(sess->sess_buf, mid); + if (siz == -1) { + ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", + mqtt_GetErrno(), mqtt_GetError()); + goto end; + } break; case MQTT_QOS_EXACTLY: + pubExactly(sess, mid, szTopic, sizeof szTopic, data, siz); + siz = mqtt_msgPUBREC(sess->sess_buf, mid); + if (siz == -1) { + ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", + mqtt_GetErrno(), mqtt_GetError()); + goto end; + } break; + case MQTT_QOS_ONCE: + pubOnce(sess, mid, szTopic, sizeof szTopic, data, siz); default: - ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request", - hdr->mqtt_msg.qos); - return 0; + goto end; } + if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) + ioSYSERR(0); + else { + ioDEBUG(5, "Sended %d bytes.", siz); + memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); + } +end: + if (data) + free(data); return 0; } int cmdPUBREL(void *srv, int len, void *arg) { - struct mqtthdr *hdr; struct tagSession *sess = (struct tagSession*) arg; + int siz = 0; + u_short mid = 0; ioTRACE(2); if (!sess) return -1; - hdr = (struct mqtthdr*) sess->sess_buf->msg_base; + ioDEBUG(5, "Exec PUBREL session"); + mid = mqtt_readPUBREL(sess->sess_buf); + if (mid == (u_short) -1) { + ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError()); + return 0; + } + // TODO:: Delete from database topic + + siz = mqtt_msgPUBCOMP(sess->sess_buf, mid); + if (siz == -1) { + ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); + return 0; + } + if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) + ioSYSERR(0); + else { + ioDEBUG(5, "Sended %d bytes.", siz); + memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); + } + return 0; } @@ -57,6 +131,8 @@ cmdSUBSCRIBE(void *srv, int len, void *arg) u_short mid = 0; register int i; struct tagStore *store; + char buf[BUFSIZ]; + void *ptr; ioTRACE(2); @@ -72,7 +148,12 @@ cmdSUBSCRIBE(void *srv, int len, void *arg) /* add to db */ for (i = 0; i < siz; i++) { - if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, subs[i].sub_topic.msg_base, + /* convert topic to sql search statement */ + if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) { + ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError()); + goto end; + } + if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) { store = malloc(sizeof(struct tagStore)); if (!store) { @@ -85,6 +166,27 @@ cmdSUBSCRIBE(void *srv, int len, void *arg) /* add to cache */ SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node); + + /* convert topic to regexp */ + if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) { + ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError()); + goto end; + } else { + ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1); + if (!ptr) { + ioSYSERR(0); + goto end; + } else { + store->st_subscr.sub_topic.msg_base = ptr; + store->st_subscr.sub_topic.msg_len = strlen(buf) + 1; + memcpy(store->st_subscr.sub_topic.msg_base, buf, + store->st_subscr.sub_topic.msg_len); + } + } + + call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, + store->st_subscr.sub_topic.msg_base, + store->st_subscr.sub_topic.msg_len, sess->sess_addr); subs[i].sub_ret = MQTT_QOS_PASS; } else