Restore the check_foreign_keys() method.

This method was prematurely removed from oslo.db without
a deprecation phase, when Alembic added support for
foreign key autodetection.   oslo.db continues to use
alembic for this purpose, however the
ModelsMigrationsSync.check_foreign_keys() method is restored
directly for those projects which were calling into it
explicitly.

Change-Id: Id892567bd60d6b4b88765bbfe3cd5c5e75910b25
This commit is contained in:
Mike Bayer 2015-01-12 17:26:23 -05:00
parent 98cdceb1ec
commit b1fc55c7ce
2 changed files with 259 additions and 0 deletions

View File

@ -15,6 +15,7 @@
# under the License.
import abc
import collections
import logging
import pprint
@ -510,6 +511,77 @@ class ModelsMigrationsSync(object):
for table in tbs:
conn.execute(schema.DropTable(table))
FKInfo = collections.namedtuple('fk_info', ['constrained_columns',
'referred_table',
'referred_columns'])
def check_foreign_keys(self, metadata, bind):
"""Compare foreign keys between model and db table.
:returns: a list that contains information about:
* should be a new key added or removed existing,
* name of that key,
* source table,
* referred table,
* constrained columns,
* referred columns
Output::
[('drop_key',
'testtbl_fk_check_fkey',
'testtbl',
fk_info(constrained_columns=(u'fk_check',),
referred_table=u'table',
referred_columns=(u'fk_check',)))]
DEPRECATED: this function is deprecated and will be removed from
oslo.db in a few releases. Alembic autogenerate.compare_metadata()
now includes foreign key comparison directly.
"""
diff = []
insp = sqlalchemy.engine.reflection.Inspector.from_engine(bind)
# Get all tables from db
db_tables = insp.get_table_names()
# Get all tables from models
model_tables = metadata.tables
for table in db_tables:
if table not in model_tables:
continue
# Get all necessary information about key of current table from db
fk_db = dict((self._get_fk_info_from_db(i), i['name'])
for i in insp.get_foreign_keys(table))
fk_db_set = set(fk_db.keys())
# Get all necessary information about key of current table from
# models
fk_models = dict((self._get_fk_info_from_model(fk), fk)
for fk in model_tables[table].foreign_keys)
fk_models_set = set(fk_models.keys())
for key in (fk_db_set - fk_models_set):
diff.append(('drop_key', fk_db[key], table, key))
LOG.info(("Detected removed foreign key %(fk)r on "
"table %(table)r"), {'fk': fk_db[key],
'table': table})
for key in (fk_models_set - fk_db_set):
diff.append(('add_key', fk_models[key], table, key))
LOG.info((
"Detected added foreign key for column %(fk)r on table "
"%(table)r"), {'fk': fk_models[key].column.name,
'table': table})
return diff
def _get_fk_info_from_db(self, fk):
return self.FKInfo(tuple(fk['constrained_columns']),
fk['referred_table'],
tuple(fk['referred_columns']))
def _get_fk_info_from_model(self, fk):
return self.FKInfo((fk.parent.name,), fk.column.table.name,
(fk.column.name,))
def test_models_sync(self):
# recent versions of sqlalchemy and alembic are needed for running of
# this test, but we already have them in requirements

View File

