diff --git a/swift3/middleware.py b/swift3/middleware.py
index f9393428..afbb4c9b 100644
--- a/swift3/middleware.py
+++ b/swift3/middleware.py
@@ -143,7 +143,6 @@ class Swift3Middleware(object):
elif 'keystoneauth' in auth_pipeline:
check_filter_order(auth_pipeline,
['s3token',
- 'authtoken',
'keystoneauth'])
LOGGER.debug('Use keystone middleware.')
elif len(auth_pipeline):
diff --git a/swift3/request.py b/swift3/request.py
index 25a567da..6044653a 100644
--- a/swift3/request.py
+++ b/swift3/request.py
@@ -1198,7 +1198,7 @@ class S3AclRequest(Request):
self.user_id = "%s:%s" % (sw_resp.environ['HTTP_X_TENANT_NAME'],
sw_resp.environ['HTTP_X_USER_NAME'])
self.user_id = utf8encode(self.user_id)
- self.token = sw_resp.environ['HTTP_X_AUTH_TOKEN']
+ self.token = sw_resp.environ.get('HTTP_X_AUTH_TOKEN')
# Need to skip S3 authorization since authtoken middleware
# overwrites account in PATH_INFO
del self.headers['Authorization']
diff --git a/swift3/s3_token_middleware.py b/swift3/s3_token_middleware.py
index b194b788..143b5bbc 100644
--- a/swift3/s3_token_middleware.py
+++ b/swift3/s3_token_middleware.py
@@ -37,7 +37,8 @@ import logging
import requests
import six
-from swift.common.swob import Request, Response
+from swift.common.swob import Request, HTTPBadRequest, HTTPUnauthorized, \
+ HTTPException
from swift.common.utils import config_true_value, split_path
from swift.common.wsgi import ConfigFileError
@@ -46,9 +47,29 @@ from swift3.utils import is_valid_ipv6
PROTOCOL_NAME = 'S3 Token Authentication'
-
-class ServiceError(Exception):
- pass
+# Headers to purge if they came from (or may have come from) the client
+KEYSTONE_AUTH_HEADERS = (
+ 'X-Identity-Status', 'X-Service-Identity-Status',
+ 'X-Domain-Id', 'X-Service-Domain-Id',
+ 'X-Domain-Name', 'X-Service-Domain-Name',
+ 'X-Project-Id', 'X-Service-Project-Id',
+ 'X-Project-Name', 'X-Service-Project-Name',
+ 'X-Project-Domain-Id', 'X-Service-Project-Domain-Id',
+ 'X-Project-Domain-Name', 'X-Service-Project-Domain-Name',
+ 'X-User-Id', 'X-Service-User-Id',
+ 'X-User-Name', 'X-Service-User-Name',
+ 'X-User-Domain-Id', 'X-Service-User-Domain-Id',
+ 'X-User-Domain-Name', 'X-Service-User-Domain-Name',
+ 'X-Roles', 'X-Service-Roles',
+ 'X-Is-Admin-Project',
+ 'X-Service-Catalog',
+ # Deprecated headers, too...
+ 'X-Tenant-Id',
+ 'X-Tenant-Name',
+ 'X-Tenant',
+ 'X-User',
+ 'X-Role',
+)
class S3Token(object):
@@ -104,16 +125,16 @@ class S3Token(object):
self._verify = None
def _deny_request(self, code):
- error_table = {
- 'AccessDenied': (401, 'Access denied'),
- 'InvalidURI': (400, 'Could not parse the specified URI'),
- }
- resp = Response(content_type='text/xml')
- resp.status = error_table[code][0]
+ error_cls, message = {
+ 'AccessDenied': (HTTPUnauthorized, 'Access denied'),
+ 'InvalidURI': (HTTPBadRequest,
+ 'Could not parse the specified URI'),
+ }[code]
+ resp = error_cls(content_type='text/xml')
error_msg = ('\r\n'
'\r\n %s
\r\n '
'%s\r\n\r\n' %
- (code, error_table[code][1]))
+ (code, message))
if six.PY3:
error_msg = error_msg.encode()
resp.body = error_msg
@@ -128,14 +149,12 @@ class S3Token(object):
timeout=self._timeout)
except requests.exceptions.RequestException as e:
self._logger.info('HTTP connection exception: %s', e)
- resp = self._deny_request('InvalidURI')
- raise ServiceError(resp)
+ raise self._deny_request('InvalidURI')
if response.status_code < 200 or response.status_code >= 300:
self._logger.debug('Keystone reply error: status=%s reason=%s',
response.status_code, response.reason)
- resp = self._deny_request('AccessDenied')
- raise ServiceError(resp)
+ raise self._deny_request('AccessDenied')
return response
@@ -144,6 +163,10 @@ class S3Token(object):
req = Request(environ)
self._logger.debug('Calling S3Token middleware.')
+ # Always drop auth headers if we're first in the pipeline
+ if 'keystone.token_info' not in req.environ:
+ req.headers.update({h: None for h in KEYSTONE_AUTH_HEADERS})
+
try:
parts = split_path(req.path, 1, 4, True)
version, account, container, obj = parts
@@ -210,26 +233,45 @@ class S3Token(object):
# identified and not doing a second query and just
# pass it through to swiftauth in this case.
try:
+ # NB: requests.Response, not swob.Response
resp = self._json_request(creds_json)
- except ServiceError as e:
- resp = e.args[0] # NB: swob.Response, not requests.Response
+ except HTTPException as e_resp:
if self._delay_auth_decision:
msg = 'Received error, deferring rejection based on error: %s'
- self._logger.debug(msg, resp.status)
+ self._logger.debug(msg, e_resp.status)
return self._app(environ, start_response)
else:
msg = 'Received error, rejecting request with error: %s'
- self._logger.debug(msg, resp.status)
- return resp(environ, start_response)
+ self._logger.debug(msg, e_resp.status)
+ # NB: swob.Response, not requests.Response
+ return e_resp(environ, start_response)
self._logger.debug('Keystone Reply: Status: %d, Output: %s',
resp.status_code, resp.content)
try:
- identity_info = resp.json()
- token_id = str(identity_info['access']['token']['id'])
- tenant = identity_info['access']['token']['tenant']
- except (ValueError, KeyError):
+ access_info = resp.json()['access']
+ # Populate the environment similar to auth_token,
+ # so we don't have to contact Keystone again.
+ #
+ # Note that although the strings are unicode following json
+ # deserialization, Swift's HeaderEnvironProxy handles ensuring
+ # they're stored as native strings
+ req.headers.update({
+ 'X-Identity-Status': 'Confirmed',
+ 'X-Roles': ','.join(r['name']
+ for r in access_info['user']['roles']),
+ 'X-User-Id': access_info['user']['id'],
+ 'X-User-Name': access_info['user']['name'],
+ 'X-Tenant-Id': access_info['token']['tenant']['id'],
+ 'X-Tenant-Name': access_info['token']['tenant']['name'],
+ 'X-Project-Id': access_info['token']['tenant']['id'],
+ 'X-Project-Name': access_info['token']['tenant']['name'],
+ })
+ token_id = access_info['token'].get('id')
+ tenant = access_info['token']['tenant']
+ req.environ['keystone.token_info'] = resp.json()
+ except (ValueError, KeyError, TypeError):
if self._delay_auth_decision:
error = ('Error on keystone reply: %d %s - '
'deferring rejection downstream')
diff --git a/swift3/test/functional/conf/ceph-known-failures-keystone.yaml b/swift3/test/functional/conf/ceph-known-failures-keystone.yaml
index cfeb3e66..66460570 100644
--- a/swift3/test/functional/conf/ceph-known-failures-keystone.yaml
+++ b/swift3/test/functional/conf/ceph-known-failures-keystone.yaml
@@ -6,8 +6,6 @@ ceph_s3:
s3tests.functional.test_headers.test_object_create_bad_authorization_invalid_aws2: {status: KNOWN}
s3tests.functional.test_headers.test_object_create_bad_authorization_none: {status: KNOWN}
s3tests.functional.test_s3.test_100_continue: {status: KNOWN}
- s3tests.functional.test_s3.test_abort_multipart_upload: {status: KNOWN}
- s3tests.functional.test_s3.test_abort_multipart_upload_not_found: {status: KNOWN}
s3tests.functional.test_s3.test_atomic_conditional_write_1mb: {status: KNOWN}
s3tests.functional.test_s3.test_atomic_dual_conditional_write_1mb: {status: KNOWN}
s3tests.functional.test_s3.test_bucket_acl_default: {status: KNOWN}
@@ -35,20 +33,8 @@ ceph_s3:
s3tests.functional.test_s3.test_cors_origin_wildcard: {status: KNOWN}
s3tests.functional.test_s3.test_list_buckets_anonymous: {status: KNOWN}
s3tests.functional.test_s3.test_list_buckets_invalid_auth: {status: KNOWN}
- s3tests.functional.test_s3.test_list_multipart_upload: {status: KNOWN}
s3tests.functional.test_s3.test_logging_toggle: {status: KNOWN}
- s3tests.functional.test_s3.test_multipart_copy_multiple_sizes: {status: KNOWN}
- s3tests.functional.test_s3.test_multipart_copy_small: {status: KNOWN}
s3tests.functional.test_s3.test_multipart_resend_first_finishes_last: {status: KNOWN}
- s3tests.functional.test_s3.test_multipart_upload: {status: KNOWN}
- s3tests.functional.test_s3.test_multipart_upload_contents: {status: KNOWN}
- s3tests.functional.test_s3.test_multipart_upload_empty: {status: KNOWN}
- s3tests.functional.test_s3.test_multipart_upload_incorrect_etag: {status: KNOWN}
- s3tests.functional.test_s3.test_multipart_upload_missing_part: {status: KNOWN}
- s3tests.functional.test_s3.test_multipart_upload_multiple_sizes: {status: KNOWN}
- s3tests.functional.test_s3.test_multipart_upload_resend_part: {status: KNOWN}
- s3tests.functional.test_s3.test_multipart_upload_size_too_small: {status: KNOWN}
- s3tests.functional.test_s3.test_multipart_upload_small: {status: KNOWN}
s3tests.functional.test_s3.test_object_acl_full_control_verify_owner: {status: KNOWN}
s3tests.functional.test_s3.test_object_acl_xml: {status: KNOWN}
s3tests.functional.test_s3.test_object_acl_xml_read: {status: KNOWN}
diff --git a/swift3/test/functional/conf/proxy-server.conf.in b/swift3/test/functional/conf/proxy-server.conf.in
index 1cc94e4c..87a89426 100644
--- a/swift3/test/functional/conf/proxy-server.conf.in
+++ b/swift3/test/functional/conf/proxy-server.conf.in
@@ -53,15 +53,6 @@ use = egg:swift#memcache
use = egg:swift3#s3token
auth_uri = http://localhost:35357/
-[filter:authtoken]
-paste.filter_factory = keystonemiddleware.auth_token:filter_factory
-identity_uri = http://localhost:35357/
-auth_uri = http://localhost:5000/
-admin_tenant_name = service
-admin_user = swift
-admin_password = password
-cache = swift.cache
-
[filter:keystoneauth]
use = egg:swift#keystoneauth
operator_roles = admin, swiftoperator
diff --git a/swift3/test/functional/run_test.sh b/swift3/test/functional/run_test.sh
index 29dd113a..6aa478ef 100755
--- a/swift3/test/functional/run_test.sh
+++ b/swift3/test/functional/run_test.sh
@@ -27,7 +27,7 @@ mkdir -p ${TEST_DIR}/certs ${TEST_DIR}/private
# create config files
if [ "$AUTH" == 'keystone' ]; then
- MIDDLEWARE="s3token authtoken keystoneauth"
+ MIDDLEWARE="s3token keystoneauth"
elif [ "$AUTH" == 'tempauth' ]; then
MIDDLEWARE="tempauth"
else
diff --git a/swift3/test/unit/test_middleware.py b/swift3/test/unit/test_middleware.py
index 3748d3da..ec032596 100644
--- a/swift3/test/unit/test_middleware.py
+++ b/swift3/test/unit/test_middleware.py
@@ -14,21 +14,31 @@
# limitations under the License.
import unittest
-from mock import patch
+from mock import patch, MagicMock
from contextlib import nested
from datetime import datetime
import hashlib
import base64
+import requests
+import json
+import copy
from urllib import unquote, quote
+from swift.common.middleware.keystoneauth import KeystoneAuth
from swift.common import swob, utils
from swift.common.swob import Request
+from keystonemiddleware.auth_token import AuthProtocol
+from keystoneauth1.access import AccessInfoV2
+
import swift3
from swift3.test.unit import Swift3TestCase
+from swift3.test.unit.helpers import FakeSwift
+from swift3.test.unit.test_s3_token_middleware import GOOD_RESPONSE
from swift3.request import SigV4Request, Request as S3Request
from swift3.etree import fromstring
-from swift3.middleware import filter_factory
+from swift3.middleware import filter_factory, Swift3Middleware
+from swift3.s3_token_middleware import S3Token
from swift3.cfg import CONF
@@ -521,19 +531,24 @@ class TestSwift3Middleware(Swift3TestCase):
pipeline.return_value = 'swift3 tempauth proxy-server'
self.swift3.check_pipeline(conf)
+ # This *should* still work; authtoken will remove our auth details,
+ # but the X-Auth-Token we drop in will remain
+ # if we found one in the response
pipeline.return_value = 'swift3 s3token authtoken keystoneauth ' \
'proxy-server'
self.swift3.check_pipeline(conf)
+ # This should work now; no more doubled-up requests to keystone!
+ pipeline.return_value = 'swift3 s3token keystoneauth proxy-server'
+ self.swift3.check_pipeline(conf)
+
pipeline.return_value = 'swift3 swauth proxy-server'
self.swift3.check_pipeline(conf)
+ # Note that authtoken would need to have delay_auth_decision=True
pipeline.return_value = 'swift3 authtoken s3token keystoneauth ' \
'proxy-server'
- with self.assertRaises(ValueError) as cm:
- self.swift3.check_pipeline(conf)
- self.assertIn('expected filter s3token before authtoken before '
- 'keystoneauth', cm.exception.message)
+ self.swift3.check_pipeline(conf)
pipeline.return_value = 'swift3 proxy-server'
with self.assertRaises(ValueError) as cm:
@@ -861,6 +876,120 @@ class TestSwift3Middleware(Swift3TestCase):
status, headers, body = self.call_swift3(req)
self.assertEqual(status.split()[0], '403', body)
+ def test_swift3_with_only_s3_token(self):
+ self.swift = FakeSwift()
+ self.keystone_auth = KeystoneAuth(
+ self.swift, {'operator_roles': 'swift-user'})
+ self.s3_token = S3Token(
+ self.keystone_auth, {'auth_uri': 'https://fakehost/identity'})
+ self.swift3 = Swift3Middleware(self.s3_token, CONF)
+ req = Request.blank(
+ '/bucket',
+ environ={'REQUEST_METHOD': 'PUT'},
+ headers={'Authorization': 'AWS access:signature',
+ 'Date': self.get_date_header()})
+ self.swift.register('PUT', '/v1/AUTH_TENANT_ID/bucket',
+ swob.HTTPCreated, {}, None)
+ self.swift.register('HEAD', '/v1/AUTH_TENANT_ID',
+ swob.HTTPOk, {}, None)
+ with patch.object(self.s3_token, '_json_request') as mock_req:
+ mock_resp = requests.Response()
+ mock_resp._content = json.dumps(GOOD_RESPONSE)
+ mock_resp.status_code = 201
+ mock_req.return_value = mock_resp
+
+ status, headers, body = self.call_swift3(req)
+ self.assertEqual(body, '')
+ self.assertEqual(1, mock_req.call_count)
+
+ def test_swift3_with_s3_token_and_auth_token(self):
+ self.swift = FakeSwift()
+ self.keystone_auth = KeystoneAuth(
+ self.swift, {'operator_roles': 'swift-user'})
+ self.auth_token = AuthProtocol(
+ self.keystone_auth, {'delay_auth_decision': 'True'})
+ self.s3_token = S3Token(
+ self.auth_token, {'auth_uri': 'https://fakehost/identity'})
+ self.swift3 = Swift3Middleware(self.s3_token, CONF)
+ req = Request.blank(
+ '/bucket',
+ environ={'REQUEST_METHOD': 'PUT'},
+ headers={'Authorization': 'AWS access:signature',
+ 'Date': self.get_date_header()})
+ self.swift.register('PUT', '/v1/AUTH_TENANT_ID/bucket',
+ swob.HTTPCreated, {}, None)
+ self.swift.register('HEAD', '/v1/AUTH_TENANT_ID',
+ swob.HTTPOk, {}, None)
+ with patch.object(self.s3_token, '_json_request') as mock_req:
+ with patch.object(self.auth_token,
+ '_do_fetch_token') as mock_fetch:
+ mock_resp = requests.Response()
+ mock_resp._content = json.dumps(GOOD_RESPONSE)
+ mock_resp.status_code = 201
+ mock_req.return_value = mock_resp
+
+ mock_access_info = AccessInfoV2(GOOD_RESPONSE)
+ mock_access_info.will_expire_soon = \
+ lambda stale_duration: False
+ mock_fetch.return_value = (MagicMock(), mock_access_info)
+
+ status, headers, body = self.call_swift3(req)
+ self.assertEqual(body, '')
+ self.assertEqual(1, mock_req.call_count)
+ # With X-Auth-Token, auth_token will call _do_fetch_token to
+ # connect to keystone in auth_token, again
+ self.assertEqual(1, mock_fetch.call_count)
+
+ def test_swift3_with_s3_token_no_pass_token_to_auth_token(self):
+ self.swift = FakeSwift()
+ self.keystone_auth = KeystoneAuth(
+ self.swift, {'operator_roles': 'swift-user'})
+ self.auth_token = AuthProtocol(
+ self.keystone_auth, {'delay_auth_decision': 'True'})
+ self.s3_token = S3Token(
+ self.auth_token, {'auth_uri': 'https://fakehost/identity'})
+ self.swift3 = Swift3Middleware(self.s3_token, CONF)
+ req = Request.blank(
+ '/bucket',
+ environ={'REQUEST_METHOD': 'PUT'},
+ headers={'Authorization': 'AWS access:signature',
+ 'Date': self.get_date_header()})
+ self.swift.register('PUT', '/v1/AUTH_TENANT_ID/bucket',
+ swob.HTTPCreated, {}, None)
+ self.swift.register('HEAD', '/v1/AUTH_TENANT_ID',
+ swob.HTTPOk, {}, None)
+ with patch.object(self.s3_token, '_json_request') as mock_req:
+ with patch.object(self.auth_token,
+ '_do_fetch_token') as mock_fetch:
+ mock_resp = requests.Response()
+ no_token_id_good_resp = copy.deepcopy(GOOD_RESPONSE)
+ # delete token id
+ del no_token_id_good_resp['access']['token']['id']
+ mock_resp._content = json.dumps(no_token_id_good_resp)
+ mock_resp.status_code = 201
+ mock_req.return_value = mock_resp
+
+ mock_access_info = AccessInfoV2(GOOD_RESPONSE)
+ mock_access_info.will_expire_soon = \
+ lambda stale_duration: False
+ mock_fetch.return_value = (MagicMock(), mock_access_info)
+
+ status, headers, body = self.call_swift3(req)
+ # No token provided from keystone result in 401 Unauthorized
+ # at `swift.common.middleware.keystoneauth` because auth_token
+ # will remove all auth headers including 'X-Identity-Status'[1]
+ # and then, set X-Identity-Status: Invalid at [2]
+ #
+ # 1: https://github.com/openstack/keystonemiddleware/blob/
+ # master/keystonemiddleware/auth_token/__init__.py#L620
+ # 2: https://github.com/openstack/keystonemiddleware/blob/
+ # master/keystonemiddleware/auth_token/__init__.py#L627-L629
+
+ self.assertEqual('403 Forbidden', status)
+ self.assertEqual(1, mock_req.call_count)
+ # if no token provided from keystone, we can skip the call to
+ # fetch the token
+ self.assertEqual(0, mock_fetch.call_count)
if __name__ == '__main__':
unittest.main()
diff --git a/swift3/test/unit/test_s3_token_middleware.py b/swift3/test/unit/test_s3_token_middleware.py
index 07eaabd8..e317fcb6 100644
--- a/swift3/test/unit/test_s3_token_middleware.py
+++ b/swift3/test/unit/test_s3_token_middleware.py
@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
import json
import logging
import time
@@ -28,8 +29,24 @@ from swift3 import s3_token_middleware as s3_token
from swift.common.swob import Request, Response
from swift.common.wsgi import ConfigFileError
-GOOD_RESPONSE = {'access': {'token': {'id': 'TOKEN_ID',
- 'tenant': {'id': 'TENANT_ID'}}}}
+GOOD_RESPONSE = {'access': {
+ 'user': {
+ 'username': 'S3_USER',
+ 'name': 'S3_USER',
+ 'id': 'USER_ID',
+ 'roles': [
+ {'name': 'swift-user'},
+ {'name': '_member_'},
+ ],
+ },
+ 'token': {
+ 'id': 'TOKEN_ID',
+ 'tenant': {
+ 'id': 'TENANT_ID',
+ 'name': 'TENANT_NAME'
+ }
+ }
+}}
class TestResponse(requests.Response):
@@ -138,20 +155,66 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
+ def test_nukes_auth_headers(self):
+ client_env = {
+ 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
+ 'HTTP_X_ROLES': 'admin,_member_,swift-user',
+ 'HTTP_X_TENANT_ID': 'cfa'
+ }
+ req = Request.blank('/v1/AUTH_cfa/c/o', environ=client_env)
+ self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 200)
+ for key in client_env:
+ self.assertNotIn(key, req.environ)
+
def test_without_auth_storage_token(self):
req = Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'AWS badboy'
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
+ def _assert_authorized(self, req, expect_token=True):
+ self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
+ expected_headers = {
+ 'X-Identity-Status': 'Confirmed',
+ 'X-Roles': 'swift-user,_member_',
+ 'X-User-Id': 'USER_ID',
+ 'X-User-Name': 'S3_USER',
+ 'X-Tenant-Id': 'TENANT_ID',
+ 'X-Tenant-Name': 'TENANT_NAME',
+ 'X-Project-Id': 'TENANT_ID',
+ 'X-Project-Name': 'TENANT_NAME',
+ 'X-Auth-Token': 'TOKEN_ID',
+ }
+ for header, value in expected_headers.items():
+ if header == 'X-Auth-Token' and not expect_token:
+ self.assertNotIn(header, req.headers)
+ continue
+ self.assertIn(header, req.headers)
+ self.assertEqual(value, req.headers[header])
+ # WSGI wants native strings for headers
+ self.assertIsInstance(req.headers[header], str)
+ self.assertEqual(1, self.middleware._app.calls)
+
def test_authorized(self):
req = Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'AWS access:signature'
req.headers['X-Storage-Token'] = 'token'
req.get_response(self.middleware)
- self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
- self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
- self.assertEqual(1, self.middleware._app.calls)
+ self._assert_authorized(req)
+
+ def test_tolerate_missing_token_id(self):
+ resp = copy.deepcopy(GOOD_RESPONSE)
+ del resp['access']['token']['id']
+ self.requests_mock.post(self.TEST_URL,
+ status_code=201,
+ json=resp)
+
+ req = Request.blank('/v1/AUTH_cfa/c/o')
+ req.headers['Authorization'] = 'AWS access:signature'
+ req.headers['X-Storage-Token'] = 'token'
+ req.get_response(self.middleware)
+ self._assert_authorized(req, expect_token=False)
def test_authorized_http(self):
protocol = 'http'
@@ -169,8 +232,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
req.headers['Authorization'] = 'AWS access:signature'
req.headers['X-Storage-Token'] = 'token'
req.get_response(self.middleware)
- self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
- self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
+ self._assert_authorized(req)
def test_authorized_trailing_slash(self):
self.middleware = s3_token.filter_factory({
@@ -179,8 +241,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
req.headers['Authorization'] = 'AWS access:signature'
req.headers['X-Storage-Token'] = 'token'
req.get_response(self.middleware)
- self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
- self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
+ self._assert_authorized(req)
def test_authorization_nova_toconnect(self):
req = Request.blank('/v1/AUTH_swiftint/c/o')
@@ -311,6 +372,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
req.headers['Authorization'] = 'AWS access:signature'
req.headers['X-Storage-Token'] = 'token'
req.get_response(self.middleware)
+ self._assert_authorized(req)
class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
@@ -337,44 +399,87 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
req.headers['X-Storage-Token'] = 'token'
resp = req.get_response(self.middleware)
self.assertEqual(resp.status_int, 400) # pylint: disable-msg=E1101
- s3_invalid_req = self.middleware._deny_request('InvalidURI')
- self.assertEqual(resp.body, s3_invalid_req.body)
+ s3_invalid_resp = self.middleware._deny_request('InvalidURI')
+ self.assertEqual(resp.body, s3_invalid_resp.body)
self.assertEqual(
resp.status_int, # pylint: disable-msg=E1101
- s3_invalid_req.status_int) # pylint: disable-msg=E1101
+ s3_invalid_resp.status_int) # pylint: disable-msg=E1101
self.assertEqual(0, self.middleware._app.calls)
def test_fail_to_connect_to_keystone(self):
with mock.patch.object(self.middleware, '_json_request') as o:
- s3_invalid_req = self.middleware._deny_request('InvalidURI')
- o.side_effect = s3_token.ServiceError(s3_invalid_req)
+ s3_invalid_resp = self.middleware._deny_request('InvalidURI')
+ o.side_effect = s3_invalid_resp
req = Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'AWS access:signature'
req.headers['X-Storage-Token'] = 'token'
resp = req.get_response(self.middleware)
- self.assertEqual(resp.body, s3_invalid_req.body)
+ self.assertEqual(resp.body, s3_invalid_resp.body)
self.assertEqual(
resp.status_int, # pylint: disable-msg=E1101
- s3_invalid_req.status_int) # pylint: disable-msg=E1101
+ s3_invalid_resp.status_int) # pylint: disable-msg=E1101
self.assertEqual(0, self.middleware._app.calls)
- def test_bad_reply(self):
+ def _test_bad_reply(self, response_body):
self.requests_mock.post(self.TEST_URL,
status_code=201,
- text="")
+ text=response_body)
req = Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'AWS access:signature'
req.headers['X-Storage-Token'] = 'token'
resp = req.get_response(self.middleware)
- s3_invalid_req = self.middleware._deny_request('InvalidURI')
- self.assertEqual(resp.body, s3_invalid_req.body)
+ s3_invalid_resp = self.middleware._deny_request('InvalidURI')
+ self.assertEqual(resp.body, s3_invalid_resp.body)
self.assertEqual(
resp.status_int, # pylint: disable-msg=E1101
- s3_invalid_req.status_int) # pylint: disable-msg=E1101
+ s3_invalid_resp.status_int) # pylint: disable-msg=E1101
self.assertEqual(0, self.middleware._app.calls)
+ def test_bad_reply_not_json(self):
+ self._test_bad_reply('')
+
+ def _test_bad_reply_missing_parts(self, *parts):
+ resp = copy.deepcopy(GOOD_RESPONSE)
+ part_dict = resp
+ for part in parts[:-1]:
+ part_dict = part_dict[part]
+ del part_dict[parts[-1]]
+ self._test_bad_reply(json.dumps(resp))
+
+ def test_bad_reply_missing_token_dict(self):
+ self._test_bad_reply_missing_parts('access', 'token')
+
+ def test_bad_reply_missing_user_dict(self):
+ self._test_bad_reply_missing_parts('access', 'user')
+
+ def test_bad_reply_missing_user_roles(self):
+ self._test_bad_reply_missing_parts('access', 'user', 'roles')
+
+ def test_bad_reply_missing_user_name(self):
+ self._test_bad_reply_missing_parts('access', 'user', 'name')
+
+ def test_bad_reply_missing_user_id(self):
+ self._test_bad_reply_missing_parts('access', 'user', 'id')
+
+ def test_bad_reply_missing_tenant_dict(self):
+ self._test_bad_reply_missing_parts('access', 'token', 'tenant')
+
+ def test_bad_reply_missing_tenant_id(self):
+ self._test_bad_reply_missing_parts('access', 'token', 'tenant', 'id')
+
+ def test_bad_reply_missing_tenant_name(self):
+ self._test_bad_reply_missing_parts('access', 'token', 'tenant', 'name')
+
+ def test_bad_reply_valid_but_bad_json(self):
+ self._test_bad_reply('{}')
+ self._test_bad_reply('[]')
+ self._test_bad_reply('null')
+ self._test_bad_reply('"foo"')
+ self._test_bad_reply('1')
+ self._test_bad_reply('true')
+
class S3TokenMiddlewareTestDeferredAuth(S3TokenMiddlewareTestBase):
def setUp(self):
@@ -411,8 +516,7 @@ class S3TokenMiddlewareTestDeferredAuth(S3TokenMiddlewareTestBase):
def test_fail_to_connect_to_keystone(self):
with mock.patch.object(self.middleware, '_json_request') as o:
- s3_invalid_req = self.middleware._deny_request('InvalidURI')
- o.side_effect = s3_token.ServiceError(s3_invalid_req)
+ o.side_effect = self.middleware._deny_request('InvalidURI')
req = Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'AWS access:signature'
diff --git a/test-requirements.txt b/test-requirements.txt
index 61740be0..84507a0a 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -9,3 +9,4 @@ python-openstackclient
boto
requests-mock>=0.7.0 # Apache-2.0
fixtures<2.0,>=1.3.1 # Apache-2.0/BSD
+keystonemiddleware