Annotation of embedaddon/mtr/test/probe.py, revision 1.1
1.1 ! misho 1: #!/usr/bin/env python
! 2: #
! 3: # mtr -- a network diagnostic tool
! 4: # Copyright (C) 2016 Matt Kimball
! 5: #
! 6: # This program is free software; you can redistribute it and/or modify
! 7: # it under the terms of the GNU General Public License version 2 as
! 8: # published by the Free Software Foundation.
! 9: #
! 10: # This program is distributed in the hope that it will be useful,
! 11: # but WITHOUT ANY WARRANTY; without even the implied warranty of
! 12: # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
! 13: # GNU General Public License for more details.
! 14: #
! 15: # You should have received a copy of the GNU General Public License
! 16: # along with this program; if not, write to the Free Software
! 17: # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
! 18: #
! 19:
! 20: '''Test sending probes and receiving responses.'''
! 21:
! 22: import socket
! 23: import sys
! 24: import time
! 25: import unittest
! 26:
! 27: import mtrpacket
! 28:
! 29:
def resolve_ipv6_address(hostname):  # type: (str) -> str
    '''Return the first IPv6 address that hostname resolves to.

    Raises LookupError, carrying the hostname, when none of the
    entries returned by socket.getaddrinfo is an AF_INET6 entry.'''

    for entry in socket.getaddrinfo(hostname, 0):
        # pylint: disable=locally-disabled, unused-variable
        (family, socktype, proto, name, sockaddr) = entry

        if family != socket.AF_INET6:
            continue

        # The AF_INET6 sockaddr is unpacked as four fields; only the
        # address string is of interest here.
        (address, port, flow, scope) = sockaddr
        return address

    raise LookupError(hostname)
! 44:
! 45:
def check_feature(test, feature):
    '''Ask mtr-packet whether it supports a feature.

    Sends a check-support command through the test case, asserts that
    the reply is a feature-support reply carrying a 'support' argument,
    and returns True only when that argument is 'ok'.'''

    test.write_command('70 check-support feature ' + feature)

    response = test.parse_reply()
    test.assertEqual(response.command_name, 'feature-support')
    test.assertIn('support', response.argument)

    return response.argument['support'] == 'ok'
! 60:
! 61:
! 62: def test_basic_remote_probe(test, ip_version, protocol):
! 63: 'Test a probe to a remote host with a TTL of 1'
! 64:
! 65: protocol_str = 'protocol ' + protocol
! 66: if ip_version == 6:
! 67: address_str = 'ip-6 ' + resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
! 68: elif ip_version == 4:
! 69: address_str = 'ip-4 8.8.8.8'
! 70: else:
! 71: raise ValueError(ip_version)
! 72:
! 73: cmd = '60 send-probe ' + \
! 74: protocol_str + ' ' + address_str + ' port 164 ttl 1'
! 75: test.write_command(cmd)
! 76:
! 77: reply = test.parse_reply()
! 78: test.assertEqual(reply.command_name, 'ttl-expired')
! 79:
! 80:
! 81: def test_basic_local_probe(test, ip_version, protocol):
! 82: 'Test a probe to a closed port on localhost'
! 83:
! 84: protocol_str = 'protocol ' + protocol
! 85: if ip_version == 6:
! 86: address_str = 'ip-6 ::1'
! 87: elif ip_version == 4:
! 88: address_str = 'ip-4 127.0.0.1'
! 89:
! 90: cmd = '61 send-probe ' + \
! 91: protocol_str + ' ' + address_str + ' port 164'
! 92: test.write_command(cmd)
! 93:
! 94: reply = test.parse_reply()
! 95: test.assertEqual(reply.command_name, 'reply')
! 96:
! 97: if ip_version == 6:
! 98: test.assertIn('ip-6', reply.argument)
! 99: test.assertEqual(reply.argument['ip-6'], '::1')
! 100: elif ip_version == 4:
! 101: test.assertIn('ip-4', reply.argument)
! 102: test.assertEqual(reply.argument['ip-4'], '127.0.0.1')
! 103:
! 104:
def test_basic_probe(test, ip_version, protocol):
    # type: (mtrpacket.MtrPacketTest, int, unicode) -> None

    '''Test a probe with TTL expiration and a probe which reaches its
    destination with a particular protocol.

    When check_feature reports that the protocol is unsupported, a
    notice is written to stderr and neither probe is attempted.'''

    if not check_feature(test, protocol):
        err_str = 'Skipping ' + protocol + ' test due to no support\n'

        # sys.stderr is a text stream on Python 3 and rejects bytes, so
        # encode only when the message is not already a native str
        # (that is, a Python 2 unicode object).
        if not isinstance(err_str, str):
            err_str = err_str.encode('utf-8')

        sys.stderr.write(err_str)
        return

    test_basic_remote_probe(test, ip_version, protocol)
    test_basic_local_probe(test, ip_version, protocol)
