Annotation of embedaddon/curl/tests/server/mqttd.c, revision 1.1.1.1

1.1       misho       1: /***************************************************************************
                      2:  *                                  _   _ ____  _
                      3:  *  Project                     ___| | | |  _ \| |
                      4:  *                             / __| | | | |_) | |
                      5:  *                            | (__| |_| |  _ <| |___
                      6:  *                             \___|\___/|_| \_\_____|
                      7:  *
                      8:  * Copyright (C) 1998 - 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
                      9:  *
                     10:  * This software is licensed as described in the file COPYING, which
                     11:  * you should have received as part of this distribution. The terms
                     12:  * are also available at https://curl.haxx.se/docs/copyright.html.
                     13:  *
                     14:  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
                     15:  * copies of the Software, and permit persons to whom the Software is
                     16:  * furnished to do so, under the terms of the COPYING file.
                     17:  *
                     18:  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
                     19:  * KIND, either express or implied.
                     20:  *
                     21:  ***************************************************************************/
                     22: #include "server_setup.h"
                     23: #include <stdlib.h>
                     24: #include <string.h>
                     25: #include "util.h"
                     26: 
                     27: /* Function
                     28:  *
                     29:  * Accepts a TCP connection on a custom port (IPv4 or IPv6).  Speaks MQTT.
                     30:  *
                     31:  * Read commands from FILE (set with --config). The commands control how to
                     32:  * act and are reset to defaults on each client TCP connect.
                     33:  *
                     34:  * Config file keywords:
                     35:  *
                     36:  * TODO
                     37:  */
                     38: 
                     39: /* based on sockfilt.c */
                     40: 
                     41: #ifdef HAVE_SIGNAL_H
                     42: #include <signal.h>
                     43: #endif
                     44: #ifdef HAVE_NETINET_IN_H
                     45: #include <netinet/in.h>
                     46: #endif
                     47: #ifdef HAVE_NETINET_IN6_H
                     48: #include <netinet/in6.h>
                     49: #endif
                     50: #ifdef HAVE_ARPA_INET_H
                     51: #include <arpa/inet.h>
                     52: #endif
                     53: #ifdef HAVE_NETDB_H
                     54: #include <netdb.h>
                     55: #endif
                     56: 
                     57: #define ENABLE_CURLX_PRINTF
                     58: /* make the curlx header define all printf() functions to use the curlx_*
                     59:    versions instead */
                     60: #include "curlx.h" /* from the private lib dir */
                     61: #include "getpart.h"
                     62: #include "inet_pton.h"
                     63: #include "util.h"
                     64: #include "server_sockaddr.h"
                     65: #include "warnless.h"
                     66: 
                     67: /* include memdebug.h last */
                     68: #include "memdebug.h"
                     69: 
                     70: #ifdef USE_WINSOCK
                     71: #undef  EINTR
                     72: #define EINTR    4 /* errno.h value */
                     73: #undef  EAGAIN
                     74: #define EAGAIN  11 /* errno.h value */
                     75: #undef  ENOMEM
                     76: #define ENOMEM  12 /* errno.h value */
                     77: #undef  EINVAL
                     78: #define EINVAL  22 /* errno.h value */
                     79: #endif
                     80: 
                     81: #define DEFAULT_PORT 1883 /* MQTT default port */
                     82: 
                     83: #ifndef DEFAULT_LOGFILE
                     84: #define DEFAULT_LOGFILE "log/mqttd.log"
                     85: #endif
                     86: 
                     87: #ifndef DEFAULT_CONFIG
                     88: #define DEFAULT_CONFIG "mqttd.config"
                     89: #endif
                     90: 
                     91: #define MQTT_MSG_CONNECT    0x10
                     92: #define MQTT_MSG_CONNACK    0x20
                     93: #define MQTT_MSG_PUBLISH    0x30
                     94: #define MQTT_MSG_PUBACK     0x40
                     95: #define MQTT_MSG_SUBSCRIBE  0x82
                     96: #define MQTT_MSG_SUBACK     0x90
                     97: #define MQTT_MSG_DISCONNECT 0xe0
                     98: 
                     99: #define MQTT_CONNACK_LEN 4
                    100: #define MQTT_SUBACK_LEN 5
                    101: #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
                    102: #define MQTT_HEADER_LEN 5    /* max 5 bytes */
                    103: 
/* Server behavior settings. resetdefaults() restores them and getconfig()
   overrides them from the config file. */
