| 
version 1.2.2.7, 2012/04/27 10:07:51
 | 
version 1.2.2.18, 2012/05/27 10:04:05
 | 
| 
 Line 4
 | 
 Line 4
 | 
 |  #include "mqttd_calls.h" | 
  #include "mqttd_calls.h" | 
 |   | 
   | 
 |   | 
   | 
 |   | 
  static int | 
 |   | 
  pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic,  | 
 |   | 
                  int topicLen, char * __restrict data, int datlen) | 
 |   | 
  { | 
 |   | 
          return 0; | 
 |   | 
  } | 
 |   | 
   | 
 |   | 
  static int | 
 |   | 
  pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic,  | 
 |   | 
                  int topicLen, char * __restrict data, int datlen) | 
 |   | 
  { | 
 |   | 
          return 0; | 
 |   | 
  } | 
 |   | 
   | 
 |   | 
  static int | 
 |   | 
  pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic,  | 
 |   | 
                  int topicLen, char * __restrict data, int datlen) | 
 |   | 
  { | 
 |   | 
          return 0; | 
 |   | 
  } | 
 |   | 
   | 
 |   | 
   | 
 |  int | 
  int | 
| cmdPUBLISH(void *srv, void *arg) | cmdPUBLISH(void *srv, int len, void *arg) | 
 |  { | 
  { | 
 |          struct mqtthdr *hdr; | 
          struct mqtthdr *hdr; | 
 |          struct tagSession *sess = (struct tagSession*) arg; | 
          struct tagSession *sess = (struct tagSession*) arg; | 
 |   | 
          void *data = NULL; | 
 |   | 
          char szTopic[STRSIZ] = { 0 }; | 
 |   | 
          int siz = 0; | 
 |   | 
          u_short mid = 0; | 
 |   | 
   | 
 |          ioTRACE(2); | 
          ioTRACE(2); | 
 |   | 
   | 
 |          if (!sess) | 
          if (!sess) | 
 |                  return -1; | 
                  return -1; | 
 |   | 
   | 
 |   | 
          ioDEBUG(5, "Exec PUBLISH session"); | 
 |   | 
          siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, &data); | 
 |   | 
          if (siz == -1) { | 
 |   | 
                  ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 |   | 
                  return 0; | 
 |   | 
          } | 
 |   | 
   | 
 |          hdr = (struct mqtthdr*) sess->sess_buf->msg_base; | 
          hdr = (struct mqtthdr*) sess->sess_buf->msg_base; | 
 |          switch (hdr->mqtt_msg.qos) { | 
          switch (hdr->mqtt_msg.qos) { | 
 |                  case MQTT_QOS_ONCE: | 
   | 
 |                          break; | 
   | 
 |                  case MQTT_QOS_ACK: | 
                  case MQTT_QOS_ACK: | 
 |   | 
                          pubAck(sess, mid, szTopic, sizeof szTopic, data, siz); | 
 |   | 
                          siz = mqtt_msgPUBACK(sess->sess_buf, mid); | 
 |   | 
                          if (siz == -1) { | 
 |   | 
                                  ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",  | 
 |   | 
                                                  mqtt_GetErrno(), mqtt_GetError()); | 
 |   | 
                                  goto end; | 
 |   | 
                          } | 
 |                          break; | 
                          break; | 
 |                  case MQTT_QOS_EXACTLY: | 
                  case MQTT_QOS_EXACTLY: | 
 |   | 
                          pubExactly(sess, mid, szTopic, sizeof szTopic, data, siz); | 
 |   | 
                          siz = mqtt_msgPUBREC(sess->sess_buf, mid); | 
 |   | 
                          if (siz == -1) { | 
 |   | 
                                  ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",  | 
 |   | 
                                                  mqtt_GetErrno(), mqtt_GetError()); | 
 |   | 
                                  goto end; | 
 |   | 
                          } | 
 |                          break; | 
                          break; | 
 |   | 
                  case MQTT_QOS_ONCE: | 
 |   | 
                          pubOnce(sess, mid, szTopic, sizeof szTopic, data, siz); | 
 |                  default: | 
                  default: | 
|                         ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request",  |                         goto end; | 
|                                         hdr->mqtt_msg.qos); |   | 
|                         return 0; |   | 
 |          } | 
          } | 
 |   | 
   | 
 |   | 
          if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) | 
 |   | 
                  ioSYSERR(0); | 
 |   | 
          else { | 
 |   | 
                  ioDEBUG(5, "Sended %d bytes.", siz); | 
 |   | 
                  memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); | 
 |   | 
          } | 
 |   | 
  end: | 
 |   | 
          if (data) | 
 |   | 
                  free(data); | 
 |          return 0; | 
          return 0; | 
 |  } | 
  } | 
 |   | 
   | 
 |  int | 
  int | 
