Starting refactor of test_resources
Starting to split out tests for the version and secrets controller into their own test modules. For each test that I'm splitting out, the goal is to remove as many mocks as possible from the equation and clean up the individual tests a bit. Change-Id: I458703fbfd1563ce11ca71eb14d45bd30e8af9e6
This commit is contained in:
parent
9061e376eb
commit
0f45e9a834
0
barbican/tests/api/controllers/__init__.py
Normal file
0
barbican/tests/api/controllers/__init__.py
Normal file
191
barbican/tests/api/controllers/test_secrets.py
Normal file
191
barbican/tests/api/controllers/test_secrets.py
Normal file
@ -0,0 +1,191 @@
|
||||
# Copyright (c) 2015 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import os
|
||||
|
||||
import mock
|
||||
|
||||
from barbican.common import validators
|
||||
from barbican.model import models
|
||||
from barbican.model import repositories
|
||||
from barbican.openstack.common import timeutils
|
||||
from barbican.tests import utils
|
||||
|
||||
project_repo = repositories.get_project_repository()
|
||||
secrets_repo = repositories.get_secret_repository()
|
||||
tkey_repo = repositories.get_transport_key_repository()
|
||||
|
||||
|
||||
class WhenTestingSecretsResource(utils.BarbicanAPIBaseTestCase):
|
||||
|
||||
def test_can_create_new_secret_one_step(self):
|
||||
resp, secret_uuid = create_secret(
|
||||
self.app,
|
||||
payload=b'not-encrypted',
|
||||
content_type='text/plain'
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
self.assertIsNotNone(secret_uuid)
|
||||
|
||||
def test_can_create_new_secret_without_payload(self):
|
||||
resp, secret_uuid = create_secret(self.app, name='test')
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
secret = secrets_repo.get(secret_uuid, self.project_id)
|
||||
self.assertEqual(secret.name, 'test')
|
||||
self.assertEqual(secret.encrypted_data, [])
|
||||
|
||||
def test_can_create_new_secret_if_project_doesnt_exist(self):
|
||||
# Build new context
|
||||
new_project_context = self._build_context('test_project_id')
|
||||
self.app.extra_environ = {'barbican.context': new_project_context}
|
||||
|
||||
# Create a generic secret
|
||||
resp, _ = create_secret(self.app, name='test_secret')
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# Verify the new project was created
|
||||
project = project_repo.find_by_external_project_id('test_project_id')
|
||||
self.assertIsNotNone(project)
|
||||
|
||||
def test_can_create_new_secret_with_payload_just_under_max(self):
|
||||
large_payload = b'A' * (validators.DEFAULT_MAX_SECRET_BYTES - 8)
|
||||
resp, _ = create_secret(
|
||||
self.app,
|
||||
payload=large_payload,
|
||||
content_type='text/plain'
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
def test_creating_new_secret_with_oversized_payload_should_fail(self):
|
||||
oversized_payload = b'A' * (validators.DEFAULT_MAX_SECRET_BYTES + 10)
|
||||
resp, _ = create_secret(
|
||||
self.app,
|
||||
payload=oversized_payload,
|
||||
content_type='text/plain',
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(resp.status_int, 413)
|
||||
|
||||
def test_create_new_secret_with_empty_payload_should_fail(self):
|
||||
resp, _ = create_secret(
|
||||
self.app,
|
||||
payload='',
|
||||
content_type='text/plain',
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
|
||||
def test_expiration_should_be_normalized_with_new_secret(self):
|
||||
target_expiration = '2114-02-28 12:14:44.180394-05:00'
|
||||
resp, secret_uuid = create_secret(
|
||||
self.app,
|
||||
expiration=target_expiration
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# Verify that the system normalizes time to UTC
|
||||
secret = secrets_repo.get(secret_uuid, self.project_id)
|
||||
local_datetime = timeutils.parse_isotime(target_expiration)
|
||||
datetime_utc = timeutils.normalize_time(local_datetime)
|
||||
|
||||
self.assertEqual(secret.expiration, datetime_utc)
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_can_create_new_secret_meta_w_transport_key(self, mocked_store):
|
||||
transport_key_model = models.TransportKey('default_plugin', 'tkey1234')
|
||||
|
||||
# TODO(jvrbanac): Look into removing this patch
|
||||
mocked_store.return_value = models.Secret(), transport_key_model
|
||||
|
||||
# Make sure to add the transport key
|
||||
tkey_repo.create_from(transport_key_model)
|
||||
transport_key_id = transport_key_model.id
|
||||
|
||||
resp, secret_uuid = create_secret(
|
||||
self.app,
|
||||
name='test',
|
||||
transport_key_needed='true'
|
||||
)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
self.assertIsNotNone(secret_uuid)
|
||||
self.assertIn(transport_key_id, resp.json.get('transport_key_ref'))
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_can_create_new_secret_with_transport_key(self, mocked_store):
|
||||
# TODO(jvrbanac): Look into removing this patch
|
||||
mocked_store.return_value = models.Secret(), None
|
||||
|
||||
# Create Transport Key (keeping for session scoping reasons)
|
||||
transport_key_model = models.TransportKey('default_plugin', 'tkey1234')
|
||||
transport_key_id = transport_key_model.id
|
||||
tkey_repo.create_from(transport_key_model)
|
||||
|
||||
# Create a normal secret with the TransportKey
|
||||
resp, secret_uuid = create_secret(
|
||||
self.app,
|
||||
payload=b'not-encrypted',
|
||||
content_type='text/plain',
|
||||
transport_key_id=transport_key_id
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
# We're interested in the transport key values
|
||||
mocked_store.assert_called_once_with(
|
||||
'not-encrypted',
|
||||
'text/plain',
|
||||
None,
|
||||
mock.ANY,
|
||||
None,
|
||||
mock.ANY,
|
||||
transport_key_id=transport_key_id,
|
||||
transport_key_needed=False
|
||||
)
|
||||
|
||||
|
||||
# ----------------------- Helper Functions ---------------------------
|
||||
def create_secret(app, name=None, algorithm=None, bit_length=None, mode=None,
|
||||
expiration=None, payload=None, content_type=None,
|
||||
content_encoding=None, transport_key_id=None,
|
||||
transport_key_needed=None, expect_errors=False):
|
||||
request = {
|
||||
'name': name,
|
||||
'algorithm': algorithm,
|
||||
'bit_length': bit_length,
|
||||
'mode': mode,
|
||||
'expiration': expiration,
|
||||
'payload': payload,
|
||||
'payload_content_type': content_type,
|
||||
'payload_content_encoding': content_encoding,
|
||||
'transport_key_id': transport_key_id,
|
||||
'transport_key_needed': transport_key_needed
|
||||
}
|
||||
cleaned_request = {key: val for key, val in request.items()
|
||||
if val is not None}
|
||||
|
||||
resp = app.post_json(
|
||||
'/secrets/',
|
||||
cleaned_request,
|
||||
expect_errors=expect_errors
|
||||
)
|
||||
|
||||
created_uuid = None
|
||||
if resp.status_int == 201:
|
||||
secret_ref = resp.json.get('secret_ref', '')
|
||||
_, created_uuid = os.path.split(secret_ref)
|
||||
|
||||
return (resp, created_uuid)
|
30
barbican/tests/api/controllers/test_versions.py
Normal file
30
barbican/tests/api/controllers/test_versions.py
Normal file
@ -0,0 +1,30 @@
|
||||
# Copyright (c) 2015 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from barbican.api import controllers
|
||||
from barbican.tests import utils
|
||||
|
||||
|
||||
class WhenTestingVersionResource(utils.BarbicanAPIBaseTestCase):
|
||||
root_controller = controllers.versions.VersionController()
|
||||
|
||||
def test_should_return_200_on_get(self):
|
||||
resp = self.app.get('/')
|
||||
self.assertEqual(200, resp.status_int)
|
||||
|
||||
def test_should_return_version_json(self):
|
||||
resp = self.app.get('/')
|
||||
|
||||
self.assertTrue('v1' in resp.json)
|
||||
self.assertEqual(resp.json.get('v1'), 'current')
|
@ -37,7 +37,6 @@ from barbican.common import hrefs
|
||||
from barbican.common import validators
|
||||
import barbican.context
|
||||
from barbican.model import models
|
||||
from barbican.openstack.common import timeutils
|
||||
from barbican.tests import database_utils
|
||||
from barbican.tests import utils
|
||||
|
||||
@ -173,19 +172,6 @@ class FunctionalTest(utils.BaseTestCase, utils.MockModelRepositoryMixin,
|
||||
return controllers.versions.VersionController()
|
||||
|
||||
|
||||
class WhenTestingVersionResource(FunctionalTest):
|
||||
|
||||
def test_should_return_200_on_get(self):
|
||||
resp = self.app.get('/')
|
||||
self.assertEqual(200, resp.status_int)
|
||||
|
||||
def test_should_return_version_json(self):
|
||||
resp = self.app.get('/')
|
||||
|
||||
self.assertTrue('v1' in resp.json)
|
||||
self.assertEqual('current', resp.json['v1'])
|
||||
|
||||
|
||||
class BaseSecretsResource(FunctionalTest):
|
||||
"""Base test class for the Secrets resource."""
|
||||
|
||||
@ -285,217 +271,7 @@ class BaseSecretsResource(FunctionalTest):
|
||||
self.setup_transport_key_repository_mock()
|
||||
|
||||
|
||||
class BaseSecretTestSuite(BaseSecretsResource):
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_should_add_new_secret_with_expiration(self, mock_store_secret):
|
||||
mock_store_secret.return_value = self.secret, None
|
||||
|
||||
expiration = '2114-02-28 12:14:44.180394-05:00'
|
||||
self.secret_req.update({'expiration': expiration})
|
||||
|
||||
resp = self.app.post_json(
|
||||
'/secrets/',
|
||||
self.secret_req
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# Validation replaces the time.
|
||||
expected = dict(self.secret_req)
|
||||
expiration_raw = expected['expiration']
|
||||
expiration_raw = expiration_raw[:-6].replace('12', '17', 1)
|
||||
expiration_tz = timeutils.parse_isotime(expiration_raw.strip())
|
||||
expected['expiration'] = timeutils.normalize_time(expiration_tz)
|
||||
mock_store_secret.assert_called_once_with(
|
||||
self.secret_req.get('payload'),
|
||||
self.secret_req.get('payload_content_type',
|
||||
'application/octet-stream'),
|
||||
self.secret_req.get('payload_content_encoding'),
|
||||
expected,
|
||||
None,
|
||||
self.project,
|
||||
transport_key_needed=False,
|
||||
transport_key_id=None
|
||||
)
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_should_add_new_secret_one_step(self, mock_store_secret,
|
||||
check_project_id=True):
|
||||
"""Test the one-step secret creation.
|
||||
|
||||
:param check_project_id: True if the retrieved Project id needs to be
|
||||
verified, False to skip this check (necessary
|
||||
for new-Project flows).
|
||||
"""
|
||||
mock_store_secret.return_value = self.secret, None
|
||||
|
||||
resp = self.app.post_json(
|
||||
'/secrets/',
|
||||
self.secret_req
|
||||
)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
expected = dict(self.secret_req)
|
||||
expected['expiration'] = None
|
||||
mock_store_secret.assert_called_once_with(
|
||||
self.secret_req.get('payload'),
|
||||
self.secret_req.get('payload_content_type',
|
||||
'application/octet-stream'),
|
||||
self.secret_req.get('payload_content_encoding'),
|
||||
expected,
|
||||
None,
|
||||
self.project if check_project_id else mock.ANY,
|
||||
transport_key_needed=False,
|
||||
transport_key_id=None
|
||||
)
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_should_add_new_secret_one_step_with_tkey_id(
|
||||
self, mock_store_secret, check_project_id=True):
|
||||
"""Test the one-step secret creation with transport_key_id set
|
||||
|
||||
:param check_project_id: True if the retrieved Project id needs to be
|
||||
verified, False to skip this check (necessary
|
||||
for new-Project flows).
|
||||
"""
|
||||
mock_store_secret.return_value = self.secret, None
|
||||
self.secret_req['transport_key_id'] = self.transport_key_id
|
||||
|
||||
resp = self.app.post_json('/secrets/', self.secret_req)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
expected = dict(self.secret_req)
|
||||
expected['expiration'] = None
|
||||
mock_store_secret.assert_called_once_with(
|
||||
self.secret_req.get('payload'),
|
||||
self.secret_req.get('payload_content_type',
|
||||
'application/octet-stream'),
|
||||
self.secret_req.get('payload_content_encoding'),
|
||||
expected,
|
||||
None,
|
||||
self.project if check_project_id else mock.ANY,
|
||||
transport_key_needed=False,
|
||||
transport_key_id=self.transport_key_id
|
||||
)
|
||||
|
||||
def test_should_add_new_secret_if_project_does_not_exist(self):
|
||||
self.project_repo.get.return_value = None
|
||||
self.project_repo.find_by_external_project_id.return_value = None
|
||||
|
||||
self.test_should_add_new_secret_one_step(check_project_id=False)
|
||||
|
||||
args, kwargs = self.project_repo.create_from.call_args
|
||||
project = args[0]
|
||||
self.assertIsInstance(project, models.Project)
|
||||
self.assertEqual(self.external_project_id, project.external_id)
|
||||
|
||||
def test_should_add_new_secret_metadata_without_payload(self):
|
||||
self.app.post_json(
|
||||
'/secrets/',
|
||||
{'name': self.name}
|
||||
)
|
||||
|
||||
args, kwargs = self.secret_repo.create_from.call_args
|
||||
secret = args[0]
|
||||
self.assertIsInstance(secret, models.Secret)
|
||||
self.assertEqual(secret.name, self.name)
|
||||
|
||||
args, kwargs = self.project_secret_repo.create_from.call_args
|
||||
project_secret = args[0]
|
||||
self.assertIsInstance(project_secret, models.ProjectSecret)
|
||||
self.assertEqual(project_secret.project_id, self.project_entity_id)
|
||||
self.assertEqual(project_secret.secret_id, secret.id)
|
||||
|
||||
self.assertFalse(self.datum_repo.create_from.called)
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_should_add_new_secret_metadata_with_tkey(self, mock_store_secret):
|
||||
|
||||
mock_store_secret.return_value = self.secret, self.transport_key
|
||||
resp = self.app.post_json(
|
||||
'/secrets/',
|
||||
{'name': self.name,
|
||||
'transport_key_needed': 'true'}
|
||||
)
|
||||
|
||||
self.assertTrue('secret_ref' in resp.json)
|
||||
self.assertTrue('transport_key_ref' in resp.json)
|
||||
self.assertEqual(resp.json['transport_key_ref'], self.tkey_url)
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_should_add_secret_payload_almost_too_large(self,
|
||||
mock_store_secret):
|
||||
mock_store_secret.return_value = self.secret, None
|
||||
|
||||
if validators.DEFAULT_MAX_SECRET_BYTES % 4:
|
||||
raise ValueError('Tests currently require max secrets divides by '
|
||||
'4 evenly, due to base64 encoding.')
|
||||
|
||||
big_text = ''.join(['A' for x
|
||||
in moves.range(
|
||||
validators.DEFAULT_MAX_SECRET_BYTES - 8)
|
||||
])
|
||||
|
||||
self.secret_req = {'name': self.name,
|
||||
'algorithm': self.secret_algorithm,
|
||||
'bit_length': self.secret_bit_length,
|
||||
'mode': self.secret_mode,
|
||||
'payload': big_text,
|
||||
'payload_content_type': self.payload_content_type}
|
||||
|
||||
payload_encoding = self.payload_content_encoding
|
||||
if payload_encoding:
|
||||
self.secret_req['payload_content_encoding'] = payload_encoding
|
||||
self.app.post_json('/secrets/', self.secret_req)
|
||||
|
||||
def test_should_raise_due_to_payload_too_large(self):
|
||||
big_text = ''.join(['A' for x
|
||||
in moves.range(
|
||||
validators.DEFAULT_MAX_SECRET_BYTES + 10)
|
||||
])
|
||||
|
||||
self.secret_req = {'name': self.name,
|
||||
'algorithm': self.secret_algorithm,
|
||||
'bit_length': self.secret_bit_length,
|
||||
'mode': self.secret_mode,
|
||||
'payload': big_text,
|
||||
'payload_content_type': self.payload_content_type}
|
||||
|
||||
payload_encoding = self.payload_content_encoding
|
||||
if payload_encoding:
|
||||
self.secret_req['payload_content_encoding'] = payload_encoding
|
||||
|
||||
resp = self.app.post_json(
|
||||
'/secrets/',
|
||||
self.secret_req,
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(resp.status_int, 413)
|
||||
|
||||
def test_should_raise_due_to_empty_payload(self):
|
||||
self.secret_req = {'name': self.name,
|
||||
'algorithm': self.secret_algorithm,
|
||||
'bit_length': self.secret_bit_length,
|
||||
'mode': self.secret_mode,
|
||||
'payload': ''}
|
||||
|
||||
payload_type = self.payload_content_type
|
||||
payload_encoding = self.payload_content_encoding
|
||||
if payload_type:
|
||||
self.secret_req['payload_content_type'] = payload_type
|
||||
if payload_encoding:
|
||||
self.secret_req['payload_content_encoding'] = payload_encoding
|
||||
|
||||
resp = self.app.post_json(
|
||||
'/secrets/',
|
||||
self.secret_req,
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
|
||||
|
||||
class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretTestSuite):
|
||||
class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource):
|
||||
|
||||
def test_should_raise_due_to_unsupported_payload_content_type(self):
|
||||
self.secret_req = {'name': self.name,
|
||||
@ -513,7 +289,7 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretTestSuite):
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
|
||||
|
||||
class WhenCreatingBinarySecretsUsingSecretsResource(BaseSecretTestSuite):
|
||||
class WhenCreatingBinarySecretsUsingSecretsResource(BaseSecretsResource):
|
||||
|
||||
@property
|
||||
def root(self):
|
||||
|
@ -17,9 +17,9 @@ Supports database/repositories oriented unit testing.
|
||||
Warning: Do not merge this content with the utils.py module, as doing so will
|
||||
break the DevStack functional test discovery process.
|
||||
"""
|
||||
import oslotest.base as oslotest
|
||||
|
||||
from barbican.model import repositories
|
||||
from barbican.tests import utils
|
||||
|
||||
|
||||
def setup_in_memory_db():
|
||||
@ -40,7 +40,7 @@ def in_memory_cleanup():
|
||||
repositories.clear()
|
||||
|
||||
|
||||
class RepositoryTestCase(utils.BaseTestCase):
|
||||
class RepositoryTestCase(oslotest.BaseTestCase):
|
||||
"""Base test case class for in-memory database unit tests.
|
||||
|
||||
Database/Repository oriented unit tests should *not* modify the global
|
||||
|
@ -18,13 +18,58 @@ from os import path
|
||||
import time
|
||||
import types
|
||||
import urlparse
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
import oslotest.base as oslotest
|
||||
import six
|
||||
import webtest
|
||||
|
||||
from barbican.api import app
|
||||
import barbican.context
|
||||
from barbican.tests import database_utils
|
||||
|
||||
|
||||
class BarbicanAPIBaseTestCase(oslotest.BaseTestCase):
|
||||
"""Base TestCase for all tests needing to interact with a Barbican app."""
|
||||
root_controller = None
|
||||
|
||||
def _build_context(self, project_id):
|
||||
context = barbican.context.RequestContext(
|
||||
roles=None,
|
||||
user=None,
|
||||
project=project_id,
|
||||
is_admin=True
|
||||
)
|
||||
context.policy_enforcer = None
|
||||
return context
|
||||
|
||||
def setUp(self):
|
||||
super(BarbicanAPIBaseTestCase, self).setUp()
|
||||
# Make sure we have a test db and session to work with
|
||||
database_utils.setup_in_memory_db()
|
||||
|
||||
# Generic project id to perform actions under
|
||||
self.project_id = str(uuid.uuid4())
|
||||
|
||||
# Build the test app
|
||||
wsgi_app = app.build_wsgi_app(
|
||||
controller=self.root_controller,
|
||||
transactional=True
|
||||
)
|
||||
|
||||
self.app = webtest.TestApp(wsgi_app)
|
||||
self.app.extra_environ = {
|
||||
'barbican.context': self._build_context(self.project_id)
|
||||
}
|
||||
|
||||
def tearDown(self):
|
||||
database_utils.in_memory_cleanup()
|
||||
super(BarbicanAPIBaseTestCase, self).tearDown()
|
||||
|
||||
|
||||
class BaseTestCase(oslotest.BaseTestCase):
|
||||
"""DEPRECATED - Will remove in future refactoring."""
|
||||
def setUp(self):
|
||||
super(BaseTestCase, self).setUp()
|
||||
self.order_id = 'order1234'
|
||||
|
Loading…
x
Reference in New Issue
Block a user