Add unit tests for common Kafka module
Add tests for kafka producer and consumer modules. The test coverage of monasca_common was was 9%. This is improved in this commit. Show test coverage when running tox. Change the nosetests command to show test coverage for the whole module. Change-Id: I771a539aee5fa92c065ee16b5bb94c9ae7e7a09b
This commit is contained in:
parent
91469d6ebf
commit
bfc88d4f05
167
monasca_common/tests/test_kafka.py
Normal file
167
monasca_common/tests/test_kafka.py
Normal file
@ -0,0 +1,167 @@
|
||||
# Copyright (c) 2016 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
from monasca_common.kafka.consumer import KafkaConsumer
|
||||
from monasca_common.kafka.producer import KafkaProducer
|
||||
|
||||
|
||||
FAKE_KAFKA_URL = "kafka_url"
|
||||
FAKE_ZOOKEEPER_URL = "zookeeper_url"
|
||||
FAKE_ZOOKEEPER_PATH = "zookeeper_path"
|
||||
FAKE_KAFKA_CONSUMER_GROUP = "group"
|
||||
FAKE_KAFKA_TOPIC = "topic"
|
||||
|
||||
|
||||
class TestKafkaProducer(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.kafka_client_patcher = mock.patch('kafka.client')
|
||||
self.kafka_producer_patcher = mock.patch('kafka.producer')
|
||||
self.mock_kafka_client = self.kafka_client_patcher.start()
|
||||
self.mock_kafka_producer = self.kafka_producer_patcher.start()
|
||||
self.producer = self.mock_kafka_producer.KeyedProducer.return_value
|
||||
self.client = self.mock_kafka_client.KafkaClient.return_value
|
||||
self.monasca_kafka_producer = KafkaProducer(FAKE_KAFKA_URL)
|
||||
|
||||
def tearDown(self):
|
||||
self.kafka_producer_patcher.stop()
|
||||
self.kafka_client_patcher.stop()
|
||||
|
||||
def test_kafka_producer_init(self):
|
||||
self.assertTrue(self.mock_kafka_client.KafkaClient.called)
|
||||
self.assertTrue(self.mock_kafka_producer.KeyedProducer.called)
|
||||
|
||||
def test_kafka_producer_publish(self):
|
||||
topic = FAKE_KAFKA_TOPIC
|
||||
messages = ['message']
|
||||
key = 'key'
|
||||
|
||||
self.monasca_kafka_producer.publish(topic, messages, key)
|
||||
|
||||
self.producer.send_messages.assert_called_once_with(
|
||||
topic, key, *messages)
|
||||
|
||||
@mock.patch('monasca_common.kafka.producer.time')
|
||||
def test_kafka_producer_publish_one_message_without_key(self, mock_time):
|
||||
topic = FAKE_KAFKA_TOPIC
|
||||
message = 'not_a_list'
|
||||
mock_time.time.return_value = 1
|
||||
expected_key = '1000'
|
||||
|
||||
self.monasca_kafka_producer.publish(topic, message)
|
||||
|
||||
self.assertTrue(mock_time.time.called)
|
||||
self.producer.send_messages.assert_called_once_with(
|
||||
topic, expected_key, message)
|
||||
|
||||
@mock.patch('monasca_common.kafka.producer.log')
|
||||
def test_kafka_producer_publish_exception(self, mock_logger):
|
||||
class MockException(Exception):
|
||||
pass
|
||||
|
||||
topic = FAKE_KAFKA_TOPIC
|
||||
messages = ['message']
|
||||
key = 'key'
|
||||
self.producer.send_messages.side_effect = MockException
|
||||
|
||||
self.assertRaises(MockException, self.monasca_kafka_producer.publish,
|
||||
topic, messages, key)
|
||||
|
||||
mock_logger.exception.assert_called_once_with(
|
||||
'Error publishing to {} topic.'. format(topic))
|
||||
|
||||
|
||||
class TestKafkaConsumer(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.kafka_client_patcher = mock.patch('kafka.client')
|
||||
self.kafka_common_patcher = mock.patch('kafka.common')
|
||||
self.kafka_consumer_patcher = mock.patch('kafka.consumer')
|
||||
self.kazoo_patcher = mock.patch(
|
||||
'monasca_common.kafka.consumer.KazooClient')
|
||||
|
||||
self.mock_kafka_client = self.kafka_client_patcher.start()
|
||||
self.mock_kafka_common = self.kafka_common_patcher.start()
|
||||
self.mock_kafka_consumer = self.kafka_consumer_patcher.start()
|
||||
self.kazoo_patcher.start()
|
||||
|
||||
self.client = self.mock_kafka_client.KafkaClient.return_value
|
||||
self.consumer = self.mock_kafka_consumer.SimpleConsumer.return_value
|
||||
|
||||
self.monasca_kafka_consumer = KafkaConsumer(
|
||||
FAKE_KAFKA_URL, FAKE_ZOOKEEPER_URL, FAKE_ZOOKEEPER_PATH,
|
||||
FAKE_KAFKA_CONSUMER_GROUP, FAKE_KAFKA_TOPIC)
|
||||
|
||||
def tearDown(self):
|
||||
self.kafka_client_patcher.stop()
|
||||
self.kafka_common_patcher.stop()
|
||||
self.kafka_consumer_patcher.stop()
|
||||
self.kazoo_patcher.stop()
|
||||
|
||||
def test_kafka_consumer_init(self):
|
||||
self.assertTrue(self.mock_kafka_client.KafkaClient.called)
|
||||
self.assertTrue(self.mock_kafka_consumer.SimpleConsumer.called)
|
||||
|
||||
@mock.patch('monasca_common.kafka.consumer.SetPartitioner')
|
||||
def test_kafka_consumer_process_messages(self, mock_set_partitioner):
|
||||
messages = []
|
||||
for i in range(5):
|
||||
messages.append("message{}".format(i))
|
||||
self.consumer.get_message.side_effect = messages
|
||||
mock_set_partitioner.return_value.failed = False
|
||||
mock_set_partitioner.return_value.release = False
|
||||
mock_set_partitioner.return_value.acquired = True
|
||||
mock_set_partitioner.return_value.__iter__.return_value = [1]
|
||||
|
||||
for index, message in enumerate(self.monasca_kafka_consumer):
|
||||
self.assertEqual(message, messages[index])
|
||||
|
||||
@mock.patch('monasca_common.kafka.consumer.datetime')
|
||||
def test_commit(self, mock_datetime):
|
||||
self.monasca_kafka_consumer.commit()
|
||||
|
||||
self.assertTrue(mock_datetime.datetime.now.called)
|
||||
self.consumer.commit.assert_called_once_with(
|
||||
partitions=self.monasca_kafka_consumer._partitions)
|
||||
|
||||
@mock.patch('monasca_common.kafka.consumer.SetPartitioner')
|
||||
def test_iteration_failed_to_acquire_partition(self, mock_set_partitioner):
|
||||
mock_set_partitioner.return_value.failed = True
|
||||
|
||||
try:
|
||||
list(self.monasca_kafka_consumer)
|
||||
except Exception as e:
|
||||
self.assertEqual(e.message, "Failed to acquire partition")
|
||||
|
||||
@mock.patch('monasca_common.kafka.consumer.SetPartitioner')
|
||||
def test_kafka_consumer_reset_when_offset_out_of_range(
|
||||
self, mock_set_partitioner):
|
||||
class OffsetOutOfRangeError(Exception):
|
||||
pass
|
||||
|
||||
self.mock_kafka_common.OffsetOutOfRangeError = OffsetOutOfRangeError
|
||||
self.consumer.get_message.side_effect = [OffsetOutOfRangeError,
|
||||
"message"]
|
||||
mock_set_partitioner.return_value.failed = False
|
||||
mock_set_partitioner.return_value.release = False
|
||||
mock_set_partitioner.return_value.acquired = True
|
||||
mock_set_partitioner.return_value.__iter__.return_value = [1]
|
||||
|
||||
list(self.monasca_kafka_consumer)
|
||||
|
||||
self.consumer.seek.assert_called_once_with(0, 0)
|
2
tox.ini
2
tox.ini
@ -12,7 +12,7 @@ deps = -r{toxinidir}/requirements.txt
|
||||
whitelist_externals = find
|
||||
commands =
|
||||
find . -type f -name "*.pyc" -delete
|
||||
nosetests monasca_common/tests
|
||||
nosetests --with-coverage --cover-package=monasca_common/. --cover-erase
|
||||
|
||||
[testenv:pep8]
|
||||
commands = flake8 monasca_common
|
||||
|
Loading…
x
Reference in New Issue
Block a user