Annotation of embedaddon/mtr/test/probe.py, revision 1.1.1.3

1.1       misho       1: #!/usr/bin/env python
                      2: #
                      3: #   mtr  --  a network diagnostic tool
                      4: #   Copyright (C) 2016  Matt Kimball
                      5: #
                      6: #   This program is free software; you can redistribute it and/or modify
                      7: #   it under the terms of the GNU General Public License version 2 as
                      8: #   published by the Free Software Foundation.
                      9: #
                     10: #   This program is distributed in the hope that it will be useful,
                     11: #   but WITHOUT ANY WARRANTY; without even the implied warranty of
                     12: #   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
                     13: #   GNU General Public License for more details.
                     14: #
1.1.1.2   misho      15: #   You should have received a copy of the GNU General Public License along
                     16: #   with this program; if not, write to the Free Software Foundation, Inc.,
                     17: #   51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
1.1       misho      18: #
                     19: 
1.1.1.3 ! misho      20: '''Test sending probes and receiving responses.'''
1.1       misho      21: 
                     22: import socket
                     23: import sys
                     24: import time
                     25: import unittest
                     26: 
                     27: import mtrpacket
                     28: 
                     29: 
                     30: def resolve_ipv6_address(hostname):  # type: (str) -> str
                     31:     'Resolve a hostname to an IP version 6 address'
                     32: 
                     33:     for addrinfo in socket.getaddrinfo(hostname, 0):
                     34:         # pylint: disable=locally-disabled, unused-variable
                     35:         (family, socktype, proto, name, sockaddr) = addrinfo
                     36: 
                     37:         if family == socket.AF_INET6:
                     38:             sockaddr6 = sockaddr  # type: tuple
                     39: 
                     40:             (address, port, flow, scope) = sockaddr6
                     41:             return address
                     42: 
                     43:     raise LookupError(hostname)
                     44: 
                     45: 
                     46: def check_feature(test, feature):
                     47:     'Check for support for a particular feature with mtr-packet'
                     48: 
                     49:     check_cmd = '70 check-support feature ' + feature
                     50:     test.write_command(check_cmd)
                     51: 
                     52:     reply = test.parse_reply()
                     53:     test.assertEqual(reply.command_name, 'feature-support')
                     54:     test.assertIn('support', reply.argument)
                     55: 
                     56:     if reply.argument['support'] != 'ok':
                     57:         return False
                     58: 
                     59:     return True
                     60: 
                     61: 
                     62: def test_basic_remote_probe(test, ip_version, protocol):
                     63:     'Test a probe to a remote host with a TTL of 1'
                     64: 
                     65:     protocol_str = 'protocol ' + protocol
                     66:     if ip_version == 6:
                     67:         address_str = 'ip-6 ' + resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
                     68:     elif ip_version == 4:
                     69:         address_str = 'ip-4 8.8.8.8'
                     70:     else:
                     71:         raise ValueError(ip_version)
                     72: 
                     73:     cmd = '60 send-probe ' + \
                     74:         protocol_str + ' ' + address_str + ' port 164 ttl 1'
                     75:     test.write_command(cmd)
                     76: 
                     77:     reply = test.parse_reply()
                     78:     test.assertEqual(reply.command_name, 'ttl-expired')
                     79: 
                     80: 
                     81: def test_basic_local_probe(test, ip_version, protocol):
                     82:     'Test a probe to a closed port on localhost'
                     83: 
                     84:     protocol_str = 'protocol ' + protocol
                     85:     if ip_version == 6:
                     86:         address_str = 'ip-6 ::1'
                     87:     elif ip_version == 4:
                     88:         address_str = 'ip-4 127.0.0.1'
                     89: 
                     90:     cmd = '61 send-probe ' + \
                     91:         protocol_str + ' ' + address_str + ' port 164'
                     92:     test.write_command(cmd)
                     93: 
                     94:     reply = test.parse_reply()
                     95:     test.assertEqual(reply.command_name, 'reply')
                     96: 
                     97:     if ip_version == 6:
                     98:         test.assertIn('ip-6', reply.argument)
                     99:         test.assertEqual(reply.argument['ip-6'], '::1')
                    100:     elif ip_version == 4:
                    101:         test.assertIn('ip-4', reply.argument)
                    102:         test.assertEqual(reply.argument['ip-4'], '127.0.0.1')
                    103: 
                    104: 
                    105: def test_basic_probe(test, ip_version, protocol):
                    106:     # type: (mtrpacket.MtrPacketTest, int, unicode) -> None
                    107: 
                    108:     '''Test a probe with TTL expiration and a probe which reaches its
                    109:     destination with a particular protocol.'''
                    110: 
                    111:     if not check_feature(test, protocol):
                    112:         err_str = 'Skipping ' + protocol + ' test due to no support\n'
                    113:         sys.stderr.write(err_str.encode('utf-8'))
                    114:         return
                    115: 
                    116:     test_basic_remote_probe(test, ip_version, protocol)
                    117:     test_basic_local_probe(test, ip_version, protocol)
                    118: 
                    119: 
                    120: class TestProbeICMPv4(mtrpacket.MtrPacketTest):
                    121:     '''Test sending probes using IP version 4'''
                    122: 
                    123:     def test_probe(self):
                    124:         'Test sending regular ICMP probes to known addresses'
                    125: 
                    126:         #  Probe Google's well-known DNS server and expect a reply
                    127:         self.write_command('14 send-probe ip-4 8.8.8.8')
                    128:         reply = self.parse_reply()
                    129:         self.assertEqual(reply.token, 14)
                    130:         self.assertEqual(reply.command_name, 'reply')
                    131:         self.assertIn('ip-4', reply.argument)
                    132:         self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
                    133:         self.assertIn('round-trip-time', reply.argument)
                    134: 
                    135:     def test_timeout(self):
                    136:         'Test timeouts when sending to a non-existant address'
                    137: 
                    138:         #
