tuskar-ui/horizon/tests/api_tests/swift_tests.py
Ke WU 359a71e0b0 Swift Object filter improve and fix
bug#1013409
bug#1013412

Change-Id: Ib2efa534622793bfa9f190d74e3b91cc66e7516a
2012-06-18 16:51:32 -07:00

209 lines
8.5 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Copyright 2012 Nebula, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import cloudfiles
from horizon import api
from horizon import exceptions
from horizon import test
class SwiftApiTests(test.APITestCase):
def test_swift_get_containers(self):
containers = self.containers.list()
swift_api = self.stub_swiftclient()
swift_api.get_all_containers(limit=1001,
marker=None).AndReturn(containers)
self.mox.ReplayAll()
(conts, more) = api.swift_get_containers(self.request)
self.assertEqual(len(conts), len(containers))
self.assertFalse(more)
def test_swift_create_container(self):
container = self.containers.first()
swift_api = self.stub_swiftclient(expected_calls=2)
# Check for existence, then create
exc = cloudfiles.errors.NoSuchContainer()
swift_api.get_container(container.name).AndRaise(exc)
swift_api.create_container(container.name).AndReturn(container)
self.mox.ReplayAll()
# Verification handled by mox, no assertions needed.
api.swift_create_container(self.request, container.name)
def test_swift_create_duplicate_container(self):
container = self.containers.first()
swift_api = self.stub_swiftclient()
swift_api.get_container(container.name).AndReturn(container)
self.mox.ReplayAll()
# Verification handled by mox, no assertions needed.
with self.assertRaises(exceptions.AlreadyExists):
api.swift_create_container(self.request, container.name)
def test_swift_get_objects(self):
container = self.containers.first()
objects = self.objects.list()
swift_api = self.stub_swiftclient()
swift_api.get_container(container.name).AndReturn(container)
self.mox.StubOutWithMock(container, 'get_objects')
container.get_objects(limit=1001,
marker=None,
prefix=None,
delimiter='/',
path=None).AndReturn(objects)
self.mox.ReplayAll()
(objs, more) = api.swift_get_objects(self.request, container.name)
self.assertEqual(len(objs), len(objects))
self.assertFalse(more)
def test_swift_filter_objects(self):
container = self.containers.first()
objects = self.objects.list()
first_obj = self.objects.first()
expected_objs = [obj.name.encode('utf8') for obj in
self.objects.filter(name=first_obj.name)]
swift_api = self.stub_swiftclient()
swift_api.get_container(container.name).AndReturn(container)
self.mox.StubOutWithMock(container, 'get_objects')
container.get_objects(limit=10000,
marker=None,
prefix=None,
delimiter='/',
path=None).AndReturn(objects)
self.mox.ReplayAll()
result_objs = api.swift_filter_objects(self.request,
first_obj.name,
container.name)
self.assertQuerysetEqual(result_objs, expected_objs,
lambda obj: obj.name.encode('utf8'))
def test_swift_upload_object(self):
container = self.containers.first()
obj = self.objects.first()
OBJECT_DATA = 'someData'
swift_api = self.stub_swiftclient()
swift_api.get_container(container.name).AndReturn(container)
self.mox.StubOutWithMock(container, 'create_object')
container.create_object(obj.name).AndReturn(obj)
self.mox.StubOutWithMock(obj, 'send')
obj.send(OBJECT_DATA).AndReturn(obj)
self.mox.ReplayAll()
ret_val = api.swift_upload_object(self.request,
container.name,
obj.name,
OBJECT_DATA)
self.assertEqual(ret_val, obj)
def test_swift_delete_object(self):
container = self.containers.first()
obj = self.objects.first()
swift_api = self.stub_swiftclient()
swift_api.get_container(container.name).AndReturn(container)
self.mox.StubOutWithMock(container, 'delete_object')
container.delete_object(obj.name).AndReturn(obj)
self.mox.ReplayAll()
ret_val = api.swift_delete_object(self.request,
container.name,
obj.name)
self.assertIsNone(ret_val)
def test_swift_get_object_data(self):
container = self.containers.first()
obj = self.objects.first()
OBJECT_DATA = 'objectData'
swift_api = self.stub_swiftclient()
swift_api.get_container(container.name).AndReturn(container)
self.mox.StubOutWithMock(container, 'get_object')
container.get_object(obj.name).AndReturn(obj)
self.mox.StubOutWithMock(obj, 'stream')
obj.stream().AndReturn(OBJECT_DATA)
self.mox.ReplayAll()
ret_val = api.swift_get_object_data(self.request,
container.name,
obj.name)
self.assertEqual(ret_val, OBJECT_DATA)
def test_swift_object_exists(self):
container = self.containers.first()
obj = self.objects.first()
swift_api = self.stub_swiftclient(expected_calls=2)
self.mox.StubOutWithMock(container, 'get_object')
swift_api.get_container(container.name).AndReturn(container)
container.get_object(obj.name).AndReturn(obj)
swift_api.get_container(container.name).AndReturn(container)
exc = cloudfiles.errors.NoSuchObject()
container.get_object(obj.name).AndRaise(exc)
self.mox.ReplayAll()
args = self.request, container.name, obj.name
self.assertTrue(api.swift_object_exists(*args))
# Again, for a "non-existent" object
self.assertFalse(api.swift_object_exists(*args))
def test_swift_copy_object(self):
container = self.containers.get(name=u"container_one\u6346")
container_2 = self.containers.get(name=u"container_two\u6346")
obj = self.objects.first()
swift_api = self.stub_swiftclient()
self.mox.StubOutWithMock(api.swift, 'swift_object_exists')
self.mox.StubOutWithMock(container, 'get_object')
self.mox.StubOutWithMock(obj, 'copy_to')
# Using the non-unicode names here, see below.
swift_api.get_container("no_unicode").AndReturn(container)
api.swift.swift_object_exists(self.request,
"also no unicode",
"obj_with_no_unicode").AndReturn(False)
container.get_object("obj_with_no_unicode").AndReturn(obj)
obj.copy_to("also no unicode", "obj_with_no_unicode")
self.mox.ReplayAll()
# Unicode fails... we'll get to a successful test in a minute
with self.assertRaises(exceptions.HorizonException):
api.swift_copy_object(self.request,
container.name,
obj.name,
container_2.name,
obj.name)
# Verification handled by mox. No assertions needed.
container.name = "no_unicode"
container_2.name = "also no unicode"
obj.name = "obj_with_no_unicode"
api.swift_copy_object(self.request,
container.name,
obj.name,
container_2.name,
obj.name)