Annotation of mqtt/src/pubmqtt.c, revision 1.2.2.11
1.2 misho 1: #include "global.h"
2:
3:
4: extern const char sql_schema[];
5:
6:
/*
 * mqtt_rtlm_log() Log database message
 *
 * Forwards the printf-style message to vsyslog() with LOG_ERR priority.
 *
 * @fmt = format string
 * @... = argument list
 * return: none
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	vsyslog(LOG_ERR, fmt, ap);
	va_end(ap);
}
/*
 * MQTT_RTLM_LOG() Log last SQLite error of handle together with caller function and line
 *
 * @_sql = SQL handle, asserted to be non-NULL and evaluated more than once
 */
#define MQTT_RTLM_LOG(_sql)	(assert((_sql)), \
				mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
					__func__, __LINE__, \
					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
26:
/* library pre-loaded actions */

/*
 * _init() Library load hook, initializes the SQLite library
 *
 * return: none
 */
void
_init(void)
{
	sqlite3_initialize();
}
33:
/*
 * _fini() Library unload hook, shuts down the SQLite library
 *
 * return: none
 */
void
_fini(void)
{
	sqlite3_shutdown();
}
39:
1.2 misho 40:
41: /*
42: * mqtt_rtlm_open() Open database connection
43: *
44: * @cfg = config filename
45: * return: NULL error or SQL handle
46: */
47: sqlite3 *
1.2.2.3 misho 48: mqtt_rtlm_open(cfg_root_t *cfg)
1.2 misho 49: {
50: sqlite3 *sql = NULL;
51: const char *str = NULL;
52:
53: if (!cfg)
54: return NULL;
55:
1.2.2.3 misho 56: str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2 misho 57: if (!str) {
58: mqtt_rtlm_log("Error:: Unknown database name ...\n");
59: return NULL;
60: }
61:
62: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
63: MQTT_RTLM_LOG(sql);
64: sqlite3_close(sql);
65: return NULL;
66: }
67:
68: if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
69: MQTT_RTLM_LOG(sql);
70: sqlite3_close(sql);
71: return NULL;
72: }
73: return sql;
74: }
75:
76: /*
77: * mqtt_rtlm_close() Close database connection
78: *
79: * @sql = SQL handle
80: * return: none
81: */
82: void
83: mqtt_rtlm_close(sqlite3 *sql)
84: {
85: sqlite3_close(sql);
86: }
87:
88: /*
89: * mqtt_rtlm_init_session() Create session
90: *
91: * @cfg = loaded config
92: * @sql = SQL handle
93: * @connid = connection id
94: * @user = username
95: * @host = hostname
96: * @will = will flag if !=0 must fill arguments
97: * @... = will arguments in order topic,msg,qos,retain
98: * return: -1 error, 0 session already appears or >0 row changed
99: */
100: int
1.2.2.3 misho 101: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user,
1.2 misho 102: const char *host, char will, ...)
103: {
104: va_list lst;
105: int ret = 0;
1.2.2.11! misho 106: char *str, *psStmt;
1.2 misho 107: sqlite3_stmt *stmt;
108:
109: if (!cfg || !sql)
110: return -1;
111:
1.2.2.3 misho 112: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 113: if (!str) {
114: mqtt_rtlm_log("Error:: not found online table name");
115: return -1;
116: }
117: if (!will)
1.2.2.11! misho 118: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
! 119: "WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
1.2 misho 120: else {
121: va_start(lst, will);
1.2.2.11! misho 122: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
1.2 misho 123: "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
1.2.2.11! misho 124: "VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');",
1.2 misho 125: str, connid, user, host, will,
126: va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
127: va_end(lst);
128: }
129:
1.2.2.11! misho 130: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 131: MQTT_RTLM_LOG(sql);
1.2.2.11! misho 132: sqlite3_free(psStmt);
1.2 misho 133: return -1;
1.2.2.11! misho 134: } else
! 135: sqlite3_free(psStmt);
1.2 misho 136: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
137: ret = sqlite3_changes(sql);
138: else {
139: if (ret > SQLITE_OK && ret < SQLITE_ROW)
140: MQTT_RTLM_LOG(sql);
141: ret = 0;
142: }
143: sqlite3_finalize(stmt);
144:
145: return ret;
146: }
147:
148: /*
149: * mqtt_rtlm_fini_session() Delete session(s)
150: *
151: * @cfg = loaded config
152: * @sql = SQL handle
153: * @connid = connection id
154: * @user = username
155: * @host = hostname
156: * return: -1 error, 0 session already appears or >0 row changed
157: */
158: int
1.2.2.3 misho 159: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2 misho 160: {
161: int ret = 0;
1.2.2.11! misho 162: char *str, *psStmt;
1.2 misho 163: sqlite3_stmt *stmt;
164:
165: if (!cfg || !sql)
166: return -1;
167:
1.2.2.3 misho 168: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 169: if (!str) {
170: mqtt_rtlm_log("Error:: not found online table name");
171: return -1;
172: }
1.2.2.11! misho 173: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
! 174: "AND RemoteHost LIKE '%q';", str, connid, user, host);
1.2 misho 175:
1.2.2.11! misho 176: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 177: MQTT_RTLM_LOG(sql);
1.2.2.11! misho 178: sqlite3_free(psStmt);
1.2 misho 179: return -1;
1.2.2.11! misho 180: } else
! 181: sqlite3_free(psStmt);
1.2 misho 182: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
183: ret = sqlite3_changes(sql);
184: else {
185: if (ret > SQLITE_OK && ret < SQLITE_ROW)
186: MQTT_RTLM_LOG(sql);
187: ret = 0;
188: }
189: sqlite3_finalize(stmt);
190:
191: return ret;
192: }
193:
194: /*
195: * mqtt_rtlm_chk_session() Check session(s)
196: *
197: * @cfg = loaded config
198: * @sql = SQL handle
199: * @connid = connection id
200: * @user = username
201: * @host = hostname
202: * return: -1 error, 0 not logged or >0 logged found rows
203: */
204: int
1.2.2.3 misho 205: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2 misho 206: {
207: int ret = 0;
1.2.2.11! misho 208: char *str, *psStmt;
1.2 misho 209: sqlite3_stmt *stmt;
210:
211: if (!cfg || !sql)
212: return -1;
213:
1.2.2.3 misho 214: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 215: if (!str) {
216: mqtt_rtlm_log("Error:: not found online table name");
217: return -1;
218: }
1.2.2.11! misho 219: psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
! 220: "ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';",
1.2 misho 221: str, connid, user, host);
222:
1.2.2.11! misho 223: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 224: MQTT_RTLM_LOG(sql);
1.2.2.11! misho 225: sqlite3_free(psStmt);
1.2 misho 226: return -1;
1.2.2.11! misho 227: } else
! 228: sqlite3_free(psStmt);
1.2 misho 229: if (sqlite3_step(stmt) == SQLITE_ROW)
230: ret = sqlite3_changes(sql);
231: else
232: ret = 0;
233: sqlite3_finalize(stmt);
234:
235: return ret;
236: }
237:
238: /*
239: * mqtt_rtlm_write_topic() Publish topic
240: *
241: * @cfg = loaded config
242: * @sql = SQL handle
1.2.2.8 misho 243: * @connid = connection id
1.2 misho 244: * @msgid = MessageID
245: * @topic = topic
246: * @txt = text
1.2.2.11! misho 247: * @txtlen = text length
1.2 misho 248: * @user = username
249: * @host = hostname
250: * @retain = !=0 retain message to database
251: * return: -1 error, 0 no publish or >0 published ok
252: */
253: int
1.2.2.8 misho 254: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
1.2.2.11! misho 255: const char *topic, void *txt, int txtlen, const char *user, const char *host, char retain)
1.2 misho 256: {
257: int ret = 0;
1.2.2.11! misho 258: char *str, *psStmt;
1.2 misho 259: sqlite3_stmt *stmt;
260:
261: if (!cfg || !sql || !topic)
262: return -1;
263:
1.2.2.3 misho 264: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 265: if (!str) {
266: mqtt_rtlm_log("Error:: not found topics table name");
267: return -1;
268: }
1.2.2.11! misho 269: psStmt = sqlite3_mprintf("INSERT INTO %s (Retain, ConnID, MsgID, Topic, Value, PubUser, "
! 270: "PubDate, PubHost) VALUES (%d, '%q', %u, '%q', ?1, '%q', "
! 271: "datetime('now', 'localtime'), '%q');",
1.2.2.8 misho 272: str, retain, connid, msgid, topic, txt, user, host);
273:
1.2.2.11! misho 274: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.8 misho 275: MQTT_RTLM_LOG(sql);
1.2.2.11! misho 276: sqlite3_free(psStmt);
! 277: return -1;
! 278: } else
! 279: sqlite3_free(psStmt);
! 280: if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
! 281: MQTT_RTLM_LOG(sql);
! 282: sqlite3_finalize(stmt);
1.2.2.8 misho 283: return -1;
284: }
285: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
286: ret = sqlite3_changes(sql);
287: else {
288: if (ret > SQLITE_OK && ret < SQLITE_ROW)
289: MQTT_RTLM_LOG(sql);
290: ret = 0;
291: }
292: sqlite3_finalize(stmt);
293:
294: return ret;
295: }
296:
297: /*
298: * mqtt_rtlm_wipe_topic() Wipe all topics
299: *
300: * @cfg = loaded config
301: * @sql = SQL handle
302: * @connid = connection id
303: * @user = username
304: * @retain = -1 no matter
305: * return: -1 error, 0 no changes or >0 deleted rows
306: */
307: int
308: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
309: {
310: int ret = 0;
1.2.2.11! misho 311: char *str, *rtn, *psStmt;
1.2.2.8 misho 312: sqlite3_stmt *stmt;
313:
314: if (!cfg || !sql || !connid)
315: return -1;
316:
317: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
318: if (!str) {
319: mqtt_rtlm_log("Error:: not found topics table name");
320: return -1;
321: }
322: switch (retain) {
323: case -1:
324: rtn = "";
325: break;
326: case 0:
327: rtn = "AND Retain = 0";
328: break;
329: default:
330: rtn = "AND Retain != 0";
331: break;
332: }
1.2.2.11! misho 333: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
! 334: "PubUser LIKE '%q' %s;", str, connid, user, rtn);
1.2 misho 335:
1.2.2.11! misho 336: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 337: MQTT_RTLM_LOG(sql);
1.2.2.11! misho 338: sqlite3_free(psStmt);
1.2 misho 339: return -1;
1.2.2.11! misho 340: } else
! 341: sqlite3_free(psStmt);
1.2 misho 342: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
343: ret = sqlite3_changes(sql);
344: else {
345: if (ret > SQLITE_OK && ret < SQLITE_ROW)
346: MQTT_RTLM_LOG(sql);
347: ret = 0;
348: }
349: sqlite3_finalize(stmt);
350:
351: return ret;
352: }
353:
354: /*
355: * mqtt_rtlm_delete_topic() Delete topic
356: *
357: * @cfg = loaded config
358: * @sql = SQL handle
1.2.2.8 misho 359: * @connid = connection id
1.2 misho 360: * @msgid = MessageID
361: * @topic = topic
362: * @user = username
363: * @host = hostname
364: * @retain = -1 no matter
365: * return: -1 error, 0 no changes or >0 deleted rows
366: */
367: int
1.2.2.8 misho 368: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
369: const char *topic, const char *user, const char *host, char retain)
1.2 misho 370: {
371: int ret = 0;
1.2.2.11! misho 372: char *str, *rtn, *psStmt;
1.2 misho 373: sqlite3_stmt *stmt;
374:
375: if (!cfg || !sql || !topic)
376: return -1;
377:
1.2.2.3 misho 378: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 379: if (!str) {
380: mqtt_rtlm_log("Error:: not found topics table name");
381: return -1;
382: }
383: switch (retain) {
384: case -1:
385: rtn = "";
386: break;
387: case 0:
388: rtn = "AND Retain = 0";
389: break;
390: default:
391: rtn = "AND Retain != 0";
392: break;
393: }
1.2.2.11! misho 394: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
! 395: "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str,
1.2.2.8 misho 396: connid, msgid, topic, user, host, rtn);
1.2 misho 397:
1.2.2.11! misho 398: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 399: MQTT_RTLM_LOG(sql);
1.2.2.11! misho 400: sqlite3_free(psStmt);
1.2 misho 401: return -1;
1.2.2.11! misho 402: } else
! 403: sqlite3_free(psStmt);
1.2 misho 404: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
405: ret = sqlite3_changes(sql);
406: else {
407: if (ret > SQLITE_OK && ret < SQLITE_ROW)
408: MQTT_RTLM_LOG(sql);
409: ret = 0;
410: }
411: sqlite3_finalize(stmt);
412:
413: return ret;
414: }
415:
416: /*
417: * mqtt_rtlm_read_topic() Get topic
418: *
419: * @cfg = loaded config
420: * @sql = SQL handle
1.2.2.8 misho 421: * @connid = connection id
1.2 misho 422: * @msgid = MessageID
423: * @topic = topic
424: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
425: * return: NULL error or not found and !=NULL allocated subscribe topics
426: */
427: mqtt_subscr_t *
1.2.2.8 misho 428: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
429: const char *topic, char retain)
1.2 misho 430: {
431: int rowz = 0;
1.2.2.11! misho 432: char *str, szStr[STRSIZ], *psStmt;
1.2 misho 433: sqlite3_stmt *stmt;
434: register int j;
435: mqtt_subscr_t *s = NULL;
1.2.2.11! misho 436: ait_val_t v;
1.2 misho 437:
438: if (!cfg || !sql || !topic)
439: return NULL;
440:
441: switch (retain) {
442: case -1:
443: memset(szStr, 0, sizeof szStr);
444: break;
445: case 0:
446: snprintf(szStr, sizeof szStr, "AND Retain = 0");
447: break;
448: default:
449: snprintf(szStr, sizeof szStr, "AND Retain > 0");
450: break;
451: }
452:
1.2.2.3 misho 453: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 454: if (!str) {
455: mqtt_rtlm_log("Error:: not found topics table name");
456: return NULL;
457: }
1.2.2.11! misho 458: psStmt = sqlite3_mprintf("SELECT Retain, Topic, Value FROM %s WHERE "
! 459: "ConnID = '%q' AND MsgID = %d AND Topic LIKE '%q' %s;",
1.2.2.8 misho 460: str, connid, msgid, topic, szStr);
1.2 misho 461:
1.2.2.11! misho 462: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 463: MQTT_RTLM_LOG(sql);
1.2.2.11! misho 464: sqlite3_free(psStmt);
1.2 misho 465: return NULL;
1.2.2.11! misho 466: } else
! 467: sqlite3_free(psStmt);
1.2 misho 468:
469: /* calculate count of rows and allocate subscribe items */
470: while (sqlite3_step(stmt) == SQLITE_ROW)
471: rowz++;
1.2.2.9 misho 472: if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
1.2 misho 473: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
474: goto end;
475: } else
476: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
477: sqlite3_reset(stmt);
478:
479: /* fill with data */
480: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
481: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.9 misho 482: s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
1.2.2.1 misho 483: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
1.2.2.11! misho 484: AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
! 485: s[j].sub_value.msg_len = AIT_LEN(&v);
! 486: s[j].sub_value.msg_base = (u_char*) io_malloc(s[j].sub_value.msg_len);
! 487: if (s[j].sub_value.msg_base)
! 488: memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
1.2 misho 489: }
490: end:
491: sqlite3_finalize(stmt);
492:
493: return s;
494: }
1.2.2.2 misho 495:
496: /*
497: * mqtt_rtlm_write_subscribe() Subscribe topic
498: *
499: * @cfg = loaded config
500: * @sql = SQL handle
1.2.2.8 misho 501: * @connid = connection id
1.2.2.2 misho 502: * @msgid = MessageID
503: * @topic = topic
504: * @user = username
505: * @host = hostname
506: * @qos = Subscribe QoS
507: * return: -1 error, 0 no publish or >0 published ok
508: */
509: int
1.2.2.8 misho 510: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
511: const char *topic, const char *user, const char *host, char qos)
1.2.2.2 misho 512: {
513: int ret = 0;
1.2.2.11! misho 514: char *str, *psStmt;
1.2.2.2 misho 515: sqlite3_stmt *stmt;
516:
517: if (!cfg || !sql || !topic)
518: return -1;
519:
1.2.2.3 misho 520: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2 misho 521: if (!str) {
522: mqtt_rtlm_log("Error:: not found subscribes table name");
523: return -1;
524: }
1.2.2.11! misho 525: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
! 526: "PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
! 527: "datetime('now', 'localtime'), '%q');", str,
1.2.2.8 misho 528: connid, msgid, qos, topic, user, host);
1.2.2.2 misho 529:
1.2.2.11! misho 530: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.2 misho 531: MQTT_RTLM_LOG(sql);
1.2.2.11! misho 532: sqlite3_free(psStmt);
1.2.2.2 misho 533: return -1;
1.2.2.11! misho 534: } else
! 535: sqlite3_free(psStmt);
1.2.2.2 misho 536: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
537: ret = sqlite3_changes(sql);
538: else {
539: if (ret > SQLITE_OK && ret < SQLITE_ROW)
540: MQTT_RTLM_LOG(sql);
541: ret = 0;
542: }
543: sqlite3_finalize(stmt);
544:
545: return ret;
546: }
547:
548: /*
549: * mqtt_rtlm_delete_subscribe() Delete subscribe
550: *
551: * @cfg = loaded config
552: * @sql = SQL handle
1.2.2.8 misho 553: * @connid = connection id
1.2.2.2 misho 554: * @topic = topic
555: * @user = username
556: * @host = hostname
557: * return: -1 error, 0 no changes or >0 deleted rows
558: */
559: int
1.2.2.8 misho 560: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid,
561: const char *topic, const char *user, const char *host)
1.2.2.2 misho 562: {
563: int ret = 0;
1.2.2.11! misho 564: char *str, *psStmt;
1.2.2.2 misho 565: sqlite3_stmt *stmt;
566:
567: if (!cfg || !sql || !topic)
568: return -1;
569:
1.2.2.3 misho 570: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2 misho 571: if (!str) {
572: mqtt_rtlm_log("Error:: not found subscribes table name");
573: return -1;
574: }
1.2.2.11! misho 575: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
! 576: "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str,
1.2.2.8 misho 577: connid, topic, user, host);
1.2.2.2 misho 578:
1.2.2.11! misho 579: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.2 misho 580: MQTT_RTLM_LOG(sql);
1.2.2.11! misho 581: sqlite3_free(psStmt);
1.2.2.2 misho 582: return -1;
1.2.2.11! misho 583: } else
! 584: sqlite3_free(psStmt);
1.2.2.2 misho 585: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
586: ret = sqlite3_changes(sql);
587: else {
588: if (ret > SQLITE_OK && ret < SQLITE_ROW)
589: MQTT_RTLM_LOG(sql);
590: ret = 0;
591: }
592: sqlite3_finalize(stmt);
593:
594: return ret;
595: }
596:
597: /*
598: * mqtt_rtlm_read_subscribe() Get subscribe topic
599: *
600: * @cfg = loaded config
601: * @sql = SQL handle
1.2.2.8 misho 602: * @connid = connection id
1.2.2.2 misho 603: * @topic = topic
604: * return: NULL error or not found and !=NULL allocated subscribe topics
605: */
606: mqtt_subscr_t *
1.2.2.8 misho 607: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
1.2.2.2 misho 608: {
609: int rowz = 0;
1.2.2.11! misho 610: char *str, *psStmt;
1.2.2.2 misho 611: sqlite3_stmt *stmt;
612: register int j;
613: mqtt_subscr_t *s = NULL;
614:
615: if (!cfg || !sql || !topic)
616: return NULL;
617:
1.2.2.3 misho 618: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2 misho 619: if (!str) {
620: mqtt_rtlm_log("Error:: not found subscribes table name");
621: return NULL;
622: }
1.2.2.11! misho 623: psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
! 624: "Topic LIKE '%q';", str, connid, topic);
1.2.2.2 misho 625:
1.2.2.11! misho 626: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.2 misho 627: MQTT_RTLM_LOG(sql);
1.2.2.11! misho 628: sqlite3_free(psStmt);
1.2.2.2 misho 629: return NULL;
1.2.2.11! misho 630: } else
! 631: sqlite3_free(psStmt);
1.2.2.2 misho 632:
633: /* calculate count of rows and allocate subscribe items */
634: while (sqlite3_step(stmt) == SQLITE_ROW)
635: rowz++;
1.2.2.9 misho 636: if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
1.2.2.2 misho 637: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
638: goto end;
639: } else
640: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
641: sqlite3_reset(stmt);
642:
643: /* fill with data */
644: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
645: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.9 misho 646: s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
1.2.2.2 misho 647: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
648: s[j].sub_value.msg_base = NULL;
649: s[j].sub_value.msg_len = 0;
650: }
651: end:
652: sqlite3_finalize(stmt);
653:
654: return s;
655: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>