kafka: return to poller when timeout is reach
consume() must return only if user timeout is reached and not when driver consumer_timeout is reached. Change-Id: I6b2b2a28038a194224e79fa37285436ca6787a0a
This commit is contained in:
parent
a7044799ad
commit
488594936a
@ -194,18 +194,27 @@ class Connection(object):
|
|||||||
|
|
||||||
:param timeout: poll timeout in seconds
|
:param timeout: poll timeout in seconds
|
||||||
"""
|
"""
|
||||||
if self._consume_loop_stopped:
|
|
||||||
return None
|
|
||||||
|
|
||||||
timeout = timeout if timeout >= 0 else self.consumer_timeout
|
def _raise_timeout(exc):
|
||||||
try:
|
raise driver_common.Timeout(exc.message)
|
||||||
messages = self._poll_messages(timeout)
|
|
||||||
except kafka.errors.ConsumerTimeout as e:
|
timer = driver_common.DecayingTimer(duration=timeout)
|
||||||
raise driver_common.Timeout(e.message)
|
timer.start()
|
||||||
except Exception:
|
|
||||||
LOG.exception(_LE("Failed to consume messages"))
|
poll_timeout = (self.consumer_timeout if timeout is None
|
||||||
messages = None
|
else min(timeout, self.consumer_timeout))
|
||||||
return messages
|
|
||||||
|
while True:
|
||||||
|
if self._consume_loop_stopped:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
return self._poll_messages(poll_timeout)
|
||||||
|
except kafka.errors.ConsumerTimeout as exc:
|
||||||
|
poll_timeout = timer.check_return(
|
||||||
|
_raise_timeout, exc, maximum=self.consumer_timeout)
|
||||||
|
except Exception:
|
||||||
|
LOG.exception(_LE("Failed to consume messages"))
|
||||||
|
return
|
||||||
|
|
||||||
def stop_consuming(self):
|
def stop_consuming(self):
|
||||||
self._consume_loop_stopped = True
|
self._consume_loop_stopped = True
|
||||||
|
Loading…
x
Reference in New Issue
Block a user