Annotation of libaitmqtt/src/pub.c, revision 1.1.1.1.2.7
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.1.1.1.2.7! misho 6: * $Id: pub.c,v 1.1.1.1.2.6 2012/06/19 15:41:15 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.1.1.1.2.1 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_msgPUBLISH() Create PUBLISH message
51: *
52: * @buf = Message buffer
53: * @csTopic = Publish topic
54: * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
55: * @Dup = Duplicate message
56: * @QOS = QoS
57: * @Retain = Retain message
58: * @pData = Publish data into topic
59: * @datlen = Publish data length
60: * return: -1 error or >-1 message size for send
61: */
62: int
63: mqtt_msgPUBLISH(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID,
64: u_char Dup, u_char QOS, u_char Retain, const void *pData, int datlen)
65: {
1.1.1.1.2.6 misho 66: int len, siz;
1.1.1.1.2.7! misho 67: u_int n;
1.1 misho 68: struct mqtthdr *hdr;
69: mqtthdr_var_t *topic;
70: mqtt_len_t *mid;
71: void *data;
72:
73: if (!buf || !csTopic)
74: return -1;
1.1.1.1.2.6 misho 75: else
76: data = buf->msg_base;
1.1 misho 77: if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1 misho 78: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1 misho 79: return -1;
80: }
81: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1 misho 82: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1 misho 83: return -1;
84: }
85:
1.1.1.1.2.6 misho 86: /* calculate message size */
87: len = sizeof(mqtt_len_t) + strlen(csTopic); /* topic */
88: len += sizeof(mqtt_len_t); /* msgid */
89: len += datlen; /* data len */
90:
91: /* calculate header size */
92: siz = sizeof(struct mqtthdr); /* mqtt fixed header */
1.1.1.1.2.7! misho 93: n = mqtt_encodeLen(len); /* message size */
! 94: siz += mqtt_sizeLen(n) - 1; /* length size */
1.1.1.1.2.6 misho 95:
96: if (mqtt_msgRealloc(buf, siz + len) == -1)
1.1 misho 97: return -1;
1.1.1.1.2.6 misho 98: else
99: hdr = (struct mqtthdr *) data;
100:
101: /* fixed header */
102: MQTTHDR_MSGINIT(hdr);
103: hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
104: hdr->mqtt_msg.qos = QOS;
105: hdr->mqtt_msg.dup = Dup ? 1 : 0;
106: hdr->mqtt_msg.retain = Retain ? 1 : 0;
1.1.1.1.2.7! misho 107: *(u_int*) hdr->mqtt_len = n;
1.1.1.1.2.6 misho 108: data += siz;
1.1 misho 109:
110: /* variable header */
1.1.1.1.2.6 misho 111: topic = (mqtthdr_var_t*) data;
1.1 misho 112: topic->var_sb.val = htons(strlen(csTopic));
113: memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
1.1.1.1.2.6 misho 114: data += MQTTHDR_VAR_SIZEOF(topic);
1.1 misho 115:
1.1.1.1.2.6 misho 116: mid = (mqtt_len_t*) data;
1.1 misho 117: mid->val = htons(msgID);
1.1.1.1.2.6 misho 118: data += sizeof(mqtt_len_t);
1.1 misho 119:
120: /* load with data */
1.1.1.1.2.6 misho 121: if (pData && datlen)
1.1 misho 122: memcpy(data, pData, datlen);
123:
1.1.1.1.2.6 misho 124: return siz + len;
1.1 misho 125: }
126:
127: static int
128: _mqtt_msgPUB_(mqtt_msg_t * __restrict buf, u_char cmd, u_short msgID)
129: {
130: int siz = 0;
131: struct mqtthdr *hdr;
132: mqtt_len_t *v;
133:
134: if (!buf)
135: return -1;
136:
137: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
138: return -1;
139: else {
140: hdr = (struct mqtthdr *) (buf->msg_base + siz);
141: siz += sizeof(struct mqtthdr);
142: v = (mqtt_len_t*) (buf->msg_base + siz);
143: siz += sizeof(mqtt_len_t);
144: }
145:
146: /* fixed header */
147: MQTTHDR_MSGINIT(hdr);
148: hdr->mqtt_msg.type = cmd;
149: *hdr->mqtt_len = sizeof(mqtt_len_t);
150:
151: /* MessageID */
152: v->val = htons(msgID);
153:
154: return siz;
155: }
156:
157: /*
158: * mqtt_msgPUBACK() Create PUBACK message
159: *
160: * @buf = Message buffer
161: * @msgID = MessageID
162: * return: -1 error or >-1 message size for send
163: */
164: inline int
165: mqtt_msgPUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
166: {
167: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBACK, msgID);
168: }
169:
170: /*
171: * mqtt_msgPUBREC() Create PUBREC message
172: *
173: * @buf = Message buffer
174: * @msgID = MessageID
175: * return: -1 error or >-1 message size for send
176: */
177: inline int
178: mqtt_msgPUBREC(mqtt_msg_t * __restrict buf, u_short msgID)
179: {
180: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREC, msgID);
181: }
182:
183: /*
184: * mqtt_msgPUBREL() Create PUBREL message
185: *
186: * @buf = Message buffer
187: * @msgID = MessageID
188: * return: -1 error or >-1 message size for send
189: */
190: inline int
191: mqtt_msgPUBREL(mqtt_msg_t * __restrict buf, u_short msgID)
192: {
193: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREL, msgID);
194: }
195:
196: /*
197: * mqtt_msgPUBCOMP() Create PUBCOMP message
198: *
199: * @buf = Message buffer
200: * @msgID = MessageID
201: * return: -1 error or >-1 message size for send
202: */
203: inline int
204: mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short msgID)
205: {
206: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBCOMP, msgID);
207: }
208:
209:
210: /* ============= decode ============ */
211:
212: /*
213: * mqtt_readPUBLISH() Read PUBLISH message
214: *
215: * @buf = Message buffer
216: * @psTopic = Topic
217: * @topicLen = Topic length
218: * @msgID = MessageID
1.1.1.1.2.5 misho 219: * @pData = Data buffer, may be NULL
1.1.1.1.2.4 misho 220: * return: -1 error or !=-1 allocated data buffer length
1.1 misho 221: */
1.1.1.1.2.4 misho 222: int
1.1 misho 223: mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen,
1.1.1.1.2.4 misho 224: u_short *msgID, void ** __restrict pData)
1.1 misho 225: {
226: int len, ret;
227: struct mqtthdr *hdr;
228: mqtthdr_var_t *var;
229: mqtt_len_t *v;
230: caddr_t pos;
231:
1.1.1.1.2.5 misho 232: if (!buf || !psTopic || !msgID)
1.1.1.1.2.4 misho 233: return -1;
1.1 misho 234:
235: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);
236: if (!hdr)
1.1.1.1.2.4 misho 237: return -1;
1.1 misho 238: pos = buf->msg_base + ret + 1;
239: var = (mqtthdr_var_t*) pos;
240:
241: /* topic */
242: len -= MQTTHDR_VAR_SIZEOF(var);
243: if (len < 0) {
1.1.1.1.2.1 misho 244: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.4 misho 245: return -1;
1.1 misho 246: } else {
247: memset(psTopic, 0, topicLen--);
248: memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ?
249: topicLen : ntohs(var->var_sb.val));
250: pos += MQTTHDR_VAR_SIZEOF(var);
251: v = (mqtt_len_t*) pos;
252: }
253:
254: len -= sizeof(mqtt_len_t);
255: if (len < 0) {
1.1.1.1.2.1 misho 256: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.4 misho 257: return -1;
1.1 misho 258: } else {
259: *msgID = ntohs(v->val);
260: pos += sizeof(mqtt_len_t);
261: }
262:
263: /* data */
264: if (len < 0) {
1.1.1.1.2.1 misho 265: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.4 misho 266: return -1;
1.1.1.1.2.5 misho 267: } else if (pData) {
1.1.1.1.2.4 misho 268: if (!(*pData = malloc(len + 1))) {
269: LOGERR;
270: return -1;
271: } else
272: ((char*) (*pData))[len] = 0;
273:
274: memcpy(*pData, pos, len);
1.1 misho 275: }
276:
1.1.1.1.2.4 misho 277: return len;
1.1 misho 278: }
279:
280: /*
281: * mqtt_readPUBACK() Read PUBACK message
282: *
283: * @buf = Message buffer
284: * return: -1 error or MessageID
285: */
286: u_short
287: mqtt_readPUBACK(mqtt_msg_t * __restrict buf)
288: {
289: int len, ret;
290: struct mqtthdr *hdr;
291: mqtt_len_t *v;
292: caddr_t pos;
293:
294: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &ret, &len);
295: if (!hdr)
296: return (u_short) -1;
297: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1 misho 298: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 299: return (u_short) -1;
300: } else {
301: pos = buf->msg_base + ret + 1;
302: v = (mqtt_len_t*) pos;
303: }
304:
305: return ntohs(v->val);
306: }
307:
308: /*
309: * mqtt_readPUBREC() Read PUBREC message
310: *
311: * @buf = Message buffer
312: * return: -1 error or MessageID
313: */
314: u_short
315: mqtt_readPUBREC(mqtt_msg_t * __restrict buf)
316: {
317: int len, ret;
318: struct mqtthdr *hdr;
319: mqtt_len_t *v;
320: caddr_t pos;
321:
322: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &ret, &len);
323: if (!hdr)
324: return (u_short) -1;
325: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1 misho 326: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 327: return (u_short) -1;
328: } else {
329: pos = buf->msg_base + ret + 1;
330: v = (mqtt_len_t*) pos;
331: }
332:
333: return ntohs(v->val);
334: }
335:
336: /*
337: * mqtt_readPUBREL() Read PUBREL message
338: *
339: * @buf = Message buffer
340: * return: -1 error or MessageID
341: */
342: u_short
343: mqtt_readPUBREL(mqtt_msg_t * __restrict buf)
344: {
345: int len, ret;
346: struct mqtthdr *hdr;
347: mqtt_len_t *v;
348: caddr_t pos;
349:
350: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &ret, &len);
351: if (!hdr)
352: return (u_short) -1;
353: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1 misho 354: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 355: return (u_short) -1;
356: } else {
357: pos = buf->msg_base + ret + 1;
358: v = (mqtt_len_t*) pos;
359: }
360:
361: return ntohs(v->val);
362: }
363:
364: /*
365: * mqtt_readPUBCOMP() Read PUBCOMP message
366: *
367: * @buf = Message buffer
368: * return: -1 error or MessageID
369: */
370: u_short
371: mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
372: {
373: int len, ret;
374: struct mqtthdr *hdr;
375: mqtt_len_t *v;
376: caddr_t pos;
377:
378: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &ret, &len);
379: if (!hdr)
380: return (u_short) -1;
381: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1 misho 382: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 383: return (u_short) -1;
384: } else {
385: pos = buf->msg_base + ret + 1;
386: v = (mqtt_len_t*) pos;
387: }
388:
389: return ntohs(v->val);
390: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>