Annotation of embedaddon/curl/lib/mqtt.c, revision 1.1
1.1 ! misho 1: /***************************************************************************
! 2: * _ _ ____ _
! 3: * Project ___| | | | _ \| |
! 4: * / __| | | | |_) | |
! 5: * | (__| |_| | _ <| |___
! 6: * \___|\___/|_| \_\_____|
! 7: *
! 8: * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
! 9: * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
! 10: *
! 11: * This software is licensed as described in the file COPYING, which
! 12: * you should have received as part of this distribution. The terms
! 13: * are also available at https://curl.haxx.se/docs/copyright.html.
! 14: *
! 15: * You may opt to use, copy, modify, merge, publish, distribute and/or sell
! 16: * copies of the Software, and permit persons to whom the Software is
! 17: * furnished to do so, under the terms of the COPYING file.
! 18: *
! 19: * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
! 20: * KIND, either express or implied.
! 21: *
! 22: ***************************************************************************/
! 23:
! 24: #include "curl_setup.h"
! 25:
! 26: #ifdef CURL_ENABLE_MQTT
! 27:
! 28: #include "urldata.h"
! 29: #include <curl/curl.h>
! 30: #include "transfer.h"
! 31: #include "sendf.h"
! 32: #include "progress.h"
! 33: #include "mqtt.h"
! 34: #include "select.h"
! 35: #include "strdup.h"
! 36: #include "url.h"
! 37: #include "escape.h"
! 38: #include "warnless.h"
! 39: #include "curl_printf.h"
! 40: #include "curl_memory.h"
! 41: #include "multiif.h"
! 42: #include "rand.h"
! 43:
! 44: /* The last #include file should be: */
! 45: #include "memdebug.h"
! 46:
/* Control packet type bytes, as written to or compared against the first
   byte of a packet in this file */
#define MQTT_MSG_CONNECT     0x10
#define MQTT_MSG_CONNACK     0x20
#define MQTT_MSG_PUBLISH     0x30
#define MQTT_MSG_SUBSCRIBE   0x82
#define MQTT_MSG_SUBACK      0x90
#define MQTT_MSG_DISCONNECT  0xe0

/* Number of bytes read when verifying a CONNACK and a SUBACK respectively */
#define MQTT_CONNACK_LEN     2
#define MQTT_SUBACK_LEN      3

