From 96478cd62cb1f718647faa22bae24833b9b038ad Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Fri, 20 May 2016 11:23:06 -0700 Subject: [PATCH] Allow IPv6 addresses in auth_host config option Change-Id: I8d29440e1ec3a56988caac265db3d18869518c5b Related-Change: I226881a35a74ef668d4cd1c6829a64c94ff185d9 --- swift3/s3_token_middleware.py | 9 +++++- swift3/test/unit/test_s3_token_middleware.py | 34 +++++++++++++------- swift3/utils.py | 16 +++++++++ 3 files changed, 47 insertions(+), 12 deletions(-) diff --git a/swift3/s3_token_middleware.py b/swift3/s3_token_middleware.py index 86773536..4f2c2129 100644 --- a/swift3/s3_token_middleware.py +++ b/swift3/s3_token_middleware.py @@ -40,6 +40,8 @@ import six from swift.common.swob import Request, Response from swift.common.utils import config_true_value, split_path +from swift3.utils import is_valid_ipv6 + PROTOCOL_NAME = 'S3 Token Authentication' @@ -66,7 +68,12 @@ class S3Token(object): "configuration options was deprecated in the Newton release " "in favor of auth_uri. These options may be removed in a " "future release.") - auth_host = conf.get('auth_host') + auth_host = conf.get('auth_host', '') + if is_valid_ipv6(auth_host): + # Note(timburke) it is an IPv6 address, so it needs to be + # wrapped with '[]' to generate a valid IPv6 URL, based on + # http://www.ietf.org/rfc/rfc2732.txt + auth_host = '[%s]' % auth_host auth_port = int(conf.get('auth_port', 35357)) auth_protocol = conf.get('auth_protocol', 'https') diff --git a/swift3/test/unit/test_s3_token_middleware.py b/swift3/test/unit/test_s3_token_middleware.py index 3fee001b..092fe884 100644 --- a/swift3/test/unit/test_s3_token_middleware.py +++ b/swift3/test/unit/test_s3_token_middleware.py @@ -93,9 +93,11 @@ class S3TokenMiddlewareTestBase(unittest.TestCase): self.time_patcher = mock.patch.object(time, 'time', lambda: 1234) self.time_patcher.start() + self.app = FakeApp() self.conf = { 'auth_uri': self.TEST_AUTH_URI, } + self.middleware = s3_token.S3Token(self.app, self.conf) self.requests_mock = rm_fixture.Fixture() self.requests_mock.setUp() @@ -115,7 +117,6 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): def setUp(self): super(S3TokenMiddlewareTestGood, self).setUp() - self.middleware = s3_token.S3Token(FakeApp(), self.conf) self.requests_mock.post(self.TEST_URL, status_code=201, @@ -160,7 +161,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): self.middleware = ( s3_token.filter_factory({'auth_protocol': 'http', 'auth_host': host, - 'auth_port': port})(FakeApp())) + 'auth_port': port})(self.app)) req = Request.blank('/v1/AUTH_cfa/c/o') req.headers['Authorization'] = 'AWS access:signature' req.headers['X-Storage-Token'] = 'token' @@ -170,7 +171,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): def test_authorized_trailing_slash(self): self.middleware = s3_token.filter_factory({ - 'auth_uri': self.TEST_AUTH_URI + '/'})(FakeApp()) + 'auth_uri': self.TEST_AUTH_URI + '/'})(self.app) req = Request.blank('/v1/AUTH_cfa/c/o') req.headers['Authorization'] = 'AWS access:signature' req.headers['X-Storage-Token'] = 'token' @@ -189,7 +190,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): @mock.patch.object(requests, 'post') def test_insecure(self, MOCK_REQUEST): self.middleware = ( - s3_token.filter_factory({'insecure': 'True'})(FakeApp())) + s3_token.filter_factory({'insecure': 'True'})(self.app)) text_return_value = json.dumps(GOOD_RESPONSE) MOCK_REQUEST.return_value = TestResponse({ @@ -212,21 +213,36 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): true_values = ['true', 'True', '1', 'yes'] for val in true_values: config = {'insecure': val, 'certfile': 'false_ind'} - middleware = s3_token.filter_factory(config)(FakeApp()) + middleware = s3_token.filter_factory(config)(self.app) self.assertIs(False, middleware._verify) # Some "secure" values, including unexpected value. false_values = ['false', 'False', '0', 'no', 'someweirdvalue'] for val in false_values: config = {'insecure': val, 'certfile': 'false_ind'} - middleware = s3_token.filter_factory(config)(FakeApp()) + middleware = s3_token.filter_factory(config)(self.app) self.assertEqual('false_ind', middleware._verify) # Default is secure. config = {'certfile': 'false_ind'} - middleware = s3_token.filter_factory(config)(FakeApp()) + middleware = s3_token.filter_factory(config)(self.app) self.assertIs('false_ind', middleware._verify) + def test_ipv6_auth_host_option(self): + config = {} + ipv6_addr = '::FFFF:129.144.52.38' + identity_uri = 'https://[::FFFF:129.144.52.38]:35357' + + # Raw IPv6 address should work + config['auth_host'] = ipv6_addr + middleware = s3_token.filter_factory(config)(self.app) + self.assertEqual(identity_uri, middleware._request_uri) + + # ...as should workarounds already in use + config['auth_host'] = '[%s]' % ipv6_addr + middleware = s3_token.filter_factory(config)(self.app) + self.assertEqual(identity_uri, middleware._request_uri) + def test_unicode_path(self): url = u'/v1/AUTH_cfa/c/euro\u20ac'.encode('utf8') req = Request.blank(urllib.parse.quote(url)) @@ -236,10 +252,6 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): - def setUp(self): - super(S3TokenMiddlewareTestBad, self).setUp() - self.middleware = s3_token.S3Token(FakeApp(), self.conf) - def test_unauthorized_token(self): ret = {"error": {"message": "EC2 access key not found.", diff --git a/swift3/utils.py b/swift3/utils.py index bc726e54..053cc5ba 100644 --- a/swift3/utils.py +++ b/swift3/utils.py @@ -16,6 +16,7 @@ import re import uuid import base64 +import socket import time from swift.common.utils import get_logger @@ -101,6 +102,21 @@ def check_path_header(req, name, length, error_msg): body=error_msg) +def is_valid_ipv6(ip): + # FIXME: replace with swift.common.ring.utils is_valid_ipv6 + # when swift3 requires swift 2.3 or later + # --or-- + # swift.common.utils is_valid_ipv6 when swift3 requires swift>2.9 + """ + Returns True if the provided ip is a valid IPv6-address + """ + try: + socket.inet_pton(socket.AF_INET6, ip) + except socket.error: # not a valid IPv6 address + return False + return True + + def validate_bucket_name(name): """ Validates the name of the bucket against S3 criteria,