Fix security bug with host HTTP header field

Previously a user could place any value the host header of their
http request to Barbican and the result returned would have
the correct URL in the body, but the response location header was
built using the provided (and possibly malicious) host value from
the request header.

Resolved this by ensuring that the location header in the response
field matches the URL returned in the body.

Also added functional tests to ensure that this exposure won't
reappear.

Change-Id: I49a9e44be68b20f7602cf58202dd8e522a0c25c3
Closes-Bug: 1421479
This commit is contained in:
Steve Heyman 2015-02-18 19:13:50 -06:00
parent 47db4c28e3
commit d88f89f45b
11 changed files with 124 additions and 21 deletions

View File

@ -144,9 +144,8 @@ class ContainerConsumersController(object):
new_consumer.project_id = project.id
self.consumer_repo.create_or_update_from(new_consumer, container)
pecan.response.headers['Location'] = (
'/containers/{0}/consumers'.format(new_consumer.container_id)
)
url = hrefs.convert_consumer_to_href(new_consumer.container_id)
pecan.response.headers['Location'] = url
return self._return_container_data(self.container_id,
external_project_id)

View File

@ -189,9 +189,9 @@ class ContainersController(object):
self.container_repo.create_from(new_container)
pecan.response.status = 201
pecan.response.headers['Location'] = '/containers/{0}'.format(
new_container.id
)
url = hrefs.convert_container_to_href(new_container.id)
pecan.response.status = 201
pecan.response.headers['Location'] = url
return {'container_ref': url}

View File

@ -197,9 +197,10 @@ class OrdersController(object):
self.queue.process_type_order(order_id=new_order.id,
project_id=external_project_id)
pecan.response.status = 202
pecan.response.headers['Location'] = '/{0}/orders/{1}'.format(
external_project_id, new_order.id
)
url = hrefs.convert_order_to_href(new_order.id)
pecan.response.status = 202
pecan.response.headers['Location'] = url
return {'order_ref': url}

View File

@ -307,12 +307,12 @@ class SecretsController(object):
transport_key_needed=transport_key_needed,
transport_key_id=data.get('transport_key_id'))
pecan.response.status = 201
pecan.response.headers['Location'] = '/secrets/{0}'.format(
new_secret.id
)
url = hrefs.convert_secret_to_href(new_secret.id)
LOG.debug('URI to secret is %s', url)
pecan.response.status = 201
pecan.response.headers['Location'] = url
if transport_key_model is not None:
tkey_url = hrefs.convert_transport_key_to_href(
transport_key_model.id)

View File

@ -146,10 +146,10 @@ class TransportKeysController(object):
self.repo.create_from(new_key)
pecan.response.status = 201
pecan.response.headers['Location'] = '/transport_keys/{0}'.format(
new_key.id
)
url = hrefs.convert_transport_key_to_href(new_key.id)
LOG.debug('URI to transport key is %s', url)
pecan.response.status = 201
pecan.response.headers['Location'] = url
return {'transport_key_ref': url}

View File

@ -42,6 +42,11 @@ def convert_transport_key_to_href(transport_key_id):
return convert_resource_id_to_href('transport_keys', transport_key_id)
def convert_consumer_to_href(consumer_id):
"""Convert the consumer ID to a HATEOS-style href."""
return convert_resource_id_to_href('consumers', consumer_id) + '/consumers'
# TODO(hgedikli) handle list of fields in here
def convert_to_hrefs(fields):
"""Convert id's within a fields dict to HATEOS-style hrefs."""

View File

