Annotation of embedaddon/curl/lib/mqtt.c, revision 1.1.1.1
1.1 misho 1: /***************************************************************************
2: * _ _ ____ _
3: * Project ___| | | | _ \| |
4: * / __| | | | |_) | |
5: * | (__| |_| | _ <| |___
6: * \___|\___/|_| \_\_____|
7: *
8: * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
9: * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
10: *
11: * This software is licensed as described in the file COPYING, which
12: * you should have received as part of this distribution. The terms
13: * are also available at https://curl.haxx.se/docs/copyright.html.
14: *
15: * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16: * copies of the Software, and permit persons to whom the Software is
17: * furnished to do so, under the terms of the COPYING file.
18: *
19: * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20: * KIND, either express or implied.
21: *
22: ***************************************************************************/
23:
24: #include "curl_setup.h"
25:
26: #ifdef CURL_ENABLE_MQTT
27:
28: #include "urldata.h"
29: #include <curl/curl.h>
30: #include "transfer.h"
31: #include "sendf.h"
32: #include "progress.h"
33: #include "mqtt.h"
34: #include "select.h"
35: #include "strdup.h"
36: #include "url.h"
37: #include "escape.h"
38: #include "warnless.h"
39: #include "curl_printf.h"
40: #include "curl_memory.h"
41: #include "multiif.h"
42: #include "rand.h"
43:
44: /* The last #include file should be: */
45: #include "memdebug.h"
46:
47: #define MQTT_MSG_CONNECT 0x10
48: #define MQTT_MSG_CONNACK 0x20
49: #define MQTT_MSG_PUBLISH 0x30
50: #define MQTT_MSG_SUBSCRIBE 0x82
51: #define MQTT_MSG_SUBACK 0x90
52: #define MQTT_MSG_DISCONNECT 0xe0
53:
54: #define MQTT_CONNACK_LEN 2
55: #define MQTT_SUBACK_LEN 3
56: #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
57:
58: /*
59: * Forward declarations.
60: */
61:
62: static CURLcode mqtt_do(struct connectdata *conn, bool *done);
63: static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
64: static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
65: static CURLcode mqtt_setup_conn(struct connectdata *conn);
66:
67: /*
68: * MQTT protocol handler.
69: */
70:
71: const struct Curl_handler Curl_handler_mqtt = {
72: "MQTT", /* scheme */
73: mqtt_setup_conn, /* setup_connection */
74: mqtt_do, /* do_it */
75: ZERO_NULL, /* done */
76: ZERO_NULL, /* do_more */
77: ZERO_NULL, /* connect_it */
78: ZERO_NULL, /* connecting */
79: mqtt_doing, /* doing */
80: ZERO_NULL, /* proto_getsock */
81: mqtt_getsock, /* doing_getsock */
82: ZERO_NULL, /* domore_getsock */
83: ZERO_NULL, /* perform_getsock */
84: ZERO_NULL, /* disconnect */
85: ZERO_NULL, /* readwrite */
86: ZERO_NULL, /* connection_check */
87: PORT_MQTT, /* defport */
88: CURLPROTO_MQTT, /* protocol */
89: PROTOPT_NONE /* flags */
90: };
91:
92: static CURLcode mqtt_setup_conn(struct connectdata *conn)
93: {
94: /* allocate the HTTP-specific struct for the Curl_easy, only to survive
95: during this request */
96: struct MQTT *mq;
97: struct Curl_easy *data = conn->data;
98: DEBUGASSERT(data->req.protop == NULL);
99:
100: mq = calloc(1, sizeof(struct MQTT));
101: if(!mq)
102: return CURLE_OUT_OF_MEMORY;
103: data->req.protop = mq;
104: return CURLE_OK;
105: }
106:
107: static CURLcode mqtt_send(struct connectdata *conn,
108: char *buf, size_t len)
109: {
110: CURLcode result = CURLE_OK;
111: curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
112: struct Curl_easy *data = conn->data;
113: struct MQTT *mq = data->req.protop;
114: ssize_t n;
115: result = Curl_write(conn, sockfd, buf, len, &n);
116: if(!result && data->set.verbose)
117: Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
118: if(len != (size_t)n) {
119: size_t nsend = len - n;
120: char *sendleftovers = Curl_memdup(&buf[n], nsend);
121: if(!sendleftovers)
122: return CURLE_OUT_OF_MEMORY;
123: mq->sendleftovers = sendleftovers;
124: mq->nsend = nsend;
125: }
126: return result;
127: }
128:
129: /* Generic function called by the multi interface to figure out what socket(s)
130: to wait for and for what actions during the DOING and PROTOCONNECT
131: states */
132: static int mqtt_getsock(struct connectdata *conn,
133: curl_socket_t *sock)
134: {
135: sock[0] = conn->sock[FIRSTSOCKET];
136: return GETSOCK_READSOCK(FIRSTSOCKET);
137: }
138:
139: static CURLcode mqtt_connect(struct connectdata *conn)
140: {
141: CURLcode result = CURLE_OK;
142: const size_t client_id_offset = 14;
143: const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN;
144: char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
145: const size_t curl_len = strlen("curl");
146: char packet[32] = {
147: MQTT_MSG_CONNECT, /* packet type */
148: 0x00, /* remaining length */
149: 0x00, 0x04, /* protocol length */
150: 'M','Q','T','T', /* protocol name */
151: 0x04, /* protocol level */
152: 0x02, /* CONNECT flag: CleanSession */
153: 0x00, 0x3c, /* keep-alive 0 = disabled */
154: 0x00, 0x00 /* payload1 length */
155: };
156: packet[1] = (packetlen - 2) & 0x7f;
157: packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
158:
159: result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[curl_len],
160: MQTT_CLIENTID_LEN - curl_len + 1);
161: memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
162: infof(conn->data, "Using client id '%s'\n", client_id);
163: if(!result)
164: result = mqtt_send(conn, packet, packetlen);
165: return result;
166: }
167:
168: static CURLcode mqtt_disconnect(struct connectdata *conn)
169: {
170: CURLcode result = CURLE_OK;
171: result = mqtt_send(conn, (char *)"\xe0\x00", 2);
172: return result;
173: }
174:
175: static CURLcode mqtt_verify_connack(struct connectdata *conn)
176: {
177: CURLcode result;
178: curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
179: unsigned char readbuf[MQTT_CONNACK_LEN];
180: ssize_t nread;
181: struct Curl_easy *data = conn->data;
182:
183: result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
184: if(result)
185: goto fail;
186:
187: if(data->set.verbose)
188: Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
189:
190: /* fixme */
191: if(nread < MQTT_CONNACK_LEN) {
192: result = CURLE_WEIRD_SERVER_REPLY;
193: goto fail;
194: }
195:
196: /* verify CONNACK */
197: if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
198: failf(data, "Expected %02x%02x but got %02x%02x",
199: 0x00, 0x00, readbuf[0], readbuf[1]);
200: result = CURLE_WEIRD_SERVER_REPLY;
201: }
202:
203: fail:
204: return result;
205: }
206:
207: static CURLcode mqtt_get_topic(struct connectdata *conn,
208: char **topic, size_t *topiclen)
209: {
210: CURLcode result = CURLE_OK;
211: char *path = conn->data->state.up.path;
212:
213: if(strlen(path) > 1) {
214: result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen, FALSE);
215: }
216: else {
217: failf(conn->data, "Error: No topic specified.");
218: result = CURLE_URL_MALFORMAT;
219: }
220: return result;
221: }
222:
223:
224: static int mqtt_encode_len(char *buf, size_t len)
225: {
226: unsigned char encoded;
227: int i;
228:
229: for(i = 0; (len > 0) && (i<4); i++) {
230: encoded = len % 0x80;
231: len /= 0x80;
232: if(len)
233: encoded |= 0x80;
234: buf[i] = encoded;
235: }
236:
237: return i;
238: }
239:
240: static CURLcode mqtt_subscribe(struct connectdata *conn)
241: {
242: CURLcode result = CURLE_OK;
243: char *topic = NULL;
244: size_t topiclen;
245: unsigned char *packet = NULL;
246: size_t packetlen;
247: char encodedsize[4];
248: size_t n;
249:
250: result = mqtt_get_topic(conn, &topic, &topiclen);
251: if(result)
252: goto fail;
253:
254: conn->proto.mqtt.packetid++;
255:
256: packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
257: + 2 bytes topic length + QoS byte */
258: n = mqtt_encode_len((char *)encodedsize, packetlen);
259: packetlen += n + 1; /* add one for the control packet type byte */
260:
261: packet = malloc(packetlen);
262: if(!packet) {
263: result = CURLE_OUT_OF_MEMORY;
264: goto fail;
265: }
266:
267: packet[0] = MQTT_MSG_SUBSCRIBE;
268: memcpy(&packet[1], encodedsize, n);
269: packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
270: packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
271: packet[3 + n] = (topiclen >> 8) & 0xff;
272: packet[4 + n ] = topiclen & 0xff;
273: memcpy(&packet[5 + n], topic, topiclen);
274: packet[5 + n + topiclen] = 0; /* QoS zero */
275:
276: result = mqtt_send(conn, (char *)packet, packetlen);
277:
278: fail:
279: free(topic);
280: free(packet);
281: return result;
282: }
283:
284: /*
285: * Called when the first byte was already read.
286: */
287: static CURLcode mqtt_verify_suback(struct connectdata *conn)
288: {
289: CURLcode result;
290: curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
291: unsigned char readbuf[MQTT_SUBACK_LEN];
292: ssize_t nread;
293: struct mqtt_conn *mqtt = &conn->proto.mqtt;
294:
295: result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
296: if(result)
297: goto fail;
298:
299: if(conn->data->set.verbose)
300: Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
301:
302: /* fixme */
303: if(nread < MQTT_SUBACK_LEN) {
304: result = CURLE_WEIRD_SERVER_REPLY;
305: goto fail;
306: }
307:
308: /* verify SUBACK */
309: if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
310: readbuf[1] != (mqtt->packetid & 0xff) ||
311: readbuf[2] != 0x00)
312: result = CURLE_WEIRD_SERVER_REPLY;
313:
314: fail:
315: return result;
316: }
317:
318: static CURLcode mqtt_publish(struct connectdata *conn)
319: {
320: CURLcode result;
321: char *payload = conn->data->set.postfields;
322: size_t payloadlen = (size_t)conn->data->set.postfieldsize;
323: char *topic = NULL;
324: size_t topiclen;
325: unsigned char *pkt = NULL;
326: size_t i = 0;
327: size_t remaininglength;
328: size_t encodelen;
329: char encodedbytes[4];
330:
331: result = mqtt_get_topic(conn, &topic, &topiclen);
332: if(result)
333: goto fail;
334:
335: remaininglength = payloadlen + 2 + topiclen;
336: encodelen = mqtt_encode_len(encodedbytes, remaininglength);
337:
338: /* add the control byte and the encoded remaining length */
339: pkt = malloc(remaininglength + 1 + encodelen);
340: if(!pkt) {
341: result = CURLE_OUT_OF_MEMORY;
342: goto fail;
343: }
344:
345: /* assemble packet */
346: pkt[i++] = MQTT_MSG_PUBLISH;
347: memcpy(&pkt[i], encodedbytes, encodelen);
348: i += encodelen;
349: pkt[i++] = (topiclen >> 8) & 0xff;
350: pkt[i++] = (topiclen & 0xff);
351: memcpy(&pkt[i], topic, topiclen);
352: i += topiclen;
353: memcpy(&pkt[i], payload, payloadlen);
354: i += payloadlen;
355: result = mqtt_send(conn, (char *)pkt, i);
356:
357: fail:
358: free(pkt);
359: free(topic);
360: return result;
361: }
362:
363: static size_t mqtt_decode_len(unsigned char *buf,
364: size_t buflen, size_t *lenbytes)
365: {
366: size_t len = 0;
367: size_t mult = 1;
368: size_t i;
369: unsigned char encoded = 128;
370:
371: for(i = 0; (i < buflen) && (encoded & 128); i++) {
372: encoded = buf[i];
373: len += (encoded & 127) * mult;
374: mult *= 128;
375: }
376:
377: if(lenbytes)
378: *lenbytes = i;
379:
380: return len;
381: }
382:
383: #ifdef CURLDEBUG
384: static const char *statenames[]={
385: "MQTT_FIRST",
386: "MQTT_REMAINING_LENGTH",
387: "MQTT_CONNACK",
388: "MQTT_SUBACK",
389: "MQTT_SUBACK_COMING",
390: "MQTT_PUBWAIT",
391: "MQTT_PUB_REMAIN",
392:
393: "NOT A STATE"
394: };
395: #endif
396:
397: /* The only way to change state */
398: static void mqstate(struct connectdata *conn,
399: enum mqttstate state,
400: enum mqttstate nextstate) /* used if state == FIRST */
401: {
402: struct mqtt_conn *mqtt = &conn->proto.mqtt;
403: #ifdef CURLDEBUG
404: infof(conn->data, "%s (from %s) (next is %s)\n",
405: statenames[state],
406: statenames[mqtt->state],
407: (state == MQTT_FIRST)? statenames[nextstate] : "");
408: #endif
409: mqtt->state = state;
410: if(state == MQTT_FIRST)
411: mqtt->nextstate = nextstate;
412: }
413:
414:
415: /* for the publish packet */
416: #define MQTT_HEADER_LEN 5 /* max 5 bytes */
417:
418: static CURLcode mqtt_read_publish(struct connectdata *conn,
419: bool *done)
420: {
421: CURLcode result = CURLE_OK;
422: curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
423: ssize_t nread;
424: struct Curl_easy *data = conn->data;
425: unsigned char *pkt = (unsigned char *)data->state.buffer;
426: size_t remlen;
427: struct mqtt_conn *mqtt = &conn->proto.mqtt;
428: struct MQTT *mq = data->req.protop;
429: unsigned char packet;
430:
431: switch(mqtt->state) {
432: MQTT_SUBACK_COMING:
433: case MQTT_SUBACK_COMING:
434: result = mqtt_verify_suback(conn);
435: if(result)
436: break;
437:
438: mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
439: break;
440:
441: case MQTT_SUBACK:
442: case MQTT_PUBWAIT:
443: /* we are expecting PUBLISH or SUBACK */
444: packet = mq->firstbyte & 0xf0;
445: if(packet == MQTT_MSG_PUBLISH)
446: mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
447: else if(packet == MQTT_MSG_SUBACK) {
448: mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
449: goto MQTT_SUBACK_COMING;
450: }
451: else if(packet == MQTT_MSG_DISCONNECT) {
452: infof(data, "Got DISCONNECT\n");
453: *done = TRUE;
454: goto end;
455: }
456: else {
457: result = CURLE_WEIRD_SERVER_REPLY;
458: goto end;
459: }
460:
461: /* -- switched state -- */
462: remlen = mq->remaining_length;
463: infof(data, "Remaining length: %zd bytes\n", remlen);
464: Curl_pgrsSetDownloadSize(data, remlen);
465: data->req.bytecount = 0;
466: data->req.size = remlen;
467: mq->npacket = remlen; /* get this many bytes */
468: /* FALLTHROUGH */
469: case MQTT_PUB_REMAIN: {
470: /* read rest of packet, but no more. Cap to buffer size */
471: struct SingleRequest *k = &data->req;
472: size_t rest = mq->npacket;
473: if(rest > (size_t)data->set.buffer_size)
474: rest = (size_t)data->set.buffer_size;
475: result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
476: if(result) {
477: if(CURLE_AGAIN == result) {
478: infof(data, "EEEE AAAAGAIN\n");
479: }
480: goto end;
481: }
482: if(!nread) {
483: infof(data, "server disconnected\n");
484: result = CURLE_PARTIAL_FILE;
485: goto end;
486: }
487: if(data->set.verbose)
488: Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
489:
490: mq->npacket -= nread;
491: k->bytecount += nread;
492: Curl_pgrsSetDownloadCounter(data, k->bytecount);
493:
494: /* if QoS is set, message contains packet id */
495:
496: result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
497: if(result)
498: goto end;
499:
500: if(!mq->npacket)
501: /* no more PUBLISH payload, back to subscribe wait state */
502: mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
503: break;
504: }
505: default:
506: DEBUGASSERT(NULL); /* illegal state */
507: result = CURLE_WEIRD_SERVER_REPLY;
508: goto end;
509: }
510: end:
511: return result;
512: }
513:
514: static CURLcode mqtt_do(struct connectdata *conn, bool *done)
515: {
516: CURLcode result = CURLE_OK;
517: struct Curl_easy *data = conn->data;
518:
519: *done = FALSE; /* unconditionally */
520:
521: result = mqtt_connect(conn);
522: if(result) {
523: failf(data, "Error %d sending MQTT CONN request", result);
524: return result;
525: }
526: mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
527: return CURLE_OK;
528: }
529:
530: static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
531: {
532: CURLcode result = CURLE_OK;
533: struct mqtt_conn *mqtt = &conn->proto.mqtt;
534: struct Curl_easy *data = conn->data;
535: struct MQTT *mq = data->req.protop;
536: ssize_t nread;
537: curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
538: unsigned char *pkt = (unsigned char *)data->state.buffer;
539: unsigned char byte;
540:
541: *done = FALSE;
542:
543: if(mq->nsend) {
544: /* send the remainder of an outgoing packet */
545: char *ptr = mq->sendleftovers;
546: result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
547: free(ptr);
548: if(result)
549: return result;
550: }
551:
552: infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
553: switch(mqtt->state) {
554: case MQTT_FIRST:
555: /* Read the initial byte only */
556: result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
557: if(result)
558: break;
559: if(data->set.verbose)
560: Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
561: /* remember the first byte */
562: mq->npacket = 0;
563: mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
564: /* FALLTHROUGH */
565: case MQTT_REMAINING_LENGTH:
566: do {
567: result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
568: if(result)
569: break;
570: if(data->set.verbose)
571: Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
572: pkt[mq->npacket++] = byte;
573: } while((byte & 0x80) && (mq->npacket < 4));
574: if(result)
575: break;
576: mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
577: mq->npacket = 0;
578: if(mq->remaining_length) {
579: mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
580: break;
581: }
582: mqstate(conn, MQTT_FIRST, MQTT_FIRST);
583:
584: if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
585: infof(data, "Got DISCONNECT\n");
586: *done = TRUE;
587: }
588: break;
589: case MQTT_CONNACK:
590: result = mqtt_verify_connack(conn);
591: if(result)
592: break;
593:
594: if(conn->data->set.httpreq == HTTPREQ_POST) {
595: result = mqtt_publish(conn);
596: if(!result) {
597: result = mqtt_disconnect(conn);
598: *done = TRUE;
599: }
600: mqtt->nextstate = MQTT_FIRST;
601: }
602: else {
603: result = mqtt_subscribe(conn);
604: if(!result) {
605: mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
606: }
607: }
608: break;
609:
610: case MQTT_SUBACK:
611: case MQTT_PUBWAIT:
612: case MQTT_PUB_REMAIN:
613: result = mqtt_read_publish(conn, done);
614: break;
615:
616: default:
617: failf(conn->data, "State not handled yet");
618: *done = TRUE;
619: break;
620: }
621:
622: if(result == CURLE_AGAIN)
623: result = CURLE_OK;
624: return result;
625: }
626:
627: #endif /* CURL_ENABLE_MQTT */
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>