From 0420542cfd4879ada86c0b3deb5c273aeb4761b7 Mon Sep 17 00:00:00 2001
From: cindy oneill <cindy.o-neill@hp.com>
Date: Thu, 26 Mar 2015 09:23:12 -0600
Subject: [PATCH] Add or Delete Trigger Definitions to an existing
 TriggerManager and PipelineManager

Change-Id: I0dd97b140d1e175492de507eb1ebaf99d18c5eae
---
 tests/test_pipeline_manager.py | 18 ++++++------
 tests/test_trigger_manager.py  | 17 +++++++++++
 winchester/config.py           |  3 ++
 winchester/pipeline_manager.py | 24 ++++++++++-----
 winchester/trigger_manager.py  | 53 ++++++++++++++++++++++++----------
 5 files changed, 83 insertions(+), 32 deletions(-)

diff --git a/tests/test_pipeline_manager.py b/tests/test_pipeline_manager.py
index 3ba657a..33bb8f0 100644
--- a/tests/test_pipeline_manager.py
+++ b/tests/test_pipeline_manager.py
@@ -327,7 +327,7 @@ class TestPipelineManager(unittest.TestCase):
         pm.db.set_stream_state.return_value = stream
         trigger_def = mock.MagicMock(name='trigger_def')
         trigger_def.fire_pipeline = 'test_fire_pipeline'
-        pm.trigger_map = dict(test=trigger_def)
+        pm.trigger_manager.trigger_map = dict(test=trigger_def)
         pipeline_config = mock.MagicMock(name='pipeline_config')
         pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
         pm._error_stream = mock.MagicMock(name='_error_stream')
@@ -354,7 +354,7 @@ class TestPipelineManager(unittest.TestCase):
         pm.db.set_stream_state.side_effect = winch_db.LockError('locked!')
         trigger_def = mock.MagicMock(name='trigger_def')
         trigger_def.fire_pipeline = 'test_fire_pipeline'
-        pm.trigger_map = dict(test=trigger_def)
+        pm.trigger_manager.trigger_map = dict(test=trigger_def)
         pipeline_config = mock.MagicMock(name='pipeline_config')
         pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
         pm._error_stream = mock.MagicMock(name='_error_stream')
@@ -379,7 +379,7 @@ class TestPipelineManager(unittest.TestCase):
         pm.db.set_stream_state.return_value = stream
         trigger_def = mock.MagicMock(name='trigger_def')
         trigger_def.fire_pipeline = None
-        pm.trigger_map = dict(test=trigger_def)
+        pm.trigger_manager.trigger_map = dict(test=trigger_def)
         pm._error_stream = mock.MagicMock(name='_error_stream')
         pm._complete_stream = mock.MagicMock(name='_complete_stream')
         pm._run_pipeline = mock.MagicMock(name='_run_pipeline')
@@ -402,7 +402,7 @@ class TestPipelineManager(unittest.TestCase):
         pm.db.set_stream_state.return_value = stream
         trigger_def = mock.MagicMock(name='trigger_def')
         trigger_def.fire_pipeline = 'test_fire_pipeline'
-        pm.trigger_map = dict(test=trigger_def)
+        pm.trigger_manager.trigger_map = dict(test=trigger_def)
         pipeline_config = mock.MagicMock(name='pipeline_config')
         pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
         pm._error_stream = mock.MagicMock(name='_error_stream')
@@ -429,7 +429,7 @@ class TestPipelineManager(unittest.TestCase):
         pm.db.set_stream_state.return_value = stream
         trigger_def = mock.MagicMock(name='trigger_def')
         trigger_def.expire_pipeline = 'test_fire_pipeline'
-        pm.trigger_map = dict(test=trigger_def)
+        pm.trigger_manager.trigger_map = dict(test=trigger_def)
         pipeline_config = mock.MagicMock(name='pipeline_config')
         pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
         pm._error_stream = mock.MagicMock(name='_error_stream')
@@ -456,7 +456,7 @@ class TestPipelineManager(unittest.TestCase):
         pm.db.set_stream_state.side_effect = winch_db.LockError('locked!')
         trigger_def = mock.MagicMock(name='trigger_def')
         trigger_def.expire_pipeline = 'test_fire_pipeline'
-        pm.trigger_map = dict(test=trigger_def)
+        pm.trigger_manager.trigger_map = dict(test=trigger_def)
         pipeline_config = mock.MagicMock(name='pipeline_config')
         pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
         pm._expire_error_stream = mock.MagicMock(name='_expire_error_stream')
@@ -481,7 +481,7 @@ class TestPipelineManager(unittest.TestCase):
         pm.db.set_stream_state.return_value = stream
         trigger_def = mock.MagicMock(name='trigger_def')
         trigger_def.expire_pipeline = None
-        pm.trigger_map = dict(test=trigger_def)
+        pm.trigger_manager.trigger_map = dict(test=trigger_def)
         pm._expire_error_stream = mock.MagicMock(name='_expire_error_stream')
         pm._complete_stream = mock.MagicMock(name='_complete_stream')
         pm._run_pipeline = mock.MagicMock(name='_run_pipeline')
