--- mqtt/src/mqttd_calls.c 2012/07/02 12:52:19 1.2.2.33 +++ mqtt/src/mqttd_calls.c 2012/07/03 08:07:15 1.2.2.36 @@ -42,7 +42,7 @@ sendPacket(sched_task_t *task) return NULL; } - ioDEBUG(7, "Send packet length %d for socket %d\n", AIT_LEN(p), TASK_FD(task)); + ioDEBUG(7, "Send packet length %d for socket %d\n", AIT_LEN(p), (u_int) TASK_FD(task)); for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) { n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL); @@ -95,6 +95,43 @@ search4send(struct tagSession * __restrict sess, const /* --------------------------------------------------- */ +void * +sendRetain(sched_task_t *task) +{ + mqtt_subscr_t *subs, *s; + struct tagSession *sess; + int siz; + + ioTRACE(2); + + assert(task); + + sess = TASK_ARG(task); + assert(sess); + + if (!sess->sess_buf) { + ioDEBUG(9, "WARNING! No allocated buffer!?!\n"); + return NULL; + } + + subs = call.ReadPUB_topic(&cfg, pub, "%", "%", 1); + if (!subs) + return NULL; + + for (s = subs; s && s->sub_topic.msg_base; s++) { + siz = s->sub_value.msg_len; + memcpy(sess->sess_buf->msg_base, s->sub_value.msg_base, + MIN(sess->sess_buf->msg_len, s->sub_value.msg_len)); + ioDEBUG(7, "Sending retain message %d bytes, QoS %hhd topic '%s' data length %d\n", + siz, s->sub_ret, (char*) s->sub_topic.msg_base, s->sub_value.msg_len); + if (siz > 0) + search4send(sess, s->sub_topic.msg_base, siz, s->sub_ret); + } + + mqtt_subFree(&subs); + return NULL; +} + int pubWill(struct tagSession * __restrict sess) { @@ -428,6 +465,8 @@ cmdCONNECT(void *srv, int len, void *arg) ioDEBUG(5, "Exec CONNECT session"); TAILQ_REMOVE(&Sessions, sess, sess_node); + + schedCancelby(root, taskTIMER, CRITERIA_CALL, sendRetain, NULL); if (sess->sess_clean) { if (call.FiniSessPUB)