#include "global.h"
extern const char sql_schema[];
/*
 * mqtt_rtlm_log() Log database connection message
 *
 * @fmt = printf-style format string
 * @... = arguments consumed by @fmt
 * return: none
 *
 * The formatted message is passed to vsyslog() with priority LOG_ERR.
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	vsyslog(LOG_ERR, fmt, ap);
	va_end(ap);
}
/*
 * MQTT_RTLM_LOG() Log the last SQLite error recorded on a database handle
 *
 * @_sql = SQL handle; it is checked with assert() and expanded three times,
 *         so pass an expression without side effects
 *
 * The logged line carries the name of the calling function, the source line
 * and the values of sqlite3_errcode() and sqlite3_errmsg() for @_sql.
 */
#define MQTT_RTLM_LOG(_sql) (assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
__func__, __LINE__, \
sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
/*
 * _init() Library pre-load action
 *
 * Initializes the SQLite library; the result code of sqlite3_initialize()
 * is discarded.
 * NOTE(review): presumably run by the dynamic loader when this object is
 * built as a shared library -- confirm against the build rules.
 */
void
_init()
{
	(void) sqlite3_initialize();
}
/*
 * _fini() Library unload action
 *
 * Shuts the SQLite library down; the result code of sqlite3_shutdown()
 * is discarded.
 * NOTE(review): presumably run by the dynamic loader when this object is
 * unloaded -- confirm against the build rules.
 */
void
_fini()
{
	(void) sqlite3_shutdown();
}
/*
 * mqtt_rtlm_open() Open database connection
 *
 * @cfg = loaded config; the database file name is taken from
 *        cfg_getAttribute(cfg, "mqtt_pub", "name")
 * return: NULL error or SQL handle
 *
 * The database is opened read-write and created when it does not exist,
 * then the statements in sql_schema are executed on it.  On any failure
 * the error is logged, the handle is closed and NULL is returned.
 */
sqlite3 *
mqtt_rtlm_open(cfg_root_t *cfg)
{
	sqlite3 *sql = NULL;
	const char *str = NULL;

	if (!cfg)
		return NULL;

	str = cfg_getAttribute(cfg, "mqtt_pub", "name");
	if (!str) {
		mqtt_rtlm_log("Error:: Unknown database name ...\n");
		return NULL;
	}

	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
		/*
		 * When SQLite cannot even allocate the connection object it
		 * leaves a NULL handle behind.  MQTT_RTLM_LOG() asserts on a
		 * NULL handle, so that case is reported directly.
		 */
		if (sql)
			MQTT_RTLM_LOG(sql);
		else
			mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s",
					__func__, __LINE__, SQLITE_NOMEM, "out of memory");
		/* a handle returned by a failed open must still be closed */
		sqlite3_close(sql);
		return NULL;
	}

	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_close(sql);
		return NULL;
	}

	return sql;
}
/*
 * mqtt_rtlm_close() Close database connection
 *
 * @sql = SQL handle
 * return: none
 *
 * The result code of sqlite3_close() is discarded.
 */
void
mqtt_rtlm_close(sqlite3 *sql)
{
	(void) sqlite3_close(sql);
}
/*
 * mqtt_rtlm_init_session() Create session
 *
 * @cfg = loaded config; table name comes from "mqtt_pub"/"tbl_online"
 * @sql = SQL handle
 * @connid = connection id
 * @user = username
 * @host = hostname
 * @will = will flag if !=0 must fill arguments
 * @... = will arguments in order topic (char*), msg (char*), qos (int),
 *        retain (int); read only when @will != 0
 * return: -1 error building or preparing the statement, 0 nothing
 *         inserted (the step did not finish with SQLITE_DONE) or >0 row changed
 */
