multi_upload: preliminary support for S3 multi part upload
This patch adds a basic support for S3 Multipart Upload APIs based on Swift3 static large objects. The s3multi middleware is no longer necessary. There are still many TODO items. They are commented in the source code. Change-Id: Icda01dc31de43e6fe36144921fa1bd276b76e5ea
This commit is contained in:
parent
151031372c
commit
f7e6114157
@ -49,6 +49,10 @@ use = egg:swift#bulk
|
||||
[filter:slo]
|
||||
use = egg:swift#slo
|
||||
|
||||
# AWS S3 document says that each part must be at least 5 MB in a multipart
|
||||
# upload, except the last part.
|
||||
min_segment_size = 5242880
|
||||
|
||||
[filter:dlo]
|
||||
use = egg:swift#dlo
|
||||
|
||||
|
@ -13,9 +13,64 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Implementation of S3 Multipart Upload.
|
||||
|
||||
This module implements S3 Multipart Upload APIs with the Swift SLO feature.
|
||||
The following explains how swift3 uses swift container and objects to store S3
|
||||
upload information:
|
||||
|
||||
- [bucket]+segments
|
||||
|
||||
A container to store upload information. [bucket] is the original bucket
|
||||
where multipart upload is initiated.
|
||||
|
||||
- [bucket]+segments/[upload_id]
|
||||
|
||||
A object of the ongoing upload id. The object is empty and used for
|
||||
checking the target upload status. If the object exists, it means that the
|
||||
upload is initiated but not either completed or aborted.
|
||||
|
||||
|
||||
- [bucket]+segments/[upload_id]/1
|
||||
[bucket]+segments/[upload_id]/2
|
||||
[bucket]+segments/[upload_id]/3
|
||||
.
|
||||
.
|
||||
|
||||
Uploaded part objects. Those objects are directly used as segments of Swift
|
||||
Static Large Object.
|
||||
"""
|
||||
|
||||
from simplejson import loads, dumps
|
||||
import os
|
||||
|
||||
from swift.common.utils import split_path
|
||||
|
||||
from swift3.controllers.base import Controller, bucket_operation, \
|
||||
object_operation
|
||||
from swift3.response import InvalidRequest
|
||||
from swift3.response import InvalidArgument, ErrorResponse, MalformedXML, \
|
||||
InvalidPart, BucketAlreadyExists, EntityTooSmall, InvalidPartOrder, \
|
||||
InvalidRequest, HTTPOk, HTTPNoContent, NoSuchKey, NoSuchUpload
|
||||
from swift3.exception import BadSwiftRequest
|
||||
from swift3.utils import LOGGER, unique_id
|
||||
from swift3.etree import Element, SubElement, fromstring, tostring, \
|
||||
XMLSyntaxError, DocumentInvalid
|
||||
|
||||
DEFAULT_MAX_PARTS = 1000
|
||||
DEFAULT_MAX_UPLOADS = 1000
|
||||
|
||||
MAX_COMPLETE_UPLOAD_BODY_SIZE = 2048 * 1024
|
||||
|
||||
|
||||
def _check_upload_info(req, app, upload_id):
|
||||
container = req.container_name + '+segments'
|
||||
obj = '%s/%s' % (req.object_name, upload_id)
|
||||
|
||||
try:
|
||||
req.get_response(app, 'HEAD', container=container, obj=obj)
|
||||
except NoSuchKey:
|
||||
raise NoSuchUpload(upload_id=upload_id)
|
||||
|
||||
|
||||
class PartController(Controller):
|
||||
@ -32,8 +87,32 @@ class PartController(Controller):
|
||||
"""
|
||||
Handles Upload Part and Upload Part Copy.
|
||||
"""
|
||||
# Pass it through, the s3multi upload helper will handle it.
|
||||
return req.get_response(self.app)
|
||||
if 'uploadId' not in req.params:
|
||||
raise InvalidArgument('ResourceType', 'partNumber',
|
||||
'Unexpected query string parameter')
|
||||
|
||||
upload_id = req.params['uploadId']
|
||||
|
||||
try:
|
||||
# TODO: check the range of partNumber
|
||||
part_number = int(req.params['partNumber'])
|
||||
except Exception:
|
||||
err_msg = 'Part number must be an integer'
|
||||
raise InvalidArgument('partNumber', req.params['partNumber'],
|
||||
err_msg)
|
||||
|
||||
_check_upload_info(req, self.app, upload_id)
|
||||
|
||||
req.container_name += '+segments'
|
||||
req.object_name = '%s/%s/%d' % (req.object_name, upload_id,
|
||||
part_number)
|
||||
|
||||
resp = req.get_response(self.app)
|
||||
|
||||
# TODO: set xml body for copy requests.
|
||||
|
||||
resp.status = 200
|
||||
return resp
|
||||
|
||||
|
||||
class UploadsController(Controller):
|
||||
@ -52,16 +131,91 @@ class UploadsController(Controller):
|
||||
"""
|
||||
Handles List Multipart Uploads
|
||||
"""
|
||||
# Pass it through, the s3multi upload helper will handle it.
|
||||
return req.get_response(self.app)
|
||||
# TODO: add support for prefix, key-marker, upload-id-marker, and
|
||||
# max-uploads queries.
|
||||
query = {
|
||||
'format': 'json',
|
||||
}
|
||||
container = req.container_name + '+segments'
|
||||
resp = req.get_response(self.app, container=container, query=query)
|
||||
objects = loads(resp.body)
|
||||
|
||||
uploads = []
|
||||
for o in objects:
|
||||
obj, upid = split_path('/' + o['name'], 1, 2, True)
|
||||
if '/' in upid:
|
||||
# This is a part object.
|
||||
continue
|
||||
|
||||
uploads.append(
|
||||
{'key': obj,
|
||||
'upload_id': upid,
|
||||
'last_modified': o['last_modified']}
|
||||
)
|
||||
|
||||
nextkeymarker = ''
|
||||
nextuploadmarker = ''
|
||||
if len(uploads) > 1:
|
||||
nextuploadmarker = uploads[-1]['upload_id']
|
||||
nextkeymarker = uploads[-1]['key']
|
||||
|
||||
result_elem = Element('ListMultipartUploadsResult')
|
||||
SubElement(result_elem, 'Bucket').text = req.container_name
|
||||
SubElement(result_elem, 'KeyMarker').text = ''
|
||||
SubElement(result_elem, 'UploadIdMarker').text = ''
|
||||
SubElement(result_elem, 'NextKeyMarker').text = nextkeymarker
|
||||
SubElement(result_elem, 'NextUploadIdMarker').text = nextuploadmarker
|
||||
|
||||
SubElement(result_elem, 'MaxUploads').text = str(DEFAULT_MAX_UPLOADS)
|
||||
# TODO: add support for EncodingType
|
||||
SubElement(result_elem, 'IsTruncated').text = 'false'
|
||||
|
||||
# TODO: don't show uploads which are initiated before this bucket is
|
||||
# created.
|
||||
for u in uploads:
|
||||
upload_elem = SubElement(result_elem, 'Upload')
|
||||
SubElement(upload_elem, 'Key').text = u['key']
|
||||
SubElement(upload_elem, 'UploadId').text = u['upload_id']
|
||||
initiator_elem = SubElement(upload_elem, 'Initiator')
|
||||
SubElement(initiator_elem, 'ID').text = req.user_id
|
||||
SubElement(initiator_elem, 'DisplayName').text = req.user_id
|
||||
owner_elem = SubElement(upload_elem, 'Owner')
|
||||
SubElement(owner_elem, 'ID').text = req.user_id
|
||||
SubElement(owner_elem, 'DisplayName').text = req.user_id
|
||||
SubElement(upload_elem, 'StorageClass').text = 'STANDARD'
|
||||
SubElement(upload_elem, 'Initiated').text = \
|
||||
u['last_modified'][:-3] + 'Z'
|
||||
|
||||
body = tostring(result_elem)
|
||||
|
||||
return HTTPOk(body=body, content_type='application/xml')
|
||||
|
||||
@object_operation
|
||||
def POST(self, req):
|
||||
"""
|
||||
Handles Initiate Multipart Upload.
|
||||
"""
|
||||
# Pass it through, the s3multi upload helper will handle it.
|
||||
return req.get_response(self.app)
|
||||
# Create a unique S3 upload id from UUID to avoid duplicates.
|
||||
upload_id = unique_id()
|
||||
|
||||
container = req.container_name + '+segments'
|
||||
try:
|
||||
req.get_response(self.app, 'PUT', container, '')
|
||||
except BucketAlreadyExists:
|
||||
pass
|
||||
|
||||
obj = '%s/%s' % (req.object_name, upload_id)
|
||||
|
||||
req.get_response(self.app, 'PUT', container, obj, body='')
|
||||
|
||||
result_elem = Element('InitiateMultipartUploadResult')
|
||||
SubElement(result_elem, 'Bucket').text = req.container_name
|
||||
SubElement(result_elem, 'Key').text = req.object_name
|
||||
SubElement(result_elem, 'UploadId').text = upload_id
|
||||
|
||||
body = tostring(result_elem)
|
||||
|
||||
return HTTPOk(body=body, content_type='application/xml')
|
||||
|
||||
|
||||
class UploadController(Controller):
|
||||
@ -79,21 +233,179 @@ class UploadController(Controller):
|
||||
"""
|
||||
Handles List Parts.
|
||||
"""
|
||||
# Pass it through, the s3multi upload helper will handle it.
|
||||
return req.get_response(self.app)
|
||||
upload_id = req.params['uploadId']
|
||||
_check_upload_info(req, self.app, upload_id)
|
||||
|
||||
part_num_marker = 0
|
||||
|
||||
# TODO: add support for max-parts and part-number-marker queries.
|
||||
query = {
|
||||
'format': 'json',
|
||||
'prefix': '%s/%s/' % (req.object_name, upload_id),
|
||||
'delimiter': '/'
|
||||
}
|
||||
|
||||
container = req.container_name + '+segments'
|
||||
resp = req.get_response(self.app, container=container, obj='',
|
||||
query=query)
|
||||
objects = loads(resp.body)
|
||||
|
||||
last_part = 0
|
||||
|
||||
# pylint: disable-msg=E1103
|
||||
objects.sort(key=lambda o: int(o['name'].split('/')[-1]))
|
||||
|
||||
if len(objects) > 0:
|
||||
o = objects[-1]
|
||||
last_part = os.path.basename(o['name'])
|
||||
|
||||
result_elem = Element('ListPartsResult')
|
||||
SubElement(result_elem, 'Bucket').text = req.container_name
|
||||
SubElement(result_elem, 'Key').text = req.object_name
|
||||
SubElement(result_elem, 'UploadId').text = upload_id
|
||||
|
||||
initiator_elem = SubElement(result_elem, 'Initiator')
|
||||
SubElement(initiator_elem, 'ID').text = req.user_id
|
||||
SubElement(initiator_elem, 'DisplayName').text = req.user_id
|
||||
owner_elem = SubElement(result_elem, 'Owner')
|
||||
SubElement(owner_elem, 'ID').text = req.user_id
|
||||
SubElement(owner_elem, 'DisplayName').text = req.user_id
|
||||
|
||||
SubElement(result_elem, 'StorageClass').text = 'STANDARD'
|
||||
SubElement(result_elem, 'PartNumberMarker').text = str(part_num_marker)
|
||||
SubElement(result_elem, 'NextPartNumberMarker').text = str(last_part)
|
||||
SubElement(result_elem, 'MaxParts').text = str(DEFAULT_MAX_PARTS)
|
||||
# TODO: add support for EncodingType
|
||||
SubElement(result_elem, 'IsTruncated').text = 'false'
|
||||
|
||||
for i in objects:
|
||||
part_elem = SubElement(result_elem, 'Part')
|
||||
SubElement(part_elem, 'PartNumber').text = i['name'].split('/')[-1]
|
||||
SubElement(part_elem, 'LastModified').text = \
|
||||
i['last_modified'][:-3] + 'Z'
|
||||
SubElement(part_elem, 'ETag').text = i['hash']
|
||||
SubElement(part_elem, 'Size').text = str(i['bytes'])
|
||||
|
||||
body = tostring(result_elem)
|
||||
|
||||
return HTTPOk(body=body, content_type='application/xml')
|
||||
|
||||
@object_operation
|
||||
def DELETE(self, req):
|
||||
"""
|
||||
Handles Abort Multipart Upload.
|
||||
"""
|
||||
# Pass it through, the s3multi upload helper will handle it.
|
||||
return req.get_response(self.app)
|
||||
upload_id = req.params['uploadId']
|
||||
_check_upload_info(req, self.app, upload_id)
|
||||
|
||||
# First check to see if this multi-part upload was already
|
||||
# completed. Look in the primary container, if the object exists,
|
||||
# then it was completed and we return an error here.
|
||||
container = req.container_name + '+segments'
|
||||
obj = '%s/%s' % (req.object_name, upload_id)
|
||||
req.get_response(self.app, container=container, obj=obj)
|
||||
|
||||
# The completed object was not found so this
|
||||
# must be a multipart upload abort.
|
||||
# We must delete any uploaded segments for this UploadID and then
|
||||
# delete the object in the main container as well
|
||||
query = {
|
||||
'format': 'json',
|
||||
'prefix': '%s/%s/' % (req.object_name, upload_id),
|
||||
'delimiter': '/',
|
||||
}
|
||||
|
||||
resp = req.get_response(self.app, 'GET', container, '', query=query)
|
||||
|
||||
# Iterate over the segment objects and delete them individually
|
||||
objects = loads(resp.body)
|
||||
for o in objects:
|
||||
container = req.container_name + '+segments'
|
||||
req.get_response(self.app, container=container, obj=o['name'])
|
||||
|
||||
return HTTPNoContent()
|
||||
|
||||
@object_operation
|
||||
def POST(self, req):
|
||||
"""
|
||||
Handles Complete Multipart Upload.
|
||||
"""
|
||||
# Pass it through, the s3multi upload helper will handle it.
|
||||
return req.get_response(self.app)
|
||||
upload_id = req.params['uploadId']
|
||||
_check_upload_info(req, self.app, upload_id)
|
||||
|
||||
# Query for the objects in the segments area to make sure it completed
|
||||
query = {
|
||||
'format': 'json',
|
||||
'prefix': '%s/%s/' % (req.object_name, upload_id),
|
||||
'delimiter': '/'
|
||||
}
|
||||
|
||||
container = req.container_name + '+segments'
|
||||
resp = req.get_response(self.app, 'GET', container, '', query=query)
|
||||
objinfo = loads(resp.body)
|
||||
objtable = dict((o['name'],
|
||||
{'path': '/'.join(['', container, o['name']]),
|
||||
'etag': o['hash'],
|
||||
'size_bytes': o['bytes']}) for o in objinfo)
|
||||
|
||||
manifest = []
|
||||
previous_number = 0
|
||||
try:
|
||||
xml = req.xml(MAX_COMPLETE_UPLOAD_BODY_SIZE)
|
||||
complete_elem = fromstring(xml, 'CompleteMultipartUpload')
|
||||
for part_elem in complete_elem.iterchildren('Part'):
|
||||
part_number = int(part_elem.find('./PartNumber').text)
|
||||
|
||||
if part_number <= previous_number:
|
||||
raise InvalidPartOrder(upload_id=upload_id)
|
||||
previous_number = part_number
|
||||
|
||||
etag = part_elem.find('./ETag').text
|
||||
if len(etag) >= 2 and etag[0] == '"' and etag[-1] == '"':
|
||||
# strip double quotes
|
||||
etag = etag[1:-1]
|
||||
|
||||
info = objtable.get("%s/%s/%s" % (req.object_name, upload_id,
|
||||
part_number))
|
||||
if info is None or info['etag'] != etag:
|
||||
raise InvalidPart(upload_id=upload_id,
|
||||
part_number=part_number)
|
||||
|
||||
manifest.append(info)
|
||||
except (XMLSyntaxError, DocumentInvalid):
|
||||
raise MalformedXML()
|
||||
except ErrorResponse:
|
||||
raise
|
||||
except Exception as e:
|
||||
LOGGER.error(e)
|
||||
raise
|
||||
|
||||
try:
|
||||
# TODO: add support for versioning
|
||||
resp = req.get_response(self.app, 'PUT', body=dumps(manifest),
|
||||
query={'multipart-manifest': 'put'})
|
||||
except BadSwiftRequest as e:
|
||||
msg = str(e)
|
||||
if msg.startswith('Each segment, except the last, '
|
||||
'must be at least '):
|
||||
# FIXME: AWS S3 allows a smaller object than 5 MB if there is
|
||||
# only one part. Use a COPY request to copy the part object
|
||||
# from the segments container instead.
|
||||
raise EntityTooSmall(msg)
|
||||
else:
|
||||
raise
|
||||
|
||||
obj = '%s/%s' % (req.object_name, upload_id)
|
||||
req.get_response(self.app, 'DELETE', container, obj)
|
||||
|
||||
result_elem = Element('CompleteMultipartUploadResult')
|
||||
SubElement(result_elem, 'Location').text = req.host_url + req.path
|
||||
SubElement(result_elem, 'Bucket').text = req.container_name
|
||||
SubElement(result_elem, 'Key').text = req.object_name
|
||||
SubElement(result_elem, 'ETag').text = resp.etag
|
||||
|
||||
resp.body = tostring(result_elem)
|
||||
resp.status = 200
|
||||
resp.content_type = "application/xml"
|
||||
|
||||
return resp
|
||||
|
@ -332,7 +332,8 @@ class Request(swob.Request):
|
||||
def is_object_request(self):
|
||||
return self.container_name and self.object_name
|
||||
|
||||
def to_swift_req(self, method, query=None):
|
||||
def to_swift_req(self, method, container, obj, query=None,
|
||||
body=None):
|
||||
"""
|
||||
Create a Swift request based on this request's environment.
|
||||
"""
|
||||
@ -352,11 +353,10 @@ class Request(swob.Request):
|
||||
env['REQUEST_METHOD'] = method
|
||||
env['HTTP_X_AUTH_TOKEN'] = self.token
|
||||
|
||||
if self.is_object_request:
|
||||
path = '/v1/%s/%s/%s' % (self.access_key, self.container_name,
|
||||
self.object_name)
|
||||
elif self.is_bucket_request:
|
||||
path = '/v1/%s/%s' % (self.access_key, self.container_name)
|
||||
if obj:
|
||||
path = '/v1/%s/%s/%s' % (self.access_key, container, obj)
|
||||
elif container:
|
||||
path = '/v1/%s/%s' % (self.access_key, container)
|
||||
else:
|
||||
path = '/v1/%s' % (self.access_key)
|
||||
env['PATH_INFO'] = path
|
||||
@ -372,20 +372,20 @@ class Request(swob.Request):
|
||||
query_string = '&'.join(params)
|
||||
env['QUERY_STRING'] = query_string
|
||||
|
||||
return swob.Request.blank(quote(path), environ=env)
|
||||
return swob.Request.blank(quote(path), environ=env, body=body)
|
||||
|
||||
def _swift_success_codes(self, method):
|
||||
def _swift_success_codes(self, method, container, obj):
|
||||
"""
|
||||
Returns a list of expected success codes from Swift.
|
||||
"""
|
||||
if self.is_service_request:
|
||||
if not container:
|
||||
# Swift account access.
|
||||
code_map = {
|
||||
'GET': [
|
||||
HTTP_OK,
|
||||
],
|
||||
}
|
||||
elif self.is_bucket_request:
|
||||
elif not obj:
|
||||
# Swift container access.
|
||||
code_map = {
|
||||
'HEAD': [
|
||||
@ -428,34 +428,34 @@ class Request(swob.Request):
|
||||
|
||||
return code_map[method]
|
||||
|
||||
def _swift_error_codes(self, method):
|
||||
def _swift_error_codes(self, method, container, obj):
|
||||
"""
|
||||
Returns a dict from expected Swift error codes to the corresponding S3
|
||||
error responses.
|
||||
"""
|
||||
if self.is_service_request:
|
||||
if not container:
|
||||
# Swift account access.
|
||||
code_map = {
|
||||
'GET': {
|
||||
},
|
||||
}
|
||||
elif self.is_bucket_request:
|
||||
elif not obj:
|
||||
# Swift container access.
|
||||
code_map = {
|
||||
'HEAD': {
|
||||
HTTP_NOT_FOUND: (NoSuchBucket, self.container_name),
|
||||
HTTP_NOT_FOUND: (NoSuchBucket, container),
|
||||
},
|
||||
'GET': {
|
||||
HTTP_NOT_FOUND: (NoSuchBucket, self.container_name),
|
||||
HTTP_NOT_FOUND: (NoSuchBucket, container),
|
||||
},
|
||||
'PUT': {
|
||||
HTTP_ACCEPTED: (BucketAlreadyExists, self.container_name),
|
||||
HTTP_ACCEPTED: (BucketAlreadyExists, container),
|
||||
},
|
||||
'POST': {
|
||||
HTTP_NOT_FOUND: (NoSuchBucket, self.container_name),
|
||||
HTTP_NOT_FOUND: (NoSuchBucket, container),
|
||||
},
|
||||
'DELETE': {
|
||||
HTTP_NOT_FOUND: (NoSuchBucket, self.container_name),
|
||||
HTTP_NOT_FOUND: (NoSuchBucket, container),
|
||||
HTTP_CONFLICT: BucketNotEmpty,
|
||||
},
|
||||
}
|
||||
@ -463,34 +463,41 @@ class Request(swob.Request):
|
||||
# Swift object access.
|
||||
code_map = {
|
||||
'HEAD': {
|
||||
HTTP_NOT_FOUND: (NoSuchKey, self.object_name),
|
||||
HTTP_NOT_FOUND: (NoSuchKey, obj),
|
||||
HTTP_PRECONDITION_FAILED: PreconditionFailed,
|
||||
},
|
||||
'GET': {
|
||||
HTTP_NOT_FOUND: (NoSuchKey, self.object_name),
|
||||
HTTP_NOT_FOUND: (NoSuchKey, obj),
|
||||
HTTP_PRECONDITION_FAILED: PreconditionFailed,
|
||||
HTTP_REQUESTED_RANGE_NOT_SATISFIABLE: InvalidRange,
|
||||
},
|
||||
'PUT': {
|
||||
HTTP_NOT_FOUND: (NoSuchBucket, self.container_name),
|
||||
HTTP_NOT_FOUND: (NoSuchBucket, container),
|
||||
HTTP_UNPROCESSABLE_ENTITY: InvalidDigest,
|
||||
HTTP_REQUEST_ENTITY_TOO_LARGE: EntityTooLarge,
|
||||
HTTP_LENGTH_REQUIRED: MissingContentLength,
|
||||
},
|
||||
'DELETE': {
|
||||
HTTP_NOT_FOUND: (NoSuchKey, self.object_name),
|
||||
HTTP_NOT_FOUND: (NoSuchKey, obj),
|
||||
},
|
||||
}
|
||||
|
||||
return code_map[method]
|
||||
|
||||
def get_response(self, app, method=None, query=None):
|
||||
def get_response(self, app, method=None, container=None, obj=None,
|
||||
body=None, query=None):
|
||||
"""
|
||||
Calls the application with this request's environment. Returns a
|
||||
Response object that wraps up the application's result.
|
||||
"""
|
||||
method = method or self.environ['REQUEST_METHOD']
|
||||
sw_req = self.to_swift_req(method=method, query=query)
|
||||
if container is None:
|
||||
container = self.container_name
|
||||
if obj is None:
|
||||
obj = self.object_name
|
||||
|
||||
sw_req = self.to_swift_req(method, container, obj, query=query,
|
||||
body=body)
|
||||
sw_resp = sw_req.get_response(app)
|
||||
resp = Response.from_swift_resp(sw_resp)
|
||||
status = resp.status_int # pylint: disable-msg=E1101
|
||||
@ -505,8 +512,8 @@ class Request(swob.Request):
|
||||
# tempauth
|
||||
self.user_id = self.access_key
|
||||
|
||||
success_codes = self._swift_success_codes(method)
|
||||
error_codes = self._swift_error_codes(method)
|
||||
success_codes = self._swift_success_codes(method, container, obj)
|
||||
error_codes = self._swift_error_codes(method, container, obj)
|
||||
|
||||
if status in success_codes:
|
||||
return resp
|
||||
|
@ -14,10 +14,14 @@
|
||||
# limitations under the License.
|
||||
|
||||
import unittest
|
||||
import simplejson as json
|
||||
from mock import patch
|
||||
|
||||
from swift.common import swob
|
||||
from swift.common.swob import Request
|
||||
|
||||
from swift3.test.unit import Swift3TestCase
|
||||
from swift3.etree import fromstring
|
||||
|
||||
|
||||
class TestSwift3MultiUpload(Swift3TestCase):
|
||||
@ -25,6 +29,38 @@ class TestSwift3MultiUpload(Swift3TestCase):
|
||||
def setUp(self):
|
||||
super(TestSwift3MultiUpload, self).setUp()
|
||||
|
||||
segment_bucket = '/v1/AUTH_test/bucket+segments'
|
||||
|
||||
self.swift.register('PUT',
|
||||
'/v1/AUTH_test/bucket+segments',
|
||||
swob.HTTPAccepted, {}, None)
|
||||
self.swift.register('GET', segment_bucket, swob.HTTPOk, {},
|
||||
json.dumps([{'name': 'object/X/1',
|
||||
'last_modified':
|
||||
'2014-05-07T19:47:54.592270',
|
||||
'hash': 'HASH',
|
||||
'bytes': 100},
|
||||
{'name': 'object/X/2',
|
||||
'last_modified':
|
||||
'2014-05-07T19:47:54.592270',
|
||||
'hash': 'HASH',
|
||||
'bytes': 100},
|
||||
]))
|
||||
self.swift.register('HEAD', segment_bucket + '/object/X',
|
||||
swob.HTTPOk, {}, None)
|
||||
self.swift.register('PUT', segment_bucket + '/object/X',
|
||||
swob.HTTPCreated, {}, None)
|
||||
self.swift.register('DELETE', segment_bucket + '/object/X',
|
||||
swob.HTTPNoContent, {}, None)
|
||||
self.swift.register('GET', segment_bucket + '/object/invalid',
|
||||
swob.HTTPNotFound, {}, None)
|
||||
self.swift.register('PUT', segment_bucket + '/object/X/1',
|
||||
swob.HTTPCreated, {}, None)
|
||||
self.swift.register('DELETE', segment_bucket + '/object/X/1',
|
||||
swob.HTTPNoContent, {}, None)
|
||||
self.swift.register('DELETE', segment_bucket + '/object/X/2',
|
||||
swob.HTTPNoContent, {}, None)
|
||||
|
||||
def test_bucket_upload_part(self):
|
||||
req = Request.blank('/bucket?partNumber=1&uploadId=x',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
@ -67,6 +103,96 @@ class TestSwift3MultiUpload(Swift3TestCase):
|
||||
status, headers, body = self.call_swift3(req)
|
||||
self.assertEquals(self._get_error_code(body), 'InvalidRequest')
|
||||
|
||||
def test_bucket_multipart_uploads_GET(self):
|
||||
req = Request.blank('/bucket/?uploads',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac'})
|
||||
status, headers, body = self.call_swift3(req)
|
||||
fromstring(body, 'ListMultipartUploadsResult')
|
||||
self.assertEquals(status.split()[0], '200')
|
||||
|
||||
@patch('swift3.controllers.multi_upload.unique_id', lambda: 'X')
|
||||
def test_object_multipart_upload_initiate(self):
|
||||
req = Request.blank('/bucket/object?uploads',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'Authorization':
|
||||
'AWS test:tester:hmac'})
|
||||
status, headers, body = self.call_swift3(req)
|
||||
fromstring(body, 'InitiateMultipartUploadResult')
|
||||
self.assertEquals(status.split()[0], '200')
|
||||
|
||||
def test_object_multipart_upload_complete_error(self):
|
||||
xml = 'malformed_XML'
|
||||
req = Request.blank('/bucket/object?uploadId=X',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac'},
|
||||
body=xml)
|
||||
status, headers, body = self.call_swift3(req)
|
||||
self.assertEquals(self._get_error_code(body), 'MalformedXML')
|
||||
|
||||
def test_object_multipart_upload_complete(self):
|
||||
xml = '<CompleteMultipartUpload>' \
|
||||
'<Part>' \
|
||||
'<PartNumber>1</PartNumber>' \
|
||||
'<ETag>HASH</ETag>' \
|
||||
'</Part>' \
|
||||
'<Part>' \
|
||||
'<PartNumber>2</PartNumber>' \
|
||||
'<ETag>"HASH"</ETag>' \
|
||||
'</Part>' \
|
||||
'</CompleteMultipartUpload>'
|
||||
req = Request.blank('/bucket/object?uploadId=X',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac'},
|
||||
body=xml)
|
||||
status, headers, body = self.call_swift3(req)
|
||||
fromstring(body, 'CompleteMultipartUploadResult')
|
||||
self.assertEquals(status.split()[0], '200')
|
||||
|
||||
def test_object_multipart_upload_abort_error(self):
|
||||
req = Request.blank('/bucket/object?uploadId=invalid',
|
||||
environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac'})
|
||||
status, headers, body = self.call_swift3(req)
|
||||
self.assertEquals(self._get_error_code(body), 'NoSuchUpload')
|
||||
|
||||
def test_object_multipart_upload_abort(self):
|
||||
req = Request.blank('/bucket/object?uploadId=X',
|
||||
environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac'})
|
||||
status, headers, body = self.call_swift3(req)
|
||||
self.assertEquals(status.split()[0], '204')
|
||||
|
||||
def test_object_upload_part_error(self):
|
||||
req = Request.blank('/bucket/object?partNumber=1',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac'},
|
||||
body='part object')
|
||||
status, headers, body = self.call_swift3(req)
|
||||
self.assertEquals(self._get_error_code(body), 'InvalidArgument')
|
||||
|
||||
def test_object_upload_part(self):
|
||||
req = Request.blank('/bucket/object?partNumber=1&uploadId=X',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac'},
|
||||
body='part object')
|
||||
status, headers, body = self.call_swift3(req)
|
||||
self.assertEquals(status.split()[0], '200')
|
||||
|
||||
def test_object_list_parts_error(self):
|
||||
req = Request.blank('/bucket/object?uploadId=invalid',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac'})
|
||||
status, headers, body = self.call_swift3(req)
|
||||
self.assertEquals(self._get_error_code(body), 'NoSuchUpload')
|
||||
|
||||
def test_object_list_parts(self):
|
||||
req = Request.blank('/bucket/object?uploadId=X',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac'})
|
||||
status, headers, body = self.call_swift3(req)
|
||||
fromstring(body, 'ListPartsResult')
|
||||
self.assertEquals(status.split()[0], '200')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -14,6 +14,8 @@
|
||||
# limitations under the License.
|
||||
|
||||
import re
|
||||
import uuid
|
||||
import base64
|
||||
|
||||
|
||||
from swift.common.utils import get_logger
|
||||
@ -29,3 +31,7 @@ def camel_to_snake(camel):
|
||||
|
||||
def snake_to_camel(snake):
|
||||
return snake.title().replace('_', '')
|
||||
|
||||
|
||||
def unique_id():
|
||||
return base64.urlsafe_b64encode(str(uuid.uuid4()))
|
||||
|
@ -3,5 +3,6 @@ sphinx
|
||||
nose
|
||||
openstack.nose_plugin
|
||||
coverage
|
||||
mock
|
||||
pylint
|
||||
python-openstackclient
|
||||
|
Loading…
x
Reference in New Issue
Block a user