! 118:
! 119:
class TestProbeICMPv4(mtrpacket.MtrPacketTest):
    '''Test sending probes using IP version 4'''

    def _timed_probe(self, command):
        '''Write a command, wait for its reply, and return the number
        of seconds that elapsed between the two.'''

        begin = time.time()
        self.write_command(command)
        self.parse_reply()
        return time.time() - begin

    def test_probe(self):
        'Test sending regular ICMP probes to known addresses'

        # Probe Google's well-known DNS server and expect a reply
        self.write_command('14 send-probe ip-4 8.8.8.8')
        reply = self.parse_reply()
        self.assertEqual(reply.token, 14)
        self.assertEqual(reply.command_name, 'reply')
        self.assertIn('ip-4', reply.argument)
        self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
        self.assertIn('round-trip-time', reply.argument)

    def test_timeout(self):
        'Test timeouts when sending to a non-existent address'

        #
        #  Probe a non-existent address, and expect no reply
        #
        #  I'm not sure what the best way is to find an address that
        #  doesn't exist, but is still route-able.  If we use a reserved
        #  IP address range, Windows will tell us it is non-routeable,
        #  rather than timing out when transmitting to that address.
        #
        #  We're just using a currently unused address in Google's
        #  range instead.  This is probably not the best solution.
        #

        for _ in range(16):
            self.write_command('15 send-probe ip-4 8.8.254.254 timeout 1')
            reply = self.parse_reply()
            self.assertEqual(reply.token, 15)
            self.assertEqual(reply.command_name, 'no-reply')

    def test_exhaust_probes(self):
        '''Test exhausting all available probes

        Keeps issuing long-timeout probes, each with a fresh token,
        until a 'probes-exhausted' reply is seen or the probe budget
        for this test has been used up.'''

        probe_count = 4 * 1024
        first_token = 1024

        for token in range(first_token, first_token + probe_count):
            command = str(token) + ' send-probe ip-4 8.8.254.254 timeout 60'
            self.write_command(command)

            # Poll with a zero timeout; most probes produce no
            # immediate reply, and that is not an error here.
            reply = None
            try:
                reply = self.parse_reply(0)
            except mtrpacket.ReadReplyTimeout:
                pass

            if reply and reply.command_name == 'probes-exhausted':
                break

        self.assertIsNotNone(reply)
        self.assertEqual(reply.command_name, 'probes-exhausted')

    def test_timeout_values(self):
        '''Test that timeout values wait the right amount of time

        Give each probe a half-second grace period to produce a timeout
        reply after the expected timeout time.'''

        elapsed = self._timed_probe(
            '19 send-probe ip-4 8.8.254.254 timeout 0')
        self.assertLess(elapsed, 0.5)

        elapsed = self._timed_probe(
            '20 send-probe ip-4 8.8.254.254 timeout 1')
        self.assertGreaterEqual(elapsed, 0.9)
        self.assertLess(elapsed, 1.5)

        elapsed = self._timed_probe(
            '21 send-probe ip-4 8.8.254.254 timeout 3')
        self.assertGreaterEqual(elapsed, 2.9)
        self.assertLess(elapsed, 3.5)

    def test_ttl_expired(self):
        'Test sending a probe which will have its time-to-live expire'

        # Probe Google's DNS server, but give the probe only one hop
        # to live.
        self.write_command('16 send-probe ip-4 8.8.8.8 ttl 1')
        reply = self.parse_reply()
        self.assertEqual(reply.command_name, 'ttl-expired')
        self.assertIn('ip-4', reply.argument)
        self.assertIn('round-trip-time', reply.argument)

    def test_parallel_probes(self):
        '''Test sending multiple probes in parallel

        We will expect the probes to complete out-of-order by sending
        a probe to a distant host immediately followed by a probe to
        the local host.'''

        success_count = 0
        loop_count = 32

        for _ in range(loop_count):
            # Probe the distant host before the local host.
            self.write_command('17 send-probe ip-4 8.8.8.8 timeout 1')
            self.write_command('18 send-probe ip-4 127.0.0.1 timeout 1')

            reply = self.parse_reply()
            if reply.command_name == 'no-reply':
                # Two probes were sent, so a second reply is still
                # pending.  Consume it here; otherwise it would be read
                # as the first reply of the next round and every later
                # round would be checked against the wrong reply.
                self.parse_reply()
                continue

            self.assertEqual(reply.command_name, 'reply')
            self.assertIn('ip-4', reply.argument)
            self.assertEqual(reply.argument['ip-4'], '127.0.0.1')
            self.assertIn('round-trip-time', reply.argument)
            first_time = int(reply.argument['round-trip-time'])

            # Both replies of this round have been read once we get
            # here, so skipping the round leaves nothing pending.
            reply = self.parse_reply()
            if reply.command_name == 'no-reply':
                continue

            self.assertEqual(reply.command_name, 'reply')
            self.assertIn('ip-4', reply.argument)
            self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
            self.assertIn('round-trip-time', reply.argument)
            second_time = int(reply.argument['round-trip-time'])

            # Ensure we got a reply from the host with the lowest latency
            # first.
            self.assertLess(first_time, second_time)

            success_count += 1

        # We need 90% success to pass.  This allows a few probes to be
        # occasionally dropped by the network without failing the test.
        required_success = int(loop_count * 0.90)
        self.assertGreaterEqual(success_count, required_success)
