Implement mandatory flag for RabbitMQ driver
With this feature it is possible to use the mandatory RabbitMQ mandatory flag. Implements: blueprint transport-options (point 3) The blueprint link is [1] Please follow the link [2] to use and test the feature. 1- https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options 2- https://github.com/Gsantomaggio/rabbitmq-utils/ tree/master/openstack/mandatory_test Change-Id: Ie269fc08ba80c4b94a24a8207c1e86c19c3b3fcb
This commit is contained in:
parent
6cdd4cb007
commit
c50076b4ef
@ -28,7 +28,7 @@ imagesize==0.7.1
|
|||||||
iso8601==0.1.11
|
iso8601==0.1.11
|
||||||
Jinja2==2.10
|
Jinja2==2.10
|
||||||
keystoneauth1==3.4.0
|
keystoneauth1==3.4.0
|
||||||
kombu==4.0.0
|
kombu==4.6.1
|
||||||
linecache2==1.0.0
|
linecache2==1.0.0
|
||||||
MarkupSafe==1.0
|
MarkupSafe==1.0
|
||||||
mccabe==0.2.1
|
mccabe==0.2.1
|
||||||
|
@ -756,6 +756,10 @@ class Connection(object):
|
|||||||
# NOTE(sileht): we must reraise this without
|
# NOTE(sileht): we must reraise this without
|
||||||
# trigger error_callback
|
# trigger error_callback
|
||||||
raise
|
raise
|
||||||
|
except exceptions.MessageUndeliverable:
|
||||||
|
# NOTE(gsantomaggio): we must reraise this without
|
||||||
|
# trigger error_callback
|
||||||
|
raise
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
error_callback and error_callback(exc)
|
error_callback and error_callback(exc)
|
||||||
self._set_current_channel(None)
|
self._set_current_channel(None)
|
||||||
@ -769,6 +773,11 @@ class Connection(object):
|
|||||||
LOG.error(msg)
|
LOG.error(msg)
|
||||||
raise exceptions.MessageDeliveryFailure(msg)
|
raise exceptions.MessageDeliveryFailure(msg)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def on_return(exception, exchange, routing_key, message):
|
||||||
|
raise exceptions.MessageUndeliverable(exception, exchange, routing_key,
|
||||||
|
message)
|
||||||
|
|
||||||
def _set_current_channel(self, new_channel):
|
def _set_current_channel(self, new_channel):
|
||||||
"""Change the channel to use.
|
"""Change the channel to use.
|
||||||
|
|
||||||
@ -787,7 +796,8 @@ class Connection(object):
|
|||||||
if new_channel is not None:
|
if new_channel is not None:
|
||||||
if self.purpose == rpc_common.PURPOSE_LISTEN:
|
if self.purpose == rpc_common.PURPOSE_LISTEN:
|
||||||
self._set_qos(new_channel)
|
self._set_qos(new_channel)
|
||||||
self._producer = kombu.messaging.Producer(new_channel)
|
self._producer = kombu.messaging.Producer(new_channel,
|
||||||
|
on_return=self.on_return)
|
||||||
for consumer in self._consumers:
|
for consumer in self._consumers:
|
||||||
consumer.declare(self)
|
consumer.declare(self)
|
||||||
|
|
||||||
|
@ -16,7 +16,7 @@
|
|||||||
import six
|
import six
|
||||||
|
|
||||||
__all__ = ['MessagingException', 'MessagingTimeout', 'MessageDeliveryFailure',
|
__all__ = ['MessagingException', 'MessagingTimeout', 'MessageDeliveryFailure',
|
||||||
'InvalidTarget']
|
'InvalidTarget', 'MessageUndeliverable']
|
||||||
|
|
||||||
|
|
||||||
class MessagingException(Exception):
|
class MessagingException(Exception):
|
||||||
@ -38,3 +38,14 @@ class InvalidTarget(MessagingException, ValueError):
|
|||||||
msg = msg + ":" + six.text_type(target)
|
msg = msg + ":" + six.text_type(target)
|
||||||
super(InvalidTarget, self).__init__(msg)
|
super(InvalidTarget, self).__init__(msg)
|
||||||
self.target = target
|
self.target = target
|
||||||
|
|
||||||
|
|
||||||
|
class MessageUndeliverable(Exception):
|
||||||
|
"""Raised if message is not routed with mandatory flag"""
|
||||||
|
|
||||||
|
def __init__(self, exception, exchange, routing_key, message):
|
||||||
|
super(MessageUndeliverable, self).__init__()
|
||||||
|
self.exception = exception
|
||||||
|
self.exchange = exchange
|
||||||
|
self.routing_key = routing_key
|
||||||
|
self.message = message
|
||||||
|
@ -152,6 +152,40 @@ class CallTestCase(utils.SkipIfNoTransportURL):
|
|||||||
|
|
||||||
self.assertEqual(10, server.endpoint.ival)
|
self.assertEqual(10, server.endpoint.ival)
|
||||||
|
|
||||||
|
def test_mandatory_call(self):
|
||||||
|
if not self.url.startswith("rabbit://"):
|
||||||
|
self.skipTest("backend does not support call monitoring")
|
||||||
|
|
||||||
|
transport = self.useFixture(utils.RPCTransportFixture(self.conf,
|
||||||
|
self.url))
|
||||||
|
target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()),
|
||||||
|
server='server_' + str(uuid.uuid4()))
|
||||||
|
|
||||||
|
# test for mandatory flag using transport-options, see:
|
||||||
|
# https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options
|
||||||
|
# first test with `at_least_once=False` raises a "MessagingTimeout"
|
||||||
|
# error since there is no control if the queue actually exists.
|
||||||
|
# (Default behavior)
|
||||||
|
options = oslo_messaging.TransportOptions(at_least_once=False)
|
||||||
|
client1 = utils.ClientStub(transport.transport, target,
|
||||||
|
cast=False, timeout=1,
|
||||||
|
transport_options=options)
|
||||||
|
|
||||||
|
self.assertRaises(oslo_messaging.MessagingTimeout,
|
||||||
|
client1.delay)
|
||||||
|
|
||||||
|
# second test with `at_least_once=True` raises a "MessageUndeliverable"
|
||||||
|
# caused by mandatory flag.
|
||||||
|
# the MessageUndeliverable error is raised immediately without waiting
|
||||||
|
# any timeout
|
||||||
|
options2 = oslo_messaging.TransportOptions(at_least_once=True)
|
||||||
|
client2 = utils.ClientStub(transport.transport, target,
|
||||||
|
cast=False, timeout=60,
|
||||||
|
transport_options=options2)
|
||||||
|
|
||||||
|
self.assertRaises(oslo_messaging.MessageUndeliverable,
|
||||||
|
client2.delay)
|
||||||
|
|
||||||
def test_monitor_long_call(self):
|
def test_monitor_long_call(self):
|
||||||
if not (self.url.startswith("rabbit://") or
|
if not (self.url.startswith("rabbit://") or
|
||||||
self.url.startswith("amqp://")):
|
self.url.startswith("amqp://")):
|
||||||
|
@ -226,10 +226,15 @@ class RpcCast(RpcCall):
|
|||||||
|
|
||||||
|
|
||||||
class ClientStub(object):
|
class ClientStub(object):
|
||||||
def __init__(self, transport, target, cast=False, name=None, **kwargs):
|
def __init__(self, transport, target, cast=False, name=None,
|
||||||
|
transport_options=None, **kwargs):
|
||||||
self.name = name or "functional-tests"
|
self.name = name or "functional-tests"
|
||||||
self.cast = cast
|
self.cast = cast
|
||||||
self.client = oslo_messaging.RPCClient(transport, target, **kwargs)
|
self.client = oslo_messaging.RPCClient(
|
||||||
|
transport=transport,
|
||||||
|
target=target,
|
||||||
|
transport_options=transport_options,
|
||||||
|
**kwargs)
|
||||||
|
|
||||||
def __getattr__(self, name):
|
def __getattr__(self, name):
|
||||||
context = {"application": self.name}
|
context = {"application": self.name}
|
||||||
|
@ -26,7 +26,7 @@ PyYAML>=3.12 # MIT
|
|||||||
# rabbit driver is the default
|
# rabbit driver is the default
|
||||||
# we set the amqp version to ensure heartbeat works
|
# we set the amqp version to ensure heartbeat works
|
||||||
amqp>=2.4.1 # BSD
|
amqp>=2.4.1 # BSD
|
||||||
kombu!=4.0.2,>=4.0.0 # BSD
|
kombu!=4.0.2,>=4.6.1 # BSD
|
||||||
|
|
||||||
# middleware
|
# middleware
|
||||||
oslo.middleware>=3.31.0 # Apache-2.0
|
oslo.middleware>=3.31.0 # Apache-2.0
|
||||||
|
Loading…
x
Reference in New Issue
Block a user