diff --git a/etc/triggers.yaml b/etc/triggers.yaml index f415bfb..77fd01d 100644 --- a/etc/triggers.yaml +++ b/etc/triggers.yaml @@ -17,7 +17,7 @@ # memory_mb: # numeric: "> 4096" - event_type: compute.instance.exists - map_distingushed_trait: + map_distingushed_by: timestamp: audit_period_beginning fire_criteria: - event_type: compute.instance.exists diff --git a/requirements.txt b/requirements.txt index c0e0ae7..f59e5d1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,5 @@ MySQL-python alembic>=0.4.1 enum34>=1.0 SQLAlchemy>=0.9.6 +python-dateutil +requests diff --git a/tests/test_pipeline_manager.py b/tests/test_pipeline_manager.py index 58071bf..3646ff9 100644 --- a/tests/test_pipeline_manager.py +++ b/tests/test_pipeline_manager.py @@ -15,6 +15,8 @@ class TestPipeline(unittest.TestCase): def setUp(self): super(TestPipeline, self).setUp() self.debugger = debugging.NoOpDebugger() + self.fake_stream = mock.MagicMock(name="fake_stream") + self.fake_stream.id = "stream-1234" def test_check_handler_config(self): @@ -82,7 +84,7 @@ class TestPipeline(unittest.TestCase): p.commit = mock.MagicMock(name='commit') p.rollback = mock.MagicMock(name='rollback') - ret = p.handle_events(test_events, self.debugger) + ret = p.handle_events(test_events, self.fake_stream, self.debugger) handler_class1.return_value.handle_events.assert_called_once_with(test_events, p.env) events1 = handler_class1.return_value.handle_events.return_value handler_class2.return_value.handle_events.assert_called_once_with(events1, p.env) @@ -116,7 +118,7 @@ class TestPipeline(unittest.TestCase): p.rollback = mock.MagicMock(name='rollback') with self.assertRaises(pipeline_manager.PipelineExecutionError): - p.handle_events(test_events, self.debugger) + p.handle_events(test_events, self.fake_stream, self.debugger) p.rollback.assert_called_once_with(self.debugger) self.assertFalse(p.commit.called) @@ -261,7 +263,7 @@ class TestPipelineManager(unittest.TestCase): pipeline = 
class TestTimeSyncNoEndpoint(unittest.TestCase):
    """TimeSync built from an empty config, i.e. with no endpoint set."""

    def setUp(self):
        super(TestTimeSyncNoEndpoint, self).setUp()
        self.time_sync = time_sync.TimeSync({})

    def test_should_update(self):
        now = datetime.datetime.utcnow()
        # Nothing has been recorded yet, so an update is always due.
        self.assertIsNone(self.time_sync.last_update)
        self.assertTrue(self.time_sync._should_update(now))

        short = now + datetime.timedelta(seconds=1)
        lng = now + datetime.timedelta(seconds=60)
        self.time_sync.last_update = now

        self.assertFalse(self.time_sync._should_update(short))
        self.assertTrue(self.time_sync._should_update(lng))

    def test_current_time(self):
        # Without an endpoint, current_time() is whatever _get_now() says.
        with mock.patch.object(self.time_sync, "_get_now") as w:
            w.return_value = "123"
            self.assertEqual(self.time_sync.current_time(), "123")

    def test_publish(self):
        # Without an endpoint, publish() returns before parsing anything.
        with mock.patch.object(time_sync.dateutil.parser, "parse") as p:
            self.time_sync.publish("foo")
            self.assertEqual(0, p.call_count)


class BlowUp(Exception):
    """Sentinel exception used to break out of loops and force failures."""


