File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / pubmqtt.c
Revision 1.2.2.11: download - view: text, annotated - select for diffs - revision graph
Tue Jun 26 08:05:58 2012 UTC (12 years ago) by misho
Branches: mqtt1_1
change db pub model

    1: #include "global.h"
    2: 
    3: 
    4: extern const char sql_schema[];
    5: 
    6: 
    7: /*
    8:  * mqtt_db_log() Log database connection message
    9:  *
   10:  * @fmt = format string
   11:  * @... = argument list
   12:  * return: none
   13:  */
   14: static void
   15: mqtt_rtlm_log(const char *fmt, ...)
   16: {
   17: 	va_list lst;
   18: 
   19: 	va_start(lst, fmt);
   20: 	vsyslog(LOG_ERR, fmt, lst);
   21: 	va_end(lst);
   22: }
   23: #define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
   24: 					__func__, __LINE__, \
   25: 					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
   26: 
   27: /* library pre-loaded actions */
   28: void
   29: _init()
   30: {
   31: 	sqlite3_initialize();
   32: }
   33: 
   34: void
   35: _fini()
   36: {
   37: 	sqlite3_shutdown();
   38: }
   39: 
   40: 
   41: /*
   42:  * mqtt_rtlm_open() Open database connection
   43:  *
   44:  * @cfg = config filename
   45:  * return: NULL error or SQL handle
   46:  */
   47: sqlite3 *
   48: mqtt_rtlm_open(cfg_root_t *cfg)
   49: {
   50: 	sqlite3 *sql = NULL;
   51: 	const char *str = NULL;
   52: 
   53: 	if (!cfg)
   54: 		return NULL;
   55: 
   56: 	str = cfg_getAttribute(cfg, "mqtt_pub", "name");
   57: 	if (!str) {
   58: 		mqtt_rtlm_log("Error:: Unknown database name ...\n");
   59: 		return NULL;
   60: 	}
   61: 
   62: 	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
   63: 		MQTT_RTLM_LOG(sql);
   64: 		sqlite3_close(sql);
   65: 		return NULL;
   66: 	}
   67: 
   68: 	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
   69: 		MQTT_RTLM_LOG(sql);
   70: 		sqlite3_close(sql);
   71: 		return NULL;
   72: 	}
   73: 	return sql;
   74: }
   75: 
   76: /*
   77:  * mqtt_rtlm_close() Close database connection
   78:  *
   79:  * @sql = SQL handle
   80:  * return: none
   81:  */
   82: void
   83: mqtt_rtlm_close(sqlite3 *sql)
   84: {
   85: 	sqlite3_close(sql);
   86: }
   87: 
   88: /*
   89:  * mqtt_rtlm_init_session() Create session
   90:  *
   91:  * @cfg = loaded config
   92:  * @sql = SQL handle
   93:  * @connid = connection id
   94:  * @user = username
   95:  * @host = hostname
   96:  * @will = will flag if !=0 must fill arguments
   97:  * @... = will arguments in order topic,msg,qos,retain
   98:  * return: -1 error, 0 session already appears or >0 row changed
   99:  */
  100: int
  101: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
  102: 		const char *host, char will, ...)
  103: {
  104: 	va_list lst;
  105: 	int ret = 0;
  106: 	char *str, *psStmt;
  107: 	sqlite3_stmt *stmt;
  108: 
  109: 	if (!cfg || !sql)
  110: 		return -1;
  111: 
  112: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  113: 	if (!str) {
  114: 		mqtt_rtlm_log("Error:: not found online table name");
  115: 		return -1;
  116: 	}
  117: 	if (!will)
  118: 		psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
  119: 				"WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
  120: 	else {
  121: 		va_start(lst, will);
  122: 		psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
  123: 				"WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
  124: 				"VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');", 
  125: 				str, connid, user, host, will, 
  126: 				va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
  127: 		va_end(lst);
  128: 	}
  129: 
  130: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  131: 		MQTT_RTLM_LOG(sql);
  132: 		sqlite3_free(psStmt);
  133: 		return -1;
  134: 	} else
  135: 		sqlite3_free(psStmt);
  136: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  137: 		ret = sqlite3_changes(sql);
  138: 	else {
  139: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  140: 			MQTT_RTLM_LOG(sql);
  141: 		ret = 0;
  142: 	}
  143: 	sqlite3_finalize(stmt);
  144: 
  145: 	return ret;
  146: }
  147: 
  148: /*
  149:  * mqtt_rtlm_fini_session() Delete session(s)
  150:  *
  151:  * @cfg = loaded config
  152:  * @sql = SQL handle
  153:  * @connid = connection id
  154:  * @user = username
  155:  * @host = hostname
  156:  * return: -1 error, 0 session already appears or >0 row changed
  157:  */
  158: int
  159: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  160: {
  161: 	int ret = 0;
  162: 	char *str, *psStmt;
  163: 	sqlite3_stmt *stmt;
  164: 
  165: 	if (!cfg || !sql)
  166: 		return -1;
  167: 
  168: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  169: 	if (!str) {
  170: 		mqtt_rtlm_log("Error:: not found online table name");
  171: 		return -1;
  172: 	}
  173: 	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
  174: 			"AND RemoteHost LIKE '%q';", str, connid, user, host);
  175: 
  176: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  177: 		MQTT_RTLM_LOG(sql);
  178: 		sqlite3_free(psStmt);
  179: 		return -1;
  180: 	} else
  181: 		sqlite3_free(psStmt);
  182: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  183: 		ret = sqlite3_changes(sql);
  184: 	else {
  185: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  186: 			MQTT_RTLM_LOG(sql);
  187: 		ret = 0;
  188: 	}
  189: 	sqlite3_finalize(stmt);
  190: 
  191: 	return ret;
  192: }
  193: 
  194: /*
  195:  * mqtt_rtlm_chk_session() Check session(s)
  196:  *
  197:  * @cfg = loaded config
  198:  * @sql = SQL handle
  199:  * @connid = connection id
  200:  * @user = username
  201:  * @host = hostname
  202:  * return: -1 error, 0 not logged or >0 logged found rows
  203:  */
  204: int
  205: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  206: {
  207: 	int ret = 0;
  208: 	char *str, *psStmt;
  209: 	sqlite3_stmt *stmt;
  210: 
  211: 	if (!cfg || !sql)
  212: 		return -1;
  213: 
  214: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  215: 	if (!str) {
  216: 		mqtt_rtlm_log("Error:: not found online table name");
  217: 		return -1;
  218: 	}
  219: 	psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
  220: 			"ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';", 
  221: 			str, connid, user, host);
  222: 
  223: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  224: 		MQTT_RTLM_LOG(sql);
  225: 		sqlite3_free(psStmt);
  226: 		return -1;
  227: 	} else
  228: 		sqlite3_free(psStmt);
  229: 	if (sqlite3_step(stmt) == SQLITE_ROW)
  230: 		ret = sqlite3_changes(sql);
  231: 	else
  232: 		ret = 0;
  233: 	sqlite3_finalize(stmt);
  234: 
  235: 	return ret;
  236: }
  237: 
  238: /*
  239:  * mqtt_rtlm_write_topic() Publish topic
  240:  *
  241:  * @cfg = loaded config
  242:  * @sql = SQL handle
  243:  * @connid = connection id
  244:  * @msgid = MessageID
  245:  * @topic = topic
  246:  * @txt = text
  247:  * @txtlen = text length
  248:  * @user = username
  249:  * @host = hostname
  250:  * @retain = !=0 retain message to database
  251:  * return: -1 error, 0 no publish or >0 published ok
  252:  */
  253: int
  254: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  255: 		const char *topic, void *txt, int txtlen, const char *user, const char *host, char retain)
  256: {
  257: 	int ret = 0;
  258: 	char *str, *psStmt;
  259: 	sqlite3_stmt *stmt;
  260: 
  261: 	if (!cfg || !sql || !topic)
  262: 		return -1;
  263: 
  264: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  265: 	if (!str) {
  266: 		mqtt_rtlm_log("Error:: not found topics table name");
  267: 		return -1;
  268: 	}
  269: 	psStmt = sqlite3_mprintf("INSERT INTO %s (Retain, ConnID, MsgID, Topic, Value, PubUser, "
  270: 			"PubDate, PubHost) VALUES (%d, '%q', %u, '%q', ?1, '%q', "
  271: 			"datetime('now', 'localtime'), '%q');", 
  272: 			str, retain, connid, msgid, topic, txt, user, host);
  273: 
  274: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  275: 		MQTT_RTLM_LOG(sql);
  276: 		sqlite3_free(psStmt);
  277: 		return -1;
  278: 	} else
  279: 		sqlite3_free(psStmt);
  280: 	if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
  281: 		MQTT_RTLM_LOG(sql);
  282: 		sqlite3_finalize(stmt);
  283: 		return -1;
  284: 	}
  285: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  286: 		ret = sqlite3_changes(sql);
  287: 	else {
  288: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  289: 			MQTT_RTLM_LOG(sql);
  290: 		ret = 0;
  291: 	}
  292: 	sqlite3_finalize(stmt);
  293: 
  294: 	return ret;
  295: }
  296: 
  297: /*
  298:  * mqtt_rtlm_wipe_topic() Wipe all topics
  299:  *
  300:  * @cfg = loaded config
  301:  * @sql = SQL handle
  302:  * @connid = connection id
  303:  * @user = username
  304:  * @retain = -1 no matter
  305:  * return: -1 error, 0 no changes or >0 deleted rows
  306:  */
  307: int
  308: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
  309: {
  310: 	int ret = 0;
  311: 	char *str, *rtn, *psStmt;
  312: 	sqlite3_stmt *stmt;
  313: 
  314: 	if (!cfg || !sql || !connid)
  315: 		return -1;
  316: 
  317: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  318: 	if (!str) {
  319: 		mqtt_rtlm_log("Error:: not found topics table name");
  320: 		return -1;
  321: 	}
  322: 	switch (retain) {
  323: 		case -1:
  324: 			rtn = "";
  325: 			break;
  326: 		case 0:
  327: 			rtn = "AND Retain = 0";
  328: 			break;
  329: 		default:
  330: 			rtn = "AND Retain != 0";
  331: 			break;
  332: 	}
  333: 	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
  334: 			"PubUser LIKE '%q' %s;", str, connid, user, rtn);
  335: 
  336: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  337: 		MQTT_RTLM_LOG(sql);
  338: 		sqlite3_free(psStmt);
  339: 		return -1;
  340: 	} else
  341: 		sqlite3_free(psStmt);
  342: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  343: 		ret = sqlite3_changes(sql);
  344: 	else {
  345: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  346: 			MQTT_RTLM_LOG(sql);
  347: 		ret = 0;
  348: 	}
  349: 	sqlite3_finalize(stmt);
  350: 
  351: 	return ret;
  352: }
  353: 
  354: /*
  355:  * mqtt_rtlm_delete_topic() Delete topic
  356:  *
  357:  * @cfg = loaded config
  358:  * @sql = SQL handle
  359:  * @connid = connection id
  360:  * @msgid = MessageID
  361:  * @topic = topic
  362:  * @user = username
  363:  * @host = hostname
  364:  * @retain = -1 no matter
  365:  * return: -1 error, 0 no changes or >0 deleted rows
  366:  */
  367: int
  368: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  369: 		const char *topic, const char *user, const char *host, char retain)
  370: {
  371: 	int ret = 0;
  372: 	char *str, *rtn, *psStmt;
  373: 	sqlite3_stmt *stmt;
  374: 
  375: 	if (!cfg || !sql || !topic)
  376: 		return -1;
  377: 
  378: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  379: 	if (!str) {
  380: 		mqtt_rtlm_log("Error:: not found topics table name");
  381: 		return -1;
  382: 	}
  383: 	switch (retain) {
  384: 		case -1:
  385: 			rtn = "";
  386: 			break;
  387: 		case 0:
  388: 			rtn = "AND Retain = 0";
  389: 			break;
  390: 		default:
  391: 			rtn = "AND Retain != 0";
  392: 			break;
  393: 	}
  394: 	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
  395: 			"Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str, 
  396: 			connid, msgid, topic, user, host, rtn);
  397: 
  398: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  399: 		MQTT_RTLM_LOG(sql);
  400: 		sqlite3_free(psStmt);
  401: 		return -1;
  402: 	} else
  403: 		sqlite3_free(psStmt);
  404: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  405: 		ret = sqlite3_changes(sql);
  406: 	else {
  407: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  408: 			MQTT_RTLM_LOG(sql);
  409: 		ret = 0;
  410: 	}
  411: 	sqlite3_finalize(stmt);
  412: 
  413: 	return ret;
  414: }
  415: 
  416: /*
  417:  * mqtt_rtlm_read_topic() Get topic
  418:  *
  419:  * @cfg = loaded config
  420:  * @sql = SQL handle
  421:  * @connid = connection id
  422:  * @msgid = MessageID
  423:  * @topic = topic
  424:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
  425:  * return: NULL error or not found and !=NULL allocated subscribe topics
  426:  */
  427: mqtt_subscr_t *
  428: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  429: 		const char *topic, char retain)
  430: {
  431: 	int rowz = 0;
  432: 	char *str, szStr[STRSIZ], *psStmt;
  433: 	sqlite3_stmt *stmt;
  434: 	register int j;
  435: 	mqtt_subscr_t *s = NULL;
  436: 	ait_val_t v;
  437: 
  438: 	if (!cfg || !sql || !topic)
  439: 		return NULL;
  440: 
  441: 	switch (retain) {
  442: 		case -1:
  443: 			memset(szStr, 0, sizeof szStr);
  444: 			break;
  445: 		case 0:
  446: 			snprintf(szStr, sizeof szStr, "AND Retain = 0");
  447: 			break;
  448: 		default:
  449: 			snprintf(szStr, sizeof szStr, "AND Retain > 0");
  450: 			break;
  451: 	}
  452: 
  453: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  454: 	if (!str) {
  455: 		mqtt_rtlm_log("Error:: not found topics table name");
  456: 		return NULL;
  457: 	}
  458: 	psStmt = sqlite3_mprintf("SELECT Retain, Topic, Value FROM %s WHERE "
  459: 			"ConnID = '%q' AND MsgID = %d AND Topic LIKE '%q' %s;", 
  460: 			str, connid, msgid, topic, szStr);
  461: 
  462: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  463: 		MQTT_RTLM_LOG(sql);
  464: 		sqlite3_free(psStmt);
  465: 		return NULL;
  466: 	} else
  467: 		sqlite3_free(psStmt);
  468: 
  469: 	/* calculate count of rows and allocate subscribe items */
  470: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  471: 		rowz++;
  472: 	if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  473: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  474: 		goto end;
  475: 	} else
  476: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  477: 	sqlite3_reset(stmt);
  478: 
  479: 	/* fill with data */
  480: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  481: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  482: 		s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
  483: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  484: 		AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
  485: 		s[j].sub_value.msg_len = AIT_LEN(&v);
  486: 		s[j].sub_value.msg_base = (u_char*) io_malloc(s[j].sub_value.msg_len);
  487: 		if (s[j].sub_value.msg_base)
  488: 			memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
  489: 	}
  490: end:
  491: 	sqlite3_finalize(stmt);
  492: 
  493: 	return s;
  494: }
  495: 
  496: /*
  497:  * mqtt_rtlm_write_subscribe() Subscribe topic
  498:  *
  499:  * @cfg = loaded config
  500:  * @sql = SQL handle
  501:  * @connid = connection id
  502:  * @msgid = MessageID
  503:  * @topic = topic
  504:  * @user = username
  505:  * @host = hostname
  506:  * @qos = Subscribe QoS
  507:  * return: -1 error, 0 no publish or >0 published ok
  508:  */
  509: int
  510: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
  511: 		const char *topic, const char *user, const char *host, char qos)
  512: {
  513: 	int ret = 0;
  514: 	char *str, *psStmt;
  515: 	sqlite3_stmt *stmt;
  516: 
  517: 	if (!cfg || !sql || !topic)
  518: 		return -1;
  519: 
  520: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  521: 	if (!str) {
  522: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  523: 		return -1;
  524: 	}
  525: 	psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
  526: 			"PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
  527: 			"datetime('now', 'localtime'), '%q');", str, 
  528: 			connid, msgid, qos, topic, user, host);
  529: 
  530: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  531: 		MQTT_RTLM_LOG(sql);
  532: 		sqlite3_free(psStmt);
  533: 		return -1;
  534: 	} else
  535: 		sqlite3_free(psStmt);
  536: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  537: 		ret = sqlite3_changes(sql);
  538: 	else {
  539: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  540: 			MQTT_RTLM_LOG(sql);
  541: 		ret = 0;
  542: 	}
  543: 	sqlite3_finalize(stmt);
  544: 
  545: 	return ret;
  546: }
  547: 
  548: /*
  549:  * mqtt_rtlm_delete_subscribe() Delete subscribe
  550:  *
  551:  * @cfg = loaded config
  552:  * @sql = SQL handle
  553:  * @connid = connection id
  554:  * @topic = topic
  555:  * @user = username
  556:  * @host = hostname
  557:  * return: -1 error, 0 no changes or >0 deleted rows
  558:  */
  559: int
  560: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
  561: 		const char *topic, const char *user, const char *host)
  562: {
  563: 	int ret = 0;
  564: 	char *str, *psStmt;
  565: 	sqlite3_stmt *stmt;
  566: 
  567: 	if (!cfg || !sql || !topic)
  568: 		return -1;
  569: 
  570: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  571: 	if (!str) {
  572: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  573: 		return -1;
  574: 	}
  575: 	psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
  576: 			"Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str, 
  577: 			connid, topic, user, host);
  578: 
  579: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  580: 		MQTT_RTLM_LOG(sql);
  581: 		sqlite3_free(psStmt);
  582: 		return -1;
  583: 	} else
  584: 		sqlite3_free(psStmt);
  585: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  586: 		ret = sqlite3_changes(sql);
  587: 	else {
  588: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  589: 			MQTT_RTLM_LOG(sql);
  590: 		ret = 0;
  591: 	}
  592: 	sqlite3_finalize(stmt);
  593: 
  594: 	return ret;
  595: }
  596: 
  597: /*
  598:  * mqtt_rtlm_read_subscribe() Get subscribe topic
  599:  *
  600:  * @cfg = loaded config
  601:  * @sql = SQL handle
  602:  * @connid = connection id
  603:  * @topic = topic
  604:  * return: NULL error or not found and !=NULL allocated subscribe topics
  605:  */
  606: mqtt_subscr_t *
  607: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
  608: {
  609: 	int rowz = 0;
  610: 	char *str, *psStmt;
  611: 	sqlite3_stmt *stmt;
  612: 	register int j;
  613: 	mqtt_subscr_t *s = NULL;
  614: 
  615: 	if (!cfg || !sql || !topic)
  616: 		return NULL;
  617: 
  618: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  619: 	if (!str) {
  620: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  621: 		return NULL;
  622: 	}
  623: 	psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
  624: 			"Topic LIKE '%q';", str, connid, topic);
  625: 
  626: 	if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
  627: 		MQTT_RTLM_LOG(sql);
  628: 		sqlite3_free(psStmt);
  629: 		return NULL;
  630: 	} else
  631: 		sqlite3_free(psStmt);
  632: 
  633: 	/* calculate count of rows and allocate subscribe items */
  634: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  635: 		rowz++;
  636: 	if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  637: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  638: 		goto end;
  639: 	} else
  640: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  641: 	sqlite3_reset(stmt);
  642: 
  643: 	/* fill with data */
  644: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  645: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  646: 		s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
  647: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  648: 		s[j].sub_value.msg_base = NULL;
  649: 		s[j].sub_value.msg_len = 0;
  650: 	}
  651: end:
  652: 	sqlite3_finalize(stmt);
  653: 
  654: 	return s;
  655: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>