From 0b818510e373b2cfbe66594e15819ab2989f588a Mon Sep 17 00:00:00 2001 From: zhangjianweibj Date: Fri, 26 Jul 2019 16:16:59 +0800 Subject: [PATCH] Support kafka connection timeout option Add timeout option for kafka consumer client. Task: 36034 Story: 2006312 Change-Id: I197b81b2c5690b3e27b3fac8dd42c4cb2d9ba776 --- monasca_common/confluent_kafka/consumer.py | 4 +++- monasca_common/kafka/consumer.py | 15 +++++++++------ monasca_common/tests/test_confluent_kafka.py | 1 + 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/monasca_common/confluent_kafka/consumer.py b/monasca_common/confluent_kafka/consumer.py index 334a47e4..f09715fd 100644 --- a/monasca_common/confluent_kafka/consumer.py +++ b/monasca_common/confluent_kafka/consumer.py @@ -25,7 +25,7 @@ class KafkaConsumer(object): def __init__(self, bootstrap_servers, group_id, topic, fetch_min_bytes=1048576, client_id="", repartition_callback=None, commit_callback=None, - max_commit_interval=30): + max_commit_interval=30, timeout=10000): """ Create new high-level Consumer instance. @@ -43,10 +43,12 @@ class KafkaConsumer(object): :param callable commit_callback: Callback function responsible for calling the commit() method. :param int max_commit_interval: Maximum time in seconds between commits. + :param int timeout: Client group session and failure detection timeout. """ consumer_config = {'bootstrap.servers': bootstrap_servers, 'group.id': group_id, + 'session.timeout.ms': timeout, 'fetch.min.bytes': fetch_min_bytes, 'client.id': client_id, 'enable.auto.commit': False, diff --git a/monasca_common/kafka/consumer.py b/monasca_common/kafka/consumer.py index 670ac1f1..4520dc25 100644 --- a/monasca_common/kafka/consumer.py +++ b/monasca_common/kafka/consumer.py @@ -18,13 +18,14 @@ import logging import threading import time -import monasca_common.kafka_lib.client as kafka_client -import monasca_common.kafka_lib.common as kafka_common -import monasca_common.kafka_lib.consumer as kafka_consumer - from kazoo.client import KazooClient from kazoo.recipe.partitioner import SetPartitioner +import monasca_common.kafka_lib.client as kafka_client +import monasca_common.kafka_lib.common as kafka_common +from monasca_common.kafka_lib.conn import DEFAULT_SOCKET_TIMEOUT_SECONDS +import monasca_common.kafka_lib.consumer as kafka_consumer + log = logging.getLogger(__name__) """Kafka consumer interface @@ -52,7 +53,8 @@ class KafkaConsumer(object): fetch_size=1048576, repartition_callback=None, commit_callback=None, - commit_timeout=30): + commit_timeout=30, + timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): """Init kafka_url - Kafka location @@ -68,6 +70,7 @@ class KafkaConsumer(object): commit_callback - Callback to run when the commit_timeout has elapsed between commits. commit_timeout - Timeout between commits. + timeout - kafka connection timeout """ self._kazoo_client = None @@ -89,7 +92,7 @@ class KafkaConsumer(object): self._zookeeper_url = zookeeper_url self._zookeeper_path = zookeeper_path - self._kafka = kafka_client.KafkaClient(kafka_url) + self._kafka = kafka_client.KafkaClient(kafka_url, timeout=timeout) self._consumer = self._create_kafka_consumer() diff --git a/monasca_common/tests/test_confluent_kafka.py b/monasca_common/tests/test_confluent_kafka.py index 69d47ab1..cc1c0573 100644 --- a/monasca_common/tests/test_confluent_kafka.py +++ b/monasca_common/tests/test_confluent_kafka.py @@ -123,6 +123,7 @@ class TestConfluentKafkaConsumer(base.BaseTestCase): def test_kafka_consumer_init(self): expected_config = {'group.id': 'fake_group', + 'session.timeout.ms': 10000, 'bootstrap.servers': ['fake_server1', 'fake_server2'], 'fetch.min.bytes': 128,