File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / pubmqtt.c
Revision 1.1.2.13: download - view: text, annotated - select for diffs - revision graph
Mon Nov 28 22:31:19 2011 UTC (12 years, 7 months ago) by misho
Branches: mqtt1_0
add forgotten flag

    1: #include "global.h"
    2: 
    3: 
    4: extern const char sql_schema[];
    5: 
    6: 
    7: /*
    8:  * mqtt_db_log() Log database connection message
    9:  *
   10:  * @fmt = format string
   11:  * @... = argument list
   12:  * return: none
   13:  */
   14: static void
   15: mqtt_rtlm_log(const char *fmt, ...)
   16: {
   17: 	va_list lst;
   18: 
   19: 	va_start(lst, fmt);
   20: 	vsyslog(LOG_ERR, fmt, lst);
   21: 	va_end(lst);
   22: }
   23: #define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
   24: 					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
   25: 
   26: 
   27: /*
   28:  * mqtt_rtlm_open() Open database connection
   29:  *
   30:  * @cfg = config filename
   31:  * return: NULL error or SQL handle
   32:  */
   33: sqlite3 *
   34: mqtt_rtlm_open(sl_config *cfg)
   35: {
   36: 	sqlite3 *sql = NULL;
   37: 	const char *str = NULL;
   38: 
   39: 	if (!cfg)
   40: 		return NULL;
   41: 
   42: 	sqlite3_config(SQLITE_CONFIG_SERIALIZED);
   43: 
   44: 	str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
   45: 	if (!str) {
   46: 		mqtt_rtlm_log("Error:: Unknown database name ...\n");
   47: 		return NULL;
   48: 	}
   49: 
   50: 	if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
   51: 		MQTT_RTLM_LOG(sql);
   52: 		sqlite3_close(sql);
   53: 		return NULL;
   54: 	}
   55: 
   56: 	if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
   57: 		MQTT_RTLM_LOG(sql);
   58: 		sqlite3_close(sql);
   59: 		return NULL;
   60: 	}
   61: 	return sql;
   62: }
   63: 
   64: /*
   65:  * mqtt_rtlm_close() Close database connection
   66:  *
   67:  * @sql = SQL handle
   68:  * return: none
   69:  */
   70: void
   71: mqtt_rtlm_close(sqlite3 *sql)
   72: {
   73: 	sqlite3_close(sql);
   74: }
   75: 
   76: /*
   77:  * mqtt_rtlm_init_session() Create session
   78:  *
   79:  * @cfg = loaded config
   80:  * @sql = SQL handle
   81:  * @user = username
   82:  * @host = hostname
   83:  * @port = port
   84:  * return: -1 error, 0 session already appears or >0 row changed
   85:  */
   86: int
   87: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host, u_short port)
   88: {
   89: 	int ret = 0;
   90: 	char *str, szStmt[BUFSIZ] = { 0 };
   91: 	sqlite3_stmt *stmt;
   92: 
   93: 	if (!cfg || !sql)
   94: 		return -1;
   95: 
   96: 	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
   97: 	if (!str) {
   98: 		mqtt_rtlm_log("Error:: not found online table name");
   99: 		return -1;
  100: 	}
  101: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, RemoteHost, RemotePort) "
  102: 		       "VALUES ('%s', '%s', %d);", str, user, host, port);
  103: 
  104: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  105: 		MQTT_RTLM_LOG(sql);
  106: 		return -1;
  107: 	}
  108: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  109: 		ret = sqlite3_changes(sql);
  110: 	else {
  111: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  112: 			MQTT_RTLM_LOG(sql);
  113: 		ret = 0;
  114: 	}
  115: 	sqlite3_finalize(stmt);
  116: 
  117: 	return ret;
  118: }
  119: 
  120: /*
  121:  * mqtt_rtlm_fini_session() Delete session(s)
  122:  *
  123:  * @cfg = loaded config
  124:  * @sql = SQL handle
  125:  * @user = username
  126:  * @host = hostname
  127:  * return: -1 error, 0 session already appears or >0 row changed
  128:  */
  129: int
  130: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)
  131: {
  132: 	int ret = 0;
  133: 	char *str, szStmt[BUFSIZ] = { 0 };
  134: 	sqlite3_stmt *stmt;
  135: 
  136: 	if (!cfg || !sql)
  137: 		return -1;
  138: 
  139: 	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
  140: 	if (!str) {
  141: 		mqtt_rtlm_log("Error:: not found online table name");
  142: 		return -1;
  143: 	}
  144: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Username = '%s' AND RemoteHost LIKE '%s';", 
  145: 			str, user, host);
  146: 
  147: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  148: 		MQTT_RTLM_LOG(sql);
  149: 		return -1;
  150: 	}
  151: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  152: 		ret = sqlite3_changes(sql);
  153: 	else {
  154: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  155: 			MQTT_RTLM_LOG(sql);
  156: 		ret = 0;
  157: 	}
  158: 	sqlite3_finalize(stmt);
  159: 
  160: 	return ret;
  161: }
  162: 
  163: /*
  164:  * mqtt_rtlm_chk_session() Check session(s)
  165:  *
  166:  * @cfg = loaded config
  167:  * @sql = SQL handle
  168:  * @user = username
  169:  * @host = hostname
  170:  * return: -1 error, 0 not logged or >0 logged found rows
  171:  */
  172: int
  173: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)
  174: {
  175: 	int ret = 0;
  176: 	char *str, szStmt[BUFSIZ] = { 0 };
  177: 	sqlite3_stmt *stmt;
  178: 
  179: 	if (!cfg || !sql)
  180: 		return -1;
  181: 
  182: 	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
  183: 	if (!str) {
  184: 		mqtt_rtlm_log("Error:: not found online table name");
  185: 		return -1;
  186: 	}
  187: 	snprintf(szStmt, sizeof szStmt, "SELECT RemoteHost, RemotePort FROM %s WHERE "
  188: 			"Username = '%s' AND RemoteHost LIKE '%s';", str, user, host);
  189: 
  190: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  191: 		MQTT_RTLM_LOG(sql);
  192: 		return -1;
  193: 	}
  194: 	if (sqlite3_step(stmt) == SQLITE_ROW)
  195: 		ret = sqlite3_changes(sql);
  196: 	else
  197: 		ret = 0;
  198: 	sqlite3_finalize(stmt);
  199: 
  200: 	return ret;
  201: }
  202: 
  203: /*
  204:  * mqtt_rtlm_write_topic() Publish topic
  205:  *
  206:  * @cfg = loaded config
  207:  * @sql = SQL handle
  208:  * @topic = topic
  209:  * @txt = text
  210:  * @user = username
  211:  * @host = hostname
  212:  * @retain = !=0 retain message to database
  213:  * return: -1 error, 0 no publish or >0 published ok
  214:  */
  215: int
  216: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, const char *topic, const char *txt, 
  217: 		const char *user, const char *host, char retain)
  218: {
  219: 	int ret = 0;
  220: 	char *str, szStmt[BUFSIZ] = { 0 };
  221: 	sqlite3_stmt *stmt;
  222: 
  223: 	if (!cfg || !sql || !topic)
  224: 		return -1;
  225: 
  226: 	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
  227: 	if (!str) {
  228: 		mqtt_rtlm_log("Error:: not found topics table name");
  229: 		return -1;
  230: 	}
  231: 	snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, Topic, Value, PubUser, PubDate, PubHost) "
  232: 			"VALUES (%d, '%s', '%s', '%s', datetime('now', 'localtime'), '%s');", str, 
  233: 			retain, topic, txt, user, host);
  234: 
  235: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  236: 		MQTT_RTLM_LOG(sql);
  237: 		return -1;
  238: 	}
  239: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  240: 		ret = sqlite3_changes(sql);
  241: 	else {
  242: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  243: 			MQTT_RTLM_LOG(sql);
  244: 		ret = 0;
  245: 	}
  246: 	sqlite3_finalize(stmt);
  247: 
  248: 	return ret;
  249: }
  250: 
  251: /*
  252:  * mqtt_rtlm_delete_topic() Delete topic
  253:  *
  254:  * @cfg = loaded config
  255:  * @sql = SQL handle
  256:  * @topic = topic
  257:  * @user = username
  258:  * @host = hostname
  259:  * @retain = -1 no matter
  260:  * return: -1 error, 0 no changes or >0 deleted rows
  261:  */
  262: int
  263: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, const char *topic, 
  264: 		const char *user, const char *host, char retain)
  265: {
  266: 	int ret = 0;
  267: 	char *str, *rtn, szStmt[BUFSIZ] = { 0 };
  268: 	sqlite3_stmt *stmt;
  269: 
  270: 	if (!cfg || !sql || !topic)
  271: 		return -1;
  272: 
  273: 	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
  274: 	if (!str) {
  275: 		mqtt_rtlm_log("Error:: not found topics table name");
  276: 		return -1;
  277: 	}
  278: 	switch (retain) {
  279: 		case -1:
  280: 			rtn = "";
  281: 			break;
  282: 		case 0:
  283: 			rtn = "AND Retain = 0";
  284: 			break;
  285: 		default:
  286: 			rtn = "AND Retain != 0";
  287: 			break;
  288: 	}
  289: 	snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
  290: 			"PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
  291: 			topic, user, host, rtn);
  292: 
  293: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  294: 		MQTT_RTLM_LOG(sql);
  295: 		return -1;
  296: 	}
  297: 	if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
  298: 		ret = sqlite3_changes(sql);
  299: 	else {
  300: 		if (ret > SQLITE_OK && ret < SQLITE_ROW)
  301: 			MQTT_RTLM_LOG(sql);
  302: 		ret = 0;
  303: 	}
  304: 	sqlite3_finalize(stmt);
  305: 
  306: 	return ret;
  307: }
  308: 
  309: /*
  310:  * mqtt_rtlm_read_topic() Get topic
  311:  *
  312:  * @cfg = loaded config
  313:  * @sql = SQL handle
  314:  * @topic = topic
  315:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
  316:  * return: NULL error or not found and !=NULL allocated subscribe topics
  317:  */
  318: mqtt_subscr_t *
  319: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, const char *topic, char retain)
  320: {
  321: 	int rowz = 0;
  322: 	char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
  323: 	sqlite3_stmt *stmt;
  324: 	register int j;
  325: 	mqtt_subscr_t *s = NULL;
  326: 
  327: 	if (!cfg || !sql || !topic)
  328: 		return NULL;
  329: 
  330: 	switch (retain) {
  331: 		case -1:
  332: 			memset(szStr, 0, sizeof szStr);
  333: 			break;
  334: 		case 0:
  335: 			snprintf(szStr, sizeof szStr, "AND Retain = 0");
  336: 			break;
  337: 		default:
  338: 			snprintf(szStr, sizeof szStr, "AND Retain > 0");
  339: 			break;
  340: 	}
  341: 
  342: 	str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
  343: 	if (!str) {
  344: 		mqtt_rtlm_log("Error:: not found topics table name");
  345: 		return NULL;
  346: 	}
  347: 	snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE Topic LIKE '%s' %s;", 
  348: 			str, topic, szStr);
  349: 
  350: 	if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
  351: 		MQTT_RTLM_LOG(sql);
  352: 		return NULL;
  353: 	}
  354: 
  355: 	/* calculate count of rows and allocate subscribe items */
  356: 	while (sqlite3_step(stmt) == SQLITE_ROW)
  357: 		rowz++;
  358: 	if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
  359: 		mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
  360: 		goto end;
  361: 	} else
  362: 		memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
  363: 	sqlite3_reset(stmt);
  364: 
  365: 	/* fill with data */
  366: 	for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
  367: 		s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
  368: 		s[j].sub_topic._base = strdup(sqlite3_column_text(stmt, 1));
  369: 		s[j].sub_topic._size = strlen(s[j].sub_topic._base);
  370: 		s[j].sub_value._base = strdup(sqlite3_column_text(stmt, 2));
  371: 		s[j].sub_value._size = strlen(s[j].sub_value._base);
  372: 	}
  373: end:
  374: 	sqlite3_finalize(stmt);
  375: 
  376: 	return s;
  377: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>