Annotation of embedaddon/mtr/test/probe.py, revision 1.1.1.2
1.1 misho 1: #!/usr/bin/env python
2: #
3: # mtr -- a network diagnostic tool
4: # Copyright (C) 2016 Matt Kimball
5: #
6: # This program is free software; you can redistribute it and/or modify
7: # it under the terms of the GNU General Public License version 2 as
8: # published by the Free Software Foundation.
9: #
10: # This program is distributed in the hope that it will be useful,
11: # but WITHOUT ANY WARRANTY; without even the implied warranty of
12: # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13: # GNU General Public License for more details.
14: #
1.1.1.2 ! misho 15: # You should have received a copy of the GNU General Public License along
! 16: # with this program; if not, write to the Free Software Foundation, Inc.,
! 17: # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
1.1 misho 18: #
19:
20: '''Test sending probes and receiving responses.'''
21:
22: import socket
23: import sys
24: import time
25: import unittest
26:
27: import mtrpacket
28:
29:
def resolve_ipv6_address(hostname):  # type: (str) -> str
    '''Return the first IP version 6 address a hostname resolves to

    Raises LookupError, carrying the hostname, when none of the
    getaddrinfo results belongs to the AF_INET6 family.'''

    # Every getaddrinfo entry is (family, socktype, proto, name, sockaddr).
    # Keep the socket address of each IPv6 entry, in the order returned.
    ipv6_sockaddrs = [
        entry[4] for entry in socket.getaddrinfo(hostname, 0)
        if entry[0] == socket.AF_INET6
    ]

    if not ipv6_sockaddrs:
        raise LookupError(hostname)

    # An IPv6 socket address is (address, port, flow, scope); only the
    # textual address is of interest here.
    (address, _port, _flow, _scope) = ipv6_sockaddrs[0]
    return address
44:
45:
def check_feature(test, feature):
    '''Ask mtr-packet whether it supports a particular feature

    Sends a check-support command through the test case, asserts that a
    feature-support reply carrying a 'support' argument comes back, and
    returns True only when that argument is 'ok'.'''

    test.write_command('70 check-support feature ' + feature)

    response = test.parse_reply()
    test.assertEqual(response.command_name, 'feature-support')
    test.assertIn('support', response.argument)

    return response.argument['support'] == 'ok'
60:
61:
def test_basic_remote_probe(test, ip_version, protocol):
    '''Send a probe with a TTL of 1 toward a remote host

    The probe goes to 8.8.8.8 for IP version 4, or to the resolved
    address of mtrpacket.IPV6_TEST_HOST for IP version 6, and the reply
    is expected to be 'ttl-expired'.

    Raises ValueError if ip_version is neither 4 nor 6.'''

    protocol_part = 'protocol ' + protocol

    if ip_version == 4:
        target = 'ip-4 8.8.8.8'
    elif ip_version == 6:
        target = 'ip-6 ' + resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
    else:
        raise ValueError(ip_version)

    command = ' '.join(
        ['60 send-probe', protocol_part, target, 'port 164 ttl 1'])
    test.write_command(command)

    response = test.parse_reply()
    test.assertEqual(response.command_name, 'ttl-expired')
79:
80:
def test_basic_local_probe(test, ip_version, protocol):
    '''Test a probe to a closed port on localhost

    Sends a probe to port 164 on the loopback address of the requested
    IP version and checks that a 'reply' comes back from that same
    address.

    Raises ValueError if ip_version is neither 4 nor 6.'''

    if ip_version == 6:
        address_key, loopback = 'ip-6', '::1'
    elif ip_version == 4:
        address_key, loopback = 'ip-4', '127.0.0.1'
    else:
        # Reject an unknown version before anything is sent, the same way
        # test_basic_remote_probe does, rather than failing further down
        # on an address that was never assigned.
        raise ValueError(ip_version)

    cmd = '61 send-probe protocol ' + protocol + \
        ' ' + address_key + ' ' + loopback + ' port 164'
    test.write_command(cmd)

    reply = test.parse_reply()
    test.assertEqual(reply.command_name, 'reply')

    # The reply must name the loopback address that was probed.
    test.assertIn(address_key, reply.argument)
    test.assertEqual(reply.argument[address_key], loopback)


# This is a helper shared by the test cases, not a test in its own right.
# Its name starts with 'test', so tell pytest and nose not to collect it.
test_basic_local_probe.__test__ = False
103:
104:
def test_basic_probe(test, ip_version, protocol):
    # type: (mtrpacket.MtrPacketTest, int, unicode) -> None

    '''Test a probe with TTL expiration and a probe which reaches its
    destination with a particular protocol.

    When check_feature reports no support for the protocol, a notice is
    written to stderr and neither probe is sent.'''

    if not check_feature(test, protocol):
        # Write the text itself rather than UTF-8 encoded bytes: the
        # text-mode sys.stderr of Python 3 raises TypeError when handed
        # a bytes object.
        sys.stderr.write('Skipping ' + protocol + ' test due to no support\n')
        return

    test_basic_remote_probe(test, ip_version, protocol)
    test_basic_local_probe(test, ip_version, protocol)


