Annotation of libaitmqtt/src/sub.c, revision 1.2
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.2 ! misho 6: * $Id: sub.c,v 1.1.1.1.2.6 2012/06/20 11:11:30 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.2 ! misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
51: *
52: * @buf = Message buffer
53: * @Topics = MQTT subscription topics
54: * @msgID = MessageID
55: * @Dup = Duplicate message
56: * @QOS = QoS
57: * return: -1 error or >-1 message size for send
58: */
59: int
60: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
61: u_short msgID, u_char Dup, u_char QOS)
62: {
1.2 ! misho 63: int len, siz = 0;
! 64: u_int n;
1.1 misho 65: struct mqtthdr *hdr;
66: mqtthdr_var_t *topic;
67: mqtt_len_t *mid;
68: mqtt_subscr_t *t;
69: u_char *qos;
1.2 ! misho 70: void *data;
1.1 misho 71:
72: if (!buf || !Topics)
73: return -1;
74: if (QOS > MQTT_QOS_EXACTLY) {
1.2 ! misho 75: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1 misho 76: return -1;
77: }
78: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.2 ! misho 79: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1 misho 80: return -1;
81: }
82:
1.2 ! misho 83: /* calculate message size */
! 84: len = sizeof(mqtt_len_t); /* msgid */
! 85: for (t = Topics; t && t->sub_topic.msg_base; t++) /* subscribes & qos */
! 86: len += sizeof(mqtt_len_t) + t->sub_topic.msg_len + 1;
! 87:
! 88: /* calculate header size */
! 89: siz = sizeof(struct mqtthdr); /* mqtt fixed header */
! 90: n = mqtt_encodeLen(len); /* message size */
! 91: siz += mqtt_sizeLen(n) - 1; /* length size */
! 92:
! 93: if (mqtt_msgRealloc(buf, siz + len) == -1)
1.1 misho 94: return -1;
95: else {
1.2 ! misho 96: data = buf->msg_base;
! 97: hdr = (struct mqtthdr *) data;
1.1 misho 98: }
99:
1.2 ! misho 100: /* fixed header */
! 101: MQTTHDR_MSGINIT(hdr);
! 102: hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
! 103: hdr->mqtt_msg.qos = QOS;
! 104: hdr->mqtt_msg.dup = Dup ? 1 : 0;
! 105: hdr->mqtt_msg.retain = 0;
! 106: *(u_int*) hdr->mqtt_len = n;
! 107: data += siz;
! 108:
1.1 misho 109: /* variable header */
1.2 ! misho 110: mid = (mqtt_len_t*) data;
1.1 misho 111: mid->val = htons(msgID);
1.2 ! misho 112: data += sizeof(mqtt_len_t);
1.1 misho 113:
114: /* payload with subscriptions */
115: for (t = Topics; t && t->sub_topic.msg_base; t++) {
1.2 ! misho 116: topic = (mqtthdr_var_t*) data;
1.1 misho 117: topic->var_sb.val = htons(t->sub_topic.msg_len);
118: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
1.2 ! misho 119: data += MQTTHDR_VAR_SIZEOF(topic);
! 120: qos = data++;
1.1 misho 121: *qos = t->sub_ret;
122: }
123:
124: return siz;
125: }
126:
127: /*
128: * mqtt_msgSUBACK() Create SUBACK message
129: *
130: * @buf = Message buffer
131: * @Topics = MQTT subscription topics
132: * @msgID = MessageID
133: * return: -1 error or >-1 message size for send
134: */
135: int
136: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
137: {
138: int siz = 0;
139: struct mqtthdr *hdr;
140: mqtt_len_t *v;
141: mqtt_subscr_t *t;
142: u_char *qos;
143:
144: if (!buf || !Topics)
145: return -1;
146:
147: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
148: return -1;
149: else {
150: hdr = (struct mqtthdr *) (buf->msg_base + siz);
151: siz += sizeof(struct mqtthdr);
152: v = (mqtt_len_t*) (buf->msg_base + siz);
153: siz += sizeof(mqtt_len_t);
154: }
155:
156: /* MessageID */
157: v->val = htons(msgID);
158:
159: /* QoS payload from subscriptions */
160: for (t = Topics; t && t->sub_topic.msg_base; t++) {
161: qos = (buf->msg_base + siz);
162: *qos = t->sub_ret;
163: siz++;
164: }
165:
166: /* fixed header */
167: MQTTHDR_MSGINIT(hdr);
168: hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
169: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
170:
171: return siz;
172: }
173:
174: /*
175: * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
176: *
177: * @buf = Message buffer
178: * @Topics = MQTT subscription topics
179: * @msgID = MessageID
180: * @Dup = Duplicate message
181: * @QOS = QoS
182: * return: -1 error or >-1 message size for send
183: */
184: int
185: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
186: u_short msgID, u_char Dup, u_char QOS)
187: {
1.2 ! misho 188: int len, siz = 0;
! 189: u_int n;
1.1 misho 190: struct mqtthdr *hdr;
191: mqtthdr_var_t *topic;
192: mqtt_len_t *mid;
193: mqtt_subscr_t *t;
1.2 ! misho 194: void *data;
1.1 misho 195:
196: if (!buf || !Topics)
197: return -1;
198: if (QOS > MQTT_QOS_EXACTLY) {
1.2 ! misho 199: mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1 misho 200: return -1;
201: }
202: if (!msgID && QOS != MQTT_QOS_ONCE) {
1.2 ! misho 203: mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1 misho 204: return -1;
205: }
206:
1.2 ! misho 207: /* calculate message size */
! 208: len = sizeof(mqtt_len_t); /* msgid */
! 209: for (t = Topics; t && t->sub_topic.msg_base; t++) /* subscribes */
! 210: len += sizeof(mqtt_len_t) + t->sub_topic.msg_len;
! 211:
! 212: /* calculate header size */
! 213: siz = sizeof(struct mqtthdr); /* mqtt fixed header */
! 214: n = mqtt_encodeLen(len); /* message size */
! 215: siz += mqtt_sizeLen(n) - 1; /* length size */
! 216:
! 217: if (mqtt_msgRealloc(buf, siz + len) == -1)
1.1 misho 218: return -1;
219: else {
1.2 ! misho 220: data = buf->msg_base;
! 221: hdr = (struct mqtthdr *) data;
1.1 misho 222: }
223:
1.2 ! misho 224: /* fixed header */
! 225: MQTTHDR_MSGINIT(hdr);
! 226: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
! 227: hdr->mqtt_msg.qos = QOS;
! 228: hdr->mqtt_msg.dup = Dup ? 1 : 0;
! 229: hdr->mqtt_msg.retain = 0;
! 230: *(u_int*) hdr->mqtt_len = n;
! 231: data += siz;
! 232:
1.1 misho 233: /* variable header */
234: mid = (mqtt_len_t*) (buf->msg_base + siz);
235: mid->val = htons(msgID);
1.2 ! misho 236: data += sizeof(mqtt_len_t);
1.1 misho 237:
238: /* payload with subscriptions */
239: for (t = Topics; t && t->sub_topic.msg_base; t++) {
1.2 ! misho 240: topic = (mqtthdr_var_t*) data;
1.1 misho 241: topic->var_sb.val = htons(t->sub_topic.msg_len);
242: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
1.2 ! misho 243: data += MQTTHDR_VAR_SIZEOF(topic);
1.1 misho 244: }
245:
246: return siz;
247: }
248:
249: /*
250: * mqtt_msgUNSUBACK() Create UNSUBACK message
251: *
252: * @buf = Message buffer
253: * @msgID = MessageID
254: * return: -1 error or >-1 message size for send
255: */
256: int
257: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
258: {
259: int siz = 0;
260: struct mqtthdr *hdr;
261: mqtt_len_t *v;
262:
263: if (!buf)
264: return -1;
265:
266: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
267: return -1;
268: else {
269: hdr = (struct mqtthdr *) (buf->msg_base + siz);
270: siz += sizeof(struct mqtthdr);
271: v = (mqtt_len_t*) (buf->msg_base + siz);
272: siz += sizeof(mqtt_len_t);
273: }
274:
275: /* fixed header */
276: MQTTHDR_MSGINIT(hdr);
277: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
278: *hdr->mqtt_len = sizeof(mqtt_len_t);
279:
280: /* MessageID */
281: v->val = htons(msgID);
282:
283: return siz;
284: }
285:
286:
287: /* ============= decode ============ */
288:
289: /*
290: * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
291: *
292: * @buf = Message buffer
293: * @msgID = MessageID
294: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.2 ! misho 295: * return: -1 error or >-1 elements into subscr
1.1 misho 296: */
1.2 ! misho 297: int
1.1 misho 298: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
299: {
300: register int i;
301: int len, ret;
302: struct mqtthdr *hdr;
303: mqtthdr_var_t *var;
304: mqtt_subscr_t *subs;
305: mqtt_len_t *v;
306: caddr_t pos;
307:
308: if (!buf || !msgID || !subscr)
1.2 ! misho 309: return -1;
1.1 misho 310:
311: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
312: if (!hdr)
1.2 ! misho 313: return -1;
1.1 misho 314: pos = buf->msg_base + ret + 1;
315: v = (mqtt_len_t*) pos;
316:
317: /* MessageID */
318: len -= sizeof(mqtt_len_t);
319: if (len < 0) {
1.2 ! misho 320: mqtt_SetErr(EINVAL, "Short message length %d", len);
! 321: return -1;
1.1 misho 322: } else {
323: *msgID = ntohs(v->val);
324: pos += sizeof(mqtt_len_t);
325: }
326:
327: subs = mqtt_subAlloc(0);
328: if (!subs)
1.2 ! misho 329: return -1;
1.1 misho 330: else
331: *subscr = subs;
332:
333: /* Subscribes */
334: for (i = 0; len > 0; i++) {
335: var = (mqtthdr_var_t*) pos;
336: len -= MQTTHDR_VAR_SIZEOF(var) + 1;
337: if (len < 0) {
338: mqtt_subFree(subscr);
1.2 ! misho 339: mqtt_SetErr(EINVAL, "Short message length %d", len);
! 340: return -1;
1.1 misho 341: }
1.2 ! misho 342: if (!mqtt_subRealloc(&subs, i + 1)) {
1.1 misho 343: mqtt_subFree(subscr);
1.2 ! misho 344: return -1;
1.1 misho 345: } else
346: *subscr = subs;
347:
348: memset(&subs[i], 0, sizeof subs[i]);
349: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
1.2 ! misho 350: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
1.1 misho 351: if (!subs[i].sub_topic.msg_base) {
352: LOGERR;
353: mqtt_subFree(subscr);
1.2 ! misho 354: return -1;
! 355: } else {
1.1 misho 356: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
1.2 ! misho 357: ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
! 358: }
1.1 misho 359: pos += MQTTHDR_VAR_SIZEOF(var);
360:
361: subs[i].sub_ret = *pos;
362: pos++;
363: }
364:
1.2 ! misho 365: return i;
1.1 misho 366: }
367:
368: /*
369: * mqtt_readSUBACK() Read SUBACK message
370: *
371: * @buf = Message buffer
372: * @msgID = MessageID
373: * @subqos = Subscribes QoS, must be free after use with free()
374: * return: -1 error or >-1 readed subscribes QoS elements
375: */
376: int
377: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
378: {
379: int len, ret;
380: struct mqtthdr *hdr;
381: mqtt_len_t *v;
382: caddr_t pos;
383:
384: if (!buf || !msgID || !subqos)
385: return -1;
386:
387: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
388: if (!hdr)
389: return -1;
390: pos = buf->msg_base + ret + 1;
391: v = (mqtt_len_t*) pos;
392:
393: /* MessageID */
394: len -= sizeof(mqtt_len_t);
395: if (len < 0) {
1.2 ! misho 396: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 397: return -1;
398: } else {
399: *msgID = ntohs(v->val);
400: pos += sizeof(mqtt_len_t);
401: }
402:
403: /* Subscribes */
404: *subqos = malloc(len);
405: if (!*subqos) {
406: LOGERR;
407: return -1;
408: } else
409: memcpy(*subqos, pos, len);
410:
411: return len;
412: }
413:
414: /*
415: * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
416: *
417: * @buf = Message buffer
418: * @msgID = MessageID
419: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.2 ! misho 420: * return: -1 error or >-1 elements into subscr
1.1 misho 421: */
1.2 ! misho 422: int
1.1 misho 423: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
424: {
425: register int i;
426: int len, ret;
427: struct mqtthdr *hdr;
428: mqtthdr_var_t *var;
429: mqtt_subscr_t *subs;
430: mqtt_len_t *v;
431: caddr_t pos;
432:
433: if (!buf || !msgID || !subscr)
1.2 ! misho 434: return -1;
1.1 misho 435:
436: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
437: if (!hdr)
1.2 ! misho 438: return -1;
1.1 misho 439: pos = buf->msg_base + ret + 1;
440: v = (mqtt_len_t*) pos;
441:
442: /* MessageID */
443: len -= sizeof(mqtt_len_t);
444: if (len < 0) {
1.2 ! misho 445: mqtt_SetErr(EINVAL, "Short message length %d", len);
! 446: return -1;
1.1 misho 447: } else {
448: *msgID = ntohs(v->val);
449: pos += sizeof(mqtt_len_t);
450: }
451:
452: subs = mqtt_subAlloc(0);
453: if (!subs)
1.2 ! misho 454: return -1;
1.1 misho 455: else
456: *subscr = subs;
457:
458: /* Subscribes */
459: for (i = 0; len > 0; i++) {
460: var = (mqtthdr_var_t*) pos;
461: len -= MQTTHDR_VAR_SIZEOF(var);
462: if (len < 0) {
463: mqtt_subFree(subscr);
1.2 ! misho 464: mqtt_SetErr(EINVAL, "Short message length %d", len);
! 465: return -1;
1.1 misho 466: }
1.2 ! misho 467: if (!mqtt_subRealloc(&subs, i + 1)) {
1.1 misho 468: mqtt_subFree(subscr);
1.2 ! misho 469: return -1;
1.1 misho 470: } else
471: *subscr = subs;
472:
473: memset(&subs[i], 0, sizeof subs[i]);
474: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
1.2 ! misho 475: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
1.1 misho 476: if (!subs[i].sub_topic.msg_base) {
477: LOGERR;
478: mqtt_subFree(subscr);
1.2 ! misho 479: return -1;
! 480: } else {
1.1 misho 481: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
1.2 ! misho 482: ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
! 483: }
1.1 misho 484: pos += MQTTHDR_VAR_SIZEOF(var);
485: }
486:
1.2 ! misho 487: return i;
1.1 misho 488: }
489:
490: /*
491: * mqtt_readUNSUBACK() Read UNSUBACK message
492: *
493: * @buf = Message buffer
494: * return: -1 error or MessageID
495: */
496: u_short
497: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
498: {
499: int len, ret;
500: struct mqtthdr *hdr;
501: mqtt_len_t *v;
502: caddr_t pos;
503:
504: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
505: if (!hdr)
506: return (u_short) -1;
507: if (len < sizeof(mqtt_len_t)) {
1.2 ! misho 508: mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1 misho 509: return (u_short) -1;
510: } else {
511: pos = buf->msg_base + ret + 1;
512: v = (mqtt_len_t*) pos;
513: }
514:
515: return ntohs(v->val);
516: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>