Merge "Starting refactor of test_resources"
This commit is contained in:
commit
926cbc18b5
0
barbican/tests/api/controllers/__init__.py
Normal file
0
barbican/tests/api/controllers/__init__.py
Normal file
191
barbican/tests/api/controllers/test_secrets.py
Normal file
191
barbican/tests/api/controllers/test_secrets.py
Normal file
@ -0,0 +1,191 @@
|
||||
# Copyright (c) 2015 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import os
|
||||
|
||||
import mock
|
||||
|
||||
from barbican.common import validators
|
||||
from barbican.model import models
|
||||
from barbican.model import repositories
|
||||
from barbican.openstack.common import timeutils
|
||||
from barbican.tests import utils
|
||||
|
||||
project_repo = repositories.get_project_repository()
|
||||
secrets_repo = repositories.get_secret_repository()
|
||||
tkey_repo = repositories.get_transport_key_repository()
|
||||
|
||||
|
||||
class WhenTestingSecretsResource(utils.BarbicanAPIBaseTestCase):
|
||||
|
||||
def test_can_create_new_secret_one_step(self):
|
||||
resp, secret_uuid = create_secret(
|
||||
self.app,
|
||||
payload=b'not-encrypted',
|
||||
content_type='text/plain'
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
self.assertIsNotNone(secret_uuid)
|
||||
|
||||
def test_can_create_new_secret_without_payload(self):
|
||||
resp, secret_uuid = create_secret(self.app, name='test')
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
secret = secrets_repo.get(secret_uuid, self.project_id)
|
||||
self.assertEqual(secret.name, 'test')
|
||||
self.assertEqual(secret.encrypted_data, [])
|
||||
|
||||
def test_can_create_new_secret_if_project_doesnt_exist(self):
|
||||
# Build new context
|
||||
new_project_context = self._build_context('test_project_id')
|
||||
self.app.extra_environ = {'barbican.context': new_project_context}
|
||||
|
||||
# Create a generic secret
|
||||
resp, _ = create_secret(self.app, name='test_secret')
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# Verify the new project was created
|
||||
project = project_repo.find_by_external_project_id('test_project_id')
|
||||
self.assertIsNotNone(project)
|
||||
|
||||
def test_can_create_new_secret_with_payload_just_under_max(self):
|
||||
large_payload = b'A' * (validators.DEFAULT_MAX_SECRET_BYTES - 8)
|
||||
resp, _ = create_secret(
|
||||
self.app,
|
||||
payload=large_payload,
|
||||
content_type='text/plain'
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
def test_creating_new_secret_with_oversized_payload_should_fail(self):
|
||||
oversized_payload = b'A' * (validators.DEFAULT_MAX_SECRET_BYTES + 10)
|
||||
resp, _ = create_secret(
|
||||
self.app,
|
||||
payload=oversized_payload,
|
||||
content_type='text/plain',
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(resp.status_int, 413)
|
||||
|
||||
def test_create_new_secret_with_empty_payload_should_fail(self):
|
||||
resp, _ = create_secret(
|
||||
self.app,
|
||||
payload='',
|
||||
content_type='text/plain',
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
|
||||
def test_expiration_should_be_normalized_with_new_secret(self):
|
||||
target_expiration = '2114-02-28 12:14:44.180394-05:00'
|
||||
resp, secret_uuid = create_secret(
|
||||
self.app,
|
||||
expiration=target_expiration
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# Verify that the system normalizes time to UTC
|
||||
secret = secrets_repo.get(secret_uuid, self.project_id)
|
||||
local_datetime = timeutils.parse_isotime(target_expiration)
|
||||
datetime_utc = timeutils.normalize_time(local_datetime)
|
||||
|
||||
self.assertEqual(secret.expiration, datetime_utc)
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_can_create_new_secret_meta_w_transport_key(self, mocked_store):
|
||||
transport_key_model = models.TransportKey('default_plugin', 'tkey1234')
|
||||
|
||||
# TODO(jvrbanac): Look into removing this patch
|
||||
mocked_store.return_value = models.Secret(), transport_key_model
|
||||
|
||||
# Make sure to add the transport key
|
||||
tkey_repo.create_from(transport_key_model)
|
||||
transport_key_id = transport_key_model.id
|
||||
|
||||
resp, secret_uuid = create_secret(
|
||||
self.app,
|
||||
name='test',
|
||||
transport_key_needed='true'
|
||||
)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
self.assertIsNotNone(secret_uuid)
|
||||
self.assertIn(transport_key_id, resp.json.get('transport_key_ref'))
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_can_create_new_secret_with_transport_key(self, mocked_store):
|
||||
# TODO(jvrbanac): Look into removing this patch
|
||||
mocked_store.return_value = models.Secret(), None
|
||||
|
||||
# Create Transport Key (keeping for session scoping reasons)
|
||||
transport_key_model = models.TransportKey('default_plugin', 'tkey1234')
|
||||
transport_key_id = transport_key_model.id
|
||||
tkey_repo.create_from(transport_key_model)
|
||||
|
||||
# Create a normal secret with the TransportKey
|
||||
resp, secret_uuid = create_secret(
|
||||
self.app,
|
||||
payload=b'not-encrypted',
|
||||
content_type='text/plain',
|
||||
transport_key_id=transport_key_id
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
# We're interested in the transport key values
|
||||
mocked_store.assert_called_once_with(
|
||||
'not-encrypted',
|
||||
'text/plain',
|
||||
None,
|
||||
mock.ANY,
|
||||
None,
|
||||
mock.ANY,
|
||||
transport_key_id=transport_key_id,
|
||||
transport_key_needed=False
|
||||
)
|
||||
|
||||
|
||||
# ----------------------- Helper Functions ---------------------------
|
||||
def create_secret(app, name=None, algorithm=None, bit_length=None, mode=None,
|
||||
expiration=None, payload=None, content_type=None,
|
||||
content_encoding=None, transport_key_id=None,
|
||||
transport_key_needed=None, expect_errors=False):
|
||||
request = {
|
||||
'name': name,
|
||||
'algorithm': algorithm,
|
||||
'bit_length': bit_length,
|
||||
'mode': mode,
|
||||
'expiration': expiration,
|
||||
'payload': payload,
|
||||
'payload_content_type': content_type,
|
||||
'payload_content_encoding': content_encoding,
|
||||
'transport_key_id': transport_key_id,
|
||||
'transport_key_needed': transport_key_needed
|
||||
}
|
||||
cleaned_request = {key: val for key, val in request.items()
|
||||
if val is not None}
|
||||
|
||||
resp = app.post_json(
|
||||
'/secrets/',
|
||||
cleaned_request,
|
||||
expect_errors=expect_errors
|
||||
)
|
||||
|
||||
created_uuid = None
|
||||
if resp.status_int == 201:
|
||||
secret_ref = resp.json.get('secret_ref', '')
|
||||
_, created_uuid = os.path.split(secret_ref)
|
||||
|
||||
return (resp, created_uuid)
|
30
barbican/tests/api/controllers/test_versions.py
Normal file
30
barbican/tests/api/controllers/test_versions.py
Normal file
@ -0,0 +1,30 @@
|
||||
# Copyright (c) 2015 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from barbican.api import controllers
|
||||
from barbican.tests import utils
|
||||
|
||||
|
||||
class WhenTestingVersionResource(utils.BarbicanAPIBaseTestCase):
|
||||
root_controller = controllers.versions.VersionController()
|
||||
|
||||
def test_should_return_200_on_get(self):
|
||||
resp = self.app.get('/')
|
||||
self.assertEqual(200, resp.status_int)
|
||||
|
||||
def test_should_return_version_json(self):
|
||||
resp = self.app.get('/')
|
||||
|
||||
self.assertTrue('v1' in resp.json)
|
||||
self.assertEqual(resp.json.get('v1'), 'current')
|
@ -37,7 +37,6 @@ from barbican.common import hrefs
|
||||
from barbican.common import validators
|
||||
import barbican.context
|
||||
from barbican.model import models
|
||||
from barbican.openstack.common import timeutils
|
||||
from barbican.tests import database_utils
|
||||
from barbican.tests import utils
|
||||
|
||||
@ -173,19 +172,6 @@ class FunctionalTest(utils.BaseTestCase, utils.MockModelRepositoryMixin,
|
||||
return controllers.versions.VersionController()
|
||||
|
||||
|
||||
class WhenTestingVersionResource(FunctionalTest):
|
||||
|
||||
def test_should_return_200_on_get(self):
|
||||
resp = self.app.get('/')
|
||||
self.assertEqual(200, resp.status_int)
|
||||
|
||||
def test_should_return_version_json(self):
|
||||
resp = self.app.get('/')
|
||||
|
||||
self.assertTrue('v1' in resp.json)
|
||||
self.assertEqual('current', resp.json['v1'])
|
||||
|
||||
|
||||
class BaseSecretsResource(FunctionalTest):
|
||||
"""Base test class for the Secrets resource."""
|
||||
|
||||
@ -285,217 +271,7 @@ class BaseSecretsResource(FunctionalTest):
|
||||
self.setup_transport_key_repository_mock()
|
||||
|
||||
|
||||
class BaseSecretTestSuite(BaseSecretsResource):
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_should_add_new_secret_with_expiration(self, mock_store_secret):
|
||||
mock_store_secret.return_value = self.secret, None
|
||||
|
||||
expiration = '2114-02-28 12:14:44.180394-05:00'
|
||||
self.secret_req.update({'expiration': expiration})
|
||||
|
||||
resp = self.app.post_json(
|
||||
'/secrets/',
|
||||
self.secret_req
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# Validation replaces the time.
|
||||
expected = dict(self.secret_req)
|
||||
expiration_raw = expected['expiration']
|
||||
expiration_raw = expiration_raw[:-6].replace('12', '17', 1)
|
||||
expiration_tz = timeutils.parse_isotime(expiration_raw.strip())
|
||||
expected['expiration'] = timeutils.normalize_time(expiration_tz)
|
||||
mock_store_secret.assert_called_once_with(
|
||||
self.secret_req.get('payload'),
|
||||
self.secret_req.get('payload_content_type',
|
||||
'application/octet-stream'),
|
||||
self.secret_req.get('payload_content_encoding'),
|
||||
expected,
|
||||
None,
|
||||
self.project,
|
||||
transport_key_needed=False,
|
||||
transport_key_id=None
|
||||
)
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_should_add_new_secret_one_step(self, mock_store_secret,
|
||||
check_project_id=True):
|
||||
"""Test the one-step secret creation.
|
||||
|
||||
:param check_project_id: True if the retrieved Project id needs to be
|
||||
verified, False to skip this check (necessary
|
||||
for new-Project flows).
|
||||
"""
|
||||
mock_store_secret.return_value = self.secret, None
|
||||
|
||||
resp = self.app.post_json(
|
||||
'/secrets/',
|
||||
self.secret_req
|
||||
)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
expected = dict(self.secret_req)
|
||||
expected['expiration'] = None
|
||||
mock_store_secret.assert_called_once_with(
|
||||
self.secret_req.get('payload'),
|
||||
self.secret_req.get('payload_content_type',
|
||||
'application/octet-stream'),
|
||||
self.secret_req.get('payload_content_encoding'),
|
||||
expected,
|
||||
None,
|
||||
self.project if check_project_id else mock.ANY,
|
||||
transport_key_needed=False,
|
||||
transport_key_id=None
|
||||
)
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_should_add_new_secret_one_step_with_tkey_id(
|
||||
self, mock_store_secret, check_project_id=True):
|
||||
"""Test the one-step secret creation with transport_key_id set
|
||||
|
||||
:param check_project_id: True if the retrieved Project id needs to be
|
||||
verified, False to skip this check (necessary
|
||||
for new-Project flows).
|
||||
"""
|
||||
mock_store_secret.return_value = self.secret, None
|
||||
self.secret_req['transport_key_id'] = self.transport_key_id
|
||||
|
||||
resp = self.app.post_json('/secrets/', self.secret_req)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
expected = dict(self.secret_req)
|
||||
expected['expiration'] = None
|
||||
mock_store_secret.assert_called_once_with(
|
||||
self.secret_req.get('payload'),
|
||||
self.secret_req.get('payload_content_type',
|
||||
'application/octet-stream'),
|
||||
self.secret_req.get('payload_content_encoding'),
|
||||
expected,
|
||||
None,
|
||||
self.project if check_project_id else mock.ANY,
|
||||
transport_key_needed=False,
|
||||
transport_key_id=self.transport_key_id
|
||||
)
|
||||
|
||||
def test_should_add_new_secret_if_project_does_not_exist(self):
|
||||
self.project_repo.get.return_value = None
|
||||
self.project_repo.find_by_external_project_id.return_value = None
|
||||
|
||||
self.test_should_add_new_secret_one_step(check_project_id=False)
|
||||
|
||||
args, kwargs = self.project_repo.create_from.call_args
|
||||
project = args[0]
|
||||
self.assertIsInstance(project, models.Project)
|
||||
self.assertEqual(self.external_project_id, project.external_id)
|
||||
|
||||
def test_should_add_new_secret_metadata_without_payload(self):
|
||||
self.app.post_json(
|
||||
'/secrets/',
|
||||
{'name': self.name}
|
||||
)
|
||||
|
||||
args, kwargs = self.secret_repo.create_from.call_args
|
||||
secret = args[0]
|
||||
self.assertIsInstance(secret, models.Secret)
|
||||
self.assertEqual(secret.name, self.name)
|
||||
|
||||
args, kwargs = self.project_secret_repo.create_from.call_args
|
||||
project_secret = args[0]
|
||||
self.assertIsInstance(project_secret, models.ProjectSecret)
|
||||
self.assertEqual(project_secret.project_id, self.project_entity_id)
|
||||
self.assertEqual(project_secret.secret_id, secret.id)
|
||||
|
||||
self.assertFalse(self.datum_repo.create_from.called)
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_should_add_new_secret_metadata_with_tkey(self, mock_store_secret):
|
||||
|
||||
mock_store_secret.return_value = self.secret, self.transport_key
|
||||
resp = self.app.post_json(
|
||||
'/secrets/',
|
||||
{'name': self.name,
|
||||
'transport_key_needed': 'true'}
|
||||
)
|
||||
|
||||
self.assertTrue('secret_ref' in resp.json)
|
||||
self.assertTrue('transport_key_ref' in resp.json)
|
||||
self.assertEqual(resp.json['transport_key_ref'], self.tkey_url)
|
||||
|
||||
@mock.patch('barbican.plugin.resources.store_secret')
|
||||
def test_should_add_secret_payload_almost_too_large(self,
|
||||
mock_store_secret):
|
||||
mock_store_secret.return_value = self.secret, None
|
||||
|
||||
if validators.DEFAULT_MAX_SECRET_BYTES % 4:
|
||||
raise ValueError('Tests currently require max secrets divides by '
|
||||
'4 evenly, due to base64 encoding.')
|
||||
|
||||
big_text = ''.join(['A' for x
|
||||
in moves.range(
|
||||
validators.DEFAULT_MAX_SECRET_BYTES - 8)
|
||||
])
|
||||
|
||||
self.secret_req = {'name': self.name,
|
||||
'algorithm': self.secret_algorithm,
|
||||
'bit_length': self.secret_bit_length,
|
||||
'mode': self.secret_mode,
|
||||
'payload': big_text,
|
||||
'payload_content_type': self.payload_content_type}
|
||||
|
||||
payload_encoding = self.payload_content_encoding
|
||||
if payload_encoding:
|
||||
self.secret_req['payload_content_encoding'] = payload_encoding
|
||||
self.app.post_json('/secrets/', self.secret_req)
|
||||
|
||||
def test_should_raise_due_to_payload_too_large(self):
|
||||
big_text = ''.join(['A' for x
|
||||
in moves.range(
|
||||
validators.DEFAULT_MAX_SECRET_BYTES + 10)
|
||||
])
|
||||
|
||||
self.secret_req = {'name': self.name,
|
||||
'algorithm': self.secret_algorithm,
|
||||
'bit_length': self.secret_bit_length,
|
||||
'mode': self.secret_mode,
|
||||
'payload': big_text,
|
||||
'payload_content_type': self.payload_content_type}
|
||||
|
||||
payload_encoding = self.payload_content_encoding
|
||||
if payload_encoding:
|
||||
self.secret_req['payload_content_encoding'] = payload_encoding
|
||||
|
||||
resp = self.app.post_json(
|
||||
'/secrets/',
|
||||
self.secret_req,
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(resp.status_int, 413)
|
||||
|
||||
def test_should_raise_due_to_empty_payload(self):
|
||||
self.secret_req = {'name': self.name,
|
||||
'algorithm': self.secret_algorithm,
|
||||
'bit_length': self.secret_bit_length,
|
||||
'mode': self.secret_mode,
|
||||
'payload': ''}
|
||||
|
||||
payload_type = self.payload_content_type
|
||||
payload_encoding = self.payload_content_encoding
|
||||
if payload_type:
|
||||
self.secret_req['payload_content_type'] = payload_type
|
||||
if payload_encoding:
|
||||
self.secret_req['payload_content_encoding'] = payload_encoding
|
||||
|
||||
resp = self.app.post_json(
|
||||
'/secrets/',
|
||||
self.secret_req,
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
|
||||
|
||||
class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretTestSuite):
|
||||
class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource):
|
||||
|
||||
def test_should_raise_due_to_unsupported_payload_content_type(self):
|
||||
self.secret_req = {'name': self.name,
|
||||
@ -513,7 +289,7 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretTestSuite):
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
|
||||
|
||||
class WhenCreatingBinarySecretsUsingSecretsResource(BaseSecretTestSuite):
|
||||
class WhenCreatingBinarySecretsUsingSecretsResource(BaseSecretsResource):
|
||||
|
||||
@property
|
||||
def root(self):
|
||||
|
@ -17,9 +17,9 @@ Supports database/repositories oriented unit testing.
|
||||
Warning: Do not merge this content with the utils.py module, as doing so will
|
||||
break the DevStack functional test discovery process.
|
||||
"""
|
||||
import oslotest.base as oslotest
|
||||
|
||||
from barbican.model import repositories
|
||||
from barbican.tests import utils
|
||||
|
||||
|
||||
def setup_in_memory_db():
|
||||
@ -40,7 +40,7 @@ def in_memory_cleanup():
|
||||
repositories.clear()
|
||||
|
||||
|
||||
class RepositoryTestCase(utils.BaseTestCase):
|
||||
class RepositoryTestCase(oslotest.BaseTestCase):
|
||||
"""Base test case class for in-memory database unit tests.
|
||||
|
||||
Database/Repository oriented unit tests should *not* modify the global
|
||||
|
@ -18,13 +18,58 @@ from os import path
|
||||
import time
|
||||
import types
|
||||
import urlparse
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
import oslotest.base as oslotest
|
||||
import six
|
||||
import webtest
|
||||
|
||||
from barbican.api import app
|
||||
import barbican.context
|
||||
from barbican.tests import database_utils
|
||||
|
||||
|
||||
class BarbicanAPIBaseTestCase(oslotest.BaseTestCase):
|
||||
"""Base TestCase for all tests needing to interact with a Barbican app."""
|
||||
root_controller = None
|
||||
|
||||
def _build_context(self, project_id):
|
||||
context = barbican.context.RequestContext(
|
||||
roles=None,
|
||||
user=None,
|
||||
project=project_id,
|
||||
is_admin=True
|
||||
)
|
||||
context.policy_enforcer = None
|
||||
return context
|
||||
|
||||
def setUp(self):
|
||||
super(BarbicanAPIBaseTestCase, self).setUp()
|
||||
# Make sure we have a test db and session to work with
|
||||
database_utils.setup_in_memory_db()
|
||||
|
||||
# Generic project id to perform actions under
|
||||
self.project_id = str(uuid.uuid4())
|
||||
|
||||
# Build the test app
|
||||
wsgi_app = app.build_wsgi_app(
|
||||
controller=self.root_controller,
|
||||
transactional=True
|
||||
)
|
||||
|
||||
self.app = webtest.TestApp(wsgi_app)
|
||||
self.app.extra_environ = {
|
||||
'barbican.context': self._build_context(self.project_id)
|
||||
}
|
||||
|
||||
def tearDown(self):
|
||||
database_utils.in_memory_cleanup()
|
||||
super(BarbicanAPIBaseTestCase, self).tearDown()
|
||||
|
||||
|
||||
class BaseTestCase(oslotest.BaseTestCase):
|
||||
"""DEPRECATED - Will remove in future refactoring."""
|
||||
def setUp(self):
|
||||
super(BaseTestCase, self).setUp()
|
||||
self.order_id = 'order1234'
|
||||
|
Loading…
x
Reference in New Issue
Block a user