1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: static int
8: pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic,
9: int topicLen, char * __restrict data, int datlen)
10: {
11: return 0;
12: }
13:
14: static int
15: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic,
16: int topicLen, char * __restrict data, int datlen)
17: {
18: return 0;
19: }
20:
21: static int
22: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic,
23: int topicLen, char * __restrict data, int datlen)
24: {
25: return 0;
26: }
27:
28:
29: int
30: cmdPUBLISH(void *srv, int len, void *arg)
31: {
32: struct mqtthdr *hdr;
33: struct tagSession *sess = (struct tagSession*) arg;
34: void *data = NULL;
35: char szTopic[STRSIZ] = { 0 };
36: int siz = 0;
37: u_short mid = 0;
38:
39: ioTRACE(2);
40:
41: if (!sess)
42: return -1;
43:
44: ioDEBUG(5, "Exec PUBLISH session");
45: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, &data);
46: if (siz == -1) {
47: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
48: return 0;
49: }
50:
51: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
52: switch (hdr->mqtt_msg.qos) {
53: case MQTT_QOS_ACK:
54: pubAck(sess, mid, szTopic, sizeof szTopic, data, siz);
55: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
56: if (siz == -1) {
57: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
58: mqtt_GetErrno(), mqtt_GetError());
59: goto end;
60: }
61: break;
62: case MQTT_QOS_EXACTLY:
63: pubExactly(sess, mid, szTopic, sizeof szTopic, data, siz);
64: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
65: if (siz == -1) {
66: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
67: mqtt_GetErrno(), mqtt_GetError());
68: goto end;
69: }
70: break;
71: case MQTT_QOS_ONCE:
72: pubOnce(sess, mid, szTopic, sizeof szTopic, data, siz);
73: default:
74: goto end;
75: }
76:
77: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
78: ioSYSERR(0);
79: else {
80: ioDEBUG(5, "Sended %d bytes.", siz);
81: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
82: }
83: end:
84: if (data)
85: free(data);
86: return 0;
87: }
88:
89: int
90: cmdPUBREL(void *srv, int len, void *arg)
91: {
92: struct tagSession *sess = (struct tagSession*) arg;
93: int siz = 0;
94: u_short mid = 0;
95:
96: ioTRACE(2);
97:
98: if (!sess)
99: return -1;
100:
101: ioDEBUG(5, "Exec PUBREL session");
102: mid = mqtt_readPUBREL(sess->sess_buf);
103: if (mid == (u_short) -1) {
104: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
105: return 0;
106: }
107:
108: // TODO:: Delete from database topic
109:
110: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
111: if (siz == -1) {
112: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
113: return 0;
114: }
115: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
116: ioSYSERR(0);
117: else {
118: ioDEBUG(5, "Sended %d bytes.", siz);
119: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
120: }
121:
122: return 0;
123: }
124:
125: int
126: cmdSUBSCRIBE(void *srv, int len, void *arg)
127: {
128: struct tagSession *sess = (struct tagSession*) arg;
129: mqtt_subscr_t *subs = NULL;
130: int siz = 0;
131: u_short mid = 0;
132: register int i;
133: struct tagStore *store;
134: char buf[BUFSIZ];
135: void *ptr;
136:
137: ioTRACE(2);
138:
139: if (!sess)
140: return -1;
141:
142: ioDEBUG(5, "Exec SUBSCRIBE session");
143: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
144: if (siz == -1) {
145: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
146: return 0;
147: }
148:
149: /* add to db */
150: for (i = 0; i < siz; i++) {
151: /* convert topic to sql search statement */
152: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
153: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
154: goto end;
155: }
156: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
157: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
158: store = io_malloc(sizeof(struct tagStore));
159: if (!store) {
160: ioSYSERR(0);
161: goto end;
162: } else {
163: store->st_msgid = mid;
164: mqtt_subCopy(&store->st_subscr, &subs[i]);
165: }
166:
167: /* add to cache */
168: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
169:
170: /* convert topic to regexp */
171: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
172: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
173: goto end;
174: } else {
175: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
176: if (!ptr) {
177: ioSYSERR(0);
178: goto end;
179: } else {
180: store->st_subscr.sub_topic.msg_base = ptr;
181: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
182: memcpy(store->st_subscr.sub_topic.msg_base, buf,
183: store->st_subscr.sub_topic.msg_len);
184: }
185: }
186:
187: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
188: store->st_subscr.sub_topic.msg_base,
189: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
190:
191: subs[i].sub_ret = MQTT_QOS_PASS;
192: } else
193: subs[i].sub_ret = MQTT_QOS_DENY;
194: }
195:
196: /* send acknowledge */
197: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
198: if (siz == -1) {
199: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
200: goto end;
201: }
202: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
203: ioSYSERR(0);
204: else {
205: ioDEBUG(5, "Sended %d bytes.", siz);
206: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
207: }
208: end:
209: mqtt_subFree(&subs);
210: return 0;
211: }
212:
213: int
214: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
215: {
216: struct tagSession *sess = (struct tagSession*) arg;
217: mqtt_subscr_t *subs = NULL;
218: int siz = 0;
219: u_short mid = 0;
220: register int i;
221: struct tagStore *store, *tmp;
222:
223: ioTRACE(2);
224:
225: if (!sess)
226: return -1;
227:
228: ioDEBUG(5, "Exec UNSUBSCRIBE session");
229: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
230: if (siz == -1) {
231: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
232: return 0;
233: }
234:
235: /* del from db */
236: for (i = 0; i < siz; i++) {
237: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
238: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
239: store->st_subscr.sub_topic.msg_base &&
240: !strcmp(store->st_subscr.sub_topic.msg_base,
241: subs[i].sub_topic.msg_base)) {
242: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
243:
244: if (store->st_subscr.sub_topic.msg_base)
245: free(store->st_subscr.sub_topic.msg_base);
246: if (store->st_subscr.sub_value.msg_base)
247: free(store->st_subscr.sub_value.msg_base);
248: io_free(store);
249: }
250: }
251:
252: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
253: sess->sess_user, "%");
254: }
255:
256: /* send acknowledge */
257: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
258: if (siz == -1) {
259: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
260: goto end;
261: }
262: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
263: ioSYSERR(0);
264: else {
265: ioDEBUG(5, "Sended %d bytes.", siz);
266: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
267: }
268: end:
269: mqtt_subFree(&subs);
270: return 0;
271: }
272:
273: int
274: cmdPINGREQ(void *srv, int len, void *arg)
275: {
276: struct tagSession *sess = (struct tagSession*) arg;
277: int siz = 0;
278:
279: ioTRACE(2);
280:
281: if (!sess)
282: return -1;
283:
284: ioDEBUG(5, "Exec PINGREQ session");
285: siz = mqtt_msgPINGRESP(sess->sess_buf);
286: if (siz == -1) {
287: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
288: return 0;
289: }
290: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) {
291: ioSYSERR(0);
292: return 0;
293: } else {
294: ioDEBUG(5, "Sended %d bytes.", siz);
295: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
296: }
297:
298: return 0;
299: }
300:
301: int
302: cmdCONNECT(void *srv, int len, void *arg)
303: {
304: struct tagStore *store;
305: struct tagSession *sess = (struct tagSession*) arg;
306:
307: ioTRACE(2);
308:
309: if (!sess)
310: return -1;
311:
312: ioDEBUG(5, "Exec CONNECT session");
313: TAILQ_REMOVE(&Sessions, sess, sess_node);
314:
315: if (sess->sess_clean) {
316: if (call.FiniSessPUB)
317: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
318: if (call.DeletePUB_subscribe)
319: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
320: if (call.WipePUB_topic)
321: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
322: }
323:
324: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
325: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
326:
327: if (store->st_subscr.sub_topic.msg_base)
328: free(store->st_subscr.sub_topic.msg_base);
329: if (store->st_subscr.sub_value.msg_base)
330: free(store->st_subscr.sub_value.msg_base);
331:
332: io_free(store);
333: }
334:
335: if (sess->sess_will.msg)
336: free(sess->sess_will.msg);
337: if (sess->sess_will.topic)
338: free(sess->sess_will.topic);
339:
340: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
341: sess->sess_addr, sess->sess_user);
342:
343: return -3; /* reconnect client */
344: }
345:
346: int
347: cmdDISCONNECT(void *srv, int len, void *arg)
348: {
349: struct tagSession *sess = (struct tagSession*) arg;
350:
351: ioTRACE(2);
352:
353: if (!sess)
354: return -1;
355:
356: ioDEBUG(5, "Exec DISCONNECT session");
357:
358: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
359: sess->sess_addr, sess->sess_user);
360:
361: return -2; /* must terminate dispatcher */
362: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>