Stephen Finucane 2454636386 compute: Generate SSH keypairs ourselves
Starting with the 2.92 microversion, nova will no longer generate SSH
keys. Avoid breaking users by generating keypairs ourselves using the
cryptography library, which was already an indirect dependency through
openstacksdk.

Change-Id: I3ad2732f70854ab72da0947f00847351dda23944
Implements: blueprint keypair-generation-removal
2023-05-02 12:18:52 +01:00

781 lines
25 KiB
Python

# Copyright 2016 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
from unittest import mock
from unittest.mock import call
import uuid
from novaclient import api_versions
from openstack import utils as sdk_utils
from osc_lib import exceptions
from openstackclient.compute.v2 import keypair
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes
from openstackclient.tests.unit import utils as tests_utils
class TestKeypair(compute_fakes.TestComputev2):
def setUp(self):
super(TestKeypair, self).setUp()
# Initialize the user mock
self.users_mock = self.app.client_manager.identity.users
self.users_mock.reset_mock()
self.users_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.USER),
loaded=True,
)
self.app.client_manager.sdk_connection = mock.Mock()
self.app.client_manager.sdk_connection.compute = mock.Mock()
self.sdk_client = self.app.client_manager.sdk_connection.compute
self.sdk_client.keypairs = mock.Mock()
self.sdk_client.create_keypair = mock.Mock()
self.sdk_client.delete_keypair = mock.Mock()
self.sdk_client.find_keypair = mock.Mock()
class TestKeypairCreate(TestKeypair):
def setUp(self):
super().setUp()
self.keypair = compute_fakes.FakeKeypair.create_one_keypair()
self.columns = (
'fingerprint',
'name',
'type',
'user_id'
)
self.data = (
self.keypair.fingerprint,
self.keypair.name,
self.keypair.type,
self.keypair.user_id
)
# Get the command object to test
self.cmd = keypair.CreateKeypair(self.app, None)
self.sdk_client.create_keypair.return_value = self.keypair
@mock.patch.object(
keypair, '_generate_keypair',
return_value=keypair.Keypair('private', 'public'),
)
def test_key_pair_create_no_options(self, mock_generate):
arglist = [
self.keypair.name,
]
verifylist = [
('name', self.keypair.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.sdk_client.create_keypair.assert_called_with(
name=self.keypair.name,
public_key=mock_generate.return_value.public_key,
)
self.assertEqual({}, columns)
self.assertEqual({}, data)
def test_keypair_create_public_key(self):
self.data = (
self.keypair.fingerprint,
self.keypair.name,
self.keypair.type,
self.keypair.user_id
)
arglist = [
'--public-key', self.keypair.public_key,
self.keypair.name,
]
verifylist = [
('public_key', self.keypair.public_key),
('name', self.keypair.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('io.open') as mock_open:
mock_open.return_value = mock.MagicMock()
m_file = mock_open.return_value.__enter__.return_value
m_file.read.return_value = 'dummy'
columns, data = self.cmd.take_action(parsed_args)
self.sdk_client.create_keypair.assert_called_with(
name=self.keypair.name,
public_key=self.keypair.public_key,
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
@mock.patch.object(
keypair, '_generate_keypair',
return_value=keypair.Keypair('private', 'public'),
)
def test_keypair_create_private_key(self, mock_generate):
tmp_pk_file = '/tmp/kp-file-' + uuid.uuid4().hex
arglist = [
'--private-key', tmp_pk_file,
self.keypair.name,
]
verifylist = [
('private_key', tmp_pk_file),
('name', self.keypair.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('io.open') as mock_open:
mock_open.return_value = mock.MagicMock()
m_file = mock_open.return_value.__enter__.return_value
columns, data = self.cmd.take_action(parsed_args)
self.sdk_client.create_keypair.assert_called_with(
name=self.keypair.name,
public_key=mock_generate.return_value.public_key,
)
mock_open.assert_called_once_with(tmp_pk_file, 'w+')
m_file.write.assert_called_once_with(
mock_generate.return_value.private_key,
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
def test_keypair_create_with_key_type(self, sm_mock):
for key_type in ['x509', 'ssh']:
self.sdk_client.create_keypair.return_value = self.keypair
self.data = (
self.keypair.fingerprint,
self.keypair.name,
self.keypair.type,
self.keypair.user_id,
)
arglist = [
'--public-key', self.keypair.public_key,
self.keypair.name,
'--type', key_type,
]
verifylist = [
('public_key', self.keypair.public_key),
('name', self.keypair.name),
('type', key_type),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('io.open') as mock_open:
mock_open.return_value = mock.MagicMock()
m_file = mock_open.return_value.__enter__.return_value
m_file.read.return_value = 'dummy'
columns, data = self.cmd.take_action(parsed_args)
self.sdk_client.create_keypair.assert_called_with(
name=self.keypair.name,
public_key=self.keypair.public_key,
key_type=key_type,
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=False)
def test_keypair_create_with_key_type_pre_v22(self, sm_mock):
for key_type in ['x509', 'ssh']:
arglist = [
'--public-key', self.keypair.public_key,
self.keypair.name,
'--type', 'ssh',
]
verifylist = [
('public_key', self.keypair.public_key),
('name', self.keypair.name),
('type', 'ssh'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('io.open') as mock_open:
mock_open.return_value = mock.MagicMock()
m_file = mock_open.return_value.__enter__.return_value
m_file.read.return_value = 'dummy'
ex = self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
self.assertIn(
'--os-compute-api-version 2.2 or greater is required',
str(ex))
@mock.patch.object(
keypair, '_generate_keypair',
return_value=keypair.Keypair('private', 'public'),
)
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
def test_key_pair_create_with_user(self, sm_mock, mock_generate):
arglist = [
'--user', identity_fakes.user_name,
self.keypair.name,
]
verifylist = [
('user', identity_fakes.user_name),
('name', self.keypair.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.sdk_client.create_keypair.assert_called_with(
name=self.keypair.name,
user_id=identity_fakes.user_id,
public_key=mock_generate.return_value.public_key,
)
self.assertEqual({}, columns)
self.assertEqual({}, data)
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=False)
def test_key_pair_create_with_user_pre_v210(self, sm_mock):
arglist = [
'--user', identity_fakes.user_name,
self.keypair.name,
]
verifylist = [
('user', identity_fakes.user_name),
('name', self.keypair.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
ex = self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
self.assertIn(
'--os-compute-api-version 2.10 or greater is required', str(ex))
class TestKeypairDelete(TestKeypair):
keypairs = compute_fakes.FakeKeypair.create_keypairs(count=2)
def setUp(self):
super(TestKeypairDelete, self).setUp()
self.cmd = keypair.DeleteKeypair(self.app, None)
def test_keypair_delete(self):
arglist = [
self.keypairs[0].name
]
verifylist = [
('name', [self.keypairs[0].name]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
ret = self.cmd.take_action(parsed_args)
self.assertIsNone(ret)
self.sdk_client.delete_keypair.assert_called_with(
self.keypairs[0].name, ignore_missing=False)
def test_delete_multiple_keypairs(self):
arglist = []
for k in self.keypairs:
arglist.append(k.name)
verifylist = [
('name', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for k in self.keypairs:
calls.append(call(k.name, ignore_missing=False))
self.sdk_client.delete_keypair.assert_has_calls(calls)
self.assertIsNone(result)
def test_delete_multiple_keypairs_with_exception(self):
arglist = [
self.keypairs[0].name,
'unexist_keypair',
]
verifylist = [
('name', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.sdk_client.delete_keypair.side_effect = [
None, exceptions.CommandError]
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('1 of 2 keys failed to delete.', str(e))
calls = []
for k in arglist:
calls.append(call(k, ignore_missing=False))
self.sdk_client.delete_keypair.assert_has_calls(calls)
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
def test_keypair_delete_with_user(self, sm_mock):
arglist = [
'--user', identity_fakes.user_name,
self.keypairs[0].name
]
verifylist = [
('user', identity_fakes.user_name),
('name', [self.keypairs[0].name]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
ret = self.cmd.take_action(parsed_args)
self.assertIsNone(ret)
self.sdk_client.delete_keypair.assert_called_with(
self.keypairs[0].name,
user_id=identity_fakes.user_id,
ignore_missing=False
)
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=False)
def test_keypair_delete_with_user_pre_v210(self, sm_mock):
self.app.client_manager.compute.api_version = \
api_versions.APIVersion('2.9')
arglist = [
'--user', identity_fakes.user_name,
self.keypairs[0].name
]
verifylist = [
('user', identity_fakes.user_name),
('name', [self.keypairs[0].name]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
ex = self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
self.assertIn(
'--os-compute-api-version 2.10 or greater is required', str(ex))
class TestKeypairList(TestKeypair):
# Return value of self.sdk_client.keypairs().
keypairs = compute_fakes.FakeKeypair.create_keypairs(count=1)
def setUp(self):
super(TestKeypairList, self).setUp()
self.sdk_client.keypairs.return_value = self.keypairs
# Get the command object to test
self.cmd = keypair.ListKeypair(self.app, None)
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=False)
def test_keypair_list_no_options(self, sm_mock):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# In base command class Lister in cliff, abstract method take_action()
# returns a tuple containing the column names and an iterable
# containing the data to be listed.
columns, data = self.cmd.take_action(parsed_args)
# Set expected values
self.sdk_client.keypairs.assert_called_with()
self.assertEqual(('Name', 'Fingerprint'), columns)
self.assertEqual(
((self.keypairs[0].name, self.keypairs[0].fingerprint), ),
tuple(data)
)
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
def test_keypair_list_v22(self, sm_mock):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# In base command class Lister in cliff, abstract method take_action()
# returns a tuple containing the column names and an iterable
# containing the data to be listed.
columns, data = self.cmd.take_action(parsed_args)
# Set expected values
self.sdk_client.keypairs.assert_called_with()
self.assertEqual(('Name', 'Fingerprint', 'Type'), columns)
self.assertEqual(
((
self.keypairs[0].name,
self.keypairs[0].fingerprint,
self.keypairs[0].type,
), ),
tuple(data)
)
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
def test_keypair_list_with_user(self, sm_mock):
users_mock = self.app.client_manager.identity.users
users_mock.reset_mock()
users_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.USER),
loaded=True,
)
arglist = [
'--user', identity_fakes.user_name,
]
verifylist = [
('user', identity_fakes.user_name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
users_mock.get.assert_called_with(identity_fakes.user_name)
self.sdk_client.keypairs.assert_called_with(
user_id=identity_fakes.user_id,
)
self.assertEqual(('Name', 'Fingerprint', 'Type'), columns)
self.assertEqual(
((
self.keypairs[0].name,
self.keypairs[0].fingerprint,
self.keypairs[0].type,
), ),
tuple(data)
)
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=False)
def test_keypair_list_with_user_pre_v210(self, sm_mock):
arglist = [
'--user', identity_fakes.user_name,
]
verifylist = [
('user', identity_fakes.user_name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
ex = self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
self.assertIn(
'--os-compute-api-version 2.10 or greater is required', str(ex))
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
def test_keypair_list_with_project(self, sm_mock):
projects_mock = self.app.client_manager.identity.tenants
projects_mock.reset_mock()
projects_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.PROJECT),
loaded=True,
)
users_mock = self.app.client_manager.identity.users
users_mock.reset_mock()
users_mock.list.return_value = [
fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.USER),
loaded=True,
),
]
arglist = ['--project', identity_fakes.project_name]
verifylist = [('project', identity_fakes.project_name)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
projects_mock.get.assert_called_with(identity_fakes.project_name)
users_mock.list.assert_called_with(tenant_id=identity_fakes.project_id)
self.sdk_client.keypairs.assert_called_with(
user_id=identity_fakes.user_id,
)
self.assertEqual(('Name', 'Fingerprint', 'Type'), columns)
self.assertEqual(
((
self.keypairs[0].name,
self.keypairs[0].fingerprint,
self.keypairs[0].type,
), ),
tuple(data)
)
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=False)
def test_keypair_list_with_project_pre_v210(self, sm_mock):
arglist = ['--project', identity_fakes.project_name]
verifylist = [('project', identity_fakes.project_name)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
ex = self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
self.assertIn(
'--os-compute-api-version 2.10 or greater is required', str(ex))
def test_keypair_list_conflicting_user_options(self):
arglist = [
'--user', identity_fakes.user_name,
'--project', identity_fakes.project_name,
]
self.assertRaises(
tests_utils.ParserException,
self.check_parser, self.cmd, arglist, None)
@mock.patch.object(
sdk_utils, 'supports_microversion', new=mock.Mock(return_value=True))
def test_keypair_list_with_limit(self):
arglist = [
'--limit', '1',
]
verifylist = [
('limit', 1),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
self.sdk_client.keypairs.assert_called_with(limit=1)
@mock.patch.object(
sdk_utils, 'supports_microversion', new=mock.Mock(return_value=False))
def test_keypair_list_with_limit_pre_v235(self):
arglist = [
'--limit', '1',
]
verifylist = [
('limit', 1),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
ex = self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
self.assertIn(
'--os-compute-api-version 2.35 or greater is required', str(ex))
@mock.patch.object(
sdk_utils, 'supports_microversion', new=mock.Mock(return_value=True))
def test_keypair_list_with_marker(self):
arglist = [
'--marker', 'test_kp',
]
verifylist = [
('marker', 'test_kp'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
self.sdk_client.keypairs.assert_called_with(marker='test_kp')
@mock.patch.object(
sdk_utils, 'supports_microversion', new=mock.Mock(return_value=False))
def test_keypair_list_with_marker_pre_v235(self):
arglist = [
'--marker', 'test_kp',
]
verifylist = [
('marker', 'test_kp'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
ex = self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
self.assertIn(
'--os-compute-api-version 2.35 or greater is required', str(ex))
class TestKeypairShow(TestKeypair):
keypair = compute_fakes.FakeKeypair.create_one_keypair()
def setUp(self):
super(TestKeypairShow, self).setUp()
self.sdk_client.find_keypair.return_value = self.keypair
self.cmd = keypair.ShowKeypair(self.app, None)
self.columns = (
"fingerprint",
"name",
"type",
"user_id"
)
self.data = (
self.keypair.fingerprint,
self.keypair.name,
self.keypair.type,
self.keypair.user_id
)
def test_keypair_show_no_options(self):
arglist = []
verifylist = []
# Missing required args should boil here
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_keypair_show(self):
self.sdk_client.find_keypair.return_value = self.keypair
self.data = (
self.keypair.fingerprint,
self.keypair.name,
self.keypair.type,
self.keypair.user_id
)
arglist = [
self.keypair.name
]
verifylist = [
('name', self.keypair.name)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.sdk_client.find_keypair.assert_called_with(
self.keypair.name,
ignore_missing=False
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
def test_keypair_show_public(self):
arglist = [
'--public-key',
self.keypair.name
]
verifylist = [
('public_key', True),
('name', self.keypair.name)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.assertEqual({}, columns)
self.assertEqual({}, data)
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
def test_keypair_show_with_user(self, sm_mock):
self.sdk_client.find_keypair.return_value = self.keypair
self.data = (
self.keypair.fingerprint,
self.keypair.name,
self.keypair.type,
self.keypair.user_id
)
arglist = [
'--user', identity_fakes.user_name,
self.keypair.name,
]
verifylist = [
('user', identity_fakes.user_name),
('name', self.keypair.name)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.users_mock.get.assert_called_with(identity_fakes.user_name)
self.sdk_client.find_keypair.assert_called_with(
self.keypair.name,
ignore_missing=False,
user_id=identity_fakes.user_id
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
@mock.patch.object(sdk_utils, 'supports_microversion', return_value=False)
def test_keypair_show_with_user_pre_v210(self, sm_mock):
arglist = [
'--user', identity_fakes.user_name,
self.keypair.name,
]
verifylist = [
('user', identity_fakes.user_name),
('name', self.keypair.name)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
ex = self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
self.assertIn(
'--os-compute-api-version 2.10 or greater is required', str(ex))