class TestTimeSyncEndpointPublisher(unittest.TestCase):
    """TimeSync with an endpoint, acting as the publishing side."""

    def setUp(self):
        super(TestTimeSyncEndpointPublisher, self).setUp()
        self.time_sync = time_sync.TimeSync(
            {"time_sync_endpoint": "example.com"}, publishes=True)

    def test_fetch_good(self):
        with mock.patch.object(time_sync.requests, "get") as r:
            response = mock.MagicMock()
            response.text = "now"
            r.return_value = response
            self.assertEqual("now", self.time_sync._fetch())

    def test_fetch_empty(self):
        # An empty body makes _fetch() sleep and retry; the patched sleep
        # raises so the otherwise endless loop terminates.
        with mock.patch.object(time_sync.time, "sleep") as t, \
                mock.patch.object(time_sync.requests, "get") as r:
            response = mock.MagicMock()
            response.text = ""
            r.return_value = response
            t.side_effect = BlowUp
            with self.assertRaises(BlowUp):
                self.time_sync._fetch()

    def test_fetch_None(self):
        # The literal text "None" is treated like an empty body.
        with mock.patch.object(time_sync.time, "sleep") as t, \
                mock.patch.object(time_sync.requests, "get") as r:
            response = mock.MagicMock()
            response.text = "None"
            r.return_value = response
            t.side_effect = BlowUp
            with self.assertRaises(BlowUp):
                self.time_sync._fetch()

    def test_current_time(self):
        # A publisher never polls: it returns its last published time
        # without even consulting _should_update().
        self.time_sync.last_tyme = "now"
        with mock.patch.object(self.time_sync, "_should_update") as u:
            self.assertEqual("now", self.time_sync.current_time())
            self.assertEqual(0, u.call_count)

    def test_publish(self):
        with mock.patch.object(time_sync.dateutil.parser, "parse") as p, \
                mock.patch.object(self.time_sync, "_should_update") as u, \
                mock.patch.object(time_sync.requests, "post") as r:
            p.return_value = "datetime object"
            u.return_value = True
            r.return_value = ""

            self.time_sync.publish("string datetime")

            r.assert_called_once_with("example.com/time",
                                      data="string datetime")

    def test_publish_fails(self):
        # A failing POST is logged, not propagated.
        with mock.patch.object(time_sync.dateutil.parser, "parse") as p, \
                mock.patch.object(self.time_sync, "_should_update") as u, \
                mock.patch.object(time_sync.requests, "post") as r, \
                mock.patch.object(time_sync.logger, "exception") as e:
            p.return_value = "datetime object"
            u.return_value = True
            r.side_effect = BlowUp

            self.time_sync.publish("string datetime")
            self.assertEqual(1, e.call_count)


class TestTimeSyncEndpointConsumer(unittest.TestCase):
    """TimeSync with an endpoint, acting as the consuming side."""

    def setUp(self):
        super(TestTimeSyncEndpointConsumer, self).setUp()
        self.time_sync = time_sync.TimeSync(
            {"time_sync_endpoint": "example.com"})

    def test_current_time(self):
        with mock.patch.object(self.time_sync, "_should_update") as u, \
                mock.patch.object(time_sync.dateutil.parser, "parse") as p, \
                mock.patch.object(self.time_sync, "_fetch") as r:
            u.return_value = True
            p.return_value = "datetime object"
            r.return_value = "string datetime"

            self.assertEqual(self.time_sync.current_time(),
                             "datetime object")

    def test_current_time_fails(self):
        # When the fetch fails, the previously known time is kept and the
        # failure is logged exactly once.
        self.time_sync.last_tyme = "no change"
        with mock.patch.object(self.time_sync, "_should_update") as u, \
                mock.patch.object(self.time_sync, "_fetch") as r, \
                mock.patch.object(time_sync.logger, "exception") as e:
            u.return_value = True
            r.side_effect = BlowUp

            self.assertEqual(self.time_sync.current_time(),
                             "no change")
            self.assertEqual(1, e.call_count)
