Annotation of embedaddon/curl/tests/smbserver.py, revision 1.1
1.1 ! misho 1: #!/usr/bin/env python
! 2: # -*- coding: utf-8 -*-
! 3: #
! 4: # Project ___| | | | _ \| |
! 5: # / __| | | | |_) | |
! 6: # | (__| |_| | _ <| |___
! 7: # \___|\___/|_| \_\_____|
! 8: #
! 9: # Copyright (C) 2017 - 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
! 10: #
! 11: # This software is licensed as described in the file COPYING, which
! 12: # you should have received as part of this distribution. The terms
! 13: # are also available at https://curl.haxx.se/docs/copyright.html.
! 14: #
! 15: # You may opt to use, copy, modify, merge, publish, distribute and/or sell
! 16: # copies of the Software, and permit persons to whom the Software is
! 17: # furnished to do so, under the terms of the COPYING file.
! 18: #
! 19: # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
! 20: # KIND, either express or implied.
! 21: #
! 22: """Server for testing SMB"""
! 23:
! 24: from __future__ import (absolute_import, division, print_function)
! 25: # NOTE: the impacket configuration is not unicode_literals compatible!
! 26: import argparse
! 27: import os
! 28: import sys
! 29: import logging
! 30: import tempfile
! 31: if sys.version_info.major >= 3:
! 32: import configparser
! 33: else:
! 34: import ConfigParser as configparser
! 35:
! 36: # Import our curl test data helper
! 37: import curl_test_data
! 38:
! 39: # impacket needs to be installed in the Python environment
! 40: try:
! 41: import impacket
! 42: except ImportError:
! 43: sys.stderr.write('Python package impacket needs to be installed!\n')
! 44: sys.stderr.write('Use pip or your package manager to install it.\n')
! 45: sys.exit(1)
! 46: from impacket import smbserver as imp_smbserver
! 47: from impacket import smb as imp_smb
! 48: from impacket.nt_errors import (STATUS_ACCESS_DENIED, STATUS_SUCCESS,
! 49: STATUS_NO_SUCH_FILE)
! 50:
# Module-level logger used by every function and class in this script.
log = logging.getLogger(__name__)
# Placeholder "path" configured for the SERVER share; create_and_x treats a
# share path equal to this value as a request for a generated server file.
SERVER_MAGIC = "SERVER_MAGIC"
# Placeholder "path" configured for the TESTS share; files requested from it
# are generated on the fly from the curl test data.
TESTS_MAGIC = "TESTS_MAGIC"
# File name a client requests on the SERVER share to verify the server is up.
VERIFIED_REQ = "verifiedserver"
# Contents served for VERIFIED_REQ; {pid} is filled in with the server's pid.
VERIFIED_RSP = "WE ROOLZ: {pid}\n"
! 56:
! 57:
def smbserver(options):
    """Configure the SMB test server and run it until it stops serving.

    Writes the pid file first (when ``options.pidfile`` is set), builds the
    in-memory impacket configuration with the "global", "SERVER" and "TESTS"
    sections, then starts a TestSmbServer on ``options.host`` /
    ``options.port``.

    Raises ScriptException when ``options.srcdir`` is missing or is not a
    directory. Returns 0 if ``serve_forever()`` ever returns.
    """
    if options.pidfile:
        # see tests/server/util.c function write_pidfile
        pid = os.getpid() + (65536 if os.name == "nt" else 0)
        with open(options.pidfile, "w") as pid_file:
            pid_file.write(str(pid))

    # Mini config for the server, expressed as (section, settings) pairs.
    # "SERVER" lets a client check that the server is running; "TESTS" serves
    # files which are autogenerated from the test input.
    config_layout = (
        ("global", (
            ("server_name", "SERVICE"),
            ("server_os", "UNIX"),
            ("server_domain", "WORKGROUP"),
            ("log_file", ""),
            ("credentials_file", ""),
        )),
        ("SERVER", (
            ("comment", "server function"),
            ("read only", "yes"),
            ("share type", "0"),
            ("path", SERVER_MAGIC),
        )),
        ("TESTS", (
            ("comment", "tests"),
            ("read only", "yes"),
            ("share type", "0"),
            ("path", TESTS_MAGIC),
        )),
    )

    smb_config = configparser.ConfigParser()
    for section, settings in config_layout:
        smb_config.add_section(section)
        for key, value in settings:
            smb_config.set(section, key, value)

    srcdir = options.srcdir
    if not (srcdir and os.path.isdir(srcdir)):
        raise ScriptException("--srcdir is mandatory")

    listen_address = (options.host, options.port)
    smb_server = TestSmbServer(listen_address,
                               config_parser=smb_config,
                               test_data_directory=os.path.join(srcdir,
                                                                "data"))
    log.info("[SMB] setting up SMB server on port %s", options.port)
    smb_server.processConfigFile()
    smb_server.serve_forever()
    return 0
