Eduardo Olivares 435766502a Fix pep8 errors raised with python3.12 and update advanced guest image
Two recent changes in CI infra have affected
whitebox_neutron_tempest_plugin jobs:
1) After the pep8 jobs were updated to ubuntu-noble/python3.12, an error
   is reported; this patch fixes it.
2) Tests using advanced images had issues booting rocky 9.3 VM instances;
   because of that, this patch updates the image to rocky 9.5.
   As a consequence of this change, virt-customize command has been
   updated to:
   * install tcpdump
   * enable dhcp for NetworkManager
   And some tests have been adapted to rocky 9.5 characteristics.

Change-Id: I489ecaf1765570e52b1f2d2676f13a0edc5f6fc4
2024-12-05 11:07:45 +01:00

1450 lines
64 KiB
Python

# Copyright 2024 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import random
import time
import netaddr
import testtools
from neutron_lib.api.definitions import qos as qos_apidef
from neutron_lib import constants
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin import exceptions as neutron_exceptions
from neutron_tempest_plugin.scenario import test_qos
from oslo_log import log
from tempest.common import waiters
from tempest import config
from tempest import exceptions as tempest_exceptions
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from whitebox_neutron_tempest_plugin.common import tcpdump_capture as capture
from whitebox_neutron_tempest_plugin.common import utils
from whitebox_neutron_tempest_plugin.tests.scenario import base
try:
from tempest.lib.common import api_microversion_fixture
from tempest.lib.common import api_version_utils
except ImportError:
pass
# Tempest configuration object shared by the whole module.
CONF = config.CONF
# Option group with the settings specific to the whitebox neutron plugin.
WB_CONF = CONF.whitebox_neutron_plugin_options
# Module-level logger named after this module.
LOG = log.getLogger(__name__)
class QosBaseTest(test_qos.QoSTestMixin, base.TrafficFlowTest):
    """Shared setup and validation helpers for the QoS scenario tests.

    The helpers validate bandwidth limits with iperf3 and DSCP marking
    with tcpdump captures.
    """

    # Neutron API extensions required by these tests (the QoS alias).
    required_extensions = [qos_apidef.ALIAS]
    # Both the primary (tenant) and the admin credentials are used.
    credentials = ['primary', 'admin']
    # DSCP marks used by the rules attached to the network (initial and
    # updated value), to the port and to the floating IP (initial and
    # updated value).
    dscp_mark_net = 56
    dscp_mark_net_new = 22
    dscp_mark_port = 48
    dscp_mark_fip = 36
    dscp_mark_fip_new = 38
    # Minimum bandwidth (kbps) expected when no bw limit rule applies; the
    # iperf3 target bitrate is also derived from it.
    MIN_KBPS_NO_BWLIMIT = 10000
    # Port the iperf3 server listens on (opened in the security group).
    IPERF_PORT = 4321
    # Bandwidth limits (kbps) for the network policy (initial and updated
    # value) and for the port policy.
    bwlimit_kbps_net = 1000
    bwlimit_kbps_net_new = 500
    bwlimit_kbps_port = 250
    # Set to True by test classes that use SR-IOV ports.
    sriov_test = False
@classmethod
def skip_checks(cls):
    """Skip the test class unless an advanced guest image is configured."""
    super(QosBaseTest, cls).skip_checks()
    plugin_opts = CONF.neutron_plugin_options
    # Either a dedicated advanced image or an advanced default image is
    # enough to run these tests.
    if (plugin_opts.advanced_image_ref or
            plugin_opts.default_image_is_advanced):
        return
    raise cls.skipException(
        "This test require advanced tools for this test")
@classmethod
def setup_clients(cls):
    """Expose the network client and the QoS rule clients on the class."""
    super(QosBaseTest, cls).setup_clients()
    # (class attribute, credentials manager, client attribute on manager)
    client_table = (
        ('admin_client', 'os_admin', 'network_client'),
        ('qos_bw_limit_rule_client', 'os_admin',
         'qos_limit_bandwidth_rules_client'),
        ('qos_bw_limit_rule_client_primary', 'os_primary',
         'qos_limit_bandwidth_rules_client'),
        ('qos_min_bw_rules_client', 'os_admin',
         'qos_minimum_bandwidth_rules_client'),
        ('qos_min_bw_rules_client_primary', 'os_primary',
         'qos_minimum_bandwidth_rules_client'),
    )
    for attr_name, manager_name, client_name in client_table:
        manager = getattr(cls, manager_name)
        setattr(cls, attr_name, getattr(manager, client_name))
@classmethod
def resource_setup(cls):
    """Verify the QoS configuration and create the class-level resources.

    The class is skipped when no node information (``cls.nodes``) is
    available. Depending on the deployment type, the neutron
    configuration is checked for the ``qos`` service plugin and ML2
    extension driver. Then the image/flavor/user for the VMs are selected
    and a router, a keypair and a security group (ssh, ping, ICMPv6 and
    iperf ports) are created.
    """
    super(QosBaseTest, cls).resource_setup()
    if not hasattr(cls, 'nodes'):
        raise cls.skipException(
            "Nodes info not available. Test won't be able to connect to "
            "nodes.")
    msg = "Required QoS config is not set"
    # NOTE(review): the outcome of a failed check_service_setting is
    # defined in the base class, not visible here.
    if WB_CONF.openstack_type == 'devstack':
        # only controller nodes are checked
        for node in cls.nodes:
            if node['is_controller'] is False:
                continue
            cls.check_service_setting(
                host=node, service='',
                config_files=[cls.neutron_conf],
                param='service_plugins', value='qos')
            cls.check_service_setting(
                host=node, service='',
                config_files=[WB_CONF.ml2_plugin_config], section='ml2',
                param='extension_drivers', value='qos')
    if WB_CONF.openstack_type == 'podified':
        # the same two settings are checked through the proxy host client
        config_files = cls.get_configs_of_service('neutron')
        cls.check_service_setting(
            {'client': cls.proxy_host_client}, service='neutron',
            config_files=config_files, section='DEFAULT',
            param='service_plugins', value='qos', msg=msg)
        cls.check_service_setting(
            {'client': cls.proxy_host_client}, service='neutron',
            config_files=config_files, section='ml2',
            param='extension_drivers', value='qos', msg=msg)
    # Use the default image when it is already an advanced one, otherwise
    # use the dedicated advanced image with its own flavor and ssh user
    if CONF.neutron_plugin_options.default_image_is_advanced:
        cls.flavor_ref = CONF.compute.flavor_ref
        cls.image_ref = CONF.compute.image_ref
        cls.username = CONF.validation.image_ssh_user
    else:
        cls.flavor_ref = (
            CONF.neutron_plugin_options.advanced_image_flavor_ref)
        cls.image_ref = CONF.neutron_plugin_options.advanced_image_ref
        cls.username = CONF.neutron_plugin_options.advanced_image_ssh_user
    # Reduce initial rate for ovs virt envs to avoid instabilities
    if not cls.has_sriov_support and not cls.has_ovn_support:
        cls.MIN_KBPS_NO_BWLIMIT = 600
        cls.bwlimit_kbps_net = 400
        cls.bwlimit_kbps_net_new = 250
        cls.bwlimit_kbps_port = 125
    cls.router = cls.create_router_by_client()
    cls.keypair = cls.create_keypair()
    cls.secgroup = cls.os_primary.network_client.create_security_group(
        name=data_utils.rand_name('secgroup_qos'))
    # registered in cls.security_groups, presumably for cleanup by the
    # base class
    cls.security_groups.append(cls.secgroup['security_group'])
    cls.create_loginable_secgroup_rule(
        secgroup_id=cls.secgroup['security_group']['id'])
    cls.create_pingable_secgroup_rule(
        secgroup_id=cls.secgroup['security_group']['id'])
    # create security group rules for icmpv6
    rulesets = [{'protocol': constants.PROTO_NAME_IPV6_ICMP,
                 'ethertype': 'IPv6',
                 'direction': 'ingress'}]
    # create security group rules for the bw limit tests both udp and tcp
    # ports need to be accessible and both for IPv4 and IPv6
    for proto in (constants.PROTO_NAME_TCP, constants.PROTO_NAME_UDP):
        for ipversion in ('IPV4', 'IPV6'):
            rulesets.append({'protocol': proto,
                             'ethertype': ipversion,
                             'port_range_min': cls.IPERF_PORT,
                             'port_range_max': cls.IPERF_PORT,
                             'direction': 'ingress'})
    cls.create_secgroup_rules(
        rulesets, cls.secgroup['security_group']['id'])
@staticmethod
def _get_iperf_proto_param(protocol):
    """Return the iperf3 command line options that select ``protocol``.

    :param protocol: ``constants.PROTO_NAME_TCP`` or
        ``constants.PROTO_NAME_UDP``
    :raises ValueError: for any other protocol
    """
    if protocol == constants.PROTO_NAME_TCP:
        # no protocol flag for TCP, only the optional window size
        if WB_CONF.window_size:
            return '-w {}'.format(WB_CONF.window_size)
        return ''
    if protocol == constants.PROTO_NAME_UDP:
        return '-u'
    raise ValueError('Unsupported protocol %s' % protocol)
