Added support for notifications to KeystoneManager

This update enables KeystoneManager to receive asynchronous
notifications about the users and projects status from Keystone
service

Change-Id: Ia75b89c825bb8536e0462eff7a873699b219d781
Sem-Ver: bugfix
This commit is contained in:
Lisa Zangrando 2017-07-17 14:23:54 +02:00
parent fc16dbd695
commit 23e9382c7d
3 changed files with 111 additions and 92 deletions

View File

@ -101,6 +101,15 @@ clock_skew = 60
# set the SSL client certificate (PEM encoded) # set the SSL client certificate (PEM encoded)
#ssl_cert_file = #ssl_cert_file =
# set the AMQP server url (e.g. rabbit://RABBIT_USER:RABBIT_PASS@RABBIT_HOST_IP)
#amqp_url =
# set the AMQP exchange (default: keystone)
amqp_exchange = keystone
# set the AMQP notification topic (default: notification)
amqp_topic = notification
[NovaManager] [NovaManager]
autostart = True autostart = True

View File

@ -106,5 +106,5 @@ class AMQP(object):
return oslo_msg.get_notification_listener(self.TRANSPORT, return oslo_msg.get_notification_listener(self.TRANSPORT,
targets, targets,
endpoints, endpoints,
allow_requeue=True, allow_requeue=False,
executor="eventlet") executor="eventlet")

View File

