version 1.2.2.25, 2012/05/05 14:51:02
|
version 1.2.2.31, 2012/05/14 13:04:14
|
Line 39 initSession(int sock, ait_val_t * __restrict v)
|
Line 39 initSession(int sock, ait_val_t * __restrict v)
|
} |
} |
|
|
/* init server actor */ |
/* init server actor */ |
sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf); | sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf, sess->sess_ka); |
if (!sess->sess_srv) { |
if (!sess->sess_srv) { |
ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError()); |
ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError()); |
mqtt_msgFree(&sess->sess_buf, 42); |
mqtt_msgFree(&sess->sess_buf, 42); |
Line 70 finiSession(struct tagSession *sess)
|
Line 70 finiSession(struct tagSession *sess)
|
if (!sess) |
if (!sess) |
return; |
return; |
|
|
if (call.FiniSessPUB) | if (sess->sess_clean) { |
call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); | if (call.FiniSessPUB) |
| call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); |
| if (call.DeletePUB_subscribe) |
| call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%"); |
| if (call.WipePUB_topic) |
| call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); |
| } |
|
|
while ((store = SLIST_FIRST(&sess->sess_subscr))) { |
while ((store = SLIST_FIRST(&sess->sess_subscr))) { |
SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node); |
SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node); |
Line 140 dispatchSession(sched_task_t *task)
|
Line 146 dispatchSession(sched_task_t *task)
|
/* receive & decode packet */ |
/* receive & decode packet */ |
if ((ret = recv(TASK_FD(task), sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) { |
if ((ret = recv(TASK_FD(task), sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) { |
ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno)); |
ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno)); |
|
TAILQ_REMOVE(&Sessions, sess, sess_node); |
finiSession(sess); |
finiSession(sess); |
return NULL; |
return NULL; |
} else if (!ret) { |
} else if (!ret) { |
ioDEBUG(4, "Session %s EOF received.", sess->sess_cid); |
ioDEBUG(4, "Session %s EOF received.", sess->sess_cid); |
|
TAILQ_REMOVE(&Sessions, sess, sess_node); |
finiSession(sess); |
finiSession(sess); |
return NULL; |
return NULL; |
} |
} |
Line 151 dispatchSession(sched_task_t *task)
|
Line 159 dispatchSession(sched_task_t *task)
|
do { |
do { |
/* dispatch message type */ |
/* dispatch message type */ |
if ((len = mqtt_srv_Dispatch(sess->sess_srv, ret, sess)) < 0) { |
if ((len = mqtt_srv_Dispatch(sess->sess_srv, ret, sess)) < 0) { |
if (len == -1) | if (len == -1) { |
ioLIBERR(mqtt); |
ioLIBERR(mqtt); |
finiSession(sess); | finiSession(sess); |
| } else if (len == -2) { |
| TAILQ_REMOVE(&Sessions, sess, sess_node); |
| finiSession(sess); |
| } else if (len == -3) |
| schedEvent(root, startSession, NULL, (u_long) TASK_FD(task), sess, ret); |
} else |
} else |
ret -= len; |
ret -= len; |
} while (len > 0 && ret > 0); |
} while (len > 0 && ret > 0); |
|
|
if (len >= 0 && !schedRead(root, dispatchSession, TASK_ARG(task), TASK_FD(task), NULL, 0)) { |
if (len >= 0 && !schedRead(root, dispatchSession, TASK_ARG(task), TASK_FD(task), NULL, 0)) { |
ioLIBERR(sched); |
ioLIBERR(sched); |
|
TAILQ_REMOVE(&Sessions, sess, sess_node); |
finiSession(sess); |
finiSession(sess); |
} |
} |
return NULL; |
return NULL; |
Line 172 startSession(sched_task_t *task)
|
Line 186 startSession(sched_task_t *task)
|
mqtt_msg_t buf = { basebuf, sizeof basebuf }; |
mqtt_msg_t buf = { basebuf, sizeof basebuf }; |
mqtthdr_connflgs_t flg; |
mqtthdr_connflgs_t flg; |
mqtthdr_connack_t cack; |
mqtthdr_connack_t cack; |
|
ait_val_t *v; |
struct tagSession *s, *sess = NULL; |
struct tagSession *s, *sess = NULL; |
int ret, wlen; |
int ret, wlen; |
|
|
Line 180 startSession(sched_task_t *task)
|
Line 195 startSession(sched_task_t *task)
|
assert(task); |
assert(task); |
|
|
if (!TASK_DATA(task)) { |
if (!TASK_DATA(task)) { |
|
v = TASK_ARG(task); |
/* flow from accept new clients */ |
/* flow from accept new clients */ |
sess = initSession(TASK_FD(task), TASK_ARG(task)); | sess = initSession(TASK_FD(task), v); |
io_freeVar(TASK_ARG(task)); | io_freeVar(&v); |
if (!sess) { |
if (!sess) { |
close(TASK_FD(task)); |
close(TASK_FD(task)); |
return NULL; |
return NULL; |
Line 215 startSession(sched_task_t *task)
|
Line 231 startSession(sched_task_t *task)
|
sess->sess_will.qos = flg.will_qos; |
sess->sess_will.qos = flg.will_qos; |
sess->sess_will.retain = flg.will_retain; |
sess->sess_will.retain = flg.will_retain; |
sess->sess_will.flag = flg.will_flg; |
sess->sess_will.flag = flg.will_flg; |
|
|
|
sess->sess_srv->timeout = sess->sess_ka; |
} |
} |
|
|
/* check online table for user */ |
/* check online table for user */ |
Line 228 startSession(sched_task_t *task)
|
Line 246 startSession(sched_task_t *task)
|
ret = MQTT_RETCODE_ACCEPTED; |
ret = MQTT_RETCODE_ACCEPTED; |
} |
} |
|
|
|
/* db management */ |
if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { |
if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { |
ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); |
ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); |
TAILQ_FOREACH(s, &Sessions, sess_node) |
TAILQ_FOREACH(s, &Sessions, sess_node) |
Line 237 startSession(sched_task_t *task)
|
Line 256 startSession(sched_task_t *task)
|
break; |
break; |
} |
} |
} |
} |
|
|
if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, |
if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, |
sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, |
sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, |
sess->sess_will.qos, sess->sess_will.retain) == -1) { |
sess->sess_will.qos, sess->sess_will.retain) == -1) { |
Line 249 startSession(sched_task_t *task)
|
Line 267 startSession(sched_task_t *task)
|
sess->sess_cid, sess->sess_addr, sess->sess_user); |
sess->sess_cid, sess->sess_addr, sess->sess_user); |
ret = MQTT_RETCODE_ACCEPTED; |
ret = MQTT_RETCODE_ACCEPTED; |
} |
} |
|
/* clean/load session if requested */ |
|
if (sess->sess_clean) { |
|
if (call.DeletePUB_subscribe) |
|
call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%"); |
|
if (call.WipePUB_topic) |
|
call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); |
|
} else { |
|
// todo: read_sql subs and prepare publish |
|
} |
|
|
/* Start session task OK ... */ |
/* Start session task OK ... */ |
TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); |
|
|
|
if (!schedRead(root, dispatchSession, sess, TASK_FD(task), NULL, 0)) { |
if (!schedRead(root, dispatchSession, sess, TASK_FD(task), NULL, 0)) { |
ioLIBERR(sched); |
ioLIBERR(sched); |
ret = MQTT_RETCODE_DENIED; |
ret = MQTT_RETCODE_DENIED; |
} | } else |
| TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); |
|
|
call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, |
call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, |
sess->sess_addr, sess->sess_user, sess->sess_ka); |
sess->sess_addr, sess->sess_user, sess->sess_ka); |
Line 306 acceptClient(sched_task_t *task)
|
Line 332 acceptClient(sched_task_t *task)
|
ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v)); |
ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v)); |
|
|
if (!schedRead(root, startSession, v, cli, NULL, 0)) { |
if (!schedRead(root, startSession, v, cli, NULL, 0)) { |
io_freeVar(v); | io_freeVar(&v); |
close(cli); |
close(cli); |
ioDEBUG(1, "Terminated client with socket=%d", cli); |
ioDEBUG(1, "Terminated client with socket=%d", cli); |
} |
} |
Line 352 Run(int sock)
|
Line 378 Run(int sock)
|
} |
} |
TAILQ_FOREACH(sess, &Sessions, sess_node) { |
TAILQ_FOREACH(sess, &Sessions, sess_node) { |
TAILQ_REMOVE(&Sessions, sess, sess_node); |
TAILQ_REMOVE(&Sessions, sess, sess_node); |
|
|
finiSession(sess); |
finiSession(sess); |
} |
} |
return 0; |
return 0; |