From 29e8847e4069db113527ba23ade2304b5eade6ac Mon Sep 17 00:00:00 2001 From: Deepthi Kandavara Jayarama Date: Wed, 29 May 2019 18:24:53 +0000 Subject: [PATCH] [IPv6] Adding fwaas tests Change-Id: Ic62cc852a2b0d23fb9228526b4d85300377ce7f8 --- .../tests/nsxv3/api/test_ipv6_v2_fwaas.py | 300 ++++++++++++++++++ 1 file changed, 300 insertions(+) create mode 100644 vmware_nsx_tempest_plugin/tests/nsxv3/api/test_ipv6_v2_fwaas.py diff --git a/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_ipv6_v2_fwaas.py b/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_ipv6_v2_fwaas.py new file mode 100644 index 0000000..83bc6bc --- /dev/null +++ b/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_ipv6_v2_fwaas.py @@ -0,0 +1,300 @@ +# Copyright 2018 VMware Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import time + +from oslo_log import log as logging + +from tempest import config +from tempest.lib.common.utils import data_utils +from tempest.lib.common.utils import test_utils +from tempest.lib import decorators +from tempest.lib import exceptions +from tempest import test + +from vmware_nsx_tempest_plugin.common import constants +from vmware_nsx_tempest_plugin.lib import feature_manager +from vmware_nsx_tempest_plugin.services import nsx_client + + +CONF = config.CONF +CONF.validation.auth_method = 'None' + +LOG = logging.getLogger(__name__) + + +class TestFwaasV2Ops(feature_manager.FeatureManager): + + @classmethod + def skip_checks(cls): + super(TestFwaasV2Ops, cls).skip_checks() + if not test.is_extension_enabled('fwaas_v2', 'network'): + msg = "Extension fwaas_v2 is not enabled." + raise cls.skipException(msg) + + @classmethod + def setup_credentials(cls): + cls.set_network_resources() + cls.admin_mgr = cls.get_client_manager('admin') + super(TestFwaasV2Ops, cls).setup_credentials() + + @classmethod + def setup_clients(cls): + """ + Create various client connections. Such as NSX. + """ + super(TestFwaasV2Ops, cls).setup_clients() + cls.nsx_client = nsx_client.NSXClient( + CONF.network.backend, + CONF.nsxv3.nsx_manager, + CONF.nsxv3.nsx_user, + CONF.nsxv3.nsx_password) + + def create_fw_basic_topo(self, protocol_name=None, source_ip=None): + if protocol_name is None: + protocol_name = 'icmp' + rule_name = data_utils.rand_name('fw-rule-') + # Create firewall rule + fw_rules = self.create_firewall_rule(name=rule_name, + protocol=protocol_name, + source_ip_address=source_ip) + rules = [] + show_rules = self.show_firewall_rule(fw_rules['firewall_rule']['id']) + # Check firewall rule + self.assertEqual(show_rules.get('firewall_rule')['name'], rule_name) + self.assertEqual(show_rules.get('firewall_rule')['protocol'], + protocol_name) + # Update firewall rule + self.update_firewall_rule( + fw_rules['firewall_rule']['id'], + protocol='tcp') + self.update_firewall_rule( + fw_rules['firewall_rule']['id'], + name='new-rule') + show_rules = self.show_firewall_rule(fw_rules['firewall_rule']['id']) + # Check firewall rule after updated value + self.assertEqual(show_rules.get('firewall_rule')['name'], 'new-rule') + self.assertEqual(show_rules.get('firewall_rule')['protocol'], + 'tcp') + rules.append(fw_rules['firewall_rule']['id']) + policy_name = data_utils.rand_name('fw-policy-') + # Create firewall policy + fw_policy = self.create_firewall_policy(name=policy_name, + firewall_rules=rules) + show_policy = self.show_firewall_policy( + fw_policy['firewall_policy']['id']) + # Check firewall policy + self.assertEqual( + show_policy.get('firewall_policy')['name'], + policy_name) + self.assertEqual(show_policy.get('firewall_policy') + ['firewall_rules'], rules) + # Update firewall policy + self.update_firewall_policy(fw_policy['firewall_policy']['id'], + name='new-policy') + show_policy = self.show_firewall_policy( + fw_policy['firewall_policy']['id']) + # Check firewall policy + self.assertEqual( + show_policy.get('firewall_policy')['name'], + 'new-policy') + policy_id = fw_policy['firewall_policy']['id'] + group_name = data_utils.rand_name('fw-group-') + # Create firewall group + fw_group = self.create_firewall_group( + name=group_name, + ingress_firewall_policy_id=policy_id, + egress_firewall_policy_id=policy_id) + show_group = self.show_firewall_group(fw_group["firewall_group"]["id"]) + # Check firewall group values + self.assertEqual(show_group.get('firewall_group')['name'], group_name) + self.assertEqual(show_group.get('firewall_group')[ + 'ingress_firewall_policy_id'], policy_id) + self.assertEqual(show_group.get('firewall_group')[ + 'egress_firewall_policy_id'], policy_id) + fw_topo = dict(fw_rules=fw_rules, fw_policy=fw_policy, + fw_group=fw_group) + return fw_topo + + def create_fw_group_port_topo( + self, + group_delete=True, + project_id=None, + ports=None, + protocol_name='icmp', + source_ip=None, + ip_version=6): + rule_name = data_utils.rand_name('fw-rule-') + # Create firewall rule + fw_rules = self.create_firewall_rule( + name=rule_name, protocol=protocol_name, project_id=project_id, + source_ip_address=source_ip, ip_version=ip_version) + self.addCleanup(test_utils.call_and_ignore_notfound_exc, + self.fwaas_v2_client.delete_firewall_v2_rule, + fw_rules['firewall_rule']['id']) + rules = [] + show_rules = self.show_firewall_rule(fw_rules['firewall_rule']['id']) + # Check firewall rule + self.assertEqual(show_rules.get('firewall_rule')['name'], rule_name) + self.assertEqual(show_rules.get('firewall_rule')['protocol'], + protocol_name) + rules.append(fw_rules['firewall_rule']['id']) + policy_name = data_utils.rand_name('fw-policy-') + # Create firewall policy + fw_policy = self.create_firewall_policy(name=policy_name, + firewall_rules=rules, + project_id=project_id) + self.addCleanup(test_utils.call_and_ignore_notfound_exc, + self.fwaas_v2_client.delete_firewall_v2_policy, + fw_policy['firewall_policy']['id']) + + show_policy = self.show_firewall_policy( + fw_policy['firewall_policy']['id']) + # Check firewall policy + self.assertEqual( + show_policy.get('firewall_policy')['name'], + policy_name) + self.assertEqual(show_policy.get('firewall_policy') + ['firewall_rules'], rules) + policy_id = fw_policy['firewall_policy']['id'] + group_name = data_utils.rand_name('fw-group-') + # Create firewall group + fw_group = self.create_firewall_group( + name=group_name, + ingress_firewall_policy_id=policy_id, + egress_firewall_policy_id=policy_id, + ports=ports, + project_id=project_id) + self.addCleanup(test_utils.call_and_ignore_notfound_exc, + self.fwaas_v2_client.delete_firewall_v2_group, + fw_group["firewall_group"]["id"]) + self._wait_firewall_ready(fw_group["firewall_group"]["id"]) + show_group = self.show_firewall_group(fw_group["firewall_group"]["id"]) + self.assertEqual(show_group.get('firewall_group')['ports'], ports) + if group_delete is True: + # Update firewall group + self.update_firewall_group(fw_group["firewall_group"]["id"], + ports=[]) + # Check updated values of firewall group + self.assertEqual( + show_group.get('firewall_group')['name'], + group_name) + self.assertEqual(show_group.get('firewall_group')[ + 'ingress_firewall_policy_id'], policy_id) + self.assertEqual(show_group.get('firewall_group')[ + 'egress_firewall_policy_id'], policy_id) + fw_topo = dict(fw_rules=fw_rules, fw_policy=fw_policy, + fw_group=fw_group) + # Delete firewall group + f_id = fw_group["firewall_group"]["id"] + time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) + self.fwaas_v2_client.update_firewall_v2_group(f_id, + ports=[]) + self.fwaas_v2_client.delete_firewall_v2_group( + fw_group["firewall_group"]["id"]) + else: + fw_topo = dict(fw_rules=fw_rules, fw_policy=fw_policy, + fw_group=fw_group) + return fw_topo + + def create_fw_with_port_topology(self, protocol_name, + group_delete=True, + source_ip=None, + create_instance=False): + # Create network topo + network = \ + self.create_topology_network(network_name="fw-network") + router_name = 'fw-router' + # Create router topo + router = self.create_topology_router(router_name) + subnet_name = 'fw_v6_subnet' + self.create_topology_subnet( + subnet_name, + network, + ip_version=6, ipv6_ra_mode='slaac', + ipv6_address_mode='slaac', + cidr="2023::/64", + router_id=router['id']) + p_client = self.ports_client + ports = [] + ports.append(self.get_router_port(p_client)) + if not group_delete: + fw_topo = self.create_fw_group_port_topo( + group_delete, network['project_id'], ports, protocol_name) + return fw_topo + else: + self.create_fw_group_port_topo( + group_delete, network['project_id'], ports, protocol_name) + + @decorators.attr(type='nsxv3') + @decorators.idempotent_id('ed5b2a04-11cd-440e-acfa-39f579825fe1') + def test_fwaas_router_port_icmp(self): + """ + Test fwaasv2 api to create icmp rule/policy/group with router port and + update it and verifying its values + """ + self.create_fw_with_port_topology('icmp') + + @decorators.attr(type='nsxv3') + @decorators.idempotent_id('25b188d7-0183-4b1e-5123-15840c8e2fd6') + def test_fwaas_router_port_tcp(self): + """ + Test fwaasv2 api to create tcp rule/policy/group with router port and + update it and verifying its values + """ + self.create_fw_with_port_topology('tcp') + + @decorators.attr(type='nsxv3') + @decorators.idempotent_id('501288d7-0183-4b1e-a11d-15840c8e2fd6') + def test_fwaas_router_port_udp(self): + """ + Test fwaasv2 api to create udp rule/policy/group with router port and + update it and verifying its values + """ + self.create_fw_with_port_topology('udp') + + @decorators.attr(type='nsxv3') + @decorators.attr(type=["negative"]) + @decorators.idempotent_id('434588d7-0183-4b12-a11d-15840c8e2fd6') + def test_delete_fw_group_when_port_in_use(self): + """ + Try to delete firewall group when its in use + """ + fw_topo = self.create_fw_with_port_topology( + group_delete=False, protocol_name='icmp') + self.assertRaises(exceptions.Conflict, + self.fwaas_v2_client.delete_firewall_v2_group, + fw_topo["fw_group"]["firewall_group"]["id"]) + self.update_firewall_group(fw_topo["fw_group"]["firewall_group"]["id"], + ports=[]) + self.fwaas_v2_client.delete_firewall_v2_group( + fw_topo["fw_group"]["firewall_group"]["id"]) + + @decorators.attr(type='nsxv3') + @decorators.attr(type=["negative"]) + @decorators.idempotent_id('901488d7-1184-4b1e-511d-15878c8e2fd6') + def test_verify_firewall_group_source_ip_invalid(self): + """ + Verify Firewall group with invalid ip should be ACTIVE + """ + fw_topo = self.create_fw_with_port_topology('icmp', + source_ip="::/0", + group_delete=False) + show_group = self.show_firewall_group( + fw_topo["fw_group"]['firewall_group']['id']) + self.assertEqual("ACTIVE", show_group.get('firewall_group')['status']) + self.update_firewall_group(fw_topo["fw_group"]['firewall_group']["id"], + ports=[]) + self.fwaas_v2_client.delete_firewall_v2_group( + fw_topo["fw_group"]["firewall_group"]["id"])