# This is a helper shared by the test cases, not a test in its own right.
# Its name starts with 'test', so tell pytest and nose not to collect it.
test_basic_probe.__test__ = False
118:
119:
class TestProbeICMPv4(mtrpacket.MtrPacketTest):
    '''Exercise ICMP probes sent over IP version 4'''

    def _timed_probe(self, command):
        'Send a command, wait for its reply, and return the seconds taken'

        started = time.time()
        self.write_command(command)
        self.parse_reply()
        return time.time() - started

    def _reply_time(self, expected_address):
        '''Read one reply and return its round trip time as an integer

        Returns None for a 'no-reply' response.  Any other response must
        be a 'reply' from expected_address.'''

        response = self.parse_reply()
        if response.command_name == 'no-reply':
            return None

        self.assertEqual(response.command_name, 'reply')
        self.assertIn('ip-4', response.argument)
        self.assertEqual(response.argument['ip-4'], expected_address)
        self.assertIn('round-trip-time', response.argument)
        return int(response.argument['round-trip-time'])

    def test_probe(self):
        'A regular ICMP probe to a known address should be answered'

        # Google's well-known DNS server is expected to reply.
        self.write_command('14 send-probe ip-4 8.8.8.8')
        response = self.parse_reply()

        self.assertEqual(response.token, 14)
        self.assertEqual(response.command_name, 'reply')
        self.assertIn('ip-4', response.argument)
        self.assertEqual(response.argument['ip-4'], '8.8.8.8')
        self.assertIn('round-trip-time', response.argument)

    def test_timeout(self):
        'Probes to a non-existent address should time out'

        #
        #  Probe a non-existent address, and expect no reply.
        #
        #  It is unclear what the best way is to find an address which
        #  does not exist but is still routable.  With a reserved IP
        #  address range, Windows reports the address as non-routable
        #  instead of timing out when transmitting to it.
        #
        #  A currently unused address in Google's range is used instead.
        #  This is probably not the best solution.
        #

        for _ in range(16):
            self.write_command('15 send-probe ip-4 8.8.254.254 timeout 1')
            response = self.parse_reply()
            self.assertEqual(response.token, 15)
            self.assertEqual(response.command_name, 'no-reply')

    def test_exhaust_probes(self):
        'Keep sending probes until mtr-packet reports none are left'

        first_token = 1024
        probe_count = 4 * 1024

        for token in range(first_token, first_token + probe_count):
            self.write_command(
                str(token) + ' send-probe ip-4 8.8.254.254 timeout 60')

            # Look for a reply without waiting; having none yet is fine.
            response = None
            try:
                response = self.parse_reply(0)
            except mtrpacket.ReadReplyTimeout:
                pass

            if response and response.command_name == 'probes-exhausted':
                break

        self.assertIsNotNone(response)
        self.assertEqual(response.command_name, 'probes-exhausted')

    def test_timeout_values(self):
        '''Check that each timeout value waits the right amount of time

        Every probe gets a half-second grace period, after its expected
        timeout, in which the timeout reply may arrive.'''

        elapsed = self._timed_probe(
            '19 send-probe ip-4 8.8.254.254 timeout 0')
        self.assertLess(elapsed, 0.5)

        elapsed = self._timed_probe(
            '20 send-probe ip-4 8.8.254.254 timeout 1')
        self.assertGreaterEqual(elapsed, 0.9)
        self.assertLess(elapsed, 1.5)

        elapsed = self._timed_probe(
            '21 send-probe ip-4 8.8.254.254 timeout 3')
        self.assertGreaterEqual(elapsed, 2.9)
        self.assertLess(elapsed, 3.5)

    def test_ttl_expired(self):
        'A probe with a single hop to live should expire in transit'

        # Aim at Google's DNS server, but allow the probe only one hop.
        self.write_command('16 send-probe ip-4 8.8.8.8 ttl 1')
        response = self.parse_reply()

        self.assertEqual(response.command_name, 'ttl-expired')
        self.assertIn('ip-4', response.argument)
        self.assertIn('round-trip-time', response.argument)

    def test_parallel_probes(self):
        '''Send several probes in parallel

        A probe to a distant host is followed immediately by a probe to
        the local host, and the replies are expected out of order.'''

        attempts = 32
        successes = 0

        for _ in range(attempts):
            # The distant host is probed before the local host.
            self.write_command('17 send-probe ip-4 8.8.8.8 timeout 1')
            self.write_command('18 send-probe ip-4 127.0.0.1 timeout 1')

            local_time = self._reply_time('127.0.0.1')
            if local_time is None:
                continue

            remote_time = self._reply_time('8.8.8.8')
            if remote_time is None:
                continue

            # The host with the lowest latency must have answered first.
            self.assertLess(local_time, remote_time)

            successes += 1

        # 90% success is enough to pass, so the network may occasionally
        # drop a few probes without failing the test.
        required_success = int(attempts * 0.90)
        self.assertGreaterEqual(successes, required_success)
