File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / pubmqtt.c
Revision 1.2.2.4: download - view: text, annotated - select for diffs - revision graph
Wed Apr 25 07:37:16 2012 UTC (12 years, 2 months ago) by misho
Branches: mqtt1_1
fix multithread serialize support of sqlite3

    1: #include "global.h"
    2: 
    3: 
    4: extern const char sql_schema[];
    5: 
    6: 
    7: /*
    8:  * mqtt_db_log() Log database connection message
    9:  *
   10:  * @fmt = format string
   11:  * @... = argument list
   12:  * return: none
   13:  */
   14: static void
   15: mqtt_rtlm_log(const char *fmt, ...)
   16: {
   17: 	va_list lst;
   18: 
   19: 	va_start(lst, fmt);
   20: 	vsyslog(LOG_ERR, fmt, lst);
   21: 	va_end(lst);
   22: }
   23: #define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
   24: 					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
   25: 
   26: 
   27: /*
   28:  * mqtt_rtlm_open() Open database connection
   29:  *
   30:  * @cfg = config filename
   31:  * return: NULL error or SQL handle
   32:  */
   33: sqlite3 *
   34: mqtt_rtlm_open(cfg_root_t *cfg)
   35: {
   36: 	sqlite3 *sql = NULL;
   37: 	const char *str = NULL;
   38: 
   39: 	if (!cfg)
   40: 		return NULL;
   41: 
   42: 	sqlite3_config(SQLITE_CONFIG_SERIALIZED);
   43: 	if (!sqlite3_threadsafe())
   44: 		return NULL;
   45: 
   46: 	str = cfg_getAttribute(cfg, "mqtt_pub", "name");
   47: 	if (!str) {
   48: 		mqtt_rtlm_log("Error:: Unknown database name ...\n");
   49: 		return NULL;
   50: 	}
   51: 
   52: 	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
   53: 		MQTT_RTLM_LOG(sql);
   54: 		sqlite3_close(sql);
   55: 		return NULL;
   56: 	}
   57: 
   58: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
   59: 	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
   60: 		MQTT_RTLM_LOG(sql);
   61: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
   62: 		sqlite3_close(sql);
   63: 		return NULL;
   64: 	}
   65: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
   66: 	return sql;
   67: }
   68: 
   69: /*
   70:  * mqtt_rtlm_close() Close database connection
   71:  *
   72:  * @sql = SQL handle
   73:  * return: none
   74:  */
   75: void
   76: mqtt_rtlm_close(sqlite3 *sql)
   77: {
   78: 	sqlite3_close(sql);
   79: }
   80: 
   81: /*
   82:  * mqtt_rtlm_init_session() Create session
   83:  *
   84:  * @cfg = loaded config
   85:  * @sql = SQL handle
   86:  * @connid = connection id
   87:  * @user = username
   88:  * @host = hostname
   89:  * @will = will flag if !=0 must fill arguments
   90:  * @... = will arguments in order topic,msg,qos,retain
   91:  * return: -1 error, 0 session already appears or >0 row changed
   92:  */
   93: int
   94: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
   95: 		const char *host, char will, ...)
   96: {
   97: 	va_list lst;
   98: 	int ret = 0;
   99: 	char *str, szStmt[BUFSIZ] = { 0 };
  100: 	sqlite3_stmt *stmt;
  101: 
  102: 	if (!cfg || !sql)
  103: 		return -1;
  104: 
  105: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  106: 	if (!str) {
  107: 		mqtt_rtlm_log("Error:: not found online table name");
  108: 		return -1;
  109: 	}
  110: 	if (!will)
  111: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
  112: 				"WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
  113: 	else {
  114: 		va_start(lst, will);
  115: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
  116: 				"WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
  117: 				"VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
  118: 				str, connid, user, host, will, 
  119: 				va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
  120: 		va_end(lst);
  121: 	}
  122: 
  123: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  124: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  125: 		MQTT_RTLM_LOG(sql);
  126: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  127: 		return -1;
  128: 	}
  129: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  130: 		ret = sqlite3_changes(sql);
  131: 	else {
  132: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  133: 			MQTT_RTLM_LOG(sql);
  134: 		ret = 0;
  135: 	}
  136: 	sqlite3_finalize(stmt);
  137: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  138: 
  139: 	return ret;
  140: }
  141: 
  142: /*
  143:  * mqtt_rtlm_fini_session() Delete session(s)
  144:  *
  145:  * @cfg = loaded config
  146:  * @sql = SQL handle
  147:  * @connid = connection id
  148:  * @user = username
  149:  * @host = hostname
  150:  * return: -1 error, 0 session already appears or >0 row changed
  151:  */
  152: int
  153: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  154: {
  155: 	int ret = 0;
  156: 	char *str, szStmt[BUFSIZ] = { 0 };
  157: 	sqlite3_stmt *stmt;
  158: 
  159: 	if (!cfg || !sql)
  160: 		return -1;
  161: 
  162: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  163: 	if (!str) {
  164: 		mqtt_rtlm_log("Error:: not found online table name");
  165: 		return -1;
  166: 	}
  167: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
  168: 			"AND RemoteHost LIKE '%s';", str, connid, user, host);
  169: 
  170: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  171: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  172: 		MQTT_RTLM_LOG(sql);
  173: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  174: 		return -1;
  175: 	}
  176: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  177: 		ret = sqlite3_changes(sql);
  178: 	else {
  179: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  180: 			MQTT_RTLM_LOG(sql);
  181: 		ret = 0;
  182: 	}
  183: 	sqlite3_finalize(stmt);
  184: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  185: 
  186: 	return ret;
  187: }
  188: 
  189: /*
  190:  * mqtt_rtlm_chk_session() Check session(s)
  191:  *
  192:  * @cfg = loaded config
  193:  * @sql = SQL handle
  194:  * @connid = connection id
  195:  * @user = username
  196:  * @host = hostname
  197:  * return: -1 error, 0 not logged or >0 logged found rows
  198:  */
  199: int
  200: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  201: {
  202: 	int ret = 0;
  203: 	char *str, szStmt[BUFSIZ] = { 0 };
  204: 	sqlite3_stmt *stmt;
  205: 
  206: 	if (!cfg || !sql)
  207: 		return -1;
  208: 
  209: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  210: 	if (!str) {
  211: 		mqtt_rtlm_log("Error:: not found online table name");
  212: 		return -1;
  213: 	}
  214: 	snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
  215: 			"ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';", 
  216: 			str, connid, user, host);
  217: 
  218: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  219: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  220: 		MQTT_RTLM_LOG(sql);
  221: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  222: 		return -1;
  223: 	}
  224: 	if (sqlite3_step(stmt) == SQLITE_ROW)
  225: 		ret = sqlite3_changes(sql);
  226: 	else
  227: 		ret = 0;
  228: 	sqlite3_finalize(stmt);
  229: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  230: 
  231: 	return ret;
  232: }
  233: 
  234: /*
  235:  * mqtt_rtlm_write_topic() Publish topic
  236:  *
  237:  * @cfg = loaded config
  238:  * @sql = SQL handle
  239:  * @msgid = MessageID
  240:  * @topic = topic
  241:  * @txt = text
  242:  * @user = username
  243:  * @host = hostname
  244:  * @retain = !=0 retain message to database
  245:  * return: -1 error, 0 no publish or >0 published ok
  246:  */
  247: int
  248: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, 
  249: 		const char *user, const char *host, char retain)
  250: {
  251: 	int ret = 0;
  252: 	char *str, szStmt[BUFSIZ] = { 0 };
  253: 	sqlite3_stmt *stmt;
  254: 
  255: 	if (!cfg || !sql || !topic)
  256: 		return -1;
  257: 
  258: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  259: 	if (!str) {
  260: 		mqtt_rtlm_log("Error:: not found topics table name");
  261: 		return -1;
  262: 	}
  263: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
  264: 			"PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
  265: 			"datetime('now', 'localtime'), '%s');", 
  266: 			str, retain, msgid, topic, txt, user, host);
  267: 
  268: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  269: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  270: 		MQTT_RTLM_LOG(sql);
  271: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  272: 		return -1;
  273: 	}
  274: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  275: 		ret = sqlite3_changes(sql);
  276: 	else {
  277: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  278: 			MQTT_RTLM_LOG(sql);
  279: 		ret = 0;
  280: 	}
  281: 	sqlite3_finalize(stmt);
  282: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  283: 
  284: 	return ret;
  285: }
  286: 
  287: /*
  288:  * mqtt_rtlm_delete_topic() Delete topic
  289:  *
  290:  * @cfg = loaded config
  291:  * @sql = SQL handle
  292:  * @msgid = MessageID
  293:  * @topic = topic
  294:  * @user = username
  295:  * @host = hostname
  296:  * @retain = -1 no matter
  297:  * return: -1 error, 0 no changes or >0 deleted rows
  298:  */
  299: int
  300: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
  301: 		const char *user, const char *host, char retain)
  302: {
  303: 	int ret = 0;
  304: 	char *str, *rtn, szStmt[BUFSIZ] = { 0 };
  305: 	sqlite3_stmt *stmt;
  306: 
  307: 	if (!cfg || !sql || !topic)
  308: 		return -1;
  309: 
  310: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  311: 	if (!str) {
  312: 		mqtt_rtlm_log("Error:: not found topics table name");
  313: 		return -1;
  314: 	}
  315: 	switch (retain) {
  316: 		case -1:
  317: 			rtn = "";
  318: 			break;
  319: 		case 0:
  320: 			rtn = "AND Retain = 0";
  321: 			break;
  322: 		default:
  323: 			rtn = "AND Retain != 0";
  324: 			break;
  325: 	}
  326: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
  327: 			"PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
  328: 			msgid, topic, user, host, rtn);
  329: 
  330: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  331: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  332: 		MQTT_RTLM_LOG(sql);
  333: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  334: 		return -1;
  335: 	}
  336: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  337: 		ret = sqlite3_changes(sql);
  338: 	else {
  339: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  340: 			MQTT_RTLM_LOG(sql);
  341: 		ret = 0;
  342: 	}
  343: 	sqlite3_finalize(stmt);
  344: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  345: 
  346: 	return ret;
  347: }
  348: 
  349: /*
  350:  * mqtt_rtlm_read_topic() Get topic
  351:  *
  352:  * @cfg = loaded config
  353:  * @sql = SQL handle
  354:  * @msgid = MessageID
  355:  * @topic = topic
  356:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
  357:  * return: NULL error or not found and !=NULL allocated subscribe topics
  358:  */
  359: mqtt_subscr_t *
  360: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
  361: {
  362: 	int rowz = 0;
  363: 	char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
  364: 	sqlite3_stmt *stmt;
  365: 	register int j;
  366: 	mqtt_subscr_t *s = NULL;
  367: 
  368: 	if (!cfg || !sql || !topic)
  369: 		return NULL;
  370: 
  371: 	switch (retain) {
  372: 		case -1:
  373: 			memset(szStr, 0, sizeof szStr);
  374: 			break;
  375: 		case 0:
  376: 			snprintf(szStr, sizeof szStr, "AND Retain = 0");
  377: 			break;
  378: 		default:
  379: 			snprintf(szStr, sizeof szStr, "AND Retain > 0");
  380: 			break;
  381: 	}
  382: 
  383: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  384: 	if (!str) {
  385: 		mqtt_rtlm_log("Error:: not found topics table name");
  386: 		return NULL;
  387: 	}
  388: 	snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
  389: 			"MsgID = %d AND Topic LIKE '%s' %s;", 
  390: 			str, msgid, topic, szStr);
  391: 
  392: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  393: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  394: 		MQTT_RTLM_LOG(sql);
  395: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  396: 		return NULL;
  397: 	}
  398: 
  399: 	/* calculate count of rows and allocate subscribe items */
  400: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  401: 		rowz++;
  402: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  403: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  404: 		goto end;
  405: 	} else
  406: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  407: 	sqlite3_reset(stmt);
  408: 
  409: 	/* fill with data */
  410: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  411: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  412: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  413: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  414: 		s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
  415: 		s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
  416: 	}
  417: end:
  418: 	sqlite3_finalize(stmt);
  419: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  420: 
  421: 	return s;
  422: }
  423: 
  424: /*
  425:  * mqtt_rtlm_write_subscribe() Subscribe topic
  426:  *
  427:  * @cfg = loaded config
  428:  * @sql = SQL handle
  429:  * @msgid = MessageID
  430:  * @topic = topic
  431:  * @user = username
  432:  * @host = hostname
  433:  * @qos = Subscribe QoS
  434:  * return: -1 error, 0 no publish or >0 published ok
  435:  */
  436: int
  437: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
  438: 		const char *user, const char *host, char qos)
  439: {
  440: 	int ret = 0;
  441: 	char *str, szStmt[BUFSIZ] = { 0 };
  442: 	sqlite3_stmt *stmt;
  443: 
  444: 	if (!cfg || !sql || !topic)
  445: 		return -1;
  446: 
  447: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  448: 	if (!str) {
  449: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  450: 		return -1;
  451: 	}
  452: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (MsgID, QoS, Topic, PubUser, "
  453: 			"PubDate, PubHost) VALUES (%d, %d, '%s', '%s', "
  454: 			"datetime('now', 'localtime'), '%s');", str, 
  455: 			msgid, qos, topic, user, host);
  456: 
  457: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  458: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  459: 		MQTT_RTLM_LOG(sql);
  460: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  461: 		return -1;
  462: 	}
  463: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  464: 		ret = sqlite3_changes(sql);
  465: 	else {
  466: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  467: 			MQTT_RTLM_LOG(sql);
  468: 		ret = 0;
  469: 	}
  470: 	sqlite3_finalize(stmt);
  471: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  472: 
  473: 	return ret;
  474: }
  475: 
  476: /*
  477:  * mqtt_rtlm_delete_subscribe() Delete subscribe
  478:  *
  479:  * @cfg = loaded config
  480:  * @sql = SQL handle
  481:  * @topic = topic
  482:  * @user = username
  483:  * @host = hostname
  484:  * @qos = Subscribe QoS if -1 no matter
  485:  * return: -1 error, 0 no changes or >0 deleted rows
  486:  */
  487: int
  488: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic, 
  489: 		const char *user, const char *host, char qos)
  490: {
  491: 	int ret = 0;
  492: 	char *str, szStr[STRSIZ] = { 0 }, szStmt[BUFSIZ] = { 0 };
  493: 	sqlite3_stmt *stmt;
  494: 
  495: 	if (!cfg || !sql || !topic)
  496: 		return -1;
  497: 
  498: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  499: 	if (!str) {
  500: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  501: 		return -1;
  502: 	}
  503: 	if (qos > -1 && qos < 3)
  504: 		snprintf(szStr, sizeof szStr, "AND QoS = %d", qos);
  505: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
  506: 			"PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
  507: 			topic, user, host, szStr);
  508: 
  509: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  510: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  511: 		MQTT_RTLM_LOG(sql);
  512: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  513: 		return -1;
  514: 	}
  515: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  516: 		ret = sqlite3_changes(sql);
  517: 	else {
  518: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  519: 			MQTT_RTLM_LOG(sql);
  520: 		ret = 0;
  521: 	}
  522: 	sqlite3_finalize(stmt);
  523: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  524: 
  525: 	return ret;
  526: }
  527: 
  528: /*
  529:  * mqtt_rtlm_read_subscribe() Get subscribe topic
  530:  *
  531:  * @cfg = loaded config
  532:  * @sql = SQL handle
  533:  * @topic = topic
  534:  * return: NULL error or not found and !=NULL allocated subscribe topics
  535:  */
  536: mqtt_subscr_t *
  537: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic)
  538: {
  539: 	int rowz = 0;
  540: 	char *str, szStmt[BUFSIZ] = { 0 };
  541: 	sqlite3_stmt *stmt;
  542: 	register int j;
  543: 	mqtt_subscr_t *s = NULL;
  544: 
  545: 	if (!cfg || !sql || !topic)
  546: 		return NULL;
  547: 
  548: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  549: 	if (!str) {
  550: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  551: 		return NULL;
  552: 	}
  553: 	snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE Topic LIKE '%s';", str, topic);
  554: 
  555: 	sqlite3_mutex_enter(sqlite3_db_mutex(sql));
  556: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  557: 		MQTT_RTLM_LOG(sql);
  558: 		sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  559: 		return NULL;
  560: 	}
  561: 
  562: 	/* calculate count of rows and allocate subscribe items */
  563: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  564: 		rowz++;
  565: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  566: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  567: 		goto end;
  568: 	} else
  569: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  570: 	sqlite3_reset(stmt);
  571: 
  572: 	/* fill with data */
  573: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  574: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  575: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  576: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  577: 		s[j].sub_value.msg_base = NULL;
  578: 		s[j].sub_value.msg_len = 0;
  579: 	}
  580: end:
  581: 	sqlite3_finalize(stmt);
  582: 	sqlite3_mutex_leave(sqlite3_db_mutex(sql));
  583: 
  584: 	return s;
  585: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>