File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / mtr / test / mtrpacket.py
Revision 1.1.1.2 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Wed Mar 17 00:07:30 2021 UTC (3 years, 3 months ago) by misho
Branches: mtr, MAIN
CVS tags: v0_95, v0_94, HEAD
mtr 0.94

    1: #
    2: #   mtr  --  a network diagnostic tool
    3: #   Copyright (C) 2016  Matt Kimball
    4: #
    5: #   This program is free software; you can redistribute it and/or modify
    6: #   it under the terms of the GNU General Public License version 2 as
    7: #   published by the Free Software Foundation.
    8: #
    9: #   This program is distributed in the hope that it will be useful,
   10: #   but WITHOUT ANY WARRANTY; without even the implied warranty of
   11: #   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   12: #   GNU General Public License for more details.
   13: #
   14: #   You should have received a copy of the GNU General Public License along
   15: #   with this program; if not, write to the Free Software Foundation, Inc.,
   16: #   51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
   17: #
   18: 
   19: '''Infrastructure for running tests which invoke mtr-packet.'''
   20: 
   21: import fcntl
   22: import os
   23: import select
   24: import socket
   25: import subprocess
   26: import sys
   27: import time
   28: import unittest
   29: 
   30: #
   31: #  typing is used for mypy type checking, but isn't required to run,
   32: #  so it's okay if we can't import it.
   33: #
   34: try:
   35:     # pylint: disable=locally-disabled, unused-import
   36:     from typing import Dict, List
   37: except ImportError:
   38:     pass
   39: 
   40: 
   41: IPV6_TEST_HOST = 'google-public-dns-a.google.com'
   42: 
   43: 
   44: class MtrPacketExecuteError(Exception):
   45:     "Exception raised when MtrPacketTest can't execute mtr-packet"
   46:     pass
   47: 
   48: 
   49: class ReadReplyTimeout(Exception):
   50:     'Exception raised by TestProbe.read_reply upon timeout'
   51: 
   52:     pass
   53: 
   54: 
   55: class WriteCommandTimeout(Exception):
   56:     'Exception raised by TestProbe.write_command upon timeout'
   57: 
   58:     pass
   59: 
   60: 
   61: class MtrPacketReplyParseError(Exception):
   62:     "Exception raised when MtrPacketReply can't parse the reply string"
   63: 
   64:     pass
   65: 
   66: 
   67: class PacketListenError(Exception):
   68:     'Exception raised when we have unexpected results from mtr-packet-listen'
   69: 
   70:     pass
   71: 
   72: 
   73: def set_nonblocking(file_descriptor):  # type: (int) -> None
   74:     'Put a file descriptor into non-blocking mode'
   75: 
   76:     flags = fcntl.fcntl(file_descriptor, fcntl.F_GETFL)
   77: 
   78:     # pylint: disable=locally-disabled, no-member
   79:     fcntl.fcntl(file_descriptor, fcntl.F_SETFL, flags | os.O_NONBLOCK)
   80: 
   81: 
   82: def check_for_local_ipv6():
   83:     '''Check for IPv6 support on the test host, to see if we should skip
   84:     the IPv6 tests'''
   85: 
   86:     addrinfo = socket.getaddrinfo(IPV6_TEST_HOST, 1, socket.AF_INET6)
   87:     if len(addrinfo):
   88:         addr = addrinfo[0][4]
   89: 
   90:     #  Create a UDP socket and check to see it can be connected to
   91:     #  IPV6_TEST_HOST.  (Connecting UDP requires no packets sent, just
   92:     #  a route present.)
   93:     sock = socket.socket(
   94:         socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
   95: 
   96:     connect_success = False
   97:     try:
   98:         sock.connect(addr)
   99:         connect_success = True
  100:     except socket.error:
  101:         pass
  102: 
  103:     sock.close()
  104: 
  105:     if not connect_success:
  106:         sys.stderr.write(
  107:             'This host has no IPv6.  Skipping IPv6 tests.\n')
  108: 
  109:     return connect_success
  110: 
  111: 
  112: HAVE_IPV6 = check_for_local_ipv6()
  113: 
  114: 
  115: # pylint: disable=locally-disabled, too-few-public-methods
  116: class MtrPacketReply(object):
  117:     'A parsed reply from mtr-packet'
  118: 
  119:     def __init__(self, reply):  # type: (unicode) -> None
  120:         self.token = 0  # type: int
  121:         self.command_name = None  # type: unicode
  122:         self.argument = {}  # type: Dict[unicode, unicode]
  123: 
  124:         self.parse_reply(reply)
  125: 
  126:     def parse_reply(self, reply):  # type (unicode) -> None
  127:         'Parses a reply string into members for the instance of this class'
  128: 
  129:         tokens = reply.split()  # type List[unicode]
  130: 
  131:         try:
  132:             self.token = int(tokens[0])
  133:             self.command_name = tokens[1]
  134:         except IndexError:
  135:             raise MtrPacketReplyParseError(reply)
  136: 
  137:         i = 2
  138:         while i < len(tokens):
  139:             try:
  140:                 name = tokens[i]
  141:                 value = tokens[i + 1]
  142:             except IndexError:
  143:                 raise MtrPacketReplyParseError(reply)
  144: 
  145:             self.argument[name] = value
  146:             i += 2
  147: 
  148: 
  149: class PacketListen(object):
  150:     'A test process which listens for a single packet'
  151: 
  152:     def __init__(self, *args):
  153:         self.process_args = list(args)  # type: List[unicode]
  154:         self.listen_process = None  # type: subprocess.Popen
  155:         self.attrib = None  # type: Dict[unicode, unicode]
  156: 
  157:     def __enter__(self):
  158:         try:
  159:             self.listen_process = subprocess.Popen(
  160:                 ['./mtr-packet-listen'] + self.process_args,
  161:                 stdin=subprocess.PIPE,
  162:                 stdout=subprocess.PIPE)
  163:         except OSError:
  164:             raise PacketListenError('unable to launch mtr-packet-listen')
  165: 
  166:         status = self.listen_process.stdout.readline().decode('utf-8')
  167:         if status != 'status listening\n':
  168:             raise PacketListenError('unexpected status')
  169: 
  170:         return self
  171: 
  172:     def __exit__(self, exc_type, exc_value, traceback):
  173:         self.wait_for_exit()
  174: 
  175:         self.attrib = {}
  176:         for line in self.listen_process.stdout.readlines():
  177:             tokens = line.decode('utf-8').split()
  178: 
  179:             if len(tokens) >= 2:
  180:                 name = tokens[0]
  181:                 value = tokens[1]
  182: 
  183:                 self.attrib[name] = value
  184: 
  185:         self.listen_process.stdin.close()
  186:         self.listen_process.stdout.close()
  187: 
  188:     def wait_for_exit(self):
  189:         '''Poll the subprocess for up to ten seconds, until it exits.
  190: 
  191:         We need to wait for its exit to ensure we are able to read its
  192:         output.'''
  193: 
  194:         wait_time = 10
  195:         wait_step = 0.1
  196: 
  197:         steps = int(wait_time / wait_step)
  198: 
  199:         exit_value = None
  200: 
  201:         # pylint: disable=locally-disabled, unused-variable
  202:         for i in range(steps):
  203:             exit_value = self.listen_process.poll()
  204:             if exit_value is not None:
  205:                 break
  206: 
  207:             time.sleep(wait_step)
  208: 
  209:         if exit_value is None:
  210:             raise PacketListenError('mtr-packet-listen timeout')
  211: 
  212:         if exit_value != 0:
  213:             raise PacketListenError('mtr-packet-listen unexpected error')
  214: 
  215: 
  216: class MtrPacketTest(unittest.TestCase):
  217:     '''Base class for tests invoking mtr-packet.
  218: 
  219:     Start a new mtr-packet subprocess for each test, and kill it
  220:     at the conclusion of the test.
  221: 
  222:     Provide methods for writing commands and reading replies.
  223:     '''
  224: 
  225:     def __init__(self, *args):
  226:         self.reply_buffer = None  # type: unicode
  227:         self.packet_process = None  # type: subprocess.Popen
  228:         self.stdout_fd = None  # type: int
  229: 
  230:         super(MtrPacketTest, self).__init__(*args)
  231: 
  232:     def setUp(self):
  233:         'Set up a test case by spawning a mtr-packet process'
  234: 
  235:         packet_path = os.environ.get('MTR_PACKET', './mtr-packet')
  236: 
  237:         self.reply_buffer = ''
  238:         try:
  239:             self.packet_process = subprocess.Popen(
  240:                 [packet_path],
  241:                 stdin=subprocess.PIPE,
  242:                 stdout=subprocess.PIPE)
  243:         except OSError:
  244:             raise MtrPacketExecuteError(packet_path)
  245: 
  246:         #  Put the mtr-packet process's stdout in non-blocking mode
  247:         #  so that we can read from it without a timeout when
  248:         #  no reply is available.
  249:         self.stdout_fd = self.packet_process.stdout.fileno()
  250:         set_nonblocking(self.stdout_fd)
  251: 
  252:         self.stdin_fd = self.packet_process.stdin.fileno()
  253:         set_nonblocking(self.stdin_fd)
  254: 
  255:     def tearDown(self):
  256:         'After a test, kill the running mtr-packet instance'
  257: 
  258:         self.packet_process.stdin.close()
  259:         self.packet_process.stdout.close()
  260: 
  261:         try:
  262:             self.packet_process.kill()
  263:         except OSError:
  264:             return
  265: 
  266:     def parse_reply(self, timeout=10.0):  # type: (float) -> MtrPacketReply
  267:         '''Read the next reply from mtr-packet and parse it into
  268:         an MtrPacketReply object.'''
  269: 
  270:         reply_str = self.read_reply(timeout)
  271: 
  272:         return MtrPacketReply(reply_str)
  273: 
  274:     def read_reply(self, timeout=10.0):  # type: (float) -> unicode
  275:         '''Read the next reply from mtr-packet.
  276: 
  277:         Attempt to read the next command reply from mtr-packet.  If no reply
  278:         is available withing the timeout time, raise ReadReplyTimeout
  279:         instead.'''
  280: 
  281:         start_time = time.time()
  282: 
  283:         #  Read from mtr-packet until either the timeout time has elapsed
  284:         #  or we read a newline character, which indicates a finished
  285:         #  reply.
  286:         while True:
  287:             now = time.time()
  288:             elapsed = now - start_time
  289: 
  290:             select_time = timeout - elapsed
  291:             if select_time < 0:
  292:                 select_time = 0
  293: 
  294:             select.select([self.stdout_fd], [], [], select_time)
  295: 
  296:             reply_bytes = None
  297: 
  298:             try:
  299:                 reply_bytes = os.read(self.stdout_fd, 1024)
  300:             except OSError:
  301:                 pass
  302: 
  303:             if reply_bytes:
  304:                 self.reply_buffer += reply_bytes.decode('utf-8')
  305: 
  306:             #  If we have read a newline character, we can stop waiting
  307:             #  for more input.
  308:             newline_ix = self.reply_buffer.find('\n')
  309:             if newline_ix != -1:
  310:                 break
  311: 
  312:             if elapsed >= timeout:
  313:                 raise ReadReplyTimeout()
  314: 
  315:         reply = self.reply_buffer[:newline_ix]
  316:         self.reply_buffer = self.reply_buffer[newline_ix + 1:]
  317:         return reply
  318: 
  319:     def write_command(self, cmd, timeout=10.0):
  320:         # type: (unicode, float) -> None
  321: 
  322:         '''Send a command string to the mtr-packet instance, timing out
  323:         if we are unable to write for an extended period of time.  The
  324:         timeout is to avoid deadlocks with the child process where both
  325:         the parent and the child are writing to their end of the pipe
  326:         and expecting the other end to be reading.'''
  327: 
  328:         command_str = cmd + '\n'
  329:         command_bytes = command_str.encode('utf-8')
  330: 
  331:         start_time = time.time()
  332: 
  333:         while True:
  334:             now = time.time()
  335:             elapsed = now - start_time
  336: 
  337:             select_time = timeout - elapsed
  338:             if select_time < 0:
  339:                 select_time = 0
  340: 
  341:             select.select([], [self.stdin_fd], [], select_time)
  342: 
  343:             bytes_written = 0
  344:             try:
  345:                 bytes_written = os.write(self.stdin_fd, command_bytes)
  346:             except OSError:
  347:                 pass
  348: 
  349:             command_bytes = command_bytes[bytes_written:]
  350:             if not len(command_bytes):
  351:                 break
  352: 
  353:             if elapsed >= timeout:
  354:                 raise WriteCommandTimeout()
  355: 
  356: 
  357: def check_running_as_root():
  358:     'Print a warning to stderr if we are not running as root.'
  359: 
  360:     # pylint: disable=locally-disabled, no-member
  361:     if sys.platform != 'cygwin' and os.getuid() > 0:
  362:         sys.stderr.write(
  363:             'Warning: many tests require running as root\n')

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>