Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.21
1.2 misho 1: #include "global.h"
2: #include "mqttd.h"
1.2.2.1 misho 3: #include "rtlm.h"
1.2 misho 4: #include "mqttd_calls.h"
5:
6:
1.2.2.19 misho 7: static inline struct tagPkt *
8: mkPkt(void * __restrict data, int dlen)
9: {
10: struct tagPkt *p = NULL;
11:
12: p = io_malloc(sizeof(struct tagPkt));
13: if (!p) {
14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
15: return NULL;
16: } else
17: memset(p, 0, sizeof(struct tagPkt));
18:
19: p->pkt_data = io_allocVar();
20: if (!p->pkt_data) {
21: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
22: io_free(p);
23: return NULL;
24: }
25:
26: if (data && dlen > 0)
27: AIT_SET_BUF(p->pkt_data, data, dlen);
28:
29: return p;
30: }
31:
32: static inline void
1.2.2.20 misho 33: freePkt(struct tagPkt * __restrict p)
1.2.2.19 misho 34: {
1.2.2.20 misho 35: if (!p)
1.2.2.19 misho 36: return;
37:
1.2.2.20 misho 38: io_freeVar(&p->pkt_data);
39: io_free(p);
1.2.2.19 misho 40: }
41:
42: static void *
43: sendPacket(sched_task_t *task)
44: {
45: struct tagPkt *p = TASK_ARG(task);
46: register int n, slen;
47: u_char *pos;
48:
49: if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
50: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
51: return NULL;
52: }
53:
54: for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0;
55: slen -= n, pos += n) {
56: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
57: if (n == -1) {
58: ioSYSERR(0);
59: break;
60: }
61: }
62:
1.2.2.20 misho 63: freePkt(p);
1.2.2.19 misho 64: return NULL;
65: }
66:
67:
1.2.2.17 misho 68: static int
69: pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic,
70: int topicLen, char * __restrict data, int datlen)
71: {
72: return 0;
73: }
74:
75: static int
76: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic,
77: int topicLen, char * __restrict data, int datlen)
78: {
79: return 0;
80: }
81:
82: static int
83: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic,
84: int topicLen, char * __restrict data, int datlen)
85: {
86: return 0;
87: }
88:
89:
1.2 misho 90: int
1.2.2.9 misho 91: cmdPUBLISH(void *srv, int len, void *arg)
1.2 misho 92: {
93: struct mqtthdr *hdr;
1.2.2.1 misho 94: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 95: void *data = NULL;
96: char szTopic[STRSIZ] = { 0 };
97: int siz = 0;
98: u_short mid = 0;
1.2 misho 99:
100: ioTRACE(2);
101:
102: if (!sess)
103: return -1;
104:
1.2.2.17 misho 105: ioDEBUG(5, "Exec PUBLISH session");
106: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, &data);
107: if (siz == -1) {
108: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
109: return 0;
110: }
111:
1.2 misho 112: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
113: switch (hdr->mqtt_msg.qos) {
114: case MQTT_QOS_ACK:
1.2.2.17 misho 115: pubAck(sess, mid, szTopic, sizeof szTopic, data, siz);
116: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
117: if (siz == -1) {
118: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
119: mqtt_GetErrno(), mqtt_GetError());
120: goto end;
121: }
1.2 misho 122: break;
123: case MQTT_QOS_EXACTLY:
1.2.2.17 misho 124: pubExactly(sess, mid, szTopic, sizeof szTopic, data, siz);
125: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
126: if (siz == -1) {
127: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
128: mqtt_GetErrno(), mqtt_GetError());
129: goto end;
130: }
1.2 misho 131: break;
1.2.2.17 misho 132: case MQTT_QOS_ONCE:
133: pubOnce(sess, mid, szTopic, sizeof szTopic, data, siz);
1.2 misho 134: default:
1.2.2.17 misho 135: goto end;
1.2 misho 136: }
137:
1.2.2.17 misho 138: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
139: ioSYSERR(0);
140: else {
141: ioDEBUG(5, "Sended %d bytes.", siz);
142: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
143: }
144: end:
145: if (data)
146: free(data);
1.2 misho 147: return 0;
148: }
1.2.2.1 misho 149:
150: int
1.2.2.9 misho 151: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1 misho 152: {
153: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 154: int siz = 0;
155: u_short mid = 0;
1.2.2.1 misho 156:
157: ioTRACE(2);
158:
159: if (!sess)
160: return -1;
161:
1.2.2.17 misho 162: ioDEBUG(5, "Exec PUBREL session");
163: mid = mqtt_readPUBREL(sess->sess_buf);
164: if (mid == (u_short) -1) {
165: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
166: return 0;
167: }
168:
169: // TODO:: Delete from database topic
170:
171: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
172: if (siz == -1) {
173: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
174: return 0;
175: }
176: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
177: ioSYSERR(0);
178: else {
179: ioDEBUG(5, "Sended %d bytes.", siz);
180: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
181: }
1.2.2.1 misho 182:
183: return 0;
184: }
185:
186: int
1.2.2.9 misho 187: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 188: {
189: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7 misho 190: mqtt_subscr_t *subs = NULL;
191: int siz = 0;
192: u_short mid = 0;
193: register int i;
194: struct tagStore *store;
1.2.2.17 misho 195: char buf[BUFSIZ];
196: void *ptr;
1.2.2.21! misho 197: struct tagPkt *p = NULL;
1.2.2.1 misho 198:
199: ioTRACE(2);
200:
201: if (!sess)
202: return -1;
203:
1.2.2.7 misho 204: ioDEBUG(5, "Exec SUBSCRIBE session");
205: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
206: if (siz == -1) {
207: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
208: return 0;
209: }
210:
211: /* add to db */
212: for (i = 0; i < siz; i++) {
1.2.2.17 misho 213: /* convert topic to sql search statement */
214: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
215: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
216: goto end;
217: }
218: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
1.2.2.8 misho 219: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18 misho 220: store = io_malloc(sizeof(struct tagStore));
1.2.2.7 misho 221: if (!store) {
222: ioSYSERR(0);
223: goto end;
224: } else {
225: store->st_msgid = mid;
1.2.2.8 misho 226: mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7 misho 227: }
228:
229: /* add to cache */
230: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
231:
1.2.2.17 misho 232: /* convert topic to regexp */
233: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
234: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
235: goto end;
236: } else {
237: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
238: if (!ptr) {
239: ioSYSERR(0);
240: goto end;
241: } else {
242: store->st_subscr.sub_topic.msg_base = ptr;
243: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
244: memcpy(store->st_subscr.sub_topic.msg_base, buf,
245: store->st_subscr.sub_topic.msg_len);
246: }
247: }
248:
249: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
250: store->st_subscr.sub_topic.msg_base,
251: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
252:
1.2.2.7 misho 253: subs[i].sub_ret = MQTT_QOS_PASS;
254: } else
255: subs[i].sub_ret = MQTT_QOS_DENY;
256: }
1.2.2.1 misho 257:
1.2.2.7 misho 258: /* send acknowledge */
259: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
260: if (siz == -1) {
261: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
262: goto end;
1.2.2.21! misho 263: } else {
! 264: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8 misho 265: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
266: }
1.2.2.21! misho 267:
! 268: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7 misho 269: end:
270: mqtt_subFree(&subs);
1.2.2.1 misho 271: return 0;
272: }
273:
274: int
1.2.2.9 misho 275: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 276: {
277: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13 misho 278: mqtt_subscr_t *subs = NULL;
279: int siz = 0;
280: u_short mid = 0;
281: register int i;
282: struct tagStore *store, *tmp;
1.2.2.21! misho 283: struct tagPkt *p = NULL;
1.2.2.1 misho 284:
285: ioTRACE(2);
286:
287: if (!sess)
288: return -1;
289:
1.2.2.13 misho 290: ioDEBUG(5, "Exec UNSUBSCRIBE session");
291: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
292: if (siz == -1) {
293: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
294: return 0;
295: }
296:
297: /* del from db */
298: for (i = 0; i < siz; i++) {
299: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
300: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
301: store->st_subscr.sub_topic.msg_base &&
302: !strcmp(store->st_subscr.sub_topic.msg_base,
303: subs[i].sub_topic.msg_base)) {
304: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14 misho 305:
306: if (store->st_subscr.sub_topic.msg_base)
307: free(store->st_subscr.sub_topic.msg_base);
308: if (store->st_subscr.sub_value.msg_base)
309: free(store->st_subscr.sub_value.msg_base);
1.2.2.18 misho 310: io_free(store);
1.2.2.13 misho 311: }
312: }
313:
1.2.2.15 misho 314: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
315: sess->sess_user, "%");
1.2.2.13 misho 316: }
317:
318: /* send acknowledge */
319: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
320: if (siz == -1) {
321: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
322: goto end;
1.2.2.21! misho 323: } else {
! 324: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13 misho 325: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
326: }
1.2.2.21! misho 327:
! 328: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13 misho 329: end:
330: mqtt_subFree(&subs);
1.2.2.1 misho 331: return 0;
332: }
333:
334: int
1.2.2.9 misho 335: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1 misho 336: {
337: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2 misho 338: int siz = 0;
1.2.2.19 misho 339: struct tagPkt *p = NULL;
1.2.2.1 misho 340:
341: ioTRACE(2);
342:
343: if (!sess)
344: return -1;
345:
1.2.2.7 misho 346: ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2 misho 347: siz = mqtt_msgPINGRESP(sess->sess_buf);
348: if (siz == -1) {
349: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
350: return 0;
1.2.2.8 misho 351: } else {
1.2.2.19 misho 352: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8 misho 353: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
354: }
1.2.2.1 misho 355:
1.2.2.19 misho 356: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1 misho 357: return 0;
358: }
359:
360: int
1.2.2.9 misho 361: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 362: {
363: struct tagStore *store;
1.2.2.19 misho 364: struct tagPkt *p;
1.2.2.1 misho 365: struct tagSession *sess = (struct tagSession*) arg;
366:
367: ioTRACE(2);
368:
369: if (!sess)
370: return -1;
371:
1.2.2.6 misho 372: ioDEBUG(5, "Exec CONNECT session");
1.2.2.1 misho 373: TAILQ_REMOVE(&Sessions, sess, sess_node);
374:
1.2.2.16 misho 375: if (sess->sess_clean) {
376: if (call.FiniSessPUB)
377: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
378: if (call.DeletePUB_subscribe)
379: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
380: if (call.WipePUB_topic)
381: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
382: }
1.2.2.1 misho 383:
1.2.2.6 misho 384: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
385: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3 misho 386:
1.2.2.6 misho 387: if (store->st_subscr.sub_topic.msg_base)
388: free(store->st_subscr.sub_topic.msg_base);
389: if (store->st_subscr.sub_value.msg_base)
390: free(store->st_subscr.sub_value.msg_base);
391:
1.2.2.18 misho 392: io_free(store);
1.2.2.6 misho 393: }
1.2.2.1 misho 394:
1.2.2.19 misho 395: while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
396: SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
397:
398: io_freeVar(&p->pkt_data);
399: io_free(p);
400: }
401:
1.2.2.1 misho 402: if (sess->sess_will.msg)
403: free(sess->sess_will.msg);
404: if (sess->sess_will.topic)
405: free(sess->sess_will.topic);
406:
407: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
408: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 409:
1.2.2.12 misho 410: return -3; /* reconnect client */
1.2.2.1 misho 411: }
412:
413: int
1.2.2.9 misho 414: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 415: {
416: struct tagSession *sess = (struct tagSession*) arg;
417:
418: ioTRACE(2);
419:
420: if (!sess)
421: return -1;
422:
1.2.2.5 misho 423: ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10 misho 424:
1.2.2.1 misho 425: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
426: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 427:
1.2.2.10 misho 428: return -2; /* must terminate dispatcher */
1.2.2.1 misho 429: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>