--- mqtt/src/pubmqtt.c 2012/04/11 15:08:27 1.2.2.3 +++ mqtt/src/pubmqtt.c 2012/04/25 12:04:30 1.2.2.5 @@ -20,7 +20,8 @@ mqtt_rtlm_log(const char *fmt, ...) vsyslog(LOG_ERR, fmt, lst); va_end(lst); } -#define MQTT_RTLM_LOG(_sql) (assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \ +#define MQTT_RTLM_LOG(_sql) (assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \ + __func__, __LINE__, \ sqlite3_errcode((_sql)), sqlite3_errmsg((_sql)))) @@ -39,10 +40,9 @@ mqtt_rtlm_open(cfg_root_t *cfg) if (!cfg) return NULL; - /* - if (!sqlite3_threadsafe() || sqlite3_config(SQLITE_CONFIG_SERIALIZED)) + sqlite3_config(SQLITE_CONFIG_SERIALIZED); + if (!sqlite3_threadsafe()) return NULL; - */ str = cfg_getAttribute(cfg, "mqtt_pub", "name"); if (!str) { @@ -56,11 +56,14 @@ mqtt_rtlm_open(cfg_root_t *cfg) return NULL; } + sqlite3_mutex_enter(sqlite3_db_mutex(sql)); if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) { MQTT_RTLM_LOG(sql); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); sqlite3_close(sql); return NULL; } + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return sql; } @@ -118,8 +121,10 @@ mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, va_end(lst); } + sqlite3_mutex_enter(sqlite3_db_mutex(sql)); if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { MQTT_RTLM_LOG(sql); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return -1; } if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) @@ -130,6 +135,7 @@ mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, ret = 0; } sqlite3_finalize(stmt); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return ret; } @@ -162,8 +168,10 @@ mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' " "AND RemoteHost LIKE '%s';", str, connid, user, host); + sqlite3_mutex_enter(sqlite3_db_mutex(sql)); if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { MQTT_RTLM_LOG(sql); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return -1; } if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) @@ -174,6 +182,7 @@ mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, ret = 0; } sqlite3_finalize(stmt); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return ret; } @@ -207,8 +216,10 @@ mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, c "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';", str, connid, user, host); + sqlite3_mutex_enter(sqlite3_db_mutex(sql)); if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { MQTT_RTLM_LOG(sql); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return -1; } if (sqlite3_step(stmt) == SQLITE_ROW) @@ -216,6 +227,7 @@ mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, c else ret = 0; sqlite3_finalize(stmt); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return ret; } @@ -254,8 +266,10 @@ mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u "datetime('now', 'localtime'), '%s');", str, retain, msgid, topic, txt, user, host); + sqlite3_mutex_enter(sqlite3_db_mutex(sql)); if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { MQTT_RTLM_LOG(sql); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return -1; } if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) @@ -266,6 +280,7 @@ mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u ret = 0; } sqlite3_finalize(stmt); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return ret; } @@ -313,8 +328,10 @@ mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, msgid, topic, user, host, rtn); + sqlite3_mutex_enter(sqlite3_db_mutex(sql)); if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { MQTT_RTLM_LOG(sql); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return -1; } if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) @@ -325,6 +342,7 @@ mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, ret = 0; } sqlite3_finalize(stmt); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return ret; } @@ -372,8 +390,10 @@ mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_ "MsgID = %d AND Topic LIKE '%s' %s;", str, msgid, topic, szStr); + sqlite3_mutex_enter(sqlite3_db_mutex(sql)); if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { MQTT_RTLM_LOG(sql); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return NULL; } @@ -397,6 +417,7 @@ mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_ } end: sqlite3_finalize(stmt); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return s; } @@ -434,8 +455,10 @@ mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sq "datetime('now', 'localtime'), '%s');", str, msgid, qos, topic, user, host); + sqlite3_mutex_enter(sqlite3_db_mutex(sql)); if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { MQTT_RTLM_LOG(sql); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return -1; } if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) @@ -446,6 +469,7 @@ mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sq ret = 0; } sqlite3_finalize(stmt); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return ret; } @@ -483,8 +507,10 @@ mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *s "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, topic, user, host, szStr); + sqlite3_mutex_enter(sqlite3_db_mutex(sql)); if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { MQTT_RTLM_LOG(sql); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return -1; } if ((ret = sqlite3_step(stmt)) == SQLITE_DONE) @@ -495,6 +521,7 @@ mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *s ret = 0; } sqlite3_finalize(stmt); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return ret; } @@ -526,8 +553,10 @@ mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql } snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE Topic LIKE '%s';", str, topic); + sqlite3_mutex_enter(sqlite3_db_mutex(sql)); if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) { MQTT_RTLM_LOG(sql); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return NULL; } @@ -551,6 +580,7 @@ mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql } end: sqlite3_finalize(stmt); + sqlite3_mutex_leave(sqlite3_db_mutex(sql)); return s; }