|
|
| version 1.2.2.10, 2012/04/25 07:37:16 | version 1.2.2.11, 2012/04/25 12:04:30 |
|---|---|
| Line 230 thrSession(struct tagSession *sess) | Line 230 thrSession(struct tagSession *sess) |
| /* dispatch message type */ | /* dispatch message type */ |
| if (mqtt_srv_Dispatch(sess->sess_srv, sess)) | if (mqtt_srv_Dispatch(sess->sess_srv, sess)) |
| ioLIBERR(mqtt); | ioLIBERR(mqtt); |
| locKill ^= locKill; | |
| switch (hdr->mqtt_msg.type) { | switch (hdr->mqtt_msg.type) { |
| case MQTT_TYPE_CONNECT: | case MQTT_TYPE_CONNECT: |
| ioDEBUG(5, "Exec CONNECT session"); | ioDEBUG(5, "Exec CONNECT session"); |
| Line 239 thrSession(struct tagSession *sess) | Line 240 thrSession(struct tagSession *sess) |
| io_freeVar(v); | io_freeVar(v); |
| } else | } else |
| ioLIBERR(mqtt); | ioLIBERR(mqtt); |
| locKill ^= locKill; | |
| continue; | continue; |
| case MQTT_TYPE_DISCONNECT: | case MQTT_TYPE_DISCONNECT: |
| ioDEBUG(5, "Exec DISCONNECT session"); | ioDEBUG(5, "Exec DISCONNECT session"); |
| finiSession(sess); | finiSession(sess); |
| locKill ^= locKill; | locKill = 42; |
| continue; | continue; |
| case MQTT_TYPE_PUBLISH: | case MQTT_TYPE_PUBLISH: |
| ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos); | ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos); |
| Line 262 thrSession(struct tagSession *sess) | Line 262 thrSession(struct tagSession *sess) |
| case MQTT_TYPE_PINGREQ: | case MQTT_TYPE_PINGREQ: |
| ioDEBUG(5, "Exec PINGREQ session"); | ioDEBUG(5, "Exec PINGREQ session"); |
| break; | break; |
| case MQTT_TYPE_PINGRESP: | |
| ioDEBUG(5, "Exec PINGRESP session"); | |
| break; | |
| default: | default: |
| ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", | ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", |
| sess->sess_cid, hdr->mqtt_msg.type); | sess->sess_cid, hdr->mqtt_msg.type); |
| Line 271 thrSession(struct tagSession *sess) | Line 274 thrSession(struct tagSession *sess) |
| pthread_cleanup_pop(locKill); | pthread_cleanup_pop(locKill); |
| pthread_exit(NULL); | pthread_exit(NULL); |
| return NULL; | |
| } | } |
| static void * | static void * |
| Line 286 startSession(sched_task_t *task) | Line 290 startSession(sched_task_t *task) |
| ioTRACE(4); | ioTRACE(4); |
| assert(task); | assert(task); |
| printf("aaaaaaaaaaaaaaaaa\n"); | |
| fflush(stdout); | |
| if (!TASK_DATA(task)) { | if (!TASK_DATA(task)) { |
| /* flow from accept new clients */ | /* flow from accept new clients */ |
| Line 323 startSession(sched_task_t *task) | Line 329 startSession(sched_task_t *task) |
| sess->sess_will.flag = flg.will_flg; | sess->sess_will.flag = flg.will_flg; |
| } | } |
| printf("sql=%p\n", acc); | |
| fflush(stdout); | |
| /* check online table for user */ | /* check online table for user */ |
| if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) { | if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) { |
| ioDEBUG(0, "Login:: DENIED for username %s and password %s", | ioDEBUG(0, "Login:: DENIED for username %s and password %s", |
| Line 333 startSession(sched_task_t *task) | Line 341 startSession(sched_task_t *task) |
| ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user); | ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user); |
| ret = MQTT_RETCODE_ACCEPTED; | ret = MQTT_RETCODE_ACCEPTED; |
| } | } |
| printf(".sql=%p\n", pub); | |
| fflush(stdout); | |
| if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { | if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { |
| ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); | ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); |
| TAILQ_FOREACH(s, &Sessions, sess_node) | TAILQ_FOREACH(s, &Sessions, sess_node) |
| Line 342 startSession(sched_task_t *task) | Line 352 startSession(sched_task_t *task) |
| break; | break; |
| } | } |
| } | } |
| printf("...sql=%p\n", pub); | |
| fflush(stdout); | |
| if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, | if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, |
| sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, | sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, |
| sess->sess_will.qos, sess->sess_will.retain) == -1) { | sess->sess_will.qos, sess->sess_will.retain) == -1) { |
| Line 368 startSession(sched_task_t *task) | Line 380 startSession(sched_task_t *task) |
| /* Start session thread OK ... */ | /* Start session thread OK ... */ |
| SESS_LOCK; | SESS_LOCK; |
| TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); | TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); |
| pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess); | pthread_create(&sess->sess_tid, NULL, (void*(*)(void*)) thrSession, sess); |
| pthread_detach(sess->sess_tid); | |
| SESS_UNLOCK; | SESS_UNLOCK; |
| call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, | call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, |
| Line 425 acceptClient(sched_task_t *task) | Line 438 acceptClient(sched_task_t *task) |
| ioDEBUG(1, "Terminated client with socket=%d", cli); | ioDEBUG(1, "Terminated client with socket=%d", cli); |
| } | } |
| end: | end: |
| schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0); | if (!schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0)) |
| ioLIBERR(sched); | |
| return NULL; | return NULL; |
| } | } |