version 1.2.2.25, 2012/05/05 14:51:02
|
version 1.2.2.28, 2012/05/08 11:45:57
|
Line 39 initSession(int sock, ait_val_t * __restrict v)
|
Line 39 initSession(int sock, ait_val_t * __restrict v)
|
} |
} |
|
|
/* init server actor */ |
/* init server actor */ |
sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf); | sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf, sess->sess_ka); |
if (!sess->sess_srv) { |
if (!sess->sess_srv) { |
ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError()); |
ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError()); |
mqtt_msgFree(&sess->sess_buf, 42); |
mqtt_msgFree(&sess->sess_buf, 42); |
Line 140 dispatchSession(sched_task_t *task)
|
Line 140 dispatchSession(sched_task_t *task)
|
/* receive & decode packet */ |
/* receive & decode packet */ |
if ((ret = recv(TASK_FD(task), sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) { |
if ((ret = recv(TASK_FD(task), sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) { |
ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno)); |
ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno)); |
|
TAILQ_REMOVE(&Sessions, sess, sess_node); |
finiSession(sess); |
finiSession(sess); |
return NULL; |
return NULL; |
} else if (!ret) { |
} else if (!ret) { |
ioDEBUG(4, "Session %s EOF received.", sess->sess_cid); |
ioDEBUG(4, "Session %s EOF received.", sess->sess_cid); |
|
TAILQ_REMOVE(&Sessions, sess, sess_node); |
finiSession(sess); |
finiSession(sess); |
return NULL; |
return NULL; |
} |
} |
Line 151 dispatchSession(sched_task_t *task)
|
Line 153 dispatchSession(sched_task_t *task)
|
do { |
do { |
/* dispatch message type */ |
/* dispatch message type */ |
if ((len = mqtt_srv_Dispatch(sess->sess_srv, ret, sess)) < 0) { |
if ((len = mqtt_srv_Dispatch(sess->sess_srv, ret, sess)) < 0) { |
if (len == -1) | if (len == -1) { |
ioLIBERR(mqtt); |
ioLIBERR(mqtt); |
finiSession(sess); | finiSession(sess); |
| } else if (len == -2) { |
| TAILQ_REMOVE(&Sessions, sess, sess_node); |
| finiSession(sess); |
| } else if (len == -3) |
| schedEvent(root, startSession, NULL, (u_long) TASK_FD(task), sess, ret); |
} else |
} else |
ret -= len; |
ret -= len; |
} while (len > 0 && ret > 0); |
} while (len > 0 && ret > 0); |
|
|
if (len >= 0 && !schedRead(root, dispatchSession, TASK_ARG(task), TASK_FD(task), NULL, 0)) { |
if (len >= 0 && !schedRead(root, dispatchSession, TASK_ARG(task), TASK_FD(task), NULL, 0)) { |
ioLIBERR(sched); |
ioLIBERR(sched); |
|
TAILQ_REMOVE(&Sessions, sess, sess_node); |
finiSession(sess); |
finiSession(sess); |
} |
} |
return NULL; |
return NULL; |
Line 215 startSession(sched_task_t *task)
|
Line 223 startSession(sched_task_t *task)
|
sess->sess_will.qos = flg.will_qos; |
sess->sess_will.qos = flg.will_qos; |
sess->sess_will.retain = flg.will_retain; |
sess->sess_will.retain = flg.will_retain; |
sess->sess_will.flag = flg.will_flg; |
sess->sess_will.flag = flg.will_flg; |
|
|
|
sess->sess_srv->timeout = sess->sess_ka; |
} |
} |
|
|
/* check online table for user */ |
/* check online table for user */ |
Line 251 startSession(sched_task_t *task)
|
Line 261 startSession(sched_task_t *task)
|
} |
} |
|
|
/* Start session task OK ... */ |
/* Start session task OK ... */ |
TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); |
|
|
|
if (!schedRead(root, dispatchSession, sess, TASK_FD(task), NULL, 0)) { |
if (!schedRead(root, dispatchSession, sess, TASK_FD(task), NULL, 0)) { |
ioLIBERR(sched); |
ioLIBERR(sched); |
ret = MQTT_RETCODE_DENIED; |
ret = MQTT_RETCODE_DENIED; |
} | } else |
| TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); |
|
|
call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, |
call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, |
sess->sess_addr, sess->sess_user, sess->sess_ka); |
sess->sess_addr, sess->sess_user, sess->sess_ka); |
Line 352 Run(int sock)
|
Line 361 Run(int sock)
|
} |
} |
TAILQ_FOREACH(sess, &Sessions, sess_node) { |
TAILQ_FOREACH(sess, &Sessions, sess_node) { |
TAILQ_REMOVE(&Sessions, sess, sess_node); |
TAILQ_REMOVE(&Sessions, sess, sess_node); |
|
|
finiSession(sess); |
finiSession(sess); |
} |
} |
return 0; |
return 0; |