Implement a way to run the iperf3 client in the POD

This patch implements the functions necessary to start and stop the iperf3
client in a POD on OpenShift. It also implements a function that converts
logs from the iperf3 POD's stdout (json-stream format), saves them locally
and then checks this log in the same way as is done for iperf3 running on
the guest VM or locally on the server where Tobiko runs.
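
In rough outline, the POD based client plugs into the same background-process
handling as the local one (a sketch of the wiring added further down in this
patch):

    start_function    -> _openshift.start_iperf3          (runs iperf3 in a POD)
    liveness_function -> _openshift.iperf3_pod_alive       (checks the POD phase)
    stop_function     -> _openshift.stop_iperf3_client     (stores logs, deletes the POD)
    check_function    -> iperf3.check_iperf3_client_results (validates the stored logs)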

This patch also adds a new config option "iperf3_image" which can be used
to specify the container image used to run the iperf3 client. By default it
is the Tobiko image, which has iperf3 installed. However, the Tobiko image
is currently based on CentOS Stream 9 and ships a version of iperf3 that is
too old (at least 3.17 is required, as that version provides the
"--json-stream" option). Because of that it is recommended to use a
different image for now, e.g. [1], which is not official but works fine.

[1] https://quay.io/repository/skaplons/iperf3
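
For example, the option can point at that image via tobiko.conf (a sketch;
the image tag to use is not pinned here):

    [podified]
    iperf3_image = quay.io/skaplons/iperf3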

Related: #TOBIKO-131
Change-Id: I440c9438954ea8e9e6159d7a1df472539e471988
Slawek Kaplonski 2025-03-06 12:25:43 +01:00
parent ab0640b3f0
commit 0e133d8060
6 changed files with 151 additions and 23 deletions


@@ -0,0 +1,9 @@
---
other:
- |
New config option ``podified/iperf3_image`` is added. This option can be
used to specify the iperf3 container image used to run iperf3 in the POD.
The default value is
``quay.io/podified-antelope-centos9/openstack-tobiko:current-podified``, but
this image currently does not provide iperf3 in the required version, so
another image should be used in tests. iperf3 >= 3.17 is required in this case.


@@ -13,13 +13,18 @@
# under the License.
from __future__ import absolute_import
import json
import typing
import netaddr
from oslo_log import log
import tobiko
from tobiko import config
from tobiko.shell import iperf3
from tobiko.shell import ping
from tobiko.shell import sh
from tobiko.shell import ssh
CONF = config.CONF
LOG = log.getLogger(__name__)
@@ -327,7 +332,7 @@ def check_or_start_tobiko_ping_command(server_ip):
def check_or_start_tobiko_command(cmd_args, pod_name, check_function):
pod_obj = _get_tobiko_command_pod(pod_name)
pod_obj = _get_pod(pod_name)
if pod_obj:
# in any case the test is still running, check for failures:
# execute process check i.e. go over results file
@@ -352,7 +357,7 @@ def check_or_start_tobiko_command(cmd_args, pod_name, check_function):
check_function(pod_obj)
def _get_tobiko_command_pod(pod_name):
def _get_pod(pod_name):
with tobiko_project_context():
pod_sel = oc.selector(f'pod/{pod_name}')
if len(pod_sel.objects()) > 1:
@@ -362,7 +367,7 @@ def _get_tobiko_command_pod(pod_name):
return pod_sel.objects()[0]
def _start_tobiko_command_pod(cmd_args, pod_name):
def _start_pod(cmd, args, pod_name, pod_image):
pod_def = {
"apiVersion": "v1",
"kind": "Pod",
@@ -373,9 +378,9 @@ def _start_tobiko_command_pod(cmd_args, pod_name):
"spec": {
"containers": [{
"name": pod_name,
"image": CONF.tobiko.podified.tobiko_image,
"command": ["tobiko"],
"args": cmd_args,
"image": pod_image,
"command": cmd,
"args": args,
}],
"restartPolicy": "Never"
}
@@ -403,6 +408,12 @@ def _start_tobiko_command_pod(cmd_args, pod_name):
return pod_objs[0]
def _start_tobiko_command_pod(cmd_args, pod_name):
return _start_pod(
cmd=["tobiko"], args=cmd_args, pod_name=pod_name,
pod_image=CONF.tobiko.podified.tobiko_image)
def _check_ping_results(pod):
# NOTE(slaweq): we have to put ping log files in the directory
# as defined below because it is expected to be like that by the
@@ -427,3 +438,89 @@ def execute_in_pod(pod_name, command, container_name=None):
with oc.project(CONF.tobiko.podified.osp_project):
return oc.selector(f'pod/{pod_name}').object().execute(
['sh', '-c', command], container_name=container_name)
def _get_iperf_client_pod_name(
address: typing.Union[str, netaddr.IPAddress]) -> str:
return f'tobiko-iperf-client-{address}'.replace('.', '-')
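# Illustrative note (address value made up): the dots of an IPv4 address are
# replaced so that the result is a valid pod name, e.g.
#   _get_iperf_client_pod_name('10.0.0.1') == 'tobiko-iperf-client-10-0-0-1'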
def _store_iperf3_client_results(
address: typing.Union[str, netaddr.IPAddress],
output_dir: str = 'tobiko_iperf_results'):
# The openshift client returns logs in a dict where the key name has the
# format <fully-qualified-name> -> <log output>.
# In this case we don't really need to check the key as the requested
# logs come from a single POD, so we can simply take the first item
# from the dict's values()
pod_obj = _get_pod(_get_iperf_client_pod_name(address))
raw_pod_logs = list(pod_obj.logs().values())[0]
if not raw_pod_logs:
LOG.warning('No logs from the iperf3 client POD.')
return
# Logs are printed by the iperf3 client to stdout in json
# format, but that format is different from what is stored
# in the file when the "--logfile" option is used with iperf3.
# So, to be able to validate them in the same way, the logs from
# the Pod's stdout need to be converted
iperf3_results_data: dict = {
"intervals": []
}
for log_line in raw_pod_logs.split("\n"):
log_line_json = json.loads(log_line)
if log_line_json.get('event') != 'interval':
continue
iperf3_results_data["intervals"].append(
log_line_json["data"])
logfile = iperf3.get_iperf3_logs_filepath(address, output_dir)
with open(logfile, "w") as f:
json.dump(iperf3_results_data, f)
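# A rough sketch of the conversion above, with made-up values; the field
# names follow the parsing done in this function. A single "--json-stream"
# stdout line like
#   {"event": "interval", "data": {"sum": {"bits_per_second": 1000000.0}}}
# ends up in the local logfile as
#   {"intervals": [{"sum": {"bits_per_second": 1000000.0}}]}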
def start_iperf3(
address: typing.Union[str, netaddr.IPAddress],
bitrate: int = None,
download: bool = None,
port: int = None,
protocol: str = None,
iperf3_server_ssh_client: ssh.SSHClientType = None,
**kwargs): # noqa; pylint: disable=W0613
if iperf3_server_ssh_client:
iperf3.start_iperf3_server(
port, protocol, iperf3_server_ssh_client)
parameters = iperf3.iperf3_client_parameters(
address=address, bitrate=bitrate,
download=download, port=port, protocol=protocol,
timeout=0, logfile=iperf3.JSON_STREAM)
cmd_args = iperf3.get_iperf3_client_command(
parameters).as_list()[1:]
pod_name = _get_iperf_client_pod_name(address)
_start_pod(
cmd=["iperf3"], args=cmd_args, pod_name=pod_name,
pod_image=CONF.tobiko.podified.iperf3_image)
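# A sketch of what the resulting POD command could look like; the exact
# arguments are whatever get_iperf3_client_command() produces and may differ:
#   command: ["iperf3"]
#   args:    e.g. ["-c", "10.0.0.1", "--json-stream", ...]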
def stop_iperf3_client(
address: typing.Union[str, netaddr.IPAddress],
**kwargs): # noqa; pylint: disable=W0613
# First, logs from the POD need to be stored in a file
# so that they can be validated later
_store_iperf3_client_results(address)
pod_obj = _get_pod(_get_iperf_client_pod_name(address))
with tobiko_project_context():
pod_obj.delete(ignore_not_found=True)
def iperf3_pod_alive(
address: typing.Union[str, netaddr.IPAddress], # noqa; pylint: disable=W0613
**kwargs) -> bool:
pod_obj = _get_pod(_get_iperf_client_pod_name(address))
if not pod_obj:
return False
return pod_obj.as_dict()['status']['phase'] == 'Running'
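# Note: 'Running' is the only Kubernetes pod phase treated as alive here;
# a pod in e.g. 'Pending', 'Succeeded' or 'Failed' phase returns False.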


@@ -189,20 +189,27 @@ class PodifiedTopology(rhosp.RhospTopology):
ssh_client: ssh.SSHClientType = None,
iperf3_server_ssh_client: ssh.SSHClientType = None):
kwargs = {
'address': server_ip,
'port': port,
'protocol': protocol,
'ssh_client': ssh_client,
'iperf3_server_ssh_client': iperf3_server_ssh_client,
'check_function': iperf3.check_iperf3_client_results
}
if not ssh_client:
LOG.debug("Running iperf3 client in the POD is "
"implemented yet.")
kwargs['start_function'] = _openshift.start_iperf3
kwargs['liveness_function'] = _openshift.iperf3_pod_alive
kwargs['stop_function'] = _openshift.stop_iperf3_client
else:
sh.check_or_start_external_process(
start_function=iperf3.execute_iperf3_client_in_background,
check_function=iperf3.check_iperf3_client_results,
liveness_function=iperf3.iperf3_client_alive,
stop_function=iperf3.stop_iperf3_client,
address=server_ip,
port=port,
protocol=protocol,
ssh_client=ssh_client,
iperf3_server_ssh_client=iperf3_server_ssh_client)
kwargs['start_function'] = \
iperf3.execute_iperf3_client_in_background
kwargs['liveness_function'] = iperf3.iperf3_client_alive
kwargs['stop_function'] = iperf3.stop_iperf3_client
sh.check_or_start_external_process(**kwargs)
class EdpmNode(rhosp.RhospNode):


@@ -41,6 +41,13 @@ OPTIONS = [
default='quay.io/podified-antelope-centos9/openstack-tobiko:current-podified', # noqa
help='Container image used to run background tobiko commands '
'like e.g. `tobiko ping` in the POD.'),
cfg.StrOpt('iperf3_image',
default='quay.io/podified-antelope-centos9/openstack-tobiko:current-podified', # noqa
help='Container image used to run the iperf3 client or server '
'in the background in the POD. It can be any image which '
'provides iperf3, but it should be at least version 3.17 '
'as that version supports the "--json-stream" option '
'required by Tobiko.'),
cfg.IntOpt('tobiko_start_pod_timeout',
default=100,
help='Defines how long Tobiko will wait until POD with the '


@@ -18,6 +18,7 @@ from __future__ import absolute_import
from tobiko.shell.iperf3 import _assert
from tobiko.shell.iperf3 import _execute
from tobiko.shell.iperf3 import _interface
from tobiko.shell.iperf3 import _parameters
JSON_STREAM = _interface.JSON_STREAM
@@ -25,5 +26,12 @@ assert_has_bandwith_limits = _assert.assert_has_bandwith_limits
execute_iperf3_client_in_background = \
_execute.execute_iperf3_client_in_background
check_iperf3_client_results = _execute.check_iperf3_client_results
get_iperf3_logs_filepath = _execute.get_iperf3_logs_filepath
iperf3_client_alive = _execute.iperf3_client_alive
stop_iperf3_client = _execute.stop_iperf3_client
start_iperf3_server = _execute.start_iperf3_server
get_iperf3_client_command = _interface.get_iperf3_client_command
Iperf3ClientParameters = _parameters.Iperf3ClientParameters
iperf3_client_parameters = _parameters.iperf3_client_parameters


@@ -35,9 +35,9 @@ CONF = config.CONF
LOG = log.getLogger(__name__)
def _get_filepath(address: typing.Union[str, netaddr.IPAddress],
path: str,
ssh_client: ssh.SSHClientType = None) -> str:
def get_iperf3_logs_filepath(address: typing.Union[str, netaddr.IPAddress],
path: str,
ssh_client: ssh.SSHClientType = None) -> str:
if ssh_client:
final_dir = _get_remote_filepath(path, ssh_client)
else:
@@ -162,7 +162,7 @@ def execute_iperf3_client_in_background(
iperf3_server_ssh_client: ssh.SSHClientType = None,
output_dir: str = 'tobiko_iperf_results',
**kwargs) -> None:
output_path = _get_filepath(address, output_dir, ssh_client)
output_path = get_iperf3_logs_filepath(address, output_dir, ssh_client)
LOG.info(f'starting iperf3 client process to > {address} , '
f'output file is : {output_path}')
# just in case there is some leftover file from previous run,
@@ -178,7 +178,7 @@ def execute_iperf3_client_in_background(
_stop_iperf3_server(
port=port, protocol=protocol,
ssh_client=iperf3_server_ssh_client)
_start_iperf3_server(
start_iperf3_server(
port=port, protocol=protocol,
ssh_client=iperf3_server_ssh_client)
@@ -235,7 +235,7 @@ def check_iperf3_client_results(address: typing.Union[str, netaddr.IPAddress],
**kwargs): # noqa; pylint: disable=W0613
# This function expects that the result file is available locally already
#
logfile = _get_filepath(address, output_dir, ssh_client)
logfile = get_iperf3_logs_filepath(address, output_dir, ssh_client)
try:
iperf_log_raw = sh.execute(
f"cat {logfile}", ssh_client=ssh_client).stdout
@@ -296,7 +296,7 @@ def stop_iperf3_client(address: typing.Union[str, netaddr.IPAddress],
sh.execute(f'sudo kill {pid}', ssh_client=ssh_client)
def _start_iperf3_server(
def start_iperf3_server(
port: typing.Union[int, None],
protocol: typing.Union[str, None],
ssh_client: ssh.SSHClientType):