From 23e9382c7d0f8e808e6e4c59f6ba0ea83dfdd15e Mon Sep 17 00:00:00 2001 From: Lisa Zangrando Date: Mon, 17 Jul 2017 14:23:54 +0200 Subject: [PATCH] Added support for notifications to KeystoneManager This update enables KeystoneManager to receive asynchronous notifications about the users and projects status from Keystone service Change-Id: Ia75b89c825bb8536e0462eff7a873699b219d781 Sem-Ver: bugfix --- config/synergy_scheduler.conf | 9 + synergy_scheduler_manager/common/messaging.py | 2 +- synergy_scheduler_manager/keystone_manager.py | 192 +++++++++--------- 3 files changed, 111 insertions(+), 92 deletions(-) diff --git a/config/synergy_scheduler.conf b/config/synergy_scheduler.conf index 159a007..837db04 100644 --- a/config/synergy_scheduler.conf +++ b/config/synergy_scheduler.conf @@ -101,6 +101,15 @@ clock_skew = 60 # set the SSL client certificate (PEM encoded) #ssl_cert_file = +# set the AMQP server url (e.g. rabbit://RABBIT_USER:RABBIT_PASS@RABBIT_HOST_IP) +#amqp_url = + +# set the AMQP exchange (default: keystone) +amqp_exchange = keystone + +# set the AMQP notification topic (default: notification) +amqp_topic = notification + [NovaManager] autostart = True diff --git a/synergy_scheduler_manager/common/messaging.py b/synergy_scheduler_manager/common/messaging.py index 8fc48f0..3f5c0e5 100644 --- a/synergy_scheduler_manager/common/messaging.py +++ b/synergy_scheduler_manager/common/messaging.py @@ -106,5 +106,5 @@ class AMQP(object): return oslo_msg.get_notification_listener(self.TRANSPORT, targets, endpoints, - allow_requeue=True, + allow_requeue=False, executor="eventlet") diff --git a/synergy_scheduler_manager/keystone_manager.py b/synergy_scheduler_manager/keystone_manager.py index 2c3e832..23a90ec 100644 --- a/synergy_scheduler_manager/keystone_manager.py +++ b/synergy_scheduler_manager/keystone_manager.py @@ -1,8 +1,10 @@ import json +import logging import requests from common.domain import Domain from common.endpoint import Endpoint +from common.messaging import AMQP from common.project import Project from common.role import Role from common.service import Service @@ -13,6 +15,7 @@ from datetime import datetime from datetime import timedelta from oslo_config import cfg from synergy.common.manager import Manager +from synergy.exception import SynergyError __author__ = "Lisa Zangrando" @@ -33,8 +36,8 @@ either express or implied. See the License for the specific language governing permissions and limitations under the License.""" - CONF = cfg.CONF +LOG = logging.getLogger(__name__) class KeystoneManager(Manager): @@ -86,10 +89,24 @@ class KeystoneManager(Manager): cfg.StrOpt("ssl_cert_file", help="set the SSL client certificate (PEM encoded)", default=None, + required=False), + cfg.StrOpt("amqp_url", + help="the amqp transport url", + default=None, + required=True), + cfg.StrOpt("amqp_exchange", + help="the amqp exchange", + default="keystone", + required=False), + cfg.StrOpt("amqp_topic", + help="the notification topic", + default="notification", required=False) ] def setup(self): + self.token = None + self.configured = False self.auth_url = CONF.KeystoneManager.auth_url self.ssl_ca_file = CONF.KeystoneManager.ssl_ca_file self.ssl_cert_file = CONF.KeystoneManager.ssl_cert_file @@ -102,51 +119,41 @@ class KeystoneManager(Manager): self.timeout = CONF.KeystoneManager.timeout self.trust_expiration = CONF.KeystoneManager.trust_expiration self.clock_skew = CONF.KeystoneManager.clock_skew - self.token = None self.authenticate() def task(self): - pass + if not self.configured: + amqp_url = CONF.KeystoneManager.amqp_url + amqp_exchange = CONF.KeystoneManager.amqp_exchange + amqp_topic = CONF.KeystoneManager.amqp_topic + + if amqp_url: + try: + self.messaging = AMQP(amqp_url) + + target = self.messaging.getTarget(topic=amqp_topic, + exchange=amqp_exchange) + + self.listener = self.messaging.getNotificationListener( + targets=[target], + endpoints=[self]) + self.listener.start() + except Exception as ex: + LOG.info(ex) + self.configured = True def destroy(self): pass def execute(self, command, *args, **kargs): - if command == "GET_USERS": - return self.getUsers(*args, **kargs) - elif command == "GET_USER": - return self.getProject(*args, **kargs) - elif command == "GET_USER_ROLES": - return self.getUserRoles(*args, **kargs) - elif command == "GET_PROJECTS": - return self.getProjects() - elif command == "GET_PROJECT": - return self.getProject(*args, **kargs) - elif command == "GET_ROLES": - return self.getRoles() - elif command == "GET_ROLE": - return self.getRole(*args, **kargs) - elif command == "GET_TOKEN": - return self.getToken() - elif command == "DELETE_TOKEN": - return self.deleteToken(*args, **kargs) - elif command == "VALIDATE_TOKEN": - return self.validateToken(*args, **kargs) - elif command == "GET_ENDPOINTS": - return self.getEndpoints() - elif command == "GET_ENDPOINT": - return self.getEndpoints() - elif command == "GET_SERVICES": - return self.getServices() - elif command == "GET_SERVICE": - return self.getService(*args, **kargs) - else: - return None + pass - def doOnEvent(self, event_type, *args, **kargs): - if event_type == "GET_PROJECTS": - kargs["result"].extend(self.getProjects()) + def info(self, context, publisher_id, event_type, payload, metadata): + try: + self.notify(event_type, **payload) + except Exception as ex: + LOG.info(ex) def authenticate(self): if self.token is not None: @@ -195,7 +202,7 @@ class KeystoneManager(Manager): response.raise_for_status() if not response.text: - raise Exception("authentication failed!") + raise SynergyError("authentication failed!") token_subject = response.headers["X-Subject-Token"] token_data = response.json() @@ -207,8 +214,8 @@ class KeystoneManager(Manager): response = self.getResource("users/%s" % id, "GET") except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the user info (id=%r): %s" - % (id, response["error"]["message"])) + raise SynergyError("error on retrieving the user info (id=%r): %s" + % (id, response["error"]["message"])) user = None @@ -251,9 +258,9 @@ class KeystoneManager(Manager): users = user_list.values() except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the project's users " - "(id=%r): %s" % (prj_id, - response["error"]["message"])) + message = response["error"]["message"] + raise SynergyError("error on retrieving the project's users " + "(id=%r): %s" % (prj_id, message)) else: try: response = self.getResource("/users", "GET") @@ -268,8 +275,9 @@ class KeystoneManager(Manager): users.append(user) except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the users list: %s" - % response["error"]["message"]) + message = response["error"]["message"] + raise SynergyError("error on retrieving the users list: %s" + % message) return users @@ -279,10 +287,10 @@ class KeystoneManager(Manager): % (project_id, user_id), "GET") except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the user's roles (usrId=%r, " - "prjId=%r): %s" % (user_id, - project_id, - response["error"]["message"])) + message = response["error"]["message"] + raise SynergyError("error on retrieving the user's roles (usrId=%r" + ", prjId=%r): %s" + % (user_id, project_id, message)) roles = [] if response: @@ -302,9 +310,8 @@ class KeystoneManager(Manager): response = self.getResource("/domains/%s" % id, "GET") except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception( - "error on retrieving the domain (id=%r, msg=%s)." % - (id, response["error"]["message"])) + raise SynergyError("error on retrieving the domain (id=%r, msg=%s)" + % (id, response["error"]["message"])) domain = None @@ -327,8 +334,8 @@ class KeystoneManager(Manager): response = self.getResource("/domains", "GET", data=data) except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the domains list: %s" - % response["error"]["message"]) + raise SynergyError("error on retrieving the domains list: %s" + % response["error"]["message"]) domains = [] @@ -350,9 +357,8 @@ class KeystoneManager(Manager): response = self.getResource("/projects/%s" % id, "GET") except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception( - "error on retrieving the project (id=%r, msg=%s)." % - (id, response["error"]["message"])) + raise SynergyError("error on retrieving the project (id=%r, " + "msg=%s)" % (id, response["error"]["message"])) project = None @@ -366,27 +372,30 @@ class KeystoneManager(Manager): return project - def getProjects(self, usr_id=None, domain_id=None): + def getProjects(self, usr_id=None, name=None, domain_id=None): if usr_id: try: response = self.getResource( "users/%s/projects" % usr_id, "GET") except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the users's projects (id=" - "%r): %s" % (usr_id, - response["error"]["message"])) + message = response["error"]["message"] + raise SynergyError("error on retrieving the users's projects " + "(id=%r): %s" % (usr_id, message)) else: - data = None + data = {} if domain_id: - data = {"domain_id": domain_id} + data["domain_id"] = domain_id + + if name: + data["name"] = name try: response = self.getResource("/projects", "GET", data=data) except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the projects list: %s" - % response["error"]["message"]) + raise SynergyError("error on retrieving the projects list: %s" + % response["error"]["message"]) projects = [] @@ -408,8 +417,8 @@ class KeystoneManager(Manager): response = self.getResource("/roles/%s" % id, "GET") except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the role info (id=%r): %s" - % (id, response["error"]["message"])) + raise SynergyError("error on retrieving the role info (id=%r): %s" + % (id, response["error"]["message"])) role = None @@ -426,8 +435,8 @@ class KeystoneManager(Manager): response = self.getResource("/roles", "GET") except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the roles list: %s" - % response["error"]["message"]) + raise SynergyError("error on retrieving the roles list: %s" + % response["error"]["message"]) roles = [] @@ -467,8 +476,8 @@ class KeystoneManager(Manager): "POST", data=data, token=token) except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the trust info (id=%r): %s" - % (id, response["error"]["message"])) + raise SynergyError("error on retrieving the trust info (id=%r): %s" + % (id, response["error"]["message"])) trust = Trust(response["trust"]) trust.keystone_url = self.auth_url @@ -482,8 +491,8 @@ class KeystoneManager(Manager): response = self.getResource("/OS-TRUST/trusts/%s" % id, "GET") except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the trust info (id=%r): %s" - % (id, response["error"]["message"])) + raise SynergyError("error on retrieving the trust info (id=%r): %s" + % (id, response["error"]["message"])) trust = None @@ -500,14 +509,14 @@ class KeystoneManager(Manager): token = self.getToken() if token.isExpired(): - raise Exception("token expired!") + raise SynergyError("token expired!") try: self.getResource("/OS-TRUST/trusts/%s" % id, "DELETE", token=token) except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on deleting the trust (id=%r): %s" - % (id, response["error"]["message"])) + raise SynergyError("error on deleting the trust (id=%r): %s" + % (id, response["error"]["message"])) def getTrusts(self, user_id=None, isTrustor=True, token=None): url = "/OS-TRUST/trusts" @@ -523,8 +532,8 @@ class KeystoneManager(Manager): response = self.getResource(url, "GET", token=token, data=data) except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the trust list (id=%r): %s" - % (id, response["error"]["message"])) + raise SynergyError("error on retrieving the trust list (id=%r): %s" + % (id, response["error"]["message"])) trusts = [] @@ -581,7 +590,7 @@ class KeystoneManager(Manager): response.raise_for_status() if not response.text: - raise Exception("token not found!") + raise SynergyError("token not found!") token_subject = response.headers["X-Subject-Token"] token_data = response.json() @@ -594,8 +603,8 @@ class KeystoneManager(Manager): response = self.getResource("/endpoints/%s" % id, "GET") except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the endpoint (id=%r): %s" - % (id, response["error"]["message"])) + raise SynergyError("error on retrieving the endpoint (id=%r): " + "%s" % (id, response["error"]["message"])) if response: info = response["endpoint"] @@ -615,9 +624,9 @@ class KeystoneManager(Manager): endpoints = self.getEndpoints() except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception( - "error on retrieving the endpoints list (serviceId=%r)" % - response["error"]["message"]) + message = response["error"]["message"] + raise SynergyError("error on retrieving the endpoints list " + "(id=%r): %s" % (service_id, message)) if endpoints: for endpoint in endpoints: @@ -631,8 +640,8 @@ class KeystoneManager(Manager): response = self.getResource("/endpoints", "GET") except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the endpoints list: %s" - % response["error"]["message"]) + raise SynergyError("error on retrieving the endpoints list: %s" + % response["error"]["message"]) endpoints = [] @@ -660,8 +669,9 @@ class KeystoneManager(Manager): response = self.getResource("/services/%s" % id, "GET") except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the service info (id=%r)" - ": %s" % (id, response["error"]["message"])) + message = response["error"]["message"] + raise SynergyError("error on retrieving the service info " + "(id=%r): %s" % (id, message)) if response: info = response["service"] @@ -699,8 +709,8 @@ class KeystoneManager(Manager): response = self.getResource("/services", "GET") except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the services list: %s" - % response["error"]["message"]) + raise SynergyError("error on retrieving the services list: %s" + % response["error"]["message"]) services = [] @@ -733,7 +743,7 @@ class KeystoneManager(Manager): self, resource, method, version=None, data=None, token=None): if token: if token.isExpired(): - raise Exception("token expired!") + raise SynergyError("token expired!") else: self.authenticate() token = self.getToken() @@ -787,7 +797,7 @@ class KeystoneManager(Manager): verify=self.ssl_ca_file, cert=self.ssl_cert_file) else: - raise Exception("wrong HTTP method: %s" % method) + raise SynergyError("wrong HTTP method: %s" % method) if response.status_code != requests.codes.ok: response.raise_for_status()