Diff for /mqtt/src/daemon.c between versions 1.2.2.12 and 1.2.2.16

version 1.2.2.12, 2012/04/25 13:08:15 version 1.2.2.16, 2012/04/26 07:25:00
Line 38  initSession(int sock, ait_val_t * __restrict v) Line 38  initSession(int sock, ait_val_t * __restrict v)
         else          else
                 sess->sess_retry = strtol(str, NULL, 0);                  sess->sess_retry = strtol(str, NULL, 0);
   
         if (!(sess->sess_root = schedBegin())) {  
                 ioLIBERR(sched);  
                 free(sess);  
                 io_freeVar(v);  
                 return NULL;  
         }  
   
         sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);          sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);
         if (!sess->sess_buf) {          if (!sess->sess_buf) {
                 ioLIBERR(mqtt);                  ioLIBERR(mqtt);
                 schedEnd(&sess->sess_root);  
                 free(sess);                  free(sess);
                 io_freeVar(v);                  io_freeVar(v);
                 return NULL;                  return NULL;
Line 59  initSession(int sock, ait_val_t * __restrict v) Line 51  initSession(int sock, ait_val_t * __restrict v)
         if (!sess->sess_srv) {          if (!sess->sess_srv) {
                 ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());                  ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                 mqtt_msgFree(&sess->sess_buf, 42);                  mqtt_msgFree(&sess->sess_buf, 42);
                 schedEnd(&sess->sess_root);  
                 free(sess);                  free(sess);
                 io_freeVar(v);                  io_freeVar(v);
                 return NULL;                  return NULL;
Line 107  finiSession(struct tagSession *sess) Line 98  finiSession(struct tagSession *sess)
         SESS_ELEM_UNLOCK(sess);          SESS_ELEM_UNLOCK(sess);
         pthread_mutex_destroy(&sess->sess_mtx);          pthread_mutex_destroy(&sess->sess_mtx);
   
         schedEnd(&sess->sess_root);  
   
         if (sess->sess_will.msg)          if (sess->sess_will.msg)
                 free(sess->sess_will.msg);                  free(sess->sess_will.msg);
         if (sess->sess_will.topic)          if (sess->sess_will.topic)
Line 148  stopSession(struct tagSession *sess) Line 137  stopSession(struct tagSession *sess)
                         sess->sess_addr, sess->sess_user);                          sess->sess_addr, sess->sess_user);
 }  }
   
 static int  
 KASession(struct tagSession *sess)  
 {  
         mqtt_msg_t msg = { NULL, 0 };  
         int ret;  
         struct pollfd pfd;  
   
         ioTRACE(4);  
   
         assert(sess);  
   
         /* ping request */  
         ret = mqtt_msgPINGREQ(&msg);  
         if ((ret = send(sess->sess_sock, msg.msg_base, ret, MSG_NOSIGNAL)) == -1) {  
                 ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));  
                 return -1;  
         } else {  
                 ioDEBUG(5, "Sended %d bytes for ping request", ret);  
                 free(msg.msg_base);  
                 memset(&msg, 0, sizeof msg);  
         }  
   
         pfd.fd = sess->sess_sock;  
         pfd.events = POLLIN | POLLPRI;  
         if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||   
                         pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {  
                 ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));  
                 return -1;  
         } else if (!ret) {  
                 ioDEBUG(5, "Warning:: Session is abandoned ... must be disconnect!");  
                 return 1;  
         }  
         /* receive & decode packet */  
         if (recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0) == -1) {  
                 ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));  
                 return -1;  
         }  
         if (mqtt_readPINGRESP(sess->sess_buf)) {  
                 ioDEBUG(5, "Warning:: Session is broken, not hear ping response ... must be disconnect!");  
                 return 2;  
         }  
   
         /* Keep Alive is OK! */  
         return 0;  
 }  
   
 static void *  static void *
 thrSession(struct tagSession *sess)  thrSession(struct tagSession *sess)
 {  {
Line 212  thrSession(struct tagSession *sess) Line 155  thrSession(struct tagSession *sess)
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {                                  pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));                          ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
                         break;                          break;
                } else if (!ret && (ret = KASession(sess))) {                } else if (!ret && (ret = mqtt_KeepAlive(sess->sess_sock, sess->sess_ka, 1))) {
                         call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n",                           call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n", 
                                         sess->sess_cid, sess->sess_addr, sess->sess_user);                                          sess->sess_cid, sess->sess_addr, sess->sess_user);
                         break;                          break;
Line 230  thrSession(struct tagSession *sess) Line 173  thrSession(struct tagSession *sess)
                 /* dispatch message type */                  /* dispatch message type */
                 if (mqtt_srv_Dispatch(sess->sess_srv, sess))                  if (mqtt_srv_Dispatch(sess->sess_srv, sess))
                         ioLIBERR(mqtt);                          ioLIBERR(mqtt);
                 locKill ^= locKill;  
                 switch (hdr->mqtt_msg.type) {                  switch (hdr->mqtt_msg.type) {
                         case MQTT_TYPE_CONNECT:                          case MQTT_TYPE_CONNECT:
                                 ioDEBUG(5, "Exec CONNECT session");                                  ioDEBUG(5, "Exec CONNECT session");
Line 240  thrSession(struct tagSession *sess) Line 182  thrSession(struct tagSession *sess)
                                                 io_freeVar(v);                                                  io_freeVar(v);
                                 } else                                  } else
                                         ioLIBERR(mqtt);                                          ioLIBERR(mqtt);
                                continue;
                                 SESS_LOCK;
                                 TAILQ_REMOVE(&Sessions, sess, sess_node);
                                 SESS_UNLOCK;
 
                                 locKill ^= locKill;
                                 break;
                         case MQTT_TYPE_DISCONNECT:                          case MQTT_TYPE_DISCONNECT:
                                   /*
                                 ioDEBUG(5, "Exec DISCONNECT session");                                  ioDEBUG(5, "Exec DISCONNECT session");
                                   SESS_LOCK;
                                   TAILQ_REMOVE(&Sessions, sess, sess_node);
                                   SESS_UNLOCK;
                                   */
   
                                 finiSession(sess);                                  finiSession(sess);
                                locKill = 42;                                locKill ^= locKill;
                                 continue;                                  continue;
                         case MQTT_TYPE_PUBLISH:                          case MQTT_TYPE_PUBLISH:
                                 ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);                                  ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);

Removed from v.1.2.2.12  
changed lines
  Added in v.1.2.2.16


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>