Fixturize
Change-Id: Ie2319d74181f9f1d3496602b923b583fa942c25c
This commit is contained in:
parent
76fb9f7aac
commit
7f773fa10b
@ -14,6 +14,7 @@
|
|||||||
"""
|
"""
|
||||||
A Usage plugin using sqlalchemy...
|
A Usage plugin using sqlalchemy...
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
from sqlalchemy.ext.declarative import declarative_base
|
from sqlalchemy.ext.declarative import declarative_base
|
||||||
|
|
||||||
|
@ -155,17 +155,11 @@ class FunctionalTest(ServiceTestCase, APITestMixin):
|
|||||||
super(FunctionalTest, self).setUp()
|
super(FunctionalTest, self).setUp()
|
||||||
|
|
||||||
# NOTE: Needs to be started after the db schema is created
|
# NOTE: Needs to be started after the db schema is created
|
||||||
conn = self.get_storage_connection('central')
|
self.start_storage('central')
|
||||||
conn.setup_schema()
|
|
||||||
self.start_service('central')
|
self.start_service('central')
|
||||||
|
|
||||||
self.setSamples()
|
self.setSamples()
|
||||||
|
|
||||||
self.app = factory({})
|
self.app = factory({})
|
||||||
self.app.wsgi_app = FaultWrapperMiddleware(self.app.wsgi_app)
|
self.app.wsgi_app = FaultWrapperMiddleware(self.app.wsgi_app)
|
||||||
self.app.wsgi_app = NoAuthContextMiddleware(self.app.wsgi_app)
|
self.app.wsgi_app = NoAuthContextMiddleware(self.app.wsgi_app)
|
||||||
self.client = self.app.test_client()
|
self.client = self.app.test_client()
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
self.services.central.stop()
|
|
||||||
super(FunctionalTest, self).tearDown()
|
|
||||||
|
@ -1,10 +1,18 @@
|
|||||||
import copy
|
import copy
|
||||||
import unittest2
|
import os
|
||||||
|
import shutil
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import fixtures
|
||||||
import mox
|
import mox
|
||||||
|
import stubout
|
||||||
|
import testtools
|
||||||
|
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
# NOTE: Currently disabled
|
# NOTE: Currently disabled
|
||||||
# from billingstack.openstack.common import policy
|
# from billingstack.openstack.common import policy
|
||||||
from billingstack import exceptions
|
from billingstack import exceptions
|
||||||
|
from billingstack import paths
|
||||||
from billingstack import samples
|
from billingstack import samples
|
||||||
from billingstack.openstack.common.context import RequestContext, \
|
from billingstack.openstack.common.context import RequestContext, \
|
||||||
get_admin_context
|
get_admin_context
|
||||||
@ -16,6 +24,207 @@ cfg.CONF.import_opt(
|
|||||||
'billingstack.openstack.common.rpc.impl_fake')
|
'billingstack.openstack.common.rpc.impl_fake')
|
||||||
|
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
CONF.import_opt('host', 'billingstack.netconf')
|
||||||
|
|
||||||
|
|
||||||
|
STORAGE_CACHE = {}
|
||||||
|
|
||||||
|
|
||||||
|
# Config Methods
|
||||||
|
def set_config(**kwargs):
|
||||||
|
group = kwargs.pop('group', None)
|
||||||
|
|
||||||
|
for k, v in kwargs.iteritems():
|
||||||
|
cfg.CONF.set_override(k, v, group)
|
||||||
|
|
||||||
|
|
||||||
|
class ConfFixture(fixtures.Fixture):
|
||||||
|
"""Fixture to manage global conf settings."""
|
||||||
|
|
||||||
|
def __init__(self, conf):
|
||||||
|
self.conf = conf
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(ConfFixture, self).setUp()
|
||||||
|
self.conf.set_default('host', 'fake-mini')
|
||||||
|
self.conf.set_default('fake_rabbit', True)
|
||||||
|
self.conf.set_default('rpc_backend',
|
||||||
|
'billingstack.openstack.common.rpc.impl_fake')
|
||||||
|
self.conf.set_default('rpc_cast_timeout', 5)
|
||||||
|
self.conf.set_default('rpc_response_timeout', 5)
|
||||||
|
self.conf.set_default('verbose', True)
|
||||||
|
self.addCleanup(self.conf.reset)
|
||||||
|
|
||||||
|
|
||||||
|
class FixtureHelper(object):
|
||||||
|
"""Underlying helper object for a StorageFixture to hold driver methods"""
|
||||||
|
|
||||||
|
def __init__(self, fixture):
|
||||||
|
"""
|
||||||
|
:param fixture: The fixture object
|
||||||
|
"""
|
||||||
|
self.fixture = fixture
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
"""Runs pr test, typically a db reset or similar"""
|
||||||
|
|
||||||
|
def pre_migrate(self):
|
||||||
|
"""Run before migrations"""
|
||||||
|
|
||||||
|
def migrate(self):
|
||||||
|
"""Migrate the storage"""
|
||||||
|
|
||||||
|
def post_migrate(self):
|
||||||
|
"""This is executed after migrations"""
|
||||||
|
|
||||||
|
def post_init(self):
|
||||||
|
"""Runs at the end of the object initialization"""
|
||||||
|
|
||||||
|
|
||||||
|
class SQLAlchemyHelper(FixtureHelper):
|
||||||
|
def __init__(self, fixture):
|
||||||
|
super(SQLAlchemyHelper, self).__init__(fixture)
|
||||||
|
|
||||||
|
self.sqlite_db = fixture.kw.get('sqlite_db')
|
||||||
|
self.sqlite_clean_db = fixture.kw.get('sqlite_clean_db')
|
||||||
|
self.testdb = None
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
if self.fixture.database_connection == "sqlite://":
|
||||||
|
conn = self.fixture.connection.engine.connect()
|
||||||
|
conn.connection.executescript(self._as_string)
|
||||||
|
self.fixture.addCleanup(self.fixture.connection.engine.dispose)
|
||||||
|
else:
|
||||||
|
shutil.copyfile(paths.state_path_rel(self.sqlite_clean_db),
|
||||||
|
paths.state_path_rel(self.sqlite_db))
|
||||||
|
|
||||||
|
def pre_migrate(self):
|
||||||
|
self.fixture.connection.engine.dispose()
|
||||||
|
self.fixture.connection.engine.connect()
|
||||||
|
if self.fixture.database_connection == "sqlite://":
|
||||||
|
#https://github.com/openstack/nova/blob/master/nova/test.py#L82-L84
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
testdb = paths.state_path_rel(self.sqlite_db)
|
||||||
|
if os.path.exists(testdb):
|
||||||
|
return
|
||||||
|
|
||||||
|
def migrate(self):
|
||||||
|
self.fixture.connection.setup_schema()
|
||||||
|
|
||||||
|
def post_init(self):
|
||||||
|
if self.fixture.database_connection == "sqlite://":
|
||||||
|
conn = self.fixture.connection.engine.connect()
|
||||||
|
self._as_string = "".join(
|
||||||
|
l for l in conn.connection.iterdump())
|
||||||
|
self.fixture.connection.engine.dispose()
|
||||||
|
else:
|
||||||
|
cleandb = paths.state_path_rel(self.sqlite_clean_db)
|
||||||
|
shutil.copyfile(self.testdb, cleandb)
|
||||||
|
|
||||||
|
|
||||||
|
class StorageFixture(fixtures.Fixture):
|
||||||
|
"""
|
||||||
|
Storage fixture that for now just supports SQLAlchemy
|
||||||
|
"""
|
||||||
|
def __init__(self, svc, **kw):
|
||||||
|
self.svc = svc
|
||||||
|
self.kw = kw
|
||||||
|
|
||||||
|
self.driver = kw.get('storage_driver', 'sqlalchemy')
|
||||||
|
self.database_connection = kw.get('database_connection', 'sqlite://')
|
||||||
|
|
||||||
|
self.svc_group = 'service:%s' % self.svc
|
||||||
|
self.driver_group = '%s:%s' % (self.svc, self.driver)
|
||||||
|
|
||||||
|
cfg.CONF.import_opt('storage_driver', 'billingstack.%s' % self.svc,
|
||||||
|
group=self.svc_group)
|
||||||
|
set_config(storage_driver=self.driver, group=self.svc_group)
|
||||||
|
|
||||||
|
# FIXME: Move this to a generic get_storage() method instead?
|
||||||
|
self.module = importutils.import_module(
|
||||||
|
'billingstack.%s.storage' % self.svc)
|
||||||
|
|
||||||
|
# FIXME: Workout a way to support the different storage types
|
||||||
|
self.helper = SQLAlchemyHelper(self)
|
||||||
|
|
||||||
|
cfg.CONF.import_opt(
|
||||||
|
'database_connection',
|
||||||
|
'billingstack.%s.storage.impl_%s' % (self.svc, self.driver),
|
||||||
|
group=self.driver_group)
|
||||||
|
|
||||||
|
set_config(database_connection=self.database_connection,
|
||||||
|
group=self.driver_group)
|
||||||
|
|
||||||
|
self.connection = self.get_storage_connection(**kw)
|
||||||
|
|
||||||
|
self.helper.pre_migrate()
|
||||||
|
self.helper.migrate()
|
||||||
|
self.helper.post_migrate()
|
||||||
|
self.helper.post_init()
|
||||||
|
|
||||||
|
for hook in kw.get('hooks', []):
|
||||||
|
hook()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(StorageFixture, self).setUp()
|
||||||
|
self.helper.setUp()
|
||||||
|
|
||||||
|
def get_storage_connection(self, **kw):
|
||||||
|
"""
|
||||||
|
Import the storage module for the service that we are going to act on,
|
||||||
|
then return a connection object for that storage module.
|
||||||
|
|
||||||
|
:param service: The service.
|
||||||
|
"""
|
||||||
|
engine = self.module.get_engine(self.driver)
|
||||||
|
return engine.get_connection()
|
||||||
|
|
||||||
|
|
||||||
|
class ServiceFixture(fixtures.Fixture):
|
||||||
|
"""Run service as a test fixture, semi-copied from Nova"""
|
||||||
|
|
||||||
|
def __init__(self, name, host=None, **kwargs):
|
||||||
|
host = host and host or uuid.uuid4().hex
|
||||||
|
kwargs.setdefault('host', host)
|
||||||
|
kwargs.setdefault('binary', 'billingstack-%s' % name)
|
||||||
|
self.name = name
|
||||||
|
self.kwargs = kwargs
|
||||||
|
|
||||||
|
self.cls = self.get_service(self.name)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_service(svc):
|
||||||
|
"""
|
||||||
|
Return a service
|
||||||
|
|
||||||
|
:param service: The service.
|
||||||
|
"""
|
||||||
|
return importutils.import_class('billingstack.%s.service.Service' %
|
||||||
|
svc)
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(ServiceFixture, self).setUp()
|
||||||
|
self.service = self.cls()
|
||||||
|
self.service.start()
|
||||||
|
|
||||||
|
|
||||||
|
class MoxStubout(fixtures.Fixture):
|
||||||
|
"""Deal with code around mox and stubout as a fixture."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(MoxStubout, self).setUp()
|
||||||
|
# emulate some of the mox stuff, we can't use the metaclass
|
||||||
|
# because it screws with our generators
|
||||||
|
self.mox = mox.Mox()
|
||||||
|
self.stubs = stubout.StubOutForTesting()
|
||||||
|
self.addCleanup(self.stubs.UnsetAll)
|
||||||
|
self.addCleanup(self.stubs.SmartUnsetAll)
|
||||||
|
self.addCleanup(self.mox.UnsetStubs)
|
||||||
|
self.addCleanup(self.mox.VerifyAll)
|
||||||
|
|
||||||
|
|
||||||
class AssertMixin(object):
|
class AssertMixin(object):
|
||||||
"""
|
"""
|
||||||
Mixin to hold assert helpers.
|
Mixin to hold assert helpers.
|
||||||
@ -43,35 +252,56 @@ class AssertMixin(object):
|
|||||||
|
|
||||||
def assertDuplicate(self, func, *args, **kw):
|
def assertDuplicate(self, func, *args, **kw):
|
||||||
exception = kw.pop('exception', exceptions.Duplicate)
|
exception = kw.pop('exception', exceptions.Duplicate)
|
||||||
with self.assertRaises(exception):
|
with testtools.ExpectedException(exception):
|
||||||
func(*args, **kw)
|
func(*args, **kw)
|
||||||
|
|
||||||
def assertMissing(self, func, *args, **kw):
|
def assertMissing(self, func, *args, **kw):
|
||||||
exception = kw.pop('exception', exceptions.NotFound)
|
exception = kw.pop('exception', exceptions.NotFound)
|
||||||
with self.assertRaises(exception):
|
with testtools.ExpectedException(exception):
|
||||||
func(*args, **kw)
|
func(*args, **kw)
|
||||||
|
|
||||||
|
|
||||||
class BaseTestCase(unittest2.TestCase, AssertMixin):
|
class BaseTestCase(testtools.TestCase, AssertMixin):
|
||||||
"""
|
"""
|
||||||
A base test class.
|
A base test class to be used for typically non-service kind of things.
|
||||||
"""
|
"""
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(BaseTestCase, self).setUp()
|
super(BaseTestCase, self).setUp()
|
||||||
self.mox = mox.Mox()
|
|
||||||
|
|
||||||
def tearDown(self):
|
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
|
||||||
cfg.CONF.reset()
|
try:
|
||||||
self.mox.UnsetStubs()
|
test_timeout = int(test_timeout)
|
||||||
self.mox.VerifyAll()
|
except ValueError:
|
||||||
super(BaseTestCase, self).tearDown()
|
# If timeout value is invalid do not set a timeout.
|
||||||
|
test_timeout = 0
|
||||||
|
if test_timeout > 0:
|
||||||
|
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
|
||||||
|
|
||||||
# Config Methods
|
if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
|
||||||
def config(self, **kwargs):
|
os.environ.get('OS_STDOUT_CAPTURE') == '1'):
|
||||||
group = kwargs.pop('group', None)
|
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
|
||||||
|
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
|
||||||
|
if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
|
||||||
|
os.environ.get('OS_STDERR_CAPTURE') == '1'):
|
||||||
|
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
|
||||||
|
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
|
||||||
|
|
||||||
for k, v in kwargs.iteritems():
|
self.log_fixture = self.useFixture(fixtures.FakeLogger())
|
||||||
cfg.CONF.set_override(k, v, group)
|
self.useFixture(ConfFixture(cfg.CONF))
|
||||||
|
|
||||||
|
mox_fixture = self.useFixture(MoxStubout())
|
||||||
|
self.mox = mox_fixture
|
||||||
|
self.stubs = mox_fixture.stubs
|
||||||
|
self.addCleanup(self._clear_attrs)
|
||||||
|
self.useFixture(fixtures.EnvironmentVariable('http_proxy'))
|
||||||
|
#self.policy = self.useFixture(policy_fixture.PolicyFixture())
|
||||||
|
|
||||||
|
def _clear_attrs(self):
|
||||||
|
# Delete attributes that don't start with _ so they don't pin
|
||||||
|
# memory around unnecessarily for the duration of the test
|
||||||
|
# suite
|
||||||
|
for key in [k for k in self.__dict__.keys() if k[0] != '_']:
|
||||||
|
del self.__dict__[key]
|
||||||
|
|
||||||
def get_fixture(self, name, fixture=0, values={}):
|
def get_fixture(self, name, fixture=0, values={}):
|
||||||
"""
|
"""
|
||||||
@ -81,12 +311,6 @@ class BaseTestCase(unittest2.TestCase, AssertMixin):
|
|||||||
_values.update(values)
|
_values.update(values)
|
||||||
return _values
|
return _values
|
||||||
|
|
||||||
def get_admin_context(self):
|
|
||||||
return get_admin_context()
|
|
||||||
|
|
||||||
def get_context(self, **kw):
|
|
||||||
return RequestContext(**kw)
|
|
||||||
|
|
||||||
|
|
||||||
class Services(dict):
|
class Services(dict):
|
||||||
def __getattr__(self, name):
|
def __getattr__(self, name):
|
||||||
@ -99,60 +323,34 @@ class Services(dict):
|
|||||||
|
|
||||||
|
|
||||||
class TestCase(BaseTestCase):
|
class TestCase(BaseTestCase):
|
||||||
|
"""Base test case for services etc"""
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestCase, self).setUp()
|
super(TestCase, self).setUp()
|
||||||
|
|
||||||
self.samples = samples.get_samples()
|
self.samples = samples.get_samples()
|
||||||
self.admin_ctxt = self.get_admin_context()
|
self.admin_ctxt = self.get_admin_context()
|
||||||
|
|
||||||
self.config(rpc_backend='billingstack.openstack.common.rpc.impl_fake')
|
|
||||||
|
|
||||||
# NOTE: No services up by default
|
# NOTE: No services up by default
|
||||||
self.services = Services()
|
self.services = Services()
|
||||||
|
|
||||||
def tearDown(self):
|
def get_admin_context(self):
|
||||||
# NOTE: Currently disabled
|
return get_admin_context()
|
||||||
for svc in self.services.values():
|
|
||||||
svc.storage_conn.teardown_schema()
|
|
||||||
super(TestCase, self).tearDown()
|
|
||||||
|
|
||||||
def get_storage_connection(self, service='central', **kw):
|
def get_context(self, **kw):
|
||||||
"""
|
return RequestContext(**kw)
|
||||||
Import the storage module for the service that we are going to act on,
|
|
||||||
then return a connection object for that storage module.
|
|
||||||
|
|
||||||
:param service: The service.
|
def start_service(self, name, host=None, **kwargs):
|
||||||
"""
|
fixture = self.useFixture(ServiceFixture(name, host, **kwargs))
|
||||||
storage = importutils.import_module('billingstack.%s.storage' %
|
self.services[name] = fixture.service
|
||||||
service)
|
return fixture
|
||||||
|
|
||||||
driver = kw.get('storage_driver', 'sqlalchemy')
|
def start_storage(self, name, **kw):
|
||||||
engine = storage.get_engine(driver)
|
fixture = StorageFixture(name, **kw)
|
||||||
|
global STORAGE_CACHE
|
||||||
self.config(storage_driver=driver, group='service:%s' % service)
|
if not name in STORAGE_CACHE:
|
||||||
|
STORAGE_CACHE[name] = fixture
|
||||||
db = kw.get('database_connection', 'sqlite://')
|
self.useFixture(STORAGE_CACHE[name])
|
||||||
self.config(database_connection=db, group='%s:%s' % (service, driver))
|
return fixture
|
||||||
|
|
||||||
connection = engine.get_connection()
|
|
||||||
|
|
||||||
return connection
|
|
||||||
|
|
||||||
def get_service(self, service='central'):
|
|
||||||
"""
|
|
||||||
Return a service
|
|
||||||
|
|
||||||
:param service: The service.
|
|
||||||
"""
|
|
||||||
svc = importutils.import_class('billingstack.%s.service.Service' %
|
|
||||||
service)
|
|
||||||
return svc()
|
|
||||||
|
|
||||||
def start_service(self, service='central'):
|
|
||||||
svc = self.get_service(service=service)
|
|
||||||
|
|
||||||
svc.start()
|
|
||||||
self.services[service] = svc
|
|
||||||
|
|
||||||
def setSamples(self):
|
def setSamples(self):
|
||||||
_, self.currency = self.create_currency()
|
_, self.currency = self.create_currency()
|
||||||
@ -203,6 +401,7 @@ class TestCase(BaseTestCase):
|
|||||||
|
|
||||||
|
|
||||||
class ServiceTestCase(TestCase):
|
class ServiceTestCase(TestCase):
|
||||||
|
"""Testcase with some base methods when running in Service ish mode"""
|
||||||
def create_language(self, fixture=0, values={}, **kw):
|
def create_language(self, fixture=0, values={}, **kw):
|
||||||
fixture = self.get_fixture('language', fixture, values)
|
fixture = self.get_fixture('language', fixture, values)
|
||||||
ctxt = kw.pop('context', self.admin_ctxt)
|
ctxt = kw.pop('context', self.admin_ctxt)
|
||||||
@ -277,15 +476,3 @@ class ServiceTestCase(TestCase):
|
|||||||
ctxt = kw.pop('context', self.admin_ctxt)
|
ctxt = kw.pop('context', self.admin_ctxt)
|
||||||
return fixture, self.services.central.create_plan(
|
return fixture, self.services.central.create_plan(
|
||||||
ctxt, merchant_id, fixture, **kw)
|
ctxt, merchant_id, fixture, **kw)
|
||||||
|
|
||||||
|
|
||||||
class StorageTestCase(TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
super(StorageTestCase, self).setUp()
|
|
||||||
self.storage_conn = self.get_storage_connection()
|
|
||||||
self.storage_conn.setup_schema()
|
|
||||||
self.setSamples()
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
self.storage_conn.teardown_schema()
|
|
||||||
super(StorageTestCase, self).tearDown()
|
|
||||||
|
@ -1,8 +0,0 @@
|
|||||||
from oslo.config import cfg
|
|
||||||
|
|
||||||
|
|
||||||
cfg.CONF.import_opt('storage_driver', 'billingstack.central',
|
|
||||||
group='service:central')
|
|
||||||
cfg.CONF.import_opt('database_connection',
|
|
||||||
'billingstack.central.storage.impl_sqlalchemy',
|
|
||||||
group='central:sqlalchemy')
|
|
@ -15,7 +15,6 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
from billingstack.openstack.common import log as logging
|
from billingstack.openstack.common import log as logging
|
||||||
from billingstack.central.storage.impl_sqlalchemy import models
|
from billingstack.central.storage.impl_sqlalchemy import models
|
||||||
from billingstack.tests.base import StorageTestCase
|
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -24,9 +23,7 @@ LOG = logging.getLogger(__name__)
|
|||||||
UUID = 'caf771fc-6b05-4891-bee1-c2a48621f57b'
|
UUID = 'caf771fc-6b05-4891-bee1-c2a48621f57b'
|
||||||
|
|
||||||
|
|
||||||
class StorageDriverTestCase(StorageTestCase):
|
class DriverMixin(object):
|
||||||
__test__ = False
|
|
||||||
|
|
||||||
def create_language(self, fixture=0, values={}, **kw):
|
def create_language(self, fixture=0, values={}, **kw):
|
||||||
fixture = self.get_fixture('language', fixture, values)
|
fixture = self.get_fixture('language', fixture, values)
|
||||||
ctxt = kw.pop('context', self.admin_ctxt)
|
ctxt = kw.pop('context', self.admin_ctxt)
|
||||||
|
@ -16,15 +16,15 @@
|
|||||||
#
|
#
|
||||||
# Copied: billingstack
|
# Copied: billingstack
|
||||||
from billingstack.openstack.common import log as logging
|
from billingstack.openstack.common import log as logging
|
||||||
from billingstack.tests.central.storage.base import StorageDriverTestCase
|
from billingstack.tests.base import TestCase
|
||||||
|
from billingstack.tests.central.storage.base import DriverMixin
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class SqlalchemyStorageTest(StorageDriverTestCase):
|
class SqlalchemyStorageTest(DriverMixin, TestCase):
|
||||||
__test__ = True
|
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.config(database_connection='sqlite://',
|
|
||||||
group='central:sqlalchemy')
|
|
||||||
super(SqlalchemyStorageTest, self).setUp()
|
super(SqlalchemyStorageTest, self).setUp()
|
||||||
|
fixture = self.start_storage('central')
|
||||||
|
self.storage_conn = fixture.connection
|
||||||
|
self.setSamples()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user