diff --git a/synergy_scheduler_manager/common/queue.py b/synergy_scheduler_manager/common/queue.py index 4ae8a7c..60befab 100644 --- a/synergy_scheduler_manager/common/queue.py +++ b/synergy_scheduler_manager/common/queue.py @@ -1,5 +1,6 @@ +import heapq import json -import Queue as queue +import threading from datetime import datetime from sqlalchemy.exc import SQLAlchemyError @@ -95,13 +96,7 @@ class Queue(SynergyObject): def __init__(self, name="default", type="PRIORITY", db_engine=None): super(Queue, self).__init__() - if type == "FIFO": - self.queue = queue.Queue() - elif type == "LIFO": - self.queue = queue.LifoQueue() - elif type == "PRIORITY": - self.queue = queue.PriorityQueue() - else: + if type not in ["FIFO", "LIFO", "PRIORITY"]: raise SynergyError("queue type %r not supported" % type) self.set("type", type) @@ -109,11 +104,13 @@ class Queue(SynergyObject): self.set("size", 0) self.setName(name) self.db_engine = db_engine + self._items = [] + self.condition = threading.Condition() self._createTable() self._buildFromDB() - def _setSize(self, value): + def _incSize(self, value): size = self.get("size") self.set("size", size + value) @@ -127,12 +124,12 @@ class Queue(SynergyObject): self.set("is_closed", is_closed) def isEmpty(self): - return self.queue.empty() + return len(self._items) == 0 def close(self): self.setClosed(True) - def enqueue(self, user, data, priority=0): + def enqueue(self, user, data): if self.isClosed(): raise SynergyError("the queue is closed!") @@ -145,33 +142,47 @@ class Queue(SynergyObject): item = QueueItem() item.setUserId(user.getId()) item.setProjectId(user.getProjectId()) - item.setPriority(priority) + item.setPriority(user.getPriority().getValue()) item.setData(data) - self._insertItemDB(item) + with self.condition: + if self.getType() == "FIFO": + self._items.append(item) + elif self.getType() == "LIFO": + self._items.insert(0, item) + elif self.getType() == "PRIORITY": + heapq.heappush(self._items, (-item.getPriority(), + item.getCreationTime(), item)) - if self.getType() == "PRIORITY": - self.queue.put((-priority, item.getCreationTime(), item)) - else: - self.queue.put(item) - - self._setSize(1) + self._insertItemDB(item) + self._incSize(1) + self.condition.notifyAll() def dequeue(self, block=True, timeout=None, delete=False): if self.isClosed(): raise SynergyError("the queue is closed!") - if self.queue.empty() and not block: - return None + item = None - item = self.queue.get(block=block, timeout=timeout) + with self.condition: + while (item is None and not self.isClosed()): + if not self._items: + if block: + self.condition.wait(timeout) + if timeout: + break + else: + break + elif self.getType() == "PRIORITY": + item = heapq.heappop(self._items)[2] + else: + item = self._items.pop(0) + + self.condition.notifyAll() if not item: return None - if self.getType() == "PRIORITY": - item = item[2] - self._getItemDataDB(item) if delete: @@ -183,12 +194,44 @@ class Queue(SynergyObject): if self.isClosed(): raise SynergyError("the queue is closed!") - if self.getType() == "PRIORITY": - self.queue.put((-item.getPriority(), item.getCreationTime(), item)) - else: - self.queue.put(item) + with self.condition: + if self.getType() == "FIFO": + self._items.append(item) + elif self.getType() == "LIFO": + self._items.insert(0, item) + elif self.getType() == "PRIORITY": + heapq.heappush(self._items, (-item.getPriority(), + item.getCreationTime(), item)) - self._updateItemDB(item) + self._updateItemDB(item) + self._incSize(1) + self.condition.notifyAll() + + def updatePriority(self, user): + if self.isClosed(): + raise SynergyError("the queue is closed!") + + if self.getType() != "PRIORITY": + raise SynergyError("updatePriority() cannot be applied on this " + "queue type") + + new_items = [] + + with self.condition: + while self._items: + item = heapq.heappop(self._items)[2] + + if item.getUserId() == user.getId() and\ + item.getProjectId() == user.getProjectId(): + item.setPriority(user.getPriority().getValue()) + new_items.append(item) + + for item in new_items: + heapq.heappush(self._items, (-item.getPriority(), + item.getCreationTime(), item)) + + self._incSize(1) + self.condition.notifyAll() def getType(self): return self.get("type") @@ -230,15 +273,13 @@ class Queue(SynergyObject): connection.execute(QUERY, [item.getId()]) trans.commit() + self._incSize(-1) except SQLAlchemyError as ex: trans.rollback() raise SynergyError(ex.message) finally: connection.close() - self._setSize(-1) - self.queue.task_done() - def _createTable(self): if not self.db_engine: return @@ -282,7 +323,7 @@ TIMESTAMP NULL, `data` TEXT NOT NULL) ENGINE=InnoDB""" % self.getName() item.setLastUpdate(row[6]) self.restore(item) - self._setSize(1) + self._incSize(1) except SQLAlchemyError as ex: raise SynergyError(ex.message) finally: diff --git a/synergy_scheduler_manager/common/request.py b/synergy_scheduler_manager/common/request.py index 9597bd0..79b2a03 100644 --- a/synergy_scheduler_manager/common/request.py +++ b/synergy_scheduler_manager/common/request.py @@ -1,5 +1,3 @@ -import utils - from datetime import datetime from flavor import Flavor from server import Server @@ -109,37 +107,19 @@ class Request(object): server.setProjectId(instance_data["project_id"]) server.setCreatedAt(instance_data["created_at"]) server.setMetadata(instance_data["metadata"]) + server.setUserData(instance_data["user_data"]) server.setKeyName(instance_data["key_name"]) + server.setType() - user_data = instance_data.get("user_data", None) - if user_data: - try: - data = utils.decodeBase64(user_data) - quota = utils.getConfigParameter(data, "quota", "synergy") - if not quota: - quota = utils.getConfigParameter(data, "quota") - - metadata = instance_data.get("metadata", {}) - - if quota is None or quota == "private" or quota != "shared": - server.setType("permanent") - metadata["quota"] = "private" - - elif quota == "shared": - server.setType("ephemeral") - metadata["quota"] = "shared" - except Exception: - server.setType("permanent") - metadata["quota"] = "private" request.server = server if "filter_properties" in request.data: filter_properties = request.data["filter_properties"] - request.retry = filter_properties["retry"] + request.retry = filter_properties.get("retry", {}) else: request_spec = request.data["request_specs"][0] nova_object = request_spec["nova_object.data"] - request.retry = nova_object["retry"] + request.retry = nova_object.get("retry", {}) if not request.retry: request.retry = {} diff --git a/synergy_scheduler_manager/common/server.py b/synergy_scheduler_manager/common/server.py index 301628a..34778bd 100644 --- a/synergy_scheduler_manager/common/server.py +++ b/synergy_scheduler_manager/common/server.py @@ -1,3 +1,5 @@ +import logging +import re import utils from datetime import datetime @@ -23,6 +25,9 @@ See the License for the specific language governing permissions and limitations under the License.""" +LOG = logging.getLogger(__name__) + + class Server(SynergyObject): def __init__(self): @@ -45,8 +50,35 @@ class Server(SynergyObject): def getType(self): return self.get("type") - def setType(self, type): - self.set("type", type) + def setType(self, type=None): + if type: + self.set("type", type) + return + + metadata = self.get("metadata") + userdata = self.get("userdata") + + if "quota" in metadata: + if metadata["quota"] == "shared": + self.set("type", "ephemeral") + + elif userdata: + try: + data = userdata.splitlines() + keyValRegEx = re.compile(r'^\s*(quota)\s*=\s*(shared)\s*$') + + for row in data: + result = keyValRegEx.search(row) + if result: + self.set("type", "ephemeral") + break + except Exception as ex: + LOG.error(ex) + + if self.isPermanent(): + metadata["quota"] = "private" + else: + metadata["quota"] = "shared" def getState(self): return self.get("state") @@ -78,30 +110,11 @@ class Server(SynergyObject): def setMetadata(self, metadata): self.set("metadata", metadata) - if "quota" in metadata: - if metadata["quota"] == "shared": - self.setType("ephemeral") - else: - self.setType("permanent") - def getUserData(self): return self.get("userdata") def setUserData(self, userdata): - self.set("userdata", userdata) - - if userdata: - try: - quota = utils.getConfigParameter(userdata, "quota", "synergy") - - if quota is None or quota == "private": - self.setType("permanent") - elif quota == "shared": - self.setType("ephemeral") - else: - self.setType("permanent") - except Exception: - self.setType("permanent") + self.set("userdata", utils.decodeBase64(userdata)) def getUserId(self): return self.get("user_id") diff --git a/synergy_scheduler_manager/fairshare_manager.py b/synergy_scheduler_manager/fairshare_manager.py index 4f7149f..7a0da40 100644 --- a/synergy_scheduler_manager/fairshare_manager.py +++ b/synergy_scheduler_manager/fairshare_manager.py @@ -79,35 +79,6 @@ class FairShareManager(Manager): def destroy(self): pass - def calculatePriority(self, user_id, prj_id, timestamp=None, retry=0): - project = self.project_manager.getProject(id=prj_id) - - if not project: - raise SynergyError("project=%s not found!" % prj_id) - - user = project.getUser(id=user_id) - - if not user: - raise SynergyError("user=%s not found!" % user_id) - - priority = user.getPriority() - fairshare_vcpus = priority.getFairShare("vcpus") - fairshare_memory = priority.getFairShare("memory") - - if not timestamp: - timestamp = datetime.utcnow() - - now = datetime.utcnow() - - diff = (now - timestamp) - minutes = diff.seconds / 60 - priority = (float(self.age_weight) * minutes + - float(self.vcpus_weight) * fairshare_vcpus + - float(self.memory_weight) * fairshare_memory - - float(self.age_weight) * retry) - - return int(priority) - def doOnEvent(self, event_type, *args, **kwargs): if event_type == "USER_ADDED": user = kwargs.get("user", None) @@ -304,3 +275,5 @@ class FairShareManager(Manager): usr_priority.setValue(float(self.vcpus_weight) * f_vcpus + float(self.memory_weight) * f_memory) + + self.notify(event_type="USER_PRIORITY_UPDATED", user=user) diff --git a/synergy_scheduler_manager/nova_manager.py b/synergy_scheduler_manager/nova_manager.py index aeb2512..dc9d175 100644 --- a/synergy_scheduler_manager/nova_manager.py +++ b/synergy_scheduler_manager/nova_manager.py @@ -1,4 +1,3 @@ -import common.utils as utils import eventlet import hashlib import hmac @@ -68,6 +67,7 @@ class ServerEventHandler(object): server.setMetadata(server_info["metadata"]) server.setDeletedAt(server_info["deleted_at"]) server.setTerminatedAt(server_info["terminated_at"]) + server.setType() if "host" in server_info: server.setHost(server_info["host"]) @@ -548,6 +548,9 @@ class NovaManager(Manager): server.setName(server_data["name"]) server.setKeyName(server_data["key_name"]) server.setMetadata(server_data["metadata"]) + server.setUserData(server_data.get("OS-EXT-SRV-ATTR:user_data", + None)) + server.setType() server.setState(server_data["OS-EXT-STS:vm_state"]) server.setUserId(server_data["user_id"]) server.setProjectId(server_data["tenant_id"]) @@ -558,10 +561,6 @@ class NovaManager(Manager): server.setTerminatedAt( server_data.get("OS-SRV-USG:terminated_at", None)) - if "user_data" in server_data: - user_data = server_data["user_data"] - server.setUserData(utils.decodeBase64(user_data)) - if detail: server.setFlavor(self.getFlavor( server_data["flavor"]["id"])) @@ -587,6 +586,9 @@ class NovaManager(Manager): server.setName(server_data["name"]) server.setKeyName(server_data["key_name"]) server.setMetadata(server_data["metadata"]) + server.setUserData(server_data.get("OS-EXT-SRV-ATTR:user_data", + None)) + server.setType() server.setState(server_data["OS-EXT-STS:vm_state"]) server.setUserId(server_data["user_id"]) server.setProjectId(server_data["tenant_id"]) @@ -597,10 +599,6 @@ class NovaManager(Manager): server.setTerminatedAt( server_data.get("OS-SRV-USG:terminated_at", None)) - if "user_data" in server_data: - user_data = server_data["user_data"] - server.setUserData(utils.decodeBase64(user_data)) - if detail: server.setFlavor(self.getFlavor(server_data["flavor"]["id"])) @@ -978,9 +976,9 @@ a.launched_at<='%(to_date)s' and (a.terminated_at>='%(from_date)s' or \ try: # retrieve the amount of resources in terms of cores and memory QUERY = """select a.uuid, a.vcpus, a.memory_mb, a.root_gb, \ -a.vm_state from nova.instances as a WHERE a.project_id='%(project_id)s' \ -and a.vm_state in ('active', 'building', 'error') and a.deleted_at is NULL \ -and a.terminated_at is NULL""" % {"project_id": prj_id} +a.vm_state, a.user_data from nova.instances as a WHERE a.project_id=\ +'%(project_id)s'and a.vm_state in ('active', 'building', 'error') and \ +a.deleted_at is NULL and a.terminated_at is NULL""" % {"project_id": prj_id} LOG.debug("getProjectServers query: %s" % QUERY) @@ -995,6 +993,7 @@ and a.terminated_at is NULL""" % {"project_id": prj_id} server = Server() server.setId(row[0]) server.setState(row[4]) + server.setUserData(row[5]) server.setFlavor(flavor) QUERY = """select `key`, value from nova.instance_metadata \ @@ -1009,6 +1008,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()} metadata[row[0]] = row[1] server.setMetadata(metadata) + server.setType() servers.append(server) except SQLAlchemyError as ex: @@ -1031,7 +1031,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()} ids = "uuid in ('%s') and " % "', '".join(server_ids) QUERY = """select uuid, vcpus, memory_mb, root_gb, \ -vm_state from nova.instances where project_id = \ +vm_state, user_data from nova.instances where project_id = \ '%(project_id)s' and deleted_at is NULL and (vm_state='error' or \ (%(server_ids)s vm_state='active' and terminated_at is NULL \ and timestampdiff(minute, launched_at, utc_timestamp()) >= %(expiration)s))\ @@ -1050,6 +1050,7 @@ and timestampdiff(minute, launched_at, utc_timestamp()) >= %(expiration)s))\ server = Server() server.setId(row[0]) server.setState(row[4]) + server.setUserData(row[5]) server.setFlavor(flavor) QUERY = """select `key`, value from nova.instance_metadata \ @@ -1064,6 +1065,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()} metadata[row[0]] = row[1] server.setMetadata(metadata) + server.setType() servers.append(server) except SQLAlchemyError as ex: diff --git a/synergy_scheduler_manager/quota_manager.py b/synergy_scheduler_manager/quota_manager.py index dd8124d..0051a58 100644 --- a/synergy_scheduler_manager/quota_manager.py +++ b/synergy_scheduler_manager/quota_manager.py @@ -212,7 +212,7 @@ class QuotaManager(Manager): % (uuid, TTL, state, prj_id)) self.nova_manager.deleteServer(server) - except SynergyError as ex: + except Exception as ex: LOG.error(ex) def updateSharedQuota(self): diff --git a/synergy_scheduler_manager/scheduler_manager.py b/synergy_scheduler_manager/scheduler_manager.py index 8f807f6..eb15e2d 100644 --- a/synergy_scheduler_manager/scheduler_manager.py +++ b/synergy_scheduler_manager/scheduler_manager.py @@ -67,29 +67,29 @@ class Worker(Thread): last_release_time = SharedQuota.getLastReleaseTime() while not self.exit and not self.queue.isClosed(): - if last_release_time < SharedQuota.getLastReleaseTime(): - last_release_time = SharedQuota.getLastReleaseTime() + try: + if last_release_time < SharedQuota.getLastReleaseTime(): + last_release_time = SharedQuota.getLastReleaseTime() - while queue_items: - self.queue.restore(queue_items.pop(0)) + while queue_items: + self.queue.restore(queue_items.pop(0)) - if len(queue_items) >= self.backfill_depth: - SharedQuota.wait() - continue - - queue_item = self.queue.dequeue(block=False) - - if queue_item is None: - if self.queue.getSize(): + if len(queue_items) >= self.backfill_depth: SharedQuota.wait() continue - else: - queue_item = self.queue.dequeue(block=True) - if queue_item is None: - continue + queue_item = self.queue.dequeue(block=False) + + if queue_item is None: + if self.queue.getSize(): + SharedQuota.wait() + continue + else: + queue_item = self.queue.dequeue(block=True) + + if queue_item is None: + continue - try: request = Request.fromDict(queue_item.getData()) user_id = request.getUserId() prj_id = request.getProjectId() @@ -145,7 +145,7 @@ class Worker(Thread): "ta=shared" % (server_id, user_id, prj_id)) found = True - except SynergyError as ex: + except Exception as ex: LOG.error("error on building the server %s (reason=%s)" % (server.getId(), ex)) @@ -157,7 +157,7 @@ class Worker(Thread): else: queue_items.append(queue_item) - except SynergyError as ex: + except Exception as ex: LOG.error("Exception has occured", exc_info=1) LOG.error("Worker %s: %s" % (self.name, ex)) @@ -204,6 +204,7 @@ class SchedulerManager(Manager): self.backfill_depth = CONF.SchedulerManager.backfill_depth self.exit = False self.configured = False + self.queue = None def execute(self, command, *args, **kargs): raise SynergyError("command %r not supported!" % command) @@ -251,7 +252,9 @@ class SchedulerManager(Manager): self._processServerEvent(server, event, state) elif event_type == "SERVER_CREATE": + self._processServerCreate(kwargs["request"]) + elif event_type == "PROJECT_ADDED": if not self.configured: return @@ -261,6 +264,10 @@ class SchedulerManager(Manager): if self.queue and project: project.setQueue(self.queue) + elif event_type == "USER_PRIORITY_UPDATED": + if self.queue: + self.queue.updatePriority(kwargs.get("user", None)) + def _processServerEvent(self, server, event, state): project = self.project_manager.getProject(id=server.getProjectId()) @@ -279,7 +286,6 @@ class SchedulerManager(Manager): self.nova_manager.setServerMetadata(server, "expiration_time", expiration) - else: quota = project.getQuota() @@ -353,12 +359,6 @@ class SchedulerManager(Manager): request.getUserId(), request.getProjectId())) else: - priority = self.fairshare_manager.calculatePriority( - user_id=request.getUserId(), - prj_id=request.getProjectId(), - timestamp=request.getCreatedAt(), - retry=num_attempts) - context = request.getContext() km = self.keystone_manager @@ -381,8 +381,9 @@ class SchedulerManager(Manager): context["trust_id"] = trust.getId() user = project.getUser(id=request.getUserId()) + priority = user.getPriority().getValue() - self.queue.enqueue(user, request.toDict(), priority) + self.queue.enqueue(user, request.toDict()) LOG.info("new request: id=%s user_id=%s prj_id=%s priority" "=%s quota=shared" % (request.getId(), diff --git a/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py b/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py index c7a297f..d42f8a3 100755 --- a/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py +++ b/synergy_scheduler_manager/tests/functional/test_fairshare_manager.py @@ -10,12 +10,9 @@ # License for the specific language governing permissions and limitations # under the License. -from datetime import datetime from mock import MagicMock from mock import patch -from synergy_scheduler_manager.common.project import Project -from synergy_scheduler_manager.common.user import User from synergy_scheduler_manager.fairshare_manager import FairShareManager from synergy_scheduler_manager.project_manager import ProjectManager from synergy_scheduler_manager.tests.unit import base @@ -41,46 +38,6 @@ class TestFairshareManager(base.TestCase): with patch('synergy_scheduler_manager.fairshare_manager.CONF'): self.fairshare_manager.setup() - def test_calculate_priority_one_user(self): - # self.fairshare_manager.addProject(prj_id=1, prj_name="test") - project = Project() - project.setId(1) - project.setName("test_project") - - # Define values used for computing the priority - age_weight = self.fairshare_manager.age_weight = 1.0 - vcpus_weight = self.fairshare_manager.vcpus_weight = 2.0 - memory_weight = self.fairshare_manager.memory_weight = 3.0 - datetime_start = datetime(year=2000, month=1, day=1, hour=0, minute=0) - datetime_stop = datetime(year=2000, month=1, day=1, hour=2, minute=0) - minutes = (datetime_stop - datetime_start).seconds / 60 - fairshare_cores = 10 - fairshare_ram = 50 - - # Add a user to the project - user = User() - user.setId(22) - user.setName("test_user") - priority = user.getPriority() - priority.setFairShare('vcpus', fairshare_cores) - priority.setFairShare('memory', fairshare_ram) - - project.addUser(user) - self.project_manager.projects[project.getId()] = project - - # Compute the expected priority given the previously defined values - expected_priority = int(age_weight * minutes + - vcpus_weight * fairshare_cores + - memory_weight * fairshare_ram) - - with patch("synergy_scheduler_manager.fairshare_manager.datetime") \ - as datetime_mock: - datetime_mock.utcnow.side_effect = (datetime_start, datetime_stop) - priority = self.fairshare_manager.calculatePriority( - user.getId(), project.getId()) - - self.assertEqual(expected_priority, priority) - def test_calculate_fairshare(self): # TODO(vincent) pass diff --git a/synergy_scheduler_manager/tests/unit/test_queue.py b/synergy_scheduler_manager/tests/unit/test_queue.py index 9b0da9e..737b87f 100644 --- a/synergy_scheduler_manager/tests/unit/test_queue.py +++ b/synergy_scheduler_manager/tests/unit/test_queue.py @@ -115,8 +115,9 @@ class TestQueue(base.TestCase): user = User() user.setId(2) user.setProjectId(100) + user.getPriority().setValue(10) - self.queue.enqueue(user=user, data="mydata", priority=10) + self.queue.enqueue(user=user, data="mydata") self.assertEqual(1, self.queue.getSize()) @@ -125,8 +126,10 @@ class TestQueue(base.TestCase): user = User() user.setId(2) user.setProjectId(100) + user.getPriority().setValue(10) data = json.dumps("mydata") - self.queue.enqueue(user=user, data=data, priority=10) + + self.queue.enqueue(user=user, data=data) self.assertEqual(1, self.queue.getSize()) @@ -148,9 +151,10 @@ class TestQueue(base.TestCase): user = User() user.setId(2) user.setProjectId(100) + user.getPriority().setValue(10) data = json.dumps("mydata") - self.queue.enqueue(user=user, data=data, priority=10) + self.queue.enqueue(user=user, data=data) # Mock the DB execute_mock = self.db_engine_mock.connect().execute