Annotation of embedaddon/mtr/test/probe.py, revision 1.1.1.2

1.1       misho       1: #!/usr/bin/env python
                      2: #
                      3: #   mtr  --  a network diagnostic tool
                      4: #   Copyright (C) 2016  Matt Kimball
                      5: #
                      6: #   This program is free software; you can redistribute it and/or modify
                      7: #   it under the terms of the GNU General Public License version 2 as
                      8: #   published by the Free Software Foundation.
                      9: #
                     10: #   This program is distributed in the hope that it will be useful,
                     11: #   but WITHOUT ANY WARRANTY; without even the implied warranty of
                     12: #   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
                     13: #   GNU General Public License for more details.
                     14: #
1.1.1.2 ! misho      15: #   You should have received a copy of the GNU General Public License along
        !            16: #   with this program; if not, write to the Free Software Foundation, Inc.,
        !            17: #   51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
1.1       misho      18: #
                     19: 
                     20: '''Test sending probes and receiving responses.'''
                     21: 
                     22: import socket
                     23: import sys
                     24: import time
                     25: import unittest
                     26: 
                     27: import mtrpacket
                     28: 
                     29: 
                     30: def resolve_ipv6_address(hostname):  # type: (str) -> str
                     31:     'Resolve a hostname to an IP version 6 address'
                     32: 
                     33:     for addrinfo in socket.getaddrinfo(hostname, 0):
                     34:         # pylint: disable=locally-disabled, unused-variable
                     35:         (family, socktype, proto, name, sockaddr) = addrinfo
                     36: 
                     37:         if family == socket.AF_INET6:
                     38:             sockaddr6 = sockaddr  # type: tuple
                     39: 
                     40:             (address, port, flow, scope) = sockaddr6
                     41:             return address
                     42: 
                     43:     raise LookupError(hostname)
                     44: 
                     45: 
                     46: def check_feature(test, feature):
                     47:     'Check for support for a particular feature with mtr-packet'
                     48: 
                     49:     check_cmd = '70 check-support feature ' + feature
                     50:     test.write_command(check_cmd)
                     51: 
                     52:     reply = test.parse_reply()
                     53:     test.assertEqual(reply.command_name, 'feature-support')
                     54:     test.assertIn('support', reply.argument)
                     55: 
                     56:     if reply.argument['support'] != 'ok':
                     57:         return False
                     58: 
                     59:     return True
                     60: 
                     61: 
                     62: def test_basic_remote_probe(test, ip_version, protocol):
                     63:     'Test a probe to a remote host with a TTL of 1'
                     64: 
                     65:     protocol_str = 'protocol ' + protocol
                     66:     if ip_version == 6:
                     67:         address_str = 'ip-6 ' + resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
                     68:     elif ip_version == 4:
                     69:         address_str = 'ip-4 8.8.8.8'
                     70:     else:
                     71:         raise ValueError(ip_version)
                     72: 
                     73:     cmd = '60 send-probe ' + \
                     74:         protocol_str + ' ' + address_str + ' port 164 ttl 1'
                     75:     test.write_command(cmd)
                     76: 
                     77:     reply = test.parse_reply()
                     78:     test.assertEqual(reply.command_name, 'ttl-expired')
                     79: 
                     80: 
                     81: def test_basic_local_probe(test, ip_version, protocol):
                     82:     'Test a probe to a closed port on localhost'
                     83: 
                     84:     protocol_str = 'protocol ' + protocol
                     85:     if ip_version == 6:
                     86:         address_str = 'ip-6 ::1'
                     87:     elif ip_version == 4:
                     88:         address_str = 'ip-4 127.0.0.1'
                     89: 
                     90:     cmd = '61 send-probe ' + \
                     91:         protocol_str + ' ' + address_str + ' port 164'
                     92:     test.write_command(cmd)
                     93: 
                     94:     reply = test.parse_reply()
                     95:     test.assertEqual(reply.command_name, 'reply')
                     96: 
                     97:     if ip_version == 6:
                     98:         test.assertIn('ip-6', reply.argument)
                     99:         test.assertEqual(reply.argument['ip-6'], '::1')
                    100:     elif ip_version == 4:
                    101:         test.assertIn('ip-4', reply.argument)
                    102:         test.assertEqual(reply.argument['ip-4'], '127.0.0.1')
                    103: 
                    104: 
                    105: def test_basic_probe(test, ip_version, protocol):
                    106:     # type: (mtrpacket.MtrPacketTest, int, unicode) -> None
                    107: 
                    108:     '''Test a probe with TTL expiration and a probe which reaches its
                    109:     destination with a particular protocol.'''
                    110: 
                    111:     if not check_feature(test, protocol):
                    112:         err_str = 'Skipping ' + protocol + ' test due to no support\n'
                    113:         sys.stderr.write(err_str.encode('utf-8'))
                    114:         return
                    115: 
                    116:     test_basic_remote_probe(test, ip_version, protocol)
                    117:     test_basic_local_probe(test, ip_version, protocol)
                    118: 
                    119: 
                    120: class TestProbeICMPv4(mtrpacket.MtrPacketTest):
                    121:     '''Test sending probes using IP version 4'''
                    122: 
                    123:     def test_probe(self):
                    124:         'Test sending regular ICMP probes to known addresses'
                    125: 
                    126:         #  Probe Google's well-known DNS server and expect a reply
                    127:         self.write_command('14 send-probe ip-4 8.8.8.8')
                    128:         reply = self.parse_reply()
                    129:         self.assertEqual(reply.token, 14)
                    130:         self.assertEqual(reply.command_name, 'reply')
                    131:         self.assertIn('ip-4', reply.argument)
                    132:         self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
                    133:         self.assertIn('round-trip-time', reply.argument)
                    134: 
                    135:     def test_timeout(self):
                    136:         'Test timeouts when sending to a non-existant address'
                    137: 
                    138:         #
                    139:         #  Probe a non-existant address, and expect no reply
                    140:         #
                    141:         #  I'm not sure what the best way to find an address that doesn't
                    142:         #  exist, but is still route-able.  If we use a reserved IP
                    143:         #  address range, Windows will tell us it is non-routeable,
                    144:         #  rather than timing out when transmitting to that address.
                    145:         #
                    146:         #  We're just using a currently unused address in Google's
                    147:         #  range instead.  This is probably not the best solution.
                    148:         #
                    149: 
                    150:         # pylint: disable=locally-disabled, unused-variable
                    151:         for i in range(16):
                    152:             self.write_command('15 send-probe ip-4 8.8.254.254 timeout 1')
                    153:             reply = self.parse_reply()
                    154:             self.assertEqual(reply.token, 15)
                    155:             self.assertEqual(reply.command_name, 'no-reply')
                    156: 
                    157:     def test_exhaust_probes(self):
                    158:         'Test exhausting all available probes'
                    159: 
                    160:         probe_count = 4 * 1024
                    161:         token = 1024
                    162: 
                    163:         # pylint: disable=locally-disabled, unused-variable
                    164:         for i in range(probe_count):
                    165:             command = str(token) + ' send-probe ip-4 8.8.254.254 timeout 60'
                    166:             token += 1
                    167:             self.write_command(command)
                    168: 
                    169:             reply = None
                    170:             try:
                    171:                 reply = self.parse_reply(0)
                    172:             except mtrpacket.ReadReplyTimeout:
                    173:                 pass
                    174: 
                    175:             if reply:
                    176:                 if reply.command_name == 'probes-exhausted':
                    177:                     break
                    178: 
                    179:         self.assertIsNotNone(reply)
                    180:         self.assertEqual(reply.command_name, 'probes-exhausted')
                    181: 
                    182:     def test_timeout_values(self):
                    183:         '''Test that timeout values wait the right amount of time
                    184: 
                    185:         Give each probe a half-second grace period to probe a timeout
                    186:         reply after the expected timeout time.'''
                    187: 
                    188:         begin = time.time()
                    189:         self.write_command('19 send-probe ip-4 8.8.254.254 timeout 0')
                    190:         self.parse_reply()
                    191:         elapsed = time.time() - begin
                    192:         self.assertLess(elapsed, 0.5)
                    193: 
                    194:         begin = time.time()
                    195:         self.write_command('20 send-probe ip-4 8.8.254.254 timeout 1')
                    196:         self.parse_reply()
                    197:         elapsed = time.time() - begin
                    198:         self.assertGreaterEqual(elapsed, 0.9)
                    199:         self.assertLess(elapsed, 1.5)
                    200: 
                    201:         begin = time.time()
                    202:         self.write_command('21 send-probe ip-4 8.8.254.254 timeout 3')
                    203:         self.parse_reply()
                    204:         elapsed = time.time() - begin
                    205:         self.assertGreaterEqual(elapsed, 2.9)
                    206:         self.assertLess(elapsed, 3.5)
                    207: 
                    208:     def test_ttl_expired(self):
                    209:         'Test sending a probe which will have its time-to-live expire'
                    210: 
