File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / pubmqtt.c
Revision 1.2.2.8: download - view: text, annotated - select for diffs - revision graph
Tue May 8 14:27:46 2012 UTC (12 years, 1 month ago) by misho
Branches: mqtt1_1
add new wipe sql api in rtlm module

    1: #include "global.h"
    2: 
    3: 
    4: extern const char sql_schema[];
    5: 
    6: 
    7: /*
    8:  * mqtt_db_log() Log database connection message
    9:  *
   10:  * @fmt = format string
   11:  * @... = argument list
   12:  * return: none
   13:  */
   14: static void
   15: mqtt_rtlm_log(const char *fmt, ...)
   16: {
   17: 	va_list lst;
   18: 
   19: 	va_start(lst, fmt);
   20: 	vsyslog(LOG_ERR, fmt, lst);
   21: 	va_end(lst);
   22: }
   23: #define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
   24: 					__func__, __LINE__, \
   25: 					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
   26: 
   27: 
   28: /*
   29:  * mqtt_rtlm_open() Open database connection
   30:  *
   31:  * @cfg = config filename
   32:  * return: NULL error or SQL handle
   33:  */
   34: sqlite3 *
   35: mqtt_rtlm_open(cfg_root_t *cfg)
   36: {
   37: 	sqlite3 *sql = NULL;
   38: 	const char *str = NULL;
   39: 
   40: 	if (!cfg)
   41: 		return NULL;
   42: 
   43: 	str = cfg_getAttribute(cfg, "mqtt_pub", "name");
   44: 	if (!str) {
   45: 		mqtt_rtlm_log("Error:: Unknown database name ...\n");
   46: 		return NULL;
   47: 	}
   48: 
   49: 	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
   50: 		MQTT_RTLM_LOG(sql);
   51: 		sqlite3_close(sql);
   52: 		return NULL;
   53: 	}
   54: 
   55: 	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
   56: 		MQTT_RTLM_LOG(sql);
   57: 		sqlite3_close(sql);
   58: 		return NULL;
   59: 	}
   60: 	return sql;
   61: }
   62: 
   63: /*
   64:  * mqtt_rtlm_close() Close database connection
   65:  *
   66:  * @sql = SQL handle
   67:  * return: none
   68:  */
   69: void
   70: mqtt_rtlm_close(sqlite3 *sql)
   71: {
   72: 	sqlite3_close(sql);
   73: }
   74: 
   75: /*
   76:  * mqtt_rtlm_init_session() Create session
   77:  *
   78:  * @cfg = loaded config
   79:  * @sql = SQL handle
   80:  * @connid = connection id
   81:  * @user = username
   82:  * @host = hostname
   83:  * @will = will flag if !=0 must fill arguments
   84:  * @... = will arguments in order topic,msg,qos,retain
   85:  * return: -1 error, 0 session already appears or >0 row changed
   86:  */
   87: int
   88: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
   89: 		const char *host, char will, ...)
   90: {
   91: 	va_list lst;
   92: 	int ret = 0;
   93: 	char *str, szStmt[BUFSIZ] = { 0 };
   94: 	sqlite3_stmt *stmt;
   95: 
   96: 	if (!cfg || !sql)
   97: 		return -1;
   98: 
   99: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  100: 	if (!str) {
  101: 		mqtt_rtlm_log("Error:: not found online table name");
  102: 		return -1;
  103: 	}
  104: 	if (!will)
  105: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
  106: 				"WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
  107: 	else {
  108: 		va_start(lst, will);
  109: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
  110: 				"WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
  111: 				"VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
  112: 				str, connid, user, host, will, 
  113: 				va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
  114: 		va_end(lst);
  115: 	}
  116: 
  117: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  118: 		MQTT_RTLM_LOG(sql);
  119: 		return -1;
  120: 	}
  121: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  122: 		ret = sqlite3_changes(sql);
  123: 	else {
  124: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  125: 			MQTT_RTLM_LOG(sql);
  126: 		ret = 0;
  127: 	}
  128: 	sqlite3_finalize(stmt);
  129: 
  130: 	return ret;
  131: }
  132: 
  133: /*
  134:  * mqtt_rtlm_fini_session() Delete session(s)
  135:  *
  136:  * @cfg = loaded config
  137:  * @sql = SQL handle
  138:  * @connid = connection id
  139:  * @user = username
  140:  * @host = hostname
  141:  * return: -1 error, 0 session already appears or >0 row changed
  142:  */
  143: int
  144: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  145: {
  146: 	int ret = 0;
  147: 	char *str, szStmt[BUFSIZ] = { 0 };
  148: 	sqlite3_stmt *stmt;
  149: 
  150: 	if (!cfg || !sql)
  151: 		return -1;
  152: 
  153: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  154: 	if (!str) {
  155: 		mqtt_rtlm_log("Error:: not found online table name");
  156: 		return -1;
  157: 	}
  158: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
  159: 			"AND RemoteHost LIKE '%s';", str, connid, user, host);
  160: 
  161: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  162: 		MQTT_RTLM_LOG(sql);
  163: 		return -1;
  164: 	}
  165: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  166: 		ret = sqlite3_changes(sql);
  167: 	else {
  168: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  169: 			MQTT_RTLM_LOG(sql);
  170: 		ret = 0;
  171: 	}
  172: 	sqlite3_finalize(stmt);
  173: 
  174: 	return ret;
  175: }
  176: 
  177: /*
  178:  * mqtt_rtlm_chk_session() Check session(s)
  179:  *
  180:  * @cfg = loaded config
  181:  * @sql = SQL handle
  182:  * @connid = connection id
  183:  * @user = username
  184:  * @host = hostname
  185:  * return: -1 error, 0 not logged or >0 logged found rows
  186:  */
  187: int
  188: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  189: {
  190: 	int ret = 0;
  191: 	char *str, szStmt[BUFSIZ] = { 0 };
  192: 	sqlite3_stmt *stmt;
  193: 
  194: 	if (!cfg || !sql)
  195: 		return -1;
  196: 
  197: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  198: 	if (!str) {
  199: 		mqtt_rtlm_log("Error:: not found online table name");
  200: 		return -1;
  201: 	}
  202: 	snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
  203: 			"ConnID = '%s' AND Username LIKE '%s' AND RemoteHost LIKE '%s';", 
  204: 			str, connid, user, host);
  205: 
  206: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  207: 		MQTT_RTLM_LOG(sql);
  208: 		return -1;
  209: 	}
  210: 	if (sqlite3_step(stmt) == SQLITE_ROW)
  211: 		ret = sqlite3_changes(sql);
  212: 	else
  213: 		ret = 0;
  214: 	sqlite3_finalize(stmt);
  215: 
  216: 	return ret;
  217: }
  218: 
  219: /*
  220:  * mqtt_rtlm_write_topic() Publish topic
  221:  *
  222:  * @cfg = loaded config
  223:  * @sql = SQL handle
  224:  * @connid = connection id
  225:  * @msgid = MessageID
  226:  * @topic = topic
  227:  * @txt = text
  228:  * @user = username
  229:  * @host = hostname
  230:  * @retain = !=0 retain message to database
  231:  * return: -1 error, 0 no publish or >0 published ok
  232:  */
  233: int
  234: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  235: 		const char *topic, const char *txt, const char *user, const char *host, char retain)
  236: {
  237: 	int ret = 0;
  238: 	char *str, szStmt[BUFSIZ] = { 0 };
  239: 	sqlite3_stmt *stmt;
  240: 
  241: 	if (!cfg || !sql || !topic)
  242: 		return -1;
  243: 
  244: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  245: 	if (!str) {
  246: 		mqtt_rtlm_log("Error:: not found topics table name");
  247: 		return -1;
  248: 	}
  249: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, ConnID, MsgID, Topic, Value, PubUser, "
  250: 			"PubDate, PubHost) VALUES (%d, '%s', %d, '%s', '%s', '%s', "
  251: 			"datetime('now', 'localtime'), '%s');", 
  252: 			str, retain, connid, msgid, topic, txt, user, host);
  253: 
  254: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  255: 		MQTT_RTLM_LOG(sql);
  256: 		return -1;
  257: 	}
  258: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  259: 		ret = sqlite3_changes(sql);
  260: 	else {
  261: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  262: 			MQTT_RTLM_LOG(sql);
  263: 		ret = 0;
  264: 	}
  265: 	sqlite3_finalize(stmt);
  266: 
  267: 	return ret;
  268: }
  269: 
  270: /*
  271:  * mqtt_rtlm_wipe_topic() Wipe all topics
  272:  *
  273:  * @cfg = loaded config
  274:  * @sql = SQL handle
  275:  * @connid = connection id
  276:  * @user = username
  277:  * @retain = -1 no matter
  278:  * return: -1 error, 0 no changes or >0 deleted rows
  279:  */
  280: int
  281: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
  282: {
  283: 	int ret = 0;
  284: 	char *str, *rtn, szStmt[BUFSIZ] = { 0 };
  285: 	sqlite3_stmt *stmt;
  286: 
  287: 	if (!cfg || !sql || !connid)
  288: 		return -1;
  289: 
  290: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  291: 	if (!str) {
  292: 		mqtt_rtlm_log("Error:: not found topics table name");
  293: 		return -1;
  294: 	}
  295: 	switch (retain) {
  296: 		case -1:
  297: 			rtn = "";
  298: 			break;
  299: 		case 0:
  300: 			rtn = "AND Retain = 0";
  301: 			break;
  302: 		default:
  303: 			rtn = "AND Retain != 0";
  304: 			break;
  305: 	}
  306: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND "
  307: 			"PubUser LIKE '%s' %s;", str, connid, user, rtn);
  308: 
  309: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  310: 		MQTT_RTLM_LOG(sql);
  311: 		return -1;
  312: 	}
  313: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  314: 		ret = sqlite3_changes(sql);
  315: 	else {
  316: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  317: 			MQTT_RTLM_LOG(sql);
  318: 		ret = 0;
  319: 	}
  320: 	sqlite3_finalize(stmt);
  321: 
  322: 	return ret;
  323: }
  324: 
  325: /*
  326:  * mqtt_rtlm_delete_topic() Delete topic
  327:  *
  328:  * @cfg = loaded config
  329:  * @sql = SQL handle
  330:  * @connid = connection id
  331:  * @msgid = MessageID
  332:  * @topic = topic
  333:  * @user = username
  334:  * @host = hostname
  335:  * @retain = -1 no matter
  336:  * return: -1 error, 0 no changes or >0 deleted rows
  337:  */
  338: int
  339: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  340: 		const char *topic, const char *user, const char *host, char retain)
  341: {
  342: 	int ret = 0;
  343: 	char *str, *rtn, szStmt[BUFSIZ] = { 0 };
  344: 	sqlite3_stmt *stmt;
  345: 
  346: 	if (!cfg || !sql || !topic)
  347: 		return -1;
  348: 
  349: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  350: 	if (!str) {
  351: 		mqtt_rtlm_log("Error:: not found topics table name");
  352: 		return -1;
  353: 	}
  354: 	switch (retain) {
  355: 		case -1:
  356: 			rtn = "";
  357: 			break;
  358: 		case 0:
  359: 			rtn = "AND Retain = 0";
  360: 			break;
  361: 		default:
  362: 			rtn = "AND Retain != 0";
  363: 			break;
  364: 	}
  365: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND MsgID = %d AND "
  366: 			"Topic LIKE '%s' AND PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
  367: 			connid, msgid, topic, user, host, rtn);
  368: 
  369: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  370: 		MQTT_RTLM_LOG(sql);
  371: 		return -1;
  372: 	}
  373: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  374: 		ret = sqlite3_changes(sql);
  375: 	else {
  376: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  377: 			MQTT_RTLM_LOG(sql);
  378: 		ret = 0;
  379: 	}
  380: 	sqlite3_finalize(stmt);
  381: 
  382: 	return ret;
  383: }
  384: 
  385: /*
  386:  * mqtt_rtlm_read_topic() Get topic
  387:  *
  388:  * @cfg = loaded config
  389:  * @sql = SQL handle
  390:  * @connid = connection id
  391:  * @msgid = MessageID
  392:  * @topic = topic
  393:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
  394:  * return: NULL error or not found and !=NULL allocated subscribe topics
  395:  */
  396: mqtt_subscr_t *
  397: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  398: 		const char *topic, char retain)
  399: {
  400: 	int rowz = 0;
  401: 	char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
  402: 	sqlite3_stmt *stmt;
  403: 	register int j;
  404: 	mqtt_subscr_t *s = NULL;
  405: 
  406: 	if (!cfg || !sql || !topic)
  407: 		return NULL;
  408: 
  409: 	switch (retain) {
  410: 		case -1:
  411: 			memset(szStr, 0, sizeof szStr);
  412: 			break;
  413: 		case 0:
  414: 			snprintf(szStr, sizeof szStr, "AND Retain = 0");
  415: 			break;
  416: 		default:
  417: 			snprintf(szStr, sizeof szStr, "AND Retain > 0");
  418: 			break;
  419: 	}
  420: 
  421: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  422: 	if (!str) {
  423: 		mqtt_rtlm_log("Error:: not found topics table name");
  424: 		return NULL;
  425: 	}
  426: 	snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
  427: 			"ConnID = '%s' AND MsgID = %d AND Topic LIKE '%s' %s;", 
  428: 			str, connid, msgid, topic, szStr);
  429: 
  430: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  431: 		MQTT_RTLM_LOG(sql);
  432: 		return NULL;
  433: 	}
  434: 
  435: 	/* calculate count of rows and allocate subscribe items */
  436: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  437: 		rowz++;
  438: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  439: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  440: 		goto end;
  441: 	} else
  442: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  443: 	sqlite3_reset(stmt);
  444: 
  445: 	/* fill with data */
  446: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  447: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  448: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  449: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  450: 		s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
  451: 		s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
  452: 	}
  453: end:
  454: 	sqlite3_finalize(stmt);
  455: 
  456: 	return s;
  457: }
  458: 
  459: /*
  460:  * mqtt_rtlm_write_subscribe() Subscribe topic
  461:  *
  462:  * @cfg = loaded config
  463:  * @sql = SQL handle
  464:  * @connid = connection id
  465:  * @msgid = MessageID
  466:  * @topic = topic
  467:  * @user = username
  468:  * @host = hostname
  469:  * @qos = Subscribe QoS
  470:  * return: -1 error, 0 no publish or >0 published ok
  471:  */
  472: int
  473: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  474: 		const char *topic, const char *user, const char *host, char qos)
  475: {
  476: 	int ret = 0;
  477: 	char *str, szStmt[BUFSIZ] = { 0 };
  478: 	sqlite3_stmt *stmt;
  479: 
  480: 	if (!cfg || !sql || !topic)
  481: 		return -1;
  482: 
  483: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  484: 	if (!str) {
  485: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  486: 		return -1;
  487: 	}
  488: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
  489: 			"PubDate, PubHost) VALUES ('%s', %d, %d, '%s', '%s', "
  490: 			"datetime('now', 'localtime'), '%s');", str, 
  491: 			connid, msgid, qos, topic, user, host);
  492: 
  493: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  494: 		MQTT_RTLM_LOG(sql);
  495: 		return -1;
  496: 	}
  497: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  498: 		ret = sqlite3_changes(sql);
  499: 	else {
  500: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  501: 			MQTT_RTLM_LOG(sql);
  502: 		ret = 0;
  503: 	}
  504: 	sqlite3_finalize(stmt);
  505: 
  506: 	return ret;
  507: }
  508: 
  509: /*
  510:  * mqtt_rtlm_delete_subscribe() Delete subscribe
  511:  *
  512:  * @cfg = loaded config
  513:  * @sql = SQL handle
  514:  * @connid = connection id
  515:  * @topic = topic
  516:  * @user = username
  517:  * @host = hostname
  518:  * return: -1 error, 0 no changes or >0 deleted rows
  519:  */
  520: int
  521: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
  522: 		const char *topic, const char *user, const char *host)
  523: {
  524: 	int ret = 0;
  525: 	char *str, szStmt[BUFSIZ] = { 0 };
  526: 	sqlite3_stmt *stmt;
  527: 
  528: 	if (!cfg || !sql || !topic)
  529: 		return -1;
  530: 
  531: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  532: 	if (!str) {
  533: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  534: 		return -1;
  535: 	}
  536: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND "
  537: 			"Topic LIKE '%s' AND PubUser LIKE '%s' AND PubHost LIKE '%s';", str, 
  538: 			connid, topic, user, host);
  539: 
  540: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  541: 		MQTT_RTLM_LOG(sql);
  542: 		return -1;
  543: 	}
  544: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  545: 		ret = sqlite3_changes(sql);
  546: 	else {
  547: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  548: 			MQTT_RTLM_LOG(sql);
  549: 		ret = 0;
  550: 	}
  551: 	sqlite3_finalize(stmt);
  552: 
  553: 	return ret;
  554: }
  555: 
  556: /*
  557:  * mqtt_rtlm_read_subscribe() Get subscribe topic
  558:  *
  559:  * @cfg = loaded config
  560:  * @sql = SQL handle
  561:  * @connid = connection id
  562:  * @topic = topic
  563:  * return: NULL error or not found and !=NULL allocated subscribe topics
  564:  */
  565: mqtt_subscr_t *
  566: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
  567: {
  568: 	int rowz = 0;
  569: 	char *str, szStmt[BUFSIZ] = { 0 };
  570: 	sqlite3_stmt *stmt;
  571: 	register int j;
  572: 	mqtt_subscr_t *s = NULL;
  573: 
  574: 	if (!cfg || !sql || !topic)
  575: 		return NULL;
  576: 
  577: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  578: 	if (!str) {
  579: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  580: 		return NULL;
  581: 	}
  582: 	snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE ConnID = '%s' AND "
  583: 			"Topic LIKE '%s';", str, connid, topic);
  584: 
  585: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  586: 		MQTT_RTLM_LOG(sql);
  587: 		return NULL;
  588: 	}
  589: 
  590: 	/* calculate count of rows and allocate subscribe items */
  591: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  592: 		rowz++;
  593: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  594: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  595: 		goto end;
  596: 	} else
  597: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  598: 	sqlite3_reset(stmt);
  599: 
  600: 	/* fill with data */
  601: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  602: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  603: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  604: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  605: 		s[j].sub_value.msg_base = NULL;
  606: 		s[j].sub_value.msg_len = 0;
  607: 	}
  608: end:
  609: 	sqlite3_finalize(stmt);
  610: 
  611: 	return s;
  612: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>