Annotation of mqtt/src/pubmqtt.c, revision 1.1.2.17
1.1.2.1 misho 1: #include "global.h"
2:
3:
1.1.2.12 misho 4: extern const char sql_schema[];
5:
6:
1.1.2.2 misho 7: /*
8: * mqtt_db_log() Log database connection message
9: *
10: * @fmt = format string
11: * @... = argument list
12: * return: none
13: */
14: static void
15: mqtt_rtlm_log(const char *fmt, ...)
16: {
17: va_list lst;
18:
19: va_start(lst, fmt);
20: vsyslog(LOG_ERR, fmt, lst);
21: va_end(lst);
22: }
1.1.2.3 misho 23: #define MQTT_RTLM_LOG(_sql) (assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
1.1.2.2 misho 24: sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
25:
26:
27: /*
28: * mqtt_rtlm_open() Open database connection
29: *
1.1.2.3 misho 30: * @cfg = config filename
1.1.2.2 misho 31: * return: NULL error or SQL handle
32: */
33: sqlite3 *
34: mqtt_rtlm_open(sl_config *cfg)
35: {
36: sqlite3 *sql = NULL;
37: const char *str = NULL;
38:
39: if (!cfg)
40: return NULL;
41:
1.1.2.16 misho 42: /*
43: if (!sqlite3_threadsafe() || sqlite3_config(SQLITE_CONFIG_SERIALIZED))
44: return NULL;
45: */
1.1.2.12 misho 46:
1.1.2.2 misho 47: str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
48: if (!str) {
49: mqtt_rtlm_log("Error:: Unknown database name ...\n");
50: return NULL;
51: }
52:
1.1.2.13 misho 53: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
1.1.2.3 misho 54: MQTT_RTLM_LOG(sql);
1.1.2.2 misho 55: sqlite3_close(sql);
56: return NULL;
57: }
58:
1.1.2.12 misho 59: if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
60: MQTT_RTLM_LOG(sql);
61: sqlite3_close(sql);
62: return NULL;
63: }
1.1.2.2 misho 64: return sql;
65: }
66:
67: /*
68: * mqtt_rtlm_close() Close database connection
69: *
70: * @sql = SQL handle
71: * return: none
72: */
73: void
74: mqtt_rtlm_close(sqlite3 *sql)
75: {
76: sqlite3_close(sql);
77: }
1.1.2.4 misho 78:
79: /*
80: * mqtt_rtlm_init_session() Create session
81: *
82: * @cfg = loaded config
83: * @sql = SQL handle
1.1.2.14 misho 84: * @connid = connection id
1.1.2.17! misho 85: * @user = username
1.1.2.4 misho 86: * @host = hostname
1.1.2.14 misho 87: * @will = will flag if !=0 must fill arguments
88: * @... = will arguments in order topic,msg,qos,retain
1.1.2.4 misho 89: * return: -1 error, 0 session already appears or >0 row changed
90: */
91: int
1.1.2.17! misho 92: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user,
! 93: const char *host, char will, ...)
1.1.2.4 misho 94: {
1.1.2.14 misho 95: va_list lst;
1.1.2.4 misho 96: int ret = 0;
97: char *str, szStmt[BUFSIZ] = { 0 };
98: sqlite3_stmt *stmt;
99:
100: if (!cfg || !sql)
101: return -1;
102:
103: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
104: if (!str) {
1.1.2.8 misho 105: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.4 misho 106: return -1;
107: }
1.1.2.14 misho 108: if (!will)
1.1.2.17! misho 109: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
! 110: "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
1.1.2.14 misho 111: else {
112: va_start(lst, will);
1.1.2.17! misho 113: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
1.1.2.14 misho 114: "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
1.1.2.17! misho 115: "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');",
! 116: str, connid, user, host, will,
1.1.2.14 misho 117: va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
118: va_end(lst);
119: }
1.1.2.4 misho 120:
121: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
122: MQTT_RTLM_LOG(sql);
123: return -1;
124: }
125: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
126: ret = sqlite3_changes(sql);
127: else {
128: if (ret > SQLITE_OK && ret < SQLITE_ROW)
129: MQTT_RTLM_LOG(sql);
130: ret = 0;
131: }
132: sqlite3_finalize(stmt);
133:
134: return ret;
135: }
1.1.2.5 misho 136:
137: /*
138: * mqtt_rtlm_fini_session() Delete session(s)
139: *
140: * @cfg = loaded config
141: * @sql = SQL handle
1.1.2.14 misho 142: * @connid = connection id
1.1.2.17! misho 143: * @user = username
1.1.2.5 misho 144: * @host = hostname
145: * return: -1 error, 0 session already appears or >0 row changed
146: */
147: int
1.1.2.17! misho 148: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.1.2.5 misho 149: {
150: int ret = 0;
151: char *str, szStmt[BUFSIZ] = { 0 };
152: sqlite3_stmt *stmt;
153:
154: if (!cfg || !sql)
155: return -1;
156:
157: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
158: if (!str) {
1.1.2.8 misho 159: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.5 misho 160: return -1;
161: }
1.1.2.17! misho 162: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
1.1.2.14 misho 163: "AND RemoteHost LIKE '%s';", str, user, connid, host);
1.1.2.5 misho 164:
165: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
166: MQTT_RTLM_LOG(sql);
167: return -1;
168: }
169: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
170: ret = sqlite3_changes(sql);
171: else {
172: if (ret > SQLITE_OK && ret < SQLITE_ROW)
173: MQTT_RTLM_LOG(sql);
174: ret = 0;
175: }
176: sqlite3_finalize(stmt);
177:
178: return ret;
179: }
1.1.2.6 misho 180:
181: /*
182: * mqtt_rtlm_chk_session() Check session(s)
183: *
184: * @cfg = loaded config
185: * @sql = SQL handle
1.1.2.14 misho 186: * @connid = connection id
1.1.2.17! misho 187: * @user = username
1.1.2.6 misho 188: * @host = hostname
189: * return: -1 error, 0 not logged or >0 logged found rows
190: */
191: int
1.1.2.17! misho 192: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.1.2.6 misho 193: {
194: int ret = 0;
195: char *str, szStmt[BUFSIZ] = { 0 };
196: sqlite3_stmt *stmt;
197:
198: if (!cfg || !sql)
199: return -1;
200:
201: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
202: if (!str) {
1.1.2.8 misho 203: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.6 misho 204: return -1;
205: }
1.1.2.17! misho 206: snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
! 207: "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';",
! 208: str, connid, user, host);
1.1.2.6 misho 209:
210: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
211: MQTT_RTLM_LOG(sql);
212: return -1;
213: }
1.1.2.7 misho 214: if (sqlite3_step(stmt) == SQLITE_ROW)
215: ret = sqlite3_changes(sql);
216: else
217: ret = 0;
1.1.2.6 misho 218: sqlite3_finalize(stmt);
219:
220: return ret;
221: }
1.1.2.8 misho 222:
223: /*
224: * mqtt_rtlm_write_topic() Publish topic
225: *
226: * @cfg = loaded config
227: * @sql = SQL handle
1.1.2.14 misho 228: * @msgid = MessageID
1.1.2.8 misho 229: * @topic = topic
230: * @txt = text
231: * @user = username
232: * @host = hostname
233: * @retain = !=0 retain message to database
234: * return: -1 error, 0 no publish or >0 published ok
235: */
236: int
1.1.2.14 misho 237: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt,
1.1.2.8 misho 238: const char *user, const char *host, char retain)
239: {
240: int ret = 0;
241: char *str, szStmt[BUFSIZ] = { 0 };
242: sqlite3_stmt *stmt;
243:
244: if (!cfg || !sql || !topic)
245: return -1;
246:
247: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
248: if (!str) {
249: mqtt_rtlm_log("Error:: not found topics table name");
250: return -1;
251: }
1.1.2.14 misho 252: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
253: "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
254: "datetime('now', 'localtime'), '%s');",
255: str, retain, msgid, topic, txt, user, host);
1.1.2.8 misho 256:
257: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
258: MQTT_RTLM_LOG(sql);
259: return -1;
260: }
261: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
262: ret = sqlite3_changes(sql);
263: else {
264: if (ret > SQLITE_OK && ret < SQLITE_ROW)
265: MQTT_RTLM_LOG(sql);
266: ret = 0;
267: }
268: sqlite3_finalize(stmt);
269:
270: return ret;
271: }
272:
273: /*
274: * mqtt_rtlm_delete_topic() Delete topic
275: *
276: * @cfg = loaded config
277: * @sql = SQL handle
1.1.2.14 misho 278: * @msgid = MessageID
1.1.2.8 misho 279: * @topic = topic
280: * @user = username
281: * @host = hostname
282: * @retain = -1 no matter
283: * return: -1 error, 0 no changes or >0 deleted rows
284: */
285: int
1.1.2.14 misho 286: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic,
1.1.2.8 misho 287: const char *user, const char *host, char retain)
288: {
289: int ret = 0;
290: char *str, *rtn, szStmt[BUFSIZ] = { 0 };
291: sqlite3_stmt *stmt;
292:
293: if (!cfg || !sql || !topic)
294: return -1;
295:
296: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
297: if (!str) {
298: mqtt_rtlm_log("Error:: not found topics table name");
299: return -1;
300: }
301: switch (retain) {
302: case -1:
303: rtn = "";
304: break;
305: case 0:
306: rtn = "AND Retain = 0";
307: break;
308: default:
309: rtn = "AND Retain != 0";
310: break;
311: }
1.1.2.14 misho 312: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
1.1.2.8 misho 313: "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str,
1.1.2.14 misho 314: msgid, topic, user, host, rtn);
1.1.2.8 misho 315:
316: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
317: MQTT_RTLM_LOG(sql);
318: return -1;
319: }
320: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
321: ret = sqlite3_changes(sql);
322: else {
323: if (ret > SQLITE_OK && ret < SQLITE_ROW)
324: MQTT_RTLM_LOG(sql);
325: ret = 0;
326: }
327: sqlite3_finalize(stmt);
328:
329: return ret;
330: }
1.1.2.9 misho 331:
332: /*
333: * mqtt_rtlm_read_topic() Get topic
334: *
335: * @cfg = loaded config
336: * @sql = SQL handle
1.1.2.14 misho 337: * @msgid = MessageID
1.1.2.9 misho 338: * @topic = topic
1.1.2.10 misho 339: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
340: * return: NULL error or not found and !=NULL allocated subscribe topics
1.1.2.9 misho 341: */
1.1.2.10 misho 342: mqtt_subscr_t *
1.1.2.14 misho 343: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
1.1.2.9 misho 344: {
1.1.2.10 misho 345: int rowz = 0;
346: char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
1.1.2.9 misho 347: sqlite3_stmt *stmt;
1.1.2.10 misho 348: register int j;
349: mqtt_subscr_t *s = NULL;
1.1.2.9 misho 350:
1.1.2.10 misho 351: if (!cfg || !sql || !topic)
352: return NULL;
353:
354: switch (retain) {
355: case -1:
356: memset(szStr, 0, sizeof szStr);
357: break;
358: case 0:
359: snprintf(szStr, sizeof szStr, "AND Retain = 0");
360: break;
361: default:
362: snprintf(szStr, sizeof szStr, "AND Retain > 0");
363: break;
364: }
1.1.2.9 misho 365:
366: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
367: if (!str) {
368: mqtt_rtlm_log("Error:: not found topics table name");
1.1.2.10 misho 369: return NULL;
1.1.2.9 misho 370: }
1.1.2.14 misho 371: snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
372: "MsgID = %d AND Topic LIKE '%s' %s;",
373: str, msgid, topic, szStr);
1.1.2.9 misho 374:
375: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
376: MQTT_RTLM_LOG(sql);
1.1.2.10 misho 377: return NULL;
1.1.2.9 misho 378: }
1.1.2.10 misho 379:
380: /* calculate count of rows and allocate subscribe items */
381: while (sqlite3_step(stmt) == SQLITE_ROW)
382: rowz++;
383: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
384: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
385: goto end;
386: } else
387: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
388: sqlite3_reset(stmt);
389:
390: /* fill with data */
391: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
1.1.2.11 misho 392: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.1.2.15 misho 393: s[j].sub_topic._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
394: s[j].sub_topic._size = strlen((char*) s[j].sub_topic._base);
395: s[j].sub_value._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
396: s[j].sub_value._size = strlen((char*) s[j].sub_value._base);
1.1.2.9 misho 397: }
1.1.2.10 misho 398: end:
1.1.2.9 misho 399: sqlite3_finalize(stmt);
400:
1.1.2.10 misho 401: return s;
1.1.2.9 misho 402: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>