File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / curl / tests / smbserver.py
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Wed Jun 3 10:01:16 2020 UTC (5 years ago) by misho
Branches: curl, MAIN
CVS tags: v7_70_0p4, HEAD
curl

    1: #!/usr/bin/env python
    2: # -*- coding: utf-8 -*-
    3: #
    4: #  Project                     ___| | | |  _ \| |
    5: #                             / __| | | | |_) | |
    6: #                            | (__| |_| |  _ <| |___
    7: #                             \___|\___/|_| \_\_____|
    8: #
    9: # Copyright (C) 2017 - 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
   10: #
   11: # This software is licensed as described in the file COPYING, which
   12: # you should have received as part of this distribution. The terms
   13: # are also available at https://curl.haxx.se/docs/copyright.html.
   14: #
   15: # You may opt to use, copy, modify, merge, publish, distribute and/or sell
   16: # copies of the Software, and permit persons to whom the Software is
   17: # furnished to do so, under the terms of the COPYING file.
   18: #
   19: # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
   20: # KIND, either express or implied.
   21: #
   22: """Server for testing SMB"""
   23: 
   24: from __future__ import (absolute_import, division, print_function)
   25: # NOTE: the impacket configuration is not unicode_literals compatible!
   26: import argparse
   27: import os
   28: import sys
   29: import logging
   30: import tempfile
   31: if sys.version_info.major >= 3:
   32:     import configparser
   33: else:
   34:     import ConfigParser as configparser
   35: 
   36: # Import our curl test data helper
   37: import curl_test_data
   38: 
   39: # impacket needs to be installed in the Python environment
   40: try:
   41:     import impacket
   42: except ImportError:
   43:     sys.stderr.write('Python package impacket needs to be installed!\n')
   44:     sys.stderr.write('Use pip or your package manager to install it.\n')
   45:     sys.exit(1)
   46: from impacket import smbserver as imp_smbserver
   47: from impacket import smb as imp_smb
   48: from impacket.nt_errors import (STATUS_ACCESS_DENIED, STATUS_SUCCESS,
   49:                                 STATUS_NO_SUCH_FILE)
   50: 
   51: log = logging.getLogger(__name__)
   52: SERVER_MAGIC = "SERVER_MAGIC"
   53: TESTS_MAGIC = "TESTS_MAGIC"
   54: VERIFIED_REQ = "verifiedserver"
   55: VERIFIED_RSP = "WE ROOLZ: {pid}\n"
   56: 
   57: 
   58: def smbserver(options):
   59:     """Start up a TCP SMB server that serves forever
   60: 
   61:     """
   62:     if options.pidfile:
   63:         pid = os.getpid()
   64:         # see tests/server/util.c function write_pidfile
   65:         if os.name == "nt":
   66:             pid += 65536
   67:         with open(options.pidfile, "w") as f:
   68:             f.write(str(pid))
   69: 
   70:     # Here we write a mini config for the server
   71:     smb_config = configparser.ConfigParser()
   72:     smb_config.add_section("global")
   73:     smb_config.set("global", "server_name", "SERVICE")
   74:     smb_config.set("global", "server_os", "UNIX")
   75:     smb_config.set("global", "server_domain", "WORKGROUP")
   76:     smb_config.set("global", "log_file", "")
   77:     smb_config.set("global", "credentials_file", "")
   78: 
   79:     # We need a share which allows us to test that the server is running
   80:     smb_config.add_section("SERVER")
   81:     smb_config.set("SERVER", "comment", "server function")
   82:     smb_config.set("SERVER", "read only", "yes")
   83:     smb_config.set("SERVER", "share type", "0")
   84:     smb_config.set("SERVER", "path", SERVER_MAGIC)
   85: 
   86:     # Have a share for tests.  These files will be autogenerated from the
   87:     # test input.
   88:     smb_config.add_section("TESTS")
   89:     smb_config.set("TESTS", "comment", "tests")
   90:     smb_config.set("TESTS", "read only", "yes")
   91:     smb_config.set("TESTS", "share type", "0")
   92:     smb_config.set("TESTS", "path", TESTS_MAGIC)
   93: 
   94:     if not options.srcdir or not os.path.isdir(options.srcdir):
   95:         raise ScriptException("--srcdir is mandatory")
   96: 
   97:     test_data_dir = os.path.join(options.srcdir, "data")
   98: 
   99:     smb_server = TestSmbServer((options.host, options.port),
  100:                                config_parser=smb_config,
  101:                                test_data_directory=test_data_dir)
  102:     log.info("[SMB] setting up SMB server on port %s", options.port)
  103:     smb_server.processConfigFile()
  104:     smb_server.serve_forever()
  105:     return 0
  106: 
  107: 
  108: class TestSmbServer(imp_smbserver.SMBSERVER):
  109:     """
  110:     Test server for SMB which subclasses the impacket SMBSERVER and provides
  111:     test functionality.
  112:     """
  113: 
  114:     def __init__(self,
  115:                  address,
  116:                  config_parser=None,
  117:                  test_data_directory=None):
  118:         imp_smbserver.SMBSERVER.__init__(self,
  119:                                          address,
  120:                                          config_parser=config_parser)
  121: 
  122:         # Set up a test data object so we can get test data later.
  123:         self.ctd = curl_test_data.TestData(test_data_directory)
  124: 
  125:         # Override smbComNtCreateAndX so we can pretend to have files which
  126:         # don't exist.
  127:         self.hookSmbCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX,
  128:                             self.create_and_x)
  129: 
  130:     def create_and_x(self, conn_id, smb_server, smb_command, recv_packet):
  131:         """
  132:         Our version of smbComNtCreateAndX looks for special test files and
  133:         fools the rest of the framework into opening them as if they were
  134:         normal files.
  135:         """
  136:         conn_data = smb_server.getConnectionData(conn_id)
  137: 
  138:         # Wrap processing in a try block which allows us to throw SmbException
  139:         # to control the flow.
  140:         try:
  141:             ncax_parms = imp_smb.SMBNtCreateAndX_Parameters(
  142:                 smb_command["Parameters"])
  143: 
  144:             path = self.get_share_path(conn_data,
  145:                                        ncax_parms["RootFid"],
  146:                                        recv_packet["Tid"])
  147:             log.info("[SMB] Requested share path: %s", path)
  148: 
  149:             disposition = ncax_parms["Disposition"]
  150:             log.debug("[SMB] Requested disposition: %s", disposition)
  151: 
  152:             # Currently we only support reading files.
  153:             if disposition != imp_smb.FILE_OPEN:
  154:                 raise SmbException(STATUS_ACCESS_DENIED,
  155:                                    "Only support reading files")
  156: 
  157:             # Check to see if the path we were given is actually a
  158:             # magic path which needs generating on the fly.
  159:             if path not in [SERVER_MAGIC, TESTS_MAGIC]:
  160:                 # Pass the command onto the original handler.
  161:                 return imp_smbserver.SMBCommands.smbComNtCreateAndX(conn_id,
  162:                                                                     smb_server,
  163:                                                                     smb_command,
  164:                                                                     recv_packet)
  165: 
  166:             flags2 = recv_packet["Flags2"]
  167:             ncax_data = imp_smb.SMBNtCreateAndX_Data(flags=flags2,
  168:                                                      data=smb_command[
  169:                                                          "Data"])
  170:             requested_file = imp_smbserver.decodeSMBString(
  171:                 flags2,
  172:                 ncax_data["FileName"])
  173:             log.debug("[SMB] User requested file '%s'", requested_file)
  174: 
  175:             if path == SERVER_MAGIC:
  176:                 fid, full_path = self.get_server_path(requested_file)
  177:             else:
  178:                 assert (path == TESTS_MAGIC)
  179:                 fid, full_path = self.get_test_path(requested_file)
  180: 
  181:             resp_parms = imp_smb.SMBNtCreateAndXResponse_Parameters()
  182:             resp_data = ""
  183: 
  184:             # Simple way to generate a fid
  185:             if len(conn_data["OpenedFiles"]) == 0:
  186:                 fakefid = 1
  187:             else:
  188:                 fakefid = conn_data["OpenedFiles"].keys()[-1] + 1
  189:             resp_parms["Fid"] = fakefid
  190:             resp_parms["CreateAction"] = disposition
  191: 
  192:             if os.path.isdir(path):
  193:                 resp_parms[
  194:                     "FileAttributes"] = imp_smb.SMB_FILE_ATTRIBUTE_DIRECTORY
  195:                 resp_parms["IsDirectory"] = 1
  196:             else:
  197:                 resp_parms["IsDirectory"] = 0
  198:                 resp_parms["FileAttributes"] = ncax_parms["FileAttributes"]
  199: 
  200:             # Get this file's information
  201:             resp_info, error_code = imp_smbserver.queryPathInformation(
  202:                 "", full_path, level=imp_smb.SMB_QUERY_FILE_ALL_INFO)
  203: 
  204:             if error_code != STATUS_SUCCESS:
  205:                 raise SmbException(error_code, "Failed to query path info")
  206: 
  207:             resp_parms["CreateTime"] = resp_info["CreationTime"]
  208:             resp_parms["LastAccessTime"] = resp_info[
  209:                 "LastAccessTime"]
  210:             resp_parms["LastWriteTime"] = resp_info["LastWriteTime"]
  211:             resp_parms["LastChangeTime"] = resp_info[
  212:                 "LastChangeTime"]
  213:             resp_parms["FileAttributes"] = resp_info[
  214:                 "ExtFileAttributes"]
  215:             resp_parms["AllocationSize"] = resp_info[
  216:                 "AllocationSize"]
  217:             resp_parms["EndOfFile"] = resp_info["EndOfFile"]
  218: 
  219:             # Let's store the fid for the connection
  220:             # smbServer.log("Create file %s, mode:0x%x" % (pathName, mode))
  221:             conn_data["OpenedFiles"][fakefid] = {}
  222:             conn_data["OpenedFiles"][fakefid]["FileHandle"] = fid
  223:             conn_data["OpenedFiles"][fakefid]["FileName"] = path
  224:             conn_data["OpenedFiles"][fakefid]["DeleteOnClose"] = False
  225: 
  226:         except SmbException as s:
  227:             log.debug("[SMB] SmbException hit: %s", s)
  228:             error_code = s.error_code
  229:             resp_parms = ""
  230:             resp_data = ""
  231: 
  232:         resp_cmd = imp_smb.SMBCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX)
  233:         resp_cmd["Parameters"] = resp_parms
  234:         resp_cmd["Data"] = resp_data
  235:         smb_server.setConnectionData(conn_id, conn_data)
  236: 
  237:         return [resp_cmd], None, error_code
  238: 
  239:     def get_share_path(self, conn_data, root_fid, tid):
  240:         conn_shares = conn_data["ConnectedShares"]
  241: 
  242:         if tid in conn_shares:
  243:             if root_fid > 0:
  244:                 # If we have a rootFid, the path is relative to that fid
  245:                 path = conn_data["OpenedFiles"][root_fid]["FileName"]
  246:                 log.debug("RootFid present %s!" % path)
  247:             else:
  248:                 if "path" in conn_shares[tid]:
  249:                     path = conn_shares[tid]["path"]
  250:                 else:
  251:                     raise SmbException(STATUS_ACCESS_DENIED,
  252:                                        "Connection share had no path")
  253:         else:
  254:             raise SmbException(imp_smbserver.STATUS_SMB_BAD_TID,
  255:                                "TID was invalid")
  256: 
  257:         return path
  258: 
  259:     def get_server_path(self, requested_filename):
  260:         log.debug("[SMB] Get server path '%s'", requested_filename)
  261: 
  262:         if requested_filename not in [VERIFIED_REQ]:
  263:             raise SmbException(STATUS_NO_SUCH_FILE, "Couldn't find the file")
  264: 
  265:         fid, filename = tempfile.mkstemp()
  266:         log.debug("[SMB] Created %s (%d) for storing '%s'",
  267:                   filename, fid, requested_filename)
  268: 
  269:         contents = ""
  270: 
  271:         if requested_filename == VERIFIED_REQ:
  272:             log.debug("[SMB] Verifying server is alive")
  273:             pid = os.getpid()
  274:             # see tests/server/util.c function write_pidfile
  275:             if os.name == "nt":
  276:                 pid += 65536
  277:             contents = VERIFIED_RSP.format(pid=pid).encode('utf-8')
  278: 
  279:         self.write_to_fid(fid, contents)
  280:         return fid, filename
  281: 
  282:     def write_to_fid(self, fid, contents):
  283:         # Write the contents to file descriptor
  284:         os.write(fid, contents)
  285:         os.fsync(fid)
  286: 
  287:         # Rewind the file to the beginning so a read gets us the contents
  288:         os.lseek(fid, 0, os.SEEK_SET)
  289: 
  290:     def get_test_path(self, requested_filename):
  291:         log.info("[SMB] Get reply data from 'test%s'", requested_filename)
  292: 
  293:         fid, filename = tempfile.mkstemp()
  294:         log.debug("[SMB] Created %s (%d) for storing test '%s'",
  295:                   filename, fid, requested_filename)
  296: 
  297:         try:
  298:             contents = self.ctd.get_test_data(requested_filename).encode('utf-8')
  299:             self.write_to_fid(fid, contents)
  300:             return fid, filename
  301: 
  302:         except Exception:
  303:             log.exception("Failed to make test file")
  304:             raise SmbException(STATUS_NO_SUCH_FILE, "Failed to make test file")
  305: 
  306: 
  307: class SmbException(Exception):
  308:     def __init__(self, error_code, error_message):
  309:         super(SmbException, self).__init__(error_message)
  310:         self.error_code = error_code
  311: 
  312: 
  313: class ScriptRC(object):
  314:     """Enum for script return codes"""
  315:     SUCCESS = 0
  316:     FAILURE = 1
  317:     EXCEPTION = 2
  318: 
  319: 
  320: class ScriptException(Exception):
  321:     pass
  322: 
  323: 
  324: def get_options():
  325:     parser = argparse.ArgumentParser()
  326: 
  327:     parser.add_argument("--port", action="store", default=9017,
  328:                       type=int, help="port to listen on")
  329:     parser.add_argument("--host", action="store", default="127.0.0.1",
  330:                       help="host to listen on")
  331:     parser.add_argument("--verbose", action="store", type=int, default=0,
  332:                         help="verbose output")
  333:     parser.add_argument("--pidfile", action="store",
  334:                         help="file name for the PID")
  335:     parser.add_argument("--logfile", action="store",
  336:                         help="file name for the log")
  337:     parser.add_argument("--srcdir", action="store", help="test directory")
  338:     parser.add_argument("--id", action="store", help="server ID")
  339:     parser.add_argument("--ipv4", action="store_true", default=0,
  340:                         help="IPv4 flag")
  341: 
  342:     return parser.parse_args()
  343: 
  344: 
  345: def setup_logging(options):
  346:     """
  347:     Set up logging from the command line options
  348:     """
  349:     root_logger = logging.getLogger()
  350:     add_stdout = False
  351: 
  352:     formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s %(message)s")
  353: 
  354:     # Write out to a logfile
  355:     if options.logfile:
  356:         handler = logging.FileHandler(options.logfile, mode="w")
  357:         handler.setFormatter(formatter)
  358:         handler.setLevel(logging.DEBUG)
  359:         root_logger.addHandler(handler)
  360:     else:
  361:         # The logfile wasn't specified. Add a stdout logger.
  362:         add_stdout = True
  363: 
  364:     if options.verbose:
  365:         # Add a stdout logger as well in verbose mode
  366:         root_logger.setLevel(logging.DEBUG)
  367:         add_stdout = True
  368:     else:
  369:         root_logger.setLevel(logging.INFO)
  370: 
  371:     if add_stdout:
  372:         stdout_handler = logging.StreamHandler(sys.stdout)
  373:         stdout_handler.setFormatter(formatter)
  374:         stdout_handler.setLevel(logging.DEBUG)
  375:         root_logger.addHandler(stdout_handler)
  376: 
  377: 
  378: if __name__ == '__main__':
  379:     # Get the options from the user.
  380:     options = get_options()
  381: 
  382:     # Setup logging using the user options
  383:     setup_logging(options)
  384: 
  385:     # Run main script.
  386:     try:
  387:         rc = smbserver(options)
  388:     except Exception as e:
  389:         log.exception(e)
  390:         rc = ScriptRC.EXCEPTION
  391: 
  392:     log.info("[SMB] Returning %d", rc)
  393:     sys.exit(rc)

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>