Merge "Allow passing config options to Kafka producer"
This commit is contained in:
commit
aecc0f0230
@ -22,16 +22,17 @@ log = logging.getLogger(__name__)
|
||||
class KafkaProducer(object):
|
||||
"""Wrapper around asynchronous Kafka Producer"""
|
||||
|
||||
def __init__(self, bootstrap_servers):
|
||||
def __init__(self, bootstrap_servers, **config):
|
||||
"""
|
||||
Create new Producer wrapper instance.
|
||||
|
||||
:param str bootstrap_servers: Initial list of brokers as a CSV
|
||||
list of broker host or host:port.
|
||||
:param config Configuration properties
|
||||
"""
|
||||
|
||||
self._producer = confluent_kafka.Producer({'bootstrap.servers':
|
||||
bootstrap_servers})
|
||||
config['bootstrap.servers'] = bootstrap_servers
|
||||
self._producer = confluent_kafka.Producer(config)
|
||||
|
||||
@staticmethod
|
||||
def delivery_report(err, msg):
|
||||
|
@ -49,8 +49,8 @@ def get_kafka_consumer(kafka_url,
|
||||
)
|
||||
|
||||
|
||||
def get_kafka_producer(kafka_url, use_legacy_client=False):
|
||||
def get_kafka_producer(kafka_url, use_legacy_client=False, **config):
|
||||
if use_legacy_client:
|
||||
return legacy_kafka_producer.KafkaProducer(kafka_url)
|
||||
else:
|
||||
return producer.KafkaProducer(",".join(kafka_url))
|
||||
return producer.KafkaProducer(",".join(kafka_url), **config)
|
||||
|
Loading…
x
Reference in New Issue
Block a user