--- mqtt/src/daemon.c 2012/04/25 07:37:16 1.2.2.10 +++ mqtt/src/daemon.c 2012/04/25 12:04:30 1.2.2.11 @@ -230,6 +230,7 @@ thrSession(struct tagSession *sess) /* dispatch message type */ if (mqtt_srv_Dispatch(sess->sess_srv, sess)) ioLIBERR(mqtt); + locKill ^= locKill; switch (hdr->mqtt_msg.type) { case MQTT_TYPE_CONNECT: ioDEBUG(5, "Exec CONNECT session"); @@ -239,12 +240,11 @@ thrSession(struct tagSession *sess) io_freeVar(v); } else ioLIBERR(mqtt); - locKill ^= locKill; continue; case MQTT_TYPE_DISCONNECT: ioDEBUG(5, "Exec DISCONNECT session"); finiSession(sess); - locKill ^= locKill; + locKill = 42; continue; case MQTT_TYPE_PUBLISH: ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos); @@ -262,6 +262,9 @@ thrSession(struct tagSession *sess) case MQTT_TYPE_PINGREQ: ioDEBUG(5, "Exec PINGREQ session"); break; + case MQTT_TYPE_PINGRESP: + ioDEBUG(5, "Exec PINGRESP session"); + break; default: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", sess->sess_cid, hdr->mqtt_msg.type); @@ -271,6 +274,7 @@ thrSession(struct tagSession *sess) pthread_cleanup_pop(locKill); pthread_exit(NULL); + return NULL; } static void * @@ -286,6 +290,8 @@ startSession(sched_task_t *task) ioTRACE(4); assert(task); + printf("aaaaaaaaaaaaaaaaa\n"); + fflush(stdout); if (!TASK_DATA(task)) { /* flow from accept new clients */ @@ -323,6 +329,8 @@ startSession(sched_task_t *task) sess->sess_will.flag = flg.will_flg; } + printf("sql=%p\n", acc); + fflush(stdout); /* check online table for user */ if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) { ioDEBUG(0, "Login:: DENIED for username %s and password %s", @@ -333,6 +341,8 @@ startSession(sched_task_t *task) ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user); ret = MQTT_RETCODE_ACCEPTED; } + printf(".sql=%p\n", pub); + fflush(stdout); if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); TAILQ_FOREACH(s, &Sessions, sess_node) @@ -342,6 +352,8 @@ startSession(sched_task_t *task) break; } } + printf("...sql=%p\n", pub); + fflush(stdout); if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, sess->sess_will.qos, sess->sess_will.retain) == -1) { @@ -368,7 +380,8 @@ startSession(sched_task_t *task) /* Start session thread OK ... */ SESS_LOCK; TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); - pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess); + pthread_create(&sess->sess_tid, NULL, (void*(*)(void*)) thrSession, sess); + pthread_detach(sess->sess_tid); SESS_UNLOCK; call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, @@ -425,7 +438,8 @@ acceptClient(sched_task_t *task) ioDEBUG(1, "Terminated client with socket=%d", cli); } end: - schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0); + if (!schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0)) + ioLIBERR(sched); return NULL; }