1.1.1.2 ! misho     211:         #  Probe Google's DNS server, but give the probe only one hop
1.1       misho     212:         #  to live.
                    213:         self.write_command('16 send-probe ip-4 8.8.8.8 ttl 1')
                    214:         reply = self.parse_reply()
                    215:         self.assertEqual(reply.command_name, 'ttl-expired')
                    216:         self.assertIn('ip-4', reply.argument)
                    217:         self.assertIn('round-trip-time', reply.argument)
                    218: 
                    219:     def test_parallel_probes(self):
                    220:         '''Test sending multiple probes in parallel
                    221: 
                    222:         We will expect the probes to complete out-of-order by sending
1.1.1.2 ! misho     223:         a probe to a distant host immediately followed by a probe to
1.1       misho     224:         the local host.'''
                    225: 
                    226:         success_count = 0
                    227:         loop_count = 32
                    228: 
                    229:         # pylint: disable=locally-disabled, unused-variable
                    230:         for i in range(loop_count):
                    231:             #  Probe the distant host before the local host.
                    232:             self.write_command('17 send-probe ip-4 8.8.8.8 timeout 1')
                    233:             self.write_command('18 send-probe ip-4 127.0.0.1 timeout 1')
                    234: 
                    235:             reply = self.parse_reply()
                    236:             if reply.command_name == 'no-reply':
                    237:                 continue
                    238: 
                    239:             self.assertEqual(reply.command_name, 'reply')
                    240:             self.assertIn('ip-4', reply.argument)
                    241:             self.assertEqual(reply.argument['ip-4'], '127.0.0.1')
                    242:             self.assertIn('round-trip-time', reply.argument)
                    243:             first_time = int(reply.argument['round-trip-time'])
                    244: 
                    245:             reply = self.parse_reply()
                    246:             if reply.command_name == 'no-reply':
                    247:                 continue
                    248: 
                    249:             self.assertEqual(reply.command_name, 'reply')
                    250:             self.assertIn('ip-4', reply.argument)
                    251:             self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
                    252:             self.assertIn('round-trip-time', reply.argument)
                    253:             second_time = int(reply.argument['round-trip-time'])
                    254: 
                    255:             #  Ensure we got a reply from the host with the lowest latency
                    256:             #  first.
                    257:             self.assertLess(first_time, second_time)
                    258: 
                    259:             success_count += 1
                    260: 
                    261:         #  We need 90% success to pass.  This allows a few probes to be
                    262:         #  occasionally dropped by the network without failing the test.
                    263:         required_success = int(loop_count * 0.90)
                    264:         self.assertGreaterEqual(success_count, required_success)
                    265: 
                    266: 
                    267: class TestProbeICMPv6(mtrpacket.MtrPacketTest):
                    268:     '''Test sending probes using IP version 6'''
                    269: 
                    270:     def __init__(self, *args):
                    271:         google_addr = resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
                    272: 
                    273:         self.google_addr = google_addr  # type: str
                    274: 
                    275:         super(TestProbeICMPv6, self).__init__(*args)
                    276: 
                    277:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
                    278:     def test_probe(self):
                    279:         "Test a probe to Google's public DNS server"
                    280: 
                    281:         #  Probe Google's well-known DNS server and expect a reply
                    282:         self.write_command('51 send-probe ip-6 ' + self.google_addr)
                    283:         reply = self.parse_reply()
                    284:         self.assertEqual(reply.command_name, 'reply')
                    285:         self.assertIn('ip-6', reply.argument)
                    286:         self.assertIn('round-trip-time', reply.argument)
                    287: 
                    288:         #  Probe the loopback, and check the address we get a reply from is
                    289:         #  also the loopback.  While implementing IPv6, I had a bug where
                    290:         #  the low bits of the received address got zeroed.  This checks for
                    291:         #  that bug.
                    292:         self.write_command('52 send-probe ip-6 ::1')
                    293:         reply = self.parse_reply()
                    294:         self.assertEqual(reply.command_name, 'reply')
                    295:         self.assertIn('ip-6', reply.argument)
                    296:         self.assertIn('round-trip-time', reply.argument)
                    297:         self.assertEqual(reply.argument['ip-6'], '::1')
                    298: 
                    299:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
                    300:     def test_ttl_expired(self):
                    301:         'Test sending a probe which will have its time-to-live expire'
                    302: 
