diff --git a/vmware_nsx_tempest/tests/nsxv3/scenario/test_nsx_port_security.py b/vmware_nsx_tempest/tests/nsxv3/scenario/test_nsx_port_security.py index 35db2ff..d3c5563 100644 --- a/vmware_nsx_tempest/tests/nsxv3/scenario/test_nsx_port_security.py +++ b/vmware_nsx_tempest/tests/nsxv3/scenario/test_nsx_port_security.py @@ -65,6 +65,7 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest): self.cmgr_adm = self.get_client_manager('admin') self.keypairs = {} self.servers = [] + self.rules = [] self.config_drive = CONF.compute_feature_enabled.config_drive def _delete_router(self, router): @@ -168,11 +169,6 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest): port_range_max=22, remote_ip_prefix=CONF.network.public_network_cidr ), - dict( - direction='ingress', - protocol='icmp', - remote_ip_prefix=CONF.network.public_network_cidr - ), dict( direction='ingress', protocol='icmp', @@ -180,8 +176,11 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest): ) ] for ruleset in rulesets: - self._create_security_group_rule(secgroup=self.security_group, - tenant_id=tenant_id, **ruleset) + self.rules.append( + self._create_security_group_rule( + secgroup=self.security_group, + tenant_id=tenant_id, + **ruleset)) def create_network_topo(self): self.network = self._create_network() @@ -230,18 +229,18 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest): def _check_server_connectivity(self, floating_ip, remote_ip, private_key, should_connect=True): - ssh_source = self.get_remote_client(floating_ip, - private_key=private_key) - msg = "ip address %s is reachable" % remote_ip - try: - self.assertTrue(self._check_remote_connectivity - (ssh_source, remote_ip, should_connect), - msg) - except Exception: - LOG.exception("Unable to access {dest} via ssh to " - "floating-ip {src}".format(dest=remote_ip, - src=floating_ip)) - raise + ssh_source = self.get_remote_client(floating_ip, + private_key=private_key) + msg = "ip address %s is reachable" % remote_ip + try: + self.assertTrue(self._check_remote_connectivity + (ssh_source, remote_ip, should_connect), + msg) + except Exception: + LOG.exception("Unable to access {dest} via ssh to " + "floating-ip {src}".format(dest=remote_ip, + src=floating_ip)) + raise def _test_create_server_with_port_security_and_check_backend(self, network_topo): @@ -332,6 +331,10 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest): self._get_server_key(server_default_2) port_client.update_port(port_id_server_2, **body) time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) + for rule in self.rules: + if rule.get('remote_group_id'): + self.compute_security_group_rules_client.\ + delete_security_group_rule(rule['id']) self._check_server_connectivity(public_ip_address_server_2, private_ip_address_server_1, private_key_server_2, @@ -340,6 +343,10 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest): "security_groups": [sec_group]} port_client.update_port(port_id_server_2, **body) time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) + ruleset = dict(direction='ingress', + remote_group_id=self.security_group['id']) + self._create_security_group_rule( + secgroup=self.security_group, **ruleset) self._check_server_connectivity(public_ip_address_server_2, private_ip_address_server_1, private_key_server_2) @@ -454,6 +461,10 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest): sec_group = sec_grp_port['port']['security_groups'][0] port_client.update_port(port_id, **kwargs) time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) + for rule in self.rules: + if rule.get('remote_group_id'): + self.compute_security_group_rules_client.\ + delete_security_group_rule(rule['id']) self._check_server_connectivity(public_address_server_2, private_address_server_1, private_key_server_2, @@ -462,6 +473,11 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest): "security_groups": [sec_group]} port_client.update_port(port_id, **kwargs) time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) + ruleset = dict( + direction='ingress', + remote_group_id=self.security_group['id']) + self._create_security_group_rule( + secgroup=self.security_group, **ruleset) self._check_server_connectivity(public_address_server_2, private_address_server_1, private_key_server_2)