Annotation of mqtt/src/pubmqtt.c, revision 1.2.2.7
1.2 misho 1: #include "global.h"
2:
3:
4: extern const char sql_schema[];
5:
6:
7: /*
8: * mqtt_db_log() Log database connection message
9: *
10: * @fmt = format string
11: * @... = argument list
12: * return: none
13: */
14: static void
15: mqtt_rtlm_log(const char *fmt, ...)
16: {
17: va_list lst;
18:
19: va_start(lst, fmt);
20: vsyslog(LOG_ERR, fmt, lst);
21: va_end(lst);
22: }
1.2.2.5 misho 23: #define MQTT_RTLM_LOG(_sql) (assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
24: __func__, __LINE__, \
1.2 misho 25: sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
26:
27:
28: /*
29: * mqtt_rtlm_open() Open database connection
30: *
31: * @cfg = config filename
32: * return: NULL error or SQL handle
33: */
34: sqlite3 *
1.2.2.3 misho 35: mqtt_rtlm_open(cfg_root_t *cfg)
1.2 misho 36: {
37: sqlite3 *sql = NULL;
38: const char *str = NULL;
39:
40: if (!cfg)
41: return NULL;
42:
1.2.2.3 misho 43: str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2 misho 44: if (!str) {
45: mqtt_rtlm_log("Error:: Unknown database name ...\n");
46: return NULL;
47: }
48:
49: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
50: MQTT_RTLM_LOG(sql);
51: sqlite3_close(sql);
52: return NULL;
53: }
54:
55: if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
56: MQTT_RTLM_LOG(sql);
57: sqlite3_close(sql);
58: return NULL;
59: }
60: return sql;
61: }
62:
63: /*
64: * mqtt_rtlm_close() Close database connection
65: *
66: * @sql = SQL handle
67: * return: none
68: */
69: void
70: mqtt_rtlm_close(sqlite3 *sql)
71: {
72: sqlite3_close(sql);
73: }
74:
75: /*
76: * mqtt_rtlm_init_session() Create session
77: *
78: * @cfg = loaded config
79: * @sql = SQL handle
80: * @connid = connection id
81: * @user = username
82: * @host = hostname
83: * @will = will flag if !=0 must fill arguments
84: * @... = will arguments in order topic,msg,qos,retain
85: * return: -1 error, 0 session already appears or >0 row changed
86: */
87: int
1.2.2.3 misho 88: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user,
1.2 misho 89: const char *host, char will, ...)
90: {
91: va_list lst;
92: int ret = 0;
93: char *str, szStmt[BUFSIZ] = { 0 };
94: sqlite3_stmt *stmt;
95:
96: if (!cfg || !sql)
97: return -1;
98:
1.2.2.3 misho 99: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 100: if (!str) {
101: mqtt_rtlm_log("Error:: not found online table name");
102: return -1;
103: }
104: if (!will)
105: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
106: "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
107: else {
108: va_start(lst, will);
109: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
110: "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
111: "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');",
112: str, connid, user, host, will,
113: va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
114: va_end(lst);
115: }
116:
117: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
118: MQTT_RTLM_LOG(sql);
119: return -1;
120: }
121: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
122: ret = sqlite3_changes(sql);
123: else {
124: if (ret > SQLITE_OK && ret < SQLITE_ROW)
125: MQTT_RTLM_LOG(sql);
126: ret = 0;
127: }
128: sqlite3_finalize(stmt);
129:
130: return ret;
131: }
132:
133: /*
134: * mqtt_rtlm_fini_session() Delete session(s)
135: *
136: * @cfg = loaded config
137: * @sql = SQL handle
138: * @connid = connection id
139: * @user = username
140: * @host = hostname
141: * return: -1 error, 0 session already appears or >0 row changed
142: */
143: int
1.2.2.3 misho 144: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2 misho 145: {
146: int ret = 0;
147: char *str, szStmt[BUFSIZ] = { 0 };
148: sqlite3_stmt *stmt;
149:
150: if (!cfg || !sql)
151: return -1;
152:
1.2.2.3 misho 153: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 154: if (!str) {
155: mqtt_rtlm_log("Error:: not found online table name");
156: return -1;
157: }
158: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
159: "AND RemoteHost LIKE '%s';", str, connid, user, host);
160:
161: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
162: MQTT_RTLM_LOG(sql);
163: return -1;
164: }
165: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
166: ret = sqlite3_changes(sql);
167: else {
168: if (ret > SQLITE_OK && ret < SQLITE_ROW)
169: MQTT_RTLM_LOG(sql);
170: ret = 0;
171: }
172: sqlite3_finalize(stmt);
173:
174: return ret;
175: }
176:
177: /*
178: * mqtt_rtlm_chk_session() Check session(s)
179: *
180: * @cfg = loaded config
181: * @sql = SQL handle
182: * @connid = connection id
183: * @user = username
184: * @host = hostname
185: * return: -1 error, 0 not logged or >0 logged found rows
186: */
187: int
1.2.2.3 misho 188: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2 misho 189: {
190: int ret = 0;
191: char *str, szStmt[BUFSIZ] = { 0 };
192: sqlite3_stmt *stmt;
193:
194: if (!cfg || !sql)
195: return -1;
196:
1.2.2.3 misho 197: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 198: if (!str) {
199: mqtt_rtlm_log("Error:: not found online table name");
200: return -1;
201: }
202: snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
203: "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';",
204: str, connid, user, host);
205:
206: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
207: MQTT_RTLM_LOG(sql);
208: return -1;
209: }
210: if (sqlite3_step(stmt) == SQLITE_ROW)
211: ret = sqlite3_changes(sql);
212: else
213: ret = 0;
214: sqlite3_finalize(stmt);
215:
216: return ret;
217: }
218:
219: /*
220: * mqtt_rtlm_write_topic() Publish topic
221: *
222: * @cfg = loaded config
223: * @sql = SQL handle
224: * @msgid = MessageID
225: * @topic = topic
226: * @txt = text
227: * @user = username
228: * @host = hostname
229: * @retain = !=0 retain message to database
230: * return: -1 error, 0 no publish or >0 published ok
231: */
232: int
1.2.2.3 misho 233: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt,
1.2 misho 234: const char *user, const char *host, char retain)
235: {
236: int ret = 0;
237: char *str, szStmt[BUFSIZ] = { 0 };
238: sqlite3_stmt *stmt;
239:
240: if (!cfg || !sql || !topic)
241: return -1;
242:
1.2.2.3 misho 243: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 244: if (!str) {
245: mqtt_rtlm_log("Error:: not found topics table name");
246: return -1;
247: }
248: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
249: "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
250: "datetime('now', 'localtime'), '%s');",
251: str, retain, msgid, topic, txt, user, host);
252:
253: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
254: MQTT_RTLM_LOG(sql);
255: return -1;
256: }
257: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
258: ret = sqlite3_changes(sql);
259: else {
260: if (ret > SQLITE_OK && ret < SQLITE_ROW)
261: MQTT_RTLM_LOG(sql);
262: ret = 0;
263: }
264: sqlite3_finalize(stmt);
265:
266: return ret;
267: }
268:
269: /*
270: * mqtt_rtlm_delete_topic() Delete topic
271: *
272: * @cfg = loaded config
273: * @sql = SQL handle
274: * @msgid = MessageID
275: * @topic = topic
276: * @user = username
277: * @host = hostname
278: * @retain = -1 no matter
279: * return: -1 error, 0 no changes or >0 deleted rows
280: */
281: int
1.2.2.3 misho 282: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic,
1.2 misho 283: const char *user, const char *host, char retain)
284: {
285: int ret = 0;
286: char *str, *rtn, szStmt[BUFSIZ] = { 0 };
287: sqlite3_stmt *stmt;
288:
289: if (!cfg || !sql || !topic)
290: return -1;
291:
1.2.2.3 misho 292: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 293: if (!str) {
294: mqtt_rtlm_log("Error:: not found topics table name");
295: return -1;
296: }
297: switch (retain) {
298: case -1:
299: rtn = "";
300: break;
301: case 0:
302: rtn = "AND Retain = 0";
303: break;
304: default:
305: rtn = "AND Retain != 0";
306: break;
307: }
308: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
309: "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str,
310: msgid, topic, user, host, rtn);
311:
312: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
313: MQTT_RTLM_LOG(sql);
314: return -1;
315: }
316: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
317: ret = sqlite3_changes(sql);
318: else {
319: if (ret > SQLITE_OK && ret < SQLITE_ROW)
320: MQTT_RTLM_LOG(sql);
321: ret = 0;
322: }
323: sqlite3_finalize(stmt);
324:
325: return ret;
326: }
327:
328: /*
329: * mqtt_rtlm_read_topic() Get topic
330: *
331: * @cfg = loaded config
332: * @sql = SQL handle
333: * @msgid = MessageID
334: * @topic = topic
335: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
336: * return: NULL error or not found and !=NULL allocated subscribe topics
337: */
338: mqtt_subscr_t *
1.2.2.3 misho 339: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
1.2 misho 340: {
341: int rowz = 0;
342: char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
343: sqlite3_stmt *stmt;
344: register int j;
345: mqtt_subscr_t *s = NULL;
346:
347: if (!cfg || !sql || !topic)
348: return NULL;
349:
350: switch (retain) {
351: case -1:
352: memset(szStr, 0, sizeof szStr);
353: break;
354: case 0:
355: snprintf(szStr, sizeof szStr, "AND Retain = 0");
356: break;
357: default:
358: snprintf(szStr, sizeof szStr, "AND Retain > 0");
359: break;
360: }
361:
1.2.2.3 misho 362: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 363: if (!str) {
364: mqtt_rtlm_log("Error:: not found topics table name");
365: return NULL;
366: }
367: snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
368: "MsgID = %d AND Topic LIKE '%s' %s;",
369: str, msgid, topic, szStr);
370:
371: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
372: MQTT_RTLM_LOG(sql);
373: return NULL;
374: }
375:
376: /* calculate count of rows and allocate subscribe items */
377: while (sqlite3_step(stmt) == SQLITE_ROW)
378: rowz++;
379: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
380: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
381: goto end;
382: } else
383: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
384: sqlite3_reset(stmt);
385:
386: /* fill with data */
387: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
388: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.1 misho 389: s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
390: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
391: s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
392: s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
1.2 misho 393: }
394: end:
395: sqlite3_finalize(stmt);
396:
397: return s;
398: }
1.2.2.2 misho 399:
400: /*
401: * mqtt_rtlm_write_subscribe() Subscribe topic
402: *
403: * @cfg = loaded config
404: * @sql = SQL handle
405: * @msgid = MessageID
406: * @topic = topic
407: * @user = username
408: * @host = hostname
409: * @qos = Subscribe QoS
410: * return: -1 error, 0 no publish or >0 published ok
411: */
412: int
1.2.2.3 misho 413: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic,
1.2.2.2 misho 414: const char *user, const char *host, char qos)
415: {
416: int ret = 0;
417: char *str, szStmt[BUFSIZ] = { 0 };
418: sqlite3_stmt *stmt;
419:
420: if (!cfg || !sql || !topic)
421: return -1;
422:
1.2.2.3 misho 423: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2 misho 424: if (!str) {
425: mqtt_rtlm_log("Error:: not found subscribes table name");
426: return -1;
427: }
428: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (MsgID, QoS, Topic, PubUser, "
429: "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', "
430: "datetime('now', 'localtime'), '%s');", str,
431: msgid, qos, topic, user, host);
432:
433: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
434: MQTT_RTLM_LOG(sql);
435: return -1;
436: }
437: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
438: ret = sqlite3_changes(sql);
439: else {
440: if (ret > SQLITE_OK && ret < SQLITE_ROW)
441: MQTT_RTLM_LOG(sql);
442: ret = 0;
443: }
444: sqlite3_finalize(stmt);
445:
446: return ret;
447: }
448:
449: /*
450: * mqtt_rtlm_delete_subscribe() Delete subscribe
451: *
452: * @cfg = loaded config
453: * @sql = SQL handle
454: * @topic = topic
455: * @user = username
456: * @host = hostname
457: * return: -1 error, 0 no changes or >0 deleted rows
458: */
459: int
1.2.2.3 misho 460: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic,
1.2.2.7 ! misho 461: const char *user, const char *host)
1.2.2.2 misho 462: {
463: int ret = 0;
1.2.2.7 ! misho 464: char *str, szStmt[BUFSIZ] = { 0 };
1.2.2.2 misho 465: sqlite3_stmt *stmt;
466:
467: if (!cfg || !sql || !topic)
468: return -1;
469:
1.2.2.3 misho 470: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2 misho 471: if (!str) {
472: mqtt_rtlm_log("Error:: not found subscribes table name");
473: return -1;
474: }
475: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
1.2.2.7 ! misho 476: "PubUser LIKE '%s' AND PubHost LIKE '%s';", str,
! 477: topic, user, host);
1.2.2.2 misho 478:
479: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
480: MQTT_RTLM_LOG(sql);
481: return -1;
482: }
483: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
484: ret = sqlite3_changes(sql);
485: else {
486: if (ret > SQLITE_OK && ret < SQLITE_ROW)
487: MQTT_RTLM_LOG(sql);
488: ret = 0;
489: }
490: sqlite3_finalize(stmt);
491:
492: return ret;
493: }
494:
495: /*
496: * mqtt_rtlm_read_subscribe() Get subscribe topic
497: *
498: * @cfg = loaded config
499: * @sql = SQL handle
500: * @topic = topic
501: * return: NULL error or not found and !=NULL allocated subscribe topics
502: */
503: mqtt_subscr_t *
1.2.2.3 misho 504: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic)
1.2.2.2 misho 505: {
506: int rowz = 0;
507: char *str, szStmt[BUFSIZ] = { 0 };
508: sqlite3_stmt *stmt;
509: register int j;
510: mqtt_subscr_t *s = NULL;
511:
512: if (!cfg || !sql || !topic)
513: return NULL;
514:
1.2.2.3 misho 515: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2 misho 516: if (!str) {
517: mqtt_rtlm_log("Error:: not found subscribes table name");
518: return NULL;
519: }
520: snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE Topic LIKE '%s';", str, topic);
521:
522: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
523: MQTT_RTLM_LOG(sql);
524: return NULL;
525: }
526:
527: /* calculate count of rows and allocate subscribe items */
528: while (sqlite3_step(stmt) == SQLITE_ROW)
529: rowz++;
530: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
531: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
532: goto end;
533: } else
534: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
535: sqlite3_reset(stmt);
536:
537: /* fill with data */
538: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
539: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
540: s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
541: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
542: s[j].sub_value.msg_base = NULL;
543: s[j].sub_value.msg_len = 0;
544: }
545: end:
546: sqlite3_finalize(stmt);
547:
548: return s;
549: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>