From 9a065e3d8e750b9995009aa1b3290525fff25788 Mon Sep 17 00:00:00 2001 From: Ilya Tyaptin Date: Thu, 24 Mar 2016 17:41:05 +0300 Subject: [PATCH] [kafka] Do not remove kafka_client during reset Currently we delete kafka_client, producer and consumer from the Kafka driver connection when we reset it, for example before returning it to pool. It's a redundant action, because kafka_client and kafka producer (it uses a kafka_client for sending a message) could be used again without any concerns. In same time, currently we don't close a KafkaConsumer, but it's needed because we should to close opened sockets to the kafka. In this patchset all action in reset are changed to the more optimal behavior. Change-Id: I6ff26256c933c79468f9e6cd1752181e5ace155f Closes-bug: #1557528 --- oslo_messaging/_drivers/impl_kafka.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index dfd0ed06f..bec606330 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -187,12 +187,8 @@ class Connection(object): def reset(self): """Reset a connection so it can be used again.""" - if self.kafka_client: - self.kafka_client.close() - self.kafka_client = None - if self.producer: - self.producer.stop() - self.producer = None + if self.consumer: + self.consumer.close() self.consumer = None def close(self): @@ -233,6 +229,7 @@ class Connection(object): *topics, group_id=group, bootstrap_servers=["%s:%s" % (self.host, str(self.port))], fetch_message_max_bytes=self.fetch_messages_max_bytes) + self._consume_loop_stopped = False class OsloKafkaMessage(base.RpcIncomingMessage):