Diff for /mqtt/src/daemon.c between versions 1.2.2.35 and 1.2.2.41

version 1.2.2.35, 2012/06/01 10:39:48 version 1.2.2.41, 2012/06/29 13:46:37
Line 24  initSession(int sock, ait_val_t * __restrict v) Line 24  initSession(int sock, ait_val_t * __restrict v)
                 memset(sess, 0, sizeof(struct tagSession));                  memset(sess, 0, sizeof(struct tagSession));
   
         SLIST_INIT(&sess->sess_subscr);          SLIST_INIT(&sess->sess_subscr);
         SLIST_INIT(&sess->sess_sndpkt);  
   
         str = cfg_getAttribute(&cfg, "mqttd", "retry");          str = cfg_getAttribute(&cfg, "mqttd", "retry");
         if (!str)          if (!str)
Line 40  initSession(int sock, ait_val_t * __restrict v) Line 39  initSession(int sock, ait_val_t * __restrict v)
         }          }
   
         /* init server actor */          /* init server actor */
        sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf, sess->sess_ka);        sess->sess_srv = mqtt_srv_cliInit(sock, sess->sess_buf, sess->sess_ka, 1);
         if (!sess->sess_srv) {          if (!sess->sess_srv) {
                 ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());                  ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                 mqtt_msgFree(&sess->sess_buf, 42);                  mqtt_msgFree(&sess->sess_buf, 42);
Line 65  static void Line 64  static void
 finiSession(struct tagSession *sess)  finiSession(struct tagSession *sess)
 {  {
         struct tagStore *store;          struct tagStore *store;
         struct tagPkt *p;  
   
         ioTRACE(5);          ioTRACE(5);
   
Line 92  finiSession(struct tagSession *sess) Line 90  finiSession(struct tagSession *sess)
                 io_free(store);                  io_free(store);
         }          }
   
         while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {  
                 SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);  
   
                 io_freeVar(&p->pkt_data);  
                 io_free(p);  
         }  
   
         if (sess->sess_will.msg)          if (sess->sess_will.msg)
                 free(sess->sess_will.msg);                  free(sess->sess_will.msg);
         if (sess->sess_will.topic)          if (sess->sess_will.topic)
Line 107  finiSession(struct tagSession *sess) Line 98  finiSession(struct tagSession *sess)
         if (sess->sess_sock > STDERR_FILENO)          if (sess->sess_sock > STDERR_FILENO)
                 srv_Close(sess->sess_sock);                  srv_Close(sess->sess_sock);
   
        mqtt_srv_Fini(&sess->sess_srv);        mqtt_srv_cliFini(&sess->sess_srv);
         mqtt_msgFree(&sess->sess_buf, 42);          mqtt_msgFree(&sess->sess_buf, 42);
   
         io_free(sess);          io_free(sess);
Line 167  dispatchSession(sched_task_t *task) Line 158  dispatchSession(sched_task_t *task)
   
         do {          do {
                 /* dispatch message type */                  /* dispatch message type */
                if ((len = mqtt_srv_Dispatch(sess->sess_srv, ret, sess)) < 0) {                if ((len = mqtt_srv_cliDispatch(sess->sess_srv, ret, sess)) < 0) {
                         if (len == -1) {                          if (len == -1) {
                                 ioLIBERR(mqtt);                                  ioLIBERR(mqtt);
                                 finiSession(sess);                                  finiSession(sess);
Line 188  dispatchSession(sched_task_t *task) Line 179  dispatchSession(sched_task_t *task)
         return NULL;          return NULL;
 }  }
   
   static int
   loadSubscribes(struct tagSession * __restrict sess, mqtt_subscr_t * __restrict subs)
   {
           register int i;
           struct tagStore *store;
   
           if (!subs)
                   return -1;
   
           for (i = 0; subs[i].sub_topic.msg_base; i++) {
                   store = io_malloc(sizeof(struct tagStore));
                   if (!store) {
                           ioSYSERR(0);
                           continue;
                   } else {
                           store->st_msgid = 0;
                           mqtt_subCopy(&store->st_subscr, &subs[i]);
                   }
   
                   /* add to cache */
                   SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
           }
   
           return 0;
   }
   
 void *  void *
 startSession(sched_task_t *task)  startSession(sched_task_t *task)
 {  {
Line 198  startSession(sched_task_t *task) Line 215  startSession(sched_task_t *task)
         ait_val_t *v;          ait_val_t *v;
         struct tagSession *s, *sess = NULL;          struct tagSession *s, *sess = NULL;
         int ret, wlen;          int ret, wlen;
           mqtt_subscr_t *subs;
   
         ioTRACE(4);          ioTRACE(4);
   
Line 261  startSession(sched_task_t *task) Line 279  startSession(sched_task_t *task)
                 TAILQ_FOREACH(s, &Sessions, sess_node)                  TAILQ_FOREACH(s, &Sessions, sess_node)
                         if (!strcmp(s->sess_cid, sess->sess_cid)) {                          if (!strcmp(s->sess_cid, sess->sess_cid)) {
                                 /* found stale session & disconnect it! */                                  /* found stale session & disconnect it! */
                                schedWrite(root, leaveClient, sess, TASK_FD(task), NULL, 0);                                schedCancelby(root, taskMAX, CRITERIA_FD, (void*) s->sess_sock, NULL);
                                 schedWrite(root, leaveClient, s, s->sess_sock, NULL, 0);
                                 break;                                  break;
                         }                          }
         }          }
Line 282  startSession(sched_task_t *task) Line 301  startSession(sched_task_t *task)
                         call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");                          call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                 if (call.WipePUB_topic)                  if (call.WipePUB_topic)
                         call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);                          call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
        } else {        } else if (call.ReadPUB_subscribe) {
                // todo: read_sql subs and prepare publish                subs = call.ReadPUB_subscribe(&cfg, pub, sess->sess_cid, "%");
                 loadSubscribes(sess, subs);
                 mqtt_subFree(&subs);
         }          }
   
         /* Start session task OK ... */          /* Start session task OK ... */
Line 324  acceptClient(sched_task_t *task) Line 345  acceptClient(sched_task_t *task)
   
         if ((cli = accept(TASK_FD(task), &sa.sa, &sslen)) == -1)          if ((cli = accept(TASK_FD(task), &sa.sa, &sslen)) == -1)
                 goto end;                  goto end;
         else  
                 fcntl(cli, F_SETFL, fcntl(cli, F_GETFL, 0) | O_NONBLOCK);  
   
         v = io_allocVar();          v = io_allocVar();
         if (!v) {          if (!v) {
Line 361  Run(int sock) Line 380  Run(int sock)
   
         ioTRACE(1);          ioTRACE(1);
   
        if (listen(sock, SOMAXCONN) == -1) {        if (mqtt_srv_Listen(sock, 0, 1) == -1) {
                ioSYSERR(0);                ioLIBERR(mqtt);
                 return -1;                  return -1;
        } else        }
                fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK); 
   
         /* state machine - accept new connections */          /* state machine - accept new connections */
         if (!schedRead(root, acceptClient, NULL, sock, NULL, 0)) {          if (!schedRead(root, acceptClient, NULL, sock, NULL, 0)) {

Removed from v.1.2.2.35  
changed lines
  Added in v.1.2.2.41


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>