Annotation of embedaddon/curl/tests/server/mqttd.c, revision 1.1

1.1     ! misho       1: /***************************************************************************
        !             2:  *                                  _   _ ____  _
        !             3:  *  Project                     ___| | | |  _ \| |
        !             4:  *                             / __| | | | |_) | |
        !             5:  *                            | (__| |_| |  _ <| |___
        !             6:  *                             \___|\___/|_| \_\_____|
        !             7:  *
        !             8:  * Copyright (C) 1998 - 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
        !             9:  *
        !            10:  * This software is licensed as described in the file COPYING, which
        !            11:  * you should have received as part of this distribution. The terms
        !            12:  * are also available at https://curl.haxx.se/docs/copyright.html.
        !            13:  *
        !            14:  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
        !            15:  * copies of the Software, and permit persons to whom the Software is
        !            16:  * furnished to do so, under the terms of the COPYING file.
        !            17:  *
        !            18:  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
        !            19:  * KIND, either express or implied.
        !            20:  *
        !            21:  ***************************************************************************/
        !            22: #include "server_setup.h"
        !            23: #include <stdlib.h>
        !            24: #include <string.h>
        !            25: #include "util.h"
        !            26: 
        !            27: /* Function
        !            28:  *
        !            29:  * Accepts a TCP connection on a custom port (IPv4 or IPv6).  Speaks MQTT.
        !            30:  *
        !            31:  * Read commands from FILE (set with --config). The commands control how to
        !            32:  * act and is reset to defaults each client TCP connect.
        !            33:  *
        !            34:  * Config file keywords:
        !            35:  *
        !            36:  * TODO
        !            37:  */
        !            38: 
        !            39: /* based on sockfilt.c */
        !            40: 
        !            41: #ifdef HAVE_SIGNAL_H
        !            42: #include <signal.h>
        !            43: #endif
        !            44: #ifdef HAVE_NETINET_IN_H
        !            45: #include <netinet/in.h>
        !            46: #endif
        !            47: #ifdef HAVE_NETINET_IN6_H
        !            48: #include <netinet/in6.h>
        !            49: #endif
        !            50: #ifdef HAVE_ARPA_INET_H
        !            51: #include <arpa/inet.h>
        !            52: #endif
        !            53: #ifdef HAVE_NETDB_H
        !            54: #include <netdb.h>
        !            55: #endif
        !            56: 
        !            57: #define ENABLE_CURLX_PRINTF
        !            58: /* make the curlx header define all printf() functions to use the curlx_*
        !            59:    versions instead */
        !            60: #include "curlx.h" /* from the private lib dir */
        !            61: #include "getpart.h"
        !            62: #include "inet_pton.h"
        !            63: #include "util.h"
        !            64: #include "server_sockaddr.h"
        !            65: #include "warnless.h"
        !            66: 
        !            67: /* include memdebug.h last */
        !            68: #include "memdebug.h"
        !            69: 
        !            70: #ifdef USE_WINSOCK
        !            71: #undef  EINTR
        !            72: #define EINTR    4 /* errno.h value */
        !            73: #undef  EAGAIN
        !            74: #define EAGAIN  11 /* errno.h value */
        !            75: #undef  ENOMEM
        !            76: #define ENOMEM  12 /* errno.h value */
        !            77: #undef  EINVAL
        !            78: #define EINVAL  22 /* errno.h value */
        !            79: #endif
        !            80: 
        !            81: #define DEFAULT_PORT 1883 /* MQTT default port */
        !            82: 
        !            83: #ifndef DEFAULT_LOGFILE
        !            84: #define DEFAULT_LOGFILE "log/mqttd.log"
        !            85: #endif
        !            86: 
        !            87: #ifndef DEFAULT_CONFIG
        !            88: #define DEFAULT_CONFIG "mqttd.config"
        !            89: #endif
        !            90: 
        !            91: #define MQTT_MSG_CONNECT    0x10
        !            92: #define MQTT_MSG_CONNACK    0x20
        !            93: #define MQTT_MSG_PUBLISH    0x30
        !            94: #define MQTT_MSG_PUBACK     0x40
        !            95: #define MQTT_MSG_SUBSCRIBE  0x82
        !            96: #define MQTT_MSG_SUBACK     0x90
        !            97: #define MQTT_MSG_DISCONNECT 0xe0
        !            98: 
        !            99: #define MQTT_CONNACK_LEN 4
        !           100: #define MQTT_SUBACK_LEN 5
        !           101: #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
        !           102: #define MQTT_HEADER_LEN 5    /* max 5 bytes */
        !           103: 
        !           104: struct configurable {
        !           105:   unsigned char version; /* initial version byte in the request must match
        !           106:                             this */
        !           107:   bool publish_before_suback;
        !           108:   bool short_publish;
        !           109:   unsigned char error_connack;
        !           110:   int testnum;
        !           111: };
        !           112: 
        !           113: #define REQUEST_DUMP  "log/server.input"
        !           114: #define CONFIG_VERSION 5
        !           115: 
        !           116: static struct configurable config;
        !           117: 
        !           118: const char *serverlogfile = DEFAULT_LOGFILE;
        !           119: static const char *configfile = DEFAULT_CONFIG;
        !           120: 
        !           121: #ifdef ENABLE_IPV6
        !           122: static bool use_ipv6 = FALSE;
        !           123: #endif
        !           124: static const char *ipv_inuse = "IPv4";
        !           125: static unsigned short port = DEFAULT_PORT;
        !           126: 
        !           127: static void resetdefaults(void)
        !           128: {
        !           129:   logmsg("Reset to defaults");
        !           130:   config.version = CONFIG_VERSION;
        !           131:   config.publish_before_suback = FALSE;
        !           132:   config.short_publish = FALSE;
        !           133:   config.error_connack = 0;
        !           134:   config.testnum = 0;
        !           135: }
        !           136: 
        !           137: static unsigned char byteval(char *value)
        !           138: {
        !           139:   unsigned long num = strtoul(value, NULL, 10);
        !           140:   return num & 0xff;
        !           141: }
        !           142: 
        !           143: static void getconfig(void)
        !           144: {
        !           145:   FILE *fp = fopen(configfile, FOPEN_READTEXT);
        !           146:   resetdefaults();
        !           147:   if(fp) {
        !           148:     char buffer[512];
        !           149:     logmsg("parse config file");
        !           150:     while(fgets(buffer, sizeof(buffer), fp)) {
        !           151:       char key[32];
        !           152:       char value[32];
        !           153:       if(2 == sscanf(buffer, "%31s %31s", key, value)) {
        !           154:         if(!strcmp(key, "version")) {
        !           155:           config.version = byteval(value);
        !           156:           logmsg("version [%d] set", config.version);
        !           157:         }
        !           158:         else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
        !           159:           logmsg("PUBLISH-before-SUBACK set");
        !           160:           config.publish_before_suback = TRUE;
        !           161:         }
        !           162:         else if(!strcmp(key, "short-PUBLISH")) {
        !           163:           logmsg("short-PUBLISH set");
        !           164:           config.short_publish = TRUE;
        !           165:         }
        !           166:         else if(!strcmp(key, "error-CONNACK")) {
        !           167:           config.error_connack = byteval(value);
        !           168:           logmsg("error-CONNACK = %d", config.error_connack);
        !           169:         }
        !           170:         else if(!strcmp(key, "Testnum")) {
        !           171:           config.testnum = atoi(value);
        !           172:           logmsg("testnum = %d", config.testnum);
        !           173:         }
        !           174:       }
        !           175:     }
        !           176:     fclose(fp);
        !           177:   }
        !           178:   else {
        !           179:     logmsg("No config file '%s' to read", configfile);
        !           180:   }
        !           181: }
        !           182: 
        !           183: static void loghex(unsigned char *buffer, ssize_t len)
        !           184: {
        !           185:   char data[12000];
        !           186:   ssize_t i;
        !           187:   unsigned char *ptr = buffer;
        !           188:   char *optr = data;
        !           189:   ssize_t width = 0;
        !           190:   int left = sizeof(data);
        !           191: 
        !           192:   for(i = 0; i<len && (left >= 0); i++) {
        !           193:     msnprintf(optr, left, "%02x", ptr[i]);
        !           194:     width += 2;
        !           195:     optr += 2;
        !           196:     left -= 2;
        !           197:   }
        !           198:   if(width)
        !           199:     logmsg("'%s'", data);
        !           200: }
        !           201: 
        !           202: typedef enum {
        !           203:   FROM_CLIENT,
        !           204:   FROM_SERVER
        !           205: } mqttdir;
        !           206: 
        !           207: static void logprotocol(mqttdir dir,
        !           208:                         const char *prefix, size_t remlen,
        !           209:                         FILE *output,
        !           210:                         unsigned char *buffer, ssize_t len)
        !           211: {
        !           212:   char data[12000] = "";
        !           213:   ssize_t i;
        !           214:   unsigned char *ptr = buffer;
        !           215:   char *optr = data;
        !           216:   ssize_t width = 0;
        !           217:   int left = sizeof(data);
        !           218: 
        !           219:   for(i = 0; i<len && (left >= 0); i++) {
        !           220:     msnprintf(optr, left, "%02x", ptr[i]);
        !           221:     width += 2;
        !           222:     optr += 2;
        !           223:     left -= 2;
        !           224:   }
        !           225:   fprintf(output, "%s %s %zx %s\n",
        !           226:           dir == FROM_CLIENT? "client": "server",
        !           227:           prefix, remlen,
        !           228:           data);
        !           229: }
        !           230: 
        !           231: 
        !           232: /* return 0 on success */
        !           233: static int connack(FILE *dump, curl_socket_t fd)
        !           234: {
        !           235:   unsigned char packet[]={
        !           236:     MQTT_MSG_CONNACK, 0x02,
        !           237:     0x00, 0x00
        !           238:   };
        !           239:   ssize_t rc;
        !           240: 
        !           241:   packet[3] = config.error_connack;
        !           242: 
        !           243:   rc = swrite(fd, (char *)packet, sizeof(packet));
        !           244:   if(rc > 0) {
        !           245:     logmsg("WROTE %d bytes [CONNACK]", rc);
        !           246:     loghex(packet, rc);
        !           247:     logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
        !           248:   }
        !           249:   if(rc == sizeof(packet)) {
        !           250:     return 0;
        !           251:   }
        !           252:   return 1;
        !           253: }
        !           254: 
        !           255: /* return 0 on success */
        !           256: static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid)
        !           257: {
        !           258:   unsigned char packet[]={
        !           259:     MQTT_MSG_SUBACK, 0x03,
        !           260:     0, 0, /* filled in below */
        !           261:     0x00
        !           262:   };
        !           263:   ssize_t rc;
        !           264:   packet[2] = (unsigned char)(packetid >> 8);
        !           265:   packet[3] = (unsigned char)(packetid & 0xff);
        !           266: 
        !           267:   rc = swrite(fd, (char *)packet, sizeof(packet));
        !           268:   if(rc == sizeof(packet)) {
        !           269:     logmsg("WROTE %d bytes [SUBACK]", rc);
        !           270:     loghex(packet, rc);
        !           271:     logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc);
        !           272:     return 0;
        !           273:   }
        !           274:   return 1;
        !           275: }
        !           276: 
        !           277: #ifdef QOS
        !           278: /* return 0 on success */
        !           279: static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid)
        !           280: {
        !           281:   unsigned char packet[]={
        !           282:     MQTT_MSG_PUBACK, 0x00,
        !           283:     0, 0 /* filled in below */
        !           284:   };
        !           285:   ssize_t rc;
        !           286:   packet[2] = (unsigned char)(packetid >> 8);
        !           287:   packet[3] = (unsigned char)(packetid & 0xff);
        !           288: 
        !           289:   rc = swrite(fd, (char *)packet, sizeof(packet));
        !           290:   if(rc == sizeof(packet)) {
        !           291:     logmsg("WROTE %d bytes [PUBACK]", rc);
        !           292:     loghex(packet, rc);
        !           293:     logprotocol(FROM_SERVER, dump, packet, rc);
        !           294:     return 0;
        !           295:   }
        !           296:   logmsg("Failed sending [PUBACK]");
        !           297:   return 1;
        !           298: }
        !           299: #endif
        !           300: 
        !           301: /* return 0 on success */
        !           302: static int disconnect(FILE *dump, curl_socket_t fd)
        !           303: {
        !           304:   unsigned char packet[]={
        !           305:     MQTT_MSG_DISCONNECT, 0x00,
        !           306:   };
        !           307:   ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
        !           308:   if(rc == sizeof(packet)) {
        !           309:     logmsg("WROTE %d bytes [DISCONNECT]", rc);
        !           310:     loghex(packet, rc);
        !           311:     logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc);
        !           312:     return 0;
        !           313:   }
        !           314:   logmsg("Failed sending [DISCONNECT]");
        !           315:   return 1;
        !           316: }
        !           317: 
        !           318: 
        !           319: 
        !           320: /*
        !           321:   do
        !           322: 
        !           323:      encodedByte = X MOD 128
        !           324: 
        !           325:      X = X DIV 128
        !           326: 
        !           327:      // if there are more data to encode, set the top bit of this byte
        !           328: 
        !           329:      if ( X > 0 )
        !           330: 
        !           331:         encodedByte = encodedByte OR 128
        !           332: 
        !           333:       endif
        !           334: 
        !           335:     'output' encodedByte
        !           336: 
        !           337:   while ( X > 0 )
        !           338: 
        !           339: */
        !           340: 
        !           341: /* return number of bytes used */
        !           342: static int encode_length(size_t packetlen, char *remlength) /* 4 bytes */
        !           343: {
        !           344:   int bytes = 0;
        !           345:   unsigned char encode;
        !           346: 
        !           347:   do {
        !           348:     encode = packetlen % 0x80;
        !           349:     packetlen /= 0x80;
        !           350:     if(packetlen)
        !           351:       encode |= 0x80;
        !           352: 
        !           353:     remlength[bytes++] = encode;
        !           354: 
        !           355:     if(bytes > 3) {
        !           356:       logmsg("too large packet!");
        !           357:       return 0;
        !           358:     }
        !           359:   } while(packetlen);
        !           360: 
        !           361:   return bytes;
        !           362: }
        !           363: 
        !           364: 
        !           365: static size_t decode_length(unsigned char *buf,
        !           366:                             size_t buflen, size_t *lenbytes)
        !           367: {
        !           368:   size_t len = 0;
        !           369:   size_t mult = 1;
        !           370:   size_t i;
        !           371:   unsigned char encoded = 0x80;
        !           372: 
        !           373:   for(i = 0; (i < buflen) && (encoded & 0x80); i++) {
        !           374:     encoded = buf[i];
        !           375:     len += (encoded & 0x7f) * mult;
        !           376:     mult *= 0x80;
        !           377:   }
        !           378: 
        !           379:   if(lenbytes)
        !           380:     *lenbytes = i;
        !           381: 
        !           382:   return len;
        !           383: }
        !           384: 
        !           385: 
        !           386: /* return 0 on success */
        !           387: static int publish(FILE *dump,
        !           388:                    curl_socket_t fd, unsigned short packetid,
        !           389:                    char *topic, char *payload, size_t payloadlen)
        !           390: {
        !           391:   size_t topiclen = strlen(topic);
        !           392:   unsigned char *packet;
        !           393:   size_t payloadindex;
        !           394:   ssize_t remaininglength = topiclen + 2 + payloadlen;
        !           395:   ssize_t packetlen;
        !           396:   ssize_t sendamount;
        !           397:   ssize_t rc;
        !           398:   char rembuffer[4];
        !           399:   int encodedlen;
        !           400: 
        !           401:   encodedlen = encode_length(remaininglength, rembuffer);
        !           402: 
        !           403:   /* one packet type byte (possibly two more for packetid) */
        !           404:   packetlen = remaininglength + encodedlen + 1;
        !           405:   packet = malloc(packetlen);
        !           406:   if(!packet)
        !           407:     return 1;
        !           408: 
        !           409:   packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */
        !           410:   memcpy(&packet[1], rembuffer, encodedlen);
        !           411: 
        !           412:   (void)packetid;
        !           413:   /* packet_id if QoS is set */
        !           414: 
        !           415:   packet[1 + encodedlen] = (unsigned char)(topiclen >> 8);
        !           416:   packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff);
        !           417:   memcpy(&packet[3 + encodedlen], topic, topiclen);
        !           418: 
        !           419:   payloadindex = 3 + topiclen + encodedlen;
        !           420:   memcpy(&packet[payloadindex], payload, payloadlen);
        !           421: 
        !           422:   sendamount = packetlen;
        !           423:   if(config.short_publish)
        !           424:     sendamount -= 2;
        !           425: 
        !           426:   rc = swrite(fd, (char *)packet, sendamount);
        !           427:   if(rc > 0) {
        !           428:     logmsg("WROTE %d bytes [PUBLISH]", rc);
        !           429:     loghex(packet, rc);
        !           430:     logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
        !           431:   }
        !           432:   if(rc == packetlen)
        !           433:     return 0;
        !           434:   return 1;
        !           435: }
        !           436: 
        !           437: #define MAX_TOPIC_LENGTH 65535
        !           438: #define MAX_CLIENT_ID_LENGTH 32
        !           439: 
        !           440: static char topic[MAX_TOPIC_LENGTH + 1];
        !           441: 
        !           442: static int fixedheader(curl_socket_t fd,
        !           443:                        unsigned char *bytep,
        !           444:                        size_t *remaining_lengthp,
        !           445:                        size_t *remaining_length_bytesp)
        !           446: {
        !           447:   /* get the fixed header */
        !           448:   unsigned char buffer[10];
        !           449: 
        !           450:   /* get the first two bytes */
        !           451:   ssize_t rc = sread(fd, (char *)buffer, 2);
        !           452:   int i;
        !           453:   if(rc < 2) {
        !           454:     logmsg("READ %d bytes [SHORT!]", rc);
        !           455:     return 1; /* fail */
        !           456:   }
        !           457:   logmsg("READ %d bytes", rc);
        !           458:   loghex(buffer, rc);
        !           459:   *bytep = buffer[0];
        !           460: 
        !           461:   /* if the length byte has the top bit set, get the next one too */
        !           462:   i = 1;
        !           463:   while(buffer[i] & 0x80) {
        !           464:     i++;
        !           465:     rc = sread(fd, (char *)&buffer[i], 1);
        !           466:     if(rc != 1) {
        !           467:       logmsg("Remaining Length broken");
        !           468:       return 1;
        !           469:     }
        !           470:   }
        !           471:   *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp);
        !           472:   logmsg("Remaining Length: %ld [%d bytes]", (long) *remaining_lengthp,
        !           473:          *remaining_length_bytesp);
        !           474:   return 0;
        !           475: }
        !           476: 
        !           477: static curl_socket_t mqttit(curl_socket_t fd)
        !           478: {
        !           479:   unsigned char buffer[10*1024];
        !           480:   ssize_t rc;
        !           481:   unsigned char byte;
        !           482:   unsigned short packet_id;
        !           483:   size_t payload_len;
        !           484:   unsigned int topic_len;
        !           485:   size_t remaining_length = 0;
        !           486:   size_t bytes = 0; /* remaining length field size in bytes */
        !           487:   char client_id[MAX_CLIENT_ID_LENGTH];
        !           488:   long testno;
        !           489: 
        !           490:   static const char protocol[7] = {
        !           491:     0x00, 0x04,       /* protocol length */
        !           492:     'M','Q','T','T',  /* protocol name */
        !           493:     0x04              /* protocol level */
        !           494:   };
        !           495:   FILE *dump = fopen(REQUEST_DUMP, "ab");
        !           496:   if(!dump)
        !           497:     goto end;
        !           498: 
        !           499:   getconfig();
        !           500: 
        !           501:   testno = config.testnum;
        !           502: 
        !           503:   if(testno)
        !           504:     logmsg("Found test number %ld", testno);
        !           505: 
        !           506:   do {
        !           507:     /* get the fixed header */
        !           508:     rc = fixedheader(fd, &byte, &remaining_length, &bytes);
        !           509:     if(rc)
        !           510:       break;
        !           511:     if(remaining_length) {
        !           512:       rc = sread(fd, (char *)buffer, remaining_length);
        !           513:       if(rc > 0) {
        !           514:         logmsg("READ %d bytes", rc);
        !           515:         loghex(buffer, rc);
        !           516:       }
        !           517:     }
        !           518: 
        !           519:     if(byte == MQTT_MSG_CONNECT) {
        !           520:       logprotocol(FROM_CLIENT, "CONNECT", remaining_length,
        !           521:                   dump, buffer, rc);
        !           522: 
        !           523:       if(memcmp(protocol, buffer, sizeof(protocol))) {
        !           524:         logmsg("Protocol preamble mismatch");
        !           525:         goto end;
        !           526:       }
        !           527:       /* ignore the connect flag byte and two keepalive bytes */
        !           528: 
        !           529:       payload_len = (buffer[10] << 8) | buffer[11];
        !           530:       if((ssize_t)payload_len != (rc - 12)) {
        !           531:         logmsg("Payload length mismatch, expected %x got %x",
        !           532:                rc - 12, payload_len);
        !           533:         goto end;
        !           534:       }
        !           535:       else if((payload_len + 1) > MAX_CLIENT_ID_LENGTH) {
        !           536:         logmsg("Too large client id");
        !           537:         goto end;
        !           538:       }
        !           539:       memcpy(client_id, &buffer[14], payload_len);
        !           540:       client_id[payload_len] = 0;
        !           541: 
        !           542:       logmsg("MQTT client connect accepted: %s", client_id);
        !           543: 
        !           544:       /* The first packet sent from the Server to the Client MUST be a
        !           545:          CONNACK Packet */
        !           546: 
        !           547:       if(connack(dump, fd)) {
        !           548:         logmsg("failed sending CONNACK");
        !           549:         goto end;
        !           550:       }
        !           551:     }
        !           552:     else if(byte == MQTT_MSG_SUBSCRIBE) {
        !           553:       FILE *stream;
        !           554:       int error;
        !           555:       char *data;
        !           556:       size_t datalen;
        !           557:       logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
        !           558:                   dump, buffer, rc);
        !           559:       logmsg("Incoming SUBSCRIBE");
        !           560: 
        !           561:       if(rc < 6) {
        !           562:         logmsg("Too small SUBSCRIBE");
        !           563:         goto end;
        !           564:       }
        !           565: 
        !           566:       /* two bytes packet id */
        !           567:       packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]);
        !           568: 
        !           569:       /* two bytes topic length */
        !           570:       topic_len = (buffer[2] << 8) | buffer[3];
        !           571:       if(topic_len != (remaining_length - 5)) {
        !           572:         logmsg("Wrong topic length, got %d expected %d",
        !           573:                topic_len, remaining_length - 5);
        !           574:         goto end;
        !           575:       }
        !           576:       memcpy(topic, &buffer[4], topic_len);
        !           577:       topic[topic_len] = 0;
        !           578: 
        !           579:       /* there's a QoS byte (two bits) after the topic */
        !           580: 
        !           581:       logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
        !           582:       stream = test2fopen(testno);
        !           583:       error = getpart(&data, &datalen, "reply", "data", stream);
        !           584:       if(!error) {
        !           585:         if(!config.publish_before_suback) {
        !           586:           if(suback(dump, fd, packet_id)) {
        !           587:             logmsg("failed sending SUBACK");
        !           588:             goto end;
        !           589:           }
        !           590:         }
        !           591:         if(publish(dump, fd, packet_id, topic, data, datalen)) {
        !           592:           logmsg("PUBLISH failed");
        !           593:           goto end;
        !           594:         }
        !           595:         if(config.publish_before_suback) {
        !           596:           if(suback(dump, fd, packet_id)) {
        !           597:             logmsg("failed sending SUBACK");
        !           598:             goto end;
        !           599:           }
        !           600:         }
        !           601:       }
        !           602:       else {
        !           603:         char *def = (char *)"this is random payload yes yes it is";
        !           604:         publish(dump, fd, packet_id, topic, def, strlen(def));
        !           605:       }
        !           606:       disconnect(dump, fd);
        !           607:     }
        !           608:     else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) {
        !           609:       size_t topiclen;
        !           610: 
        !           611:       logmsg("Incoming PUBLISH");
        !           612:       logprotocol(FROM_CLIENT, "PUBLISH", remaining_length,
        !           613:                   dump, buffer, rc);
        !           614: 
        !           615:       topiclen = (buffer[1 + bytes] << 8) | buffer[2 + bytes];
        !           616:       logmsg("Got %d bytes topic", topiclen);
        !           617:       /* TODO: verify topiclen */
        !           618: 
        !           619: #ifdef QOS
        !           620:       /* TODO: handle packetid if there is one. Send puback if QoS > 0 */
        !           621:       puback(dump, fd, 0);
        !           622: #endif
        !           623:       /* expect a disconnect here */
        !           624:       /* get the request */
        !           625:       rc = sread(fd, (char *)&buffer[0], 2);
        !           626: 
        !           627:       logmsg("READ %d bytes [DISCONNECT]", rc);
        !           628:       loghex(buffer, rc);
        !           629:       logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc);
        !           630:       goto end;
        !           631:     }
        !           632:     else {
        !           633:       /* not supported (yet) */
        !           634:       goto end;
        !           635:     }
        !           636:   } while(1);
        !           637: 
        !           638:   end:
        !           639:   fclose(dump);
        !           640:   return CURL_SOCKET_BAD;
        !           641: }
        !           642: 
        !           643: /*
        !           644:   sockfdp is a pointer to an established stream or CURL_SOCKET_BAD
        !           645: 
        !           646:   if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must
        !           647:   accept()
        !           648: */
        !           649: static bool incoming(curl_socket_t listenfd)
        !           650: {
        !           651:   fd_set fds_read;
        !           652:   fd_set fds_write;
        !           653:   fd_set fds_err;
        !           654:   int clients = 0; /* connected clients */
        !           655: 
        !           656:   if(got_exit_signal) {
        !           657:     logmsg("signalled to die, exiting...");
        !           658:     return FALSE;
        !           659:   }
        !           660: 
        !           661: #ifdef HAVE_GETPPID
        !           662:   /* As a last resort, quit if socks5 process becomes orphan. */
        !           663:   if(getppid() <= 1) {
        !           664:     logmsg("process becomes orphan, exiting");
        !           665:     return FALSE;
        !           666:   }
        !           667: #endif
        !           668: 
        !           669:   do {
        !           670:     ssize_t rc;
        !           671:     int error = 0;
        !           672:     curl_socket_t sockfd = listenfd;
        !           673:     int maxfd = (int)sockfd;
        !           674: 
        !           675:     FD_ZERO(&fds_read);
        !           676:     FD_ZERO(&fds_write);
        !           677:     FD_ZERO(&fds_err);
        !           678: 
        !           679:     /* there's always a socket to wait for */
        !           680:     FD_SET(sockfd, &fds_read);
        !           681: 
        !           682:     do {
        !           683:       /* select() blocking behavior call on blocking descriptors please */
        !           684:       rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL);
        !           685:       if(got_exit_signal) {
        !           686:         logmsg("signalled to die, exiting...");
        !           687:         return FALSE;
        !           688:       }
        !           689:     } while((rc == -1) && ((error = SOCKERRNO) == EINTR));
        !           690: 
        !           691:     if(rc < 0) {
        !           692:       logmsg("select() failed with error: (%d) %s",
        !           693:              error, strerror(error));
        !           694:       return FALSE;
        !           695:     }
        !           696: 
        !           697:     if(FD_ISSET(sockfd, &fds_read)) {
        !           698:       curl_socket_t newfd = accept(sockfd, NULL, NULL);
        !           699:       if(CURL_SOCKET_BAD == newfd) {
        !           700:         error = SOCKERRNO;
        !           701:         logmsg("accept(%d, NULL, NULL) failed with error: (%d) %s",
        !           702:                sockfd, error, strerror(error));
        !           703:       }
        !           704:       else {
        !           705:         logmsg("====> Client connect, fd %d. Read config from %s",
        !           706:                newfd, configfile);
        !           707:         set_advisor_read_lock(SERVERLOGS_LOCK);
        !           708:         (void)mqttit(newfd); /* until done */
        !           709:         clear_advisor_read_lock(SERVERLOGS_LOCK);
        !           710: 
        !           711:         logmsg("====> Client disconnect");
        !           712:         sclose(newfd);
        !           713:       }
        !           714:     }
        !           715:   } while(clients);
        !           716: 
        !           717:   return TRUE;
        !           718: }
        !           719: 
        !           720: static curl_socket_t sockdaemon(curl_socket_t sock,
        !           721:                                 unsigned short *listenport)
        !           722: {
        !           723:   /* passive daemon style */
        !           724:   srvr_sockaddr_union_t listener;
        !           725:   int flag;
        !           726:   int rc;
        !           727:   int totdelay = 0;
        !           728:   int maxretr = 10;
        !           729:   int delay = 20;
        !           730:   int attempt = 0;
        !           731:   int error = 0;
        !           732: 
        !           733:   do {
        !           734:     attempt++;
        !           735:     flag = 1;
        !           736:     rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
        !           737:          (void *)&flag, sizeof(flag));
        !           738:     if(rc) {
        !           739:       error = SOCKERRNO;
        !           740:       logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s",
        !           741:              error, strerror(error));
        !           742:       if(maxretr) {
        !           743:         rc = wait_ms(delay);
        !           744:         if(rc) {
        !           745:           /* should not happen */
        !           746:           logmsg("wait_ms() failed with error: %d", rc);
        !           747:           sclose(sock);
        !           748:           return CURL_SOCKET_BAD;
        !           749:         }
        !           750:         if(got_exit_signal) {
        !           751:           logmsg("signalled to die, exiting...");
        !           752:           sclose(sock);
        !           753:           return CURL_SOCKET_BAD;
        !           754:         }
        !           755:         totdelay += delay;
        !           756:         delay *= 2; /* double the sleep for next attempt */
        !           757:       }
        !           758:     }
        !           759:   } while(rc && maxretr--);
        !           760: 
        !           761:   if(rc) {
        !           762:     logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s",
        !           763:            attempt, totdelay, error, strerror(error));
        !           764:     logmsg("Continuing anyway...");
        !           765:   }
        !           766: 
        !           767:   /* When the specified listener port is zero, it is actually a
        !           768:      request to let the system choose a non-zero available port. */
        !           769: 
        !           770: #ifdef ENABLE_IPV6
        !           771:   if(!use_ipv6) {
        !           772: #endif
        !           773:     memset(&listener.sa4, 0, sizeof(listener.sa4));
        !           774:     listener.sa4.sin_family = AF_INET;
        !           775:     listener.sa4.sin_addr.s_addr = INADDR_ANY;
        !           776:     listener.sa4.sin_port = htons(*listenport);
        !           777:     rc = bind(sock, &listener.sa, sizeof(listener.sa4));
        !           778: #ifdef ENABLE_IPV6
        !           779:   }
        !           780:   else {
        !           781:     memset(&listener.sa6, 0, sizeof(listener.sa6));
        !           782:     listener.sa6.sin6_family = AF_INET6;
        !           783:     listener.sa6.sin6_addr = in6addr_any;
        !           784:     listener.sa6.sin6_port = htons(*listenport);
        !           785:     rc = bind(sock, &listener.sa, sizeof(listener.sa6));
        !           786:   }
        !           787: #endif /* ENABLE_IPV6 */
        !           788:   if(rc) {
        !           789:     error = SOCKERRNO;
        !           790:     logmsg("Error binding socket on port %hu: (%d) %s",
        !           791:            *listenport, error, strerror(error));
        !           792:     sclose(sock);
        !           793:     return CURL_SOCKET_BAD;
        !           794:   }
        !           795: 
        !           796:   if(!*listenport) {
        !           797:     /* The system was supposed to choose a port number, figure out which
        !           798:        port we actually got and update the listener port value with it. */
        !           799:     curl_socklen_t la_size;
        !           800:     srvr_sockaddr_union_t localaddr;
        !           801: #ifdef ENABLE_IPV6
        !           802:     if(!use_ipv6)
        !           803: #endif
        !           804:       la_size = sizeof(localaddr.sa4);
        !           805: #ifdef ENABLE_IPV6
        !           806:     else
        !           807:       la_size = sizeof(localaddr.sa6);
        !           808: #endif
        !           809:     memset(&localaddr.sa, 0, (size_t)la_size);
        !           810:     if(getsockname(sock, &localaddr.sa, &la_size) < 0) {
        !           811:       error = SOCKERRNO;
        !           812:       logmsg("getsockname() failed with error: (%d) %s",
        !           813:              error, strerror(error));
        !           814:       sclose(sock);
        !           815:       return CURL_SOCKET_BAD;
        !           816:     }
        !           817:     switch(localaddr.sa.sa_family) {
        !           818:     case AF_INET:
        !           819:       *listenport = ntohs(localaddr.sa4.sin_port);
        !           820:       break;
        !           821: #ifdef ENABLE_IPV6
        !           822:     case AF_INET6:
        !           823:       *listenport = ntohs(localaddr.sa6.sin6_port);
        !           824:       break;
        !           825: #endif
        !           826:     default:
        !           827:       break;
        !           828:     }
        !           829:     if(!*listenport) {
        !           830:       /* Real failure, listener port shall not be zero beyond this point. */
        !           831:       logmsg("Apparently getsockname() succeeded, with listener port zero.");
        !           832:       logmsg("A valid reason for this failure is a binary built without");
        !           833:       logmsg("proper network library linkage. This might not be the only");
        !           834:       logmsg("reason, but double check it before anything else.");
        !           835:       sclose(sock);
        !           836:       return CURL_SOCKET_BAD;
        !           837:     }
        !           838:   }
        !           839: 
        !           840:   /* start accepting connections */
        !           841:   rc = listen(sock, 5);
        !           842:   if(0 != rc) {
        !           843:     error = SOCKERRNO;
        !           844:     logmsg("listen(%d, 5) failed with error: (%d) %s",
        !           845:            sock, error, strerror(error));
        !           846:     sclose(sock);
        !           847:     return CURL_SOCKET_BAD;
        !           848:   }
        !           849: 
        !           850:   return sock;
        !           851: }
        !           852: 
        !           853: 
        !           854: int main(int argc, char *argv[])
        !           855: {
        !           856:   curl_socket_t sock = CURL_SOCKET_BAD;
        !           857:   curl_socket_t msgsock = CURL_SOCKET_BAD;
        !           858:   int wrotepidfile = 0;
        !           859:   int wroteportfile = 0;
        !           860:   const char *pidname = ".mqttd.pid";
        !           861:   const char *portname = ".mqttd.port";
        !           862:   bool juggle_again;
        !           863:   int error;
        !           864:   int arg = 1;
        !           865: 
        !           866:   while(argc>arg) {
        !           867:     if(!strcmp("--version", argv[arg])) {
        !           868:       printf("mqttd IPv4%s\n",
        !           869: #ifdef ENABLE_IPV6
        !           870:              "/IPv6"
        !           871: #else
        !           872:              ""
        !           873: #endif
        !           874:              );
        !           875:       return 0;
        !           876:     }
        !           877:     else if(!strcmp("--pidfile", argv[arg])) {
        !           878:       arg++;
        !           879:       if(argc>arg)
        !           880:         pidname = argv[arg++];
        !           881:     }
        !           882:     else if(!strcmp("--portfile", argv[arg])) {
        !           883:       arg++;
        !           884:       if(argc>arg)
        !           885:         portname = argv[arg++];
        !           886:     }
        !           887:     else if(!strcmp("--config", argv[arg])) {
        !           888:       arg++;
        !           889:       if(argc>arg)
        !           890:         configfile = argv[arg++];
        !           891:     }
        !           892:     else if(!strcmp("--logfile", argv[arg])) {
        !           893:       arg++;
        !           894:       if(argc>arg)
        !           895:         serverlogfile = argv[arg++];
        !           896:     }
        !           897:     else if(!strcmp("--ipv6", argv[arg])) {
        !           898: #ifdef ENABLE_IPV6
        !           899:       ipv_inuse = "IPv6";
        !           900:       use_ipv6 = TRUE;
        !           901: #endif
        !           902:       arg++;
        !           903:     }
        !           904:     else if(!strcmp("--ipv4", argv[arg])) {
        !           905:       /* for completeness, we support this option as well */
        !           906: #ifdef ENABLE_IPV6
        !           907:       ipv_inuse = "IPv4";
        !           908:       use_ipv6 = FALSE;
        !           909: #endif
        !           910:       arg++;
        !           911:     }
        !           912:     else if(!strcmp("--port", argv[arg])) {
        !           913:       arg++;
        !           914:       if(argc>arg) {
        !           915:         char *endptr;
        !           916:         unsigned long ulnum = strtoul(argv[arg], &endptr, 10);
        !           917:         if((endptr != argv[arg] + strlen(argv[arg])) ||
        !           918:            ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) {
        !           919:           fprintf(stderr, "mqttd: invalid --port argument (%s)\n",
        !           920:                   argv[arg]);
        !           921:           return 0;
        !           922:         }
        !           923:         port = curlx_ultous(ulnum);
        !           924:         arg++;
        !           925:       }
        !           926:     }
        !           927:     else {
        !           928:       puts("Usage: mqttd [option]\n"
        !           929:            " --config [file]\n"
        !           930:            " --version\n"
        !           931:            " --logfile [file]\n"
        !           932:            " --pidfile [file]\n"
        !           933:            " --ipv4\n"
        !           934:            " --ipv6\n"
        !           935:            " --port [port]\n");
        !           936:       return 0;
        !           937:     }
        !           938:   }
        !           939: 
        !           940: #ifdef WIN32
        !           941:   win32_init();
        !           942:   atexit(win32_cleanup);
        !           943: 
        !           944:   setmode(fileno(stdin), O_BINARY);
        !           945:   setmode(fileno(stdout), O_BINARY);
        !           946:   setmode(fileno(stderr), O_BINARY);
        !           947: #endif
        !           948: 
        !           949:   install_signal_handlers(FALSE);
        !           950: 
        !           951: #ifdef ENABLE_IPV6
        !           952:   if(!use_ipv6)
        !           953: #endif
        !           954:     sock = socket(AF_INET, SOCK_STREAM, 0);
        !           955: #ifdef ENABLE_IPV6
        !           956:   else
        !           957:     sock = socket(AF_INET6, SOCK_STREAM, 0);
        !           958: #endif
        !           959: 
        !           960:   if(CURL_SOCKET_BAD == sock) {
        !           961:     error = SOCKERRNO;
        !           962:     logmsg("Error creating socket: (%d) %s",
        !           963:            error, strerror(error));
        !           964:     goto mqttd_cleanup;
        !           965:   }
        !           966: 
        !           967:   {
        !           968:     /* passive daemon style */
        !           969:     sock = sockdaemon(sock, &port);
        !           970:     if(CURL_SOCKET_BAD == sock) {
        !           971:       goto mqttd_cleanup;
        !           972:     }
        !           973:     msgsock = CURL_SOCKET_BAD; /* no stream socket yet */
        !           974:   }
        !           975: 
        !           976:   logmsg("Running %s version", ipv_inuse);
        !           977:   logmsg("Listening on port %hu", port);
        !           978: 
        !           979:   wrotepidfile = write_pidfile(pidname);
        !           980:   if(!wrotepidfile) {
        !           981:     goto mqttd_cleanup;
        !           982:   }
        !           983: 
        !           984:   wroteportfile = write_portfile(portname, (int)port);
        !           985:   if(!wroteportfile) {
        !           986:     goto mqttd_cleanup;
        !           987:   }
        !           988: 
        !           989:   do {
        !           990:     juggle_again = incoming(sock);
        !           991:   } while(juggle_again);
        !           992: 
        !           993: mqttd_cleanup:
        !           994: 
        !           995:   if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD))
        !           996:     sclose(msgsock);
        !           997: 
        !           998:   if(sock != CURL_SOCKET_BAD)
        !           999:     sclose(sock);
        !          1000: 
        !          1001:   if(wrotepidfile)
        !          1002:     unlink(pidname);
        !          1003: 
        !          1004:   restore_signal_handlers(FALSE);
        !          1005: 
        !          1006:   if(got_exit_signal) {
        !          1007:     logmsg("============> mqttd exits with signal (%d)", exit_signal);
        !          1008:     /*
        !          1009:      * To properly set the return status of the process we
        !          1010:      * must raise the same signal SIGINT or SIGTERM that we
        !          1011:      * caught and let the old handler take care of it.
        !          1012:      */
        !          1013:     raise(exit_signal);
        !          1014:   }
        !          1015: 
        !          1016:   logmsg("============> mqttd quits");
        !          1017:   return 0;
        !          1018: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>