Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.19
1.2 misho 1: #include "global.h"
2: #include "mqttd.h"
1.2.2.1 misho 3: #include "rtlm.h"
1.2 misho 4: #include "mqttd_calls.h"
5:
6:
1.2.2.19! misho 7: static inline struct tagPkt *
! 8: mkPkt(void * __restrict data, int dlen)
! 9: {
! 10: struct tagPkt *p = NULL;
! 11:
! 12: p = io_malloc(sizeof(struct tagPkt));
! 13: if (!p) {
! 14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
! 15: return NULL;
! 16: } else
! 17: memset(p, 0, sizeof(struct tagPkt));
! 18:
! 19: p->pkt_data = io_allocVar();
! 20: if (!p->pkt_data) {
! 21: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
! 22: io_free(p);
! 23: return NULL;
! 24: }
! 25:
! 26: if (data && dlen > 0)
! 27: AIT_SET_BUF(p->pkt_data, data, dlen);
! 28:
! 29: return p;
! 30: }
! 31:
! 32: static inline void
! 33: freePkt(struct tagPkt ** __restrict p)
! 34: {
! 35: if (!p || !*p)
! 36: return;
! 37:
! 38: io_freeVar(&(*p)->pkt_data);
! 39: io_free(*p);
! 40: *p = NULL;
! 41: }
! 42:
! 43: static void *
! 44: sendPacket(sched_task_t *task)
! 45: {
! 46: struct tagPkt *p = TASK_ARG(task);
! 47: register int n, slen;
! 48: u_char *pos;
! 49:
! 50: if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
! 51: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
! 52: return NULL;
! 53: }
! 54:
! 55: for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0;
! 56: slen -= n, pos += n) {
! 57: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
! 58: if (n == -1) {
! 59: ioSYSERR(0);
! 60: break;
! 61: }
! 62: }
! 63:
! 64: return NULL;
! 65: }
! 66:
! 67:
1.2.2.17 misho 68: static int
69: pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic,
70: int topicLen, char * __restrict data, int datlen)
71: {
72: return 0;
73: }
74:
75: static int
76: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic,
77: int topicLen, char * __restrict data, int datlen)
78: {
79: return 0;
80: }
81:
82: static int
83: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic,
84: int topicLen, char * __restrict data, int datlen)
85: {
86: return 0;
87: }
88:
89:
1.2 misho 90: int
1.2.2.9 misho 91: cmdPUBLISH(void *srv, int len, void *arg)
1.2 misho 92: {
93: struct mqtthdr *hdr;
1.2.2.1 misho 94: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 95: void *data = NULL;
96: char szTopic[STRSIZ] = { 0 };
97: int siz = 0;
98: u_short mid = 0;
1.2 misho 99:
100: ioTRACE(2);
101:
102: if (!sess)
103: return -1;
104:
1.2.2.17 misho 105: ioDEBUG(5, "Exec PUBLISH session");
106: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, &data);
107: if (siz == -1) {
108: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
109: return 0;
110: }
111:
1.2 misho 112: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
113: switch (hdr->mqtt_msg.qos) {
114: case MQTT_QOS_ACK:
1.2.2.17 misho 115: pubAck(sess, mid, szTopic, sizeof szTopic, data, siz);
116: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
117: if (siz == -1) {
118: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
119: mqtt_GetErrno(), mqtt_GetError());
120: goto end;
121: }
1.2 misho 122: break;
123: case MQTT_QOS_EXACTLY:
1.2.2.17 misho 124: pubExactly(sess, mid, szTopic, sizeof szTopic, data, siz);
125: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
126: if (siz == -1) {
127: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
128: mqtt_GetErrno(), mqtt_GetError());
129: goto end;
130: }
1.2 misho 131: break;
1.2.2.17 misho 132: case MQTT_QOS_ONCE:
133: pubOnce(sess, mid, szTopic, sizeof szTopic, data, siz);
1.2 misho 134: default:
1.2.2.17 misho 135: goto end;
1.2 misho 136: }
137:
1.2.2.17 misho 138: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
139: ioSYSERR(0);
140: else {
141: ioDEBUG(5, "Sended %d bytes.", siz);
142: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
143: }
144: end:
145: if (data)
146: free(data);
1.2 misho 147: return 0;
148: }
1.2.2.1 misho 149:
150: int
1.2.2.9 misho 151: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1 misho 152: {
153: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 154: int siz = 0;
155: u_short mid = 0;
1.2.2.1 misho 156:
157: ioTRACE(2);
158:
159: if (!sess)
160: return -1;
161:
1.2.2.17 misho 162: ioDEBUG(5, "Exec PUBREL session");
163: mid = mqtt_readPUBREL(sess->sess_buf);
164: if (mid == (u_short) -1) {
165: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
166: return 0;
167: }
168:
169: // TODO:: Delete from database topic
170:
171: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
172: if (siz == -1) {
173: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
174: return 0;
175: }
176: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
177: ioSYSERR(0);
178: else {
179: ioDEBUG(5, "Sended %d bytes.", siz);
180: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
181: }
1.2.2.1 misho 182:
183: return 0;
184: }
185:
186: int
1.2.2.9 misho 187: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 188: {
189: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7 misho 190: mqtt_subscr_t *subs = NULL;
191: int siz = 0;
192: u_short mid = 0;
193: register int i;
194: struct tagStore *store;
1.2.2.17 misho 195: char buf[BUFSIZ];
196: void *ptr;
1.2.2.1 misho 197:
198: ioTRACE(2);
199:
200: if (!sess)
201: return -1;
202:
1.2.2.7 misho 203: ioDEBUG(5, "Exec SUBSCRIBE session");
204: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
205: if (siz == -1) {
206: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
207: return 0;
208: }
209:
210: /* add to db */
211: for (i = 0; i < siz; i++) {
1.2.2.17 misho 212: /* convert topic to sql search statement */
213: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
214: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
215: goto end;
216: }
217: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
1.2.2.8 misho 218: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18 misho 219: store = io_malloc(sizeof(struct tagStore));
1.2.2.7 misho 220: if (!store) {
221: ioSYSERR(0);
222: goto end;
223: } else {
224: store->st_msgid = mid;
1.2.2.8 misho 225: mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7 misho 226: }
227:
228: /* add to cache */
229: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
230:
1.2.2.17 misho 231: /* convert topic to regexp */
232: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
233: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
234: goto end;
235: } else {
236: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
237: if (!ptr) {
238: ioSYSERR(0);
239: goto end;
240: } else {
241: store->st_subscr.sub_topic.msg_base = ptr;
242: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
243: memcpy(store->st_subscr.sub_topic.msg_base, buf,
244: store->st_subscr.sub_topic.msg_len);
245: }
246: }
247:
248: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
249: store->st_subscr.sub_topic.msg_base,
250: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
251:
1.2.2.7 misho 252: subs[i].sub_ret = MQTT_QOS_PASS;
253: } else
254: subs[i].sub_ret = MQTT_QOS_DENY;
255: }
1.2.2.1 misho 256:
1.2.2.7 misho 257: /* send acknowledge */
258: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
259: if (siz == -1) {
260: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
261: goto end;
262: }
1.2.2.13 misho 263: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
1.2.2.7 misho 264: ioSYSERR(0);
1.2.2.8 misho 265: else {
1.2.2.7 misho 266: ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8 misho 267: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
268: }
1.2.2.7 misho 269: end:
270: mqtt_subFree(&subs);
1.2.2.1 misho 271: return 0;
272: }
273:
274: int
1.2.2.9 misho 275: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 276: {
277: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13 misho 278: mqtt_subscr_t *subs = NULL;
279: int siz = 0;
280: u_short mid = 0;
281: register int i;
282: struct tagStore *store, *tmp;
1.2.2.1 misho 283:
284: ioTRACE(2);
285:
286: if (!sess)
287: return -1;
288:
1.2.2.13 misho 289: ioDEBUG(5, "Exec UNSUBSCRIBE session");
290: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
291: if (siz == -1) {
292: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
293: return 0;
294: }
295:
296: /* del from db */
297: for (i = 0; i < siz; i++) {
298: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
299: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
300: store->st_subscr.sub_topic.msg_base &&
301: !strcmp(store->st_subscr.sub_topic.msg_base,
302: subs[i].sub_topic.msg_base)) {
303: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14 misho 304:
305: if (store->st_subscr.sub_topic.msg_base)
306: free(store->st_subscr.sub_topic.msg_base);
307: if (store->st_subscr.sub_value.msg_base)
308: free(store->st_subscr.sub_value.msg_base);
1.2.2.18 misho 309: io_free(store);
1.2.2.13 misho 310: }
311: }
312:
1.2.2.15 misho 313: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
314: sess->sess_user, "%");
1.2.2.13 misho 315: }
316:
317: /* send acknowledge */
318: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
319: if (siz == -1) {
320: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
321: goto end;
322: }
323: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
324: ioSYSERR(0);
325: else {
326: ioDEBUG(5, "Sended %d bytes.", siz);
327: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
328: }
329: end:
330: mqtt_subFree(&subs);
1.2.2.1 misho 331: return 0;
332: }
333:
334: int
1.2.2.9 misho 335: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1 misho 336: {
337: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2 misho 338: int siz = 0;
1.2.2.19! misho 339: struct tagPkt *p = NULL;
1.2.2.1 misho 340:
341: ioTRACE(2);
342:
343: if (!sess)
344: return -1;
345:
1.2.2.7 misho 346: ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2 misho 347: siz = mqtt_msgPINGRESP(sess->sess_buf);
348: if (siz == -1) {
349: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
350: return 0;
1.2.2.8 misho 351: } else {
1.2.2.19! misho 352: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8 misho 353: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
354: }
1.2.2.1 misho 355:
1.2.2.19! misho 356: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1 misho 357: return 0;
358: }
359:
360: int
1.2.2.9 misho 361: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 362: {
363: struct tagStore *store;
1.2.2.19! misho 364: struct tagPkt *p;
1.2.2.1 misho 365: struct tagSession *sess = (struct tagSession*) arg;
366:
367: ioTRACE(2);
368:
369: if (!sess)
370: return -1;
371:
1.2.2.6 misho 372: ioDEBUG(5, "Exec CONNECT session");
1.2.2.1 misho 373: TAILQ_REMOVE(&Sessions, sess, sess_node);
374:
1.2.2.16 misho 375: if (sess->sess_clean) {
376: if (call.FiniSessPUB)
377: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
378: if (call.DeletePUB_subscribe)
379: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
380: if (call.WipePUB_topic)
381: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
382: }
1.2.2.1 misho 383:
1.2.2.6 misho 384: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
385: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3 misho 386:
1.2.2.6 misho 387: if (store->st_subscr.sub_topic.msg_base)
388: free(store->st_subscr.sub_topic.msg_base);
389: if (store->st_subscr.sub_value.msg_base)
390: free(store->st_subscr.sub_value.msg_base);
391:
1.2.2.18 misho 392: io_free(store);
1.2.2.6 misho 393: }
1.2.2.1 misho 394:
1.2.2.19! misho 395: while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
! 396: SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
! 397:
! 398: io_freeVar(&p->pkt_data);
! 399: io_free(p);
! 400: }
! 401:
1.2.2.1 misho 402: if (sess->sess_will.msg)
403: free(sess->sess_will.msg);
404: if (sess->sess_will.topic)
405: free(sess->sess_will.topic);
406:
407: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
408: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 409:
1.2.2.12 misho 410: return -3; /* reconnect client */
1.2.2.1 misho 411: }
412:
413: int
1.2.2.9 misho 414: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 415: {
416: struct tagSession *sess = (struct tagSession*) arg;
417:
418: ioTRACE(2);
419:
420: if (!sess)
421: return -1;
422:
1.2.2.5 misho 423: ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10 misho 424:
1.2.2.1 misho 425: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
426: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 427:
1.2.2.10 misho 428: return -2; /* must terminate dispatcher */
1.2.2.1 misho 429: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>