Move min segment size configuration to swift3
Swift has removed the minimum segment size setting for multipart upload. To make it compatible with S3, we are re-implementing it in swift3. Each upload part except the last should be more than the minimum segment size (default 5MB, same as the S3 multipart upload chunk size). When a "complete multipart upload" request comes, check all the parts and return a EntityTooSmall error if they are smaller than the minimum segment size. Change-Id: I883b25ab3d43d330ffc60fa2c3ade7a6b5802cee
This commit is contained in:
parent
710738b548
commit
b348692425
@ -116,6 +116,10 @@ use = egg:swift3#swift3
|
||||
# to achieve S3 compatibilities when force_swift_request_proxy_log is set to
|
||||
# 'true'
|
||||
# force_swift_request_proxy_log = false
|
||||
#
|
||||
# AWS S3 document says that each part must be at least 5 MB in a multipart
|
||||
# upload, except the last part.
|
||||
min_segment_size = 5242880
|
||||
|
||||
[filter:catch_errors]
|
||||
use = egg:swift#catch_errors
|
||||
@ -129,10 +133,6 @@ use = egg:swift#bulk
|
||||
[filter:slo]
|
||||
use = egg:swift#slo
|
||||
|
||||
# AWS S3 document says that each part must be at least 5 MB in a multipart
|
||||
# upload, except the last part.
|
||||
min_segment_size = 5242880
|
||||
|
||||
[filter:dlo]
|
||||
use = egg:swift#dlo
|
||||
|
||||
|
@ -45,7 +45,11 @@ class Config(dict):
|
||||
if isinstance(self.get(key), bool):
|
||||
dict.__setitem__(self, key, config_true_value(value))
|
||||
elif isinstance(self.get(key), int):
|
||||
dict.__setitem__(self, key, int(value))
|
||||
try:
|
||||
dict.__setitem__(self, key, int(value))
|
||||
except ValueError:
|
||||
if value: # No need to raise the error if value is ''
|
||||
raise
|
||||
else:
|
||||
dict.__setitem__(self, key, value)
|
||||
|
||||
@ -64,4 +68,5 @@ CONF = Config({
|
||||
'check_bucket_owner': False,
|
||||
'force_swift_request_proxy_log': False,
|
||||
'allow_multipart_uploads': True,
|
||||
'min_segment_size': 5242880,
|
||||
})
|
||||
|
@ -549,6 +549,7 @@ class UploadController(Controller):
|
||||
raise InvalidPart(upload_id=upload_id,
|
||||
part_number=part_number)
|
||||
|
||||
info['size_bytes'] = int(info['size_bytes'])
|
||||
manifest.append(info)
|
||||
except (XMLSyntaxError, DocumentInvalid):
|
||||
raise MalformedXML()
|
||||
@ -561,15 +562,22 @@ class UploadController(Controller):
|
||||
|
||||
# Following swift commit 7f636a5, zero-byte segments aren't allowed,
|
||||
# even as the final segment
|
||||
if int(info['size_bytes']) == 0:
|
||||
manifest.pop()
|
||||
empty_seg = None
|
||||
if manifest[-1]['size_bytes'] == 0:
|
||||
empty_seg = manifest.pop()
|
||||
|
||||
# Ordinarily, we just let SLO check segment sizes. However, we
|
||||
# just popped off a zero-byte segment; if there was a second
|
||||
# zero-byte segment and it was at the end, it would succeed on
|
||||
# Swift < 2.6.0 and fail on newer Swift. It seems reasonable that
|
||||
# it should always fail.
|
||||
if manifest and int(manifest[-1]['size_bytes']) == 0:
|
||||
if manifest and manifest[-1]['size_bytes'] < CONF.min_segment_size:
|
||||
raise EntityTooSmall()
|
||||
|
||||
# Check the size of each segment except the last and make sure they are
|
||||
# all more than the minimum upload chunk size
|
||||
for info in manifest[:-1]:
|
||||
if info['size_bytes'] < CONF.min_segment_size:
|
||||
raise EntityTooSmall()
|
||||
|
||||
try:
|
||||
@ -601,9 +609,9 @@ class UploadController(Controller):
|
||||
else:
|
||||
raise
|
||||
|
||||
if int(info['size_bytes']) == 0:
|
||||
if empty_seg:
|
||||
# clean up the zero-byte segment
|
||||
empty_seg_cont, empty_seg_name = info['path'].split('/', 2)[1:]
|
||||
_, empty_seg_cont, empty_seg_name = empty_seg['path'].split('/', 2)
|
||||
req.get_response(self.app, 'DELETE',
|
||||
container=empty_seg_cont, obj=empty_seg_name)
|
||||
|
||||
|
@ -24,7 +24,6 @@ ceph_s3:
|
||||
s3tests.functional.test_s3.test_list_buckets_invalid_auth: {status: KNOWN}
|
||||
s3tests.functional.test_s3.test_logging_toggle: {status: KNOWN}
|
||||
s3tests.functional.test_s3.test_multipart_resend_first_finishes_last: {status: KNOWN}
|
||||
s3tests.functional.test_s3.test_multipart_upload_size_too_small: {status: KNOWN}
|
||||
s3tests.functional.test_s3.test_object_copy_bucket_not_found: {status: KNOWN}
|
||||
s3tests.functional.test_s3.test_object_copy_canned_acl: {status: KNOWN}
|
||||
s3tests.functional.test_s3.test_object_copy_replacing_metadata: {status: KNOWN}
|
||||
|
@ -26,6 +26,7 @@ s3_acl = %S3ACL%
|
||||
location = US
|
||||
dns_compliant_bucket_names = %DNS_BUCKET_NAMES%
|
||||
check_bucket_owner = %CHECK_BUCKET_OWNER%
|
||||
min_segment_size = %MIN_SEGMENT_SIZE%
|
||||
|
||||
[filter:catch_errors]
|
||||
use = egg:swift#catch_errors
|
||||
@ -38,7 +39,6 @@ use = egg:swift#bulk
|
||||
|
||||
[filter:slo]
|
||||
use = egg:swift#slo
|
||||
min_segment_size = 4
|
||||
|
||||
[filter:dlo]
|
||||
use = egg:swift#dlo
|
||||
|
@ -43,6 +43,7 @@ for server in keystone swift proxy-server object-server container-server account
|
||||
-e "s#%USER%#`whoami`#g" \
|
||||
-e "s#%TEST_DIR%#${TEST_DIR}#g" \
|
||||
-e "s#%CONF_DIR%#${CONF_DIR}#g" \
|
||||
-e "s#%MIN_SEGMENT_SIZE%#${MIN_SEGMENT_SIZE}#g" \
|
||||
conf/${server}.conf.in \
|
||||
> conf/${server}.conf
|
||||
done
|
||||
|
@ -24,13 +24,14 @@ from distutils.version import StrictVersion
|
||||
from hashlib import md5
|
||||
from itertools import izip
|
||||
|
||||
from swift3.cfg import CONF
|
||||
from swift3.test.functional.utils import get_error_code, get_error_msg
|
||||
from swift3.etree import fromstring, tostring, Element, SubElement
|
||||
from swift3.test.functional import Swift3FunctionalTestCase
|
||||
from swift3.utils import mktime
|
||||
from swift3.test.functional.s3_test_client import Connection
|
||||
|
||||
MIN_SEGMENT_SIZE = 5242880
|
||||
MIN_SEGMENT_SIZE = CONF.min_segment_size
|
||||
|
||||
|
||||
class TestSwift3MultiUpload(Swift3FunctionalTestCase):
|
||||
@ -522,6 +523,110 @@ class TestSwift3MultiUpload(Swift3FunctionalTestCase):
|
||||
query=query)
|
||||
self.assertEqual(get_error_code(body), 'InvalidPart')
|
||||
|
||||
def test_complete_upload_min_segment_size(self):
|
||||
bucket = 'bucket'
|
||||
key = 'obj'
|
||||
self.conn.make_request('PUT', bucket)
|
||||
query = 'uploads'
|
||||
status, headers, body = \
|
||||
self.conn.make_request('POST', bucket, key, query=query)
|
||||
elem = fromstring(body, 'InitiateMultipartUploadResult')
|
||||
upload_id = elem.find('UploadId').text
|
||||
|
||||
# multi parts with no body
|
||||
etags = []
|
||||
for i in xrange(1, 3):
|
||||
query = 'partNumber=%s&uploadId=%s' % (i, upload_id)
|
||||
status, headers, body = \
|
||||
self.conn.make_request('PUT', bucket, key, query=query)
|
||||
etags.append(headers['etag'])
|
||||
xml = self._gen_comp_xml(etags)
|
||||
|
||||
query = 'uploadId=%s' % upload_id
|
||||
status, headers, body = \
|
||||
self.conn.make_request('POST', bucket, key, body=xml,
|
||||
query=query)
|
||||
self.assertEqual(get_error_code(body), 'EntityTooSmall')
|
||||
|
||||
# multi parts with all parts less than min segment size
|
||||
etags = []
|
||||
for i in xrange(1, 3):
|
||||
query = 'partNumber=%s&uploadId=%s' % (i, upload_id)
|
||||
status, headers, body = \
|
||||
self.conn.make_request('PUT', bucket, key, query=query,
|
||||
body='AA')
|
||||
etags.append(headers['etag'])
|
||||
xml = self._gen_comp_xml(etags)
|
||||
|
||||
query = 'uploadId=%s' % upload_id
|
||||
status, headers, body = \
|
||||
self.conn.make_request('POST', bucket, key, body=xml,
|
||||
query=query)
|
||||
self.assertEqual(get_error_code(body), 'EntityTooSmall')
|
||||
|
||||
# one part and less than min segment size
|
||||
etags = []
|
||||
query = 'partNumber=1&uploadId=%s' % upload_id
|
||||
status, headers, body = \
|
||||
self.conn.make_request('PUT', bucket, key, query=query,
|
||||
body='AA')
|
||||
etags.append(headers['etag'])
|
||||
xml = self._gen_comp_xml(etags)
|
||||
|
||||
query = 'uploadId=%s' % upload_id
|
||||
status, headers, body = \
|
||||
self.conn.make_request('POST', bucket, key, body=xml,
|
||||
query=query)
|
||||
self.assertEqual(status, 200)
|
||||
|
||||
# multi parts with all parts except the first part less than min
|
||||
# segment size
|
||||
query = 'uploads'
|
||||
status, headers, body = \
|
||||
self.conn.make_request('POST', bucket, key, query=query)
|
||||
elem = fromstring(body, 'InitiateMultipartUploadResult')
|
||||
upload_id = elem.find('UploadId').text
|
||||
|
||||
etags = []
|
||||
body_size = [MIN_SEGMENT_SIZE, MIN_SEGMENT_SIZE - 1, 2]
|
||||
for i in xrange(1, 3):
|
||||
query = 'partNumber=%s&uploadId=%s' % (i, upload_id)
|
||||
status, headers, body = \
|
||||
self.conn.make_request('PUT', bucket, key, query=query,
|
||||
body='A' * body_size[i])
|
||||
etags.append(headers['etag'])
|
||||
xml = self._gen_comp_xml(etags)
|
||||
|
||||
query = 'uploadId=%s' % upload_id
|
||||
status, headers, body = \
|
||||
self.conn.make_request('POST', bucket, key, body=xml,
|
||||
query=query)
|
||||
self.assertEqual(get_error_code(body), 'EntityTooSmall')
|
||||
|
||||
# multi parts with all parts except last part more than min segment
|
||||
# size
|
||||
query = 'uploads'
|
||||
status, headers, body = \
|
||||
self.conn.make_request('POST', bucket, key, query=query)
|
||||
elem = fromstring(body, 'InitiateMultipartUploadResult')
|
||||
upload_id = elem.find('UploadId').text
|
||||
|
||||
etags = []
|
||||
body_size = [MIN_SEGMENT_SIZE, MIN_SEGMENT_SIZE, 2]
|
||||
for i in xrange(1, 3):
|
||||
query = 'partNumber=%s&uploadId=%s' % (i, upload_id)
|
||||
status, headers, body = \
|
||||
self.conn.make_request('PUT', bucket, key, query=query,
|
||||
body='A' * body_size[i])
|
||||
etags.append(headers['etag'])
|
||||
xml = self._gen_comp_xml(etags)
|
||||
|
||||
query = 'uploadId=%s' % upload_id
|
||||
status, headers, body = \
|
||||
self.conn.make_request('POST', bucket, key, body=xml,
|
||||
query=query)
|
||||
self.assertEqual(status, 200)
|
||||
|
||||
def test_complete_upload_with_fewer_etags(self):
|
||||
bucket = 'bucket'
|
||||
key = 'obj'
|
||||
|
@ -150,3 +150,6 @@ class FakeSwift(object):
|
||||
headers.update({key: value})
|
||||
|
||||
self._responses[(method, path)] = (response_class, headers, body)
|
||||
|
||||
def clear_calls(self):
|
||||
del self._calls[:]
|
||||
|
@ -72,6 +72,8 @@ class TestSwift3MultiUpload(Swift3TestCase):
|
||||
self.last_modified = 'Fri, 01 Apr 2014 12:00:00 GMT'
|
||||
put_headers = {'etag': self.etag, 'last-modified': self.last_modified}
|
||||
|
||||
CONF.min_segment_size = 1
|
||||
|
||||
objects = map(lambda item: {'name': item[0], 'last_modified': item[1],
|
||||
'hash': item[2], 'bytes': item[3]},
|
||||
objects_template)
|
||||
@ -673,6 +675,24 @@ class TestSwift3MultiUpload(Swift3TestCase):
|
||||
status, headers, body = self.call_swift3(req)
|
||||
self.assertEqual(status.split()[0], '400')
|
||||
self.assertEqual(self._get_error_code(body), 'EntityTooSmall')
|
||||
self.assertEqual(self._get_error_message(body), msg)
|
||||
|
||||
self.swift.clear_calls()
|
||||
CONF.min_segment_size = 5242880
|
||||
req = Request.blank(
|
||||
'/bucket/object?uploadId=X',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac',
|
||||
'Date': self.get_date_header(), },
|
||||
body=xml)
|
||||
|
||||
status, headers, body = self.call_swift3(req)
|
||||
self.assertEqual(status.split()[0], '400')
|
||||
self.assertEqual(self._get_error_code(body), 'EntityTooSmall')
|
||||
self.assertEqual(self._get_error_message(body),
|
||||
'Your proposed upload is smaller than the minimum '
|
||||
'allowed object size.')
|
||||
self.assertNotIn('PUT', [method for method, _ in self.swift.calls])
|
||||
|
||||
def test_object_multipart_upload_complete_single_zero_length_segment(self):
|
||||
segment_bucket = '/v1/AUTH_test/empty-bucket+segments'
|
||||
@ -779,6 +799,69 @@ class TestSwift3MultiUpload(Swift3TestCase):
|
||||
'format=json&prefix=object/X/'),
|
||||
])
|
||||
|
||||
def test_object_multipart_upload_complete_zero_length_final_segment(self):
|
||||
segment_bucket = '/v1/AUTH_test/bucket+segments'
|
||||
|
||||
object_list = [{
|
||||
'name': 'object/X/1',
|
||||
'last_modified': self.last_modified,
|
||||
'hash': 'some hash',
|
||||
'bytes': '100',
|
||||
}, {
|
||||
'name': 'object/X/2',
|
||||
'last_modified': self.last_modified,
|
||||
'hash': 'some other hash',
|
||||
'bytes': '1',
|
||||
}, {
|
||||
'name': 'object/X/3',
|
||||
'last_modified': self.last_modified,
|
||||
'hash': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'bytes': '0',
|
||||
}]
|
||||
|
||||
self.swift.register('GET', segment_bucket, swob.HTTPOk, {},
|
||||
json.dumps(object_list))
|
||||
self.swift.register('HEAD', '/v1/AUTH_test/bucket',
|
||||
swob.HTTPNoContent, {}, None)
|
||||
self.swift.register('HEAD', segment_bucket + '/object/X',
|
||||
swob.HTTPOk, {'x-object-meta-foo': 'bar',
|
||||
'content-type': 'baz/quux'}, None)
|
||||
self.swift.register('DELETE', segment_bucket + '/object/X/3',
|
||||
swob.HTTPNoContent, {}, None)
|
||||
|
||||
xml = '<CompleteMultipartUpload>' \
|
||||
'<Part>' \
|
||||
'<PartNumber>1</PartNumber>' \
|
||||
'<ETag>some hash</ETag>' \
|
||||
'</Part>' \
|
||||
'<Part>' \
|
||||
'<PartNumber>2</PartNumber>' \
|
||||
'<ETag>some other hash</ETag>' \
|
||||
'</Part>' \
|
||||
'<Part>' \
|
||||
'<PartNumber>3</PartNumber>' \
|
||||
'<ETag>d41d8cd98f00b204e9800998ecf8427e</ETag>' \
|
||||
'</Part>' \
|
||||
'</CompleteMultipartUpload>'
|
||||
|
||||
req = Request.blank('/bucket/object?uploadId=X',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac',
|
||||
'Date': self.get_date_header(), },
|
||||
body=xml)
|
||||
status, headers, body = self.call_swift3(req)
|
||||
self.assertEqual(status.split()[0], '200')
|
||||
|
||||
self.assertEqual(self.swift.calls, [
|
||||
('HEAD', '/v1/AUTH_test/bucket'),
|
||||
('HEAD', '/v1/AUTH_test/bucket+segments/object/X'),
|
||||
('GET', '/v1/AUTH_test/bucket+segments?delimiter=/&'
|
||||
'format=json&prefix=object/X/'),
|
||||
('PUT', '/v1/AUTH_test/bucket/object?multipart-manifest=put'),
|
||||
('DELETE', '/v1/AUTH_test/bucket+segments/object/X/3'),
|
||||
('DELETE', '/v1/AUTH_test/bucket+segments/object/X'),
|
||||
])
|
||||
|
||||
@s3acl(s3acl_only=True)
|
||||
def test_object_multipart_upload_complete_s3acl(self):
|
||||
acl_headers = encode_acl('object', ACLPublicRead(Owner('test:tester',
|
||||
|
Loading…
x
Reference in New Issue
Block a user