MDProxy: Updated MDProxy test cases based on new design

Change-Id: I3ee946a74849efef9d7a307876200515c279b70f
This commit is contained in:
Devang Doshi 2017-08-08 14:16:51 -07:00
parent a391f8adb3
commit 785344ca40
4 changed files with 455 additions and 232 deletions

View File

@ -16,6 +16,7 @@ import collections
import netaddr
from oslo_log import log as logging
from oslo_utils import netutils
from tempest import config
from tempest.lib.common.utils import data_utils
@ -31,7 +32,7 @@ LOG = logging.getLogger(__name__)
class ApplianceManager(manager.NetworkScenarioTest):
server_details = collections.namedtuple('server_details',
['server', 'floating_ip',
['server', 'floating_ips',
'networks'])
def setUp(self):
@ -45,6 +46,11 @@ class ApplianceManager(manager.NetworkScenarioTest):
self.topology_config_drive = CONF.compute_feature_enabled.config_drive
self.topology_keypairs = {}
self.servers_details = {}
self.topology_port_ids = {}
self.image_ref = CONF.compute.image_ref
self.flavor_ref = CONF.compute.flavor_ref
self.run_ssh = CONF.validation.run_validation
self.ssh_user = CONF.validation.image_ssh_user
def get_internal_ips(self, server, network, device="network"):
internal_ips = [p['fixed_ips'][0]['ip_address'] for p in
@ -83,11 +89,15 @@ class ApplianceManager(manager.NetworkScenarioTest):
return self.topology_keypairs[server['key_name']]['private_key']
def create_topology_router(self, router_name, routers_client=None,
**kwargs):
tenant_id=None, **kwargs):
if not routers_client:
routers_client = self.routers_client
if not tenant_id:
tenant_id = routers_client.tenant_id
router_name_ = constants.APPLIANCE_NAME_STARTS_WITH + router_name
router = self._create_router(namestart=router_name_, **kwargs)
name = data_utils.rand_name(router_name_)
router = routers_client.create_router(
name=name, admin_state_up=True, tenant_id=tenant_id)['router']
public_network_info = {"external_gateway_info": dict(
network_id=self.topology_public_network_id)}
routers_client.update_router(router['id'], **public_network_info)
@ -189,6 +199,44 @@ class ApplianceManager(manager.NetworkScenarioTest):
def create_topology_security_group(self, **kwargs):
return self._create_security_group(**kwargs)
def _get_server_portid_and_ip4(self, server, ip_addr=None):
ports = self.os_admin.ports_client.list_ports(
device_id=server['id'], fixed_ip=ip_addr)['ports']
p_status = ['ACTIVE']
if getattr(CONF.service_available, 'ironic', False):
p_status.append('DOWN')
port_map = [(p["id"], fxip["ip_address"])
for p in ports
for fxip in p["fixed_ips"]
if netutils.is_valid_ipv4(fxip["ip_address"])
and p['status'] in p_status]
inactive = [p for p in ports if p['status'] != 'ACTIVE']
if inactive:
LOG.warning("Instance has ports that are not ACTIVE: %s", inactive)
self.assertNotEqual(0, len(port_map),
"No IPv4 addresses found in: %s" % ports)
return port_map
def create_floatingip(self, thing, port_id, external_network_id=None,
ip4=None, client=None):
"""Create a floating IP and associates to a resource/port on Neutron"""
if not external_network_id:
external_network_id = self.topology_public_network_id
if not client:
client = self.floating_ips_client
result = client.create_floatingip(
floating_network_id=external_network_id,
port_id=port_id,
tenant_id=thing['tenant_id'],
fixed_ip_address=ip4
)
floating_ip = result['floatingip']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_floatingip,
floating_ip['id'])
return floating_ip
def create_topology_instance(
self, server_name, networks, security_groups=None,
config_drive=None, keypair=None, image_id=None,
@ -220,18 +268,62 @@ class ApplianceManager(manager.NetworkScenarioTest):
for net in networks:
net_ = {"uuid": net["id"]}
networks_.append(net_)
# Deploy server with all teh args.
# Deploy server with all the args.
server = self.create_server(
name=server_name_, networks=networks_, clients=clients, **kwargs)
floating_ips = []
if create_floating_ip:
floating_ip = self.create_floating_ip(server)
server["floating_ip"] = floating_ip
self.topology_servers_floating_ip.append(floating_ip)
else:
floating_ip = None
ports = self._get_server_portid_and_ip4(server)
for port_id, ip4 in ports:
floating_ip = self.create_floatingip(server, port_id, ip4=ip4)
if server.get("floating_ips"):
server["floating_ips"].append(floating_ip)
else:
server["floating_ips"] = [floating_ip]
self.topology_servers_floating_ip.append(floating_ip)
floating_ips.append(floating_ip)
server_details = self.server_details(server=server,
floating_ip=floating_ip,
floating_ips=floating_ips,
networks=networks)
self.servers_details[server_name] = server_details
self.topology_servers[server_name] = server
return server
def _list_ports(self, *args, **kwargs):
"""List ports using admin creds """
ports_list = self.os_admin.ports_client.list_ports(
*args, **kwargs)
return ports_list['ports']
def get_network_ports_id(self):
for port in self._list_ports():
for fixed_ip in port["fixed_ips"]:
ip = fixed_ip["ip_address"]
port_id = port["id"]
tenant_id = port["tenant_id"]
if tenant_id in self.topology_port_ids:
self.topology_port_ids[tenant_id][ip] = port_id
else:
self.topology_port_ids[tenant_id] = {ip: port_id}
def get_glance_image_id(self, params):
"""
Get the glance image id based on the params
:param params: List of sub-string of image name
:return:
"""
# Retrieve the list of images that meet the filter
images_list = self.os_admin.image_client_v2.list_images()['images']
# Validate that the list was fetched sorted accordingly
msg = "No images were found that met the filter criteria."
self.assertNotEmpty(images_list, msg)
image_id = None
for image in images_list:
for param in params:
if not param.lower() in image["name"].lower():
break
else:
image_id = image["id"]
self.assertIsNotNone(image_id, msg)
return image_id

View File

@ -12,16 +12,27 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from tempest.common.utils.linux import remote_client
from tempest import config
from tempest import exceptions
from tempest.lib.common.utils import test_utils
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.lib import appliance_manager
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TrafficManager(appliance_manager.ApplianceManager):
def check_server_internal_ips_using_floating_ip(
self, floating_ip, server, address_list, should_connect=True):
ip_address = floating_ip['floating_ip_address']
private_key = self.get_server_key(server)
ssh_source = self.get_remote_client(
ssh_source = self._get_remote_client(
ip_address, private_key=private_key)
for remote_ip in address_list:
self.check_remote_connectivity(ssh_source, remote_ip,
@ -49,10 +60,11 @@ class TrafficManager(appliance_manager.ApplianceManager):
floating_ip, server, compute_ips, should_connect)
def using_floating_ip_check_server_and_project_network_connectivity(
self, server_details, network=None):
self, server_details, floating_ip=None, network=None):
if not network:
network = server_details.networks[0]
floating_ip = server_details.floating_ip
if not floating_ip:
floating_ip = server_details.floating_ips[0]
server = server_details.server
self.check_network_internal_connectivity(network, floating_ip, server)
self.check_vm_internal_connectivity(network, floating_ip, server)
@ -66,3 +78,89 @@ class TrafficManager(appliance_manager.ApplianceManager):
self.check_server_internal_ips_using_floating_ip(
floating_ip_on_network2, server_on_network2, remote_ips,
should_connect)
def _get_remote_client(self, ip_address, username=None, private_key=None,
use_password=False):
"""Get a SSH client to a remote server
@param ip_address the server floating or fixed IP address to use
for ssh validation
@param username name of the Linux account on the remote server
@param private_key the SSH private key to use
@return a RemoteClient object
"""
if username is None:
username = CONF.validation.image_ssh_user
# Set this with 'keypair' or others to log in with keypair or
# username/password.
if use_password:
password = CONF.validation.image_ssh_password
private_key = None
else:
password = None
if private_key is None:
private_key = self.keypair['private_key']
linux_client = remote_client.RemoteClient(ip_address, username,
pkey=private_key,
password=password)
try:
linux_client.validate_authentication()
except Exception as e:
message = ('Initializing SSH connection to %(ip)s failed. '
'Error: %(error)s' % {'ip': ip_address,
'error': e})
caller = test_utils.find_test_caller()
if caller:
message = '(%s) %s' % (caller, message)
LOG.exception(message)
self._log_console_output()
raise
return linux_client
def verify_server_ssh(self, server, floating_ip=None, use_password=False):
private_key = self.get_server_key(server)
if not floating_ip:
floating_ip = server["floating_ips"][0]["floating_ip_address"]
if not floating_ip:
LOG.error("Without floating ip, failed to verify SSH connectivity")
raise
ssh_client = self._get_remote_client(
ip_address=floating_ip, username=self.ssh_user,
private_key=private_key, use_password=use_password)
return ssh_client
def exec_cmd_on_server_using_fip(self, cmd, ssh_client=None,
sub_result=None, expected_value=None):
if not ssh_client:
ssh_client = self.ssh_client
def exec_cmd_and_verify_output():
exec_cmd_retried = 0
import time
while exec_cmd_retried < \
constants.MAX_NO_OF_TIMES_EXECUTION_OVER_SSH:
result = ssh_client.exec_command(cmd)
self.assertIsNotNone(result)
if not result == "":
break
exec_cmd_retried += 1
time.sleep(constants.INTERVAL_BETWEEN_EXEC_RETRY_ON_SSH)
LOG.info("Tried %s times!!!", exec_cmd_retried)
if sub_result:
msg = ("Failed sub result is not in result Subresult: %r "
"Result: %r" % (sub_result, result))
self.assertIn(sub_result, result, msg)
if expected_value:
msg = ("Failed expected_value is not in result expected_value:"
" %r Result: %r" % (expected_value, result))
self.assertEqual(expected_value, result, msg)
return result
if not test_utils.call_until_true(exec_cmd_and_verify_output,
CONF.compute.build_timeout,
CONF.compute.build_interval):
raise exceptions.TimeoutException("Timed out while waiting to "
"execute cmd %s on server. " %
cmd)

View File

@ -11,26 +11,22 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from tempest import config
from tempest import exceptions
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest import test
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.lib import feature_manager
from vmware_nsx_tempest.services import nsxv3_client
from vmware_nsx_tempest.tests.scenario import manager
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TestMDProxy(manager.NetworkScenarioTest):
class TestMDProxy(feature_manager.FeatureManager):
"""Test MDProxy.
Adding test cases to test MDProxy in different scenarios such as
@ -40,10 +36,6 @@ class TestMDProxy(manager.NetworkScenarioTest):
def setUp(self):
super(TestMDProxy, self).setUp()
self.image_ref = CONF.compute.image_ref
self.flavor_ref = CONF.compute.flavor_ref
self.run_ssh = CONF.validation.run_validation
self.ssh_user = CONF.validation.image_ssh_user
self.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
@ -52,289 +44,330 @@ class TestMDProxy(manager.NetworkScenarioTest):
def skip_checks(cls):
"""Class level skip checks.
Class level check. Skip all teh MDproxy tests, if native_dhcp_metadata
Class level check. Skip all the MDproxy tests, if native_dhcp_metadata
is not True under nsxv3 section of the config
"""
super(TestMDProxy, cls).skip_checks()
if not CONF.nsxv3.native_dhcp_metadata:
msg = " native_dhcp_metadata is not enabled under nsxv3 config" \
", skipping all the MDProxy tests!!!"
raise cls.skipException(msg)
def verify_ssh(self, keypair, instance, port_id=None):
created_floating_ip = self.create_floating_ip(instance,
port_id=port_id)
self.fip = str(created_floating_ip["floating_ip_address"])
self.assertIsNotNone(self.fip)
# Check ssh
self.ssh_client = self.get_remote_client(
ip_address=self.fip, username=self.ssh_user,
private_key=keypair["private_key"])
def _verify_md(self, md_url, expected_value="",
sub_result=None, ssh_client=None):
cmd = "curl " + md_url
self.exec_cmd_on_server_using_fip(
cmd, ssh_client=ssh_client, sub_result=sub_result,
expected_value=expected_value)
def _delete_router(self, router):
body = self.ports_client.list_ports(device_id=router["id"])
interfaces = body["ports"]
for interface in interfaces:
test_utils.call_and_ignore_notfound_exc(
self.routers_client.remove_router_interface, router["id"],
subnet_id=interface["fixed_ips"][0]["subnet_id"])
self.routers_client.delete_router(router["id"])
def _create_router(self, router_name=None, admin_state_up=True,
external_network_id=None, enable_snat=None, **kwargs):
ext_gw_info = {}
if external_network_id:
ext_gw_info["network_id"] = external_network_id
if enable_snat is not None:
ext_gw_info["enable_snat"] = enable_snat
body = self.routers_client.create_router(
name=router_name, external_gateway_info=ext_gw_info,
admin_state_up=admin_state_up, **kwargs)
router = body["router"]
self.addCleanup(self._delete_router, router)
return router
def _create_net_subnet_router_interface(self, router=None):
network = self._create_network(namestart="network-mdproxy")
subnet = self._create_subnet(network)
if router:
self.routers_client.add_router_interface(
router["id"], subnet_id=subnet["id"])
self.addCleanup(self.routers_client.remove_router_interface,
router["id"], subnet_id=subnet["id"])
return network["id"], subnet["id"]
def _create_server_on_networks(self, networks):
security_group = self._create_security_group()
name = data_utils.rand_name("server-mdproxy")
keypair = self.create_keypair()
security_groups = [{"name": security_group["name"]}]
instance = self.create_server(
image_id=self.image_ref,
flavor=self.flavor_ref,
config_drive=CONF.compute_feature_enabled.config_drive, name=name,
networks=networks, key_name=keypair["name"],
security_groups=security_groups, wait_until="ACTIVE")
self.addCleanup(self.servers_client.delete_server, instance["id"])
return instance, keypair
def _list_ports(self, *args, **kwargs):
"""List ports using admin creds """
ports_list = self.admin_manager.ports_client.list_ports(
*args, **kwargs)
return ports_list['ports']
def _get_port_id(self, network_id, subnet_id, instance):
instance_addrs = instance["addresses"].items()
instance_fixed_ips = []
for addr in instance_addrs:
instance_fixed_ips.append(addr[1][0]["addr"])
for port in self._list_ports(device_id=instance['id']):
port_fixed_ip = port["fixed_ips"][0]["ip_address"]
if port["network_id"] == network_id and port["fixed_ips"][0][
"subnet_id"] == subnet_id and "compute:" in port[
"device_owner"] and port_fixed_ip in instance_fixed_ips:
port_id = port["id"]
self.assertIsNotNone(port_id, "Failed to find Instance's port id!!!")
return port_id
def _verify_md(self, md_url, expected_value="", check_exist_only=False,
sub_result=None):
def exec_cmd_and_verify_output():
cmd = "curl " + md_url
exec_cmd_retried = 0
import time
while exec_cmd_retried < \
constants.MAX_NO_OF_TIMES_EXECUTION_OVER_SSH:
result = self.ssh_client.exec_command(cmd)
self.assertIsNotNone(result)
if not result == "":
break
exec_cmd_retried += 1
time.sleep(constants.INTERVAL_BETWEEN_EXEC_RETRY_ON_SSH)
LOG.info("Tried %s times!!!", exec_cmd_retried)
if check_exist_only:
return "Verification is successful!"
msg = ("Failed while verifying metadata on server. Result "
"of command %r is NOT %r." % (cmd, expected_value))
if sub_result:
msg2 = ("Failed to verify incorrect passowrd on metadata"
"server. Result %r is NOT in %r." % (
sub_result, result))
self.assertIn(sub_result, result, msg2)
return "Verification is successful!"
self.assertEqual(expected_value, result, msg)
return "Verification is successful!"
if not test_utils.call_until_true(exec_cmd_and_verify_output,
CONF.compute.build_timeout,
CONF.compute.build_interval):
raise exceptions.TimeoutException("Timed out while waiting to "
"verify metadata on server. "
"%s is empty." % md_url)
def verify_metadata_in_detail(self, instance):
def verify_metadata_in_detail(self, instance, ssh_client, floatingip,
fixed_ip):
# Check floating IPv4 in Metadata.
md_url_pubic_ipv4 = constants.MD_BASE_URL + \
"latest/meta-data/public-ipv4"
self._verify_md(md_url=md_url_pubic_ipv4, expected_value=self.fip)
instance_name = instance["name"].replace("_", "-")
self._verify_md(md_url=md_url_pubic_ipv4, expected_value=floatingip,
ssh_client=ssh_client)
# Check hostname in Metadata.
md_url_hostname = constants.MD_BASE_URL + "latest/meta-data/hostname"
self._verify_md(md_url=md_url_hostname,
expected_value=instance["name"] + ".novalocal")
expected_value=instance_name + ".novalocal",
ssh_client=ssh_client)
# Check local IPv4 in Metadata.
md_url_local_ipv4 = constants.MD_BASE_URL + \
"latest/meta-data/local-ipv4"
self._verify_md(md_url=md_url_local_ipv4, check_exist_only=True)
self._verify_md(md_url=md_url_local_ipv4, expected_value=fixed_ip,
ssh_client=ssh_client)
# Check hostname in Metadata of 2009-04-04 folder.
md_url_hostname = constants.MD_BASE_URL + \
"2009-04-04/meta-data/hostname"
self._verify_md(md_url=md_url_hostname,
expected_value=instance["name"] + ".novalocal")
expected_value=instance_name + ".novalocal",
ssh_client=ssh_client)
# Check hostname in Metadata of 1.0 folder.
md_url_hostname = constants.MD_BASE_URL + "1.0/meta-data/hostname"
self._verify_md(md_url=md_url_hostname,
expected_value=instance["name"] + ".novalocal")
expected_value=instance_name + ".novalocal",
ssh_client=ssh_client)
def verify_md_proxy_logical_ports_on_backend(self):
def verify_md_proxy_logical_ports_on_backend(self, tenant_id, network_id):
md_counter = 0
logical_ports = self.nsx.get_os_logical_ports()
for port_index in range(len(logical_ports)):
if logical_ports[port_index]["attachment"][
"attachment_type"] == "METADATA_PROXY":
md_counter += 1
msg = "Admin state of MDProxy logical port is DOWN!!!"
if logical_ports[port_index]["tags"][0]["tag"] == network_id:
msg = "MDproxy logical port does not have proper tenant " \
"id!!!"
self.assertEqual(
tenant_id, logical_ports[port_index]["tags"][1][
"tag"], msg)
md_counter += 1
msg1 = "Admin state of MDProxy logical port is DOWN!!!"
msg2 = "LS name does not start with mdproxy!!!"
msg3 = "MDproxy logical port does not have any auto tag!!!"
msg4 = "MDproxy logical port does not have scope tag as " \
"os-neutron-net-id!!!"
msg5 = "MDproxy logical port does not have scope tag as " \
"os-project-id!!!"
msg6 = "MDproxy logical port does not have scope tag as " \
"os-project-name!!!"
msg7 = "MDproxy logical port does not have scope tag as " \
"os-api-version!!!"
self.assertEqual(
"UP", logical_ports[port_index]["admin_state"], msg)
"UP", logical_ports[port_index]["admin_state"], msg1)
self.assertIn("mdproxy-",
logical_ports[port_index]["display_name"], msg2)
self.assertNotEqual(0, len(logical_ports[port_index]["tags"]),
msg3)
self.assertEqual(
"os-neutron-net-id",
logical_ports[port_index]["tags"][0]["scope"],
msg4)
self.assertEqual(
"os-project-id",
logical_ports[port_index]["tags"][1]["scope"],
msg5)
self.assertEqual(
"os-project-name",
logical_ports[port_index]["tags"][2]["scope"],
msg6)
self.assertEqual(
"os-api-version",
logical_ports[port_index]["tags"][3]["scope"],
msg7)
self.assertNotEqual(0, md_counter, "No logical port found for MD "
"proxy!!!")
def deploy_mdproxy_topology(self, glance_image_id=None):
router_mdproxy = self.create_topology_router("router_mdproxy")
network_mdproxy = self.create_topology_network("network_mdproxy")
self.create_topology_subnet(
"subnet_web", network_mdproxy, router_id=router_mdproxy["id"])
self.create_topology_instance(
"server_mdproxy_1", [network_mdproxy], image_id=glance_image_id)
def deploy_mdproxy_topology_2(self):
network_mdproxy = self.create_topology_network("network_mdproxy")
self.create_topology_subnet(
"subnet_web", network_mdproxy)
self.create_topology_instance(
"server_mdproxy_1", [network_mdproxy], create_floating_ip=False)
def deploy_mdproxy_topology_3(self):
router_mdproxy = self.create_topology_router("router_mdproxy")
network_mdproxy_1 = self.create_topology_network("network_mdproxy_1")
self.create_topology_subnet(
"subnet_web_1", network_mdproxy_1, router_id=router_mdproxy["id"])
self.create_topology_instance(
"server_mdproxy_1", [network_mdproxy_1])
network_mdproxy_2 = self.create_topology_network("network_mdproxy_2")
self.create_topology_subnet("subnet_web_2", network_mdproxy_2,
router_id=router_mdproxy["id"])
self.create_topology_instance("server_mdproxy_2", [network_mdproxy_2])
def metadata_test_on_various_glance_image(self, image_id):
self.deploy_mdproxy_topology(glance_image_id=image_id)
# Verify ssh, detailed metadata and verify backend data
ssh_client = self.verify_server_ssh(
server=self.topology_servers["server_mdproxy_1"],
use_password=True)
fixed_ip = \
self.topology_servers["server_mdproxy_1"]["floating_ips"][0][
"fixed_ip_address"]
fip = self.topology_servers["server_mdproxy_1"]["floating_ips"][0][
"floating_ip_address"]
self.verify_metadata_in_detail(
instance=self.topology_servers["server_mdproxy_1"],
ssh_client=ssh_client, floatingip=fip, fixed_ip=fixed_ip)
tenant_id = self.topology_networks["network_mdproxy"]["tenant_id"]
network_id = self.topology_networks["network_mdproxy"]["id"]
self.verify_md_proxy_logical_ports_on_backend(tenant_id, network_id)
@decorators.idempotent_id("e9a93161-d852-414d-aa55-36d465ea45df")
@test.services("compute", "network")
def test_mdproxy_ping(self):
router = self._create_router(
router_name=data_utils.rand_name("router-MDProxy"),
external_network_id=CONF.network.public_network_id)
(network_id, subnet_id) = self._create_net_subnet_router_interface(
router)
networks_ids = {"uuid": network_id}
instance, keypair = self._create_server_on_networks([networks_ids])
port_id = self._get_port_id(network_id, subnet_id, instance)
self.verify_ssh(keypair=keypair, instance=instance, port_id=port_id)
self.deploy_mdproxy_topology()
# Verify ssh connection and basic mdproxy data.
ssh_client = self.verify_server_ssh(server=self.topology_servers[
"server_mdproxy_1"])
md_url_pubic_ipv4 = constants.MD_BASE_URL + \
"latest/meta-data/public-ipv4"
self._verify_md(md_url=md_url_pubic_ipv4, expected_value=self.fip)
fip = self.topology_servers["server_mdproxy_1"][
"floating_ips"][0]["floating_ip_address"]
self._verify_md(md_url=md_url_pubic_ipv4, expected_value=fip,
ssh_client=ssh_client)
@decorators.idempotent_id("743f34a6-58b8-4288-a07f-7bee21c55051")
@test.services("compute", "network")
def test_mdproxy_verify_backend(self):
router = self._create_router(
router_name=data_utils.rand_name("router-MDProxy"),
external_network_id=CONF.network.public_network_id)
(network_id, subnet_id) = self._create_net_subnet_router_interface(
router)
networks_ids = {"uuid": network_id}
instance, keypair = self._create_server_on_networks([networks_ids])
port_id = self._get_port_id(network_id, subnet_id, instance)
self.verify_ssh(keypair=keypair, instance=instance, port_id=port_id)
self.verify_metadata_in_detail(instance=instance)
self.verify_md_proxy_logical_ports_on_backend()
self.deploy_mdproxy_topology()
# Verify ssh, detailed metadata and verify backend data
ssh_client = self.verify_server_ssh(
server=self.topology_servers["server_mdproxy_1"])
fixed_ip = self.topology_servers["server_mdproxy_1"]["floating_ips"][
0]["fixed_ip_address"]
fip = self.topology_servers["server_mdproxy_1"]["floating_ips"][0][
"floating_ip_address"]
self.verify_metadata_in_detail(
instance=self.topology_servers["server_mdproxy_1"],
ssh_client=ssh_client, floatingip=fip, fixed_ip=fixed_ip)
tenant_id = self.topology_networks["network_mdproxy"]["tenant_id"]
network_id = self.topology_networks["network_mdproxy"]["id"]
self.verify_md_proxy_logical_ports_on_backend(tenant_id, network_id)
@decorators.idempotent_id("fce2acc8-b850-40fe-bf02-958dd3cd4343")
@test.services("compute", "network")
def test_mdproxy_with_server_on_two_ls(self):
router = self._create_router(
router_name=data_utils.rand_name("router-MDProxy"),
external_network_id=CONF.network.public_network_id)
(network_id1, subnet_id1) = self._create_net_subnet_router_interface(
router)
(network_id2, subnet_id2) = self._create_net_subnet_router_interface(
router)
net1 = {"uuid": network_id1}
net2 = {"uuid": network_id2}
instance, keypair = self._create_server_on_networks([net1, net2])
port_id = self._get_port_id(network_id1, subnet_id1, instance)
self.verify_ssh(keypair=keypair, instance=instance, port_id=port_id)
self.verify_metadata_in_detail(instance=instance)
router_mdproxy = self.create_topology_router("router_mdproxy")
network_mdproxy = self.create_topology_network("network_mdproxy")
self.create_topology_subnet("subnet_web", network_mdproxy,
router_id=router_mdproxy["id"])
network2_mdproxy = self.create_topology_network("network2_mdproxy")
self.create_topology_subnet("subnet2_web", network2_mdproxy,
router_id=router_mdproxy["id"])
# Instance has 2 network ports.
self.create_topology_instance(
"server_mdproxy_1", [network_mdproxy, network2_mdproxy])
floating_ip_1 = self.topology_servers["server_mdproxy_1"][
"floating_ips"][0]["floating_ip_address"]
fixed_ip_1 = self.topology_servers["server_mdproxy_1"][
"floating_ips"][0]["fixed_ip_address"]
ssh_client1 = self.verify_server_ssh(
server=self.topology_servers["server_mdproxy_1"],
floating_ip=floating_ip_1)
floating_ip_2 = self.topology_servers["server_mdproxy_1"][
"floating_ips"][1]["floating_ip_address"]
self.verify_server_ssh(
server=self.topology_servers["server_mdproxy_1"],
floating_ip=floating_ip_2)
self.verify_metadata_in_detail(
instance=self.topology_servers["server_mdproxy_1"],
ssh_client=ssh_client1, floatingip=floating_ip_1,
fixed_ip=fixed_ip_1)
@decorators.idempotent_id("67332752-1295-42cb-a8c3-99210fb6b00b")
@test.services("compute", "network")
def test_mdproxy_isolated_network(self):
(network_id, _) = self._create_net_subnet_router_interface()
networks_ids = {"uuid": network_id}
self._create_server_on_networks([networks_ids])
self.verify_md_proxy_logical_ports_on_backend()
# Deploy topology without tier1 router
self.deploy_mdproxy_topology_2()
tenant_id = self.topology_networks["network_mdproxy"]["tenant_id"]
network_id = self.topology_networks["network_mdproxy"]["id"]
# Verify MDProxy logical ports on Backend
self.verify_md_proxy_logical_ports_on_backend(tenant_id, network_id)
@decorators.idempotent_id("cc8d2ab8-0bea-4e32-bf80-c9c46a7612b7")
@decorators.attr(type=["negative"])
@test.services("compute", "network")
def test_mdproxy_delete_when_ls_bounded(self):
(network_id, _) = self._create_net_subnet_router_interface()
networks_ids = {"uuid": network_id}
self._create_server_on_networks([networks_ids])
self.deploy_mdproxy_topology_2()
md_proxy_uuid = self.nsx.get_md_proxies()[0]["id"]
result = self.nsx.delete_md_proxy(md_proxy_uuid)
# Delete mdproxy server when it is still attached to LS
self.assertEqual(str(result["error_code"]),
constants.MD_ERROR_CODE_WHEN_LS_BOUNDED)
@decorators.idempotent_id("501fc3ea-696b-4e9e-b383-293ab94e2545")
@test.services("compute", "network")
def test_mdproxy_with_multiple_ports_on_network(self):
router = self._create_router(
router_name=data_utils.rand_name("router-MDProxy"),
external_network_id=CONF.network.public_network_id)
(network_id, subnet_id) = self._create_net_subnet_router_interface(
router)
networks_ids = {"uuid": network_id}
instance, keypair = self._create_server_on_networks([networks_ids])
instance2, keypair2 = self._create_server_on_networks([networks_ids])
port_id = self._get_port_id(network_id, subnet_id, instance)
# Verify 1st instance.
self.verify_ssh(keypair=keypair, instance=instance, port_id=port_id)
self.verify_metadata_in_detail(instance=instance)
# Verify 2nd instance.
port_id2 = self._get_port_id(network_id, subnet_id, instance2)
self.verify_ssh(keypair=keypair2, instance=instance2, port_id=port_id2)
self.verify_metadata_in_detail(instance=instance2)
self.deploy_mdproxy_topology()
# Boot 2nd vm on same network
network = self.topology_networks["network_mdproxy"]
self.create_topology_instance(
"server_mdproxy_2", [network])
# Verify Metadata from vm1
ssh_client_1 = self.verify_server_ssh(
server=self.topology_servers["server_mdproxy_1"])
fixed_ip_1 = self.topology_servers["server_mdproxy_1"][
"floating_ips"][0][
"fixed_ip_address"]
fip_1 = self.topology_servers["server_mdproxy_1"]["floating_ips"][0][
"floating_ip_address"]
self.verify_metadata_in_detail(
instance=self.topology_servers["server_mdproxy_1"],
ssh_client=ssh_client_1, floatingip=fip_1, fixed_ip=fixed_ip_1)
# Verify Metadata from vm2
ssh_client_2 = self.verify_server_ssh(
server=self.topology_servers["server_mdproxy_2"])
fixed_ip_2 = self.topology_servers["server_mdproxy_2"][
"floating_ips"][0][
"fixed_ip_address"]
fip_2 = self.topology_servers["server_mdproxy_2"]["floating_ips"][0][
"floating_ip_address"]
self.verify_metadata_in_detail(
instance=self.topology_servers["server_mdproxy_2"],
ssh_client=ssh_client_2, floatingip=fip_2, fixed_ip=fixed_ip_2)
# Verify Metadata on backend
tenant_id = self.topology_networks["network_mdproxy"]["tenant_id"]
network_id = self.topology_networks["network_mdproxy"]["id"]
self.verify_md_proxy_logical_ports_on_backend(tenant_id, network_id)
@decorators.idempotent_id("eae21afc-50ea-42e5-9c49-2ee38cee9f06")
@test.services("compute", "network")
def test_mdproxy_with_multiple_metadata_ports(self):
router = self._create_router(
router_name=data_utils.rand_name("router-MDProxy"),
external_network_id=CONF.network.public_network_id)
(network_id1, subnet_id1) = self._create_net_subnet_router_interface(
router)
(network_id2, subnet_id2) = self._create_net_subnet_router_interface(
router)
net1 = {"uuid": network_id1}
net2 = {"uuid": network_id2}
instance, keypair = self._create_server_on_networks([net1])
instance2, keypair2 = self._create_server_on_networks([net2])
port_id1 = self._get_port_id(network_id1, subnet_id1, instance)
port_id2 = self._get_port_id(network_id2, subnet_id2, instance2)
self.verify_ssh(keypair=keypair, instance=instance, port_id=port_id1)
self.verify_metadata_in_detail(instance=instance)
self.verify_ssh(keypair=keypair2, instance=instance2, port_id=port_id2)
self.verify_metadata_in_detail(instance=instance2)
self.deploy_mdproxy_topology_3()
# Verify 1st instance on the network1
ssh_client_1 = self.verify_server_ssh(
server=self.topology_servers["server_mdproxy_1"])
fixed_ip_1 = self.topology_servers["server_mdproxy_1"][
"floating_ips"][0][
"fixed_ip_address"]
fip_1 = self.topology_servers["server_mdproxy_1"]["floating_ips"][0][
"floating_ip_address"]
self.verify_metadata_in_detail(
instance=self.topology_servers["server_mdproxy_1"],
ssh_client=ssh_client_1, floatingip=fip_1, fixed_ip=fixed_ip_1)
# Verify 2nd instance on the network2
ssh_client_2 = self.verify_server_ssh(
server=self.topology_servers["server_mdproxy_2"])
fixed_ip_2 = self.topology_servers["server_mdproxy_2"][
"floating_ips"][0][
"fixed_ip_address"]
fip_2 = self.topology_servers["server_mdproxy_2"]["floating_ips"][0][
"floating_ip_address"]
self.verify_metadata_in_detail(
instance=self.topology_servers["server_mdproxy_2"],
ssh_client=ssh_client_2, floatingip=fip_2, fixed_ip=fixed_ip_2)
@decorators.idempotent_id("29d44d7c-6ea1-4b30-a6c3-a2695c2486fe")
@decorators.attr(type=["negative"])
@test.services("compute", "network")
def test_mdproxy_with_incorrect_password(self):
router = self._create_router(
router_name=data_utils.rand_name("router-MDProxy"),
external_network_id=CONF.network.public_network_id)
(network_id, subnet_id) = self._create_net_subnet_router_interface(
router)
networks_ids = {"uuid": network_id}
instance, keypair = self._create_server_on_networks([networks_ids])
port_id = self._get_port_id(network_id, subnet_id, instance)
self.verify_ssh(keypair=keypair, instance=instance, port_id=port_id)
md_url = constants.MD_BASE_URL + "latest/meta-data/public-ipv4"
self._verify_md(md_url, sub_result="403 Forbidden")
self.deploy_mdproxy_topology()
ssh_client = self.verify_server_ssh(
server=self.topology_servers["server_mdproxy_1"])
md_url_pubic_ipv4 = constants.MD_BASE_URL + \
"latest/meta-data/public-ipv4"
# Query metadata and query should fail
self._verify_md(md_url=md_url_pubic_ipv4, expected_value="",
ssh_client=ssh_client, sub_result="403 Forbidden")
@decorators.idempotent_id("74e5d545-3ccc-46c8-9bda-16ccf8730a5b")
@test.services("compute", "network")
def test_mdproxy_with_cirros_kvm_server_image(self):
image_id = self.get_glance_image_id(["cirros", "kvm"])
self.metadata_test_on_various_glance_image(image_id)
@decorators.idempotent_id("35babffc-f098-4705-82b7-ab96a6f4fdd8")
@test.services("compute", "network")
def test_mdproxy_with_debian_esx_server_image(self):
image_id = self.get_glance_image_id(["debian", "esx"])
self.metadata_test_on_various_glance_image(image_id)
@decorators.idempotent_id("71ba325f-083b-4247-9192-a9f54d3ecfd2")
@test.services("compute", "network")
def test_mdproxy_with_debian_kvm_server_image(self):
image_id = self.get_glance_image_id(["debian", "kvm"])
self.metadata_test_on_various_glance_image(image_id)
@decorators.idempotent_id("dfed6074-c4a1-4bf7-a805-80a191ea7875")
@test.services("compute", "network")
def test_mdproxy_with_xenial_esx_server_image(self):
image_id = self.get_glance_image_id(["xenial", "esx"])
self.metadata_test_on_various_glance_image(image_id)
@decorators.idempotent_id("55829b7f-1535-41d8-833f-b20ac0ee48e0")
@test.services("compute", "network")
def test_mdproxy_with_xenial_kvm_server_image(self):
image_id = self.get_glance_image_id(["xenial", "kvm"])
self.metadata_test_on_various_glance_image(image_id)

View File

@ -149,9 +149,9 @@ class TestMicroSegmentationOps(feature_manager.FeatureManager):
self.check_server_project_connectivity(details)
self.check_cross_network_connectivity(
self.topology_networks["network_web"],
self.servers_details["server_app_1"].floating_ip,
self.servers_details["server_app_1"].floating_ips[0],
self.servers_details["server_app_1"].server)
self.check_cross_network_connectivity(
self.topology_networks["network_app"],
self.servers_details["server_web_1"].floating_ip,
self.servers_details["server_web_1"].floating_ips[0],
self.servers_details["server_web_1"].server)