Diff for /mqtt/src/daemon.c between versions 1.2.2.5 and 1.2.2.6

version 1.2.2.5, 2012/04/11 15:08:27 version 1.2.2.6, 2012/04/11 16:37:17
Line 29  initSession(int sock, ait_val_t * __restrict v) Line 29  initSession(int sock, ait_val_t * __restrict v)
   
         pthread_mutex_init(&sess->sess_mtx, NULL);          pthread_mutex_init(&sess->sess_mtx, NULL);
   
        TAILQ_INIT(&sess->sess_sndqueue);        SLIST_INIT(&sess->sess_txque[MQTT_QOS_ONCE]);
         SLIST_INIT(&sess->sess_txque[MQTT_QOS_ACK]);
         SLIST_INIT(&sess->sess_txque[MQTT_QOS_EXACTLY]);
   
         str = cfg_getAttribute(&cfg, "mqttd", "retry");          str = cfg_getAttribute(&cfg, "mqttd", "retry");
         if (!str)          if (!str)
Line 72  static void Line 74  static void
 finiSession(struct tagSession *sess, int preservSock)  finiSession(struct tagSession *sess, int preservSock)
 {  {
         struct tagStore *store;          struct tagStore *store;
           register int i;
   
         ioTRACE(5);          ioTRACE(5);
   
Line 81  finiSession(struct tagSession *sess, int preservSock) Line 84  finiSession(struct tagSession *sess, int preservSock)
         if (call.FiniSessPUB)          if (call.FiniSessPUB)
                 call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");                  call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
   
        pthread_mutex_lock(&sess->sess_mtx);        SESS_ELEM_LOCK(sess);
        while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {        for (i = 0; i < MQTT_QOS_RESERVED; i++)
                TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);                while ((store = SLIST_FIRST(&sess->sess_txque[i]))) {
                         SLIST_REMOVE_HEAD(&sess->sess_txque[i], st_node);
   
                if (store->st_subscr.sub_topic.msg_base)                        if (store->st_subscr.sub_topic.msg_base)
                        free(store->st_subscr.sub_topic.msg_base);                                free(store->st_subscr.sub_topic.msg_base);
                if (store->st_subscr.sub_value.msg_base)                        if (store->st_subscr.sub_value.msg_base)
                        free(store->st_subscr.sub_value.msg_base);                                free(store->st_subscr.sub_value.msg_base);
   
                free(store);                        free(store);
        }                }
        pthread_mutex_unlock(&sess->sess_mtx);        SESS_ELEM_UNLOCK(sess);
         pthread_mutex_destroy(&sess->sess_mtx);          pthread_mutex_destroy(&sess->sess_mtx);
   
         if (sess->sess_will.msg)          if (sess->sess_will.msg)
Line 119  stopSession(struct tagSession *sess) Line 123  stopSession(struct tagSession *sess)
   
         assert(sess);          assert(sess);
   
        pthread_mutex_lock(&mtx_sess);        SESS_LOCK;
         TAILQ_REMOVE(&Sessions, sess, sess_node);          TAILQ_REMOVE(&Sessions, sess, sess_node);
        pthread_mutex_unlock(&mtx_sess);        SESS_UNLOCK;
   
         ret = mqtt_msgDISCONNECT(&msg);          ret = mqtt_msgDISCONNECT(&msg);
         if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)          if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)
Line 360  startSession(sched_task_t *task) Line 364  startSession(sched_task_t *task)
         pthread_attr_init(&attr);          pthread_attr_init(&attr);
         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);          pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
   
        pthread_mutex_lock(&mtx_sess);        SESS_LOCK;
         TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);          TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
         pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);          pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
        pthread_mutex_unlock(&mtx_sess);        SESS_UNLOCK;
   
         call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,           call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, 
                         sess->sess_addr, sess->sess_user, sess->sess_ka);                          sess->sess_addr, sess->sess_user, sess->sess_ka);

Removed from v.1.2.2.5  
changed lines
  Added in v.1.2.2.6


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>