int
mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user,
		const char *host, char will, ...)
{
	va_list lst;
	int ret = 0, qos, retain;
	const char *tbl;
	char *topic, *msg, *psStmt;
	sqlite3_stmt *stmt = NULL;

	if (!cfg || !sql)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found online table name");
		return -1;
	}

	if (!will)
		psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
				"WillFlag) VALUES ('%q', '%q', '%q', 0);", tbl, connid, user, host);
	else {
		/*
		 * Fetch the will arguments one statement at a time, in the
		 * documented order.  Calling va_arg() several times inside a
		 * single argument list leaves the fetch order to the compiler.
		 */
		va_start(lst, will);
		topic = va_arg(lst, char*);
		msg = va_arg(lst, char*);
		qos = va_arg(lst, int);
		retain = va_arg(lst, int);
		va_end(lst);

		psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
				"WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
				"VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');",
				tbl, connid, user, host, will, retain, qos, msg, topic);
	}
	/* sqlite3_mprintf() yields NULL when it cannot allocate the string */
	if (!psStmt) {
		mqtt_rtlm_log("Error:: not enough memory for SQL statement");
		return -1;
	}

	if (sqlite3_prepare_v2(sql, psStmt, -1, &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	}
	sqlite3_free(psStmt);

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		/* error codes are logged, but still reported as "no row changed" */
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}

	sqlite3_finalize(stmt);
	return ret;
}
/*
 * mqtt_rtlm_fini_session() Delete session(s)
 *
 * @cfg = loaded config; table name comes from "mqtt_pub"/"tbl_online"
 * @sql = SQL handle
 * @connid = connection id
 * @user = username
 * @host = hostname, used as a LIKE pattern
 * return: -1 error building or preparing the statement, 0 nothing deleted
 *         (or the step did not finish with SQLITE_DONE) or >0 row changed
 */
int
mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
{
	int ret = 0;
	const char *tbl;
	char *psStmt;
	sqlite3_stmt *stmt = NULL;

	if (!cfg || !sql)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found online table name");
		return -1;
	}

	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
			"AND RemoteHost LIKE '%q';", tbl, connid, user, host);
	/* sqlite3_mprintf() yields NULL when it cannot allocate the string */
	if (!psStmt) {
		mqtt_rtlm_log("Error:: not enough memory for SQL statement");
		return -1;
	}

	if (sqlite3_prepare_v2(sql, psStmt, -1, &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	}
	sqlite3_free(psStmt);

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		/* error codes are logged, but still reported as "no row changed" */
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}

	sqlite3_finalize(stmt);
	return ret;
}
/*
 * mqtt_rtlm_chk_session() Check session(s)
 *
 * @cfg = loaded config; table name comes from "mqtt_pub"/"tbl_online"
 * @sql = SQL handle
 * @connid = connection id
 * @user = username, used as a LIKE pattern
 * @host = hostname, used as a LIKE pattern
 * return: -1 error building or preparing the statement, 0 not logged or
 *         >0 number of matching session rows
 */
int
mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
{
	int rc, rows = 0;
	const char *tbl;
	char *psStmt;
	sqlite3_stmt *stmt = NULL;

	if (!cfg || !sql)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found online table name");
		return -1;
	}

	psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
			"ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';",
			tbl, connid, user, host);
	/* sqlite3_mprintf() yields NULL when it cannot allocate the string */
	if (!psStmt) {
		mqtt_rtlm_log("Error:: not enough memory for SQL statement");
		return -1;
	}

	if (sqlite3_prepare_v2(sql, psStmt, -1, &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	}
	sqlite3_free(psStmt);

	/*
	 * Count the rows the SELECT yields.  sqlite3_changes() is not used
	 * here because it only reflects INSERT/UPDATE/DELETE statements and
	 * would report a stale number from an earlier statement.
	 */
	while ((rc = sqlite3_step(stmt)) == SQLITE_ROW)
		rows++;
	if (rc > SQLITE_OK && rc < SQLITE_ROW)
		MQTT_RTLM_LOG(sql);

	sqlite3_finalize(stmt);
	return rows;
}
/*
 * mqtt_rtlm_write_topic() Publish topic
 *
 * @cfg = loaded config; table name comes from "mqtt_pub"/"tbl_topics"
 * @sql = SQL handle
 * @connid = connection id
 * @msgid = MessageID
 * @topic = topic, must not be NULL
 * @txt = text, bound to the statement as a blob (copied by SQLite)
 * @txtlen = text length in bytes
 * @user = username
 * @host = hostname
 * @qos = QoS
 * @retain = !=0 retain message to database
 * return: -1 error building, preparing or binding the statement, 0 no
 *         publish (the step did not finish with SQLITE_DONE) or >0 published ok
 */
