From 43cfc18fc9a307b44eebdd052203de8e44dbc814 Mon Sep 17 00:00:00 2001 From: Kirill Bespalov Date: Thu, 12 May 2016 21:00:29 +0300 Subject: [PATCH] Fix consuming from missing queues Redeclare queues on 'Basic.consume: (404) no queue' exception and enable by default consumer declaration with nowait=False in order to wait for a broker response. Change-Id: I99f2bc858dbc7c18a2f328ee26f39105ed17cee3 Closes-Bug: #1581148 --- oslo_messaging/_drivers/impl_rabbit.py | 28 ++++++++++-- .../tests/drivers/test_impl_rabbit.py | 45 +++++++++++++++++++ 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 4c874ee08..b1890a34f 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -250,7 +250,7 @@ class Consumer(object): def __init__(self, exchange_name, queue_name, routing_key, type, durable, exchange_auto_delete, queue_auto_delete, callback, - nowait=True, rabbit_ha_queues=None, rabbit_queue_ttl=0): + nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0): """Init the Publisher class with the exchange_name, routing_key, type, durable auto_delete """ @@ -1004,11 +1004,31 @@ class Connection(object): if not self.connection.connected: raise self.connection.recoverable_connection_errors[0] - if self._new_tags: + consume_max_retries = 2 + while self._new_tags and consume_max_retries: for consumer, tag in self._consumers.items(): if tag in self._new_tags: - consumer.consume(tag=tag) - self._new_tags.remove(tag) + try: + consumer.consume(tag=tag) + self._new_tags.remove(tag) + except self.connection.channel_errors as exc: + # NOTE(kbespalov): during the interval between + # a queue declaration and consumer declaration + # the queue can disappear. In this case + # we must redeclare queue and try to re-consume. + # More details is here: + # bugs.launchpad.net/oslo.messaging/+bug/1581148 + if exc.code == 404 and consume_max_retries: + consumer.declare(self) + # NOTE(kbespalov): the broker closes a channel + # at any channel error. The py-amqp catches + # this situation and re-open a new channel. + # So, we must re-declare all consumers again. + self._new_tags = set(self._consumers.values()) + consume_max_retries -= 1 + break + else: + raise poll_timeout = (self._poll_timeout if timeout is None else min(timeout, self._poll_timeout)) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 62597d65a..3fd7b2a42 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -310,6 +310,51 @@ class TestRabbitConsume(test_utils.BaseTestCase): self.assertEqual(0, int(deadline - time.time())) + def test_consume_from_missing_queue(self): + transport = oslo_messaging.get_transport(self.conf, 'kombu+memory://') + self.addCleanup(transport.cleanup) + with transport._driver._get_connection( + driver_common.PURPOSE_LISTEN) as conn: + with mock.patch('kombu.Queue.consume') as consume, mock.patch( + 'kombu.Queue.declare') as declare: + conn.declare_topic_consumer(exchange_name='test', + topic='test', + callback=lambda msg: True) + import amqp + consume.side_effect = [amqp.NotFound, None] + conn.connection.connection.recoverable_connection_errors = () + conn.connection.connection.recoverable_channel_errors = () + self.assertEqual(1, declare.call_count) + conn.connection.connection.transport.drain_events = mock.Mock() + # Ensure that a queue will be re-declared if the consume method + # of kombu.Queue raise amqp.NotFound + conn.consume() + self.assertEqual(2, declare.call_count) + + def test_consume_from_missing_queue_with_io_error_on_redeclaration(self): + transport = oslo_messaging.get_transport(self.conf, 'kombu+memory://') + self.addCleanup(transport.cleanup) + with transport._driver._get_connection( + driver_common.PURPOSE_LISTEN) as conn: + with mock.patch('kombu.Queue.consume') as consume, mock.patch( + 'kombu.Queue.declare') as declare: + conn.declare_topic_consumer(exchange_name='test', + topic='test', + callback=lambda msg: True) + import amqp + consume.side_effect = [amqp.NotFound, None] + declare.side_effect = [IOError, None] + + conn.connection.connection.recoverable_connection_errors = ( + IOError,) + conn.connection.connection.recoverable_channel_errors = () + self.assertEqual(1, declare.call_count) + conn.connection.connection.transport.drain_events = mock.Mock() + # Ensure that a queue will be re-declared after + # 'queue not found' exception despite on connection error. + conn.consume() + self.assertEqual(3, declare.call_count) + def test_connection_ack_have_disconnected_kombu_connection(self): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////')