diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 49e39d1a4..a9aa58c62 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -351,9 +351,9 @@ class KafkaDriver(base.BaseDriver): :type pool: string """ conn = self._get_connection(purpose=PURPOSE_LISTEN) - topics = [] + topics = set() for target, priority in targets_and_priorities: - topics.append(target_to_topic(target, priority)) + topics.add(target_to_topic(target, priority)) conn.declare_topic_consumer(topics, pool) diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index 7f1d5e377..057ec1eda 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -206,6 +206,26 @@ class TestKafkaListener(test_utils.BaseTestCase): self.driver.listen_for_notifications(fake_targets_and_priorities) self.assertEqual(1, len(fake_consumer.mock_calls)) + @mock.patch.object(kafka_driver.Connection, '_ensure_connection') + @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer') + def test_converting_targets_to_topics(self, fake_consumer, + fake_ensure_connection): + fake_targets_and_priorities = [ + (oslo_messaging.Target(topic="fake_topic", + exchange="test1"), 'info'), + (oslo_messaging.Target(topic="fake_topic", + exchange="test2"), 'info'), + (oslo_messaging.Target(topic="fake_topic", + exchange="test1"), 'error'), + (oslo_messaging.Target(topic="fake_topic", + exchange="test3"), 'error'), + ] + self.driver.listen_for_notifications(fake_targets_and_priorities) + self.assertEqual(1, len(fake_consumer.mock_calls)) + fake_consumer.assert_called_once_with(set(['fake_topic.error', + 'fake_topic.info']), + None) + @mock.patch.object(kafka_driver.Connection, '_ensure_connection') @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer') def test_stop_listener(self, fake_consumer, fake_client):