! 106:
! 107:
class TestSmbServer(imp_smbserver.SMBSERVER):
    """
    Test server for SMB which subclasses the impacket SMBSERVER and provides
    test functionality.

    Files requested from the magic SERVER and TESTS shares do not exist on
    disk; they are generated into temporary files when a client opens them.
    """

    def __init__(self,
                 address,
                 config_parser=None,
                 test_data_directory=None):
        """
        Create the server.

        address is the (host, port) pair passed through to SMBSERVER,
        config_parser the configuration object passed through to SMBSERVER,
        and test_data_directory the directory handed to
        curl_test_data.TestData.
        """
        imp_smbserver.SMBSERVER.__init__(self,
                                         address,
                                         config_parser=config_parser)

        # Set up a test data object so we can get test data later.
        self.ctd = curl_test_data.TestData(test_data_directory)

        # Override smbComNtCreateAndX so we can pretend to have files which
        # don't exist.
        self.hookSmbCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX,
                            self.create_and_x)

    def create_and_x(self, conn_id, smb_server, smb_command, recv_packet):
        """
        Our version of smbComNtCreateAndX looks for special test files and
        fools the rest of the framework into opening them as if they were
        normal files.

        Requests for shares whose path is not one of the magic values are
        passed on to impacket's original handler. Returns the
        ([response commands], None, error code) triple for the request.
        """
        conn_data = smb_server.getConnectionData(conn_id)

        # Track the generated temp file so it can be discarded if the request
        # fails after the file has been created.
        fid = None
        full_path = None

        # Wrap processing in a try block which allows us to throw SmbException
        # to control the flow.
        try:
            ncax_parms = imp_smb.SMBNtCreateAndX_Parameters(
                smb_command["Parameters"])

            path = self.get_share_path(conn_data,
                                       ncax_parms["RootFid"],
                                       recv_packet["Tid"])
            log.info("[SMB] Requested share path: %s", path)

            disposition = ncax_parms["Disposition"]
            log.debug("[SMB] Requested disposition: %s", disposition)

            # Currently we only support reading files.
            if disposition != imp_smb.FILE_OPEN:
                raise SmbException(STATUS_ACCESS_DENIED,
                                   "Only support reading files")

            # Check to see if the path we were given is actually a
            # magic path which needs generating on the fly.
            if path not in [SERVER_MAGIC, TESTS_MAGIC]:
                # Pass the command onto the original handler.
                return imp_smbserver.SMBCommands.smbComNtCreateAndX(
                    conn_id, smb_server, smb_command, recv_packet)

            flags2 = recv_packet["Flags2"]
            ncax_data = imp_smb.SMBNtCreateAndX_Data(
                flags=flags2, data=smb_command["Data"])
            requested_file = imp_smbserver.decodeSMBString(
                flags2, ncax_data["FileName"])
            log.debug("[SMB] User requested file '%s'", requested_file)

            if path == SERVER_MAGIC:
                fid, full_path = self.get_server_path(requested_file)
            else:
                # The membership check above leaves TESTS_MAGIC as the only
                # other possibility.
                fid, full_path = self.get_test_path(requested_file)

            resp_parms = imp_smb.SMBNtCreateAndXResponse_Parameters()
            resp_data = ""

            # Simple way to generate a fid: one above the highest fid in use.
            # max() works with Python 3 dict views (which cannot be indexed)
            # and does not depend on dict ordering.
            opened_files = conn_data["OpenedFiles"]
            fakefid = max(opened_files) + 1 if opened_files else 1
            resp_parms["Fid"] = fakefid
            resp_parms["CreateAction"] = disposition

            # NOTE(review): path is the magic share value at this point, so
            # this is only true if a directory of that name exists in the
            # current working directory.
            if os.path.isdir(path):
                resp_parms[
                    "FileAttributes"] = imp_smb.SMB_FILE_ATTRIBUTE_DIRECTORY
                resp_parms["IsDirectory"] = 1
            else:
                resp_parms["IsDirectory"] = 0
                resp_parms["FileAttributes"] = ncax_parms["FileAttributes"]

            # Get this file's information
            resp_info, error_code = imp_smbserver.queryPathInformation(
                "", full_path, level=imp_smb.SMB_QUERY_FILE_ALL_INFO)

            if error_code != STATUS_SUCCESS:
                raise SmbException(error_code, "Failed to query path info")

            resp_parms["CreateTime"] = resp_info["CreationTime"]
            resp_parms["LastAccessTime"] = resp_info["LastAccessTime"]
            resp_parms["LastWriteTime"] = resp_info["LastWriteTime"]
            resp_parms["LastChangeTime"] = resp_info["LastChangeTime"]
            resp_parms["FileAttributes"] = resp_info["ExtFileAttributes"]
            resp_parms["AllocationSize"] = resp_info["AllocationSize"]
            resp_parms["EndOfFile"] = resp_info["EndOfFile"]

            # Let's store the fid for the connection
            conn_data["OpenedFiles"][fakefid] = {
                "FileHandle": fid,
                "FileName": path,
                "DeleteOnClose": False,
            }

        except SmbException as s:
            log.debug("[SMB] SmbException hit: %s", s)
            error_code = s.error_code
            resp_parms = ""
            resp_data = ""
            # The temp file was never handed over to the connection, so
            # nothing else would ever close or delete it.
            if fid is not None:
                self._discard_temp_file(fid, full_path)

        resp_cmd = imp_smb.SMBCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX)
        resp_cmd["Parameters"] = resp_parms
        resp_cmd["Data"] = resp_data
        smb_server.setConnectionData(conn_id, conn_data)

        return [resp_cmd], None, error_code

    def get_share_path(self, conn_data, root_fid, tid):
        """
        Return the path the request is relative to.

        This is the file name recorded for root_fid when root_fid is
        positive, otherwise the "path" of the connected share identified by
        tid. Raises SmbException if tid is not a connected share or the
        share has no path.
        """
        conn_shares = conn_data["ConnectedShares"]

        if tid not in conn_shares:
            raise SmbException(imp_smbserver.STATUS_SMB_BAD_TID,
                               "TID was invalid")

        if root_fid > 0:
            # If we have a rootFid, the path is relative to that fid
            path = conn_data["OpenedFiles"][root_fid]["FileName"]
            log.debug("RootFid present %s!", path)
            return path

        if "path" not in conn_shares[tid]:
            raise SmbException(STATUS_ACCESS_DENIED,
                               "Connection share had no path")

        return conn_shares[tid]["path"]

    def get_server_path(self, requested_filename):
        """
        Generate the temp file backing a file on the SERVER share.

        Only VERIFIED_REQ is known; any other name raises SmbException with
        STATUS_NO_SUCH_FILE. Returns (file descriptor, temp file name) with
        the descriptor positioned at the start of the file.
        """
        log.debug("[SMB] Get server path '%s'", requested_filename)

        if requested_filename not in [VERIFIED_REQ]:
            raise SmbException(STATUS_NO_SUCH_FILE, "Couldn't find the file")

        fid, filename = tempfile.mkstemp()
        log.debug("[SMB] Created %s (%d) for storing '%s'",
                  filename, fid, requested_filename)

        # os.write needs bytes, so keep the default a bytes value too.
        contents = b""

        if requested_filename == VERIFIED_REQ:
            log.debug("[SMB] Verifying server is alive")
            pid = os.getpid()
            # see tests/server/util.c function write_pidfile
            if os.name == "nt":
                pid += 65536
            contents = VERIFIED_RSP.format(pid=pid).encode('utf-8')

        self.write_to_fid(fid, contents)
        return fid, filename

    def write_to_fid(self, fid, contents):
        """
        Write contents (bytes) to the file descriptor fid, flush it to disk
        and seek back to the start.
        """
        # Write the contents to file descriptor
        os.write(fid, contents)
        os.fsync(fid)

        # Rewind the file to the beginning so a read gets us the contents
        os.lseek(fid, 0, os.SEEK_SET)

    def get_test_path(self, requested_filename):
        """
        Generate the temp file backing a file on the TESTS share.

        The contents come from the test data object for requested_filename.
        Returns (file descriptor, temp file name). Any failure is logged and
        reported as SmbException with STATUS_NO_SUCH_FILE; the temp file is
        discarded in that case.
        """
        log.info("[SMB] Get reply data from 'test%s'", requested_filename)

        fid, filename = tempfile.mkstemp()
        log.debug("[SMB] Created %s (%d) for storing test '%s'",
                  filename, fid, requested_filename)

        try:
            contents = self.ctd.get_test_data(
                requested_filename).encode('utf-8')
            self.write_to_fid(fid, contents)
            return fid, filename

        except Exception:
            log.exception("Failed to make test file")
            # Don't leak the descriptor and temp file of a failed request.
            self._discard_temp_file(fid, filename)
            raise SmbException(STATUS_NO_SUCH_FILE, "Failed to make test file")

    @staticmethod
    def _discard_temp_file(fid, filename):
        """Best-effort close and removal of a temp file made by mkstemp."""
        try:
            os.close(fid)
        except OSError:
            log.debug("[SMB] Could not close fd %s", fid)
        try:
            os.remove(filename)
        except OSError:
            log.debug("[SMB] Could not remove %s", filename)
