File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / pubmqtt.c
Revision 1.5: download - view: text, annotated - select for diffs - revision graph
Sun Oct 8 22:49:25 2017 UTC (6 years, 8 months ago) by misho
Branches: MAIN
CVS tags: mqtt2_1, MQTT2_0, HEAD
version 2.0

    1: /*************************************************************************
    2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
    3: *  by Michael Pounov <misho@openbsd-bg.org>
    4: *
    5: * $Author: misho $
    6: * $Id: pubmqtt.c,v 1.5 2017/10/08 22:49:25 misho Exp $
    7: *
    8: **************************************************************************
    9: The ELWIX and AITNET software is distributed under the following
   10: terms:
   11: 
   12: All of the documentation and software included in the ELWIX and AITNET
   13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   14: 
   15: Copyright 2004 - 2017
   16: 	by Michael Pounov <misho@elwix.org>.  All rights reserved.
   17: 
   18: Redistribution and use in source and binary forms, with or without
   19: modification, are permitted provided that the following conditions
   20: are met:
   21: 1. Redistributions of source code must retain the above copyright
   22:    notice, this list of conditions and the following disclaimer.
   23: 2. Redistributions in binary form must reproduce the above copyright
   24:    notice, this list of conditions and the following disclaimer in the
   25:    documentation and/or other materials provided with the distribution.
   26: 3. All advertising materials mentioning features or use of this software
   27:    must display the following acknowledgement:
   28: This product includes software developed by Michael Pounov <misho@elwix.org>
   29: ELWIX - Embedded LightWeight unIX and its contributors.
   30: 4. Neither the name of AITNET nor the names of its contributors
   31:    may be used to endorse or promote products derived from this software
   32:    without specific prior written permission.
   33: 
   34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
   35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
   38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   44: SUCH DAMAGE.
   45: */
   46: #include "global.h"
   47: 
   48: 
   49: extern const char sql_schema[];
   50: 
   51: 
   52: /*
   53:  * mqtt_db_log() Log database connection message
   54:  *
   55:  * @fmt = format string
   56:  * @... = argument list
   57:  * return: none
   58:  */
   59: static void
   60: mqtt_rtlm_log(const char *fmt, ...)
   61: {
   62: 	va_list lst;
   63: 
   64: 	va_start(lst, fmt);
   65: 	vsyslog(LOG_ERR, fmt, lst);
   66: 	va_end(lst);
   67: }
   68: #define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
   69: 					__func__, __LINE__, \
   70: 					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
   71: 
   72: /* library pre-loaded actions */
   73: void
   74: _init()
   75: {
   76: 	sqlite3_initialize();
   77: }
   78: 
   79: void
   80: _fini()
   81: {
   82: 	sqlite3_shutdown();
   83: }
   84: 
   85: 
   86: /*
   87:  * mqtt_rtlm_open() Open database connection
   88:  *
   89:  * @cfg = config filename
   90:  * return: NULL error or SQL handle
   91:  */
   92: sqlite3 *
   93: mqtt_rtlm_open(cfg_root_t *cfg)
   94: {
   95: 	sqlite3 *sql = NULL;
   96: 	const char *str = NULL;
   97: 
   98: 	if (!cfg)
   99: 		return NULL;
  100: 
  101: 	str = cfg_getAttribute(cfg, "mqtt_pub", "name");
  102: 	if (!str) {
  103: 		mqtt_rtlm_log("Error:: Unknown database name ...\n");
  104: 		return NULL;
  105: 	}
  106: 
  107: 	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
  108: 		MQTT_RTLM_LOG(sql);
  109: 		sqlite3_close(sql);
  110: 		return NULL;
  111: 	}
  112: 
  113: 	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
  114: 		MQTT_RTLM_LOG(sql);
  115: 		sqlite3_close(sql);
  116: 		return NULL;
  117: 	}
  118: 	return sql;
  119: }
  120: 
  121: /*
  122:  * mqtt_rtlm_close() Close database connection
  123:  *
  124:  * @sql = SQL handle
  125:  * return: none
  126:  */
  127: void
  128: mqtt_rtlm_close(sqlite3 *sql)
  129: {
  130: 	sqlite3_close(sql);
  131: }
  132: 
  133: /*
  134:  * mqtt_rtlm_init_session() Create session
  135:  *
  136:  * @cfg = loaded config
  137:  * @sql = SQL handle
  138:  * @connid = connection id
  139:  * @user = username
  140:  * @host = hostname
  141:  * @will = will flag if !=0 must fill arguments
  142:  * @... = will arguments in order topic,msg,qos,retain
  143:  * return: -1 error, 0 session already appears or >0 row changed
  144:  */
  145: int
  146: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
  147: 		const char *host, int will, ...)
  148: {
  149: 	va_list lst;
  150: 	int ret = 0;
  151: 	char *str, *psStmt;
  152: 	sqlite3_stmt *stmt;
  153: 
  154: 	if (!cfg || !sql)
  155: 		return -1;
  156: 
  157: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  158: 	if (!str) {
  159: 		mqtt_rtlm_log("Error:: not found online table name");
  160: 		return -1;
  161: 	}
  162: 	if (!will)
  163: 		psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
  164: 				"WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
  165: 	else {
  166: 		va_start(lst, will);
  167: 		psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
  168: 				"WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
  169: 				"VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');", 
  170: 				str, connid, user, host, will, 
  171: 				va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
  172: 		va_end(lst);
  173: 	}
  174: 
  175: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  176: 		MQTT_RTLM_LOG(sql);
  177: 		sqlite3_free(psStmt);
  178: 		return -1;
  179: 	} else
  180: 		sqlite3_free(psStmt);
  181: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  182: 		ret = sqlite3_changes(sql);
  183: 	else {
  184: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  185: 			MQTT_RTLM_LOG(sql);
  186: 		ret = 0;
  187: 	}
  188: 	sqlite3_finalize(stmt);
  189: 
  190: 	return ret;
  191: }
  192: 
  193: /*
  194:  * mqtt_rtlm_fini_session() Delete session(s)
  195:  *
  196:  * @cfg = loaded config
  197:  * @sql = SQL handle
  198:  * @connid = connection id
  199:  * @user = username
  200:  * @host = hostname
  201:  * return: -1 error, 0 session already appears or >0 row changed
  202:  */
  203: int
  204: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  205: {
  206: 	int ret = 0;
  207: 	char *str, *psStmt;
  208: 	sqlite3_stmt *stmt;
  209: 
  210: 	if (!cfg || !sql)
  211: 		return -1;
  212: 
  213: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  214: 	if (!str) {
  215: 		mqtt_rtlm_log("Error:: not found online table name");
  216: 		return -1;
  217: 	}
  218: 	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
  219: 			"AND RemoteHost LIKE '%q';", str, connid, user, host);
  220: 
  221: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  222: 		MQTT_RTLM_LOG(sql);
  223: 		sqlite3_free(psStmt);
  224: 		return -1;
  225: 	} else
  226: 		sqlite3_free(psStmt);
  227: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  228: 		ret = sqlite3_changes(sql);
  229: 	else {
  230: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  231: 			MQTT_RTLM_LOG(sql);
  232: 		ret = 0;
  233: 	}
  234: 	sqlite3_finalize(stmt);
  235: 
  236: 	return ret;
  237: }
  238: 
  239: /*
  240:  * mqtt_rtlm_chk_session() Check session(s)
  241:  *
  242:  * @cfg = loaded config
  243:  * @sql = SQL handle
  244:  * @connid = connection id
  245:  * @user = username
  246:  * @host = hostname
  247:  * return: -1 error, 0 not logged or >0 logged found rows
  248:  */
  249: int
  250: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  251: {
  252: 	int ret = 0;
  253: 	char *str, *psStmt;
  254: 	sqlite3_stmt *stmt;
  255: 
  256: 	if (!cfg || !sql)
  257: 		return -1;
  258: 
  259: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  260: 	if (!str) {
  261: 		mqtt_rtlm_log("Error:: not found online table name");
  262: 		return -1;
  263: 	}
  264: 	psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
  265: 			"ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';", 
  266: 			str, connid, user, host);
  267: 
  268: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  269: 		MQTT_RTLM_LOG(sql);
  270: 		sqlite3_free(psStmt);
  271: 		return -1;
  272: 	} else
  273: 		sqlite3_free(psStmt);
  274: 	if (sqlite3_step(stmt) == SQLITE_ROW)
  275: 		ret = sqlite3_changes(sql);
  276: 	else
  277: 		ret = 0;
  278: 	sqlite3_finalize(stmt);
  279: 
  280: 	return ret;
  281: }
  282: 
  283: /*
  284:  * mqtt_rtlm_write_topic() Publish topic
  285:  *
  286:  * @cfg = loaded config
  287:  * @sql = SQL handle
  288:  * @connid = connection id
  289:  * @msgid = MessageID
  290:  * @topic = topic
  291:  * @txt = text
  292:  * @txtlen = text length
  293:  * @user = username
  294:  * @host = hostname
  295:  * @qos = QoS
  296:  * @retain = !=0 retain message to database
  297:  * return: -1 error, 0 no publish or >0 published ok
  298:  */
  299: int
  300: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  301: 		const char *topic, void *txt, int txtlen, const char *user, 
  302: 		const char *host, char qos, char retain)
  303: {
  304: 	int ret = 0;
  305: 	char *str, *psStmt;
  306: 	sqlite3_stmt *stmt;
  307: 
  308: 	if (!cfg || !sql || !topic)
  309: 		return -1;
  310: 
  311: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  312: 	if (!str) {
  313: 		mqtt_rtlm_log("Error:: not found topics table name");
  314: 		return -1;
  315: 	}
  316: 	psStmt = sqlite3_mprintf("INSERT INTO %s (QoS, Retain, ConnID, MsgID, Topic, Value, PubUser, "
  317: 			"PubDate, PubHost) VALUES (%d, %d, '%q', %u, '%q', ?1, '%q', "
  318: 			"datetime('now', 'localtime'), '%q');", 
  319: 			str, qos, retain, connid, msgid, topic, user, host);
  320: 
  321: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL) || !stmt) {
  322: 		MQTT_RTLM_LOG(sql);
  323: 		sqlite3_free(psStmt);
  324: 		return -1;
  325: 	} else
  326: 		sqlite3_free(psStmt);
  327: 	if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
  328: 		MQTT_RTLM_LOG(sql);
  329: 		sqlite3_finalize(stmt);
  330: 		return -1;
  331: 	}
  332: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  333: 		ret = sqlite3_changes(sql);
  334: 	else {
  335: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  336: 			MQTT_RTLM_LOG(sql);
  337: 		ret = 0;
  338: 	}
  339: 	sqlite3_finalize(stmt);
  340: 
  341: 	return ret;
  342: }
  343: 
  344: /*
  345:  * mqtt_rtlm_wipe_topic() Wipe all topics
  346:  *
  347:  * @cfg = loaded config
  348:  * @sql = SQL handle
  349:  * @connid = connection id
  350:  * @user = username
  351:  * @retain = -1 no matter
  352:  * return: -1 error, 0 no changes or >0 deleted rows
  353:  */
  354: int
  355: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
  356: {
  357: 	int ret = 0;
  358: 	char *str, *rtn, *psStmt;
  359: 	sqlite3_stmt *stmt;
  360: 
  361: 	if (!cfg || !sql || !connid)
  362: 		return -1;
  363: 
  364: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  365: 	if (!str) {
  366: 		mqtt_rtlm_log("Error:: not found topics table name");
  367: 		return -1;
  368: 	}
  369: 	switch (retain) {
  370: 		case -1:
  371: 			rtn = "";
  372: 			break;
  373: 		case 0:
  374: 			rtn = "AND Retain = 0";
  375: 			break;
  376: 		default:
  377: 			rtn = "AND Retain != 0";
  378: 			break;
  379: 	}
  380: 	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
  381: 			"PubUser LIKE '%q' %s;", str, connid, user, rtn);
  382: 
  383: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  384: 		MQTT_RTLM_LOG(sql);
  385: 		sqlite3_free(psStmt);
  386: 		return -1;
  387: 	} else
  388: 		sqlite3_free(psStmt);
  389: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  390: 		ret = sqlite3_changes(sql);
  391: 	else {
  392: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  393: 			MQTT_RTLM_LOG(sql);
  394: 		ret = 0;
  395: 	}
  396: 	sqlite3_finalize(stmt);
  397: 
  398: 	return ret;
  399: }
  400: 
  401: /*
  402:  * mqtt_rtlm_delete_topic() Delete topic
  403:  *
  404:  * @cfg = loaded config
  405:  * @sql = SQL handle
  406:  * @connid = connection id
  407:  * @msgid = MessageID
  408:  * @topic = topic
  409:  * @user = username
  410:  * @host = hostname
  411:  * @retain = -1 no matter
  412:  * return: -1 error, 0 no changes or >0 deleted rows
  413:  */
  414: int
  415: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  416: 		const char *topic, const char *user, const char *host, char retain)
  417: {
  418: 	int ret = 0;
  419: 	char *str, *rtn, *psStmt;
  420: 	sqlite3_stmt *stmt;
  421: 
  422: 	if (!cfg || !sql || !topic)
  423: 		return -1;
  424: 
  425: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  426: 	if (!str) {
  427: 		mqtt_rtlm_log("Error:: not found topics table name");
  428: 		return -1;
  429: 	}
  430: 	switch (retain) {
  431: 		case -1:
  432: 			rtn = "";
  433: 			break;
  434: 		case 0:
  435: 			rtn = "AND Retain = 0";
  436: 			break;
  437: 		default:
  438: 			rtn = "AND Retain != 0";
  439: 			break;
  440: 	}
  441: 	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
  442: 			"Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str, 
  443: 			connid, msgid, topic, user, host, rtn);
  444: 
  445: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  446: 		MQTT_RTLM_LOG(sql);
  447: 		sqlite3_free(psStmt);
  448: 		return -1;
  449: 	} else
  450: 		sqlite3_free(psStmt);
  451: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  452: 		ret = sqlite3_changes(sql);
  453: 	else {
  454: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  455: 			MQTT_RTLM_LOG(sql);
  456: 		ret = 0;
  457: 	}
  458: 	sqlite3_finalize(stmt);
  459: 
  460: 	return ret;
  461: }
  462: 
  463: /*
  464:  * mqtt_rtlm_read_topic() Get topic
  465:  *
  466:  * @cfg = loaded config
  467:  * @sql = SQL handle
  468:  * @connid = connection id
  469:  * @topic = topic
  470:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
  471:  * return: NULL error or not found and !=NULL allocated subscribe topics
  472:  */
  473: mqtt_subscr_t *
  474: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
  475: 		const char *topic, char retain)
  476: {
  477: 	int rowz = 0;
  478: 	char *str, szStr[STRSIZ], *psStmt;
  479: 	sqlite3_stmt *stmt;
  480: 	register int j;
  481: 	mqtt_subscr_t *s = NULL;
  482: 	ait_val_t v;
  483: 
  484: 	if (!cfg || !sql || !topic)
  485: 		return NULL;
  486: 
  487: 	switch (retain) {
  488: 		case -1:
  489: 			memset(szStr, 0, sizeof szStr);
  490: 			break;
  491: 		case 0:
  492: 			snprintf(szStr, sizeof szStr, "AND Retain = 0");
  493: 			break;
  494: 		default:
  495: 			snprintf(szStr, sizeof szStr, "AND Retain > 0");
  496: 			break;
  497: 	}
  498: 
  499: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  500: 	if (!str) {
  501: 		mqtt_rtlm_log("Error:: not found topics table name");
  502: 		return NULL;
  503: 	}
  504: 	psStmt = sqlite3_mprintf("SELECT QoS, Topic, Value  FROM %s WHERE "
  505: 			"ConnID LIKE '%q' AND Topic LIKE '%q' %s;", 
  506: 			str, connid, topic, szStr);
  507: 
  508: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  509: 		MQTT_RTLM_LOG(sql);
  510: 		sqlite3_free(psStmt);
  511: 		return NULL;
  512: 	} else
  513: 		sqlite3_free(psStmt);
  514: 
  515: 	/* calculate count of rows and allocate subscribe items */
  516: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  517: 		rowz++;
  518: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  519: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  520: 		goto end;
  521: 	} else
  522: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  523: 	sqlite3_reset(stmt);
  524: 
  525: 	/* fill with data */
  526: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  527: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  528: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  529: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  530: 		AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
  531: 		s[j].sub_value.msg_len = AIT_LEN(&v);
  532: 		s[j].sub_value.msg_base = (u_char*) malloc(s[j].sub_value.msg_len);
  533: 		if (s[j].sub_value.msg_base)
  534: 			memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
  535: 		AIT_FREE_VAL(&v);
  536: 	}
  537: end:
  538: 	sqlite3_finalize(stmt);
  539: 
  540: 	return s;
  541: }
  542: 
  543: /*
  544:  * mqtt_rtlm_write_subscribe() Subscribe topic
  545:  *
  546:  * @cfg = loaded config
  547:  * @sql = SQL handle
  548:  * @connid = connection id
  549:  * @msgid = MessageID
  550:  * @topic = topic
  551:  * @user = username
  552:  * @host = hostname
  553:  * @qos = Subscribe QoS
  554:  * return: -1 error, 0 no publish or >0 published ok
  555:  */
  556: int
  557: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  558: 		const char *topic, const char *user, const char *host, char qos)
  559: {
  560: 	int ret = 0;
  561: 	char *str, *psStmt;
  562: 	sqlite3_stmt *stmt;
  563: 
  564: 	if (!cfg || !sql || !topic)
  565: 		return -1;
  566: 
  567: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  568: 	if (!str) {
  569: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  570: 		return -1;
  571: 	}
  572: 	psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
  573: 			"PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
  574: 			"datetime('now', 'localtime'), '%q');", str, 
  575: 			connid, msgid, qos, topic, user, host);
  576: 
  577: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  578: 		MQTT_RTLM_LOG(sql);
  579: 		sqlite3_free(psStmt);
  580: 		return -1;
  581: 	} else
  582: 		sqlite3_free(psStmt);
  583: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  584: 		ret = sqlite3_changes(sql);
  585: 	else {
  586: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  587: 			MQTT_RTLM_LOG(sql);
  588: 		ret = 0;
  589: 	}
  590: 	sqlite3_finalize(stmt);
  591: 
  592: 	return ret;
  593: }
  594: 
  595: /*
  596:  * mqtt_rtlm_delete_subscribe() Delete subscribe
  597:  *
  598:  * @cfg = loaded config
  599:  * @sql = SQL handle
  600:  * @connid = connection id
  601:  * @topic = topic
  602:  * @user = username
  603:  * @host = hostname
  604:  * return: -1 error, 0 no changes or >0 deleted rows
  605:  */
  606: int
  607: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
  608: 		const char *topic, const char *user, const char *host)
  609: {
  610: 	int ret = 0;
  611: 	char *str, *psStmt;
  612: 	sqlite3_stmt *stmt;
  613: 
  614: 	if (!cfg || !sql || !topic)
  615: 		return -1;
  616: 
  617: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  618: 	if (!str) {
  619: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  620: 		return -1;
  621: 	}
  622: 	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
  623: 			"Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str, 
  624: 			connid, topic, user, host);
  625: 
  626: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  627: 		MQTT_RTLM_LOG(sql);
  628: 		sqlite3_free(psStmt);
  629: 		return -1;
  630: 	} else
  631: 		sqlite3_free(psStmt);
  632: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  633: 		ret = sqlite3_changes(sql);
  634: 	else {
  635: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  636: 			MQTT_RTLM_LOG(sql);
  637: 		ret = 0;
  638: 	}
  639: 	sqlite3_finalize(stmt);
  640: 
  641: 	return ret;
  642: }
  643: 
  644: /*
  645:  * mqtt_rtlm_read_subscribe() Get subscribe topic
  646:  *
  647:  * @cfg = loaded config
  648:  * @sql = SQL handle
  649:  * @connid = connection id
  650:  * @topic = topic
  651:  * return: NULL error or not found and !=NULL allocated subscribe topics
  652:  */
  653: mqtt_subscr_t *
  654: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
  655: {
  656: 	int rowz = 0;
  657: 	char *str, *psStmt;
  658: 	sqlite3_stmt *stmt;
  659: 	register int j;
  660: 	mqtt_subscr_t *s = NULL;
  661: 
  662: 	if (!cfg || !sql || !topic)
  663: 		return NULL;
  664: 
  665: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  666: 	if (!str) {
  667: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  668: 		return NULL;
  669: 	}
  670: 	psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
  671: 			"Topic LIKE '%q';", str, connid, topic);
  672: 
  673: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  674: 		MQTT_RTLM_LOG(sql);
  675: 		sqlite3_free(psStmt);
  676: 		return NULL;
  677: 	} else
  678: 		sqlite3_free(psStmt);
  679: 
  680: 	/* calculate count of rows and allocate subscribe items */
  681: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  682: 		rowz++;
  683: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  684: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  685: 		goto end;
  686: 	} else
  687: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  688: 	sqlite3_reset(stmt);
  689: 
  690: 	/* fill with data */
  691: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  692: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  693: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  694: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  695: 		s[j].sub_value.msg_base = NULL;
  696: 		s[j].sub_value.msg_len = 0;
  697: 	}
  698: end:
  699: 	sqlite3_finalize(stmt);
  700: 
  701: 	return s;
  702: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>