int
mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
		const char *topic, void *txt, int txtlen, const char *user,
		const char *host, char qos, char retain)
{
	int ret = 0;
	const char *tbl;
	char *psStmt;
	sqlite3_stmt *stmt = NULL;

	if (!cfg || !sql || !topic)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found topics table name");
		return -1;
	}

	/* the payload is not formatted into the text; it is bound to ?1 below */
	psStmt = sqlite3_mprintf("INSERT INTO %s (QoS, Retain, ConnID, MsgID, Topic, Value, PubUser, "
			"PubDate, PubHost) VALUES (%d, %d, '%q', %u, '%q', ?1, '%q', "
			"datetime('now', 'localtime'), '%q');",
			tbl, qos, retain, connid, msgid, topic, user, host);
	/* sqlite3_mprintf() yields NULL when it cannot allocate the string */
	if (!psStmt) {
		mqtt_rtlm_log("Error:: not enough memory for SQL statement");
		return -1;
	}

	if (sqlite3_prepare_v2(sql, psStmt, -1, &stmt, NULL) || !stmt) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	}
	sqlite3_free(psStmt);

	if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_finalize(stmt);
		return -1;
	}

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		/* error codes are logged, but still reported as "no publish" */
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}

	sqlite3_finalize(stmt);
	return ret;
}
/*
 * mqtt_rtlm_wipe_topic() Wipe all topics
 *
 * @cfg = loaded config; table name comes from "mqtt_pub"/"tbl_topics"
 * @sql = SQL handle
 * @connid = connection id, must not be NULL
 * @user = username, used as a LIKE pattern
 * @retain = -1 no matter, 0 only rows with Retain = 0, otherwise only
 *           rows with Retain != 0
 * return: -1 error building or preparing the statement, 0 no changes
 *         (or the step did not finish with SQLITE_DONE) or >0 deleted rows
 */
int
mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
{
	int ret = 0;
	const char *tbl, *rtn;
	char *psStmt;
	sqlite3_stmt *stmt = NULL;

	if (!cfg || !sql || !connid)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found topics table name");
		return -1;
	}

	/*
	 * Plain char may be unsigned, in which case a caller's -1 arrives
	 * as 255 and would never match "case -1"; compare as signed char.
	 */
	switch ((signed char) retain) {
		case -1:
			rtn = "";
			break;
		case 0:
			rtn = "AND Retain = 0";
			break;
		default:
			rtn = "AND Retain != 0";
			break;
	}

	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
			"PubUser LIKE '%q' %s;", tbl, connid, user, rtn);
	/* sqlite3_mprintf() yields NULL when it cannot allocate the string */
	if (!psStmt) {
		mqtt_rtlm_log("Error:: not enough memory for SQL statement");
		return -1;
	}

	if (sqlite3_prepare_v2(sql, psStmt, -1, &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	}
	sqlite3_free(psStmt);

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		/* error codes are logged, but still reported as "no changes" */
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}

	sqlite3_finalize(stmt);
	return ret;
}
/*
 * mqtt_rtlm_delete_topic() Delete topic
 *
 * @cfg = loaded config; table name comes from "mqtt_pub"/"tbl_topics"
 * @sql = SQL handle
 * @connid = connection id
 * @msgid = MessageID
 * @topic = topic, must not be NULL; used as a LIKE pattern
 * @user = username, used as a LIKE pattern
 * @host = hostname, used as a LIKE pattern
 * @retain = -1 no matter, 0 only rows with Retain = 0, otherwise only
 *           rows with Retain != 0
 * return: -1 error building or preparing the statement, 0 no changes
 *         (or the step did not finish with SQLITE_DONE) or >0 deleted rows
 */
