--- mqtt/src/daemon.c 2012/04/24 13:54:50 1.2.2.9 +++ mqtt/src/daemon.c 2012/04/25 13:33:53 1.2.2.13 @@ -6,6 +6,7 @@ static void *startSession(sched_task_t *task); +static pthread_attr_t attr; static inline struct tagSession * @@ -238,11 +239,21 @@ thrSession(struct tagSession *sess) io_freeVar(v); } else ioLIBERR(mqtt); + + SESS_LOCK; + TAILQ_REMOVE(&Sessions, sess, sess_node); + SESS_UNLOCK; + locKill ^= locKill; - continue; + break; case MQTT_TYPE_DISCONNECT: ioDEBUG(5, "Exec DISCONNECT session"); finiSession(sess); + + SESS_LOCK; + TAILQ_REMOVE(&Sessions, sess, sess_node); + SESS_UNLOCK; + locKill ^= locKill; continue; case MQTT_TYPE_PUBLISH: @@ -261,6 +272,9 @@ thrSession(struct tagSession *sess) case MQTT_TYPE_PINGREQ: ioDEBUG(5, "Exec PINGREQ session"); break; + case MQTT_TYPE_PINGRESP: + ioDEBUG(5, "Exec PINGRESP session"); + break; default: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", sess->sess_cid, hdr->mqtt_msg.type); @@ -270,6 +284,7 @@ thrSession(struct tagSession *sess) pthread_cleanup_pop(locKill); pthread_exit(NULL); + return NULL; } static void * @@ -281,7 +296,6 @@ startSession(sched_task_t *task) mqtthdr_connack_t cack; struct tagSession *s, *sess = NULL; int ret; - pthread_attr_t attr; ioTRACE(4); @@ -333,6 +347,7 @@ startSession(sched_task_t *task) ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user); ret = MQTT_RETCODE_ACCEPTED; } + if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); TAILQ_FOREACH(s, &Sessions, sess_node) @@ -342,6 +357,7 @@ startSession(sched_task_t *task) break; } } + if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, sess->sess_will.qos, sess->sess_will.retain) == -1) { @@ -366,9 +382,6 @@ startSession(sched_task_t *task) } /* Start session thread OK ... */ - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - SESS_LOCK; TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess); @@ -376,8 +389,6 @@ startSession(sched_task_t *task) call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, sess->sess_addr, sess->sess_user, sess->sess_ka); - - pthread_attr_destroy(&attr); return NULL; end: /* close client connection */ ret = mqtt_msgCONNACK(&msg, ret); @@ -430,7 +441,8 @@ acceptClient(sched_task_t *task) ioDEBUG(1, "Terminated client with socket=%d", cli); } end: - schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0); + if (!schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0)) + ioLIBERR(sched); return NULL; } @@ -456,8 +468,13 @@ Run(int sock) return -1; } + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + schedPolling(root, &pl, NULL); schedRun(root, &Kill); + + pthread_attr_destroy(&attr); /* free all undeleted elements into lists */ PUBS_LOCK;