class TestUsageHandler(unittest.TestCase):
    """Unit tests for pipeline_handler.UsageHandler."""

    def setUp(self):
        super(TestUsageHandler, self).setUp()
        self.handler = pipeline_handler.UsageHandler()

    def test_find_exists_happyday(self):
        start = datetime.datetime(2014, 12, 31, 0, 0, 0)
        end = start + datetime.timedelta(days=1)
        events = [{'event_type': 'event_1'},
                  {'event_type': 'event_2'},
                  {'event_type': 'compute.instance.exists',
                   'audit_period_beginning': start,
                   'audit_period_ending': end}]

        exists = self.handler._find_exists(events)
        self.assertEqual(exists, events[2])

    def test_find_exists_none(self):
        events = [{'event_type': 'event_1'},
                  {'event_type': 'event_2'}]

        with self.assertRaises(pipeline_handler.UsageException):
            self.handler._find_exists(events)

    def test_find_exists_midday(self):
        # Audit period does not start/end at midnight.
        start = datetime.datetime(2014, 12, 31, 1, 1, 1)
        end = datetime.datetime(2014, 12, 31, 1, 1, 2)
        events = [{'event_type': 'event_1'},
                  {'event_type': 'event_2'},
                  {'event_type': 'compute.instance.exists',
                   'audit_period_beginning': start,
                   'audit_period_ending': end}]

        with self.assertRaises(pipeline_handler.UsageException):
            self.handler._find_exists(events)

    def test_find_exists_long(self):
        # Audit period spans two days instead of one.
        start = datetime.datetime(2014, 12, 31, 0, 0, 0)
        end = start + datetime.timedelta(days=2)
        events = [{'event_type': 'event_1'},
                  {'event_type': 'event_2'},
                  {'event_type': 'compute.instance.exists',
                   'audit_period_beginning': start,
                   'audit_period_ending': end}]

        with self.assertRaises(pipeline_handler.UsageException):
            self.handler._find_exists(events)

    def test_find_exists_no_audit_periods(self):
        events = [{'event_type': 'event_1'},
                  {'event_type': 'event_2'},
                  {'event_type': 'compute.instance.exists'}]

        with self.assertRaises(pipeline_handler.UsageException):
            self.handler._find_exists(events)

    def test_extract_launched_at(self):
        with self.assertRaises(pipeline_handler.UsageException):
            self.handler._extract_launched_at({})
        self.assertEqual("foo", self.handler._extract_launched_at(
            {'launched_at': 'foo'}))

    def test_extract_interesting(self):
        interesting = ["a", "b", "c"]
        e1 = {'event_type': 'a'}
        e2 = {'event_type': 'b'}
        e3 = {'event_type': 'c'}
        e4 = {'event_type': 'd'}
        e5 = {'event_type': 'e'}
        self.assertEqual([e1, e2, e3],
                         self.handler._extract_interesting_events(
                             [e4, e1, e2, e3, e5], interesting))

    def test_verify_fields_no_match(self):
        # Fields missing from both records are skipped, not an error.
        exists = {'a': 1, 'b': 2, 'c': 3}
        launched = exists
        self.handler._verify_fields(exists, launched, ['d', 'e', 'f'])

    def test_verify_fields_happyday(self):
        exists = {'a': 1, 'b': 2, 'c': 3}
        launched = exists
        self.handler._verify_fields(exists, launched, ['a', 'b', 'c'])

    def test_verify_fields_mismatch(self):
        exists = {'a': 1, 'b': 2, 'c': 3}
        launched = {'a': 10, 'b': 20, 'c': 30}
        with self.assertRaises(pipeline_handler.UsageException):
            self.handler._verify_fields(exists, launched, ['a', 'b', 'c'])

    def test_confirm_delete_no_delete_events(self):
        # The raised exception lives on the context manager's `.exception`
        # attribute and must be inspected after the `with` block exits;
        # any statement after the raising call inside the block never runs.
        with self.assertRaises(pipeline_handler.UsageException) as e:
            self.handler._confirm_delete({'deleted_at': 'now',
                                          'state': 'active'}, [], [])
        self.assertEqual("U3", e.exception.code)

        deleted_at = datetime.datetime(2014, 12, 31, 1, 0, 0)
        launched_at = datetime.datetime(2014, 12, 31, 2, 0, 0)
        with self.assertRaises(pipeline_handler.UsageException) as e:
            self.handler._confirm_delete({'deleted_at': deleted_at,
                                          'launched_at': launched_at,
                                          'state': 'deleted'}, [], [])
        self.assertEqual("U4", e.exception.code)

        self.handler.audit_beginning = datetime.datetime(2014, 12, 30, 0, 0, 0)
        self.handler.audit_ending = datetime.datetime(2014, 12, 31, 0, 0, 0)
        deleted_at = datetime.datetime(2014, 12, 30, 2, 0, 0)
        launched_at = datetime.datetime(2014, 12, 30, 1, 0, 0)
        with self.assertRaises(pipeline_handler.UsageException) as e:
            self.handler._confirm_delete({'deleted_at': deleted_at,
                                          'launched_at': launched_at,
                                          'state': 'deleted'}, [], [])
        self.assertEqual("U5", e.exception.code)

        # Test the do-nothing scenario
        self.handler._confirm_delete({}, [], [])

    def test_confirm_delete_delete_events(self):
        with self.assertRaises(pipeline_handler.UsageException) as e:
            self.handler._confirm_delete({}, [{}], [])
        self.assertEqual("U6", e.exception.code)

        # The state must be 'deleted' here; otherwise the handler stops at
        # the earlier U3 check and the multiple-delete check (U7) is never
        # reached.
        with self.assertRaises(pipeline_handler.UsageException) as e:
            self.handler._confirm_delete({'deleted_at': 'now',
                                          'state': 'deleted'}, [{}, {}], [])
        self.assertEqual("U7", e.exception.code)

        with mock.patch.object(self.handler, "_verify_fields") as v:
            exists = {'deleted_at': 'now', 'state': 'deleted'}
            deleted = {'foo': 1}
            self.handler._confirm_delete(exists, [deleted], ['a'])
            v.assert_called_with(exists, deleted, ['a'])

    def test_confirm_launched_at(self):
        # Non-active instances are not checked at all.
        self.handler._confirm_launched_at({'state': 'deleted'}, [])

        self.handler.audit_beginning = datetime.datetime(2014, 12, 30, 0, 0, 0)
        self.handler.audit_ending = datetime.datetime(2014, 12, 31, 0, 0, 0)
        launched_at = datetime.datetime(2014, 12, 30, 1, 0, 0)
        with self.assertRaises(pipeline_handler.UsageException) as e:
            self.handler._confirm_launched_at({'state': 'active',
                                               'launched_at': launched_at},
                                              [{}])
        self.assertEqual("U8", e.exception.code)

    def test_handle_events_no_exists(self):
        env = {'stream_id': 'stream'}
        with mock.patch.object(self.handler, "_find_exists") as c:
            c.side_effect = pipeline_handler.UsageException("UX", "Error")
            events = self.handler.handle_events([], env)
            self.assertEqual(0, len(events))

    def test_handle_events_exists(self):
        env = {'stream_id': 123}
        with mock.patch.object(self.handler, "_find_exists") as ex:
            ex.return_value = {'timestamp': 'now', 'instance_id': 'inst'}
            with mock.patch.object(self.handler, "_do_checks"):
                events = self.handler.handle_events([], env)
                self.assertEqual(1, len(events))
                f = events[0]
                self.assertEqual("compute.instance.exists.verified",
                                 f['event_type'])
                self.assertEqual("now", f['timestamp'])
                self.assertEqual(123, f['stream_id'])
                self.assertEqual("inst", f['instance_id'])
                # The handler stringifies the (absent) error, hence "None".
                self.assertEqual("None", f['error'])
                self.assertIsNone(f['error_code'])

    def test_handle_events_bad(self):
        env = {'stream_id': 123}
        with mock.patch.object(self.handler, "_find_exists") as ex:
            ex.return_value = {'timestamp': 'now', 'instance_id': 'inst'}
            with mock.patch.object(self.handler, "_do_checks") as c:
                c.side_effect = pipeline_handler.UsageException("UX", "Error")
                events = self.handler.handle_events([], env)
                self.assertEqual(1, len(events))
                f = events[0]
                self.assertEqual("compute.instance.exists.failed",
                                 f['event_type'])
                self.assertEqual("now", f['timestamp'])
                self.assertEqual(123, f['stream_id'])
                self.assertEqual("inst", f['instance_id'])
                self.assertEqual("Error", f['error'])
                self.assertEqual("UX", f['error_code'])

    @mock.patch.object(pipeline_handler.UsageHandler, '_get_core_fields')
    @mock.patch.object(pipeline_handler.UsageHandler, '_extract_launched_at')
    @mock.patch.object(pipeline_handler.UsageHandler, '_find_events')
    @mock.patch.object(pipeline_handler.UsageHandler, '_confirm_launched_at')
    @mock.patch.object(pipeline_handler.UsageHandler, '_confirm_delete')
    def test_do_check(self, cd, cla, fe, ela, gcf):
        # Decorators apply bottom-up, so the mock arguments arrive in the
        # reverse of the order in which the patches are listed.
        fe.return_value = [1, 2, 3]
        self.handler._do_checks({}, [])

        gcf.assert_called_once_with()
        ela.assert_called_once_with({})
        fe.assert_called_once_with([])
        cla.assert_called_once_with({}, [])
        cd.assert_called_once_with({}, [], mock.ANY)
