Fix consuming from missing queues

Redeclare queues on 'Basic.consume: (404) no queue' exception
and enable by default consumer declaration with nowait=False
in order to wait for a broker response.

Change-Id: I99f2bc858dbc7c18a2f328ee26f39105ed17cee3
Closes-Bug: #1581148
This commit is contained in:
Kirill Bespalov 2016-05-12 21:00:29 +03:00
parent 054591fbfd
commit 43cfc18fc9
2 changed files with 69 additions and 4 deletions

View File

@ -250,7 +250,7 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable, def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback, exchange_auto_delete, queue_auto_delete, callback,
nowait=True, rabbit_ha_queues=None, rabbit_queue_ttl=0): nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0):
"""Init the Publisher class with the exchange_name, routing_key, """Init the Publisher class with the exchange_name, routing_key,
type, durable auto_delete type, durable auto_delete
""" """
@ -1004,11 +1004,31 @@ class Connection(object):
if not self.connection.connected: if not self.connection.connected:
raise self.connection.recoverable_connection_errors[0] raise self.connection.recoverable_connection_errors[0]
if self._new_tags: consume_max_retries = 2
while self._new_tags and consume_max_retries:
for consumer, tag in self._consumers.items(): for consumer, tag in self._consumers.items():
if tag in self._new_tags: if tag in self._new_tags:
consumer.consume(tag=tag) try:
self._new_tags.remove(tag) consumer.consume(tag=tag)
self._new_tags.remove(tag)
except self.connection.channel_errors as exc:
# NOTE(kbespalov): during the interval between
# a queue declaration and consumer declaration
# the queue can disappear. In this case
# we must redeclare queue and try to re-consume.
# More details is here:
# bugs.launchpad.net/oslo.messaging/+bug/1581148
if exc.code == 404 and consume_max_retries:
consumer.declare(self)
# NOTE(kbespalov): the broker closes a channel
# at any channel error. The py-amqp catches
# this situation and re-open a new channel.
# So, we must re-declare all consumers again.
self._new_tags = set(self._consumers.values())
consume_max_retries -= 1
break
else:
raise
poll_timeout = (self._poll_timeout if timeout is None poll_timeout = (self._poll_timeout if timeout is None
else min(timeout, self._poll_timeout)) else min(timeout, self._poll_timeout))

View File

@ -310,6 +310,51 @@ class TestRabbitConsume(test_utils.BaseTestCase):
self.assertEqual(0, int(deadline - time.time())) self.assertEqual(0, int(deadline - time.time()))
def test_consume_from_missing_queue(self):
transport = oslo_messaging.get_transport(self.conf, 'kombu+memory://')
self.addCleanup(transport.cleanup)
with transport._driver._get_connection(
driver_common.PURPOSE_LISTEN) as conn:
with mock.patch('kombu.Queue.consume') as consume, mock.patch(
'kombu.Queue.declare') as declare:
conn.declare_topic_consumer(exchange_name='test',
topic='test',
callback=lambda msg: True)
import amqp
consume.side_effect = [amqp.NotFound, None]
conn.connection.connection.recoverable_connection_errors = ()
conn.connection.connection.recoverable_channel_errors = ()
self.assertEqual(1, declare.call_count)
conn.connection.connection.transport.drain_events = mock.Mock()
# Ensure that a queue will be re-declared if the consume method
# of kombu.Queue raise amqp.NotFound
conn.consume()
self.assertEqual(2, declare.call_count)
def test_consume_from_missing_queue_with_io_error_on_redeclaration(self):
transport = oslo_messaging.get_transport(self.conf, 'kombu+memory://')
self.addCleanup(transport.cleanup)
with transport._driver._get_connection(
driver_common.PURPOSE_LISTEN) as conn:
with mock.patch('kombu.Queue.consume') as consume, mock.patch(
'kombu.Queue.declare') as declare:
conn.declare_topic_consumer(exchange_name='test',
topic='test',
callback=lambda msg: True)
import amqp
consume.side_effect = [amqp.NotFound, None]
declare.side_effect = [IOError, None]
conn.connection.connection.recoverable_connection_errors = (
IOError,)
conn.connection.connection.recoverable_channel_errors = ()
self.assertEqual(1, declare.call_count)
conn.connection.connection.transport.drain_events = mock.Mock()
# Ensure that a queue will be re-declared after
# 'queue not found' exception despite on connection error.
conn.consume()
self.assertEqual(3, declare.call_count)
def test_connection_ack_have_disconnected_kombu_connection(self): def test_connection_ack_have_disconnected_kombu_connection(self):
transport = oslo_messaging.get_transport(self.conf, transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////') 'kombu+memory:////')