Maor Blaustein 2ad8456264 Apply rollout wait to remaining tests
This change applies rollout wait introduced in patch [1] to any other
test that can benefit from such extra stability.

[1] 945106: Wait for config rollout to complete in test_api_server
https://review.opendev.org/c/x/whitebox-neutron-tempest-plugin/+/945106

Change-Id: I2aaa0170f702c278be1d193dca30450c46f1303f
2025-03-30 16:44:10 +03:00

110 lines
4.6 KiB
Python

# Copyright 2024 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin.common import utils
from oslo_log import log
from tempest import config
from whitebox_neutron_tempest_plugin.common import utils as wb_utils
from whitebox_neutron_tempest_plugin.tests.scenario import base as wb_base
# Tempest configuration object shared by the whole module.
CONF = config.CONF
# Option group holding the whitebox neutron plugin settings
# (e.g. 'openstack_type', read by the test below).
WB_CONF = CONF.whitebox_neutron_plugin_options
LOG = log.getLogger(__name__)
# Value written to the 'fdb_age_threshold' ML2/OVN option and also used as
# the sleep time before checking that a learned MAC address was removed.
# The test comments treat it as a number of seconds.
FDB_AGE_THRESHOLD = 5
class OvnFdbAgingTest(wb_base.BaseTempestTestCaseOvn):
    """Scenario test for the aging of learned MACs in the OVN SB FDB table.

    The test enables FDB learning with a short aging threshold, makes a VM
    with port security disabled send traffic, and verifies that the VM MAC
    address first shows up in the FDB table and later disappears from it.
    """

    @classmethod
    def resource_setup(cls):
        """Run the parent resource setup and then discover the nodes."""
        super().resource_setup()
        cls.discover_nodes()

    def _check_mac_in_ovn_sb_fdb(self, mac_address, is_present=True):
        """Wait until a MAC address is present in (or absent from) the FDB.

        The MAC column of the FDB table is listed with ``self.sbctl`` on the
        master controller and compared case-insensitively with
        ``mac_address``. The check is retried through
        ``utils.wait_until_true`` with ``timeout=60`` and ``sleep=1``.

        :param mac_address: MAC address to look for.
        :param is_present: when true, wait for the MAC address to be listed;
                           otherwise wait for it not to be listed.
        """
        # NOTE(review): presumably wait_until_true raises when the timeout
        # expires - confirm against neutron_tempest_plugin.common.utils.

        def _fdb_state_matches():
            # The trailing '; true' makes the shell line finish with a
            # successful command regardless of how the listing exits.
            list_fdb_cmd = ('{} --format=list --bare --columns=mac list FDB; '
                            'true').format(self.sbctl)
            learned_macs = self.run_on_master_controller(list_fdb_cmd).lower()
            mac_found = mac_address.lower() in learned_macs
            return mac_found if is_present else not mac_found

        utils.wait_until_true(_fdb_state_matches, timeout=60, sleep=1)

    def test_fdb_aging(self):
        """Check that a learned MAC address is aged out of the FDB table.

        Steps:
        1) set 'localnet_learn_fdb' and 'fdb_age_threshold' in the ML2
           configuration file;
        2) restart the neutron API service on the controllers (skipped for
           'podified' deployments) and wait for the neutron API;
        3) boot a VM and disable port security on its port;
        4) give the VM external connectivity (router and floating IP);
        5) ping from the VM and expect its MAC address in the FDB table;
        6) delete the VM, wait the aging threshold and expect the MAC
           address to be gone from the FDB table.
        """
        # 1) Configure the FDB learn flag and set the aging threshold to
        # FDB_AGE_THRESHOLD seconds.
        fdb_options = [
            wb_base.ConfigOption('ovn', 'localnet_learn_fdb', 'true'),
            wb_base.ConfigOption('ovn', 'fdb_age_threshold',
                                 FDB_AGE_THRESHOLD),
        ]
        self.set_service_setting(
            file=wb_utils.get_ml2_conf_file(),
            config_list=fdb_options,
            cfg_change=False,
            wait_res='deploy/neutron')

        # 2) Restart neutron api on all controllers simultaneously.
        if WB_CONF.openstack_type != 'podified':
            api_service = wb_utils.get_neutron_api_service_name()
            for node in self.nodes:
                if not node['is_controller']:
                    continue
                # NOTE(mblue): if reset fails on multinode, consider
                # wait_until_active=False for a more simultaneous reset
                self.reset_node_service(api_service, node['client'])

        # NOTE(review): this wait is assumed to apply to every deployment
        # type (i.e. it is not part of the non-podified branch) - confirm.
        wb_utils.wait_for_neutron_api(self.client)

        # 3) Create a VM and disable the port security on the VM port.
        net = self.create_network()
        subnet = self.create_subnet(network=net)
        keypair = self.create_keypair()
        vm = self.create_server(
            flavor_ref=CONF.compute.flavor_ref,
            image_ref=CONF.compute.image_ref,
            key_name=keypair['name'],
            networks=[{'uuid': net['id']}])['server']

        vm_port = self.client.list_ports(device_id=vm['id'])['ports'][0]
        vm_port = self.client.update_port(
            vm_port['id'],
            security_groups=[],
            port_security_enabled=False)['port']
        self.assertEmpty(vm_port['security_groups'])
        self.assertFalse(vm_port['port_security_enabled'])
        vm_mac = vm_port['mac_address']
        LOG.debug('VM port MAC address: %s', vm_mac)

        # 4) Server connectivity configuration.
        router = self.create_router_by_client()
        self.create_router_interface(router['id'], subnet['id'])
        fip = self.create_floatingip(port=vm_port)
        vm_ssh = ssh.Client(
            fip['floating_ip_address'],
            CONF.validation.image_ssh_user,
            pkey=keypair['private_key'])

        # 5) Ping and check that the MAC address is present in the FDB.
        # The ping itself is expected to fail; it is only sent so that the
        # VM emits traffic.
        self.check_remote_connectivity(
            vm_ssh, '1.2.3.4', should_succeed=False)
        self._check_mac_in_ovn_sb_fdb(vm_mac)

        # 6) Delete the VM, wait the FDB aging time and check the MAC
        # address is gone.
        servers_client = self.os_primary.servers_client
        servers_client.delete_server(vm['id'])
        time.sleep(FDB_AGE_THRESHOLD)
        self._check_mac_in_ovn_sb_fdb(vm_mac, is_present=False)