From f8ca81d631f364ea1b7c7df68f03209112915dbb Mon Sep 17 00:00:00 2001 From: Lisa Zangrando Date: Thu, 3 Aug 2017 14:48:41 +0200 Subject: [PATCH] Added queue usage to project This commit adds some info about the queue usage to each project Change-Id: If14ca606148b0dbd2b22bfa470196b2db7c47d07 Sem-Ver: bugfix --- setup.cfg | 4 +- synergy_scheduler_manager/client/command.py | 319 ++++--------- synergy_scheduler_manager/common/project.py | 15 + synergy_scheduler_manager/common/queue.py | 447 +++++++++--------- .../fairshare_manager.py | 18 +- synergy_scheduler_manager/nova_manager.py | 98 ++-- synergy_scheduler_manager/project_manager.py | 9 +- synergy_scheduler_manager/queue_manager.py | 60 +-- synergy_scheduler_manager/quota_manager.py | 27 +- .../scheduler_manager.py | 49 +- .../functional/test_scheduler_manager.py | 10 +- .../tests/unit/test_queue.py | 225 ++++----- 12 files changed, 543 insertions(+), 738 deletions(-) diff --git a/setup.cfg b/setup.cfg index a5bd87d..94a0110 100644 --- a/setup.cfg +++ b/setup.cfg @@ -30,10 +30,8 @@ synergy.managers = ProjectManager = synergy_scheduler_manager.project_manager:ProjectManager synergy.commands = + user = synergy_scheduler_manager.client.command:UserCommand project = synergy_scheduler_manager.client.command:ProjectCommand - quota = synergy_scheduler_manager.client.command:QuotaCommand - queue = synergy_scheduler_manager.client.command:QueueCommand - usage = synergy_scheduler_manager.client.command:UsageCommand [build_sphinx] source-dir = doc/source diff --git a/synergy_scheduler_manager/client/command.py b/synergy_scheduler_manager/client/command.py index ffedd8c..d199e0d 100644 --- a/synergy_scheduler_manager/client/command.py +++ b/synergy_scheduler_manager/client/command.py @@ -1,8 +1,6 @@ from synergy.client.command import ExecuteCommand from synergy.client.tabulate import tabulate from synergy_scheduler_manager.common.project import Project -from synergy_scheduler_manager.common.quota import SharedQuota -from synergy_scheduler_manager.common.user import User __author__ = "Lisa Zangrando" @@ -202,272 +200,121 @@ class ProjectCommand(ExecuteCommand): data["effective_memory"] * 100) row.append(usage) - table.append(row) - print(tabulate(table, headers, tablefmt="fancy_grid")) + if attribute == "queue": + data = project.getData() + q_usage = data.get("queue_usage", 0) + q_size = data.get("queue_size", 0) - -class QueueCommand(ExecuteCommand): - - def __init__(self): - super(QueueCommand, self).__init__("QueueCommand") - - def configureParser(self, subparser): - queue_parser = subparser.add_parser('queue') - queue_subparsers = queue_parser.add_subparsers(dest="command") - queue_subparsers.add_parser("show", add_help=True, - help="shows the queue info") - - def execute(self, synergy_url, args): - if args.command == "show": - command = "GET_QUEUE" - cmd_args = {"name": "DYNAMIC"} - - queue = super(QueueCommand, self).execute(synergy_url, - "QueueManager", - command, - args=cmd_args) - table = [] - headers = ["name", "size", "is open"] - - row = [] - row.append(queue.getName()) - row.append(queue.getSize()) - row.append(str(queue.isOpen()).lower()) - - table.append(row) - - print(tabulate(table, headers, tablefmt="fancy_grid")) - - -class QuotaCommand(ExecuteCommand): - - def __init__(self): - super(QuotaCommand, self).__init__("QuotaCommand") - - def configureParser(self, subparser): - quota_parser = subparser.add_parser('quota') - quota_subparsers = quota_parser.add_subparsers(dest="command") - show_parser = quota_subparsers.add_parser("show", add_help=True, - help="shows the quota info") - group = show_parser.add_mutually_exclusive_group() - group.add_argument("-i", "--project_id", metavar="") - group.add_argument("-n", "--project_name", metavar="") - group.add_argument("-a", "--all_projects", action="store_true") - group.add_argument("-s", "--shared", action="store_true") - - def execute(self, synergy_url, args): - if args.command == "show": - command = "show" - cmd_args = {"shared": args.shared, - "project_id": args.project_id, - "project_name": args.project_name, - "all_projects": args.all_projects} - - result = super(QuotaCommand, self).execute(synergy_url, - "QuotaManager", - command, - args=cmd_args) - - if isinstance(result, SharedQuota): - self.printSharedQuota(result) - elif isinstance(result, Project): - self.printProjects([result]) - else: - self.printProjects(result) - - def printProjects(self, projects): - table = [] - headers = ["project", "private quota", "shared quota", "share", "TTL"] - - for project in projects: - share = project.getShare() - norm_share = share.getNormalizedValue() - quota = project.getQuota() - vcpus_size = quota.getSize("vcpus", private=False) - vcpus_usage = quota.getUsage("vcpus", private=False) - memory_size = quota.getSize("memory", private=False) - memory_usage = quota.getUsage("memory", private=False) - - row = [] - row.append(project.getName()) - - private = "vcpus: {:.2f} of {:.2f} | memory: {:.2f} of "\ - "{:.2f}".format(quota.getUsage("vcpus"), - quota.getSize("vcpus"), - quota.getUsage("memory"), - quota.getSize("memory")) - - shared = "vcpus: {:.2f} of {:.2f} | memory: {:.2f} of {:.2f} | "\ - "share: {:.2f}% | TTL: {:.2f}".format(vcpus_usage, - vcpus_size, - memory_usage, - memory_size, - norm_share * 100, - project.getTTL()) - - row.append(private) - row.append(shared) + if q_size: + usage = float(q_usage) / float(q_size) * 100 + row.append("{:.2f}%".format(usage)) + else: + row.append("0%") table.append(row) print(tabulate(table, headers, tablefmt="fancy_grid")) - def printSharedQuota(self, quota): - table = [] - headers = ["resource", "used", "size"] - resources = ["vcpus", "memory", "instances"] - for resource in resources: - row = [resource, quota.getUsage(resource), quota.getSize(resource)] - table.append(row) - - print(tabulate(table, headers, tablefmt="fancy_grid")) - - -class UsageCommand(ExecuteCommand): +class UserCommand(ExecuteCommand): def __init__(self): - super(UsageCommand, self).__init__("UsageCommand") + super(UserCommand, self).__init__("UserCommand") def configureParser(self, subparser): - usage_parser = subparser.add_parser('usage') - usage_subparsers = usage_parser.add_subparsers(dest="command") - show_parser = usage_subparsers.add_parser("show", add_help=True, - help="shows the usage info") + usr_parser = subparser.add_parser('user') + usr_subparsers = usr_parser.add_subparsers(dest="command") - subparsers = show_parser.add_subparsers() - parser_a = subparsers.add_parser('project', help='project help') + show_parser = usr_subparsers.add_parser("show", add_help=True, + help="shows the user info") + group = show_parser.add_mutually_exclusive_group(required=True) + group.add_argument("-i", "--id", metavar="") + group.add_argument("-n", "--name", metavar="") + group.add_argument("-a", "--all", action="store_true") + group2 = show_parser.add_mutually_exclusive_group(required=True) + group2.add_argument("-j", "--prj_id", metavar="") + group2.add_argument("-m", "--prj_name", metavar="") - group = parser_a.add_mutually_exclusive_group() - group.add_argument("-d", "--project_id", metavar="") - group.add_argument("-m", "--project_name", metavar="") - group.add_argument("-a", "--all_projects", action="store_true") - - parser_b = subparsers.add_parser('user', help='user help') - - group = parser_b.add_mutually_exclusive_group(required=True) - group.add_argument("-d", "--project_id", metavar="") - group.add_argument("-m", "--project_name", metavar="") - - group = parser_b.add_mutually_exclusive_group(required=True) - group.add_argument("-i", "--user_id", metavar="") - group.add_argument("-n", "--user_name", metavar="") - group.add_argument("-a", "--all_users", action="store_true") + show_parser.add_argument("-s", "--share", action="store_true") + show_parser.add_argument("-u", "--usage", action="store_true") + show_parser.add_argument("-p", "--priority", action="store_true") + show_parser.add_argument("-l", "--long", action="store_true") def execute(self, synergy_url, args): - if args.command == "show": - command = "show" - user_id = None - if hasattr(args, "user_id"): - user_id = args.user_id + usr_id = getattr(args, 'id', None) + usr_name = getattr(args, 'name', None) + prj_id = getattr(args, 'prj_id', None) + prj_name = getattr(args, 'prj_name', None) + command = getattr(args, 'command', None) + headers = ["name"] - user_name = None - if hasattr(args, "user_name"): - user_name = args.user_name + if command == "show": + if args.long: + headers.insert(0, "id") + if args.share: + headers.append("share") + if args.usage: + headers.append("usage") + if args.priority: + headers.append("priority") - all_users = False - if hasattr(args, "all_users"): - all_users = args.all_users + cmd_args = {"id": prj_id, "name": prj_name} + result = super(UserCommand, self).execute(synergy_url, + "ProjectManager", + "GET_PROJECT", + args=cmd_args) - project_id = None - if hasattr(args, "project_id"): - project_id = args.project_id - - project_name = None - if hasattr(args, "project_name"): - project_name = args.project_name - - all_projects = False - if hasattr(args, "all_projects"): - all_projects = args.all_projects - - cmd_args = {"user_id": user_id, - "user_name": user_name, - "all_users": all_users, - "project_id": project_id, - "project_name": project_name, - "all_projects": all_projects} - - result = super(UsageCommand, self).execute(synergy_url, - "SchedulerManager", - command, - args=cmd_args) - - if isinstance(result, Project): - self.printProjects([result]) - elif isinstance(result, User): - self.printUsers([result]) - elif isinstance(result, list): - if all(isinstance(n, Project) for n in result): - self.printProjects(result) - else: - self.printUsers(result) - - def printProjects(self, projects): - if not projects: + if not result: + print("project not found!") return - data = projects[0].getData() - date_format = "{:%d %b %Y %H:%M:%S}" - from_date = date_format.format(data["time_window_from_date"]) - to_date = date_format.format(data["time_window_to_date"]) + self.printProject(result, headers, usr_id, usr_name) - headers = ["project", - "shared quota (%s - %s)" % (from_date, to_date), - "share"] - - table = [] - - for project in projects: - data = project.getData() - share = project.getShare() - row = [] - row.append(project.getName()) - - shared = "vcpus: {:.2f}% | memory: {:.2f}%".format( - data["effective_vcpus"] * 100, data["effective_memory"] * 100) - - row.append(shared) - row.append("{:.2f}%".format(share.getNormalizedValue() * 100)) - - table.append(row) - - print(tabulate(table, headers, tablefmt="fancy_grid")) - - def printUsers(self, users): - if not users: + def printProject(self, project, headers, usr_id, usr_name): + if not project: return table = [] + users = None - date_format = "{:%d %b %Y %H:%M:%S}" - data = users[0].getData() + if usr_id or usr_name: + user = project.getUser(id=usr_id, name=usr_name) + if not user: + print("user not found!") + return - from_date = date_format.format(data["time_window_from_date"]) - to_date = date_format.format(data["time_window_to_date"]) - - headers = ["user", - "shared quota (%s - %s)" % (from_date, to_date), - "share", - "priority"] + users = [user] + else: + users = project.getUsers() for user in users: - share = user.getShare() - - data = user.getData() - - priority = user.getPriority() - row = [] - row.append(user.getName()) - row.append("vcpus: {:.2f}% | memory: {:.2f}%".format( - data["actual_rel_vcpus"] * 100, - data["actual_rel_memory"] * 100)) + for attribute in headers: + if attribute == "id": + row.append(user.getId()) - row.append("{:.2f}%".format(share.getNormalizedValue() * 100)) - row.append("{:.2f}".format(priority.getValue())) + if attribute == "name": + row.append(user.getName()) + + if attribute == "share": + share = user.getShare() + share_value = share.getValue() + share_norm = share.getNormalizedValue() + row.append("{:.2f}% | {:.2f}%".format(share_value, + share_norm * 100)) + if attribute == "priority": + row.append(user.getPriority().getValue()) + + if attribute == "usage": + data = user.getData() + + usage = "vcpus: {:.2f}% | ram: {:.2f}%".format( + data["actual_vcpus"] * 100, + data["actual_memory"] * 100) + + row.append(usage) table.append(row) diff --git a/synergy_scheduler_manager/common/project.py b/synergy_scheduler_manager/common/project.py index 8ae0b70..4cb3b70 100644 --- a/synergy_scheduler_manager/common/project.py +++ b/synergy_scheduler_manager/common/project.py @@ -40,6 +40,12 @@ class Project(SynergyObject): def getQuota(self): return self.get("quota") + def setQueue(self, queue): + self.set("queue", queue) + + def getQueue(self): + return self.get("queue") + def getShare(self): return self.get("share") @@ -75,3 +81,12 @@ class Project(SynergyObject): def setEnabled(self, enabled=True): self.set("enabled", enabled) + + def serialize(self): + queue = self.getQueue() + usage = queue.getUsage(self.getId()) + + self.getData()["queue_usage"] = usage + self.getData()["queue_size"] = queue.getSize() + + return super(Project, self).serialize() diff --git a/synergy_scheduler_manager/common/queue.py b/synergy_scheduler_manager/common/queue.py index 6b0662e..4ae8a7c 100644 --- a/synergy_scheduler_manager/common/queue.py +++ b/synergy_scheduler_manager/common/queue.py @@ -1,10 +1,10 @@ -import heapq import json -import threading +import Queue as queue from datetime import datetime from sqlalchemy.exc import SQLAlchemyError from synergy.common.serializer import SynergyObject +from synergy.exception import SynergyError __author__ = "Lisa Zangrando" @@ -28,16 +28,15 @@ permissions and limitations under the License.""" class QueueItem(object): - def __init__(self, id, user_id, prj_id, priority, - retry_count, creation_time, last_update, data=None): - self.id = id - self.user_id = user_id - self.prj_id = prj_id - self.priority = priority - self.retry_count = retry_count - self.creation_time = creation_time - self.last_update = last_update - self.data = data + def __init__(self): + self.id = -1 + self.priority = 0 + self.retry_count = 0 + self.user_id = None + self.prj_id = None + self.data = None + self.creation_time = datetime.now() + self.last_update = self.creation_time def getId(self): return self.id @@ -91,47 +90,32 @@ class QueueItem(object): self.data = data -class PriorityQueue(object): - - def __init__(self): - self._heap = [] - - def __len__(self): - return len(self._heap) - - def __iter__(self): - """Get all elements ordered by asc. priority. """ - return self - - def put(self, priority, item): - heapq.heappush(self._heap, (-priority, item.getCreationTime(), item)) - - def get(self): - return heapq.heappop(self._heap)[2] - - def size(self): - return len(self._heap) - - def items(self): - return [heapq.heappop(self._heap)[2] for i in range(len(self._heap))] - - def smallest(self, x): - result = heapq.nsmallest(x, self._heap, key=lambda s: -s[0]) - return [item[2] for item in result] - - def largest(self, x): - result = heapq.nlargest(x, self._heap, key=lambda s: -s[0]) - return [item[2] for item in result] - - class Queue(SynergyObject): - def __init__(self): + def __init__(self, name="default", type="PRIORITY", db_engine=None): super(Queue, self).__init__() - self.setName("N/A") + if type == "FIFO": + self.queue = queue.Queue() + elif type == "LIFO": + self.queue = queue.LifoQueue() + elif type == "PRIORITY": + self.queue = queue.PriorityQueue() + else: + raise SynergyError("queue type %r not supported" % type) + + self.set("type", type) self.set("is_closed", False) self.set("size", 0) + self.setName(name) + self.db_engine = db_engine + + self._createTable() + self._buildFromDB() + + def _setSize(self, value): + size = self.get("size") + self.set("size", size + value) def isOpen(self): return not self.get("is_closed") @@ -142,69 +126,144 @@ class Queue(SynergyObject): def setClosed(self, is_closed): self.set("is_closed", is_closed) + def isEmpty(self): + return self.queue.empty() + + def close(self): + self.setClosed(True) + + def enqueue(self, user, data, priority=0): + if self.isClosed(): + raise SynergyError("the queue is closed!") + + if not user: + raise SynergyError("user not specified!") + + if not data: + raise SynergyError("data not specified!") + + item = QueueItem() + item.setUserId(user.getId()) + item.setProjectId(user.getProjectId()) + item.setPriority(priority) + item.setData(data) + + self._insertItemDB(item) + + if self.getType() == "PRIORITY": + self.queue.put((-priority, item.getCreationTime(), item)) + else: + self.queue.put(item) + + self._setSize(1) + + def dequeue(self, block=True, timeout=None, delete=False): + if self.isClosed(): + raise SynergyError("the queue is closed!") + + if self.queue.empty() and not block: + return None + + item = self.queue.get(block=block, timeout=timeout) + + if not item: + return None + + if self.getType() == "PRIORITY": + item = item[2] + + self._getItemDataDB(item) + + if delete: + self.delete(item) + + return item + + def restore(self, item): + if self.isClosed(): + raise SynergyError("the queue is closed!") + + if self.getType() == "PRIORITY": + self.queue.put((-item.getPriority(), item.getCreationTime(), item)) + else: + self.queue.put(item) + + self._updateItemDB(item) + + def getType(self): + return self.get("type") + def getSize(self): return self.get("size") - def setSize(self, size): - self.set("size", size) - - -class QueueDB(Queue): - - def __init__(self, name, db_engine, fairshare_manager=None): - super(QueueDB, self).__init__() - self.setName(name) - - self.db_engine = db_engine - self.fairshare_manager = fairshare_manager - self.priority_updater = None - self.condition = threading.Condition() - self.pqueue = PriorityQueue() - self.createTable() - self.buildFromDB() - self.updatePriority() - - def getSize(self): + def getUsage(self, prj_id): + result = 0 connection = self.db_engine.connect() try: - QUERY = "select count(*) from `%s`" % self.getName() - result = connection.execute(QUERY) + QUERY = "select count(*) from `%s` " % self.getName() + QUERY += "where prj_id=%s" - row = result.fetchone() - - return row[0] + qresult = connection.execute(QUERY, [prj_id]) + row = qresult.fetchone() + result = row[0] except SQLAlchemyError as ex: - raise Exception(ex.message) + raise SynergyError(ex.message) + finally: + connection.close() + return result + + def delete(self, item): + if self.isClosed(): + raise SynergyError("the queue is closed!") + + if not item or not self.db_engine: + return + + connection = self.db_engine.connect() + trans = connection.begin() + + try: + QUERY = "delete from `%s`" % self.getName() + QUERY += " where id=%s" + + connection.execute(QUERY, [item.getId()]) + + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() + raise SynergyError(ex.message) finally: connection.close() - def createTable(self): + self._setSize(-1) + self.queue.task_done() + + def _createTable(self): + if not self.db_engine: + return + TABLE = """CREATE TABLE IF NOT EXISTS `%s` (`id` BIGINT NOT NULL \ AUTO_INCREMENT PRIMARY KEY, `priority` INT DEFAULT 0, user_id CHAR(40) \ NOT NULL, prj_id CHAR(40) NOT NULL, `retry_count` INT DEFAULT 0, \ `creation_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `last_update` \ -TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.getName() +TIMESTAMP NULL, `data` TEXT NOT NULL) ENGINE=InnoDB""" % self.getName() connection = self.db_engine.connect() try: connection.execute(TABLE) except SQLAlchemyError as ex: - raise Exception(ex.message) + raise SynergyError(ex.message) except Exception as ex: - raise Exception(ex.message) + raise SynergyError(ex.message) finally: connection.close() - def close(self): - if not self.isClosed(): - self.setClosed(True) + def _buildFromDB(self): + if not self.db_engine: + return - with self.condition: - self.condition.notifyAll() - - def buildFromDB(self): connection = self.db_engine.connect() try: @@ -213,182 +272,98 @@ TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.getName() result = connection.execute(QUERY) for row in result: - queue_item = QueueItem(row[0], row[1], row[2], - row[3], row[4], row[5], row[6]) + item = QueueItem() + item.setId(row[0]) + item.setUserId(row[1]) + item.setProjectId(row[2]) + item.setPriority(row[3]) + item.setRetryCount(row[4]) + item.setCreationTime(row[5]) + item.setLastUpdate(row[6]) - self.pqueue.put(row[3], queue_item) + self.restore(item) + self._setSize(1) except SQLAlchemyError as ex: - raise Exception(ex.message) + raise SynergyError(ex.message) finally: connection.close() - with self.condition: - self.condition.notifyAll() - - def insertItem(self, user_id, prj_id, priority, data): - with self.condition: - idRecord = -1 - QUERY = "insert into `%s` (user_id, prj_id, priority, " \ - "data) values" % self.getName() - QUERY += "(%s, %s, %s, %s)" - - connection = self.db_engine.connect() - trans = connection.begin() - - try: - result = connection.execute(QUERY, - [user_id, prj_id, priority, - json.dumps(data)]) - - idRecord = result.lastrowid - - trans.commit() - except SQLAlchemyError as ex: - trans.rollback() - raise Exception(ex.message) - finally: - connection.close() - - now = datetime.now() - queue_item = QueueItem(idRecord, user_id, prj_id, - priority, 0, now, now) - - self.pqueue.put(priority, queue_item) - - self.condition.notifyAll() - - def reinsertItem(self, queue_item): - with self.condition: - self.pqueue.put(queue_item.getPriority(), queue_item) - self.condition.notifyAll() - - def getItem(self, blocking=True): - item = None - queue_item = None - - with self.condition: - while (queue_item is None and not self.isClosed()): - if len(self.pqueue): - queue_item = self.pqueue.get() - elif blocking: - self.condition.wait() - elif queue_item is None: - break - - if (not self.isClosed() and queue_item is not None): - connection = self.db_engine.connect() - - try: - QUERY = """select user_id, prj_id, priority, \ -retry_count, creation_time, last_update, data from `%s`""" % self.getName() - QUERY += " where id=%s" - - result = connection.execute(QUERY, [queue_item.getId()]) - - row = result.fetchone() - - item = QueueItem(queue_item.getId(), row[0], row[1], - row[2], row[3], row[4], row[5], - json.loads(row[6])) - except SQLAlchemyError as ex: - raise Exception(ex.message) - finally: - connection.close() - - self.condition.notifyAll() - - return item - - def deleteItem(self, queue_item): - if not queue_item: + def _insertItemDB(self, item): + if not item or not self.db_engine: return - with self.condition: - connection = self.db_engine.connect() - trans = connection.begin() + QUERY = "insert into `%s` (user_id, prj_id, priority, " \ + "data) values" % self.getName() + QUERY += "(%s, %s, %s, %s)" - try: - QUERY = "delete from `%s`" % self.getName() - QUERY += " where id=%s" + connection = self.db_engine.connect() + trans = connection.begin() - connection.execute(QUERY, [queue_item.getId()]) + try: + result = connection.execute(QUERY, + [item.getUserId(), + item.getProjectId(), + item.getPriority(), + json.dumps(item.getData())]) - trans.commit() - except SQLAlchemyError as ex: - trans.rollback() + idRecord = result.lastrowid - raise Exception(ex.message) - finally: - connection.close() - self.condition.notifyAll() + trans.commit() - def updateItem(self, queue_item): - if not queue_item: + item.setId(idRecord) + except SQLAlchemyError as ex: + trans.SynergyError() + raise SynergyError(ex.message) + finally: + connection.close() + + def _getItemDataDB(self, item): + if not item or not self.db_engine: return - with self.condition: - connection = self.db_engine.connect() - trans = connection.begin() + data = None + connection = self.db_engine.connect() - try: - queue_item.setLastUpdate(datetime.now()) + try: + QUERY = "select data from `%s`" % self.getName() + QUERY += " where id=%s" - QUERY = "update `%s`" % self.getName() - QUERY += " set priority=%s, retry_count=%s, " \ - "last_update=%s where id=%s" + result = connection.execute(QUERY, [item.getId()]) - connection.execute(QUERY, [queue_item.getPriority(), - queue_item.getRetryCount(), - queue_item.getLastUpdate(), - queue_item.getId()]) + row = result.fetchone() - trans.commit() - except SQLAlchemyError as ex: - trans.rollback() + data = json.loads(row[0]) + except SQLAlchemyError as ex: + raise SynergyError(ex.message) + finally: + connection.close() - raise Exception(ex.message) - finally: - connection.close() + item.setData(data) + return data - self.pqueue.put(queue_item.getPriority(), queue_item) - self.condition.notifyAll() - - def updatePriority(self): - if self.fairshare_manager is None: + def _updateItemDB(self, item): + if not item or not self.db_engine: return - queue_items = [] + connection = self.db_engine.connect() + trans = connection.begin() - with self.condition: - while len(self.pqueue) > 0: - queue_item = self.pqueue.get() - priority = queue_item.getPriority() + try: + item.setLastUpdate(datetime.now()) - try: - priority = self.fairshare_manager.calculatePriority( - user_id=queue_item.getUserId(), - prj_id=queue_item.getProjectId(), - timestamp=queue_item.getCreationTime(), - retry=queue_item.getRetryCount()) + QUERY = "update `%s`" % self.getName() + QUERY += " set priority=%s, retry_count=%s, " \ + "last_update=%s where id=%s" - queue_item.setPriority(priority) - except Exception: - continue - finally: - queue_items.append(queue_item) + connection.execute(QUERY, [item.getPriority(), + item.getRetryCount(), + item.getLastUpdate(), + item.getId()]) - if len(queue_items) > 0: - for queue_item in queue_items: - self.pqueue.put(queue_item.getPriority(), queue_item) + trans.commit() + except SQLAlchemyError as ex: + trans.rollback() - del queue_items - - self.condition.notifyAll() - - def serialize(self): - queue = Queue() - queue.setName(self.getName()) - queue.setSize(self.getSize()) - queue.setClosed(self.isClosed()) - - return queue.serialize() + raise SynergyError(ex.message) + finally: + connection.close() diff --git a/synergy_scheduler_manager/fairshare_manager.py b/synergy_scheduler_manager/fairshare_manager.py index 031fccd..4f7149f 100644 --- a/synergy_scheduler_manager/fairshare_manager.py +++ b/synergy_scheduler_manager/fairshare_manager.py @@ -72,7 +72,7 @@ class FairShareManager(Manager): def task(self): try: self._calculateFairShare() - except SynergyError as ex: + except Exception as ex: LOG.error(ex) raise ex @@ -144,7 +144,7 @@ class FairShareManager(Manager): total_prj_share = float(0) projects = self.project_manager.getProjects() - for project in projects.values(): + for project in projects: prj_share = project.getShare() # check the share for each user and update the usage_record @@ -168,7 +168,7 @@ class FairShareManager(Manager): total_prj_share += prj_share.getValue() - for project in projects.values(): + for project in projects: prj_share = project.getShare() prj_share.setSiblingValue(total_prj_share) prj_share.setNormalizedValue( @@ -196,7 +196,7 @@ class FairShareManager(Manager): time_window_from_date = to_date time_window_to_date = to_date - for prj_id, project in projects.items(): + for project in projects: prj_data = project.getData() prj_data["actual_vcpus"] = float(0) prj_data["actual_memory"] = float(0) @@ -223,9 +223,9 @@ class FairShareManager(Manager): from_date = to_date - timedelta(days=(self.period_length)) time_window_from_date = from_date - for prj_id, project in projects.items(): + for project in projects: usages = self.nova_manager.getProjectUsage( - prj_id, from_date, to_date) + project.getId(), from_date, to_date) for user_id, usage_rec in usages.items(): decay_vcpus = decay * usage_rec["vcpus"] @@ -243,7 +243,7 @@ class FairShareManager(Manager): to_date = from_date - for prj_id, project in projects.items(): + for project in projects: prj_data = project.getData() prj_data["time_window_to_date"] = time_window_to_date @@ -262,7 +262,7 @@ class FairShareManager(Manager): prj_data["actual_memory"] += usr_data["actual_memory"] prj_data["actual_vcpus"] += usr_data["actual_vcpus"] - for project in projects.values(): + for project in projects: prj_data = project.getData() prj_data["effective_memory"] = prj_data["actual_memory"] prj_data["effective_vcpus"] = prj_data["actual_vcpus"] @@ -294,7 +294,7 @@ class FairShareManager(Manager): share / sibling_share)) usr_data["effective_memory"] = effective_memory - usr_data["effective_cores"] = effective_vcpus + usr_data["effective_vcpus"] = effective_vcpus f_memory = 2 ** (-effective_memory / norm_share) usr_priority.setFairShare("memory", f_memory) diff --git a/synergy_scheduler_manager/nova_manager.py b/synergy_scheduler_manager/nova_manager.py index c4e318e..1f9b067 100644 --- a/synergy_scheduler_manager/nova_manager.py +++ b/synergy_scheduler_manager/nova_manager.py @@ -18,6 +18,7 @@ from oslo_config import cfg from sqlalchemy import create_engine from sqlalchemy.exc import SQLAlchemyError from synergy.common.manager import Manager +from synergy.exception import SynergyError __author__ = "Lisa Zangrando" __email__ = "lisa.zangrando[AT]pd.infn.it" @@ -321,10 +322,10 @@ class NovaManager(Manager): self.timeout = CONF.NovaManager.timeout if self.getManager("KeystoneManager") is None: - raise Exception("KeystoneManager not found!") + raise SynergyError("KeystoneManager not found!") if self.getManager("SchedulerManager") is None: - raise Exception("SchedulerManager not found!") + raise SynergyError("SchedulerManager not found!") self.keystone_manager = self.getManager("KeystoneManager") @@ -399,7 +400,7 @@ class NovaManager(Manager): raise ex def execute(self, command, *args, **kargs): - raise Exception("command %r not supported!" % command) + raise SynergyError("command %r not supported!" % command) def task(self): pass @@ -414,8 +415,8 @@ class NovaManager(Manager): return result if fallback is True: - raise Exception("No attribute %r found in [NovaManager] " - "section of synergy.conf" % name) + raise SynergyError("No attribute %r found in [NovaManager] " + "section of synergy.conf" % name) else: return None @@ -426,8 +427,8 @@ class NovaManager(Manager): secret = CONF.NovaManager.metadata_proxy_shared_secret if not secret: - return Exception("'metadata_proxy_shared_secret' " - "attribute not defined in synergy.conf") + raise SynergyError("'metadata_proxy_shared_secret' " + "attribute not defined in synergy.conf") digest = hmac.new(secret, server.getId(), hashlib.sha256).hexdigest() @@ -436,12 +437,12 @@ class NovaManager(Manager): service = token.getService("nova") if not service: - raise Exception("nova service not found!") + raise SynergyError("nova service not found!") endpoint = service.getEndpoint("public") if not endpoint: - raise Exception("nova endpoint not found!") + raise SynergyError("nova endpoint not found!") url = endpoint.getURL() url = url[:url.rfind(":") + 1] + "8775/openstack/2015-10-15/user_data" @@ -463,9 +464,9 @@ class NovaManager(Manager): return None elif request.status_code == 403: if "Invalid proxy request signature" in request._content: - raise Exception("cannot retrieve the 'userdata' value: " - "check the 'metadata_proxy_shared_secret'" - " attribute value") + raise SynergyError("cannot retrieve the 'userdata' value: " + "check the 'metadata_proxy_shared_" + "secret' attribute value") else: request.raise_for_status() else: @@ -483,8 +484,8 @@ class NovaManager(Manager): response_data = self.getResource(url, method="GET") except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the flavors list: %s" - % response) + raise SynergyError("error on retrieving the flavors list: %s" + % response) flavors = [] @@ -505,8 +506,8 @@ class NovaManager(Manager): try: response_data = self.getResource("flavors/" + id, "GET") except requests.exceptions.HTTPError as ex: - raise Exception("error on retrieving the flavor info (id=%r)" - ": %s" % (id, ex.response.json())) + raise SynergyError("error on retrieving the flavor info (id=%r)" + ": %s" % (id, ex.response.json())) flavor = None @@ -533,8 +534,8 @@ class NovaManager(Manager): response_data = self.getResource(url, "GET", params) except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the servers list" - ": %s" % (id, response)) + raise SynergyError("error on retrieving the servers list" + ": %s" % (id, response)) servers = [] @@ -571,8 +572,8 @@ class NovaManager(Manager): try: response_data = self.getResource("servers/" + id, "GET") except requests.exceptions.HTTPError as ex: - raise Exception("error on retrieving the server info (id=%r)" - ": %s" % (id, ex.response.json())) + raise SynergyError("error on retrieving the server info (id=%r)" + ": %s" % (id, ex.response.json())) server = None @@ -619,8 +620,8 @@ class NovaManager(Manager): try: response_data = self.getResource(url, "DELETE") except requests.exceptions.HTTPError as ex: - raise Exception("error on deleting the server (id=%r)" - ": %s" % (id, ex.response.json())) + raise SynergyError("error on deleting the server (id=%r)" + ": %s" % (id, ex.response.json())) if response_data: response_data = response_data["server"] @@ -638,8 +639,8 @@ class NovaManager(Manager): try: response_data = self.getResource(url, "POST", data) except requests.exceptions.HTTPError as ex: - raise Exception("error on starting the server %s" - ": %s" % (id, ex.response.json())) + raise SynergyError("error on starting the server %s" + ": %s" % (id, ex.response.json())) if response_data: response_data = response_data["server"] @@ -657,8 +658,8 @@ class NovaManager(Manager): try: response_data = self.getResource(url, "POST", data) except requests.exceptions.HTTPError as ex: - raise Exception("error on stopping the server info (id=%r)" - ": %s" % (id, ex.response.json())) + raise SynergyError("error on stopping the server info (id=%r)" + ": %s" % (id, ex.response.json())) if response_data: response_data = response_data["server"] @@ -674,8 +675,8 @@ class NovaManager(Manager): response_data = self.getResource(url, "GET", data) except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the hypervisors list: %s" - % response["badRequest"]["message"]) + raise SynergyError("error on retrieving the hypervisors list: %s" + % response["badRequest"]["message"]) if response_data: response_data = response_data["hosts"] @@ -690,8 +691,9 @@ class NovaManager(Manager): response_data = self.getResource(url, "GET", data) except requests.exceptions.HTTPError as ex: response = ex.response.json() - raise Exception("error on retrieving the hypervisor info (id=%r)" - ": %s" % (id, response["badRequest"]["message"])) + raise SynergyError("error on retrieving the hypervisor info (id=%r" + "): %s" % (id, + response["badRequest"]["message"])) if response_data: response_data = response_data["host"] @@ -707,8 +709,8 @@ class NovaManager(Manager): except requests.exceptions.HTTPError as ex: LOG.info(ex) response = ex.response.json() - raise Exception("error on retrieving the hypervisors list: %s" - % response["badRequest"]["message"]) + raise SynergyError("error on retrieving the hypervisors list: %s" + % response["badRequest"]["message"]) hypervisors = [] @@ -743,8 +745,8 @@ class NovaManager(Manager): try: response_data = self.getResource(url, "GET", data) except requests.exceptions.HTTPError as ex: - raise Exception("error on retrieving the hypervisor info (id=%r)" - ": %s" % (id, ex.response.json())) + raise SynergyError("error on retrieving the hypervisor info (id=%r" + "): %s" % (id, ex.response.json())) hypervisor = None @@ -775,8 +777,8 @@ class NovaManager(Manager): url = "os-quota-sets/defaults" response_data = self.getResource(url, "GET") except requests.exceptions.HTTPError as ex: - raise Exception("error on retrieving the quota defaults" - ": %s" % ex.response.json()) + raise SynergyError("error on retrieving the quota defaults" + ": %s" % ex.response.json()) elif id is not None: if is_class: url = "os-quota-class-sets/%s" % id @@ -791,10 +793,10 @@ class NovaManager(Manager): else: quota_data = response_data["quota_set"] except requests.exceptions.HTTPError as ex: - raise Exception("error on retrieving the quota info (id=%r)" - ": %s" % (id, ex.response.json())) + raise SynergyError("error on retrieving the quota info (id=%r)" + ": %s" % (id, ex.response.json())) else: - raise Exception("wrong arguments") + raise SynergyError("wrong arguments") quota = None @@ -825,8 +827,8 @@ class NovaManager(Manager): try: self.getResource(url, "PUT", qs) except requests.exceptions.HTTPError as ex: - raise Exception("error on updating the quota info (id=%r)" - ": %s" % (id, ex.response.json())) + raise SynergyError("error on updating the quota info (id=%r)" + ": %s" % (id, ex.response.json())) def getResource(self, resource, method, data=None): self.keystone_manager.authenticate() @@ -834,12 +836,12 @@ class NovaManager(Manager): service = token.getService("nova") if not service: - raise Exception("nova service not found!") + raise SynergyError("nova service not found!") endpoint = service.getEndpoint("public") if not endpoint: - raise Exception("nova endpoint not found!") + raise SynergyError("nova endpoint not found!") url = endpoint.getURL() + "/" + resource @@ -883,7 +885,7 @@ class NovaManager(Manager): verify=self.ssl_ca_file, cert=self.ssl_cert_file) else: - raise Exception("wrong HTTP method: %s" % method) + raise SynergyError("wrong HTTP method: %s" % method) if request.status_code != requests.codes.ok: request.raise_for_status() @@ -943,7 +945,7 @@ a.launched_at<='%(to_date)s' and (a.terminated_at>='%(from_date)s' or \ "vcpus": float(row[2])} except SQLAlchemyError as ex: - raise Exception(ex.message) + raise SynergyError(ex.message) finally: connection.close() @@ -990,7 +992,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()} servers.append(server) except SQLAlchemyError as ex: - raise Exception(ex.message) + raise SynergyError(ex.message) finally: connection.close() @@ -1046,7 +1048,7 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()} servers.append(server) except SQLAlchemyError as ex: - raise Exception(ex.message) + raise SynergyError(ex.message) finally: connection.close() @@ -1123,7 +1125,7 @@ nova.block_device_mapping where instance_uuid='%(server_id)s' blockDeviceMapList.append(blockDeviceMap) except SQLAlchemyError as ex: - raise Exception(ex.message) + raise SynergyError(ex.message) finally: connection.close() diff --git a/synergy_scheduler_manager/project_manager.py b/synergy_scheduler_manager/project_manager.py index 3c7e734..47665b4 100644 --- a/synergy_scheduler_manager/project_manager.py +++ b/synergy_scheduler_manager/project_manager.py @@ -70,8 +70,11 @@ class ProjectManager(Manager): def task(self): if not self.configured: - self.buildFromDB() - self.configured = True + try: + self.buildFromDB() + self.configured = True + except Exception as ex: + LOG.error(ex) def destroy(self): pass @@ -296,7 +299,7 @@ class ProjectManager(Manager): return project def getProjects(self): - return self.projects + return self.projects.values() def createTable(self): TABLE = """CREATE TABLE IF NOT EXISTS project (`id` VARCHAR(64) \ diff --git a/synergy_scheduler_manager/queue_manager.py b/synergy_scheduler_manager/queue_manager.py index 3718c6f..d56ffc0 100644 --- a/synergy_scheduler_manager/queue_manager.py +++ b/synergy_scheduler_manager/queue_manager.py @@ -1,9 +1,10 @@ import logging -from common.queue import QueueDB +from common.queue import Queue from oslo_config import cfg from sqlalchemy import create_engine from synergy.common.manager import Manager +from synergy.exception import SynergyError __author__ = "Lisa Zangrando" @@ -39,17 +40,9 @@ class QueueManager(Manager): cfg.IntOpt('db_pool_recycle', default=30, required=False), cfg.IntOpt('db_max_overflow', default=5, required=False) ] - self.queue_list = {} + self.queues = {} def setup(self): - if self.getManager("FairShareManager") is None: - raise Exception("FairShareManager not found!") - - self.fairshare_manager = self.getManager("FairShareManager") - - if self.fairshare_manager is None: - raise Exception("FairShareManager not found!") - db_connection = CONF.QueueManager.db_connection pool_size = CONF.QueueManager.db_pool_size pool_recycle = CONF.QueueManager.db_pool_recycle @@ -61,49 +54,42 @@ class QueueManager(Manager): pool_recycle=pool_recycle, max_overflow=max_overflow) except Exception as ex: - LOG.error(ex) - raise ex + raise SynergyError(ex.message) def execute(self, command, *args, **kargs): - if command == "CREATE_QUEUE": - return self.createQueue(*args, **kargs) - elif command == "DELETE_QUEUE": - return self.deleteQueue(*args, **kargs) - elif command == "GET_QUEUE": - queue = self.getQueue(kargs.get("name", None)) - return queue - + if command == "GET_QUEUE": + return self.getQueue(kargs.get("name", None)) else: - raise Exception("command=%r not supported!" % command) + raise SynergyError("command %r not supported!" % command) def task(self): try: - for queue in self.queue_list.values(): + for queue in self.queues.values(): queue.updatePriority() except Exception as ex: - LOG.error("Exception has occured", exc_info=1) LOG.error(ex) def destroy(self): - for queue in self.queue_list.values(): + for queue in self.queues.values(): queue.close() - def createQueue(self, name): - if name not in self.queue_list: - queue = QueueDB(name, self.db_engine, self.fairshare_manager) - self.queue_list[name] = queue - return queue - else: - raise Exception("the queue %r already exists!" % name) + def createQueue(self, name, type): + if name in self.queues: + raise SynergyError("the queue %r already exists!" % name) + + queue = Queue(name, type, db_engine=self.db_engine) + self.queues[name] = queue + + return queue def deleteQueue(self, name): - if name not in self.queue_list: - raise Exception("queue %r not found!" % name) + if name not in self.queues: + raise SynergyError("queue %r not found!" % name) - del self.queue_list[name] + del self.queues[name] def getQueue(self, name): - if name not in self.queue_list: - raise Exception("queue %r not found!" % name) + if name not in self.queues: + raise SynergyError("queue %r not found!" % name) - return self.queue_list[name] + return self.queues[name] diff --git a/synergy_scheduler_manager/quota_manager.py b/synergy_scheduler_manager/quota_manager.py index ce04daf..905feb4 100644 --- a/synergy_scheduler_manager/quota_manager.py +++ b/synergy_scheduler_manager/quota_manager.py @@ -77,7 +77,7 @@ class QuotaManager(Manager): try: self.updateSharedQuota() self.deleteExpiredServers() - except SynergyError as ex: + except Exception as ex: LOG.error(ex) def doOnEvent(self, event_type, *args, **kwargs): @@ -109,13 +109,15 @@ class QuotaManager(Manager): quota.setSize("vcpus", class_quota.getSize("vcpus")) quota.setSize("memory", class_quota.getSize("memory")) quota.setSize("instances", class_quota.getSize("instances")) - quota.setSize( - "vcpus", SharedQuota.getSize("vcpus"), private=False) - quota.setSize( - "memory", SharedQuota.getSize("memory"), private=False) - quota.setSize( - "instances", SharedQuota.getSize("instances"), - private=False) + quota.setSize("vcpus", + SharedQuota.getSize("vcpus"), + private=False) + quota.setSize("memory", + SharedQuota.getSize("memory"), + private=False) + quota.setSize("instances", + SharedQuota.getSize("instances"), + private=False) servers = self.nova_manager.getProjectServers(project.getId()) @@ -155,8 +157,8 @@ class QuotaManager(Manager): quota = self.nova_manager.getQuota(project.getId()) if quota.getSize("vcpus") <= -1 and \ - quota.getSize("memory") <= -1 and \ - quota.getSize("instances") <= -1: + quota.getSize("memory") <= -1 and \ + quota.getSize("instances") <= -1: qc = self.nova_manager.getQuota(project.getId(), is_class=True) self.nova_manager.updateQuota(qc) @@ -176,7 +178,8 @@ class QuotaManager(Manager): raise ex def deleteExpiredServers(self): - for prj_id, project in self.project_manager.getProjects().items(): + for project in self.project_manager.getProjects(): + prj_id = project.getId() TTL = project.getTTL() quota = project.getQuota() @@ -322,7 +325,7 @@ class QuotaManager(Manager): SharedQuota.setSize("vcpus", 0) SharedQuota.setSize("memory", 0) - for project in self.project_manager.getProjects().values(): + for project in self.project_manager.getProjects(): quota = project.getQuota() quota.setSize("vcpus", shared_vcpus, private=False) quota.setSize("memory", shared_memory, private=False) diff --git a/synergy_scheduler_manager/scheduler_manager.py b/synergy_scheduler_manager/scheduler_manager.py index 7086306..e38d6de 100644 --- a/synergy_scheduler_manager/scheduler_manager.py +++ b/synergy_scheduler_manager/scheduler_manager.py @@ -68,20 +68,20 @@ class Worker(Thread): last_release_time = SharedQuota.getLastReleaseTime() while queue_items: - self.queue.reinsertItem(queue_items.pop(0)) + self.queue.restore(queue_items.pop(0)) if len(queue_items) >= self.backfill_depth: SharedQuota.wait() continue - queue_item = self.queue.getItem(blocking=False) + queue_item = self.queue.dequeue(block=False) if queue_item is None: if self.queue.getSize(): SharedQuota.wait() continue else: - queue_item = self.queue.getItem(blocking=True) + queue_item = self.queue.dequeue(block=True) if queue_item is None: continue @@ -100,7 +100,7 @@ class Worker(Thread): if s.getState() != "building": # or server["OS-EXT-STS:task_state"] != "scheduling": - self.queue.deleteItem(queue_item) + self.queue.delete(queue_item) continue except SynergyError as ex: LOG.warn("the server %s is not anymore available!" @@ -147,7 +147,7 @@ class Worker(Thread): % (server.getId(), ex)) if found: - self.queue.deleteItem(queue_item) + self.queue.delete(queue_item) else: quota.release(server) queue_items.append(queue_item) @@ -158,7 +158,7 @@ class Worker(Thread): LOG.error("Exception has occured", exc_info=1) LOG.error("Worker %s: %s" % (self.name, ex)) - self.queue.deleteItem(queue_item) + self.queue.delete(queue_item) LOG.info("Worker %s destroyed!" % self.name) @@ -212,22 +212,25 @@ class SchedulerManager(Manager): self.quota_manager.updateSharedQuota() try: - self.dynamic_queue = self.queue_manager.createQueue("DYNAMIC") + self.queue = self.queue_manager.createQueue("DYNAMIC", "PRIORITY") except SynergyError as ex: LOG.error("Exception has occured", exc_info=1) LOG.error(ex) - self.dynamic_queue = self.queue_manager.getQueue("DYNAMIC") + self.queue = self.queue_manager.getQueue("DYNAMIC") - dynamic_worker = Worker("DYNAMIC", - self.dynamic_queue, - self.project_manager, - self.nova_manager, - self.keystone_manager, - self.backfill_depth) - dynamic_worker.start() + for project in self.project_manager.getProjects(): + project.setQueue(self.queue) - self.workers.append(dynamic_worker) + worker = Worker("DYNAMIC", + self.queue, + self.project_manager, + self.nova_manager, + self.keystone_manager, + self.backfill_depth) + worker.start() + + self.workers.append(worker) self.quota_manager.deleteExpiredServers() @@ -246,6 +249,14 @@ class SchedulerManager(Manager): self._processServerEvent(server, event, state) elif event_type == "SERVER_CREATE": self._processServerCreate(kwargs["request"]) + elif event_type == "PROJECT_ADDED": + if not self.configured: + return + + project = kwargs.get("project", None) + + if self.queue and project: + project.setQueue(self.queue) def _processServerEvent(self, server, event, state): if event == "compute.instance.create.end" and state == "active": @@ -356,11 +367,9 @@ class SchedulerManager(Manager): token_admin.getUser().getId(), token_user) context["trust_id"] = trust.getId() + user = project.getUser(id=request.getUserId()) - self.dynamic_queue.insertItem(request.getUserId(), - request.getProjectId(), - priority=priority, - data=request.toDict()) + self.queue.enqueue(user, request.toDict(), priority) LOG.info("new request: id=%s user_id=%s prj_id=%s priority" "=%s quota=shared" % (request.getId(), diff --git a/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py b/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py index bfa5b6c..a05327f 100644 --- a/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py +++ b/synergy_scheduler_manager/tests/functional/test_scheduler_manager.py @@ -14,7 +14,7 @@ from mock import create_autospec from mock import MagicMock from sqlalchemy.engine.base import Engine from synergy_scheduler_manager.common.project import Project -from synergy_scheduler_manager.common.queue import QueueDB +from synergy_scheduler_manager.common.queue import Queue from synergy_scheduler_manager.common.queue import QueueItem from synergy_scheduler_manager.project_manager import ProjectManager from synergy_scheduler_manager.scheduler_manager import Worker @@ -124,6 +124,7 @@ class TestWorker(base.TestCase): # TO COMPLETE def setUp(self): super(TestWorker, self).setUp() + self.db_engine_mock = create_autospec(Engine) self.nova_manager_mock = MagicMock() self.keystone_manager_mock = MagicMock() db_engine_mock = create_autospec(Engine) @@ -156,7 +157,7 @@ class TestWorker(base.TestCase): self.worker = Worker( name="test", - queue=QueueDB("testq", db_engine_mock), + queue=Queue("testq", db_engine=db_engine_mock), project_manager=self.project_manager, nova_manager=self.nova_manager_mock, keystone_manager=self.keystone_manager_mock) @@ -171,7 +172,6 @@ class TestWorker(base.TestCase): def test_run_build_server(self): def nova_exec_side_effect(command, *args, **kwargs): - """Mock nova.execute to do a successful build.""" if command == "GET_SERVER": res = {"OS-EXT-STS:vm_state": "building", "OS-EXT-STS:task_state": "scheduling"} @@ -188,7 +188,7 @@ class TestWorker(base.TestCase): # Mock QueueItem in the queue qitem_mock = create_autospec(QueueItem) - get_item_mock = create_autospec(self.worker.queue.getItem) + get_item_mock = create_autospec(self.worker.queue.dequeue) get_item_mock.return_value = qitem_mock self.worker.queue.getItem = get_item_mock @@ -203,5 +203,5 @@ class TestWorker(base.TestCase): project.getQuota().allocate = quota_allocate_mock # Delete item from the queue - delete_item_mock = create_autospec(self.worker.queue.deleteItem) + delete_item_mock = create_autospec(self.worker.queue.delete) self.worker.queue.deleteItem = delete_item_mock diff --git a/synergy_scheduler_manager/tests/unit/test_queue.py b/synergy_scheduler_manager/tests/unit/test_queue.py index 01e6cfb..9b0da9e 100644 --- a/synergy_scheduler_manager/tests/unit/test_queue.py +++ b/synergy_scheduler_manager/tests/unit/test_queue.py @@ -9,30 +9,32 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import heapq + +import json from datetime import datetime -from mock import call from mock import create_autospec from sqlalchemy.engine.base import Engine -from synergy_scheduler_manager.common.queue import PriorityQueue -from synergy_scheduler_manager.common.queue import QueueDB +from synergy_scheduler_manager.common.queue import Queue from synergy_scheduler_manager.common.queue import QueueItem +from synergy_scheduler_manager.common.user import User from synergy_scheduler_manager.tests.unit import base +import logging +LOG = logging.getLogger(__name__) + class TestQueueItem(base.TestCase): def setUp(self): super(TestQueueItem, self).setUp() - self.qitem = QueueItem(id=1, - user_id=100, - prj_id=1, - priority=1000, - retry_count=1, - creation_time='now', - last_update='now', - data=1) + self.qitem = QueueItem() + self.qitem.setId(1) + self.qitem.setPriority(1000) + self.qitem.setUserId(100) + self.qitem.setProjectId(1) + self.qitem.setRetryCount(1) + self.qitem.setData(1) def test_get_set_id(self): self.assertEqual(1, self.qitem.getId()) @@ -67,16 +69,20 @@ class TestQueueItem(base.TestCase): self.assertEqual(11, self.qitem.getRetryCount()) def test_get_set_creation_time(self): - self.assertEqual("now", self.qitem.getCreationTime()) + self.assertEqual(self.qitem.getCreationTime(), + self.qitem.getLastUpdate()) - self.qitem.setCreationTime("later") - self.assertEqual("later", self.qitem.getCreationTime()) + now = datetime.now() + self.qitem.setCreationTime(now) + self.assertEqual(now, self.qitem.getCreationTime()) def test_get_set_last_update(self): - self.assertEqual("now", self.qitem.getLastUpdate()) + self.assertEqual(self.qitem.getCreationTime(), + self.qitem.getLastUpdate()) - self.qitem.setLastUpdate("later") - self.assertEqual("later", self.qitem.getLastUpdate()) + now = datetime.now() + self.qitem.setLastUpdate(now) + self.assertEqual(now, self.qitem.getLastUpdate()) def test_get_set_data(self): self.assertEqual(1, self.qitem.getData()) @@ -85,143 +91,104 @@ class TestQueueItem(base.TestCase): self.assertEqual(2, self.qitem.getData()) -class TestPriorityQueue(base.TestCase): - def setUp(self): - super(TestPriorityQueue, self).setUp() - self.pq = PriorityQueue() - now = datetime.now() - - for i in range(0, 3): - item = QueueItem(id=i, - user_id=100, - prj_id=1, - priority=1000, - retry_count=1, - creation_time=now, - last_update=now, - data=i) - self.pq.put(i, item) - - def test_get(self): - self.assertEqual(2, self.pq.get().getId()) - self.assertEqual(1, self.pq.get().getId()) - self.assertEqual(0, self.pq.get().getId()) - - def test_size(self): - self.assertEqual(3, self.pq.size()) - - def test_items(self): - self.assertEqual(3, len(self.pq.items())) - - def test_smallest(self): - self.assertEqual(0, self.pq.smallest(1)[0].getId()) - - def test_largest(self): - self.assertEqual(2, self.pq.largest(1)[0].getId()) - - -class TestQueueDB(base.TestCase): +class TestQueue(base.TestCase): def setUp(self): - super(TestQueueDB, self).setUp() + super(TestQueue, self).setUp() # Create a Queue that mocks database interaction self.db_engine_mock = create_autospec(Engine) - self.q = QueueDB(name="test", db_engine=self.db_engine_mock) + self.queue = Queue(name="test", db_engine=self.db_engine_mock) def test_get_name(self): - self.assertEqual('test', self.q.getName()) + self.assertEqual('test', self.queue.getName()) def test_close(self): - self.q.close() - self.assertEqual(True, self.q.isClosed()) + self.queue.close() + self.assertEqual(True, self.queue.isClosed()) - def test_insert_item(self): + def test_getSize(self): + self.assertEqual(0, self.queue.getSize()) - self.q.insertItem(user_id=1, prj_id=2, priority=10, data="mydata") + def test_enqueue(self): + self.assertEqual(0, self.queue.getSize()) + user = User() + user.setId(2) + user.setProjectId(100) - # Check the db call of the item insert - insert_call = call.connect().execute( - 'insert into `test` (user_id, prj_id, priority, data) ' - 'values(%s, %s, %s, %s)', [1, 2, 10, '"mydata"']) - self.assertIn(insert_call, self.db_engine_mock.mock_calls) + self.queue.enqueue(user=user, data="mydata", priority=10) - # Check the item existence and values in the in-memory queue - priority, timestamp, item = heapq.heappop(self.q.pqueue._heap) - self.assertEqual(-10, priority) - self.assertEqual(item.getCreationTime(), timestamp) - self.assertEqual(1, item.user_id) - self.assertEqual(2, item.prj_id) - self.assertEqual(10, item.priority) - self.assertEqual(0, item.retry_count) - self.assertIsNone(item.data) # TODO(vincent): should it be "mydata"? + self.assertEqual(1, self.queue.getSize()) - def test_get_size(self): - execute_mock = self.db_engine_mock.connect().execute - execute_call = call('select count(*) from `test`') + def test_dequeue(self): + self.assertEqual(0, self.queue.getSize()) + user = User() + user.setId(2) + user.setProjectId(100) + data = json.dumps("mydata") + self.queue.enqueue(user=user, data=data, priority=10) - fetchone_mock = execute_mock().fetchone - fetchone_mock.return_value = [3] + self.assertEqual(1, self.queue.getSize()) - # Check that getSize() uses the correct sqlalchemy method - self.assertEqual(3, self.q.getSize()) - - # Check that getSize() uses the correct SQL statement - self.assertEqual(execute_call, execute_mock.call_args) - - def test_get_item(self): - # Insert the item and mock its DB insertion + # Mock the DB execute_mock = self.db_engine_mock.connect().execute execute_mock().lastrowid = 123 - self.q.insertItem(user_id=1, prj_id=2, priority=10, data="mydata") + fetchone_mock = execute_mock().fetchone + fetchone_mock.return_value = [data] - # Mock the DB select by returning the same things we inserted before - select_mock = self.db_engine_mock.connect().execute - select_call = call("select user_id, prj_id, priority, retry_count, " - "creation_time, last_update, data from `test` " - "where id=%s", [123]) - fetchone_mock = select_mock().fetchone - fetchone_mock.return_value = [1, 2, 10, 0, "now", "now", '"mydata"'] + qitem = self.queue.dequeue() - item = self.q.getItem() - self.assertEqual(select_call, select_mock.call_args) - self.assertEqual(123, item.id) - self.assertEqual(1, item.user_id) - self.assertEqual(2, item.prj_id) - self.assertEqual(10, item.priority) - self.assertEqual(0, item.retry_count) - self.assertEqual("now", item.creation_time) - self.assertEqual("now", item.last_update) - self.assertEqual("mydata", item.data) + self.assertIsNotNone(qitem) + self.assertEqual(2, qitem.getUserId()) + self.assertEqual(100, qitem.getProjectId()) + self.assertEqual("mydata", qitem.getData()) - def test_delete_item(self): - # Mock QueueItem to be deleted - qitem = create_autospec(QueueItem) - qitem.getId.return_value = 123 + def test_delete(self): + self.assertEqual(0, self.queue.getSize()) + user = User() + user.setId(2) + user.setProjectId(100) + data = json.dumps("mydata") - # Mock the DB delete + self.queue.enqueue(user=user, data=data, priority=10) + + # Mock the DB execute_mock = self.db_engine_mock.connect().execute - execute_call = call("delete from `test` where id=%s", [123]) + execute_mock().lastrowid = 123 + fetchone_mock = execute_mock().fetchone + fetchone_mock.return_value = [data] + qitem = self.queue.dequeue() - self.q.deleteItem(qitem) - self.assertEqual(execute_call, execute_mock.call_args) + execute_mock = self.db_engine_mock.connect().execute + execute_mock().lastrowid = 123 + self.queue.delete(qitem) - def test_update_item(self): - # Mock QueueItem to be updated - qitem = create_autospec(QueueItem) - qitem.getPriority.return_value = 10 - qitem.getRetryCount.return_value = 20 - qitem.getLastUpdate.return_value = "right_now" - qitem.getId.return_value = 123 + def test_insertItemDB(self): + qitem = QueueItem() + qitem.setPriority(1000) + qitem.setUserId(100) + qitem.setProjectId(1) + qitem.setRetryCount(1) + qitem.setData(1) + + # Check the db call of the item insert + execute_mock = self.db_engine_mock.connect().execute + execute_mock().lastrowid = 123 + self.queue._insertItemDB(qitem) + self.assertEqual(123, qitem.getId()) + + def test_updateItemDB(self): + qitem = QueueItem() + qitem.setPriority(1000) + qitem.setUserId(100) + qitem.setProjectId(1) + qitem.setRetryCount(1) + qitem.setData(1) + lastUpdate = qitem.getLastUpdate() # Mock the DB update execute_mock = self.db_engine_mock.connect().execute - execute_call = call("update `test` set priority=%s, retry_count=%s, " - "last_update=%s where id=%s", - [10, 20, "right_now", 123]) + execute_mock().lastrowid = 123 - # Check the DB call and that the new QueueItem is in the queue - self.q.updateItem(qitem) - self.assertEqual(execute_call, execute_mock.call_args) - self.assertIn((-10, qitem.getCreationTime(), qitem), - self.q.pqueue._heap) + self.queue._updateItemDB(qitem) + self.assertNotEqual(lastUpdate, qitem.getLastUpdate())