Diff for /mqtt/src/daemon.c between versions 1.1.2.30 and 1.1.2.32

version 1.1.2.30, 2012/01/24 10:18:45 version 1.1.2.32, 2012/01/25 10:34:14
Line 2 Line 2
 #include "rtlm.h"  #include "rtlm.h"
 #include "utils.h"  #include "utils.h"
 #include "mqttd.h"  #include "mqttd.h"
   #include "mqttd_calls.h"
   
   
 static void *startSession(sched_task_t *task);  static void *startSession(sched_task_t *task);
Line 26  initSession(int sock, ait_val_t * __restrict v) Line 27  initSession(int sock, ait_val_t * __restrict v)
         } else          } else
                 memset(sess, 0, sizeof(struct tagSession));                  memset(sess, 0, sizeof(struct tagSession));
   
           TAILQ_INIT(&sess->sess_sndqueue);
   
         str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));          str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));
         if (!str)          if (!str)
                 sess->sess_retry = DEFAULT_RETRY;                  sess->sess_retry = DEFAULT_RETRY;
Line 59  finiSession(struct tagSession *sess, int preservSock) Line 62  finiSession(struct tagSession *sess, int preservSock)
         if (call.FiniSessPUB)          if (call.FiniSessPUB)
                 call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");                  call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
   
        while ((store = SLIST_FIRST(&sess->sess_sndqueue))) {        while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
                SLIST_REMOVE_HEAD(&sess->sess_sndqueue, st_node);                TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
 
                 if (store->st_subscr.sub_topic._base)
                         free(store->st_subscr.sub_topic._base);
                 if (store->st_subscr.sub_value._base)
                         free(store->st_subscr.sub_value._base);
 
                 free(store);                  free(store);
         }          }
   
Line 199  thrSession(struct tagSession *sess) Line 208  thrSession(struct tagSession *sess)
                                 if (call.FiniSessPUB)                                  if (call.FiniSessPUB)
                                         call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");                                          call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
   
                                while ((store = SLIST_FIRST(&sess->sess_sndqueue))) {                                while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
                                        SLIST_REMOVE_HEAD(&sess->sess_sndqueue, st_node);                                        TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
 
                                         if (store->st_subscr.sub_topic._base)
                                                 free(store->st_subscr.sub_topic._base);
                                         if (store->st_subscr.sub_value._base)
                                                 free(store->st_subscr.sub_value._base);
 
                                         free(store);                                          free(store);
                                 }                                  }
   
Line 217  thrSession(struct tagSession *sess) Line 232  thrSession(struct tagSession *sess)
                                         ioLIBERR(mqtt);                                          ioLIBERR(mqtt);
   
                                 locKill ^= locKill;                                  locKill ^= locKill;
   
                                   call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                                                   sess->sess_addr, sess->sess_user);
                                 continue;                                  continue;
                         case MQTT_TYPE_DISCONNECT:                          case MQTT_TYPE_DISCONNECT:
                                 ioDEBUG(5, "Exec DISCONNECT session");                                  ioDEBUG(5, "Exec DISCONNECT session");
Line 226  thrSession(struct tagSession *sess) Line 244  thrSession(struct tagSession *sess)
   
                                 finiSession(sess, 0);                                  finiSession(sess, 0);
                                 locKill ^= locKill;                                  locKill ^= locKill;
   
                                   call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                                                   sess->sess_addr, sess->sess_user);
                                 continue;                                  continue;
                         case MQTT_TYPE_PUBLISH:                          case MQTT_TYPE_PUBLISH:
                                ioDEBUG(5, "Work in progress ...");                                ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
                                 if (Publish(sess))
                                         locKill ^= locKill;
                                 break;                                  break;
                         case MQTT_TYPE_PUBREL:                          case MQTT_TYPE_PUBREL:
                                 break;                                  break;
Line 264  startSession(sched_task_t *task) Line 287  startSession(sched_task_t *task)
   
         assert(task);          assert(task);
   
         ioDEBUG(3, "task_Data=%p", TASK_DATA(task));  
         if (!TASK_DATA(task)) {          if (!TASK_DATA(task)) {
                 sess = initSession(TASK_FD(task), TASK_ARG(task));                  sess = initSession(TASK_FD(task), TASK_ARG(task));
                 if (!sess) {                  if (!sess) {
Line 282  startSession(sched_task_t *task) Line 304  startSession(sched_task_t *task)
         } else {          } else {
                 sess = TASK_DATA(task);                  sess = TASK_DATA(task);
                 buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);                  buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
                 ioDEBUG(3, "debug:: sock=%d s=%p sbuf=%p sbl=%d ret=%d\n", sess->sess_sock, sess->sess_buf, sess->sess_buf->msg_base, (int) sess->sess_buf->msg_len, (int) TASK_DATLEN(task));  
                 memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);                  memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
         }          }
   
Line 378  static void * Line 399  static void *
 thrSched(void *arg __unused)  thrSched(void *arg __unused)
 {  {
         struct tagSession *sess;          struct tagSession *sess;
           struct timespec pl = { 0, 10000000 };
   
         ioTRACE(1);          ioTRACE(1);
   
           schedPolling(root, &pl, NULL);
         schedRun(root, &Kill);          schedRun(root, &Kill);
   
         TAILQ_FOREACH(sess, &Sessions, sess_node)          TAILQ_FOREACH(sess, &Sessions, sess_node)

Removed from v.1.1.2.30  
changed lines
  Added in v.1.1.2.32


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>