Merge "Make ResourceFixtures configurable"

This commit is contained in:
Zuul 2023-09-29 12:13:31 +00:00 committed by Gerrit Code Review
commit d47aefd7e7
7 changed files with 222 additions and 33 deletions

View File

@ -47,27 +47,31 @@ def is_fixture(obj: typing.Any) -> bool:
@typing.overload @typing.overload
def get_fixture(obj: typing.Type[F], def get_fixture(obj: typing.Type[F],
fixture_id: typing.Any = None, fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F: manager: 'FixtureManager' = None,
**kwargs) -> F:
pass pass
@typing.overload @typing.overload
def get_fixture(obj: F, def get_fixture(obj: F,
fixture_id: typing.Any = None, fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F: manager: 'FixtureManager' = None,
**kwargs) -> F:
pass pass
@typing.overload @typing.overload
def get_fixture(obj: str, def get_fixture(obj: str,
fixture_id: typing.Any = None, fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> fixtures.Fixture: manager: 'FixtureManager' = None,
**kwargs) -> fixtures.Fixture:
pass pass
def get_fixture(obj: FixtureType, def get_fixture(obj: FixtureType,
fixture_id: typing.Any = None, fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F: manager: 'FixtureManager' = None,
**kwargs) -> F:
"""Returns a fixture identified by given :param obj: """Returns a fixture identified by given :param obj:
It returns registered fixture for given :param obj:. If none has been It returns registered fixture for given :param obj:. If none has been
@ -95,7 +99,7 @@ def get_fixture(obj: FixtureType,
if isinstance(obj, fixtures.Fixture): if isinstance(obj, fixtures.Fixture):
return typing.cast(F, obj) return typing.cast(F, obj)
return fixture_manager(obj, manager).get_fixture( return fixture_manager(obj, manager).get_fixture(
obj, fixture_id=fixture_id) obj, fixture_id=fixture_id, **kwargs)
def get_fixture_name(obj) -> str: def get_fixture_name(obj) -> str:
@ -153,21 +157,24 @@ def remove_fixture(obj: FixtureType,
@typing.overload @typing.overload
def setup_fixture(obj: typing.Type[F], def setup_fixture(obj: typing.Type[F],
fixture_id: typing.Any = None, fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F: manager: 'FixtureManager' = None,
**kwargs) -> F:
pass pass
@typing.overload @typing.overload
def setup_fixture(obj: F, def setup_fixture(obj: F,
fixture_id: typing.Any = None, fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F: manager: 'FixtureManager' = None,
**kwargs) -> F:
pass pass
def setup_fixture(obj: FixtureType, def setup_fixture(obj: FixtureType,
fixture_id: typing.Any = None, fixture_id: typing.Any = None,
manager: 'FixtureManager' = None, manager: 'FixtureManager' = None,
alternative: FixtureType = None) \ alternative: FixtureType = None,
**kwargs) \
-> F: -> F:
"""I setups registered fixture """I setups registered fixture
@ -183,7 +190,8 @@ def setup_fixture(obj: FixtureType,
fixture: F = typing.cast(F, fixture: F = typing.cast(F,
get_fixture(_obj, get_fixture(_obj,
fixture_id=fixture_id, fixture_id=fixture_id,
manager=manager)) manager=manager,
**kwargs))
try: try:
fixture.setUp() fixture.setUp()
break break
@ -210,22 +218,26 @@ def handle_setup_error(ex_type, ex_value, ex_tb):
@typing.overload @typing.overload
def reset_fixture(obj: typing.Type[F], def reset_fixture(obj: typing.Type[F],
fixture_id: typing.Any = None, fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F: manager: 'FixtureManager' = None,
**kwargs) -> F:
pass pass
@typing.overload @typing.overload
def reset_fixture(obj: F, def reset_fixture(obj: F,
fixture_id: typing.Any = None, fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F: manager: 'FixtureManager' = None,
**kwargs) -> F:
pass pass
def reset_fixture(obj: FixtureType, def reset_fixture(obj: FixtureType,
fixture_id: typing.Any = None, fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F: manager: 'FixtureManager' = None,
**kwargs) -> F:
"""It cleanups and setups registered fixture""" """It cleanups and setups registered fixture"""
fixture: F = get_fixture(obj, fixture_id=fixture_id, manager=manager) fixture: F = get_fixture(
obj, fixture_id=fixture_id, manager=manager, **kwargs)
with _exception.handle_multiple_exceptions(): with _exception.handle_multiple_exceptions():
fixture.reset() fixture.reset()
return fixture return fixture
@ -258,26 +270,30 @@ def cleanup_fixture(obj: FixtureType,
@typing.overload @typing.overload
def use_fixture(obj: typing.Type[F], def use_fixture(obj: typing.Type[F],
fixture_id: typing.Any = None, fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F: manager: 'FixtureManager' = None,
**kwargs) -> F:
pass pass
@typing.overload @typing.overload
def use_fixture(obj: F, def use_fixture(obj: F,
fixture_id: typing.Any = None, fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F: manager: 'FixtureManager' = None,
**kwargs) -> F:
pass pass
def use_fixture(obj: FixtureType, def use_fixture(obj: FixtureType,
fixture_id: typing.Any = None, fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F: manager: 'FixtureManager' = None,
**kwargs) -> F:
"""It setups registered fixture and then register it for cleanup """It setups registered fixture and then register it for cleanup
At the end of the test case execution it will call cleanup_fixture At the end of the test case execution it will call cleanup_fixture
with on the fixture with on the fixture
""" """
fixture = setup_fixture(obj, fixture_id=fixture_id, manager=manager) fixture = setup_fixture(
obj, fixture_id=fixture_id, manager=manager, **kwargs)
_case.add_cleanup(cleanup_fixture, fixture) _case.add_cleanup(cleanup_fixture, fixture)
return fixture return fixture
@ -402,13 +418,14 @@ def get_required_fixture_properties(cls):
def init_fixture(obj: typing.Union[typing.Type[F], F], def init_fixture(obj: typing.Union[typing.Type[F], F],
name: str, name: str,
fixture_id: typing.Any = None) -> F: fixture_id: typing.Any = None,
**kwargs) -> F:
fixture: F fixture: F
if isinstance(obj, fixtures.Fixture): if isinstance(obj, fixtures.Fixture):
fixture = obj fixture = obj(**kwargs)
elif inspect.isclass(obj) and issubclass(obj, fixtures.Fixture): elif inspect.isclass(obj) and issubclass(obj, fixtures.Fixture):
try: try:
fixture = obj() fixture = obj(**kwargs)
except Exception as ex: except Exception as ex:
raise TypeError(f"Error creating fixture '{name}' from class " raise TypeError(f"Error creating fixture '{name}' from class "
f"{obj!r}.") from ex f"{obj!r}.") from ex
@ -467,7 +484,8 @@ class FixtureManager:
def get_fixture(self, def get_fixture(self,
obj: FixtureType, obj: FixtureType,
fixture_id: typing.Any = None) -> F: fixture_id: typing.Any = None,
**kwargs) -> F:
name, obj = get_name_and_object(obj) name, obj = get_name_and_object(obj)
if fixture_id: if fixture_id:
name += f'-{fixture_id}' name += f'-{fixture_id}'
@ -476,7 +494,8 @@ class FixtureManager:
except KeyError: except KeyError:
fixture: F = self.init_fixture(obj=obj, fixture: F = self.init_fixture(obj=obj,
name=name, name=name,
fixture_id=fixture_id) fixture_id=fixture_id,
**kwargs)
assert isinstance(fixture, fixtures.Fixture) assert isinstance(fixture, fixtures.Fixture)
self.fixtures[name] = fixture self.fixtures[name] = fixture
return fixture return fixture
@ -484,10 +503,12 @@ class FixtureManager:
@staticmethod @staticmethod
def init_fixture(obj: typing.Union[typing.Type[F], F], def init_fixture(obj: typing.Union[typing.Type[F], F],
name: str, name: str,
fixture_id: typing.Any) -> F: fixture_id: typing.Any,
**kwargs) -> F:
return init_fixture(obj=obj, return init_fixture(obj=obj,
name=name, name=name,
fixture_id=fixture_id) fixture_id=fixture_id,
**kwargs)
def remove_fixture(self, def remove_fixture(self,
obj: FixtureType, obj: FixtureType,
@ -629,7 +650,7 @@ class RequiredFixture(property, typing.Generic[G]):
def setup_fixture(self, _instance=None) -> G: def setup_fixture(self, _instance=None) -> G:
fixture = self.fixture fixture = self.fixture
setup_fixture(fixture) setup_fixture(fixture, **self.kwargs)
if (hasattr(_instance, 'addCleanup') and if (hasattr(_instance, 'addCleanup') and
hasattr(_instance, 'getDetails')): hasattr(_instance, 'getDetails')):
_instance.addCleanup(_detail.gather_details, _instance.addCleanup(_detail.gather_details,

View File

@ -146,6 +146,7 @@ get_subnet_pool = _subnet_pool.get_subnet_pool
create_subnet_pool = _subnet_pool.create_subnet_pool create_subnet_pool = _subnet_pool.create_subnet_pool
delete_subnet_pool = _subnet_pool.delete_subnet_pool delete_subnet_pool = _subnet_pool.delete_subnet_pool
find_subnet_pool = _subnet_pool.find_subnet_pool find_subnet_pool = _subnet_pool.find_subnet_pool
list_subnet_pools = _subnet_pool.list_subnet_pools
list_security_groups = _security_group.list_security_groups list_security_groups = _security_group.list_security_groups
get_security_group = _security_group.get_security_group get_security_group = _security_group.get_security_group

View File

@ -56,10 +56,6 @@ class ResourceFixture(tobiko.SharedFixture, abc.ABC):
_resource: typing.Optional[object] = None _resource: typing.Optional[object] = None
_not_found_exception_tuple: typing.Type[Exception] = (Exception) _not_found_exception_tuple: typing.Type[Exception] = (Exception)
def __init__(self):
self.name = self.fixture_name
super().__init__()
@property @property
def resource_id(self): def resource_id(self):
if self.resource: if self.resource:
@ -88,6 +84,7 @@ class ResourceFixture(tobiko.SharedFixture, abc.ABC):
pass pass
def setup_fixture(self): def setup_fixture(self):
self.name = self.fixture_name
if config.get_bool_env('TOBIKO_PREVENT_CREATE'): if config.get_bool_env('TOBIKO_PREVENT_CREATE'):
LOG.debug("%r should have been already created: %r", LOG.debug("%r should have been already created: %r",
self.name, self.name,
@ -95,7 +92,9 @@ class ResourceFixture(tobiko.SharedFixture, abc.ABC):
else: else:
self.try_create_resource() self.try_create_resource()
if self.resource: if self.resource is None:
tobiko.fail("%r not found!", self.name)
else:
tobiko.addme_to_shared_resource(__name__, self.name) tobiko.addme_to_shared_resource(__name__, self.name)
def try_create_resource(self): def try_create_resource(self):

View File

@ -302,6 +302,11 @@ class SubnetPoolFixture(_fixture.ResourceFixture):
_not_found_exception_tuple: typing.Type[ _not_found_exception_tuple: typing.Type[
neutron.NoSuchSubnetPool] = (neutron.NoSuchSubnetPool) neutron.NoSuchSubnetPool] = (neutron.NoSuchSubnetPool)
def __init__(self, prefixes=None, default_prefixlen=None):
super().__init__()
self.prefixes = prefixes or self.prefixes
self.default_prefixlen = default_prefixlen or self.default_prefixlen
@property @property
def subnet_pool_id(self): def subnet_pool_id(self):
return self.resource_id return self.resource_id
@ -611,6 +616,11 @@ class StatelessSecurityGroupFixture(_fixture.ResourceFixture):
_not_found_exception_tuple: typing.Type[nc_exceptions.NotFound] = ( _not_found_exception_tuple: typing.Type[nc_exceptions.NotFound] = (
neutron.NotFound) neutron.NotFound)
def __init__(self, description=None, rules=None):
super().__init__()
self.description = description or self.description
self.rules = rules or self.rules
@property @property
def security_group_id(self): def security_group_id(self):
return self.resource_id return self.resource_id
@ -634,7 +644,8 @@ class StatelessSecurityGroupFixture(_fixture.ResourceFixture):
neutron.create_security_group_rule( neutron.create_security_group_rule(
sg['id'], add_cleanup=False, **rule) sg['id'], add_cleanup=False, **rule)
return sg # return the updated SG, including the rules just added
return self.resource_find()
def resource_delete(self): def resource_delete(self):
neutron.delete_security_group(self.security_group_id) neutron.delete_security_group(self.security_group_id)

View File

@ -0,0 +1,62 @@
# Copyright (c) 2023 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import testtools
import tobiko
from tobiko.openstack import keystone
from tobiko.openstack import neutron
from tobiko.openstack.stacks._neutron import StatelessSecurityGroupFixture
DESCRIPTION = "Amazing Stateless Sec Group number {i}"
NUM_SEC_GROUPS = 10
@keystone.skip_unless_has_keystone_credentials()
class StatelessSecurityGroupTest(testtools.TestCase):
"""Tests Stateless Security Group creation"""
stack_list = [tobiko.required_fixture(
StatelessSecurityGroupFixture,
fixture_id=str(i),
description=DESCRIPTION.format(i=i))
for i in range(NUM_SEC_GROUPS)]
ssg_fixture_list: list = []
@classmethod
def tearDownClass(cls):
for stack in cls.stack_list:
tobiko.cleanup_fixture(stack.fixture)
@classmethod
def setUpClass(cls):
for stack in cls.stack_list:
cls.ssg_fixture_list.append(tobiko.use_fixture(stack.fixture))
def test_stateless_sec_group_list_find(self):
self.assertEqual(NUM_SEC_GROUPS, len(self.ssg_fixture_list))
for i, ssg_fixture in enumerate(self.ssg_fixture_list):
ssg_name = (f"{StatelessSecurityGroupFixture.__module__}."
f"{StatelessSecurityGroupFixture.__qualname__}-{i}")
self.assertEqual(ssg_name, ssg_fixture.name)
ssg = neutron.list_security_groups(name=ssg_name).unique
self.assertEqual(ssg, ssg_fixture.security_group)
def test_stateless_sec_group_list_parameters(self):
for i, ssg_fixture in enumerate(self.ssg_fixture_list):
self.assertEqual(DESCRIPTION.format(i=i),
ssg_fixture.security_group['description'])

View File

@ -0,0 +1,94 @@
# Copyright (c) 2023 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import testtools
import tobiko
from tobiko.openstack import keystone
from tobiko.openstack import neutron
from tobiko.openstack.stacks._neutron import SubnetPoolFixture
PREFIX = "10.211.{i}.0/24"
DEFAULT_PREFIXLEN = 29
@keystone.skip_unless_has_keystone_credentials()
class SubnetPoolTest(testtools.TestCase):
"""Tests subnet pool creation"""
stack = tobiko.required_fixture(SubnetPoolFixture,
fixture_id="thistest",
prefixes=[PREFIX.format(i=0)],
default_prefixlen=DEFAULT_PREFIXLEN)
@classmethod
def tearDownClass(cls):
# NOTE: the skip at class level does not affect tearDownClass, so
# the following workaround is needed to avoid errors
if keystone.has_keystone_credentials():
tobiko.cleanup_fixture(cls.stack.fixture)
def test_subnet_pool_find(self):
snp = neutron.list_subnet_pools(name=self.stack.name).unique
self.assertEqual(snp, self.stack.subnet_pool)
def test_subnet_pool_parameters(self):
self.assertEqual([PREFIX.format(i=0)],
self.stack.subnet_pool['prefixes'])
self.assertEqual(str(DEFAULT_PREFIXLEN),
self.stack.subnet_pool['default_prefixlen'])
NUM_SUBNET_POOLS = 10
@keystone.skip_unless_has_keystone_credentials()
class SubnetPoolListTest(testtools.TestCase):
"""Tests creation of a list of subnet pools"""
stack_list = [tobiko.required_fixture(SubnetPoolFixture,
fixture_id=str(i),
prefixes=[PREFIX.format(i=i)],
default_prefixlen=DEFAULT_PREFIXLEN)
for i in range(NUM_SUBNET_POOLS)]
snp_fixture_list: list = []
@classmethod
def tearDownClass(cls):
for stack in cls.stack_list:
tobiko.cleanup_fixture(stack.fixture)
@classmethod
def setUpClass(cls):
for stack in cls.stack_list:
cls.snp_fixture_list.append(tobiko.use_fixture(stack.fixture))
def test_subnet_pool_list_find(self):
self.assertEqual(NUM_SUBNET_POOLS, len(self.snp_fixture_list))
for i, snp_fixture in enumerate(self.snp_fixture_list):
snp_name = (f"{SubnetPoolFixture.__module__}."
f"{SubnetPoolFixture.__qualname__}-{i}")
self.assertEqual(snp_name, snp_fixture.name)
snp = neutron.list_subnet_pools(name=snp_name).unique
self.assertEqual(snp, snp_fixture.subnet_pool)
def test_subnet_pool_list_parameters(self):
for i, snp_fixture in enumerate(self.snp_fixture_list):
self.assertEqual([PREFIX.format(i=i)],
snp_fixture.subnet_pool['prefixes'])
self.assertEqual(str(DEFAULT_PREFIXLEN),
snp_fixture.subnet_pool['default_prefixlen'])

View File

@ -48,10 +48,11 @@ class PatchEnvironFixture(tobiko.SharedFixture):
class FixtureManagerPatch(tobiko.FixtureManager, _patch.PatchFixture): class FixtureManagerPatch(tobiko.FixtureManager, _patch.PatchFixture):
def init_fixture(self, obj, name, fixture_id): def init_fixture(self, obj, name, fixture_id, **kwargs):
fixture = super().init_fixture(obj=obj, fixture = super().init_fixture(obj=obj,
name=name, name=name,
fixture_id=fixture_id) fixture_id=fixture_id,
**kwargs)
self.addCleanup(tobiko.cleanup_fixture, fixture) self.addCleanup(tobiko.cleanup_fixture, fixture)
return fixture return fixture