[Tempest]: Removing of extra security groups added to port

Earlier ICMP rule were added to ports due to this
even after applying port security to instane port vms were able to
communicate with each other.

Change-Id: Iaea4fd3e0533f14b157b95a5cf17eefed771281d
This commit is contained in:
Puneet Arora 2018-03-06 19:26:29 +00:00
parent 1cbe4c9259
commit d08e0918cc

View File

@ -65,6 +65,7 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
self.cmgr_adm = self.get_client_manager('admin')
self.keypairs = {}
self.servers = []
self.rules = []
self.config_drive = CONF.compute_feature_enabled.config_drive
def _delete_router(self, router):
@ -168,11 +169,6 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
port_range_max=22,
remote_ip_prefix=CONF.network.public_network_cidr
),
dict(
direction='ingress',
protocol='icmp',
remote_ip_prefix=CONF.network.public_network_cidr
),
dict(
direction='ingress',
protocol='icmp',
@ -180,8 +176,11 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
)
]
for ruleset in rulesets:
self._create_security_group_rule(secgroup=self.security_group,
tenant_id=tenant_id, **ruleset)
self.rules.append(
self._create_security_group_rule(
secgroup=self.security_group,
tenant_id=tenant_id,
**ruleset))
def create_network_topo(self):
self.network = self._create_network()
@ -230,18 +229,18 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
def _check_server_connectivity(self, floating_ip,
remote_ip, private_key,
should_connect=True):
ssh_source = self.get_remote_client(floating_ip,
private_key=private_key)
msg = "ip address %s is reachable" % remote_ip
try:
self.assertTrue(self._check_remote_connectivity
(ssh_source, remote_ip, should_connect),
msg)
except Exception:
LOG.exception("Unable to access {dest} via ssh to "
"floating-ip {src}".format(dest=remote_ip,
src=floating_ip))
raise
ssh_source = self.get_remote_client(floating_ip,
private_key=private_key)
msg = "ip address %s is reachable" % remote_ip
try:
self.assertTrue(self._check_remote_connectivity
(ssh_source, remote_ip, should_connect),
msg)
except Exception:
LOG.exception("Unable to access {dest} via ssh to "
"floating-ip {src}".format(dest=remote_ip,
src=floating_ip))
raise
def _test_create_server_with_port_security_and_check_backend(self,
network_topo):
@ -332,6 +331,10 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
self._get_server_key(server_default_2)
port_client.update_port(port_id_server_2, **body)
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
for rule in self.rules:
if rule.get('remote_group_id'):
self.compute_security_group_rules_client.\
delete_security_group_rule(rule['id'])
self._check_server_connectivity(public_ip_address_server_2,
private_ip_address_server_1,
private_key_server_2,
@ -340,6 +343,10 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
"security_groups": [sec_group]}
port_client.update_port(port_id_server_2, **body)
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
ruleset = dict(direction='ingress',
remote_group_id=self.security_group['id'])
self._create_security_group_rule(
secgroup=self.security_group, **ruleset)
self._check_server_connectivity(public_ip_address_server_2,
private_ip_address_server_1,
private_key_server_2)
@ -454,6 +461,10 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
sec_group = sec_grp_port['port']['security_groups'][0]
port_client.update_port(port_id, **kwargs)
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
for rule in self.rules:
if rule.get('remote_group_id'):
self.compute_security_group_rules_client.\
delete_security_group_rule(rule['id'])
self._check_server_connectivity(public_address_server_2,
private_address_server_1,
private_key_server_2,
@ -462,6 +473,11 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
"security_groups": [sec_group]}
port_client.update_port(port_id, **kwargs)
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
ruleset = dict(
direction='ingress',
remote_group_id=self.security_group['id'])
self._create_security_group_rule(
secgroup=self.security_group, **ruleset)
self._check_server_connectivity(public_address_server_2,
private_address_server_1,
private_key_server_2)