def _test_egress_bw(
        self, ssh_client, ssh_server, server_ip, protocol, timeout=10):
    """Send iperf3 traffic client -> server and return the server report.

    The iperf3 server is started daemonized on ``ssh_server`` and writes
    its JSON report to a temporary file there; that file is read back and
    returned as a parsed dict.
    """
    # kill any iperf process on both ends before measuring
    for endpoint in (ssh_server, ssh_client):
        utils.kill_iperf_process(endpoint)
    report_path = utils.get_temp_file(ssh_server)
    server_cmd = ('iperf3 -s -p {port} -J --logfile {output_file} '
                  '-D'.format(port=self.IPERF_PORT,
                              output_file=report_path))
    LOG.debug('Run iperf3 command on server: %s', server_cmd)
    ssh_server.exec_command(server_cmd)
    time.sleep(0.1)
    client_opts = {
        'server_ip': server_ip,
        'port': self.IPERF_PORT,
        'protocol_param': self._get_iperf_proto_param(protocol),
        'timeout': timeout,
        # target bitrate requested to iperf3 (-b)
        'maxbitrate': self.MIN_KBPS_NO_BWLIMIT * 2000,
    }
    client_cmd = ('iperf3 -c {server_ip} -p {port} {protocol_param} '
                  '-t {timeout} -b {maxbitrate}'.format(**client_opts))
    LOG.debug('Run iperf3 command on client: %s', client_cmd)
    ssh_client.exec_command(client_cmd)
    time.sleep(0.1)
    raw_report = utils.cat_remote_file(ssh_server, report_path)
    return json.loads(raw_report)
def _test_ingress_bw(
        self, ssh_client, ssh_server, server_ip, protocol, timeout=10):
    """Send iperf3 traffic server -> client and return the client report.

    The client runs iperf3 in reverse mode (``-R``) and redirects its
    JSON output to a temporary file on ``ssh_client``; that file is read
    back and returned as a parsed dict.
    """
    # kill any iperf process on both ends before measuring
    for endpoint in (ssh_server, ssh_client):
        utils.kill_iperf_process(endpoint)
    server_cmd = 'iperf3 -s -p {port} -D'.format(port=self.IPERF_PORT)
    LOG.debug('Run iperf3 command on server: %s', server_cmd)
    ssh_server.exec_command(server_cmd)
    time.sleep(0.1)
    report_path = utils.get_temp_file(ssh_client)
    client_opts = {
        'server_ip': server_ip,
        'port': self.IPERF_PORT,
        'protocol_param': self._get_iperf_proto_param(protocol),
        'timeout': timeout,
        # target bitrate requested to iperf3 (-b)
        'maxbitrate': self.MIN_KBPS_NO_BWLIMIT * 2000,
        'output_file': report_path,
    }
    client_cmd = ('iperf3 -c {server_ip} -p {port} {protocol_param} '
                  '-t {timeout} -b {maxbitrate} '
                  '-R -J > {output_file}'.format(**client_opts))
    LOG.debug('Run iperf3 command on client: %s', client_cmd)
    ssh_client.exec_command(client_cmd)
    time.sleep(0.1)
    raw_report = utils.cat_remote_file(ssh_client, report_path)
    return json.loads(raw_report)
def _calculate_bw(self, perf_measures):
# First 3 for ovs envs, 2 for ovn sriov envs and 1 for normal ovn
# intervals are removed because BW measured during it is not
# limited - it takes ~2-4 seconds to traffic shaping algorithm to apply
# several tests in parallel increase this application time
# bw limit properly (buffer is empty when traffic starts being sent)
if not self.has_ovn_support:
intervals_start = 3
elif self.sriov_test:
intervals_start = 2
else:
intervals_start = 1
# For rocky images, final interval is ignored
# TODO(eolivare): provide link to iperf/rocky bug
intervals_end = (len(perf_measures['intervals'])
if self.username != "rocky"
else len(perf_measures['intervals']) - 1)
intervals = perf_measures['intervals'][intervals_start:intervals_end]
bits_received = sum([interval['sum']['bytes'] * 8
for interval in intervals])
totaltime = sum([interval['sum']['seconds'] for interval in intervals])
# bw in bits per second
return bits_received / totaltime
def _skip_if_iperf3_not_installed(self, ssh_client):
    """Skip the test when ``which iperf3`` fails on the remote host."""
    lookup_cmd = 'PATH=$PATH:/usr/sbin which iperf3'
    try:
        ssh_client.execute_script(lookup_cmd)
    except neutron_exceptions.SSHScriptFailed:
        reason = "iperf3 is not available on VM instance"
        raise self.skipException(reason)
# retry only when noticed measuring issue, as reported in BZ#2274465
@utils.retry_on_assert_fail(
    max_retries=2,
    assert_regex='not .* than')
def _validate_bw_limit(self, client, server, egress=True, ipv6=False,
                       bw_limit=None, protocol=constants.PROTO_NAME_TCP):
    """Measure the bandwidth between two VMs and compare it with a limit.

    :param client: dict of the client VM; 'ssh_client' is used
    :param server: dict of the server VM; 'id' and 'ssh_client' are used
    :param egress: True to send the payload from client to server, False
        to download it from server to client
    :param ipv6: when False, IPv6 addresses of the server port are not
        tested
    :param bw_limit: expected limit in bits per second, or None when no
        limit is expected to apply
    :param protocol: protocol used by iperf3
    """
    server_port = self.client.list_ports(
        device_id=server['id'])['ports'][0]
    server_ips = [
        fixed_ip['ip_address'] for fixed_ip in server_port['fixed_ips']
        if ipv6 or not netaddr.valid_ipv6(fixed_ip['ip_address'])]
    self.assertGreater(len(server_ips), 0)
    if egress:
        test_bw_method = self._test_egress_bw
        direction = 'egress'
    else:
        test_bw_method = self._test_ingress_bw
        direction = 'ingress'
    # egress: send payload from client to server
    # ingress: download payload from server to client
    for server_ip in server_ips:
        perf_measures = test_bw_method(
            client['ssh_client'], server['ssh_client'],
            server_ip, protocol)
        LOG.debug('perf_measures = %s', perf_measures)
        # verify bw limit
        measured_bw = self._calculate_bw(perf_measures)
        LOG.debug('%s %s / server_ip = %s / measured_bw = %s',
                  direction, protocol, server_ip, measured_bw)
        if bw_limit is None:
            LOG.debug('no %s bw_limit configured', direction)
            self.assertGreater(measured_bw,
                               self.MIN_KBPS_NO_BWLIMIT * 1000)
        else:
            LOG.debug('%s bw_limit = %s', direction, bw_limit)
            # a 20% of upper deviation is allowed
            self.assertLess(measured_bw, bw_limit * 1.2)
            # a 20% of lower deviation is allowed
            self.assertGreater(measured_bw, bw_limit * 0.8)