! 265:
! 266:
class TestProbeICMPv6(mtrpacket.MtrPacketTest):
    '''Test sending probes using IP version 6'''

    def __init__(self, *args, **kwargs):
        '''Resolve the IPv6 test host once per test case instance.

        The lookup is attempted only when mtrpacket.HAVE_IPV6 is set.
        resolve_ipv6_address raises when no IPv6 address is found, and
        an exception raised here would break construction of the test
        case even though every test below is skipped without IPv6.'''

        if mtrpacket.HAVE_IPV6:
            google_addr = resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
        else:
            google_addr = None

        # IPv6 address string of the test host, or None without IPv6.
        self.google_addr = google_addr

        super(TestProbeICMPv6, self).__init__(*args, **kwargs)

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_probe(self):
        "Test a probe to Google's public DNS server"

        # Probe Google's well-known DNS server and expect a reply
        self.write_command('51 send-probe ip-6 ' + self.google_addr)
        reply = self.parse_reply()
        self.assertEqual(reply.command_name, 'reply')
        self.assertIn('ip-6', reply.argument)
        self.assertIn('round-trip-time', reply.argument)

        # Probe the loopback, and check the address we get a reply from is
        # also the loopback.  While implementing IPv6, I had a bug where
        # the low bits of the received address got zeroed.  This checks for
        # that bug.
        self.write_command('52 send-probe ip-6 ::1')
        reply = self.parse_reply()
        self.assertEqual(reply.command_name, 'reply')
        self.assertIn('ip-6', reply.argument)
        self.assertIn('round-trip-time', reply.argument)
        self.assertEqual(reply.argument['ip-6'], '::1')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_ttl_expired(self):
        'Test sending a probe which will have its time-to-live expire'

        # Probe Google's DNS server, but give the probe only one hop
        # to live.
        cmd = '53 send-probe ip-6 ' + self.google_addr + ' ttl 1'
        self.write_command(cmd)
        reply = self.parse_reply()
        self.assertEqual('ttl-expired', reply.command_name)
        self.assertIn('ip-6', reply.argument)
        self.assertIn('round-trip-time', reply.argument)
! 311:
! 312:
class TestProbeUDP(mtrpacket.MtrPacketTest):
    'Exercise probes transmitted over UDP'

    def udp_port_test(self, address):  # type: (unicode) -> None
        '''Send UDP probes with default ports, an explicit destination
        port, and an explicit local port, expecting a reply to each.

        Returns without probing when check_feature reports that UDP
        is unsupported.'''

        if not check_feature(self, 'udp'):
            return

        # Each prefix is completed with the address argument and sent
        # in turn; the reply is checked before the next command goes out.
        command_prefixes = (
            '80 send-probe protocol udp ',
            '81 send-probe protocol udp port 990 ',
            '82 send-probe protocol udp local-port 1991 ',
        )

        for prefix in command_prefixes:
            self.write_command(prefix + address)
            response = self.parse_reply()
            self.assertEqual('reply', response.command_name)

    def test_udp_v4(self):
        'Run the basic and port-variation UDP probes over IPv4'

        test_basic_probe(self, 4, 'udp')

        self.udp_port_test('ip-4 127.0.0.1')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_udp_v6(self):
        'Run the basic and port-variation UDP probes over IPv6'

        test_basic_probe(self, 6, 'udp')

        self.udp_port_test('ip-6 ::1')
! 351:
! 352:
class TestProbeTCP(mtrpacket.MtrPacketTest):
    'Exercise TCP probe support'

    def _probe_open_port(self, address):
        '''Probe TCP port 22 at the given address argument and expect
        a reply.  Does nothing when check_feature reports that TCP is
        unsupported.'''

        if not check_feature(self, 'tcp'):
            return

        # Port 22 is assumed to be open locally (ssh)
        self.write_command(
            '80 send-probe ' + address + ' protocol tcp port 22')

        response = self.parse_reply()
        self.assertEqual(response.command_name, 'reply')

    def test_tcp_v4(self):
        '''Run IPv4 TCP probes: TTL expiration, a refused port, and
        then a port assumed to be open'''

        test_basic_probe(self, 4, 'tcp')

        self._probe_open_port('ip-4 127.0.0.1')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_tcp_v6(self):
        'Run the same TCP probes over IPv6'

        test_basic_probe(self, 6, 'tcp')

        self._probe_open_port('ip-6 ::1')
! 387:
! 388:
class TestProbeSCTP(mtrpacket.MtrPacketTest):
    'Exercise SCTP probes'

    def test_sctp_v4(self):
        'Run the basic probe checks with SCTP over IPv4'

        test_basic_probe(self, ip_version=4, protocol='sctp')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_sctp_v6(self):
        'Run the basic probe checks with SCTP over IPv6'

        test_basic_probe(self, ip_version=6, protocol='sctp')
! 402:
! 403:
# Script entry point: only runs when this file is executed directly.
if __name__ == '__main__':
    # The root check runs before any test is started.  Presumably it
    # refuses to continue without root privileges -- confirm in mtrpacket.
    mtrpacket.check_running_as_root()
    unittest.main()
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>