Merge "Run iperf3 client (and server) in background"

This commit is contained in:
Zuul 2025-03-10 10:56:35 +00:00 committed by Gerrit Code Review
commit 3d9c3f9b47
5 changed files with 328 additions and 11 deletions

View File

@ -70,6 +70,23 @@ RHOSP_OPTIONS = [
default=False,
deprecated_group=TRIPLEO_GROUP_NAME,
help="whether Ceph RGW is deployed"),
# Background connectivity related settings:
cfg.IntOpt('max_traffic_break_allowed',
default=0,
help="longest allowed single break time during the background "
"connectivity tests like e.g. those using iperf3 "
"(in seconds)"),
cfg.IntOpt('max_total_breaks_allowed',
default=0,
help="longest allowed total break time during the background "
"connectivity tests like e.g. those using iperf3. "
"This option represents total time when connetion "
"was not working. "
"For example it could be: not working for 3 seconds, "
"then working for 60 seconds and then again not working "
"for another 10 seconds. In such case this total break "
"time would be 13 seconds."),
]
TRIPLEO_OPTIONS = [

View File

@ -16,6 +16,12 @@
from __future__ import absolute_import
from tobiko.shell.iperf3 import _assert
from tobiko.shell.iperf3 import _execute
assert_has_bandwith_limits = _assert.assert_has_bandwith_limits
execute_iperf3_client_in_background = \
_execute.execute_iperf3_client_in_background
check_iperf3_client_results = _execute.check_iperf3_client_results
iperf3_client_alive = _execute.iperf3_client_alive
stop_iperf3_client = _execute.stop_iperf3_client

View File

@ -17,21 +17,79 @@ from __future__ import absolute_import
from __future__ import division
import json
import os
import typing
import netaddr
from oslo_log import log
import tobiko
from tobiko import config
from tobiko.shell.iperf3 import _interface
from tobiko.shell.iperf3 import _parameters
from tobiko.shell import sh
from tobiko.shell import ssh
CONF = config.CONF
LOG = log.getLogger(__name__)
def _get_filepath(address: typing.Union[str, netaddr.IPAddress],
path: str,
ssh_client: ssh.SSHClientType = None) -> str:
if ssh_client:
final_dir = _get_remote_filepath(path, ssh_client)
else:
final_dir = _get_local_filepath(path)
filename = f'iperf_{address}.log'
return os.path.join(final_dir, filename)
def _get_local_filepath(path: str) -> str:
final_dir_path = f'{sh.get_user_home_dir()}/{path}'
if not os.path.exists(final_dir_path):
os.makedirs(final_dir_path)
return final_dir_path
def _get_remote_filepath(path: str,
ssh_client: ssh.SSHClientType) -> str:
homedir = sh.execute('echo ~', ssh_client=ssh_client).stdout.rstrip()
final_dir_path = f'{homedir}/{path}'
sh.execute(f'/usr/bin/mkdir -p {final_dir_path}',
ssh_client=ssh_client)
return final_dir_path
def _truncate_iperf3_client_logfile(
logfile: str,
ssh_client: ssh.SSHClientType = None) -> None:
if ssh_client:
_truncate_remote_logfile(logfile, ssh_client)
else:
tobiko.truncate_logfile(logfile)
def _truncate_remote_logfile(logfile: str,
ssh_client: ssh.SSHClientType) -> None:
truncated_logfile = tobiko.get_truncated_filename(logfile)
sh.execute(f'/usr/bin/mv {logfile} {truncated_logfile}',
ssh_client=ssh_client)
def _remove_old_logfile(logfile: str,
ssh_client: ssh.SSHClientType = None):
if ssh_client:
sh.execute(f'/usr/bin/rm -f {logfile}',
ssh_client=ssh_client)
else:
try:
os.remove(logfile)
except FileNotFoundError:
pass
def get_bandwidth(address: typing.Union[str, netaddr.IPAddress],
bitrate: int = None,
download: bool = None,
@ -68,21 +126,202 @@ def execute_iperf3_client(address: typing.Union[str, netaddr.IPAddress],
port: int = None,
protocol: str = None,
ssh_client: ssh.SSHClientType = None,
timeout: tobiko.Seconds = None) \
timeout: tobiko.Seconds = None,
logfile: str = None,
run_in_background: bool = False) \
-> typing.Dict:
params_timeout: typing.Optional[int] = None
if timeout is not None:
if run_in_background:
params_timeout = 0
elif timeout is not None:
params_timeout = int(timeout - 0.5)
parameters = _parameters.iperf3_client_parameters(address=address,
bitrate=bitrate,
download=download,
port=port,
protocol=protocol,
timeout=params_timeout)
parameters = _parameters.iperf3_client_parameters(
address=address, bitrate=bitrate,
download=download, port=port, protocol=protocol,
timeout=params_timeout, logfile=logfile)
command = _interface.get_iperf3_client_command(parameters)
# output is a dictionary
if run_in_background:
process = sh.process(command, ssh_client=ssh_client)
process.execute()
return {}
output = sh.execute(command,
ssh_client=ssh_client,
timeout=timeout).stdout
return json.loads(output)
def execute_iperf3_client_in_background(
address: typing.Union[str, netaddr.IPAddress], # noqa; pylint: disable=W0613
bitrate: int = None,
download: bool = None,
port: int = None,
protocol: str = None,
ssh_client: ssh.SSHClientType = None,
iperf3_server_ssh_client: ssh.SSHClientType = None,
output_dir: str = 'tobiko_iperf_results',
**kwargs) -> None:
output_path = _get_filepath(address, output_dir, ssh_client)
LOG.info(f'starting iperf3 client process to > {address} , '
f'output file is : {output_path}')
# just in case there is some leftover file from previous run,
# it needs to be removed, otherwise iperf will append new log
# to the end of the existing file and this will make json output
# file to be malformed
_remove_old_logfile(output_path, ssh_client=ssh_client)
# If there is ssh client for the server where iperf3 server is going
# to run, lets make sure it is started fresh as e.g. in case of
# failure in the previous run, it may report that is still "busy" thus
# iperf3 client will not start properly
if iperf3_server_ssh_client:
_stop_iperf3_server(
port=port, protocol=protocol,
ssh_client=iperf3_server_ssh_client)
_start_iperf3_server(
port=port, protocol=protocol,
ssh_client=iperf3_server_ssh_client)
if not _iperf3_server_alive(
port=port, protocol=protocol,
ssh_client=iperf3_server_ssh_client):
testcase = tobiko.get_test_case()
testcase.fail('iperf3 server did not start properly '
f'on the server {iperf3_server_ssh_client}')
# Now, finally iperf3 client should be ready to start
execute_iperf3_client(
address=address,
bitrate=bitrate,
download=download,
port=port,
protocol=protocol,
ssh_client=ssh_client,
logfile=output_path,
run_in_background=True)
def _get_iperf3_pid(
address: typing.Union[str, netaddr.IPAddress, None] = None,
port: int = None,
protocol: str = None,
ssh_client: ssh.SSHClientType = None) -> typing.Union[int, None]:
try:
iperf_pids = sh.execute(
'pidof iperf3', ssh_client=ssh_client).stdout.rstrip().split(" ")
except sh.ShellCommandFailed:
return None
for iperf_pid in iperf_pids:
proc_cmdline = sh.get_command_line(
iperf_pid,
ssh_client=ssh_client)
if address and str(address) in proc_cmdline:
# This is looking for the iperf client instance
return int(iperf_pid)
elif port and protocol:
# By looking for port and protocol we are looking
# for the iperf3 server's PID
if "-s" in proc_cmdline and f"-p {port}" in proc_cmdline:
if ((protocol.lower() == 'udp' and "-u" in proc_cmdline) or
(protocol.lower() == 'tcp' and
'-u' not in proc_cmdline)):
return int(iperf_pid)
return None
def check_iperf3_client_results(address: typing.Union[str, netaddr.IPAddress],
output_dir: str = 'tobiko_iperf_results',
ssh_client: ssh.SSHClientType = None,
**kwargs): # noqa; pylint: disable=W0613
# This function expects that the result file is available locally already
#
logfile = _get_filepath(address, output_dir, ssh_client)
try:
iperf_log_raw = sh.execute(
f"cat {logfile}", ssh_client=ssh_client).stdout
except sh.ShellCommandFailed as err:
if config.is_prevent_create():
# Tobiko is not expected to create resources in this run
# so iperf should be already running and log file should
# be already there, if it is not, it should fail
tobiko.fail('Failed to read iperf log from the file. '
f'Server IP address: {address}; Logfile: {logfile}')
else:
# Tobiko is creating resources so it is normal that file was not
# there yet
LOG.debug(f'Failed to read iperf log from the file. '
f'Error: {err}')
return
iperf_log = json.loads(iperf_log_raw)
longest_break = 0 # seconds
breaks_total = 0 # seconds
current_break = 0 # seconds
intervals = iperf_log.get("intervals")
if not intervals:
tobiko.fail(f"No intervals data found in {logfile}")
for interval in intervals:
if interval["sum"]["bytes"] == 0:
interval_duration = (
interval["sum"]["end"] - interval["sum"]["start"])
current_break += interval_duration
if current_break > longest_break:
longest_break = current_break
breaks_total += interval_duration
else:
current_break = 0
_truncate_iperf3_client_logfile(logfile, ssh_client)
testcase = tobiko.get_test_case()
testcase.assertLessEqual(longest_break,
CONF.tobiko.rhosp.max_traffic_break_allowed)
testcase.assertLessEqual(breaks_total,
CONF.tobiko.rhosp.max_total_breaks_allowed)
def iperf3_client_alive(address: typing.Union[str, netaddr.IPAddress], # noqa; pylint: disable=W0613
ssh_client: ssh.SSHClientType = None,
**kwargs) -> bool:
return bool(_get_iperf3_pid(address=address, ssh_client=ssh_client))
def stop_iperf3_client(address: typing.Union[str, netaddr.IPAddress],
ssh_client: ssh.SSHClientType = None,
**kwargs): # noqa; pylint: disable=W0613
pid = _get_iperf3_pid(address=address, ssh_client=ssh_client)
if pid:
LOG.info(f'iperf3 client process to > {address} already running '
f'with PID: {pid}')
sh.execute(f'sudo kill {pid}', ssh_client=ssh_client)
def _start_iperf3_server(
port: typing.Union[int, None],
protocol: typing.Union[str, None],
ssh_client: ssh.SSHClientType):
parameters = _parameters.iperf3_server_parameters(
port=port, protocol=protocol)
command = _interface.get_iperf3_server_command(parameters)
process = sh.process(command, ssh_client=ssh_client)
process.execute()
def _iperf3_server_alive(
port: typing.Union[int, None],
protocol: typing.Union[str, None],
ssh_client: ssh.SSHClientType = None) -> bool:
return bool(
_get_iperf3_pid(port=port, protocol=protocol,
ssh_client=ssh_client))
def _stop_iperf3_server(
port: typing.Union[int, None],
protocol: typing.Union[str, None],
ssh_client: ssh.SSHClientType = None):
pid = _get_iperf3_pid(port=port, protocol=protocol, ssh_client=ssh_client)
if pid:
LOG.info(f'iperf3 server listening on the {protocol} port: {port} '
f'is already running with PID: {pid}')
sh.execute(f'sudo kill {pid}', ssh_client=ssh_client)

View File

@ -29,6 +29,11 @@ def get_iperf3_client_command(parameters: _parameters.Iperf3ClientParameters):
return interface.get_iperf3_client_command(parameters)
def get_iperf3_server_command(parameters: _parameters.Iperf3ServerParameters):
interface = Iperf3Interface()
return interface.get_iperf3_server_command(parameters)
class Iperf3Interface:
def get_iperf3_client_command(
@ -38,6 +43,13 @@ class Iperf3Interface:
options = self.get_iperf3_client_options(parameters=parameters)
return sh.shell_command('iperf3') + options
def get_iperf3_server_command(
self,
parameters: _parameters.Iperf3ServerParameters) \
-> sh.ShellCommand:
options = self.get_iperf3_server_options(parameters=parameters)
return sh.shell_command('iperf3') + options
def get_iperf3_client_options(
self,
parameters: _parameters.Iperf3ClientParameters) \
@ -54,6 +66,20 @@ class Iperf3Interface:
options += self.get_download_option(parameters.download)
if parameters.protocol is not None:
options += self.get_protocol_option(parameters.protocol)
if parameters.logfile is not None:
options += self.get_logfile_option(parameters.logfile)
return options
def get_iperf3_server_options(
self,
parameters: _parameters.Iperf3ServerParameters) \
-> sh.ShellCommand:
options = sh.ShellCommand(['-J'])
options += self.get_server_mode_option()
if parameters.port is not None:
options += self.get_port_option(parameters.port)
if parameters.protocol is not None:
options += self.get_protocol_option(parameters.protocol)
return options
@staticmethod
@ -64,6 +90,10 @@ class Iperf3Interface:
def get_client_mode_option(server_address: str):
return ['-c', server_address]
@staticmethod
def get_server_mode_option():
return ["-s"]
@staticmethod
def get_download_option(download: bool):
if download:
@ -82,7 +112,7 @@ class Iperf3Interface:
@staticmethod
def get_timeout_option(timeout: int):
if timeout > 0:
if timeout >= 0:
return ['-t', timeout]
else:
return []
@ -90,3 +120,7 @@ class Iperf3Interface:
@staticmethod
def get_port_option(port):
return ['-p', port]
@staticmethod
def get_logfile_option(logfile):
return ['--logfile', logfile]

View File

@ -29,6 +29,12 @@ class Iperf3ClientParameters(typing.NamedTuple):
port: typing.Optional[int] = None
protocol: typing.Optional[str] = None
timeout: typing.Optional[int] = None
logfile: typing.Optional[str] = None
class Iperf3ServerParameters(typing.NamedTuple):
port: typing.Optional[int] = None
protocol: typing.Optional[str] = None
def iperf3_client_parameters(
@ -37,7 +43,8 @@ def iperf3_client_parameters(
download: bool = None,
port: int = None,
protocol: str = None,
timeout: int = None):
timeout: int = None,
logfile: str = None):
"""Get iperf3 client parameters
mode allowed values: client or server
ip is only needed for client mode
@ -60,4 +67,18 @@ def iperf3_client_parameters(
download=download,
port=port,
protocol=protocol,
timeout=timeout)
timeout=timeout,
logfile=logfile)
def iperf3_server_parameters(
port: int = None, protocol: str = None) -> Iperf3ServerParameters:
"""Get iperf3 server parameters
"""
config = tobiko.tobiko_config().iperf3
if port is None:
port = config.port
if protocol is None:
protocol = config.protocol
return Iperf3ServerParameters(port=port,
protocol=protocol)