def _validate_traffic_bw_limit(self, client, server,
                               egress=True, ipv6=False, fip_qos=False):
    """Validate that bw limit is applied to the traffic between client and
    server VMs

    Scenario:
    1. First make sure that bw between VMs is not limited.
    2. Create a QoS policy, attach to the network where client VM
    is connected to.
    3. Add a bw limit rule to the policy.
    4. Send traffic between 2 VMs and make sure that now bw limit
    is applied.
    5. Update the bw limit rule with a different value and make
    sure that bw is limited accordingly.
    6. Create another QoS policy, apply to the port of client VM
    and add a bw limit rule with another value.
    7. Send traffic between 2 VMs and make sure that now bw is
    limited according to the new value (since rule for port has higher
    priority).
    8. Only when fip_qos is True: attach one more QoS policy to the
    floating IP of the client VM and make sure that the lowest of the
    port and FIP limits applies.
    9. Delete bw limit rule from the port QoS policy and make sure
    that traffic is not limited.
    10. Verify that the QoS policy attached to the port cannot be
    deleted.

    :param client: dict of the client VM ('id' and 'ssh_client' are used)
    :param server: dict of the server VM
    :param egress: direction of the limited traffic; True for egress,
        False for ingress
    :param ipv6: NOTE(review): accepted but not forwarded to
        _validate_bw_limit by this method - confirm whether intended
    :param fip_qos: also validate a QoS policy attached to the client FIP
    """
    # Prerequisite: install iperf3
    self._skip_if_iperf3_not_installed(client['ssh_client'])
    self._skip_if_iperf3_not_installed(server['ssh_client'])
    # First, let's make sure that bw is not limited
    self._validate_bw_limit(client, server, egress)
    if self.sriov_test:
        # TODO(eolivare): investigate why this delay between iperf tests is
        # needed when SRIOV ports are used
        time.sleep(5)
    self._validate_bw_limit(client, server, egress,
                            protocol=constants.PROTO_NAME_UDP)
    direction = 'egress' if egress else 'ingress'
    # Create new QoS policy and attach to the src network
    net_bwlimit_policy_id = self._create_qos_policy()
    src_port = self.client.list_ports(device_id=client['id'])['ports'][0]
    self.admin_client.update_network(
        src_port['network_id'], qos_policy_id=net_bwlimit_policy_id)
    self.addCleanup(self.admin_client.update_network,
                    src_port['network_id'], qos_policy_id=None)
    # limits are 1000 times higher for ports whose vnic type is not
    # 'normal'
    max_kbps = (self.bwlimit_kbps_net
                if src_port['binding:vnic_type'] == 'normal'
                else self.bwlimit_kbps_net * 1000)
    rule_data = {
        'max_kbps': max_kbps,
        'max_burst_kbps': max_kbps * 0.8,
        'direction': direction}
    net_rule_id = self._create_qos_bw_limit_rule(
        net_bwlimit_policy_id, rule_data)['id']
    # Validate the bw limit - using UDP
    # (_validate_bw_limit expects the limit in bits per second)
    self._validate_bw_limit(
        client, server, egress, bw_limit=max_kbps * 1000,
        protocol=constants.PROTO_NAME_UDP)
    # Update the bw limit rule and verify that new bw limit is applied
    max_kbps = (self.bwlimit_kbps_net_new
                if src_port['binding:vnic_type'] == 'normal'
                else self.bwlimit_kbps_net_new * 1000)
    rule_update_data = {
        'max_kbps': max_kbps,
        'max_burst_kbps': max_kbps * 0.8}
    self.qos_bw_limit_rule_client.update_limit_bandwidth_rule(
        qos_policy_id=net_bwlimit_policy_id, rule_id=net_rule_id,
        **rule_update_data)
    # Validate the bw limit - using TCP
    self._validate_bw_limit(client, server, egress,
                            bw_limit=max_kbps * 1000)
    # Create a new QoS policy and attach to the port of src server
    bwlimit_policy_id_new = self._create_qos_policy()
    max_kbps = (self.bwlimit_kbps_port
                if src_port['binding:vnic_type'] == 'normal'
                else self.bwlimit_kbps_port * 1000)
    rule_data = {
        'max_kbps': max_kbps,
        'max_burst_kbps': max_kbps * 0.8,
        'direction': direction}
    port_rule_id = self._create_qos_bw_limit_rule(
        bwlimit_policy_id_new, rule_data)['id']
    self.client.update_port(
        src_port['id'], qos_policy_id=bwlimit_policy_id_new)
    self.addCleanup(self.admin_client.update_port,
                    src_port['id'], qos_policy_id=None)
    # Verify that bw limit applied corresponds with the port QoS policy
    # Validate the bw limit - using UDP
    self._validate_bw_limit(
        client, server, egress, bw_limit=max_kbps * 1000,
        protocol=constants.PROTO_NAME_UDP)
    # Create a new Qos Policy and attach to the FIP of src server
    # This only applies to south-north tests because the traffic from the
    # src server to the dst server goes through the src FIP
    if fip_qos:
        fip_qos_pol_id = self._create_qos_policy()
        # fip bw limit greater than port bw limit
        fip_max_kbps = max_kbps * 10
        rule_data = {
            'max_kbps': fip_max_kbps,
            'max_burst_kbps': fip_max_kbps * 0.8,
            'direction': direction}
        fip_port_rule_id = self._create_qos_bw_limit_rule(
            fip_qos_pol_id, rule_data)['id']
        # the FIP is looked up by the first fixed IP of the src port
        src_fip_id = self.client.list_floatingips(
            fixed_ip_address=src_port['fixed_ips'][0]['ip_address'])[
                'floatingips'][0]['id']
        self.client.update_floatingip(
            src_fip_id, qos_policy_id=fip_qos_pol_id)
        self.addCleanup(self.admin_client.update_floatingip,
                        src_fip_id, qos_policy_id=None)
        # port bw limit is lower than fip bw limit, so max_kbps
        # will be the measured bw
        self._validate_bw_limit(
            client, server, egress, bw_limit=max_kbps * 1000,
            protocol=constants.PROTO_NAME_UDP)
        # fip bw limit lower than port bw limit
        fip_max_kbps = max_kbps // 2
        rule_update_data = {
            'max_kbps': fip_max_kbps,
            'max_burst_kbps': fip_max_kbps * 0.8}
        self.qos_bw_limit_rule_client.update_limit_bandwidth_rule(
            qos_policy_id=fip_qos_pol_id, rule_id=fip_port_rule_id,
            **rule_update_data)
        # For rocky images, running iperf tests with low BW limits using
        # TCP does not work well, so UDP is used instead
        # TODO(eolivare): provide link to iperf/rocky bug
        protocol = (constants.PROTO_NAME_TCP
                    if self.username != "rocky"
                    else constants.PROTO_NAME_UDP)
        # fip bw limit is lower than port bw limit, so fip_max_kbps
        # will be the measured bw
        self._validate_bw_limit(client, server, egress,
                                bw_limit=fip_max_kbps * 1000,
                                protocol=protocol)
        # delete bw limit rule associated to fip qos policy
        # port bw limit applies again
        self.qos_bw_limit_rule_client.delete_limit_bandwidth_rule(
            fip_qos_pol_id, fip_port_rule_id)
        self._validate_bw_limit(client, server, egress,
                                bw_limit=max_kbps * 1000,
                                protocol=protocol)
    # Delete bw limit rule from the port QoS policy and validate that
    # bw is not limited anymore
    self.qos_bw_limit_rule_client.delete_limit_bandwidth_rule(
        bwlimit_policy_id_new, port_rule_id)
    # Validate the bw limit - using TCP
    self._validate_bw_limit(client, server, egress)
    # Verify a QoS policy attached to a port cannot be deleted
    try:
        self.admin_client.delete_qos_policy(bwlimit_policy_id_new)
    except exceptions.Conflict:
        LOG.debug('QoS Policy %s cannot be deleted because '
                  'it is attached to a port', bwlimit_policy_id_new)
    else:
        raise RuntimeError('Deletion of QoS Policy %s should '
                           'have been rejected' % bwlimit_policy_id_new)
    # remove QoS policies from port and network
    # (done explicitly here, in addition to the cleanups registered above)
    self.admin_client.update_port(src_port['id'], qos_policy_id=None)
    self.admin_client.update_network(
        src_port['network_id'], qos_policy_id=None)
def _get_dscp_filters(
        self, src_port, dst_ip, mark, outer=False, tunnel_type=None,
        segmentation_id=None):
    """Build the tcpdump filter matching packets with a given DSCP mark.

    :param src_port: port dict of the sender; only its 'mac_address' is
        used, and only when ``outer`` is True
    :param dst_ip: destination IP address of the traffic
    :param mark: DSCP value the packets are expected to carry
    :param outer: when False the filter matches the DSCP mark of plain
        packets sent to ``dst_ip``; when True it matches tunneled packets
        carrying the mark in both the outer and the inner IP header
    :param tunnel_type: 'vxlan' or 'geneve'; only used when ``outer``
    :param segmentation_id: network identifier compared with the VNI of
        vxlan packets; for geneve the identifier is obtained from
        ``get_datapath_tunnel_key`` instead
    :returns: the filter expression wrapped in double quotes
    :raises skipException: when ``outer`` is True and the tunnel type is
        not supported
    """
    # IPv4 tcpdump filters explanation:
    # ip[1] refers to the byte 1 (the TOS byte) of the IP header
    # 0xfc = 11111100 is the mask to get only DSCP value from the ToS
    # As DSCP mark is most significant 6 bits we do right shift (>>)
    # twice in order to divide by 4 and compare with the decimal value
    # See details at http://darenmatthews.com/blog/?p=1199
    filters_outer = '(ip and (ip[1] & 0xfc) >> 2 == {})'.format(mark)
    # IPv6 tcpdump filters explanation:
    # ip6[0:2] refers to the bytes 0 and 1 of the IPv6 header,
    # containing Version (bits 0 to 3), Traffic Class (bits 4 to 11)
    # and part of the Flow Label
    # 0x0fc0 = 0000111111000000 is the mask to get only DSCP value from
    # the Traffic Class
    # We need to do right shift 6 positions (>>) to obtain the DSCP
    # value
    filters_outer_v6 = (
        '(ip6 and (ip6[0:2] & 0x0fc0) >> 6 == {})'.format(mark))
    if not outer:
        # non-tunneled traffic: DSCP mark plus destination address
        dst_host_filter = 'dst host {}'.format(dst_ip)
        if netaddr.valid_ipv6(dst_ip):
            filters_outer = filters_outer_v6
        return '"{} and {}"'.format(filters_outer, dst_host_filter)
    supported_tunnel_types = ('vxlan', 'geneve')
    if tunnel_type not in supported_tunnel_types:
        raise self.skipException(
            "Tunnel type '{}' is not supported by the test. Supported "
            "types: {}".format(tunnel_type, supported_tunnel_types))
    # last 4 bytes of the sender MAC address, as a hex number
    mac_pattern = '0x' + src_port['mac_address'].replace(':', '')[4:]
    # scenario index, which is used in the lists below
    # 0 - outer ipv4, inner ipv4
    # 1 - outer ipv4, inner ipv6
    # 2 - outer ipv6, inner ipv4
    # 3 - outer ipv6, inner ipv6
    # Each list/dict below provides, per scenario, the parameters of the
    # inner header filters (offsets are indexes into the outer
    # ip[...]/ip6[...] data)
    dscp_length = [1, 2, 1, 2]
    tos_pattern = ['0xfc', '0xfc0', '0xfc', '0xfc0']
    tos_shift = [2, 6, 2, 6]
    icmp_value = ['1', '0x3a', '1', '0x3a']
    dscp_offset = {'vxlan': [51, 50, 71, 70],
                   'geneve': [59, 58, 79, 78]}
    icmp_offset = {'vxlan': [59, 56, 79, 76],
                   'geneve': [67, 64, 87, 84]}
    # self.is_ipv6 selects the IP version of the outer (tunnel) header
    if self.is_ipv6 is True:
        ip = 'ip6'
        # this offset covers difference between ipv4 and ipv6 header length
        extra_offset = 20
        scenario = 3 if netaddr.valid_ipv6(dst_ip) else 2
        filters_outer = filters_outer_v6
    else:
        ip = 'ip'
        extra_offset = 0
        scenario = 1 if netaddr.valid_ipv6(dst_ip) else 0
    vni_offset = 31 + extra_offset
    if tunnel_type == 'vxlan':
        tunnel_port = 4789
        tunnel_type_offset = 28 + extra_offset
        tunnel_type_value = '0x0800'
        mac_offset = 44 + extra_offset
        vnet_id = segmentation_id
    if tunnel_type == 'geneve':
        tunnel_port = 6081
        tunnel_type_offset = 30 + extra_offset
        tunnel_type_value = '0x6558'
        mac_offset = 52 + extra_offset
        vnet_id = self.get_datapath_tunnel_key(
            "external_id:name2=" + self.networks[0]["name"])
    # The inner filter combines: tunnel UDP port, tunnel type marker,
    # network identifier, sender MAC, inner DSCP mark and inner ICMP
    port_filter = 'port {}'.format(tunnel_port)
    tunnel_type_filter = '{}[{}:2] = {}'.format(
        ip, tunnel_type_offset, tunnel_type_value)
    vni_filter = '({}[{}:4] & 0x00FFFFFF) == {}'.format(
        ip, vni_offset, vnet_id)
    mac_filter = '{}[{}:4] = {}'.format(ip, mac_offset, mac_pattern)
    inner_dscp_filter = ('({}[{}:{}] & {}) >> {} = {}'.format(
        ip, dscp_offset[tunnel_type][scenario], dscp_length[scenario],
        tos_pattern[scenario], tos_shift[scenario], mark))
    icmp_filter = '{}[{}:1] = {}'.format(
        ip, icmp_offset[tunnel_type][scenario], icmp_value[scenario])
    filters_inner = ('{} and {} and {} and {} and {} and {}'.format(
        port_filter, tunnel_type_filter, vni_filter,
        mac_filter, inner_dscp_filter, icmp_filter))
    return '"{} and {}"'.format(filters_outer, filters_inner)
