Annotation of embedaddon/mtr/test/probe.py, revision 1.1

1.1     ! misho       1: #!/usr/bin/env python
        !             2: #
        !             3: #   mtr  --  a network diagnostic tool
        !             4: #   Copyright (C) 2016  Matt Kimball
        !             5: #
        !             6: #   This program is free software; you can redistribute it and/or modify
        !             7: #   it under the terms of the GNU General Public License version 2 as
        !             8: #   published by the Free Software Foundation.
        !             9: #
        !            10: #   This program is distributed in the hope that it will be useful,
        !            11: #   but WITHOUT ANY WARRANTY; without even the implied warranty of
        !            12: #   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
        !            13: #   GNU General Public License for more details.
        !            14: #
        !            15: #   You should have received a copy of the GNU General Public License
        !            16: #   along with this program; if not, write to the Free Software
        !            17: #   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
        !            18: #
        !            19: 
        !            20: '''Test sending probes and receiving responses.'''
        !            21: 
        !            22: import socket
        !            23: import sys
        !            24: import time
        !            25: import unittest
        !            26: 
        !            27: import mtrpacket
        !            28: 
        !            29: 
        !            30: def resolve_ipv6_address(hostname):  # type: (str) -> str
        !            31:     'Resolve a hostname to an IP version 6 address'
        !            32: 
        !            33:     for addrinfo in socket.getaddrinfo(hostname, 0):
        !            34:         # pylint: disable=locally-disabled, unused-variable
        !            35:         (family, socktype, proto, name, sockaddr) = addrinfo
        !            36: 
        !            37:         if family == socket.AF_INET6:
        !            38:             sockaddr6 = sockaddr  # type: tuple
        !            39: 
        !            40:             (address, port, flow, scope) = sockaddr6
        !            41:             return address
        !            42: 
        !            43:     raise LookupError(hostname)
        !            44: 
        !            45: 
        !            46: def check_feature(test, feature):
        !            47:     'Check for support for a particular feature with mtr-packet'
        !            48: 
        !            49:     check_cmd = '70 check-support feature ' + feature
        !            50:     test.write_command(check_cmd)
        !            51: 
        !            52:     reply = test.parse_reply()
        !            53:     test.assertEqual(reply.command_name, 'feature-support')
        !            54:     test.assertIn('support', reply.argument)
        !            55: 
        !            56:     if reply.argument['support'] != 'ok':
        !            57:         return False
        !            58: 
        !            59:     return True
        !            60: 
        !            61: 
        !            62: def test_basic_remote_probe(test, ip_version, protocol):
        !            63:     'Test a probe to a remote host with a TTL of 1'
        !            64: 
        !            65:     protocol_str = 'protocol ' + protocol
        !            66:     if ip_version == 6:
        !            67:         address_str = 'ip-6 ' + resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
        !            68:     elif ip_version == 4:
        !            69:         address_str = 'ip-4 8.8.8.8'
        !            70:     else:
        !            71:         raise ValueError(ip_version)
        !            72: 
        !            73:     cmd = '60 send-probe ' + \
        !            74:         protocol_str + ' ' + address_str + ' port 164 ttl 1'
        !            75:     test.write_command(cmd)
        !            76: 
        !            77:     reply = test.parse_reply()
        !            78:     test.assertEqual(reply.command_name, 'ttl-expired')
        !            79: 
        !            80: 
        !            81: def test_basic_local_probe(test, ip_version, protocol):
        !            82:     'Test a probe to a closed port on localhost'
        !            83: 
        !            84:     protocol_str = 'protocol ' + protocol
        !            85:     if ip_version == 6:
        !            86:         address_str = 'ip-6 ::1'
        !            87:     elif ip_version == 4:
        !            88:         address_str = 'ip-4 127.0.0.1'
        !            89: 
        !            90:     cmd = '61 send-probe ' + \
        !            91:         protocol_str + ' ' + address_str + ' port 164'
        !            92:     test.write_command(cmd)
        !            93: 
        !            94:     reply = test.parse_reply()
        !            95:     test.assertEqual(reply.command_name, 'reply')
        !            96: 
        !            97:     if ip_version == 6:
        !            98:         test.assertIn('ip-6', reply.argument)
        !            99:         test.assertEqual(reply.argument['ip-6'], '::1')
        !           100:     elif ip_version == 4:
        !           101:         test.assertIn('ip-4', reply.argument)
        !           102:         test.assertEqual(reply.argument['ip-4'], '127.0.0.1')
        !           103: 
        !           104: 
        !           105: def test_basic_probe(test, ip_version, protocol):
        !           106:     # type: (mtrpacket.MtrPacketTest, int, unicode) -> None
        !           107: 
        !           108:     '''Test a probe with TTL expiration and a probe which reaches its
        !           109:     destination with a particular protocol.'''
        !           110: 
        !           111:     if not check_feature(test, protocol):
        !           112:         err_str = 'Skipping ' + protocol + ' test due to no support\n'
        !           113:         sys.stderr.write(err_str.encode('utf-8'))
        !           114:         return
        !           115: 
        !           116:     test_basic_remote_probe(test, ip_version, protocol)
        !           117:     test_basic_local_probe(test, ip_version, protocol)
        !           118: 
        !           119: 
        !           120: class TestProbeICMPv4(mtrpacket.MtrPacketTest):
        !           121:     '''Test sending probes using IP version 4'''
        !           122: 
        !           123:     def test_probe(self):
        !           124:         'Test sending regular ICMP probes to known addresses'
        !           125: 
        !           126:         #  Probe Google's well-known DNS server and expect a reply
        !           127:         self.write_command('14 send-probe ip-4 8.8.8.8')
        !           128:         reply = self.parse_reply()
        !           129:         self.assertEqual(reply.token, 14)
        !           130:         self.assertEqual(reply.command_name, 'reply')
        !           131:         self.assertIn('ip-4', reply.argument)
        !           132:         self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
        !           133:         self.assertIn('round-trip-time', reply.argument)
        !           134: 
        !           135:     def test_timeout(self):
        !           136:         'Test timeouts when sending to a non-existant address'
        !           137: 
        !           138:         #
        !           139:         #  Probe a non-existant address, and expect no reply
        !           140:         #
        !           141:         #  I'm not sure what the best way to find an address that doesn't
        !           142:         #  exist, but is still route-able.  If we use a reserved IP
        !           143:         #  address range, Windows will tell us it is non-routeable,
        !           144:         #  rather than timing out when transmitting to that address.
        !           145:         #
        !           146:         #  We're just using a currently unused address in Google's
        !           147:         #  range instead.  This is probably not the best solution.
        !           148:         #
        !           149: 
        !           150:         # pylint: disable=locally-disabled, unused-variable
        !           151:         for i in range(16):
        !           152:             self.write_command('15 send-probe ip-4 8.8.254.254 timeout 1')
        !           153:             reply = self.parse_reply()
        !           154:             self.assertEqual(reply.token, 15)
        !           155:             self.assertEqual(reply.command_name, 'no-reply')
        !           156: 
        !           157:     def test_exhaust_probes(self):
        !           158:         'Test exhausting all available probes'
        !           159: 
        !           160:         probe_count = 4 * 1024
        !           161:         token = 1024
        !           162: 
        !           163:         # pylint: disable=locally-disabled, unused-variable
        !           164:         for i in range(probe_count):
        !           165:             command = str(token) + ' send-probe ip-4 8.8.254.254 timeout 60'
        !           166:             token += 1
        !           167:             self.write_command(command)
        !           168: 
        !           169:             reply = None
        !           170:             try:
        !           171:                 reply = self.parse_reply(0)
        !           172:             except mtrpacket.ReadReplyTimeout:
        !           173:                 pass
        !           174: 
        !           175:             if reply:
        !           176:                 if reply.command_name == 'probes-exhausted':
        !           177:                     break
        !           178: 
        !           179:         self.assertIsNotNone(reply)
        !           180:         self.assertEqual(reply.command_name, 'probes-exhausted')
        !           181: 
        !           182:     def test_timeout_values(self):
        !           183:         '''Test that timeout values wait the right amount of time
        !           184: 
        !           185:         Give each probe a half-second grace period to probe a timeout
        !           186:         reply after the expected timeout time.'''
        !           187: 
        !           188:         begin = time.time()
        !           189:         self.write_command('19 send-probe ip-4 8.8.254.254 timeout 0')
        !           190:         self.parse_reply()
        !           191:         elapsed = time.time() - begin
        !           192:         self.assertLess(elapsed, 0.5)
        !           193: 
        !           194:         begin = time.time()
        !           195:         self.write_command('20 send-probe ip-4 8.8.254.254 timeout 1')
        !           196:         self.parse_reply()
        !           197:         elapsed = time.time() - begin
        !           198:         self.assertGreaterEqual(elapsed, 0.9)
        !           199:         self.assertLess(elapsed, 1.5)
        !           200: 
        !           201:         begin = time.time()
        !           202:         self.write_command('21 send-probe ip-4 8.8.254.254 timeout 3')
        !           203:         self.parse_reply()
        !           204:         elapsed = time.time() - begin
        !           205:         self.assertGreaterEqual(elapsed, 2.9)
        !           206:         self.assertLess(elapsed, 3.5)
        !           207: 
        !           208:     def test_ttl_expired(self):
        !           209:         'Test sending a probe which will have its time-to-live expire'
        !           210: 
        !           211:         #  Probe Goolge's DNS server, but give the probe only one hop
        !           212:         #  to live.
        !           213:         self.write_command('16 send-probe ip-4 8.8.8.8 ttl 1')
        !           214:         reply = self.parse_reply()
        !           215:         self.assertEqual(reply.command_name, 'ttl-expired')
        !           216:         self.assertIn('ip-4', reply.argument)
        !           217:         self.assertIn('round-trip-time', reply.argument)
        !           218: 
        !           219:     def test_parallel_probes(self):
        !           220:         '''Test sending multiple probes in parallel
        !           221: 
        !           222:         We will expect the probes to complete out-of-order by sending
        !           223:         a probe to a distant host immeidately followed by a probe to
        !           224:         the local host.'''
        !           225: 
        !           226:         success_count = 0
        !           227:         loop_count = 32
        !           228: 
        !           229:         # pylint: disable=locally-disabled, unused-variable
        !           230:         for i in range(loop_count):
        !           231:             #  Probe the distant host before the local host.
        !           232:             self.write_command('17 send-probe ip-4 8.8.8.8 timeout 1')
        !           233:             self.write_command('18 send-probe ip-4 127.0.0.1 timeout 1')
        !           234: 
        !           235:             reply = self.parse_reply()
        !           236:             if reply.command_name == 'no-reply':
        !           237:                 continue
        !           238: 
        !           239:             self.assertEqual(reply.command_name, 'reply')
        !           240:             self.assertIn('ip-4', reply.argument)
        !           241:             self.assertEqual(reply.argument['ip-4'], '127.0.0.1')
        !           242:             self.assertIn('round-trip-time', reply.argument)
        !           243:             first_time = int(reply.argument['round-trip-time'])
        !           244: 
        !           245:             reply = self.parse_reply()
        !           246:             if reply.command_name == 'no-reply':
        !           247:                 continue
        !           248: 
        !           249:             self.assertEqual(reply.command_name, 'reply')
        !           250:             self.assertIn('ip-4', reply.argument)
        !           251:             self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
        !           252:             self.assertIn('round-trip-time', reply.argument)
        !           253:             second_time = int(reply.argument['round-trip-time'])
        !           254: 
        !           255:             #  Ensure we got a reply from the host with the lowest latency
        !           256:             #  first.
        !           257:             self.assertLess(first_time, second_time)
        !           258: 
        !           259:             success_count += 1
        !           260: 
        !           261:         #  We need 90% success to pass.  This allows a few probes to be
        !           262:         #  occasionally dropped by the network without failing the test.
        !           263:         required_success = int(loop_count * 0.90)
        !           264:         self.assertGreaterEqual(success_count, required_success)
        !           265: 
        !           266: 
        !           267: class TestProbeICMPv6(mtrpacket.MtrPacketTest):
        !           268:     '''Test sending probes using IP version 6'''
        !           269: 
        !           270:     def __init__(self, *args):
        !           271:         google_addr = resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
        !           272: 
        !           273:         self.google_addr = google_addr  # type: str
        !           274: 
        !           275:         super(TestProbeICMPv6, self).__init__(*args)
        !           276: 
        !           277:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
        !           278:     def test_probe(self):
        !           279:         "Test a probe to Google's public DNS server"
        !           280: 
        !           281:         #  Probe Google's well-known DNS server and expect a reply
        !           282:         self.write_command('51 send-probe ip-6 ' + self.google_addr)
        !           283:         reply = self.parse_reply()
        !           284:         self.assertEqual(reply.command_name, 'reply')
        !           285:         self.assertIn('ip-6', reply.argument)
        !           286:         self.assertIn('round-trip-time', reply.argument)
        !           287: 
        !           288:         #  Probe the loopback, and check the address we get a reply from is
        !           289:         #  also the loopback.  While implementing IPv6, I had a bug where
        !           290:         #  the low bits of the received address got zeroed.  This checks for
        !           291:         #  that bug.
        !           292:         self.write_command('52 send-probe ip-6 ::1')
        !           293:         reply = self.parse_reply()
        !           294:         self.assertEqual(reply.command_name, 'reply')
        !           295:         self.assertIn('ip-6', reply.argument)
        !           296:         self.assertIn('round-trip-time', reply.argument)
        !           297:         self.assertEqual(reply.argument['ip-6'], '::1')
        !           298: 
        !           299:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
        !           300:     def test_ttl_expired(self):
        !           301:         'Test sending a probe which will have its time-to-live expire'
        !           302: 
        !           303:         #  Probe Goolge's DNS server, but give the probe only one hop
        !           304:         #  to live.
        !           305:         cmd = '53 send-probe ip-6 ' + self.google_addr + ' ttl 1'
        !           306:         self.write_command(cmd)
        !           307:         reply = self.parse_reply()
        !           308:         self.assertEqual('ttl-expired', reply.command_name)
        !           309:         self.assertIn('ip-6', reply.argument)
        !           310:         self.assertIn('round-trip-time', reply.argument)
        !           311: 
        !           312: 
        !           313: class TestProbeUDP(mtrpacket.MtrPacketTest):
        !           314:     'Test transmitting probes using UDP'
        !           315: 
        !           316:     def udp_port_test(self, address):  # type: (unicode) -> None
        !           317:         'Test UDP probes with variations on source port and dest port'
        !           318: 
        !           319:         if not check_feature(self, 'udp'):
        !           320:             return
        !           321: 
        !           322:         cmd = '80 send-probe protocol udp ' + address
        !           323:         self.write_command(cmd)
        !           324:         reply = self.parse_reply()
        !           325:         self.assertEqual('reply', reply.command_name)
        !           326: 
        !           327:         cmd = '81 send-probe protocol udp port 990 ' + address
        !           328:         self.write_command(cmd)
        !           329:         reply = self.parse_reply()
        !           330:         self.assertEqual('reply', reply.command_name)
        !           331: 
        !           332:         cmd = '82 send-probe protocol udp local-port 1991 ' + address
        !           333:         self.write_command(cmd)
        !           334:         reply = self.parse_reply()
        !           335:         self.assertEqual('reply', reply.command_name)
        !           336: 
        !           337:     def test_udp_v4(self):
        !           338:         'Test IPv4 UDP probes'
        !           339: 
        !           340:         test_basic_probe(self, 4, 'udp')
        !           341: 
        !           342:         self.udp_port_test('ip-4 127.0.0.1')
        !           343: 
        !           344:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
        !           345:     def test_udp_v6(self):
        !           346:         'Test IPv6 UDP probes'
        !           347: 
        !           348:         test_basic_probe(self, 6, 'udp')
        !           349: 
        !           350:         self.udp_port_test('ip-6 ::1')
        !           351: 
        !           352: 
        !           353: class TestProbeTCP(mtrpacket.MtrPacketTest):
        !           354:     'Test TCP probe support'
        !           355: 
        !           356:     def test_tcp_v4(self):
        !           357:         '''Test IPv4 TCP probes, with TTL expiration, to a refused port
        !           358:         and to an open port'''
        !           359: 
        !           360:         test_basic_probe(self, 4, 'tcp')
        !           361: 
        !           362:         if not check_feature(self, 'tcp'):
        !           363:             return
        !           364: 
        !           365:         #  Probe a local port assumed to be open  (ssh)
        !           366:         cmd = '80 send-probe ip-4 127.0.0.1 protocol tcp port 22'
        !           367:         self.write_command(cmd)
        !           368: 
        !           369:         reply = self.parse_reply()
        !           370:         self.assertEqual(reply.command_name, 'reply')
        !           371: 
        !           372:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
        !           373:     def test_tcp_v6(self):
        !           374:         'Test IPv6 TCP probes'
        !           375: 
        !           376:         test_basic_probe(self, 6, 'tcp')
        !           377: 
        !           378:         if not check_feature(self, 'tcp'):
        !           379:             return
        !           380: 
        !           381:         #  Probe a local port assumed to be open  (ssh)
        !           382:         cmd = '80 send-probe ip-6 ::1 protocol tcp port 22'
        !           383:         self.write_command(cmd)
        !           384: 
        !           385:         reply = self.parse_reply()
        !           386:         self.assertEqual(reply.command_name, 'reply')
        !           387: 
        !           388: 
        !           389: class TestProbeSCTP(mtrpacket.MtrPacketTest):
        !           390:     'Test SCTP probes'
        !           391: 
        !           392:     def test_sctp_v4(self):
        !           393:         'Test basic SCTP probes over IPv4'
        !           394: 
        !           395:         test_basic_probe(self, 4, 'sctp')
        !           396: 
        !           397:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
        !           398:     def test_sctp_v6(self):
        !           399:         'Test basic SCTP probes over IPv6'
        !           400: 
        !           401:         test_basic_probe(self, 6, 'sctp')
        !           402: 
        !           403: 
        !           404: if __name__ == '__main__':
        !           405:     mtrpacket.check_running_as_root()
        !           406:     unittest.main()

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>