class UsageException(Exception):
    """Raised when a usage verification check fails.

    :param code: short identifier of the failed check (this module uses
                 "U0" through "U8").
    :param message: human-readable description of the failure.
    """

    def __init__(self, code, message):
        super(UsageException, self).__init__(message)
        self.code = code


class UsageHandler(PipelineHandlerBase):
    """Verify a stream's end-of-day compute.instance.exists event.

    The .exists event is cross-checked against the other events in the
    stream. A ``compute.instance.exists.verified`` or
    ``compute.instance.exists.failed`` event is appended to the event
    list to record the outcome.
    """

    def _find_exists(self, events):
        """Return the end-of-day .exists event from ``events``.

        As a side effect, ``self.audit_beginning`` and
        ``self.audit_ending`` are set from the matching event.

        :raises UsageException: code "U0" when no such event is present.
        """
        exists = None

        # We could have several .exists records, but only the
        # end-of-day .exists will have audit_period_* time of
        # 00:00:00 and be 24hrs apart.
        midnight = datetime.time(0, 0, 0)
        for event in events:
            apb = event.get('audit_period_beginning')
            ape = event.get('audit_period_ending')
            if (event['event_type'] == 'compute.instance.exists'
                    and apb and ape
                    and apb.time() == midnight
                    and ape.time() == midnight
                    and ape.date() == (apb.date()
                                       + datetime.timedelta(days=1))):
                exists = event
                self.audit_beginning = apb
                self.audit_ending = ape
                break

        if not exists:
            raise UsageException("U0", "No .exists record.")

        return exists

    def _extract_launched_at(self, exists):
        """Return the .exists event's launched_at value.

        :raises UsageException: code "U1" when it is missing or empty.
        """
        if not exists.get('launched_at'):
            raise UsageException("U1", ".exists has no launched_at value.")
        return exists['launched_at']

    def _extract_interesting_events(self, events, interesting):
        """Return the events whose event_type is in ``interesting``."""
        return [event for event in events
                if event['event_type'] in interesting]

    def _find_events(self, events):
        """Return the events for operations that (re)launch an instance."""
        interesting = ['compute.instance.rebuild.start',
                       'compute.instance.resize.prep.start',
                       'compute.instance.resize.revert.start',
                       'compute.instance.rescue.start',
                       'compute.instance.create.end',
                       'compute.instance.rebuild.end',
                       'compute.instance.resize.finish.end',
                       'compute.instance.resize.revert.end',
                       'compute.instance.rescue.end']

        # We could easily end up with no events in final_set if
        # there were no operations performed on an instance that day.
        # We'll still get a .exists for every active instance though.

        return self._extract_interesting_events(events, interesting)

    def _find_deleted_events(self, events):
        """Return the compute.instance.delete.end events."""
        interesting = ['compute.instance.delete.end']
        return self._extract_interesting_events(events, interesting)

    def _verify_fields(self, exists, launch, fields):
        """Check that ``exists`` and ``launch`` agree on each field.

        A field absent from both records is skipped. A field present in
        only one of them is compared against None, so it is reported as a
        conflict rather than escaping as a KeyError that handle_events()
        would not catch.

        :raises UsageException: code "U2" on the first conflicting field.
        """
        for field in fields:
            if field not in exists and field not in launch:
                continue
            expected = exists.get(field)
            actual = launch.get(field)
            if expected != actual:
                raise UsageException("U2",
                                     "Conflicting '%s' values ('%s' != '%s')"
                                     % (field, expected, actual))

    def _confirm_delete(self, exists, deleted, fields):
        """Check the .exists deletion data against the delete events.

        :param exists: the .exists event.
        :param deleted: list of compute.instance.delete.end events.
        :param fields: fields that must match between ``exists`` and the
                       single delete event.
        :raises UsageException: codes "U3" to "U7" (and "U2" through
                                _verify_fields).
        """
        deleted_at = exists.get('deleted_at')
        state = exists.get('state')

        if deleted_at and state != "deleted":
            raise UsageException("U3", ".exists state not 'deleted' but "
                                       "deleted_at is set.")

        if deleted_at and not deleted:
            # We've already confirmed it's in the "deleted" state.
            launched_at = exists.get('launched_at')
            # Guard against a missing launched_at: comparing with None
            # is a TypeError on Python 3 (and is False on Python 2).
            if launched_at is not None and deleted_at < launched_at:
                raise UsageException("U4",
                                     ".exists deleted_at < launched_at.")

            # Is the deleted_at within this audit period?
            if (deleted_at >= self.audit_beginning
                    and deleted_at <= self.audit_ending):
                raise UsageException("U5", ".exists deleted_at in audit "
                                     "period, but no matching .delete "
                                     "event found.")

        if not deleted_at and deleted:
            raise UsageException("U6", ".deleted events found but .exists "
                                       "has no deleted_at value.")

        if len(deleted) > 1:
            raise UsageException("U7", "Multiple .delete.end events")

        if deleted:
            self._verify_fields(exists, deleted[0], fields)

    def _confirm_launched_at(self, exists, events):
        """Check that an instance launched in this period has events.

        Only instances whose state is 'active' are checked.

        :raises UsageException: code "U8" when launched_at falls inside
                                the audit period and the stream holds a
                                single event.
        """
        if exists.get('state') != 'active':
            return

        # Does launched_at have a value within this audit period?
        # If so, we should have a related event. Otherwise, this
        # instance was created previously.
        launched_at = exists['launched_at']
        # A stream of exactly one event holds nothing but the .exists.
        if (launched_at >= self.audit_beginning
                and launched_at <= self.audit_ending and len(events) == 1):
            raise UsageException("U8", ".exists launched_at in audit "
                                 "period, but no related events found.")

        # TODO(sandy): Confirm the events we got set launched_at
        # properly.

    def _get_core_fields(self):
        """Broken out so derived classes can define their
        own trait list."""
        return ['launched_at', 'instance_type_id', 'tenant_id',
                'os_architecture', 'os_version', 'os_distro']

    def _do_checks(self, exists, events):
        """Run every verification check for ``exists``.

        :raises UsageException: from whichever check fails first.
        """
        core_fields = self._get_core_fields()
        delete_fields = ['launched_at', 'deleted_at']

        self._extract_launched_at(exists)
        deleted = self._find_deleted_events(events)
        for c in self._find_events(events):
            self._verify_fields(exists, c, core_fields)

        self._confirm_launched_at(exists, events)
        self._confirm_delete(exists, deleted, delete_fields)

    def handle_events(self, events, env):
        """Verify the stream's .exists event and record the outcome.

        :param events: list of event dicts; the result event is appended
                       to this same list.
        :param env: pipeline environment; must contain 'stream_id'.
        :returns: ``events``, with a verified/failed event appended when
                  an end-of-day .exists event was found.
        """
        self.env = env
        self.stream_id = env['stream_id']

        exists = None
        error = None
        try:
            exists = self._find_exists(events)
            self._do_checks(exists, events)
            event_type = "compute.instance.exists.verified"
        except UsageException as e:
            error = e
            event_type = "compute.instance.exists.failed"
            logger.warning("Stream %s UsageException: (%s) %s",
                           self.stream_id, e.code, e)
            if exists:
                logger.warning("Stream %s deleted_at: %s, launched_at: %s, "
                               "state: %s, APB: %s, APE: %s, #events: %s",
                               self.stream_id, exists.get("deleted_at"),
                               exists.get("launched_at"),
                               exists.get("state"),
                               exists.get("audit_period_beginning"),
                               exists.get("audit_period_ending"),
                               len(events))

        # NOTE(review): the flattened source does not show whether this
        # dump ran for every stream or only on failure -- confirm.
        if len(events) > 1:
            logger.warning("Events for Stream: %s", self.stream_id)
            for event in events:
                # .get() so that diagnostics can never raise themselves.
                logger.warning("^Event: %s - %s",
                               event.get('timestamp'),
                               event.get('event_type'))

        if exists:
            new_event = {'event_type': event_type,
                         'message_id': str(uuid.uuid4()),
                         'timestamp': exists.get('timestamp',
                                                 datetime.datetime.utcnow()),
                         'stream_id': int(self.stream_id),
                         'instance_id': exists.get('instance_id'),
                         # str(None) gives the literal "None" on success.
                         'error': str(error),
                         'error_code': error.code if error else None,
                         }
            events.append(new_event)
        else:
            logger.debug("No .exists record")
        return events

    def commit(self):
        """Nothing to commit; this handler keeps no external state."""
        pass

    def rollback(self):
        """Nothing to roll back; this handler keeps no external state."""
        pass
