import json import Queue as queue from datetime import datetime from sqlalchemy.exc import SQLAlchemyError from synergy.common.serializer import SynergyObject from synergy.exception import SynergyError __author__ = "Lisa Zangrando" __email__ = "lisa.zangrando[AT]pd.infn.it" __copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud All Rights Reserved Licensed under the Apache License, Version 2.0; you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.""" class QueueItem(object): def __init__(self): self.id = -1 self.priority = 0 self.retry_count = 0 self.user_id = None self.prj_id = None self.data = None self.creation_time = datetime.now() self.last_update = self.creation_time def getId(self): return self.id def setId(self, id): self.id = id def getUserId(self): return self.user_id def setUserId(self, user_id): self.user_id = user_id def getProjectId(self): return self.prj_id def setProjectId(self, prj_id): self.prj_id = prj_id def getPriority(self): return self.priority def setPriority(self, priority): self.priority = priority def getRetryCount(self): return self.retry_count def setRetryCount(self, retry_count): self.retry_count = retry_count def incRetryCount(self): self.retry_count += 1 def getCreationTime(self): return self.creation_time def setCreationTime(self, creation_time): self.creation_time = creation_time def getLastUpdate(self): return self.last_update def setLastUpdate(self, last_update): self.last_update = last_update def getData(self): return self.data def setData(self, data): self.data = data class Queue(SynergyObject): def __init__(self, name="default", type="PRIORITY", db_engine=None): super(Queue, self).__init__() if type == "FIFO": self.queue = queue.Queue() elif type == "LIFO": self.queue = queue.LifoQueue() elif type == "PRIORITY": self.queue = queue.PriorityQueue() else: raise SynergyError("queue type %r not supported" % type) self.set("type", type) self.set("is_closed", False) self.set("size", 0) self.setName(name) self.db_engine = db_engine self._createTable() self._buildFromDB() def _setSize(self, value): size = self.get("size") self.set("size", size + value) def isOpen(self): return not self.get("is_closed") def isClosed(self): return self.get("is_closed") def setClosed(self, is_closed): self.set("is_closed", is_closed) def isEmpty(self): return self.queue.empty() def close(self): self.setClosed(True) def enqueue(self, user, data, priority=0): if self.isClosed(): raise SynergyError("the queue is closed!") if not user: raise SynergyError("user not specified!") if not data: raise SynergyError("data not specified!") item = QueueItem() item.setUserId(user.getId()) item.setProjectId(user.getProjectId()) item.setPriority(priority) item.setData(data) self._insertItemDB(item) if self.getType() == "PRIORITY": self.queue.put((-priority, item.getCreationTime(), item)) else: self.queue.put(item) self._setSize(1) def dequeue(self, block=True, timeout=None, delete=False): if self.isClosed(): raise SynergyError("the queue is closed!") if self.queue.empty() and not block: return None item = self.queue.get(block=block, timeout=timeout) if not item: return None if self.getType() == "PRIORITY": item = item[2] self._getItemDataDB(item) if delete: self.delete(item) return item def restore(self, item): if self.isClosed(): raise SynergyError("the queue is closed!") if self.getType() == "PRIORITY": self.queue.put((-item.getPriority(), item.getCreationTime(), item)) else: self.queue.put(item) self._updateItemDB(item) def getType(self): return self.get("type") def getSize(self): return self.get("size") def getUsage(self, prj_id): result = 0 connection = self.db_engine.connect() try: QUERY = "select count(*) from `%s` " % self.getName() QUERY += "where prj_id=%s" qresult = connection.execute(QUERY, [prj_id]) row = qresult.fetchone() result = row[0] except SQLAlchemyError as ex: raise SynergyError(ex.message) finally: connection.close() return result def delete(self, item): if self.isClosed(): raise SynergyError("the queue is closed!") if not item or not self.db_engine: return connection = self.db_engine.connect() trans = connection.begin() try: QUERY = "delete from `%s`" % self.getName() QUERY += " where id=%s" connection.execute(QUERY, [item.getId()]) trans.commit() except SQLAlchemyError as ex: trans.rollback() raise SynergyError(ex.message) finally: connection.close() self._setSize(-1) self.queue.task_done() def _createTable(self): if not self.db_engine: return TABLE = """CREATE TABLE IF NOT EXISTS `%s` (`id` BIGINT NOT NULL \ AUTO_INCREMENT PRIMARY KEY, `priority` INT DEFAULT 0, user_id CHAR(40) \ NOT NULL, prj_id CHAR(40) NOT NULL, `retry_count` INT DEFAULT 0, \ `creation_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `last_update` \ TIMESTAMP NULL, `data` TEXT NOT NULL) ENGINE=InnoDB""" % self.getName() connection = self.db_engine.connect() try: connection.execute(TABLE) except SQLAlchemyError as ex: raise SynergyError(ex.message) except Exception as ex: raise SynergyError(ex.message) finally: connection.close() def _buildFromDB(self): if not self.db_engine: return connection = self.db_engine.connect() try: QUERY = "select id, user_id, prj_id, priority, retry_count, " \ "creation_time, last_update from `%s`" % self.getName() result = connection.execute(QUERY) for row in result: item = QueueItem() item.setId(row[0]) item.setUserId(row[1]) item.setProjectId(row[2]) item.setPriority(row[3]) item.setRetryCount(row[4]) item.setCreationTime(row[5]) item.setLastUpdate(row[6]) self.restore(item) self._setSize(1) except SQLAlchemyError as ex: raise SynergyError(ex.message) finally: connection.close() def _insertItemDB(self, item): if not item or not self.db_engine: return QUERY = "insert into `%s` (user_id, prj_id, priority, " \ "data) values" % self.getName() QUERY += "(%s, %s, %s, %s)" connection = self.db_engine.connect() trans = connection.begin() try: result = connection.execute(QUERY, [item.getUserId(), item.getProjectId(), item.getPriority(), json.dumps(item.getData())]) idRecord = result.lastrowid trans.commit() item.setId(idRecord) except SQLAlchemyError as ex: trans.SynergyError() raise SynergyError(ex.message) finally: connection.close() def _getItemDataDB(self, item): if not item or not self.db_engine: return data = None connection = self.db_engine.connect() try: QUERY = "select data from `%s`" % self.getName() QUERY += " where id=%s" result = connection.execute(QUERY, [item.getId()]) row = result.fetchone() data = json.loads(row[0]) except SQLAlchemyError as ex: raise SynergyError(ex.message) finally: connection.close() item.setData(data) return data def _updateItemDB(self, item): if not item or not self.db_engine: return connection = self.db_engine.connect() trans = connection.begin() try: item.setLastUpdate(datetime.now()) QUERY = "update `%s`" % self.getName() QUERY += " set priority=%s, retry_count=%s, " \ "last_update=%s where id=%s" connection.execute(QUERY, [item.getPriority(), item.getRetryCount(), item.getLastUpdate(), item.getId()]) trans.commit() except SQLAlchemyError as ex: trans.rollback() raise SynergyError(ex.message) finally: connection.close()