From 56c8d155ace8643755209c202caaa724178ea722 Mon Sep 17 00:00:00 2001 From: Lisa Zangrando Date: Fri, 15 Sep 2017 10:41:29 +0200 Subject: [PATCH] Synergy should scale up the oldest user requests from the queue The oldest requests should be processed first as soon as the user's priority increases. This commit fixes the issue. BUG: #1717464 Change-Id: Iafc74531775d00aeb653cc92092b2bc7775f52d8 Sem-Ver: bugfix --- synergy_scheduler_manager/common/queue.py | 109 ++++++++++++------ synergy_scheduler_manager/common/request.py | 28 +---- synergy_scheduler_manager/common/server.py | 57 +++++---- .../fairshare_manager.py | 31 +---- synergy_scheduler_manager/nova_manager.py | 28 ++--- synergy_scheduler_manager/quota_manager.py | 2 +- .../scheduler_manager.py | 55 ++++----- .../functional/test_fairshare_manager.py | 43 ------- .../tests/unit/test_queue.py | 10 +- 9 files changed, 167 insertions(+), 196 deletions(-) diff --git a/synergy_scheduler_manager/common/queue.py b/synergy_scheduler_manager/common/queue.py index 4ae8a7c..60befab 100644 --- a/synergy_scheduler_manager/common/queue.py +++ b/synergy_scheduler_manager/common/queue.py @@ -1,5 +1,6 @@ +import heapq import json -import Queue as queue +import threading from datetime import datetime from sqlalchemy.exc import SQLAlchemyError @@ -95,13 +96,7 @@ class Queue(SynergyObject): def __init__(self, name="default", type="PRIORITY", db_engine=None): super(Queue, self).__init__() - if type == "FIFO": - self.queue = queue.Queue() - elif type == "LIFO": - self.queue = queue.LifoQueue() - elif type == "PRIORITY": - self.queue = queue.PriorityQueue() - else: + if type not in ["FIFO", "LIFO", "PRIORITY"]: raise SynergyError("queue type %r not supported" % type) self.set("type", type) @@ -109,11 +104,13 @@ class Queue(SynergyObject): self.set("size", 0) self.setName(name) self.db_engine = db_engine + self._items = [] + self.condition = threading.Condition() self._createTable() self._buildFromDB() - def _setSize(self, value): 
+ def _incSize(self, value): size = self.get("size") self.set("size", size + value) @@ -127,12 +124,12 @@ class Queue(SynergyObject): self.set("is_closed", is_closed) def isEmpty(self): - return self.queue.empty() + return len(self._items) == 0 def close(self): self.setClosed(True) - def enqueue(self, user, data, priority=0): + def enqueue(self, user, data): if self.isClosed(): raise SynergyError("the queue is closed!") @@ -145,33 +142,47 @@ class Queue(SynergyObject): item = QueueItem() item.setUserId(user.getId()) item.setProjectId(user.getProjectId()) - item.setPriority(priority) + item.setPriority(user.getPriority().getValue()) item.setData(data) - self._insertItemDB(item) + with self.condition: + if self.getType() == "FIFO": + self._items.append(item) + elif self.getType() == "LIFO": + self._items.insert(0, item) + elif self.getType() == "PRIORITY": + heapq.heappush(self._items, (-item.getPriority(), + item.getCreationTime(), item)) - if self.getType() == "PRIORITY": - self.queue.put((-priority, item.getCreationTime(), item)) - else: - self.queue.put(item) - - self._setSize(1) + self._insertItemDB(item) + self._incSize(1) + self.condition.notifyAll() def dequeue(self, block=True, timeout=None, delete=False): if self.isClosed(): raise SynergyError("the queue is closed!") - if self.queue.empty() and not block: - return None + item = None - item = self.queue.get(block=block, timeout=timeout) + with self.condition: + while (item is None and not self.isClosed()): + if not self._items: + if block: + self.condition.wait(timeout) + if timeout: + break + else: + break + elif self.getType() == "PRIORITY": + item = heapq.heappop(self._items)[2] + else: + item = self._items.pop(0) + + self.condition.notifyAll() if not item: return None - if self.getType() == "PRIORITY": - item = item[2] - self._getItemDataDB(item) if delete: @@ -183,12 +194,44 @@ class Queue(SynergyObject): if self.isClosed(): raise SynergyError("the queue is closed!") - if self.getType() == "PRIORITY": - 
self.queue.put((-item.getPriority(), item.getCreationTime(), item)) - else: - self.queue.put(item) + with self.condition: + if self.getType() == "FIFO": + self._items.append(item) + elif self.getType() == "LIFO": + self._items.insert(0, item) + elif self.getType() == "PRIORITY": + heapq.heappush(self._items, (-item.getPriority(), + item.getCreationTime(), item)) - self._updateItemDB(item) + self._updateItemDB(item) + self._incSize(1) + self.condition.notifyAll() + + def updatePriority(self, user): + if self.isClosed(): + raise SynergyError("the queue is closed!") + + if self.getType() != "PRIORITY": + raise SynergyError("updatePriority() cannot be applied on this " + "queue type") + + new_items = [] + + with self.condition: + while self._items: + item = heapq.heappop(self._items)[2] + + if item.getUserId() == user.getId() and\ + item.getProjectId() == user.getProjectId(): + item.setPriority(user.getPriority().getValue()) + new_items.append(item) + + for item in new_items: + heapq.heappush(self._items, (-item.getPriority(), + item.getCreationTime(), item)) + + self._incSize(1) + self.condition.notifyAll() def getType(self): return self.get("type") @@ -230,15 +273,13 @@ class Queue(SynergyObject): connection.execute(QUERY, [item.getId()]) trans.commit() + self._incSize(-1) except SQLAlchemyError as ex: trans.rollback() raise SynergyError(ex.message) finally: connection.close() - self._setSize(-1) - self.queue.task_done() - def _createTable(self): if not self.db_engine: return @@ -282,7 +323,7 @@ TIMESTAMP NULL, `data` TEXT NOT NULL) ENGINE=InnoDB""" % self.getName() item.setLastUpdate(row[6]) self.restore(item) - self._setSize(1) + self._incSize(1) except SQLAlchemyError as ex: raise SynergyError(ex.message) finally: diff --git a/synergy_scheduler_manager/common/request.py b/synergy_scheduler_manager/common/request.py index 9597bd0..79b2a03 100644 --- a/synergy_scheduler_manager/common/request.py +++ b/synergy_scheduler_manager/common/request.py @@ -1,5 +1,3 @@ 
-import utils - from datetime import datetime from flavor import Flavor from server import Server @@ -109,37 +107,19 @@ class Request(object): server.setProjectId(instance_data["project_id"]) server.setCreatedAt(instance_data["created_at"]) server.setMetadata(instance_data["metadata"]) + server.setUserData(instance_data["user_data"]) server.setKeyName(instance_data["key_name"]) + server.setType() - user_data = instance_data.get("user_data", None) - if user_data: - try: - data = utils.decodeBase64(user_data) - quota = utils.getConfigParameter(data, "quota", "synergy") - if not quota: - quota = utils.getConfigParameter(data, "quota") - - metadata = instance_data.get("metadata", {}) - - if quota is None or quota == "private" or quota != "shared": - server.setType("permanent") - metadata["quota"] = "private" - - elif quota == "shared": - server.setType("ephemeral") - metadata["quota"] = "shared" - except Exception: - server.setType("permanent") - metadata["quota"] = "private" request.server = server if "filter_properties" in request.data: filter_properties = request.data["filter_properties"] - request.retry = filter_properties["retry"] + request.retry = filter_properties.get("retry", {}) else: request_spec = request.data["request_specs"][0] nova_object = request_spec["nova_object.data"] - request.retry = nova_object["retry"] + request.retry = nova_object.get("retry", {}) if not request.retry: request.retry = {} diff --git a/synergy_scheduler_manager/common/server.py b/synergy_scheduler_manager/common/server.py index 301628a..34778bd 100644 --- a/synergy_scheduler_manager/common/server.py +++ b/synergy_scheduler_manager/common/server.py @@ -1,3 +1,5 @@ +import logging +import re import utils from datetime import datetime @@ -23,6 +25,9 @@ See the License for the specific language governing permissions and limitations under the License.""" +LOG = logging.getLogger(__name__) + + class Server(SynergyObject): def __init__(self): @@ -45,8 +50,35 @@ class 
Server(SynergyObject): def getType(self): return self.get("type") - def setType(self, type): - self.set("type", type) + def setType(self, type=None): + if type: + self.set("type", type) + return + + metadata = self.get("metadata") + userdata = self.get("userdata") + + if "quota" in metadata: + if metadata["quota"] == "shared": + self.set("type", "ephemeral") + + elif userdata: + try: + data = userdata.splitlines() + keyValRegEx = re.compile(r'^\s*(quota)\s*=\s*(shared)\s*$') + + for row in data: + result = keyValRegEx.search(row) + if result: + self.set("type", "ephemeral") + break + except Exception as ex: + LOG.error(ex) + + if self.isPermanent(): + metadata["quota"] = "private" + else: + metadata["quota"] = "shared" def getState(self): return self.get("state") @@ -78,30 +110,11 @@ class Server(SynergyObject): def setMetadata(self, metadata): self.set("metadata", metadata) - if "quota" in metadata: - if metadata["quota"] == "shared": - self.setType("ephemeral") - else: - self.setType("permanent") - def getUserData(self): return self.get("userdata") def setUserData(self, userdata): - self.set("userdata", userdata) - - if userdata: - try: - quota = utils.getConfigParameter(userdata, "quota", "synergy") - - if quota is None or quota == "private": - self.setType("permanent") - elif quota == "shared": - self.setType("ephemeral") - else: - self.setType("permanent") - except Exception: - self.setType("permanent") + self.set("userdata", utils.decodeBase64(userdata)) def getUserId(self): return self.get("user_id") diff --git a/synergy_scheduler_manager/fairshare_manager.py b/synergy_scheduler_manager/fairshare_manager.py index 4f7149f..7a0da40 100644 --- a/synergy_scheduler_manager/fairshare_manager.py +++ b/synergy_scheduler_manager/fairshare_manager.py @@ -79,35 +79,6 @@ class FairShareManager(Manager): def destroy(self): pass - def calculatePriority(self, user_id, prj_id, timestamp=None, retry=0): - project = self.project_manager.getProject(id=prj_id) - - if not 
project: - raise SynergyError("project=%s not found!" % prj_id) - - user = project.getUser(id=user_id) - - if not user: - raise SynergyError("user=%s not found!" % user_id) - - priority = user.getPriority() - fairshare_vcpus = priority.getFairShare("vcpus") - fairshare_memory = priority.getFairShare("memory") - - if not timestamp: - timestamp = datetime.utcnow() - - now = datetime.utcnow() - - diff = (now - timestamp) - minutes = diff.seconds / 60 - priority = (float(self.age_weight) * minutes + - float(self.vcpus_weight) * fairshare_vcpus + - float(self.memory_weight) * fairshare_memory - - float(self.age_weight) * retry) - - return int(priority) - def doOnEvent(self, event_type, *args, **kwargs): if event_type == "USER_ADDED": user = kwargs.get("user", None) @@ -304,3 +275,5 @@ class FairShareManager(Manager): usr_priority.setValue(float(self.vcpus_weight) * f_vcpus + float(self.memory_weight) * f_memory) + + self.notify(event_type="USER_PRIORITY_UPDATED", user=user) diff --git a/synergy_scheduler_manager/nova_manager.py b/synergy_scheduler_manager/nova_manager.py index aeb2512..dc9d175 100644 --- a/synergy_scheduler_manager/nova_manager.py +++ b/synergy_scheduler_manager/nova_manager.py @@ -1,4 +1,3 @@ -import common.utils as utils import eventlet import hashlib import hmac @@ -68,6 +67,7 @@ class ServerEventHandler(object): server.setMetadata(server_info["metadata"]) server.setDeletedAt(server_info["deleted_at"]) server.setTerminatedAt(server_info["terminated_at"]) + server.setType() if "host" in server_info: server.setHost(server_info["host"]) @@ -548,6 +548,9 @@ class NovaManager(Manager): server.setName(server_data["name"]) server.setKeyName(server_data["key_name"]) server.setMetadata(server_data["metadata"]) + server.setUserData(server_data.get("OS-EXT-SRV-ATTR:user_data", + None)) + server.setType() server.setState(server_data["OS-EXT-STS:vm_state"]) server.setUserId(server_data["user_id"]) server.setProjectId(server_data["tenant_id"]) @@ -558,10 +561,6 @@ 
class NovaManager(Manager): server.setTerminatedAt( server_data.get("OS-SRV-USG:terminated_at", None)) - if "user_data" in server_data: - user_data = server_data["user_data"] - server.setUserData(utils.decodeBase64(user_data)) - if detail: server.setFlavor(self.getFlavor( server_data["flavor"]["id"])) @@ -587,6 +586,9 @@ class NovaManager(Manager): server.setName(server_data["name"]) server.setKeyName(server_data["key_name"]) server.setMetadata(server_data["metadata"]) + server.setUserData(server_data.get("OS-EXT-SRV-ATTR:user_data", + None)) + server.setType() server.setState(server_data["OS-EXT-STS:vm_state"]) server.setUserId(server_data["user_id"]) server.setProjectId(server_data["tenant_id"]) @@ -597,10 +599,6 @@ class NovaManager(Manager): server.setTerminatedAt( server_data.get("OS-SRV-USG:terminated_at", None)) - if "user_data" in server_data: - user_data = server_data["user_data"] - server.setUserData(utils.decodeBase64(user_data)) - if detail: server.setFlavor(self.getFlavor(server_data["flavor"]["id"])) @@ -978,9 +976,9 @@ a.launched_at<='%(to_date)s' and (a.terminated_at>='%(from_date)s' or \ try: # retrieve the amount of resources in terms of cores and memory QUERY = """select a.uuid, a.vcpus, a.memory_mb, a.root_gb, \ -a.vm_state from nova.instances as a WHERE a.project_id='%(project_id)s' \ -and a.vm_state in ('active', 'building', 'error') and a.deleted_at is NULL \ -and a.terminated_at is NULL""" % {"project_id": prj_id} +a.vm_state, a.user_data from nova.instances as a WHERE a.project_id=\ +'%(project_id)s'and a.vm_state in ('active', 'building', 'error') and \ +a.deleted_at is NULL and a.terminated_at is NULL""" % {"project_id": prj_id} LOG.debug("getProjectServers query: %s" % QUERY) @@ -995,6 +993,7 @@ and a.terminated_at is NULL""" % {"project_id": prj_id} server = Server() server.setId(row[0]) server.setState(row[4]) + server.setUserData(row[5]) server.setFlavor(flavor) QUERY = """select `key`, value from nova.instance_metadata \ @@ -1009,6 
+1008,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()} metadata[row[0]] = row[1] server.setMetadata(metadata) + server.setType() servers.append(server) except SQLAlchemyError as ex: @@ -1031,7 +1031,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()} ids = "uuid in ('%s') and " % "', '".join(server_ids) QUERY = """select uuid, vcpus, memory_mb, root_gb, \ -vm_state from nova.instances where project_id = \ +vm_state, user_data from nova.instances where project_id = \ '%(project_id)s' and deleted_at is NULL and (vm_state='error' or \ (%(server_ids)s vm_state='active' and terminated_at is NULL \ and timestampdiff(minute, launched_at, utc_timestamp()) >= %(expiration)s))\ @@ -1050,6 +1050,7 @@ and timestampdiff(minute, launched_at, utc_timestamp()) >= %(expiration)s))\ server = Server() server.setId(row[0]) server.setState(row[4]) + server.setUserData(row[5]) server.setFlavor(flavor) QUERY = """select `key`, value from nova.instance_metadata \ @@ -1064,6 +1065,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()} metadata[row[0]] = row[1] server.setMetadata(metadata) + server.setType() servers.append(server) except SQLAlchemyError as ex: diff --git a/synergy_scheduler_manager/quota_manager.py b/synergy_scheduler_manager/quota_manager.py index dd8124d..0051a58 100644 --- a/synergy_scheduler_manager/quota_manager.py +++ b/synergy_scheduler_manager/quota_manager.py @@ -212,7 +212,7 @@ class QuotaManager(Manager): % (uuid, TTL, state, prj_id)) self.nova_manager.deleteServer(server) - except SynergyError as ex: + except Exception as ex: LOG.error(ex) def updateSharedQuota(self): diff --git a/synergy_scheduler_manager/scheduler_manager.py b/synergy_scheduler_manager/scheduler_manager.py index 8f807f6..eb15e2d 100644 --- a/synergy_scheduler_manager/scheduler_manager.py +++ b/synergy_scheduler_manager/scheduler_manager.py @@ -67,29 +67,29 @@ class Worker(Thread): 
last_release_time = SharedQuota.getLastReleaseTime() while not self.exit and not self.queue.isClosed(): - if last_release_time < SharedQuota.getLastReleaseTime(): - last_release_time = SharedQuota.getLastReleaseTime() + try: + if last_release_time < SharedQuota.getLastReleaseTime(): + last_release_time = SharedQuota.getLastReleaseTime() - while queue_items: - self.queue.restore(queue_items.pop(0)) + while queue_items: + self.queue.restore(queue_items.pop(0)) - if len(queue_items) >= self.backfill_depth: - SharedQuota.wait() - continue - - queue_item = self.queue.dequeue(block=False) - - if queue_item is None: - if self.queue.getSize(): + if len(queue_items) >= self.backfill_depth: SharedQuota.wait() continue - else: - queue_item = self.queue.dequeue(block=True) - if queue_item is None: - continue + queue_item = self.queue.dequeue(block=False) + + if queue_item is None: + if self.queue.getSize(): + SharedQuota.wait() + continue + else: + queue_item = self.queue.dequeue(block=True) + + if queue_item is None: + continue - try: request = Request.fromDict(queue_item.getData()) user_id = request.getUserId() prj_id = request.getProjectId() @@ -145,7 +145,7 @@ class Worker(Thread): "ta=shared" % (server_id, user_id, prj_id)) found = True - except SynergyError as ex: + except Exception as ex: LOG.error("error on building the server %s (reason=%s)" % (server.getId(), ex)) @@ -157,7 +157,7 @@ class Worker(Thread): else: queue_items.append(queue_item) - except SynergyError as ex: + except Exception as ex: LOG.error("Exception has occured", exc_info=1) LOG.error("Worker %s: %s" % (self.name, ex)) @@ -204,6 +204,7 @@ class SchedulerManager(Manager): self.backfill_depth = CONF.SchedulerManager.backfill_depth self.exit = False self.configured = False + self.queue = None def execute(self, command, *args, **kargs): raise SynergyError("command %r not supported!" 
% command) @@ -251,7 +252,9 @@ class SchedulerManager(Manager): self._processServerEvent(server, event, state) elif event_type == "SERVER_CREATE": + self._processServerCreate(kwargs["request"]) + elif event_type == "PROJECT_ADDED": if not self.configured: return @@ -261,6 +264,10 @@ class SchedulerManager(Manager): if self.queue and project: project.setQueue(self.queue) + elif event_type == "USER_PRIORITY_UPDATED": + if self.queue: + self.queue.updatePriority(kwargs.get("user", None)) + def _processServerEvent(self, server, event, state): project = self.project_manager.getProject(id=server.getProjectId()) @@ -279,7 +286,6 @@ class SchedulerManager(Manager): self.nova_manager.setServerMetadata(server, "expiration_time", expiration) - else: quota = project.getQuota() @@ -353,12 +359,6 @@ class SchedulerManager(Manager): request.getUserId(), request.getProjectId())) else: - priority = self.fairshare_manager.calculatePriority( - user_id=request.getUserId(), - prj_id=request.getProjectId(), - timestamp=request.getCreatedAt(), - retry=num_attempts) - context = request.getContext() km = self.keystone_manager @@ -381,8 +381,9 @@ class SchedulerManager(Manager): context["trust_id"] = trust.getId() user = project.getUser(id=request.getUserId()) + priority = user.getPriority().getValue() - self.queue.enqueue(user, request.toDict(), priority) + self.queue.enqueue(user, request.toDict()) LOG.info("new request: id=%s user_id=%s prj_id=%s priority" "=%s quota=shared" % (request.getId(), diff --git a/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py b/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py index c7a297f..d42f8a3 100755 --- a/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py +++ b/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py @@ -10,12 +10,9 @@ # License for the specific language governing permissions and limitations # under the License. 
-from datetime import datetime from mock import MagicMock from mock import patch -from synergy_scheduler_manager.common.project import Project -from synergy_scheduler_manager.common.user import User from synergy_scheduler_manager.fairshare_manager import FairShareManager from synergy_scheduler_manager.project_manager import ProjectManager from synergy_scheduler_manager.tests.unit import base @@ -41,46 +38,6 @@ class TestFairshareManager(base.TestCase): with patch('synergy_scheduler_manager.fairshare_manager.CONF'): self.fairshare_manager.setup() - def test_calculate_priority_one_user(self): - # self.fairshare_manager.addProject(prj_id=1, prj_name="test") - project = Project() - project.setId(1) - project.setName("test_project") - - # Define values used for computing the priority - age_weight = self.fairshare_manager.age_weight = 1.0 - vcpus_weight = self.fairshare_manager.vcpus_weight = 2.0 - memory_weight = self.fairshare_manager.memory_weight = 3.0 - datetime_start = datetime(year=2000, month=1, day=1, hour=0, minute=0) - datetime_stop = datetime(year=2000, month=1, day=1, hour=2, minute=0) - minutes = (datetime_stop - datetime_start).seconds / 60 - fairshare_cores = 10 - fairshare_ram = 50 - - # Add a user to the project - user = User() - user.setId(22) - user.setName("test_user") - priority = user.getPriority() - priority.setFairShare('vcpus', fairshare_cores) - priority.setFairShare('memory', fairshare_ram) - - project.addUser(user) - self.project_manager.projects[project.getId()] = project - - # Compute the expected priority given the previously defined values - expected_priority = int(age_weight * minutes + - vcpus_weight * fairshare_cores + - memory_weight * fairshare_ram) - - with patch("synergy_scheduler_manager.fairshare_manager.datetime") \ - as datetime_mock: - datetime_mock.utcnow.side_effect = (datetime_start, datetime_stop) - priority = self.fairshare_manager.calculatePriority( - user.getId(), project.getId()) - - self.assertEqual(expected_priority, 
priority) - def test_calculate_fairshare(self): # TODO(vincent) pass diff --git a/synergy_scheduler_manager/tests/unit/test_queue.py b/synergy_scheduler_manager/tests/unit/test_queue.py index 9b0da9e..737b87f 100644 --- a/synergy_scheduler_manager/tests/unit/test_queue.py +++ b/synergy_scheduler_manager/tests/unit/test_queue.py @@ -115,8 +115,9 @@ class TestQueue(base.TestCase): user = User() user.setId(2) user.setProjectId(100) + user.getPriority().setValue(10) - self.queue.enqueue(user=user, data="mydata", priority=10) + self.queue.enqueue(user=user, data="mydata") self.assertEqual(1, self.queue.getSize()) @@ -125,8 +126,10 @@ class TestQueue(base.TestCase): user = User() user.setId(2) user.setProjectId(100) + user.getPriority().setValue(10) data = json.dumps("mydata") - self.queue.enqueue(user=user, data=data, priority=10) + + self.queue.enqueue(user=user, data=data) self.assertEqual(1, self.queue.getSize()) @@ -148,9 +151,10 @@ class TestQueue(base.TestCase): user = User() user.setId(2) user.setProjectId(100) + user.getPriority().setValue(10) data = json.dumps("mydata") - self.queue.enqueue(user=user, data=data, priority=10) + self.queue.enqueue(user=user, data=data) # Mock the DB execute_mock = self.db_engine_mock.connect().execute