diff --git a/cloudbaseinit/metadata/services/base.py b/cloudbaseinit/metadata/services/base.py index d9d80058..b6e4b598 100644 --- a/cloudbaseinit/metadata/services/base.py +++ b/cloudbaseinit/metadata/services/base.py @@ -139,3 +139,30 @@ class BaseMetadataService(object): def cleanup(self): pass + + @property + def can_update_password(self): + """The ability to update password of the metadata provider. + + If :meth:`~can_update_password` is True, plugins can check + periodically (e.g. at every boot) if the password changed. + + :rtype: bool + + .. notes: + The password will be updated only if the + :meth:`~is_password_changed` returns True. + """ + return False + + def is_password_changed(self): + """Check if the metadata provider has a new password for this + instance. + + :rtype: bool + + .. notes: + This method will be used only when :meth:`~can_update_password` + is True. + """ + return False diff --git a/cloudbaseinit/plugins/common/setuserpassword.py b/cloudbaseinit/plugins/common/setuserpassword.py index 73baf452..54f5c111 100644 --- a/cloudbaseinit/plugins/common/setuserpassword.py +++ b/cloudbaseinit/plugins/common/setuserpassword.py @@ -60,11 +60,6 @@ class SetUserPasswordPlugin(base.BasePlugin): 'changing it as soon as possible') else: password = shared_data.get(constants.SHARED_DATA_PASSWORD) - if not password: - LOG.debug('Generating a random user password') - # Generate a random password - maximum_length = osutils.get_maximum_password_length() - password = osutils.generate_random_password(maximum_length) return password @@ -84,8 +79,27 @@ class SetUserPasswordPlugin(base.BasePlugin): return True def _set_password(self, service, osutils, user_name, shared_data): + """Change the password for the received username if it is required. + + The used password can be the one received from the metadata provider, + if it does exist, or a random one will be generated. + + .. notes: + This method has a different behaviour depending on the value of + :meth:`~can_update password` if this is True the password will + be set only if the :meth:`~is_password_changed` is also True. + """ + if service.can_update_password and not service.is_password_changed(): + LOG.info('Updating password is not required.') + return None + password = self._get_password(service, osutils, shared_data) - LOG.info('Setting the user\'s password') + if not password: + LOG.debug('Generating a random user password') + maximum_length = osutils.get_maximum_password_length() + password = osutils.generate_random_password( + maximum_length) + osutils.set_user_password(user_name, password) return password @@ -99,14 +113,22 @@ class SetUserPasswordPlugin(base.BasePlugin): if osutils.user_exists(user_name): password = self._set_password(service, osutils, user_name, shared_data) - LOG.info('Password succesfully updated for user %s' % user_name) - # TODO(alexpilotti): encrypt with DPAPI - shared_data[constants.SHARED_DATA_PASSWORD] = password + if password: + LOG.info('Password succesfully updated for user %s' % + user_name) + # TODO(alexpilotti): encrypt with DPAPI + shared_data[constants.SHARED_DATA_PASSWORD] = password - if not service.can_post_password: - LOG.info('Cannot set the password in the metadata as it is ' - 'not supported by this service') - else: - self._set_metadata_password(password, service) + if not service.can_post_password: + LOG.info('Cannot set the password in the metadata as it ' + 'is not supported by this service') + else: + self._set_metadata_password(password, service) - return (base.PLUGIN_EXECUTION_DONE, False) + if service.can_update_password: + # If the metadata provider can update the password, the plugin + # must run at every boot in order to update the password if + # it was changed. + return (base.PLUGIN_EXECUTE_ON_NEXT_BOOT, False) + else: + return (base.PLUGIN_EXECUTION_DONE, False) diff --git a/cloudbaseinit/tests/plugins/common/test_setuserpassword.py b/cloudbaseinit/tests/plugins/common/test_setuserpassword.py index 4fcf077e..47803b82 100644 --- a/cloudbaseinit/tests/plugins/common/test_setuserpassword.py +++ b/cloudbaseinit/tests/plugins/common/test_setuserpassword.py @@ -71,11 +71,10 @@ class SetUserPasswordPluginTests(unittest.TestCase): def test_get_ssh_plublic_key_no_pub_keys(self): self._test_get_ssh_public_key(data_exists=False) - def _test_get_password(self, inject_password, generate_password): + def _test_get_password(self, inject_password): shared_data = {} - reuse_password = not generate_password and not inject_password expected_password = 'Passw0rd' - if reuse_password: + if not inject_password: # The password should be the one created by # CreateUser plugin. shared_data[constants.SHARED_DATA_PASSWORD] = ( @@ -84,7 +83,6 @@ class SetUserPasswordPluginTests(unittest.TestCase): mock_service = mock.MagicMock() mock_osutils = mock.MagicMock() mock_service.get_admin_password.return_value = expected_password - mock_osutils.generate_random_password.return_value = expected_password with testutils.ConfPatcher('inject_user_password', inject_password): response = self._setpassword_plugin._get_password(mock_service, @@ -92,31 +90,17 @@ class SetUserPasswordPluginTests(unittest.TestCase): shared_data) if inject_password: mock_service.get_admin_password.assert_called_with() - elif reuse_password: - self.assertFalse(mock_service.get_admin_password.called) - self.assertFalse(mock_osutils.generate_random_password.called) - expected_password = mock.sentinel.create_user_password else: - mock_osutils.get_maximum_password_length.assert_called_once_with() - mock_osutils.generate_random_password.assert_called_once_with( - mock_osutils.get_maximum_password_length()) + self.assertFalse(mock_service.get_admin_password.called) + expected_password = mock.sentinel.create_user_password + self.assertEqual(expected_password, response) def test_get_password_inject_true(self): - self._test_get_password(generate_password=False, - inject_password=True) + self._test_get_password(inject_password=True) def test_get_password_inject_false(self): - self._test_get_password(generate_password=False, - inject_password=False) - - def test_get_password_get_from_create_user_plugin(self): - self._test_get_password(inject_password=False, - generate_password=False) - - def test_get_password_generate(self): - self._test_get_password(inject_password=False, - generate_password=True) + self._test_get_password(inject_password=False) @mock.patch('cloudbaseinit.plugins.common.setuserpassword.' 'SetUserPasswordPlugin._get_ssh_public_key') @@ -174,23 +158,54 @@ class SetUserPasswordPluginTests(unittest.TestCase): @mock.patch('cloudbaseinit.plugins.common.setuserpassword.' 'SetUserPasswordPlugin._get_password') - def test_set_password(self, mock_get_password): + def _test_set_password(self, mock_get_password, password, + can_update_password, is_password_changed): + expected_password = password + expected_logging = [] + + mock_get_password.return_value = password + mock_service = mock.MagicMock() mock_osutils = mock.MagicMock() - mock_get_password.return_value = 'fake password' - response = self._setpassword_plugin._set_password( - mock_service, - mock_osutils, - 'fake user', - mock.sentinel.shared_data) - mock_get_password.assert_called_once_with( - mock_service, - mock_osutils, - mock.sentinel.shared_data) - mock_osutils.set_user_password.assert_called_once_with( - 'fake user', - 'fake password') - self.assertEqual(response, 'fake password') + mock_osutils.get_maximum_password_length.return_value = None + mock_osutils.generate_random_password.return_value = 'fake-password' + mock_service.can_update_password = can_update_password + mock_service.is_password_changed.return_value = is_password_changed + + with testutils.LogSnatcher('cloudbaseinit.plugins.common.' + 'setuserpassword') as snatcher: + response = self._setpassword_plugin._set_password( + mock_service, mock_osutils, 'fake_user', + mock.sentinel.shared_data) + + if can_update_password and not is_password_changed: + expected_logging.append('Updating password is not required.') + expected_password = None + + if not password: + expected_logging.append('Generating a random user password') + expected_password = 'fake-password' + + if not can_update_password or is_password_changed: + mock_get_password.assert_called_once_with( + mock_service, mock_osutils, mock.sentinel.shared_data) + + self.assertEqual(expected_password, response) + self.assertEqual(expected_logging, snatcher.output) + + def test_set_password(self): + self._test_set_password(password='Password', + can_update_password=False, + is_password_changed=False) + self._test_set_password(password=None, + can_update_password=False, + is_password_changed=False) + self._test_set_password(password='Password', + can_update_password=True, + is_password_changed=True) + self._test_set_password(password='Password', + can_update_password=True, + is_password_changed=False) @mock.patch('cloudbaseinit.plugins.common.setuserpassword.' 'SetUserPasswordPlugin._set_password') @@ -198,14 +213,15 @@ class SetUserPasswordPluginTests(unittest.TestCase): 'SetUserPasswordPlugin._set_metadata_password') @mock.patch('cloudbaseinit.osutils.factory.get_os_utils') def _test_execute(self, mock_get_os_utils, mock_set_metadata_password, - mock_set_password, is_password_set=False, - can_post_password=True): + mock_set_password, is_password_set, + can_post_password, can_update_password=False): mock_service = mock.MagicMock() mock_osutils = mock.MagicMock() fake_shared_data = mock.MagicMock() fake_shared_data.get.return_value = 'fake username' mock_service.is_password_set = is_password_set mock_service.can_post_password = can_post_password + mock_service.can_update_password = can_update_password mock_get_os_utils.return_value = mock_osutils mock_osutils.user_exists.return_value = True mock_set_password.return_value = 'fake password' @@ -233,10 +249,16 @@ class SetUserPasswordPluginTests(unittest.TestCase): "as it is not supported by this service") self.assertFalse(mock_set_metadata_password.called) - self.assertEqual((1, False), response) + if can_update_password: + self.assertEqual((2, False), response) + else: + self.assertEqual((1, False), response) + self.assertEqual(expected_logging, snatcher.output) def test_execute(self): self._test_execute(is_password_set=False, can_post_password=False) self._test_execute(is_password_set=True, can_post_password=True) self._test_execute(is_password_set=False, can_post_password=True) + self._test_execute(is_password_set=True, can_post_password=True, + can_update_password=True)