| cmdPUBREL(void *srv, void *arg) | cmdPUBREL(void *srv, int len, void *arg) | 
 |  { | 
  { | 
 |          struct mqtthdr *hdr; | 
   | 
 |          struct tagSession *sess = (struct tagSession*) arg; | 
          struct tagSession *sess = (struct tagSession*) arg; | 
 |   | 
          int siz = 0; | 
 |   | 
          u_short mid = 0; | 
 |   | 
   | 
 |          ioTRACE(2); | 
          ioTRACE(2); | 
 |   | 
   | 
 |          if (!sess) | 
          if (!sess) | 
 |                  return -1; | 
                  return -1; | 
 |   | 
   | 
|         hdr = (struct mqtthdr*) sess->sess_buf->msg_base; |         ioDEBUG(5, "Exec PUBREL session"); | 
|   |         mid = mqtt_readPUBREL(sess->sess_buf); | 
|   |         if (mid == (u_short) -1) { | 
|   |                 ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
|   |                 return 0; | 
|   |         } | 
 |   | 
   | 
 |   | 
          // TODO:: Delete from database topic | 
 |   | 
   | 
 |   | 
          siz = mqtt_msgPUBCOMP(sess->sess_buf, mid); | 
 |   | 
          if (siz == -1) { | 
 |   | 
                  ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 |   | 
                  return 0; | 
 |   | 
          } | 
 |   | 
          if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) | 
 |   | 
                  ioSYSERR(0); | 
 |   | 
          else { | 
 |   | 
                  ioDEBUG(5, "Sended %d bytes.", siz); | 
 |   | 
                  memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); | 
 |   | 
          } | 
 |   | 
   | 
 |          return 0; | 
          return 0; | 
 |  } | 
  } | 
 |   | 
   | 
 |  int | 
  int | 
