Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.17
1.2 misho 1: #include "global.h"
2: #include "mqttd.h"
1.2.2.1 misho 3: #include "rtlm.h"
1.2 misho 4: #include "mqttd_calls.h"
5:
6:
/*
 * pubOnce() Hook for a PUBLISH message carrying QoS MQTT_QOS_ONCE
 *
 * Placeholder: no argument is used yet.
 *
 * @sess = session the message arrived on
 * @mid = message id as decoded by the caller
 * @psTopic = topic buffer
 * @topicLen = length handed over for the topic (cmdPUBLISH passes the size of its buffer)
 * @data = payload
 * @datlen = payload length
 * return: always 0
 */
static int
pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic,
		int topicLen, char * __restrict data, int datlen)
{
	/* nothing is implemented yet; mark every argument as deliberately unused */
	(void) sess;
	(void) mid;
	(void) psTopic;
	(void) topicLen;
	(void) data;
	(void) datlen;

	return 0;
}
! 13:
/*
 * pubAck() Hook for a PUBLISH message carrying QoS MQTT_QOS_ACK
 *
 * Placeholder: no argument is used yet.
 *
 * @sess = session the message arrived on
 * @mid = message id as decoded by the caller
 * @psTopic = topic buffer
 * @topicLen = length handed over for the topic (cmdPUBLISH passes the size of its buffer)
 * @data = payload
 * @datlen = payload length
 * return: always 0
 */
static int
pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic,
		int topicLen, char * __restrict data, int datlen)
{
	/* nothing is implemented yet; mark every argument as deliberately unused */
	(void) sess;
	(void) mid;
	(void) psTopic;
	(void) topicLen;
	(void) data;
	(void) datlen;

	return 0;
}
! 20:
/*
 * pubExactly() Hook for a PUBLISH message carrying QoS MQTT_QOS_EXACTLY
 *
 * Placeholder: no argument is used yet.
 *
 * @sess = session the message arrived on
 * @mid = message id as decoded by the caller
 * @psTopic = topic buffer
 * @topicLen = length handed over for the topic (cmdPUBLISH passes the size of its buffer)
 * @data = payload
 * @datlen = payload length
 * return: always 0
 */
