Save newly generated events from pipeline

Save newly created events from pipeline run if pipeline commits.
Refactor trigger manager api wart, move save_event call into add_event
    to make add_event and add_notification symmetric.
This commit is contained in:
Monsyne Dragon 2014-09-08 19:57:24 +00:00
parent 0c619c133d
commit ca0d09f7bc
4 changed files with 45 additions and 19 deletions

@ -1,3 +1,15 @@
commit 0c619c133d3c248d62a2c5f6441d4fae0bf7042a
Author: Monsyne Dragon <mdragon@rackspace.com>
Date: Sun Sep 7 04:07:20 2014 +0000
Add database admin command.
Add admin command for db schema upgrade/downgrade/etc.
Move alembic migrations so above can find them when installed
as a package.
Fix up packaging to use setup.cfg and pbr.
Flesh out README.
commit a6f84d16036e143b1b605c50b90055a623e3235b
Author: Monsyne Dragon <mdragon@rackspace.com>
Date: Thu Sep 4 20:43:41 2014 +0000

@ -60,7 +60,6 @@ class TestTriggerManager(unittest.TestCase):
cond.clear.assert_called_once_with()
cond.validate.assert_called_once_with()
tm.distiller.to_event.assert_called_once_with('test notification here', cond)
tm.save_event.assert_called_once_with(test_event)
self.assertEquals(res, test_event)
@mock.patch('winchester.trigger_manager.EventCondenser', autospec=True)
@ -187,13 +186,15 @@ class TestTriggerManager(unittest.TestCase):
m_def = tm.trigger_definitions[2]
tm.trigger_definitions[0].match.return_value = None
tm.trigger_definitions[1].match.return_value = None
event = "test event"
event = mock.MagicMock(name='event', spec=dict)
tm.save_event = mock.MagicMock()
tm._add_or_create_stream = mock.MagicMock()
tm._add_or_create_stream.return_value.fire_timestamp = None
tm._ready_to_fire = mock.MagicMock()
m_def.should_fire.return_value = True
tm.add_event(event)
tm.save_event.assert_called_once_with(event)
for td in tm.trigger_definitions:
td.match.assert_called_once_with(event)
m_def.get_distinguishing_traits.assert_called_once_with(event, m_def.match.return_value)
@ -211,13 +212,15 @@ class TestTriggerManager(unittest.TestCase):
m_def = tm.trigger_definitions[2]
tm.trigger_definitions[0].match.return_value = None
tm.trigger_definitions[1].match.return_value = None
event = "test event"
event = mock.MagicMock(name='event', spec=dict)
tm.save_event = mock.MagicMock()
tm._add_or_create_stream = mock.MagicMock()
tm._add_or_create_stream.return_value.fire_timestamp = "Fire!"
tm._ready_to_fire = mock.MagicMock()
m_def.should_fire.return_value = True
tm.add_event(event)
tm.save_event.assert_called_once_with(event)
for td in tm.trigger_definitions:
td.match.assert_called_once_with(event)
m_def.get_distinguishing_traits.assert_called_once_with(event, m_def.match.return_value)
@ -235,12 +238,14 @@ class TestTriggerManager(unittest.TestCase):
tm.trigger_definitions[0].match.return_value = None
tm.trigger_definitions[1].match.return_value = None
tm.trigger_definitions[2].match.return_value = None
event = "test event"
event = mock.MagicMock(name='event', spec=dict)
tm.save_event = mock.MagicMock()
tm._add_or_create_stream = mock.MagicMock()
tm._add_or_create_stream.return_value.fire_timestamp = "Fire!"
tm._ready_to_fire = mock.MagicMock()
tm.add_event(event)
tm.save_event.assert_called_once_with(event)
for td in tm.trigger_definitions:
td.match.assert_called_once_with(event)
for td in tm.trigger_definitions:

@ -9,6 +9,8 @@ from winchester.db import DBInterface, DuplicateError, LockError
from winchester.config import ConfigManager, ConfigSection, ConfigItem
from winchester.definition import TriggerDefinition
from winchester.models import StreamState
from winchester.trigger_manager import TriggerManager
logger = logging.getLogger(__name__)
@ -148,6 +150,9 @@ class PipelineManager(object):
self.trigger_definitions = [TriggerDefinition(conf) for conf in defs]
self.trigger_map = dict((tdef.name, tdef) for tdef in self.trigger_definitions)
self.trigger_manager = TriggerManager(self.config, db=self.db,
trigger_defs=self.trigger_definitions)
self.pipeline_worker_batch_size = config['pipeline_worker_batch_size']
self.pipeline_worker_delay = config['pipeline_worker_delay']
self.statistics_period = config['statistics_period']
@ -187,7 +192,8 @@ class PipelineManager(object):
self.last_status = self.current_time()
def add_new_events(self, events):
pass
for event in events:
self.trigger_manager.add_event(event)
def _run_pipeline(self, stream, trigger_def, pipeline_name, pipeline_config):
events = self.db.get_stream_events(stream)

@ -142,9 +142,13 @@ class TriggerManager(object):
def save_event(self, event):
traits = event.copy()
message_id = traits.pop('message_id')
timestamp = traits.pop('timestamp')
event_type = traits.pop('event_type')
try:
message_id = traits.pop('message_id')
timestamp = traits.pop('timestamp')
event_type = traits.pop('event_type')
except KeyError as e:
logger.warning("Received invalid event: %s" % e)
return False
try:
self.db.create_event(message_id, event_type,
timestamp, traits)
@ -160,9 +164,7 @@ class TriggerManager(object):
self.received += 1
if self.distiller.to_event(notification_body, cond):
if cond.validate():
event = cond.get_event()
if self.save_event(event):
return event
return cond.get_event()
else:
logger.warning("Received invalid event")
else:
@ -196,14 +198,15 @@ class TriggerManager(object):
stream.id, timestamp))
def add_event(self, event):
for trigger_def in self.trigger_definitions:
matched_criteria = trigger_def.match(event)
if matched_criteria:
dist_traits = trigger_def.get_distinguishing_traits(event, matched_criteria)
stream = self._add_or_create_stream(trigger_def, event, dist_traits)
if stream.fire_timestamp is None:
if trigger_def.should_fire(self.db.get_stream_events(stream)):
self._ready_to_fire(stream, trigger_def)
if self.save_event(event):
for trigger_def in self.trigger_definitions:
matched_criteria = trigger_def.match(event)
if matched_criteria:
dist_traits = trigger_def.get_distinguishing_traits(event, matched_criteria)
stream = self._add_or_create_stream(trigger_def, event, dist_traits)
if stream.fire_timestamp is None:
if trigger_def.should_fire(self.db.get_stream_events(stream)):
self._ready_to_fire(stream, trigger_def)
if (self.current_time() - self.last_status).seconds > self.statistics_period:
self._log_statistics()