Annotation of embedaddon/mtr/test/probe.py, revision 1.1.1.3
1.1 misho 1: #!/usr/bin/env python
2: #
3: # mtr -- a network diagnostic tool
4: # Copyright (C) 2016 Matt Kimball
5: #
6: # This program is free software; you can redistribute it and/or modify
7: # it under the terms of the GNU General Public License version 2 as
8: # published by the Free Software Foundation.
9: #
10: # This program is distributed in the hope that it will be useful,
11: # but WITHOUT ANY WARRANTY; without even the implied warranty of
12: # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13: # GNU General Public License for more details.
14: #
1.1.1.2 misho 15: # You should have received a copy of the GNU General Public License along
16: # with this program; if not, write to the Free Software Foundation, Inc.,
17: # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
1.1 misho 18: #
19:
1.1.1.3 ! misho 20: '''Test sending probes and receiving responses.'''
1.1 misho 21:
22: import socket
23: import sys
24: import time
25: import unittest
26:
27: import mtrpacket
28:
29:
def resolve_ipv6_address(hostname):  # type: (str) -> str
    '''Return the first IP version 6 address that hostname resolves to.

    The results of socket.getaddrinfo are scanned in order.  LookupError,
    carrying the hostname, is raised when none of them belongs to the
    AF_INET6 family.'''

    for entry in socket.getaddrinfo(hostname, 0):
        # Each entry is (family, socktype, proto, canonname, sockaddr).
        if entry[0] != socket.AF_INET6:
            continue

        # An AF_INET6 sockaddr is (address, port, flowinfo, scope_id).
        (address, _port, _flow, _scope) = entry[4]
        return address

    raise LookupError(hostname)
44:
45:
def check_feature(test, feature):
    '''Ask mtr-packet whether it supports the named feature.

    Sends a check-support command through test, asserts that the answer
    is a feature-support reply carrying a 'support' argument, and returns
    True only when that argument is exactly 'ok'.'''

    test.write_command('70 check-support feature ' + feature)

    answer = test.parse_reply()
    test.assertEqual(answer.command_name, 'feature-support')
    test.assertIn('support', answer.argument)

    return answer.argument['support'] == 'ok'
60:
61:
def test_basic_remote_probe(test, ip_version, protocol):
    '''Send a probe with a TTL of 1 toward a remote host.

    The destination is 8.8.8.8 for IP version 4, or the resolved address
    of mtrpacket.IPV6_TEST_HOST for IP version 6; any other ip_version
    raises ValueError.  The reply is expected to be 'ttl-expired'.'''

    protocol_arg = 'protocol ' + protocol

    if ip_version == 4:
        destination = 'ip-4 8.8.8.8'
    elif ip_version == 6:
        ipv6_address = resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
        destination = 'ip-6 ' + ipv6_address
    else:
        raise ValueError(ip_version)

    fields = ['60 send-probe', protocol_arg, destination, 'port 164 ttl 1']
    test.write_command(' '.join(fields))

    outcome = test.parse_reply()
    test.assertEqual(outcome.command_name, 'ttl-expired')
79:
80:
def test_basic_local_probe(test, ip_version, protocol):
    '''Test a probe to a closed port on localhost.

    Sends a probe to port 164 of the loopback address for the requested
    IP version and expects a 'reply' whose address argument is that same
    loopback address.

    Raises ValueError for an ip_version other than 4 or 6, in the same
    way as test_basic_remote_probe.'''

    if ip_version == 6:
        address_key, loopback = 'ip-6', '::1'
    elif ip_version == 4:
        address_key, loopback = 'ip-4', '127.0.0.1'
    else:
        # Fail with a clear error instead of an UnboundLocalError when
        # the command is assembled below.
        raise ValueError(ip_version)

    protocol_str = 'protocol ' + protocol
    address_str = address_key + ' ' + loopback

    cmd = '61 send-probe ' + \
        protocol_str + ' ' + address_str + ' port 164'
    test.write_command(cmd)

    reply = test.parse_reply()
    test.assertEqual(reply.command_name, 'reply')

    # The reply must name the loopback address that was probed.
    test.assertIn(address_key, reply.argument)
    test.assertEqual(reply.argument[address_key], loopback)
103:
104:
def test_basic_probe(test, ip_version, protocol):
    # type: (mtrpacket.MtrPacketTest, int, unicode) -> None

    '''Test a probe with TTL expiration and a probe which reaches its
    destination with a particular protocol.

    When check_feature reports that the protocol is unsupported, a notice
    is written to stderr and neither probe is sent.'''

    if not check_feature(test, protocol):
        # Write text, not bytes: under Python 3 sys.stderr is a text
        # stream and rejects the result of str.encode() with a TypeError.
        sys.stderr.write('Skipping ' + protocol + ' test due to no support\n')
        return

    test_basic_remote_probe(test, ip_version, protocol)
    test_basic_local_probe(test, ip_version, protocol)
