--- mqtt/src/daemon.c 2012/04/24 13:54:50 1.2.2.9 +++ mqtt/src/daemon.c 2012/04/25 12:04:30 1.2.2.11 @@ -6,6 +6,7 @@ static void *startSession(sched_task_t *task); +static pthread_attr_t attr; static inline struct tagSession * @@ -229,6 +230,7 @@ thrSession(struct tagSession *sess) /* dispatch message type */ if (mqtt_srv_Dispatch(sess->sess_srv, sess)) ioLIBERR(mqtt); + locKill ^= locKill; switch (hdr->mqtt_msg.type) { case MQTT_TYPE_CONNECT: ioDEBUG(5, "Exec CONNECT session"); @@ -238,12 +240,11 @@ thrSession(struct tagSession *sess) io_freeVar(v); } else ioLIBERR(mqtt); - locKill ^= locKill; continue; case MQTT_TYPE_DISCONNECT: ioDEBUG(5, "Exec DISCONNECT session"); finiSession(sess); - locKill ^= locKill; + locKill = 42; continue; case MQTT_TYPE_PUBLISH: ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos); @@ -261,6 +262,9 @@ thrSession(struct tagSession *sess) case MQTT_TYPE_PINGREQ: ioDEBUG(5, "Exec PINGREQ session"); break; + case MQTT_TYPE_PINGRESP: + ioDEBUG(5, "Exec PINGRESP session"); + break; default: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", sess->sess_cid, hdr->mqtt_msg.type); @@ -270,6 +274,7 @@ thrSession(struct tagSession *sess) pthread_cleanup_pop(locKill); pthread_exit(NULL); + return NULL; } static void * @@ -281,11 +286,12 @@ startSession(sched_task_t *task) mqtthdr_connack_t cack; struct tagSession *s, *sess = NULL; int ret; - pthread_attr_t attr; ioTRACE(4); assert(task); + printf("aaaaaaaaaaaaaaaaa\n"); + fflush(stdout); if (!TASK_DATA(task)) { /* flow from accept new clients */ @@ -323,6 +329,8 @@ startSession(sched_task_t *task) sess->sess_will.flag = flg.will_flg; } + printf("sql=%p\n", acc); + fflush(stdout); /* check online table for user */ if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) { ioDEBUG(0, "Login:: DENIED for username %s and password %s", @@ -333,6 +341,8 @@ startSession(sched_task_t *task) ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user); ret = MQTT_RETCODE_ACCEPTED; } + printf(".sql=%p\n", pub); + fflush(stdout); if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); TAILQ_FOREACH(s, &Sessions, sess_node) @@ -342,6 +352,8 @@ startSession(sched_task_t *task) break; } } + printf("...sql=%p\n", pub); + fflush(stdout); if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, sess->sess_will.qos, sess->sess_will.retain) == -1) { @@ -366,18 +378,14 @@ startSession(sched_task_t *task) } /* Start session thread OK ... */ - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - SESS_LOCK; TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); - pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess); + pthread_create(&sess->sess_tid, NULL, (void*(*)(void*)) thrSession, sess); + pthread_detach(sess->sess_tid); SESS_UNLOCK; call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, sess->sess_addr, sess->sess_user, sess->sess_ka); - - pthread_attr_destroy(&attr); return NULL; end: /* close client connection */ ret = mqtt_msgCONNACK(&msg, ret); @@ -430,7 +438,8 @@ acceptClient(sched_task_t *task) ioDEBUG(1, "Terminated client with socket=%d", cli); } end: - schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0); + if (!schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0)) + ioLIBERR(sched); return NULL; } @@ -456,8 +465,13 @@ Run(int sock) return -1; } + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + schedPolling(root, &pl, NULL); schedRun(root, &Kill); + + pthread_attr_destroy(&attr); /* free all undeleted elements into lists */ PUBS_LOCK;