| cmdSUBSCRIBE(void *srv, void *arg) | cmdSUBSCRIBE(void *srv, int len, void *arg) | 
 |  { | 
  { | 
 |          struct tagSession *sess = (struct tagSession*) arg; | 
          struct tagSession *sess = (struct tagSession*) arg; | 
 |          mqtt_subscr_t *subs = NULL; | 
          mqtt_subscr_t *subs = NULL; | 
| 
 Line 57  cmdSUBSCRIBE(void *srv, void *arg)
 | 
 Line 131  cmdSUBSCRIBE(void *srv, void *arg)
 | 
 |          u_short mid = 0; | 
          u_short mid = 0; | 
 |          register int i; | 
          register int i; | 
 |          struct tagStore *store; | 
          struct tagStore *store; | 
 |   | 
          char buf[BUFSIZ]; | 
 |   | 
          void *ptr; | 
 |   | 
   | 
 |          ioTRACE(2); | 
          ioTRACE(2); | 
 |   | 
   | 
| 
 Line 72  cmdSUBSCRIBE(void *srv, void *arg)
 | 
 Line 148  cmdSUBSCRIBE(void *srv, void *arg)
 | 
 |   | 
   | 
 |          /* add to db */ | 
          /* add to db */ | 
 |          for (i = 0; i < siz; i++) { | 
          for (i = 0; i < siz; i++) { | 
|                 if ((siz = call.WritePUB_subscribe(&cfg, pub, mid, subs[i].sub_topic.msg_base,  |                 /* convert topic to sql search statement */ | 
|                                 sess->sess_user, sess->sess_addr, subs[i].sub_ret)) > 0) { |                 if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) { | 
|                         store = malloc(sizeof(struct tagStore)); |                         ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
|   |                         goto end; | 
|   |                 } | 
|   |                 if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,  | 
|   |                                 sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) { | 
|   |                         store = io_malloc(sizeof(struct tagStore)); | 
 |                          if (!store) { | 
                          if (!store) { | 
 |                                  ioSYSERR(0); | 
                                  ioSYSERR(0); | 
 |                                  goto end; | 
                                  goto end; | 
 |                          } else { | 
                          } else { | 
 |                                  store->st_msgid = mid; | 
                                  store->st_msgid = mid; | 
|                                 store->st_subscr = subs[i]; |                                 mqtt_subCopy(&store->st_subscr, &subs[i]); | 
 |                          } | 
                          } | 
 |   | 
   | 
 |                          /* add to cache */ | 
                          /* add to cache */ | 
 |                          SESS_ELEM_LOCK(sess); | 
   | 
 |                          SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node); | 
                          SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node); | 
 |                          SESS_ELEM_UNLOCK(sess); | 
   | 
 |   | 
   | 
 |   | 
                          /* convert topic to regexp */ | 
 |   | 
                          if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) { | 
 |   | 
                                  ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 |   | 
                                  goto end; | 
 |   | 
                          } else { | 
 |   | 
                                  ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1); | 
 |   | 
                                  if (!ptr) { | 
 |   | 
                                          ioSYSERR(0); | 
 |   | 
                                          goto end; | 
 |   | 
                                  } else { | 
 |   | 
                                          store->st_subscr.sub_topic.msg_base = ptr; | 
 |   | 
                                          store->st_subscr.sub_topic.msg_len = strlen(buf) + 1; | 
 |   | 
                                          memcpy(store->st_subscr.sub_topic.msg_base, buf,  | 
 |   | 
                                                          store->st_subscr.sub_topic.msg_len); | 
 |   | 
                                  } | 
 |   | 
                          } | 
 |   | 
   | 
 |   | 
                          call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,  | 
 |   | 
                                          store->st_subscr.sub_topic.msg_base,  | 
 |   | 
                                          store->st_subscr.sub_topic.msg_len, sess->sess_addr); | 
 |   | 
   | 
 |                          subs[i].sub_ret = MQTT_QOS_PASS; | 
                          subs[i].sub_ret = MQTT_QOS_PASS; | 
 |                  } else | 
                  } else | 
 |                          subs[i].sub_ret = MQTT_QOS_DENY; | 
                          subs[i].sub_ret = MQTT_QOS_DENY; | 
| 
 Line 99  cmdSUBSCRIBE(void *srv, void *arg)
 | 
 Line 199  cmdSUBSCRIBE(void *srv, void *arg)
 | 
 |                  ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
                  ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 |                  goto end; | 
                  goto end; | 
 |          } | 
          } | 
|         if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1) |         if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) | 
 |                  ioSYSERR(0); | 
                  ioSYSERR(0); | 
|         else |         else { | 
 |                  ioDEBUG(5, "Sended %d bytes.", siz); | 
                  ioDEBUG(5, "Sended %d bytes.", siz); | 
 |   | 
                  memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); | 
 |   | 
          } | 
 |  end: | 
  end: | 
 |          mqtt_subFree(&subs); | 
          mqtt_subFree(&subs); | 
 |          return 0; | 
          return 0; | 
 |  } | 
  } | 
 |   | 
   | 
 |  int | 
  int | 
