Diff for /mqtt/src/daemon.c between versions 1.1.2.30 and 1.2.2.8

version 1.1.2.30, 2012/01/24 10:18:45 version 1.2.2.8, 2012/04/15 23:45:34
Line 2 Line 2
 #include "rtlm.h"  #include "rtlm.h"
 #include "utils.h"  #include "utils.h"
 #include "mqttd.h"  #include "mqttd.h"
   #include "mqttd_calls.h"
   
   
 static void *startSession(sched_task_t *task);  static void *startSession(sched_task_t *task);
Line 26  initSession(int sock, ait_val_t * __restrict v) Line 27  initSession(int sock, ait_val_t * __restrict v)
         } else          } else
                 memset(sess, 0, sizeof(struct tagSession));                  memset(sess, 0, sizeof(struct tagSession));
   
        str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));        pthread_mutex_init(&sess->sess_mtx, NULL);
 
         SLIST_INIT(&sess->sess_txque[MQTT_QOS_ONCE]);
         SLIST_INIT(&sess->sess_txque[MQTT_QOS_ACK]);
         SLIST_INIT(&sess->sess_txque[MQTT_QOS_EXACTLY]);
 
         str = cfg_getAttribute(&cfg, "mqttd", "retry");
         if (!str)          if (!str)
                 sess->sess_retry = DEFAULT_RETRY;                  sess->sess_retry = DEFAULT_RETRY;
         else          else
Line 40  initSession(int sock, ait_val_t * __restrict v) Line 47  initSession(int sock, ait_val_t * __restrict v)
                 return NULL;                  return NULL;
         }          }
   
           /* init server actor */
           sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf);
           if (!sess->sess_srv) {
                   ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                   mqtt_msgFree(&sess->sess_buf, 42);
                   free(sess);
                   io_freeVar(v);
                   return NULL;
           } else {
                   mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_CONNECT, cmdCONNECT);
                   mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBLISH, cmdPUBLISH);
                   mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBREL, cmdPUBREL);
                   mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_SUBSCRIBE, cmdSUBSCRIBE);
                   mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_UNSUBSCRIBE, cmdUNSUBSCRIBE);
                   mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PINGREQ, cmdPINGREQ);
                   mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_DISCONNECT, cmdDISCONNECT);
           }
   
         sess->sess_sock = sock;          sess->sess_sock = sock;
         strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);          strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
         io_freeVar(v);          io_freeVar(v);
Line 50  static void Line 75  static void
 finiSession(struct tagSession *sess, int preservSock)  finiSession(struct tagSession *sess, int preservSock)
 {  {
         struct tagStore *store;          struct tagStore *store;
           register int i;
   
         ioTRACE(5);          ioTRACE(5);
   
Line 59  finiSession(struct tagSession *sess, int preservSock) Line 85  finiSession(struct tagSession *sess, int preservSock)
         if (call.FiniSessPUB)          if (call.FiniSessPUB)
                 call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");                  call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
   
        while ((store = SLIST_FIRST(&sess->sess_sndqueue))) {        SESS_ELEM_LOCK(sess);
                SLIST_REMOVE_HEAD(&sess->sess_sndqueue, st_node);        for (i = 0; i < MQTT_QOS_RESERVED; i++)
                free(store);                while ((store = SLIST_FIRST(&sess->sess_txque[i]))) {
        }                        SLIST_REMOVE_HEAD(&sess->sess_txque[i], st_node);
   
                           if (store->st_subscr.sub_topic.msg_base)
                                   free(store->st_subscr.sub_topic.msg_base);
                           if (store->st_subscr.sub_value.msg_base)
                                   free(store->st_subscr.sub_value.msg_base);
   
                           free(store);
                   }
           SESS_ELEM_UNLOCK(sess);
           pthread_mutex_destroy(&sess->sess_mtx);
   
         if (sess->sess_will.msg)          if (sess->sess_will.msg)
                 free(sess->sess_will.msg);                  free(sess->sess_will.msg);
         if (sess->sess_will.topic)          if (sess->sess_will.topic)
Line 72  finiSession(struct tagSession *sess, int preservSock) Line 108  finiSession(struct tagSession *sess, int preservSock)
         if (sess->sess_sock > STDERR_FILENO && !preservSock)          if (sess->sess_sock > STDERR_FILENO && !preservSock)
                 srv_Close(sess->sess_sock);                  srv_Close(sess->sess_sock);
   
           mqtt_srv_Fini(&sess->sess_srv);
         mqtt_msgFree(&sess->sess_buf, 42);          mqtt_msgFree(&sess->sess_buf, 42);
   
         free(sess);          free(sess);
Line 87  stopSession(struct tagSession *sess) Line 124  stopSession(struct tagSession *sess)
   
         assert(sess);          assert(sess);
   
        pthread_mutex_lock(&mtx_sess);        SESS_LOCK;
         TAILQ_REMOVE(&Sessions, sess, sess_node);          TAILQ_REMOVE(&Sessions, sess, sess_node);
        pthread_mutex_unlock(&mtx_sess);        SESS_UNLOCK;
   
         ret = mqtt_msgDISCONNECT(&msg);          ret = mqtt_msgDISCONNECT(&msg);
         if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)          if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)
