diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index 01eeca69a..499008110 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -76,6 +76,7 @@ def unpack_context(msg): context_dict[key[9:]] = value context_dict['msg_id'] = msg.pop('_msg_id', None) context_dict['reply_q'] = msg.pop('_reply_q', None) + context_dict['client_timeout'] = msg.pop('_timeout', None) return RpcContext.from_dict(context_dict) diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index abda9b0ba..e4d0617d9 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -88,19 +88,21 @@ class MessageOperationsHandler(object): class AMQPIncomingMessage(base.RpcIncomingMessage): def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q, - obsolete_reply_queues, message_operations_handler): + client_timeout, obsolete_reply_queues, + message_operations_handler): super(AMQPIncomingMessage, self).__init__(ctxt, message) self.listener = listener self.unique_id = unique_id self.msg_id = msg_id self.reply_q = reply_q + self.client_timeout = client_timeout self._obsolete_reply_queues = obsolete_reply_queues self._message_operations_handler = message_operations_handler self.stopwatch = timeutils.StopWatch() self.stopwatch.start() - def _send_reply(self, conn, reply=None, failure=None): + def _send_reply(self, conn, reply=None, failure=None, ending=True): if not self._obsolete_reply_queues.reply_q_valid(self.reply_q, self.msg_id): return @@ -109,7 +111,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): failure = rpc_common.serialize_remote_exception(failure) # NOTE(sileht): ending can be removed in N*, see Listener.wait() # for more detail. - msg = {'result': reply, 'failure': failure, 'ending': True, + msg = {'result': reply, 'failure': failure, 'ending': ending, '_msg_id': self.msg_id} rpc_amqp._add_unique_id(msg) unique_id = msg[rpc_amqp.UNIQUE_ID] @@ -179,7 +181,9 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): self._message_operations_handler.do(self.message.requeue) def heartbeat(self): - LOG.debug("Message heartbeat not implemented") + with self.listener.driver._get_connection( + rpc_common.PURPOSE_SEND) as conn: + self._send_reply(conn, None, None, ending=False) class ObsoleteReplyQueuesCache(object): @@ -259,6 +263,7 @@ class AMQPListener(base.PollStyleListener): unique_id, ctxt.msg_id, ctxt.reply_q, + ctxt.client_timeout, self._obsolete_reply_queues, self._message_operations_handler)) @@ -426,7 +431,7 @@ class ReplyWaiter(object): ending = data.get('ending', False) return result, ending - def wait(self, msg_id, timeout): + def wait(self, msg_id, timeout, call_monitor_timeout): # NOTE(sileht): for each msg_id we receive two amqp message # first one with the payload, a second one to ensure the other # have finish to send the payload @@ -435,10 +440,21 @@ class ReplyWaiter(object): # support both cases for now. timer = rpc_common.DecayingTimer(duration=timeout) timer.start() + if call_monitor_timeout: + call_monitor_timer = rpc_common.DecayingTimer( + duration=call_monitor_timeout) + call_monitor_timer.start() + else: + call_monitor_timer = None final_reply = None ending = False while not ending: timeout = timer.check_return(self._raise_timeout_exception, msg_id) + if call_monitor_timer and timeout > 0: + cm_timeout = call_monitor_timer.check_return( + self._raise_timeout_exception, msg_id) + if cm_timeout < timeout: + timeout = cm_timeout try: message = self.waiters.get(msg_id, timeout=timeout) except moves.queue.Empty: @@ -450,6 +466,10 @@ class ReplyWaiter(object): # empty `result` field or a second _send_reply() with # ending=True and no `result` field. final_reply = reply + elif ending is False: + LOG.debug('Call monitor heartbeat received; ' + 'renewing timeout timer') + call_monitor_timer.restart() return final_reply @@ -495,7 +515,7 @@ class AMQPDriverBase(base.BaseDriver): return self._reply_q def _send(self, target, ctxt, message, - wait_for_reply=None, timeout=None, + wait_for_reply=None, timeout=None, call_monitor_timeout=None, envelope=True, notify=False, retry=None): msg = message @@ -504,6 +524,7 @@ class AMQPDriverBase(base.BaseDriver): msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) msg.update({'_reply_q': self._get_reply_q()}) + msg.update({'_timeout': call_monitor_timeout}) rpc_amqp._add_unique_id(msg) unique_id = msg[rpc_amqp.UNIQUE_ID] @@ -548,7 +569,8 @@ class AMQPDriverBase(base.BaseDriver): msg=msg, timeout=timeout, retry=retry) if wait_for_reply: - result = self._waiter.wait(msg_id, timeout) + result = self._waiter.wait(msg_id, timeout, + call_monitor_timeout) if isinstance(result, Exception): raise result return result @@ -557,9 +579,9 @@ class AMQPDriverBase(base.BaseDriver): self._waiter.unlisten(msg_id) def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, - retry=None): + call_monitor_timeout=None, retry=None): return self._send(target, ctxt, message, wait_for_reply, timeout, - retry=retry) + call_monitor_timeout, retry=retry) def send_notification(self, target, ctxt, message, version, retry=None): return self._send(target, ctxt, message, diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index f02344126..82dcda5b7 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -81,6 +81,7 @@ class IncomingMessage(object): def __init__(self, ctxt, message): self.ctxt = ctxt self.message = message + self.client_timeout = None def acknowledge(self): """Called by the server to acknowledge receipt of the message. When @@ -362,7 +363,8 @@ class BaseDriver(object): @abc.abstractmethod def send(self, target, ctxt, message, - wait_for_reply=None, timeout=None, retry=None): + wait_for_reply=None, timeout=None, call_monitor_timeout=None, + retry=None): """Send a message to the given target and optionally wait for a reply. This method is used by the RPC client when sending RPC requests to a server. @@ -426,6 +428,10 @@ class BaseDriver(object): operation to complete. Should this expire the :py:meth:`send` must raise a :py:exc:`MessagingTimeout` exception :type timeout: float + :param call_monitor_timeout: Maximum time the client will wait for the + call to complete or receive a message heartbeat indicating the + remote side is still executing. + :type call_monitor_timeout: float :param retry: maximum message send attempts permitted :type retry: int :returns: A reply message or None if no reply expected diff --git a/oslo_messaging/_drivers/impl_amqp1.py b/oslo_messaging/_drivers/impl_amqp1.py index 5714a6124..b56cee066 100644 --- a/oslo_messaging/_drivers/impl_amqp1.py +++ b/oslo_messaging/_drivers/impl_amqp1.py @@ -268,7 +268,8 @@ class ProtonDriver(base.BaseDriver): @_ensure_connect_called def send(self, target, ctxt, message, - wait_for_reply=False, timeout=None, retry=None): + wait_for_reply=False, timeout=None, call_monitor_timeout=None, + retry=None): """Send a message to the given target. :param target: destination for message @@ -282,6 +283,10 @@ class ProtonDriver(base.BaseDriver): :param timeout: raise exception if send does not complete within timeout seconds. None == no timeout. :type timeout: float + :param call_monitor_timeout: Maximum time the client will wait for the + call to complete or receive a message heartbeat indicating the + remote side is still executing. + :type call_monitor_timeout: float :param retry: (optional) maximum re-send attempts on recoverable error None or -1 means to retry forever 0 means no retry diff --git a/oslo_messaging/_drivers/impl_fake.py b/oslo_messaging/_drivers/impl_fake.py index fd66133e9..fccfebe08 100644 --- a/oslo_messaging/_drivers/impl_fake.py +++ b/oslo_messaging/_drivers/impl_fake.py @@ -216,7 +216,7 @@ class FakeDriver(base.BaseDriver): return None def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, - retry=None): + call_monitor_timeout=None, retry=None): # NOTE(sileht): retry doesn't need to be implemented, the fake # transport always works return self._send(target, ctxt, message, wait_for_reply, timeout) diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 184c65b75..b6a0e6272 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -375,7 +375,7 @@ class KafkaDriver(base.BaseDriver): self.listeners = [] def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, - retry=None): + call_monitor_timeout=None, retry=None): raise NotImplementedError( 'The RPC implementation for Kafka is not implemented') @@ -390,6 +390,10 @@ class KafkaDriver(base.BaseDriver): :type message: dict :param version: Messaging API version (currently not used) :type version: str + :param call_monitor_timeout: Maximum time the client will wait for the + call to complete before or receive a message heartbeat indicating + the remote side is still executing. + :type call_monitor_timeout: float :param retry: an optional default kafka consumer retries configuration None means to retry forever 0 means no retry diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py index 4bf7fc443..0f758cc24 100644 --- a/oslo_messaging/rpc/client.py +++ b/oslo_messaging/rpc/client.py @@ -94,13 +94,15 @@ class _BaseCallContext(object): _marker = object() def __init__(self, transport, target, serializer, - timeout=None, version_cap=None, retry=None): + timeout=None, version_cap=None, retry=None, + call_monitor_timeout=None): self.conf = transport.conf self.transport = transport self.target = target self.serializer = serializer self.timeout = timeout + self.call_monitor_timeout = call_monitor_timeout self.retry = retry self.version_cap = version_cap @@ -166,11 +168,14 @@ class _BaseCallContext(object): if self.timeout is None: timeout = self.conf.rpc_response_timeout + cm_timeout = self.call_monitor_timeout + self._check_version_cap(msg.get('version')) try: result = self.transport._send(self.target, msg_ctxt, msg, wait_for_reply=True, timeout=timeout, + call_monitor_timeout=cm_timeout, retry=self.retry) except driver_base.TransportDriverError as ex: raise ClientSendError(self.target, ex) @@ -180,7 +185,8 @@ class _BaseCallContext(object): @abc.abstractmethod def prepare(self, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, - timeout=_marker, version_cap=_marker, retry=_marker): + timeout=_marker, version_cap=_marker, retry=_marker, + call_monitor_timeout=_marker): """Prepare a method invocation context. See RPCClient.prepare().""" @@ -192,7 +198,8 @@ class _CallContext(_BaseCallContext): def _prepare(cls, call_context, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, - timeout=_marker, version_cap=_marker, retry=_marker): + timeout=_marker, version_cap=_marker, retry=_marker, + call_monitor_timeout=_marker): cls._check_version(version) kwargs = dict( exchange=exchange, @@ -211,18 +218,23 @@ class _CallContext(_BaseCallContext): version_cap = call_context.version_cap if retry is cls._marker: retry = call_context.retry + if call_monitor_timeout is cls._marker: + call_monitor_timeout = call_context.call_monitor_timeout return _CallContext(call_context.transport, target, call_context.serializer, - timeout, version_cap, retry) + timeout, version_cap, retry, + call_monitor_timeout) def prepare(self, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, - timeout=_marker, version_cap=_marker, retry=_marker): + timeout=_marker, version_cap=_marker, retry=_marker, + call_monitor_timeout=_marker): return _CallContext._prepare(self, exchange, topic, namespace, version, server, fanout, - timeout, version_cap, retry) + timeout, version_cap, retry, + call_monitor_timeout) class RPCClient(_BaseCallContext): @@ -314,7 +326,8 @@ class RPCClient(_BaseCallContext): _marker = _BaseCallContext._marker def __init__(self, transport, target, - timeout=None, version_cap=None, serializer=None, retry=None): + timeout=None, version_cap=None, serializer=None, retry=None, + call_monitor_timeout=None): """Construct an RPC client. :param transport: a messaging transport handle @@ -332,6 +345,13 @@ class RPCClient(_BaseCallContext): 0 means no retry is attempted. N means attempt at most N retries. :type retry: int + :param call_monitor_timeout: an optional timeout (in seconds) for + active call heartbeating. If specified, + requires the server to heartbeat + long-running calls at this interval + (less than the overall timeout + parameter). + :type call_monitor_timeout: int """ if serializer is None: serializer = msg_serializer.NoOpSerializer() @@ -342,14 +362,16 @@ class RPCClient(_BaseCallContext): "instance.") super(RPCClient, self).__init__( - transport, target, serializer, timeout, version_cap, retry + transport, target, serializer, timeout, version_cap, retry, + call_monitor_timeout ) self.conf.register_opts(_client_opts) def prepare(self, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, - timeout=_marker, version_cap=_marker, retry=_marker): + timeout=_marker, version_cap=_marker, retry=_marker, + call_monitor_timeout=_marker): """Prepare a method invocation context. Use this method to override client properties for an individual method @@ -380,11 +402,19 @@ class RPCClient(_BaseCallContext): 0 means no retry is attempted. N means attempt at most N retries. :type retry: int + :param call_monitor_timeout: an optional timeout (in seconds) for + active call heartbeating. If specified, + requires the server to heartbeat + long-running calls at this interval + (less than the overall timeout + parameter). + :type call_monitor_timeout: int """ return _CallContext._prepare(self, exchange, topic, namespace, version, server, fanout, - timeout, version_cap, retry) + timeout, version_cap, retry, + call_monitor_timeout) def cast(self, ctxt, method, **kwargs): """Invoke a method without blocking for a return value. diff --git a/oslo_messaging/rpc/dispatcher.py b/oslo_messaging/rpc/dispatcher.py index fda20fe0e..8eb1273fb 100644 --- a/oslo_messaging/rpc/dispatcher.py +++ b/oslo_messaging/rpc/dispatcher.py @@ -30,7 +30,9 @@ __all__ = [ from abc import ABCMeta from abc import abstractmethod +import logging import sys +import threading import six @@ -40,6 +42,8 @@ from oslo_messaging import serializer as msg_serializer from oslo_messaging import server as msg_server from oslo_messaging import target as msg_target +LOG = logging.getLogger(__name__) + class ExpectedException(Exception): """Encapsulates an expected exception raised by an RPC endpoint @@ -190,6 +194,32 @@ class RPCDispatcher(dispatcher.DispatcherBase): result = func(ctxt, **new_args) return self.serializer.serialize_entity(ctxt, result) + def _watchdog(self, event, incoming): + # NOTE(danms): If the client declared that they are going to + # time out after N seconds, send the call-monitor heartbeat + # every N/2 seconds to make sure there is plenty of time to + # account for inbound and outbound queuing delays. Client + # timeouts must be integral and positive, otherwise we log and + # ignore. + try: + client_timeout = int(incoming.client_timeout) + cm_heartbeat_interval = client_timeout / 2 + except ValueError: + client_timeout = cm_heartbeat_interval = 0 + + if cm_heartbeat_interval < 1: + LOG.warning('Client provided an invalid timeout value of %r' % ( + incoming.client_timeout)) + return + + while not event.wait(cm_heartbeat_interval): + LOG.debug( + 'Sending call-monitor heartbeat for active call to %(method)s ' + '(interval=%(interval)i)' % ( + {'method': incoming.message.get('method'), + 'interval': cm_heartbeat_interval})) + incoming.heartbeat() + def dispatch(self, incoming): """Dispatch an RPC message to the appropriate endpoint method. @@ -205,6 +235,20 @@ class RPCDispatcher(dispatcher.DispatcherBase): namespace = message.get('namespace') version = message.get('version', '1.0') + # NOTE(danms): This event and watchdog thread are used to send + # call-monitoring heartbeats for this message while the call + # is executing if it runs for some time. The thread will wait + # for the event to be signaled, which we do explicitly below + # after dispatching the method call. + completion_event = threading.Event() + watchdog_thread = threading.Thread(target=self._watchdog, + args=(completion_event, incoming)) + if incoming.client_timeout: + # NOTE(danms): The client provided a timeout, so we start + # the watchdog thread. If the client is old or didn't send + # a timeout, we just never start the watchdog thread. + watchdog_thread.start() + found_compatible = False for endpoint in self.endpoints: target = getattr(endpoint, 'target', None) @@ -217,7 +261,12 @@ class RPCDispatcher(dispatcher.DispatcherBase): if hasattr(endpoint, method): if self.access_policy.is_allowed(endpoint, method): - return self._do_dispatch(endpoint, method, ctxt, args) + try: + return self._do_dispatch(endpoint, method, ctxt, args) + finally: + completion_event.set() + if incoming.client_timeout: + watchdog_thread.join() found_compatible = True diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 1ba3d6324..8dd3109e1 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -469,8 +469,11 @@ class TestSendReceive(test_utils.BaseTestCase): ] _timeout = [ - ('no_timeout', dict(timeout=None)), - ('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken? + ('no_timeout', dict(timeout=None, call_monitor_timeout=None)), + ('timeout', dict(timeout=0.01, # FIXME(markmc): timeout=0 is broken? + call_monitor_timeout=None)), + ('call_monitor_timeout', dict(timeout=0.01, + call_monitor_timeout=0.02)), ] @classmethod @@ -500,15 +503,20 @@ class TestSendReceive(test_utils.BaseTestCase): replies = [] msgs = [] + # FIXME(danms): Surely this is not the right way to do this... + self.ctxt['client_timeout'] = self.call_monitor_timeout + def send_and_wait_for_reply(i): try: timeout = self.timeout + cm_timeout = self.call_monitor_timeout replies.append(driver.send(target, self.ctxt, {'tx_id': i}, wait_for_reply=True, - timeout=timeout)) + timeout=timeout, + call_monitor_timeout=cm_timeout)) self.assertFalse(self.failure) self.assertIsNone(self.timeout) except (ZeroDivisionError, oslo_messaging.MessagingTimeout) as e: @@ -594,14 +602,15 @@ class TestRacyWaitForReply(test_utils.BaseTestCase): wait_conditions = [] orig_reply_waiter = amqpdriver.ReplyWaiter.wait - def reply_waiter(self, msg_id, timeout): + def reply_waiter(self, msg_id, timeout, call_monitor_timeout): if wait_conditions: cond = wait_conditions.pop() with cond: cond.notify() with cond: cond.wait() - return orig_reply_waiter(self, msg_id, timeout) + return orig_reply_waiter(self, msg_id, timeout, + call_monitor_timeout) self.useFixture(fixtures.MockPatchObject( amqpdriver.ReplyWaiter, 'wait', reply_waiter)) @@ -891,11 +900,12 @@ class TestReplyWireFormat(test_utils.BaseTestCase): ] _context = [ - ('empty_ctxt', dict(ctxt={}, expected_ctxt={})), + ('empty_ctxt', dict(ctxt={}, expected_ctxt={'client_timeout': None})), ('user_project_ctxt', dict(ctxt={'_context_user': 'mark', '_context_project': 'snarkybunch'}, - expected_ctxt={'user': 'mark', 'project': 'snarkybunch'})), + expected_ctxt={'user': 'mark', 'project': 'snarkybunch', + 'client_timeout': None})), ] _compression = [ @@ -940,6 +950,7 @@ class TestReplyWireFormat(test_utils.BaseTestCase): '_msg_id': uuid.uuid4().hex, '_unique_id': uuid.uuid4().hex, '_reply_q': 'reply_' + uuid.uuid4().hex, + '_timeout': None, }) msg['oslo.message'] = jsonutils.dumps(msg['oslo.message']) diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index 42372a61f..e2eb6d73a 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -152,6 +152,42 @@ class CallTestCase(utils.SkipIfNoTransportURL): self.assertEqual(10, server.endpoint.ival) + def test_monitor_long_call(self): + if not self.url.startswith("rabbit://"): + self.skipTest("backend does not support call monitoring") + + transport = self.useFixture(utils.RPCTransportFixture(self.conf, + self.url)) + target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()), + server='server_' + str(uuid.uuid4())) + + class _endpoint(object): + def delay(self, ctxt, seconds): + time.sleep(seconds) + return seconds + + self.useFixture( + utils.RpcServerFixture(self.conf, self.url, target, + executor='threading', + endpoint=_endpoint())) + + # First case, no monitoring, ensure we timeout normally when the + # server side runs long + client1 = utils.ClientStub(transport.transport, target, + cast=False, timeout=1) + self.assertRaises(oslo_messaging.MessagingTimeout, + client1.delay, seconds=4) + + # Second case, set a short call monitor timeout and a very + # long overall timeout. If we didn't honor the call monitor + # timeout, we would wait an hour, past the test timeout. If + # the server was not sending message heartbeats, we'd time out + # after two seconds. + client2 = utils.ClientStub(transport.transport, target, + cast=False, timeout=3600, + call_monitor_timeout=2) + self.assertEqual(4, client2.delay(seconds=4)) + def test_endpoint_version_namespace(self): # verify endpoint version and namespace are checked target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()), diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py index f57de543b..9fa40db05 100755 --- a/oslo_messaging/tests/rpc/test_client.py +++ b/oslo_messaging/tests/rpc/test_client.py @@ -53,6 +53,7 @@ class TestCastCall(test_utils.BaseTestCase): if self.call: kwargs['wait_for_reply'] = True kwargs['timeout'] = None + kwargs['call_monitor_timeout'] = None method = client.call if self.call else client.cast method(self.ctxt, 'foo', **self.args) @@ -215,19 +216,21 @@ class TestCallTimeout(test_utils.BaseTestCase): scenarios = [ ('all_none', - dict(confval=None, ctor=None, prepare=_notset, expect=None)), + dict(confval=None, ctor=None, prepare=_notset, expect=None, cm=None)), ('confval', - dict(confval=21, ctor=None, prepare=_notset, expect=21)), + dict(confval=21, ctor=None, prepare=_notset, expect=21, cm=None)), ('ctor', - dict(confval=None, ctor=21.1, prepare=_notset, expect=21.1)), + dict(confval=None, ctor=21.1, prepare=_notset, expect=21.1, cm=None)), ('ctor_zero', - dict(confval=None, ctor=0, prepare=_notset, expect=0)), + dict(confval=None, ctor=0, prepare=_notset, expect=0, cm=None)), ('prepare', - dict(confval=None, ctor=None, prepare=21.1, expect=21.1)), + dict(confval=None, ctor=None, prepare=21.1, expect=21.1, cm=None)), ('prepare_override', - dict(confval=None, ctor=10.1, prepare=21.1, expect=21.1)), + dict(confval=None, ctor=10.1, prepare=21.1, expect=21.1, cm=None)), ('prepare_zero', - dict(confval=None, ctor=None, prepare=0, expect=0)), + dict(confval=None, ctor=None, prepare=0, expect=0, cm=None)), + ('call_monitor', + dict(confval=None, ctor=None, prepare=60, expect=60, cm=30)), ] def test_call_timeout(self): @@ -235,12 +238,14 @@ class TestCallTimeout(test_utils.BaseTestCase): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), - timeout=self.ctor) + timeout=self.ctor, + call_monitor_timeout=self.cm) transport._send = mock.Mock() msg = dict(method='foo', args={}) - kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None) + kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None, + call_monitor_timeout=self.cm) if self.prepare is not _notset: client = client.prepare(timeout=self.prepare) @@ -272,7 +277,7 @@ class TestCallRetry(test_utils.BaseTestCase): msg = dict(method='foo', args={}) kwargs = dict(wait_for_reply=True, timeout=60, - retry=self.expect) + retry=self.expect, call_monitor_timeout=None) if self.prepare is not _notset: client = client.prepare(retry=self.prepare) @@ -332,6 +337,8 @@ class TestSerializer(test_utils.BaseTestCase): kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {} kwargs['retry'] = None + if self.call: + kwargs['call_monitor_timeout'] = None transport._send.return_value = self.retval @@ -441,6 +448,7 @@ class TestVersionCap(test_utils.BaseTestCase): if self.call: kwargs['wait_for_reply'] = True kwargs['timeout'] = None + kwargs['call_monitor_timeout'] = None prep_kwargs = {} if self.prepare_cap is not _notset: diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py index e0d327f94..c37f352b4 100755 --- a/oslo_messaging/tests/test_transport.py +++ b/oslo_messaging/tests/test_transport.py @@ -288,6 +288,7 @@ class TestTransportMethodArgs(test_utils.BaseTestCase): 'message', wait_for_reply=None, timeout=None, + call_monitor_timeout=None, retry=None) def test_send_all_args(self): @@ -297,7 +298,8 @@ class TestTransportMethodArgs(test_utils.BaseTestCase): t._send(self._target, 'ctxt', 'message', wait_for_reply='wait_for_reply', - timeout='timeout', retry='retry') + timeout='timeout', call_monitor_timeout='cm_timeout', + retry='retry') t._driver.send.\ assert_called_once_with(self._target, @@ -305,6 +307,7 @@ class TestTransportMethodArgs(test_utils.BaseTestCase): 'message', wait_for_reply='wait_for_reply', timeout='timeout', + call_monitor_timeout='cm_timeout', retry='retry') def test_send_notification(self): diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py index c9eddbc80..f416ee25f 100644 --- a/oslo_messaging/transport.py +++ b/oslo_messaging/transport.py @@ -122,13 +122,15 @@ class Transport(object): self._driver.require_features(requeue=requeue) def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None, - retry=None): + call_monitor_timeout=None, retry=None): if not target.topic: raise exceptions.InvalidTarget('A topic is required to send', target) return self._driver.send(target, ctxt, message, wait_for_reply=wait_for_reply, - timeout=timeout, retry=retry) + timeout=timeout, + call_monitor_timeout=call_monitor_timeout, + retry=retry) def _send_notification(self, target, ctxt, message, version, retry=None): if not target.topic: