--- mqtt/src/daemon.c 2012/06/29 15:43:13 1.2.2.42 +++ mqtt/src/daemon.c 2012/07/03 07:48:49 1.2.2.44 @@ -70,6 +70,8 @@ finiSession(struct tagSession *sess) if (!sess) return; + schedCancelby(root, taskTIMER, CRITERIA_CALL, sendRetain, NULL); + if (sess->sess_clean) { if (call.FiniSessPUB) call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); @@ -91,7 +93,7 @@ finiSession(struct tagSession *sess) } if (sess->sess_will.flag) - srv_Will(sess); + pubWill(sess); if (sess->sess_will.topic) free(sess->sess_will.topic); @@ -219,6 +221,7 @@ startSession(sched_task_t *task) struct tagSession *s, *sess = NULL; int ret, wlen; mqtt_subscr_t *subs; + struct timespec ts = { RETAIN_TIMEOUT, 0 }; ioTRACE(4); @@ -305,10 +308,15 @@ startSession(sched_task_t *task) if (call.WipePUB_topic) call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); } else if (call.ReadPUB_subscribe) { + /* load subscribes */ subs = call.ReadPUB_subscribe(&cfg, pub, sess->sess_cid, "%"); loadSubscribes(sess, subs); mqtt_subFree(&subs); } + + /* timer event for retain messages */ + if (call.ReadPUB_topic) + schedTimer(root, sendRetain, sess, ts, NULL, 0); /* Start session task OK ... */ if (!schedRead(root, dispatchSession, sess, TASK_FD(task), NULL, 0)) {