vmware-nsxlib/vmware_nsxlib/v3/trust_management.py
Anna Khmelnitsky 1ac9c11b03 Support multiple client certificate per identity
When openstack runs in HA mode, admin might choose to assign two
separate client certificates for each openstack host. This is
possible with storage_type=none. This change allows deleting cert
and identity based not only on identity name, but on cert pem.
In addition, allow faster cluster recovery in case of certificate
change.

Change-Id: Ia4eea874cfa2bf4befc724b719e53e936292e11f
2017-03-08 02:52:21 -08:00

132 lines
4.8 KiB
Python

# Copyright 2016 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
from vmware_nsxlib.v3 import utils
BASE_SECTION = 'trust-management'
CERT_SECTION = BASE_SECTION + '/certificates'
ID_SECTION = BASE_SECTION + '/principal-identities'
USER_GROUP_TYPES = [
'read_only_api_users',
'read_write_api_users',
'superusers']
class NsxLibTrustManagement(utils.NsxLibApiBase):
@staticmethod
def remove_newlines_from_pem(pem):
"""NSX expects pem without newlines in certificate body
BEGIN and END sections should be separated with newlines
"""
lines = pem.split(b'\n')
if len(lines) <= 1:
return pem
result = lines[0] + b'\n'
result += b''.join(lines[1:-2])
result += b'\n' + lines[-2]
return result
def create_cert(self, cert_pem):
resource = CERT_SECTION + '?action=import'
body = {'pem_encoded': self.remove_newlines_from_pem(cert_pem)}
results = self.client.create(resource, body)['results']
if len(results) > 0:
# should be only one result
return results[0]['id']
def get_cert(self, cert_id):
resource = CERT_SECTION + '/' + cert_id
return self.client.get(resource)
def get_certs(self):
return self.client.get(CERT_SECTION)['results']
def delete_cert(self, cert_id):
resource = CERT_SECTION + '/' + cert_id
self.client.delete(resource)
def create_identity(self, name, cert_id,
node_id, permission_group):
# Validate permission group before sending to server
if permission_group not in USER_GROUP_TYPES:
raise nsxlib_exc.InvalidInput(
operation='create_identity',
arg_val=permission_group,
arg_name='permission_group')
body = {'name': name, 'certificate_id': cert_id,
'node_id': node_id, 'permission_group': permission_group,
'is_protected': True}
self.client.create(ID_SECTION, body)
def get_identities(self, name):
ids = self.client.get(ID_SECTION)['results']
return [identity for identity in ids if identity['name'] == name]
def delete_identity(self, identity_id):
resource = ID_SECTION + '/' + identity_id
self.client.delete(resource)
# TODO(annak): kept for sake of short-term stability, remove this
def get_identity_details(self, identity):
results = self.client.get(ID_SECTION)['results']
for result in results:
if result['name'] == identity:
return result
raise nsxlib_exc.ResourceNotFound(
manager=self.client.nsx_api_managers,
operation="Principal identity %s not found" % identity)
def find_cert_and_identity(self, name, cert_pem):
nsx_style_pem = self.remove_newlines_from_pem(cert_pem)
certs = self.get_certs()
cert_ids = [cert['id'] for cert in certs
if cert['pem_encoded'] == nsx_style_pem.decode('ascii')]
if not cert_ids:
raise nsxlib_exc.ResourceNotFound(
manager=self.client.nsx_api_managers,
operation="delete_certificate")
identities = self.get_identities(name)
# should be zero or one matching identities
results = [identity for identity in identities
if identity['certificate_id'] in cert_ids]
if not results:
raise nsxlib_exc.ResourceNotFound(
manager=self.client.nsx_api_managers,
operation="delete_identity")
return results[0]['certificate_id'], results[0]['id']
def delete_cert_and_identity(self, name, cert_pem):
cert_id, identity_id = self.find_cert_and_identity(name, cert_pem)
self.delete_identity(identity_id)
self.delete_cert(cert_id)
def create_cert_and_identity(self, name, cert_pem,
node_id,
permission_group='read_write_api_users'):
nsx_cert_id = self.create_cert(cert_pem)
try:
self.create_identity(name, nsx_cert_id, node_id, permission_group)
except nsxlib_exc.ManagerError as e:
self.delete_cert(nsx_cert_id)
raise e