Annotation of mqtt/src/pubmqtt.c, revision 1.1.2.13
1.1.2.1 misho 1: #include "global.h"
2:
3:
1.1.2.12 misho 4: extern const char sql_schema[];
5:
6:
/*
 * mqtt_rtlm_log() Log database connection message
 *
 * @fmt = format string (printf style)
 * @... = argument list
 * return: none
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list ap;

	/* hand the variadic arguments over to syslog with error priority */
	va_start(ap, fmt);
	vsyslog(LOG_ERR, fmt, ap);
	va_end(ap);
}
/* Log last error code and message of SQL handle _sql; the handle is checked by assert() */
#define MQTT_RTLM_LOG(_sql)	(assert((_sql)), \
				mqtt_rtlm_log("Error:: SQL #%d - %s", \
					sqlite3_errcode((_sql)), \
					sqlite3_errmsg((_sql))))
25:
26:
27: /*
28: * mqtt_rtlm_open() Open database connection
29: *
1.1.2.3 misho 30: * @cfg = config filename
1.1.2.2 misho 31: * return: NULL error or SQL handle
32: */
33: sqlite3 *
34: mqtt_rtlm_open(sl_config *cfg)
35: {
36: sqlite3 *sql = NULL;
37: const char *str = NULL;
38:
39: if (!cfg)
40: return NULL;
41:
1.1.2.12 misho 42: sqlite3_config(SQLITE_CONFIG_SERIALIZED);
43:
1.1.2.2 misho 44: str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
45: if (!str) {
46: mqtt_rtlm_log("Error:: Unknown database name ...\n");
47: return NULL;
48: }
49:
1.1.2.13! misho 50: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
1.1.2.3 misho 51: MQTT_RTLM_LOG(sql);
1.1.2.2 misho 52: sqlite3_close(sql);
53: return NULL;
54: }
55:
1.1.2.12 misho 56: if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
57: MQTT_RTLM_LOG(sql);
58: sqlite3_close(sql);
59: return NULL;
60: }
1.1.2.2 misho 61: return sql;
62: }
63:
64: /*
65: * mqtt_rtlm_close() Close database connection
66: *
67: * @sql = SQL handle
68: * return: none
69: */
70: void
71: mqtt_rtlm_close(sqlite3 *sql)
72: {
73: sqlite3_close(sql);
74: }
1.1.2.4 misho 75:
76: /*
77: * mqtt_rtlm_init_session() Create session
78: *
79: * @cfg = loaded config
80: * @sql = SQL handle
81: * @user = username
82: * @host = hostname
83: * @port = port
84: * return: -1 error, 0 session already appears or >0 row changed
85: */
86: int
87: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host, u_short port)
88: {
89: int ret = 0;
90: char *str, szStmt[BUFSIZ] = { 0 };
91: sqlite3_stmt *stmt;
92:
93: if (!cfg || !sql)
94: return -1;
95:
96: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
97: if (!str) {
1.1.2.8 misho 98: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.4 misho 99: return -1;
100: }
101: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, RemoteHost, RemotePort) "
102: "VALUES ('%s', '%s', %d);", str, user, host, port);
103:
104: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
105: MQTT_RTLM_LOG(sql);
106: return -1;
107: }
108: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
109: ret = sqlite3_changes(sql);
110: else {
111: if (ret > SQLITE_OK && ret < SQLITE_ROW)
112: MQTT_RTLM_LOG(sql);
113: ret = 0;
114: }
115: sqlite3_finalize(stmt);
116:
117: return ret;
118: }
1.1.2.5 misho 119:
120: /*
121: * mqtt_rtlm_fini_session() Delete session(s)
122: *
123: * @cfg = loaded config
124: * @sql = SQL handle
125: * @user = username
126: * @host = hostname
127: * return: -1 error, 0 session already appears or >0 row changed
128: */
129: int
130: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)
131: {
132: int ret = 0;
133: char *str, szStmt[BUFSIZ] = { 0 };
134: sqlite3_stmt *stmt;
135:
136: if (!cfg || !sql)
137: return -1;
138:
139: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
140: if (!str) {
1.1.2.8 misho 141: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.5 misho 142: return -1;
143: }
144: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Username = '%s' AND RemoteHost LIKE '%s';",
145: str, user, host);
146:
147: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
148: MQTT_RTLM_LOG(sql);
149: return -1;
150: }
151: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
152: ret = sqlite3_changes(sql);
153: else {
154: if (ret > SQLITE_OK && ret < SQLITE_ROW)
155: MQTT_RTLM_LOG(sql);
156: ret = 0;
157: }
158: sqlite3_finalize(stmt);
159:
160: return ret;
161: }
1.1.2.6 misho 162:
163: /*
164: * mqtt_rtlm_chk_session() Check session(s)
165: *
166: * @cfg = loaded config
167: * @sql = SQL handle
168: * @user = username
169: * @host = hostname
170: * return: -1 error, 0 not logged or >0 logged found rows
171: */
172: int
173: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)
174: {
175: int ret = 0;
176: char *str, szStmt[BUFSIZ] = { 0 };
177: sqlite3_stmt *stmt;
178:
179: if (!cfg || !sql)
180: return -1;
181:
182: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
183: if (!str) {
1.1.2.8 misho 184: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.6 misho 185: return -1;
186: }
187: snprintf(szStmt, sizeof szStmt, "SELECT RemoteHost, RemotePort FROM %s WHERE "
188: "Username = '%s' AND RemoteHost LIKE '%s';", str, user, host);
189:
190: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
191: MQTT_RTLM_LOG(sql);
192: return -1;
193: }
1.1.2.7 misho 194: if (sqlite3_step(stmt) == SQLITE_ROW)
195: ret = sqlite3_changes(sql);
196: else
197: ret = 0;
1.1.2.6 misho 198: sqlite3_finalize(stmt);
199:
200: return ret;
201: }
1.1.2.8 misho 202:
203: /*
204: * mqtt_rtlm_write_topic() Publish topic
205: *
206: * @cfg = loaded config
207: * @sql = SQL handle
208: * @topic = topic
209: * @txt = text
210: * @user = username
211: * @host = hostname
212: * @retain = !=0 retain message to database
213: * return: -1 error, 0 no publish or >0 published ok
214: */
215: int
216: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, const char *topic, const char *txt,
217: const char *user, const char *host, char retain)
218: {
219: int ret = 0;
220: char *str, szStmt[BUFSIZ] = { 0 };
221: sqlite3_stmt *stmt;
222:
223: if (!cfg || !sql || !topic)
224: return -1;
225:
226: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
227: if (!str) {
228: mqtt_rtlm_log("Error:: not found topics table name");
229: return -1;
230: }
231: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, Topic, Value, PubUser, PubDate, PubHost) "
232: "VALUES (%d, '%s', '%s', '%s', datetime('now', 'localtime'), '%s');", str,
233: retain, topic, txt, user, host);
234:
235: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
236: MQTT_RTLM_LOG(sql);
237: return -1;
238: }
239: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
240: ret = sqlite3_changes(sql);
241: else {
242: if (ret > SQLITE_OK && ret < SQLITE_ROW)
243: MQTT_RTLM_LOG(sql);
244: ret = 0;
245: }
246: sqlite3_finalize(stmt);
247:
248: return ret;
249: }
250:
251: /*
252: * mqtt_rtlm_delete_topic() Delete topic
253: *
254: * @cfg = loaded config
255: * @sql = SQL handle
256: * @topic = topic
257: * @user = username
258: * @host = hostname
259: * @retain = -1 no matter
260: * return: -1 error, 0 no changes or >0 deleted rows
261: */
262: int
263: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, const char *topic,
264: const char *user, const char *host, char retain)
265: {
266: int ret = 0;
267: char *str, *rtn, szStmt[BUFSIZ] = { 0 };
268: sqlite3_stmt *stmt;
269:
270: if (!cfg || !sql || !topic)
271: return -1;
272:
273: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
274: if (!str) {
275: mqtt_rtlm_log("Error:: not found topics table name");
276: return -1;
277: }
278: switch (retain) {
279: case -1:
280: rtn = "";
281: break;
282: case 0:
283: rtn = "AND Retain = 0";
284: break;
285: default:
286: rtn = "AND Retain != 0";
287: break;
288: }
289: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
290: "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str,
291: topic, user, host, rtn);
292:
293: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
294: MQTT_RTLM_LOG(sql);
295: return -1;
296: }
297: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
298: ret = sqlite3_changes(sql);
299: else {
300: if (ret > SQLITE_OK && ret < SQLITE_ROW)
301: MQTT_RTLM_LOG(sql);
302: ret = 0;
303: }
304: sqlite3_finalize(stmt);
305:
306: return ret;
307: }
1.1.2.9 misho 308:
309: /*
310: * mqtt_rtlm_read_topic() Get topic
311: *
312: * @cfg = loaded config
313: * @sql = SQL handle
314: * @topic = topic
1.1.2.10 misho 315: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
316: * return: NULL error or not found and !=NULL allocated subscribe topics
1.1.2.9 misho 317: */
1.1.2.10 misho 318: mqtt_subscr_t *
319: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, const char *topic, char retain)
1.1.2.9 misho 320: {
1.1.2.10 misho 321: int rowz = 0;
322: char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
1.1.2.9 misho 323: sqlite3_stmt *stmt;
1.1.2.10 misho 324: register int j;
325: mqtt_subscr_t *s = NULL;
1.1.2.9 misho 326:
1.1.2.10 misho 327: if (!cfg || !sql || !topic)
328: return NULL;
329:
330: switch (retain) {
331: case -1:
332: memset(szStr, 0, sizeof szStr);
333: break;
334: case 0:
335: snprintf(szStr, sizeof szStr, "AND Retain = 0");
336: break;
337: default:
338: snprintf(szStr, sizeof szStr, "AND Retain > 0");
339: break;
340: }
1.1.2.9 misho 341:
342: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
343: if (!str) {
344: mqtt_rtlm_log("Error:: not found topics table name");
1.1.2.10 misho 345: return NULL;
1.1.2.9 misho 346: }
1.1.2.10 misho 347: snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE Topic LIKE '%s' %s;",
348: str, topic, szStr);
1.1.2.9 misho 349:
350: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
351: MQTT_RTLM_LOG(sql);
1.1.2.10 misho 352: return NULL;
1.1.2.9 misho 353: }
1.1.2.10 misho 354:
355: /* calculate count of rows and allocate subscribe items */
356: while (sqlite3_step(stmt) == SQLITE_ROW)
357: rowz++;
358: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
359: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
360: goto end;
361: } else
362: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
363: sqlite3_reset(stmt);
364:
365: /* fill with data */
366: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
1.1.2.11 misho 367: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.1.2.10 misho 368: s[j].sub_topic._base = strdup(sqlite3_column_text(stmt, 1));
369: s[j].sub_topic._size = strlen(s[j].sub_topic._base);
370: s[j].sub_value._base = strdup(sqlite3_column_text(stmt, 2));
371: s[j].sub_value._size = strlen(s[j].sub_value._base);
1.1.2.9 misho 372: }
1.1.2.10 misho 373: end:
1.1.2.9 misho 374: sqlite3_finalize(stmt);
375:
1.1.2.10 misho 376: return s;
1.1.2.9 misho 377: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>