[Tempest]: Added api cases for FWaaSv2

Change-Id: Ic036613b5e0bc4724055523900e248c9e03508bd
This commit is contained in:
Puneet Arora 2018-02-26 22:04:37 +00:00
parent a5f4183700
commit ab3da381e5
3 changed files with 480 additions and 0 deletions

View File

@ -15,9 +15,12 @@
import time
from neutron_lib import constants as nl_constants
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
from vmware_nsx_tempest._i18n import _
from vmware_nsx_tempest.common import constants
@ -77,6 +80,64 @@ class FeatureManager(traffic_manager.TrafficManager):
cls.members_client = members_client.get_client(cls.os_primary)
cls.health_monitors_client = \
health_monitors_client.get_client(cls.os_primary)
cls.fwaas_v2_client = openstack_network_clients.FwaasV2Client(
net_client.auth_provider,
net_client.service,
net_client.region,
net_client.endpoint_type,
**_params)
#
# FwaasV2 base class
#
def create_firewall_rule(self, **kwargs):
fw_rule = self.fwaas_v2_client.create_firewall_v2_rule(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.fwaas_v2_client.delete_firewall_v2_rule,
fw_rule["firewall_rule"]["id"])
return fw_rule
def create_firewall_policy(self, **kwargs):
fw_policy = self.fwaas_v2_client.create_firewall_v2_policy(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.fwaas_v2_client.delete_firewall_v2_policy,
fw_policy["firewall_policy"]["id"])
return fw_policy
def create_firewall_group(self, **kwargs):
fw_group = self.fwaas_v2_client.create_firewall_v2_group(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.fwaas_v2_client.delete_firewall_v2_group,
fw_group["firewall_group"]["id"])
return fw_group
def update_firewall_group(self, group_id, **kwargs):
fw_group = self.fwaas_v2_client.update_firewall_v2_group(group_id,
**kwargs)
return fw_group
def update_firewall_policy(self, policy_id, **kwargs):
return self.fwaas_v2_client.update_firewall_v2_policy(policy_id,
**kwargs)
def update_firewall_rule(self, rule_id, **kwargs):
return self.fwaas_v2_client.update_firewall_v2_rule(rule_id,
**kwargs)
def show_firewall_group(self, group_id):
fw_group = self.fwaas_v2_client.show_firewall_v2_group(group_id)
return fw_group
def show_firewall_rule(self, rule_id):
fw_rule = self.fwaas_v2_client.show_firewall_v2_rule(rule_id)
return fw_rule
def show_firewall_policy(self, policy_id):
fw_policy = self.fwaas_v2_client.show_firewall_v2_policy(policy_id)
return fw_policy
#
# L2Gateway base class. To get basics of L2GW.
@ -254,6 +315,26 @@ class FeatureManager(traffic_manager.TrafficManager):
server = self.servers_details[server_name].server
self.start_web_server(protocol_port, server, server_name)
def _wait_firewall_while(self, fw_group_id, statuses, not_found_ok=False):
if not_found_ok:
expected_exceptions = (lib_exc.NotFound)
else:
expected_exceptions = ()
while True:
try:
fw = self.show_firewall_group(fw_group_id)
except expected_exceptions:
break
status = fw['firewall_group']['status']
if status not in statuses:
break
def _wait_firewall_ready(self, fw_group_id):
time.sleep(constants.NSX_BACKEND_VERY_SMALL_TIME_INTERVAL)
self._wait_firewall_while(fw_group_id,
[nl_constants.PENDING_CREATE,
nl_constants.PENDING_UPDATE])
def wait_for_load_balancer_status(self, lb_id):
# Wait for load balancer become ONLINE and ACTIVE
self.load_balancers_client.wait_for_load_balancer_status(lb_id)
@ -392,3 +473,12 @@ class FeatureManager(traffic_manager.TrafficManager):
port_id=self.loadbalancer['vip_port_id'])
self.vip_ip_address = vip_fip['floating_ip_address']
return self.vip_ip_address
def get_router_port(self, client):
"""List ports using admin creds """
ports_list = client.list_ports()
for port in ports_list['ports']:
port_info = client.show_port(port['id'])
if port_info['port']['device_owner'] == "network:router_interface":
return port_info['port']['id']
return None

View File