/* Length of the generated client id, without a terminating zero */
#define MQTT_CLIENTID_LEN    12 /* "curl0123abcd" */
! 57:
! 58: /*
! 59: * Forward declarations.
! 60: */
! 61:
! 62: static CURLcode mqtt_do(struct connectdata *conn, bool *done);
! 63: static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
! 64: static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
! 65: static CURLcode mqtt_setup_conn(struct connectdata *conn);
! 66:
! 67: /*
! 68: * MQTT protocol handler.
! 69: */
! 70:
! 71: const struct Curl_handler Curl_handler_mqtt = {
! 72: "MQTT", /* scheme */
! 73: mqtt_setup_conn, /* setup_connection */
! 74: mqtt_do, /* do_it */
! 75: ZERO_NULL, /* done */
! 76: ZERO_NULL, /* do_more */
! 77: ZERO_NULL, /* connect_it */
! 78: ZERO_NULL, /* connecting */
! 79: mqtt_doing, /* doing */
! 80: ZERO_NULL, /* proto_getsock */
! 81: mqtt_getsock, /* doing_getsock */
! 82: ZERO_NULL, /* domore_getsock */
! 83: ZERO_NULL, /* perform_getsock */
! 84: ZERO_NULL, /* disconnect */
! 85: ZERO_NULL, /* readwrite */
! 86: ZERO_NULL, /* connection_check */
! 87: PORT_MQTT, /* defport */
! 88: CURLPROTO_MQTT, /* protocol */
! 89: PROTOPT_NONE /* flags */
! 90: };
! 91:
! 92: static CURLcode mqtt_setup_conn(struct connectdata *conn)
! 93: {
! 94: /* allocate the HTTP-specific struct for the Curl_easy, only to survive
! 95: during this request */
! 96: struct MQTT *mq;
! 97: struct Curl_easy *data = conn->data;
! 98: DEBUGASSERT(data->req.protop == NULL);
! 99:
! 100: mq = calloc(1, sizeof(struct MQTT));
! 101: if(!mq)
! 102: return CURLE_OUT_OF_MEMORY;
! 103: data->req.protop = mq;
! 104: return CURLE_OK;
! 105: }
! 106:
! 107: static CURLcode mqtt_send(struct connectdata *conn,
! 108: char *buf, size_t len)
! 109: {
! 110: CURLcode result = CURLE_OK;
! 111: curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
! 112: struct Curl_easy *data = conn->data;
! 113: struct MQTT *mq = data->req.protop;
! 114: ssize_t n;
! 115: result = Curl_write(conn, sockfd, buf, len, &n);
! 116: if(!result && data->set.verbose)
! 117: Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
! 118: if(len != (size_t)n) {
! 119: size_t nsend = len - n;
! 120: char *sendleftovers = Curl_memdup(&buf[n], nsend);
! 121: if(!sendleftovers)
! 122: return CURLE_OUT_OF_MEMORY;
! 123: mq->sendleftovers = sendleftovers;
! 124: mq->nsend = nsend;
! 125: }
! 126: return result;
! 127: }
! 128:
! 129: /* Generic function called by the multi interface to figure out what socket(s)
! 130: to wait for and for what actions during the DOING and PROTOCONNECT
! 131: states */
! 132: static int mqtt_getsock(struct connectdata *conn,
! 133: curl_socket_t *sock)
! 134: {
! 135: sock[0] = conn->sock[FIRSTSOCKET];
! 136: return GETSOCK_READSOCK(FIRSTSOCKET);
! 137: }
! 138:
! 139: static CURLcode mqtt_connect(struct connectdata *conn)
! 140: {
! 141: CURLcode result = CURLE_OK;
! 142: const size_t client_id_offset = 14;
! 143: const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN;
! 144: char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
! 145: const size_t curl_len = strlen("curl");
! 146: char packet[32] = {
! 147: MQTT_MSG_CONNECT, /* packet type */
! 148: 0x00, /* remaining length */
! 149: 0x00, 0x04, /* protocol length */
! 150: 'M','Q','T','T', /* protocol name */
! 151: 0x04, /* protocol level */
! 152: 0x02, /* CONNECT flag: CleanSession */
! 153: 0x00, 0x3c, /* keep-alive 0 = disabled */
! 154: 0x00, 0x00 /* payload1 length */
! 155: };
! 156: packet[1] = (packetlen - 2) & 0x7f;
! 157: packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
! 158:
! 159: result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[curl_len],
! 160: MQTT_CLIENTID_LEN - curl_len + 1);
! 161: memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
! 162: infof(conn->data, "Using client id '%s'\n", client_id);
! 163: if(!result)
! 164: result = mqtt_send(conn, packet, packetlen);
! 165: return result;
! 166: }
! 167:
! 168: static CURLcode mqtt_disconnect(struct connectdata *conn)
! 169: {
! 170: CURLcode result = CURLE_OK;
! 171: result = mqtt_send(conn, (char *)"\xe0\x00", 2);
! 172: return result;
! 173: }
! 174:
! 175: static CURLcode mqtt_verify_connack(struct connectdata *conn)
! 176: {
! 177: CURLcode result;
! 178: curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
! 179: unsigned char readbuf[MQTT_CONNACK_LEN];
! 180: ssize_t nread;
! 181: struct Curl_easy *data = conn->data;
! 182:
! 183: result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
! 184: if(result)
! 185: goto fail;
! 186:
! 187: if(data->set.verbose)
! 188: Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
! 189:
! 190: /* fixme */
! 191: if(nread < MQTT_CONNACK_LEN) {
! 192: result = CURLE_WEIRD_SERVER_REPLY;
! 193: goto fail;
! 194: }
! 195:
! 196: /* verify CONNACK */
! 197: if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
! 198: failf(data, "Expected %02x%02x but got %02x%02x",
! 199: 0x00, 0x00, readbuf[0], readbuf[1]);
! 200: result = CURLE_WEIRD_SERVER_REPLY;
! 201: }
! 202:
! 203: fail:
! 204: return result;
! 205: }
! 206:
! 207: static CURLcode mqtt_get_topic(struct connectdata *conn,
! 208: char **topic, size_t *topiclen)
! 209: {
! 210: CURLcode result = CURLE_OK;
! 211: char *path = conn->data->state.up.path;
! 212:
! 213: if(strlen(path) > 1) {
! 214: result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen, FALSE);
! 215: }
! 216: else {
! 217: failf(conn->data, "Error: No topic specified.");
! 218: result = CURLE_URL_MALFORMAT;
! 219: }
! 220: return result;
! 221: }
! 222:
! 223:
/*
 * mqtt_encode_len() stores 'len' in 'buf' as a sequence of bytes that each
 * carry seven bits of the value, least significant group first, with bit
 * 0x80 set on every byte that is followed by another one.
 *
 * At most four bytes are written, so 'buf' must have room for four. A length
 * of zero is encoded as the single byte 0x00.
 *
 * Returns the number of bytes written (1 to 4).
 *
 * NOTE(review): a 'len' that needs more than four bytes is cut off after the
 * fourth byte, which then still has its continuation bit set; callers must
 * not pass such values.
 */
