Diff for /mqtt/src/daemon.c between versions 1.1.2.31 and 1.2.2.2

version 1.1.2.31, 2012/01/24 16:28:28 version 1.2.2.2, 2012/01/27 16:19:04
Line 2 Line 2
 #include "rtlm.h"  #include "rtlm.h"
 #include "utils.h"  #include "utils.h"
 #include "mqttd.h"  #include "mqttd.h"
   #include "mqttd_calls.h"
   
   
 static void *startSession(sched_task_t *task);  static void *startSession(sched_task_t *task);
Line 26  initSession(int sock, ait_val_t * __restrict v) Line 27  initSession(int sock, ait_val_t * __restrict v)
         } else          } else
                 memset(sess, 0, sizeof(struct tagSession));                  memset(sess, 0, sizeof(struct tagSession));
   
           TAILQ_INIT(&sess->sess_sndqueue);
   
         str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));          str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));
         if (!str)          if (!str)
                 sess->sess_retry = DEFAULT_RETRY;                  sess->sess_retry = DEFAULT_RETRY;
Line 40  initSession(int sock, ait_val_t * __restrict v) Line 43  initSession(int sock, ait_val_t * __restrict v)
                 return NULL;                  return NULL;
         }          }
   
           sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf);
           if (!sess->sess_srv) {
                   ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                   mqtt_msgFree(&sess->sess_buf, 42);
                   free(sess);
                   io_freeVar(v);
                   return NULL;
           }
   
         sess->sess_sock = sock;          sess->sess_sock = sock;
         strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);          strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
         io_freeVar(v);          io_freeVar(v);
Line 59  finiSession(struct tagSession *sess, int preservSock) Line 71  finiSession(struct tagSession *sess, int preservSock)
         if (call.FiniSessPUB)          if (call.FiniSessPUB)
                 call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");                  call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
   
        while ((store = SLIST_FIRST(&sess->sess_sndqueue))) {        while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
                SLIST_REMOVE_HEAD(&sess->sess_sndqueue, st_node);                TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
 
                 if (store->st_subscr.sub_topic.msg_base)
                         free(store->st_subscr.sub_topic.msg_base);
                 if (store->st_subscr.sub_value.msg_base)
                         free(store->st_subscr.sub_value.msg_base);
 
                 free(store);                  free(store);
         }          }
   
Line 72  finiSession(struct tagSession *sess, int preservSock) Line 90  finiSession(struct tagSession *sess, int preservSock)
         if (sess->sess_sock > STDERR_FILENO && !preservSock)          if (sess->sess_sock > STDERR_FILENO && !preservSock)
                 srv_Close(sess->sess_sock);                  srv_Close(sess->sess_sock);
   
           mqtt_srv_Fini(&sess->sess_srv);
         mqtt_msgFree(&sess->sess_buf, 42);          mqtt_msgFree(&sess->sess_buf, 42);
   
         free(sess);          free(sess);
Line 156  KASession(struct tagSession *sess) Line 175  KASession(struct tagSession *sess)
 static void *  static void *
 thrSession(struct tagSession *sess)  thrSession(struct tagSession *sess)
 {  {
         mqtt_msg_t msg = { NULL, 0 };  
         int ret, locKill = 42;          int ret, locKill = 42;
         struct pollfd pfd;          struct pollfd pfd;
         struct mqtthdr *hdr;          struct mqtthdr *hdr;
Line 199  thrSession(struct tagSession *sess) Line 217  thrSession(struct tagSession *sess)
                                 if (call.FiniSessPUB)                                  if (call.FiniSessPUB)
                                         call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");                                          call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
   
                                while ((store = SLIST_FIRST(&sess->sess_sndqueue))) {                                while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
                                        SLIST_REMOVE_HEAD(&sess->sess_sndqueue, st_node);                                        TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
 
                                         if (store->st_subscr.sub_topic.msg_base)
                                                 free(store->st_subscr.sub_topic.msg_base);
                                         if (store->st_subscr.sub_value.msg_base)
                                                 free(store->st_subscr.sub_value.msg_base);
 
                                         free(store);                                          free(store);
                                 }                                  }
   
Line 234  thrSession(struct tagSession *sess) Line 258  thrSession(struct tagSession *sess)
                                                 sess->sess_addr, sess->sess_user);                                                  sess->sess_addr, sess->sess_user);
                                 continue;                                  continue;
                         case MQTT_TYPE_PUBLISH:                          case MQTT_TYPE_PUBLISH:
                                ioDEBUG(5, "Work in progress ...");                                ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
                                 if (Publish(sess))
                                         locKill ^= locKill;
                                 break;                                  break;
                         case MQTT_TYPE_PUBREL:                          case MQTT_TYPE_PUBREL:
                                 break;                                  break;

Removed from v.1.1.2.31  
changed lines
  Added in v.1.2.2.2


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>