Support a new qpid topology
There has been a bug open for a while pointing out that the way we create direct exchanges with qpid results in leaking exchanges since qpid doesn't support auto-deleting exchanges. This was somewhat mitigated by change to use a single reply queue. This meant we created far fewer direct exchanges, but the problem persists anyway. A Qpid expert, William Henry, originally proposed a change to address this issue. Unfortunately, it wasn't backwards compatible with existing installations. This patch takes the same approach, but makes it optional and off by default. This will allow a migration period. As a really nice side effect, the Qpid experts have told us that this change will also allow us to use Qpid broker federation to provide HA. DocImpact Closes-bug: #1178375 Co-authored-by: William Henry <whenry@redhat.com> Change-Id: I09b8317c0d8a298237beeb3105f2b90cb13933d8
This commit is contained in:
parent
2414524c6e
commit
7e1fddb217
@ -70,15 +70,33 @@ qpid_opts = [
|
|||||||
cfg.BoolOpt('qpid_tcp_nodelay',
|
cfg.BoolOpt('qpid_tcp_nodelay',
|
||||||
default=True,
|
default=True,
|
||||||
help='Disable Nagle algorithm'),
|
help='Disable Nagle algorithm'),
|
||||||
|
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
|
||||||
|
# this file could probably use some additional refactoring so that the
|
||||||
|
# differences between each version are split into different classes.
|
||||||
|
cfg.IntOpt('qpid_topology_version',
|
||||||
|
default=1,
|
||||||
|
help="The qpid topology version to use. Version 1 is what "
|
||||||
|
"was originally used by impl_qpid. Version 2 includes "
|
||||||
|
"some backwards-incompatible changes that allow broker "
|
||||||
|
"federation to work. Users should update to version 2 "
|
||||||
|
"when they are able to take everything down, as it "
|
||||||
|
"requires a clean break."),
|
||||||
]
|
]
|
||||||
|
|
||||||
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
|
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
|
||||||
|
|
||||||
|
|
||||||
|
def raise_invalid_topology_version(conf):
|
||||||
|
msg = (_("Invalid value for qpid_topology_version: %d") %
|
||||||
|
conf.qpid_topology_version)
|
||||||
|
LOG.error(msg)
|
||||||
|
raise Exception(msg)
|
||||||
|
|
||||||
|
|
||||||
class ConsumerBase(object):
|
class ConsumerBase(object):
|
||||||
"""Consumer base class."""
|
"""Consumer base class."""
|
||||||
|
|
||||||
def __init__(self, session, callback, node_name, node_opts,
|
def __init__(self, conf, session, callback, node_name, node_opts,
|
||||||
link_name, link_opts):
|
link_name, link_opts):
|
||||||
"""Declare a queue on an amqp session.
|
"""Declare a queue on an amqp session.
|
||||||
|
|
||||||
@ -96,6 +114,7 @@ class ConsumerBase(object):
|
|||||||
self.receiver = None
|
self.receiver = None
|
||||||
self.session = None
|
self.session = None
|
||||||
|
|
||||||
|
if conf.qpid_topology_version == 1:
|
||||||
addr_opts = {
|
addr_opts = {
|
||||||
"create": "always",
|
"create": "always",
|
||||||
"node": {
|
"node": {
|
||||||
@ -116,6 +135,17 @@ class ConsumerBase(object):
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
addr_opts["node"]["x-declare"].update(node_opts)
|
addr_opts["node"]["x-declare"].update(node_opts)
|
||||||
|
elif conf.qpid_topology_version == 2:
|
||||||
|
addr_opts = {
|
||||||
|
"link": {
|
||||||
|
"x-declare": {
|
||||||
|
"auto-delete": True,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
raise_invalid_topology_version()
|
||||||
|
|
||||||
addr_opts["link"]["x-declare"].update(link_opts)
|
addr_opts["link"]["x-declare"].update(link_opts)
|
||||||
|
|
||||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||||
@ -182,16 +212,24 @@ class DirectConsumer(ConsumerBase):
|
|||||||
'callback' is the callback to call when messages are received
|
'callback' is the callback to call when messages are received
|
||||||
"""
|
"""
|
||||||
|
|
||||||
super(DirectConsumer, self).__init__(
|
link_opts = {
|
||||||
session, callback,
|
|
||||||
"%s/%s" % (msg_id, msg_id),
|
|
||||||
{"type": "direct"},
|
|
||||||
msg_id,
|
|
||||||
{
|
|
||||||
"auto-delete": conf.amqp_auto_delete,
|
"auto-delete": conf.amqp_auto_delete,
|
||||||
"exclusive": True,
|
"exclusive": True,
|
||||||
"durable": conf.amqp_durable_queues,
|
"durable": conf.amqp_durable_queues,
|
||||||
})
|
}
|
||||||
|
|
||||||
|
if conf.qpid_topology_version == 1:
|
||||||
|
node_name = "%s/%s" % (msg_id, msg_id)
|
||||||
|
node_opts = {"type": "direct"}
|
||||||
|
elif conf.qpid_topology_version == 2:
|
||||||
|
node_name = "amq.direct/%s" % msg_id
|
||||||
|
node_opts = {}
|
||||||
|
else:
|
||||||
|
raise_invalid_topology_version()
|
||||||
|
|
||||||
|
super(DirectConsumer, self).__init__(conf, session, callback,
|
||||||
|
node_name, node_opts, msg_id,
|
||||||
|
link_opts)
|
||||||
|
|
||||||
|
|
||||||
class TopicConsumer(ConsumerBase):
|
class TopicConsumer(ConsumerBase):
|
||||||
@ -209,14 +247,20 @@ class TopicConsumer(ConsumerBase):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
|
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
|
||||||
super(TopicConsumer, self).__init__(
|
link_opts = {
|
||||||
session, callback,
|
|
||||||
"%s/%s" % (exchange_name, topic),
|
|
||||||
{}, name or topic,
|
|
||||||
{
|
|
||||||
"auto-delete": conf.amqp_auto_delete,
|
"auto-delete": conf.amqp_auto_delete,
|
||||||
"durable": conf.amqp_durable_queues,
|
"durable": conf.amqp_durable_queues,
|
||||||
})
|
}
|
||||||
|
|
||||||
|
if conf.qpid_topology_version == 1:
|
||||||
|
node_name = "%s/%s" % (exchange_name, topic)
|
||||||
|
elif conf.qpid_topology_version == 2:
|
||||||
|
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
|
||||||
|
else:
|
||||||
|
raise_invalid_topology_version()
|
||||||
|
|
||||||
|
super(TopicConsumer, self).__init__(conf, session, callback, node_name,
|
||||||
|
{}, name or topic, link_opts)
|
||||||
|
|
||||||
|
|
||||||
class FanoutConsumer(ConsumerBase):
|
class FanoutConsumer(ConsumerBase):
|
||||||
@ -231,12 +275,22 @@ class FanoutConsumer(ConsumerBase):
|
|||||||
"""
|
"""
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
|
|
||||||
super(FanoutConsumer, self).__init__(
|
link_opts = {"exclusive": True}
|
||||||
session, callback,
|
|
||||||
"%s_fanout" % topic,
|
if conf.qpid_topology_version == 1:
|
||||||
{"durable": False, "type": "fanout"},
|
node_name = "%s_fanout" % topic
|
||||||
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
|
node_opts = {"durable": False, "type": "fanout"}
|
||||||
{"exclusive": True})
|
link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
|
||||||
|
elif conf.qpid_topology_version == 2:
|
||||||
|
node_name = "amq.topic/fanout/%s" % topic
|
||||||
|
node_opts = {}
|
||||||
|
link_name = ""
|
||||||
|
else:
|
||||||
|
raise_invalid_topology_version()
|
||||||
|
|
||||||
|
super(FanoutConsumer, self).__init__(conf, session, callback,
|
||||||
|
node_name, node_opts, link_name,
|
||||||
|
link_opts)
|
||||||
|
|
||||||
def reconnect(self, session):
|
def reconnect(self, session):
|
||||||
topic = self.get_node_name().rpartition('_fanout')[0]
|
topic = self.get_node_name().rpartition('_fanout')[0]
|
||||||
@ -254,20 +308,21 @@ class FanoutConsumer(ConsumerBase):
|
|||||||
class Publisher(object):
|
class Publisher(object):
|
||||||
"""Base Publisher class."""
|
"""Base Publisher class."""
|
||||||
|
|
||||||
def __init__(self, session, node_name, node_opts=None):
|
def __init__(self, conf, session, node_name, node_opts=None):
|
||||||
"""Init the Publisher class with the exchange_name, routing_key,
|
"""Init the Publisher class with the exchange_name, routing_key,
|
||||||
and other options
|
and other options
|
||||||
"""
|
"""
|
||||||
self.sender = None
|
self.sender = None
|
||||||
self.session = session
|
self.session = session
|
||||||
|
|
||||||
|
if conf.qpid_topology_version == 1:
|
||||||
addr_opts = {
|
addr_opts = {
|
||||||
"create": "always",
|
"create": "always",
|
||||||
"node": {
|
"node": {
|
||||||
"type": "topic",
|
"type": "topic",
|
||||||
"x-declare": {
|
"x-declare": {
|
||||||
"durable": False,
|
"durable": False,
|
||||||
# auto-delete isn't implemented for exchanges in Qpid,
|
# auto-delete isn't implemented for exchanges in qpid,
|
||||||
# but put in here anyway
|
# but put in here anyway
|
||||||
"auto-delete": True,
|
"auto-delete": True,
|
||||||
},
|
},
|
||||||
@ -277,6 +332,10 @@ class Publisher(object):
|
|||||||
addr_opts["node"]["x-declare"].update(node_opts)
|
addr_opts["node"]["x-declare"].update(node_opts)
|
||||||
|
|
||||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||||
|
elif conf.qpid_topology_version == 2:
|
||||||
|
self.address = node_name
|
||||||
|
else:
|
||||||
|
raise_invalid_topology_version()
|
||||||
|
|
||||||
self.reconnect(session)
|
self.reconnect(session)
|
||||||
|
|
||||||
@ -320,8 +379,18 @@ class DirectPublisher(Publisher):
|
|||||||
"""Publisher class for 'direct'."""
|
"""Publisher class for 'direct'."""
|
||||||
def __init__(self, conf, session, msg_id):
|
def __init__(self, conf, session, msg_id):
|
||||||
"""Init a 'direct' publisher."""
|
"""Init a 'direct' publisher."""
|
||||||
super(DirectPublisher, self).__init__(session, msg_id,
|
|
||||||
{"type": "direct"})
|
if conf.qpid_topology_version == 1:
|
||||||
|
node_name = msg_id
|
||||||
|
node_opts = {"type": "direct"}
|
||||||
|
elif conf.qpid_topology_version == 2:
|
||||||
|
node_name = "amq.direct/%s" % msg_id
|
||||||
|
node_opts = {}
|
||||||
|
else:
|
||||||
|
raise_invalid_topology_version()
|
||||||
|
|
||||||
|
super(DirectPublisher, self).__init__(conf, session, node_name,
|
||||||
|
node_opts)
|
||||||
|
|
||||||
|
|
||||||
class TopicPublisher(Publisher):
|
class TopicPublisher(Publisher):
|
||||||
@ -330,8 +399,15 @@ class TopicPublisher(Publisher):
|
|||||||
"""Init a 'topic' publisher.
|
"""Init a 'topic' publisher.
|
||||||
"""
|
"""
|
||||||
exchange_name = rpc_amqp.get_control_exchange(conf)
|
exchange_name = rpc_amqp.get_control_exchange(conf)
|
||||||
super(TopicPublisher, self).__init__(session,
|
|
||||||
"%s/%s" % (exchange_name, topic))
|
if conf.qpid_topology_version == 1:
|
||||||
|
node_name = "%s/%s" % (exchange_name, topic)
|
||||||
|
elif conf.qpid_topology_version == 2:
|
||||||
|
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
|
||||||
|
else:
|
||||||
|
raise_invalid_topology_version()
|
||||||
|
|
||||||
|
super(TopicPublisher, self).__init__(conf, session, node_name)
|
||||||
|
|
||||||
|
|
||||||
class FanoutPublisher(Publisher):
|
class FanoutPublisher(Publisher):
|
||||||
@ -339,9 +415,18 @@ class FanoutPublisher(Publisher):
|
|||||||
def __init__(self, conf, session, topic):
|
def __init__(self, conf, session, topic):
|
||||||
"""Init a 'fanout' publisher.
|
"""Init a 'fanout' publisher.
|
||||||
"""
|
"""
|
||||||
super(FanoutPublisher, self).__init__(
|
|
||||||
session,
|
if conf.qpid_topology_version == 1:
|
||||||
"%s_fanout" % topic, {"type": "fanout"})
|
node_name = "%s_fanout" % topic
|
||||||
|
node_opts = {"type": "fanout"}
|
||||||
|
elif conf.qpid_topology_version == 2:
|
||||||
|
node_name = "amq.topic/fanout/%s" % topic
|
||||||
|
node_opts = {}
|
||||||
|
else:
|
||||||
|
raise_invalid_topology_version()
|
||||||
|
|
||||||
|
super(FanoutPublisher, self).__init__(conf, session, node_name,
|
||||||
|
node_opts)
|
||||||
|
|
||||||
|
|
||||||
class NotifyPublisher(Publisher):
|
class NotifyPublisher(Publisher):
|
||||||
@ -350,9 +435,17 @@ class NotifyPublisher(Publisher):
|
|||||||
"""Init a 'topic' publisher.
|
"""Init a 'topic' publisher.
|
||||||
"""
|
"""
|
||||||
exchange_name = rpc_amqp.get_control_exchange(conf)
|
exchange_name = rpc_amqp.get_control_exchange(conf)
|
||||||
super(NotifyPublisher, self).__init__(session,
|
node_opts = {"durable": True}
|
||||||
"%s/%s" % (exchange_name, topic),
|
|
||||||
{"durable": True})
|
if conf.qpid_topology_version == 1:
|
||||||
|
node_name = "%s/%s" % (exchange_name, topic)
|
||||||
|
elif conf.qpid_topology_version == 2:
|
||||||
|
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
|
||||||
|
else:
|
||||||
|
raise_invalid_topology_version()
|
||||||
|
|
||||||
|
super(NotifyPublisher, self).__init__(conf, session, node_name,
|
||||||
|
node_opts)
|
||||||
|
|
||||||
|
|
||||||
class Connection(object):
|
class Connection(object):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user