|
|
| version 1.2.2.26, 2012/05/05 15:04:29 | version 1.2.2.35, 2012/06/01 10:39:48 |
|---|---|
| Line 16 initSession(int sock, ait_val_t * __restrict v) | Line 16 initSession(int sock, ait_val_t * __restrict v) |
| if (!v) | if (!v) |
| return NULL; | return NULL; |
| sess = malloc(sizeof(struct tagSession)); | sess = io_malloc(sizeof(struct tagSession)); |
| if (!sess) { | if (!sess) { |
| ioSYSERR(0); | ioSYSERR(0); |
| return NULL; | return NULL; |
| Line 24 initSession(int sock, ait_val_t * __restrict v) | Line 24 initSession(int sock, ait_val_t * __restrict v) |
| memset(sess, 0, sizeof(struct tagSession)); | memset(sess, 0, sizeof(struct tagSession)); |
| SLIST_INIT(&sess->sess_subscr); | SLIST_INIT(&sess->sess_subscr); |
| SLIST_INIT(&sess->sess_sndpkt); | |
| str = cfg_getAttribute(&cfg, "mqttd", "retry"); | str = cfg_getAttribute(&cfg, "mqttd", "retry"); |
| if (!str) | if (!str) |
| Line 34 initSession(int sock, ait_val_t * __restrict v) | Line 35 initSession(int sock, ait_val_t * __restrict v) |
| sess->sess_buf = mqtt_msgAlloc(USHRT_MAX); | sess->sess_buf = mqtt_msgAlloc(USHRT_MAX); |
| if (!sess->sess_buf) { | if (!sess->sess_buf) { |
| ioLIBERR(mqtt); | ioLIBERR(mqtt); |
| free(sess); | io_free(sess); |
| return NULL; | return NULL; |
| } | } |
| /* init server actor */ | /* init server actor */ |
| sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf); | sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf, sess->sess_ka); |
| if (!sess->sess_srv) { | if (!sess->sess_srv) { |
| ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError()); |
| mqtt_msgFree(&sess->sess_buf, 42); | mqtt_msgFree(&sess->sess_buf, 42); |
| free(sess); | io_free(sess); |
| return NULL; | return NULL; |
| } else { | } else { |
| mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_CONNECT, cmdCONNECT); | mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_CONNECT, cmdCONNECT); |
| Line 64 static void | Line 65 static void |
| finiSession(struct tagSession *sess) | finiSession(struct tagSession *sess) |
| { | { |
| struct tagStore *store; | struct tagStore *store; |
| struct tagPkt *p; | |
| ioTRACE(5); | ioTRACE(5); |
| if (!sess) | if (!sess) |
| return; | return; |
| if (call.FiniSessPUB) | if (sess->sess_clean) { |
| call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); | if (call.FiniSessPUB) |
| call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); | |
| if (call.DeletePUB_subscribe) | |
| call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%"); | |
| if (call.WipePUB_topic) | |
| call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); | |
| } | |
| while ((store = SLIST_FIRST(&sess->sess_subscr))) { | while ((store = SLIST_FIRST(&sess->sess_subscr))) { |
| SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node); | SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node); |
| Line 81 finiSession(struct tagSession *sess) | Line 89 finiSession(struct tagSession *sess) |
| if (store->st_subscr.sub_value.msg_base) | if (store->st_subscr.sub_value.msg_base) |
| free(store->st_subscr.sub_value.msg_base); | free(store->st_subscr.sub_value.msg_base); |
| free(store); | io_free(store); |
| } | } |
| while ((p = SLIST_FIRST(&sess->sess_sndpkt))) { | |
| SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node); | |
| io_freeVar(&p->pkt_data); | |
| io_free(p); | |
| } | |
| if (sess->sess_will.msg) | if (sess->sess_will.msg) |
| free(sess->sess_will.msg); | free(sess->sess_will.msg); |
| if (sess->sess_will.topic) | if (sess->sess_will.topic) |
| Line 95 finiSession(struct tagSession *sess) | Line 110 finiSession(struct tagSession *sess) |
| mqtt_srv_Fini(&sess->sess_srv); | mqtt_srv_Fini(&sess->sess_srv); |
| mqtt_msgFree(&sess->sess_buf, 42); | mqtt_msgFree(&sess->sess_buf, 42); |
| free(sess); | io_free(sess); |
| } | } |
| static void * | static void * |
| Line 116 leaveClient(sched_task_t *task) | Line 131 leaveClient(sched_task_t *task) |
| ret = mqtt_msgDISCONNECT(sess->sess_buf); | ret = mqtt_msgDISCONNECT(sess->sess_buf); |
| send(TASK_FD(task), sess->sess_buf->msg_base, ret, MSG_NOSIGNAL); | send(TASK_FD(task), sess->sess_buf->msg_base, ret, MSG_NOSIGNAL); |
| ioDEBUG(1, "Close socket=%d", TASK_FD(task)); | ioDEBUG(1, "Close socket=%ld", (long) TASK_FD(task)); |
| call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, | call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, |
| sess->sess_addr, sess->sess_user); | sess->sess_addr, sess->sess_user); |
| Line 153 dispatchSession(sched_task_t *task) | Line 168 dispatchSession(sched_task_t *task) |
| do { | do { |
| /* dispatch message type */ | /* dispatch message type */ |
| if ((len = mqtt_srv_Dispatch(sess->sess_srv, ret, sess)) < 0) { | if ((len = mqtt_srv_Dispatch(sess->sess_srv, ret, sess)) < 0) { |
| if (len == -1) | if (len == -1) { |
| ioLIBERR(mqtt); | ioLIBERR(mqtt); |
| TAILQ_REMOVE(&Sessions, sess, sess_node); | finiSession(sess); |
| finiSession(sess); | } else if (len == -2) { |
| TAILQ_REMOVE(&Sessions, sess, sess_node); | |
| finiSession(sess); | |
| } else if (len == -3) | |
| schedEvent(root, startSession, NULL, (u_long) TASK_FD(task), sess, ret); | |
| } else | } else |
| ret -= len; | ret -= len; |
| } while (len > 0 && ret > 0); | } while (len > 0 && ret > 0); |
| Line 176 startSession(sched_task_t *task) | Line 195 startSession(sched_task_t *task) |
| mqtt_msg_t buf = { basebuf, sizeof basebuf }; | mqtt_msg_t buf = { basebuf, sizeof basebuf }; |
| mqtthdr_connflgs_t flg; | mqtthdr_connflgs_t flg; |
| mqtthdr_connack_t cack; | mqtthdr_connack_t cack; |
| ait_val_t *v; | |
| struct tagSession *s, *sess = NULL; | struct tagSession *s, *sess = NULL; |
| int ret, wlen; | int ret, wlen; |
| Line 184 startSession(sched_task_t *task) | Line 204 startSession(sched_task_t *task) |
| assert(task); | assert(task); |
| if (!TASK_DATA(task)) { | if (!TASK_DATA(task)) { |
| v = TASK_ARG(task); | |
| /* flow from accept new clients */ | /* flow from accept new clients */ |
| sess = initSession(TASK_FD(task), TASK_ARG(task)); | sess = initSession(TASK_FD(task), v); |
| io_freeVar(TASK_ARG(task)); | io_freeVar(&v); |
| if (!sess) { | if (!sess) { |
| close(TASK_FD(task)); | close(TASK_FD(task)); |
| return NULL; | return NULL; |
| Line 219 startSession(sched_task_t *task) | Line 240 startSession(sched_task_t *task) |
| sess->sess_will.qos = flg.will_qos; | sess->sess_will.qos = flg.will_qos; |
| sess->sess_will.retain = flg.will_retain; | sess->sess_will.retain = flg.will_retain; |
| sess->sess_will.flag = flg.will_flg; | sess->sess_will.flag = flg.will_flg; |
| sess->sess_srv->timeout = sess->sess_ka; | |
| } | } |
| /* check online table for user */ | /* check online table for user */ |
| Line 232 startSession(sched_task_t *task) | Line 255 startSession(sched_task_t *task) |
| ret = MQTT_RETCODE_ACCEPTED; | ret = MQTT_RETCODE_ACCEPTED; |
| } | } |
| /* db management */ | |
| if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { | if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { |
| ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); | ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); |
| TAILQ_FOREACH(s, &Sessions, sess_node) | TAILQ_FOREACH(s, &Sessions, sess_node) |
| Line 241 startSession(sched_task_t *task) | Line 265 startSession(sched_task_t *task) |
| break; | break; |
| } | } |
| } | } |
| if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, | if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, |
| sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, | sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, |
| sess->sess_will.qos, sess->sess_will.retain) == -1) { | sess->sess_will.qos, sess->sess_will.retain) == -1) { |
| Line 253 startSession(sched_task_t *task) | Line 276 startSession(sched_task_t *task) |
| sess->sess_cid, sess->sess_addr, sess->sess_user); | sess->sess_cid, sess->sess_addr, sess->sess_user); |
| ret = MQTT_RETCODE_ACCEPTED; | ret = MQTT_RETCODE_ACCEPTED; |
| } | } |
| /* clean/load session if requested */ | |
| if (sess->sess_clean) { | |
| if (call.DeletePUB_subscribe) | |
| call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%"); | |
| if (call.WipePUB_topic) | |
| call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); | |
| } else { | |
| // todo: read_sql subs and prepare publish | |
| } | |
| /* Start session task OK ... */ | /* Start session task OK ... */ |
| if (!schedRead(root, dispatchSession, sess, TASK_FD(task), NULL, 0)) { | if (!schedRead(root, dispatchSession, sess, TASK_FD(task), NULL, 0)) { |
| Line 266 startSession(sched_task_t *task) | Line 298 startSession(sched_task_t *task) |
| end: /* close client connection */ | end: /* close client connection */ |
| wlen = mqtt_msgCONNACK(sess->sess_buf, ret); | wlen = mqtt_msgCONNACK(sess->sess_buf, ret); |
| if ((wlen = send(TASK_FD(task), sess->sess_buf->msg_base, wlen, 0)) == -1) { | if ((wlen = send(TASK_FD(task), sess->sess_buf->msg_base, wlen, 0)) == -1) { |
| ioDEBUG(3, "Error:: send(%d) #%d - %s", TASK_FD(task), errno, strerror(errno)); | ioDEBUG(3, "Error:: send(%ld) #%d - %s", (long) TASK_FD(task), errno, strerror(errno)); |
| } else | } else |
| ioDEBUG(5, "Sended %d bytes", wlen); | ioDEBUG(5, "Sended %d bytes", wlen); |
| if (ret != MQTT_RETCODE_ACCEPTED) { | if (ret != MQTT_RETCODE_ACCEPTED) { |
| ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, TASK_FD(task)); | ioDEBUG(1, "Close client %s with socket=%ld", sess->sess_addr, (long) TASK_FD(task)); |
| finiSession(sess); | finiSession(sess); |
| } | } |
| return NULL; | return NULL; |
| Line 309 acceptClient(sched_task_t *task) | Line 341 acceptClient(sched_task_t *task) |
| ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v)); | ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v)); |
| if (!schedRead(root, startSession, v, cli, NULL, 0)) { | if (!schedRead(root, startSession, v, cli, NULL, 0)) { |
| io_freeVar(v); | io_freeVar(&v); |
| close(cli); | close(cli); |
| ioDEBUG(1, "Terminated client with socket=%d", cli); | ioDEBUG(1, "Terminated client with socket=%d", cli); |
| } | } |
| Line 324 end: | Line 356 end: |
| int | int |
| Run(int sock) | Run(int sock) |
| { | { |
| struct tagPub *pub; | |
| struct tagSession *sess; | struct tagSession *sess; |
| struct timespec pl = { 0, 100000000 }; | struct timespec pl = { 0, 100000000 }; |
| Line 346 Run(int sock) | Line 377 Run(int sock) |
| schedRun(root, &Kill); | schedRun(root, &Kill); |
| /* free all undeleted elements into lists */ | /* free all undeleted elements into lists */ |
| TAILQ_FOREACH(pub, &Pubs, pub_node) { | |
| TAILQ_REMOVE(&Pubs, pub, pub_node); | |
| AIT_FREE_VAL(&pub->pub_name); | |
| if (pub->pub_packet.msg_base) | |
| free(pub->pub_packet.msg_base); | |
| } | |
| TAILQ_FOREACH(sess, &Sessions, sess_node) { | TAILQ_FOREACH(sess, &Sessions, sess_node) { |
| TAILQ_REMOVE(&Sessions, sess, sess_node); | TAILQ_REMOVE(&Sessions, sess, sess_node); |