Annotation of embedaddon/curl/tests/smbserver.py, revision 1.1.1.1
1.1 misho 1: #!/usr/bin/env python
2: # -*- coding: utf-8 -*-
3: #
4: # Project ___| | | | _ \| |
5: # / __| | | | |_) | |
6: # | (__| |_| | _ <| |___
7: # \___|\___/|_| \_\_____|
8: #
9: # Copyright (C) 2017 - 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
10: #
11: # This software is licensed as described in the file COPYING, which
12: # you should have received as part of this distribution. The terms
13: # are also available at https://curl.haxx.se/docs/copyright.html.
14: #
15: # You may opt to use, copy, modify, merge, publish, distribute and/or sell
16: # copies of the Software, and permit persons to whom the Software is
17: # furnished to do so, under the terms of the COPYING file.
18: #
19: # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20: # KIND, either express or implied.
21: #
22: """Server for testing SMB"""
23:
24: from __future__ import (absolute_import, division, print_function)
25: # NOTE: the impacket configuration is not unicode_literals compatible!
26: import argparse
27: import os
28: import sys
29: import logging
30: import tempfile
31: if sys.version_info.major >= 3:
32: import configparser
33: else:
34: import ConfigParser as configparser
35:
36: # Import our curl test data helper
37: import curl_test_data
38:
39: # impacket needs to be installed in the Python environment
40: try:
41: import impacket
42: except ImportError:
43: sys.stderr.write('Python package impacket needs to be installed!\n')
44: sys.stderr.write('Use pip or your package manager to install it.\n')
45: sys.exit(1)
46: from impacket import smbserver as imp_smbserver
47: from impacket import smb as imp_smb
48: from impacket.nt_errors import (STATUS_ACCESS_DENIED, STATUS_SUCCESS,
49: STATUS_NO_SUCH_FILE)
50:
51: log = logging.getLogger(__name__)
52: SERVER_MAGIC = "SERVER_MAGIC"
53: TESTS_MAGIC = "TESTS_MAGIC"
54: VERIFIED_REQ = "verifiedserver"
55: VERIFIED_RSP = "WE ROOLZ: {pid}\n"
56:
57:
58: def smbserver(options):
59: """Start up a TCP SMB server that serves forever
60:
61: """
62: if options.pidfile:
63: pid = os.getpid()
64: # see tests/server/util.c function write_pidfile
65: if os.name == "nt":
66: pid += 65536
67: with open(options.pidfile, "w") as f:
68: f.write(str(pid))
69:
70: # Here we write a mini config for the server
71: smb_config = configparser.ConfigParser()
72: smb_config.add_section("global")
73: smb_config.set("global", "server_name", "SERVICE")
74: smb_config.set("global", "server_os", "UNIX")
75: smb_config.set("global", "server_domain", "WORKGROUP")
76: smb_config.set("global", "log_file", "")
77: smb_config.set("global", "credentials_file", "")
78:
79: # We need a share which allows us to test that the server is running
80: smb_config.add_section("SERVER")
81: smb_config.set("SERVER", "comment", "server function")
82: smb_config.set("SERVER", "read only", "yes")
83: smb_config.set("SERVER", "share type", "0")
84: smb_config.set("SERVER", "path", SERVER_MAGIC)
85:
86: # Have a share for tests. These files will be autogenerated from the
87: # test input.
88: smb_config.add_section("TESTS")
89: smb_config.set("TESTS", "comment", "tests")
90: smb_config.set("TESTS", "read only", "yes")
91: smb_config.set("TESTS", "share type", "0")
92: smb_config.set("TESTS", "path", TESTS_MAGIC)
93:
94: if not options.srcdir or not os.path.isdir(options.srcdir):
95: raise ScriptException("--srcdir is mandatory")
96:
97: test_data_dir = os.path.join(options.srcdir, "data")
98:
99: smb_server = TestSmbServer((options.host, options.port),
100: config_parser=smb_config,
101: test_data_directory=test_data_dir)
102: log.info("[SMB] setting up SMB server on port %s", options.port)
103: smb_server.processConfigFile()
104: smb_server.serve_forever()
105: return 0
106:
107:
108: class TestSmbServer(imp_smbserver.SMBSERVER):
109: """
110: Test server for SMB which subclasses the impacket SMBSERVER and provides
111: test functionality.
112: """
113:
114: def __init__(self,
115: address,
116: config_parser=None,
117: test_data_directory=None):
118: imp_smbserver.SMBSERVER.__init__(self,
119: address,
120: config_parser=config_parser)
121:
122: # Set up a test data object so we can get test data later.
123: self.ctd = curl_test_data.TestData(test_data_directory)
124:
125: # Override smbComNtCreateAndX so we can pretend to have files which
126: # don't exist.
127: self.hookSmbCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX,
128: self.create_and_x)
129:
130: def create_and_x(self, conn_id, smb_server, smb_command, recv_packet):
131: """
132: Our version of smbComNtCreateAndX looks for special test files and
133: fools the rest of the framework into opening them as if they were
134: normal files.
135: """
136: conn_data = smb_server.getConnectionData(conn_id)
137:
138: # Wrap processing in a try block which allows us to throw SmbException
139: # to control the flow.
140: try:
141: ncax_parms = imp_smb.SMBNtCreateAndX_Parameters(
142: smb_command["Parameters"])
143:
144: path = self.get_share_path(conn_data,
145: ncax_parms["RootFid"],
146: recv_packet["Tid"])
147: log.info("[SMB] Requested share path: %s", path)
148:
149: disposition = ncax_parms["Disposition"]
150: log.debug("[SMB] Requested disposition: %s", disposition)
151:
152: # Currently we only support reading files.
153: if disposition != imp_smb.FILE_OPEN:
154: raise SmbException(STATUS_ACCESS_DENIED,
155: "Only support reading files")
156:
157: # Check to see if the path we were given is actually a
158: # magic path which needs generating on the fly.
159: if path not in [SERVER_MAGIC, TESTS_MAGIC]:
160: # Pass the command onto the original handler.
161: return imp_smbserver.SMBCommands.smbComNtCreateAndX(conn_id,
162: smb_server,
163: smb_command,
164: recv_packet)
165:
166: flags2 = recv_packet["Flags2"]
167: ncax_data = imp_smb.SMBNtCreateAndX_Data(flags=flags2,
168: data=smb_command[
169: "Data"])
170: requested_file = imp_smbserver.decodeSMBString(
171: flags2,
172: ncax_data["FileName"])
173: log.debug("[SMB] User requested file '%s'", requested_file)
174:
175: if path == SERVER_MAGIC:
176: fid, full_path = self.get_server_path(requested_file)
177: else:
178: assert (path == TESTS_MAGIC)
179: fid, full_path = self.get_test_path(requested_file)
180:
181: resp_parms = imp_smb.SMBNtCreateAndXResponse_Parameters()
182: resp_data = ""
183:
184: # Simple way to generate a fid
185: if len(conn_data["OpenedFiles"]) == 0:
186: fakefid = 1
187: else:
188: fakefid = conn_data["OpenedFiles"].keys()[-1] + 1
189: resp_parms["Fid"] = fakefid
190: resp_parms["CreateAction"] = disposition
191:
192: if os.path.isdir(path):
193: resp_parms[
194: "FileAttributes"] = imp_smb.SMB_FILE_ATTRIBUTE_DIRECTORY
195: resp_parms["IsDirectory"] = 1
196: else:
197: resp_parms["IsDirectory"] = 0
198: resp_parms["FileAttributes"] = ncax_parms["FileAttributes"]
199:
200: # Get this file's information
201: resp_info, error_code = imp_smbserver.queryPathInformation(
202: "", full_path, level=imp_smb.SMB_QUERY_FILE_ALL_INFO)
203:
204: if error_code != STATUS_SUCCESS:
205: raise SmbException(error_code, "Failed to query path info")
206:
207: resp_parms["CreateTime"] = resp_info["CreationTime"]
208: resp_parms["LastAccessTime"] = resp_info[
209: "LastAccessTime"]
210: resp_parms["LastWriteTime"] = resp_info["LastWriteTime"]
211: resp_parms["LastChangeTime"] = resp_info[
212: "LastChangeTime"]
213: resp_parms["FileAttributes"] = resp_info[
214: "ExtFileAttributes"]
215: resp_parms["AllocationSize"] = resp_info[
216: "AllocationSize"]
217: resp_parms["EndOfFile"] = resp_info["EndOfFile"]
218:
219: # Let's store the fid for the connection
220: # smbServer.log("Create file %s, mode:0x%x" % (pathName, mode))
221: conn_data["OpenedFiles"][fakefid] = {}
222: conn_data["OpenedFiles"][fakefid]["FileHandle"] = fid
223: conn_data["OpenedFiles"][fakefid]["FileName"] = path
224: conn_data["OpenedFiles"][fakefid]["DeleteOnClose"] = False
225:
226: except SmbException as s:
227: log.debug("[SMB] SmbException hit: %s", s)
228: error_code = s.error_code
229: resp_parms = ""
230: resp_data = ""
231:
232: resp_cmd = imp_smb.SMBCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX)
233: resp_cmd["Parameters"] = resp_parms
234: resp_cmd["Data"] = resp_data
235: smb_server.setConnectionData(conn_id, conn_data)
236:
237: return [resp_cmd], None, error_code
238:
239: def get_share_path(self, conn_data, root_fid, tid):
240: conn_shares = conn_data["ConnectedShares"]
241:
242: if tid in conn_shares:
243: if root_fid > 0:
244: # If we have a rootFid, the path is relative to that fid
245: path = conn_data["OpenedFiles"][root_fid]["FileName"]
246: log.debug("RootFid present %s!" % path)
247: else:
248: if "path" in conn_shares[tid]:
249: path = conn_shares[tid]["path"]
250: else:
251: raise SmbException(STATUS_ACCESS_DENIED,
252: "Connection share had no path")
253: else:
254: raise SmbException(imp_smbserver.STATUS_SMB_BAD_TID,
255: "TID was invalid")
256:
257: return path
258:
259: def get_server_path(self, requested_filename):
260: log.debug("[SMB] Get server path '%s'", requested_filename)
261:
262: if requested_filename not in [VERIFIED_REQ]:
263: raise SmbException(STATUS_NO_SUCH_FILE, "Couldn't find the file")
264:
265: fid, filename = tempfile.mkstemp()
266: log.debug("[SMB] Created %s (%d) for storing '%s'",
267: filename, fid, requested_filename)
268:
269: contents = ""
270:
271: if requested_filename == VERIFIED_REQ:
272: log.debug("[SMB] Verifying server is alive")
273: pid = os.getpid()
274: # see tests/server/util.c function write_pidfile
275: if os.name == "nt":
276: pid += 65536
277: contents = VERIFIED_RSP.format(pid=pid).encode('utf-8')
278:
279: self.write_to_fid(fid, contents)
280: return fid, filename
281:
282: def write_to_fid(self, fid, contents):
283: # Write the contents to file descriptor
284: os.write(fid, contents)
285: os.fsync(fid)
286:
287: # Rewind the file to the beginning so a read gets us the contents
288: os.lseek(fid, 0, os.SEEK_SET)
289:
290: def get_test_path(self, requested_filename):
291: log.info("[SMB] Get reply data from 'test%s'", requested_filename)
292:
293: fid, filename = tempfile.mkstemp()
294: log.debug("[SMB] Created %s (%d) for storing test '%s'",
295: filename, fid, requested_filename)
296:
297: try:
298: contents = self.ctd.get_test_data(requested_filename).encode('utf-8')
299: self.write_to_fid(fid, contents)
300: return fid, filename
301:
302: except Exception:
303: log.exception("Failed to make test file")
304: raise SmbException(STATUS_NO_SUCH_FILE, "Failed to make test file")
305:
306:
307: class SmbException(Exception):
308: def __init__(self, error_code, error_message):
309: super(SmbException, self).__init__(error_message)
310: self.error_code = error_code
311:
312:
313: class ScriptRC(object):
314: """Enum for script return codes"""
315: SUCCESS = 0
316: FAILURE = 1
317: EXCEPTION = 2
318:
319:
320: class ScriptException(Exception):
321: pass
322:
323:
324: def get_options():
325: parser = argparse.ArgumentParser()
326:
327: parser.add_argument("--port", action="store", default=9017,
328: type=int, help="port to listen on")
329: parser.add_argument("--host", action="store", default="127.0.0.1",
330: help="host to listen on")
331: parser.add_argument("--verbose", action="store", type=int, default=0,
332: help="verbose output")
333: parser.add_argument("--pidfile", action="store",
334: help="file name for the PID")
335: parser.add_argument("--logfile", action="store",
336: help="file name for the log")
337: parser.add_argument("--srcdir", action="store", help="test directory")
338: parser.add_argument("--id", action="store", help="server ID")
339: parser.add_argument("--ipv4", action="store_true", default=0,
340: help="IPv4 flag")
341:
342: return parser.parse_args()
343:
344:
345: def setup_logging(options):
346: """
347: Set up logging from the command line options
348: """
349: root_logger = logging.getLogger()
350: add_stdout = False
351:
352: formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s %(message)s")
353:
354: # Write out to a logfile
355: if options.logfile:
356: handler = logging.FileHandler(options.logfile, mode="w")
357: handler.setFormatter(formatter)
358: handler.setLevel(logging.DEBUG)
359: root_logger.addHandler(handler)
360: else:
361: # The logfile wasn't specified. Add a stdout logger.
362: add_stdout = True
363:
364: if options.verbose:
365: # Add a stdout logger as well in verbose mode
366: root_logger.setLevel(logging.DEBUG)
367: add_stdout = True
368: else:
369: root_logger.setLevel(logging.INFO)
370:
371: if add_stdout:
372: stdout_handler = logging.StreamHandler(sys.stdout)
373: stdout_handler.setFormatter(formatter)
374: stdout_handler.setLevel(logging.DEBUG)
375: root_logger.addHandler(stdout_handler)
376:
377:
378: if __name__ == '__main__':
379: # Get the options from the user.
380: options = get_options()
381:
382: # Setup logging using the user options
383: setup_logging(options)
384:
385: # Run main script.
386: try:
387: rc = smbserver(options)
388: except Exception as e:
389: log.exception(e)
390: rc = ScriptRC.EXCEPTION
391:
392: log.info("[SMB] Returning %d", rc)
393: sys.exit(rc)
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>