| cmdUNSUBSCRIBE(void *srv, void *arg) | cmdUNSUBSCRIBE(void *srv, int len, void *arg) | 
 |  { | 
  { | 
 |          struct tagSession *sess = (struct tagSession*) arg; | 
          struct tagSession *sess = (struct tagSession*) arg; | 
 |   | 
          mqtt_subscr_t *subs = NULL; | 
 |   | 
          int siz = 0; | 
 |   | 
          u_short mid = 0; | 
 |   | 
          register int i; | 
 |   | 
          struct tagStore *store, *tmp; | 
 |   | 
   | 
 |          ioTRACE(2); | 
          ioTRACE(2); | 
 |   | 
   | 
 |          if (!sess) | 
          if (!sess) | 
 |                  return -1; | 
                  return -1; | 
 |   | 
   | 
 |   | 
          ioDEBUG(5, "Exec UNSUBSCRIBE session"); | 
 |   | 
          siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs); | 
 |   | 
          if (siz == -1) { | 
 |   | 
                  ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 |   | 
                  return 0; | 
 |   | 
          } | 
 |   | 
   | 
 |   | 
          /* del from db */ | 
 |   | 
          for (i = 0; i < siz; i++) { | 
 |   | 
                  SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) { | 
 |   | 
                          if (store->st_subscr.sub_ret == subs[i].sub_ret &&  | 
 |   | 
                                          store->st_subscr.sub_topic.msg_base &&  | 
 |   | 
                                          !strcmp(store->st_subscr.sub_topic.msg_base,  | 
 |   | 
                                                  subs[i].sub_topic.msg_base)) { | 
 |   | 
                                  SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node); | 
 |   | 
   | 
 |   | 
                                  if (store->st_subscr.sub_topic.msg_base) | 
 |   | 
                                          free(store->st_subscr.sub_topic.msg_base); | 
 |   | 
                                  if (store->st_subscr.sub_value.msg_base) | 
 |   | 
                                          free(store->st_subscr.sub_value.msg_base); | 
 |   | 
                                  io_free(store); | 
 |   | 
                          } | 
 |   | 
                  } | 
 |   | 
   | 
 |   | 
                  call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,  | 
 |   | 
                                  sess->sess_user, "%"); | 
 |   | 
          } | 
 |   | 
   | 
 |   | 
          /* send acknowledge */ | 
 |   | 
          siz = mqtt_msgUNSUBACK(sess->sess_buf, mid); | 
 |   | 
          if (siz == -1) { | 
 |   | 
                  ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 |   | 
                  goto end; | 
 |   | 
          } | 
 |   | 
          if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) | 
 |   | 
                  ioSYSERR(0); | 
 |   | 
          else { | 
 |   | 
                  ioDEBUG(5, "Sended %d bytes.", siz); | 
 |   | 
                  memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); | 
 |   | 
          } | 
 |   | 
  end: | 
 |   | 
          mqtt_subFree(&subs); | 
 |          return 0; | 
          return 0; | 
 |  } | 
  } | 
 |   | 
   | 
 |  int | 
  int | 
| cmdPINGREQ(void *srv, void *arg) | cmdPINGREQ(void *srv, int len, void *arg) | 
 |  { | 
  { | 
 |          struct tagSession *sess = (struct tagSession*) arg; | 
          struct tagSession *sess = (struct tagSession*) arg; | 
 |          int siz = 0; | 
          int siz = 0; | 
| 
 Line 138  cmdPINGREQ(void *srv, void *arg)
 | 
 Line 287  cmdPINGREQ(void *srv, void *arg)
 | 
 |                  ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
                  ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 |                  return 0; | 
                  return 0; | 
 |          } | 
          } | 
|         if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1) { |         if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) { | 
 |                  ioSYSERR(0); | 
                  ioSYSERR(0); | 
 |                  return 0; | 
                  return 0; | 
|         } else |         } else { | 
 |                  ioDEBUG(5, "Sended %d bytes.", siz); | 
                  ioDEBUG(5, "Sended %d bytes.", siz); | 
 |   | 
                  memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); | 
 |   | 
          } | 
 |   | 
   | 
 |          return 0; | 
          return 0; | 
 |  } | 
  } | 
 |   | 
   | 
 |  int | 
  int | 
