
Since the sethostname plugin was moved in plugins.common, there was a bit of Windows specific functionality inside it, the fact that it returns reboot_required in order to set the hostname on Windows. Now this choice is moved to osutils.set_host_name, which will simply return False for non-Windows systems. Change-Id: Ic7774f2c7dd86f3ade82a19520df4301bc9674a7
1360 lines
54 KiB
Python
1360 lines
54 KiB
Python
# Copyright 2013 Cloudbase Solutions Srl
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
|
|
import importlib
|
|
import os
|
|
import unittest
|
|
|
|
try:
|
|
import unittest.mock as mock
|
|
except ImportError:
|
|
import mock
|
|
from oslo.config import cfg
|
|
import six
|
|
|
|
from cloudbaseinit import exception
|
|
from cloudbaseinit.tests import fake
|
|
|
|
CONF = cfg.CONF
|
|
|
|
|
|
class TestWindowsUtils(unittest.TestCase):
|
|
'''Tests for the windows utils class.'''
|
|
|
|
_CONFIG_NAME = 'FakeConfig'
|
|
_DESTINATION = '192.168.192.168'
|
|
_GATEWAY = '10.7.1.1'
|
|
_NETMASK = '255.255.255.0'
|
|
_PASSWORD = 'Passw0rd'
|
|
_SECTION = 'fake_section'
|
|
_USERNAME = 'Admin'
|
|
|
|
def setUp(self):
|
|
self._pywintypes_mock = mock.MagicMock()
|
|
self._pywintypes_mock.com_error = fake.FakeComError
|
|
self._win32com_mock = mock.MagicMock()
|
|
self._win32process_mock = mock.MagicMock()
|
|
self._win32security_mock = mock.MagicMock()
|
|
self._wmi_mock = mock.MagicMock()
|
|
self._moves_mock = mock.MagicMock()
|
|
self._xmlrpc_client_mock = mock.MagicMock()
|
|
self._ctypes_mock = mock.MagicMock()
|
|
|
|
self._module_patcher = mock.patch.dict(
|
|
'sys.modules',
|
|
{'win32com': self._win32com_mock,
|
|
'win32process': self._win32process_mock,
|
|
'win32security': self._win32security_mock,
|
|
'wmi': self._wmi_mock,
|
|
'six.moves': self._moves_mock,
|
|
'six.moves.xmlrpc_client': self._xmlrpc_client_mock,
|
|
'ctypes': self._ctypes_mock,
|
|
'pywintypes': self._pywintypes_mock})
|
|
|
|
self._module_patcher.start()
|
|
self.windows_utils = importlib.import_module(
|
|
"cloudbaseinit.osutils.windows")
|
|
|
|
self._winreg_mock = self._moves_mock.winreg
|
|
self._windll_mock = self._ctypes_mock.windll
|
|
self._wintypes_mock = self._ctypes_mock.wintypes
|
|
self._client_mock = self._win32com_mock.client
|
|
self.windows_utils.WindowsError = mock.MagicMock()
|
|
|
|
self._winutils = self.windows_utils.WindowsUtils()
|
|
|
|
def tearDown(self):
|
|
self._module_patcher.stop()
|
|
|
|
def test_enable_shutdown_privilege(self):
|
|
fake_process = mock.MagicMock()
|
|
fake_token = True
|
|
LUID = 'fakeid'
|
|
self._win32process_mock.GetCurrentProcess.return_value = fake_process
|
|
self._win32security_mock.OpenProcessToken.return_value = fake_token
|
|
self._win32security_mock.LookupPrivilegeValue.return_value = LUID
|
|
|
|
self._winutils._enable_shutdown_privilege()
|
|
|
|
privilege = [(LUID,
|
|
self._win32security_mock.SE_PRIVILEGE_ENABLED)]
|
|
self._win32security_mock.AdjustTokenPrivileges.assert_called_with(
|
|
fake_token,
|
|
False,
|
|
privilege)
|
|
|
|
self._win32security_mock.OpenProcessToken.assert_called_with(
|
|
fake_process, self._win32security_mock.TOKEN_ADJUST_PRIVILEGES |
|
|
self._win32security_mock.TOKEN_QUERY)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._enable_shutdown_privilege')
|
|
def _test_reboot(self, mock_enable_shutdown_privilege, ret_value):
|
|
advapi32 = self._windll_mock.advapi32
|
|
advapi32.InitiateSystemShutdownW = mock.MagicMock(
|
|
return_value=ret_value)
|
|
|
|
if not ret_value:
|
|
self.assertRaises(exception.CloudbaseInitException,
|
|
self._winutils.reboot)
|
|
else:
|
|
self._winutils.reboot()
|
|
|
|
advapi32.InitiateSystemShutdownW.assert_called_with(
|
|
0,
|
|
"Cloudbase-Init reboot",
|
|
0, True, True)
|
|
|
|
def test_reboot(self):
|
|
self._test_reboot(ret_value=True)
|
|
|
|
def test_reboot_failed(self):
|
|
self._test_reboot(ret_value=None)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._sanitize_wmi_input')
|
|
def _test_get_user_wmi_object(self, mock_sanitize_wmi_input, return_value):
|
|
conn = self._wmi_mock.WMI
|
|
mock_sanitize_wmi_input.return_value = self._USERNAME
|
|
conn.return_value.query.return_value = return_value
|
|
|
|
response = self._winutils._get_user_wmi_object(self._USERNAME)
|
|
|
|
conn.return_value.query.assert_called_with(
|
|
"SELECT * FROM Win32_Account where name = \'%s\'" % self._USERNAME)
|
|
mock_sanitize_wmi_input.assert_called_with(self._USERNAME)
|
|
conn.assert_called_with(moniker='//./root/cimv2')
|
|
if return_value:
|
|
self.assertTrue(response is not None)
|
|
else:
|
|
self.assertTrue(response is None)
|
|
|
|
def test_get_user_wmi_object(self):
|
|
caption = 'fake'
|
|
self._test_get_user_wmi_object(return_value=caption)
|
|
|
|
def test_no_user_wmi_object(self):
|
|
empty_caption = ''
|
|
self._test_get_user_wmi_object(return_value=empty_caption)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_user_wmi_object')
|
|
def _test_user_exists(self, mock_get_user_wmi_object, return_value):
|
|
mock_get_user_wmi_object.return_value = return_value
|
|
response = self._winutils.user_exists(return_value)
|
|
mock_get_user_wmi_object.assert_called_with(return_value)
|
|
if return_value:
|
|
self.assertTrue(response)
|
|
else:
|
|
self.assertFalse(response)
|
|
|
|
def test_user_exists(self):
|
|
self._test_user_exists(return_value=self._USERNAME)
|
|
|
|
def test_username_does_not_exist(self):
|
|
self._test_user_exists(return_value=None)
|
|
|
|
def test_sanitize_wmi_input(self):
|
|
unsanitised = ' \' '
|
|
response = self._winutils._sanitize_wmi_input(unsanitised)
|
|
sanitised = ' \'\' '
|
|
self.assertEqual(sanitised, response)
|
|
|
|
def test_sanitize_shell_input(self):
|
|
unsanitised = ' " '
|
|
response = self._winutils.sanitize_shell_input(unsanitised)
|
|
sanitised = ' \\" '
|
|
self.assertEqual(sanitised, response)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._set_user_password_expiration')
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_adsi_object')
|
|
def _test_create_or_change_user(self, mock_get_adsi_object,
|
|
mock_set_user_password_expiration,
|
|
create, password_expires, ret_value=0):
|
|
|
|
if not ret_value:
|
|
self._winutils._create_or_change_user(self._USERNAME,
|
|
self._PASSWORD, create,
|
|
password_expires)
|
|
mock_set_user_password_expiration.assert_called_with(
|
|
self._USERNAME, password_expires)
|
|
else:
|
|
mock_get_adsi_object.side_effect = [
|
|
self._pywintypes_mock.com_error]
|
|
self.assertRaises(
|
|
exception.CloudbaseInitException,
|
|
self._winutils._create_or_change_user,
|
|
self._USERNAME, self._PASSWORD, create, password_expires)
|
|
|
|
if create:
|
|
mock_get_adsi_object.assert_called_with()
|
|
else:
|
|
mock_get_adsi_object.assert_called_with(
|
|
object_name=self._USERNAME, object_type='user')
|
|
|
|
def test_create_user_and_add_password_expire_true(self):
|
|
self._test_create_or_change_user(create=True, password_expires=True)
|
|
|
|
def test_create_user_and_add_password_expire_false(self):
|
|
self._test_create_or_change_user(create=True, password_expires=False)
|
|
|
|
def test_add_password_expire_true(self):
|
|
self._test_create_or_change_user(create=False, password_expires=True)
|
|
|
|
def test_add_password_expire_false(self):
|
|
self._test_create_or_change_user(create=False, password_expires=False)
|
|
|
|
def test_create_user_and_add_password_expire_true_with_ret_value(self):
|
|
self._test_create_or_change_user(create=True, password_expires=True,
|
|
ret_value=1)
|
|
|
|
def test_create_user_and_add_password_expire_false_with_ret_value(self):
|
|
self._test_create_or_change_user(create=True,
|
|
password_expires=False, ret_value=1)
|
|
|
|
def test_add_password_expire_true_with_ret_value(self):
|
|
self._test_create_or_change_user(create=False,
|
|
password_expires=True, ret_value=1)
|
|
|
|
def test_add_password_expire_false_with_ret_value(self):
|
|
self._test_create_or_change_user(create=False,
|
|
password_expires=False, ret_value=1)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_user_wmi_object')
|
|
def _test_set_user_password_expiration(self, mock_get_user_wmi_object,
|
|
fake_obj):
|
|
mock_get_user_wmi_object.return_value = fake_obj
|
|
|
|
response = self._winutils._set_user_password_expiration(
|
|
self._USERNAME, True)
|
|
|
|
if fake_obj:
|
|
self.assertTrue(fake_obj.PasswordExpires)
|
|
self.assertTrue(response)
|
|
else:
|
|
self.assertFalse(response)
|
|
|
|
def test_set_password_expiration(self):
|
|
fake = mock.Mock()
|
|
self._test_set_user_password_expiration(fake_obj=fake)
|
|
|
|
def test_set_password_expiration_no_object(self):
|
|
self._test_set_user_password_expiration(fake_obj=None)
|
|
|
|
def _test_get_user_sid_and_domain(self, ret_val):
|
|
cbSid = mock.Mock()
|
|
sid = mock.Mock()
|
|
size = 1024
|
|
cchReferencedDomainName = mock.Mock()
|
|
domainName = mock.Mock()
|
|
sidNameUse = mock.Mock()
|
|
advapi32 = self._windll_mock.advapi32
|
|
|
|
self._ctypes_mock.create_string_buffer.return_value = sid
|
|
self._ctypes_mock.sizeof.return_value = size
|
|
self._wintypes_mock.DWORD.return_value = cchReferencedDomainName
|
|
self._ctypes_mock.create_unicode_buffer.return_value = domainName
|
|
|
|
advapi32.LookupAccountNameW.return_value = ret_val
|
|
if ret_val is None:
|
|
self.assertRaises(
|
|
exception.CloudbaseInitException,
|
|
self._winutils._get_user_sid_and_domain,
|
|
self._USERNAME)
|
|
else:
|
|
response = self._winutils._get_user_sid_and_domain(self._USERNAME)
|
|
|
|
advapi32.LookupAccountNameW.assert_called_with(
|
|
0, six.text_type(self._USERNAME), sid,
|
|
self._ctypes_mock.byref(cbSid), domainName,
|
|
self._ctypes_mock.byref(cchReferencedDomainName),
|
|
self._ctypes_mock.byref(sidNameUse))
|
|
self.assertEqual((sid, domainName.value), response)
|
|
|
|
def test_get_user_sid_and_domain(self):
|
|
fake_obj = mock.Mock()
|
|
self._test_get_user_sid_and_domain(ret_val=fake_obj)
|
|
|
|
def test_get_user_sid_and_domain_no_return_value(self):
|
|
self._test_get_user_sid_and_domain(ret_val=None)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows'
|
|
'.Win32_LOCALGROUP_MEMBERS_INFO_3')
|
|
def _test_add_user_to_local_group(self,
|
|
mock_Win32_LOCALGROUP_MEMBERS_INFO_3,
|
|
ret_value):
|
|
lmi = mock_Win32_LOCALGROUP_MEMBERS_INFO_3()
|
|
group_name = 'Admins'
|
|
netapi32 = self._windll_mock.netapi32
|
|
|
|
netapi32.NetLocalGroupAddMembers.return_value = ret_value
|
|
|
|
is_in_alias = ret_value != self._winutils.ERROR_MEMBER_IN_ALIAS
|
|
|
|
if ret_value is not 0 and is_in_alias:
|
|
self.assertRaises(
|
|
exception.CloudbaseInitException,
|
|
self._winutils.add_user_to_local_group,
|
|
self._USERNAME, group_name)
|
|
else:
|
|
self._winutils.add_user_to_local_group(self._USERNAME,
|
|
group_name)
|
|
|
|
netapi32.NetLocalGroupAddMembers.assert_called_with(
|
|
0, six.text_type(group_name), 3,
|
|
self._ctypes_mock.addressof.return_value, 1)
|
|
|
|
self._ctypes_mock.addressof.assert_called_once_with(lmi)
|
|
self.assertEqual(lmi.lgrmi3_domainandname,
|
|
six.text_type(self._USERNAME))
|
|
|
|
def test_add_user_to_local_group_no_error(self):
|
|
self._test_add_user_to_local_group(ret_value=0)
|
|
|
|
def test_add_user_to_local_group_not_found(self):
|
|
self._test_add_user_to_local_group(
|
|
ret_value=self._winutils.NERR_GroupNotFound)
|
|
|
|
def test_add_user_to_local_group_access_denied(self):
|
|
self._test_add_user_to_local_group(
|
|
ret_value=self._winutils.ERROR_ACCESS_DENIED)
|
|
|
|
def test_add_user_to_local_group_no_member(self):
|
|
self._test_add_user_to_local_group(
|
|
ret_value=self._winutils.ERROR_NO_SUCH_MEMBER)
|
|
|
|
def test_add_user_to_local_group_member_in_alias(self):
|
|
self._test_add_user_to_local_group(
|
|
ret_value=self._winutils.ERROR_MEMBER_IN_ALIAS)
|
|
|
|
def test_add_user_to_local_group_invalid_member(self):
|
|
self._test_add_user_to_local_group(
|
|
ret_value=self._winutils.ERROR_INVALID_MEMBER)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_user_wmi_object')
|
|
def _test_get_user_sid(self, mock_get_user_wmi_object, fail):
|
|
r = mock.Mock()
|
|
if not fail:
|
|
mock_get_user_wmi_object.return_value = None
|
|
|
|
response = self._winutils.get_user_sid(self._USERNAME)
|
|
|
|
self.assertTrue(response is None)
|
|
else:
|
|
mock_get_user_wmi_object.return_value = r
|
|
|
|
response = self._winutils.get_user_sid(self._USERNAME)
|
|
|
|
self.assertTrue(response is not None)
|
|
mock_get_user_wmi_object.assert_called_with(self._USERNAME)
|
|
|
|
def test_get_user_sid(self):
|
|
self._test_get_user_sid(fail=False)
|
|
|
|
def test_get_user_sid_fail(self):
|
|
self._test_get_user_sid(fail=True)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.Win32_PROFILEINFO')
|
|
def _test_create_user_logon_session(self, mock_Win32_PROFILEINFO, logon,
|
|
loaduser,
|
|
load_profile=True):
|
|
self._wintypes_mock.HANDLE = mock.MagicMock()
|
|
pi = self.windows_utils.Win32_PROFILEINFO()
|
|
advapi32 = self._windll_mock.advapi32
|
|
userenv = self._windll_mock.userenv
|
|
kernel32 = self._windll_mock.kernel32
|
|
|
|
advapi32.LogonUserW.return_value = logon
|
|
|
|
if not logon:
|
|
self.assertRaises(
|
|
exception.CloudbaseInitException,
|
|
self._winutils.create_user_logon_session,
|
|
self._USERNAME, self._PASSWORD, domain='.',
|
|
load_profile=load_profile)
|
|
|
|
elif load_profile and not loaduser:
|
|
userenv.LoadUserProfileW.return_value = None
|
|
kernel32.CloseHandle.return_value = None
|
|
|
|
self.assertRaises(exception.CloudbaseInitException,
|
|
self._winutils.create_user_logon_session,
|
|
self._USERNAME, self._PASSWORD, domain='.',
|
|
load_profile=load_profile)
|
|
|
|
userenv.LoadUserProfileW.assert_called_with(
|
|
self._wintypes_mock.HANDLE.return_value,
|
|
self._ctypes_mock.byref.return_value)
|
|
self._ctypes_mock.byref.assert_called_with(pi)
|
|
|
|
kernel32.CloseHandle.assert_called_with(
|
|
self._wintypes_mock.HANDLE.return_value)
|
|
|
|
elif not load_profile:
|
|
response = self._winutils.create_user_logon_session(
|
|
self._USERNAME, self._PASSWORD, domain='.',
|
|
load_profile=load_profile)
|
|
self.assertTrue(response is not None)
|
|
else:
|
|
size = 1024
|
|
self._ctypes_mock.sizeof.return_value = size
|
|
|
|
mock_Win32_PROFILEINFO.return_value = loaduser
|
|
|
|
response = self._winutils.create_user_logon_session(
|
|
self._USERNAME, self._PASSWORD, domain='.',
|
|
load_profile=load_profile)
|
|
|
|
userenv.LoadUserProfileW.assert_called_with(
|
|
self._wintypes_mock.HANDLE.return_value,
|
|
self._ctypes_mock.byref.return_value)
|
|
self.assertTrue(response is not None)
|
|
|
|
def test_create_user_logon_session_fail_load_false(self):
|
|
self._test_create_user_logon_session(0, 0, True)
|
|
|
|
def test_create_user_logon_session_fail_load_true(self):
|
|
self._test_create_user_logon_session(0, 0, False)
|
|
|
|
def test_create_user_logon_session_load_true(self):
|
|
m = mock.Mock()
|
|
n = mock.Mock()
|
|
self._test_create_user_logon_session(m, n, True)
|
|
|
|
def test_create_user_logon_session_load_false(self):
|
|
m = mock.Mock()
|
|
n = mock.Mock()
|
|
self._test_create_user_logon_session(m, n, False)
|
|
|
|
def test_create_user_logon_session_no_load_true(self):
|
|
m = mock.Mock()
|
|
self._test_create_user_logon_session(m, None, True)
|
|
|
|
def test_create_user_logon_session_no_load_false(self):
|
|
m = mock.Mock()
|
|
self._test_create_user_logon_session(m, None, False)
|
|
|
|
def test_close_user_logon_session(self):
|
|
token = mock.Mock()
|
|
self._windll_mock.kernel32.CloseHandle = mock.MagicMock()
|
|
|
|
self._winutils.close_user_logon_session(token)
|
|
|
|
self._windll_mock.kernel32.CloseHandle.assert_called_with(token)
|
|
|
|
@mock.patch('ctypes.windll.kernel32.SetComputerNameExW')
|
|
def _test_set_host_name(self, mock_SetComputerNameExW, ret_value):
|
|
mock_SetComputerNameExW.return_value = ret_value
|
|
|
|
if not ret_value:
|
|
self.assertRaises(exception.CloudbaseInitException,
|
|
self._winutils.set_host_name, 'fake name')
|
|
else:
|
|
self.assertTrue(self._winutils.set_host_name('fake name'))
|
|
|
|
mock_SetComputerNameExW.assert_called_with(
|
|
self._winutils.ComputerNamePhysicalDnsHostname,
|
|
six.text_type('fake name'))
|
|
|
|
def test_set_host_name(self):
|
|
self._test_set_host_name(ret_value='fake response')
|
|
|
|
def test_set_host_exception(self):
|
|
self._test_set_host_name(ret_value=None)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'.get_user_sid')
|
|
def _test_get_user_home(self, user_sid, mock_get_user_sid):
|
|
mock_get_user_sid.return_value = user_sid
|
|
|
|
response = self._winutils.get_user_home(self._USERNAME)
|
|
|
|
if user_sid:
|
|
mock_get_user_sid.assert_called_with(self._USERNAME)
|
|
self._winreg_mock.OpenKey.assert_called_with(
|
|
self._winreg_mock.HKEY_LOCAL_MACHINE,
|
|
'SOFTWARE\\Microsoft\\Windows NT\\CurrentVersion\\'
|
|
'ProfileList\\%s' % mock_get_user_sid())
|
|
self.assertTrue(response is not None)
|
|
self._winreg_mock.QueryValueEx.assert_called_with(
|
|
self._winreg_mock.OpenKey.return_value.__enter__.return_value,
|
|
'ProfileImagePath')
|
|
else:
|
|
self.assertTrue(response is None)
|
|
|
|
def test_get_user_home(self):
|
|
user = mock.MagicMock()
|
|
self._test_get_user_home(user)
|
|
|
|
def test_get_user_home_fail(self):
|
|
self._test_get_user_home(None)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'.check_os_version')
|
|
def _test_get_network_adapters(self, is_xp_2003, mock_check_os_version):
|
|
conn = self._wmi_mock.WMI
|
|
mock_response = mock.MagicMock()
|
|
conn.return_value.query.return_value = [mock_response]
|
|
|
|
mock_check_os_version.return_value = not is_xp_2003
|
|
|
|
wql = ('SELECT * FROM Win32_NetworkAdapter WHERE '
|
|
'AdapterTypeId = 0 AND MACAddress IS NOT NULL')
|
|
|
|
if not is_xp_2003:
|
|
wql += ' AND PhysicalAdapter = True'
|
|
|
|
response = self._winutils.get_network_adapters()
|
|
conn.return_value.query.assert_called_with(wql)
|
|
self.assertEqual([(mock_response.Name, mock_response.MACAddress)],
|
|
response)
|
|
|
|
def test_get_network_adapters(self):
|
|
self._test_get_network_adapters(False)
|
|
|
|
def test_get_network_adapters_xp_2003(self):
|
|
self._test_get_network_adapters(True)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._sanitize_wmi_input')
|
|
def _test_set_static_network_config(self, mock_sanitize_wmi_input,
|
|
adapter, ret_val1=None,
|
|
ret_val2=None, ret_val3=None):
|
|
conn = self._wmi_mock.WMI
|
|
address = '10.10.10.10'
|
|
mac_address = '54:EE:75:19:F4:61'
|
|
broadcast = '0.0.0.0'
|
|
dns_list = ['8.8.8.8']
|
|
|
|
if not adapter:
|
|
self.assertRaises(
|
|
exception.CloudbaseInitException,
|
|
self._winutils.set_static_network_config,
|
|
mac_address, address, self._NETMASK,
|
|
broadcast, self._GATEWAY, dns_list)
|
|
else:
|
|
conn.return_value.query.return_value = adapter
|
|
adapter_config = adapter[0].associators()[0]
|
|
adapter_config.EnableStatic.return_value = ret_val1
|
|
adapter_config.SetGateways.return_value = ret_val2
|
|
adapter_config.SetDNSServerSearchOrder.return_value = ret_val3
|
|
adapter.__len__.return_value = 1
|
|
|
|
if ret_val1[0] > 1:
|
|
self.assertRaises(
|
|
exception.CloudbaseInitException,
|
|
self._winutils.set_static_network_config,
|
|
mac_address, address, self._NETMASK,
|
|
broadcast, self._GATEWAY, dns_list)
|
|
|
|
elif ret_val2[0] > 1:
|
|
self.assertRaises(
|
|
exception.CloudbaseInitException,
|
|
self._winutils.set_static_network_config,
|
|
mac_address, address, self._NETMASK,
|
|
broadcast, self._GATEWAY, dns_list)
|
|
|
|
elif ret_val3[0] > 1:
|
|
self.assertRaises(
|
|
exception.CloudbaseInitException,
|
|
self._winutils.set_static_network_config,
|
|
mac_address, address, self._NETMASK,
|
|
broadcast, self._GATEWAY, dns_list)
|
|
|
|
else:
|
|
response = self._winutils.set_static_network_config(
|
|
mac_address, address, self._NETMASK,
|
|
broadcast, self._GATEWAY, dns_list)
|
|
|
|
if ret_val1[0] or ret_val2[0] or ret_val3[0] == 1:
|
|
self.assertTrue(response)
|
|
else:
|
|
self.assertFalse(response)
|
|
adapter_config.EnableStatic.assert_called_with(
|
|
[address], [self._NETMASK])
|
|
adapter_config.SetGateways.assert_called_with(
|
|
[self._GATEWAY], [1])
|
|
adapter_config.SetDNSServerSearchOrder.assert_called_with(
|
|
dns_list)
|
|
|
|
adapter[0].associators.assert_called_with(
|
|
wmi_result_class='Win32_NetworkAdapterConfiguration')
|
|
conn.return_value.query.assert_called_with(
|
|
"SELECT * FROM Win32_NetworkAdapter WHERE "
|
|
"MACAddress = '{}'".format(mac_address)
|
|
)
|
|
|
|
def test_set_static_network_config(self):
|
|
adapter = mock.MagicMock()
|
|
ret_val1 = (1,)
|
|
ret_val2 = (1,)
|
|
ret_val3 = (0,)
|
|
self._test_set_static_network_config(adapter=adapter,
|
|
ret_val1=ret_val1,
|
|
ret_val2=ret_val2,
|
|
ret_val3=ret_val3)
|
|
|
|
def test_set_static_network_config_query_fail(self):
|
|
self._test_set_static_network_config(adapter=None)
|
|
|
|
def test_set_static_network_config_cannot_set_ip(self):
|
|
adapter = mock.MagicMock()
|
|
ret_val1 = (2,)
|
|
self._test_set_static_network_config(adapter=adapter,
|
|
ret_val1=ret_val1)
|
|
|
|
def test_set_static_network_config_cannot_set_gateway(self):
|
|
adapter = mock.MagicMock()
|
|
ret_val1 = (1,)
|
|
ret_val2 = (2,)
|
|
self._test_set_static_network_config(adapter=adapter,
|
|
ret_val1=ret_val1,
|
|
ret_val2=ret_val2)
|
|
|
|
def test_set_static_network_config_cannot_set_DNS(self):
|
|
adapter = mock.MagicMock()
|
|
ret_val1 = (1,)
|
|
ret_val2 = (1,)
|
|
ret_val3 = (2,)
|
|
self._test_set_static_network_config(adapter=adapter,
|
|
ret_val1=ret_val1,
|
|
ret_val2=ret_val2,
|
|
ret_val3=ret_val3)
|
|
|
|
def test_set_static_network_config_no_reboot(self):
|
|
adapter = mock.MagicMock()
|
|
ret_val1 = (0,)
|
|
ret_val2 = (0,)
|
|
ret_val3 = (0,)
|
|
self._test_set_static_network_config(adapter=adapter,
|
|
ret_val1=ret_val1,
|
|
ret_val2=ret_val2,
|
|
ret_val3=ret_val3)
|
|
|
|
def _test_get_config_key_name(self, section):
|
|
response = self._winutils._get_config_key_name(section)
|
|
if section:
|
|
self.assertEqual(self._winutils._config_key + section + '\\',
|
|
response)
|
|
else:
|
|
self.assertEqual(self._winutils._config_key, response)
|
|
|
|
def test_get_config_key_name_with_section(self):
|
|
self._test_get_config_key_name(self._SECTION)
|
|
|
|
def test_get_config_key_name_no_section(self):
|
|
self._test_get_config_key_name(None)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_config_key_name')
|
|
def _test_set_config_value(self, value, mock_get_config_key_name):
|
|
key_name = (self._winutils._config_key + self._SECTION + '\\' +
|
|
self._CONFIG_NAME)
|
|
mock_get_config_key_name.return_value = key_name
|
|
|
|
self._winutils.set_config_value(self._CONFIG_NAME, value,
|
|
self._SECTION)
|
|
|
|
key = self._winreg_mock.CreateKey.return_value.__enter__.return_value
|
|
|
|
self._winreg_mock.CreateKey.assert_called_with(
|
|
self._winreg_mock.HKEY_LOCAL_MACHINE, key_name)
|
|
mock_get_config_key_name.assert_called_with(self._SECTION)
|
|
|
|
if type(value) == int:
|
|
self._winreg_mock.SetValueEx.assert_called_with(
|
|
key, self._CONFIG_NAME, 0, self._winreg_mock.REG_DWORD, value)
|
|
|
|
else:
|
|
self._winreg_mock.SetValueEx.assert_called_with(
|
|
key, self._CONFIG_NAME, 0, self._winreg_mock.REG_SZ, value)
|
|
|
|
def test_set_config_value_int(self):
|
|
self._test_set_config_value(1)
|
|
|
|
def test_set_config_value_not_int(self):
|
|
self._test_set_config_value('1')
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_config_key_name')
|
|
def _test_get_config_value(self, value, mock_get_config_key_name):
|
|
key_name = self._winutils._config_key + self._SECTION + '\\'
|
|
key_name += self._CONFIG_NAME
|
|
|
|
if type(value) == int:
|
|
regtype = self._winreg_mock.REG_DWORD
|
|
else:
|
|
regtype = self._winreg_mock.REG_SZ
|
|
|
|
self._winreg_mock.QueryValueEx.return_value = (value, regtype)
|
|
|
|
if value is None:
|
|
mock_get_config_key_name.side_effect = [
|
|
self.windows_utils.WindowsError]
|
|
response = self._winutils.get_config_value(self._CONFIG_NAME,
|
|
self._SECTION)
|
|
self.assertEqual(None, response)
|
|
|
|
else:
|
|
mock_get_config_key_name.return_value = key_name
|
|
|
|
response = self._winutils.get_config_value(self._CONFIG_NAME,
|
|
self._SECTION)
|
|
|
|
self._winreg_mock.OpenKey.assert_called_with(
|
|
self._winreg_mock.HKEY_LOCAL_MACHINE, key_name)
|
|
mock_get_config_key_name.assert_called_with(self._SECTION)
|
|
self._winreg_mock.QueryValueEx.assert_called_with(
|
|
self._winreg_mock.OpenKey().__enter__(), self._CONFIG_NAME)
|
|
self.assertEqual(value, response)
|
|
|
|
def test_get_config_value_type_int(self):
|
|
self._test_get_config_value(1)
|
|
|
|
def test_get_config_value_type_str(self):
|
|
self._test_get_config_value('fake')
|
|
|
|
def test_get_config_value_type_error(self):
|
|
self._test_get_config_value(None)
|
|
|
|
@mock.patch('time.sleep')
|
|
def _test_wait_for_boot_completion(self, ret_val, mock_sleep):
|
|
self._winreg_mock.QueryValueEx.side_effect = [ret_val]
|
|
|
|
self._winutils.wait_for_boot_completion()
|
|
|
|
key = self._winreg_mock.OpenKey.return_value.__enter__.return_value
|
|
self._winreg_mock.OpenKey.assert_called_with(
|
|
self._winreg_mock.HKEY_LOCAL_MACHINE,
|
|
"SYSTEM\\Setup\\Status\\SysprepStatus", 0,
|
|
self._winreg_mock.KEY_READ)
|
|
|
|
self._winreg_mock.QueryValueEx.assert_called_with(
|
|
key, "GeneralizationState")
|
|
|
|
def test_wait_for_boot_completion(self):
|
|
ret_val = [7]
|
|
self._test_wait_for_boot_completion(ret_val)
|
|
|
|
def test_get_service(self):
|
|
conn = self._wmi_mock.WMI
|
|
conn.return_value.Win32_Service.return_value = ['fake name']
|
|
|
|
response = self._winutils._get_service('fake name')
|
|
|
|
conn.assert_called_with(moniker='//./root/cimv2')
|
|
conn.return_value.Win32_Service.assert_called_with(Name='fake name')
|
|
self.assertEqual('fake name', response)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_service')
|
|
def test_check_service_exists(self, mock_get_service):
|
|
mock_get_service.return_value = 'not None'
|
|
|
|
response = self._winutils.check_service_exists('fake name')
|
|
|
|
self.assertTrue(response)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_service')
|
|
def test_get_service_status(self, mock_get_service):
|
|
mock_service = mock.MagicMock()
|
|
mock_get_service.return_value = mock_service
|
|
|
|
response = self._winutils.get_service_status('fake name')
|
|
|
|
self.assertEqual(mock_service.State, response)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_service')
|
|
def test_get_service_start_mode(self, mock_get_service):
|
|
mock_service = mock.MagicMock()
|
|
mock_get_service.return_value = mock_service
|
|
|
|
response = self._winutils.get_service_start_mode('fake name')
|
|
|
|
self.assertEqual(mock_service.StartMode, response)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_service')
|
|
def _test_set_service_start_mode(self, mock_get_service, ret_val):
|
|
mock_service = mock.MagicMock()
|
|
mock_get_service.return_value = mock_service
|
|
mock_service.ChangeStartMode.return_value = (ret_val,)
|
|
|
|
if ret_val != 0:
|
|
self.assertRaises(exception.CloudbaseInitException,
|
|
self._winutils.set_service_start_mode,
|
|
'fake name', 'fake mode')
|
|
else:
|
|
self._winutils.set_service_start_mode('fake name', 'fake mode')
|
|
|
|
mock_service.ChangeStartMode.assert_called_once_with('fake mode')
|
|
|
|
def test_set_service_start_mode(self):
|
|
self._test_set_service_start_mode(ret_val=0)
|
|
|
|
def test_set_service_start_mode_exception(self):
|
|
self._test_set_service_start_mode(ret_val=1)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_service')
|
|
def _test_start_service(self, mock_get_service, ret_val):
|
|
mock_service = mock.MagicMock()
|
|
mock_get_service.return_value = mock_service
|
|
mock_service.StartService.return_value = (ret_val,)
|
|
|
|
if ret_val != 0:
|
|
self.assertRaises(exception.CloudbaseInitException,
|
|
self._winutils.start_service,
|
|
'fake name')
|
|
else:
|
|
self._winutils.start_service('fake name')
|
|
|
|
mock_service.StartService.assert_called_once_with()
|
|
|
|
def test_start_service(self):
|
|
self._test_set_service_start_mode(ret_val=0)
|
|
|
|
def test_start_service_exception(self):
|
|
self._test_set_service_start_mode(ret_val=1)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_service')
|
|
def _test_stop_service(self, mock_get_service, ret_val):
|
|
mock_service = mock.MagicMock()
|
|
mock_get_service.return_value = mock_service
|
|
mock_service.StopService.return_value = (ret_val,)
|
|
|
|
if ret_val != 0:
|
|
self.assertRaises(exception.CloudbaseInitException,
|
|
self._winutils.stop_service,
|
|
'fake name')
|
|
else:
|
|
self._winutils.stop_service('fake name')
|
|
|
|
mock_service.StopService.assert_called_once_with()
|
|
|
|
def test_stop_service(self):
|
|
self._test_stop_service(ret_val=0)
|
|
|
|
def test_stop_service_exception(self):
|
|
self._test_stop_service(ret_val=1)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'.stop_service')
|
|
@mock.patch('time.sleep')
|
|
def test_terminate(self, mock_sleep, mock_stop_service):
|
|
self._winutils.terminate()
|
|
mock_stop_service.assert_called_with(self._winutils._service_name)
|
|
mock_sleep.assert_called_with(3)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_ipv4_routing_table')
|
|
def _test_get_default_gateway(self, mock_get_ipv4_routing_table,
|
|
routing_table):
|
|
mock_get_ipv4_routing_table.return_value = [routing_table]
|
|
response = self._winutils.get_default_gateway()
|
|
mock_get_ipv4_routing_table.assert_called_once_with()
|
|
if routing_table[0] == '0.0.0.0':
|
|
self.assertEqual((routing_table[3], routing_table[2]), response)
|
|
else:
|
|
self.assertEqual((None, None), response)
|
|
|
|
def test_get_default_gateway(self):
|
|
routing_table = ['0.0.0.0', '1.1.1.1', self._GATEWAY, '8.8.8.8']
|
|
self._test_get_default_gateway(routing_table=routing_table)
|
|
|
|
def test_get_default_gateway_error(self):
|
|
routing_table = ['1.1.1.1', '1.1.1.1', self._GATEWAY, '8.8.8.8']
|
|
self._test_get_default_gateway(routing_table=routing_table)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'._get_ipv4_routing_table')
|
|
def _test_check_static_route_exists(self, mock_get_ipv4_routing_table,
|
|
routing_table):
|
|
mock_get_ipv4_routing_table.return_value = [routing_table]
|
|
|
|
response = self._winutils.check_static_route_exists(self._DESTINATION)
|
|
|
|
mock_get_ipv4_routing_table.assert_called_once_with()
|
|
if routing_table[0] == self._DESTINATION:
|
|
self.assertTrue(response)
|
|
else:
|
|
self.assertFalse(response)
|
|
|
|
def test_check_static_route_exists_true(self):
|
|
routing_table = [self._DESTINATION, '1.1.1.1', self._GATEWAY,
|
|
'8.8.8.8']
|
|
self._test_check_static_route_exists(routing_table=routing_table)
|
|
|
|
def test_check_static_route_exists_false(self):
|
|
routing_table = ['0.0.0.0', '1.1.1.1', self._GATEWAY, '8.8.8.8']
|
|
self._test_check_static_route_exists(routing_table=routing_table)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils.execute_process')
|
|
def _test_add_static_route(self, mock_execute_process, err):
|
|
next_hop = '10.10.10.10'
|
|
interface_index = 1
|
|
metric = 9
|
|
args = ['ROUTE', 'ADD', self._DESTINATION, 'MASK', self._NETMASK,
|
|
next_hop]
|
|
mock_execute_process.return_value = (None, err, None)
|
|
|
|
if err:
|
|
self.assertRaises(exception.CloudbaseInitException,
|
|
self._winutils.add_static_route,
|
|
self._DESTINATION, self._NETMASK, next_hop,
|
|
interface_index, metric)
|
|
|
|
else:
|
|
self._winutils.add_static_route(self._DESTINATION, self._NETMASK,
|
|
next_hop, interface_index, metric)
|
|
mock_execute_process.assert_called_with(args)
|
|
|
|
def test_add_static_route(self):
|
|
self._test_add_static_route(err=404)
|
|
|
|
def test_add_static_route_fail(self):
|
|
self._test_add_static_route(err=None)
|
|
|
|
def _test_check_os_version(self, ret_value, error_value=None):
|
|
self._windll_mock.kernel32.VerSetConditionMask.return_value = 2
|
|
self._windll_mock.kernel32.VerifyVersionInfoW.return_value = ret_value
|
|
self._windll_mock.kernel32.GetLastError.return_value = error_value
|
|
|
|
old_version = self._winutils.ERROR_OLD_WIN_VERSION
|
|
|
|
if error_value and error_value is not old_version:
|
|
self.assertRaises(exception.CloudbaseInitException,
|
|
self._winutils.check_os_version, 3, 1, 2)
|
|
self._windll_mock.kernel32.GetLastError.assert_called_once_with()
|
|
|
|
else:
|
|
response = self._winutils.check_os_version(3, 1, 2)
|
|
|
|
self._ctypes_mock.sizeof.assert_called_once_with(
|
|
self.windows_utils.Win32_OSVERSIONINFOEX_W)
|
|
self.assertEqual(
|
|
3, self._windll_mock.kernel32.VerSetConditionMask.call_count)
|
|
|
|
self._windll_mock.kernel32.VerifyVersionInfoW.assert_called_with(
|
|
self._ctypes_mock.byref.return_value, 1 | 2 | 3 | 7, 2)
|
|
|
|
if error_value is old_version:
|
|
self._windll_mock.kernel32.GetLastError.assert_called_with()
|
|
self.assertFalse(response)
|
|
else:
|
|
self.assertTrue(response)
|
|
|
|
def test_check_os_version(self):
|
|
m = mock.MagicMock()
|
|
self._test_check_os_version(ret_value=m)
|
|
|
|
def test_check_os_version_expect_False(self):
|
|
self._test_check_os_version(
|
|
ret_value=None, error_value=self._winutils.ERROR_OLD_WIN_VERSION)
|
|
|
|
def test_check_os_version_exception(self):
|
|
self._test_check_os_version(ret_value=None, error_value=9999)
|
|
|
|
def _test_get_volume_label(self, ret_val):
|
|
label = mock.MagicMock()
|
|
max_label_size = 261
|
|
drive = 'Fake_drive'
|
|
self._ctypes_mock.create_unicode_buffer.return_value = label
|
|
self._windll_mock.kernel32.GetVolumeInformationW.return_value = ret_val
|
|
|
|
response = self._winutils.get_volume_label(drive)
|
|
|
|
if ret_val:
|
|
self.assertTrue(response is not None)
|
|
else:
|
|
self.assertTrue(response is None)
|
|
|
|
self._ctypes_mock.create_unicode_buffer.assert_called_with(
|
|
max_label_size)
|
|
self._windll_mock.kernel32.GetVolumeInformationW.assert_called_with(
|
|
drive, label, max_label_size, 0, 0, 0, 0, 0)
|
|
|
|
def test_get_volume_label(self):
|
|
self._test_get_volume_label('ret')
|
|
|
|
def test_get_volume_label_no_return_value(self):
|
|
self._test_get_volume_label(None)
|
|
|
|
@mock.patch('re.search')
|
|
@mock.patch('cloudbaseinit.osutils.base.BaseOSUtils.'
|
|
'generate_random_password')
|
|
def test_generate_random_password(self, mock_generate_random_password,
|
|
mock_search):
|
|
length = 14
|
|
mock_search.return_value = True
|
|
mock_generate_random_password.return_value = 'Passw0rd'
|
|
|
|
response = self._winutils.generate_random_password(length)
|
|
|
|
mock_generate_random_password.assert_called_once_with(length)
|
|
self.assertEqual('Passw0rd', response)
|
|
|
|
def _test_get_logical_drives(self, buf_length):
|
|
mock_buf = mock.MagicMock()
|
|
mock_buf.__getitem__.side_effect = ['1', '\x00']
|
|
mock_get_drives = self._windll_mock.kernel32.GetLogicalDriveStringsW
|
|
|
|
self._ctypes_mock.create_unicode_buffer.return_value = mock_buf
|
|
mock_get_drives.return_value = buf_length
|
|
|
|
if buf_length is None:
|
|
self.assertRaises(exception.CloudbaseInitException,
|
|
self._winutils._get_logical_drives)
|
|
|
|
else:
|
|
response = self._winutils._get_logical_drives()
|
|
|
|
self._ctypes_mock.create_unicode_buffer.assert_called_with(261)
|
|
mock_get_drives.assert_called_with(260, mock_buf)
|
|
self.assertEqual(['1'], response)
|
|
|
|
def test_get_logical_drives_exception(self):
|
|
self._test_get_logical_drives(buf_length=None)
|
|
|
|
def test_get_logical_drives(self):
|
|
self._test_get_logical_drives(buf_length=2)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils.'
|
|
'_get_logical_drives')
|
|
@mock.patch('cloudbaseinit.osutils.windows.kernel32')
|
|
def test_get_cdrom_drives(self, mock_kernel32, mock_get_logical_drives):
|
|
mock_get_logical_drives.return_value = ['drive']
|
|
mock_kernel32.GetDriveTypeW.return_value = self._winutils.DRIVE_CDROM
|
|
|
|
response = self._winutils.get_cdrom_drives()
|
|
|
|
mock_get_logical_drives.assert_called_with()
|
|
self.assertEqual(['drive'], response)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.msvcrt')
|
|
@mock.patch('cloudbaseinit.osutils.windows.kernel32')
|
|
@mock.patch('cloudbaseinit.osutils.windows.setupapi')
|
|
@mock.patch('cloudbaseinit.osutils.windows.Win32_STORAGE_DEVICE_NUMBER')
|
|
def _test_get_physical_disks(self, mock_sdn, mock_setupapi, mock_kernel32,
|
|
mock_msvcrt, handle_disks, last_error,
|
|
interface_detail, disk_handle, io_control):
|
|
|
|
sizeof_calls = [
|
|
mock.call(
|
|
self.windows_utils.Win32_SP_DEVICE_INTERFACE_DATA
|
|
),
|
|
mock.call(
|
|
self.windows_utils.Win32_SP_DEVICE_INTERFACE_DETAIL_DATA_W
|
|
),
|
|
mock.call(mock_sdn())
|
|
]
|
|
device_interfaces_calls = [
|
|
mock.call(
|
|
handle_disks, None, self._ctypes_mock.byref.return_value, 0,
|
|
self._ctypes_mock.byref.return_value),
|
|
mock.call(handle_disks, None,
|
|
self._ctypes_mock.byref.return_value, 1,
|
|
self._ctypes_mock.byref.return_value)]
|
|
device_path = self._ctypes_mock.cast.return_value.contents.DevicePath
|
|
cast_calls = [mock.call(mock_msvcrt.malloc(),
|
|
self._ctypes_mock.POINTER.return_value),
|
|
mock.call(device_path,
|
|
self._wintypes_mock.LPWSTR)]
|
|
|
|
mock_setup_interface = mock_setupapi.SetupDiGetDeviceInterfaceDetailW
|
|
|
|
mock_setupapi.SetupDiGetClassDevsW.return_value = handle_disks
|
|
mock_kernel32.GetLastError.return_value = last_error
|
|
mock_setup_interface.return_value = interface_detail
|
|
mock_kernel32.CreateFileW.return_value = disk_handle
|
|
mock_kernel32.DeviceIoControl.return_value = io_control
|
|
|
|
mock_setupapi.SetupDiEnumDeviceInterfaces.side_effect = [True, False]
|
|
|
|
if handle_disks == self._winutils.INVALID_HANDLE_VALUE or (
|
|
last_error != self._winutils.ERROR_INSUFFICIENT_BUFFER) and not (
|
|
interface_detail) or (
|
|
disk_handle == self._winutils.INVALID_HANDLE_VALUE) or (
|
|
not io_control):
|
|
|
|
self.assertRaises(exception.CloudbaseInitException,
|
|
self._winutils.get_physical_disks)
|
|
|
|
else:
|
|
response = self._winutils.get_physical_disks()
|
|
|
|
self.assertEqual(sizeof_calls,
|
|
self._ctypes_mock.sizeof.call_args_list)
|
|
|
|
self.assertEqual(
|
|
device_interfaces_calls,
|
|
mock_setupapi.SetupDiEnumDeviceInterfaces.call_args_list)
|
|
|
|
if not interface_detail:
|
|
mock_kernel32.GetLastError.assert_called_once_with()
|
|
|
|
self._ctypes_mock.POINTER.assert_called_with(
|
|
self.windows_utils.Win32_SP_DEVICE_INTERFACE_DETAIL_DATA_W)
|
|
mock_msvcrt.malloc.assert_called_with(
|
|
self._ctypes_mock.c_size_t.return_value
|
|
)
|
|
|
|
self.assertEqual(cast_calls, self._ctypes_mock.cast.call_args_list)
|
|
|
|
mock_setup_interface.assert_called_with(
|
|
handle_disks, self._ctypes_mock.byref.return_value,
|
|
self._ctypes_mock.cast.return_value,
|
|
self._wintypes_mock.DWORD.return_value,
|
|
None, None)
|
|
|
|
mock_kernel32.CreateFileW.assert_called_with(
|
|
self._ctypes_mock.cast.return_value.value, 0,
|
|
self._winutils.FILE_SHARE_READ, None,
|
|
self._winutils.OPEN_EXISTING, 0, 0)
|
|
mock_sdn.assert_called_with()
|
|
|
|
mock_kernel32.DeviceIoControl.assert_called_with(
|
|
disk_handle, self._winutils.IOCTL_STORAGE_GET_DEVICE_NUMBER,
|
|
None, 0, self._ctypes_mock.byref.return_value,
|
|
self._ctypes_mock.sizeof.return_value,
|
|
self._ctypes_mock.byref.return_value, None)
|
|
|
|
self.assertEqual(["\\\\.\PHYSICALDRIVE1"], response)
|
|
|
|
mock_setupapi.SetupDiDestroyDeviceInfoList.assert_called_once_with(
|
|
handle_disks)
|
|
|
|
mock_setupapi.SetupDiGetClassDevsW.assert_called_once_with(
|
|
self._ctypes_mock.byref.return_value, None, None,
|
|
self._winutils.DIGCF_PRESENT |
|
|
self._winutils.DIGCF_DEVICEINTERFACE)
|
|
|
|
def test_get_physical_disks(self):
|
|
mock_handle_disks = mock.MagicMock()
|
|
mock_disk_handle = mock.MagicMock()
|
|
self._test_get_physical_disks(
|
|
handle_disks=mock_handle_disks,
|
|
last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER,
|
|
interface_detail='fake interface detail',
|
|
disk_handle=mock_disk_handle, io_control=True)
|
|
|
|
def test_get_physical_disks_other_error_and_no_interface_detail(self):
|
|
mock_handle_disks = mock.MagicMock()
|
|
mock_disk_handle = mock.MagicMock()
|
|
self._test_get_physical_disks(
|
|
handle_disks=mock_handle_disks,
|
|
last_error='other', interface_detail=None,
|
|
disk_handle=mock_disk_handle, io_control=True)
|
|
|
|
def test_get_physical_disks_invalid_disk_handle(self):
|
|
mock_handle_disks = mock.MagicMock()
|
|
self._test_get_physical_disks(
|
|
handle_disks=mock_handle_disks,
|
|
last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER,
|
|
interface_detail='fake interface detail',
|
|
disk_handle=self._winutils.INVALID_HANDLE_VALUE, io_control=True)
|
|
|
|
def test_get_physical_disks_io_control(self):
|
|
mock_handle_disks = mock.MagicMock()
|
|
mock_disk_handle = mock.MagicMock()
|
|
self._test_get_physical_disks(
|
|
handle_disks=mock_handle_disks,
|
|
last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER,
|
|
interface_detail='fake interface detail',
|
|
disk_handle=mock_disk_handle, io_control=False)
|
|
|
|
def test_get_physical_disks_handle_disks_invalid(self):
|
|
mock_disk_handle = mock.MagicMock()
|
|
self._test_get_physical_disks(
|
|
handle_disks=self._winutils.INVALID_HANDLE_VALUE,
|
|
last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER,
|
|
interface_detail='fake interface detail',
|
|
disk_handle=mock_disk_handle, io_control=True)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils._get_fw_protocol')
|
|
def test_firewall_create_rule(self, mock_get_fw_protocol):
|
|
expected = [mock.call("HNetCfg.FWOpenPort"),
|
|
mock.call("HNetCfg.FwMgr")]
|
|
|
|
self._winutils.firewall_create_rule(
|
|
name='fake name', port=9999, protocol=self._winutils.PROTOCOL_TCP)
|
|
|
|
self.assertEqual(expected, self._client_mock.Dispatch.call_args_list)
|
|
mock_get_fw_protocol.assert_called_once_with(
|
|
self._winutils.PROTOCOL_TCP)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils._get_fw_protocol')
|
|
def test_firewall_remove_rule(self, mock_get_fw_protocol):
|
|
self._winutils.firewall_remove_rule(
|
|
name='fake name', port=9999, protocol=self._winutils.PROTOCOL_TCP)
|
|
|
|
self._client_mock.Dispatch.assert_called_once_with("HNetCfg.FwMgr")
|
|
mock_get_fw_protocol.assert_called_once_with(
|
|
self._winutils.PROTOCOL_TCP)
|
|
|
|
@mock.patch('os.path.expandvars')
|
|
def test_get_system32_dir(self, mock_expandvars):
|
|
mock_expandvars.return_value = 'fake_system32'
|
|
|
|
response = self._winutils.get_system32_dir()
|
|
|
|
self.assertEqual('fake_system32', response)
|
|
|
|
@mock.patch('os.path.expandvars')
|
|
def test_get_sysnative_dir(self, mock_expandvars):
|
|
mock_expandvars.return_value = 'fake_sysnative'
|
|
|
|
response = self._winutils.get_sysnative_dir()
|
|
|
|
self.assertEqual('fake_sysnative', response)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'.get_sysnative_dir')
|
|
@mock.patch('os.path.isdir')
|
|
def test_check_sysnative_dir_exists(self, mock_isdir,
|
|
mock_get_sysnative_dir):
|
|
mock_get_sysnative_dir.return_value = 'fake_sysnative'
|
|
mock_isdir.return_value = True
|
|
|
|
response = self._winutils.check_sysnative_dir_exists()
|
|
|
|
self.assertTrue(response)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'.check_sysnative_dir_exists')
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'.get_sysnative_dir')
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'.get_system32_dir')
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'.execute_process')
|
|
def _test_execute_powershell_script(self, mock_execute_process,
|
|
mock_get_system32_dir,
|
|
mock_get_sysnative_dir,
|
|
mock_check_sysnative_dir_exists,
|
|
ret_val):
|
|
mock_check_sysnative_dir_exists.return_value = ret_val
|
|
mock_get_sysnative_dir.return_value = 'fake'
|
|
mock_get_system32_dir.return_value = 'fake'
|
|
fake_path = os.path.join('fake', 'WindowsPowerShell\\v1.0\\'
|
|
'powershell.exe')
|
|
args = [fake_path, '-ExecutionPolicy', 'RemoteSigned',
|
|
'-NonInteractive', '-File', 'fake_script_path']
|
|
|
|
response = self._winutils.execute_powershell_script(
|
|
script_path='fake_script_path')
|
|
|
|
if ret_val is True:
|
|
mock_get_sysnative_dir.assert_called_once_with()
|
|
else:
|
|
mock_get_system32_dir.assert_called_once_with()
|
|
|
|
mock_execute_process.assert_called_with(args, shell=False)
|
|
self.assertEqual(mock_execute_process.return_value, response)
|
|
|
|
def test_execute_powershell_script_sysnative(self):
|
|
self._test_execute_powershell_script(ret_val=True)
|
|
|
|
def test_execute_powershell_script_system32(self):
|
|
self._test_execute_powershell_script(ret_val=False)
|
|
|
|
@mock.patch('cloudbaseinit.utils.windows.network.get_adapter_addresses')
|
|
def test_get_dhcp_hosts_in_use(self, mock_get_adapter_addresses):
|
|
net_addr = {}
|
|
net_addr["mac_address"] = 'fake mac address'
|
|
net_addr["dhcp_server"] = 'fake dhcp server'
|
|
net_addr["dhcp_enabled"] = True
|
|
mock_get_adapter_addresses.return_value = [net_addr]
|
|
|
|
response = self._winutils.get_dhcp_hosts_in_use()
|
|
|
|
mock_get_adapter_addresses.assert_called()
|
|
self.assertEqual([('fake mac address', 'fake dhcp server')], response)
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'.check_sysnative_dir_exists')
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'.get_sysnative_dir')
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'.get_system32_dir')
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
|
'.execute_process')
|
|
def _test_set_ntp_client_config(self, mock_execute_process,
|
|
mock_get_system32_dir,
|
|
mock_get_sysnative_dir,
|
|
mock_check_sysnative_dir_exists,
|
|
sysnative, ret_val):
|
|
fake_base_dir = 'fake_dir'
|
|
ntp_hosts = ["first", "second"]
|
|
|
|
mock_check_sysnative_dir_exists.return_value = sysnative
|
|
mock_execute_process.return_value = (None, None, ret_val)
|
|
w32tm_path = os.path.join(fake_base_dir, "w32tm.exe")
|
|
mock_get_sysnative_dir.return_value = fake_base_dir
|
|
mock_get_system32_dir.return_value = fake_base_dir
|
|
|
|
args = [w32tm_path, '/config',
|
|
'/manualpeerlist:%s' % ",".join(ntp_hosts),
|
|
'/syncfromflags:manual', '/update']
|
|
|
|
if ret_val:
|
|
self.assertRaises(exception.CloudbaseInitException,
|
|
self._winutils.set_ntp_client_config,
|
|
'fake ntp host')
|
|
|
|
else:
|
|
self._winutils.set_ntp_client_config(ntp_hosts=ntp_hosts)
|
|
|
|
if sysnative:
|
|
mock_get_sysnative_dir.assert_called_once_with()
|
|
else:
|
|
mock_get_system32_dir.assert_called_once_with()
|
|
|
|
mock_execute_process.assert_called_once_with(args, shell=False)
|
|
|
|
def test_set_ntp_client_config_sysnative_true(self):
|
|
self._test_set_ntp_client_config(sysnative=True, ret_val=None)
|
|
|
|
def test_set_ntp_client_config_sysnative_false(self):
|
|
self._test_set_ntp_client_config(sysnative=False, ret_val=None)
|
|
|
|
def test_set_ntp_client_config_sysnative_exception(self):
|
|
self._test_set_ntp_client_config(sysnative=False,
|
|
ret_val='fake return value')
|
|
|
|
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils.'
|
|
'_get_system_dir')
|
|
@mock.patch('cloudbaseinit.osutils.base.BaseOSUtils.'
|
|
'execute_process')
|
|
def test_execute_system32_process(self, mock_execute_process,
|
|
mock_get_system_dir):
|
|
mock_get_system_dir.return_value = 'base_dir'
|
|
mock_execute_process.return_value = mock.sentinel.execute_process
|
|
args = ['command', 'argument']
|
|
|
|
result = self._winutils.execute_system32_process(args)
|
|
mock_execute_process.assert_called_once_with(
|
|
[os.path.join('base_dir', args[0])] + args[1:],
|
|
decode_output=False,
|
|
shell=True)
|
|
self.assertEqual(mock.sentinel.execute_process, result)
|