| version 1.2.2.4, 2012/04/25 07:37:16 | version 1.4, 2012/07/03 12:46:01 | 
| Line 1 | Line 1 | 
 |  | /************************************************************************* | 
 |  | * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com> | 
 |  | *  by Michael Pounov <misho@openbsd-bg.org> | 
 |  | * | 
 |  | * $Author$ | 
 |  | * $Id$ | 
 |  | * | 
 |  | ************************************************************************** | 
 |  | The ELWIX and AITNET software is distributed under the following | 
 |  | terms: | 
 |  |  | 
 |  | All of the documentation and software included in the ELWIX and AITNET | 
 |  | Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org> | 
 |  |  | 
 |  | Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 | 
 |  | by Michael Pounov <misho@elwix.org>.  All rights reserved. | 
 |  |  | 
 |  | Redistribution and use in source and binary forms, with or without | 
 |  | modification, are permitted provided that the following conditions | 
 |  | are met: | 
 |  | 1. Redistributions of source code must retain the above copyright | 
 |  | notice, this list of conditions and the following disclaimer. | 
 |  | 2. Redistributions in binary form must reproduce the above copyright | 
 |  | notice, this list of conditions and the following disclaimer in the | 
 |  | documentation and/or other materials provided with the distribution. | 
 |  | 3. All advertising materials mentioning features or use of this software | 
 |  | must display the following acknowledgement: | 
 |  | This product includes software developed by Michael Pounov <misho@elwix.org> | 
 |  | ELWIX - Embedded LightWeight unIX and its contributors. | 
 |  | 4. Neither the name of AITNET nor the names of its contributors | 
 |  | may be used to endorse or promote products derived from this software | 
 |  | without specific prior written permission. | 
 |  |  | 
 |  | THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND | 
 |  | ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | 
 |  | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | 
 |  | ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE | 
 |  | FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | 
 |  | DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS | 
 |  | OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) | 
 |  | HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | 
 |  | LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY | 
 |  | OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF | 
 |  | SUCH DAMAGE. | 
 |  | */ | 
 | #include "global.h" | #include "global.h" | 
 |  |  | 
 |  |  | 
| Line 20  mqtt_rtlm_log(const char *fmt, ...) | Line 65  mqtt_rtlm_log(const char *fmt, ...) | 
 | vsyslog(LOG_ERR, fmt, lst); | vsyslog(LOG_ERR, fmt, lst); | 
 | va_end(lst); | va_end(lst); | 
 | } | } | 
