Merge "Skip test cases unless external network is available"
This commit is contained in:
commit
70cbb0d463
@ -52,9 +52,12 @@ NetworkWithNetMtuWriteStackFixture = (
|
|||||||
_neutron.NetworkWithNetMtuWriteStackFixture)
|
_neutron.NetworkWithNetMtuWriteStackFixture)
|
||||||
SecurityGroupsFixture = _neutron.SecurityGroupsFixture
|
SecurityGroupsFixture = _neutron.SecurityGroupsFixture
|
||||||
|
|
||||||
|
get_external_network = _neutron.get_external_network
|
||||||
|
has_external_network = _neutron.has_external_network
|
||||||
|
skip_unless_has_external_network = _neutron.skip_unless_has_external_network
|
||||||
get_floating_network = _neutron.get_floating_network
|
get_floating_network = _neutron.get_floating_network
|
||||||
has_floating_network = _neutron.has_floating_network
|
has_floating_network = _neutron.has_floating_network
|
||||||
skip_if_missing_floating_network = _neutron.skip_if_missing_floating_network
|
skip_unless_has_floating_network = _neutron.skip_unless_has_floating_network
|
||||||
|
|
||||||
ServerStackFixture = _nova.ServerStackFixture
|
ServerStackFixture = _nova.ServerStackFixture
|
||||||
KeyPairStackFixture = _nova.KeyPairStackFixture
|
KeyPairStackFixture = _nova.KeyPairStackFixture
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import typing
|
||||||
|
|
||||||
import netaddr
|
import netaddr
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
@ -33,6 +34,8 @@ LOG = log.getLogger(__name__)
|
|||||||
CONF = config.CONF
|
CONF = config.CONF
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
NeutronNetworkType = typing.Dict[str, typing.Any]
|
||||||
|
|
||||||
|
|
||||||
class ExternalNetworkStackFixture(heat.HeatStackFixture):
|
class ExternalNetworkStackFixture(heat.HeatStackFixture):
|
||||||
|
|
||||||
@ -42,15 +45,40 @@ class ExternalNetworkStackFixture(heat.HeatStackFixture):
|
|||||||
def external_name(self):
|
def external_name(self):
|
||||||
return tobiko.tobiko_config().neutron.external_network
|
return tobiko.tobiko_config().neutron.external_network
|
||||||
|
|
||||||
_external_network = None
|
subnet_enable_dhcp: typing.Optional[bool] = True
|
||||||
|
|
||||||
|
_external_network: typing.Optional[NeutronNetworkType] = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def external_network(self):
|
def external_network(self) -> typing.Optional[NeutronNetworkType]:
|
||||||
network = self._external_network
|
external_network = self._external_network
|
||||||
if network is None:
|
if external_network is None:
|
||||||
self._external_network = network = find_external_network(
|
subnet_parameters = {}
|
||||||
name=self.external_name) or {}
|
if self.subnet_enable_dhcp is not None:
|
||||||
return network
|
subnet_parameters['enable_dhcp'] = self.subnet_enable_dhcp
|
||||||
|
for network in list_external_networks(name=self.external_name):
|
||||||
|
if not network['subnets']:
|
||||||
|
LOG.debug(f"Network '{network['id']}' has any subnet")
|
||||||
|
continue
|
||||||
|
if subnet_parameters:
|
||||||
|
subnets = neutron.list_subnets(network_id=network['id'],
|
||||||
|
**subnet_parameters)
|
||||||
|
if not subnets:
|
||||||
|
LOG.debug(f"Network '{network['id']}' has any valid "
|
||||||
|
f"subnet: {subnet_parameters}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
network_dump = json.dumps(network, indent=4, sort_keys=True)
|
||||||
|
LOG.debug(f"Found external network for {self.fixture_name}:\n"
|
||||||
|
f"{network_dump}")
|
||||||
|
self._external_network = external_network = network
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
LOG.warning("No external network found for "
|
||||||
|
f"'{self.fixture_name}':\n"
|
||||||
|
f" - name or ID: {self.external_name}\n"
|
||||||
|
f" - subnet attributes: {subnet_parameters}\n")
|
||||||
|
return external_network
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def external_id(self):
|
def external_id(self):
|
||||||
@ -74,6 +102,8 @@ class FloatingNetworkStackFixture(ExternalNetworkStackFixture):
|
|||||||
def external_name(self):
|
def external_name(self):
|
||||||
return tobiko.tobiko_config().neutron.floating_network
|
return tobiko.tobiko_config().neutron.floating_network
|
||||||
|
|
||||||
|
subnet_enable_dhcp = None
|
||||||
|
|
||||||
|
|
||||||
@neutron.skip_if_missing_networking_extensions('port-security')
|
@neutron.skip_if_missing_networking_extensions('port-security')
|
||||||
class NetworkStackFixture(heat.HeatStackFixture):
|
class NetworkStackFixture(heat.HeatStackFixture):
|
||||||
@ -270,32 +300,41 @@ class SecurityGroupsFixture(heat.HeatStackFixture):
|
|||||||
template = _hot.heat_template_file('neutron/security_groups.yaml')
|
template = _hot.heat_template_file('neutron/security_groups.yaml')
|
||||||
|
|
||||||
|
|
||||||
def find_external_network(name=None):
|
def list_external_networks(name: typing.Optional[str] = None) -> \
|
||||||
network = None
|
tobiko.Selection[NeutronNetworkType]:
|
||||||
if name:
|
networks = tobiko.Selection[NeutronNetworkType]()
|
||||||
|
if name is not None:
|
||||||
try:
|
try:
|
||||||
network = neutron.get_network(name)
|
network = neutron.get_network(name)
|
||||||
except neutron.NoSuchNetwork:
|
except neutron.NoSuchNetwork:
|
||||||
LOG.debug('No such network with ID %r', name)
|
LOG.error(f"No such network with ID '{name}'")
|
||||||
|
else:
|
||||||
|
networks.append(network)
|
||||||
|
if not networks:
|
||||||
|
network_params = {'router:external': True, "status": "ACTIVE"}
|
||||||
|
if name is not None:
|
||||||
|
network_params['name'] = name
|
||||||
|
networks += neutron.list_networks(**network_params)
|
||||||
|
if not networks and name:
|
||||||
|
raise ValueError("No such external network with name or ID "
|
||||||
|
f"'{name}'")
|
||||||
|
return networks
|
||||||
|
|
||||||
if not network:
|
|
||||||
params = {'router:external': True, "status": "ACTIVE"}
|
|
||||||
if name:
|
|
||||||
params['name'] = name
|
|
||||||
try:
|
|
||||||
network = neutron.find_network(**params)
|
|
||||||
except tobiko.ObjectNotFound as ex:
|
|
||||||
LOG.exception('No such network (%s):',
|
|
||||||
json.dumps(params, sort_keys=True))
|
|
||||||
if name:
|
|
||||||
raise ValueError("No such external network with name or ID "
|
|
||||||
f"'{name}'") from ex
|
|
||||||
|
|
||||||
if network:
|
def get_external_network_id():
|
||||||
LOG.debug('Found external network %r:\n%s',
|
return tobiko.setup_fixture(ExternalNetworkStackFixture).network_id
|
||||||
network['name'], json.dumps(network, indent=4,
|
|
||||||
sort_keys=True))
|
|
||||||
return network
|
def get_external_network():
|
||||||
|
return tobiko.setup_fixture(ExternalNetworkStackFixture).network_details
|
||||||
|
|
||||||
|
|
||||||
|
def has_external_network():
|
||||||
|
return tobiko.setup_fixture(ExternalNetworkStackFixture).has_network
|
||||||
|
|
||||||
|
|
||||||
|
skip_unless_has_external_network = tobiko.skip_unless(
|
||||||
|
'External network not found', has_external_network)
|
||||||
|
|
||||||
|
|
||||||
def get_floating_network_id():
|
def get_floating_network_id():
|
||||||
@ -310,5 +349,5 @@ def has_floating_network():
|
|||||||
return tobiko.setup_fixture(FloatingNetworkStackFixture).has_network
|
return tobiko.setup_fixture(FloatingNetworkStackFixture).has_network
|
||||||
|
|
||||||
|
|
||||||
skip_if_missing_floating_network = tobiko.skip_unless(
|
skip_unless_has_floating_network = tobiko.skip_unless(
|
||||||
'Floating network not found', has_floating_network)
|
'Floating network not found', has_floating_network)
|
||||||
|
@ -102,15 +102,31 @@ class L3HaNetworkTest(NetworkTest):
|
|||||||
|
|
||||||
|
|
||||||
@keystone.skip_unless_has_keystone_credentials()
|
@keystone.skip_unless_has_keystone_credentials()
|
||||||
|
@stacks.skip_unless_has_external_network
|
||||||
|
class ExternalNetworkStackTest(testtools.TestCase):
|
||||||
|
|
||||||
|
def test_get_external_network(self):
|
||||||
|
network = stacks.get_external_network()
|
||||||
|
self.assertTrue(network['id'])
|
||||||
|
self.assertIs(True, network['router:external'])
|
||||||
|
self.assertEqual('ACTIVE', network['status'])
|
||||||
|
subnets = neutron.list_subnets(network_id=network['id'],
|
||||||
|
enable_dhcp=True)
|
||||||
|
self.assertNotEqual([], subnets)
|
||||||
|
|
||||||
|
def test_has_external_network(self):
|
||||||
|
self.assertIs(True, stacks.has_external_network())
|
||||||
|
|
||||||
|
|
||||||
|
@keystone.skip_unless_has_keystone_credentials()
|
||||||
|
@stacks.skip_unless_has_floating_network
|
||||||
class FloatingNetworkStackTest(testtools.TestCase):
|
class FloatingNetworkStackTest(testtools.TestCase):
|
||||||
|
|
||||||
@stacks.skip_if_missing_floating_network
|
|
||||||
def test_get_floating_network(self):
|
def test_get_floating_network(self):
|
||||||
network = stacks.get_floating_network()
|
network = stacks.get_floating_network()
|
||||||
self.assertTrue(network['id'])
|
self.assertTrue(network['id'])
|
||||||
self.assertIs(True, network['router:external'])
|
self.assertIs(True, network['router:external'])
|
||||||
self.assertEqual('ACTIVE', network['status'])
|
self.assertEqual('ACTIVE', network['status'])
|
||||||
|
|
||||||
@stacks.skip_if_missing_floating_network
|
|
||||||
def test_has_floating_network(self):
|
def test_has_floating_network(self):
|
||||||
self.assertIs(True, stacks.has_floating_network())
|
self.assertIs(True, stacks.has_floating_network())
|
||||||
|
@ -110,6 +110,7 @@ class PortTest(testtools.TestCase):
|
|||||||
|
|
||||||
# --- Test opening ports on external network ----------------------------------
|
# --- Test opening ports on external network ----------------------------------
|
||||||
|
|
||||||
|
@stacks.skip_unless_has_external_network
|
||||||
class ExternalPortTest(PortTest):
|
class ExternalPortTest(PortTest):
|
||||||
"""Test Neutron ports"""
|
"""Test Neutron ports"""
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user