struct configurable {
  unsigned char version; /* initial version byte in the request must match
                            this */
  bool publish_before_suback; /* answer a SUBSCRIBE with the PUBLISH first
                                 and the SUBACK afterwards */
  bool short_publish; /* send two bytes less than the full PUBLISH packet */
  unsigned char error_connack; /* value stored in the last byte of the
                                  CONNACK packet */
  int testnum; /* test number, handed to test2fopen() to find the reply
                  data when a SUBSCRIBE arrives */
};
                    112: 
                    113: #define REQUEST_DUMP  "log/server.input"
                    114: #define CONFIG_VERSION 5
                    115: 
                    116: static struct configurable config;
                    117: 
                    118: const char *serverlogfile = DEFAULT_LOGFILE;
                    119: static const char *configfile = DEFAULT_CONFIG;
                    120: 
                    121: #ifdef ENABLE_IPV6
                    122: static bool use_ipv6 = FALSE;
                    123: #endif
                    124: static const char *ipv_inuse = "IPv4";
                    125: static unsigned short port = DEFAULT_PORT;
                    126: 
                    127: static void resetdefaults(void)
                    128: {
                    129:   logmsg("Reset to defaults");
                    130:   config.version = CONFIG_VERSION;
                    131:   config.publish_before_suback = FALSE;
                    132:   config.short_publish = FALSE;
                    133:   config.error_connack = 0;
                    134:   config.testnum = 0;
                    135: }
                    136: 
                    137: static unsigned char byteval(char *value)
                    138: {
                    139:   unsigned long num = strtoul(value, NULL, 10);
                    140:   return num & 0xff;
                    141: }
                    142: 
                    143: static void getconfig(void)
                    144: {
                    145:   FILE *fp = fopen(configfile, FOPEN_READTEXT);
                    146:   resetdefaults();
                    147:   if(fp) {
                    148:     char buffer[512];
                    149:     logmsg("parse config file");
                    150:     while(fgets(buffer, sizeof(buffer), fp)) {
                    151:       char key[32];
                    152:       char value[32];
                    153:       if(2 == sscanf(buffer, "%31s %31s", key, value)) {
                    154:         if(!strcmp(key, "version")) {
                    155:           config.version = byteval(value);
                    156:           logmsg("version [%d] set", config.version);
                    157:         }
                    158:         else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
                    159:           logmsg("PUBLISH-before-SUBACK set");
                    160:           config.publish_before_suback = TRUE;
                    161:         }
                    162:         else if(!strcmp(key, "short-PUBLISH")) {
                    163:           logmsg("short-PUBLISH set");
                    164:           config.short_publish = TRUE;
                    165:         }
                    166:         else if(!strcmp(key, "error-CONNACK")) {
                    167:           config.error_connack = byteval(value);
                    168:           logmsg("error-CONNACK = %d", config.error_connack);
                    169:         }
                    170:         else if(!strcmp(key, "Testnum")) {
                    171:           config.testnum = atoi(value);
                    172:           logmsg("testnum = %d", config.testnum);
                    173:         }
                    174:       }
                    175:     }
                    176:     fclose(fp);
                    177:   }
                    178:   else {
                    179:     logmsg("No config file '%s' to read", configfile);
                    180:   }
                    181: }
                    182: 
                    183: static void loghex(unsigned char *buffer, ssize_t len)
                    184: {
                    185:   char data[12000];
                    186:   ssize_t i;
                    187:   unsigned char *ptr = buffer;
                    188:   char *optr = data;
                    189:   ssize_t width = 0;
                    190:   int left = sizeof(data);
                    191: 
                    192:   for(i = 0; i<len && (left >= 0); i++) {
                    193:     msnprintf(optr, left, "%02x", ptr[i]);
                    194:     width += 2;
                    195:     optr += 2;
                    196:     left -= 2;
                    197:   }
                    198:   if(width)
                    199:     logmsg("'%s'", data);
                    200: }
                    201: 
/* Direction of a logged packet; logprotocol() prints FROM_CLIENT as
   "client" and anything else as "server". */