| #define MQTT_RTLM_LOG(_sql)     (assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \ | #define MQTT_RTLM_LOG(_sql)     (assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \ | 
|  | __func__, __LINE__, \ | 
 | sqlite3_errcode((_sql)), sqlite3_errmsg((_sql)))) | sqlite3_errcode((_sql)), sqlite3_errmsg((_sql)))) | 
 |  |  | 
 |  | /* library pre-loaded actions */ | 
 |  | void | 
 |  | _init() | 
 |  | { | 
 |  | sqlite3_initialize(); | 
 |  | } | 
 |  |  | 
 |  | void | 
 |  | _fini() | 
 |  | { | 
 |  | sqlite3_shutdown(); | 
 |  | } | 
 |  |  | 
 |  |  | 
 | /* | /* | 
 | * mqtt_rtlm_open() Open database connection | * mqtt_rtlm_open() Open database connection | 
 | * | * | 
| Line 39  mqtt_rtlm_open(cfg_root_t *cfg) | Line 98  mqtt_rtlm_open(cfg_root_t *cfg) | 
 | if (!cfg) | if (!cfg) | 
 | return NULL; | return NULL; | 
 |  |  | 
 | sqlite3_config(SQLITE_CONFIG_SERIALIZED); |  | 
 | if (!sqlite3_threadsafe()) |  | 
 | return NULL; |  | 
 |  |  | 
 | str = cfg_getAttribute(cfg, "mqtt_pub", "name"); | str = cfg_getAttribute(cfg, "mqtt_pub", "name"); | 
 | if (!str) { | if (!str) { | 
 | mqtt_rtlm_log("Error:: Unknown database name ...\n"); | mqtt_rtlm_log("Error:: Unknown database name ...\n"); | 
| Line 55  mqtt_rtlm_open(cfg_root_t *cfg) | Line 110  mqtt_rtlm_open(cfg_root_t *cfg) | 
 | return NULL; | return NULL; | 
 | } | } | 
 |  |  | 
 | sqlite3_mutex_enter(sqlite3_db_mutex(sql)); |  | 
 | if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) { | if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) { | 
 | MQTT_RTLM_LOG(sql); | MQTT_RTLM_LOG(sql); | 
 | sqlite3_mutex_leave(sqlite3_db_mutex(sql)); |  | 
 | sqlite3_close(sql); | sqlite3_close(sql); | 
 | return NULL; | return NULL; | 
 | } | } | 
 | sqlite3_mutex_leave(sqlite3_db_mutex(sql)); |  | 
 | return sql; | return sql; | 
 | } | } | 
 |  |  | 
| Line 96  mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, | Line 148  mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, | 
 | { | { | 
 | va_list lst; | va_list lst; | 
 | int ret = 0; | int ret = 0; | 
| char *str, szStmt[BUFSIZ] = { 0 }; | char *str, *psStmt; | 
 | sqlite3_stmt *stmt; | sqlite3_stmt *stmt; | 
 |  |  | 
 | if (!cfg || !sql) | if (!cfg || !sql) | 
| Line 108  mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, | Line 160  mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, | 
 | return -1; | return -1; | 
 | } | } | 
 | if (!will) | if (!will) | 
| snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, " | psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, " | 
| "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host); | "WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host); | 
 | else { | else { | 
 | va_start(lst, will); | va_start(lst, will); | 
| snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, " | psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, " | 
 | "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) " | "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) " | 
| "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", | "VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');", | 
 | str, connid, user, host, will, | str, connid, user, host, will, | 
 | va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*)); | va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*)); | 
 | va_end(lst); | va_end(lst); | 
 | } | } | 
 |  |  | 
| sqlite3_mutex_enter(sqlite3_db_mutex(sql)); | if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) { | 
| if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { |  | 
 | MQTT_RTLM_LOG(sql); | MQTT_RTLM_LOG(sql); | 
| sqlite3_mutex_leave(sqlite3_db_mutex(sql)); | sqlite3_free(psStmt); | 
 | return -1; | return -1; | 
| } | } else | 
|  | sqlite3_free(psStmt); | 
 | if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) | if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) | 
 | ret = sqlite3_changes(sql); | ret = sqlite3_changes(sql); | 
 | else { | else { | 
| Line 134  mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, | Line 186  mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, | 
 | ret = 0; | ret = 0; | 
 | } | } | 
 | sqlite3_finalize(stmt); | sqlite3_finalize(stmt); | 
 | sqlite3_mutex_leave(sqlite3_db_mutex(sql)); |  | 
 |  |  | 
 | return ret; | return ret; | 
 | } | } | 
| Line 153  int | Line 204  int | 
 | mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host) | mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host) | 
 | { | { | 
 | int ret = 0; | int ret = 0; | 
| char *str, szStmt[BUFSIZ] = { 0 }; | char *str, *psStmt; | 
 | sqlite3_stmt *stmt; | sqlite3_stmt *stmt; | 
 |  |  | 
 | if (!cfg || !sql) | if (!cfg || !sql) | 
| Line 164  mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, | Line 215  mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, | 
 | mqtt_rtlm_log("Error:: not found online table name"); | mqtt_rtlm_log("Error:: not found online table name"); | 
 | return -1; | return -1; | 
 | } | } | 
| snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' " | psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' " | 
| "AND RemoteHost LIKE '%s';", str, connid, user, host); | "AND RemoteHost LIKE '%q';", str, connid, user, host); | 
 |  |  | 
| sqlite3_mutex_enter(sqlite3_db_mutex(sql)); | if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) { | 
| if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { |  | 
 | MQTT_RTLM_LOG(sql); | MQTT_RTLM_LOG(sql); | 
| sqlite3_mutex_leave(sqlite3_db_mutex(sql)); | sqlite3_free(psStmt); | 
 | return -1; | return -1; | 
| } | } else | 
|  | sqlite3_free(psStmt); | 
 | if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) | if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) | 
 | ret = sqlite3_changes(sql); | ret = sqlite3_changes(sql); | 
 | else { | else { | 
| Line 181  mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, | Line 232  mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, | 
 | ret = 0; | ret = 0; | 
 | } | } | 
 | sqlite3_finalize(stmt); | sqlite3_finalize(stmt); | 
 | sqlite3_mutex_leave(sqlite3_db_mutex(sql)); |  | 
 |  |  | 
 | return ret; | return ret; | 
 | } | } | 
| Line 200  int | Line 250  int | 
 | mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host) | mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host) | 
 | { | { | 
 | int ret = 0; | int ret = 0; | 
| char *str, szStmt[BUFSIZ] = { 0 }; | char *str, *psStmt; | 
 | sqlite3_stmt *stmt; | sqlite3_stmt *stmt; | 
 |  |  | 
 | if (!cfg || !sql) | if (!cfg || !sql) | 
| Line 211  mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, c | Line 261  mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, c | 
 | mqtt_rtlm_log("Error:: not found online table name"); | mqtt_rtlm_log("Error:: not found online table name"); | 
 | return -1; | return -1; | 
 | } | } | 
| snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE " | psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE " | 
| "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';", | "ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';", | 
 | str, connid, user, host); | str, connid, user, host); | 
 |  |  | 
| sqlite3_mutex_enter(sqlite3_db_mutex(sql)); | if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) { | 
| if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { |  | 
 | MQTT_RTLM_LOG(sql); | MQTT_RTLM_LOG(sql); | 
| sqlite3_mutex_leave(sqlite3_db_mutex(sql)); | sqlite3_free(psStmt); | 
 | return -1; | return -1; | 
| } | } else | 
|  | sqlite3_free(psStmt); | 
 | if (sqlite3_step(stmt) == SQLITE_ROW) | if (sqlite3_step(stmt) == SQLITE_ROW) | 
 | ret = sqlite3_changes(sql); | ret = sqlite3_changes(sql); | 
 | else | else | 
 | ret = 0; | ret = 0; | 
 | sqlite3_finalize(stmt); | sqlite3_finalize(stmt); | 
 | sqlite3_mutex_leave(sqlite3_db_mutex(sql)); |  | 
 |  |  | 
 | return ret; | return ret; | 
 | } | } | 
| Line 236  mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, c | Line 285  mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, c | 
 | * | * | 
 | * @cfg = loaded config | * @cfg = loaded config | 
 | * @sql = SQL handle | * @sql = SQL handle | 
 |  | * @connid = connection id | 
 | * @msgid = MessageID | * @msgid = MessageID | 
 | * @topic = topic | * @topic = topic | 
 | * @txt = text | * @txt = text | 
 |  | * @txtlen = text length | 
 | * @user = username | * @user = username | 
 | * @host = hostname | * @host = hostname | 
 |  | * @qos = QoS | 
 | * @retain = !=0 retain message to database | * @retain = !=0 retain message to database | 
 | * return: -1 error, 0 no publish or >0 published ok | * return: -1 error, 0 no publish or >0 published ok | 
 | */ | */ | 
 | int | int | 
| mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, | mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, | 
| const char *user, const char *host, char retain) | const char *topic, void *txt, int txtlen, const char *user, | 
|  | const char *host, char qos, char retain) | 
 | { | { | 
 | int ret = 0; | int ret = 0; | 
| char *str, szStmt[BUFSIZ] = { 0 }; | char *str, *psStmt; | 
 | sqlite3_stmt *stmt; | sqlite3_stmt *stmt; | 
 |  |  | 
 | if (!cfg || !sql || !topic) | if (!cfg || !sql || !topic) | 
| Line 260  mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u | Line 313  mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u | 
 | mqtt_rtlm_log("Error:: not found topics table name"); | mqtt_rtlm_log("Error:: not found topics table name"); | 
 | return -1; | return -1; | 
 | } | } | 
| snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, " | psStmt = sqlite3_mprintf("INSERT INTO %s (QoS, Retain, ConnID, MsgID, Topic, Value, PubUser, " | 
| "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', " | "PubDate, PubHost) VALUES (%d, %d, '%q', %u, '%q', ?1, '%q', " | 
| "datetime('now', 'localtime'), '%s');", | "datetime('now', 'localtime'), '%q');", | 
| str, retain, msgid, topic, txt, user, host); | str, qos, retain, connid, msgid, topic, user, host); | 
 |  |  | 
