# Copyright (c) 2014 Rackspace, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. import json import sys import time from testtools import testcase from barbican.tests import utils from functionaltests.api import base from functionaltests.api.v1.behaviors import container_behaviors from functionaltests.api.v1.behaviors import order_behaviors from functionaltests.api.v1.behaviors import secret_behaviors from functionaltests.api.v1.models import order_models from cryptography.hazmat import backends from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization def get_default_order_create_data(): return {'type': 'key', "meta": { "name": "barbican functional test secret name", "algorithm": "aes", "bit_length": 256, "mode": "cbc", } } # Any field with None will be created in the model with None as the value # but will be omitted in the final request (via the requests package) # to the server. # # Given that fact, order_create_nones_data is effectively an empty json request # to the server. def get_default_order_create_all_none_data(): return { 'type': None, "meta": { "name": None, "algorithm": None, "bit_length": None, "mode": None, } } def get_default_order_create_asymmetric_data(): return { 'type': 'asymmetric', "meta": { "name": "barbican functional test asymmetric secret name", "algorithm": "rsa", "bit_length": 1024, "mode": "cbc", } } @utils.parameterized_test_case class OrdersTestCase(base.TestCase): def setUp(self): super(OrdersTestCase, self).setUp() self.behaviors = order_behaviors.OrderBehaviors(self.client) self.container_behaviors = container_behaviors.ContainerBehaviors( self.client) self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client) self.create_default_data = get_default_order_create_data() self.create_all_none_data = get_default_order_create_all_none_data() self.asymmetric_data = get_default_order_create_asymmetric_data() def tearDown(self): self.behaviors.delete_all_created_orders() super(OrdersTestCase, self).tearDown() def wait_for_order(self, order_resp, order_ref): # Make sure we have an active order time_count = 1 while order_resp.model.status != "ACTIVE" and time_count <= 4: time.sleep(1) time_count += 1 order_resp = self.behaviors.get_order(order_ref) @testcase.attr('positive') def test_order_create_w_out_name(self): """Create an order without the name attribute.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.name = None create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 202) self.assertIsNotNone(order_ref) @testcase.attr('positive') def test_order_create_w_empty_name(self): """Create an order the name attribute an empty string.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.name = "" create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 202) self.assertIsNotNone(order_ref) @testcase.attr('positive') def test_orders_create_check_empty_name(self): """Create order with empty meta name. The resulting secret name should be a UUID. """ # first create an order with defaults test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['name'] = "" create_resp, order_ref = self.behaviors.create_order(test_model) # verify that the order was created successfully self.assertEqual(create_resp.status_code, 202) self.assertIsNotNone(order_ref) # given the order href, retrieve the order order_resp = self.behaviors.get_order(order_ref) # verify that the get was successful self.assertEqual(order_resp.status_code, 200) self.assertTrue(order_resp.model.status == "ACTIVE" or order_resp.model.status == "PENDING") # PENDING orders may take a moment to be processed by the workers # when running tests with queue enabled self.wait_for_order(order_resp, order_ref) # verify the new secret's name matches the name in the secret ref # in the newly created order. secret_resp = self.secret_behaviors.get_secret_metadata( order_resp.model.secret_ref) self.assertEqual(secret_resp.status_code, 200) self.assertEqual(secret_resp.model.name, test_model.meta['name']) @testcase.attr('positive') def test_order_and_secret_metadata_same(self): """Checks that metadata from secret GET and order GET are the same. Covers checking that secret metadata from a get on the order and secret metadata from a get on the secret are the same. Assumes that the order status will be active and not pending. """ test_model = order_models.OrderModel(**self.create_default_data) resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(resp.status_code, 202) order_resp = self.behaviors.get_order(order_ref) self.assertEqual(order_resp.status_code, 200) # PENDING orders may take a moment to be processed by the workers # when running tests with queue enabled self.wait_for_order(order_resp, order_ref) secret_ref = order_resp.model.secret_ref secret_resp = self.secret_behaviors.get_secret_metadata(secret_ref) self.assertEqual(order_resp.model.meta['name'], secret_resp.model.name, 'Names were not the same') self.assertEqual(order_resp.model.meta['algorithm'], secret_resp.model.algorithm, 'Algorithms were not the same') self.assertEqual(order_resp.model.meta['bit_length'], secret_resp.model.bit_length, 'Bit lengths were not the same') self.assertEqual(order_resp.model.meta['expiration'], secret_resp.model.expiration, 'Expirations were not the same') self.assertEqual(order_resp.model.meta['mode'], secret_resp.model.mode, 'Modes were not the same') @testcase.attr('negative') def test_order_get_order_that_doesnt_exist(self): """Covers case of getting a non-existent order.""" # try to get a non-existent order order_resp = self.behaviors.get_order("a ref that does not exist") # verify that the order get failed self.assertEqual(order_resp.status_code, 404) @testcase.attr('negative') def test_order_create_w_invalid_content_type(self): """Covers creating order with invalid content-type header.""" test_model = order_models.OrderModel(**self.create_default_data) extra_headers = {"Content-Type": "crypto/boom"} create_resp, order_ref = self.behaviors.create_order( test_model, extra_headers=extra_headers) self.assertEqual(create_resp.status_code, 415) self.assertIsNone(order_ref) @testcase.attr('negative') def test_order_create_all_none(self): """Covers order creation with empty JSON.""" test_model = order_models.OrderModel(**self.create_all_none_data) create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 400) self.assertIsNone(order_ref) @testcase.attr('negative') def test_order_create_empty_entries(self): """Covers order creation with empty JSON.""" test_model = order_models.OrderModel(**self.create_all_none_data) test_model.meta['name'] = "" test_model.meta['algorithm'] = "" test_model.meta['mode'] = "" test_model.meta['bit_length'] = "" test_model.meta['payload_content_type'] = "" create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 400) self.assertIsNone(order_ref) @testcase.attr('negative') def test_order_create_oversized_strings(self): """Covers order creation with empty JSON.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['name'] = base.TestCase.oversized_field test_model.meta['algorithm'] = base.TestCase.oversized_field test_model.meta['mode'] = base.TestCase.oversized_field create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 400) self.assertIsNone(order_ref) @testcase.attr('negative') def test_order_create_error_message_on_invalid_order_create(self): """Related Launchpad issue: 1269594.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['payload'] = "blarg!" resp, order_ref = self.behaviors.create_order(test_model) # Make sure we actually get a message back error_msg = json.loads(resp.content).get('title') self.assertEqual(resp.status_code, 400) self.assertIsNotNone(error_msg) self.assertNotEqual(error_msg, 'None') @utils.parameterized_dataset({ '8': [8], '64': [64], '128': [128], '192': [192], '256': [256], '1024': [1024], '2048': [2048], '4096': [4096] }) @testcase.attr('positive') def test_order_create_valid_bit_length(self, bit_length): """Covers creating orders with various valid bit lengths.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['bit_length'] = bit_length create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 202) self.assertIsNotNone(order_ref) @utils.parameterized_dataset({ 'negative_maxint': [-sys.maxint], 'negative_7': [-7], 'negative_1': [-1], '0': [0], '1': [1], '7': [7], '129': [129], 'none': [None], 'empty': [''], 'space': [' '], 'over_signed_small_int': [32768] }) @testcase.attr('negative') def test_order_create_invalid_bit_length(self, bit_length): """Covers creating orders with various invalid bit lengths.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['bit_length'] = bit_length create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 400) @utils.parameterized_dataset({ 'array': [['array']], 'int': [123], 'oversized_payload': [str(base.TestCase.oversized_payload)], 'standard_payload': ['standard payload'], 'empty': [''] }) @testcase.attr('negative') def test_order_create_invalid_payload(self, payload): """Covers creating orders with various invalid payloads.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['payload'] = payload create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 400) @utils.parameterized_dataset({ 'alphanumeric': ['1f34ds'], 'len_255': [base.TestCase.max_sized_field], 'uuid': ['54262d9d-4bc7-4821-8df0-dc2ca8e112bb'], 'punctuation': ['~!@#$%^&*()_+`-={}[]|:;<>,.?'], 'empty': [""] }) @testcase.attr('positive') def test_order_create_valid_name(self, name): """Covers creating orders with various valid names.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['name'] = name create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 202) self.assertIsNotNone(order_ref) @utils.parameterized_dataset({ 'int': [123] }) @testcase.attr('negative') def test_order_create_invalid_name(self, name): """Covers creating orders with various invalid names.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['name'] = name create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 400) @utils.parameterized_dataset({ 'cbc': ['cbc'] }) @testcase.attr('positive') def test_order_create_valid_mode(self, mode): """Covers creating orders with various valid modes.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['mode'] = mode create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 202) self.assertIsNotNone(order_ref) @utils.parameterized_dataset({ 'int': [123] }) @testcase.attr('negative') def test_order_create_invalid_mode(self, mode): """Covers creating orders with various invalid modes.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['mode'] = mode create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 400) @utils.parameterized_dataset({ 'aes': ['aes'] }) @testcase.attr('positive') def test_order_create_valid_algorithm(self, algorithm): """Covers creating orders with various valid algorithms.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['algorithm'] = algorithm create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 202) self.assertIsNotNone(order_ref) @utils.parameterized_dataset({ 'int': [123] }) @testcase.attr('negative') def test_order_create_invalid_algorithm(self, algorithm): """Covers creating orders with various invalid algorithms.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['algorithm'] = algorithm create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 400) @utils.parameterized_dataset({ 'empty': [''], 'text/plain': ['text/plain'], 'text_plain_space_charset_utf8': ['text/plain; charset=utf-8'], }) @testcase.attr('positive') def test_order_create_valid_payload_content_type(self, pct): """Covers order creation with various valid payload content types.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['payload_content_type'] = pct create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 202) self.assertIsNotNone(order_ref) @utils.parameterized_dataset({ 'int': [123], 'invalid': ['invalid'], 'oversized_string': [base.TestCase.oversized_field], 'text': ['text'], 'text_slash_with_no_subtype': ['text/'], }) @testcase.attr('negative') def test_order_create_invalid_payload_content_type(self, pct): """Covers order creation with various invalid payload content types.""" test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['payload_content_type'] = pct create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 400) @utils.parameterized_dataset({ 'negative_five_long_expire': { 'timezone': '-05:00', 'days': 5}, 'positive_five_long_expire': { 'timezone': '+05:00', 'days': 5}, 'negative_one_short_expire': { 'timezone': '-01', 'days': 1}, 'positive_one_short_expire': { 'timezone': '+01', 'days': 1} }) @testcase.attr('positive') def test_order_create_valid_expiration(self, **kwargs): """Covers creating orders with various valid expiration data.""" timestamp = utils.create_timestamp_w_tz_and_offset(**kwargs) test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['expiration'] = timestamp create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 202) self.assertIsNotNone(order_ref) @utils.parameterized_dataset({ 'malformed_timezone': { 'timezone': '-5:00', 'days': 5}, }) @testcase.attr('negative') def test_order_create_invalid_expiration(self, **kwargs): """Covers creating orders with various invalid expiration data.""" timestamp = utils.create_timestamp_w_tz_and_offset(**kwargs) test_model = order_models.OrderModel(**self.create_default_data) test_model.meta['expiration'] = timestamp create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 400) @testcase.attr('positive') def test_order_create_change_host_header(self, **kwargs): """Create an order with a (possibly) malicious host name in header.""" test_model = order_models.OrderModel(**self.create_default_data) malicious_hostname = 'some.bad.server.com' changed_host_header = {'Host': malicious_hostname} resp, order_ref = self.behaviors.create_order( test_model, extra_headers=changed_host_header) self.assertEqual(resp.status_code, 202) # get Location field from result and assert that it is NOT the # malicious one. regex = '.*{0}.*'.format(malicious_hostname) self.assertNotRegexpMatches(resp.headers['location'], regex) @testcase.attr('positive') def test_encryption_using_generated_key(self): """Tests functionality of a generated asymmetric key pair.""" test_model = order_models.OrderModel(**self.asymmetric_data) create_resp, order_ref = self.behaviors.create_order(test_model) self.assertEqual(create_resp.status_code, 202) order_resp = self.behaviors.get_order(order_ref) self.assertEqual(order_resp.status_code, 200) container_resp = self.container_behaviors.get_container( order_resp.model.container_ref) self.assertEqual(container_resp.status_code, 200) secret_dict = {} for secret in container_resp.model.secret_refs: self.assertIsNotNone(secret.secret_ref) secret_resp = self.secret_behaviors.get_secret( secret.secret_ref, "application/octet-stream") self.assertIsNotNone(secret_resp) secret_dict[secret.name] = secret_resp.content private_key = serialization.load_pem_private_key( secret_dict['private_key'], password=None, backend=backends.default_backend() ) public_key = serialization.load_pem_public_key( secret_dict['public_key'], backend=backends.default_backend() ) self.assertIsNotNone(private_key) self.assertIsNotNone(public_key) message = b'plaintext message' ciphertext = public_key.encrypt( message, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None ) ) plaintext = private_key.decrypt( ciphertext, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None ) ) self.assertEqual(message, plaintext) class OrdersUnauthedTestCase(base.TestCase): def setUp(self): super(OrdersUnauthedTestCase, self).setUp() self.behaviors = order_behaviors.OrderBehaviors(self.client) self.container_behaviors = container_behaviors.ContainerBehaviors( self.client) self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client) self.create_default_data = get_default_order_create_data() self.dummy_order_ref = 'orders/dummy-7b86-4071-935d-ef6b83729200' self.dummy_project_id = 'dummy' def tearDown(self): self.behaviors.delete_all_created_orders() super(OrdersUnauthedTestCase, self).tearDown() @testcase.attr('negative', 'security') def test_order_create_unauthed_no_proj_id(self): """Attempt to create an order without a token or project id Should return 401 """ model = order_models.OrderModel(self.create_default_data) resp, order_ref = self.behaviors.create_order(model, use_auth=False) self.assertEqual(401, resp.status_code) @testcase.attr('negative', 'security') def test_order_get_unauthed_no_proj_id(self): """Attempt to get an order without a token or project id Should return 401 """ resp = self.behaviors.get_order(self.dummy_order_ref, use_auth=False) self.assertEqual(401, resp.status_code) @testcase.attr('negative', 'security') def test_order_get_order_list_unauthed_no_proj_id(self): """Attempt to get the list of orders without a token or project id Should return 401 """ resp, orders, next_ref, prev_ref = self.behaviors.get_orders( use_auth=False ) self.assertEqual(401, resp.status_code) @testcase.attr('negative', 'security') def test_order_delete_unauthed_no_proj_id(self): """Attempt to delete an order without a token or project id Should return 401 """ resp = self.behaviors.delete_order( self.dummy_order_ref, expected_fail=True, use_auth=False ) self.assertEqual(401, resp.status_code) @testcase.attr('negative', 'security') def test_order_create_unauthed_with_proj_id(self): """Attempt to create an order with a project id, but no token Should return 401 """ model = order_models.OrderModel(self.create_default_data) headers = {'X-Project-Id': self.dummy_project_id} resp, order_ref = self.behaviors.create_order( model, extra_headers=headers, use_auth=False ) self.assertEqual(401, resp.status_code) @testcase.attr('negative', 'security') def test_order_get_unauthed_with_proj_id(self): """Attempt to get an order with a project id, but no token Should return 401 """ headers = {'X-Project-Id': self.dummy_project_id} resp = self.behaviors.get_order( self.dummy_order_ref, extra_headers=headers, use_auth=False ) self.assertEqual(401, resp.status_code) @testcase.attr('negative', 'security') def test_order_get_order_list_unauthed_with_proj_id(self): """Attempt to get the list of orders with a project id, but no token Should return 401 """ headers = {'X-Project-Id': self.dummy_project_id} resp, orders, next_ref, prev_ref = self.behaviors.get_orders( extra_headers=headers, use_auth=False ) self.assertEqual(401, resp.status_code) @testcase.attr('negative', 'security') def test_order_delete_unauthed_with_proj_id(self): """Attempt to delete an order with a project id, but no token Should return 401 """ headers = {'X-Project-Id': self.dummy_project_id} resp = self.behaviors.delete_order( self.dummy_order_ref, extra_headers=headers, expected_fail=True, use_auth=False ) self.assertEqual(401, resp.status_code)