From 5bdd98662b5993c2b5bb643ad415a02e61c570cd Mon Sep 17 00:00:00 2001 From: Witek Bedyk Date: Tue, 19 Mar 2019 09:46:15 +0100 Subject: [PATCH] Catch the case when no new messages are available Confluent Kafka client throws the KafkaError._PARTITION_EOF exception when no new messages are available in a given partitiion. We should catch this case and continue consuming after a short sleep. Story: 2003705 Task: 30116 Change-Id: I44add24df764bbc4e718358a8af75903e035f3f0 --- monasca_common/confluent_kafka/consumer.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/monasca_common/confluent_kafka/consumer.py b/monasca_common/confluent_kafka/consumer.py index de645635..334a47e4 100644 --- a/monasca_common/confluent_kafka/consumer.py +++ b/monasca_common/confluent_kafka/consumer.py @@ -70,6 +70,10 @@ class KafkaConsumer(object): continue elif not message.error(): yield message + elif message.error().code() == \ + confluent_kafka.KafkaError._PARTITION_EOF: + time.sleep(0.1) + continue else: log.error("Kafka error: %s", message.error().str()) raise confluent_kafka.KafkaException(message.error())