From 9a0cd1d01275afd1c20d7fdc3226b88344cd4daa Mon Sep 17 00:00:00 2001 From: Slawek Kaplonski Date: Fri, 28 Feb 2025 16:24:26 +0100 Subject: [PATCH] Run iperf3 client (and server) in background This patch implements possibility to run iperf3 client and server as external processes in background. Client can be run on the local machine where Tobiko is run or, if ssh_client is given it will run it on the remote machine. For the iperf3 server, this one will be run on the remote machine if the ``iperf3_server_ssh_client`` is given. If not, iperf3 server is expected to be run on the destination to perform tests correctly. Additionally 2 new config options for the RHOSO topologies are added: * max_traffic_break_allowed - to specify longest allowed single break in the traffic tested with iperf3, * max_total_breaks_allowed - to specify total allowed breaks time in the traffic tested with iperf3. Related: #TOBIKO-128 Change-Id: I459e8505e8ccc75caa1c9e3e0955e94ea38c4352 --- tobiko/rhosp/config.py | 17 ++ tobiko/shell/iperf3/__init__.py | 6 + tobiko/shell/iperf3/_execute.py | 255 ++++++++++++++++++++++++++++- tobiko/shell/iperf3/_interface.py | 36 +++- tobiko/shell/iperf3/_parameters.py | 25 ++- 5 files changed, 328 insertions(+), 11 deletions(-) diff --git a/tobiko/rhosp/config.py b/tobiko/rhosp/config.py index 1e523d1bd..be8b77614 100644 --- a/tobiko/rhosp/config.py +++ b/tobiko/rhosp/config.py @@ -70,6 +70,23 @@ RHOSP_OPTIONS = [ default=False, deprecated_group=TRIPLEO_GROUP_NAME, help="whether Ceph RGW is deployed"), + + # Background connectivity related settings: + cfg.IntOpt('max_traffic_break_allowed', + default=0, + help="longest allowed single break time during the background " + "connectivity tests like e.g. those using iperf3 " + "(in seconds)"), + cfg.IntOpt('max_total_breaks_allowed', + default=0, + help="longest allowed total break time during the background " + "connectivity tests like e.g. those using iperf3. " + "This option represents total time when connetion " + "was not working. " + "For example it could be: not working for 3 seconds, " + "then working for 60 seconds and then again not working " + "for another 10 seconds. In such case this total break " + "time would be 13 seconds."), ] TRIPLEO_OPTIONS = [ diff --git a/tobiko/shell/iperf3/__init__.py b/tobiko/shell/iperf3/__init__.py index ff840fc80..7d3e1cee5 100644 --- a/tobiko/shell/iperf3/__init__.py +++ b/tobiko/shell/iperf3/__init__.py @@ -16,6 +16,12 @@ from __future__ import absolute_import from tobiko.shell.iperf3 import _assert +from tobiko.shell.iperf3 import _execute assert_has_bandwith_limits = _assert.assert_has_bandwith_limits +execute_iperf3_client_in_background = \ + _execute.execute_iperf3_client_in_background +check_iperf3_client_results = _execute.check_iperf3_client_results +iperf3_client_alive = _execute.iperf3_client_alive +stop_iperf3_client = _execute.stop_iperf3_client diff --git a/tobiko/shell/iperf3/_execute.py b/tobiko/shell/iperf3/_execute.py index 540f00eec..8667c5dab 100644 --- a/tobiko/shell/iperf3/_execute.py +++ b/tobiko/shell/iperf3/_execute.py @@ -17,21 +17,79 @@ from __future__ import absolute_import from __future__ import division import json +import os import typing import netaddr from oslo_log import log import tobiko +from tobiko import config from tobiko.shell.iperf3 import _interface from tobiko.shell.iperf3 import _parameters from tobiko.shell import sh from tobiko.shell import ssh +CONF = config.CONF LOG = log.getLogger(__name__) +def _get_filepath(address: typing.Union[str, netaddr.IPAddress], + path: str, + ssh_client: ssh.SSHClientType = None) -> str: + if ssh_client: + final_dir = _get_remote_filepath(path, ssh_client) + else: + final_dir = _get_local_filepath(path) + filename = f'iperf_{address}.log' + return os.path.join(final_dir, filename) + + +def _get_local_filepath(path: str) -> str: + final_dir_path = f'{sh.get_user_home_dir()}/{path}' + if not os.path.exists(final_dir_path): + os.makedirs(final_dir_path) + return final_dir_path + + +def _get_remote_filepath(path: str, + ssh_client: ssh.SSHClientType) -> str: + homedir = sh.execute('echo ~', ssh_client=ssh_client).stdout.rstrip() + final_dir_path = f'{homedir}/{path}' + sh.execute(f'/usr/bin/mkdir -p {final_dir_path}', + ssh_client=ssh_client) + return final_dir_path + + +def _truncate_iperf3_client_logfile( + logfile: str, + ssh_client: ssh.SSHClientType = None) -> None: + if ssh_client: + _truncate_remote_logfile(logfile, ssh_client) + else: + tobiko.truncate_logfile(logfile) + + +def _truncate_remote_logfile(logfile: str, + ssh_client: ssh.SSHClientType) -> None: + truncated_logfile = tobiko.get_truncated_filename(logfile) + sh.execute(f'/usr/bin/mv {logfile} {truncated_logfile}', + ssh_client=ssh_client) + + +def _remove_old_logfile(logfile: str, + ssh_client: ssh.SSHClientType = None): + if ssh_client: + sh.execute(f'/usr/bin/rm -f {logfile}', + ssh_client=ssh_client) + else: + try: + os.remove(logfile) + except FileNotFoundError: + pass + + def get_bandwidth(address: typing.Union[str, netaddr.IPAddress], bitrate: int = None, download: bool = None, @@ -68,21 +126,202 @@ def execute_iperf3_client(address: typing.Union[str, netaddr.IPAddress], port: int = None, protocol: str = None, ssh_client: ssh.SSHClientType = None, - timeout: tobiko.Seconds = None) \ + timeout: tobiko.Seconds = None, + logfile: str = None, + run_in_background: bool = False) \ -> typing.Dict: params_timeout: typing.Optional[int] = None - if timeout is not None: + if run_in_background: + params_timeout = 0 + elif timeout is not None: params_timeout = int(timeout - 0.5) - parameters = _parameters.iperf3_client_parameters(address=address, - bitrate=bitrate, - download=download, - port=port, - protocol=protocol, - timeout=params_timeout) + parameters = _parameters.iperf3_client_parameters( + address=address, bitrate=bitrate, + download=download, port=port, protocol=protocol, + timeout=params_timeout, logfile=logfile) command = _interface.get_iperf3_client_command(parameters) # output is a dictionary + if run_in_background: + process = sh.process(command, ssh_client=ssh_client) + process.execute() + return {} output = sh.execute(command, ssh_client=ssh_client, timeout=timeout).stdout return json.loads(output) + + +def execute_iperf3_client_in_background( + address: typing.Union[str, netaddr.IPAddress], # noqa; pylint: disable=W0613 + bitrate: int = None, + download: bool = None, + port: int = None, + protocol: str = None, + ssh_client: ssh.SSHClientType = None, + iperf3_server_ssh_client: ssh.SSHClientType = None, + output_dir: str = 'tobiko_iperf_results', + **kwargs) -> None: + output_path = _get_filepath(address, output_dir, ssh_client) + LOG.info(f'starting iperf3 client process to > {address} , ' + f'output file is : {output_path}') + # just in case there is some leftover file from previous run, + # it needs to be removed, otherwise iperf will append new log + # to the end of the existing file and this will make json output + # file to be malformed + _remove_old_logfile(output_path, ssh_client=ssh_client) + # If there is ssh client for the server where iperf3 server is going + # to run, lets make sure it is started fresh as e.g. in case of + # failure in the previous run, it may report that is still "busy" thus + # iperf3 client will not start properly + if iperf3_server_ssh_client: + _stop_iperf3_server( + port=port, protocol=protocol, + ssh_client=iperf3_server_ssh_client) + _start_iperf3_server( + port=port, protocol=protocol, + ssh_client=iperf3_server_ssh_client) + + if not _iperf3_server_alive( + port=port, protocol=protocol, + ssh_client=iperf3_server_ssh_client): + testcase = tobiko.get_test_case() + testcase.fail('iperf3 server did not start properly ' + f'on the server {iperf3_server_ssh_client}') + + # Now, finally iperf3 client should be ready to start + execute_iperf3_client( + address=address, + bitrate=bitrate, + download=download, + port=port, + protocol=protocol, + ssh_client=ssh_client, + logfile=output_path, + run_in_background=True) + + +def _get_iperf3_pid( + address: typing.Union[str, netaddr.IPAddress, None] = None, + port: int = None, + protocol: str = None, + ssh_client: ssh.SSHClientType = None) -> typing.Union[int, None]: + try: + iperf_pids = sh.execute( + 'pidof iperf3', ssh_client=ssh_client).stdout.rstrip().split(" ") + except sh.ShellCommandFailed: + return None + for iperf_pid in iperf_pids: + proc_cmdline = sh.get_command_line( + iperf_pid, + ssh_client=ssh_client) + if address and str(address) in proc_cmdline: + # This is looking for the iperf client instance + return int(iperf_pid) + elif port and protocol: + # By looking for port and protocol we are looking + # for the iperf3 server's PID + if "-s" in proc_cmdline and f"-p {port}" in proc_cmdline: + if ((protocol.lower() == 'udp' and "-u" in proc_cmdline) or + (protocol.lower() == 'tcp' and + '-u' not in proc_cmdline)): + return int(iperf_pid) + return None + + +def check_iperf3_client_results(address: typing.Union[str, netaddr.IPAddress], + output_dir: str = 'tobiko_iperf_results', + ssh_client: ssh.SSHClientType = None, + **kwargs): # noqa; pylint: disable=W0613 + # This function expects that the result file is available locally already + # + logfile = _get_filepath(address, output_dir, ssh_client) + try: + iperf_log_raw = sh.execute( + f"cat {logfile}", ssh_client=ssh_client).stdout + except sh.ShellCommandFailed as err: + if config.is_prevent_create(): + # Tobiko is not expected to create resources in this run + # so iperf should be already running and log file should + # be already there, if it is not, it should fail + tobiko.fail('Failed to read iperf log from the file. ' + f'Server IP address: {address}; Logfile: {logfile}') + else: + # Tobiko is creating resources so it is normal that file was not + # there yet + LOG.debug(f'Failed to read iperf log from the file. ' + f'Error: {err}') + return + + iperf_log = json.loads(iperf_log_raw) + longest_break = 0 # seconds + breaks_total = 0 # seconds + current_break = 0 # seconds + intervals = iperf_log.get("intervals") + if not intervals: + tobiko.fail(f"No intervals data found in {logfile}") + for interval in intervals: + if interval["sum"]["bytes"] == 0: + interval_duration = ( + interval["sum"]["end"] - interval["sum"]["start"]) + current_break += interval_duration + if current_break > longest_break: + longest_break = current_break + breaks_total += interval_duration + else: + current_break = 0 + + _truncate_iperf3_client_logfile(logfile, ssh_client) + + testcase = tobiko.get_test_case() + testcase.assertLessEqual(longest_break, + CONF.tobiko.rhosp.max_traffic_break_allowed) + testcase.assertLessEqual(breaks_total, + CONF.tobiko.rhosp.max_total_breaks_allowed) + + +def iperf3_client_alive(address: typing.Union[str, netaddr.IPAddress], # noqa; pylint: disable=W0613 + ssh_client: ssh.SSHClientType = None, + **kwargs) -> bool: + return bool(_get_iperf3_pid(address=address, ssh_client=ssh_client)) + + +def stop_iperf3_client(address: typing.Union[str, netaddr.IPAddress], + ssh_client: ssh.SSHClientType = None, + **kwargs): # noqa; pylint: disable=W0613 + pid = _get_iperf3_pid(address=address, ssh_client=ssh_client) + if pid: + LOG.info(f'iperf3 client process to > {address} already running ' + f'with PID: {pid}') + sh.execute(f'sudo kill {pid}', ssh_client=ssh_client) + + +def _start_iperf3_server( + port: typing.Union[int, None], + protocol: typing.Union[str, None], + ssh_client: ssh.SSHClientType): + parameters = _parameters.iperf3_server_parameters( + port=port, protocol=protocol) + command = _interface.get_iperf3_server_command(parameters) + process = sh.process(command, ssh_client=ssh_client) + process.execute() + + +def _iperf3_server_alive( + port: typing.Union[int, None], + protocol: typing.Union[str, None], + ssh_client: ssh.SSHClientType = None) -> bool: + return bool( + _get_iperf3_pid(port=port, protocol=protocol, + ssh_client=ssh_client)) + + +def _stop_iperf3_server( + port: typing.Union[int, None], + protocol: typing.Union[str, None], + ssh_client: ssh.SSHClientType = None): + pid = _get_iperf3_pid(port=port, protocol=protocol, ssh_client=ssh_client) + if pid: + LOG.info(f'iperf3 server listening on the {protocol} port: {port} ' + f'is already running with PID: {pid}') + sh.execute(f'sudo kill {pid}', ssh_client=ssh_client) diff --git a/tobiko/shell/iperf3/_interface.py b/tobiko/shell/iperf3/_interface.py index f29ca1beb..711e3fd77 100644 --- a/tobiko/shell/iperf3/_interface.py +++ b/tobiko/shell/iperf3/_interface.py @@ -29,6 +29,11 @@ def get_iperf3_client_command(parameters: _parameters.Iperf3ClientParameters): return interface.get_iperf3_client_command(parameters) +def get_iperf3_server_command(parameters: _parameters.Iperf3ServerParameters): + interface = Iperf3Interface() + return interface.get_iperf3_server_command(parameters) + + class Iperf3Interface: def get_iperf3_client_command( @@ -38,6 +43,13 @@ class Iperf3Interface: options = self.get_iperf3_client_options(parameters=parameters) return sh.shell_command('iperf3') + options + def get_iperf3_server_command( + self, + parameters: _parameters.Iperf3ServerParameters) \ + -> sh.ShellCommand: + options = self.get_iperf3_server_options(parameters=parameters) + return sh.shell_command('iperf3') + options + def get_iperf3_client_options( self, parameters: _parameters.Iperf3ClientParameters) \ @@ -54,6 +66,20 @@ class Iperf3Interface: options += self.get_download_option(parameters.download) if parameters.protocol is not None: options += self.get_protocol_option(parameters.protocol) + if parameters.logfile is not None: + options += self.get_logfile_option(parameters.logfile) + return options + + def get_iperf3_server_options( + self, + parameters: _parameters.Iperf3ServerParameters) \ + -> sh.ShellCommand: + options = sh.ShellCommand(['-J']) + options += self.get_server_mode_option() + if parameters.port is not None: + options += self.get_port_option(parameters.port) + if parameters.protocol is not None: + options += self.get_protocol_option(parameters.protocol) return options @staticmethod @@ -64,6 +90,10 @@ class Iperf3Interface: def get_client_mode_option(server_address: str): return ['-c', server_address] + @staticmethod + def get_server_mode_option(): + return ["-s"] + @staticmethod def get_download_option(download: bool): if download: @@ -82,7 +112,7 @@ class Iperf3Interface: @staticmethod def get_timeout_option(timeout: int): - if timeout > 0: + if timeout >= 0: return ['-t', timeout] else: return [] @@ -90,3 +120,7 @@ class Iperf3Interface: @staticmethod def get_port_option(port): return ['-p', port] + + @staticmethod + def get_logfile_option(logfile): + return ['--logfile', logfile] diff --git a/tobiko/shell/iperf3/_parameters.py b/tobiko/shell/iperf3/_parameters.py index fc7f8410c..558ba614a 100644 --- a/tobiko/shell/iperf3/_parameters.py +++ b/tobiko/shell/iperf3/_parameters.py @@ -29,6 +29,12 @@ class Iperf3ClientParameters(typing.NamedTuple): port: typing.Optional[int] = None protocol: typing.Optional[str] = None timeout: typing.Optional[int] = None + logfile: typing.Optional[str] = None + + +class Iperf3ServerParameters(typing.NamedTuple): + port: typing.Optional[int] = None + protocol: typing.Optional[str] = None def iperf3_client_parameters( @@ -37,7 +43,8 @@ def iperf3_client_parameters( download: bool = None, port: int = None, protocol: str = None, - timeout: int = None): + timeout: int = None, + logfile: str = None): """Get iperf3 client parameters mode allowed values: client or server ip is only needed for client mode @@ -60,4 +67,18 @@ def iperf3_client_parameters( download=download, port=port, protocol=protocol, - timeout=timeout) + timeout=timeout, + logfile=logfile) + + +def iperf3_server_parameters( + port: int = None, protocol: str = None) -> Iperf3ServerParameters: + """Get iperf3 server parameters + """ + config = tobiko.tobiko_config().iperf3 + if port is None: + port = config.port + if protocol is None: + protocol = config.protocol + return Iperf3ServerParameters(port=port, + protocol=protocol)