Annotation of mqtt/src/pubmqtt.c, revision 1.1.2.10
1.1.2.1 misho 1: #include "global.h"
2:
3:
/*
 * mqtt_rtlm_log() Write a printf-style formatted message to syslog with LOG_ERR priority
 *
 * @fmt = format string
 * @... = argument list
 * return: none
 */
static void
mqtt_rtlm_log(const char *fmt, ...)
{
	va_list args;

	/* hand the variadic arguments straight to syslog's va_list variant */
	va_start(args, fmt);
	vsyslog(LOG_ERR, fmt, args);
	va_end(args);
}
/*
 * MQTT_RTLM_LOG() Log the last SQLite error code and message of a handle
 *
 * @_sql = SQL handle; asserted to be non-NULL and evaluated more than once,
 *         so pass an expression without side effects
 */
#define MQTT_RTLM_LOG(_sql)	\
	(assert((_sql)), \
	 mqtt_rtlm_log("Error:: SQL #%d - %s", sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
22:
23:
24: /*
25: * mqtt_rtlm_open() Open database connection
26: *
1.1.2.3 misho 27: * @cfg = config filename
1.1.2.2 misho 28: * return: NULL error or SQL handle
29: */
30: sqlite3 *
31: mqtt_rtlm_open(sl_config *cfg)
32: {
33: sqlite3 *sql = NULL;
34: const char *str = NULL;
35:
36: if (!cfg)
37: return NULL;
38:
39: str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
40: if (!str) {
41: mqtt_rtlm_log("Error:: Unknown database name ...\n");
42: return NULL;
43: }
44:
45: if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE, NULL)) {
1.1.2.3 misho 46: MQTT_RTLM_LOG(sql);
1.1.2.2 misho 47: sqlite3_close(sql);
48: return NULL;
49: }
50:
51: return sql;
52: }
53:
54: /*
55: * mqtt_rtlm_close() Close database connection
56: *
57: * @sql = SQL handle
58: * return: none
59: */
60: void
61: mqtt_rtlm_close(sqlite3 *sql)
62: {
63: sqlite3_close(sql);
64: }
1.1.2.4 misho 65:
66: /*
67: * mqtt_rtlm_init_session() Create session
68: *
69: * @cfg = loaded config
70: * @sql = SQL handle
71: * @user = username
72: * @host = hostname
73: * @port = port
74: * return: -1 error, 0 session already appears or >0 row changed
75: */
76: int
77: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host, u_short port)
78: {
79: int ret = 0;
80: char *str, szStmt[BUFSIZ] = { 0 };
81: sqlite3_stmt *stmt;
82:
83: if (!cfg || !sql)
84: return -1;
85:
86: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
87: if (!str) {
1.1.2.8 misho 88: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.4 misho 89: return -1;
90: }
91: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, RemoteHost, RemotePort) "
92: "VALUES ('%s', '%s', %d);", str, user, host, port);
93:
94: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
95: MQTT_RTLM_LOG(sql);
96: return -1;
97: }
98: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
99: ret = sqlite3_changes(sql);
100: else {
101: if (ret > SQLITE_OK && ret < SQLITE_ROW)
102: MQTT_RTLM_LOG(sql);
103: ret = 0;
104: }
105: sqlite3_finalize(stmt);
106:
107: return ret;
108: }
1.1.2.5 misho 109:
110: /*
111: * mqtt_rtlm_fini_session() Delete session(s)
112: *
113: * @cfg = loaded config
114: * @sql = SQL handle
115: * @user = username
116: * @host = hostname
117: * return: -1 error, 0 session already appears or >0 row changed
118: */
119: int
120: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)
121: {
122: int ret = 0;
123: char *str, szStmt[BUFSIZ] = { 0 };
124: sqlite3_stmt *stmt;
125:
126: if (!cfg || !sql)
127: return -1;
128:
129: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
130: if (!str) {
1.1.2.8 misho 131: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.5 misho 132: return -1;
133: }
134: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Username = '%s' AND RemoteHost LIKE '%s';",
135: str, user, host);
136:
137: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
138: MQTT_RTLM_LOG(sql);
139: return -1;
140: }
141: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
142: ret = sqlite3_changes(sql);
143: else {
144: if (ret > SQLITE_OK && ret < SQLITE_ROW)
145: MQTT_RTLM_LOG(sql);
146: ret = 0;
147: }
148: sqlite3_finalize(stmt);
149:
150: return ret;
151: }
1.1.2.6 misho 152:
153: /*
154: * mqtt_rtlm_chk_session() Check session(s)
155: *
156: * @cfg = loaded config
157: * @sql = SQL handle
158: * @user = username
159: * @host = hostname
160: * return: -1 error, 0 not logged or >0 logged found rows
161: */
162: int
163: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)
164: {
165: int ret = 0;
166: char *str, szStmt[BUFSIZ] = { 0 };
167: sqlite3_stmt *stmt;
168:
169: if (!cfg || !sql)
170: return -1;
171:
172: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
173: if (!str) {
1.1.2.8 misho 174: mqtt_rtlm_log("Error:: not found online table name");
1.1.2.6 misho 175: return -1;
176: }
177: snprintf(szStmt, sizeof szStmt, "SELECT RemoteHost, RemotePort FROM %s WHERE "
178: "Username = '%s' AND RemoteHost LIKE '%s';", str, user, host);
179:
180: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
181: MQTT_RTLM_LOG(sql);
182: return -1;
183: }
1.1.2.7 misho 184: if (sqlite3_step(stmt) == SQLITE_ROW)
185: ret = sqlite3_changes(sql);
186: else
187: ret = 0;
1.1.2.6 misho 188: sqlite3_finalize(stmt);
189:
190: return ret;
191: }
1.1.2.8 misho 192:
193: /*
194: * mqtt_rtlm_write_topic() Publish topic
195: *
196: * @cfg = loaded config
197: * @sql = SQL handle
198: * @topic = topic
199: * @txt = text
200: * @user = username
201: * @host = hostname
202: * @retain = !=0 retain message to database
203: * return: -1 error, 0 no publish or >0 published ok
204: */
205: int
206: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, const char *topic, const char *txt,
207: const char *user, const char *host, char retain)
208: {
209: int ret = 0;
210: char *str, szStmt[BUFSIZ] = { 0 };
211: sqlite3_stmt *stmt;
212:
213: if (!cfg || !sql || !topic)
214: return -1;
215:
216: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
217: if (!str) {
218: mqtt_rtlm_log("Error:: not found topics table name");
219: return -1;
220: }
221: snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, Topic, Value, PubUser, PubDate, PubHost) "
222: "VALUES (%d, '%s', '%s', '%s', datetime('now', 'localtime'), '%s');", str,
223: retain, topic, txt, user, host);
224:
225: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
226: MQTT_RTLM_LOG(sql);
227: return -1;
228: }
229: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
230: ret = sqlite3_changes(sql);
231: else {
232: if (ret > SQLITE_OK && ret < SQLITE_ROW)
233: MQTT_RTLM_LOG(sql);
234: ret = 0;
235: }
236: sqlite3_finalize(stmt);
237:
238: return ret;
239: }
240:
241: /*
242: * mqtt_rtlm_delete_topic() Delete topic
243: *
244: * @cfg = loaded config
245: * @sql = SQL handle
246: * @topic = topic
247: * @user = username
248: * @host = hostname
249: * @retain = -1 no matter
250: * return: -1 error, 0 no changes or >0 deleted rows
251: */
252: int
253: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, const char *topic,
254: const char *user, const char *host, char retain)
255: {
256: int ret = 0;
257: char *str, *rtn, szStmt[BUFSIZ] = { 0 };
258: sqlite3_stmt *stmt;
259:
260: if (!cfg || !sql || !topic)
261: return -1;
262:
263: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
264: if (!str) {
265: mqtt_rtlm_log("Error:: not found topics table name");
266: return -1;
267: }
268: switch (retain) {
269: case -1:
270: rtn = "";
271: break;
272: case 0:
273: rtn = "AND Retain = 0";
274: break;
275: default:
276: rtn = "AND Retain != 0";
277: break;
278: }
279: snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
280: "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str,
281: topic, user, host, rtn);
282:
283: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
284: MQTT_RTLM_LOG(sql);
285: return -1;
286: }
287: if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
288: ret = sqlite3_changes(sql);
289: else {
290: if (ret > SQLITE_OK && ret < SQLITE_ROW)
291: MQTT_RTLM_LOG(sql);
292: ret = 0;
293: }
294: sqlite3_finalize(stmt);
295:
296: return ret;
297: }
1.1.2.9 misho 298:
299: /*
300: * mqtt_rtlm_read_topic() Get topic
301: *
302: * @cfg = loaded config
303: * @sql = SQL handle
304: * @topic = topic
1.1.2.10! misho 305: * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
! 306: * return: NULL error or not found and !=NULL allocated subscribe topics
1.1.2.9 misho 307: */
1.1.2.10! misho 308: mqtt_subscr_t *
! 309: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, const char *topic, char retain)
1.1.2.9 misho 310: {
1.1.2.10! misho 311: int rowz = 0;
! 312: char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
1.1.2.9 misho 313: sqlite3_stmt *stmt;
1.1.2.10! misho 314: register int j;
! 315: mqtt_subscr_t *s = NULL;
1.1.2.9 misho 316:
1.1.2.10! misho 317: if (!cfg || !sql || !topic)
! 318: return NULL;
! 319:
! 320: switch (retain) {
! 321: case -1:
! 322: memset(szStr, 0, sizeof szStr);
! 323: break;
! 324: case 0:
! 325: snprintf(szStr, sizeof szStr, "AND Retain = 0");
! 326: break;
! 327: default:
! 328: snprintf(szStr, sizeof szStr, "AND Retain > 0");
! 329: break;
! 330: }
1.1.2.9 misho 331:
332: str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
333: if (!str) {
334: mqtt_rtlm_log("Error:: not found topics table name");
1.1.2.10! misho 335: return NULL;
1.1.2.9 misho 336: }
1.1.2.10! misho 337: snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE Topic LIKE '%s' %s;",
! 338: str, topic, szStr);
1.1.2.9 misho 339:
340: if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
341: MQTT_RTLM_LOG(sql);
1.1.2.10! misho 342: return NULL;
1.1.2.9 misho 343: }
1.1.2.10! misho 344:
! 345: /* calculate count of rows and allocate subscribe items */
! 346: while (sqlite3_step(stmt) == SQLITE_ROW)
! 347: rowz++;
! 348: if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
! 349: mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
! 350: goto end;
! 351: } else
! 352: memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
! 353: sqlite3_reset(stmt);
! 354:
! 355: /* fill with data */
! 356: for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
! 357: s[j].sub_ret = *sqlite3_column_text(stmt, 0);
! 358: s[j].sub_topic._base = strdup(sqlite3_column_text(stmt, 1));
! 359: s[j].sub_topic._size = strlen(s[j].sub_topic._base);
! 360: s[j].sub_value._base = strdup(sqlite3_column_text(stmt, 2));
! 361: s[j].sub_value._size = strlen(s[j].sub_value._base);
1.1.2.9 misho 362: }
1.1.2.10! misho 363: end:
1.1.2.9 misho 364: sqlite3_finalize(stmt);
365:
1.1.2.10! misho 366: return s;
1.1.2.9 misho 367: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>