1.1.1.3 ! misho     139:         #  Probe a non-existent address, and expect no reply
1.1       misho     140:         #
                    141:         #  I'm not sure what the best way to find an address that doesn't
                    142:         #  exist, but is still route-able.  If we use a reserved IP
                    143:         #  address range, Windows will tell us it is non-routeable,
                    144:         #  rather than timing out when transmitting to that address.
                    145:         #
                    146:         #  We're just using a currently unused address in Google's
                    147:         #  range instead.  This is probably not the best solution.
                    148:         #
                    149: 
                    150:         # pylint: disable=locally-disabled, unused-variable
                    151:         for i in range(16):
                    152:             self.write_command('15 send-probe ip-4 8.8.254.254 timeout 1')
                    153:             reply = self.parse_reply()
                    154:             self.assertEqual(reply.token, 15)
                    155:             self.assertEqual(reply.command_name, 'no-reply')
                    156: 
                    157:     def test_exhaust_probes(self):
                    158:         'Test exhausting all available probes'
                    159: 
                    160:         probe_count = 4 * 1024
                    161:         token = 1024
                    162: 
                    163:         # pylint: disable=locally-disabled, unused-variable
                    164:         for i in range(probe_count):
                    165:             command = str(token) + ' send-probe ip-4 8.8.254.254 timeout 60'
                    166:             token += 1
                    167:             self.write_command(command)
                    168: 
                    169:             reply = None
                    170:             try:
                    171:                 reply = self.parse_reply(0)
                    172:             except mtrpacket.ReadReplyTimeout:
                    173:                 pass
                    174: 
                    175:             if reply:
                    176:                 if reply.command_name == 'probes-exhausted':
                    177:                     break
                    178: 
                    179:         self.assertIsNotNone(reply)
                    180:         self.assertEqual(reply.command_name, 'probes-exhausted')
                    181: 
                    182:     def test_timeout_values(self):
                    183:         '''Test that timeout values wait the right amount of time
                    184: 
                    185:         Give each probe a half-second grace period to probe a timeout
                    186:         reply after the expected timeout time.'''
                    187: 
                    188:         begin = time.time()
                    189:         self.write_command('19 send-probe ip-4 8.8.254.254 timeout 0')
                    190:         self.parse_reply()
                    191:         elapsed = time.time() - begin
                    192:         self.assertLess(elapsed, 0.5)
                    193: 
                    194:         begin = time.time()
                    195:         self.write_command('20 send-probe ip-4 8.8.254.254 timeout 1')
                    196:         self.parse_reply()
                    197:         elapsed = time.time() - begin
                    198:         self.assertGreaterEqual(elapsed, 0.9)
                    199:         self.assertLess(elapsed, 1.5)
                    200: 
                    201:         begin = time.time()
                    202:         self.write_command('21 send-probe ip-4 8.8.254.254 timeout 3')
                    203:         self.parse_reply()
                    204:         elapsed = time.time() - begin
                    205:         self.assertGreaterEqual(elapsed, 2.9)
                    206:         self.assertLess(elapsed, 3.5)
                    207: 
                    208:     def test_ttl_expired(self):
                    209:         'Test sending a probe which will have its time-to-live expire'
                    210: 