static int mqtt_encode_len(char *buf, size_t len)
{
  int i = 0;

  do {
    unsigned char encoded = (unsigned char)(len % 0x80);
    len /= 0x80;
    if(len)
      encoded |= 0x80; /* more bytes follow */
    buf[i++] = (char)encoded;
  } while((len > 0) && (i < 4));

  return i;
}
! 239:
! 240: static CURLcode mqtt_subscribe(struct connectdata *conn)
! 241: {
! 242: CURLcode result = CURLE_OK;
! 243: char *topic = NULL;
! 244: size_t topiclen;
! 245: unsigned char *packet = NULL;
! 246: size_t packetlen;
! 247: char encodedsize[4];
! 248: size_t n;
! 249:
! 250: result = mqtt_get_topic(conn, &topic, &topiclen);
! 251: if(result)
! 252: goto fail;
! 253:
! 254: conn->proto.mqtt.packetid++;
! 255:
! 256: packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
! 257: + 2 bytes topic length + QoS byte */
! 258: n = mqtt_encode_len((char *)encodedsize, packetlen);
! 259: packetlen += n + 1; /* add one for the control packet type byte */
! 260:
! 261: packet = malloc(packetlen);
! 262: if(!packet) {
! 263: result = CURLE_OUT_OF_MEMORY;
! 264: goto fail;
! 265: }
! 266:
! 267: packet[0] = MQTT_MSG_SUBSCRIBE;
! 268: memcpy(&packet[1], encodedsize, n);
! 269: packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
! 270: packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
! 271: packet[3 + n] = (topiclen >> 8) & 0xff;
! 272: packet[4 + n ] = topiclen & 0xff;
! 273: memcpy(&packet[5 + n], topic, topiclen);
! 274: packet[5 + n + topiclen] = 0; /* QoS zero */
! 275:
! 276: result = mqtt_send(conn, (char *)packet, packetlen);
! 277:
! 278: fail:
! 279: free(topic);
! 280: free(packet);
! 281: return result;
! 282: }
! 283:
! 284: /*
! 285: * Called when the first byte was already read.
! 286: */
! 287: static CURLcode mqtt_verify_suback(struct connectdata *conn)
! 288: {
! 289: CURLcode result;
! 290: curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
! 291: unsigned char readbuf[MQTT_SUBACK_LEN];
! 292: ssize_t nread;
! 293: struct mqtt_conn *mqtt = &conn->proto.mqtt;
! 294:
! 295: result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
! 296: if(result)
! 297: goto fail;
! 298:
! 299: if(conn->data->set.verbose)
! 300: Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
! 301:
! 302: /* fixme */
! 303: if(nread < MQTT_SUBACK_LEN) {
! 304: result = CURLE_WEIRD_SERVER_REPLY;
! 305: goto fail;
! 306: }
! 307:
! 308: /* verify SUBACK */
! 309: if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
! 310: readbuf[1] != (mqtt->packetid & 0xff) ||
! 311: readbuf[2] != 0x00)
! 312: result = CURLE_WEIRD_SERVER_REPLY;
! 313:
! 314: fail:
! 315: return result;
! 316: }
! 317:
! 318: static CURLcode mqtt_publish(struct connectdata *conn)
! 319: {
! 320: CURLcode result;
! 321: char *payload = conn->data->set.postfields;
! 322: size_t payloadlen = (size_t)conn->data->set.postfieldsize;
! 323: char *topic = NULL;
! 324: size_t topiclen;
! 325: unsigned char *pkt = NULL;
! 326: size_t i = 0;
! 327: size_t remaininglength;
! 328: size_t encodelen;
! 329: char encodedbytes[4];
! 330:
! 331: result = mqtt_get_topic(conn, &topic, &topiclen);
! 332: if(result)
! 333: goto fail;
! 334:
! 335: remaininglength = payloadlen + 2 + topiclen;
! 336: encodelen = mqtt_encode_len(encodedbytes, remaininglength);
! 337:
! 338: /* add the control byte and the encoded remaining length */
! 339: pkt = malloc(remaininglength + 1 + encodelen);
! 340: if(!pkt) {
! 341: result = CURLE_OUT_OF_MEMORY;
! 342: goto fail;
! 343: }
! 344:
! 345: /* assemble packet */
! 346: pkt[i++] = MQTT_MSG_PUBLISH;
! 347: memcpy(&pkt[i], encodedbytes, encodelen);
! 348: i += encodelen;
! 349: pkt[i++] = (topiclen >> 8) & 0xff;
! 350: pkt[i++] = (topiclen & 0xff);
! 351: memcpy(&pkt[i], topic, topiclen);
! 352: i += topiclen;
! 353: memcpy(&pkt[i], payload, payloadlen);
! 354: i += payloadlen;
! 355: result = mqtt_send(conn, (char *)pkt, i);
! 356:
! 357: fail:
! 358: free(pkt);
! 359: free(topic);
! 360: return result;
! 361: }
! 362:
/*
 * mqtt_decode_len() decodes a length stored as bytes that each carry seven
 * bits of the value, least significant group first, where bit 0x80 marks
 * that another byte follows.
 *
 * Decoding stops after the first byte without the 0x80 bit or after
 * 'buflen' bytes, whichever comes first. 'buf' is only read. If 'lenbytes'
 * is non-NULL it receives the number of bytes consumed.
 *
 * Returns the decoded length; zero when 'buflen' is zero.
 */