! 305:
! 306:
class SmbException(Exception):
    """Error raised while handling an SMB request.

    The message is passed to Exception; the SMB status code to report to the
    client is kept on the ``error_code`` attribute.
    """

    def __init__(self, error_code, error_message):
        Exception.__init__(self, error_message)
        self.error_code = error_code
! 311:
! 312:
class ScriptRC(object):
    """Exit codes returned by this script (used as a simple enum)."""
    # Normal exit, generic failure, unhandled exception -- in that order.
    SUCCESS, FAILURE, EXCEPTION = range(3)
! 318:
! 319:
class ScriptException(Exception):
    """Raised when the script cannot run, e.g. on invalid options."""
! 322:
! 323:
def get_options():
    """Parse the command line (sys.argv) and return the argparse namespace."""
    # One (flag, add_argument keyword arguments) entry per supported option,
    # registered in this order.
    argument_table = (
        ("--port", dict(action="store", default=9017,
                        type=int, help="port to listen on")),
        ("--host", dict(action="store", default="127.0.0.1",
                        help="host to listen on")),
        ("--verbose", dict(action="store", type=int, default=0,
                           help="verbose output")),
        ("--pidfile", dict(action="store",
                           help="file name for the PID")),
        ("--logfile", dict(action="store",
                           help="file name for the log")),
        ("--srcdir", dict(action="store", help="test directory")),
        ("--id", dict(action="store", help="server ID")),
        ("--ipv4", dict(action="store_true", default=0,
                        help="IPv4 flag")),
    )

    parser = argparse.ArgumentParser()
    for flag, settings in argument_table:
        parser.add_argument(flag, **settings)

    return parser.parse_args()
