Stop using client headers for cross-middleware communication

Previously, Swift3 used client-facing HTTP headers to pass the S3 access
key, signature, and normalized request through the WSGI pipeline.
However, swauth did not validate that Swift3 actually set the headers;
as a result, an attacker who has captured a single valid request through
the S3 API may impersonate the user that issued the request indefinitely
through the Swift API.

Now, the S3 authentication information will be taken from a separate,
client-inaccessible namespace in the WSGI environment as defined in the
related change.

UpgradeImpact

This addresses a breaking API change in Swift3. No currently deployed
version of Swift3 will work with this. When upgrading swauth, operators
will need to upgrade Swift3 as well.

Change-Id: Ie5481a316397f46734e9dd0e77a8a87197ceec16
Related-Change: Ia3fbb4938f0daa8845cba4137a01cc43bc1a713c
This commit is contained in:
Tim Burke 2017-03-01 21:13:57 +00:00 committed by Ondřej Nový
parent 404b467be5
commit 2a84fe7c69
2 changed files with 52 additions and 20 deletions

View File

@ -232,7 +232,7 @@ class Swauth(object):
start_response)
elif env.get('PATH_INFO', '').startswith(self.auth_prefix):
return self.handle(env, start_response)
s3 = env.get('HTTP_AUTHORIZATION')
s3 = env.get('swift3.auth_details')
if s3 and not self.s3_support:
msg = 'S3 support is disabled in swauth.'
return HTTPBadRequest(body=msg)(env, start_response)
@ -322,7 +322,8 @@ class Swauth(object):
if expires < time():
groups = None
if env.get('HTTP_AUTHORIZATION'):
s3_auth_details = env.get('swift3.auth_details')
if s3_auth_details:
if not self.s3_support:
self.logger.warning('S3 support is disabled in swauth.')
return None
@ -333,12 +334,13 @@ class Swauth(object):
'with swauth_remote mode.')
return None
try:
account = env['HTTP_AUTHORIZATION'].split(' ')[1]
account, user, sign = account.split(':')
account, user = s3_auth_details['access_key'].split(':', 1)
signature_from_user = s3_auth_details['signature']
msg = s3_auth_details['string_to_sign']
except Exception:
self.logger.debug(
'Swauth cannot parse Authorization header value %r' %
env['HTTP_AUTHORIZATION'])
'Swauth cannot parse swift3.auth_details value %r' %
(s3_auth_details, ))
return None
path = quote('/v1/%s/%s/%s' % (self.auth_account, account, user))
resp = self.make_pre_authed_request(
@ -370,7 +372,6 @@ class Swauth(object):
return None
password = creds_dict['hash']
msg = base64.urlsafe_b64decode(unquote(token))
# https://bugs.python.org/issue5285
if isinstance(password, unicode):
@ -378,9 +379,9 @@ class Swauth(object):
if isinstance(msg, unicode):
msg = msg.encode('utf-8')
s = base64.encodestring(hmac.new(password,
msg, sha1).digest()).strip()
if s != sign:
valid_signature = base64.encodestring(hmac.new(
password, msg, sha1).digest()).strip()
if signature_from_user != valid_signature:
return None
groups = [g['name'] for g in detail['groups']]
if '.admin' in groups:

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
from contextlib import contextmanager
import hashlib
import json
@ -4065,15 +4066,15 @@ class TestAuth(unittest.TestCase):
def test_s3_authorization_default_off(self):
self.assertFalse(self.test_auth.s3_support)
req = self._make_request('/v1/AUTH_account', headers={
'authorization': 's3_header'})
req = self._make_request('/v1/AUTH_account', environ={
'swift3.auth_details': {'unused': 'stuff'}})
resp = req.get_response(self.test_auth)
self.assertEqual(resp.status_int, 400) # HTTPBadRequest
self.assertTrue(resp.environ.get('swift.authorize') is None)
def test_s3_turned_off_get_groups(self):
env = \
{'HTTP_AUTHORIZATION': 's3 header'}
env = {
'swift3.auth_details': {'unused': 'stuff'}}
token = 'whatever'
self.test_auth.logger = mock.Mock()
self.assertEqual(self.test_auth.get_groups(env, token), None)
@ -4086,7 +4087,7 @@ class TestAuth(unittest.TestCase):
auth.filter_factory({'default_storage_policy': 'ssd'})(FakeApp())
self.assertEqual(ath.default_storage_policy, 'ssd')
def test_s3_creds_unicode(self):
def test_s3_creds_unicode_bad(self):
self.test_auth.s3_support = True
self.test_auth.app = FakeApp(iter([
('200 Ok', {},
@ -4095,11 +4096,37 @@ class TestAuth(unittest.TestCase):
{'name': ".admin"}]})),
('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_act'}, '')]))
env = \
{'HTTP_AUTHORIZATION': 'AWS act:user:3yW7oFFWOn+fhHMu7E47RKotL1Q=',
{'swift3.auth_details': {
'access_key': 'act:user',
# NOTE: signature uses password of 'key', not 'key)'
'signature': '3yW7oFFWOn+fhHMu7E47RKotL1Q=',
'string_to_sign': base64.urlsafe_b64decode(
'UFVUCgoKRnJpLCAyNiBGZWIgMjAxNiAwNjo0NT'
'ozNCArMDAwMAovY29udGFpbmVyMw==')},
'PATH_INFO': '/v1/AUTH_act/c1'}
token = 'not used'
self.assertEqual(self.test_auth.get_groups(env, token), None)
def test_s3_creds_unicode_good(self):
self.test_auth.s3_support = True
self.test_auth.app = FakeApp(iter([
('200 Ok', {},
json.dumps({"auth": unicode("plaintext:key)"),
"groups": [{'name': "act:usr"}, {'name': "act"},
{'name': ".admin"}]})),
('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_act'}, '')]))
env = \
{'swift3.auth_details': {
'access_key': 'act:user',
'signature': 'dElf49mbXP8t7F+P1qXZzaf3a50=',
'string_to_sign': base64.urlsafe_b64decode(
'UFVUCgoKRnJpLCAyNiBGZWIgMjAxNiAwNjo0NT'
'ozNCArMDAwMAovY29udGFpbmVyMw==')},
'PATH_INFO': '/v1/AUTH_act/c1'}
token = 'UFVUCgoKRnJpLCAyNiBGZWIgMjAxNiAwNjo0NT'\
'ozNCArMDAwMAovY29udGFpbmVyMw=='
self.assertEqual(self.test_auth.get_groups(env, token), None)
self.assertEqual(self.test_auth.get_groups(env, token),
'act:usr,act,AUTH_act')
def test_s3_only_hash_passed_to_hmac(self):
self.test_auth.s3_support = True
@ -4114,10 +4141,14 @@ class TestAuth(unittest.TestCase):
{'name': ".admin"}]})),
('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_act'}, '')]))
env = \
{'HTTP_AUTHORIZATION': 'AWS act:user:whatever',
{'swift3.auth_details': {
'access_key': 'act:user',
'signature': 'whatever',
'string_to_sign': base64.urlsafe_b64decode(
'UFVUCgoKRnJpLCAyNiBGZWIgMjAxNiAwNjo0NT'
'ozNCArMDAwMAovY29udGFpbmVyMw==')},
'PATH_INFO': '/v1/AUTH_act/c1'}
token = 'UFVUCgoKRnJpLCAyNiBGZWIgMjAxNiAwNjo0NT'\
'ozNCArMDAwMAovY29udGFpbmVyMw=='
token = 'not used'
mock_hmac_new = mock.MagicMock()
with mock.patch('hmac.new', mock_hmac_new):
self.test_auth.get_groups(env, token)