Annotation of libaitmqtt/src/sub.c, revision 1.1.1.1.2.1
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.1.1.1.2.1! misho 6: * $Id: sub.c,v 1.1.1.1 2012/01/26 13:07:33 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.1.1.1.2.1! misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
51: *
52: * @buf = Message buffer
53: * @Topics = MQTT subscription topics
54: * @msgID = MessageID
55: * @Dup = Duplicate message
56: * @QOS = QoS
57: * return: -1 error or >-1 message size for send
58: */
59: int
60: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
61: u_short msgID, u_char Dup, u_char QOS)
62: {
63: int siz = 0;
64: struct mqtthdr *hdr;
65: mqtthdr_var_t *topic;
66: mqtt_len_t *mid;
67: mqtt_subscr_t *t;
68: u_char *qos;
69:
70: if (!buf || !Topics)
71: return -1;
72: if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1! misho 73: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1 misho 74: return -1;
75: }
76: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1! misho 77: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1 misho 78: return -1;
79: }
80:
81: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
82: return -1;
83: else {
84: hdr = (struct mqtthdr *) (buf->msg_base + siz);
85: siz += sizeof(struct mqtthdr);
86: }
87:
88: /* variable header */
89: mid = (mqtt_len_t*) (buf->msg_base + siz);
90: mid->val = htons(msgID);
91: siz += sizeof(mqtt_len_t);
92:
93: /* payload with subscriptions */
94: for (t = Topics; t && t->sub_topic.msg_base; t++) {
95: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
96: topic->var_sb.val = htons(t->sub_topic.msg_len);
97: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
98: siz += MQTTHDR_VAR_SIZEOF(topic);
99: qos = (buf->msg_base + siz);
100: *qos = t->sub_ret;
101: siz++;
102: }
103:
104: /* fixed header */
105: MQTTHDR_MSGINIT(hdr);
106: hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
107: hdr->mqtt_msg.qos = QOS;
108: hdr->mqtt_msg.dup = Dup ? 1 : 0;
109: hdr->mqtt_msg.retain = 0;
110: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
111:
112: mqtt_msgRealloc(buf, siz);
113: return siz;
114: }
115:
116: /*
117: * mqtt_msgSUBACK() Create SUBACK message
118: *
119: * @buf = Message buffer
120: * @Topics = MQTT subscription topics
121: * @msgID = MessageID
122: * return: -1 error or >-1 message size for send
123: */
124: int
125: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
126: {
127: int siz = 0;
128: struct mqtthdr *hdr;
129: mqtt_len_t *v;
130: mqtt_subscr_t *t;
131: u_char *qos;
132:
133: if (!buf || !Topics)
134: return -1;
135:
136: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
137: return -1;
138: else {
139: hdr = (struct mqtthdr *) (buf->msg_base + siz);
140: siz += sizeof(struct mqtthdr);
141: v = (mqtt_len_t*) (buf->msg_base + siz);
142: siz += sizeof(mqtt_len_t);
143: }
144:
145: /* MessageID */
146: v->val = htons(msgID);
147:
148: /* QoS payload from subscriptions */
149: for (t = Topics; t && t->sub_topic.msg_base; t++) {
150: qos = (buf->msg_base + siz);
151: *qos = t->sub_ret;
152: siz++;
153: }
154:
155: /* fixed header */
156: MQTTHDR_MSGINIT(hdr);
157: hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
158: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
159:
160: mqtt_msgRealloc(buf, siz);
161: return siz;
162: }
163:
164: /*
165: * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
166: *
167: * @buf = Message buffer
168: * @Topics = MQTT subscription topics
169: * @msgID = MessageID
170: * @Dup = Duplicate message
171: * @QOS = QoS
172: * return: -1 error or >-1 message size for send
173: */
174: int
175: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
176: u_short msgID, u_char Dup, u_char QOS)
177: {
178: int siz = 0;
179: struct mqtthdr *hdr;
180: mqtthdr_var_t *topic;
181: mqtt_len_t *mid;
182: mqtt_subscr_t *t;
183:
184: if (!buf || !Topics)
185: return -1;
186: if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1! misho 187: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1 misho 188: return -1;
189: }
190: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1! misho 191: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1 misho 192: return -1;
193: }
194:
195: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
196: return -1;
197: else {
198: hdr = (struct mqtthdr *) (buf->msg_base + siz);
199: siz += sizeof(struct mqtthdr);
200: }
201:
202: /* variable header */
203: mid = (mqtt_len_t*) (buf->msg_base + siz);
204: mid->val = htons(msgID);
205: siz += sizeof(mqtt_len_t);
206:
207: /* payload with subscriptions */
208: for (t = Topics; t && t->sub_topic.msg_base; t++) {
209: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
210: topic->var_sb.val = htons(t->sub_topic.msg_len);
211: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
212: siz += MQTTHDR_VAR_SIZEOF(topic);
213: }
214:
215: /* fixed header */
216: MQTTHDR_MSGINIT(hdr);
217: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
218: hdr->mqtt_msg.qos = QOS;
219: hdr->mqtt_msg.dup = Dup ? 1 : 0;
220: hdr->mqtt_msg.retain = 0;
221: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
222:
223: mqtt_msgRealloc(buf, siz);
224: return siz;
225: }
226:
227: /*
228: * mqtt_msgUNSUBACK() Create UNSUBACK message
229: *
230: * @buf = Message buffer
231: * @msgID = MessageID
232: * return: -1 error or >-1 message size for send
233: */
234: int
235: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
236: {
237: int siz = 0;
238: struct mqtthdr *hdr;
239: mqtt_len_t *v;
240:
241: if (!buf)
242: return -1;
243:
244: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
245: return -1;
246: else {
247: hdr = (struct mqtthdr *) (buf->msg_base + siz);
248: siz += sizeof(struct mqtthdr);
249: v = (mqtt_len_t*) (buf->msg_base + siz);
250: siz += sizeof(mqtt_len_t);
251: }
252:
253: /* fixed header */
254: MQTTHDR_MSGINIT(hdr);
255: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
256: *hdr->mqtt_len = sizeof(mqtt_len_t);
257:
258: /* MessageID */
259: v->val = htons(msgID);
260:
261: return siz;
262: }
263:
264:
265: /* ============= decode ============ */
266:
267: /*
268: * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
269: *
270: * @buf = Message buffer
271: * @msgID = MessageID
272: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
273: * return: NULL error or !=NULL MQTT fixed header
274: */
275: struct mqtthdr *
276: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
277: {
278: register int i;
279: int len, ret;
280: struct mqtthdr *hdr;
281: mqtthdr_var_t *var;
282: mqtt_subscr_t *subs;
283: mqtt_len_t *v;
284: caddr_t pos;
285:
286: if (!buf || !msgID || !subscr)
287: return NULL;
288:
289: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
290: if (!hdr)
291: return NULL;
292: pos = buf->msg_base + ret + 1;
293: v = (mqtt_len_t*) pos;
294:
295: /* MessageID */
296: len -= sizeof(mqtt_len_t);
297: if (len < 0) {
1.1.1.1.2.1! misho 298: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 299: return NULL;
300: } else {
301: *msgID = ntohs(v->val);
302: pos += sizeof(mqtt_len_t);
303: }
304:
305: subs = mqtt_subAlloc(0);
306: if (!subs)
307: return NULL;
308: else
309: *subscr = subs;
310:
311: /* Subscribes */
312: for (i = 0; len > 0; i++) {
313: var = (mqtthdr_var_t*) pos;
314: len -= MQTTHDR_VAR_SIZEOF(var) + 1;
315: if (len < 0) {
316: mqtt_subFree(subscr);
1.1.1.1.2.1! misho 317: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 318: return NULL;
319: }
320: subs = mqtt_subRealloc(subs, i + 1);
321: if (!subs) {
322: mqtt_subFree(subscr);
323: return NULL;
324: } else
325: *subscr = subs;
326:
327: memset(&subs[i], 0, sizeof subs[i]);
328: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
329: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
330: if (!subs[i].sub_topic.msg_base) {
331: LOGERR;
332: mqtt_subFree(subscr);
333: return NULL;
334: } else
335: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
336: pos += MQTTHDR_VAR_SIZEOF(var);
337:
338: subs[i].sub_ret = *pos;
339: pos++;
340: }
341:
342: return hdr;
343: }
344:
345: /*
346: * mqtt_readSUBACK() Read SUBACK message
347: *
348: * @buf = Message buffer
349: * @msgID = MessageID
350: * @subqos = Subscribes QoS, must be free after use with free()
351: * return: -1 error or >-1 readed subscribes QoS elements
352: */
353: int
354: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
355: {
356: int len, ret;
357: struct mqtthdr *hdr;
358: mqtt_len_t *v;
359: caddr_t pos;
360:
361: if (!buf || !msgID || !subqos)
362: return -1;
363:
364: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
365: if (!hdr)
366: return -1;
367: pos = buf->msg_base + ret + 1;
368: v = (mqtt_len_t*) pos;
369:
370: /* MessageID */
371: len -= sizeof(mqtt_len_t);
372: if (len < 0) {
1.1.1.1.2.1! misho 373: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 374: return -1;
375: } else {
376: *msgID = ntohs(v->val);
377: pos += sizeof(mqtt_len_t);
378: }
379:
380: /* Subscribes */
381: *subqos = malloc(len);
382: if (!*subqos) {
383: LOGERR;
384: return -1;
385: } else
386: memcpy(*subqos, pos, len);
387:
388: return len;
389: }
390:
391: /*
392: * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
393: *
394: * @buf = Message buffer
395: * @msgID = MessageID
396: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
397: * return: NULL error or !=NULL MQTT fixed header
398: */
399: struct mqtthdr *
400: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
401: {
402: register int i;
403: int len, ret;
404: struct mqtthdr *hdr;
405: mqtthdr_var_t *var;
406: mqtt_subscr_t *subs;
407: mqtt_len_t *v;
408: caddr_t pos;
409:
410: if (!buf || !msgID || !subscr)
411: return NULL;
412:
413: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
414: if (!hdr)
415: return NULL;
416: pos = buf->msg_base + ret + 1;
417: v = (mqtt_len_t*) pos;
418:
419: /* MessageID */
420: len -= sizeof(mqtt_len_t);
421: if (len < 0) {
1.1.1.1.2.1! misho 422: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 423: return NULL;
424: } else {
425: *msgID = ntohs(v->val);
426: pos += sizeof(mqtt_len_t);
427: }
428:
429: subs = mqtt_subAlloc(0);
430: if (!subs)
431: return NULL;
432: else
433: *subscr = subs;
434:
435: /* Subscribes */
436: for (i = 0; len > 0; i++) {
437: var = (mqtthdr_var_t*) pos;
438: len -= MQTTHDR_VAR_SIZEOF(var);
439: if (len < 0) {
440: mqtt_subFree(subscr);
1.1.1.1.2.1! misho 441: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 442: return NULL;
443: }
444: subs = mqtt_subRealloc(subs, i + 1);
445: if (!subs) {
446: mqtt_subFree(subscr);
447: return NULL;
448: } else
449: *subscr = subs;
450:
451: memset(&subs[i], 0, sizeof subs[i]);
452: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
453: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
454: if (!subs[i].sub_topic.msg_base) {
455: LOGERR;
456: mqtt_subFree(subscr);
457: return NULL;
458: } else
459: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
460: pos += MQTTHDR_VAR_SIZEOF(var);
461: }
462:
463: return hdr;
464: }
465:
466: /*
467: * mqtt_readUNSUBACK() Read UNSUBACK message
468: *
469: * @buf = Message buffer
470: * return: -1 error or MessageID
471: */
472: u_short
473: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
474: {
475: int len, ret;
476: struct mqtthdr *hdr;
477: mqtt_len_t *v;
478: caddr_t pos;
479:
480: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
481: if (!hdr)
482: return (u_short) -1;
483: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1! misho 484: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 485: return (u_short) -1;
486: } else {
487: pos = buf->msg_base + ret + 1;
488: v = (mqtt_len_t*) pos;
489: }
490:
491: return ntohs(v->val);
492: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>