Annotation of libaitmqtt/src/read.c, revision 1.1.2.2
1.1.2.1 misho 1: /*************************************************************************
2: * (C) 2022 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@elwix.org>
4: *
5: * $Author: misho $
1.1.2.2 ! misho 6: * $Id: read.c,v 1.1.2.1 2022/09/14 18:36:23 misho Exp $
1.1.2.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004 - 2022
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /* _mqtt_readHEADER() read fixed header from MQTT message */
50: static struct mqtthdr *
51: _mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char cmd, int *len, caddr_t *next)
52: {
53: struct mqtthdr *hdr;
54: int bytes;
55:
56: if (!buf || !buf->msg_base || !buf->msg_len)
57: return NULL;
58:
59: hdr = (struct mqtthdr*) buf->msg_base;
60: if (hdr->mqtt_msg.type != cmd) {
61: mqtt_SetErr(EINVAL, "Error:: wrong command #%d should be %d",
62: hdr->mqtt_msg.type, cmd);
63: return NULL;
64: }
65:
66: if (len)
67: *len = mqtt_decodeLen(hdr->mqtt_len, &bytes);
68:
69: if (next)
70: *next = buf->msg_base + bytes + 1;
71:
72: return hdr;
73: }
74:
75:
76: /*
77: * mqtt_readCONNECT() Read elements from CONNECT message
78: *
79: * @buf = Message buffer
80: * @KASec = Keep Alive in seconds for current connection
81: * @psConnID = ConnectID
82: * @connLen = ConnectID length
83: * @psUser = Username if !=NULL
84: * @userLen = Username length
85: * @psPass = Password for Username, only if csUser is set
86: * @passLen = Password length
87: * @psWillTopic = Will Topic if !=NULL Will Flags set into message and must be e_free()
88: * @psWillMessage = Will Message, may be NULL if !NULL must be e_free() after use!
89: * return: .reserved == 1 is error or == 0 connection flags & msg ok
90: */
91: mqtthdr_connack_t
92: mqtt_readCONNECT(mqtt_msg_t * __restrict buf, u_short *KASec, char * __restrict psConnID, int connLen,
93: char * __restrict psUser, int userLen, char * __restrict psPass, int passLen,
94: char ** __restrict psWillTopic, char ** __restrict psWillMessage)
95: {
96: mqtthdr_connflgs_t flg = { MQTT_CONNFLGS_INIT };
97: mqtthdr_connack_t cack = { 1, MQTT_RETCODE_DENIED };
98: struct mqtthdr *hdr;
99: mqtthdr_var_t *var;
100: mqtt_len_t *ka;
101: int len;
102: caddr_t pos;
103:
104: if (!buf || !buf->msg_base || !buf->msg_len || !psConnID || !connLen)
105: return cack;
106:
107: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_CONNECT, &len, &pos);
108: if (!hdr)
109: return cack;
110: if (len < 12) {
111: mqtt_SetErr(EINVAL, "Short message length %d", len);
112: return cack;
113: } else
114: var = (mqtthdr_var_t*) pos;
115:
116: /* check init string & protocol */
117: if (var->var_sb.sb.l == 4 && !strcmp((char*) var->var_data, MQTT_PROTO_STR))
118: pos += var->var_sb.sb.l + sizeof(mqtt_len_t);
119: else if (var->var_sb.sb.l == 6 || strcmp((char*) var->var_data, MQTT_CONN_STR))
120: pos += var->var_sb.sb.l + sizeof(mqtt_len_t);
121: else {
122: mqtt_SetErr(EINVAL, "Invalid init string %.6s(%d)",
123: var->var_data, var->var_sb.sb.l);
124: cack.retcode = MQTT_RETCODE_REFUSE_UNAVAIL;
125: return cack;
126: }
127: switch (*pos) {
128: case MQTT_PROTO_VER_3:
129: case MQTT_PROTO_VER_311:
130: case MQTT_PROTO_VER_5:
131: pos++;
132: break;
133: default:
134: mqtt_SetErr(EINVAL, "Invalid protocol version %d", *pos);
135: cack.retcode = MQTT_RETCODE_REFUSE_VER;
136: return cack;
137: }
138: flg = *(mqtthdr_connflgs_t*) pos;
139: pos++;
140: ka = (mqtt_len_t*) pos;
141: *KASec = ntohs(ka->val);
142: pos += sizeof(mqtt_len_t);
143:
144: len -= pos - (caddr_t) var;
145:
146: /* get ConnID */
147: var = (mqtthdr_var_t*) pos;
148: len -= MQTTHDR_VAR_SIZEOF(var);
149: if (len < 0 || var->var_sb.sb.l >= MQTT_CONNID_MAX) {
150: mqtt_SetErr(EINVAL, "Unexpected EOM at Connection ID %d", len);
151: cack.retcode = MQTT_RETCODE_REFUSE_ID;
152: return cack;
153: } else {
154: memset(psConnID, 0, connLen--);
155: memcpy(psConnID, var->var_data,
156: ntohs(var->var_sb.val) > connLen ? connLen : ntohs(var->var_sb.val));
157: pos += MQTTHDR_VAR_SIZEOF(var);
158: }
159:
160: /* get Willz */
161: if (flg.will_flg) {
162: var = (mqtthdr_var_t*) pos;
163: len -= MQTTHDR_VAR_SIZEOF(var);
164: if (len < 0) {
165: mqtt_SetErr(EINVAL, "Unexpected EOM at Will Topic %d", len);
166: cack.retcode = MQTT_RETCODE_REFUSE_ID;
167: return cack;
168: } else {
169: if (psWillTopic) {
170: *psWillTopic = e_malloc(ntohs(var->var_sb.val) + 1);
171: if (!*psWillTopic) {
172: LOGERR;
173: cack.retcode = MQTT_RETCODE_REFUSE_UNAVAIL;
174: return cack;
175: } else
176: memset(*psWillTopic, 0, ntohs(var->var_sb.val) + 1);
177: memcpy(*psWillTopic, var->var_data, ntohs(var->var_sb.val));
178: }
179: pos += MQTTHDR_VAR_SIZEOF(var);
180: }
181:
182: var = (mqtthdr_var_t*) pos;
183: len -= MQTTHDR_VAR_SIZEOF(var);
184: if (len < 0) {
185: mqtt_SetErr(EINVAL, "Unexpected EOM at Will Message %d", len);
186: e_free(psWillTopic);
187: cack.retcode = MQTT_RETCODE_REFUSE_ID;
188: return cack;
189: } else {
190: if (psWillMessage) {
191: *psWillMessage = e_malloc(ntohs(var->var_sb.val) + 1);
192: if (!*psWillMessage) {
193: LOGERR;
194: e_free(psWillTopic);
195: cack.retcode = MQTT_RETCODE_REFUSE_UNAVAIL;
196: return cack;
197: } else
198: memset(*psWillMessage, 0, ntohs(var->var_sb.val) + 1);
199: memcpy(*psWillMessage, var->var_data, ntohs(var->var_sb.val));
200: }
201: pos += MQTTHDR_VAR_SIZEOF(var);
202: }
203: }
204:
205: /* get User/Pass */
206: if (flg.username) {
207: var = (mqtthdr_var_t*) pos;
208: len -= MQTTHDR_VAR_SIZEOF(var);
209: if (len < 0 || var->var_sb.sb.l > 12) {
210: mqtt_SetErr(EINVAL, "Unexpected EOM at Username %d", len);
211: if (flg.will_flg) {
212: if (psWillTopic)
213: e_free(psWillTopic);
214: if (psWillMessage)
215: e_free(psWillMessage);
216: }
217: cack.retcode = MQTT_RETCODE_REFUSE_USERPASS;
218: return cack;
219: } else {
220: if (psUser && userLen) {
221: memset(psUser, 0, userLen--);
222: memcpy(psUser, var->var_data,
223: ntohs(var->var_sb.val) > userLen ? userLen : ntohs(var->var_sb.val));
224: }
225: pos += MQTTHDR_VAR_SIZEOF(var);
226: }
227: }
228: if (flg.password) {
229: var = (mqtthdr_var_t*) pos;
230: len -= MQTTHDR_VAR_SIZEOF(var);
231: if (len < 0 || var->var_sb.sb.l > 12) {
232: mqtt_SetErr(EINVAL, "Unexpected EOM at Password %d", len);
233: if (flg.will_flg) {
234: if (psWillTopic)
235: e_free(psWillTopic);
236: if (psWillMessage)
237: e_free(psWillMessage);
238: }
239: cack.retcode = MQTT_RETCODE_REFUSE_USERPASS;
240: return cack;
241: } else {
242: if (psPass && passLen) {
243: memset(psPass, 0, passLen--);
244: memcpy(psPass, var->var_data,
245: ntohs(var->var_sb.val) > passLen ? passLen : ntohs(var->var_sb.val));
246: }
247: pos += MQTTHDR_VAR_SIZEOF(var);
248: }
249: }
250:
251: flg.reserved = 0;
252: cack.reserved = flg.flags;
253: cack.retcode = MQTT_RETCODE_ACCEPTED;
254: return cack;
255: }
256:
257: /*
258: * mqtt_readCONNACK() Read CONNACK message
259: *
260: * @buf = Message buffer
261: * return: -1 error or >-1 CONNECT message return code
262: */
263: u_char
264: mqtt_readCONNACK(mqtt_msg_t * __restrict buf)
265: {
266: int len;
267: struct mqtthdr *hdr;
268: mqtthdr_connack_t *ack;
269: caddr_t pos;
270:
271: if (!buf || !buf->msg_base || !buf->msg_len)
272: return (u_char) -1;
273:
274: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_CONNACK, &len, &pos);
275: if (!hdr)
276: return (u_char) -1;
277: if (len < sizeof(mqtthdr_connack_t)) {
278: mqtt_SetErr(EINVAL, "Short message length %d", len);
279: return (u_char) -1;
280: } else
281: ack = (mqtthdr_connack_t*) pos;
282:
283: if (ack->retcode > MQTT_RETCODE_DENIED) {
284: mqtt_SetErr(EINVAL, "Invalid retcode %u", ack->retcode);
285: return (u_char) -1;
286: }
287:
288: return ack->retcode;
289: }
290:
291: /*
292: * mqtt_readDISCONNECT() Read DISCONNECT message
293: *
294: * @buf = Message buffer
295: * return: -1 error, 0 ok, >0 undefined result
296: */
297: int
298: mqtt_readDISCONNECT(mqtt_msg_t * __restrict buf)
299: {
300: int len;
301:
302: if (!_mqtt_readHEADER(buf, MQTT_TYPE_DISCONNECT, &len, NULL))
303: return -1;
304:
305: return len;
306: }
307:
308: /*
309: * mqtt_readPINGREQ() Read PINGREQ message
310: *
311: * @buf = Message buffer
312: * return: -1 error, 0 ok, >0 undefined result
313: */
314: int
315: mqtt_readPINGREQ(mqtt_msg_t * __restrict buf)
316: {
317: int len;
318:
319: if (!_mqtt_readHEADER(buf, MQTT_TYPE_PINGREQ, &len, NULL))
320: return -1;
321:
322: return len;
323: }
324:
325: /*
326: * mqtt_readPINGRESP() Read PINGRESP message
327: *
328: * @buf = Message buffer
329: * return: -1 error, 0 ok, >0 undefined result
330: */
331: int
332: mqtt_readPINGRESP(mqtt_msg_t * __restrict buf)
333: {
334: int len;
335:
336: if (!_mqtt_readHEADER(buf, MQTT_TYPE_PINGRESP, &len, NULL))
337: return -1;
338:
339: return len;
340: }
341:
342: /*
343: * mqtt_readPUBLISH() Read PUBLISH message
344: *
345: * @buf = Message buffer
346: * @psTopic = Topic
347: * @topicLen = Topic length
348: * @msgID = MessageID
349: * @pData = Data buffer, may be NULL
350: * return: -1 error or !=-1 allocated data buffer length
351: */
352: int
353: mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen,
354: u_short *msgID, void ** __restrict pData)
355: {
356: int len;
357: struct mqtthdr *hdr;
358: mqtthdr_var_t *var;
359: mqtt_len_t *v;
360: caddr_t pos;
361:
362: if (!buf || !psTopic || !msgID)
363: return -1;
364:
365: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &len, &pos);
366: if (!hdr)
367: return -1;
368: else
369: var = (mqtthdr_var_t*) pos;
370:
371: /* topic */
372: len -= MQTTHDR_VAR_SIZEOF(var);
373: if (len < 0) {
374: mqtt_SetErr(EINVAL, "Short message length %d", len);
375: return -1;
376: } else {
377: memset(psTopic, 0, topicLen--);
378: memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ?
379: topicLen : ntohs(var->var_sb.val));
380: pos += MQTTHDR_VAR_SIZEOF(var);
381: v = (mqtt_len_t*) pos;
382: }
383:
384: len -= sizeof(mqtt_len_t);
385: if (len < 0) {
386: mqtt_SetErr(EINVAL, "Short message length %d", len);
387: return -1;
388: } else {
389: *msgID = ntohs(v->val);
390: pos += sizeof(mqtt_len_t);
391: }
392:
393: /* data */
394: if (len < 0) {
395: mqtt_SetErr(EINVAL, "Short message length %d", len);
396: return -1;
397: } else if (pData) {
398: if (!(*pData = e_malloc(len + 1))) {
399: LOGERR;
400: return -1;
401: } else
402: ((char*) (*pData))[len] = 0;
403:
404: memcpy(*pData, pos, len);
405: }
406:
407: return len;
408: }
409:
410: /*
411: * mqtt_readPUBACK() Read PUBACK message
412: *
413: * @buf = Message buffer
414: * return: -1 error or MessageID
415: */
416: u_short
417: mqtt_readPUBACK(mqtt_msg_t * __restrict buf)
418: {
419: int len;
420: struct mqtthdr *hdr;
421: mqtt_len_t *v;
422: caddr_t pos;
423:
424: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &len, &pos);
425: if (!hdr)
426: return (u_short) -1;
427: if (len < sizeof(mqtt_len_t)) {
428: mqtt_SetErr(EINVAL, "Short message length %d", len);
429: return (u_short) -1;
430: } else
431: v = (mqtt_len_t*) pos;
432:
433: return ntohs(v->val);
434: }
435:
436: /*
437: * mqtt_readPUBREC() Read PUBREC message
438: *
439: * @buf = Message buffer
440: * return: -1 error or MessageID
441: */
442: u_short
443: mqtt_readPUBREC(mqtt_msg_t * __restrict buf)
444: {
445: int len;
446: struct mqtthdr *hdr;
447: mqtt_len_t *v;
448: caddr_t pos;
449:
450: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &len, &pos);
451: if (!hdr)
452: return (u_short) -1;
453: if (len < sizeof(mqtt_len_t)) {
454: mqtt_SetErr(EINVAL, "Short message length %d", len);
455: return (u_short) -1;
456: } else
457: v = (mqtt_len_t*) pos;
458:
459: return ntohs(v->val);
460: }
461:
462: /*
463: * mqtt_readPUBREL() Read PUBREL message
464: *
465: * @buf = Message buffer
466: * return: -1 error or MessageID
467: */
468: u_short
469: mqtt_readPUBREL(mqtt_msg_t * __restrict buf)
470: {
471: int len;
472: struct mqtthdr *hdr;
473: mqtt_len_t *v;
474: caddr_t pos;
475:
476: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &len, &pos);
477: if (!hdr)
478: return (u_short) -1;
479: if (len < sizeof(mqtt_len_t)) {
480: mqtt_SetErr(EINVAL, "Short message length %d", len);
481: return (u_short) -1;
482: } else
483: v = (mqtt_len_t*) pos;
484:
485: return ntohs(v->val);
486: }
487:
488: /*
489: * mqtt_readPUBCOMP() Read PUBCOMP message
490: *
491: * @buf = Message buffer
492: * return: -1 error or MessageID
493: */
494: u_short
495: mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
496: {
497: int len;
498: struct mqtthdr *hdr;
499: mqtt_len_t *v;
500: caddr_t pos;
501:
502: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &len, &pos);
503: if (!hdr)
504: return (u_short) -1;
505: if (len < sizeof(mqtt_len_t)) {
506: mqtt_SetErr(EINVAL, "Short message length %d", len);
507: return (u_short) -1;
508: } else
509: v = (mqtt_len_t*) pos;
510:
511: return ntohs(v->val);
512: }
1.1.2.2 ! misho 513:
! 514: /*
! 515: * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
! 516: *
! 517: * @buf = Message buffe
! 518: * @msgID = MessageID
! 519: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
! 520: * return: -1 error or >-1 elements into subscr
! 521: */
! 522: int
! 523: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
! 524: {
! 525: register int i;
! 526: int len;
! 527: struct mqtthdr *hdr;
! 528: mqtthdr_var_t *var;
! 529: mqtt_subscr_t *subs;
! 530: mqtt_len_t *v;
! 531: caddr_t pos;
! 532:
! 533: if (!buf || !msgID || !subscr)
! 534: return -1;
! 535:
! 536: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &len, &pos);
! 537: if (!hdr)
! 538: return -1;
! 539: else
! 540: v = (mqtt_len_t*) pos;
! 541:
! 542: /* MessageID */
! 543: len -= sizeof(mqtt_len_t);
! 544: if (len < 0) {
! 545: mqtt_SetErr(EINVAL, "Short message length %d", len);
! 546: return -1;
! 547: } else {
! 548: *msgID = ntohs(v->val);
! 549: pos += sizeof(mqtt_len_t);
! 550: }
! 551:
! 552: subs = mqtt_subAlloc(0);
! 553: if (!subs)
! 554: return -1;
! 555: else
! 556: *subscr = subs;
! 557:
! 558: /* Subscribes */
! 559: for (i = 0; len > 0; i++) {
! 560: var = (mqtthdr_var_t*) pos;
! 561: len -= MQTTHDR_VAR_SIZEOF(var) + 1;
! 562: if (len < 0) {
! 563: mqtt_subFree(subscr);
! 564: mqtt_SetErr(EINVAL, "Short message length %d", len);
! 565: return -1;
! 566: }
! 567: if (!mqtt_subRealloc(&subs, i + 1)) {
! 568: mqtt_subFree(subscr);
! 569: return -1;
! 570: } else
! 571: *subscr = subs;
! 572:
! 573: memset(&subs[i], 0, sizeof subs[i]);
! 574: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
! 575: subs[i].sub_topic.msg_base = e_malloc(subs[i].sub_topic.msg_len + 1);
! 576: if (!subs[i].sub_topic.msg_base) {
! 577: LOGERR;
! 578: mqtt_subFree(subscr);
! 579: return -1;
! 580: } else {
! 581: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
! 582: ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
! 583: }
! 584: pos += MQTTHDR_VAR_SIZEOF(var);
! 585:
! 586: subs[i].sub_qos = *pos;
! 587: pos++;
! 588: }
! 589:
! 590: return i;
! 591: }
! 592:
! 593: /*
! 594: * mqtt_readSUBACK() Read SUBACK message
! 595: *
! 596: * @buf = Message buffer
! 597: * @msgID = MessageID
! 598: * @subqos = Subscribes QoS, must be free after use with e_free()
! 599: * return: -1 error or >-1 readed subscribes QoS elements
! 600: */
! 601: int
! 602: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
! 603: {
! 604: int len;
! 605: struct mqtthdr *hdr;
! 606: mqtt_len_t *v;
! 607: caddr_t pos;
! 608:
! 609: if (!buf || !msgID || !subqos)
! 610: return -1;
! 611:
! 612: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &len, &pos);
! 613: if (!hdr)
! 614: return -1;
! 615: else
! 616: v = (mqtt_len_t*) pos;
! 617:
! 618: /* MessageID */
! 619: len -= sizeof(mqtt_len_t);
! 620: if (len < 0) {
! 621: mqtt_SetErr(EINVAL, "Short message length %d", len);
! 622: return -1;
! 623: } else {
! 624: *msgID = ntohs(v->val);
! 625: pos += sizeof(mqtt_len_t);
! 626: }
! 627:
! 628: /* Subscribes */
! 629: *subqos = e_malloc(len);
! 630: if (!*subqos) {
! 631: LOGERR;
! 632: return -1;
! 633: } else
! 634: memcpy(*subqos, pos, len);
! 635:
! 636: return len;
! 637: }
! 638:
! 639: /*
! 640: * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
! 641: *
! 642: * @buf = Message buffer
! 643: * @msgID = MessageID
! 644: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
! 645: * return: -1 error or >-1 elements into subscr
! 646: */
! 647: int
! 648: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
! 649: {
! 650: register int i;
! 651: int len;
! 652: struct mqtthdr *hdr;
! 653: mqtthdr_var_t *var;
! 654: mqtt_subscr_t *subs;
! 655: mqtt_len_t *v;
! 656: caddr_t pos;
! 657:
! 658: if (!buf || !msgID || !subscr)
! 659: return -1;
! 660:
! 661: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &len, &pos);
! 662: if (!hdr)
! 663: return -1;
! 664: else
! 665: v = (mqtt_len_t*) pos;
! 666:
! 667: /* MessageID */
! 668: len -= sizeof(mqtt_len_t);
! 669: if (len < 0) {
! 670: mqtt_SetErr(EINVAL, "Short message length %d", len);
! 671: return -1;
! 672: } else {
! 673: *msgID = ntohs(v->val);
! 674: pos += sizeof(mqtt_len_t);
! 675: }
! 676:
! 677: subs = mqtt_subAlloc(0);
! 678: if (!subs)
! 679: return -1;
! 680: else
! 681: *subscr = subs;
! 682:
! 683: /* Subscribes */
! 684: for (i = 0; len > 0; i++) {
! 685: var = (mqtthdr_var_t*) pos;
! 686: len -= MQTTHDR_VAR_SIZEOF(var);
! 687: if (len < 0) {
! 688: mqtt_subFree(subscr);
! 689: mqtt_SetErr(EINVAL, "Short message length %d", len);
! 690: return -1;
! 691: }
! 692: if (!mqtt_subRealloc(&subs, i + 1)) {
! 693: mqtt_subFree(subscr);
! 694: return -1;
! 695: } else
! 696: *subscr = subs;
! 697:
! 698: memset(&subs[i], 0, sizeof subs[i]);
! 699: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
! 700: subs[i].sub_topic.msg_base = e_malloc(subs[i].sub_topic.msg_len + 1);
! 701: if (!subs[i].sub_topic.msg_base) {
! 702: LOGERR;
! 703: mqtt_subFree(subscr);
! 704: return -1;
! 705: } else {
! 706: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
! 707: ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
! 708: }
! 709: pos += MQTTHDR_VAR_SIZEOF(var);
! 710: }
! 711:
! 712: return i;
! 713: }
! 714:
! 715: /*
! 716: * mqtt_readUNSUBACK() Read UNSUBACK message
! 717: *
! 718: * @buf = Message buffer
! 719: * return: -1 error or MessageID
! 720: */
! 721: u_short
! 722: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
! 723: {
! 724: int len;
! 725: struct mqtthdr *hdr;
! 726: mqtt_len_t *v;
! 727: caddr_t pos;
! 728:
! 729: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &len, &pos);
! 730: if (!hdr)
! 731: return (u_short) -1;
! 732: if (len < sizeof(mqtt_len_t)) {
! 733: mqtt_SetErr(EINVAL, "Short message length %d", len);
! 734: return (u_short) -1;
! 735: } else
! 736: v = (mqtt_len_t*) pos;
! 737:
! 738: return ntohs(v->val);
! 739: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>