Annotation of embedaddon/curl/tests/server/mqttd.c, revision 1.1
1.1 ! misho 1: /***************************************************************************
! 2: * _ _ ____ _
! 3: * Project ___| | | | _ \| |
! 4: * / __| | | | |_) | |
! 5: * | (__| |_| | _ <| |___
! 6: * \___|\___/|_| \_\_____|
! 7: *
! 8: * Copyright (C) 1998 - 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
! 9: *
! 10: * This software is licensed as described in the file COPYING, which
! 11: * you should have received as part of this distribution. The terms
! 12: * are also available at https://curl.haxx.se/docs/copyright.html.
! 13: *
! 14: * You may opt to use, copy, modify, merge, publish, distribute and/or sell
! 15: * copies of the Software, and permit persons to whom the Software is
! 16: * furnished to do so, under the terms of the COPYING file.
! 17: *
! 18: * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
! 19: * KIND, either express or implied.
! 20: *
! 21: ***************************************************************************/
! 22: #include "server_setup.h"
! 23: #include <stdlib.h>
! 24: #include <string.h>
! 25: #include "util.h"
! 26:
! 27: /* Function
! 28: *
! 29: * Accepts a TCP connection on a custom port (IPv4 or IPv6). Speaks MQTT.
! 30: *
! 31: * Read commands from FILE (set with --config). The commands control how to
! 32: * act and is reset to defaults each client TCP connect.
! 33: *
! 34: * Config file keywords:
! 35: *
! 36: * TODO
! 37: */
! 38:
! 39: /* based on sockfilt.c */
! 40:
! 41: #ifdef HAVE_SIGNAL_H
! 42: #include <signal.h>
! 43: #endif
! 44: #ifdef HAVE_NETINET_IN_H
! 45: #include <netinet/in.h>
! 46: #endif
! 47: #ifdef HAVE_NETINET_IN6_H
! 48: #include <netinet/in6.h>
! 49: #endif
! 50: #ifdef HAVE_ARPA_INET_H
! 51: #include <arpa/inet.h>
! 52: #endif
! 53: #ifdef HAVE_NETDB_H
! 54: #include <netdb.h>
! 55: #endif
! 56:
! 57: #define ENABLE_CURLX_PRINTF
! 58: /* make the curlx header define all printf() functions to use the curlx_*
! 59: versions instead */
! 60: #include "curlx.h" /* from the private lib dir */
! 61: #include "getpart.h"
! 62: #include "inet_pton.h"
! 63: #include "util.h"
! 64: #include "server_sockaddr.h"
! 65: #include "warnless.h"
! 66:
! 67: /* include memdebug.h last */
! 68: #include "memdebug.h"
! 69:
! 70: #ifdef USE_WINSOCK
! 71: #undef EINTR
! 72: #define EINTR 4 /* errno.h value */
! 73: #undef EAGAIN
! 74: #define EAGAIN 11 /* errno.h value */
! 75: #undef ENOMEM
! 76: #define ENOMEM 12 /* errno.h value */
! 77: #undef EINVAL
! 78: #define EINVAL 22 /* errno.h value */
! 79: #endif
! 80:
! 81: #define DEFAULT_PORT 1883 /* MQTT default port */
! 82:
! 83: #ifndef DEFAULT_LOGFILE
! 84: #define DEFAULT_LOGFILE "log/mqttd.log"
! 85: #endif
! 86:
! 87: #ifndef DEFAULT_CONFIG
! 88: #define DEFAULT_CONFIG "mqttd.config"
! 89: #endif
! 90:
! 91: #define MQTT_MSG_CONNECT 0x10
! 92: #define MQTT_MSG_CONNACK 0x20
! 93: #define MQTT_MSG_PUBLISH 0x30
! 94: #define MQTT_MSG_PUBACK 0x40
! 95: #define MQTT_MSG_SUBSCRIBE 0x82
! 96: #define MQTT_MSG_SUBACK 0x90
! 97: #define MQTT_MSG_DISCONNECT 0xe0
! 98:
! 99: #define MQTT_CONNACK_LEN 4
! 100: #define MQTT_SUBACK_LEN 5
! 101: #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
! 102: #define MQTT_HEADER_LEN 5 /* max 5 bytes */
! 103:
! 104: struct configurable {
! 105: unsigned char version; /* initial version byte in the request must match
! 106: this */
! 107: bool publish_before_suback;
! 108: bool short_publish;
! 109: unsigned char error_connack;
! 110: int testnum;
! 111: };
! 112:
! 113: #define REQUEST_DUMP "log/server.input"
! 114: #define CONFIG_VERSION 5
! 115:
! 116: static struct configurable config;
! 117:
! 118: const char *serverlogfile = DEFAULT_LOGFILE;
! 119: static const char *configfile = DEFAULT_CONFIG;
! 120:
! 121: #ifdef ENABLE_IPV6
! 122: static bool use_ipv6 = FALSE;
! 123: #endif
! 124: static const char *ipv_inuse = "IPv4";
! 125: static unsigned short port = DEFAULT_PORT;
! 126:
! 127: static void resetdefaults(void)
! 128: {
! 129: logmsg("Reset to defaults");
! 130: config.version = CONFIG_VERSION;
! 131: config.publish_before_suback = FALSE;
! 132: config.short_publish = FALSE;
! 133: config.error_connack = 0;
! 134: config.testnum = 0;
! 135: }
! 136:
! 137: static unsigned char byteval(char *value)
! 138: {
! 139: unsigned long num = strtoul(value, NULL, 10);
! 140: return num & 0xff;
! 141: }
! 142:
! 143: static void getconfig(void)
! 144: {
! 145: FILE *fp = fopen(configfile, FOPEN_READTEXT);
! 146: resetdefaults();
! 147: if(fp) {
! 148: char buffer[512];
! 149: logmsg("parse config file");
! 150: while(fgets(buffer, sizeof(buffer), fp)) {
! 151: char key[32];
! 152: char value[32];
! 153: if(2 == sscanf(buffer, "%31s %31s", key, value)) {
! 154: if(!strcmp(key, "version")) {
! 155: config.version = byteval(value);
! 156: logmsg("version [%d] set", config.version);
! 157: }
! 158: else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
! 159: logmsg("PUBLISH-before-SUBACK set");
! 160: config.publish_before_suback = TRUE;
! 161: }
! 162: else if(!strcmp(key, "short-PUBLISH")) {
! 163: logmsg("short-PUBLISH set");
! 164: config.short_publish = TRUE;
! 165: }
! 166: else if(!strcmp(key, "error-CONNACK")) {
! 167: config.error_connack = byteval(value);
! 168: logmsg("error-CONNACK = %d", config.error_connack);
! 169: }
! 170: else if(!strcmp(key, "Testnum")) {
! 171: config.testnum = atoi(value);
! 172: logmsg("testnum = %d", config.testnum);
! 173: }
! 174: }
! 175: }
! 176: fclose(fp);
! 177: }
! 178: else {
! 179: logmsg("No config file '%s' to read", configfile);
! 180: }
! 181: }
! 182:
! 183: static void loghex(unsigned char *buffer, ssize_t len)
! 184: {
! 185: char data[12000];
! 186: ssize_t i;
! 187: unsigned char *ptr = buffer;
! 188: char *optr = data;
! 189: ssize_t width = 0;
! 190: int left = sizeof(data);
! 191:
! 192: for(i = 0; i<len && (left >= 0); i++) {
! 193: msnprintf(optr, left, "%02x", ptr[i]);
! 194: width += 2;
! 195: optr += 2;
! 196: left -= 2;
! 197: }
! 198: if(width)
! 199: logmsg("'%s'", data);
! 200: }
! 201:
! 202: typedef enum {
! 203: FROM_CLIENT,
! 204: FROM_SERVER
! 205: } mqttdir;
! 206:
! 207: static void logprotocol(mqttdir dir,
! 208: const char *prefix, size_t remlen,
! 209: FILE *output,
! 210: unsigned char *buffer, ssize_t len)
! 211: {
! 212: char data[12000] = "";
! 213: ssize_t i;
! 214: unsigned char *ptr = buffer;
! 215: char *optr = data;
! 216: ssize_t width = 0;
! 217: int left = sizeof(data);
! 218:
! 219: for(i = 0; i<len && (left >= 0); i++) {
! 220: msnprintf(optr, left, "%02x", ptr[i]);
! 221: width += 2;
! 222: optr += 2;
! 223: left -= 2;
! 224: }
! 225: fprintf(output, "%s %s %zx %s\n",
! 226: dir == FROM_CLIENT? "client": "server",
! 227: prefix, remlen,
! 228: data);
! 229: }
! 230:
! 231:
! 232: /* return 0 on success */
! 233: static int connack(FILE *dump, curl_socket_t fd)
! 234: {
! 235: unsigned char packet[]={
! 236: MQTT_MSG_CONNACK, 0x02,
! 237: 0x00, 0x00
! 238: };
! 239: ssize_t rc;
! 240:
! 241: packet[3] = config.error_connack;
! 242:
! 243: rc = swrite(fd, (char *)packet, sizeof(packet));
! 244: if(rc > 0) {
! 245: logmsg("WROTE %d bytes [CONNACK]", rc);
! 246: loghex(packet, rc);
! 247: logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
! 248: }
! 249: if(rc == sizeof(packet)) {
! 250: return 0;
! 251: }
! 252: return 1;
! 253: }
! 254:
! 255: /* return 0 on success */
! 256: static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid)
! 257: {
! 258: unsigned char packet[]={
! 259: MQTT_MSG_SUBACK, 0x03,
! 260: 0, 0, /* filled in below */
! 261: 0x00
! 262: };
! 263: ssize_t rc;
! 264: packet[2] = (unsigned char)(packetid >> 8);
! 265: packet[3] = (unsigned char)(packetid & 0xff);
! 266:
! 267: rc = swrite(fd, (char *)packet, sizeof(packet));
! 268: if(rc == sizeof(packet)) {
! 269: logmsg("WROTE %d bytes [SUBACK]", rc);
! 270: loghex(packet, rc);
! 271: logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc);
! 272: return 0;
! 273: }
! 274: return 1;
! 275: }
! 276:
! 277: #ifdef QOS
! 278: /* return 0 on success */
! 279: static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid)
! 280: {
! 281: unsigned char packet[]={
! 282: MQTT_MSG_PUBACK, 0x00,
! 283: 0, 0 /* filled in below */
! 284: };
! 285: ssize_t rc;
! 286: packet[2] = (unsigned char)(packetid >> 8);
! 287: packet[3] = (unsigned char)(packetid & 0xff);
! 288:
! 289: rc = swrite(fd, (char *)packet, sizeof(packet));
! 290: if(rc == sizeof(packet)) {
! 291: logmsg("WROTE %d bytes [PUBACK]", rc);
! 292: loghex(packet, rc);
! 293: logprotocol(FROM_SERVER, dump, packet, rc);
! 294: return 0;
! 295: }
! 296: logmsg("Failed sending [PUBACK]");
! 297: return 1;
! 298: }
! 299: #endif
! 300:
! 301: /* return 0 on success */
! 302: static int disconnect(FILE *dump, curl_socket_t fd)
! 303: {
! 304: unsigned char packet[]={
! 305: MQTT_MSG_DISCONNECT, 0x00,
! 306: };
! 307: ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
! 308: if(rc == sizeof(packet)) {
! 309: logmsg("WROTE %d bytes [DISCONNECT]", rc);
! 310: loghex(packet, rc);
! 311: logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc);
! 312: return 0;
! 313: }
! 314: logmsg("Failed sending [DISCONNECT]");
! 315: return 1;
! 316: }
! 317:
! 318:
! 319:
! 320: /*
! 321: do
! 322:
! 323: encodedByte = X MOD 128
! 324:
! 325: X = X DIV 128
! 326:
! 327: // if there are more data to encode, set the top bit of this byte
! 328:
! 329: if ( X > 0 )
! 330:
! 331: encodedByte = encodedByte OR 128
! 332:
! 333: endif
! 334:
! 335: 'output' encodedByte
! 336:
! 337: while ( X > 0 )
! 338:
! 339: */
! 340:
! 341: /* return number of bytes used */
! 342: static int encode_length(size_t packetlen, char *remlength) /* 4 bytes */
! 343: {
! 344: int bytes = 0;
! 345: unsigned char encode;
! 346:
! 347: do {
! 348: encode = packetlen % 0x80;
! 349: packetlen /= 0x80;
! 350: if(packetlen)
! 351: encode |= 0x80;
! 352:
! 353: remlength[bytes++] = encode;
! 354:
! 355: if(bytes > 3) {
! 356: logmsg("too large packet!");
! 357: return 0;
! 358: }
! 359: } while(packetlen);
! 360:
! 361: return bytes;
! 362: }
! 363:
! 364:
! 365: static size_t decode_length(unsigned char *buf,
! 366: size_t buflen, size_t *lenbytes)
! 367: {
! 368: size_t len = 0;
! 369: size_t mult = 1;
! 370: size_t i;
! 371: unsigned char encoded = 0x80;
! 372:
! 373: for(i = 0; (i < buflen) && (encoded & 0x80); i++) {
! 374: encoded = buf[i];
! 375: len += (encoded & 0x7f) * mult;
! 376: mult *= 0x80;
! 377: }
! 378:
! 379: if(lenbytes)
! 380: *lenbytes = i;
! 381:
! 382: return len;
! 383: }
! 384:
! 385:
! 386: /* return 0 on success */
! 387: static int publish(FILE *dump,
! 388: curl_socket_t fd, unsigned short packetid,
! 389: char *topic, char *payload, size_t payloadlen)
! 390: {
! 391: size_t topiclen = strlen(topic);
! 392: unsigned char *packet;
! 393: size_t payloadindex;
! 394: ssize_t remaininglength = topiclen + 2 + payloadlen;
! 395: ssize_t packetlen;
! 396: ssize_t sendamount;
! 397: ssize_t rc;
! 398: char rembuffer[4];
! 399: int encodedlen;
! 400:
! 401: encodedlen = encode_length(remaininglength, rembuffer);
! 402:
! 403: /* one packet type byte (possibly two more for packetid) */
! 404: packetlen = remaininglength + encodedlen + 1;
! 405: packet = malloc(packetlen);
! 406: if(!packet)
! 407: return 1;
! 408:
! 409: packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */
! 410: memcpy(&packet[1], rembuffer, encodedlen);
! 411:
! 412: (void)packetid;
! 413: /* packet_id if QoS is set */
! 414:
! 415: packet[1 + encodedlen] = (unsigned char)(topiclen >> 8);
! 416: packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff);
! 417: memcpy(&packet[3 + encodedlen], topic, topiclen);
! 418:
! 419: payloadindex = 3 + topiclen + encodedlen;
! 420: memcpy(&packet[payloadindex], payload, payloadlen);
! 421:
! 422: sendamount = packetlen;
! 423: if(config.short_publish)
! 424: sendamount -= 2;
! 425:
! 426: rc = swrite(fd, (char *)packet, sendamount);
! 427: if(rc > 0) {
! 428: logmsg("WROTE %d bytes [PUBLISH]", rc);
! 429: loghex(packet, rc);
! 430: logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
! 431: }
! 432: if(rc == packetlen)
! 433: return 0;
! 434: return 1;
! 435: }
! 436:
! 437: #define MAX_TOPIC_LENGTH 65535
! 438: #define MAX_CLIENT_ID_LENGTH 32
! 439:
! 440: static char topic[MAX_TOPIC_LENGTH + 1];
! 441:
! 442: static int fixedheader(curl_socket_t fd,
! 443: unsigned char *bytep,
! 444: size_t *remaining_lengthp,
! 445: size_t *remaining_length_bytesp)
! 446: {
! 447: /* get the fixed header */
! 448: unsigned char buffer[10];
! 449:
! 450: /* get the first two bytes */
! 451: ssize_t rc = sread(fd, (char *)buffer, 2);
! 452: int i;
! 453: if(rc < 2) {
! 454: logmsg("READ %d bytes [SHORT!]", rc);
! 455: return 1; /* fail */
! 456: }
! 457: logmsg("READ %d bytes", rc);
! 458: loghex(buffer, rc);
! 459: *bytep = buffer[0];
! 460:
! 461: /* if the length byte has the top bit set, get the next one too */
! 462: i = 1;
! 463: while(buffer[i] & 0x80) {
! 464: i++;
! 465: rc = sread(fd, (char *)&buffer[i], 1);
! 466: if(rc != 1) {
! 467: logmsg("Remaining Length broken");
! 468: return 1;
! 469: }
! 470: }
! 471: *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp);
! 472: logmsg("Remaining Length: %ld [%d bytes]", (long) *remaining_lengthp,
! 473: *remaining_length_bytesp);
! 474: return 0;
! 475: }
! 476:
! 477: static curl_socket_t mqttit(curl_socket_t fd)
! 478: {
! 479: unsigned char buffer[10*1024];
! 480: ssize_t rc;
! 481: unsigned char byte;
! 482: unsigned short packet_id;
! 483: size_t payload_len;
! 484: unsigned int topic_len;
! 485: size_t remaining_length = 0;
! 486: size_t bytes = 0; /* remaining length field size in bytes */
! 487: char client_id[MAX_CLIENT_ID_LENGTH];
! 488: long testno;
! 489:
! 490: static const char protocol[7] = {
! 491: 0x00, 0x04, /* protocol length */
! 492: 'M','Q','T','T', /* protocol name */
! 493: 0x04 /* protocol level */
! 494: };
! 495: FILE *dump = fopen(REQUEST_DUMP, "ab");
! 496: if(!dump)
! 497: goto end;
! 498:
! 499: getconfig();
! 500:
! 501: testno = config.testnum;
! 502:
! 503: if(testno)
! 504: logmsg("Found test number %ld", testno);
! 505:
! 506: do {
! 507: /* get the fixed header */
! 508: rc = fixedheader(fd, &byte, &remaining_length, &bytes);
! 509: if(rc)
! 510: break;
! 511: if(remaining_length) {
! 512: rc = sread(fd, (char *)buffer, remaining_length);
! 513: if(rc > 0) {
! 514: logmsg("READ %d bytes", rc);
! 515: loghex(buffer, rc);
! 516: }
! 517: }
! 518:
! 519: if(byte == MQTT_MSG_CONNECT) {
! 520: logprotocol(FROM_CLIENT, "CONNECT", remaining_length,
! 521: dump, buffer, rc);
! 522:
! 523: if(memcmp(protocol, buffer, sizeof(protocol))) {
! 524: logmsg("Protocol preamble mismatch");
! 525: goto end;
! 526: }
! 527: /* ignore the connect flag byte and two keepalive bytes */
! 528:
! 529: payload_len = (buffer[10] << 8) | buffer[11];
! 530: if((ssize_t)payload_len != (rc - 12)) {
! 531: logmsg("Payload length mismatch, expected %x got %x",
! 532: rc - 12, payload_len);
! 533: goto end;
! 534: }
! 535: else if((payload_len + 1) > MAX_CLIENT_ID_LENGTH) {
! 536: logmsg("Too large client id");
! 537: goto end;
! 538: }
! 539: memcpy(client_id, &buffer[14], payload_len);
! 540: client_id[payload_len] = 0;
! 541:
! 542: logmsg("MQTT client connect accepted: %s", client_id);
! 543:
! 544: /* The first packet sent from the Server to the Client MUST be a
! 545: CONNACK Packet */
! 546:
! 547: if(connack(dump, fd)) {
! 548: logmsg("failed sending CONNACK");
! 549: goto end;
! 550: }
! 551: }
! 552: else if(byte == MQTT_MSG_SUBSCRIBE) {
! 553: FILE *stream;
! 554: int error;
! 555: char *data;
! 556: size_t datalen;
! 557: logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
! 558: dump, buffer, rc);
! 559: logmsg("Incoming SUBSCRIBE");
! 560:
! 561: if(rc < 6) {
! 562: logmsg("Too small SUBSCRIBE");
! 563: goto end;
! 564: }
! 565:
! 566: /* two bytes packet id */
! 567: packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]);
! 568:
! 569: /* two bytes topic length */
! 570: topic_len = (buffer[2] << 8) | buffer[3];
! 571: if(topic_len != (remaining_length - 5)) {
! 572: logmsg("Wrong topic length, got %d expected %d",
! 573: topic_len, remaining_length - 5);
! 574: goto end;
! 575: }
! 576: memcpy(topic, &buffer[4], topic_len);
! 577: topic[topic_len] = 0;
! 578:
! 579: /* there's a QoS byte (two bits) after the topic */
! 580:
! 581: logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
! 582: stream = test2fopen(testno);
! 583: error = getpart(&data, &datalen, "reply", "data", stream);
! 584: if(!error) {
! 585: if(!config.publish_before_suback) {
! 586: if(suback(dump, fd, packet_id)) {
! 587: logmsg("failed sending SUBACK");
! 588: goto end;
! 589: }
! 590: }
! 591: if(publish(dump, fd, packet_id, topic, data, datalen)) {
! 592: logmsg("PUBLISH failed");
! 593: goto end;
! 594: }
! 595: if(config.publish_before_suback) {
! 596: if(suback(dump, fd, packet_id)) {
! 597: logmsg("failed sending SUBACK");
! 598: goto end;
! 599: }
! 600: }
! 601: }
! 602: else {
! 603: char *def = (char *)"this is random payload yes yes it is";
! 604: publish(dump, fd, packet_id, topic, def, strlen(def));
! 605: }
! 606: disconnect(dump, fd);
! 607: }
! 608: else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) {
! 609: size_t topiclen;
! 610:
! 611: logmsg("Incoming PUBLISH");
! 612: logprotocol(FROM_CLIENT, "PUBLISH", remaining_length,
! 613: dump, buffer, rc);
! 614:
! 615: topiclen = (buffer[1 + bytes] << 8) | buffer[2 + bytes];
! 616: logmsg("Got %d bytes topic", topiclen);
! 617: /* TODO: verify topiclen */
! 618:
! 619: #ifdef QOS
! 620: /* TODO: handle packetid if there is one. Send puback if QoS > 0 */
! 621: puback(dump, fd, 0);
! 622: #endif
! 623: /* expect a disconnect here */
! 624: /* get the request */
! 625: rc = sread(fd, (char *)&buffer[0], 2);
! 626:
! 627: logmsg("READ %d bytes [DISCONNECT]", rc);
! 628: loghex(buffer, rc);
! 629: logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc);
! 630: goto end;
! 631: }
! 632: else {
! 633: /* not supported (yet) */
! 634: goto end;
! 635: }
! 636: } while(1);
! 637:
! 638: end:
! 639: fclose(dump);
! 640: return CURL_SOCKET_BAD;
! 641: }
! 642:
! 643: /*
! 644: sockfdp is a pointer to an established stream or CURL_SOCKET_BAD
! 645:
! 646: if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must
! 647: accept()
! 648: */
! 649: static bool incoming(curl_socket_t listenfd)
! 650: {
! 651: fd_set fds_read;
! 652: fd_set fds_write;
! 653: fd_set fds_err;
! 654: int clients = 0; /* connected clients */
! 655:
! 656: if(got_exit_signal) {
! 657: logmsg("signalled to die, exiting...");
! 658: return FALSE;
! 659: }
! 660:
! 661: #ifdef HAVE_GETPPID
! 662: /* As a last resort, quit if socks5 process becomes orphan. */
! 663: if(getppid() <= 1) {
! 664: logmsg("process becomes orphan, exiting");
! 665: return FALSE;
! 666: }
! 667: #endif
! 668:
! 669: do {
! 670: ssize_t rc;
! 671: int error = 0;
! 672: curl_socket_t sockfd = listenfd;
! 673: int maxfd = (int)sockfd;
! 674:
! 675: FD_ZERO(&fds_read);
! 676: FD_ZERO(&fds_write);
! 677: FD_ZERO(&fds_err);
! 678:
! 679: /* there's always a socket to wait for */
! 680: FD_SET(sockfd, &fds_read);
! 681:
! 682: do {
! 683: /* select() blocking behavior call on blocking descriptors please */
! 684: rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL);
! 685: if(got_exit_signal) {
! 686: logmsg("signalled to die, exiting...");
! 687: return FALSE;
! 688: }
! 689: } while((rc == -1) && ((error = SOCKERRNO) == EINTR));
! 690:
! 691: if(rc < 0) {
! 692: logmsg("select() failed with error: (%d) %s",
! 693: error, strerror(error));
! 694: return FALSE;
! 695: }
! 696:
! 697: if(FD_ISSET(sockfd, &fds_read)) {
! 698: curl_socket_t newfd = accept(sockfd, NULL, NULL);
! 699: if(CURL_SOCKET_BAD == newfd) {
! 700: error = SOCKERRNO;
! 701: logmsg("accept(%d, NULL, NULL) failed with error: (%d) %s",
! 702: sockfd, error, strerror(error));
! 703: }
! 704: else {
! 705: logmsg("====> Client connect, fd %d. Read config from %s",
! 706: newfd, configfile);
! 707: set_advisor_read_lock(SERVERLOGS_LOCK);
! 708: (void)mqttit(newfd); /* until done */
! 709: clear_advisor_read_lock(SERVERLOGS_LOCK);
! 710:
! 711: logmsg("====> Client disconnect");
! 712: sclose(newfd);
! 713: }
! 714: }
! 715: } while(clients);
! 716:
! 717: return TRUE;
! 718: }
! 719:
! 720: static curl_socket_t sockdaemon(curl_socket_t sock,
! 721: unsigned short *listenport)
! 722: {
! 723: /* passive daemon style */
! 724: srvr_sockaddr_union_t listener;
! 725: int flag;
! 726: int rc;
! 727: int totdelay = 0;
! 728: int maxretr = 10;
! 729: int delay = 20;
! 730: int attempt = 0;
! 731: int error = 0;
! 732:
! 733: do {
! 734: attempt++;
! 735: flag = 1;
! 736: rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
! 737: (void *)&flag, sizeof(flag));
! 738: if(rc) {
! 739: error = SOCKERRNO;
! 740: logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s",
! 741: error, strerror(error));
! 742: if(maxretr) {
! 743: rc = wait_ms(delay);
! 744: if(rc) {
! 745: /* should not happen */
! 746: logmsg("wait_ms() failed with error: %d", rc);
! 747: sclose(sock);
! 748: return CURL_SOCKET_BAD;
! 749: }
! 750: if(got_exit_signal) {
! 751: logmsg("signalled to die, exiting...");
! 752: sclose(sock);
! 753: return CURL_SOCKET_BAD;
! 754: }
! 755: totdelay += delay;
! 756: delay *= 2; /* double the sleep for next attempt */
! 757: }
! 758: }
! 759: } while(rc && maxretr--);
! 760:
! 761: if(rc) {
! 762: logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s",
! 763: attempt, totdelay, error, strerror(error));
! 764: logmsg("Continuing anyway...");
! 765: }
! 766:
! 767: /* When the specified listener port is zero, it is actually a
! 768: request to let the system choose a non-zero available port. */
! 769:
! 770: #ifdef ENABLE_IPV6
! 771: if(!use_ipv6) {
! 772: #endif
! 773: memset(&listener.sa4, 0, sizeof(listener.sa4));
! 774: listener.sa4.sin_family = AF_INET;
! 775: listener.sa4.sin_addr.s_addr = INADDR_ANY;
! 776: listener.sa4.sin_port = htons(*listenport);
! 777: rc = bind(sock, &listener.sa, sizeof(listener.sa4));
! 778: #ifdef ENABLE_IPV6
! 779: }
! 780: else {
! 781: memset(&listener.sa6, 0, sizeof(listener.sa6));
! 782: listener.sa6.sin6_family = AF_INET6;
! 783: listener.sa6.sin6_addr = in6addr_any;
! 784: listener.sa6.sin6_port = htons(*listenport);
! 785: rc = bind(sock, &listener.sa, sizeof(listener.sa6));
! 786: }
! 787: #endif /* ENABLE_IPV6 */
! 788: if(rc) {
! 789: error = SOCKERRNO;
! 790: logmsg("Error binding socket on port %hu: (%d) %s",
! 791: *listenport, error, strerror(error));
! 792: sclose(sock);
! 793: return CURL_SOCKET_BAD;
! 794: }
! 795:
! 796: if(!*listenport) {
! 797: /* The system was supposed to choose a port number, figure out which
! 798: port we actually got and update the listener port value with it. */
! 799: curl_socklen_t la_size;
! 800: srvr_sockaddr_union_t localaddr;
! 801: #ifdef ENABLE_IPV6
! 802: if(!use_ipv6)
! 803: #endif
! 804: la_size = sizeof(localaddr.sa4);
! 805: #ifdef ENABLE_IPV6
! 806: else
! 807: la_size = sizeof(localaddr.sa6);
! 808: #endif
! 809: memset(&localaddr.sa, 0, (size_t)la_size);
! 810: if(getsockname(sock, &localaddr.sa, &la_size) < 0) {
! 811: error = SOCKERRNO;
! 812: logmsg("getsockname() failed with error: (%d) %s",
! 813: error, strerror(error));
! 814: sclose(sock);
! 815: return CURL_SOCKET_BAD;
! 816: }
! 817: switch(localaddr.sa.sa_family) {
! 818: case AF_INET:
! 819: *listenport = ntohs(localaddr.sa4.sin_port);
! 820: break;
! 821: #ifdef ENABLE_IPV6
! 822: case AF_INET6:
! 823: *listenport = ntohs(localaddr.sa6.sin6_port);
! 824: break;
! 825: #endif
! 826: default:
! 827: break;
! 828: }
! 829: if(!*listenport) {
! 830: /* Real failure, listener port shall not be zero beyond this point. */
! 831: logmsg("Apparently getsockname() succeeded, with listener port zero.");
! 832: logmsg("A valid reason for this failure is a binary built without");
! 833: logmsg("proper network library linkage. This might not be the only");
! 834: logmsg("reason, but double check it before anything else.");
! 835: sclose(sock);
! 836: return CURL_SOCKET_BAD;
! 837: }
! 838: }
! 839:
! 840: /* start accepting connections */
! 841: rc = listen(sock, 5);
! 842: if(0 != rc) {
! 843: error = SOCKERRNO;
! 844: logmsg("listen(%d, 5) failed with error: (%d) %s",
! 845: sock, error, strerror(error));
! 846: sclose(sock);
! 847: return CURL_SOCKET_BAD;
! 848: }
! 849:
! 850: return sock;
! 851: }
! 852:
! 853:
! 854: int main(int argc, char *argv[])
! 855: {
! 856: curl_socket_t sock = CURL_SOCKET_BAD;
! 857: curl_socket_t msgsock = CURL_SOCKET_BAD;
! 858: int wrotepidfile = 0;
! 859: int wroteportfile = 0;
! 860: const char *pidname = ".mqttd.pid";
! 861: const char *portname = ".mqttd.port";
! 862: bool juggle_again;
! 863: int error;
! 864: int arg = 1;
! 865:
! 866: while(argc>arg) {
! 867: if(!strcmp("--version", argv[arg])) {
! 868: printf("mqttd IPv4%s\n",
! 869: #ifdef ENABLE_IPV6
! 870: "/IPv6"
! 871: #else
! 872: ""
! 873: #endif
! 874: );
! 875: return 0;
! 876: }
! 877: else if(!strcmp("--pidfile", argv[arg])) {
! 878: arg++;
! 879: if(argc>arg)
! 880: pidname = argv[arg++];
! 881: }
! 882: else if(!strcmp("--portfile", argv[arg])) {
! 883: arg++;
! 884: if(argc>arg)
! 885: portname = argv[arg++];
! 886: }
! 887: else if(!strcmp("--config", argv[arg])) {
! 888: arg++;
! 889: if(argc>arg)
! 890: configfile = argv[arg++];
! 891: }
! 892: else if(!strcmp("--logfile", argv[arg])) {
! 893: arg++;
! 894: if(argc>arg)
! 895: serverlogfile = argv[arg++];
! 896: }
! 897: else if(!strcmp("--ipv6", argv[arg])) {
! 898: #ifdef ENABLE_IPV6
! 899: ipv_inuse = "IPv6";
! 900: use_ipv6 = TRUE;
! 901: #endif
! 902: arg++;
! 903: }
! 904: else if(!strcmp("--ipv4", argv[arg])) {
! 905: /* for completeness, we support this option as well */
! 906: #ifdef ENABLE_IPV6
! 907: ipv_inuse = "IPv4";
! 908: use_ipv6 = FALSE;
! 909: #endif
! 910: arg++;
! 911: }
! 912: else if(!strcmp("--port", argv[arg])) {
! 913: arg++;
! 914: if(argc>arg) {
! 915: char *endptr;
! 916: unsigned long ulnum = strtoul(argv[arg], &endptr, 10);
! 917: if((endptr != argv[arg] + strlen(argv[arg])) ||
! 918: ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) {
! 919: fprintf(stderr, "mqttd: invalid --port argument (%s)\n",
! 920: argv[arg]);
! 921: return 0;
! 922: }
! 923: port = curlx_ultous(ulnum);
! 924: arg++;
! 925: }
! 926: }
! 927: else {
! 928: puts("Usage: mqttd [option]\n"
! 929: " --config [file]\n"
! 930: " --version\n"
! 931: " --logfile [file]\n"
! 932: " --pidfile [file]\n"
! 933: " --ipv4\n"
! 934: " --ipv6\n"
! 935: " --port [port]\n");
! 936: return 0;
! 937: }
! 938: }
! 939:
! 940: #ifdef WIN32
! 941: win32_init();
! 942: atexit(win32_cleanup);
! 943:
! 944: setmode(fileno(stdin), O_BINARY);
! 945: setmode(fileno(stdout), O_BINARY);
! 946: setmode(fileno(stderr), O_BINARY);
! 947: #endif
! 948:
! 949: install_signal_handlers(FALSE);
! 950:
! 951: #ifdef ENABLE_IPV6
! 952: if(!use_ipv6)
! 953: #endif
! 954: sock = socket(AF_INET, SOCK_STREAM, 0);
! 955: #ifdef ENABLE_IPV6
! 956: else
! 957: sock = socket(AF_INET6, SOCK_STREAM, 0);
! 958: #endif
! 959:
! 960: if(CURL_SOCKET_BAD == sock) {
! 961: error = SOCKERRNO;
! 962: logmsg("Error creating socket: (%d) %s",
! 963: error, strerror(error));
! 964: goto mqttd_cleanup;
! 965: }
! 966:
! 967: {
! 968: /* passive daemon style */
! 969: sock = sockdaemon(sock, &port);
! 970: if(CURL_SOCKET_BAD == sock) {
! 971: goto mqttd_cleanup;
! 972: }
! 973: msgsock = CURL_SOCKET_BAD; /* no stream socket yet */
! 974: }
! 975:
! 976: logmsg("Running %s version", ipv_inuse);
! 977: logmsg("Listening on port %hu", port);
! 978:
! 979: wrotepidfile = write_pidfile(pidname);
! 980: if(!wrotepidfile) {
! 981: goto mqttd_cleanup;
! 982: }
! 983:
! 984: wroteportfile = write_portfile(portname, (int)port);
! 985: if(!wroteportfile) {
! 986: goto mqttd_cleanup;
! 987: }
! 988:
! 989: do {
! 990: juggle_again = incoming(sock);
! 991: } while(juggle_again);
! 992:
! 993: mqttd_cleanup:
! 994:
! 995: if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD))
! 996: sclose(msgsock);
! 997:
! 998: if(sock != CURL_SOCKET_BAD)
! 999: sclose(sock);
! 1000:
! 1001: if(wrotepidfile)
! 1002: unlink(pidname);
! 1003:
! 1004: restore_signal_handlers(FALSE);
! 1005:
! 1006: if(got_exit_signal) {
! 1007: logmsg("============> mqttd exits with signal (%d)", exit_signal);
! 1008: /*
! 1009: * To properly set the return status of the process we
! 1010: * must raise the same signal SIGINT or SIGTERM that we
! 1011: * caught and let the old handler take care of it.
! 1012: */
! 1013: raise(exit_signal);
! 1014: }
! 1015:
! 1016: logmsg("============> mqttd quits");
! 1017: return 0;
! 1018: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>