typedef enum {
  FROM_CLIENT,
  FROM_SERVER
} mqttdir;
                    206: 
                    207: static void logprotocol(mqttdir dir,
                    208:                         const char *prefix, size_t remlen,
                    209:                         FILE *output,
                    210:                         unsigned char *buffer, ssize_t len)
                    211: {
                    212:   char data[12000] = "";
                    213:   ssize_t i;
                    214:   unsigned char *ptr = buffer;
                    215:   char *optr = data;
                    216:   ssize_t width = 0;
                    217:   int left = sizeof(data);
                    218: 
                    219:   for(i = 0; i<len && (left >= 0); i++) {
                    220:     msnprintf(optr, left, "%02x", ptr[i]);
                    221:     width += 2;
                    222:     optr += 2;
                    223:     left -= 2;
                    224:   }
                    225:   fprintf(output, "%s %s %zx %s\n",
                    226:           dir == FROM_CLIENT? "client": "server",
                    227:           prefix, remlen,
                    228:           data);
                    229: }
                    230: 
                    231: 
                    232: /* return 0 on success */
                    233: static int connack(FILE *dump, curl_socket_t fd)
                    234: {
                    235:   unsigned char packet[]={
                    236:     MQTT_MSG_CONNACK, 0x02,
                    237:     0x00, 0x00
                    238:   };
                    239:   ssize_t rc;
                    240: 
                    241:   packet[3] = config.error_connack;
                    242: 
                    243:   rc = swrite(fd, (char *)packet, sizeof(packet));
                    244:   if(rc > 0) {
                    245:     logmsg("WROTE %d bytes [CONNACK]", rc);
                    246:     loghex(packet, rc);
                    247:     logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
                    248:   }
                    249:   if(rc == sizeof(packet)) {
                    250:     return 0;
                    251:   }
                    252:   return 1;
                    253: }
                    254: 
                    255: /* return 0 on success */
                    256: static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid)
                    257: {
                    258:   unsigned char packet[]={
                    259:     MQTT_MSG_SUBACK, 0x03,
                    260:     0, 0, /* filled in below */
                    261:     0x00
                    262:   };
                    263:   ssize_t rc;
                    264:   packet[2] = (unsigned char)(packetid >> 8);
                    265:   packet[3] = (unsigned char)(packetid & 0xff);
                    266: 
                    267:   rc = swrite(fd, (char *)packet, sizeof(packet));
                    268:   if(rc == sizeof(packet)) {
                    269:     logmsg("WROTE %d bytes [SUBACK]", rc);
                    270:     loghex(packet, rc);
                    271:     logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc);
                    272:     return 0;
                    273:   }
                    274:   return 1;
                    275: }
                    276: 
                    277: #ifdef QOS
                    278: /* return 0 on success */
                    279: static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid)
                    280: {
                    281:   unsigned char packet[]={
                    282:     MQTT_MSG_PUBACK, 0x00,
                    283:     0, 0 /* filled in below */
                    284:   };
                    285:   ssize_t rc;
                    286:   packet[2] = (unsigned char)(packetid >> 8);
                    287:   packet[3] = (unsigned char)(packetid & 0xff);
                    288: 
                    289:   rc = swrite(fd, (char *)packet, sizeof(packet));
                    290:   if(rc == sizeof(packet)) {
                    291:     logmsg("WROTE %d bytes [PUBACK]", rc);
                    292:     loghex(packet, rc);
                    293:     logprotocol(FROM_SERVER, dump, packet, rc);
                    294:     return 0;
                    295:   }
                    296:   logmsg("Failed sending [PUBACK]");
                    297:   return 1;
                    298: }
                    299: #endif
                    300: 
                    301: /* return 0 on success */
                    302: static int disconnect(FILE *dump, curl_socket_t fd)
                    303: {
                    304:   unsigned char packet[]={
                    305:     MQTT_MSG_DISCONNECT, 0x00,
                    306:   };
                    307:   ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
                    308:   if(rc == sizeof(packet)) {
                    309:     logmsg("WROTE %d bytes [DISCONNECT]", rc);
                    310:     loghex(packet, rc);
                    311:     logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc);
                    312:     return 0;
                    313:   }
                    314:   logmsg("Failed sending [DISCONNECT]");
                    315:   return 1;
                    316: }
                    317: 
                    318: 
                    319: 
                    320: /*
                    321:   do
                    322: 
                    323:      encodedByte = X MOD 128
                    324: 
                    325:      X = X DIV 128
                    326: 
                    327:      // if there are more data to encode, set the top bit of this byte
                    328: 
                    329:      if ( X > 0 )
                    330: 
                    331:         encodedByte = encodedByte OR 128
                    332: 
                    333:       endif
                    334: 
                    335:     'output' encodedByte
                    336: 
                    337:   while ( X > 0 )
                    338: 
                    339: */
                    340: 
                    341: /* return number of bytes used */
                    342: static int encode_length(size_t packetlen, char *remlength) /* 4 bytes */
                    343: {
                    344:   int bytes = 0;
                    345:   unsigned char encode;
                    346: 
                    347:   do {
                    348:     encode = packetlen % 0x80;
                    349:     packetlen /= 0x80;
                    350:     if(packetlen)
                    351:       encode |= 0x80;
                    352: 
                    353:     remlength[bytes++] = encode;
                    354: 
                    355:     if(bytes > 3) {
                    356:       logmsg("too large packet!");
                    357:       return 0;
                    358:     }
                    359:   } while(packetlen);
                    360: 
                    361:   return bytes;
                    362: }
                    363: 
                    364: 
                    365: static size_t decode_length(unsigned char *buf,
                    366:                             size_t buflen, size_t *lenbytes)
                    367: {
                    368:   size_t len = 0;
                    369:   size_t mult = 1;
                    370:   size_t i;
                    371:   unsigned char encoded = 0x80;
                    372: 
                    373:   for(i = 0; (i < buflen) && (encoded & 0x80); i++) {
                    374:     encoded = buf[i];
                    375:     len += (encoded & 0x7f) * mult;
                    376:     mult *= 0x80;
                    377:   }
                    378: 
                    379:   if(lenbytes)
                    380:     *lenbytes = i;
                    381: 
                    382:   return len;
                    383: }
                    384: 
                    385: 
                    386: /* return 0 on success */
                    387: static int publish(FILE *dump,
                    388:                    curl_socket_t fd, unsigned short packetid,
                    389:                    char *topic, char *payload, size_t payloadlen)
                    390: {
                    391:   size_t topiclen = strlen(topic);
                    392:   unsigned char *packet;
                    393:   size_t payloadindex;
                    394:   ssize_t remaininglength = topiclen + 2 + payloadlen;
                    395:   ssize_t packetlen;
                    396:   ssize_t sendamount;
                    397:   ssize_t rc;
                    398:   char rembuffer[4];
                    399:   int encodedlen;
                    400: 
                    401:   encodedlen = encode_length(remaininglength, rembuffer);
                    402: 
                    403:   /* one packet type byte (possibly two more for packetid) */
                    404:   packetlen = remaininglength + encodedlen + 1;
                    405:   packet = malloc(packetlen);
                    406:   if(!packet)
                    407:     return 1;
                    408: 
                    409:   packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */
                    410:   memcpy(&packet[1], rembuffer, encodedlen);
                    411: 
                    412:   (void)packetid;
                    413:   /* packet_id if QoS is set */
                    414: 
                    415:   packet[1 + encodedlen] = (unsigned char)(topiclen >> 8);
                    416:   packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff);
                    417:   memcpy(&packet[3 + encodedlen], topic, topiclen);
                    418: 
                    419:   payloadindex = 3 + topiclen + encodedlen;
                    420:   memcpy(&packet[payloadindex], payload, payloadlen);
                    421: 
                    422:   sendamount = packetlen;
                    423:   if(config.short_publish)
                    424:     sendamount -= 2;
                    425: 
                    426:   rc = swrite(fd, (char *)packet, sendamount);
                    427:   if(rc > 0) {
                    428:     logmsg("WROTE %d bytes [PUBLISH]", rc);
                    429:     loghex(packet, rc);
                    430:     logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
                    431:   }
                    432:   if(rc == packetlen)
                    433:     return 0;
                    434:   return 1;
                    435: }
                    436: 
                    437: #define MAX_TOPIC_LENGTH 65535
                    438: #define MAX_CLIENT_ID_LENGTH 32
                    439: 
                    440: static char topic[MAX_TOPIC_LENGTH + 1];
                    441: 
                    442: static int fixedheader(curl_socket_t fd,
                    443:                        unsigned char *bytep,
                    444:                        size_t *remaining_lengthp,
                    445:                        size_t *remaining_length_bytesp)
                    446: {
                    447:   /* get the fixed header */
                    448:   unsigned char buffer[10];
                    449: 
                    450:   /* get the first two bytes */
                    451:   ssize_t rc = sread(fd, (char *)buffer, 2);
                    452:   int i;
                    453:   if(rc < 2) {
                    454:     logmsg("READ %d bytes [SHORT!]", rc);
                    455:     return 1; /* fail */
                    456:   }
                    457:   logmsg("READ %d bytes", rc);
                    458:   loghex(buffer, rc);
                    459:   *bytep = buffer[0];
                    460: 
                    461:   /* if the length byte has the top bit set, get the next one too */
                    462:   i = 1;
                    463:   while(buffer[i] & 0x80) {
                    464:     i++;
                    465:     rc = sread(fd, (char *)&buffer[i], 1);
                    466:     if(rc != 1) {
                    467:       logmsg("Remaining Length broken");
                    468:       return 1;
                    469:     }
                    470:   }
                    471:   *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp);
                    472:   logmsg("Remaining Length: %ld [%d bytes]", (long) *remaining_lengthp,
                    473:          *remaining_length_bytesp);
                    474:   return 0;
                    475: }
                    476: 
                    477: static curl_socket_t mqttit(curl_socket_t fd)
                    478: {
                    479:   unsigned char buffer[10*1024];
                    480:   ssize_t rc;
                    481:   unsigned char byte;
                    482:   unsigned short packet_id;
                    483:   size_t payload_len;
                    484:   unsigned int topic_len;
                    485:   size_t remaining_length = 0;
                    486:   size_t bytes = 0; /* remaining length field size in bytes */
                    487:   char client_id[MAX_CLIENT_ID_LENGTH];
                    488:   long testno;
                    489: 
                    490:   static const char protocol[7] = {
                    491:     0x00, 0x04,       /* protocol length */
                    492:     'M','Q','T','T',  /* protocol name */
                    493:     0x04              /* protocol level */
                    494:   };
                    495:   FILE *dump = fopen(REQUEST_DUMP, "ab");
                    496:   if(!dump)
                    497:     goto end;
                    498: 
                    499:   getconfig();
                    500: 
                    501:   testno = config.testnum;
                    502: 
                    503:   if(testno)
                    504:     logmsg("Found test number %ld", testno);
                    505: 
                    506:   do {
                    507:     /* get the fixed header */
                    508:     rc = fixedheader(fd, &byte, &remaining_length, &bytes);
                    509:     if(rc)
                    510:       break;
                    511:     if(remaining_length) {
                    512:       rc = sread(fd, (char *)buffer, remaining_length);
                    513:       if(rc > 0) {
                    514:         logmsg("READ %d bytes", rc);
                    515:         loghex(buffer, rc);
                    516:       }
                    517:     }
                    518: 
                    519:     if(byte == MQTT_MSG_CONNECT) {
                    520:       logprotocol(FROM_CLIENT, "CONNECT", remaining_length,
                    521:                   dump, buffer, rc);
                    522: 
                    523:       if(memcmp(protocol, buffer, sizeof(protocol))) {
                    524:         logmsg("Protocol preamble mismatch");
                    525:         goto end;
                    526:       }
                    527:       /* ignore the connect flag byte and two keepalive bytes */
                    528: 
                    529:       payload_len = (buffer[10] << 8) | buffer[11];
                    530:       if((ssize_t)payload_len != (rc - 12)) {
                    531:         logmsg("Payload length mismatch, expected %x got %x",
                    532:                rc - 12, payload_len);
                    533:         goto end;
                    534:       }
                    535:       else if((payload_len + 1) > MAX_CLIENT_ID_LENGTH) {
                    536:         logmsg("Too large client id");
                    537:         goto end;
                    538:       }
                    539:       memcpy(client_id, &buffer[14], payload_len);
                    540:       client_id[payload_len] = 0;
                    541: 
                    542:       logmsg("MQTT client connect accepted: %s", client_id);
                    543: 
                    544:       /* The first packet sent from the Server to the Client MUST be a
                    545:          CONNACK Packet */
                    546: 
                    547:       if(connack(dump, fd)) {
                    548:         logmsg("failed sending CONNACK");
                    549:         goto end;
                    550:       }
                    551:     }
                    552:     else if(byte == MQTT_MSG_SUBSCRIBE) {
                    553:       FILE *stream;
                    554:       int error;
                    555:       char *data;
                    556:       size_t datalen;
                    557:       logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
                    558:                   dump, buffer, rc);
                    559:       logmsg("Incoming SUBSCRIBE");
                    560: 
                    561:       if(rc < 6) {
                    562:         logmsg("Too small SUBSCRIBE");
                    563:         goto end;
                    564:       }
                    565: 
                    566:       /* two bytes packet id */
                    567:       packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]);
                    568: 
                    569:       /* two bytes topic length */
                    570:       topic_len = (buffer[2] << 8) | buffer[3];
                    571:       if(topic_len != (remaining_length - 5)) {
                    572:         logmsg("Wrong topic length, got %d expected %d",
                    573:                topic_len, remaining_length - 5);
                    574:         goto end;
                    575:       }
                    576:       memcpy(topic, &buffer[4], topic_len);
                    577:       topic[topic_len] = 0;
                    578: 
                    579:       /* there's a QoS byte (two bits) after the topic */
                    580: 
                    581:       logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
                    582:       stream = test2fopen(testno);
                    583:       error = getpart(&data, &datalen, "reply", "data", stream);
                    584:       if(!error) {
                    585:         if(!config.publish_before_suback) {
                    586:           if(suback(dump, fd, packet_id)) {
                    587:             logmsg("failed sending SUBACK");
                    588:             goto end;
                    589:           }
                    590:         }
                    591:         if(publish(dump, fd, packet_id, topic, data, datalen)) {
                    592:           logmsg("PUBLISH failed");
                    593:           goto end;
                    594:         }
                    595:         if(config.publish_before_suback) {
                    596:           if(suback(dump, fd, packet_id)) {
                    597:             logmsg("failed sending SUBACK");
                    598:             goto end;
                    599:           }
                    600:         }
                    601:       }
                    602:       else {
                    603:         char *def = (char *)"this is random payload yes yes it is";
                    604:         publish(dump, fd, packet_id, topic, def, strlen(def));
                    605:       }
                    606:       disconnect(dump, fd);
                    607:     }
                    608:     else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) {
                    609:       size_t topiclen;
                    610: 
                    611:       logmsg("Incoming PUBLISH");
                    612:       logprotocol(FROM_CLIENT, "PUBLISH", remaining_length,
                    613:                   dump, buffer, rc);
                    614: 
                    615:       topiclen = (buffer[1 + bytes] << 8) | buffer[2 + bytes];
                    616:       logmsg("Got %d bytes topic", topiclen);
                    617:       /* TODO: verify topiclen */
                    618: 
                    619: #ifdef QOS
                    620:       /* TODO: handle packetid if there is one. Send puback if QoS > 0 */
                    621:       puback(dump, fd, 0);
                    622: #endif
                    623:       /* expect a disconnect here */
                    624:       /* get the request */
                    625:       rc = sread(fd, (char *)&buffer[0], 2);
                    626: 
                    627:       logmsg("READ %d bytes [DISCONNECT]", rc);
                    628:       loghex(buffer, rc);
                    629:       logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc);
                    630:       goto end;
                    631:     }
                    632:     else {
                    633:       /* not supported (yet) */
                    634:       goto end;
                    635:     }
                    636:   } while(1);
                    637: 
                    638:   end:
                    639:   fclose(dump);
                    640:   return CURL_SOCKET_BAD;
                    641: }
                    642: 
                    643: /*
                    644:   sockfdp is a pointer to an established stream or CURL_SOCKET_BAD
                    645: 
                    646:   if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must
                    647:   accept()
                    648: */
                    649: static bool incoming(curl_socket_t listenfd)
                    650: {
                    651:   fd_set fds_read;
                    652:   fd_set fds_write;
                    653:   fd_set fds_err;
                    654:   int clients = 0; /* connected clients */
                    655: 
                    656:   if(got_exit_signal) {
                    657:     logmsg("signalled to die, exiting...");
                    658:     return FALSE;
                    659:   }
                    660: 
                    661: #ifdef HAVE_GETPPID
                    662:   /* As a last resort, quit if socks5 process becomes orphan. */
                    663:   if(getppid() <= 1) {
                    664:     logmsg("process becomes orphan, exiting");
                    665:     return FALSE;
                    666:   }
                    667: #endif
                    668: 
                    669:   do {
                    670:     ssize_t rc;
                    671:     int error = 0;
                    672:     curl_socket_t sockfd = listenfd;
                    673:     int maxfd = (int)sockfd;
                    674: 
                    675:     FD_ZERO(&fds_read);
                    676:     FD_ZERO(&fds_write);
                    677:     FD_ZERO(&fds_err);
                    678: 
                    679:     /* there's always a socket to wait for */
                    680:     FD_SET(sockfd, &fds_read);
                    681: 
                    682:     do {
                    683:       /* select() blocking behavior call on blocking descriptors please */
                    684:       rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL);
                    685:       if(got_exit_signal) {
                    686:         logmsg("signalled to die, exiting...");
                    687:         return FALSE;
                    688:       }
                    689:     } while((rc == -1) && ((error = SOCKERRNO) == EINTR));
                    690: 
                    691:     if(rc < 0) {
                    692:       logmsg("select() failed with error: (%d) %s",
                    693:              error, strerror(error));
                    694:       return FALSE;
                    695:     }
                    696: 
                    697:     if(FD_ISSET(sockfd, &fds_read)) {
                    698:       curl_socket_t newfd = accept(sockfd, NULL, NULL);
                    699:       if(CURL_SOCKET_BAD == newfd) {
                    700:         error = SOCKERRNO;
                    701:         logmsg("accept(%d, NULL, NULL) failed with error: (%d) %s",
                    702:                sockfd, error, strerror(error));
                    703:       }
                    704:       else {
                    705:         logmsg("====> Client connect, fd %d. Read config from %s",
                    706:                newfd, configfile);
                    707:         set_advisor_read_lock(SERVERLOGS_LOCK);
                    708:         (void)mqttit(newfd); /* until done */
                    709:         clear_advisor_read_lock(SERVERLOGS_LOCK);
                    710: 
                    711:         logmsg("====> Client disconnect");
                    712:         sclose(newfd);
                    713:       }
                    714:     }
                    715:   } while(clients);
                    716: 
                    717:   return TRUE;
                    718: }
                    719: 
                    720: static curl_socket_t sockdaemon(curl_socket_t sock,
                    721:                                 unsigned short *listenport)
                    722: {
                    723:   /* passive daemon style */
                    724:   srvr_sockaddr_union_t listener;
                    725:   int flag;
                    726:   int rc;
                    727:   int totdelay = 0;
                    728:   int maxretr = 10;
                    729:   int delay = 20;
                    730:   int attempt = 0;
                    731:   int error = 0;
                    732: 
                    733:   do {
                    734:     attempt++;
                    735:     flag = 1;
                    736:     rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
                    737:          (void *)&flag, sizeof(flag));
                    738:     if(rc) {
                    739:       error = SOCKERRNO;
                    740:       logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s",
                    741:              error, strerror(error));
                    742:       if(maxretr) {
                    743:         rc = wait_ms(delay);
                    744:         if(rc) {
                    745:           /* should not happen */
                    746:           logmsg("wait_ms() failed with error: %d", rc);
                    747:           sclose(sock);
                    748:           return CURL_SOCKET_BAD;
                    749:         }
                    750:         if(got_exit_signal) {
                    751:           logmsg("signalled to die, exiting...");
                    752:           sclose(sock);
                    753:           return CURL_SOCKET_BAD;
                    754:         }
                    755:         totdelay += delay;
                    756:         delay *= 2; /* double the sleep for next attempt */
                    757:       }
                    758:     }
                    759:   } while(rc && maxretr--);
                    760: 
                    761:   if(rc) {
                    762:     logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s",
                    763:            attempt, totdelay, error, strerror(error));
                    764:     logmsg("Continuing anyway...");
                    765:   }
                    766: 
                    767:   /* When the specified listener port is zero, it is actually a
                    768:      request to let the system choose a non-zero available port. */
                    769: 
                    770: #ifdef ENABLE_IPV6
                    771:   if(!use_ipv6) {
                    772: #endif
                    773:     memset(&listener.sa4, 0, sizeof(listener.sa4));
                    774:     listener.sa4.sin_family = AF_INET;
                    775:     listener.sa4.sin_addr.s_addr = INADDR_ANY;
                    776:     listener.sa4.sin_port = htons(*listenport);
                    777:     rc = bind(sock, &listener.sa, sizeof(listener.sa4));
                    778: #ifdef ENABLE_IPV6
                    779:   }
                    780:   else {
                    781:     memset(&listener.sa6, 0, sizeof(listener.sa6));
                    782:     listener.sa6.sin6_family = AF_INET6;
                    783:     listener.sa6.sin6_addr = in6addr_any;
                    784:     listener.sa6.sin6_port = htons(*listenport);
                    785:     rc = bind(sock, &listener.sa, sizeof(listener.sa6));
                    786:   }
                    787: #endif /* ENABLE_IPV6 */
                    788:   if(rc) {
                    789:     error = SOCKERRNO;
                    790:     logmsg("Error binding socket on port %hu: (%d) %s",
                    791:            *listenport, error, strerror(error));
                    792:     sclose(sock);
                    793:     return CURL_SOCKET_BAD;
                    794:   }
                    795: 
                    796:   if(!*listenport) {
                    797:     /* The system was supposed to choose a port number, figure out which
                    798:        port we actually got and update the listener port value with it. */
                    799:     curl_socklen_t la_size;
                    800:     srvr_sockaddr_union_t localaddr;
                    801: #ifdef ENABLE_IPV6
                    802:     if(!use_ipv6)
                    803: #endif
                    804:       la_size = sizeof(localaddr.sa4);
                    805: #ifdef ENABLE_IPV6
                    806:     else
                    807:       la_size = sizeof(localaddr.sa6);
                    808: #endif
                    809:     memset(&localaddr.sa, 0, (size_t)la_size);
                    810:     if(getsockname(sock, &localaddr.sa, &la_size) < 0) {
                    811:       error = SOCKERRNO;
                    812:       logmsg("getsockname() failed with error: (%d) %s",
                    813:              error, strerror(error));
                    814:       sclose(sock);
                    815:       return CURL_SOCKET_BAD;
                    816:     }
                    817:     switch(localaddr.sa.sa_family) {
                    818:     case AF_INET:
                    819:       *listenport = ntohs(localaddr.sa4.sin_port);
                    820:       break;
                    821: #ifdef ENABLE_IPV6
                    822:     case AF_INET6:
                    823:       *listenport = ntohs(localaddr.sa6.sin6_port);
                    824:       break;
                    825: #endif
                    826:     default:
                    827:       break;
                    828:     }
                    829:     if(!*listenport) {
                    830:       /* Real failure, listener port shall not be zero beyond this point. */
                    831:       logmsg("Apparently getsockname() succeeded, with listener port zero.");
                    832:       logmsg("A valid reason for this failure is a binary built without");
                    833:       logmsg("proper network library linkage. This might not be the only");
                    834:       logmsg("reason, but double check it before anything else.");
                    835:       sclose(sock);
                    836:       return CURL_SOCKET_BAD;
                    837:     }
                    838:   }
                    839: 
                    840:   /* start accepting connections */
                    841:   rc = listen(sock, 5);
                    842:   if(0 != rc) {
                    843:     error = SOCKERRNO;
                    844:     logmsg("listen(%d, 5) failed with error: (%d) %s",
                    845:            sock, error, strerror(error));
                    846:     sclose(sock);
                    847:     return CURL_SOCKET_BAD;
                    848:   }
                    849: 
                    850:   return sock;
                    851: }
                    852: 
                    853: 
                    854: int main(int argc, char *argv[])
                    855: {
                    856:   curl_socket_t sock = CURL_SOCKET_BAD;
                    857:   curl_socket_t msgsock = CURL_SOCKET_BAD;
                    858:   int wrotepidfile = 0;
                    859:   int wroteportfile = 0;
                    860:   const char *pidname = ".mqttd.pid";
                    861:   const char *portname = ".mqttd.port";
                    862:   bool juggle_again;
                    863:   int error;
                    864:   int arg = 1;
                    865: 
                    866:   while(argc>arg) {
                    867:     if(!strcmp("--version", argv[arg])) {
                    868:       printf("mqttd IPv4%s\n",
                    869: #ifdef ENABLE_IPV6
                    870:              "/IPv6"
                    871: #else
                    872:              ""
                    873: #endif
                    874:              );
                    875:       return 0;
                    876:     }
                    877:     else if(!strcmp("--pidfile", argv[arg])) {
                    878:       arg++;
                    879:       if(argc>arg)
                    880:         pidname = argv[arg++];
                    881:     }
                    882:     else if(!strcmp("--portfile", argv[arg])) {
                    883:       arg++;
                    884:       if(argc>arg)
                    885:         portname = argv[arg++];
                    886:     }
                    887:     else if(!strcmp("--config", argv[arg])) {
                    888:       arg++;
                    889:       if(argc>arg)
                    890:         configfile = argv[arg++];
                    891:     }
                    892:     else if(!strcmp("--logfile", argv[arg])) {
                    893:       arg++;
                    894:       if(argc>arg)
                    895:         serverlogfile = argv[arg++];
                    896:     }
                    897:     else if(!strcmp("--ipv6", argv[arg])) {
                    898: #ifdef ENABLE_IPV6
                    899:       ipv_inuse = "IPv6";
                    900:       use_ipv6 = TRUE;
                    901: #endif
                    902:       arg++;
                    903:     }
                    904:     else if(!strcmp("--ipv4", argv[arg])) {
                    905:       /* for completeness, we support this option as well */
                    906: #ifdef ENABLE_IPV6
                    907:       ipv_inuse = "IPv4";
                    908:       use_ipv6 = FALSE;
                    909: #endif
                    910:       arg++;
                    911:     }
                    912:     else if(!strcmp("--port", argv[arg])) {
                    913:       arg++;
                    914:       if(argc>arg) {
                    915:         char *endptr;
                    916:         unsigned long ulnum = strtoul(argv[arg], &endptr, 10);
                    917:         if((endptr != argv[arg] + strlen(argv[arg])) ||
                    918:            ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) {
                    919:           fprintf(stderr, "mqttd: invalid --port argument (%s)\n",
                    920:                   argv[arg]);
                    921:           return 0;
                    922:         }
                    923:         port = curlx_ultous(ulnum);
                    924:         arg++;
                    925:       }
                    926:     }
                    927:     else {
                    928:       puts("Usage: mqttd [option]\n"
                    929:            " --config [file]\n"
                    930:            " --version\n"
                    931:            " --logfile [file]\n"
                    932:            " --pidfile [file]\n"
                    933:            " --ipv4\n"
                    934:            " --ipv6\n"
                    935:            " --port [port]\n");
                    936:       return 0;
                    937:     }
                    938:   }
                    939: 
                    940: #ifdef WIN32
                    941:   win32_init();
                    942:   atexit(win32_cleanup);
                    943: 
                    944:   setmode(fileno(stdin), O_BINARY);
                    945:   setmode(fileno(stdout), O_BINARY);
                    946:   setmode(fileno(stderr), O_BINARY);
                    947: #endif
                    948: 
                    949:   install_signal_handlers(FALSE);
                    950: 
                    951: #ifdef ENABLE_IPV6
                    952:   if(!use_ipv6)
                    953: #endif
                    954:     sock = socket(AF_INET, SOCK_STREAM, 0);
                    955: #ifdef ENABLE_IPV6
                    956:   else
                    957:     sock = socket(AF_INET6, SOCK_STREAM, 0);
                    958: #endif
                    959: 
                    960:   if(CURL_SOCKET_BAD == sock) {
                    961:     error = SOCKERRNO;
                    962:     logmsg("Error creating socket: (%d) %s",
                    963:            error, strerror(error));
                    964:     goto mqttd_cleanup;
                    965:   }
                    966: 
                    967:   {
                    968:     /* passive daemon style */
                    969:     sock = sockdaemon(sock, &port);
                    970:     if(CURL_SOCKET_BAD == sock) {
                    971:       goto mqttd_cleanup;
                    972:     }
                    973:     msgsock = CURL_SOCKET_BAD; /* no stream socket yet */
                    974:   }
                    975: 
                    976:   logmsg("Running %s version", ipv_inuse);
                    977:   logmsg("Listening on port %hu", port);
                    978: 
                    979:   wrotepidfile = write_pidfile(pidname);
                    980:   if(!wrotepidfile) {
                    981:     goto mqttd_cleanup;
                    982:   }
                    983: 
                    984:   wroteportfile = write_portfile(portname, (int)port);
                    985:   if(!wroteportfile) {
                    986:     goto mqttd_cleanup;
                    987:   }
                    988: 
                    989:   do {
                    990:     juggle_again = incoming(sock);
                    991:   } while(juggle_again);
                    992: 
                    993: mqttd_cleanup:
                    994: 
                    995:   if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD))
                    996:     sclose(msgsock);
                    997: 
                    998:   if(sock != CURL_SOCKET_BAD)
                    999:     sclose(sock);
                   1000: 
                   1001:   if(wrotepidfile)
                   1002:     unlink(pidname);
                   1003: 
                   1004:   restore_signal_handlers(FALSE);
                   1005: 
                   1006:   if(got_exit_signal) {
                   1007:     logmsg("============> mqttd exits with signal (%d)", exit_signal);
                   1008:     /*
                   1009:      * To properly set the return status of the process we
                   1010:      * must raise the same signal SIGINT or SIGTERM that we
                   1011:      * caught and let the old handler take care of it.
                   1012:      */
                   1013:     raise(exit_signal);
                   1014:   }
                   1015: 
                   1016:   logmsg("============> mqttd quits");
                   1017:   return 0;
                   1018: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>