File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / pubmqtt.c
Revision 1.3: download - view: text, annotated - select for diffs - revision graph
Tue Jul 3 09:02:50 2012 UTC (11 years, 11 months ago) by misho
Branches: MAIN
CVS tags: mqtt1_2, MQTT1_1, HEAD
version 1.1

#include "global.h"


extern const char sql_schema[];


/*
 * mqtt_db_log() Log database connection message
 *
 * @fmt = format string
 * @... = argument list
 * return: none
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list lst;

	va_start(lst, fmt);
	vsyslog(LOG_ERR, fmt, lst);
	va_end(lst);
}
#define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
					__func__, __LINE__, \
					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))

/* library pre-loaded actions */
void
_init()
{
	sqlite3_initialize();
}

void
_fini()
{
	sqlite3_shutdown();
}


/*
 * mqtt_rtlm_open() Open database connection
 *
 * @cfg = config filename
 * return: NULL error or SQL handle
 */
sqlite3 *
mqtt_rtlm_open(cfg_root_t *cfg)
{
	sqlite3 *sql = NULL;
	const char *str = NULL;

	if (!cfg)
		return NULL;

	str = cfg_getAttribute(cfg, "mqtt_pub", "name");
	if (!str) {
		mqtt_rtlm_log("Error:: Unknown database name ...\n");
		return NULL;
	}

	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_close(sql);
		return NULL;
	}

	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_close(sql);
		return NULL;
	}
	return sql;
}

/*
 * mqtt_rtlm_close() Close database connection
 *
 * @sql = SQL handle
 * return: none
 */
void
mqtt_rtlm_close(sqlite3 *sql)
{
	sqlite3_close(sql);
}

/*
 * mqtt_rtlm_init_session() Create session
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @connid = connection id
 * @user = username
 * @host = hostname
 * @will = will flag if !=0 must fill arguments
 * @... = will arguments in order topic,msg,qos,retain
 * return: -1 error, 0 session already appears or >0 row changed
 */
int
mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
		const char *host, char will, ...)
{
	va_list lst;
	int ret = 0;
	char *str, *psStmt;
	sqlite3_stmt *stmt;

	if (!cfg || !sql)
		return -1;

	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
	if (!str) {
		mqtt_rtlm_log("Error:: not found online table name");
		return -1;
	}
	if (!will)
		psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
				"WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
	else {
		va_start(lst, will);
		psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
				"WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
				"VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');", 
				str, connid, user, host, will, 
				va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
		va_end(lst);
	}

	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	} else
		sqlite3_free(psStmt);
	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
	sqlite3_finalize(stmt);

	return ret;
}

/*
 * mqtt_rtlm_fini_session() Delete session(s)
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @connid = connection id
 * @user = username
 * @host = hostname
 * return: -1 error, 0 session already appears or >0 row changed
 */
int
mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
{
	int ret = 0;
	char *str, *psStmt;
	sqlite3_stmt *stmt;

	if (!cfg || !sql)
		return -1;

	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
	if (!str) {
		mqtt_rtlm_log("Error:: not found online table name");
		return -1;
	}
	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
			"AND RemoteHost LIKE '%q';", str, connid, user, host);

	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	} else
		sqlite3_free(psStmt);
	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
	sqlite3_finalize(stmt);

	return ret;
}

/*
 * mqtt_rtlm_chk_session() Check session(s)
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @connid = connection id
 * @user = username
 * @host = hostname
 * return: -1 error, 0 not logged or >0 logged found rows
 */
int
mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
{
	int ret = 0;
	char *str, *psStmt;
	sqlite3_stmt *stmt;

	if (!cfg || !sql)
		return -1;

	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
	if (!str) {
		mqtt_rtlm_log("Error:: not found online table name");
		return -1;
	}
	psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
			"ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';", 
			str, connid, user, host);

	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	} else
		sqlite3_free(psStmt);
	if (sqlite3_step(stmt) == SQLITE_ROW)
		ret = sqlite3_changes(sql);
	else
		ret = 0;
	sqlite3_finalize(stmt);

	return ret;
}