265:
266:
class TestProbeICMPv6(mtrpacket.MtrPacketTest):
    '''Test sending probes using IP version 6'''

    def __init__(self, *args):
        # unittest creates one instance per test method while the tests
        # are being loaded.  No address is resolved here, so a failed
        # lookup cannot abort loading, least of all for tests which are
        # skipped for lack of IPv6.  See the google_addr property.
        self._google_addr = None

        super(TestProbeICMPv6, self).__init__(*args)

    @property
    def google_addr(self):  # type: () -> str
        '''The IP version 6 address of mtrpacket.IPV6_TEST_HOST

        The address is resolved on first access and remembered for the
        lifetime of this instance.  resolve_ipv6_address raises
        LookupError when the host has no IPv6 address.'''

        if self._google_addr is None:
            self._google_addr = resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)

        return self._google_addr

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_probe(self):
        "Test a probe to Google's public DNS server"

        # Probe Google's well-known DNS server and expect a reply
        self.write_command('51 send-probe ip-6 ' + self.google_addr)
        reply = self.parse_reply()
        self.assertEqual(reply.command_name, 'reply')
        self.assertIn('ip-6', reply.argument)
        self.assertIn('round-trip-time', reply.argument)

        # Probe the loopback, and check the address we get a reply from is
        # also the loopback.  While implementing IPv6, I had a bug where
        # the low bits of the received address got zeroed.  This checks for
        # that bug.
        self.write_command('52 send-probe ip-6 ::1')
        reply = self.parse_reply()
        self.assertEqual(reply.command_name, 'reply')
        self.assertIn('ip-6', reply.argument)
        self.assertIn('round-trip-time', reply.argument)
        self.assertEqual(reply.argument['ip-6'], '::1')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_ttl_expired(self):
        'Test sending a probe which will have its time-to-live expire'

        # Probe Google's DNS server, but give the probe only one hop
        # to live.
        cmd = '53 send-probe ip-6 ' + self.google_addr + ' ttl 1'
        self.write_command(cmd)
        reply = self.parse_reply()
        self.assertEqual('ttl-expired', reply.command_name)
        self.assertIn('ip-6', reply.argument)
        self.assertIn('round-trip-time', reply.argument)
311:
312:
class TestProbeUDP(mtrpacket.MtrPacketTest):
    'Exercise probes transmitted using UDP'

    def udp_port_test(self, address):  # type: (unicode) -> None
        '''Send UDP probes varying the source port and destination port

        Does nothing when check_feature reports no UDP support.  Each
        probe is expected to produce a 'reply'.'''

        if not check_feature(self, 'udp'):
            return

        # Default ports first, then an explicit destination port, then
        # an explicit local port.
        command_prefixes = (
            '80 send-probe protocol udp ',
            '81 send-probe protocol udp port 990 ',
            '82 send-probe protocol udp local-port 1991 ',
        )

        for prefix in command_prefixes:
            self.write_command(prefix + address)
            response = self.parse_reply()
            self.assertEqual('reply', response.command_name)

    def test_udp_v4(self):
        'UDP probes over IPv4'

        test_basic_probe(self, 4, 'udp')

        self.udp_port_test('ip-4 127.0.0.1')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_udp_v6(self):
        'UDP probes over IPv6'

        test_basic_probe(self, 6, 'udp')

        self.udp_port_test('ip-6 ::1')
351:
352:
class TestProbeTCP(mtrpacket.MtrPacketTest):
    'Exercise TCP probe support'

    def _probe_open_port(self, address):
        '''Send a TCP probe to port 22 at the given address

        Does nothing when check_feature reports no TCP support.
        Otherwise a 'reply' is expected.'''

        if not check_feature(self, 'tcp'):
            return

        # The local ssh port is assumed to be open.
        self.write_command(
            '80 send-probe ' + address + ' protocol tcp port 22')

        response = self.parse_reply()
        self.assertEqual(response.command_name, 'reply')

    def test_tcp_v4(self):
        '''IPv4 TCP probes, with TTL expiration, to a refused port and to
        an open port'''

        test_basic_probe(self, 4, 'tcp')

        self._probe_open_port('ip-4 127.0.0.1')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_tcp_v6(self):
        'IPv6 TCP probes'

        test_basic_probe(self, 6, 'tcp')

        self._probe_open_port('ip-6 ::1')
387:
388:
class TestProbeSCTP(mtrpacket.MtrPacketTest):
    'Exercise SCTP probes'

    def test_sctp_v4(self):
        'Run the basic probe checks with SCTP over IPv4'

        test_basic_probe(self, ip_version=4, protocol='sctp')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_sctp_v6(self):
        'Run the basic probe checks with SCTP over IPv6'

        test_basic_probe(self, ip_version=6, protocol='sctp')
402:
403:
if __name__ == '__main__':
    # Run the suite only when this file is executed as a script.
    # mtrpacket.check_running_as_root() is called before unittest takes
    # over.  NOTE(review): presumably it stops the run when the user is
    # not root; confirm against mtrpacket.
    mtrpacket.check_running_as_root()
    unittest.main()
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>