Annotation of mqtt/src/pubmqtt.c, revision 1.3
1.2 misho 1: #include "global.h"
2:
3:
4: extern const char sql_schema[];
5:
6:
/*
 * mqtt_rtlm_log() Log a formatted message to syslog with LOG_ERR priority
 *
 * @fmt = printf-style format string
 * @... = arguments consumed by @fmt
 * return: none
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	vsyslog(LOG_ERR, fmt, ap);
	va_end(ap);
}

/*
 * MQTT_RTLM_LOG() Log the last SQLite error of a database handle
 *
 * Asserts that the handle is not NULL, then logs the calling function,
 * source line, SQLite error code and SQLite error message.
 * The argument is evaluated more than once; pass a plain variable.
 *
 * @_db = SQL handle
 */
#define MQTT_RTLM_LOG(_db)	(assert((_db)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
					__func__, __LINE__, \
					sqlite3_errcode((_db)), sqlite3_errmsg((_db))))
26:
/*
 * _init() Library pre-loaded action
 *
 * Initializes the SQLite library with sqlite3_initialize().
 * NOTE(review): presumably invoked by the loader when this object is
 * loaded -- confirm against the build/link flags.
 * return: none
 */
void
_init(void)
{
	(void) sqlite3_initialize();
}
! 33:
/*
 * _fini() Library unload action
 *
 * Releases SQLite library resources with sqlite3_shutdown().
 * return: none
 */
