- refactor migrate.changeset;

- visitors are refactored to be more unified
- constraint module is refactored, CheckConstraint is added
- documentation is partialy updated, dialect support table is added (unfinished)
- test_constraint was updated
NOTE: oracle and mysql were not tested, *may be broken*
This commit is contained in:
iElectric 2009-06-16 15:17:33 +00:00
parent cc82a1ad12
commit 7eafe744c2
15 changed files with 509 additions and 287 deletions

3
TODO
View File

@ -12,3 +12,6 @@ make_update_script_for_model:
- refactor test_shell to test_api and use TestScript for cmd line testing - refactor test_shell to test_api and use TestScript for cmd line testing
- controlledschema.drop() drops whole migrate table, maybe there are some other repositories bound to it! - controlledschema.drop() drops whole migrate table, maybe there are some other repositories bound to it!
- document sqlite hacks (unique index for pk constraint)
- document constraints usage, document all ways then can be used, document cascade,table,columns options

View File

@ -10,6 +10,7 @@ Module :mod:`ansisql <migrate.changeset.ansisql>`
.. automodule:: migrate.changeset.ansisql .. automodule:: migrate.changeset.ansisql
:members: :members:
:member-order: groupwise
:synopsis: Standard SQL implementation for altering database schemas :synopsis: Standard SQL implementation for altering database schemas
Module :mod:`constraint <migrate.changeset.constraint>` Module :mod:`constraint <migrate.changeset.constraint>`
@ -17,6 +18,8 @@ Module :mod:`constraint <migrate.changeset.constraint>`
.. automodule:: migrate.changeset.constraint .. automodule:: migrate.changeset.constraint
:members: :members:
:show-inheritance:
:member-order: groupwise
:synopsis: Standalone schema constraint objects :synopsis: Standalone schema constraint objects
Module :mod:`databases <migrate.changeset.databases>` Module :mod:`databases <migrate.changeset.databases>`
@ -26,20 +29,28 @@ Module :mod:`databases <migrate.changeset.databases>`
:members: :members:
:synopsis: Database specific changeset implementations :synopsis: Database specific changeset implementations
.. _mysql-d:
Module :mod:`mysql <migrate.changeset.databases.mysql>` Module :mod:`mysql <migrate.changeset.databases.mysql>`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automodule:: migrate.changeset.databases.mysql .. automodule:: migrate.changeset.databases.mysql
:members: :members:
:synopsis: MySQL database specific changeset implementations :synopsis: MySQL database specific changeset implementations
.. _oracle-d:
Module :mod:`oracle <migrate.changeset.databases.oracle>` Module :mod:`oracle <migrate.changeset.databases.oracle>`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automodule:: migrate.changeset.databases.oracle .. automodule:: migrate.changeset.databases.oracle
:members: :members:
:synopsis: Oracle database specific changeset implementations :synopsis: Oracle database specific changeset implementations
.. _postgres-d:
Module :mod:`postgres <migrate.changeset.databases.postgres>` Module :mod:`postgres <migrate.changeset.databases.postgres>`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@ -47,8 +58,10 @@ Module :mod:`postgres <migrate.changeset.databases.postgres>`
:members: :members:
:synopsis: PostgreSQL database specific changeset implementations :synopsis: PostgreSQL database specific changeset implementations
Module :mod:`sqlite <migrate.changeset.databases.slite>` .. _sqlite-d:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Module :mod:`sqlite <migrate.changeset.databases.sqlite>`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automodule:: migrate.changeset.databases.sqlite .. automodule:: migrate.changeset.databases.sqlite
:members: :members:

View File

@ -1,6 +1,13 @@
0.5.5 0.5.5
----- -----
- code coverage is up to 99%
- Constraint classes have cascade=True keyword argument to issue CASCADE drop where supported
- added UniqueConstraint/CheckConstraint and corresponding create/drop methods
- partial refactoring of changeset package
- majoy update to documentation
- dialect support table was added to documentation
.. _backwards-055: .. _backwards-055:
**Backward incompatible changes**: **Backward incompatible changes**:

View File

@ -1,4 +1,5 @@
.. _changeset-system: .. _changeset-system:
.. highlight:: python
****************** ******************
Database changeset Database changeset
@ -73,6 +74,7 @@ Rename a table::
table.rename('newtablename') table.rename('newtablename')
.. _`table create/drop`: http://www.sqlalchemy.org/docs/05/metadata.html#creating-and-dropping-database-tables .. _`table create/drop`: http://www.sqlalchemy.org/docs/05/metadata.html#creating-and-dropping-database-tables
.. currentmodule:: migrate.changeset.constraint
Index Index
===== =====
@ -88,28 +90,86 @@ Rename an index, given an SQLAlchemy ``Index`` object::
Constraint Constraint
========== ==========
SQLAlchemy supports creating/dropping constraints at the same time a table is created/dropped. SQLAlchemy Migrate adds support for creating/dropping primary/foreign key constraints independently. SQLAlchemy supports creating/dropping constraints at the same time a table is created/dropped. SQLAlchemy Migrate adds support for creating/dropping :class:`PrimaryKeyConstraint`/:class:`ForeignKeyConstraint`/:class:`CheckConstraint`/:class:`UniqueConstraint` constraints independently. (as ALTER TABLE statements).
The following rundowns are true for all constraints classes:
1. Make sure you do ``from migrate.changeset import *`` after SQLAlchemy imports since `migrate` does not patch SA's Constraints.
2. You can also use Constraints as in SQLAlchemy. In this case passing table argument explicitly is required::
cons = PrimaryKeyConstraint('id', 'num', table=self.table)
# Create the constraint
cons.create()
# Drop the constraint
cons.drop()
or you can pass column objects (and table argument can be left out).
3. Some dialects support CASCADE option when dropping constraints::
cons = PrimaryKeyConstraint(col1, col2)
# Create the constraint
cons.create()
# Drop the constraint
cons.drop(cascade=True)
.. note::
SQLAlchemy Migrate will try to guess the name of the constraints for databases, but if it's something other than the default, you'll need to give its name. Best practice is to always name your constraints. Note that Oracle requires that you state the name of the constraint to be created/dropped.
Examples
---------
Primary key constraints:: Primary key constraints::
from migrate.changeset import *
cons = PrimaryKeyConstraint(col1, col2) cons = PrimaryKeyConstraint(col1, col2)
# Create the constraint # Create the constraint
cons.create() cons.create()
# Drop the constraint # Drop the constraint
cons.drop() cons.drop()
Note that Oracle requires that you state the name of the primary key constraint to be created/dropped. SQLAlchemy Migrate will try to guess the name of the PK constraint for other databases, but if it's something other than the default, you'll need to give its name::
PrimaryKeyConstraint(col1, col2, name='my_pk_constraint')
Foreign key constraints:: Foreign key constraints::
from migrate.changeset import *
cons = ForeignKeyConstraint([table.c.fkey], [othertable.c.id]) cons = ForeignKeyConstraint([table.c.fkey], [othertable.c.id])
# Create the constraint # Create the constraint
cons.create() cons.create()
# Drop the constraint # Drop the constraint
cons.drop() cons.drop()
Names are specified just as with primary key constraints:: Check constraints::
ForeignKeyConstraint([table.c.fkey], [othertable.c.id], name='my_fk_constraint') from migrate.changeset import *
cons = CheckConstraint('id > 3', columns=[table.c.id])
# Create the constraint
cons.create()
# Drop the constraint
cons.drop()
Unique constraints::
from migrate.changeset import *
cons = UniqueConstraint('id', 'age', table=self.table)
# Create the constraint
cons.create()
# Drop the constraint
cons.drop()

