Synergy should scale up the oldest user requests from the queue
The oldest requests should be processed first as soon as the user
priority increases. This commit fixes the issue.

BUG: #1717464
Change-Id: Iafc74531775d00aeb653cc92092b2bc7775f52d8
Sem-Ver: bugfix
parent a45e054ec9
commit 56c8d155ac
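The heart of the fix is the ordering key used for the PRIORITY queue: items
are stored in a heapq min-heap as (-priority, creation_time, item) tuples, so
the highest priority is popped first and, among equal priorities, the oldest
request wins. A minimal sketch of that rule (illustrative names, not the
project's classes):

    import heapq
    from datetime import datetime, timedelta

    heap = []
    now = datetime.utcnow()

    # Three requests with the same priority but different ages.
    for age, name in [(30, "oldest"), (20, "middle"), (10, "newest")]:
        heapq.heappush(heap, (-10, now - timedelta(minutes=age), name))

    # A younger request with a higher priority still jumps the line.
    heapq.heappush(heap, (-20, now, "high-priority"))

    while heap:
        priority, created, name = heapq.heappop(heap)
        print("%s %s" % (-priority, name))
    # -> 20 high-priority, then 10 oldest, 10 middle, 10 newest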
@@ -1,5 +1,6 @@
-import Queue as queue
+import heapq
+import json
 import threading
 
 from datetime import datetime
 from sqlalchemy.exc import SQLAlchemyError
@@ -95,13 +96,7 @@ class Queue(SynergyObject):
     def __init__(self, name="default", type="PRIORITY", db_engine=None):
         super(Queue, self).__init__()
 
-        if type == "FIFO":
-            self.queue = queue.Queue()
-        elif type == "LIFO":
-            self.queue = queue.LifoQueue()
-        elif type == "PRIORITY":
-            self.queue = queue.PriorityQueue()
-        else:
+        if type not in ["FIFO", "LIFO", "PRIORITY"]:
             raise SynergyError("queue type %r not supported" % type)
 
         self.set("type", type)
@@ -109,11 +104,13 @@ class Queue(SynergyObject):
         self.set("size", 0)
         self.setName(name)
         self.db_engine = db_engine
+        self._items = []
         self.condition = threading.Condition()
 
         self._createTable()
+        self._buildFromDB()
 
-    def _setSize(self, value):
+    def _incSize(self, value):
         size = self.get("size")
         self.set("size", size + value)
 
@@ -127,12 +124,12 @@ class Queue(SynergyObject):
         self.set("is_closed", is_closed)
 
     def isEmpty(self):
-        return self.queue.empty()
+        return len(self._items) == 0
 
     def close(self):
         self.setClosed(True)
 
-    def enqueue(self, user, data, priority=0):
+    def enqueue(self, user, data):
         if self.isClosed():
             raise SynergyError("the queue is closed!")
 
@@ -145,33 +142,47 @@ class Queue(SynergyObject):
         item = QueueItem()
         item.setUserId(user.getId())
         item.setProjectId(user.getProjectId())
-        item.setPriority(priority)
+        item.setPriority(user.getPriority().getValue())
         item.setData(data)
 
-        self._insertItemDB(item)
-
-        if self.getType() == "PRIORITY":
-            self.queue.put((-priority, item.getCreationTime(), item))
-        else:
-            self.queue.put(item)
-
-        self._setSize(1)
+        with self.condition:
+            if self.getType() == "FIFO":
+                self._items.append(item)
+            elif self.getType() == "LIFO":
+                self._items.insert(0, item)
+            elif self.getType() == "PRIORITY":
+                heapq.heappush(self._items, (-item.getPriority(),
+                                             item.getCreationTime(), item))
+
+            self._insertItemDB(item)
+            self._incSize(1)
+            self.condition.notifyAll()
 
     def dequeue(self, block=True, timeout=None, delete=False):
         if self.isClosed():
             raise SynergyError("the queue is closed!")
 
-        if self.queue.empty() and not block:
-            return None
-
-        item = self.queue.get(block=block, timeout=timeout)
+        item = None
+
+        with self.condition:
+            while (item is None and not self.isClosed()):
+                if not self._items:
+                    if block:
+                        self.condition.wait(timeout)
+                        if timeout:
+                            break
+                    else:
+                        break
+                elif self.getType() == "PRIORITY":
+                    item = heapq.heappop(self._items)[2]
+                else:
+                    item = self._items.pop(0)
+
+            self.condition.notifyAll()
 
         if not item:
             return None
 
-        if self.getType() == "PRIORITY":
-            item = item[2]
-
         self._getItemDataDB(item)
 
         if delete:
@@ -183,12 +194,44 @@ class Queue(SynergyObject):
         if self.isClosed():
             raise SynergyError("the queue is closed!")
 
-        if self.getType() == "PRIORITY":
-            self.queue.put((-item.getPriority(), item.getCreationTime(), item))
-        else:
-            self.queue.put(item)
+        with self.condition:
+            if self.getType() == "FIFO":
+                self._items.append(item)
+            elif self.getType() == "LIFO":
+                self._items.insert(0, item)
+            elif self.getType() == "PRIORITY":
+                heapq.heappush(self._items, (-item.getPriority(),
+                                             item.getCreationTime(), item))
 
-        self._updateItemDB(item)
+            self._updateItemDB(item)
+            self._incSize(1)
+            self.condition.notifyAll()
+
+    def updatePriority(self, user):
+        if self.isClosed():
+            raise SynergyError("the queue is closed!")
+
+        if self.getType() != "PRIORITY":
+            raise SynergyError("updatePriority() cannot be applied on this "
+                               "queue type")
+
+        new_items = []
+
+        with self.condition:
+            while self._items:
+                item = heapq.heappop(self._items)[2]
+
+                if item.getUserId() == user.getId() and\
+                        item.getProjectId() == user.getProjectId():
+                    item.setPriority(user.getPriority().getValue())
+                new_items.append(item)
+
+            for item in new_items:
+                heapq.heappush(self._items, (-item.getPriority(),
+                                             item.getCreationTime(), item))
+
+            self._incSize(1)
+            self.condition.notifyAll()
 
     def getType(self):
         return self.get("type")
@@ -230,15 +273,13 @@ class Queue(SynergyObject):
                 connection.execute(QUERY, [item.getId()])
 
                 trans.commit()
+                self._incSize(-1)
             except SQLAlchemyError as ex:
                 trans.rollback()
                 raise SynergyError(ex.message)
             finally:
                 connection.close()
 
-        self._setSize(-1)
-        self.queue.task_done()
-
     def _createTable(self):
         if not self.db_engine:
             return
@@ -282,7 +323,7 @@ TIMESTAMP NULL, `data` TEXT NOT NULL) ENGINE=InnoDB""" % self.getName()
                     item.setLastUpdate(row[6])
 
                     self.restore(item)
-                    self._setSize(1)
+                    self._incSize(1)
            except SQLAlchemyError as ex:
                raise SynergyError(ex.message)
            finally:
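The new Queue.updatePriority() above reorders pending work by draining the
heap, bumping every item that belongs to the given user, and pushing
everything back so the heap invariant is restored. A toy version of the same
drain-and-repush pattern (plain dicts stand in for QueueItem; this is not the
project's API):

    import heapq

    def update_priority(heap, user_id, new_priority):
        items = []
        while heap:
            item = heapq.heappop(heap)[2]        # discard the old sort key
            if item["user_id"] == user_id:
                item["priority"] = new_priority  # bump this user's requests
            items.append(item)
        for item in items:                       # re-push with fresh keys
            heapq.heappush(heap, (-item["priority"], item["created"], item))

    heap = []
    heapq.heappush(heap, (-5, 1, {"user_id": 1, "priority": 5, "created": 1}))
    heapq.heappush(heap, (-9, 2, {"user_id": 2, "priority": 9, "created": 2}))
    update_priority(heap, user_id=1, new_priority=20)
    print(heapq.heappop(heap)[2]["user_id"])  # -> 1: user 1 dequeues first now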
@@ -1,5 +1,3 @@
-import utils
-
 from datetime import datetime
 from flavor import Flavor
 from server import Server
@@ -109,37 +107,19 @@ class Request(object):
             server.setProjectId(instance_data["project_id"])
             server.setCreatedAt(instance_data["created_at"])
             server.setMetadata(instance_data["metadata"])
+            server.setUserData(instance_data["user_data"])
             server.setKeyName(instance_data["key_name"])
+            server.setType()
 
-            user_data = instance_data.get("user_data", None)
-            if user_data:
-                try:
-                    data = utils.decodeBase64(user_data)
-                    quota = utils.getConfigParameter(data, "quota", "synergy")
-                    if not quota:
-                        quota = utils.getConfigParameter(data, "quota")
-
-                    metadata = instance_data.get("metadata", {})
-
-                    if quota is None or quota == "private" or quota != "shared":
-                        server.setType("permanent")
-                        metadata["quota"] = "private"
-
-                    elif quota == "shared":
-                        server.setType("ephemeral")
-                        metadata["quota"] = "shared"
-                except Exception:
-                    server.setType("permanent")
-                    metadata["quota"] = "private"
             request.server = server
 
             if "filter_properties" in request.data:
                 filter_properties = request.data["filter_properties"]
-                request.retry = filter_properties["retry"]
+                request.retry = filter_properties.get("retry", {})
             else:
                 request_spec = request.data["request_specs"][0]
                 nova_object = request_spec["nova_object.data"]
-                request.retry = nova_object["retry"]
+                request.retry = nova_object.get("retry", {})
 
-            if not request.retry:
-                request.retry = {}
@@ -1,3 +1,5 @@
+import logging
+import re
 import utils
 
 from datetime import datetime
@@ -23,6 +25,9 @@ See the License for the specific language governing
 permissions and limitations under the License."""
 
 
+LOG = logging.getLogger(__name__)
+
+
 class Server(SynergyObject):
 
     def __init__(self):
@@ -45,8 +50,35 @@ class Server(SynergyObject):
     def getType(self):
         return self.get("type")
 
-    def setType(self, type):
-        self.set("type", type)
+    def setType(self, type=None):
+        if type:
+            self.set("type", type)
+            return
+
+        metadata = self.get("metadata")
+        userdata = self.get("userdata")
+
+        if "quota" in metadata:
+            if metadata["quota"] == "shared":
+                self.set("type", "ephemeral")
+
+        elif userdata:
+            try:
+                data = userdata.splitlines()
+                keyValRegEx = re.compile(r'^\s*(quota)\s*=\s*(shared)\s*$')
+
+                for row in data:
+                    result = keyValRegEx.search(row)
+                    if result:
+                        self.set("type", "ephemeral")
+                        break
+            except Exception as ex:
+                LOG.error(ex)
+
+        if self.isPermanent():
+            metadata["quota"] = "private"
+        else:
+            metadata["quota"] = "shared"
 
     def getState(self):
         return self.get("state")
@@ -78,30 +110,11 @@ class Server(SynergyObject):
     def setMetadata(self, metadata):
         self.set("metadata", metadata)
 
-        if "quota" in metadata:
-            if metadata["quota"] == "shared":
-                self.setType("ephemeral")
-            else:
-                self.setType("permanent")
-
     def getUserData(self):
         return self.get("userdata")
 
     def setUserData(self, userdata):
-        self.set("userdata", userdata)
-
-        if userdata:
-            try:
-                quota = utils.getConfigParameter(userdata, "quota", "synergy")
-
-                if quota is None or quota == "private":
-                    self.setType("permanent")
-                elif quota == "shared":
-                    self.setType("ephemeral")
-                else:
-                    self.setType("permanent")
-            except Exception:
-                self.setType("permanent")
+        self.set("userdata", utils.decodeBase64(userdata))
 
     def getUserId(self):
         return self.get("user_id")
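With the Server changes above, quota classification lives in one place:
setType() with no argument marks the server ephemeral when its metadata or
decoded user data carries quota=shared, and permanent otherwise. A
hypothetical decoded user-data snippet that the regex above would match:

    import re

    # Same pattern as the new setType(); the user data below is made up.
    keyValRegEx = re.compile(r'^\s*(quota)\s*=\s*(shared)\s*$')

    userdata = "some free-form user data\nquota = shared\nmore text"

    print(any(keyValRegEx.search(row) for row in userdata.splitlines()))
    # -> True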
@@ -79,35 +79,6 @@ class FairShareManager(Manager):
     def destroy(self):
         pass
 
-    def calculatePriority(self, user_id, prj_id, timestamp=None, retry=0):
-        project = self.project_manager.getProject(id=prj_id)
-
-        if not project:
-            raise SynergyError("project=%s not found!" % prj_id)
-
-        user = project.getUser(id=user_id)
-
-        if not user:
-            raise SynergyError("user=%s not found!" % user_id)
-
-        priority = user.getPriority()
-        fairshare_vcpus = priority.getFairShare("vcpus")
-        fairshare_memory = priority.getFairShare("memory")
-
-        if not timestamp:
-            timestamp = datetime.utcnow()
-
-        now = datetime.utcnow()
-
-        diff = (now - timestamp)
-        minutes = diff.seconds / 60
-        priority = (float(self.age_weight) * minutes +
-                    float(self.vcpus_weight) * fairshare_vcpus +
-                    float(self.memory_weight) * fairshare_memory -
-                    float(self.age_weight) * retry)
-
-        return int(priority)
-
     def doOnEvent(self, event_type, *args, **kwargs):
         if event_type == "USER_ADDED":
             user = kwargs.get("user", None)
@@ -304,3 +275,5 @@ class FairShareManager(Manager):
 
         usr_priority.setValue(float(self.vcpus_weight) * f_vcpus +
                               float(self.memory_weight) * f_memory)
+
+        self.notify(event_type="USER_PRIORITY_UPDATED", user=user)
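calculatePriority() is gone: the fair-share manager now stores the computed
value on the user and emits a USER_PRIORITY_UPDATED event, which the scheduler
manager (further down) turns into a queue.updatePriority(user) call. A rough
sketch of that flow with stand-in classes, not the project's real plumbing:

    class FairShareManagerSketch(object):
        def __init__(self, listeners):
            self.listeners = listeners

        def updateUserPriority(self, user, value):
            user.getPriority().setValue(value)
            # stands in for self.notify(event_type=..., user=user)
            for listener in self.listeners:
                listener.doOnEvent("USER_PRIORITY_UPDATED", user=user)

    class SchedulerManagerSketch(object):
        def __init__(self, queue):
            self.queue = queue

        def doOnEvent(self, event_type, *args, **kwargs):
            if event_type == "USER_PRIORITY_UPDATED" and self.queue:
                # reorder the user's queued requests under the new priority
                self.queue.updatePriority(kwargs.get("user", None))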
@@ -1,4 +1,3 @@
-import common.utils as utils
 import eventlet
 import hashlib
 import hmac
@@ -68,6 +67,7 @@ class ServerEventHandler(object):
         server.setMetadata(server_info["metadata"])
         server.setDeletedAt(server_info["deleted_at"])
         server.setTerminatedAt(server_info["terminated_at"])
+        server.setType()
 
         if "host" in server_info:
             server.setHost(server_info["host"])
@@ -548,6 +548,9 @@ class NovaManager(Manager):
         server.setName(server_data["name"])
         server.setKeyName(server_data["key_name"])
         server.setMetadata(server_data["metadata"])
+        server.setUserData(server_data.get("OS-EXT-SRV-ATTR:user_data",
+                                           None))
+        server.setType()
         server.setState(server_data["OS-EXT-STS:vm_state"])
         server.setUserId(server_data["user_id"])
         server.setProjectId(server_data["tenant_id"])
@@ -558,10 +561,6 @@ class NovaManager(Manager):
         server.setTerminatedAt(
             server_data.get("OS-SRV-USG:terminated_at", None))
 
-        if "user_data" in server_data:
-            user_data = server_data["user_data"]
-            server.setUserData(utils.decodeBase64(user_data))
-
         if detail:
             server.setFlavor(self.getFlavor(
                 server_data["flavor"]["id"]))
@@ -587,6 +586,9 @@ class NovaManager(Manager):
         server.setName(server_data["name"])
         server.setKeyName(server_data["key_name"])
         server.setMetadata(server_data["metadata"])
+        server.setUserData(server_data.get("OS-EXT-SRV-ATTR:user_data",
+                                           None))
+        server.setType()
         server.setState(server_data["OS-EXT-STS:vm_state"])
         server.setUserId(server_data["user_id"])
         server.setProjectId(server_data["tenant_id"])
@@ -597,10 +599,6 @@ class NovaManager(Manager):
         server.setTerminatedAt(
             server_data.get("OS-SRV-USG:terminated_at", None))
 
-        if "user_data" in server_data:
-            user_data = server_data["user_data"]
-            server.setUserData(utils.decodeBase64(user_data))
-
         if detail:
             server.setFlavor(self.getFlavor(server_data["flavor"]["id"]))
 
@@ -978,9 +976,9 @@ a.launched_at<='%(to_date)s' and (a.terminated_at>='%(from_date)s' or \
         try:
             # retrieve the amount of resources in terms of cores and memory
             QUERY = """select a.uuid, a.vcpus, a.memory_mb, a.root_gb, \
-a.vm_state from nova.instances as a WHERE a.project_id='%(project_id)s' \
-and a.vm_state in ('active', 'building', 'error') and a.deleted_at is NULL \
-and a.terminated_at is NULL""" % {"project_id": prj_id}
+a.vm_state, a.user_data from nova.instances as a WHERE a.project_id=\
+'%(project_id)s'and a.vm_state in ('active', 'building', 'error') and \
+a.deleted_at is NULL and a.terminated_at is NULL""" % {"project_id": prj_id}
 
             LOG.debug("getProjectServers query: %s" % QUERY)
 
@@ -995,6 +993,7 @@ and a.terminated_at is NULL""" % {"project_id": prj_id}
                 server = Server()
                 server.setId(row[0])
                 server.setState(row[4])
+                server.setUserData(row[5])
                 server.setFlavor(flavor)
 
                 QUERY = """select `key`, value from nova.instance_metadata \
@@ -1009,6 +1008,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()}
                     metadata[row[0]] = row[1]
 
                 server.setMetadata(metadata)
+                server.setType()
 
                 servers.append(server)
         except SQLAlchemyError as ex:
@@ -1031,7 +1031,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()}
             ids = "uuid in ('%s') and " % "', '".join(server_ids)
 
         QUERY = """select uuid, vcpus, memory_mb, root_gb, \
-vm_state from nova.instances where project_id = \
+vm_state, user_data from nova.instances where project_id = \
 '%(project_id)s' and deleted_at is NULL and (vm_state='error' or \
 (%(server_ids)s vm_state='active' and terminated_at is NULL \
 and timestampdiff(minute, launched_at, utc_timestamp()) >= %(expiration)s))\
@@ -1050,6 +1050,7 @@ and timestampdiff(minute, launched_at, utc_timestamp()) >= %(expiration)s))\
                 server = Server()
                 server.setId(row[0])
                 server.setState(row[4])
+                server.setUserData(row[5])
                 server.setFlavor(flavor)
 
                 QUERY = """select `key`, value from nova.instance_metadata \
@@ -1064,6 +1065,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()}
                     metadata[row[0]] = row[1]
 
                 server.setMetadata(metadata)
+                server.setType()
 
                 servers.append(server)
         except SQLAlchemyError as ex:
@@ -212,7 +212,7 @@ class QuotaManager(Manager):
                           % (uuid, TTL, state, prj_id))
 
                 self.nova_manager.deleteServer(server)
-            except SynergyError as ex:
+            except Exception as ex:
                 LOG.error(ex)
 
     def updateSharedQuota(self):
@@ -67,29 +67,29 @@ class Worker(Thread):
         last_release_time = SharedQuota.getLastReleaseTime()
 
         while not self.exit and not self.queue.isClosed():
-            if last_release_time < SharedQuota.getLastReleaseTime():
-                last_release_time = SharedQuota.getLastReleaseTime()
-
-                while queue_items:
-                    self.queue.restore(queue_items.pop(0))
-
-            if len(queue_items) >= self.backfill_depth:
-                SharedQuota.wait()
-                continue
-
-            queue_item = self.queue.dequeue(block=False)
-
-            if queue_item is None:
-                if self.queue.getSize():
-                    SharedQuota.wait()
-                    continue
-                else:
-                    queue_item = self.queue.dequeue(block=True)
-
-                    if queue_item is None:
-                        continue
+            try:
+                if last_release_time < SharedQuota.getLastReleaseTime():
+                    last_release_time = SharedQuota.getLastReleaseTime()
+
+                    while queue_items:
+                        self.queue.restore(queue_items.pop(0))
+
+                if len(queue_items) >= self.backfill_depth:
+                    SharedQuota.wait()
+                    continue
+
+                queue_item = self.queue.dequeue(block=False)
+
+                if queue_item is None:
+                    if self.queue.getSize():
+                        SharedQuota.wait()
+                        continue
+                    else:
+                        queue_item = self.queue.dequeue(block=True)
+
+                        if queue_item is None:
+                            continue
 
                 try:
                     request = Request.fromDict(queue_item.getData())
                     user_id = request.getUserId()
                     prj_id = request.getProjectId()
@@ -145,7 +145,7 @@ class Worker(Thread):
                               "ta=shared" % (server_id, user_id, prj_id))
 
                     found = True
-                except SynergyError as ex:
+                except Exception as ex:
                     LOG.error("error on building the server %s (reason=%s)"
                               % (server.getId(), ex))
 
@@ -157,7 +157,7 @@ class Worker(Thread):
                 else:
                     queue_items.append(queue_item)
 
-            except SynergyError as ex:
+            except Exception as ex:
                 LOG.error("Exception has occured", exc_info=1)
                 LOG.error("Worker %s: %s" % (self.name, ex))
 
@@ -204,6 +204,7 @@ class SchedulerManager(Manager):
         self.backfill_depth = CONF.SchedulerManager.backfill_depth
         self.exit = False
         self.configured = False
+        self.queue = None
 
     def execute(self, command, *args, **kargs):
         raise SynergyError("command %r not supported!" % command)
@@ -251,7 +252,9 @@ class SchedulerManager(Manager):
 
             self._processServerEvent(server, event, state)
+
         elif event_type == "SERVER_CREATE":
             self._processServerCreate(kwargs["request"])
+
         elif event_type == "PROJECT_ADDED":
             if not self.configured:
                 return
@@ -261,6 +264,10 @@ class SchedulerManager(Manager):
             if self.queue and project:
                 project.setQueue(self.queue)
 
+        elif event_type == "USER_PRIORITY_UPDATED":
+            if self.queue:
+                self.queue.updatePriority(kwargs.get("user", None))
+
     def _processServerEvent(self, server, event, state):
         project = self.project_manager.getProject(id=server.getProjectId())
 
@@ -279,7 +286,6 @@ class SchedulerManager(Manager):
                 self.nova_manager.setServerMetadata(server,
                                                     "expiration_time",
                                                     expiration)
-
             else:
                 quota = project.getQuota()
 
@@ -353,12 +359,6 @@ class SchedulerManager(Manager):
                          request.getUserId(),
                         request.getProjectId()))
             else:
-                priority = self.fairshare_manager.calculatePriority(
-                    user_id=request.getUserId(),
-                    prj_id=request.getProjectId(),
-                    timestamp=request.getCreatedAt(),
-                    retry=num_attempts)
-
                 context = request.getContext()
 
                 km = self.keystone_manager
@@ -381,8 +381,9 @@ class SchedulerManager(Manager):
 
                 context["trust_id"] = trust.getId()
 
             user = project.getUser(id=request.getUserId())
+            priority = user.getPriority().getValue()
 
-            self.queue.enqueue(user, request.toDict(), priority)
+            self.queue.enqueue(user, request.toDict())
 
             LOG.info("new request: id=%s user_id=%s prj_id=%s priority"
                      "=%s quota=shared" % (request.getId(),
@@ -10,12 +10,9 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-from datetime import datetime
-
 from mock import MagicMock
 from mock import patch
 from synergy_scheduler_manager.common.project import Project
 from synergy_scheduler_manager.common.user import User
 from synergy_scheduler_manager.fairshare_manager import FairShareManager
 from synergy_scheduler_manager.project_manager import ProjectManager
 from synergy_scheduler_manager.tests.unit import base
@@ -41,46 +38,6 @@ class TestFairshareManager(base.TestCase):
         with patch('synergy_scheduler_manager.fairshare_manager.CONF'):
             self.fairshare_manager.setup()
 
-    def test_calculate_priority_one_user(self):
-        # self.fairshare_manager.addProject(prj_id=1, prj_name="test")
-        project = Project()
-        project.setId(1)
-        project.setName("test_project")
-
-        # Define values used for computing the priority
-        age_weight = self.fairshare_manager.age_weight = 1.0
-        vcpus_weight = self.fairshare_manager.vcpus_weight = 2.0
-        memory_weight = self.fairshare_manager.memory_weight = 3.0
-        datetime_start = datetime(year=2000, month=1, day=1, hour=0, minute=0)
-        datetime_stop = datetime(year=2000, month=1, day=1, hour=2, minute=0)
-        minutes = (datetime_stop - datetime_start).seconds / 60
-        fairshare_cores = 10
-        fairshare_ram = 50
-
-        # Add a user to the project
-        user = User()
-        user.setId(22)
-        user.setName("test_user")
-        priority = user.getPriority()
-        priority.setFairShare('vcpus', fairshare_cores)
-        priority.setFairShare('memory', fairshare_ram)
-
-        project.addUser(user)
-        self.project_manager.projects[project.getId()] = project
-
-        # Compute the expected priority given the previously defined values
-        expected_priority = int(age_weight * minutes +
-                                vcpus_weight * fairshare_cores +
-                                memory_weight * fairshare_ram)
-
-        with patch("synergy_scheduler_manager.fairshare_manager.datetime") \
-                as datetime_mock:
-            datetime_mock.utcnow.side_effect = (datetime_start, datetime_stop)
-            priority = self.fairshare_manager.calculatePriority(
-                user.getId(), project.getId())
-
-        self.assertEqual(expected_priority, priority)
-
     def test_calculate_fairshare(self):
         # TODO(vincent)
         pass
@@ -115,8 +115,9 @@ class TestQueue(base.TestCase):
         user = User()
         user.setId(2)
         user.setProjectId(100)
+        user.getPriority().setValue(10)
 
-        self.queue.enqueue(user=user, data="mydata", priority=10)
+        self.queue.enqueue(user=user, data="mydata")
 
         self.assertEqual(1, self.queue.getSize())
 
@@ -125,8 +126,10 @@ class TestQueue(base.TestCase):
         user = User()
         user.setId(2)
         user.setProjectId(100)
+        user.getPriority().setValue(10)
         data = json.dumps("mydata")
-        self.queue.enqueue(user=user, data=data, priority=10)
+
+        self.queue.enqueue(user=user, data=data)
 
         self.assertEqual(1, self.queue.getSize())
 
@@ -148,9 +151,10 @@ class TestQueue(base.TestCase):
         user = User()
         user.setId(2)
         user.setProjectId(100)
+        user.getPriority().setValue(10)
         data = json.dumps("mydata")
 
-        self.queue.enqueue(user=user, data=data, priority=10)
+        self.queue.enqueue(user=user, data=data)
 
         # Mock the DB
         execute_mock = self.db_engine_mock.connect().execute
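For completeness, a condensed sketch of how the reworked API is exercised,
mirroring the tests above (the Queue import path is assumed; MagicMock stands
in for the real DB engine, as in the test fixtures):

    from mock import MagicMock

    from synergy_scheduler_manager.common.queue import Queue
    from synergy_scheduler_manager.common.user import User

    queue = Queue(name="test", type="PRIORITY", db_engine=MagicMock())

    user = User()
    user.setId(2)
    user.setProjectId(100)
    user.getPriority().setValue(10)

    # the priority is now taken from the user, not passed to enqueue()
    queue.enqueue(user=user, data="mydata")
    assert queue.getSize() == 1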