Merge "Add support for AMQP HA to NovaManager"
This commit is contained in:
commit
358d5d453d
@ -53,11 +53,48 @@ CONFIG = ConfigParser.SafeConfigParser()
|
||||
|
||||
class MessagingAPI(object):
|
||||
|
||||
def __init__(self, transport_url):
|
||||
LOG.debug("setting up the AMQP transport url: %s" % transport_url)
|
||||
def __init__(self, amqp_backend, amqp_user,
|
||||
amqp_password, amqp_hosts, amqp_virt_host):
|
||||
oslo_msg.set_transport_defaults(control_exchange="nova")
|
||||
|
||||
transport_hosts = self._createTransportHosts(amqp_user,
|
||||
amqp_password,
|
||||
amqp_hosts)
|
||||
|
||||
transport_url = oslo_msg.TransportURL(CONF,
|
||||
transport=amqp_backend,
|
||||
virtual_host=amqp_virt_host,
|
||||
hosts=transport_hosts,
|
||||
aliases=None)
|
||||
|
||||
self.TRANSPORT = oslo_msg.get_transport(CONF, url=transport_url)
|
||||
|
||||
def _createTransportHosts(self, username, password, hosts):
|
||||
"""Returns a list of oslo.messaging.TransportHost objects."""
|
||||
transport_hosts = []
|
||||
|
||||
for host in hosts:
|
||||
host = host.strip()
|
||||
host_name, host_port = host.split(":")
|
||||
|
||||
if not host_port:
|
||||
msg = "Invalid hosts value: %s. It should be"\
|
||||
" in hostname:port format" % host
|
||||
raise ValueError(msg)
|
||||
|
||||
try:
|
||||
host_port = int(host_port)
|
||||
except ValueError:
|
||||
msg = "Invalid port value: %s. It should be an integer"
|
||||
raise ValueError(msg % host_port)
|
||||
|
||||
transport_hosts.append(oslo_msg.TransportHost(
|
||||
hostname=host_name,
|
||||
port=host_port,
|
||||
username=username,
|
||||
password=password))
|
||||
return transport_hosts
|
||||
|
||||
def getTarget(self, topic, exchange=None, namespace=None,
|
||||
version=None, server=None):
|
||||
return oslo_msg.Target(topic=topic,
|
||||
@ -343,6 +380,10 @@ class NovaManager(Manager):
|
||||
help="the amqp backend tpye (e.g. rabbit, qpid)",
|
||||
default=None,
|
||||
required=True),
|
||||
cfg.ListOpt("amqp_hosts",
|
||||
help="AMQP HA cluster host:port pairs",
|
||||
default=None,
|
||||
required=False),
|
||||
cfg.StrOpt("amqp_host",
|
||||
help="the amqp host name",
|
||||
default="localhost",
|
||||
@ -428,6 +469,8 @@ class NovaManager(Manager):
|
||||
|
||||
amqp_backend = self.getParameter("amqp_backend", fallback=True)
|
||||
|
||||
amqp_hosts = self.getParameter("amqp_hosts")
|
||||
|
||||
amqp_host = self.getParameter("amqp_host")
|
||||
|
||||
amqp_port = self.getParameter("amqp_port")
|
||||
@ -448,22 +491,17 @@ class NovaManager(Manager):
|
||||
|
||||
self.getParameter("metadata_proxy_shared_secret", fallback=True)
|
||||
|
||||
if not amqp_hosts:
|
||||
amqp_hosts = ["%s:%s" % (amqp_host, amqp_port)]
|
||||
try:
|
||||
LOG.debug("setting up the NOVA database connection: %s"
|
||||
% db_connection)
|
||||
|
||||
self.db_engine = create_engine(db_connection)
|
||||
self.db_engine = create_engine(db_connection, pool_recycle=30)
|
||||
|
||||
transport_url = "{b}://{user}:{password}@{host}:{port}/{virt_host}"
|
||||
transport_url = transport_url.format(
|
||||
b=amqp_backend,
|
||||
user=amqp_user,
|
||||
password=amqp_password,
|
||||
host=amqp_host,
|
||||
port=amqp_port,
|
||||
virt_host=amqp_virt_host)
|
||||
|
||||
self.messagingAPI = MessagingAPI(transport_url)
|
||||
self.messagingAPI = MessagingAPI(amqp_backend, amqp_user,
|
||||
amqp_password, amqp_hosts,
|
||||
amqp_virt_host)
|
||||
|
||||
self.novaBaseRPCAPI = NovaBaseRPCAPI(conductor_topic,
|
||||
self.messagingAPI)
|
||||
|
Loading…
x
Reference in New Issue
Block a user