View File

@ -28,7 +28,10 @@ sys.path.append(os.path.dirname(os.path.abspath('.')))
# Add any Sphinx extension module names here, as strings. They can be extensions # Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones. # coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc'] extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx']
# link to sqlalchemy docs
intersphinx_mapping = {'http://www.sqlalchemy.org/docs/05/': None}
# Add any paths that contain templates here, relative to this directory. # Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates'] templates_path = ['_templates']

View File

@ -31,13 +31,52 @@
Version **0.5.5** breaks backward compatability, please read :ref:`changelog <backwards-055>` for more info. Version **0.5.5** breaks backward compatability, please read :ref:`changelog <backwards-055>` for more info.
Download and Development of SQLAlchemy Migrate
---------------------------------------------- Download and Development
------------------------
.. toctree:: .. toctree::
download download
Dialect support
----------------------------------
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| Operation / Dialect | :ref:`sqlite <sqlite-d>` | :ref:`postgres <postgres-d>` | :ref:`mysql <mysql-d>` | :ref:`oracle <oracle-d>` | firebird | mssql |
| | | | | | | |
+==========================+==========================+==============================+========================+===========================+==========+=======+
| ALTER TABLE | yes | yes | | | | |
| RENAME TABLE | | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| ALTER TABLE | yes | yes | | | | |
| RENAME COLUMN | (workaround) [#1]_ | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| ALTER TABLE | yes | yes | | | | |
| DROP COLUMN | (workaround) [#1]_ | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| ALTER TABLE | yes | yes | | | | |
| ADD COLUMN | (with limitations) [#2]_ | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| ALTER TABLE | no | yes | | | | |
| ADD CONSTRAINT | | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| ALTER TABLE | no | yes | | | | |
| DROP CONSTRAINT | | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| ALTER TABLE | no | yes | | | | |
| ALTER COLUMN | | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| RENAME INDEX | no | yes | | | | |
| | | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
.. [#1] Table is renamed to temporary table, new table is created followed by INSERT statements.
.. [#2] Visit http://www.sqlite.org/lang_altertable.html for more information.
Documentation Documentation
------------- -------------

View File

@ -4,5 +4,11 @@
.. [#] SQL Data Definition Language .. [#] SQL Data Definition Language
""" """
import sqlalchemy
from migrate.changeset.schema import * from migrate.changeset.schema import *
from migrate.changeset.constraint import * from migrate.changeset.constraint import *
sqlalchemy.schema.Table.__bases__ += (ChangesetTable, )
sqlalchemy.schema.Column.__bases__ += (ChangesetColumn, )
sqlalchemy.schema.Index.__bases__ += (ChangesetIndex, )

View File

@ -5,10 +5,15 @@
things that just happen to work with multiple databases. things that just happen to work with multiple databases.
""" """
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy.engine.base import Connection, Dialect from sqlalchemy.engine.default import DefaultDialect
from sqlalchemy.sql.compiler import SchemaGenerator from sqlalchemy.sql.compiler import SchemaGenerator, SchemaDropper
from sqlalchemy.schema import ForeignKeyConstraint from sqlalchemy.schema import (ForeignKeyConstraint,
from migrate.changeset import constraint, exceptions PrimaryKeyConstraint,
CheckConstraint,
UniqueConstraint)
from migrate.changeset import exceptions, constraint
SchemaIterator = sa.engine.SchemaIterator SchemaIterator = sa.engine.SchemaIterator
@ -78,6 +83,14 @@ class ANSIColumnGenerator(AlterTableVisitor, SchemaGenerator):
self.append(colspec) self.append(colspec)
self.execute() self.execute()
# add in foreign keys
if column.foreign_keys:
self.visit_alter_foriegn_keys(column)
def visit_alter_foriegn_keys(self, column):
for fk in column.foreign_keys:
self.define_foreign_key(fk.constraint)
def visit_table(self, table): def visit_table(self, table):
"""Default table visitor, does nothing. """Default table visitor, does nothing.
@ -87,7 +100,8 @@ class ANSIColumnGenerator(AlterTableVisitor, SchemaGenerator):
pass pass
class ANSIColumnDropper(AlterTableVisitor):
class ANSIColumnDropper(AlterTableVisitor, SchemaDropper):
"""Extends ANSI SQL dropper for column dropping (``ALTER TABLE """Extends ANSI SQL dropper for column dropping (``ALTER TABLE
DROP COLUMN``). DROP COLUMN``).
""" """
@ -118,24 +132,23 @@ class ANSISchemaChanger(AlterTableVisitor, SchemaGenerator):
name. NONE means the name is unchanged. name. NONE means the name is unchanged.
""" """
def visit_table(self, param): def visit_table(self, table):
"""Rename a table. Other ops aren't supported.""" """Rename a table. Other ops aren't supported."""
table, newname = param
self.start_alter_table(table) self.start_alter_table(table)
self.append("RENAME TO %s" % self.preparer.quote(newname, table.quote)) self.append("RENAME TO %s" % self.preparer.quote(table.new_name, table.quote))
self.execute() self.execute()
def visit_index(self, param): def visit_index(self, index):
"""Rename an index""" """Rename an index"""
index, newname = param
self.append("ALTER INDEX %s RENAME TO %s" % self.append("ALTER INDEX %s RENAME TO %s" %
(self.preparer.quote(self._validate_identifier(index.name, True), index.quote), (self.preparer.quote(self._validate_identifier(index.name, True), index.quote),
self.preparer.quote(self._validate_identifier(newname, True) , index.quote))) self.preparer.quote(self._validate_identifier(index.new_name, True) , index.quote)))
self.execute() self.execute()
def visit_column(self, delta): def visit_column(self, column):
"""Rename/change a column.""" """Rename/change a column."""
# ALTER COLUMN is implemented as several ALTER statements # ALTER COLUMN is implemented as several ALTER statements
delta = column.delta
keys = delta.keys() keys = delta.keys()
if 'type' in keys: if 'type' in keys:
self._run_subvisit(delta, self._visit_column_type) self._run_subvisit(delta, self._visit_column_type)
@ -246,99 +259,73 @@ class ANSIConstraintCommon(AlterTableVisitor):
ret = cons.name = cons.autoname() ret = cons.name = cons.autoname()
return self.preparer.quote(ret, cons.quote) return self.preparer.quote(ret, cons.quote)
def visit_migrate_primary_key_constraint(self, *p, **k):
self._visit_constraint(*p, **k)
class ANSIConstraintGenerator(ANSIConstraintCommon): def visit_migrate_foreign_key_constraint(self, *p, **k):
self._visit_constraint(*p, **k)
def visit_migrate_check_constraint(self, *p, **k):
self._visit_constraint(*p, **k)
def visit_migrate_unique_constraint(self, *p, **k):
self._visit_constraint(*p, **k)
class ANSIConstraintGenerator(ANSIConstraintCommon, SchemaGenerator):
def get_constraint_specification(self, cons, **kwargs): def get_constraint_specification(self, cons, **kwargs):
if isinstance(cons, constraint.PrimaryKeyConstraint): """Constaint SQL generators.
col_names = ', '.join([self.preparer.format_column(col) for col in cons.columns])
ret = "PRIMARY KEY (%s)" % col_names We cannot use SA visitors because they append comma.
if cons.name: """
# Named constraint if isinstance(cons, PrimaryKeyConstraint):
ret = ("CONSTRAINT %s " % self.preparer.format_constraint(cons)) + ret if cons.name is not None:
elif isinstance(cons, constraint.ForeignKeyConstraint): self.append("CONSTRAINT %s " % self.preparer.format_constraint(cons))
params = dict( self.append("PRIMARY KEY ")
columns = ', '.join(map(self.preparer.format_column, cons.columns)), self.append("(%s)" % ', '.join(self.preparer.quote(c.name, c.quote)
reftable = self.preparer.format_table(cons.reftable), for c in cons))
referenced = ', '.join(map(self.preparer.format_column, cons.referenced)), self.define_constraint_deferrability(cons)
name = self.get_constraint_name(cons), elif isinstance(cons, ForeignKeyConstraint):
) self.define_foreign_key(cons)
ret = "CONSTRAINT %(name)s FOREIGN KEY (%(columns)s) "\ elif isinstance(cons, CheckConstraint):
"REFERENCES %(reftable)s (%(referenced)s)" % params if cons.name is not None:
if cons.onupdate: self.append("CONSTRAINT %s " %
ret = ret + " ON UPDATE %s" % cons.onupdate self.preparer.format_constraint(cons))
if cons.ondelete: self.append(" CHECK (%s)" % cons.sqltext)
ret = ret + " ON DELETE %s" % cons.ondelete self.define_constraint_deferrability(cons)
elif isinstance(cons, constraint.CheckConstraint): elif isinstance(cons, UniqueConstraint):
ret = "CHECK (%s)" % cons.sqltext if cons.name is not None:
self.append("CONSTRAINT %s " %
self.preparer.format_constraint(cons))
self.append(" UNIQUE (%s)" % \
(', '.join(self.preparer.quote(c.name, c.quote) for c in cons)))
self.define_constraint_deferrability(cons)
else: else:
raise exceptions.InvalidConstraintError(cons) raise exceptions.InvalidConstraintError(cons)
return ret
def _visit_constraint(self, constraint): def _visit_constraint(self, constraint):
table = self.start_alter_table(constraint) table = self.start_alter_table(constraint)
constraint.name = self.get_constraint_name(constraint)
self.append("ADD ") self.append("ADD ")
spec = self.get_constraint_specification(constraint) self.get_constraint_specification(constraint)
self.append(spec)
self.execute() self.execute()
def visit_migrate_primary_key_constraint(self, *p, **k):
return self._visit_constraint(*p, **k)
def visit_migrate_foreign_key_constraint(self, *p, **k): class ANSIConstraintDropper(ANSIConstraintCommon, SchemaDropper):
return self._visit_constraint(*p, **k)
def visit_migrate_check_constraint(self, *p, **k):
return self._visit_constraint(*p, **k)
class ANSIConstraintDropper(ANSIConstraintCommon):
def _visit_constraint(self, constraint): def _visit_constraint(self, constraint):
self.start_alter_table(constraint) self.start_alter_table(constraint)
self.append("DROP CONSTRAINT ") self.append("DROP CONSTRAINT ")
self.append(self.get_constraint_name(constraint)) self.append(self.get_constraint_name(constraint))
if constraint.cascade:
self.append(" CASCADE")
self.execute() self.execute()
def visit_migrate_primary_key_constraint(self, *p, **k):
return self._visit_constraint(*p, **k)
def visit_migrate_foreign_key_constraint(self, *p, **k): class ANSIDialect(DefaultDialect):
return self._visit_constraint(*p, **k)
def visit_migrate_check_constraint(self, *p, **k):
return self._visit_constraint(*p, **k)
class ANSIFKGenerator(AlterTableVisitor, SchemaGenerator):
"""Extends ansisql generator for column creation (alter table add col)"""
def __init__(self, *args, **kwargs):
self.fk = kwargs.pop('fk', None)
super(ANSIFKGenerator, self).__init__(*args, **kwargs)
def visit_column(self, column):
"""Create foreign keys for a column (table already exists); #32"""
if self.fk:
self.add_foreignkey(self.fk.constraint)
if self.buffer.getvalue() != '':
self.execute()
def visit_table(self, table):
pass
class ANSIDialect(object):
columngenerator = ANSIColumnGenerator columngenerator = ANSIColumnGenerator
columndropper = ANSIColumnDropper columndropper = ANSIColumnDropper
schemachanger = ANSISchemaChanger schemachanger = ANSISchemaChanger
columnfkgenerator = ANSIFKGenerator constraintgenerator = ANSIConstraintGenerator
constraintdropper = ANSIConstraintDropper
@classmethod
def visitor(self, name):
return getattr(self, name)
def reflectconstraints(self, connection, table_name):
raise NotImplementedError()

View File

@ -4,6 +4,8 @@
import sqlalchemy import sqlalchemy
from sqlalchemy import schema from sqlalchemy import schema
from migrate.changeset.exceptions import *
class ConstraintChangeset(object): class ConstraintChangeset(object):
"""Base class for Constraint classes.""" """Base class for Constraint classes."""
@ -24,55 +26,50 @@ class ConstraintChangeset(object):
colnames.append(col) colnames.append(col)
return colnames, table return colnames, table
def create(self, *args, **kwargs): def __do_imports(self, visitor_name, *a, **kw):
engine = kw.pop('engine', self.table.bind)
from migrate.changeset.databases.visitor import (get_engine_visitor,
run_single_visitor)
visitorcallable = get_engine_visitor(engine, visitor_name)
run_single_visitor(engine, visitorcallable, self, *a, **kw)
def create(self, *a, **kw):
"""Create the constraint in the database. """Create the constraint in the database.
:param engine: the database engine to use. If this is \ :param engine: the database engine to use. If this is \
:keyword:`None` the instance's engine will be used :keyword:`None` the instance's engine will be used
:type engine: :class:`sqlalchemy.engine.base.Engine` :type engine: :class:`sqlalchemy.engine.base.Engine`
""" """
from migrate.changeset.databases.visitor import get_engine_visitor self.__do_imports('constraintgenerator', *a, **kw)
visitorcallable = get_engine_visitor(self.table.bind,
'constraintgenerator')
_engine_run_visitor(self.table.bind, visitorcallable, self)
def drop(self, *args, **kwargs): def drop(self, *a, **kw):
"""Drop the constraint from the database. """Drop the constraint from the database.
:param engine: the database engine to use. If this is :param engine: the database engine to use. If this is
:keyword:`None` the instance's engine will be used :keyword:`None` the instance's engine will be used
:param cascade: Issue CASCADE drop if database supports it
:type engine: :class:`sqlalchemy.engine.base.Engine` :type engine: :class:`sqlalchemy.engine.base.Engine`
:type cascade: bool
:returns: Instance with cleared columns
""" """
from migrate.changeset.databases.visitor import get_engine_visitor self.cascade = kw.pop('cascade', False)
visitorcallable = get_engine_visitor(self.table.bind, self.__do_imports('constraintdropper', *a, **kw)
'constraintdropper')
_engine_run_visitor(self.table.bind, visitorcallable, self)
self.columns.clear() self.columns.clear()
return self return self
def accept_schema_visitor(self, visitor, *p, **k):
"""Call the visitor only if it defines the given function"""
return getattr(visitor, self._func)(self)
def autoname(self):
"""Automatically generate a name for the constraint instance.
Subclasses must implement this method.
"""
def _engine_run_visitor(engine, visitorcallable, element, **kwargs):
conn = engine.connect()
try:
element.accept_schema_visitor(visitorcallable(conn))
finally:
conn.close()
class PrimaryKeyConstraint(ConstraintChangeset, schema.PrimaryKeyConstraint): class PrimaryKeyConstraint(ConstraintChangeset, schema.PrimaryKeyConstraint):
"""Primary key constraint class.""" """Construct PrimaryKeyConstraint
_func = 'visit_migrate_primary_key_constraint' Migrate's additional parameters:
:param cols: Columns in constraint.
:param table: If columns are passed as strings, this kw is required
:type table: Table instance
:type cols: strings or Column instances
"""
__visit_name__ = 'migrate_primary_key_constraint'
def __init__(self, *cols, **kwargs): def __init__(self, *cols, **kwargs):
colnames, table = self._normalize_columns(cols) colnames, table = self._normalize_columns(cols)
@ -81,23 +78,34 @@ class PrimaryKeyConstraint(ConstraintChangeset, schema.PrimaryKeyConstraint):
if table is not None: if table is not None:
self._set_parent(table) self._set_parent(table)
def autoname(self): def autoname(self):
"""Mimic the database's automatic constraint names""" """Mimic the database's automatic constraint names"""
return "%s_pkey" % self.table.name return "%s_pkey" % self.table.name
class ForeignKeyConstraint(ConstraintChangeset, schema.ForeignKeyConstraint): class ForeignKeyConstraint(ConstraintChangeset, schema.ForeignKeyConstraint):
"""Foreign key constraint class.""" """Construct ForeignKeyConstraint
_func = 'visit_migrate_foreign_key_constraint' Migrate's additional parameters:
def __init__(self, columns, refcolumns, *p, **k): :param columns: Columns in constraint
:param refcolumns: Columns that this FK reffers to in another table.
:param table: If columns are passed as strings, this kw is required
:type table: Table instance
:type columns: list of strings or Column instances
:type refcolumns: list of strings or Column instances
"""
__visit_name__ = 'migrate_foreign_key_constraint'
def __init__(self, columns, refcolumns, *args, **kwargs):
colnames, table = self._normalize_columns(columns) colnames, table = self._normalize_columns(columns)
table = k.pop('table', table) table = kwargs.pop('table', table)
refcolnames, reftable = self._normalize_columns(refcolumns, refcolnames, reftable = self._normalize_columns(refcolumns,
table_name=True) table_name=True)
super(ForeignKeyConstraint, self).__init__(colnames, refcolnames, *p, super(ForeignKeyConstraint, self).__init__(colnames, refcolnames, *args,
**k) **kwargs)
if table is not None: if table is not None:
self._set_parent(table) self._set_parent(table)
@ -118,20 +126,60 @@ class ForeignKeyConstraint(ConstraintChangeset, schema.ForeignKeyConstraint):
class CheckConstraint(ConstraintChangeset, schema.CheckConstraint): class CheckConstraint(ConstraintChangeset, schema.CheckConstraint):
"""Check constraint class.""" """Construct CheckConstraint
_func = 'visit_migrate_check_constraint' Migrate's additional parameters:
:param sqltext: Plain SQL text to check condition
:param columns: If not name is applied, you must supply this kw\
to autoname constraint
:param table: If columns are passed as strings, this kw is required
:type table: Table instance
:type columns: list of Columns instances
:type sqltext: string
"""
__visit_name__ = 'migrate_check_constraint'
def __init__(self, sqltext, *args, **kwargs): def __init__(self, sqltext, *args, **kwargs):
cols = kwargs.pop('columns') cols = kwargs.pop('columns', False)
if not cols and not kwargs.get('name', False):
raise InvalidConstraintError('You must either set "name"'
'parameter or "columns" to autogenarate it.')
colnames, table = self._normalize_columns(cols) colnames, table = self._normalize_columns(cols)
table = kwargs.pop('table', table) table = kwargs.pop('table', table)
ConstraintChangeset.__init__(self, *args, **kwargs) ConstraintChangeset.__init__(self, *args, **kwargs)
schema.CheckConstraint.__init__(self, sqltext, *args, **kwargs) schema.CheckConstraint.__init__(self, sqltext, *args, **kwargs)
if table is not None: if table is not None:
self.table = table
self._set_parent(table) self._set_parent(table)
self.colnames = colnames self.colnames = colnames
def autoname(self): def autoname(self):
return "%(table)s_%(cols)s_check" % \ return "%(table)s_%(cols)s_check" % \
dict(table=self.table.name, cols="_".join(self.colnames)) dict(table=self.table.name, cols="_".join(self.colnames))
class UniqueConstraint(ConstraintChangeset, schema.UniqueConstraint):
"""Construct UniqueConstraint
Migrate's additional parameters:
:param cols: Columns in constraint.
:param table: If columns are passed as strings, this kw is required
:type table: Table instance
:type cols: strings or Column instances
"""
__visit_name__ = 'migrate_unique_constraint'
def __init__(self, *cols, **kwargs):
self.colnames, table = self._normalize_columns(cols)
table = kwargs.pop('table', table)
super(UniqueConstraint, self).__init__(*self.colnames, **kwargs)
if table is not None:
self._set_parent(table)
def autoname(self):
"""Mimic the database's automatic constraint names"""
return "%s_%s_key" % (self.table.name, self.colnames[0])

View File

@ -53,18 +53,11 @@ class MySQLSchemaChanger(MySQLSchemaGenerator, ansisql.ANSISchemaChanger):
# If MySQL can do this, I can't find how # If MySQL can do this, I can't find how
raise exceptions.NotSupportedError("MySQL cannot rename indexes") raise exceptions.NotSupportedError("MySQL cannot rename indexes")
class MySQLConstraintGenerator(ansisql.ANSIConstraintGenerator): class MySQLConstraintGenerator(ansisql.ANSIConstraintGenerator):
pass pass
class MySQLConstraintDropper(ansisql.ANSIConstraintDropper): class MySQLConstraintDropper(ansisql.ANSIConstraintDropper):
#def visit_constraint(self,constraint):
# if isinstance(constraint,sqlalchemy.schema.PrimaryKeyConstraint):
# return self._visit_constraint_pk(constraint)
# elif isinstance(constraint,sqlalchemy.schema.ForeignKeyConstraint):
# return self._visit_constraint_fk(constraint)
# return super(MySQLConstraintDropper,self).visit_constraint(constraint)
def visit_migrate_primary_key_constraint(self, constraint): def visit_migrate_primary_key_constraint(self, constraint):
self.start_alter_table(constraint) self.start_alter_table(constraint)
@ -77,7 +70,6 @@ class MySQLConstraintDropper(ansisql.ANSIConstraintDropper):
self.append(self.preparer.format_constraint(constraint)) self.append(self.preparer.format_constraint(constraint))
self.execute() self.execute()
class MySQLDialect(ansisql.ANSIDialect): class MySQLDialect(ansisql.ANSIDialect):
columngenerator = MySQLColumnGenerator columngenerator = MySQLColumnGenerator
columndropper = MySQLColumnDropper columndropper = MySQLColumnDropper

View File

@ -3,27 +3,33 @@
.. _`SQLite`: http://www.sqlite.org/ .. _`SQLite`: http://www.sqlite.org/
""" """
from migrate.changeset import ansisql, constraint, exceptions from migrate.changeset import ansisql, exceptions, constraint
from sqlalchemy.databases import sqlite as sa_base from sqlalchemy.databases import sqlite as sa_base
from sqlalchemy import Table, MetaData from sqlalchemy import Table, MetaData
#import sqlalchemy as sa #import sqlalchemy as sa
SQLiteSchemaGenerator = sa_base.SQLiteSchemaGenerator SQLiteSchemaGenerator = sa_base.SQLiteSchemaGenerator
class SQLiteCommon(object):
class SQLiteHelper(object): def _not_supported(self, op):
raise exceptions.NotSupportedError("SQLite does not support "
"%s; see http://www.sqlite.org/lang_altertable.html" % op)
def visit_column(self, param):
class SQLiteHelper(SQLiteCommon):
def visit_column(self, column):
try: try:
table = self._to_table(param.table) table = self._to_table(column.table)
except: except:
table = self._to_table(param) table = self._to_table(column)
raise raise
table_name = self.preparer.format_table(table) table_name = self.preparer.format_table(table)
self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name) self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name)
self.execute() self.execute()
insertion_string = self._modify_table(table, param) insertion_string = self._modify_table(table, column)
table.create() table.create()
self.append(insertion_string % {'table_name': table_name}) self.append(insertion_string % {'table_name': table_name})
@ -32,12 +38,17 @@ class SQLiteHelper(object):
self.execute() self.execute()
class SQLiteColumnGenerator(SQLiteSchemaGenerator, class SQLiteColumnGenerator(SQLiteSchemaGenerator, SQLiteCommon,
ansisql.ANSIColumnGenerator): ansisql.ANSIColumnGenerator):
pass """SQLite ColumnGenerator"""
def visit_alter_foriegn_keys(self, column):
"""Does not support ALTER TABLE ADD FOREIGN KEY"""
self._not_supported("ALTER TABLE ADD CONSTRAINT")
class SQLiteColumnDropper(SQLiteHelper, ansisql.ANSIColumnDropper): class SQLiteColumnDropper(SQLiteHelper, ansisql.ANSIColumnDropper):
"""SQLite ColumnDropper"""
def _modify_table(self, table, column): def _modify_table(self, table, column):
del table.columns[column.name] del table.columns[column.name]
@ -47,18 +58,17 @@ class SQLiteColumnDropper(SQLiteHelper, ansisql.ANSIColumnDropper):
class SQLiteSchemaChanger(SQLiteHelper, ansisql.ANSISchemaChanger): class SQLiteSchemaChanger(SQLiteHelper, ansisql.ANSISchemaChanger):
"""SQLite SchemaChanger"""
def _not_supported(self, op): def _modify_table(self, table, column):
raise exceptions.NotSupportedError("SQLite does not support " delta = column.delta
"%s; see http://www.sqlite.org/lang_altertable.html" % op)
def _modify_table(self, table, delta):
column = table.columns[delta.current_name] column = table.columns[delta.current_name]
for k, v in delta.items(): for k, v in delta.items():
setattr(column, k, v) setattr(column, k, v)
return 'INSERT INTO %(table_name)s SELECT * from migration_tmp' return 'INSERT INTO %(table_name)s SELECT * from migration_tmp'
def visit_index(self, param): def visit_index(self, index):
"""Does not support ALTER INDEX"""
self._not_supported('ALTER INDEX') self._not_supported('ALTER INDEX')
@ -74,17 +84,6 @@ class SQLiteConstraintGenerator(ansisql.ANSIConstraintGenerator):
self.execute() self.execute()
class SQLiteFKGenerator(SQLiteSchemaChanger, ansisql.ANSIFKGenerator):
def visit_column(self, column):
"""Create foreign keys for a column (table already exists); #32"""
if self.fk:
self._not_supported("ALTER TABLE ADD FOREIGN KEY")
if self.buffer.getvalue() != '':
self.execute()
class SQLiteConstraintDropper(ansisql.ANSIColumnDropper, ansisql.ANSIConstraintCommon): class SQLiteConstraintDropper(ansisql.ANSIColumnDropper, ansisql.ANSIConstraintCommon):
def visit_migrate_primary_key_constraint(self, constraint): def visit_migrate_primary_key_constraint(self, constraint):
@ -94,6 +93,7 @@ class SQLiteConstraintDropper(ansisql.ANSIColumnDropper, ansisql.ANSIConstraintC
self.append(msg) self.append(msg)
self.execute() self.execute()
# TODO: add not_supported tags for constraint dropper/generator
class SQLiteDialect(ansisql.ANSIDialect): class SQLiteDialect(ansisql.ANSIDialect):
columngenerator = SQLiteColumnGenerator columngenerator = SQLiteColumnGenerator
@ -101,4 +101,3 @@ class SQLiteDialect(ansisql.ANSIDialect):
schemachanger = SQLiteSchemaChanger schemachanger = SQLiteSchemaChanger
constraintgenerator = SQLiteConstraintGenerator constraintgenerator = SQLiteConstraintGenerator
constraintdropper = SQLiteConstraintDropper constraintdropper = SQLiteConstraintDropper
columnfkgenerator = SQLiteFKGenerator

View File

@ -42,9 +42,18 @@ def get_dialect_visitor(sa_dialect, name):
# map sa dialect to migrate dialect and return visitor # map sa dialect to migrate dialect and return visitor
sa_dialect_cls = sa_dialect.__class__ sa_dialect_cls = sa_dialect.__class__
migrate_dialect_cls = dialects[sa_dialect_cls] migrate_dialect_cls = dialects[sa_dialect_cls]
visitor = migrate_dialect_cls.visitor(name) visitor = getattr(migrate_dialect_cls, name)
# bind preparer # bind preparer
visitor.preparer = sa_dialect.preparer(sa_dialect) visitor.preparer = sa_dialect.preparer(sa_dialect)
return visitor return visitor
def run_single_visitor(engine, visitorcallable, element, **kwargs):
"""Runs only one method on the visitor"""
conn = engine.contextual_connect(close_with_result=False)
try:
visitor = visitorcallable(engine.dialect, conn)
getattr(visitor, 'visit_' + element.__visit_name__)(element, **kwargs)
finally:
conn.close()

View File

@ -5,7 +5,9 @@ import re
import sqlalchemy import sqlalchemy
from migrate.changeset.databases.visitor import get_engine_visitor from migrate.changeset.databases.visitor import (get_engine_visitor,
run_single_visitor)
from migrate.changeset.exceptions import *
__all__ = [ __all__ = [
@ -14,6 +16,9 @@ __all__ = [
'alter_column', 'alter_column',
'rename_table', 'rename_table',
'rename_index', 'rename_index',
'ChangesetTable',
'ChangesetColumn',
'ChangesetIndex',
] ]
@ -97,7 +102,12 @@ def alter_column(*p, **k):
engine = k['engine'] engine = k['engine']
delta = _ColumnDelta(*p, **k) delta = _ColumnDelta(*p, **k)
visitorcallable = get_engine_visitor(engine, 'schemachanger') visitorcallable = get_engine_visitor(engine, 'schemachanger')
_engine_run_visitor(engine, visitorcallable, delta)
column = sqlalchemy.Column(delta.current_name)
column.delta = delta
column.table = delta.table
engine._run_visitor(visitorcallable, column)
#_engine_run_visitor(engine, visitorcallable, delta)
# Update column # Update column
if col is not None: if col is not None:
@ -145,15 +155,6 @@ def _to_index(index, table=None, engine=None):
return ret return ret
def _engine_run_visitor(engine, visitorcallable, element, **kwargs):
conn = engine.connect()
try:
element.accept_schema_visitor(visitorcallable(engine.dialect,
connection=conn))
finally:
conn.close()
def _normalize_table(column, table): def _normalize_table(column, table):
if table is not None: if table is not None:
if table is not column.table: if table is not column.table:
@ -166,22 +167,6 @@ def _normalize_table(column, table):
return column.table return column.table
class _WrapRename(object):
def __init__(self, item, name):
self.item = item
self.name = name
def accept_schema_visitor(self, visitor):
"""Map Class (Table, Index, Column) to visitor function"""
suffix = self.item.__class__.__name__.lower()
funcname = 'visit_%s' % suffix
func = getattr(visitor, funcname)
param = self.item, self.name
return func(param)
class _ColumnDelta(dict): class _ColumnDelta(dict):
"""Extracts the differences between two columns/column-parameters""" """Extracts the differences between two columns/column-parameters"""
@ -330,15 +315,14 @@ class ChangesetTable(object):
Python object Python object
""" """
engine = self.bind engine = self.bind
self.new_name = name
visitorcallable = get_engine_visitor(engine, 'schemachanger') visitorcallable = get_engine_visitor(engine, 'schemachanger')
param = _WrapRename(self, name) run_single_visitor(engine, visitorcallable, self, *args, **kwargs)
_engine_run_visitor(engine, visitorcallable, param, *args, **kwargs)
# Fix metadata registration # Fix metadata registration
meta = self.metadata
self.deregister()
self.name = name self.name = name
self._set_parent(meta) self.deregister()
self._set_parent(self.metadata)
def _meta_key(self): def _meta_key(self):
return sqlalchemy.schema._get_table_key(self.name, self.schema) return sqlalchemy.schema._get_table_key(self.name, self.schema)
@ -368,6 +352,9 @@ class ChangesetColumn(object):
Column name, type, default, and nullable may be changed Column name, type, default, and nullable may be changed
here. Note that for column defaults, only PassiveDefaults are here. Note that for column defaults, only PassiveDefaults are
managed by the database - changing others doesn't make sense. managed by the database - changing others doesn't make sense.
:param table: Table to be altered
:param engine: Engine to be used
""" """
if 'table' not in k: if 'table' not in k:
k['table'] = self.table k['table'] = self.table
@ -386,12 +373,6 @@ class ChangesetColumn(object):
visitorcallable = get_engine_visitor(engine, 'columngenerator') visitorcallable = get_engine_visitor(engine, 'columngenerator')
engine._run_visitor(visitorcallable, self, *args, **kwargs) engine._run_visitor(visitorcallable, self, *args, **kwargs)
# add in foreign keys
if self.foreign_keys:
for fk in self.foreign_keys:
visitorcallable = get_engine_visitor(engine,
'columnfkgenerator')
engine._run_visitor(visitorcallable, self, fk=fk)
return self return self
def drop(self, table=None, *args, **kwargs): def drop(self, table=None, *args, **kwargs):
@ -402,14 +383,15 @@ class ChangesetColumn(object):
table = _normalize_table(self, table) table = _normalize_table(self, table)
engine = table.bind engine = table.bind
visitorcallable = get_engine_visitor(engine, 'columndropper') visitorcallable = get_engine_visitor(engine, 'columndropper')
engine._run_visitor(lambda dialect, conn: visitorcallable(conn), engine._run_visitor(visitorcallable, self, *args, **kwargs)
self, *args, **kwargs)
return self return self
class ChangesetIndex(object): class ChangesetIndex(object):
"""Changeset extensions to SQLAlchemy Indexes.""" """Changeset extensions to SQLAlchemy Indexes."""
__visit_name__ = 'index'
def rename(self, name, *args, **kwargs): def rename(self, name, *args, **kwargs):
"""Change the name of an index. """Change the name of an index.
@ -417,15 +399,7 @@ class ChangesetIndex(object):
name. name.
""" """
engine = self.table.bind engine = self.table.bind
self.new_name = name
visitorcallable = get_engine_visitor(engine, 'schemachanger') visitorcallable = get_engine_visitor(engine, 'schemachanger')
param = _WrapRename(self, name) engine._run_visitor(visitorcallable, self, *args, **kwargs)
_engine_run_visitor(engine, visitorcallable, param, *args, **kwargs)
self.name = name self.name = name
def _patch():
"""All the 'ugly' operations that patch SQLAlchemy's internals."""
sqlalchemy.schema.Table.__bases__ += (ChangesetTable, )
sqlalchemy.schema.Column.__bases__ += (ChangesetColumn, )
sqlalchemy.schema.Index.__bases__ += (ChangesetIndex, )
_patch()

View File

@ -13,7 +13,7 @@ from migrate.changeset.schema import _ColumnDelta
from test import fixture from test import fixture
# TODO: add sqlite unique constraints (indexes), test quoting # TODO: test quoting
class TestAddDropColumn(fixture.DB): class TestAddDropColumn(fixture.DB):
level = fixture.DB.CONNECT level = fixture.DB.CONNECT
@ -195,12 +195,12 @@ class TestRename(fixture.DB):
@fixture.usedb() @fixture.usedb()
def test_rename_table(self): def test_rename_table(self):
"""Tables can be renamed""" """Tables can be renamed"""
#self.engine.echo=True c_name = 'col_1'
name1 = 'name_one' name1 = 'name_one'
name2 = 'name_two' name2 = 'name_two'
xname1 = 'x' + name1 xname1 = 'x' + name1
xname2 = 'x' + name2 xname2 = 'x' + name2
self.column = Column(name1,Integer) self.column = Column(c_name, Integer)
self.meta.clear() self.meta.clear()
self.table = Table(name1, self.meta, self.column) self.table = Table(name1, self.meta, self.column)
self.index = Index(xname1, self.column, unique=False) self.index = Index(xname1, self.column, unique=False)

View File

@ -3,38 +3,50 @@
from sqlalchemy import * from sqlalchemy import *
from sqlalchemy.util import * from sqlalchemy.util import *
from sqlalchemy.exc import *
from migrate.changeset import * from migrate.changeset import *
from test import fixture from test import fixture
class TestConstraint(fixture.DB): class CommonTestConstraint(fixture.DB):
level = fixture.DB.CONNECT """helper functions to test constraints.
we just create a fresh new table and make sure everything is
as required.
"""
def _setup(self, url): def _setup(self, url):
super(TestConstraint, self)._setup(url) super(CommonTestConstraint, self)._setup(url)
self._create_table() self._create_table()
def _teardown(self): def _teardown(self):
if hasattr(self, 'table') and self.engine.has_table(self.table.name): if hasattr(self, 'table') and self.engine.has_table(self.table.name):
self.table.drop() self.table.drop()
super(TestConstraint, self)._teardown() super(CommonTestConstraint, self)._teardown()
def _create_table(self): def _create_table(self):
self._connect(self.url) self._connect(self.url)
self.meta = MetaData(self.engine) self.meta = MetaData(self.engine)
self.table = Table('mytable', self.meta, self.tablename = 'mytable'
Column('id', Integer), self.table = Table(self.tablename, self.meta,
Column('id', Integer, unique=True),
Column('fkey', Integer), Column('fkey', Integer),
mysql_engine='InnoDB') mysql_engine='InnoDB')
if self.engine.has_table(self.table.name): if self.engine.has_table(self.table.name):
self.table.drop() self.table.drop()
self.table.create() self.table.create()
# make sure we start at zero
self.assertEquals(len(self.table.primary_key), 0) self.assertEquals(len(self.table.primary_key), 0)
self.assert_(isinstance(self.table.primary_key, self.assert_(isinstance(self.table.primary_key,
schema.PrimaryKeyConstraint), self.table.primary_key.__class__) schema.PrimaryKeyConstraint), self.table.primary_key.__class__)
class TestConstraint(CommonTestConstraint):
level = fixture.DB.CONNECT
def _define_pk(self, *cols): def _define_pk(self, *cols):
# Add a pk by creating a PK constraint # Add a pk by creating a PK constraint
pk = PrimaryKeyConstraint(table=self.table, *cols) pk = PrimaryKeyConstraint(table=self.table, *cols)
@ -46,7 +58,6 @@ class TestConstraint(fixture.DB):
self.refresh_table() self.refresh_table()
if not self.url.startswith('sqlite'): if not self.url.startswith('sqlite'):
self.assertEquals(list(self.table.primary_key), list(cols)) self.assertEquals(list(self.table.primary_key), list(cols))
#self.assert_(self.table.primary_key.name is not None)
# Drop the PK constraint # Drop the PK constraint
if not self.url.startswith('oracle'): if not self.url.startswith('oracle'):
@ -54,21 +65,20 @@ class TestConstraint(fixture.DB):
pk.name = self.table.primary_key.name pk.name = self.table.primary_key.name
pk.drop() pk.drop()
self.refresh_table() self.refresh_table()
#self.assertEquals(list(self.table.primary_key),list())
self.assertEquals(len(self.table.primary_key), 0) self.assertEquals(len(self.table.primary_key), 0)
self.assert_(isinstance(self.table.primary_key, self.assert_(isinstance(self.table.primary_key, schema.PrimaryKeyConstraint))
schema.PrimaryKeyConstraint),self.table.primary_key.__class__)
return pk return pk
@fixture.usedb(not_supported='sqlite') @fixture.usedb(not_supported='sqlite')
def test_define_fk(self): def test_define_fk(self):
"""FK constraints can be defined, created, and dropped""" """FK constraints can be defined, created, and dropped"""
# FK target must be unique # FK target must be unique
pk = PrimaryKeyConstraint(self.table.c.id, table=self.table) pk = PrimaryKeyConstraint(self.table.c.id, table=self.table, name="pkid")
pk.create() pk.create()
# Add a FK by creating a FK constraint # Add a FK by creating a FK constraint
self.assertEquals(self.table.c.fkey.foreign_keys._list, []) self.assertEquals(self.table.c.fkey.foreign_keys._list, [])
fk = ForeignKeyConstraint([self.table.c.fkey],[self.table.c.id], table=self.table) fk = ForeignKeyConstraint([self.table.c.fkey], [self.table.c.id], name="fk_id_fkey")
self.assert_(self.table.c.fkey.foreign_keys._list is not []) self.assert_(self.table.c.fkey.foreign_keys._list is not [])
self.assertEquals(list(fk.columns), [self.table.c.fkey]) self.assertEquals(list(fk.columns), [self.table.c.fkey])
self.assertEquals([e.column for e in fk.elements], [self.table.c.id]) self.assertEquals([e.column for e in fk.elements], [self.table.c.id])
@ -78,22 +88,11 @@ class TestConstraint(fixture.DB):
# MySQL FKs need an index # MySQL FKs need an index
index = Index('index_name', self.table.c.fkey) index = Index('index_name', self.table.c.fkey)
index.create() index.create()
if self.url.startswith('oracle'):
# Oracle constraints need a name
fk.name = 'fgsfds'
print 'drop...'
#self.engine.echo=True
fk.create() fk.create()
#self.engine.echo=False
print 'dropped'
self.refresh_table() self.refresh_table()
self.assert_(self.table.c.fkey.foreign_keys._list is not []) self.assert_(self.table.c.fkey.foreign_keys._list is not [])
print 'drop...'
#self.engine.echo=True
fk.drop() fk.drop()
#self.engine.echo=False
print 'dropped'
self.refresh_table() self.refresh_table()
self.assertEquals(self.table.c.fkey.foreign_keys._list, []) self.assertEquals(self.table.c.fkey.foreign_keys._list, [])
@ -108,36 +107,31 @@ class TestConstraint(fixture.DB):
#self.engine.echo=True #self.engine.echo=True
self._define_pk(self.table.c.id, self.table.c.fkey) self._define_pk(self.table.c.id, self.table.c.fkey)
@fixture.usedb()
def test_drop_cascade(self):
pk = PrimaryKeyConstraint('id', table=self.table, name="id_pkey")
pk.create()
self.refresh_table()
class TestAutoname(fixture.DB): # Drop the PK constraint forcing cascade
pk.drop(cascade=True)
class TestAutoname(CommonTestConstraint):
"""Every method tests for a type of constraint wether it can autoname
itself and if you can pass object instance and names to classes.
"""
level = fixture.DB.CONNECT level = fixture.DB.CONNECT
def _setup(self, url):
super(TestAutoname, self)._setup(url)
self._connect(self.url)
self.meta = MetaData(self.engine)
self.table = Table('mytable',self.meta,
Column('id', Integer),
Column('fkey', String(40)),
)
if self.engine.has_table(self.table.name):
self.table.drop()
self.table.create()
def _teardown(self):
if hasattr(self,'table') and self.engine.has_table(self.table.name):
self.table.drop()
super(TestAutoname, self)._teardown()
@fixture.usedb(not_supported='oracle') @fixture.usedb(not_supported='oracle')
def test_autoname(self): def test_autoname_pk(self):
"""Constraints can guess their name if none is given""" """PrimaryKeyConstraints can guess their name if None is given"""
# Don't supply a name; it should create one # Don't supply a name; it should create one
cons = PrimaryKeyConstraint(self.table.c.id) cons = PrimaryKeyConstraint(self.table.c.id)
cons.create() cons.create()
self.refresh_table() self.refresh_table()
# TODO: test for index for sqlite
if not self.url.startswith('sqlite'): if not self.url.startswith('sqlite'):
# TODO: test for index for sqlite
self.assertEquals(list(cons.columns), list(self.table.primary_key)) self.assertEquals(list(cons.columns), list(self.table.primary_key))
# Remove the name, drop the constraint; it should succeed # Remove the name, drop the constraint; it should succeed
@ -145,3 +139,91 @@ class TestAutoname(fixture.DB):
cons.drop() cons.drop()
self.refresh_table() self.refresh_table()
self.assertEquals(list(), list(self.table.primary_key)) self.assertEquals(list(), list(self.table.primary_key))
# test string names
cons = PrimaryKeyConstraint('id', table=self.table)
cons.create()
self.refresh_table()
if not self.url.startswith('sqlite'):
# TODO: test for index for sqlite
self.assertEquals(list(cons.columns), list(self.table.primary_key))
cons.name = None
cons.drop()
@fixture.usedb(not_supported=['oracle', 'sqlite'])
def test_autoname_fk(self):
"""ForeignKeyConstraints can guess their name if None is given"""
cons = ForeignKeyConstraint([self.table.c.fkey], [self.table.c.id])
if self.url.startswith('mysql'):
# MySQL FKs need an index
index = Index('index_name', self.table.c.fkey)
index.create()
cons.create()
self.refresh_table()
self.table.c.fkey.foreign_keys[0].column is self.table.c.id
# Remove the name, drop the constraint; it should succeed
cons.name = None
cons.drop()
self.refresh_table()
self.assertEquals(self.table.c.fkey.foreign_keys._list, list())
# test string names
cons = ForeignKeyConstraint(['fkey'], ['%s.id' % self.tablename], table=self.table)
if self.url.startswith('mysql'):
# MySQL FKs need an index
index = Index('index_name', self.table.c.fkey)
index.create()
cons.create()
self.refresh_table()
self.table.c.fkey.foreign_keys[0].column is self.table.c.id
# Remove the name, drop the constraint; it should succeed
cons.name = None
cons.drop()
@fixture.usedb(not_supported=['oracle', 'sqlite'])
def test_autoname_check(self):
"""CheckConstraints can guess their name if None is given"""
cons = CheckConstraint('id > 3', columns=[self.table.c.id])
cons.create()
self.refresh_table()
self.table.insert(values={'id': 4}).execute()
try:
self.table.insert(values={'id': 1}).execute()
except IntegrityError:
pass
else:
self.fail()
# Remove the name, drop the constraint; it should succeed
cons.name = None
cons.drop()
self.refresh_table()
self.table.insert(values={'id': 2}).execute()
self.table.insert(values={'id': 5}).execute()
@fixture.usedb(not_supported=['oracle', 'sqlite'])
def test_autoname_unique(self):
"""UniqueConstraints can guess their name if None is given"""
cons = UniqueConstraint(self.table.c.fkey)
cons.create()
self.refresh_table()
self.table.insert(values={'fkey': 4}).execute()
try:
self.table.insert(values={'fkey': 4}).execute()
except IntegrityError:
pass
else:
self.fail()
# Remove the name, drop the constraint; it should succeed
cons.name = None
cons.drop()
self.refresh_table()
self.table.insert(values={'fkey': 4}).execute()
self.table.insert(values={'fkey': 4}).execute()