Merge "Remove use of Amphora FIP stacks"

This commit is contained in:
Zuul 2022-12-15 17:29:35 +00:00 committed by Gerrit Code Review
commit ad2460f714
5 changed files with 163 additions and 101 deletions

View File

@ -28,6 +28,8 @@ get_amphora = _amphora.get_amphora
get_amphora_compute_node = _amphora.get_amphora_compute_node get_amphora_compute_node = _amphora.get_amphora_compute_node
get_master_amphora = _amphora.get_master_amphora get_master_amphora = _amphora.get_master_amphora
list_amphorae = _amphora.list_amphorae list_amphorae = _amphora.list_amphorae
get_amphora_stats = _amphora.get_amphora_stats
run_command_on_amphora = _amphora.run_command_on_amphora
OCTAVIA_CLIENT_CLASSSES = _client.OCTAVIA_CLIENT_CLASSSES OCTAVIA_CLIENT_CLASSSES = _client.OCTAVIA_CLIENT_CLASSSES
get_octavia_client = _client.get_octavia_client get_octavia_client = _client.get_octavia_client

View File

@ -15,13 +15,20 @@ from __future__ import absolute_import
import typing import typing
from oslo_log import log
import tobiko import tobiko
from tobiko import tripleo
from tobiko import config
from tobiko.openstack.octavia import _client from tobiko.openstack.octavia import _client
from tobiko.openstack.octavia import _load_balancer from tobiko.openstack.octavia import _load_balancer
from tobiko.openstack import nova from tobiko.openstack import nova
from tobiko.openstack.octavia import _validators from tobiko.openstack.octavia import _validators
from tobiko.openstack import topology from tobiko.openstack import topology
from tobiko.shell import sh
LOG = log.getLogger(__name__)
CONF = config.CONF
AmphoraType = typing.Dict[str, typing.Any] AmphoraType = typing.Dict[str, typing.Any]
AmphoraIdType = typing.Union[str, AmphoraType] AmphoraIdType = typing.Union[str, AmphoraType]
@ -83,6 +90,16 @@ def get_amphora_compute_node(load_balancer: _load_balancer.LoadBalancerIdType,
return topology.get_openstack_node(hostname=hostname) return topology.get_openstack_node(hostname=hostname)
def get_amphora_stats(amphora_id, client=None):
"""
:param amphora_id: the amphora id
:param client: The Octavia client
:return (dict): The amphora stats dict.
"""
return _client.octavia_client(client).amphora_stats_show(amphora_id)
def get_master_amphora(amphorae: typing.Iterable[AmphoraType], def get_master_amphora(amphorae: typing.Iterable[AmphoraType],
port: int, port: int,
protocol: str, protocol: str,
@ -117,10 +134,100 @@ def get_master_amphora(amphorae: typing.Iterable[AmphoraType],
# The amphora which has total_connections > 0 is the master. # The amphora which has total_connections > 0 is the master.
# Backup amphora will always have total_connections == 0. # Backup amphora will always have total_connections == 0.
for amphora in amphorae: for amphora in amphorae:
amphora_stats = _client.octavia_client(client).amphora_stats_show( amphora_stats = get_amphora_stats(amphora_id=amphora['id'],
amphora['id']) client=client)
for listener in list(amphora_stats.values())[0]: for listener in list(amphora_stats.values())[0]:
if listener['total_connections'] > 0: if listener['total_connections'] > 0:
LOG.debug(f"Chosen amphora is {amphora['id']} with the "
f"following stats: {amphora_stats}")
return amphora return amphora
raise ValueError("Master Amphora wasn't found!") raise ValueError("Master Amphora wasn't found!")
def run_command_on_amphora(command: str,
lb_id: _load_balancer.LoadBalancerIdType = None,
lb_fip: str = None,
amp_id: str = None,
sudo: bool = False) -> str:
"""
Run a given command on the master/single amphora
:param command: The command to run on the amphora
:param lb_id: The load balancer id whose amphora should run the command
:param lb_fip: The loadbalancer floating ip
:param amp_id: The single/master amphora id
:param sudo: (bool) Whether to run the command with sudo permissions
on the amphora
:return: The command output (str)
"""
# Get the master/single amphora lb_network_ip
if amp_id:
amp_lb_network_ip = get_amphora(amphora=amp_id)['lb_network_ip']
elif lb_id and lb_fip:
amphorae = list_amphorae(load_balancer_id=lb_id)
amphora = get_master_amphora(amphorae=amphorae,
port=80,
protocol='HTTP',
ip_address=lb_fip)
amp_lb_network_ip = amphora['lb_network_ip']
else:
raise ValueError('Either amphora id or both the loadbalancer id '
'and the loadbalancer floating ip need to be '
'provided.')
# Find the undercloud ssh client and (any) controller ip
def _get_overcloud_node_ssh_client(group):
return topology.list_openstack_nodes(group=group)[0].ssh_client
controller_ip = _get_overcloud_node_ssh_client('controller').host
undercloud_client = _get_overcloud_node_ssh_client('undercloud')
if not controller_ip or not undercloud_client:
raise RuntimeError(f'Either controller ip {controller_ip} or '
f'undercloud ssh client {undercloud_client} was'
' not found.')
# Preparing ssh command
osp_major_version = tripleo.overcloud_version().major
if osp_major_version == 16:
ssh_add_command = 'ssh-add'
elif osp_major_version == 17:
ssh_add_command = 'sudo -E ssh-add /etc/octavia/ssh/octavia_id_rsa'
else:
raise NotImplementedError('The ssh_add_command is not implemented '
f'for OSP version {osp_major_version}.')
ssh_agent_output = sh.execute(
'ssh-agent -s',
ssh_client=undercloud_client).stdout.strip()
# Example: eval {ssh_agent_output} ssh-add
start_agent_cmd = f'eval {ssh_agent_output} {ssh_add_command}; '
# Example: ssh -A -t heat-admin@192.168.24.13
controller_user = CONF.tobiko.tripleo.overcloud_ssh_username
controller_ssh_command = f'ssh -A -t {controller_user}@{controller_ip}'
amphora_user = CONF.tobiko.octavia.amphora_user
# Example: ssh -o StrictHostKeyChecking=no cloud-user@172.24.0.214
amphora_ssh_command = 'ssh -o StrictHostKeyChecking=no ' \
f'{amphora_user}@{amp_lb_network_ip}'
full_amp_ssh_cmd = f'{controller_ssh_command} {amphora_ssh_command}'
if sudo:
command = f'sudo {command}'
# Example:
# $(ssh-agent -s) > ssh_agent_output
#
# eval {ssh_agent_output}; ssh-add; ssh -A -t heat-admin@192.168.24.13\
# ssh -o StrictHostKeyChecking=no cloud-user@172.24.0.214 <command>
command = f'{start_agent_cmd} {full_amp_ssh_cmd} {command}'
out = sh.execute(command,
ssh_client=undercloud_client,
sudo=False).stdout.strip()
# Removing the ssh-agent output
# 'Agent pid 642546\n<output-we-want>' -> '<output-we-want>'
formatted_out = '\n'.join(out.split('\n')[1:])
return formatted_out

View File

@ -25,6 +25,9 @@ OPTIONS = [
cfg.IntOpt('check_timeout', cfg.IntOpt('check_timeout',
default=360, default=360,
help='Timeout, in seconds, to wait for a status change.'), help='Timeout, in seconds, to wait for a status change.'),
cfg.StrOpt('amphora_user',
default='cloud-user',
help='The user we should use when we SSH the amphora.'),
] ]

View File

@ -15,7 +15,6 @@
# under the License. # under the License.
from __future__ import absolute_import from __future__ import absolute_import
import typing
from oslo_log import log from oslo_log import log
@ -28,7 +27,6 @@ from tobiko.openstack.stacks import _hot
from tobiko.openstack.stacks import _neutron from tobiko.openstack.stacks import _neutron
from tobiko.openstack.stacks import _ubuntu from tobiko.openstack.stacks import _ubuntu
from tobiko.shell import sh from tobiko.shell import sh
from tobiko.shell import ssh
CONF = config.CONF CONF = config.CONF
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -242,25 +240,6 @@ class HttpRoundRobinAmphoraIpv4Listener(heat.HeatStackFixture):
port=self.lb_port, port=self.lb_port,
protocol=self.lb_protocol) protocol=self.lb_protocol)
_amphora_floating_ip_stack: typing.Optional[
'AmphoraFloatingIpStack'] = None
@property
def amphora_floating_ip(self) -> neutron.FloatingIpType:
if self._amphora_floating_ip_stack is None:
self._amphora_floating_ip_stack = AmphoraFloatingIpStack(
amphora=self.amphora)
return tobiko.setup_fixture(
self._amphora_floating_ip_stack).floating_ip_details
@property
def amphora_ssh_client(self) -> ssh.SSHClientType:
"""Get an ssh_client and execute the command on the Amphora"""
floating_ip = self.amphora_floating_ip
return ssh.ssh_client(host=floating_ip['floating_ip_address'],
username='cloud-user',
connection_timeout=10)
@property @property
def amphora_mgmt_port(self) -> neutron.PortType: def amphora_mgmt_port(self) -> neutron.PortType:
""" Get amphora's management network's port """ Get amphora's management network's port
@ -328,46 +307,3 @@ class TcpSourceIpPortOvnIpv4Listener(HttpRoundRobinAmphoraIpv4Listener):
class TcpSourceIpPortOvnIpv6Listener(TcpSourceIpPortOvnIpv4Listener): class TcpSourceIpPortOvnIpv6Listener(TcpSourceIpPortOvnIpv4Listener):
ip_version = 6 ip_version = 6
class AmphoraFloatingIpStack(_neutron.FloatingIpStackFixture):
def __init__(self,
amphora: octavia.AmphoraIdType = None,
stack_name: str = None,
network: neutron.NetworkIdType = None,
neutron_client: neutron.NeutronClientType = None,
octavia_client: octavia.OctaviaClientType = None):
super().__init__(stack_name=stack_name,
network=network,
neutron_client=neutron_client)
self._amphora = amphora
self._octavia_client = octavia_client
@property
def amphora(self) -> str:
return octavia.get_amphora_id(self.amphora_details)
@property
def amphora_details(self) -> octavia.AmphoraType:
if self._amphora is None:
raise ValueError('Amphora not specified')
if isinstance(self._amphora, str):
self._amphora = octavia.get_amphora(self._amphora)
assert isinstance(self._amphora, dict)
return self._amphora
def setup_stack_name(self) -> str:
stack_name = self.stack_name
if stack_name is None:
self.stack_name = stack_name = (
f"{self.fixture_name}-{self.amphora}")
return stack_name
@property
def fixed_ip_address(self) -> str:
return self.amphora_details['lb_network_ip']
@property
def device_id(self) -> typing.Optional[str]:
return self.amphora_details['compute_id']

View File

@ -20,7 +20,6 @@ from oslo_log import log
import tobiko import tobiko
from tobiko.openstack import keystone from tobiko.openstack import keystone
from tobiko.openstack import octavia from tobiko.openstack import octavia
from tobiko.openstack import neutron
from tobiko.openstack import stacks from tobiko.openstack import stacks
from tobiko.shell import ssh from tobiko.shell import ssh
from tobiko.shell import sh from tobiko.shell import sh
@ -141,9 +140,6 @@ class OctaviaBasicFaultTest(testtools.TestCase):
if attempt.is_last: if attempt.is_last:
raise raise
self._plug_new_amphora_to_existing_fip()
@tobiko.skip(reason='Bugzilla', bugzilla=2126055)
def test_kill_amphora_agent(self): def test_kill_amphora_agent(self):
"""Kill the MASTER amphora agent """Kill the MASTER amphora agent
@ -156,21 +152,22 @@ class OctaviaBasicFaultTest(testtools.TestCase):
self._skip_if_not_active_standby() self._skip_if_not_active_standby()
# Finding the amphora agent pid and kill it # Finding the amphora agent pid and kill it
amp_agent_pid_command = ( amp_agent_pid_command = (
"ps -ef | awk '/amphora/{print $2}' | head -n 1") "ps -ef | awk '/amphora/{print $2}' | head -n 1")
amp_agent_pid = octavia.run_command_on_amphora(
command=amp_agent_pid_command,
lb_id=self.loadbalancer_stack.loadbalancer_id,
lb_fip=self.loadbalancer_stack.floating_ip_address)
LOG.info(f'The amp_agent_pid is {amp_agent_pid}')
amp_agent_pid = sh.execute( octavia.run_command_on_amphora(
amp_agent_pid_command, ssh_client=self.amphora_ssh_client, command=f'kill -9 {amp_agent_pid}',
sudo=True).stdout.strip() lb_id=self.loadbalancer_stack.loadbalancer_id,
lb_fip=self.loadbalancer_stack.floating_ip_address,
sh.execute(f'kill -9 {amp_agent_pid}',
ssh_client=self.amphora_ssh_client,
sudo=True) sudo=True)
self._wait_for_failover_and_test_functionality() self._wait_for_failover_and_test_functionality()
@tobiko.skip(reason='Bugzilla', bugzilla=2126055)
def test_stop_keepalived(self): def test_stop_keepalived(self):
"""Stop keepalived on MASTER amphora """Stop keepalived on MASTER amphora
@ -182,13 +179,16 @@ class OctaviaBasicFaultTest(testtools.TestCase):
self._skip_if_not_active_standby() self._skip_if_not_active_standby()
sh.stop_systemd_units('octavia-keepalived', stop_keepalived_cmd = 'systemctl stop octavia-keepalived'
ssh_client=self.amphora_ssh_client,
octavia.run_command_on_amphora(
command=stop_keepalived_cmd,
lb_id=self.loadbalancer_stack.loadbalancer_id,
lb_fip=self.loadbalancer_stack.floating_ip_address,
sudo=True) sudo=True)
self._wait_for_failover_and_test_functionality() self._wait_for_failover_and_test_functionality()
@tobiko.skip(reason='Bugzilla', bugzilla=2126055)
def test_stop_haproxy(self): def test_stop_haproxy(self):
"""Stop haproxy on MASTER amphora """Stop haproxy on MASTER amphora
@ -200,8 +200,19 @@ class OctaviaBasicFaultTest(testtools.TestCase):
self._skip_if_not_active_standby() self._skip_if_not_active_standby()
sh.stop_systemd_units('haproxy-*', # Finding the amphora haproxy unit name and stop it
ssh_client=self.amphora_ssh_client, amp_haproxy_unit_command = (
"systemctl list-units | awk '/haproxy-/{print $1}'")
amp_haproxy_unit = octavia.run_command_on_amphora(
command=amp_haproxy_unit_command,
lb_id=self.loadbalancer_stack.loadbalancer_id,
lb_fip=self.loadbalancer_stack.floating_ip_address)
LOG.info(f'The amp_haproxy_unit is {amp_haproxy_unit}')
octavia.run_command_on_amphora(
command=f'systemctl stop {amp_haproxy_unit}',
lb_id=self.loadbalancer_stack.loadbalancer_id,
lb_fip=self.loadbalancer_stack.floating_ip_address,
sudo=True) sudo=True)
self._wait_for_failover_and_test_functionality() self._wait_for_failover_and_test_functionality()
@ -228,18 +239,21 @@ class OctaviaBasicFaultTest(testtools.TestCase):
# Wait for Octavia objects' provisioning status to be ACTIVE # Wait for Octavia objects' provisioning status to be ACTIVE
self.listener_stack.wait_for_active_members() self.listener_stack.wait_for_active_members()
# Verify Octavia functionality # For 5 minutes we ignore specific exceptions as we know
# that Octavia resources are being reprovisioned (amphora during a
# failover)
for attempt in tobiko.retry(timeout=300.):
try:
octavia.check_members_balanced( octavia.check_members_balanced(
pool_id=self.listener_stack.pool_id, pool_id=self.listener_stack.pool_id,
ip_address=self.loadbalancer_stack.floating_ip_address, ip_address=self.loadbalancer_stack.floating_ip_address,
lb_algorithm=self.listener_stack.lb_algorithm, lb_algorithm=self.listener_stack.lb_algorithm,
protocol=self.listener_stack.lb_protocol, protocol=self.listener_stack.lb_protocol,
port=self.listener_stack.lb_port) port=self.listener_stack.lb_port)
break
self._plug_new_amphora_to_existing_fip() except octavia.RoundRobinException:
LOG.exception(f"Traffic didn't reach all members after "
def _plug_new_amphora_to_existing_fip(self): f"#{attempt.number} attempts and "
old_amphora_fip = self.listener_stack.amphora_floating_ip f"{attempt.elapsed_time} seconds")
amphora_mgmt_port = self.listener_stack.amphora_mgmt_port if attempt.is_last:
neutron.update_floating_ip(floating_ip=old_amphora_fip['id'], raise
port_id=amphora_mgmt_port['id'])