@@ -504,7 +504,7 @@ class TestPipelineManager(unittest.TestCase):
         pm.db.set_stream_state.return_value = stream
         trigger_def = mock.MagicMock(name='trigger_def')
         trigger_def.expire_pipeline = 'test_fire_pipeline'
-        pm.trigger_map = dict(test=trigger_def)
+        pm.trigger_manager.trigger_map = dict(test=trigger_def)
         pipeline_config = mock.MagicMock(name='pipeline_config')
         pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
         pm._expire_error_stream = mock.MagicMock(name='_expire_error_stream')
@@ -529,7 +529,7 @@ class TestPipelineManager(unittest.TestCase):
         stream = mock.MagicMock(name='stream')
         stream.name = "my_stream"
         tdef = mock.MagicMock(name='tdef')
-        pm.trigger_map['my_stream'] = tdef
+        pm.trigger_manager.trigger_map['my_stream'] = tdef
         pm.expire_stream = mock.MagicMock(name='expire_stream')
         pm.fire_stream = mock.MagicMock(name='fire_stream')
         pm.current_time = mock.MagicMock(name='current_time')
diff --git a/tests/test_trigger_manager.py b/tests/test_trigger_manager.py
index 61fb694..efb359e 100644
--- a/tests/test_trigger_manager.py
+++ b/tests/test_trigger_manager.py
@@ -298,3 +298,20 @@ class TestTriggerManager(unittest.TestCase):
         self.assertFalse(tm._add_or_create_stream.called)
         self.assertFalse(tm.db.get_stream_events.called)
         self.assertFalse(tm._ready_to_fire.called)
+
+    @mock.patch.object(trigger_manager.ConfigManager, 'wrap')
+    def test_add__del_trigger_definition(self, mock_config_wrap):
+        tm = trigger_manager.TriggerManager('test')
+        tm.db = mock.MagicMock(spec=tm.db)
+        td1 = dict(
+            name='test_trigger1',
+            expiration='$last + 1d',
+            fire_pipeline='test_pipeline',
+            fire_criteria=[dict(event_type='test.thing')],
+            match_criteria=[dict(event_type='test.*')])
+        tdlist = list()
+        tdlist.append(td1)
+        tm.add_trigger_definition(tdlist)
+        self.assertTrue('test_trigger1' in tm.trigger_map)
+        tm.delete_trigger_definition('test_trigger1')
+        self.assertFalse('test_trigger1' in tm.trigger_map)
diff --git a/winchester/config.py b/winchester/config.py
index d1a2815..88bcbf1 100644
--- a/winchester/config.py
+++ b/winchester/config.py
@@ -104,6 +104,9 @@ class ConfigManager(collections.Mapping):
             return self._defaults[key]
         raise KeyError(key)
 
+    def contains(self, key):
+        return key in self._configs
+
     def add_config_path(self, *args):
         for path in args:
             if path not in self.config_paths:
diff --git a/winchester/pipeline_manager.py b/winchester/pipeline_manager.py
index 5a779dd..d038824 100644
--- a/winchester/pipeline_manager.py
+++ b/winchester/pipeline_manager.py
@@ -82,6 +82,7 @@ class Pipeline(object):
 
     def handle_events(self, events, stream, debugger):
         self.env['stream_id'] = stream.id
+        self.env['stream_name'] = stream.name
         event_ids = set(e['message_id'] for e in events)
         try:
             for handler in self.handlers:
@@ -156,6 +157,7 @@ class PipelineManager(object):
                      % (self.proc_name, str(config)))
         config = ConfigManager.wrap(config, self.config_description())
         self.config = config
+        self.trigger_definitions = []
         config.check_config()
         config.add_config_path(*config['config_path'])
         if time_sync is None:
@@ -189,12 +191,12 @@ class PipelineManager(object):
         if trigger_defs is not None:
             self.trigger_definitions = trigger_defs
         else:
