Annotation of mqtt/src/pubmqtt.c, revision 1.2.2.3
1.2 misho 1: #include "global.h"
2:
3:
4: extern const char sql_schema[];
5:
6:
/*
 * mqtt_rtlm_log() Send a printf-style message to syslog with LOG_ERR priority
 *
 * @fmt = format string
 * @... = arguments consumed by the format string
 * return: none
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	vsyslog(LOG_ERR, fmt, ap);
	va_end(ap);
}
/*
 * MQTT_RTLM_LOG() Log the current error code and message of an SQL handle.
 * Asserts that the handle is not NULL, then reports through mqtt_rtlm_log().
 * Expands to a single expression; the argument is evaluated more than once.
 */
#define MQTT_RTLM_LOG(dbh)	(assert((dbh)), \
				mqtt_rtlm_log("Error:: SQL #%d - %s", \
					sqlite3_errcode((dbh)), \
					sqlite3_errmsg((dbh))))
25:
26:
27: /*
28: * mqtt_rtlm_open() Open database connection
29: *
30: * @cfg = config filename
31: * return: NULL error or SQL handle
32: */
33: sqlite3 *
1.2.2.3 ! misho 34: mqtt_rtlm_open(cfg_root_t *cfg)
1.2 misho 35: {
36: sqlite3 *sql = NULL;
37: const char *str = NULL;
38:
39: if (!cfg)
40: return NULL;
41:
42: /*
43: if (!sqlite3_threadsafe() || sqlite3_config(SQLITE_CONFIG_SERIALIZED))
44: return NULL;
45: */
46:
1.2.2.3 ! misho 47: str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2 misho 48: if (!str) {
49: mqtt_rtlm_log("Error:: Unknown database name ...\n");
50: return NULL;
51: }
52:
53: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
54: MQTT_RTLM_LOG(sql);
55: sqlite3_close(sql);
56: return NULL;
57: }
58:
59: if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
60: MQTT_RTLM_LOG(sql);
61: sqlite3_close(sql);
62: return NULL;
63: }
64: return sql;
65: }
66:
67: /*
68: * mqtt_rtlm_close() Close database connection
69: *
70: * @sql = SQL handle
71: * return: none
72: */
73: void
74: mqtt_rtlm_close(sqlite3 *sql)
75: {
76: sqlite3_close(sql);
77: }
78:
79: /*
80: * mqtt_rtlm_init_session() Create session
81: *
82: * @cfg = loaded config
83: * @sql = SQL handle
84: * @connid = connection id
85: * @user = username
86: * @host = hostname
87: * @will = will flag if !=0 must fill arguments
88: * @... = will arguments in order topic,msg,qos,retain
89: * return: -1 error, 0 session already appears or >0 row changed
90: */
91: int
1.2.2.3 ! misho 92: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user,
1.2 misho 93: const char *host, char will, ...)
94: {
95: va_list lst;
96: int ret = 0;
97: char *str, szStmt[BUFSIZ] = { 0 };
98: sqlite3_stmt *stmt;
99:
100: if (!cfg || !sql)
101: return -1;
102:
1.2.2.3 ! misho 103: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 104: if (!str) {
105: mqtt_rtlm_log("Error:: not found online table name");
106: return -1;
107: }
108: if (!will)
109: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
110: "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
111: else {
112: va_start(lst, will);
113: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
114: "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
115: "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');",
116: str, connid, user, host, will,
117: va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
118: va_end(lst);
119: }
120:
121: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
122: MQTT_RTLM_LOG(sql);
123: return -1;
124: }
125: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
126: ret = sqlite3_changes(sql);
127: else {
128: if (ret > SQLITE_OK && ret < SQLITE_ROW)
129: MQTT_RTLM_LOG(sql);
130: ret = 0;
131: }
132: sqlite3_finalize(stmt);
133:
134: return ret;
135: }
136:
137: /*
138: * mqtt_rtlm_fini_session() Delete session(s)
139: *
140: * @cfg = loaded config
141: * @sql = SQL handle
142: * @connid = connection id
143: * @user = username
144: * @host = hostname
145: * return: -1 error, 0 session already appears or >0 row changed
146: */
147: int
1.2.2.3 ! misho 148: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2 misho 149: {
150: int ret = 0;
151: char *str, szStmt[BUFSIZ] = { 0 };
152: sqlite3_stmt *stmt;
153:
154: if (!cfg || !sql)
155: return -1;
156:
1.2.2.3 ! misho 157: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 158: if (!str) {
159: mqtt_rtlm_log("Error:: not found online table name");
160: return -1;
161: }
162: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
163: "AND RemoteHost LIKE '%s';", str, connid, user, host);
164:
165: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
166: MQTT_RTLM_LOG(sql);
167: return -1;
168: }
169: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
170: ret = sqlite3_changes(sql);
171: else {
172: if (ret > SQLITE_OK && ret < SQLITE_ROW)
173: MQTT_RTLM_LOG(sql);
174: ret = 0;
175: }
176: sqlite3_finalize(stmt);
177:
178: return ret;
179: }
180:
181: /*
182: * mqtt_rtlm_chk_session() Check session(s)
183: *
184: * @cfg = loaded config
185: * @sql = SQL handle
186: * @connid = connection id
187: * @user = username
188: * @host = hostname
189: * return: -1 error, 0 not logged or >0 logged found rows
190: */
191: int
1.2.2.3 ! misho 192: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2 misho 193: {
194: int ret = 0;
195: char *str, szStmt[BUFSIZ] = { 0 };
196: sqlite3_stmt *stmt;
197:
198: if (!cfg || !sql)
199: return -1;
200:
1.2.2.3 ! misho 201: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 202: if (!str) {
203: mqtt_rtlm_log("Error:: not found online table name");
204: return -1;
205: }
206: snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
207: "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';",
208: str, connid, user, host);
209:
210: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
211: MQTT_RTLM_LOG(sql);
212: return -1;
213: }
214: if (sqlite3_step(stmt) == SQLITE_ROW)
215: ret = sqlite3_changes(sql);
216: else
217: ret = 0;
218: sqlite3_finalize(stmt);
219:
220: return ret;
221: }
222:
223: /*
224: * mqtt_rtlm_write_topic() Publish topic
225: *
226: * @cfg = loaded config
227: * @sql = SQL handle
228: * @msgid = MessageID
229: * @topic = topic
230: * @txt = text
231: * @user = username
232: * @host = hostname
233: * @retain = !=0 retain message to database
234: * return: -1 error, 0 no publish or >0 published ok
235: */
236: int
1.2.2.3 ! misho 237: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt,
1.2 misho 238: const char *user, const char *host, char retain)
239: {
240: int ret = 0;
241: char *str, szStmt[BUFSIZ] = { 0 };
242: sqlite3_stmt *stmt;
243:
244: if (!cfg || !sql || !topic)
245: return -1;
246:
1.2.2.3 ! misho 247: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 248: if (!str) {
249: mqtt_rtlm_log("Error:: not found topics table name");
250: return -1;
251: }
252: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
253: "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
254: "datetime('now', 'localtime'), '%s');",
255: str, retain, msgid, topic, txt, user, host);
256:
257: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
258: MQTT_RTLM_LOG(sql);
259: return -1;
260: }
261: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
262: ret = sqlite3_changes(sql);
263: else {
264: if (ret > SQLITE_OK && ret < SQLITE_ROW)
265: MQTT_RTLM_LOG(sql);
266: ret = 0;
267: }
268: sqlite3_finalize(stmt);
269:
270: return ret;
271: }
272:
273: /*
274: * mqtt_rtlm_delete_topic() Delete topic
275: *
276: * @cfg = loaded config
277: * @sql = SQL handle
278: * @msgid = MessageID
279: * @topic = topic
280: * @user = username
281: * @host = hostname
282: * @retain = -1 no matter
283: * return: -1 error, 0 no changes or >0 deleted rows
284: */
285: int
1.2.2.3 ! misho 286: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic,
1.2 misho 287: const char *user, const char *host, char retain)
288: {
289: int ret = 0;
290: char *str, *rtn, szStmt[BUFSIZ] = { 0 };
291: sqlite3_stmt *stmt;
292:
293: if (!cfg || !sql || !topic)
294: return -1;
295:
1.2.2.3 ! misho 296: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 297: if (!str) {
298: mqtt_rtlm_log("Error:: not found topics table name");
299: return -1;
300: }
301: switch (retain) {
302: case -1:
303: rtn = "";
304: break;
305: case 0:
306: rtn = "AND Retain = 0";
307: break;
308: default:
309: rtn = "AND Retain != 0";
310: break;
311: }
312: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
313: "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str,
314: msgid, topic, user, host, rtn);
315:
316: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
317: MQTT_RTLM_LOG(sql);
318: return -1;
319: }
320: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
321: ret = sqlite3_changes(sql);
322: else {
323: if (ret > SQLITE_OK && ret < SQLITE_ROW)
324: MQTT_RTLM_LOG(sql);
325: ret = 0;
326: }
327: sqlite3_finalize(stmt);
328:
329: return ret;
330: }
331:
332: /*
333: * mqtt_rtlm_read_topic() Get topic
334: *
335: * @cfg = loaded config
336: * @sql = SQL handle
337: * @msgid = MessageID
338: * @topic = topic
339: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
340: * return: NULL error or not found and !=NULL allocated subscribe topics
341: */
342: mqtt_subscr_t *
1.2.2.3 ! misho 343: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
1.2 misho 344: {
345: int rowz = 0;
346: char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
347: sqlite3_stmt *stmt;
348: register int j;
349: mqtt_subscr_t *s = NULL;
350:
351: if (!cfg || !sql || !topic)
352: return NULL;
353:
354: switch (retain) {
355: case -1:
356: memset(szStr, 0, sizeof szStr);
357: break;
358: case 0:
359: snprintf(szStr, sizeof szStr, "AND Retain = 0");
360: break;
361: default:
362: snprintf(szStr, sizeof szStr, "AND Retain > 0");
363: break;
364: }
365:
1.2.2.3 ! misho 366: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 367: if (!str) {
368: mqtt_rtlm_log("Error:: not found topics table name");
369: return NULL;
370: }
371: snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
372: "MsgID = %d AND Topic LIKE '%s' %s;",
373: str, msgid, topic, szStr);
374:
375: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
376: MQTT_RTLM_LOG(sql);
377: return NULL;
378: }
379:
380: /* calculate count of rows and allocate subscribe items */
381: while (sqlite3_step(stmt) == SQLITE_ROW)
382: rowz++;
383: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
384: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
385: goto end;
386: } else
387: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
388: sqlite3_reset(stmt);
389:
390: /* fill with data */
391: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
392: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.1 misho 393: s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
394: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
395: s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
396: s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
1.2 misho 397: }
398: end:
399: sqlite3_finalize(stmt);
400:
401: return s;
402: }
1.2.2.2 misho 403:
404: /*
405: * mqtt_rtlm_write_subscribe() Subscribe topic
406: *
407: * @cfg = loaded config
408: * @sql = SQL handle
409: * @msgid = MessageID
410: * @topic = topic
411: * @user = username
412: * @host = hostname
413: * @qos = Subscribe QoS
414: * return: -1 error, 0 no publish or >0 published ok
415: */
416: int
1.2.2.3 ! misho 417: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic,
1.2.2.2 misho 418: const char *user, const char *host, char qos)
419: {
420: int ret = 0;
421: char *str, szStmt[BUFSIZ] = { 0 };
422: sqlite3_stmt *stmt;
423:
424: if (!cfg || !sql || !topic)
425: return -1;
426:
1.2.2.3 ! misho 427: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2 misho 428: if (!str) {
429: mqtt_rtlm_log("Error:: not found subscribes table name");
430: return -1;
431: }
432: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (MsgID, QoS, Topic, PubUser, "
433: "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', "
434: "datetime('now', 'localtime'), '%s');", str,
435: msgid, qos, topic, user, host);
436:
437: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
438: MQTT_RTLM_LOG(sql);
439: return -1;
440: }
441: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
442: ret = sqlite3_changes(sql);
443: else {
444: if (ret > SQLITE_OK && ret < SQLITE_ROW)
445: MQTT_RTLM_LOG(sql);
446: ret = 0;
447: }
448: sqlite3_finalize(stmt);
449:
450: return ret;
451: }
452:
453: /*
454: * mqtt_rtlm_delete_subscribe() Delete subscribe
455: *
456: * @cfg = loaded config
457: * @sql = SQL handle
458: * @topic = topic
459: * @user = username
460: * @host = hostname
461: * @qos = Subscribe QoS if -1 no matter
462: * return: -1 error, 0 no changes or >0 deleted rows
463: */
464: int
1.2.2.3 ! misho 465: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic,
1.2.2.2 misho 466: const char *user, const char *host, char qos)
467: {
468: int ret = 0;
469: char *str, szStr[STRSIZ] = { 0 }, szStmt[BUFSIZ] = { 0 };
470: sqlite3_stmt *stmt;
471:
472: if (!cfg || !sql || !topic)
473: return -1;
474:
1.2.2.3 ! misho 475: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2 misho 476: if (!str) {
477: mqtt_rtlm_log("Error:: not found subscribes table name");
478: return -1;
479: }
480: if (qos > -1 && qos < 3)
481: snprintf(szStr, sizeof szStr, "AND QoS = %d", qos);
482: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
483: "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str,
484: topic, user, host, szStr);
485:
486: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
487: MQTT_RTLM_LOG(sql);
488: return -1;
489: }
490: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
491: ret = sqlite3_changes(sql);
492: else {
493: if (ret > SQLITE_OK && ret < SQLITE_ROW)
494: MQTT_RTLM_LOG(sql);
495: ret = 0;
496: }
497: sqlite3_finalize(stmt);
498:
499: return ret;
500: }
501:
502: /*
503: * mqtt_rtlm_read_subscribe() Get subscribe topic
504: *
505: * @cfg = loaded config
506: * @sql = SQL handle
507: * @topic = topic
508: * return: NULL error or not found and !=NULL allocated subscribe topics
509: */
510: mqtt_subscr_t *
1.2.2.3 ! misho 511: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic)
1.2.2.2 misho 512: {
513: int rowz = 0;
514: char *str, szStmt[BUFSIZ] = { 0 };
515: sqlite3_stmt *stmt;
516: register int j;
517: mqtt_subscr_t *s = NULL;
518:
519: if (!cfg || !sql || !topic)
520: return NULL;
521:
1.2.2.3 ! misho 522: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2 misho 523: if (!str) {
524: mqtt_rtlm_log("Error:: not found subscribes table name");
525: return NULL;
526: }
527: snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE Topic LIKE '%s';", str, topic);
528:
529: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
530: MQTT_RTLM_LOG(sql);
531: return NULL;
532: }
533:
534: /* calculate count of rows and allocate subscribe items */
535: while (sqlite3_step(stmt) == SQLITE_ROW)
536: rowz++;
537: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
538: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
539: goto end;
540: } else
541: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
542: sqlite3_reset(stmt);
543:
544: /* fill with data */
545: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
546: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
547: s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
548: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
549: s[j].sub_value.msg_base = NULL;
550: s[j].sub_value.msg_len = 0;
551: }
552: end:
553: sqlite3_finalize(stmt);
554:
555: return s;
556: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>