[kafka] Add several bootstrap servers support

At that moment kafka driver can use only url with one "host:port"
for the bootstrap server defining, but kafka client supports
set of host:port adresses: "host1:port1,host2:port2", ... .
This patch implement this functional in kafka driver for the better HA.

List self.hostaddrs stores strings "host:port" of Connection.
It collects from self.url.hosts

Change-Id: I5eece66ca6bd069a0df8c8629b4ac815f69a7c7d
Closes-Bug: #1572017
This commit is contained in:
Ildar Svetlov 2016-04-14 19:45:32 +03:00 committed by Ildar Svetlov
parent 0b286754e2
commit 99b843767d
2 changed files with 20 additions and 20 deletions

View File

@ -97,21 +97,18 @@ class Connection(object):
def _parse_url(self): def _parse_url(self):
driver_conf = self.conf.oslo_messaging_kafka driver_conf = self.conf.oslo_messaging_kafka
try:
self.host = self.url.hosts[0].hostname
except (NameError, IndexError):
self.host = driver_conf.kafka_default_host
try: self.hostaddrs = []
self.port = self.url.hosts[0].port
except (NameError, IndexError):
self.port = driver_conf.kafka_default_port
if self.host is None: for host in self.url.hosts:
self.host = driver_conf.kafka_default_host if host.hostname:
self.hostaddrs.append("%s:%s" % (
host.hostname,
host.port or driver_conf.kafka_default_port))
if self.port is None: if not self.hostaddrs:
self.port = driver_conf.kafka_default_port self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host,
driver_conf.kafka_default_port))
def notify_send(self, topic, ctxt, msg, retry): def notify_send(self, topic, ctxt, msg, retry):
"""Send messages to Kafka broker. """Send messages to Kafka broker.
@ -215,7 +212,7 @@ class Connection(object):
return return
try: try:
self.kafka_client = kafka.KafkaClient( self.kafka_client = kafka.KafkaClient(
"%s:%s" % (self.host, str(self.port))) self.hostaddrs)
self.producer = kafka.SimpleProducer(self.kafka_client) self.producer = kafka.SimpleProducer(self.kafka_client)
except KafkaError as e: except KafkaError as e:
LOG.exception(_LE("Kafka Connection is not available: %s"), e) LOG.exception(_LE("Kafka Connection is not available: %s"), e)
@ -227,7 +224,7 @@ class Connection(object):
self.kafka_client.ensure_topic_exists(topic) self.kafka_client.ensure_topic_exists(topic)
self.consumer = kafka.KafkaConsumer( self.consumer = kafka.KafkaConsumer(
*topics, group_id=group, *topics, group_id=group,
bootstrap_servers=["%s:%s" % (self.host, str(self.port))], bootstrap_servers=self.hostaddrs,
fetch_message_max_bytes=self.fetch_messages_max_bytes) fetch_message_max_bytes=self.fetch_messages_max_bytes)
self._consume_loop_stopped = False self._consume_loop_stopped = False

View File

@ -57,13 +57,17 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
scenarios = [ scenarios = [
('none', dict(url=None, ('none', dict(url=None,
expected=[dict(host='localhost', port=9092)])), expected=dict(hostaddrs=['localhost:9092']))),
('empty', dict(url='kafka:///', ('empty', dict(url='kafka:///',
expected=[dict(host='localhost', port=9092)])), expected=dict(hostaddrs=['localhost:9092']))),
('host', dict(url='kafka://127.0.0.1', ('host', dict(url='kafka://127.0.0.1',
expected=[dict(host='127.0.0.1', port=9092)])), expected=dict(hostaddrs=['127.0.0.1:9092']))),
('port', dict(url='kafka://localhost:1234', ('port', dict(url='kafka://localhost:1234',
expected=[dict(host='localhost', port=1234)])), expected=dict(hostaddrs=['localhost:1234']))),
('two', dict(url='kafka://localhost:1234,localhost2:1234',
expected=dict(hostaddrs=['localhost:1234',
'localhost2:1234']))),
] ]
def setUp(self): def setUp(self):
@ -76,8 +80,7 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
driver = transport._driver driver = transport._driver
conn = driver._get_connection(kafka_driver.PURPOSE_SEND) conn = driver._get_connection(kafka_driver.PURPOSE_SEND)
self.assertEqual(self.expected[0]['host'], conn.host) self.assertEqual(self.expected['hostaddrs'], conn.hostaddrs)
self.assertEqual(self.expected[0]['port'], conn.port)
class TestKafkaDriver(test_utils.BaseTestCase): class TestKafkaDriver(test_utils.BaseTestCase):