/*
 * mqtt_rtlm_write_topic() Publish topic
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @connid = connection id
 * @msgid = MessageID
 * @topic = topic
 * @txt = text
 * @txtlen = text length
 * @user = username
 * @host = hostname
 * @qos = QoS
 * @retain = !=0 retain message to database
 * return: -1 error, 0 no publish or >0 published ok
 */
int
mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
		const char *topic, void *txt, int txtlen, const char *user, 
		const char *host, char qos, char retain)
{
	int ret = 0;
	char *str, *psStmt;
	sqlite3_stmt *stmt;

	if (!cfg || !sql || !topic)
		return -1;

	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
	if (!str) {
		mqtt_rtlm_log("Error:: not found topics table name");
		return -1;
	}
	psStmt = sqlite3_mprintf("INSERT INTO %s (QoS, Retain, ConnID, MsgID, Topic, Value, PubUser, "
			"PubDate, PubHost) VALUES (%d, %d, '%q', %u, '%q', ?1, '%q', "
			"datetime('now', 'localtime'), '%q');", 
			str, qos, retain, connid, msgid, topic, user, host);

	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL) || !stmt) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	} else
		sqlite3_free(psStmt);
	if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_finalize(stmt);
		return -1;
	}
	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
	sqlite3_finalize(stmt);

	return ret;
}

/*
 * mqtt_rtlm_wipe_topic() Wipe all topics
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @connid = connection id
 * @user = username
 * @retain = -1 no matter
 * return: -1 error, 0 no changes or >0 deleted rows
 */
int
mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
{
	int ret = 0;
	char *str, *rtn, *psStmt;
	sqlite3_stmt *stmt;

	if (!cfg || !sql || !connid)
		return -1;

	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
	if (!str) {
		mqtt_rtlm_log("Error:: not found topics table name");
		return -1;
	}
	switch (retain) {
		case -1:
			rtn = "";
			break;
		case 0:
			rtn = "AND Retain = 0";
			break;
		default:
			rtn = "AND Retain != 0";
			break;
	}
	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
			"PubUser LIKE '%q' %s;", str, connid, user, rtn);

	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	} else
		sqlite3_free(psStmt);
	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
	sqlite3_finalize(stmt);

	return ret;
}

/*
 * mqtt_rtlm_delete_topic() Delete topic
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @connid = connection id
 * @msgid = MessageID
 * @topic = topic
 * @user = username
 * @host = hostname
 * @retain = -1 no matter
 * return: -1 error, 0 no changes or >0 deleted rows
 */
int
mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
		const char *topic, const char *user, const char *host, char retain)
{
	int ret = 0;
	char *str, *rtn, *psStmt;
	sqlite3_stmt *stmt;

	if (!cfg || !sql || !topic)
		return -1;

	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
	if (!str) {
		mqtt_rtlm_log("Error:: not found topics table name");
		return -1;
	}
	switch (retain) {
		case -1:
			rtn = "";
			break;
		case 0:
			rtn = "AND Retain = 0";
			break;
		default:
			rtn = "AND Retain != 0";
			break;
	}
	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
			"Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str, 
			connid, msgid, topic, user, host, rtn);

	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	} else
		sqlite3_free(psStmt);
	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
	sqlite3_finalize(stmt);

	return ret;
}

/*
 * mqtt_rtlm_read_topic() Get topic
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @connid = connection id
 * @topic = topic
 * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
 * return: NULL error or not found and !=NULL allocated subscribe topics
 */