@ -14,11 +14,13 @@
# under the License.
from oslo_log import log
from tempest import config
from tempest.lib.services.network import base
from vmware_nsx_tempest.common import constants
LOG = log.getLogger(__name__)
CONF = config.CONF
class L2GatewayClient(base.BaseNetworkClient):
@ -98,3 +100,87 @@ class L2GatewayConnectionClient(base.BaseNetworkClient):
def list_l2_gateway_connections(self, **filters):
uri = self.resource_base_path
return self.list_resources(uri, **filters)
class FwaasV2Client(base.BaseNetworkClient):
"""
Request resources via API for FwaasV2Client
fwaasv2 create rule
fwaasv2 update rule
fwaasv2 delete rule
fwaasv2 show rule
fwaasv2 create policy
fwaasv2 update policy
fwaasv2 delete policy
fwaasv2 show policy
fwaasv2 create group
fwaasv2 update group
fwaasv2 delete group
fwaasv2 show group
"""
resource_rule = 'firewall_rule'
resource_policy = 'firewall_policy'
resource_group = 'firewall_group'
rule_path = 'fwaas/firewall_rules'
policy_path = '/fwaas/firewall_policies'
group_path = '/fwaas/firewall_groups'
resource_rule_base_path = '/%s' % rule_path
resource_policy_base_path = '/%s' % policy_path
resource_group_base_path = '/%s' % group_path
resource_rule_object_path = '/%s/%%s' % rule_path
resource_policy_object_path = '/%s/%%s' % policy_path
resource_group_object_path = '/%s/%%s' % group_path
def create_firewall_v2_rule(self, **kwargs):
uri = self.resource_rule_base_path
post_data = {self.resource_rule: kwargs}
return self.create_resource(uri, post_data)
def update_firewall_v2_rule(self, fw_rule_id, **kwargs):
uri = self.resource_rule_object_path % fw_rule_id
post_data = {self.resource_rule: kwargs}
return self.update_resource(uri, post_data)
def show_firewall_v2_rule(self, firewall_rule_id):
uri = self.resource_rule_object_path % firewall_rule_id
return self.show_resource(uri)
def delete_firewall_v2_rule(self, firewall_rule_id):
uri = self.resource_rule_object_path % firewall_rule_id
return self.delete_resource(uri)
def create_firewall_v2_policy(self, **kwargs):
uri = self.resource_policy_base_path
post_data = {self.resource_policy: kwargs}
return self.create_resource(uri, post_data)
def update_firewall_v2_policy(self, fw_policy_id, **kwargs):
uri = self.resource_policy_object_path % fw_policy_id
post_data = {self.resource_policy: kwargs}
return self.update_resource(uri, post_data)
def show_firewall_v2_policy(self, firewall_policy_id):
uri = self.resource_policy_object_path % firewall_policy_id
return self.show_resource(uri)
def delete_firewall_v2_policy(self, policy_id):
uri = self.resource_policy_object_path % policy_id
return self.delete_resource(uri)
def create_firewall_v2_group(self, **kwargs):
uri = self.resource_group_base_path
post_data = {self.resource_group: kwargs}
return self.create_resource(uri, post_data)
def update_firewall_v2_group(self, fw_group_id, **kwargs):
uri = self.resource_group_object_path % fw_group_id
post_data = {self.resource_group: kwargs}
return self.update_resource(uri, post_data)
def show_firewall_v2_group(self, fw_group_id):
uri = self.resource_group_object_path % fw_group_id
return self.show_resource(uri)
def delete_firewall_v2_group(self, group_id):
uri = self.resource_group_object_path % group_id
return self.delete_resource(uri)

View File

