Diff for /mqtt/src/pubmqtt.c between versions 1.2.2.7 and 1.2.2.10

version 1.2.2.7, 2012/05/08 13:04:02 version 1.2.2.10, 2012/05/27 10:12:48
Line 24  mqtt_rtlm_log(const char *fmt, ...) Line 24  mqtt_rtlm_log(const char *fmt, ...)
                                         __func__, __LINE__, \                                          __func__, __LINE__, \
                                         sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))                                          sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
   
   /* library pre-loaded actions */
   void
   _init()
   {
           sqlite3_initialize();
   }
   
   void
   _fini()
   {
           sqlite3_shutdown();
   }
   
   
 /*  /*
  * mqtt_rtlm_open() Open database connection   * mqtt_rtlm_open() Open database connection
  *   *
Line 200  mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, c Line 213  mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, c
                 return -1;                  return -1;
         }          }
         snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "          snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
                        "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';",                         "ConnID = '%s' AND Username LIKE '%s' AND RemoteHost LIKE '%s';", 
                         str, connid, user, host);                          str, connid, user, host);
   
         if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {          if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
Line 221  mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, c Line 234  mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, c
  *   *
  * @cfg = loaded config   * @cfg = loaded config
  * @sql = SQL handle   * @sql = SQL handle
    * @connid = connection id
  * @msgid = MessageID   * @msgid = MessageID
  * @topic = topic   * @topic = topic
  * @txt = text   * @txt = text
Line 230  mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, c Line 244  mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, c
  * return: -1 error, 0 no publish or >0 published ok   * return: -1 error, 0 no publish or >0 published ok
  */   */
 int  int
mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                const char *user, const char *host, char retain)                const char *topic, const char *txt, const char *user, const char *host, char retain)
 {  {
         int ret = 0;          int ret = 0;
         char *str, szStmt[BUFSIZ] = { 0 };          char *str, szStmt[BUFSIZ] = { 0 };
Line 245  mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u Line 259  mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u
                 mqtt_rtlm_log("Error:: not found topics table name");                  mqtt_rtlm_log("Error:: not found topics table name");
                 return -1;                  return -1;
         }          }
        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, ConnID, MsgID, Topic, Value, PubUser, "
                        "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "                        "PubDate, PubHost) VALUES (%d, '%s', %d, '%s', '%s', '%s', "
                         "datetime('now', 'localtime'), '%s');",                           "datetime('now', 'localtime'), '%s');", 
                        str, retain, msgid, topic, txt, user, host);                        str, retain, connid, msgid, topic, txt, user, host);
   
         if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {          if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                 MQTT_RTLM_LOG(sql);                  MQTT_RTLM_LOG(sql);
Line 267  mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u Line 281  mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u
 }  }
   
 /*  /*
    * mqtt_rtlm_wipe_topic() Wipe all topics
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @connid = connection id
    * @user = username
    * @retain = -1 no matter
    * return: -1 error, 0 no changes or >0 deleted rows
    */
   int
   mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
   {
           int ret = 0;
           char *str, *rtn, szStmt[BUFSIZ] = { 0 };
           sqlite3_stmt *stmt;
   
           if (!cfg || !sql || !connid)
                   return -1;
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found topics table name");
                   return -1;
           }
           switch (retain) {
                   case -1:
                           rtn = "";
                           break;
                   case 0:
                           rtn = "AND Retain = 0";
                           break;
                   default:
                           rtn = "AND Retain != 0";
                           break;
           }
           snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND "
                           "PubUser LIKE '%s' %s;", str, connid, user, rtn);
   
           if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   return -1;
           }
           if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                   ret = sqlite3_changes(sql);
           else {
                   if (ret > SQLITE_OK && ret < SQLITE_ROW)
                           MQTT_RTLM_LOG(sql);
                   ret = 0;
           }
           sqlite3_finalize(stmt);
   
           return ret;
   }
   
   /*
  * mqtt_rtlm_delete_topic() Delete topic   * mqtt_rtlm_delete_topic() Delete topic
  *   *
  * @cfg = loaded config   * @cfg = loaded config
  * @sql = SQL handle   * @sql = SQL handle
    * @connid = connection id
  * @msgid = MessageID   * @msgid = MessageID
  * @topic = topic   * @topic = topic
  * @user = username   * @user = username
Line 279  mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u Line 349  mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u
  * return: -1 error, 0 no changes or >0 deleted rows   * return: -1 error, 0 no changes or >0 deleted rows
  */   */
 int  int
mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                const char *user, const char *host, char retain)                const char *topic, const char *user, const char *host, char retain)
 {  {
         int ret = 0;          int ret = 0;
         char *str, *rtn, szStmt[BUFSIZ] = { 0 };          char *str, *rtn, szStmt[BUFSIZ] = { 0 };
Line 305  mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql,  Line 375  mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, 
                         rtn = "AND Retain != 0";                          rtn = "AND Retain != 0";
                         break;                          break;
         }          }
        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND MsgID = %d AND "
                        "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str,                         "Topic LIKE '%s' AND PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
                        msgid, topic, user, host, rtn);                        connid, msgid, topic, user, host, rtn);
   
         if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {          if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                 MQTT_RTLM_LOG(sql);                  MQTT_RTLM_LOG(sql);
Line 330  mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql,  Line 400  mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, 
  *   *
  * @cfg = loaded config   * @cfg = loaded config
  * @sql = SQL handle   * @sql = SQL handle
    * @connid = connection id
  * @msgid = MessageID   * @msgid = MessageID
  * @topic = topic   * @topic = topic
  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter   * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
  * return: NULL error or not found and !=NULL allocated subscribe topics   * return: NULL error or not found and !=NULL allocated subscribe topics
  */   */
 mqtt_subscr_t *  mqtt_subscr_t *
mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                 const char *topic, char retain)
 {  {
         int rowz = 0;          int rowz = 0;
         char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };          char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
Line 365  mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_ Line 437  mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_
                 return NULL;                  return NULL;
         }          }
         snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "          snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
                        "MsgID = %d AND Topic LIKE '%s' %s;",                         "ConnID = '%s' AND MsgID = %d AND Topic LIKE '%s' %s;", 
                        str, msgid, topic, szStr);                        str, connid, msgid, topic, szStr);
   
         if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {          if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                 MQTT_RTLM_LOG(sql);                  MQTT_RTLM_LOG(sql);
Line 376  mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_ Line 448  mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_
         /* calculate count of rows and allocate subscribe items */          /* calculate count of rows and allocate subscribe items */
         while (sqlite3_step(stmt) == SQLITE_ROW)          while (sqlite3_step(stmt) == SQLITE_ROW)
                 rowz++;                  rowz++;
        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {        if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                 mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));                  mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                 goto end;                  goto end;
         } else          } else
Line 386  mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_ Line 458  mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_
         /* fill with data */          /* fill with data */
         for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {          for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                 s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);                  s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
                s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));                s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
                 s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);                  s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));                s[j].sub_value.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 2));
                 s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);                  s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
         }          }
 end:  end:
Line 402  end: Line 474  end:
  *   *
  * @cfg = loaded config   * @cfg = loaded config
  * @sql = SQL handle   * @sql = SQL handle
    * @connid = connection id
  * @msgid = MessageID   * @msgid = MessageID
  * @topic = topic   * @topic = topic
  * @user = username   * @user = username
Line 410  end: Line 483  end:
  * return: -1 error, 0 no publish or >0 published ok   * return: -1 error, 0 no publish or >0 published ok
  */   */
 int  int
mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                const char *user, const char *host, char qos)                const char *topic, const char *user, const char *host, char qos)
 {  {
         int ret = 0;          int ret = 0;
         char *str, szStmt[BUFSIZ] = { 0 };          char *str, szStmt[BUFSIZ] = { 0 };
Line 425  mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sq Line 498  mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sq
                 mqtt_rtlm_log("Error:: not found subscribes table name");                  mqtt_rtlm_log("Error:: not found subscribes table name");
                 return -1;                  return -1;
         }          }
        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (MsgID, QoS, Topic, PubUser, "        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
                        "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', "                        "PubDate, PubHost) VALUES ('%s', %d, %d, '%s', '%s', "
                         "datetime('now', 'localtime'), '%s');", str,                           "datetime('now', 'localtime'), '%s');", str, 
                        msgid, qos, topic, user, host);                        connid, msgid, qos, topic, user, host);
   
         if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {          if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                 MQTT_RTLM_LOG(sql);                  MQTT_RTLM_LOG(sql);
Line 451  mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sq Line 524  mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sq
  *   *
  * @cfg = loaded config   * @cfg = loaded config
  * @sql = SQL handle   * @sql = SQL handle
    * @connid = connection id
  * @topic = topic   * @topic = topic
  * @user = username   * @user = username
  * @host = hostname   * @host = hostname
  * return: -1 error, 0 no changes or >0 deleted rows   * return: -1 error, 0 no changes or >0 deleted rows
  */   */
 int  int
mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic, mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
                const char *user, const char *host)                const char *topic, const char *user, const char *host)
 {  {
         int ret = 0;          int ret = 0;
         char *str, szStmt[BUFSIZ] = { 0 };          char *str, szStmt[BUFSIZ] = { 0 };
Line 472  mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *s Line 546  mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *s
                 mqtt_rtlm_log("Error:: not found subscribes table name");                  mqtt_rtlm_log("Error:: not found subscribes table name");
                 return -1;                  return -1;
         }          }
        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND "
                        "PubUser LIKE '%s' AND PubHost LIKE '%s';", str,                         "Topic LIKE '%s' AND PubUser LIKE '%s' AND PubHost LIKE '%s';", str, 
                        topic, user, host);                        connid, topic, user, host);
   
         if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {          if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                 MQTT_RTLM_LOG(sql);                  MQTT_RTLM_LOG(sql);
Line 497  mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *s Line 571  mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *s
  *   *
  * @cfg = loaded config   * @cfg = loaded config
  * @sql = SQL handle   * @sql = SQL handle
    * @connid = connection id
  * @topic = topic   * @topic = topic
  * return: NULL error or not found and !=NULL allocated subscribe topics   * return: NULL error or not found and !=NULL allocated subscribe topics
  */   */
 mqtt_subscr_t *  mqtt_subscr_t *
mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic)mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
 {  {
         int rowz = 0;          int rowz = 0;
         char *str, szStmt[BUFSIZ] = { 0 };          char *str, szStmt[BUFSIZ] = { 0 };
Line 517  mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql Line 592  mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql
                 mqtt_rtlm_log("Error:: not found subscribes table name");                  mqtt_rtlm_log("Error:: not found subscribes table name");
                 return NULL;                  return NULL;
         }          }
        snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE Topic LIKE '%s';", str, topic);        snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE ConnID = '%s' AND "
                         "Topic LIKE '%s';", str, connid, topic);
   
         if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {          if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                 MQTT_RTLM_LOG(sql);                  MQTT_RTLM_LOG(sql);
Line 527  mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql Line 603  mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql
         /* calculate count of rows and allocate subscribe items */          /* calculate count of rows and allocate subscribe items */
         while (sqlite3_step(stmt) == SQLITE_ROW)          while (sqlite3_step(stmt) == SQLITE_ROW)
                 rowz++;                  rowz++;
        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {        if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                 mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));                  mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                 goto end;                  goto end;
         } else          } else
Line 537  mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql Line 613  mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql
         /* fill with data */          /* fill with data */
         for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {          for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                 s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);                  s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
                s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));                s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
                 s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);                  s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                 s[j].sub_value.msg_base = NULL;                  s[j].sub_value.msg_base = NULL;
                 s[j].sub_value.msg_len = 0;                  s[j].sub_value.msg_len = 0;

Removed from v.1.2.2.7  
changed lines
  Added in v.1.2.2.10


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>