1.1.1.2   misho     211:         #  Probe Google's DNS server, but give the probe only one hop
1.1       misho     212:         #  to live.
                    213:         self.write_command('16 send-probe ip-4 8.8.8.8 ttl 1')
                    214:         reply = self.parse_reply()
                    215:         self.assertEqual(reply.command_name, 'ttl-expired')
                    216:         self.assertIn('ip-4', reply.argument)
                    217:         self.assertIn('round-trip-time', reply.argument)
                    218: 
                    219:     def test_parallel_probes(self):
                    220:         '''Test sending multiple probes in parallel
                    221: 
                    222:         We will expect the probes to complete out-of-order by sending
1.1.1.2   misho     223:         a probe to a distant host immediately followed by a probe to
1.1       misho     224:         the local host.'''
                    225: 
                    226:         success_count = 0
                    227:         loop_count = 32
                    228: 
                    229:         # pylint: disable=locally-disabled, unused-variable
                    230:         for i in range(loop_count):
                    231:             #  Probe the distant host before the local host.
                    232:             self.write_command('17 send-probe ip-4 8.8.8.8 timeout 1')
                    233:             self.write_command('18 send-probe ip-4 127.0.0.1 timeout 1')
                    234: 
                    235:             reply = self.parse_reply()
                    236:             if reply.command_name == 'no-reply':
                    237:                 continue
                    238: 
                    239:             self.assertEqual(reply.command_name, 'reply')
                    240:             self.assertIn('ip-4', reply.argument)
                    241:             self.assertEqual(reply.argument['ip-4'], '127.0.0.1')
                    242:             self.assertIn('round-trip-time', reply.argument)
                    243:             first_time = int(reply.argument['round-trip-time'])
                    244: 
                    245:             reply = self.parse_reply()
                    246:             if reply.command_name == 'no-reply':
                    247:                 continue
                    248: 
                    249:             self.assertEqual(reply.command_name, 'reply')
                    250:             self.assertIn('ip-4', reply.argument)
                    251:             self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
                    252:             self.assertIn('round-trip-time', reply.argument)
                    253:             second_time = int(reply.argument['round-trip-time'])
                    254: 
                    255:             #  Ensure we got a reply from the host with the lowest latency
                    256:             #  first.
                    257:             self.assertLess(first_time, second_time)
                    258: 
                    259:             success_count += 1
                    260: 
                    261:         #  We need 90% success to pass.  This allows a few probes to be
                    262:         #  occasionally dropped by the network without failing the test.
                    263:         required_success = int(loop_count * 0.90)
                    264:         self.assertGreaterEqual(success_count, required_success)
                    265: 
                    266: 
                    267: class TestProbeICMPv6(mtrpacket.MtrPacketTest):
                    268:     '''Test sending probes using IP version 6'''
                    269: 
                    270:     def __init__(self, *args):
1.1.1.3 ! misho     271:         if mtrpacket.HAVE_IPV6:
        !           272:             google_addr = resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
1.1       misho     273: 
1.1.1.3 ! misho     274:             self.google_addr = google_addr  # type: str
1.1       misho     275: 
                    276:         super(TestProbeICMPv6, self).__init__(*args)
                    277: 
                    278:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
                    279:     def test_probe(self):
                    280:         "Test a probe to Google's public DNS server"
                    281: 
                    282:         #  Probe Google's well-known DNS server and expect a reply
                    283:         self.write_command('51 send-probe ip-6 ' + self.google_addr)
                    284:         reply = self.parse_reply()
                    285:         self.assertEqual(reply.command_name, 'reply')
                    286:         self.assertIn('ip-6', reply.argument)
                    287:         self.assertIn('round-trip-time', reply.argument)
                    288: 
                    289:         #  Probe the loopback, and check the address we get a reply from is
                    290:         #  also the loopback.  While implementing IPv6, I had a bug where
                    291:         #  the low bits of the received address got zeroed.  This checks for
                    292:         #  that bug.
                    293:         self.write_command('52 send-probe ip-6 ::1')
                    294:         reply = self.parse_reply()
                    295:         self.assertEqual(reply.command_name, 'reply')
                    296:         self.assertIn('ip-6', reply.argument)
                    297:         self.assertIn('round-trip-time', reply.argument)
                    298:         self.assertEqual(reply.argument['ip-6'], '::1')
                    299: 
                    300:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
                    301:     def test_ttl_expired(self):
                    302:         'Test sending a probe which will have its time-to-live expire'
                    303: 
