File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / pubmqtt.c
Revision 1.1.2.8: download - view: text, annotated - select for diffs - revision graph
Fri Nov 25 15:29:30 2011 UTC (12 years, 6 months ago) by misho
Branches: mqtt1_0
add publish and delete function for topics

#include "global.h"


/*
 * mqtt_db_log() Log database connection message
 *
 * @fmt = format string
 * @... = argument list
 * return: none
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list lst;

	va_start(lst, fmt);
	vsyslog(LOG_ERR, fmt, lst);
	va_end(lst);
}
#define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))


/*
 * mqtt_rtlm_open() Open database connection
 *
 * @cfg = config filename
 * return: NULL error or SQL handle
 */
sqlite3 *
mqtt_rtlm_open(sl_config *cfg)
{
	sqlite3 *sql = NULL;
	const char *str = NULL;

	if (!cfg)
		return NULL;

	str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
	if (!str) {
		mqtt_rtlm_log("Error:: Unknown database name ...\n");
		return NULL;
	}

	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_close(sql);
		return NULL;
	}

	return sql;
}

/*
 * mqtt_rtlm_close() Close database connection
 *
 * @sql = SQL handle
 * return: none
 */
void
mqtt_rtlm_close(sqlite3 *sql)
{
	sqlite3_close(sql);
}

/*
 * mqtt_rtlm_init_session() Create session
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @user = username
 * @host = hostname
 * @port = port
 * return: -1 error, 0 session already appears or >0 row changed
 */
int
mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host, u_short port)
{
	int ret = 0;
	char *str, szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt;

	if (!cfg || !sql)
		return -1;

	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
	if (!str) {
		mqtt_rtlm_log("Error:: not found online table name");
		return -1;
	}
	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, RemoteHost, RemotePort) "
		       "VALUES ('%s', '%s', %d);", str, user, host, port);

	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		return -1;
	}
	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
	sqlite3_finalize(stmt);

	return ret;
}

/*
 * mqtt_rtlm_fini_session() Delete session(s)
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @user = username
 * @host = hostname
 * return: -1 error, 0 session already appears or >0 row changed
 */
int
mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)
{
	int ret = 0;
	char *str, szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt;

	if (!cfg || !sql)
		return -1;

	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
	if (!str) {
		mqtt_rtlm_log("Error:: not found online table name");
		return -1;
	}
	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Username = '%s' AND RemoteHost LIKE '%s';", 
			str, user, host);

	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		return -1;
	}
	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
	sqlite3_finalize(stmt);

	return ret;
}

/*
 * mqtt_rtlm_chk_session() Check session(s)
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @user = username
 * @host = hostname
 * return: -1 error, 0 not logged or >0 logged found rows
 */
int
mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)
{
	int ret = 0;
	char *str, szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt;

	if (!cfg || !sql)
		return -1;

	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
	if (!str) {
		mqtt_rtlm_log("Error:: not found online table name");
		return -1;
	}
	snprintf(szStmt, sizeof szStmt, "SELECT RemoteHost, RemotePort FROM %s WHERE "
			"Username = '%s' AND RemoteHost LIKE '%s';", str, user, host);

	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		return -1;
	}
	if (sqlite3_step(stmt) == SQLITE_ROW)
		ret = sqlite3_changes(sql);
	else
		ret = 0;
	sqlite3_finalize(stmt);

	return ret;
}

/*
 * mqtt_rtlm_write_topic() Publish topic
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @topic = topic
 * @txt = text
 * @user = username
 * @host = hostname
 * @retain = !=0 retain message to database
 * return: -1 error, 0 no publish or >0 published ok
 */
int
mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, const char *topic, const char *txt, 
		const char *user, const char *host, char retain)
{
	int ret = 0;
	char *str, szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt;

	if (!cfg || !sql || !topic)
		return -1;

	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
	if (!str) {
		mqtt_rtlm_log("Error:: not found topics table name");
		return -1;
	}
	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, Topic, Value, PubUser, PubDate, PubHost) "
			"VALUES (%d, '%s', '%s', '%s', datetime('now', 'localtime'), '%s');", str, 
			retain, topic, txt, user, host);

	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		return -1;
	}
	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
	sqlite3_finalize(stmt);

	return ret;
}

/*
 * mqtt_rtlm_delete_topic() Delete topic
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @topic = topic
 * @user = username
 * @host = hostname
 * @retain = -1 no matter
 * return: -1 error, 0 no changes or >0 deleted rows
 */
int
mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, const char *topic, 
		const char *user, const char *host, char retain)
{
	int ret = 0;
	char *str, *rtn, szStmt[BUFSIZ] = { 0 };
	sqlite3_stmt *stmt;

	if (!cfg || !sql || !topic)
		return -1;

	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
	if (!str) {
		mqtt_rtlm_log("Error:: not found topics table name");
		return -1;
	}
	switch (retain) {
		case -1:
			rtn = "";
			break;
		case 0:
			rtn = "AND Retain = 0";
			break;
		default:
			rtn = "AND Retain != 0";
			break;
	}
	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
			"PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
			topic, user, host, rtn);

	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		return -1;
	}
	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
	sqlite3_finalize(stmt);

	return ret;
}

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>