Support Kafka connection timeout option

Add a timeout option to both Kafka consumer clients.

Task: 36034
Story: 2006312
Change-Id: I197b81b2c5690b3e27b3fac8dd42c4cb2d9ba776
zhangjianweibj 2019-07-26 16:16:59 +08:00 committed by Witold Bedyk
parent e4976083a0
commit 0b818510e3
3 changed files with 13 additions and 7 deletions

View File

@@ -25,7 +25,7 @@ class KafkaConsumer(object):
     def __init__(self, bootstrap_servers, group_id, topic,
                  fetch_min_bytes=1048576, client_id="",
                  repartition_callback=None, commit_callback=None,
-                 max_commit_interval=30):
+                 max_commit_interval=30, timeout=10000):
         """
         Create new high-level Consumer instance.
@@ -43,10 +43,12 @@ class KafkaConsumer(object):
         :param callable commit_callback: Callback function responsible for
             calling the commit() method.
         :param int max_commit_interval: Maximum time in seconds between commits.
+        :param int timeout: Client group session and failure detection timeout.
         """
         consumer_config = {'bootstrap.servers': bootstrap_servers,
                            'group.id': group_id,
+                           'session.timeout.ms': timeout,
                            'fetch.min.bytes': fetch_min_bytes,
                            'client.id': client_id,
                            'enable.auto.commit': False,
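
For context, a minimal usage sketch of the confluent-kafka based consumer with the new parameter (not part of the commit): the import path, broker address, group, and topic below are placeholders/assumptions. The value is forwarded as 'session.timeout.ms', so it is expressed in milliseconds.

    # Sketch only: construct the consumer with an explicit group session timeout.
    # All names and addresses here are placeholders, not values from the commit.
    from monasca_common.confluent_kafka import consumer

    kafka_consumer = consumer.KafkaConsumer(
        bootstrap_servers='kafka:9092',   # placeholder broker address
        group_id='example_group',         # placeholder consumer group
        topic='example_topic',            # placeholder topic
        timeout=30000)                    # forwarded as 'session.timeout.ms' (milliseconds)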

View File

@@ -18,13 +18,14 @@ import logging
 import threading
 import time
 
-import monasca_common.kafka_lib.client as kafka_client
-import monasca_common.kafka_lib.common as kafka_common
-import monasca_common.kafka_lib.consumer as kafka_consumer
-
 from kazoo.client import KazooClient
 from kazoo.recipe.partitioner import SetPartitioner
 
+import monasca_common.kafka_lib.client as kafka_client
+import monasca_common.kafka_lib.common as kafka_common
+from monasca_common.kafka_lib.conn import DEFAULT_SOCKET_TIMEOUT_SECONDS
+import monasca_common.kafka_lib.consumer as kafka_consumer
+
 log = logging.getLogger(__name__)
 
 """Kafka consumer interface
@@ -52,7 +53,8 @@ class KafkaConsumer(object):
                  fetch_size=1048576,
                  repartition_callback=None,
                  commit_callback=None,
-                 commit_timeout=30):
+                 commit_timeout=30,
+                 timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
         """Init
              kafka_url - Kafka location
@@ -68,6 +70,7 @@ class KafkaConsumer(object):
              commit_callback - Callback to run when the commit_timeout
                                has elapsed between commits.
              commit_timeout - Timeout between commits.
+             timeout - kafka connection timeout
         """
 
         self._kazoo_client = None
@@ -89,7 +92,7 @@ class KafkaConsumer(object):
         self._zookeeper_url = zookeeper_url
         self._zookeeper_path = zookeeper_path
 
-        self._kafka = kafka_client.KafkaClient(kafka_url)
+        self._kafka = kafka_client.KafkaClient(kafka_url, timeout=timeout)
 
         self._consumer = self._create_kafka_consumer()
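
A corresponding sketch for the legacy kafka_lib consumer (again not part of the commit): the leading positional parameters besides kafka_url are assumed from the surrounding docstring and may differ, and all values are placeholders. Here the timeout is a socket timeout in seconds, passed straight through to kafka_client.KafkaClient.

    # Sketch only: leading constructor parameters are assumed, values are placeholders.
    from monasca_common.kafka import consumer

    kafka_consumer = consumer.KafkaConsumer(
        'kafka:9092',           # kafka_url, placeholder broker address
        'zookeeper:2181',       # zookeeper_url (assumed parameter)
        '/example/partitions',  # zookeeper_path (assumed parameter)
        'example_group',        # consumer group (assumed parameter)
        'example_topic',        # topic (assumed parameter)
        timeout=60)             # socket timeout in seconds, forwarded to KafkaClient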

View File

@@ -123,6 +123,7 @@ class TestConfluentKafkaConsumer(base.BaseTestCase):
     def test_kafka_consumer_init(self):
         expected_config = {'group.id': 'fake_group',
+                           'session.timeout.ms': 10000,
                            'bootstrap.servers': ['fake_server1',
                                                  'fake_server2'],
                            'fetch.min.bytes': 128,
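
One caveat worth noting: the two new parameters share the name timeout but use different units. The confluent-kafka consumer's value becomes 'session.timeout.ms' (milliseconds, default 10000), while the legacy consumer's value is a socket timeout in seconds (default DEFAULT_SOCKET_TIMEOUT_SECONDS) handed to KafkaClient.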