From fc16dbd695890358ce9671e3cf616b1d0ff8ffff Mon Sep 17 00:00:00 2001 From: Lisa Zangrando Date: Thu, 11 May 2017 14:39:24 +0200 Subject: [PATCH] MessagingAPI should be implemented as common module nova_manager.py implements the NovaManager which interacts with Nova service but even provides the MessagingAPI class which interacts with the AMQP system. This fix moves the MessagingAPI in common/messaging.py. Bug: 1690133 Change-Id: Ifed11126a1af227950c03ccffc48d577fb631ded Sem-Ver: bugfix --- synergy_scheduler_manager/common/messaging.py | 110 ++++++++++++ synergy_scheduler_manager/nova_manager.py | 158 +++++------------- 2 files changed, 153 insertions(+), 115 deletions(-) create mode 100644 synergy_scheduler_manager/common/messaging.py diff --git a/synergy_scheduler_manager/common/messaging.py b/synergy_scheduler_manager/common/messaging.py new file mode 100644 index 0000000..8fc48f0 --- /dev/null +++ b/synergy_scheduler_manager/common/messaging.py @@ -0,0 +1,110 @@ +import logging +import oslo_messaging as oslo_msg + +from oslo_config import cfg + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. +See the License for the specific language governing +permissions and limitations under the License.""" + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class AMQP(object): + + def __init__(self, url=None, backend=None, username=None, password=None, + hosts=None, virt_host=None, exchange=None): + super(AMQP, self).__init__() + + if exchange: + oslo_msg.set_transport_defaults(control_exchange=exchange) + + if url: + self.TRANSPORT = oslo_msg.get_transport(CONF, url=url) + elif not backend and not hosts: + raise ValueError("missing AMQP parameters") + else: + t_hosts = self._createTransportHosts(username, password, hosts) + t_url = oslo_msg.TransportURL(CONF, + transport=backend, + virtual_host=virt_host, + hosts=t_hosts, + aliases=None) + + self.TRANSPORT = oslo_msg.get_transport(CONF, url=t_url) + + def _createTransportHosts(self, username, password, hosts): + """Returns a list of oslo.messaging.TransportHost objects.""" + transport_hosts = [] + + for host in hosts: + host = host.strip() + host_name, host_port = host.split(":") + + if not host_port: + msg = "Invalid hosts value: %s. It should be"\ + " in hostname:port format" % host + raise ValueError(msg) + + try: + host_port = int(host_port) + except ValueError: + msg = "Invalid port value: %s. It should be an integer" + raise ValueError(msg % host_port) + + transport_hosts.append(oslo_msg.TransportHost( + hostname=host_name, + port=host_port, + username=username, + password=password)) + return transport_hosts + + def getTarget(self, topic, exchange=None, namespace=None, + version=None, server=None): + return oslo_msg.Target(topic=topic, + exchange=exchange, + namespace=namespace, + version=version, + server=server) + + def getRPCClient(self, target, version_cap=None, serializer=None): + assert self.TRANSPORT is not None + + return oslo_msg.RPCClient(self.TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer) + + def getRPCServer(self, target, endpoints, serializer=None): + assert self.TRANSPORT is not None + + return oslo_msg.get_rpc_server(self.TRANSPORT, + target, + endpoints, + executor="eventlet", + serializer=serializer) + + def getNotificationListener(self, targets, endpoints): + assert self.TRANSPORT is not None + + return oslo_msg.get_notification_listener(self.TRANSPORT, + targets, + endpoints, + allow_requeue=True, + executor="eventlet") diff --git a/synergy_scheduler_manager/nova_manager.py b/synergy_scheduler_manager/nova_manager.py index b827e8e..1754ab1 100644 --- a/synergy_scheduler_manager/nova_manager.py +++ b/synergy_scheduler_manager/nova_manager.py @@ -4,13 +4,13 @@ import hashlib import hmac import json import logging -import oslo_messaging as oslo_msg import requests from common.block_device import BlockDeviceMapping from common.compute import Compute from common.flavor import Flavor from common.hypervisor import Hypervisor +from common.messaging import AMQP from common.quota import Quota from common.request import Request from common.server import Server @@ -42,91 +42,6 @@ LOG = logging.getLogger(__name__) CONF = cfg.CONF -class MessagingAPI(object): - - def __init__(self, amqp_backend, amqp_user, - amqp_password, amqp_hosts, amqp_virt_host): - super(MessagingAPI, self).__init__() - - oslo_msg.set_transport_defaults(control_exchange="nova") - - transport_hosts = self._createTransportHosts(amqp_user, - amqp_password, - amqp_hosts) - - transport_url = oslo_msg.TransportURL(CONF, - transport=amqp_backend, - virtual_host=amqp_virt_host, - hosts=transport_hosts, - aliases=None) - - self.TRANSPORT = oslo_msg.get_transport(CONF, url=transport_url) - - def _createTransportHosts(self, username, password, hosts): - """Returns a list of oslo.messaging.TransportHost objects.""" - transport_hosts = [] - - for host in hosts: - host = host.strip() - host_name, host_port = host.split(":") - - if not host_port: - msg = "Invalid hosts value: %s. It should be"\ - " in hostname:port format" % host - raise ValueError(msg) - - try: - host_port = int(host_port) - except ValueError: - msg = "Invalid port value: %s. It should be an integer" - raise ValueError(msg % host_port) - - transport_hosts.append(oslo_msg.TransportHost( - hostname=host_name, - port=host_port, - username=username, - password=password)) - return transport_hosts - - def getTarget(self, topic, exchange=None, namespace=None, - version=None, server=None): - return oslo_msg.Target(topic=topic, - exchange=exchange, - namespace=namespace, - version=version, - server=server) - - def getRPCClient(self, target, version_cap=None, serializer=None): - assert self.TRANSPORT is not None - - LOG.debug("creating RPC client with target %s" % target) - return oslo_msg.RPCClient(self.TRANSPORT, - target, - version_cap=version_cap, - serializer=serializer) - - def getRPCServer(self, target, endpoints, serializer=None): - assert self.TRANSPORT is not None - - LOG.debug("creating RPC server with target %s" % target) - return oslo_msg.get_rpc_server(self.TRANSPORT, - target, - endpoints, - executor="eventlet", - serializer=serializer) - - def getNotificationListener(self, targets, endpoints): - assert self.TRANSPORT is not None - - LOG.debug("creating notification listener with target %s endpoints %s" - % (targets, endpoints)) - return oslo_msg.get_notification_listener(self.TRANSPORT, - targets, - endpoints, - allow_requeue=True, - executor="eventlet") - - class NovaConductorComputeAPI(object): def __init__(self, topic, scheduler_manager, keystone_manager, msg): @@ -215,10 +130,18 @@ class NovaManager(Manager): super(NovaManager, self).__init__("NovaManager") self.config_opts = [ + cfg.StrOpt("amqp_url", + help="the amqp transport url", + default=None, + required=False), + cfg.StrOpt("amqp_exchange", + help="the amqp exchange", + default="nova", + required=False), cfg.StrOpt("amqp_backend", help="the amqp backend tpye (e.g. rabbit, qpid)", default=None, - required=True), + required=False), cfg.ListOpt("amqp_hosts", help="AMQP HA cluster host:port pairs", default=None, @@ -234,11 +157,11 @@ class NovaManager(Manager): cfg.StrOpt("amqp_user", help="the amqp user", default=None, - required=True), + required=False), cfg.StrOpt("amqp_password", help="the amqp password", default=None, - required=True), + required=False), cfg.StrOpt("amqp_virt_host", help="the amqp virtual host", default="/", @@ -310,7 +233,9 @@ class NovaManager(Manager): self.keystone_manager = self.getManager("KeystoneManager") self.scheduler_manager = self.getManager("SchedulerManager") - amqp_backend = self.getParameter("amqp_backend", fallback=True) + amqp_url = self.getParameter("amqp_url") + + amqp_backend = self.getParameter("amqp_backend") amqp_hosts = self.getParameter("amqp_hosts") @@ -318,12 +243,14 @@ class NovaManager(Manager): amqp_port = self.getParameter("amqp_port") - amqp_user = self.getParameter("amqp_user", fallback=True) + amqp_user = self.getParameter("amqp_user") - amqp_password = self.getParameter("amqp_password", fallback=True) + amqp_password = self.getParameter("amqp_password") amqp_virt_host = self.getParameter("amqp_virt_host") + amqp_exchange = self.getParameter("amqp_exchange") + db_connection = self.getParameter("db_connection", fallback=True) host = self.getParameter("host") @@ -342,19 +269,20 @@ class NovaManager(Manager): self.db_engine = create_engine(db_connection, pool_recycle=30) - self.messagingAPI = MessagingAPI(amqp_backend, amqp_user, - amqp_password, amqp_hosts, - amqp_virt_host) + self.messaging = AMQP(url=amqp_url, backend=amqp_backend, + username=amqp_user, password=amqp_password, + hosts=amqp_hosts, virt_host=amqp_virt_host, + exchange=amqp_exchange) self.novaConductorComputeAPI = NovaConductorComputeAPI( conductor_topic, self.scheduler_manager, self.keystone_manager, - self.messagingAPI) + self.messaging) - self.conductor_rpc = self.messagingAPI.getRPCServer( - target=self.messagingAPI.getTarget(topic=synergy_topic, - server=host), + self.conductor_rpc = self.messaging.getRPCServer( + target=self.messaging.getTarget(topic=synergy_topic, + server=host), endpoints=[self.novaConductorComputeAPI]) self.conductor_rpc.start() @@ -912,24 +840,24 @@ class NovaManager(Manager): def getTarget(self, topic, exchange=None, namespace=None, version=None, server=None): - return self.messagingAPI.getTarget(topic=topic, - namespace=namespace, - exchange=exchange, - version=version, - server=server) + return self.messaging.getTarget(topic=topic, + namespace=namespace, + exchange=exchange, + version=version, + server=server) def getRPCClient(self, target, version_cap=None, serializer=None): - return self.messagingAPI.getRPCClient(target, - version_cap=version_cap, - serializer=serializer) + return self.messaging.getRPCClient(target, + version_cap=version_cap, + serializer=serializer) def getRPCServer(self, target, endpoints, serializer=None): - return self.messagingAPI.getRPCServer(target, - endpoints, - serializer=serializer) + return self.messaging.getRPCServer(target, + endpoints, + serializer=serializer) def getNotificationListener(self, targets, endpoints): - return self.messagingAPI.getNotificationListener(targets, endpoints) + return self.messaging.getNotificationListener(targets, endpoints) def getProjectUsage(self, prj_id, from_date, to_date): usage = {} @@ -1070,11 +998,11 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()} return servers def selectComputes(self, request): - target = self.messagingAPI.getTarget(topic='scheduler', - exchange="nova", - version="4.0") + target = self.messaging.getTarget(topic='scheduler', + exchange="nova", + version="4.0") - client = self.messagingAPI.getRPCClient(target) + client = self.messaging.getRPCClient(target) cctxt = client.prepare(version='4.0') request_spec = {