Diff for /mqtt/src/daemon.c between versions 1.2.2.10 and 1.2.2.11

version 1.2.2.10, 2012/04/25 07:37:16 version 1.2.2.11, 2012/04/25 12:04:30
Line 230  thrSession(struct tagSession *sess) Line 230  thrSession(struct tagSession *sess)
                 /* dispatch message type */                  /* dispatch message type */
                 if (mqtt_srv_Dispatch(sess->sess_srv, sess))                  if (mqtt_srv_Dispatch(sess->sess_srv, sess))
                         ioLIBERR(mqtt);                          ioLIBERR(mqtt);
                   locKill ^= locKill;
                 switch (hdr->mqtt_msg.type) {                  switch (hdr->mqtt_msg.type) {
                         case MQTT_TYPE_CONNECT:                          case MQTT_TYPE_CONNECT:
                                 ioDEBUG(5, "Exec CONNECT session");                                  ioDEBUG(5, "Exec CONNECT session");
Line 239  thrSession(struct tagSession *sess) Line 240  thrSession(struct tagSession *sess)
                                                 io_freeVar(v);                                                  io_freeVar(v);
                                 } else                                  } else
                                         ioLIBERR(mqtt);                                          ioLIBERR(mqtt);
                                 locKill ^= locKill;  
                                 continue;                                  continue;
                         case MQTT_TYPE_DISCONNECT:                          case MQTT_TYPE_DISCONNECT:
                                 ioDEBUG(5, "Exec DISCONNECT session");                                  ioDEBUG(5, "Exec DISCONNECT session");
                                 finiSession(sess);                                  finiSession(sess);
                                locKill ^= locKill;                                locKill = 42;
                                 continue;                                  continue;
                         case MQTT_TYPE_PUBLISH:                          case MQTT_TYPE_PUBLISH:
                                 ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);                                  ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
Line 262  thrSession(struct tagSession *sess) Line 262  thrSession(struct tagSession *sess)
                         case MQTT_TYPE_PINGREQ:                          case MQTT_TYPE_PINGREQ:
                                 ioDEBUG(5, "Exec PINGREQ session");                                  ioDEBUG(5, "Exec PINGREQ session");
                                 break;                                  break;
                           case MQTT_TYPE_PINGRESP:
                                   ioDEBUG(5, "Exec PINGRESP session");
                                   break;
                         default:                          default:
                                 ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",                                   ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", 
                                                 sess->sess_cid, hdr->mqtt_msg.type);                                                  sess->sess_cid, hdr->mqtt_msg.type);
Line 271  thrSession(struct tagSession *sess) Line 274  thrSession(struct tagSession *sess)
   
         pthread_cleanup_pop(locKill);          pthread_cleanup_pop(locKill);
         pthread_exit(NULL);          pthread_exit(NULL);
           return NULL;
 }  }
   
 static void *  static void *
Line 286  startSession(sched_task_t *task) Line 290  startSession(sched_task_t *task)
         ioTRACE(4);          ioTRACE(4);
   
         assert(task);          assert(task);
           printf("aaaaaaaaaaaaaaaaa\n");
           fflush(stdout);
   
         if (!TASK_DATA(task)) {          if (!TASK_DATA(task)) {
                 /* flow from accept new clients */                  /* flow from accept new clients */
Line 323  startSession(sched_task_t *task) Line 329  startSession(sched_task_t *task)
                 sess->sess_will.flag = flg.will_flg;                  sess->sess_will.flag = flg.will_flg;
         }          }
   
           printf("sql=%p\n", acc);
           fflush(stdout);
         /* check online table for user */          /* check online table for user */
         if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {          if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
                 ioDEBUG(0, "Login:: DENIED for username %s and password %s",                   ioDEBUG(0, "Login:: DENIED for username %s and password %s", 
Line 333  startSession(sched_task_t *task) Line 341  startSession(sched_task_t *task)
                 ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);                  ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
                 ret = MQTT_RETCODE_ACCEPTED;                  ret = MQTT_RETCODE_ACCEPTED;
         }          }
           printf(".sql=%p\n", pub);
           fflush(stdout);
         if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {          if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
                 ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);                  ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
                 TAILQ_FOREACH(s, &Sessions, sess_node)                  TAILQ_FOREACH(s, &Sessions, sess_node)
Line 342  startSession(sched_task_t *task) Line 352  startSession(sched_task_t *task)
                                 break;                                  break;
                         }                          }
         }          }
           printf("...sql=%p\n", pub);
           fflush(stdout);
         if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr,           if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, 
                                 sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg,                                   sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, 
                                 sess->sess_will.qos, sess->sess_will.retain) == -1) {                                  sess->sess_will.qos, sess->sess_will.retain) == -1) {
Line 368  startSession(sched_task_t *task) Line 380  startSession(sched_task_t *task)
         /* Start session thread OK ... */          /* Start session thread OK ... */
         SESS_LOCK;          SESS_LOCK;
         TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);          TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
        pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);        pthread_create(&sess->sess_tid, NULL, (void*(*)(void*)) thrSession, sess);
         pthread_detach(sess->sess_tid);
         SESS_UNLOCK;          SESS_UNLOCK;
   
         call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,           call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, 
Line 425  acceptClient(sched_task_t *task) Line 438  acceptClient(sched_task_t *task)
                 ioDEBUG(1, "Terminated client with socket=%d", cli);                  ioDEBUG(1, "Terminated client with socket=%d", cli);
         }          }
 end:  end:
        schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0);        if (!schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0))
                 ioLIBERR(sched);
         return NULL;          return NULL;
 }  }
   

Removed from v.1.2.2.10  
changed lines
  Added in v.1.2.2.11


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>