Add support for IPv6 network configuration

* proper parsing of network content metadata, including v6 support
* v6 addresses static network configuration support (address, netmask, gateway)
* remove redundant code in networkconfig and osutils network related tests

Change-Id: Ieb2c18e29d0c275feb3a03e7340b3c91327705ba
This commit is contained in:
Cosmin Poieana 2015-04-22 13:35:39 +03:00
parent 917fef98ce
commit d96fc944cd
13 changed files with 491 additions and 197 deletions

View File

@ -44,9 +44,12 @@ NetworkDetails = collections.namedtuple(
"name",
"mac",
"address",
"address6",
"netmask",
"netmask6",
"broadcast",
"gateway",
"gateway6",
"dnsnameservers",
]
)

View File

@ -227,13 +227,17 @@ class OpenNebulaService(base.BaseMetadataService):
broadcast = self._compute_broadcast(address, netmask)
# gather them as namedtuple objects
details = base.NetworkDetails(
IF_FORMAT.format(iid=iid),
mac,
address,
netmask,
broadcast,
gateway,
self._get_cache_data(DNSNS, iid=iid).split(" ")
name=IF_FORMAT.format(iid=iid),
mac=mac,
address=address,
address6=None,
netmask=netmask,
netmask6=None,
broadcast=broadcast,
gateway=gateway,
gateway6=None,
dnsnameservers=self._get_cache_data(DNSNS,
iid=iid).split(" ")
)
except base.NotExistingMetadataException:
LOG.debug("Incomplete NIC details")

View File

