Add functions to look for port IP addresses
This also undate looking for server IP address by type ('fixed' or 'floating') Change-Id: I08f9608ef6e66d049615fda3b988ff0e43454f7d
This commit is contained in:
parent
5e97f9e10c
commit
4cb7bf9722
@ -17,6 +17,7 @@ from tobiko.openstack.neutron import _agent
|
|||||||
from tobiko.openstack.neutron import _client
|
from tobiko.openstack.neutron import _client
|
||||||
from tobiko.openstack.neutron import _cidr
|
from tobiko.openstack.neutron import _cidr
|
||||||
from tobiko.openstack.neutron import _extension
|
from tobiko.openstack.neutron import _extension
|
||||||
|
from tobiko.openstack.neutron import _port
|
||||||
|
|
||||||
|
|
||||||
neutron_client = _client.neutron_client
|
neutron_client = _client.neutron_client
|
||||||
@ -45,3 +46,6 @@ has_networking_extensions = _extension.has_networking_extensions
|
|||||||
skip_if_missing_networking_extensions = (
|
skip_if_missing_networking_extensions = (
|
||||||
_extension.skip_if_missing_networking_extensions)
|
_extension.skip_if_missing_networking_extensions)
|
||||||
skip_if_missing_networking_agents = _agent.skip_if_missing_networking_agents
|
skip_if_missing_networking_agents = _agent.skip_if_missing_networking_agents
|
||||||
|
|
||||||
|
find_port_ip_address = _port.find_port_ip_address
|
||||||
|
list_port_ip_addresses = _port.list_port_ip_addresses
|
||||||
|
43
tobiko/openstack/neutron/_port.py
Normal file
43
tobiko/openstack/neutron/_port.py
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
# Copyright 2019 Red Hat
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import netaddr
|
||||||
|
|
||||||
|
import tobiko
|
||||||
|
from tobiko.shell import ping
|
||||||
|
|
||||||
|
|
||||||
|
def list_port_ip_addresses(port, subnet_id=None, ip_version=None,
|
||||||
|
check_connectivity=False, ssh_client=None):
|
||||||
|
selected_addresses = []
|
||||||
|
for fixed_ip in port['fixed_ips']:
|
||||||
|
if subnet_id and subnet_id != fixed_ip['subnet_id']:
|
||||||
|
continue
|
||||||
|
ip_address = netaddr.IPAddress(fixed_ip['ip_address'])
|
||||||
|
if ip_version and ip_version != ip_address.version:
|
||||||
|
continue
|
||||||
|
if check_connectivity and not ping.ping(
|
||||||
|
host=ip_address, ssh_client=ssh_client).received:
|
||||||
|
continue
|
||||||
|
selected_addresses.append(ip_address)
|
||||||
|
return tobiko.Selection(selected_addresses)
|
||||||
|
|
||||||
|
|
||||||
|
def find_port_ip_address(port, unique=False, **kwargs):
|
||||||
|
addresses = list_port_ip_addresses(port=port, **kwargs)
|
||||||
|
if unique:
|
||||||
|
return addresses.unique
|
||||||
|
else:
|
||||||
|
return addresses.first
|
@ -13,37 +13,54 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
from oslo_log import log
|
||||||
import netaddr
|
import netaddr
|
||||||
|
|
||||||
import tobiko
|
import tobiko
|
||||||
from tobiko.shell import ping
|
from tobiko.shell import ping
|
||||||
|
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def list_server_ip_addresses(server, network_name=None, ip_version=None,
|
def list_server_ip_addresses(server, network_name=None, ip_version=None,
|
||||||
check_connectivity=False):
|
address_type=None, check_connectivity=False,
|
||||||
|
ssh_client=None):
|
||||||
selected_addresses = []
|
selected_addresses = []
|
||||||
for _network_name, addresses in server.addresses.items():
|
for _network_name, addresses in server.addresses.items():
|
||||||
|
# check network name
|
||||||
if network_name and network_name != _network_name:
|
if network_name and network_name != _network_name:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
for address in addresses:
|
for address in addresses:
|
||||||
|
# check IP address version
|
||||||
_ip_version = address['version']
|
_ip_version = address['version']
|
||||||
if ip_version and ip_version != _ip_version:
|
if ip_version and ip_version != _ip_version:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# check IP address type
|
||||||
|
_address_type = address.get('OS-EXT-IPS:type')
|
||||||
|
if address_type and address_type != _address_type:
|
||||||
|
if _address_type is None:
|
||||||
|
LOG.warning('Unable to get address type of address %r',
|
||||||
|
address)
|
||||||
|
continue
|
||||||
|
|
||||||
ip_address = netaddr.IPAddress(address['addr'],
|
ip_address = netaddr.IPAddress(address['addr'],
|
||||||
version=_ip_version)
|
version=_ip_version)
|
||||||
if check_connectivity:
|
|
||||||
if not ping.ping(host=ip_address).received:
|
# check ICMP connectivity
|
||||||
continue
|
if check_connectivity and not ping.ping(
|
||||||
|
host=ip_address, ssh_client=ssh_client).received:
|
||||||
|
continue
|
||||||
|
|
||||||
selected_addresses.append(ip_address)
|
selected_addresses.append(ip_address)
|
||||||
|
|
||||||
return tobiko.Selection(selected_addresses)
|
return tobiko.Selection(selected_addresses)
|
||||||
|
|
||||||
|
|
||||||
def find_server_ip_address(server, network_name=None, ip_version=None,
|
def find_server_ip_address(server, unique=False, **kwargs):
|
||||||
check_connectivity=False, unique=False):
|
addresses = list_server_ip_addresses(server=server, **kwargs)
|
||||||
addresses = list_server_ip_addresses(server=server,
|
|
||||||
network_name=network_name,
|
|
||||||
ip_version=ip_version,
|
|
||||||
check_connectivity=check_connectivity)
|
|
||||||
if unique:
|
if unique:
|
||||||
return addresses.unique
|
return addresses.unique
|
||||||
else:
|
else:
|
||||||
|
@ -21,6 +21,7 @@ import testtools
|
|||||||
import tobiko
|
import tobiko
|
||||||
from tobiko import config
|
from tobiko import config
|
||||||
from tobiko.openstack import neutron
|
from tobiko.openstack import neutron
|
||||||
|
from tobiko.openstack import nova
|
||||||
from tobiko.openstack import stacks
|
from tobiko.openstack import stacks
|
||||||
|
|
||||||
|
|
||||||
@ -109,6 +110,53 @@ class NeutronApiTestCase(testtools.TestCase):
|
|||||||
self.assertTrue(agents)
|
self.assertTrue(agents)
|
||||||
|
|
||||||
|
|
||||||
|
class PortTest(testtools.TestCase):
|
||||||
|
|
||||||
|
#: Stack of resources with a network with a gateway router
|
||||||
|
stack = tobiko.required_setup_fixture(stacks.CentosServerStackFixture)
|
||||||
|
|
||||||
|
def test_list_port_addresses(self, ip_version=None):
|
||||||
|
port = neutron.find_port(device_id=self.stack.server_id)
|
||||||
|
port_addresses = neutron.list_port_ip_addresses(
|
||||||
|
port=port,
|
||||||
|
ip_version=ip_version)
|
||||||
|
server_addresses = nova.list_server_ip_addresses(
|
||||||
|
server=self.stack.server_details,
|
||||||
|
ip_version=ip_version,
|
||||||
|
address_type='fixed')
|
||||||
|
self.assertEqual(set(server_addresses), set(port_addresses))
|
||||||
|
if ip_version:
|
||||||
|
self.assertEqual(
|
||||||
|
port_addresses.with_attributes(version=ip_version),
|
||||||
|
port_addresses)
|
||||||
|
|
||||||
|
def test_list_port_addresses_with_ipv4(self):
|
||||||
|
self.test_list_port_addresses(ip_version=4)
|
||||||
|
|
||||||
|
def test_list_port_addresses_with_ipv6(self):
|
||||||
|
self.test_list_port_addresses(ip_version=6)
|
||||||
|
|
||||||
|
def test_find_port_address_with_ip_version(self):
|
||||||
|
port = neutron.find_port(device_id=self.stack.server_id)
|
||||||
|
server_addresses = nova.list_server_ip_addresses(
|
||||||
|
server=self.stack.server_details,
|
||||||
|
address_type='fixed')
|
||||||
|
for server_address in server_addresses:
|
||||||
|
port_address = neutron.find_port_ip_address(
|
||||||
|
port=port,
|
||||||
|
ip_version=server_address.version,
|
||||||
|
unique=True)
|
||||||
|
self.assertEqual(server_address, port_address)
|
||||||
|
|
||||||
|
def test_find_port_address_with_subnet_id(self):
|
||||||
|
port = neutron.find_port(device_id=self.stack.server_id)
|
||||||
|
for subnet in neutron.list_subnets(network_id=port['network_id']):
|
||||||
|
port_address = neutron.find_port_ip_address(
|
||||||
|
port=port, subnet_id=subnet['id'], unique=True)
|
||||||
|
cidr = netaddr.IPNetwork(subnet['cidr'])
|
||||||
|
self.assertIn(port_address, cidr)
|
||||||
|
|
||||||
|
|
||||||
class AgentTest(testtools.TestCase):
|
class AgentTest(testtools.TestCase):
|
||||||
|
|
||||||
def test_skip_if_missing_agents(self, count=1, should_skip=False,
|
def test_skip_if_missing_agents(self, count=1, should_skip=False,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user