File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / pubmqtt.c
Revision 1.2.2.6: download - view: text, annotated - select for diffs - revision graph
Sat May 5 14:51:02 2012 UTC (12 years, 1 month ago) by misho
Branches: mqtt1_1
removed mutexes

    1: #include "global.h"
    2: 
    3: 
    4: extern const char sql_schema[];
    5: 
    6: 
    7: /*
    8:  * mqtt_db_log() Log database connection message
    9:  *
   10:  * @fmt = format string
   11:  * @... = argument list
   12:  * return: none
   13:  */
   14: static void
   15: mqtt_rtlm_log(const char *fmt, ...)
   16: {
   17: 	va_list lst;
   18: 
   19: 	va_start(lst, fmt);
   20: 	vsyslog(LOG_ERR, fmt, lst);
   21: 	va_end(lst);
   22: }
   23: #define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
   24: 					__func__, __LINE__, \
   25: 					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
   26: 
   27: 
   28: /*
   29:  * mqtt_rtlm_open() Open database connection
   30:  *
   31:  * @cfg = config filename
   32:  * return: NULL error or SQL handle
   33:  */
   34: sqlite3 *
   35: mqtt_rtlm_open(cfg_root_t *cfg)
   36: {
   37: 	sqlite3 *sql = NULL;
   38: 	const char *str = NULL;
   39: 
   40: 	if (!cfg)
   41: 		return NULL;
   42: 
   43: 	str = cfg_getAttribute(cfg, "mqtt_pub", "name");
   44: 	if (!str) {
   45: 		mqtt_rtlm_log("Error:: Unknown database name ...\n");
   46: 		return NULL;
   47: 	}
   48: 
   49: 	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
   50: 		MQTT_RTLM_LOG(sql);
   51: 		sqlite3_close(sql);
   52: 		return NULL;
   53: 	}
   54: 
   55: 	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
   56: 		MQTT_RTLM_LOG(sql);
   57: 		sqlite3_close(sql);
   58: 		return NULL;
   59: 	}
   60: 	return sql;
   61: }
   62: 
   63: /*
   64:  * mqtt_rtlm_close() Close database connection
   65:  *
   66:  * @sql = SQL handle
   67:  * return: none
   68:  */
   69: void
   70: mqtt_rtlm_close(sqlite3 *sql)
   71: {
   72: 	sqlite3_close(sql);
   73: }
   74: 
   75: /*
   76:  * mqtt_rtlm_init_session() Create session
   77:  *
   78:  * @cfg = loaded config
   79:  * @sql = SQL handle
   80:  * @connid = connection id
   81:  * @user = username
   82:  * @host = hostname
   83:  * @will = will flag if !=0 must fill arguments
   84:  * @... = will arguments in order topic,msg,qos,retain
   85:  * return: -1 error, 0 session already appears or >0 row changed
   86:  */
   87: int
   88: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
   89: 		const char *host, char will, ...)
   90: {
   91: 	va_list lst;
   92: 	int ret = 0;
   93: 	char *str, szStmt[BUFSIZ] = { 0 };
   94: 	sqlite3_stmt *stmt;
   95: 
   96: 	if (!cfg || !sql)
   97: 		return -1;
   98: 
   99: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  100: 	if (!str) {
  101: 		mqtt_rtlm_log("Error:: not found online table name");
  102: 		return -1;
  103: 	}
  104: 	if (!will)
  105: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
  106: 				"WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
  107: 	else {
  108: 		va_start(lst, will);
  109: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
  110: 				"WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
  111: 				"VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
  112: 				str, connid, user, host, will, 
  113: 				va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
  114: 		va_end(lst);
  115: 	}
  116: 
  117: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  118: 		MQTT_RTLM_LOG(sql);
  119: 		return -1;
  120: 	}
  121: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  122: 		ret = sqlite3_changes(sql);
  123: 	else {
  124: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  125: 			MQTT_RTLM_LOG(sql);
  126: 		ret = 0;
  127: 	}
  128: 	sqlite3_finalize(stmt);
  129: 
  130: 	return ret;
  131: }
  132: 
  133: /*
  134:  * mqtt_rtlm_fini_session() Delete session(s)
  135:  *
  136:  * @cfg = loaded config
  137:  * @sql = SQL handle
  138:  * @connid = connection id
  139:  * @user = username
  140:  * @host = hostname
  141:  * return: -1 error, 0 session already appears or >0 row changed
  142:  */
  143: int
  144: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  145: {
  146: 	int ret = 0;
  147: 	char *str, szStmt[BUFSIZ] = { 0 };
  148: 	sqlite3_stmt *stmt;
  149: 
  150: 	if (!cfg || !sql)
  151: 		return -1;
  152: 
  153: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  154: 	if (!str) {
  155: 		mqtt_rtlm_log("Error:: not found online table name");
  156: 		return -1;
  157: 	}
  158: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
  159: 			"AND RemoteHost LIKE '%s';", str, connid, user, host);
  160: 
  161: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  162: 		MQTT_RTLM_LOG(sql);
  163: 		return -1;
  164: 	}
  165: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  166: 		ret = sqlite3_changes(sql);
  167: 	else {
  168: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  169: 			MQTT_RTLM_LOG(sql);
  170: 		ret = 0;
  171: 	}
  172: 	sqlite3_finalize(stmt);
  173: 
  174: 	return ret;
  175: }
  176: 
  177: /*
  178:  * mqtt_rtlm_chk_session() Check session(s)
  179:  *
  180:  * @cfg = loaded config
  181:  * @sql = SQL handle
  182:  * @connid = connection id
  183:  * @user = username
  184:  * @host = hostname
  185:  * return: -1 error, 0 not logged or >0 logged found rows
  186:  */
  187: int
  188: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
  189: {
  190: 	int ret = 0;
  191: 	char *str, szStmt[BUFSIZ] = { 0 };
  192: 	sqlite3_stmt *stmt;
  193: 
  194: 	if (!cfg || !sql)
  195: 		return -1;
  196: 
  197: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
  198: 	if (!str) {
  199: 		mqtt_rtlm_log("Error:: not found online table name");
  200: 		return -1;
  201: 	}
  202: 	snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
  203: 			"ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';", 
  204: 			str, connid, user, host);
  205: 
  206: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  207: 		MQTT_RTLM_LOG(sql);
  208: 		return -1;
  209: 	}
  210: 	if (sqlite3_step(stmt) == SQLITE_ROW)
  211: 		ret = sqlite3_changes(sql);
  212: 	else
  213: 		ret = 0;
  214: 	sqlite3_finalize(stmt);
  215: 
  216: 	return ret;
  217: }
  218: 
  219: /*
  220:  * mqtt_rtlm_write_topic() Publish topic
  221:  *
  222:  * @cfg = loaded config
  223:  * @sql = SQL handle
  224:  * @msgid = MessageID
  225:  * @topic = topic
  226:  * @txt = text
  227:  * @user = username
  228:  * @host = hostname
  229:  * @retain = !=0 retain message to database
  230:  * return: -1 error, 0 no publish or >0 published ok
  231:  */
  232: int
  233: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, 
  234: 		const char *user, const char *host, char retain)
  235: {
  236: 	int ret = 0;
  237: 	char *str, szStmt[BUFSIZ] = { 0 };
  238: 	sqlite3_stmt *stmt;
  239: 
  240: 	if (!cfg || !sql || !topic)
  241: 		return -1;
  242: 
  243: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  244: 	if (!str) {
  245: 		mqtt_rtlm_log("Error:: not found topics table name");
  246: 		return -1;
  247: 	}
  248: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
  249: 			"PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
  250: 			"datetime('now', 'localtime'), '%s');", 
  251: 			str, retain, msgid, topic, txt, user, host);
  252: 
  253: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  254: 		MQTT_RTLM_LOG(sql);
  255: 		return -1;
  256: 	}
  257: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  258: 		ret = sqlite3_changes(sql);
  259: 	else {
  260: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  261: 			MQTT_RTLM_LOG(sql);
  262: 		ret = 0;
  263: 	}
  264: 	sqlite3_finalize(stmt);
  265: 
  266: 	return ret;
  267: }
  268: 
  269: /*
  270:  * mqtt_rtlm_delete_topic() Delete topic
  271:  *
  272:  * @cfg = loaded config
  273:  * @sql = SQL handle
  274:  * @msgid = MessageID
  275:  * @topic = topic
  276:  * @user = username
  277:  * @host = hostname
  278:  * @retain = -1 no matter
  279:  * return: -1 error, 0 no changes or >0 deleted rows
  280:  */
  281: int
  282: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
  283: 		const char *user, const char *host, char retain)
  284: {
  285: 	int ret = 0;
  286: 	char *str, *rtn, szStmt[BUFSIZ] = { 0 };
  287: 	sqlite3_stmt *stmt;
  288: 
  289: 	if (!cfg || !sql || !topic)
  290: 		return -1;
  291: 
  292: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  293: 	if (!str) {
  294: 		mqtt_rtlm_log("Error:: not found topics table name");
  295: 		return -1;
  296: 	}
  297: 	switch (retain) {
  298: 		case -1:
  299: 			rtn = "";
  300: 			break;
  301: 		case 0:
  302: 			rtn = "AND Retain = 0";
  303: 			break;
  304: 		default:
  305: 			rtn = "AND Retain != 0";
  306: 			break;
  307: 	}
  308: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
  309: 			"PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
  310: 			msgid, topic, user, host, rtn);
  311: 
  312: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  313: 		MQTT_RTLM_LOG(sql);
  314: 		return -1;
  315: 	}
  316: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  317: 		ret = sqlite3_changes(sql);
  318: 	else {
  319: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  320: 			MQTT_RTLM_LOG(sql);
  321: 		ret = 0;
  322: 	}
  323: 	sqlite3_finalize(stmt);
  324: 
  325: 	return ret;
  326: }
  327: 
  328: /*
  329:  * mqtt_rtlm_read_topic() Get topic
  330:  *
  331:  * @cfg = loaded config
  332:  * @sql = SQL handle
  333:  * @msgid = MessageID
  334:  * @topic = topic
  335:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
  336:  * return: NULL error or not found and !=NULL allocated subscribe topics
  337:  */
  338: mqtt_subscr_t *
  339: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
  340: {
  341: 	int rowz = 0;
  342: 	char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
  343: 	sqlite3_stmt *stmt;
  344: 	register int j;
  345: 	mqtt_subscr_t *s = NULL;
  346: 
  347: 	if (!cfg || !sql || !topic)
  348: 		return NULL;
  349: 
  350: 	switch (retain) {
  351: 		case -1:
  352: 			memset(szStr, 0, sizeof szStr);
  353: 			break;
  354: 		case 0:
  355: 			snprintf(szStr, sizeof szStr, "AND Retain = 0");
  356: 			break;
  357: 		default:
  358: 			snprintf(szStr, sizeof szStr, "AND Retain > 0");
  359: 			break;
  360: 	}
  361: 
  362: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
  363: 	if (!str) {
  364: 		mqtt_rtlm_log("Error:: not found topics table name");
  365: 		return NULL;
  366: 	}
  367: 	snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
  368: 			"MsgID = %d AND Topic LIKE '%s' %s;", 
  369: 			str, msgid, topic, szStr);
  370: 
  371: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  372: 		MQTT_RTLM_LOG(sql);
  373: 		return NULL;
  374: 	}
  375: 
  376: 	/* calculate count of rows and allocate subscribe items */
  377: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  378: 		rowz++;
  379: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  380: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  381: 		goto end;
  382: 	} else
  383: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  384: 	sqlite3_reset(stmt);
  385: 
  386: 	/* fill with data */
  387: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  388: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  389: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  390: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  391: 		s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
  392: 		s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
  393: 	}
  394: end:
  395: 	sqlite3_finalize(stmt);
  396: 
  397: 	return s;
  398: }
  399: 
  400: /*
  401:  * mqtt_rtlm_write_subscribe() Subscribe topic
  402:  *
  403:  * @cfg = loaded config
  404:  * @sql = SQL handle
  405:  * @msgid = MessageID
  406:  * @topic = topic
  407:  * @user = username
  408:  * @host = hostname
  409:  * @qos = Subscribe QoS
  410:  * return: -1 error, 0 no publish or >0 published ok
  411:  */
  412: int
  413: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
  414: 		const char *user, const char *host, char qos)
  415: {
  416: 	int ret = 0;
  417: 	char *str, szStmt[BUFSIZ] = { 0 };
  418: 	sqlite3_stmt *stmt;
  419: 
  420: 	if (!cfg || !sql || !topic)
  421: 		return -1;
  422: 
  423: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  424: 	if (!str) {
  425: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  426: 		return -1;
  427: 	}
  428: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (MsgID, QoS, Topic, PubUser, "
  429: 			"PubDate, PubHost) VALUES (%d, %d, '%s', '%s', "
  430: 			"datetime('now', 'localtime'), '%s');", str, 
  431: 			msgid, qos, topic, user, host);
  432: 
  433: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  434: 		MQTT_RTLM_LOG(sql);
  435: 		return -1;
  436: 	}
  437: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  438: 		ret = sqlite3_changes(sql);
  439: 	else {
  440: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  441: 			MQTT_RTLM_LOG(sql);
  442: 		ret = 0;
  443: 	}
  444: 	sqlite3_finalize(stmt);
  445: 
  446: 	return ret;
  447: }
  448: 
  449: /*
  450:  * mqtt_rtlm_delete_subscribe() Delete subscribe
  451:  *
  452:  * @cfg = loaded config
  453:  * @sql = SQL handle
  454:  * @topic = topic
  455:  * @user = username
  456:  * @host = hostname
  457:  * @qos = Subscribe QoS if -1 no matter
  458:  * return: -1 error, 0 no changes or >0 deleted rows
  459:  */
  460: int
  461: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic, 
  462: 		const char *user, const char *host, char qos)
  463: {
  464: 	int ret = 0;
  465: 	char *str, szStr[STRSIZ] = { 0 }, szStmt[BUFSIZ] = { 0 };
  466: 	sqlite3_stmt *stmt;
  467: 
  468: 	if (!cfg || !sql || !topic)
  469: 		return -1;
  470: 
  471: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  472: 	if (!str) {
  473: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  474: 		return -1;
  475: 	}
  476: 	if (qos > -1 && qos < 3)
  477: 		snprintf(szStr, sizeof szStr, "AND QoS = %d", qos);
  478: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
  479: 			"PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
  480: 			topic, user, host, szStr);
  481: 
  482: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  483: 		MQTT_RTLM_LOG(sql);
  484: 		return -1;
  485: 	}
  486: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  487: 		ret = sqlite3_changes(sql);
  488: 	else {
  489: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  490: 			MQTT_RTLM_LOG(sql);
  491: 		ret = 0;
  492: 	}
  493: 	sqlite3_finalize(stmt);
  494: 
  495: 	return ret;
  496: }
  497: 
  498: /*
  499:  * mqtt_rtlm_read_subscribe() Get subscribe topic
  500:  *
  501:  * @cfg = loaded config
  502:  * @sql = SQL handle
  503:  * @topic = topic
  504:  * return: NULL error or not found and !=NULL allocated subscribe topics
  505:  */
  506: mqtt_subscr_t *
  507: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic)
  508: {
  509: 	int rowz = 0;
  510: 	char *str, szStmt[BUFSIZ] = { 0 };
  511: 	sqlite3_stmt *stmt;
  512: 	register int j;
  513: 	mqtt_subscr_t *s = NULL;
  514: 
  515: 	if (!cfg || !sql || !topic)
  516: 		return NULL;
  517: 
  518: 	str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
  519: 	if (!str) {
  520: 		mqtt_rtlm_log("Error:: not found subscribes table name");
  521: 		return NULL;
  522: 	}
  523: 	snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE Topic LIKE '%s';", str, topic);
  524: 
  525: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  526: 		MQTT_RTLM_LOG(sql);
  527: 		return NULL;
  528: 	}
  529: 
  530: 	/* calculate count of rows and allocate subscribe items */
  531: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  532: 		rowz++;
  533: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  534: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  535: 		goto end;
  536: 	} else
  537: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  538: 	sqlite3_reset(stmt);
  539: 
  540: 	/* fill with data */
  541: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  542: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  543: 		s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  544: 		s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
  545: 		s[j].sub_value.msg_base = NULL;
  546: 		s[j].sub_value.msg_len = 0;
  547: 	}
  548: end:
  549: 	sqlite3_finalize(stmt);
  550: 
  551: 	return s;
  552: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>