--- mqtt/src/daemon.c 2012/05/08 11:45:57 1.2.2.28 +++ mqtt/src/daemon.c 2012/05/14 13:04:14 1.2.2.31 @@ -70,8 +70,14 @@ finiSession(struct tagSession *sess) if (!sess) return; - if (call.FiniSessPUB) - call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); + if (sess->sess_clean) { + if (call.FiniSessPUB) + call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); + if (call.DeletePUB_subscribe) + call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%"); + if (call.WipePUB_topic) + call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); + } while ((store = SLIST_FIRST(&sess->sess_subscr))) { SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node); @@ -180,6 +186,7 @@ startSession(sched_task_t *task) mqtt_msg_t buf = { basebuf, sizeof basebuf }; mqtthdr_connflgs_t flg; mqtthdr_connack_t cack; + ait_val_t *v; struct tagSession *s, *sess = NULL; int ret, wlen; @@ -188,9 +195,10 @@ startSession(sched_task_t *task) assert(task); if (!TASK_DATA(task)) { + v = TASK_ARG(task); /* flow from accept new clients */ - sess = initSession(TASK_FD(task), TASK_ARG(task)); - io_freeVar(TASK_ARG(task)); + sess = initSession(TASK_FD(task), v); + io_freeVar(&v); if (!sess) { close(TASK_FD(task)); return NULL; @@ -238,6 +246,7 @@ startSession(sched_task_t *task) ret = MQTT_RETCODE_ACCEPTED; } + /* db management */ if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); TAILQ_FOREACH(s, &Sessions, sess_node) @@ -247,7 +256,6 @@ startSession(sched_task_t *task) break; } } - if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, sess->sess_will.qos, sess->sess_will.retain) == -1) { @@ -259,6 +267,15 @@ startSession(sched_task_t *task) sess->sess_cid, sess->sess_addr, sess->sess_user); ret = MQTT_RETCODE_ACCEPTED; } + /* clean/load session if requested */ + if (sess->sess_clean) { + if (call.DeletePUB_subscribe) + call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%"); + if (call.WipePUB_topic) + call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); + } else { + // todo: read_sql subs and prepare publish + } /* Start session task OK ... */ if (!schedRead(root, dispatchSession, sess, TASK_FD(task), NULL, 0)) { @@ -315,7 +332,7 @@ acceptClient(sched_task_t *task) ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v)); if (!schedRead(root, startSession, v, cli, NULL, 0)) { - io_freeVar(v); + io_freeVar(&v); close(cli); ioDEBUG(1, "Terminated client with socket=%d", cli); }