From 7926137bb5861a756937930d8243d9d2465017d2 Mon Sep 17 00:00:00 2001 From: Lisa Zangrando Date: Fri, 20 Jan 2017 10:45:56 +0100 Subject: [PATCH] Add support for AMQP HA to NovaManager - AMQP HA cluster support added - added the new 'amqp_hosts' configuration attribute Change-Id: I20973b3a66540814b86970628270dc83adab8018 Sem-Ver: feature --- synergy_scheduler_manager/nova_manager.py | 64 ++++++++++++++++++----- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/synergy_scheduler_manager/nova_manager.py b/synergy_scheduler_manager/nova_manager.py index c06e64b..58bdee8 100644 --- a/synergy_scheduler_manager/nova_manager.py +++ b/synergy_scheduler_manager/nova_manager.py @@ -53,11 +53,48 @@ CONFIG = ConfigParser.SafeConfigParser() class MessagingAPI(object): - def __init__(self, transport_url): - LOG.debug("setting up the AMQP transport url: %s" % transport_url) + def __init__(self, amqp_backend, amqp_user, + amqp_password, amqp_hosts, amqp_virt_host): oslo_msg.set_transport_defaults(control_exchange="nova") + + transport_hosts = self._createTransportHosts(amqp_user, + amqp_password, + amqp_hosts) + + transport_url = oslo_msg.TransportURL(CONF, + transport=amqp_backend, + virtual_host=amqp_virt_host, + hosts=transport_hosts, + aliases=None) + self.TRANSPORT = oslo_msg.get_transport(CONF, url=transport_url) + def _createTransportHosts(self, username, password, hosts): + """Returns a list of oslo.messaging.TransportHost objects.""" + transport_hosts = [] + + for host in hosts: + host = host.strip() + host_name, host_port = host.split(":") + + if not host_port: + msg = "Invalid hosts value: %s. It should be"\ + " in hostname:port format" % host + raise ValueError(msg) + + try: + host_port = int(host_port) + except ValueError: + msg = "Invalid port value: %s. It should be an integer" + raise ValueError(msg % host_port) + + transport_hosts.append(oslo_msg.TransportHost( + hostname=host_name, + port=host_port, + username=username, + password=password)) + return transport_hosts + def getTarget(self, topic, exchange=None, namespace=None, version=None, server=None): return oslo_msg.Target(topic=topic, @@ -343,6 +380,10 @@ class NovaManager(Manager): help="the amqp backend tpye (e.g. rabbit, qpid)", default=None, required=True), + cfg.ListOpt("amqp_hosts", + help="AMQP HA cluster host:port pairs", + default=None, + required=False), cfg.StrOpt("amqp_host", help="the amqp host name", default="localhost", @@ -428,6 +469,8 @@ class NovaManager(Manager): amqp_backend = self.getParameter("amqp_backend", fallback=True) + amqp_hosts = self.getParameter("amqp_hosts") + amqp_host = self.getParameter("amqp_host") amqp_port = self.getParameter("amqp_port") @@ -448,22 +491,17 @@ class NovaManager(Manager): self.getParameter("metadata_proxy_shared_secret", fallback=True) + if not amqp_hosts: + amqp_hosts = ["%s:%s" % (amqp_host, amqp_port)] try: LOG.debug("setting up the NOVA database connection: %s" % db_connection) - self.db_engine = create_engine(db_connection) + self.db_engine = create_engine(db_connection, pool_recycle=30) - transport_url = "{b}://{user}:{password}@{host}:{port}/{virt_host}" - transport_url = transport_url.format( - b=amqp_backend, - user=amqp_user, - password=amqp_password, - host=amqp_host, - port=amqp_port, - virt_host=amqp_virt_host) - - self.messagingAPI = MessagingAPI(transport_url) + self.messagingAPI = MessagingAPI(amqp_backend, amqp_user, + amqp_password, amqp_hosts, + amqp_virt_host) self.novaBaseRPCAPI = NovaBaseRPCAPI(conductor_topic, self.messagingAPI)