diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 0143fce7f..471734cfb 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -187,12 +187,8 @@ class Connection(object): def reset(self): """Reset a connection so it can be used again.""" - if self.kafka_client: - self.kafka_client.close() - self.kafka_client = None - if self.producer: - self.producer.stop() - self.producer = None + if self.consumer: + self.consumer.close() self.consumer = None def close(self): @@ -233,6 +229,7 @@ class Connection(object): *topics, group_id=group, bootstrap_servers=["%s:%s" % (self.host, str(self.port))], fetch_message_max_bytes=self.fetch_messages_max_bytes) + self._consume_loop_stopped = False class OsloKafkaMessage(base.RpcIncomingMessage):