Annotation of mqtt/src/pubmqtt.c, revision 1.3.2.1
1.3.2.1 ! misho 1: /*************************************************************************
! 2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
! 3: * by Michael Pounov <misho@openbsd-bg.org>
! 4: *
! 5: * $Author: misho $
! 6: * $Id: global.h,v 1.4 2012/07/03 08:57:04 misho Exp $
! 7: *
! 8: **************************************************************************
! 9: The ELWIX and AITNET software is distributed under the following
! 10: terms:
! 11:
! 12: All of the documentation and software included in the ELWIX and AITNET
! 13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
! 14:
! 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
! 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
! 17:
! 18: Redistribution and use in source and binary forms, with or without
! 19: modification, are permitted provided that the following conditions
! 20: are met:
! 21: 1. Redistributions of source code must retain the above copyright
! 22: notice, this list of conditions and the following disclaimer.
! 23: 2. Redistributions in binary form must reproduce the above copyright
! 24: notice, this list of conditions and the following disclaimer in the
! 25: documentation and/or other materials provided with the distribution.
! 26: 3. All advertising materials mentioning features or use of this software
! 27: must display the following acknowledgement:
! 28: This product includes software developed by Michael Pounov <misho@elwix.org>
! 29: ELWIX - Embedded LightWeight unIX and its contributors.
! 30: 4. Neither the name of AITNET nor the names of its contributors
! 31: may be used to endorse or promote products derived from this software
! 32: without specific prior written permission.
! 33:
! 34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
! 35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
! 36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
! 37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
! 38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
! 39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
! 40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
! 41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
! 42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
! 43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
! 44: SUCH DAMAGE.
! 45: */
1.2 misho 46: #include "global.h"
47:
48:
49: extern const char sql_schema[];
50:
51:
52: /*
53: * mqtt_db_log() Log database connection message
54: *
55: * @fmt = format string
56: * @... = argument list
57: * return: none
58: */
59: static void
60: mqtt_rtlm_log(const char *fmt, ...)
61: {
62: va_list lst;
63:
64: va_start(lst, fmt);
65: vsyslog(LOG_ERR, fmt, lst);
66: va_end(lst);
67: }
1.3 misho 68: #define MQTT_RTLM_LOG(_sql) (assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
69: __func__, __LINE__, \
1.2 misho 70: sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
71:
1.3 misho 72: /* library pre-loaded actions */
73: void
74: _init()
75: {
76: sqlite3_initialize();
77: }
78:
79: void
80: _fini()
81: {
82: sqlite3_shutdown();
83: }
84:
1.2 misho 85:
86: /*
87: * mqtt_rtlm_open() Open database connection
88: *
89: * @cfg = config filename
90: * return: NULL error or SQL handle
91: */
92: sqlite3 *
1.3 misho 93: mqtt_rtlm_open(cfg_root_t *cfg)
1.2 misho 94: {
95: sqlite3 *sql = NULL;
96: const char *str = NULL;
97:
98: if (!cfg)
99: return NULL;
100:
1.3 misho 101: str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2 misho 102: if (!str) {
103: mqtt_rtlm_log("Error:: Unknown database name ...\n");
104: return NULL;
105: }
106:
107: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
108: MQTT_RTLM_LOG(sql);
109: sqlite3_close(sql);
110: return NULL;
111: }
112:
113: if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
114: MQTT_RTLM_LOG(sql);
115: sqlite3_close(sql);
116: return NULL;
117: }
118: return sql;
119: }
120:
121: /*
122: * mqtt_rtlm_close() Close database connection
123: *
124: * @sql = SQL handle
125: * return: none
126: */
127: void
128: mqtt_rtlm_close(sqlite3 *sql)
129: {
130: sqlite3_close(sql);
131: }
132:
133: /*
134: * mqtt_rtlm_init_session() Create session
135: *
136: * @cfg = loaded config
137: * @sql = SQL handle
138: * @connid = connection id
139: * @user = username
140: * @host = hostname
141: * @will = will flag if !=0 must fill arguments
142: * @... = will arguments in order topic,msg,qos,retain
143: * return: -1 error, 0 session already appears or >0 row changed
144: */
145: int
1.3 misho 146: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user,
1.2 misho 147: const char *host, char will, ...)
148: {
149: va_list lst;
150: int ret = 0;
1.3 misho 151: char *str, *psStmt;
1.2 misho 152: sqlite3_stmt *stmt;
153:
154: if (!cfg || !sql)
155: return -1;
156:
1.3 misho 157: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 158: if (!str) {
159: mqtt_rtlm_log("Error:: not found online table name");
160: return -1;
161: }
162: if (!will)
1.3 misho 163: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
164: "WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
1.2 misho 165: else {
166: va_start(lst, will);
1.3 misho 167: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
1.2 misho 168: "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
1.3 misho 169: "VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');",
1.2 misho 170: str, connid, user, host, will,
171: va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
172: va_end(lst);
173: }
174:
1.3 misho 175: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 176: MQTT_RTLM_LOG(sql);
1.3 misho 177: sqlite3_free(psStmt);
1.2 misho 178: return -1;
1.3 misho 179: } else
180: sqlite3_free(psStmt);
1.2 misho 181: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
182: ret = sqlite3_changes(sql);
183: else {
184: if (ret > SQLITE_OK && ret < SQLITE_ROW)
185: MQTT_RTLM_LOG(sql);
186: ret = 0;
187: }
188: sqlite3_finalize(stmt);
189:
190: return ret;
191: }
192:
193: /*
194: * mqtt_rtlm_fini_session() Delete session(s)
195: *
196: * @cfg = loaded config
197: * @sql = SQL handle
198: * @connid = connection id
199: * @user = username
200: * @host = hostname
201: * return: -1 error, 0 session already appears or >0 row changed
202: */
203: int
1.3 misho 204: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2 misho 205: {
206: int ret = 0;
1.3 misho 207: char *str, *psStmt;
1.2 misho 208: sqlite3_stmt *stmt;
209:
210: if (!cfg || !sql)
211: return -1;
212:
1.3 misho 213: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 214: if (!str) {
215: mqtt_rtlm_log("Error:: not found online table name");
216: return -1;
217: }
1.3 misho 218: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
219: "AND RemoteHost LIKE '%q';", str, connid, user, host);
1.2 misho 220:
1.3 misho 221: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 222: MQTT_RTLM_LOG(sql);
1.3 misho 223: sqlite3_free(psStmt);
1.2 misho 224: return -1;
1.3 misho 225: } else
226: sqlite3_free(psStmt);
1.2 misho 227: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
228: ret = sqlite3_changes(sql);
229: else {
230: if (ret > SQLITE_OK && ret < SQLITE_ROW)
231: MQTT_RTLM_LOG(sql);
232: ret = 0;
233: }
234: sqlite3_finalize(stmt);
235:
236: return ret;
237: }
238:
239: /*
240: * mqtt_rtlm_chk_session() Check session(s)
241: *
242: * @cfg = loaded config
243: * @sql = SQL handle
244: * @connid = connection id
245: * @user = username
246: * @host = hostname
247: * return: -1 error, 0 not logged or >0 logged found rows
248: */
249: int
1.3 misho 250: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2 misho 251: {
252: int ret = 0;
1.3 misho 253: char *str, *psStmt;
1.2 misho 254: sqlite3_stmt *stmt;
255:
256: if (!cfg || !sql)
257: return -1;
258:
1.3 misho 259: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2 misho 260: if (!str) {
261: mqtt_rtlm_log("Error:: not found online table name");
262: return -1;
263: }
1.3 misho 264: psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
265: "ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';",
1.2 misho 266: str, connid, user, host);
267:
1.3 misho 268: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 269: MQTT_RTLM_LOG(sql);
1.3 misho 270: sqlite3_free(psStmt);
1.2 misho 271: return -1;
1.3 misho 272: } else
273: sqlite3_free(psStmt);
1.2 misho 274: if (sqlite3_step(stmt) == SQLITE_ROW)
275: ret = sqlite3_changes(sql);
276: else
277: ret = 0;
278: sqlite3_finalize(stmt);
279:
280: return ret;
281: }
282:
283: /*
284: * mqtt_rtlm_write_topic() Publish topic
285: *
286: * @cfg = loaded config
287: * @sql = SQL handle
1.3 misho 288: * @connid = connection id
1.2 misho 289: * @msgid = MessageID
290: * @topic = topic
291: * @txt = text
1.3 misho 292: * @txtlen = text length
1.2 misho 293: * @user = username
294: * @host = hostname
1.3 misho 295: * @qos = QoS
1.2 misho 296: * @retain = !=0 retain message to database
297: * return: -1 error, 0 no publish or >0 published ok
298: */
299: int
1.3 misho 300: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
301: const char *topic, void *txt, int txtlen, const char *user,
302: const char *host, char qos, char retain)
1.2 misho 303: {
304: int ret = 0;
1.3 misho 305: char *str, *psStmt;
1.2 misho 306: sqlite3_stmt *stmt;
307:
308: if (!cfg || !sql || !topic)
309: return -1;
310:
1.3 misho 311: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 312: if (!str) {
313: mqtt_rtlm_log("Error:: not found topics table name");
314: return -1;
315: }
1.3 misho 316: psStmt = sqlite3_mprintf("INSERT INTO %s (QoS, Retain, ConnID, MsgID, Topic, Value, PubUser, "
317: "PubDate, PubHost) VALUES (%d, %d, '%q', %u, '%q', ?1, '%q', "
318: "datetime('now', 'localtime'), '%q');",
319: str, qos, retain, connid, msgid, topic, user, host);
1.2 misho 320:
1.3 misho 321: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL) || !stmt) {
322: MQTT_RTLM_LOG(sql);
323: sqlite3_free(psStmt);
324: return -1;
325: } else
326: sqlite3_free(psStmt);
327: if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
1.2 misho 328: MQTT_RTLM_LOG(sql);
1.3 misho 329: sqlite3_finalize(stmt);
1.2 misho 330: return -1;
331: }
332: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
333: ret = sqlite3_changes(sql);
334: else {
335: if (ret > SQLITE_OK && ret < SQLITE_ROW)
336: MQTT_RTLM_LOG(sql);
337: ret = 0;
338: }
339: sqlite3_finalize(stmt);
340:
341: return ret;
342: }
343:
344: /*
1.3 misho 345: * mqtt_rtlm_wipe_topic() Wipe all topics
346: *
347: * @cfg = loaded config
348: * @sql = SQL handle
349: * @connid = connection id
350: * @user = username
351: * @retain = -1 no matter
352: * return: -1 error, 0 no changes or >0 deleted rows
353: */
354: int
355: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
356: {
357: int ret = 0;
358: char *str, *rtn, *psStmt;
359: sqlite3_stmt *stmt;
360:
361: if (!cfg || !sql || !connid)
362: return -1;
363:
364: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
365: if (!str) {
366: mqtt_rtlm_log("Error:: not found topics table name");
367: return -1;
368: }
369: switch (retain) {
370: case -1:
371: rtn = "";
372: break;
373: case 0:
374: rtn = "AND Retain = 0";
375: break;
376: default:
377: rtn = "AND Retain != 0";
378: break;
379: }
380: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
381: "PubUser LIKE '%q' %s;", str, connid, user, rtn);
382:
383: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
384: MQTT_RTLM_LOG(sql);
385: sqlite3_free(psStmt);
386: return -1;
387: } else
388: sqlite3_free(psStmt);
389: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
390: ret = sqlite3_changes(sql);
391: else {
392: if (ret > SQLITE_OK && ret < SQLITE_ROW)
393: MQTT_RTLM_LOG(sql);
394: ret = 0;
395: }
396: sqlite3_finalize(stmt);
397:
398: return ret;
399: }
400:
401: /*
1.2 misho 402: * mqtt_rtlm_delete_topic() Delete topic
403: *
404: * @cfg = loaded config
405: * @sql = SQL handle
1.3 misho 406: * @connid = connection id
1.2 misho 407: * @msgid = MessageID
408: * @topic = topic
409: * @user = username
410: * @host = hostname
411: * @retain = -1 no matter
412: * return: -1 error, 0 no changes or >0 deleted rows
413: */
414: int
1.3 misho 415: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
416: const char *topic, const char *user, const char *host, char retain)
1.2 misho 417: {
418: int ret = 0;
1.3 misho 419: char *str, *rtn, *psStmt;
1.2 misho 420: sqlite3_stmt *stmt;
421:
422: if (!cfg || !sql || !topic)
423: return -1;
424:
1.3 misho 425: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 426: if (!str) {
427: mqtt_rtlm_log("Error:: not found topics table name");
428: return -1;
429: }
430: switch (retain) {
431: case -1:
432: rtn = "";
433: break;
434: case 0:
435: rtn = "AND Retain = 0";
436: break;
437: default:
438: rtn = "AND Retain != 0";
439: break;
440: }
1.3 misho 441: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
442: "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str,
443: connid, msgid, topic, user, host, rtn);
1.2 misho 444:
1.3 misho 445: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 446: MQTT_RTLM_LOG(sql);
1.3 misho 447: sqlite3_free(psStmt);
1.2 misho 448: return -1;
1.3 misho 449: } else
450: sqlite3_free(psStmt);
1.2 misho 451: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
452: ret = sqlite3_changes(sql);
453: else {
454: if (ret > SQLITE_OK && ret < SQLITE_ROW)
455: MQTT_RTLM_LOG(sql);
456: ret = 0;
457: }
458: sqlite3_finalize(stmt);
459:
460: return ret;
461: }
462:
463: /*
464: * mqtt_rtlm_read_topic() Get topic
465: *
466: * @cfg = loaded config
467: * @sql = SQL handle
1.3 misho 468: * @connid = connection id
1.2 misho 469: * @topic = topic
470: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
471: * return: NULL error or not found and !=NULL allocated subscribe topics
472: */
473: mqtt_subscr_t *
1.3 misho 474: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid,
475: const char *topic, char retain)
1.2 misho 476: {
477: int rowz = 0;
1.3 misho 478: char *str, szStr[STRSIZ], *psStmt;
1.2 misho 479: sqlite3_stmt *stmt;
480: register int j;
481: mqtt_subscr_t *s = NULL;
1.3 misho 482: ait_val_t v;
1.2 misho 483:
484: if (!cfg || !sql || !topic)
485: return NULL;
486:
487: switch (retain) {
488: case -1:
489: memset(szStr, 0, sizeof szStr);
490: break;
491: case 0:
492: snprintf(szStr, sizeof szStr, "AND Retain = 0");
493: break;
494: default:
495: snprintf(szStr, sizeof szStr, "AND Retain > 0");
496: break;
497: }
498:
1.3 misho 499: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2 misho 500: if (!str) {
501: mqtt_rtlm_log("Error:: not found topics table name");
502: return NULL;
503: }
1.3 misho 504: psStmt = sqlite3_mprintf("SELECT QoS, Topic, Value FROM %s WHERE "
505: "ConnID LIKE '%q' AND Topic LIKE '%q' %s;",
506: str, connid, topic, szStr);
1.2 misho 507:
1.3 misho 508: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2 misho 509: MQTT_RTLM_LOG(sql);
1.3 misho 510: sqlite3_free(psStmt);
1.2 misho 511: return NULL;
1.3 misho 512: } else
513: sqlite3_free(psStmt);
514:
515: /* calculate count of rows and allocate subscribe items */
516: while (sqlite3_step(stmt) == SQLITE_ROW)
517: rowz++;
518: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
519: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
520: goto end;
521: } else
522: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
523: sqlite3_reset(stmt);
524:
525: /* fill with data */
526: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
527: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
528: s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
529: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
530: AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
531: s[j].sub_value.msg_len = AIT_LEN(&v);
532: s[j].sub_value.msg_base = (u_char*) malloc(s[j].sub_value.msg_len);
533: if (s[j].sub_value.msg_base)
534: memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
535: AIT_FREE_VAL(&v);
1.2 misho 536: }
1.3 misho 537: end:
538: sqlite3_finalize(stmt);
539:
540: return s;
541: }
542:
543: /*
544: * mqtt_rtlm_write_subscribe() Subscribe topic
545: *
546: * @cfg = loaded config
547: * @sql = SQL handle
548: * @connid = connection id
549: * @msgid = MessageID
550: * @topic = topic
551: * @user = username
552: * @host = hostname
553: * @qos = Subscribe QoS
554: * return: -1 error, 0 no publish or >0 published ok
555: */
556: int
557: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid,
558: const char *topic, const char *user, const char *host, char qos)
559: {
560: int ret = 0;
561: char *str, *psStmt;
562: sqlite3_stmt *stmt;
563:
564: if (!cfg || !sql || !topic)
565: return -1;
566:
567: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
568: if (!str) {
569: mqtt_rtlm_log("Error:: not found subscribes table name");
570: return -1;
571: }
572: psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
573: "PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
574: "datetime('now', 'localtime'), '%q');", str,
575: connid, msgid, qos, topic, user, host);
576:
577: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
578: MQTT_RTLM_LOG(sql);
579: sqlite3_free(psStmt);
580: return -1;
581: } else
582: sqlite3_free(psStmt);
583: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
584: ret = sqlite3_changes(sql);
585: else {
586: if (ret > SQLITE_OK && ret < SQLITE_ROW)
587: MQTT_RTLM_LOG(sql);
588: ret = 0;
589: }
590: sqlite3_finalize(stmt);
591:
592: return ret;
593: }
594:
595: /*
596: * mqtt_rtlm_delete_subscribe() Delete subscribe
597: *
598: * @cfg = loaded config
599: * @sql = SQL handle
600: * @connid = connection id
601: * @topic = topic
602: * @user = username
603: * @host = hostname
604: * return: -1 error, 0 no changes or >0 deleted rows
605: */
606: int
607: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid,
608: const char *topic, const char *user, const char *host)
609: {
610: int ret = 0;
611: char *str, *psStmt;
612: sqlite3_stmt *stmt;
613:
614: if (!cfg || !sql || !topic)
615: return -1;
616:
617: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
618: if (!str) {
619: mqtt_rtlm_log("Error:: not found subscribes table name");
620: return -1;
621: }
622: psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
623: "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str,
624: connid, topic, user, host);
625:
626: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
627: MQTT_RTLM_LOG(sql);
628: sqlite3_free(psStmt);
629: return -1;
630: } else
631: sqlite3_free(psStmt);
632: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
633: ret = sqlite3_changes(sql);
634: else {
635: if (ret > SQLITE_OK && ret < SQLITE_ROW)
636: MQTT_RTLM_LOG(sql);
637: ret = 0;
638: }
639: sqlite3_finalize(stmt);
640:
641: return ret;
642: }
643:
644: /*
645: * mqtt_rtlm_read_subscribe() Get subscribe topic
646: *
647: * @cfg = loaded config
648: * @sql = SQL handle
649: * @connid = connection id
650: * @topic = topic
651: * return: NULL error or not found and !=NULL allocated subscribe topics
652: */
653: mqtt_subscr_t *
654: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
655: {
656: int rowz = 0;
657: char *str, *psStmt;
658: sqlite3_stmt *stmt;
659: register int j;
660: mqtt_subscr_t *s = NULL;
661:
662: if (!cfg || !sql || !topic)
663: return NULL;
664:
665: str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
666: if (!str) {
667: mqtt_rtlm_log("Error:: not found subscribes table name");
668: return NULL;
669: }
670: psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
671: "Topic LIKE '%q';", str, connid, topic);
672:
673: if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
674: MQTT_RTLM_LOG(sql);
675: sqlite3_free(psStmt);
676: return NULL;
677: } else
678: sqlite3_free(psStmt);
1.2 misho 679:
680: /* calculate count of rows and allocate subscribe items */
681: while (sqlite3_step(stmt) == SQLITE_ROW)
682: rowz++;
683: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
684: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
685: goto end;
686: } else
687: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
688: sqlite3_reset(stmt);
689:
690: /* fill with data */
691: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
692: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.3 misho 693: s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
694: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
695: s[j].sub_value.msg_base = NULL;
696: s[j].sub_value.msg_len = 0;
1.2 misho 697: }
698: end:
699: sqlite3_finalize(stmt);
700:
701: return s;
702: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>