# Copyright 2013 IBM Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import copy import datetime import logging import pytz import six import mock from oslo_context import context from oslo_serialization import jsonutils from oslo_utils import timeutils from testtools import matchers from oslo_versionedobjects import base from oslo_versionedobjects import exception from oslo_versionedobjects import fields from oslo_versionedobjects import fixture from oslo_versionedobjects import test LOG = logging.getLogger(__name__) def is_test_object(cls): """Return True if class is defined in the tests. :param cls: Class to inspect """ return 'oslo_versionedobjects.tests' in cls.__module__ @base.VersionedObjectRegistry.register class MyOwnedObject(base.VersionedObject): VERSION = '1.0' fields = {'baz': fields.Field(fields.Integer())} @base.VersionedObjectRegistry.register class MyObj(base.VersionedObject, base.VersionedObjectDictCompat): VERSION = '1.6' fields = {'foo': fields.Field(fields.Integer(), default=1), 'bar': fields.Field(fields.String()), 'missing': fields.Field(fields.String()), 'readonly': fields.Field(fields.Integer(), read_only=True), 'rel_object': fields.ObjectField('MyOwnedObject', nullable=True), 'rel_objects': fields.ListOfObjectsField('MyOwnedObject', nullable=True), 'mutable_default': fields.ListOfStringsField(default=[]), 'timestamp': fields.DateTimeField(nullable=True), } @staticmethod def _from_db_object(context, obj, db_obj): self = MyObj() self.foo = db_obj['foo'] self.bar = db_obj['bar'] self.missing = db_obj['missing'] self.readonly = 1 return self def obj_load_attr(self, attrname): setattr(self, attrname, 'loaded!') @base.remotable_classmethod def query(cls, context): obj = cls(context=context, foo=1, bar='bar') obj.obj_reset_changes() return obj @base.remotable def marco(self): return 'polo' @base.remotable def _update_test(self): project_id = getattr(context, 'tenant', None) if project_id is None: project_id = getattr(context, 'project_id', None) if project_id == 'alternate': self.bar = 'alternate-context' else: self.bar = 'updated' @base.remotable def save(self): self.obj_reset_changes() @base.remotable def refresh(self): self.foo = 321 self.bar = 'refreshed' self.obj_reset_changes() @base.remotable def modify_save_modify(self): self.bar = 'meow' self.save() self.foo = 42 self.rel_object = MyOwnedObject(baz=42) def obj_make_compatible(self, primitive, target_version): super(MyObj, self).obj_make_compatible(primitive, target_version) # NOTE(danms): Simulate an older version that had a different # format for the 'bar' attribute if target_version == '1.1' and 'bar' in primitive: primitive['bar'] = 'old%s' % primitive['bar'] @base.VersionedObjectRegistry.register class MyComparableObj(MyObj, base.ComparableVersionedObject): pass @base.VersionedObjectRegistry.register class MyObjDiffVers(MyObj): VERSION = '1.5' @classmethod def obj_name(cls): return 'MyObj' @base.VersionedObjectRegistry.register_if(False) class MyObj2(base.VersionedObject): @classmethod def obj_name(cls): return 'MyObj' @base.remotable_classmethod def query(cls, *args, **kwargs): pass @base.VersionedObjectRegistry.register_if(False) class MySensitiveObj(base.VersionedObject): VERSION = '1.0' fields = { 'data': fields.SensitiveStringField(nullable=True) } @base.remotable_classmethod def query(cls, *args, **kwargs): pass class RandomMixInWithNoFields(object): """Used to test object inheritance using a mixin that has no fields.""" pass @base.VersionedObjectRegistry.register class TestSubclassedObject(RandomMixInWithNoFields, MyObj): fields = {'new_field': fields.Field(fields.String())} child_versions = { '1.0': '1.0', '1.1': '1.1', '1.2': '1.1', '1.3': '1.2', '1.4': '1.3', '1.5': '1.4', '1.6': '1.5', '1.7': '1.6', } @base.VersionedObjectRegistry.register class MyCompoundObject(base.VersionedObject): fields = { "foo": fields.Field(fields.List(fields.Integer())), "bar": fields.Field(fields.Dict(fields.Integer())), "baz": fields.Field(fields.Set(fields.Integer())) } class TestRegistry(test.TestCase): def test_obj_tracking(self): @base.VersionedObjectRegistry.register class NewBaseClass(object): VERSION = '1.0' fields = {} @classmethod def obj_name(cls): return cls.__name__ @base.VersionedObjectRegistry.register class Fake1TestObj1(NewBaseClass): @classmethod def obj_name(cls): return 'fake1' @base.VersionedObjectRegistry.register class Fake1TestObj2(Fake1TestObj1): pass @base.VersionedObjectRegistry.register class Fake1TestObj3(Fake1TestObj1): VERSION = '1.1' @base.VersionedObjectRegistry.register class Fake2TestObj1(NewBaseClass): @classmethod def obj_name(cls): return 'fake2' @base.VersionedObjectRegistry.register class Fake1TestObj4(Fake1TestObj3): VERSION = '1.2' @base.VersionedObjectRegistry.register class Fake2TestObj2(Fake2TestObj1): VERSION = '1.1' @base.VersionedObjectRegistry.register class Fake1TestObj5(Fake1TestObj1): VERSION = '1.1' @base.VersionedObjectRegistry.register_if(False) class ConditionalObj1(NewBaseClass): fields = {'foo': fields.IntegerField()} @base.VersionedObjectRegistry.register_if(True) class ConditionalObj2(NewBaseClass): fields = {'foo': fields.IntegerField()} # Newest versions first in the list. Duplicate versions take the # newest object. expected = {'fake1': [Fake1TestObj4, Fake1TestObj5, Fake1TestObj2], 'fake2': [Fake2TestObj2, Fake2TestObj1]} self.assertEqual(expected['fake1'], base.VersionedObjectRegistry.obj_classes()['fake1']) self.assertEqual(expected['fake2'], base.VersionedObjectRegistry.obj_classes()['fake2']) self.assertEqual( [], base.VersionedObjectRegistry.obj_classes()['ConditionalObj1']) self.assertTrue(hasattr(ConditionalObj1, 'foo')) self.assertEqual( [ConditionalObj2], base.VersionedObjectRegistry.obj_classes()['ConditionalObj2']) self.assertTrue(hasattr(ConditionalObj2, 'foo')) def test_field_checking(self): def create_class(field): @base.VersionedObjectRegistry.register class TestField(base.VersionedObject): VERSION = '1.5' fields = {'foo': field()} return TestField create_class(fields.DateTimeField) self.assertRaises(exception.ObjectFieldInvalid, create_class, fields.DateTime) self.assertRaises(exception.ObjectFieldInvalid, create_class, int) def test_registration_hook(self): class TestObject(base.VersionedObject): VERSION = '1.0' class TestObjectNewer(base.VersionedObject): VERSION = '1.1' @classmethod def obj_name(cls): return 'TestObject' registry = base.VersionedObjectRegistry() with mock.patch.object(registry, 'registration_hook') as mock_hook: registry._register_class(TestObject) mock_hook.assert_called_once_with(TestObject, 0) with mock.patch.object(registry, 'registration_hook') as mock_hook: registry._register_class(TestObjectNewer) mock_hook.assert_called_once_with(TestObjectNewer, 0) def test_subclassability(self): class MyRegistryOne(base.VersionedObjectRegistry): def registration_hook(self, cls, index): cls.reg_to = "one" class MyRegistryTwo(base.VersionedObjectRegistry): def registration_hook(self, cls, index): cls.reg_to = "two" @MyRegistryOne.register class AVersionedObject1(base.VersionedObject): VERSION = '1.0' fields = {'baz': fields.Field(fields.Integer())} @MyRegistryTwo.register class AVersionedObject2(base.VersionedObject): VERSION = '1.0' fields = {'baz': fields.Field(fields.Integer())} self.assertIn('AVersionedObject1', MyRegistryOne.obj_classes()) self.assertIn('AVersionedObject2', MyRegistryOne.obj_classes()) self.assertIn('AVersionedObject1', MyRegistryTwo.obj_classes()) self.assertIn('AVersionedObject2', MyRegistryTwo.obj_classes()) self.assertIn('AVersionedObject1', base.VersionedObjectRegistry.obj_classes()) self.assertIn('AVersionedObject2', base.VersionedObjectRegistry.obj_classes()) self.assertEqual(AVersionedObject1.reg_to, "one") self.assertEqual(AVersionedObject2.reg_to, "two") class TestObjMakeList(test.TestCase): def test_obj_make_list(self): @base.VersionedObjectRegistry.register class MyList(base.ObjectListBase, base.VersionedObject): fields = { 'objects': fields.ListOfObjectsField('MyObj'), } db_objs = [{'foo': 1, 'bar': 'baz', 'missing': 'banana'}, {'foo': 2, 'bar': 'bat', 'missing': 'apple'}, ] mylist = base.obj_make_list('ctxt', MyList(), MyObj, db_objs) self.assertEqual(2, len(mylist)) self.assertEqual('ctxt', mylist._context) for index, item in enumerate(mylist): self.assertEqual(db_objs[index]['foo'], item.foo) self.assertEqual(db_objs[index]['bar'], item.bar) self.assertEqual(db_objs[index]['missing'], item.missing) class TestGetSubobjectVersion(test.TestCase): def setUp(self): super(TestGetSubobjectVersion, self).setUp() self.backport_mock = mock.MagicMock() self.rels = [('1.1', '1.0'), ('1.3', '1.1')] def test_get_subobject_version_not_existing(self): # Verify that exception is raised if we try backporting # to a version where we did not contain the subobject self.assertRaises(exception.TargetBeforeSubobjectExistedException, base._get_subobject_version, '1.0', self.rels, self.backport_mock) def test_get_subobject_version_explicit_version(self): # Verify that we backport to the correct subobject version when the # version we are going back to is explicitly said in the relationships base._get_subobject_version('1.3', self.rels, self.backport_mock) self.backport_mock.assert_called_once_with('1.1') def test_get_subobject_version_implicit_version(self): # Verify that we backport to the correct subobject version when the # version backporting to is not explicitly stated in the relationships base._get_subobject_version('1.2', self.rels, self.backport_mock) self.backport_mock.assert_called_once_with('1.0') class TestDoSubobjectBackport(test.TestCase): @base.VersionedObjectRegistry.register class ParentObj(base.VersionedObject): VERSION = '1.1' fields = {'child': fields.ObjectField('ChildObj', nullable=True)} obj_relationships = {'child': [('1.0', '1.0'), ('1.1', '1.1')]} @base.VersionedObjectRegistry.register class ParentObjList(base.VersionedObject, base.ObjectListBase): VERSION = '1.1' fields = {'objects': fields.ListOfObjectsField('ChildObj')} obj_relationships = {'objects': [('1.0', '1.0'), ('1.1', '1.1')]} @base.VersionedObjectRegistry.register class ChildObj(base.VersionedObject): VERSION = '1.1' fields = {'foo': fields.IntegerField()} def test_do_subobject_backport_without_manifest(self): child = self.ChildObj(foo=1) parent = self.ParentObj(child=child) parent_primitive = parent.obj_to_primitive()['versioned_object.data'] primitive = child.obj_to_primitive()['versioned_object.data'] version = '1.0' compat_func = 'obj_make_compatible_from_manifest' with mock.patch.object(child, compat_func) as mock_compat: base._do_subobject_backport(version, parent, 'child', parent_primitive) mock_compat.assert_called_once_with(primitive, version, version_manifest=None) def test_do_subobject_backport_with_manifest(self): child = self.ChildObj(foo=1) parent = self.ParentObj(child=child) parent_primitive = parent.obj_to_primitive()['versioned_object.data'] primitive = child.obj_to_primitive()['versioned_object.data'] version = '1.0' manifest = {'ChildObj': '1.0'} parent._obj_version_manifest = manifest compat_func = 'obj_make_compatible_from_manifest' with mock.patch.object(child, compat_func) as mock_compat: base._do_subobject_backport(version, parent, 'child', parent_primitive) mock_compat.assert_called_once_with(primitive, version, version_manifest=manifest) def test_do_subobject_backport_with_manifest_old_parent(self): child = self.ChildObj(foo=1) parent = self.ParentObj(child=child) manifest = {'ChildObj': '1.0'} parent_primitive = parent.obj_to_primitive(target_version='1.1', version_manifest=manifest) child_primitive = parent_primitive['versioned_object.data']['child'] self.assertEqual('1.0', child_primitive['versioned_object.version']) def test_do_subobject_backport_list_object(self): child = self.ChildObj(foo=1) parent = self.ParentObjList(objects=[child]) parent_primitive = parent.obj_to_primitive()['versioned_object.data'] primitive = child.obj_to_primitive()['versioned_object.data'] version = '1.0' compat_func = 'obj_make_compatible_from_manifest' with mock.patch.object(child, compat_func) as mock_compat: base._do_subobject_backport(version, parent, 'objects', parent_primitive) mock_compat.assert_called_once_with(primitive, version, version_manifest=None) def test_do_subobject_backport_list_object_with_manifest(self): child = self.ChildObj(foo=1) parent = self.ParentObjList(objects=[child]) manifest = {'ChildObj': '1.0', 'ParentObjList': '1.0'} parent_primitive = parent.obj_to_primitive(target_version='1.0', version_manifest=manifest) self.assertEqual('1.0', parent_primitive['versioned_object.version']) child_primitive = parent_primitive['versioned_object.data']['objects'] self.assertEqual('1.0', child_primitive[0]['versioned_object.version']) def test_do_subobject_backport_null_child(self): parent = self.ParentObj(child=None) parent_primitive = parent.obj_to_primitive()['versioned_object.data'] version = '1.0' compat_func = 'obj_make_compatible_from_manifest' with mock.patch.object(self.ChildObj, compat_func) as mock_compat: base._do_subobject_backport(version, parent, 'child', parent_primitive) self.assertFalse(mock_compat.called, "obj_make_compatible_from_manifest() should not " "have been called because the subobject is " "None.") def test_to_primitive_calls_make_compatible_manifest(self): obj = self.ParentObj() with mock.patch.object(obj, 'obj_make_compatible_from_manifest') as m: obj.obj_to_primitive(target_version='1.0', version_manifest=mock.sentinel.manifest) m.assert_called_once_with(mock.ANY, '1.0', mock.sentinel.manifest) class _BaseTestCase(test.TestCase): def setUp(self): super(_BaseTestCase, self).setUp() self.user_id = 'fake-user' self.project_id = 'fake-project' self.context = context.RequestContext(self.user_id, self.project_id) def json_comparator(self, expected, obj_val): # json-ify an object field for comparison with its db str # equivalent self.assertEqual(expected, jsonutils.dumps(obj_val)) def str_comparator(self, expected, obj_val): """Compare a field to a string value Compare an object field to a string in the db by performing a simple coercion on the object field value. """ self.assertEqual(expected, str(obj_val)) def assertNotIsInstance(self, obj, cls, msg=None): """Python < v2.7 compatibility. Assert 'not isinstance(obj, cls).""" try: f = super(_BaseTestCase, self).assertNotIsInstance except AttributeError: self.assertThat(obj, matchers.Not(matchers.IsInstance(cls)), message=msg or '') else: f(obj, cls, msg=msg) class TestFixture(_BaseTestCase): def test_fake_indirection_takes_serializer(self): ser = mock.MagicMock() iapi = fixture.FakeIndirectionAPI(ser) ser.serialize_entity.return_value = mock.sentinel.serial iapi.object_action(mock.sentinel.context, mock.sentinel.objinst, mock.sentinel.objmethod, (), {}) ser.serialize_entity.assert_called_once_with(mock.sentinel.context, mock.sentinel.objinst) ser.deserialize_entity.assert_called_once_with(mock.sentinel.context, mock.sentinel.serial) def test_indirection_fixture_takes_indirection_api(self): iapi = mock.sentinel.iapi self.useFixture(fixture.IndirectionFixture(iapi)) self.assertEqual(iapi, base.VersionedObject.indirection_api) def test_indirection_action(self): self.useFixture(fixture.IndirectionFixture()) obj = MyObj(context=self.context) with mock.patch.object(base.VersionedObject.indirection_api, 'object_action') as mock_action: mock_action.return_value = ({}, 'foo') obj.marco() mock_action.assert_called_once_with(self.context, obj, 'marco', (), {}) @mock.patch('oslo_versionedobjects.base.obj_tree_get_versions') def test_indirection_class_action(self, mock_otgv): mock_otgv.return_value = mock.sentinel.versions self.useFixture(fixture.IndirectionFixture()) with mock.patch.object(base.VersionedObject.indirection_api, 'object_class_action_versions') as mock_caction: mock_caction.return_value = 'foo' MyObj.query(self.context) mock_caction.assert_called_once_with(self.context, 'MyObj', 'query', mock.sentinel.versions, (), {}) def test_fake_indirection_serializes_arguments(self): ser = mock.MagicMock() iapi = fixture.FakeIndirectionAPI(serializer=ser) arg1 = mock.MagicMock() arg2 = mock.MagicMock() iapi.object_action(mock.sentinel.context, mock.sentinel.objinst, mock.sentinel.objmethod, (arg1,), {'foo': arg2}) ser.serialize_entity.assert_any_call(mock.sentinel.context, arg1) ser.serialize_entity.assert_any_call(mock.sentinel.context, arg2) def test_get_hashes(self): checker = fixture.ObjectVersionChecker() hashes = checker.get_hashes() # NOTE(danms): If this object's version or hash changes, this needs # to change. Otherwise, leave it alone. self.assertEqual('1.6-fb5f5379168bf08f7f2ce0a745e91027', hashes['TestSubclassedObject']) def test_test_hashes(self): checker = fixture.ObjectVersionChecker() hashes = checker.get_hashes() actual_hash = hashes['TestSubclassedObject'] hashes['TestSubclassedObject'] = 'foo' expected, actual = checker.test_hashes(hashes) self.assertEqual(['TestSubclassedObject'], list(expected.keys())) self.assertEqual(['TestSubclassedObject'], list(actual.keys())) self.assertEqual('foo', expected['TestSubclassedObject']) self.assertEqual(actual_hash, actual['TestSubclassedObject']) def test_get_dependency_tree(self): checker = fixture.ObjectVersionChecker() tree = checker.get_dependency_tree() # NOTE(danms): If this object's dependencies change, this n eeds # to change. Otherwise, leave it alone. self.assertEqual({'MyOwnedObject': '1.0'}, tree['TestSubclassedObject']) def test_test_relationships(self): checker = fixture.ObjectVersionChecker() tree = checker.get_dependency_tree() actual = tree['TestSubclassedObject'] tree['TestSubclassedObject']['Foo'] = '9.8' expected, actual = checker.test_relationships(tree) self.assertEqual(['TestSubclassedObject'], list(expected.keys())) self.assertEqual(['TestSubclassedObject'], list(actual.keys())) self.assertEqual({'MyOwnedObject': '1.0', 'Foo': '9.8'}, expected['TestSubclassedObject']) self.assertEqual({'MyOwnedObject': '1.0'}, actual['TestSubclassedObject']) def test_test_compatibility(self): fake_classes = {mock.sentinel.class_one: [mock.sentinel.impl_one_one, mock.sentinel.impl_one_two], mock.sentinel.class_two: [mock.sentinel.impl_two_one, mock.sentinel.impl_two_two], } checker = fixture.ObjectVersionChecker(fake_classes) @mock.patch.object(checker, '_test_object_compatibility') def test(mock_compat): checker.test_compatibility_routines() mock_compat.assert_has_calls( [mock.call(mock.sentinel.impl_one_one, manifest=None, init_args=[], init_kwargs={}), mock.call(mock.sentinel.impl_one_two, manifest=None, init_args=[], init_kwargs={}), mock.call(mock.sentinel.impl_two_one, manifest=None, init_args=[], init_kwargs={}), mock.call(mock.sentinel.impl_two_two, manifest=None, init_args=[], init_kwargs={})], any_order=True) test() def test_test_compatibility_checks_obj_to_primitive(self): fake = mock.MagicMock() fake.VERSION = '1.3' checker = fixture.ObjectVersionChecker() checker._test_object_compatibility(fake) fake().obj_to_primitive.assert_has_calls( [mock.call(target_version='1.0'), mock.call(target_version='1.1'), mock.call(target_version='1.2'), mock.call(target_version='1.3')]) def test_test_relationships_in_order(self): fake_classes = {mock.sentinel.class_one: [mock.sentinel.impl_one_one, mock.sentinel.impl_one_two], mock.sentinel.class_two: [mock.sentinel.impl_two_one, mock.sentinel.impl_two_two], } checker = fixture.ObjectVersionChecker(fake_classes) @mock.patch.object(checker, '_test_relationships_in_order') def test(mock_compat): checker.test_relationships_in_order() mock_compat.assert_has_calls( [mock.call(mock.sentinel.impl_one_one), mock.call(mock.sentinel.impl_one_two), mock.call(mock.sentinel.impl_two_one), mock.call(mock.sentinel.impl_two_two)], any_order=True) test() def test_test_relationships_in_order_good(self): fake = mock.MagicMock() fake.VERSION = '1.5' fake.fields = {'foo': fields.ObjectField('bar')} fake.obj_relationships = {'foo': [('1.2', '1.0'), ('1.3', '1.2')]} checker = fixture.ObjectVersionChecker() checker._test_relationships_in_order(fake) def _test_test_relationships_in_order_bad(self, fake_rels): fake = mock.MagicMock() fake.VERSION = '1.5' fake.fields = {'foo': fields.ObjectField('bar')} fake.obj_relationships = fake_rels checker = fixture.ObjectVersionChecker() self.assertRaises(AssertionError, checker._test_relationships_in_order, fake) def test_test_relationships_in_order_bad_my_version(self): self._test_test_relationships_in_order_bad( {'foo': [('1.4', '1.1'), ('1.3', '1.2')]}) def test_test_relationships_in_order_bad_child_version(self): self._test_test_relationships_in_order_bad( {'foo': [('1.2', '1.3'), ('1.3', '1.2')]}) def test_test_relationships_in_order_bad_both_versions(self): self._test_test_relationships_in_order_bad( {'foo': [('1.5', '1.4'), ('1.3', '1.2')]}) class _LocalTest(_BaseTestCase): def setUp(self): super(_LocalTest, self).setUp() self.assertIsNone(base.VersionedObject.indirection_api) class _RemoteTest(_BaseTestCase): def setUp(self): super(_RemoteTest, self).setUp() self.useFixture(fixture.IndirectionFixture()) class _TestObject(object): # def test_object_attrs_in_init(self): # # Spot check a few # objects.Instance # objects.InstanceInfoCache # objects.SecurityGroup # # Now check the test one in this file. Should be newest version # self.assertEqual('1.6', objects.MyObj.VERSION) def test_hydration_type_error(self): primitive = {'versioned_object.name': 'MyObj', 'versioned_object.namespace': 'versionedobjects', 'versioned_object.version': '1.5', 'versioned_object.data': {'foo': 'a'}} self.assertRaises(ValueError, MyObj.obj_from_primitive, primitive) def test_hydration(self): primitive = {'versioned_object.name': 'MyObj', 'versioned_object.namespace': 'versionedobjects', 'versioned_object.version': '1.5', 'versioned_object.data': {'foo': 1}} real_method = MyObj._obj_from_primitive def _obj_from_primitive(*args): return real_method(*args) with mock.patch.object(MyObj, '_obj_from_primitive') as ofp: ofp.side_effect = _obj_from_primitive obj = MyObj.obj_from_primitive(primitive) ofp.assert_called_once_with(None, '1.5', primitive) self.assertEqual(obj.foo, 1) def test_hydration_version_different(self): primitive = {'versioned_object.name': 'MyObj', 'versioned_object.namespace': 'versionedobjects', 'versioned_object.version': '1.2', 'versioned_object.data': {'foo': 1}} obj = MyObj.obj_from_primitive(primitive) self.assertEqual(obj.foo, 1) self.assertEqual('1.2', obj.VERSION) def test_hydration_bad_ns(self): primitive = {'versioned_object.name': 'MyObj', 'versioned_object.namespace': 'foo', 'versioned_object.version': '1.5', 'versioned_object.data': {'foo': 1}} self.assertRaises(exception.UnsupportedObjectError, MyObj.obj_from_primitive, primitive) def test_hydration_additional_unexpected_stuff(self): primitive = {'versioned_object.name': 'MyObj', 'versioned_object.namespace': 'versionedobjects', 'versioned_object.version': '1.5.1', 'versioned_object.data': { 'foo': 1, 'unexpected_thing': 'foobar'}} obj = MyObj.obj_from_primitive(primitive) self.assertEqual(1, obj.foo) self.assertFalse(hasattr(obj, 'unexpected_thing')) # NOTE(danms): If we call obj_from_primitive() directly # with a version containing .z, we'll get that version # in the resulting object. In reality, when using the # serializer, we'll get that snipped off (tested # elsewhere) self.assertEqual('1.5.1', obj.VERSION) def test_dehydration(self): expected = {'versioned_object.name': 'MyObj', 'versioned_object.namespace': 'versionedobjects', 'versioned_object.version': '1.6', 'versioned_object.data': {'foo': 1}} obj = MyObj(foo=1) obj.obj_reset_changes() self.assertEqual(obj.obj_to_primitive(), expected) def test_dehydration_invalid_version(self): obj = MyObj(foo=1) obj.obj_reset_changes() self.assertRaises(exception.InvalidTargetVersion, obj.obj_to_primitive, target_version='1.7') def test_dehydration_same_version(self): expected = {'versioned_object.name': 'MyObj', 'versioned_object.namespace': 'versionedobjects', 'versioned_object.version': '1.6', 'versioned_object.data': {'foo': 1}} obj = MyObj(foo=1) obj.obj_reset_changes() with mock.patch.object(obj, 'obj_make_compatible') as mock_compat: self.assertEqual( obj.obj_to_primitive(target_version='1.6'), expected) self.assertFalse(mock_compat.called) def test_object_property(self): obj = MyObj(foo=1) self.assertEqual(obj.foo, 1) def test_object_property_type_error(self): obj = MyObj() def fail(): obj.foo = 'a' self.assertRaises(ValueError, fail) def test_object_dict_syntax(self): obj = MyObj(foo=123, bar=u'text') self.assertEqual(obj['foo'], 123) self.assertIn('bar', obj) self.assertNotIn('missing', obj) self.assertEqual(sorted(iter(obj)), ['bar', 'foo']) self.assertEqual(sorted(obj.keys()), ['bar', 'foo']) self.assertEqual(sorted(obj.iterkeys()), ['bar', 'foo']) self.assertEqual(sorted(obj.values(), key=str), [123, u'text']) self.assertEqual(sorted(obj.itervalues(), key=str), [123, u'text']) self.assertEqual(sorted(obj.items()), [('bar', u'text'), ('foo', 123)]) self.assertEqual(sorted(list(obj.iteritems())), [('bar', u'text'), ('foo', 123)]) self.assertEqual(dict(obj), {'foo': 123, 'bar': u'text'}) def test_non_dict_remotable(self): @base.VersionedObjectRegistry.register class TestObject(base.VersionedObject): @base.remotable def test_method(self): return 123 obj = TestObject(context=self.context) self.assertEqual(123, obj.test_method()) def test_load(self): obj = MyObj() self.assertEqual(obj.bar, 'loaded!') def test_load_in_base(self): @base.VersionedObjectRegistry.register class Foo(base.VersionedObject): fields = {'foobar': fields.Field(fields.Integer())} obj = Foo() with self.assertRaisesRegex(NotImplementedError, ".*foobar.*"): obj.foobar def test_loaded_in_primitive(self): obj = MyObj(foo=1) obj.obj_reset_changes() self.assertEqual(obj.bar, 'loaded!') expected = {'versioned_object.name': 'MyObj', 'versioned_object.namespace': 'versionedobjects', 'versioned_object.version': '1.6', 'versioned_object.changes': ['bar'], 'versioned_object.data': {'foo': 1, 'bar': 'loaded!'}} self.assertEqual(obj.obj_to_primitive(), expected) def test_changes_in_primitive(self): obj = MyObj(foo=123) self.assertEqual(obj.obj_what_changed(), set(['foo'])) primitive = obj.obj_to_primitive() self.assertIn('versioned_object.changes', primitive) obj2 = MyObj.obj_from_primitive(primitive) self.assertEqual(obj2.obj_what_changed(), set(['foo'])) obj2.obj_reset_changes() self.assertEqual(obj2.obj_what_changed(), set()) def test_obj_class_from_name(self): obj = base.VersionedObject.obj_class_from_name('MyObj', '1.5') self.assertEqual('1.5', obj.VERSION) def test_obj_class_from_name_latest_compatible(self): obj = base.VersionedObject.obj_class_from_name('MyObj', '1.1') self.assertEqual('1.6', obj.VERSION) def test_unknown_objtype(self): self.assertRaises(exception.UnsupportedObjectError, base.VersionedObject.obj_class_from_name, 'foo', '1.0') def test_obj_class_from_name_supported_version(self): self.assertRaises(exception.IncompatibleObjectVersion, base.VersionedObject.obj_class_from_name, 'MyObj', '1.25') try: base.VersionedObject.obj_class_from_name('MyObj', '1.25') except exception.IncompatibleObjectVersion as error: self.assertEqual('1.6', error.kwargs['supported']) def test_orphaned_object(self): obj = MyObj.query(self.context) obj._context = None self.assertRaises(exception.OrphanedObjectError, obj._update_test) def test_changed_1(self): obj = MyObj.query(self.context) obj.foo = 123 self.assertEqual(obj.obj_what_changed(), set(['foo'])) obj._update_test() self.assertEqual(obj.obj_what_changed(), set(['foo', 'bar'])) self.assertEqual(obj.foo, 123) def test_changed_2(self): obj = MyObj.query(self.context) obj.foo = 123 self.assertEqual(obj.obj_what_changed(), set(['foo'])) obj.save() self.assertEqual(obj.obj_what_changed(), set([])) self.assertEqual(obj.foo, 123) def test_changed_3(self): obj = MyObj.query(self.context) obj.foo = 123 self.assertEqual(obj.obj_what_changed(), set(['foo'])) obj.refresh() self.assertEqual(obj.obj_what_changed(), set([])) self.assertEqual(obj.foo, 321) self.assertEqual(obj.bar, 'refreshed') def test_changed_4(self): obj = MyObj.query(self.context) obj.bar = 'something' self.assertEqual(obj.obj_what_changed(), set(['bar'])) obj.modify_save_modify() self.assertEqual(obj.obj_what_changed(), set(['foo', 'rel_object'])) self.assertEqual(obj.foo, 42) self.assertEqual(obj.bar, 'meow') self.assertIsInstance(obj.rel_object, MyOwnedObject) def test_changed_with_sub_object(self): @base.VersionedObjectRegistry.register class ParentObject(base.VersionedObject): fields = {'foo': fields.IntegerField(), 'bar': fields.ObjectField('MyObj'), } obj = ParentObject() self.assertEqual(set(), obj.obj_what_changed()) obj.foo = 1 self.assertEqual(set(['foo']), obj.obj_what_changed()) bar = MyObj() obj.bar = bar self.assertEqual(set(['foo', 'bar']), obj.obj_what_changed()) obj.obj_reset_changes() self.assertEqual(set(), obj.obj_what_changed()) bar.foo = 1 self.assertEqual(set(['bar']), obj.obj_what_changed()) def test_static_result(self): obj = MyObj.query(self.context) self.assertEqual(obj.bar, 'bar') result = obj.marco() self.assertEqual(result, 'polo') def test_updates(self): obj = MyObj.query(self.context) self.assertEqual(obj.foo, 1) obj._update_test() self.assertEqual(obj.bar, 'updated') def test_contains(self): obj = MyOwnedObject() self.assertNotIn('baz', obj) obj.baz = 1 self.assertIn('baz', obj) self.assertNotIn('does_not_exist', obj) def test_obj_attr_is_set(self): obj = MyObj(foo=1) self.assertTrue(obj.obj_attr_is_set('foo')) self.assertFalse(obj.obj_attr_is_set('bar')) self.assertRaises(AttributeError, obj.obj_attr_is_set, 'bang') def test_obj_reset_changes_recursive(self): obj = MyObj(rel_object=MyOwnedObject(baz=123), rel_objects=[MyOwnedObject(baz=456)]) self.assertEqual(set(['rel_object', 'rel_objects']), obj.obj_what_changed()) obj.obj_reset_changes() self.assertEqual(set(['rel_object']), obj.obj_what_changed()) self.assertEqual(set(['baz']), obj.rel_object.obj_what_changed()) self.assertEqual(set(['baz']), obj.rel_objects[0].obj_what_changed()) obj.obj_reset_changes(recursive=True, fields=['foo']) self.assertEqual(set(['rel_object']), obj.obj_what_changed()) self.assertEqual(set(['baz']), obj.rel_object.obj_what_changed()) self.assertEqual(set(['baz']), obj.rel_objects[0].obj_what_changed()) obj.obj_reset_changes(recursive=True) self.assertEqual(set([]), obj.rel_object.obj_what_changed()) self.assertEqual(set([]), obj.obj_what_changed()) def test_get(self): obj = MyObj(foo=1) # Foo has value, should not get the default self.assertEqual(obj.get('foo', 2), 1) # Foo has value, should return the value without error self.assertEqual(obj.get('foo'), 1) # Bar is not loaded, so we should get the default self.assertEqual(obj.get('bar', 'not-loaded'), 'not-loaded') # Bar without a default should lazy-load self.assertEqual(obj.get('bar'), 'loaded!') # Bar now has a default, but loaded value should be returned self.assertEqual(obj.get('bar', 'not-loaded'), 'loaded!') # Invalid attribute should raise AttributeError self.assertRaises(AttributeError, obj.get, 'nothing') # ...even with a default self.assertRaises(AttributeError, obj.get, 'nothing', 3) def test_object_inheritance(self): base_fields = [] myobj_fields = (['foo', 'bar', 'missing', 'readonly', 'rel_object', 'rel_objects', 'mutable_default', 'timestamp'] + base_fields) myobj3_fields = ['new_field'] self.assertTrue(issubclass(TestSubclassedObject, MyObj)) self.assertEqual(len(myobj_fields), len(MyObj.fields)) self.assertEqual(set(myobj_fields), set(MyObj.fields.keys())) self.assertEqual(len(myobj_fields) + len(myobj3_fields), len(TestSubclassedObject.fields)) self.assertEqual(set(myobj_fields) | set(myobj3_fields), set(TestSubclassedObject.fields.keys())) def test_obj_as_admin(self): self.skip('oslo.context does not support elevated()') obj = MyObj(context=self.context) def fake(*args, **kwargs): self.assertTrue(obj._context.is_admin) with mock.patch.object(obj, 'obj_reset_changes') as mock_fn: mock_fn.side_effect = fake with obj.obj_as_admin(): obj.save() self.assertTrue(mock_fn.called) self.assertFalse(obj._context.is_admin) def test_get_changes(self): obj = MyObj() self.assertEqual({}, obj.obj_get_changes()) obj.foo = 123 self.assertEqual({'foo': 123}, obj.obj_get_changes()) obj.bar = 'test' self.assertEqual({'foo': 123, 'bar': 'test'}, obj.obj_get_changes()) obj.obj_reset_changes() self.assertEqual({}, obj.obj_get_changes()) timestamp = datetime.datetime(2001, 1, 1, tzinfo=pytz.utc) with mock.patch.object(timeutils, 'utcnow') as mock_utcnow: mock_utcnow.return_value = timestamp obj.timestamp = timeutils.utcnow() self.assertEqual({'timestamp': timestamp}, obj.obj_get_changes()) obj.obj_reset_changes() self.assertEqual({}, obj.obj_get_changes()) # Timestamp without tzinfo causes mismatch timestamp = datetime.datetime(2001, 1, 1) with mock.patch.object(timeutils, 'utcnow') as mock_utcnow: mock_utcnow.return_value = timestamp obj.timestamp = timeutils.utcnow() self.assertRaises(TypeError, obj.obj_get_changes()) obj.obj_reset_changes() self.assertEqual({}, obj.obj_get_changes()) def test_obj_fields(self): class TestObj(base.VersionedObject): fields = {'foo': fields.Field(fields.Integer())} obj_extra_fields = ['bar'] @property def bar(self): return 'this is bar' obj = TestObj() self.assertEqual(['foo', 'bar'], obj.obj_fields) def test_obj_constructor(self): obj = MyObj(context=self.context, foo=123, bar='abc') self.assertEqual(123, obj.foo) self.assertEqual('abc', obj.bar) self.assertEqual(set(['foo', 'bar']), obj.obj_what_changed()) def test_obj_read_only(self): obj = MyObj(context=self.context, foo=123, bar='abc') obj.readonly = 1 self.assertRaises(exception.ReadOnlyFieldError, setattr, obj, 'readonly', 2) def test_obj_mutable_default(self): obj = MyObj(context=self.context, foo=123, bar='abc') obj.mutable_default = None obj.mutable_default.append('s1') self.assertEqual(obj.mutable_default, ['s1']) obj1 = MyObj(context=self.context, foo=123, bar='abc') obj1.mutable_default = None obj1.mutable_default.append('s2') self.assertEqual(obj1.mutable_default, ['s2']) def test_obj_mutable_default_set_default(self): obj1 = MyObj(context=self.context, foo=123, bar='abc') obj1.obj_set_defaults('mutable_default') self.assertEqual(obj1.mutable_default, []) obj1.mutable_default.append('s1') self.assertEqual(obj1.mutable_default, ['s1']) obj2 = MyObj(context=self.context, foo=123, bar='abc') obj2.obj_set_defaults('mutable_default') self.assertEqual(obj2.mutable_default, []) obj2.mutable_default.append('s2') self.assertEqual(obj2.mutable_default, ['s2']) def test_obj_repr(self): obj = MyObj(foo=123) self.assertEqual('MyObj(bar=,foo=123,missing=,' 'mutable_default=,readonly=,' 'rel_object=,rel_objects=,timestamp=)', repr(obj)) def test_obj_repr_sensitive(self): obj = MySensitiveObj(data="""{'admin_password':'mypassword'}""") self.assertEqual( 'MySensitiveObj(data=\'{\'admin_password\':\'***\'}\')', repr(obj)) obj2 = MySensitiveObj() self.assertEqual('MySensitiveObj(data=)', repr(obj2)) def test_obj_make_obj_compatible_with_relationships(self): subobj = MyOwnedObject(baz=1) obj = MyObj(rel_object=subobj) obj.obj_relationships = { 'rel_object': [('1.5', '1.1'), ('1.7', '1.2')], } primitive = obj.obj_to_primitive()['versioned_object.data'] with mock.patch.object(subobj, 'obj_make_compatible') as mock_compat: obj._obj_make_obj_compatible(copy.copy(primitive), '1.8', 'rel_object') self.assertFalse(mock_compat.called) with mock.patch.object(subobj, 'obj_make_compatible') as mock_compat: obj._obj_make_obj_compatible(copy.copy(primitive), '1.7', 'rel_object') mock_compat.assert_called_once_with( primitive['rel_object']['versioned_object.data'], '1.2') self.assertEqual( '1.2', primitive['rel_object']['versioned_object.version']) with mock.patch.object(subobj, 'obj_make_compatible') as mock_compat: obj._obj_make_obj_compatible(copy.copy(primitive), '1.6', 'rel_object') mock_compat.assert_called_once_with( primitive['rel_object']['versioned_object.data'], '1.1') self.assertEqual( '1.1', primitive['rel_object']['versioned_object.version']) with mock.patch.object(subobj, 'obj_make_compatible') as mock_compat: obj._obj_make_obj_compatible(copy.copy(primitive), '1.5', 'rel_object') mock_compat.assert_called_once_with( primitive['rel_object']['versioned_object.data'], '1.1') self.assertEqual( '1.1', primitive['rel_object']['versioned_object.version']) with mock.patch.object(subobj, 'obj_make_compatible') as mock_compat: _prim = copy.copy(primitive) obj._obj_make_obj_compatible(_prim, '1.4', 'rel_object') self.assertFalse(mock_compat.called) self.assertNotIn('rel_object', _prim) def test_obj_make_compatible_hits_sub_objects_with_rels(self): subobj = MyOwnedObject(baz=1) obj = MyObj(foo=123, rel_object=subobj) obj.obj_relationships = {'rel_object': [('1.0', '1.0')]} with mock.patch.object(obj, '_obj_make_obj_compatible') as mock_compat: obj.obj_make_compatible({'rel_object': 'foo'}, '1.10') mock_compat.assert_called_once_with({'rel_object': 'foo'}, '1.10', 'rel_object') def test_obj_make_compatible_skips_unset_sub_objects_with_rels(self): obj = MyObj(foo=123) obj.obj_relationships = {'rel_object': [('1.0', '1.0')]} with mock.patch.object(obj, '_obj_make_obj_compatible') as mock_compat: obj.obj_make_compatible({'rel_object': 'foo'}, '1.10') self.assertFalse(mock_compat.called) def test_obj_make_compatible_complains_about_missing_rel_rules(self): subobj = MyOwnedObject(baz=1) obj = MyObj(foo=123, rel_object=subobj) obj.obj_relationships = {} self.assertRaises(exception.ObjectActionError, obj.obj_make_compatible, {}, '1.0') def test_obj_make_compatible_handles_list_of_objects_with_rels(self): subobj = MyOwnedObject(baz=1) obj = MyObj(rel_objects=[subobj]) obj.obj_relationships = {'rel_objects': [('1.0', '1.123')]} def fake_make_compat(primitive, version, **k): self.assertEqual('1.123', version) self.assertIn('baz', primitive) with mock.patch.object(subobj, 'obj_make_compatible') as mock_mc: mock_mc.side_effect = fake_make_compat obj.obj_to_primitive('1.0') self.assertTrue(mock_mc.called) def test_obj_make_compatible_with_manifest(self): subobj = MyOwnedObject(baz=1) obj = MyObj(rel_object=subobj) obj.obj_relationships = {} orig_primitive = obj.obj_to_primitive()['versioned_object.data'] with mock.patch.object(subobj, 'obj_make_compatible') as mock_compat: manifest = {'MyOwnedObject': '1.2'} primitive = copy.deepcopy(orig_primitive) obj.obj_make_compatible_from_manifest(primitive, '1.5', manifest) mock_compat.assert_called_once_with( primitive['rel_object']['versioned_object.data'], '1.2') self.assertEqual( '1.2', primitive['rel_object']['versioned_object.version']) with mock.patch.object(subobj, 'obj_make_compatible') as mock_compat: manifest = {'MyOwnedObject': '1.0'} primitive = copy.deepcopy(orig_primitive) obj.obj_make_compatible_from_manifest(primitive, '1.5', manifest) mock_compat.assert_called_once_with( primitive['rel_object']['versioned_object.data'], '1.0') self.assertEqual( '1.0', primitive['rel_object']['versioned_object.version']) with mock.patch.object(subobj, 'obj_make_compatible') as mock_compat: manifest = {} primitive = copy.deepcopy(orig_primitive) obj.obj_make_compatible_from_manifest(primitive, '1.5', manifest) self.assertFalse(mock_compat.called) self.assertEqual( '1.0', primitive['rel_object']['versioned_object.version']) def test_obj_make_compatible_with_manifest_subobj(self): # Make sure that we call the subobject's "from_manifest" method # as well subobj = MyOwnedObject(baz=1) obj = MyObj(rel_object=subobj) obj.obj_relationships = {} manifest = {'MyOwnedObject': '1.2'} primitive = obj.obj_to_primitive()['versioned_object.data'] method = 'obj_make_compatible_from_manifest' with mock.patch.object(subobj, method) as mock_compat: obj.obj_make_compatible_from_manifest(primitive, '1.5', manifest) mock_compat.assert_called_once_with( primitive['rel_object']['versioned_object.data'], '1.2', version_manifest=manifest) def test_obj_make_compatible_with_manifest_subobj_list(self): # Make sure that we call the subobject's "from_manifest" method # as well subobj = MyOwnedObject(baz=1) obj = MyObj(rel_objects=[subobj]) obj.obj_relationships = {} manifest = {'MyOwnedObject': '1.2'} primitive = obj.obj_to_primitive()['versioned_object.data'] method = 'obj_make_compatible_from_manifest' with mock.patch.object(subobj, method) as mock_compat: obj.obj_make_compatible_from_manifest(primitive, '1.5', manifest) mock_compat.assert_called_once_with( primitive['rel_objects'][0]['versioned_object.data'], '1.2', version_manifest=manifest) def test_delattr(self): obj = MyObj(bar='foo') del obj.bar # Should appear unset now self.assertFalse(obj.obj_attr_is_set('bar')) # Make sure post-delete, references trigger lazy loads self.assertEqual('loaded!', getattr(obj, 'bar')) def test_delattr_unset(self): obj = MyObj() self.assertRaises(AttributeError, delattr, obj, 'bar') def test_obj_make_compatible_on_list_base(self): @base.VersionedObjectRegistry.register_if(False) class MyList(base.ObjectListBase, base.VersionedObject): VERSION = '1.1' fields = {'objects': fields.ListOfObjectsField('MyObj')} childobj = MyObj(foo=1) listobj = MyList(objects=[childobj]) compat_func = 'obj_make_compatible_from_manifest' with mock.patch.object(childobj, compat_func) as mock_compat: listobj.obj_to_primitive(target_version='1.0') mock_compat.assert_called_once_with({'foo': 1}, '1.0', version_manifest=None) def test_comparable_objects(self): obj1 = MyComparableObj(foo=1) obj2 = MyComparableObj(foo=1) obj3 = MyComparableObj(foo=2) self.assertTrue(obj1 == obj2) self.assertFalse(obj1 == obj3) def test_compound_clone(self): obj = MyCompoundObject() obj.foo = [1, 2, 3] obj.bar = {"a": 1, "b": 2, "c": 3} obj.baz = set([1, 2, 3]) copy = obj.obj_clone() self.assertEqual(obj.foo, copy.foo) self.assertEqual(obj.bar, copy.bar) self.assertEqual(obj.baz, copy.baz) # ensure that the cloned object still coerces values in its compounds copy.foo.append("4") copy.bar.update(d="4") copy.baz.add("4") self.assertEqual([1, 2, 3, 4], copy.foo) self.assertEqual({"a": 1, "b": 2, "c": 3, "d": 4}, copy.bar) self.assertEqual(set([1, 2, 3, 4]), copy.baz) def test_obj_list_fields_modifications(self): @base.VersionedObjectRegistry.register class ObjWithList(base.VersionedObject): fields = { 'list_field': fields.Field(fields.List(fields.Integer())), } obj = ObjWithList() def set_by_index(val): obj.list_field[0] = val def append(val): obj.list_field.append(val) def extend(val): obj.list_field.extend([val]) def add(val): obj.list_field = obj.list_field + [val] def iadd(val): """Test += corner case a=a+b and a+=b use different magic methods under the hood: first one calls __add__ which clones initial value before the assignment, second one call __iadd__ which modifies the initial list. Assignment should cause coercing in both cases, but __iadd__ may corrupt the initial value even if the assignment fails. So it should be overridden as well, and this test is needed to verify it """ obj.list_field += [val] def insert(val): obj.list_field.insert(0, val) def simple_slice(val): obj.list_field[:] = [val] def extended_slice(val): """Extended slice case Extended slice (and regular slices in py3) are handled differently thus needing a separate test """ obj.list_field[::2] = [val] # positive tests to ensure that coercing works properly obj.list_field = ["42"] set_by_index("1") append("2") extend("3") add("4") iadd("5") insert("0") self.assertEqual([0, 1, 2, 3, 4, 5], obj.list_field) simple_slice("10") self.assertEqual([10], obj.list_field) extended_slice("42") self.assertEqual([42], obj.list_field) obj.obj_reset_changes() # negative tests with non-coerceable values self.assertRaises(ValueError, set_by_index, "abc") self.assertRaises(ValueError, append, "abc") self.assertRaises(ValueError, extend, "abc") self.assertRaises(ValueError, add, "abc") self.assertRaises(ValueError, iadd, "abc") self.assertRaises(ValueError, insert, "abc") self.assertRaises(ValueError, simple_slice, "abc") self.assertRaises(ValueError, extended_slice, "abc") # ensure that nothing has been changed self.assertEqual([42], obj.list_field) self.assertEqual({}, obj.obj_get_changes()) def test_obj_dict_field_modifications(self): @base.VersionedObjectRegistry.register class ObjWithDict(base.VersionedObject): fields = { 'dict_field': fields.Field(fields.Dict(fields.Integer())), } obj = ObjWithDict() obj.dict_field = {"1": 1, "3": 3, "4": 4} def set_by_key(key, value): obj.dict_field[key] = value def add_by_key(key, value): obj.dict_field[key] = value def update_w_dict(key, value): obj.dict_field.update({key: value}) def update_w_kwargs(key, value): obj.dict_field.update(**{key: value}) def setdefault(key, value): obj.dict_field.setdefault(key, value) # positive tests to ensure that coercing works properly set_by_key("1", "10") add_by_key("2", "20") update_w_dict("3", "30") update_w_kwargs("4", "40") setdefault("5", "50") self.assertEqual({"1": 10, "2": 20, "3": 30, "4": 40, "5": 50}, obj.dict_field) obj.obj_reset_changes() # negative tests with non-coerceable values self.assertRaises(ValueError, set_by_key, "key", "abc") self.assertRaises(ValueError, add_by_key, "other", "abc") self.assertRaises(ValueError, update_w_dict, "key", "abc") self.assertRaises(ValueError, update_w_kwargs, "key", "abc") self.assertRaises(ValueError, setdefault, "other", "abc") # ensure that nothing has been changed self.assertEqual({"1": 10, "2": 20, "3": 30, "4": 40, "5": 50}, obj.dict_field) self.assertEqual({}, obj.obj_get_changes()) def test_obj_set_field_modifications(self): @base.VersionedObjectRegistry.register class ObjWithSet(base.VersionedObject): fields = { 'set_field': fields.Field(fields.Set(fields.Integer())) } obj = ObjWithSet() obj.set_field = set([42]) def add(value): obj.set_field.add(value) def update_w_set(value): obj.set_field.update(set([value])) def update_w_list(value): obj.set_field.update([value, value, value]) def sym_diff_upd(value): obj.set_field.symmetric_difference_update(set([value])) def union(value): obj.set_field = obj.set_field | set([value]) def iunion(value): obj.set_field |= set([value]) def xor(value): obj.set_field = obj.set_field ^ set([value]) def ixor(value): obj.set_field ^= set([value]) # positive tests to ensure that coercing works properly sym_diff_upd("42") add("1") update_w_list("2") update_w_set("3") union("4") iunion("5") xor("6") ixor("7") self.assertEqual(set([1, 2, 3, 4, 5, 6, 7]), obj.set_field) obj.set_field = set([42]) obj.obj_reset_changes() # negative tests with non-coerceable values self.assertRaises(ValueError, add, "abc") self.assertRaises(ValueError, update_w_list, "abc") self.assertRaises(ValueError, update_w_set, "abc") self.assertRaises(ValueError, sym_diff_upd, "abc") self.assertRaises(ValueError, union, "abc") self.assertRaises(ValueError, iunion, "abc") self.assertRaises(ValueError, xor, "abc") self.assertRaises(ValueError, ixor, "abc") # ensure that nothing has been changed self.assertEqual(set([42]), obj.set_field) self.assertEqual({}, obj.obj_get_changes()) class TestObject(_LocalTest, _TestObject): def test_set_defaults(self): obj = MyObj() obj.obj_set_defaults('foo') self.assertTrue(obj.obj_attr_is_set('foo')) self.assertEqual(1, obj.foo) def test_set_defaults_no_default(self): obj = MyObj() self.assertRaises(exception.ObjectActionError, obj.obj_set_defaults, 'bar') def test_set_all_defaults(self): obj = MyObj() obj.obj_set_defaults() self.assertEqual(set(['mutable_default', 'foo']), obj.obj_what_changed()) self.assertEqual(1, obj.foo) def test_set_defaults_not_overwrite(self): # NOTE(danms): deleted defaults to False, so verify that it does # not get reset by obj_set_defaults() obj = MyObj(deleted=True) obj.obj_set_defaults() self.assertEqual(1, obj.foo) self.assertTrue(obj.deleted) class TestRemoteObject(_RemoteTest, _TestObject): @mock.patch('oslo_versionedobjects.base.obj_tree_get_versions') def test_major_version_mismatch(self, mock_otgv): mock_otgv.return_value = {'MyObj': '2.0'} self.assertRaises(exception.IncompatibleObjectVersion, MyObj2.query, self.context) @mock.patch('oslo_versionedobjects.base.obj_tree_get_versions') def test_minor_version_greater(self, mock_otgv): mock_otgv.return_value = {'MyObj': '1.7'} self.assertRaises(exception.IncompatibleObjectVersion, MyObj2.query, self.context) @mock.patch('oslo_versionedobjects.base.obj_tree_get_versions') def test_minor_version_less(self, mock_otgv): mock_otgv.return_value = {'MyObj': '1.2'} obj = MyObj2.query(self.context) self.assertEqual(obj.bar, 'bar') @mock.patch('oslo_versionedobjects.base.obj_tree_get_versions') def test_compat(self, mock_otgv): mock_otgv.return_value = {'MyObj': '1.1'} obj = MyObj2.query(self.context) self.assertEqual('oldbar', obj.bar) @mock.patch('oslo_versionedobjects.base.obj_tree_get_versions') def test_revision_ignored(self, mock_otgv): mock_otgv.return_value = {'MyObj': '1.1.456'} obj = MyObj2.query(self.context) self.assertEqual('bar', obj.bar) def test_class_action_falls_back_compat(self): with mock.patch.object(base.VersionedObject, 'indirection_api') as ma: ma.object_class_action_versions.side_effect = NotImplementedError MyObj.query(self.context) ma.object_class_action.assert_called_once_with( self.context, 'MyObj', 'query', MyObj.VERSION, (), {}) class TestObjectListBase(test.TestCase): def test_list_like_operations(self): @base.VersionedObjectRegistry.register class MyElement(base.VersionedObject): fields = {'foo': fields.IntegerField()} def __init__(self, foo): super(MyElement, self).__init__() self.foo = foo class Foo(base.ObjectListBase, base.VersionedObject): fields = {'objects': fields.ListOfObjectsField('MyElement')} objlist = Foo(context='foo', objects=[MyElement(1), MyElement(2), MyElement(3)]) self.assertEqual(list(objlist), objlist.objects) self.assertEqual(len(objlist), 3) self.assertIn(objlist.objects[0], objlist) self.assertEqual(list(objlist[:1]), [objlist.objects[0]]) self.assertEqual(objlist[:1]._context, 'foo') self.assertEqual(objlist[2], objlist.objects[2]) self.assertEqual(objlist.count(objlist.objects[0]), 1) self.assertEqual(objlist.index(objlist.objects[1]), 1) objlist.sort(key=lambda x: x.foo, reverse=True) self.assertEqual([3, 2, 1], [x.foo for x in objlist]) def test_serialization(self): @base.VersionedObjectRegistry.register class Foo(base.ObjectListBase, base.VersionedObject): fields = {'objects': fields.ListOfObjectsField('Bar')} @base.VersionedObjectRegistry.register class Bar(base.VersionedObject): fields = {'foo': fields.Field(fields.String())} obj = Foo(objects=[]) for i in 'abc': bar = Bar(foo=i) obj.objects.append(bar) obj2 = base.VersionedObject.obj_from_primitive(obj.obj_to_primitive()) self.assertFalse(obj is obj2) self.assertEqual([x.foo for x in obj], [y.foo for y in obj2]) def _test_object_list_version_mappings(self, list_obj_class): # Figure out what sort of object this list is for list_field = list_obj_class.fields['objects'] item_obj_field = list_field._type._element_type item_obj_name = item_obj_field._type._obj_name # Look through all object classes of this type and make sure that # the versions we find are covered by the parent list class obj_classes = base.VersionedObjectRegistry.obj_classes()[item_obj_name] for item_class in obj_classes: if is_test_object(item_class): continue self.assertIn( item_class.VERSION, list_obj_class.child_versions.values(), 'Version mapping is incomplete for %s' % ( list_obj_class.__name__)) def test_object_version_mappings(self): self.skip('this needs to be generalized') # Find all object list classes and make sure that they at least handle # all the current object versions for obj_classes in base.VersionedObjectRegistry.obj_classes().values(): for obj_class in obj_classes: if issubclass(obj_class, base.ObjectListBase): self._test_object_list_version_mappings(obj_class) def test_obj_make_compatible_child_versions(self): @base.VersionedObjectRegistry.register class MyElement(base.VersionedObject): fields = {'foo': fields.IntegerField()} @base.VersionedObjectRegistry.register class Foo(base.ObjectListBase, base.VersionedObject): VERSION = '1.1' fields = {'objects': fields.ListOfObjectsField('MyElement')} child_versions = {'1.0': '1.0', '1.1': '1.0'} subobj = MyElement(foo=1) obj = Foo(objects=[subobj]) primitive = obj.obj_to_primitive()['versioned_object.data'] with mock.patch.object(subobj, 'obj_make_compatible') as mock_compat: obj.obj_make_compatible(copy.copy(primitive), '1.1') self.assertTrue(mock_compat.called) def test_obj_make_compatible_obj_relationships(self): @base.VersionedObjectRegistry.register class MyElement(base.VersionedObject): fields = {'foo': fields.IntegerField()} @base.VersionedObjectRegistry.register class Bar(base.ObjectListBase, base.VersionedObject): VERSION = '1.1' fields = {'objects': fields.ListOfObjectsField('MyElement')} obj_relationships = { 'objects': [('1.0', '1.0'), ('1.1', '1.0')] } subobj = MyElement(foo=1) obj = Bar(objects=[subobj]) primitive = obj.obj_to_primitive()['versioned_object.data'] with mock.patch.object(subobj, 'obj_make_compatible') as mock_compat: obj.obj_make_compatible(copy.copy(primitive), '1.1') self.assertTrue(mock_compat.called) def test_obj_make_compatible_no_relationships(self): @base.VersionedObjectRegistry.register class MyElement(base.VersionedObject): fields = {'foo': fields.IntegerField()} @base.VersionedObjectRegistry.register class Baz(base.ObjectListBase, base.VersionedObject): VERSION = '1.1' fields = {'objects': fields.ListOfObjectsField('MyElement')} subobj = MyElement(foo=1) obj = Baz(objects=[subobj]) primitive = obj.obj_to_primitive()['versioned_object.data'] with mock.patch.object(subobj, 'obj_make_compatible') as mock_compat: obj.obj_make_compatible(copy.copy(primitive), '1.1') self.assertTrue(mock_compat.called) def test_list_changes(self): @base.VersionedObjectRegistry.register class Foo(base.ObjectListBase, base.VersionedObject): fields = {'objects': fields.ListOfObjectsField('Bar')} @base.VersionedObjectRegistry.register class Bar(base.VersionedObject): fields = {'foo': fields.StringField()} obj = Foo(objects=[]) self.assertEqual(set(['objects']), obj.obj_what_changed()) obj.objects.append(Bar(foo='test')) self.assertEqual(set(['objects']), obj.obj_what_changed()) obj.obj_reset_changes() # This should still look dirty because the child is dirty self.assertEqual(set(['objects']), obj.obj_what_changed()) obj.objects[0].obj_reset_changes() # This should now look clean because the child is clean self.assertEqual(set(), obj.obj_what_changed()) def test_initialize_objects(self): class Foo(base.ObjectListBase, base.VersionedObject): fields = {'objects': fields.ListOfObjectsField('Bar')} class Bar(base.VersionedObject): fields = {'foo': fields.StringField()} obj = Foo() self.assertEqual([], obj.objects) self.assertEqual(set(), obj.obj_what_changed()) def test_obj_repr(self): @base.VersionedObjectRegistry.register class Foo(base.ObjectListBase, base.VersionedObject): fields = {'objects': fields.ListOfObjectsField('Bar')} @base.VersionedObjectRegistry.register class Bar(base.VersionedObject): fields = {'uuid': fields.StringField()} obj = Foo(objects=[Bar(uuid='fake-uuid')]) self.assertEqual('Foo(objects=[Bar(fake-uuid)])', repr(obj)) class TestObjectSerializer(_BaseTestCase): def test_serialize_entity_primitive(self): ser = base.VersionedObjectSerializer() for thing in (1, 'foo', [1, 2], {'foo': 'bar'}): self.assertEqual(thing, ser.serialize_entity(None, thing)) def test_deserialize_entity_primitive(self): ser = base.VersionedObjectSerializer() for thing in (1, 'foo', [1, 2], {'foo': 'bar'}): self.assertEqual(thing, ser.deserialize_entity(None, thing)) def test_serialize_set_to_list(self): ser = base.VersionedObjectSerializer() self.assertEqual([1, 2], ser.serialize_entity(None, set([1, 2]))) @mock.patch('oslo_versionedobjects.base.VersionedObject.indirection_api') def _test_deserialize_entity_newer(self, obj_version, backported_to, mock_iapi, my_version='1.6'): ser = base.VersionedObjectSerializer() mock_iapi.object_backport_versions.return_value = 'backported' @base.VersionedObjectRegistry.register class MyTestObj(MyObj): VERSION = my_version obj = MyTestObj() obj.VERSION = obj_version primitive = obj.obj_to_primitive() result = ser.deserialize_entity(self.context, primitive) if backported_to is None: self.assertFalse(mock_iapi.object_backport_versions.called) else: self.assertEqual('backported', result) mock_iapi.object_backport_versions.assert_called_with( self.context, primitive, {'MyTestObj': my_version, 'MyOwnedObject': '1.0'}) def test_deserialize_entity_newer_version_backports(self): self._test_deserialize_entity_newer('1.25', '1.6') def test_deserialize_entity_newer_revision_does_not_backport_zero(self): self._test_deserialize_entity_newer('1.6.0', None) def test_deserialize_entity_newer_revision_does_not_backport(self): self._test_deserialize_entity_newer('1.6.1', None) def test_deserialize_entity_newer_version_passes_revision(self): self._test_deserialize_entity_newer('1.7', '1.6.1', my_version='1.6.1') def test_deserialize_dot_z_with_extra_stuff(self): primitive = {'versioned_object.name': 'MyObj', 'versioned_object.namespace': 'versionedobjects', 'versioned_object.version': '1.6.1', 'versioned_object.data': { 'foo': 1, 'unexpected_thing': 'foobar'}} ser = base.VersionedObjectSerializer() obj = ser.deserialize_entity(self.context, primitive) self.assertEqual(1, obj.foo) self.assertFalse(hasattr(obj, 'unexpected_thing')) # NOTE(danms): The serializer is where the logic lives that # avoids backports for cases where only a .z difference in # the received object version is detected. As a result, we # end up with a version of what we expected, effectively the # .0 of the object. self.assertEqual('1.6', obj.VERSION) def test_deserialize_entity_newer_version_no_indirection(self): ser = base.VersionedObjectSerializer() obj = MyObj() obj.VERSION = '1.25' primitive = obj.obj_to_primitive() self.assertRaises(exception.IncompatibleObjectVersion, ser.deserialize_entity, self.context, primitive) def _test_nested_backport(self, old): @base.VersionedObjectRegistry.register class Parent(base.VersionedObject): VERSION = '1.0' fields = { 'child': fields.ObjectField('MyObj'), } @base.VersionedObjectRegistry.register # noqa class Parent(base.VersionedObject): VERSION = '1.1' fields = { 'child': fields.ObjectField('MyObj'), } child = MyObj(foo=1) parent = Parent(child=child) prim = parent.obj_to_primitive() child_prim = prim['versioned_object.data']['child'] child_prim['versioned_object.version'] = '1.10' ser = base.VersionedObjectSerializer() with mock.patch.object(base.VersionedObject, 'indirection_api') as a: if old: a.object_backport_versions.side_effect = NotImplementedError ser.deserialize_entity(self.context, prim) a.object_backport_versions.assert_called_once_with( self.context, prim, {'Parent': '1.1', 'MyObj': '1.6', 'MyOwnedObject': '1.0'}) if old: # NOTE(danms): This should be the version of the parent object, # not the child. If wrong, this will be '1.6', which is the max # child version in our registry. a.object_backport.assert_called_once_with( self.context, prim, '1.1') def test_nested_backport_new_method(self): self._test_nested_backport(old=False) def test_nested_backport_old_method(self): self._test_nested_backport(old=True) def test_object_serialization(self): ser = base.VersionedObjectSerializer() obj = MyObj() primitive = ser.serialize_entity(self.context, obj) self.assertIn('versioned_object.name', primitive) obj2 = ser.deserialize_entity(self.context, primitive) self.assertIsInstance(obj2, MyObj) self.assertEqual(self.context, obj2._context) def test_object_serialization_iterables(self): ser = base.VersionedObjectSerializer() obj = MyObj() for iterable in (list, tuple, set): thing = iterable([obj]) primitive = ser.serialize_entity(self.context, thing) self.assertEqual(1, len(primitive)) for item in primitive: self.assertNotIsInstance(item, base.VersionedObject) thing2 = ser.deserialize_entity(self.context, primitive) self.assertEqual(1, len(thing2)) for item in thing2: self.assertIsInstance(item, MyObj) # dict case thing = {'key': obj} primitive = ser.serialize_entity(self.context, thing) self.assertEqual(1, len(primitive)) for item in six.itervalues(primitive): self.assertNotIsInstance(item, base.VersionedObject) thing2 = ser.deserialize_entity(self.context, primitive) self.assertEqual(1, len(thing2)) for item in six.itervalues(thing2): self.assertIsInstance(item, MyObj) # object-action updates dict case thing = {'foo': obj.obj_to_primitive()} primitive = ser.serialize_entity(self.context, thing) self.assertEqual(thing, primitive) thing2 = ser.deserialize_entity(self.context, thing) self.assertIsInstance(thing2['foo'], base.VersionedObject) def test_serializer_subclass_namespace(self): @base.VersionedObjectRegistry.register class MyNSObj(base.VersionedObject): OBJ_SERIAL_NAMESPACE = 'foo' fields = {'foo': fields.IntegerField()} class MySerializer(base.VersionedObjectSerializer): OBJ_BASE_CLASS = MyNSObj ser = MySerializer() obj = MyNSObj(foo=123) obj2 = ser.deserialize_entity(None, ser.serialize_entity(None, obj)) self.assertIsInstance(obj2, MyNSObj) self.assertEqual(obj.foo, obj2.foo) def test_serializer_subclass_namespace_mismatch(self): @base.VersionedObjectRegistry.register class MyNSObj(base.VersionedObject): OBJ_SERIAL_NAMESPACE = 'foo' fields = {'foo': fields.IntegerField()} class MySerializer(base.VersionedObjectSerializer): OBJ_BASE_CLASS = MyNSObj myser = MySerializer() voser = base.VersionedObjectSerializer() obj = MyObj(foo=123) obj2 = myser.deserialize_entity(None, voser.serialize_entity(None, obj)) # NOTE(danms): The new serializer should have ignored the objects # serialized by the base serializer, so obj2 here should be a dict # primitive and not a hydrated object self.assertNotIsInstance(obj2, MyNSObj) self.assertIn('versioned_object.name', obj2) def test_serializer_subclass_base_object_indirection(self): @base.VersionedObjectRegistry.register class MyNSObj(base.VersionedObject): OBJ_SERIAL_NAMESPACE = 'foo' fields = {'foo': fields.IntegerField()} indirection_api = mock.MagicMock() class MySerializer(base.VersionedObjectSerializer): OBJ_BASE_CLASS = MyNSObj ser = MySerializer() prim = MyNSObj(foo=1).obj_to_primitive() prim['foo.version'] = '2.0' ser.deserialize_entity(mock.sentinel.context, prim) indirection_api = MyNSObj.indirection_api indirection_api.object_backport_versions.assert_called_once_with( mock.sentinel.context, prim, {'MyNSObj': '1.0'}) @mock.patch('oslo_versionedobjects.base.VersionedObject.indirection_api') def test_serializer_calls_old_backport_interface(self, indirection_api): @base.VersionedObjectRegistry.register class MyOldObj(base.VersionedObject): pass ser = base.VersionedObjectSerializer() prim = MyOldObj(foo=1).obj_to_primitive() prim['versioned_object.version'] = '2.0' indirection_api.object_backport_versions.side_effect = ( NotImplementedError('Old')) ser.deserialize_entity(mock.sentinel.context, prim) indirection_api.object_backport.assert_called_once_with( mock.sentinel.context, prim, '1.0') class TestNamespaceCompatibility(test.TestCase): def setUp(self): super(TestNamespaceCompatibility, self).setUp() @base.VersionedObjectRegistry.register_if(False) class TestObject(base.VersionedObject): OBJ_SERIAL_NAMESPACE = 'foo' OBJ_PROJECT_NAMESPACE = 'tests' self.test_class = TestObject def test_obj_primitive_key(self): self.assertEqual('foo.data', self.test_class._obj_primitive_key('data')) def test_obj_primitive_field(self): primitive = { 'foo.data': mock.sentinel.data, } self.assertEqual(mock.sentinel.data, self.test_class._obj_primitive_field(primitive, 'data')) def test_obj_primitive_field_namespace(self): primitive = { 'foo.name': 'TestObject', 'foo.namespace': 'tests', 'foo.version': '1.0', 'foo.data': {}, } with mock.patch.object(self.test_class, 'obj_class_from_name'): self.test_class.obj_from_primitive(primitive) def test_obj_primitive_field_namespace_wrong(self): primitive = { 'foo.name': 'TestObject', 'foo.namespace': 'wrong', 'foo.version': '1.0', 'foo.data': {}, } self.assertRaises(exception.UnsupportedObjectError, self.test_class.obj_from_primitive, primitive) class TestUtilityMethods(test.TestCase): def test_flat(self): @base.VersionedObjectRegistry.register class TestObject(base.VersionedObject): VERSION = '1.23' fields = {} tree = base.obj_tree_get_versions('TestObject') self.assertEqual({'TestObject': '1.23'}, tree) def test_parent_child(self): @base.VersionedObjectRegistry.register class TestChild(base.VersionedObject): VERSION = '2.34' @base.VersionedObjectRegistry.register class TestObject(base.VersionedObject): VERSION = '1.23' fields = { 'child': fields.ObjectField('TestChild'), } tree = base.obj_tree_get_versions('TestObject') self.assertEqual({'TestObject': '1.23', 'TestChild': '2.34'}, tree) def test_complex(self): @base.VersionedObjectRegistry.register class TestChild(base.VersionedObject): VERSION = '2.34' @base.VersionedObjectRegistry.register class TestChildTwo(base.VersionedObject): VERSION = '4.56' fields = { 'sibling': fields.ObjectField('TestChild'), } @base.VersionedObjectRegistry.register class TestObject(base.VersionedObject): VERSION = '1.23' fields = { 'child': fields.ObjectField('TestChild'), 'childtwo': fields.ListOfObjectsField('TestChildTwo'), } tree = base.obj_tree_get_versions('TestObject') self.assertEqual({'TestObject': '1.23', 'TestChild': '2.34', 'TestChildTwo': '4.56'}, tree) def test_complex_loopy(self): @base.VersionedObjectRegistry.register class TestChild(base.VersionedObject): VERSION = '2.34' fields = { 'sibling': fields.ObjectField('TestChildTwo'), } @base.VersionedObjectRegistry.register class TestChildTwo(base.VersionedObject): VERSION = '4.56' fields = { 'sibling': fields.ObjectField('TestChild'), 'parents': fields.ListOfObjectsField('TestObject'), } @base.VersionedObjectRegistry.register class TestObject(base.VersionedObject): VERSION = '1.23' fields = { 'child': fields.ObjectField('TestChild'), 'childtwo': fields.ListOfObjectsField('TestChildTwo'), } tree = base.obj_tree_get_versions('TestObject') self.assertEqual({'TestObject': '1.23', 'TestChild': '2.34', 'TestChildTwo': '4.56'}, tree)