Cleanup of the model test code and model instancing. Moved the database setup and teardown into package-level fixtures.

TODO: Make all tests use package-level DBs for testing.
This commit is contained in:
Aurynn Shaw 2014-02-26 14:14:15 +13:00
parent 0f1004d527
commit 4c5bdc1507
3 changed files with 137 additions and 56 deletions

View File

@ -48,7 +48,7 @@ class UsageEntry(Base):
__table_args__ = ( ForeignKeyConstraint( __table_args__ = ( ForeignKeyConstraint(
["resource_id", "tenant_id"], ["resource_id", "tenant_id"],
["resources.id", "resources.tenant_id"], ["resources.id", "resources.tenant_id"],
name="fk_resource", use_alter=True name="fk_resource_constraint"
), ) ), )
@hybrid_property @hybrid_property
@ -72,21 +72,20 @@ class Tenant(Base):
resources = relationship(Resource, backref="tenant") resources = relationship(Resource, backref="tenant")
# usages = relationship(UsageEntry, backref="tenant", primaryjoin=(id == UsageEntry.tenant_id)) # usages = relationship(UsageEntry, backref="tenant", primaryjoin=(id == UsageEntry.tenant_id))
# Some reference data to something else? # Some reference data to something else?
#
# this might not be a needed model? # this might not be a needed model?
class SalesOrder(Base): class SalesOrder(Base):
"""Historic billing periods so that tenants cannot be rebuild accidentally.""" """Historic billing periods so that tenants cannot be rebuild accidentally."""
__tablename__ = 'sales_orders' __tablename__ = 'sales_orders'
tenant_id = Column(String(100), primary_key=True) tenant_id = Column(
resource_id = Column(String(100), primary_key=True) String(100),
ForeignKey("tenants.id"),
primary_key=True )
start = Column(DateTime, nullable=False) start = Column(DateTime, nullable=False)
end = Column(DateTime, nullable=False) end = Column(DateTime, nullable=False)
resource = relationship(Resource, tenant = relationship("Tenant")
primaryjoin=(resource_id == Resource.id))
tenant = relationship(Resource,
primaryjoin=(tenant_id == Resource.tenant_id))
@hybrid_property @hybrid_property
def length(self): def length(self):
@ -96,18 +95,13 @@ class SalesOrder(Base):
def intersects(self, other): def intersects(self, other):
return ( self.start <= other.end and other.start <= self.end ) return ( self.start <= other.end and other.start <= self.end )
__table_args__ = ( ForeignKeyConstraint(
["resource_id", "tenant_id"],
["resources.id", "resources.tenant_id"],
name="fk_sales", use_alter=True
), )
# Create a trigger in MySQL that enforces our range overlap constraints, # Create a trigger in MySQL that enforces our range overlap constraints,
# since MySQL lacks a native range overlap type. # since MySQL lacks a native range overlap type.
# Mysql trigger: # Mysql trigger:
mysql_trigger = """
mysql_table_triggers = {
UsageEntry.__table__:"""
CREATE TRIGGER %(table)s_%(funcname)s_range_constraint CREATE TRIGGER %(table)s_%(funcname)s_range_constraint
BEFORE %(type)s ON `%(table)s` BEFORE %(type)s ON `%(table)s`
FOR EACH ROW FOR EACH ROW
@ -116,15 +110,31 @@ mysql_trigger = """
SET existing = ( SELECT COUNT(*) FROM `%(table)s` t SET existing = ( SELECT COUNT(*) FROM `%(table)s` t
WHERE ( NEW.start <= t.end WHERE ( NEW.start <= t.end
AND t.start <= NEW.end ) AND t.start <= NEW.end )
AND service = NEW.service
AND tenant_id = NEW.tenant_id AND tenant_id = NEW.tenant_id
AND resource_id = NEW.resource_id ); AND resource_id = NEW.resource_id );
IF existing > 0 THEN IF existing > 0 THEN
SET NEW.start = NULL; SET NEW.start = NULL;
SET NEW.end = NULL; SET NEW.end = NULL;
END IF; END IF;
END;""",
SalesOrder.__table__:"""
CREATE TRIGGER %(table)s_%(funcname)s_range_constraint
BEFORE %(type)s ON `%(table)s`
FOR EACH ROW
BEGIN
DECLARE existing INT;
SET existing = ( SELECT COUNT(*) FROM `%(table)s` t
WHERE ( NEW.start <= t.end
AND t.start <= NEW.end )
AND tenant_id = NEW.tenant_id );
IF existing > 0 THEN
SET NEW.start = NULL;
SET NEW.end = NULL;
END IF;
END; END;
""" """
}
# before insert # before insert
@ -134,7 +144,7 @@ for table in (SalesOrder.__table__, UsageEntry.__table__):
event.listen( event.listen(
table, table,
"after_create", "after_create",
DDL(mysql_trigger % { DDL(mysql_table_triggers[table] % {
"table": table, "table": table,
"type": type_, "type": type_,
"funcname": funcmaps[type_]}).\ "funcname": funcmaps[type_]}).\
@ -146,7 +156,8 @@ for table in (SalesOrder.__table__, UsageEntry.__table__):
# This is currently not feasible because I can't find a way to emit different # This is currently not feasible because I can't find a way to emit different
# DDL for MySQL and Postgres to support the varying concepts (single vs. dual columns). # DDL for MySQL and Postgres to support the varying concepts (single vs. dual columns).
pgsql_trigger_func = """ pgsql_trigger_funcs = {
UsageEntry.__table__:"""
CREATE FUNCTION %(table)s_exclusion_constraint_trigger() RETURNS trigger AS $trigger$ CREATE FUNCTION %(table)s_exclusion_constraint_trigger() RETURNS trigger AS $trigger$
DECLARE DECLARE
existing INTEGER = 0; existing INTEGER = 0;
@ -163,43 +174,80 @@ CREATE FUNCTION %(table)s_exclusion_constraint_trigger() RETURNS trigger AS $tri
END IF; END IF;
RETURN NEW; RETURN NEW;
END; END;
$trigger$ LANGUAGE PLPGSQL; $trigger$ LANGUAGE PLPGSQL;""",
""" SalesOrder.__table__:"""
CREATE FUNCTION %(table)s_exclusion_constraint_trigger() RETURNS trigger AS $trigger$
DECLARE
existing INTEGER = 0;
BEGIN
SELECT count(*) INTO existing FROM %(table)s t
WHERE t.tenant_id = NEW.tenant_id
AND ( NEW.start <= t."end"
AND t.start <= NEW."end" );
IF existing > 0 THEN
RAISE SQLSTATE '23P01';
RETURN NULL;
END IF;
RETURN NEW;
END;
$trigger$ LANGUAGE PLPGSQL;"""
}
pgsql_trigger = """ pgsql_trigger = """
CREATE TRIGGER %(table)s_exclusion_trigger BEFORE INSERT OR UPDATE ON %(table)s CREATE TRIGGER %(table)s_exclusion_trigger BEFORE INSERT OR UPDATE ON %(table)s
FOR EACH ROW EXECUTE PROCEDURE %(table)s_exclusion_constraint_trigger(); FOR EACH ROW EXECUTE PROCEDURE %(table)s_exclusion_constraint_trigger();
""" """
event.listen( for table in (UsageEntry.__table__, SalesOrder.__table__):
UsageEntry.__table__, event.listen(
table,
"after_create", "after_create",
DDL(pgsql_trigger_func % {"table": UsageEntry.__tablename__}).execute_if(dialect="postgresql") DDL(pgsql_trigger_funcs[table] % {
) "table": table
event.listen( }).execute_if(dialect="postgresql")
UsageEntry.__table__, )
event.listen(
table,
"after_create", "after_create",
DDL(pgsql_trigger % {"table": UsageEntry.__tablename__}).execute_if(dialect="postgresql") DDL(pgsql_trigger % {
) "table": table
}
).execute_if(dialect="postgresql")
)
# event.listen( # Create the PGSQL secondary trigger for sales order overlaps, for
# SalesOrder.__table__, # the usage entry
# "after_create",
# DDL(pgsql_trigger_func % {"table": SalesOrder.__tablename__}).execute_if(dialect="postgresql")
# )
# event.listen(
# SalesOrder.__table__, pgsql_secondary_trigger = """
# "after_create", CREATE TRIGGER %(table)s_secondary_exclusion_trigger BEFORE INSERT OR UPDATE ON %(table)s
# DDL(pgsql_trigger % {"table": SalesOrder.__tablename__}).\ FOR EACH ROW EXECUTE PROCEDURE %(secondary_table)s_exclusion_constraint_trigger();
# execute_if(dialect="postgresql") """
# )
event.listen(
UsageEntry.__table__,
"after_create",
DDL(pgsql_secondary_trigger % {
"table": UsageEntry.__table__,
"secondary_table": SalesOrder.__table__
}).execute_if(dialect="postgresql")
)
event.listen(
UsageEntry.__table__,
"before_drop",
DDL("""DROP TRIGGER %(table)s_secondary_exclusion_trigger ON %(table)s""" % {
"table": UsageEntry.__table__,
"secondary_table": SalesOrder.__table__
}).execute_if(dialect="postgresql")
)
event.listen( event.listen(
UsageEntry.__table__, UsageEntry.__table__,
"before_drop", "before_drop",
DDL("DROP TRIGGER %s_exclusion_trigger" % UsageEntry.__tablename__).\ DDL("DROP TRIGGER %(table)s_exclusion_trigger ON %(table)s" % {
"table": UsageEntry.__tablename__ }).\
execute_if(dialect="postgresql") execute_if(dialect="postgresql")
) )
@ -213,7 +261,8 @@ event.listen(
event.listen( event.listen(
UsageEntry.__table__, UsageEntry.__table__,
"before_drop", "before_drop",
DDL("DROP TRIGGER %s_exclusion_trigger()" % SalesOrder.__tablename__ ).\ DDL("DROP TRIGGER %(table)s_exclusion_trigger ON %(table)s" % {
"table": SalesOrder.__tablename__} ).\
execute_if(dialect="postgresql") execute_if(dialect="postgresql")
) )

View File

@ -0,0 +1,37 @@
import subprocess
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session,create_session
from sqlalchemy.pool import NullPool
from artifice.models import Resource, Tenant, UsageEntry, SalesOrder, Base
DATABASE_NAME = "test_artifice"
PG_DATABASE_URI = "postgresql://aurynn:postgres@localhost/%s" % DATABASE_NAME
MY_DATABASE_URI = "mysql://root:password@localhost/%s" % DATABASE_NAME
def setUp():
subprocess.call(["/usr/bin/createdb","%s" % DATABASE_NAME])
subprocess.call(["mysql", "-u", "root","--password=password", "-e", "CREATE DATABASE %s" % DATABASE_NAME])
mysql_engine = create_engine(MY_DATABASE_URI, poolclass=NullPool)
pg_engine = create_engine(PG_DATABASE_URI, poolclass=NullPool)
Base.metadata.create_all(bind=mysql_engine)
Base.metadata.create_all(bind=pg_engine)
mysql_engine.dispose()
pg_engine.dispose()
def tearDown():
mysql_engine = create_engine(MY_DATABASE_URI, poolclass=NullPool)
pg_engine = create_engine(PG_DATABASE_URI, poolclass=NullPool)
Base.metadata.drop_all(bind=mysql_engine)
Base.metadata.drop_all(bind=pg_engine)
mysql_engine.dispose()
pg_engine.dispose()
subprocess.call(["/usr/bin/dropdb","%s" % DATABASE_NAME])
subprocess.call(["mysql", "-u", "root", "--password=password", "-e", "DROP DATABASE %s" % DATABASE_NAME])

View File

@ -6,31 +6,27 @@ from artifice.models import Resource, Tenant, UsageEntry, SalesOrder, Base
import datetime import datetime
import subprocess import subprocess
DATABASE_NAME = "test_artifice" from . import DATABASE_NAME
pg_engine = None pg_engine = None
mysql_engine = None mysql_engine = None
PG_DATABASE_URI = "postgresql://aurynn:postgres@localhost/%s" % DATABASE_NAME PG_DATABASE_URI = "postgresql://aurynn:postgres@localhost/%s" % DATABASE_NAME
MY_DATABASE_URI = "mysql://root@localhost/%s" % DATABASE_NAME MY_DATABASE_URI = "mysql://root:password@localhost/%s" % DATABASE_NAME
def setUp(): def setUp():
subprocess.call(["/usr/bin/createdb","%s" % DATABASE_NAME]) # subprocess.call(["/usr/bin/createdb","%s" % DATABASE_NAME])
subprocess.call(["mysql", "-u", "root", "-e", "CREATE DATABASE %s" % DATABASE_NAME]) # subprocess.call(["mysql", "-u", "root","--password=password", "-e", "CREATE DATABASE %s" % DATABASE_NAME])
global mysql_engine global mysql_engine
mysql_engine = create_engine(MY_DATABASE_URI, poolclass=NullPool) mysql_engine = create_engine(MY_DATABASE_URI, poolclass=NullPool)
global pg_engine global pg_engine
pg_engine = create_engine(PG_DATABASE_URI, poolclass=NullPool) pg_engine = create_engine(PG_DATABASE_URI, poolclass=NullPool)
Base.metadata.create_all(bind=mysql_engine)
Base.metadata.create_all(bind=pg_engine)
def tearDown(): def tearDown():
pg_engine.dispose() pg_engine.dispose()
mysql_engine.dispose() mysql_engine.dispose()
# subprocess.call(["/usr/bin/dropdb","%s" % DATABASE_NAME])
# subprocess.call(["mysql", "-u", "root", "-e", "DROP DATABASE %s" % DATABASE_NAME])
class db(unittest.TestCase): class db(unittest.TestCase):
@ -105,15 +101,14 @@ class db(unittest.TestCase):
self.test_insert_usage_entry() self.test_insert_usage_entry()
self.db.begin() self.db.begin()
usage = self.db.query(UsageEntry)[0] usage = self.db.query(UsageEntry)[0]
so = SalesOrder(tenant = usage.tenant, tenant =self.db.query(Tenant).get("asfd")
resource = usage.resource, so = SalesOrder(tenant = tenant,
start = usage.start, start = usage.start,
end = usage.end) end = usage.end)
self.db.add(so) self.db.add(so)
self.db.commit() self.db.commit()
so2 = self.db.query(SalesOrder)[0] so2 = self.db.query(SalesOrder)[0]
self.assertTrue(so2.tenant.id == so.tenant.id) self.assertTrue(so2.tenant.id == so.tenant.id)
self.assertTrue(so2.resource.id == so.resource.id)
self.assertTrue(so2.start == so.start) self.assertTrue(so2.start == so.start)
self.assertTrue(so2.end == so.end) self.assertTrue(so2.end == so.end)
def test_overlap_sales_order_fails(self): def test_overlap_sales_order_fails(self):