Annotation of mqtt/src/pubmqtt.c, revision 1.2
1.2 ! misho 1: #include "global.h"
! 2:
! 3:
! 4: extern const char sql_schema[];
! 5:
! 6:
/*
 * mqtt_rtlm_log() Send a formatted error message to syslog
 *
 * @fmt = printf-style format string
 * @... = arguments consumed by the format string
 * return: none
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list args;

	va_start(args, fmt);
	/* every message from this file is reported with LOG_ERR priority */
	vsyslog(LOG_ERR, fmt, args);
	va_end(args);
}
/*
 * MQTT_RTLM_LOG() Log the current SQLite error code and message of a handle
 *
 * @_sql = SQL handle, asserted to be non-NULL
 */
#define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
! 25:
! 26:
! 27: /*
! 28: * mqtt_rtlm_open() Open database connection
! 29: *
! 30: * @cfg = config filename
! 31: * return: NULL error or SQL handle
! 32: */
! 33: sqlite3 *
! 34: mqtt_rtlm_open(sl_config *cfg)
! 35: {
! 36: sqlite3 *sql = NULL;
! 37: const char *str = NULL;
! 38:
! 39: if (!cfg)
! 40: return NULL;
! 41:
! 42: /*
! 43: if (!sqlite3_threadsafe() || sqlite3_config(SQLITE_CONFIG_SERIALIZED))
! 44: return NULL;
! 45: */
! 46:
! 47: str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
! 48: if (!str) {
! 49: mqtt_rtlm_log("Error:: Unknown database name ...\n");
! 50: return NULL;
! 51: }
! 52:
! 53: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
! 54: MQTT_RTLM_LOG(sql);
! 55: sqlite3_close(sql);
! 56: return NULL;
! 57: }
! 58:
! 59: if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
! 60: MQTT_RTLM_LOG(sql);
! 61: sqlite3_close(sql);
! 62: return NULL;
! 63: }
! 64: return sql;
! 65: }
! 66:
! 67: /*
! 68: * mqtt_rtlm_close() Close database connection
! 69: *
! 70: * @sql = SQL handle
! 71: * return: none
! 72: */
! 73: void
! 74: mqtt_rtlm_close(sqlite3 *sql)
! 75: {
! 76: sqlite3_close(sql);
! 77: }
! 78:
! 79: /*
! 80: * mqtt_rtlm_init_session() Create session
! 81: *
! 82: * @cfg = loaded config
! 83: * @sql = SQL handle
! 84: * @connid = connection id
! 85: * @user = username
! 86: * @host = hostname
! 87: * @will = will flag if !=0 must fill arguments
! 88: * @... = will arguments in order topic,msg,qos,retain
! 89: * return: -1 error, 0 session already appears or >0 row changed
! 90: */
! 91: int
! 92: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user,
! 93: const char *host, char will, ...)
! 94: {
! 95: va_list lst;
! 96: int ret = 0;
! 97: char *str, szStmt[BUFSIZ] = { 0 };
! 98: sqlite3_stmt *stmt;
! 99:
! 100: if (!cfg || !sql)
! 101: return -1;
! 102:
! 103: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
! 104: if (!str) {
! 105: mqtt_rtlm_log("Error:: not found online table name");
! 106: return -1;
! 107: }
! 108: if (!will)
! 109: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
! 110: "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
! 111: else {
! 112: va_start(lst, will);
! 113: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
! 114: "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
! 115: "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');",
! 116: str, connid, user, host, will,
! 117: va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
! 118: va_end(lst);
! 119: }
! 120:
! 121: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
! 122: MQTT_RTLM_LOG(sql);
! 123: return -1;
! 124: }
! 125: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
! 126: ret = sqlite3_changes(sql);
! 127: else {
! 128: if (ret > SQLITE_OK && ret < SQLITE_ROW)
! 129: MQTT_RTLM_LOG(sql);
! 130: ret = 0;
! 131: }
! 132: sqlite3_finalize(stmt);
! 133:
! 134: return ret;
! 135: }
! 136:
! 137: /*
! 138: * mqtt_rtlm_fini_session() Delete session(s)
! 139: *
! 140: * @cfg = loaded config
! 141: * @sql = SQL handle
! 142: * @connid = connection id
! 143: * @user = username
! 144: * @host = hostname
! 145: * return: -1 error, 0 session already appears or >0 row changed
! 146: */
! 147: int
! 148: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
! 149: {
! 150: int ret = 0;
! 151: char *str, szStmt[BUFSIZ] = { 0 };
! 152: sqlite3_stmt *stmt;
! 153:
! 154: if (!cfg || !sql)
! 155: return -1;
! 156:
! 157: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
! 158: if (!str) {
! 159: mqtt_rtlm_log("Error:: not found online table name");
! 160: return -1;
! 161: }
! 162: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
! 163: "AND RemoteHost LIKE '%s';", str, connid, user, host);
! 164:
! 165: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
! 166: MQTT_RTLM_LOG(sql);
! 167: return -1;
! 168: }
! 169: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
! 170: ret = sqlite3_changes(sql);
! 171: else {
! 172: if (ret > SQLITE_OK && ret < SQLITE_ROW)
! 173: MQTT_RTLM_LOG(sql);
! 174: ret = 0;
! 175: }
! 176: sqlite3_finalize(stmt);
! 177:
! 178: return ret;
! 179: }
! 180:
! 181: /*
! 182: * mqtt_rtlm_chk_session() Check session(s)
! 183: *
! 184: * @cfg = loaded config
! 185: * @sql = SQL handle
! 186: * @connid = connection id
! 187: * @user = username
! 188: * @host = hostname
! 189: * return: -1 error, 0 not logged or >0 logged found rows
! 190: */
! 191: int
! 192: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
! 193: {
! 194: int ret = 0;
! 195: char *str, szStmt[BUFSIZ] = { 0 };
! 196: sqlite3_stmt *stmt;
! 197:
! 198: if (!cfg || !sql)
! 199: return -1;
! 200:
! 201: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
! 202: if (!str) {
! 203: mqtt_rtlm_log("Error:: not found online table name");
! 204: return -1;
! 205: }
! 206: snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
! 207: "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';",
! 208: str, connid, user, host);
! 209:
! 210: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
! 211: MQTT_RTLM_LOG(sql);
! 212: return -1;
! 213: }
! 214: if (sqlite3_step(stmt) == SQLITE_ROW)
! 215: ret = sqlite3_changes(sql);
! 216: else
! 217: ret = 0;
! 218: sqlite3_finalize(stmt);
! 219:
! 220: return ret;
! 221: }
! 222:
! 223: /*
! 224: * mqtt_rtlm_write_topic() Publish topic
! 225: *
! 226: * @cfg = loaded config
! 227: * @sql = SQL handle
! 228: * @msgid = MessageID
! 229: * @topic = topic
! 230: * @txt = text
! 231: * @user = username
! 232: * @host = hostname
! 233: * @retain = !=0 retain message to database
! 234: * return: -1 error, 0 no publish or >0 published ok
! 235: */
! 236: int
! 237: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt,
! 238: const char *user, const char *host, char retain)
! 239: {
! 240: int ret = 0;
! 241: char *str, szStmt[BUFSIZ] = { 0 };
! 242: sqlite3_stmt *stmt;
! 243:
! 244: if (!cfg || !sql || !topic)
! 245: return -1;
! 246:
! 247: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
! 248: if (!str) {
! 249: mqtt_rtlm_log("Error:: not found topics table name");
! 250: return -1;
! 251: }
! 252: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
! 253: "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
! 254: "datetime('now', 'localtime'), '%s');",
! 255: str, retain, msgid, topic, txt, user, host);
! 256:
! 257: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
! 258: MQTT_RTLM_LOG(sql);
! 259: return -1;
! 260: }
! 261: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
! 262: ret = sqlite3_changes(sql);
! 263: else {
! 264: if (ret > SQLITE_OK && ret < SQLITE_ROW)
! 265: MQTT_RTLM_LOG(sql);
! 266: ret = 0;
! 267: }
! 268: sqlite3_finalize(stmt);
! 269:
! 270: return ret;
! 271: }
! 272:
! 273: /*
! 274: * mqtt_rtlm_delete_topic() Delete topic
! 275: *
! 276: * @cfg = loaded config
! 277: * @sql = SQL handle
! 278: * @msgid = MessageID
! 279: * @topic = topic
! 280: * @user = username
! 281: * @host = hostname
! 282: * @retain = -1 no matter
! 283: * return: -1 error, 0 no changes or >0 deleted rows
! 284: */
! 285: int
! 286: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic,
! 287: const char *user, const char *host, char retain)
! 288: {
! 289: int ret = 0;
! 290: char *str, *rtn, szStmt[BUFSIZ] = { 0 };
! 291: sqlite3_stmt *stmt;
! 292:
! 293: if (!cfg || !sql || !topic)
! 294: return -1;
! 295:
! 296: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
! 297: if (!str) {
! 298: mqtt_rtlm_log("Error:: not found topics table name");
! 299: return -1;
! 300: }
! 301: switch (retain) {
! 302: case -1:
! 303: rtn = "";
! 304: break;
! 305: case 0:
! 306: rtn = "AND Retain = 0";
! 307: break;
! 308: default:
! 309: rtn = "AND Retain != 0";
! 310: break;
! 311: }
! 312: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
! 313: "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str,
! 314: msgid, topic, user, host, rtn);
! 315:
! 316: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
! 317: MQTT_RTLM_LOG(sql);
! 318: return -1;
! 319: }
! 320: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
! 321: ret = sqlite3_changes(sql);
! 322: else {
! 323: if (ret > SQLITE_OK && ret < SQLITE_ROW)
! 324: MQTT_RTLM_LOG(sql);
! 325: ret = 0;
! 326: }
! 327: sqlite3_finalize(stmt);
! 328:
! 329: return ret;
! 330: }
! 331:
! 332: /*
! 333: * mqtt_rtlm_read_topic() Get topic
! 334: *
! 335: * @cfg = loaded config
! 336: * @sql = SQL handle
! 337: * @msgid = MessageID
! 338: * @topic = topic
! 339: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
! 340: * return: NULL error or not found and !=NULL allocated subscribe topics
! 341: */
! 342: mqtt_subscr_t *
! 343: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
! 344: {
! 345: int rowz = 0;
! 346: char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
! 347: sqlite3_stmt *stmt;
! 348: register int j;
! 349: mqtt_subscr_t *s = NULL;
! 350:
! 351: if (!cfg || !sql || !topic)
! 352: return NULL;
! 353:
! 354: switch (retain) {
! 355: case -1:
! 356: memset(szStr, 0, sizeof szStr);
! 357: break;
! 358: case 0:
! 359: snprintf(szStr, sizeof szStr, "AND Retain = 0");
! 360: break;
! 361: default:
! 362: snprintf(szStr, sizeof szStr, "AND Retain > 0");
! 363: break;
! 364: }
! 365:
! 366: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
! 367: if (!str) {
! 368: mqtt_rtlm_log("Error:: not found topics table name");
! 369: return NULL;
! 370: }
! 371: snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
! 372: "MsgID = %d AND Topic LIKE '%s' %s;",
! 373: str, msgid, topic, szStr);
! 374:
! 375: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
! 376: MQTT_RTLM_LOG(sql);
! 377: return NULL;
! 378: }
! 379:
! 380: /* calculate count of rows and allocate subscribe items */
! 381: while (sqlite3_step(stmt) == SQLITE_ROW)
! 382: rowz++;
! 383: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
! 384: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
! 385: goto end;
! 386: } else
! 387: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
! 388: sqlite3_reset(stmt);
! 389:
! 390: /* fill with data */
! 391: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
! 392: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
! 393: s[j].sub_topic._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
! 394: s[j].sub_topic._size = strlen((char*) s[j].sub_topic._base);
! 395: s[j].sub_value._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
! 396: s[j].sub_value._size = strlen((char*) s[j].sub_value._base);
! 397: }
! 398: end:
! 399: sqlite3_finalize(stmt);
! 400:
! 401: return s;
! 402: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>