From 1a36a1db90d8912ad128667464c7e7a95a22ec43 Mon Sep 17 00:00:00 2001 From: Lisa Zangrando Date: Wed, 23 Nov 2016 12:29:47 +0100 Subject: [PATCH] Add a backfill_depth parameter Allow Synergy to not check the whole queue when looking for jobs to start. Change-Id: Idf8b92d3d91c0beadabea09f31e042c39320e4f0 Sem-Ver: feature Closes-bug: #1644188 DocImpact --- .../scheduler_manager.py | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/synergy_scheduler_manager/scheduler_manager.py b/synergy_scheduler_manager/scheduler_manager.py index 0000e9c..fb7fda4 100644 --- a/synergy_scheduler_manager/scheduler_manager.py +++ b/synergy_scheduler_manager/scheduler_manager.py @@ -102,11 +102,13 @@ class Notifications(object): class Worker(Thread): - def __init__(self, name, queue, projects, nova_manager, keystone_manager): + def __init__(self, name, queue, projects, nova_manager, + keystone_manager, backfill_depth=100): super(Worker, self).__init__() self.setDaemon(True) self.name = name + self.backfill_depth = backfill_depth self.queue = queue self.projects = projects self.nova_manager = nova_manager @@ -136,8 +138,11 @@ class Worker(Thread): last_release_time = SharedQuota.getLastReleaseTime() while queue_items: - item = queue_items.pop(0) - self.queue.reinsertItem(item) + self.queue.reinsertItem(queue_items.pop(0)) + + if len(queue_items) >= self.backfill_depth: + SharedQuota.wait() + continue queue_item = self.queue.getItem(blocking=False) @@ -167,8 +172,8 @@ class Worker(Thread): self.queue.deleteItem(queue_item) continue except Exception as ex: - LOG.warn("Worker %s: the server %r is not anymore " - "available! reason=%s" % (self.name, prj_id, ex)) + LOG.warn("Worker %s: the server %r is not anymore availa" + "ble ! [reason=%s]" % (self.name, server_id, ex)) self.queue.deleteItem(queue_item) continue @@ -235,7 +240,8 @@ class SchedulerManager(Manager): super(SchedulerManager, self).__init__("SchedulerManager") self.config_opts = [ - cfg.FloatOpt('default_TTL', default=10.0), + cfg.IntOpt("backfill_depth", default=100), + cfg.FloatOpt("default_TTL", default=10.0), cfg.ListOpt("projects", default=[], help="the projects list"), cfg.ListOpt("shares", default=[], help="the shares list"), cfg.ListOpt("TTLs", default=[], help="the TTLs list") @@ -264,7 +270,7 @@ class SchedulerManager(Manager): self.keystone_manager = self.getManager("KeystoneManager") self.fairshare_manager = self.getManager("FairShareManager") self.default_TTL = float(CONF.SchedulerManager.default_TTL) - self.fairshare_manager = self.getManager("FairShareManager") + self.backfill_depth = CONF.SchedulerManager.backfill_depth self.projects = {} self.listener = None self.exit = False @@ -403,7 +409,8 @@ class SchedulerManager(Manager): self.dynamic_queue, self.projects, self.nova_manager, - self.keystone_manager) + self.keystone_manager, + self.backfill_depth) dynamic_worker.start() self.workers.append(dynamic_worker) @@ -417,7 +424,7 @@ class SchedulerManager(Manager): targets=[target], endpoints=[self.notifications]) - LOG.info("listener created") + self.quota_manager.deleteExpiredServers() self.listener.start() self.configured = True