--- mqtt/src/daemon.c 2012/04/25 12:04:30 1.2.2.11 +++ mqtt/src/daemon.c 2012/04/25 13:33:53 1.2.2.13 @@ -230,7 +230,6 @@ thrSession(struct tagSession *sess) /* dispatch message type */ if (mqtt_srv_Dispatch(sess->sess_srv, sess)) ioLIBERR(mqtt); - locKill ^= locKill; switch (hdr->mqtt_msg.type) { case MQTT_TYPE_CONNECT: ioDEBUG(5, "Exec CONNECT session"); @@ -240,11 +239,22 @@ thrSession(struct tagSession *sess) io_freeVar(v); } else ioLIBERR(mqtt); - continue; + + SESS_LOCK; + TAILQ_REMOVE(&Sessions, sess, sess_node); + SESS_UNLOCK; + + locKill ^= locKill; + break; case MQTT_TYPE_DISCONNECT: ioDEBUG(5, "Exec DISCONNECT session"); finiSession(sess); - locKill = 42; + + SESS_LOCK; + TAILQ_REMOVE(&Sessions, sess, sess_node); + SESS_UNLOCK; + + locKill ^= locKill; continue; case MQTT_TYPE_PUBLISH: ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos); @@ -290,8 +300,6 @@ startSession(sched_task_t *task) ioTRACE(4); assert(task); - printf("aaaaaaaaaaaaaaaaa\n"); - fflush(stdout); if (!TASK_DATA(task)) { /* flow from accept new clients */ @@ -329,8 +337,6 @@ startSession(sched_task_t *task) sess->sess_will.flag = flg.will_flg; } - printf("sql=%p\n", acc); - fflush(stdout); /* check online table for user */ if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) { ioDEBUG(0, "Login:: DENIED for username %s and password %s", @@ -341,8 +347,7 @@ startSession(sched_task_t *task) ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user); ret = MQTT_RETCODE_ACCEPTED; } - printf(".sql=%p\n", pub); - fflush(stdout); + if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); TAILQ_FOREACH(s, &Sessions, sess_node) @@ -352,8 +357,7 @@ startSession(sched_task_t *task) break; } } - printf("...sql=%p\n", pub); - fflush(stdout); + if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, sess->sess_will.qos, sess->sess_will.retain) == -1) { @@ -380,8 +384,7 @@ startSession(sched_task_t *task) /* Start session thread OK ... */ SESS_LOCK; TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); - pthread_create(&sess->sess_tid, NULL, (void*(*)(void*)) thrSession, sess); - pthread_detach(sess->sess_tid); + pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess); SESS_UNLOCK; call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,