1: #include "global.h"
2:
3:
4: extern const char sql_schema[];
5:
6:
/*
 * mqtt_rtlm_log() Log database connection message
 *
 * Formats the message printf-like and sends it to syslog with LOG_ERR priority.
 *
 * @fmt = format string
 * @... = argument list
 * return: none
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list args;

	va_start(args, fmt);
	vsyslog(LOG_ERR, fmt, args);
	va_end(args);
}

/*
 * MQTT_RTLM_LOG() Log last SQLite error of handle
 *
 * Asserts that @_sql is not NULL and logs the calling function, source line,
 * SQLite error code and SQLite error message of the handle.
 * Note: @_sql is evaluated more than once, pass a plain variable.
 *
 * @_sql = SQL handle
 */
#define MQTT_RTLM_LOG(_sql)	(assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
					__func__, __LINE__, \
					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
26:
/*
 * _init() Library pre-loaded action, initializes SQLite library
 *
 * return: none
 */
void
_init()
{
	/* result is intentionally not examined here */
	(void) sqlite3_initialize();
}
33:
/*
 * _fini() Library unload action, shuts down SQLite library
 *
 * return: none
 */
void
_fini()
{
	/* result is intentionally not examined here */
	(void) sqlite3_shutdown();
}
39:
40:
41: /*
42: * mqtt_rtlm_open() Open database connection
43: *
44: * @cfg = config filename
45: * return: NULL error or SQL handle
46: */
47: sqlite3 *
48: mqtt_rtlm_open(cfg_root_t *cfg)
49: {
50: sqlite3 *sql = NULL;
51: const char *str = NULL;
52:
53: if (!cfg)
54: return NULL;
55:
56: str = cfg_getAttribute(cfg, "mqtt_pub", "name");
57: if (!str) {
58: mqtt_rtlm_log("Error:: Unknown database name ...\n");
59: return NULL;
60: }
61:
62: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
63: MQTT_RTLM_LOG(sql);
64: sqlite3_close(sql);
65: return NULL;
66: }
67:
68: if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
69: MQTT_RTLM_LOG(sql);
70: sqlite3_close(sql);
71: return NULL;
72: }
73: return sql;
74: }
75:
76: /*
77: * mqtt_rtlm_close() Close database connection
78: *
79: * @sql = SQL handle
80: * return: none
81: */
82: void
83: mqtt_rtlm_close(sqlite3 *sql)
84: {
85: sqlite3_close(sql);
86: }
87:
88: /*
89: * mqtt_rtlm_init_session() Create session
90: *
91: * @cfg = loaded config
92: * @sql = SQL handle
93: * @connid = connection id
94: * @user = username
95: * @host = hostname
96: * @will = will flag if !=0 must fill arguments
97: * @... = will arguments in order topic,msg,qos,retain
98: * return: -1 error, 0 session already appears or >0 row changed
99: */
100: int
101: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user,
102: const char *host, char will, ...)
103: {
104: va_list lst;
105: int ret = 0;
106: char *str, *psStmt;
107: sqlite3_stmt *stmt;
108:
109: if (!cfg || !sql)
110: return -1;
111:
112: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
113: if (!str) {
114: mqtt_rtlm_log("Error:: not found online table name");
115: return -1;
116: }
117: if (!will)
118: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
119: "WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
120: else {
121: va_start(lst, will);
122: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
123: "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
124: "VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');",
125: str, connid, user, host, will,
126: va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
127: va_end(lst);
128: }
129:
130: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
131: MQTT_RTLM_LOG(sql);
132: sqlite3_free(psStmt);
133: return -1;
134: } else
135: sqlite3_free(psStmt);
136: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
137: ret = sqlite3_changes(sql);
138: else {
139: if (ret > SQLITE_OK && ret < SQLITE_ROW)
140: MQTT_RTLM_LOG(sql);
141: ret = 0;
142: }
143: sqlite3_finalize(stmt);
144:
145: return ret;
146: }
147:
148: /*
149: * mqtt_rtlm_fini_session() Delete session(s)
150: *
151: * @cfg = loaded config
152: * @sql = SQL handle
153: * @connid = connection id
154: * @user = username
155: * @host = hostname
156: * return: -1 error, 0 session already appears or >0 row changed
157: */
158: int
159: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
160: {
161: int ret = 0;
162: char *str, *psStmt;
163: sqlite3_stmt *stmt;
164:
165: if (!cfg || !sql)
166: return -1;
167:
168: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
169: if (!str) {
170: mqtt_rtlm_log("Error:: not found online table name");
171: return -1;
172: }
173: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
174: "AND RemoteHost LIKE '%q';", str, connid, user, host);
175:
176: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
177: MQTT_RTLM_LOG(sql);
178: sqlite3_free(psStmt);
179: return -1;
180: } else
181: sqlite3_free(psStmt);
182: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
183: ret = sqlite3_changes(sql);
184: else {
185: if (ret > SQLITE_OK && ret < SQLITE_ROW)
186: MQTT_RTLM_LOG(sql);
187: ret = 0;
188: }
189: sqlite3_finalize(stmt);
190:
191: return ret;
192: }
193:
194: /*
195: * mqtt_rtlm_chk_session() Check session(s)
196: *
197: * @cfg = loaded config
198: * @sql = SQL handle
199: * @connid = connection id
200: * @user = username
201: * @host = hostname
202: * return: -1 error, 0 not logged or >0 logged found rows
203: */
204: int
205: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
206: {
207: int ret = 0;
208: char *str, *psStmt;
209: sqlite3_stmt *stmt;
210:
211: if (!cfg || !sql)
212: return -1;
213:
214: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
215: if (!str) {
216: mqtt_rtlm_log("Error:: not found online table name");
217: return -1;
218: }
219: psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
220: "ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';",
221: str, connid, user, host);
222:
223: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
224: MQTT_RTLM_LOG(sql);
225: sqlite3_free(psStmt);
226: return -1;
227: } else
228: sqlite3_free(psStmt);
229: if (sqlite3_step(stmt) == SQLITE_ROW)
230: ret = sqlite3_changes(sql);
231: else
232: ret = 0;
233: sqlite3_finalize(stmt);
234:
235: return ret;
236: }
237:
238: /*
239: * mqtt_rtlm_write_topic() Publish topic
240: *
241: * @cfg = loaded config
242: * @sql = SQL handle
243: * @connid = connection id
244: * @msgid = MessageID
245: * @topic = topic
246: * @txt = text
247: * @txtlen = text length
248: * @user = username
249: * @host = hostname
250: * @retain = !=0 retain message to database
251: * return: -1 error, 0 no publish or >0 published ok
252: */
253: int
254: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
255: const char *topic, void *txt, int txtlen, const char *user, const char *host, char retain)
256: {
257: int ret = 0;
258: char *str, *psStmt;
259: sqlite3_stmt *stmt;
260:
261: if (!cfg || !sql || !topic)
262: return -1;
263:
264: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
265: if (!str) {
266: mqtt_rtlm_log("Error:: not found topics table name");
267: return -1;
268: }
269: psStmt = sqlite3_mprintf("INSERT INTO %s (Retain, ConnID, MsgID, Topic, Value, PubUser, "
270: "PubDate, PubHost) VALUES (%d, '%q', %u, '%q', ?1, '%q', "
271: "datetime('now', 'localtime'), '%q');",
272: str, retain, connid, msgid, topic, txt, user, host);
273:
274: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
275: MQTT_RTLM_LOG(sql);
276: sqlite3_free(psStmt);
277: return -1;
278: } else
279: sqlite3_free(psStmt);
280: if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
281: MQTT_RTLM_LOG(sql);
282: sqlite3_finalize(stmt);
283: return -1;
284: }
285: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
286: ret = sqlite3_changes(sql);
287: else {
288: if (ret > SQLITE_OK && ret < SQLITE_ROW)
289: MQTT_RTLM_LOG(sql);
290: ret = 0;
291: }
292: sqlite3_finalize(stmt);
293:
294: return ret;
295: }
296:
297: /*
298: * mqtt_rtlm_wipe_topic() Wipe all topics
299: *
300: * @cfg = loaded config
301: * @sql = SQL handle
302: * @connid = connection id
303: * @user = username
304: * @retain = -1 no matter
305: * return: -1 error, 0 no changes or >0 deleted rows
306: */
307: int
308: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
309: {
310: int ret = 0;
311: char *str, *rtn, *psStmt;
312: sqlite3_stmt *stmt;
313:
314: if (!cfg || !sql || !connid)
315: return -1;
316:
317: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
318: if (!str) {
319: mqtt_rtlm_log("Error:: not found topics table name");
320: return -1;
321: }
322: switch (retain) {
323: case -1:
324: rtn = "";
325: break;
326: case 0:
327: rtn = "AND Retain = 0";
328: break;
329: default:
330: rtn = "AND Retain != 0";
331: break;
332: }
333: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
334: "PubUser LIKE '%q' %s;", str, connid, user, rtn);
335:
336: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
337: MQTT_RTLM_LOG(sql);
338: sqlite3_free(psStmt);
339: return -1;
340: } else
341: sqlite3_free(psStmt);
342: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
343: ret = sqlite3_changes(sql);
344: else {
345: if (ret > SQLITE_OK && ret < SQLITE_ROW)
346: MQTT_RTLM_LOG(sql);
347: ret = 0;
348: }
349: sqlite3_finalize(stmt);
350:
351: return ret;
352: }
353:
354: /*
355: * mqtt_rtlm_delete_topic() Delete topic
356: *
357: * @cfg = loaded config
358: * @sql = SQL handle
359: * @connid = connection id
360: * @msgid = MessageID
361: * @topic = topic
362: * @user = username
363: * @host = hostname
364: * @retain = -1 no matter
365: * return: -1 error, 0 no changes or >0 deleted rows
366: */
367: int
368: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
369: const char *topic, const char *user, const char *host, char retain)
370: {
371: int ret = 0;
372: char *str, *rtn, *psStmt;
373: sqlite3_stmt *stmt;
374:
375: if (!cfg || !sql || !topic)
376: return -1;
377:
378: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
379: if (!str) {
380: mqtt_rtlm_log("Error:: not found topics table name");
381: return -1;
382: }
383: switch (retain) {
384: case -1:
385: rtn = "";
386: break;
387: case 0:
388: rtn = "AND Retain = 0";
389: break;
390: default:
391: rtn = "AND Retain != 0";
392: break;
393: }
394: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
395: "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str,
396: connid, msgid, topic, user, host, rtn);
397:
398: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
399: MQTT_RTLM_LOG(sql);
400: sqlite3_free(psStmt);
401: return -1;
402: } else
403: sqlite3_free(psStmt);
404: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
405: ret = sqlite3_changes(sql);
406: else {
407: if (ret > SQLITE_OK && ret < SQLITE_ROW)
408: MQTT_RTLM_LOG(sql);
409: ret = 0;
410: }
411: sqlite3_finalize(stmt);
412:
413: return ret;
414: }
415:
416: /*
417: * mqtt_rtlm_read_topic() Get topic
418: *
419: * @cfg = loaded config
420: * @sql = SQL handle
421: * @connid = connection id
422: * @msgid = MessageID
423: * @topic = topic
424: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
425: * return: NULL error or not found and !=NULL allocated subscribe topics
426: */
427: mqtt_subscr_t *
428: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
429: const char *topic, char retain)
430: {
431: int rowz = 0;
432: char *str, szStr[STRSIZ], *psStmt;
433: sqlite3_stmt *stmt;
434: register int j;
435: mqtt_subscr_t *s = NULL;
436: ait_val_t v;
437:
438: if (!cfg || !sql || !topic)
439: return NULL;
440:
441: switch (retain) {
442: case -1:
443: memset(szStr, 0, sizeof szStr);
444: break;
445: case 0:
446: snprintf(szStr, sizeof szStr, "AND Retain = 0");
447: break;
448: default:
449: snprintf(szStr, sizeof szStr, "AND Retain > 0");
450: break;
451: }
452:
453: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
454: if (!str) {
455: mqtt_rtlm_log("Error:: not found topics table name");
456: return NULL;
457: }
458: psStmt = sqlite3_mprintf("SELECT Retain, Topic, Value FROM %s WHERE "
459: "ConnID = '%q' AND MsgID = %d AND Topic LIKE '%q' %s;",
460: str, connid, msgid, topic, szStr);
461:
462: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
463: MQTT_RTLM_LOG(sql);
464: sqlite3_free(psStmt);
465: return NULL;
466: } else
467: sqlite3_free(psStmt);
468:
469: /* calculate count of rows and allocate subscribe items */
470: while (sqlite3_step(stmt) == SQLITE_ROW)
471: rowz++;
472: if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
473: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
474: goto end;
475: } else
476: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
477: sqlite3_reset(stmt);
478:
479: /* fill with data */
480: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
481: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
482: s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
483: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
484: AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
485: s[j].sub_value.msg_len = AIT_LEN(&v);
486: s[j].sub_value.msg_base = (u_char*) io_malloc(s[j].sub_value.msg_len);
487: if (s[j].sub_value.msg_base)
488: memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
489: }
490: end:
491: sqlite3_finalize(stmt);
492:
493: return s;
494: }
495:
496: /*
497: * mqtt_rtlm_write_subscribe() Subscribe topic
498: *
499: * @cfg = loaded config
500: * @sql = SQL handle
501: * @connid = connection id
502: * @msgid = MessageID
503: * @topic = topic
504: * @user = username
505: * @host = hostname
506: * @qos = Subscribe QoS
507: * return: -1 error, 0 no publish or >0 published ok
508: */
509: int
510: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
511: const char *topic, const char *user, const char *host, char qos)
512: {
513: int ret = 0;
514: char *str, *psStmt;
515: sqlite3_stmt *stmt;
516:
517: if (!cfg || !sql || !topic)
518: return -1;
519:
520: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
521: if (!str) {
522: mqtt_rtlm_log("Error:: not found subscribes table name");
523: return -1;
524: }
525: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
526: "PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
527: "datetime('now', 'localtime'), '%q');", str,
528: connid, msgid, qos, topic, user, host);
529:
530: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
531: MQTT_RTLM_LOG(sql);
532: sqlite3_free(psStmt);
533: return -1;
534: } else
535: sqlite3_free(psStmt);
536: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
537: ret = sqlite3_changes(sql);
538: else {
539: if (ret > SQLITE_OK && ret < SQLITE_ROW)
540: MQTT_RTLM_LOG(sql);
541: ret = 0;
542: }
543: sqlite3_finalize(stmt);
544:
545: return ret;
546: }
547:
548: /*
549: * mqtt_rtlm_delete_subscribe() Delete subscribe
550: *
551: * @cfg = loaded config
552: * @sql = SQL handle
553: * @connid = connection id
554: * @topic = topic
555: * @user = username
556: * @host = hostname
557: * return: -1 error, 0 no changes or >0 deleted rows
558: */
559: int
560: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid,
561: const char *topic, const char *user, const char *host)
562: {
563: int ret = 0;
564: char *str, *psStmt;
565: sqlite3_stmt *stmt;
566:
567: if (!cfg || !sql || !topic)
568: return -1;
569:
570: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
571: if (!str) {
572: mqtt_rtlm_log("Error:: not found subscribes table name");
573: return -1;
574: }
575: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
576: "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str,
577: connid, topic, user, host);
578:
579: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
580: MQTT_RTLM_LOG(sql);
581: sqlite3_free(psStmt);
582: return -1;
583: } else
584: sqlite3_free(psStmt);
585: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
586: ret = sqlite3_changes(sql);
587: else {
588: if (ret > SQLITE_OK && ret < SQLITE_ROW)
589: MQTT_RTLM_LOG(sql);
590: ret = 0;
591: }
592: sqlite3_finalize(stmt);
593:
594: return ret;
595: }
596:
597: /*
598: * mqtt_rtlm_read_subscribe() Get subscribe topic
599: *
600: * @cfg = loaded config
601: * @sql = SQL handle
602: * @connid = connection id
603: * @topic = topic
604: * return: NULL error or not found and !=NULL allocated subscribe topics
605: */
606: mqtt_subscr_t *
607: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
608: {
609: int rowz = 0;
610: char *str, *psStmt;
611: sqlite3_stmt *stmt;
612: register int j;
613: mqtt_subscr_t *s = NULL;
614:
615: if (!cfg || !sql || !topic)
616: return NULL;
617:
618: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
619: if (!str) {
620: mqtt_rtlm_log("Error:: not found subscribes table name");
621: return NULL;
622: }
623: psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
624: "Topic LIKE '%q';", str, connid, topic);
625:
626: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
627: MQTT_RTLM_LOG(sql);
628: sqlite3_free(psStmt);
629: return NULL;
630: } else
631: sqlite3_free(psStmt);
632:
633: /* calculate count of rows and allocate subscribe items */
634: while (sqlite3_step(stmt) == SQLITE_ROW)
635: rowz++;
636: if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
637: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
638: goto end;
639: } else
640: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
641: sqlite3_reset(stmt);
642:
643: /* fill with data */
644: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
645: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
646: s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
647: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
648: s[j].sub_value.msg_base = NULL;
649: s[j].sub_value.msg_len = 0;
650: }
651: end:
652: sqlite3_finalize(stmt);
653:
654: return s;
655: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>