Annotation of mqtt/src/pubmqtt.c, revision 1.2.2.14
1.2 misho 1: #include "global.h"
2:
3:
/*
 * SQL schema script; only declared here (definition is not visible in this file).
 * mqtt_rtlm_open() executes it each time a database is opened.
 */
extern const char sql_schema[];
5:
6:
/*
 * mqtt_rtlm_log() Log database connection message
 *
 * @fmt = printf-like format string
 * @... = arguments consumed by the format string
 * return: none
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list args;

	/* every message is sent to syslog with LOG_ERR priority */
	va_start(args, fmt);
	vsyslog(LOG_ERR, fmt, args);
	va_end(args);
}
/*
 * MQTT_RTLM_LOG() Log last SQLite error of a database handle
 *
 * Asserts that @_sql is not NULL and then reports the calling function,
 * source line, SQLite error code and error text through mqtt_rtlm_log().
 * The argument is expanded more than once, pass a plain handle variable.
 */
#define MQTT_RTLM_LOG(_sql)	(assert((_sql)), \
				mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
					__func__, __LINE__, \
					sqlite3_errcode((_sql)), \
					sqlite3_errmsg((_sql))))
26:
/*
 * _init() Library pre-loaded action
 *
 * Initializes the SQLite library; the result code is discarded.
 */
void
_init(void)
{
	(void) sqlite3_initialize();
}
33:
/*
 * _fini() Library unload action
 *
 * Shuts down the SQLite library; the result code is discarded.
 */
