Annotation of mqtt/src/pubmqtt.c, revision 1.1.2.16
1.1.2.1 misho 1: #include "global.h"
2:
3:
1.1.2.12 misho 4: extern const char sql_schema[];
5:
6:
/*
 * mqtt_rtlm_log() Log database connection message
 *
 * Forwards the printf-style message to syslog with LOG_ERR priority.
 *
 * @fmt = format string
 * @... = argument list
 * return: none
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list args;

	va_start(args, fmt);
	vsyslog(LOG_ERR, fmt, args);
	va_end(args);
}
/*
 * MQTT_RTLM_LOG() Log the last error recorded on an SQL handle
 *
 * @_sql = SQL handle; asserted to be non-NULL
 *
 * Reports sqlite3_errcode() and sqlite3_errmsg() of the handle through
 * mqtt_rtlm_log(). The argument is expanded three times, so pass an
 * expression without side effects.
 */
#define MQTT_RTLM_LOG(_sql) (assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
25:
26:
27: /*
28: * mqtt_rtlm_open() Open database connection
29: *
1.1.2.3 misho 30: * @cfg = config filename
1.1.2.2 misho 31: * return: NULL error or SQL handle
32: */
33: sqlite3 *
34: mqtt_rtlm_open(sl_config *cfg)
35: {
36: sqlite3 *sql = NULL;
37: const char *str = NULL;
38:
39: if (!cfg)
40: return NULL;
41:
1.1.2.16! misho 42: /*
! 43: if (!sqlite3_threadsafe() || sqlite3_config(SQLITE_CONFIG_SERIALIZED))
! 44: return NULL;
! 45: */
1.1.2.12 misho 46:
1.1.2.2 misho 47: str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
48: if (!str) {
49: mqtt_rtlm_log("Error:: Unknown database name ...\n");
50: return NULL;
51: }
52:
1.1.2.13 misho 53: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
1.1.2.3 misho 54: MQTT_RTLM_LOG(sql);
1.1.2.2 misho 55: sqlite3_close(sql);
56: return NULL;
57: }
58:
1.1.2.12 misho 59: if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
60: MQTT_RTLM_LOG(sql);
61: sqlite3_close(sql);
62: return NULL;
63: }
1.1.2.2 misho 64: return sql;
65: }
66:
67: /*
68: * mqtt_rtlm_close() Close database connection
69: *
70: * @sql = SQL handle
71: * return: none
72: */
73: void
74: mqtt_rtlm_close(sqlite3 *sql)
75: {
76: sqlite3_close(sql);
77: }
1.1.2.4 misho 78:
79: /*
80: * mqtt_rtlm_init_session() Create session
81: *
82: * @cfg = loaded config
83: * @sql = SQL handle
84: * @user = username
1.1.2.14 misho 85: * @connid = connection id
1.1.2.4 misho 86: * @host = hostname
87: * @port = port
1.1.2.14 misho 88: * @will = will flag if !=0 must fill arguments
89: * @... = will arguments in order topic,msg,qos,retain
1.1.2.4 misho 90: * return: -1 error, 0 session already appears or >0 row changed
91: */
92: int
1.1.2.14 misho 93: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid,
94: const char *host, u_short port, char will, ...)
1.1.2.4 misho 95: {
1.1.2.14 misho 96: va_list lst;
1.1.2.4 misho 97: int ret = 0;
98: char *str, szStmt[BUFSIZ] = { 0 };
99: sqlite3_stmt *stmt;
100:
101: if (!cfg || !sql)
102: return -1;
103:
104: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
105: if (!str) {
1.1.2.8 misho 106: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.4 misho 107: return -1;
108: }
1.1.2.14 misho 109: if (!will)
110: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, ConnID, RemoteHost, RemotePort, "
111: "WillFlag) VALUES ('%s', '%s', '%s', %d, 0);", str, user, connid, host, port);
112: else {
113: va_start(lst, will);
114: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, ConnID, RemoteHost, RemotePort, "
115: "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
116: "VALUES ('%s', '%s', '%s', %d, %d, %d, %d, '%s', '%s');",
117: str, user, connid, host, port, will,
118: va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
119: va_end(lst);
120: }
1.1.2.4 misho 121:
122: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
123: MQTT_RTLM_LOG(sql);
124: return -1;
125: }
126: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
127: ret = sqlite3_changes(sql);
128: else {
129: if (ret > SQLITE_OK && ret < SQLITE_ROW)
130: MQTT_RTLM_LOG(sql);
131: ret = 0;
132: }
133: sqlite3_finalize(stmt);
134:
135: return ret;
136: }
1.1.2.5 misho 137:
138: /*
139: * mqtt_rtlm_fini_session() Delete session(s)
140: *
141: * @cfg = loaded config
142: * @sql = SQL handle
143: * @user = username
1.1.2.14 misho 144: * @connid = connection id
1.1.2.5 misho 145: * @host = hostname
146: * return: -1 error, 0 session already appears or >0 row changed
147: */
148: int
1.1.2.14 misho 149: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid, const char *host)
1.1.2.5 misho 150: {
151: int ret = 0;
152: char *str, szStmt[BUFSIZ] = { 0 };
153: sqlite3_stmt *stmt;
154:
155: if (!cfg || !sql)
156: return -1;
157:
158: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
159: if (!str) {
1.1.2.8 misho 160: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.5 misho 161: return -1;
162: }
1.1.2.14 misho 163: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Username = '%s' AND ConnID = '%s' "
164: "AND RemoteHost LIKE '%s';", str, user, connid, host);
1.1.2.5 misho 165:
166: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
167: MQTT_RTLM_LOG(sql);
168: return -1;
169: }
170: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
171: ret = sqlite3_changes(sql);
172: else {
173: if (ret > SQLITE_OK && ret < SQLITE_ROW)
174: MQTT_RTLM_LOG(sql);
175: ret = 0;
176: }
177: sqlite3_finalize(stmt);
178:
179: return ret;
180: }
1.1.2.6 misho 181:
182: /*
183: * mqtt_rtlm_chk_session() Check session(s)
184: *
185: * @cfg = loaded config
186: * @sql = SQL handle
187: * @user = username
1.1.2.14 misho 188: * @connid = connection id
1.1.2.6 misho 189: * @host = hostname
190: * return: -1 error, 0 not logged or >0 logged found rows
191: */
192: int
1.1.2.14 misho 193: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid, const char *host)
1.1.2.6 misho 194: {
195: int ret = 0;
196: char *str, szStmt[BUFSIZ] = { 0 };
197: sqlite3_stmt *stmt;
198:
199: if (!cfg || !sql)
200: return -1;
201:
202: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
203: if (!str) {
1.1.2.8 misho 204: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.6 misho 205: return -1;
206: }
1.1.2.14 misho 207: snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost, RemotePort FROM %s WHERE "
208: "Username = '%s' AND ConnID = '%s' AND RemoteHost LIKE '%s';",
209: str, user, connid, host);
1.1.2.6 misho 210:
211: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
212: MQTT_RTLM_LOG(sql);
213: return -1;
214: }
1.1.2.7 misho 215: if (sqlite3_step(stmt) == SQLITE_ROW)
216: ret = sqlite3_changes(sql);
217: else
218: ret = 0;
1.1.2.6 misho 219: sqlite3_finalize(stmt);
220:
221: return ret;
222: }
1.1.2.8 misho 223:
224: /*
225: * mqtt_rtlm_write_topic() Publish topic
226: *
227: * @cfg = loaded config
228: * @sql = SQL handle
1.1.2.14 misho 229: * @msgid = MessageID
1.1.2.8 misho 230: * @topic = topic
231: * @txt = text
232: * @user = username
233: * @host = hostname
234: * @retain = !=0 retain message to database
235: * return: -1 error, 0 no publish or >0 published ok
236: */
237: int
1.1.2.14 misho 238: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt,
1.1.2.8 misho 239: const char *user, const char *host, char retain)
240: {
241: int ret = 0;
242: char *str, szStmt[BUFSIZ] = { 0 };
243: sqlite3_stmt *stmt;
244:
245: if (!cfg || !sql || !topic)
246: return -1;
247:
248: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
249: if (!str) {
250: mqtt_rtlm_log("Error:: not found topics table name");
251: return -1;
252: }
1.1.2.14 misho 253: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
254: "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
255: "datetime('now', 'localtime'), '%s');",
256: str, retain, msgid, topic, txt, user, host);
1.1.2.8 misho 257:
258: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
259: MQTT_RTLM_LOG(sql);
260: return -1;
261: }
262: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
263: ret = sqlite3_changes(sql);
264: else {
265: if (ret > SQLITE_OK && ret < SQLITE_ROW)
266: MQTT_RTLM_LOG(sql);
267: ret = 0;
268: }
269: sqlite3_finalize(stmt);
270:
271: return ret;
272: }
273:
274: /*
275: * mqtt_rtlm_delete_topic() Delete topic
276: *
277: * @cfg = loaded config
278: * @sql = SQL handle
1.1.2.14 misho 279: * @msgid = MessageID
1.1.2.8 misho 280: * @topic = topic
281: * @user = username
282: * @host = hostname
283: * @retain = -1 no matter
284: * return: -1 error, 0 no changes or >0 deleted rows
285: */
286: int
1.1.2.14 misho 287: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic,
1.1.2.8 misho 288: const char *user, const char *host, char retain)
289: {
290: int ret = 0;
291: char *str, *rtn, szStmt[BUFSIZ] = { 0 };
292: sqlite3_stmt *stmt;
293:
294: if (!cfg || !sql || !topic)
295: return -1;
296:
297: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
298: if (!str) {
299: mqtt_rtlm_log("Error:: not found topics table name");
300: return -1;
301: }
302: switch (retain) {
303: case -1:
304: rtn = "";
305: break;
306: case 0:
307: rtn = "AND Retain = 0";
308: break;
309: default:
310: rtn = "AND Retain != 0";
311: break;
312: }
1.1.2.14 misho 313: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
1.1.2.8 misho 314: "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str,
1.1.2.14 misho 315: msgid, topic, user, host, rtn);
1.1.2.8 misho 316:
317: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
318: MQTT_RTLM_LOG(sql);
319: return -1;
320: }
321: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
322: ret = sqlite3_changes(sql);
323: else {
324: if (ret > SQLITE_OK && ret < SQLITE_ROW)
325: MQTT_RTLM_LOG(sql);
326: ret = 0;
327: }
328: sqlite3_finalize(stmt);
329:
330: return ret;
331: }
1.1.2.9 misho 332:
333: /*
334: * mqtt_rtlm_read_topic() Get topic
335: *
336: * @cfg = loaded config
337: * @sql = SQL handle
1.1.2.14 misho 338: * @msgid = MessageID
1.1.2.9 misho 339: * @topic = topic
1.1.2.10 misho 340: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
341: * return: NULL error or not found and !=NULL allocated subscribe topics
1.1.2.9 misho 342: */
1.1.2.10 misho 343: mqtt_subscr_t *
1.1.2.14 misho 344: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
1.1.2.9 misho 345: {
1.1.2.10 misho 346: int rowz = 0;
347: char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
1.1.2.9 misho 348: sqlite3_stmt *stmt;
1.1.2.10 misho 349: register int j;
350: mqtt_subscr_t *s = NULL;
1.1.2.9 misho 351:
1.1.2.10 misho 352: if (!cfg || !sql || !topic)
353: return NULL;
354:
355: switch (retain) {
356: case -1:
357: memset(szStr, 0, sizeof szStr);
358: break;
359: case 0:
360: snprintf(szStr, sizeof szStr, "AND Retain = 0");
361: break;
362: default:
363: snprintf(szStr, sizeof szStr, "AND Retain > 0");
364: break;
365: }
1.1.2.9 misho 366:
367: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
368: if (!str) {
369: mqtt_rtlm_log("Error:: not found topics table name");
1.1.2.10 misho 370: return NULL;
1.1.2.9 misho 371: }
1.1.2.14 misho 372: snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
373: "MsgID = %d AND Topic LIKE '%s' %s;",
374: str, msgid, topic, szStr);
1.1.2.9 misho 375:
376: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
377: MQTT_RTLM_LOG(sql);
1.1.2.10 misho 378: return NULL;
1.1.2.9 misho 379: }
1.1.2.10 misho 380:
381: /* calculate count of rows and allocate subscribe items */
382: while (sqlite3_step(stmt) == SQLITE_ROW)
383: rowz++;
384: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
385: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
386: goto end;
387: } else
388: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
389: sqlite3_reset(stmt);
390:
391: /* fill with data */
392: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
1.1.2.11 misho 393: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.1.2.15 misho 394: s[j].sub_topic._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
395: s[j].sub_topic._size = strlen((char*) s[j].sub_topic._base);
396: s[j].sub_value._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
397: s[j].sub_value._size = strlen((char*) s[j].sub_value._base);
1.1.2.9 misho 398: }
1.1.2.10 misho 399: end:
1.1.2.9 misho 400: sqlite3_finalize(stmt);
401:
1.1.2.10 misho 402: return s;
1.1.2.9 misho 403: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>