--- mqtt/src/mqttd_calls.c 2012/05/08 13:10:41 1.2.2.14 +++ mqtt/src/mqttd_calls.c 2012/05/08 14:36:10 1.2.2.16 @@ -72,7 +72,7 @@ cmdSUBSCRIBE(void *srv, int len, void *arg) /* add to db */ for (i = 0; i < siz; i++) { - if (call.WritePUB_subscribe(&cfg, pub, mid, subs[i].sub_topic.msg_base, + if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, subs[i].sub_topic.msg_base, sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) { store = malloc(sizeof(struct tagStore)); if (!store) { @@ -147,8 +147,8 @@ cmdUNSUBSCRIBE(void *srv, int len, void *arg) } } - call.DeletePUB_subscribe(&cfg, pub, subs[i].sub_topic.msg_base, - sess->sess_user, sess->sess_addr); + call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, + sess->sess_user, "%"); } /* send acknowledge */ @@ -210,8 +210,14 @@ cmdCONNECT(void *srv, int len, void *arg) ioDEBUG(5, "Exec CONNECT session"); TAILQ_REMOVE(&Sessions, sess, sess_node); - if (call.FiniSessPUB) - call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); + if (sess->sess_clean) { + if (call.FiniSessPUB) + call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); + if (call.DeletePUB_subscribe) + call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%"); + if (call.WipePUB_topic) + call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); + } while ((store = SLIST_FIRST(&sess->sess_subscr))) { SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);