File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / curl / tests / server / mqttd.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Wed Jun 3 10:01:16 2020 UTC (5 years ago) by misho
Branches: curl, MAIN
CVS tags: v7_70_0p4, HEAD
curl

    1: /***************************************************************************
    2:  *                                  _   _ ____  _
    3:  *  Project                     ___| | | |  _ \| |
    4:  *                             / __| | | | |_) | |
    5:  *                            | (__| |_| |  _ <| |___
    6:  *                             \___|\___/|_| \_\_____|
    7:  *
    8:  * Copyright (C) 1998 - 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
    9:  *
   10:  * This software is licensed as described in the file COPYING, which
   11:  * you should have received as part of this distribution. The terms
   12:  * are also available at https://curl.haxx.se/docs/copyright.html.
   13:  *
   14:  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
   15:  * copies of the Software, and permit persons to whom the Software is
   16:  * furnished to do so, under the terms of the COPYING file.
   17:  *
   18:  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
   19:  * KIND, either express or implied.
   20:  *
   21:  ***************************************************************************/
   22: #include "server_setup.h"
   23: #include <stdlib.h>
   24: #include <string.h>
   25: #include "util.h"
   26: 
   27: /* Function
   28:  *
   29:  * Accepts a TCP connection on a custom port (IPv4 or IPv6).  Speaks MQTT.
   30:  *
   31:  * Read commands from FILE (set with --config). The commands control how to
   32:  * act and is reset to defaults each client TCP connect.
   33:  *
   34:  * Config file keywords:
   35:  *
   36:  * TODO
   37:  */
   38: 
   39: /* based on sockfilt.c */
   40: 
   41: #ifdef HAVE_SIGNAL_H
   42: #include <signal.h>
   43: #endif
   44: #ifdef HAVE_NETINET_IN_H
   45: #include <netinet/in.h>
   46: #endif
   47: #ifdef HAVE_NETINET_IN6_H
   48: #include <netinet/in6.h>
   49: #endif
   50: #ifdef HAVE_ARPA_INET_H
   51: #include <arpa/inet.h>
   52: #endif
   53: #ifdef HAVE_NETDB_H
   54: #include <netdb.h>
   55: #endif
   56: 
   57: #define ENABLE_CURLX_PRINTF
   58: /* make the curlx header define all printf() functions to use the curlx_*
   59:    versions instead */
   60: #include "curlx.h" /* from the private lib dir */
   61: #include "getpart.h"
   62: #include "inet_pton.h"
   63: #include "util.h"
   64: #include "server_sockaddr.h"
   65: #include "warnless.h"
   66: 
   67: /* include memdebug.h last */
   68: #include "memdebug.h"
   69: 
   70: #ifdef USE_WINSOCK
   71: #undef  EINTR
   72: #define EINTR    4 /* errno.h value */
   73: #undef  EAGAIN
   74: #define EAGAIN  11 /* errno.h value */
   75: #undef  ENOMEM
   76: #define ENOMEM  12 /* errno.h value */
   77: #undef  EINVAL
   78: #define EINVAL  22 /* errno.h value */
   79: #endif
   80: 
   81: #define DEFAULT_PORT 1883 /* MQTT default port */
   82: 
   83: #ifndef DEFAULT_LOGFILE
   84: #define DEFAULT_LOGFILE "log/mqttd.log"
   85: #endif
   86: 
   87: #ifndef DEFAULT_CONFIG
   88: #define DEFAULT_CONFIG "mqttd.config"
   89: #endif
   90: 
   91: #define MQTT_MSG_CONNECT    0x10
   92: #define MQTT_MSG_CONNACK    0x20
   93: #define MQTT_MSG_PUBLISH    0x30
   94: #define MQTT_MSG_PUBACK     0x40
   95: #define MQTT_MSG_SUBSCRIBE  0x82
   96: #define MQTT_MSG_SUBACK     0x90
   97: #define MQTT_MSG_DISCONNECT 0xe0
   98: 
   99: #define MQTT_CONNACK_LEN 4
  100: #define MQTT_SUBACK_LEN 5
  101: #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
  102: #define MQTT_HEADER_LEN 5    /* max 5 bytes */
  103: 
  104: struct configurable {
  105:   unsigned char version; /* initial version byte in the request must match
  106:                             this */
  107:   bool publish_before_suback;
  108:   bool short_publish;
  109:   unsigned char error_connack;
  110:   int testnum;
  111: };
  112: 
  113: #define REQUEST_DUMP  "log/server.input"
  114: #define CONFIG_VERSION 5
  115: 
  116: static struct configurable config;
  117: 
  118: const char *serverlogfile = DEFAULT_LOGFILE;
  119: static const char *configfile = DEFAULT_CONFIG;
  120: 
  121: #ifdef ENABLE_IPV6
  122: static bool use_ipv6 = FALSE;
  123: #endif
  124: static const char *ipv_inuse = "IPv4";
  125: static unsigned short port = DEFAULT_PORT;
  126: 
  127: static void resetdefaults(void)
  128: {
  129:   logmsg("Reset to defaults");
  130:   config.version = CONFIG_VERSION;
  131:   config.publish_before_suback = FALSE;
  132:   config.short_publish = FALSE;
  133:   config.error_connack = 0;
  134:   config.testnum = 0;
  135: }
  136: 
  137: static unsigned char byteval(char *value)
  138: {
  139:   unsigned long num = strtoul(value, NULL, 10);
  140:   return num & 0xff;
  141: }
  142: 
  143: static void getconfig(void)
  144: {
  145:   FILE *fp = fopen(configfile, FOPEN_READTEXT);
  146:   resetdefaults();
  147:   if(fp) {
  148:     char buffer[512];
  149:     logmsg("parse config file");
  150:     while(fgets(buffer, sizeof(buffer), fp)) {
  151:       char key[32];
  152:       char value[32];
  153:       if(2 == sscanf(buffer, "%31s %31s", key, value)) {
  154:         if(!strcmp(key, "version")) {
  155:           config.version = byteval(value);
  156:           logmsg("version [%d] set", config.version);
  157:         }
  158:         else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
  159:           logmsg("PUBLISH-before-SUBACK set");
  160:           config.publish_before_suback = TRUE;
  161:         }
  162:         else if(!strcmp(key, "short-PUBLISH")) {
  163:           logmsg("short-PUBLISH set");
  164:           config.short_publish = TRUE;
  165:         }
  166:         else if(!strcmp(key, "error-CONNACK")) {
  167:           config.error_connack = byteval(value);
  168:           logmsg("error-CONNACK = %d", config.error_connack);
  169:         }
  170:         else if(!strcmp(key, "Testnum")) {
  171:           config.testnum = atoi(value);
  172:           logmsg("testnum = %d", config.testnum);
  173:         }
  174:       }
  175:     }
  176:     fclose(fp);
  177:   }
  178:   else {
  179:     logmsg("No config file '%s' to read", configfile);
  180:   }
  181: }
  182: 
  183: static void loghex(unsigned char *buffer, ssize_t len)
  184: {
  185:   char data[12000];
  186:   ssize_t i;
  187:   unsigned char *ptr = buffer;
  188:   char *optr = data;
  189:   ssize_t width = 0;
  190:   int left = sizeof(data);
  191: 
  192:   for(i = 0; i<len && (left >= 0); i++) {
  193:     msnprintf(optr, left, "%02x", ptr[i]);
  194:     width += 2;
  195:     optr += 2;
  196:     left -= 2;
  197:   }
  198:   if(width)
  199:     logmsg("'%s'", data);
  200: }
  201: 
  202: typedef enum {
  203:   FROM_CLIENT,
  204:   FROM_SERVER
  205: } mqttdir;
  206: 
  207: static void logprotocol(mqttdir dir,
  208:                         const char *prefix, size_t remlen,
  209:                         FILE *output,
  210:                         unsigned char *buffer, ssize_t len)
  211: {
  212:   char data[12000] = "";
  213:   ssize_t i;
  214:   unsigned char *ptr = buffer;
  215:   char *optr = data;
  216:   ssize_t width = 0;
  217:   int left = sizeof(data);
  218: 
  219:   for(i = 0; i<len && (left >= 0); i++) {
  220:     msnprintf(optr, left, "%02x", ptr[i]);
  221:     width += 2;
  222:     optr += 2;
  223:     left -= 2;
  224:   }
  225:   fprintf(output, "%s %s %zx %s\n",
  226:           dir == FROM_CLIENT? "client": "server",
  227:           prefix, remlen,
  228:           data);
  229: }
  230: 
  231: 
  232: /* return 0 on success */
  233: static int connack(FILE *dump, curl_socket_t fd)
  234: {
  235:   unsigned char packet[]={
  236:     MQTT_MSG_CONNACK, 0x02,
  237:     0x00, 0x00
  238:   };
  239:   ssize_t rc;
  240: 
  241:   packet[3] = config.error_connack;
  242: 
  243:   rc = swrite(fd, (char *)packet, sizeof(packet));
  244:   if(rc > 0) {
  245:     logmsg("WROTE %d bytes [CONNACK]", rc);
  246:     loghex(packet, rc);
  247:     logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
  248:   }
  249:   if(rc == sizeof(packet)) {
  250:     return 0;
  251:   }
  252:   return 1;
  253: }
  254: 
  255: /* return 0 on success */
  256: static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid)
  257: {
  258:   unsigned char packet[]={
  259:     MQTT_MSG_SUBACK, 0x03,
  260:     0, 0, /* filled in below */
  261:     0x00
  262:   };
  263:   ssize_t rc;
  264:   packet[2] = (unsigned char)(packetid >> 8);
  265:   packet[3] = (unsigned char)(packetid & 0xff);
  266: 
  267:   rc = swrite(fd, (char *)packet, sizeof(packet));
  268:   if(rc == sizeof(packet)) {
  269:     logmsg("WROTE %d bytes [SUBACK]", rc);
  270:     loghex(packet, rc);
  271:     logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc);
  272:     return 0;
  273:   }
  274:   return 1;
  275: }
  276: 
  277: #ifdef QOS
  278: /* return 0 on success */
  279: static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid)
  280: {
  281:   unsigned char packet[]={
  282:     MQTT_MSG_PUBACK, 0x00,
  283:     0, 0 /* filled in below */
  284:   };
  285:   ssize_t rc;
  286:   packet[2] = (unsigned char)(packetid >> 8);
  287:   packet[3] = (unsigned char)(packetid & 0xff);
  288: 
  289:   rc = swrite(fd, (char *)packet, sizeof(packet));
  290:   if(rc == sizeof(packet)) {
  291:     logmsg("WROTE %d bytes [PUBACK]", rc);
  292:     loghex(packet, rc);
  293:     logprotocol(FROM_SERVER, dump, packet, rc);
  294:     return 0;
  295:   }
  296:   logmsg("Failed sending [PUBACK]");
  297:   return 1;
  298: }
  299: #endif
  300: 
  301: /* return 0 on success */
  302: static int disconnect(FILE *dump, curl_socket_t fd)
  303: {
  304:   unsigned char packet[]={
  305:     MQTT_MSG_DISCONNECT, 0x00,
  306:   };
  307:   ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
  308:   if(rc == sizeof(packet)) {
  309:     logmsg("WROTE %d bytes [DISCONNECT]", rc);
  310:     loghex(packet, rc);
  311:     logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc);
  312:     return 0;
  313:   }
  314:   logmsg("Failed sending [DISCONNECT]");
  315:   return 1;
  316: }
  317: 
  318: 
  319: 
  320: /*
  321:   do
  322: 
  323:      encodedByte = X MOD 128
  324: 
  325:      X = X DIV 128
  326: 
  327:      // if there are more data to encode, set the top bit of this byte
  328: 
  329:      if ( X > 0 )
  330: 
  331:         encodedByte = encodedByte OR 128
  332: 
  333:       endif
  334: 
  335:     'output' encodedByte
  336: 
  337:   while ( X > 0 )
  338: 
  339: */
  340: 
  341: /* return number of bytes used */
  342: static int encode_length(size_t packetlen, char *remlength) /* 4 bytes */
  343: {
  344:   int bytes = 0;
  345:   unsigned char encode;
  346: 
  347:   do {
  348:     encode = packetlen % 0x80;
  349:     packetlen /= 0x80;
  350:     if(packetlen)
  351:       encode |= 0x80;
  352: 
  353:     remlength[bytes++] = encode;
  354: 
  355:     if(bytes > 3) {
  356:       logmsg("too large packet!");
  357:       return 0;
  358:     }
  359:   } while(packetlen);
  360: 
  361:   return bytes;
  362: }
  363: 
  364: 
  365: static size_t decode_length(unsigned char *buf,
  366:                             size_t buflen, size_t *lenbytes)
  367: {
  368:   size_t len = 0;
  369:   size_t mult = 1;
  370:   size_t i;
  371:   unsigned char encoded = 0x80;
  372: 
  373:   for(i = 0; (i < buflen) && (encoded & 0x80); i++) {
  374:     encoded = buf[i];
  375:     len += (encoded & 0x7f) * mult;
  376:     mult *= 0x80;
  377:   }
  378: 
  379:   if(lenbytes)
  380:     *lenbytes = i;
  381: 
  382:   return len;
  383: }
  384: 
  385: 
  386: /* return 0 on success */
  387: static int publish(FILE *dump,
  388:                    curl_socket_t fd, unsigned short packetid,
  389:                    char *topic, char *payload, size_t payloadlen)
  390: {
  391:   size_t topiclen = strlen(topic);
  392:   unsigned char *packet;
  393:   size_t payloadindex;
  394:   ssize_t remaininglength = topiclen + 2 + payloadlen;
  395:   ssize_t packetlen;
  396:   ssize_t sendamount;
  397:   ssize_t rc;
  398:   char rembuffer[4];
  399:   int encodedlen;
  400: 
  401:   encodedlen = encode_length(remaininglength, rembuffer);
  402: 
  403:   /* one packet type byte (possibly two more for packetid) */
  404:   packetlen = remaininglength + encodedlen + 1;
  405:   packet = malloc(packetlen);
  406:   if(!packet)
  407:     return 1;
  408: 
  409:   packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */
  410:   memcpy(&packet[1], rembuffer, encodedlen);
  411: 
  412:   (void)packetid;
  413:   /* packet_id if QoS is set */
  414: 
  415:   packet[1 + encodedlen] = (unsigned char)(topiclen >> 8);
  416:   packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff);
  417:   memcpy(&packet[3 + encodedlen], topic, topiclen);
  418: 
  419:   payloadindex = 3 + topiclen + encodedlen;
  420:   memcpy(&packet[payloadindex], payload, payloadlen);
  421: 
  422:   sendamount = packetlen;
  423:   if(config.short_publish)
  424:     sendamount -= 2;
  425: 
  426:   rc = swrite(fd, (char *)packet, sendamount);
  427:   if(rc > 0) {
  428:     logmsg("WROTE %d bytes [PUBLISH]", rc);
  429:     loghex(packet, rc);
  430:     logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
  431:   }
  432:   if(rc == packetlen)
  433:     return 0;
  434:   return 1;
  435: }
  436: 
  437: #define MAX_TOPIC_LENGTH 65535
  438: #define MAX_CLIENT_ID_LENGTH 32
  439: 
  440: static char topic[MAX_TOPIC_LENGTH + 1];
  441: 
  442: static int fixedheader(curl_socket_t fd,
  443:                        unsigned char *bytep,
  444:                        size_t *remaining_lengthp,
  445:                        size_t *remaining_length_bytesp)
  446: {
  447:   /* get the fixed header */
  448:   unsigned char buffer[10];
  449: 
  450:   /* get the first two bytes */
  451:   ssize_t rc = sread(fd, (char *)buffer, 2);
  452:   int i;
  453:   if(rc < 2) {
  454:     logmsg("READ %d bytes [SHORT!]", rc);
  455:     return 1; /* fail */
  456:   }
  457:   logmsg("READ %d bytes", rc);
  458:   loghex(buffer, rc);
  459:   *bytep = buffer[0];
  460: 
  461:   /* if the length byte has the top bit set, get the next one too */
  462:   i = 1;
  463:   while(buffer[i] & 0x80) {
  464:     i++;
  465:     rc = sread(fd, (char *)&buffer[i], 1);
  466:     if(rc != 1) {
  467:       logmsg("Remaining Length broken");
  468:       return 1;
  469:     }
  470:   }
  471:   *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp);
  472:   logmsg("Remaining Length: %ld [%d bytes]", (long) *remaining_lengthp,
  473:          *remaining_length_bytesp);
  474:   return 0;
  475: }
  476: 
  477: static curl_socket_t mqttit(curl_socket_t fd)
  478: {
  479:   unsigned char buffer[10*1024];
  480:   ssize_t rc;
  481:   unsigned char byte;
  482:   unsigned short packet_id;
  483:   size_t payload_len;
  484:   unsigned int topic_len;
  485:   size_t remaining_length = 0;
  486:   size_t bytes = 0; /* remaining length field size in bytes */
  487:   char client_id[MAX_CLIENT_ID_LENGTH];
  488:   long testno;
  489: 
  490:   static const char protocol[7] = {
  491:     0x00, 0x04,       /* protocol length */
  492:     'M','Q','T','T',  /* protocol name */
  493:     0x04              /* protocol level */
  494:   };
  495:   FILE *dump = fopen(REQUEST_DUMP, "ab");
  496:   if(!dump)
  497:     goto end;
  498: 
  499:   getconfig();
  500: 
  501:   testno = config.testnum;
  502: 
  503:   if(testno)
  504:     logmsg("Found test number %ld", testno);
  505: 
  506:   do {
  507:     /* get the fixed header */
  508:     rc = fixedheader(fd, &byte, &remaining_length, &bytes);
  509:     if(rc)
  510:       break;
  511:     if(remaining_length) {
  512:       rc = sread(fd, (char *)buffer, remaining_length);
  513:       if(rc > 0) {
  514:         logmsg("READ %d bytes", rc);
  515:         loghex(buffer, rc);
  516:       }
  517:     }
  518: 
  519:     if(byte == MQTT_MSG_CONNECT) {
  520:       logprotocol(FROM_CLIENT, "CONNECT", remaining_length,
  521:                   dump, buffer, rc);
  522: 
  523:       if(memcmp(protocol, buffer, sizeof(protocol))) {
  524:         logmsg("Protocol preamble mismatch");
  525:         goto end;
  526:       }
  527:       /* ignore the connect flag byte and two keepalive bytes */
  528: 
  529:       payload_len = (buffer[10] << 8) | buffer[11];
  530:       if((ssize_t)payload_len != (rc - 12)) {
  531:         logmsg("Payload length mismatch, expected %x got %x",
  532:                rc - 12, payload_len);
  533:         goto end;
  534:       }
  535:       else if((payload_len + 1) > MAX_CLIENT_ID_LENGTH) {
  536:         logmsg("Too large client id");
  537:         goto end;
  538:       }
  539:       memcpy(client_id, &buffer[14], payload_len);
  540:       client_id[payload_len] = 0;
  541: 
  542:       logmsg("MQTT client connect accepted: %s", client_id);
  543: 
  544:       /* The first packet sent from the Server to the Client MUST be a
  545:          CONNACK Packet */
  546: 
  547:       if(connack(dump, fd)) {
  548:         logmsg("failed sending CONNACK");
  549:         goto end;
  550:       }
  551:     }
  552:     else if(byte == MQTT_MSG_SUBSCRIBE) {
  553:       FILE *stream;
  554:       int error;
  555:       char *data;
  556:       size_t datalen;
  557:       logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
  558:                   dump, buffer, rc);
  559:       logmsg("Incoming SUBSCRIBE");
  560: 
  561:       if(rc < 6) {
  562:         logmsg("Too small SUBSCRIBE");
  563:         goto end;
  564:       }
  565: 
  566:       /* two bytes packet id */
  567:       packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]);
  568: 
  569:       /* two bytes topic length */
  570:       topic_len = (buffer[2] << 8) | buffer[3];
  571:       if(topic_len != (remaining_length - 5)) {
  572:         logmsg("Wrong topic length, got %d expected %d",
  573:                topic_len, remaining_length - 5);
  574:         goto end;
  575:       }
  576:       memcpy(topic, &buffer[4], topic_len);
  577:       topic[topic_len] = 0;
  578: 
  579:       /* there's a QoS byte (two bits) after the topic */
  580: 
  581:       logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
  582:       stream = test2fopen(testno);
  583:       error = getpart(&data, &datalen, "reply", "data", stream);
  584:       if(!error) {
  585:         if(!config.publish_before_suback) {
  586:           if(suback(dump, fd, packet_id)) {
  587:             logmsg("failed sending SUBACK");
  588:             goto end;
  589:           }
  590:         }
  591:         if(publish(dump, fd, packet_id, topic, data, datalen)) {
  592:           logmsg("PUBLISH failed");
  593:           goto end;
  594:         }
  595:         if(config.publish_before_suback) {
  596:           if(suback(dump, fd, packet_id)) {
  597:             logmsg("failed sending SUBACK");
  598:             goto end;
  599:           }
  600:         }
  601:       }
  602:       else {
  603:         char *def = (char *)"this is random payload yes yes it is";
  604:         publish(dump, fd, packet_id, topic, def, strlen(def));
  605:       }
  606:       disconnect(dump, fd);
  607:     }
  608:     else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) {
  609:       size_t topiclen;
  610: 
  611:       logmsg("Incoming PUBLISH");
  612:       logprotocol(FROM_CLIENT, "PUBLISH", remaining_length,
  613:                   dump, buffer, rc);
  614: 
  615:       topiclen = (buffer[1 + bytes] << 8) | buffer[2 + bytes];
  616:       logmsg("Got %d bytes topic", topiclen);
  617:       /* TODO: verify topiclen */
  618: 
  619: #ifdef QOS
  620:       /* TODO: handle packetid if there is one. Send puback if QoS > 0 */
  621:       puback(dump, fd, 0);
  622: #endif
  623:       /* expect a disconnect here */
  624:       /* get the request */
  625:       rc = sread(fd, (char *)&buffer[0], 2);
  626: 
  627:       logmsg("READ %d bytes [DISCONNECT]", rc);
  628:       loghex(buffer, rc);
  629:       logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc);
  630:       goto end;
  631:     }
  632:     else {
  633:       /* not supported (yet) */
  634:       goto end;
  635:     }
  636:   } while(1);
  637: 
  638:   end:
  639:   fclose(dump);
  640:   return CURL_SOCKET_BAD;
  641: }
  642: 
  643: /*
  644:   sockfdp is a pointer to an established stream or CURL_SOCKET_BAD
  645: 
  646:   if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must
  647:   accept()
  648: */
  649: static bool incoming(curl_socket_t listenfd)
  650: {
  651:   fd_set fds_read;
  652:   fd_set fds_write;
  653:   fd_set fds_err;
  654:   int clients = 0; /* connected clients */
  655: 
  656:   if(got_exit_signal) {
  657:     logmsg("signalled to die, exiting...");
  658:     return FALSE;
  659:   }
  660: 
  661: #ifdef HAVE_GETPPID
  662:   /* As a last resort, quit if socks5 process becomes orphan. */
  663:   if(getppid() <= 1) {
  664:     logmsg("process becomes orphan, exiting");
  665:     return FALSE;
  666:   }
  667: #endif
  668: 
  669:   do {
  670:     ssize_t rc;
  671:     int error = 0;
  672:     curl_socket_t sockfd = listenfd;
  673:     int maxfd = (int)sockfd;
  674: 
  675:     FD_ZERO(&fds_read);
  676:     FD_ZERO(&fds_write);
  677:     FD_ZERO(&fds_err);
  678: 
  679:     /* there's always a socket to wait for */
  680:     FD_SET(sockfd, &fds_read);
  681: 
  682:     do {
  683:       /* select() blocking behavior call on blocking descriptors please */
  684:       rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL);
  685:       if(got_exit_signal) {
  686:         logmsg("signalled to die, exiting...");
  687:         return FALSE;
  688:       }
  689:     } while((rc == -1) && ((error = SOCKERRNO) == EINTR));
  690: 
  691:     if(rc < 0) {
  692:       logmsg("select() failed with error: (%d) %s",
  693:              error, strerror(error));
  694:       return FALSE;
  695:     }
  696: 
  697:     if(FD_ISSET(sockfd, &fds_read)) {
  698:       curl_socket_t newfd = accept(sockfd, NULL, NULL);
  699:       if(CURL_SOCKET_BAD == newfd) {
  700:         error = SOCKERRNO;
  701:         logmsg("accept(%d, NULL, NULL) failed with error: (%d) %s",
  702:                sockfd, error, strerror(error));
  703:       }
  704:       else {
  705:         logmsg("====> Client connect, fd %d. Read config from %s",
  706:                newfd, configfile);
  707:         set_advisor_read_lock(SERVERLOGS_LOCK);
  708:         (void)mqttit(newfd); /* until done */
  709:         clear_advisor_read_lock(SERVERLOGS_LOCK);
  710: 
  711:         logmsg("====> Client disconnect");
  712:         sclose(newfd);
  713:       }
  714:     }
  715:   } while(clients);
  716: 
  717:   return TRUE;
  718: }
  719: 
  720: static curl_socket_t sockdaemon(curl_socket_t sock,
  721:                                 unsigned short *listenport)
  722: {
  723:   /* passive daemon style */
  724:   srvr_sockaddr_union_t listener;
  725:   int flag;
  726:   int rc;
  727:   int totdelay = 0;
  728:   int maxretr = 10;
  729:   int delay = 20;
  730:   int attempt = 0;
  731:   int error = 0;
  732: 
  733:   do {
  734:     attempt++;
  735:     flag = 1;
  736:     rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
  737:          (void *)&flag, sizeof(flag));
  738:     if(rc) {
  739:       error = SOCKERRNO;
  740:       logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s",
  741:              error, strerror(error));
  742:       if(maxretr) {
  743:         rc = wait_ms(delay);
  744:         if(rc) {
  745:           /* should not happen */
  746:           logmsg("wait_ms() failed with error: %d", rc);
  747:           sclose(sock);
  748:           return CURL_SOCKET_BAD;
  749:         }
  750:         if(got_exit_signal) {
  751:           logmsg("signalled to die, exiting...");
  752:           sclose(sock);
  753:           return CURL_SOCKET_BAD;
  754:         }
  755:         totdelay += delay;
  756:         delay *= 2; /* double the sleep for next attempt */
  757:       }
  758:     }
  759:   } while(rc && maxretr--);
  760: 
  761:   if(rc) {
  762:     logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s",
  763:            attempt, totdelay, error, strerror(error));
  764:     logmsg("Continuing anyway...");
  765:   }
  766: 
  767:   /* When the specified listener port is zero, it is actually a
  768:      request to let the system choose a non-zero available port. */
  769: 
  770: #ifdef ENABLE_IPV6
  771:   if(!use_ipv6) {
  772: #endif
  773:     memset(&listener.sa4, 0, sizeof(listener.sa4));
  774:     listener.sa4.sin_family = AF_INET;
  775:     listener.sa4.sin_addr.s_addr = INADDR_ANY;
  776:     listener.sa4.sin_port = htons(*listenport);
  777:     rc = bind(sock, &listener.sa, sizeof(listener.sa4));
  778: #ifdef ENABLE_IPV6
  779:   }
  780:   else {
  781:     memset(&listener.sa6, 0, sizeof(listener.sa6));
  782:     listener.sa6.sin6_family = AF_INET6;
  783:     listener.sa6.sin6_addr = in6addr_any;
  784:     listener.sa6.sin6_port = htons(*listenport);
  785:     rc = bind(sock, &listener.sa, sizeof(listener.sa6));
  786:   }
  787: #endif /* ENABLE_IPV6 */
  788:   if(rc) {
  789:     error = SOCKERRNO;
  790:     logmsg("Error binding socket on port %hu: (%d) %s",
  791:            *listenport, error, strerror(error));
  792:     sclose(sock);
  793:     return CURL_SOCKET_BAD;
  794:   }
  795: 
  796:   if(!*listenport) {
  797:     /* The system was supposed to choose a port number, figure out which
  798:        port we actually got and update the listener port value with it. */
  799:     curl_socklen_t la_size;
  800:     srvr_sockaddr_union_t localaddr;
  801: #ifdef ENABLE_IPV6
  802:     if(!use_ipv6)
  803: #endif
  804:       la_size = sizeof(localaddr.sa4);
  805: #ifdef ENABLE_IPV6
  806:     else
  807:       la_size = sizeof(localaddr.sa6);
  808: #endif
  809:     memset(&localaddr.sa, 0, (size_t)la_size);
  810:     if(getsockname(sock, &localaddr.sa, &la_size) < 0) {
  811:       error = SOCKERRNO;
  812:       logmsg("getsockname() failed with error: (%d) %s",
  813:              error, strerror(error));
  814:       sclose(sock);
  815:       return CURL_SOCKET_BAD;
  816:     }
  817:     switch(localaddr.sa.sa_family) {
  818:     case AF_INET:
  819:       *listenport = ntohs(localaddr.sa4.sin_port);
  820:       break;
  821: #ifdef ENABLE_IPV6
  822:     case AF_INET6:
  823:       *listenport = ntohs(localaddr.sa6.sin6_port);
  824:       break;
  825: #endif
  826:     default:
  827:       break;
  828:     }
  829:     if(!*listenport) {
  830:       /* Real failure, listener port shall not be zero beyond this point. */
  831:       logmsg("Apparently getsockname() succeeded, with listener port zero.");
  832:       logmsg("A valid reason for this failure is a binary built without");
  833:       logmsg("proper network library linkage. This might not be the only");
  834:       logmsg("reason, but double check it before anything else.");
  835:       sclose(sock);
  836:       return CURL_SOCKET_BAD;
  837:     }
  838:   }
  839: 
  840:   /* start accepting connections */
  841:   rc = listen(sock, 5);
  842:   if(0 != rc) {
  843:     error = SOCKERRNO;
  844:     logmsg("listen(%d, 5) failed with error: (%d) %s",
  845:            sock, error, strerror(error));
  846:     sclose(sock);
  847:     return CURL_SOCKET_BAD;
  848:   }
  849: 
  850:   return sock;
  851: }
  852: 
  853: 
  854: int main(int argc, char *argv[])
  855: {
  856:   curl_socket_t sock = CURL_SOCKET_BAD;
  857:   curl_socket_t msgsock = CURL_SOCKET_BAD;
  858:   int wrotepidfile = 0;
  859:   int wroteportfile = 0;
  860:   const char *pidname = ".mqttd.pid";
  861:   const char *portname = ".mqttd.port";
  862:   bool juggle_again;
  863:   int error;
  864:   int arg = 1;
  865: 
  866:   while(argc>arg) {
  867:     if(!strcmp("--version", argv[arg])) {
  868:       printf("mqttd IPv4%s\n",
  869: #ifdef ENABLE_IPV6
  870:              "/IPv6"
  871: #else
  872:              ""
  873: #endif
  874:              );
  875:       return 0;
  876:     }
  877:     else if(!strcmp("--pidfile", argv[arg])) {
  878:       arg++;
  879:       if(argc>arg)
  880:         pidname = argv[arg++];
  881:     }
  882:     else if(!strcmp("--portfile", argv[arg])) {
  883:       arg++;
  884:       if(argc>arg)
  885:         portname = argv[arg++];
  886:     }
  887:     else if(!strcmp("--config", argv[arg])) {
  888:       arg++;
  889:       if(argc>arg)
  890:         configfile = argv[arg++];
  891:     }
  892:     else if(!strcmp("--logfile", argv[arg])) {
  893:       arg++;
  894:       if(argc>arg)
  895:         serverlogfile = argv[arg++];
  896:     }
  897:     else if(!strcmp("--ipv6", argv[arg])) {
  898: #ifdef ENABLE_IPV6
  899:       ipv_inuse = "IPv6";
  900:       use_ipv6 = TRUE;
  901: #endif
  902:       arg++;
  903:     }
  904:     else if(!strcmp("--ipv4", argv[arg])) {
  905:       /* for completeness, we support this option as well */
  906: #ifdef ENABLE_IPV6
  907:       ipv_inuse = "IPv4";
  908:       use_ipv6 = FALSE;
  909: #endif
  910:       arg++;
  911:     }
  912:     else if(!strcmp("--port", argv[arg])) {
  913:       arg++;
  914:       if(argc>arg) {
  915:         char *endptr;
  916:         unsigned long ulnum = strtoul(argv[arg], &endptr, 10);
  917:         if((endptr != argv[arg] + strlen(argv[arg])) ||
  918:            ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) {
  919:           fprintf(stderr, "mqttd: invalid --port argument (%s)\n",
  920:                   argv[arg]);
  921:           return 0;
  922:         }
  923:         port = curlx_ultous(ulnum);
  924:         arg++;
  925:       }
  926:     }
  927:     else {
  928:       puts("Usage: mqttd [option]\n"
  929:            " --config [file]\n"
  930:            " --version\n"
  931:            " --logfile [file]\n"
  932:            " --pidfile [file]\n"
  933:            " --ipv4\n"
  934:            " --ipv6\n"
  935:            " --port [port]\n");
  936:       return 0;
  937:     }
  938:   }
  939: 
  940: #ifdef WIN32
  941:   win32_init();
  942:   atexit(win32_cleanup);
  943: 
  944:   setmode(fileno(stdin), O_BINARY);
  945:   setmode(fileno(stdout), O_BINARY);
  946:   setmode(fileno(stderr), O_BINARY);
  947: #endif
  948: 
  949:   install_signal_handlers(FALSE);
  950: 
  951: #ifdef ENABLE_IPV6
  952:   if(!use_ipv6)
  953: #endif
  954:     sock = socket(AF_INET, SOCK_STREAM, 0);
  955: #ifdef ENABLE_IPV6
  956:   else
  957:     sock = socket(AF_INET6, SOCK_STREAM, 0);
  958: #endif
  959: 
  960:   if(CURL_SOCKET_BAD == sock) {
  961:     error = SOCKERRNO;
  962:     logmsg("Error creating socket: (%d) %s",
  963:            error, strerror(error));
  964:     goto mqttd_cleanup;
  965:   }
  966: 
  967:   {
  968:     /* passive daemon style */
  969:     sock = sockdaemon(sock, &port);
  970:     if(CURL_SOCKET_BAD == sock) {
  971:       goto mqttd_cleanup;
  972:     }
  973:     msgsock = CURL_SOCKET_BAD; /* no stream socket yet */
  974:   }
  975: 
  976:   logmsg("Running %s version", ipv_inuse);
  977:   logmsg("Listening on port %hu", port);
  978: 
  979:   wrotepidfile = write_pidfile(pidname);
  980:   if(!wrotepidfile) {
  981:     goto mqttd_cleanup;
  982:   }
  983: 
  984:   wroteportfile = write_portfile(portname, (int)port);
  985:   if(!wroteportfile) {
  986:     goto mqttd_cleanup;
  987:   }
  988: 
  989:   do {
  990:     juggle_again = incoming(sock);
  991:   } while(juggle_again);
  992: 
  993: mqttd_cleanup:
  994: 
  995:   if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD))
  996:     sclose(msgsock);
  997: 
  998:   if(sock != CURL_SOCKET_BAD)
  999:     sclose(sock);
 1000: 
 1001:   if(wrotepidfile)
 1002:     unlink(pidname);
 1003: 
 1004:   restore_signal_handlers(FALSE);
 1005: 
 1006:   if(got_exit_signal) {
 1007:     logmsg("============> mqttd exits with signal (%d)", exit_signal);
 1008:     /*
 1009:      * To properly set the return status of the process we
 1010:      * must raise the same signal SIGINT or SIGTERM that we
 1011:      * caught and let the old handler take care of it.
 1012:      */
 1013:     raise(exit_signal);
 1014:   }
 1015: 
 1016:   logmsg("============> mqttd quits");
 1017:   return 0;
 1018: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>