Annotation of libaitmqtt/src/sub.c, revision 1.1.1.1
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: aitsched.c,v 1.5 2012/01/24 21:59:47 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /* ------------------------------------------------------------------- */
50:
51: /*
52: * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
53: *
54: * @buf = Message buffer
55: * @Topics = MQTT subscription topics
56: * @msgID = MessageID
57: * @Dup = Duplicate message
58: * @QOS = QoS
59: * return: -1 error or >-1 message size for send
60: */
61: int
62: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
63: u_short msgID, u_char Dup, u_char QOS)
64: {
65: int siz = 0;
66: struct mqtthdr *hdr;
67: mqtthdr_var_t *topic;
68: mqtt_len_t *mid;
69: mqtt_subscr_t *t;
70: u_char *qos;
71:
72: if (!buf || !Topics)
73: return -1;
74: if (QOS > MQTT_QOS_EXACTLY) {
75: mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
76: return -1;
77: }
78: if (!msgID && QOS != MQTT_QOS_ONCE) {
79: mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
80: return -1;
81: }
82:
83: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
84: return -1;
85: else {
86: hdr = (struct mqtthdr *) (buf->msg_base + siz);
87: siz += sizeof(struct mqtthdr);
88: }
89:
90: /* variable header */
91: mid = (mqtt_len_t*) (buf->msg_base + siz);
92: mid->val = htons(msgID);
93: siz += sizeof(mqtt_len_t);
94:
95: /* payload with subscriptions */
96: for (t = Topics; t && t->sub_topic.msg_base; t++) {
97: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
98: topic->var_sb.val = htons(t->sub_topic.msg_len);
99: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
100: siz += MQTTHDR_VAR_SIZEOF(topic);
101: qos = (buf->msg_base + siz);
102: *qos = t->sub_ret;
103: siz++;
104: }
105:
106: /* fixed header */
107: MQTTHDR_MSGINIT(hdr);
108: hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
109: hdr->mqtt_msg.qos = QOS;
110: hdr->mqtt_msg.dup = Dup ? 1 : 0;
111: hdr->mqtt_msg.retain = 0;
112: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
113:
114: mqtt_msgRealloc(buf, siz);
115: return siz;
116: }
117:
118: /*
119: * mqtt_msgSUBACK() Create SUBACK message
120: *
121: * @buf = Message buffer
122: * @Topics = MQTT subscription topics
123: * @msgID = MessageID
124: * return: -1 error or >-1 message size for send
125: */
126: int
127: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
128: {
129: int siz = 0;
130: struct mqtthdr *hdr;
131: mqtt_len_t *v;
132: mqtt_subscr_t *t;
133: u_char *qos;
134:
135: if (!buf || !Topics)
136: return -1;
137:
138: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
139: return -1;
140: else {
141: hdr = (struct mqtthdr *) (buf->msg_base + siz);
142: siz += sizeof(struct mqtthdr);
143: v = (mqtt_len_t*) (buf->msg_base + siz);
144: siz += sizeof(mqtt_len_t);
145: }
146:
147: /* MessageID */
148: v->val = htons(msgID);
149:
150: /* QoS payload from subscriptions */
151: for (t = Topics; t && t->sub_topic.msg_base; t++) {
152: qos = (buf->msg_base + siz);
153: *qos = t->sub_ret;
154: siz++;
155: }
156:
157: /* fixed header */
158: MQTTHDR_MSGINIT(hdr);
159: hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
160: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
161:
162: mqtt_msgRealloc(buf, siz);
163: return siz;
164: }
165:
166: /*
167: * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
168: *
169: * @buf = Message buffer
170: * @Topics = MQTT subscription topics
171: * @msgID = MessageID
172: * @Dup = Duplicate message
173: * @QOS = QoS
174: * return: -1 error or >-1 message size for send
175: */
176: int
177: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
178: u_short msgID, u_char Dup, u_char QOS)
179: {
180: int siz = 0;
181: struct mqtthdr *hdr;
182: mqtthdr_var_t *topic;
183: mqtt_len_t *mid;
184: mqtt_subscr_t *t;
185:
186: if (!buf || !Topics)
187: return -1;
188: if (QOS > MQTT_QOS_EXACTLY) {
189: mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
190: return -1;
191: }
192: if (!msgID && QOS != MQTT_QOS_ONCE) {
193: mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
194: return -1;
195: }
196:
197: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
198: return -1;
199: else {
200: hdr = (struct mqtthdr *) (buf->msg_base + siz);
201: siz += sizeof(struct mqtthdr);
202: }
203:
204: /* variable header */
205: mid = (mqtt_len_t*) (buf->msg_base + siz);
206: mid->val = htons(msgID);
207: siz += sizeof(mqtt_len_t);
208:
209: /* payload with subscriptions */
210: for (t = Topics; t && t->sub_topic.msg_base; t++) {
211: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
212: topic->var_sb.val = htons(t->sub_topic.msg_len);
213: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
214: siz += MQTTHDR_VAR_SIZEOF(topic);
215: }
216:
217: /* fixed header */
218: MQTTHDR_MSGINIT(hdr);
219: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
220: hdr->mqtt_msg.qos = QOS;
221: hdr->mqtt_msg.dup = Dup ? 1 : 0;
222: hdr->mqtt_msg.retain = 0;
223: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
224:
225: mqtt_msgRealloc(buf, siz);
226: return siz;
227: }
228:
229: /*
230: * mqtt_msgUNSUBACK() Create UNSUBACK message
231: *
232: * @buf = Message buffer
233: * @msgID = MessageID
234: * return: -1 error or >-1 message size for send
235: */
236: int
237: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
238: {
239: int siz = 0;
240: struct mqtthdr *hdr;
241: mqtt_len_t *v;
242:
243: if (!buf)
244: return -1;
245:
246: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
247: return -1;
248: else {
249: hdr = (struct mqtthdr *) (buf->msg_base + siz);
250: siz += sizeof(struct mqtthdr);
251: v = (mqtt_len_t*) (buf->msg_base + siz);
252: siz += sizeof(mqtt_len_t);
253: }
254:
255: /* fixed header */
256: MQTTHDR_MSGINIT(hdr);
257: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
258: *hdr->mqtt_len = sizeof(mqtt_len_t);
259:
260: /* MessageID */
261: v->val = htons(msgID);
262:
263: return siz;
264: }
265:
266:
267: /* ============= decode ============ */
268:
269: /*
270: * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
271: *
272: * @buf = Message buffer
273: * @msgID = MessageID
274: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
275: * return: NULL error or !=NULL MQTT fixed header
276: */
277: struct mqtthdr *
278: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
279: {
280: register int i;
281: int len, ret;
282: struct mqtthdr *hdr;
283: mqtthdr_var_t *var;
284: mqtt_subscr_t *subs;
285: mqtt_len_t *v;
286: caddr_t pos;
287:
288: if (!buf || !msgID || !subscr)
289: return NULL;
290:
291: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
292: if (!hdr)
293: return NULL;
294: pos = buf->msg_base + ret + 1;
295: v = (mqtt_len_t*) pos;
296:
297: /* MessageID */
298: len -= sizeof(mqtt_len_t);
299: if (len < 0) {
300: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
301: return NULL;
302: } else {
303: *msgID = ntohs(v->val);
304: pos += sizeof(mqtt_len_t);
305: }
306:
307: subs = mqtt_subAlloc(0);
308: if (!subs)
309: return NULL;
310: else
311: *subscr = subs;
312:
313: /* Subscribes */
314: for (i = 0; len > 0; i++) {
315: var = (mqtthdr_var_t*) pos;
316: len -= MQTTHDR_VAR_SIZEOF(var) + 1;
317: if (len < 0) {
318: mqtt_subFree(subscr);
319: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
320: return NULL;
321: }
322: subs = mqtt_subRealloc(subs, i + 1);
323: if (!subs) {
324: mqtt_subFree(subscr);
325: return NULL;
326: } else
327: *subscr = subs;
328:
329: memset(&subs[i], 0, sizeof subs[i]);
330: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
331: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
332: if (!subs[i].sub_topic.msg_base) {
333: LOGERR;
334: mqtt_subFree(subscr);
335: return NULL;
336: } else
337: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
338: pos += MQTTHDR_VAR_SIZEOF(var);
339:
340: subs[i].sub_ret = *pos;
341: pos++;
342: }
343:
344: return hdr;
345: }
346:
347: /*
348: * mqtt_readSUBACK() Read SUBACK message
349: *
350: * @buf = Message buffer
351: * @msgID = MessageID
352: * @subqos = Subscribes QoS, must be free after use with free()
353: * return: -1 error or >-1 readed subscribes QoS elements
354: */
355: int
356: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
357: {
358: int len, ret;
359: struct mqtthdr *hdr;
360: mqtt_len_t *v;
361: caddr_t pos;
362:
363: if (!buf || !msgID || !subqos)
364: return -1;
365:
366: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
367: if (!hdr)
368: return -1;
369: pos = buf->msg_base + ret + 1;
370: v = (mqtt_len_t*) pos;
371:
372: /* MessageID */
373: len -= sizeof(mqtt_len_t);
374: if (len < 0) {
375: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
376: return -1;
377: } else {
378: *msgID = ntohs(v->val);
379: pos += sizeof(mqtt_len_t);
380: }
381:
382: /* Subscribes */
383: *subqos = malloc(len);
384: if (!*subqos) {
385: LOGERR;
386: return -1;
387: } else
388: memcpy(*subqos, pos, len);
389:
390: return len;
391: }
392:
393: /*
394: * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
395: *
396: * @buf = Message buffer
397: * @msgID = MessageID
398: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
399: * return: NULL error or !=NULL MQTT fixed header
400: */
401: struct mqtthdr *
402: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
403: {
404: register int i;
405: int len, ret;
406: struct mqtthdr *hdr;
407: mqtthdr_var_t *var;
408: mqtt_subscr_t *subs;
409: mqtt_len_t *v;
410: caddr_t pos;
411:
412: if (!buf || !msgID || !subscr)
413: return NULL;
414:
415: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
416: if (!hdr)
417: return NULL;
418: pos = buf->msg_base + ret + 1;
419: v = (mqtt_len_t*) pos;
420:
421: /* MessageID */
422: len -= sizeof(mqtt_len_t);
423: if (len < 0) {
424: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
425: return NULL;
426: } else {
427: *msgID = ntohs(v->val);
428: pos += sizeof(mqtt_len_t);
429: }
430:
431: subs = mqtt_subAlloc(0);
432: if (!subs)
433: return NULL;
434: else
435: *subscr = subs;
436:
437: /* Subscribes */
438: for (i = 0; len > 0; i++) {
439: var = (mqtthdr_var_t*) pos;
440: len -= MQTTHDR_VAR_SIZEOF(var);
441: if (len < 0) {
442: mqtt_subFree(subscr);
443: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
444: return NULL;
445: }
446: subs = mqtt_subRealloc(subs, i + 1);
447: if (!subs) {
448: mqtt_subFree(subscr);
449: return NULL;
450: } else
451: *subscr = subs;
452:
453: memset(&subs[i], 0, sizeof subs[i]);
454: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
455: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
456: if (!subs[i].sub_topic.msg_base) {
457: LOGERR;
458: mqtt_subFree(subscr);
459: return NULL;
460: } else
461: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
462: pos += MQTTHDR_VAR_SIZEOF(var);
463: }
464:
465: return hdr;
466: }
467:
468: /*
469: * mqtt_readUNSUBACK() Read UNSUBACK message
470: *
471: * @buf = Message buffer
472: * return: -1 error or MessageID
473: */
474: u_short
475: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
476: {
477: int len, ret;
478: struct mqtthdr *hdr;
479: mqtt_len_t *v;
480: caddr_t pos;
481:
482: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
483: if (!hdr)
484: return (u_short) -1;
485: if (len < sizeof(mqtt_len_t)) {
486: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
487: return (u_short) -1;
488: } else {
489: pos = buf->msg_base + ret + 1;
490: v = (mqtt_len_t*) pos;
491: }
492:
493: return ntohs(v->val);
494: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>