#include "global.h"
extern const char sql_schema[];
/*
 * mqtt_rtlm_log() Log database connection message
 *
 * @fmt = printf-style format string
 * @... = arguments consumed by @fmt
 * return: none
 *
 * The formatted message is handed to vsyslog() with priority LOG_ERR.
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list args;

	va_start(args, fmt);
	vsyslog(LOG_ERR, fmt, args);
	va_end(args);
}
/*
 * MQTT_RTLM_LOG() Log the current SQLite error of a database handle
 *
 * @_sql = SQL handle; asserted to be non-NULL before it is used
 *
 * Expands to a single comma expression, so it can be used wherever an
 * expression statement is allowed. The message carries the name of the
 * calling function, the source line, and the values returned by
 * sqlite3_errcode() and sqlite3_errmsg() for @_sql.
 * NOTE: @_sql is evaluated more than once, pass a side-effect free expression.
 */
#define MQTT_RTLM_LOG(_sql) (assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
__func__, __LINE__, \
sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
/*
 * mqtt_rtlm_open() Open database connection
 *
 * @cfg = loaded config; attribute "name" of "mqtt_pub" holds the database filename
 * return: NULL error or SQL handle
 *
 * The database is opened read/write and created when missing, after which
 * the statements in sql_schema are executed on the new connection.
 */
sqlite3 *
mqtt_rtlm_open(cfg_root_t *cfg)
{
	sqlite3 *sql = NULL;
	const char *str = NULL;
	int rc;

	if (!cfg)
		return NULL;

	/* request serialized threading mode; the result is deliberately not checked */
	sqlite3_config(SQLITE_CONFIG_SERIALIZED);
	if (!sqlite3_threadsafe())
		return NULL;

	str = cfg_getAttribute(cfg, "mqtt_pub", "name");
	if (!str) {
		mqtt_rtlm_log("Error:: Unknown database name ...\n");
		return NULL;
	}

	rc = sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL);
	if (rc) {
		/*
		 * When SQLite cannot even allocate the handle, it stores NULL
		 * in sql. MQTT_RTLM_LOG() asserts on a NULL handle, so report
		 * that case from the result code instead.
		 */
		if (sql)
			MQTT_RTLM_LOG(sql);
		else
			mqtt_rtlm_log("Error:: %s(%d) SQL #%d - unable to allocate database handle",
					__func__, __LINE__, rc);
		/* a failed open still has to be released; closing NULL is a no-op */
		sqlite3_close(sql);
		return NULL;
	}

	/* hold the connection mutex so the logged error belongs to our exec */
	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
		sqlite3_close(sql);
		return NULL;
	}
	sqlite3_mutex_leave(sqlite3_db_mutex(sql));

	return sql;
}
/*
 * mqtt_rtlm_close() Close database connection
 *
 * @sql = SQL handle
 * return: none
 *
 * The result of sqlite3_close() is not reported to the caller.
 */
void
mqtt_rtlm_close(sqlite3 *sql)
{
	(void) sqlite3_close(sql);
}
/*
 * mqtt_rtlm_init_session() Create session
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @connid = connection id
 * @user = username
 * @host = hostname
 * @will = will flag if !=0 must fill arguments
 * @... = will arguments in order topic,msg,qos,retain
 *        (char*, char*, int, int)
 * return: -1 error, 0 session already appears or >0 row changed
 *
 * All strings are bound as statement parameters and never pasted into
 * the SQL text, so quotes inside them cannot alter the statement.
 * A NULL string is stored as SQL NULL.
 */
