Add scenario tests to taas tempest plugin

Scenarios in this patchset are achieving one goal, to actually
ensure that TAAS is working properly using tcpdump to verify.

This creates one new requirement to our test environment, image
with tcpdump. Usually, we can expect compute.image_ref to point at
cirros or similar basic distro, and can't force users to
overwrite it with a much bigger distro that includes tcpdump
because it will drastically increase the time needed to run other tests.

Solution is to change tempest.conf part of TAAS:

[taas_plugin_options]
advanced_image_ref = <uuid of tcpdump capable image>
advanced_image_ssh_user = <user to be used in image above>
advanced_image_flavor_ref = <flavor big enough for image above>

Where advanced_image_ref is UUID of the image with preinstalled tcpdump,
and advanced_image_ssh_user is a user we can use to ssh into VM.

In my case, I just used a default ubuntu cloud image.

New tests are running with your default networks (usually OVS),
and, if taas configuration is provided, against provider network.

Change-Id: Icdff0fe63a8977d6065808c18a9fdc674cef6d4f
This commit is contained in:
Mykola Yakovliev 2019-10-16 11:06:55 -05:00 committed by Mykola Yakovliev
parent f3c3d2f6a9
commit c591a980fd
4 changed files with 469 additions and 66 deletions

View File

@ -102,6 +102,21 @@
TEMPEST_PLUGINS: /opt/stack/tap-as-a-service-tempest-plugin
USE_PYTHON3: false
NETWORK_API_EXTENSIONS: "{{ (network_api_extensions_common + network_api_extensions_tempest) | join(',') }}"
DOWNLOAD_DEFAULT_IMAGES: false
IMAGE_URLS: "http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-i386-disk.img,https://cloud-images.ubuntu.com/releases/bionic/release/ubuntu-18.04-server-cloudimg-amd64.img"
DEFAULT_IMAGE_NAME: cirros-0.3.4-i386-disk
ADVANCED_IMAGE_NAME: ubuntu-18.04-server-cloudimg-amd64
BUILD_TIMEOUT: 784
devstack_local_conf:
test-config:
$TEMPEST_CONFIG:
taas_plugin_options:
advanced_image_ref: ubuntu-18.04-server-cloudimg-amd64
advanced_image_ssh_user: ubuntu
provider_physical_network: public
provider_segmentation_id: 100
image_feature_enabled:
api_v2: true
devstack_plugins:
neutron: git://opendev.org/openstack/neutron.git
tap-as-a-service: git://opendev.org/x/tap-as-a-service.git

View File

@ -34,4 +34,14 @@ TaaSPluginGroup = [
default='',
help='Comma separated list of VLANs to be mirrored '
'for a Tap-Flow.'),
cfg.StrOpt('advanced_image_ref',
default='',
help='Valid advanced image uuid to be used in tests. '
'Must contain tcpdump preinstalled.'),
cfg.StrOpt('advanced_image_ssh_user',
default='ubuntu',
help='Name of ssh user to use with advanced image in tests.'),
cfg.StrOpt('advanced_image_flavor_ref',
default='m1.medium',
help='Valid flavor to use with advanced image in tests.'),
]

View File