static size_t mqtt_decode_len(const unsigned char *buf,
                              size_t buflen, size_t *lenbytes)
{
  size_t len = 0;
  size_t mult = 1;
  size_t i = 0;

  while(i < buflen) {
    unsigned char encoded = buf[i++];
    len += (size_t)(encoded & 127) * mult;
    mult *= 128;
    if(!(encoded & 128))
      break; /* last byte of the encoded length */
  }

  if(lenbytes)
    *lenbytes = i;

  return len;
}
! 382:
! 383: #ifdef CURLDEBUG
! 384: static const char *statenames[]={
! 385: "MQTT_FIRST",
! 386: "MQTT_REMAINING_LENGTH",
! 387: "MQTT_CONNACK",
! 388: "MQTT_SUBACK",
! 389: "MQTT_SUBACK_COMING",
! 390: "MQTT_PUBWAIT",
! 391: "MQTT_PUB_REMAIN",
! 392:
! 393: "NOT A STATE"
! 394: };
! 395: #endif
! 396:
! 397: /* The only way to change state */
! 398: static void mqstate(struct connectdata *conn,
! 399: enum mqttstate state,
! 400: enum mqttstate nextstate) /* used if state == FIRST */
! 401: {
! 402: struct mqtt_conn *mqtt = &conn->proto.mqtt;
! 403: #ifdef CURLDEBUG
! 404: infof(conn->data, "%s (from %s) (next is %s)\n",
! 405: statenames[state],
! 406: statenames[mqtt->state],
! 407: (state == MQTT_FIRST)? statenames[nextstate] : "");
! 408: #endif
! 409: mqtt->state = state;
! 410: if(state == MQTT_FIRST)
! 411: mqtt->nextstate = nextstate;
! 412: }
! 413:
! 414:
! 415: /* for the publish packet */
! 416: #define MQTT_HEADER_LEN 5 /* max 5 bytes */
! 417:
! 418: static CURLcode mqtt_read_publish(struct connectdata *conn,
! 419: bool *done)
! 420: {
! 421: CURLcode result = CURLE_OK;
! 422: curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
! 423: ssize_t nread;
! 424: struct Curl_easy *data = conn->data;
! 425: unsigned char *pkt = (unsigned char *)data->state.buffer;
! 426: size_t remlen;
! 427: struct mqtt_conn *mqtt = &conn->proto.mqtt;
! 428: struct MQTT *mq = data->req.protop;
! 429: unsigned char packet;
! 430:
! 431: switch(mqtt->state) {
! 432: MQTT_SUBACK_COMING:
! 433: case MQTT_SUBACK_COMING:
! 434: result = mqtt_verify_suback(conn);
! 435: if(result)
! 436: break;
! 437:
! 438: mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
! 439: break;
! 440:
! 441: case MQTT_SUBACK:
! 442: case MQTT_PUBWAIT:
! 443: /* we are expecting PUBLISH or SUBACK */
! 444: packet = mq->firstbyte & 0xf0;
! 445: if(packet == MQTT_MSG_PUBLISH)
! 446: mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
! 447: else if(packet == MQTT_MSG_SUBACK) {
! 448: mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
! 449: goto MQTT_SUBACK_COMING;
! 450: }
! 451: else if(packet == MQTT_MSG_DISCONNECT) {
! 452: infof(data, "Got DISCONNECT\n");
! 453: *done = TRUE;
! 454: goto end;
! 455: }
! 456: else {
! 457: result = CURLE_WEIRD_SERVER_REPLY;
! 458: goto end;
! 459: }
! 460:
! 461: /* -- switched state -- */
! 462: remlen = mq->remaining_length;
! 463: infof(data, "Remaining length: %zd bytes\n", remlen);
! 464: Curl_pgrsSetDownloadSize(data, remlen);
! 465: data->req.bytecount = 0;
! 466: data->req.size = remlen;
! 467: mq->npacket = remlen; /* get this many bytes */
! 468: /* FALLTHROUGH */
! 469: case MQTT_PUB_REMAIN: {
! 470: /* read rest of packet, but no more. Cap to buffer size */
! 471: struct SingleRequest *k = &data->req;
! 472: size_t rest = mq->npacket;
! 473: if(rest > (size_t)data->set.buffer_size)
! 474: rest = (size_t)data->set.buffer_size;
! 475: result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
! 476: if(result) {
! 477: if(CURLE_AGAIN == result) {
! 478: infof(data, "EEEE AAAAGAIN\n");
! 479: }
! 480: goto end;
! 481: }
! 482: if(!nread) {
! 483: infof(data, "server disconnected\n");
! 484: result = CURLE_PARTIAL_FILE;
! 485: goto end;
! 486: }
! 487: if(data->set.verbose)
! 488: Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
! 489:
! 490: mq->npacket -= nread;
! 491: k->bytecount += nread;
! 492: Curl_pgrsSetDownloadCounter(data, k->bytecount);
! 493:
! 494: /* if QoS is set, message contains packet id */
! 495:
! 496: result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
! 497: if(result)
! 498: goto end;
! 499:
! 500: if(!mq->npacket)
! 501: /* no more PUBLISH payload, back to subscribe wait state */
! 502: mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
! 503: break;
! 504: }
! 505: default:
! 506: DEBUGASSERT(NULL); /* illegal state */
! 507: result = CURLE_WEIRD_SERVER_REPLY;
! 508: goto end;
! 509: }
! 510: end:
! 511: return result;
! 512: }
! 513:
! 514: static CURLcode mqtt_do(struct connectdata *conn, bool *done)
! 515: {
! 516: CURLcode result = CURLE_OK;
! 517: struct Curl_easy *data = conn->data;
! 518:
! 519: *done = FALSE; /* unconditionally */
! 520:
! 521: result = mqtt_connect(conn);
! 522: if(result) {
! 523: failf(data, "Error %d sending MQTT CONN request", result);
! 524: return result;
! 525: }
! 526: mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
! 527: return CURLE_OK;
! 528: }
! 529:
! 530: static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
! 531: {
! 532: CURLcode result = CURLE_OK;
! 533: struct mqtt_conn *mqtt = &conn->proto.mqtt;
! 534: struct Curl_easy *data = conn->data;
! 535: struct MQTT *mq = data->req.protop;
! 536: ssize_t nread;
! 537: curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
! 538: unsigned char *pkt = (unsigned char *)data->state.buffer;
! 539: unsigned char byte;
! 540:
! 541: *done = FALSE;
! 542:
! 543: if(mq->nsend) {
! 544: /* send the remainder of an outgoing packet */
! 545: char *ptr = mq->sendleftovers;
! 546: result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
! 547: free(ptr);
! 548: if(result)
! 549: return result;
! 550: }
! 551:
! 552: infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
! 553: switch(mqtt->state) {
! 554: case MQTT_FIRST:
! 555: /* Read the initial byte only */
! 556: result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
! 557: if(result)
! 558: break;
! 559: if(data->set.verbose)
! 560: Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
! 561: /* remember the first byte */
! 562: mq->npacket = 0;
! 563: mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
! 564: /* FALLTHROUGH */
! 565: case MQTT_REMAINING_LENGTH:
! 566: do {
! 567: result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
! 568: if(result)
! 569: break;
! 570: if(data->set.verbose)
! 571: Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
! 572: pkt[mq->npacket++] = byte;
! 573: } while((byte & 0x80) && (mq->npacket < 4));
! 574: if(result)
! 575: break;
! 576: mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
! 577: mq->npacket = 0;
! 578: if(mq->remaining_length) {
! 579: mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
! 580: break;
! 581: }
! 582: mqstate(conn, MQTT_FIRST, MQTT_FIRST);
! 583:
! 584: if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
! 585: infof(data, "Got DISCONNECT\n");
! 586: *done = TRUE;
! 587: }
! 588: break;
! 589: case MQTT_CONNACK:
! 590: result = mqtt_verify_connack(conn);
! 591: if(result)
! 592: break;
! 593:
! 594: if(conn->data->set.httpreq == HTTPREQ_POST) {
! 595: result = mqtt_publish(conn);
! 596: if(!result) {
! 597: result = mqtt_disconnect(conn);
! 598: *done = TRUE;
! 599: }
! 600: mqtt->nextstate = MQTT_FIRST;
! 601: }
! 602: else {
! 603: result = mqtt_subscribe(conn);
! 604: if(!result) {
! 605: mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
! 606: }
! 607: }
! 608: break;
! 609:
! 610: case MQTT_SUBACK:
! 611: case MQTT_PUBWAIT:
! 612: case MQTT_PUB_REMAIN:
! 613: result = mqtt_read_publish(conn, done);
! 614: break;
! 615:
! 616: default:
! 617: failf(conn->data, "State not handled yet");
! 618: *done = TRUE;
! 619: break;
! 620: }
! 621:
! 622: if(result == CURLE_AGAIN)
! 623: result = CURLE_OK;
! 624: return result;
! 625: }
! 626:
! 627: #endif /* CURL_ENABLE_MQTT */
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>