mqtt_subscr_t *
mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
		const char *topic, char retain)
{
	int rowz = 0;
	char *str, szStr[STRSIZ], *psStmt;
	sqlite3_stmt *stmt;
	register int j;
	mqtt_subscr_t *s = NULL;
	ait_val_t v;

	if (!cfg || !sql || !topic)
		return NULL;

	switch (retain) {
		case -1:
			memset(szStr, 0, sizeof szStr);
			break;
		case 0:
			snprintf(szStr, sizeof szStr, "AND Retain = 0");
			break;
		default:
			snprintf(szStr, sizeof szStr, "AND Retain > 0");
			break;
	}

	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
	if (!str) {
		mqtt_rtlm_log("Error:: not found topics table name");
		return NULL;
	}
	psStmt = sqlite3_mprintf("SELECT QoS, Topic, Value  FROM %s WHERE "
			"ConnID LIKE '%q' AND Topic LIKE '%q' %s;", 
			str, connid, topic, szStr);

	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return NULL;
	} else
		sqlite3_free(psStmt);

	/* calculate count of rows and allocate subscribe items */
	while (sqlite3_step(stmt) == SQLITE_ROW)
		rowz++;
	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
		goto end;
	} else
		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
	sqlite3_reset(stmt);

	/* fill with data */
	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
		AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
		s[j].sub_value.msg_len = AIT_LEN(&v);
		s[j].sub_value.msg_base = (u_char*) malloc(s[j].sub_value.msg_len);
		if (s[j].sub_value.msg_base)
			memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
		AIT_FREE_VAL(&v);
	}
end:
	sqlite3_finalize(stmt);

	return s;
}

/*
 * mqtt_rtlm_write_subscribe() Subscribe topic
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @connid = connection id
 * @msgid = MessageID
 * @topic = topic
 * @user = username
 * @host = hostname
 * @qos = Subscribe QoS
 * return: -1 error, 0 no publish or >0 published ok
 */
int
mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
		const char *topic, const char *user, const char *host, char qos)
{
	int ret = 0;
	char *str, *psStmt;
	sqlite3_stmt *stmt;

	if (!cfg || !sql || !topic)
		return -1;

	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
	if (!str) {
		mqtt_rtlm_log("Error:: not found subscribes table name");
		return -1;
	}
	psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
			"PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
			"datetime('now', 'localtime'), '%q');", str, 
			connid, msgid, qos, topic, user, host);

	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	} else
		sqlite3_free(psStmt);
	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
	sqlite3_finalize(stmt);

	return ret;
}

/*
 * mqtt_rtlm_delete_subscribe() Delete subscribe
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @connid = connection id
 * @topic = topic
 * @user = username
 * @host = hostname
 * return: -1 error, 0 no changes or >0 deleted rows
 */
int
mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
		const char *topic, const char *user, const char *host)
{
	int ret = 0;
	char *str, *psStmt;
	sqlite3_stmt *stmt;

	if (!cfg || !sql || !topic)
		return -1;

	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
	if (!str) {
		mqtt_rtlm_log("Error:: not found subscribes table name");
		return -1;
	}
	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
			"Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str, 
			connid, topic, user, host);

	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return -1;
	} else
		sqlite3_free(psStmt);
	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
		ret = sqlite3_changes(sql);
	else {
		if (ret > SQLITE_OK && ret < SQLITE_ROW)
			MQTT_RTLM_LOG(sql);
		ret = 0;
	}
	sqlite3_finalize(stmt);

	return ret;
}

/*
 * mqtt_rtlm_read_subscribe() Get subscribe topic
 *
 * @cfg = loaded config
 * @sql = SQL handle
 * @connid = connection id
 * @topic = topic
 * return: NULL error or not found and !=NULL allocated subscribe topics
 */
mqtt_subscr_t *
mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
{
	int rowz = 0;
	char *str, *psStmt;
	sqlite3_stmt *stmt;
	register int j;
	mqtt_subscr_t *s = NULL;

	if (!cfg || !sql || !topic)
		return NULL;

	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
	if (!str) {
		mqtt_rtlm_log("Error:: not found subscribes table name");
		return NULL;
	}
	psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
			"Topic LIKE '%q';", str, connid, topic);

	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
		MQTT_RTLM_LOG(sql);
		sqlite3_free(psStmt);
		return NULL;
	} else
		sqlite3_free(psStmt);

	/* calculate count of rows and allocate subscribe items */
	while (sqlite3_step(stmt) == SQLITE_ROW)
		rowz++;
	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
		goto end;
	} else
		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
	sqlite3_reset(stmt);

	/* fill with data */
	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
		s[j].sub_value.msg_base = NULL;
		s[j].sub_value.msg_len = 0;
	}
end:
	sqlite3_finalize(stmt);

	return s;
}

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>