File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / mtr / test / probe.py
Revision 1.1.1.3 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Wed Sep 27 11:18:58 2023 UTC (18 months, 1 week ago) by misho
Branches: mtr, MAIN
CVS tags: v0_95, HEAD
Version 0.95

    1: #!/usr/bin/env python
    2: #
    3: #   mtr  --  a network diagnostic tool
    4: #   Copyright (C) 2016  Matt Kimball
    5: #
    6: #   This program is free software; you can redistribute it and/or modify
    7: #   it under the terms of the GNU General Public License version 2 as
    8: #   published by the Free Software Foundation.
    9: #
   10: #   This program is distributed in the hope that it will be useful,
   11: #   but WITHOUT ANY WARRANTY; without even the implied warranty of
   12: #   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   13: #   GNU General Public License for more details.
   14: #
   15: #   You should have received a copy of the GNU General Public License along
   16: #   with this program; if not, write to the Free Software Foundation, Inc.,
   17: #   51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
   18: #
   19: 
   20: '''Test sending probes and receiving responses.'''
   21: 
   22: import socket
   23: import sys
   24: import time
   25: import unittest
   26: 
   27: import mtrpacket
   28: 
   29: 
   30: def resolve_ipv6_address(hostname):  # type: (str) -> str
   31:     'Resolve a hostname to an IP version 6 address'
   32: 
   33:     for addrinfo in socket.getaddrinfo(hostname, 0):
   34:         # pylint: disable=locally-disabled, unused-variable
   35:         (family, socktype, proto, name, sockaddr) = addrinfo
   36: 
   37:         if family == socket.AF_INET6:
   38:             sockaddr6 = sockaddr  # type: tuple
   39: 
   40:             (address, port, flow, scope) = sockaddr6
   41:             return address
   42: 
   43:     raise LookupError(hostname)
   44: 
   45: 
   46: def check_feature(test, feature):
   47:     'Check for support for a particular feature with mtr-packet'
   48: 
   49:     check_cmd = '70 check-support feature ' + feature
   50:     test.write_command(check_cmd)
   51: 
   52:     reply = test.parse_reply()
   53:     test.assertEqual(reply.command_name, 'feature-support')
   54:     test.assertIn('support', reply.argument)
   55: 
   56:     if reply.argument['support'] != 'ok':
   57:         return False
   58: 
   59:     return True
   60: 
   61: 
   62: def test_basic_remote_probe(test, ip_version, protocol):
   63:     'Test a probe to a remote host with a TTL of 1'
   64: 
   65:     protocol_str = 'protocol ' + protocol
   66:     if ip_version == 6:
   67:         address_str = 'ip-6 ' + resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
   68:     elif ip_version == 4:
   69:         address_str = 'ip-4 8.8.8.8'
   70:     else:
   71:         raise ValueError(ip_version)
   72: 
   73:     cmd = '60 send-probe ' + \
   74:         protocol_str + ' ' + address_str + ' port 164 ttl 1'
   75:     test.write_command(cmd)
   76: 
   77:     reply = test.parse_reply()
   78:     test.assertEqual(reply.command_name, 'ttl-expired')
   79: 
   80: 
   81: def test_basic_local_probe(test, ip_version, protocol):
   82:     'Test a probe to a closed port on localhost'
   83: 
   84:     protocol_str = 'protocol ' + protocol
   85:     if ip_version == 6:
   86:         address_str = 'ip-6 ::1'
   87:     elif ip_version == 4:
   88:         address_str = 'ip-4 127.0.0.1'
   89: 
   90:     cmd = '61 send-probe ' + \
   91:         protocol_str + ' ' + address_str + ' port 164'
   92:     test.write_command(cmd)
   93: 
   94:     reply = test.parse_reply()
   95:     test.assertEqual(reply.command_name, 'reply')
   96: 
   97:     if ip_version == 6:
   98:         test.assertIn('ip-6', reply.argument)
   99:         test.assertEqual(reply.argument['ip-6'], '::1')
  100:     elif ip_version == 4:
  101:         test.assertIn('ip-4', reply.argument)
  102:         test.assertEqual(reply.argument['ip-4'], '127.0.0.1')
  103: 
  104: 
  105: def test_basic_probe(test, ip_version, protocol):
  106:     # type: (mtrpacket.MtrPacketTest, int, unicode) -> None
  107: 
  108:     '''Test a probe with TTL expiration and a probe which reaches its
  109:     destination with a particular protocol.'''
  110: 
  111:     if not check_feature(test, protocol):
  112:         err_str = 'Skipping ' + protocol + ' test due to no support\n'
  113:         sys.stderr.write(err_str.encode('utf-8'))
  114:         return
  115: 
  116:     test_basic_remote_probe(test, ip_version, protocol)
  117:     test_basic_local_probe(test, ip_version, protocol)
  118: 
  119: 
  120: class TestProbeICMPv4(mtrpacket.MtrPacketTest):
  121:     '''Test sending probes using IP version 4'''
  122: 
  123:     def test_probe(self):
  124:         'Test sending regular ICMP probes to known addresses'
  125: 
  126:         #  Probe Google's well-known DNS server and expect a reply
  127:         self.write_command('14 send-probe ip-4 8.8.8.8')
  128:         reply = self.parse_reply()
  129:         self.assertEqual(reply.token, 14)
  130:         self.assertEqual(reply.command_name, 'reply')
  131:         self.assertIn('ip-4', reply.argument)
  132:         self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
  133:         self.assertIn('round-trip-time', reply.argument)
  134: 
  135:     def test_timeout(self):
  136:         'Test timeouts when sending to a non-existant address'
  137: 
  138:         #
  139:         #  Probe a non-existent address, and expect no reply
  140:         #
  141:         #  I'm not sure what the best way to find an address that doesn't
  142:         #  exist, but is still route-able.  If we use a reserved IP
  143:         #  address range, Windows will tell us it is non-routeable,
  144:         #  rather than timing out when transmitting to that address.
  145:         #
  146:         #  We're just using a currently unused address in Google's
  147:         #  range instead.  This is probably not the best solution.
  148:         #
  149: 
  150:         # pylint: disable=locally-disabled, unused-variable
  151:         for i in range(16):
  152:             self.write_command('15 send-probe ip-4 8.8.254.254 timeout 1')
  153:             reply = self.parse_reply()
  154:             self.assertEqual(reply.token, 15)
  155:             self.assertEqual(reply.command_name, 'no-reply')
  156: 
  157:     def test_exhaust_probes(self):
  158:         'Test exhausting all available probes'
  159: 
  160:         probe_count = 4 * 1024
  161:         token = 1024
  162: 
  163:         # pylint: disable=locally-disabled, unused-variable
  164:         for i in range(probe_count):
  165:             command = str(token) + ' send-probe ip-4 8.8.254.254 timeout 60'
  166:             token += 1
  167:             self.write_command(command)
  168: 
  169:             reply = None
  170:             try:
  171:                 reply = self.parse_reply(0)
  172:             except mtrpacket.ReadReplyTimeout:
  173:                 pass
  174: 
  175:             if reply:
  176:                 if reply.command_name == 'probes-exhausted':
  177:                     break
  178: 
  179:         self.assertIsNotNone(reply)
  180:         self.assertEqual(reply.command_name, 'probes-exhausted')
  181: 
  182:     def test_timeout_values(self):
  183:         '''Test that timeout values wait the right amount of time
  184: 
  185:         Give each probe a half-second grace period to probe a timeout
  186:         reply after the expected timeout time.'''
  187: 
  188:         begin = time.time()
  189:         self.write_command('19 send-probe ip-4 8.8.254.254 timeout 0')
  190:         self.parse_reply()
  191:         elapsed = time.time() - begin
  192:         self.assertLess(elapsed, 0.5)
  193: 
  194:         begin = time.time()
  195:         self.write_command('20 send-probe ip-4 8.8.254.254 timeout 1')
  196:         self.parse_reply()
  197:         elapsed = time.time() - begin
  198:         self.assertGreaterEqual(elapsed, 0.9)
  199:         self.assertLess(elapsed, 1.5)
  200: 
  201:         begin = time.time()
  202:         self.write_command('21 send-probe ip-4 8.8.254.254 timeout 3')
  203:         self.parse_reply()
  204:         elapsed = time.time() - begin
  205:         self.assertGreaterEqual(elapsed, 2.9)
  206:         self.assertLess(elapsed, 3.5)
  207: 
  208:     def test_ttl_expired(self):
  209:         'Test sending a probe which will have its time-to-live expire'
  210: 
  211:         #  Probe Google's DNS server, but give the probe only one hop
  212:         #  to live.
  213:         self.write_command('16 send-probe ip-4 8.8.8.8 ttl 1')
  214:         reply = self.parse_reply()
  215:         self.assertEqual(reply.command_name, 'ttl-expired')
  216:         self.assertIn('ip-4', reply.argument)
  217:         self.assertIn('round-trip-time', reply.argument)
  218: 
  219:     def test_parallel_probes(self):
  220:         '''Test sending multiple probes in parallel
  221: 
  222:         We will expect the probes to complete out-of-order by sending
  223:         a probe to a distant host immediately followed by a probe to
  224:         the local host.'''
  225: 
  226:         success_count = 0
  227:         loop_count = 32
  228: 
  229:         # pylint: disable=locally-disabled, unused-variable
  230:         for i in range(loop_count):
  231:             #  Probe the distant host before the local host.
  232:             self.write_command('17 send-probe ip-4 8.8.8.8 timeout 1')
  233:             self.write_command('18 send-probe ip-4 127.0.0.1 timeout 1')
  234: 
  235:             reply = self.parse_reply()
  236:             if reply.command_name == 'no-reply':
  237:                 continue
  238: 
  239:             self.assertEqual(reply.command_name, 'reply')
  240:             self.assertIn('ip-4', reply.argument)
  241:             self.assertEqual(reply.argument['ip-4'], '127.0.0.1')
  242:             self.assertIn('round-trip-time', reply.argument)
  243:             first_time = int(reply.argument['round-trip-time'])
  244: 
  245:             reply = self.parse_reply()
  246:             if reply.command_name == 'no-reply':
  247:                 continue
  248: 
  249:             self.assertEqual(reply.command_name, 'reply')
  250:             self.assertIn('ip-4', reply.argument)
  251:             self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
  252:             self.assertIn('round-trip-time', reply.argument)
  253:             second_time = int(reply.argument['round-trip-time'])
  254: 
  255:             #  Ensure we got a reply from the host with the lowest latency
  256:             #  first.
  257:             self.assertLess(first_time, second_time)
  258: 
  259:             success_count += 1
  260: 
  261:         #  We need 90% success to pass.  This allows a few probes to be
  262:         #  occasionally dropped by the network without failing the test.
  263:         required_success = int(loop_count * 0.90)
  264:         self.assertGreaterEqual(success_count, required_success)
  265: 
  266: 
  267: class TestProbeICMPv6(mtrpacket.MtrPacketTest):
  268:     '''Test sending probes using IP version 6'''
  269: 
  270:     def __init__(self, *args):
  271:         if mtrpacket.HAVE_IPV6:
  272:             google_addr = resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
  273: 
  274:             self.google_addr = google_addr  # type: str
  275: 
  276:         super(TestProbeICMPv6, self).__init__(*args)
  277: 
  278:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
  279:     def test_probe(self):
  280:         "Test a probe to Google's public DNS server"
  281: 
  282:         #  Probe Google's well-known DNS server and expect a reply
  283:         self.write_command('51 send-probe ip-6 ' + self.google_addr)
  284:         reply = self.parse_reply()
  285:         self.assertEqual(reply.command_name, 'reply')
  286:         self.assertIn('ip-6', reply.argument)
  287:         self.assertIn('round-trip-time', reply.argument)
  288: 
  289:         #  Probe the loopback, and check the address we get a reply from is
  290:         #  also the loopback.  While implementing IPv6, I had a bug where
  291:         #  the low bits of the received address got zeroed.  This checks for
  292:         #  that bug.
  293:         self.write_command('52 send-probe ip-6 ::1')
  294:         reply = self.parse_reply()
  295:         self.assertEqual(reply.command_name, 'reply')
  296:         self.assertIn('ip-6', reply.argument)
  297:         self.assertIn('round-trip-time', reply.argument)
  298:         self.assertEqual(reply.argument['ip-6'], '::1')
  299: 
  300:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
  301:     def test_ttl_expired(self):
  302:         'Test sending a probe which will have its time-to-live expire'
  303: 
  304:         #  Probe Google's DNS server, but give the probe only one hop
  305:         #  to live.
  306:         cmd = '53 send-probe ip-6 ' + self.google_addr + ' ttl 1'
  307:         self.write_command(cmd)
  308:         reply = self.parse_reply()
  309:         self.assertEqual('ttl-expired', reply.command_name)
  310:         self.assertIn('ip-6', reply.argument)
  311:         self.assertIn('round-trip-time', reply.argument)
  312: 
  313: 
  314: class TestProbeUDP(mtrpacket.MtrPacketTest):
  315:     'Test transmitting probes using UDP'
  316: 
  317:     def udp_port_test(self, address):  # type: (unicode) -> None
  318:         'Test UDP probes with variations on source port and dest port'
  319: 
  320:         if not check_feature(self, 'udp'):
  321:             return
  322: 
  323:         cmd = '80 send-probe protocol udp ' + address
  324:         self.write_command(cmd)
  325:         reply = self.parse_reply()
  326:         self.assertEqual('reply', reply.command_name)
  327: 
  328:         cmd = '81 send-probe protocol udp port 990 ' + address
  329:         self.write_command(cmd)
  330:         reply = self.parse_reply()
  331:         self.assertEqual('reply', reply.command_name)
  332: 
  333:         cmd = '82 send-probe protocol udp local-port 1991 ' + address
  334:         self.write_command(cmd)
  335:         reply = self.parse_reply()
  336:         self.assertEqual('reply', reply.command_name)
  337: 
  338:     def test_udp_v4(self):
  339:         'Test IPv4 UDP probes'
  340: 
  341:         test_basic_probe(self, 4, 'udp')
  342: 
  343:         self.udp_port_test('ip-4 127.0.0.1')
  344: 
  345:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
  346:     def test_udp_v6(self):
  347:         'Test IPv6 UDP probes'
  348: 
  349:         test_basic_probe(self, 6, 'udp')
  350: 
  351:         self.udp_port_test('ip-6 ::1')
  352: 
  353: 
  354: class TestProbeTCP(mtrpacket.MtrPacketTest):
  355:     'Test TCP probe support'
  356: 
  357:     def test_tcp_v4(self):
  358:         '''Test IPv4 TCP probes, with TTL expiration, to a refused port
  359:         and to an open port'''
  360: 
  361:         test_basic_probe(self, 4, 'tcp')
  362: 
  363:         if not check_feature(self, 'tcp'):
  364:             return
  365: 
  366:         #  Probe a local port assumed to be open  (ssh)
  367:         cmd = '80 send-probe ip-4 127.0.0.1 protocol tcp port 22'
  368:         self.write_command(cmd)
  369: 
  370:         reply = self.parse_reply()
  371:         self.assertEqual(reply.command_name, 'reply')
  372: 
  373:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
  374:     def test_tcp_v6(self):
  375:         'Test IPv6 TCP probes'
  376: 
  377:         test_basic_probe(self, 6, 'tcp')
  378: 
  379:         if not check_feature(self, 'tcp'):
  380:             return
  381: 
  382:         #  Probe a local port assumed to be open  (ssh)
  383:         cmd = '80 send-probe ip-6 ::1 protocol tcp port 22'
  384:         self.write_command(cmd)
  385: 
  386:         reply = self.parse_reply()
  387:         self.assertEqual(reply.command_name, 'reply')
  388: 
  389: 
  390: class TestProbeSCTP(mtrpacket.MtrPacketTest):
  391:     'Test SCTP probes'
  392: 
  393:     def test_sctp_v4(self):
  394:         'Test basic SCTP probes over IPv4'
  395: 
  396:         test_basic_probe(self, 4, 'sctp')
  397: 
  398:     @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
  399:     def test_sctp_v6(self):
  400:         'Test basic SCTP probes over IPv6'
  401: 
  402:         test_basic_probe(self, 6, 'sctp')
  403: 
  404: 
#  When executed as a script, run every test case in this module.
#  mtrpacket.check_running_as_root() is called before the tests start
#  (presumably because sending probes needs elevated privileges --
#  confirm in mtrpacket).
if __name__ == '__main__':
    mtrpacket.check_running_as_root()
    unittest.main()

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>