File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / pubmqtt.c
Revision 1.2.2.5: download - view: text, annotated - select for diffs - revision graph
Wed Apr 25 12:04:30 2012 UTC (12 years, 2 months ago) by misho
Branches: mqtt1_1
...

    1: #include "global.h"
    2: 
    3: 
    4: extern const char sql_schema[];
    5: 
    6: 
    7: /*
    8:  * mqtt_db_log() Log database connection message
    9:  *
   10:  * @fmt = format string
   11:  * @... = argument list
   12:  * return: none
   13:  */
   14: static void
   15: mqtt_rtlm_log(const char *fmt, ...)
   16: {
   17: 	va_list lst;
   18: 
   19: 	va_start(lst, fmt);
   20: 	vsyslog(LOG_ERR, fmt, lst);
   21: 	va_end(lst);
   22: }
   23: #define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
   24: 					__func__, __LINE__, \
   25: 					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
   26: 
   27: 
   28: /*
   29:  * mqtt_rtlm_open() Open database connection
   30:  *
   31:  * @cfg = config filename
   32:  * return: NULL error or SQL handle
   33:  */
   34: sqlite3 *
   35: mqtt_rtlm_open(cfg_root_t *cfg)
   36: {
   37: 	sqlite3 *sql = NULL;
   38: 	const char *str = NULL;
   39: 
   40: 	if (!cfg)
   41: 		return NULL;
   42: 
   43: 	sqlite3_config(SQLITE_CONFIG_SERIALIZED);
   44: 	if (!sqlite3_threadsafe())
   45: 		return NULL;
   46: 
   47: 	str = cfg_getAttribute(cfg, "mqtt_pub", "name");
   48: 	if (!str) {
   49: 		mqtt_rtlm_log("Error:: Unknown database name ...\n");
   50: 		return NULL;
   51: 	}
   52: 
   53: 	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
   54: 		MQTT_RTLM_LOG(sql);
   55: 		sqlite3_close(sql);
   56: 		return NULL;
   57: 	}
   58: 
   59: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
   60: 	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
   61: 		MQTT_RTLM_LOG(sql);
   62: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
   63: 		sqlite3_close(sql);
   64: 		return NULL;
   65: 	}
   66: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
   67: 	return sql;
   68: }
   69: 
   70: /*
   71:  * mqtt_rtlm_close() Close database connection
   72:  *
   73:  * @sql = SQL handle
   74:  * return: none
   75:  */
   76: void
   77: mqtt_rtlm_close(sqlite3 *sql)
   78: {
   79: 	sqlite3_close(sql);
   80: }
   81: 
   82: /*
   83:  * mqtt_rtlm_init_session() Create session
   84:  *
   85:  * @cfg = loaded config
   86:  * @sql = SQL handle
   87:  * @connid = connection id
   88:  * @user = username
   89:  * @host = hostname
   90:  * @will = will flag if !=0 must fill arguments
   91:  * @... = will arguments in order topic,msg,qos,retain
   92:  * return: -1 error, 0 session already appears or >0 row changed
   93:  */
   94: int
   95: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
   96: 		const char *host, char will, ...)
   97: {
   98: 	va_list lst;
   99: 	int ret = 0;
  100: 	char *str, szStmt[BUFSIZ] = { 0 };
  101: 	sqlite3_stmt *stmt;
  102: 
  103: 	if (!cfg || !sql)
  104: 		return -1;
  105: 
  106: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  107: 	if (!str) {
  108: 		mqtt_rtlm_log("Error:: not found online table name");
  109: 		return -1;
  110: 	}
  111: 	if (!will)
  112: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
  113: 				"WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
  114: 	else {
  115: 		va_start(lst, will);
  116: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
  117: 				"WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
  118: 				"VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
  119: 				str, connid, user, host, will, 
  120: 				va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
  121: 		va_end(lst);
  122: 	}
  123: 
  124: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  125: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  126: 		MQTT_RTLM_LOG(sql);
  127: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  128: 		return -1;
  129: 	}
  130: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  131: 		ret = sqlite3_changes(sql);
  132: 	else {
  133: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  134: 			MQTT_RTLM_LOG(sql);
  135: 		ret = 0;
  136: 	}
  137: 	sqlite3_finalize(stmt);
  138: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  139: 
  140: 	return ret;
  141: }
  142: 
  143: /*
  144:  * mqtt_rtlm_fini_session() Delete session(s)
  145:  *
  146:  * @cfg = loaded config
  147:  * @sql = SQL handle
  148:  * @connid = connection id
  149:  * @user = username
  150:  * @host = hostname
  151:  * return: -1 error, 0 session already appears or >0 row changed
  152:  */
  153: int
  154: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  155: {
  156: 	int ret = 0;
  157: 	char *str, szStmt[BUFSIZ] = { 0 };
  158: 	sqlite3_stmt *stmt;
  159: 
  160: 	if (!cfg || !sql)
  161: 		return -1;
  162: 
  163: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  164: 	if (!str) {
  165: 		mqtt_rtlm_log("Error:: not found online table name");
  166: 		return -1;
  167: 	}
  168: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
  169: 			"AND RemoteHost LIKE '%s';", str, connid, user, host);
  170: 
  171: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  172: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  173: 		MQTT_RTLM_LOG(sql);
  174: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  175: 		return -1;
  176: 	}
  177: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  178: 		ret = sqlite3_changes(sql);
  179: 	else {
  180: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  181: 			MQTT_RTLM_LOG(sql);
  182: 		ret = 0;
  183: 	}
  184: 	sqlite3_finalize(stmt);
  185: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  186: 
  187: 	return ret;
  188: }
  189: 
  190: /*
  191:  * mqtt_rtlm_chk_session() Check session(s)
  192:  *
  193:  * @cfg = loaded config
  194:  * @sql = SQL handle
  195:  * @connid = connection id
  196:  * @user = username
  197:  * @host = hostname
  198:  * return: -1 error, 0 not logged or >0 logged found rows
  199:  */
  200: int
  201: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  202: {
  203: 	int ret = 0;
  204: 	char *str, szStmt[BUFSIZ] = { 0 };
  205: 	sqlite3_stmt *stmt;
  206: 
  207: 	if (!cfg || !sql)
  208: 		return -1;
  209: 
  210: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  211: 	if (!str) {
  212: 		mqtt_rtlm_log("Error:: not found online table name");
  213: 		return -1;
  214: 	}
  215: 	snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
  216: 			"ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';", 
  217: 			str, connid, user, host);
  218: 
  219: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  220: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  221: 		MQTT_RTLM_LOG(sql);
  222: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  223: 		return -1;
  224: 	}
  225: 	if (sqlite3_step(stmt) == SQLITE_ROW)
  226: 		ret = sqlite3_changes(sql);
  227: 	else
  228: 		ret = 0;
  229: 	sqlite3_finalize(stmt);
  230: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  231: 
  232: 	return ret;
  233: }
  234: 
  235: /*
  236:  * mqtt_rtlm_write_topic() Publish topic
  237:  *
  238:  * @cfg = loaded config
  239:  * @sql = SQL handle
  240:  * @msgid = MessageID
  241:  * @topic = topic
  242:  * @txt = text
  243:  * @user = username
  244:  * @host = hostname
  245:  * @retain = !=0 retain message to database
  246:  * return: -1 error, 0 no publish or >0 published ok
  247:  */
  248: int
  249: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, 
  250: 		const char *user, const char *host, char retain)
  251: {
  252: 	int ret = 0;
  253: 	char *str, szStmt[BUFSIZ] = { 0 };
  254: 	sqlite3_stmt *stmt;
  255: 
  256: 	if (!cfg || !sql || !topic)
  257: 		return -1;
  258: 
  259: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  260: 	if (!str) {
  261: 		mqtt_rtlm_log("Error:: not found topics table name");
  262: 		return -1;
  263: 	}
  264: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
  265: 			"PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
  266: 			"datetime('now', 'localtime'), '%s');", 
  267: 			str, retain, msgid, topic, txt, user, host);
  268: 
  269: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  270: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  271: 		MQTT_RTLM_LOG(sql);
  272: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  273: 		return -1;
  274: 	}
  275: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  276: 		ret = sqlite3_changes(sql);
  277: 	else {
  278: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  279: 			MQTT_RTLM_LOG(sql);
  280: 		ret = 0;
  281: 	}
  282: 	sqlite3_finalize(stmt);
  283: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  284: 
  285: 	return ret;
  286: }
  287: 
  288: /*
  289:  * mqtt_rtlm_delete_topic() Delete topic
  290:  *
  291:  * @cfg = loaded config
  292:  * @sql = SQL handle
  293:  * @msgid = MessageID
  294:  * @topic = topic
  295:  * @user = username
  296:  * @host = hostname
  297:  * @retain = -1 no matter
  298:  * return: -1 error, 0 no changes or >0 deleted rows
  299:  */
  300: int
  301: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
  302: 		const char *user, const char *host, char retain)
  303: {
  304: 	int ret = 0;
  305: 	char *str, *rtn, szStmt[BUFSIZ] = { 0 };
  306: 	sqlite3_stmt *stmt;
  307: 
  308: 	if (!cfg || !sql || !topic)
  309: 		return -1;
  310: 
  311: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  312: 	if (!str) {
  313: 		mqtt_rtlm_log("Error:: not found topics table name");
  314: 		return -1;
  315: 	}
  316: 	switch (retain) {
  317: 		case -1:
  318: 			rtn = "";
  319: 			break;
  320: 		case 0:
  321: 			rtn = "AND Retain = 0";
  322: 			break;
  323: 		default:
  324: 			rtn = "AND Retain != 0";
  325: 			break;
  326: 	}
  327: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
  328: 			"PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
  329: 			msgid, topic, user, host, rtn);
  330: 
  331: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  332: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  333: 		MQTT_RTLM_LOG(sql);
  334: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  335: 		return -1;
  336: 	}
  337: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  338: 		ret = sqlite3_changes(sql);
  339: 	else {
  340: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  341: 			MQTT_RTLM_LOG(sql);
  342: 		ret = 0;
  343: 	}
  344: 	sqlite3_finalize(stmt);
  345: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  346: 
  347: 	return ret;
  348: }
  349: 
  350: /*
  351:  * mqtt_rtlm_read_topic() Get topic
  352:  *
  353:  * @cfg = loaded config
  354:  * @sql = SQL handle
  355:  * @msgid = MessageID
  356:  * @topic = topic
  357:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
  358:  * return: NULL error or not found and !=NULL allocated subscribe topics
  359:  */
  360: mqtt_subscr_t *
  361: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
  362: {
  363: 	int rowz = 0;
  364: 	char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
  365: 	sqlite3_stmt *stmt;
  366: 	register int j;
  367: 	mqtt_subscr_t *s = NULL;
  368: 
  369: 	if (!cfg || !sql || !topic)
  370: 		return NULL;
  371: 
  372: 	switch (retain) {
  373: 		case -1:
  374: 			memset(szStr, 0, sizeof szStr);
  375: 			break;
  376: 		case 0:
  377: 			snprintf(szStr, sizeof szStr, "AND Retain = 0");
  378: 			break;
  379: 		default:
  380: 			snprintf(szStr, sizeof szStr, "AND Retain > 0");
  381: 			break;
  382: 	}
  383: 
  384: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  385: 	if (!str) {
  386: 		mqtt_rtlm_log("Error:: not found topics table name");
  387: 		return NULL;
  388: 	}
  389: 	snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
  390: 			"MsgID = %d AND Topic LIKE '%s' %s;", 
  391: 			str, msgid, topic, szStr);
  392: 
  393: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  394: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  395: 		MQTT_RTLM_LOG(sql);
  396: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  397: 		return NULL;
  398: 	}
  399: 
  400: 	/* calculate count of rows and allocate subscribe items */
  401: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  402: 		rowz++;
  403: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  404: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  405: 		goto end;
  406: 	} else
  407: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  408: 	sqlite3_reset(stmt);
  409: 
  410: 	/* fill with data */
  411: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  412: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  413: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  414: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  415: 		s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
  416: 		s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
  417: 	}
  418: end:
  419: 	sqlite3_finalize(stmt);
  420: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  421: 
  422: 	return s;
  423: }
  424: 
  425: /*
  426:  * mqtt_rtlm_write_subscribe() Subscribe topic
  427:  *
  428:  * @cfg = loaded config
  429:  * @sql = SQL handle
  430:  * @msgid = MessageID
  431:  * @topic = topic
  432:  * @user = username
  433:  * @host = hostname
  434:  * @qos = Subscribe QoS
  435:  * return: -1 error, 0 no publish or >0 published ok
  436:  */
  437: int
  438: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
  439: 		const char *user, const char *host, char qos)
  440: {
  441: 	int ret = 0;
  442: 	char *str, szStmt[BUFSIZ] = { 0 };
  443: 	sqlite3_stmt *stmt;
  444: 
  445: 	if (!cfg || !sql || !topic)
  446: 		return -1;
  447: 
  448: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  449: 	if (!str) {
  450: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  451: 		return -1;
  452: 	}
  453: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (MsgID, QoS, Topic, PubUser, "
  454: 			"PubDate, PubHost) VALUES (%d, %d, '%s', '%s', "
  455: 			"datetime('now', 'localtime'), '%s');", str, 
  456: 			msgid, qos, topic, user, host);
  457: 
  458: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  459: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  460: 		MQTT_RTLM_LOG(sql);
  461: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  462: 		return -1;
  463: 	}
  464: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  465: 		ret = sqlite3_changes(sql);
  466: 	else {
  467: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  468: 			MQTT_RTLM_LOG(sql);
  469: 		ret = 0;
  470: 	}
  471: 	sqlite3_finalize(stmt);
  472: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  473: 
  474: 	return ret;
  475: }
  476: 
  477: /*
  478:  * mqtt_rtlm_delete_subscribe() Delete subscribe
  479:  *
  480:  * @cfg = loaded config
  481:  * @sql = SQL handle
  482:  * @topic = topic
  483:  * @user = username
  484:  * @host = hostname
  485:  * @qos = Subscribe QoS if -1 no matter
  486:  * return: -1 error, 0 no changes or >0 deleted rows
  487:  */
  488: int
  489: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic, 
  490: 		const char *user, const char *host, char qos)
  491: {
  492: 	int ret = 0;
  493: 	char *str, szStr[STRSIZ] = { 0 }, szStmt[BUFSIZ] = { 0 };
  494: 	sqlite3_stmt *stmt;
  495: 
  496: 	if (!cfg || !sql || !topic)
  497: 		return -1;
  498: 
  499: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  500: 	if (!str) {
  501: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  502: 		return -1;
  503: 	}
  504: 	if (qos > -1 && qos < 3)
  505: 		snprintf(szStr, sizeof szStr, "AND QoS = %d", qos);
  506: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
  507: 			"PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
  508: 			topic, user, host, szStr);
  509: 
  510: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  511: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  512: 		MQTT_RTLM_LOG(sql);
  513: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  514: 		return -1;
  515: 	}
  516: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  517: 		ret = sqlite3_changes(sql);
  518: 	else {
  519: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  520: 			MQTT_RTLM_LOG(sql);
  521: 		ret = 0;
  522: 	}
  523: 	sqlite3_finalize(stmt);
  524: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  525: 
  526: 	return ret;
  527: }
  528: 
  529: /*
  530:  * mqtt_rtlm_read_subscribe() Get subscribe topic
  531:  *
  532:  * @cfg = loaded config
  533:  * @sql = SQL handle
  534:  * @topic = topic
  535:  * return: NULL error or not found and !=NULL allocated subscribe topics
  536:  */
  537: mqtt_subscr_t *
  538: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic)
  539: {
  540: 	int rowz = 0;
  541: 	char *str, szStmt[BUFSIZ] = { 0 };
  542: 	sqlite3_stmt *stmt;
  543: 	register int j;
  544: 	mqtt_subscr_t *s = NULL;
  545: 
  546: 	if (!cfg || !sql || !topic)
  547: 		return NULL;
  548: 
  549: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  550: 	if (!str) {
  551: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  552: 		return NULL;
  553: 	}
  554: 	snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE Topic LIKE '%s';", str, topic);
  555: 
  556: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  557: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  558: 		MQTT_RTLM_LOG(sql);
  559: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  560: 		return NULL;
  561: 	}
  562: 
  563: 	/* calculate count of rows and allocate subscribe items */
  564: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  565: 		rowz++;
  566: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  567: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  568: 		goto end;
  569: 	} else
  570: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  571: 	sqlite3_reset(stmt);
  572: 
  573: 	/* fill with data */
  574: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  575: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  576: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  577: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  578: 		s[j].sub_value.msg_base = NULL;
  579: 		s[j].sub_value.msg_len = 0;
  580: 	}
  581: end:
  582: 	sqlite3_finalize(stmt);
  583: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  584: 
  585: 	return s;
  586: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>