diff --git a/ChangeLog b/ChangeLog
index 11c9e52..0c18624 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,9 @@
+commit 2d47fa6f6e0de0a54975ff92fc87785a052d4371
+Author: Monsyne Dragon
+Date: Mon Sep 8 23:02:52 2014 +0000
+
+    Add reset stream method.
+
 commit ca0d09f7bc017ef9e372ae29a0c19bf20b68aca5
 Author: Monsyne Dragon
 Date: Mon Sep 8 19:57:24 2014 +0000
diff --git a/tests/test_db.py b/tests/test_db.py
index 860a6d2..f838422 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -439,3 +439,11 @@ class TestDB(unittest.TestCase):
         stream = self.db.get_stream_by_id(6)
         stream = self.db.reset_stream(stream)
         self.assertEqual(stream.state, models.StreamState.retry_expire)
+
+    def test_purge_stream(self):
+        stream = self.db.get_stream_by_id(1)
+        self.db.purge_stream(stream)
+        with self.assertRaises(db.NoSuchStreamError):
+            self.db.get_stream_by_id(1)
+
+
diff --git a/tests/test_pipeline_manager.py b/tests/test_pipeline_manager.py
index d9f2d37..77889bc 100644
--- a/tests/test_pipeline_manager.py
+++ b/tests/test_pipeline_manager.py
@@ -201,13 +201,23 @@ class TestPipelineManager(unittest.TestCase):
         super(TestPipelineManager, self).setUp()
 
     @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
-    def test_complete_stream(self, mock_config_wrap):
+    def test_complete_stream_nopurge(self, mock_config_wrap):
         pm = pipeline_manager.PipelineManager('test')
         pm.db = mock.MagicMock(spec=pm.db)
+        pm.purge_completed_streams = False
         stream = "test stream"
         pm._complete_stream(stream)
         pm.db.set_stream_state.assert_called_once_with(stream, StreamState.completed)
 
+    @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
+    def test_complete_stream_purge(self, mock_config_wrap):
+        pm = pipeline_manager.PipelineManager('test')
+        pm.db = mock.MagicMock(spec=pm.db)
+        pm.purge_completed_streams = True
+        stream = "test stream"
+        pm._complete_stream(stream)
+        pm.db.purge_stream.assert_called_once_with(stream)
+
     @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
     def test_error_stream(self, mock_config_wrap):
         pm = pipeline_manager.PipelineManager('test')
diff --git a/winchester/db/__init__.py b/winchester/db/__init__.py
index c30f386..f622d21 100644
--- a/winchester/db/__init__.py
+++ b/winchester/db/__init__.py
@@ -1,4 +1,5 @@
 from winchester.db.interface import DuplicateError, LockError
+from winchester.db.interface import NoSuchEventError, NoSuchStreamError
 from winchester.db.interface import DBInterface
 
 
diff --git a/winchester/db/interface.py b/winchester/db/interface.py
index 7b63183..59e2080 100644
--- a/winchester/db/interface.py
+++ b/winchester/db/interface.py
@@ -4,6 +4,8 @@ import sqlalchemy
 from sqlalchemy import and_, or_
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm import sessionmaker
+from sqlalchemy.orm.exc import NoResultFound
+from sqlalchemy.orm.exc import MultipleResultsFound
 
 from winchester import models
 from winchester.config import ConfigManager, ConfigSection, ConfigItem
@@ -23,6 +25,14 @@ class LockError(models.DBException):
     pass
 
 
+class NoSuchEventError(models.DBException):
+    pass
+
+
+class NoSuchStreamError(models.DBException):
+    pass
+
+
 def sessioned(func):
     def with_session(self, *args, **kw):
         if 'session' in kw:
@@ -108,14 +118,20 @@ class DBInterface(object):
 
     @sessioned
     def get_event_by_message_id(self, message_id, session=None):
-        e = session.query(models.Event).\
-            filter(models.Event.message_id == message_id).one()
+        try:
+            e = session.query(models.Event).\
+                filter(models.Event.message_id == message_id).one()
+        except NoResultFound:
+            raise NoSuchEventError("No event found with message_id %s!" % message_id)
         return e.as_dict
 
     @sessioned
     def get_stream_by_id(self, stream_id, session=None):
-        s = session.query(models.Stream).\
-            filter(models.Stream.id == stream_id).one()
+        try:
+            s = session.query(models.Stream).\
+                filter(models.Stream.id == stream_id).one()
+        except NoResultFound:
+            raise NoSuchStreamError("No stream found with id %s!" % stream_id)
         return s
 
     @sessioned
@@ -217,3 +233,10 @@ class DBInterface(object):
         if stream.state == models.StreamState.expire_error:
             return self.set_stream_state(stream, models.StreamState.retry_expire)
         return stream
+
+    @sessioned
+    def purge_stream(self, stream, session=None):
+        if stream not in session:
+            session.add(stream)
+        session.delete(stream)
+
diff --git a/winchester/models.py b/winchester/models.py
index 21d6309..8feed8d 100644
--- a/winchester/models.py
+++ b/winchester/models.py
@@ -342,6 +342,7 @@ class Stream(ProxiedDictMixin, Base):
     state_serial_no = Column(Integer, default=0, nullable=False)
 
     distinguished_by = relationship("DistinguishingTrait",
+                        cascade="save-update, merge, delete, delete-orphan",
                         collection_class=attribute_mapped_collection('name'))
     _proxied = association_proxy("distinguished_by", "value",
                     creator=lambda name, value: DistinguishingTrait(name=name, value=value))
diff --git a/winchester/pipeline_manager.py b/winchester/pipeline_manager.py
index a0b355b..fbcd9ac 100644
--- a/winchester/pipeline_manager.py
+++ b/winchester/pipeline_manager.py
@@ -112,6 +112,8 @@ class PipelineManager(object):
         pipeline_config=ConfigItem(required=True,
                            help="Name of pipeline config file "
                                 "defining the handlers for each pipeline."),
+        purge_completed_streams=ConfigItem(help="Delete successfully proccessed "
+                                                "streams when finished?", default=True),
     )
     def __init__(self, config, db=None, pipeline_handlers=None,
                  pipeline_config=None, trigger_defs=None):
@@ -156,6 +158,7 @@ class PipelineManager(object):
         self.pipeline_worker_batch_size = config['pipeline_worker_batch_size']
         self.pipeline_worker_delay = config['pipeline_worker_delay']
         self.statistics_period = config['statistics_period']
+        self.purge_completed_streams = config['purge_completed_streams']
         self.streams_fired = 0
         self.streams_expired = 0
         self.streams_loaded = 0
@@ -209,13 +212,28 @@ class PipelineManager(object):
         return True
 
     def _complete_stream(self, stream):
-        self.db.set_stream_state(stream, StreamState.completed)
+        if self.purge_completed_streams:
+            self.db.purge_stream(stream)
+        else:
+            try:
+                self.db.set_stream_state(stream, StreamState.completed)
+            except LockError:
+                logger.error("Stream %s locked while trying to set 'complete' state! "
+                             "This should not happen." % stream.id)
 
     def _error_stream(self, stream):
-        self.db.set_stream_state(stream, StreamState.error)
+        try:
+            self.db.set_stream_state(stream, StreamState.error)
+        except LockError:
+            logger.error("Stream %s locked while trying to set 'error' state! "
+                         "This should not happen." % stream.id)
 
     def _expire_error_stream(self, stream):
-        self.db.set_stream_state(stream, StreamState.expire_error)
+        try:
+            self.db.set_stream_state(stream, StreamState.expire_error)
+        except LockError:
+            logger.error("Stream %s locked while trying to set 'expire_error' state! "
+                         "This should not happen." % stream.id)
 
     def fire_stream(self, stream):
         try:
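Note (not part of the patch): the pipeline_manager.py hunks above make stream completion either purge the stream row outright or mark it completed, selected by the new purge_completed_streams option. The following standalone sketch illustrates that dispatch; FakeDB and complete_stream are hypothetical stand-ins for winchester's DBInterface and PipelineManager._complete_stream, used only so the snippet runs on its own.

# Illustrative sketch only, assuming simplified stand-ins for the winchester DB layer.

class FakeDB(object):
    """Hypothetical stand-in for winchester.db.DBInterface."""

    def purge_stream(self, stream):
        # In the real patch, DBInterface.purge_stream deletes the Stream row
        # (and, via the new cascade on distinguished_by, its traits).
        print("purged %s" % stream)

    def set_stream_state(self, stream, state):
        # In the real patch this can raise LockError, which the manager logs.
        print("set %s -> %s" % (stream, state))


def complete_stream(db, stream, purge_completed_streams=True):
    # Mirrors the shape of PipelineManager._complete_stream after the patch:
    # purge the finished stream, or just record the 'completed' state.
    if purge_completed_streams:
        db.purge_stream(stream)
    else:
        db.set_stream_state(stream, "completed")


complete_stream(FakeDB(), "stream-1")                                 # purged
complete_stream(FakeDB(), "stream-2", purge_completed_streams=False)  # marked completed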