--- mqtt/src/daemon.c 2011/12/09 13:43:55 1.1.2.14 +++ mqtt/src/daemon.c 2011/12/09 14:01:37 1.1.2.15 @@ -11,13 +11,14 @@ initSession(int sock, char * __restrict addr) struct tagSession *sess = NULL; const char *str; + FTRACE(5); + if (!addr) return NULL; sess = malloc(sizeof(struct tagSession)); if (!sess) { VERB(3) syslog(LOG_ERR, "Error:: in malloc #%d - %s", errno, strerror(errno)); - free(addr); return NULL; } else memset(sess, 0, sizeof(struct tagSession)); @@ -28,10 +29,6 @@ initSession(int sock, char * __restrict addr) else sess->sess_retry = strtol(str, NULL, 0); - sess->sess_sock = sock; - strlcpy(sess->sess_addr, addr, sizeof sess->sess_addr); - free(addr); - sess->sess_buf = mqtt_msgAlloc(0); if (!sess->sess_buf) { VERB(3) syslog(LOG_ERR, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError()); @@ -39,6 +36,9 @@ initSession(int sock, char * __restrict addr) return NULL; } + sess->sess_sock = sock; + strlcpy(sess->sess_addr, addr, sizeof sess->sess_addr); + free(addr); return sess; } @@ -48,6 +48,8 @@ finiSession(struct tagSession *sess) register int i; struct tagStore *store; + FTRACE(5); + if (!sess) return; @@ -75,22 +77,33 @@ finiSession(struct tagSession *sess) } static void * +thrSession(struct tagSession *sess) +{ + FTRACE(2); + + TAILQ_REMOVE(&Sessions, sess, sess_node); + pthread_exit(NULL); + return NULL; +} + +static void * startSession(sched_task_t *task) { u_char basebuf[USHRT_MAX]; - mqtt_cb_t cbs[MQTT_TYPE_MAX + 1] = { 0 }; mqtt_msg_t buf = { basebuf, sizeof basebuf }; mqtthdr_connflgs_t flg; mqtthdr_connack_t cack; - int ret = 0; - struct timeval tv = { 0 }; struct tagSession *sess = NULL; FTRACE(4); sess = initSession(TASK_FD(task), TASK_ARG(task)); - if (!sess) + if (!sess) { + if (TASK_ARG(task)) + free(TASK_ARG(task)); + close(TASK_FD(task)); return NULL; + } if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) { VERB(3) syslog(LOG_ERR, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno)); @@ -112,14 +125,17 @@ startSession(sched_task_t *task) /* check online table for user */ + /* Start session thread OK ... */ + TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); + pthread_create(&sess->sess_tid, NULL, (void*(*)(void*)) thrSession, sess); + pthread_detach(sess->sess_tid); + VERB(1) syslog(LOG_DEBUG, "ConnID=%s(%s) for %s login OK!", + sess->sess_cid, sess->sess_user, sess->sess_addr); + return NULL; end: /* close client connection */ - if (sess) - free(sess); - close(TASK_FD(task)); VERB(1) syslog(LOG_DEBUG, "Close client %s with socket=%d", (char*) TASK_ARG(task), (int) TASK_FD(task)); - if (TASK_ARG(task)) - free(TASK_ARG(task)); + finiSession(sess); return NULL; }