Annotation of libaitmqtt/src/sub.c, revision 1.3.12.4
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.3.12.4! misho 6: * $Id: sub.c,v 1.3.12.3 2022/09/15 15:04:44 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.3.12.1 misho 15: Copyright 2004 - 2022
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
51: *
52: * @Topics = MQTT subscription topics
53: * @msgID = MessageID
1.3.12.1 misho 54: * return: NULL error or allocated SUBSCRIBE message
1.1 misho 55: */
1.3.12.1 misho 56: mqtt_msg_t *
1.3.12.4! misho 57: mqtt_msgSUBSCRIBE(mqtt_subscr_t * __restrict Topics, u_short msgID)
1.1 misho 58: {
1.3.12.1 misho 59: int len, siz;
1.3 misho 60: u_int n, *l;
1.1 misho 61: struct mqtthdr *hdr;
62: mqtthdr_var_t *topic;
63: mqtt_len_t *mid;
64: mqtt_subscr_t *t;
1.2 misho 65: void *data;
1.3.12.1 misho 66: mqtt_msg_t *msg = NULL;
1.1 misho 67:
1.3.12.4! misho 68: if (!Topics)
1.3.12.1 misho 69: return NULL;
1.3.12.3 misho 70: if (!msgID) {
1.2 misho 71: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.3.12.1 misho 72: return NULL;
1.1 misho 73: }
74:
1.2 misho 75: /* calculate message size */
76: len = sizeof(mqtt_len_t); /* msgid */
1.3.12.4! misho 77: for (t = Topics; t && t->sub_topic.msg_base; t++) /* subscribes & qos */
1.3.12.3 misho 78: len += sizeof(mqtt_len_t) + t->sub_topic.msg_len + 1;
1.2 misho 79:
80: /* calculate header size */
81: siz = sizeof(struct mqtthdr); /* mqtt fixed header */
82: n = mqtt_encodeLen(len); /* message size */
83: siz += mqtt_sizeLen(n) - 1; /* length size */
84:
1.3.12.1 misho 85: if (!(msg = mqtt_msgAlloc(siz + len)))
86: return NULL;
1.1 misho 87: else {
1.3.12.1 misho 88: data = msg->msg_base;
1.2 misho 89: hdr = (struct mqtthdr *) data;
1.1 misho 90: }
91:
1.2 misho 92: /* fixed header */
93: hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
1.3.12.3 misho 94: hdr->mqtt_msg.qos = MQTT_QOS_ACK;
1.3 misho 95: l = (u_int*) hdr->mqtt_len;
96: *l = n;
1.2 misho 97: data += siz;
98:
1.1 misho 99: /* variable header */
1.2 misho 100: mid = (mqtt_len_t*) data;
1.1 misho 101: mid->val = htons(msgID);
1.2 misho 102: data += sizeof(mqtt_len_t);
1.1 misho 103:
104: /* payload with subscriptions */
1.3.12.4! misho 105: for (t = Topics; t && t->sub_topic.msg_base; t++) {
1.2 misho 106: topic = (mqtthdr_var_t*) data;
1.1 misho 107: topic->var_sb.val = htons(t->sub_topic.msg_len);
108: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
1.2 misho 109: data += MQTTHDR_VAR_SIZEOF(topic);
1.3.12.3 misho 110: *((char*) data) = t->sub_qos;
111: data++;
1.1 misho 112: }
113:
1.3.12.1 misho 114: return msg;
1.1 misho 115: }
116:
117: /*
118: * mqtt_msgSUBACK() Create SUBACK message
119: *
120: * @Topics = MQTT subscription topics
121: * @msgID = MessageID
1.3.12.1 misho 122: * return: NULL error or allocated SUBACK message
1.1 misho 123: */
1.3.12.1 misho 124: mqtt_msg_t *
1.3.12.4! misho 125: mqtt_msgSUBACK(mqtt_subscr_t * __restrict Topics, u_short msgID)
1.1 misho 126: {
127: int siz = 0;
128: struct mqtthdr *hdr;
129: mqtt_len_t *v;
130: mqtt_subscr_t *t;
131: u_char *qos;
1.3.12.1 misho 132: mqtt_msg_t *msg = NULL;
1.1 misho 133:
1.3.12.4! misho 134: if (!Topics)
1.3.12.1 misho 135: return NULL;
1.1 misho 136:
1.3.12.1 misho 137: if (!(msg = mqtt_msgAlloc(MQTTMSG_MAX)))
138: return NULL;
1.1 misho 139: else {
1.3.12.1 misho 140: hdr = (struct mqtthdr *) msg->msg_base;
141: siz = sizeof(struct mqtthdr);
142: v = (mqtt_len_t*) (msg->msg_base + siz);
1.1 misho 143: siz += sizeof(mqtt_len_t);
144: }
145:
146: /* MessageID */
147: v->val = htons(msgID);
148:
149: /* QoS payload from subscriptions */
1.3.12.4! misho 150: for (t = Topics; t && t->sub_topic.msg_base; t++, siz++) {
1.3.12.1 misho 151: qos = (msg->msg_base + siz);
1.3.12.2 misho 152: *qos = t->sub_qos;
1.1 misho 153: }
154:
155: /* fixed header */
156: hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
157: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
158:
1.3.12.1 misho 159: return msg;
1.1 misho 160: }
161:
162: /*
163: * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
164: *
165: * @Topics = MQTT subscription topics
166: * @msgID = MessageID
167: * @Dup = Duplicate message
168: * @QOS = QoS
1.3.12.1 misho 169: * return: NULL error or allocated UNSUBSCRIBE message
1.1 misho 170: */
1.3.12.1 misho 171: mqtt_msg_t *
1.3.12.4! misho 172: mqtt_msgUNSUBSCRIBE(mqtt_subscr_t * __restrict Topics, u_short msgID,
1.3.12.1 misho 173: u_char Dup, u_char QOS)
1.1 misho 174: {
1.2 misho 175: int len, siz = 0;
1.3 misho 176: u_int n, *l;
1.1 misho 177: struct mqtthdr *hdr;
178: mqtthdr_var_t *topic;
179: mqtt_len_t *mid;
180: mqtt_subscr_t *t;
1.2 misho 181: void *data;
1.3.12.1 misho 182: mqtt_msg_t *msg = NULL;
1.1 misho 183:
1.3.12.4! misho 184: if (!Topics)
1.3.12.1 misho 185: return NULL;
1.1 misho 186: if (QOS > MQTT_QOS_EXACTLY) {
1.2 misho 187: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.3.12.1 misho 188: return NULL;
1.1 misho 189: }
190: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.2 misho 191: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.3.12.1 misho 192: return NULL;
1.1 misho 193: }
194:
1.2 misho 195: /* calculate message size */
196: len = sizeof(mqtt_len_t); /* msgid */
1.3.12.4! misho 197: for (t = Topics; t && t->sub_topic.msg_base; t++) /* subscribes */
1.2 misho 198: len += sizeof(mqtt_len_t) + t->sub_topic.msg_len;
199:
200: /* calculate header size */
201: siz = sizeof(struct mqtthdr); /* mqtt fixed header */
202: n = mqtt_encodeLen(len); /* message size */
203: siz += mqtt_sizeLen(n) - 1; /* length size */
204:
1.3.12.1 misho 205: if (!(msg = mqtt_msgAlloc(siz + len)))
206: return NULL;
1.1 misho 207: else {
1.3.12.1 misho 208: data = msg->msg_base;
1.2 misho 209: hdr = (struct mqtthdr *) data;
1.1 misho 210: }
211:
1.2 misho 212: /* fixed header */
213: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
214: hdr->mqtt_msg.qos = QOS;
215: hdr->mqtt_msg.dup = Dup ? 1 : 0;
216: hdr->mqtt_msg.retain = 0;
1.3 misho 217: l = (u_int*) hdr->mqtt_len;
218: *l = n;
1.2 misho 219: data += siz;
220:
1.1 misho 221: /* variable header */
1.3.12.1 misho 222: mid = (mqtt_len_t*) (msg->msg_base + siz);
1.1 misho 223: mid->val = htons(msgID);
1.2 misho 224: data += sizeof(mqtt_len_t);
1.1 misho 225:
226: /* payload with subscriptions */
1.3.12.4! misho 227: for (t = Topics; t && t->sub_topic.msg_base; t++) {
1.2 misho 228: topic = (mqtthdr_var_t*) data;
1.1 misho 229: topic->var_sb.val = htons(t->sub_topic.msg_len);
230: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
1.2 misho 231: data += MQTTHDR_VAR_SIZEOF(topic);
1.1 misho 232: }
233:
1.3.12.1 misho 234: return msg;
1.1 misho 235: }
236:
237: /*
238: * mqtt_msgUNSUBACK() Create UNSUBACK message
239: *
240: * @msgID = MessageID
1.3.12.1 misho 241: * return: NULL error or allocated UNSUBACK message
1.1 misho 242: */
1.3.12.1 misho 243: mqtt_msg_t *
244: mqtt_msgUNSUBACK(u_short msgID)
1.1 misho 245: {
246: struct mqtthdr *hdr;
247: mqtt_len_t *v;
1.3.12.1 misho 248: mqtt_msg_t *msg = NULL;
1.1 misho 249:
1.3.12.1 misho 250: if (!(msg = mqtt_msgAlloc(sizeof(struct mqtthdr) + sizeof(mqtt_len_t))))
251: return NULL;
1.1 misho 252: else {
1.3.12.1 misho 253: hdr = (struct mqtthdr *) msg->msg_base;
254: v = (mqtt_len_t*) (msg->msg_base + sizeof(struct mqtthdr));
1.1 misho 255: }
256:
257: /* fixed header */
258: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
259: *hdr->mqtt_len = sizeof(mqtt_len_t);
260:
261: /* MessageID */
262: v->val = htons(msgID);
263:
1.3.12.1 misho 264: return msg;
1.1 misho 265: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>