def _validate_traffic_marked(
        self, mark, src_server, dst_server, ipv6=False, outer=False):
    """Check that pings from src to dst carry the expected DSCP mark.

    A tcpdump capture filtered by ``mark`` is started for every tested IP
    of the destination server, one ping is sent from the source server
    and the capture is expected not to be empty.

    :param mark: expected DSCP value
    :param src_server: dict of the sender VM ('id' and 'ssh_client')
    :param dst_server: dict of the receiver VM ('id' and 'ssh_client');
        when ``outer`` is True its 'host' key is set by this method
    :param ipv6: when False, IPv6 addresses of the receiver are not tested
    :param outer: when True, capture the tunneled traffic on the tenant
        interface of the node hosting the receiver instead of capturing
        inside the receiver VM
    :raises RuntimeError: when ``outer`` is True and the node hosting the
        receiver is not part of ``self.nodes``
    """
    dst_port = self.client.list_ports(
        device_id=dst_server['id'])['ports'][0]
    dst_ips = [
        fixed_ip['ip_address'] for fixed_ip in dst_port['fixed_ips']
        if ipv6 or not netaddr.valid_ipv6(fixed_ip['ip_address'])]
    self.assertGreater(len(dst_ips), 0)
    src_port = self.client.list_ports(
        device_id=src_server['id'])['ports'][0]
    if outer:
        interface = WB_CONF.node_tenant_interface
        dst_server['host'] = self.get_host_for_server(
            dst_server['id']).split('.')[0]
        for node in self.nodes:
            if node['name'] == dst_server['host']:
                capture_host_client = node['client']
                break
        else:
            # Without this branch the code below failed with an
            # UnboundLocalError that hid the real problem
            raise RuntimeError(
                'Node %s hosting the destination server was not found '
                'in the list of known nodes' % dst_server['host'])
        network = self.os_admin.network_client.show_network(
            src_port['network_id'])['network']
        segmentation_id = network['provider:segmentation_id']
        tunnel_type = network['provider:network_type']
    else:
        interface = utils.get_default_interface(dst_server['ssh_client'])
        segmentation_id = None
        tunnel_type = None
        capture_host_client = dst_server['ssh_client']
    for dst_ip in dst_ips:
        filters = self._get_dscp_filters(
            src_port, dst_ip, mark, outer, tunnel_type, segmentation_id)
        remote_capture = capture.TcpdumpCapture(
            capture_host_client, interface, filters)
        self.useFixture(remote_capture)
        # delays before and after sending the ping, kept from the
        # original implementation
        time.sleep(10)
        self.check_remote_connectivity(
            src_server['ssh_client'], dst_ip, ping_count=1)
        time.sleep(5)
        remote_capture.stop()
        msg = 'Not found packets with expected DSCP value'
        if outer:
            msg += ' set in inner and outer headers'
        self.assertFalse(remote_capture.is_empty(), msg)
def _validate_traffic_dscp_marking(
        self, src_server, dst_server, ipv6=False, fip_qos=False):
    """Validate that traffic between servers has a dscp mark

    Scenario:
    1. First make sure that traffic between servers is not marked.
    2. Create a QoS policy, attach to the network where src_server
    is connected to.
    3. Add a DSCP marking rule to the policy.
    4. Send traffic between 2 servers and make sure that now packets
    are marked with corresponding DSCP mark.
    5. Update the DSCP marking rule to use different mark and make
    sure that traffic is marked with a new mark.
    6. Create another QoS policy, apply to the port of src_server
    and add a DSCP marking rule with another mark.
    7. Send traffic between 2 servers and make sure that now packets
    are marked with a new DSCP mark (since rule for port has higher
    priority).
    8. Only when fip_qos is True: attach one more QoS policy to the
    floating IP of src_server and make sure that its DSCP mark is the
    one applied, until its rule is deleted.
    9. Delete DSCP marking rule from the port QoS policy and make sure
    that traffic is not marked.
    10. Verify that the QoS policy attached to the port cannot be
    deleted.

    :param src_server: dict of the sender VM
    :param dst_server: dict of the receiver VM
    :param ipv6: NOTE(review): accepted but not forwarded to
        _validate_traffic_marked by this method - confirm whether
        intended
    :param fip_qos: also validate a QoS policy attached to the sender FIP
    """
    # First, let's make sure that traffic is not marked
    self._validate_traffic_marked(0, src_server, dst_server)
    # Create new QoS policy and attach to the src network
    net_dscp_policy_id = self._create_qos_policy()
    src_port = self.client.list_ports(
        device_id=src_server['id'])['ports'][0]
    self.admin_client.update_network(
        src_port['network_id'], qos_policy_id=net_dscp_policy_id)
    self.addCleanup(self.admin_client.update_network,
                    src_port['network_id'], qos_policy_id=None)
    net_rule_id = self.admin_client.create_dscp_marking_rule(
        net_dscp_policy_id, self.dscp_mark_net)[
            'dscp_marking_rule']['id']
    # Validate that traffic that reach destination host is marked
    self._validate_traffic_marked(
        self.dscp_mark_net, src_server, dst_server)
    # Update the DSCP marking rule and verify that traffic is marked
    # with a new value
    self.admin_client.update_dscp_marking_rule(
        net_dscp_policy_id, net_rule_id,
        dscp_mark=self.dscp_mark_net_new)
    self._validate_traffic_marked(
        self.dscp_mark_net_new, src_server, dst_server)
    # Create a new QoS policy and attach to the port of src server
    dscp_policy_id_new = self._create_qos_policy()
    port_rule_id = self.admin_client.create_dscp_marking_rule(
        dscp_policy_id_new, self.dscp_mark_port)[
            'dscp_marking_rule']['id']
    self.client.update_port(
        src_port['id'], qos_policy_id=dscp_policy_id_new)
    self.addCleanup(self.admin_client.update_port,
                    src_port['id'], qos_policy_id=None)
    # Verify that traffic now is marked with a new value
    self._validate_traffic_marked(
        self.dscp_mark_port, src_server, dst_server)
    # Create a new QoS Policy and attach to the FIP of src server
    # This only applies to south-north tests because the traffic from the
    # src server to the dst server goes through the src FIP
    if fip_qos:
        fip_qos_pol_id = self._create_qos_policy()
        # dscp mark value applied to fip is self.dscp_mark_fip
        fip_port_rule_id = self.admin_client.create_dscp_marking_rule(
            fip_qos_pol_id, self.dscp_mark_fip)['dscp_marking_rule']['id']
        src_fip_id = self.client.list_floatingips(
            fixed_ip_address=src_port['fixed_ips'][0]['ip_address'])[
                'floatingips'][0]['id']
        self.client.update_floatingip(
            src_fip_id, qos_policy_id=fip_qos_pol_id)
        # Detach the policy from the FIP on cleanup, consistently with
        # the network and port policies above and with the bw limit
        # validation, so that no policy is left attached to the FIP
        self.addCleanup(self.admin_client.update_floatingip,
                        src_fip_id, qos_policy_id=None)
        # Verify that traffic is marked with a value from fip qos policy
        self._validate_traffic_marked(
            self.dscp_mark_fip, src_server, dst_server)
        # update dscp mark associated fip qos policy
        self.admin_client.update_dscp_marking_rule(
            fip_qos_pol_id, fip_port_rule_id,
            dscp_mark=self.dscp_mark_fip_new)
        self._validate_traffic_marked(
            self.dscp_mark_fip_new, src_server, dst_server)
        # delete dscp mark associated fip qos policy
        # port dscp rule applies
        self.admin_client.delete_dscp_marking_rule(
            fip_qos_pol_id, fip_port_rule_id)
        self._validate_traffic_marked(
            self.dscp_mark_port, src_server, dst_server)
    # Delete DSCP rule from the port QoS policy and validate that traffic
    # is not marked
    self.admin_client.delete_dscp_marking_rule(
        dscp_policy_id_new, port_rule_id)
    self._validate_traffic_marked(0, src_server, dst_server)
    # Verify a QoS policy attached to a port cannot be deleted
    try:
        self.admin_client.delete_qos_policy(dscp_policy_id_new)
    except exceptions.Conflict:
        LOG.debug('QoS Policy %s cannot be deleted because '
                  'it is attached to a port', dscp_policy_id_new)
    else:
        raise RuntimeError('Deletion of QoS Policy %s should '
                           'have been rejected' % dscp_policy_id_new)