| sqlite3_mutex_enter(sqlite3_db_mutex(sql)); | if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL) || !stmt) { | 
| if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { |  | 
 | MQTT_RTLM_LOG(sql); | MQTT_RTLM_LOG(sql); | 
| sqlite3_mutex_leave(sqlite3_db_mutex(sql)); | sqlite3_free(psStmt); | 
 | return -1; | return -1; | 
 |  | } else | 
 |  | sqlite3_free(psStmt); | 
 |  | if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) { | 
 |  | MQTT_RTLM_LOG(sql); | 
 |  | sqlite3_finalize(stmt); | 
 |  | return -1; | 
 | } | } | 
 | if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) | if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) | 
 | ret = sqlite3_changes(sql); | ret = sqlite3_changes(sql); | 
| Line 279  mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u | Line 337  mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u | 
 | ret = 0; | ret = 0; | 
 | } | } | 
 | sqlite3_finalize(stmt); | sqlite3_finalize(stmt); | 
 | sqlite3_mutex_leave(sqlite3_db_mutex(sql)); |  | 
 |  |  | 
 | return ret; | return ret; | 
 | } | } | 
 |  |  | 
 | /* | /* | 
 |  | * mqtt_rtlm_wipe_topic() Wipe all topics | 
 |  | * | 
 |  | * @cfg = loaded config | 
 |  | * @sql = SQL handle | 
 |  | * @connid = connection id | 
 |  | * @user = username | 
 |  | * @retain = -1 no matter | 
 |  | * return: -1 error, 0 no changes or >0 deleted rows | 
 |  | */ | 
 |  | int | 
 |  | mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain) | 
 |  | { | 
 |  | int ret = 0; | 
 |  | char *str, *rtn, *psStmt; | 
 |  | sqlite3_stmt *stmt; | 
 |  |  | 
 |  | if (!cfg || !sql || !connid) | 
 |  | return -1; | 
 |  |  | 
 |  | str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics"); | 
 |  | if (!str) { | 
 |  | mqtt_rtlm_log("Error:: not found topics table name"); | 
 |  | return -1; | 
 |  | } | 
 |  | switch (retain) { | 
 |  | case -1: | 
 |  | rtn = ""; | 
 |  | break; | 
 |  | case 0: | 
 |  | rtn = "AND Retain = 0"; | 
 |  | break; | 
 |  | default: | 
 |  | rtn = "AND Retain != 0"; | 
 |  | break; | 
 |  | } | 
 |  | psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND " | 
 |  | "PubUser LIKE '%q' %s;", str, connid, user, rtn); | 
 |  |  | 
 |  | if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) { | 
 |  | MQTT_RTLM_LOG(sql); | 
 |  | sqlite3_free(psStmt); | 
 |  | return -1; | 
 |  | } else | 
 |  | sqlite3_free(psStmt); | 
 |  | if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) | 
 |  | ret = sqlite3_changes(sql); | 
 |  | else { | 
 |  | if (ret > SQLITE_OK && ret < SQLITE_ROW) | 
 |  | MQTT_RTLM_LOG(sql); | 
 |  | ret = 0; | 
 |  | } | 
 |  | sqlite3_finalize(stmt); | 
 |  |  | 
 |  | return ret; | 
 |  | } | 
 |  |  | 
 |  | /* | 
 | * mqtt_rtlm_delete_topic() Delete topic | * mqtt_rtlm_delete_topic() Delete topic | 
 | * | * | 
 | * @cfg = loaded config | * @cfg = loaded config | 
 | * @sql = SQL handle | * @sql = SQL handle | 
 |  | * @connid = connection id | 
 | * @msgid = MessageID | * @msgid = MessageID | 
 | * @topic = topic | * @topic = topic | 
 | * @user = username | * @user = username | 
| Line 297  mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u | Line 412  mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u | 
 | * return: -1 error, 0 no changes or >0 deleted rows | * return: -1 error, 0 no changes or >0 deleted rows | 
 | */ | */ | 
 | int | int | 
| mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, | mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, | 
| const char *user, const char *host, char retain) | const char *topic, const char *user, const char *host, char retain) | 
 | { | { | 
 | int ret = 0; | int ret = 0; | 
| char *str, *rtn, szStmt[BUFSIZ] = { 0 }; | char *str, *rtn, *psStmt; | 
 | sqlite3_stmt *stmt; | sqlite3_stmt *stmt; | 
 |  |  | 
 | if (!cfg || !sql || !topic) | if (!cfg || !sql || !topic) | 
| Line 323  mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, | Line 438  mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, | 
 | rtn = "AND Retain != 0"; | rtn = "AND Retain != 0"; | 
 | break; | break; | 
 | } | } | 
| snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND " | psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND " | 
| "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, | "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str, | 
| msgid, topic, user, host, rtn); | connid, msgid, topic, user, host, rtn); | 
 |  |  | 
| sqlite3_mutex_enter(sqlite3_db_mutex(sql)); | if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) { | 
| if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { |  | 
 | MQTT_RTLM_LOG(sql); | MQTT_RTLM_LOG(sql); | 
| sqlite3_mutex_leave(sqlite3_db_mutex(sql)); | sqlite3_free(psStmt); | 
 | return -1; | return -1; | 
| } | } else | 
|  | sqlite3_free(psStmt); | 
 | if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) | if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) | 
 | ret = sqlite3_changes(sql); | ret = sqlite3_changes(sql); | 
 | else { | else { | 
| Line 341  mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, | Line 456  mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, | 
 | ret = 0; | ret = 0; | 
 | } | } | 
 | sqlite3_finalize(stmt); | sqlite3_finalize(stmt); | 
 | sqlite3_mutex_leave(sqlite3_db_mutex(sql)); |  | 
 |  |  | 
 | return ret; | return ret; | 
 | } | } | 
| Line 351  mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, | Line 465  mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, | 
 | * | * | 
 | * @cfg = loaded config | * @cfg = loaded config | 
 | * @sql = SQL handle | * @sql = SQL handle | 
| * @msgid = MessageID | * @connid = connection id | 
 | * @topic = topic | * @topic = topic | 
 | * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter | * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter | 
 | * return: NULL error or not found and !=NULL allocated subscribe topics | * return: NULL error or not found and !=NULL allocated subscribe topics | 
 | */ | */ | 
 | mqtt_subscr_t * | mqtt_subscr_t * | 
| mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain) | mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, | 
|  | const char *topic, char retain) | 
 | { | { | 
 | int rowz = 0; | int rowz = 0; | 
| char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 }; | char *str, szStr[STRSIZ], *psStmt; | 
 | sqlite3_stmt *stmt; | sqlite3_stmt *stmt; | 
 | register int j; | register int j; | 
 | mqtt_subscr_t *s = NULL; | mqtt_subscr_t *s = NULL; | 
 |  | ait_val_t v; | 
 |  |  | 
 | if (!cfg || !sql || !topic) | if (!cfg || !sql || !topic) | 
 | return NULL; | return NULL; | 
| Line 385  mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_ | Line 501  mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_ | 
 | mqtt_rtlm_log("Error:: not found topics table name"); | mqtt_rtlm_log("Error:: not found topics table name"); | 
 | return NULL; | return NULL; | 
 | } | } | 
| snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE " | psStmt = sqlite3_mprintf("SELECT QoS, Topic, Value  FROM %s WHERE " | 
| "MsgID = %d AND Topic LIKE '%s' %s;", | "ConnID LIKE '%q' AND Topic LIKE '%q' %s;", | 
| str, msgid, topic, szStr); | str, connid, topic, szStr); | 
 |  |  | 
| sqlite3_mutex_enter(sqlite3_db_mutex(sql)); | if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) { | 
| if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { |  | 
 | MQTT_RTLM_LOG(sql); | MQTT_RTLM_LOG(sql); | 
| sqlite3_mutex_leave(sqlite3_db_mutex(sql)); | sqlite3_free(psStmt); | 
 | return NULL; | return NULL; | 
| } | } else | 
|  | sqlite3_free(psStmt); | 
 |  |  | 
 | /* calculate count of rows and allocate subscribe items */ | /* calculate count of rows and allocate subscribe items */ | 
 | while (sqlite3_step(stmt) == SQLITE_ROW) | while (sqlite3_step(stmt) == SQLITE_ROW) | 
| Line 411  mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_ | Line 527  mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_ | 
 | s[j].sub_ret = (char) sqlite3_column_int(stmt, 0); | s[j].sub_ret = (char) sqlite3_column_int(stmt, 0); | 
 | s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1)); | s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1)); | 
 | s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base); | s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base); | 
