Use lxml for processing XML

The lxml XML toolkit is a binding for the C libraries libxml2 and libxslt.  It
is compatible with ElementTree API, which is more pythonic than the DOM
interface.  In addition, lxml has attractive features like XML validation and
namespace mapping.

Replacing the current DOM interface with the lxml API will simplify Swift3 code
a lot and improve maintainability, I believe.

Change-Id: Ie2291f180421559ed3320a173d2f7eea81f459d9
This commit is contained in:
MORITA Kazutaka 2014-05-26 11:20:07 +09:00
parent 2e78cd0355
commit 1f87522631
3 changed files with 178 additions and 271 deletions

View File

@ -1 +1,2 @@
swift>=1.8
lxml

View File

@ -54,8 +54,7 @@ following for an SAIO setup::
from urllib import quote
import base64
from xml.sax.saxutils import escape as xml_escape
from xml.dom.minidom import parseString
from lxml.etree import fromstring, tostring, Element, SubElement
from simplejson import loads
import email.utils
@ -71,7 +70,9 @@ from swift.common.http import HTTP_OK, HTTP_CREATED, HTTP_ACCEPTED, \
HTTP_NO_CONTENT, HTTP_UNAUTHORIZED, HTTP_FORBIDDEN, HTTP_NOT_FOUND, \
HTTP_CONFLICT, HTTP_UNPROCESSABLE_ENTITY, is_success, \
HTTP_REQUEST_ENTITY_TOO_LARGE
from swift.common.middleware.acl import parse_acl, referrer_allowed
XMLNS_XSI = 'http://www.w3.org/2001/XMLSchema-instance'
MAX_BUCKET_LISTING = 1000
@ -136,128 +137,64 @@ def get_err_response(code):
(HTTPServiceUnavailable, 'Please reduce your request rate')}
resp, message = error_table[code]
body = '<?xml version="1.0" encoding="UTF-8"?>\r\n<Error>\r\n ' \
'<Code>%s</Code>\r\n <Message>%s</Message>\r\n</Error>\r\n' \
% (code, message)
elem = Element('Error')
SubElement(elem, 'Code').text = code
SubElement(elem, 'Message').text = message
body = tostring(elem, xml_declaration=True, encoding='UTF-8')
return resp(body=body, content_type='text/xml')
def add_canonical_user(parent, tag, user, nsmap=None):
"""
Create an element for cannonical user.
"""
elem = SubElement(parent, tag, nsmap=nsmap)
SubElement(elem, 'ID').text = user
SubElement(elem, 'DisplayName').text = user
return elem
def get_acl(account_name, headers):
"""
Attempts to construct an S3 ACL based on what is found in the swift headers
"""
acl = 'private' # default to private
elem = Element('AccessControlPolicy')
add_canonical_user(elem, 'Owner', account_name)
access_control_list = SubElement(elem, 'AccessControlList')
if 'x-container-read' in headers:
if headers['x-container-read'] == ".r:*" or\
".r:*," in headers['x-container-read'] or \
",*," in headers['x-container-read']:
acl = 'public-read'
if 'x-container-write' in headers:
if headers['x-container-write'] == ".r:*" or\
".r:*," in headers['x-container-write'] or \
",*," in headers['x-container-write']:
if acl == 'public-read':
acl = 'public-read-write'
else:
acl = 'public-write'
# grant FULL_CONTROL to myself by default
grant = SubElement(access_control_list, 'Grant')
grantee = add_canonical_user(grant, 'Grantee', account_name,
nsmap={'xsi': XMLNS_XSI})
grantee.set('{%s}type' % XMLNS_XSI, 'CanonicalUser')
SubElement(grant, 'Permission').text = 'FULL_CONTROL'
referrers, _ = parse_acl(headers.get('x-container-read'))
if referrer_allowed('unknown', referrers):
# grant public-read access
grant = SubElement(access_control_list, 'Grant')
grantee = SubElement(grant, 'Grantee', nsmap={'xsi': XMLNS_XSI})
grantee.set('{%s}type' % XMLNS_XSI, 'Group')
SubElement(grantee, 'URI').text = \
'http://acs.amazonaws.com/groups/global/AllUsers'
SubElement(grant, 'Permission').text = 'READ'
referrers, _ = parse_acl(headers.get('x-container-write'))
if referrer_allowed('unknown', referrers):
# grant public-write access
grant = SubElement(access_control_list, 'Grant')
grantee = SubElement(grant, 'Grantee', nsmap={'xsi': XMLNS_XSI})
grantee.set('{%s}type' % XMLNS_XSI, 'Group')
SubElement(grantee, 'URI').text = \
'http://acs.amazonaws.com/groups/global/AllUsers'
SubElement(grant, 'Permission').text = 'WRITE'
body = tostring(elem, xml_declaration=True, encoding='UTF-8')
if acl == 'private':
body = ('<AccessControlPolicy>'
'<Owner>'
'<ID>%s</ID>'
'<DisplayName>%s</DisplayName>'
'</Owner>'
'<AccessControlList>'
'<Grant>'
'<Grantee xmlns:xsi="http://www.w3.org/2001/'
'XMLSchema-instance" xsi:type="CanonicalUser">'
'<ID>%s</ID>'
'<DisplayName>%s</DisplayName>'
'</Grantee>'
'<Permission>FULL_CONTROL</Permission>'
'</Grant>'
'</AccessControlList>'
'</AccessControlPolicy>' %
(account_name, account_name, account_name, account_name))
elif acl == 'public-read':
body = ('<AccessControlPolicy>'
'<Owner>'
'<ID>%s</ID>'
'<DisplayName>%s</DisplayName>'
'</Owner>'
'<AccessControlList>'
'<Grant>'
'<Grantee xmlns:xsi="http://www.w3.org/2001/'
'XMLSchema-instance" xsi:type="CanonicalUser">'
'<ID>%s</ID>'
'<DisplayName>%s</DisplayName>'
'</Grantee>'
'<Permission>FULL_CONTROL</Permission>'
'</Grant>'
'<Grant>'
'<Grantee xmlns:xsi="http://www.w3.org/2001/'
'XMLSchema-instance" xsi:type="Group">'
'<URI>http://acs.amazonaws.com/groups/global/AllUsers</URI>'
'</Grantee>'
'<Permission>READ</Permission>'
'</Grant>'
'</AccessControlList>'
'</AccessControlPolicy>' %
(account_name, account_name, account_name, account_name))
elif acl == 'public-read-write':
body = ('<AccessControlPolicy>'
'<Owner>'
'<ID>%s</ID>'
'<DisplayName>%s</DisplayName>'
'</Owner>'
'<AccessControlList>'
'<Grant>'
'<Grantee xmlns:xsi="http://www.w3.org/2001/'
'XMLSchema-instance" xsi:type="CanonicalUser">'
'<ID>%s</ID>'
'<DisplayName>%s</DisplayName>'
'</Grantee>'
'<Permission>FULL_CONTROL</Permission>'
'</Grant>'
'<Grant>'
'<Grantee xmlns:xsi="http://www.w3.org/2001/'
'XMLSchema-instance" xsi:type="Group">'
'<URI>http://acs.amazonaws.com/groups/global/AllUsers</URI>'
'</Grantee>'
'<Permission>READ</Permission>'
'</Grant>'
'</AccessControlList>'
'<AccessControlList>'
'<Grant>'
'<Grantee xmlns:xsi="http://www.w3.org/2001/'
'XMLSchema-instance" xsi:type="Group">'
'<URI>http://acs.amazonaws.com/groups/global/AllUsers</URI>'
'</Grantee>'
'<Permission>WRITE</Permission>'
'</Grant>'
'</AccessControlList>'
'</AccessControlPolicy>' %
(account_name, account_name, account_name, account_name))
else:
body = ('<AccessControlPolicy>'
'<Owner>'
'<ID>%s</ID>'
'<DisplayName>%s</DisplayName>'
'</Owner>'
'<AccessControlList>'
'<Grant>'
'<Grantee xmlns:xsi="http://www.w3.org/2001/'
'XMLSchema-instance" xsi:type="CanonicalUser">'
'<ID>%s</ID>'
'<DisplayName>%s</DisplayName>'
'</Grantee>'
'<Permission>FULL_CONTROL</Permission>'
'</Grant>'
'</AccessControlList>'
'</AccessControlPolicy>' %
(account_name, account_name, account_name, account_name))
return HTTPOk(body=body, content_type="text/plain")
@ -322,13 +259,11 @@ def swift_acl_translate(acl, group='', user='', xml=False):
['HTTP_X_CONTAINER_READ', '.']]
if xml:
# We are working with XML and need to parse it
dom = parseString(acl)
elem = fromstring(acl)
acl = 'unknown'
for grant in dom.getElementsByTagName('Grant'):
permission = grant.getElementsByTagName('Permission')[0]\
.firstChild.data
grantee = grant.getElementsByTagName('Grantee')[0]\
.getAttributeNode('xsi:type').nodeValue
for grant in elem.findall('./AccessControlList/Grant'):
permission = grant.find('./Permission').text
grantee = grant.find('./Grantee').get('{%s}type' % XMLNS_XSI)
if permission == "FULL_CONTROL" and grantee == 'CanonicalUser' and\
acl != 'public-read' and acl != 'public-read-write':
acl = 'private'
@ -415,14 +350,16 @@ class ServiceController(Controller):
containers = loads(resp.body)
# we don't keep the creation time of a backet (s3cmd doesn't
# work without that) so we use something bogus.
body = '<?xml version="1.0" encoding="UTF-8"?>' \
'<ListAllMyBucketsResult ' \
'xmlns="http://doc.s3.amazonaws.com/2006-03-01">' \
'<Buckets>%s</Buckets>' \
'</ListAllMyBucketsResult>' \
% ("".join(['<Bucket><Name>%s</Name><CreationDate>'
'2009-02-03T16:45:09.000Z</CreationDate></Bucket>'
% xml_escape(i['name']) for i in containers]))
elem = Element('ListAllMyBucketsResult')
buckets = SubElement(elem, 'Buckets')
for c in containers:
bucket = SubElement(buckets, 'Bucket')
SubElement(bucket, 'Name').text = c['name']
SubElement(bucket, 'CreationDate').text = \
'2009-02-03T16:45:09.000Z'
body = tostring(elem, xml_declaration=True, encoding='UTF-8')
return HTTPOk(content_type='application/xml', body=body)
@ -481,37 +418,36 @@ class BucketController(Controller):
return get_err_response('InternalError')
objects = loads(resp.body)
body = ('<?xml version="1.0" encoding="UTF-8"?>'
'<ListBucketResult '
'xmlns="http://s3.amazonaws.com/doc/2006-03-01">'
'<Prefix>%s</Prefix>'
'<Marker>%s</Marker>'
'<Delimiter>%s</Delimiter>'
'<IsTruncated>%s</IsTruncated>'
'<MaxKeys>%s</MaxKeys>'
'<Name>%s</Name>'
'%s'
'%s'
'</ListBucketResult>' %
(
xml_escape(req.params.get('prefix', '')),
xml_escape(req.params.get('marker', '')),
xml_escape(req.params.get('delimiter', '')),
'true' if max_keys > 0 and len(objects) == (max_keys + 1) else
'false',
max_keys,
xml_escape(self.container_name),
"".join(['<Contents><Key>%s</Key><LastModified>%sZ</LastModif'
'ied><ETag>%s</ETag><Size>%s</Size><StorageClass>STA'
'NDARD</StorageClass><Owner><ID>%s</ID><DisplayName>'
'%s</DisplayName></Owner></Contents>' %
(xml_escape(i['name']), i['last_modified'],
i['hash'],
i['bytes'], self.account_name, self.account_name)
for i in objects[:max_keys] if 'subdir' not in i]),
"".join(['<CommonPrefixes><Prefix>%s</Prefix></CommonPrefixes>'
% xml_escape(i['subdir'])
for i in objects[:max_keys] if 'subdir' in i])))
elem = Element('ListBucketResult')
SubElement(elem, 'Prefix').text = req.params.get('prefix')
SubElement(elem, 'Marker').text = req.params.get('marker')
SubElement(elem, 'Delimiter').text = req.params.get('delimiter')
if max_keys > 0 and len(objects) == max_keys + 1:
is_truncated = 'true'
else:
is_truncated = 'false'
SubElement(elem, 'IsTruncated').text = is_truncated
SubElement(elem, 'MaxKeys').text = str(max_keys)
SubElement(elem, 'Name').text = self.container_name
for o in objects[:max_keys]:
if 'subdir' not in o:
contents = SubElement(elem, 'Contents')
SubElement(contents, 'Key').text = o['name']
SubElement(contents, 'LastModified').text = \
o['last_modified'] + 'Z'
SubElement(contents, 'ETag').text = o['hash']
SubElement(contents, 'Size').text = str(o['bytes'])
add_canonical_user(contents, 'Owner', self.account_name)
for o in objects[:max_keys]:
if 'subdir' in o:
common_prefixes = SubElement(elem, 'CommonPrefixes')
SubElement(common_prefixes, 'Prefix').text = o['subdir']
body = tostring(elem, xml_declaration=True, encoding='UTF-8')
return HTTPOk(body=body, content_type='application/xml')
def PUT(self, req):
@ -662,9 +598,9 @@ class ObjectController(Controller):
return get_err_response('InternalError')
if 'HTTP_X_COPY_FROM' in req.environ:
body = '<CopyObjectResult>' \
'<ETag>"%s"</ETag>' \
'</CopyObjectResult>' % resp.etag
elem = Element('CopyObjectResult')
SubElement(elem, 'ETag').text = '"%s"' % resp.etag
body = tostring(elem, xml_declaration=True, encoding='UTF-8')
return HTTPOk(body=body)
return HTTPOk(etag=resp.etag)
@ -802,13 +738,11 @@ class LocationController(Controller):
else:
return get_err_response('InternalError')
body = ('<?xml version="1.0" encoding="UTF-8"?>'
'<LocationConstraint '
'xmlns="http://s3.amazonaws.com/doc/2006-03-01/"')
if self.location == 'US':
body += '/>'
else:
body += ('>%s</LocationConstraint>' % self.location)
elem = Element('LocationConstraint')
if self.location != 'US':
elem.text = self.location
body = tostring(elem, xml_declaration=True, encoding='UTF-8')
return HTTPOk(body=body, content_type='application/xml')
@ -837,9 +771,9 @@ class LoggingStatusController(Controller):
return get_err_response('InternalError')
# logging disabled
body = ('<?xml version="1.0" encoding="UTF-8"?>'
'<BucketLoggingStatus '
'xmlns="http://doc.s3.amazonaws.com/2006-03-01" />')
elem = Element('BucketLoggingStatus')
body = tostring(elem, xml_declaration=True, encoding='UTF-8')
return HTTPOk(body=body, content_type='application/xml')
def PUT(self, req):
@ -859,31 +793,17 @@ class MultiObjectDeleteController(Controller):
Handles Delete Multiple Objects.
"""
def object_key_iter(xml):
dom = parseString(xml)
delete = dom.getElementsByTagName('Delete')[0]
for obj in delete.getElementsByTagName('Object'):
key = obj.getElementsByTagName('Key')[0].firstChild.data
version = None
if obj.getElementsByTagName('VersionId').length > 0:
version = obj.getElementsByTagName('VersionId')[0]\
.firstChild.data
elem = fromstring(xml)
for obj in elem.iterchildren('Object'):
key = obj.find('./Key').text
version = obj.find('./VersionId')
if version is not None:
version = version.text
yield (key, version)
def get_deleted_elem(key):
return ' <Deleted>\r\n' \
' <Key>%s</Key>\r\n' \
' </Deleted>\r\n' % (key)
elem = Element('DeleteResult')
def get_err_elem(key, err_code, message):
return ' <Error>\r\n' \
' <Key>%s</Key>\r\n' \
' <Code>%s</Code>\r\n' \
' <Message>%s</Message>\r\n' \
' </Error>\r\n' % (key, err_code, message)
body = '<?xml version="1.0" encoding="UTF-8"?>\r\n' \
'<DeleteResult ' \
'xmlns="http://doc.s3.amazonaws.com/2006-03-01">\r\n'
for key, version in object_key_iter(req.body):
if version is not None:
# TODO: delete the specific version of the object
@ -900,15 +820,20 @@ class MultiObjectDeleteController(Controller):
status = sub_resp.status_int
if status == HTTP_NO_CONTENT or status == HTTP_NOT_FOUND:
body += get_deleted_elem(key)
deleted = SubElement(elem, 'Deleted')
SubElement(deleted, 'Key').text = key
else:
error = SubElement(elem, 'Error')
SubElement(error, 'Key').text = key
if status == HTTP_UNAUTHORIZED:
body += get_err_elem(key, 'AccessDenied', 'Access Denied')
SubElement(error, 'Code').text = 'AccessDenied'
SubElement(error, 'Message').text = 'Access Denied'
else:
body += get_err_elem(key, 'InternalError',
'Internal Error')
SubElement(error, 'Code').text = 'InternalError'
SubElement(error, 'Message').text = 'Internal Error'
body = tostring(elem, xml_declaration=True, encoding='UTF-8')
body += '</DeleteResult>\r\n'
return HTTPOk(body=body)
@ -1010,8 +935,9 @@ class VersioningController(Controller):
return get_err_response('InternalError')
# Just report there is no versioning configured here.
body = ('<VersioningConfiguration '
'xmlns="http://s3.amazonaws.com/doc/2006-03-01/"/>')
elem = Element('VersioningConfiguration')
body = tostring(elem, xml_declaration=True, encoding='UTF-8')
return HTTPOk(body=body, content_type="text/plain")
def PUT(self, req):

View File

@ -20,7 +20,7 @@ import hashlib
import base64
from urllib import unquote, quote
import xml.dom.minidom
from lxml.etree import fromstring, tostring, Element, SubElement
import simplejson
from swift.common import swob
@ -78,15 +78,12 @@ class TestSwift3(unittest.TestCase):
('with space', '2011-01-05T02:19:14.275290', 0, 390),
('with%20space', '2011-01-05T02:19:14.275290', 0, 390))
json_pattern = ['"name":%s', '"last_modified":%s', '"hash":%s',
json_pattern = ['"name":"%s"', '"last_modified":"%s"', '"hash":"%s"',
'"bytes":%s']
json_pattern = '{' + ','.join(json_pattern) + '}'
json_out = []
for b in self.objects:
name = simplejson.dumps(b[0])
time = simplejson.dumps(b[1])
json_out.append(json_pattern %
(name, time, b[2], b[3]))
json_out.append(json_pattern % b)
object_list = '[' + ','.join(json_out) + ']'
self.swift.register('GET', '/v1/AUTH_test/junk', swob.HTTPOk, {},
object_list)
@ -121,6 +118,11 @@ class TestSwift3(unittest.TestCase):
self.swift.register('DELETE', '/v1/AUTH_test/bucket/object',
swob.HTTPNoContent, {}, None)
def _get_error_code(self, body):
elem = fromstring(body)
self.assertEquals(elem.tag, 'Error')
return elem.find('./Code').text
def call_app(self, req, app=None, expect_exception=False):
if app is None:
app = self.app
@ -163,20 +165,14 @@ class TestSwift3(unittest.TestCase):
req = Request.blank('/something',
headers={'Authorization': 'hoge'})
status, headers, body = self.call_swift3(req)
dom = xml.dom.minidom.parseString(body)
self.assertEquals(dom.firstChild.nodeName, 'Error')
code = dom.getElementsByTagName('Code')[0].childNodes[0].nodeValue
self.assertEquals(code, 'AccessDenied')
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_bad_method(self):
req = Request.blank('/',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac'})
status, headers, body = self.call_swift3(req)
dom = xml.dom.minidom.parseString(body)
self.assertEquals(dom.firstChild.nodeName, 'Error')
code = dom.getElementsByTagName('Code')[0].childNodes[0].nodeValue
self.assertEquals(code, 'MethodNotAllowed')
self.assertEquals(self._get_error_code(body), 'MethodNotAllowed')
def test_path_info_encode(self):
bucket_name = 'b%75cket'
@ -199,9 +195,7 @@ class TestSwift3(unittest.TestCase):
req = Request.blank(path, environ={'REQUEST_METHOD': method},
headers=headers)
status, headers, body = self.call_swift3(req)
dom = xml.dom.minidom.parseString(body)
self.assertEquals(dom.firstChild.nodeName, 'Error')
return dom.getElementsByTagName('Code')[0].childNodes[0].nodeValue
return self._get_error_code(body)
def test_service_GET_error(self):
code = self._test_method_error('GET', '', swob.HTTPUnauthorized)
@ -218,17 +212,17 @@ class TestSwift3(unittest.TestCase):
status, headers, body = self.call_swift3(req)
self.assertEquals(status.split()[0], '200')
dom = xml.dom.minidom.parseString(body)
self.assertEquals(dom.firstChild.nodeName, 'ListAllMyBucketsResult')
elem = fromstring(body)
self.assertEquals(elem.tag, 'ListAllMyBucketsResult')
buckets = [n for n in dom.getElementsByTagName('Bucket')]
listing = [n for n in buckets[0].childNodes if n.nodeName != '#text']
all_buckets = elem.find('./Buckets')
buckets = all_buckets.iterchildren('Bucket')
listing = list(list(buckets)[0])
self.assertEquals(len(listing), 2)
names = []
for b in buckets:
if b.childNodes[0].nodeName == 'Name':
names.append(b.childNodes[0].childNodes[0].nodeValue)
for b in all_buckets.iterchildren('Bucket'):
names.append(b.find('./Name').text)
self.assertEquals(len(names), len(self.buckets))
for i in self.buckets:
@ -252,20 +246,17 @@ class TestSwift3(unittest.TestCase):
status, headers, body = self.call_swift3(req)
self.assertEquals(status.split()[0], '200')
dom = xml.dom.minidom.parseString(body)
self.assertEquals(dom.firstChild.nodeName, 'ListBucketResult')
name = dom.getElementsByTagName('Name')[0].childNodes[0].nodeValue
elem = fromstring(body)
self.assertEquals(elem.tag, 'ListBucketResult')
name = elem.find('./Name').text
self.assertEquals(name, bucket_name)
objects = [n for n in dom.getElementsByTagName('Contents')]
objects = elem.iterchildren('Contents')
names = []
for o in objects:
if o.childNodes[0].nodeName == 'Key':
names.append(o.childNodes[0].childNodes[0].nodeValue)
if o.childNodes[1].nodeName == 'LastModified':
self.assertTrue(
o.childNodes[1].childNodes[0].nodeValue.endswith('Z'))
names.append(o.find('./Key').text)
self.assertTrue(o.find('./LastModified').text.endswith('Z'))
self.assertEquals(len(names), len(self.objects))
for i in self.objects:
@ -279,18 +270,16 @@ class TestSwift3(unittest.TestCase):
'QUERY_STRING': 'max-keys=5'},
headers={'Authorization': 'AWS test:tester:hmac'})
status, headers, body = self.call_swift3(req)
dom = xml.dom.minidom.parseString(body)
self.assertEquals(dom.getElementsByTagName('IsTruncated')[0].
childNodes[0].nodeValue, 'false')
elem = fromstring(body)
self.assertEquals(elem.find('./IsTruncated').text, 'false')
req = Request.blank('/%s' % bucket_name,
environ={'REQUEST_METHOD': 'GET',
'QUERY_STRING': 'max-keys=4'},
headers={'Authorization': 'AWS test:tester:hmac'})
status, headers, body = self.call_swift3(req)
dom = xml.dom.minidom.parseString(body)
self.assertEquals(dom.getElementsByTagName('IsTruncated')[0].
childNodes[0].nodeValue, 'true')
elem = fromstring(body)
self.assertEquals(elem.find('./IsTruncated').text, 'true')
def test_bucket_GET_max_keys(self):
bucket_name = 'junk'
@ -300,9 +289,8 @@ class TestSwift3(unittest.TestCase):
'QUERY_STRING': 'max-keys=5'},
headers={'Authorization': 'AWS test:tester:hmac'})
status, headers, body = self.call_swift3(req)
dom = xml.dom.minidom.parseString(body)
self.assertEquals(dom.getElementsByTagName('MaxKeys')[0].
childNodes[0].nodeValue, '5')
elem = fromstring(body)
self.assertEquals(elem.find('./MaxKeys').text, '5')
_, path = self.swift.calls[-1]
_, query_string = path.split('?')
args = dict(cgi.parse_qsl(query_string))
@ -313,9 +301,8 @@ class TestSwift3(unittest.TestCase):
'QUERY_STRING': 'max-keys=5000'},
headers={'Authorization': 'AWS test:tester:hmac'})
status, headers, body = self.call_swift3(req)
dom = xml.dom.minidom.parseString(body)
self.assertEquals(dom.getElementsByTagName('MaxKeys')[0].
childNodes[0].nodeValue, '1000')
elem = fromstring(body)
self.assertEquals(elem.find('./MaxKeys').text, '1000')
_, path = self.swift.calls[-1]
_, query_string = path.split('?')
args = dict(cgi.parse_qsl(query_string))
@ -328,13 +315,10 @@ class TestSwift3(unittest.TestCase):
'delimiter=a&marker=b&prefix=c'},
headers={'Authorization': 'AWS test:tester:hmac'})
status, headers, body = self.call_swift3(req)
dom = xml.dom.minidom.parseString(body)
self.assertEquals(dom.getElementsByTagName('Prefix')[0].
childNodes[0].nodeValue, 'c')
self.assertEquals(dom.getElementsByTagName('Marker')[0].
childNodes[0].nodeValue, 'b')
self.assertEquals(dom.getElementsByTagName('Delimiter')[0].
childNodes[0].nodeValue, 'a')
elem = fromstring(body)
self.assertEquals(elem.find('./Prefix').text, 'c')
self.assertEquals(elem.find('./Marker').text, 'b')
self.assertEquals(elem.find('./Delimiter').text, 'a')
_, path = self.swift.calls[-1]
_, query_string = path.split('?')
args = dict(cgi.parse_qsl(query_string))
@ -386,13 +370,12 @@ class TestSwift3(unittest.TestCase):
status, headers, body = self.call_swift3(req)
self.assertEquals(status.split()[0], '204')
def _check_acl(self, owner, resp):
dom = xml.dom.minidom.parseString("".join(resp))
self.assertEquals(dom.firstChild.nodeName, 'AccessControlPolicy')
permission = dom.getElementsByTagName('Permission')[0]
name = permission.childNodes[0].nodeValue
self.assertEquals(name, 'FULL_CONTROL')
name = dom.getElementsByTagName('ID')[0].childNodes[0].nodeValue
def _check_acl(self, owner, body):
elem = fromstring(body)
self.assertEquals(elem.tag, 'AccessControlPolicy')
permission = elem.find('./AccessControlList/Grant/Permission').text
self.assertEquals(permission, 'FULL_CONTROL')
name = elem.find('./AccessControlList/Grant/Grantee/ID').text
self.assertEquals(name, owner)
def test_bucket_acl_GET(self):
@ -409,8 +392,8 @@ class TestSwift3(unittest.TestCase):
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac'})
status, headers, body = self.call_swift3(req)
dom = xml.dom.minidom.parseString(body)
self.assertEquals(dom.firstChild.nodeName, 'VersioningConfiguration')
elem = fromstring(body)
self.assertEquals(elem.tag, 'VersioningConfiguration')
def _test_object_GETorHEAD(self, method):
req = Request.blank('/bucket/object',
@ -535,15 +518,12 @@ class TestSwift3(unittest.TestCase):
self.assertEquals(status.split()[0], '204')
def test_object_multi_DELETE(self):
body = '<?xml version="1.0" encoding="UTF-8"?> \
<Delete>\
<Object>\
<Key>Key1</Key>\
</Object>\
<Object>\
<Key>Key2</Key>\
</Object>\
</Delete>'
elem = Element('Delete')
for key in ['Key1', 'Key2']:
obj = SubElement(elem, 'Object')
SubElement(obj, 'Key').text = key
body = tostring(elem, xml_declaration=True, encoding='UTF-8')
req = Request.blank('/bucket?delete',
environ={'REQUEST_METHOD': 'POST'},
headers={'Authorization': 'AWS test:tester:hmac'},