Add support for AMQP HA to NovaManager

- AMQP HA cluster support added
- added the new 'amqp_hosts' configuration attribute

Change-Id: I20973b3a66540814b86970628270dc83adab8018
Sem-Ver: feature
This commit is contained in:
Lisa Zangrando 2017-01-20 10:45:56 +01:00
parent 447f11d315
commit 7926137bb5

View File

@ -53,11 +53,48 @@ CONFIG = ConfigParser.SafeConfigParser()
class MessagingAPI(object):
def __init__(self, transport_url):
LOG.debug("setting up the AMQP transport url: %s" % transport_url)
def __init__(self, amqp_backend, amqp_user,
amqp_password, amqp_hosts, amqp_virt_host):
oslo_msg.set_transport_defaults(control_exchange="nova")
transport_hosts = self._createTransportHosts(amqp_user,
amqp_password,
amqp_hosts)
transport_url = oslo_msg.TransportURL(CONF,
transport=amqp_backend,
virtual_host=amqp_virt_host,
hosts=transport_hosts,
aliases=None)
self.TRANSPORT = oslo_msg.get_transport(CONF, url=transport_url)
def _createTransportHosts(self, username, password, hosts):
"""Returns a list of oslo.messaging.TransportHost objects."""
transport_hosts = []
for host in hosts:
host = host.strip()
host_name, host_port = host.split(":")
if not host_port:
msg = "Invalid hosts value: %s. It should be"\
" in hostname:port format" % host
raise ValueError(msg)
try:
host_port = int(host_port)
except ValueError:
msg = "Invalid port value: %s. It should be an integer"
raise ValueError(msg % host_port)
transport_hosts.append(oslo_msg.TransportHost(
hostname=host_name,
port=host_port,
username=username,
password=password))
return transport_hosts
def getTarget(self, topic, exchange=None, namespace=None,
version=None, server=None):
return oslo_msg.Target(topic=topic,
@ -343,6 +380,10 @@ class NovaManager(Manager):
help="the amqp backend tpye (e.g. rabbit, qpid)",
default=None,
required=True),
cfg.ListOpt("amqp_hosts",
help="AMQP HA cluster host:port pairs",
default=None,
required=False),
cfg.StrOpt("amqp_host",
help="the amqp host name",
default="localhost",
@ -428,6 +469,8 @@ class NovaManager(Manager):
amqp_backend = self.getParameter("amqp_backend", fallback=True)
amqp_hosts = self.getParameter("amqp_hosts")
amqp_host = self.getParameter("amqp_host")
amqp_port = self.getParameter("amqp_port")
@ -448,22 +491,17 @@ class NovaManager(Manager):
self.getParameter("metadata_proxy_shared_secret", fallback=True)
if not amqp_hosts:
amqp_hosts = ["%s:%s" % (amqp_host, amqp_port)]
try:
LOG.debug("setting up the NOVA database connection: %s"
% db_connection)
self.db_engine = create_engine(db_connection)
self.db_engine = create_engine(db_connection, pool_recycle=30)
transport_url = "{b}://{user}:{password}@{host}:{port}/{virt_host}"
transport_url = transport_url.format(
b=amqp_backend,
user=amqp_user,
password=amqp_password,
host=amqp_host,
port=amqp_port,
virt_host=amqp_virt_host)
self.messagingAPI = MessagingAPI(transport_url)
self.messagingAPI = MessagingAPI(amqp_backend, amqp_user,
amqp_password, amqp_hosts,
amqp_virt_host)
self.novaBaseRPCAPI = NovaBaseRPCAPI(conductor_topic,
self.messagingAPI)