--- mqtt/src/daemon.c 2012/05/05 12:15:25 1.2.2.23 +++ mqtt/src/daemon.c 2012/05/05 13:39:27 1.2.2.24 @@ -5,10 +5,6 @@ #include "mqttd_calls.h" -static void *startSession(sched_task_t *task); -static pthread_attr_t attr; - - static inline struct tagSession * initSession(int sock, ait_val_t * __restrict v) { @@ -66,7 +62,7 @@ initSession(int sock, ait_val_t * __restrict v) return sess; } -static void +void finiSession(struct tagSession *sess) { struct tagStore *store; @@ -138,8 +134,7 @@ leaveClient(sched_task_t *task) static void * dispatchSession(sched_task_t *task) { - int ret; - struct mqtthdr *hdr; + int ret, len = 0; struct tagSession *sess; ioTRACE(2); @@ -158,20 +153,23 @@ dispatchSession(sched_task_t *task) ioDEBUG(4, "Session %s EOF received.", sess->sess_cid); finiSession(sess); return NULL; - } else - hdr = (struct mqtthdr*) sess->sess_buf->msg_base; + } - /* dispatch message type */ - if (mqtt_srv_Dispatch(sess->sess_srv, sess)) - ioLIBERR(mqtt); -// schedEvent(root, startSession, NULL, (u_long) TASK_FD(task), sess, ret); + do { + /* dispatch message type */ + if ((len = mqtt_srv_Dispatch(sess->sess_srv, ret, sess)) == -1) { + ioLIBERR(mqtt); + ret = 0; + } else + ret -= len; + } while (len && ret > 0); if (!schedRead(root, dispatchSession, TASK_ARG(task), TASK_FD(task), NULL, 0)) ioLIBERR(sched); return NULL; } -static void * +void * startSession(sched_task_t *task) { u_char basebuf[USHRT_MAX]; @@ -344,13 +342,8 @@ Run(int sock) return -1; } - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - schedPolling(root, &pl, NULL); schedRun(root, &Kill); - - pthread_attr_destroy(&attr); /* free all undeleted elements into lists */ PUBS_LOCK;