Annotation of mqtt/src/pub.c, revision 1.1.2.9
1.1.2.1 misho 1: #include "global.h"
2:
3:
4: /* ------------------------------------------------------------------- */
5:
1.1.2.2 misho 6: /*
7: * mqtt_msgPUBLISH() Create PUBLISH message
8: *
9: * @buf = Message buffer
10: * @csTopic = Publish topic
11: * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
12: * @Dup = Duplicate message
13: * @QOS = QoS
1.1.2.4 misho 14: * @Retain = Retain message
1.1.2.5 misho 15: * @pData = Publish data into topic
16: * @datlen = Publish data length
1.1.2.2 misho 17: * return: -1 error or >-1 message size for send
18: */
19: int
20: mqtt_msgPUBLISH(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID,
1.1.2.8 misho 21: u_char Dup, u_char QOS, u_char Retain, const void *pData, int datlen)
1.1.2.2 misho 22: {
23: int siz = 0;
24: struct mqtthdr *hdr;
25: mqtthdr_var_t *topic;
26: mqtt_v_t *mid;
1.1.2.5 misho 27: void *data;
1.1.2.2 misho 28:
29: if (!buf || !csTopic)
30: return -1;
31: if (QOS > MQTT_QOS_EXACTLY) {
32: mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
33: return -1;
34: }
35: if (!msgID && QOS != MQTT_QOS_ONCE) {
36: mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
37: return -1;
38: }
39:
40: if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
41: return -1;
42: else {
43: hdr = (struct mqtthdr *) (buf->msg_base + siz);
44: siz += sizeof(struct mqtthdr);
45: }
46:
47: /* variable header */
48: topic = (mqtthdr_var_t*) (buf->msg_base + siz);
49: topic->var_sb.val = htons(strlen(csTopic));
50: memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
51: siz += MQTTHDR_VAR_SIZEOF(topic);
52:
53: mid = (mqtt_v_t*) (buf->msg_base + siz);
54: mid->val = htons(msgID);
55: siz += sizeof(mqtt_v_t);
56:
1.1.2.5 misho 57: /* load with data */
58: if (pData && datlen) {
59: data = buf->msg_base + siz;
60: memcpy(data, pData, datlen);
61: siz += datlen;
62: }
63:
1.1.2.2 misho 64: /* fixed header */
1.1.2.6 misho 65: MQTTHDR_MSGINIT(hdr);
1.1.2.2 misho 66: hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
67: hdr->mqtt_msg.qos = QOS;
68: hdr->mqtt_msg.dup = Dup ? 1 : 0;
1.1.2.4 misho 69: hdr->mqtt_msg.retain = Retain ? 1 : 0;
1.1.2.2 misho 70: *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
71:
72: mqtt_msgRealloc(buf, siz);
73: return siz;
74: }
75:
76: static int
77: _mqtt_msgPUB_(mqtt_msg_t * __restrict buf, u_char cmd, u_short msgID)
78: {
79: int siz = 0;
80: struct mqtthdr *hdr;
81: mqtt_v_t *v;
82:
83: if (!buf)
84: return -1;
85:
86: if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_v_t)) == -1)
87: return -1;
88: else {
89: hdr = (struct mqtthdr *) (buf->msg_base + siz);
90: siz += sizeof(struct mqtthdr);
91: v = (mqtt_v_t*) (buf->msg_base + siz);
92: siz += sizeof(mqtt_v_t);
93: }
94:
95: /* fixed header */
1.1.2.6 misho 96: MQTTHDR_MSGINIT(hdr);
1.1.2.2 misho 97: hdr->mqtt_msg.type = cmd;
98: *hdr->mqtt_len = sizeof(mqtt_v_t);
99:
100: /* MessageID */
101: v->val = htons(msgID);
102:
103: return siz;
104: }
105:
106: /*
107: * mqtt_msgPUBACK() Create PUBACK message
108: *
109: * @buf = Message buffer
110: * @msgID = MessageID
111: * return: -1 error or >-1 message size for send
112: */
113: inline int
114: mqtt_msgPUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
115: {
116: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBACK, msgID);
117: }
118:
119: /*
120: * mqtt_msgPUBREC() Create PUBREC message
121: *
122: * @buf = Message buffer
123: * @msgID = MessageID
124: * return: -1 error or >-1 message size for send
125: */
126: inline int
127: mqtt_msgPUBREC(mqtt_msg_t * __restrict buf, u_short msgID)
128: {
129: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREC, msgID);
130: }
131:
132: /*
133: * mqtt_msgPUBREL() Create PUBREL message
134: *
135: * @buf = Message buffer
136: * @msgID = MessageID
137: * return: -1 error or >-1 message size for send
138: */
139: inline int
140: mqtt_msgPUBREL(mqtt_msg_t * __restrict buf, u_short msgID)
141: {
142: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREL, msgID);
143: }
144:
145: /*
146: * mqtt_msgPUBCOMP() Create PUBCOMP message
147: *
148: * @buf = Message buffer
149: * @msgID = MessageID
150: * return: -1 error or >-1 message size for send
151: */
152: inline int
153: mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short msgID)
154: {
155: return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBCOMP, msgID);
156: }
1.1.2.7 misho 157:
158:
159: /* ============= decode ============ */
160:
161: /*
1.1.2.8 misho 162: * mqtt_readPUBLISH() Read PUBLISH message
163: *
164: * @buf = Message buffer
165: * @psTopic = Topic
166: * @topicLen = Topic length
167: * @msgID = MessageID
168: * @pData = Data buffer
169: * @datLen = Data buffer length, if *datLen == 0 allocate memory for pData
170: * return: NULL error or !=NULL MQTT fixed header
171: */
172: struct mqtthdr *
173: mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen,
174: u_short *msgID, void * __restrict pData, int *datLen)
175: {
176: int len, ret;
177: struct mqtthdr *hdr;
178: mqtthdr_var_t *var;
179: mqtt_v_t *v;
180: caddr_t pos;
181:
182: if (!buf || !psTopic || !msgID || !pData)
183: return NULL;
184:
185: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);
186: if (!hdr)
187: return NULL;
1.1.2.9 ! misho 188: pos = buf->msg_base + ret + 1;
! 189: var = (mqtthdr_var_t*) pos;
1.1.2.8 misho 190:
191: /* topic */
192: len -= MQTTHDR_VAR_SIZEOF(var);
193: if (len < 0) {
194: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
195: return NULL;
196: } else {
197: memset(psTopic, 0, topicLen--);
198: memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ?
199: topicLen : ntohs(var->var_sb.val));
200: pos += MQTTHDR_VAR_SIZEOF(var);
201: v = (mqtt_v_t*) pos;
202: }
203:
204: len -= sizeof(mqtt_v_t);
205: if (len < 0) {
206: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
207: return NULL;
208: } else {
209: *msgID = ntohs(v->val);
210: pos += sizeof(mqtt_v_t);
211: }
212:
213: /* data */
214: if (len < 0) {
215: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
216: return NULL;
217: } else {
218: if (!*datLen) {
219: if (!(pData = malloc(len))) {
220: LOGERR;
221: return NULL;
222: } else
223: *datLen = len;
224: }
225:
226: memset(pData, 0, *datLen);
227: if (len < *datLen)
228: *datLen = len;
229: memcpy(pData, pos, *datLen);
230: }
231:
232: return hdr;
233: }
234:
235: /*
1.1.2.7 misho 236: * mqtt_readPUBACK() Read PUBACK message
237: *
238: * @buf = Message buffer
239: * return: -1 error or MessageID
240: */
241: u_short
242: mqtt_readPUBACK(mqtt_msg_t * __restrict buf)
243: {
244: int len, ret;
245: struct mqtthdr *hdr;
246: mqtt_v_t *v;
247: caddr_t pos;
248:
249: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &ret, &len);
250: if (!hdr)
251: return (u_short) -1;
252: if (len < sizeof(mqtt_v_t)) {
253: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
254: return (u_short) -1;
255: } else {
256: pos = buf->msg_base + ret + 1;
257: v = (mqtt_v_t*) pos;
258: }
259:
260: return ntohs(v->val);
261: }
262:
263: /*
264: * mqtt_readPUBREC() Read PUBREC message
265: *
266: * @buf = Message buffer
267: * return: -1 error or MessageID
268: */
269: u_short
270: mqtt_readPUBREC(mqtt_msg_t * __restrict buf)
271: {
272: int len, ret;
273: struct mqtthdr *hdr;
274: mqtt_v_t *v;
275: caddr_t pos;
276:
277: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &ret, &len);
278: if (!hdr)
279: return (u_short) -1;
280: if (len < sizeof(mqtt_v_t)) {
281: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
282: return (u_short) -1;
283: } else {
284: pos = buf->msg_base + ret + 1;
285: v = (mqtt_v_t*) pos;
286: }
287:
288: return ntohs(v->val);
289: }
290:
291: /*
292: * mqtt_readPUBREL() Read PUBREL message
293: *
294: * @buf = Message buffer
295: * return: -1 error or MessageID
296: */
297: u_short
298: mqtt_readPUBREL(mqtt_msg_t * __restrict buf)
299: {
300: int len, ret;
301: struct mqtthdr *hdr;
302: mqtt_v_t *v;
303: caddr_t pos;
304:
305: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &ret, &len);
306: if (!hdr)
307: return (u_short) -1;
308: if (len < sizeof(mqtt_v_t)) {
309: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
310: return (u_short) -1;
311: } else {
312: pos = buf->msg_base + ret + 1;
313: v = (mqtt_v_t*) pos;
314: }
315:
316: return ntohs(v->val);
317: }
318:
319: /*
320: * mqtt_readPUBCOMP() Read PUBCOMP message
321: *
322: * @buf = Message buffer
323: * return: -1 error or MessageID
324: */
325: u_short
326: mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
327: {
328: int len, ret;
329: struct mqtthdr *hdr;
330: mqtt_v_t *v;
331: caddr_t pos;
332:
333: hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &ret, &len);
334: if (!hdr)
335: return (u_short) -1;
336: if (len < sizeof(mqtt_v_t)) {
337: mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
338: return (u_short) -1;
339: } else {
340: pos = buf->msg_base + ret + 1;
341: v = (mqtt_v_t*) pos;
342: }
343:
344: return ntohs(v->val);
345: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>