void
_fini(void)
{
	(void) sqlite3_shutdown();
}
39:
1.2 misho 40:
41: /*
42: * mqtt_rtlm_open() Open database connection
43: *
44: * @cfg = config filename
45: * return: NULL error or SQL handle
46: */
47: sqlite3 *
1.2.2.3 misho 48: mqtt_rtlm_open(cfg_root_t *cfg)
1.2 misho 49: {
50: sqlite3 *sql = NULL;
51: const char *str = NULL;
52:
53: if (!cfg)
54: return NULL;
55:
1.2.2.3 misho 56: str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2 misho 57: if (!str) {
58: mqtt_rtlm_log("Error:: Unknown database name ...\n");
59: return NULL;
60: }
61:
62: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
63: MQTT_RTLM_LOG(sql);
64: sqlite3_close(sql);
65: return NULL;
66: }
67:
68: if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
69: MQTT_RTLM_LOG(sql);
70: sqlite3_close(sql);
71: return NULL;
72: }
73: return sql;
74: }
75:
76: /*
77: * mqtt_rtlm_close() Close database connection
78: *
79: * @sql = SQL handle
80: * return: none
81: */
82: void
83: mqtt_rtlm_close(sqlite3 *sql)
84: {
85: sqlite3_close(sql);
86: }
87:
88: /*
89: * mqtt_rtlm_init_session() Create session
90: *
91: * @cfg = loaded config
92: * @sql = SQL handle
93: * @connid = connection id
94: * @user = username
95: * @host = hostname
96: * @will = will flag if !=0 must fill arguments
97: * @... = will arguments in order topic,msg,qos,retain
98: * return: -1 error, 0 session already appears or >0 row changed
99: */
100: int
1.2.2.3 misho 101: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user,
1.2 misho 102: const char *host, char will, ...)
103: {
104: va_list lst;
105: int ret = 0;
1.2.2.11 misho 106: char *str, *psStmt;
1.2 misho 107: sqlite3_stmt *stmt;
108:
109: if (!cfg || !sql)
110: return -1;
111:
1.2.2.3 misho 112: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 113: if (!str) {
114: mqtt_rtlm_log("Error:: not found online table name");
115: return -1;
116: }
117: if (!will)
1.2.2.11 misho 118: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
119: "WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
1.2 misho 120: else {
121: va_start(lst, will);
1.2.2.11 misho 122: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
1.2 misho 123: "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
1.2.2.11 misho 124: "VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');",
1.2 misho 125: str, connid, user, host, will,
126: va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
127: va_end(lst);
128: }
129:
1.2.2.11 misho 130: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 131: MQTT_RTLM_LOG(sql);
1.2.2.11 misho 132: sqlite3_free(psStmt);
1.2 misho 133: return -1;
1.2.2.11 misho 134: } else
135: sqlite3_free(psStmt);
1.2 misho 136: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
137: ret = sqlite3_changes(sql);
138: else {
139: if (ret > SQLITE_OK && ret < SQLITE_ROW)
140: MQTT_RTLM_LOG(sql);
141: ret = 0;
142: }
143: sqlite3_finalize(stmt);
144:
145: return ret;
146: }
147:
148: /*
149: * mqtt_rtlm_fini_session() Delete session(s)
150: *
151: * @cfg = loaded config
152: * @sql = SQL handle
153: * @connid = connection id
154: * @user = username
155: * @host = hostname
156: * return: -1 error, 0 session already appears or >0 row changed
157: */
158: int
1.2.2.3 misho 159: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2 misho 160: {
161: int ret = 0;
1.2.2.11 misho 162: char *str, *psStmt;
1.2 misho 163: sqlite3_stmt *stmt;
164:
165: if (!cfg || !sql)
166: return -1;
167:
1.2.2.3 misho 168: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 169: if (!str) {
170: mqtt_rtlm_log("Error:: not found online table name");
171: return -1;
172: }
1.2.2.11 misho 173: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
174: "AND RemoteHost LIKE '%q';", str, connid, user, host);
1.2 misho 175:
1.2.2.11 misho 176: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 177: MQTT_RTLM_LOG(sql);
1.2.2.11 misho 178: sqlite3_free(psStmt);
1.2 misho 179: return -1;
1.2.2.11 misho 180: } else
181: sqlite3_free(psStmt);
1.2 misho 182: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
183: ret = sqlite3_changes(sql);
184: else {
185: if (ret > SQLITE_OK && ret < SQLITE_ROW)
186: MQTT_RTLM_LOG(sql);
187: ret = 0;
188: }
189: sqlite3_finalize(stmt);
190:
191: return ret;
192: }
193:
194: /*
195: * mqtt_rtlm_chk_session() Check session(s)
196: *
197: * @cfg = loaded config
198: * @sql = SQL handle
199: * @connid = connection id
200: * @user = username
201: * @host = hostname
202: * return: -1 error, 0 not logged or >0 logged found rows
203: */
204: int
1.2.2.3 misho 205: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2 misho 206: {
207: int ret = 0;
1.2.2.11 misho 208: char *str, *psStmt;
1.2 misho 209: sqlite3_stmt *stmt;
210:
211: if (!cfg || !sql)
212: return -1;
213:
1.2.2.3 misho 214: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 215: if (!str) {
216: mqtt_rtlm_log("Error:: not found online table name");
217: return -1;
218: }
1.2.2.11 misho 219: psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
220: "ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';",
1.2 misho 221: str, connid, user, host);
222:
1.2.2.11 misho 223: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 224: MQTT_RTLM_LOG(sql);
1.2.2.11 misho 225: sqlite3_free(psStmt);
1.2 misho 226: return -1;
1.2.2.11 misho 227: } else
228: sqlite3_free(psStmt);
1.2 misho 229: if (sqlite3_step(stmt) == SQLITE_ROW)
230: ret = sqlite3_changes(sql);
231: else
232: ret = 0;
233: sqlite3_finalize(stmt);
234:
235: return ret;
236: }
237:
238: /*
239: * mqtt_rtlm_write_topic() Publish topic
240: *
241: * @cfg = loaded config
242: * @sql = SQL handle
1.2.2.8 misho 243: * @connid = connection id
1.2 misho 244: * @msgid = MessageID
245: * @topic = topic
246: * @txt = text
1.2.2.11 misho 247: * @txtlen = text length
1.2 misho 248: * @user = username
249: * @host = hostname
1.2.2.12 misho 250: * @qos = QoS
1.2 misho 251: * @retain = !=0 retain message to database
252: * return: -1 error, 0 no publish or >0 published ok
253: */
254: int
1.2.2.8 misho 255: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
1.2.2.12 misho 256: const char *topic, void *txt, int txtlen, const char *user,
257: const char *host, char qos, char retain)
1.2 misho 258: {
259: int ret = 0;
1.2.2.11 misho 260: char *str, *psStmt;
1.2 misho 261: sqlite3_stmt *stmt;
262:
263: if (!cfg || !sql || !topic)
264: return -1;
265:
1.2.2.3 misho 266: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 267: if (!str) {
268: mqtt_rtlm_log("Error:: not found topics table name");
269: return -1;
270: }
1.2.2.12 misho 271: psStmt = sqlite3_mprintf("INSERT INTO %s (QoS, Retain, ConnID, MsgID, Topic, Value, PubUser, "
272: "PubDate, PubHost) VALUES (%d, %d, '%q', %u, '%q', ?1, '%q', "
1.2.2.11 misho 273: "datetime('now', 'localtime'), '%q');",
1.2.2.12 misho 274: str, qos, retain, connid, msgid, topic, user, host);
1.2.2.8 misho 275:
1.2.2.12 misho 276: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL) || !stmt) {
1.2.2.8 misho 277: MQTT_RTLM_LOG(sql);
1.2.2.11 misho 278: sqlite3_free(psStmt);
279: return -1;
280: } else
281: sqlite3_free(psStmt);
282: if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
283: MQTT_RTLM_LOG(sql);
284: sqlite3_finalize(stmt);
1.2.2.8 misho 285: return -1;
286: }
287: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
288: ret = sqlite3_changes(sql);
289: else {
290: if (ret > SQLITE_OK && ret < SQLITE_ROW)
291: MQTT_RTLM_LOG(sql);
292: ret = 0;
293: }
294: sqlite3_finalize(stmt);
295:
296: return ret;
297: }
298:
299: /*
300: * mqtt_rtlm_wipe_topic() Wipe all topics
301: *
302: * @cfg = loaded config
303: * @sql = SQL handle
304: * @connid = connection id
305: * @user = username
306: * @retain = -1 no matter
307: * return: -1 error, 0 no changes or >0 deleted rows
308: */
309: int
310: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
311: {
312: int ret = 0;
1.2.2.11 misho 313: char *str, *rtn, *psStmt;
1.2.2.8 misho 314: sqlite3_stmt *stmt;
315:
316: if (!cfg || !sql || !connid)
317: return -1;
318:
319: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
320: if (!str) {
321: mqtt_rtlm_log("Error:: not found topics table name");
322: return -1;
323: }
324: switch (retain) {
325: case -1:
326: rtn = "";
327: break;
328: case 0:
329: rtn = "AND Retain = 0";
330: break;
331: default:
332: rtn = "AND Retain != 0";
333: break;
334: }
1.2.2.11 misho 335: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
336: "PubUser LIKE '%q' %s;", str, connid, user, rtn);
1.2 misho 337:
1.2.2.11 misho 338: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 339: MQTT_RTLM_LOG(sql);
1.2.2.11 misho 340: sqlite3_free(psStmt);
1.2 misho 341: return -1;
1.2.2.11 misho 342: } else
343: sqlite3_free(psStmt);
1.2 misho 344: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
345: ret = sqlite3_changes(sql);
346: else {
347: if (ret > SQLITE_OK && ret < SQLITE_ROW)
348: MQTT_RTLM_LOG(sql);
349: ret = 0;
350: }
351: sqlite3_finalize(stmt);
352:
353: return ret;
354: }
355:
356: /*
357: * mqtt_rtlm_delete_topic() Delete topic
358: *
359: * @cfg = loaded config
360: * @sql = SQL handle
1.2.2.8 misho 361: * @connid = connection id
1.2 misho 362: * @msgid = MessageID
363: * @topic = topic
364: * @user = username
365: * @host = hostname
366: * @retain = -1 no matter
367: * return: -1 error, 0 no changes or >0 deleted rows
368: */
369: int
1.2.2.8 misho 370: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
371: const char *topic, const char *user, const char *host, char retain)
1.2 misho 372: {
373: int ret = 0;
1.2.2.11 misho 374: char *str, *rtn, *psStmt;
1.2 misho 375: sqlite3_stmt *stmt;
376:
377: if (!cfg || !sql || !topic)
378: return -1;
379:
1.2.2.3 misho 380: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 381: if (!str) {
382: mqtt_rtlm_log("Error:: not found topics table name");
383: return -1;
384: }
385: switch (retain) {
386: case -1:
387: rtn = "";
388: break;
389: case 0:
390: rtn = "AND Retain = 0";
391: break;
392: default:
393: rtn = "AND Retain != 0";
394: break;
395: }
1.2.2.11 misho 396: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
397: "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str,
1.2.2.8 misho 398: connid, msgid, topic, user, host, rtn);
1.2 misho 399:
1.2.2.11 misho 400: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 401: MQTT_RTLM_LOG(sql);
1.2.2.11 misho 402: sqlite3_free(psStmt);
1.2 misho 403: return -1;
1.2.2.11 misho 404: } else
405: sqlite3_free(psStmt);
1.2 misho 406: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
407: ret = sqlite3_changes(sql);
408: else {
409: if (ret > SQLITE_OK && ret < SQLITE_ROW)
410: MQTT_RTLM_LOG(sql);
411: ret = 0;
412: }
413: sqlite3_finalize(stmt);
414:
415: return ret;
416: }
417:
418: /*
419: * mqtt_rtlm_read_topic() Get topic
420: *
421: * @cfg = loaded config
422: * @sql = SQL handle
1.2.2.8 misho 423: * @connid = connection id
1.2 misho 424: * @topic = topic
425: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
426: * return: NULL error or not found and !=NULL allocated subscribe topics
427: */
428: mqtt_subscr_t *
1.2.2.14! misho 429: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid,
1.2.2.8 misho 430: const char *topic, char retain)
1.2 misho 431: {
432: int rowz = 0;
1.2.2.11 misho 433: char *str, szStr[STRSIZ], *psStmt;
1.2 misho 434: sqlite3_stmt *stmt;
435: register int j;
436: mqtt_subscr_t *s = NULL;
1.2.2.11 misho 437: ait_val_t v;
1.2 misho 438:
439: if (!cfg || !sql || !topic)
440: return NULL;
441:
442: switch (retain) {
443: case -1:
444: memset(szStr, 0, sizeof szStr);
445: break;
446: case 0:
447: snprintf(szStr, sizeof szStr, "AND Retain = 0");
448: break;
449: default:
450: snprintf(szStr, sizeof szStr, "AND Retain > 0");
451: break;
452: }
453:
1.2.2.3 misho 454: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 455: if (!str) {
456: mqtt_rtlm_log("Error:: not found topics table name");
457: return NULL;
458: }
1.2.2.14! misho 459: psStmt = sqlite3_mprintf("SELECT QoS, Topic, Value FROM %s WHERE "
! 460: "ConnID LIKE '%q' AND Topic LIKE '%q' %s;",
! 461: str, connid, topic, szStr);
1.2 misho 462:
1.2.2.11 misho 463: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 464: MQTT_RTLM_LOG(sql);
1.2.2.11 misho 465: sqlite3_free(psStmt);
1.2 misho 466: return NULL;
1.2.2.11 misho 467: } else
468: sqlite3_free(psStmt);
1.2 misho 469:
470: /* calculate count of rows and allocate subscribe items */
471: while (sqlite3_step(stmt) == SQLITE_ROW)
472: rowz++;
1.2.2.13 misho 473: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
1.2 misho 474: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
475: goto end;
476: } else
477: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
478: sqlite3_reset(stmt);
479:
480: /* fill with data */
481: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
482: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.13 misho 483: s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
1.2.2.1 misho 484: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
1.2.2.11 misho 485: AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
486: s[j].sub_value.msg_len = AIT_LEN(&v);
1.2.2.13 misho 487: s[j].sub_value.msg_base = (u_char*) malloc(s[j].sub_value.msg_len);
1.2.2.11 misho 488: if (s[j].sub_value.msg_base)
489: memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
1.2.2.14! misho 490: AIT_FREE_VAL(&v);
1.2 misho 491: }
492: end:
493: sqlite3_finalize(stmt);
494:
495: return s;
496: }
1.2.2.2 misho 497:
498: /*
499: * mqtt_rtlm_write_subscribe() Subscribe topic
500: *
501: * @cfg = loaded config
502: * @sql = SQL handle
1.2.2.8 misho 503: * @connid = connection id
1.2.2.2 misho 504: * @msgid = MessageID
505: * @topic = topic
506: * @user = username
507: * @host = hostname
508: * @qos = Subscribe QoS
509: * return: -1 error, 0 no publish or >0 published ok
510: */
511: int
1.2.2.8 misho 512: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
513: const char *topic, const char *user, const char *host, char qos)
1.2.2.2 misho 514: {
515: int ret = 0;
1.2.2.11 misho 516: char *str, *psStmt;
1.2.2.2 misho 517: sqlite3_stmt *stmt;
518:
519: if (!cfg || !sql || !topic)
520: return -1;
521:
1.2.2.3 misho 522: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2 misho 523: if (!str) {
524: mqtt_rtlm_log("Error:: not found subscribes table name");
525: return -1;
526: }
1.2.2.11 misho 527: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
528: "PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
529: "datetime('now', 'localtime'), '%q');", str,
1.2.2.8 misho 530: connid, msgid, qos, topic, user, host);
1.2.2.2 misho 531:
1.2.2.11 misho 532: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.2 misho 533: MQTT_RTLM_LOG(sql);
1.2.2.11 misho 534: sqlite3_free(psStmt);
1.2.2.2 misho 535: return -1;
1.2.2.11 misho 536: } else
537: sqlite3_free(psStmt);
1.2.2.2 misho 538: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
539: ret = sqlite3_changes(sql);
540: else {
541: if (ret > SQLITE_OK && ret < SQLITE_ROW)
542: MQTT_RTLM_LOG(sql);
543: ret = 0;
544: }
545: sqlite3_finalize(stmt);
546:
547: return ret;
548: }
549:
550: /*
551: * mqtt_rtlm_delete_subscribe() Delete subscribe
552: *
553: * @cfg = loaded config
554: * @sql = SQL handle
1.2.2.8 misho 555: * @connid = connection id
1.2.2.2 misho 556: * @topic = topic
557: * @user = username
558: * @host = hostname
559: * return: -1 error, 0 no changes or >0 deleted rows
560: */
561: int
1.2.2.8 misho 562: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid,
563: const char *topic, const char *user, const char *host)
1.2.2.2 misho 564: {
565: int ret = 0;
1.2.2.11 misho 566: char *str, *psStmt;
1.2.2.2 misho 567: sqlite3_stmt *stmt;
568:
569: if (!cfg || !sql || !topic)
570: return -1;
571:
1.2.2.3 misho 572: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2 misho 573: if (!str) {
574: mqtt_rtlm_log("Error:: not found subscribes table name");
575: return -1;
576: }
1.2.2.11 misho 577: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
578: "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str,
1.2.2.8 misho 579: connid, topic, user, host);
1.2.2.2 misho 580:
1.2.2.11 misho 581: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.2 misho 582: MQTT_RTLM_LOG(sql);
1.2.2.11 misho 583: sqlite3_free(psStmt);
1.2.2.2 misho 584: return -1;
1.2.2.11 misho 585: } else
586: sqlite3_free(psStmt);
1.2.2.2 misho 587: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
588: ret = sqlite3_changes(sql);
589: else {
590: if (ret > SQLITE_OK && ret < SQLITE_ROW)
591: MQTT_RTLM_LOG(sql);
592: ret = 0;
593: }
594: sqlite3_finalize(stmt);
595:
596: return ret;
597: }
598:
599: /*
600: * mqtt_rtlm_read_subscribe() Get subscribe topic
601: *
602: * @cfg = loaded config
603: * @sql = SQL handle
1.2.2.8 misho 604: * @connid = connection id
1.2.2.2 misho 605: * @topic = topic
606: * return: NULL error or not found and !=NULL allocated subscribe topics
607: */
608: mqtt_subscr_t *
1.2.2.8 misho 609: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
1.2.2.2 misho 610: {
611: int rowz = 0;
1.2.2.11 misho 612: char *str, *psStmt;
1.2.2.2 misho 613: sqlite3_stmt *stmt;
614: register int j;
615: mqtt_subscr_t *s = NULL;
616:
617: if (!cfg || !sql || !topic)
618: return NULL;
619:
1.2.2.3 misho 620: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2 misho 621: if (!str) {
622: mqtt_rtlm_log("Error:: not found subscribes table name");
623: return NULL;
624: }
1.2.2.11 misho 625: psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
626: "Topic LIKE '%q';", str, connid, topic);
1.2.2.2 misho 627:
1.2.2.11 misho 628: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.2 misho 629: MQTT_RTLM_LOG(sql);
1.2.2.11 misho 630: sqlite3_free(psStmt);
1.2.2.2 misho 631: return NULL;
1.2.2.11 misho 632: } else
633: sqlite3_free(psStmt);
1.2.2.2 misho 634:
635: /* calculate count of rows and allocate subscribe items */
636: while (sqlite3_step(stmt) == SQLITE_ROW)
637: rowz++;
1.2.2.13 misho 638: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
1.2.2.2 misho 639: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
640: goto end;
641: } else
642: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
643: sqlite3_reset(stmt);
644:
645: /* fill with data */
646: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
647: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.13 misho 648: s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
1.2.2.2 misho 649: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
650: s[j].sub_value.msg_base = NULL;
651: s[j].sub_value.msg_len = 0;
652: }
653: end:
654: sqlite3_finalize(stmt);
655:
656: return s;
657: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>