diff --git a/synergy_scheduler_manager/common/queue.py b/synergy_scheduler_manager/common/queue.py index c54fea4..6b0662e 100644 --- a/synergy_scheduler_manager/common/queue.py +++ b/synergy_scheduler_manager/common/queue.py @@ -162,6 +162,7 @@ class QueueDB(Queue): self.pqueue = PriorityQueue() self.createTable() self.buildFromDB() + self.updatePriority() def getSize(self): connection = self.db_engine.connect() @@ -356,48 +357,26 @@ retry_count, creation_time, last_update, data from `%s`""" % self.getName() if self.fairshare_manager is None: return + queue_items = [] + with self.condition: - queue_items = [] - - connection = self.db_engine.connect() - while len(self.pqueue) > 0: queue_item = self.pqueue.get() priority = queue_item.getPriority() try: - priority = self.fairshare_manager.execute( - "CALCULATE_PRIORITY", + priority = self.fairshare_manager.calculatePriority( user_id=queue_item.getUserId(), prj_id=queue_item.getProjectId(), timestamp=queue_item.getCreationTime(), retry=queue_item.getRetryCount()) queue_item.setPriority(priority) - except Exception as ex: + except Exception: continue finally: queue_items.append(queue_item) - trans = connection.begin() - - try: - queue_item.setLastUpdate(datetime.now()) - - QUERY = "update `%s`" % self.getName() - QUERY += " set priority=%s, last_update=%s where id=%s" - - connection.execute(QUERY, [queue_item.getPriority(), - queue_item.getLastUpdate(), - queue_item.getId()]) - - trans.commit() - except SQLAlchemyError as ex: - trans.rollback() - raise Exception(ex.message) - - connection.close() - if len(queue_items) > 0: for queue_item in queue_items: self.pqueue.put(queue_item.getPriority(), queue_item)