version 1.2.2.10, 2012/04/25 07:37:16
|
version 1.2.2.11, 2012/04/25 12:04:30
|
Line 230 thrSession(struct tagSession *sess)
|
Line 230 thrSession(struct tagSession *sess)
|
/* dispatch message type */ |
/* dispatch message type */ |
if (mqtt_srv_Dispatch(sess->sess_srv, sess)) |
if (mqtt_srv_Dispatch(sess->sess_srv, sess)) |
ioLIBERR(mqtt); |
ioLIBERR(mqtt); |
|
locKill ^= locKill; |
switch (hdr->mqtt_msg.type) { |
switch (hdr->mqtt_msg.type) { |
case MQTT_TYPE_CONNECT: |
case MQTT_TYPE_CONNECT: |
ioDEBUG(5, "Exec CONNECT session"); |
ioDEBUG(5, "Exec CONNECT session"); |
Line 239 thrSession(struct tagSession *sess)
|
Line 240 thrSession(struct tagSession *sess)
|
io_freeVar(v); |
io_freeVar(v); |
} else |
} else |
ioLIBERR(mqtt); |
ioLIBERR(mqtt); |
locKill ^= locKill; |
|
continue; |
continue; |
case MQTT_TYPE_DISCONNECT: |
case MQTT_TYPE_DISCONNECT: |
ioDEBUG(5, "Exec DISCONNECT session"); |
ioDEBUG(5, "Exec DISCONNECT session"); |
finiSession(sess); |
finiSession(sess); |
locKill ^= locKill; | locKill = 42; |
continue; |
continue; |
case MQTT_TYPE_PUBLISH: |
case MQTT_TYPE_PUBLISH: |
ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos); |
ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos); |
Line 262 thrSession(struct tagSession *sess)
|
Line 262 thrSession(struct tagSession *sess)
|
case MQTT_TYPE_PINGREQ: |
case MQTT_TYPE_PINGREQ: |
ioDEBUG(5, "Exec PINGREQ session"); |
ioDEBUG(5, "Exec PINGREQ session"); |
break; |
break; |
|
case MQTT_TYPE_PINGRESP: |
|
ioDEBUG(5, "Exec PINGRESP session"); |
|
break; |
default: |
default: |
ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", |
ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", |
sess->sess_cid, hdr->mqtt_msg.type); |
sess->sess_cid, hdr->mqtt_msg.type); |
Line 271 thrSession(struct tagSession *sess)
|
Line 274 thrSession(struct tagSession *sess)
|
|
|
pthread_cleanup_pop(locKill); |
pthread_cleanup_pop(locKill); |
pthread_exit(NULL); |
pthread_exit(NULL); |
|
return NULL; |
} |
} |
|
|
static void * |
static void * |
Line 286 startSession(sched_task_t *task)
|
Line 290 startSession(sched_task_t *task)
|
ioTRACE(4); |
ioTRACE(4); |
|
|
assert(task); |
assert(task); |
|
printf("aaaaaaaaaaaaaaaaa\n"); |
|
fflush(stdout); |
|
|
if (!TASK_DATA(task)) { |
if (!TASK_DATA(task)) { |
/* flow from accept new clients */ |
/* flow from accept new clients */ |
Line 323 startSession(sched_task_t *task)
|
Line 329 startSession(sched_task_t *task)
|
sess->sess_will.flag = flg.will_flg; |
sess->sess_will.flag = flg.will_flg; |
} |
} |
|
|
|
printf("sql=%p\n", acc); |
|
fflush(stdout); |
/* check online table for user */ |
/* check online table for user */ |
if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) { |
if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) { |
ioDEBUG(0, "Login:: DENIED for username %s and password %s", |
ioDEBUG(0, "Login:: DENIED for username %s and password %s", |
Line 333 startSession(sched_task_t *task)
|
Line 341 startSession(sched_task_t *task)
|
ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user); |
ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user); |
ret = MQTT_RETCODE_ACCEPTED; |
ret = MQTT_RETCODE_ACCEPTED; |
} |
} |
|
printf(".sql=%p\n", pub); |
|
fflush(stdout); |
if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { |
if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { |
ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); |
ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); |
TAILQ_FOREACH(s, &Sessions, sess_node) |
TAILQ_FOREACH(s, &Sessions, sess_node) |
Line 342 startSession(sched_task_t *task)
|
Line 352 startSession(sched_task_t *task)
|
break; |
break; |
} |
} |
} |
} |
|
printf("...sql=%p\n", pub); |
|
fflush(stdout); |
if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, |
if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, |
sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, |
sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, |
sess->sess_will.qos, sess->sess_will.retain) == -1) { |
sess->sess_will.qos, sess->sess_will.retain) == -1) { |
Line 368 startSession(sched_task_t *task)
|
Line 380 startSession(sched_task_t *task)
|
/* Start session thread OK ... */ |
/* Start session thread OK ... */ |
SESS_LOCK; |
SESS_LOCK; |
TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); |
TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); |
pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess); | pthread_create(&sess->sess_tid, NULL, (void*(*)(void*)) thrSession, sess); |
| pthread_detach(sess->sess_tid); |
SESS_UNLOCK; |
SESS_UNLOCK; |
|
|
call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, |
call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, |
Line 425 acceptClient(sched_task_t *task)
|
Line 438 acceptClient(sched_task_t *task)
|
ioDEBUG(1, "Terminated client with socket=%d", cli); |
ioDEBUG(1, "Terminated client with socket=%d", cli); |
} |
} |
end: |
end: |
schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0); | if (!schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0)) |
| ioLIBERR(sched); |
return NULL; |
return NULL; |
} |
} |
|
|