Merge "Queue.updatePriority() takes much time if the queue is large"
This commit is contained in:
commit
8ff83d7f9d
@ -162,6 +162,7 @@ class QueueDB(Queue):
|
||||
self.pqueue = PriorityQueue()
|
||||
self.createTable()
|
||||
self.buildFromDB()
|
||||
self.updatePriority()
|
||||
|
||||
def getSize(self):
|
||||
connection = self.db_engine.connect()
|
||||
@ -356,48 +357,26 @@ retry_count, creation_time, last_update, data from `%s`""" % self.getName()
|
||||
if self.fairshare_manager is None:
|
||||
return
|
||||
|
||||
queue_items = []
|
||||
|
||||
with self.condition:
|
||||
queue_items = []
|
||||
|
||||
connection = self.db_engine.connect()
|
||||
|
||||
while len(self.pqueue) > 0:
|
||||
queue_item = self.pqueue.get()
|
||||
priority = queue_item.getPriority()
|
||||
|
||||
try:
|
||||
priority = self.fairshare_manager.execute(
|
||||
"CALCULATE_PRIORITY",
|
||||
priority = self.fairshare_manager.calculatePriority(
|
||||
user_id=queue_item.getUserId(),
|
||||
prj_id=queue_item.getProjectId(),
|
||||
timestamp=queue_item.getCreationTime(),
|
||||
retry=queue_item.getRetryCount())
|
||||
|
||||
queue_item.setPriority(priority)
|
||||
except Exception as ex:
|
||||
except Exception:
|
||||
continue
|
||||
finally:
|
||||
queue_items.append(queue_item)
|
||||
|
||||
trans = connection.begin()
|
||||
|
||||
try:
|
||||
queue_item.setLastUpdate(datetime.now())
|
||||
|
||||
QUERY = "update `%s`" % self.getName()
|
||||
QUERY += " set priority=%s, last_update=%s where id=%s"
|
||||
|
||||
connection.execute(QUERY, [queue_item.getPriority(),
|
||||
queue_item.getLastUpdate(),
|
||||
queue_item.getId()])
|
||||
|
||||
trans.commit()
|
||||
except SQLAlchemyError as ex:
|
||||
trans.rollback()
|
||||
raise Exception(ex.message)
|
||||
|
||||
connection.close()
|
||||
|
||||
if len(queue_items) > 0:
|
||||
for queue_item in queue_items:
|
||||
self.pqueue.put(queue_item.getPriority(), queue_item)
|
||||
|
Loading…
x
Reference in New Issue
Block a user