-            defs = config.load_file(config['trigger_definitions'])
-            logger.debug("Loaded trigger definitions %s" % str(defs))
-            self.trigger_definitions = [TriggerDefinition(conf, None) for conf
-                                        in defs]
-        self.trigger_map = dict(
-            (tdef.name, tdef) for tdef in self.trigger_definitions)
+            # trigger_definition config file is optional
+            if config.contains('trigger_definitions'):
+                defs = config.load_file(config['trigger_definitions'])
+                logger.debug("Loaded trigger definitions %s" % str(defs))
+                self.trigger_definitions = [
+                    TriggerDefinition(conf, None) for conf in defs]
 
         self.trigger_manager = TriggerManager(
             self.config, db=self.db,
@@ -292,8 +294,14 @@ class PipelineManager(object):
         return trigger_def.debugger if trigger_def is not None else \
             self.trigger_manager.debug_manager.get_debugger(None)
 
+    def add_trigger_definition(self, list_of_triggerdefs):
+        self.trigger_manager.add_trigger_definition(list_of_triggerdefs)
+
+    def delete_trigger_definition(self, trigger_def_name):
+        self.trigger_manager.delete_trigger_definition(trigger_def_name)
+
     def fire_stream(self, stream):
-        trigger_def = self.trigger_map.get(stream.name)
+        trigger_def = self.trigger_manager.trigger_map.get(stream.name)
         debugger = self.safe_get_debugger(trigger_def)
         try:
             stream = self.db.set_stream_state(stream, StreamState.firing)
@@ -331,7 +339,7 @@ class PipelineManager(object):
         return True
 
     def expire_stream(self, stream):
-        trigger_def = self.trigger_map.get(stream.name)
+        trigger_def = self.trigger_manager.trigger_map.get(stream.name)
         debugger = self.safe_get_debugger(trigger_def)
         try:
             stream = self.db.set_stream_state(stream, StreamState.expiring)
diff --git a/winchester/trigger_manager.py b/winchester/trigger_manager.py
index 1edd344..6ba0910 100644
--- a/winchester/trigger_manager.py
+++ b/winchester/trigger_manager.py
@@ -94,7 +94,7 @@ class TriggerManager(object):
                 help="Path(s) to find additional config files",
                 multiple=True, default='.'),
             distiller_config=ConfigItem(
-                required=True,
+                required=False,
                 help="Name of distiller config file "
                      "describing what to extract from the "
                      "notifications"),
@@ -118,7 +118,7 @@ class TriggerManager(object):
                 help="Database connection info.",
                 config_description=DBInterface.config_description()),
             trigger_definitions=ConfigItem(
-                required=True,
+                required=False,
                 help="Name of trigger definitions file "
                      "defining trigger conditions and what events to "
                      "process for each stream"),
@@ -129,6 +129,7 @@ class TriggerManager(object):
         config = ConfigManager.wrap(config, self.config_description())
         self.config = config
         self.debug_manager = debugging.DebugManager()
+        self.trigger_definitions = []
         config.check_config()
         config.add_config_path(*config['config_path'])
         if time_sync is None:
@@ -142,22 +143,29 @@ class TriggerManager(object):
         if stackdistiller is not None:
             self.distiller = stackdistiller
         else:
-            dist_config = config.load_file(config['distiller_config'])
-            plugmap = self._load_plugins(config['distiller_trait_plugins'],
-                                         distiller.DEFAULT_PLUGINMAP)
-            self.distiller = distiller.Distiller(
-                dist_config,
-                trait_plugin_map=plugmap,
-                catchall=config['catch_all_notifications'])
+            # distiller_config is optional
+            if config.contains('distiller_config'):
+                dist_config = config.load_file(config['distiller_config'])
+                plugmap = self._load_plugins(config['distiller_trait_plugins'],
+                                             distiller.DEFAULT_PLUGINMAP)
+                self.distiller = distiller.Distiller(
+                    dist_config,
+                    trait_plugin_map=plugmap,
+                    catchall=config['catch_all_notifications'])
         if trigger_defs is not None:
             self.trigger_definitions = trigger_defs
             for t in self.trigger_definitions:
                 t.set_debugger(self.debug_manager)
         else:
-            defs = config.load_file(config['trigger_definitions'])
-            self.trigger_definitions = [TriggerDefinition(conf,
-                                                          self.debug_manager)
-                                        for conf in defs]
+            # trigger_definition config file is optional
+            if config.contains('trigger_definitions'):
+                defs = config.load_file(config['trigger_definitions'])
+                self.trigger_definitions = [
+                    TriggerDefinition(conf, self.debug_manager)
+                    for conf in defs]
+        # trigger_map is used to quickly access existing trigger_defs
+        self.trigger_map = dict(
+            (tdef.name, tdef) for tdef in self.trigger_definitions)
         self.saved_events = 0
         self.received = 0
         self.last_status = self.current_time()
@@ -246,8 +254,23 @@ class TriggerManager(object):
         timestamp = trigger_def.get_fire_timestamp(self.current_time())
         self.db.stream_ready_to_fire(stream, timestamp)
         trigger_def.debugger.bump_counter("Ready to fire")
-        logger.debug("Stream %s ready to fire at %s" % (
-            stream.id, timestamp))
+        logger.debug("Stream %s ready to fire at %s" % (stream.id, timestamp))
+
+    def add_trigger_definition(self, list_of_triggerdefs, debugger=None):
+        if debugger is None:
+            debugger = self.debug_manager
+        for td in list_of_triggerdefs:
+            if (td['name'] in self.trigger_map) is False:
+                # Only add if name is unique
+                tdef = TriggerDefinition(td, debugger)
+                self.trigger_definitions.append(tdef)
+                self.trigger_map[td['name']] = tdef
+
+    def delete_trigger_definition(self, trigger_def_name):
+        if trigger_def_name in self.trigger_map:
+            self.trigger_definitions.remove(
+                self.trigger_map.get(trigger_def_name))
+            del self.trigger_map[trigger_def_name]
 
     def add_event(self, event):
         if self.save_event(event):