version 1.2.2.25, 2012/05/05 14:51:02
|
version 1.2.2.26, 2012/05/05 15:04:29
|
Line 140 dispatchSession(sched_task_t *task)
|
Line 140 dispatchSession(sched_task_t *task)
|
/* receive & decode packet */ |
/* receive & decode packet */ |
if ((ret = recv(TASK_FD(task), sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) { |
if ((ret = recv(TASK_FD(task), sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) { |
ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno)); |
ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno)); |
|
TAILQ_REMOVE(&Sessions, sess, sess_node); |
finiSession(sess); |
finiSession(sess); |
return NULL; |
return NULL; |
} else if (!ret) { |
} else if (!ret) { |
ioDEBUG(4, "Session %s EOF received.", sess->sess_cid); |
ioDEBUG(4, "Session %s EOF received.", sess->sess_cid); |
|
TAILQ_REMOVE(&Sessions, sess, sess_node); |
finiSession(sess); |
finiSession(sess); |
return NULL; |
return NULL; |
} |
} |
Line 153 dispatchSession(sched_task_t *task)
|
Line 155 dispatchSession(sched_task_t *task)
|
if ((len = mqtt_srv_Dispatch(sess->sess_srv, ret, sess)) < 0) { |
if ((len = mqtt_srv_Dispatch(sess->sess_srv, ret, sess)) < 0) { |
if (len == -1) |
if (len == -1) |
ioLIBERR(mqtt); |
ioLIBERR(mqtt); |
|
TAILQ_REMOVE(&Sessions, sess, sess_node); |
finiSession(sess); |
finiSession(sess); |
} else |
} else |
ret -= len; |
ret -= len; |
Line 160 dispatchSession(sched_task_t *task)
|
Line 163 dispatchSession(sched_task_t *task)
|
|
|
if (len >= 0 && !schedRead(root, dispatchSession, TASK_ARG(task), TASK_FD(task), NULL, 0)) { |
if (len >= 0 && !schedRead(root, dispatchSession, TASK_ARG(task), TASK_FD(task), NULL, 0)) { |
ioLIBERR(sched); |
ioLIBERR(sched); |
|
TAILQ_REMOVE(&Sessions, sess, sess_node); |
finiSession(sess); |
finiSession(sess); |
} |
} |
return NULL; |
return NULL; |
Line 251 startSession(sched_task_t *task)
|
Line 255 startSession(sched_task_t *task)
|
} |
} |
|
|
/* Start session task OK ... */ |
/* Start session task OK ... */ |
TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); |
|
|
|
if (!schedRead(root, dispatchSession, sess, TASK_FD(task), NULL, 0)) { |
if (!schedRead(root, dispatchSession, sess, TASK_FD(task), NULL, 0)) { |
ioLIBERR(sched); |
ioLIBERR(sched); |
ret = MQTT_RETCODE_DENIED; |
ret = MQTT_RETCODE_DENIED; |
} | } else |
| TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); |
|
|
call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, |
call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, |
sess->sess_addr, sess->sess_user, sess->sess_ka); |
sess->sess_addr, sess->sess_user, sess->sess_ka); |
Line 352 Run(int sock)
|
Line 355 Run(int sock)
|
} |
} |
TAILQ_FOREACH(sess, &Sessions, sess_node) { |
TAILQ_FOREACH(sess, &Sessions, sess_node) { |
TAILQ_REMOVE(&Sessions, sess, sess_node); |
TAILQ_REMOVE(&Sessions, sess, sess_node); |
|
|
finiSession(sess); |
finiSession(sess); |
} |
} |
return 0; |
return 0; |