--- mqtt/src/daemon.c 2012/06/01 10:39:48 1.2.2.35 +++ mqtt/src/daemon.c 2012/06/26 08:05:58 1.2.2.38 @@ -24,7 +24,6 @@ initSession(int sock, ait_val_t * __restrict v) memset(sess, 0, sizeof(struct tagSession)); SLIST_INIT(&sess->sess_subscr); - SLIST_INIT(&sess->sess_sndpkt); str = cfg_getAttribute(&cfg, "mqttd", "retry"); if (!str) @@ -65,7 +64,6 @@ static void finiSession(struct tagSession *sess) { struct tagStore *store; - struct tagPkt *p; ioTRACE(5); @@ -92,13 +90,6 @@ finiSession(struct tagSession *sess) io_free(store); } - while ((p = SLIST_FIRST(&sess->sess_sndpkt))) { - SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node); - - io_freeVar(&p->pkt_data); - io_free(p); - } - if (sess->sess_will.msg) free(sess->sess_will.msg); if (sess->sess_will.topic) @@ -164,6 +155,7 @@ dispatchSession(sched_task_t *task) finiSession(sess); return NULL; } + ioDEBUG(0, "++++++ret=%d\n", ret); do { /* dispatch message type */ @@ -261,7 +253,8 @@ startSession(sched_task_t *task) TAILQ_FOREACH(s, &Sessions, sess_node) if (!strcmp(s->sess_cid, sess->sess_cid)) { /* found stale session & disconnect it! */ - schedWrite(root, leaveClient, sess, TASK_FD(task), NULL, 0); + schedCancelby(root, taskMAX, CRITERIA_FD, (void*) s->sess_sock, NULL); + schedWrite(root, leaveClient, s, s->sess_sock, NULL, 0); break; } } @@ -283,7 +276,7 @@ startSession(sched_task_t *task) if (call.WipePUB_topic) call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); } else { - // todo: read_sql subs and prepare publish + // TODO: read_sql subs and prepare publish } /* Start session task OK ... */