Annotation of mqtt/src/pubmqtt.c, revision 1.2.2.1
1.2 misho 1: #include "global.h"
2:
3:
4: extern const char sql_schema[];
5:
6:
7: /*
8: * mqtt_db_log() Log database connection message
9: *
10: * @fmt = format string
11: * @... = argument list
12: * return: none
13: */
14: static void
15: mqtt_rtlm_log(const char *fmt, ...)
16: {
17: va_list lst;
18:
19: va_start(lst, fmt);
20: vsyslog(LOG_ERR, fmt, lst);
21: va_end(lst);
22: }
23: #define MQTT_RTLM_LOG(_sql) (assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
24: sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
25:
26:
27: /*
28: * mqtt_rtlm_open() Open database connection
29: *
30: * @cfg = config filename
31: * return: NULL error or SQL handle
32: */
33: sqlite3 *
34: mqtt_rtlm_open(sl_config *cfg)
35: {
36: sqlite3 *sql = NULL;
37: const char *str = NULL;
38:
39: if (!cfg)
40: return NULL;
41:
42: /*
43: if (!sqlite3_threadsafe() || sqlite3_config(SQLITE_CONFIG_SERIALIZED))
44: return NULL;
45: */
46:
47: str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
48: if (!str) {
49: mqtt_rtlm_log("Error:: Unknown database name ...\n");
50: return NULL;
51: }
52:
53: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
54: MQTT_RTLM_LOG(sql);
55: sqlite3_close(sql);
56: return NULL;
57: }
58:
59: if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
60: MQTT_RTLM_LOG(sql);
61: sqlite3_close(sql);
62: return NULL;
63: }
64: return sql;
65: }
66:
67: /*
68: * mqtt_rtlm_close() Close database connection
69: *
70: * @sql = SQL handle
71: * return: none
72: */
73: void
74: mqtt_rtlm_close(sqlite3 *sql)
75: {
76: sqlite3_close(sql);
77: }
78:
79: /*
80: * mqtt_rtlm_init_session() Create session
81: *
82: * @cfg = loaded config
83: * @sql = SQL handle
84: * @connid = connection id
85: * @user = username
86: * @host = hostname
87: * @will = will flag if !=0 must fill arguments
88: * @... = will arguments in order topic,msg,qos,retain
89: * return: -1 error, 0 session already appears or >0 row changed
90: */
91: int
92: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user,
93: const char *host, char will, ...)
94: {
95: va_list lst;
96: int ret = 0;
97: char *str, szStmt[BUFSIZ] = { 0 };
98: sqlite3_stmt *stmt;
99:
100: if (!cfg || !sql)
101: return -1;
102:
103: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
104: if (!str) {
105: mqtt_rtlm_log("Error:: not found online table name");
106: return -1;
107: }
108: if (!will)
109: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
110: "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
111: else {
112: va_start(lst, will);
113: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
114: "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
115: "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');",
116: str, connid, user, host, will,
117: va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
118: va_end(lst);
119: }
120:
121: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
122: MQTT_RTLM_LOG(sql);
123: return -1;
124: }
125: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
126: ret = sqlite3_changes(sql);
127: else {
128: if (ret > SQLITE_OK && ret < SQLITE_ROW)
129: MQTT_RTLM_LOG(sql);
130: ret = 0;
131: }
132: sqlite3_finalize(stmt);
133:
134: return ret;
135: }
136:
137: /*
138: * mqtt_rtlm_fini_session() Delete session(s)
139: *
140: * @cfg = loaded config
141: * @sql = SQL handle
142: * @connid = connection id
143: * @user = username
144: * @host = hostname
145: * return: -1 error, 0 session already appears or >0 row changed
146: */
147: int
148: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
149: {
150: int ret = 0;
151: char *str, szStmt[BUFSIZ] = { 0 };
152: sqlite3_stmt *stmt;
153:
154: if (!cfg || !sql)
155: return -1;
156:
157: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
158: if (!str) {
159: mqtt_rtlm_log("Error:: not found online table name");
160: return -1;
161: }
162: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
163: "AND RemoteHost LIKE '%s';", str, connid, user, host);
164:
165: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
166: MQTT_RTLM_LOG(sql);
167: return -1;
168: }
169: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
170: ret = sqlite3_changes(sql);
171: else {
172: if (ret > SQLITE_OK && ret < SQLITE_ROW)
173: MQTT_RTLM_LOG(sql);
174: ret = 0;
175: }
176: sqlite3_finalize(stmt);
177:
178: return ret;
179: }
180:
181: /*
182: * mqtt_rtlm_chk_session() Check session(s)
183: *
184: * @cfg = loaded config
185: * @sql = SQL handle
186: * @connid = connection id
187: * @user = username
188: * @host = hostname
189: * return: -1 error, 0 not logged or >0 logged found rows
190: */
191: int
192: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
193: {
194: int ret = 0;
195: char *str, szStmt[BUFSIZ] = { 0 };
196: sqlite3_stmt *stmt;
197:
198: if (!cfg || !sql)
199: return -1;
200:
201: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
202: if (!str) {
203: mqtt_rtlm_log("Error:: not found online table name");
204: return -1;
205: }
206: snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
207: "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';",
208: str, connid, user, host);
209:
210: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
211: MQTT_RTLM_LOG(sql);
212: return -1;
213: }
214: if (sqlite3_step(stmt) == SQLITE_ROW)
215: ret = sqlite3_changes(sql);
216: else
217: ret = 0;
218: sqlite3_finalize(stmt);
219:
220: return ret;
221: }
222:
223: /*
224: * mqtt_rtlm_write_topic() Publish topic
225: *
226: * @cfg = loaded config
227: * @sql = SQL handle
228: * @msgid = MessageID
229: * @topic = topic
230: * @txt = text
231: * @user = username
232: * @host = hostname
233: * @retain = !=0 retain message to database
234: * return: -1 error, 0 no publish or >0 published ok
235: */
236: int
237: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt,
238: const char *user, const char *host, char retain)
239: {
240: int ret = 0;
241: char *str, szStmt[BUFSIZ] = { 0 };
242: sqlite3_stmt *stmt;
243:
244: if (!cfg || !sql || !topic)
245: return -1;
246:
247: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
248: if (!str) {
249: mqtt_rtlm_log("Error:: not found topics table name");
250: return -1;
251: }
252: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
253: "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
254: "datetime('now', 'localtime'), '%s');",
255: str, retain, msgid, topic, txt, user, host);
256:
257: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
258: MQTT_RTLM_LOG(sql);
259: return -1;
260: }
261: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
262: ret = sqlite3_changes(sql);
263: else {
264: if (ret > SQLITE_OK && ret < SQLITE_ROW)
265: MQTT_RTLM_LOG(sql);
266: ret = 0;
267: }
268: sqlite3_finalize(stmt);
269:
270: return ret;
271: }
272:
273: /*
274: * mqtt_rtlm_delete_topic() Delete topic
275: *
276: * @cfg = loaded config
277: * @sql = SQL handle
278: * @msgid = MessageID
279: * @topic = topic
280: * @user = username
281: * @host = hostname
282: * @retain = -1 no matter
283: * return: -1 error, 0 no changes or >0 deleted rows
284: */
285: int
286: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic,
287: const char *user, const char *host, char retain)
288: {
289: int ret = 0;
290: char *str, *rtn, szStmt[BUFSIZ] = { 0 };
291: sqlite3_stmt *stmt;
292:
293: if (!cfg || !sql || !topic)
294: return -1;
295:
296: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
297: if (!str) {
298: mqtt_rtlm_log("Error:: not found topics table name");
299: return -1;
300: }
301: switch (retain) {
302: case -1:
303: rtn = "";
304: break;
305: case 0:
306: rtn = "AND Retain = 0";
307: break;
308: default:
309: rtn = "AND Retain != 0";
310: break;
311: }
312: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
313: "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str,
314: msgid, topic, user, host, rtn);
315:
316: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
317: MQTT_RTLM_LOG(sql);
318: return -1;
319: }
320: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
321: ret = sqlite3_changes(sql);
322: else {
323: if (ret > SQLITE_OK && ret < SQLITE_ROW)
324: MQTT_RTLM_LOG(sql);
325: ret = 0;
326: }
327: sqlite3_finalize(stmt);
328:
329: return ret;
330: }
331:
332: /*
333: * mqtt_rtlm_read_topic() Get topic
334: *
335: * @cfg = loaded config
336: * @sql = SQL handle
337: * @msgid = MessageID
338: * @topic = topic
339: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
340: * return: NULL error or not found and !=NULL allocated subscribe topics
341: */
342: mqtt_subscr_t *
343: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
344: {
345: int rowz = 0;
346: char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
347: sqlite3_stmt *stmt;
348: register int j;
349: mqtt_subscr_t *s = NULL;
350:
351: if (!cfg || !sql || !topic)
352: return NULL;
353:
354: switch (retain) {
355: case -1:
356: memset(szStr, 0, sizeof szStr);
357: break;
358: case 0:
359: snprintf(szStr, sizeof szStr, "AND Retain = 0");
360: break;
361: default:
362: snprintf(szStr, sizeof szStr, "AND Retain > 0");
363: break;
364: }
365:
366: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
367: if (!str) {
368: mqtt_rtlm_log("Error:: not found topics table name");
369: return NULL;
370: }
371: snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
372: "MsgID = %d AND Topic LIKE '%s' %s;",
373: str, msgid, topic, szStr);
374:
375: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
376: MQTT_RTLM_LOG(sql);
377: return NULL;
378: }
379:
380: /* calculate count of rows and allocate subscribe items */
381: while (sqlite3_step(stmt) == SQLITE_ROW)
382: rowz++;
383: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
384: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
385: goto end;
386: } else
387: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
388: sqlite3_reset(stmt);
389:
390: /* fill with data */
391: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
392: s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.1 ! misho 393: s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
! 394: s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
! 395: s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
! 396: s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
1.2 misho 397: }
398: end:
399: sqlite3_finalize(stmt);
400:
401: return s;
402: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>