int
mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user,
		const char *host, char will, ...)
{
	va_list lst;
	const char *tbl, *will_topic = NULL, *will_msg = NULL;
	char szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt = NULL;
	int n, will_qos = 0, will_retain = 0, ret = -1;

	if (!cfg || !sql)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found online table name");
		return -1;
	}

	if (!will)
		n = snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
				"WillFlag) VALUES (?, ?, ?, 0);", tbl);
	else {
		/*
		 * Fetch the variadic arguments one statement at a time.
		 * Several va_arg() calls inside a single argument list have
		 * no defined evaluation order.
		 */
		va_start(lst, will);
		will_topic = va_arg(lst, char*);
		will_msg = va_arg(lst, char*);
		will_qos = va_arg(lst, int);
		will_retain = va_arg(lst, int);
		va_end(lst);

		n = snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
				"WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
				"VALUES (?, ?, ?, %d, %d, %d, ?, ?);",
				tbl, will, will_retain, will_qos);
	}
	/* only the table name is formatted in; refuse a truncated statement */
	if (n < 0 || (size_t) n >= sizeof szStmt) {
		mqtt_rtlm_log("Error:: SQL statement is too long");
		return -1;
	}

	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
	if (sqlite3_prepare_v2(sql, szStmt, -1, &stmt, NULL) ||
			sqlite3_bind_text(stmt, 1, connid, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 2, user, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 3, host, -1, SQLITE_STATIC) ||
			(will && (sqlite3_bind_text(stmt, 4, will_msg, -1, SQLITE_STATIC) ||
				  sqlite3_bind_text(stmt, 5, will_topic, -1, SQLITE_STATIC)))) {
		MQTT_RTLM_LOG(sql);
		goto end;
	}

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		/* e.g. constraint violation when the session is already there */
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
end:
	/* finalizing a NULL statement is a harmless no-op */
	sqlite3_finalize(stmt);
	sqlite3_mutex_leave(sqlite3_db_mutex(sql));

	return ret;
}
/*
 * mqtt_rtlm_fini_session() Delete session(s)
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @connid = connection id
 * @user = username
 * @host = hostname, used as LIKE pattern
 * return: -1 error, 0 nothing deleted or >0 deleted rows
 *
 * The strings are bound as statement parameters and never pasted into
 * the SQL text, so quotes inside them cannot alter the statement.
 */
int
mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
{
	const char *tbl;
	char szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt = NULL;
	int n, ret = -1;

	if (!cfg || !sql)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found online table name");
		return -1;
	}

	n = snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = ? AND Username = ? "
			"AND RemoteHost LIKE ?;", tbl);
	/* only the table name is formatted in; refuse a truncated statement */
	if (n < 0 || (size_t) n >= sizeof szStmt) {
		mqtt_rtlm_log("Error:: SQL statement is too long");
		return -1;
	}

	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
	if (sqlite3_prepare_v2(sql, szStmt, -1, &stmt, NULL) ||
			sqlite3_bind_text(stmt, 1, connid, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 2, user, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 3, host, -1, SQLITE_STATIC)) {
		MQTT_RTLM_LOG(sql);
		goto end;
	}

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
end:
	/* finalizing a NULL statement is a harmless no-op */
	sqlite3_finalize(stmt);
	sqlite3_mutex_leave(sqlite3_db_mutex(sql));

	return ret;
}
/*
 * mqtt_rtlm_chk_session() Check session(s)
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @connid = connection id
 * @user = username
 * @host = hostname, used as LIKE pattern
 * return: -1 error, 0 not logged or >0 logged found rows
 *
 * The result is the number of rows the SELECT yields. sqlite3_changes()
 * is not usable here, it only reports rows touched by the most recent
 * INSERT, UPDATE or DELETE. A failure while stepping is logged and
 * reported as 0.
 */
