
Previously, our compute base class overwrote some clients with their admin versions. This is needlessly confusing. This patch just removes the setup_clients() method altogether, along with the create_test_server() override, and forces tests to explicitly use either self.os_primary or self.os_admin as the clients. This change has a few simple but widespread consequences: * We need to start using self.get_host_for_server() in a few places, instead of looking up the 'OS-EXT-SRV-ATTR:host' attribute in the server dict, as that's only present in the admin response. * We can drop the public visibility in copy_default_image(), as that's only allowed for admins, and the default shared visibility should work just as well for us. * The unit test for list_compute_hosts() would need to get fixed to account for the use of self.os_admin.services_client instead of self.services_client. Rather than do that, just drop the test entirely, it adds no value as list_compute_hosts() is used in whitebox tests themselves. * We need to start explicitly plassing wait_until='ACTIVE' to every one of our create_test_server calls, as the override used to do that for us. * Our live_migrate() helper now needs to be passed a clients manager so that it can pass that through to the waiter when waiting for the server to reach a particular status after the live migration. Depends-on: https://review.opendev.org/c/openstack/tempest/+/820062 Change-Id: I8d5be63275bd8a28b7012e14b99cadafdea53a47
124 lines
5.2 KiB
Python
124 lines
5.2 KiB
Python
# Copyright 2021 Red Hat
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from oslo_log import log as logging
|
|
from tempest import config
|
|
|
|
from whitebox_tempest_plugin.api.compute import base
|
|
from whitebox_tempest_plugin.services.clients import QEMUImgClient
|
|
from whitebox_tempest_plugin.utils import get_ctlplane_address
|
|
|
|
CONF = config.CONF
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
# TODO(lyarwood): Use a non admin base class in order to allow the encrypted
|
|
# volume and secrets to be created by a non-admin user.
|
|
class TestQEMUVolumeEncryption(base.BaseWhiteboxComputeTest):
|
|
'''Test which validates encryption. The test performs the following:
|
|
|
|
1. Create a VM
|
|
2. Create two volumes- encrypted and unencrypted
|
|
3. Attach both volumes to the VM
|
|
4. Gather all xml disk elements from guest
|
|
5. Search all xml disk elements for the disk containing the encryption
|
|
element and correct serial id
|
|
6. Validate the xml disk's encryption format is luks
|
|
7. Create volume path based on disk type and pass it to qemu-img to get
|
|
detailed information about the volume.
|
|
8. From the returned info confirm the volume is encrypted and the format
|
|
is luks
|
|
'''
|
|
|
|
@classmethod
|
|
def skip_checks(cls):
|
|
super(TestQEMUVolumeEncryption, cls).skip_checks()
|
|
if not CONF.compute_feature_enabled.attach_encrypted_volume:
|
|
raise cls.skipException('Encrypted volume attach is not supported')
|
|
|
|
def test_qemu_volume_encryption(self):
|
|
server = self.create_test_server(wait_until="ACTIVE")
|
|
unencrypted_vol = self.create_volume()
|
|
encrypted_vol = self.create_encrypted_volume(
|
|
'luks',
|
|
volume_type='luks'
|
|
)
|
|
|
|
# Attach encrypted and unencrypted volume to the server
|
|
self.attach_volume(server, unencrypted_vol)
|
|
self.attach_volume(server, encrypted_vol)
|
|
|
|
# Gather instance XML to ensure the encrypted volume is attached to the
|
|
# instance
|
|
xml = self.get_server_xml(server['id'])
|
|
|
|
# Create a list of all attached volumes on the instance
|
|
attached_volumes = xml.findall('.//disk')
|
|
|
|
# Search the list of disks for the encrypted volume by matching the
|
|
# disk's serial id with the volume id provided by the volumes client
|
|
# and also query that it contains the 'encryption/secret' elements
|
|
xml_disk_elements = [x for x in attached_volumes if
|
|
getattr(x.find('./serial'), 'text', None) ==
|
|
encrypted_vol['id'] and
|
|
x.find("./encryption/secret") is not None]
|
|
|
|
# There should be one and only one disk element present in the
|
|
# instance xml that matches search criteria
|
|
xml_disk_count = len(xml_disk_elements)
|
|
self.assertEqual(
|
|
1, xml_disk_count, 'Expected to find one and only one xml disk '
|
|
'element matching search criteria but instead found %s' %
|
|
xml_disk_count)
|
|
encrypted_disk_xml_element = xml_disk_elements[0]
|
|
|
|
# Confirm encryption format is luks
|
|
encryption_format = encrypted_disk_xml_element.find(
|
|
'./encryption').get('format')
|
|
self.assertEqual(
|
|
'luks', encryption_format, 'Expected encryption type luks but '
|
|
'found %s' % encryption_format)
|
|
|
|
# Determine the encrypted disk's protocol. If RBD then gather user
|
|
# and volume name to generate path, otherwise just use 'source/dev' for
|
|
# volume path
|
|
protocol = encrypted_disk_xml_element.find('./source').get('protocol')
|
|
if protocol and protocol == 'rbd':
|
|
volume = encrypted_disk_xml_element.find('./source').get('name')
|
|
user = encrypted_disk_xml_element.find('./auth').get('username')
|
|
if user:
|
|
path = f"rbd:{volume}:id={user}"
|
|
else:
|
|
path = f"rbd:{volume}"
|
|
else:
|
|
path = encrypted_disk_xml_element.find('./source').get('dev')
|
|
|
|
# Get volume details from qemu-img info with the previously generated
|
|
# volume path
|
|
host = get_ctlplane_address(self.get_host_for_server(server['id']))
|
|
qemu_img_client = QEMUImgClient(host)
|
|
qemu_info = qemu_img_client.info(path)
|
|
|
|
# Check reported qemu-img info that volume is encrypted and the format
|
|
# is luks
|
|
self.assertTrue(
|
|
qemu_info.get('encrypted'), 'qemu-img did not report that the '
|
|
'volume was encrypted')
|
|
|
|
qemu_info_encrypt_format = qemu_info.get('format')
|
|
self.assertEqual(
|
|
'luks', qemu_info_encrypt_format, 'Expected volume format to be '
|
|
'luks but instead found %s' % qemu_info_encrypt_format)
|