From b97950ea3844b700c9cf8ba1feb306e0c3b5565e Mon Sep 17 00:00:00 2001 From: Davanum Srinivas Date: Thu, 18 Feb 2016 17:23:27 -0500 Subject: [PATCH] Get kafka notifications to work with kafka-python 0.9.5 * poll() now returns empty list instead of None * metadata_broker_list has been dropped, there's a new bootstrap_servers * Add a LOG.debug() for the actual message python simulator.py -d DEBUG --topic notifications.info --url kafka://localhost:9092 notify-server python simulator.py -d DEBUG --topic notifications --url kafka://localhost:9092 notify-client -m 1000 -w 1 More tips: http://kafka.apache.org/documentation.html#quickstart https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-14-04 Depends-On: Ia324b4c89b05c536708baf7950857cd159578cec Change-Id: I80911d2678ea5e8d0cd6b146a1e29a58858e3144 --- oslo_messaging/_drivers/impl_kafka.py | 4 ++-- oslo_messaging/tests/drivers/test_impl_kafka.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 62d74e8f2..9959ffda8 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -228,8 +228,7 @@ class Connection(object): def declare_topic_consumer(self, topics, group=None): self.consumer = kafka.KafkaConsumer( *topics, group_id=group, - metadata_broker_list=["%s:%s" % (self.host, str(self.port))], - # auto_commit_enable=self.auto_commit, + bootstrap_servers=["%s:%s" % (self.host, str(self.port))], fetch_message_max_bytes=self.fetch_messages_max_bytes) @@ -262,6 +261,7 @@ class KafkaListener(base.Listener): messages = self.conn.consume(timeout=timeout) for msg in messages: message = msg.value + LOG.debug('poll got message : %s', message) message = jsonutils.loads(message) self.incoming_queue.append(OsloKafkaMessage( ctxt=message['context'], message=message['message'])) diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index bf1857969..e383419e0 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -284,4 +284,4 @@ class TestWithRealKafkaBroker(test_utils.BaseTestCase): deadline = time.time() + 3 received_message = listener.poll(timeout=3) self.assertEqual(0, int(deadline - time.time())) - self.assertIsNone(received_message) + self.assertEqual([], received_message)