Annotation of mqtt/src/pub.c, revision 1.1.2.8
1.1.2.1 misho 1: #include "global.h"
2:
3:
4: /* ------------------------------------------------------------------- */
5:
1.1.2.2 misho 6: /*
7: * mqtt_msgPUBLISH() Create PUBLISH message
8: *
9: * @buf = Message buffer
10: * @csTopic = Publish topic
11: * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
12: * @Dup = Duplicate message
13: * @QOS = QoS
1.1.2.4 misho 14: * @Retain = Retain message
1.1.2.5 misho 15: * @pData = Publish data into topic
16: * @datlen = Publish data length
1.1.2.2 misho 17: * return: -1 error or >-1 message size for send
18: */
19: int
20: mqtt_msgPUBLISH(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID,
1.1.2.8 ! misho 21: u_char Dup, u_char QOS, u_char Retain, const void *pData, int datlen)
1.1.2.2 misho 22: {
23: int siz = 0;
24: struct mqtthdr *hdr;
25: mqtthdr_var_t *topic;
26: mqtt_v_t *mid;
1.1.2.5 misho 27: void *data;
1.1.2.2 misho 28:
29: if (!buf || !csTopic)
30: return -1;
31: if (QOS > MQTT_QOS_EXACTLY) {
32: mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
33: return -1;
34: }
35: if (!msgID && QOS != MQTT_QOS_ONCE) {
36: mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
37: return -1;
38: }
39:
40: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
41: return -1;
42: else {
43: hdr = (struct mqtthdr *) (buf->msg_base + siz);
44: siz += sizeof(struct mqtthdr);
45: }
46:
47: /* variable header */
48: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
49: topic->var_sb.val = htons(strlen(csTopic));
50: memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
51: siz += MQTTHDR_VAR_SIZEOF(topic);
52:
53: mid = (mqtt_v_t*) (buf->msg_base + siz);
54: mid->val = htons(msgID);
55: siz += sizeof(mqtt_v_t);
56:
1.1.2.5 misho 57: /* load with data */
58: if (pData && datlen) {
59: data = buf->msg_base + siz;
60: memcpy(data, pData, datlen);
61: siz += datlen;
62: }
63:
1.1.2.2 misho 64: /* fixed header */
1.1.2.6 misho 65: MQTTHDR_MSGINIT(hdr);
1.1.2.2 misho 66: hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
67: hdr->mqtt_msg.qos = QOS;
68: hdr->mqtt_msg.dup = Dup ? 1 : 0;
1.1.2.4 misho 69: hdr->mqtt_msg.retain = Retain ? 1 : 0;
1.1.2.2 misho 70: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
71:
72: mqtt_msgRealloc(buf, siz);
73: return siz;
74: }
75:
76: static int
77: _mqtt_msgPUB_(mqtt_msg_t * __restrict buf, u_char cmd, u_short msgID)
78: {
79: int siz = 0;
80: struct mqtthdr *hdr;
81: mqtt_v_t *v;
82:
83: if (!buf)
84: return -1;
85:
86: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_v_t)) == -1)
87: return -1;
88: else {
89: hdr = (struct mqtthdr *) (buf->msg_base + siz);
90: siz += sizeof(struct mqtthdr);
91: v = (mqtt_v_t*) (buf->msg_base + siz);
92: siz += sizeof(mqtt_v_t);
93: }
94:
95: /* fixed header */
1.1.2.6 misho 96: MQTTHDR_MSGINIT(hdr);
1.1.2.2 misho 97: hdr->mqtt_msg.type = cmd;
98: *hdr->mqtt_len = sizeof(mqtt_v_t);
99:
100: /* MessageID */
101: v->val = htons(msgID);
102:
103: return siz;
104: }
105:
106: /*
107: * mqtt_msgPUBACK() Create PUBACK message
108: *
109: * @buf = Message buffer
110: * @msgID = MessageID
111: * return: -1 error or >-1 message size for send
112: */
113: inline int
114: mqtt_msgPUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
115: {
116: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBACK, msgID);
117: }
118:
119: /*
120: * mqtt_msgPUBREC() Create PUBREC message
121: *
122: * @buf = Message buffer
123: * @msgID = MessageID
124: * return: -1 error or >-1 message size for send
125: */
126: inline int
127: mqtt_msgPUBREC(mqtt_msg_t * __restrict buf, u_short msgID)
128: {
129: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREC, msgID);
130: }
131:
132: /*
133: * mqtt_msgPUBREL() Create PUBREL message
134: *
135: * @buf = Message buffer
136: * @msgID = MessageID
137: * return: -1 error or >-1 message size for send
138: */
139: inline int
140: mqtt_msgPUBREL(mqtt_msg_t * __restrict buf, u_short msgID)
141: {
142: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREL, msgID);
143: }
144:
145: /*
146: * mqtt_msgPUBCOMP() Create PUBCOMP message
147: *
148: * @buf = Message buffer
149: * @msgID = MessageID
150: * return: -1 error or >-1 message size for send
151: */
152: inline int
153: mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short msgID)
154: {
155: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBCOMP, msgID);
156: }
1.1.2.7 misho 157:
158:
159: /* ============= decode ============ */
160:
161: /*
1.1.2.8 ! misho 162: * mqtt_readPUBLISH() Read PUBLISH message
! 163: *
! 164: * @buf = Message buffer
! 165: * @psTopic = Topic
! 166: * @topicLen = Topic length
! 167: * @msgID = MessageID
! 168: * @pData = Data buffer
! 169: * @datLen = Data buffer length, if *datLen == 0 allocate memory for pData
! 170: * return: NULL error or !=NULL MQTT fixed header
! 171: */
! 172: struct mqtthdr *
! 173: mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen,
! 174: u_short *msgID, void * __restrict pData, int *datLen)
! 175: {
! 176: int len, ret;
! 177: struct mqtthdr *hdr;
! 178: mqtthdr_var_t *var;
! 179: mqtt_v_t *v;
! 180: caddr_t pos;
! 181:
! 182: if (!buf || !psTopic || !msgID || !pData)
! 183: return NULL;
! 184:
! 185: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);
! 186: if (!hdr)
! 187: return NULL;
! 188: if (len < sizeof(mqtt_v_t)) {
! 189: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 190: return NULL;
! 191: } else {
! 192: pos = buf->msg_base + ret + 1;
! 193: var = (mqtthdr_var_t*) pos;
! 194: }
! 195:
! 196: /* topic */
! 197: len -= MQTTHDR_VAR_SIZEOF(var);
! 198: if (len < 0) {
! 199: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 200: return NULL;
! 201: } else {
! 202: memset(psTopic, 0, topicLen--);
! 203: memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ?
! 204: topicLen : ntohs(var->var_sb.val));
! 205: pos += MQTTHDR_VAR_SIZEOF(var);
! 206: v = (mqtt_v_t*) pos;
! 207: }
! 208:
! 209: len -= sizeof(mqtt_v_t);
! 210: if (len < 0) {
! 211: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 212: return NULL;
! 213: } else {
! 214: *msgID = ntohs(v->val);
! 215: pos += sizeof(mqtt_v_t);
! 216: }
! 217:
! 218: /* data */
! 219: if (len < 0) {
! 220: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
! 221: return NULL;
! 222: } else {
! 223: if (!*datLen) {
! 224: if (!(pData = malloc(len))) {
! 225: LOGERR;
! 226: return NULL;
! 227: } else
! 228: *datLen = len;
! 229: }
! 230:
! 231: memset(pData, 0, *datLen);
! 232: if (len < *datLen)
! 233: *datLen = len;
! 234: memcpy(pData, pos, *datLen);
! 235: }
! 236:
! 237: return hdr;
! 238: }
! 239:
! 240: /*
1.1.2.7 misho 241: * mqtt_readPUBACK() Read PUBACK message
242: *
243: * @buf = Message buffer
244: * return: -1 error or MessageID
245: */
246: u_short
247: mqtt_readPUBACK(mqtt_msg_t * __restrict buf)
248: {
249: int len, ret;
250: struct mqtthdr *hdr;
251: mqtt_v_t *v;
252: caddr_t pos;
253:
254: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &ret, &len);
255: if (!hdr)
256: return (u_short) -1;
257: if (len < sizeof(mqtt_v_t)) {
258: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
259: return (u_short) -1;
260: } else {
261: pos = buf->msg_base + ret + 1;
262: v = (mqtt_v_t*) pos;
263: }
264:
265: return ntohs(v->val);
266: }
267:
268: /*
269: * mqtt_readPUBREC() Read PUBREC message
270: *
271: * @buf = Message buffer
272: * return: -1 error or MessageID
273: */
274: u_short
275: mqtt_readPUBREC(mqtt_msg_t * __restrict buf)
276: {
277: int len, ret;
278: struct mqtthdr *hdr;
279: mqtt_v_t *v;
280: caddr_t pos;
281:
282: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &ret, &len);
283: if (!hdr)
284: return (u_short) -1;
285: if (len < sizeof(mqtt_v_t)) {
286: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
287: return (u_short) -1;
288: } else {
289: pos = buf->msg_base + ret + 1;
290: v = (mqtt_v_t*) pos;
291: }
292:
293: return ntohs(v->val);
294: }
295:
296: /*
297: * mqtt_readPUBREL() Read PUBREL message
298: *
299: * @buf = Message buffer
300: * return: -1 error or MessageID
301: */
302: u_short
303: mqtt_readPUBREL(mqtt_msg_t * __restrict buf)
304: {
305: int len, ret;
306: struct mqtthdr *hdr;
307: mqtt_v_t *v;
308: caddr_t pos;
309:
310: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &ret, &len);
311: if (!hdr)
312: return (u_short) -1;
313: if (len < sizeof(mqtt_v_t)) {
314: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
315: return (u_short) -1;
316: } else {
317: pos = buf->msg_base + ret + 1;
318: v = (mqtt_v_t*) pos;
319: }
320:
321: return ntohs(v->val);
322: }
323:
324: /*
325: * mqtt_readPUBCOMP() Read PUBCOMP message
326: *
327: * @buf = Message buffer
328: * return: -1 error or MessageID
329: */
330: u_short
331: mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
332: {
333: int len, ret;
334: struct mqtthdr *hdr;
335: mqtt_v_t *v;
336: caddr_t pos;
337:
338: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &ret, &len);
339: if (!hdr)
340: return (u_short) -1;
341: if (len < sizeof(mqtt_v_t)) {
342: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
343: return (u_short) -1;
344: } else {
345: pos = buf->msg_base + ret + 1;
346: v = (mqtt_v_t*) pos;
347: }
348:
349: return ntohs(v->val);
350: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>