
Instead of a separate cloudinit.test package. Change-Id: I6ea05a0f57c93a944f435f306a064d918fd26152
362 lines
14 KiB
Python
362 lines
14 KiB
Python
# Copyright 2015 Canonical Ltd.
|
|
# This file is part of cloud-init. See LICENCE file for license information.
|
|
#
|
|
# vi: ts=4 expandtab
|
|
|
|
import importlib
|
|
import subprocess
|
|
|
|
from cloudinit import exceptions
|
|
from cloudinit import tests
|
|
from cloudinit.tests.util import LogSnatcher
|
|
from cloudinit.tests.util import mock
|
|
|
|
|
|
class TestNetworkWindows(tests.TestCase):
|
|
|
|
def setUp(self):
|
|
super(TestNetworkWindows, self).setUp()
|
|
|
|
self._ctypes_mock = mock.MagicMock()
|
|
self._winreg_mock = mock.Mock()
|
|
self._win32com_mock = mock.Mock()
|
|
self._wmi_mock = mock.Mock()
|
|
|
|
self._module_patcher = mock.patch.dict(
|
|
'sys.modules',
|
|
{'ctypes': self._ctypes_mock,
|
|
'win32com': self._win32com_mock,
|
|
'wmi': self._wmi_mock,
|
|
'six.moves.winreg': self._winreg_mock})
|
|
|
|
self._module_patcher.start()
|
|
self._iphlpapi = mock.Mock()
|
|
self._kernel32 = mock.Mock()
|
|
self._ws2_32 = mock.Mock()
|
|
|
|
self._network_module = importlib.import_module(
|
|
'cloudinit.osys.windows.network')
|
|
self._network_module.iphlpapi = self._iphlpapi
|
|
self._network_module.kernel32 = self._kernel32
|
|
self._network_module.ws2_32 = self._ws2_32
|
|
|
|
self._network = self._network_module.Network()
|
|
|
|
def tearDown(self):
|
|
super(TestNetworkWindows, self).tearDown()
|
|
|
|
self._module_patcher.stop()
|
|
|
|
def _test__heap_alloc(self, fail):
|
|
mock_heap = mock.Mock()
|
|
mock_size = mock.Mock()
|
|
|
|
if fail:
|
|
self._kernel32.HeapAlloc.return_value = None
|
|
|
|
e = self.assertRaises(exceptions.CloudInitError,
|
|
self._network_module._heap_alloc,
|
|
mock_heap, mock_size)
|
|
|
|
self.assertEqual('Unable to allocate memory for the IP '
|
|
'forward table', str(e))
|
|
else:
|
|
result = self._network_module._heap_alloc(mock_heap, mock_size)
|
|
self.assertEqual(self._kernel32.HeapAlloc.return_value, result)
|
|
|
|
self._kernel32.HeapAlloc.assert_called_once_with(
|
|
mock_heap, 0, self._ctypes_mock.c_size_t(mock_size.value))
|
|
|
|
def test__heap_alloc_error(self):
|
|
self._test__heap_alloc(fail=True)
|
|
|
|
def test__heap_alloc_no_error(self):
|
|
self._test__heap_alloc(fail=False)
|
|
|
|
def _check_raises_forward(self):
|
|
with self._network._get_forward_table():
|
|
pass
|
|
|
|
def test__get_forward_table_no_memory(self):
|
|
self._network_module._heap_alloc = mock.Mock()
|
|
error_msg = 'Unable to allocate memory for the IP forward table'
|
|
exc = exceptions.CloudInitError(error_msg)
|
|
self._network_module._heap_alloc.side_effect = exc
|
|
|
|
e = self.assertRaises(exceptions.CloudInitError,
|
|
self._check_raises_forward)
|
|
|
|
self.assertEqual(error_msg, str(e))
|
|
self._network_module._heap_alloc.assert_called_once_with(
|
|
self._kernel32.GetProcessHeap.return_value,
|
|
self._ctypes_mock.wintypes.ULONG.return_value)
|
|
|
|
def test__get_forward_table_insufficient_buffer_no_memory(self):
|
|
self._kernel32.HeapAlloc.side_effect = (mock.sentinel.table_mem, None)
|
|
self._iphlpapi.GetIpForwardTable.return_value = (
|
|
self._iphlpapi.ERROR_INSUFFICIENT_BUFFER)
|
|
|
|
self.assertRaises(exceptions.CloudInitError,
|
|
self._check_raises_forward)
|
|
|
|
table = self._ctypes_mock.cast.return_value
|
|
self._iphlpapi.GetIpForwardTable.assert_called_once_with(
|
|
table,
|
|
self._ctypes_mock.byref.return_value, 0)
|
|
heap_calls = [
|
|
mock.call(self._kernel32.GetProcessHeap.return_value, 0, table),
|
|
mock.call(self._kernel32.GetProcessHeap.return_value, 0, table)
|
|
]
|
|
self.assertEqual(heap_calls, self._kernel32.HeapFree.mock_calls)
|
|
|
|
def _test__get_forward_table(self, reallocation=False,
|
|
insufficient_buffer=False,
|
|
fail=False):
|
|
if fail:
|
|
e = self.assertRaises(exceptions.CloudInitError,
|
|
self._check_raises_forward)
|
|
msg = ('Unable to get IP forward table. Error: %s'
|
|
% mock.sentinel.error)
|
|
self.assertEqual(msg, str(e))
|
|
else:
|
|
with self._network._get_forward_table() as table:
|
|
pass
|
|
pointer = self._ctypes_mock.POINTER(
|
|
self._iphlpapi.Win32_MIB_IPFORWARDTABLE)
|
|
expected_forward_table = self._ctypes_mock.cast(
|
|
self._kernel32.HeapAlloc.return_value, pointer)
|
|
self.assertEqual(expected_forward_table, table)
|
|
|
|
heap_calls = [
|
|
mock.call(self._kernel32.GetProcessHeap.return_value, 0,
|
|
self._ctypes_mock.cast.return_value)
|
|
]
|
|
forward_calls = [
|
|
mock.call(self._ctypes_mock.cast.return_value,
|
|
self._ctypes_mock.byref.return_value, 0),
|
|
]
|
|
if insufficient_buffer:
|
|
# We expect two calls for GetIpForwardTable
|
|
forward_calls.append(forward_calls[0])
|
|
if reallocation:
|
|
heap_calls.append(heap_calls[0])
|
|
self.assertEqual(heap_calls, self._kernel32.HeapFree.mock_calls)
|
|
self.assertEqual(forward_calls,
|
|
self._iphlpapi.GetIpForwardTable.mock_calls)
|
|
|
|
def test__get_forward_table_sufficient_buffer(self):
|
|
self._iphlpapi.GetIpForwardTable.return_value = None
|
|
self._test__get_forward_table()
|
|
|
|
def test__get_forward_table_insufficient_buffer_reallocate(self):
|
|
self._kernel32.HeapAlloc.side_effect = (
|
|
mock.sentinel.table_mem, mock.sentinel.table_mem)
|
|
self._iphlpapi.GetIpForwardTable.side_effect = (
|
|
self._iphlpapi.ERROR_INSUFFICIENT_BUFFER, None)
|
|
|
|
self._test__get_forward_table(reallocation=True,
|
|
insufficient_buffer=True)
|
|
|
|
def test__get_forward_table_insufficient_buffer_other_error(self):
|
|
self._kernel32.HeapAlloc.side_effect = (
|
|
mock.sentinel.table_mem, mock.sentinel.table_mem)
|
|
self._iphlpapi.GetIpForwardTable.side_effect = (
|
|
self._iphlpapi.ERROR_INSUFFICIENT_BUFFER, mock.sentinel.error)
|
|
|
|
self._test__get_forward_table(reallocation=True,
|
|
insufficient_buffer=True,
|
|
fail=True)
|
|
|
|
@mock.patch('cloudinit.osys.windows.network.Network.routes')
|
|
def test_default_gateway_no_gateway(self, mock_routes):
|
|
mock_routes.return_value = iter((mock.Mock(), mock.Mock()))
|
|
|
|
self.assertIsNone(self._network.default_gateway())
|
|
|
|
mock_routes.assert_called_once_with()
|
|
|
|
@mock.patch('cloudinit.osys.windows.network.Network.routes')
|
|
def test_default_gateway(self, mock_routes):
|
|
default_gateway = mock.Mock()
|
|
default_gateway.destination = '0.0.0.0'
|
|
mock_routes.return_value = iter((mock.Mock(), default_gateway))
|
|
|
|
gateway = self._network.default_gateway()
|
|
|
|
self.assertEqual(default_gateway, gateway)
|
|
|
|
def test_route_is_static(self):
|
|
bad_route = self._network_module.Route(
|
|
destination=None, netmask=None,
|
|
gateway=None, interface=None, metric=None,
|
|
flags=404)
|
|
good_route = self._network_module.Route(
|
|
destination=None, netmask=None,
|
|
gateway=None, interface=None, metric=None,
|
|
flags=self._network_module.MIB_IPPROTO_NETMGMT)
|
|
|
|
self.assertTrue(good_route.is_static)
|
|
self.assertFalse(bad_route.is_static)
|
|
|
|
@mock.patch('subprocess.Popen')
|
|
def _test_route_add(self, mock_popen, err):
|
|
mock_route = mock.Mock()
|
|
mock_route.destination = mock.sentinel.destination
|
|
mock_route.netmask = mock.sentinel.netmask
|
|
mock_route.gateway = mock.sentinel.gateway
|
|
args = ['ROUTE', 'ADD', mock.sentinel.destination,
|
|
'MASK', mock.sentinel.netmask,
|
|
mock.sentinel.gateway]
|
|
mock_popen.return_value.returncode = err
|
|
mock_popen.return_value.communicate.return_value = (None, err)
|
|
|
|
if err:
|
|
e = self.assertRaises(exceptions.CloudInitError,
|
|
self._network_module.Route.add,
|
|
mock_route)
|
|
msg = "Unable to add route: %s" % err
|
|
self.assertEqual(msg, str(e))
|
|
|
|
else:
|
|
self._network_module.Route.add(mock_route)
|
|
mock_popen.assert_called_once_with(args, shell=False,
|
|
stderr=subprocess.PIPE)
|
|
|
|
def test_route_add_fails(self):
|
|
self._test_route_add(err=1)
|
|
|
|
def test_route_add_works(self):
|
|
self._test_route_add(err=0)
|
|
|
|
@mock.patch('cloudinit.osys.windows.network.Network._get_forward_table')
|
|
def test_routes(self, mock_forward_table):
|
|
def _same(arg):
|
|
return arg._mock_name.encode()
|
|
|
|
route = mock.MagicMock()
|
|
mock_cast_result = mock.Mock()
|
|
mock_cast_result.contents = [route]
|
|
self._ctypes_mock.cast.return_value = mock_cast_result
|
|
self._network_module.ws2_32.Ws2_32.inet_ntoa.side_effect = _same
|
|
route.dwForwardIfIndex = 'dwForwardIfIndex'
|
|
route.dwForwardProto = 'dwForwardProto'
|
|
route.dwForwardMetric1 = 'dwForwardMetric1'
|
|
routes = self._network.routes()
|
|
|
|
mock_forward_table.assert_called_once_with()
|
|
enter = mock_forward_table.return_value.__enter__
|
|
enter.assert_called_once_with()
|
|
exit_ = mock_forward_table.return_value.__exit__
|
|
exit_.assert_called_once_with(None, None, None)
|
|
self.assertEqual(1, len(routes))
|
|
given_route = routes[0]
|
|
self.assertEqual('dwForwardDest', given_route.destination)
|
|
self.assertEqual('dwForwardNextHop', given_route.gateway)
|
|
self.assertEqual('dwForwardMask', given_route.netmask)
|
|
self.assertEqual('dwForwardIfIndex', given_route.interface)
|
|
self.assertEqual('dwForwardMetric1', given_route.metric)
|
|
self.assertEqual('dwForwardProto', given_route.flags)
|
|
|
|
@mock.patch('cloudinit.osys.base.get_osutils')
|
|
@mock.patch('cloudinit.osys.windows.network.Network.routes')
|
|
def test_set_metadata_ip_route_not_called(self, mock_routes,
|
|
mock_osutils):
|
|
general = mock_osutils.return_value.general
|
|
general.check_os_version.return_value = False
|
|
|
|
self._network.set_metadata_ip_route(mock.sentinel.url)
|
|
|
|
self.assertFalse(mock_routes.called)
|
|
general.check_os_version.assert_called_once_with(6, 0)
|
|
|
|
@mock.patch('cloudinit.osys.base.get_osutils')
|
|
@mock.patch('cloudinit.osys.windows.network.Network.routes')
|
|
def test_set_metadata_ip_route_not_invalid_url(self, mock_routes,
|
|
mock_osutils):
|
|
general = mock_osutils.return_value.general
|
|
general.check_os_version.return_value = True
|
|
|
|
self._network.set_metadata_ip_route("http://169.253.169.253")
|
|
|
|
self.assertFalse(mock_routes.called)
|
|
general.check_os_version.assert_called_once_with(6, 0)
|
|
|
|
@mock.patch('cloudinit.osys.base.get_osutils')
|
|
@mock.patch('cloudinit.osys.windows.network.Network.routes')
|
|
@mock.patch('cloudinit.osys.windows.network.Network.default_gateway')
|
|
def test_set_metadata_ip_route_route_already_exists(
|
|
self, mock_default_gateway, mock_routes, mock_osutils):
|
|
|
|
mock_route = mock.Mock()
|
|
mock_route.destination = "169.254.169.254"
|
|
mock_routes.return_value = (mock_route, )
|
|
|
|
self._network.set_metadata_ip_route("http://169.254.169.254")
|
|
|
|
self.assertTrue(mock_routes.called)
|
|
self.assertFalse(mock_default_gateway.called)
|
|
|
|
@mock.patch('cloudinit.osys.base.get_osutils')
|
|
@mock.patch('cloudinit.osys.windows.network._check_url')
|
|
@mock.patch('cloudinit.osys.windows.network.Network.routes')
|
|
@mock.patch('cloudinit.osys.windows.network.Network.default_gateway')
|
|
def test_set_metadata_ip_route_route_missing_url_accessible(
|
|
self, mock_default_gateway, mock_routes,
|
|
mock_check_url, mock_osutils):
|
|
|
|
mock_routes.return_value = ()
|
|
mock_check_url.return_value = True
|
|
|
|
self._network.set_metadata_ip_route("http://169.254.169.254")
|
|
|
|
self.assertTrue(mock_routes.called)
|
|
self.assertFalse(mock_default_gateway.called)
|
|
self.assertTrue(mock_osutils.called)
|
|
|
|
@mock.patch('cloudinit.osys.base.get_osutils')
|
|
@mock.patch('cloudinit.osys.windows.network._check_url')
|
|
@mock.patch('cloudinit.osys.windows.network.Network.routes')
|
|
@mock.patch('cloudinit.osys.windows.network.Network.default_gateway')
|
|
@mock.patch('cloudinit.osys.windows.network.Route')
|
|
def test_set_metadata_ip_route_no_default_gateway(
|
|
self, mock_Route, mock_default_gateway,
|
|
mock_routes, mock_check_url, mock_osutils):
|
|
|
|
mock_routes.return_value = ()
|
|
mock_check_url.return_value = False
|
|
mock_default_gateway.return_value = None
|
|
|
|
self._network.set_metadata_ip_route("http://169.254.169.254")
|
|
|
|
self.assertTrue(mock_osutils.called)
|
|
self.assertTrue(mock_routes.called)
|
|
self.assertTrue(mock_default_gateway.called)
|
|
self.assertFalse(mock_Route.called)
|
|
|
|
@mock.patch('cloudinit.osys.base.get_osutils')
|
|
@mock.patch('cloudinit.osys.windows.network._check_url')
|
|
@mock.patch('cloudinit.osys.windows.network.Network.routes')
|
|
@mock.patch('cloudinit.osys.windows.network.Network.default_gateway')
|
|
@mock.patch('cloudinit.osys.windows.network.Route')
|
|
def test_set_metadata_ip_route(
|
|
self, mock_Route, mock_default_gateway,
|
|
mock_routes, mock_check_url, mock_osutils):
|
|
|
|
mock_routes.return_value = ()
|
|
mock_check_url.return_value = False
|
|
|
|
with LogSnatcher('cloudinit.osys.windows.network') as snatcher:
|
|
self._network.set_metadata_ip_route("http://169.254.169.254")
|
|
|
|
expected = ['Setting gateway for host: 169.254.169.254']
|
|
self.assertEqual(expected, snatcher.output)
|
|
self.assertTrue(mock_routes.called)
|
|
self.assertTrue(mock_default_gateway.called)
|
|
mock_Route.assert_called_once_with(
|
|
destination="169.254.169.254",
|
|
netmask="255.255.255.255",
|
|
gateway=mock_default_gateway.return_value.gateway,
|
|
interface=None, metric=None)
|
|
mock_Route.add.assert_called_once_with(mock_Route.return_value)
|
|
self.assertTrue(mock_osutils.called)
|