Annotation of libaitmqtt/src/pub.c, revision 1.1.1.1.2.2
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.1.1.1.2.2! misho 6: * $Id: pub.c,v 1.1.1.1.2.1 2012/04/07 20:56:49 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.1.1.1.2.1 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_msgPUBLISH() Create PUBLISH message
51: *
52: * @buf = Message buffer
53: * @csTopic = Publish topic
54: * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
55: * @Dup = Duplicate message
56: * @QOS = QoS
57: * @Retain = Retain message
58: * @pData = Publish data into topic
59: * @datlen = Publish data length
60: * return: -1 error or >-1 message size for send
61: */
62: int
63: mqtt_msgPUBLISH(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID,
64: u_char Dup, u_char QOS, u_char Retain, const void *pData, int datlen)
65: {
66: int siz = 0;
67: struct mqtthdr *hdr;
68: mqtthdr_var_t *topic;
69: mqtt_len_t *mid;
70: void *data;
71:
72: if (!buf || !csTopic)
73: return -1;
74: if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1 misho 75: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1 misho 76: return -1;
77: }
78: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1 misho 79: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1 misho 80: return -1;
81: }
82:
83: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
84: return -1;
85: else {
86: hdr = (struct mqtthdr *) (buf->msg_base + siz);
87: siz += sizeof(struct mqtthdr);
88: }
89:
90: /* variable header */
91: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
92: topic->var_sb.val = htons(strlen(csTopic));
93: memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
94: siz += MQTTHDR_VAR_SIZEOF(topic);
95:
96: mid = (mqtt_len_t*) (buf->msg_base + siz);
97: mid->val = htons(msgID);
98: siz += sizeof(mqtt_len_t);
99:
100: /* load with data */
101: if (pData && datlen) {
102: data = buf->msg_base + siz;
103: memcpy(data, pData, datlen);
104: siz += datlen;
105: }
106:
107: /* fixed header */
108: MQTTHDR_MSGINIT(hdr);
109: hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
110: hdr->mqtt_msg.qos = QOS;
111: hdr->mqtt_msg.dup = Dup ? 1 : 0;
112: hdr->mqtt_msg.retain = Retain ? 1 : 0;
113: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
114:
115: return siz;
116: }
117:
118: static int
119: _mqtt_msgPUB_(mqtt_msg_t * __restrict buf, u_char cmd, u_short msgID)
120: {
121: int siz = 0;
122: struct mqtthdr *hdr;
123: mqtt_len_t *v;
124:
125: if (!buf)
126: return -1;
127:
128: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
129: return -1;
130: else {
131: hdr = (struct mqtthdr *) (buf->msg_base + siz);
132: siz += sizeof(struct mqtthdr);
133: v = (mqtt_len_t*) (buf->msg_base + siz);
134: siz += sizeof(mqtt_len_t);
135: }
136:
137: /* fixed header */
138: MQTTHDR_MSGINIT(hdr);
139: hdr->mqtt_msg.type = cmd;
140: *hdr->mqtt_len = sizeof(mqtt_len_t);
141:
142: /* MessageID */
143: v->val = htons(msgID);
144:
145: return siz;
146: }
147:
148: /*
149: * mqtt_msgPUBACK() Create PUBACK message
150: *
151: * @buf = Message buffer
152: * @msgID = MessageID
153: * return: -1 error or >-1 message size for send
154: */
155: inline int
156: mqtt_msgPUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
157: {
158: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBACK, msgID);
159: }
160:
161: /*
162: * mqtt_msgPUBREC() Create PUBREC message
163: *
164: * @buf = Message buffer
165: * @msgID = MessageID
166: * return: -1 error or >-1 message size for send
167: */
168: inline int
169: mqtt_msgPUBREC(mqtt_msg_t * __restrict buf, u_short msgID)
170: {
171: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREC, msgID);
172: }
173:
174: /*
175: * mqtt_msgPUBREL() Create PUBREL message
176: *
177: * @buf = Message buffer
178: * @msgID = MessageID
179: * return: -1 error or >-1 message size for send
180: */
181: inline int
182: mqtt_msgPUBREL(mqtt_msg_t * __restrict buf, u_short msgID)
183: {
184: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREL, msgID);
185: }
186:
187: /*
188: * mqtt_msgPUBCOMP() Create PUBCOMP message
189: *
190: * @buf = Message buffer
191: * @msgID = MessageID
192: * return: -1 error or >-1 message size for send
193: */
194: inline int
195: mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short msgID)
196: {
197: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBCOMP, msgID);
198: }
199:
200:
201: /* ============= decode ============ */
202:
203: /*
204: * mqtt_readPUBLISH() Read PUBLISH message
205: *
206: * @buf = Message buffer
207: * @psTopic = Topic
208: * @topicLen = Topic length
209: * @msgID = MessageID
210: * @pData = Data buffer
211: * @datLen = Data buffer length, if *datLen == 0 allocate memory for pData
212: * return: NULL error or !=NULL MQTT fixed header
213: */
214: struct mqtthdr *
215: mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen,
216: u_short *msgID, void * __restrict pData, int *datLen)
217: {
218: int len, ret;
219: struct mqtthdr *hdr;
220: mqtthdr_var_t *var;
221: mqtt_len_t *v;
222: caddr_t pos;
223:
224: if (!buf || !psTopic || !msgID || !pData)
225: return NULL;
226:
227: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);
228: if (!hdr)
229: return NULL;
230: pos = buf->msg_base + ret + 1;
231: var = (mqtthdr_var_t*) pos;
232:
233: /* topic */
234: len -= MQTTHDR_VAR_SIZEOF(var);
235: if (len < 0) {
1.1.1.1.2.1 misho 236: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 237: return NULL;
238: } else {
239: memset(psTopic, 0, topicLen--);
240: memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ?
241: topicLen : ntohs(var->var_sb.val));
242: pos += MQTTHDR_VAR_SIZEOF(var);
243: v = (mqtt_len_t*) pos;
244: }
245:
246: len -= sizeof(mqtt_len_t);
247: if (len < 0) {
1.1.1.1.2.1 misho 248: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 249: return NULL;
250: } else {
251: *msgID = ntohs(v->val);
252: pos += sizeof(mqtt_len_t);
253: }
254:
255: /* data */
256: if (len < 0) {
1.1.1.1.2.1 misho 257: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 258: return NULL;
259: } else {
260: if (!*datLen) {
261: if (!(pData = malloc(len))) {
262: LOGERR;
263: return NULL;
264: } else
265: *datLen = len;
266: }
267:
268: memset(pData, 0, *datLen);
269: if (len < *datLen)
270: *datLen = len;
271: memcpy(pData, pos, *datLen);
272: }
273:
274: return hdr;
275: }
276:
277: /*
278: * mqtt_readPUBACK() Read PUBACK message
279: *
280: * @buf = Message buffer
281: * return: -1 error or MessageID
282: */
283: u_short
284: mqtt_readPUBACK(mqtt_msg_t * __restrict buf)
285: {
286: int len, ret;
287: struct mqtthdr *hdr;
288: mqtt_len_t *v;
289: caddr_t pos;
290:
291: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &ret, &len);
292: if (!hdr)
293: return (u_short) -1;
294: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1 misho 295: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 296: return (u_short) -1;
297: } else {
298: pos = buf->msg_base + ret + 1;
299: v = (mqtt_len_t*) pos;
300: }
301:
302: return ntohs(v->val);
303: }
304:
305: /*
306: * mqtt_readPUBREC() Read PUBREC message
307: *
308: * @buf = Message buffer
309: * return: -1 error or MessageID
310: */
311: u_short
312: mqtt_readPUBREC(mqtt_msg_t * __restrict buf)
313: {
314: int len, ret;
315: struct mqtthdr *hdr;
316: mqtt_len_t *v;
317: caddr_t pos;
318:
319: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &ret, &len);
320: if (!hdr)
321: return (u_short) -1;
322: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1 misho 323: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 324: return (u_short) -1;
325: } else {
326: pos = buf->msg_base + ret + 1;
327: v = (mqtt_len_t*) pos;
328: }
329:
330: return ntohs(v->val);
331: }
332:
333: /*
334: * mqtt_readPUBREL() Read PUBREL message
335: *
336: * @buf = Message buffer
337: * return: -1 error or MessageID
338: */
339: u_short
340: mqtt_readPUBREL(mqtt_msg_t * __restrict buf)
341: {
342: int len, ret;
343: struct mqtthdr *hdr;
344: mqtt_len_t *v;
345: caddr_t pos;
346:
347: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &ret, &len);
348: if (!hdr)
349: return (u_short) -1;
350: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1 misho 351: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 352: return (u_short) -1;
353: } else {
354: pos = buf->msg_base + ret + 1;
355: v = (mqtt_len_t*) pos;
356: }
357:
358: return ntohs(v->val);
359: }
360:
361: /*
362: * mqtt_readPUBCOMP() Read PUBCOMP message
363: *
364: * @buf = Message buffer
365: * return: -1 error or MessageID
366: */
367: u_short
368: mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
369: {
370: int len, ret;
371: struct mqtthdr *hdr;
372: mqtt_len_t *v;
373: caddr_t pos;
374:
375: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &ret, &len);
376: if (!hdr)
377: return (u_short) -1;
378: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1 misho 379: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 380: return (u_short) -1;
381: } else {
382: pos = buf->msg_base + ret + 1;
383: v = (mqtt_len_t*) pos;
384: }
385:
386: return ntohs(v->val);
387: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>