Allow IPv6 addresses in auth_host config option
Change-Id: I8d29440e1ec3a56988caac265db3d18869518c5b Related-Change: I226881a35a74ef668d4cd1c6829a64c94ff185d9
This commit is contained in:
parent
2d4e1832c8
commit
96478cd62c
@ -40,6 +40,8 @@ import six
|
||||
from swift.common.swob import Request, Response
|
||||
from swift.common.utils import config_true_value, split_path
|
||||
|
||||
from swift3.utils import is_valid_ipv6
|
||||
|
||||
|
||||
PROTOCOL_NAME = 'S3 Token Authentication'
|
||||
|
||||
@ -66,7 +68,12 @@ class S3Token(object):
|
||||
"configuration options was deprecated in the Newton release "
|
||||
"in favor of auth_uri. These options may be removed in a "
|
||||
"future release.")
|
||||
auth_host = conf.get('auth_host')
|
||||
auth_host = conf.get('auth_host', '')
|
||||
if is_valid_ipv6(auth_host):
|
||||
# Note(timburke) it is an IPv6 address, so it needs to be
|
||||
# wrapped with '[]' to generate a valid IPv6 URL, based on
|
||||
# http://www.ietf.org/rfc/rfc2732.txt
|
||||
auth_host = '[%s]' % auth_host
|
||||
auth_port = int(conf.get('auth_port', 35357))
|
||||
auth_protocol = conf.get('auth_protocol', 'https')
|
||||
|
||||
|
@ -93,9 +93,11 @@ class S3TokenMiddlewareTestBase(unittest.TestCase):
|
||||
self.time_patcher = mock.patch.object(time, 'time', lambda: 1234)
|
||||
self.time_patcher.start()
|
||||
|
||||
self.app = FakeApp()
|
||||
self.conf = {
|
||||
'auth_uri': self.TEST_AUTH_URI,
|
||||
}
|
||||
self.middleware = s3_token.S3Token(self.app, self.conf)
|
||||
|
||||
self.requests_mock = rm_fixture.Fixture()
|
||||
self.requests_mock.setUp()
|
||||
@ -115,7 +117,6 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(S3TokenMiddlewareTestGood, self).setUp()
|
||||
self.middleware = s3_token.S3Token(FakeApp(), self.conf)
|
||||
|
||||
self.requests_mock.post(self.TEST_URL,
|
||||
status_code=201,
|
||||
@ -160,7 +161,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
|
||||
self.middleware = (
|
||||
s3_token.filter_factory({'auth_protocol': 'http',
|
||||
'auth_host': host,
|
||||
'auth_port': port})(FakeApp()))
|
||||
'auth_port': port})(self.app))
|
||||
req = Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'AWS access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
@ -170,7 +171,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
|
||||
|
||||
def test_authorized_trailing_slash(self):
|
||||
self.middleware = s3_token.filter_factory({
|
||||
'auth_uri': self.TEST_AUTH_URI + '/'})(FakeApp())
|
||||
'auth_uri': self.TEST_AUTH_URI + '/'})(self.app)
|
||||
req = Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'AWS access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
@ -189,7 +190,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
|
||||
@mock.patch.object(requests, 'post')
|
||||
def test_insecure(self, MOCK_REQUEST):
|
||||
self.middleware = (
|
||||
s3_token.filter_factory({'insecure': 'True'})(FakeApp()))
|
||||
s3_token.filter_factory({'insecure': 'True'})(self.app))
|
||||
|
||||
text_return_value = json.dumps(GOOD_RESPONSE)
|
||||
MOCK_REQUEST.return_value = TestResponse({
|
||||
@ -212,21 +213,36 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
|
||||
true_values = ['true', 'True', '1', 'yes']
|
||||
for val in true_values:
|
||||
config = {'insecure': val, 'certfile': 'false_ind'}
|
||||
middleware = s3_token.filter_factory(config)(FakeApp())
|
||||
middleware = s3_token.filter_factory(config)(self.app)
|
||||
self.assertIs(False, middleware._verify)
|
||||
|
||||
# Some "secure" values, including unexpected value.
|
||||
false_values = ['false', 'False', '0', 'no', 'someweirdvalue']
|
||||
for val in false_values:
|
||||
config = {'insecure': val, 'certfile': 'false_ind'}
|
||||
middleware = s3_token.filter_factory(config)(FakeApp())
|
||||
middleware = s3_token.filter_factory(config)(self.app)
|
||||
self.assertEqual('false_ind', middleware._verify)
|
||||
|
||||
# Default is secure.
|
||||
config = {'certfile': 'false_ind'}
|
||||
middleware = s3_token.filter_factory(config)(FakeApp())
|
||||
middleware = s3_token.filter_factory(config)(self.app)
|
||||
self.assertIs('false_ind', middleware._verify)
|
||||
|
||||
def test_ipv6_auth_host_option(self):
|
||||
config = {}
|
||||
ipv6_addr = '::FFFF:129.144.52.38'
|
||||
identity_uri = 'https://[::FFFF:129.144.52.38]:35357'
|
||||
|
||||
# Raw IPv6 address should work
|
||||
config['auth_host'] = ipv6_addr
|
||||
middleware = s3_token.filter_factory(config)(self.app)
|
||||
self.assertEqual(identity_uri, middleware._request_uri)
|
||||
|
||||
# ...as should workarounds already in use
|
||||
config['auth_host'] = '[%s]' % ipv6_addr
|
||||
middleware = s3_token.filter_factory(config)(self.app)
|
||||
self.assertEqual(identity_uri, middleware._request_uri)
|
||||
|
||||
def test_unicode_path(self):
|
||||
url = u'/v1/AUTH_cfa/c/euro\u20ac'.encode('utf8')
|
||||
req = Request.blank(urllib.parse.quote(url))
|
||||
@ -236,10 +252,6 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
|
||||
|
||||
|
||||
class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
|
||||
def setUp(self):
|
||||
super(S3TokenMiddlewareTestBad, self).setUp()
|
||||
self.middleware = s3_token.S3Token(FakeApp(), self.conf)
|
||||
|
||||
def test_unauthorized_token(self):
|
||||
ret = {"error":
|
||||
{"message": "EC2 access key not found.",
|
||||
|
@ -16,6 +16,7 @@
|
||||
import re
|
||||
import uuid
|
||||
import base64
|
||||
import socket
|
||||
import time
|
||||
|
||||
from swift.common.utils import get_logger
|
||||
@ -101,6 +102,21 @@ def check_path_header(req, name, length, error_msg):
|
||||
body=error_msg)
|
||||
|
||||
|
||||
def is_valid_ipv6(ip):
|
||||
# FIXME: replace with swift.common.ring.utils is_valid_ipv6
|
||||
# when swift3 requires swift 2.3 or later
|
||||
# --or--
|
||||
# swift.common.utils is_valid_ipv6 when swift3 requires swift>2.9
|
||||
"""
|
||||
Returns True if the provided ip is a valid IPv6-address
|
||||
"""
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET6, ip)
|
||||
except socket.error: # not a valid IPv6 address
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def validate_bucket_name(name):
|
||||
"""
|
||||
Validates the name of the bucket against S3 criteria,
|
||||
|
Loading…
x
Reference in New Issue
Block a user