swiftonfile/test/functional/gluster_swift_tests.py
Peter Portante 04e402599b Sync functional tests with Swift v1.9.1
Change-Id: Id478f651fe937883837291059da9da853fcd2de2
Signed-off-by: Peter Portante <peter.portante@redhat.com>
Reviewed-on: http://review.gluster.org/6141
Reviewed-by: Luis Pabon <lpabon@redhat.com>
Tested-by: Luis Pabon <lpabon@redhat.com>
2013-10-25 10:57:40 -07:00

242 lines
8.9 KiB
Python

# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" OpenStack Swift based functional tests for Gluster for Swift"""
import random
from nose import SkipTest
from test.functional.tests import config, locale, Base, Base2, Utils, \
TestFileEnv
from test.functional.swift_test_client import Account, Connection, File, \
ResponseError
web_front_end = config.get('web_front_end', 'integral')
class TestFile(Base):
env = TestFileEnv
set_up = False
def testObjectManifest(self):
if (web_front_end == 'apache2'):
raise SkipTest()
data = File.random_data(10000)
parts = random.randrange(2,10)
charsEachPart = len(data)/parts
for i in range(parts+1):
if i==0 :
file = self.env.container.file('objectmanifest')
hdrs={}
hdrs['Content-Length']='0'
hdrs['X-Object-Manifest']=str(self.env.container.name)+'/objectmanifest'
self.assert_(file.write('',hdrs=hdrs))
self.assert_(file.name in self.env.container.files())
self.assert_(file.read() == '')
elif i==parts :
file = self.env.container.file('objectmanifest'+'-'+str(i))
segment=data[ (i-1)*charsEachPart :]
self.assertTrue(file.write(segment))
else :
file = self.env.container.file('objectmanifest'+'-'+str(i))
segment=data[ (i-1)*charsEachPart : i*charsEachPart]
self.assertTrue(file.write(segment))
#matching the manifest file content with orignal data, as etag won't match
file = self.env.container.file('objectmanifest')
data_read = file.read()
self.assertEquals(data,data_read)
class TestFileUTF8(Base2, TestFile):
set_up = False
class TestContainerPathsEnv:
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn.authenticate()
cls.account = Account(cls.conn, config.get('account',
config['username']))
cls.account.delete_containers()
cls.file_size = 8
cls.container = cls.account.container(Utils.create_name())
if not cls.container.create():
raise ResponseError(cls.conn.response)
cls.dirs = [
'dir1',
'dir2',
'dir1/subdir1',
'dir1/subdir2',
'dir1/subdir1/subsubdir1',
'dir1/subdir1/subsubdir2',
'dir1/subdir with spaces',
'dir1/subdir+with{whatever',
]
cls.files = [
'file1',
'file A',
'dir1/file2',
'dir1/subdir1/file2',
'dir1/subdir1/file3',
'dir1/subdir1/file4',
'dir1/subdir1/subsubdir1/file5',
'dir1/subdir1/subsubdir1/file6',
'dir1/subdir1/subsubdir1/file7',
'dir1/subdir1/subsubdir1/file8',
'dir1/subdir1/subsubdir2/file9',
'dir1/subdir1/subsubdir2/file0',
'dir1/subdir with spaces/file B',
'dir1/subdir+with{whatever/file D',
]
stored_files = set()
for d in cls.dirs:
file = cls.container.file(d)
file.write(hdrs={'Content-Type': 'application/directory'})
for f in cls.files:
file = cls.container.file(f)
file.write_random(cls.file_size, hdrs={'Content-Type':
'application/octet-stream'})
stored_files.add(f)
cls.stored_files = sorted(stored_files)
cls.sorted_objects = sorted(set(cls.dirs + cls.files))
class TestContainerPaths(Base):
env = TestContainerPathsEnv
set_up = False
def testTraverseContainer(self):
found_files = []
found_dirs = []
def recurse_path(path, count=0):
if count > 10:
raise ValueError('too deep recursion')
for file in self.env.container.files(parms={'path': path}):
self.assert_(file.startswith(path))
if file in self.env.dirs:
recurse_path(file, count + 1)
found_dirs.append(file)
else:
found_files.append(file)
recurse_path('')
for file in self.env.stored_files:
self.assert_(file in found_files)
self.assert_(file not in found_dirs)
def testContainerListing(self):
for format in (None, 'json', 'xml'):
files = self.env.container.files(parms={'format': format})
self.assertFalse(len(files) == 0)
if isinstance(files[0], dict):
files = [str(x['name']) for x in files]
self.assertEquals(files, self.env.sorted_objects)
for format in ('json', 'xml'):
for file in self.env.container.files(parms={'format': format}):
self.assert_(int(file['bytes']) >= 0)
self.assert_('last_modified' in file)
if file['name'] in self.env.dirs:
self.assertEquals(file['content_type'],
'application/directory')
else:
self.assertEquals(file['content_type'],
'application/octet-stream')
def testStructure(self):
def assert_listing(path, list):
files = self.env.container.files(parms={'path': path})
self.assertEquals(sorted(list, cmp=locale.strcoll), files)
assert_listing('', ['file1', 'dir1', 'dir2', 'file A'])
assert_listing('dir1', ['dir1/file2', 'dir1/subdir1',
'dir1/subdir2', 'dir1/subdir with spaces',
'dir1/subdir+with{whatever'])
assert_listing('dir1/subdir1',
['dir1/subdir1/file4', 'dir1/subdir1/subsubdir2',
'dir1/subdir1/file2', 'dir1/subdir1/file3',
'dir1/subdir1/subsubdir1'])
assert_listing('dir1/subdir1/subsubdir1',
['dir1/subdir1/subsubdir1/file7',
'dir1/subdir1/subsubdir1/file5',
'dir1/subdir1/subsubdir1/file8',
'dir1/subdir1/subsubdir1/file6'])
assert_listing('dir1/subdir1/subsubdir1',
['dir1/subdir1/subsubdir1/file7',
'dir1/subdir1/subsubdir1/file5',
'dir1/subdir1/subsubdir1/file8',
'dir1/subdir1/subsubdir1/file6'])
assert_listing('dir1/subdir with spaces',
['dir1/subdir with spaces/file B'])
class TestObjectVersioningEnv:
@classmethod
def setUp(cls):
cls.conn = Connection(config)
cls.conn.authenticate()
cls.account = Account(cls.conn, config.get('account',
config['username']))
cls.account.delete_containers()
cls.containers = {}
#create two containers one for object other for versions of objects
for i in range(2):
hdrs={}
if i==0:
hdrs={'X-Versions-Location':'versions'}
cont = cls.containers['object'] = cls.account.container('object')
else:
cont = cls.containers['versions'] = cls.account.container('versions')
if not cont.create(hdrs=hdrs):
raise ResponseError(cls.conn.response)
cls.containers.append(cont)
class TestObjectVersioning(Base):
env = TestObjectVersioningEnv
set_up = False
def testObjectVersioning(self):
versions = random.randrange(2,10)
dataArr=[]
#create versions
for i in range(versions):
data = File.random_data(10000*(i+1))
file = self.env.containers['object'].file('object')
self.assertTrue(file.write(data))
dataArr.append(data)
cont = self.env.containers['versions']
info = cont.info()
self.assertEquals(info['object_count'], versions-1)
#match the current version of object with data in arr and delete it
for i in range(versions):
data = dataArr[-(i+1)]
file = self.env.containers['object'].file('object')
self.assertEquals(data,file.read())
self.assert_(file.delete())
self.assert_status(204)