Annotation of embedaddon/curl/tests/smbserver.py, revision 1.1.1.1

1.1       misho       1: #!/usr/bin/env python
                      2: # -*- coding: utf-8 -*-
                      3: #
                      4: #  Project                     ___| | | |  _ \| |
                      5: #                             / __| | | | |_) | |
                      6: #                            | (__| |_| |  _ <| |___
                      7: #                             \___|\___/|_| \_\_____|
                      8: #
                      9: # Copyright (C) 2017 - 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
                     10: #
                     11: # This software is licensed as described in the file COPYING, which
                     12: # you should have received as part of this distribution. The terms
                     13: # are also available at https://curl.haxx.se/docs/copyright.html.
                     14: #
                     15: # You may opt to use, copy, modify, merge, publish, distribute and/or sell
                     16: # copies of the Software, and permit persons to whom the Software is
                     17: # furnished to do so, under the terms of the COPYING file.
                     18: #
                     19: # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
                     20: # KIND, either express or implied.
                     21: #
                     22: """Server for testing SMB"""
                     23: 
                     24: from __future__ import (absolute_import, division, print_function)
                     25: # NOTE: the impacket configuration is not unicode_literals compatible!
                     26: import argparse
                     27: import os
                     28: import sys
                     29: import logging
                     30: import tempfile
                     31: if sys.version_info.major >= 3:
                     32:     import configparser
                     33: else:
                     34:     import ConfigParser as configparser
                     35: 
                     36: # Import our curl test data helper
                     37: import curl_test_data
                     38: 
                     39: # impacket needs to be installed in the Python environment
                     40: try:
                     41:     import impacket
                     42: except ImportError:
                     43:     sys.stderr.write('Python package impacket needs to be installed!\n')
                     44:     sys.stderr.write('Use pip or your package manager to install it.\n')
                     45:     sys.exit(1)
                     46: from impacket import smbserver as imp_smbserver
                     47: from impacket import smb as imp_smb
                     48: from impacket.nt_errors import (STATUS_ACCESS_DENIED, STATUS_SUCCESS,
                     49:                                 STATUS_NO_SUCH_FILE)
                     50: 
                     51: log = logging.getLogger(__name__)
                     52: SERVER_MAGIC = "SERVER_MAGIC"
                     53: TESTS_MAGIC = "TESTS_MAGIC"
                     54: VERIFIED_REQ = "verifiedserver"
                     55: VERIFIED_RSP = "WE ROOLZ: {pid}\n"
                     56: 
                     57: 
                     58: def smbserver(options):
                     59:     """Start up a TCP SMB server that serves forever
                     60: 
                     61:     """
                     62:     if options.pidfile:
                     63:         pid = os.getpid()
                     64:         # see tests/server/util.c function write_pidfile
                     65:         if os.name == "nt":
                     66:             pid += 65536
                     67:         with open(options.pidfile, "w") as f:
                     68:             f.write(str(pid))
                     69: 
                     70:     # Here we write a mini config for the server
                     71:     smb_config = configparser.ConfigParser()
                     72:     smb_config.add_section("global")
                     73:     smb_config.set("global", "server_name", "SERVICE")
                     74:     smb_config.set("global", "server_os", "UNIX")
                     75:     smb_config.set("global", "server_domain", "WORKGROUP")
                     76:     smb_config.set("global", "log_file", "")
                     77:     smb_config.set("global", "credentials_file", "")
                     78: 
                     79:     # We need a share which allows us to test that the server is running
                     80:     smb_config.add_section("SERVER")
                     81:     smb_config.set("SERVER", "comment", "server function")
                     82:     smb_config.set("SERVER", "read only", "yes")
                     83:     smb_config.set("SERVER", "share type", "0")
                     84:     smb_config.set("SERVER", "path", SERVER_MAGIC)
                     85: 
                     86:     # Have a share for tests.  These files will be autogenerated from the
                     87:     # test input.
                     88:     smb_config.add_section("TESTS")
                     89:     smb_config.set("TESTS", "comment", "tests")
                     90:     smb_config.set("TESTS", "read only", "yes")
                     91:     smb_config.set("TESTS", "share type", "0")
                     92:     smb_config.set("TESTS", "path", TESTS_MAGIC)
                     93: 
                     94:     if not options.srcdir or not os.path.isdir(options.srcdir):
                     95:         raise ScriptException("--srcdir is mandatory")
                     96: 
                     97:     test_data_dir = os.path.join(options.srcdir, "data")
                     98: 
                     99:     smb_server = TestSmbServer((options.host, options.port),
                    100:                                config_parser=smb_config,
                    101:                                test_data_directory=test_data_dir)
                    102:     log.info("[SMB] setting up SMB server on port %s", options.port)
                    103:     smb_server.processConfigFile()
                    104:     smb_server.serve_forever()
                    105:     return 0
                    106: 
                    107: 
                    108: class TestSmbServer(imp_smbserver.SMBSERVER):
                    109:     """
                    110:     Test server for SMB which subclasses the impacket SMBSERVER and provides
                    111:     test functionality.
                    112:     """
                    113: 
                    114:     def __init__(self,
                    115:                  address,
                    116:                  config_parser=None,
                    117:                  test_data_directory=None):
                    118:         imp_smbserver.SMBSERVER.__init__(self,
                    119:                                          address,
                    120:                                          config_parser=config_parser)
                    121: 
                    122:         # Set up a test data object so we can get test data later.
                    123:         self.ctd = curl_test_data.TestData(test_data_directory)
                    124: 
                    125:         # Override smbComNtCreateAndX so we can pretend to have files which
                    126:         # don't exist.
                    127:         self.hookSmbCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX,
                    128:                             self.create_and_x)
                    129: 
                    130:     def create_and_x(self, conn_id, smb_server, smb_command, recv_packet):
                    131:         """
                    132:         Our version of smbComNtCreateAndX looks for special test files and
                    133:         fools the rest of the framework into opening them as if they were
                    134:         normal files.
                    135:         """
                    136:         conn_data = smb_server.getConnectionData(conn_id)
                    137: 
                    138:         # Wrap processing in a try block which allows us to throw SmbException
                    139:         # to control the flow.
                    140:         try:
                    141:             ncax_parms = imp_smb.SMBNtCreateAndX_Parameters(
                    142:                 smb_command["Parameters"])
                    143: 
                    144:             path = self.get_share_path(conn_data,
                    145:                                        ncax_parms["RootFid"],
                    146:                                        recv_packet["Tid"])
                    147:             log.info("[SMB] Requested share path: %s", path)
                    148: 
                    149:             disposition = ncax_parms["Disposition"]
                    150:             log.debug("[SMB] Requested disposition: %s", disposition)
                    151: 
                    152:             # Currently we only support reading files.
                    153:             if disposition != imp_smb.FILE_OPEN:
                    154:                 raise SmbException(STATUS_ACCESS_DENIED,
                    155:                                    "Only support reading files")
                    156: 
                    157:             # Check to see if the path we were given is actually a
                    158:             # magic path which needs generating on the fly.
                    159:             if path not in [SERVER_MAGIC, TESTS_MAGIC]:
                    160:                 # Pass the command onto the original handler.
                    161:                 return imp_smbserver.SMBCommands.smbComNtCreateAndX(conn_id,
                    162:                                                                     smb_server,
                    163:                                                                     smb_command,
                    164:                                                                     recv_packet)
                    165: 
                    166:             flags2 = recv_packet["Flags2"]
                    167:             ncax_data = imp_smb.SMBNtCreateAndX_Data(flags=flags2,
                    168:                                                      data=smb_command[
                    169:                                                          "Data"])
                    170:             requested_file = imp_smbserver.decodeSMBString(
                    171:                 flags2,
                    172:                 ncax_data["FileName"])
                    173:             log.debug("[SMB] User requested file '%s'", requested_file)
                    174: 
                    175:             if path == SERVER_MAGIC:
                    176:                 fid, full_path = self.get_server_path(requested_file)
                    177:             else:
                    178:                 assert (path == TESTS_MAGIC)
                    179:                 fid, full_path = self.get_test_path(requested_file)
                    180: 
                    181:             resp_parms = imp_smb.SMBNtCreateAndXResponse_Parameters()
                    182:             resp_data = ""
                    183: 
                    184:             # Simple way to generate a fid
                    185:             if len(conn_data["OpenedFiles"]) == 0:
                    186:                 fakefid = 1
                    187:             else:
                    188:                 fakefid = conn_data["OpenedFiles"].keys()[-1] + 1
                    189:             resp_parms["Fid"] = fakefid
                    190:             resp_parms["CreateAction"] = disposition
                    191: 
                    192:             if os.path.isdir(path):
                    193:                 resp_parms[
                    194:                     "FileAttributes"] = imp_smb.SMB_FILE_ATTRIBUTE_DIRECTORY
                    195:                 resp_parms["IsDirectory"] = 1
                    196:             else:
                    197:                 resp_parms["IsDirectory"] = 0
                    198:                 resp_parms["FileAttributes"] = ncax_parms["FileAttributes"]
                    199: 
                    200:             # Get this file's information
                    201:             resp_info, error_code = imp_smbserver.queryPathInformation(
                    202:                 "", full_path, level=imp_smb.SMB_QUERY_FILE_ALL_INFO)
                    203: 
                    204:             if error_code != STATUS_SUCCESS:
                    205:                 raise SmbException(error_code, "Failed to query path info")
                    206: 
                    207:             resp_parms["CreateTime"] = resp_info["CreationTime"]
                    208:             resp_parms["LastAccessTime"] = resp_info[
                    209:                 "LastAccessTime"]
                    210:             resp_parms["LastWriteTime"] = resp_info["LastWriteTime"]
                    211:             resp_parms["LastChangeTime"] = resp_info[
                    212:                 "LastChangeTime"]
                    213:             resp_parms["FileAttributes"] = resp_info[
                    214:                 "ExtFileAttributes"]
                    215:             resp_parms["AllocationSize"] = resp_info[
                    216:                 "AllocationSize"]
                    217:             resp_parms["EndOfFile"] = resp_info["EndOfFile"]
                    218: 
                    219:             # Let's store the fid for the connection
                    220:             # smbServer.log("Create file %s, mode:0x%x" % (pathName, mode))
                    221:             conn_data["OpenedFiles"][fakefid] = {}
                    222:             conn_data["OpenedFiles"][fakefid]["FileHandle"] = fid
                    223:             conn_data["OpenedFiles"][fakefid]["FileName"] = path
                    224:             conn_data["OpenedFiles"][fakefid]["DeleteOnClose"] = False
                    225: 
                    226:         except SmbException as s:
                    227:             log.debug("[SMB] SmbException hit: %s", s)
                    228:             error_code = s.error_code
                    229:             resp_parms = ""
                    230:             resp_data = ""
                    231: 
                    232:         resp_cmd = imp_smb.SMBCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX)
                    233:         resp_cmd["Parameters"] = resp_parms
                    234:         resp_cmd["Data"] = resp_data
                    235:         smb_server.setConnectionData(conn_id, conn_data)
                    236: 
                    237:         return [resp_cmd], None, error_code
                    238: 
                    239:     def get_share_path(self, conn_data, root_fid, tid):
                    240:         conn_shares = conn_data["ConnectedShares"]
                    241: 
                    242:         if tid in conn_shares:
                    243:             if root_fid > 0:
                    244:                 # If we have a rootFid, the path is relative to that fid
                    245:                 path = conn_data["OpenedFiles"][root_fid]["FileName"]
                    246:                 log.debug("RootFid present %s!" % path)
                    247:             else:
                    248:                 if "path" in conn_shares[tid]:
                    249:                     path = conn_shares[tid]["path"]
                    250:                 else:
                    251:                     raise SmbException(STATUS_ACCESS_DENIED,
                    252:                                        "Connection share had no path")
                    253:         else:
                    254:             raise SmbException(imp_smbserver.STATUS_SMB_BAD_TID,
                    255:                                "TID was invalid")
                    256: 
                    257:         return path
                    258: 
                    259:     def get_server_path(self, requested_filename):
                    260:         log.debug("[SMB] Get server path '%s'", requested_filename)
                    261: 
                    262:         if requested_filename not in [VERIFIED_REQ]:
                    263:             raise SmbException(STATUS_NO_SUCH_FILE, "Couldn't find the file")
                    264: 
                    265:         fid, filename = tempfile.mkstemp()
                    266:         log.debug("[SMB] Created %s (%d) for storing '%s'",
                    267:                   filename, fid, requested_filename)
                    268: 
                    269:         contents = ""
                    270: 
                    271:         if requested_filename == VERIFIED_REQ:
                    272:             log.debug("[SMB] Verifying server is alive")
                    273:             pid = os.getpid()
                    274:             # see tests/server/util.c function write_pidfile
                    275:             if os.name == "nt":
                    276:                 pid += 65536
                    277:             contents = VERIFIED_RSP.format(pid=pid).encode('utf-8')
                    278: 
                    279:         self.write_to_fid(fid, contents)
                    280:         return fid, filename
                    281: 
                    282:     def write_to_fid(self, fid, contents):
                    283:         # Write the contents to file descriptor
                    284:         os.write(fid, contents)
                    285:         os.fsync(fid)
                    286: 
                    287:         # Rewind the file to the beginning so a read gets us the contents
                    288:         os.lseek(fid, 0, os.SEEK_SET)
                    289: 
                    290:     def get_test_path(self, requested_filename):
                    291:         log.info("[SMB] Get reply data from 'test%s'", requested_filename)
                    292: 
                    293:         fid, filename = tempfile.mkstemp()
                    294:         log.debug("[SMB] Created %s (%d) for storing test '%s'",
                    295:                   filename, fid, requested_filename)
                    296: 
                    297:         try:
                    298:             contents = self.ctd.get_test_data(requested_filename).encode('utf-8')
                    299:             self.write_to_fid(fid, contents)
                    300:             return fid, filename
                    301: 
                    302:         except Exception:
                    303:             log.exception("Failed to make test file")
                    304:             raise SmbException(STATUS_NO_SUCH_FILE, "Failed to make test file")
                    305: 
                    306: 
                    307: class SmbException(Exception):
                    308:     def __init__(self, error_code, error_message):
                    309:         super(SmbException, self).__init__(error_message)
                    310:         self.error_code = error_code
                    311: 
                    312: 
                    313: class ScriptRC(object):
                    314:     """Enum for script return codes"""
                    315:     SUCCESS = 0
                    316:     FAILURE = 1
                    317:     EXCEPTION = 2
                    318: 
                    319: 
                    320: class ScriptException(Exception):
                    321:     pass
                    322: 
                    323: 
                    324: def get_options():
                    325:     parser = argparse.ArgumentParser()
                    326: 
                    327:     parser.add_argument("--port", action="store", default=9017,
                    328:                       type=int, help="port to listen on")
                    329:     parser.add_argument("--host", action="store", default="127.0.0.1",
                    330:                       help="host to listen on")
                    331:     parser.add_argument("--verbose", action="store", type=int, default=0,
                    332:                         help="verbose output")
                    333:     parser.add_argument("--pidfile", action="store",
                    334:                         help="file name for the PID")
                    335:     parser.add_argument("--logfile", action="store",
                    336:                         help="file name for the log")
                    337:     parser.add_argument("--srcdir", action="store", help="test directory")
                    338:     parser.add_argument("--id", action="store", help="server ID")
                    339:     parser.add_argument("--ipv4", action="store_true", default=0,
                    340:                         help="IPv4 flag")
                    341: 
                    342:     return parser.parse_args()
                    343: 
                    344: 
                    345: def setup_logging(options):
                    346:     """
                    347:     Set up logging from the command line options
                    348:     """
                    349:     root_logger = logging.getLogger()
                    350:     add_stdout = False
                    351: 
                    352:     formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s %(message)s")
                    353: 
                    354:     # Write out to a logfile
                    355:     if options.logfile:
                    356:         handler = logging.FileHandler(options.logfile, mode="w")
                    357:         handler.setFormatter(formatter)
                    358:         handler.setLevel(logging.DEBUG)
                    359:         root_logger.addHandler(handler)
                    360:     else:
                    361:         # The logfile wasn't specified. Add a stdout logger.
                    362:         add_stdout = True
                    363: 
                    364:     if options.verbose:
                    365:         # Add a stdout logger as well in verbose mode
                    366:         root_logger.setLevel(logging.DEBUG)
                    367:         add_stdout = True
                    368:     else:
                    369:         root_logger.setLevel(logging.INFO)
                    370: 
                    371:     if add_stdout:
                    372:         stdout_handler = logging.StreamHandler(sys.stdout)
                    373:         stdout_handler.setFormatter(formatter)
                    374:         stdout_handler.setLevel(logging.DEBUG)
                    375:         root_logger.addHandler(stdout_handler)
                    376: 
                    377: 
                    378: if __name__ == '__main__':
                    379:     # Get the options from the user.
                    380:     options = get_options()
                    381: 
                    382:     # Setup logging using the user options
                    383:     setup_logging(options)
                    384: 
                    385:     # Run main script.
                    386:     try:
                    387:         rc = smbserver(options)
                    388:     except Exception as e:
                    389:         log.exception(e)
                    390:         rc = ScriptRC.EXCEPTION
                    391: 
                    392:     log.info("[SMB] Returning %d", rc)
                    393:     sys.exit(rc)

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>