int
mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
		const char *topic, const char *user, const char *host, char retain)
{
	int ret = 0;
	const char *tbl, *rtn;
	char *psStmt;
	sqlite3_stmt *stmt = NULL;

	if (!cfg || !sql || !topic)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found topics table name");
		return -1;
	}

	/*
	 * Plain char may be unsigned, in which case a caller's -1 arrives
	 * as 255 and would never match "case -1"; compare as signed char.
	 */
	switch ((signed char) retain) {
		case -1:
			rtn = "";
			break;
		case 0:
			rtn = "AND Retain = 0";
			break;
		default:
			rtn = "AND Retain != 0";
			break;
	}

	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
			"Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", tbl,
			connid, msgid, topic, user, host, rtn);
	/* sqlite3_mprintf() yields NULL when it cannot allocate the string */
	if (!psStmt) {
		mqtt_rtlm_log("Error:: not enough memory for SQL statement");
		return -1;
	}

	if (sqlite3_prepare_v2(sql, psStmt, -1, &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	}
	sqlite3_free(psStmt);

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		/* error codes are logged, but still reported as "no changes" */
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}

	sqlite3_finalize(stmt);
	return ret;
}
/*
 * mqtt_rtlm_read_topic() Get topic
 *
 * @cfg = loaded config; table name comes from "mqtt_pub"/"tbl_topics"
 * @sql = SQL handle
 * @connid = connection id
 * @msgid = MessageID
 * @topic = topic, must not be NULL; used as a LIKE pattern
 * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
 * return: NULL error, otherwise an allocated array of subscribe topics that
 *         ends with one zero-filled element; when nothing matches the
 *         array holds only that terminating element
 *
 * The array comes from calloc(), each topic from strdup() and each value
 * from malloc().
 * NOTE(review): releasing them is presumably up to the caller -- confirm.
 */
mqtt_subscr_t *
mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
		const char *topic, char retain)
{
	int rowz = 0;
	const char *tbl, *flt, *txt;
	char *psStmt;
	sqlite3_stmt *stmt = NULL;
	register int j;
	mqtt_subscr_t *s = NULL;
	ait_val_t v;

	if (!cfg || !sql || !topic)
		return NULL;

	/*
	 * Plain char may be unsigned, in which case a caller's -1 arrives
	 * as 255 and would never match "case -1"; compare as signed char.
	 */
	switch ((signed char) retain) {
		case -1:
			flt = "";
			break;
		case 0:
			flt = "AND Retain = 0";
			break;
		default:
			flt = "AND Retain > 0";
			break;
	}

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found topics table name");
		return NULL;
	}

	psStmt = sqlite3_mprintf("SELECT Retain, Topic, Value FROM %s WHERE "
			"ConnID = '%q' AND MsgID = %d AND Topic LIKE '%q' %s;",
			tbl, connid, msgid, topic, flt);
	/* sqlite3_mprintf() yields NULL when it cannot allocate the string */
	if (!psStmt) {
		mqtt_rtlm_log("Error:: not enough memory for SQL statement");
		return NULL;
	}

	if (sqlite3_prepare_v2(sql, psStmt, -1, &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return NULL;
	}
	sqlite3_free(psStmt);

	/* calculate count of rows and allocate subscribe items */
	while (sqlite3_step(stmt) == SQLITE_ROW)
		rowz++;
	/* one extra zero-filled element terminates the array */
	s = calloc(rowz + 1, sizeof(mqtt_subscr_t));
	if (!s) {
		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
		goto end;
	}
	sqlite3_reset(stmt);

	/* fill with data */
	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
		/* a NULL Topic column is stored as an empty string */
		txt = (const char*) sqlite3_column_text(stmt, 1);
		s[j].sub_topic.msg_base = (u_char*) strdup(txt ? txt : "");
		if (!s[j].sub_topic.msg_base) {
			/* leave this element zeroed so it terminates the array */
			mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
			break;
		}
		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);

		AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
		s[j].sub_value.msg_base = (u_char*) malloc(AIT_LEN(&v));
		/* the length is recorded only when the buffer really exists */
		if (s[j].sub_value.msg_base) {
			s[j].sub_value.msg_len = AIT_LEN(&v);
			if (s[j].sub_value.msg_len)
				memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v),
						s[j].sub_value.msg_len);
		}
	}
end:
	sqlite3_finalize(stmt);
	return s;
}
/*
 * mqtt_rtlm_write_subscribe() Subscribe topic
 *
 * @cfg = loaded config; table name comes from "mqtt_pub"/"tbl_subscribes"
 * @sql = SQL handle
 * @connid = connection id
 * @msgid = MessageID
 * @topic = topic, must not be NULL
 * @user = username
 * @host = hostname
 * @qos = Subscribe QoS
 * return: -1 error building or preparing the statement, 0 nothing
 *         inserted (the step did not finish with SQLITE_DONE) or >0 inserted rows
 */
