Merge "Add heartbeat() method to RpcIncomingMessage"
This commit is contained in:
commit
4d03b16334
@ -178,6 +178,9 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
|||||||
# the end.
|
# the end.
|
||||||
self._message_operations_handler.do(self.message.requeue)
|
self._message_operations_handler.do(self.message.requeue)
|
||||||
|
|
||||||
|
def heartbeat(self):
|
||||||
|
LOG.debug("Message heartbeat not implemented")
|
||||||
|
|
||||||
|
|
||||||
class ObsoleteReplyQueuesCache(object):
|
class ObsoleteReplyQueuesCache(object):
|
||||||
"""Cache of reply queue id that doesn't exists anymore.
|
"""Cache of reply queue id that doesn't exists anymore.
|
||||||
|
@ -154,6 +154,19 @@ class RpcIncomingMessage(IncomingMessage):
|
|||||||
:raises: Does not raise an exception
|
:raises: Does not raise an exception
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def heartbeat(self):
|
||||||
|
"""Called by the server to send an RPC heartbeat message back to
|
||||||
|
the calling client.
|
||||||
|
|
||||||
|
If the client (is new enough to have) passed its timeout value during
|
||||||
|
the RPC call, this method will be called periodically by the server
|
||||||
|
to update the client's timeout timer while a long-running call is
|
||||||
|
executing.
|
||||||
|
|
||||||
|
:raises: Does not raise an exception
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
class PollStyleListener(object):
|
class PollStyleListener(object):
|
||||||
|
@ -98,6 +98,9 @@ class ProtonIncomingMessage(base.RpcIncomingMessage):
|
|||||||
self._correlation_id = message.id
|
self._correlation_id = message.id
|
||||||
self._disposition = disposition
|
self._disposition = disposition
|
||||||
|
|
||||||
|
def heartbeat(self):
|
||||||
|
LOG.debug("Message heartbeat not implemented")
|
||||||
|
|
||||||
def reply(self, reply=None, failure=None):
|
def reply(self, reply=None, failure=None):
|
||||||
"""Schedule an RPCReplyTask to send the reply."""
|
"""Schedule an RPCReplyTask to send the reply."""
|
||||||
if self._reply_to:
|
if self._reply_to:
|
||||||
|
@ -38,6 +38,9 @@ class FakeIncomingMessage(base.RpcIncomingMessage):
|
|||||||
def requeue(self):
|
def requeue(self):
|
||||||
self.requeue_callback()
|
self.requeue_callback()
|
||||||
|
|
||||||
|
def heartbeat(self):
|
||||||
|
"""Heartbeat is not supported."""
|
||||||
|
|
||||||
|
|
||||||
class FakeListener(base.PollStyleListener):
|
class FakeListener(base.PollStyleListener):
|
||||||
|
|
||||||
|
@ -315,6 +315,9 @@ class OsloKafkaMessage(base.RpcIncomingMessage):
|
|||||||
def reply(self, reply=None, failure=None):
|
def reply(self, reply=None, failure=None):
|
||||||
LOG.warning(_LW("reply is not supported"))
|
LOG.warning(_LW("reply is not supported"))
|
||||||
|
|
||||||
|
def heartbeat(self):
|
||||||
|
LOG.warning(_LW("heartbeat is not supported"))
|
||||||
|
|
||||||
|
|
||||||
class KafkaListener(base.PollStyleListener):
|
class KafkaListener(base.PollStyleListener):
|
||||||
|
|
||||||
|
@ -36,3 +36,6 @@ class ZmqIncomingMessage(base.RpcIncomingMessage):
|
|||||||
|
|
||||||
def requeue(self):
|
def requeue(self):
|
||||||
"""Requeue is not supported."""
|
"""Requeue is not supported."""
|
||||||
|
|
||||||
|
def heartbeat(self):
|
||||||
|
"""Heartbeat is not supported."""
|
||||||
|
Loading…
x
Reference in New Issue
Block a user