| s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2)); | AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2)); | 
| s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base); | s[j].sub_value.msg_len = AIT_LEN(&v); | 
|  | s[j].sub_value.msg_base = (u_char*) malloc(s[j].sub_value.msg_len); | 
|  | if (s[j].sub_value.msg_base) | 
|  | memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len); | 
|  | AIT_FREE_VAL(&v); | 
 | } | } | 
 | end: | end: | 
 | sqlite3_finalize(stmt); | sqlite3_finalize(stmt); | 
 | sqlite3_mutex_leave(sqlite3_db_mutex(sql)); |  | 
 |  |  | 
 | return s; | return s; | 
 | } | } | 
| Line 426  end: | Line 545  end: | 
 | * | * | 
 | * @cfg = loaded config | * @cfg = loaded config | 
 | * @sql = SQL handle | * @sql = SQL handle | 
 |  | * @connid = connection id | 
 | * @msgid = MessageID | * @msgid = MessageID | 
 | * @topic = topic | * @topic = topic | 
 | * @user = username | * @user = username | 
| Line 434  end: | Line 554  end: | 
 | * return: -1 error, 0 no publish or >0 published ok | * return: -1 error, 0 no publish or >0 published ok | 
 | */ | */ | 
 | int | int | 
| mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, | mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, | 
| const char *user, const char *host, char qos) | const char *topic, const char *user, const char *host, char qos) | 
 | { | { | 
 | int ret = 0; | int ret = 0; | 
| char *str, szStmt[BUFSIZ] = { 0 }; | char *str, *psStmt; | 
 | sqlite3_stmt *stmt; | sqlite3_stmt *stmt; | 
 |  |  | 
 | if (!cfg || !sql || !topic) | if (!cfg || !sql || !topic) | 
| Line 449  mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sq | Line 569  mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sq | 
 | mqtt_rtlm_log("Error:: not found subscribes table name"); | mqtt_rtlm_log("Error:: not found subscribes table name"); | 
 | return -1; | return -1; | 
 | } | } | 
| snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (MsgID, QoS, Topic, PubUser, " | psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, " | 
| "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', " | "PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', " | 
| "datetime('now', 'localtime'), '%s');", str, | "datetime('now', 'localtime'), '%q');", str, | 
| msgid, qos, topic, user, host); | connid, msgid, qos, topic, user, host); | 
 |  |  | 
| sqlite3_mutex_enter(sqlite3_db_mutex(sql)); | if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) { | 
| if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { |  | 
 | MQTT_RTLM_LOG(sql); | MQTT_RTLM_LOG(sql); | 
| sqlite3_mutex_leave(sqlite3_db_mutex(sql)); | sqlite3_free(psStmt); | 
 | return -1; | return -1; | 
| } | } else | 
|  | sqlite3_free(psStmt); | 
 | if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) | if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) | 
 | ret = sqlite3_changes(sql); | ret = sqlite3_changes(sql); | 
 | else { | else { | 
| Line 468  mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sq | Line 588  mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sq | 
 | ret = 0; | ret = 0; | 
 | } | } | 
 | sqlite3_finalize(stmt); | sqlite3_finalize(stmt); | 
 | sqlite3_mutex_leave(sqlite3_db_mutex(sql)); |  | 
 |  |  | 
 | return ret; | return ret; | 
 | } | } | 
| Line 478  mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sq | Line 597  mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sq | 
 | * | * | 
 | * @cfg = loaded config | * @cfg = loaded config | 
 | * @sql = SQL handle | * @sql = SQL handle | 
 |  | * @connid = connection id | 
 | * @topic = topic | * @topic = topic | 
 | * @user = username | * @user = username | 
 | * @host = hostname | * @host = hostname | 
 | * @qos = Subscribe QoS if -1 no matter |  | 
 | * return: -1 error, 0 no changes or >0 deleted rows | * return: -1 error, 0 no changes or >0 deleted rows | 
 | */ | */ | 
 | int | int | 
| mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic, | mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, | 
| const char *user, const char *host, char qos) | const char *topic, const char *user, const char *host) | 
 | { | { | 
 | int ret = 0; | int ret = 0; | 
| char *str, szStr[STRSIZ] = { 0 }, szStmt[BUFSIZ] = { 0 }; | char *str, *psStmt; | 
 | sqlite3_stmt *stmt; | sqlite3_stmt *stmt; | 
 |  |  | 
 | if (!cfg || !sql || !topic) | if (!cfg || !sql || !topic) | 
| Line 500  mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *s | Line 619  mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *s | 
 | mqtt_rtlm_log("Error:: not found subscribes table name"); | mqtt_rtlm_log("Error:: not found subscribes table name"); | 
 | return -1; | return -1; | 
 | } | } | 
| if (qos > -1 && qos < 3) | psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND " | 
| snprintf(szStr, sizeof szStr, "AND QoS = %d", qos); | "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str, | 
| snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND " | connid, topic, user, host); | 
| "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, |  | 
| topic, user, host, szStr); |  | 
 |  |  | 
| sqlite3_mutex_enter(sqlite3_db_mutex(sql)); | if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) { | 
| if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { |  | 
 | MQTT_RTLM_LOG(sql); | MQTT_RTLM_LOG(sql); | 
| sqlite3_mutex_leave(sqlite3_db_mutex(sql)); | sqlite3_free(psStmt); | 
 | return -1; | return -1; | 
| } | } else | 
|  | sqlite3_free(psStmt); | 
 | if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) | if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) | 
 | ret = sqlite3_changes(sql); | ret = sqlite3_changes(sql); | 
 | else { | else { | 
| Line 520  mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *s | Line 637  mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *s | 
 | ret = 0; | ret = 0; | 
 | } | } | 
 | sqlite3_finalize(stmt); | sqlite3_finalize(stmt); | 
 | sqlite3_mutex_leave(sqlite3_db_mutex(sql)); |  | 
 |  |  | 
 | return ret; | return ret; | 
 | } | } | 
| Line 530  mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *s | Line 646  mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *s | 
 | * | * | 
 | * @cfg = loaded config | * @cfg = loaded config | 
 | * @sql = SQL handle | * @sql = SQL handle | 
 |  | * @connid = connection id | 
 | * @topic = topic | * @topic = topic | 
 | * return: NULL error or not found and !=NULL allocated subscribe topics | * return: NULL error or not found and !=NULL allocated subscribe topics | 
 | */ | */ | 
 | mqtt_subscr_t * | mqtt_subscr_t * | 
| mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic) | mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic) | 
 | { | { | 
 | int rowz = 0; | int rowz = 0; | 
| char *str, szStmt[BUFSIZ] = { 0 }; | char *str, *psStmt; | 
 | sqlite3_stmt *stmt; | sqlite3_stmt *stmt; | 
 | register int j; | register int j; | 
 | mqtt_subscr_t *s = NULL; | mqtt_subscr_t *s = NULL; | 
| Line 550  mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql | Line 667  mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql | 
 | mqtt_rtlm_log("Error:: not found subscribes table name"); | mqtt_rtlm_log("Error:: not found subscribes table name"); | 
 | return NULL; | return NULL; | 
 | } | } | 
| snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE Topic LIKE '%s';", str, topic); | psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND " | 
|  | "Topic LIKE '%q';", str, connid, topic); | 
 |  |  | 
| sqlite3_mutex_enter(sqlite3_db_mutex(sql)); | if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) { | 
| if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { |  | 
 | MQTT_RTLM_LOG(sql); | MQTT_RTLM_LOG(sql); | 
| sqlite3_mutex_leave(sqlite3_db_mutex(sql)); | sqlite3_free(psStmt); | 
 | return NULL; | return NULL; | 
| } | } else | 
|  | sqlite3_free(psStmt); | 
 |  |  | 
 | /* calculate count of rows and allocate subscribe items */ | /* calculate count of rows and allocate subscribe items */ | 
 | while (sqlite3_step(stmt) == SQLITE_ROW) | while (sqlite3_step(stmt) == SQLITE_ROW) | 
| Line 579  mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql | Line 697  mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql | 
 | } | } | 
 | end: | end: | 
 | sqlite3_finalize(stmt); | sqlite3_finalize(stmt); | 
 | sqlite3_mutex_leave(sqlite3_db_mutex(sql)); |  | 
 |  |  | 
 | return s; | return s; | 
 | } | } |