@ -0,0 +1,304 @@
# Copyright 2018 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import test
from vmware_nsx_tempest.lib import feature_manager
from vmware_nsx_tempest.services import nsx_client
CONF = config.CONF
CONF.validation.auth_method = 'None'
LOG = logging.getLogger(__name__)
class TestFwaasV2Ops(feature_manager.FeatureManager):
@classmethod
def skip_checks(cls):
super(TestFwaasV2Ops, cls).skip_checks()
if not test.is_extension_enabled('fwaasv2', 'network'):
msg = "Extension provider-security-group is not enabled."
raise cls.skipException(msg)
@classmethod
def setup_credentials(cls):
cls.set_network_resources()
cls.admin_mgr = cls.get_client_manager('admin')
super(TestFwaasV2Ops, cls).setup_credentials()
@classmethod
def setup_clients(cls):
"""
Create various client connections. Such as NSX.
"""
super(TestFwaasV2Ops, cls).setup_clients()
cls.nsx_client = nsx_client.NSXClient(
CONF.network.backend,
CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
def create_fw_basic_topo(self, protocol_name=None):
if protocol_name is None:
protocol_name = 'icmp'
rule_name = data_utils.rand_name('fw-rule-')
# Create firewall rule
fw_rules = self.create_firewall_rule(name=rule_name,
protocol=protocol_name)
rules = []
show_rules = self.show_firewall_rule(fw_rules['firewall_rule']['id'])
# Check firewall rule
self.assertEqual(show_rules.get('firewall_rule')['name'], rule_name)
self.assertEqual(show_rules.get('firewall_rule')['protocol'],
protocol_name)
# Update firewall rule
self.update_firewall_rule(
fw_rules['firewall_rule']['id'],
protocol='tcp')
self.update_firewall_rule(
fw_rules['firewall_rule']['id'],
name='new-rule')
show_rules = self.show_firewall_rule(fw_rules['firewall_rule']['id'])
# Check firewall rule after updated value
self.assertEqual(show_rules.get('firewall_rule')['name'], 'new-rule')
self.assertEqual(show_rules.get('firewall_rule')['protocol'],
'tcp')
rules.append(fw_rules['firewall_rule']['id'])
policy_name = data_utils.rand_name('fw-policy-')
# Create firewall policy
fw_policy = self.create_firewall_policy(name=policy_name,
firewall_rules=rules)
show_policy = self.show_firewall_policy(
fw_policy['firewall_policy']['id'])
# Check firewall policy
self.assertEqual(
show_policy.get('firewall_policy')['name'],
policy_name)
self.assertEqual(show_policy.get('firewall_policy')
['firewall_rules'], rules)
# Update firewall policy
self.update_firewall_policy(fw_policy['firewall_policy']['id'],
name='new-policy')
show_policy = self.show_firewall_policy(
fw_policy['firewall_policy']['id'])
# Check firewall policy
self.assertEqual(
show_policy.get('firewall_policy')['name'],
'new-policy')
policy_id = fw_policy['firewall_policy']['id']
group_name = data_utils.rand_name('fw-group-')
# Create firewall group
fw_group = self.create_firewall_group(
name=group_name,
ingress_firewall_policy_id=policy_id,
egress_firewall_policy_id=policy_id)
show_group = self.show_firewall_group(fw_group["firewall_group"]["id"])
# Check firewall group values
self.assertEqual(show_group.get('firewall_group')['name'], group_name)
self.assertEqual(show_group.get('firewall_group')[
'ingress_firewall_policy_id'], policy_id)
self.assertEqual(show_group.get('firewall_group')[
'egress_firewall_policy_id'], policy_id)
fw_topo = dict(fw_rules=fw_rules, fw_policy=fw_policy,
fw_group=fw_group)
return fw_topo
def create_fw_group_port_topo(
self,
group_delete=True,
project_id=None,
ports=None,
protocol_name=None):
if protocol_name is None:
protocol_name = 'icmp'
rule_name = data_utils.rand_name('fw-rule-')
# Create firewall rule
fw_rules = self.create_firewall_rule(
name=rule_name, protocol=protocol_name, project_id=project_id)
rules = []
show_rules = self.show_firewall_rule(fw_rules['firewall_rule']['id'])
# Check firewall rule
self.assertEqual(show_rules.get('firewall_rule')['name'], rule_name)
self.assertEqual(show_rules.get('firewall_rule')['protocol'],
protocol_name)
rules.append(fw_rules['firewall_rule']['id'])
policy_name = data_utils.rand_name('fw-policy-')
# Create firewall policy
fw_policy = self.create_firewall_policy(name=policy_name,
firewall_rules=rules,
project_id=project_id)
show_policy = self.show_firewall_policy(
fw_policy['firewall_policy']['id'])
# Check firewall policy
self.assertEqual(
show_policy.get('firewall_policy')['name'],
policy_name)
self.assertEqual(show_policy.get('firewall_policy')
['firewall_rules'], rules)
policy_id = fw_policy['firewall_policy']['id']
group_name = data_utils.rand_name('fw-group-')
# Create firewall group
fw_group = self.create_firewall_group(
name=group_name,
ingress_firewall_policy_id=policy_id,
egress_firewall_policy_id=policy_id,
ports=ports,
project_id=project_id)
self._wait_firewall_ready(fw_group["firewall_group"]["id"])
show_group = self.show_firewall_group(fw_group["firewall_group"]["id"])
self.assertEqual(show_group.get('firewall_group')['ports'], ports)
if group_delete is True:
# Update firewall group
self.update_firewall_group(fw_group["firewall_group"]["id"],
ports=[])
# Check updated values of firewall group
self.assertEqual(
show_group.get('firewall_group')['name'],
group_name)
self.assertEqual(show_group.get('firewall_group')[
'ingress_firewall_policy_id'], policy_id)
self.assertEqual(show_group.get('firewall_group')[
'egress_firewall_policy_id'], policy_id)
# Delete firewall group
self.fwaas_v2_client.delete_firewall_v2_group(
fw_group["firewall_group"]["id"])
else:
fw_topo = dict(fw_rules=fw_rules, fw_policy=fw_policy,
fw_group=fw_group)
return fw_topo
def create_fw_with_port_topology(self, group_delete, protocol_name):
# Create network topo
network = \
self.create_topology_network(network_name="fw-network")
router_name = 'fw-router'
# Create router topo
router = self.create_topology_router(router_name)
subnet_name = 'fw-subnet'
# Create subnet topo
self.create_topology_subnet(subnet_name, network,
router_id=router['id'])
p_client = self.ports_client
ports = []
ports.append(self.get_router_port(p_client))
if not group_delete:
fw_topo = self.create_fw_group_port_topo(
group_delete, network['project_id'], ports, protocol_name)
return fw_topo
else:
self.create_fw_group_port_topo(
group_delete, network['project_id'], ports, protocol_name)
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('431288d7-9213-4b1e-a11d-15840c8e2f12')
def test_fwaas_basic_icmp(self):
"""
Test fwaasv2 api to create icmp rule/policy/group and update it and
verifying its values
"""
self.create_fw_basic_topo('icmp')
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('76a188d7-9812-5b1e-a11d-65840c9e2fd6')
def test_fwaas_basic_tcp(self):
"""
Test fwaasv2 api to create tcp rule/policy/group and update it and
verifying its values
"""
self.create_fw_basic_topo('tcp')
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('a5b188d7-0183-4b1e-9111-15840c8e2fd6')
def test_fwaas_basic_udp(self):
"""
Test fwaasv2 api to create udp rule/policy/group and update it and
verifying its values
"""
self.create_fw_basic_topo('udp')
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('95b188d7-0183-4b1e-a11d-15840c8e2345')
def test_fwaas_router_port_icmp(self):
"""
Test fwaasv2 api to create icmp rule/policy/group with router port and
update it and verifying its values
"""
self.create_fw_with_port_topology('icmp')
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('25b188d7-0183-4b1e-5123-15840c8e2fd6')
def test_fwaas_router_port_tcp(self):
"""
Test fwaasv2 api to create tcp rule/policy/group with router port and
update it and verifying its values
"""
self.create_fw_with_port_topology('tcp')
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('501288d7-0183-4b1e-a11d-15840c8e2fd6')
def test_fwaas_router_port_udp(self):
"""
Test fwaasv2 api to create udp rule/policy/group with router port and
update it and verifying its values
"""
self.create_fw_with_port_topology('udp')
@decorators.attr(type='nsxv3')
@decorators.attr(type=["negative"])
@decorators.idempotent_id('434588d7-0183-4b12-a11d-15840c8e2fd6')
def test_delete_fw_group_when_port_in_use(self):
"""
Try to delete firewall group when its in use
"""
fw_topo = self.create_fw_with_port_topology(
group_delete=False, protocol_name='icmp')
self.assertRaises(exceptions.Conflict,
self.fwaas_v2_client.delete_firewall_v2_group,
fw_topo["fw_group"]["firewall_group"]["id"])
self.update_firewall_group(fw_topo["fw_group"]["firewall_group"]["id"],
ports=[])
self.fwaas_v2_client.delete_firewall_v2_group(
fw_topo["fw_group"]["firewall_group"]["id"])
@decorators.attr(type='nsxv3')
@decorators.attr(type=["negative"])
@decorators.idempotent_id('201228d7-0183-4b1e-a11d-35821c8e2fd6')
def test_delete_fw_rule_when_in_use(self):
"""
Try to delete firewall rule when its in use
"""
fw_topo = self.create_fw_basic_topo('icmp')
self.assertRaises(exceptions.Conflict,
self.fwaas_v2_client.delete_firewall_v2_rule,
fw_topo["fw_rules"]["firewall_rule"]["id"])
@decorators.attr(type='nsxv3')
@decorators.attr(type=["negative"])
@decorators.idempotent_id('901488d7-1184-4b1e-511d-15878c8e2fd6')
def test_delete_fw_policy_when_in_use(self):
"""
Try to delete firewall policy when its in use
"""
fw_topo = self.create_fw_basic_topo('icmp')
self.assertRaises(exceptions.Conflict,
self.fwaas_v2_client.delete_firewall_v2_policy,
fw_topo["fw_policy"]["firewall_policy"]["id"])