def _test_both_bwlimit_dscp(
        self, vms, bwlimit_kbps, dscp_mark, network_id):
    """Apply a bw limit and a DSCP rule to a network and validate both.

    :param vms: dict with the 'sender' and 'receiver' VM dicts
    :param bwlimit_kbps: egress bandwidth limit to configure, in kbps
    :param dscp_mark: DSCP mark to configure and to expect on the traffic
    :param network_id: network the QoS policy is attached to
    """
    # Prerequisite: install iperf3
    for role in 'sender', 'receiver':
        self._skip_if_iperf3_not_installed(vms[role]['ssh_client'])
    # First, let's make sure that bw is not limited
    self._validate_bw_limit(client=vms['sender'], server=vms['receiver'])
    # Also, make sure that traffic is not marked
    self._validate_traffic_marked(0, vms['sender'], vms['receiver'])
    # configure qos policies
    policy_id = self._create_qos_policy()
    self.admin_client.update_network(network_id, qos_policy_id=policy_id)
    self.addCleanup(
        self.admin_client.update_network, network_id, qos_policy_id=None)
    rule_data = {
        'max_kbps': bwlimit_kbps,
        'max_burst_kbps': bwlimit_kbps * 0.8,
        'direction': 'egress'}
    self._create_qos_bw_limit_rule(policy_id, rule_data)
    # The mark requested by the caller is used (it was previously
    # ignored in favour of self.dscp_mark_net)
    self.admin_client.create_dscp_marking_rule(policy_id, dscp_mark)
    # Make sure that bw limit applied properly before action (using TCP)
    self._validate_bw_limit(
        bw_limit=bwlimit_kbps * 1000, client=vms['sender'],
        server=vms['receiver'])
    # Validate that traffic that reach receiver host is marked
    self._validate_traffic_marked(
        dscp_mark, vms['sender'], vms['receiver'])
def _test_qos_after_migration(
        self, src_server, dst_server, migration_method):
    """Validate that bw limit and DSCP marking survive VM migrations.

    Both QoS rules are validated first; then the sender and the receiver
    are migrated one after the other and both rules are validated again
    after each migration.

    :param src_server: dict of the sender VM
    :param dst_server: dict of the receiver VM
    :param migration_method: 'cold-migration' or 'live-migration'
    :raises RuntimeError: for any other migration method
    """
    # Fail fast, before any QoS resource is created or any traffic sent
    if migration_method not in ('cold-migration', 'live-migration'):
        raise RuntimeError('Unsupported migration method %s'
                           % migration_method)
    vms = {'sender': src_server, 'receiver': dst_server}
    port = self.client.list_ports(
        device_id=vms['sender']['id'])['ports'][0]
    # the limit is 1000 times higher for ports whose vnic type is not
    # 'normal'
    max_kbps = (self.bwlimit_kbps_net
                if port['binding:vnic_type'] == 'normal'
                else self.bwlimit_kbps_net * 1000)
    dscp_mark = self.dscp_mark_net
    # validate both bwlimit and dscp mark
    self._test_both_bwlimit_dscp(
        vms, max_kbps, dscp_mark, port['network_id'])
    block_migration = (CONF.compute_feature_enabled.
                       block_migration_for_live_migration)
    servers_client = self.os_admin.servers_client
    for role in 'sender', 'receiver':
        server_id = vms[role]['id']
        vms_host = self.get_host_for_server(server_id).split('.')[0]
        if migration_method == 'cold-migration':
            servers_client.migrate_server(server_id)
            waiters.wait_for_server_status(
                servers_client, server_id, 'VERIFY_RESIZE')
            # confirm migration
            servers_client.confirm_resize_server(server_id)
        else:
            # live-migration; no target host is requested
            servers_client.live_migrate_server(
                server_id, host=None,
                block_migration=block_migration, disk_over_commit=False)
        self.wait_for_server_active(vms[role])
        vms_new_host = self.get_host_for_server(
            server_id).split('.')[0]
        self.assertNotEqual(vms_host, vms_new_host,
                            '%s vm did not migrate' % role)
        LOG.debug("Validating server '%s'", role)
        # Validate that traffic that reach destination host is still marked
        self._validate_traffic_marked(
            dscp_mark, vms['sender'], vms['receiver'])
        # Make sure that bw limit still works after migration
        self._validate_bw_limit(
            bw_limit=max_kbps * 1000, client=vms['sender'],
            server=vms['receiver'])
def _check_dscp_inheritance(self):
    """Check the DSCP mark of a port policy on inner and outer headers.

    A DSCP marking rule is attached to the sender port through a new QoS
    policy and the tunneled traffic is validated, including IPv6.
    """
    sender, receiver = self._create_vms_by_topology(ipv6=True)
    qos_policy_id = self._create_qos_policy()
    sender_ports = self.client.list_ports(device_id=sender['id'])
    sender_port = sender_ports['ports'][0]
    self.admin_client.create_dscp_marking_rule(
        qos_policy_id, self.dscp_mark_port)
    self.client.update_port(
        sender_port['id'], qos_policy_id=qos_policy_id)
    # detach the policy from the port when the test finishes
    self.addCleanup(
        self.admin_client.update_port,
        sender_port['id'], qos_policy_id=None)
    self._validate_traffic_marked(
        self.dscp_mark_port, sender, receiver, ipv6=True, outer=True)
class QosTestCommon(QosBaseTest):
@decorators.idempotent_id('db036021-ae2b-4149-b342-a5619aa606e4')
def test_dscp_marking_tenant_network(self):
    """Validate DSCP marking between VMs of the default topology."""
    sender, receiver = self._create_vms_by_topology()
    self._validate_traffic_dscp_marking(sender, receiver, ipv6=True)
@decorators.idempotent_id('8cf45ea9-bdbf-478e-8810-792083e1c467')
def test_dscp_marking_external_network(self):
    """Validate DSCP marking between VMs of the 'external' topology."""
    vm_pair = self._create_vms_by_topology(topology='external')
    sender, receiver = vm_pair
    self._validate_traffic_dscp_marking(sender, receiver)
# Since QoS is applied to egress traffic, the south->north
# test direction is more interesting than north->south
@decorators.idempotent_id('886d6c11-7c19-4c01-a5ac-74779bb2f364')
def test_dscp_marking_south_north(self):
    """Validate DSCP marking of south->north traffic."""
    # For south->north the servers returned by
    # self._create_vms_by_topology are used in reverse order
    receiver, sender = self._create_vms_by_topology(
        topology='north-south')
    # the FIP QoS policy is only validated when OVN is supported
    extra_kwargs = {'fip_qos': True} if self.has_ovn_support else {}
    self._validate_traffic_dscp_marking(sender, receiver, **extra_kwargs)
@decorators.idempotent_id('d075dd6f-0101-4b1a-990c-a001313d3914')
def test_dscp_marking_east_west(self):
    """Validate DSCP marking between VMs of the 'east-west' topology."""
    vm_pair = self._create_vms_by_topology(topology='east-west')
    sender, receiver = vm_pair
    self._validate_traffic_dscp_marking(sender, receiver, ipv6=True)
@decorators.idempotent_id('161dc56f-a7e8-426e-9b06-3807e0c45692')
def test_bw_limit_tenant_network(self):
    """Validate egress, then ingress, bw limits on the default topology."""
    client, server = self._create_vms_by_topology()
    for egress in (True, False):
        self._validate_traffic_bw_limit(
            client, server, egress=egress, ipv6=True)
