From 90e6d14aab9f003fbb203b89547e613bd423f018 Mon Sep 17 00:00:00 2001 From: Lisa Zangrando Date: Wed, 7 Dec 2016 13:18:42 +0100 Subject: [PATCH] Make SchedulerManager handle ERROR notifications. SchedulerManager receives all VM status changes through AMQP, but it doesn't handle the ERROR notifications (not used prior to the Liberty version) - updated unit test (test_scheduler_manager.TestNotifications.test_info_quota) Change-Id: Ia1e4db66a743f2d42f01241505310cbed37b625e Sem-Ver: bugfix Closes-bug: #1648057 --- synergy_scheduler_manager/common/server.py | 11 +- .../scheduler_manager.py | 123 ++++++++++++++---- .../functional/test_scheduler_manager.py | 104 ++++++++++++--- 3 files changed, 193 insertions(+), 45 deletions(-) diff --git a/synergy_scheduler_manager/common/server.py b/synergy_scheduler_manager/common/server.py index 8ae5229..cf71488 100644 --- a/synergy_scheduler_manager/common/server.py +++ b/synergy_scheduler_manager/common/server.py @@ -33,7 +33,10 @@ class Server(SynergyObject): if not date: return None elif isinstance(date, basestring): - return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ") + try: + return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ") + except Exception: + return datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f") elif isinstance(date, datetime): return date else: @@ -130,6 +133,12 @@ class Server(SynergyObject): def setTerminatedAt(self, terminated_at): self.set("terminated_at", self.__getDateTime(terminated_at)) + def getDeletedAt(self): + return self.get("deleted_at") + + def setDeletedAt(self, deleted_at): + self.set("deleted_at", self.__getDateTime(deleted_at)) + def isEphemeral(self): return self.get("type") == "ephemeral" diff --git a/synergy_scheduler_manager/scheduler_manager.py b/synergy_scheduler_manager/scheduler_manager.py index fb7fda4..e3564a7 100644 --- a/synergy_scheduler_manager/scheduler_manager.py +++ b/synergy_scheduler_manager/scheduler_manager.py @@ -34,10 +34,47 @@ LOG = logging.getLogger(__name__) class
Notifications(object): - def __init__(self, projects): + def __init__(self, projects, nova_manager): super(Notifications, self).__init__() self.projects = projects + self.nova_manager = nova_manager + + def _makeServer(self, server_info): + if not server_info: + return + + flavor = Flavor() + flavor.setMemory(server_info["memory_mb"]) + flavor.setVCPUs(server_info["vcpus"]) + flavor.setStorage(server_info["root_gb"]) + + if "instance_type" in server_info: + flavor.setName(server_info["instance_type"]) + + server = Server() + server.setFlavor(flavor) + server.setUserId(server_info["user_id"]) + server.setMetadata(server_info["metadata"]) + server.setDeletedAt(server_info["deleted_at"]) + server.setTerminatedAt(server_info["terminated_at"]) + + if "uuid" in server_info: + server.setId(server_info["uuid"]) + elif "instance_id" in server_info: + server.setId(server_info["instance_id"]) + + if "project_id" in server_info: + server.setProjectId(server_info["project_id"]) + elif "tenant_id" in server_info: + server.setProjectId(server_info["tenant_id"]) + + if "vm_state" in server_info: + server.setState(server_info["vm_state"]) + elif "state" in server_info: + server.setState(server_info["state"]) + + return server def info(self, ctxt, publisher_id, event_type, payload, metadata): LOG.debug("Notification INFO: event_type=%s payload=%s" @@ -52,41 +89,36 @@ class Notifications(object): (state == "deleted" or state == "error" or state == "building")) or (event_type == "compute.instance.update" and state == "error") or (event_type == "scheduler.run_instance" and state == "error")): - instance_info = None + server_info = None if event_type == "scheduler.run_instance": - instance_info = payload["request_spec"]["instance_type"] + server_info = payload["request_spec"]["instance_type"] else: - instance_info = payload + server_info = payload - if instance_info["tenant_id"] not in self.projects: + if server_info["tenant_id"] not in self.projects: return - flavor = Flavor() - 
flavor.setName(instance_info["instance_type"]) - flavor.setMemory(instance_info["memory_mb"]) - flavor.setVCPUs(instance_info["vcpus"]) - flavor.setStorage(instance_info["root_gb"]) + server = self._makeServer(server_info) + flavor = server.getFlavor() - server = Server() - server.setFlavor(flavor) - server.setId(instance_info["instance_id"]) - server.setUserId(instance_info["user_id"]) - server.setProjectId(instance_info["tenant_id"]) - server.setMetadata(instance_info["metadata"]) + message = "N/A" + if "message" in server_info: + message = server_info["message"] LOG.debug("Notification INFO (type=%s state=%s): vcpus=%s " - "memory=%s prj_id=%s server_id=%s" - % (event_type, state, flavor.getVCPUs(), + "memory=%s prj_id=%s server_id=%s (message=%s)" + % (event_type, server.getState(), flavor.getVCPUs(), flavor.getMemory(), server.getProjectId(), - server.getId())) + server.getId(), message)) quota = self.projects[server.getProjectId()].getQuota() try: quota.release(server) except Exception as ex: - LOG.warn("Notification INFO: %s" % ex) + LOG.warn("Cannot release server id=%r: %s" + % (server.getId(), ex)) LOG.error("Exception has occured", exc_info=1) def warn(self, ctxt, publisher_id, event_type, payload, metadata): @@ -96,8 +128,52 @@ class Notifications(object): "payload=%s" % (event_type, state, instance_id, payload)) def error(self, ctxt, publisher_id, event_type, payload, metadata): - LOG.debug("Notification ERROR: event_type=%s payload=%s metadata=%s" - % (event_type, payload, metadata)) + server = None + message = "N\A" + server_info = None + + if event_type == "terminate_instance": + server_info = payload["args"]["instance"] + message = payload["exception"]["value"] + + elif event_type == "compute.instance.create.error" or\ + event_type == "compute.instance.update.error": + server_info = payload + message = payload["message"] + + server = self._makeServer(server_info) + + if not server: + LOG.info("Notification ERROR: event_type=%s payload=%s" + % 
(event_type, payload)) + return + + if server.getProjectId() not in self.projects: + return + + flavor = server.getFlavor() + + LOG.debug("Notification ERROR (type=%s state=%s): vcpus=%s " + "memory=%s prj_id=%s server_id=%s (error=%s)" + % (event_type, server.getState(), flavor.getVCPUs(), + flavor.getMemory(), server.getProjectId(), + server.getId(), message)) + + if not server.getTerminatedAt() and not server.getDeletedAt(): + try: + self.nova_manager.deleteServer(server) + except Exception as ex: + LOG.error("Cannot delete server id=%r: %s" + % (server.getId(), ex)) + + quota = self.projects[server.getProjectId()].getQuota() + + try: + quota.release(server) + except Exception as ex: + LOG.warn("Cannot release server id=%r: %s" + % (server.getId(), ex)) + LOG.error("Exception has occured", exc_info=1) class Worker(Thread): @@ -415,7 +491,8 @@ class SchedulerManager(Manager): self.workers.append(dynamic_worker) - self.notifications = Notifications(self.projects) + self.notifications = Notifications(self.projects, + self.nova_manager) target = self.nova_manager.getTarget(topic='notifications', exchange="nova") diff --git a/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py b/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py index e1b888c..6924b7b 100644 --- a/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py +++ b/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py @@ -13,40 +13,95 @@ from mock import create_autospec from mock import MagicMock from sqlalchemy.engine.base import Engine +from synergy_scheduler_manager.common.flavor import Flavor from synergy_scheduler_manager.common.project import Project - from synergy_scheduler_manager.common.queue import QueueDB from synergy_scheduler_manager.common.queue import QueueItem +from synergy_scheduler_manager.common.quota import SharedQuota +from synergy_scheduler_manager.common.server import Server from synergy_scheduler_manager.scheduler_manager 
import Notifications from synergy_scheduler_manager.scheduler_manager import Worker from synergy_scheduler_manager.tests.unit import base class TestNotifications(base.TestCase): - # TO COMPLETE + def test_info_quota(self): + SharedQuota.setSize("vcpus", 20) + SharedQuota.setSize("memory", 4096) + SharedQuota.enable() - project1 = Project() - project1.setId(1) - project1.setName("test1") + self.assertEqual(20, SharedQuota.getSize('vcpus')) + self.assertEqual(4096, SharedQuota.getSize('memory')) - project2 = Project() - project2.setId(2) - project2.setName("test2") + prj_a = Project() + prj_a.setId(1) + prj_a.setName("prj_a") - prjDict = {1: project1, 2: project2} + prj_b = Project() + prj_b.setId(2) + prj_b.setName("prj_b") + + prjDict = {1: prj_a, 2: prj_b} + + quota = prjDict[1].getQuota() + + quota.setSize("vcpus", 10, private=True) + quota.setSize("memory", 2048, private=True) + + self.assertEqual(10, quota.getSize('vcpus', private=True)) + self.assertEqual(2048, quota.getSize('memory', private=True)) + + quota.setSize("vcpus", + SharedQuota.getSize('vcpus'), + private=False) + quota.setSize("memory", + SharedQuota.getSize('memory'), + private=False) + + self.assertEqual(20, quota.getSize('vcpus', private=False)) + self.assertEqual(4096, quota.getSize('memory', private=False)) + + flavor = Flavor() + flavor.setVCPUs(2) + flavor.setMemory(512) + + server = Server() + server.setType("ephemeral") + server.setId("server_id") + server.setFlavor(flavor) + + self.assertEqual(True, server.isEphemeral()) + try: + allocated = quota.allocate(server, blocking=False) + except Exception as ex: + print(ex) + + self.assertEqual(True, allocated) + + self.assertEqual(0, quota.getUsage('vcpus', private=True)) + self.assertEqual(0, quota.getUsage('memory', private=True)) + + self.assertEqual(2, quota.getUsage('vcpus', private=False)) + self.assertEqual(512, quota.getUsage('memory', private=False)) + + self.assertEqual(2, SharedQuota.getUsage('vcpus')) + self.assertEqual(512, 
SharedQuota.getUsage('memory')) + + ns = Notifications(prjDict, None) - ns = Notifications(prjDict) payload = { "state": "deleted", - "instance_type": "instance_type", - "user_id": "user_id", - "root_gb": "root_gb", - "metadata": "metadata", - "instance_id": 1, - "tenant_id": 2, - "memory_mb": 3, - "vcpus": 4} + "deleted_at": "2016-12-09T10:06:10.000000", + "terminated_at": "2016-12-09T10:06:10.025305", + "instance_type": "m1.tiny", + "user_id": "user", + "root_gb": "1", + "metadata": {}, + "instance_id": "server_id", + "tenant_id": 1, + "memory_mb": 512, + "vcpus": 2} ns.info(ctxt=None, publisher_id=None, @@ -54,9 +109,16 @@ class TestNotifications(base.TestCase): payload=payload, metadata=None) - quota = ns.projects[2].getQuota() - self.assertEqual(0, quota.getUsage("memory", private=False)) - self.assertEqual(0, quota.getUsage("vcpus", private=False)) + quota = prjDict[1].getQuota() + + self.assertEqual(0, quota.getUsage("vcpus", private=True)) + self.assertEqual(0, quota.getUsage("memory", private=True)) + + self.assertEqual(0, quota.getUsage('vcpus', private=False)) + self.assertEqual(0, quota.getUsage('memory', private=False)) + + self.assertEqual(0, SharedQuota.getUsage('vcpus')) + self.assertEqual(0, SharedQuota.getUsage('memory')) class TestWorker(base.TestCase):