class TimeSync(object):
    """Supply the current time, optionally following a shared endpoint.

    With no ``time_sync_endpoint`` configured, current_time() is simply
    the local UTC clock. With an endpoint, a publishing instance POSTs
    the timestamps it is given, and a non-publishing instance GETs the
    published time and reports that instead.
    """

    # Minimum gap, in seconds, between two refreshes or publications.
    update_interval = 20

    def __init__(self, config=None, publishes=False):
        """
        :param config: mapping that may hold 'time_sync_endpoint'; None
                       is treated as an empty mapping.
        :param publishes: True if this instance publishes time rather
                          than consuming it.
        """
        # A None default avoids sharing one mutable dict between calls.
        if config is None:
            config = {}
        url = config.get('time_sync_endpoint')
        self.endpoint = None
        if url:
            self.endpoint = "%s/time" % url
        logger.debug("Time sync endpoint=%s", self.endpoint)
        self.last_update = None
        self.last_tyme = self._get_now()
        self.publishes = publishes

    def _get_now(self):
        # Broken out for testing
        return datetime.datetime.utcnow()

    def _should_update(self, now):
        """Return True when ``now`` is far enough from the last update.

        total_seconds() is used because timedelta.seconds ignores the
        days component: a gap of one day and five seconds would read as
        five seconds. The absolute value lets a large jump backwards in
        time trigger an update as well.
        """
        if not self.last_update:
            return True
        elapsed = (now - self.last_update).total_seconds()
        return abs(elapsed) > self.update_interval

    def _fetch(self):
        """Poll the endpoint once a second until a time is published.

        NOTE(review): this loops forever while the endpoint returns an
        empty body or "None", and the request has no timeout.
        """
        while True:
            tyme = requests.get(self.endpoint).text
            if tyme and tyme != "None":
                return tyme
            logger.debug("No time published yet. Waiting ...")
            time.sleep(1)

    def current_time(self):
        """Return the time this process should treat as "now".

        Without an endpoint this is the local UTC time. Otherwise it is
        the last known synchronized time, which a non-publishing instance
        refreshes from the endpoint when an update is due.
        """
        now = self._get_now()
        if not self.endpoint:
            return now

        if not self.publishes and self._should_update(now):
            try:
                tyme = self._fetch()
                logger.debug("Requested time, got '%s'", tyme)
                self.last_tyme = dateutil.parser.parse(tyme)
            except Exception as e:
                # Best effort: keep the previous time and retry later.
                logger.exception("Could not get time: %s", e)
            # Recorded even on failure, which throttles the retries.
            self.last_update = now

        return self.last_tyme

    def publish(self, tyme):
        """Record ``tyme`` and POST it to the endpoint when one is due.

        :param tyme: timestamp string parseable by dateutil.
        """
        if not self.endpoint:
            return

        daittyme = dateutil.parser.parse(tyme)
        self.last_tyme = daittyme
        if self._should_update(daittyme):
            self.last_update = daittyme
            try:
                requests.post(self.endpoint, data=tyme)
                logger.debug("Published time: %s", tyme)
            except Exception as e:
                # Best effort: a failed publish must not stop processing.
                logger.exception("Could not publish time: %s", e)
