
This adds to the provisioning system the ability to preserve and re-display the full reason for a particular DB backend not available, and propagates this all the way through the skip() emitted by the OpportunisticTestCase and DbFixture. As part of the change, the database type requested is added to the messages themselves, which also appears in the messages emitted by the consuming is_backend_avail() function, which has tests that check for these explicit messages. New tests specific to the Backend object are added which are not dependent on is_backend_avail(); as this function is deprecated, we will be able to remove its tests at the same time as the function itself with no loss in coverage. Change-Id: I7c00e0770b02aa751e184edfecdec4306de6340c
275 lines
9.3 KiB
Python
275 lines
9.3 KiB
Python
# Copyright (c) 2013 OpenStack Foundation
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import fixtures
|
|
import testresources
|
|
import testscenarios
|
|
|
|
try:
|
|
from oslotest import base as test_base
|
|
except ImportError:
|
|
raise NameError('Oslotest is not installed. Please add oslotest in your'
|
|
' test-requirements')
|
|
|
|
|
|
import os
|
|
|
|
from oslo_utils import reflection
|
|
import six
|
|
|
|
from oslo_db import exception
|
|
from oslo_db.sqlalchemy import enginefacade
|
|
from oslo_db.sqlalchemy import provision
|
|
from oslo_db.sqlalchemy import session
|
|
|
|
|
|
class DbFixture(fixtures.Fixture):
|
|
"""Basic database fixture.
|
|
|
|
Allows to run tests on various db backends, such as SQLite, MySQL and
|
|
PostgreSQL. By default use sqlite backend. To override default backend
|
|
uri set env variable OS_TEST_DBAPI_ADMIN_CONNECTION with database admin
|
|
credentials for specific backend.
|
|
"""
|
|
|
|
DRIVER = "sqlite"
|
|
|
|
# these names are deprecated, and are not used by DbFixture.
|
|
# they are here for backwards compatibility with test suites that
|
|
# are referring to them directly.
|
|
DBNAME = PASSWORD = USERNAME = 'openstack_citest'
|
|
|
|
def __init__(self, test, skip_on_unavailable_db=True):
|
|
super(DbFixture, self).__init__()
|
|
self.test = test
|
|
self.skip_on_unavailable_db = skip_on_unavailable_db
|
|
|
|
def setUp(self):
|
|
super(DbFixture, self).setUp()
|
|
|
|
testresources.setUpResources(
|
|
self.test, self.test.resources, testresources._get_result())
|
|
self.addCleanup(
|
|
testresources.tearDownResources,
|
|
self.test, self.test.resources, testresources._get_result()
|
|
)
|
|
|
|
if not self.test._has_db_resource():
|
|
msg = self.test._get_db_resource_not_available_reason()
|
|
if self.test.SKIP_ON_UNAVAILABLE_DB:
|
|
self.test.skip(msg)
|
|
else:
|
|
self.test.fail(msg)
|
|
|
|
if self.test.SCHEMA_SCOPE:
|
|
self.test.engine = self.test.transaction_engine
|
|
self.test.sessionmaker = session.get_maker(
|
|
self.test.transaction_engine)
|
|
else:
|
|
self.test.engine = self.test.db.engine
|
|
self.test.sessionmaker = session.get_maker(self.test.engine)
|
|
|
|
self.addCleanup(setattr, self.test, 'sessionmaker', None)
|
|
self.addCleanup(setattr, self.test, 'engine', None)
|
|
|
|
self.test.enginefacade = enginefacade._TestTransactionFactory(
|
|
self.test.engine, self.test.sessionmaker, apply_global=True,
|
|
synchronous_reader=True)
|
|
self.addCleanup(self.test.enginefacade.dispose_global)
|
|
|
|
|
|
class DbTestCase(test_base.BaseTestCase):
|
|
"""Base class for testing of DB code.
|
|
|
|
"""
|
|
|
|
FIXTURE = DbFixture
|
|
SCHEMA_SCOPE = None
|
|
SKIP_ON_UNAVAILABLE_DB = True
|
|
|
|
_db_not_available = {}
|
|
_schema_resources = {}
|
|
_database_resources = {}
|
|
|
|
def _get_db_resource_not_available_reason(self):
|
|
return self._db_not_available.get(self.FIXTURE.DRIVER, None)
|
|
|
|
def _has_db_resource(self):
|
|
return self._database_resources.get(
|
|
self.FIXTURE.DRIVER, None) is not None
|
|
|
|
def _resources_for_driver(self, driver, schema_scope, generate_schema):
|
|
# testresources relies on the identity and state of the
|
|
# TestResourceManager objects in play to correctly manage
|
|
# resources, and it also hardcodes to looking at the
|
|
# ".resources" attribute on the test object, even though the
|
|
# setUpResources() function passes the list of resources in,
|
|
# so we have to code the TestResourceManager logic into the
|
|
# .resources attribute and ensure that the same set of test
|
|
# variables always produces the same TestResourceManager objects.
|
|
|
|
if driver not in self._database_resources:
|
|
try:
|
|
self._database_resources[driver] = \
|
|
provision.DatabaseResource(driver)
|
|
except exception.BackendNotAvailable as bne:
|
|
self._database_resources[driver] = None
|
|
self._db_not_available[driver] = str(bne)
|
|
|
|
database_resource = self._database_resources[driver]
|
|
if database_resource is None:
|
|
return []
|
|
|
|
if schema_scope:
|
|
key = (driver, schema_scope)
|
|
if key not in self._schema_resources:
|
|
schema_resource = provision.SchemaResource(
|
|
database_resource, generate_schema)
|
|
|
|
transaction_resource = provision.TransactionResource(
|
|
database_resource, schema_resource)
|
|
|
|
self._schema_resources[key] = \
|
|
transaction_resource
|
|
|
|
transaction_resource = self._schema_resources[key]
|
|
|
|
return [
|
|
('transaction_engine', transaction_resource),
|
|
('db', database_resource),
|
|
]
|
|
else:
|
|
key = (driver, None)
|
|
if key not in self._schema_resources:
|
|
self._schema_resources[key] = provision.SchemaResource(
|
|
database_resource, generate_schema, teardown=True)
|
|
|
|
schema_resource = self._schema_resources[key]
|
|
return [
|
|
('schema', schema_resource),
|
|
('db', database_resource)
|
|
]
|
|
|
|
@property
|
|
def resources(self):
|
|
return self._resources_for_driver(
|
|
self.FIXTURE.DRIVER, self.SCHEMA_SCOPE, self.generate_schema)
|
|
|
|
def setUp(self):
|
|
super(DbTestCase, self).setUp()
|
|
self.useFixture(
|
|
self.FIXTURE(
|
|
self, skip_on_unavailable_db=self.SKIP_ON_UNAVAILABLE_DB))
|
|
|
|
def generate_schema(self, engine):
|
|
"""Generate schema objects to be used within a test.
|
|
|
|
The function is separate from the setUp() case as the scope
|
|
of this method is controlled by the provisioning system. A
|
|
test that specifies SCHEMA_SCOPE may not call this method
|
|
for each test, as the schema may be maintained from a previous run.
|
|
|
|
"""
|
|
if self.SCHEMA_SCOPE:
|
|
# if SCHEMA_SCOPE is set, then this method definitely
|
|
# has to be implemented. This is a guard against a test
|
|
# that inadvertently does schema setup within setUp().
|
|
raise NotImplementedError(
|
|
"This test requires schema-level setup to be "
|
|
"implemented within generate_schema().")
|
|
|
|
|
|
class OpportunisticTestCase(DbTestCase):
|
|
"""Placeholder for backwards compatibility."""
|
|
|
|
ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql']
|
|
|
|
|
|
def backend_specific(*dialects):
|
|
"""Decorator to skip backend specific tests on inappropriate engines.
|
|
|
|
::dialects: list of dialects names under which the test will be launched.
|
|
"""
|
|
def wrap(f):
|
|
@six.wraps(f)
|
|
def ins_wrap(self):
|
|
if not set(dialects).issubset(ALLOWED_DIALECTS):
|
|
raise ValueError(
|
|
"Please use allowed dialects: %s" % ALLOWED_DIALECTS)
|
|
if self.engine.name not in dialects:
|
|
msg = ('The test "%s" can be run '
|
|
'only on %s. Current engine is %s.')
|
|
args = (reflection.get_callable_name(f), ', '.join(dialects),
|
|
self.engine.name)
|
|
self.skip(msg % args)
|
|
else:
|
|
return f(self)
|
|
return ins_wrap
|
|
return wrap
|
|
|
|
|
|
class MySQLOpportunisticFixture(DbFixture):
|
|
DRIVER = 'mysql'
|
|
|
|
|
|
class PostgreSQLOpportunisticFixture(DbFixture):
|
|
DRIVER = 'postgresql'
|
|
|
|
|
|
class MySQLOpportunisticTestCase(OpportunisticTestCase):
|
|
FIXTURE = MySQLOpportunisticFixture
|
|
|
|
|
|
class PostgreSQLOpportunisticTestCase(OpportunisticTestCase):
|
|
FIXTURE = PostgreSQLOpportunisticFixture
|
|
|
|
|
|
def optimize_db_test_loader(file_):
|
|
"""Package level load_tests() function.
|
|
|
|
Will apply an optimizing test suite to all sub-tests, which groups DB
|
|
tests and other resources appropriately.
|
|
|
|
Place this in an __init__.py package file within the root of the test
|
|
suite, at the level where testresources loads it as a package::
|
|
|
|
from oslo_db.sqlalchemy import test_base
|
|
|
|
load_tests = test_base.optimize_db_test_loader(__file__)
|
|
|
|
Alternatively, the directive can be placed into a test module directly.
|
|
|
|
"""
|
|
|
|
this_dir = os.path.dirname(file_)
|
|
|
|
def load_tests(loader, found_tests, pattern):
|
|
# pattern is None if the directive is placed within
|
|
# a test module directly, as well as within certain test
|
|
# discovery patterns
|
|
|
|
if pattern is not None:
|
|
pkg_tests = loader.discover(start_dir=this_dir, pattern=pattern)
|
|
|
|
result = testresources.OptimisingTestSuite()
|
|
found_tests = testscenarios.load_tests_apply_scenarios(
|
|
loader, found_tests, pattern)
|
|
result.addTest(found_tests)
|
|
|
|
if pattern is not None:
|
|
result.addTest(pkg_tests)
|
|
return result
|
|
return load_tests
|