From f1f390f2aea4a582b30b8cf45646ebb9afce599b Mon Sep 17 00:00:00 2001 From: Stephen Finucane Date: Tue, 7 May 2024 16:47:14 +0100 Subject: [PATCH] volume: add v3-specific volume type module This makes testing easier. Change-Id: If1ed8d5003160e45d503971ae722fd9983d3dd6d Signed-off-by: Stephen Finucane --- .../tests/unit/volume/v2/test_volume_type.py | 91 +- openstackclient/tests/unit/volume/v3/fakes.py | 133 +- .../tests/unit/volume/v3/test_volume_type.py | 1114 +++++++++++++++++ openstackclient/volume/v2/volume_type.py | 82 -- openstackclient/volume/v3/volume_type.py | 967 ++++++++++++++ setup.cfg | 12 +- 6 files changed, 2222 insertions(+), 177 deletions(-) create mode 100644 openstackclient/tests/unit/volume/v3/test_volume_type.py create mode 100644 openstackclient/volume/v3/volume_type.py diff --git a/openstackclient/tests/unit/volume/v2/test_volume_type.py b/openstackclient/tests/unit/volume/v2/test_volume_type.py index 8b5096ce41..6f50ff2ef6 100644 --- a/openstackclient/tests/unit/volume/v2/test_volume_type.py +++ b/openstackclient/tests/unit/volume/v2/test_volume_type.py @@ -15,7 +15,6 @@ from unittest import mock from unittest.mock import call -from cinderclient import api_versions from osc_lib.cli import format_columns from osc_lib import exceptions from osc_lib import utils @@ -333,7 +332,7 @@ class TestTypeList(TestType): "Name", "Is Public", ] - columns_long = columns + ["Description", "Properties"] + columns_long = columns + ["Description"] data_with_default_type = [(volume_types[0].id, volume_types[0].name, True)] data = [] for t in volume_types: @@ -352,7 +351,6 @@ class TestTypeList(TestType): t.name, t.is_public, t.description, - format_columns.DictColumn(t.extra_specs), ) ) @@ -374,9 +372,7 @@ class TestTypeList(TestType): parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.volume_types_mock.list.assert_called_once_with( - search_opts={}, is_public=None - ) + self.volume_types_mock.list.assert_called_once_with(is_public=None) self.assertEqual(self.columns, columns) self.assertCountEqual(self.data, list(data)) @@ -393,9 +389,7 @@ class TestTypeList(TestType): parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.volume_types_mock.list.assert_called_once_with( - search_opts={}, is_public=True - ) + self.volume_types_mock.list.assert_called_once_with(is_public=True) self.assertEqual(self.columns_long, columns) self.assertCountEqual(self.data_long, list(data)) @@ -411,9 +405,7 @@ class TestTypeList(TestType): parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.volume_types_mock.list.assert_called_once_with( - search_opts={}, is_public=False - ) + self.volume_types_mock.list.assert_called_once_with(is_public=False) self.assertEqual(self.columns, columns) self.assertCountEqual(self.data, list(data)) @@ -434,77 +426,6 @@ class TestTypeList(TestType): self.assertEqual(self.columns, columns) self.assertCountEqual(self.data_with_default_type, list(data)) - def test_type_list_with_properties(self): - self.app.client_manager.volume.api_version = api_versions.APIVersion( - '3.52' - ) - - arglist = [ - "--property", - "foo=bar", - "--multiattach", - "--cacheable", - "--replicated", - "--availability-zone", - "az1", - ] - verifylist = [ - ("encryption_type", False), - ("long", False), - ("is_public", None), - ("default", False), - ("properties", {"foo": "bar"}), - ("multiattach", True), - ("cacheable", True), - ("replicated", True), - ("availability_zones", ["az1"]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - self.volume_types_mock.list.assert_called_once_with( - search_opts={ - "extra_specs": { - "foo": "bar", - "multiattach": " True", - "cacheable": " True", - "replication_enabled": " True", - "RESKEY:availability_zones": "az1", - } - }, - is_public=None, - ) - self.assertEqual(self.columns, columns) - self.assertCountEqual(self.data, list(data)) - - def test_type_list_with_properties_pre_v352(self): - self.app.client_manager.volume.api_version = api_versions.APIVersion( - '3.51' - ) - - arglist = [ - "--property", - "foo=bar", - ] - verifylist = [ - ("encryption_type", False), - ("long", False), - ("is_public", None), - ("default", False), - ("properties", {"foo": "bar"}), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - exc = self.assertRaises( - exceptions.CommandError, - self.cmd.take_action, - parsed_args, - ) - self.assertIn( - '--os-volume-api-version 3.52 or greater is required', - str(exc), - ) - def test_type_list_with_encryption(self): encryption_type = volume_fakes.create_one_encryption_volume_type( attrs={'volume_type_id': self.volume_types[0].id}, @@ -550,9 +471,7 @@ class TestTypeList(TestType): columns, data = self.cmd.take_action(parsed_args) self.volume_encryption_types_mock.list.assert_called_once_with() - self.volume_types_mock.list.assert_called_once_with( - search_opts={}, is_public=None - ) + self.volume_types_mock.list.assert_called_once_with(is_public=None) self.assertEqual(encryption_columns, columns) self.assertCountEqual(encryption_data, list(data)) diff --git a/openstackclient/tests/unit/volume/v3/fakes.py b/openstackclient/tests/unit/volume/v3/fakes.py index b8ebe516ae..b014efc43b 100644 --- a/openstackclient/tests/unit/volume/v3/fakes.py +++ b/openstackclient/tests/unit/volume/v3/fakes.py @@ -62,12 +62,18 @@ class FakeVolumeClient: self.resource_filters.resource_class = fakes.FakeResource(None, {}) self.restores = mock.Mock() self.restores.resource_class = fakes.FakeResource(None, {}) - self.volumes = mock.Mock() - self.volumes.resource_class = fakes.FakeResource(None, {}) + self.volume_encryption_types = mock.Mock() + self.volume_encryption_types.resource_class = fakes.FakeResource( + None, {} + ) self.volume_snapshots = mock.Mock() self.volume_snapshots.resource_class = fakes.FakeResource(None, {}) + self.volume_type_access = mock.Mock() + self.volume_type_access.resource_class = fakes.FakeResource(None, {}) self.volume_types = mock.Mock() self.volume_types.resource_class = fakes.FakeResource(None, {}) + self.volumes = mock.Mock() + self.volumes.resource_class = fakes.FakeResource(None, {}) self.services = mock.Mock() self.services.resource_class = fakes.FakeResource(None, {}) self.workers = mock.Mock() @@ -118,7 +124,6 @@ class TestVolume( # TODO(stephenfin): Check if the responses are actually the same create_one_snapshot = volume_v2_fakes.create_one_snapshot -create_one_volume_type = volume_v2_fakes.create_one_volume_type def create_one_availability_zone(attrs=None): @@ -364,6 +369,34 @@ def create_clusters(attrs=None, count=2): return clusters +def create_one_encryption_volume_type(attrs=None): + """Create a fake encryption volume type. + + :param dict attrs: + A dictionary with all attributes + :return: + A FakeResource object with volume_type_id etc. + """ + attrs = attrs or {} + + # Set default attributes. + encryption_info = { + "volume_type_id": 'type-id-' + uuid.uuid4().hex, + 'provider': 'LuksEncryptor', + 'cipher': None, + 'key_size': None, + 'control_location': 'front-end', + } + + # Overwrite default attributes. + encryption_info.update(attrs) + + encryption_type = fakes.FakeResource( + info=copy.deepcopy(encryption_info), loaded=True + ) + return encryption_type + + def create_one_resource_filter(attrs=None): """Create a fake resource filter. @@ -405,6 +438,31 @@ def create_resource_filters(attrs=None, count=2): return resource_filters +def create_one_type_access(attrs=None): + """Create a fake volume type access for project. + + :param dict attrs: + A dictionary with all attributes + :return: + A FakeResource object, with Volume_type_ID and Project_ID. + """ + if attrs is None: + attrs = {} + + # Set default attributes. + type_access_attrs = { + 'volume_type_id': 'volume-type-id-' + uuid.uuid4().hex, + 'project_id': 'project-id-' + uuid.uuid4().hex, + } + + # Overwrite default attributes. + type_access_attrs.update(attrs) + + type_access = fakes.FakeResource(None, type_access_attrs, loaded=True) + + return type_access + + def create_one_volume(attrs=None): """Create a fake volume. @@ -821,6 +879,75 @@ def get_volume_attachments(attachments=None, count=2): return mock.Mock(side_effect=attachments) +def create_one_volume_type(attrs=None, methods=None): + """Create a fake volume type. + + :param dict attrs: + A dictionary with all attributes + :param dict methods: + A dictionary with all methods + :return: + A FakeResource object with id, name, description, etc. + """ + attrs = attrs or {} + methods = methods or {} + + # Set default attributes. + volume_type_info = { + "id": 'type-id-' + uuid.uuid4().hex, + "name": 'type-name-' + uuid.uuid4().hex, + "description": 'type-description-' + uuid.uuid4().hex, + "extra_specs": {"foo": "bar"}, + "is_public": True, + } + + # Overwrite default attributes. + volume_type_info.update(attrs) + + volume_type = fakes.FakeResource( + info=copy.deepcopy(volume_type_info), methods=methods, loaded=True + ) + return volume_type + + +def create_volume_types(attrs=None, count=2): + """Create multiple fake volume_types. + + :param dict attrs: + A dictionary with all attributes + :param int count: + The number of types to fake + :return: + A list of FakeResource objects faking the types + """ + volume_types = [] + for i in range(0, count): + volume_type = create_one_volume_type(attrs) + volume_types.append(volume_type) + + return volume_types + + +def get_volume_types(volume_types=None, count=2): + """Get an iterable MagicMock object with a list of faked volume types. + + If volume_types list is provided, then initialize the Mock object with + the list. Otherwise create one. + + :param List volume_types: + A list of FakeResource objects faking volume types + :param Integer count: + The number of volume types to be faked + :return + An iterable Mock object with side_effect set to a list of faked + volume types + """ + if volume_types is None: + volume_types = create_volume_types(count) + + return mock.Mock(side_effect=volume_types) + + def create_service_log_level_entry(attrs=None): service_log_level_info = { 'host': 'host_test', diff --git a/openstackclient/tests/unit/volume/v3/test_volume_type.py b/openstackclient/tests/unit/volume/v3/test_volume_type.py new file mode 100644 index 0000000000..5f791da256 --- /dev/null +++ b/openstackclient/tests/unit/volume/v3/test_volume_type.py @@ -0,0 +1,1114 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import mock +from unittest.mock import call + +from cinderclient import api_versions +from osc_lib.cli import format_columns +from osc_lib import exceptions +from osc_lib import utils + +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes +from openstackclient.tests.unit import utils as tests_utils +from openstackclient.tests.unit.volume.v3 import fakes as volume_fakes +from openstackclient.volume.v3 import volume_type + + +class TestType(volume_fakes.TestVolume): + def setUp(self): + super().setUp() + + self.volume_types_mock = self.volume_client.volume_types + self.volume_types_mock.reset_mock() + + self.volume_type_access_mock = self.volume_client.volume_type_access + self.volume_type_access_mock.reset_mock() + + self.volume_encryption_types_mock = ( + self.volume_client.volume_encryption_types + ) + self.volume_encryption_types_mock.reset_mock() + + self.projects_mock = self.identity_client.projects + self.projects_mock.reset_mock() + + +class TestTypeCreate(TestType): + def setUp(self): + super().setUp() + + self.new_volume_type = volume_fakes.create_one_volume_type( + methods={'set_keys': None}, + ) + self.project = identity_fakes.FakeProject.create_one_project() + self.columns = ( + 'description', + 'id', + 'is_public', + 'name', + ) + self.data = ( + self.new_volume_type.description, + self.new_volume_type.id, + True, + self.new_volume_type.name, + ) + + self.volume_types_mock.create.return_value = self.new_volume_type + self.projects_mock.get.return_value = self.project + # Get the command object to test + self.cmd = volume_type.CreateVolumeType(self.app, None) + + def test_type_create_public(self): + arglist = [ + "--description", + self.new_volume_type.description, + "--public", + self.new_volume_type.name, + ] + verifylist = [ + ("description", self.new_volume_type.description), + ("is_public", True), + ("name", self.new_volume_type.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.volume_types_mock.create.assert_called_with( + self.new_volume_type.name, + description=self.new_volume_type.description, + is_public=True, + ) + + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.data, data) + + def test_type_create_private(self): + arglist = [ + "--description", + self.new_volume_type.description, + "--private", + "--project", + self.project.id, + self.new_volume_type.name, + ] + verifylist = [ + ("description", self.new_volume_type.description), + ("is_public", False), + ("project", self.project.id), + ("name", self.new_volume_type.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.volume_types_mock.create.assert_called_with( + self.new_volume_type.name, + description=self.new_volume_type.description, + is_public=False, + ) + + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.data, data) + + def test_type_create_with_properties(self): + arglist = [ + '--property', + 'myprop=myvalue', + # this combination isn't viable server-side but is okay for testing + '--multiattach', + '--cacheable', + '--replicated', + '--availability-zone', + 'az1', + self.new_volume_type.name, + ] + verifylist = [ + ('properties', {'myprop': 'myvalue'}), + ('multiattach', True), + ('cacheable', True), + ('replicated', True), + ('availability_zones', ['az1']), + ('name', self.new_volume_type.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.volume_types_mock.create.assert_called_with( + self.new_volume_type.name, description=None + ) + self.new_volume_type.set_keys.assert_called_once_with( + { + 'myprop': 'myvalue', + 'multiattach': ' True', + 'cacheable': ' True', + 'replication_enabled': ' True', + 'RESKEY:availability_zones': 'az1', + } + ) + + self.columns += ('properties',) + self.data += (format_columns.DictColumn(None),) + + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.data, data) + + def test_public_type_create_with_project_public(self): + arglist = [ + '--project', + self.project.id, + self.new_volume_type.name, + ] + verifylist = [ + ('is_public', None), + ('project', self.project.id), + ('name', self.new_volume_type.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.assertRaises( + exceptions.CommandError, + self.cmd.take_action, + parsed_args, + ) + + def test_type_create_with_encryption(self): + encryption_info = { + 'provider': 'LuksEncryptor', + 'cipher': 'aes-xts-plain64', + 'key_size': '128', + 'control_location': 'front-end', + } + encryption_type = volume_fakes.create_one_encryption_volume_type( + attrs=encryption_info, + ) + self.new_volume_type = volume_fakes.create_one_volume_type( + attrs={'encryption': encryption_info}, + ) + self.volume_types_mock.create.return_value = self.new_volume_type + self.volume_encryption_types_mock.create.return_value = encryption_type + encryption_columns = ( + 'description', + 'encryption', + 'id', + 'is_public', + 'name', + ) + encryption_data = ( + self.new_volume_type.description, + format_columns.DictColumn(encryption_info), + self.new_volume_type.id, + True, + self.new_volume_type.name, + ) + arglist = [ + '--encryption-provider', + 'LuksEncryptor', + '--encryption-cipher', + 'aes-xts-plain64', + '--encryption-key-size', + '128', + '--encryption-control-location', + 'front-end', + self.new_volume_type.name, + ] + verifylist = [ + ('encryption_provider', 'LuksEncryptor'), + ('encryption_cipher', 'aes-xts-plain64'), + ('encryption_key_size', 128), + ('encryption_control_location', 'front-end'), + ('name', self.new_volume_type.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.volume_types_mock.create.assert_called_with( + self.new_volume_type.name, + description=None, + ) + body = { + 'provider': 'LuksEncryptor', + 'cipher': 'aes-xts-plain64', + 'key_size': 128, + 'control_location': 'front-end', + } + self.volume_encryption_types_mock.create.assert_called_with( + self.new_volume_type, + body, + ) + self.assertEqual(encryption_columns, columns) + self.assertCountEqual(encryption_data, data) + + +class TestTypeDelete(TestType): + volume_types = volume_fakes.create_volume_types(count=2) + + def setUp(self): + super().setUp() + + self.volume_types_mock.get = volume_fakes.get_volume_types( + self.volume_types, + ) + self.volume_types_mock.delete.return_value = None + + # Get the command object to mock + self.cmd = volume_type.DeleteVolumeType(self.app, None) + + def test_type_delete(self): + arglist = [self.volume_types[0].id] + verifylist = [("volume_types", [self.volume_types[0].id])] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.volume_types_mock.delete.assert_called_with(self.volume_types[0]) + self.assertIsNone(result) + + def test_delete_multiple_types(self): + arglist = [] + for t in self.volume_types: + arglist.append(t.id) + verifylist = [ + ('volume_types', arglist), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + calls = [] + for t in self.volume_types: + calls.append(call(t)) + self.volume_types_mock.delete.assert_has_calls(calls) + self.assertIsNone(result) + + def test_delete_multiple_types_with_exception(self): + arglist = [ + self.volume_types[0].id, + 'unexist_type', + ] + verifylist = [ + ('volume_types', arglist), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self.volume_types[0], exceptions.CommandError] + with mock.patch.object( + utils, 'find_resource', side_effect=find_mock_result + ) as find_mock: + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual( + '1 of 2 volume types failed to delete.', str(e) + ) + find_mock.assert_any_call( + self.volume_types_mock, self.volume_types[0].id + ) + find_mock.assert_any_call(self.volume_types_mock, 'unexist_type') + + self.assertEqual(2, find_mock.call_count) + self.volume_types_mock.delete.assert_called_once_with( + self.volume_types[0] + ) + + +class TestTypeList(TestType): + volume_types = volume_fakes.create_volume_types() + + columns = [ + "ID", + "Name", + "Is Public", + ] + columns_long = columns + ["Description", "Properties"] + data_with_default_type = [(volume_types[0].id, volume_types[0].name, True)] + data = [] + for t in volume_types: + data.append( + ( + t.id, + t.name, + t.is_public, + ) + ) + data_long = [] + for t in volume_types: + data_long.append( + ( + t.id, + t.name, + t.is_public, + t.description, + format_columns.DictColumn(t.extra_specs), + ) + ) + + def setUp(self): + super().setUp() + + self.volume_types_mock.list.return_value = self.volume_types + self.volume_types_mock.default.return_value = self.volume_types[0] + # get the command to test + self.cmd = volume_type.ListVolumeType(self.app, None) + + def test_type_list_without_options(self): + arglist = [] + verifylist = [ + ("long", False), + ("is_public", None), + ("default", False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.volume_types_mock.list.assert_called_once_with( + search_opts={}, is_public=None + ) + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.data, list(data)) + + def test_type_list_with_options(self): + arglist = [ + "--long", + "--public", + ] + verifylist = [ + ("long", True), + ("is_public", True), + ("default", False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.volume_types_mock.list.assert_called_once_with( + search_opts={}, is_public=True + ) + self.assertEqual(self.columns_long, columns) + self.assertCountEqual(self.data_long, list(data)) + + def test_type_list_with_private_option(self): + arglist = [ + "--private", + ] + verifylist = [ + ("long", False), + ("is_public", False), + ("default", False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.volume_types_mock.list.assert_called_once_with( + search_opts={}, is_public=False + ) + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.data, list(data)) + + def test_type_list_with_default_option(self): + arglist = [ + "--default", + ] + verifylist = [ + ("encryption_type", False), + ("long", False), + ("is_public", None), + ("default", True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.volume_types_mock.default.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.data_with_default_type, list(data)) + + def test_type_list_with_properties(self): + self.app.client_manager.volume.api_version = api_versions.APIVersion( + '3.52' + ) + + arglist = [ + "--property", + "foo=bar", + "--multiattach", + "--cacheable", + "--replicated", + "--availability-zone", + "az1", + ] + verifylist = [ + ("encryption_type", False), + ("long", False), + ("is_public", None), + ("default", False), + ("properties", {"foo": "bar"}), + ("multiattach", True), + ("cacheable", True), + ("replicated", True), + ("availability_zones", ["az1"]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.volume_types_mock.list.assert_called_once_with( + search_opts={ + "extra_specs": { + "foo": "bar", + "multiattach": " True", + "cacheable": " True", + "replication_enabled": " True", + "RESKEY:availability_zones": "az1", + } + }, + is_public=None, + ) + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.data, list(data)) + + def test_type_list_with_properties_pre_v352(self): + self.app.client_manager.volume.api_version = api_versions.APIVersion( + '3.51' + ) + + arglist = [ + "--property", + "foo=bar", + ] + verifylist = [ + ("encryption_type", False), + ("long", False), + ("is_public", None), + ("default", False), + ("properties", {"foo": "bar"}), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + exc = self.assertRaises( + exceptions.CommandError, + self.cmd.take_action, + parsed_args, + ) + self.assertIn( + '--os-volume-api-version 3.52 or greater is required', + str(exc), + ) + + def test_type_list_with_encryption(self): + encryption_type = volume_fakes.create_one_encryption_volume_type( + attrs={'volume_type_id': self.volume_types[0].id}, + ) + encryption_info = { + 'provider': 'LuksEncryptor', + 'cipher': None, + 'key_size': None, + 'control_location': 'front-end', + } + encryption_columns = self.columns + [ + "Encryption", + ] + encryption_data = [] + encryption_data.append( + ( + self.volume_types[0].id, + self.volume_types[0].name, + self.volume_types[0].is_public, + volume_type.EncryptionInfoColumn( + self.volume_types[0].id, + {self.volume_types[0].id: encryption_info}, + ), + ) + ) + encryption_data.append( + ( + self.volume_types[1].id, + self.volume_types[1].name, + self.volume_types[1].is_public, + volume_type.EncryptionInfoColumn(self.volume_types[1].id, {}), + ) + ) + + self.volume_encryption_types_mock.list.return_value = [encryption_type] + arglist = [ + "--encryption-type", + ] + verifylist = [ + ("encryption_type", True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.volume_encryption_types_mock.list.assert_called_once_with() + self.volume_types_mock.list.assert_called_once_with( + search_opts={}, is_public=None + ) + self.assertEqual(encryption_columns, columns) + self.assertCountEqual(encryption_data, list(data)) + + +class TestTypeSet(TestType): + def setUp(self): + super().setUp() + + self.project = identity_fakes.FakeProject.create_one_project() + self.projects_mock.get.return_value = self.project + + self.volume_type = volume_fakes.create_one_volume_type( + methods={'set_keys': None}, + ) + self.volume_types_mock.get.return_value = self.volume_type + self.volume_encryption_types_mock.create.return_value = None + self.volume_encryption_types_mock.update.return_value = None + + self.cmd = volume_type.SetVolumeType(self.app, None) + + def test_type_set(self): + arglist = [ + '--name', + 'new_name', + '--description', + 'new_description', + '--private', + self.volume_type.id, + ] + verifylist = [ + ('name', 'new_name'), + ('description', 'new_description'), + ('properties', None), + ('volume_type', self.volume_type.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + kwargs = { + 'name': 'new_name', + 'description': 'new_description', + 'is_public': False, + } + self.volume_types_mock.update.assert_called_with( + self.volume_type.id, **kwargs + ) + self.assertIsNone(result) + + self.volume_type_access_mock.add_project_access.assert_not_called() + self.volume_encryption_types_mock.update.assert_not_called() + self.volume_encryption_types_mock.create.assert_not_called() + + def test_type_set_property(self): + arglist = [ + '--property', + 'myprop=myvalue', + # this combination isn't viable server-side but is okay for testing + '--multiattach', + '--cacheable', + '--replicated', + '--availability-zone', + 'az1', + self.volume_type.id, + ] + verifylist = [ + ('name', None), + ('description', None), + ('properties', {'myprop': 'myvalue'}), + ('multiattach', True), + ('cacheable', True), + ('replicated', True), + ('availability_zones', ['az1']), + ('volume_type', self.volume_type.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.assertIsNone(result) + + self.volume_type.set_keys.assert_called_once_with( + { + 'myprop': 'myvalue', + 'multiattach': ' True', + 'cacheable': ' True', + 'replication_enabled': ' True', + 'RESKEY:availability_zones': 'az1', + } + ) + self.volume_type_access_mock.add_project_access.assert_not_called() + self.volume_encryption_types_mock.update.assert_not_called() + self.volume_encryption_types_mock.create.assert_not_called() + + def test_type_set_with_empty_project(self): + arglist = [ + '--project', + '', + self.volume_type.id, + ] + verifylist = [ + ('project', ''), + ('volume_type', self.volume_type.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.assertIsNone(result) + + self.volume_type.set_keys.assert_not_called() + self.volume_type_access_mock.add_project_access.assert_not_called() + self.volume_encryption_types_mock.update.assert_not_called() + self.volume_encryption_types_mock.create.assert_not_called() + + def test_type_set_with_project(self): + arglist = [ + '--project', + self.project.id, + self.volume_type.id, + ] + verifylist = [ + ('project', self.project.id), + ('volume_type', self.volume_type.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.assertIsNone(result) + + self.volume_type.set_keys.assert_not_called() + self.volume_type_access_mock.add_project_access.assert_called_with( + self.volume_type.id, + self.project.id, + ) + self.volume_encryption_types_mock.update.assert_not_called() + self.volume_encryption_types_mock.create.assert_not_called() + + def test_type_set_with_new_encryption(self): + self.volume_encryption_types_mock.update.side_effect = ( + exceptions.NotFound('NotFound') + ) + arglist = [ + '--encryption-provider', + 'LuksEncryptor', + '--encryption-cipher', + 'aes-xts-plain64', + '--encryption-key-size', + '128', + '--encryption-control-location', + 'front-end', + self.volume_type.id, + ] + verifylist = [ + ('encryption_provider', 'LuksEncryptor'), + ('encryption_cipher', 'aes-xts-plain64'), + ('encryption_key_size', 128), + ('encryption_control_location', 'front-end'), + ('volume_type', self.volume_type.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.assertIsNone(result) + + body = { + 'provider': 'LuksEncryptor', + 'cipher': 'aes-xts-plain64', + 'key_size': 128, + 'control_location': 'front-end', + } + self.volume_encryption_types_mock.update.assert_called_with( + self.volume_type, + body, + ) + self.volume_encryption_types_mock.create.assert_called_with( + self.volume_type, + body, + ) + + @mock.patch.object(utils, 'find_resource') + def test_type_set_with_existing_encryption(self, mock_find): + mock_find.side_effect = [self.volume_type, "existing_encryption_type"] + arglist = [ + '--encryption-provider', + 'LuksEncryptor', + '--encryption-cipher', + 'aes-xts-plain64', + '--encryption-control-location', + 'front-end', + self.volume_type.id, + ] + verifylist = [ + ('encryption_provider', 'LuksEncryptor'), + ('encryption_cipher', 'aes-xts-plain64'), + ('encryption_control_location', 'front-end'), + ('volume_type', self.volume_type.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.assertIsNone(result) + + self.volume_type.set_keys.assert_not_called() + self.volume_type_access_mock.add_project_access.assert_not_called() + body = { + 'provider': 'LuksEncryptor', + 'cipher': 'aes-xts-plain64', + 'control_location': 'front-end', + } + self.volume_encryption_types_mock.update.assert_called_with( + self.volume_type, + body, + ) + self.volume_encryption_types_mock.create.assert_not_called() + + def test_type_set_new_encryption_without_provider(self): + self.volume_encryption_types_mock.update.side_effect = ( + exceptions.NotFound('NotFound') + ) + arglist = [ + '--encryption-cipher', + 'aes-xts-plain64', + '--encryption-key-size', + '128', + '--encryption-control-location', + 'front-end', + self.volume_type.id, + ] + verifylist = [ + ('encryption_cipher', 'aes-xts-plain64'), + ('encryption_key_size', 128), + ('encryption_control_location', 'front-end'), + ('volume_type', self.volume_type.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + exc = self.assertRaises( + exceptions.CommandError, + self.cmd.take_action, + parsed_args, + ) + self.assertEqual( + "Command Failed: One or more of the operations failed", + str(exc), + ) + + self.volume_type.set_keys.assert_not_called() + self.volume_type_access_mock.add_project_access.assert_not_called() + body = { + 'cipher': 'aes-xts-plain64', + 'key_size': 128, + 'control_location': 'front-end', + } + self.volume_encryption_types_mock.update.assert_called_with( + self.volume_type, + body, + ) + self.volume_encryption_types_mock.create.assert_not_called() + + +class TestTypeShow(TestType): + columns = ( + 'access_project_ids', + 'description', + 'id', + 'is_public', + 'name', + 'properties', + ) + + def setUp(self): + super().setUp() + + self.volume_type = volume_fakes.create_one_volume_type() + self.data = ( + None, + self.volume_type.description, + self.volume_type.id, + True, + self.volume_type.name, + format_columns.DictColumn(self.volume_type.extra_specs), + ) + + self.volume_types_mock.get.return_value = self.volume_type + + # Get the command object to test + self.cmd = volume_type.ShowVolumeType(self.app, None) + + def test_type_show(self): + arglist = [self.volume_type.id] + verifylist = [ + ("encryption_type", False), + ("volume_type", self.volume_type.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.volume_types_mock.get.assert_called_with(self.volume_type.id) + + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.data, data) + + def test_type_show_with_access(self): + arglist = [self.volume_type.id] + verifylist = [("volume_type", self.volume_type.id)] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + private_type = volume_fakes.create_one_volume_type( + attrs={'is_public': False}, + ) + type_access_list = volume_fakes.create_one_type_access() + with mock.patch.object( + self.volume_types_mock, + 'get', + return_value=private_type, + ): + with mock.patch.object( + self.volume_type_access_mock, + 'list', + return_value=[type_access_list], + ): + columns, data = self.cmd.take_action(parsed_args) + self.volume_types_mock.get.assert_called_once_with( + self.volume_type.id + ) + self.volume_type_access_mock.list.assert_called_once_with( + private_type.id + ) + + self.assertEqual(self.columns, columns) + private_type_data = ( + format_columns.ListColumn([type_access_list.project_id]), + private_type.description, + private_type.id, + private_type.is_public, + private_type.name, + format_columns.DictColumn(private_type.extra_specs), + ) + self.assertCountEqual(private_type_data, data) + + def test_type_show_with_list_access_exec(self): + arglist = [self.volume_type.id] + verifylist = [("volume_type", self.volume_type.id)] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + private_type = volume_fakes.create_one_volume_type( + attrs={'is_public': False}, + ) + with mock.patch.object( + self.volume_types_mock, 'get', return_value=private_type + ): + with mock.patch.object( + self.volume_type_access_mock, 'list', side_effect=Exception() + ): + columns, data = self.cmd.take_action(parsed_args) + self.volume_types_mock.get.assert_called_once_with( + self.volume_type.id + ) + self.volume_type_access_mock.list.assert_called_once_with( + private_type.id + ) + + self.assertEqual(self.columns, columns) + private_type_data = ( + None, + private_type.description, + private_type.id, + private_type.is_public, + private_type.name, + format_columns.DictColumn(private_type.extra_specs), + ) + self.assertCountEqual(private_type_data, data) + + def test_type_show_with_encryption(self): + encryption_type = volume_fakes.create_one_encryption_volume_type() + encryption_info = { + 'provider': 'LuksEncryptor', + 'cipher': None, + 'key_size': None, + 'control_location': 'front-end', + } + self.volume_type = volume_fakes.create_one_volume_type( + attrs={'encryption': encryption_info}, + ) + self.volume_types_mock.get.return_value = self.volume_type + self.volume_encryption_types_mock.get.return_value = encryption_type + encryption_columns = ( + 'access_project_ids', + 'description', + 'encryption', + 'id', + 'is_public', + 'name', + 'properties', + ) + encryption_data = ( + None, + self.volume_type.description, + format_columns.DictColumn(encryption_info), + self.volume_type.id, + True, + self.volume_type.name, + format_columns.DictColumn(self.volume_type.extra_specs), + ) + arglist = ['--encryption-type', self.volume_type.id] + verifylist = [ + ('encryption_type', True), + ("volume_type", self.volume_type.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.volume_types_mock.get.assert_called_with(self.volume_type.id) + self.volume_encryption_types_mock.get.assert_called_with( + self.volume_type.id + ) + self.assertEqual(encryption_columns, columns) + self.assertCountEqual(encryption_data, data) + + +class TestTypeUnset(TestType): + project = identity_fakes.FakeProject.create_one_project() + volume_type = volume_fakes.create_one_volume_type( + methods={'unset_keys': None}, + ) + + def setUp(self): + super().setUp() + + self.volume_types_mock.get.return_value = self.volume_type + + # Return a project + self.projects_mock.get.return_value = self.project + + # Get the command object to test + self.cmd = volume_type.UnsetVolumeType(self.app, None) + + def test_type_unset(self): + arglist = [ + '--property', + 'property', + '--property', + 'multi_property', + self.volume_type.id, + ] + verifylist = [ + ('properties', ['property', 'multi_property']), + ('volume_type', self.volume_type.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.volume_type.unset_keys.assert_called_once_with( + ['property', 'multi_property'] + ) + self.assertIsNone(result) + + def test_type_unset_project_access(self): + arglist = [ + '--project', + self.project.id, + self.volume_type.id, + ] + verifylist = [ + ('project', self.project.id), + ('volume_type', self.volume_type.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.assertIsNone(result) + + self.volume_type_access_mock.remove_project_access.assert_called_with( + self.volume_type.id, + self.project.id, + ) + + def test_type_unset_not_called_without_project_argument(self): + arglist = [ + '--project', + '', + self.volume_type.id, + ] + verifylist = [ + ('encryption_type', False), + ('project', ''), + ('volume_type', self.volume_type.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.assertIsNone(result) + self.volume_encryption_types_mock.delete.assert_not_called() + self.assertFalse( + self.volume_type_access_mock.remove_project_access.called + ) + + def test_type_unset_failed_with_missing_volume_type_argument(self): + arglist = [ + '--project', + 'identity_fakes.project_id', + ] + verifylist = [ + ('project', 'identity_fakes.project_id'), + ] + + self.assertRaises( + tests_utils.ParserException, + self.check_parser, + self.cmd, + arglist, + verifylist, + ) + + def test_type_unset_encryption_type(self): + arglist = [ + '--encryption-type', + self.volume_type.id, + ] + verifylist = [ + ('encryption_type', True), + ('volume_type', self.volume_type.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.volume_encryption_types_mock.delete.assert_called_with( + self.volume_type + ) + self.assertIsNone(result) + + +class TestColumns(TestType): + def test_encryption_info_column_with_info(self): + fake_volume_type = volume_fakes.create_one_volume_type() + type_id = fake_volume_type.id + + encryption_info = { + 'provider': 'LuksEncryptor', + 'cipher': None, + 'key_size': None, + 'control_location': 'front-end', + } + col = volume_type.EncryptionInfoColumn( + type_id, {type_id: encryption_info} + ) + self.assertEqual( + utils.format_dict(encryption_info), col.human_readable() + ) + self.assertEqual(encryption_info, col.machine_readable()) + + def test_encryption_info_column_without_info(self): + fake_volume_type = volume_fakes.create_one_volume_type() + type_id = fake_volume_type.id + + col = volume_type.EncryptionInfoColumn(type_id, {}) + self.assertEqual('-', col.human_readable()) + self.assertIsNone(col.machine_readable()) diff --git a/openstackclient/volume/v2/volume_type.py b/openstackclient/volume/v2/volume_type.py index c472a3c28c..f4540f4e85 100644 --- a/openstackclient/volume/v2/volume_type.py +++ b/openstackclient/volume/v2/volume_type.py @@ -17,7 +17,6 @@ import functools import logging -from cinderclient import api_versions from cliff import columns as cliff_columns from osc_lib.cli import format_columns from osc_lib.cli import parseractions @@ -409,59 +408,6 @@ class ListVolumeType(command.Lister): "(admin only)" ), ) - parser.add_argument( - '--property', - metavar='', - action=parseractions.KeyValueAction, - dest='properties', - help=_( - 'Filter by a property on the volume types ' - '(repeat option to filter by multiple properties) ' - '(admin only except for user-visible extra specs) ' - '(supported by --os-volume-api-version 3.52 or above)' - ), - ) - parser.add_argument( - '--multiattach', - action='store_true', - default=False, - help=_( - "List only volume types with multi-attach enabled " - "(this is an alias for '--property multiattach= True') " - "(supported by --os-volume-api-version 3.52 or above)" - ), - ) - parser.add_argument( - '--cacheable', - action='store_true', - default=False, - help=_( - "List only volume types with caching enabled " - "(this is an alias for '--property cacheable= True') " - "(admin only) " - "(supported by --os-volume-api-version 3.52 or above)" - ), - ) - parser.add_argument( - '--replicated', - action='store_true', - default=False, - help=_( - "List only volume types with replication enabled " - "(this is an alias for '--property replication_enabled= True') " # noqa: E501 - "(supported by --os-volume-api-version 3.52 or above)" - ), - ) - parser.add_argument( - '--availability-zone', - action='append', - dest='availability_zones', - help=_( - "List only volume types with this availability configured " - "(this is an alias for '--property RESKEY:availability_zones:') " # noqa: E501 - "(repeat option to filter on multiple availability zones)" - ), - ) return parser def take_action(self, parsed_args): @@ -473,14 +419,12 @@ class ListVolumeType(command.Lister): 'Name', 'Is Public', 'Description', - 'Extra Specs', ] column_headers = [ 'ID', 'Name', 'Is Public', 'Description', - 'Properties', ] else: columns = ['ID', 'Name', 'Is Public'] @@ -489,33 +433,7 @@ class ListVolumeType(command.Lister): if parsed_args.default: data = [volume_client.volume_types.default()] else: - search_opts = {} - properties = {} - if parsed_args.properties: - properties.update(parsed_args.properties) - if parsed_args.multiattach: - properties['multiattach'] = ' True' - if parsed_args.cacheable: - properties['cacheable'] = ' True' - if parsed_args.replicated: - properties['replication_enabled'] = ' True' - if parsed_args.availability_zones: - properties['RESKEY:availability_zones'] = ','.join( - parsed_args.availability_zones - ) - if properties: - if volume_client.api_version < api_versions.APIVersion('3.52'): - msg = _( - "--os-volume-api-version 3.52 or greater is required " - "to use the '--property' option or any of the alias " - "options" - ) - raise exceptions.CommandError(msg) - - search_opts['extra_specs'] = properties - data = volume_client.volume_types.list( - search_opts=search_opts, is_public=parsed_args.is_public, ) diff --git a/openstackclient/volume/v3/volume_type.py b/openstackclient/volume/v3/volume_type.py new file mode 100644 index 0000000000..96d7553d4a --- /dev/null +++ b/openstackclient/volume/v3/volume_type.py @@ -0,0 +1,967 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Volume v3 Type action implementations""" + +import functools +import logging + +from cinderclient import api_versions +from cliff import columns as cliff_columns +from osc_lib.cli import format_columns +from osc_lib.cli import parseractions +from osc_lib.command import command +from osc_lib import exceptions +from osc_lib import utils + +from openstackclient.i18n import _ +from openstackclient.identity import common as identity_common + + +LOG = logging.getLogger(__name__) + + +class EncryptionInfoColumn(cliff_columns.FormattableColumn): + """Formattable column for encryption info column. + + Unlike the parent FormattableColumn class, the initializer of the + class takes encryption_data as the second argument. + osc_lib.utils.get_item_properties instantiate cliff FormattableColumn + object with a single parameter "column value", so you need to pass + a partially initialized class like + ``functools.partial(EncryptionInfoColumn encryption_data)``. + """ + + def __init__(self, value, encryption_data=None): + super().__init__(value) + self._encryption_data = encryption_data or {} + + def _get_encryption_info(self): + type_id = self._value + return self._encryption_data.get(type_id) + + def human_readable(self): + encryption_info = self._get_encryption_info() + if encryption_info: + return utils.format_dict(encryption_info) + else: + return '-' + + def machine_readable(self): + return self._get_encryption_info() + + +def _create_encryption_type(volume_client, volume_type, parsed_args): + if not parsed_args.encryption_provider: + msg = _( + "'--encryption-provider' should be specified while " + "creating a new encryption type" + ) + raise exceptions.CommandError(msg) + # set the default of control location while creating + control_location = 'front-end' + if parsed_args.encryption_control_location: + control_location = parsed_args.encryption_control_location + body = { + 'provider': parsed_args.encryption_provider, + 'cipher': parsed_args.encryption_cipher, + 'key_size': parsed_args.encryption_key_size, + 'control_location': control_location, + } + encryption = volume_client.volume_encryption_types.create( + volume_type, body + ) + return encryption + + +def _set_encryption_type(volume_client, volume_type, parsed_args): + # update the existing encryption type + body = {} + for attr in ['provider', 'cipher', 'key_size', 'control_location']: + info = getattr(parsed_args, 'encryption_' + attr, None) + if info is not None: + body[attr] = info + try: + volume_client.volume_encryption_types.update(volume_type, body) + except Exception as e: + if type(e).__name__ == 'NotFound': + # create new encryption type + LOG.warning( + _( + "No existing encryption type found, creating " + "new encryption type for this volume type ..." + ) + ) + _create_encryption_type(volume_client, volume_type, parsed_args) + + +class CreateVolumeType(command.ShowOne): + _description = _("Create new volume type") + + def get_parser(self, prog_name): + parser = super().get_parser(prog_name) + parser.add_argument( + "name", + metavar="", + help=_("Volume type name"), + ) + parser.add_argument( + "--description", + metavar="", + help=_("Volume type description"), + ) + public_group = parser.add_mutually_exclusive_group() + public_group.add_argument( + "--public", + action="store_true", + dest="is_public", + default=None, + help=_("Volume type is accessible to the public"), + ) + public_group.add_argument( + "--private", + action="store_false", + dest="is_public", + default=None, + help=_("Volume type is not accessible to the public"), + ) + parser.add_argument( + '--property', + metavar='', + action=parseractions.KeyValueAction, + dest='properties', + help=_( + 'Set a property on this volume type ' + '(repeat option to set multiple properties)' + ), + ) + parser.add_argument( + '--multiattach', + action='store_true', + default=False, + help=_( + "Enable multi-attach for this volume type " + "(this is an alias for '--property multiattach= True') " + "(requires driver support)" + ), + ) + parser.add_argument( + '--cacheable', + action='store_true', + default=False, + help=_( + "Enable caching for this volume type " + "(this is an alias for '--property cacheable= True') " + "(requires driver support)" + ), + ) + parser.add_argument( + '--replicated', + action='store_true', + default=False, + help=_( + "Enabled replication for this volume type " + "(this is an alias for '--property replication_enabled= True') " # noqa: E501 + "(requires driver support)" + ), + ) + parser.add_argument( + '--availability-zone', + action='append', + dest='availability_zones', + help=_( + "Set an availability zone for this volume type " + "(this is an alias for '--property RESKEY:availability_zones:') " # noqa: E501 + "(repeat option to set multiple availability zones)" + ), + ) + parser.add_argument( + '--project', + metavar='', + help=_( + "Allow to access private type (name or ID) " + "(must be used with --private option)" + ), + ) + identity_common.add_project_domain_option_to_parser(parser) + # TODO(Huanxuan Ao): Add choices for each "--encryption-*" option. + parser.add_argument( + '--encryption-provider', + metavar='', + help=_( + 'Set the encryption provider format for ' + 'this volume type (e.g "luks" or "plain") (admin only) ' + '(this option is required when setting encryption type ' + 'of a volume; consider using other encryption options ' + 'such as: "--encryption-cipher", "--encryption-key-size" ' + 'and "--encryption-control-location")' + ), + ) + parser.add_argument( + '--encryption-cipher', + metavar='', + help=_( + 'Set the encryption algorithm or mode for this ' + 'volume type (e.g "aes-xts-plain64") (admin only)' + ), + ) + parser.add_argument( + '--encryption-key-size', + metavar='', + type=int, + help=_( + 'Set the size of the encryption key of this ' + 'volume type (e.g "128" or "256") (admin only)' + ), + ) + parser.add_argument( + '--encryption-control-location', + metavar='', + choices=['front-end', 'back-end'], + help=_( + 'Set the notional service where the encryption is ' + 'performed ("front-end" or "back-end") (admin only) ' + '(The default value for this option is "front-end" ' + 'when setting encryption type of a volume. Consider ' + 'using other encryption options such as: ' + '"--encryption-cipher", "--encryption-key-size" and ' + '"--encryption-provider")' + ), + ) + return parser + + def take_action(self, parsed_args): + identity_client = self.app.client_manager.identity + volume_client = self.app.client_manager.volume + + if parsed_args.project and parsed_args.is_public is not False: + msg = _("--project is only allowed with --private") + raise exceptions.CommandError(msg) + + kwargs = {} + + if parsed_args.is_public is not None: + kwargs['is_public'] = parsed_args.is_public + + volume_type = volume_client.volume_types.create( + parsed_args.name, + description=parsed_args.description, + **kwargs, + ) + volume_type._info.pop('extra_specs') + + if parsed_args.project: + try: + project_id = identity_common.find_project( + identity_client, + parsed_args.project, + parsed_args.project_domain, + ).id + volume_client.volume_type_access.add_project_access( + volume_type.id, project_id + ) + except Exception as e: + msg = _( + "Failed to add project %(project)s access to " + "type: %(e)s" + ) + LOG.error(msg % {'project': parsed_args.project, 'e': e}) + + properties = {} + if parsed_args.properties: + properties.update(parsed_args.properties) + if parsed_args.multiattach: + properties['multiattach'] = ' True' + if parsed_args.cacheable: + properties['cacheable'] = ' True' + if parsed_args.replicated: + properties['replication_enabled'] = ' True' + if parsed_args.availability_zones: + properties['RESKEY:availability_zones'] = ','.join( + parsed_args.availability_zones + ) + if properties: + result = volume_type.set_keys(properties) + volume_type._info.update( + {'properties': format_columns.DictColumn(result)} + ) + + if ( + parsed_args.encryption_provider + or parsed_args.encryption_cipher + or parsed_args.encryption_key_size + or parsed_args.encryption_control_location + ): + try: + # create new encryption + encryption = _create_encryption_type( + volume_client, volume_type, parsed_args + ) + except Exception as e: + LOG.error( + _( + "Failed to set encryption information for this " + "volume type: %s" + ), + e, + ) + # add encryption info in result + encryption._info.pop("volume_type_id", None) + volume_type._info.update( + {'encryption': format_columns.DictColumn(encryption._info)} + ) + + volume_type._info.pop("os-volume-type-access:is_public", None) + + return zip(*sorted(volume_type._info.items())) + + +class DeleteVolumeType(command.Command): + _description = _("Delete volume type(s)") + + def get_parser(self, prog_name): + parser = super().get_parser(prog_name) + parser.add_argument( + "volume_types", + metavar="", + nargs="+", + help=_("Volume type(s) to delete (name or ID)"), + ) + return parser + + def take_action(self, parsed_args): + volume_client = self.app.client_manager.volume + result = 0 + + for volume_type in parsed_args.volume_types: + try: + vol_type = utils.find_resource( + volume_client.volume_types, volume_type + ) + + volume_client.volume_types.delete(vol_type) + except Exception as e: + result += 1 + LOG.error( + _( + "Failed to delete volume type with " + "name or ID '%(volume_type)s': %(e)s" + ) + % {'volume_type': volume_type, 'e': e} + ) + + if result > 0: + total = len(parsed_args.volume_types) + msg = _( + "%(result)s of %(total)s volume types failed " "to delete." + ) % {'result': result, 'total': total} + raise exceptions.CommandError(msg) + + +class ListVolumeType(command.Lister): + _description = _("List volume types") + + def get_parser(self, prog_name): + parser = super().get_parser(prog_name) + parser.add_argument( + '--long', + action='store_true', + default=False, + help=_('List additional fields in output'), + ) + public_group = parser.add_mutually_exclusive_group() + public_group.add_argument( + "--default", + action='store_true', + default=False, + help=_('List the default volume type'), + ) + public_group.add_argument( + "--public", + action="store_true", + dest="is_public", + default=None, + help=_("List only public types"), + ) + public_group.add_argument( + "--private", + action="store_false", + dest="is_public", + default=None, + help=_("List only private types (admin only)"), + ) + parser.add_argument( + "--encryption-type", + action="store_true", + help=_( + "Display encryption information for each volume type " + "(admin only)" + ), + ) + parser.add_argument( + '--property', + metavar='', + action=parseractions.KeyValueAction, + dest='properties', + help=_( + 'Filter by a property on the volume types ' + '(repeat option to filter by multiple properties) ' + '(admin only except for user-visible extra specs) ' + '(supported by --os-volume-api-version 3.52 or above)' + ), + ) + parser.add_argument( + '--multiattach', + action='store_true', + default=False, + help=_( + "List only volume types with multi-attach enabled " + "(this is an alias for '--property multiattach= True') " + "(supported by --os-volume-api-version 3.52 or above)" + ), + ) + parser.add_argument( + '--cacheable', + action='store_true', + default=False, + help=_( + "List only volume types with caching enabled " + "(this is an alias for '--property cacheable= True') " + "(admin only) " + "(supported by --os-volume-api-version 3.52 or above)" + ), + ) + parser.add_argument( + '--replicated', + action='store_true', + default=False, + help=_( + "List only volume types with replication enabled " + "(this is an alias for '--property replication_enabled= True') " # noqa: E501 + "(supported by --os-volume-api-version 3.52 or above)" + ), + ) + parser.add_argument( + '--availability-zone', + action='append', + dest='availability_zones', + help=_( + "List only volume types with this availability configured " + "(this is an alias for '--property RESKEY:availability_zones:') " # noqa: E501 + "(repeat option to filter on multiple availability zones)" + ), + ) + return parser + + def take_action(self, parsed_args): + volume_client = self.app.client_manager.volume + + if parsed_args.long: + columns = [ + 'ID', + 'Name', + 'Is Public', + 'Description', + 'Extra Specs', + ] + column_headers = [ + 'ID', + 'Name', + 'Is Public', + 'Description', + 'Properties', + ] + else: + columns = ['ID', 'Name', 'Is Public'] + column_headers = ['ID', 'Name', 'Is Public'] + + if parsed_args.default: + data = [volume_client.volume_types.default()] + else: + search_opts = {} + properties = {} + if parsed_args.properties: + properties.update(parsed_args.properties) + if parsed_args.multiattach: + properties['multiattach'] = ' True' + if parsed_args.cacheable: + properties['cacheable'] = ' True' + if parsed_args.replicated: + properties['replication_enabled'] = ' True' + if parsed_args.availability_zones: + properties['RESKEY:availability_zones'] = ','.join( + parsed_args.availability_zones + ) + if properties: + if volume_client.api_version < api_versions.APIVersion('3.52'): + msg = _( + "--os-volume-api-version 3.52 or greater is required " + "to use the '--property' option or any of the alias " + "options" + ) + raise exceptions.CommandError(msg) + + search_opts['extra_specs'] = properties + + data = volume_client.volume_types.list( + search_opts=search_opts, + is_public=parsed_args.is_public, + ) + + formatters = {'Extra Specs': format_columns.DictColumn} + + if parsed_args.encryption_type: + encryption = {} + for d in volume_client.volume_encryption_types.list(): + volume_type_id = d._info['volume_type_id'] + # remove some redundant information + del_key = [ + 'deleted', + 'created_at', + 'updated_at', + 'deleted_at', + 'volume_type_id', + ] + for key in del_key: + d._info.pop(key, None) + # save the encryption information with their volume type ID + encryption[volume_type_id] = d._info + # We need to get volume type ID, then show encryption + # information according to the ID, so use "id" to keep + # difference to the real "ID" column. + columns += ['id'] + column_headers += ['Encryption'] + + _EncryptionInfoColumn = functools.partial( + EncryptionInfoColumn, encryption_data=encryption + ) + formatters['id'] = _EncryptionInfoColumn + + return ( + column_headers, + ( + utils.get_item_properties( + s, + columns, + formatters=formatters, + ) + for s in data + ), + ) + + +class SetVolumeType(command.Command): + _description = _("Set volume type properties") + + def get_parser(self, prog_name): + parser = super().get_parser(prog_name) + parser.add_argument( + 'volume_type', + metavar='', + help=_('Volume type to modify (name or ID)'), + ) + parser.add_argument( + '--name', + metavar='', + help=_('Set volume type name'), + ) + parser.add_argument( + '--description', + metavar='', + help=_('Set volume type description'), + ) + parser.add_argument( + '--property', + metavar='', + action=parseractions.KeyValueAction, + dest='properties', + help=_( + 'Set a property on this volume type ' + '(repeat option to set multiple properties)' + ), + ) + parser.add_argument( + '--multiattach', + action='store_true', + default=False, + help=_( + "Enable multi-attach for this volume type " + "(this is an alias for '--property multiattach= True') " + "(requires driver support)" + ), + ) + parser.add_argument( + '--cacheable', + action='store_true', + default=False, + help=_( + "Enable caching for this volume type " + "(this is an alias for '--property cacheable= True') " + "(requires driver support)" + ), + ) + parser.add_argument( + '--replicated', + action='store_true', + default=False, + help=_( + "Enabled replication for this volume type " + "(this is an alias for '--property replication_enabled= True') " # noqa: E501 + "(requires driver support)" + ), + ) + parser.add_argument( + '--availability-zone', + action='append', + dest='availability_zones', + help=_( + "Set an availability zone for this volume type " + "(this is an alias for '--property RESKEY:availability_zones:') " # noqa: E501 + "(repeat option to set multiple availability zones)" + ), + ) + parser.add_argument( + '--project', + metavar='', + help=_( + 'Set volume type access to project (name or ID) ' + '(admin only)' + ), + ) + public_group = parser.add_mutually_exclusive_group() + public_group.add_argument( + '--public', + action='store_true', + dest='is_public', + default=None, + help=_('Volume type is accessible to the public'), + ) + public_group.add_argument( + '--private', + action='store_false', + dest='is_public', + default=None, + help=_("Volume type is not accessible to the public"), + ) + identity_common.add_project_domain_option_to_parser(parser) + # TODO(Huanxuan Ao): Add choices for each "--encryption-*" option. + parser.add_argument( + '--encryption-provider', + metavar='', + help=_( + 'Set the encryption provider format for ' + 'this volume type (e.g "luks" or "plain") (admin only) ' + '(This option is required when setting encryption type ' + 'of a volume for the first time. Consider using other ' + 'encryption options such as: "--encryption-cipher", ' + '"--encryption-key-size" and ' + '"--encryption-control-location")' + ), + ) + parser.add_argument( + '--encryption-cipher', + metavar='', + help=_( + 'Set the encryption algorithm or mode for this ' + 'volume type (e.g "aes-xts-plain64") (admin only)' + ), + ) + parser.add_argument( + '--encryption-key-size', + metavar='', + type=int, + help=_( + 'Set the size of the encryption key of this ' + 'volume type (e.g "128" or "256") (admin only)' + ), + ) + parser.add_argument( + '--encryption-control-location', + metavar='', + choices=['front-end', 'back-end'], + help=_( + 'Set the notional service where the encryption is ' + 'performed ("front-end" or "back-end") (admin only) ' + '(The default value for this option is "front-end" ' + 'when setting encryption type of a volume for the ' + 'first time. Consider using other encryption options ' + 'such as: "--encryption-cipher", "--encryption-key-size" ' + 'and "--encryption-provider")' + ), + ) + return parser + + def take_action(self, parsed_args): + volume_client = self.app.client_manager.volume + identity_client = self.app.client_manager.identity + + volume_type = utils.find_resource( + volume_client.volume_types, + parsed_args.volume_type, + ) + + result = 0 + kwargs = {} + + if parsed_args.name: + kwargs['name'] = parsed_args.name + + if parsed_args.description: + kwargs['description'] = parsed_args.description + + if parsed_args.is_public is not None: + kwargs['is_public'] = parsed_args.is_public + + if kwargs: + try: + volume_client.volume_types.update(volume_type.id, **kwargs) + except Exception as e: + LOG.error( + _( + "Failed to update volume type name or" + " description: %s" + ), + e, + ) + result += 1 + + properties = {} + + properties = {} + if parsed_args.properties: + properties.update(parsed_args.properties) + if parsed_args.multiattach: + properties['multiattach'] = ' True' + if parsed_args.cacheable: + properties['cacheable'] = ' True' + if parsed_args.replicated: + properties['replication_enabled'] = ' True' + if parsed_args.availability_zones: + properties['RESKEY:availability_zones'] = ','.join( + parsed_args.availability_zones + ) + if properties: + try: + volume_type.set_keys(properties) + except Exception as e: + LOG.error(_("Failed to set volume type properties: %s"), e) + result += 1 + + if parsed_args.project: + project_info = None + try: + project_info = identity_common.find_project( + identity_client, + parsed_args.project, + parsed_args.project_domain, + ) + + volume_client.volume_type_access.add_project_access( + volume_type.id, project_info.id + ) + except Exception as e: + LOG.error( + _("Failed to set volume type access to " "project: %s"), e + ) + result += 1 + + if ( + parsed_args.encryption_provider + or parsed_args.encryption_cipher + or parsed_args.encryption_key_size + or parsed_args.encryption_control_location + ): + try: + _set_encryption_type(volume_client, volume_type, parsed_args) + except Exception as e: + LOG.error( + _( + "Failed to set encryption information for this " + "volume type: %s" + ), + e, + ) + result += 1 + + if result > 0: + raise exceptions.CommandError( + _("Command Failed: One or more of" " the operations failed") + ) + + +class ShowVolumeType(command.ShowOne): + _description = _("Display volume type details") + + def get_parser(self, prog_name): + parser = super().get_parser(prog_name) + parser.add_argument( + "volume_type", + metavar="", + help=_("Volume type to display (name or ID)"), + ) + parser.add_argument( + "--encryption-type", + action="store_true", + help=_( + "Display encryption information of this volume type " + "(admin only)" + ), + ) + return parser + + def take_action(self, parsed_args): + volume_client = self.app.client_manager.volume + volume_type = utils.find_resource( + volume_client.volume_types, parsed_args.volume_type + ) + properties = format_columns.DictColumn( + volume_type._info.pop('extra_specs', {}) + ) + volume_type._info.update({'properties': properties}) + access_project_ids = None + if not volume_type.is_public: + try: + volume_type_access = volume_client.volume_type_access.list( + volume_type.id + ) + project_ids = [ + utils.get_field(item, 'project_id') + for item in volume_type_access + ] + # TODO(Rui Chen): This format list case can be removed after + # patch https://review.opendev.org/#/c/330223/ merged. + access_project_ids = format_columns.ListColumn(project_ids) + except Exception as e: + msg = _( + 'Failed to get access project list for volume type ' + '%(type)s: %(e)s' + ) + LOG.error(msg % {'type': volume_type.id, 'e': e}) + volume_type._info.update({'access_project_ids': access_project_ids}) + if parsed_args.encryption_type: + # show encryption type information for this volume type + try: + encryption = volume_client.volume_encryption_types.get( + volume_type.id + ) + encryption._info.pop("volume_type_id", None) + volume_type._info.update( + {'encryption': format_columns.DictColumn(encryption._info)} + ) + except Exception as e: + LOG.error( + _( + "Failed to display the encryption information " + "of this volume type: %s" + ), + e, + ) + volume_type._info.pop("os-volume-type-access:is_public", None) + return zip(*sorted(volume_type._info.items())) + + +class UnsetVolumeType(command.Command): + _description = _("Unset volume type properties") + + def get_parser(self, prog_name): + parser = super().get_parser(prog_name) + parser.add_argument( + 'volume_type', + metavar='', + help=_('Volume type to modify (name or ID)'), + ) + parser.add_argument( + '--property', + metavar='', + action='append', + dest='properties', + help=_( + 'Remove a property from this volume type ' + '(repeat option to remove multiple properties)' + ), + ) + parser.add_argument( + '--project', + metavar='', + help=_( + 'Removes volume type access to project (name or ID) ' + '(admin only)' + ), + ) + identity_common.add_project_domain_option_to_parser(parser) + parser.add_argument( + "--encryption-type", + action="store_true", + help=_( + "Remove the encryption type for this volume type " + "(admin only)" + ), + ) + return parser + + def take_action(self, parsed_args): + volume_client = self.app.client_manager.volume + identity_client = self.app.client_manager.identity + + volume_type = utils.find_resource( + volume_client.volume_types, + parsed_args.volume_type, + ) + + result = 0 + if parsed_args.properties: + try: + volume_type.unset_keys(parsed_args.properties) + except Exception as e: + LOG.error(_("Failed to unset volume type properties: %s"), e) + result += 1 + + if parsed_args.project: + project_info = None + try: + project_info = identity_common.find_project( + identity_client, + parsed_args.project, + parsed_args.project_domain, + ) + + volume_client.volume_type_access.remove_project_access( + volume_type.id, project_info.id + ) + except Exception as e: + LOG.error( + _( + "Failed to remove volume type access from " + "project: %s" + ), + e, + ) + result += 1 + if parsed_args.encryption_type: + try: + volume_client.volume_encryption_types.delete(volume_type) + except Exception as e: + LOG.error( + _( + "Failed to remove the encryption type for this " + "volume type: %s" + ), + e, + ) + result += 1 + + if result > 0: + raise exceptions.CommandError( + _("Command Failed: One or more of" " the operations failed") + ) diff --git a/setup.cfg b/setup.cfg index b6f8188dff..04e4c9b9e9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -831,12 +831,12 @@ openstack.volume.v3 = volume_snapshot_show = openstackclient.volume.v2.volume_snapshot:ShowVolumeSnapshot volume_snapshot_unset = openstackclient.volume.v2.volume_snapshot:UnsetVolumeSnapshot - volume_type_create = openstackclient.volume.v2.volume_type:CreateVolumeType - volume_type_delete = openstackclient.volume.v2.volume_type:DeleteVolumeType - volume_type_list = openstackclient.volume.v2.volume_type:ListVolumeType - volume_type_set = openstackclient.volume.v2.volume_type:SetVolumeType - volume_type_show = openstackclient.volume.v2.volume_type:ShowVolumeType - volume_type_unset = openstackclient.volume.v2.volume_type:UnsetVolumeType + volume_type_create = openstackclient.volume.v3.volume_type:CreateVolumeType + volume_type_delete = openstackclient.volume.v3.volume_type:DeleteVolumeType + volume_type_list = openstackclient.volume.v3.volume_type:ListVolumeType + volume_type_set = openstackclient.volume.v3.volume_type:SetVolumeType + volume_type_show = openstackclient.volume.v3.volume_type:ShowVolumeType + volume_type_unset = openstackclient.volume.v3.volume_type:UnsetVolumeType volume_qos_associate = openstackclient.volume.v2.qos_specs:AssociateQos volume_qos_create = openstackclient.volume.v2.qos_specs:CreateQos