From 99b843767d0ab1f37dc5662c0efc0263fcd88977 Mon Sep 17 00:00:00 2001 From: Ildar Svetlov Date: Thu, 14 Apr 2016 19:45:32 +0300 Subject: [PATCH] [kafka] Add several bootstrap servers support At that moment kafka driver can use only url with one "host:port" for the bootstrap server defining, but kafka client supports set of host:port adresses: "host1:port1,host2:port2", ... . This patch implement this functional in kafka driver for the better HA. List self.hostaddrs stores strings "host:port" of Connection. It collects from self.url.hosts Change-Id: I5eece66ca6bd069a0df8c8629b4ac815f69a7c7d Closes-Bug: #1572017 --- oslo_messaging/_drivers/impl_kafka.py | 25 ++++++++----------- .../tests/drivers/test_impl_kafka.py | 15 ++++++----- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 471734cfb..018362062 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -97,21 +97,18 @@ class Connection(object): def _parse_url(self): driver_conf = self.conf.oslo_messaging_kafka - try: - self.host = self.url.hosts[0].hostname - except (NameError, IndexError): - self.host = driver_conf.kafka_default_host - try: - self.port = self.url.hosts[0].port - except (NameError, IndexError): - self.port = driver_conf.kafka_default_port + self.hostaddrs = [] - if self.host is None: - self.host = driver_conf.kafka_default_host + for host in self.url.hosts: + if host.hostname: + self.hostaddrs.append("%s:%s" % ( + host.hostname, + host.port or driver_conf.kafka_default_port)) - if self.port is None: - self.port = driver_conf.kafka_default_port + if not self.hostaddrs: + self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host, + driver_conf.kafka_default_port)) def notify_send(self, topic, ctxt, msg, retry): """Send messages to Kafka broker. @@ -215,7 +212,7 @@ class Connection(object): return try: self.kafka_client = kafka.KafkaClient( - "%s:%s" % (self.host, str(self.port))) + self.hostaddrs) self.producer = kafka.SimpleProducer(self.kafka_client) except KafkaError as e: LOG.exception(_LE("Kafka Connection is not available: %s"), e) @@ -227,7 +224,7 @@ class Connection(object): self.kafka_client.ensure_topic_exists(topic) self.consumer = kafka.KafkaConsumer( *topics, group_id=group, - bootstrap_servers=["%s:%s" % (self.host, str(self.port))], + bootstrap_servers=self.hostaddrs, fetch_message_max_bytes=self.fetch_messages_max_bytes) self._consume_loop_stopped = False diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index e5528cc30..a1d7e6c57 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -57,13 +57,17 @@ class TestKafkaTransportURL(test_utils.BaseTestCase): scenarios = [ ('none', dict(url=None, - expected=[dict(host='localhost', port=9092)])), + expected=dict(hostaddrs=['localhost:9092']))), ('empty', dict(url='kafka:///', - expected=[dict(host='localhost', port=9092)])), + expected=dict(hostaddrs=['localhost:9092']))), ('host', dict(url='kafka://127.0.0.1', - expected=[dict(host='127.0.0.1', port=9092)])), + expected=dict(hostaddrs=['127.0.0.1:9092']))), ('port', dict(url='kafka://localhost:1234', - expected=[dict(host='localhost', port=1234)])), + expected=dict(hostaddrs=['localhost:1234']))), + ('two', dict(url='kafka://localhost:1234,localhost2:1234', + expected=dict(hostaddrs=['localhost:1234', + 'localhost2:1234']))), + ] def setUp(self): @@ -76,8 +80,7 @@ class TestKafkaTransportURL(test_utils.BaseTestCase): driver = transport._driver conn = driver._get_connection(kafka_driver.PURPOSE_SEND) - self.assertEqual(self.expected[0]['host'], conn.host) - self.assertEqual(self.expected[0]['port'], conn.port) + self.assertEqual(self.expected['hostaddrs'], conn.hostaddrs) class TestKafkaDriver(test_utils.BaseTestCase):