File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / pubmqtt.c
Revision 1.2.2.3: download - view: text, annotated - select for diffs - revision graph
Wed Apr 11 15:08:27 2012 UTC (12 years, 2 months ago) by misho
Branches: mqtt1_1
migrate to new libaitcfg

    1: #include "global.h"
    2: 
    3: 
    4: extern const char sql_schema[];
    5: 
    6: 
    7: /*
    8:  * mqtt_db_log() Log database connection message
    9:  *
   10:  * @fmt = format string
   11:  * @... = argument list
   12:  * return: none
   13:  */
   14: static void
   15: mqtt_rtlm_log(const char *fmt, ...)
   16: {
   17: 	va_list lst;
   18: 
   19: 	va_start(lst, fmt);
   20: 	vsyslog(LOG_ERR, fmt, lst);
   21: 	va_end(lst);
   22: }
   23: #define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
   24: 					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
   25: 
   26: 
   27: /*
   28:  * mqtt_rtlm_open() Open database connection
   29:  *
   30:  * @cfg = config filename
   31:  * return: NULL error or SQL handle
   32:  */
   33: sqlite3 *
   34: mqtt_rtlm_open(cfg_root_t *cfg)
   35: {
   36: 	sqlite3 *sql = NULL;
   37: 	const char *str = NULL;
   38: 
   39: 	if (!cfg)
   40: 		return NULL;
   41: 
   42: 	/*
   43: 	if (!sqlite3_threadsafe() || sqlite3_config(SQLITE_CONFIG_SERIALIZED))
   44: 		return NULL;
   45: 		*/
   46: 
   47: 	str = cfg_getAttribute(cfg, "mqtt_pub", "name");
   48: 	if (!str) {
   49: 		mqtt_rtlm_log("Error:: Unknown database name ...\n");
   50: 		return NULL;
   51: 	}
   52: 
   53: 	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
   54: 		MQTT_RTLM_LOG(sql);
   55: 		sqlite3_close(sql);
   56: 		return NULL;
   57: 	}
   58: 
   59: 	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
   60: 		MQTT_RTLM_LOG(sql);
   61: 		sqlite3_close(sql);
   62: 		return NULL;
   63: 	}
   64: 	return sql;
   65: }
   66: 
   67: /*
   68:  * mqtt_rtlm_close() Close database connection
   69:  *
   70:  * @sql = SQL handle
   71:  * return: none
   72:  */
   73: void
   74: mqtt_rtlm_close(sqlite3 *sql)
   75: {
   76: 	sqlite3_close(sql);
   77: }
   78: 
   79: /*
   80:  * mqtt_rtlm_init_session() Create session
   81:  *
   82:  * @cfg = loaded config
   83:  * @sql = SQL handle
   84:  * @connid = connection id
   85:  * @user = username
   86:  * @host = hostname
   87:  * @will = will flag if !=0 must fill arguments
   88:  * @... = will arguments in order topic,msg,qos,retain
   89:  * return: -1 error, 0 session already appears or >0 row changed
   90:  */
   91: int
   92: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
   93: 		const char *host, char will, ...)
   94: {
   95: 	va_list lst;
   96: 	int ret = 0;
   97: 	char *str, szStmt[BUFSIZ] = { 0 };
   98: 	sqlite3_stmt *stmt;
   99: 
  100: 	if (!cfg || !sql)
  101: 		return -1;
  102: 
  103: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  104: 	if (!str) {
  105: 		mqtt_rtlm_log("Error:: not found online table name");
  106: 		return -1;
  107: 	}
  108: 	if (!will)
  109: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
  110: 				"WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
  111: 	else {
  112: 		va_start(lst, will);
  113: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
  114: 				"WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
  115: 				"VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
  116: 				str, connid, user, host, will, 
  117: 				va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
  118: 		va_end(lst);
  119: 	}
  120: 
  121: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  122: 		MQTT_RTLM_LOG(sql);
  123: 		return -1;
  124: 	}
  125: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  126: 		ret = sqlite3_changes(sql);
  127: 	else {
  128: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  129: 			MQTT_RTLM_LOG(sql);
  130: 		ret = 0;
  131: 	}
  132: 	sqlite3_finalize(stmt);
  133: 
  134: 	return ret;
  135: }
  136: 
  137: /*
  138:  * mqtt_rtlm_fini_session() Delete session(s)
  139:  *
  140:  * @cfg = loaded config
  141:  * @sql = SQL handle
  142:  * @connid = connection id
  143:  * @user = username
  144:  * @host = hostname
  145:  * return: -1 error, 0 session already appears or >0 row changed
  146:  */
  147: int
  148: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  149: {
  150: 	int ret = 0;
  151: 	char *str, szStmt[BUFSIZ] = { 0 };
  152: 	sqlite3_stmt *stmt;
  153: 
  154: 	if (!cfg || !sql)
  155: 		return -1;
  156: 
  157: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  158: 	if (!str) {
  159: 		mqtt_rtlm_log("Error:: not found online table name");
  160: 		return -1;
  161: 	}
  162: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
  163: 			"AND RemoteHost LIKE '%s';", str, connid, user, host);
  164: 
  165: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  166: 		MQTT_RTLM_LOG(sql);
  167: 		return -1;
  168: 	}
  169: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  170: 		ret = sqlite3_changes(sql);
  171: 	else {
  172: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  173: 			MQTT_RTLM_LOG(sql);
  174: 		ret = 0;
  175: 	}
  176: 	sqlite3_finalize(stmt);
  177: 
  178: 	return ret;
  179: }
  180: 
  181: /*
  182:  * mqtt_rtlm_chk_session() Check session(s)
  183:  *
  184:  * @cfg = loaded config
  185:  * @sql = SQL handle
  186:  * @connid = connection id
  187:  * @user = username
  188:  * @host = hostname
  189:  * return: -1 error, 0 not logged or >0 logged found rows
  190:  */
  191: int
  192: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  193: {
  194: 	int ret = 0;
  195: 	char *str, szStmt[BUFSIZ] = { 0 };
  196: 	sqlite3_stmt *stmt;
  197: 
  198: 	if (!cfg || !sql)
  199: 		return -1;
  200: 
  201: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  202: 	if (!str) {
  203: 		mqtt_rtlm_log("Error:: not found online table name");
  204: 		return -1;
  205: 	}
  206: 	snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
  207: 			"ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';", 
  208: 			str, connid, user, host);
  209: 
  210: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  211: 		MQTT_RTLM_LOG(sql);
  212: 		return -1;
  213: 	}
  214: 	if (sqlite3_step(stmt) == SQLITE_ROW)
  215: 		ret = sqlite3_changes(sql);
  216: 	else
  217: 		ret = 0;
  218: 	sqlite3_finalize(stmt);
  219: 
  220: 	return ret;
  221: }
  222: 
  223: /*
  224:  * mqtt_rtlm_write_topic() Publish topic
  225:  *
  226:  * @cfg = loaded config
  227:  * @sql = SQL handle
  228:  * @msgid = MessageID
  229:  * @topic = topic
  230:  * @txt = text
  231:  * @user = username
  232:  * @host = hostname
  233:  * @retain = !=0 retain message to database
  234:  * return: -1 error, 0 no publish or >0 published ok
  235:  */
  236: int
  237: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, 
  238: 		const char *user, const char *host, char retain)
  239: {
  240: 	int ret = 0;
  241: 	char *str, szStmt[BUFSIZ] = { 0 };
  242: 	sqlite3_stmt *stmt;
  243: 
  244: 	if (!cfg || !sql || !topic)
  245: 		return -1;
  246: 
  247: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  248: 	if (!str) {
  249: 		mqtt_rtlm_log("Error:: not found topics table name");
  250: 		return -1;
  251: 	}
  252: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
  253: 			"PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
  254: 			"datetime('now', 'localtime'), '%s');", 
  255: 			str, retain, msgid, topic, txt, user, host);
  256: 
  257: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  258: 		MQTT_RTLM_LOG(sql);
  259: 		return -1;
  260: 	}
  261: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  262: 		ret = sqlite3_changes(sql);
  263: 	else {
  264: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  265: 			MQTT_RTLM_LOG(sql);
  266: 		ret = 0;
  267: 	}
  268: 	sqlite3_finalize(stmt);
  269: 
  270: 	return ret;
  271: }
  272: 
  273: /*
  274:  * mqtt_rtlm_delete_topic() Delete topic
  275:  *
  276:  * @cfg = loaded config
  277:  * @sql = SQL handle
  278:  * @msgid = MessageID
  279:  * @topic = topic
  280:  * @user = username
  281:  * @host = hostname
  282:  * @retain = -1 no matter
  283:  * return: -1 error, 0 no changes or >0 deleted rows
  284:  */
  285: int
  286: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
  287: 		const char *user, const char *host, char retain)
  288: {
  289: 	int ret = 0;
  290: 	char *str, *rtn, szStmt[BUFSIZ] = { 0 };
  291: 	sqlite3_stmt *stmt;
  292: 
  293: 	if (!cfg || !sql || !topic)
  294: 		return -1;
  295: 
  296: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  297: 	if (!str) {
  298: 		mqtt_rtlm_log("Error:: not found topics table name");
  299: 		return -1;
  300: 	}
  301: 	switch (retain) {
  302: 		case -1:
  303: 			rtn = "";
  304: 			break;
  305: 		case 0:
  306: 			rtn = "AND Retain = 0";
  307: 			break;
  308: 		default:
  309: 			rtn = "AND Retain != 0";
  310: 			break;
  311: 	}
  312: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
  313: 			"PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
  314: 			msgid, topic, user, host, rtn);
  315: 
  316: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  317: 		MQTT_RTLM_LOG(sql);
  318: 		return -1;
  319: 	}
  320: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  321: 		ret = sqlite3_changes(sql);
  322: 	else {
  323: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  324: 			MQTT_RTLM_LOG(sql);
  325: 		ret = 0;
  326: 	}
  327: 	sqlite3_finalize(stmt);
  328: 
  329: 	return ret;
  330: }
  331: 
  332: /*
  333:  * mqtt_rtlm_read_topic() Get topic
  334:  *
  335:  * @cfg = loaded config
  336:  * @sql = SQL handle
  337:  * @msgid = MessageID
  338:  * @topic = topic
  339:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
  340:  * return: NULL error or not found and !=NULL allocated subscribe topics
  341:  */
  342: mqtt_subscr_t *
  343: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
  344: {
  345: 	int rowz = 0;
  346: 	char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
  347: 	sqlite3_stmt *stmt;
  348: 	register int j;
  349: 	mqtt_subscr_t *s = NULL;
  350: 
  351: 	if (!cfg || !sql || !topic)
  352: 		return NULL;
  353: 
  354: 	switch (retain) {
  355: 		case -1:
  356: 			memset(szStr, 0, sizeof szStr);
  357: 			break;
  358: 		case 0:
  359: 			snprintf(szStr, sizeof szStr, "AND Retain = 0");
  360: 			break;
  361: 		default:
  362: 			snprintf(szStr, sizeof szStr, "AND Retain > 0");
  363: 			break;
  364: 	}
  365: 
  366: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  367: 	if (!str) {
  368: 		mqtt_rtlm_log("Error:: not found topics table name");
  369: 		return NULL;
  370: 	}
  371: 	snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
  372: 			"MsgID = %d AND Topic LIKE '%s' %s;", 
  373: 			str, msgid, topic, szStr);
  374: 
  375: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  376: 		MQTT_RTLM_LOG(sql);
  377: 		return NULL;
  378: 	}
  379: 
  380: 	/* calculate count of rows and allocate subscribe items */
  381: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  382: 		rowz++;
  383: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  384: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  385: 		goto end;
  386: 	} else
  387: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  388: 	sqlite3_reset(stmt);
  389: 
  390: 	/* fill with data */
  391: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  392: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  393: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  394: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  395: 		s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
  396: 		s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
  397: 	}
  398: end:
  399: 	sqlite3_finalize(stmt);
  400: 
  401: 	return s;
  402: }
  403: 
  404: /*
  405:  * mqtt_rtlm_write_subscribe() Subscribe topic
  406:  *
  407:  * @cfg = loaded config
  408:  * @sql = SQL handle
  409:  * @msgid = MessageID
  410:  * @topic = topic
  411:  * @user = username
  412:  * @host = hostname
  413:  * @qos = Subscribe QoS
  414:  * return: -1 error, 0 no publish or >0 published ok
  415:  */
  416: int
  417: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
  418: 		const char *user, const char *host, char qos)
  419: {
  420: 	int ret = 0;
  421: 	char *str, szStmt[BUFSIZ] = { 0 };
  422: 	sqlite3_stmt *stmt;
  423: 
  424: 	if (!cfg || !sql || !topic)
  425: 		return -1;
  426: 
  427: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  428: 	if (!str) {
  429: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  430: 		return -1;
  431: 	}
  432: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (MsgID, QoS, Topic, PubUser, "
  433: 			"PubDate, PubHost) VALUES (%d, %d, '%s', '%s', "
  434: 			"datetime('now', 'localtime'), '%s');", str, 
  435: 			msgid, qos, topic, user, host);
  436: 
  437: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  438: 		MQTT_RTLM_LOG(sql);
  439: 		return -1;
  440: 	}
  441: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  442: 		ret = sqlite3_changes(sql);
  443: 	else {
  444: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  445: 			MQTT_RTLM_LOG(sql);
  446: 		ret = 0;
  447: 	}
  448: 	sqlite3_finalize(stmt);
  449: 
  450: 	return ret;
  451: }
  452: 
  453: /*
  454:  * mqtt_rtlm_delete_subscribe() Delete subscribe
  455:  *
  456:  * @cfg = loaded config
  457:  * @sql = SQL handle
  458:  * @topic = topic
  459:  * @user = username
  460:  * @host = hostname
  461:  * @qos = Subscribe QoS if -1 no matter
  462:  * return: -1 error, 0 no changes or >0 deleted rows
  463:  */
  464: int
  465: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic, 
  466: 		const char *user, const char *host, char qos)
  467: {
  468: 	int ret = 0;
  469: 	char *str, szStr[STRSIZ] = { 0 }, szStmt[BUFSIZ] = { 0 };
  470: 	sqlite3_stmt *stmt;
  471: 
  472: 	if (!cfg || !sql || !topic)
  473: 		return -1;
  474: 
  475: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  476: 	if (!str) {
  477: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  478: 		return -1;
  479: 	}
  480: 	if (qos > -1 && qos < 3)
  481: 		snprintf(szStr, sizeof szStr, "AND QoS = %d", qos);
  482: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
  483: 			"PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
  484: 			topic, user, host, szStr);
  485: 
  486: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  487: 		MQTT_RTLM_LOG(sql);
  488: 		return -1;
  489: 	}
  490: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  491: 		ret = sqlite3_changes(sql);
  492: 	else {
  493: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  494: 			MQTT_RTLM_LOG(sql);
  495: 		ret = 0;
  496: 	}
  497: 	sqlite3_finalize(stmt);
  498: 
  499: 	return ret;
  500: }
  501: 
  502: /*
  503:  * mqtt_rtlm_read_subscribe() Get subscribe topic
  504:  *
  505:  * @cfg = loaded config
  506:  * @sql = SQL handle
  507:  * @topic = topic
  508:  * return: NULL error or not found and !=NULL allocated subscribe topics
  509:  */
  510: mqtt_subscr_t *
  511: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic)
  512: {
  513: 	int rowz = 0;
  514: 	char *str, szStmt[BUFSIZ] = { 0 };
  515: 	sqlite3_stmt *stmt;
  516: 	register int j;
  517: 	mqtt_subscr_t *s = NULL;
  518: 
  519: 	if (!cfg || !sql || !topic)
  520: 		return NULL;
  521: 
  522: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  523: 	if (!str) {
  524: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  525: 		return NULL;
  526: 	}
  527: 	snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE Topic LIKE '%s';", str, topic);
  528: 
  529: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  530: 		MQTT_RTLM_LOG(sql);
  531: 		return NULL;
  532: 	}
  533: 
  534: 	/* calculate count of rows and allocate subscribe items */
  535: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  536: 		rowz++;
  537: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  538: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  539: 		goto end;
  540: 	} else
  541: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  542: 	sqlite3_reset(stmt);
  543: 
  544: 	/* fill with data */
  545: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  546: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  547: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  548: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  549: 		s[j].sub_value.msg_base = NULL;
  550: 		s[j].sub_value.msg_len = 0;
  551: 	}
  552: end:
  553: 	sqlite3_finalize(stmt);
  554: 
  555: 	return s;
  556: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>