118:
119:
class TestProbeICMPv4(mtrpacket.MtrPacketTest):
    '''Test sending probes using IP version 4'''

    def test_probe(self):
        'Test sending regular ICMP probes to known addresses'

        # Probe Google's well-known DNS server and expect a reply
        self.write_command('14 send-probe ip-4 8.8.8.8')
        reply = self.parse_reply()
        self.assertEqual(reply.token, 14)
        self.assertEqual(reply.command_name, 'reply')
        self.assertIn('ip-4', reply.argument)
        self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
        self.assertIn('round-trip-time', reply.argument)

    def test_timeout(self):
        'Test timeouts when sending to a non-existent address'

        #
        #  Probe a non-existent address, and expect no reply
        #
        #  I'm not sure what the best way is to find an address that
        #  doesn't exist, but is still routable.  If we use a reserved
        #  IP address range, Windows will tell us it is non-routable,
        #  rather than timing out when transmitting to that address.
        #
        #  We're just using a currently unused address in Google's
        #  range instead.  This is probably not the best solution.
        #

        for _ in range(16):
            self.write_command('15 send-probe ip-4 8.8.254.254 timeout 1')
            reply = self.parse_reply()
            self.assertEqual(reply.token, 15)
            self.assertEqual(reply.command_name, 'no-reply')

    def test_exhaust_probes(self):
        'Test exhausting all available probes'

        probe_count = 4 * 1024
        first_token = 1024

        # Keep issuing long-lived probes, each with its own token, until
        # mtr-packet reports that it has run out of probes.
        reply = None
        for token in range(first_token, first_token + probe_count):
            command = str(token) + ' send-probe ip-4 8.8.254.254 timeout 60'
            self.write_command(command)

            # Poll without waiting; most iterations have nothing to read.
            reply = None
            try:
                reply = self.parse_reply(0)
            except mtrpacket.ReadReplyTimeout:
                pass

            if reply and reply.command_name == 'probes-exhausted':
                break

        self.assertIsNotNone(reply)
        self.assertEqual(reply.command_name, 'probes-exhausted')

    def test_timeout_values(self):
        '''Test that timeout values wait the right amount of time

        Give each probe a half-second grace period to produce a timeout
        reply after the expected timeout time.'''

        begin = time.time()
        self.write_command('19 send-probe ip-4 8.8.254.254 timeout 0')
        self.parse_reply()
        elapsed = time.time() - begin
        self.assertLess(elapsed, 0.5)

        begin = time.time()
        self.write_command('20 send-probe ip-4 8.8.254.254 timeout 1')
        self.parse_reply()
        elapsed = time.time() - begin
        self.assertGreaterEqual(elapsed, 0.9)
        self.assertLess(elapsed, 1.5)

        begin = time.time()
        self.write_command('21 send-probe ip-4 8.8.254.254 timeout 3')
        self.parse_reply()
        elapsed = time.time() - begin
        self.assertGreaterEqual(elapsed, 2.9)
        self.assertLess(elapsed, 3.5)

    def test_ttl_expired(self):
        'Test sending a probe which will have its time-to-live expire'

        # Probe Google's DNS server, but give the probe only one hop
        # to live.
        self.write_command('16 send-probe ip-4 8.8.8.8 ttl 1')
        reply = self.parse_reply()
        self.assertEqual(reply.command_name, 'ttl-expired')
        self.assertIn('ip-4', reply.argument)
        self.assertIn('round-trip-time', reply.argument)

    def test_parallel_probes(self):
        '''Test sending multiple probes in parallel

        We will expect the probes to complete out-of-order by sending
        a probe to a distant host immediately followed by a probe to
        the local host.'''

        success_count = 0
        loop_count = 32

        for _ in range(loop_count):
            # Probe the distant host before the local host.
            self.write_command('17 send-probe ip-4 8.8.8.8 timeout 1')
            self.write_command('18 send-probe ip-4 127.0.0.1 timeout 1')

            reply = self.parse_reply()
            if reply.command_name == 'no-reply':
                # The other probe of this pair has not been read yet.
                # Consume its reply so that it is not mistaken for a
                # reply to the next pair (assumes mtr-packet emits
                # exactly one reply per send-probe -- confirm).
                self.parse_reply()
                continue

            self.assertEqual(reply.command_name, 'reply')
            self.assertIn('ip-4', reply.argument)
            self.assertEqual(reply.argument['ip-4'], '127.0.0.1')
            self.assertIn('round-trip-time', reply.argument)
            first_time = int(reply.argument['round-trip-time'])

            # Both replies of the pair have been read at this point, so
            # skipping the iteration leaves nothing outstanding.
            reply = self.parse_reply()
            if reply.command_name == 'no-reply':
                continue

            self.assertEqual(reply.command_name, 'reply')
            self.assertIn('ip-4', reply.argument)
            self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
            self.assertIn('round-trip-time', reply.argument)
            second_time = int(reply.argument['round-trip-time'])

            # Ensure we got a reply from the host with the lowest latency
            # first.
            self.assertLess(first_time, second_time)

            success_count += 1

        # We need 90% success to pass.  This allows a few probes to be
        # occasionally dropped by the network without failing the test.
        required_success = int(loop_count * 0.90)
        self.assertGreaterEqual(success_count, required_success)