| cmdCONNECT(void *srv, void *arg) | cmdCONNECT(void *srv, int len, void *arg) | 
 |  { | 
  { | 
 |          struct tagStore *store; | 
          struct tagStore *store; | 
 |          struct tagSession *sess = (struct tagSession*) arg; | 
          struct tagSession *sess = (struct tagSession*) arg; | 
| 
 Line 159  cmdCONNECT(void *srv, void *arg)
 | 
 Line 310  cmdCONNECT(void *srv, void *arg)
 | 
 |                  return -1; | 
                  return -1; | 
 |   | 
   | 
 |          ioDEBUG(5, "Exec CONNECT session"); | 
          ioDEBUG(5, "Exec CONNECT session"); | 
 |          SESS_LOCK; | 
   | 
 |          TAILQ_REMOVE(&Sessions, sess, sess_node); | 
          TAILQ_REMOVE(&Sessions, sess, sess_node); | 
 |          SESS_UNLOCK; | 
   | 
 |   | 
   | 
|         if (call.FiniSessPUB) |         if (sess->sess_clean) { | 
|                 call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); |                 if (call.FiniSessPUB) | 
|   |                         call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); | 
|   |                 if (call.DeletePUB_subscribe) | 
|   |                         call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%"); | 
|   |                 if (call.WipePUB_topic) | 
|   |                         call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); | 
|   |         } | 
 |   | 
   | 
 |          SESS_ELEM_LOCK(sess); | 
   | 
 |          while ((store = SLIST_FIRST(&sess->sess_subscr))) { | 
          while ((store = SLIST_FIRST(&sess->sess_subscr))) { | 
 |                  SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node); | 
                  SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node); | 
 |   | 
   | 
| 
 Line 175  cmdCONNECT(void *srv, void *arg)
 | 
 Line 329  cmdCONNECT(void *srv, void *arg)
 | 
 |                  if (store->st_subscr.sub_value.msg_base) | 
                  if (store->st_subscr.sub_value.msg_base) | 
 |                          free(store->st_subscr.sub_value.msg_base); | 
                          free(store->st_subscr.sub_value.msg_base); | 
 |   | 
   | 
|                 free(store); |                 io_free(store); | 
 |          } | 
          } | 
 |          SESS_ELEM_UNLOCK(sess); | 
   | 
 |   | 
   | 
 |          if (sess->sess_will.msg) | 
          if (sess->sess_will.msg) | 
 |                  free(sess->sess_will.msg); | 
                  free(sess->sess_will.msg); | 
| 
 Line 186  cmdCONNECT(void *srv, void *arg)
 | 
 Line 339  cmdCONNECT(void *srv, void *arg)
 | 
 |   | 
   | 
 |          call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,  | 
          call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,  | 
 |                          sess->sess_addr, sess->sess_user); | 
                          sess->sess_addr, sess->sess_user); | 
|         return 0; |  | 
|   |         return -3;     /* reconnect client */ | 
 |  } | 
  } | 
 |   | 
   | 
 |  int | 
  int | 
| cmdDISCONNECT(void *srv, void *arg) | cmdDISCONNECT(void *srv, int len, void *arg) | 
 |  { | 
  { | 
 |          struct tagSession *sess = (struct tagSession*) arg; | 
          struct tagSession *sess = (struct tagSession*) arg; | 
 |   | 
   | 
| 
 Line 200  cmdDISCONNECT(void *srv, void *arg)
 | 
 Line 354  cmdDISCONNECT(void *srv, void *arg)
 | 
 |                  return -1; | 
                  return -1; | 
 |   | 
   | 
 |          ioDEBUG(5, "Exec DISCONNECT session"); | 
          ioDEBUG(5, "Exec DISCONNECT session"); | 
 |          SESS_LOCK; | 
   | 
 |          TAILQ_REMOVE(&Sessions, sess, sess_node); | 
   | 
 |          SESS_UNLOCK; | 
   | 
 |   | 
   | 
 |          call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,  | 
          call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,  | 
 |                          sess->sess_addr, sess->sess_user); | 
                          sess->sess_addr, sess->sess_user); | 
|         return 0; |  | 
|   |         return -2;     /* must terminate dispatcher */ | 
 |  } | 
  } |