static int
pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic,
		int topicLen, char * __restrict data, int datlen)
{
	/* nothing is implemented yet; mark every argument as deliberately unused */
	(void) sess;
	(void) mid;
	(void) psTopic;
	(void) topicLen;
	(void) data;
	(void) datlen;

	return 0;
}
! 27:
! 28:
1.2 misho 29: int
1.2.2.9 misho 30: cmdPUBLISH(void *srv, int len, void *arg)
1.2 misho 31: {
32: struct mqtthdr *hdr;
1.2.2.1 misho 33: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17! misho 34: void *data = NULL;
! 35: char szTopic[STRSIZ] = { 0 };
! 36: int siz = 0;
! 37: u_short mid = 0;
1.2 misho 38:
39: ioTRACE(2);
40:
41: if (!sess)
42: return -1;
43:
1.2.2.17! misho 44: ioDEBUG(5, "Exec PUBLISH session");
! 45: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, &data);
! 46: if (siz == -1) {
! 47: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 48: return 0;
! 49: }
! 50:
1.2 misho 51: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
52: switch (hdr->mqtt_msg.qos) {
53: case MQTT_QOS_ACK:
1.2.2.17! misho 54: pubAck(sess, mid, szTopic, sizeof szTopic, data, siz);
! 55: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
! 56: if (siz == -1) {
! 57: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
! 58: mqtt_GetErrno(), mqtt_GetError());
! 59: goto end;
! 60: }
1.2 misho 61: break;
62: case MQTT_QOS_EXACTLY:
1.2.2.17! misho 63: pubExactly(sess, mid, szTopic, sizeof szTopic, data, siz);
! 64: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
! 65: if (siz == -1) {
! 66: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
! 67: mqtt_GetErrno(), mqtt_GetError());
! 68: goto end;
! 69: }
1.2 misho 70: break;
1.2.2.17! misho 71: case MQTT_QOS_ONCE:
! 72: pubOnce(sess, mid, szTopic, sizeof szTopic, data, siz);
1.2 misho 73: default:
1.2.2.17! misho 74: goto end;
1.2 misho 75: }
76:
1.2.2.17! misho 77: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
! 78: ioSYSERR(0);
! 79: else {
! 80: ioDEBUG(5, "Sended %d bytes.", siz);
! 81: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
! 82: }
! 83: end:
! 84: if (data)
! 85: free(data);
1.2 misho 86: return 0;
87: }
1.2.2.1 misho 88:
89: int
1.2.2.9 misho 90: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1 misho 91: {
92: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17! misho 93: int siz = 0;
! 94: u_short mid = 0;
1.2.2.1 misho 95:
96: ioTRACE(2);
97:
98: if (!sess)
99: return -1;
100:
1.2.2.17! misho 101: ioDEBUG(5, "Exec PUBREL session");
! 102: mid = mqtt_readPUBREL(sess->sess_buf);
! 103: if (mid == (u_short) -1) {
! 104: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 105: return 0;
! 106: }
! 107:
! 108: // TODO:: Delete from database topic
! 109:
! 110: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
! 111: if (siz == -1) {
! 112: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 113: return 0;
! 114: }
! 115: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
! 116: ioSYSERR(0);
! 117: else {
! 118: ioDEBUG(5, "Sended %d bytes.", siz);
! 119: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
! 120: }
1.2.2.1 misho 121:
122: return 0;
123: }
124:
125: int
1.2.2.9 misho 126: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 127: {
128: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7 misho 129: mqtt_subscr_t *subs = NULL;
130: int siz = 0;
131: u_short mid = 0;
132: register int i;
133: struct tagStore *store;
1.2.2.17! misho 134: char buf[BUFSIZ];
! 135: void *ptr;
1.2.2.1 misho 136:
137: ioTRACE(2);
138:
139: if (!sess)
140: return -1;
141:
1.2.2.7 misho 142: ioDEBUG(5, "Exec SUBSCRIBE session");
143: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
144: if (siz == -1) {
145: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
146: return 0;
147: }
148:
149: /* add to db */
150: for (i = 0; i < siz; i++) {
1.2.2.17! misho 151: /* convert topic to sql search statement */
! 152: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
! 153: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 154: goto end;
! 155: }
! 156: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
1.2.2.8 misho 157: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.7 misho 158: store = malloc(sizeof(struct tagStore));
159: if (!store) {
160: ioSYSERR(0);
161: goto end;
162: } else {
163: store->st_msgid = mid;
1.2.2.8 misho 164: mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7 misho 165: }
166:
167: /* add to cache */
168: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
169:
1.2.2.17! misho 170: /* convert topic to regexp */
! 171: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
! 172: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 173: goto end;
! 174: } else {
! 175: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
! 176: if (!ptr) {
! 177: ioSYSERR(0);
! 178: goto end;
! 179: } else {
! 180: store->st_subscr.sub_topic.msg_base = ptr;
! 181: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
! 182: memcpy(store->st_subscr.sub_topic.msg_base, buf,
! 183: store->st_subscr.sub_topic.msg_len);
! 184: }
! 185: }
! 186:
! 187: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
! 188: store->st_subscr.sub_topic.msg_base,
! 189: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
! 190:
1.2.2.7 misho 191: subs[i].sub_ret = MQTT_QOS_PASS;
192: } else
193: subs[i].sub_ret = MQTT_QOS_DENY;
194: }
1.2.2.1 misho 195:
1.2.2.7 misho 196: /* send acknowledge */
197: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
198: if (siz == -1) {
199: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
200: goto end;
201: }
1.2.2.13 misho 202: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
1.2.2.7 misho 203: ioSYSERR(0);
1.2.2.8 misho 204: else {
1.2.2.7 misho 205: ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8 misho 206: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
207: }
1.2.2.7 misho 208: end:
209: mqtt_subFree(&subs);
1.2.2.1 misho 210: return 0;
211: }
212:
213: int
1.2.2.9 misho 214: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 215: {
216: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13 misho 217: mqtt_subscr_t *subs = NULL;
218: int siz = 0;
219: u_short mid = 0;
220: register int i;
221: struct tagStore *store, *tmp;
1.2.2.1 misho 222:
223: ioTRACE(2);
224:
225: if (!sess)
226: return -1;
227:
1.2.2.13 misho 228: ioDEBUG(5, "Exec UNSUBSCRIBE session");
229: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
230: if (siz == -1) {
231: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
232: return 0;
233: }
234:
235: /* del from db */
236: for (i = 0; i < siz; i++) {
237: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
238: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
239: store->st_subscr.sub_topic.msg_base &&
240: !strcmp(store->st_subscr.sub_topic.msg_base,
241: subs[i].sub_topic.msg_base)) {
242: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14 misho 243:
244: if (store->st_subscr.sub_topic.msg_base)
245: free(store->st_subscr.sub_topic.msg_base);
246: if (store->st_subscr.sub_value.msg_base)
247: free(store->st_subscr.sub_value.msg_base);
1.2.2.13 misho 248: free(store);
249: }
250: }
251:
1.2.2.15 misho 252: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
253: sess->sess_user, "%");
1.2.2.13 misho 254: }
255:
256: /* send acknowledge */
257: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
258: if (siz == -1) {
259: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
260: goto end;
261: }
262: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
263: ioSYSERR(0);
264: else {
265: ioDEBUG(5, "Sended %d bytes.", siz);
266: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
267: }
268: end:
269: mqtt_subFree(&subs);
1.2.2.1 misho 270: return 0;
271: }
272:
273: int
1.2.2.9 misho 274: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1 misho 275: {
276: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2 misho 277: int siz = 0;
1.2.2.1 misho 278:
279: ioTRACE(2);
280:
281: if (!sess)
282: return -1;
283:
1.2.2.7 misho 284: ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2 misho 285: siz = mqtt_msgPINGRESP(sess->sess_buf);
286: if (siz == -1) {
287: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
288: return 0;
289: }
1.2.2.13 misho 290: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) {
1.2.2.2 misho 291: ioSYSERR(0);
292: return 0;
1.2.2.8 misho 293: } else {
1.2.2.2 misho 294: ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8 misho 295: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
296: }
1.2.2.1 misho 297:
298: return 0;
299: }
300:
301: int
1.2.2.9 misho 302: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 303: {
304: struct tagStore *store;
305: struct tagSession *sess = (struct tagSession*) arg;
306:
307: ioTRACE(2);
308:
309: if (!sess)
310: return -1;
311:
1.2.2.6 misho 312: ioDEBUG(5, "Exec CONNECT session");
1.2.2.1 misho 313: TAILQ_REMOVE(&Sessions, sess, sess_node);
314:
1.2.2.16 misho 315: if (sess->sess_clean) {
316: if (call.FiniSessPUB)
317: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
318: if (call.DeletePUB_subscribe)
319: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
320: if (call.WipePUB_topic)
321: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
322: }
1.2.2.1 misho 323:
1.2.2.6 misho 324: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
325: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3 misho 326:
1.2.2.6 misho 327: if (store->st_subscr.sub_topic.msg_base)
328: free(store->st_subscr.sub_topic.msg_base);
329: if (store->st_subscr.sub_value.msg_base)
330: free(store->st_subscr.sub_value.msg_base);
331:
332: free(store);
333: }
1.2.2.1 misho 334:
335: if (sess->sess_will.msg)
336: free(sess->sess_will.msg);
337: if (sess->sess_will.topic)
338: free(sess->sess_will.topic);
339:
340: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
341: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 342:
1.2.2.12 misho 343: return -3; /* reconnect client */
1.2.2.1 misho 344: }
345:
346: int
1.2.2.9 misho 347: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 348: {
349: struct tagSession *sess = (struct tagSession*) arg;
350:
351: ioTRACE(2);
352:
353: if (!sess)
354: return -1;
355:
1.2.2.5 misho 356: ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10 misho 357:
1.2.2.1 misho 358: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
359: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 360:
1.2.2.10 misho 361: return -2; /* must terminate dispatcher */
1.2.2.1 misho 362: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>