1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: static inline struct tagPkt *
8: mkPkt(void * __restrict data, int dlen)
9: {
10: struct tagPkt *p = NULL;
11:
12: p = io_malloc(sizeof(struct tagPkt));
13: if (!p) {
14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
15: return NULL;
16: } else
17: memset(p, 0, sizeof(struct tagPkt));
18:
19: p->pkt_data = io_allocVar();
20: if (!p->pkt_data) {
21: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
22: io_free(p);
23: return NULL;
24: }
25:
26: if (data && dlen > 0)
27: AIT_SET_BUF(p->pkt_data, data, dlen);
28:
29: return p;
30: }
31:
32: static inline void
33: freePkt(struct tagPkt ** __restrict p)
34: {
35: if (!p || !*p)
36: return;
37:
38: io_freeVar(&(*p)->pkt_data);
39: io_free(*p);
40: *p = NULL;
41: }
42:
43: static void *
44: sendPacket(sched_task_t *task)
45: {
46: struct tagPkt *p = TASK_ARG(task);
47: register int n, slen;
48: u_char *pos;
49:
50: if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
51: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
52: return NULL;
53: }
54:
55: for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0;
56: slen -= n, pos += n) {
57: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
58: if (n == -1) {
59: ioSYSERR(0);
60: break;
61: }
62: }
63:
64: return NULL;
65: }
66:
67:
68: static int
69: pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic,
70: int topicLen, char * __restrict data, int datlen)
71: {
72: return 0;
73: }
74:
75: static int
76: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic,
77: int topicLen, char * __restrict data, int datlen)
78: {
79: return 0;
80: }
81:
82: static int
83: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic,
84: int topicLen, char * __restrict data, int datlen)
85: {
86: return 0;
87: }
88:
89:
90: int
91: cmdPUBLISH(void *srv, int len, void *arg)
92: {
93: struct mqtthdr *hdr;
94: struct tagSession *sess = (struct tagSession*) arg;
95: void *data = NULL;
96: char szTopic[STRSIZ] = { 0 };
97: int siz = 0;
98: u_short mid = 0;
99:
100: ioTRACE(2);
101:
102: if (!sess)
103: return -1;
104:
105: ioDEBUG(5, "Exec PUBLISH session");
106: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, &data);
107: if (siz == -1) {
108: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
109: return 0;
110: }
111:
112: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
113: switch (hdr->mqtt_msg.qos) {
114: case MQTT_QOS_ACK:
115: pubAck(sess, mid, szTopic, sizeof szTopic, data, siz);
116: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
117: if (siz == -1) {
118: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
119: mqtt_GetErrno(), mqtt_GetError());
120: goto end;
121: }
122: break;
123: case MQTT_QOS_EXACTLY:
124: pubExactly(sess, mid, szTopic, sizeof szTopic, data, siz);
125: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
126: if (siz == -1) {
127: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
128: mqtt_GetErrno(), mqtt_GetError());
129: goto end;
130: }
131: break;
132: case MQTT_QOS_ONCE:
133: pubOnce(sess, mid, szTopic, sizeof szTopic, data, siz);
134: default:
135: goto end;
136: }
137:
138: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
139: ioSYSERR(0);
140: else {
141: ioDEBUG(5, "Sended %d bytes.", siz);
142: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
143: }
144: end:
145: if (data)
146: free(data);
147: return 0;
148: }
149:
150: int
151: cmdPUBREL(void *srv, int len, void *arg)
152: {
153: struct tagSession *sess = (struct tagSession*) arg;
154: int siz = 0;
155: u_short mid = 0;
156:
157: ioTRACE(2);
158:
159: if (!sess)
160: return -1;
161:
162: ioDEBUG(5, "Exec PUBREL session");
163: mid = mqtt_readPUBREL(sess->sess_buf);
164: if (mid == (u_short) -1) {
165: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
166: return 0;
167: }
168:
169: // TODO:: Delete from database topic
170:
171: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
172: if (siz == -1) {
173: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
174: return 0;
175: }
176: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
177: ioSYSERR(0);
178: else {
179: ioDEBUG(5, "Sended %d bytes.", siz);
180: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
181: }
182:
183: return 0;
184: }
185:
186: int
187: cmdSUBSCRIBE(void *srv, int len, void *arg)
188: {
189: struct tagSession *sess = (struct tagSession*) arg;
190: mqtt_subscr_t *subs = NULL;
191: int siz = 0;
192: u_short mid = 0;
193: register int i;
194: struct tagStore *store;
195: char buf[BUFSIZ];
196: void *ptr;
197:
198: ioTRACE(2);
199:
200: if (!sess)
201: return -1;
202:
203: ioDEBUG(5, "Exec SUBSCRIBE session");
204: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
205: if (siz == -1) {
206: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
207: return 0;
208: }
209:
210: /* add to db */
211: for (i = 0; i < siz; i++) {
212: /* convert topic to sql search statement */
213: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
214: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
215: goto end;
216: }
217: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
218: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
219: store = io_malloc(sizeof(struct tagStore));
220: if (!store) {
221: ioSYSERR(0);
222: goto end;
223: } else {
224: store->st_msgid = mid;
225: mqtt_subCopy(&store->st_subscr, &subs[i]);
226: }
227:
228: /* add to cache */
229: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
230:
231: /* convert topic to regexp */
232: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
233: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
234: goto end;
235: } else {
236: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
237: if (!ptr) {
238: ioSYSERR(0);
239: goto end;
240: } else {
241: store->st_subscr.sub_topic.msg_base = ptr;
242: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
243: memcpy(store->st_subscr.sub_topic.msg_base, buf,
244: store->st_subscr.sub_topic.msg_len);
245: }
246: }
247:
248: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
249: store->st_subscr.sub_topic.msg_base,
250: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
251:
252: subs[i].sub_ret = MQTT_QOS_PASS;
253: } else
254: subs[i].sub_ret = MQTT_QOS_DENY;
255: }
256:
257: /* send acknowledge */
258: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
259: if (siz == -1) {
260: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
261: goto end;
262: }
263: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
264: ioSYSERR(0);
265: else {
266: ioDEBUG(5, "Sended %d bytes.", siz);
267: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
268: }
269: end:
270: mqtt_subFree(&subs);
271: return 0;
272: }
273:
274: int
275: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
276: {
277: struct tagSession *sess = (struct tagSession*) arg;
278: mqtt_subscr_t *subs = NULL;
279: int siz = 0;
280: u_short mid = 0;
281: register int i;
282: struct tagStore *store, *tmp;
283:
284: ioTRACE(2);
285:
286: if (!sess)
287: return -1;
288:
289: ioDEBUG(5, "Exec UNSUBSCRIBE session");
290: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
291: if (siz == -1) {
292: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
293: return 0;
294: }
295:
296: /* del from db */
297: for (i = 0; i < siz; i++) {
298: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
299: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
300: store->st_subscr.sub_topic.msg_base &&
301: !strcmp(store->st_subscr.sub_topic.msg_base,
302: subs[i].sub_topic.msg_base)) {
303: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
304:
305: if (store->st_subscr.sub_topic.msg_base)
306: free(store->st_subscr.sub_topic.msg_base);
307: if (store->st_subscr.sub_value.msg_base)
308: free(store->st_subscr.sub_value.msg_base);
309: io_free(store);
310: }
311: }
312:
313: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
314: sess->sess_user, "%");
315: }
316:
317: /* send acknowledge */
318: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
319: if (siz == -1) {
320: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
321: goto end;
322: }
323: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
324: ioSYSERR(0);
325: else {
326: ioDEBUG(5, "Sended %d bytes.", siz);
327: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
328: }
329: end:
330: mqtt_subFree(&subs);
331: return 0;
332: }
333:
334: int
335: cmdPINGREQ(void *srv, int len, void *arg)
336: {
337: struct tagSession *sess = (struct tagSession*) arg;
338: int siz = 0;
339: struct tagPkt *p = NULL;
340:
341: ioTRACE(2);
342:
343: if (!sess)
344: return -1;
345:
346: ioDEBUG(5, "Exec PINGREQ session");
347: siz = mqtt_msgPINGRESP(sess->sess_buf);
348: if (siz == -1) {
349: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
350: return 0;
351: } else {
352: p = mkPkt(sess->sess_buf->msg_base, siz);
353: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
354: }
355:
356: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
357: return 0;
358: }
359:
360: int
361: cmdCONNECT(void *srv, int len, void *arg)
362: {
363: struct tagStore *store;
364: struct tagPkt *p;
365: struct tagSession *sess = (struct tagSession*) arg;
366:
367: ioTRACE(2);
368:
369: if (!sess)
370: return -1;
371:
372: ioDEBUG(5, "Exec CONNECT session");
373: TAILQ_REMOVE(&Sessions, sess, sess_node);
374:
375: if (sess->sess_clean) {
376: if (call.FiniSessPUB)
377: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
378: if (call.DeletePUB_subscribe)
379: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
380: if (call.WipePUB_topic)
381: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
382: }
383:
384: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
385: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
386:
387: if (store->st_subscr.sub_topic.msg_base)
388: free(store->st_subscr.sub_topic.msg_base);
389: if (store->st_subscr.sub_value.msg_base)
390: free(store->st_subscr.sub_value.msg_base);
391:
392: io_free(store);
393: }
394:
395: while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
396: SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
397:
398: io_freeVar(&p->pkt_data);
399: io_free(p);
400: }
401:
402: if (sess->sess_will.msg)
403: free(sess->sess_will.msg);
404: if (sess->sess_will.topic)
405: free(sess->sess_will.topic);
406:
407: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
408: sess->sess_addr, sess->sess_user);
409:
410: return -3; /* reconnect client */
411: }
412:
413: int
414: cmdDISCONNECT(void *srv, int len, void *arg)
415: {
416: struct tagSession *sess = (struct tagSession*) arg;
417:
418: ioTRACE(2);
419:
420: if (!sess)
421: return -1;
422:
423: ioDEBUG(5, "Exec DISCONNECT session");
424:
425: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
426: sess->sess_addr, sess->sess_user);
427:
428: return -2; /* must terminate dispatcher */
429: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>