Diff for /mqtt/src/pubmqtt.c between versions 1.1.2.9 and 1.2.2.4

version 1.1.2.9, 2011/11/28 10:17:12 version 1.2.2.4, 2012/04/25 07:37:16
Line 1 Line 1
 #include "global.h"  #include "global.h"
   
   
   extern const char sql_schema[];
   
   
 /*  /*
  * mqtt_db_log() Log database connection message   * mqtt_db_log() Log database connection message
  *   *
Line 28  mqtt_rtlm_log(const char *fmt, ...) Line 31  mqtt_rtlm_log(const char *fmt, ...)
  * return: NULL error or SQL handle   * return: NULL error or SQL handle
  */   */
 sqlite3 *  sqlite3 *
mqtt_rtlm_open(sl_config *cfg)mqtt_rtlm_open(cfg_root_t *cfg)
 {  {
         sqlite3 *sql = NULL;          sqlite3 *sql = NULL;
         const char *str = NULL;          const char *str = NULL;
Line 36  mqtt_rtlm_open(sl_config *cfg) Line 39  mqtt_rtlm_open(sl_config *cfg)
         if (!cfg)          if (!cfg)
                 return NULL;                  return NULL;
   
        str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));        sqlite3_config(SQLITE_CONFIG_SERIALIZED);
         if (!sqlite3_threadsafe())
                 return NULL;
 
         str = cfg_getAttribute(cfg, "mqtt_pub", "name");
         if (!str) {          if (!str) {
                 mqtt_rtlm_log("Error:: Unknown database name ...\n");                  mqtt_rtlm_log("Error:: Unknown database name ...\n");
                 return NULL;                  return NULL;
         }          }
   
        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE, NULL)) {        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
                 MQTT_RTLM_LOG(sql);                  MQTT_RTLM_LOG(sql);
                 sqlite3_close(sql);                  sqlite3_close(sql);
                 return NULL;                  return NULL;
         }          }
   
           sqlite3_mutex_enter(sqlite3_db_mutex(sql));
           if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_mutex_leave(sqlite3_db_mutex(sql));
                   sqlite3_close(sql);
                   return NULL;
           }
           sqlite3_mutex_leave(sqlite3_db_mutex(sql));
         return sql;          return sql;
 }  }
   
Line 68  mqtt_rtlm_close(sqlite3 *sql) Line 83  mqtt_rtlm_close(sqlite3 *sql)
  *   *
  * @cfg = loaded config   * @cfg = loaded config
  * @sql = SQL handle   * @sql = SQL handle
    * @connid = connection id
  * @user = username   * @user = username
  * @host = hostname   * @host = hostname
 * @port = port * @will = will flag if !=0 must fill arguments
  * @... = will arguments in order topic,msg,qos,retain
  * return: -1 error, 0 session already appears or >0 row changed   * return: -1 error, 0 session already appears or >0 row changed
  */   */
 int  int
mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host, u_short port)mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
                 const char *host, char will, ...)
 {  {
           va_list lst;
         int ret = 0;          int ret = 0;
         char *str, szStmt[BUFSIZ] = { 0 };          char *str, szStmt[BUFSIZ] = { 0 };
         sqlite3_stmt *stmt;          sqlite3_stmt *stmt;
Line 83  mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, c Line 102  mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, c
         if (!cfg || !sql)          if (!cfg || !sql)
                 return -1;                  return -1;
   
        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
         if (!str) {          if (!str) {
                 mqtt_rtlm_log("Error:: not found online table name");                  mqtt_rtlm_log("Error:: not found online table name");
                 return -1;                  return -1;
         }          }
        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, RemoteHost, RemotePort) "        if (!will)
                       "VALUES ('%s', '%s', %d);", str, user, host, port);                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
                                 "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
         else {
                 va_start(lst, will);
                 snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
                                 "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
                                 "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
                                 str, connid, user, host, will, 
                                 va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                 va_end(lst);
         }
   
           sqlite3_mutex_enter(sqlite3_db_mutex(sql));
         if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {          if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                 MQTT_RTLM_LOG(sql);                  MQTT_RTLM_LOG(sql);
                   sqlite3_mutex_leave(sqlite3_db_mutex(sql));
                 return -1;                  return -1;
         }          }
         if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)          if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
Line 103  mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, c Line 134  mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, c
                 ret = 0;                  ret = 0;
         }          }
         sqlite3_finalize(stmt);          sqlite3_finalize(stmt);
           sqlite3_mutex_leave(sqlite3_db_mutex(sql));
   
         return ret;          return ret;
 }  }
Line 112  mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, c Line 144  mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, c
  *   *
  * @cfg = loaded config   * @cfg = loaded config
  * @sql = SQL handle   * @sql = SQL handle
    * @connid = connection id
  * @user = username   * @user = username
  * @host = hostname   * @host = hostname
  * return: -1 error, 0 session already appears or >0 row changed   * return: -1 error, 0 session already appears or >0 row changed
  */   */
 int  int
mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
 {  {
         int ret = 0;          int ret = 0;
         char *str, szStmt[BUFSIZ] = { 0 };          char *str, szStmt[BUFSIZ] = { 0 };
Line 126  mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, c Line 159  mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, c
         if (!cfg || !sql)          if (!cfg || !sql)
                 return -1;                  return -1;
   
        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
         if (!str) {          if (!str) {
                 mqtt_rtlm_log("Error:: not found online table name");                  mqtt_rtlm_log("Error:: not found online table name");
                 return -1;                  return -1;
         }          }
        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Username = '%s' AND RemoteHost LIKE '%s';",         snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
                        str, user, host);                        "AND RemoteHost LIKE '%s';", str, connid, user, host);
   
           sqlite3_mutex_enter(sqlite3_db_mutex(sql));
         if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {          if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                 MQTT_RTLM_LOG(sql);                  MQTT_RTLM_LOG(sql);
                   sqlite3_mutex_leave(sqlite3_db_mutex(sql));
                 return -1;                  return -1;
         }          }
         if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)          if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
Line 146  mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, c Line 181  mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, c
                 ret = 0;                  ret = 0;
         }          }
         sqlite3_finalize(stmt);          sqlite3_finalize(stmt);
           sqlite3_mutex_leave(sqlite3_db_mutex(sql));
   
         return ret;          return ret;
 }  }
Line 155  mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, c Line 191  mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, c
  *   *
  * @cfg = loaded config   * @cfg = loaded config
  * @sql = SQL handle   * @sql = SQL handle
    * @connid = connection id
  * @user = username   * @user = username
  * @host = hostname   * @host = hostname
  * return: -1 error, 0 not logged or >0 logged found rows   * return: -1 error, 0 not logged or >0 logged found rows
  */   */
 int  int
mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
 {  {
         int ret = 0;          int ret = 0;
         char *str, szStmt[BUFSIZ] = { 0 };          char *str, szStmt[BUFSIZ] = { 0 };
Line 169  mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, co Line 206  mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, co
         if (!cfg || !sql)          if (!cfg || !sql)
                 return -1;                  return -1;
   
        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
         if (!str) {          if (!str) {
                 mqtt_rtlm_log("Error:: not found online table name");                  mqtt_rtlm_log("Error:: not found online table name");
                 return -1;                  return -1;
         }          }
        snprintf(szStmt, sizeof szStmt, "SELECT RemoteHost, RemotePort FROM %s WHERE "        snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
                        "Username = '%s' AND RemoteHost LIKE '%s';", str, user, host);                        "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';", 
                         str, connid, user, host);
   
           sqlite3_mutex_enter(sqlite3_db_mutex(sql));
         if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {          if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                 MQTT_RTLM_LOG(sql);                  MQTT_RTLM_LOG(sql);
                   sqlite3_mutex_leave(sqlite3_db_mutex(sql));
                 return -1;                  return -1;
         }          }
         if (sqlite3_step(stmt) == SQLITE_ROW)          if (sqlite3_step(stmt) == SQLITE_ROW)
Line 186  mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, co Line 226  mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, co
         else          else
                 ret = 0;                  ret = 0;
         sqlite3_finalize(stmt);          sqlite3_finalize(stmt);
           sqlite3_mutex_leave(sqlite3_db_mutex(sql));
   
         return ret;          return ret;
 }  }
Line 195  mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, co Line 236  mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, co
  *   *
  * @cfg = loaded config   * @cfg = loaded config
  * @sql = SQL handle   * @sql = SQL handle
    * @msgid = MessageID
  * @topic = topic   * @topic = topic
  * @txt = text   * @txt = text
  * @user = username   * @user = username
Line 203  mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, co Line 245  mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, co
  * return: -1 error, 0 no publish or >0 published ok   * return: -1 error, 0 no publish or >0 published ok
  */   */
 int  int
mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, const char *topic, const char *txt, mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, 
                 const char *user, const char *host, char retain)                  const char *user, const char *host, char retain)
 {  {
         int ret = 0;          int ret = 0;
Line 213  mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, co Line 255  mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, co
         if (!cfg || !sql || !topic)          if (!cfg || !sql || !topic)
                 return -1;                  return -1;
   
        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
         if (!str) {          if (!str) {
                 mqtt_rtlm_log("Error:: not found topics table name");                  mqtt_rtlm_log("Error:: not found topics table name");
                 return -1;                  return -1;
         }          }
        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, Topic, Value, PubUser, PubDate, PubHost) "        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
                        "VALUES (%d, '%s', '%s', '%s', datetime('now', 'localtime'), '%s');", str,                         "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
                        retain, topic, txt, user, host);                        "datetime('now', 'localtime'), '%s');", 
                         str, retain, msgid, topic, txt, user, host);
   
           sqlite3_mutex_enter(sqlite3_db_mutex(sql));
         if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {          if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                 MQTT_RTLM_LOG(sql);                  MQTT_RTLM_LOG(sql);
                   sqlite3_mutex_leave(sqlite3_db_mutex(sql));
                 return -1;                  return -1;
         }          }
         if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)          if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
