166 lines
7.5 KiB
Python
166 lines
7.5 KiB
Python
# Copyright 2023 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
|
|
|
|
from neutron_tempest_plugin.common import ssh
|
|
from neutron_tempest_plugin import config
|
|
from tempest.lib import decorators
|
|
|
|
from whitebox_neutron_tempest_plugin.tests.scenario import base
|
|
|
|
CONF = config.CONF
|
|
|
|
|
|
class PortSecurityTest(base.BaseTempestWhiteboxTestCase):
    """Scenario tests for servers whose ports have port security disabled."""

    credentials = ['admin', 'primary']
    required_extensions = ['port-security']

    def _create_server(self, ports, scheduler_hints=None):
        """Boot a server attached to ``ports`` and prepare SSH access to it.

        A floating IP is created for the first port in ``ports`` and an SSH
        client pointing at that floating IP is built from ``self.keypair``,
        which the calling test must have set beforehand.

        :param ports: port dicts to plug into the server, in order.
        :param scheduler_hints: optional scheduler hints; only added to the
            create request when truthy.
        :returns: dict with the keys ``server``, ``client`` and ``fip``.
        """
        request = dict(
            flavor_ref=CONF.compute.flavor_ref,
            image_ref=CONF.compute.image_ref,
            key_name=self.keypair['name'],
            networks=[{'port': port['id']} for port in ports])
        if scheduler_hints:
            request['scheduler_hints'] = scheduler_hints
        server = self.create_server(**request)

        # The floating IP is always bound to the first port of the list.
        fip = self.create_floatingip(port=ports[0])
        ssh_client = ssh.Client(
            fip['floating_ip_address'],
            CONF.validation.image_ssh_user,
            pkey=self.keypair['private_key'])
        return {'server': server, 'client': ssh_client, 'fip': fip}

    @decorators.idempotent_id('f9523922-9e71-4fa1-9cca-303bedf230bc')
    def test_port_security_disabled(self):
        """Check that traffic passes when port security is disabled.

        Scenario:
        1. Create a keypair and a router.
        2. Create two internal networks with port security disabled:
           a management network attached to the router and a test
           network whose subnet has no gateway.
        3. Create two ports on each network, one per server. The ports
           on the test network get fixed static ip addresses.
        4. Boot two servers, each one with a management port and a test
           port.
        5. Ensure the interface plugged into the test network is up and
           has the expected ip address on both servers.
        6. Verify that src_server reaches dst_server through the ip
           address of its test network interface.
        7. On dst_server replace the ip address and the MAC of the test
           network interface with different ones.
        8. Verify that src_server reaches the new dst_server address and
           that dst_server reaches src_server, although the MAC and ip
           address no longer match those configured on the port.
        """
        self.keypair = self.create_keypair()
        router = self.create_router_by_client()

        mgmt_net = self.create_network(port_security_enabled=False)
        test_net = self.create_network(port_security_enabled=False)
        mgmt_subnet = self.create_subnet(
            network=mgmt_net, cidr="192.168.0.0/24")
        self.create_router_interface(router['id'], mgmt_subnet['id'])
        # Keep the test subnet without a gateway, otherwise the routes of
        # VMs that have two ports get messed up.
        test_subnet = self.create_subnet(
            network=test_net, cidr="192.168.1.0/24", gateway=None)

        ips = {'src_server': '192.168.1.11',
               'dst_server': '192.168.1.12',
               'dst_server_alt': '192.168.1.13'}

        src_mgmt_port = self.create_port(mgmt_net)
        dst_mgmt_port = self.create_port(mgmt_net)
        src_test_port = self.create_port(
            test_net,
            fixed_ips=[{"subnet_id": test_subnet['id'],
                        "ip_address": ips['src_server']}])
        dst_test_port = self.create_port(
            test_net,
            fixed_ips=[{"subnet_id": test_subnet['id'],
                        "ip_address": ips['dst_server']}])
        src_vm = self._create_server(ports=[src_mgmt_port, src_test_port])
        dst_vm = self._create_server(ports=[dst_mgmt_port, dst_test_port])
        servers = {'src_server': src_vm, 'dst_server': dst_vm}

        # Both servers boot from the same image, so the interface name
        # found on one of them is used for the other one as well.
        shell_path = "PATH=$PATH:/sbin"
        lookup_cmd = (
            "{};ip addr | grep {} -B 1 | head -1 | "
            r"cut -d ':' -f 2 | sed 's/\ //g'").format(
                shell_path, dst_test_port['mac_address'])
        test_iface = dst_vm['client'].exec_command(lookup_cmd).rstrip()

        # The second interface may be down; if so, bring it up and
        # configure its ip address. The command depends on the image type.
        if CONF.neutron_plugin_options.default_image_is_advanced:
            setup_template = ("ip addr show {interface} | grep {ip} || "
                              "sudo dhclient {interface}")
        else:
            setup_template = (
                "cat /sys/class/net/{interface}/operstate | "
                "grep -q -v down && true || "
                "({path}; sudo ip link set {interface} up && "
                "sudo ip addr add {ip}/24 dev {interface})")
        for name, vm in servers.items():
            # str.format ignores keyword arguments the template does not
            # use, so one call serves both templates.
            setup_cmd = setup_template.format(
                path=shell_path, ip=ips[name], interface=test_iface)
            vm['client'].exec_command(setup_cmd)

        self.check_remote_connectivity(src_vm['client'], ips['dst_server'])

        # Put a different ip address and MAC on the dst_server test
        # interface, then check connectivity again in both directions.
        reconfigure_cmd = (
            "{path}; sudo ip addr del {ip}/24 dev {interface} && "
            "sudo ip addr add {ip_alt}/24 dev {interface} && "
            "sudo ip link set {interface} address 02:02:02:03:03:03").format(
                path=shell_path, interface=test_iface,
                ip=ips['dst_server'], ip_alt=ips['dst_server_alt'])
        dst_vm['client'].exec_command(reconfigure_cmd)

        self.check_remote_connectivity(src_vm['client'],
                                       ips['dst_server_alt'])
        self.check_remote_connectivity(dst_vm['client'],
                                       ips['src_server'])

    @decorators.idempotent_id('b99a014c-45e4-438d-8827-1063b44d611c')
    def test_two_vms_same_compute_when_no_port_security(self):
        """Check two servers without port security on one compute host.

        Two servers are booted on a network created with port security
        disabled; the second one is requested with a ``same_host``
        scheduler hint pointing at the first. The test is skipped when
        the servers nevertheless end up on different hosts. Otherwise
        connectivity to the floating IP of each server is checked.
        """
        self.keypair = self.create_keypair()
        router = self.create_router_by_client()

        network = self.create_network(port_security_enabled=False)
        subnet = self.create_subnet(network=network)
        self.create_router_interface(router['id'], subnet['id'])

        first_port = self.create_port(network)
        second_port = self.create_port(network)
        first_vm = self._create_server(ports=[first_port])
        first_vm_id = first_vm['server']['server']['id']
        second_vm = self._create_server(
            ports=[second_port],
            scheduler_hints={'same_host': first_vm_id})
        second_vm_id = second_vm['server']['server']['id']

        first_host = self.get_host_for_server(first_vm_id)
        second_host = self.get_host_for_server(second_vm_id)
        if first_host != second_host:
            raise self.skipException("VMs are running on different hosts")

        for vm in (first_vm, second_vm):
            ssh_client = vm['client']
            self.check_connectivity(
                host=vm['fip']['floating_ip_address'],
                ssh_user=ssh_client.username,
                ssh_key=ssh_client.pkey)
|