int
mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
{
	const char *tbl;
	char szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt = NULL;
	int n, rc, ret = -1;

	if (!cfg || !sql)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found online table name");
		return -1;
	}

	n = snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
			"ConnID = ? AND Username = ? AND RemoteHost LIKE ?;", tbl);
	/* only the table name is formatted in; refuse a truncated statement */
	if (n < 0 || (size_t) n >= sizeof szStmt) {
		mqtt_rtlm_log("Error:: SQL statement is too long");
		return -1;
	}

	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
	if (sqlite3_prepare_v2(sql, szStmt, -1, &stmt, NULL) ||
			sqlite3_bind_text(stmt, 1, connid, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 2, user, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 3, host, -1, SQLITE_STATIC)) {
		MQTT_RTLM_LOG(sql);
		goto end;
	}

	/* count the matching sessions */
	ret = 0;
	while ((rc = sqlite3_step(stmt)) == SQLITE_ROW)
		ret++;
	if (rc != SQLITE_DONE) {
		if (rc > SQLITE_OK && rc < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
end:
	/* finalizing a NULL statement is a harmless no-op */
	sqlite3_finalize(stmt);
	sqlite3_mutex_leave(sqlite3_db_mutex(sql));

	return ret;
}
/*
 * mqtt_rtlm_write_topic() Publish topic
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @msgid = MessageID
 * @topic = topic
 * @txt = text
 * @user = username
 * @host = hostname
 * @retain = !=0 retain message to database
 * return: -1 error, 0 no publish or >0 published ok
 *
 * The strings are bound as statement parameters and never pasted into
 * the SQL text, so quotes inside them cannot alter the statement.
 * A NULL @txt, @user or @host is stored as SQL NULL.
 */
int
mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt,
		const char *user, const char *host, char retain)
{
	const char *tbl;
	char szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt = NULL;
	int n, ret = -1;

	if (!cfg || !sql || !topic)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found topics table name");
		return -1;
	}

	n = snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
			"PubDate, PubHost) VALUES (%d, %d, ?, ?, ?, "
			"datetime('now', 'localtime'), ?);",
			tbl, retain, msgid);
	/* only table name and integers are formatted in; refuse truncation */
	if (n < 0 || (size_t) n >= sizeof szStmt) {
		mqtt_rtlm_log("Error:: SQL statement is too long");
		return -1;
	}

	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
	if (sqlite3_prepare_v2(sql, szStmt, -1, &stmt, NULL) ||
			sqlite3_bind_text(stmt, 1, topic, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 2, txt, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 3, user, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 4, host, -1, SQLITE_STATIC)) {
		MQTT_RTLM_LOG(sql);
		goto end;
	}

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
end:
	/* finalizing a NULL statement is a harmless no-op */
	sqlite3_finalize(stmt);
	sqlite3_mutex_leave(sqlite3_db_mutex(sql));

	return ret;
}
/*
 * mqtt_rtlm_delete_topic() Delete topic
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @msgid = MessageID
 * @topic = topic, used as LIKE pattern
 * @user = username, used as LIKE pattern
 * @host = hostname, used as LIKE pattern
 * @retain = 0 only not retained, -1 no matter, else only retained
 * return: -1 error, 0 no changes or >0 deleted rows
 *
 * The strings are bound as statement parameters and never pasted into
 * the SQL text, so quotes inside them cannot alter the statement.
 */
int
mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic,
		const char *user, const char *host, char retain)
{
	const char *tbl, *rtn;
	char szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt = NULL;
	int n, ret = -1;

	if (!cfg || !sql || !topic)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found topics table name");
		return -1;
	}

	/*
	 * Plain char may be unsigned, in which case a passed -1 arrives
	 * as 255 and would never hit "case -1" without the cast.
	 */
	switch ((signed char) retain) {
		case -1:
			rtn = "";
			break;
		case 0:
			rtn = "AND Retain = 0";
			break;
		default:
			rtn = "AND Retain != 0";
			break;
	}

	n = snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE ? AND "
			"PubUser LIKE ? AND PubHost LIKE ? %s;", tbl, msgid, rtn);
	/* refuse a truncated statement instead of running part of it */
	if (n < 0 || (size_t) n >= sizeof szStmt) {
		mqtt_rtlm_log("Error:: SQL statement is too long");
		return -1;
	}

	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
	if (sqlite3_prepare_v2(sql, szStmt, -1, &stmt, NULL) ||
			sqlite3_bind_text(stmt, 1, topic, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 2, user, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 3, host, -1, SQLITE_STATIC)) {
		MQTT_RTLM_LOG(sql);
		goto end;
	}

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
end:
	/* finalizing a NULL statement is a harmless no-op */
	sqlite3_finalize(stmt);
	sqlite3_mutex_leave(sqlite3_db_mutex(sql));

	return ret;
}
/*
 * mqtt_rtlm_read_topic() Get topic
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @msgid = MessageID
 * @topic = topic, used as LIKE pattern
 * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
 * return: NULL error and !=NULL allocated subscribe topics
 *
 * The returned array holds one element per found row followed by one
 * zero-filled element, so a query without matches gives an array made
 * of that single zero-filled element. The array comes from calloc() and
 * every topic and value string from strdup().
 * A NULL Topic or Value column is returned as an empty string.
 */