See stackdistiller and " "simport docs for more info", default=dict()), + time_sync_endpoint=ConfigItem( + help="URL of time sync service for use with" + " replying old events.", + default=None), catch_all_notifications=ConfigItem( help="Store basic info for all notifications," " even if not listed in distiller config", @@ -94,12 +98,16 @@ class TriggerManager(object): "process for each stream"), ) - def __init__(self, config, db=None, stackdistiller=None, trigger_defs=None): + def __init__(self, config, db=None, stackdistiller=None, trigger_defs=None, + time_sync=None): config = ConfigManager.wrap(config, self.config_description()) self.config = config self.debug_manager = debugging.DebugManager() config.check_config() config.add_config_path(*config['config_path']) + if time_sync is None: + time_sync = ts.TimeSync() + self.time_sync = time_sync if db is not None: self.db = db @@ -147,7 +155,7 @@ class TriggerManager(object): def current_time(self): # here so it's easily overridden. - return datetime.datetime.utcnow() + return self.time_sync.current_time() def save_event(self, event): traits = event.copy() diff --git a/winchester/worker.py b/winchester/worker.py index 499f362..6a1b01e 100644 --- a/winchester/worker.py +++ b/winchester/worker.py @@ -9,6 +9,7 @@ logger = logging.getLogger(__name__) from winchester.config import ConfigManager from winchester.pipeline_manager import PipelineManager +from winchester import time_sync def main(): @@ -27,7 +28,8 @@ def main(): level = conf['log_level'] level = getattr(logging, level.upper()) logging.getLogger('winchester').setLevel(level) - pipe = PipelineManager(conf) + timesync = time_sync.TimeSync(conf) + pipe = PipelineManager(conf, time_sync=timesync) if args.daemon: print "Backgrounding for daemon mode." 
with daemon.DaemonContext(): diff --git a/winchester/yagi_handler.py b/winchester/yagi_handler.py index 1f748d6..9ae1259 100644 --- a/winchester/yagi_handler.py +++ b/winchester/yagi_handler.py @@ -5,6 +5,7 @@ import yagi.config from winchester.trigger_manager import TriggerManager from winchester.config import ConfigManager +from winchester import time_sync logger = logging.getLogger(__name__) @@ -22,10 +23,13 @@ class WinchesterHandler(BaseHandler): super(WinchesterHandler, self).__init__(app=app, queue_name=queue_name) conf_file = self.config_get("config_file") config = ConfigManager.load_config_file(conf_file) - self.trigger_manager = TriggerManager(config) + self.time_sync = time_sync.TimeSync(config, publishes=True) + self.trigger_manager = TriggerManager(config, time_sync=self.time_sync) def handle_messages(self, messages, env): for notification in self.iterate_payloads(messages, env): + tyme = notification['timestamp'] + self.time_sync.publish(tyme) self.trigger_manager.add_notification(notification) def on_idle(self, num_messages, queue_name):