File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / pubmqtt.c
Revision 1.1.2.15: download - view: text, annotated - select for diffs - revision graph
Tue Dec 13 10:12:36 2011 UTC (12 years, 6 months ago) by misho
Branches: mqtt1_0
fix signess typecasts

    1: #include "global.h"
    2: 
    3: 
    4: extern const char sql_schema[];
    5: 
    6: 
    7: /*
    8:  * mqtt_db_log() Log database connection message
    9:  *
   10:  * @fmt = format string
   11:  * @... = argument list
   12:  * return: none
   13:  */
   14: static void
   15: mqtt_rtlm_log(const char *fmt, ...)
   16: {
   17: 	va_list lst;
   18: 
   19: 	va_start(lst, fmt);
   20: 	vsyslog(LOG_ERR, fmt, lst);
   21: 	va_end(lst);
   22: }
   23: #define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
   24: 					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
   25: 
   26: 
   27: /*
   28:  * mqtt_rtlm_open() Open database connection
   29:  *
   30:  * @cfg = config filename
   31:  * return: NULL error or SQL handle
   32:  */
   33: sqlite3 *
   34: mqtt_rtlm_open(sl_config *cfg)
   35: {
   36: 	sqlite3 *sql = NULL;
   37: 	const char *str = NULL;
   38: 
   39: 	if (!cfg)
   40: 		return NULL;
   41: 
   42: 	sqlite3_config(SQLITE_CONFIG_SERIALIZED);
   43: 
   44: 	str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
   45: 	if (!str) {
   46: 		mqtt_rtlm_log("Error:: Unknown database name ...\n");
   47: 		return NULL;
   48: 	}
   49: 
   50: 	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
   51: 		MQTT_RTLM_LOG(sql);
   52: 		sqlite3_close(sql);
   53: 		return NULL;
   54: 	}
   55: 
   56: 	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
   57: 		MQTT_RTLM_LOG(sql);
   58: 		sqlite3_close(sql);
   59: 		return NULL;
   60: 	}
   61: 	return sql;
   62: }
   63: 
   64: /*
   65:  * mqtt_rtlm_close() Close database connection
   66:  *
   67:  * @sql = SQL handle
   68:  * return: none
   69:  */
   70: void
   71: mqtt_rtlm_close(sqlite3 *sql)
   72: {
   73: 	sqlite3_close(sql);
   74: }
   75: 
   76: /*
   77:  * mqtt_rtlm_init_session() Create session
   78:  *
   79:  * @cfg = loaded config
   80:  * @sql = SQL handle
   81:  * @user = username
   82:  * @connid = connection id
   83:  * @host = hostname
   84:  * @port = port
   85:  * @will = will flag if !=0 must fill arguments
   86:  * @... = will arguments in order topic,msg,qos,retain
   87:  * return: -1 error, 0 session already appears or >0 row changed
   88:  */
   89: int
   90: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid, 
   91: 		const char *host, u_short port, char will, ...)
   92: {
   93: 	va_list lst;
   94: 	int ret = 0;
   95: 	char *str, szStmt[BUFSIZ] = { 0 };
   96: 	sqlite3_stmt *stmt;
   97: 
   98: 	if (!cfg || !sql)
   99: 		return -1;
  100: 
  101: 	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
  102: 	if (!str) {
  103: 		mqtt_rtlm_log("Error:: not found online table name");
  104: 		return -1;
  105: 	}
  106: 	if (!will)
  107: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, ConnID, RemoteHost, RemotePort, "
  108: 				"WillFlag) VALUES ('%s', '%s', '%s', %d, 0);", str, user, connid, host, port);
  109: 	else {
  110: 		va_start(lst, will);
  111: 		snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, ConnID, RemoteHost, RemotePort, "
  112: 				"WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
  113: 				"VALUES ('%s', '%s', '%s', %d, %d, %d, %d, '%s', '%s');", 
  114: 				str, user, connid, host, port, will, 
  115: 				va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
  116: 		va_end(lst);
  117: 	}
  118: 
  119: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  120: 		MQTT_RTLM_LOG(sql);
  121: 		return -1;
  122: 	}
  123: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  124: 		ret = sqlite3_changes(sql);
  125: 	else {
  126: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  127: 			MQTT_RTLM_LOG(sql);
  128: 		ret = 0;
  129: 	}
  130: 	sqlite3_finalize(stmt);
  131: 
  132: 	return ret;
  133: }
  134: 
  135: /*
  136:  * mqtt_rtlm_fini_session() Delete session(s)
  137:  *
  138:  * @cfg = loaded config
  139:  * @sql = SQL handle
  140:  * @user = username
  141:  * @connid = connection id
  142:  * @host = hostname
  143:  * return: -1 error, 0 session already appears or >0 row changed
  144:  */
  145: int
  146: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid, const char *host)
  147: {
  148: 	int ret = 0;
  149: 	char *str, szStmt[BUFSIZ] = { 0 };
  150: 	sqlite3_stmt *stmt;
  151: 
  152: 	if (!cfg || !sql)
  153: 		return -1;
  154: 
  155: 	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
  156: 	if (!str) {
  157: 		mqtt_rtlm_log("Error:: not found online table name");
  158: 		return -1;
  159: 	}
  160: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Username = '%s' AND ConnID = '%s' "
  161: 			"AND RemoteHost LIKE '%s';", str, user, connid, host);
  162: 
  163: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  164: 		MQTT_RTLM_LOG(sql);
  165: 		return -1;
  166: 	}
  167: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  168: 		ret = sqlite3_changes(sql);
  169: 	else {
  170: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  171: 			MQTT_RTLM_LOG(sql);
  172: 		ret = 0;
  173: 	}
  174: 	sqlite3_finalize(stmt);
  175: 
  176: 	return ret;
  177: }
  178: 
  179: /*
  180:  * mqtt_rtlm_chk_session() Check session(s)
  181:  *
  182:  * @cfg = loaded config
  183:  * @sql = SQL handle
  184:  * @user = username
  185:  * @connid = connection id
  186:  * @host = hostname
  187:  * return: -1 error, 0 not logged or >0 logged found rows
  188:  */
  189: int
  190: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid, const char *host)
  191: {
  192: 	int ret = 0;
  193: 	char *str, szStmt[BUFSIZ] = { 0 };
  194: 	sqlite3_stmt *stmt;
  195: 
  196: 	if (!cfg || !sql)
  197: 		return -1;
  198: 
  199: 	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
  200: 	if (!str) {
  201: 		mqtt_rtlm_log("Error:: not found online table name");
  202: 		return -1;
  203: 	}
  204: 	snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost, RemotePort FROM %s WHERE "
  205: 			"Username = '%s' AND ConnID = '%s' AND RemoteHost LIKE '%s';", 
  206: 			str, user, connid, host);
  207: 
  208: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  209: 		MQTT_RTLM_LOG(sql);
  210: 		return -1;
  211: 	}
  212: 	if (sqlite3_step(stmt) == SQLITE_ROW)
  213: 		ret = sqlite3_changes(sql);
  214: 	else
  215: 		ret = 0;
  216: 	sqlite3_finalize(stmt);
  217: 
  218: 	return ret;
  219: }
  220: 
  221: /*
  222:  * mqtt_rtlm_write_topic() Publish topic
  223:  *
  224:  * @cfg = loaded config
  225:  * @sql = SQL handle
  226:  * @msgid = MessageID
  227:  * @topic = topic
  228:  * @txt = text
  229:  * @user = username
  230:  * @host = hostname
  231:  * @retain = !=0 retain message to database
  232:  * return: -1 error, 0 no publish or >0 published ok
  233:  */
  234: int
  235: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, 
  236: 		const char *user, const char *host, char retain)
  237: {
  238: 	int ret = 0;
  239: 	char *str, szStmt[BUFSIZ] = { 0 };
  240: 	sqlite3_stmt *stmt;
  241: 
  242: 	if (!cfg || !sql || !topic)
  243: 		return -1;
  244: 
  245: 	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
  246: 	if (!str) {
  247: 		mqtt_rtlm_log("Error:: not found topics table name");
  248: 		return -1;
  249: 	}
  250: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
  251: 			"PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
  252: 			"datetime('now', 'localtime'), '%s');", 
  253: 			str, retain, msgid, topic, txt, user, host);
  254: 
  255: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  256: 		MQTT_RTLM_LOG(sql);
  257: 		return -1;
  258: 	}
  259: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  260: 		ret = sqlite3_changes(sql);
  261: 	else {
  262: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  263: 			MQTT_RTLM_LOG(sql);
  264: 		ret = 0;
  265: 	}
  266: 	sqlite3_finalize(stmt);
  267: 
  268: 	return ret;
  269: }
  270: 
  271: /*
  272:  * mqtt_rtlm_delete_topic() Delete topic
  273:  *
  274:  * @cfg = loaded config
  275:  * @sql = SQL handle
  276:  * @msgid = MessageID
  277:  * @topic = topic
  278:  * @user = username
  279:  * @host = hostname
  280:  * @retain = -1 no matter
  281:  * return: -1 error, 0 no changes or >0 deleted rows
  282:  */
  283: int
  284: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
  285: 		const char *user, const char *host, char retain)
  286: {
  287: 	int ret = 0;
  288: 	char *str, *rtn, szStmt[BUFSIZ] = { 0 };
  289: 	sqlite3_stmt *stmt;
  290: 
  291: 	if (!cfg || !sql || !topic)
  292: 		return -1;
  293: 
  294: 	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
  295: 	if (!str) {
  296: 		mqtt_rtlm_log("Error:: not found topics table name");
  297: 		return -1;
  298: 	}
  299: 	switch (retain) {
  300: 		case -1:
  301: 			rtn = "";
  302: 			break;
  303: 		case 0:
  304: 			rtn = "AND Retain = 0";
  305: 			break;
  306: 		default:
  307: 			rtn = "AND Retain != 0";
  308: 			break;
  309: 	}
  310: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
  311: 			"PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
  312: 			msgid, topic, user, host, rtn);
  313: 
  314: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  315: 		MQTT_RTLM_LOG(sql);
  316: 		return -1;
  317: 	}
  318: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  319: 		ret = sqlite3_changes(sql);
  320: 	else {
  321: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  322: 			MQTT_RTLM_LOG(sql);
  323: 		ret = 0;
  324: 	}
  325: 	sqlite3_finalize(stmt);
  326: 
  327: 	return ret;
  328: }
  329: 
  330: /*
  331:  * mqtt_rtlm_read_topic() Get topic
  332:  *
  333:  * @cfg = loaded config
  334:  * @sql = SQL handle
  335:  * @msgid = MessageID
  336:  * @topic = topic
  337:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
  338:  * return: NULL error or not found and !=NULL allocated subscribe topics
  339:  */
  340: mqtt_subscr_t *
  341: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
  342: {
  343: 	int rowz = 0;
  344: 	char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
  345: 	sqlite3_stmt *stmt;
  346: 	register int j;
  347: 	mqtt_subscr_t *s = NULL;
  348: 
  349: 	if (!cfg || !sql || !topic)
  350: 		return NULL;
  351: 
  352: 	switch (retain) {
  353: 		case -1:
  354: 			memset(szStr, 0, sizeof szStr);
  355: 			break;
  356: 		case 0:
  357: 			snprintf(szStr, sizeof szStr, "AND Retain = 0");
  358: 			break;
  359: 		default:
  360: 			snprintf(szStr, sizeof szStr, "AND Retain > 0");
  361: 			break;
  362: 	}
  363: 
  364: 	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
  365: 	if (!str) {
  366: 		mqtt_rtlm_log("Error:: not found topics table name");
  367: 		return NULL;
  368: 	}
  369: 	snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
  370: 			"MsgID = %d AND Topic LIKE '%s' %s;", 
  371: 			str, msgid, topic, szStr);
  372: 
  373: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  374: 		MQTT_RTLM_LOG(sql);
  375: 		return NULL;
  376: 	}
  377: 
  378: 	/* calculate count of rows and allocate subscribe items */
  379: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  380: 		rowz++;
  381: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  382: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  383: 		goto end;
  384: 	} else
  385: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  386: 	sqlite3_reset(stmt);
  387: 
  388: 	/* fill with data */
  389: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  390: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  391: 		s[j].sub_topic._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
  392: 		s[j].sub_topic._size = strlen((char*) s[j].sub_topic._base);
  393: 		s[j].sub_value._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
  394: 		s[j].sub_value._size = strlen((char*) s[j].sub_value._base);
  395: 	}
  396: end:
  397: 	sqlite3_finalize(stmt);
  398: 
  399: 	return s;
  400: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>