@decorators.idempotent_id('653e92b4-1f19-4f85-93f7-c14d0e900f44')
def test_bw_limit_south_north(self):
    """Validate egress, then ingress, bw limits of south->north traffic.

    The QoS policy attached to the floating IP is validated too.
    """
    # For south->north the servers returned by
    # self._create_vms_by_topology are used in reverse order
    vm_pair = self._create_vms_by_topology(topology='north-south')
    server, client = vm_pair
    for egress in (True, False):
        self._validate_traffic_bw_limit(
            client, server, egress=egress, fip_qos=True)
@decorators.idempotent_id('fc833d46-d18f-4edf-b082-5f5fe909fb79')
def test_bw_limit_east_west(self):
client, server = self._create_vms_by_topology(topology='east-west')
self._validate_traffic_bw_limit(client, server, egress=True, ipv6=True)
self._validate_traffic_bw_limit(
client, server, egress=False, ipv6=True)
@decorators.idempotent_id('121bd2dd-6d41-4658-8a05-6fdd444381fa')
@testtools.skipUnless(CONF.compute_feature_enabled.live_migration,
'Live migration is not available.')
@testtools.skipUnless(
CONF.compute.min_compute_nodes > 1,
'Less than 2 compute nodes, skipping multinode tests.')
@decorators.attr(type='slow')
def test_qos_after_live_migration(self):
src_server, dst_server = (
self._create_vms_by_topology(different_host=False))
self._test_qos_after_migration(
src_server, dst_server, migration_method='live-migration')
@decorators.idempotent_id('f6e1e433-33b1-4352-bdf3-92dbe9e3f2ee')
@testtools.skipUnless(CONF.compute_feature_enabled.cold_migration,
'Cold migration is not available.')
@testtools.skipUnless(
CONF.compute.min_compute_nodes > 1,
'Less than 2 compute nodes, skipping multinode tests.')
@decorators.attr(type='slow')
def test_qos_after_cold_migration(self):
src_server, dst_server = self._create_vms_by_topology()
self._test_qos_after_migration(
src_server, dst_server, migration_method='cold-migration')
@decorators.idempotent_id('5cbf643a-b4ed-453c-a073-d468a95a9d78')
def test_default_qos_policy(self):
default_policy = self.admin_client.create_qos_policy(
name='default-qos-policy',
shared=True,
is_default=True)['policy']
self.qos_policies.append(default_policy)
# create network using admin_client (same client used to create qos
# policies and rules)
network1 = self.admin_client.create_network(
name='default-qos-network-1')['network']
self.addCleanup(
self.admin_client.delete_network, network1['id'])
# check default qos policy was assigned at network creation
self.assertEqual(default_policy['id'], network1['qos_policy_id'])
# qos policy is not default anymore
self.admin_client.update_qos_policy(default_policy['id'],
is_default=False)
# create network using admin_client (same client used to create qos
# policies and rules
network2 = self.admin_client.create_network(
name='default-qos-network-2')['network']
self.addCleanup(
self.admin_client.delete_network, network2['id'])
# check no default qos policy was assigned at network creation
self.assertIsNone(network2['qos_policy_id'])
def _get_user_policy_file(self):
cmd = ("{} crudini --get /etc/neutron/neutron.conf "
"oslo_policy policy_file ".format(self.neutron_container))
policy_file = self.run_on_master_controller(cmd).rstrip()
self.assertNotEqual(
'', policy_file,
'Failed to determine a valid policy file path')
return policy_file
def _dump_user_policy_file(self, node):
cmd = ("{} oslopolicy-policy-generator "
"--namespace neutron --output-file {}".format(
self.neutron_container, self.user_policy_file))
node['client'].exec_command(cmd)
node['client'].exec_command("{0} cp -f {1} {1}.bak".format(
self.neutron_container, self.user_policy_file))
def _restore_user_policy_files(self):
for node in self.nodes:
if node['type'] != 'controller':
continue
node['client'].exec_command(
"{0} bash -c 'cp -f {1}.bak {1} || true'".format(
self.neutron_container, self.user_policy_file))
def _set_user_policy(self, node, policy, value):
cmd = (r"{} sed -i 's/\(\"{}\":\ \).*/\1\"{}\"/' {}".format(
self.neutron_container, policy, value, self.user_policy_file))
LOG.debug("cmd = {}".format(cmd))
return node['client'].exec_command(cmd).rstrip()
class QosTestDscpInheritanceOvn(QosBaseTest, base.BaseTempestTestCaseOvn):
    """DSCP inheritance test for geneve tenant networks."""

    @testtools.skipUnless(
        'geneve' in config.CONF.neutron_plugin_options.available_type_drivers,
        "No 'geneve' in neutron_plugin_options.available_type_drivers")
    @decorators.idempotent_id('55f9a614-3c80-4dc6-be4b-de9b18583317')
    def test_dscp_inheritance_geneve(self):
        """Check DSCP inheritance when ovn-encap-tos is set to 'inherit'.

        The test is skipped when the OVS external_ids:ovn-encap-tos value
        does not contain 'inherit' on any of the nodes (podified) or on
        the master controller (devstack).
        """
        # '|| true' so that an unset key produces empty output, not an error
        cmd = "sudo ovs-vsctl get open . external_ids:ovn-encap-tos || true"
        msg = "external_ids:ovn-encap-tos is not set to 'inherit'"

        deployment = WB_CONF.openstack_type
        if deployment == 'podified':
            # lazily evaluated: stops at the first node without 'inherit'
            outputs = (
                node['client'].exec_command(cmd) for node in self.nodes)
        elif deployment == 'devstack':
            outputs = (self.run_on_master_controller(cmd),)
        else:
            outputs = ()

        for output in outputs:
            if 'inherit' not in output:
                raise self.skipException(msg)

        self._check_dscp_inheritance()
class QosTestExternalNetwork(QosBaseTest):
    """QoS tests that apply policies to the external network.

    These tests should not be executed in parallel with other tests because
    they perform changes in the external network, which is shared with many
    other tests and could affect them.
    """

    def setUp(self):
        """Share the external network and boot a VM directly on it.

        An SSH client to that VM (reached through its fixed IP, no floating
        IP is created) is stored in self.ext_vm_ssh_client.
        """
        super(QosTestExternalNetwork, self).setUp()
        self.ensure_external_network_is_shared()
        ext_vm = self._create_server(
            network=self.external_network, create_floating_ip=False)
        ext_vm_ip = ext_vm['port']['fixed_ips'][0]['ip_address']
        self.ext_vm_ssh_client = ssh.Client(
            ext_vm_ip, self.username, pkey=self.keypair['private_key'])

    @decorators.idempotent_id('c018b96b-47eb-4b5f-b750-e0c857fd86a9')
    def test_dscp_bwlimit_external_network(self):
        """Check BW limit and DSCP marking through the external network.

        Both rules are first validated from sender to receiver. Then the BW
        limit and the DSCP mark are validated between the VM created during
        setUp and the receiver.
        """
        max_kbps = self.bwlimit_kbps_net
        dscp_mark = self.dscp_mark_net

        # create sender and receiver VMs connected to the external network
        sender, receiver = self._create_vms_by_topology(topology='external')
        vms = {'sender': sender, 'receiver': receiver}

        # validate both bwlimit and dscp mark from sender to receiver via
        # external network
        self._test_both_bwlimit_dscp(
            vms, max_kbps, dscp_mark, CONF.network.public_network_id)

        LOG.debug('testing BW limit from a VM instances connected to the '
                  'external network (receiver) to an Ext VM')
        # we need to test ingress from the Ext VM PoV because the BW limit
        # egress rule is associated to the receiver VM instance
        ext_vm = {'ssh_client': self.ext_vm_ssh_client}
        self._validate_bw_limit(ext_vm,
                                vms['receiver'],
                                egress=False,
                                bw_limit=max_kbps * 1000,
                                protocol=constants.PROTO_NAME_TCP)

        receiver_ports = self.client.list_ports(
            device_id=vms['receiver']['id'])['ports']
        server_ip = receiver_ports[0]['fixed_ips'][0]['ip_address']

        LOG.debug('testing DSCP mark from the test device (undercloud) to '
                  'a VM instance connected to the external network')
        # interface obtained through the Ext VM ssh client for the route
        # towards the receiver IP
        interface = utils.get_route_interface(
            self.ext_vm_ssh_client, server_ip)

        # tcpdump filter to capture marked ping reply: the DSCP value is
        # taken from the upper 6 bits of the second byte of the IP header
        filtrs = '"(ip and (ip[1] & 0xfc) >> 2 == {}) and src host {}"'.format(
            dscp_mark, server_ip)
        remote_capture = capture.TcpdumpCapture(
            self.ext_vm_ssh_client, interface, filtrs)
        self.useFixture(remote_capture)

        # wait before sending the ping and again before stopping the capture
        time.sleep(10)
        self.check_remote_connectivity(
            self.ext_vm_ssh_client, server_ip, ping_count=1)
        time.sleep(5)
        remote_capture.stop()

        msg = 'Not found packets with expected DSCP value'
        self.assertFalse(remote_capture.is_empty(), msg)
