
Validate that "openstack port list --long" output shows the correct related security group. The test class may easily verify OSPRH-14118 (LP#2098980) once it is fixed. Also adds a `force_bash` argument to the `validate_command` method, and adjusts the places where a bash wrap was previously used. Bump to include the tested openstackclient fix on the master branch: Depends-On: https://review.opendev.org/c/openstack/releases/+/942104 Depends-On: https://review.opendev.org/c/openstack/requirements/+/942491 Skip the test on the RDO whitebox job until the openstackclient version is newer than the Antelope version (which has no more releases): https://github.com/openstack-k8s-operators/ci-framework/pull/2775 Change-Id: Id5e554a8f0079b31706574cece3b364e6a5e8e64
185 lines
7.4 KiB
Python
# Copyright 2024 Red Hat, Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
from neutron_tempest_plugin import config
|
|
from neutron_tempest_plugin.scenario import constants
|
|
from tempest.common import waiters
|
|
from tempest.lib.common.utils import data_utils
|
|
from tempest.lib.common.utils import test_utils
|
|
from tempest.lib import decorators
|
|
import testtools
|
|
|
|
from whitebox_neutron_tempest_plugin.tests.scenario import base
|
|
|
|
CONF = config.CONF
|
|
WB_CONF = config.CONF.whitebox_neutron_plugin_options
|
|
|
|
|
|
class NetworkPortTestManyVmsBase(base.BaseTempestWhiteboxTestCase):
    """Base scenario: boot many VMs on one network and verify port status.

    Subclasses (e.g. the OVN variant) override ``_check_port`` to add
    backend-specific verification on top of the Neutron API status check.
    """

    credentials = ['primary', 'admin']

    def _update_quota(self, quota_item, quota_value):
        """Raise a compute quota for the current tenant and restore it later.

        :param quota_item: quota key to update (e.g. 'cores', 'instances')
        :param quota_value: new value to set for that key

        The original value is read first and re-applied via addCleanup so
        the tenant quota is restored when the test finishes.
        """
        quota_set = self.quotas_client.show_quota_set(
            self.client.tenant_id)['quota_set']
        default_quota_value = quota_set[quota_item]

        # force=True allows setting a value below current usage if needed
        self.quotas_client.update_quota_set(
            self.client.tenant_id, force=True,
            **{quota_item: quota_value})
        self.addCleanup(
            self.quotas_client.update_quota_set, self.client.tenant_id,
            **{quota_item: default_quota_value})

    @testtools.skipIf(CONF.compute_feature_enabled.console_output,
                      'With console_output set to enabled, VMs are spawned '
                      'too slowly.')
    def _test_port_status_when_many_vms(
            self, stop_vms_before_compute_reboot=False):
        """Boot WB_CONF.servers_count VMs and verify their Neutron ports.

        :param stop_vms_before_compute_reboot: when True, all servers are
            stopped (SHUTOFF) and the per-port checks are skipped; only the
            port count is asserted.
        """
        # self.image_ref here is expected to be a lightweight alternative
        # image; skip if only the (heavier) default image is available.
        if not self.image_ref:
            raise self.skipException(
                "Default image is too heavy for launching many VMs. "
                "Alternative image was not found.")

        network = self.create_network()
        self.create_subnet(network, cidr="192.168.1.0/24")
        self.keypair = self.create_keypair()

        servers_count = WB_CONF.servers_count
        self.flavors_client = self.os_admin.compute.FlavorsClient()
        self.quotas_client = self.os_admin.compute.QuotasClient()
        # Raise quotas so the burst of servers is not rejected:
        # cores scale with the flavor's vcpus per instance.
        vcpus = self.flavors_client.show_flavor(
            self.flavor_ref)['flavor']['vcpus']
        self._update_quota('cores', servers_count * vcpus)
        self._update_quota('instances', servers_count)

        name = self._testMethodName
        params = {
            'flavor_ref': self.flavor_ref,
            'image_ref': self.image_ref,
            'networks': [{'uuid': network['id']}],
            'key_name': self.keypair['name'],
            'name': data_utils.rand_name(name)
        }
        # Boot one server first, then use it as a same_host scheduler hint
        # so the remaining servers land on the same compute node.
        server = self.create_server(**params)['server']
        params.update({
            'min_count': servers_count - 1,
            'scheduler_hints': {'same_host': server['id']}})
        self.create_server(**params)
        client = self.os_primary.servers_client
        servers = client.list_servers(**{'name': name})['servers']

        for server in servers:
            # addCleanup is LIFO: delete_server runs first, then the wait
            # for termination; NotFound is ignored in both steps.
            self.addCleanup(
                test_utils.call_and_ignore_notfound_exc,
                waiters.wait_for_server_termination,
                client, server['id'])
            self.addCleanup(
                test_utils.call_and_ignore_notfound_exc,
                client.delete_server, server['id'])
        for server in servers:
            # since the test creates a high number of servers, it may take
            # a long time until they are ACTIVE
            self.wait_for_server_status(
                server, constants.SERVER_STATUS_ACTIVE, extra_timeout=300)

        if stop_vms_before_compute_reboot:
            # Issue all stops first, then wait, so shutdowns run in parallel.
            for server in servers:
                client.stop_server(server['id'])
            for server in servers:
                waiters.wait_for_server_status(
                    client, server['id'], 'SHUTOFF')

        # Collect the compute ports per availability zone (device_owner is
        # 'compute:<zone>'), excluding the 'internal' zone.
        az_list = self.os_admin.az_client.list_availability_zones(detail=True)
        zones = [zone['zoneName'] for zone in az_list['availabilityZoneInfo']
                 if zone['zoneName'] != 'internal']

        ports = []
        for zone in zones:
            zone_ports = self.client.list_ports(
                network_id=network['id'],
                device_owner='compute:' + zone)['ports']
            ports.extend(zone_ports)

        self.assertEqual(
            len(ports), servers_count,
            'Number of ports does not match number of servers.')

        # Stopped VMs have DOWN ports, so per-port checks only make sense
        # when the servers were left running.
        if not stop_vms_before_compute_reboot:
            for port in ports:
                self._check_port(port)

    def _check_port(self, port):
        """Assert the port is ACTIVE; subclasses extend with backend checks."""
        self.assertEqual(
            port['status'], 'ACTIVE', 'A nonactive port found')
|
|
|
|
|
|
class NetworkPortTestManyVmsOvn(NetworkPortTestManyVmsBase,
                                base.BaseTempestTestCaseOvn):
    """OVN flavor of the many-VMs port test.

    Extends the API-level port check with a lookup of the logical switch
    port's ``up`` column in the OVN northbound database.
    """

    def _get_nbdb_port_state(self, port_id):
        """Return the NB DB ``up`` value ('true'/'false') for the port."""
        nbctl_cmd = (
            f"{self.nbctl} {self.opt} get logical_switch_port {port_id} up")
        output = self.run_on_master_controller(nbctl_cmd)
        return output.rstrip()

    def _check_port(self, port):
        """Check API status via the base class, then the NB DB state."""
        super()._check_port(port)
        nbdb_state = self._get_nbdb_port_state(port['id'])
        self.assertEqual(
            nbdb_state, 'true',
            'Port is not up in NB DB')

    @decorators.idempotent_id('6a6d4ae6-2d61-4b0e-b8fb-5aef5292dccc')
    def test_port_status_when_many_vms_ovn(self):
        # Query any NB DB member, not only the raft leader.
        self.opt = "--no-leader-only"
        self._test_port_status_when_many_vms()
|
|
|
|
|
|
class PortListLongOptSGsCmd(base.BaseTempestWhiteboxTestCase):
    """Test class verifies BZ#2214566/OSPRH-13533 doesn't regress:
    "openstack port list --long" output has correct related security group.

    Test class may also verify OSPRH-14118 (LP#2098980) when fixed.
    """
    credentials = ['primary', 'admin']

    @classmethod
    def resource_setup(cls):
        """Create the shared fixtures for the test.

        Creates a dedicated security group, network and subnet, and builds
        ``cls.vm_kwargs`` used to boot the server in the test method.
        """
        super(PortListLongOptSGsCmd, cls).resource_setup()
        cls.secgroup = cls.create_security_group(
            name=data_utils.rand_name('port-list-sgs-test-secgroup'))
        # Track the group so the base class cleans it up.
        cls.security_groups.append(cls.secgroup)
        network = cls.create_network(
            name=data_utils.rand_name('port-list-sgs-test-network'))
        cls.create_subnet(
            network,
            # fix: subnet name previously copy-pasted the '-secgroup' prefix
            name=data_utils.rand_name('port-list-sgs-test-subnet'))
        cls.vm_kwargs = {
            'flavor_ref': cls.flavor_ref,
            'image_ref': cls.image_ref,
            'key_name': cls.create_keypair()['name'],
            'networks': [{'uuid': network['id']}],
            'security_groups': [{'name': cls.secgroup['name']}],
            'name': data_utils.rand_name('port-list-sgs-test-vm')}

    @decorators.idempotent_id('9b33caa4-62a8-49a6-b661-ecbdc520df8c')
    def test_port_list_long_opt_sgs_cmd(self):
        """Check 'port list --long' shows the VM port's security group ID."""
        vm = self.create_server(**self.vm_kwargs)['server']
        prefix = self.get_osp_cmd_prefix()
        # TODO(mblue): add test for OSPRH-14118 (LP#2098980) when fixed
        cmd = ("{}openstack port list --server {} --long -c 'Security Groups'"
               ).format(prefix, vm['id'])
        # validates correct security group uuid in output
        self.validate_command(cmd,
                              self.secgroup['id'],
                              force_bash=True)
|