--- mqtt/src/daemon.c 2012/05/28 08:08:21 1.2.2.34 +++ mqtt/src/daemon.c 2012/06/20 09:23:28 1.2.2.37 @@ -24,6 +24,7 @@ initSession(int sock, ait_val_t * __restrict v) memset(sess, 0, sizeof(struct tagSession)); SLIST_INIT(&sess->sess_subscr); + SLIST_INIT(&sess->sess_sndpkt); str = cfg_getAttribute(&cfg, "mqttd", "retry"); if (!str) @@ -64,6 +65,7 @@ static void finiSession(struct tagSession *sess) { struct tagStore *store; + struct tagPkt *p; ioTRACE(5); @@ -90,6 +92,13 @@ finiSession(struct tagSession *sess) io_free(store); } + while ((p = SLIST_FIRST(&sess->sess_sndpkt))) { + SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node); + + io_freeVar(&p->pkt_data); + io_free(p); + } + if (sess->sess_will.msg) free(sess->sess_will.msg); if (sess->sess_will.topic) @@ -252,7 +261,8 @@ startSession(sched_task_t *task) TAILQ_FOREACH(s, &Sessions, sess_node) if (!strcmp(s->sess_cid, sess->sess_cid)) { /* found stale session & disconnect it! */ - schedWrite(root, leaveClient, sess, TASK_FD(task), NULL, 0); + schedCancelby(root, taskMAX, CRITERIA_FD, (void*) s->sess_sock, NULL); + schedWrite(root, leaveClient, s, s->sess_sock, NULL, 0); break; } } @@ -274,7 +284,7 @@ startSession(sched_task_t *task) if (call.WipePUB_topic) call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); } else { - // todo: read_sql subs and prepare publish + // TODO: read_sql subs and prepare publish } /* Start session task OK ... */