int
mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
		const char *topic, const char *user, const char *host, char qos)
{
	int ret = 0;
	const char *tbl;
	char *psStmt;
	sqlite3_stmt *stmt = NULL;

	if (!cfg || !sql || !topic)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found subscribes table name");
		return -1;
	}

	psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
			"PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
			"datetime('now', 'localtime'), '%q');", tbl,
			connid, msgid, qos, topic, user, host);
	/* sqlite3_mprintf() yields NULL when it cannot allocate the string */
	if (!psStmt) {
		mqtt_rtlm_log("Error:: not enough memory for SQL statement");
		return -1;
	}

	if (sqlite3_prepare_v2(sql, psStmt, -1, &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	}
	sqlite3_free(psStmt);

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		/* error codes are logged, but still reported as "nothing inserted" */
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}

	sqlite3_finalize(stmt);
	return ret;
}
/*
 * mqtt_rtlm_delete_subscribe() Delete subscribe
 *
 * @cfg = loaded config; table name comes from "mqtt_pub"/"tbl_subscribes"
 * @sql = SQL handle
 * @connid = connection id
 * @topic = topic, must not be NULL; used as a LIKE pattern
 * @user = username, used as a LIKE pattern
 * @host = hostname, used as a LIKE pattern
 * return: -1 error building or preparing the statement, 0 no changes
 *         (or the step did not finish with SQLITE_DONE) or >0 deleted rows
 */
int
mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid,
		const char *topic, const char *user, const char *host)
{
	int ret = 0;
	const char *tbl;
	char *psStmt;
	sqlite3_stmt *stmt = NULL;

	if (!cfg || !sql || !topic)
		return -1;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found subscribes table name");
		return -1;
	}

	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
			"Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", tbl,
			connid, topic, user, host);
	/* sqlite3_mprintf() yields NULL when it cannot allocate the string */
	if (!psStmt) {
		mqtt_rtlm_log("Error:: not enough memory for SQL statement");
		return -1;
	}

	if (sqlite3_prepare_v2(sql, psStmt, -1, &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	}
	sqlite3_free(psStmt);

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		/* error codes are logged, but still reported as "no changes" */
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}

	sqlite3_finalize(stmt);
	return ret;
}
/*
 * mqtt_rtlm_read_subscribe() Get subscribe topic
 *
 * @cfg = loaded config; table name comes from "mqtt_pub"/"tbl_subscribes"
 * @sql = SQL handle
 * @connid = connection id
 * @topic = topic, must not be NULL; used as a LIKE pattern
 * return: NULL error, otherwise an allocated array of subscribe topics that
 *         ends with one zero-filled element; when nothing matches the
 *         array holds only that terminating element
 *
 * Each element carries the QoS column in sub_ret and a strdup()'ed topic;
 * sub_value is left empty.
 * NOTE(review): releasing the array and the topics is presumably up to
 * the caller -- confirm.
 */
mqtt_subscr_t *
mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
{
	int rowz = 0;
	const char *tbl, *txt;
	char *psStmt;
	sqlite3_stmt *stmt = NULL;
	register int j;
	mqtt_subscr_t *s = NULL;

	if (!cfg || !sql || !topic)
		return NULL;

	tbl = (const char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
	if (!tbl) {
		mqtt_rtlm_log("Error:: not found subscribes table name");
		return NULL;
	}

	psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
			"Topic LIKE '%q';", tbl, connid, topic);
	/* sqlite3_mprintf() yields NULL when it cannot allocate the string */
	if (!psStmt) {
		mqtt_rtlm_log("Error:: not enough memory for SQL statement");
		return NULL;
	}

	if (sqlite3_prepare_v2(sql, psStmt, -1, &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return NULL;
	}
	sqlite3_free(psStmt);

	/* calculate count of rows and allocate subscribe items */
	while (sqlite3_step(stmt) == SQLITE_ROW)
		rowz++;
	/* one extra zero-filled element terminates the array */
	s = calloc(rowz + 1, sizeof(mqtt_subscr_t));
	if (!s) {
		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
		goto end;
	}
	sqlite3_reset(stmt);

	/* fill with data */
	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
		/* a NULL Topic column is stored as an empty string */
		txt = (const char*) sqlite3_column_text(stmt, 1);
		s[j].sub_topic.msg_base = (u_char*) strdup(txt ? txt : "");
		if (!s[j].sub_topic.msg_base) {
			/* leave this element zeroed so it terminates the array */
			mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
			break;
		}
		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
		s[j].sub_value.msg_base = NULL;
		s[j].sub_value.msg_len = 0;
	}
end:
	sqlite3_finalize(stmt);
	return s;
}
/* FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org> */