1.1.1.2   misho     304:         #  Probe Google's DNS server, but give the probe only one hop
1.1       misho     305:         #  to live.
                    306:         cmd = '53 send-probe ip-6 ' + self.google_addr + ' ttl 1'
                    307:         self.write_command(cmd)
                    308:         reply = self.parse_reply()
                    309:         self.assertEqual('ttl-expired', reply.command_name)
                    310:         self.assertIn('ip-6', reply.argument)
                    311:         self.assertIn('round-trip-time', reply.argument)
                    312: 
                    313: 
                    314: class TestProbeUDP(mtrpacket.MtrPacketTest):
                    315:     'Test transmitting probes using UDP'
                    316: 
                    317:     def udp_port_test(self, address):  # type: (unicode) -> None
                    318:         'Test UDP probes with variations on source port and dest port'
                    319: 
                    320:         if not check_feature(self, 'udp'):
                    321:             return
                    322: 
                    323:         cmd = '80 send-probe protocol udp ' + address
                    324:         self.write_command(cmd)
                    325:         reply = self.parse_reply()
                    326:         self.assertEqual('reply', reply.command_name)
                    327: 
                    328:         cmd = '81 send-probe protocol udp port 990 ' + address
                    329:         self.write_command(cmd)
                    330:         reply = self.parse_reply()
                    331:         self.assertEqual('reply', reply.command_name)
                    332: 
                    333:         cmd = '82 send-probe protocol udp local-port 1991 ' + address
                    334:         self.write_command(cmd)
                    335:         reply = self.parse_reply()
                    336:         self.assertEqual('reply', reply.command_name)
                    337: 
                    338:     def test_udp_v4(self):
                    339:         'Test IPv4 UDP probes'
                    340: 
                    341:         test_basic_probe(self, 4, 'udp')
                    342: 
                    343:         self.udp_port_test('ip-4 127.0.0.1')
                    344: 
                    345:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
                    346:     def test_udp_v6(self):
                    347:         'Test IPv6 UDP probes'
                    348: 
                    349:         test_basic_probe(self, 6, 'udp')
                    350: 
                    351:         self.udp_port_test('ip-6 ::1')
                    352: 
                    353: 
                    354: class TestProbeTCP(mtrpacket.MtrPacketTest):
                    355:     'Test TCP probe support'
                    356: 
                    357:     def test_tcp_v4(self):
                    358:         '''Test IPv4 TCP probes, with TTL expiration, to a refused port
                    359:         and to an open port'''
                    360: 
                    361:         test_basic_probe(self, 4, 'tcp')
                    362: 
                    363:         if not check_feature(self, 'tcp'):
                    364:             return
                    365: 
                    366:         #  Probe a local port assumed to be open  (ssh)
                    367:         cmd = '80 send-probe ip-4 127.0.0.1 protocol tcp port 22'
                    368:         self.write_command(cmd)
                    369: 
                    370:         reply = self.parse_reply()
                    371:         self.assertEqual(reply.command_name, 'reply')
                    372: 
                    373:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
                    374:     def test_tcp_v6(self):
                    375:         'Test IPv6 TCP probes'
                    376: 
                    377:         test_basic_probe(self, 6, 'tcp')
                    378: 
                    379:         if not check_feature(self, 'tcp'):
                    380:             return
                    381: 
                    382:         #  Probe a local port assumed to be open  (ssh)
                    383:         cmd = '80 send-probe ip-6 ::1 protocol tcp port 22'
                    384:         self.write_command(cmd)
                    385: 
                    386:         reply = self.parse_reply()
                    387:         self.assertEqual(reply.command_name, 'reply')
                    388: 
                    389: 
                    390: class TestProbeSCTP(mtrpacket.MtrPacketTest):
                    391:     'Test SCTP probes'
                    392: 
                    393:     def test_sctp_v4(self):
                    394:         'Test basic SCTP probes over IPv4'
                    395: 
                    396:         test_basic_probe(self, 4, 'sctp')
                    397: 
                    398:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
                    399:     def test_sctp_v6(self):
                    400:         'Test basic SCTP probes over IPv6'
                    401: 
                    402:         test_basic_probe(self, 6, 'sctp')
                    403: 
                    404: 
                    405: if __name__ == '__main__':
                    406:     mtrpacket.check_running_as_root()
                    407:     unittest.main()

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>