Merge "[AMQP 1.0] Randomize host list connection attempts"
This commit is contained in:
commit
f7ceec7aa4
@ -26,6 +26,7 @@ functions scheduled by the Controller.
|
||||
|
||||
import abc
|
||||
import logging
|
||||
import random
|
||||
import threading
|
||||
import uuid
|
||||
|
||||
@ -298,7 +299,7 @@ class Hosts(object):
|
||||
entry.port = entry.port or 5672
|
||||
entry.username = entry.username or default_username
|
||||
entry.password = entry.password or default_password
|
||||
self._current = 0
|
||||
self._current = random.randint(0, len(self._entries) - 1)
|
||||
|
||||
@property
|
||||
def current(self):
|
||||
|
@ -512,6 +512,8 @@ class TestFailover(test_utils.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestFailover, self).setUp()
|
||||
self._brokers = [FakeBroker(), FakeBroker()]
|
||||
self._primary = 0
|
||||
self._backup = 1
|
||||
hosts = []
|
||||
for broker in self._brokers:
|
||||
hosts.append(oslo_messaging.TransportHost(hostname=broker.host,
|
||||
@ -526,8 +528,10 @@ class TestFailover(test_utils.BaseTestCase):
|
||||
if broker.isAlive():
|
||||
broker.stop()
|
||||
|
||||
def _failover(self, fail_brokers):
|
||||
def _failover(self, fail_broker):
|
||||
self._brokers[0].start()
|
||||
self._brokers[1].start()
|
||||
|
||||
# self.config(trace=True, group="oslo_messaging_amqp")
|
||||
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
|
||||
|
||||
@ -535,12 +539,17 @@ class TestFailover(test_utils.BaseTestCase):
|
||||
listener = _ListenerThread(
|
||||
driver.listen(target, None, None)._poll_style_listener, 2)
|
||||
|
||||
# wait for listener links to come up
|
||||
# wait for listener links to come up on either broker
|
||||
# 4 == 3 links per listener + 1 for the global reply queue
|
||||
predicate = lambda: self._brokers[0].sender_link_count == 4
|
||||
predicate = lambda: ((self._brokers[0].sender_link_count == 4) or
|
||||
(self._brokers[1].sender_link_count == 4))
|
||||
_wait_until(predicate, 30)
|
||||
self.assertTrue(predicate())
|
||||
|
||||
if self._brokers[1].sender_link_count == 4:
|
||||
self._primary = 1
|
||||
self._backup = 0
|
||||
|
||||
rc = driver.send(target, {"context": "whatever"},
|
||||
{"method": "echo", "id": "echo-1"},
|
||||
wait_for_reply=True,
|
||||
@ -549,15 +558,15 @@ class TestFailover(test_utils.BaseTestCase):
|
||||
self.assertEqual(rc.get('correlation-id'), 'echo-1')
|
||||
|
||||
# 1 request msg, 1 response:
|
||||
self.assertEqual(self._brokers[0].topic_count, 1)
|
||||
self.assertEqual(self._brokers[0].direct_count, 1)
|
||||
self.assertEqual(self._brokers[self._primary].topic_count, 1)
|
||||
self.assertEqual(self._brokers[self._primary].direct_count, 1)
|
||||
|
||||
# invoke failover method
|
||||
fail_brokers(self._brokers[0], self._brokers[1])
|
||||
fail_broker(self._brokers[self._primary])
|
||||
|
||||
# wait for listener links to re-establish on broker 1
|
||||
# 4 = 3 links per listener + 1 for the global reply queue
|
||||
predicate = lambda: self._brokers[1].sender_link_count == 4
|
||||
predicate = lambda: self._brokers[self._backup].sender_link_count == 4
|
||||
_wait_until(predicate, 30)
|
||||
self.assertTrue(predicate())
|
||||
|
||||
@ -570,44 +579,41 @@ class TestFailover(test_utils.BaseTestCase):
|
||||
self.assertEqual(rc.get('correlation-id'), 'echo-2')
|
||||
|
||||
# 1 request msg, 1 response:
|
||||
self.assertEqual(self._brokers[1].topic_count, 1)
|
||||
self.assertEqual(self._brokers[1].direct_count, 1)
|
||||
self.assertEqual(self._brokers[self._backup].topic_count, 1)
|
||||
self.assertEqual(self._brokers[self._backup].direct_count, 1)
|
||||
|
||||
listener.join(timeout=30)
|
||||
self.assertFalse(listener.isAlive())
|
||||
|
||||
# note: stopping the broker first tests cleaning up driver without a
|
||||
# connection active
|
||||
self._brokers[1].stop()
|
||||
self._brokers[self._backup].stop()
|
||||
driver.cleanup()
|
||||
|
||||
def test_broker_crash(self):
|
||||
"""Simulate a failure of one broker."""
|
||||
def _meth(broker0, broker1):
|
||||
# fail broker 0 and start broker 1:
|
||||
broker0.stop()
|
||||
def _meth(broker):
|
||||
# fail broker:
|
||||
broker.stop()
|
||||
time.sleep(0.5)
|
||||
broker1.start()
|
||||
self._failover(_meth)
|
||||
|
||||
def test_broker_shutdown(self):
|
||||
"""Simulate a normal shutdown of a broker."""
|
||||
def _meth(broker0, broker1):
|
||||
broker0.stop(clean=True)
|
||||
def _meth(broker):
|
||||
broker.stop(clean=True)
|
||||
time.sleep(0.5)
|
||||
broker1.start()
|
||||
self._failover(_meth)
|
||||
|
||||
def test_heartbeat_failover(self):
|
||||
"""Simulate broker heartbeat timeout."""
|
||||
def _meth(broker0, broker1):
|
||||
# keep alive heartbeat from broker 0 will stop, which should force
|
||||
# failover to broker 1 in about two seconds
|
||||
broker0.pause()
|
||||
broker1.start()
|
||||
def _meth(broker):
|
||||
# keep alive heartbeat from primary broker will stop, which should
|
||||
# force failover to backup broker in about two seconds
|
||||
broker.pause()
|
||||
self.config(idle_timeout=2, group="oslo_messaging_amqp")
|
||||
self._failover(_meth)
|
||||
self._brokers[0].stop()
|
||||
self._brokers[self._primary].stop()
|
||||
|
||||
def test_listener_failover(self):
|
||||
"""Verify that Listeners sharing the same topic are re-established
|
||||
|
Loading…
x
Reference in New Issue
Block a user