@ -1,8 +1,10 @@
import json import json
import logging
import requests import requests
from common.domain import Domain from common.domain import Domain
from common.endpoint import Endpoint from common.endpoint import Endpoint
from common.messaging import AMQP
from common.project import Project from common.project import Project
from common.role import Role from common.role import Role
from common.service import Service from common.service import Service
@ -13,6 +15,7 @@ from datetime import datetime
from datetime import timedelta from datetime import timedelta
from oslo_config import cfg from oslo_config import cfg
from synergy.common.manager import Manager from synergy.common.manager import Manager
from synergy.exception import SynergyError
__author__ = "Lisa Zangrando" __author__ = "Lisa Zangrando"
@ -33,8 +36,8 @@ either express or implied.
See the License for the specific language governing See the License for the specific language governing
permissions and limitations under the License.""" permissions and limitations under the License."""
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class KeystoneManager(Manager): class KeystoneManager(Manager):
@ -86,10 +89,24 @@ class KeystoneManager(Manager):
cfg.StrOpt("ssl_cert_file", cfg.StrOpt("ssl_cert_file",
help="set the SSL client certificate (PEM encoded)", help="set the SSL client certificate (PEM encoded)",
default=None, default=None,
required=False),
cfg.StrOpt("amqp_url",
help="the amqp transport url",
default=None,
required=True),
cfg.StrOpt("amqp_exchange",
help="the amqp exchange",
default="keystone",
required=False),
cfg.StrOpt("amqp_topic",
help="the notification topic",
default="notification",
required=False) required=False)
] ]
def setup(self): def setup(self):
self.token = None
self.configured = False
self.auth_url = CONF.KeystoneManager.auth_url self.auth_url = CONF.KeystoneManager.auth_url
self.ssl_ca_file = CONF.KeystoneManager.ssl_ca_file self.ssl_ca_file = CONF.KeystoneManager.ssl_ca_file
self.ssl_cert_file = CONF.KeystoneManager.ssl_cert_file self.ssl_cert_file = CONF.KeystoneManager.ssl_cert_file
@ -102,51 +119,41 @@ class KeystoneManager(Manager):
self.timeout = CONF.KeystoneManager.timeout self.timeout = CONF.KeystoneManager.timeout
self.trust_expiration = CONF.KeystoneManager.trust_expiration self.trust_expiration = CONF.KeystoneManager.trust_expiration
self.clock_skew = CONF.KeystoneManager.clock_skew self.clock_skew = CONF.KeystoneManager.clock_skew
self.token = None
self.authenticate() self.authenticate()
def task(self): def task(self):
pass if not self.configured:
amqp_url = CONF.KeystoneManager.amqp_url
amqp_exchange = CONF.KeystoneManager.amqp_exchange
amqp_topic = CONF.KeystoneManager.amqp_topic
if amqp_url:
try:
self.messaging = AMQP(amqp_url)
target = self.messaging.getTarget(topic=amqp_topic,
exchange=amqp_exchange)
self.listener = self.messaging.getNotificationListener(
targets=[target],
endpoints=[self])
self.listener.start()
except Exception as ex:
LOG.info(ex)
self.configured = True
def destroy(self): def destroy(self):
pass pass
def execute(self, command, *args, **kargs): def execute(self, command, *args, **kargs):
if command == "GET_USERS": pass
return self.getUsers(*args, **kargs)
elif command == "GET_USER":
return self.getProject(*args, **kargs)
elif command == "GET_USER_ROLES":
return self.getUserRoles(*args, **kargs)
elif command == "GET_PROJECTS":
return self.getProjects()
elif command == "GET_PROJECT":
return self.getProject(*args, **kargs)
elif command == "GET_ROLES":
return self.getRoles()
elif command == "GET_ROLE":
return self.getRole(*args, **kargs)
elif command == "GET_TOKEN":
return self.getToken()
elif command == "DELETE_TOKEN":
return self.deleteToken(*args, **kargs)
elif command == "VALIDATE_TOKEN":
return self.validateToken(*args, **kargs)
elif command == "GET_ENDPOINTS":
return self.getEndpoints()
elif command == "GET_ENDPOINT":
return self.getEndpoints()
elif command == "GET_SERVICES":
return self.getServices()
elif command == "GET_SERVICE":
return self.getService(*args, **kargs)
else:
return None
def doOnEvent(self, event_type, *args, **kargs): def info(self, context, publisher_id, event_type, payload, metadata):
if event_type == "GET_PROJECTS": try:
kargs["result"].extend(self.getProjects()) self.notify(event_type, **payload)
except Exception as ex:
LOG.info(ex)
def authenticate(self): def authenticate(self):
if self.token is not None: if self.token is not None:
@ -195,7 +202,7 @@ class KeystoneManager(Manager):
response.raise_for_status() response.raise_for_status()
if not response.text: if not response.text:
raise Exception("authentication failed!") raise SynergyError("authentication failed!")
token_subject = response.headers["X-Subject-Token"] token_subject = response.headers["X-Subject-Token"]
token_data = response.json() token_data = response.json()
@ -207,8 +214,8 @@ class KeystoneManager(Manager):
response = self.getResource("users/%s" % id, "GET") response = self.getResource("users/%s" % id, "GET")
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the user info (id=%r): %s" raise SynergyError("error on retrieving the user info (id=%r): %s"
% (id, response["error"]["message"])) % (id, response["error"]["message"]))
user = None user = None
@ -251,9 +258,9 @@ class KeystoneManager(Manager):
users = user_list.values() users = user_list.values()
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the project's users " message = response["error"]["message"]
"(id=%r): %s" % (prj_id, raise SynergyError("error on retrieving the project's users "
response["error"]["message"])) "(id=%r): %s" % (prj_id, message))
else: else:
try: try:
response = self.getResource("/users", "GET") response = self.getResource("/users", "GET")
@ -268,8 +275,9 @@ class KeystoneManager(Manager):
users.append(user) users.append(user)
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the users list: %s" message = response["error"]["message"]
% response["error"]["message"]) raise SynergyError("error on retrieving the users list: %s"
% message)
return users return users
@ -279,10 +287,10 @@ class KeystoneManager(Manager):
% (project_id, user_id), "GET") % (project_id, user_id), "GET")
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the user's roles (usrId=%r, " message = response["error"]["message"]
"prjId=%r): %s" % (user_id, raise SynergyError("error on retrieving the user's roles (usrId=%r"
project_id, ", prjId=%r): %s"
response["error"]["message"])) % (user_id, project_id, message))
roles = [] roles = []
if response: if response:
@ -302,9 +310,8 @@ class KeystoneManager(Manager):
response = self.getResource("/domains/%s" % id, "GET") response = self.getResource("/domains/%s" % id, "GET")
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception( raise SynergyError("error on retrieving the domain (id=%r, msg=%s)"
"error on retrieving the domain (id=%r, msg=%s)." % % (id, response["error"]["message"]))
(id, response["error"]["message"]))
domain = None domain = None
@ -327,8 +334,8 @@ class KeystoneManager(Manager):
response = self.getResource("/domains", "GET", data=data) response = self.getResource("/domains", "GET", data=data)
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the domains list: %s" raise SynergyError("error on retrieving the domains list: %s"
% response["error"]["message"]) % response["error"]["message"])
domains = [] domains = []
@ -350,9 +357,8 @@ class KeystoneManager(Manager):
response = self.getResource("/projects/%s" % id, "GET") response = self.getResource("/projects/%s" % id, "GET")
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception( raise SynergyError("error on retrieving the project (id=%r, "
"error on retrieving the project (id=%r, msg=%s)." % "msg=%s)" % (id, response["error"]["message"]))
(id, response["error"]["message"]))
project = None project = None
@ -366,27 +372,30 @@ class KeystoneManager(Manager):
return project return project
def getProjects(self, usr_id=None, domain_id=None): def getProjects(self, usr_id=None, name=None, domain_id=None):
if usr_id: if usr_id:
try: try:
response = self.getResource( response = self.getResource(
"users/%s/projects" % usr_id, "GET") "users/%s/projects" % usr_id, "GET")
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the users's projects (id=" message = response["error"]["message"]
"%r): %s" % (usr_id, raise SynergyError("error on retrieving the users's projects "
response["error"]["message"])) "(id=%r): %s" % (usr_id, message))
else: else:
data = None data = {}
if domain_id: if domain_id:
data = {"domain_id": domain_id} data["domain_id"] = domain_id
if name:
data["name"] = name
try: try:
response = self.getResource("/projects", "GET", data=data) response = self.getResource("/projects", "GET", data=data)
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the projects list: %s" raise SynergyError("error on retrieving the projects list: %s"
% response["error"]["message"]) % response["error"]["message"])
projects = [] projects = []
@ -408,8 +417,8 @@ class KeystoneManager(Manager):
response = self.getResource("/roles/%s" % id, "GET") response = self.getResource("/roles/%s" % id, "GET")
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the role info (id=%r): %s" raise SynergyError("error on retrieving the role info (id=%r): %s"
% (id, response["error"]["message"])) % (id, response["error"]["message"]))
role = None role = None
@ -426,8 +435,8 @@ class KeystoneManager(Manager):
response = self.getResource("/roles", "GET") response = self.getResource("/roles", "GET")
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the roles list: %s" raise SynergyError("error on retrieving the roles list: %s"
% response["error"]["message"]) % response["error"]["message"])
roles = [] roles = []
@ -467,8 +476,8 @@ class KeystoneManager(Manager):
"POST", data=data, token=token) "POST", data=data, token=token)
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the trust info (id=%r): %s" raise SynergyError("error on retrieving the trust info (id=%r): %s"
% (id, response["error"]["message"])) % (id, response["error"]["message"]))
trust = Trust(response["trust"]) trust = Trust(response["trust"])
trust.keystone_url = self.auth_url trust.keystone_url = self.auth_url
@ -482,8 +491,8 @@ class KeystoneManager(Manager):
response = self.getResource("/OS-TRUST/trusts/%s" % id, "GET") response = self.getResource("/OS-TRUST/trusts/%s" % id, "GET")
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the trust info (id=%r): %s" raise SynergyError("error on retrieving the trust info (id=%r): %s"
% (id, response["error"]["message"])) % (id, response["error"]["message"]))
trust = None trust = None
@ -500,14 +509,14 @@ class KeystoneManager(Manager):
token = self.getToken() token = self.getToken()
if token.isExpired(): if token.isExpired():
raise Exception("token expired!") raise SynergyError("token expired!")
try: try:
self.getResource("/OS-TRUST/trusts/%s" % id, "DELETE", token=token) self.getResource("/OS-TRUST/trusts/%s" % id, "DELETE", token=token)
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on deleting the trust (id=%r): %s" raise SynergyError("error on deleting the trust (id=%r): %s"
% (id, response["error"]["message"])) % (id, response["error"]["message"]))
def getTrusts(self, user_id=None, isTrustor=True, token=None): def getTrusts(self, user_id=None, isTrustor=True, token=None):
url = "/OS-TRUST/trusts" url = "/OS-TRUST/trusts"
@ -523,8 +532,8 @@ class KeystoneManager(Manager):
response = self.getResource(url, "GET", token=token, data=data) response = self.getResource(url, "GET", token=token, data=data)
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the trust list (id=%r): %s" raise SynergyError("error on retrieving the trust list (id=%r): %s"
% (id, response["error"]["message"])) % (id, response["error"]["message"]))
trusts = [] trusts = []
@ -581,7 +590,7 @@ class KeystoneManager(Manager):
response.raise_for_status() response.raise_for_status()
if not response.text: if not response.text:
raise Exception("token not found!") raise SynergyError("token not found!")
token_subject = response.headers["X-Subject-Token"] token_subject = response.headers["X-Subject-Token"]
token_data = response.json() token_data = response.json()
@ -594,8 +603,8 @@ class KeystoneManager(Manager):
response = self.getResource("/endpoints/%s" % id, "GET") response = self.getResource("/endpoints/%s" % id, "GET")
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the endpoint (id=%r): %s" raise SynergyError("error on retrieving the endpoint (id=%r): "
% (id, response["error"]["message"])) "%s" % (id, response["error"]["message"]))
if response: if response:
info = response["endpoint"] info = response["endpoint"]
@ -615,9 +624,9 @@ class KeystoneManager(Manager):
endpoints = self.getEndpoints() endpoints = self.getEndpoints()
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception( message = response["error"]["message"]
"error on retrieving the endpoints list (serviceId=%r)" % raise SynergyError("error on retrieving the endpoints list "
response["error"]["message"]) "(id=%r): %s" % (service_id, message))
if endpoints: if endpoints:
for endpoint in endpoints: for endpoint in endpoints:
@ -631,8 +640,8 @@ class KeystoneManager(Manager):
response = self.getResource("/endpoints", "GET") response = self.getResource("/endpoints", "GET")
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the endpoints list: %s" raise SynergyError("error on retrieving the endpoints list: %s"
% response["error"]["message"]) % response["error"]["message"])
endpoints = [] endpoints = []
@ -660,8 +669,9 @@ class KeystoneManager(Manager):
response = self.getResource("/services/%s" % id, "GET") response = self.getResource("/services/%s" % id, "GET")
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the service info (id=%r)" message = response["error"]["message"]
": %s" % (id, response["error"]["message"])) raise SynergyError("error on retrieving the service info "
"(id=%r): %s" % (id, message))
if response: if response:
info = response["service"] info = response["service"]
@ -699,8 +709,8 @@ class KeystoneManager(Manager):
response = self.getResource("/services", "GET") response = self.getResource("/services", "GET")
except requests.exceptions.HTTPError as ex: except requests.exceptions.HTTPError as ex:
response = ex.response.json() response = ex.response.json()
raise Exception("error on retrieving the services list: %s" raise SynergyError("error on retrieving the services list: %s"
% response["error"]["message"]) % response["error"]["message"])
services = [] services = []
@ -733,7 +743,7 @@ class KeystoneManager(Manager):
self, resource, method, version=None, data=None, token=None): self, resource, method, version=None, data=None, token=None):
if token: if token:
if token.isExpired(): if token.isExpired():
raise Exception("token expired!") raise SynergyError("token expired!")
else: else:
self.authenticate() self.authenticate()
token = self.getToken() token = self.getToken()
@ -787,7 +797,7 @@ class KeystoneManager(Manager):
verify=self.ssl_ca_file, verify=self.ssl_ca_file,
cert=self.ssl_cert_file) cert=self.ssl_cert_file)
else: else:
raise Exception("wrong HTTP method: %s" % method) raise SynergyError("wrong HTTP method: %s" % method)
if response.status_code != requests.codes.ok: if response.status_code != requests.codes.ok:
response.raise_for_status() response.raise_for_status()