Added Queue class to synergy_scheduler_manager/common/queue.py

Change-Id: I92226005ff309d7595f269fb4afaba7936628fe4
This commit is contained in:
Lisa Zangrando 2016-10-03 15:05:44 +02:00 committed by Vincent Llorens
parent a37e4fb0ff
commit 17a9c2bd03

View File

@ -4,6 +4,8 @@ import threading
from datetime import datetime
from sqlalchemy.exc import SQLAlchemyError
from synergy.common.serializer import SynergyObject
__author__ = "Lisa Zangrando"
__email__ = "lisa.zangrando[AT]pd.infn.it"
@ -122,27 +124,50 @@ class PriorityQueue(object):
return heapq.nlargest(x, self._heap)
class QueueDB(object):
class Queue(SynergyObject):
def __init__(self):
super(Queue, self).__init__()
self.setName("N/A")
self.set("is_closed", False)
self.set("size", 0)
def isOpen(self):
return not self.get("is_closed")
def isClosed(self):
return self.get("is_closed")
def setClosed(self, is_closed):
self.set("is_closed", is_closed)
def getSize(self):
return self.get("size")
def setSize(self, size):
self.set("size", size)
class QueueDB(Queue):
def __init__(self, name, db_engine, fairshare_manager=None):
self.name = name
super(QueueDB, self).__init__()
self.setName(name)
self.db_engine = db_engine
self.fairshare_manager = fairshare_manager
self.is_closed = False
self.priority_updater = None
self.condition = threading.Condition()
self.pqueue = PriorityQueue()
self.createTable()
self.buildFromDB()
def getName(self):
return self.name
def getSize(self):
connection = self.db_engine.connect()
try:
QUERY = "select count(*) from `%s`" % self.name
QUERY = "select count(*) from `%s`" % self.getName()
result = connection.execute(QUERY)
row = result.fetchone()
@ -158,7 +183,7 @@ class QueueDB(object):
AUTO_INCREMENT PRIMARY KEY, `priority` INT DEFAULT 0, user_id CHAR(40) \
NOT NULL, prj_id CHAR(40) NOT NULL, `retry_count` INT DEFAULT 0, \
`creation_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `last_update` \
TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.name
TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.getName()
connection = self.db_engine.connect()
@ -172,21 +197,18 @@ TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.name
connection.close()
def close(self):
if not self.is_closed:
self.is_closed = True
if not self.isClosed():
self.setClosed(True)
with self.condition:
self.condition.notifyAll()
def isClosed(self):
return self.is_closed
def buildFromDB(self):
connection = self.db_engine.connect()
try:
QUERY = "select id, user_id, prj_id, priority, retry_count, " \
"creation_time, last_update from `%s`" % self.name
"creation_time, last_update from `%s`" % self.getName()
result = connection.execute(QUERY)
for row in result:
@ -206,7 +228,7 @@ TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.name
with self.condition:
idRecord = -1
QUERY = "insert into `%s` (user_id, prj_id, priority, " \
"data) values" % self.name
"data) values" % self.getName()
QUERY += "(%s, %s, %s, %s)"
connection = self.db_engine.connect()
@ -244,7 +266,7 @@ TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.name
queue_item = None
with self.condition:
while (queue_item is None and not self.is_closed):
while (queue_item is None and not self.isClosed()):
if len(self.pqueue):
queue_item = self.pqueue.get()
elif blocking:
@ -252,12 +274,12 @@ TIMESTAMP NULL, `data` TEXT NOT NULL ) ENGINE=InnoDB""" % self.name
elif queue_item is None:
break
if (not self.is_closed and queue_item is not None):
if (not self.isClosed() and queue_item is not None):
connection = self.db_engine.connect()
try:
QUERY = """select user_id, prj_id, priority, \
retry_count, creation_time, last_update, data from `%s`""" % self.name
retry_count, creation_time, last_update, data from `%s`""" % self.getName()
QUERY += " where id=%s"
result = connection.execute(QUERY, [queue_item.getId()])
@ -285,7 +307,7 @@ retry_count, creation_time, last_update, data from `%s`""" % self.name
trans = connection.begin()
try:
QUERY = "delete from `%s`" % self.name
QUERY = "delete from `%s`" % self.getName()
QUERY += " where id=%s"
connection.execute(QUERY, [queue_item.getId()])
@ -310,7 +332,7 @@ retry_count, creation_time, last_update, data from `%s`""" % self.name
try:
queue_item.setLastUpdate(datetime.now())
QUERY = "update `%s`" % self.name
QUERY = "update `%s`" % self.getName()
QUERY += " set priority=%s, retry_count=%s, " \
"last_update=%s where id=%s"
@ -362,7 +384,7 @@ retry_count, creation_time, last_update, data from `%s`""" % self.name
try:
queue_item.setLastUpdate(datetime.now())
QUERY = "update `%s`" % self.name
QUERY = "update `%s`" % self.getName()
QUERY += " set priority=%s, last_update=%s where id=%s"
connection.execute(QUERY, [queue_item.getPriority(),
@ -384,14 +406,10 @@ retry_count, creation_time, last_update, data from `%s`""" % self.name
self.condition.notifyAll()
def toDict(self):
queue = {}
queue["name"] = self.name
queue["size"] = self.getSize()
def serialize(self):
queue = Queue()
queue.setName(self.getName())
queue.setSize(self.getSize())
queue.setClosed(self.isClosed())
if self.is_closed:
queue["status"] = "OFF"
else:
queue["status"] = "ON"
return queue
return queue.serialize()