265:
266:
class TestProbeICMPv6(mtrpacket.MtrPacketTest):
    '''Exercise ICMP probes sent over IP version 6'''

    def __init__(self, *args):
        # Resolve the remote test host only when IPv6 is available; every
        # test that reads google_addr is skipped otherwise.
        if mtrpacket.HAVE_IPV6:
            resolved = resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)

            self.google_addr = resolved  # type: str

        super(TestProbeICMPv6, self).__init__(*args)

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_probe(self):
        "Probe the remote test host and then the loopback address"

        # The remote host is expected to answer with its address and a
        # round-trip time.
        self.write_command('51 send-probe ip-6 ' + self.google_addr)
        response = self.parse_reply()
        self.assertEqual(response.command_name, 'reply')
        self.assertIn('ip-6', response.argument)
        self.assertIn('round-trip-time', response.argument)

        # A loopback probe must come back from the loopback address
        # itself.  This guards against a bug seen while IPv6 support was
        # being implemented, where the low bits of the received address
        # were zeroed.
        self.write_command('52 send-probe ip-6 ::1')
        response = self.parse_reply()
        self.assertEqual(response.command_name, 'reply')
        self.assertIn('ip-6', response.argument)
        self.assertIn('round-trip-time', response.argument)
        self.assertEqual(response.argument['ip-6'], '::1')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_ttl_expired(self):
        'Send a probe whose time-to-live runs out after a single hop'

        # With only one hop to live, the probe cannot reach the remote
        # test host.
        self.write_command('53 send-probe ip-6 %s ttl 1' % self.google_addr)
        response = self.parse_reply()
        self.assertEqual('ttl-expired', response.command_name)
        self.assertIn('ip-6', response.argument)
        self.assertIn('round-trip-time', response.argument)
312:
313:
class TestProbeUDP(mtrpacket.MtrPacketTest):
    'Exercise probes transmitted over UDP'

    def udp_port_test(self, address):  # type: (unicode) -> None
        '''Send UDP probes to address with the default ports, with an
        explicit 'port' and with an explicit 'local-port', expecting a
        reply to each.  Does nothing when UDP is reported unsupported.'''

        if not check_feature(self, 'udp'):
            return

        # One command prefix per port variation; the address argument is
        # appended to each.
        command_prefixes = (
            '80 send-probe protocol udp ',
            '81 send-probe protocol udp port 990 ',
            '82 send-probe protocol udp local-port 1991 ',
        )

        for prefix in command_prefixes:
            self.write_command(prefix + address)
            response = self.parse_reply()
            self.assertEqual('reply', response.command_name)

    def test_udp_v4(self):
        'Run the basic probes and the port variations over IPv4'

        test_basic_probe(self, 4, 'udp')

        self.udp_port_test('ip-4 127.0.0.1')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_udp_v6(self):
        'Run the basic probes and the port variations over IPv6'

        test_basic_probe(self, 6, 'udp')

        self.udp_port_test('ip-6 ::1')
352:
353:
class TestProbeTCP(mtrpacket.MtrPacketTest):
    'Exercise TCP probe support'

    def _probe_open_port(self, address):
        '''Send a TCP probe to port 22 of address and expect a reply.

        Port 22 (ssh) is assumed to be open on the local host.  Nothing
        is sent when TCP is reported unsupported.'''

        if not check_feature(self, 'tcp'):
            return

        self.write_command('80 send-probe ' + address + ' protocol tcp port 22')

        response = self.parse_reply()
        self.assertEqual(response.command_name, 'reply')

    def test_tcp_v4(self):
        '''Run the basic IPv4 TCP probes (TTL expiration and a refused
        port), then probe a port assumed to be open'''

        test_basic_probe(self, 4, 'tcp')

        self._probe_open_port('ip-4 127.0.0.1')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_tcp_v6(self):
        '''Run the basic IPv6 TCP probes, then probe a port assumed to be
        open'''

        test_basic_probe(self, 6, 'tcp')

        self._probe_open_port('ip-6 ::1')
388:
389:
class TestProbeSCTP(mtrpacket.MtrPacketTest):
    'Exercise SCTP probes'

    def test_sctp_v4(self):
        'Run the basic remote and local probes with SCTP over IPv4'

        test_basic_probe(self, ip_version=4, protocol='sctp')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_sctp_v6(self):
        'Run the basic remote and local probes with SCTP over IPv6'

        test_basic_probe(self, ip_version=6, protocol='sctp')
403:
404:
# Run the whole suite when this file is executed as a script.  The root
# check comes first -- presumably because sending probes needs elevated
# privileges; confirm against mtrpacket.
if __name__ == '__main__':
    mtrpacket.check_running_as_root()
    unittest.main()
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>