Improving Kafka consumer performance

The Kafka client library receives batches of a certain size from Kafka
regardless of the batch size I ask for.  Asking for a batch size that is
larger than the response size results in multiple requests to Kafka until
the batch size I asked for is received.

Asking for a single message still causes the Kafka client to receive a
batch of data from Kafka, but doing it this way, which is the same way the
Kafka library's __iter__ function works, results in a ~5x improvement in
throughput over asking for larger batch sizes.
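
A rough sketch of the two patterns being compared, assuming a kafka-python
SimpleConsumer instance named consumer (the generator shape mirrors the
change in the diff below):

    import time

    def consume(consumer):
        # Slower pattern: request a large batch per call; the client may make
        # several round trips to Kafka before the requested count is reached.
        #     messages = consumer.get_messages(count=1000, timeout=1)

        # Faster pattern: request one message per call; the client still pulls
        # a full batch from Kafka and serves later calls from its buffer, the
        # same way SimpleConsumer.__iter__ consumes data.
        while True:
            message = consumer.get_message()
            if message:
                yield message
            else:
                time.sleep(0.01)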

Increasing the requested fetch size (the number of bytes read per request)
also results in a dramatic increase in performance.  The Java consumer's
default read size is 1MB, compared to the 4KB default read size in the
Python consumer.  Increasing the Python consumer's fetch size to match the
Java default results in a ~10x improvement.
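
For reference, a minimal sketch of constructing a SimpleConsumer with a 1MB
fetch size; the broker address, group, and topic names below are
placeholders, and the real wiring is in the diff that follows:

    import kafka.client
    import kafka.consumer

    FETCH_SIZE = 1048576  # 1MB, matching the Java consumer's default read size

    client = kafka.client.KafkaClient('localhost:9092')  # placeholder broker
    consumer = kafka.consumer.SimpleConsumer(
        client, 'example-group', 'example-topic',  # placeholder group/topic
        auto_commit=False,
        fetch_size_bytes=FETCH_SIZE,  # bytes requested per fetch from the broker
        buffer_size=FETCH_SIZE,       # initial client-side receive buffer
        max_buffer_size=None)         # allow the buffer to grow if needed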

Change-Id: I3380df56749a577ae7116e5da841dcb91c85312a
Joe Keen 2016-02-04 22:13:48 -07:00
parent 635b4f7f04
commit b6945fe27f

@@ -1,4 +1,4 @@
-# Copyright (c) 2014, 2015 Hewlett-Packard Development Company, L.P.
+# (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
 import datetime
 import logging
 import threading
+import time
 
 import kafka.client
 import kafka.common
@@ -48,6 +49,7 @@ class KafkaConsumer(object):
     def __init__(self, kafka_url,
                  zookeeper_url, zookeeper_path,
                  group, topic,
+                 fetch_size=1048576,
                  repartition_callback=None,
                  commit_callback=None,
                  commit_timeout=30):
@@ -92,6 +94,8 @@ class KafkaConsumer(object):
             self._kafka_topic,
             auto_commit=False,
             iter_timeout=5,
+            fetch_size_bytes=fetch_size,
+            buffer_size=fetch_size,
             max_buffer_size=None)
 
         self._consumer.provide_partition_info()
@@ -119,14 +123,11 @@
             # between our current offset and the new Kafka head.
             try:
-                messages = self._consumer.get_messages(count=1000, timeout=1)
-                for message in messages:
-                    log.debug("Consuming message from kafka, "
-                              "partition {}, offset {}".
-                              format(message[0], message[1].offset))
+                message = self._consumer.get_message()
+                if message:
                     yield message
+                else:
+                    time.sleep(0.01)
                 if self._commit_callback:
                     time_now = datetime.datetime.now()