Line 156  KASession(struct tagSession *sess) Line 193  KASession(struct tagSession *sess)
 static void *  static void *
 thrSession(struct tagSession *sess)  thrSession(struct tagSession *sess)
 {  {
         mqtt_msg_t msg = { NULL, 0 };  
         int ret, locKill = 42;          int ret, locKill = 42;
         struct pollfd pfd;          struct pollfd pfd;
         struct mqtthdr *hdr;          struct mqtthdr *hdr;
         ait_val_t *v;          ait_val_t *v;
         struct tagStore *store;  
   
         pthread_cleanup_push((void(*)(void*)) stopSession, sess);          pthread_cleanup_push((void(*)(void*)) stopSession, sess);
         ioTRACE(2);          ioTRACE(2);
Line 189  thrSession(struct tagSession *sess) Line 224  thrSession(struct tagSession *sess)
                         hdr = (struct mqtthdr*) sess->sess_buf->msg_base;                          hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
   
                 /* dispatch message type */                  /* dispatch message type */
                   if (mqtt_srv_Dispatch(sess->sess_srv, sess))
                           ioLIBERR(mqtt);
                 switch (hdr->mqtt_msg.type) {                  switch (hdr->mqtt_msg.type) {
                         case MQTT_TYPE_CONNECT:                          case MQTT_TYPE_CONNECT:
                                 ioDEBUG(5, "Exec CONNECT session");                                  ioDEBUG(5, "Exec CONNECT session");
                                 pthread_mutex_lock(&mtx_sess);  
                                 TAILQ_REMOVE(&Sessions, sess, sess_node);  
                                 pthread_mutex_unlock(&mtx_sess);  
   
                                 if (call.FiniSessPUB)  
                                         call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");  
   
                                 while ((store = SLIST_FIRST(&sess->sess_sndqueue))) {  
                                         SLIST_REMOVE_HEAD(&sess->sess_sndqueue, st_node);  
                                         free(store);  
                                 }  
   
                                 if (sess->sess_will.msg)  
                                         free(sess->sess_will.msg);  
                                 if (sess->sess_will.topic)  
                                         free(sess->sess_will.topic);  
   
                                 if ((v = io_allocVar())) {                                  if ((v = io_allocVar())) {
                                         AIT_SET_STR(v, sess->sess_addr);                                          AIT_SET_STR(v, sess->sess_addr);
                                         if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))                                          if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))
                                                 io_freeVar(v);                                                  io_freeVar(v);
                                 } else                                  } else
                                         ioLIBERR(mqtt);                                          ioLIBERR(mqtt);
   
                                 locKill ^= locKill;                                  locKill ^= locKill;
                                 continue;                                  continue;
                         case MQTT_TYPE_DISCONNECT:                          case MQTT_TYPE_DISCONNECT:
                                 ioDEBUG(5, "Exec DISCONNECT session");                                  ioDEBUG(5, "Exec DISCONNECT session");
                                 pthread_mutex_lock(&mtx_sess);  
                                 TAILQ_REMOVE(&Sessions, sess, sess_node);  
                                 pthread_mutex_unlock(&mtx_sess);  
   
                                 finiSession(sess, 0);                                  finiSession(sess, 0);
                                 locKill ^= locKill;                                  locKill ^= locKill;
                                 continue;                                  continue;
                         case MQTT_TYPE_PUBLISH:                          case MQTT_TYPE_PUBLISH:
                                ioDEBUG(5, "Work in progress ...");                                ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
                                 /*
                                 if (cmdPUBLISH(sess))
                                         locKill ^= locKill;
                                         */
                                 break;                                  break;
                         case MQTT_TYPE_PUBREL:                          case MQTT_TYPE_PUBREL:
                                 break;                                  break;
