Py3:Fix messages encoding for kafka producer
Story: 2000975 Task: 23056 Change-Id: Id0273c6c6f54342286d02e326a392e1479a15fe8
This commit is contained in:
parent
0a11428eaf
commit
1cce6824ed
monasca_common
@ -16,7 +16,7 @@
|
||||
import logging
|
||||
import time
|
||||
|
||||
from six import PY2
|
||||
from six import PY3
|
||||
|
||||
import monasca_common.kafka_lib.client as kafka_client
|
||||
import monasca_common.kafka_lib.producer as kafka_producer
|
||||
@ -51,11 +51,15 @@ class KafkaProducer(object):
|
||||
|
||||
first = True
|
||||
success = False
|
||||
if key is None:
|
||||
key = int(time.time() * 1000)
|
||||
if PY3:
|
||||
key = bytes(str(key), 'utf-8')
|
||||
messages = [m.encode("utf-8") for m in messages]
|
||||
else:
|
||||
key = str(key)
|
||||
while not success:
|
||||
try:
|
||||
if key is None:
|
||||
key = int(time.time() * 1000)
|
||||
key = str(key) if PY2 else bytes(str(key), 'utf-8')
|
||||
self._producer.send_messages(topic, key, *messages)
|
||||
success = True
|
||||
except Exception:
|
||||
|
@ -53,11 +53,12 @@ class TestKafkaProducer(base.BaseTestCase):
|
||||
messages = ['message']
|
||||
key = "key"
|
||||
expected_key = b"key"
|
||||
expected_messages = [b'message']
|
||||
|
||||
self.monasca_kafka_producer.publish(topic, messages, key)
|
||||
|
||||
self.producer.send_messages.assert_called_once_with(
|
||||
topic, expected_key, *messages)
|
||||
topic, expected_key, *expected_messages)
|
||||
|
||||
@mock.patch('monasca_common.kafka.producer.time')
|
||||
def test_kafka_producer_publish_one_message_without_key(self, mock_time):
|
||||
@ -65,12 +66,13 @@ class TestKafkaProducer(base.BaseTestCase):
|
||||
message = 'not_a_list'
|
||||
mock_time.time.return_value = 1
|
||||
expected_key = b'1000'
|
||||
expected_message = b'not_a_list'
|
||||
|
||||
self.monasca_kafka_producer.publish(topic, message)
|
||||
|
||||
self.assertTrue(mock_time.time.called)
|
||||
self.producer.send_messages.assert_called_once_with(
|
||||
topic, expected_key, message)
|
||||
topic, expected_key, expected_message)
|
||||
|
||||
@mock.patch('monasca_common.kafka.producer.log')
|
||||
def test_kafka_producer_publish_exception(self, mock_logger):
|
||||
|
Loading…
x
Reference in New Issue
Block a user