Annotation of libaitmqtt/src/pub.c, revision 1.1.1.1.2.6
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.1.1.1.2.6! misho 6: * $Id: pub.c,v 1.1.1.1.2.5 2012/06/11 08:37:41 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.1.1.1.2.1 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_msgPUBLISH() Create PUBLISH message
51: *
52: * @buf = Message buffer
53: * @csTopic = Publish topic
54: * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
55: * @Dup = Duplicate message
56: * @QOS = QoS
57: * @Retain = Retain message
58: * @pData = Publish data into topic
59: * @datlen = Publish data length
60: * return: -1 error or >-1 message size for send
61: */
62: int
63: mqtt_msgPUBLISH(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID,
64: u_char Dup, u_char QOS, u_char Retain, const void *pData, int datlen)
65: {
1.1.1.1.2.6! misho 66: int len, siz;
1.1 misho 67: struct mqtthdr *hdr;
68: mqtthdr_var_t *topic;
69: mqtt_len_t *mid;
70: void *data;
71:
72: if (!buf || !csTopic)
73: return -1;
1.1.1.1.2.6! misho 74: else
! 75: data = buf->msg_base;
1.1 misho 76: if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1 misho 77: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1 misho 78: return -1;
79: }
80: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1 misho 81: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1 misho 82: return -1;
83: }
84:
1.1.1.1.2.6! misho 85: /* calculate message size */
! 86: len = sizeof(mqtt_len_t) + strlen(csTopic); /* topic */
! 87: len += sizeof(mqtt_len_t); /* msgid */
! 88: len += datlen; /* data len */
! 89:
! 90: /* calculate header size */
! 91: siz = sizeof(struct mqtthdr); /* mqtt fixed header */
! 92: siz += mqtt_sizeLen(mqtt_encodeLen(len)) - 1; /* length size */
! 93:
! 94: if (mqtt_msgRealloc(buf, siz + len) == -1)
1.1 misho 95: return -1;
1.1.1.1.2.6! misho 96: else
! 97: hdr = (struct mqtthdr *) data;
! 98:
! 99: /* fixed header */
! 100: MQTTHDR_MSGINIT(hdr);
! 101: hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
! 102: hdr->mqtt_msg.qos = QOS;
! 103: hdr->mqtt_msg.dup = Dup ? 1 : 0;
! 104: hdr->mqtt_msg.retain = Retain ? 1 : 0;
! 105: *hdr->mqtt_len = mqtt_encodeLen(len);
! 106: data += siz;
1.1 misho 107:
108: /* variable header */
1.1.1.1.2.6! misho 109: topic = (mqtthdr_var_t*) data;
1.1 misho 110: topic->var_sb.val = htons(strlen(csTopic));
111: memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
1.1.1.1.2.6! misho 112: data += MQTTHDR_VAR_SIZEOF(topic);
1.1 misho 113:
1.1.1.1.2.6! misho 114: mid = (mqtt_len_t*) data;
1.1 misho 115: mid->val = htons(msgID);
1.1.1.1.2.6! misho 116: data += sizeof(mqtt_len_t);
1.1 misho 117:
118: /* load with data */
1.1.1.1.2.6! misho 119: if (pData && datlen)
1.1 misho 120: memcpy(data, pData, datlen);
121:
1.1.1.1.2.6! misho 122: return siz + len;
1.1 misho 123: }
124:
125: static int
126: _mqtt_msgPUB_(mqtt_msg_t * __restrict buf, u_char cmd, u_short msgID)
127: {
128: int siz = 0;
129: struct mqtthdr *hdr;
130: mqtt_len_t *v;
131:
132: if (!buf)
133: return -1;
134:
135: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
136: return -1;
137: else {
138: hdr = (struct mqtthdr *) (buf->msg_base + siz);
139: siz += sizeof(struct mqtthdr);
140: v = (mqtt_len_t*) (buf->msg_base + siz);
141: siz += sizeof(mqtt_len_t);
142: }
143:
144: /* fixed header */
145: MQTTHDR_MSGINIT(hdr);
146: hdr->mqtt_msg.type = cmd;
147: *hdr->mqtt_len = sizeof(mqtt_len_t);
148:
149: /* MessageID */
150: v->val = htons(msgID);
151:
152: return siz;
153: }
154:
155: /*
156: * mqtt_msgPUBACK() Create PUBACK message
157: *
158: * @buf = Message buffer
159: * @msgID = MessageID
160: * return: -1 error or >-1 message size for send
161: */
162: inline int
163: mqtt_msgPUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
164: {
165: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBACK, msgID);
166: }
167:
168: /*
169: * mqtt_msgPUBREC() Create PUBREC message
170: *
171: * @buf = Message buffer
172: * @msgID = MessageID
173: * return: -1 error or >-1 message size for send
174: */
175: inline int
176: mqtt_msgPUBREC(mqtt_msg_t * __restrict buf, u_short msgID)
177: {
178: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREC, msgID);
179: }
180:
181: /*
182: * mqtt_msgPUBREL() Create PUBREL message
183: *
184: * @buf = Message buffer
185: * @msgID = MessageID
186: * return: -1 error or >-1 message size for send
187: */
188: inline int
189: mqtt_msgPUBREL(mqtt_msg_t * __restrict buf, u_short msgID)
190: {
191: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREL, msgID);
192: }
193:
194: /*
195: * mqtt_msgPUBCOMP() Create PUBCOMP message
196: *
197: * @buf = Message buffer
198: * @msgID = MessageID
199: * return: -1 error or >-1 message size for send
200: */
201: inline int
202: mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short msgID)
203: {
204: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBCOMP, msgID);
205: }
206:
207:
208: /* ============= decode ============ */
209:
210: /*
211: * mqtt_readPUBLISH() Read PUBLISH message
212: *
213: * @buf = Message buffer
214: * @psTopic = Topic
215: * @topicLen = Topic length
216: * @msgID = MessageID
1.1.1.1.2.5 misho 217: * @pData = Data buffer, may be NULL
1.1.1.1.2.4 misho 218: * return: -1 error or !=-1 allocated data buffer length
1.1 misho 219: */
1.1.1.1.2.4 misho 220: int
1.1 misho 221: mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen,
1.1.1.1.2.4 misho 222: u_short *msgID, void ** __restrict pData)
1.1 misho 223: {
224: int len, ret;
225: struct mqtthdr *hdr;
226: mqtthdr_var_t *var;
227: mqtt_len_t *v;
228: caddr_t pos;
229:
1.1.1.1.2.5 misho 230: if (!buf || !psTopic || !msgID)
1.1.1.1.2.4 misho 231: return -1;
1.1 misho 232:
233: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);
234: if (!hdr)
1.1.1.1.2.4 misho 235: return -1;
1.1 misho 236: pos = buf->msg_base + ret + 1;
237: var = (mqtthdr_var_t*) pos;
238:
239: /* topic */
240: len -= MQTTHDR_VAR_SIZEOF(var);
241: if (len < 0) {
1.1.1.1.2.1 misho 242: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.4 misho 243: return -1;
1.1 misho 244: } else {
245: memset(psTopic, 0, topicLen--);
246: memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ?
247: topicLen : ntohs(var->var_sb.val));
248: pos += MQTTHDR_VAR_SIZEOF(var);
249: v = (mqtt_len_t*) pos;
250: }
251:
252: len -= sizeof(mqtt_len_t);
253: if (len < 0) {
1.1.1.1.2.1 misho 254: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.4 misho 255: return -1;
1.1 misho 256: } else {
257: *msgID = ntohs(v->val);
258: pos += sizeof(mqtt_len_t);
259: }
260:
261: /* data */
262: if (len < 0) {
1.1.1.1.2.1 misho 263: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.4 misho 264: return -1;
1.1.1.1.2.5 misho 265: } else if (pData) {
1.1.1.1.2.4 misho 266: if (!(*pData = malloc(len + 1))) {
267: LOGERR;
268: return -1;
269: } else
270: ((char*) (*pData))[len] = 0;
271:
272: memcpy(*pData, pos, len);
1.1 misho 273: }
274:
1.1.1.1.2.4 misho 275: return len;
1.1 misho 276: }
277:
278: /*
279: * mqtt_readPUBACK() Read PUBACK message
280: *
281: * @buf = Message buffer
282: * return: -1 error or MessageID
283: */
284: u_short
285: mqtt_readPUBACK(mqtt_msg_t * __restrict buf)
286: {
287: int len, ret;
288: struct mqtthdr *hdr;
289: mqtt_len_t *v;
290: caddr_t pos;
291:
292: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &ret, &len);
293: if (!hdr)
294: return (u_short) -1;
295: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1 misho 296: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 297: return (u_short) -1;
298: } else {
299: pos = buf->msg_base + ret + 1;
300: v = (mqtt_len_t*) pos;
301: }
302:
303: return ntohs(v->val);
304: }
305:
306: /*
307: * mqtt_readPUBREC() Read PUBREC message
308: *
309: * @buf = Message buffer
310: * return: -1 error or MessageID
311: */
312: u_short
313: mqtt_readPUBREC(mqtt_msg_t * __restrict buf)
314: {
315: int len, ret;
316: struct mqtthdr *hdr;
317: mqtt_len_t *v;
318: caddr_t pos;
319:
320: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &ret, &len);
321: if (!hdr)
322: return (u_short) -1;
323: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1 misho 324: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 325: return (u_short) -1;
326: } else {
327: pos = buf->msg_base + ret + 1;
328: v = (mqtt_len_t*) pos;
329: }
330:
331: return ntohs(v->val);
332: }
333:
334: /*
335: * mqtt_readPUBREL() Read PUBREL message
336: *
337: * @buf = Message buffer
338: * return: -1 error or MessageID
339: */
340: u_short
341: mqtt_readPUBREL(mqtt_msg_t * __restrict buf)
342: {
343: int len, ret;
344: struct mqtthdr *hdr;
345: mqtt_len_t *v;
346: caddr_t pos;
347:
348: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &ret, &len);
349: if (!hdr)
350: return (u_short) -1;
351: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1 misho 352: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 353: return (u_short) -1;
354: } else {
355: pos = buf->msg_base + ret + 1;
356: v = (mqtt_len_t*) pos;
357: }
358:
359: return ntohs(v->val);
360: }
361:
362: /*
363: * mqtt_readPUBCOMP() Read PUBCOMP message
364: *
365: * @buf = Message buffer
366: * return: -1 error or MessageID
367: */
368: u_short
369: mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
370: {
371: int len, ret;
372: struct mqtthdr *hdr;
373: mqtt_len_t *v;
374: caddr_t pos;
375:
376: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &ret, &len);
377: if (!hdr)
378: return (u_short) -1;
379: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1 misho 380: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 381: return (u_short) -1;
382: } else {
383: pos = buf->msg_base + ret + 1;
384: v = (mqtt_len_t*) pos;
385: }
386:
387: return ntohs(v->val);
388: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>