Diff for /mqtt/src/daemon.c between versions 1.2.2.36 and 1.2.2.40

version 1.2.2.36, 2012/06/19 13:55:23 version 1.2.2.40, 2012/06/29 08:47:28
Line 24  initSession(int sock, ait_val_t * __restrict v) Line 24  initSession(int sock, ait_val_t * __restrict v)
                 memset(sess, 0, sizeof(struct tagSession));                  memset(sess, 0, sizeof(struct tagSession));
   
         SLIST_INIT(&sess->sess_subscr);          SLIST_INIT(&sess->sess_subscr);
         SLIST_INIT(&sess->sess_sndpkt);  
   
         str = cfg_getAttribute(&cfg, "mqttd", "retry");          str = cfg_getAttribute(&cfg, "mqttd", "retry");
         if (!str)          if (!str)
Line 40  initSession(int sock, ait_val_t * __restrict v) Line 39  initSession(int sock, ait_val_t * __restrict v)
         }          }
   
         /* init server actor */          /* init server actor */
        sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf, sess->sess_ka);        sess->sess_srv = mqtt_srv_cliInit(sock, sess->sess_buf, sess->sess_ka, 1);
         if (!sess->sess_srv) {          if (!sess->sess_srv) {
                 ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());                  ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                 mqtt_msgFree(&sess->sess_buf, 42);                  mqtt_msgFree(&sess->sess_buf, 42);
Line 65  static void Line 64  static void
 finiSession(struct tagSession *sess)  finiSession(struct tagSession *sess)
 {  {
         struct tagStore *store;          struct tagStore *store;
         struct tagPkt *p;  
   
         ioTRACE(5);          ioTRACE(5);
   
Line 92  finiSession(struct tagSession *sess) Line 90  finiSession(struct tagSession *sess)
                 io_free(store);                  io_free(store);
         }          }
   
         while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {  
                 SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);  
   
                 io_freeVar(&p->pkt_data);  
                 io_free(p);  
         }  
   
         if (sess->sess_will.msg)          if (sess->sess_will.msg)
                 free(sess->sess_will.msg);                  free(sess->sess_will.msg);
         if (sess->sess_will.topic)          if (sess->sess_will.topic)
Line 107  finiSession(struct tagSession *sess) Line 98  finiSession(struct tagSession *sess)
         if (sess->sess_sock > STDERR_FILENO)          if (sess->sess_sock > STDERR_FILENO)
                 srv_Close(sess->sess_sock);                  srv_Close(sess->sess_sock);
   
        mqtt_srv_Fini(&sess->sess_srv);        mqtt_srv_cliFini(&sess->sess_srv);
         mqtt_msgFree(&sess->sess_buf, 42);          mqtt_msgFree(&sess->sess_buf, 42);
   
         io_free(sess);          io_free(sess);
Line 167  dispatchSession(sched_task_t *task) Line 158  dispatchSession(sched_task_t *task)
   
         do {          do {
                 /* dispatch message type */                  /* dispatch message type */
                if ((len = mqtt_srv_Dispatch(sess->sess_srv, ret, sess)) < 0) {                if ((len = mqtt_srv_cliDispatch(sess->sess_srv, ret, sess)) < 0) {
                         if (len == -1) {                          if (len == -1) {
                                 ioLIBERR(mqtt);                                  ioLIBERR(mqtt);
                                 finiSession(sess);                                  finiSession(sess);
Line 284  startSession(sched_task_t *task) Line 275  startSession(sched_task_t *task)
                 if (call.WipePUB_topic)                  if (call.WipePUB_topic)
                         call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);                          call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
         } else {          } else {
                // todo: read_sql subs and prepare publish                // TODO: read_sql subs and prepare publish
         }          }
   
         /* Start session task OK ... */          /* Start session task OK ... */
Line 325  acceptClient(sched_task_t *task) Line 316  acceptClient(sched_task_t *task)
   
         if ((cli = accept(TASK_FD(task), &sa.sa, &sslen)) == -1)          if ((cli = accept(TASK_FD(task), &sa.sa, &sslen)) == -1)
                 goto end;                  goto end;
         else  
                 fcntl(cli, F_SETFL, fcntl(cli, F_GETFL, 0) | O_NONBLOCK);  
   
         v = io_allocVar();          v = io_allocVar();
         if (!v) {          if (!v) {
Line 362  Run(int sock) Line 351  Run(int sock)
   
         ioTRACE(1);          ioTRACE(1);
   
        if (listen(sock, SOMAXCONN) == -1) {        if (mqtt_srv_Listen(sock, 0, 1) == -1) {
                ioSYSERR(0);                ioLIBERR(mqtt);
                 return -1;                  return -1;
        } else        }
                fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK); 
   
         /* state machine - accept new connections */          /* state machine - accept new connections */
         if (!schedRead(root, acceptClient, NULL, sock, NULL, 0)) {          if (!schedRead(root, acceptClient, NULL, sock, NULL, 0)) {

Removed from v.1.2.2.36  
changed lines
  Added in v.1.2.2.40


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>