Merge "Fix consuming from missing queues"

This commit is contained in:
Jenkins 2016-05-20 14:51:16 +00:00 committed by Gerrit Code Review
commit 37add88e9b
2 changed files with 69 additions and 4 deletions

View File

@ -261,7 +261,7 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
nowait=True, rabbit_ha_queues=None, rabbit_queue_ttl=0):
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0):
"""Init the Publisher class with the exchange_name, routing_key,
type, durable auto_delete
"""
@ -1024,11 +1024,31 @@ class Connection(object):
if not self.connection.connected:
raise self.connection.recoverable_connection_errors[0]
if self._new_tags:
consume_max_retries = 2
while self._new_tags and consume_max_retries:
for consumer, tag in self._consumers.items():
if tag in self._new_tags:
try:
consumer.consume(tag=tag)
self._new_tags.remove(tag)
except self.connection.channel_errors as exc:
# NOTE(kbespalov): during the interval between
# a queue declaration and consumer declaration
# the queue can disappear. In this case
# we must redeclare queue and try to re-consume.
# More details is here:
# bugs.launchpad.net/oslo.messaging/+bug/1581148
if exc.code == 404 and consume_max_retries:
consumer.declare(self)
# NOTE(kbespalov): the broker closes a channel
# at any channel error. The py-amqp catches
# this situation and re-open a new channel.
# So, we must re-declare all consumers again.
self._new_tags = set(self._consumers.values())
consume_max_retries -= 1
break
else:
raise
poll_timeout = (self._poll_timeout if timeout is None
else min(timeout, self._poll_timeout))

View File

@ -321,6 +321,51 @@ class TestRabbitConsume(test_utils.BaseTestCase):
self.assertEqual(0, int(deadline - time.time()))
def test_consume_from_missing_queue(self):
transport = oslo_messaging.get_transport(self.conf, 'kombu+memory://')
self.addCleanup(transport.cleanup)
with transport._driver._get_connection(
driver_common.PURPOSE_LISTEN) as conn:
with mock.patch('kombu.Queue.consume') as consume, mock.patch(
'kombu.Queue.declare') as declare:
conn.declare_topic_consumer(exchange_name='test',
topic='test',
callback=lambda msg: True)
import amqp
consume.side_effect = [amqp.NotFound, None]
conn.connection.connection.recoverable_connection_errors = ()
conn.connection.connection.recoverable_channel_errors = ()
self.assertEqual(1, declare.call_count)
conn.connection.connection.transport.drain_events = mock.Mock()
# Ensure that a queue will be re-declared if the consume method
# of kombu.Queue raise amqp.NotFound
conn.consume()
self.assertEqual(2, declare.call_count)
def test_consume_from_missing_queue_with_io_error_on_redeclaration(self):
transport = oslo_messaging.get_transport(self.conf, 'kombu+memory://')
self.addCleanup(transport.cleanup)
with transport._driver._get_connection(
driver_common.PURPOSE_LISTEN) as conn:
with mock.patch('kombu.Queue.consume') as consume, mock.patch(
'kombu.Queue.declare') as declare:
conn.declare_topic_consumer(exchange_name='test',
topic='test',
callback=lambda msg: True)
import amqp
consume.side_effect = [amqp.NotFound, None]
declare.side_effect = [IOError, None]
conn.connection.connection.recoverable_connection_errors = (
IOError,)
conn.connection.connection.recoverable_channel_errors = ()
self.assertEqual(1, declare.call_count)
conn.connection.connection.transport.drain_events = mock.Mock()
# Ensure that a queue will be re-declared after
# 'queue not found' exception despite on connection error.
conn.consume()
self.assertEqual(3, declare.call_count)
def test_connection_ack_have_disconnected_kombu_connection(self):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')