Annotation of mqtt/src/sub.c, revision 1.1.2.11
1.1.2.1 misho 1: #include "global.h"
2:
3:
4: /* ------------------------------------------------------------------- */
5:
1.1.2.5 misho 6: /*
7: * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
8: *
9: * @buf = Message buffer
10: * @Topics = MQTT subscription topics
11: * @msgID = MessageID
12: * @Dup = Duplicate message
13: * @QOS = QoS
14: * return: -1 error or >-1 message size for send
15: */
1.1.2.2 misho 16: int
1.1.2.5 misho 17: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
18: u_short msgID, u_char Dup, u_char QOS)
1.1.2.2 misho 19: {
1.1.2.3 misho 20: int siz = 0;
21: struct mqtthdr *hdr;
22: mqtthdr_var_t *topic;
23: mqtt_v_t *mid;
1.1.2.5 misho 24: mqtt_subscr_t *t;
25: u_char *qos;
1.1.2.3 misho 26:
1.1.2.5 misho 27: if (!buf || !Topics)
1.1.2.3 misho 28: return -1;
29: if (QOS > MQTT_QOS_EXACTLY) {
30: mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
31: return -1;
32: }
33: if (!msgID && QOS != MQTT_QOS_ONCE) {
34: mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
35: return -1;
36: }
37:
38: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
39: return -1;
40: else {
41: hdr = (struct mqtthdr *) (buf->msg_base + siz);
42: siz += sizeof(struct mqtthdr);
43: }
44:
45: /* variable header */
1.1.2.4 misho 46: mid = (mqtt_v_t*) (buf->msg_base + siz);
47: mid->val = htons(msgID);
48: siz += sizeof(mqtt_v_t);
49:
1.1.2.5 misho 50: /* payload with subscriptions */
1.1.2.8 misho 51: for (t = Topics; t && t->sub_topic._base; t++) {
1.1.2.5 misho 52: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
1.1.2.8 misho 53: topic->var_sb.val = htons(t->sub_topic._size);
54: memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
1.1.2.5 misho 55: siz += MQTTHDR_VAR_SIZEOF(topic);
56: qos = (buf->msg_base + siz);
1.1.2.8 misho 57: *qos = t->sub_ret;
1.1.2.5 misho 58: siz++;
59: }
1.1.2.3 misho 60:
61: /* fixed header */
1.1.2.7 misho 62: MQTTHDR_MSGINIT(hdr);
1.1.2.4 misho 63: hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
1.1.2.3 misho 64: hdr->mqtt_msg.qos = QOS;
65: hdr->mqtt_msg.dup = Dup ? 1 : 0;
1.1.2.4 misho 66: hdr->mqtt_msg.retain = 0;
1.1.2.3 misho 67: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
68:
69: mqtt_msgRealloc(buf, siz);
70: return siz;
1.1.2.2 misho 71: }
1.1.2.6 misho 72:
73: /*
74: * mqtt_msgSUBACK() Create SUBACK message
75: *
76: * @buf = Message buffer
77: * @Topics = MQTT subscription topics
78: * @msgID = MessageID
79: * return: -1 error or >-1 message size for send
80: */
81: int
82: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
83: {
84: int siz = 0;
85: struct mqtthdr *hdr;
86: mqtt_v_t *v;
87: mqtt_subscr_t *t;
88: u_char *qos;
89:
90: if (!buf || !Topics)
91: return -1;
92:
93: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
94: return -1;
95: else {
96: hdr = (struct mqtthdr *) (buf->msg_base + siz);
97: siz += sizeof(struct mqtthdr);
98: v = (mqtt_v_t*) (buf->msg_base + siz);
99: siz += sizeof(mqtt_v_t);
100: }
101:
102: /* MessageID */
103: v->val = htons(msgID);
104:
105: /* QoS payload from subscriptions */
1.1.2.8 misho 106: for (t = Topics; t && t->sub_topic._base; t++) {
1.1.2.6 misho 107: qos = (buf->msg_base + siz);
1.1.2.8 misho 108: *qos = t->sub_ret;
1.1.2.6 misho 109: siz++;
110: }
111:
112: /* fixed header */
1.1.2.7 misho 113: MQTTHDR_MSGINIT(hdr);
1.1.2.6 misho 114: hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
115: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
116:
117: mqtt_msgRealloc(buf, siz);
118: return siz;
119: }
1.1.2.7 misho 120:
121: /*
122: * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
123: *
124: * @buf = Message buffer
125: * @Topics = MQTT subscription topics
126: * @msgID = MessageID
127: * @Dup = Duplicate message
128: * @QOS = QoS
129: * return: -1 error or >-1 message size for send
130: */
131: int
132: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,
133: u_short msgID, u_char Dup, u_char QOS)
134: {
135: int siz = 0;
136: struct mqtthdr *hdr;
137: mqtthdr_var_t *topic;
138: mqtt_v_t *mid;
139: mqtt_subscr_t *t;
140:
141: if (!buf || !Topics)
142: return -1;
143: if (QOS > MQTT_QOS_EXACTLY) {
144: mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
145: return -1;
146: }
147: if (!msgID && QOS != MQTT_QOS_ONCE) {
148: mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
149: return -1;
150: }
151:
152: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
153: return -1;
154: else {
155: hdr = (struct mqtthdr *) (buf->msg_base + siz);
156: siz += sizeof(struct mqtthdr);
157: }
158:
159: /* variable header */
160: mid = (mqtt_v_t*) (buf->msg_base + siz);
161: mid->val = htons(msgID);
162: siz += sizeof(mqtt_v_t);
163:
164: /* payload with subscriptions */
1.1.2.8 misho 165: for (t = Topics; t && t->sub_topic._base; t++) {
1.1.2.7 misho 166: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
1.1.2.8 misho 167: topic->var_sb.val = htons(t->sub_topic._size);
168: memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
1.1.2.7 misho 169: siz += MQTTHDR_VAR_SIZEOF(topic);
170: }
171:
172: /* fixed header */
173: MQTTHDR_MSGINIT(hdr);
174: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
175: hdr->mqtt_msg.qos = QOS;
176: hdr->mqtt_msg.dup = Dup ? 1 : 0;
177: hdr->mqtt_msg.retain = 0;
178: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
179:
180: mqtt_msgRealloc(buf, siz);
181: return siz;
182: }
183:
184: /*
185: * mqtt_msgUNSUBACK() Create UNSUBACK message
186: *
187: * @buf = Message buffer
188: * @msgID = MessageID
189: * return: -1 error or >-1 message size for send
190: */
191: int
192: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
193: {
194: int siz = 0;
195: struct mqtthdr *hdr;
196: mqtt_v_t *v;
197:
198: if (!buf)
199: return -1;
200:
201: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_v_t)) == -1)
202: return -1;
203: else {
204: hdr = (struct mqtthdr *) (buf->msg_base + siz);
205: siz += sizeof(struct mqtthdr);
206: v = (mqtt_v_t*) (buf->msg_base + siz);
207: siz += sizeof(mqtt_v_t);
208: }
209:
210: /* fixed header */
211: MQTTHDR_MSGINIT(hdr);
212: hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
213: *hdr->mqtt_len = sizeof(mqtt_v_t);
214:
215: /* MessageID */
216: v->val = htons(msgID);
217:
218: return siz;
219: }
1.1.2.9 misho 220:
221:
222: /* ============= decode ============ */
223:
224: /*
1.1.2.10 misho 225: * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
226: *
227: * @buf = Message buffer
228: * @msgID = MessageID
229: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
230: * return: NULL error or !=NULL MQTT fixed header
231: */
232: struct mqtthdr *
233: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
234: {
235: register int i;
236: int len, ret;
237: struct mqtthdr *hdr;
238: mqtthdr_var_t *var;
239: mqtt_subscr_t *subs;
240: mqtt_v_t *v;
241: caddr_t pos;
242:
243: if (!buf || !msgID || !subscr)
244: return NULL;
245:
246: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
247: if (!hdr)
248: return NULL;
249: pos = buf->msg_base + ret + 1;
250: v = (mqtt_v_t*) pos;
251:
252: /* MessageID */
253: len -= sizeof(mqtt_v_t);
254: if (len < 0) {
255: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
256: return NULL;
257: } else {
258: *msgID = ntohs(v->val);
259: pos += sizeof(mqtt_v_t);
260: }
261:
262: subs = mqtt_subAlloc(0);
263: if (!subs)
264: return NULL;
265: else
266: *subscr = subs;
267:
268: /* Subscribes */
269: for (i = 0; len > 0; i++) {
270: var = (mqtthdr_var_t*) pos;
271: len -= MQTTHDR_VAR_SIZEOF(var) + 1;
272: if (len < 0) {
273: mqtt_subFree(subscr);
274: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
275: return NULL;
276: }
277: subs = mqtt_subRealloc(subs, i + 1);
278: if (!subs) {
279: mqtt_subFree(subscr);
280: return NULL;
281: } else
282: *subscr = subs;
283:
284: memset(&subs[i], 0, sizeof subs[i]);
285: subs[i].sub_topic._size = ntohs(var->var_sb.val);
286: subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
287: if (!subs[i].sub_topic._base) {
288: LOGERR;
289: mqtt_subFree(subscr);
290: return NULL;
291: } else
292: memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
293: pos += MQTTHDR_VAR_SIZEOF(var);
294:
295: subs[i].sub_ret = *pos;
296: pos++;
297: }
298:
299: return hdr;
300: }
301:
302: /*
1.1.2.11! misho 303: * mqtt_readSUBACK() Read SUBACK message
! 304: *
! 305: * @buf = Message buffer
! 306: * @msgID = MessageID
! 307: * @subqos = Subscribes QoS, must be free after use with free()
! 308: * return: -1 error or >-1 readed subscribes QoS elements
! 309: */
! 310: int
! 311: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
! 312: {
! 313: int len, ret;
! 314: struct mqtthdr *hdr;
! 315: mqtt_v_t *v;
! 316: caddr_t pos;
! 317:
! 318: if (!buf || !msgID || !subqos)
! 319: return -1;
! 320:
! 321: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
! 322: if (!hdr)
! 323: return -1;
! 324: pos = buf->msg_base + ret + 1;
! 325: v = (mqtt_v_t*) pos;
! 326:
! 327: /* MessageID */
! 328: len -= sizeof(mqtt_v_t);
! 329: if (len < 0) {
! 330: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 331: return -1;
! 332: } else {
! 333: *msgID = ntohs(v->val);
! 334: pos += sizeof(mqtt_v_t);
! 335: }
! 336:
! 337: /* Subscribes */
! 338: *subqos = malloc(len);
! 339: if (!*subqos) {
! 340: LOGERR;
! 341: return -1;
! 342: } else
! 343: memcpy(*subqos, pos, len);
! 344:
! 345: return len;
! 346: }
! 347:
! 348: /*
! 349: * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
! 350: *
! 351: * @buf = Message buffer
! 352: * @msgID = MessageID
! 353: * @subscr = Subscriptions, must be free after use with mqtt_subFree()
! 354: * return: NULL error or !=NULL MQTT fixed header
! 355: */
! 356: struct mqtthdr *
! 357: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
! 358: {
! 359: register int i;
! 360: int len, ret;
! 361: struct mqtthdr *hdr;
! 362: mqtthdr_var_t *var;
! 363: mqtt_subscr_t *subs;
! 364: mqtt_v_t *v;
! 365: caddr_t pos;
! 366:
! 367: if (!buf || !msgID || !subscr)
! 368: return NULL;
! 369:
! 370: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
! 371: if (!hdr)
! 372: return NULL;
! 373: pos = buf->msg_base + ret + 1;
! 374: v = (mqtt_v_t*) pos;
! 375:
! 376: /* MessageID */
! 377: len -= sizeof(mqtt_v_t);
! 378: if (len < 0) {
! 379: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 380: return NULL;
! 381: } else {
! 382: *msgID = ntohs(v->val);
! 383: pos += sizeof(mqtt_v_t);
! 384: }
! 385:
! 386: subs = mqtt_subAlloc(0);
! 387: if (!subs)
! 388: return NULL;
! 389: else
! 390: *subscr = subs;
! 391:
! 392: /* Subscribes */
! 393: for (i = 0; len > 0; i++) {
! 394: var = (mqtthdr_var_t*) pos;
! 395: len -= MQTTHDR_VAR_SIZEOF(var);
! 396: if (len < 0) {
! 397: mqtt_subFree(subscr);
! 398: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 399: return NULL;
! 400: }
! 401: subs = mqtt_subRealloc(subs, i + 1);
! 402: if (!subs) {
! 403: mqtt_subFree(subscr);
! 404: return NULL;
! 405: } else
! 406: *subscr = subs;
! 407:
! 408: memset(&subs[i], 0, sizeof subs[i]);
! 409: subs[i].sub_topic._size = ntohs(var->var_sb.val);
! 410: subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
! 411: if (!subs[i].sub_topic._base) {
! 412: LOGERR;
! 413: mqtt_subFree(subscr);
! 414: return NULL;
! 415: } else
! 416: memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
! 417: pos += MQTTHDR_VAR_SIZEOF(var);
! 418: }
! 419:
! 420: return hdr;
! 421: }
! 422:
! 423: /*
1.1.2.9 misho 424: * mqtt_readUNSUBACK() Read UNSUBACK message
425: *
426: * @buf = Message buffer
427: * return: -1 error or MessageID
428: */
429: u_short
430: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
431: {
432: int len, ret;
433: struct mqtthdr *hdr;
434: mqtt_v_t *v;
435: caddr_t pos;
436:
437: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
438: if (!hdr)
439: return (u_short) -1;
440: if (len < sizeof(mqtt_v_t)) {
441: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
442: return (u_short) -1;
443: } else {
444: pos = buf->msg_base + ret + 1;
445: v = (mqtt_v_t*) pos;
446: }
447:
448: return ntohs(v->val);
449: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>