File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / pubmqtt.c
Revision 1.2.2.10: download - view: text, annotated - select for diffs - revision graph
Sun May 27 10:12:48 2012 UTC (12 years, 1 month ago) by misho
Branches: mqtt1_1
add init/fini preloaded actions

    1: #include "global.h"
    2: 
    3: 
    4: extern const char sql_schema[];
    5: 
    6: 
    7: /*
    8:  * mqtt_db_log() Log database connection message
    9:  *
   10:  * @fmt = format string
   11:  * @... = argument list
   12:  * return: none
   13:  */
   14: static void
   15: mqtt_rtlm_log(const char *fmt, ...)
   16: {
   17: 	va_list lst;
   18: 
   19: 	va_start(lst, fmt);
   20: 	vsyslog(LOG_ERR, fmt, lst);
   21: 	va_end(lst);
   22: }
   23: #define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
   24: 					__func__, __LINE__, \
   25: 					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
   26: 
   27: /* library pre-loaded actions */
   28: void
   29: _init()
   30: {
   31: 	sqlite3_initialize();
   32: }
   33: 
   34: void
   35: _fini()
   36: {
   37: 	sqlite3_shutdown();
   38: }
   39: 
   40: 
   41: /*
   42:  * mqtt_rtlm_open() Open database connection
   43:  *
   44:  * @cfg = config filename
   45:  * return: NULL error or SQL handle
   46:  */
   47: sqlite3 *
   48: mqtt_rtlm_open(cfg_root_t *cfg)
   49: {
   50: 	sqlite3 *sql = NULL;
   51: 	const char *str = NULL;
   52: 
   53: 	if (!cfg)
   54: 		return NULL;
   55: 
   56: 	str = cfg_getAttribute(cfg, "mqtt_pub", "name");
   57: 	if (!str) {
   58: 		mqtt_rtlm_log("Error:: Unknown database name ...\n");
   59: 		return NULL;
   60: 	}
   61: 
   62: 	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
   63: 		MQTT_RTLM_LOG(sql);
   64: 		sqlite3_close(sql);
   65: 		return NULL;
   66: 	}
   67: 
   68: 	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
   69: 		MQTT_RTLM_LOG(sql);
   70: 		sqlite3_close(sql);
   71: 		return NULL;
   72: 	}
   73: 	return sql;
   74: }
   75: 
   76: /*
   77:  * mqtt_rtlm_close() Close database connection
   78:  *
   79:  * @sql = SQL handle
   80:  * return: none
   81:  */
   82: void
   83: mqtt_rtlm_close(sqlite3 *sql)
   84: {
   85: 	sqlite3_close(sql);
   86: }
   87: 
   88: /*
   89:  * mqtt_rtlm_init_session() Create session
   90:  *
   91:  * @cfg = loaded config
   92:  * @sql = SQL handle
   93:  * @connid = connection id
   94:  * @user = username
   95:  * @host = hostname
   96:  * @will = will flag if !=0 must fill arguments
   97:  * @... = will arguments in order topic,msg,qos,retain
   98:  * return: -1 error, 0 session already appears or >0 row changed
   99:  */
  100: int
  101: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
  102: 		const char *host, char will, ...)
  103: {
  104: 	va_list lst;
  105: 	int ret = 0;
  106: 	char *str, szStmt[BUFSIZ] = { 0 };
  107: 	sqlite3_stmt *stmt;
  108: 
  109: 	if (!cfg || !sql)
  110: 		return -1;
  111: 
  112: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  113: 	if (!str) {
  114: 		mqtt_rtlm_log("Error:: not found online table name");
  115: 		return -1;
  116: 	}
  117: 	if (!will)
  118: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
  119: 				"WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
  120: 	else {
  121: 		va_start(lst, will);
  122: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
  123: 				"WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
  124: 				"VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
  125: 				str, connid, user, host, will, 
  126: 				va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
  127: 		va_end(lst);
  128: 	}
  129: 
  130: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  131: 		MQTT_RTLM_LOG(sql);
  132: 		return -1;
  133: 	}
  134: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  135: 		ret = sqlite3_changes(sql);
  136: 	else {
  137: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  138: 			MQTT_RTLM_LOG(sql);
  139: 		ret = 0;
  140: 	}
  141: 	sqlite3_finalize(stmt);
  142: 
  143: 	return ret;
  144: }
  145: 
  146: /*
  147:  * mqtt_rtlm_fini_session() Delete session(s)
  148:  *
  149:  * @cfg = loaded config
  150:  * @sql = SQL handle
  151:  * @connid = connection id
  152:  * @user = username
  153:  * @host = hostname
  154:  * return: -1 error, 0 session already appears or >0 row changed
  155:  */
  156: int
  157: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  158: {
  159: 	int ret = 0;
  160: 	char *str, szStmt[BUFSIZ] = { 0 };
  161: 	sqlite3_stmt *stmt;
  162: 
  163: 	if (!cfg || !sql)
  164: 		return -1;
  165: 
  166: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  167: 	if (!str) {
  168: 		mqtt_rtlm_log("Error:: not found online table name");
  169: 		return -1;
  170: 	}
  171: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
  172: 			"AND RemoteHost LIKE '%s';", str, connid, user, host);
  173: 
  174: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  175: 		MQTT_RTLM_LOG(sql);
  176: 		return -1;
  177: 	}
  178: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  179: 		ret = sqlite3_changes(sql);
  180: 	else {
  181: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  182: 			MQTT_RTLM_LOG(sql);
  183: 		ret = 0;
  184: 	}
  185: 	sqlite3_finalize(stmt);
  186: 
  187: 	return ret;
  188: }
  189: 
  190: /*
  191:  * mqtt_rtlm_chk_session() Check session(s)
  192:  *
  193:  * @cfg = loaded config
  194:  * @sql = SQL handle
  195:  * @connid = connection id
  196:  * @user = username
  197:  * @host = hostname
  198:  * return: -1 error, 0 not logged or >0 logged found rows
  199:  */
  200: int
  201: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  202: {
  203: 	int ret = 0;
  204: 	char *str, szStmt[BUFSIZ] = { 0 };
  205: 	sqlite3_stmt *stmt;
  206: 
  207: 	if (!cfg || !sql)
  208: 		return -1;
  209: 
  210: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  211: 	if (!str) {
  212: 		mqtt_rtlm_log("Error:: not found online table name");
  213: 		return -1;
  214: 	}
  215: 	snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
  216: 			"ConnID = '%s' AND Username LIKE '%s' AND RemoteHost LIKE '%s';", 
  217: 			str, connid, user, host);
  218: 
  219: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  220: 		MQTT_RTLM_LOG(sql);
  221: 		return -1;
  222: 	}
  223: 	if (sqlite3_step(stmt) == SQLITE_ROW)
  224: 		ret = sqlite3_changes(sql);
  225: 	else
  226: 		ret = 0;
  227: 	sqlite3_finalize(stmt);
  228: 
  229: 	return ret;
  230: }
  231: 
  232: /*
  233:  * mqtt_rtlm_write_topic() Publish topic
  234:  *
  235:  * @cfg = loaded config
  236:  * @sql = SQL handle
  237:  * @connid = connection id
  238:  * @msgid = MessageID
  239:  * @topic = topic
  240:  * @txt = text
  241:  * @user = username
  242:  * @host = hostname
  243:  * @retain = !=0 retain message to database
  244:  * return: -1 error, 0 no publish or >0 published ok
  245:  */
  246: int
  247: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  248: 		const char *topic, const char *txt, const char *user, const char *host, char retain)
  249: {
  250: 	int ret = 0;
  251: 	char *str, szStmt[BUFSIZ] = { 0 };
  252: 	sqlite3_stmt *stmt;
  253: 
  254: 	if (!cfg || !sql || !topic)
  255: 		return -1;
  256: 
  257: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  258: 	if (!str) {
  259: 		mqtt_rtlm_log("Error:: not found topics table name");
  260: 		return -1;
  261: 	}
  262: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, ConnID, MsgID, Topic, Value, PubUser, "
  263: 			"PubDate, PubHost) VALUES (%d, '%s', %d, '%s', '%s', '%s', "
  264: 			"datetime('now', 'localtime'), '%s');", 
  265: 			str, retain, connid, msgid, topic, txt, user, host);
  266: 
  267: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  268: 		MQTT_RTLM_LOG(sql);
  269: 		return -1;
  270: 	}
  271: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  272: 		ret = sqlite3_changes(sql);
  273: 	else {
  274: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  275: 			MQTT_RTLM_LOG(sql);
  276: 		ret = 0;
  277: 	}
  278: 	sqlite3_finalize(stmt);
  279: 
  280: 	return ret;
  281: }
  282: 
  283: /*
  284:  * mqtt_rtlm_wipe_topic() Wipe all topics
  285:  *
  286:  * @cfg = loaded config
  287:  * @sql = SQL handle
  288:  * @connid = connection id
  289:  * @user = username
  290:  * @retain = -1 no matter
  291:  * return: -1 error, 0 no changes or >0 deleted rows
  292:  */
  293: int
  294: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
  295: {
  296: 	int ret = 0;
  297: 	char *str, *rtn, szStmt[BUFSIZ] = { 0 };
  298: 	sqlite3_stmt *stmt;
  299: 
  300: 	if (!cfg || !sql || !connid)
  301: 		return -1;
  302: 
  303: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  304: 	if (!str) {
  305: 		mqtt_rtlm_log("Error:: not found topics table name");
  306: 		return -1;
  307: 	}
  308: 	switch (retain) {
  309: 		case -1:
  310: 			rtn = "";
  311: 			break;
  312: 		case 0:
  313: 			rtn = "AND Retain = 0";
  314: 			break;
  315: 		default:
  316: 			rtn = "AND Retain != 0";
  317: 			break;
  318: 	}
  319: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND "
  320: 			"PubUser LIKE '%s' %s;", str, connid, user, rtn);
  321: 
  322: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  323: 		MQTT_RTLM_LOG(sql);
  324: 		return -1;
  325: 	}
  326: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  327: 		ret = sqlite3_changes(sql);
  328: 	else {
  329: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  330: 			MQTT_RTLM_LOG(sql);
  331: 		ret = 0;
  332: 	}
  333: 	sqlite3_finalize(stmt);
  334: 
  335: 	return ret;
  336: }
  337: 
  338: /*
  339:  * mqtt_rtlm_delete_topic() Delete topic
  340:  *
  341:  * @cfg = loaded config
  342:  * @sql = SQL handle
  343:  * @connid = connection id
  344:  * @msgid = MessageID
  345:  * @topic = topic
  346:  * @user = username
  347:  * @host = hostname
  348:  * @retain = -1 no matter
  349:  * return: -1 error, 0 no changes or >0 deleted rows
  350:  */
  351: int
  352: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  353: 		const char *topic, const char *user, const char *host, char retain)
  354: {
  355: 	int ret = 0;
  356: 	char *str, *rtn, szStmt[BUFSIZ] = { 0 };
  357: 	sqlite3_stmt *stmt;
  358: 
  359: 	if (!cfg || !sql || !topic)
  360: 		return -1;
  361: 
  362: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  363: 	if (!str) {
  364: 		mqtt_rtlm_log("Error:: not found topics table name");
  365: 		return -1;
  366: 	}
  367: 	switch (retain) {
  368: 		case -1:
  369: 			rtn = "";
  370: 			break;
  371: 		case 0:
  372: 			rtn = "AND Retain = 0";
  373: 			break;
  374: 		default:
  375: 			rtn = "AND Retain != 0";
  376: 			break;
  377: 	}
  378: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND MsgID = %d AND "
  379: 			"Topic LIKE '%s' AND PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
  380: 			connid, msgid, topic, user, host, rtn);
  381: 
  382: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  383: 		MQTT_RTLM_LOG(sql);
  384: 		return -1;
  385: 	}
  386: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  387: 		ret = sqlite3_changes(sql);
  388: 	else {
  389: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  390: 			MQTT_RTLM_LOG(sql);
  391: 		ret = 0;
  392: 	}
  393: 	sqlite3_finalize(stmt);
  394: 
  395: 	return ret;
  396: }
  397: 
  398: /*
  399:  * mqtt_rtlm_read_topic() Get topic
  400:  *
  401:  * @cfg = loaded config
  402:  * @sql = SQL handle
  403:  * @connid = connection id
  404:  * @msgid = MessageID
  405:  * @topic = topic
  406:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
  407:  * return: NULL error or not found and !=NULL allocated subscribe topics
  408:  */
  409: mqtt_subscr_t *
  410: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  411: 		const char *topic, char retain)
  412: {
  413: 	int rowz = 0;
  414: 	char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
  415: 	sqlite3_stmt *stmt;
  416: 	register int j;
  417: 	mqtt_subscr_t *s = NULL;
  418: 
  419: 	if (!cfg || !sql || !topic)
  420: 		return NULL;
  421: 
  422: 	switch (retain) {
  423: 		case -1:
  424: 			memset(szStr, 0, sizeof szStr);
  425: 			break;
  426: 		case 0:
  427: 			snprintf(szStr, sizeof szStr, "AND Retain = 0");
  428: 			break;
  429: 		default:
  430: 			snprintf(szStr, sizeof szStr, "AND Retain > 0");
  431: 			break;
  432: 	}
  433: 
  434: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  435: 	if (!str) {
  436: 		mqtt_rtlm_log("Error:: not found topics table name");
  437: 		return NULL;
  438: 	}
  439: 	snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
  440: 			"ConnID = '%s' AND MsgID = %d AND Topic LIKE '%s' %s;", 
  441: 			str, connid, msgid, topic, szStr);
  442: 
  443: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  444: 		MQTT_RTLM_LOG(sql);
  445: 		return NULL;
  446: 	}
  447: 
  448: 	/* calculate count of rows and allocate subscribe items */
  449: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  450: 		rowz++;
  451: 	if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  452: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  453: 		goto end;
  454: 	} else
  455: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  456: 	sqlite3_reset(stmt);
  457: 
  458: 	/* fill with data */
  459: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  460: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  461: 		s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
  462: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  463: 		s[j].sub_value.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 2));
  464: 		s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
  465: 	}
  466: end:
  467: 	sqlite3_finalize(stmt);
  468: 
  469: 	return s;
  470: }
  471: 
  472: /*
  473:  * mqtt_rtlm_write_subscribe() Subscribe topic
  474:  *
  475:  * @cfg = loaded config
  476:  * @sql = SQL handle
  477:  * @connid = connection id
  478:  * @msgid = MessageID
  479:  * @topic = topic
  480:  * @user = username
  481:  * @host = hostname
  482:  * @qos = Subscribe QoS
  483:  * return: -1 error, 0 no publish or >0 published ok
  484:  */
  485: int
  486: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  487: 		const char *topic, const char *user, const char *host, char qos)
  488: {
  489: 	int ret = 0;
  490: 	char *str, szStmt[BUFSIZ] = { 0 };
  491: 	sqlite3_stmt *stmt;
  492: 
  493: 	if (!cfg || !sql || !topic)
  494: 		return -1;
  495: 
  496: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  497: 	if (!str) {
  498: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  499: 		return -1;
  500: 	}
  501: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
  502: 			"PubDate, PubHost) VALUES ('%s', %d, %d, '%s', '%s', "
  503: 			"datetime('now', 'localtime'), '%s');", str, 
  504: 			connid, msgid, qos, topic, user, host);
  505: 
  506: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  507: 		MQTT_RTLM_LOG(sql);
  508: 		return -1;
  509: 	}
  510: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  511: 		ret = sqlite3_changes(sql);
  512: 	else {
  513: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  514: 			MQTT_RTLM_LOG(sql);
  515: 		ret = 0;
  516: 	}
  517: 	sqlite3_finalize(stmt);
  518: 
  519: 	return ret;
  520: }
  521: 
  522: /*
  523:  * mqtt_rtlm_delete_subscribe() Delete subscribe
  524:  *
  525:  * @cfg = loaded config
  526:  * @sql = SQL handle
  527:  * @connid = connection id
  528:  * @topic = topic
  529:  * @user = username
  530:  * @host = hostname
  531:  * return: -1 error, 0 no changes or >0 deleted rows
  532:  */
  533: int
  534: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
  535: 		const char *topic, const char *user, const char *host)
  536: {
  537: 	int ret = 0;
  538: 	char *str, szStmt[BUFSIZ] = { 0 };
  539: 	sqlite3_stmt *stmt;
  540: 
  541: 	if (!cfg || !sql || !topic)
  542: 		return -1;
  543: 
  544: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  545: 	if (!str) {
  546: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  547: 		return -1;
  548: 	}
  549: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND "
  550: 			"Topic LIKE '%s' AND PubUser LIKE '%s' AND PubHost LIKE '%s';", str, 
  551: 			connid, topic, user, host);
  552: 
  553: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  554: 		MQTT_RTLM_LOG(sql);
  555: 		return -1;
  556: 	}
  557: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  558: 		ret = sqlite3_changes(sql);
  559: 	else {
  560: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  561: 			MQTT_RTLM_LOG(sql);
  562: 		ret = 0;
  563: 	}
  564: 	sqlite3_finalize(stmt);
  565: 
  566: 	return ret;
  567: }
  568: 
  569: /*
  570:  * mqtt_rtlm_read_subscribe() Get subscribe topic
  571:  *
  572:  * @cfg = loaded config
  573:  * @sql = SQL handle
  574:  * @connid = connection id
  575:  * @topic = topic
  576:  * return: NULL error or not found and !=NULL allocated subscribe topics
  577:  */
  578: mqtt_subscr_t *
  579: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
  580: {
  581: 	int rowz = 0;
  582: 	char *str, szStmt[BUFSIZ] = { 0 };
  583: 	sqlite3_stmt *stmt;
  584: 	register int j;
  585: 	mqtt_subscr_t *s = NULL;
  586: 
  587: 	if (!cfg || !sql || !topic)
  588: 		return NULL;
  589: 
  590: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  591: 	if (!str) {
  592: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  593: 		return NULL;
  594: 	}
  595: 	snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE ConnID = '%s' AND "
  596: 			"Topic LIKE '%s';", str, connid, topic);
  597: 
  598: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  599: 		MQTT_RTLM_LOG(sql);
  600: 		return NULL;
  601: 	}
  602: 
  603: 	/* calculate count of rows and allocate subscribe items */
  604: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  605: 		rowz++;
  606: 	if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  607: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  608: 		goto end;
  609: 	} else
  610: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  611: 	sqlite3_reset(stmt);
  612: 
  613: 	/* fill with data */
  614: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  615: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  616: 		s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
  617: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  618: 		s[j].sub_value.msg_base = NULL;
  619: 		s[j].sub_value.msg_len = 0;
  620: 	}
  621: end:
  622: 	sqlite3_finalize(stmt);
  623: 
  624: 	return s;
  625: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>