Annotation of libaitmqtt/src/sub.c, revision 1.1.1.1.2.5
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.1.1.1.2.5! misho 6: * $Id: sub.c,v 1.1.1.1.2.4 2012/04/27 08:12:30 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.1.1.1.2.1 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
51: *
52: * @buf = Message buffer
53: * @Topics = MQTT subscription topics
54: * @msgID = MessageID
55: * @Dup = Duplicate message
56: * @QOS = QoS
57: * return: -1 error or >-1 message size for send
58: */
59: int
60: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
61: u_short msgID, u_char Dup, u_char QOS)
62: {
63: int siz = 0;
64: struct mqtthdr *hdr;
65: mqtthdr_var_t *topic;
66: mqtt_len_t *mid;
67: mqtt_subscr_t *t;
68: u_char *qos;
69:
70: if (!buf || !Topics)
71: return -1;
72: if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1 misho 73: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1 misho 74: return -1;
75: }
76: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1 misho 77: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1 misho 78: return -1;
79: }
80:
81: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
82: return -1;
83: else {
84: hdr = (struct mqtthdr *) (buf->msg_base + siz);
85: siz += sizeof(struct mqtthdr);
86: }
87:
88: /* variable header */
89: mid = (mqtt_len_t*) (buf->msg_base + siz);
90: mid->val = htons(msgID);
91: siz += sizeof(mqtt_len_t);
92:
93: /* payload with subscriptions */
94: for (t = Topics; t && t->sub_topic.msg_base; t++) {
95: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
96: topic->var_sb.val = htons(t->sub_topic.msg_len);
97: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
98: siz += MQTTHDR_VAR_SIZEOF(topic);
99: qos = (buf->msg_base + siz);
100: *qos = t->sub_ret;
101: siz++;
102: }
103:
104: /* fixed header */
105: MQTTHDR_MSGINIT(hdr);
106: hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
107: hdr->mqtt_msg.qos = QOS;
108: hdr->mqtt_msg.dup = Dup ? 1 : 0;
109: hdr->mqtt_msg.retain = 0;
110: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
111:
112: return siz;
113: }
114:
115: /*
116: * mqtt_msgSUBACK() Create SUBACK message
117: *
118: * @buf = Message buffer
119: * @Topics = MQTT subscription topics
120: * @msgID = MessageID
121: * return: -1 error or >-1 message size for send
122: */
123: int
124: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
125: {
126: int siz = 0;
127: struct mqtthdr *hdr;
128: mqtt_len_t *v;
129: mqtt_subscr_t *t;
130: u_char *qos;
131:
132: if (!buf || !Topics)
133: return -1;
134:
135: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
136: return -1;
137: else {
138: hdr = (struct mqtthdr *) (buf->msg_base + siz);
139: siz += sizeof(struct mqtthdr);
140: v = (mqtt_len_t*) (buf->msg_base + siz);
141: siz += sizeof(mqtt_len_t);
142: }
143:
144: /* MessageID */
145: v->val = htons(msgID);
146:
147: /* QoS payload from subscriptions */
148: for (t = Topics; t && t->sub_topic.msg_base; t++) {
149: qos = (buf->msg_base + siz);
150: *qos = t->sub_ret;
151: siz++;
152: }
153:
154: /* fixed header */
155: MQTTHDR_MSGINIT(hdr);
156: hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
157: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
158:
159: return siz;
160: }
161:
162: /*
163: * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
164: *
165: * @buf = Message buffer
166: * @Topics = MQTT subscription topics
167: * @msgID = MessageID
168: * @Dup = Duplicate message
169: * @QOS = QoS
170: * return: -1 error or >-1 message size for send
171: */
172: int
173: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
174: u_short msgID, u_char Dup, u_char QOS)
175: {
176: int siz = 0;
177: struct mqtthdr *hdr;
178: mqtthdr_var_t *topic;
179: mqtt_len_t *mid;
180: mqtt_subscr_t *t;
181:
182: if (!buf || !Topics)
183: return -1;
184: if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1 misho 185: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1 misho 186: return -1;
187: }
188: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1 misho 189: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1 misho 190: return -1;
191: }
192:
193: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
194: return -1;
195: else {
196: hdr = (struct mqtthdr *) (buf->msg_base + siz);
197: siz += sizeof(struct mqtthdr);
198: }
199:
200: /* variable header */
201: mid = (mqtt_len_t*) (buf->msg_base + siz);
202: mid->val = htons(msgID);
203: siz += sizeof(mqtt_len_t);
204:
205: /* payload with subscriptions */
206: for (t = Topics; t && t->sub_topic.msg_base; t++) {
207: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
208: topic->var_sb.val = htons(t->sub_topic.msg_len);
209: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
210: siz += MQTTHDR_VAR_SIZEOF(topic);
211: }
212:
213: /* fixed header */
214: MQTTHDR_MSGINIT(hdr);
215: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
216: hdr->mqtt_msg.qos = QOS;
217: hdr->mqtt_msg.dup = Dup ? 1 : 0;
218: hdr->mqtt_msg.retain = 0;
219: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
220:
221: return siz;
222: }
223:
224: /*
225: * mqtt_msgUNSUBACK() Create UNSUBACK message
226: *
227: * @buf = Message buffer
228: * @msgID = MessageID
229: * return: -1 error or >-1 message size for send
230: */
231: int
232: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
233: {
234: int siz = 0;
235: struct mqtthdr *hdr;
236: mqtt_len_t *v;
237:
238: if (!buf)
239: return -1;
240:
241: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
242: return -1;
243: else {
244: hdr = (struct mqtthdr *) (buf->msg_base + siz);
245: siz += sizeof(struct mqtthdr);
246: v = (mqtt_len_t*) (buf->msg_base + siz);
247: siz += sizeof(mqtt_len_t);
248: }
249:
250: /* fixed header */
251: MQTTHDR_MSGINIT(hdr);
252: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
253: *hdr->mqtt_len = sizeof(mqtt_len_t);
254:
255: /* MessageID */
256: v->val = htons(msgID);
257:
258: return siz;
259: }
260:
261:
262: /* ============= decode ============ */
263:
264: /*
265: * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
266: *
267: * @buf = Message buffer
268: * @msgID = MessageID
269: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.1.1.1.2.2 misho 270: * return: -1 error or >-1 elements into subscr
1.1 misho 271: */
1.1.1.1.2.2 misho 272: int
1.1 misho 273: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
274: {
275: register int i;
276: int len, ret;
277: struct mqtthdr *hdr;
278: mqtthdr_var_t *var;
279: mqtt_subscr_t *subs;
280: mqtt_len_t *v;
281: caddr_t pos;
282:
283: if (!buf || !msgID || !subscr)
1.1.1.1.2.2 misho 284: return -1;
1.1 misho 285:
286: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
287: if (!hdr)
1.1.1.1.2.2 misho 288: return -1;
1.1 misho 289: pos = buf->msg_base + ret + 1;
290: v = (mqtt_len_t*) pos;
291:
292: /* MessageID */
293: len -= sizeof(mqtt_len_t);
294: if (len < 0) {
1.1.1.1.2.1 misho 295: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2 misho 296: return -1;
1.1 misho 297: } else {
298: *msgID = ntohs(v->val);
299: pos += sizeof(mqtt_len_t);
300: }
301:
302: subs = mqtt_subAlloc(0);
303: if (!subs)
1.1.1.1.2.2 misho 304: return -1;
1.1 misho 305: else
306: *subscr = subs;
307:
308: /* Subscribes */
309: for (i = 0; len > 0; i++) {
310: var = (mqtthdr_var_t*) pos;
311: len -= MQTTHDR_VAR_SIZEOF(var) + 1;
312: if (len < 0) {
313: mqtt_subFree(subscr);
1.1.1.1.2.1 misho 314: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2 misho 315: return -1;
1.1 misho 316: }
1.1.1.1.2.4 misho 317: if (!mqtt_subRealloc(&subs, i + 1)) {
1.1 misho 318: mqtt_subFree(subscr);
1.1.1.1.2.2 misho 319: return -1;
1.1 misho 320: } else
321: *subscr = subs;
322:
323: memset(&subs[i], 0, sizeof subs[i]);
324: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
1.1.1.1.2.5! misho 325: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
1.1 misho 326: if (!subs[i].sub_topic.msg_base) {
327: LOGERR;
328: mqtt_subFree(subscr);
1.1.1.1.2.2 misho 329: return -1;
1.1.1.1.2.5! misho 330: } else {
1.1 misho 331: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
1.1.1.1.2.5! misho 332: ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
! 333: }
1.1 misho 334: pos += MQTTHDR_VAR_SIZEOF(var);
335:
336: subs[i].sub_ret = *pos;
337: pos++;
338: }
339:
1.1.1.1.2.2 misho 340: return i;
1.1 misho 341: }
342:
343: /*
344: * mqtt_readSUBACK() Read SUBACK message
345: *
346: * @buf = Message buffer
347: * @msgID = MessageID
348: * @subqos = Subscribes QoS, must be free after use with free()
349: * return: -1 error or >-1 readed subscribes QoS elements
350: */
351: int
352: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
353: {
354: int len, ret;
355: struct mqtthdr *hdr;
356: mqtt_len_t *v;
357: caddr_t pos;
358:
359: if (!buf || !msgID || !subqos)
360: return -1;
361:
362: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
363: if (!hdr)
364: return -1;
365: pos = buf->msg_base + ret + 1;
366: v = (mqtt_len_t*) pos;
367:
368: /* MessageID */
369: len -= sizeof(mqtt_len_t);
370: if (len < 0) {
1.1.1.1.2.1 misho 371: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 372: return -1;
373: } else {
374: *msgID = ntohs(v->val);
375: pos += sizeof(mqtt_len_t);
376: }
377:
378: /* Subscribes */
379: *subqos = malloc(len);
380: if (!*subqos) {
381: LOGERR;
382: return -1;
383: } else
384: memcpy(*subqos, pos, len);
385:
386: return len;
387: }
388:
389: /*
390: * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
391: *
392: * @buf = Message buffer
393: * @msgID = MessageID
394: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.1.1.1.2.2 misho 395: * return: -1 error or >-1 elements into subscr
1.1 misho 396: */
1.1.1.1.2.2 misho 397: int
1.1 misho 398: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
399: {
400: register int i;
401: int len, ret;
402: struct mqtthdr *hdr;
403: mqtthdr_var_t *var;
404: mqtt_subscr_t *subs;
405: mqtt_len_t *v;
406: caddr_t pos;
407:
408: if (!buf || !msgID || !subscr)
1.1.1.1.2.2 misho 409: return -1;
1.1 misho 410:
411: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
412: if (!hdr)
1.1.1.1.2.2 misho 413: return -1;
1.1 misho 414: pos = buf->msg_base + ret + 1;
415: v = (mqtt_len_t*) pos;
416:
417: /* MessageID */
418: len -= sizeof(mqtt_len_t);
419: if (len < 0) {
1.1.1.1.2.1 misho 420: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2 misho 421: return -1;
1.1 misho 422: } else {
423: *msgID = ntohs(v->val);
424: pos += sizeof(mqtt_len_t);
425: }
426:
427: subs = mqtt_subAlloc(0);
428: if (!subs)
1.1.1.1.2.2 misho 429: return -1;
1.1 misho 430: else
431: *subscr = subs;
432:
433: /* Subscribes */
434: for (i = 0; len > 0; i++) {
435: var = (mqtthdr_var_t*) pos;
436: len -= MQTTHDR_VAR_SIZEOF(var);
437: if (len < 0) {
438: mqtt_subFree(subscr);
1.1.1.1.2.1 misho 439: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2 misho 440: return -1;
1.1 misho 441: }
1.1.1.1.2.4 misho 442: if (!mqtt_subRealloc(&subs, i + 1)) {
1.1 misho 443: mqtt_subFree(subscr);
1.1.1.1.2.2 misho 444: return -1;
1.1 misho 445: } else
446: *subscr = subs;
447:
448: memset(&subs[i], 0, sizeof subs[i]);
449: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
1.1.1.1.2.5! misho 450: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
1.1 misho 451: if (!subs[i].sub_topic.msg_base) {
452: LOGERR;
453: mqtt_subFree(subscr);
1.1.1.1.2.2 misho 454: return -1;
1.1.1.1.2.5! misho 455: } else {
1.1 misho 456: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
1.1.1.1.2.5! misho 457: ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
! 458: }
1.1 misho 459: pos += MQTTHDR_VAR_SIZEOF(var);
460: }
461:
1.1.1.1.2.2 misho 462: return i;
1.1 misho 463: }
464:
465: /*
466: * mqtt_readUNSUBACK() Read UNSUBACK message
467: *
468: * @buf = Message buffer
469: * return: -1 error or MessageID
470: */
471: u_short
472: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
473: {
474: int len, ret;
475: struct mqtthdr *hdr;
476: mqtt_len_t *v;
477: caddr_t pos;
478:
479: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
480: if (!hdr)
481: return (u_short) -1;
482: if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1 misho 483: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 484: return (u_short) -1;
485: } else {
486: pos = buf->msg_base + ret + 1;
487: v = (mqtt_len_t*) pos;
488: }
489:
490: return ntohs(v->val);
491: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>