Add SecretConsumerValidator and its tests

This patch is part of a series to implement the Secret Consumers spec:
https://specs.openstack.org/openstack/barbican-specs/specs/train/secret-consumers.html

Change-Id: I6fdeccc226ab732019e672294b5a7ce977a0e209
Signed-off-by: Moises Guimaraes de Medeiros <moguimar@redhat.com>
This commit is contained in:
Moises Guimaraes de Medeiros 2019-09-24 16:10:13 +02:00
parent d2b7016f13
commit d5b3dc0c0a
2 changed files with 132 additions and 21 deletions

View File

@ -59,31 +59,31 @@ def get_invalid_property(validation_error):
def validate_stored_key_rsa_container(project_id, container_ref, req):
try:
container_id = hrefs.get_container_id_from_ref(container_ref)
except Exception:
reason = u._("Bad Container Reference {ref}").format(
ref=container_ref
)
raise exception.InvalidContainer(reason=reason)
try:
container_id = hrefs.get_container_id_from_ref(container_ref)
except Exception:
reason = u._("Bad Container Reference {ref}").format(
ref=container_ref
)
raise exception.InvalidContainer(reason=reason)
container_repo = repo.get_container_repository()
container_repo = repo.get_container_repository()
container = container_repo.get_container_by_id(entity_id=container_id,
suppress_exception=True)
if not container:
reason = u._("Container Not Found")
raise exception.InvalidContainer(reason=reason)
container = container_repo.get_container_by_id(entity_id=container_id,
suppress_exception=True)
if not container:
reason = u._("Container Not Found")
raise exception.InvalidContainer(reason=reason)
if container.type != 'rsa':
reason = u._("Container Wrong Type")
raise exception.InvalidContainer(reason=reason)
if container.type != 'rsa':
reason = u._("Container Wrong Type")
raise exception.InvalidContainer(reason=reason)
ctxt = controllers._get_barbican_context(req)
inst = controllers.containers.ContainerController(container)
controllers._do_enforce_rbac(inst, req,
controllers.containers.CONTAINER_GET,
ctxt)
ctxt = controllers._get_barbican_context(req)
inst = controllers.containers.ContainerController(container)
controllers._do_enforce_rbac(inst, req,
controllers.containers.CONTAINER_GET,
ctxt)
@six.add_metaclass(abc.ABCMeta)
@ -1004,3 +1004,35 @@ class NewCAValidator(ValidatorBase, CACommonHelpersMixin):
subject_dn = json_data['subject_dn']
self._validate_subject_dn_data(subject_dn)
return json_data
class SecretConsumerValidator(ValidatorBase):
"""Validate a new Secret Consumer."""
def __init__(self):
self.name = "Secret Consumer"
self.schema = {
"type": "object",
"properties": {
"service": {
"type": "string",
"maxLength": 255,
"minLength": 1,
},
"resource_type": {
"type": "string",
"maxLength": 255,
"minLength": 1,
},
"resource_id": {"type": "string", "minLength": 1},
},
"required": ["service", "resource_type", "resource_id"],
}
def validate(self, json_data, parent_schema=None):
schema_name = self._full_name(parent_schema)
self._assert_schema_is_valid(json_data, schema_name)
return json_data

View File

@ -1972,5 +1972,84 @@ class WhenTestingSecretMetadatumValidator(utils.BaseTestCase):
"'SecretMetadatum'",
six.text_type(exception))
class WhenTestingSecretConsumerValidator(utils.BaseTestCase):
def setUp(self):
super(WhenTestingSecretConsumerValidator, self).setUp()
self.service = "service"
self.resource_type = "resource_type"
self.resource_id = "resource_id"
self.consumer_req = {
"service": self.service,
"resource_type": self.resource_type,
"resource_id": self.resource_id,
}
self.validator = validators.SecretConsumerValidator()
def test_should_raise_with_invalid_json_data_type(self):
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
[]
)
def test_should_raise_with_missing_service(self):
self.consumer_req.pop("service")
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.consumer_req
)
self.assertIn('\'service\'', exception.args[0])
def test_should_raise_with_missing_resource_type(self):
self.consumer_req.pop("resource_type")
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.consumer_req
)
self.assertIn('\'resource_type\'', exception.args[0])
def test_should_raise_with_missing_resource_id(self):
self.consumer_req.pop("resource_id")
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.consumer_req
)
self.assertIn('\'resource_id\'', exception.args[0])
def test_should_validate_all_fields(self):
self.validator.validate(self.consumer_req)
def test_service_too_long_should_raise_with_invalid_object(self):
# Negative test to make sure our maxLength parameter for the
# service field raises the proper exception when a value greater
# than 255 in this case is passed in.
self.consumer_req["service"] = 'a' * 256
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.consumer_req
)
def test_resource_type_too_long_should_raise_with_invalid_object(self):
# Negative test to make sure our maxLength parameter for the
# service field raises the proper exception when a value greater
# than 255 in this case is passed in.
self.consumer_req["resource_type"] = 'a' * 256
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.consumer_req
)
if __name__ == '__main__':
unittest.main()