@ -19,11 +19,12 @@ from functionaltests.api.v1.models import consumer_model
class ConsumerBehaviors(base_behaviors.BaseBehaviors):
def create_consumer(self, model, container_ref):
def create_consumer(self, model, container_ref, extra_headers=None):
"""Register a consumer to a container.
:param model: The metadata for the consumer
:param container_ref: Full reference to a container
:param extra_headers: Any additional headers to pass to the request
:return: A tuple containing the response from the create
and the href to the newly registered consumer
@ -31,7 +32,8 @@ class ConsumerBehaviors(base_behaviors.BaseBehaviors):
url = '{0}/consumers'.format(container_ref)
resp = self.client.post(url, request_model=model)
resp = self.client.post(url, request_model=model,
extra_headers=extra_headers)
returned_data = self.get_json(resp)
consumer_data = returned_data['consumers']

View File

@ -194,6 +194,25 @@ class GenericContainersTestCase(BaseContainerTestCase):
resp = self.behaviors.delete_container("not_a_ref", expected_fail=True)
self.assertEqual(resp.status_code, 404)
@testcase.attr('positive')
def test_create_change_host_header(self, **kwargs):
"""Create a container with a (possibly) malicious host name header."""
test_model = container_models.ContainerModel(**self.default_data)
malicious_hostname = 'some.bad.server.com'
changed_host_header = {'Host': malicious_hostname}
resp, container_ref = self.behaviors.create_container(
test_model, extra_headers=changed_host_header)
self.assertEqual(resp.status_code, 201)
# get Location field from result and assert that it is NOT the
# malicious one.
regex = '.*{0}.*'.format(malicious_hostname)
self.assertNotRegexpMatches(resp.headers['location'], regex)
@utils.parameterized_test_case
class RSAContainersTestCase(BaseContainerTestCase):
@ -266,3 +285,22 @@ class RSAContainersTestCase(BaseContainerTestCase):
resp, container_ref = self.behaviors.create_container(test_model)
self.assertEqual(resp.status_code, 400)
@testcase.attr('positive')
def test_create_rsa_change_host_header(self, **kwargs):
"""Create a container with a (possibly) malicious host name header."""
test_model = container_models.ContainerModel(**self.default_data)
malicious_hostname = 'some.bad.server.com'
changed_host_header = {'Host': malicious_hostname}
resp, container_ref = self.behaviors.create_container(
test_model, extra_headers=changed_host_header)
self.assertEqual(resp.status_code, 201)
# get Location field from result and assert that it is NOT the
# malicious one.
regex = '.*{0}.*'.format(malicious_hostname)
self.assertNotRegexpMatches(resp.headers['location'], regex)

View File

@ -455,3 +455,22 @@ class OrdersTestCase(base.TestCase):
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(create_resp.status_code, 400)
@testcase.attr('positive')
def test_order_create_change_host_header(self, **kwargs):
"""Create an order with a (possibly) malicious host name in header."""
test_model = order_models.OrderModel(**order_create_defaults_data)
malicious_hostname = 'some.bad.server.com'
changed_host_header = {'Host': malicious_hostname}
resp, order_ref = self.behaviors.create_order(
test_model, extra_headers=changed_host_header)
self.assertEqual(resp.status_code, 202)
# get Location field from result and assert that it is NOT the
# malicious one.
regex = '.*{0}.*'.format(malicious_hostname)
self.assertNotRegexpMatches(resp.headers['location'], regex)

View File

@ -752,3 +752,22 @@ class SecretsTestCase(base.TestCase):
resp, secret_ref = self.behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 400)
@testcase.attr('positive')
def test_secret_create_change_host_header(self, **kwargs):
"""Create a secret with a (possibly) malicious host name in header."""
test_model = secret_models.SecretModel(**secret_create_defaults_data)
malicious_hostname = 'some.bad.server.com'
changed_host_header = {'Host': malicious_hostname}
resp, secret_ref = self.behaviors.create_secret(
test_model, headers=changed_host_header)
self.assertEqual(resp.status_code, 201)
# get Location field from result and assert that it is NOT the
# malicious one.
regex = '.*{0}.*'.format(malicious_hostname)
self.assertNotRegexpMatches(resp.headers['location'], regex)

View File

@ -214,3 +214,23 @@ class ConsumersTestCase(base.TestCase):
self.assertIsNotNone(consumer_data)
self.assertIn(self.consumer_data, consumer_data)
self.assertEqual(1, count)
@testcase.attr('positive')
def test_create_consumer_change_host_header(self, **kwargs):
"""Create a consumer with a (possibly) malicious host name header."""
test_model = consumer_model.ConsumerModel(**self.consumer_data)
malicious_hostname = 'some.bad.server.com'
changed_host_header = {'Host': malicious_hostname}
resp, consumer_data = self.consumer_behaviors.create_consumer(
test_model, self.container_ref, extra_headers=changed_host_header)
self.assertEqual(resp.status_code, 200)
self.assertIsNotNone(consumer_data)
# get Location field from result and assert that it is NOT the
# malicious one.
regex = '.*{0}.*'.format(malicious_hostname)
self.assertNotRegexpMatches(resp.headers['location'], regex)