From 488594936a52145c778c89fc88adca722ae8bd72 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Thu, 8 Dec 2016 10:05:17 +0100 Subject: [PATCH] kafka: return to poller when timeout is reach consume() must return only if user timeout is reached and not when driver consumer_timeout is reached. Change-Id: I6b2b2a28038a194224e79fa37285436ca6787a0a --- oslo_messaging/_drivers/impl_kafka.py | 31 +++++++++++++++++---------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 0a8c02ca2..59d2ff71e 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -194,18 +194,27 @@ class Connection(object): :param timeout: poll timeout in seconds """ - if self._consume_loop_stopped: - return None - timeout = timeout if timeout >= 0 else self.consumer_timeout - try: - messages = self._poll_messages(timeout) - except kafka.errors.ConsumerTimeout as e: - raise driver_common.Timeout(e.message) - except Exception: - LOG.exception(_LE("Failed to consume messages")) - messages = None - return messages + def _raise_timeout(exc): + raise driver_common.Timeout(exc.message) + + timer = driver_common.DecayingTimer(duration=timeout) + timer.start() + + poll_timeout = (self.consumer_timeout if timeout is None + else min(timeout, self.consumer_timeout)) + + while True: + if self._consume_loop_stopped: + return + try: + return self._poll_messages(poll_timeout) + except kafka.errors.ConsumerTimeout as exc: + poll_timeout = timer.check_return( + _raise_timeout, exc, maximum=self.consumer_timeout) + except Exception: + LOG.exception(_LE("Failed to consume messages")) + return def stop_consuming(self): self._consume_loop_stopped = True