From 17a9c2bd03aae2b9edcbd76dc76b456299189a34 Mon Sep 17 00:00:00 2001 From: Lisa Zangrando Date: Mon, 3 Oct 2016 15:05:44 +0200 Subject: [PATCH] Added Queue class to synergy_scheduler_manager/common/queue.py Change-Id: I92226005ff309d7595f269fb4afaba7936628fe4 --- synergy_scheduler_manager/common/queue.py | 80 ++++++++++++++--------- 1 file changed, 49 insertions(+), 31 deletions(-) diff --git a/synergy_scheduler_manager/common/queue.py b/synergy_scheduler_manager/common/queue.py index 9b36305..361020f 100644 --- a/synergy_scheduler_manager/common/queue.py +++ b/synergy_scheduler_manager/common/queue.py @@ -4,6 +4,8 @@ import threading from datetime import datetime from sqlalchemy.exc import SQLAlchemyError +from synergy.common.serializer import SynergyObject + __author__ = "Lisa Zangrando" __email__ = "lisa.zangrando[AT]pd.infn.it" @@ -122,27 +124,50 @@ class PriorityQueue(object): return heapq.nlargest(x, self._heap) -class QueueDB(object): +class Queue(SynergyObject): + + def __init__(self): + super(Queue, self).__init__() + + self.setName("N/A") + self.set("is_closed", False) + self.set("size", 0) + + def isOpen(self): + return not self.get("is_closed") + + def isClosed(self): + return self.get("is_closed") + + def setClosed(self, is_closed): + self.set("is_closed", is_closed) + + def getSize(self): + return self.get("size") + + def setSize(self, size): + self.set("size", size) + + +class QueueDB(Queue): def __init__(self, name, db_engine, fairshare_manager=None): - self.name = name + super(QueueDB, self).__init__() + self.setName(name) + self.db_engine = db_engine self.fairshare_manager = fairshare_manager - self.is_closed = False self.priority_updater = None self.condition = threading.Condition() self.pqueue = PriorityQueue() self.createTable() self.buildFromDB() - def getName(self): - return self.name - def getSize(self): connection = self.db_engine.connect() try: - QUERY = "select count(*) from `%s`" % self.name + QUERY = "select count(*) from `%s`" % self.getName() result = connection.execute(QUERY) row = result.fetchone() @@ -158,7 +183,7 @@ class QueueDB(object): AUTO_INCREMENT PRIMARY KEY, `priority` INT DEFAULT 0, user_id CHAR(40) \ NOT NULL, prj_id CHAR(40) NOT NULL, `retry_count` INT DEFAULT 0, \ `creation_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `last_update` \ -TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.name +TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.getName() connection = self.db_engine.connect() @@ -172,21 +197,18 @@ TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.name connection.close() def close(self): - if not self.is_closed: - self.is_closed = True + if not self.isClosed(): + self.setClosed(True) with self.condition: self.condition.notifyAll() - def isClosed(self): - return self.is_closed - def buildFromDB(self): connection = self.db_engine.connect() try: QUERY = "select id, user_id, prj_id, priority, retry_count, " \ - "creation_time, last_update from `%s`" % self.name + "creation_time, last_update from `%s`" % self.getName() result = connection.execute(QUERY) for row in result: @@ -206,7 +228,7 @@ TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.name with self.condition: idRecord = -1 QUERY = "insert into `%s` (user_id, prj_id, priority, " \ - "data) values" % self.name + "data) values" % self.getName() QUERY += "(%s, %s, %s, %s)" connection = self.db_engine.connect() @@ -244,7 +266,7 @@ TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.name queue_item = None with self.condition: - while (queue_item is None and not self.is_closed): + while (queue_item is None and not self.isClosed()): if len(self.pqueue): queue_item = self.pqueue.get() elif blocking: @@ -252,12 +274,12 @@ TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.name elif queue_item is None: break - if (not self.is_closed and queue_item is not None): + if (not self.isClosed() and queue_item is not None): connection = self.db_engine.connect() try: QUERY = """select user_id, prj_id, priority, \ -retry_count, creation_time, last_update, data from `%s`""" % self.name +retry_count, creation_time, last_update, data from `%s`""" % self.getName() QUERY += " where id=%s" result = connection.execute(QUERY, [queue_item.getId()]) @@ -285,7 +307,7 @@ retry_count, creation_time, last_update, data from `%s`""" % self.name trans = connection.begin() try: - QUERY = "delete from `%s`" % self.name + QUERY = "delete from `%s`" % self.getName() QUERY += " where id=%s" connection.execute(QUERY, [queue_item.getId()]) @@ -310,7 +332,7 @@ retry_count, creation_time, last_update, data from `%s`""" % self.name try: queue_item.setLastUpdate(datetime.now()) - QUERY = "update `%s`" % self.name + QUERY = "update `%s`" % self.getName() QUERY += " set priority=%s, retry_count=%s, " \ "last_update=%s where id=%s" @@ -362,7 +384,7 @@ retry_count, creation_time, last_update, data from `%s`""" % self.name try: queue_item.setLastUpdate(datetime.now()) - QUERY = "update `%s`" % self.name + QUERY = "update `%s`" % self.getName() QUERY += " set priority=%s, last_update=%s where id=%s" connection.execute(QUERY, [queue_item.getPriority(), @@ -384,14 +406,10 @@ retry_count, creation_time, last_update, data from `%s`""" % self.name self.condition.notifyAll() - def toDict(self): - queue = {} - queue["name"] = self.name - queue["size"] = self.getSize() + def serialize(self): + queue = Queue() + queue.setName(self.getName()) + queue.setSize(self.getSize()) + queue.setClosed(self.isClosed()) - if self.is_closed: - queue["status"] = "OFF" - else: - queue["status"] = "ON" - - return queue + return queue.serialize()