! 343:
! 344:
def setup_logging(options):
    """
    Configure the root logger from the command line options.

    A log file handler is added when ``options.logfile`` is set. A stdout
    handler is added when no log file was given or when ``options.verbose``
    is set. The root level is DEBUG in verbose mode and INFO otherwise.
    """
    root = logging.getLogger()
    line_format = logging.Formatter(
        "%(asctime)s %(levelname)-5.5s %(message)s")

    # Write out to a logfile
    if options.logfile:
        file_handler = logging.FileHandler(options.logfile, mode="w")
        file_handler.setFormatter(line_format)
        file_handler.setLevel(logging.DEBUG)
        root.addHandler(file_handler)

    root.setLevel(logging.DEBUG if options.verbose else logging.INFO)

    # stdout is used when it is the only destination, and additionally
    # alongside the log file in verbose mode.
    if options.verbose or not options.logfile:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(line_format)
        console_handler.setLevel(logging.DEBUG)
        root.addHandler(console_handler)
! 376:
! 377:
if __name__ == '__main__':
    # Read the command line, then configure logging from it before anything
    # else gets a chance to log.
    options = get_options()
    setup_logging(options)

    # Run the server; any escaping exception is logged and mapped to the
    # EXCEPTION return code instead of a traceback exit.
    try:
        rc = smbserver(options)
    except Exception as err:
        log.exception(err)
        rc = ScriptRC.EXCEPTION

    log.info("[SMB] Returning %d", rc)
    sys.exit(rc)
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>