@ -39,6 +39,10 @@ from cloudbaseinit.utils.windows import timezone
LOG = logging.getLogger(__name__)
AF_INET6 = 23
UNICAST = 1
MANUAL = 1
PREFERRED_ADDR = 4
advapi32 = ctypes.windll.advapi32
kernel32 = ctypes.windll.kernel32
@ -541,13 +545,13 @@ class WindowsUtils(base.BaseOSUtils):
broadcast, gateway, dnsnameservers):
conn = wmi.WMI(moniker='//./root/cimv2')
q = conn.query("SELECT * FROM Win32_NetworkAdapter WHERE "
"MACAddress = '{}'".format(mac_address))
if not len(q):
query = conn.query("SELECT * FROM Win32_NetworkAdapter WHERE "
"MACAddress = '{}'".format(mac_address))
if not len(query):
raise exception.CloudbaseInitException(
"Network adapter not found")
adapter_config = q[0].associators(
adapter_config = query[0].associators(
wmi_result_class='Win32_NetworkAdapterConfiguration')[0]
LOG.debug("Setting static IP address")
@ -575,6 +579,57 @@ class WindowsUtils(base.BaseOSUtils):
return reboot_required
def set_static_network_config_v6(self, mac_address, address6,
netmask6, gateway6):
"""Set IPv6 info for a network card."""
# Get local properties by MAC identification.
adapters = network.get_adapter_addresses()
for adapter in adapters:
if mac_address == adapter["mac_address"]:
ifname = adapter["friendly_name"]
ifindex = adapter["interface_index"]
break
else:
raise exception.CloudbaseInitException(
"Adapter with MAC {!r} not available".format(mac_address))
# TODO(cpoieana): Extend support for other platforms.
# Currently windows8 @ ws2012 or above.
if not self.check_os_version(6, 2):
LOG.warning("Setting IPv6 info not available "
"on this system")
return
conn = wmi.WMI(moniker='//./root/StandardCimv2')
query = conn.query("SELECT * FROM MSFT_NetIPAddress "
"WHERE InterfaceAlias = '{}'".format(ifname))
netip = query[0]
params = {
"InterfaceIndex": ifindex,
"InterfaceAlias": ifname,
"IPAddress": address6,
"AddressFamily": AF_INET6,
"PrefixLength": netmask6,
# Manual set type.
"Type": UNICAST,
"PrefixOrigin": MANUAL,
"SuffixOrigin": MANUAL,
"AddressState": PREFERRED_ADDR,
# No expiry.
"ValidLifetime": None,
"PreferredLifetime": None,
"SkipAsSource": False,
"DefaultGateway": gateway6,
"PolicyStore": None,
"PassThru": False,
}
LOG.debug("Setting IPv6 info for %s", ifname)
try:
netip.Create(**params)
except wmi.x_wmi as exc:
raise exception.CloudbaseInitException(exc.com_error)
def _get_config_key_name(self, section):
key_name = self._config_key
if section:

View File

@ -20,6 +20,7 @@ from cloudbaseinit.metadata.services import base as service_base
from cloudbaseinit.openstack.common import log as logging
from cloudbaseinit.osutils import factory as osutils_factory
from cloudbaseinit.plugins.common import base as plugin_base
from cloudbaseinit.utils import network
LOG = logging.getLogger(__name__)
@ -28,10 +29,10 @@ LOG = logging.getLogger(__name__)
# if the key is a tuple, then at least one field must exists.
NET_REQUIRE = {
("name", "mac"): True,
"address": True,
"netmask": True,
("address", "address6"): True,
("netmask", "netmask6"): True,
"broadcast": False, # currently not used
"gateway": False,
("gateway", "gateway6"): False,
"dnsnameservers": False
}
@ -76,30 +77,44 @@ def _preprocess_nics(network_details, network_adapters):
fields = (fields,)
final_status = any([getattr(nic, field) for field in fields])
if not final_status:
LOG.error("Incomplete NetworkDetails object %s", nic)
break
address, netmask = nic.address, nic.netmask
if final_status:
# Complete hardware address if missing by selecting
# the corresponding MAC in terms of naming, then ordering.
if not nic.mac:
# By name...
macs = [adapter[1] for adapter in network_adapters
if adapter[0] == nic.name]
mac = macs[0] if macs else None
# ...or by order.
idx = _name2idx(nic.name)
if not mac and idx < total:
mac = network_adapters[idx][1]
nic = service_base.NetworkDetails(
nic.name,
mac,
nic.address,
nic.netmask,
nic.broadcast,
nic.gateway,
nic.dnsnameservers
)
refined_network_details.append(nic)
# Additional check for info version.
if not (address and netmask):
final_status = nic.address6 and nic.netmask6
if final_status:
address = address or network.address6_to_4_truncate(
nic.address6)
netmask = netmask or network.netmask6_to_4_truncate(
nic.netmask6)
if not final_status:
LOG.error("Incomplete NetworkDetails object %s", nic)
continue
# Complete hardware address if missing by selecting
# the corresponding MAC in terms of naming, then ordering.
if not nic.mac:
# By name...
macs = [adapter[1] for adapter in network_adapters
if adapter[0] == nic.name]
mac = macs[0] if macs else None
# ...or by order.
idx = _name2idx(nic.name)
if not mac and idx < total:
mac = network_adapters[idx][1]
nic = service_base.NetworkDetails(
nic.name,
mac,
address,
nic.address6,
netmask,
nic.netmask6,
nic.broadcast,
nic.gateway,
nic.gateway6,
nic.dnsnameservers
)
refined_network_details.append(nic)
return refined_network_details
@ -109,18 +124,18 @@ class NetworkConfigPlugin(plugin_base.BasePlugin):
osutils = osutils_factory.get_os_utils()
network_details = service.get_network_details()
if not network_details:
return (plugin_base.PLUGIN_EXECUTION_DONE, False)
return plugin_base.PLUGIN_EXECUTION_DONE, False
# check and save NICs by MAC
# Check and save NICs by MAC.
network_adapters = osutils.get_network_adapters()
network_details = _preprocess_nics(network_details,
network_adapters)
macnics = {}
for nic in network_details:
# assuming that the MAC address is unique
# Assuming that the MAC address is unique.
macnics[nic.mac] = nic
# try configuring all the available adapters
# Try configuring all the available adapters.
adapter_macs = [pair[1] for pair in network_adapters]
reboot_required = False
configured = False
@ -139,10 +154,18 @@ class NetworkConfigPlugin(plugin_base.BasePlugin):
nic.dnsnameservers
)
reboot_required = reboot or reboot_required
# Set v6 info too if available.
if nic.address6 and nic.netmask6:
osutils.set_static_network_config_v6(
mac,
nic.address6,
nic.netmask6,
nic.gateway6
)
configured = True
for mac in macnics:
LOG.warn("Details not used for adapter %s", mac)
if not configured:
LOG.error("No adapters were configured")
return (plugin_base.PLUGIN_EXECUTION_DONE, reboot_required)
return plugin_base.PLUGIN_EXECUTION_DONE, reboot_required

View File

