diff --git a/whitebox_neutron_tempest_plugin/tests/scenario/__init__.py b/whitebox_neutron_tempest_plugin/tests/scenario/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/whitebox_neutron_tempest_plugin/tests/scenario/test_security_groups.py b/whitebox_neutron_tempest_plugin/tests/scenario/test_security_groups.py new file mode 100644 index 0000000..a192f33 --- /dev/null +++ b/whitebox_neutron_tempest_plugin/tests/scenario/test_security_groups.py @@ -0,0 +1,320 @@ +# Copyright 2022 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron_lib import constants +from neutron_tempest_plugin.common import ssh +from neutron_tempest_plugin.common import utils +from neutron_tempest_plugin import config +from neutron_tempest_plugin.scenario import base +from oslo_log import log +import paramiko +from tempest.lib.common.utils import data_utils +from tempest.lib import decorators +from tempest.lib import exceptions as lib_exc + + +CONF = config.CONF +LOG = log.getLogger(__name__) + + +class NetworkDefaultSecGroupTest(base.BaseTempestTestCase): + credentials = ['primary', 'admin'] + required_extensions = ['router', 'security-group'] + + @classmethod + def resource_setup(cls): + super(NetworkDefaultSecGroupTest, cls).resource_setup() + # setup basic topology for servers we can log into it + cls.network = cls.create_network() + cls.subnet = cls.create_subnet(cls.network, reserve_cidr=True) + cls.router = cls.create_router_by_client() + cls.create_router_interface(cls.router['id'], cls.subnet['id']) + cls.keypair = cls.create_keypair() + + def create_vm_testing_sec_grp(self, num_servers=2, security_groups=None, + separate_nets=False): + servers, fips, server_ssh_clients = ([], [], []) + for i in range(num_servers): + # if separate_nets is True, each VM is connected to a different + # tenant network + if separate_nets and len(self.networks) < i + 1: + new_network = self.create_network() + new_subnet = self.create_subnet(new_network, reserve_cidr=True) + self.create_router_interface(self.router['id'], + new_subnet['id']) + network_id = self.networks[-1]['id'] + servers.append(self.create_server( + flavor_ref=CONF.compute.flavor_ref, + image_ref=CONF.compute.image_ref, + key_name=self.keypair['name'], + networks=[{'uuid': network_id}], + security_groups=security_groups)) + for i, server in enumerate(servers): + if separate_nets: + network_id = self.networks[i]['id'] + else: + network_id = self.networks[0]['id'] + port = self.client.list_ports( + network_id=network_id, device_id=server['server'][ + 'id'])['ports'][0] + fips.append(self.create_and_associate_floatingip(port['id'])) + server_ssh_clients.append(ssh.Client( + fips[i]['floating_ip_address'], CONF.validation.image_ssh_user, + pkey=self.keypair['private_key'])) + return server_ssh_clients, fips, servers + + def _check_connectivity_udp(self, ssh_server, ssh_client, server_ip, + port_num, should_succeed=True, server_id=''): + """Check connectivity to a UDP port + + :param ssh_client(RemoteClient): SSH client of the client side + :param ssh_server(RemoteClient)): SSH client of the server side + :param server_ip(dict): The IP address of the server's side (either + FIP or internal IP) + :param port_num(int): The UDP port + :param should_succeed(boolean): Should the connection be successful + :param server_id(str): ID of server where nc listens + """ + # Creating a script on the server to write sg_check + # to /tmp/1 file + expected_output = "sg_check_" + server_id + cmd1 = ("echo '#!/bin/sh\necho {expected_output} > /tmp/{server_ip}' " + "> /tmp/{server_ip}.sh && chmod +x /tmp/{server_ip}.sh && " + "touch /tmp/{server_ip}".format( + expected_output=expected_output, server_ip=server_ip)) + # Listen on UDP port and execute the script when a connection is made + cmd2 = ("sudo nc -l -u -p {port_num} -e /tmp/{server_ip}.sh > " + "/dev/null &\n".format(port_num=port_num, server_ip=server_ip)) + # Make sure that ncat listener process is running + # It takes a bit on RHEL to launch a ncat listener process + cmd2 += ("for i in `seq 1 5`; do\n" + "$(PATH=$PATH:/usr/sbin which pidof) nc && exit 0\n" + "sleep 0.2\n" + "done\n" + "exit 1\n") + # Initiate a UDP connection + cmd3 = "echo sg | nc -w 10 -u %s %s" % (server_ip, port_num) + # Kill the nc process to avoid process creation failure + cmd4 = 'sudo kill -9 $($(PATH=$PATH:/usr/sbin which pidof) nc) || true' + cmd5 = "cat /tmp/{server_ip}".format(server_ip=server_ip) + + ssh_server.exec_command(cmd1) + # cmd2 needs to be executed with execute_script method on RHEL8 + # instances because it includes a command that runs in background + # An exception will be launched for RHEL7 and cirros instances and + # exec_command method will be used for them + try: + ssh_server.execute_script(cmd2) + except paramiko.SSHException: + ssh_server.exec_command(cmd2) + ssh_client.exec_command(cmd3) + output = ssh_server.exec_command(cmd5) + if should_succeed: + result = expected_output in output + else: + result = expected_output not in output + ssh_server.exec_command(cmd4) + return result + + @decorators.idempotent_id('921f91f8-7734-4c42-8934-a1438d45747b') + def test_securitygroup_udp(self): + """Test connectivity between VMs when they have security group set. + + In this test Security groups are configured to allow traffic using UDP. + """ + PORT_UDP = 60000 + secgrp_name = 'udp-secgrp' + secgrp = self.os_primary.network_client.create_security_group( + name=secgrp_name) + self.security_groups.append(secgrp['security_group']) + self.create_loginable_secgroup_rule( + secgroup_id=secgrp['security_group']['id']) + ssh_clients, fips, servers = self.create_vm_testing_sec_grp( + security_groups=[{'name': secgrp_name}]) + + # Testing the negative case before adding UDP port sec group rule + # Using FIP + utils.wait_until_true( + lambda: self._check_connectivity_udp( + ssh_server=ssh_clients[1], + ssh_client=ssh_clients[0], + server_ip=fips[1]['floating_ip_address'], + port_num=PORT_UDP, + should_succeed=False, + server_id=servers[1]['server']['id']), + exception=RuntimeError( + "Timed out waiting for message from server {!r} ".format( + servers[1]['server']['id']))) + # Testing the negative case before adding UDP port sec group rule + # Using internal IP + utils.wait_until_true( + lambda: self._check_connectivity_udp( + ssh_server=ssh_clients[1], + ssh_client=ssh_clients[0], + server_ip=fips[1]['fixed_ip_address'], + port_num=PORT_UDP, + should_succeed=False, + server_id=servers[1]['server']['id']), + exception=RuntimeError( + "Timed out waiting for message from server {!r} ".format( + servers[1]['server']['id']))) + + rulesets = [{'protocol': constants.PROTO_NAME_UDP, + 'port_range_min': PORT_UDP, + 'port_range_max': PORT_UDP, + 'direction': 'ingress', + 'remote_ip_prefix': '0.0.0.0/0'}] + self.create_secgroup_rules(rulesets, + secgrp['security_group']['id']) + # Testing the UDP connectivity after adding UDP port sec group rule + # Using FIP + utils.wait_until_true( + lambda: self._check_connectivity_udp( + ssh_server=ssh_clients[0], + ssh_client=ssh_clients[1], + server_ip=fips[0]['floating_ip_address'], + port_num=PORT_UDP, + should_succeed=True, + server_id=servers[0]['server']['id']), + exception=RuntimeError( + "Timed out waiting for message from server {!r} ".format( + servers[0]['server']['id']))) + # Testing the UDP connectivity after adding UDP port sec group rule + # Using internal IP + utils.wait_until_true( + lambda: self._check_connectivity_udp( + ssh_server=ssh_clients[0], + ssh_client=ssh_clients[1], + server_ip=fips[0]['fixed_ip_address'], + port_num=PORT_UDP, + should_succeed=True, + server_id=servers[0]['server']['id']), + exception=RuntimeError( + "Timed out waiting for message from server {!r} ".format( + servers[0]['server']['id']))) + + @decorators.idempotent_id('db6ab68e-0b9b-46e8-9b15-c9612da57a08') + def test_securitygroup_udp_dns(self): + """Test connectivity between VMs on two different tenant networks. + + This test is done through the DNS port (UDP port 53) + """ + PORT_UDP = 53 + secgrp_name = 'udp-dns-secgrp' + secgrp = self.os_primary.network_client.create_security_group( + name=secgrp_name) + self.security_groups.append(secgrp['security_group']) + self.create_loginable_secgroup_rule( + secgroup_id=secgrp['security_group']['id']) + ssh_clients, fips, servers = self.create_vm_testing_sec_grp( + security_groups=[{'name': secgrp_name}], separate_nets=True) + + # Testing the negative case before adding UDP port sec group rule + # Using FIP + utils.wait_until_true( + lambda: self._check_connectivity_udp( + ssh_server=ssh_clients[1], + ssh_client=ssh_clients[0], + server_ip=fips[1]['floating_ip_address'], + port_num=PORT_UDP, + should_succeed=False, + server_id=servers[1]['server']['id']), + exception=RuntimeError( + "Timed out waiting for message from server {!r} ".format( + servers[1]['server']['id']))) + # Testing the negative case before adding UDP port sec group rule + # Using internal IP + utils.wait_until_true( + lambda: self._check_connectivity_udp( + ssh_server=ssh_clients[1], + ssh_client=ssh_clients[0], + server_ip=fips[1]['fixed_ip_address'], + port_num=PORT_UDP, + should_succeed=False, + server_id=servers[1]['server']['id']), + exception=RuntimeError( + "Timed out waiting for message from server {!r} ".format( + servers[1]['server']['id']))) + + rulesets = [{'protocol': constants.PROTO_NAME_UDP, + 'port_range_min': PORT_UDP, + 'port_range_max': PORT_UDP, + 'direction': 'ingress', + 'remote_ip_prefix': '0.0.0.0/0'}] + self.create_secgroup_rules(rulesets, + secgrp['security_group']['id']) + # Testing the UDP connectivity after adding UDP port sec group rule + # Using FIP + utils.wait_until_true( + lambda: self._check_connectivity_udp( + ssh_server=ssh_clients[0], + ssh_client=ssh_clients[1], + server_ip=fips[0]['floating_ip_address'], + port_num=PORT_UDP, + should_succeed=True, + server_id=servers[0]['server']['id']), + exception=RuntimeError( + "Timed out waiting for message from server {!r} ".format( + servers[0]['server']['id']))) + # Testing the UDP connectivity after adding UDP port sec group rule + # Using internal IP + utils.wait_until_true( + lambda: self._check_connectivity_udp( + ssh_server=ssh_clients[0], + ssh_client=ssh_clients[1], + server_ip=fips[0]['fixed_ip_address'], + port_num=PORT_UDP, + should_succeed=True, + server_id=servers[0]['server']['id']), + exception=RuntimeError( + "Timed out waiting for message from server {!r} ".format( + servers[0]['server']['id']))) + + @decorators.attr(type='negative') + @decorators.idempotent_id('cc5f679e-7196-4e68-8a82-10b73bf11e48') + def test_remove_security_group_negative(self): + # Create "existing security group" + existing_sg = self.os_primary.network_client.create_security_group( + name=data_utils.rand_name('test_sg')) + + # Create "not existing security group" + not_existing_sg = self.os_primary.network_client.create_security_group( + name=data_utils.rand_name('test_sg')) + self.delete_security_group(not_existing_sg['security_group']) + + # Create the VM + server_args = { + 'flavor_ref': CONF.compute.flavor_ref, + 'image_ref': CONF.compute.image_ref, + 'key_name': self.keypair['name'], + 'networks': [{'uuid': self.network['id']}] + } + server = self.create_server(**server_args) + LOG.info('test_negative server: {}'.format(server['server'])) + + # Try to remove "Existing SG ID" that is not associated to VM Network + # from VM. + self.assertRaises( + lib_exc.NotFound, + self.os_primary.servers_client.remove_security_group, + server_id=server['server']['id'], + name=existing_sg['security_group']['id']) + + # Try to remove "Not Existing SG ID" from VM. + self.assertRaises( + lib_exc.NotFound, + self.os_primary.servers_client.remove_security_group, + server_id=server['server']['id'], + name=not_existing_sg['security_group']['id'])