Annotation of libaitmqtt/src/pub.c, revision 1.4.4.1
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.4.4.1 ! misho 6: * $Id: pub.c,v 1.4 2013/05/30 09:18:33 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.4.4.1 ! misho 15: Copyright 2004 - 2022
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_msgPUBLISH() Create PUBLISH message
51: *
52: * @csTopic = Publish topic
53: * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
54: * @Dup = Duplicate message
55: * @QOS = QoS
56: * @Retain = Retain message
57: * @pData = Publish data into topic
58: * @datlen = Publish data length
1.4.4.1 ! misho 59: * return: NULL error or allocated PUBLISH message
1.1 misho 60: */
1.4.4.1 ! misho 61: mqtt_msg_t *
! 62: mqtt_msgPUBLISH(const char *csTopic, u_short msgID, u_char Dup,
! 63: u_char QOS, u_char Retain, const void *pData, int datlen)
1.1 misho 64: {
1.2 misho 65: int len, siz;
1.3 misho 66: u_int n, *l;
1.1 misho 67: struct mqtthdr *hdr;
68: mqtthdr_var_t *topic;
69: mqtt_len_t *mid;
70: void *data;
1.4.4.1 ! misho 71: mqtt_msg_t *msg = NULL;
1.1 misho 72:
1.4.4.1 ! misho 73: if (!csTopic)
! 74: return NULL;
1.1 misho 75: if (QOS > MQTT_QOS_EXACTLY) {
1.2 misho 76: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.4.4.1 ! misho 77: return NULL;
1.1 misho 78: }
79: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.2 misho 80: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.4.4.1 ! misho 81: return NULL;
1.1 misho 82: }
83:
1.2 misho 84: /* calculate message size */
85: len = sizeof(mqtt_len_t) + strlen(csTopic); /* topic */
86: len += sizeof(mqtt_len_t); /* msgid */
87: len += datlen; /* data len */
88:
89: /* calculate header size */
90: siz = sizeof(struct mqtthdr); /* mqtt fixed header */
91: n = mqtt_encodeLen(len); /* message size */
92: siz += mqtt_sizeLen(n) - 1; /* length size */
93:
1.4.4.1 ! misho 94: if (!(msg = mqtt_msgAlloc(siz + len)))
! 95: return NULL;
1.1 misho 96: else {
1.4.4.1 ! misho 97: data = msg->msg_base;
1.2 misho 98: hdr = (struct mqtthdr *) data;
1.1 misho 99: }
100:
1.2 misho 101: /* fixed header */
102: hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
103: hdr->mqtt_msg.qos = QOS;
104: hdr->mqtt_msg.dup = Dup ? 1 : 0;
105: hdr->mqtt_msg.retain = Retain ? 1 : 0;
1.3 misho 106: l = (u_int*) hdr->mqtt_len;
107: *l = n;
1.2 misho 108: data += siz;
109:
1.1 misho 110: /* variable header */
1.2 misho 111: topic = (mqtthdr_var_t*) data;
1.1 misho 112: topic->var_sb.val = htons(strlen(csTopic));
113: memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
1.2 misho 114: data += MQTTHDR_VAR_SIZEOF(topic);
1.1 misho 115:
1.2 misho 116: mid = (mqtt_len_t*) data;
1.1 misho 117: mid->val = htons(msgID);
1.2 misho 118: data += sizeof(mqtt_len_t);
1.1 misho 119:
120: /* load with data */
1.2 misho 121: if (pData && datlen)
1.1 misho 122: memcpy(data, pData, datlen);
123:
1.4.4.1 ! misho 124: return msg;
1.1 misho 125: }
126:
1.4.4.1 ! misho 127: static mqtt_msg_t *
! 128: _mqtt_msgPUB_(u_char cmd, u_short msgID)
1.1 misho 129: {
130: struct mqtthdr *hdr;
131: mqtt_len_t *v;
1.4.4.1 ! misho 132: mqtt_msg_t *msg = NULL;
1.1 misho 133:
1.4.4.1 ! misho 134: if (!(msg = mqtt_msgAlloc(sizeof(struct mqtthdr) + sizeof(mqtt_len_t))))
! 135: return NULL;
! 136: else {
! 137: hdr = (struct mqtthdr *) msg->msg_base;
! 138: v = (mqtt_len_t*) (msg->msg_base + sizeof(struct mqtthdr));
! 139: }
1.1 misho 140:
141: /* fixed header */
142: hdr->mqtt_msg.type = cmd;
143: *hdr->mqtt_len = sizeof(mqtt_len_t);
144:
145: /* MessageID */
146: v->val = htons(msgID);
147:
1.4.4.1 ! misho 148: return msg;
1.1 misho 149: }
150:
151: /*
152: * mqtt_msgPUBACK() Create PUBACK message
153: *
154: * @msgID = MessageID
1.4.4.1 ! misho 155: * return: NULL error or allocated PUBACK message
1.1 misho 156: */
1.4.4.1 ! misho 157: mqtt_msg_t *
! 158: mqtt_msgPUBACK(u_short msgID)
1.1 misho 159: {
1.4.4.1 ! misho 160: return _mqtt_msgPUB_(MQTT_TYPE_PUBACK, msgID);
1.1 misho 161: }
162:
163: /*
164: * mqtt_msgPUBREC() Create PUBREC message
165: *
166: * @msgID = MessageID
1.4.4.1 ! misho 167: * return: NULL error or allocated PUBREC message
1.1 misho 168: */
1.4.4.1 ! misho 169: mqtt_msg_t *
! 170: mqtt_msgPUBREC(u_short msgID)
1.1 misho 171: {
1.4.4.1 ! misho 172: return _mqtt_msgPUB_(MQTT_TYPE_PUBREC, msgID);
1.1 misho 173: }
174:
175: /*
176: * mqtt_msgPUBREL() Create PUBREL message
177: *
178: * @msgID = MessageID
1.4.4.1 ! misho 179: * return: NULL error or allocated PUBREL message
1.1 misho 180: */
1.4.4.1 ! misho 181: mqtt_msg_t *
! 182: mqtt_msgPUBREL(u_short msgID)
1.1 misho 183: {
1.4.4.1 ! misho 184: return _mqtt_msgPUB_(MQTT_TYPE_PUBREL, msgID);
1.1 misho 185: }
186:
187: /*
188: * mqtt_msgPUBCOMP() Create PUBCOMP message
189: *
190: * @msgID = MessageID
1.4.4.1 ! misho 191: * return: NULL error or allocated PUBCOMP message
1.1 misho 192: */
1.4.4.1 ! misho 193: mqtt_msg_t *
! 194: mqtt_msgPUBCOMP(u_short msgID)
1.1 misho 195: {
1.4.4.1 ! misho 196: return _mqtt_msgPUB_(MQTT_TYPE_PUBCOMP, msgID);
1.1 misho 197: }
198:
199:
200: /* ============= decode ============ */
201:
1.4.4.1 ! misho 202: #if 0
1.1 misho 203: /*
204: * mqtt_readPUBLISH() Read PUBLISH message
205: *
206: * @buf = Message buffer
207: * @psTopic = Topic
208: * @topicLen = Topic length
209: * @msgID = MessageID
1.2 misho 210: * @pData = Data buffer, may be NULL
211: * return: -1 error or !=-1 allocated data buffer length
1.1 misho 212: */
1.2 misho 213: int
1.1 misho 214: mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen,
1.2 misho 215: u_short *msgID, void ** __restrict pData)
1.1 misho 216: {
217: int len, ret;
218: struct mqtthdr *hdr;
219: mqtthdr_var_t *var;
220: mqtt_len_t *v;
221: caddr_t pos;
222:
1.2 misho 223: if (!buf || !psTopic || !msgID)
224: return -1;
1.1 misho 225:
226: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);
227: if (!hdr)
1.2 misho 228: return -1;
1.1 misho 229: pos = buf->msg_base + ret + 1;
230: var = (mqtthdr_var_t*) pos;
231:
232: /* topic */
233: len -= MQTTHDR_VAR_SIZEOF(var);
234: if (len < 0) {
1.2 misho 235: mqtt_SetErr(EINVAL, "Short message length %d", len);
236: return -1;
1.1 misho 237: } else {
238: memset(psTopic, 0, topicLen--);
239: memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ?
240: topicLen : ntohs(var->var_sb.val));
241: pos += MQTTHDR_VAR_SIZEOF(var);
242: v = (mqtt_len_t*) pos;
243: }
244:
245: len -= sizeof(mqtt_len_t);
246: if (len < 0) {
1.2 misho 247: mqtt_SetErr(EINVAL, "Short message length %d", len);
248: return -1;
1.1 misho 249: } else {
250: *msgID = ntohs(v->val);
251: pos += sizeof(mqtt_len_t);
252: }
253:
254: /* data */
255: if (len < 0) {
1.2 misho 256: mqtt_SetErr(EINVAL, "Short message length %d", len);
257: return -1;
258: } else if (pData) {
259: if (!(*pData = malloc(len + 1))) {
260: LOGERR;
261: return -1;
262: } else
263: ((char*) (*pData))[len] = 0;
264:
265: memcpy(*pData, pos, len);
1.1 misho 266: }
267:
1.2 misho 268: return len;
1.1 misho 269: }
270:
271: /*
272: * mqtt_readPUBACK() Read PUBACK message
273: *
274: * @buf = Message buffer
275: * return: -1 error or MessageID
276: */
277: u_short
278: mqtt_readPUBACK(mqtt_msg_t * __restrict buf)
279: {
280: int len, ret;
281: struct mqtthdr *hdr;
282: mqtt_len_t *v;
283: caddr_t pos;
284:
285: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &ret, &len);
286: if (!hdr)
287: return (u_short) -1;
288: if (len < sizeof(mqtt_len_t)) {
1.2 misho 289: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 290: return (u_short) -1;
291: } else {
292: pos = buf->msg_base + ret + 1;
293: v = (mqtt_len_t*) pos;
294: }
295:
296: return ntohs(v->val);
297: }
298:
299: /*
300: * mqtt_readPUBREC() Read PUBREC message
301: *
302: * @buf = Message buffer
303: * return: -1 error or MessageID
304: */
305: u_short
306: mqtt_readPUBREC(mqtt_msg_t * __restrict buf)
307: {
308: int len, ret;
309: struct mqtthdr *hdr;
310: mqtt_len_t *v;
311: caddr_t pos;
312:
313: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &ret, &len);
314: if (!hdr)
315: return (u_short) -1;
316: if (len < sizeof(mqtt_len_t)) {
1.2 misho 317: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 318: return (u_short) -1;
319: } else {
320: pos = buf->msg_base + ret + 1;
321: v = (mqtt_len_t*) pos;
322: }
323:
324: return ntohs(v->val);
325: }
326:
327: /*
328: * mqtt_readPUBREL() Read PUBREL message
329: *
330: * @buf = Message buffer
331: * return: -1 error or MessageID
332: */
333: u_short
334: mqtt_readPUBREL(mqtt_msg_t * __restrict buf)
335: {
336: int len, ret;
337: struct mqtthdr *hdr;
338: mqtt_len_t *v;
339: caddr_t pos;
340:
341: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &ret, &len);
342: if (!hdr)
343: return (u_short) -1;
344: if (len < sizeof(mqtt_len_t)) {
1.2 misho 345: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 346: return (u_short) -1;
347: } else {
348: pos = buf->msg_base + ret + 1;
349: v = (mqtt_len_t*) pos;
350: }
351:
352: return ntohs(v->val);
353: }
354:
355: /*
356: * mqtt_readPUBCOMP() Read PUBCOMP message
357: *
358: * @buf = Message buffer
359: * return: -1 error or MessageID
360: */
361: u_short
362: mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
363: {
364: int len, ret;
365: struct mqtthdr *hdr;
366: mqtt_len_t *v;
367: caddr_t pos;
368:
369: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &ret, &len);
370: if (!hdr)
371: return (u_short) -1;
372: if (len < sizeof(mqtt_len_t)) {
1.2 misho 373: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 374: return (u_short) -1;
375: } else {
376: pos = buf->msg_base + ret + 1;
377: v = (mqtt_len_t*) pos;
378: }
379:
380: return ntohs(v->val);
381: }
1.4.4.1 ! misho 382: #endif
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>