Merge "Enable secret decrypt through 'payload' resource"
This commit is contained in:
commit
b68eec2c57
@ -134,6 +134,17 @@ class SecretController(object):
|
||||
|
||||
return transport_key_model.transport_key
|
||||
|
||||
@pecan.expose()
|
||||
@allow_all_content_types
|
||||
@controllers.handle_exceptions(u._('Secret payload retrieval'))
|
||||
@controllers.enforce_rbac('secret:decrypt')
|
||||
def payload(self, external_project_id, **kwargs):
|
||||
if pecan.request.method != 'GET':
|
||||
pecan.abort(405)
|
||||
return self._on_get_secret_payload(self.secret,
|
||||
external_project_id,
|
||||
**kwargs)
|
||||
|
||||
@index.when(method='PUT')
|
||||
@allow_all_content_types
|
||||
@controllers.handle_exceptions(u._('Secret update'))
|
||||
|
@ -26,6 +26,7 @@ import urllib
|
||||
import mock
|
||||
import pecan
|
||||
from six import moves
|
||||
from testtools import testcase
|
||||
import webtest
|
||||
|
||||
from barbican import api
|
||||
@ -152,7 +153,8 @@ class SecretAllowAllMimeTypesDecoratorTest(utils.BaseTestCase):
|
||||
self._empty_function)
|
||||
|
||||
|
||||
class FunctionalTest(utils.BaseTestCase, utils.MockModelRepositoryMixin):
|
||||
class FunctionalTest(utils.BaseTestCase, utils.MockModelRepositoryMixin,
|
||||
testcase.WithAttributes):
|
||||
|
||||
def setUp(self):
|
||||
super(FunctionalTest, self).setUp()
|
||||
@ -237,7 +239,7 @@ class BaseSecretsResource(FunctionalTest):
|
||||
|
||||
# Set up mocked secret
|
||||
self.secret = models.Secret()
|
||||
self.secret.id = '123'
|
||||
self.secret.id = utils.generate_test_uuid(tail_value=1)
|
||||
|
||||
# Set up mocked secret repo
|
||||
self.secret_repo = mock.MagicMock()
|
||||
@ -905,13 +907,40 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
|
||||
resp.namespace['content_types'].itervalues())
|
||||
self.assertNotIn('mime_type', resp.namespace)
|
||||
|
||||
@testcase.attr('deprecated')
|
||||
@mock.patch('barbican.plugin.resources.get_secret')
|
||||
def test_should_get_secret_as_plain_based_on_content_type(self,
|
||||
mock_get_secret):
|
||||
data = 'unencrypted_data'
|
||||
mock_get_secret.return_value = data
|
||||
|
||||
resp = self.app.get(
|
||||
'/secrets/{0}/payload/'.format(self.secret.id),
|
||||
headers={'Accept': 'text/plain'}
|
||||
)
|
||||
|
||||
self.secret_repo.get.assert_called_once_with(
|
||||
entity_id=self.secret.id,
|
||||
external_project_id=self.external_project_id,
|
||||
suppress_exception=True)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
self.assertEqual(resp.body, data)
|
||||
mock_get_secret.assert_called_once_with(
|
||||
'text/plain',
|
||||
self.secret,
|
||||
self.project,
|
||||
None,
|
||||
None
|
||||
)
|
||||
|
||||
@mock.patch('barbican.plugin.resources.get_secret')
|
||||
def test_should_get_secret_as_plain(self, mock_get_secret):
|
||||
data = 'unencrypted_data'
|
||||
mock_get_secret.return_value = data
|
||||
|
||||
resp = self.app.get(
|
||||
'/secrets/{0}/'.format(self.secret.id),
|
||||
'/secrets/{0}/payload/'.format(self.secret.id),
|
||||
headers={'Accept': 'text/plain'}
|
||||
)
|
||||
|
||||
@ -937,7 +966,38 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
|
||||
|
||||
twsk = "trans_wrapped_session_key"
|
||||
resp = self.app.get(
|
||||
'/secrets/{0}/?trans_wrapped_session_key={1}&transport_key_id={2}'
|
||||
('/secrets/{0}/payload/'
|
||||
'?trans_wrapped_session_key={1}&transport_key_id={2}')
|
||||
.format(self.secret.id, twsk, self.transport_key_id),
|
||||
headers={'Accept': 'text/plain'}
|
||||
)
|
||||
|
||||
self.secret_repo.get.assert_called_once_with(
|
||||
entity_id=self.secret.id,
|
||||
external_project_id=self.external_project_id,
|
||||
suppress_exception=True)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
self.assertEqual(resp.body, data)
|
||||
mock_get_secret.assert_called_once_with(
|
||||
'text/plain',
|
||||
self.secret,
|
||||
self.project,
|
||||
twsk,
|
||||
self.transport_key_model.transport_key
|
||||
)
|
||||
|
||||
@testcase.attr('deprecated')
|
||||
@mock.patch('barbican.plugin.resources.get_secret')
|
||||
def test_should_get_secret_as_plain_with_twsk_based_on_content_type(
|
||||
self, mock_get_secret):
|
||||
data = 'encrypted_data'
|
||||
mock_get_secret.return_value = data
|
||||
|
||||
twsk = "trans_wrapped_session_key"
|
||||
resp = self.app.get(
|
||||
('/secrets/{0}/'
|
||||
'?trans_wrapped_session_key={1}&transport_key_id={2}')
|
||||
.format(self.secret.id, twsk, self.transport_key_id),
|
||||
headers={'Accept': 'text/plain'}
|
||||
)
|
||||
@ -965,7 +1025,28 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
|
||||
|
||||
twsk = "trans_wrapped_session_key"
|
||||
resp = self.app.get(
|
||||
'/secrets/{0}/?trans_wrapped_session_key={1}'.format(
|
||||
'/secrets/{0}/payload/?trans_wrapped_session_key={1}'.format(
|
||||
self.secret.id, twsk),
|
||||
headers={'Accept': 'text/plain'},
|
||||
expect_errors=True
|
||||
)
|
||||
|
||||
self.secret_repo.get.assert_called_once_with(
|
||||
entity_id=self.secret.id,
|
||||
external_project_id=self.external_project_id,
|
||||
suppress_exception=True)
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
|
||||
@testcase.attr('deprecated')
|
||||
@mock.patch('barbican.plugin.resources.get_secret')
|
||||
def test_should_throw_exception_for_get_when_twsk_but_no_tkey_id_old_way(
|
||||
self, mock_get_secret):
|
||||
data = 'encrypted_data'
|
||||
mock_get_secret.return_value = data
|
||||
|
||||
twsk = "trans_wrapped_session_key"
|
||||
resp = self.app.get(
|
||||
'/secrets/{0}/payload/?trans_wrapped_session_key={1}'.format(
|
||||
self.secret.id, twsk),
|
||||
headers={'Accept': 'text/plain'},
|
||||
expect_errors=True
|
||||
@ -1045,6 +1126,34 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
|
||||
self.datum.content_type = "application/octet-stream"
|
||||
self.datum.cypher_text = 'aaaa'
|
||||
|
||||
resp = self.app.get(
|
||||
'/secrets/{0}/payload/'.format(self.secret.id),
|
||||
headers={
|
||||
'Accept': 'application/octet-stream',
|
||||
'Accept-Encoding': 'gzip'
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(resp.body, data)
|
||||
|
||||
mock_get_secret.assert_called_once_with(
|
||||
'application/octet-stream',
|
||||
self.secret,
|
||||
self.project,
|
||||
None,
|
||||
None
|
||||
)
|
||||
|
||||
@testcase.attr('deprecated')
|
||||
@mock.patch('barbican.plugin.resources.get_secret')
|
||||
def test_should_get_secret_as_binary_based_on_content_type(
|
||||
self, mock_get_secret):
|
||||
data = 'unencrypted_data'
|
||||
mock_get_secret.return_value = data
|
||||
|
||||
self.datum.content_type = "application/octet-stream"
|
||||
self.datum.cypher_text = 'aaaa'
|
||||
|
||||
resp = self.app.get(
|
||||
'/secrets/{0}/'.format(self.secret.id),
|
||||
headers={
|
||||
@ -1075,7 +1184,17 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
|
||||
|
||||
def test_should_throw_exception_for_get_when_accept_not_supported(self):
|
||||
resp = self.app.get(
|
||||
'/secrets/{0}/'.format(self.secret.id),
|
||||
'/secrets/{0}/payload/'.format(self.secret.id),
|
||||
headers={'Accept': 'bogusaccept', 'Accept-Encoding': 'gzip'},
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(resp.status_int, 406)
|
||||
|
||||
@testcase.attr('deprecated')
|
||||
def test_should_throw_exception_for_get_when_accept_not_supported_old_way(
|
||||
self):
|
||||
resp = self.app.get(
|
||||
'/secrets/{0}/payload/'.format(self.secret.id),
|
||||
headers={'Accept': 'bogusaccept', 'Accept-Encoding': 'gzip'},
|
||||
expect_errors=True
|
||||
)
|
||||
@ -1358,6 +1477,31 @@ class WhenPerformingUnallowedOperationsOnSecrets(BaseSecretsResource):
|
||||
|
||||
self.assertEqual(resp.status_int, 405)
|
||||
|
||||
def test_should_only_allow_get_for_secret_payload_uri(self):
|
||||
resp = self.app.post(
|
||||
'/secrets/{0}/payload/'.format(self.secret.id),
|
||||
'plain text',
|
||||
headers={'Accept': 'text/plain'},
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(resp.status_int, 405)
|
||||
|
||||
resp = self.app.put(
|
||||
'/secrets/{0}/payload/'.format(self.secret.id),
|
||||
'plain text',
|
||||
headers={'Accept': 'text/plain'},
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(resp.status_int, 405)
|
||||
|
||||
resp = self.app.delete(
|
||||
'/secrets/{0}/payload/'.format(self.secret.id),
|
||||
'plain text',
|
||||
headers={'Accept': 'text/plain'},
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(resp.status_int, 405)
|
||||
|
||||
|
||||
class WhenCreatingOrdersUsingOrdersResource(FunctionalTest):
|
||||
|
||||
|
@ -13,8 +13,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
|
||||
|
||||
from functionaltests.api.v1.behaviors import base_behaviors
|
||||
from functionaltests.api.v1.models import secret_models
|
||||
|
||||
@ -53,6 +51,20 @@ class SecretBehaviors(base_behaviors.BaseBehaviors):
|
||||
def get_secret(self, secret_ref, payload_content_type,
|
||||
payload_content_encoding=None):
|
||||
|
||||
headers = {'Accept': payload_content_type,
|
||||
'Accept-Encoding': payload_content_encoding}
|
||||
|
||||
return self.client.get(secret_ref + '/payload',
|
||||
extra_headers=headers)
|
||||
|
||||
def get_secret_based_on_content_type(self, secret_ref,
|
||||
payload_content_type,
|
||||
payload_content_encoding=None):
|
||||
"""Retrieves a secret's payload based on the content type
|
||||
|
||||
NOTE: This way will be deprecated in subsequent versions of the API.
|
||||
"""
|
||||
|
||||
headers = {'Accept': payload_content_type,
|
||||
'Accept-Encoding': payload_content_encoding}
|
||||
|
||||
|
@ -598,6 +598,45 @@ class SecretsTestCase(base.TestCase):
|
||||
else:
|
||||
self.assertIn(test_model.payload, get_resp.content)
|
||||
|
||||
@utils.parameterized_dataset({
|
||||
'text_content_type_none_encoding': {
|
||||
'payload_content_type': 'text/plain',
|
||||
'payload_content_encoding': None},
|
||||
|
||||
'utf8_text_content_type_none_encoding': {
|
||||
'payload_content_type': 'text/plain; charset=utf-8',
|
||||
'payload_content_encoding': None},
|
||||
|
||||
'no_space_utf8_text_content_type_none_encoding': {
|
||||
'payload_content_type': 'text/plain;charset=utf-8',
|
||||
'payload_content_encoding': None},
|
||||
|
||||
'octet_content_type_base64_encoding': {
|
||||
'payload_content_type': 'application/octet-stream',
|
||||
'payload_content_encoding': 'base64'}
|
||||
})
|
||||
@testcase.attr('positive', 'deprecated')
|
||||
def test_secret_create_defaults_valid_types_and_encoding_old_way(self,
|
||||
**kwargs):
|
||||
"""Creates secrets with various content types and encodings."""
|
||||
test_model = secret_models.SecretModel(**secret_create_defaults_data)
|
||||
test_model.override_values(**kwargs)
|
||||
payload_content_encoding = test_model.payload_content_encoding
|
||||
|
||||
resp, secret_ref = self.behaviors.create_secret(test_model)
|
||||
self.assertEqual(resp.status_code, 201)
|
||||
|
||||
get_resp = self.behaviors.get_secret_based_on_content_type(
|
||||
secret_ref,
|
||||
payload_content_type=test_model.payload_content_type,
|
||||
payload_content_encoding=payload_content_encoding)
|
||||
|
||||
if payload_content_encoding == 'base64':
|
||||
self.assertIn(test_model.payload,
|
||||
binascii.b2a_base64(get_resp.content))
|
||||
else:
|
||||
self.assertIn(test_model.payload, get_resp.content)
|
||||
|
||||
@utils.parameterized_dataset({
|
||||
'empty_content_type_and_encoding': {
|
||||
'payload_content_type': '',
|
||||
|
Loading…
x
Reference in New Issue
Block a user