Merge "[kafka] Do not remove kafka_client during reset"
This commit is contained in:
commit
c81aa02832
@ -187,12 +187,8 @@ class Connection(object):
|
|||||||
|
|
||||||
def reset(self):
|
def reset(self):
|
||||||
"""Reset a connection so it can be used again."""
|
"""Reset a connection so it can be used again."""
|
||||||
if self.kafka_client:
|
if self.consumer:
|
||||||
self.kafka_client.close()
|
self.consumer.close()
|
||||||
self.kafka_client = None
|
|
||||||
if self.producer:
|
|
||||||
self.producer.stop()
|
|
||||||
self.producer = None
|
|
||||||
self.consumer = None
|
self.consumer = None
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
@ -233,6 +229,7 @@ class Connection(object):
|
|||||||
*topics, group_id=group,
|
*topics, group_id=group,
|
||||||
bootstrap_servers=["%s:%s" % (self.host, str(self.port))],
|
bootstrap_servers=["%s:%s" % (self.host, str(self.port))],
|
||||||
fetch_message_max_bytes=self.fetch_messages_max_bytes)
|
fetch_message_max_bytes=self.fetch_messages_max_bytes)
|
||||||
|
self._consume_loop_stopped = False
|
||||||
|
|
||||||
|
|
||||||
class OsloKafkaMessage(base.RpcIncomingMessage):
|
class OsloKafkaMessage(base.RpcIncomingMessage):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user