Line 234  mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, co Line 279  mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, co
                 ret = 0;                  ret = 0;
         }          }
         sqlite3_finalize(stmt);          sqlite3_finalize(stmt);
           sqlite3_mutex_leave(sqlite3_db_mutex(sql));
   
         return ret;          return ret;
 }  }
Line 243  mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, co Line 289  mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, co
  *   *
  * @cfg = loaded config   * @cfg = loaded config
  * @sql = SQL handle   * @sql = SQL handle
    * @msgid = MessageID
  * @topic = topic   * @topic = topic
  * @user = username   * @user = username
  * @host = hostname   * @host = hostname
Line 250  mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, co Line 297  mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, co
  * return: -1 error, 0 no changes or >0 deleted rows   * return: -1 error, 0 no changes or >0 deleted rows
  */   */
 int  int
mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, const char *topic, mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
                 const char *user, const char *host, char retain)                  const char *user, const char *host, char retain)
 {  {
         int ret = 0;          int ret = 0;
Line 260  mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, c Line 307  mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, c
         if (!cfg || !sql || !topic)          if (!cfg || !sql || !topic)
                 return -1;                  return -1;
   
        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
         if (!str) {          if (!str) {
                 mqtt_rtlm_log("Error:: not found topics table name");                  mqtt_rtlm_log("Error:: not found topics table name");
                 return -1;                  return -1;
Line 276  mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, c Line 323  mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, c
                         rtn = "AND Retain != 0";                          rtn = "AND Retain != 0";
                         break;                          break;
         }          }
        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
                         "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str,                           "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
                        topic, user, host, rtn);                        msgid, topic, user, host, rtn);
   
           sqlite3_mutex_enter(sqlite3_db_mutex(sql));
         if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {          if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                 MQTT_RTLM_LOG(sql);                  MQTT_RTLM_LOG(sql);
                   sqlite3_mutex_leave(sqlite3_db_mutex(sql));
                 return -1;                  return -1;
         }          }
         if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)          if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
Line 292  mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, c Line 341  mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, c
                 ret = 0;                  ret = 0;
         }          }
         sqlite3_finalize(stmt);          sqlite3_finalize(stmt);
           sqlite3_mutex_leave(sqlite3_db_mutex(sql));
   
         return ret;          return ret;
 }  }
Line 301  mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, c Line 351  mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, c
  *   *
  * @cfg = loaded config   * @cfg = loaded config
  * @sql = SQL handle   * @sql = SQL handle
    * @msgid = MessageID
  * @topic = topic   * @topic = topic
 * return: -1 error, 0 not found or >0 topics rows * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
  * return: NULL error or not found and !=NULL allocated subscribe topics
  */   */
   mqtt_subscr_t *
   mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
   {
           int rowz = 0;
           char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
           sqlite3_stmt *stmt;
           register int j;
           mqtt_subscr_t *s = NULL;
   
           if (!cfg || !sql || !topic)
                   return NULL;
   
           switch (retain) {
                   case -1:
                           memset(szStr, 0, sizeof szStr);
                           break;
                   case 0:
                           snprintf(szStr, sizeof szStr, "AND Retain = 0");
                           break;
                   default:
                           snprintf(szStr, sizeof szStr, "AND Retain > 0");
                           break;
           }
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found topics table name");
                   return NULL;
           }
           snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
                           "MsgID = %d AND Topic LIKE '%s' %s;", 
                           str, msgid, topic, szStr);
   
           sqlite3_mutex_enter(sqlite3_db_mutex(sql));
           if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_mutex_leave(sqlite3_db_mutex(sql));
                   return NULL;
           }
   
           /* calculate count of rows and allocate subscribe items */
           while (sqlite3_step(stmt) == SQLITE_ROW)
                   rowz++;
           if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                   mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                   goto end;
           } else
                   memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
           sqlite3_reset(stmt);
   
           /* fill with data */
           for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                   s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
                   s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
                   s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                   s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
                   s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
           }
   end:
           sqlite3_finalize(stmt);
           sqlite3_mutex_leave(sqlite3_db_mutex(sql));
   
           return s;
   }
   
   /*
    * mqtt_rtlm_write_subscribe() Subscribe topic
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @msgid = MessageID
    * @topic = topic
    * @user = username
    * @host = hostname
    * @qos = Subscribe QoS
    * return: -1 error, 0 no publish or >0 published ok
    */
 int  int
mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, const char *topic)mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic
                 const char *user, const char *host, char qos)
 {  {
        int colz, ret = 0;        int ret = 0;
         char *str, szStmt[BUFSIZ] = { 0 };          char *str, szStmt[BUFSIZ] = { 0 };
         sqlite3_stmt *stmt;          sqlite3_stmt *stmt;
         register int i;  
   
        if (!cfg || !sql)        if (!cfg || !sql || !topic)
                 return -1;                  return -1;
   
        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
         if (!str) {          if (!str) {
                mqtt_rtlm_log("Error:: not found topics table name");                mqtt_rtlm_log("Error:: not found subscribes table name");
                 return -1;                  return -1;
         }          }
        snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE Topic LIKE '%s';",         snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (MsgID, QoS, Topic, PubUser, "
                        str, topic);                        "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', "
                         "datetime('now', 'localtime'), '%s');", str, 
                         msgid, qos, topic, user, host);
   
           sqlite3_mutex_enter(sqlite3_db_mutex(sql));
         if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {          if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                 MQTT_RTLM_LOG(sql);                  MQTT_RTLM_LOG(sql);
                   sqlite3_mutex_leave(sqlite3_db_mutex(sql));
                 return -1;                  return -1;
         }          }
        while (sqlite3_step(stmt) == SQLITE_ROW && (colz = sqlite3_column_count(stmt))) {        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                for (i = 0; i < colz; i++) {                ret = sqlite3_changes(sql);
                }        else {
                 if (ret > SQLITE_OK && ret < SQLITE_ROW)
                         MQTT_RTLM_LOG(sql);
                 ret = 0;
         }          }
         sqlite3_finalize(stmt);          sqlite3_finalize(stmt);
           sqlite3_mutex_leave(sqlite3_db_mutex(sql));
   
         return ret;          return ret;
   }
   
   /*
    * mqtt_rtlm_delete_subscribe() Delete subscribe
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @topic = topic
    * @user = username
    * @host = hostname
    * @qos = Subscribe QoS if -1 no matter
    * return: -1 error, 0 no changes or >0 deleted rows
    */
   int
   mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic, 
                   const char *user, const char *host, char qos)
   {
           int ret = 0;
           char *str, szStr[STRSIZ] = { 0 }, szStmt[BUFSIZ] = { 0 };
           sqlite3_stmt *stmt;
   
           if (!cfg || !sql || !topic)
                   return -1;
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found subscribes table name");
                   return -1;
           }
           if (qos > -1 && qos < 3)
                   snprintf(szStr, sizeof szStr, "AND QoS = %d", qos);
           snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
                           "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
                           topic, user, host, szStr);
   
           sqlite3_mutex_enter(sqlite3_db_mutex(sql));
           if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_mutex_leave(sqlite3_db_mutex(sql));
                   return -1;
           }
           if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                   ret = sqlite3_changes(sql);
           else {
                   if (ret > SQLITE_OK && ret < SQLITE_ROW)
                           MQTT_RTLM_LOG(sql);
                   ret = 0;
           }
           sqlite3_finalize(stmt);
           sqlite3_mutex_leave(sqlite3_db_mutex(sql));
   
           return ret;
   }
   
   /*
    * mqtt_rtlm_read_subscribe() Get subscribe topic
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @topic = topic
    * return: NULL error or not found and !=NULL allocated subscribe topics
    */
   mqtt_subscr_t *
   mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic)
   {
           int rowz = 0;
           char *str, szStmt[BUFSIZ] = { 0 };
           sqlite3_stmt *stmt;
           register int j;
           mqtt_subscr_t *s = NULL;
   
           if (!cfg || !sql || !topic)
                   return NULL;
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found subscribes table name");
                   return NULL;
           }
           snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE Topic LIKE '%s';", str, topic);
   
           sqlite3_mutex_enter(sqlite3_db_mutex(sql));
           if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_mutex_leave(sqlite3_db_mutex(sql));
                   return NULL;
           }
   
           /* calculate count of rows and allocate subscribe items */
           while (sqlite3_step(stmt) == SQLITE_ROW)
                   rowz++;
           if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                   mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                   goto end;
           } else
                   memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
           sqlite3_reset(stmt);
   
           /* fill with data */
           for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                   s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
                   s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
                   s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                   s[j].sub_value.msg_base = NULL;
                   s[j].sub_value.msg_len = 0;
           }
   end:
           sqlite3_finalize(stmt);
           sqlite3_mutex_leave(sqlite3_db_mutex(sql));
   
           return s;
 }  }

Removed from v.1.1.2.9  
changed lines
  Added in v.1.2.2.4


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>