diff --git a/whitebox_neutron_tempest_plugin/tests/scenario/base.py b/whitebox_neutron_tempest_plugin/tests/scenario/base.py index c1ef754..1da6371 100644 --- a/whitebox_neutron_tempest_plugin/tests/scenario/base.py +++ b/whitebox_neutron_tempest_plugin/tests/scenario/base.py @@ -49,6 +49,10 @@ class BaseTempestWhiteboxTestCase(base.BaseTempestTestCase): LOG.debug("Output: {}".format(output)) return output.strip() + def get_host_for_server(self, server_id): + server_details = self.os_admin.servers_client.show_server(server_id) + return server_details['server']['OS-EXT-SRV-ATTR:host'] + class BaseTempestTestCaseOvn(BaseTempestWhiteboxTestCase): diff --git a/whitebox_neutron_tempest_plugin/tests/scenario/test_portsecurity.py b/whitebox_neutron_tempest_plugin/tests/scenario/test_portsecurity.py new file mode 100644 index 0000000..d7abe0e --- /dev/null +++ b/whitebox_neutron_tempest_plugin/tests/scenario/test_portsecurity.py @@ -0,0 +1,165 @@ +# Copyright 2023 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron_tempest_plugin.common import ssh +from neutron_tempest_plugin import config +from tempest.lib import decorators + +from whitebox_neutron_tempest_plugin.tests.scenario import base + +CONF = config.CONF + + +class PortSecurityTest(base.BaseTempestWhiteboxTestCase): + credentials = ['admin', 'primary'] + required_extensions = ['port-security'] + + def _create_server(self, ports, scheduler_hints=None): + params = { + 'flavor_ref': CONF.compute.flavor_ref, + 'image_ref': CONF.compute.image_ref, + 'key_name': self.keypair['name'], + 'networks': [{'port': port['id']} for port in ports]} + if scheduler_hints: + params['scheduler_hints'] = scheduler_hints + server = self.create_server(**params) + fip = self.create_floatingip(port=ports[0]) + client = ssh.Client(fip['floating_ip_address'], + CONF.validation.image_ssh_user, + pkey=self.keypair['private_key']) + return {'server': server, 'client': client, 'fip': fip} + + @decorators.idempotent_id('f9523922-9e71-4fa1-9cca-303bedf230bc') + def test_port_security_disabled(self): + """Verify that traffic is not blocked when port security is disabled + + Scenario: + 1. Create keypair and router. + 2. Create 2 internal networks connected to the router, + one is for management and the other one is for testing. + 3. Create 2 ports in each network, 2 ports for 2 servers. + Ports in the test_network will have specific static ip + addresses. + 4. Create 2 servers with 2 ports each. + 5. Make sure that interfaces connected to the test network + are up and running with corresponding ip address. + 6. Test that one server (src_server) can reach the other + (dst_server) by the ip address configured on the interface + connected to the test network. + 7. On dst_server change MAC and ip address of the interface + connected to the test network. + 8. Verify that src_server is still able to access dst_server + through the port connected to the test_network even when + MAC and ip address do not match the ones configured for + the port. + + """ + self.keypair = self.create_keypair() + router = self.create_router_by_client() + + mgmt_network = self.create_network(port_security_enabled=False) + test_network = self.create_network(port_security_enabled=False) + mgmt_subnet = self.create_subnet( + network=mgmt_network, cidr="192.168.0.0/24") + self.create_router_interface(router['id'], mgmt_subnet['id']) + # test_subnet should have no gateway, otherwise, VM routes with two + # ports will be messed up + test_subnet = self.create_subnet( + network=test_network, cidr="192.168.1.0/24", gateway=None) + + ips = {'src_server': '192.168.1.11', + 'dst_server': '192.168.1.12', + 'dst_server_alt': '192.168.1.13'} + + src_server_mgmt_port = self.create_port(mgmt_network) + dst_server_mgmt_port = self.create_port(mgmt_network) + src_server_test_port = self.create_port( + test_network, + fixed_ips=[{"subnet_id": test_subnet['id'], + "ip_address": ips['src_server']}]) + dst_server_test_port = self.create_port( + test_network, + fixed_ips=[{"subnet_id": test_subnet['id'], + "ip_address": ips['dst_server']}]) + src_server = self._create_server( + ports=[src_server_mgmt_port, src_server_test_port]) + dst_server = self._create_server( + ports=[dst_server_mgmt_port, dst_server_test_port]) + servers = {'src_server': src_server, 'dst_server': dst_server} + + # As both servers use same image we can get interface from any of them + shell_path = "PATH=$PATH:/sbin" + test_interface = dst_server['client'].exec_command( + "{};ip addr | grep {} -B 1 | head -1 | " + r"cut -d ':' -f 2 | sed 's/\ //g'".format( + shell_path, dst_server_test_port['mac_address'])).rstrip() + + # In case second interface is down we need to switch it on + # and configure ip address + for server in servers.keys(): + if CONF.neutron_plugin_options.default_image_is_advanced: + cmd = ("ip addr show {interface} | grep {ip} || " + "sudo dhclient {interface}").format( + ip=ips[server], interface=test_interface) + else: + cmd = ("cat /sys/class/net/{interface}/operstate | " + "grep -q -v down && true || " + "({path}; sudo ip link set {interface} up && " + "sudo ip addr add {ip}/24 dev {interface})").format( + path=shell_path, ip=ips[server], + interface=test_interface) + servers[server]['client'].exec_command(cmd) + + self.check_remote_connectivity(src_server['client'], ips['dst_server']) + + # Now configure another MAC and IP address on the dst_server + # test interface and check connectivity again (in both directions) + dst_server['client'].exec_command( + "{path}; sudo ip addr del {ip}/24 dev {interface} && " + "sudo ip addr add {ip_alt}/24 dev {interface} && " + "sudo ip link set {interface} address 02:02:02:03:03:03".format( + path=shell_path, interface=test_interface, + ip=ips['dst_server'], ip_alt=ips['dst_server_alt'])) + + self.check_remote_connectivity(src_server['client'], + ips['dst_server_alt']) + self.check_remote_connectivity(dst_server['client'], + ips['src_server']) + + @decorators.idempotent_id('b99a014c-45e4-438d-8827-1063b44d611c') + def test_two_vms_same_compute_when_no_port_security(self): + self.keypair = self.create_keypair() + router = self.create_router_by_client() + + test_network = self.create_network(port_security_enabled=False) + test_subnet = self.create_subnet(network=test_network) + self.create_router_interface(router['id'], test_subnet['id']) + + vm1_port = self.create_port(test_network) + vm2_port = self.create_port(test_network) + vm1 = self._create_server(ports=[vm1_port]) + vm2 = self._create_server( + ports=[vm2_port], + scheduler_hints={'same_host': vm1['server']['server']['id']}) + + if (self.get_host_for_server(vm1['server']['server']['id']) != + self.get_host_for_server(vm2['server']['server']['id'])): + raise self.skipException("VMs are running on different hosts") + + for vm in (vm1, vm2): + self.check_connectivity( + host=vm['fip']['floating_ip_address'], + ssh_user=vm['client'].username, + ssh_key=vm['client'].pkey)