Annotation of libaitmqtt/src/sub.c, revision 1.1
1.1 ! misho 1: /*************************************************************************
! 2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
! 3: * by Michael Pounov <misho@openbsd-bg.org>
! 4: *
! 5: * $Author: misho $
! 6: * $Id: aitsched.c,v 1.5 2012/01/24 21:59:47 misho Exp $
! 7: *
! 8: **************************************************************************
! 9: The ELWIX and AITNET software is distributed under the following
! 10: terms:
! 11:
! 12: All of the documentation and software included in the ELWIX and AITNET
! 13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
! 14:
! 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
! 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
! 17:
! 18: Redistribution and use in source and binary forms, with or without
! 19: modification, are permitted provided that the following conditions
! 20: are met:
! 21: 1. Redistributions of source code must retain the above copyright
! 22: notice, this list of conditions and the following disclaimer.
! 23: 2. Redistributions in binary form must reproduce the above copyright
! 24: notice, this list of conditions and the following disclaimer in the
! 25: documentation and/or other materials provided with the distribution.
! 26: 3. All advertising materials mentioning features or use of this software
! 27: must display the following acknowledgement:
! 28: This product includes software developed by Michael Pounov <misho@elwix.org>
! 29: ELWIX - Embedded LightWeight unIX and its contributors.
! 30: 4. Neither the name of AITNET nor the names of its contributors
! 31: may be used to endorse or promote products derived from this software
! 32: without specific prior written permission.
! 33:
! 34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
! 35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
! 36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
! 37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
! 38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
! 39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
! 40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
! 41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
! 42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
! 43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
! 44: SUCH DAMAGE.
! 45: */
! 46: #include "global.h"
! 47:
! 48:
! 49: /* ------------------------------------------------------------------- */
! 50:
! 51: /*
! 52: * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
! 53: *
! 54: * @buf = Message buffer
! 55: * @Topics = MQTT subscription topics
! 56: * @msgID = MessageID
! 57: * @Dup = Duplicate message
! 58: * @QOS = QoS
! 59: * return: -1 error or >-1 message size for send
! 60: */
! 61: int
! 62: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
! 63: u_short msgID, u_char Dup, u_char QOS)
! 64: {
! 65: int siz = 0;
! 66: struct mqtthdr *hdr;
! 67: mqtthdr_var_t *topic;
! 68: mqtt_len_t *mid;
! 69: mqtt_subscr_t *t;
! 70: u_char *qos;
! 71:
! 72: if (!buf || !Topics)
! 73: return -1;
! 74: if (QOS > MQTT_QOS_EXACTLY) {
! 75: mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
! 76: return -1;
! 77: }
! 78: if (!msgID && QOS != MQTT_QOS_ONCE) {
! 79: mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
! 80: return -1;
! 81: }
! 82:
! 83: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
! 84: return -1;
! 85: else {
! 86: hdr = (struct mqtthdr *) (buf->msg_base + siz);
! 87: siz += sizeof(struct mqtthdr);
! 88: }
! 89:
! 90: /* variable header */
! 91: mid = (mqtt_len_t*) (buf->msg_base + siz);
! 92: mid->val = htons(msgID);
! 93: siz += sizeof(mqtt_len_t);
! 94:
! 95: /* payload with subscriptions */
! 96: for (t = Topics; t && t->sub_topic.msg_base; t++) {
! 97: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
! 98: topic->var_sb.val = htons(t->sub_topic.msg_len);
! 99: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
! 100: siz += MQTTHDR_VAR_SIZEOF(topic);
! 101: qos = (buf->msg_base + siz);
! 102: *qos = t->sub_ret;
! 103: siz++;
! 104: }
! 105:
! 106: /* fixed header */
! 107: MQTTHDR_MSGINIT(hdr);
! 108: hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
! 109: hdr->mqtt_msg.qos = QOS;
! 110: hdr->mqtt_msg.dup = Dup ? 1 : 0;
! 111: hdr->mqtt_msg.retain = 0;
! 112: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
! 113:
! 114: mqtt_msgRealloc(buf, siz);
! 115: return siz;
! 116: }
! 117:
! 118: /*
! 119: * mqtt_msgSUBACK() Create SUBACK message
! 120: *
! 121: * @buf = Message buffer
! 122: * @Topics = MQTT subscription topics
! 123: * @msgID = MessageID
! 124: * return: -1 error or >-1 message size for send
! 125: */
! 126: int
! 127: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
! 128: {
! 129: int siz = 0;
! 130: struct mqtthdr *hdr;
! 131: mqtt_len_t *v;
! 132: mqtt_subscr_t *t;
! 133: u_char *qos;
! 134:
! 135: if (!buf || !Topics)
! 136: return -1;
! 137:
! 138: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
! 139: return -1;
! 140: else {
! 141: hdr = (struct mqtthdr *) (buf->msg_base + siz);
! 142: siz += sizeof(struct mqtthdr);
! 143: v = (mqtt_len_t*) (buf->msg_base + siz);
! 144: siz += sizeof(mqtt_len_t);
! 145: }
! 146:
! 147: /* MessageID */
! 148: v->val = htons(msgID);
! 149:
! 150: /* QoS payload from subscriptions */
! 151: for (t = Topics; t && t->sub_topic.msg_base; t++) {
! 152: qos = (buf->msg_base + siz);
! 153: *qos = t->sub_ret;
! 154: siz++;
! 155: }
! 156:
! 157: /* fixed header */
! 158: MQTTHDR_MSGINIT(hdr);
! 159: hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
! 160: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
! 161:
! 162: mqtt_msgRealloc(buf, siz);
! 163: return siz;
! 164: }
! 165:
! 166: /*
! 167: * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
! 168: *
! 169: * @buf = Message buffer
! 170: * @Topics = MQTT subscription topics
! 171: * @msgID = MessageID
! 172: * @Dup = Duplicate message
! 173: * @QOS = QoS
! 174: * return: -1 error or >-1 message size for send
! 175: */
! 176: int
! 177: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
! 178: u_short msgID, u_char Dup, u_char QOS)
! 179: {
! 180: int siz = 0;
! 181: struct mqtthdr *hdr;
! 182: mqtthdr_var_t *topic;
! 183: mqtt_len_t *mid;
! 184: mqtt_subscr_t *t;
! 185:
! 186: if (!buf || !Topics)
! 187: return -1;
! 188: if (QOS > MQTT_QOS_EXACTLY) {
! 189: mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
! 190: return -1;
! 191: }
! 192: if (!msgID && QOS != MQTT_QOS_ONCE) {
! 193: mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
! 194: return -1;
! 195: }
! 196:
! 197: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
! 198: return -1;
! 199: else {
! 200: hdr = (struct mqtthdr *) (buf->msg_base + siz);
! 201: siz += sizeof(struct mqtthdr);
! 202: }
! 203:
! 204: /* variable header */
! 205: mid = (mqtt_len_t*) (buf->msg_base + siz);
! 206: mid->val = htons(msgID);
! 207: siz += sizeof(mqtt_len_t);
! 208:
! 209: /* payload with subscriptions */
! 210: for (t = Topics; t && t->sub_topic.msg_base; t++) {
! 211: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
! 212: topic->var_sb.val = htons(t->sub_topic.msg_len);
! 213: memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
! 214: siz += MQTTHDR_VAR_SIZEOF(topic);
! 215: }
! 216:
! 217: /* fixed header */
! 218: MQTTHDR_MSGINIT(hdr);
! 219: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
! 220: hdr->mqtt_msg.qos = QOS;
! 221: hdr->mqtt_msg.dup = Dup ? 1 : 0;
! 222: hdr->mqtt_msg.retain = 0;
! 223: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
! 224:
! 225: mqtt_msgRealloc(buf, siz);
! 226: return siz;
! 227: }
! 228:
! 229: /*
! 230: * mqtt_msgUNSUBACK() Create UNSUBACK message
! 231: *
! 232: * @buf = Message buffer
! 233: * @msgID = MessageID
! 234: * return: -1 error or >-1 message size for send
! 235: */
! 236: int
! 237: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
! 238: {
! 239: int siz = 0;
! 240: struct mqtthdr *hdr;
! 241: mqtt_len_t *v;
! 242:
! 243: if (!buf)
! 244: return -1;
! 245:
! 246: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
! 247: return -1;
! 248: else {
! 249: hdr = (struct mqtthdr *) (buf->msg_base + siz);
! 250: siz += sizeof(struct mqtthdr);
! 251: v = (mqtt_len_t*) (buf->msg_base + siz);
! 252: siz += sizeof(mqtt_len_t);
! 253: }
! 254:
! 255: /* fixed header */
! 256: MQTTHDR_MSGINIT(hdr);
! 257: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
! 258: *hdr->mqtt_len = sizeof(mqtt_len_t);
! 259:
! 260: /* MessageID */
! 261: v->val = htons(msgID);
! 262:
! 263: return siz;
! 264: }
! 265:
! 266:
! 267: /* ============= decode ============ */
! 268:
! 269: /*
! 270: * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
! 271: *
! 272: * @buf = Message buffer
! 273: * @msgID = MessageID
! 274: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
! 275: * return: NULL error or !=NULL MQTT fixed header
! 276: */
! 277: struct mqtthdr *
! 278: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
! 279: {
! 280: register int i;
! 281: int len, ret;
! 282: struct mqtthdr *hdr;
! 283: mqtthdr_var_t *var;
! 284: mqtt_subscr_t *subs;
! 285: mqtt_len_t *v;
! 286: caddr_t pos;
! 287:
! 288: if (!buf || !msgID || !subscr)
! 289: return NULL;
! 290:
! 291: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
! 292: if (!hdr)
! 293: return NULL;
! 294: pos = buf->msg_base + ret + 1;
! 295: v = (mqtt_len_t*) pos;
! 296:
! 297: /* MessageID */
! 298: len -= sizeof(mqtt_len_t);
! 299: if (len < 0) {
! 300: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 301: return NULL;
! 302: } else {
! 303: *msgID = ntohs(v->val);
! 304: pos += sizeof(mqtt_len_t);
! 305: }
! 306:
! 307: subs = mqtt_subAlloc(0);
! 308: if (!subs)
! 309: return NULL;
! 310: else
! 311: *subscr = subs;
! 312:
! 313: /* Subscribes */
! 314: for (i = 0; len > 0; i++) {
! 315: var = (mqtthdr_var_t*) pos;
! 316: len -= MQTTHDR_VAR_SIZEOF(var) + 1;
! 317: if (len < 0) {
! 318: mqtt_subFree(subscr);
! 319: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 320: return NULL;
! 321: }
! 322: subs = mqtt_subRealloc(subs, i + 1);
! 323: if (!subs) {
! 324: mqtt_subFree(subscr);
! 325: return NULL;
! 326: } else
! 327: *subscr = subs;
! 328:
! 329: memset(&subs[i], 0, sizeof subs[i]);
! 330: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
! 331: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
! 332: if (!subs[i].sub_topic.msg_base) {
! 333: LOGERR;
! 334: mqtt_subFree(subscr);
! 335: return NULL;
! 336: } else
! 337: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
! 338: pos += MQTTHDR_VAR_SIZEOF(var);
! 339:
! 340: subs[i].sub_ret = *pos;
! 341: pos++;
! 342: }
! 343:
! 344: return hdr;
! 345: }
! 346:
! 347: /*
! 348: * mqtt_readSUBACK() Read SUBACK message
! 349: *
! 350: * @buf = Message buffer
! 351: * @msgID = MessageID
! 352: * @subqos = Subscribes QoS, must be free after use with free()
! 353: * return: -1 error or >-1 readed subscribes QoS elements
! 354: */
! 355: int
! 356: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
! 357: {
! 358: int len, ret;
! 359: struct mqtthdr *hdr;
! 360: mqtt_len_t *v;
! 361: caddr_t pos;
! 362:
! 363: if (!buf || !msgID || !subqos)
! 364: return -1;
! 365:
! 366: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
! 367: if (!hdr)
! 368: return -1;
! 369: pos = buf->msg_base + ret + 1;
! 370: v = (mqtt_len_t*) pos;
! 371:
! 372: /* MessageID */
! 373: len -= sizeof(mqtt_len_t);
! 374: if (len < 0) {
! 375: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 376: return -1;
! 377: } else {
! 378: *msgID = ntohs(v->val);
! 379: pos += sizeof(mqtt_len_t);
! 380: }
! 381:
! 382: /* Subscribes */
! 383: *subqos = malloc(len);
! 384: if (!*subqos) {
! 385: LOGERR;
! 386: return -1;
! 387: } else
! 388: memcpy(*subqos, pos, len);
! 389:
! 390: return len;
! 391: }
! 392:
! 393: /*
! 394: * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
! 395: *
! 396: * @buf = Message buffer
! 397: * @msgID = MessageID
! 398: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
! 399: * return: NULL error or !=NULL MQTT fixed header
! 400: */
! 401: struct mqtthdr *
! 402: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
! 403: {
! 404: register int i;
! 405: int len, ret;
! 406: struct mqtthdr *hdr;
! 407: mqtthdr_var_t *var;
! 408: mqtt_subscr_t *subs;
! 409: mqtt_len_t *v;
! 410: caddr_t pos;
! 411:
! 412: if (!buf || !msgID || !subscr)
! 413: return NULL;
! 414:
! 415: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
! 416: if (!hdr)
! 417: return NULL;
! 418: pos = buf->msg_base + ret + 1;
! 419: v = (mqtt_len_t*) pos;
! 420:
! 421: /* MessageID */
! 422: len -= sizeof(mqtt_len_t);
! 423: if (len < 0) {
! 424: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 425: return NULL;
! 426: } else {
! 427: *msgID = ntohs(v->val);
! 428: pos += sizeof(mqtt_len_t);
! 429: }
! 430:
! 431: subs = mqtt_subAlloc(0);
! 432: if (!subs)
! 433: return NULL;
! 434: else
! 435: *subscr = subs;
! 436:
! 437: /* Subscribes */
! 438: for (i = 0; len > 0; i++) {
! 439: var = (mqtthdr_var_t*) pos;
! 440: len -= MQTTHDR_VAR_SIZEOF(var);
! 441: if (len < 0) {
! 442: mqtt_subFree(subscr);
! 443: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 444: return NULL;
! 445: }
! 446: subs = mqtt_subRealloc(subs, i + 1);
! 447: if (!subs) {
! 448: mqtt_subFree(subscr);
! 449: return NULL;
! 450: } else
! 451: *subscr = subs;
! 452:
! 453: memset(&subs[i], 0, sizeof subs[i]);
! 454: subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
! 455: subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
! 456: if (!subs[i].sub_topic.msg_base) {
! 457: LOGERR;
! 458: mqtt_subFree(subscr);
! 459: return NULL;
! 460: } else
! 461: memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
! 462: pos += MQTTHDR_VAR_SIZEOF(var);
! 463: }
! 464:
! 465: return hdr;
! 466: }
! 467:
! 468: /*
! 469: * mqtt_readUNSUBACK() Read UNSUBACK message
! 470: *
! 471: * @buf = Message buffer
! 472: * return: -1 error or MessageID
! 473: */
! 474: u_short
! 475: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
! 476: {
! 477: int len, ret;
! 478: struct mqtthdr *hdr;
! 479: mqtt_len_t *v;
! 480: caddr_t pos;
! 481:
! 482: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
! 483: if (!hdr)
! 484: return (u_short) -1;
! 485: if (len < sizeof(mqtt_len_t)) {
! 486: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 487: return (u_short) -1;
! 488: } else {
! 489: pos = buf->msg_base + ret + 1;
! 490: v = (mqtt_len_t*) pos;
! 491: }
! 492:
! 493: return ntohs(v->val);
! 494: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>