Return to mqttd.c CVS log | Up to [ELWIX - Embedded LightWeight unIX -] / embedaddon / curl / tests / server |
1.1 ! misho 1: /*************************************************************************** ! 2: * _ _ ____ _ ! 3: * Project ___| | | | _ \| | ! 4: * / __| | | | |_) | | ! 5: * | (__| |_| | _ <| |___ ! 6: * \___|\___/|_| \_\_____| ! 7: * ! 8: * Copyright (C) 1998 - 2020, Daniel Stenberg, <daniel@haxx.se>, et al. ! 9: * ! 10: * This software is licensed as described in the file COPYING, which ! 11: * you should have received as part of this distribution. The terms ! 12: * are also available at https://curl.haxx.se/docs/copyright.html. ! 13: * ! 14: * You may opt to use, copy, modify, merge, publish, distribute and/or sell ! 15: * copies of the Software, and permit persons to whom the Software is ! 16: * furnished to do so, under the terms of the COPYING file. ! 17: * ! 18: * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY ! 19: * KIND, either express or implied. ! 20: * ! 21: ***************************************************************************/ ! 22: #include "server_setup.h" ! 23: #include <stdlib.h> ! 24: #include <string.h> ! 25: #include "util.h" ! 26: ! 27: /* Function ! 28: * ! 29: * Accepts a TCP connection on a custom port (IPv4 or IPv6). Speaks MQTT. ! 30: * ! 31: * Read commands from FILE (set with --config). The commands control how to ! 32: * act and is reset to defaults each client TCP connect. ! 33: * ! 34: * Config file keywords: ! 35: * ! 36: * TODO ! 37: */ ! 38: ! 39: /* based on sockfilt.c */ ! 40: ! 41: #ifdef HAVE_SIGNAL_H ! 42: #include <signal.h> ! 43: #endif ! 44: #ifdef HAVE_NETINET_IN_H ! 45: #include <netinet/in.h> ! 46: #endif ! 47: #ifdef HAVE_NETINET_IN6_H ! 48: #include <netinet/in6.h> ! 49: #endif ! 50: #ifdef HAVE_ARPA_INET_H ! 51: #include <arpa/inet.h> ! 52: #endif ! 53: #ifdef HAVE_NETDB_H ! 54: #include <netdb.h> ! 55: #endif ! 56: ! 57: #define ENABLE_CURLX_PRINTF ! 58: /* make the curlx header define all printf() functions to use the curlx_* ! 59: versions instead */ ! 60: #include "curlx.h" /* from the private lib dir */ ! 61: #include "getpart.h" ! 62: #include "inet_pton.h" ! 63: #include "util.h" ! 64: #include "server_sockaddr.h" ! 65: #include "warnless.h" ! 66: ! 67: /* include memdebug.h last */ ! 68: #include "memdebug.h" ! 69: ! 70: #ifdef USE_WINSOCK ! 71: #undef EINTR ! 72: #define EINTR 4 /* errno.h value */ ! 73: #undef EAGAIN ! 74: #define EAGAIN 11 /* errno.h value */ ! 75: #undef ENOMEM ! 76: #define ENOMEM 12 /* errno.h value */ ! 77: #undef EINVAL ! 78: #define EINVAL 22 /* errno.h value */ ! 79: #endif ! 80: ! 81: #define DEFAULT_PORT 1883 /* MQTT default port */ ! 82: ! 83: #ifndef DEFAULT_LOGFILE ! 84: #define DEFAULT_LOGFILE "log/mqttd.log" ! 85: #endif ! 86: ! 87: #ifndef DEFAULT_CONFIG ! 88: #define DEFAULT_CONFIG "mqttd.config" ! 89: #endif ! 90: ! 91: #define MQTT_MSG_CONNECT 0x10 ! 92: #define MQTT_MSG_CONNACK 0x20 ! 93: #define MQTT_MSG_PUBLISH 0x30 ! 94: #define MQTT_MSG_PUBACK 0x40 ! 95: #define MQTT_MSG_SUBSCRIBE 0x82 ! 96: #define MQTT_MSG_SUBACK 0x90 ! 97: #define MQTT_MSG_DISCONNECT 0xe0 ! 98: ! 99: #define MQTT_CONNACK_LEN 4 ! 100: #define MQTT_SUBACK_LEN 5 ! 101: #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */ ! 102: #define MQTT_HEADER_LEN 5 /* max 5 bytes */ ! 103: ! 104: struct configurable { ! 105: unsigned char version; /* initial version byte in the request must match ! 106: this */ ! 107: bool publish_before_suback; ! 108: bool short_publish; ! 109: unsigned char error_connack; ! 110: int testnum; ! 111: }; ! 112: ! 113: #define REQUEST_DUMP "log/server.input" ! 114: #define CONFIG_VERSION 5 ! 115: ! 116: static struct configurable config; ! 117: ! 118: const char *serverlogfile = DEFAULT_LOGFILE; ! 119: static const char *configfile = DEFAULT_CONFIG; ! 120: ! 121: #ifdef ENABLE_IPV6 ! 122: static bool use_ipv6 = FALSE; ! 123: #endif ! 124: static const char *ipv_inuse = "IPv4"; ! 125: static unsigned short port = DEFAULT_PORT; ! 126: ! 127: static void resetdefaults(void) ! 128: { ! 129: logmsg("Reset to defaults"); ! 130: config.version = CONFIG_VERSION; ! 131: config.publish_before_suback = FALSE; ! 132: config.short_publish = FALSE; ! 133: config.error_connack = 0; ! 134: config.testnum = 0; ! 135: } ! 136: ! 137: static unsigned char byteval(char *value) ! 138: { ! 139: unsigned long num = strtoul(value, NULL, 10); ! 140: return num & 0xff; ! 141: } ! 142: ! 143: static void getconfig(void) ! 144: { ! 145: FILE *fp = fopen(configfile, FOPEN_READTEXT); ! 146: resetdefaults(); ! 147: if(fp) { ! 148: char buffer[512]; ! 149: logmsg("parse config file"); ! 150: while(fgets(buffer, sizeof(buffer), fp)) { ! 151: char key[32]; ! 152: char value[32]; ! 153: if(2 == sscanf(buffer, "%31s %31s", key, value)) { ! 154: if(!strcmp(key, "version")) { ! 155: config.version = byteval(value); ! 156: logmsg("version [%d] set", config.version); ! 157: } ! 158: else if(!strcmp(key, "PUBLISH-before-SUBACK")) { ! 159: logmsg("PUBLISH-before-SUBACK set"); ! 160: config.publish_before_suback = TRUE; ! 161: } ! 162: else if(!strcmp(key, "short-PUBLISH")) { ! 163: logmsg("short-PUBLISH set"); ! 164: config.short_publish = TRUE; ! 165: } ! 166: else if(!strcmp(key, "error-CONNACK")) { ! 167: config.error_connack = byteval(value); ! 168: logmsg("error-CONNACK = %d", config.error_connack); ! 169: } ! 170: else if(!strcmp(key, "Testnum")) { ! 171: config.testnum = atoi(value); ! 172: logmsg("testnum = %d", config.testnum); ! 173: } ! 174: } ! 175: } ! 176: fclose(fp); ! 177: } ! 178: else { ! 179: logmsg("No config file '%s' to read", configfile); ! 180: } ! 181: } ! 182: ! 183: static void loghex(unsigned char *buffer, ssize_t len) ! 184: { ! 185: char data[12000]; ! 186: ssize_t i; ! 187: unsigned char *ptr = buffer; ! 188: char *optr = data; ! 189: ssize_t width = 0; ! 190: int left = sizeof(data); ! 191: ! 192: for(i = 0; i<len && (left >= 0); i++) { ! 193: msnprintf(optr, left, "%02x", ptr[i]); ! 194: width += 2; ! 195: optr += 2; ! 196: left -= 2; ! 197: } ! 198: if(width) ! 199: logmsg("'%s'", data); ! 200: } ! 201: ! 202: typedef enum { ! 203: FROM_CLIENT, ! 204: FROM_SERVER ! 205: } mqttdir; ! 206: ! 207: static void logprotocol(mqttdir dir, ! 208: const char *prefix, size_t remlen, ! 209: FILE *output, ! 210: unsigned char *buffer, ssize_t len) ! 211: { ! 212: char data[12000] = ""; ! 213: ssize_t i; ! 214: unsigned char *ptr = buffer; ! 215: char *optr = data; ! 216: ssize_t width = 0; ! 217: int left = sizeof(data); ! 218: ! 219: for(i = 0; i<len && (left >= 0); i++) { ! 220: msnprintf(optr, left, "%02x", ptr[i]); ! 221: width += 2; ! 222: optr += 2; ! 223: left -= 2; ! 224: } ! 225: fprintf(output, "%s %s %zx %s\n", ! 226: dir == FROM_CLIENT? "client": "server", ! 227: prefix, remlen, ! 228: data); ! 229: } ! 230: ! 231: ! 232: /* return 0 on success */ ! 233: static int connack(FILE *dump, curl_socket_t fd) ! 234: { ! 235: unsigned char packet[]={ ! 236: MQTT_MSG_CONNACK, 0x02, ! 237: 0x00, 0x00 ! 238: }; ! 239: ssize_t rc; ! 240: ! 241: packet[3] = config.error_connack; ! 242: ! 243: rc = swrite(fd, (char *)packet, sizeof(packet)); ! 244: if(rc > 0) { ! 245: logmsg("WROTE %d bytes [CONNACK]", rc); ! 246: loghex(packet, rc); ! 247: logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet)); ! 248: } ! 249: if(rc == sizeof(packet)) { ! 250: return 0; ! 251: } ! 252: return 1; ! 253: } ! 254: ! 255: /* return 0 on success */ ! 256: static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid) ! 257: { ! 258: unsigned char packet[]={ ! 259: MQTT_MSG_SUBACK, 0x03, ! 260: 0, 0, /* filled in below */ ! 261: 0x00 ! 262: }; ! 263: ssize_t rc; ! 264: packet[2] = (unsigned char)(packetid >> 8); ! 265: packet[3] = (unsigned char)(packetid & 0xff); ! 266: ! 267: rc = swrite(fd, (char *)packet, sizeof(packet)); ! 268: if(rc == sizeof(packet)) { ! 269: logmsg("WROTE %d bytes [SUBACK]", rc); ! 270: loghex(packet, rc); ! 271: logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc); ! 272: return 0; ! 273: } ! 274: return 1; ! 275: } ! 276: ! 277: #ifdef QOS ! 278: /* return 0 on success */ ! 279: static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid) ! 280: { ! 281: unsigned char packet[]={ ! 282: MQTT_MSG_PUBACK, 0x00, ! 283: 0, 0 /* filled in below */ ! 284: }; ! 285: ssize_t rc; ! 286: packet[2] = (unsigned char)(packetid >> 8); ! 287: packet[3] = (unsigned char)(packetid & 0xff); ! 288: ! 289: rc = swrite(fd, (char *)packet, sizeof(packet)); ! 290: if(rc == sizeof(packet)) { ! 291: logmsg("WROTE %d bytes [PUBACK]", rc); ! 292: loghex(packet, rc); ! 293: logprotocol(FROM_SERVER, dump, packet, rc); ! 294: return 0; ! 295: } ! 296: logmsg("Failed sending [PUBACK]"); ! 297: return 1; ! 298: } ! 299: #endif ! 300: ! 301: /* return 0 on success */ ! 302: static int disconnect(FILE *dump, curl_socket_t fd) ! 303: { ! 304: unsigned char packet[]={ ! 305: MQTT_MSG_DISCONNECT, 0x00, ! 306: }; ! 307: ssize_t rc = swrite(fd, (char *)packet, sizeof(packet)); ! 308: if(rc == sizeof(packet)) { ! 309: logmsg("WROTE %d bytes [DISCONNECT]", rc); ! 310: loghex(packet, rc); ! 311: logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc); ! 312: return 0; ! 313: } ! 314: logmsg("Failed sending [DISCONNECT]"); ! 315: return 1; ! 316: } ! 317: ! 318: ! 319: ! 320: /* ! 321: do ! 322: ! 323: encodedByte = X MOD 128 ! 324: ! 325: X = X DIV 128 ! 326: ! 327: // if there are more data to encode, set the top bit of this byte ! 328: ! 329: if ( X > 0 ) ! 330: ! 331: encodedByte = encodedByte OR 128 ! 332: ! 333: endif ! 334: ! 335: 'output' encodedByte ! 336: ! 337: while ( X > 0 ) ! 338: ! 339: */ ! 340: ! 341: /* return number of bytes used */ ! 342: static int encode_length(size_t packetlen, char *remlength) /* 4 bytes */ ! 343: { ! 344: int bytes = 0; ! 345: unsigned char encode; ! 346: ! 347: do { ! 348: encode = packetlen % 0x80; ! 349: packetlen /= 0x80; ! 350: if(packetlen) ! 351: encode |= 0x80; ! 352: ! 353: remlength[bytes++] = encode; ! 354: ! 355: if(bytes > 3) { ! 356: logmsg("too large packet!"); ! 357: return 0; ! 358: } ! 359: } while(packetlen); ! 360: ! 361: return bytes; ! 362: } ! 363: ! 364: ! 365: static size_t decode_length(unsigned char *buf, ! 366: size_t buflen, size_t *lenbytes) ! 367: { ! 368: size_t len = 0; ! 369: size_t mult = 1; ! 370: size_t i; ! 371: unsigned char encoded = 0x80; ! 372: ! 373: for(i = 0; (i < buflen) && (encoded & 0x80); i++) { ! 374: encoded = buf[i]; ! 375: len += (encoded & 0x7f) * mult; ! 376: mult *= 0x80; ! 377: } ! 378: ! 379: if(lenbytes) ! 380: *lenbytes = i; ! 381: ! 382: return len; ! 383: } ! 384: ! 385: ! 386: /* return 0 on success */ ! 387: static int publish(FILE *dump, ! 388: curl_socket_t fd, unsigned short packetid, ! 389: char *topic, char *payload, size_t payloadlen) ! 390: { ! 391: size_t topiclen = strlen(topic); ! 392: unsigned char *packet; ! 393: size_t payloadindex; ! 394: ssize_t remaininglength = topiclen + 2 + payloadlen; ! 395: ssize_t packetlen; ! 396: ssize_t sendamount; ! 397: ssize_t rc; ! 398: char rembuffer[4]; ! 399: int encodedlen; ! 400: ! 401: encodedlen = encode_length(remaininglength, rembuffer); ! 402: ! 403: /* one packet type byte (possibly two more for packetid) */ ! 404: packetlen = remaininglength + encodedlen + 1; ! 405: packet = malloc(packetlen); ! 406: if(!packet) ! 407: return 1; ! 408: ! 409: packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */ ! 410: memcpy(&packet[1], rembuffer, encodedlen); ! 411: ! 412: (void)packetid; ! 413: /* packet_id if QoS is set */ ! 414: ! 415: packet[1 + encodedlen] = (unsigned char)(topiclen >> 8); ! 416: packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff); ! 417: memcpy(&packet[3 + encodedlen], topic, topiclen); ! 418: ! 419: payloadindex = 3 + topiclen + encodedlen; ! 420: memcpy(&packet[payloadindex], payload, payloadlen); ! 421: ! 422: sendamount = packetlen; ! 423: if(config.short_publish) ! 424: sendamount -= 2; ! 425: ! 426: rc = swrite(fd, (char *)packet, sendamount); ! 427: if(rc > 0) { ! 428: logmsg("WROTE %d bytes [PUBLISH]", rc); ! 429: loghex(packet, rc); ! 430: logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc); ! 431: } ! 432: if(rc == packetlen) ! 433: return 0; ! 434: return 1; ! 435: } ! 436: ! 437: #define MAX_TOPIC_LENGTH 65535 ! 438: #define MAX_CLIENT_ID_LENGTH 32 ! 439: ! 440: static char topic[MAX_TOPIC_LENGTH + 1]; ! 441: ! 442: static int fixedheader(curl_socket_t fd, ! 443: unsigned char *bytep, ! 444: size_t *remaining_lengthp, ! 445: size_t *remaining_length_bytesp) ! 446: { ! 447: /* get the fixed header */ ! 448: unsigned char buffer[10]; ! 449: ! 450: /* get the first two bytes */ ! 451: ssize_t rc = sread(fd, (char *)buffer, 2); ! 452: int i; ! 453: if(rc < 2) { ! 454: logmsg("READ %d bytes [SHORT!]", rc); ! 455: return 1; /* fail */ ! 456: } ! 457: logmsg("READ %d bytes", rc); ! 458: loghex(buffer, rc); ! 459: *bytep = buffer[0]; ! 460: ! 461: /* if the length byte has the top bit set, get the next one too */ ! 462: i = 1; ! 463: while(buffer[i] & 0x80) { ! 464: i++; ! 465: rc = sread(fd, (char *)&buffer[i], 1); ! 466: if(rc != 1) { ! 467: logmsg("Remaining Length broken"); ! 468: return 1; ! 469: } ! 470: } ! 471: *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp); ! 472: logmsg("Remaining Length: %ld [%d bytes]", (long) *remaining_lengthp, ! 473: *remaining_length_bytesp); ! 474: return 0; ! 475: } ! 476: ! 477: static curl_socket_t mqttit(curl_socket_t fd) ! 478: { ! 479: unsigned char buffer[10*1024]; ! 480: ssize_t rc; ! 481: unsigned char byte; ! 482: unsigned short packet_id; ! 483: size_t payload_len; ! 484: unsigned int topic_len; ! 485: size_t remaining_length = 0; ! 486: size_t bytes = 0; /* remaining length field size in bytes */ ! 487: char client_id[MAX_CLIENT_ID_LENGTH]; ! 488: long testno; ! 489: ! 490: static const char protocol[7] = { ! 491: 0x00, 0x04, /* protocol length */ ! 492: 'M','Q','T','T', /* protocol name */ ! 493: 0x04 /* protocol level */ ! 494: }; ! 495: FILE *dump = fopen(REQUEST_DUMP, "ab"); ! 496: if(!dump) ! 497: goto end; ! 498: ! 499: getconfig(); ! 500: ! 501: testno = config.testnum; ! 502: ! 503: if(testno) ! 504: logmsg("Found test number %ld", testno); ! 505: ! 506: do { ! 507: /* get the fixed header */ ! 508: rc = fixedheader(fd, &byte, &remaining_length, &bytes); ! 509: if(rc) ! 510: break; ! 511: if(remaining_length) { ! 512: rc = sread(fd, (char *)buffer, remaining_length); ! 513: if(rc > 0) { ! 514: logmsg("READ %d bytes", rc); ! 515: loghex(buffer, rc); ! 516: } ! 517: } ! 518: ! 519: if(byte == MQTT_MSG_CONNECT) { ! 520: logprotocol(FROM_CLIENT, "CONNECT", remaining_length, ! 521: dump, buffer, rc); ! 522: ! 523: if(memcmp(protocol, buffer, sizeof(protocol))) { ! 524: logmsg("Protocol preamble mismatch"); ! 525: goto end; ! 526: } ! 527: /* ignore the connect flag byte and two keepalive bytes */ ! 528: ! 529: payload_len = (buffer[10] << 8) | buffer[11]; ! 530: if((ssize_t)payload_len != (rc - 12)) { ! 531: logmsg("Payload length mismatch, expected %x got %x", ! 532: rc - 12, payload_len); ! 533: goto end; ! 534: } ! 535: else if((payload_len + 1) > MAX_CLIENT_ID_LENGTH) { ! 536: logmsg("Too large client id"); ! 537: goto end; ! 538: } ! 539: memcpy(client_id, &buffer[14], payload_len); ! 540: client_id[payload_len] = 0; ! 541: ! 542: logmsg("MQTT client connect accepted: %s", client_id); ! 543: ! 544: /* The first packet sent from the Server to the Client MUST be a ! 545: CONNACK Packet */ ! 546: ! 547: if(connack(dump, fd)) { ! 548: logmsg("failed sending CONNACK"); ! 549: goto end; ! 550: } ! 551: } ! 552: else if(byte == MQTT_MSG_SUBSCRIBE) { ! 553: FILE *stream; ! 554: int error; ! 555: char *data; ! 556: size_t datalen; ! 557: logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length, ! 558: dump, buffer, rc); ! 559: logmsg("Incoming SUBSCRIBE"); ! 560: ! 561: if(rc < 6) { ! 562: logmsg("Too small SUBSCRIBE"); ! 563: goto end; ! 564: } ! 565: ! 566: /* two bytes packet id */ ! 567: packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]); ! 568: ! 569: /* two bytes topic length */ ! 570: topic_len = (buffer[2] << 8) | buffer[3]; ! 571: if(topic_len != (remaining_length - 5)) { ! 572: logmsg("Wrong topic length, got %d expected %d", ! 573: topic_len, remaining_length - 5); ! 574: goto end; ! 575: } ! 576: memcpy(topic, &buffer[4], topic_len); ! 577: topic[topic_len] = 0; ! 578: ! 579: /* there's a QoS byte (two bits) after the topic */ ! 580: ! 581: logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id); ! 582: stream = test2fopen(testno); ! 583: error = getpart(&data, &datalen, "reply", "data", stream); ! 584: if(!error) { ! 585: if(!config.publish_before_suback) { ! 586: if(suback(dump, fd, packet_id)) { ! 587: logmsg("failed sending SUBACK"); ! 588: goto end; ! 589: } ! 590: } ! 591: if(publish(dump, fd, packet_id, topic, data, datalen)) { ! 592: logmsg("PUBLISH failed"); ! 593: goto end; ! 594: } ! 595: if(config.publish_before_suback) { ! 596: if(suback(dump, fd, packet_id)) { ! 597: logmsg("failed sending SUBACK"); ! 598: goto end; ! 599: } ! 600: } ! 601: } ! 602: else { ! 603: char *def = (char *)"this is random payload yes yes it is"; ! 604: publish(dump, fd, packet_id, topic, def, strlen(def)); ! 605: } ! 606: disconnect(dump, fd); ! 607: } ! 608: else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) { ! 609: size_t topiclen; ! 610: ! 611: logmsg("Incoming PUBLISH"); ! 612: logprotocol(FROM_CLIENT, "PUBLISH", remaining_length, ! 613: dump, buffer, rc); ! 614: ! 615: topiclen = (buffer[1 + bytes] << 8) | buffer[2 + bytes]; ! 616: logmsg("Got %d bytes topic", topiclen); ! 617: /* TODO: verify topiclen */ ! 618: ! 619: #ifdef QOS ! 620: /* TODO: handle packetid if there is one. Send puback if QoS > 0 */ ! 621: puback(dump, fd, 0); ! 622: #endif ! 623: /* expect a disconnect here */ ! 624: /* get the request */ ! 625: rc = sread(fd, (char *)&buffer[0], 2); ! 626: ! 627: logmsg("READ %d bytes [DISCONNECT]", rc); ! 628: loghex(buffer, rc); ! 629: logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc); ! 630: goto end; ! 631: } ! 632: else { ! 633: /* not supported (yet) */ ! 634: goto end; ! 635: } ! 636: } while(1); ! 637: ! 638: end: ! 639: fclose(dump); ! 640: return CURL_SOCKET_BAD; ! 641: } ! 642: ! 643: /* ! 644: sockfdp is a pointer to an established stream or CURL_SOCKET_BAD ! 645: ! 646: if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must ! 647: accept() ! 648: */ ! 649: static bool incoming(curl_socket_t listenfd) ! 650: { ! 651: fd_set fds_read; ! 652: fd_set fds_write; ! 653: fd_set fds_err; ! 654: int clients = 0; /* connected clients */ ! 655: ! 656: if(got_exit_signal) { ! 657: logmsg("signalled to die, exiting..."); ! 658: return FALSE; ! 659: } ! 660: ! 661: #ifdef HAVE_GETPPID ! 662: /* As a last resort, quit if socks5 process becomes orphan. */ ! 663: if(getppid() <= 1) { ! 664: logmsg("process becomes orphan, exiting"); ! 665: return FALSE; ! 666: } ! 667: #endif ! 668: ! 669: do { ! 670: ssize_t rc; ! 671: int error = 0; ! 672: curl_socket_t sockfd = listenfd; ! 673: int maxfd = (int)sockfd; ! 674: ! 675: FD_ZERO(&fds_read); ! 676: FD_ZERO(&fds_write); ! 677: FD_ZERO(&fds_err); ! 678: ! 679: /* there's always a socket to wait for */ ! 680: FD_SET(sockfd, &fds_read); ! 681: ! 682: do { ! 683: /* select() blocking behavior call on blocking descriptors please */ ! 684: rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL); ! 685: if(got_exit_signal) { ! 686: logmsg("signalled to die, exiting..."); ! 687: return FALSE; ! 688: } ! 689: } while((rc == -1) && ((error = SOCKERRNO) == EINTR)); ! 690: ! 691: if(rc < 0) { ! 692: logmsg("select() failed with error: (%d) %s", ! 693: error, strerror(error)); ! 694: return FALSE; ! 695: } ! 696: ! 697: if(FD_ISSET(sockfd, &fds_read)) { ! 698: curl_socket_t newfd = accept(sockfd, NULL, NULL); ! 699: if(CURL_SOCKET_BAD == newfd) { ! 700: error = SOCKERRNO; ! 701: logmsg("accept(%d, NULL, NULL) failed with error: (%d) %s", ! 702: sockfd, error, strerror(error)); ! 703: } ! 704: else { ! 705: logmsg("====> Client connect, fd %d. Read config from %s", ! 706: newfd, configfile); ! 707: set_advisor_read_lock(SERVERLOGS_LOCK); ! 708: (void)mqttit(newfd); /* until done */ ! 709: clear_advisor_read_lock(SERVERLOGS_LOCK); ! 710: ! 711: logmsg("====> Client disconnect"); ! 712: sclose(newfd); ! 713: } ! 714: } ! 715: } while(clients); ! 716: ! 717: return TRUE; ! 718: } ! 719: ! 720: static curl_socket_t sockdaemon(curl_socket_t sock, ! 721: unsigned short *listenport) ! 722: { ! 723: /* passive daemon style */ ! 724: srvr_sockaddr_union_t listener; ! 725: int flag; ! 726: int rc; ! 727: int totdelay = 0; ! 728: int maxretr = 10; ! 729: int delay = 20; ! 730: int attempt = 0; ! 731: int error = 0; ! 732: ! 733: do { ! 734: attempt++; ! 735: flag = 1; ! 736: rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, ! 737: (void *)&flag, sizeof(flag)); ! 738: if(rc) { ! 739: error = SOCKERRNO; ! 740: logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s", ! 741: error, strerror(error)); ! 742: if(maxretr) { ! 743: rc = wait_ms(delay); ! 744: if(rc) { ! 745: /* should not happen */ ! 746: logmsg("wait_ms() failed with error: %d", rc); ! 747: sclose(sock); ! 748: return CURL_SOCKET_BAD; ! 749: } ! 750: if(got_exit_signal) { ! 751: logmsg("signalled to die, exiting..."); ! 752: sclose(sock); ! 753: return CURL_SOCKET_BAD; ! 754: } ! 755: totdelay += delay; ! 756: delay *= 2; /* double the sleep for next attempt */ ! 757: } ! 758: } ! 759: } while(rc && maxretr--); ! 760: ! 761: if(rc) { ! 762: logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s", ! 763: attempt, totdelay, error, strerror(error)); ! 764: logmsg("Continuing anyway..."); ! 765: } ! 766: ! 767: /* When the specified listener port is zero, it is actually a ! 768: request to let the system choose a non-zero available port. */ ! 769: ! 770: #ifdef ENABLE_IPV6 ! 771: if(!use_ipv6) { ! 772: #endif ! 773: memset(&listener.sa4, 0, sizeof(listener.sa4)); ! 774: listener.sa4.sin_family = AF_INET; ! 775: listener.sa4.sin_addr.s_addr = INADDR_ANY; ! 776: listener.sa4.sin_port = htons(*listenport); ! 777: rc = bind(sock, &listener.sa, sizeof(listener.sa4)); ! 778: #ifdef ENABLE_IPV6 ! 779: } ! 780: else { ! 781: memset(&listener.sa6, 0, sizeof(listener.sa6)); ! 782: listener.sa6.sin6_family = AF_INET6; ! 783: listener.sa6.sin6_addr = in6addr_any; ! 784: listener.sa6.sin6_port = htons(*listenport); ! 785: rc = bind(sock, &listener.sa, sizeof(listener.sa6)); ! 786: } ! 787: #endif /* ENABLE_IPV6 */ ! 788: if(rc) { ! 789: error = SOCKERRNO; ! 790: logmsg("Error binding socket on port %hu: (%d) %s", ! 791: *listenport, error, strerror(error)); ! 792: sclose(sock); ! 793: return CURL_SOCKET_BAD; ! 794: } ! 795: ! 796: if(!*listenport) { ! 797: /* The system was supposed to choose a port number, figure out which ! 798: port we actually got and update the listener port value with it. */ ! 799: curl_socklen_t la_size; ! 800: srvr_sockaddr_union_t localaddr; ! 801: #ifdef ENABLE_IPV6 ! 802: if(!use_ipv6) ! 803: #endif ! 804: la_size = sizeof(localaddr.sa4); ! 805: #ifdef ENABLE_IPV6 ! 806: else ! 807: la_size = sizeof(localaddr.sa6); ! 808: #endif ! 809: memset(&localaddr.sa, 0, (size_t)la_size); ! 810: if(getsockname(sock, &localaddr.sa, &la_size) < 0) { ! 811: error = SOCKERRNO; ! 812: logmsg("getsockname() failed with error: (%d) %s", ! 813: error, strerror(error)); ! 814: sclose(sock); ! 815: return CURL_SOCKET_BAD; ! 816: } ! 817: switch(localaddr.sa.sa_family) { ! 818: case AF_INET: ! 819: *listenport = ntohs(localaddr.sa4.sin_port); ! 820: break; ! 821: #ifdef ENABLE_IPV6 ! 822: case AF_INET6: ! 823: *listenport = ntohs(localaddr.sa6.sin6_port); ! 824: break; ! 825: #endif ! 826: default: ! 827: break; ! 828: } ! 829: if(!*listenport) { ! 830: /* Real failure, listener port shall not be zero beyond this point. */ ! 831: logmsg("Apparently getsockname() succeeded, with listener port zero."); ! 832: logmsg("A valid reason for this failure is a binary built without"); ! 833: logmsg("proper network library linkage. This might not be the only"); ! 834: logmsg("reason, but double check it before anything else."); ! 835: sclose(sock); ! 836: return CURL_SOCKET_BAD; ! 837: } ! 838: } ! 839: ! 840: /* start accepting connections */ ! 841: rc = listen(sock, 5); ! 842: if(0 != rc) { ! 843: error = SOCKERRNO; ! 844: logmsg("listen(%d, 5) failed with error: (%d) %s", ! 845: sock, error, strerror(error)); ! 846: sclose(sock); ! 847: return CURL_SOCKET_BAD; ! 848: } ! 849: ! 850: return sock; ! 851: } ! 852: ! 853: ! 854: int main(int argc, char *argv[]) ! 855: { ! 856: curl_socket_t sock = CURL_SOCKET_BAD; ! 857: curl_socket_t msgsock = CURL_SOCKET_BAD; ! 858: int wrotepidfile = 0; ! 859: int wroteportfile = 0; ! 860: const char *pidname = ".mqttd.pid"; ! 861: const char *portname = ".mqttd.port"; ! 862: bool juggle_again; ! 863: int error; ! 864: int arg = 1; ! 865: ! 866: while(argc>arg) { ! 867: if(!strcmp("--version", argv[arg])) { ! 868: printf("mqttd IPv4%s\n", ! 869: #ifdef ENABLE_IPV6 ! 870: "/IPv6" ! 871: #else ! 872: "" ! 873: #endif ! 874: ); ! 875: return 0; ! 876: } ! 877: else if(!strcmp("--pidfile", argv[arg])) { ! 878: arg++; ! 879: if(argc>arg) ! 880: pidname = argv[arg++]; ! 881: } ! 882: else if(!strcmp("--portfile", argv[arg])) { ! 883: arg++; ! 884: if(argc>arg) ! 885: portname = argv[arg++]; ! 886: } ! 887: else if(!strcmp("--config", argv[arg])) { ! 888: arg++; ! 889: if(argc>arg) ! 890: configfile = argv[arg++]; ! 891: } ! 892: else if(!strcmp("--logfile", argv[arg])) { ! 893: arg++; ! 894: if(argc>arg) ! 895: serverlogfile = argv[arg++]; ! 896: } ! 897: else if(!strcmp("--ipv6", argv[arg])) { ! 898: #ifdef ENABLE_IPV6 ! 899: ipv_inuse = "IPv6"; ! 900: use_ipv6 = TRUE; ! 901: #endif ! 902: arg++; ! 903: } ! 904: else if(!strcmp("--ipv4", argv[arg])) { ! 905: /* for completeness, we support this option as well */ ! 906: #ifdef ENABLE_IPV6 ! 907: ipv_inuse = "IPv4"; ! 908: use_ipv6 = FALSE; ! 909: #endif ! 910: arg++; ! 911: } ! 912: else if(!strcmp("--port", argv[arg])) { ! 913: arg++; ! 914: if(argc>arg) { ! 915: char *endptr; ! 916: unsigned long ulnum = strtoul(argv[arg], &endptr, 10); ! 917: if((endptr != argv[arg] + strlen(argv[arg])) || ! 918: ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) { ! 919: fprintf(stderr, "mqttd: invalid --port argument (%s)\n", ! 920: argv[arg]); ! 921: return 0; ! 922: } ! 923: port = curlx_ultous(ulnum); ! 924: arg++; ! 925: } ! 926: } ! 927: else { ! 928: puts("Usage: mqttd [option]\n" ! 929: " --config [file]\n" ! 930: " --version\n" ! 931: " --logfile [file]\n" ! 932: " --pidfile [file]\n" ! 933: " --ipv4\n" ! 934: " --ipv6\n" ! 935: " --port [port]\n"); ! 936: return 0; ! 937: } ! 938: } ! 939: ! 940: #ifdef WIN32 ! 941: win32_init(); ! 942: atexit(win32_cleanup); ! 943: ! 944: setmode(fileno(stdin), O_BINARY); ! 945: setmode(fileno(stdout), O_BINARY); ! 946: setmode(fileno(stderr), O_BINARY); ! 947: #endif ! 948: ! 949: install_signal_handlers(FALSE); ! 950: ! 951: #ifdef ENABLE_IPV6 ! 952: if(!use_ipv6) ! 953: #endif ! 954: sock = socket(AF_INET, SOCK_STREAM, 0); ! 955: #ifdef ENABLE_IPV6 ! 956: else ! 957: sock = socket(AF_INET6, SOCK_STREAM, 0); ! 958: #endif ! 959: ! 960: if(CURL_SOCKET_BAD == sock) { ! 961: error = SOCKERRNO; ! 962: logmsg("Error creating socket: (%d) %s", ! 963: error, strerror(error)); ! 964: goto mqttd_cleanup; ! 965: } ! 966: ! 967: { ! 968: /* passive daemon style */ ! 969: sock = sockdaemon(sock, &port); ! 970: if(CURL_SOCKET_BAD == sock) { ! 971: goto mqttd_cleanup; ! 972: } ! 973: msgsock = CURL_SOCKET_BAD; /* no stream socket yet */ ! 974: } ! 975: ! 976: logmsg("Running %s version", ipv_inuse); ! 977: logmsg("Listening on port %hu", port); ! 978: ! 979: wrotepidfile = write_pidfile(pidname); ! 980: if(!wrotepidfile) { ! 981: goto mqttd_cleanup; ! 982: } ! 983: ! 984: wroteportfile = write_portfile(portname, (int)port); ! 985: if(!wroteportfile) { ! 986: goto mqttd_cleanup; ! 987: } ! 988: ! 989: do { ! 990: juggle_again = incoming(sock); ! 991: } while(juggle_again); ! 992: ! 993: mqttd_cleanup: ! 994: ! 995: if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD)) ! 996: sclose(msgsock); ! 997: ! 998: if(sock != CURL_SOCKET_BAD) ! 999: sclose(sock); ! 1000: ! 1001: if(wrotepidfile) ! 1002: unlink(pidname); ! 1003: ! 1004: restore_signal_handlers(FALSE); ! 1005: ! 1006: if(got_exit_signal) { ! 1007: logmsg("============> mqttd exits with signal (%d)", exit_signal); ! 1008: /* ! 1009: * To properly set the return status of the process we ! 1010: * must raise the same signal SIGINT or SIGTERM that we ! 1011: * caught and let the old handler take care of it. ! 1012: */ ! 1013: raise(exit_signal); ! 1014: } ! 1015: ! 1016: logmsg("============> mqttd quits"); ! 1017: return 0; ! 1018: }