Annotation of mqtt/src/pubmqtt.c, revision 1.1.2.15
1.1.2.1 misho 1: #include "global.h"
2:
3:
1.1.2.12 misho 4: extern const char sql_schema[];
5:
6:
/*
 * mqtt_rtlm_log() Log database connection message
 *
 * Forwards a printf-style message to syslog with LOG_ERR priority.
 *
 * @fmt = format string
 * @... = argument list
 * return: none
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	vsyslog(LOG_ERR, fmt, ap);
	va_end(ap);
}
/*
 * MQTT_RTLM_LOG() Log last SQLite error of a connection
 *
 * @_sql = SQL handle, asserted to be non-NULL
 * Reports sqlite3_errcode() and sqlite3_errmsg() through mqtt_rtlm_log()
 */
#define MQTT_RTLM_LOG(_sql)	(assert((_sql)), \
				mqtt_rtlm_log("Error:: SQL #%d - %s", \
				sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
25:
26:
27: /*
28: * mqtt_rtlm_open() Open database connection
29: *
1.1.2.3 misho 30: * @cfg = config filename
1.1.2.2 misho 31: * return: NULL error or SQL handle
32: */
33: sqlite3 *
34: mqtt_rtlm_open(sl_config *cfg)
35: {
36: sqlite3 *sql = NULL;
37: const char *str = NULL;
38:
39: if (!cfg)
40: return NULL;
41:
1.1.2.12 misho 42: sqlite3_config(SQLITE_CONFIG_SERIALIZED);
43:
1.1.2.2 misho 44: str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
45: if (!str) {
46: mqtt_rtlm_log("Error:: Unknown database name ...\n");
47: return NULL;
48: }
49:
1.1.2.13 misho 50: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
1.1.2.3 misho 51: MQTT_RTLM_LOG(sql);
1.1.2.2 misho 52: sqlite3_close(sql);
53: return NULL;
54: }
55:
1.1.2.12 misho 56: if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
57: MQTT_RTLM_LOG(sql);
58: sqlite3_close(sql);
59: return NULL;
60: }
1.1.2.2 misho 61: return sql;
62: }
63:
64: /*
65: * mqtt_rtlm_close() Close database connection
66: *
67: * @sql = SQL handle
68: * return: none
69: */
70: void
71: mqtt_rtlm_close(sqlite3 *sql)
72: {
73: sqlite3_close(sql);
74: }
1.1.2.4 misho 75:
76: /*
77: * mqtt_rtlm_init_session() Create session
78: *
79: * @cfg = loaded config
80: * @sql = SQL handle
81: * @user = username
1.1.2.14 misho 82: * @connid = connection id
1.1.2.4 misho 83: * @host = hostname
84: * @port = port
1.1.2.14 misho 85: * @will = will flag if !=0 must fill arguments
86: * @... = will arguments in order topic,msg,qos,retain
1.1.2.4 misho 87: * return: -1 error, 0 session already appears or >0 row changed
88: */
89: int
1.1.2.14 misho 90: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid,
91: const char *host, u_short port, char will, ...)
1.1.2.4 misho 92: {
1.1.2.14 misho 93: va_list lst;
1.1.2.4 misho 94: int ret = 0;
95: char *str, szStmt[BUFSIZ] = { 0 };
96: sqlite3_stmt *stmt;
97:
98: if (!cfg || !sql)
99: return -1;
100:
101: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
102: if (!str) {
1.1.2.8 misho 103: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.4 misho 104: return -1;
105: }
1.1.2.14 misho 106: if (!will)
107: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, ConnID, RemoteHost, RemotePort, "
108: "WillFlag) VALUES ('%s', '%s', '%s', %d, 0);", str, user, connid, host, port);
109: else {
110: va_start(lst, will);
111: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, ConnID, RemoteHost, RemotePort, "
112: "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
113: "VALUES ('%s', '%s', '%s', %d, %d, %d, %d, '%s', '%s');",
114: str, user, connid, host, port, will,
115: va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
116: va_end(lst);
117: }
1.1.2.4 misho 118:
119: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
120: MQTT_RTLM_LOG(sql);
121: return -1;
122: }
123: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
124: ret = sqlite3_changes(sql);
125: else {
126: if (ret > SQLITE_OK && ret < SQLITE_ROW)
127: MQTT_RTLM_LOG(sql);
128: ret = 0;
129: }
130: sqlite3_finalize(stmt);
131:
132: return ret;
133: }
1.1.2.5 misho 134:
135: /*
136: * mqtt_rtlm_fini_session() Delete session(s)
137: *
138: * @cfg = loaded config
139: * @sql = SQL handle
140: * @user = username
1.1.2.14 misho 141: * @connid = connection id
1.1.2.5 misho 142: * @host = hostname
143: * return: -1 error, 0 session already appears or >0 row changed
144: */
145: int
1.1.2.14 misho 146: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid, const char *host)
1.1.2.5 misho 147: {
148: int ret = 0;
149: char *str, szStmt[BUFSIZ] = { 0 };
150: sqlite3_stmt *stmt;
151:
152: if (!cfg || !sql)
153: return -1;
154:
155: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
156: if (!str) {
1.1.2.8 misho 157: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.5 misho 158: return -1;
159: }
1.1.2.14 misho 160: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Username = '%s' AND ConnID = '%s' "
161: "AND RemoteHost LIKE '%s';", str, user, connid, host);
1.1.2.5 misho 162:
163: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
164: MQTT_RTLM_LOG(sql);
165: return -1;
166: }
167: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
168: ret = sqlite3_changes(sql);
169: else {
170: if (ret > SQLITE_OK && ret < SQLITE_ROW)
171: MQTT_RTLM_LOG(sql);
172: ret = 0;
173: }
174: sqlite3_finalize(stmt);
175:
176: return ret;
177: }
1.1.2.6 misho 178:
179: /*
180: * mqtt_rtlm_chk_session() Check session(s)
181: *
182: * @cfg = loaded config
183: * @sql = SQL handle
184: * @user = username
1.1.2.14 misho 185: * @connid = connection id
1.1.2.6 misho 186: * @host = hostname
187: * return: -1 error, 0 not logged or >0 logged found rows
188: */
189: int
1.1.2.14 misho 190: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid, const char *host)
1.1.2.6 misho 191: {
192: int ret = 0;
193: char *str, szStmt[BUFSIZ] = { 0 };
194: sqlite3_stmt *stmt;
195:
196: if (!cfg || !sql)
197: return -1;
198:
199: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
200: if (!str) {
1.1.2.8 misho 201: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.6 misho 202: return -1;
203: }
1.1.2.14 misho 204: snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost, RemotePort FROM %s WHERE "
205: "Username = '%s' AND ConnID = '%s' AND RemoteHost LIKE '%s';",
206: str, user, connid, host);
1.1.2.6 misho 207:
208: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
209: MQTT_RTLM_LOG(sql);
210: return -1;
211: }
1.1.2.7 misho 212: if (sqlite3_step(stmt) == SQLITE_ROW)
213: ret = sqlite3_changes(sql);
214: else
215: ret = 0;
1.1.2.6 misho 216: sqlite3_finalize(stmt);
217:
218: return ret;
219: }
1.1.2.8 misho 220:
221: /*
222: * mqtt_rtlm_write_topic() Publish topic
223: *
224: * @cfg = loaded config
225: * @sql = SQL handle
1.1.2.14 misho 226: * @msgid = MessageID
1.1.2.8 misho 227: * @topic = topic
228: * @txt = text
229: * @user = username
230: * @host = hostname
231: * @retain = !=0 retain message to database
232: * return: -1 error, 0 no publish or >0 published ok
233: */
234: int
1.1.2.14 misho 235: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt,
1.1.2.8 misho 236: const char *user, const char *host, char retain)
237: {
238: int ret = 0;
239: char *str, szStmt[BUFSIZ] = { 0 };
240: sqlite3_stmt *stmt;
241:
242: if (!cfg || !sql || !topic)
243: return -1;
244:
245: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
246: if (!str) {
247: mqtt_rtlm_log("Error:: not found topics table name");
248: return -1;
249: }
1.1.2.14 misho 250: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
251: "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
252: "datetime('now', 'localtime'), '%s');",
253: str, retain, msgid, topic, txt, user, host);
1.1.2.8 misho 254:
255: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
256: MQTT_RTLM_LOG(sql);
257: return -1;
258: }
259: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
260: ret = sqlite3_changes(sql);
261: else {
262: if (ret > SQLITE_OK && ret < SQLITE_ROW)
263: MQTT_RTLM_LOG(sql);
264: ret = 0;
265: }
266: sqlite3_finalize(stmt);
267:
268: return ret;
269: }
270:
271: /*
272: * mqtt_rtlm_delete_topic() Delete topic
273: *
274: * @cfg = loaded config
275: * @sql = SQL handle
1.1.2.14 misho 276: * @msgid = MessageID
1.1.2.8 misho 277: * @topic = topic
278: * @user = username
279: * @host = hostname
280: * @retain = -1 no matter
281: * return: -1 error, 0 no changes or >0 deleted rows
282: */
283: int
1.1.2.14 misho 284: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic,
1.1.2.8 misho 285: const char *user, const char *host, char retain)
286: {
287: int ret = 0;
288: char *str, *rtn, szStmt[BUFSIZ] = { 0 };
289: sqlite3_stmt *stmt;
290:
291: if (!cfg || !sql || !topic)
292: return -1;
293:
294: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
295: if (!str) {
296: mqtt_rtlm_log("Error:: not found topics table name");
297: return -1;
298: }
299: switch (retain) {
300: case -1:
301: rtn = "";
302: break;
303: case 0:
304: rtn = "AND Retain = 0";
305: break;
306: default:
307: rtn = "AND Retain != 0";
308: break;
309: }
1.1.2.14 misho 310: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
1.1.2.8 misho 311: "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str,
1.1.2.14 misho 312: msgid, topic, user, host, rtn);
1.1.2.8 misho 313:
314: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
315: MQTT_RTLM_LOG(sql);
316: return -1;
317: }
318: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
319: ret = sqlite3_changes(sql);
320: else {
321: if (ret > SQLITE_OK && ret < SQLITE_ROW)
322: MQTT_RTLM_LOG(sql);
323: ret = 0;
324: }
325: sqlite3_finalize(stmt);
326:
327: return ret;
328: }
1.1.2.9 misho 329:
330: /*
331: * mqtt_rtlm_read_topic() Get topic
332: *
333: * @cfg = loaded config
334: * @sql = SQL handle
1.1.2.14 misho 335: * @msgid = MessageID
1.1.2.9 misho 336: * @topic = topic
1.1.2.10 misho 337: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
338: * return: NULL error or not found and !=NULL allocated subscribe topics
1.1.2.9 misho 339: */
1.1.2.10 misho 340: mqtt_subscr_t *
1.1.2.14 misho 341: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
1.1.2.9 misho 342: {
1.1.2.10 misho 343: int rowz = 0;
344: char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
1.1.2.9 misho 345: sqlite3_stmt *stmt;
1.1.2.10 misho 346: register int j;
347: mqtt_subscr_t *s = NULL;
1.1.2.9 misho 348:
1.1.2.10 misho 349: if (!cfg || !sql || !topic)
350: return NULL;
351:
352: switch (retain) {
353: case -1:
354: memset(szStr, 0, sizeof szStr);
355: break;
356: case 0:
357: snprintf(szStr, sizeof szStr, "AND Retain = 0");
358: break;
359: default:
360: snprintf(szStr, sizeof szStr, "AND Retain > 0");
361: break;
362: }
1.1.2.9 misho 363:
364: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
365: if (!str) {
366: mqtt_rtlm_log("Error:: not found topics table name");
1.1.2.10 misho 367: return NULL;
1.1.2.9 misho 368: }
1.1.2.14 misho 369: snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
370: "MsgID = %d AND Topic LIKE '%s' %s;",
371: str, msgid, topic, szStr);
1.1.2.9 misho 372:
373: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
374: MQTT_RTLM_LOG(sql);
1.1.2.10 misho 375: return NULL;
1.1.2.9 misho 376: }
1.1.2.10 misho 377:
378: /* calculate count of rows and allocate subscribe items */
379: while (sqlite3_step(stmt) == SQLITE_ROW)
380: rowz++;
381: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
382: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
383: goto end;
384: } else
385: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
386: sqlite3_reset(stmt);
387:
388: /* fill with data */
389: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
1.1.2.11 misho 390: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.1.2.15! misho 391: s[j].sub_topic._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
! 392: s[j].sub_topic._size = strlen((char*) s[j].sub_topic._base);
! 393: s[j].sub_value._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
! 394: s[j].sub_value._size = strlen((char*) s[j].sub_value._base);
1.1.2.9 misho 395: }
1.1.2.10 misho 396: end:
1.1.2.9 misho 397: sqlite3_finalize(stmt);
398:
1.1.2.10 misho 399: return s;
1.1.2.9 misho 400: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>