mqtt_subscr_t *
mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
{
	const char *tbl, *flt, *txt;
	char szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt = NULL;
	mqtt_subscr_t *s = NULL;
	int j, k, n, rowz = 0;

	if (!cfg || !sql || !topic)
		return NULL;

	/*
	 * Plain char may be unsigned, in which case a passed -1 arrives
	 * as 255 and would never hit "case -1" without the cast.
	 */
	switch ((signed char) retain) {
		case -1:
			flt = "";
			break;
		case 0:
			flt = "AND Retain = 0";
			break;
		default:
			flt = "AND Retain > 0";
			break;
	}

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found topics table name");
		return NULL;
	}

	n = snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
			"MsgID = %d AND Topic LIKE ? %s;", tbl, msgid, flt);
	/* refuse a truncated statement instead of running part of it */
	if (n < 0 || (size_t) n >= sizeof szStmt) {
		mqtt_rtlm_log("Error:: SQL statement is too long");
		return NULL;
	}

	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
	if (sqlite3_prepare_v2(sql, szStmt, -1, &stmt, NULL) ||
			sqlite3_bind_text(stmt, 1, topic, -1, SQLITE_STATIC)) {
		MQTT_RTLM_LOG(sql);
		goto end;
	}

	/* calculate count of rows and allocate subscribe items */
	while (sqlite3_step(stmt) == SQLITE_ROW)
		rowz++;
	s = calloc((size_t) rowz + 1, sizeof *s);
	if (!s) {
		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
		goto end;
	}

	/* rewind for the second pass; the bound parameter stays in place */
	sqlite3_reset(stmt);

	/* fill with data; rows missing on the second pass stay zero-filled */
	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);

		txt = (const char*) sqlite3_column_text(stmt, 1);
		s[j].sub_topic.msg_base = (u_char*) strdup(txt ? txt : "");
		txt = (const char*) sqlite3_column_text(stmt, 2);
		s[j].sub_value.msg_base = (u_char*) strdup(txt ? txt : "");

		if (!s[j].sub_topic.msg_base || !s[j].sub_value.msg_base) {
			mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
			/* drop everything built so far, nothing partial is returned */
			for (k = 0; k <= j; k++) {
				free(s[k].sub_topic.msg_base);
				free(s[k].sub_value.msg_base);
			}
			free(s);
			s = NULL;
			goto end;
		}

		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
		s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
	}
end:
	/* finalizing a NULL statement is a harmless no-op */
	sqlite3_finalize(stmt);
	sqlite3_mutex_leave(sqlite3_db_mutex(sql));

	return s;
}
/*
 * mqtt_rtlm_write_subscribe() Subscribe topic
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @msgid = MessageID
 * @topic = topic
 * @user = username
 * @host = hostname
 * @qos = Subscribe QoS
 * return: -1 error, 0 no subscribe or >0 subscribed ok
 *
 * The strings are bound as statement parameters and never pasted into
 * the SQL text, so quotes inside them cannot alter the statement.
 * A NULL @user or @host is stored as SQL NULL.
 */
int
mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic,
		const char *user, const char *host, char qos)
{
	const char *tbl;
	char szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt = NULL;
	int n, ret = -1;

	if (!cfg || !sql || !topic)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found subscribes table name");
		return -1;
	}

	n = snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (MsgID, QoS, Topic, PubUser, "
			"PubDate, PubHost) VALUES (%d, %d, ?, ?, "
			"datetime('now', 'localtime'), ?);", tbl, msgid, qos);
	/* only table name and integers are formatted in; refuse truncation */
	if (n < 0 || (size_t) n >= sizeof szStmt) {
		mqtt_rtlm_log("Error:: SQL statement is too long");
		return -1;
	}

	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
	if (sqlite3_prepare_v2(sql, szStmt, -1, &stmt, NULL) ||
			sqlite3_bind_text(stmt, 1, topic, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 2, user, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 3, host, -1, SQLITE_STATIC)) {
		MQTT_RTLM_LOG(sql);
		goto end;
	}

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
end:
	/* finalizing a NULL statement is a harmless no-op */
	sqlite3_finalize(stmt);
	sqlite3_mutex_leave(sqlite3_db_mutex(sql));

	return ret;
}
/*
 * mqtt_rtlm_delete_subscribe() Delete subscribe
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @topic = topic, used as LIKE pattern
 * @user = username, used as LIKE pattern
 * @host = hostname, used as LIKE pattern
 * @qos = Subscribe QoS 0..2, any other value (like -1) no matter
 * return: -1 error, 0 no changes or >0 deleted rows
 *
 * The strings are bound as statement parameters and never pasted into
 * the SQL text, so quotes inside them cannot alter the statement.
 */
