Merge "Catch the case when no new messages are available"
This commit is contained in:
commit
a45625936f
@ -70,6 +70,10 @@ class KafkaConsumer(object):
|
||||
continue
|
||||
elif not message.error():
|
||||
yield message
|
||||
elif message.error().code() == \
|
||||
confluent_kafka.KafkaError._PARTITION_EOF:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
else:
|
||||
log.error("Kafka error: %s", message.error().str())
|
||||
raise confluent_kafka.KafkaException(message.error())
|
||||
|
Loading…
x
Reference in New Issue
Block a user