|
|
| version 1.2.2.21, 2012/06/01 13:13:40 | version 1.2.2.23, 2012/06/20 09:23:28 |
|---|---|
| Line 64 sendPacket(sched_task_t *task) | Line 64 sendPacket(sched_task_t *task) |
| return NULL; | return NULL; |
| } | } |
| /* --------------------------------------------------- */ | |
| static int | static int |
| pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic, | pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen) |
| int topicLen, char * __restrict data, int datlen) | |
| { | { |
| struct tagPkt *p = NULL; | |
| struct tagSession *s = NULL; | |
| struct tagStore *st = NULL; | |
| regex_t re; | |
| regmatch_t match; | |
| int ret; | |
| char szStr[STRSIZ]; | |
| TAILQ_FOREACH(s, &Sessions, sess_node) { | |
| SLIST_FOREACH(st, &s->sess_subscr, st_node) { | |
| if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) { | |
| regerror(ret, &re, szStr, sizeof szStr); | |
| regfree(&re); | |
| ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*) | |
| st->st_subscr.sub_topic.msg_base, szStr); | |
| } | |
| if (!regexec(&re, psTopic, 1, &match, 0)) { | |
| /* MATCH */ | |
| ioDEBUG(1, "+++ dlen=%d\n", datlen); | |
| p = mkPkt(sess->sess_buf->msg_base, datlen); | |
| schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0); | |
| } | |
| regfree(&re); | |
| } | |
| } | |
| return 0; | return 0; |
| } | } |
| static int | static int |
| pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, | pubAck(struct tagSession *sess, char * __restrict psTopic, int datlen) |
| int topicLen, char * __restrict data, int datlen) | |
| { | { |
| return 0; | return 0; |
| } | } |
| static int | static int |
| pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, | pubExactly(struct tagSession *sess, char * __restrict psTopic, int datlen) |
| int topicLen, char * __restrict data, int datlen) | |
| { | { |
| return 0; | return 0; |
| } | } |
| Line 92 cmdPUBLISH(void *srv, int len, void *arg) | Line 117 cmdPUBLISH(void *srv, int len, void *arg) |
| { | { |
| struct mqtthdr *hdr; | struct mqtthdr *hdr; |
| struct tagSession *sess = (struct tagSession*) arg; | struct tagSession *sess = (struct tagSession*) arg; |
| void *data = NULL; | |
| char szTopic[STRSIZ] = { 0 }; | char szTopic[STRSIZ] = { 0 }; |
| int siz = 0; | int siz = 0; |
| u_short mid = 0; | u_short mid = 0; |
| struct tagPkt *p = NULL; | |
| ioTRACE(2); | ioTRACE(2); |
| Line 103 cmdPUBLISH(void *srv, int len, void *arg) | Line 128 cmdPUBLISH(void *srv, int len, void *arg) |
| return -1; | return -1; |
| ioDEBUG(5, "Exec PUBLISH session"); | ioDEBUG(5, "Exec PUBLISH session"); |
| siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, &data); | siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL); |
| if (siz == -1) { | if (siz == -1) { |
| ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError()); |
| return 0; | return 0; |
| } | } |
| /* duplicate packet for retransmit to subscribers */ | |
| /* | |
| pubpkt = mqtt_msgDup(sess->sess_buf); | |
| if (!pubpkt) { | |
| ioDEBUG(5, "Error:: in duplicate packet #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | |
| return 0; | |
| } else | |
| */ | |
| hdr = (struct mqtthdr*) sess->sess_buf->msg_base; | hdr = (struct mqtthdr*) sess->sess_buf->msg_base; |
| switch (hdr->mqtt_msg.qos) { | switch (hdr->mqtt_msg.qos) { |
| case MQTT_QOS_ACK: | case MQTT_QOS_ACK: |
| pubAck(sess, mid, szTopic, sizeof szTopic, data, siz); | pubAck(sess, szTopic, mqtt_pktLen(hdr)); |
| siz = mqtt_msgPUBACK(sess->sess_buf, mid); | siz = mqtt_msgPUBACK(sess->sess_buf, mid); |
| if (siz == -1) { | if (siz == -1) { |
| ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", | ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", |
| mqtt_GetErrno(), mqtt_GetError()); | mqtt_GetErrno(), mqtt_GetError()); |
| goto end; | return 0; |
| } | } |
| break; | break; |
| case MQTT_QOS_EXACTLY: | case MQTT_QOS_EXACTLY: |
| pubExactly(sess, mid, szTopic, sizeof szTopic, data, siz); | pubExactly(sess, szTopic, mqtt_pktLen(hdr)); |
| siz = mqtt_msgPUBREC(sess->sess_buf, mid); | siz = mqtt_msgPUBREC(sess->sess_buf, mid); |
| if (siz == -1) { | if (siz == -1) { |
| ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", | ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", |
| mqtt_GetErrno(), mqtt_GetError()); | mqtt_GetErrno(), mqtt_GetError()); |
| goto end; | return 0; |
| } | } |
| break; | break; |
| case MQTT_QOS_ONCE: | case MQTT_QOS_ONCE: |
| pubOnce(sess, mid, szTopic, sizeof szTopic, data, siz); | pubOnce(sess, szTopic, mqtt_pktLen(hdr)); |
| default: | default: |
| goto end; | return 0; |
| } | } |
| if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) | p = mkPkt(sess->sess_buf->msg_base, siz); |
| ioSYSERR(0); | memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); |
| else { | schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0); |
| ioDEBUG(5, "Sended %d bytes.", siz); | |
| memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); | |
| } | |
| end: | |
| if (data) | |
| free(data); | |
| return 0; | return 0; |
| } | } |
| Line 153 cmdPUBREL(void *srv, int len, void *arg) | Line 181 cmdPUBREL(void *srv, int len, void *arg) |
| struct tagSession *sess = (struct tagSession*) arg; | struct tagSession *sess = (struct tagSession*) arg; |
| int siz = 0; | int siz = 0; |
| u_short mid = 0; | u_short mid = 0; |
| struct tagPkt *p = NULL; | |
| ioTRACE(2); | ioTRACE(2); |
| Line 172 cmdPUBREL(void *srv, int len, void *arg) | Line 201 cmdPUBREL(void *srv, int len, void *arg) |
| if (siz == -1) { | if (siz == -1) { |
| ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); |
| return 0; | return 0; |
| } | } else { |
| if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) | p = mkPkt(sess->sess_buf->msg_base, siz); |
| ioSYSERR(0); | |
| else { | |
| ioDEBUG(5, "Sended %d bytes.", siz); | |
| memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); | memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); |
| } | } |
| schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0); | |
| return 0; | return 0; |
| } | } |