Diff for /mqtt/src/daemon.c between versions 1.2.2.3 and 1.2.2.7

version 1.2.2.3, 2012/01/30 08:29:55 version 1.2.2.7, 2012/04/15 22:06:10
Line 27  initSession(int sock, ait_val_t * __restrict v) Line 27  initSession(int sock, ait_val_t * __restrict v)
         } else          } else
                 memset(sess, 0, sizeof(struct tagSession));                  memset(sess, 0, sizeof(struct tagSession));
   
        TAILQ_INIT(&sess->sess_sndqueue);        pthread_mutex_init(&sess->sess_mtx, NULL);
   
        str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));        SLIST_INIT(&sess->sess_txque[MQTT_QOS_ONCE]);
         SLIST_INIT(&sess->sess_txque[MQTT_QOS_ACK]);
         SLIST_INIT(&sess->sess_txque[MQTT_QOS_EXACTLY]);
 
         str = cfg_getAttribute(&cfg, "mqttd", "retry");
         if (!str)          if (!str)
                 sess->sess_retry = DEFAULT_RETRY;                  sess->sess_retry = DEFAULT_RETRY;
         else          else
Line 70  static void Line 74  static void
 finiSession(struct tagSession *sess, int preservSock)  finiSession(struct tagSession *sess, int preservSock)
 {  {
         struct tagStore *store;          struct tagStore *store;
           register int i;
   
         ioTRACE(5);          ioTRACE(5);
   
Line 79  finiSession(struct tagSession *sess, int preservSock) Line 84  finiSession(struct tagSession *sess, int preservSock)
         if (call.FiniSessPUB)          if (call.FiniSessPUB)
                 call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");                  call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
   
        while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {        SESS_ELEM_LOCK(sess);
                TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);        for (i = 0; i < MQTT_QOS_RESERVED; i++)
                 while ((store = SLIST_FIRST(&sess->sess_txque[i]))) {
                         SLIST_REMOVE_HEAD(&sess->sess_txque[i], st_node);
   
                if (store->st_subscr.sub_topic.msg_base)                        if (store->st_subscr.sub_topic.msg_base)
                        free(store->st_subscr.sub_topic.msg_base);                                free(store->st_subscr.sub_topic.msg_base);
                if (store->st_subscr.sub_value.msg_base)                        if (store->st_subscr.sub_value.msg_base)
                        free(store->st_subscr.sub_value.msg_base);                                free(store->st_subscr.sub_value.msg_base);
   
                free(store);                        free(store);
        }                }
         SESS_ELEM_UNLOCK(sess);
         pthread_mutex_destroy(&sess->sess_mtx);
   
         if (sess->sess_will.msg)          if (sess->sess_will.msg)
                 free(sess->sess_will.msg);                  free(sess->sess_will.msg);
Line 114  stopSession(struct tagSession *sess) Line 123  stopSession(struct tagSession *sess)
   
         assert(sess);          assert(sess);
   
        pthread_mutex_lock(&mtx_sess);        SESS_LOCK;
         TAILQ_REMOVE(&Sessions, sess, sess_node);          TAILQ_REMOVE(&Sessions, sess, sess_node);
        pthread_mutex_unlock(&mtx_sess);        SESS_UNLOCK;
   
         ret = mqtt_msgDISCONNECT(&msg);          ret = mqtt_msgDISCONNECT(&msg);
         if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)          if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)
Line 246  thrSession(struct tagSession *sess) Line 255  thrSession(struct tagSession *sess)
                         case MQTT_TYPE_UNSUBSCRIBE:                          case MQTT_TYPE_UNSUBSCRIBE:
                                 break;                                  break;
                         case MQTT_TYPE_PINGREQ:                          case MQTT_TYPE_PINGREQ:
                                   ioDEBUG(5, "Exec PINGREQ session");
                                 break;                                  break;
                         default:                          default:
                                 ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",                                   ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", 
Line 354  startSession(sched_task_t *task) Line 364  startSession(sched_task_t *task)
         pthread_attr_init(&attr);          pthread_attr_init(&attr);
         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);          pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
   
        pthread_mutex_lock(&mtx_sess);        SESS_LOCK;
         TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);          TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
         pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);          pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
        pthread_mutex_unlock(&mtx_sess);        SESS_UNLOCK;
   
         call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,           call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, 
                         sess->sess_addr, sess->sess_user, sess->sess_ka);                          sess->sess_addr, sess->sess_user, sess->sess_ka);
Line 411  Run(int sock) Line 421  Run(int sock)
   
         ioTRACE(1);          ioTRACE(1);
   
           /* start alternative thread for scheduler */
         if (pthread_create(&tid, NULL, thrSched, NULL)) {          if (pthread_create(&tid, NULL, thrSched, NULL)) {
                 ioSYSERR(0);                  ioSYSERR(0);
                 return -1;                  return -1;
Line 432  Run(int sock) Line 443  Run(int sock)
                 }                  }
                 v = io_allocVar();                  v = io_allocVar();
                 if (!v) {                  if (!v) {
                        ioLIBERR(mqtt);                        ioLIBERR(io);
                         break;                          break;
                 } else {                  } else {
                         memset(str, 0, sizeof str);                          memset(str, 0, sizeof str);
                         snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));                          snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
                           AIT_FREE_VAL(v);
                         AIT_SET_STR(v, str);                          AIT_SET_STR(v, str);
                 }                  }
                 ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));                  ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));

Removed from v.1.2.2.3  
changed lines
  Added in v.1.2.2.7


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>