Annotation of libaitmqtt/src/pub.c, revision 1.2.2.1
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.2.2.1 ! misho 6: * $Id: pub.c,v 1.2 2012/06/20 15:02:24 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.2 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_msgPUBLISH() Create PUBLISH message
51: *
52: * @buf = Message buffer
53: * @csTopic = Publish topic
54: * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
55: * @Dup = Duplicate message
56: * @QOS = QoS
57: * @Retain = Retain message
58: * @pData = Publish data into topic
59: * @datlen = Publish data length
60: * return: -1 error or >-1 message size for send
61: */
62: int
63: mqtt_msgPUBLISH(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID,
64: u_char Dup, u_char QOS, u_char Retain, const void *pData, int datlen)
65: {
1.2 misho 66: int len, siz;
1.2.2.1 ! misho 67: u_int n, *l;
1.1 misho 68: struct mqtthdr *hdr;
69: mqtthdr_var_t *topic;
70: mqtt_len_t *mid;
71: void *data;
72:
73: if (!buf || !csTopic)
74: return -1;
75: if (QOS > MQTT_QOS_EXACTLY) {
1.2 misho 76: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1 misho 77: return -1;
78: }
79: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.2 misho 80: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1 misho 81: return -1;
82: }
83:
1.2 misho 84: /* calculate message size */
85: len = sizeof(mqtt_len_t) + strlen(csTopic); /* topic */
86: len += sizeof(mqtt_len_t); /* msgid */
87: len += datlen; /* data len */
88:
89: /* calculate header size */
90: siz = sizeof(struct mqtthdr); /* mqtt fixed header */
91: n = mqtt_encodeLen(len); /* message size */
92: siz += mqtt_sizeLen(n) - 1; /* length size */
93:
94: if (mqtt_msgRealloc(buf, siz + len) == -1)
1.1 misho 95: return -1;
96: else {
1.2 misho 97: data = buf->msg_base;
98: hdr = (struct mqtthdr *) data;
1.1 misho 99: }
100:
1.2 misho 101: /* fixed header */
102: MQTTHDR_MSGINIT(hdr);
103: hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
104: hdr->mqtt_msg.qos = QOS;
105: hdr->mqtt_msg.dup = Dup ? 1 : 0;
106: hdr->mqtt_msg.retain = Retain ? 1 : 0;
1.2.2.1 ! misho 107: l = (u_int*) hdr->mqtt_len;
! 108: *l = n;
1.2 misho 109: data += siz;
110:
1.1 misho 111: /* variable header */
1.2 misho 112: topic = (mqtthdr_var_t*) data;
1.1 misho 113: topic->var_sb.val = htons(strlen(csTopic));
114: memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
1.2 misho 115: data += MQTTHDR_VAR_SIZEOF(topic);
1.1 misho 116:
1.2 misho 117: mid = (mqtt_len_t*) data;
1.1 misho 118: mid->val = htons(msgID);
1.2 misho 119: data += sizeof(mqtt_len_t);
1.1 misho 120:
121: /* load with data */
1.2 misho 122: if (pData && datlen)
1.1 misho 123: memcpy(data, pData, datlen);
124:
1.2 misho 125: return siz + len;
1.1 misho 126: }
127:
128: static int
129: _mqtt_msgPUB_(mqtt_msg_t * __restrict buf, u_char cmd, u_short msgID)
130: {
131: int siz = 0;
132: struct mqtthdr *hdr;
133: mqtt_len_t *v;
134:
135: if (!buf)
136: return -1;
137:
138: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
139: return -1;
140: else {
141: hdr = (struct mqtthdr *) (buf->msg_base + siz);
142: siz += sizeof(struct mqtthdr);
143: v = (mqtt_len_t*) (buf->msg_base + siz);
144: siz += sizeof(mqtt_len_t);
145: }
146:
147: /* fixed header */
148: MQTTHDR_MSGINIT(hdr);
149: hdr->mqtt_msg.type = cmd;
150: *hdr->mqtt_len = sizeof(mqtt_len_t);
151:
152: /* MessageID */
153: v->val = htons(msgID);
154:
155: return siz;
156: }
157:
158: /*
159: * mqtt_msgPUBACK() Create PUBACK message
160: *
161: * @buf = Message buffer
162: * @msgID = MessageID
163: * return: -1 error or >-1 message size for send
164: */
165: inline int
166: mqtt_msgPUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
167: {
168: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBACK, msgID);
169: }
170:
171: /*
172: * mqtt_msgPUBREC() Create PUBREC message
173: *
174: * @buf = Message buffer
175: * @msgID = MessageID
176: * return: -1 error or >-1 message size for send
177: */
178: inline int
179: mqtt_msgPUBREC(mqtt_msg_t * __restrict buf, u_short msgID)
180: {
181: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREC, msgID);
182: }
183:
184: /*
185: * mqtt_msgPUBREL() Create PUBREL message
186: *
187: * @buf = Message buffer
188: * @msgID = MessageID
189: * return: -1 error or >-1 message size for send
190: */
191: inline int
192: mqtt_msgPUBREL(mqtt_msg_t * __restrict buf, u_short msgID)
193: {
194: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREL, msgID);
195: }
196:
197: /*
198: * mqtt_msgPUBCOMP() Create PUBCOMP message
199: *
200: * @buf = Message buffer
201: * @msgID = MessageID
202: * return: -1 error or >-1 message size for send
203: */
204: inline int
205: mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short msgID)
206: {
207: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBCOMP, msgID);
208: }
209:
210:
211: /* ============= decode ============ */
212:
213: /*
214: * mqtt_readPUBLISH() Read PUBLISH message
215: *
216: * @buf = Message buffer
217: * @psTopic = Topic
218: * @topicLen = Topic length
219: * @msgID = MessageID
1.2 misho 220: * @pData = Data buffer, may be NULL
221: * return: -1 error or !=-1 allocated data buffer length
1.1 misho 222: */
1.2 misho 223: int
1.1 misho 224: mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen,
1.2 misho 225: u_short *msgID, void ** __restrict pData)
1.1 misho 226: {
227: int len, ret;
228: struct mqtthdr *hdr;
229: mqtthdr_var_t *var;
230: mqtt_len_t *v;
231: caddr_t pos;
232:
1.2 misho 233: if (!buf || !psTopic || !msgID)
234: return -1;
1.1 misho 235:
236: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);
237: if (!hdr)
1.2 misho 238: return -1;
1.1 misho 239: pos = buf->msg_base + ret + 1;
240: var = (mqtthdr_var_t*) pos;
241:
242: /* topic */
243: len -= MQTTHDR_VAR_SIZEOF(var);
244: if (len < 0) {
1.2 misho 245: mqtt_SetErr(EINVAL, "Short message length %d", len);
246: return -1;
1.1 misho 247: } else {
248: memset(psTopic, 0, topicLen--);
249: memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ?
250: topicLen : ntohs(var->var_sb.val));
251: pos += MQTTHDR_VAR_SIZEOF(var);
252: v = (mqtt_len_t*) pos;
253: }
254:
255: len -= sizeof(mqtt_len_t);
256: if (len < 0) {
1.2 misho 257: mqtt_SetErr(EINVAL, "Short message length %d", len);
258: return -1;
1.1 misho 259: } else {
260: *msgID = ntohs(v->val);
261: pos += sizeof(mqtt_len_t);
262: }
263:
264: /* data */
265: if (len < 0) {
1.2 misho 266: mqtt_SetErr(EINVAL, "Short message length %d", len);
267: return -1;
268: } else if (pData) {
269: if (!(*pData = malloc(len + 1))) {
270: LOGERR;
271: return -1;
272: } else
273: ((char*) (*pData))[len] = 0;
274:
275: memcpy(*pData, pos, len);
1.1 misho 276: }
277:
1.2 misho 278: return len;
1.1 misho 279: }
280:
281: /*
282: * mqtt_readPUBACK() Read PUBACK message
283: *
284: * @buf = Message buffer
285: * return: -1 error or MessageID
286: */
287: u_short
288: mqtt_readPUBACK(mqtt_msg_t * __restrict buf)
289: {
290: int len, ret;
291: struct mqtthdr *hdr;
292: mqtt_len_t *v;
293: caddr_t pos;
294:
295: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &ret, &len);
296: if (!hdr)
297: return (u_short) -1;
298: if (len < sizeof(mqtt_len_t)) {
1.2 misho 299: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 300: return (u_short) -1;
301: } else {
302: pos = buf->msg_base + ret + 1;
303: v = (mqtt_len_t*) pos;
304: }
305:
306: return ntohs(v->val);
307: }
308:
309: /*
310: * mqtt_readPUBREC() Read PUBREC message
311: *
312: * @buf = Message buffer
313: * return: -1 error or MessageID
314: */
315: u_short
316: mqtt_readPUBREC(mqtt_msg_t * __restrict buf)
317: {
318: int len, ret;
319: struct mqtthdr *hdr;
320: mqtt_len_t *v;
321: caddr_t pos;
322:
323: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &ret, &len);
324: if (!hdr)
325: return (u_short) -1;
326: if (len < sizeof(mqtt_len_t)) {
1.2 misho 327: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 328: return (u_short) -1;
329: } else {
330: pos = buf->msg_base + ret + 1;
331: v = (mqtt_len_t*) pos;
332: }
333:
334: return ntohs(v->val);
335: }
336:
337: /*
338: * mqtt_readPUBREL() Read PUBREL message
339: *
340: * @buf = Message buffer
341: * return: -1 error or MessageID
342: */
343: u_short
344: mqtt_readPUBREL(mqtt_msg_t * __restrict buf)
345: {
346: int len, ret;
347: struct mqtthdr *hdr;
348: mqtt_len_t *v;
349: caddr_t pos;
350:
351: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &ret, &len);
352: if (!hdr)
353: return (u_short) -1;
354: if (len < sizeof(mqtt_len_t)) {
1.2 misho 355: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 356: return (u_short) -1;
357: } else {
358: pos = buf->msg_base + ret + 1;
359: v = (mqtt_len_t*) pos;
360: }
361:
362: return ntohs(v->val);
363: }
364:
365: /*
366: * mqtt_readPUBCOMP() Read PUBCOMP message
367: *
368: * @buf = Message buffer
369: * return: -1 error or MessageID
370: */
371: u_short
372: mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
373: {
374: int len, ret;
375: struct mqtthdr *hdr;
376: mqtt_len_t *v;
377: caddr_t pos;
378:
379: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &ret, &len);
380: if (!hdr)
381: return (u_short) -1;
382: if (len < sizeof(mqtt_len_t)) {
1.2 misho 383: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 384: return (u_short) -1;
385: } else {
386: pos = buf->msg_base + ret + 1;
387: v = (mqtt_len_t*) pos;
388: }
389:
390: return ntohs(v->val);
391: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>