1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: static inline struct tagPkt *
8: mkPkt(void * __restrict data, int dlen)
9: {
10: struct tagPkt *p = NULL;
11:
12: p = io_malloc(sizeof(struct tagPkt));
13: if (!p) {
14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
15: return NULL;
16: } else
17: memset(p, 0, sizeof(struct tagPkt));
18:
19: p->pkt_data = io_allocVar();
20: if (!p->pkt_data) {
21: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
22: io_free(p);
23: return NULL;
24: }
25:
26: if (data && dlen > 0)
27: AIT_SET_BUF(p->pkt_data, data, dlen);
28:
29: return p;
30: }
31:
32: static inline void
33: freePkt(struct tagPkt * __restrict p)
34: {
35: if (!p)
36: return;
37:
38: io_freeVar(&p->pkt_data);
39: io_free(p);
40: }
41:
42: static void *
43: sendPacket(sched_task_t *task)
44: {
45: struct tagPkt *p = TASK_ARG(task);
46: register int n, slen;
47: u_char *pos;
48:
49: if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
50: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
51: return NULL;
52: }
53:
54: for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0;
55: slen -= n, pos += n) {
56: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
57: if (n == -1) {
58: ioSYSERR(0);
59: break;
60: }
61: }
62:
63: freePkt(p);
64: return NULL;
65: }
66:
67:
68: static int
69: pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic,
70: int topicLen, char * __restrict data, int datlen)
71: {
72: return 0;
73: }
74:
75: static int
76: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic,
77: int topicLen, char * __restrict data, int datlen)
78: {
79: return 0;
80: }
81:
82: static int
83: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic,
84: int topicLen, char * __restrict data, int datlen)
85: {
86: return 0;
87: }
88:
89:
90: int
91: cmdPUBLISH(void *srv, int len, void *arg)
92: {
93: struct mqtthdr *hdr;
94: struct tagSession *sess = (struct tagSession*) arg;
95: void *data = NULL;
96: char szTopic[STRSIZ] = { 0 };
97: int siz = 0;
98: u_short mid = 0;
99:
100: ioTRACE(2);
101:
102: if (!sess)
103: return -1;
104:
105: ioDEBUG(5, "Exec PUBLISH session");
106: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, &data);
107: if (siz == -1) {
108: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
109: return 0;
110: }
111:
112: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
113: switch (hdr->mqtt_msg.qos) {
114: case MQTT_QOS_ACK:
115: pubAck(sess, mid, szTopic, sizeof szTopic, data, siz);
116: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
117: if (siz == -1) {
118: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
119: mqtt_GetErrno(), mqtt_GetError());
120: goto end;
121: }
122: break;
123: case MQTT_QOS_EXACTLY:
124: pubExactly(sess, mid, szTopic, sizeof szTopic, data, siz);
125: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
126: if (siz == -1) {
127: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
128: mqtt_GetErrno(), mqtt_GetError());
129: goto end;
130: }
131: break;
132: case MQTT_QOS_ONCE:
133: pubOnce(sess, mid, szTopic, sizeof szTopic, data, siz);
134: default:
135: goto end;
136: }
137:
138: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
139: ioSYSERR(0);
140: else {
141: ioDEBUG(5, "Sended %d bytes.", siz);
142: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
143: }
144: end:
145: if (data)
146: free(data);
147: return 0;
148: }
149:
150: int
151: cmdPUBREL(void *srv, int len, void *arg)
152: {
153: struct tagSession *sess = (struct tagSession*) arg;
154: int siz = 0;
155: u_short mid = 0;
156:
157: ioTRACE(2);
158:
159: if (!sess)
160: return -1;
161:
162: ioDEBUG(5, "Exec PUBREL session");
163: mid = mqtt_readPUBREL(sess->sess_buf);
164: if (mid == (u_short) -1) {
165: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
166: return 0;
167: }
168:
169: // TODO:: Delete from database topic
170:
171: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
172: if (siz == -1) {
173: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
174: return 0;
175: }
176: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
177: ioSYSERR(0);
178: else {
179: ioDEBUG(5, "Sended %d bytes.", siz);
180: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
181: }
182:
183: return 0;
184: }
185:
186: int
187: cmdSUBSCRIBE(void *srv, int len, void *arg)
188: {
189: struct tagSession *sess = (struct tagSession*) arg;
190: mqtt_subscr_t *subs = NULL;
191: int siz = 0;
192: u_short mid = 0;
193: register int i;
194: struct tagStore *store;
195: char buf[BUFSIZ];
196: void *ptr;
197: struct tagPkt *p = NULL;
198:
199: ioTRACE(2);
200:
201: if (!sess)
202: return -1;
203:
204: ioDEBUG(5, "Exec SUBSCRIBE session");
205: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
206: if (siz == -1) {
207: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
208: return 0;
209: }
210:
211: /* add to db */
212: for (i = 0; i < siz; i++) {
213: /* convert topic to sql search statement */
214: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
215: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
216: goto end;
217: }
218: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
219: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
220: store = io_malloc(sizeof(struct tagStore));
221: if (!store) {
222: ioSYSERR(0);
223: goto end;
224: } else {
225: store->st_msgid = mid;
226: mqtt_subCopy(&store->st_subscr, &subs[i]);
227: }
228:
229: /* add to cache */
230: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
231:
232: /* convert topic to regexp */
233: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
234: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
235: goto end;
236: } else {
237: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
238: if (!ptr) {
239: ioSYSERR(0);
240: goto end;
241: } else {
242: store->st_subscr.sub_topic.msg_base = ptr;
243: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
244: memcpy(store->st_subscr.sub_topic.msg_base, buf,
245: store->st_subscr.sub_topic.msg_len);
246: }
247: }
248:
249: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
250: store->st_subscr.sub_topic.msg_base,
251: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
252:
253: subs[i].sub_ret = MQTT_QOS_PASS;
254: } else
255: subs[i].sub_ret = MQTT_QOS_DENY;
256: }
257:
258: /* send acknowledge */
259: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
260: if (siz == -1) {
261: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
262: goto end;
263: } else {
264: p = mkPkt(sess->sess_buf->msg_base, siz);
265: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
266: }
267:
268: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
269: end:
270: mqtt_subFree(&subs);
271: return 0;
272: }
273:
274: int
275: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
276: {
277: struct tagSession *sess = (struct tagSession*) arg;
278: mqtt_subscr_t *subs = NULL;
279: int siz = 0;
280: u_short mid = 0;
281: register int i;
282: struct tagStore *store, *tmp;
283: struct tagPkt *p = NULL;
284:
285: ioTRACE(2);
286:
287: if (!sess)
288: return -1;
289:
290: ioDEBUG(5, "Exec UNSUBSCRIBE session");
291: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
292: if (siz == -1) {
293: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
294: return 0;
295: }
296:
297: /* del from db */
298: for (i = 0; i < siz; i++) {
299: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
300: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
301: store->st_subscr.sub_topic.msg_base &&
302: !strcmp(store->st_subscr.sub_topic.msg_base,
303: subs[i].sub_topic.msg_base)) {
304: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
305:
306: if (store->st_subscr.sub_topic.msg_base)
307: free(store->st_subscr.sub_topic.msg_base);
308: if (store->st_subscr.sub_value.msg_base)
309: free(store->st_subscr.sub_value.msg_base);
310: io_free(store);
311: }
312: }
313:
314: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
315: sess->sess_user, "%");
316: }
317:
318: /* send acknowledge */
319: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
320: if (siz == -1) {
321: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
322: goto end;
323: } else {
324: p = mkPkt(sess->sess_buf->msg_base, siz);
325: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
326: }
327:
328: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
329: end:
330: mqtt_subFree(&subs);
331: return 0;
332: }
333:
334: int
335: cmdPINGREQ(void *srv, int len, void *arg)
336: {
337: struct tagSession *sess = (struct tagSession*) arg;
338: int siz = 0;
339: struct tagPkt *p = NULL;
340:
341: ioTRACE(2);
342:
343: if (!sess)
344: return -1;
345:
346: ioDEBUG(5, "Exec PINGREQ session");
347: siz = mqtt_msgPINGRESP(sess->sess_buf);
348: if (siz == -1) {
349: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
350: return 0;
351: } else {
352: p = mkPkt(sess->sess_buf->msg_base, siz);
353: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
354: }
355:
356: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
357: return 0;
358: }
359:
360: int
361: cmdCONNECT(void *srv, int len, void *arg)
362: {
363: struct tagStore *store;
364: struct tagPkt *p;
365: struct tagSession *sess = (struct tagSession*) arg;
366:
367: ioTRACE(2);
368:
369: if (!sess)
370: return -1;
371:
372: ioDEBUG(5, "Exec CONNECT session");
373: TAILQ_REMOVE(&Sessions, sess, sess_node);
374:
375: if (sess->sess_clean) {
376: if (call.FiniSessPUB)
377: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
378: if (call.DeletePUB_subscribe)
379: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
380: if (call.WipePUB_topic)
381: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
382: }
383:
384: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
385: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
386:
387: if (store->st_subscr.sub_topic.msg_base)
388: free(store->st_subscr.sub_topic.msg_base);
389: if (store->st_subscr.sub_value.msg_base)
390: free(store->st_subscr.sub_value.msg_base);
391:
392: io_free(store);
393: }
394:
395: while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
396: SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
397:
398: io_freeVar(&p->pkt_data);
399: io_free(p);
400: }
401:
402: if (sess->sess_will.msg)
403: free(sess->sess_will.msg);
404: if (sess->sess_will.topic)
405: free(sess->sess_will.topic);
406:
407: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
408: sess->sess_addr, sess->sess_user);
409:
410: return -3; /* reconnect client */
411: }
412:
413: int
414: cmdDISCONNECT(void *srv, int len, void *arg)
415: {
416: struct tagSession *sess = (struct tagSession*) arg;
417:
418: ioTRACE(2);
419:
420: if (!sess)
421: return -1;
422:
423: ioDEBUG(5, "Exec DISCONNECT session");
424:
425: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
426: sess->sess_addr, sess->sess_user);
427:
428: return -2; /* must terminate dispatcher */
429: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>