From c5f0eca828bb0bdf1686bec3d4c1b59989cb1809 Mon Sep 17 00:00:00 2001
From: Sandy Walsh
Date: Mon, 19 Jan 2015 16:06:12 -0800
Subject: [PATCH] Generate .verified per .exists

Slight change on the old way, which was a single .verified on the one
EOD .exists. Now it generates a .verified for every .exists seen.

Change-Id: I7bdced936f203cbec4bf8b7ee2b956c388e66ff1
---
 tests/test_usage_handler.py    | 355 +++++++++++++++++++--------------
 winchester/pipeline_handler.py | 222 +++++++++------------
 2 files changed, 303 insertions(+), 274 deletions(-)

diff --git a/tests/test_usage_handler.py b/tests/test_usage_handler.py
index 9058894..b3fc02d 100644
--- a/tests/test_usage_handler.py
+++ b/tests/test_usage_handler.py
@@ -11,56 +11,86 @@ class TestUsageHandler(unittest.TestCase):
         super(TestUsageHandler, self).setUp()
         self.handler = pipeline_handler.UsageHandler()

-    def test_find_exists_happyday(self):
+    def test_get_audit_period(self):
+        event = {}
+        apb, ape = self.handler._get_audit_period(event)
+        self.assertIsNone(apb)
+        self.assertIsNone(ape)
+
+        event = {'audit_period_beginning': "beginning"}
+        apb, ape = self.handler._get_audit_period(event)
+        self.assertEqual(apb, "beginning")
+        self.assertIsNone(ape)
+
+        event = {'audit_period_ending': "ending"}
+        apb, ape = self.handler._get_audit_period(event)
+        self.assertIsNone(apb)
+        self.assertEqual(ape, "ending")
+
+    def test_is_exists(self):
+        event = {'event_type': None}
+        self.assertFalse(self.handler._is_exists(event))
+
+        event = {'event_type': 'foo'}
+        self.assertFalse(self.handler._is_exists(event))
+
+        event = {'event_type': 'compute.instance.exists'}
+        self.assertTrue(self.handler._is_exists(event))
+
+    def test_is_non_EOD_exists(self):
         start = datetime.datetime(2014, 12, 31, 0, 0, 0)
         end = start + datetime.timedelta(days=1)
-        events = [{'event_type': 'event_1'},
-                  {'event_type': 'event_2'},
-                  {'event_type': 'compute.instance.exists',
-                   'audit_period_beginning': start,
-                   'audit_period_ending': end}]
+        eod = {'event_type': 'compute.instance.exists',
+               'audit_period_beginning': start,
+               'audit_period_ending': end}
+        self.assertFalse(self.handler._is_non_EOD_exists(eod))

-        exists = self.handler._find_exists(events)
-        self.assertEquals(exists, events[2])
+        start = datetime.datetime(2014, 12, 31, 1, 0, 0)
+        end = start + datetime.timedelta(hours=1)
+        eod = {'event_type': 'compute.instance.exists',
+               'audit_period_beginning': start,
+               'audit_period_ending': end}
+        self.assertTrue(self.handler._is_non_EOD_exists(eod))

-    def test_find_exists_none(self):
-        events = [{'event_type': 'event_1'},
-                  {'event_type': 'event_2'}]
+        eod = {'event_type': 'compute.instance.foo',
+               'audit_period_beginning': start,
+               'audit_period_ending': end}
+        self.assertFalse(self.handler._is_non_EOD_exists(eod))

-        with self.assertRaises(pipeline_handler.UsageException):
-            self.handler._find_exists(events)
+        eod = {'event_type': 'compute.instance.foo',
+               'audit_period_ending': end}
+        self.assertFalse(self.handler._is_non_EOD_exists(eod))

-    def test_find_exists_midday(self):
-        start = datetime.datetime(2014, 12, 31, 1, 1, 1)
-        end = datetime.datetime(2014, 12, 31, 1, 1, 2)
-        events = [{'event_type': 'event_1'},
-                  {'event_type': 'event_2'},
-                  {'event_type': 'compute.instance.exists',
-                   'audit_period_beginning': start,
-                   'audit_period_ending': end}]
+    def test_is_EOD_exists(self):
+        start = datetime.datetime(2014, 12, 31, 1, 0, 0)
+        end = start + datetime.timedelta(days=1)
+        eod = {'event_type': 'compute.instance.exists',
+               'audit_period_beginning': start,
+               'audit_period_ending': end}
+        self.assertFalse(self.handler._is_EOD_exists(eod))

-        with self.assertRaises(pipeline_handler.UsageException):
-            self.handler._find_exists(events)
-
-    def test_find_exists_long(self):
         start = datetime.datetime(2014, 12, 31, 0, 0, 0)
-        end = start + datetime.timedelta(days=2)
-        events = [{'event_type': 'event_1'},
-                  {'event_type': 'event_2'},
-                  {'event_type': 'compute.instance.exists',
-                   'audit_period_beginning': start,
-                   'audit_period_ending': end}]
+        end = start + datetime.timedelta(hours=1)
+        eod = {'event_type': 'compute.instance.exists',
+               'audit_period_beginning': start,
+               'audit_period_ending': end}
+        self.assertFalse(self.handler._is_EOD_exists(eod))

-        with self.assertRaises(pipeline_handler.UsageException):
-            self.handler._find_exists(events)
+        start = datetime.datetime(2014, 12, 31, 0, 0, 0)
+        end = start + datetime.timedelta(days=1)
+        eod = {'event_type': 'compute.instance.exists',
+               'audit_period_beginning': start,
+               'audit_period_ending': end}
+        self.assertTrue(self.handler._is_EOD_exists(eod))

-    def test_find_exists_no_audit_periods(self):
-        events = [{'event_type': 'event_1'},
-                  {'event_type': 'event_2'},
-                  {'event_type': 'compute.instance.exists'}]
+        eod = {'event_type': 'compute.instance.foo',
+               'audit_period_beginning': start,
+               'audit_period_ending': end}
+        self.assertFalse(self.handler._is_EOD_exists(eod))

-        with self.assertRaises(pipeline_handler.UsageException):
-            self.handler._find_exists(events)
+        eod = {'event_type': 'compute.instance.foo',
+               'audit_period_ending': end}
+        self.assertFalse(self.handler._is_EOD_exists(eod))

     def test_extract_launched_at(self):
         with self.assertRaises(pipeline_handler.UsageException):
@@ -109,20 +139,22 @@ class TestUsageHandler(unittest.TestCase):
                                           'state': 'deleted'}, [], [])
         self.assertEquals("U4", e.code)

-        self.handler.audit_beginning = datetime.datetime(2014, 12, 30, 0, 0, 0)
-        self.handler.audit_ending = datetime.datetime(2014, 12, 31, 0, 0, 0)
+        apb = datetime.datetime(2014, 12, 30, 0, 0, 0)
+        ape = datetime.datetime(2014, 12, 31, 0, 0, 0)
         deleted_at = datetime.datetime(2014, 12, 30, 2, 0, 0)
         launched_at = datetime.datetime(2014, 12, 30, 1, 0, 0)
         with self.assertRaises(pipeline_handler.UsageException) as e:
             self.handler._confirm_delete({'deleted_at': deleted_at,
                                           'launched_at': launched_at,
+                                          'audit_period_beginning': apb,
+                                          'audit_period_ending': ape,
                                           'state': 'deleted'}, [], [])
         self.assertEquals("U5", e.code)

         # Test the do-nothing scenario
         self.handler._confirm_delete({}, [], [])

-    def test_confirm_delete_delete_events(self):
+    def test_confirm_delete_with_delete_events(self):
         with self.assertRaises(pipeline_handler.UsageException) as e:
             self.handler._confirm_delete({}, [{}], [])
         self.assertEquals("U6", e.code)
@@ -138,122 +170,143 @@ class TestUsageHandler(unittest.TestCase):
             v.assert_called_with(exists, deleted, ['a'])

     def test_confirm_launched_at(self):
-        self.handler._confirm_launched_at({'state': 'deleted'}, [])
+        self.handler._confirm_launched_at([], {'state': 'deleted'})

-        self.handler.audit_beginning = datetime.datetime(2014, 12, 30, 0, 0, 0)
-        self.handler.audit_ending = datetime.datetime(2014, 12, 31, 0, 0, 0)
+        apb = datetime.datetime(2014, 12, 30, 0, 0, 0)
+        ape = datetime.datetime(2014, 12, 31, 0, 0, 0)
         launched_at = datetime.datetime(2014, 12, 30, 1, 0, 0)
         with self.assertRaises(pipeline_handler.UsageException) as e:
-            self.handler._confirm_launched_at({'state': 'active',
-                                               'launched_at': launched_at},
-                                              [{}])
+            self.handler._confirm_launched_at([],
+                                              {'state': 'active',
+                                               'audit_period_beginning': apb,
+                                               'audit_period_ending': ape,
+                                               'launched_at': launched_at})
         self.assertEquals("U8", e.code)

-    def test_handle_events_no_exists(self):
-        env = {'stream_id': 'stream'}
-        with mock.patch.object(self.handler, "_find_exists") as c:
+    def test_process_block_exists(self):
+        exists = {'event_type':'compute.instance.exists', 'timestamp':'now',
+                  'instance_id':'inst'}
+        self.handler.stream_id = 123
+        with mock.patch.object(self.handler, "_do_checks") as c:
+            events = self.handler._process_block([], exists)
+            self.assertEquals(1, len(events))
+            f = events[0]
+            self.assertEquals("compute.instance.exists.verified",
+                              f['event_type'])
+            self.assertEquals("now", f['timestamp'])
+            self.assertEquals(123, f['stream_id'])
+            self.assertEquals("inst", f['instance_id'])
+            self.assertEquals("None", f['error'])
+            self.assertIsNone(f['error_code'])
+
+    def test_process_block_bad(self):
+        exists = {'event_type': 'compute.instance.exists', 'timestamp':'now',
+                  'instance_id':'inst'}
+        self.handler.stream_id = 123
+        with mock.patch.object(self.handler, "_do_checks") as c:
             c.side_effect = pipeline_handler.UsageException("UX", "Error")
-            events = self.handler.handle_events([], env)
-        self.assertEquals(0, len(events))
+            events = self.handler._process_block([], exists)
+            self.assertEquals(1, len(events))
+            f = events[0]
+            self.assertEquals("compute.instance.exists.failed",
+                              f['event_type'])
+            self.assertEquals("now", f['timestamp'])
+            self.assertEquals(123, f['stream_id'])
+            self.assertEquals("inst", f['instance_id'])
+            self.assertEquals("Error", f['error'])
+            self.assertEquals("UX", f['error_code'])

-    def test_handle_events_exists(self):
-        env = {'stream_id': 123}
-        with mock.patch.object(self.handler, "_find_exists") as ex:
-            ex.return_value = {'timestamp':'now', 'instance_id':'inst'}
-            with mock.patch.object(self.handler, "_do_checks") as c:
-                events = self.handler.handle_events([], env)
-                self.assertEquals(1, len(events))
-                f = events[0]
-                self.assertEquals("compute.instance.exists.verified",
-                                  f['event_type'])
-                self.assertEquals("now", f['timestamp'])
-                self.assertEquals(123, f['stream_id'])
-                self.assertEquals("inst", f['instance_id'])
-                self.assertEquals("None", f['error'])
-                self.assertIsNone(f['error_code'])
+    def test_process_block_warnings(self):
+        self.handler.warnings = ['one', 'two']
+        exists = {'event_type': 'compute.instance.exists',
+                  'timestamp':'now', 'instance_id':'inst'}
+        self.handler.stream_id = 123
+        with mock.patch.object(self.handler, "_do_checks") as c:
+            events = self.handler._process_block([], exists)
+            self.assertEquals(2, len(events))
+            self.assertEquals("compute.instance.exists.warnings",
+                              events[0]['event_type'])
+            self.assertEquals("compute.instance.exists.verified",
+                              events[1]['event_type'])

-    def test_handle_events_bad(self):
-        env = {'stream_id': 123}
-        with mock.patch.object(self.handler, "_find_exists") as ex:
-            ex.return_value = {'timestamp':'now', 'instance_id':'inst'}
-            with mock.patch.object(self.handler, "_do_checks") as c:
-                c.side_effect = pipeline_handler.UsageException("UX", "Error")
-                events = self.handler.handle_events([], env)
-                self.assertEquals(1, len(events))
-                f = events[0]
-                self.assertEquals("compute.instance.exists.failed",
-                                  f['event_type'])
-                self.assertEquals("now", f['timestamp'])
-                self.assertEquals(123, f['stream_id'])
-                self.assertEquals("inst", f['instance_id'])
-                self.assertEquals("Error", f['error'])
-                self.assertEquals("UX", f['error_code'])
-
-    def test_handle_events_warnings(self):
-        def fake_find_exists(events):
-            self.handler.warnings = ['one', 'two']
-            return {'timestamp':'now', 'instance_id':'inst'}
-
-        env = {'stream_id': 123}
-        with mock.patch.object(self.handler, "_find_exists") as ex:
-            ex.side_effect = fake_find_exists
-            with mock.patch.object(self.handler, "_do_checks") as c:
-                events = self.handler.handle_events([], env)
-                self.assertEquals(2, len(events))
-                self.assertEquals("compute.instance.exists.warnings",
-                                  events[0]['event_type'])
-                self.assertEquals("compute.instance.exists.verified",
-                                  events[1]['event_type'])
-
-    def test_confirm_non_EOD_exists_no_events(self):
-        events = []
-        self.assertEquals(len(self.handler.warnings), 0)
-        self.handler._confirm_non_EOD_exists(events)
-        self.assertEquals(len(self.handler.warnings), 0)
-
-    def test_confirm_non_EOD_exists_no_interesting(self):
-        events = [{'event_type': 'foo'}]
-        self.assertEquals(len(self.handler.warnings), 0)
-        self.handler._confirm_non_EOD_exists(events)
-        self.assertEquals(len(self.handler.warnings), 0)
-
-    def test_confirm_non_EOD_exists_no_exists(self):
-        events = [{'event_type': 'compute.instance.rebuild.start',
-                   'message_id': 'xxx'}]
-        self.assertEquals(len(self.handler.warnings), 0)
-        self.handler._confirm_non_EOD_exists(events)
-        self.assertEquals(len(self.handler.warnings), 1)
-        x = "Interesting"
-        self.assertEquals(self.handler.warnings[0][:len(x)], x)
-
-    def test_confirm_non_EOD_exists_good(self):
-        events = [{'event_type': 'compute.instance.rebuild.start'},
-                  {'event_type': 'compute.instance.exists'}]
-        self.assertEquals(len(self.handler.warnings), 0)
-        with mock.patch.object(self.handler, "_is_non_EOD_exists") as eod:
-            with mock.patch.object(self.handler, "_verify_fields") as vf:
-                eod.return_value = True
-                self.handler._confirm_non_EOD_exists(events)
-        self.assertEquals(len(self.handler.warnings), 0)
-
-    def test_confirm_non_EOD_exists_not_EOD(self):
-        events = [{'event_type': 'compute.instance.exists',
-                   'message_id': 'blah'}]
-        self.assertEquals(len(self.handler.warnings), 0)
-        with mock.patch.object(self.handler, "_is_non_EOD_exists") as eod:
-            with mock.patch.object(self.handler, "_verify_fields") as vf:
-                eod.return_value = True
-                self.handler._confirm_non_EOD_exists(events)
-        self.assertEquals(len(self.handler.warnings), 1)
-        x = "Non-EOD"
-        self.assertEquals(self.handler.warnings[0][:len(x)], x)
-
-    @mock.patch.object(pipeline_handler.UsageHandler, '_confirm_non_EOD_exists')
-    @mock.patch.object(pipeline_handler.UsageHandler, '_get_core_fields')
-    @mock.patch.object(pipeline_handler.UsageHandler, '_extract_launched_at')
-    @mock.patch.object(pipeline_handler.UsageHandler, '_find_events')
     @mock.patch.object(pipeline_handler.UsageHandler, '_confirm_launched_at')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_get_core_fields')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_verify_fields')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_is_non_EOD_exists')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_find_deleted_events')
     @mock.patch.object(pipeline_handler.UsageHandler, '_confirm_delete')
-    def test_do_check(self, cd, cla, fe, ela, gcf, cnee):
-        fe.return_value = [1,2,3]
-        self.handler._do_checks({}, [])
+    def test_do_check_no_interesting_EOD_exists(self, cd, fde, inee, vf,
+                                                gcf, cla):
+        block = []
+        exists = {'event_type': 'compute.instance.exists',
+                  'message_id': 2}
+        inee.return_value = False
+        self.handler._do_checks(block, exists)
+        self.assertTrue(len(self.handler.warnings) == 0)
+        self.assertFalse(vf.called)
+
+    @mock.patch.object(pipeline_handler.UsageHandler, '_confirm_launched_at')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_get_core_fields')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_verify_fields')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_is_non_EOD_exists')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_find_deleted_events')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_confirm_delete')
+    def test_do_check_no_interesting_non_EOD(self, cd, fde, inee, vf,
+                                             gcf, cla):
+        block = []
+        exists = {'event_type': 'compute.instance.exists',
+                  'message_id': 2}
+        inee.return_value = True
+        self.handler._do_checks(block, exists)
+        self.assertTrue(len(self.handler.warnings) == 1)
+        self.assertFalse(vf.called)
+
+    @mock.patch.object(pipeline_handler.UsageHandler, '_confirm_launched_at')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_get_core_fields')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_verify_fields')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_is_non_EOD_exists')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_find_deleted_events')
+    @mock.patch.object(pipeline_handler.UsageHandler, '_confirm_delete')
+    def test_do_check_interesting(self, cd, fde, inee, vf, gcf, cla):
+        block = [{'event_type': 'compute.instance.rebuild.start',
+                  'message_id': 1}]
+        exists = {'event_type': 'compute.instance.exists',
+                  'message_id': 2}
+        self.handler._do_checks(block, exists)
+        self.assertTrue(len(self.handler.warnings) == 0)
+        self.assertTrue(vf.called)
+
+    def test_handle_events_no_data(self):
+        env = {'stream_id': 123}
+        events = self.handler.handle_events([], env)
+        self.assertEquals(0, len(events))
+
+    def test_handle_events_no_exists(self):
+        env = {'stream_id': 123}
+        raw = [{'event_type': 'foo'}]
+        events = self.handler.handle_events(raw, env)
+        self.assertEquals(2, len(events))
+        self.assertEquals("compute.instance.exists.failed",
+                          events[1]['event_type'])
+
+    @mock.patch.object(pipeline_handler.UsageHandler, '_process_block')
+    def test_handle_events_exists(self, pb):
+        env = {'stream_id': 123}
+        raw = [{'event_type': 'foo'},
+               {'event_type': 'compute.instance.exists'}]
+        events = self.handler.handle_events(raw, env)
+        self.assertEquals(2, len(events))
+        self.assertTrue(pb.called)
+
+    @mock.patch.object(pipeline_handler.UsageHandler, '_process_block')
+    def test_handle_events_dangling(self, pb):
+        env = {'stream_id': 123}
+        raw = [{'event_type': 'foo'},
+               {'event_type': 'compute.instance.exists'},
+               {'event_type': 'foo'},
+               ]
+        events = self.handler.handle_events(raw, env)
+        self.assertEquals(4, len(events))
+        self.assertEquals("compute.instance.exists.failed",
+                          events[3]['event_type'])
+        self.assertTrue(pb.called)
diff --git a/winchester/pipeline_handler.py b/winchester/pipeline_handler.py
index 4fab47f..da63951 100644
--- a/winchester/pipeline_handler.py
+++ b/winchester/pipeline_handler.py
@@ -103,40 +103,33 @@ class UsageHandler(PipelineHandlerBase):
         super(UsageHandler, self).__init__(**kw)
         self.warnings = []

+    def _get_audit_period(self, event):
+        apb = event.get('audit_period_beginning')
+        ape = event.get('audit_period_ending')
+        return apb, ape
+
+    def _is_exists(self, event):
+        return event['event_type'] == 'compute.instance.exists'
+
     def _is_non_EOD_exists(self, event):
         # For non-EOD .exists, we just check that the APB and APE are
         # not 24hrs apart. We could check that it's not midnight, but
         # could be possible (though unlikely).
         # And, if we don't find any extras, don't error out ...
         # we'll do that later.
-        apb = event.get('audit_period_beginning')
-        ape = event.get('audit_period_ending')
-        return (event['event_type'] == 'compute.instance.exists'
-            and apb and ape
+        apb, ape = self._get_audit_period(event)
+        return (self._is_exists(event) and apb and ape
             and ape.date() != (apb.date() + datetime.timedelta(days=1)))

-    def _find_exists(self, events):
-        exists = None
-
+    def _is_EOD_exists(self, event):
         # We could have several .exists records, but only the
         # end-of-day .exists will have audit_period_* time of
         # 00:00:00 and be 24hrs apart.
-        for event in events:
-            apb = event.get('audit_period_beginning')
-            ape = event.get('audit_period_ending')
-            if (event['event_type'] == 'compute.instance.exists'
-                and apb and ape and apb.time() == datetime.time(0, 0, 0)
-                and ape.time() == datetime.time(0, 0, 0)
-                and ape.date() == (apb.date() + datetime.timedelta(days=1))):
-                exists = event
-                self.audit_beginning = apb
-                self.audit_ending = ape
-                break
-
-        if not exists:
-            raise UsageException("U0", "No .exists notification found.")
-
-        return exists
+        apb, ape = self._get_audit_period(event)
+        return (self._is_exists(event) and apb and ape
+                and apb.time() == datetime.time(0, 0, 0)
+                and ape.time() == datetime.time(0, 0, 0)
+                and ape.date() == (apb.date() + datetime.timedelta(days=1)))

     def _extract_launched_at(self, exists):
         if not exists.get('launched_at'):
@@ -147,22 +140,6 @@ class UsageHandler(PipelineHandlerBase):
         return [event for event in events
                 if event['event_type'] in interesting]

-    def _find_events(self, events):
-        # We could easily end up with no events in final_set if
-        # there were no operations performed on an instance that day.
-        # We'll still get a .exists for every active instance though.
-        interesting = ['compute.instance.rebuild.start',
-                       'compute.instance.resize.prep.start',
-                       'compute.instance.resize.revert.start',
-                       'compute.instance.rescue.start',
-                       'compute.instance.create.end',
-                       'compute.instance.rebuild.end',
-                       'compute.instance.resize.finish.end',
-                       'compute.instance.resize.revert.end',
-                       'compute.instance.rescue.end']
-
-        return self._extract_interesting_events(events, interesting)
-
     def _find_deleted_events(self, events):
         interesting = ['compute.instance.delete.end']
         return self._extract_interesting_events(events, interesting)
@@ -176,15 +153,20 @@ class UsageHandler(PipelineHandlerBase):
                     "Conflicting '%s' values ('%s' != '%s')"
                     % (field, this[field], that[field]))

-    def _confirm_delete(self, exists, deleted, fields):
+    def _confirm_delete(self, exists, delete_events, fields):
         deleted_at = exists.get('deleted_at')
         state = exists.get('state')
+        apb, ape = self._get_audit_period(exists)
+
+        if not deleted_at and delete_events:
+            raise UsageException("U6", ".deleted events found but .exists "
+                                 "has no deleted_at value.")

         if deleted_at and state != "deleted":
             raise UsageException("U3", ".exists state not 'deleted' but "
                                  ".exists deleted_at is set.")

-        if deleted_at and not deleted:
+        if deleted_at and not delete_events:
             # We've already confirmed it's in the "deleted" state.
             launched_at = exists.get('launched_at')
             if deleted_at < launched_at:
@@ -192,33 +174,31 @@ class UsageHandler(PipelineHandlerBase):
                                      ".exists deleted_at < .exists launched_at.")

             # Is the deleted_at within this audit period?
-            if (deleted_at >= self.audit_beginning
-                    and deleted_at <= self.audit_ending):
+            if (apb and ape and deleted_at >= apb and deleted_at <= ape):
                 raise UsageException("U5", ".exists deleted_at in audit "
                     "period, but no matching .delete event found.")

-        if not deleted_at and deleted:
-            raise UsageException("U6", ".deleted events found but .exists "
-                                 "has no deleted_at value.")
-
-        if len(deleted) > 1:
+        if len(delete_events) > 1:
             raise UsageException("U7", "Multiple .delete.end events")

-        if deleted:
-            self._verify_fields(exists, deleted[0], fields)
+        if delete_events:
+            self._verify_fields(exists, delete_events[0], fields)

-    def _confirm_launched_at(self, exists, events):
+    def _confirm_launched_at(self, block, exists):
         if exists.get('state') != 'active':
             return

+        apb, ape = self._get_audit_period(exists)
+
         # Does launched_at have a value within this audit period?
         # If so, we should have a related event. Otherwise, this
         # instance was created previously.
-        launched_at = exists['launched_at']
-        if (launched_at >= self.audit_beginning
-                and launched_at <= self.audit_ending and len(events) == 1):
-            raise UsageException("U8", ".exists launched_at in audit "
-                "period, but no related events found.")
+        launched_at = self._extract_launched_at(exists)
+        if (apb and ape and
+                launched_at >= apb and launched_at <= ape and
+                len(block) == 0):
+            raise UsageException("U8", ".exists launched_at in audit "
+                                 "period, but no related events found.")

         # TODO(sandy): Confirm the events we got set launched_at
         # properly.
@@ -229,111 +209,107 @@ class UsageHandler(PipelineHandlerBase):
         return ['launched_at', 'instance_flavor_id', 'tenant_id',
                 'os_architecture', 'os_version', 'os_distro']

-    def _confirm_non_EOD_exists(self, events):
+    def _do_checks(self, block, exists):
         interesting = ['compute.instance.rebuild.start',
                        'compute.instance.resize.prep.start',
                        'compute.instance.rescue.start']

+        self._confirm_launched_at(block, exists)
+
+        fields = self._get_core_fields()
         last_interesting = None
-        fields = ['launched_at', 'deleted_at']
-        for event in events:
+        for event in block:
             if event['event_type'] in interesting:
                 last_interesting = event
-            elif (event['event_type'] == 'compute.instance.exists' and
-                  self._is_non_EOD_exists(event)):
-                if last_interesting:
-                    self._verify_fields(last_interesting, event, fields)
-                    last_interesting = None
-                else:
-                    self.warnings.append("Non-EOD .exists found "
-                                         "(msg_id: %s) "
-                                         "with no parent event." %
-                                         event['message_id'])
-
-        # We got an interesting event, but no related .exists.
         if last_interesting:
-            self.warnings.append("Interesting event '%s' (msg_id: %s) "
-                                 "but no related non-EOD .exists record." %
-                                 (last_interesting['event_type'],
-                                  last_interesting['message_id']))
+            self._verify_fields(last_interesting, exists, fields)
+        elif self._is_non_EOD_exists(exists):
+            self.warnings.append("Non-EOD .exists found "
+                                 "(msg_id: %s) "
+                                 "with no operation event." %
+                                 exists['message_id'])

-    def _do_checks(self, exists, events):
-        core_fields = self._get_core_fields()
+        deleted = self._find_deleted_events(block)
         delete_fields = ['launched_at', 'deleted_at']
-
-        # Ensure all the important fields of the important events
-        # match with the EOD .exists values.
-        self._extract_launched_at(exists)
-        for c in self._find_events(events):
-            self._verify_fields(exists, c, core_fields)
-
-        self._confirm_launched_at(exists, events)
-
-        # Ensure the deleted_at value matches as well.
-        deleted = self._find_deleted_events(events)
         self._confirm_delete(exists, deleted, delete_fields)

-        # Check the non-EOD .exists records. They should
-        # appear after an interesting event.
-        self._confirm_non_EOD_exists(events)
-
-    def handle_events(self, events, env):
-        self.env = env
-        self.stream_id = env['stream_id']
-        self.warnings = []
-
-        exists = None
+    def _process_block(self, block, exists):
         error = None
         try:
-            exists = self._find_exists(events)
-            self._do_checks(exists, events)
+            self._do_checks(block, exists)
             event_type = "compute.instance.exists.verified"
         except UsageException as e:
             error = e
             event_type = "compute.instance.exists.failed"
             logger.warn("Stream %s UsageException: (%s) %s" %
                         (self.stream_id, e.code, e))

-            if exists:
-                logger.warn("Stream %s deleted_at: %s, launched_at: %s, "
-                            "state: %s, APB: %s, APE: %s, #events: %s" %
-                            (self.stream_id, exists.get("deleted_at"),
-                             exists.get("launched_at"), exists.get("state"),
-                             exists.get("audit_period_beginning"),
-                             exists.get("audit_period_ending"), len(events)))
+            apb, ape = self._get_audit_period(exists)
+            logger.warn("Stream %s deleted_at: %s, launched_at: %s, "
+                        "state: %s, APB: %s, APE: %s, #events: %s" %
+                        (self.stream_id, exists.get("deleted_at"),
+                         exists.get("launched_at"), exists.get("state"),
+                         apb, ape, len(block)))

-            if len(events) > 1:
+            if len(block) > 1:
                 logger.warn("Events for Stream: %s" % self.stream_id)
-                for event in events:
+                for event in block:
                     logger.warn("^Event: %s - %s" %
                                 (event['timestamp'], event['event_type']))

+        events = []
         # We could have warnings, but a valid event list.
         if self.warnings:
-            instance_id = "n/a"
-            if len(events):
-                instance_id = events[0].get('instance_id')
+            instance_id = exists.get('instance_id', 'n/a')
             warning_event = {'event_type': 'compute.instance.exists.warnings',
                              'message_id': str(uuid.uuid4()),
                              'timestamp': exists.get('timestamp',
-                                                 datetime.datetime.utcnow()),
+                                                     datetime.datetime.utcnow()),
                              'stream_id': int(self.stream_id),
                              'instance_id': instance_id,
                              'warnings': self.warnings}
             events.append(warning_event)

-        if exists:
-            new_event = {'event_type': event_type,
+        new_event = {'event_type': event_type,
+                     'message_id': str(uuid.uuid4()),
+                     'timestamp': exists.get('timestamp',
+                                             datetime.datetime.utcnow()),
+                     'stream_id': int(self.stream_id),
+                     'instance_id': exists.get('instance_id'),
+                     'error': str(error),
+                     'error_code': error and error.code
+                     }
+        events.append(new_event)
+        return events
+
+    def handle_events(self, events, env):
+        self.env = env
+        self.stream_id = env['stream_id']
+        self.warnings = []
+
+        new_events = []
+        block = []
+        for event in events:
+            if self._is_exists(event):
+                new_events.extend(self._process_block(block, event))
+                block = []
+            else:
+                block.append(event)
+
+        # Final block should be empty.
+        if block:
+            new_event = {'event_type': "compute.instance.exists.failed",
                          'message_id': str(uuid.uuid4()),
-                         'timestamp': exists.get('timestamp',
-                                                 datetime.datetime.utcnow()),
+                         'timestamp': block[0].get('timestamp',
+                                                   datetime.datetime.utcnow()),
                          'stream_id': int(self.stream_id),
-                         'instance_id': exists.get('instance_id'),
-                         'error': str(error),
-                         'error_code': error and error.code
+                         'instance_id': block[0].get('instance_id'),
+                         'error': "Notifications, but no .exists "
+                                  "notification found.",
+                         'error_code': "U0"
                          }
-            events.append(new_event)
-        else:
-            logger.debug("No .exists record")
+            new_events.append(new_event)
+
+        events.extend(new_events)
         return events

     def commit(self):
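
Usage example (illustrative sketch, not part of the patch): a minimal run of the
reworked handler showing one verification event emitted per .exists, rather than
a single .verified for the one EOD .exists. It assumes winchester and the mock
library are importable; the event payloads are simplified placeholders and
_do_checks is stubbed out so the sketch runs without a real stream.

    # Illustrative only: exercises the new per-.exists behaviour described
    # in the commit message. Each .exists closes the current block of
    # preceding events and produces its own .verified (or .failed) event.
    import datetime

    import mock

    from winchester import pipeline_handler

    handler = pipeline_handler.UsageHandler()
    start = datetime.datetime(2014, 12, 31, 0, 0, 0)
    events = [
        {'event_type': 'compute.instance.create.end'},
        {'event_type': 'compute.instance.exists',      # closes the first block
         'audit_period_beginning': start,
         'audit_period_ending': start + datetime.timedelta(days=1)},
        {'event_type': 'compute.instance.rebuild.start'},
        {'event_type': 'compute.instance.exists',      # closes the second block
         'audit_period_beginning': start,
         'audit_period_ending': start + datetime.timedelta(hours=1)},
    ]

    # Stub the detailed checks so only the block/verification plumbing runs.
    with mock.patch.object(handler, "_do_checks"):
        out = handler.handle_events(events, {'stream_id': 99})

    print([e['event_type'] for e in out
           if e['event_type'].startswith('compute.instance.exists.')])
    # -> ['compute.instance.exists.verified', 'compute.instance.exists.verified']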