Annotation of embedaddon/curl/lib/mqtt.c, revision 1.1

1.1     ! misho       1: /***************************************************************************
        !             2:  *                                  _   _ ____  _
        !             3:  *  Project                     ___| | | |  _ \| |
        !             4:  *                             / __| | | | |_) | |
        !             5:  *                            | (__| |_| |  _ <| |___
        !             6:  *                             \___|\___/|_| \_\_____|
        !             7:  *
        !             8:  * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
        !             9:  * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
        !            10:  *
        !            11:  * This software is licensed as described in the file COPYING, which
        !            12:  * you should have received as part of this distribution. The terms
        !            13:  * are also available at https://curl.haxx.se/docs/copyright.html.
        !            14:  *
        !            15:  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
        !            16:  * copies of the Software, and permit persons to whom the Software is
        !            17:  * furnished to do so, under the terms of the COPYING file.
        !            18:  *
        !            19:  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
        !            20:  * KIND, either express or implied.
        !            21:  *
        !            22:  ***************************************************************************/
        !            23: 
        !            24: #include "curl_setup.h"
        !            25: 
        !            26: #ifdef CURL_ENABLE_MQTT
        !            27: 
        !            28: #include "urldata.h"
        !            29: #include <curl/curl.h>
        !            30: #include "transfer.h"
        !            31: #include "sendf.h"
        !            32: #include "progress.h"
        !            33: #include "mqtt.h"
        !            34: #include "select.h"
        !            35: #include "strdup.h"
        !            36: #include "url.h"
        !            37: #include "escape.h"
        !            38: #include "warnless.h"
        !            39: #include "curl_printf.h"
        !            40: #include "curl_memory.h"
        !            41: #include "multiif.h"
        !            42: #include "rand.h"
        !            43: 
        !            44: /* The last #include file should be: */
        !            45: #include "memdebug.h"
        !            46: 
        !            47: #define MQTT_MSG_CONNECT   0x10
        !            48: #define MQTT_MSG_CONNACK   0x20
        !            49: #define MQTT_MSG_PUBLISH   0x30
        !            50: #define MQTT_MSG_SUBSCRIBE 0x82
        !            51: #define MQTT_MSG_SUBACK    0x90
        !            52: #define MQTT_MSG_DISCONNECT 0xe0
        !            53: 
        !            54: #define MQTT_CONNACK_LEN 2
        !            55: #define MQTT_SUBACK_LEN 3
        !            56: #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
        !            57: 
        !            58: /*
        !            59:  * Forward declarations.
        !            60:  */
        !            61: 
        !            62: static CURLcode mqtt_do(struct connectdata *conn, bool *done);
        !            63: static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
        !            64: static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
        !            65: static CURLcode mqtt_setup_conn(struct connectdata *conn);
        !            66: 
        !            67: /*
        !            68:  * MQTT protocol handler.
        !            69:  */
        !            70: 
        !            71: const struct Curl_handler Curl_handler_mqtt = {
        !            72:   "MQTT",                             /* scheme */
        !            73:   mqtt_setup_conn,                    /* setup_connection */
        !            74:   mqtt_do,                            /* do_it */
        !            75:   ZERO_NULL,                          /* done */
        !            76:   ZERO_NULL,                          /* do_more */
        !            77:   ZERO_NULL,                          /* connect_it */
        !            78:   ZERO_NULL,                          /* connecting */
        !            79:   mqtt_doing,                         /* doing */
        !            80:   ZERO_NULL,                          /* proto_getsock */
        !            81:   mqtt_getsock,                       /* doing_getsock */
        !            82:   ZERO_NULL,                          /* domore_getsock */
        !            83:   ZERO_NULL,                          /* perform_getsock */
        !            84:   ZERO_NULL,                          /* disconnect */
        !            85:   ZERO_NULL,                          /* readwrite */
        !            86:   ZERO_NULL,                          /* connection_check */
        !            87:   PORT_MQTT,                          /* defport */
        !            88:   CURLPROTO_MQTT,                     /* protocol */
        !            89:   PROTOPT_NONE                        /* flags */
        !            90: };
        !            91: 
        !            92: static CURLcode mqtt_setup_conn(struct connectdata *conn)
        !            93: {
        !            94:   /* allocate the HTTP-specific struct for the Curl_easy, only to survive
        !            95:      during this request */
        !            96:   struct MQTT *mq;
        !            97:   struct Curl_easy *data = conn->data;
        !            98:   DEBUGASSERT(data->req.protop == NULL);
        !            99: 
        !           100:   mq = calloc(1, sizeof(struct MQTT));
        !           101:   if(!mq)
        !           102:     return CURLE_OUT_OF_MEMORY;
        !           103:   data->req.protop = mq;
        !           104:   return CURLE_OK;
        !           105: }
        !           106: 
        !           107: static CURLcode mqtt_send(struct connectdata *conn,
        !           108:                           char *buf, size_t len)
        !           109: {
        !           110:   CURLcode result = CURLE_OK;
        !           111:   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
        !           112:   struct Curl_easy *data = conn->data;
        !           113:   struct MQTT *mq = data->req.protop;
        !           114:   ssize_t n;
        !           115:   result = Curl_write(conn, sockfd, buf, len, &n);
        !           116:   if(!result && data->set.verbose)
        !           117:     Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
        !           118:   if(len != (size_t)n) {
        !           119:     size_t nsend = len - n;
        !           120:     char *sendleftovers = Curl_memdup(&buf[n], nsend);
        !           121:     if(!sendleftovers)
        !           122:       return CURLE_OUT_OF_MEMORY;
        !           123:     mq->sendleftovers = sendleftovers;
        !           124:     mq->nsend = nsend;
        !           125:   }
        !           126:   return result;
        !           127: }
        !           128: 
        !           129: /* Generic function called by the multi interface to figure out what socket(s)
        !           130:    to wait for and for what actions during the DOING and PROTOCONNECT
        !           131:    states */
        !           132: static int mqtt_getsock(struct connectdata *conn,
        !           133:                         curl_socket_t *sock)
        !           134: {
        !           135:   sock[0] = conn->sock[FIRSTSOCKET];
        !           136:   return GETSOCK_READSOCK(FIRSTSOCKET);
        !           137: }
        !           138: 
        !           139: static CURLcode mqtt_connect(struct connectdata *conn)
        !           140: {
        !           141:   CURLcode result = CURLE_OK;
        !           142:   const size_t client_id_offset = 14;
        !           143:   const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN;
        !           144:   char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
        !           145:   const size_t curl_len = strlen("curl");
        !           146:   char packet[32] = {
        !           147:     MQTT_MSG_CONNECT,  /* packet type */
        !           148:     0x00,              /* remaining length */
        !           149:     0x00, 0x04,        /* protocol length */
        !           150:     'M','Q','T','T',   /* protocol name */
        !           151:     0x04,              /* protocol level */
        !           152:     0x02,              /* CONNECT flag: CleanSession */
        !           153:     0x00, 0x3c,        /* keep-alive 0 = disabled */
        !           154:     0x00, 0x00         /* payload1 length */
        !           155:   };
        !           156:   packet[1] = (packetlen - 2) & 0x7f;
        !           157:   packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
        !           158: 
        !           159:   result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[curl_len],
        !           160:                          MQTT_CLIENTID_LEN - curl_len + 1);
        !           161:   memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
        !           162:   infof(conn->data, "Using client id '%s'\n", client_id);
        !           163:   if(!result)
        !           164:     result = mqtt_send(conn, packet, packetlen);
        !           165:   return result;
        !           166: }
        !           167: 
        !           168: static CURLcode mqtt_disconnect(struct connectdata *conn)
        !           169: {
        !           170:   CURLcode result = CURLE_OK;
        !           171:   result = mqtt_send(conn, (char *)"\xe0\x00", 2);
        !           172:   return result;
        !           173: }
        !           174: 
        !           175: static CURLcode mqtt_verify_connack(struct connectdata *conn)
        !           176: {
        !           177:   CURLcode result;
        !           178:   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
        !           179:   unsigned char readbuf[MQTT_CONNACK_LEN];
        !           180:   ssize_t nread;
        !           181:   struct Curl_easy *data = conn->data;
        !           182: 
        !           183:   result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
        !           184:   if(result)
        !           185:     goto fail;
        !           186: 
        !           187:   if(data->set.verbose)
        !           188:     Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
        !           189: 
        !           190:   /* fixme */
        !           191:   if(nread < MQTT_CONNACK_LEN) {
        !           192:     result = CURLE_WEIRD_SERVER_REPLY;
        !           193:     goto fail;
        !           194:   }
        !           195: 
        !           196:   /* verify CONNACK */
        !           197:   if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
        !           198:     failf(data, "Expected %02x%02x but got %02x%02x",
        !           199:           0x00, 0x00, readbuf[0], readbuf[1]);
        !           200:     result = CURLE_WEIRD_SERVER_REPLY;
        !           201:   }
        !           202: 
        !           203: fail:
        !           204:   return result;
        !           205: }
        !           206: 
        !           207: static CURLcode mqtt_get_topic(struct connectdata *conn,
        !           208:                                char **topic, size_t *topiclen)
        !           209: {
        !           210:   CURLcode result = CURLE_OK;
        !           211:   char *path = conn->data->state.up.path;
        !           212: 
        !           213:   if(strlen(path) > 1) {
        !           214:     result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen, FALSE);
        !           215:   }
        !           216:   else {
        !           217:     failf(conn->data, "Error: No topic specified.");
        !           218:     result = CURLE_URL_MALFORMAT;
        !           219:   }
        !           220:   return result;
        !           221: }
        !           222: 
        !           223: 
        !           224: static int mqtt_encode_len(char *buf, size_t len)
        !           225: {
        !           226:   unsigned char encoded;
        !           227:   int i;
        !           228: 
        !           229:   for(i = 0; (len > 0) && (i<4); i++) {
        !           230:     encoded = len % 0x80;
        !           231:     len /= 0x80;
        !           232:     if(len)
        !           233:       encoded |= 0x80;
        !           234:     buf[i] = encoded;
        !           235:   }
        !           236: 
        !           237:   return i;
        !           238: }
        !           239: 
        !           240: static CURLcode mqtt_subscribe(struct connectdata *conn)
        !           241: {
        !           242:   CURLcode result = CURLE_OK;
        !           243:   char *topic = NULL;
        !           244:   size_t topiclen;
        !           245:   unsigned char *packet = NULL;
        !           246:   size_t packetlen;
        !           247:   char encodedsize[4];
        !           248:   size_t n;
        !           249: 
        !           250:   result = mqtt_get_topic(conn, &topic, &topiclen);
        !           251:   if(result)
        !           252:     goto fail;
        !           253: 
        !           254:   conn->proto.mqtt.packetid++;
        !           255: 
        !           256:   packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
        !           257:                                + 2 bytes topic length + QoS byte */
        !           258:   n = mqtt_encode_len((char *)encodedsize, packetlen);
        !           259:   packetlen += n + 1; /* add one for the control packet type byte */
        !           260: 
        !           261:   packet = malloc(packetlen);
        !           262:   if(!packet) {
        !           263:     result = CURLE_OUT_OF_MEMORY;
        !           264:     goto fail;
        !           265:   }
        !           266: 
        !           267:   packet[0] = MQTT_MSG_SUBSCRIBE;
        !           268:   memcpy(&packet[1], encodedsize, n);
        !           269:   packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
        !           270:   packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
        !           271:   packet[3 + n] = (topiclen >> 8) & 0xff;
        !           272:   packet[4 + n ] = topiclen & 0xff;
        !           273:   memcpy(&packet[5 + n], topic, topiclen);
        !           274:   packet[5 + n + topiclen] = 0; /* QoS zero */
        !           275: 
        !           276:   result = mqtt_send(conn, (char *)packet, packetlen);
        !           277: 
        !           278: fail:
        !           279:   free(topic);
        !           280:   free(packet);
        !           281:   return result;
        !           282: }
        !           283: 
        !           284: /*
        !           285:  * Called when the first byte was already read.
        !           286:  */
        !           287: static CURLcode mqtt_verify_suback(struct connectdata *conn)
        !           288: {
        !           289:   CURLcode result;
        !           290:   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
        !           291:   unsigned char readbuf[MQTT_SUBACK_LEN];
        !           292:   ssize_t nread;
        !           293:   struct mqtt_conn *mqtt = &conn->proto.mqtt;
        !           294: 
        !           295:   result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
        !           296:   if(result)
        !           297:     goto fail;
        !           298: 
        !           299:   if(conn->data->set.verbose)
        !           300:     Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
        !           301: 
        !           302:   /* fixme */
        !           303:   if(nread < MQTT_SUBACK_LEN) {
        !           304:     result = CURLE_WEIRD_SERVER_REPLY;
        !           305:     goto fail;
        !           306:   }
        !           307: 
        !           308:   /* verify SUBACK */
        !           309:   if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
        !           310:      readbuf[1] != (mqtt->packetid & 0xff) ||
        !           311:      readbuf[2] != 0x00)
        !           312:     result = CURLE_WEIRD_SERVER_REPLY;
        !           313: 
        !           314: fail:
        !           315:   return result;
        !           316: }
        !           317: 
        !           318: static CURLcode mqtt_publish(struct connectdata *conn)
        !           319: {
        !           320:   CURLcode result;
        !           321:   char *payload = conn->data->set.postfields;
        !           322:   size_t payloadlen = (size_t)conn->data->set.postfieldsize;
        !           323:   char *topic = NULL;
        !           324:   size_t topiclen;
        !           325:   unsigned char *pkt = NULL;
        !           326:   size_t i = 0;
        !           327:   size_t remaininglength;
        !           328:   size_t encodelen;
        !           329:   char encodedbytes[4];
        !           330: 
        !           331:   result = mqtt_get_topic(conn, &topic, &topiclen);
        !           332:   if(result)
        !           333:     goto fail;
        !           334: 
        !           335:   remaininglength = payloadlen + 2 + topiclen;
        !           336:   encodelen = mqtt_encode_len(encodedbytes, remaininglength);
        !           337: 
        !           338:   /* add the control byte and the encoded remaining length */
        !           339:   pkt = malloc(remaininglength + 1 + encodelen);
        !           340:   if(!pkt) {
        !           341:     result = CURLE_OUT_OF_MEMORY;
        !           342:     goto fail;
        !           343:   }
        !           344: 
        !           345:   /* assemble packet */
        !           346:   pkt[i++] = MQTT_MSG_PUBLISH;
        !           347:   memcpy(&pkt[i], encodedbytes, encodelen);
        !           348:   i += encodelen;
        !           349:   pkt[i++] = (topiclen >> 8) & 0xff;
        !           350:   pkt[i++] = (topiclen & 0xff);
        !           351:   memcpy(&pkt[i], topic, topiclen);
        !           352:   i += topiclen;
        !           353:   memcpy(&pkt[i], payload, payloadlen);
        !           354:   i += payloadlen;
        !           355:   result = mqtt_send(conn, (char *)pkt, i);
        !           356: 
        !           357: fail:
        !           358:   free(pkt);
        !           359:   free(topic);
        !           360:   return result;
        !           361: }
        !           362: 
        !           363: static size_t mqtt_decode_len(unsigned char *buf,
        !           364:                               size_t buflen, size_t *lenbytes)
        !           365: {
        !           366:   size_t len = 0;
        !           367:   size_t mult = 1;
        !           368:   size_t i;
        !           369:   unsigned char encoded = 128;
        !           370: 
        !           371:   for(i = 0; (i < buflen) && (encoded & 128); i++) {
        !           372:     encoded = buf[i];
        !           373:     len += (encoded & 127) * mult;
        !           374:     mult *= 128;
        !           375:   }
        !           376: 
        !           377:   if(lenbytes)
        !           378:     *lenbytes = i;
        !           379: 
        !           380:   return len;
        !           381: }
        !           382: 
        !           383: #ifdef CURLDEBUG
        !           384: static const char *statenames[]={
        !           385:   "MQTT_FIRST",
        !           386:   "MQTT_REMAINING_LENGTH",
        !           387:   "MQTT_CONNACK",
        !           388:   "MQTT_SUBACK",
        !           389:   "MQTT_SUBACK_COMING",
        !           390:   "MQTT_PUBWAIT",
        !           391:   "MQTT_PUB_REMAIN",
        !           392: 
        !           393:   "NOT A STATE"
        !           394: };
        !           395: #endif
        !           396: 
        !           397: /* The only way to change state */
        !           398: static void mqstate(struct connectdata *conn,
        !           399:                     enum mqttstate state,
        !           400:                     enum mqttstate nextstate) /* used if state == FIRST */
        !           401: {
        !           402:   struct mqtt_conn *mqtt = &conn->proto.mqtt;
        !           403: #ifdef CURLDEBUG
        !           404:   infof(conn->data, "%s (from %s) (next is %s)\n",
        !           405:         statenames[state],
        !           406:         statenames[mqtt->state],
        !           407:         (state == MQTT_FIRST)? statenames[nextstate] : "");
        !           408: #endif
        !           409:   mqtt->state = state;
        !           410:   if(state == MQTT_FIRST)
        !           411:     mqtt->nextstate = nextstate;
        !           412: }
        !           413: 
        !           414: 
        !           415: /* for the publish packet */
        !           416: #define MQTT_HEADER_LEN 5    /* max 5 bytes */
        !           417: 
        !           418: static CURLcode mqtt_read_publish(struct connectdata *conn,
        !           419:                                   bool *done)
        !           420: {
        !           421:   CURLcode result = CURLE_OK;
        !           422:   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
        !           423:   ssize_t nread;
        !           424:   struct Curl_easy *data = conn->data;
        !           425:   unsigned char *pkt = (unsigned char *)data->state.buffer;
        !           426:   size_t remlen;
        !           427:   struct mqtt_conn *mqtt = &conn->proto.mqtt;
        !           428:   struct MQTT *mq = data->req.protop;
        !           429:   unsigned char packet;
        !           430: 
        !           431:   switch(mqtt->state) {
        !           432:   MQTT_SUBACK_COMING:
        !           433:   case MQTT_SUBACK_COMING:
        !           434:     result = mqtt_verify_suback(conn);
        !           435:     if(result)
        !           436:       break;
        !           437: 
        !           438:     mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
        !           439:     break;
        !           440: 
        !           441:   case MQTT_SUBACK:
        !           442:   case MQTT_PUBWAIT:
        !           443:     /* we are expecting PUBLISH or SUBACK */
        !           444:     packet = mq->firstbyte & 0xf0;
        !           445:     if(packet == MQTT_MSG_PUBLISH)
        !           446:       mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
        !           447:     else if(packet == MQTT_MSG_SUBACK) {
        !           448:       mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
        !           449:       goto MQTT_SUBACK_COMING;
        !           450:     }
        !           451:     else if(packet == MQTT_MSG_DISCONNECT) {
        !           452:       infof(data, "Got DISCONNECT\n");
        !           453:       *done = TRUE;
        !           454:       goto end;
        !           455:     }
        !           456:     else {
        !           457:       result = CURLE_WEIRD_SERVER_REPLY;
        !           458:       goto end;
        !           459:     }
        !           460: 
        !           461:     /* -- switched state -- */
        !           462:     remlen = mq->remaining_length;
        !           463:     infof(data, "Remaining length: %zd bytes\n", remlen);
        !           464:     Curl_pgrsSetDownloadSize(data, remlen);
        !           465:     data->req.bytecount = 0;
        !           466:     data->req.size = remlen;
        !           467:     mq->npacket = remlen; /* get this many bytes */
        !           468:     /* FALLTHROUGH */
        !           469:   case MQTT_PUB_REMAIN: {
        !           470:     /* read rest of packet, but no more. Cap to buffer size */
        !           471:     struct SingleRequest *k = &data->req;
        !           472:     size_t rest = mq->npacket;
        !           473:     if(rest > (size_t)data->set.buffer_size)
        !           474:       rest = (size_t)data->set.buffer_size;
        !           475:     result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
        !           476:     if(result) {
        !           477:       if(CURLE_AGAIN == result) {
        !           478:         infof(data, "EEEE AAAAGAIN\n");
        !           479:       }
        !           480:       goto end;
        !           481:     }
        !           482:     if(!nread) {
        !           483:       infof(data, "server disconnected\n");
        !           484:       result = CURLE_PARTIAL_FILE;
        !           485:       goto end;
        !           486:     }
        !           487:     if(data->set.verbose)
        !           488:       Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
        !           489: 
        !           490:     mq->npacket -= nread;
        !           491:     k->bytecount += nread;
        !           492:     Curl_pgrsSetDownloadCounter(data, k->bytecount);
        !           493: 
        !           494:     /* if QoS is set, message contains packet id */
        !           495: 
        !           496:     result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
        !           497:     if(result)
        !           498:       goto end;
        !           499: 
        !           500:     if(!mq->npacket)
        !           501:       /* no more PUBLISH payload, back to subscribe wait state */
        !           502:       mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
        !           503:     break;
        !           504:   }
        !           505:   default:
        !           506:     DEBUGASSERT(NULL); /* illegal state */
        !           507:     result = CURLE_WEIRD_SERVER_REPLY;
        !           508:     goto end;
        !           509:   }
        !           510:   end:
        !           511:   return result;
        !           512: }
        !           513: 
        !           514: static CURLcode mqtt_do(struct connectdata *conn, bool *done)
        !           515: {
        !           516:   CURLcode result = CURLE_OK;
        !           517:   struct Curl_easy *data = conn->data;
        !           518: 
        !           519:   *done = FALSE; /* unconditionally */
        !           520: 
        !           521:   result = mqtt_connect(conn);
        !           522:   if(result) {
        !           523:     failf(data, "Error %d sending MQTT CONN request", result);
        !           524:     return result;
        !           525:   }
        !           526:   mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
        !           527:   return CURLE_OK;
        !           528: }
        !           529: 
        !           530: static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
        !           531: {
        !           532:   CURLcode result = CURLE_OK;
        !           533:   struct mqtt_conn *mqtt = &conn->proto.mqtt;
        !           534:   struct Curl_easy *data = conn->data;
        !           535:   struct MQTT *mq = data->req.protop;
        !           536:   ssize_t nread;
        !           537:   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
        !           538:   unsigned char *pkt = (unsigned char *)data->state.buffer;
        !           539:   unsigned char byte;
        !           540: 
        !           541:   *done = FALSE;
        !           542: 
        !           543:   if(mq->nsend) {
        !           544:     /* send the remainder of an outgoing packet */
        !           545:     char *ptr = mq->sendleftovers;
        !           546:     result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
        !           547:     free(ptr);
        !           548:     if(result)
        !           549:       return result;
        !           550:   }
        !           551: 
        !           552:   infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
        !           553:   switch(mqtt->state) {
        !           554:   case MQTT_FIRST:
        !           555:     /* Read the initial byte only */
        !           556:     result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
        !           557:     if(result)
        !           558:       break;
        !           559:     if(data->set.verbose)
        !           560:       Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
        !           561:     /* remember the first byte */
        !           562:     mq->npacket = 0;
        !           563:     mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
        !           564:     /* FALLTHROUGH */
        !           565:   case MQTT_REMAINING_LENGTH:
        !           566:     do {
        !           567:       result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
        !           568:       if(result)
        !           569:         break;
        !           570:       if(data->set.verbose)
        !           571:         Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
        !           572:       pkt[mq->npacket++] = byte;
        !           573:     } while((byte & 0x80) && (mq->npacket < 4));
        !           574:     if(result)
        !           575:       break;
        !           576:     mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
        !           577:     mq->npacket = 0;
        !           578:     if(mq->remaining_length) {
        !           579:       mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
        !           580:       break;
        !           581:     }
        !           582:     mqstate(conn, MQTT_FIRST, MQTT_FIRST);
        !           583: 
        !           584:     if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
        !           585:       infof(data, "Got DISCONNECT\n");
        !           586:       *done = TRUE;
        !           587:     }
        !           588:     break;
        !           589:   case MQTT_CONNACK:
        !           590:     result = mqtt_verify_connack(conn);
        !           591:     if(result)
        !           592:       break;
        !           593: 
        !           594:     if(conn->data->set.httpreq == HTTPREQ_POST) {
        !           595:       result = mqtt_publish(conn);
        !           596:       if(!result) {
        !           597:         result = mqtt_disconnect(conn);
        !           598:         *done = TRUE;
        !           599:       }
        !           600:       mqtt->nextstate = MQTT_FIRST;
        !           601:     }
        !           602:     else {
        !           603:       result = mqtt_subscribe(conn);
        !           604:       if(!result) {
        !           605:         mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
        !           606:       }
        !           607:     }
        !           608:     break;
        !           609: 
        !           610:   case MQTT_SUBACK:
        !           611:   case MQTT_PUBWAIT:
        !           612:   case MQTT_PUB_REMAIN:
        !           613:     result = mqtt_read_publish(conn, done);
        !           614:     break;
        !           615: 
        !           616:   default:
        !           617:     failf(conn->data, "State not handled yet");
        !           618:     *done = TRUE;
        !           619:     break;
        !           620:   }
        !           621: 
        !           622:   if(result == CURLE_AGAIN)
        !           623:     result = CURLE_OK;
        !           624:   return result;
        !           625: }
        !           626: 
        !           627: #endif /* CURL_ENABLE_MQTT */

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>