From a9a83e6abcd63bc31c2c22ff48d76bbfe8733939 Mon Sep 17 00:00:00 2001 From: Lisa Zangrando Date: Fri, 3 Jun 2016 14:44:35 +0200 Subject: [PATCH] scheduler_manager.py: restored the signatures of Notifications.info(), Notifications.warn() and Notifications.error() fairshare_manager.py: fixed calculateFairShare() client: added new shell commands Change-Id: I8b0232851b4741f1aee7b95e9181c8133ee8d220 --- setup.cfg | 7 + synergy_scheduler_manager/client/command.py | 361 ++++++++++++++++++ .../fairshare_manager.py | 18 +- .../scheduler_manager.py | 35 +- 4 files changed, 377 insertions(+), 44 deletions(-) create mode 100644 synergy_scheduler_manager/client/command.py diff --git a/setup.cfg b/setup.cfg index 6df8da7..3090ea5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -30,6 +30,13 @@ synergy.managers = FairShareManager = synergy_scheduler_manager.fairshare_manager:FairShareManager SchedulerManager = synergy_scheduler_manager.scheduler_manager:SchedulerManager +synergy.commands = + get_queue = synergy_scheduler_manager.client.command:GetQueue + get_quota = synergy_scheduler_manager.client.command:GetQuota + get_priority = synergy_scheduler_manager.client.command:GetPriority + get_share = synergy_scheduler_manager.client.command:GetShare + get_usage = synergy_scheduler_manager.client.command:GetUsage + [build_sphinx] source-dir = doc/source build-dir = doc/build diff --git a/synergy_scheduler_manager/client/command.py b/synergy_scheduler_manager/client/command.py new file mode 100644 index 0000000..d6e8490 --- /dev/null +++ b/synergy_scheduler_manager/client/command.py @@ -0,0 +1,361 @@ +from synergy.client.command import Execute + +__author__ = "Lisa Zangrando" +__email__ = "lisa.zangrando[AT]pd.infn.it" +__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud +All Rights Reserved + +Licensed under the Apache License, Version 2.0; +you may not use this file except in compliance with the +License.
You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. +See the License for the specific language governing +permissions and limitations under the License.""" + + +class GetQuota(Execute): + + def __init__(self): + super(GetQuota, self).__init__("GET_DYNAMIC_QUOTA") + + def configureParser(self, subparser): + parser = subparser.add_parser("get_quota", + add_help=True, + help="shows the dynamic quota info") + parser.add_argument("--long", + action='store_true', + help="shows more details") + + def sendRequest(self, synergy_url, args): + self.long = args.long + + super(GetQuota, self).sendRequest( + synergy_url + "/synergy/execute", "QuotaManager", self.getName()) + + def log(self): + quota = self.getResults() + + if not self.long: + cores_in_use = "{:d}".format(quota["cores"]["in_use"]) + max_cores_in_use = max(len(cores_in_use), len("in use")) + + cores_limit = "{:.2f}".format(quota["cores"]["limit"]) + max_cores_limit = max(len(cores_limit), len("limit")) + + ram_in_use = "{:d}".format(quota["ram"]["in_use"]) + max_ram_in_use = max(len(ram_in_use), len("in use")) + + ram_limit = "{:.2f}".format(quota["ram"]["limit"]) + max_ram_limit = max(len(ram_limit), len("limit")) + + separator = "-" * (max_cores_in_use + max_cores_limit + + max_ram_in_use + max_ram_limit + 7) + "\n" + + raw = "| {0:%ss} | {1:%ss} | {2:%ss} |\n" % ( + len("ram (MB)"), + max(max_cores_in_use, max_ram_in_use), + max(max_cores_limit, max_ram_limit)) + + msg = separator + msg += raw.format("type", "in use", "limit") + msg += separator + msg += raw.format("ram (MB)", ram_in_use, ram_limit) + msg += raw.format("cores", cores_in_use, cores_limit) + msg += separator + + print(msg) + else: + max_ram = 0 + max_ram_in_use = 
len("{:d}".format(quota["ram"]["in_use"])) + max_ram_limit = len("{:.2f}".format(quota["ram"]["limit"])) + max_cores = 0 + max_cores_in_use = len("{:d}".format(quota["cores"]["in_use"])) + max_cores_limit = len("{:.2f}".format(quota["cores"]["limit"])) + max_prj_name = len("project") + + for project in quota["projects"].values(): + max_prj_name = max(len(project["name"]), max_prj_name) + max_ram = max(len("{:d}".format(project["ram"])), max_ram) + max_cores = max(len("{:d}".format(project["cores"])), + max_cores) + + separator = "-" * (max_prj_name + max_cores + max_cores_in_use + + max_cores_limit + max_ram + max_ram_in_use + + max_ram_limit + 48) + + title = "| {0:%ss} | {1:%ss} | {2:%ss} |\n" % ( + max_prj_name, + max_cores + max_cores_in_use + max_cores_limit + 19, + max_ram + max_ram_in_use + max_ram_limit + 19) + + raw = "| {0:%ss} | in use={1:%d} ({2:%d}) | limit={3:%ss} |" \ + " in use={4:%d} ({5:%d}) | limit={6:%ss} |\n" + raw = raw % (max_prj_name, max_cores, max_cores_in_use, + max_cores_limit, max_ram, max_ram_in_use, + max_ram_limit) + + msg = separator + "\n" + msg += title.format("project", "cores", "ram (MB)") + msg += separator + "\n" + + for project in quota["projects"].values(): + msg += raw.format( + project["name"], project["cores"], + quota["cores"]["in_use"], + "{:.2f}".format(quota["cores"]["limit"]), + project["ram"], + quota["ram"]["in_use"], + "{:.2f}".format(quota["ram"]["limit"])) + msg += separator + "\n" + + print(msg) + + +class GetPriority(Execute): + + def __init__(self): + super(GetPriority, self).__init__("GET_PRIORITY") + + def configureParser(self, subparser): + subparser.add_parser("get_priority", + add_help=True, + help="shows the users priority") + + def sendRequest(self, synergy_url, args): + super(GetPriority, self).sendRequest( + synergy_url + "/synergy/execute", + "FairShareManager", + self.getName()) + + def log(self): + projects = self.getResults() + + max_prj = len("project") + max_user = len("user") + max_priority = 
len("priority") + + for prj_name, users in projects.items(): + max_prj = max(len(prj_name), max_prj) + + for user_name, priority in users.items(): + max_user = max(len(user_name), max_user) + max_priority = max(len("{:.2f}".format(priority)), + max_priority) + + separator = "-" * (max_prj + max_user + max_priority + 10) + "\n" + + raw = "| {0:%ss} | {1:%ss} | {2:%ss} |\n" % ( + max_prj, max_user, max_priority) + + msg = separator + msg += raw.format("project", "user", "priority") + msg += separator + + for prj_name in sorted(projects.keys()): + for user_name in sorted(projects[prj_name].keys()): + msg += raw.format( + prj_name, + user_name, + "{:.2f}".format(projects[prj_name][user_name])) + + msg += separator + + print(msg) + + +class GetQueue(Execute): + + def __init__(self): + super(GetQueue, self).__init__("GET_QUEUE") + + def configureParser(self, subparser): + subparser.add_parser("get_queue", + add_help=True, + help="shows the queue info") + + def sendRequest(self, synergy_url, args): + super(GetQueue, self).sendRequest( + synergy_url + "/synergy/execute", + "QueueManager", + self.getName(), + {"name": "DYNAMIC"}) + + def log(self): + queue = self.getResults() + + max_status = len("status") + max_queue = max(len(queue["name"]), len("queue")) + max_size = max(len("{:d}".format(queue["size"])), len("size")) + + separator = "-" * (max_queue + max_status + max_size + 10) + "\n" + + raw = "| {0:%ss} | {1:%ss} | {2:%ss} |\n" % ( + max_queue, max_status, max_size) + + msg = separator + msg += raw.format("queue", "status", "size") + msg += separator + + msg += raw.format(queue["name"], + queue["status"], + "{:d}".format(queue["size"])) + + msg += separator + + print(msg) + + +class GetShare(Execute): + + def __init__(self): + super(GetShare, self).__init__("GET_SHARE") + + def configureParser(self, subparser): + subparser.add_parser("get_share", + add_help=True, + help="shows the users share") + + def sendRequest(self, synergy_url, args): + super(GetShare, 
self).sendRequest( + synergy_url + "/synergy/execute", + "FairShareManager", + "GET_PROJECTS") + + def log(self): + projects = self.getResults() + + max_prj = len("project") + max_usr = len("user") + max_prj_share = len("share") + max_usr_share = len("share (abs)") + + for project in projects.values(): + max_prj = max(len(project["name"]), max_prj) + max_prj_share = max(len("{:.2f}".format(project["share"])), + max_prj_share) + + for user in project["users"].values(): + max_usr = max(len(user["name"]), max_usr) + max_usr_share = max( + len("{:.2f} ({:.2f})".format(user["share"], + user["norm_share"])), + max_usr_share) + + separator_str = "-" * (max_prj + max_usr + max_prj_share + + max_usr_share + 13) + "\n" + + data_str = "| {0:%ss} | {1:%ss} | {2:%ss} | {3:%ss} |\n" % ( + max_prj, max_prj_share, max_usr, max_usr_share) + + msg = separator_str + msg += data_str.format("project", "share", "user", "share (abs)") + msg += separator_str + + for project in projects.values(): + for user in project["users"].values(): + msg += data_str.format( + project["name"], + "{:.2f}".format(project["share"]), + user["name"], + "{:.2f} ({:.2f})".format(user["share"], + user["norm_share"])) + + msg += separator_str + print(msg) + + +class GetUsage(Execute): + + def __init__(self): + super(GetUsage, self).__init__("GET_USAGE") + + def configureParser(self, subparser): + subparser.add_parser("get_usage", + add_help=True, + help="retrieve the resource usages") + + def sendRequest(self, synergy_url, args): + super(GetUsage, self).sendRequest( + synergy_url + "/synergy/execute", + "FairShareManager", + "GET_PROJECTS") + + def log(self): + projects = self.getResults() + + max_prj = len("project") + max_usr = len("user") + max_prj_cores = len("cores") + max_usr_cores = len("cores") + max_prj_ram = len("ram") + max_usr_ram = len("ram (abs)") + + for project in projects.values(): + usage = project["usage"] + + max_prj = max(len(project["name"]), max_prj) + max_prj_cores = max(len( + 
"{:.2f}%".format(usage["effective_cores"] * 100)), + max_prj_cores) + + max_prj_ram = max(len( + "{:.2f}%".format(usage["effective_ram"] * 100)), + max_prj_ram) + + for user in project["users"].values(): + usage = user["usage"] + + max_usr = max(len(user["name"]), max_usr) + max_usr_cores = max(len("{:.2f}% ({:.2f})%".format( + usage["effective_rel_cores"] * 100, + usage["norm_cores"] * 100)), + max_usr_cores) + + max_usr_ram = max(len("{:.2f}% ({:.2f})%".format( + usage["effective_rel_ram"] * 100, + usage["norm_ram"] * 100)), + max_usr_ram) + + separator = "-" * (max_prj + max_usr + max_prj_cores + + max_usr_cores + max_prj_ram + + max_usr_ram + 19) + "\n" + + raw = "| {0:%ss} | {1:%ss} | {2:%ss} | {3:%ss} | {4:%ss} | " \ + "{5:%ss} | \n" % (max_prj, max_prj_cores, max_prj_ram, + max_usr, max_usr_cores, max_usr_ram) + + msg = separator + msg += raw.format("project", "cores", "ram", + "user", "cores (abs)", "ram (abs)") + msg += separator + + for project in projects.values(): + prj_usage = project["usage"] + + for user in project["users"].values(): + usr_usage = user["usage"] + + prj_cores = "{:.2f}%".format( + prj_usage["effective_cores"] * 100) + + prj_ram = "{:.2f}%".format(prj_usage["effective_ram"] * 100) + + usr_cores = "{:.2f}% ({:.2f}%)".format( + usr_usage["effective_rel_cores"] * 100, + usr_usage["norm_cores"] * 100) + + usr_ram = "{:.2f}% ({:.2f}%)".format( + usr_usage["effective_rel_ram"] * 100, + usr_usage["norm_ram"] * 100) + + msg += raw.format( + project["name"], prj_cores, prj_ram, + user["name"], usr_cores, usr_ram) + msg += separator + print(msg) diff --git a/synergy_scheduler_manager/fairshare_manager.py b/synergy_scheduler_manager/fairshare_manager.py index e7d2739..af5cbe0 100644 --- a/synergy_scheduler_manager/fairshare_manager.py +++ b/synergy_scheduler_manager/fairshare_manager.py @@ -249,7 +249,6 @@ class FairShareManager(Manager): # check the share for each user and update the usage_record users = project["users"] prj_id = project["id"] - 
# prj_name = project["name"] prj_share = project["share"] sibling_share = float(0) @@ -310,14 +309,6 @@ class FairShareManager(Manager): sibling_share = project["sibling_share"] users = project["users"] - # effect_prj_cores_usage = actual_usage_cores + - # ((total_actual_usage_cores - actual_usage_cores) * - # prj_share / total_prj_share) - - # effect_prj_cores_usage = actual_usage_ram + - # ((total_actual_usage_ram - actual_usage_ram) * - # prj_share / total_prj_share) - effect_prj_ram_usage = actual_usage_ram effect_prj_cores_usage = actual_usage_cores @@ -358,13 +349,8 @@ class FairShareManager(Manager): user_usage["effective_rel_cores"] /= actual_usage_cores if actual_usage_ram > 0: - user_usage["effect_rel_ram"] = norm_usage_ram - user_usage["effect_rel_ram"] /= actual_usage_ram - - # user["effect_usage_rel_cores"] = effect_usage_cores / - # effect_prj_cores_usage - # user["effect_usage_rel_ram"] = effect_usage_ram / - # effect_prj_cores_usage + user_usage["effective_rel_ram"] = norm_usage_ram + user_usage["effective_rel_ram"] /= actual_usage_ram if norm_share > 0: f_ram = 2 ** (-effect_usage_ram / norm_share) diff --git a/synergy_scheduler_manager/scheduler_manager.py b/synergy_scheduler_manager/scheduler_manager.py index fe11b97..9dce5fa 100644 --- a/synergy_scheduler_manager/scheduler_manager.py +++ b/synergy_scheduler_manager/scheduler_manager.py @@ -40,7 +40,7 @@ class Notifications(object): self.dynamic_quota = dynamic_quota - def info(self, event_type, payload): + def info(self, ctxt, publisher_id, event_type, payload, metadata): LOG.debug("Notification INFO: event_type=%s payload=%s" % (event_type, payload)) @@ -69,8 +69,6 @@ class Notifications(object): instance_id = instance_info["instance_id"] ram = instance_info["memory_mb"] cores = instance_info["vcpus"] - # disk = instance_info["root_gb"] - # node = instance_info["node"] LOG.debug("Notification INFO (type=%s state=%s): cores=%s ram=%s " "prj_id=%s instance_id=%s" @@ -81,15 +79,15 @@ class 
Notifications(object): except Exception as ex: LOG.warn("Notification INFO: %s" % ex) - def warn(self, event_type, payload): + def warn(self, ctxt, publisher_id, event_type, payload, metadata): state = payload["state"] instance_id = payload["instance_id"] - LOG.info("Notification WARN: event_type=%s state=%s instance_id=%s" - % (event_type, state, instance_id)) + LOG.debug("Notification WARN: event_type=%s state=%s instance_id=%s" + % (event_type, state, instance_id)) - def error(self, event_type, payload, metadata): - LOG.info("Notification ERROR: event_type=%s payload=%s metadata=%s" - % (event_type, payload, metadata)) + def error(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.debug("Notification ERROR: event_type=%s payload=%s metadata=%s" + % (event_type, payload, metadata)) class Worker(threading.Thread): @@ -110,7 +108,6 @@ class Worker(threading.Thread): def destroy(self): try: - # if self.queue: self.queue.close() self.exit = True @@ -126,8 +123,6 @@ class Worker(threading.Thread): queue_item = self.queue.getItem() except Exception as ex: LOG.error("Worker %r: %s" % (self.name, ex)) - # self.exit = True - # break continue if queue_item is None: @@ -137,7 +132,6 @@ class Worker(threading.Thread): request = queue_item.getData() instance = request["instance"] - # user_id = instance["nova_object.data"]["user_id"] prj_id = instance["nova_object.data"]["project_id"] uuid = instance["nova_object.data"]["uuid"] vcpus = instance["nova_object.data"]["vcpus"] @@ -153,8 +147,6 @@ class Worker(threading.Thread): image = request["image"] try: - # vm_instance = self.novaConductorAPI.instance_get_by_uuid - # (context, instance_uuid=instance_uuid) server = self.nova_manager.execute("GET_SERVER", id=uuid) except Exception as ex: @@ -178,11 +170,6 @@ class Worker(threading.Thread): ram=memory_mb) continue - # LOG.info(request_spec) - - # if (self.quota.reserve(instance_uuid, vcpus, memory_mb)): - # done = False - if self.quota.allocate(instance_id=uuid, 
prj_id=prj_id, cores=vcpus, @@ -217,14 +204,8 @@ class Worker(threading.Thread): self.queue.deleteItem(queue_item) except Exception as ex: LOG.error("Worker '%s': %s" % (self.name, ex)) - # self.queue.reinsertItem(queue_item) - continue - # LOG.info("Worker done is %s" % done) - - # LOG.info(">>>> Worker '%s' queue.isClosed %s exit=%s" - # % (self.name, self.queue.isClosed(), self.exit)) LOG.info("Worker '%s' destroyed!" % self.name) @@ -423,8 +404,6 @@ class SchedulerManager(Manager): memory_mb = instance["nova_object.data"]["memory_mb"] if prj_id in self.projects: - # prj_name = self.projects[prj_id]["name"] - # metadata = instance["nova_object.data"]["metadata"] timestamp = instance["nova_object.data"]["created_at"] timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ") priority = 0