kafka: Don't hide unpack/unserialize exception
These exception are software bugs, don't convert them into log for deployer. Change-Id: I10e9112b53e5c754f38247679896d1d24dd454bf
This commit is contained in:
parent
c8880b6f11
commit
a7044799ad
@ -38,6 +38,11 @@ from oslo_messaging._i18n import _LE
|
|||||||
from oslo_messaging._i18n import _LW
|
from oslo_messaging._i18n import _LW
|
||||||
from oslo_serialization import jsonutils
|
from oslo_serialization import jsonutils
|
||||||
|
|
||||||
|
import logging as l
|
||||||
|
l.basicConfig(level=l.INFO)
|
||||||
|
l.getLogger("kafka").setLevel(l.WARN)
|
||||||
|
l.getLogger("stevedore").setLevel(l.WARN)
|
||||||
|
|
||||||
if eventletutils.is_monkey_patched('select'):
|
if eventletutils.is_monkey_patched('select'):
|
||||||
# monkeypatch the vendored SelectSelector._select like eventlet does
|
# monkeypatch the vendored SelectSelector._select like eventlet does
|
||||||
# https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32
|
# https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32
|
||||||
@ -55,18 +60,11 @@ LOG = logging.getLogger(__name__)
|
|||||||
def unpack_message(msg):
|
def unpack_message(msg):
|
||||||
context = {}
|
context = {}
|
||||||
message = None
|
message = None
|
||||||
try:
|
msg = json.loads(msg)
|
||||||
if msg:
|
message = driver_common.deserialize_msg(msg)
|
||||||
msg = json.loads(msg)
|
context = message['_context']
|
||||||
message = driver_common.deserialize_msg(msg)
|
del message['_context']
|
||||||
if 'context' in message:
|
return context, message
|
||||||
context = message['context']
|
|
||||||
del message['context']
|
|
||||||
except ValueError as e:
|
|
||||||
LOG.info("Invalid format of consumed message: %s" % e)
|
|
||||||
except Exception:
|
|
||||||
LOG.warning(_LW("Exception during message unpacking"))
|
|
||||||
return message, context
|
|
||||||
|
|
||||||
|
|
||||||
def pack_message(ctxt, msg):
|
def pack_message(ctxt, msg):
|
||||||
@ -76,7 +74,7 @@ def pack_message(ctxt, msg):
|
|||||||
context_d = ctxt
|
context_d = ctxt
|
||||||
else:
|
else:
|
||||||
context_d = ctxt.to_dict()
|
context_d = ctxt.to_dict()
|
||||||
msg['context'] = context_d
|
msg['_context'] = context_d
|
||||||
|
|
||||||
msg = driver_common.serialize_msg(msg)
|
msg = driver_common.serialize_msg(msg)
|
||||||
|
|
||||||
@ -181,7 +179,15 @@ class Connection(object):
|
|||||||
|
|
||||||
@with_reconnect()
|
@with_reconnect()
|
||||||
def _poll_messages(self, timeout):
|
def _poll_messages(self, timeout):
|
||||||
return self.consumer.poll(timeout * 1000.0)
|
messages = self.consumer.poll(timeout * 1000.0)
|
||||||
|
messages = [record.value
|
||||||
|
for records in messages.values() if records
|
||||||
|
for record in records]
|
||||||
|
if not messages:
|
||||||
|
# NOTE(sileht): really ? you return payload but no messages...
|
||||||
|
# simulate timeout to consume message again
|
||||||
|
raise kafka.errors.ConsumerTimeout()
|
||||||
|
return messages
|
||||||
|
|
||||||
def consume(self, timeout=None):
|
def consume(self, timeout=None):
|
||||||
"""Receive up to 'max_fetch_messages' messages.
|
"""Receive up to 'max_fetch_messages' messages.
|
||||||
@ -275,26 +281,17 @@ class KafkaListener(base.PollStyleListener):
|
|||||||
|
|
||||||
@base.batch_poll_helper
|
@base.batch_poll_helper
|
||||||
def poll(self, timeout=None):
|
def poll(self, timeout=None):
|
||||||
# TODO(sileht): use batch capability of kafka
|
|
||||||
while not self._stopped.is_set():
|
while not self._stopped.is_set():
|
||||||
if self.incoming_queue:
|
if self.incoming_queue:
|
||||||
return self.incoming_queue.pop(0)
|
return self.incoming_queue.pop(0)
|
||||||
try:
|
try:
|
||||||
messages = self.conn.consume(timeout=timeout)
|
messages = self.conn.consume(timeout=timeout) or []
|
||||||
if messages:
|
for message in messages:
|
||||||
self._put_messages_to_queue(messages)
|
msg = OsloKafkaMessage(*unpack_message(message))
|
||||||
|
self.incoming_queue.append(msg)
|
||||||
except driver_common.Timeout:
|
except driver_common.Timeout:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _put_messages_to_queue(self, messages):
|
|
||||||
for topic, records in messages.items():
|
|
||||||
if records:
|
|
||||||
for record in records:
|
|
||||||
message, context = unpack_message(record.value)
|
|
||||||
if message:
|
|
||||||
self.incoming_queue.append(
|
|
||||||
OsloKafkaMessage(ctxt=context, message=message))
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._stopped.set()
|
self._stopped.set()
|
||||||
self.conn.stop_consuming()
|
self.conn.stop_consuming()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user