void
_fini(void)
{
	(void) sqlite3_shutdown();
}
! 39:
1.2 misho 40:
41: /*
42: * mqtt_rtlm_open() Open database connection
43: *
44: * @cfg = config filename
45: * return: NULL error or SQL handle
46: */
47: sqlite3 *
1.3 ! misho 48: mqtt_rtlm_open(cfg_root_t *cfg)
1.2 misho 49: {
50: sqlite3 *sql = NULL;
51: const char *str = NULL;
52:
53: if (!cfg)
54: return NULL;
55:
1.3 ! misho 56: str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2 misho 57: if (!str) {
58: mqtt_rtlm_log("Error:: Unknown database name ...\n");
59: return NULL;
60: }
61:
62: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
63: MQTT_RTLM_LOG(sql);
64: sqlite3_close(sql);
65: return NULL;
66: }
67:
68: if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
69: MQTT_RTLM_LOG(sql);
70: sqlite3_close(sql);
71: return NULL;
72: }
73: return sql;
74: }
75:
76: /*
77: * mqtt_rtlm_close() Close database connection
78: *
79: * @sql = SQL handle
80: * return: none
81: */
82: void
83: mqtt_rtlm_close(sqlite3 *sql)
84: {
85: sqlite3_close(sql);
86: }
87:
88: /*
89: * mqtt_rtlm_init_session() Create session
90: *
91: * @cfg = loaded config
92: * @sql = SQL handle
93: * @connid = connection id
94: * @user = username
95: * @host = hostname
96: * @will = will flag if !=0 must fill arguments
97: * @... = will arguments in order topic,msg,qos,retain
98: * return: -1 error, 0 session already appears or >0 row changed
99: */
100: int
1.3 ! misho 101: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user,
1.2 misho 102: const char *host, char will, ...)
103: {
104: va_list lst;
105: int ret = 0;
1.3 ! misho 106: char *str, *psStmt;
1.2 misho 107: sqlite3_stmt *stmt;
108:
109: if (!cfg || !sql)
110: return -1;
111:
1.3 ! misho 112: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 113: if (!str) {
114: mqtt_rtlm_log("Error:: not found online table name");
115: return -1;
116: }
117: if (!will)
1.3 ! misho 118: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
! 119: "WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
1.2 misho 120: else {
121: va_start(lst, will);
1.3 ! misho 122: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
1.2 misho 123: "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
1.3 ! misho 124: "VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');",
1.2 misho 125: str, connid, user, host, will,
126: va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
127: va_end(lst);
128: }
129:
1.3 ! misho 130: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 131: MQTT_RTLM_LOG(sql);
1.3 ! misho 132: sqlite3_free(psStmt);
1.2 misho 133: return -1;
1.3 ! misho 134: } else
! 135: sqlite3_free(psStmt);
1.2 misho 136: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
137: ret = sqlite3_changes(sql);
138: else {
139: if (ret > SQLITE_OK && ret < SQLITE_ROW)
140: MQTT_RTLM_LOG(sql);
141: ret = 0;
142: }
143: sqlite3_finalize(stmt);
144:
145: return ret;
146: }
147:
148: /*
149: * mqtt_rtlm_fini_session() Delete session(s)
150: *
151: * @cfg = loaded config
152: * @sql = SQL handle
153: * @connid = connection id
154: * @user = username
155: * @host = hostname
156: * return: -1 error, 0 session already appears or >0 row changed
157: */
158: int
1.3 ! misho 159: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2 misho 160: {
161: int ret = 0;
1.3 ! misho 162: char *str, *psStmt;
1.2 misho 163: sqlite3_stmt *stmt;
164:
165: if (!cfg || !sql)
166: return -1;
167:
1.3 ! misho 168: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 169: if (!str) {
170: mqtt_rtlm_log("Error:: not found online table name");
171: return -1;
172: }
1.3 ! misho 173: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
! 174: "AND RemoteHost LIKE '%q';", str, connid, user, host);
1.2 misho 175:
1.3 ! misho 176: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 177: MQTT_RTLM_LOG(sql);
1.3 ! misho 178: sqlite3_free(psStmt);
1.2 misho 179: return -1;
1.3 ! misho 180: } else
! 181: sqlite3_free(psStmt);
1.2 misho 182: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
183: ret = sqlite3_changes(sql);
184: else {
185: if (ret > SQLITE_OK && ret < SQLITE_ROW)
186: MQTT_RTLM_LOG(sql);
187: ret = 0;
188: }
189: sqlite3_finalize(stmt);
190:
191: return ret;
192: }
193:
194: /*
195: * mqtt_rtlm_chk_session() Check session(s)
196: *
197: * @cfg = loaded config
198: * @sql = SQL handle
199: * @connid = connection id
200: * @user = username
201: * @host = hostname
202: * return: -1 error, 0 not logged or >0 logged found rows
203: */
204: int
1.3 ! misho 205: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2 misho 206: {
207: int ret = 0;
1.3 ! misho 208: char *str, *psStmt;
1.2 misho 209: sqlite3_stmt *stmt;
210:
211: if (!cfg || !sql)
212: return -1;
213:
1.3 ! misho 214: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 215: if (!str) {
216: mqtt_rtlm_log("Error:: not found online table name");
217: return -1;
218: }
1.3 ! misho 219: psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
! 220: "ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';",
1.2 misho 221: str, connid, user, host);
222:
1.3 ! misho 223: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 224: MQTT_RTLM_LOG(sql);
1.3 ! misho 225: sqlite3_free(psStmt);
1.2 misho 226: return -1;
1.3 ! misho 227: } else
! 228: sqlite3_free(psStmt);
1.2 misho 229: if (sqlite3_step(stmt) == SQLITE_ROW)
230: ret = sqlite3_changes(sql);
231: else
232: ret = 0;
233: sqlite3_finalize(stmt);
234:
235: return ret;
236: }
237:
238: /*
239: * mqtt_rtlm_write_topic() Publish topic
240: *
241: * @cfg = loaded config
242: * @sql = SQL handle
1.3 ! misho 243: * @connid = connection id
1.2 misho 244: * @msgid = MessageID
245: * @topic = topic
246: * @txt = text
1.3 ! misho 247: * @txtlen = text length
1.2 misho 248: * @user = username
249: * @host = hostname
1.3 ! misho 250: * @qos = QoS
1.2 misho 251: * @retain = !=0 retain message to database
252: * return: -1 error, 0 no publish or >0 published ok
253: */
254: int
1.3 ! misho 255: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
! 256: const char *topic, void *txt, int txtlen, const char *user,
! 257: const char *host, char qos, char retain)
1.2 misho 258: {
259: int ret = 0;
1.3 ! misho 260: char *str, *psStmt;
1.2 misho 261: sqlite3_stmt *stmt;
262:
263: if (!cfg || !sql || !topic)
264: return -1;
265:
1.3 ! misho 266: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 267: if (!str) {
268: mqtt_rtlm_log("Error:: not found topics table name");
269: return -1;
270: }
1.3 ! misho 271: psStmt = sqlite3_mprintf("INSERT INTO %s (QoS, Retain, ConnID, MsgID, Topic, Value, PubUser, "
! 272: "PubDate, PubHost) VALUES (%d, %d, '%q', %u, '%q', ?1, '%q', "
! 273: "datetime('now', 'localtime'), '%q');",
! 274: str, qos, retain, connid, msgid, topic, user, host);
1.2 misho 275:
1.3 ! misho 276: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL) || !stmt) {
! 277: MQTT_RTLM_LOG(sql);
! 278: sqlite3_free(psStmt);
! 279: return -1;
! 280: } else
! 281: sqlite3_free(psStmt);
! 282: if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
1.2 misho 283: MQTT_RTLM_LOG(sql);
1.3 ! misho 284: sqlite3_finalize(stmt);
1.2 misho 285: return -1;
286: }
287: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
288: ret = sqlite3_changes(sql);
289: else {
290: if (ret > SQLITE_OK && ret < SQLITE_ROW)
291: MQTT_RTLM_LOG(sql);
292: ret = 0;
293: }
294: sqlite3_finalize(stmt);
295:
296: return ret;
297: }
298:
299: /*
1.3 ! misho 300: * mqtt_rtlm_wipe_topic() Wipe all topics
! 301: *
! 302: * @cfg = loaded config
! 303: * @sql = SQL handle
! 304: * @connid = connection id
! 305: * @user = username
! 306: * @retain = -1 no matter
! 307: * return: -1 error, 0 no changes or >0 deleted rows
! 308: */
! 309: int
! 310: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
! 311: {
! 312: int ret = 0;
! 313: char *str, *rtn, *psStmt;
! 314: sqlite3_stmt *stmt;
! 315:
! 316: if (!cfg || !sql || !connid)
! 317: return -1;
! 318:
! 319: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
! 320: if (!str) {
! 321: mqtt_rtlm_log("Error:: not found topics table name");
! 322: return -1;
! 323: }
! 324: switch (retain) {
! 325: case -1:
! 326: rtn = "";
! 327: break;
! 328: case 0:
! 329: rtn = "AND Retain = 0";
! 330: break;
! 331: default:
! 332: rtn = "AND Retain != 0";
! 333: break;
! 334: }
! 335: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
! 336: "PubUser LIKE '%q' %s;", str, connid, user, rtn);
! 337:
! 338: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
! 339: MQTT_RTLM_LOG(sql);
! 340: sqlite3_free(psStmt);
! 341: return -1;
! 342: } else
! 343: sqlite3_free(psStmt);
! 344: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
! 345: ret = sqlite3_changes(sql);
! 346: else {
! 347: if (ret > SQLITE_OK && ret < SQLITE_ROW)
! 348: MQTT_RTLM_LOG(sql);
! 349: ret = 0;
! 350: }
! 351: sqlite3_finalize(stmt);
! 352:
! 353: return ret;
! 354: }
! 355:
! 356: /*
1.2 misho 357: * mqtt_rtlm_delete_topic() Delete topic
358: *
359: * @cfg = loaded config
360: * @sql = SQL handle
1.3 ! misho 361: * @connid = connection id
1.2 misho 362: * @msgid = MessageID
363: * @topic = topic
364: * @user = username
365: * @host = hostname
366: * @retain = -1 no matter
367: * return: -1 error, 0 no changes or >0 deleted rows
368: */
369: int
1.3 ! misho 370: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
! 371: const char *topic, const char *user, const char *host, char retain)
1.2 misho 372: {
373: int ret = 0;
1.3 ! misho 374: char *str, *rtn, *psStmt;
1.2 misho 375: sqlite3_stmt *stmt;
376:
377: if (!cfg || !sql || !topic)
378: return -1;
379:
1.3 ! misho 380: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 381: if (!str) {
382: mqtt_rtlm_log("Error:: not found topics table name");
383: return -1;
384: }
385: switch (retain) {
386: case -1:
387: rtn = "";
388: break;
389: case 0:
390: rtn = "AND Retain = 0";
391: break;
392: default:
393: rtn = "AND Retain != 0";
394: break;
395: }
1.3 ! misho 396: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
! 397: "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str,
! 398: connid, msgid, topic, user, host, rtn);
1.2 misho 399:
1.3 ! misho 400: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 401: MQTT_RTLM_LOG(sql);
1.3 ! misho 402: sqlite3_free(psStmt);
1.2 misho 403: return -1;
1.3 ! misho 404: } else
! 405: sqlite3_free(psStmt);
1.2 misho 406: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
407: ret = sqlite3_changes(sql);
408: else {
409: if (ret > SQLITE_OK && ret < SQLITE_ROW)
410: MQTT_RTLM_LOG(sql);
411: ret = 0;
412: }
413: sqlite3_finalize(stmt);
414:
415: return ret;
416: }
417:
418: /*
419: * mqtt_rtlm_read_topic() Get topic
420: *
421: * @cfg = loaded config
422: * @sql = SQL handle
1.3 ! misho 423: * @connid = connection id
1.2 misho 424: * @topic = topic
425: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
426: * return: NULL error or not found and !=NULL allocated subscribe topics
427: */
428: mqtt_subscr_t *
1.3 ! misho 429: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid,
! 430: const char *topic, char retain)
1.2 misho 431: {
432: int rowz = 0;
1.3 ! misho 433: char *str, szStr[STRSIZ], *psStmt;
1.2 misho 434: sqlite3_stmt *stmt;
435: register int j;
436: mqtt_subscr_t *s = NULL;
1.3 ! misho 437: ait_val_t v;
1.2 misho 438:
439: if (!cfg || !sql || !topic)
440: return NULL;
441:
442: switch (retain) {
443: case -1:
444: memset(szStr, 0, sizeof szStr);
445: break;
446: case 0:
447: snprintf(szStr, sizeof szStr, "AND Retain = 0");
448: break;
449: default:
450: snprintf(szStr, sizeof szStr, "AND Retain > 0");
451: break;
452: }
453:
1.3 ! misho 454: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 455: if (!str) {
456: mqtt_rtlm_log("Error:: not found topics table name");
457: return NULL;
458: }
1.3 ! misho 459: psStmt = sqlite3_mprintf("SELECT QoS, Topic, Value FROM %s WHERE "
! 460: "ConnID LIKE '%q' AND Topic LIKE '%q' %s;",
! 461: str, connid, topic, szStr);
1.2 misho 462:
1.3 ! misho 463: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 464: MQTT_RTLM_LOG(sql);
1.3 ! misho 465: sqlite3_free(psStmt);
1.2 misho 466: return NULL;
1.3 ! misho 467: } else
! 468: sqlite3_free(psStmt);
! 469:
! 470: /* calculate count of rows and allocate subscribe items */
! 471: while (sqlite3_step(stmt) == SQLITE_ROW)
! 472: rowz++;
! 473: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
! 474: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
! 475: goto end;
! 476: } else
! 477: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
! 478: sqlite3_reset(stmt);
! 479:
! 480: /* fill with data */
! 481: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
! 482: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
! 483: s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
! 484: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
! 485: AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
! 486: s[j].sub_value.msg_len = AIT_LEN(&v);
! 487: s[j].sub_value.msg_base = (u_char*) malloc(s[j].sub_value.msg_len);
! 488: if (s[j].sub_value.msg_base)
! 489: memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
! 490: AIT_FREE_VAL(&v);
1.2 misho 491: }
1.3 ! misho 492: end:
! 493: sqlite3_finalize(stmt);
! 494:
! 495: return s;
! 496: }
! 497:
! 498: /*
! 499: * mqtt_rtlm_write_subscribe() Subscribe topic
! 500: *
! 501: * @cfg = loaded config
! 502: * @sql = SQL handle
! 503: * @connid = connection id
! 504: * @msgid = MessageID
! 505: * @topic = topic
! 506: * @user = username
! 507: * @host = hostname
! 508: * @qos = Subscribe QoS
! 509: * return: -1 error, 0 no publish or >0 published ok
! 510: */
! 511: int
! 512: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
! 513: const char *topic, const char *user, const char *host, char qos)
! 514: {
! 515: int ret = 0;
! 516: char *str, *psStmt;
! 517: sqlite3_stmt *stmt;
! 518:
! 519: if (!cfg || !sql || !topic)
! 520: return -1;
! 521:
! 522: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
! 523: if (!str) {
! 524: mqtt_rtlm_log("Error:: not found subscribes table name");
! 525: return -1;
! 526: }
! 527: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
! 528: "PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
! 529: "datetime('now', 'localtime'), '%q');", str,
! 530: connid, msgid, qos, topic, user, host);
! 531:
! 532: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
! 533: MQTT_RTLM_LOG(sql);
! 534: sqlite3_free(psStmt);
! 535: return -1;
! 536: } else
! 537: sqlite3_free(psStmt);
! 538: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
! 539: ret = sqlite3_changes(sql);
! 540: else {
! 541: if (ret > SQLITE_OK && ret < SQLITE_ROW)
! 542: MQTT_RTLM_LOG(sql);
! 543: ret = 0;
! 544: }
! 545: sqlite3_finalize(stmt);
! 546:
! 547: return ret;
! 548: }
! 549:
! 550: /*
! 551: * mqtt_rtlm_delete_subscribe() Delete subscribe
! 552: *
! 553: * @cfg = loaded config
! 554: * @sql = SQL handle
! 555: * @connid = connection id
! 556: * @topic = topic
! 557: * @user = username
! 558: * @host = hostname
! 559: * return: -1 error, 0 no changes or >0 deleted rows
! 560: */
! 561: int
! 562: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid,
! 563: const char *topic, const char *user, const char *host)
! 564: {
! 565: int ret = 0;
! 566: char *str, *psStmt;
! 567: sqlite3_stmt *stmt;
! 568:
! 569: if (!cfg || !sql || !topic)
! 570: return -1;
! 571:
! 572: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
! 573: if (!str) {
! 574: mqtt_rtlm_log("Error:: not found subscribes table name");
! 575: return -1;
! 576: }
! 577: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
! 578: "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str,
! 579: connid, topic, user, host);
! 580:
! 581: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
! 582: MQTT_RTLM_LOG(sql);
! 583: sqlite3_free(psStmt);
! 584: return -1;
! 585: } else
! 586: sqlite3_free(psStmt);
! 587: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
! 588: ret = sqlite3_changes(sql);
! 589: else {
! 590: if (ret > SQLITE_OK && ret < SQLITE_ROW)
! 591: MQTT_RTLM_LOG(sql);
! 592: ret = 0;
! 593: }
! 594: sqlite3_finalize(stmt);
! 595:
! 596: return ret;
! 597: }
! 598:
! 599: /*
! 600: * mqtt_rtlm_read_subscribe() Get subscribe topic
! 601: *
! 602: * @cfg = loaded config
! 603: * @sql = SQL handle
! 604: * @connid = connection id
! 605: * @topic = topic
! 606: * return: NULL error or not found and !=NULL allocated subscribe topics
! 607: */
! 608: mqtt_subscr_t *
! 609: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
! 610: {
! 611: int rowz = 0;
! 612: char *str, *psStmt;
! 613: sqlite3_stmt *stmt;
! 614: register int j;
! 615: mqtt_subscr_t *s = NULL;
! 616:
! 617: if (!cfg || !sql || !topic)
! 618: return NULL;
! 619:
! 620: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
! 621: if (!str) {
! 622: mqtt_rtlm_log("Error:: not found subscribes table name");
! 623: return NULL;
! 624: }
! 625: psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
! 626: "Topic LIKE '%q';", str, connid, topic);
! 627:
! 628: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
! 629: MQTT_RTLM_LOG(sql);
! 630: sqlite3_free(psStmt);
! 631: return NULL;
! 632: } else
! 633: sqlite3_free(psStmt);
1.2 misho 634:
635: /* calculate count of rows and allocate subscribe items */
636: while (sqlite3_step(stmt) == SQLITE_ROW)
637: rowz++;
638: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
639: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
640: goto end;
641: } else
642: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
643: sqlite3_reset(stmt);
644:
645: /* fill with data */
646: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
647: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.3 ! misho 648: s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
! 649: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
! 650: s[j].sub_value.msg_base = NULL;
! 651: s[j].sub_value.msg_len = 0;
1.2 misho 652: }
653: end:
654: sqlite3_finalize(stmt);
655:
656: return s;
657: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>