--- mqtt/src/daemon.c 2012/05/05 15:04:29 1.2.2.26 +++ mqtt/src/daemon.c 2012/05/22 14:15:04 1.2.2.32 @@ -39,7 +39,7 @@ initSession(int sock, ait_val_t * __restrict v) } /* init server actor */ - sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf); + sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf, sess->sess_ka); if (!sess->sess_srv) { ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError()); mqtt_msgFree(&sess->sess_buf, 42); @@ -70,8 +70,14 @@ finiSession(struct tagSession *sess) if (!sess) return; - if (call.FiniSessPUB) - call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); + if (sess->sess_clean) { + if (call.FiniSessPUB) + call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); + if (call.DeletePUB_subscribe) + call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%"); + if (call.WipePUB_topic) + call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); + } while ((store = SLIST_FIRST(&sess->sess_subscr))) { SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node); @@ -153,10 +159,14 @@ dispatchSession(sched_task_t *task) do { /* dispatch message type */ if ((len = mqtt_srv_Dispatch(sess->sess_srv, ret, sess)) < 0) { - if (len == -1) + if (len == -1) { ioLIBERR(mqtt); - TAILQ_REMOVE(&Sessions, sess, sess_node); - finiSession(sess); + finiSession(sess); + } else if (len == -2) { + TAILQ_REMOVE(&Sessions, sess, sess_node); + finiSession(sess); + } else if (len == -3) + schedEvent(root, startSession, NULL, (u_long) TASK_FD(task), sess, ret); } else ret -= len; } while (len > 0 && ret > 0); @@ -176,6 +186,7 @@ startSession(sched_task_t *task) mqtt_msg_t buf = { basebuf, sizeof basebuf }; mqtthdr_connflgs_t flg; mqtthdr_connack_t cack; + ait_val_t *v; struct tagSession *s, *sess = NULL; int ret, wlen; @@ -184,9 +195,10 @@ startSession(sched_task_t *task) assert(task); if (!TASK_DATA(task)) { + v = TASK_ARG(task); /* flow from accept new clients */ - sess = initSession(TASK_FD(task), TASK_ARG(task)); - io_freeVar(TASK_ARG(task)); + sess = initSession(TASK_FD(task), v); + io_freeVar(&v); if (!sess) { close(TASK_FD(task)); return NULL; @@ -219,6 +231,8 @@ startSession(sched_task_t *task) sess->sess_will.qos = flg.will_qos; sess->sess_will.retain = flg.will_retain; sess->sess_will.flag = flg.will_flg; + + sess->sess_srv->timeout = sess->sess_ka; } /* check online table for user */ @@ -232,6 +246,7 @@ startSession(sched_task_t *task) ret = MQTT_RETCODE_ACCEPTED; } + /* db management */ if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); TAILQ_FOREACH(s, &Sessions, sess_node) @@ -241,7 +256,6 @@ startSession(sched_task_t *task) break; } } - if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, sess->sess_will.qos, sess->sess_will.retain) == -1) { @@ -253,6 +267,15 @@ startSession(sched_task_t *task) sess->sess_cid, sess->sess_addr, sess->sess_user); ret = MQTT_RETCODE_ACCEPTED; } + /* clean/load session if requested */ + if (sess->sess_clean) { + if (call.DeletePUB_subscribe) + call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%"); + if (call.WipePUB_topic) + call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); + } else { + // todo: read_sql subs and prepare publish + } /* Start session task OK ... */ if (!schedRead(root, dispatchSession, sess, TASK_FD(task), NULL, 0)) { @@ -309,7 +332,7 @@ acceptClient(sched_task_t *task) ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v)); if (!schedRead(root, startSession, v, cli, NULL, 0)) { - io_freeVar(v); + io_freeVar(&v); close(cli); ioDEBUG(1, "Terminated client with socket=%d", cli); } @@ -324,7 +347,6 @@ end: int Run(int sock) { - struct tagPub *pub; struct tagSession *sess; struct timespec pl = { 0, 100000000 }; @@ -346,13 +368,6 @@ Run(int sock) schedRun(root, &Kill); /* free all undeleted elements into lists */ - TAILQ_FOREACH(pub, &Pubs, pub_node) { - TAILQ_REMOVE(&Pubs, pub, pub_node); - - AIT_FREE_VAL(&pub->pub_name); - if (pub->pub_packet.msg_base) - free(pub->pub_packet.msg_base); - } TAILQ_FOREACH(sess, &Sessions, sess_node) { TAILQ_REMOVE(&Sessions, sess, sess_node);