Diff for /mqtt/src/pubmqtt.c between versions 1.1.2.1 and 1.2.2.11

version 1.1.2.1, 2011/11/22 21:12:52 version 1.2.2.11, 2012/06/26 08:05:58
Line 1 Line 1
 #include "global.h"  #include "global.h"
 #include "dbmqtt.h"  
   
   
   extern const char sql_schema[];
   
   
   /*
    * mqtt_db_log() Log database connection message
    *
    * @fmt = format string
    * @... = argument list
    * return: none
    */
   static void
   mqtt_rtlm_log(const char *fmt, ...)
   {
           va_list lst;
   
           va_start(lst, fmt);
           vsyslog(LOG_ERR, fmt, lst);
           va_end(lst);
   }
   #define MQTT_RTLM_LOG(_sql)     (assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
                                           __func__, __LINE__, \
                                           sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
   
   /* library pre-loaded actions */
   void
   _init()
   {
           sqlite3_initialize();
   }
   
   void
   _fini()
   {
           sqlite3_shutdown();
   }
   
   
   /*
    * mqtt_rtlm_open() Open database connection
    *
    * @cfg = config filename
    * return: NULL error or SQL handle
    */
   sqlite3 *
   mqtt_rtlm_open(cfg_root_t *cfg)
   {
           sqlite3 *sql = NULL;
           const char *str = NULL;
   
           if (!cfg)
                   return NULL;
   
           str = cfg_getAttribute(cfg, "mqtt_pub", "name");
           if (!str) {
                   mqtt_rtlm_log("Error:: Unknown database name ...\n");
                   return NULL;
           }
   
           if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_close(sql);
                   return NULL;
           }
   
           if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_close(sql);
                   return NULL;
           }
           return sql;
   }
   
   /*
    * mqtt_rtlm_close() Close database connection
    *
    * @sql = SQL handle
    * return: none
    */
   void
   mqtt_rtlm_close(sqlite3 *sql)
   {
           sqlite3_close(sql);
   }
   
   /*
    * mqtt_rtlm_init_session() Create session
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @connid = connection id
    * @user = username
    * @host = hostname
    * @will = will flag if !=0 must fill arguments
    * @... = will arguments in order topic,msg,qos,retain
    * return: -1 error, 0 session already appears or >0 row changed
    */
   int
   mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
                   const char *host, char will, ...)
   {
           va_list lst;
           int ret = 0;
           char *str, *psStmt;
           sqlite3_stmt *stmt;
   
           if (!cfg || !sql)
                   return -1;
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found online table name");
                   return -1;
           }
           if (!will)
                   psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
                                   "WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
           else {
                   va_start(lst, will);
                   psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
                                   "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
                                   "VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');", 
                                   str, connid, user, host, will, 
                                   va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                   va_end(lst);
           }
   
           if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_free(psStmt);
                   return -1;
           } else
                   sqlite3_free(psStmt);
           if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                   ret = sqlite3_changes(sql);
           else {
                   if (ret > SQLITE_OK && ret < SQLITE_ROW)
                           MQTT_RTLM_LOG(sql);
                   ret = 0;
           }
           sqlite3_finalize(stmt);
   
           return ret;
   }
   
   /*
    * mqtt_rtlm_fini_session() Delete session(s)
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @connid = connection id
    * @user = username
    * @host = hostname
    * return: -1 error, 0 session already appears or >0 row changed
    */
   int
   mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
   {
           int ret = 0;
           char *str, *psStmt;
           sqlite3_stmt *stmt;
   
           if (!cfg || !sql)
                   return -1;
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found online table name");
                   return -1;
           }
           psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
                           "AND RemoteHost LIKE '%q';", str, connid, user, host);
   
           if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_free(psStmt);
                   return -1;
           } else
                   sqlite3_free(psStmt);
           if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                   ret = sqlite3_changes(sql);
           else {
                   if (ret > SQLITE_OK && ret < SQLITE_ROW)
                           MQTT_RTLM_LOG(sql);
                   ret = 0;
           }
           sqlite3_finalize(stmt);
   
           return ret;
   }
   
   /*
    * mqtt_rtlm_chk_session() Check session(s)
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @connid = connection id
    * @user = username
    * @host = hostname
    * return: -1 error, 0 not logged or >0 logged found rows
    */
   int
   mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
   {
           int ret = 0;
           char *str, *psStmt;
           sqlite3_stmt *stmt;
   
           if (!cfg || !sql)
                   return -1;
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found online table name");
                   return -1;
           }
           psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
                           "ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';", 
                           str, connid, user, host);
   
           if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_free(psStmt);
                   return -1;
           } else
                   sqlite3_free(psStmt);
           if (sqlite3_step(stmt) == SQLITE_ROW)
                   ret = sqlite3_changes(sql);
           else
                   ret = 0;
           sqlite3_finalize(stmt);
   
           return ret;
   }
   
   /*
    * mqtt_rtlm_write_topic() Publish topic
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @connid = connection id
    * @msgid = MessageID
    * @topic = topic
    * @txt = text
    * @txtlen = text length
    * @user = username
    * @host = hostname
    * @retain = !=0 retain message to database
    * return: -1 error, 0 no publish or >0 published ok
    */
   int
   mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                   const char *topic, void *txt, int txtlen, const char *user, const char *host, char retain)
   {
           int ret = 0;
           char *str, *psStmt;
           sqlite3_stmt *stmt;
   
           if (!cfg || !sql || !topic)
                   return -1;
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found topics table name");
                   return -1;
           }
           psStmt = sqlite3_mprintf("INSERT INTO %s (Retain, ConnID, MsgID, Topic, Value, PubUser, "
                           "PubDate, PubHost) VALUES (%d, '%q', %u, '%q', ?1, '%q', "
                           "datetime('now', 'localtime'), '%q');", 
                           str, retain, connid, msgid, topic, txt, user, host);
   
           if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_free(psStmt);
                   return -1;
           } else
                   sqlite3_free(psStmt);
           if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_finalize(stmt);
                   return -1;
           }
           if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                   ret = sqlite3_changes(sql);
           else {
                   if (ret > SQLITE_OK && ret < SQLITE_ROW)
                           MQTT_RTLM_LOG(sql);
                   ret = 0;
           }
           sqlite3_finalize(stmt);
   
           return ret;
   }
   
   /*
    * mqtt_rtlm_wipe_topic() Wipe all topics
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @connid = connection id
    * @user = username
    * @retain = -1 no matter
    * return: -1 error, 0 no changes or >0 deleted rows
    */
   int
   mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
   {
           int ret = 0;
           char *str, *rtn, *psStmt;
           sqlite3_stmt *stmt;
   
           if (!cfg || !sql || !connid)
                   return -1;
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found topics table name");
                   return -1;
           }
           switch (retain) {
                   case -1:
                           rtn = "";
                           break;
                   case 0:
                           rtn = "AND Retain = 0";
                           break;
                   default:
                           rtn = "AND Retain != 0";
                           break;
           }
           psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
                           "PubUser LIKE '%q' %s;", str, connid, user, rtn);
   
           if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_free(psStmt);
                   return -1;
           } else
                   sqlite3_free(psStmt);
           if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                   ret = sqlite3_changes(sql);
           else {
                   if (ret > SQLITE_OK && ret < SQLITE_ROW)
                           MQTT_RTLM_LOG(sql);
                   ret = 0;
           }
           sqlite3_finalize(stmt);
   
           return ret;
   }
   
   /*
    * mqtt_rtlm_delete_topic() Delete topic
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @connid = connection id
    * @msgid = MessageID
    * @topic = topic
    * @user = username
    * @host = hostname
    * @retain = -1 no matter
    * return: -1 error, 0 no changes or >0 deleted rows
    */
   int
   mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                   const char *topic, const char *user, const char *host, char retain)
   {
           int ret = 0;
           char *str, *rtn, *psStmt;
           sqlite3_stmt *stmt;
   
           if (!cfg || !sql || !topic)
                   return -1;
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found topics table name");
                   return -1;
           }
           switch (retain) {
                   case -1:
                           rtn = "";
                           break;
                   case 0:
                           rtn = "AND Retain = 0";
                           break;
                   default:
                           rtn = "AND Retain != 0";
                           break;
           }
           psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
                           "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str, 
                           connid, msgid, topic, user, host, rtn);
   
           if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_free(psStmt);
                   return -1;
           } else
                   sqlite3_free(psStmt);
           if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                   ret = sqlite3_changes(sql);
           else {
                   if (ret > SQLITE_OK && ret < SQLITE_ROW)
                           MQTT_RTLM_LOG(sql);
                   ret = 0;
           }
           sqlite3_finalize(stmt);
   
           return ret;
   }
   
   /*
    * mqtt_rtlm_read_topic() Get topic
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @connid = connection id
    * @msgid = MessageID
    * @topic = topic
    * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
    * return: NULL error or not found and !=NULL allocated subscribe topics
    */
   mqtt_subscr_t *
   mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                   const char *topic, char retain)
   {
           int rowz = 0;
           char *str, szStr[STRSIZ], *psStmt;
           sqlite3_stmt *stmt;
           register int j;
           mqtt_subscr_t *s = NULL;
           ait_val_t v;
   
           if (!cfg || !sql || !topic)
                   return NULL;
   
           switch (retain) {
                   case -1:
                           memset(szStr, 0, sizeof szStr);
                           break;
                   case 0:
                           snprintf(szStr, sizeof szStr, "AND Retain = 0");
                           break;
                   default:
                           snprintf(szStr, sizeof szStr, "AND Retain > 0");
                           break;
           }
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found topics table name");
                   return NULL;
           }
           psStmt = sqlite3_mprintf("SELECT Retain, Topic, Value FROM %s WHERE "
                           "ConnID = '%q' AND MsgID = %d AND Topic LIKE '%q' %s;", 
                           str, connid, msgid, topic, szStr);
   
           if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_free(psStmt);
                   return NULL;
           } else
                   sqlite3_free(psStmt);
   
           /* calculate count of rows and allocate subscribe items */
           while (sqlite3_step(stmt) == SQLITE_ROW)
                   rowz++;
           if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                   mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                   goto end;
           } else
                   memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
           sqlite3_reset(stmt);
   
           /* fill with data */
           for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                   s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
                   s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
                   s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                   AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
                   s[j].sub_value.msg_len = AIT_LEN(&v);
                   s[j].sub_value.msg_base = (u_char*) io_malloc(s[j].sub_value.msg_len);
                   if (s[j].sub_value.msg_base)
                           memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
           }
   end:
           sqlite3_finalize(stmt);
   
           return s;
   }
   
   /*
    * mqtt_rtlm_write_subscribe() Subscribe topic
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @connid = connection id
    * @msgid = MessageID
    * @topic = topic
    * @user = username
    * @host = hostname
    * @qos = Subscribe QoS
    * return: -1 error, 0 no publish or >0 published ok
    */
   int
   mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                   const char *topic, const char *user, const char *host, char qos)
   {
           int ret = 0;
           char *str, *psStmt;
           sqlite3_stmt *stmt;
   
           if (!cfg || !sql || !topic)
                   return -1;
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found subscribes table name");
                   return -1;
           }
           psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
                           "PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
                           "datetime('now', 'localtime'), '%q');", str, 
                           connid, msgid, qos, topic, user, host);
   
           if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_free(psStmt);
                   return -1;
           } else
                   sqlite3_free(psStmt);
           if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                   ret = sqlite3_changes(sql);
           else {
                   if (ret > SQLITE_OK && ret < SQLITE_ROW)
                           MQTT_RTLM_LOG(sql);
                   ret = 0;
           }
           sqlite3_finalize(stmt);
   
           return ret;
   }
   
   /*
    * mqtt_rtlm_delete_subscribe() Delete subscribe
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @connid = connection id
    * @topic = topic
    * @user = username
    * @host = hostname
    * return: -1 error, 0 no changes or >0 deleted rows
    */
   int
   mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
                   const char *topic, const char *user, const char *host)
   {
           int ret = 0;
           char *str, *psStmt;
           sqlite3_stmt *stmt;
   
           if (!cfg || !sql || !topic)
                   return -1;
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found subscribes table name");
                   return -1;
           }
           psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
                           "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str, 
                           connid, topic, user, host);
   
           if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_free(psStmt);
                   return -1;
           } else
                   sqlite3_free(psStmt);
           if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                   ret = sqlite3_changes(sql);
           else {
                   if (ret > SQLITE_OK && ret < SQLITE_ROW)
                           MQTT_RTLM_LOG(sql);
                   ret = 0;
           }
           sqlite3_finalize(stmt);
   
           return ret;
   }
   
   /*
    * mqtt_rtlm_read_subscribe() Get subscribe topic
    *
    * @cfg = loaded config
    * @sql = SQL handle
    * @connid = connection id
    * @topic = topic
    * return: NULL error or not found and !=NULL allocated subscribe topics
    */
   mqtt_subscr_t *
   mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
   {
           int rowz = 0;
           char *str, *psStmt;
           sqlite3_stmt *stmt;
           register int j;
           mqtt_subscr_t *s = NULL;
   
           if (!cfg || !sql || !topic)
                   return NULL;
   
           str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
           if (!str) {
                   mqtt_rtlm_log("Error:: not found subscribes table name");
                   return NULL;
           }
           psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
                           "Topic LIKE '%q';", str, connid, topic);
   
           if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                   MQTT_RTLM_LOG(sql);
                   sqlite3_free(psStmt);
                   return NULL;
           } else
                   sqlite3_free(psStmt);
   
           /* calculate count of rows and allocate subscribe items */
           while (sqlite3_step(stmt) == SQLITE_ROW)
                   rowz++;
           if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                   mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                   goto end;
           } else
                   memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
           sqlite3_reset(stmt);
   
           /* fill with data */
           for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                   s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
                   s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
                   s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                   s[j].sub_value.msg_base = NULL;
                   s[j].sub_value.msg_len = 0;
           }
   end:
           sqlite3_finalize(stmt);
   
           return s;
   }

Removed from v.1.1.2.1  
changed lines
  Added in v.1.2.2.11


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>