class QosTestSriovBaseTest(QosBaseTest):
    """Base class for the SR-IOV QoS tests.

    Every test is skipped unless a neutron-sriov-nic-agent with the 'qos'
    extension configured is found.
    """

    def skip_if_no_qos_extension_sriov_agent(self):
        """Skip the test unless an SR-IOV agent has the 'qos' extension.

        The agents with binary 'neutron-sriov-nic-agent' are listed using
        the admin network client. The test is skipped when none is found or
        when none of them reports 'qos' among its configured extensions.
        """
        sriov_agents = self.os_admin.network_client.list_agents(
            binary='neutron-sriov-nic-agent')['agents']
        if not sriov_agents:
            # skipTest raises the skip exception itself, no 'raise' needed
            self.skipTest('No sriov agent found')
        qos_enabled = any(
            'qos' in agent['configurations'].get('extensions', [])
            for agent in sriov_agents)
        if not qos_enabled:
            self.skipTest('No sriov agent with qos extension enabled')

    def setUp(self):
        """Run the parent setUp and then the SR-IOV qos extension check."""
        super(QosTestSriovBaseTest, self).setUp()
        self.skip_if_no_qos_extension_sriov_agent()
class QosTestSriovBwLimitTest(QosTestSriovBaseTest):
    """Bandwidth limit tests using 'direct' and 'macvtap' ports."""

    MIN_KBPS_NO_BWLIMIT = 1000000
    sriov_test = True

    def _check_egress_bw_limit(self, port_type):
        """Validate the egress BW limit between VMs using port_type ports.

        :param port_type: port type passed to _create_vms_by_topology.
        """
        client, server = self._create_vms_by_topology(
            topology='external', port_type=port_type)
        # at this moment, SRIOV ports only support egress bw limit
        self._validate_traffic_bw_limit(client, server, egress=True)

    @decorators.idempotent_id('4aeb240a-b350-4dd4-b3b2-8aadf9ad8e6d')
    def test_bw_limit_sriov_direct(self):
        """Check the egress BW limit with 'direct' ports."""
        self._check_egress_bw_limit('direct')

    @decorators.idempotent_id('9a38b196-2492-4bcb-a914-5e1958f6bbb9')
    def test_bw_limit_sriov_macvtap(self):
        """Check the egress BW limit with 'macvtap' ports."""
        self._check_egress_bw_limit('macvtap')
class QosTestSriovMinBwPlacementEnforcementTest(QosTestSriovBaseTest):
    """Minimum bandwidth placement enforcement tests with SR-IOV ports.

    As many servers as NICs are created, each with a port whose minimum
    bandwidth rule takes more than half of a NIC bandwidth, and then one
    extra server is expected to fail to build.
    """

    # Nova refuses to boot a VM with a port that has the resource_request
    # field below microversion 2.72
    compute_min_microversion = '2.72'
    compute_max_microversion = 'latest'

    # placement inventory key used for each QoS rule direction
    INVENTORY_KEYS = {'egress': 'NET_BW_EGR_KILOBIT_PER_SEC',
                      'ingress': 'NET_BW_IGR_KILOBIT_PER_SEC'}

    @classmethod
    def skip_checks(cls):
        """Add the compute microversion range check to the skip checks."""
        super(QosTestSriovMinBwPlacementEnforcementTest, cls).skip_checks()
        api_version_utils.check_skip_with_microversion(
            cls.compute_min_microversion, cls.compute_max_microversion,
            CONF.compute.min_microversion, CONF.compute.max_microversion)

    @classmethod
    def resource_setup(cls):
        """Select the compute microversion used for the requests."""
        super(QosTestSriovMinBwPlacementEnforcementTest, cls).resource_setup()
        cls.compute_request_microversion = (
            api_version_utils.select_request_microversion(
                cls.compute_min_microversion,
                CONF.compute.min_microversion))

    def setUp(self):
        """Apply the selected compute microversion through a fixture."""
        super(QosTestSriovMinBwPlacementEnforcementTest, self).setUp()
        self.useFixture(api_microversion_fixture.APIMicroversionFixture(
            compute_microversion=self.compute_request_microversion))

    @classmethod
    def setup_clients(cls):
        """Create the placement resource providers client, if available.

        cls.resource_providers_client is set to None when the admin
        placement clients do not provide ResourceProvidersClient.
        """
        super(QosTestSriovMinBwPlacementEnforcementTest, cls).setup_clients()
        try:
            cls.resource_providers_client = (
                cls.os_admin.placement.ResourceProvidersClient())
        except AttributeError:
            LOG.info('placement ResourceProvidersClient is not supported - '
                     'a configured bandwidth should be provided via '
                     'tempest configuration')
            cls.resource_providers_client = None

    def _test_minbw_placement_enforcement_sriov(self, direction):
        """Check that placement enforces minimum bandwidth on SR-IOV NICs.

        :param direction: 'egress', 'ingress' or 'both'.
        :raises ValueError: if direction has any other value.
        """
        rp_client = self.resource_providers_client

        def _check_rp_allocations(configured_min_bws, server_ids):
            """Verify every server has the expected NIC RP allocations."""
            if not rp_client:
                LOG.debug('Resource Provider allocations cannot be verified')
                return
            # work on a copy so that the list of the caller is not emptied
            pending_ids = list(server_ids)
            for rp_id in self.nic_rp_ids:
                rp_allocations = rp_client.show_resource_provider(
                    rp_id + '/allocations')['allocations']
                for uuid, resources in rp_allocations.items():
                    self.assertIn(uuid, pending_ids)
                    LOG.debug('Server %s found in RP allocations', uuid)
                    pending_ids.remove(uuid)
                    for d, configured_min_bw in configured_min_bws.items():
                        self.assertEqual(
                            configured_min_bw,
                            resources['resources'][self.INVENTORY_KEYS[d]])
                        LOG.debug(
                            'Verified allocated %s bandwidth (kbps) = %d',
                            d, configured_min_bw)
            self.assertEqual(0, len(pending_ids))  # all servers found

        def _bw_per_nic(directions):
            """Return the bandwidth per NIC (by direction) and NIC count."""
            if not rp_client:
                # without placement client, values come from configuration
                LOG.debug('BW and number of NICs need to be configured')
                nic_bws = {}
                if 'egress' in directions:
                    nic_bws['egress'] = WB_CONF.minbw_placement_nic_kbps_egress
                if 'ingress' in directions:
                    nic_bws['ingress'] = (
                        WB_CONF.minbw_placement_nic_kbps_ingress)
                return nic_bws, WB_CONF.minbw_placement_num_nics

            # obtain list of resource providers and filter those
            # corresponding with NICs
            # then, obtain the bw supported per NIC
            rp_list = rp_client.list_resource_providers()[
                'resource_providers']
            # nic_rp_ids will contain the RPs corresponding to NIC resources
            self.nic_rp_ids = []
            nic_bws_list = []
            for rp in rp_list:
                if ':NIC Switch agent:' not in rp['name']:
                    continue
                self.nic_rp_ids.append(rp['uuid'])
                inventories = rp_client.list_resource_provider_inventories(
                    rp['uuid'])['inventories']
                nic_bws_list.append(
                    {d: inventories[self.INVENTORY_KEYS[d]]['total']
                     for d in directions})

            # fail with a clear message instead of an IndexError below
            if not nic_bws_list:
                self.fail('No NIC Switch agent resource provider was found '
                          'in placement')

            reference_bws = nic_bws_list[0]
            if any(bw != reference_bws for bw in nic_bws_list[1:]):
                self.skipTest(
                    'Test only supported if all NICs are configured with '
                    'identical BW values '
                    '(ingress and egress values can be different)')
            return reference_bws, len(nic_bws_list)

        if direction in ('egress', 'ingress'):
            directions = [direction]
        elif direction == 'both':
            directions = ['egress', 'ingress']
        else:
            raise ValueError('direction value not supported: %s' %
                             direction)

        nic_bws, num_nics = _bw_per_nic(directions)

        # the number of VMs supported equals the number of NICs,
        # considering each VM is created with a port
        # with min_bw between 60% and 100% of nic_bw
        max_number_vms = num_nics

        # create qos policy that will be applied to the provider network
        minbw_policy_id = self._create_qos_policy()
        configured_min_bws = {}
        for d in directions:
            configured_min_bws[d] = int(nic_bws[d] * random.uniform(0.6, 1.0))
            self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
                qos_policy_id=minbw_policy_id,
                direction=d,
                min_kbps=configured_min_bws[d])

        # creation of max_number_vms VMs should be successful
        server_ids = []
        for _ in range(max_number_vms):
            port_type = random.choice(('direct', 'macvtap'))
            server = self._create_server_for_topology(
                network_id=CONF.network.public_network_id,
                port_type=port_type,
                port_qos_policy_id=minbw_policy_id)
            server_ids.append(server['id'])

        _check_rp_allocations(configured_min_bws, server_ids)

        # no NIC has enough bandwidth left for one more server
        self.assertRaises(
            tempest_exceptions.BuildErrorException,
            self._create_server_for_topology,
            network_id=CONF.network.public_network_id,
            port_type=random.choice(('direct', 'macvtap')),
            port_qos_policy_id=minbw_policy_id)

    @decorators.idempotent_id('6c63d5b8-d642-4a7c-9b07-c0755979b6a8')
    def test_minbw_placement_enforcement_sriov_egress(self):
        """Check minimum bandwidth placement enforcement for egress."""
        self._test_minbw_placement_enforcement_sriov('egress')

    @decorators.idempotent_id('c1791f96-9eb4-4119-b598-5610c6043ef1')
    def test_minbw_placement_enforcement_sriov_ingress(self):
        """Check minimum bandwidth placement enforcement for ingress."""
        self._test_minbw_placement_enforcement_sriov('ingress')

    @decorators.idempotent_id('e212fab8-9a2d-4753-ba5d-53a299b0af36')
    def test_minbw_placement_enforcement_sriov_both(self):
        """Check minimum bandwidth placement enforcement, both directions."""
        self._test_minbw_placement_enforcement_sriov('both')
