From 94e0a059b562508d6ddd3f91fa0bbbdb51b3a375 Mon Sep 17 00:00:00 2001
From: Habeeb Mohammed <habeeb.umo@gmail.com>
Date: Fri, 22 Sep 2017 18:21:08 -0600
Subject: [PATCH] Fixed non-configurable Kafka consumer offset reset

PROBLEM: The consumer offset was resetting to the latest offset rather than
the earliest.

SOLUTION: Modified consumer creation to include `auto_offset_reset="smallest"`,
which resets the offset to the earliest known offset.

NOTE: This does exactly what the whence parameter in SimpleConsumer.seek()
is expected to do; however, to get that behaviour, the parameter
`auto_offset_reset` MUST be set to either "largest" or "smallest".

Change-Id: I887892d80f2da9619c7f11737b3ab2e1d1dacf1e
---
 monasca_common/kafka/consumer.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/monasca_common/kafka/consumer.py b/monasca_common/kafka/consumer.py
index 7b9a08ce..670ac1f1 100644
--- a/monasca_common/kafka/consumer.py
+++ b/monasca_common/kafka/consumer.py
@@ -95,6 +95,12 @@ class KafkaConsumer(object):
 
     def _create_kafka_consumer(self, partitions=None):
         # No auto-commit so that commits only happen after the message is processed.
+
+        # auto_offset_reset controls where the consumer's offset is reset to when it
+        # falls out of range (see the whence param in SimpleConsumer.seek()). It must
+        # be set to either "largest" or "smallest" depending on whether we want to
+        # jump to the newest or the oldest available offset, no matter what whence is set to.
+
         consumer = kafka_consumer.SimpleConsumer(
             self._kafka,
             self._kafka_group,
@@ -104,7 +110,8 @@ class KafkaConsumer(object):
             iter_timeout=5,
             fetch_size_bytes=self._kafka_fetch_size,
             buffer_size=self._kafka_fetch_size,
-            max_buffer_size=None)
+            max_buffer_size=None,
+            auto_offset_reset="smallest")
 
         consumer.provide_partition_info()
         consumer.fetch_last_known_offsets()
@@ -127,7 +134,7 @@ class KafkaConsumer(object):
            # an OffsetOutOfRangeError. We trap this error and seek to
            # the head of the current Kafka data. Because this error
            # only happens when Kafka removes data we're currently
-            # pointing at we're gauranteed that we won't read any
+            # pointing at we're guaranteed that we won't read any
            # duplicate data however we will lose any information
            # between our current offset and the new Kafka head.
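
A minimal standalone sketch (not part of the patch) of what the keyword does,
assuming a kafka-python release that still ships the legacy SimpleClient and
SimpleConsumer (older releases expose KafkaClient instead of SimpleClient).
The broker address, group, and topic names are hypothetical:

    # Sketch only: broker, group, and topic names are made up for illustration.
    from kafka import SimpleClient, SimpleConsumer

    client = SimpleClient("localhost:9092")       # hypothetical broker address
    consumer = SimpleConsumer(
        client,
        "example-group",                          # hypothetical consumer group
        "example-topic",                          # hypothetical topic
        auto_commit=False,                        # commit only after processing
        iter_timeout=5,
        max_buffer_size=None,
        # When the stored offset is out of range, "largest" (the default) jumps to
        # the newest message; "smallest" rewinds to the earliest offset Kafka retains.
        auto_offset_reset="smallest")

    for message in consumer:
        print(message.offset, message.message.value)

    client.close()

The sketch only illustrates the constructor parameter; in monasca_common the
same keyword is passed to kafka_consumer.SimpleConsumer inside
_create_kafka_consumer(), as shown in the diff above.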