@ -16,16 +16,22 @@
NAME0 = "eth0"
MAC0 = "fa:16:3e:2d:ec:cd"
ADDRESS0 = "10.0.0.15"
ADDRESS60 = "2001:db8::3"
NETMASK0 = "255.255.255.0"
NETMASK60 = "64"
BROADCAST0 = "10.0.0.255"
GATEWAY0 = "10.0.0.1"
GATEWAY60 = "2001:db8::1"
DNSNS0 = "208.67.220.220 208.67.222.222"
NAME1 = "eth1"
ADDRESS1 = "10.1.0.2"
ADDRESS61 = "::ffff:a00:1"
NETMASK1 = "255.255.255.0"
NETMASK61 = None
BROADCAST1 = "10.1.0.255"
GATEWAY1 = "10.1.0.1"
GATEWAY61 = "2001::ffff:a00:1b"
def get_fake_metadata_json(version):
@ -67,6 +73,8 @@ def get_fake_metadata_json(version):
},
"network_config": {
"content_path": "network",
# This is not actually in the metadata json file,
# but is present here for the ease of reading such information.
"debian_config": ("""
# Injected by Nova on instance boot
#
@ -85,6 +93,8 @@ iface {name0} inet static
broadcast {broadcast0}
gateway {gateway0}
dns-nameservers {dnsns0}
post-up ip -6 addr add {address60}/{netmask60} dev {name0}
post-up ip -6 route add default via {gateway60} dev {name0}
auto {name1}
iface {name1} inet static
@ -92,19 +102,30 @@ iface {name1} inet static
netmask {netmask1}
broadcast {broadcast1}
gateway {gateway1}
""").format(name0=NAME0, # eth0 (IPv4)
iface eth2 inet6 static
address {address61}
netmask {netmask61}
gateway {gateway61}
""").format(name0=NAME0, # eth0 (IPv4/6)
mac0=MAC0,
address0=ADDRESS0,
broadcast0=BROADCAST0,
netmask0=NETMASK0,
gateway0=GATEWAY0,
dnsns0=DNSNS0,
# eth1 (IPv4)
address60=ADDRESS60,
netmask60=NETMASK60,
gateway60=GATEWAY60,
# eth1 (IPv4/6)
name1=NAME1,
address1=ADDRESS1,
broadcast1=BROADCAST1,
netmask1=NETMASK1,
gateway1=GATEWAY1)
gateway1=GATEWAY1,
address61=ADDRESS61,
netmask61=NETMASK61,
gateway61=GATEWAY61
)
}
}

View File

@ -186,25 +186,31 @@ class TestBaseOpenStackService(unittest.TestCase):
self.assertIsNone(ret)
return
# check returned NICs details
nic1 = base.NetworkDetails(
nic0 = base.NetworkDetails(
fake_json_response.NAME0,
fake_json_response.MAC0.upper(),
fake_json_response.ADDRESS0,
fake_json_response.ADDRESS60,
fake_json_response.NETMASK0,
fake_json_response.NETMASK60,
fake_json_response.BROADCAST0,
fake_json_response.GATEWAY0,
fake_json_response.GATEWAY60,
fake_json_response.DNSNS0.split()
)
nic2 = base.NetworkDetails(
nic1 = base.NetworkDetails(
fake_json_response.NAME1,
None,
fake_json_response.ADDRESS1,
fake_json_response.ADDRESS61,
fake_json_response.NETMASK1,
fake_json_response.NETMASK61,
fake_json_response.BROADCAST1,
fake_json_response.GATEWAY1,
fake_json_response.GATEWAY61,
None
)
self.assertEqual([nic1, nic2], ret)
self.assertEqual([nic0, nic1], ret)
def test_get_network_details_no_config(self):
self._partial_test_get_network_details(

View File

@ -105,9 +105,12 @@ def _get_nic_details(iid=0):
opennebulaservice.IF_FORMAT.format(iid=iid),
MAC,
ADDRESS,
None,
NETMASK,
None,
BROADCAST,
GATEWAY,
None,
DNSNS.split(" ")
)
return details
@ -307,9 +310,9 @@ class TestLoadedOpenNebulaService(_TestOpenNebulaService):
def test_multiple_nics(self):
self.load_context(context=CONTEXT2)
nic1 = _get_nic_details(iid=0)
nic2 = _get_nic_details(iid=1)
network_details = [nic1, nic2]
nic0 = _get_nic_details(iid=0)
nic1 = _get_nic_details(iid=1)
network_details = [nic0, nic1]
self.assertEqual(
network_details,
self._service.get_network_details()

View File

@ -13,6 +13,7 @@
# under the License.
import functools
import importlib
import os
@ -30,6 +31,11 @@ from cloudbaseinit.tests import testutils
CONF = cfg.CONF
class WMIError(Exception):
com_error = "fake data"
class TestWindowsUtils(testutils.CloudbaseInitTestBase):
'''Tests for the windows utils class.'''
@ -48,6 +54,7 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self._win32process_mock = mock.MagicMock()
self._win32security_mock = mock.MagicMock()
self._wmi_mock = mock.MagicMock()
self._wmi_mock.x_wmi = WMIError
self._moves_mock = mock.MagicMock()
self._xmlrpc_client_mock = mock.MagicMock()
self._ctypes_mock = mock.MagicMock()
@ -529,121 +536,163 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
def test_get_network_adapters_xp_2003(self):
self._test_get_network_adapters(True)
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
'._sanitize_wmi_input')
def _test_set_static_network_config(self, mock_sanitize_wmi_input,
adapter, ret_val1=None,
ret_val2=None, ret_val3=None):
def _test_set_static_network_config(self, adapter=True, static_val=(0,),
gateway_val=(0,), dns_val=(0,)):
conn = self._wmi_mock.WMI
address = '10.10.10.10'
mac_address = '54:EE:75:19:F4:61'
address = '10.10.10.10'
broadcast = '0.0.0.0'
dns_list = ['8.8.8.8']
set_static_call = functools.partial(
self._winutils.set_static_network_config,
mac_address, address, self._NETMASK,
broadcast, self._GATEWAY, dns_list
)
if not adapter:
if adapter:
adapter = mock.MagicMock()
else:
self.assertRaises(
exception.CloudbaseInitException,
self._winutils.set_static_network_config,
mac_address, address, self._NETMASK,
broadcast, self._GATEWAY, dns_list)
set_static_call
)
return
conn.return_value.query.return_value = adapter
adapter_config = adapter[0].associators.return_value[0]
adapter_config.EnableStatic.return_value = static_val
adapter_config.SetGateways.return_value = gateway_val
adapter_config.SetDNSServerSearchOrder.return_value = dns_val
adapter.__len__.return_value = 1
if static_val[0] > 1 or gateway_val[0] > 1 or dns_val[0] > 1:
self.assertRaises(
exception.CloudbaseInitException,
set_static_call)
else:
conn.return_value.query.return_value = adapter
adapter_config = adapter[0].associators()[0]
adapter_config.EnableStatic.return_value = ret_val1
adapter_config.SetGateways.return_value = ret_val2
adapter_config.SetDNSServerSearchOrder.return_value = ret_val3
adapter.__len__.return_value = 1
if ret_val1[0] > 1:
self.assertRaises(
exception.CloudbaseInitException,
self._winutils.set_static_network_config,
mac_address, address, self._NETMASK,
broadcast, self._GATEWAY, dns_list)
elif ret_val2[0] > 1:
self.assertRaises(
exception.CloudbaseInitException,
self._winutils.set_static_network_config,
mac_address, address, self._NETMASK,
broadcast, self._GATEWAY, dns_list)
elif ret_val3[0] > 1:
self.assertRaises(
exception.CloudbaseInitException,
self._winutils.set_static_network_config,
mac_address, address, self._NETMASK,
broadcast, self._GATEWAY, dns_list)
response = set_static_call()
if static_val[0] or gateway_val[0] or dns_val[0]:
self.assertTrue(response)
else:
response = self._winutils.set_static_network_config(
mac_address, address, self._NETMASK,
broadcast, self._GATEWAY, dns_list)
self.assertFalse(response)
if ret_val1[0] or ret_val2[0] or ret_val3[0] == 1:
self.assertTrue(response)
else:
self.assertFalse(response)
adapter_config.EnableStatic.assert_called_with(
[address], [self._NETMASK])
adapter_config.SetGateways.assert_called_with(
[self._GATEWAY], [1])
adapter_config.SetDNSServerSearchOrder.assert_called_with(
dns_list)
select = ("SELECT * FROM Win32_NetworkAdapter WHERE "
"MACAddress = '{}'".format(mac_address))
conn.return_value.query.assert_called_once_with(select)
adapter[0].associators.assert_called_with(
wmi_result_class='Win32_NetworkAdapterConfiguration')
adapter_config.EnableStatic.assert_called_with(
[address], [self._NETMASK])
adapter_config.SetGateways.assert_called_with(
[self._GATEWAY], [1])
adapter_config.SetDNSServerSearchOrder.assert_called_with(
dns_list)
adapter[0].associators.assert_called_with(
wmi_result_class='Win32_NetworkAdapterConfiguration')
conn.return_value.query.assert_called_with(
"SELECT * FROM Win32_NetworkAdapter WHERE "
"MACAddress = '{}'".format(mac_address)
)
@mock.patch("cloudbaseinit.utils.windows.network"
".get_adapter_addresses")
def _test_set_static_network_config_v6(self, mock_get_adapter_addresses,
v6adapters=True, v6error=False):
friendly_name = "Ethernet0"
interface_index = "4"
mac_address = '54:EE:75:19:F4:61'
address6 = "2001:db8::3"
netmask6 = "64"
gateway6 = "2001:db8::1"
conn = self._wmi_mock.WMI
netip = conn.return_value.query.return_value[0]
if v6error:
netip.Create.side_effect = WMIError
adapter_addresses = []
if v6adapters:
adapter_addresses = [
{
"mac_address": mac_address,
"friendly_name": friendly_name,
"interface_index": interface_index
}
]
mock_get_adapter_addresses.return_value = adapter_addresses
set_static_call = functools.partial(
self._winutils.set_static_network_config_v6,
mac_address, address6, netmask6, gateway6)
if not v6adapters or v6error:
self.assertRaises(
exception.CloudbaseInitException,
set_static_call)
else:
set_static_call()
mock_get_adapter_addresses.assert_called_once_with()
select = ("SELECT * FROM MSFT_NetIPAddress "
"WHERE InterfaceAlias = '{}'".format(friendly_name))
conn.return_value.query.assert_called_once_with(select)
params = {
"InterfaceIndex": interface_index,
"InterfaceAlias": friendly_name,
"IPAddress": address6,
"AddressFamily": self.windows_utils.AF_INET6,
"PrefixLength": netmask6,
# Manual set type.
"Type": self.windows_utils.UNICAST,
"PrefixOrigin": self.windows_utils.MANUAL,
"SuffixOrigin": self.windows_utils.MANUAL,
"AddressState": self.windows_utils.PREFERRED_ADDR,
# No expiry.
"ValidLifetime": None,
"PreferredLifetime": None,
"SkipAsSource": False,
"DefaultGateway": gateway6,
"PolicyStore": None,
"PassThru": False,
}
netip.Create.assert_called_once_with(**params)
def test_set_static_network_config(self):
adapter = mock.MagicMock()
ret_val1 = (1,)
ret_val2 = (1,)
ret_val3 = (0,)
self._test_set_static_network_config(adapter=adapter,
ret_val1=ret_val1,
ret_val2=ret_val2,
ret_val3=ret_val3)
self._test_set_static_network_config(static_val=ret_val1,
gateway_val=ret_val2,
dns_val=ret_val3)
def test_set_static_network_config_query_fail(self):
self._test_set_static_network_config(adapter=None)
self._test_set_static_network_config(adapter=False)
def test_set_static_network_config_cannot_set_ip(self):
adapter = mock.MagicMock()
ret_val1 = (2,)
self._test_set_static_network_config(adapter=adapter,
ret_val1=ret_val1)
self._test_set_static_network_config(static_val=ret_val1)
def test_set_static_network_config_cannot_set_gateway(self):
adapter = mock.MagicMock()
ret_val1 = (1,)
ret_val2 = (2,)
self._test_set_static_network_config(adapter=adapter,
ret_val1=ret_val1,
ret_val2=ret_val2)
self._test_set_static_network_config(static_val=ret_val1,
gateway_val=ret_val2)
def test_set_static_network_config_cannot_set_DNS(self):
adapter = mock.MagicMock()
ret_val1 = (1,)
ret_val2 = (1,)
ret_val3 = (2,)
self._test_set_static_network_config(adapter=adapter,
ret_val1=ret_val1,
ret_val2=ret_val2,
ret_val3=ret_val3)
self._test_set_static_network_config(static_val=ret_val1,
gateway_val=ret_val2,
dns_val=ret_val3)
def test_set_static_network_config_no_reboot(self):
adapter = mock.MagicMock()
ret_val1 = (0,)
ret_val2 = (0,)
ret_val3 = (0,)
self._test_set_static_network_config(adapter=adapter,
ret_val1=ret_val1,
ret_val2=ret_val2,
ret_val3=ret_val3)
self._test_set_static_network_config(static_val=ret_val1,
gateway_val=ret_val2,
dns_val=ret_val3)
def test_set_static_network_config_v6(self):
self._test_set_static_network_config_v6()
def test_set_static_network_config_v6_no_adapters(self):
self._test_set_static_network_config_v6(v6adapters=False)
def test_set_static_network_config_v6_error(self):
self._test_set_static_network_config_v6(v6error=True)
def _test_get_config_key_name(self, section):
response = self._winutils._get_config_key_name(section)

View File

@ -39,7 +39,7 @@ class TestNetworkConfigPlugin(unittest.TestCase):
invalid_details=False,
missed_adapters=[],
extra_network_details=[]):
# prepare mock environment
# Prepare mock environment.
mock_service = mock.MagicMock()
mock_shared_data = mock.Mock()
mock_osutils = mock.MagicMock()
@ -51,22 +51,18 @@ class TestNetworkConfigPlugin(unittest.TestCase):
self._network_plugin.execute,
mock_service, mock_shared_data
)
# actual tests
# Actual tests.
if not network_details:
ret = network_execute()
self.assertEqual((plugin_base.PLUGIN_EXECUTION_DONE, False), ret)
return
if invalid_details:
if invalid_details or not network_adapters:
with self.assertRaises(exception.CloudbaseInitException):
network_execute()
return
if not network_adapters:
with self.assertRaises(exception.CloudbaseInitException):
network_execute()
return
# good to go for the configuration process
# Good to go for the configuration process.
ret = network_execute()
calls = []
calls, calls6 = [], []
for adapter in set(network_adapters) - set(missed_adapters):
nics = [nic for nic in (network_details +
extra_network_details)
@ -81,9 +77,25 @@ class TestNetworkConfigPlugin(unittest.TestCase):
nic.gateway,
nic.dnsnameservers
)
call6 = mock.call(
nic.mac,
nic.address6,
nic.netmask6,
nic.gateway6
)
calls.append(call)
if nic.address6 and nic.netmask6:
calls6.append(call6)
self.assertEqual(
len(calls),
mock_osutils.set_static_network_config.call_count)
self.assertEqual(
len(calls6),
mock_osutils.set_static_network_config_v6.call_count)
mock_osutils.set_static_network_config.assert_has_calls(
calls, any_order=True)
mock_osutils.set_static_network_config_v6.assert_has_calls(
calls6, any_order=True)
reboot = len(missed_adapters) != self._count
self.assertEqual((plugin_base.PLUGIN_EXECUTION_DONE, reboot), ret)
@ -108,11 +120,21 @@ class TestNetworkConfigPlugin(unittest.TestCase):
"192.168.103.104",
"192.168.122.105",
]
addresses6 = [
"::ffff:c0a8:7a65",
"::ffff:c0a8:6768",
"::ffff:c0a8:7a69"
]
netmasks = [
"255.255.255.0",
"255.255.0.0",
"255.255.255.128",
]
netmasks6 = [
"96",
"64",
"100"
]
broadcasts = [
"192.168.122.255",
"192.168.255.255",
@ -123,6 +145,11 @@ class TestNetworkConfigPlugin(unittest.TestCase):
"192.168.122.16",
"192.168.122.32",
]
gateways6 = [
"::ffff:c0a8:7a01",
"::ffff:c0a8:7a10",
"::ffff:c0a8:7a20"
]
dnsnses = [
"8.8.8.8",
"8.8.8.8 8.8.4.4",
@ -136,16 +163,19 @@ class TestNetworkConfigPlugin(unittest.TestCase):
details_names[ind],
None if no_macs else macs[ind],
addresses[ind],
addresses6[ind],
netmasks[ind],
netmasks6[ind],
broadcasts[ind],
gateways[ind],
gateways6[ind],
dnsnses[ind].split()
)
self._network_adapters.append(adapter)
self._network_details.append(nic)
# get the network config plugin
# Get the network config plugin.
self._network_plugin = networkconfig.NetworkConfigPlugin()
# execution wrapper
# Execution wrapper.
self._partial_test_execute = functools.partial(
self._test_execute,
network_adapters=self._network_adapters,
@ -189,36 +219,44 @@ class TestNetworkConfigPlugin(unittest.TestCase):
nic.name,
"00" + nic.mac[2:],
nic.address,
nic.address6,
nic.netmask,
nic.netmask6,
nic.broadcast,
nic.gateway,
nic.gateway6,
nic.dnsnameservers
)
self._network_details[:] = [nic]
self._partial_test_execute(missed_adapters=self._network_adapters)
def _test_execute_missing_smth(self, name=False, mac=False,
address=False, gateway=False,
fail=False):
address=False, address6=False,
netmask=False, netmask6=False,
gateway=False, fail=False):
ind = self._count - 1
nic = self._network_details[ind]
nic2 = service_base.NetworkDetails(
None if name else nic.name,
None if mac else nic.mac,
None if address else nic.address,
nic.netmask,
None if address6 else nic.address6,
None if netmask else nic.netmask,
None if netmask6 else nic.netmask6,
nic.broadcast,
None if gateway else nic.gateway,
None if gateway else nic.gateway6,
nic.dnsnameservers
)
self._network_details[ind] = nic2
# excluding address and gateway switches...
# Excluding address and gateway switches...
if not fail:
# even this way, all adapters should be configured
# Even this way, all adapters should be configured.
missed_adapters = []
extra_network_details = [nic]
else:
# both name and MAC are missing, so we can't make the match
# Both name and MAC are missing, so we can't make the match.
# Or other vital details.
missed_adapters = [self._network_adapters[ind]]
extra_network_details = []
self._partial_test_execute(
@ -237,7 +275,20 @@ class TestNetworkConfigPlugin(unittest.TestCase):
self._test_execute_missing_smth(name=True, mac=True, fail=True)
def test_execute_missing_address(self):
self._test_execute_missing_smth(address=True, fail=True)
self._test_execute_missing_smth(address=True)
def test_execute_missing_netmask(self):
self._test_execute_missing_smth(netmask=True)
def test_execute_missing_address6(self):
self._test_execute_missing_smth(address6=True)
def test_execute_missing_netmask6(self):
self._test_execute_missing_smth(netmask6=True)
def test_execute_missing_address_netmask6(self):
self._test_execute_missing_smth(address=True, netmask6=True,
fail=True)
def test_execute_missing_gateway(self):
self._test_execute_missing_smth(gateway=True)

View File

@ -37,18 +37,24 @@ class TestInterfacesParser(unittest.TestCase):
fake_json_response.NAME0,
fake_json_response.MAC0.upper(),
fake_json_response.ADDRESS0,
fake_json_response.ADDRESS60,
fake_json_response.NETMASK0,
fake_json_response.NETMASK60,
fake_json_response.BROADCAST0,
fake_json_response.GATEWAY0,
fake_json_response.GATEWAY60,
fake_json_response.DNSNS0.split()
)
nic1 = service_base.NetworkDetails(
fake_json_response.NAME1,
None,
fake_json_response.ADDRESS1,
fake_json_response.ADDRESS61,
fake_json_response.NETMASK1,
fake_json_response.NETMASK61,
fake_json_response.BROADCAST1,
fake_json_response.GATEWAY1,
fake_json_response.GATEWAY61,
None
)
self.assertEqual([nic0, nic1], nics)

View File

@ -56,3 +56,25 @@ class NetworkUtilsTest(unittest.TestCase):
def test_test_check_metadata_ip_route_fail(self):
self._test_check_metadata_ip_route(side_effect=Exception)
def test_address6_to_4_truncate(self):
address_map = {
"0:0:0:0:0:ffff:c0a8:f": "192.168.0.15",
"::ffff:c0a8:e": "192.168.0.14",
"::1": "0.0.0.1",
"1:2:3:4:5::8": "0.0.0.8",
"::": "0.0.0.0",
"::7f00:1": "127.0.0.1"
}
for v6, v4 in address_map.items():
self.assertEqual(v4, network.address6_to_4_truncate(v6))
def test_netmask6_to_4_truncate(self):
netmask_map = {
"128": "255.255.255.255",
"96": "255.255.255.0",
"0": "0.0.0.0",
"100": "255.255.255.128"
}
for v6, v4 in netmask_map.items():
self.assertEqual(v4, network.netmask6_to_4_truncate(v6))

View File

@ -26,75 +26,105 @@ LOG = logging.getLogger(__name__)
NAME = "name"
MAC = "mac"
ADDRESS = "address"
ADDRESS6 = "address6"
NETMASK = "netmask"
NETMASK6 = "netmask6"
BROADCAST = "broadcast"
GATEWAY = "gateway"
GATEWAY6 = "gateway6"
DNSNS = "dnsnameservers"
# fields of interest (order and regexp)
# Fields of interest by regexps.
FIELDS = {
NAME: (0, re.compile(r"iface\s+(?P<{}>\S+)"
r"\s+inet\s+static".format(NAME))),
MAC: (1, re.compile(r"hwaddress\s+ether\s+"
r"(?P<{}>\S+)".format(MAC))),
ADDRESS: (2, re.compile(r"address\s+"
r"(?P<{}>\S+)".format(ADDRESS))),
NETMASK: (3, re.compile(r"netmask\s+"
r"(?P<{}>\S+)".format(NETMASK))),
BROADCAST: (4, re.compile(r"broadcast\s+"
r"(?P<{}>\S+)".format(BROADCAST))),
GATEWAY: (5, re.compile(r"gateway\s+"
r"(?P<{}>\S+)".format(GATEWAY))),
DNSNS: (6, re.compile(r"dns-nameservers\s+(?P<{}>.+)".format(DNSNS)))
NAME: re.compile(r"iface\s+(?P<{}>\S+)"
r"\s+inet6?\s+static".format(NAME)),
MAC: re.compile(r"hwaddress\s+ether\s+"
r"(?P<{}>\S+)".format(MAC)),
ADDRESS: re.compile(r"address\s+"
r"(?P<{}>\S+)".format(ADDRESS)),
ADDRESS6: re.compile(r"post-up ip -6 addr add (?P<{}>[^/]+)/"
r"(\d+) dev".format(ADDRESS6)),
NETMASK: re.compile(r"netmask\s+"
r"(?P<{}>\S+)".format(NETMASK)),
NETMASK6: re.compile(r"post-up ip -6 addr add ([^/]+)/"
r"(?P<{}>\d+) dev".format(NETMASK6)),
BROADCAST: re.compile(r"broadcast\s+"
r"(?P<{}>\S+)".format(BROADCAST)),
GATEWAY: re.compile(r"gateway\s+"
r"(?P<{}>\S+)".format(GATEWAY)),
GATEWAY6: re.compile(r"post-up ip -6 route add default via "
r"(?P<{}>.+) dev".format(GATEWAY6)),
DNSNS: re.compile(r"dns-nameservers\s+(?P<{}>.+)".format(DNSNS))
}
IFACE_TEMPLATE = dict.fromkeys(range(len(FIELDS)))
IFACE_TEMPLATE = dict.fromkeys(FIELDS.keys())
# Map IPv6 availability by value index under `NetworkDetails`.
V6_PROXY = {
ADDRESS: ADDRESS6,
NETMASK: NETMASK6,
GATEWAY: GATEWAY6
}
DETAIL_PREPROCESS = {
MAC: lambda value: value.upper(),
DNSNS: lambda value: value.strip().split()
}
def _get_iface_blocks(data):
""""Yield interface blocks as pairs of v4 and v6 halves."""
lines, lines6 = [], []
crt_lines = lines
for line in data.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "iface" in line:
if "inet6" in line:
crt_lines = lines6
continue
if lines:
yield lines, lines6
lines[:] = []
lines6[:] = []
crt_lines = lines
crt_lines.append(line)
if lines:
yield lines, lines6
def _get_field(line):
for field, (index, regex) in FIELDS.items():
for field, regex in FIELDS.items():
match = regex.match(line)
if match:
return index, match.group(field)
yield field, match.group(field)
def _add_nic(iface, nics):
if not iface:
return
details = [iface[key] for key in sorted(iface)]
LOG.debug("Found new interface: %s", details)
# each missing detail is marked as None
nic = service_base.NetworkDetails(*details)
if not iface or iface == IFACE_TEMPLATE:
return # no information gathered
LOG.debug("Found new interface: %s", iface)
# Each missing detail is marked as None.
nic = service_base.NetworkDetails(**iface)
nics.append(nic)
def parse(data):
"""Parse the received content and obtain network details."""
# TODO(cpoieana): support IPv6 flavors
if not data or not isinstance(data, six.string_types):
LOG.error("Invalid debian config to parse:\n%s", data)
LOG.error("Invalid Debian config to parse:\n%s", data)
return
LOG.info("Parsing debian config...\n%s", data)
LOG.info("Parsing Debian config...\n%s", data)
nics = [] # list of NetworkDetails objects
iface = {}
# take each line and process it
for line in data.split("\n"):
line = line.strip()
if not line or line.startswith("#"):
continue
ret = _get_field(line)
if not ret:
continue
# save the detail
index = ret[0]
if index == 0:
# we found a new interface
_add_nic(iface, nics)
iface = IFACE_TEMPLATE.copy()
value = ret[1]
if index == 1:
value = value.upper()
elif index == 6:
value = value.strip().split()
iface[index] = value
# also add the last one
_add_nic(iface, nics)
for lines_pair in _get_iface_blocks(data):
iface = IFACE_TEMPLATE.copy()
for lines, use_proxy in zip(lines_pair, (False, True)):
for line in lines:
for field, value in _get_field(line):
if use_proxy:
field = V6_PROXY.get(field)
if not field:
continue
func = DETAIL_PREPROCESS.get(field, lambda value: value)
iface[field] = func(value) if value != "None" else None
_add_nic(iface, nics)
return nics

View File

@ -12,6 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import socket
import struct
import sys
from six.moves.urllib import parse
@ -62,3 +66,20 @@ def check_metadata_ip_route(metadata_url):
except Exception as ex:
# Ignore it
LOG.exception(ex)
def address6_to_4_truncate(address6):
"""Try to obtain IPv4 address from version 6."""
chunks = address6.split(":")
hi, lo = chunks[-2], chunks[-1]
network_address = binascii.unhexlify(hi.zfill(4) + lo.zfill(4))
return socket.inet_ntoa(network_address)
def netmask6_to_4_truncate(netmask6):
"""Try to obtain IPv4 netmask from version 6."""
# Harsh 128bit to 32bit.
length = int(int(netmask6) / 4)
mask = "1" * length + "0" * (32 - length)
network_address = struct.pack("!L", int(mask, 2))
return socket.inet_ntoa(network_address)