Line 237  thrSession(struct tagSession *sess) Line 256  thrSession(struct tagSession *sess)
                         case MQTT_TYPE_UNSUBSCRIBE:                          case MQTT_TYPE_UNSUBSCRIBE:
                                 break;                                  break;
                         case MQTT_TYPE_PINGREQ:                          case MQTT_TYPE_PINGREQ:
                                   ioDEBUG(5, "Exec PINGREQ session");
                                 break;                                  break;
                         default:                          default:
                                 ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",                                   ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", 
Line 264  startSession(sched_task_t *task) Line 284  startSession(sched_task_t *task)
   
         assert(task);          assert(task);
   
         ioDEBUG(3, "task_Data=%p", TASK_DATA(task));  
         if (!TASK_DATA(task)) {          if (!TASK_DATA(task)) {
                   /* flow from accept new clients */
                 sess = initSession(TASK_FD(task), TASK_ARG(task));                  sess = initSession(TASK_FD(task), TASK_ARG(task));
                 if (!sess) {                  if (!sess) {
                         io_freeVar(TASK_ARG(task));                          io_freeVar(TASK_ARG(task));
Line 282  startSession(sched_task_t *task) Line 302  startSession(sched_task_t *task)
         } else {          } else {
                 sess = TASK_DATA(task);                  sess = TASK_DATA(task);
                 buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);                  buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
                 ioDEBUG(3, "debug:: sock=%d s=%p sbuf=%p sbl=%d ret=%d\n", sess->sess_sock, sess->sess_buf, sess->sess_buf->msg_base, (int) sess->sess_buf->msg_len, (int) TASK_DATLEN(task));  
                 memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);                  memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
         }          }
   
Line 347  startSession(sched_task_t *task) Line 366  startSession(sched_task_t *task)
         pthread_attr_init(&attr);          pthread_attr_init(&attr);
         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);          pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
   
        pthread_mutex_lock(&mtx_sess);        SESS_LOCK;
         TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);          TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
         pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);          pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
        pthread_mutex_unlock(&mtx_sess);        SESS_UNLOCK;
   
         call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,           call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, 
                         sess->sess_addr, sess->sess_user, sess->sess_ka);                          sess->sess_addr, sess->sess_user, sess->sess_ka);
Line 378  static void * Line 397  static void *
 thrSched(void *arg __unused)  thrSched(void *arg __unused)
 {  {
         struct tagSession *sess;          struct tagSession *sess;
           struct timespec pl = { 0, 100000000 };
   
         ioTRACE(1);          ioTRACE(1);
   
           /* start scheduler loop */
           schedPolling(root, &pl, NULL);
         schedRun(root, &Kill);          schedRun(root, &Kill);
   
         TAILQ_FOREACH(sess, &Sessions, sess_node)          TAILQ_FOREACH(sess, &Sessions, sess_node)
Line 402  Run(int sock) Line 424  Run(int sock)
   
         ioTRACE(1);          ioTRACE(1);
   
           /* start alternative thread for scheduler */
         if (pthread_create(&tid, NULL, thrSched, NULL)) {          if (pthread_create(&tid, NULL, thrSched, NULL)) {
                 ioSYSERR(0);                  ioSYSERR(0);
                 return -1;                  return -1;
Line 414  Run(int sock) Line 437  Run(int sock)
                 return -1;                  return -1;
         }          }
   
           /* state machine - accept new connections */
         while (!Kill) {          while (!Kill) {
                 if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {                  if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
                         if (!Kill)                          if (!Kill)
Line 423  Run(int sock) Line 447  Run(int sock)
                 }                  }
                 v = io_allocVar();                  v = io_allocVar();
                 if (!v) {                  if (!v) {
                        ioLIBERR(mqtt);                        ioLIBERR(io);
                         break;                          break;
                 } else {                  } else {
                         memset(str, 0, sizeof str);                          memset(str, 0, sizeof str);
                         snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));                          snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
                           AIT_FREE_VAL(v);
                         AIT_SET_STR(v, str);                          AIT_SET_STR(v, str);
                 }                  }
                 ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));                  ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));

Removed from v.1.1.2.30  
changed lines
  Added in v.1.2.2.8


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>