From 2247194f080a20128adf1bd07095259ddef92c4b Mon Sep 17 00:00:00 2001 From: zhangjianweibj Date: Wed, 7 Aug 2019 16:17:38 +0800 Subject: [PATCH] Confluent Kafka driver broker option bug In confluent Kafka driver, brokers option is string type not list type. Comma separated list of host/port pairs. See https://github.com/confluentinc/confluent-kafka-python/blob/master/README.md Story: 2003705 Task: 36199 Change-Id: I5961ee32c91a9a946831a36dc88413e56b251e1e --- monasca_common/confluent_kafka/consumer.py | 4 ++-- monasca_common/tests/test_confluent_kafka.py | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/monasca_common/confluent_kafka/consumer.py b/monasca_common/confluent_kafka/consumer.py index c9bc20ce..d8a734a6 100644 --- a/monasca_common/confluent_kafka/consumer.py +++ b/monasca_common/confluent_kafka/consumer.py @@ -29,8 +29,8 @@ class KafkaConsumer(object): """ Create new high-level Consumer instance. - :param list(str) bootstrap_servers: A list of host/port pairs to use - for establishing the initial connection to the Kafka cluster. + :param str bootstrap_servers: Comma separated list of host/port pairs to + use for establishing the initial connection to the Kafka cluster. :param str group_id: A unique string that identifies the consumer group this consumer belongs to. :param str topic: Topic to subscribe to. diff --git a/monasca_common/tests/test_confluent_kafka.py b/monasca_common/tests/test_confluent_kafka.py index cc1c0573..d966cd98 100644 --- a/monasca_common/tests/test_confluent_kafka.py +++ b/monasca_common/tests/test_confluent_kafka.py @@ -101,8 +101,7 @@ class TestConfluentKafkaConsumer(base.BaseTestCase): def setUp(self, mock_confluent_consumer): super(TestConfluentKafkaConsumer, self).setUp() self.mock_confluent_consumer = mock_confluent_consumer - self.consumer = consumer.KafkaConsumer(['fake_server1', - 'fake_server2'], + self.consumer = consumer.KafkaConsumer('fake_server1,fake_server2', 'fake_group', FAKE_KAFKA_TOPIC, 128, 'test_client', @@ -124,8 +123,7 @@ class TestConfluentKafkaConsumer(base.BaseTestCase): def test_kafka_consumer_init(self): expected_config = {'group.id': 'fake_group', 'session.timeout.ms': 10000, - 'bootstrap.servers': ['fake_server1', - 'fake_server2'], + 'bootstrap.servers': 'fake_server1,fake_server2', 'fetch.min.bytes': 128, 'client.id': 'test_client', 'enable.auto.commit': False,