245 lines
8.2 KiB
Python
245 lines
8.2 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import sys
|
|
import shutil
|
|
|
|
from migrate.versioning import exceptions, version, repository
|
|
from migrate.versioning.script import *
|
|
from migrate.versioning.util import *
|
|
|
|
from migrate.tests import fixture
|
|
from migrate.tests.fixture.models import tmp_sql_table
|
|
|
|
|
|
class TestBaseScript(fixture.Pathed):
|
|
|
|
def test_all(self):
|
|
"""Testing all basic BaseScript operations"""
|
|
# verify / source / run
|
|
src = self.tmp()
|
|
open(src, 'w').close()
|
|
bscript = BaseScript(src)
|
|
BaseScript.verify(src)
|
|
self.assertEqual(bscript.source(), '')
|
|
self.assertRaises(NotImplementedError, bscript.run, 'foobar')
|
|
|
|
|
|
class TestPyScript(fixture.Pathed, fixture.DB):
|
|
cls = PythonScript
|
|
def test_create(self):
|
|
"""We can create a migration script"""
|
|
path = self.tmp_py()
|
|
# Creating a file that doesn't exist should succeed
|
|
self.cls.create(path)
|
|
self.assert_(os.path.exists(path))
|
|
# Created file should be a valid script (If not, raises an error)
|
|
self.cls.verify(path)
|
|
# Can't create it again: it already exists
|
|
self.assertRaises(exceptions.PathFoundError,self.cls.create,path)
|
|
|
|
@fixture.usedb(supported='sqlite')
|
|
def test_run(self):
|
|
script_path = self.tmp_py()
|
|
pyscript = PythonScript.create(script_path)
|
|
pyscript.run(self.engine, 1)
|
|
pyscript.run(self.engine, -1)
|
|
|
|
self.assertRaises(exceptions.ScriptError, pyscript.run, self.engine, 0)
|
|
self.assertRaises(exceptions.ScriptError, pyscript._func, 'foobar')
|
|
|
|
# clean pyc file
|
|
os.remove(script_path + 'c')
|
|
|
|
# test deprecated upgrade/downgrade with no arguments
|
|
contents = open(script_path, 'r').read()
|
|
f = open(script_path, 'w')
|
|
f.write(contents.replace("upgrade(migrate_engine)", "upgrade()"))
|
|
f.close()
|
|
|
|
pyscript = PythonScript(script_path)
|
|
pyscript._module = None
|
|
try:
|
|
pyscript.run(self.engine, 1)
|
|
pyscript.run(self.engine, -1)
|
|
except exceptions.ScriptError:
|
|
pass
|
|
else:
|
|
self.fail()
|
|
|
|
def test_verify_notfound(self):
|
|
"""Correctly verify a python migration script: nonexistant file"""
|
|
path = self.tmp_py()
|
|
self.assertFalse(os.path.exists(path))
|
|
# Fails on empty path
|
|
self.assertRaises(exceptions.InvalidScriptError,self.cls.verify,path)
|
|
self.assertRaises(exceptions.InvalidScriptError,self.cls,path)
|
|
|
|
def test_verify_invalidpy(self):
|
|
"""Correctly verify a python migration script: invalid python file"""
|
|
path=self.tmp_py()
|
|
# Create empty file
|
|
f = open(path,'w')
|
|
f.write("def fail")
|
|
f.close()
|
|
self.assertRaises(Exception,self.cls.verify_module,path)
|
|
# script isn't verified on creation, but on module reference
|
|
py = self.cls(path)
|
|
self.assertRaises(Exception,(lambda x: x.module),py)
|
|
|
|
def test_verify_nofuncs(self):
|
|
"""Correctly verify a python migration script: valid python file; no upgrade func"""
|
|
path = self.tmp_py()
|
|
# Create empty file
|
|
f = open(path, 'w')
|
|
f.write("def zergling():\n\tprint 'rush'")
|
|
f.close()
|
|
self.assertRaises(exceptions.InvalidScriptError, self.cls.verify_module, path)
|
|
# script isn't verified on creation, but on module reference
|
|
py = self.cls(path)
|
|
self.assertRaises(exceptions.InvalidScriptError,(lambda x: x.module),py)
|
|
|
|
@fixture.usedb(supported='sqlite')
|
|
def test_preview_sql(self):
|
|
"""Preview SQL abstract from ORM layer (sqlite)"""
|
|
path = self.tmp_py()
|
|
|
|
f = open(path, 'w')
|
|
content = '''
|
|
from migrate import *
|
|
from sqlalchemy import *
|
|
|
|
metadata = MetaData()
|
|
|
|
UserGroup = Table('Link', metadata,
|
|
Column('link1ID', Integer),
|
|
Column('link2ID', Integer),
|
|
UniqueConstraint('link1ID', 'link2ID'))
|
|
|
|
def upgrade(migrate_engine):
|
|
metadata.create_all(migrate_engine)
|
|
'''
|
|
f.write(content)
|
|
f.close()
|
|
|
|
pyscript = self.cls(path)
|
|
SQL = pyscript.preview_sql(self.url, 1)
|
|
self.assertEqualsIgnoreWhitespace("""
|
|
CREATE TABLE "Link"
|
|
("link1ID" INTEGER,
|
|
"link2ID" INTEGER,
|
|
UNIQUE ("link1ID", "link2ID"))
|
|
""", SQL)
|
|
# TODO: test: No SQL should be executed!
|
|
|
|
def test_verify_success(self):
|
|
"""Correctly verify a python migration script: success"""
|
|
path = self.tmp_py()
|
|
# Succeeds after creating
|
|
self.cls.create(path)
|
|
self.cls.verify(path)
|
|
|
|
# test for PythonScript.make_update_script_for_model
|
|
|
|
@fixture.usedb()
|
|
def test_make_update_script_for_model(self):
|
|
"""Construct script source from differences of two models"""
|
|
|
|
self.setup_model_params()
|
|
self.write_file(self.first_model_path, self.base_source)
|
|
self.write_file(self.second_model_path, self.base_source + self.model_source)
|
|
|
|
source_script = self.pyscript.make_update_script_for_model(
|
|
engine=self.engine,
|
|
oldmodel=load_model('testmodel_first:meta'),
|
|
model=load_model('testmodel_second:meta'),
|
|
repository=self.repo_path,
|
|
)
|
|
|
|
self.assertTrue('User.create()' in source_script)
|
|
self.assertTrue('User.drop()' in source_script)
|
|
|
|
#@fixture.usedb()
|
|
#def test_make_update_script_for_model_equals(self):
|
|
# """Try to make update script from two identical models"""
|
|
|
|
# self.setup_model_params()
|
|
# self.write_file(self.first_model_path, self.base_source + self.model_source)
|
|
# self.write_file(self.second_model_path, self.base_source + self.model_source)
|
|
|
|
# source_script = self.pyscript.make_update_script_for_model(
|
|
# engine=self.engine,
|
|
# oldmodel=load_model('testmodel_first:meta'),
|
|
# model=load_model('testmodel_second:meta'),
|
|
# repository=self.repo_path,
|
|
# )
|
|
|
|
# self.assertFalse('User.create()' in source_script)
|
|
# self.assertFalse('User.drop()' in source_script)
|
|
|
|
def setup_model_params(self):
|
|
self.script_path = self.tmp_py()
|
|
self.repo_path = self.tmp()
|
|
self.first_model_path = os.path.join(self.temp_usable_dir, 'testmodel_first.py')
|
|
self.second_model_path = os.path.join(self.temp_usable_dir, 'testmodel_second.py')
|
|
|
|
self.base_source = """from sqlalchemy import *\nmeta = MetaData()\n"""
|
|
self.model_source = """
|
|
User = Table('User', meta,
|
|
Column('id', Integer, primary_key=True),
|
|
Column('login', Unicode(40)),
|
|
Column('passwd', String(40)),
|
|
)"""
|
|
|
|
self.repo = repository.Repository.create(self.repo_path, 'repo')
|
|
self.pyscript = PythonScript.create(self.script_path)
|
|
|
|
def write_file(self, path, contents):
|
|
f = open(path, 'w')
|
|
f.write(contents)
|
|
f.close()
|
|
|
|
|
|
class TestSqlScript(fixture.Pathed, fixture.DB):
|
|
|
|
@fixture.usedb()
|
|
def test_error(self):
|
|
"""Test if exception is raised on wrong script source"""
|
|
src = self.tmp()
|
|
|
|
f = open(src, 'w')
|
|
f.write("""foobar""")
|
|
f.close()
|
|
|
|
sqls = SqlScript(src)
|
|
self.assertRaises(Exception, sqls.run, self.engine)
|
|
|
|
@fixture.usedb()
|
|
def test_success(self):
|
|
"""Test sucessful SQL execution"""
|
|
# cleanup and prepare python script
|
|
tmp_sql_table.metadata.drop_all(self.engine, checkfirst=True)
|
|
script_path = self.tmp_py()
|
|
pyscript = PythonScript.create(script_path)
|
|
|
|
# populate python script
|
|
contents = open(script_path, 'r').read()
|
|
contents = contents.replace("pass", "tmp_sql_table.create(migrate_engine)")
|
|
contents = 'from migrate.tests.fixture.models import tmp_sql_table\n' + contents
|
|
f = open(script_path, 'w')
|
|
f.write(contents)
|
|
f.close()
|
|
|
|
# write SQL script from python script preview
|
|
pyscript = PythonScript(script_path)
|
|
src = self.tmp()
|
|
f = open(src, 'w')
|
|
f.write(pyscript.preview_sql(self.url, 1))
|
|
f.close()
|
|
|
|
# run the change
|
|
sqls = SqlScript(src)
|
|
sqls.run(self.engine, executemany=False)
|
|
tmp_sql_table.metadata.drop_all(self.engine, checkfirst=True)
|