Queue.updatePriority() takes much time if the queue is large
The method updates the priority of all user requests in the queue, one by one. The priority of each request is updated in memory and into the DB table. If the queue is too large (>5000), the updatePriority() takes much time for its execution. Change-Id: I982f98be94a8d23a159320887f51e7393fecb4a9 Sem-Ver: bugfix Closes-bug: #1646466
This commit is contained in:
parent
4db25a4f5b
commit
3cc99f5c14
@ -162,6 +162,7 @@ class QueueDB(Queue):
|
||||
self.pqueue = PriorityQueue()
|
||||
self.createTable()
|
||||
self.buildFromDB()
|
||||
self.updatePriority()
|
||||
|
||||
def getSize(self):
|
||||
connection = self.db_engine.connect()
|
||||
@ -356,48 +357,26 @@ retry_count, creation_time, last_update, data from `%s`""" % self.getName()
|
||||
if self.fairshare_manager is None:
|
||||
return
|
||||
|
||||
queue_items = []
|
||||
|
||||
with self.condition:
|
||||
queue_items = []
|
||||
|
||||
connection = self.db_engine.connect()
|
||||
|
||||
while len(self.pqueue) > 0:
|
||||
queue_item = self.pqueue.get()
|
||||
priority = queue_item.getPriority()
|
||||
|
||||
try:
|
||||
priority = self.fairshare_manager.execute(
|
||||
"CALCULATE_PRIORITY",
|
||||
priority = self.fairshare_manager.calculatePriority(
|
||||
user_id=queue_item.getUserId(),
|
||||
prj_id=queue_item.getProjectId(),
|
||||
timestamp=queue_item.getCreationTime(),
|
||||
retry=queue_item.getRetryCount())
|
||||
|
||||
queue_item.setPriority(priority)
|
||||
except Exception as ex:
|
||||
except Exception:
|
||||
continue
|
||||
finally:
|
||||
queue_items.append(queue_item)
|
||||
|
||||
trans = connection.begin()
|
||||
|
||||
try:
|
||||
queue_item.setLastUpdate(datetime.now())
|
||||
|
||||
QUERY = "update `%s`" % self.getName()
|
||||
QUERY += " set priority=%s, last_update=%s where id=%s"
|
||||
|
||||
connection.execute(QUERY, [queue_item.getPriority(),
|
||||
queue_item.getLastUpdate(),
|
||||
queue_item.getId()])
|
||||
|
||||
trans.commit()
|
||||
except SQLAlchemyError as ex:
|
||||
trans.rollback()
|
||||
raise Exception(ex.message)
|
||||
|
||||
connection.close()
|
||||
|
||||
if len(queue_items) > 0:
|
||||
for queue_item in queue_items:
|
||||
self.pqueue.put(queue_item.getPriority(), queue_item)
|
||||
|
Loading…
x
Reference in New Issue
Block a user