version 1.2.2.9, 2012/04/24 13:54:50
|
version 1.2.2.12, 2012/04/25 13:08:15
|
Line 6
|
Line 6
|
|
|
|
|
static void *startSession(sched_task_t *task); |
static void *startSession(sched_task_t *task); |
|
static pthread_attr_t attr; |
|
|
|
|
static inline struct tagSession * |
static inline struct tagSession * |
Line 229 thrSession(struct tagSession *sess)
|
Line 230 thrSession(struct tagSession *sess)
|
/* dispatch message type */ |
/* dispatch message type */ |
if (mqtt_srv_Dispatch(sess->sess_srv, sess)) |
if (mqtt_srv_Dispatch(sess->sess_srv, sess)) |
ioLIBERR(mqtt); |
ioLIBERR(mqtt); |
|
locKill ^= locKill; |
switch (hdr->mqtt_msg.type) { |
switch (hdr->mqtt_msg.type) { |
case MQTT_TYPE_CONNECT: |
case MQTT_TYPE_CONNECT: |
ioDEBUG(5, "Exec CONNECT session"); |
ioDEBUG(5, "Exec CONNECT session"); |
Line 238 thrSession(struct tagSession *sess)
|
Line 240 thrSession(struct tagSession *sess)
|
io_freeVar(v); |
io_freeVar(v); |
} else |
} else |
ioLIBERR(mqtt); |
ioLIBERR(mqtt); |
locKill ^= locKill; |
|
continue; |
continue; |
case MQTT_TYPE_DISCONNECT: |
case MQTT_TYPE_DISCONNECT: |
ioDEBUG(5, "Exec DISCONNECT session"); |
ioDEBUG(5, "Exec DISCONNECT session"); |
finiSession(sess); |
finiSession(sess); |
locKill ^= locKill; | locKill = 42; |
continue; |
continue; |
case MQTT_TYPE_PUBLISH: |
case MQTT_TYPE_PUBLISH: |
ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos); |
ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos); |
Line 261 thrSession(struct tagSession *sess)
|
Line 262 thrSession(struct tagSession *sess)
|
case MQTT_TYPE_PINGREQ: |
case MQTT_TYPE_PINGREQ: |
ioDEBUG(5, "Exec PINGREQ session"); |
ioDEBUG(5, "Exec PINGREQ session"); |
break; |
break; |
|
case MQTT_TYPE_PINGRESP: |
|
ioDEBUG(5, "Exec PINGRESP session"); |
|
break; |
default: |
default: |
ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", |
ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", |
sess->sess_cid, hdr->mqtt_msg.type); |
sess->sess_cid, hdr->mqtt_msg.type); |
Line 270 thrSession(struct tagSession *sess)
|
Line 274 thrSession(struct tagSession *sess)
|
|
|
pthread_cleanup_pop(locKill); |
pthread_cleanup_pop(locKill); |
pthread_exit(NULL); |
pthread_exit(NULL); |
|
return NULL; |
} |
} |
|
|
static void * |
static void * |
Line 281 startSession(sched_task_t *task)
|
Line 286 startSession(sched_task_t *task)
|
mqtthdr_connack_t cack; |
mqtthdr_connack_t cack; |
struct tagSession *s, *sess = NULL; |
struct tagSession *s, *sess = NULL; |
int ret; |
int ret; |
pthread_attr_t attr; |
|
|
|
ioTRACE(4); |
ioTRACE(4); |
|
|
Line 333 startSession(sched_task_t *task)
|
Line 337 startSession(sched_task_t *task)
|
ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user); |
ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user); |
ret = MQTT_RETCODE_ACCEPTED; |
ret = MQTT_RETCODE_ACCEPTED; |
} |
} |
|
|
if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { |
if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { |
ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); |
ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); |
TAILQ_FOREACH(s, &Sessions, sess_node) |
TAILQ_FOREACH(s, &Sessions, sess_node) |
Line 342 startSession(sched_task_t *task)
|
Line 347 startSession(sched_task_t *task)
|
break; |
break; |
} |
} |
} |
} |
|
|
if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, |
if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, |
sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, |
sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, |
sess->sess_will.qos, sess->sess_will.retain) == -1) { |
sess->sess_will.qos, sess->sess_will.retain) == -1) { |
Line 366 startSession(sched_task_t *task)
|
Line 372 startSession(sched_task_t *task)
|
} |
} |
|
|
/* Start session thread OK ... */ |
/* Start session thread OK ... */ |
pthread_attr_init(&attr); |
|
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); |
|
|
|
SESS_LOCK; |
SESS_LOCK; |
TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); |
TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); |
pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess); |
pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess); |
Line 376 startSession(sched_task_t *task)
|
Line 379 startSession(sched_task_t *task)
|
|
|
call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, |
call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, |
sess->sess_addr, sess->sess_user, sess->sess_ka); |
sess->sess_addr, sess->sess_user, sess->sess_ka); |
|
|
pthread_attr_destroy(&attr); |
|
return NULL; |
return NULL; |
end: /* close client connection */ |
end: /* close client connection */ |
ret = mqtt_msgCONNACK(&msg, ret); |
ret = mqtt_msgCONNACK(&msg, ret); |
Line 430 acceptClient(sched_task_t *task)
|
Line 431 acceptClient(sched_task_t *task)
|
ioDEBUG(1, "Terminated client with socket=%d", cli); |
ioDEBUG(1, "Terminated client with socket=%d", cli); |
} |
} |
end: |
end: |
schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0); | if (!schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0)) |
| ioLIBERR(sched); |
return NULL; |
return NULL; |
} |
} |
|
|
Line 456 Run(int sock)
|
Line 458 Run(int sock)
|
return -1; |
return -1; |
} |
} |
|
|
|
pthread_attr_init(&attr); |
|
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); |
|
|
schedPolling(root, &pl, NULL); |
schedPolling(root, &pl, NULL); |
schedRun(root, &Kill); |
schedRun(root, &Kill); |
|
|
|
pthread_attr_destroy(&attr); |
|
|
/* free all undeleted elements into lists */ |
/* free all undeleted elements into lists */ |
PUBS_LOCK; |
PUBS_LOCK; |