Diff for /mqtt/src/daemon.c between versions 1.2.2.9 and 1.2.2.13

version 1.2.2.9, 2012/04/24 13:54:50 version 1.2.2.13, 2012/04/25 13:33:53
Line 6 Line 6
   
   
 static void *startSession(sched_task_t *task);  static void *startSession(sched_task_t *task);
   static pthread_attr_t attr;
   
   
 static inline struct tagSession *  static inline struct tagSession *
Line 238  thrSession(struct tagSession *sess) Line 239  thrSession(struct tagSession *sess)
                                                 io_freeVar(v);                                                  io_freeVar(v);
                                 } else                                  } else
                                         ioLIBERR(mqtt);                                          ioLIBERR(mqtt);
   
                                   SESS_LOCK;
                                   TAILQ_REMOVE(&Sessions, sess, sess_node);
                                   SESS_UNLOCK;
   
                                 locKill ^= locKill;                                  locKill ^= locKill;
                                continue;                                break;
                         case MQTT_TYPE_DISCONNECT:                          case MQTT_TYPE_DISCONNECT:
                                 ioDEBUG(5, "Exec DISCONNECT session");                                  ioDEBUG(5, "Exec DISCONNECT session");
                                 finiSession(sess);                                  finiSession(sess);
   
                                   SESS_LOCK;
                                   TAILQ_REMOVE(&Sessions, sess, sess_node);
                                   SESS_UNLOCK;
   
                                 locKill ^= locKill;                                  locKill ^= locKill;
                                 continue;                                  continue;
                         case MQTT_TYPE_PUBLISH:                          case MQTT_TYPE_PUBLISH:
Line 261  thrSession(struct tagSession *sess) Line 272  thrSession(struct tagSession *sess)
                         case MQTT_TYPE_PINGREQ:                          case MQTT_TYPE_PINGREQ:
                                 ioDEBUG(5, "Exec PINGREQ session");                                  ioDEBUG(5, "Exec PINGREQ session");
                                 break;                                  break;
                           case MQTT_TYPE_PINGRESP:
                                   ioDEBUG(5, "Exec PINGRESP session");
                                   break;
                         default:                          default:
                                 ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",                                   ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", 
                                                 sess->sess_cid, hdr->mqtt_msg.type);                                                  sess->sess_cid, hdr->mqtt_msg.type);
Line 270  thrSession(struct tagSession *sess) Line 284  thrSession(struct tagSession *sess)
   
         pthread_cleanup_pop(locKill);          pthread_cleanup_pop(locKill);
         pthread_exit(NULL);          pthread_exit(NULL);
           return NULL;
 }  }
   
 static void *  static void *
Line 281  startSession(sched_task_t *task) Line 296  startSession(sched_task_t *task)
         mqtthdr_connack_t cack;          mqtthdr_connack_t cack;
         struct tagSession *s, *sess = NULL;          struct tagSession *s, *sess = NULL;
         int ret;          int ret;
         pthread_attr_t attr;  
   
         ioTRACE(4);          ioTRACE(4);
   
Line 333  startSession(sched_task_t *task) Line 347  startSession(sched_task_t *task)
                 ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);                  ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
                 ret = MQTT_RETCODE_ACCEPTED;                  ret = MQTT_RETCODE_ACCEPTED;
         }          }
   
         if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {          if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
                 ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);                  ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
                 TAILQ_FOREACH(s, &Sessions, sess_node)                  TAILQ_FOREACH(s, &Sessions, sess_node)
Line 342  startSession(sched_task_t *task) Line 357  startSession(sched_task_t *task)
                                 break;                                  break;
                         }                          }
         }          }
   
         if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr,           if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, 
                                 sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg,                                   sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, 
                                 sess->sess_will.qos, sess->sess_will.retain) == -1) {                                  sess->sess_will.qos, sess->sess_will.retain) == -1) {
Line 366  startSession(sched_task_t *task) Line 382  startSession(sched_task_t *task)
         }          }
   
         /* Start session thread OK ... */          /* Start session thread OK ... */
         pthread_attr_init(&attr);  
         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);  
   
         SESS_LOCK;          SESS_LOCK;
         TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);          TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
         pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);          pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
Line 376  startSession(sched_task_t *task) Line 389  startSession(sched_task_t *task)
   
         call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,           call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, 
                         sess->sess_addr, sess->sess_user, sess->sess_ka);                          sess->sess_addr, sess->sess_user, sess->sess_ka);
   
         pthread_attr_destroy(&attr);  
         return NULL;          return NULL;
 end:    /* close client connection */  end:    /* close client connection */
         ret = mqtt_msgCONNACK(&msg, ret);          ret = mqtt_msgCONNACK(&msg, ret);
Line 430  acceptClient(sched_task_t *task) Line 441  acceptClient(sched_task_t *task)
                 ioDEBUG(1, "Terminated client with socket=%d", cli);                  ioDEBUG(1, "Terminated client with socket=%d", cli);
         }          }
 end:  end:
        schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0);        if (!schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0))
                 ioLIBERR(sched);
         return NULL;          return NULL;
 }  }
   
Line 456  Run(int sock) Line 468  Run(int sock)
                 return -1;                  return -1;
         }          }
   
           pthread_attr_init(&attr);
           pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
   
         schedPolling(root, &pl, NULL);          schedPolling(root, &pl, NULL);
         schedRun(root, &Kill);          schedRun(root, &Kill);
   
           pthread_attr_destroy(&attr);
   
         /* free all undeleted elements into lists */          /* free all undeleted elements into lists */
         PUBS_LOCK;          PUBS_LOCK;

Removed from v.1.2.2.9  
changed lines
  Added in v.1.2.2.13


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>