--- mqtt/src/daemon.c 2012/04/25 07:37:16 1.2.2.10 +++ mqtt/src/daemon.c 2012/04/25 13:33:53 1.2.2.13 @@ -239,11 +239,21 @@ thrSession(struct tagSession *sess) io_freeVar(v); } else ioLIBERR(mqtt); + + SESS_LOCK; + TAILQ_REMOVE(&Sessions, sess, sess_node); + SESS_UNLOCK; + locKill ^= locKill; - continue; + break; case MQTT_TYPE_DISCONNECT: ioDEBUG(5, "Exec DISCONNECT session"); finiSession(sess); + + SESS_LOCK; + TAILQ_REMOVE(&Sessions, sess, sess_node); + SESS_UNLOCK; + locKill ^= locKill; continue; case MQTT_TYPE_PUBLISH: @@ -262,6 +272,9 @@ thrSession(struct tagSession *sess) case MQTT_TYPE_PINGREQ: ioDEBUG(5, "Exec PINGREQ session"); break; + case MQTT_TYPE_PINGRESP: + ioDEBUG(5, "Exec PINGRESP session"); + break; default: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", sess->sess_cid, hdr->mqtt_msg.type); @@ -271,6 +284,7 @@ thrSession(struct tagSession *sess) pthread_cleanup_pop(locKill); pthread_exit(NULL); + return NULL; } static void * @@ -333,6 +347,7 @@ startSession(sched_task_t *task) ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user); ret = MQTT_RETCODE_ACCEPTED; } + if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); TAILQ_FOREACH(s, &Sessions, sess_node) @@ -342,6 +357,7 @@ startSession(sched_task_t *task) break; } } + if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, sess->sess_will.qos, sess->sess_will.retain) == -1) { @@ -425,7 +441,8 @@ acceptClient(sched_task_t *task) ioDEBUG(1, "Terminated client with socket=%d", cli); } end: - schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0); + if (!schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0)) + ioLIBERR(sched); return NULL; }