File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / curl / lib / mqtt.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Wed Jun 3 10:01:15 2020 UTC (4 years, 1 month ago) by misho
Branches: curl, MAIN
CVS tags: v7_70_0p4, HEAD
curl

    1: /***************************************************************************
    2:  *                                  _   _ ____  _
    3:  *  Project                     ___| | | |  _ \| |
    4:  *                             / __| | | | |_) | |
    5:  *                            | (__| |_| |  _ <| |___
    6:  *                             \___|\___/|_| \_\_____|
    7:  *
    8:  * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
    9:  * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
   10:  *
   11:  * This software is licensed as described in the file COPYING, which
   12:  * you should have received as part of this distribution. The terms
   13:  * are also available at https://curl.haxx.se/docs/copyright.html.
   14:  *
   15:  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
   16:  * copies of the Software, and permit persons to whom the Software is
   17:  * furnished to do so, under the terms of the COPYING file.
   18:  *
   19:  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
   20:  * KIND, either express or implied.
   21:  *
   22:  ***************************************************************************/
   23: 
   24: #include "curl_setup.h"
   25: 
   26: #ifdef CURL_ENABLE_MQTT
   27: 
   28: #include "urldata.h"
   29: #include <curl/curl.h>
   30: #include "transfer.h"
   31: #include "sendf.h"
   32: #include "progress.h"
   33: #include "mqtt.h"
   34: #include "select.h"
   35: #include "strdup.h"
   36: #include "url.h"
   37: #include "escape.h"
   38: #include "warnless.h"
   39: #include "curl_printf.h"
   40: #include "curl_memory.h"
   41: #include "multiif.h"
   42: #include "rand.h"
   43: 
   44: /* The last #include file should be: */
   45: #include "memdebug.h"
   46: 
   47: #define MQTT_MSG_CONNECT   0x10
   48: #define MQTT_MSG_CONNACK   0x20
   49: #define MQTT_MSG_PUBLISH   0x30
   50: #define MQTT_MSG_SUBSCRIBE 0x82
   51: #define MQTT_MSG_SUBACK    0x90
   52: #define MQTT_MSG_DISCONNECT 0xe0
   53: 
   54: #define MQTT_CONNACK_LEN 2
   55: #define MQTT_SUBACK_LEN 3
   56: #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
   57: 
   58: /*
   59:  * Forward declarations.
   60:  */
   61: 
   62: static CURLcode mqtt_do(struct connectdata *conn, bool *done);
   63: static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
   64: static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
   65: static CURLcode mqtt_setup_conn(struct connectdata *conn);
   66: 
   67: /*
   68:  * MQTT protocol handler.
   69:  */
   70: 
   71: const struct Curl_handler Curl_handler_mqtt = {
   72:   "MQTT",                             /* scheme */
   73:   mqtt_setup_conn,                    /* setup_connection */
   74:   mqtt_do,                            /* do_it */
   75:   ZERO_NULL,                          /* done */
   76:   ZERO_NULL,                          /* do_more */
   77:   ZERO_NULL,                          /* connect_it */
   78:   ZERO_NULL,                          /* connecting */
   79:   mqtt_doing,                         /* doing */
   80:   ZERO_NULL,                          /* proto_getsock */
   81:   mqtt_getsock,                       /* doing_getsock */
   82:   ZERO_NULL,                          /* domore_getsock */
   83:   ZERO_NULL,                          /* perform_getsock */
   84:   ZERO_NULL,                          /* disconnect */
   85:   ZERO_NULL,                          /* readwrite */
   86:   ZERO_NULL,                          /* connection_check */
   87:   PORT_MQTT,                          /* defport */
   88:   CURLPROTO_MQTT,                     /* protocol */
   89:   PROTOPT_NONE                        /* flags */
   90: };
   91: 
   92: static CURLcode mqtt_setup_conn(struct connectdata *conn)
   93: {
   94:   /* allocate the HTTP-specific struct for the Curl_easy, only to survive
   95:      during this request */
   96:   struct MQTT *mq;
   97:   struct Curl_easy *data = conn->data;
   98:   DEBUGASSERT(data->req.protop == NULL);
   99: 
  100:   mq = calloc(1, sizeof(struct MQTT));
  101:   if(!mq)
  102:     return CURLE_OUT_OF_MEMORY;
  103:   data->req.protop = mq;
  104:   return CURLE_OK;
  105: }
  106: 
  107: static CURLcode mqtt_send(struct connectdata *conn,
  108:                           char *buf, size_t len)
  109: {
  110:   CURLcode result = CURLE_OK;
  111:   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
  112:   struct Curl_easy *data = conn->data;
  113:   struct MQTT *mq = data->req.protop;
  114:   ssize_t n;
  115:   result = Curl_write(conn, sockfd, buf, len, &n);
  116:   if(!result && data->set.verbose)
  117:     Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
  118:   if(len != (size_t)n) {
  119:     size_t nsend = len - n;
  120:     char *sendleftovers = Curl_memdup(&buf[n], nsend);
  121:     if(!sendleftovers)
  122:       return CURLE_OUT_OF_MEMORY;
  123:     mq->sendleftovers = sendleftovers;
  124:     mq->nsend = nsend;
  125:   }
  126:   return result;
  127: }
  128: 
  129: /* Generic function called by the multi interface to figure out what socket(s)
  130:    to wait for and for what actions during the DOING and PROTOCONNECT
  131:    states */
  132: static int mqtt_getsock(struct connectdata *conn,
  133:                         curl_socket_t *sock)
  134: {
  135:   sock[0] = conn->sock[FIRSTSOCKET];
  136:   return GETSOCK_READSOCK(FIRSTSOCKET);
  137: }
  138: 
  139: static CURLcode mqtt_connect(struct connectdata *conn)
  140: {
  141:   CURLcode result = CURLE_OK;
  142:   const size_t client_id_offset = 14;
  143:   const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN;
  144:   char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
  145:   const size_t curl_len = strlen("curl");
  146:   char packet[32] = {
  147:     MQTT_MSG_CONNECT,  /* packet type */
  148:     0x00,              /* remaining length */
  149:     0x00, 0x04,        /* protocol length */
  150:     'M','Q','T','T',   /* protocol name */
  151:     0x04,              /* protocol level */
  152:     0x02,              /* CONNECT flag: CleanSession */
  153:     0x00, 0x3c,        /* keep-alive 0 = disabled */
  154:     0x00, 0x00         /* payload1 length */
  155:   };
  156:   packet[1] = (packetlen - 2) & 0x7f;
  157:   packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
  158: 
  159:   result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[curl_len],
  160:                          MQTT_CLIENTID_LEN - curl_len + 1);
  161:   memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
  162:   infof(conn->data, "Using client id '%s'\n", client_id);
  163:   if(!result)
  164:     result = mqtt_send(conn, packet, packetlen);
  165:   return result;
  166: }
  167: 
  168: static CURLcode mqtt_disconnect(struct connectdata *conn)
  169: {
  170:   CURLcode result = CURLE_OK;
  171:   result = mqtt_send(conn, (char *)"\xe0\x00", 2);
  172:   return result;
  173: }
  174: 
  175: static CURLcode mqtt_verify_connack(struct connectdata *conn)
  176: {
  177:   CURLcode result;
  178:   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
  179:   unsigned char readbuf[MQTT_CONNACK_LEN];
  180:   ssize_t nread;
  181:   struct Curl_easy *data = conn->data;
  182: 
  183:   result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
  184:   if(result)
  185:     goto fail;
  186: 
  187:   if(data->set.verbose)
  188:     Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
  189: 
  190:   /* fixme */
  191:   if(nread < MQTT_CONNACK_LEN) {
  192:     result = CURLE_WEIRD_SERVER_REPLY;
  193:     goto fail;
  194:   }
  195: 
  196:   /* verify CONNACK */
  197:   if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
  198:     failf(data, "Expected %02x%02x but got %02x%02x",
  199:           0x00, 0x00, readbuf[0], readbuf[1]);
  200:     result = CURLE_WEIRD_SERVER_REPLY;
  201:   }
  202: 
  203: fail:
  204:   return result;
  205: }
  206: 
  207: static CURLcode mqtt_get_topic(struct connectdata *conn,
  208:                                char **topic, size_t *topiclen)
  209: {
  210:   CURLcode result = CURLE_OK;
  211:   char *path = conn->data->state.up.path;
  212: 
  213:   if(strlen(path) > 1) {
  214:     result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen, FALSE);
  215:   }
  216:   else {
  217:     failf(conn->data, "Error: No topic specified.");
  218:     result = CURLE_URL_MALFORMAT;
  219:   }
  220:   return result;
  221: }
  222: 
  223: 
  224: static int mqtt_encode_len(char *buf, size_t len)
  225: {
  226:   unsigned char encoded;
  227:   int i;
  228: 
  229:   for(i = 0; (len > 0) && (i<4); i++) {
  230:     encoded = len % 0x80;
  231:     len /= 0x80;
  232:     if(len)
  233:       encoded |= 0x80;
  234:     buf[i] = encoded;
  235:   }
  236: 
  237:   return i;
  238: }
  239: 
  240: static CURLcode mqtt_subscribe(struct connectdata *conn)
  241: {
  242:   CURLcode result = CURLE_OK;
  243:   char *topic = NULL;
  244:   size_t topiclen;
  245:   unsigned char *packet = NULL;
  246:   size_t packetlen;
  247:   char encodedsize[4];
  248:   size_t n;
  249: 
  250:   result = mqtt_get_topic(conn, &topic, &topiclen);
  251:   if(result)
  252:     goto fail;
  253: 
  254:   conn->proto.mqtt.packetid++;
  255: 
  256:   packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
  257:                                + 2 bytes topic length + QoS byte */
  258:   n = mqtt_encode_len((char *)encodedsize, packetlen);
  259:   packetlen += n + 1; /* add one for the control packet type byte */
  260: 
  261:   packet = malloc(packetlen);
  262:   if(!packet) {
  263:     result = CURLE_OUT_OF_MEMORY;
  264:     goto fail;
  265:   }
  266: 
  267:   packet[0] = MQTT_MSG_SUBSCRIBE;
  268:   memcpy(&packet[1], encodedsize, n);
  269:   packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
  270:   packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
  271:   packet[3 + n] = (topiclen >> 8) & 0xff;
  272:   packet[4 + n ] = topiclen & 0xff;
  273:   memcpy(&packet[5 + n], topic, topiclen);
  274:   packet[5 + n + topiclen] = 0; /* QoS zero */
  275: 
  276:   result = mqtt_send(conn, (char *)packet, packetlen);
  277: 
  278: fail:
  279:   free(topic);
  280:   free(packet);
  281:   return result;
  282: }
  283: 
  284: /*
  285:  * Called when the first byte was already read.
  286:  */
  287: static CURLcode mqtt_verify_suback(struct connectdata *conn)
  288: {
  289:   CURLcode result;
  290:   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
  291:   unsigned char readbuf[MQTT_SUBACK_LEN];
  292:   ssize_t nread;
  293:   struct mqtt_conn *mqtt = &conn->proto.mqtt;
  294: 
  295:   result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
  296:   if(result)
  297:     goto fail;
  298: 
  299:   if(conn->data->set.verbose)
  300:     Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
  301: 
  302:   /* fixme */
  303:   if(nread < MQTT_SUBACK_LEN) {
  304:     result = CURLE_WEIRD_SERVER_REPLY;
  305:     goto fail;
  306:   }
  307: 
  308:   /* verify SUBACK */
  309:   if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
  310:      readbuf[1] != (mqtt->packetid & 0xff) ||
  311:      readbuf[2] != 0x00)
  312:     result = CURLE_WEIRD_SERVER_REPLY;
  313: 
  314: fail:
  315:   return result;
  316: }
  317: 
  318: static CURLcode mqtt_publish(struct connectdata *conn)
  319: {
  320:   CURLcode result;
  321:   char *payload = conn->data->set.postfields;
  322:   size_t payloadlen = (size_t)conn->data->set.postfieldsize;
  323:   char *topic = NULL;
  324:   size_t topiclen;
  325:   unsigned char *pkt = NULL;
  326:   size_t i = 0;
  327:   size_t remaininglength;
  328:   size_t encodelen;
  329:   char encodedbytes[4];
  330: 
  331:   result = mqtt_get_topic(conn, &topic, &topiclen);
  332:   if(result)
  333:     goto fail;
  334: 
  335:   remaininglength = payloadlen + 2 + topiclen;
  336:   encodelen = mqtt_encode_len(encodedbytes, remaininglength);
  337: 
  338:   /* add the control byte and the encoded remaining length */
  339:   pkt = malloc(remaininglength + 1 + encodelen);
  340:   if(!pkt) {
  341:     result = CURLE_OUT_OF_MEMORY;
  342:     goto fail;
  343:   }
  344: 
  345:   /* assemble packet */
  346:   pkt[i++] = MQTT_MSG_PUBLISH;
  347:   memcpy(&pkt[i], encodedbytes, encodelen);
  348:   i += encodelen;
  349:   pkt[i++] = (topiclen >> 8) & 0xff;
  350:   pkt[i++] = (topiclen & 0xff);
  351:   memcpy(&pkt[i], topic, topiclen);
  352:   i += topiclen;
  353:   memcpy(&pkt[i], payload, payloadlen);
  354:   i += payloadlen;
  355:   result = mqtt_send(conn, (char *)pkt, i);
  356: 
  357: fail:
  358:   free(pkt);
  359:   free(topic);
  360:   return result;
  361: }
  362: 
  363: static size_t mqtt_decode_len(unsigned char *buf,
  364:                               size_t buflen, size_t *lenbytes)
  365: {
  366:   size_t len = 0;
  367:   size_t mult = 1;
  368:   size_t i;
  369:   unsigned char encoded = 128;
  370: 
  371:   for(i = 0; (i < buflen) && (encoded & 128); i++) {
  372:     encoded = buf[i];
  373:     len += (encoded & 127) * mult;
  374:     mult *= 128;
  375:   }
  376: 
  377:   if(lenbytes)
  378:     *lenbytes = i;
  379: 
  380:   return len;
  381: }
  382: 
  383: #ifdef CURLDEBUG
  384: static const char *statenames[]={
  385:   "MQTT_FIRST",
  386:   "MQTT_REMAINING_LENGTH",
  387:   "MQTT_CONNACK",
  388:   "MQTT_SUBACK",
  389:   "MQTT_SUBACK_COMING",
  390:   "MQTT_PUBWAIT",
  391:   "MQTT_PUB_REMAIN",
  392: 
  393:   "NOT A STATE"
  394: };
  395: #endif
  396: 
  397: /* The only way to change state */
  398: static void mqstate(struct connectdata *conn,
  399:                     enum mqttstate state,
  400:                     enum mqttstate nextstate) /* used if state == FIRST */
  401: {
  402:   struct mqtt_conn *mqtt = &conn->proto.mqtt;
  403: #ifdef CURLDEBUG
  404:   infof(conn->data, "%s (from %s) (next is %s)\n",
  405:         statenames[state],
  406:         statenames[mqtt->state],
  407:         (state == MQTT_FIRST)? statenames[nextstate] : "");
  408: #endif
  409:   mqtt->state = state;
  410:   if(state == MQTT_FIRST)
  411:     mqtt->nextstate = nextstate;
  412: }
  413: 
  414: 
  415: /* for the publish packet */
  416: #define MQTT_HEADER_LEN 5    /* max 5 bytes */
  417: 
  418: static CURLcode mqtt_read_publish(struct connectdata *conn,
  419:                                   bool *done)
  420: {
  421:   CURLcode result = CURLE_OK;
  422:   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
  423:   ssize_t nread;
  424:   struct Curl_easy *data = conn->data;
  425:   unsigned char *pkt = (unsigned char *)data->state.buffer;
  426:   size_t remlen;
  427:   struct mqtt_conn *mqtt = &conn->proto.mqtt;
  428:   struct MQTT *mq = data->req.protop;
  429:   unsigned char packet;
  430: 
  431:   switch(mqtt->state) {
  432:   MQTT_SUBACK_COMING:
  433:   case MQTT_SUBACK_COMING:
  434:     result = mqtt_verify_suback(conn);
  435:     if(result)
  436:       break;
  437: 
  438:     mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
  439:     break;
  440: 
  441:   case MQTT_SUBACK:
  442:   case MQTT_PUBWAIT:
  443:     /* we are expecting PUBLISH or SUBACK */
  444:     packet = mq->firstbyte & 0xf0;
  445:     if(packet == MQTT_MSG_PUBLISH)
  446:       mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
  447:     else if(packet == MQTT_MSG_SUBACK) {
  448:       mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
  449:       goto MQTT_SUBACK_COMING;
  450:     }
  451:     else if(packet == MQTT_MSG_DISCONNECT) {
  452:       infof(data, "Got DISCONNECT\n");
  453:       *done = TRUE;
  454:       goto end;
  455:     }
  456:     else {
  457:       result = CURLE_WEIRD_SERVER_REPLY;
  458:       goto end;
  459:     }
  460: 
  461:     /* -- switched state -- */
  462:     remlen = mq->remaining_length;
  463:     infof(data, "Remaining length: %zd bytes\n", remlen);
  464:     Curl_pgrsSetDownloadSize(data, remlen);
  465:     data->req.bytecount = 0;
  466:     data->req.size = remlen;
  467:     mq->npacket = remlen; /* get this many bytes */
  468:     /* FALLTHROUGH */
  469:   case MQTT_PUB_REMAIN: {
  470:     /* read rest of packet, but no more. Cap to buffer size */
  471:     struct SingleRequest *k = &data->req;
  472:     size_t rest = mq->npacket;
  473:     if(rest > (size_t)data->set.buffer_size)
  474:       rest = (size_t)data->set.buffer_size;
  475:     result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
  476:     if(result) {
  477:       if(CURLE_AGAIN == result) {
  478:         infof(data, "EEEE AAAAGAIN\n");
  479:       }
  480:       goto end;
  481:     }
  482:     if(!nread) {
  483:       infof(data, "server disconnected\n");
  484:       result = CURLE_PARTIAL_FILE;
  485:       goto end;
  486:     }
  487:     if(data->set.verbose)
  488:       Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
  489: 
  490:     mq->npacket -= nread;
  491:     k->bytecount += nread;
  492:     Curl_pgrsSetDownloadCounter(data, k->bytecount);
  493: 
  494:     /* if QoS is set, message contains packet id */
  495: 
  496:     result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
  497:     if(result)
  498:       goto end;
  499: 
  500:     if(!mq->npacket)
  501:       /* no more PUBLISH payload, back to subscribe wait state */
  502:       mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
  503:     break;
  504:   }
  505:   default:
  506:     DEBUGASSERT(NULL); /* illegal state */
  507:     result = CURLE_WEIRD_SERVER_REPLY;
  508:     goto end;
  509:   }
  510:   end:
  511:   return result;
  512: }
  513: 
  514: static CURLcode mqtt_do(struct connectdata *conn, bool *done)
  515: {
  516:   CURLcode result = CURLE_OK;
  517:   struct Curl_easy *data = conn->data;
  518: 
  519:   *done = FALSE; /* unconditionally */
  520: 
  521:   result = mqtt_connect(conn);
  522:   if(result) {
  523:     failf(data, "Error %d sending MQTT CONN request", result);
  524:     return result;
  525:   }
  526:   mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
  527:   return CURLE_OK;
  528: }
  529: 
  530: static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
  531: {
  532:   CURLcode result = CURLE_OK;
  533:   struct mqtt_conn *mqtt = &conn->proto.mqtt;
  534:   struct Curl_easy *data = conn->data;
  535:   struct MQTT *mq = data->req.protop;
  536:   ssize_t nread;
  537:   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
  538:   unsigned char *pkt = (unsigned char *)data->state.buffer;
  539:   unsigned char byte;
  540: 
  541:   *done = FALSE;
  542: 
  543:   if(mq->nsend) {
  544:     /* send the remainder of an outgoing packet */
  545:     char *ptr = mq->sendleftovers;
  546:     result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
  547:     free(ptr);
  548:     if(result)
  549:       return result;
  550:   }
  551: 
  552:   infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
  553:   switch(mqtt->state) {
  554:   case MQTT_FIRST:
  555:     /* Read the initial byte only */
  556:     result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
  557:     if(result)
  558:       break;
  559:     if(data->set.verbose)
  560:       Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
  561:     /* remember the first byte */
  562:     mq->npacket = 0;
  563:     mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
  564:     /* FALLTHROUGH */
  565:   case MQTT_REMAINING_LENGTH:
  566:     do {
  567:       result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
  568:       if(result)
  569:         break;
  570:       if(data->set.verbose)
  571:         Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
  572:       pkt[mq->npacket++] = byte;
  573:     } while((byte & 0x80) && (mq->npacket < 4));
  574:     if(result)
  575:       break;
  576:     mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
  577:     mq->npacket = 0;
  578:     if(mq->remaining_length) {
  579:       mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
  580:       break;
  581:     }
  582:     mqstate(conn, MQTT_FIRST, MQTT_FIRST);
  583: 
  584:     if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
  585:       infof(data, "Got DISCONNECT\n");
  586:       *done = TRUE;
  587:     }
  588:     break;
  589:   case MQTT_CONNACK:
  590:     result = mqtt_verify_connack(conn);
  591:     if(result)
  592:       break;
  593: 
  594:     if(conn->data->set.httpreq == HTTPREQ_POST) {
  595:       result = mqtt_publish(conn);
  596:       if(!result) {
  597:         result = mqtt_disconnect(conn);
  598:         *done = TRUE;
  599:       }
  600:       mqtt->nextstate = MQTT_FIRST;
  601:     }
  602:     else {
  603:       result = mqtt_subscribe(conn);
  604:       if(!result) {
  605:         mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
  606:       }
  607:     }
  608:     break;
  609: 
  610:   case MQTT_SUBACK:
  611:   case MQTT_PUBWAIT:
  612:   case MQTT_PUB_REMAIN:
  613:     result = mqtt_read_publish(conn, done);
  614:     break;
  615: 
  616:   default:
  617:     failf(conn->data, "State not handled yet");
  618:     *done = TRUE;
  619:     break;
  620:   }
  621: 
  622:   if(result == CURLE_AGAIN)
  623:     result = CURLE_OK;
  624:   return result;
  625: }
  626: 
  627: #endif /* CURL_ENABLE_MQTT */

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>