Annotation of libaitmqtt/src/sub.c, revision 1.3.12.1
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.3.12.1! misho 6: * $Id: sub.c,v 1.3 2012/06/28 11:06:17 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.3.12.1! misho 15: Copyright 2004 - 2022
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
51: *
52: * @Topics = MQTT subscription topics
53: * @msgID = MessageID
54: * @Dup = Duplicate message
55: * @QOS = QoS
1.3.12.1! misho 56: * return: NULL error or allocated SUBSCRIBE message
1.1 misho 57: */
1.3.12.1! misho 58: mqtt_msg_t *
! 59: mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, u_short msgID,
! 60: u_char Dup, u_char QOS)
1.1 misho 61: {
1.3.12.1! misho 62: int len, siz;
1.3 misho 63: u_int n, *l;
1.1 misho 64: struct mqtthdr *hdr;
65: mqtthdr_var_t *topic;
66: mqtt_len_t *mid;
67: mqtt_subscr_t *t;
68: u_char *qos;
1.2 misho 69: void *data;
1.3.12.1! misho 70: mqtt_msg_t *msg = NULL;
1.1 misho 71:
1.3.12.1! misho 72: if (!Topics || !*Topics)
! 73: return NULL;
1.1 misho 74: if (QOS > MQTT_QOS_EXACTLY) {
1.2 misho 75: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.3.12.1! misho 76: return NULL;
1.1 misho 77: }
78: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.2 misho 79: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.3.12.1! misho 80: return NULL;
1.1 misho 81: }
82:
1.2 misho 83: /* calculate message size */
84: len = sizeof(mqtt_len_t); /* msgid */
1.3.12.1! misho 85: for (t = *Topics; t && t->sub_topic.msg_base; t++) /* subscribes & qos */
! 86: len += sizeof(mqtt_len_t) + t->sub_topic.msg_len;
1.2 misho 87:
88: /* calculate header size */
89: siz = sizeof(struct mqtthdr); /* mqtt fixed header */
90: n = mqtt_encodeLen(len); /* message size */
91: siz += mqtt_sizeLen(n) - 1; /* length size */
92:
1.3.12.1! misho 93: if (!(msg = mqtt_msgAlloc(siz + len)))
! 94: return NULL;
1.1 misho 95: else {
1.3.12.1! misho 96: data = msg->msg_base;
1.2 misho 97: hdr = (struct mqtthdr *) data;
1.1 misho 98: }
99:
1.2 misho 100: /* fixed header */
101: hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
102: hdr->mqtt_msg.qos = QOS;
103: hdr->mqtt_msg.dup = Dup ? 1 : 0;
104: hdr->mqtt_msg.retain = 0;
1.3 misho 105: l = (u_int*) hdr->mqtt_len;
106: *l = n;
1.2 misho 107: data += siz;
108:
1.1 misho 109: /* variable header */
1.2 misho 110: mid = (mqtt_len_t*) data;
1.1 misho 111: mid->val = htons(msgID);
1.2 misho 112: data += sizeof(mqtt_len_t);
1.1 misho 113:
114: /* payload with subscriptions */
1.3.12.1! misho 115: for (t = *Topics; t && t->sub_topic.msg_base; t++) {
1.2 misho 116: topic = (mqtthdr_var_t*) data;
1.1 misho 117: topic->var_sb.val = htons(t->sub_topic.msg_len);
118: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
1.2 misho 119: data += MQTTHDR_VAR_SIZEOF(topic);
120: qos = data++;
1.1 misho 121: *qos = t->sub_ret;
122: }
123:
1.3.12.1! misho 124: return msg;
1.1 misho 125: }
126:
127: /*
128: * mqtt_msgSUBACK() Create SUBACK message
129: *
130: * @Topics = MQTT subscription topics
131: * @msgID = MessageID
1.3.12.1! misho 132: * return: NULL error or allocated SUBACK message
1.1 misho 133: */
1.3.12.1! misho 134: mqtt_msg_t *
! 135: mqtt_msgSUBACK(mqtt_subscr_t ** __restrict Topics, u_short msgID)
1.1 misho 136: {
137: int siz = 0;
138: struct mqtthdr *hdr;
139: mqtt_len_t *v;
140: mqtt_subscr_t *t;
141: u_char *qos;
1.3.12.1! misho 142: mqtt_msg_t *msg = NULL;
1.1 misho 143:
1.3.12.1! misho 144: if (!Topics || !*Topics)
! 145: return NULL;
1.1 misho 146:
1.3.12.1! misho 147: if (!(msg = mqtt_msgAlloc(MQTTMSG_MAX)))
! 148: return NULL;
1.1 misho 149: else {
1.3.12.1! misho 150: hdr = (struct mqtthdr *) msg->msg_base;
! 151: siz = sizeof(struct mqtthdr);
! 152: v = (mqtt_len_t*) (msg->msg_base + siz);
1.1 misho 153: siz += sizeof(mqtt_len_t);
154: }
155:
156: /* MessageID */
157: v->val = htons(msgID);
158:
159: /* QoS payload from subscriptions */
1.3.12.1! misho 160: for (t = *Topics; t && t->sub_topic.msg_base; t++, siz++) {
! 161: qos = (msg->msg_base + siz);
1.1 misho 162: *qos = t->sub_ret;
163: }
164:
165: /* fixed header */
166: hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
167: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
168:
1.3.12.1! misho 169: return msg;
1.1 misho 170: }
171:
172: /*
173: * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
174: *
175: * @Topics = MQTT subscription topics
176: * @msgID = MessageID
177: * @Dup = Duplicate message
178: * @QOS = QoS
1.3.12.1! misho 179: * return: NULL error or allocated UNSUBSCRIBE message
1.1 misho 180: */
1.3.12.1! misho 181: mqtt_msg_t *
! 182: mqtt_msgUNSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, u_short msgID,
! 183: u_char Dup, u_char QOS)
1.1 misho 184: {
1.2 misho 185: int len, siz = 0;
1.3 misho 186: u_int n, *l;
1.1 misho 187: struct mqtthdr *hdr;
188: mqtthdr_var_t *topic;
189: mqtt_len_t *mid;
190: mqtt_subscr_t *t;
1.2 misho 191: void *data;
1.3.12.1! misho 192: mqtt_msg_t *msg = NULL;
1.1 misho 193:
1.3.12.1! misho 194: if (!Topics || !*Topics)
! 195: return NULL;
1.1 misho 196: if (QOS > MQTT_QOS_EXACTLY) {
1.2 misho 197: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.3.12.1! misho 198: return NULL;
1.1 misho 199: }
200: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.2 misho 201: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.3.12.1! misho 202: return NULL;
1.1 misho 203: }
204:
1.2 misho 205: /* calculate message size */
206: len = sizeof(mqtt_len_t); /* msgid */
1.3.12.1! misho 207: for (t = *Topics; t && t->sub_topic.msg_base; t++) /* subscribes */
1.2 misho 208: len += sizeof(mqtt_len_t) + t->sub_topic.msg_len;
209:
210: /* calculate header size */
211: siz = sizeof(struct mqtthdr); /* mqtt fixed header */
212: n = mqtt_encodeLen(len); /* message size */
213: siz += mqtt_sizeLen(n) - 1; /* length size */
214:
1.3.12.1! misho 215: if (!(msg = mqtt_msgAlloc(siz + len)))
! 216: return NULL;
1.1 misho 217: else {
1.3.12.1! misho 218: data = msg->msg_base;
1.2 misho 219: hdr = (struct mqtthdr *) data;
1.1 misho 220: }
221:
1.2 misho 222: /* fixed header */
223: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
224: hdr->mqtt_msg.qos = QOS;
225: hdr->mqtt_msg.dup = Dup ? 1 : 0;
226: hdr->mqtt_msg.retain = 0;
1.3 misho 227: l = (u_int*) hdr->mqtt_len;
228: *l = n;
1.2 misho 229: data += siz;
230:
1.1 misho 231: /* variable header */
1.3.12.1! misho 232: mid = (mqtt_len_t*) (msg->msg_base + siz);
1.1 misho 233: mid->val = htons(msgID);
1.2 misho 234: data += sizeof(mqtt_len_t);
1.1 misho 235:
236: /* payload with subscriptions */
1.3.12.1! misho 237: for (t = *Topics; t && t->sub_topic.msg_base; t++) {
1.2 misho 238: topic = (mqtthdr_var_t*) data;
1.1 misho 239: topic->var_sb.val = htons(t->sub_topic.msg_len);
240: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
1.2 misho 241: data += MQTTHDR_VAR_SIZEOF(topic);
1.1 misho 242: }
243:
1.3.12.1! misho 244: return msg;
1.1 misho 245: }
246:
247: /*
248: * mqtt_msgUNSUBACK() Create UNSUBACK message
249: *
250: * @msgID = MessageID
1.3.12.1! misho 251: * return: NULL error or allocated UNSUBACK message
1.1 misho 252: */
1.3.12.1! misho 253: mqtt_msg_t *
! 254: mqtt_msgUNSUBACK(u_short msgID)
1.1 misho 255: {
256: struct mqtthdr *hdr;
257: mqtt_len_t *v;
1.3.12.1! misho 258: mqtt_msg_t *msg = NULL;
1.1 misho 259:
1.3.12.1! misho 260: if (!(msg = mqtt_msgAlloc(sizeof(struct mqtthdr) + sizeof(mqtt_len_t))))
! 261: return NULL;
1.1 misho 262: else {
1.3.12.1! misho 263: hdr = (struct mqtthdr *) msg->msg_base;
! 264: v = (mqtt_len_t*) (msg->msg_base + sizeof(struct mqtthdr));
1.1 misho 265: }
266:
267: /* fixed header */
268: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
269: *hdr->mqtt_len = sizeof(mqtt_len_t);
270:
271: /* MessageID */
272: v->val = htons(msgID);
273:
1.3.12.1! misho 274: return msg;
1.1 misho 275: }
276:
277:
278: /* ============= decode ============ */
279:
1.3.12.1! misho 280: #if 0
1.1 misho 281: /*
282: * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
283: *
284: * @buf = Message buffer
285: * @msgID = MessageID
286: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.2 misho 287: * return: -1 error or >-1 elements into subscr
1.1 misho 288: */
1.2 misho 289: int
1.1 misho 290: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
291: {
292: register int i;
293: int len, ret;
294: struct mqtthdr *hdr;
295: mqtthdr_var_t *var;
296: mqtt_subscr_t *subs;
297: mqtt_len_t *v;
298: caddr_t pos;
299:
300: if (!buf || !msgID || !subscr)
1.2 misho 301: return -1;
1.1 misho 302:
303: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
304: if (!hdr)
1.2 misho 305: return -1;
1.1 misho 306: pos = buf->msg_base + ret + 1;
307: v = (mqtt_len_t*) pos;
308:
309: /* MessageID */
310: len -= sizeof(mqtt_len_t);
311: if (len < 0) {
1.2 misho 312: mqtt_SetErr(EINVAL, "Short message length %d", len);
313: return -1;
1.1 misho 314: } else {
315: *msgID = ntohs(v->val);
316: pos += sizeof(mqtt_len_t);
317: }
318:
319: subs = mqtt_subAlloc(0);
320: if (!subs)
1.2 misho 321: return -1;
1.1 misho 322: else
323: *subscr = subs;
324:
325: /* Subscribes */
326: for (i = 0; len > 0; i++) {
327: var = (mqtthdr_var_t*) pos;
328: len -= MQTTHDR_VAR_SIZEOF(var) + 1;
329: if (len < 0) {
330: mqtt_subFree(subscr);
1.2 misho 331: mqtt_SetErr(EINVAL, "Short message length %d", len);
332: return -1;
1.1 misho 333: }
1.2 misho 334: if (!mqtt_subRealloc(&subs, i + 1)) {
1.1 misho 335: mqtt_subFree(subscr);
1.2 misho 336: return -1;
1.1 misho 337: } else
338: *subscr = subs;
339:
340: memset(&subs[i], 0, sizeof subs[i]);
341: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
1.2 misho 342: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
1.1 misho 343: if (!subs[i].sub_topic.msg_base) {
344: LOGERR;
345: mqtt_subFree(subscr);
1.2 misho 346: return -1;
347: } else {
1.1 misho 348: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
1.2 misho 349: ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
350: }
1.1 misho 351: pos += MQTTHDR_VAR_SIZEOF(var);
352:
353: subs[i].sub_ret = *pos;
354: pos++;
355: }
356:
1.2 misho 357: return i;
1.1 misho 358: }
359:
360: /*
361: * mqtt_readSUBACK() Read SUBACK message
362: *
363: * @buf = Message buffer
364: * @msgID = MessageID
365: * @subqos = Subscribes QoS, must be free after use with free()
366: * return: -1 error or >-1 readed subscribes QoS elements
367: */
368: int
369: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
370: {
371: int len, ret;
372: struct mqtthdr *hdr;
373: mqtt_len_t *v;
374: caddr_t pos;
375:
376: if (!buf || !msgID || !subqos)
377: return -1;
378:
379: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
380: if (!hdr)
381: return -1;
382: pos = buf->msg_base + ret + 1;
383: v = (mqtt_len_t*) pos;
384:
385: /* MessageID */
386: len -= sizeof(mqtt_len_t);
387: if (len < 0) {
1.2 misho 388: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 389: return -1;
390: } else {
391: *msgID = ntohs(v->val);
392: pos += sizeof(mqtt_len_t);
393: }
394:
395: /* Subscribes */
396: *subqos = malloc(len);
397: if (!*subqos) {
398: LOGERR;
399: return -1;
400: } else
401: memcpy(*subqos, pos, len);
402:
403: return len;
404: }
405:
406: /*
407: * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
408: *
409: * @buf = Message buffer
410: * @msgID = MessageID
411: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.2 misho 412: * return: -1 error or >-1 elements into subscr
1.1 misho 413: */
1.2 misho 414: int
1.1 misho 415: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
416: {
417: register int i;
418: int len, ret;
419: struct mqtthdr *hdr;
420: mqtthdr_var_t *var;
421: mqtt_subscr_t *subs;
422: mqtt_len_t *v;
423: caddr_t pos;
424:
425: if (!buf || !msgID || !subscr)
1.2 misho 426: return -1;
1.1 misho 427:
428: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
429: if (!hdr)
1.2 misho 430: return -1;
1.1 misho 431: pos = buf->msg_base + ret + 1;
432: v = (mqtt_len_t*) pos;
433:
434: /* MessageID */
435: len -= sizeof(mqtt_len_t);
436: if (len < 0) {
1.2 misho 437: mqtt_SetErr(EINVAL, "Short message length %d", len);
438: return -1;
1.1 misho 439: } else {
440: *msgID = ntohs(v->val);
441: pos += sizeof(mqtt_len_t);
442: }
443:
444: subs = mqtt_subAlloc(0);
445: if (!subs)
1.2 misho 446: return -1;
1.1 misho 447: else
448: *subscr = subs;
449:
450: /* Subscribes */
451: for (i = 0; len > 0; i++) {
452: var = (mqtthdr_var_t*) pos;
453: len -= MQTTHDR_VAR_SIZEOF(var);
454: if (len < 0) {
455: mqtt_subFree(subscr);
1.2 misho 456: mqtt_SetErr(EINVAL, "Short message length %d", len);
457: return -1;
1.1 misho 458: }
1.2 misho 459: if (!mqtt_subRealloc(&subs, i + 1)) {
1.1 misho 460: mqtt_subFree(subscr);
1.2 misho 461: return -1;
1.1 misho 462: } else
463: *subscr = subs;
464:
465: memset(&subs[i], 0, sizeof subs[i]);
466: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
1.2 misho 467: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
1.1 misho 468: if (!subs[i].sub_topic.msg_base) {
469: LOGERR;
470: mqtt_subFree(subscr);
1.2 misho 471: return -1;
472: } else {
1.1 misho 473: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
1.2 misho 474: ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
475: }
1.1 misho 476: pos += MQTTHDR_VAR_SIZEOF(var);
477: }
478:
1.2 misho 479: return i;
1.1 misho 480: }
481:
482: /*
483: * mqtt_readUNSUBACK() Read UNSUBACK message
484: *
485: * @buf = Message buffer
486: * return: -1 error or MessageID
487: */
488: u_short
489: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
490: {
491: int len, ret;
492: struct mqtthdr *hdr;
493: mqtt_len_t *v;
494: caddr_t pos;
495:
496: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
497: if (!hdr)
498: return (u_short) -1;
499: if (len < sizeof(mqtt_len_t)) {
1.2 misho 500: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 501: return (u_short) -1;
502: } else {
503: pos = buf->msg_base + ret + 1;
504: v = (mqtt_len_t*) pos;
505: }
506:
507: return ntohs(v->val);
508: }
1.3.12.1! misho 509: #endif
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>