
The patch removes the neutron-lib from the dependencies. We do not want/need the consumers of this library to pull in all of the neutron-lib dependencies. The patch adds the following: 1. callback to bind is_attr_set - set_is_attr_callback 2. A new exception NsxLibInvalidInput - this is raised when inputs are invalid. Change-Id: Ia8ec71dee2d5de921700a9b4fd7e789d2aed4679
296 lines
12 KiB
Python
296 lines
12 KiB
Python
# Copyright 2015 VMware, Inc.
|
|
# All Rights Reserved
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
import os
|
|
|
|
from OpenSSL import crypto
|
|
from oslo_serialization import jsonutils
|
|
|
|
from vmware_nsxlib.tests.unit.v3 import mocks
|
|
from vmware_nsxlib.tests.unit.v3 import nsxlib_testcase
|
|
from vmware_nsxlib.tests.unit.v3 import test_client
|
|
from vmware_nsxlib.v3 import client
|
|
from vmware_nsxlib.v3 import client_cert
|
|
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
|
|
from vmware_nsxlib.v3 import trust_management as tm
|
|
|
|
|
|
class DummyStorageDriver(dict):
|
|
"""Storage driver simulation - just a dictionary"""
|
|
|
|
def store_cert(self, project_id, certificate, private_key):
|
|
self[project_id] = {}
|
|
self[project_id]['cert'] = certificate
|
|
self[project_id]['key'] = private_key
|
|
|
|
def get_cert(self, project_id):
|
|
if project_id not in self:
|
|
return (None, None)
|
|
|
|
return (self[project_id]['cert'], self[project_id]['key'])
|
|
|
|
def delete_cert(self, project_id):
|
|
del(self[project_id])
|
|
|
|
def is_empty(self, project_id):
|
|
return project_id not in self
|
|
|
|
|
|
class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
|
|
|
|
identity = 'drumknott'
|
|
cert_id = "00000000-1111-2222-3333-444444444444"
|
|
identity_id = "55555555-6666-7777-8888-999999999999"
|
|
node_id = "meh"
|
|
|
|
def _get_mocked_response(self, status_code, results):
|
|
return mocks.MockRequestsResponse(
|
|
status_code,
|
|
jsonutils.dumps({'results': results}))
|
|
|
|
def _get_mocked_error_response(self, status_code, error_code):
|
|
return mocks.MockRequestsResponse(
|
|
status_code,
|
|
jsonutils.dumps({'httpStatus': 'go away',
|
|
'error_code': error_code,
|
|
'module_name': 'never mind',
|
|
'error_message': 'bad luck'}))
|
|
|
|
def _get_mocked_trust(self, action, cert_pem):
|
|
|
|
fake_responses = []
|
|
if 'create' in action:
|
|
# import cert and return its id
|
|
results = [{'id': self.cert_id}]
|
|
fake_responses.append(self._get_mocked_response(201, results))
|
|
# and then bind this id to principal identity
|
|
fake_responses.append(self._get_mocked_response(201, []))
|
|
|
|
if 'delete' in action:
|
|
# get certs list, including same cert imported twice edge case
|
|
results = [{'resource_type': 'Certificate',
|
|
'id': 'dont care',
|
|
'pem_encoded': 'some junk'},
|
|
{'resource_type': 'Certificate',
|
|
'id': 'some_other_cert_id',
|
|
'pem_encoded': cert_pem},
|
|
{'resource_type': 'Certificate',
|
|
'id': self.cert_id,
|
|
'pem_encoded': cert_pem}]
|
|
fake_responses.append(self._get_mocked_response(200, results))
|
|
|
|
# get principal identities list
|
|
results = [{'resource_type': 'Principal Identity',
|
|
'id': 'dont care',
|
|
'name': 'willikins',
|
|
'certificate_id': 'some other id'},
|
|
{'resource_type': 'Principal Identity',
|
|
'id': self.identity_id,
|
|
'name': self.identity,
|
|
'certificate_id': self.cert_id}]
|
|
fake_responses.append(self._get_mocked_response(200, results))
|
|
# delete certificate
|
|
fake_responses.append(self._get_mocked_response(204, []))
|
|
# delete identity
|
|
fake_responses.append(self._get_mocked_response(204, []))
|
|
|
|
mock_client = self.new_mocked_client(
|
|
client.JSONRESTClient,
|
|
url_prefix='api/v1', session_response=fake_responses)
|
|
|
|
return tm.NsxLibTrustManagement(mock_client, {})
|
|
|
|
def _verify_backend_create(self, mocked_trust, cert_pem):
|
|
"""Verify API calls to create cert and identity on backend"""
|
|
# verify API call to import cert on backend
|
|
base_uri = 'https://1.2.3.4/api/v1/trust-management'
|
|
uri = base_uri + '/certificates?action=import'
|
|
expected_body = {'pem_encoded': cert_pem}
|
|
test_client.assert_json_call('post', mocked_trust.client, uri,
|
|
single_call=False,
|
|
data=jsonutils.dumps(expected_body))
|
|
|
|
# verify API call to bind cert to identity on backend
|
|
uri = base_uri + '/principal-identities'
|
|
expected_body = {'name': self.identity,
|
|
'node_id': self.node_id,
|
|
'permission_group': 'read_write_api_users',
|
|
'certificate_id': self.cert_id,
|
|
'is_protected': True}
|
|
test_client.assert_json_call('post', mocked_trust.client, uri,
|
|
single_call=False,
|
|
data=jsonutils.dumps(expected_body,
|
|
sort_keys=True))
|
|
|
|
def _verify_backend_delete(self, mocked_trust):
|
|
"""Verify API calls to fetch and delete cert and identity"""
|
|
# verify API call to query identities in order to get cert id
|
|
base_uri = 'https://1.2.3.4/api/v1/trust-management'
|
|
uri = base_uri + '/principal-identities'
|
|
test_client.assert_json_call('get', mocked_trust.client, uri,
|
|
single_call=False)
|
|
# verify API call to delete openstack principal identity
|
|
uri = uri + '/' + self.identity_id
|
|
test_client.assert_json_call('delete', mocked_trust.client, uri,
|
|
single_call=False)
|
|
|
|
# verify API call to delete certificate
|
|
uri = base_uri + '/certificates/' + self.cert_id
|
|
test_client.assert_json_call('delete', mocked_trust.client, uri,
|
|
single_call=False)
|
|
|
|
def test_generate_cert(self):
|
|
"""Test startup without certificate + certificate generation"""
|
|
|
|
storage_driver = DummyStorageDriver()
|
|
# Prepare fake trust management for "cert create" requests
|
|
cert_pem, key_pem = storage_driver.get_cert(self.identity)
|
|
mocked_trust = self._get_mocked_trust('create', cert_pem)
|
|
cert = client_cert.ClientCertificateManager(self.identity,
|
|
mocked_trust,
|
|
storage_driver)
|
|
self.assertFalse(cert.exists())
|
|
|
|
cert.generate(subject={}, key_size=2048, valid_for_days=333,
|
|
node_id=self.node_id)
|
|
|
|
# verify client cert was generated and makes sense
|
|
self.assertTrue(cert.exists())
|
|
self.assertEqual(332, cert.expires_in_days())
|
|
cert_pem, key_pem = cert.get_pem()
|
|
|
|
# verify cert ans PK were stored in storage
|
|
stored_cert, stored_key = storage_driver.get_cert(self.identity)
|
|
self.assertEqual(cert_pem, stored_cert)
|
|
self.assertEqual(key_pem, stored_key)
|
|
|
|
# verify backend API calls
|
|
self._verify_backend_create(mocked_trust, cert_pem)
|
|
|
|
# try to generate cert again and fail
|
|
self.assertRaises(nsxlib_exc.ObjectAlreadyExists,
|
|
cert.generate, {})
|
|
|
|
def _prepare_storage_with_existing_cert(self, key_size, days, alg, subj):
|
|
# prepare storage driver with existing cert and key
|
|
# this test simulates system startup
|
|
cert, key = client_cert.generate_self_signed_cert_pair(key_size, days,
|
|
alg, subj)
|
|
storage_driver = DummyStorageDriver()
|
|
cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
|
|
key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
|
|
|
|
storage_driver.store_cert(self.identity, cert_pem, key_pem)
|
|
return storage_driver
|
|
|
|
def test_load_and_delete_existing_cert(self):
|
|
"""Test startup with existing certificate + certificate deletion"""
|
|
|
|
storage_driver = self._prepare_storage_with_existing_cert(4096,
|
|
3650,
|
|
'sha256',
|
|
{})
|
|
|
|
# get mocked backend driver for trust management,
|
|
# prepared for get request, that preceeds delete operation
|
|
cert_pem, key_pem = storage_driver.get_cert(self.identity)
|
|
mocked_trust = self._get_mocked_trust('delete', cert_pem)
|
|
|
|
cert = client_cert.ClientCertificateManager(self.identity,
|
|
mocked_trust,
|
|
storage_driver)
|
|
self.assertTrue(cert.exists())
|
|
|
|
cert.delete()
|
|
|
|
self.assertFalse(cert.exists())
|
|
self.assertTrue(storage_driver.is_empty(self.identity))
|
|
|
|
self._verify_backend_delete(mocked_trust)
|
|
|
|
def _test_import_and_delete_cert(self, with_pkey=True):
|
|
filename = '/tmp/test.pem'
|
|
# this driver simulates storage==none scenario
|
|
noop_driver = DummyStorageDriver()
|
|
cert, key = client_cert.generate_self_signed_cert_pair(4096,
|
|
20,
|
|
'sha256',
|
|
{})
|
|
|
|
cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
|
|
key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
|
|
with open(filename, 'wb') as f:
|
|
f.write(cert_pem)
|
|
if with_pkey:
|
|
f.write(key_pem)
|
|
|
|
mocked_trust = self._get_mocked_trust('create_delete',
|
|
cert_pem)
|
|
cert = client_cert.ClientCertificateManager(self.identity,
|
|
mocked_trust,
|
|
noop_driver)
|
|
cert.import_pem(filename, self.node_id)
|
|
self._verify_backend_create(mocked_trust, cert_pem)
|
|
|
|
cert.delete_pem(filename)
|
|
self._verify_backend_delete(mocked_trust)
|
|
|
|
os.remove(filename)
|
|
|
|
def test_import_and_delete_cert_pkey(self):
|
|
self._test_import_and_delete_cert(True)
|
|
|
|
def test_import_and_delete_cert_only(self):
|
|
self._test_import_and_delete_cert(False)
|
|
|
|
def test_get_certificate_details(self):
|
|
"""Test retrieving cert details for existing cert"""
|
|
|
|
key_size = 2048
|
|
days = 999
|
|
alg = 'sha256'
|
|
subj = {'country': 'CA',
|
|
'organization': 'squirrel rights',
|
|
'hostname': 'www.squirrels.ca',
|
|
'unit': 'nuts',
|
|
'state': 'BC'}
|
|
|
|
storage_driver = self._prepare_storage_with_existing_cert(key_size,
|
|
days, alg,
|
|
subj)
|
|
with client_cert.ClientCertificateManager(self.identity,
|
|
None,
|
|
storage_driver) as cert:
|
|
self.assertTrue(cert.exists())
|
|
self.assertEqual(days - 1, cert.expires_in_days())
|
|
self.assertEqual(key_size, cert.get_key_size())
|
|
cert_subj = cert.get_subject()
|
|
self.assertEqual(subj, cert_subj)
|
|
|
|
def test_bad_certificate_values(self):
|
|
bad_cert_values = [{'key_size': 1024,
|
|
'valid_for_days': 10,
|
|
'signature_alg': 'sha256',
|
|
'subject': {}},
|
|
{'key_size': 4096,
|
|
'valid_for_days': 100,
|
|
'signature_alg': 'sha',
|
|
'subject': {}}]
|
|
|
|
for args in bad_cert_values:
|
|
self.assertRaises(nsxlib_exc.NsxLibInvalidInput,
|
|
client_cert.generate_self_signed_cert_pair,
|
|
**args)
|