Annotation of mqtt/src/sub.c, revision 1.2
1.2 ! misho 1: #include "global.h"
! 2:
! 3:
! 4: /* ------------------------------------------------------------------- */
! 5:
! 6: /*
! 7: * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
! 8: *
! 9: * @buf = Message buffer
! 10: * @Topics = MQTT subscription topics
! 11: * @msgID = MessageID
! 12: * @Dup = Duplicate message
! 13: * @QOS = QoS
! 14: * return: -1 error or >-1 message size for send
! 15: */
! 16: int
! 17: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
! 18: u_short msgID, u_char Dup, u_char QOS)
! 19: {
! 20: int siz = 0;
! 21: struct mqtthdr *hdr;
! 22: mqtthdr_var_t *topic;
! 23: mqtt_v_t *mid;
! 24: mqtt_subscr_t *t;
! 25: u_char *qos;
! 26:
! 27: if (!buf || !Topics)
! 28: return -1;
! 29: if (QOS > MQTT_QOS_EXACTLY) {
! 30: mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
! 31: return -1;
! 32: }
! 33: if (!msgID && QOS != MQTT_QOS_ONCE) {
! 34: mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
! 35: return -1;
! 36: }
! 37:
! 38: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
! 39: return -1;
! 40: else {
! 41: hdr = (struct mqtthdr *) (buf->msg_base + siz);
! 42: siz += sizeof(struct mqtthdr);
! 43: }
! 44:
! 45: /* variable header */
! 46: mid = (mqtt_v_t*) (buf->msg_base + siz);
! 47: mid->val = htons(msgID);
! 48: siz += sizeof(mqtt_v_t);
! 49:
! 50: /* payload with subscriptions */
! 51: for (t = Topics; t && t->sub_topic._base; t++) {
! 52: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
! 53: topic->var_sb.val = htons(t->sub_topic._size);
! 54: memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
! 55: siz += MQTTHDR_VAR_SIZEOF(topic);
! 56: qos = (buf->msg_base + siz);
! 57: *qos = t->sub_ret;
! 58: siz++;
! 59: }
! 60:
! 61: /* fixed header */
! 62: MQTTHDR_MSGINIT(hdr);
! 63: hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
! 64: hdr->mqtt_msg.qos = QOS;
! 65: hdr->mqtt_msg.dup = Dup ? 1 : 0;
! 66: hdr->mqtt_msg.retain = 0;
! 67: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
! 68:
! 69: mqtt_msgRealloc(buf, siz);
! 70: return siz;
! 71: }
! 72:
! 73: /*
! 74: * mqtt_msgSUBACK() Create SUBACK message
! 75: *
! 76: * @buf = Message buffer
! 77: * @Topics = MQTT subscription topics
! 78: * @msgID = MessageID
! 79: * return: -1 error or >-1 message size for send
! 80: */
! 81: int
! 82: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
! 83: {
! 84: int siz = 0;
! 85: struct mqtthdr *hdr;
! 86: mqtt_v_t *v;
! 87: mqtt_subscr_t *t;
! 88: u_char *qos;
! 89:
! 90: if (!buf || !Topics)
! 91: return -1;
! 92:
! 93: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
! 94: return -1;
! 95: else {
! 96: hdr = (struct mqtthdr *) (buf->msg_base + siz);
! 97: siz += sizeof(struct mqtthdr);
! 98: v = (mqtt_v_t*) (buf->msg_base + siz);
! 99: siz += sizeof(mqtt_v_t);
! 100: }
! 101:
! 102: /* MessageID */
! 103: v->val = htons(msgID);
! 104:
! 105: /* QoS payload from subscriptions */
! 106: for (t = Topics; t && t->sub_topic._base; t++) {
! 107: qos = (buf->msg_base + siz);
! 108: *qos = t->sub_ret;
! 109: siz++;
! 110: }
! 111:
! 112: /* fixed header */
! 113: MQTTHDR_MSGINIT(hdr);
! 114: hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
! 115: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
! 116:
! 117: mqtt_msgRealloc(buf, siz);
! 118: return siz;
! 119: }
! 120:
! 121: /*
! 122: * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
! 123: *
! 124: * @buf = Message buffer
! 125: * @Topics = MQTT subscription topics
! 126: * @msgID = MessageID
! 127: * @Dup = Duplicate message
! 128: * @QOS = QoS
! 129: * return: -1 error or >-1 message size for send
! 130: */
! 131: int
! 132: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
! 133: u_short msgID, u_char Dup, u_char QOS)
! 134: {
! 135: int siz = 0;
! 136: struct mqtthdr *hdr;
! 137: mqtthdr_var_t *topic;
! 138: mqtt_v_t *mid;
! 139: mqtt_subscr_t *t;
! 140:
! 141: if (!buf || !Topics)
! 142: return -1;
! 143: if (QOS > MQTT_QOS_EXACTLY) {
! 144: mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
! 145: return -1;
! 146: }
! 147: if (!msgID && QOS != MQTT_QOS_ONCE) {
! 148: mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
! 149: return -1;
! 150: }
! 151:
! 152: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
! 153: return -1;
! 154: else {
! 155: hdr = (struct mqtthdr *) (buf->msg_base + siz);
! 156: siz += sizeof(struct mqtthdr);
! 157: }
! 158:
! 159: /* variable header */
! 160: mid = (mqtt_v_t*) (buf->msg_base + siz);
! 161: mid->val = htons(msgID);
! 162: siz += sizeof(mqtt_v_t);
! 163:
! 164: /* payload with subscriptions */
! 165: for (t = Topics; t && t->sub_topic._base; t++) {
! 166: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
! 167: topic->var_sb.val = htons(t->sub_topic._size);
! 168: memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
! 169: siz += MQTTHDR_VAR_SIZEOF(topic);
! 170: }
! 171:
! 172: /* fixed header */
! 173: MQTTHDR_MSGINIT(hdr);
! 174: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
! 175: hdr->mqtt_msg.qos = QOS;
! 176: hdr->mqtt_msg.dup = Dup ? 1 : 0;
! 177: hdr->mqtt_msg.retain = 0;
! 178: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
! 179:
! 180: mqtt_msgRealloc(buf, siz);
! 181: return siz;
! 182: }
! 183:
! 184: /*
! 185: * mqtt_msgUNSUBACK() Create UNSUBACK message
! 186: *
! 187: * @buf = Message buffer
! 188: * @msgID = MessageID
! 189: * return: -1 error or >-1 message size for send
! 190: */
! 191: int
! 192: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
! 193: {
! 194: int siz = 0;
! 195: struct mqtthdr *hdr;
! 196: mqtt_v_t *v;
! 197:
! 198: if (!buf)
! 199: return -1;
! 200:
! 201: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_v_t)) == -1)
! 202: return -1;
! 203: else {
! 204: hdr = (struct mqtthdr *) (buf->msg_base + siz);
! 205: siz += sizeof(struct mqtthdr);
! 206: v = (mqtt_v_t*) (buf->msg_base + siz);
! 207: siz += sizeof(mqtt_v_t);
! 208: }
! 209:
! 210: /* fixed header */
! 211: MQTTHDR_MSGINIT(hdr);
! 212: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
! 213: *hdr->mqtt_len = sizeof(mqtt_v_t);
! 214:
! 215: /* MessageID */
! 216: v->val = htons(msgID);
! 217:
! 218: return siz;
! 219: }
! 220:
! 221:
! 222: /* ============= decode ============ */
! 223:
! 224: /*
! 225: * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
! 226: *
! 227: * @buf = Message buffer
! 228: * @msgID = MessageID
! 229: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
! 230: * return: NULL error or !=NULL MQTT fixed header
! 231: */
! 232: struct mqtthdr *
! 233: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
! 234: {
! 235: register int i;
! 236: int len, ret;
! 237: struct mqtthdr *hdr;
! 238: mqtthdr_var_t *var;
! 239: mqtt_subscr_t *subs;
! 240: mqtt_v_t *v;
! 241: caddr_t pos;
! 242:
! 243: if (!buf || !msgID || !subscr)
! 244: return NULL;
! 245:
! 246: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
! 247: if (!hdr)
! 248: return NULL;
! 249: pos = buf->msg_base + ret + 1;
! 250: v = (mqtt_v_t*) pos;
! 251:
! 252: /* MessageID */
! 253: len -= sizeof(mqtt_v_t);
! 254: if (len < 0) {
! 255: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 256: return NULL;
! 257: } else {
! 258: *msgID = ntohs(v->val);
! 259: pos += sizeof(mqtt_v_t);
! 260: }
! 261:
! 262: subs = mqtt_subAlloc(0);
! 263: if (!subs)
! 264: return NULL;
! 265: else
! 266: *subscr = subs;
! 267:
! 268: /* Subscribes */
! 269: for (i = 0; len > 0; i++) {
! 270: var = (mqtthdr_var_t*) pos;
! 271: len -= MQTTHDR_VAR_SIZEOF(var) + 1;
! 272: if (len < 0) {
! 273: mqtt_subFree(subscr);
! 274: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 275: return NULL;
! 276: }
! 277: subs = mqtt_subRealloc(subs, i + 1);
! 278: if (!subs) {
! 279: mqtt_subFree(subscr);
! 280: return NULL;
! 281: } else
! 282: *subscr = subs;
! 283:
! 284: memset(&subs[i], 0, sizeof subs[i]);
! 285: subs[i].sub_topic._size = ntohs(var->var_sb.val);
! 286: subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
! 287: if (!subs[i].sub_topic._base) {
! 288: LOGERR;
! 289: mqtt_subFree(subscr);
! 290: return NULL;
! 291: } else
! 292: memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
! 293: pos += MQTTHDR_VAR_SIZEOF(var);
! 294:
! 295: subs[i].sub_ret = *pos;
! 296: pos++;
! 297: }
! 298:
! 299: return hdr;
! 300: }
! 301:
! 302: /*
! 303: * mqtt_readSUBACK() Read SUBACK message
! 304: *
! 305: * @buf = Message buffer
! 306: * @msgID = MessageID
! 307: * @subqos = Subscribes QoS, must be free after use with free()
! 308: * return: -1 error or >-1 readed subscribes QoS elements
! 309: */
! 310: int
! 311: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
! 312: {
! 313: int len, ret;
! 314: struct mqtthdr *hdr;
! 315: mqtt_v_t *v;
! 316: caddr_t pos;
! 317:
! 318: if (!buf || !msgID || !subqos)
! 319: return -1;
! 320:
! 321: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
! 322: if (!hdr)
! 323: return -1;
! 324: pos = buf->msg_base + ret + 1;
! 325: v = (mqtt_v_t*) pos;
! 326:
! 327: /* MessageID */
! 328: len -= sizeof(mqtt_v_t);
! 329: if (len < 0) {
! 330: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 331: return -1;
! 332: } else {
! 333: *msgID = ntohs(v->val);
! 334: pos += sizeof(mqtt_v_t);
! 335: }
! 336:
! 337: /* Subscribes */
! 338: *subqos = malloc(len);
! 339: if (!*subqos) {
! 340: LOGERR;
! 341: return -1;
! 342: } else
! 343: memcpy(*subqos, pos, len);
! 344:
! 345: return len;
! 346: }
! 347:
! 348: /*
! 349: * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
! 350: *
! 351: * @buf = Message buffer
! 352: * @msgID = MessageID
! 353: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
! 354: * return: NULL error or !=NULL MQTT fixed header
! 355: */
! 356: struct mqtthdr *
! 357: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
! 358: {
! 359: register int i;
! 360: int len, ret;
! 361: struct mqtthdr *hdr;
! 362: mqtthdr_var_t *var;
! 363: mqtt_subscr_t *subs;
! 364: mqtt_v_t *v;
! 365: caddr_t pos;
! 366:
! 367: if (!buf || !msgID || !subscr)
! 368: return NULL;
! 369:
! 370: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
! 371: if (!hdr)
! 372: return NULL;
! 373: pos = buf->msg_base + ret + 1;
! 374: v = (mqtt_v_t*) pos;
! 375:
! 376: /* MessageID */
! 377: len -= sizeof(mqtt_v_t);
! 378: if (len < 0) {
! 379: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 380: return NULL;
! 381: } else {
! 382: *msgID = ntohs(v->val);
! 383: pos += sizeof(mqtt_v_t);
! 384: }
! 385:
! 386: subs = mqtt_subAlloc(0);
! 387: if (!subs)
! 388: return NULL;
! 389: else
! 390: *subscr = subs;
! 391:
! 392: /* Subscribes */
! 393: for (i = 0; len > 0; i++) {
! 394: var = (mqtthdr_var_t*) pos;
! 395: len -= MQTTHDR_VAR_SIZEOF(var);
! 396: if (len < 0) {
! 397: mqtt_subFree(subscr);
! 398: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 399: return NULL;
! 400: }
! 401: subs = mqtt_subRealloc(subs, i + 1);
! 402: if (!subs) {
! 403: mqtt_subFree(subscr);
! 404: return NULL;
! 405: } else
! 406: *subscr = subs;
! 407:
! 408: memset(&subs[i], 0, sizeof subs[i]);
! 409: subs[i].sub_topic._size = ntohs(var->var_sb.val);
! 410: subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
! 411: if (!subs[i].sub_topic._base) {
! 412: LOGERR;
! 413: mqtt_subFree(subscr);
! 414: return NULL;
! 415: } else
! 416: memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
! 417: pos += MQTTHDR_VAR_SIZEOF(var);
! 418: }
! 419:
! 420: return hdr;
! 421: }
! 422:
! 423: /*
! 424: * mqtt_readUNSUBACK() Read UNSUBACK message
! 425: *
! 426: * @buf = Message buffer
! 427: * return: -1 error or MessageID
! 428: */
! 429: u_short
! 430: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
! 431: {
! 432: int len, ret;
! 433: struct mqtthdr *hdr;
! 434: mqtt_v_t *v;
! 435: caddr_t pos;
! 436:
! 437: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
! 438: if (!hdr)
! 439: return (u_short) -1;
! 440: if (len < sizeof(mqtt_v_t)) {
! 441: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 442: return (u_short) -1;
! 443: } else {
! 444: pos = buf->msg_base + ret + 1;
! 445: v = (mqtt_v_t*) pos;
! 446: }
! 447:
! 448: return ntohs(v->val);
! 449: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>