cloudbase-init/cloudbaseinit/tests/plugins/common/test_setuserpassword.py
Adrian Vladu 8dd0d9dbc3 Added metadata service encryption key retrieval
The encryption public key, if existent, will be used to encrypt the
user password to be sent to the metadata service.
By default, in case of OpenStack, the first public key set by the user
will be used to encrypt the user password.

This special method is needed by metadata services implementations
which will not rely on the same behaviour, as the the encryption key
can be set at another metadata endpoint.

Partially implements: blueprint packet-metadata-suport

Co-Authored-By: Paula Madalina Crismaru <pcrismaru@cloudbasesolutions.com>

Change-Id: I8bfde766d74dcd09e659d09a33a12f6f50b36ed2
2019-10-23 12:37:04 +03:00

322 lines
14 KiB
Python

# Copyright 2013 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import constant
from cloudbaseinit.plugins.common import constants
from cloudbaseinit.plugins.common import setuserpassword
from cloudbaseinit.tests.metadata import fake_json_response
from cloudbaseinit.tests import testutils
CONF = cloudbaseinit_conf.CONF
class SetUserPasswordPluginTests(unittest.TestCase):
def setUp(self):
self._setpassword_plugin = setuserpassword.SetUserPasswordPlugin()
self.fake_data = fake_json_response.get_fake_metadata_json(
'2013-04-04')
@mock.patch('base64.b64encode')
@mock.patch('cloudbaseinit.utils.crypt.CryptManager'
'.load_ssh_rsa_public_key')
def test_encrypt_password(self, mock_load_ssh_key, mock_b64encode):
mock_rsa = mock.MagicMock()
fake_ssh_pub_key = 'fake key'
fake_password = 'fake password'
mock_load_ssh_key.return_value = mock_rsa
mock_rsa.__enter__().public_encrypt.return_value = 'public encrypted'
mock_b64encode.return_value = 'encrypted password'
response = self._setpassword_plugin._encrypt_password(
fake_ssh_pub_key, fake_password)
mock_load_ssh_key.assert_called_with(fake_ssh_pub_key)
mock_rsa.__enter__().public_encrypt.assert_called_with(
b'fake password')
mock_b64encode.assert_called_with('public encrypted')
self.assertEqual('encrypted password', response)
def _test_get_password(self, inject_password):
shared_data = {}
expected_password = 'Passw0rd'
if not inject_password:
# The password should be the one created by
# CreateUser plugin.
shared_data[constants.SHARED_DATA_PASSWORD] = (
mock.sentinel.create_user_password)
mock_service = mock.MagicMock()
mock_service.get_admin_password.return_value = expected_password
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
'setuserpassword') as snatcher:
with testutils.ConfPatcher('inject_user_password',
inject_password):
response = self._setpassword_plugin._get_password(
mock_service, shared_data)
expected_logging = [
'Using admin_pass metadata user password. '
'Consider changing it as soon as possible'
]
if inject_password:
self.assertEqual(expected_logging, snatcher.output)
mock_service.get_admin_password.assert_called_with()
expected_password = (expected_password, True)
else:
self.assertFalse(mock_service.get_admin_password.called)
expected_password = (mock.sentinel.create_user_password, False)
self.assertEqual(expected_password, response)
def test_get_password_inject_true(self):
self._test_get_password(inject_password=True)
def test_get_password_inject_false(self):
self._test_get_password(inject_password=False)
@mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
'SetUserPasswordPlugin._encrypt_password')
def _test_set_metadata_password(self, mock_encrypt_password,
ssh_pub_key):
fake_passw0rd = 'fake Passw0rd'
mock_service = mock.MagicMock()
mock_encrypt_password.return_value = 'encrypted password'
mock_service.post_password.return_value = 'value'
mock_service.can_post_password = True
mock_service.get_user_pwd_encryption_key.return_value = ssh_pub_key
mock_service.is_password_set = False
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
'setuserpassword') as snatcher:
response = self._setpassword_plugin._set_metadata_password(
fake_passw0rd, mock_service)
expected_logging = []
if ssh_pub_key is None:
expected_logging = [
'No SSH public key available for password encryption'
]
self.assertTrue(response)
else:
mock_encrypt_password.assert_called_once_with(ssh_pub_key,
fake_passw0rd)
mock_service.post_password.assert_called_with(
'encrypted password')
self.assertEqual('value', response)
self.assertEqual(expected_logging, snatcher.output)
def test_set_metadata_password_with_ssh_key(self):
fake_key = 'fake key'
self._test_set_metadata_password(ssh_pub_key=fake_key)
def test_set_metadata_password_no_ssh_key(self):
self._test_set_metadata_password(ssh_pub_key=None)
def test_set_metadata_password_already_set(self):
mock_service = mock.MagicMock()
mock_service.is_password_set = True
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
'setuserpassword') as snatcher:
response = self._setpassword_plugin._set_metadata_password(
mock.sentinel.fake_password, mock_service)
self.assertTrue(response)
expected_logging = ['User\'s password already set in the '
'instance metadata and it cannot be '
'updated in the instance metadata']
self.assertEqual(expected_logging, snatcher.output)
@mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
'SetUserPasswordPlugin._change_logon_behaviour')
@mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
'SetUserPasswordPlugin._get_password')
def _test_set_password(self, mock_get_password,
mock_change_logon_behaviour,
password, can_update_password,
is_password_changed, max_password_length=20,
injected=False):
expected_password = password
expected_logging = []
user = 'fake_user'
mock_get_password.return_value = (password, injected)
mock_service = mock.MagicMock()
mock_osutils = mock.MagicMock()
if not password:
expected_password = "*" * CONF.user_password_length
mock_osutils.generate_random_password.return_value = expected_password
mock_service.can_update_password = can_update_password
mock_service.is_password_changed.return_value = is_password_changed
with testutils.ConfPatcher('user_password_length',
max_password_length):
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
'setuserpassword') as snatcher:
response = self._setpassword_plugin._set_password(
mock_service, mock_osutils, user,
mock.sentinel.shared_data)
if can_update_password and not is_password_changed:
expected_logging.append('Updating password is not required.')
expected_password = None
if not password:
expected_logging.append('Generating a random user password')
if not can_update_password or is_password_changed:
mock_get_password.assert_called_once_with(
mock_service, mock.sentinel.shared_data)
self.assertEqual(expected_password, response)
self.assertEqual(expected_logging, snatcher.output)
if password and can_update_password and is_password_changed:
mock_change_logon_behaviour.assert_called_once_with(
user, password_injected=injected)
def test_set_password(self):
self._test_set_password(password='Password',
can_update_password=False,
is_password_changed=False)
self._test_set_password(password=None,
can_update_password=False,
is_password_changed=False,
max_password_length=25)
self._test_set_password(password=None,
can_update_password=False,
is_password_changed=False,
max_password_length=10)
self._test_set_password(password='Password',
can_update_password=True,
is_password_changed=True)
self._test_set_password(password='Password',
can_update_password=True,
is_password_changed=False)
@mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
'SetUserPasswordPlugin._set_password')
@mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
'SetUserPasswordPlugin._set_metadata_password')
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
def _test_execute(self, mock_get_os_utils, mock_set_metadata_password,
mock_set_password, is_password_set,
can_post_password, can_update_password=False):
mock_service = mock.MagicMock()
mock_osutils = mock.MagicMock()
fake_shared_data = mock.MagicMock()
fake_shared_data.get.return_value = 'fake username'
mock_service.is_password_set = is_password_set
mock_service.can_post_password = can_post_password
mock_service.can_update_password = can_update_password
mock_get_os_utils.return_value = mock_osutils
mock_osutils.user_exists.return_value = True
mock_set_password.return_value = 'fake password'
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
'setuserpassword') as snatcher:
response = self._setpassword_plugin.execute(mock_service,
fake_shared_data)
mock_get_os_utils.assert_called_once_with()
fake_shared_data.get.assert_called_with(
constants.SHARED_DATA_USERNAME, CONF.username)
mock_osutils.user_exists.assert_called_once_with('fake username')
mock_set_password.assert_called_once_with(mock_service, mock_osutils,
'fake username',
fake_shared_data)
expected_logging = [
"Password succesfully updated for user fake username",
]
if can_post_password:
mock_set_metadata_password.assert_called_once_with('fake password',
mock_service)
else:
expected_logging.append("Cannot set the password in the metadata "
"as it is not supported by this service")
self.assertFalse(mock_set_metadata_password.called)
if can_update_password:
self.assertEqual((2, False), response)
else:
self.assertEqual((1, False), response)
self.assertEqual(expected_logging, snatcher.output)
def test_execute(self):
self._test_execute(is_password_set=False, can_post_password=False)
self._test_execute(is_password_set=True, can_post_password=True)
self._test_execute(is_password_set=False, can_post_password=True)
self._test_execute(is_password_set=True, can_post_password=True,
can_update_password=True)
@mock.patch.object(setuserpassword.osutils_factory, 'get_os_utils')
@testutils.ConfPatcher('first_logon_behaviour',
constant.NEVER_CHANGE)
def test_logon_behaviour_never_change(self, mock_get_os_utils):
self._setpassword_plugin._change_logon_behaviour(
mock.sentinel.username)
self.assertFalse(mock_get_os_utils.called)
@testutils.ConfPatcher('first_logon_behaviour',
constant.ALWAYS_CHANGE)
@mock.patch.object(setuserpassword, 'osutils_factory')
def test_logon_behaviour_always(self, mock_factory):
self._setpassword_plugin._change_logon_behaviour(
mock.sentinel.username)
mock_get_os_utils = mock_factory.get_os_utils
self.assertTrue(mock_get_os_utils.called)
osutils = mock_get_os_utils.return_value
osutils.change_password_next_logon.assert_called_once_with(
mock.sentinel.username)
@testutils.ConfPatcher('first_logon_behaviour',
constant.CLEAR_TEXT_INJECTED_ONLY)
@mock.patch.object(setuserpassword, 'osutils_factory')
def test_change_logon_behaviour_clear_text_password_not_injected(
self, mock_factory):
self._setpassword_plugin._change_logon_behaviour(
mock.sentinel.username,
password_injected=False)
mock_get_os_utils = mock_factory.get_os_utils
self.assertFalse(mock_get_os_utils.called)
@testutils.ConfPatcher('first_logon_behaviour',
constant.CLEAR_TEXT_INJECTED_ONLY)
@mock.patch.object(setuserpassword, 'osutils_factory')
def test_logon_behaviour_clear_text_password_injected(
self, mock_factory):
self._setpassword_plugin._change_logon_behaviour(
mock.sentinel.username,
password_injected=True)
mock_get_os_utils = mock_factory.get_os_utils
self.assertTrue(mock_get_os_utils.called)
osutils = mock_get_os_utils.return_value
osutils.change_password_next_logon.assert_called_once_with(
mock.sentinel.username)