Annotation of libaitmqtt/src/sub.c, revision 1.3
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.3 ! misho 6: * $Id: sub.c,v 1.2.2.2 2012/06/26 08:01:07 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.2 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
51: *
52: * @buf = Message buffer
53: * @Topics = MQTT subscription topics
54: * @msgID = MessageID
55: * @Dup = Duplicate message
56: * @QOS = QoS
57: * return: -1 error or >-1 message size for send
58: */
59: int
60: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
61: u_short msgID, u_char Dup, u_char QOS)
62: {
1.2 misho 63: int len, siz = 0;
1.3 ! misho 64: u_int n, *l;
1.1 misho 65: struct mqtthdr *hdr;
66: mqtthdr_var_t *topic;
67: mqtt_len_t *mid;
68: mqtt_subscr_t *t;
69: u_char *qos;
1.2 misho 70: void *data;
1.1 misho 71:
72: if (!buf || !Topics)
73: return -1;
74: if (QOS > MQTT_QOS_EXACTLY) {
1.2 misho 75: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1 misho 76: return -1;
77: }
78: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.2 misho 79: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1 misho 80: return -1;
81: }
82:
1.2 misho 83: /* calculate message size */
84: len = sizeof(mqtt_len_t); /* msgid */
85: for (t = Topics; t && t->sub_topic.msg_base; t++) /* subscribes & qos */
86: len += sizeof(mqtt_len_t) + t->sub_topic.msg_len + 1;
87:
88: /* calculate header size */
89: siz = sizeof(struct mqtthdr); /* mqtt fixed header */
90: n = mqtt_encodeLen(len); /* message size */
91: siz += mqtt_sizeLen(n) - 1; /* length size */
92:
93: if (mqtt_msgRealloc(buf, siz + len) == -1)
1.1 misho 94: return -1;
95: else {
1.2 misho 96: data = buf->msg_base;
97: hdr = (struct mqtthdr *) data;
1.1 misho 98: }
99:
1.2 misho 100: /* fixed header */
101: MQTTHDR_MSGINIT(hdr);
102: hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
103: hdr->mqtt_msg.qos = QOS;
104: hdr->mqtt_msg.dup = Dup ? 1 : 0;
105: hdr->mqtt_msg.retain = 0;
1.3 ! misho 106: l = (u_int*) hdr->mqtt_len;
! 107: *l = n;
1.2 misho 108: data += siz;
109:
1.1 misho 110: /* variable header */
1.2 misho 111: mid = (mqtt_len_t*) data;
1.1 misho 112: mid->val = htons(msgID);
1.2 misho 113: data += sizeof(mqtt_len_t);
1.1 misho 114:
115: /* payload with subscriptions */
116: for (t = Topics; t && t->sub_topic.msg_base; t++) {
1.2 misho 117: topic = (mqtthdr_var_t*) data;
1.1 misho 118: topic->var_sb.val = htons(t->sub_topic.msg_len);
119: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
1.2 misho 120: data += MQTTHDR_VAR_SIZEOF(topic);
121: qos = data++;
1.1 misho 122: *qos = t->sub_ret;
123: }
124:
1.3 ! misho 125: return siz + len;
1.1 misho 126: }
127:
128: /*
129: * mqtt_msgSUBACK() Create SUBACK message
130: *
131: * @buf = Message buffer
132: * @Topics = MQTT subscription topics
133: * @msgID = MessageID
134: * return: -1 error or >-1 message size for send
135: */
136: int
137: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
138: {
139: int siz = 0;
140: struct mqtthdr *hdr;
141: mqtt_len_t *v;
142: mqtt_subscr_t *t;
143: u_char *qos;
144:
145: if (!buf || !Topics)
146: return -1;
147:
148: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
149: return -1;
150: else {
151: hdr = (struct mqtthdr *) (buf->msg_base + siz);
152: siz += sizeof(struct mqtthdr);
153: v = (mqtt_len_t*) (buf->msg_base + siz);
154: siz += sizeof(mqtt_len_t);
155: }
156:
157: /* MessageID */
158: v->val = htons(msgID);
159:
160: /* QoS payload from subscriptions */
161: for (t = Topics; t && t->sub_topic.msg_base; t++) {
162: qos = (buf->msg_base + siz);
163: *qos = t->sub_ret;
164: siz++;
165: }
166:
167: /* fixed header */
168: MQTTHDR_MSGINIT(hdr);
169: hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
170: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
171:
172: return siz;
173: }
174:
175: /*
176: * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
177: *
178: * @buf = Message buffer
179: * @Topics = MQTT subscription topics
180: * @msgID = MessageID
181: * @Dup = Duplicate message
182: * @QOS = QoS
183: * return: -1 error or >-1 message size for send
184: */
185: int
186: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
187: u_short msgID, u_char Dup, u_char QOS)
188: {
1.2 misho 189: int len, siz = 0;
1.3 ! misho 190: u_int n, *l;
1.1 misho 191: struct mqtthdr *hdr;
192: mqtthdr_var_t *topic;
193: mqtt_len_t *mid;
194: mqtt_subscr_t *t;
1.2 misho 195: void *data;
1.1 misho 196:
197: if (!buf || !Topics)
198: return -1;
199: if (QOS > MQTT_QOS_EXACTLY) {
1.2 misho 200: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1 misho 201: return -1;
202: }
203: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.2 misho 204: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1 misho 205: return -1;
206: }
207:
1.2 misho 208: /* calculate message size */
209: len = sizeof(mqtt_len_t); /* msgid */
210: for (t = Topics; t && t->sub_topic.msg_base; t++) /* subscribes */
211: len += sizeof(mqtt_len_t) + t->sub_topic.msg_len;
212:
213: /* calculate header size */
214: siz = sizeof(struct mqtthdr); /* mqtt fixed header */
215: n = mqtt_encodeLen(len); /* message size */
216: siz += mqtt_sizeLen(n) - 1; /* length size */
217:
218: if (mqtt_msgRealloc(buf, siz + len) == -1)
1.1 misho 219: return -1;
220: else {
1.2 misho 221: data = buf->msg_base;
222: hdr = (struct mqtthdr *) data;
1.1 misho 223: }
224:
1.2 misho 225: /* fixed header */
226: MQTTHDR_MSGINIT(hdr);
227: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
228: hdr->mqtt_msg.qos = QOS;
229: hdr->mqtt_msg.dup = Dup ? 1 : 0;
230: hdr->mqtt_msg.retain = 0;
1.3 ! misho 231: l = (u_int*) hdr->mqtt_len;
! 232: *l = n;
1.2 misho 233: data += siz;
234:
1.1 misho 235: /* variable header */
236: mid = (mqtt_len_t*) (buf->msg_base + siz);
237: mid->val = htons(msgID);
1.2 misho 238: data += sizeof(mqtt_len_t);
1.1 misho 239:
240: /* payload with subscriptions */
241: for (t = Topics; t && t->sub_topic.msg_base; t++) {
1.2 misho 242: topic = (mqtthdr_var_t*) data;
1.1 misho 243: topic->var_sb.val = htons(t->sub_topic.msg_len);
244: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
1.2 misho 245: data += MQTTHDR_VAR_SIZEOF(topic);
1.1 misho 246: }
247:
1.3 ! misho 248: return siz + len;
1.1 misho 249: }
250:
251: /*
252: * mqtt_msgUNSUBACK() Create UNSUBACK message
253: *
254: * @buf = Message buffer
255: * @msgID = MessageID
256: * return: -1 error or >-1 message size for send
257: */
258: int
259: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
260: {
261: int siz = 0;
262: struct mqtthdr *hdr;
263: mqtt_len_t *v;
264:
265: if (!buf)
266: return -1;
267:
268: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
269: return -1;
270: else {
271: hdr = (struct mqtthdr *) (buf->msg_base + siz);
272: siz += sizeof(struct mqtthdr);
273: v = (mqtt_len_t*) (buf->msg_base + siz);
274: siz += sizeof(mqtt_len_t);
275: }
276:
277: /* fixed header */
278: MQTTHDR_MSGINIT(hdr);
279: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
280: *hdr->mqtt_len = sizeof(mqtt_len_t);
281:
282: /* MessageID */
283: v->val = htons(msgID);
284:
285: return siz;
286: }
287:
288:
289: /* ============= decode ============ */
290:
291: /*
292: * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
293: *
294: * @buf = Message buffer
295: * @msgID = MessageID
296: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.2 misho 297: * return: -1 error or >-1 elements into subscr
1.1 misho 298: */
1.2 misho 299: int
1.1 misho 300: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
301: {
302: register int i;
303: int len, ret;
304: struct mqtthdr *hdr;
305: mqtthdr_var_t *var;
306: mqtt_subscr_t *subs;
307: mqtt_len_t *v;
308: caddr_t pos;
309:
310: if (!buf || !msgID || !subscr)
1.2 misho 311: return -1;
1.1 misho 312:
313: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
314: if (!hdr)
1.2 misho 315: return -1;
1.1 misho 316: pos = buf->msg_base + ret + 1;
317: v = (mqtt_len_t*) pos;
318:
319: /* MessageID */
320: len -= sizeof(mqtt_len_t);
321: if (len < 0) {
1.2 misho 322: mqtt_SetErr(EINVAL, "Short message length %d", len);
323: return -1;
1.1 misho 324: } else {
325: *msgID = ntohs(v->val);
326: pos += sizeof(mqtt_len_t);
327: }
328:
329: subs = mqtt_subAlloc(0);
330: if (!subs)
1.2 misho 331: return -1;
1.1 misho 332: else
333: *subscr = subs;
334:
335: /* Subscribes */
336: for (i = 0; len > 0; i++) {
337: var = (mqtthdr_var_t*) pos;
338: len -= MQTTHDR_VAR_SIZEOF(var) + 1;
339: if (len < 0) {
340: mqtt_subFree(subscr);
1.2 misho 341: mqtt_SetErr(EINVAL, "Short message length %d", len);
342: return -1;
1.1 misho 343: }
1.2 misho 344: if (!mqtt_subRealloc(&subs, i + 1)) {
1.1 misho 345: mqtt_subFree(subscr);
1.2 misho 346: return -1;
1.1 misho 347: } else
348: *subscr = subs;
349:
350: memset(&subs[i], 0, sizeof subs[i]);
351: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
1.2 misho 352: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
1.1 misho 353: if (!subs[i].sub_topic.msg_base) {
354: LOGERR;
355: mqtt_subFree(subscr);
1.2 misho 356: return -1;
357: } else {
1.1 misho 358: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
1.2 misho 359: ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
360: }
1.1 misho 361: pos += MQTTHDR_VAR_SIZEOF(var);
362:
363: subs[i].sub_ret = *pos;
364: pos++;
365: }
366:
1.2 misho 367: return i;
1.1 misho 368: }
369:
370: /*
371: * mqtt_readSUBACK() Read SUBACK message
372: *
373: * @buf = Message buffer
374: * @msgID = MessageID
375: * @subqos = Subscribes QoS, must be free after use with free()
376: * return: -1 error or >-1 readed subscribes QoS elements
377: */
378: int
379: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
380: {
381: int len, ret;
382: struct mqtthdr *hdr;
383: mqtt_len_t *v;
384: caddr_t pos;
385:
386: if (!buf || !msgID || !subqos)
387: return -1;
388:
389: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
390: if (!hdr)
391: return -1;
392: pos = buf->msg_base + ret + 1;
393: v = (mqtt_len_t*) pos;
394:
395: /* MessageID */
396: len -= sizeof(mqtt_len_t);
397: if (len < 0) {
1.2 misho 398: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 399: return -1;
400: } else {
401: *msgID = ntohs(v->val);
402: pos += sizeof(mqtt_len_t);
403: }
404:
405: /* Subscribes */
406: *subqos = malloc(len);
407: if (!*subqos) {
408: LOGERR;
409: return -1;
410: } else
411: memcpy(*subqos, pos, len);
412:
413: return len;
414: }
415:
416: /*
417: * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
418: *
419: * @buf = Message buffer
420: * @msgID = MessageID
421: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.2 misho 422: * return: -1 error or >-1 elements into subscr
1.1 misho 423: */
1.2 misho 424: int
1.1 misho 425: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
426: {
427: register int i;
428: int len, ret;
429: struct mqtthdr *hdr;
430: mqtthdr_var_t *var;
431: mqtt_subscr_t *subs;
432: mqtt_len_t *v;
433: caddr_t pos;
434:
435: if (!buf || !msgID || !subscr)
1.2 misho 436: return -1;
1.1 misho 437:
438: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
439: if (!hdr)
1.2 misho 440: return -1;
1.1 misho 441: pos = buf->msg_base + ret + 1;
442: v = (mqtt_len_t*) pos;
443:
444: /* MessageID */
445: len -= sizeof(mqtt_len_t);
446: if (len < 0) {
1.2 misho 447: mqtt_SetErr(EINVAL, "Short message length %d", len);
448: return -1;
1.1 misho 449: } else {
450: *msgID = ntohs(v->val);
451: pos += sizeof(mqtt_len_t);
452: }
453:
454: subs = mqtt_subAlloc(0);
455: if (!subs)
1.2 misho 456: return -1;
1.1 misho 457: else
458: *subscr = subs;
459:
460: /* Subscribes */
461: for (i = 0; len > 0; i++) {
462: var = (mqtthdr_var_t*) pos;
463: len -= MQTTHDR_VAR_SIZEOF(var);
464: if (len < 0) {
465: mqtt_subFree(subscr);
1.2 misho 466: mqtt_SetErr(EINVAL, "Short message length %d", len);
467: return -1;
1.1 misho 468: }
1.2 misho 469: if (!mqtt_subRealloc(&subs, i + 1)) {
1.1 misho 470: mqtt_subFree(subscr);
1.2 misho 471: return -1;
1.1 misho 472: } else
473: *subscr = subs;
474:
475: memset(&subs[i], 0, sizeof subs[i]);
476: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
1.2 misho 477: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
1.1 misho 478: if (!subs[i].sub_topic.msg_base) {
479: LOGERR;
480: mqtt_subFree(subscr);
1.2 misho 481: return -1;
482: } else {
1.1 misho 483: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
1.2 misho 484: ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
485: }
1.1 misho 486: pos += MQTTHDR_VAR_SIZEOF(var);
487: }
488:
1.2 misho 489: return i;
1.1 misho 490: }
491:
492: /*
493: * mqtt_readUNSUBACK() Read UNSUBACK message
494: *
495: * @buf = Message buffer
496: * return: -1 error or MessageID
497: */
498: u_short
499: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
500: {
501: int len, ret;
502: struct mqtthdr *hdr;
503: mqtt_len_t *v;
504: caddr_t pos;
505:
506: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
507: if (!hdr)
508: return (u_short) -1;
509: if (len < sizeof(mqtt_len_t)) {
1.2 misho 510: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 511: return (u_short) -1;
512: } else {
513: pos = buf->msg_base + ret + 1;
514: v = (mqtt_len_t*) pos;
515: }
516:
517: return ntohs(v->val);
518: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>