Merge "User Metadata API and tests"
This commit is contained in:
commit
967d8a143b
180
barbican/api/controllers/secretmeta.py
Normal file
180
barbican/api/controllers/secretmeta.py
Normal file
@ -0,0 +1,180 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import collections
|
||||
import pecan
|
||||
|
||||
from barbican import api
|
||||
from barbican.api import controllers
|
||||
from barbican.common import hrefs
|
||||
from barbican.common import utils
|
||||
from barbican.common import validators
|
||||
from barbican import i18n as u
|
||||
from barbican.model import repositories as repo
|
||||
|
||||
LOG = utils.getLogger(__name__)
|
||||
|
||||
|
||||
def _secret_metadata_not_found():
|
||||
"""Throw exception indicating secret metadata not found."""
|
||||
pecan.abort(404, u._('Not Found. Sorry but your secret metadata is in '
|
||||
'another castle.'))
|
||||
|
||||
|
||||
class SecretMetadataController(controllers.ACLMixin):
|
||||
"""Handles SecretMetadata requests by a given secret id."""
|
||||
|
||||
def __init__(self, secret):
|
||||
LOG.debug('=== Creating SecretMetadataController ===')
|
||||
self.secret = secret
|
||||
self.secret_project_id = self.secret.project.external_id
|
||||
self.secret_repo = repo.get_secret_repository()
|
||||
self.user_meta_repo = repo.get_secret_user_meta_repository()
|
||||
self.metadata_validator = validators.NewSecretMetadataValidator()
|
||||
self.metadatum_validator = validators.NewSecretMetadatumValidator()
|
||||
|
||||
@pecan.expose(generic=True)
|
||||
def index(self, **kwargs):
|
||||
pecan.abort(405) # HTTP 405 Method Not Allowed as default
|
||||
|
||||
@index.when(method='GET', template='json')
|
||||
@utils.allow_all_content_types
|
||||
@controllers.handle_exceptions(u._('Secret metadata retrieval'))
|
||||
@controllers.enforce_rbac('secret_meta:get')
|
||||
def on_get(self, external_project_id, **kwargs):
|
||||
"""Handles retrieval of existing secret metadata requests."""
|
||||
|
||||
LOG.debug('Start secret metadata on_get '
|
||||
'for secret-ID %s:', self.secret.id)
|
||||
|
||||
resp = self.user_meta_repo.get_metadata_for_secret(self.secret.id)
|
||||
pecan.response.status = 200
|
||||
|
||||
return {"metadata": resp}
|
||||
|
||||
@index.when(method='PUT', template='json')
|
||||
@controllers.handle_exceptions(u._('Secret metadata creation'))
|
||||
@controllers.enforce_rbac('secret_meta:put')
|
||||
@controllers.enforce_content_types(['application/json'])
|
||||
def on_put(self, external_project_id, **kwargs):
|
||||
"""Handles creation/update of secret metadata."""
|
||||
data = api.load_body(pecan.request, validator=self.metadata_validator)
|
||||
LOG.debug('Start secret metadata on_put...%s', data)
|
||||
|
||||
self.user_meta_repo.create_replace_user_metadata(self.secret.id,
|
||||
data)
|
||||
|
||||
url = hrefs.convert_user_meta_to_href(self.secret.id)
|
||||
LOG.debug('URI to secret metadata is %s', url)
|
||||
|
||||
pecan.response.status = 201
|
||||
return {'metadata_ref': url}
|
||||
|
||||
@index.when(method='POST', template='json')
|
||||
@controllers.handle_exceptions(u._('Secret metadatum creation'))
|
||||
@controllers.enforce_rbac('secret_meta:post')
|
||||
@controllers.enforce_content_types(['application/json'])
|
||||
def on_post(self, external_project_id, **kwargs):
|
||||
"""Handles creation of secret metadatum."""
|
||||
|
||||
data = api.load_body(pecan.request, validator=self.metadatum_validator)
|
||||
|
||||
key = data.get('key')
|
||||
value = data.get('value')
|
||||
|
||||
metadata = self.user_meta_repo.get_metadata_for_secret(self.secret.id)
|
||||
if key in metadata:
|
||||
pecan.abort(409, u._('Conflict. Key in request is already in the '
|
||||
'secret metadata'))
|
||||
|
||||
LOG.debug('Start secret metadatum on_post...%s', metadata)
|
||||
self.user_meta_repo.create_replace_user_metadatum(self.secret.id,
|
||||
key, value)
|
||||
|
||||
url = hrefs.convert_user_meta_to_href(self.secret.id)
|
||||
LOG.debug('URI to secret metadata is %s', url)
|
||||
|
||||
pecan.response.status = 201
|
||||
return {'metadata_ref': url + "/%s {key: %s, value:%s}" % (key,
|
||||
key,
|
||||
value)}
|
||||
|
||||
|
||||
class SecretMetadatumController(controllers.ACLMixin):
|
||||
|
||||
def __init__(self, secret):
|
||||
LOG.debug('=== Creating SecretMetadatumController ===')
|
||||
self.user_meta_repo = repo.get_secret_user_meta_repository()
|
||||
self.secret = secret
|
||||
self.metadatum_validator = validators.NewSecretMetadatumValidator()
|
||||
|
||||
@pecan.expose(generic=True)
|
||||
def index(self, **kwargs):
|
||||
pecan.abort(405) # HTTP 405 Method Not Allowed as default
|
||||
|
||||
@index.when(method='GET', template='json')
|
||||
@controllers.handle_exceptions(u._('Secret metadatum retrieval'))
|
||||
@controllers.enforce_rbac('secret_meta:get')
|
||||
def on_get(self, external_project_id, remainder, **kwargs):
|
||||
"""Handles retrieval of existing secret metadatum."""
|
||||
|
||||
LOG.debug('Start secret metadatum on_get '
|
||||
'for secret-ID %s:', self.secret.id)
|
||||
|
||||
metadata = self.user_meta_repo.get_metadata_for_secret(self.secret.id)
|
||||
if remainder in metadata:
|
||||
pecan.response.status = 200
|
||||
pair = {'key': remainder, 'value': metadata[remainder]}
|
||||
return collections.OrderedDict(sorted(pair.items()))
|
||||
else:
|
||||
_secret_metadata_not_found()
|
||||
|
||||
@index.when(method='PUT', template='json')
|
||||
@utils.allow_all_content_types
|
||||
@controllers.handle_exceptions(u._('Secret metadatum update'))
|
||||
@controllers.enforce_rbac('secret_meta:put')
|
||||
@controllers.enforce_content_types(['application/json'])
|
||||
def on_put(self, external_project_id, remainder, **kwargs):
|
||||
"""Handles update of existing secret metadatum."""
|
||||
metadata = self.user_meta_repo.get_metadata_for_secret(self.secret.id)
|
||||
data = api.load_body(pecan.request, validator=self.metadatum_validator)
|
||||
|
||||
key = data.get('key')
|
||||
value = data.get('value')
|
||||
|
||||
if remainder not in metadata:
|
||||
_secret_metadata_not_found()
|
||||
elif remainder != key:
|
||||
msg = 'Key in request data does not match key in the '
|
||||
'request url.'
|
||||
pecan.abort(409, msg)
|
||||
else:
|
||||
LOG.debug('Start secret metadatum on_put...%s', metadata)
|
||||
|
||||
self.user_meta_repo.create_replace_user_metadatum(self.secret.id,
|
||||
key, value)
|
||||
|
||||
pecan.response.status = 200
|
||||
pair = {'key': key, 'value': value}
|
||||
return collections.OrderedDict(sorted(pair.items()))
|
||||
|
||||
@index.when(method='DELETE', template='json')
|
||||
@controllers.handle_exceptions(u._('Secret metadatum removal'))
|
||||
@controllers.enforce_rbac('secret_meta:delete')
|
||||
def on_delete(self, external_project_id, remainder, **kwargs):
|
||||
"""Handles removal of existing secret metadatum."""
|
||||
|
||||
self.user_meta_repo.delete_metadatum(self.secret.id,
|
||||
remainder)
|
||||
msg = 'Deleted secret metadatum: %s for secret %s' % (remainder,
|
||||
self.secret.id)
|
||||
pecan.response.status = 204
|
||||
LOG.info(msg)
|
@ -16,6 +16,7 @@ from six.moves.urllib import parse
|
||||
from barbican import api
|
||||
from barbican.api import controllers
|
||||
from barbican.api.controllers import acls
|
||||
from barbican.api.controllers import secretmeta
|
||||
from barbican.common import exception
|
||||
from barbican.common import hrefs
|
||||
from barbican.common import quota
|
||||
@ -72,8 +73,16 @@ class SecretController(controllers.ACLMixin):
|
||||
def _lookup(self, sub_resource, *remainder):
|
||||
if sub_resource == 'acl':
|
||||
return acls.SecretACLsController(self.secret), remainder
|
||||
elif sub_resource == 'metadata':
|
||||
if len(remainder) == 0 or remainder == ('',):
|
||||
return secretmeta.SecretMetadataController(self.secret), \
|
||||
remainder
|
||||
else:
|
||||
return secretmeta.SecretMetadatumController(self.secret), \
|
||||
remainder
|
||||
else:
|
||||
pecan.abort(405) # only 'acl' as sub-resource is supported
|
||||
# only 'acl' and 'metadata' as sub-resource is supported
|
||||
pecan.abort(405)
|
||||
|
||||
@pecan.expose(generic=True)
|
||||
def index(self, **kwargs):
|
||||
|
@ -89,6 +89,18 @@ class MissingMetadataField(BarbicanHTTPException):
|
||||
status_code = 400
|
||||
|
||||
|
||||
class InvalidMetadataRequest(BarbicanHTTPException):
|
||||
message = u._("Invalid Metadata. Keys and Values must be Strings.")
|
||||
client_message = message
|
||||
status_code = 400
|
||||
|
||||
|
||||
class InvalidMetadataKey(BarbicanHTTPException):
|
||||
message = u._("Invalid Key. Key must be URL safe.")
|
||||
client_message = message
|
||||
status_code = 400
|
||||
|
||||
|
||||
class InvalidSubjectDN(BarbicanHTTPException):
|
||||
message = u._("Invalid subject DN: %(subject_dn)s")
|
||||
client_message = message
|
||||
|
@ -46,6 +46,11 @@ def convert_consumer_to_href(consumer_id):
|
||||
return convert_resource_id_to_href('consumers', consumer_id) + '/consumers'
|
||||
|
||||
|
||||
def convert_user_meta_to_href(secret_id):
|
||||
"""Convert the consumer ID to a HATEOAS-style href."""
|
||||
return convert_resource_id_to_href('secrets', secret_id) + '/metadata'
|
||||
|
||||
|
||||
def convert_certificate_authority_to_href(ca_id):
|
||||
"""Convert the ca ID to a HATEOAS-style href."""
|
||||
return convert_resource_id_to_href('cas', ca_id)
|
||||
|
@ -15,6 +15,7 @@ API JSON validators.
|
||||
|
||||
import abc
|
||||
import base64
|
||||
import re
|
||||
|
||||
import jsonschema as schema
|
||||
from ldap3.core import exceptions as ldap_exceptions
|
||||
@ -353,6 +354,104 @@ class NewSecretValidator(ValidatorBase):
|
||||
return payload.strip()
|
||||
|
||||
|
||||
class NewSecretMetadataValidator(ValidatorBase):
|
||||
"""Validate new secret metadata."""
|
||||
|
||||
def __init__(self):
|
||||
self.name = 'SecretMetadata'
|
||||
self.schema = {
|
||||
"type": "object",
|
||||
"$schema": "http://json-schema.org/draft-03/schema",
|
||||
"properties": {
|
||||
"metadata": {"type": "object", "required": True},
|
||||
}
|
||||
}
|
||||
|
||||
def validate(self, json_data, parent_schema=None):
|
||||
"""Validate the input JSON for the schema for secret metadata."""
|
||||
schema_name = self._full_name(parent_schema)
|
||||
self._assert_schema_is_valid(json_data, schema_name)
|
||||
return self._extract_metadata(json_data)
|
||||
|
||||
def _extract_metadata(self, json_data):
|
||||
"""Extracts and returns the metadata from the JSON data."""
|
||||
metadata = json_data['metadata']
|
||||
|
||||
for key in metadata:
|
||||
# make sure key is a string and url-safe.
|
||||
if not isinstance(key, six.string_types):
|
||||
raise exception.InvalidMetadataRequest()
|
||||
self._check_string_url_safe(key)
|
||||
|
||||
# make sure value is a string.
|
||||
value = metadata[key]
|
||||
if not isinstance(value, six.string_types):
|
||||
raise exception.InvalidMetadataRequest()
|
||||
|
||||
# If key is not lowercase, then change it
|
||||
if not key.islower():
|
||||
del metadata[key]
|
||||
metadata[key.lower()] = value
|
||||
|
||||
return metadata
|
||||
|
||||
def _check_string_url_safe(self, string):
|
||||
"""Checks if string can be part of a URL."""
|
||||
if not re.match("^[A-Za-z0-9_-]*$", string):
|
||||
raise exception.InvalidMetadataKey()
|
||||
|
||||
|
||||
class NewSecretMetadatumValidator(ValidatorBase):
|
||||
"""Validate new secret metadatum."""
|
||||
|
||||
def __init__(self):
|
||||
self.name = 'SecretMetadatum'
|
||||
self.schema = {
|
||||
"type": "object",
|
||||
"$schema": "http://json-schema.org/draft-03/schema",
|
||||
"properties": {
|
||||
"key": {
|
||||
"type": "string",
|
||||
"maxLength": 255,
|
||||
"required": True
|
||||
},
|
||||
"value": {
|
||||
"type": "string",
|
||||
"maxLength": 255,
|
||||
"required": True
|
||||
},
|
||||
},
|
||||
"additionalProperties": False
|
||||
}
|
||||
|
||||
def validate(self, json_data, parent_schema=None):
|
||||
"""Validate the input JSON for the schema for secret metadata."""
|
||||
schema_name = self._full_name(parent_schema)
|
||||
self._assert_schema_is_valid(json_data, schema_name)
|
||||
|
||||
key = self._extract_key(json_data)
|
||||
value = self._extract_value(json_data)
|
||||
|
||||
return {"key": key, "value": value}
|
||||
|
||||
def _extract_key(self, json_data):
|
||||
"""Extracts and returns the metadata from the JSON data."""
|
||||
key = json_data['key']
|
||||
self._check_string_url_safe(key)
|
||||
key = key.lower()
|
||||
return key
|
||||
|
||||
def _extract_value(self, json_data):
|
||||
"""Extracts and returns the metadata from the JSON data."""
|
||||
value = json_data['value']
|
||||
return value
|
||||
|
||||
def _check_string_url_safe(self, string):
|
||||
"""Checks if string can be part of a URL."""
|
||||
if not re.match("^[A-Za-z0-9_-]*$", string):
|
||||
raise exception.InvalidMetadataKey()
|
||||
|
||||
|
||||
class CACommonHelpersMixin(object):
|
||||
def _validate_subject_dn_data(self, subject_dn):
|
||||
"""Confirm that the subject_dn contains valid data
|
||||
|
@ -758,18 +758,13 @@ class SecretStoreMetadatumRepo(BaseRepo):
|
||||
|
||||
session = get_session()
|
||||
|
||||
try:
|
||||
query = session.query(models.SecretStoreMetadatum)
|
||||
query = query.filter_by(deleted=False)
|
||||
query = session.query(models.SecretStoreMetadatum)
|
||||
query = query.filter_by(deleted=False)
|
||||
|
||||
query = query.filter(
|
||||
models.SecretStoreMetadatum.secret_id == secret_id)
|
||||
|
||||
metadata = query.all()
|
||||
|
||||
except sa_orm.exc.NoResultFound:
|
||||
metadata = {}
|
||||
query = query.filter(
|
||||
models.SecretStoreMetadatum.secret_id == secret_id)
|
||||
|
||||
metadata = query.all()
|
||||
return {m.key: m.value for m in metadata}
|
||||
|
||||
def _do_entity_name(self):
|
||||
@ -792,38 +787,57 @@ class SecretUserMetadatumRepo(BaseRepo):
|
||||
Stores key/value information on behalf of a Secret.
|
||||
"""
|
||||
|
||||
def save(self, metadata, secret_model):
|
||||
"""Saves the the specified metadata for the secret.
|
||||
|
||||
:raises NotFound if entity does not exist.
|
||||
"""
|
||||
def create_replace_user_metadata(self, secret_id, metadata):
|
||||
"""Creates or replaces the the specified metadata for the secret."""
|
||||
now = timeutils.utcnow()
|
||||
|
||||
session = get_session()
|
||||
query = session.query(models.SecretUserMetadatum)
|
||||
query = query.filter_by(secret_id=secret_id)
|
||||
query.delete()
|
||||
|
||||
for k, v in metadata.items():
|
||||
meta_model = models.SecretUserMetadatum(k, v)
|
||||
meta_model.secret_id = secret_id
|
||||
meta_model.updated_at = now
|
||||
meta_model.secret = secret_model
|
||||
meta_model.save()
|
||||
meta_model.save(session=session)
|
||||
|
||||
def get_metadata_for_secret(self, secret_id):
|
||||
"""Returns a dict of SecretUserMetadatum instances."""
|
||||
|
||||
session = get_session()
|
||||
|
||||
try:
|
||||
query = session.query(models.SecretUserMetadatum)
|
||||
query = query.filter_by(deleted=False)
|
||||
query = session.query(models.SecretUserMetadatum)
|
||||
query = query.filter_by(deleted=False)
|
||||
|
||||
query = query.filter(
|
||||
models.SecretUserMetadatum.secret_id == secret_id)
|
||||
|
||||
metadata = query.all()
|
||||
|
||||
except sa_orm.exc.NoResultFound:
|
||||
metadata = {}
|
||||
query = query.filter(
|
||||
models.SecretUserMetadatum.secret_id == secret_id)
|
||||
|
||||
metadata = query.all()
|
||||
return {m.key: m.value for m in metadata}
|
||||
|
||||
def create_replace_user_metadatum(self, secret_id, key, value):
|
||||
now = timeutils.utcnow()
|
||||
|
||||
session = get_session()
|
||||
query = session.query(models.SecretUserMetadatum)
|
||||
query = query.filter_by(secret_id=secret_id)
|
||||
query = query.filter_by(key=key)
|
||||
query.delete()
|
||||
|
||||
meta_model = models.SecretUserMetadatum(key, value)
|
||||
meta_model.secret_id = secret_id
|
||||
meta_model.updated_at = now
|
||||
meta_model.save(session=session)
|
||||
|
||||
def delete_metadatum(self, secret_id, key):
|
||||
"""Removes a key from a SecretUserMetadatum instances."""
|
||||
session = get_session()
|
||||
|
||||
query = session.query(models.SecretUserMetadatum)
|
||||
query = query.filter_by(secret_id=secret_id)
|
||||
query = query.filter_by(key=key)
|
||||
query.delete()
|
||||
|
||||
def _do_entity_name(self):
|
||||
"""Sub-class hook: return entity name, such as for debugging."""
|
||||
return "SecretUserMetadatum"
|
||||
|
341
barbican/tests/api/controllers/test_secretmeta.py
Normal file
341
barbican/tests/api/controllers/test_secretmeta.py
Normal file
@ -0,0 +1,341 @@
|
||||
# Copyright (c) 2016 IBM
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import json
|
||||
import mock
|
||||
import os
|
||||
import uuid
|
||||
|
||||
from barbican.tests import utils
|
||||
|
||||
|
||||
@utils.parameterized_test_case
|
||||
class WhenTestingSecretMetadataResource(utils.BarbicanAPIBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(WhenTestingSecretMetadataResource, self).setUp()
|
||||
|
||||
self.valid_metadata = {
|
||||
"metadata": {
|
||||
"latitude": "30.393805",
|
||||
"longitude": "-97.724077"
|
||||
}
|
||||
}
|
||||
|
||||
def test_create_secret_metadata(self):
|
||||
secret_resp, secret_uuid = create_secret(self.app)
|
||||
meta_resp = create_secret_metadata(self.app,
|
||||
self.valid_metadata,
|
||||
secret_resp)
|
||||
|
||||
self.assertEqual(201, meta_resp.status_int)
|
||||
self.assertIsNotNone(meta_resp.json)
|
||||
|
||||
def test_can_get_secret_metadata(self):
|
||||
secret_resp, secret_uuid = create_secret(self.app)
|
||||
meta_resp = create_secret_metadata(self.app,
|
||||
self.valid_metadata,
|
||||
secret_resp)
|
||||
|
||||
self.assertEqual(201, meta_resp.status_int)
|
||||
|
||||
get_resp = self.app.get('/secrets/%s/metadata' % secret_resp)
|
||||
|
||||
self.assertEqual(200, get_resp.status_int)
|
||||
self.assertEqual(self.valid_metadata, get_resp.json)
|
||||
|
||||
def test_get_secret_metadata_invalid_secret_should_fail(self):
|
||||
secret_resp, secret_uuid = create_secret(self.app)
|
||||
create_secret_metadata(self.app,
|
||||
self.valid_metadata,
|
||||
secret_resp)
|
||||
|
||||
get_resp = self.app.get('/secrets/%s/metadata' % uuid.uuid4().hex,
|
||||
expect_errors=True)
|
||||
self.assertEqual(404, get_resp.status_int)
|
||||
|
||||
|
||||
@utils.parameterized_test_case
|
||||
class WhenTestingSecretMetadatumResource(utils.BarbicanAPIBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(WhenTestingSecretMetadatumResource, self).setUp()
|
||||
|
||||
self.valid_metadata = {
|
||||
"metadata": {
|
||||
"latitude": "30.393805",
|
||||
"longitude": "-97.724077"
|
||||
}
|
||||
}
|
||||
|
||||
self.updated_valid_metadata = {
|
||||
"metadata": {
|
||||
"latitude": "30.393805",
|
||||
"longitude": "-97.724077",
|
||||
"access-limit": "2"
|
||||
}
|
||||
}
|
||||
self.valid_metadatum = {
|
||||
'key': 'access-limit',
|
||||
'value': '2'
|
||||
}
|
||||
|
||||
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
|
||||
'get_metadata_for_secret')
|
||||
def test_can_create_secret_metadatum(self, mocked_get):
|
||||
secret_resp, secret_uuid = create_secret(self.app)
|
||||
|
||||
mocked_get.return_value = self.valid_metadata['metadata']
|
||||
meta_resp = create_secret_metadatum(self.app,
|
||||
self.valid_metadatum,
|
||||
secret_resp)
|
||||
|
||||
self.assertEqual(201, meta_resp.status_int)
|
||||
self.assertIsNotNone(meta_resp.json)
|
||||
|
||||
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
|
||||
'get_metadata_for_secret')
|
||||
def test_conflict_create_same_key_secret_metadatum(self, mocked_get):
|
||||
secret_resp, secret_uuid = create_secret(self.app)
|
||||
|
||||
mocked_get.return_value = self.valid_metadata['metadata']
|
||||
latitude_metadatum = {
|
||||
"key": "latitude",
|
||||
"value": "30.393805"
|
||||
}
|
||||
meta_resp = create_secret_metadatum(self.app,
|
||||
latitude_metadatum,
|
||||
secret_resp,
|
||||
expect_errors=True)
|
||||
|
||||
self.assertEqual(409, meta_resp.status_int)
|
||||
self.assertIsNotNone(meta_resp.json)
|
||||
|
||||
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
|
||||
'get_metadata_for_secret')
|
||||
def test_can_delete_secret_metadatum(self, mocked_get):
|
||||
secret_resp, secret_uuid = create_secret(self.app)
|
||||
|
||||
mocked_get.return_value = self.valid_metadata['metadata']
|
||||
meta_resp = create_secret_metadatum(self.app,
|
||||
self.valid_metadatum,
|
||||
secret_resp)
|
||||
self.assertEqual(201, meta_resp.status_int)
|
||||
|
||||
delete_resp = self.app.delete('/secrets/%s/metadata/access-limit' %
|
||||
secret_resp)
|
||||
|
||||
self.assertEqual(204, delete_resp.status_int)
|
||||
|
||||
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
|
||||
'get_metadata_for_secret')
|
||||
def test_can_get_secret_metadatum(self, mocked_get):
|
||||
secret_resp, secret_uuid = create_secret(self.app)
|
||||
|
||||
mocked_get.return_value = self.valid_metadata['metadata']
|
||||
meta_resp = create_secret_metadatum(self.app,
|
||||
self.valid_metadatum,
|
||||
secret_resp)
|
||||
self.assertEqual(201, meta_resp.status_int)
|
||||
|
||||
mocked_get.return_value = self.updated_valid_metadata['metadata']
|
||||
get_resp = self.app.get('/secrets/%s/metadata/access-limit' %
|
||||
secret_resp)
|
||||
self.assertEqual(200, get_resp.status_int)
|
||||
self.assertEqual(self.valid_metadatum, get_resp.json)
|
||||
|
||||
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
|
||||
'get_metadata_for_secret')
|
||||
def test_get_secret_metadatum_not_found(self, mocked_get):
|
||||
secret_resp, secret_uuid = create_secret(self.app)
|
||||
|
||||
mocked_get.return_value = self.valid_metadata['metadata']
|
||||
meta_resp = create_secret_metadatum(self.app,
|
||||
self.valid_metadatum,
|
||||
secret_resp)
|
||||
self.assertEqual(201, meta_resp.status_int)
|
||||
|
||||
mocked_get.return_value = self.updated_valid_metadata['metadata']
|
||||
get_resp = self.app.get('/secrets/%s/metadata/nothere' %
|
||||
secret_resp,
|
||||
expect_errors=True)
|
||||
self.assertEqual(404, get_resp.status_int)
|
||||
|
||||
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
|
||||
'get_metadata_for_secret')
|
||||
def test_can_update_secret_metadatum(self, mocked_get):
|
||||
secret_resp, secret_uuid = create_secret(self.app)
|
||||
|
||||
mocked_get.return_value = self.valid_metadata['metadata']
|
||||
meta_resp = create_secret_metadatum(self.app,
|
||||
self.valid_metadatum,
|
||||
secret_resp)
|
||||
self.assertEqual(201, meta_resp.status_int)
|
||||
|
||||
new_metadatum = {
|
||||
'key': 'access-limit',
|
||||
'value': '5'
|
||||
}
|
||||
new_metadatum_json = json.dumps(new_metadatum)
|
||||
|
||||
mocked_get.return_value = self.updated_valid_metadata['metadata']
|
||||
put_resp = self.app.put('/secrets/%s/metadata/access-limit' %
|
||||
secret_resp,
|
||||
new_metadatum_json,
|
||||
headers={'Content-Type': 'application/json'})
|
||||
|
||||
self.assertEqual(200, put_resp.status_int)
|
||||
self.assertEqual(new_metadatum, put_resp.json)
|
||||
|
||||
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
|
||||
'get_metadata_for_secret')
|
||||
def test_can_update_secret_metadatum_not_found(self, mocked_get):
|
||||
secret_resp, secret_uuid = create_secret(self.app)
|
||||
|
||||
mocked_get.return_value = self.valid_metadata['metadata']
|
||||
meta_resp = create_secret_metadatum(self.app,
|
||||
self.valid_metadatum,
|
||||
secret_resp)
|
||||
self.assertEqual(201, meta_resp.status_int)
|
||||
|
||||
new_metadatum = {
|
||||
'key': 'newwwww',
|
||||
'value': '5'
|
||||
}
|
||||
new_metadatum_json = json.dumps(new_metadatum)
|
||||
|
||||
mocked_get.return_value = self.updated_valid_metadata['metadata']
|
||||
put_resp = self.app.put('/secrets/%s/metadata/newwwww' %
|
||||
secret_resp,
|
||||
new_metadatum_json,
|
||||
headers={'Content-Type': 'application/json'},
|
||||
expect_errors=True)
|
||||
|
||||
self.assertEqual(404, put_resp.status_int)
|
||||
|
||||
@mock.patch('barbican.model.repositories.SecretUserMetadatumRepo.'
|
||||
'get_metadata_for_secret')
|
||||
def test_conflict_update_secret_metadatum(self, mocked_get):
|
||||
secret_resp, secret_uuid = create_secret(self.app)
|
||||
|
||||
mocked_get.return_value = self.valid_metadata['metadata']
|
||||
meta_resp = create_secret_metadatum(self.app,
|
||||
self.valid_metadatum,
|
||||
secret_resp)
|
||||
self.assertEqual(201, meta_resp.status_int)
|
||||
|
||||
new_metadatum = {
|
||||
'key': 'snoop',
|
||||
'value': '5'
|
||||
}
|
||||
new_metadatum_json = json.dumps(new_metadatum)
|
||||
mocked_get.return_value = self.updated_valid_metadata['metadata']
|
||||
put_resp = self.app.put('/secrets/%s/metadata/access-limit' %
|
||||
secret_resp,
|
||||
new_metadatum_json,
|
||||
headers={'Content-Type': 'application/json'},
|
||||
expect_errors=True)
|
||||
|
||||
self.assertEqual(409, put_resp.status_int)
|
||||
|
||||
def test_returns_405_for_delete_on_metadata(self):
|
||||
secret_id, secret_resp = create_secret(self.app)
|
||||
resp = self.app.delete('/secrets/{0}/metadata/'.format(secret_id),
|
||||
expect_errors=True)
|
||||
self.assertEqual(405, resp.status_int)
|
||||
|
||||
|
||||
# ----------------------- Helper Functions ---------------------------
|
||||
def create_secret(app, name=None, algorithm=None, bit_length=None, mode=None,
|
||||
expiration=None, payload='not-encrypted',
|
||||
content_type='text/plain',
|
||||
content_encoding=None, transport_key_id=None,
|
||||
transport_key_needed=None, expect_errors=False):
|
||||
request = {
|
||||
'name': name,
|
||||
'algorithm': algorithm,
|
||||
'bit_length': bit_length,
|
||||
'mode': mode,
|
||||
'expiration': expiration,
|
||||
'payload': payload,
|
||||
'payload_content_type': content_type,
|
||||
'payload_content_encoding': content_encoding,
|
||||
'transport_key_id': transport_key_id,
|
||||
'transport_key_needed': transport_key_needed
|
||||
}
|
||||
cleaned_request = {key: val for key, val in request.items()
|
||||
if val is not None}
|
||||
|
||||
resp = app.post_json(
|
||||
'/secrets/',
|
||||
cleaned_request,
|
||||
expect_errors=expect_errors
|
||||
)
|
||||
created_uuid = None
|
||||
if resp.status_int == 201:
|
||||
secret_ref = resp.json.get('secret_ref', '')
|
||||
_, created_uuid = os.path.split(secret_ref)
|
||||
|
||||
return created_uuid, resp
|
||||
|
||||
|
||||
def create_secret_metadata(app, metadata, secret_uuid,
|
||||
expect_errors=False):
|
||||
request = {}
|
||||
|
||||
for metadatum in metadata:
|
||||
request[metadatum] = metadata.get(metadatum)
|
||||
|
||||
cleaned_request = {key: val for key, val in request.items()
|
||||
if val is not None}
|
||||
|
||||
url = '/secrets/%s/metadata/' % secret_uuid
|
||||
|
||||
resp = app.put_json(
|
||||
url,
|
||||
cleaned_request,
|
||||
expect_errors=expect_errors
|
||||
)
|
||||
|
||||
return resp
|
||||
|
||||
|
||||
def create_secret_metadatum(app, metadata, secret_uuid, remainder=None,
|
||||
update=False, expect_errors=False):
|
||||
request = {}
|
||||
|
||||
for metadatum in metadata:
|
||||
request[metadatum] = metadata.get(metadatum)
|
||||
|
||||
cleaned_request = {key: val for key, val in request.items()
|
||||
if val is not None}
|
||||
|
||||
url = '/secrets/%s/metadata/' % secret_uuid
|
||||
if remainder:
|
||||
url = url + remainder
|
||||
|
||||
if update:
|
||||
resp = app.put_json(
|
||||
url,
|
||||
cleaned_request,
|
||||
expect_errors=expect_errors
|
||||
)
|
||||
else:
|
||||
resp = app.post_json(
|
||||
url,
|
||||
cleaned_request,
|
||||
expect_errors=expect_errors
|
||||
)
|
||||
|
||||
return resp
|
@ -1741,5 +1741,219 @@ class WhenTestingNewCAValidator(utils.BaseTestCase):
|
||||
)
|
||||
|
||||
|
||||
@utils.parameterized_test_case
|
||||
class WhenTestingSecretMetadataValidator(utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(WhenTestingSecretMetadataValidator, self).setUp()
|
||||
|
||||
self.top_key = 'metadata'
|
||||
|
||||
self.key1 = 'city'
|
||||
self.value1 = 'Austin'
|
||||
|
||||
self.key2 = 'state'
|
||||
self.value2 = 'Texas'
|
||||
|
||||
self.key3 = 'country'
|
||||
self.value3 = 'USA'
|
||||
|
||||
self.metadata_req = {
|
||||
self.top_key: {
|
||||
self.key1: self.value1,
|
||||
self.key2: self.value2,
|
||||
self.key3: self.value3
|
||||
}
|
||||
}
|
||||
|
||||
self.validator = validators.NewSecretMetadataValidator()
|
||||
|
||||
def test_should_validate_all_fields(self):
|
||||
self.validator.validate(self.metadata_req)
|
||||
|
||||
def test_should_validate_all_fields_and_make_key_lowercase(self):
|
||||
self.key1 = "DOgg"
|
||||
self.value1 = "poodle"
|
||||
self.metadata_req = {
|
||||
self.top_key: {
|
||||
self.key1: self.value1,
|
||||
self.key2: self.value2,
|
||||
self.key3: self.value3
|
||||
}
|
||||
}
|
||||
metadata = self.validator.validate(self.metadata_req)
|
||||
self.assertNotIn("DOgg", metadata.keys())
|
||||
self.assertIn("dogg", metadata.keys())
|
||||
|
||||
def test_should_validate_no_keys(self):
|
||||
del self.metadata_req[self.top_key][self.key1]
|
||||
del self.metadata_req[self.top_key][self.key2]
|
||||
del self.metadata_req[self.top_key][self.key3]
|
||||
self.validator.validate(self.metadata_req)
|
||||
|
||||
def test_should_raise_invalid_key_no_metadata(self):
|
||||
del self.metadata_req[self.top_key]
|
||||
exception = self.assertRaises(excep.InvalidObject,
|
||||
self.validator.validate,
|
||||
self.metadata_req)
|
||||
self.assertIn("metadata' is a required property",
|
||||
six.text_type(exception))
|
||||
|
||||
def test_should_raise_invalid_key_non_string(self):
|
||||
self.key1 = 0
|
||||
metadata_req = {
|
||||
self.top_key: {
|
||||
self.key1: self.value1
|
||||
}
|
||||
}
|
||||
exception = self.assertRaises(excep.InvalidMetadataRequest,
|
||||
self.validator.validate,
|
||||
metadata_req)
|
||||
self.assertIn("Invalid Metadata. Keys and Values must be Strings.",
|
||||
six.text_type(exception))
|
||||
|
||||
def test_should_raise_invalid_key_non_url_safe_string(self):
|
||||
self.key1 = "key/01"
|
||||
metadata_req = {
|
||||
self.top_key: {
|
||||
self.key1: self.value1
|
||||
}
|
||||
}
|
||||
exception = self.assertRaises(excep.InvalidMetadataKey,
|
||||
self.validator.validate,
|
||||
metadata_req)
|
||||
self.assertIn("Invalid Key. Key must be URL safe.",
|
||||
six.text_type(exception))
|
||||
|
||||
def test_should_raise_invalid_value_non_string(self):
|
||||
self.value1 = 0
|
||||
metadata_req = {
|
||||
self.top_key: {
|
||||
self.key1: self.value1
|
||||
}
|
||||
}
|
||||
exception = self.assertRaises(excep.InvalidMetadataRequest,
|
||||
self.validator.validate,
|
||||
metadata_req)
|
||||
self.assertIn("Invalid Metadata. Keys and Values must be Strings.",
|
||||
six.text_type(exception))
|
||||
|
||||
|
||||
@utils.parameterized_test_case
|
||||
class WhenTestingSecretMetadatumValidator(utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(WhenTestingSecretMetadatumValidator, self).setUp()
|
||||
|
||||
self.key1 = 'key'
|
||||
self.value1 = 'city'
|
||||
|
||||
self.key2 = 'value'
|
||||
self.value2 = 'Austin'
|
||||
|
||||
self.metadata_req = {
|
||||
self.key1: self.value1,
|
||||
self.key2: self.value2
|
||||
}
|
||||
|
||||
self.validator = validators.NewSecretMetadatumValidator()
|
||||
|
||||
def test_should_validate_all_fields(self):
|
||||
self.validator.validate(self.metadata_req)
|
||||
|
||||
def test_should_validate_all_fields_and_make_key_lowercase(self):
|
||||
self.value1 = "DOgg"
|
||||
self.value2 = "poodle"
|
||||
self.metadata_req = {
|
||||
self.key1: self.value1,
|
||||
self.key2: self.value2
|
||||
}
|
||||
metadata = self.validator.validate(self.metadata_req)
|
||||
self.assertEqual("dogg", metadata['key'])
|
||||
|
||||
def test_should_raise_invalid_empty(self):
|
||||
del self.metadata_req[self.key1]
|
||||
del self.metadata_req[self.key2]
|
||||
exception = self.assertRaises(excep.InvalidObject,
|
||||
self.validator.validate,
|
||||
self.metadata_req)
|
||||
self.assertIn("Provided object does not match schema "
|
||||
"'SecretMetadatum'",
|
||||
six.text_type(exception))
|
||||
|
||||
def test_should_raise_invalid_key_no_key(self):
|
||||
del self.metadata_req[self.key2]
|
||||
exception = self.assertRaises(excep.InvalidObject,
|
||||
self.validator.validate,
|
||||
self.metadata_req)
|
||||
self.assertIn("Provided object does not match schema "
|
||||
"'SecretMetadatum'",
|
||||
six.text_type(exception))
|
||||
|
||||
def test_should_raise_invalid_key_no_value(self):
|
||||
del self.metadata_req[self.key1]
|
||||
exception = self.assertRaises(excep.InvalidObject,
|
||||
self.validator.validate,
|
||||
self.metadata_req)
|
||||
self.assertIn("Provided object does not match schema "
|
||||
"'SecretMetadatum'",
|
||||
six.text_type(exception))
|
||||
|
||||
def test_should_raise_invalid_key_non_string(self):
|
||||
self.value1 = 0
|
||||
metadata_req = {
|
||||
self.key1: self.value1,
|
||||
self.key2: self.value2
|
||||
}
|
||||
|
||||
exception = self.assertRaises(excep.InvalidObject,
|
||||
self.validator.validate,
|
||||
metadata_req)
|
||||
self.assertIn("Provided object does not match schema "
|
||||
"'SecretMetadatum'",
|
||||
six.text_type(exception))
|
||||
|
||||
def test_should_raise_invalid_key_non_url_safe_string(self):
|
||||
self.value1 = "key/01"
|
||||
metadata_req = {
|
||||
self.key1: self.value1,
|
||||
self.key2: self.value2
|
||||
}
|
||||
|
||||
exception = self.assertRaises(excep.InvalidMetadataKey,
|
||||
self.validator.validate,
|
||||
metadata_req)
|
||||
self.assertIn("Invalid Key. Key must be URL safe.",
|
||||
six.text_type(exception))
|
||||
|
||||
def test_should_raise_invalid_value_non_string(self):
|
||||
self.value2 = 0
|
||||
metadata_req = {
|
||||
self.key1: self.value1,
|
||||
self.key2: self.value2
|
||||
}
|
||||
|
||||
exception = self.assertRaises(excep.InvalidObject,
|
||||
self.validator.validate,
|
||||
metadata_req)
|
||||
self.assertIn("Provided object does not match schema "
|
||||
"'SecretMetadatum'",
|
||||
six.text_type(exception))
|
||||
|
||||
def test_should_raise_invalid_extra_sent_key(self):
|
||||
self.value2 = 0
|
||||
metadata_req = {
|
||||
self.key1: self.value1,
|
||||
self.key2: self.value2,
|
||||
"extra_key": "extra_value"
|
||||
}
|
||||
|
||||
exception = self.assertRaises(excep.InvalidObject,
|
||||
self.validator.validate,
|
||||
metadata_req)
|
||||
self.assertIn("Provided object does not match schema "
|
||||
"'SecretMetadatum'",
|
||||
six.text_type(exception))
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -0,0 +1,121 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from barbican.model import models
|
||||
from barbican.model import repositories
|
||||
from barbican.tests import database_utils
|
||||
from barbican.tests import utils
|
||||
|
||||
|
||||
@utils.parameterized_test_case
|
||||
class WhenTestingSecretMetadataRepository(database_utils.RepositoryTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(WhenTestingSecretMetadataRepository, self).setUp()
|
||||
self.repo = repositories.SecretUserMetadatumRepo()
|
||||
|
||||
self.test_metadata = {
|
||||
"dog": "poodle",
|
||||
"cat": "siamese"
|
||||
}
|
||||
|
||||
def _create_base_secret(self, project_id=None):
|
||||
# Setup the secret and needed base relationship
|
||||
secret_repo = repositories.get_secret_repository()
|
||||
session = secret_repo.get_session()
|
||||
|
||||
if project_id is None: # don't re-create project if it created earlier
|
||||
project = models.Project()
|
||||
project.external_id = "keystone_project_id"
|
||||
project.save(session=session)
|
||||
project_id = project.id
|
||||
|
||||
secret_model = models.Secret()
|
||||
secret_model.project_id = project_id
|
||||
secret = secret_repo.create_from(secret_model, session=session)
|
||||
|
||||
secret.save(session=session)
|
||||
session.commit()
|
||||
return secret
|
||||
|
||||
def test_create_and_get_metadata_for_secret(self):
|
||||
secret = self._create_base_secret()
|
||||
|
||||
self.repo.create_replace_user_metadata(secret.id,
|
||||
self.test_metadata)
|
||||
metadata = self.repo.get_metadata_for_secret(secret.id)
|
||||
self.assertEqual(self.test_metadata, metadata)
|
||||
|
||||
def test_get_metadata_invalid_secret(self):
|
||||
metadata = self.repo.get_metadata_for_secret("invalid_id")
|
||||
self.assertEqual({}, metadata)
|
||||
|
||||
def test_create_user_metadatum(self):
|
||||
secret = self._create_base_secret()
|
||||
|
||||
self.repo.create_replace_user_metadata(secret.id,
|
||||
self.test_metadata)
|
||||
|
||||
# adds a new key
|
||||
self.repo.create_replace_user_metadatum(secret.id,
|
||||
'lizard',
|
||||
'green anole')
|
||||
|
||||
self.test_metadata['lizard'] = 'green anole'
|
||||
metadata = self.repo.get_metadata_for_secret(secret.id)
|
||||
|
||||
self.assertEqual(self.test_metadata, metadata)
|
||||
|
||||
def test_replace_user_metadatum(self):
|
||||
secret = self._create_base_secret()
|
||||
|
||||
self.repo.create_replace_user_metadata(secret.id,
|
||||
self.test_metadata)
|
||||
|
||||
# updates existing key
|
||||
self.repo.create_replace_user_metadatum(secret.id,
|
||||
'dog',
|
||||
'rat terrier')
|
||||
|
||||
self.test_metadata['dog'] = 'rat terrier'
|
||||
metadata = self.repo.get_metadata_for_secret(secret.id)
|
||||
|
||||
self.assertEqual(self.test_metadata, metadata)
|
||||
|
||||
def test_delete_user_metadatum(self):
|
||||
secret = self._create_base_secret()
|
||||
|
||||
self.repo.create_replace_user_metadata(secret.id,
|
||||
self.test_metadata)
|
||||
|
||||
# deletes existing key
|
||||
self.repo.delete_metadatum(secret.id,
|
||||
'cat')
|
||||
|
||||
del self.test_metadata['cat']
|
||||
metadata = self.repo.get_metadata_for_secret(secret.id)
|
||||
|
||||
self.assertEqual(self.test_metadata, metadata)
|
||||
|
||||
def test_delete_secret_deletes_secret_metadata(self):
|
||||
secret = self._create_base_secret()
|
||||
|
||||
self.repo.create_replace_user_metadata(secret.id,
|
||||
self.test_metadata)
|
||||
|
||||
metadata = self.repo.get_metadata_for_secret(secret.id)
|
||||
self.assertEqual(self.test_metadata, metadata)
|
||||
|
||||
# deletes existing secret
|
||||
secret.delete()
|
||||
|
||||
metadata = self.repo.get_metadata_for_secret(secret.id)
|
||||
self.assertEqual({}, metadata)
|
@ -74,5 +74,9 @@
|
||||
"quotas:get": "rule:all_users",
|
||||
"project_quotas:get": "rule:service_admin",
|
||||
"project_quotas:put": "rule:service_admin",
|
||||
"project_quotas:delete": "rule:service_admin"
|
||||
"project_quotas:delete": "rule:service_admin",
|
||||
"secret_meta:get": "rule:all_but_audit",
|
||||
"secret_meta:post": "rule:admin_or_creator",
|
||||
"secret_meta:put": "rule:admin_or_creator",
|
||||
"secret_meta:delete": "rule:admin_or_creator"
|
||||
}
|
||||
|
127
functionaltests/api/v1/behaviors/secretmeta_behaviors.py
Normal file
127
functionaltests/api/v1/behaviors/secretmeta_behaviors.py
Normal file
@ -0,0 +1,127 @@
|
||||
"""
|
||||
Copyright 2016 IBM
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
import json
|
||||
|
||||
from functionaltests.api.v1.behaviors import base_behaviors
|
||||
|
||||
|
||||
class SecretMetadataBehaviors(base_behaviors.BaseBehaviors):
|
||||
|
||||
def create_or_update_metadata(self, secret_ref, data, extra_headers=None,
|
||||
use_auth=True, user_name=None, admin=None):
|
||||
|
||||
meta_ref = '%s/metadata' % secret_ref
|
||||
data = json.dumps(data)
|
||||
|
||||
resp = self.client.put(meta_ref, data=data,
|
||||
extra_headers=extra_headers, use_auth=use_auth,
|
||||
user_name=user_name)
|
||||
|
||||
# handle expected JSON parsing errors for unauthenticated requests
|
||||
if resp.status_code == 401 and not use_auth:
|
||||
return resp, None
|
||||
|
||||
returned_data = self.get_json(resp)
|
||||
metadata_ref = returned_data.get('metadata_ref')
|
||||
if metadata_ref:
|
||||
if admin is None:
|
||||
admin = user_name
|
||||
self.created_entities.append((metadata_ref, admin))
|
||||
return resp, metadata_ref
|
||||
|
||||
def get_metadata(self, secret_ref, extra_headers=None, use_auth=True,
|
||||
user_name=None, admin=None):
|
||||
|
||||
meta_ref = '%s/metadata' % secret_ref
|
||||
resp = self.client.get(meta_ref, extra_headers=extra_headers,
|
||||
user_name=user_name, use_auth=use_auth)
|
||||
|
||||
# handle expected JSON parsing errors for unauthenticated requests
|
||||
if resp.status_code == 401 and not use_auth:
|
||||
return resp, None
|
||||
|
||||
returned_data = self.get_json(resp)
|
||||
metadata_ref = returned_data.get('metadata_ref')
|
||||
if metadata_ref:
|
||||
if admin is None:
|
||||
admin = user_name
|
||||
self.created_entities.append((metadata_ref, admin))
|
||||
return resp
|
||||
|
||||
def create_metadatum(self, secret_ref, data, extra_headers=None,
|
||||
use_auth=True, user_name=None, admin=None):
|
||||
meta_key_ref = '%s/%s' % (secret_ref, 'metadata')
|
||||
data = json.dumps(data)
|
||||
|
||||
resp = self.client.post(meta_key_ref, data=data,
|
||||
extra_headers=extra_headers, use_auth=use_auth,
|
||||
user_name=user_name)
|
||||
|
||||
# handle expected JSON parsing errors for unauthenticated requests
|
||||
if resp.status_code == 401 and not use_auth:
|
||||
return resp, None
|
||||
|
||||
returned_data = self.get_json(resp)
|
||||
metadata_ref = returned_data.get('metadata_ref')
|
||||
if metadata_ref:
|
||||
if admin is None:
|
||||
admin = user_name
|
||||
self.created_entities.append((metadata_ref, admin))
|
||||
return resp, metadata_ref
|
||||
|
||||
def update_metadatum(self, secret_ref, metadata_key, data,
|
||||
extra_headers=None, use_auth=True, user_name=None,
|
||||
admin=None):
|
||||
meta_key_ref = '%s/%s/%s' % (secret_ref, 'metadata', metadata_key)
|
||||
data = json.dumps(data)
|
||||
|
||||
resp = self.client.put(meta_key_ref, data=data,
|
||||
extra_headers=extra_headers, use_auth=use_auth,
|
||||
user_name=user_name)
|
||||
|
||||
# handle expected JSON parsing errors for unauthenticated requests
|
||||
if resp.status_code == 401 and not use_auth:
|
||||
return resp, None
|
||||
|
||||
return resp
|
||||
|
||||
def get_metadatum(self, secret_ref, metadata_key, extra_headers=None,
|
||||
use_auth=True, user_name=None, admin=None):
|
||||
|
||||
meta_key_ref = '%s/%s/%s' % (secret_ref, 'metadata', metadata_key)
|
||||
resp = self.client.get(meta_key_ref, extra_headers=extra_headers,
|
||||
user_name=user_name, use_auth=use_auth)
|
||||
|
||||
# handle expected JSON parsing errors for unauthenticated requests
|
||||
if resp.status_code == 401 and not use_auth:
|
||||
return resp, None
|
||||
|
||||
returned_data = self.get_json(resp)
|
||||
metadata_ref = returned_data.get('metadata_ref')
|
||||
if metadata_ref:
|
||||
if admin is None:
|
||||
admin = user_name
|
||||
self.created_entities.append((metadata_ref, admin))
|
||||
return resp
|
||||
|
||||
def delete_metadatum(self, secret_ref, metadata_key, extra_headers=None,
|
||||
use_auth=True, user_name=None, admin=None):
|
||||
|
||||
meta_key_ref = '%s/%s/%s' % (secret_ref, 'metadata', metadata_key)
|
||||
resp = self.client.delete(meta_key_ref, extra_headers=extra_headers,
|
||||
user_name=user_name, use_auth=use_auth)
|
||||
|
||||
return resp
|
207
functionaltests/api/v1/functional/test_secretmeta.py
Normal file
207
functionaltests/api/v1/functional/test_secretmeta.py
Normal file
@ -0,0 +1,207 @@
|
||||
# Copyright (c) 2016 IBM
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import json
|
||||
from testtools import testcase
|
||||
import uuid
|
||||
|
||||
from barbican.tests import utils
|
||||
from functionaltests.api import base
|
||||
from functionaltests.api.v1.behaviors import secret_behaviors
|
||||
from functionaltests.api.v1.behaviors import secretmeta_behaviors
|
||||
from functionaltests.api.v1.models import secret_models
|
||||
|
||||
|
||||
@utils.parameterized_test_case
|
||||
class SecretMetadataTestCase(base.TestCase):
|
||||
def setUp(self):
|
||||
super(SecretMetadataTestCase, self).setUp()
|
||||
self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client)
|
||||
self.behaviors = secretmeta_behaviors.SecretMetadataBehaviors(
|
||||
self.client)
|
||||
|
||||
self.default_secret_create_all_none_data = {
|
||||
"name": None,
|
||||
"expiration": None,
|
||||
"algorithm": None,
|
||||
"bit_length": None,
|
||||
"mode": None,
|
||||
"payload": None,
|
||||
"payload_content_type": None,
|
||||
"payload_content_encoding": None,
|
||||
}
|
||||
|
||||
self.valid_metadata = {
|
||||
"metadata": {
|
||||
"latitude": "30.393805",
|
||||
"longitude": "-97.724077"
|
||||
}
|
||||
}
|
||||
|
||||
self.invalid_metadata = {
|
||||
"metadataaaaaaaa": {
|
||||
"latitude": "30.393805",
|
||||
"longitude": "-97.724077"
|
||||
}
|
||||
}
|
||||
|
||||
self.valid_metadatum_key = 'access-limit'
|
||||
self.valid_metadatum = {
|
||||
'key': self.valid_metadatum_key,
|
||||
'value': '2'
|
||||
}
|
||||
|
||||
def tearDown(self):
|
||||
self.secret_behaviors.delete_all_created_secrets()
|
||||
super(SecretMetadataTestCase, self).tearDown()
|
||||
|
||||
@testcase.attr('positive')
|
||||
def test_secret_metadata_create(self):
|
||||
test_model = secret_models.SecretModel(
|
||||
**self.default_secret_create_all_none_data)
|
||||
|
||||
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
|
||||
self.assertEqual(resp.status_code, 201)
|
||||
|
||||
meta_resp, metadata_ref = self.behaviors.create_or_update_metadata(
|
||||
secret_ref, self.valid_metadata)
|
||||
|
||||
self.assertEqual(meta_resp.status_code, 201)
|
||||
self.assertEqual(secret_ref + '/metadata', metadata_ref)
|
||||
|
||||
@testcase.attr('negative')
|
||||
def test_secret_metadata_create_no_secret(self):
|
||||
secret_ref = 'http://localhost:9311/secrets/%s' % uuid.uuid4().hex
|
||||
|
||||
meta_resp, metadata_ref = self.behaviors.create_or_update_metadata(
|
||||
secret_ref, self.invalid_metadata)
|
||||
|
||||
self.assertEqual(meta_resp.status_code, 404)
|
||||
|
||||
@testcase.attr('positive')
|
||||
def test_secret_metadata_get(self):
|
||||
test_model = secret_models.SecretModel(
|
||||
**self.default_secret_create_all_none_data)
|
||||
|
||||
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
|
||||
self.assertEqual(resp.status_code, 201)
|
||||
|
||||
meta_resp, metadata_ref = self.behaviors.create_or_update_metadata(
|
||||
secret_ref, self.valid_metadata)
|
||||
|
||||
self.assertEqual(meta_resp.status_code, 201)
|
||||
self.assertEqual(secret_ref + '/metadata', metadata_ref)
|
||||
|
||||
get_resp = self.behaviors.get_metadata(secret_ref)
|
||||
self.assertEqual(get_resp.status_code, 200)
|
||||
self.assertEqual(get_resp.content, json.dumps(self.valid_metadata))
|
||||
|
||||
@testcase.attr('negative')
|
||||
def test_secret_metadata_get_no_secret(self):
|
||||
secret_ref = 'http://localhost:9311/secrets/%s' % uuid.uuid4().hex
|
||||
|
||||
get_resp = self.behaviors.get_metadata(secret_ref)
|
||||
self.assertEqual(get_resp.status_code, 404)
|
||||
|
||||
@testcase.attr('positive')
|
||||
def test_secret_metadatum_create(self):
|
||||
test_model = secret_models.SecretModel(
|
||||
**self.default_secret_create_all_none_data)
|
||||
|
||||
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
|
||||
self.assertEqual(resp.status_code, 201)
|
||||
|
||||
meta_resp, metadata_ref = self.behaviors.create_metadatum(
|
||||
secret_ref, self.valid_metadatum)
|
||||
|
||||
self.assertEqual(meta_resp.status_code, 201)
|
||||
|
||||
@testcase.attr('positive')
|
||||
def test_secret_metadatum_update(self):
|
||||
test_model = secret_models.SecretModel(
|
||||
**self.default_secret_create_all_none_data)
|
||||
|
||||
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
|
||||
self.assertEqual(resp.status_code, 201)
|
||||
|
||||
meta_resp, metadata_ref = self.behaviors.create_metadatum(
|
||||
secret_ref, self.valid_metadatum)
|
||||
|
||||
self.assertEqual(meta_resp.status_code, 201)
|
||||
|
||||
updated_meta = {
|
||||
'key': self.valid_metadatum_key,
|
||||
'value': '10'
|
||||
}
|
||||
|
||||
put_resp = self.behaviors.update_metadatum(
|
||||
secret_ref, self.valid_metadatum_key, updated_meta)
|
||||
|
||||
self.assertEqual(put_resp.status_code, 200)
|
||||
|
||||
@testcase.attr('positive')
|
||||
def test_secret_metadatum_get(self):
|
||||
test_model = secret_models.SecretModel(
|
||||
**self.default_secret_create_all_none_data)
|
||||
|
||||
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
|
||||
self.assertEqual(resp.status_code, 201)
|
||||
|
||||
meta_resp, metadata_ref = self.behaviors.create_metadatum(
|
||||
secret_ref, self.valid_metadatum)
|
||||
|
||||
self.assertEqual(meta_resp.status_code, 201)
|
||||
|
||||
get_resp = self.behaviors.get_metadatum(secret_ref,
|
||||
self.valid_metadatum_key)
|
||||
self.assertEqual(get_resp.status_code, 200)
|
||||
self.assertEqual(get_resp.content, json.dumps(self.valid_metadatum,
|
||||
sort_keys=True))
|
||||
|
||||
@testcase.attr('negative')
|
||||
def test_secret_metadatum_get_wrong_key(self):
|
||||
test_model = secret_models.SecretModel(
|
||||
**self.default_secret_create_all_none_data)
|
||||
|
||||
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
|
||||
self.assertEqual(resp.status_code, 201)
|
||||
|
||||
meta_resp, metadata_ref = self.behaviors.create_metadatum(
|
||||
secret_ref, self.valid_metadatum)
|
||||
|
||||
self.assertEqual(meta_resp.status_code, 201)
|
||||
|
||||
get_resp = self.behaviors.get_metadatum(secret_ref,
|
||||
'other_key')
|
||||
self.assertEqual(get_resp.status_code, 404)
|
||||
|
||||
@testcase.attr('positive')
|
||||
def test_secret_metadatum_delete(self):
|
||||
test_model = secret_models.SecretModel(
|
||||
**self.default_secret_create_all_none_data)
|
||||
|
||||
resp, secret_ref = self.secret_behaviors.create_secret(test_model)
|
||||
self.assertEqual(resp.status_code, 201)
|
||||
|
||||
meta_resp, metadata_ref = self.behaviors.create_metadatum(
|
||||
secret_ref, self.valid_metadatum)
|
||||
|
||||
self.assertEqual(meta_resp.status_code, 201)
|
||||
|
||||
get_resp = self.behaviors.get_metadatum(secret_ref,
|
||||
self.valid_metadatum_key)
|
||||
self.assertEqual(get_resp.status_code, 200)
|
||||
delete_resp = self.behaviors.delete_metadatum(secret_ref,
|
||||
self.valid_metadatum_key)
|
||||
self.assertEqual(delete_resp.status_code, 204)
|
Loading…
x
Reference in New Issue
Block a user