# QoS tests for OVN: the constants below are the rule values used by the
# helpers of this class to create QoS policies and to validate them in the
# OVN NBDB.
class QosTestOvn(base.BaseTempestTestCaseOvn, QosBaseTest):
# value sent as 'max_kbps' of the bandwidth limit rule
MAX_KBPS = 1000
# value sent as 'max_burst_kbps' of the bandwidth limit rule (80% of MAX_KBPS)
MAX_BURST_KBPS = 0.8 * MAX_KBPS
# DSCP mark used for the DSCP marking rule, taken from QosBaseTest
DSCP_MARK_OPTION = QosBaseTest.dscp_mark_net
def _create_qos_policy_bw_and_dscp(self):
    """Create a QoS policy with a bw limit rule and a dscp marking rule.

    Rule values are taken from the class constants MAX_KBPS,
    MAX_BURST_KBPS and DSCP_MARK_OPTION.

    :returns: uuid (str) of the created QoS policy.
    """
    qos_policy_id = self._create_qos_policy()
    bw_limit_values = {}
    bw_limit_values['max_kbps'] = self.MAX_KBPS
    bw_limit_values['max_burst_kbps'] = self.MAX_BURST_KBPS
    # the rule id is looked up in the response but it is not used afterwards
    self._create_qos_bw_limit_rule(qos_policy_id, bw_limit_values)['id']
    self.admin_client.create_dscp_marking_rule(
        qos_policy_id, self.DSCP_MARK_OPTION)
    LOG.debug("Created QoS policy and rules")
    return qos_policy_id
def _validate_qos_rules_nbdb(
        self, port_id=None, fip_id=None, expected_empty=False):
    """Validate QoS bw and dscp rules with constant values in OVN NBDB.

    At least one of port_id and fip_id is required. The QoS rows of each
    provided resource are searched and validated separately.

    :param port_id: (str) port uuid that has QoS policy attached.
    :param fip_id: (str) floating ip uuid that has QoS policy attached.
    :param expected_empty: (bool) True if no qos rules are expected.
    """
    self.assertTrue(port_id or fip_id,
                    'At least one of the input params is required')
    cmds = []
    if port_id:
        cmds.append(r'{} find qos match="inport\ \=\=\ \"{}\""'.format(
            self.nbctl, port_id))
    if fip_id:
        cmds.append(
            r'%s find qos external_ids={"neutron\:fip_id"="%s"}' % (
                self.nbctl, fip_id))

    for cmd in cmds:
        policy_settings = self.run_on_master_controller(cmd).rstrip()
        if expected_empty:
            self.assertFalse(policy_settings,
                             'QoS is not supposed to be applied on this '
                             'port in OVN NBDB')
            LOG.debug('Success: no QoS policies found, as expected')
            continue

        # Reset for every command: a missing 'action'/'bandwidth' line has
        # to produce an assertion failure (not an UnboundLocalError) and
        # values found for a previous command must not be reused.
        dscp_settings = ''
        bandwidth_settings = ''
        for line in policy_settings.splitlines():
            if line.startswith('action'):
                dscp_settings = line
            if line.startswith('bandwidth'):
                bandwidth_settings = line

        expected_bandwidth = '{burst=%d, rate=%d}' % (
            self.MAX_BURST_KBPS, self.MAX_KBPS)
        self.assertIn(expected_bandwidth, bandwidth_settings,
                      'Bandwidth options are not set as expected')
        LOG.debug('BW limit options found')

        expected_dscp = '{dscp=%s}' % (self.DSCP_MARK_OPTION,)
        self.assertIn(expected_dscp, dscp_settings,
                      'DSCP options are not set as expected')
        LOG.debug('DSCP options found')
@decorators.idempotent_id('08b74ece-d7f2-4a80-9a1e-5fb7ec928a9b')
def test_attach_qos_port_to_vm_with_another_port(self):
    """Attach a port with QoS policy to a VM booted with a non-QoS port.

    The QoS rules are validated in the OVN NBDB for both ports, then the
    DSCP marking is validated on traffic and, only when iperf3 is found on
    both VMs, the BW limit as well.
    """

    def _ssh_client_for(fip):
        return ssh.Client(
            fip['floating_ip_address'],
            self.username,
            pkey=self.keypair['private_key'])

    # create resources
    network_qos = self.create_network()
    network_no_qos = self.create_network()
    subnet_qos = self.create_subnet(network_qos, cidr="10.10.1.0/24")
    subnet_no_qos = self.create_subnet(network_no_qos, cidr="10.10.2.0/24")
    secgroup = self.secgroup['security_group']
    for subnet in (subnet_qos, subnet_no_qos):
        self.create_router_interface(self.router['id'], subnet['id'])

    policy_id = self._create_qos_policy_bw_and_dscp()

    # create port with QoS policy, another port without policy
    port_qos = self.create_port(
        network_qos,
        qos_policy_id=policy_id,
        security_groups=[secgroup['id']])

    # launch server with non policy port, then attach also to policy port
    no_qos_resources = self._create_server(network=network_no_qos)
    port_no_qos, fip_no_qos, server = no_qos_resources.values()

    # other server to validate QoS policy port later
    qos_resources = self._create_server(network=network_qos)
    other_fip, other_server = tuple(qos_resources.values())[1:]

    server['ssh_client'] = _ssh_client_for(fip_no_qos)
    other_server['ssh_client'] = _ssh_client_for(other_fip)

    server['ssh_client'].test_connection_auth()

    self.create_interface(server['id'], port_qos['id'])
    waiters.wait_for_interface_status(
        self.os_primary.interfaces_client, server['id'],
        port_qos['id'], constants.PORT_STATUS_ACTIVE)

    # configure ip and activate QoS port interface from server CLI
    utils.configure_interface_up(server['ssh_client'], port_qos)

    # validate connectivity with QoS port using another VM on QoS subnet
    qos_port_ip = port_qos['fixed_ips'][0]['ip_address']
    self.check_remote_connectivity(
        other_server['ssh_client'], qos_port_ip, ping_count=1)

    # validate only one port from both has QoS applied in OVN NBDB
    self._validate_qos_rules_nbdb(port_id=port_qos['id'])
    self._validate_qos_rules_nbdb(
        port_id=port_no_qos['id'], expected_empty=True)

    # validate dscp rules applied on traffic
    self._validate_traffic_marked(
        self.DSCP_MARK_OPTION,
        src_server=server, dst_server=other_server)

    # validate bw limit rules applied on traffic
    # (optionally if iperf3 installed on advanced image)
    try:
        for ssh_client in other_server['ssh_client'], server['ssh_client']:
            self._skip_if_iperf3_not_installed(ssh_client)
    except self.skipException:
        LOG.debug("iperf3 not found on VM, skipped QoS traffic validation")
    else:
        # only reached when iperf3 was found on both VMs
        self._validate_bw_limit(
            client=server, server=other_server,
            bw_limit=self.MAX_KBPS * 1000)
@decorators.idempotent_id('ba85bd87-f4f6-45a8-a2bd-97acb804b6f9')
def test_create_network_qos_policy_before_creating_vm(self):
    """Check NBDB QoS rows for a network policy set before VM creation.

    A QoS policy is attached to the network before the server is launched
    and another one to the floating ip of the server. The QoS rules are
    expected in the OVN NBDB and, once the server and the floating ip are
    deleted, they are expected to be gone.
    """
    # create QoS policy and rules first, then other resources
    policy_id = self._create_qos_policy_bw_and_dscp()
    network = self.create_network()
    LOG.debug("Created network '%s'", network['name'])
    subnet = self.create_subnet(network)
    self.create_router_interface(self.router['id'], subnet['id'])

    # attach QoS policy to network before server is launched
    self.admin_client.update_network(
        network['id'], qos_policy_id=policy_id)
    LOG.debug("Attached QoS policy to network '%s'", network['name'])
    server_resources = self._create_server(
        create_floating_ip=True, network=network)
    port, fip, server = tuple(server_resources.values())

    # attach a qos policy to the fip
    fip_policy_id = self._create_qos_policy_bw_and_dscp()
    self.client.update_floatingip(fip['id'], qos_policy_id=fip_policy_id)

    # validate QoS object appears in NBDB, disappears when server and fip
    # are deleted
    nbdb_lookup = {'port_id': port['id'], 'fip_id': fip['id']}
    self._validate_qos_rules_nbdb(**nbdb_lookup)

    self.os_primary.servers_client.delete_server(server['id'])
    waiters.wait_for_server_termination(
        self.os_primary.servers_client, server['id'])
    self.client.delete_floatingip(fip['id'])

    self._validate_qos_rules_nbdb(expected_empty=True, **nbdb_lookup)