File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / pubmqtt.c
Revision 1.2.2.13: download - view: text, annotated - select for diffs - revision graph
Fri Jun 29 14:26:39 2012 UTC (12 years ago) by misho
Branches: mqtt1_1
fix allocs

    1: #include "global.h"
    2: 
    3: 
    4: extern const char sql_schema[];
    5: 
    6: 
    7: /*
    8:  * mqtt_db_log() Log database connection message
    9:  *
   10:  * @fmt = format string
   11:  * @... = argument list
   12:  * return: none
   13:  */
   14: static void
   15: mqtt_rtlm_log(const char *fmt, ...)
   16: {
   17: 	va_list lst;
   18: 
   19: 	va_start(lst, fmt);
   20: 	vsyslog(LOG_ERR, fmt, lst);
   21: 	va_end(lst);
   22: }
   23: #define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
   24: 					__func__, __LINE__, \
   25: 					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
   26: 
   27: /* library pre-loaded actions */
   28: void
   29: _init()
   30: {
   31: 	sqlite3_initialize();
   32: }
   33: 
   34: void
   35: _fini()
   36: {
   37: 	sqlite3_shutdown();
   38: }
   39: 
   40: 
   41: /*
   42:  * mqtt_rtlm_open() Open database connection
   43:  *
   44:  * @cfg = config filename
   45:  * return: NULL error or SQL handle
   46:  */
   47: sqlite3 *
   48: mqtt_rtlm_open(cfg_root_t *cfg)
   49: {
   50: 	sqlite3 *sql = NULL;
   51: 	const char *str = NULL;
   52: 
   53: 	if (!cfg)
   54: 		return NULL;
   55: 
   56: 	str = cfg_getAttribute(cfg, "mqtt_pub", "name");
   57: 	if (!str) {
   58: 		mqtt_rtlm_log("Error:: Unknown database name ...\n");
   59: 		return NULL;
   60: 	}
   61: 
   62: 	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
   63: 		MQTT_RTLM_LOG(sql);
   64: 		sqlite3_close(sql);
   65: 		return NULL;
   66: 	}
   67: 
   68: 	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
   69: 		MQTT_RTLM_LOG(sql);
   70: 		sqlite3_close(sql);
   71: 		return NULL;
   72: 	}
   73: 	return sql;
   74: }
   75: 
   76: /*
   77:  * mqtt_rtlm_close() Close database connection
   78:  *
   79:  * @sql = SQL handle
   80:  * return: none
   81:  */
   82: void
   83: mqtt_rtlm_close(sqlite3 *sql)
   84: {
   85: 	sqlite3_close(sql);
   86: }
   87: 
   88: /*
   89:  * mqtt_rtlm_init_session() Create session
   90:  *
   91:  * @cfg = loaded config
   92:  * @sql = SQL handle
   93:  * @connid = connection id
   94:  * @user = username
   95:  * @host = hostname
   96:  * @will = will flag if !=0 must fill arguments
   97:  * @... = will arguments in order topic,msg,qos,retain
   98:  * return: -1 error, 0 session already appears or >0 row changed
   99:  */
  100: int
  101: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
  102: 		const char *host, char will, ...)
  103: {
  104: 	va_list lst;
  105: 	int ret = 0;
  106: 	char *str, *psStmt;
  107: 	sqlite3_stmt *stmt;
  108: 
  109: 	if (!cfg || !sql)
  110: 		return -1;
  111: 
  112: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  113: 	if (!str) {
  114: 		mqtt_rtlm_log("Error:: not found online table name");
  115: 		return -1;
  116: 	}
  117: 	if (!will)
  118: 		psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
  119: 				"WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
  120: 	else {
  121: 		va_start(lst, will);
  122: 		psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
  123: 				"WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
  124: 				"VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');", 
  125: 				str, connid, user, host, will, 
  126: 				va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
  127: 		va_end(lst);
  128: 	}
  129: 
  130: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  131: 		MQTT_RTLM_LOG(sql);
  132: 		sqlite3_free(psStmt);
  133: 		return -1;
  134: 	} else
  135: 		sqlite3_free(psStmt);
  136: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  137: 		ret = sqlite3_changes(sql);
  138: 	else {
  139: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  140: 			MQTT_RTLM_LOG(sql);
  141: 		ret = 0;
  142: 	}
  143: 	sqlite3_finalize(stmt);
  144: 
  145: 	return ret;
  146: }
  147: 
  148: /*
  149:  * mqtt_rtlm_fini_session() Delete session(s)
  150:  *
  151:  * @cfg = loaded config
  152:  * @sql = SQL handle
  153:  * @connid = connection id
  154:  * @user = username
  155:  * @host = hostname
  156:  * return: -1 error, 0 session already appears or >0 row changed
  157:  */
  158: int
  159: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  160: {
  161: 	int ret = 0;
  162: 	char *str, *psStmt;
  163: 	sqlite3_stmt *stmt;
  164: 
  165: 	if (!cfg || !sql)
  166: 		return -1;
  167: 
  168: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  169: 	if (!str) {
  170: 		mqtt_rtlm_log("Error:: not found online table name");
  171: 		return -1;
  172: 	}
  173: 	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
  174: 			"AND RemoteHost LIKE '%q';", str, connid, user, host);
  175: 
  176: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  177: 		MQTT_RTLM_LOG(sql);
  178: 		sqlite3_free(psStmt);
  179: 		return -1;
  180: 	} else
  181: 		sqlite3_free(psStmt);
  182: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  183: 		ret = sqlite3_changes(sql);
  184: 	else {
  185: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  186: 			MQTT_RTLM_LOG(sql);
  187: 		ret = 0;
  188: 	}
  189: 	sqlite3_finalize(stmt);
  190: 
  191: 	return ret;
  192: }
  193: 
  194: /*
  195:  * mqtt_rtlm_chk_session() Check session(s)
  196:  *
  197:  * @cfg = loaded config
  198:  * @sql = SQL handle
  199:  * @connid = connection id
  200:  * @user = username
  201:  * @host = hostname
  202:  * return: -1 error, 0 not logged or >0 logged found rows
  203:  */
  204: int
  205: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  206: {
  207: 	int ret = 0;
  208: 	char *str, *psStmt;
  209: 	sqlite3_stmt *stmt;
  210: 
  211: 	if (!cfg || !sql)
  212: 		return -1;
  213: 
  214: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  215: 	if (!str) {
  216: 		mqtt_rtlm_log("Error:: not found online table name");
  217: 		return -1;
  218: 	}
  219: 	psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
  220: 			"ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';", 
  221: 			str, connid, user, host);
  222: 
  223: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  224: 		MQTT_RTLM_LOG(sql);
  225: 		sqlite3_free(psStmt);
  226: 		return -1;
  227: 	} else
  228: 		sqlite3_free(psStmt);
  229: 	if (sqlite3_step(stmt) == SQLITE_ROW)
  230: 		ret = sqlite3_changes(sql);
  231: 	else
  232: 		ret = 0;
  233: 	sqlite3_finalize(stmt);
  234: 
  235: 	return ret;
  236: }
  237: 
  238: /*
  239:  * mqtt_rtlm_write_topic() Publish topic
  240:  *
  241:  * @cfg = loaded config
  242:  * @sql = SQL handle
  243:  * @connid = connection id
  244:  * @msgid = MessageID
  245:  * @topic = topic
  246:  * @txt = text
  247:  * @txtlen = text length
  248:  * @user = username
  249:  * @host = hostname
  250:  * @qos = QoS
  251:  * @retain = !=0 retain message to database
  252:  * return: -1 error, 0 no publish or >0 published ok
  253:  */
  254: int
  255: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  256: 		const char *topic, void *txt, int txtlen, const char *user, 
  257: 		const char *host, char qos, char retain)
  258: {
  259: 	int ret = 0;
  260: 	char *str, *psStmt;
  261: 	sqlite3_stmt *stmt;
  262: 
  263: 	if (!cfg || !sql || !topic)
  264: 		return -1;
  265: 
  266: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  267: 	if (!str) {
  268: 		mqtt_rtlm_log("Error:: not found topics table name");
  269: 		return -1;
  270: 	}
  271: 	psStmt = sqlite3_mprintf("INSERT INTO %s (QoS, Retain, ConnID, MsgID, Topic, Value, PubUser, "
  272: 			"PubDate, PubHost) VALUES (%d, %d, '%q', %u, '%q', ?1, '%q', "
  273: 			"datetime('now', 'localtime'), '%q');", 
  274: 			str, qos, retain, connid, msgid, topic, user, host);
  275: 
  276: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL) || !stmt) {
  277: 		MQTT_RTLM_LOG(sql);
  278: 		sqlite3_free(psStmt);
  279: 		return -1;
  280: 	} else
  281: 		sqlite3_free(psStmt);
  282: 	if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
  283: 		MQTT_RTLM_LOG(sql);
  284: 		sqlite3_finalize(stmt);
  285: 		return -1;
  286: 	}
  287: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  288: 		ret = sqlite3_changes(sql);
  289: 	else {
  290: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  291: 			MQTT_RTLM_LOG(sql);
  292: 		ret = 0;
  293: 	}
  294: 	sqlite3_finalize(stmt);
  295: 
  296: 	return ret;
  297: }
  298: 
  299: /*
  300:  * mqtt_rtlm_wipe_topic() Wipe all topics
  301:  *
  302:  * @cfg = loaded config
  303:  * @sql = SQL handle
  304:  * @connid = connection id
  305:  * @user = username
  306:  * @retain = -1 no matter
  307:  * return: -1 error, 0 no changes or >0 deleted rows
  308:  */
  309: int
  310: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
  311: {
  312: 	int ret = 0;
  313: 	char *str, *rtn, *psStmt;
  314: 	sqlite3_stmt *stmt;
  315: 
  316: 	if (!cfg || !sql || !connid)
  317: 		return -1;
  318: 
  319: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  320: 	if (!str) {
  321: 		mqtt_rtlm_log("Error:: not found topics table name");
  322: 		return -1;
  323: 	}
  324: 	switch (retain) {
  325: 		case -1:
  326: 			rtn = "";
  327: 			break;
  328: 		case 0:
  329: 			rtn = "AND Retain = 0";
  330: 			break;
  331: 		default:
  332: 			rtn = "AND Retain != 0";
  333: 			break;
  334: 	}
  335: 	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
  336: 			"PubUser LIKE '%q' %s;", str, connid, user, rtn);
  337: 
  338: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  339: 		MQTT_RTLM_LOG(sql);
  340: 		sqlite3_free(psStmt);
  341: 		return -1;
  342: 	} else
  343: 		sqlite3_free(psStmt);
  344: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  345: 		ret = sqlite3_changes(sql);
  346: 	else {
  347: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  348: 			MQTT_RTLM_LOG(sql);
  349: 		ret = 0;
  350: 	}
  351: 	sqlite3_finalize(stmt);
  352: 
  353: 	return ret;
  354: }
  355: 
  356: /*
  357:  * mqtt_rtlm_delete_topic() Delete topic
  358:  *
  359:  * @cfg = loaded config
  360:  * @sql = SQL handle
  361:  * @connid = connection id
  362:  * @msgid = MessageID
  363:  * @topic = topic
  364:  * @user = username
  365:  * @host = hostname
  366:  * @retain = -1 no matter
  367:  * return: -1 error, 0 no changes or >0 deleted rows
  368:  */
  369: int
  370: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  371: 		const char *topic, const char *user, const char *host, char retain)
  372: {
  373: 	int ret = 0;
  374: 	char *str, *rtn, *psStmt;
  375: 	sqlite3_stmt *stmt;
  376: 
  377: 	if (!cfg || !sql || !topic)
  378: 		return -1;
  379: 
  380: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  381: 	if (!str) {
  382: 		mqtt_rtlm_log("Error:: not found topics table name");
  383: 		return -1;
  384: 	}
  385: 	switch (retain) {
  386: 		case -1:
  387: 			rtn = "";
  388: 			break;
  389: 		case 0:
  390: 			rtn = "AND Retain = 0";
  391: 			break;
  392: 		default:
  393: 			rtn = "AND Retain != 0";
  394: 			break;
  395: 	}
  396: 	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
  397: 			"Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str, 
  398: 			connid, msgid, topic, user, host, rtn);
  399: 
  400: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  401: 		MQTT_RTLM_LOG(sql);
  402: 		sqlite3_free(psStmt);
  403: 		return -1;
  404: 	} else
  405: 		sqlite3_free(psStmt);
  406: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  407: 		ret = sqlite3_changes(sql);
  408: 	else {
  409: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  410: 			MQTT_RTLM_LOG(sql);
  411: 		ret = 0;
  412: 	}
  413: 	sqlite3_finalize(stmt);
  414: 
  415: 	return ret;
  416: }
  417: 
  418: /*
  419:  * mqtt_rtlm_read_topic() Get topic
  420:  *
  421:  * @cfg = loaded config
  422:  * @sql = SQL handle
  423:  * @connid = connection id
  424:  * @msgid = MessageID
  425:  * @topic = topic
  426:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
  427:  * return: NULL error or not found and !=NULL allocated subscribe topics
  428:  */
  429: mqtt_subscr_t *
  430: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  431: 		const char *topic, char retain)
  432: {
  433: 	int rowz = 0;
  434: 	char *str, szStr[STRSIZ], *psStmt;
  435: 	sqlite3_stmt *stmt;
  436: 	register int j;
  437: 	mqtt_subscr_t *s = NULL;
  438: 	ait_val_t v;
  439: 
  440: 	if (!cfg || !sql || !topic)
  441: 		return NULL;
  442: 
  443: 	switch (retain) {
  444: 		case -1:
  445: 			memset(szStr, 0, sizeof szStr);
  446: 			break;
  447: 		case 0:
  448: 			snprintf(szStr, sizeof szStr, "AND Retain = 0");
  449: 			break;
  450: 		default:
  451: 			snprintf(szStr, sizeof szStr, "AND Retain > 0");
  452: 			break;
  453: 	}
  454: 
  455: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  456: 	if (!str) {
  457: 		mqtt_rtlm_log("Error:: not found topics table name");
  458: 		return NULL;
  459: 	}
  460: 	psStmt = sqlite3_mprintf("SELECT Retain, Topic, Value FROM %s WHERE "
  461: 			"ConnID = '%q' AND MsgID = %d AND Topic LIKE '%q' %s;", 
  462: 			str, connid, msgid, topic, szStr);
  463: 
  464: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  465: 		MQTT_RTLM_LOG(sql);
  466: 		sqlite3_free(psStmt);
  467: 		return NULL;
  468: 	} else
  469: 		sqlite3_free(psStmt);
  470: 
  471: 	/* calculate count of rows and allocate subscribe items */
  472: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  473: 		rowz++;
  474: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  475: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  476: 		goto end;
  477: 	} else
  478: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  479: 	sqlite3_reset(stmt);
  480: 
  481: 	/* fill with data */
  482: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  483: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  484: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  485: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  486: 		AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
  487: 		s[j].sub_value.msg_len = AIT_LEN(&v);
  488: 		s[j].sub_value.msg_base = (u_char*) malloc(s[j].sub_value.msg_len);
  489: 		if (s[j].sub_value.msg_base)
  490: 			memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
  491: 	}
  492: end:
  493: 	sqlite3_finalize(stmt);
  494: 
  495: 	return s;
  496: }
  497: 
  498: /*
  499:  * mqtt_rtlm_write_subscribe() Subscribe topic
  500:  *
  501:  * @cfg = loaded config
  502:  * @sql = SQL handle
  503:  * @connid = connection id
  504:  * @msgid = MessageID
  505:  * @topic = topic
  506:  * @user = username
  507:  * @host = hostname
  508:  * @qos = Subscribe QoS
  509:  * return: -1 error, 0 no publish or >0 published ok
  510:  */
  511: int
  512: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  513: 		const char *topic, const char *user, const char *host, char qos)
  514: {
  515: 	int ret = 0;
  516: 	char *str, *psStmt;
  517: 	sqlite3_stmt *stmt;
  518: 
  519: 	if (!cfg || !sql || !topic)
  520: 		return -1;
  521: 
  522: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  523: 	if (!str) {
  524: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  525: 		return -1;
  526: 	}
  527: 	psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
  528: 			"PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
  529: 			"datetime('now', 'localtime'), '%q');", str, 
  530: 			connid, msgid, qos, topic, user, host);
  531: 
  532: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  533: 		MQTT_RTLM_LOG(sql);
  534: 		sqlite3_free(psStmt);
  535: 		return -1;
  536: 	} else
  537: 		sqlite3_free(psStmt);
  538: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  539: 		ret = sqlite3_changes(sql);
  540: 	else {
  541: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  542: 			MQTT_RTLM_LOG(sql);
  543: 		ret = 0;
  544: 	}
  545: 	sqlite3_finalize(stmt);
  546: 
  547: 	return ret;
  548: }
  549: 
  550: /*
  551:  * mqtt_rtlm_delete_subscribe() Delete subscribe
  552:  *
  553:  * @cfg = loaded config
  554:  * @sql = SQL handle
  555:  * @connid = connection id
  556:  * @topic = topic
  557:  * @user = username
  558:  * @host = hostname
  559:  * return: -1 error, 0 no changes or >0 deleted rows
  560:  */
  561: int
  562: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
  563: 		const char *topic, const char *user, const char *host)
  564: {
  565: 	int ret = 0;
  566: 	char *str, *psStmt;
  567: 	sqlite3_stmt *stmt;
  568: 
  569: 	if (!cfg || !sql || !topic)
  570: 		return -1;
  571: 
  572: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  573: 	if (!str) {
  574: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  575: 		return -1;
  576: 	}
  577: 	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
  578: 			"Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str, 
  579: 			connid, topic, user, host);
  580: 
  581: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  582: 		MQTT_RTLM_LOG(sql);
  583: 		sqlite3_free(psStmt);
  584: 		return -1;
  585: 	} else
  586: 		sqlite3_free(psStmt);
  587: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  588: 		ret = sqlite3_changes(sql);
  589: 	else {
  590: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  591: 			MQTT_RTLM_LOG(sql);
  592: 		ret = 0;
  593: 	}
  594: 	sqlite3_finalize(stmt);
  595: 
  596: 	return ret;
  597: }
  598: 
  599: /*
  600:  * mqtt_rtlm_read_subscribe() Get subscribe topic
  601:  *
  602:  * @cfg = loaded config
  603:  * @sql = SQL handle
  604:  * @connid = connection id
  605:  * @topic = topic
  606:  * return: NULL error or not found and !=NULL allocated subscribe topics
  607:  */
  608: mqtt_subscr_t *
  609: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
  610: {
  611: 	int rowz = 0;
  612: 	char *str, *psStmt;
  613: 	sqlite3_stmt *stmt;
  614: 	register int j;
  615: 	mqtt_subscr_t *s = NULL;
  616: 
  617: 	if (!cfg || !sql || !topic)
  618: 		return NULL;
  619: 
  620: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  621: 	if (!str) {
  622: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  623: 		return NULL;
  624: 	}
  625: 	psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
  626: 			"Topic LIKE '%q';", str, connid, topic);
  627: 
  628: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  629: 		MQTT_RTLM_LOG(sql);
  630: 		sqlite3_free(psStmt);
  631: 		return NULL;
  632: 	} else
  633: 		sqlite3_free(psStmt);
  634: 
  635: 	/* calculate count of rows and allocate subscribe items */
  636: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  637: 		rowz++;
  638: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  639: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  640: 		goto end;
  641: 	} else
  642: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  643: 	sqlite3_reset(stmt);
  644: 
  645: 	/* fill with data */
  646: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  647: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  648: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  649: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  650: 		s[j].sub_value.msg_base = NULL;
  651: 		s[j].sub_value.msg_len = 0;
  652: 	}
  653: end:
  654: 	sqlite3_finalize(stmt);
  655: 
  656: 	return s;
  657: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>