int
mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic,
		const char *user, const char *host, char qos)
{
	const char *tbl;
	char szStr[STRSIZ] = { 0 }, szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt = NULL;
	int n, ret = -1;

	if (!cfg || !sql || !topic)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found subscribes table name");
		return -1;
	}

	/* restrict by QoS only for a valid level, otherwise leave the filter empty */
	if (qos > -1 && qos < 3)
		snprintf(szStr, sizeof szStr, "AND QoS = %d", qos);

	n = snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE ? AND "
			"PubUser LIKE ? AND PubHost LIKE ? %s;", tbl, szStr);
	/* refuse a truncated statement instead of running part of it */
	if (n < 0 || (size_t) n >= sizeof szStmt) {
		mqtt_rtlm_log("Error:: SQL statement is too long");
		return -1;
	}

	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
	if (sqlite3_prepare_v2(sql, szStmt, -1, &stmt, NULL) ||
			sqlite3_bind_text(stmt, 1, topic, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 2, user, -1, SQLITE_STATIC) ||
			sqlite3_bind_text(stmt, 3, host, -1, SQLITE_STATIC)) {
		MQTT_RTLM_LOG(sql);
		goto end;
	}

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
end:
	/* finalizing a NULL statement is a harmless no-op */
	sqlite3_finalize(stmt);
	sqlite3_mutex_leave(sqlite3_db_mutex(sql));

	return ret;
}
/*
 * mqtt_rtlm_read_subscribe() Get subscribe topic
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @topic = topic, used as LIKE pattern
 * return: NULL error and !=NULL allocated subscribe topics
 *
 * The returned array holds one element per found row followed by one
 * zero-filled element, so a query without matches gives an array made
 * of that single zero-filled element. For every row sub_ret carries
 * the QoS column, sub_topic a strdup() copy of the topic and sub_value
 * stays empty. The array itself comes from calloc().
 * A NULL Topic column is returned as an empty string.
 */
mqtt_subscr_t *
mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic)
{
	const char *tbl, *txt;
	char szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt = NULL;
	mqtt_subscr_t *s = NULL;
	int j, k, n, rowz = 0;

	if (!cfg || !sql || !topic)
		return NULL;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found subscribes table name");
		return NULL;
	}

	n = snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE Topic LIKE ?;", tbl);
	/* only the table name is formatted in; refuse a truncated statement */
	if (n < 0 || (size_t) n >= sizeof szStmt) {
		mqtt_rtlm_log("Error:: SQL statement is too long");
		return NULL;
	}

	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
	if (sqlite3_prepare_v2(sql, szStmt, -1, &stmt, NULL) ||
			sqlite3_bind_text(stmt, 1, topic, -1, SQLITE_STATIC)) {
		MQTT_RTLM_LOG(sql);
		goto end;
	}

	/* calculate count of rows and allocate subscribe items */
	while (sqlite3_step(stmt) == SQLITE_ROW)
		rowz++;
	s = calloc((size_t) rowz + 1, sizeof *s);
	if (!s) {
		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
		goto end;
	}

	/* rewind for the second pass; the bound parameter stays in place */
	sqlite3_reset(stmt);

	/* fill with data; rows missing on the second pass stay zero-filled */
	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);

		txt = (const char*) sqlite3_column_text(stmt, 1);
		s[j].sub_topic.msg_base = (u_char*) strdup(txt ? txt : "");
		if (!s[j].sub_topic.msg_base) {
			mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
			/* drop everything built so far, nothing partial is returned */
			for (k = 0; k < j; k++)
				free(s[k].sub_topic.msg_base);
			free(s);
			s = NULL;
			goto end;
		}
		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);

		s[j].sub_value.msg_base = NULL;
		s[j].sub_value.msg_len = 0;
	}
end:
	/* finalizing a NULL statement is a harmless no-op */
	sqlite3_finalize(stmt);
	sqlite3_mutex_leave(sqlite3_db_mutex(sql));

	return s;
}
/* FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org> */