1.1.1.2 ! misho     303:         #  Probe Google's DNS server, but give the probe only one hop
1.1       misho     304:         #  to live.
                    305:         cmd = '53 send-probe ip-6 ' + self.google_addr + ' ttl 1'
                    306:         self.write_command(cmd)
                    307:         reply = self.parse_reply()
                    308:         self.assertEqual('ttl-expired', reply.command_name)
                    309:         self.assertIn('ip-6', reply.argument)
                    310:         self.assertIn('round-trip-time', reply.argument)
                    311: 
                    312: 
                    313: class TestProbeUDP(mtrpacket.MtrPacketTest):
                    314:     'Test transmitting probes using UDP'
                    315: 
                    316:     def udp_port_test(self, address):  # type: (unicode) -> None
                    317:         'Test UDP probes with variations on source port and dest port'
                    318: 
                    319:         if not check_feature(self, 'udp'):
                    320:             return
                    321: 
                    322:         cmd = '80 send-probe protocol udp ' + address
                    323:         self.write_command(cmd)
                    324:         reply = self.parse_reply()
                    325:         self.assertEqual('reply', reply.command_name)
                    326: 
                    327:         cmd = '81 send-probe protocol udp port 990 ' + address
                    328:         self.write_command(cmd)
                    329:         reply = self.parse_reply()
                    330:         self.assertEqual('reply', reply.command_name)
                    331: 
                    332:         cmd = '82 send-probe protocol udp local-port 1991 ' + address
                    333:         self.write_command(cmd)
                    334:         reply = self.parse_reply()
                    335:         self.assertEqual('reply', reply.command_name)
                    336: 
                    337:     def test_udp_v4(self):
                    338:         'Test IPv4 UDP probes'
                    339: 
                    340:         test_basic_probe(self, 4, 'udp')
                    341: 
                    342:         self.udp_port_test('ip-4 127.0.0.1')
                    343: 
                    344:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
                    345:     def test_udp_v6(self):
                    346:         'Test IPv6 UDP probes'
                    347: 
                    348:         test_basic_probe(self, 6, 'udp')
                    349: 
                    350:         self.udp_port_test('ip-6 ::1')
                    351: 
                    352: 
                    353: class TestProbeTCP(mtrpacket.MtrPacketTest):
                    354:     'Test TCP probe support'
                    355: 
                    356:     def test_tcp_v4(self):
                    357:         '''Test IPv4 TCP probes, with TTL expiration, to a refused port
                    358:         and to an open port'''
                    359: 
                    360:         test_basic_probe(self, 4, 'tcp')
                    361: 
                    362:         if not check_feature(self, 'tcp'):
                    363:             return
                    364: 
                    365:         #  Probe a local port assumed to be open  (ssh)
                    366:         cmd = '80 send-probe ip-4 127.0.0.1 protocol tcp port 22'
                    367:         self.write_command(cmd)
                    368: 
                    369:         reply = self.parse_reply()
                    370:         self.assertEqual(reply.command_name, 'reply')
                    371: 
                    372:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
                    373:     def test_tcp_v6(self):
                    374:         'Test IPv6 TCP probes'
                    375: 
                    376:         test_basic_probe(self, 6, 'tcp')
                    377: 
                    378:         if not check_feature(self, 'tcp'):
                    379:             return
                    380: 
                    381:         #  Probe a local port assumed to be open  (ssh)
                    382:         cmd = '80 send-probe ip-6 ::1 protocol tcp port 22'
                    383:         self.write_command(cmd)
                    384: 
                    385:         reply = self.parse_reply()
                    386:         self.assertEqual(reply.command_name, 'reply')
                    387: 
                    388: 
                    389: class TestProbeSCTP(mtrpacket.MtrPacketTest):
                    390:     'Test SCTP probes'
                    391: 
                    392:     def test_sctp_v4(self):
                    393:         'Test basic SCTP probes over IPv4'
                    394: 
                    395:         test_basic_probe(self, 4, 'sctp')
                    396: 
                    397:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
                    398:     def test_sctp_v6(self):
                    399:         'Test basic SCTP probes over IPv6'
                    400: 
                    401:         test_basic_probe(self, 6, 'sctp')
                    402: 
                    403: 
#  Run the whole suite when this file is executed as a script.  The root
#  check runs before any test is started.
#  NOTE(review): presumably mtr-packet needs root privileges to send its
#  probes -- confirm against mtrpacket.check_running_as_root.
if __name__ == '__main__':
    mtrpacket.check_running_as_root()
    unittest.main()

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>