Annotation of libaitmqtt/src/pub.c, revision 1.3.8.1
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.3.8.1 ! misho 6: * $Id: pub.c,v 1.3 2012/06/28 11:06:17 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.3.8.1 ! misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_msgPUBLISH() Create PUBLISH message
51: *
52: * @buf = Message buffer
53: * @csTopic = Publish topic
54: * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
55: * @Dup = Duplicate message
56: * @QOS = QoS
57: * @Retain = Retain message
58: * @pData = Publish data into topic
59: * @datlen = Publish data length
60: * return: -1 error or >-1 message size for send
61: */
62: int
63: mqtt_msgPUBLISH(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID,
64: u_char Dup, u_char QOS, u_char Retain, const void *pData, int datlen)
65: {
1.2 misho 66: int len, siz;
1.3 misho 67: u_int n, *l;
1.1 misho 68: struct mqtthdr *hdr;
69: mqtthdr_var_t *topic;
70: mqtt_len_t *mid;
71: void *data;
72:
73: if (!buf || !csTopic)
74: return -1;
75: if (QOS > MQTT_QOS_EXACTLY) {
1.2 misho 76: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1 misho 77: return -1;
78: }
79: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.2 misho 80: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1 misho 81: return -1;
82: }
83:
1.2 misho 84: /* calculate message size */
85: len = sizeof(mqtt_len_t) + strlen(csTopic); /* topic */
86: len += sizeof(mqtt_len_t); /* msgid */
87: len += datlen; /* data len */
88:
89: /* calculate header size */
90: siz = sizeof(struct mqtthdr); /* mqtt fixed header */
91: n = mqtt_encodeLen(len); /* message size */
92: siz += mqtt_sizeLen(n) - 1; /* length size */
93:
94: if (mqtt_msgRealloc(buf, siz + len) == -1)
1.1 misho 95: return -1;
96: else {
1.2 misho 97: data = buf->msg_base;
98: hdr = (struct mqtthdr *) data;
1.1 misho 99: }
100:
1.2 misho 101: /* fixed header */
102: MQTTHDR_MSGINIT(hdr);
103: hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
104: hdr->mqtt_msg.qos = QOS;
105: hdr->mqtt_msg.dup = Dup ? 1 : 0;
106: hdr->mqtt_msg.retain = Retain ? 1 : 0;
1.3 misho 107: l = (u_int*) hdr->mqtt_len;
108: *l = n;
1.2 misho 109: data += siz;
110:
1.1 misho 111: /* variable header */
1.2 misho 112: topic = (mqtthdr_var_t*) data;
1.1 misho 113: topic->var_sb.val = htons(strlen(csTopic));
114: memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
1.2 misho 115: data += MQTTHDR_VAR_SIZEOF(topic);
1.1 misho 116:
1.2 misho 117: mid = (mqtt_len_t*) data;
1.1 misho 118: mid->val = htons(msgID);
1.2 misho 119: data += sizeof(mqtt_len_t);
1.1 misho 120:
121: /* load with data */
1.2 misho 122: if (pData && datlen)
1.1 misho 123: memcpy(data, pData, datlen);
124:
1.2 misho 125: return siz + len;
1.1 misho 126: }
127:
128: static int
129: _mqtt_msgPUB_(mqtt_msg_t * __restrict buf, u_char cmd, u_short msgID)
130: {
1.3 misho 131: int siz = sizeof(struct mqtthdr);
1.1 misho 132: struct mqtthdr *hdr;
133: mqtt_len_t *v;
134:
135: if (!buf)
136: return -1;
137:
138: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
139: return -1;
1.3 misho 140: else
141: hdr = (struct mqtthdr *) buf->msg_base;
1.1 misho 142:
143: /* fixed header */
144: MQTTHDR_MSGINIT(hdr);
145: hdr->mqtt_msg.type = cmd;
146: *hdr->mqtt_len = sizeof(mqtt_len_t);
147:
148: /* MessageID */
1.3 misho 149: v = (mqtt_len_t*) (buf->msg_base + siz);
1.1 misho 150: v->val = htons(msgID);
1.3 misho 151: siz += sizeof(mqtt_len_t);
1.1 misho 152:
153: return siz;
154: }
155:
156: /*
157: * mqtt_msgPUBACK() Create PUBACK message
158: *
159: * @buf = Message buffer
160: * @msgID = MessageID
161: * return: -1 error or >-1 message size for send
162: */
1.3.8.1 ! misho 163: int
1.1 misho 164: mqtt_msgPUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
165: {
166: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBACK, msgID);
167: }
168:
169: /*
170: * mqtt_msgPUBREC() Create PUBREC message
171: *
172: * @buf = Message buffer
173: * @msgID = MessageID
174: * return: -1 error or >-1 message size for send
175: */
1.3.8.1 ! misho 176: int
1.1 misho 177: mqtt_msgPUBREC(mqtt_msg_t * __restrict buf, u_short msgID)
178: {
179: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREC, msgID);
180: }
181:
182: /*
183: * mqtt_msgPUBREL() Create PUBREL message
184: *
185: * @buf = Message buffer
186: * @msgID = MessageID
187: * return: -1 error or >-1 message size for send
188: */
1.3.8.1 ! misho 189: int
1.1 misho 190: mqtt_msgPUBREL(mqtt_msg_t * __restrict buf, u_short msgID)
191: {
192: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREL, msgID);
193: }
194:
195: /*
196: * mqtt_msgPUBCOMP() Create PUBCOMP message
197: *
198: * @buf = Message buffer
199: * @msgID = MessageID
200: * return: -1 error or >-1 message size for send
201: */
1.3.8.1 ! misho 202: int
1.1 misho 203: mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short msgID)
204: {
205: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBCOMP, msgID);
206: }
207:
208:
209: /* ============= decode ============ */
210:
211: /*
212: * mqtt_readPUBLISH() Read PUBLISH message
213: *
214: * @buf = Message buffer
215: * @psTopic = Topic
216: * @topicLen = Topic length
217: * @msgID = MessageID
1.2 misho 218: * @pData = Data buffer, may be NULL
219: * return: -1 error or !=-1 allocated data buffer length
1.1 misho 220: */
1.2 misho 221: int
1.1 misho 222: mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen,
1.2 misho 223: u_short *msgID, void ** __restrict pData)
1.1 misho 224: {
225: int len, ret;
226: struct mqtthdr *hdr;
227: mqtthdr_var_t *var;
228: mqtt_len_t *v;
229: caddr_t pos;
230:
1.2 misho 231: if (!buf || !psTopic || !msgID)
232: return -1;
1.1 misho 233:
234: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);
235: if (!hdr)
1.2 misho 236: return -1;
1.1 misho 237: pos = buf->msg_base + ret + 1;
238: var = (mqtthdr_var_t*) pos;
239:
240: /* topic */
241: len -= MQTTHDR_VAR_SIZEOF(var);
242: if (len < 0) {
1.2 misho 243: mqtt_SetErr(EINVAL, "Short message length %d", len);
244: return -1;
1.1 misho 245: } else {
246: memset(psTopic, 0, topicLen--);
247: memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ?
248: topicLen : ntohs(var->var_sb.val));
249: pos += MQTTHDR_VAR_SIZEOF(var);
250: v = (mqtt_len_t*) pos;
251: }
252:
253: len -= sizeof(mqtt_len_t);
254: if (len < 0) {
1.2 misho 255: mqtt_SetErr(EINVAL, "Short message length %d", len);
256: return -1;
1.1 misho 257: } else {
258: *msgID = ntohs(v->val);
259: pos += sizeof(mqtt_len_t);
260: }
261:
262: /* data */
263: if (len < 0) {
1.2 misho 264: mqtt_SetErr(EINVAL, "Short message length %d", len);
265: return -1;
266: } else if (pData) {
267: if (!(*pData = malloc(len + 1))) {
268: LOGERR;
269: return -1;
270: } else
271: ((char*) (*pData))[len] = 0;
272:
273: memcpy(*pData, pos, len);
1.1 misho 274: }
275:
1.2 misho 276: return len;
1.1 misho 277: }
278:
279: /*
280: * mqtt_readPUBACK() Read PUBACK message
281: *
282: * @buf = Message buffer
283: * return: -1 error or MessageID
284: */
285: u_short
286: mqtt_readPUBACK(mqtt_msg_t * __restrict buf)
287: {
288: int len, ret;
289: struct mqtthdr *hdr;
290: mqtt_len_t *v;
291: caddr_t pos;
292:
293: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &ret, &len);
294: if (!hdr)
295: return (u_short) -1;
296: if (len < sizeof(mqtt_len_t)) {
1.2 misho 297: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 298: return (u_short) -1;
299: } else {
300: pos = buf->msg_base + ret + 1;
301: v = (mqtt_len_t*) pos;
302: }
303:
304: return ntohs(v->val);
305: }
306:
307: /*
308: * mqtt_readPUBREC() Read PUBREC message
309: *
310: * @buf = Message buffer
311: * return: -1 error or MessageID
312: */
313: u_short
314: mqtt_readPUBREC(mqtt_msg_t * __restrict buf)
315: {
316: int len, ret;
317: struct mqtthdr *hdr;
318: mqtt_len_t *v;
319: caddr_t pos;
320:
321: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &ret, &len);
322: if (!hdr)
323: return (u_short) -1;
324: if (len < sizeof(mqtt_len_t)) {
1.2 misho 325: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 326: return (u_short) -1;
327: } else {
328: pos = buf->msg_base + ret + 1;
329: v = (mqtt_len_t*) pos;
330: }
331:
332: return ntohs(v->val);
333: }
334:
335: /*
336: * mqtt_readPUBREL() Read PUBREL message
337: *
338: * @buf = Message buffer
339: * return: -1 error or MessageID
340: */
341: u_short
342: mqtt_readPUBREL(mqtt_msg_t * __restrict buf)
343: {
344: int len, ret;
345: struct mqtthdr *hdr;
346: mqtt_len_t *v;
347: caddr_t pos;
348:
349: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &ret, &len);
350: if (!hdr)
351: return (u_short) -1;
352: if (len < sizeof(mqtt_len_t)) {
1.2 misho 353: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 354: return (u_short) -1;
355: } else {
356: pos = buf->msg_base + ret + 1;
357: v = (mqtt_len_t*) pos;
358: }
359:
360: return ntohs(v->val);
361: }
362:
363: /*
364: * mqtt_readPUBCOMP() Read PUBCOMP message
365: *
366: * @buf = Message buffer
367: * return: -1 error or MessageID
368: */
369: u_short
370: mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
371: {
372: int len, ret;
373: struct mqtthdr *hdr;
374: mqtt_len_t *v;
375: caddr_t pos;
376:
377: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &ret, &len);
378: if (!hdr)
379: return (u_short) -1;
380: if (len < sizeof(mqtt_len_t)) {
1.2 misho 381: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 382: return (u_short) -1;
383: } else {
384: pos = buf->msg_base + ret + 1;
385: v = (mqtt_len_t*) pos;
386: }
387:
388: return ntohs(v->val);
389: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>