@ -307,3 +307,190 @@ class ModelsMigrationsSyncPsql(ModelsMigrationSyncMixin,
def test_models_not_sync(self):
self._test_models_not_sync()
class TestOldCheckForeignKeys(test_base.DbTestCase):
def setUp(self):
super(TestOldCheckForeignKeys, self).setUp()
test = self
class MigrateSync(migrate.ModelsMigrationsSync):
def get_engine(self):
return test.engine
def get_metadata(self):
return test.metadata
def db_sync(self):
raise NotImplementedError()
self.migration_sync = MigrateSync()
def _fk_added_fixture(self):
self.metadata = sa.MetaData()
self.metadata_migrations = sa.MetaData()
sa.Table(
'testtbl_one', self.metadata,
sa.Column('id', sa.Integer, primary_key=True),
mysql_engine='InnoDB'
)
sa.Table(
'testtbl_two', self.metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tone_id', sa.Integer),
mysql_engine='InnoDB'
)
sa.Table(
'testtbl_one', self.metadata_migrations,
sa.Column('id', sa.Integer, primary_key=True),
mysql_engine='InnoDB'
)
sa.Table(
'testtbl_two', self.metadata_migrations,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column(
'tone_id', sa.Integer,
sa.ForeignKey('testtbl_one.id', name="tone_id_fk")),
mysql_engine='InnoDB'
)
def _fk_removed_fixture(self):
self.metadata = sa.MetaData()
self.metadata_migrations = sa.MetaData()
sa.Table(
'testtbl_one', self.metadata,
sa.Column('id', sa.Integer, primary_key=True),
mysql_engine='InnoDB'
)
sa.Table(
'testtbl_two', self.metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column(
'tone_id', sa.Integer,
sa.ForeignKey('testtbl_one.id', name="tone_id_fk")),
mysql_engine='InnoDB'
)
sa.Table(
'testtbl_one', self.metadata_migrations,
sa.Column('id', sa.Integer, primary_key=True),
mysql_engine='InnoDB'
)
sa.Table(
'testtbl_two', self.metadata_migrations,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tone_id', sa.Integer),
mysql_engine='InnoDB'
)
def _fk_no_change_fixture(self):
self.metadata = sa.MetaData()
self.metadata_migrations = sa.MetaData()
sa.Table(
'testtbl_one', self.metadata,
sa.Column('id', sa.Integer, primary_key=True),
mysql_engine='InnoDB'
)
sa.Table(
'testtbl_two', self.metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column(
'tone_id', sa.Integer,
sa.ForeignKey('testtbl_one.id', name="tone_id_fk")),
mysql_engine='InnoDB'
)
sa.Table(
'testtbl_one', self.metadata_migrations,
sa.Column('id', sa.Integer, primary_key=True),
mysql_engine='InnoDB'
)
sa.Table(
'testtbl_two', self.metadata_migrations,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column(
'tone_id', sa.Integer,
sa.ForeignKey('testtbl_one.id', name="tone_id_fk")),
mysql_engine='InnoDB'
)
def _run_test(self):
self.metadata.create_all(bind=self.engine)
return self.migration_sync.check_foreign_keys(
self.metadata_migrations, self.engine)
def _compare_diffs(self, diffs, compare_to):
diffs = [
(
cmd,
fk._get_colspec() if isinstance(fk, sa.ForeignKey)
else "tone_id_fk" if fk is None # sqlite workaround
else fk,
tname, fk_info
)
for cmd, fk, tname, fk_info in diffs
]
self.assertEqual(diffs, compare_to)
def test_fk_added(self):
self._fk_added_fixture()
diffs = self._run_test()
self._compare_diffs(
diffs,
[(
'add_key',
'testtbl_one.id',
'testtbl_two',
self.migration_sync.FKInfo(
constrained_columns=('tone_id',),
referred_table='testtbl_one',
referred_columns=('id',))
)]
)
def test_fk_removed(self):
self._fk_removed_fixture()
diffs = self._run_test()
self._compare_diffs(
diffs,
[(
'drop_key',
"tone_id_fk",
'testtbl_two',
self.migration_sync.FKInfo(
constrained_columns=('tone_id',),
referred_table='testtbl_one',
referred_columns=('id',))
)]
)
def test_fk_no_change(self):
self._fk_no_change_fixture()
diffs = self._run_test()
self._compare_diffs(
diffs,
[])
class PGTestOldCheckForeignKeys(
TestOldCheckForeignKeys, test_base.PostgreSQLOpportunisticTestCase):
pass
class MySQLTestOldCheckForeignKeys(
TestOldCheckForeignKeys, test_base.MySQLOpportunisticTestCase):
pass