Merge "Add wrapper for Confluent Kafka client consumer"
commit 70d9aff4ca
monasca_common/confluent_kafka/consumer.py | 85 (new file)
@@ -0,0 +1,85 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import logging
+import time
+
+import confluent_kafka
+
+log = logging.getLogger(__name__)
+
+
+class KafkaConsumer(object):
+    """Wrapper around high-level Kafka Consumer"""
+    def __init__(self, bootstrap_servers, group_id, topic,
+                 fetch_min_bytes=1048576, client_id="",
+                 repartition_callback=None, commit_callback=None,
+                 max_commit_interval=30):
+        """
+        Create new high-level Consumer instance.
+
+        :param list(str) bootstrap_servers: A list of host/port pairs to use
+            for establishing the initial connection to the Kafka cluster.
+        :param str group_id: A unique string that identifies the consumer
+            group this consumer belongs to.
+        :param str topic: Topic to subscribe to.
+        :param int fetch_min_bytes: The minimum amount of data the server
+            should return for a fetch request.
+        :param str client_id: An id string to pass to the server when making
+            requests.
+        :param callable repartition_callback: Callback function executed on
+            the start of a rebalance operation.
+        :param callable commit_callback: Callback function responsible for
+            calling the commit() method.
+        :param int max_commit_interval: Maximum time in seconds between
+            commits.
+        """
+
+        consumer_config = {'bootstrap.servers': bootstrap_servers,
+                           'group.id': group_id,
+                           'fetch.min.bytes': fetch_min_bytes,
+                           'client.id': client_id,
+                           'enable.auto.commit': False,
+                           'default.topic.config':
+                               {'auto.offset.reset': 'earliest'}
+                           }
+        self._commit_callback = commit_callback
+        self._max_commit_interval = max_commit_interval
+        self._consumer = confluent_kafka.Consumer(consumer_config)
+        self._consumer.subscribe([topic], on_revoke=repartition_callback)
+        self._last_commit = None
+
+    def __iter__(self):
+        self._last_commit = datetime.datetime.now()
+
+        while True:
+            message = self._consumer.poll(timeout=5)
+
+            if message is None:
+                time.sleep(0.1)
+                continue
+            elif not message.error():
+                yield message
+            else:
+                log.error("Kafka error: %s", message.error().str())
+                raise confluent_kafka.KafkaException(message.error())
+
+            if self._commit_callback:
+                time_now = datetime.datetime.now()
+                time_delta = time_now - self._last_commit
+                if time_delta.total_seconds() > self._max_commit_interval:
+                    self._commit_callback()
+
+    def commit(self):
+        self._last_commit = datetime.datetime.now()
+        self._consumer.commit()
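For orientation, here is a minimal usage sketch of the wrapper added above. It is not part of the commit: the broker address, group id, and topic are placeholder values, and the commit callback simply delegates to the wrapper's own commit() method, which the iterator triggers once more than max_commit_interval seconds have passed since the last commit.

    from monasca_common.confluent_kafka import consumer

    def run():
        # Placeholder connection values, not defaults of the library.
        kafka = consumer.KafkaConsumer(['localhost:9092'], 'example-group',
                                       'example-topic',
                                       commit_callback=lambda: kafka.commit(),
                                       max_commit_interval=30)
        # Iterating the wrapper polls Kafka and yields Message objects;
        # a message carrying an error raises confluent_kafka.KafkaException.
        for message in kafka:
            print(message.value())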
@@ -13,6 +13,7 @@
 import mock
 
+from monasca_common.confluent_kafka import consumer
 from monasca_common.confluent_kafka import producer
 
 import confluent_kafka
@@ -28,13 +29,13 @@ class TestConfluentKafkaProducer(base.BaseTestCase):
     def setUp(self, mock_confluent_producer):
         super(TestConfluentKafkaProducer, self).setUp()
         self.mock_confluent_producer = mock_confluent_producer
-        self.prod = producer.KafkaProducer(FAKE_KAFKA_TOPIC)
+        self.prod = producer.KafkaProducer(FAKE_KAFKA_URL)
 
     def tearDown(self):
         super(TestConfluentKafkaProducer, self).tearDown()
 
     def test_kafka_producer_init(self):
-        expected_config = {'bootstrap.servers': FAKE_KAFKA_TOPIC}
+        expected_config = {'bootstrap.servers': FAKE_KAFKA_URL}
 
         self.mock_confluent_producer.assert_called_once_with(expected_config)
         self.assertEqual(self.mock_confluent_producer.return_value,
@@ -92,3 +93,85 @@ class TestConfluentKafkaProducer(base.BaseTestCase):
     def test_delivery_report(self, mock_message, mock_logger):
         self.prod.delivery_report(None, confluent_kafka.Message)
         mock_logger.debug.assert_called_once()
+
+
+class TestConfluentKafkaConsumer(base.BaseTestCase):
+
+    @mock.patch('confluent_kafka.Consumer')
+    def setUp(self, mock_confluent_consumer):
+        super(TestConfluentKafkaConsumer, self).setUp()
+        self.mock_confluent_consumer = mock_confluent_consumer
+        self.consumer = consumer.KafkaConsumer(['fake_server1',
+                                                'fake_server2'],
+                                               'fake_group',
+                                               FAKE_KAFKA_TOPIC, 128,
+                                               'test_client',
+                                               TestConfluentKafkaConsumer.rep_callback,
+                                               TestConfluentKafkaConsumer.com_callback,
+                                               5)
+
+    @staticmethod
+    def rep_callback(consumer, partitions):
+        pass
+
+    @staticmethod
+    def com_callback(consumer, partitions):
+        pass
+
+    def tearDown(self):
+        super(TestConfluentKafkaConsumer, self).tearDown()
+
+    def test_kafka_consumer_init(self):
+        expected_config = {'group.id': 'fake_group',
+                           'bootstrap.servers': ['fake_server1',
+                                                 'fake_server2'],
+                           'fetch.min.bytes': 128,
+                           'client.id': 'test_client',
+                           'enable.auto.commit': False,
+                           'default.topic.config':
+                               {'auto.offset.reset': 'earliest'}
+                           }
+
+        self.mock_confluent_consumer.assert_called_once_with(expected_config)
+        self.assertEqual(self.consumer._consumer,
+                         self.mock_confluent_consumer.return_value)
+        self.assertEqual(self.consumer._commit_callback,
+                         TestConfluentKafkaConsumer.com_callback)
+        self.assertEqual(self.consumer._max_commit_interval, 5)
+        self.mock_confluent_consumer.return_value.subscribe \
+            .assert_called_once_with([FAKE_KAFKA_TOPIC],
+                                     on_revoke=TestConfluentKafkaConsumer.rep_callback)
+
+    @mock.patch('confluent_kafka.Message')
+    def test_kafka_consumer_iteration(self, mock_kafka_message):
+        mock_kafka_message.return_value.error.return_value = None
+        messages = []
+        for i in range(5):
+            m = mock_kafka_message.return_value
+            m.set_value("message{}".format(i))
+            messages.append(m)
+        self.consumer._consumer.poll.side_effect = messages
+        for index, message in enumerate(self.consumer):
+            self.assertEqual(message, messages[index])
+
+    @mock.patch('confluent_kafka.Message')
+    @mock.patch('confluent_kafka.KafkaError')
+    def test_kafka_consumer_poll_exception(self,
+                                           mock_kafka_error,
+                                           mock_kafka_message):
+        mock_kafka_error.return_value.str = 'fake error message'
+        mock_kafka_message.return_value.error.return_value = \
+            mock_kafka_error
+        messages = [mock_kafka_message.return_value]
+
+        self.consumer._consumer.poll.side_effect = messages
+        try:
+            list(self.consumer)
+        except Exception as ex:
+            self.assertIsInstance(ex, confluent_kafka.KafkaException)
+
+    @mock.patch('datetime.datetime')
+    def test_kafka_commit(self, mock_datetime):
+        self.consumer.commit()
+        mock_datetime.now.assert_called_once()
+        self.mock_confluent_consumer.return_value.commit.assert_called_once()
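The tests above drive the iterator by patching confluent_kafka.Consumer and feeding poll() through side_effect. As a complementary sketch (hypothetical, not part of the commit), the same pattern can exercise the timeout path, where poll() returns None and the wrapper sleeps briefly and retries instead of yielding or raising:

    import unittest

    import mock

    from monasca_common.confluent_kafka import consumer

    class TestConsumerPollTimeout(unittest.TestCase):

        @mock.patch('confluent_kafka.Consumer')
        def test_poll_timeout_retries(self, mock_confluent_consumer):
            kafka = consumer.KafkaConsumer(['fake_server'], 'fake_group',
                                           'fake_topic')
            message = mock.Mock()
            message.error.return_value = None
            # First poll() returns None (timeout expired); the wrapper
            # sleeps 0.1 s and polls again, then yields the real message.
            kafka._consumer.poll.side_effect = [None, message]
            self.assertEqual(next(iter(kafka)), message)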