@ -38,7 +38,7 @@ LOG = log.getLogger(__name__)
class ScenarioTest(tempest.test.BaseTestCase):
"""Base class for scenario tests. Uses tempest own clients. """
credentials = ['primary']
credentials = ['primary', 'admin']
@classmethod
def setup_clients(cls):
@ -51,10 +51,14 @@ class ScenarioTest(tempest.test.BaseTestCase):
cls.ports_client = cls.os_primary.ports_client
cls.routers_client = cls.os_primary.routers_client
cls.subnets_client = cls.os_primary.subnets_client
cls.flavor_client = cls.os_primary.flavors_client
cls.floating_ips_client = cls.os_primary.floating_ips_client
cls.security_groups_client = cls.os_primary.security_groups_client
cls.security_group_rules_client = (
cls.os_primary.security_group_rules_client)
cls.admin_networks_client = cls.os_admin.networks_client
cls.admin_subnets_client = cls.os_admin.subnets_client
cls.admin_routers_client = cls.os_admin.routers_client
# ## Test functions library
#
@ -76,13 +80,14 @@ class ScenarioTest(tempest.test.BaseTestCase):
client.delete_port, port['id'])
return port
def create_keypair(self, client=None):
@classmethod
def create_keypair(cls, client=None):
if not client:
client = self.keypairs_client
name = data_utils.rand_name(self.__class__.__name__)
client = cls.keypairs_client
name = data_utils.rand_name(cls.__class__.__name__)
# We don't need to create a keypair by pubkey in scenario
body = client.create_keypair(name=name)
self.addCleanup(client.delete_keypair, name)
cls.addClassResourceCleanup(client.delete_keypair, name)
return body['keypair']
def create_server(self, name=None, image_id=None, flavor=None,
@ -353,12 +358,13 @@ class NetworkScenarioTest(ScenarioTest):
if not CONF.service_available.neutron:
raise cls.skipException('Neutron not available')
def _create_network(self, networks_client=None,
@classmethod
def _create_network(cls, networks_client=None,
tenant_id=None,
namestart='network-smoke-',
port_security_enabled=True):
if not networks_client:
networks_client = self.networks_client
networks_client = cls.networks_client
if not tenant_id:
tenant_id = networks_client.tenant_id
name = data_utils.rand_name(namestart)
@ -370,13 +376,14 @@ class NetworkScenarioTest(ScenarioTest):
result = networks_client.create_network(**network_kwargs)
network = result['network']
self.assertEqual(network['name'], name)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
networks_client.delete_network,
network['id'])
assert network['name'] == name
cls.addClassResourceCleanup(test_utils.call_and_ignore_notfound_exc,
networks_client.delete_network,
network['id'])
return network
def _create_subnet(self, network, subnets_client=None,
@classmethod
def _create_subnet(cls, network, subnets_client=None,
routers_client=None, namestart='subnet-smoke',
**kwargs):
"""Create a subnet for the given network
@ -384,9 +391,9 @@ class NetworkScenarioTest(ScenarioTest):
within the cidr block configured for tenant networks.
"""
if not subnets_client:
subnets_client = self.subnets_client
subnets_client = cls.subnets_client
if not routers_client:
routers_client = self.routers_client
routers_client = cls.routers_client
def cidr_in_use(cidr, tenant_id):
"""Check cidr existence
@ -394,7 +401,7 @@ class NetworkScenarioTest(ScenarioTest):
:returns: True if subnet with cidr already exist in tenant
False else
"""
cidr_in_use = self.os_admin.subnets_client.list_subnets(
cidr_in_use = cls.os_admin.subnets_client.list_subnets(
tenant_id=tenant_id, cidr=cidr)['subnets']
return len(cidr_in_use) != 0
@ -432,13 +439,14 @@ class NetworkScenarioTest(ScenarioTest):
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
self.assertIsNotNone(result, 'Unable to allocate tenant network')
assert result is not None, 'Unable to allocate tenant network'
subnet = result['subnet']
self.assertEqual(subnet['cidr'], str_cidr)
assert subnet['cidr'] == str_cidr
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
subnets_client.delete_subnet, subnet['id'])
cls.addClassResourceCleanup(
test_utils.call_and_ignore_notfound_exc,
subnets_client.delete_subnet, subnet['id'])
return subnet
@ -593,31 +601,33 @@ class NetworkScenarioTest(ScenarioTest):
CONF.validation.ping_timeout,
1)
def _create_security_group(self, security_group_rules_client=None,
@classmethod
def _create_security_group(cls, security_group_rules_client=None,
tenant_id=None,
namestart='secgroup-smoke',
security_groups_client=None):
if security_group_rules_client is None:
security_group_rules_client = self.security_group_rules_client
security_group_rules_client = cls.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
security_groups_client = cls.security_groups_client
if tenant_id is None:
tenant_id = security_groups_client.tenant_id
secgroup = self._create_empty_security_group(
secgroup = cls._create_empty_security_group(
namestart=namestart, client=security_groups_client,
tenant_id=tenant_id)
# Add rules to the security group
rules = self._create_loginable_secgroup_rule(
rules = cls._create_loginable_secgroup_rule(
security_group_rules_client=security_group_rules_client,
secgroup=secgroup,
security_groups_client=security_groups_client)
for rule in rules:
self.assertEqual(tenant_id, rule['tenant_id'])
self.assertEqual(secgroup['id'], rule['security_group_id'])
assert tenant_id == rule['tenant_id']
assert secgroup['id'] == rule['security_group_id']
return secgroup
def _create_empty_security_group(self, client=None, tenant_id=None,
@classmethod
def _create_empty_security_group(cls, client=None, tenant_id=None,
namestart='secgroup-smoke'):
"""Create a security group without rules.
@ -629,7 +639,7 @@ class NetworkScenarioTest(ScenarioTest):
:returns: the created security group
"""
if client is None:
client = self.security_groups_client
client = cls.security_groups_client
if not tenant_id:
tenant_id = client.tenant_id
sg_name = data_utils.rand_name(namestart)
@ -640,21 +650,23 @@ class NetworkScenarioTest(ScenarioTest):
result = client.create_security_group(**sg_dict)
secgroup = result['security_group']
self.assertEqual(secgroup['name'], sg_name)
self.assertEqual(tenant_id, secgroup['tenant_id'])
self.assertEqual(secgroup['description'], sg_desc)
assert secgroup['name'] == sg_name
assert tenant_id == secgroup['tenant_id']
assert secgroup['description'] == sg_desc
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_security_group, secgroup['id'])
cls.addClassResourceCleanup(
test_utils.call_and_ignore_notfound_exc,
client.delete_security_group, secgroup['id'])
return secgroup
def _default_security_group(self, client=None, tenant_id=None):
@classmethod
def _default_security_group(cls, client=None, tenant_id=None):
"""Get default secgroup for given tenant_id.
:returns: default secgroup for given tenant
"""
if client is None:
client = self.security_groups_client
client = cls.security_groups_client
if not tenant_id:
tenant_id = client.tenant_id
sgs = [
@ -662,10 +674,11 @@ class NetworkScenarioTest(ScenarioTest):
if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
]
msg = "No default security group for tenant %s." % (tenant_id)
self.assertGreater(len(sgs), 0, msg)
assert len(sgs) > 0, msg
return sgs[0]
def _create_security_group_rule(self, secgroup=None,
@classmethod
def _create_security_group_rule(cls, secgroup=None,
sec_group_rules_client=None,
tenant_id=None,
security_groups_client=None, **kwargs):
@ -687,13 +700,13 @@ class NetworkScenarioTest(ScenarioTest):
}
"""
if sec_group_rules_client is None:
sec_group_rules_client = self.security_group_rules_client
sec_group_rules_client = cls.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
security_groups_client = cls.security_groups_client
if not tenant_id:
tenant_id = security_groups_client.tenant_id
if secgroup is None:
secgroup = self._default_security_group(
secgroup = cls._default_security_group(
client=security_groups_client, tenant_id=tenant_id)
ruleset = dict(security_group_id=secgroup['id'],
@ -703,12 +716,13 @@ class NetworkScenarioTest(ScenarioTest):
sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
sg_rule = sg_rule['security_group_rule']
self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
assert secgroup['tenant_id'] == sg_rule['tenant_id']
assert secgroup['id'] == sg_rule['security_group_id']
return sg_rule
def _create_loginable_secgroup_rule(self, security_group_rules_client=None,
@classmethod
def _create_loginable_secgroup_rule(cls, security_group_rules_client=None,
secgroup=None,
security_groups_client=None):
"""Create loginable security group rule
@ -721,9 +735,9 @@ class NetworkScenarioTest(ScenarioTest):
"""
if security_group_rules_client is None:
security_group_rules_client = self.security_group_rules_client
security_group_rules_client = cls.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
security_groups_client = cls.security_groups_client
rules = []
rulesets = [
dict(
@ -747,7 +761,7 @@ class NetworkScenarioTest(ScenarioTest):
for r_direction in ['ingress', 'egress']:
ruleset['direction'] = r_direction
try:
sg_rule = self._create_security_group_rule(
sg_rule = cls._create_security_group_rule(
sec_group_rules_client=sec_group_rules_client,
secgroup=secgroup,
security_groups_client=security_groups_client,
@ -758,12 +772,13 @@ class NetworkScenarioTest(ScenarioTest):
if msg not in ex._error_string:
raise ex
else:
self.assertEqual(r_direction, sg_rule['direction'])
assert r_direction == sg_rule['direction']
rules.append(sg_rule)
return rules
def _get_router(self, client=None, tenant_id=None):
@classmethod
def _get_router(cls, client=None, tenant_id=None):
"""Retrieve a router for the given tenant id.
If a public router has been configured, it will be returned.
@ -773,7 +788,7 @@ class NetworkScenarioTest(ScenarioTest):
routes traffic to the public network.
"""
if not client:
client = self.routers_client
client = cls.routers_client
if not tenant_id:
tenant_id = client.tenant_id
router_id = CONF.network.public_router_id
@ -782,7 +797,7 @@ class NetworkScenarioTest(ScenarioTest):
body = client.show_router(router_id)
return body['router']
elif network_id:
router = self._create_router(client, tenant_id)
router = cls._create_router(client, tenant_id)
kwargs = {'external_gateway_info': dict(network_id=network_id)}
router = client.update_router(router['id'], **kwargs)['router']
return router
@ -790,10 +805,11 @@ class NetworkScenarioTest(ScenarioTest):
raise Exception("Neither of 'public_router_id' or "
"'public_network_id' has been defined.")
def _create_router(self, client=None, tenant_id=None,
@classmethod
def _create_router(cls, client=None, tenant_id=None,
namestart='router-smoke'):
if not client:
client = self.routers_client
client = cls.routers_client
if not tenant_id:
tenant_id = client.tenant_id
name = data_utils.rand_name(namestart)
@ -801,10 +817,10 @@ class NetworkScenarioTest(ScenarioTest):
admin_state_up=True,
tenant_id=tenant_id)
router = result['router']
self.assertEqual(router['name'], name)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_router,
router['id'])
assert router['name'] == name
cls.addClassResourceCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_router,
router['id'])
return router
def _update_router_admin_state(self, router, admin_state_up):
@ -813,7 +829,8 @@ class NetworkScenarioTest(ScenarioTest):
router['id'], **kwargs)['router']
self.assertEqual(admin_state_up, router['admin_state_up'])
def create_networks(self, networks_client=None,
@classmethod
def create_networks(cls, networks_client=None,
routers_client=None, subnets_client=None,
tenant_id=None, dns_nameservers=None,
port_security_enabled=True):
@ -835,33 +852,149 @@ class NetworkScenarioTest(ScenarioTest):
if not CONF.compute.fixed_network_name:
m = 'fixed_network_name must be specified in config'
raise lib_exc.InvalidConfiguration(m)
network = self._get_network_by_name(
network = cls._get_network_by_name(
CONF.compute.fixed_network_name)
router = None
subnet = None
else:
network = self._create_network(
network = cls._create_network(
networks_client=networks_client,
tenant_id=tenant_id,
port_security_enabled=port_security_enabled)
router = self._get_router(client=routers_client,
tenant_id=tenant_id)
router = cls._get_router(client=routers_client,
tenant_id=tenant_id)
subnet_kwargs = dict(network=network,
subnets_client=subnets_client,
routers_client=routers_client)
# use explicit check because empty list is a valid option
if dns_nameservers is not None:
subnet_kwargs['dns_nameservers'] = dns_nameservers
subnet = self._create_subnet(**subnet_kwargs)
subnet = cls._create_subnet(**subnet_kwargs)
if not routers_client:
routers_client = self.routers_client
routers_client = cls.routers_client
router_id = router['id']
routers_client.add_router_interface(router_id,
subnet_id=subnet['id'])
# save a cleanup job to remove this association between
# router and subnet
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
routers_client.remove_router_interface, router_id,
subnet_id=subnet['id'])
cls.addClassResourceCleanup(
test_utils.call_and_ignore_notfound_exc,
routers_client.remove_router_interface, router_id,
subnet_id=subnet['id'])
return network, subnet, router
def _create_server_and_wait(self, source_port, image_id=None,
flavor_id=None):
image_id = image_id if image_id else CONF.compute.image_ref
flavor_id = flavor_id if flavor_id else CONF.compute.flavor_ref
vm = self.create_server(
flavor=flavor_id,
image_id=image_id,
key_name=self.keypair['name'],
wait_until='ACTIVE',
networks=[{'port': source_port['id']}])
return vm
def _create_server_with_floatingip(self, use_taas_cloud_image=False,
provider_net=False, **kwargs):
net_id = self.network['id']
flavor = CONF.compute.flavor_ref
image = CONF.compute.image_ref
if use_taas_cloud_image:
image = self._get_real_image_id(
CONF.taas_plugin_options.advanced_image_ref)
flavor = self._get_real_flavor_id(
CONF.taas_plugin_options.advanced_image_flavor_ref)
if provider_net:
net_id = self.provider_network['id']
port = self._create_port(
net_id, security_groups=[self.secgroup['id']], **kwargs)
self._create_server_and_wait(port, image, flavor)
fip = self.create_floating_ip(
port, port_id=port['id'],
external_network_id=CONF.network.public_network_id)
return port, fip
def _run_in_background(self, sshclient, cmd):
runInBg = "nohup %s 2>&1 &" % cmd
sshclient.exec_command(runInBg)
def _setup_provider_network(self):
net = self._create_provider_network()
self._create_provider_subnet(net["id"])
return net
def _create_provider_network(self):
network_kwargs = {
"admin_state_up": True,
"shared": True,
"provider:network_type": "vlan",
"provider:physical_network":
CONF.taas_plugin_options.provider_physical_network,
}
segmentation_id = CONF.taas_plugin_options.provider_segmentation_id
if segmentation_id and segmentation_id == "0":
network_kwargs['provider:network_type'] = 'flat'
elif segmentation_id:
network_kwargs['provider:segmentation_id'] = segmentation_id
network = self.admin_networks_client.create_network(
**network_kwargs)['network']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.admin_networks_client.delete_network,
network['id'])
return network
def _create_provider_subnet(self, net_id):
subnet = dict(
network_id=net_id,
cidr="172.25.100.0/24",
ip_version=4,
)
result = self.admin_subnets_client.create_subnet(**subnet)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.admin_subnets_client.delete_subnet, result['subnet']['id'])
self.admin_routers_client.add_router_interface(
self.router['id'], subnet_id=result['subnet']['id'])
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.admin_routers_client.remove_router_interface,
self.router['id'], subnet_id=result['subnet']['id'])
def _get_real_image_id(self, image_id):
try:
id = self.image_client.show_image(
image_id=image_id)["id"]
except AssertionError:
images = self.image_client.list_images(
params={"name": image_id})["images"]
if len(images) == 0:
self.skip_checks()
id = images[0]["id"]
return id
def _get_real_flavor_id(self, flavor_id):
id = None
try:
id = self.flavor_client.show_flavor(
flavor_id=flavor_id)["flavor"]["id"]
except AssertionError:
flavors = self.flavor_client.list_flavors()["flavors"]
for f in flavors:
if f["name"] == flavor_id:
id = f["id"]
if id is None:
self.skip_checks()
return id

View File

@ -0,0 +1,245 @@
# Copyright (c) 2019 AT&T
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from contextlib import contextmanager
from oslo_log import log
import testtools
from tempest.common import utils
from tempest import config
from tempest.lib.common.utils.linux import remote_client
from tempest.lib import decorators
from neutron_taas_tempest_plugin.tests.scenario import base
CONF = config.CONF
LOG = log.getLogger(__name__)
class TestTaaSTrafficScenarios(base.TaaSScenarioTest):
@classmethod
def setup_clients(cls):
super(TestTaaSTrafficScenarios, cls).setup_clients()
if CONF.image_feature_enabled.api_v1:
cls.image_client = cls.os_primary.image_client
elif CONF.image_feature_enabled.api_v2:
cls.image_client = cls.os_primary.image_client_v2
else:
raise cls.skipException(
'Either api_v1 or api_v2 must be True in '
'[image-feature-enabled].')
@classmethod
@utils.requires_ext(extension="router", service="network")
def resource_setup(cls):
super(TestTaaSTrafficScenarios, cls).resource_setup()
for ext in ['taas']:
if not utils.is_extension_enabled(ext, 'network'):
msg = "%s Extension not enabled." % ext
raise cls.skipException(msg)
cls.network, cls.subnet, cls.router = cls.create_networks()
cls.provider_network = None
cls.keypair = cls.create_keypair()
cls.secgroup = cls._create_security_group()
@contextmanager
def _setup_topology(self, taas=True, use_taas_cloud_image=False,
provider_net=False):
"""Setup topology for the test
+------------+
| monitor vm |
+-----+------+
|
+-----v---+
+--+ network <--+
| +----^----+ |
| | |
| +----+-+ +---+--+
| | vm 1 | | vm 2 |
| +------+ +------+
|
| +--------+
+--> router |
+-----+--+
|
+-----v------+
| public net |
+------------+
"""
if provider_net:
if CONF.taas_plugin_options.provider_physical_network:
self.provider_network = self._setup_provider_network()
else:
msg = "provider_physical_network not provided"
raise self.skipException(msg)
self.mon_port, mon_fip = self._create_server_with_floatingip(
use_taas_cloud_image=use_taas_cloud_image,
provider_net=provider_net)
self.left_port, left_fip = self._create_server_with_floatingip(
provider_net=provider_net)
self.right_port, right_fip = self._create_server_with_floatingip(
provider_net=provider_net)
if taas:
LOG.debug("Create TAAS service")
tap_service = self.create_tap_service(port_id=self.mon_port['id'])
self.create_tap_flow(tap_service_id=tap_service['id'],
direction='BOTH',
source_port=self.left_port['id'])
self.create_tap_flow(tap_service_id=tap_service['id'],
direction='BOTH',
source_port=self.right_port['id'])
user = CONF.validation.image_ssh_user
if use_taas_cloud_image:
user = CONF.taas_plugin_options.advanced_image_ssh_user
self.monitor_client = remote_client.RemoteClient(
mon_fip['floating_ip_address'], user,
pkey=self.keypair['private_key'])
self.left_client = remote_client.RemoteClient(
left_fip['floating_ip_address'], CONF.validation.image_ssh_user,
pkey=self.keypair['private_key'])
self.right_client = remote_client.RemoteClient(
right_fip['floating_ip_address'], CONF.validation.image_ssh_user,
pkey=self.keypair['private_key'])
yield
def _check_icmp_traffic(self):
log_location = "/tmp/tcpdumplog"
right_ip = self.right_port['fixed_ips'][0]['ip_address']
left_ip = self.left_port['fixed_ips'][0]['ip_address']
# Run tcpdump in background
self._run_in_background(self.monitor_client,
"sudo tcpdump -n -nn > %s" % log_location)
# Ensure tcpdump is up and running
psax = self.monitor_client.exec_command("ps -ax")
self.assertTrue("tcpdump" in psax)
# Run traffic from left_vm to right_vm
self.left_client.exec_command("ping -c 50 %s" % right_ip)
# Collect tcpdump results
output = self.monitor_client.exec_command("cat %s" % log_location)
self.assertTrue(len(output) > 0)
looking_for = ["IP %s > %s: ICMP echo request" % (left_ip, right_ip),
"IP %s > %s: ICMP echo reply" % (right_ip, left_ip)]
results = []
for tcpdump_line in looking_for:
results.append(tcpdump_line in output)
return all(results)
def _test_taas_connectivity(self, use_provider_net=False):
"""Ensure TAAS doesn't break connectivity
This test creates TAAS service between two servers and checks that
it doesn't break basic connectivity between them.
"""
# Check uninterrupted traffic between VMs
with self._setup_topology(provider_net=use_provider_net):
# Left to right
self._check_remote_connectivity(
self.left_client,
self.right_port['fixed_ips'][0]['ip_address'])
# Right to left
self._check_remote_connectivity(
self.right_client,
self.left_port['fixed_ips'][0]['ip_address'])
# TAAS vm to right
self._check_remote_connectivity(
self.monitor_client,
self.right_port['fixed_ips'][0]['ip_address'])
# TAAS vm to left
self._check_remote_connectivity(
self.monitor_client,
self.left_port['fixed_ips'][0]['ip_address'])
@decorators.idempotent_id('ff414b7d-e81c-47f2-b6c8-53bc2f1e9b00')
@decorators.attr(type='slow')
@utils.services('compute', 'network')
def test_taas_provider_network_connectivity(self):
self._test_taas_connectivity(use_provider_net=True)
@decorators.idempotent_id('e3c52e91-7abf-4dfd-8687-f7c071cdd333')
@decorators.attr(type='slow')
@utils.services('compute', 'network')
def test_taas_network_connectivity(self):
self._test_taas_connectivity(use_provider_net=False)
@decorators.idempotent_id('fcb15ca3-ef61-11e9-9792-f45c89c47e11')
@testtools.skipUnless(CONF.taas_plugin_options.advanced_image_ref,
'Cloud image not found.')
@decorators.attr(type='slow')
@utils.services('compute', 'network')
def test_taas_forwarded_traffic_positive(self):
"""Check that TAAS forwards traffic as expected"""
with self._setup_topology(use_taas_cloud_image=True):
# Check that traffic was forwarded to TAAS service
self.assertTrue(self._check_icmp_traffic())
@decorators.idempotent_id('6c54d9c5-075a-4a1f-bbe6-12c3c9abf1e2')
@testtools.skipUnless(CONF.taas_plugin_options.advanced_image_ref,
'Cloud image not found.')
@decorators.attr(type='slow')
@utils.services('compute', 'network')
def test_taas_forwarded_traffic_negative(self):
"""Check that TAAS doesn't forward traffic"""
with self._setup_topology(taas=False, use_taas_cloud_image=True):
# Check that traffic was NOT forwarded to TAAS service
self.assertFalse(self._check_icmp_traffic())
@decorators.idempotent_id('fcb15ca3-ef61-11e9-9792-f45c89c47e12')
@testtools.skipUnless(CONF.taas_plugin_options.advanced_image_ref,
'Cloud image not found.')
@decorators.attr(type='slow')
@utils.services('compute', 'network')
def test_taas_forwarded_traffic_provider_net_positive(self):
"""Check that TAAS forwards traffic as expected in provider network"""
with self._setup_topology(use_taas_cloud_image=True,
provider_net=True):
# Check that traffic was forwarded to TAAS service
self.assertTrue(self._check_icmp_traffic())
@decorators.idempotent_id('6c54d9c5-075a-4a1f-bbe6-12c3c9abf1e3')
@testtools.skipUnless(CONF.taas_plugin_options.advanced_image_ref,
'Cloud image not found.')
@decorators.attr(type='slow')
@utils.services('compute', 'network')
def test_taas_forwarded_traffic_provider_net_negative(self):
"""Check that TAAS doesn't forward traffic in provider network"""
with self._setup_topology(taas=False, use_taas_cloud_image=True,
provider_net=True):
# Check that traffic was NOT forwarded to TAAS service
self.assertFalse(self._check_icmp_traffic())