version 1.1.2.30, 2012/01/24 10:18:45
|
version 1.1.2.32, 2012/01/25 10:34:14
|
Line 2
|
Line 2
|
#include "rtlm.h" |
#include "rtlm.h" |
#include "utils.h" |
#include "utils.h" |
#include "mqttd.h" |
#include "mqttd.h" |
|
#include "mqttd_calls.h" |
|
|
|
|
static void *startSession(sched_task_t *task); |
static void *startSession(sched_task_t *task); |
Line 26 initSession(int sock, ait_val_t * __restrict v)
|
Line 27 initSession(int sock, ait_val_t * __restrict v)
|
} else |
} else |
memset(sess, 0, sizeof(struct tagSession)); |
memset(sess, 0, sizeof(struct tagSession)); |
|
|
|
TAILQ_INIT(&sess->sess_sndqueue); |
|
|
str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry")); |
str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry")); |
if (!str) |
if (!str) |
sess->sess_retry = DEFAULT_RETRY; |
sess->sess_retry = DEFAULT_RETRY; |
Line 59 finiSession(struct tagSession *sess, int preservSock)
|
Line 62 finiSession(struct tagSession *sess, int preservSock)
|
if (call.FiniSessPUB) |
if (call.FiniSessPUB) |
call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); |
call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); |
|
|
while ((store = SLIST_FIRST(&sess->sess_sndqueue))) { | while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) { |
SLIST_REMOVE_HEAD(&sess->sess_sndqueue, st_node); | TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node); |
| |
| if (store->st_subscr.sub_topic._base) |
| free(store->st_subscr.sub_topic._base); |
| if (store->st_subscr.sub_value._base) |
| free(store->st_subscr.sub_value._base); |
| |
free(store); |
free(store); |
} |
} |
|
|
Line 199 thrSession(struct tagSession *sess)
|
Line 208 thrSession(struct tagSession *sess)
|
if (call.FiniSessPUB) |
if (call.FiniSessPUB) |
call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); |
call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); |
|
|
while ((store = SLIST_FIRST(&sess->sess_sndqueue))) { | while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) { |
SLIST_REMOVE_HEAD(&sess->sess_sndqueue, st_node); | TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node); |
| |
| if (store->st_subscr.sub_topic._base) |
| free(store->st_subscr.sub_topic._base); |
| if (store->st_subscr.sub_value._base) |
| free(store->st_subscr.sub_value._base); |
| |
free(store); |
free(store); |
} |
} |
|
|
Line 217 thrSession(struct tagSession *sess)
|
Line 232 thrSession(struct tagSession *sess)
|
ioLIBERR(mqtt); |
ioLIBERR(mqtt); |
|
|
locKill ^= locKill; |
locKill ^= locKill; |
|
|
|
call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, |
|
sess->sess_addr, sess->sess_user); |
continue; |
continue; |
case MQTT_TYPE_DISCONNECT: |
case MQTT_TYPE_DISCONNECT: |
ioDEBUG(5, "Exec DISCONNECT session"); |
ioDEBUG(5, "Exec DISCONNECT session"); |
Line 226 thrSession(struct tagSession *sess)
|
Line 244 thrSession(struct tagSession *sess)
|
|
|
finiSession(sess, 0); |
finiSession(sess, 0); |
locKill ^= locKill; |
locKill ^= locKill; |
|
|
|
call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, |
|
sess->sess_addr, sess->sess_user); |
continue; |
continue; |
case MQTT_TYPE_PUBLISH: |
case MQTT_TYPE_PUBLISH: |
ioDEBUG(5, "Work in progress ..."); | ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos); |
| if (Publish(sess)) |
| locKill ^= locKill; |
break; |
break; |
case MQTT_TYPE_PUBREL: |
case MQTT_TYPE_PUBREL: |
break; |
break; |
Line 264 startSession(sched_task_t *task)
|
Line 287 startSession(sched_task_t *task)
|
|
|
assert(task); |
assert(task); |
|
|
ioDEBUG(3, "task_Data=%p", TASK_DATA(task)); |
|
if (!TASK_DATA(task)) { |
if (!TASK_DATA(task)) { |
sess = initSession(TASK_FD(task), TASK_ARG(task)); |
sess = initSession(TASK_FD(task), TASK_ARG(task)); |
if (!sess) { |
if (!sess) { |
Line 282 startSession(sched_task_t *task)
|
Line 304 startSession(sched_task_t *task)
|
} else { |
} else { |
sess = TASK_DATA(task); |
sess = TASK_DATA(task); |
buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task); |
buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task); |
ioDEBUG(3, "debug:: sock=%d s=%p sbuf=%p sbl=%d ret=%d\n", sess->sess_sock, sess->sess_buf, sess->sess_buf->msg_base, (int) sess->sess_buf->msg_len, (int) TASK_DATLEN(task)); |
|
memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len); |
memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len); |
} |
} |
|
|
Line 378 static void *
|
Line 399 static void *
|
thrSched(void *arg __unused) |
thrSched(void *arg __unused) |
{ |
{ |
struct tagSession *sess; |
struct tagSession *sess; |
|
struct timespec pl = { 0, 10000000 }; |
|
|
ioTRACE(1); |
ioTRACE(1); |
|
|
|
schedPolling(root, &pl, NULL); |
schedRun(root, &Kill); |
schedRun(root, &Kill); |
|
|
TAILQ_FOREACH(sess, &Sessions, sess_node) |
TAILQ_FOREACH(sess, &Sessions, sess_node) |