Generate .verified per .exists
Slight change on the old way, which was a single .verified on the one EOD .exists. Now it generates a .verified for every .exists seen. Change-Id: I7bdced936f203cbec4bf8b7ee2b956c388e66ff1
This commit is contained in:
parent
bfd3cfb43f
commit
c5f0eca828
@ -11,56 +11,86 @@ class TestUsageHandler(unittest.TestCase):
|
||||
super(TestUsageHandler, self).setUp()
|
||||
self.handler = pipeline_handler.UsageHandler()
|
||||
|
||||
def test_find_exists_happyday(self):
|
||||
def test_get_audit_period(self):
|
||||
event = {}
|
||||
apb, ape = self.handler._get_audit_period(event)
|
||||
self.assertIsNone(apb)
|
||||
self.assertIsNone(ape)
|
||||
|
||||
event = {'audit_period_beginning': "beginning"}
|
||||
apb, ape = self.handler._get_audit_period(event)
|
||||
self.assertEqual(apb, "beginning")
|
||||
self.assertIsNone(ape)
|
||||
|
||||
event = {'audit_period_ending': "ending"}
|
||||
apb, ape = self.handler._get_audit_period(event)
|
||||
self.assertIsNone(apb)
|
||||
self.assertEqual(ape, "ending")
|
||||
|
||||
def test_is_exists(self):
|
||||
event = {'event_type': None}
|
||||
self.assertFalse(self.handler._is_exists(event))
|
||||
|
||||
event = {'event_type': 'foo'}
|
||||
self.assertFalse(self.handler._is_exists(event))
|
||||
|
||||
event = {'event_type': 'compute.instance.exists'}
|
||||
self.assertTrue(self.handler._is_exists(event))
|
||||
|
||||
def test_is_non_EOD_exists(self):
|
||||
start = datetime.datetime(2014, 12, 31, 0, 0, 0)
|
||||
end = start + datetime.timedelta(days=1)
|
||||
events = [{'event_type': 'event_1'},
|
||||
{'event_type': 'event_2'},
|
||||
{'event_type': 'compute.instance.exists',
|
||||
'audit_period_beginning': start,
|
||||
'audit_period_ending': end}]
|
||||
eod = {'event_type': 'compute.instance.exists',
|
||||
'audit_period_beginning': start,
|
||||
'audit_period_ending': end}
|
||||
self.assertFalse(self.handler._is_non_EOD_exists(eod))
|
||||
|
||||
exists = self.handler._find_exists(events)
|
||||
self.assertEquals(exists, events[2])
|
||||
start = datetime.datetime(2014, 12, 31, 1, 0, 0)
|
||||
end = start + datetime.timedelta(hours=1)
|
||||
eod = {'event_type': 'compute.instance.exists',
|
||||
'audit_period_beginning': start,
|
||||
'audit_period_ending': end}
|
||||
self.assertTrue(self.handler._is_non_EOD_exists(eod))
|
||||
|
||||
def test_find_exists_none(self):
|
||||
events = [{'event_type': 'event_1'},
|
||||
{'event_type': 'event_2'}]
|
||||
eod = {'event_type': 'compute.instance.foo',
|
||||
'audit_period_beginning': start,
|
||||
'audit_period_ending': end}
|
||||
self.assertFalse(self.handler._is_non_EOD_exists(eod))
|
||||
|
||||
with self.assertRaises(pipeline_handler.UsageException):
|
||||
self.handler._find_exists(events)
|
||||
eod = {'event_type': 'compute.instance.foo',
|
||||
'audit_period_ending': end}
|
||||
self.assertFalse(self.handler._is_non_EOD_exists(eod))
|
||||
|
||||
def test_find_exists_midday(self):
|
||||
start = datetime.datetime(2014, 12, 31, 1, 1, 1)
|
||||
end = datetime.datetime(2014, 12, 31, 1, 1, 2)
|
||||
events = [{'event_type': 'event_1'},
|
||||
{'event_type': 'event_2'},
|
||||
{'event_type': 'compute.instance.exists',
|
||||
'audit_period_beginning': start,
|
||||
'audit_period_ending': end}]
|
||||
def test_is_EOD_exists(self):
|
||||
start = datetime.datetime(2014, 12, 31, 1, 0, 0)
|
||||
end = start + datetime.timedelta(days=1)
|
||||
eod = {'event_type': 'compute.instance.exists',
|
||||
'audit_period_beginning': start,
|
||||
'audit_period_ending': end}
|
||||
self.assertFalse(self.handler._is_EOD_exists(eod))
|
||||
|
||||
with self.assertRaises(pipeline_handler.UsageException):
|
||||
self.handler._find_exists(events)
|
||||
|
||||
def test_find_exists_long(self):
|
||||
start = datetime.datetime(2014, 12, 31, 0, 0, 0)
|
||||
end = start + datetime.timedelta(days=2)
|
||||
events = [{'event_type': 'event_1'},
|
||||
{'event_type': 'event_2'},
|
||||
{'event_type': 'compute.instance.exists',
|
||||
'audit_period_beginning': start,
|
||||
'audit_period_ending': end}]
|
||||
end = start + datetime.timedelta(hours=1)
|
||||
eod = {'event_type': 'compute.instance.exists',
|
||||
'audit_period_beginning': start,
|
||||
'audit_period_ending': end}
|
||||
self.assertFalse(self.handler._is_EOD_exists(eod))
|
||||
|
||||
with self.assertRaises(pipeline_handler.UsageException):
|
||||
self.handler._find_exists(events)
|
||||
start = datetime.datetime(2014, 12, 31, 0, 0, 0)
|
||||
end = start + datetime.timedelta(days=1)
|
||||
eod = {'event_type': 'compute.instance.exists',
|
||||
'audit_period_beginning': start,
|
||||
'audit_period_ending': end}
|
||||
self.assertTrue(self.handler._is_EOD_exists(eod))
|
||||
|
||||
def test_find_exists_no_audit_periods(self):
|
||||
events = [{'event_type': 'event_1'},
|
||||
{'event_type': 'event_2'},
|
||||
{'event_type': 'compute.instance.exists'}]
|
||||
eod = {'event_type': 'compute.instance.foo',
|
||||
'audit_period_beginning': start,
|
||||
'audit_period_ending': end}
|
||||
self.assertFalse(self.handler._is_EOD_exists(eod))
|
||||
|
||||
with self.assertRaises(pipeline_handler.UsageException):
|
||||
self.handler._find_exists(events)
|
||||
eod = {'event_type': 'compute.instance.foo',
|
||||
'audit_period_ending': end}
|
||||
self.assertFalse(self.handler._is_EOD_exists(eod))
|
||||
|
||||
def test_extract_launched_at(self):
|
||||
with self.assertRaises(pipeline_handler.UsageException):
|
||||
@ -109,20 +139,22 @@ class TestUsageHandler(unittest.TestCase):
|
||||
'state': 'deleted'}, [], [])
|
||||
self.assertEquals("U4", e.code)
|
||||
|
||||
self.handler.audit_beginning = datetime.datetime(2014, 12, 30, 0, 0, 0)
|
||||
self.handler.audit_ending = datetime.datetime(2014, 12, 31, 0, 0, 0)
|
||||
apb = datetime.datetime(2014, 12, 30, 0, 0, 0)
|
||||
ape = datetime.datetime(2014, 12, 31, 0, 0, 0)
|
||||
deleted_at = datetime.datetime(2014, 12, 30, 2, 0, 0)
|
||||
launched_at = datetime.datetime(2014, 12, 30, 1, 0, 0)
|
||||
with self.assertRaises(pipeline_handler.UsageException) as e:
|
||||
self.handler._confirm_delete({'deleted_at': deleted_at,
|
||||
'launched_at': launched_at,
|
||||
'audit_period_beginning': apb,
|
||||
'audit_period_ending': ape,
|
||||
'state': 'deleted'}, [], [])
|
||||
self.assertEquals("U5", e.code)
|
||||
|
||||
# Test the do-nothing scenario
|
||||
self.handler._confirm_delete({}, [], [])
|
||||
|
||||
def test_confirm_delete_delete_events(self):
|
||||
def test_confirm_delete_with_delete_events(self):
|
||||
with self.assertRaises(pipeline_handler.UsageException) as e:
|
||||
self.handler._confirm_delete({}, [{}], [])
|
||||
self.assertEquals("U6", e.code)
|
||||
@ -138,122 +170,143 @@ class TestUsageHandler(unittest.TestCase):
|
||||
v.assert_called_with(exists, deleted, ['a'])
|
||||
|
||||
def test_confirm_launched_at(self):
|
||||
self.handler._confirm_launched_at({'state': 'deleted'}, [])
|
||||
self.handler._confirm_launched_at([], {'state': 'deleted'})
|
||||
|
||||
self.handler.audit_beginning = datetime.datetime(2014, 12, 30, 0, 0, 0)
|
||||
self.handler.audit_ending = datetime.datetime(2014, 12, 31, 0, 0, 0)
|
||||
apb = datetime.datetime(2014, 12, 30, 0, 0, 0)
|
||||
ape = datetime.datetime(2014, 12, 31, 0, 0, 0)
|
||||
launched_at = datetime.datetime(2014, 12, 30, 1, 0, 0)
|
||||
with self.assertRaises(pipeline_handler.UsageException) as e:
|
||||
self.handler._confirm_launched_at({'state': 'active',
|
||||
'launched_at': launched_at},
|
||||
[{}])
|
||||
self.handler._confirm_launched_at([],
|
||||
{'state': 'active',
|
||||
'audit_period_beginning': apb,
|
||||
'audit_period_ending': ape,
|
||||
'launched_at': launched_at})
|
||||
self.assertEquals("U8", e.code)
|
||||
|
||||
def test_handle_events_no_exists(self):
|
||||
env = {'stream_id': 'stream'}
|
||||
with mock.patch.object(self.handler, "_find_exists") as c:
|
||||
def test_process_block_exists(self):
|
||||
exists = {'event_type':'compute.instance.exists', 'timestamp':'now',
|
||||
'instance_id':'inst'}
|
||||
self.handler.stream_id = 123
|
||||
with mock.patch.object(self.handler, "_do_checks") as c:
|
||||
events = self.handler._process_block([], exists)
|
||||
self.assertEquals(1, len(events))
|
||||
f = events[0]
|
||||
self.assertEquals("compute.instance.exists.verified",
|
||||
f['event_type'])
|
||||
self.assertEquals("now", f['timestamp'])
|
||||
self.assertEquals(123, f['stream_id'])
|
||||
self.assertEquals("inst", f['instance_id'])
|
||||
self.assertEquals("None", f['error'])
|
||||
self.assertIsNone(f['error_code'])
|
||||
|
||||
def test_process_block_bad(self):
|
||||
exists = {'event_type': 'compute.instance.exists', 'timestamp':'now',
|
||||
'instance_id':'inst'}
|
||||
self.handler.stream_id = 123
|
||||
with mock.patch.object(self.handler, "_do_checks") as c:
|
||||
c.side_effect = pipeline_handler.UsageException("UX", "Error")
|
||||
events = self.handler.handle_events([], env)
|
||||
self.assertEquals(0, len(events))
|
||||
events = self.handler._process_block([], exists)
|
||||
self.assertEquals(1, len(events))
|
||||
f = events[0]
|
||||
self.assertEquals("compute.instance.exists.failed",
|
||||
f['event_type'])
|
||||
self.assertEquals("now", f['timestamp'])
|
||||
self.assertEquals(123, f['stream_id'])
|
||||
self.assertEquals("inst", f['instance_id'])
|
||||
self.assertEquals("Error", f['error'])
|
||||
self.assertEquals("UX", f['error_code'])
|
||||
|
||||
def test_handle_events_exists(self):
|
||||
env = {'stream_id': 123}
|
||||
with mock.patch.object(self.handler, "_find_exists") as ex:
|
||||
ex.return_value = {'timestamp':'now', 'instance_id':'inst'}
|
||||
with mock.patch.object(self.handler, "_do_checks") as c:
|
||||
events = self.handler.handle_events([], env)
|
||||
self.assertEquals(1, len(events))
|
||||
f = events[0]
|
||||
self.assertEquals("compute.instance.exists.verified",
|
||||
f['event_type'])
|
||||
self.assertEquals("now", f['timestamp'])
|
||||
self.assertEquals(123, f['stream_id'])
|
||||
self.assertEquals("inst", f['instance_id'])
|
||||
self.assertEquals("None", f['error'])
|
||||
self.assertIsNone(f['error_code'])
|
||||
def test_process_block_warnings(self):
|
||||
self.handler.warnings = ['one', 'two']
|
||||
exists = {'event_type': 'compute.instance.exists',
|
||||
'timestamp':'now', 'instance_id':'inst'}
|
||||
self.handler.stream_id = 123
|
||||
with mock.patch.object(self.handler, "_do_checks") as c:
|
||||
events = self.handler._process_block([], exists)
|
||||
self.assertEquals(2, len(events))
|
||||
self.assertEquals("compute.instance.exists.warnings",
|
||||
events[0]['event_type'])
|
||||
self.assertEquals("compute.instance.exists.verified",
|
||||
events[1]['event_type'])
|
||||
|
||||
def test_handle_events_bad(self):
|
||||
env = {'stream_id': 123}
|
||||
with mock.patch.object(self.handler, "_find_exists") as ex:
|
||||
ex.return_value = {'timestamp':'now', 'instance_id':'inst'}
|
||||
with mock.patch.object(self.handler, "_do_checks") as c:
|
||||
c.side_effect = pipeline_handler.UsageException("UX", "Error")
|
||||
events = self.handler.handle_events([], env)
|
||||
self.assertEquals(1, len(events))
|
||||
f = events[0]
|
||||
self.assertEquals("compute.instance.exists.failed",
|
||||
f['event_type'])
|
||||
self.assertEquals("now", f['timestamp'])
|
||||
self.assertEquals(123, f['stream_id'])
|
||||
self.assertEquals("inst", f['instance_id'])
|
||||
self.assertEquals("Error", f['error'])
|
||||
self.assertEquals("UX", f['error_code'])
|
||||
|
||||
def test_handle_events_warnings(self):
|
||||
def fake_find_exists(events):
|
||||
self.handler.warnings = ['one', 'two']
|
||||
return {'timestamp':'now', 'instance_id':'inst'}
|
||||
|
||||
env = {'stream_id': 123}
|
||||
with mock.patch.object(self.handler, "_find_exists") as ex:
|
||||
ex.side_effect = fake_find_exists
|
||||
with mock.patch.object(self.handler, "_do_checks") as c:
|
||||
events = self.handler.handle_events([], env)
|
||||
self.assertEquals(2, len(events))
|
||||
self.assertEquals("compute.instance.exists.warnings",
|
||||
events[0]['event_type'])
|
||||
self.assertEquals("compute.instance.exists.verified",
|
||||
events[1]['event_type'])
|
||||
|
||||
def test_confirm_non_EOD_exists_no_events(self):
|
||||
events = []
|
||||
self.assertEquals(len(self.handler.warnings), 0)
|
||||
self.handler._confirm_non_EOD_exists(events)
|
||||
self.assertEquals(len(self.handler.warnings), 0)
|
||||
|
||||
def test_confirm_non_EOD_exists_no_interesting(self):
|
||||
events = [{'event_type': 'foo'}]
|
||||
self.assertEquals(len(self.handler.warnings), 0)
|
||||
self.handler._confirm_non_EOD_exists(events)
|
||||
self.assertEquals(len(self.handler.warnings), 0)
|
||||
|
||||
def test_confirm_non_EOD_exists_no_exists(self):
|
||||
events = [{'event_type': 'compute.instance.rebuild.start',
|
||||
'message_id': 'xxx'}]
|
||||
self.assertEquals(len(self.handler.warnings), 0)
|
||||
self.handler._confirm_non_EOD_exists(events)
|
||||
self.assertEquals(len(self.handler.warnings), 1)
|
||||
x = "Interesting"
|
||||
self.assertEquals(self.handler.warnings[0][:len(x)], x)
|
||||
|
||||
def test_confirm_non_EOD_exists_good(self):
|
||||
events = [{'event_type': 'compute.instance.rebuild.start'},
|
||||
{'event_type': 'compute.instance.exists'}]
|
||||
self.assertEquals(len(self.handler.warnings), 0)
|
||||
with mock.patch.object(self.handler, "_is_non_EOD_exists") as eod:
|
||||
with mock.patch.object(self.handler, "_verify_fields") as vf:
|
||||
eod.return_value = True
|
||||
self.handler._confirm_non_EOD_exists(events)
|
||||
self.assertEquals(len(self.handler.warnings), 0)
|
||||
|
||||
def test_confirm_non_EOD_exists_not_EOD(self):
|
||||
events = [{'event_type': 'compute.instance.exists',
|
||||
'message_id': 'blah'}]
|
||||
self.assertEquals(len(self.handler.warnings), 0)
|
||||
with mock.patch.object(self.handler, "_is_non_EOD_exists") as eod:
|
||||
with mock.patch.object(self.handler, "_verify_fields") as vf:
|
||||
eod.return_value = True
|
||||
self.handler._confirm_non_EOD_exists(events)
|
||||
self.assertEquals(len(self.handler.warnings), 1)
|
||||
x = "Non-EOD"
|
||||
self.assertEquals(self.handler.warnings[0][:len(x)], x)
|
||||
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_confirm_non_EOD_exists')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_get_core_fields')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_extract_launched_at')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_find_events')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_confirm_launched_at')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_get_core_fields')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_verify_fields')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_is_non_EOD_exists')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_find_deleted_events')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_confirm_delete')
|
||||
def test_do_check(self, cd, cla, fe, ela, gcf, cnee):
|
||||
fe.return_value = [1,2,3]
|
||||
self.handler._do_checks({}, [])
|
||||
def test_do_check_no_interesting_EOD_exists(self, cd, fde, inee, vf,
|
||||
gcf, cla):
|
||||
block = []
|
||||
exists = {'event_type': 'compute.instance.exists',
|
||||
'message_id': 2}
|
||||
inee.return_value = False
|
||||
self.handler._do_checks(block, exists)
|
||||
self.assertTrue(len(self.handler.warnings) == 0)
|
||||
self.assertFalse(vf.called)
|
||||
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_confirm_launched_at')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_get_core_fields')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_verify_fields')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_is_non_EOD_exists')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_find_deleted_events')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_confirm_delete')
|
||||
def test_do_check_no_interesting_non_EOD(self, cd, fde, inee, vf,
|
||||
gcf, cla):
|
||||
block = []
|
||||
exists = {'event_type': 'compute.instance.exists',
|
||||
'message_id': 2}
|
||||
inee.return_value = True
|
||||
self.handler._do_checks(block, exists)
|
||||
self.assertTrue(len(self.handler.warnings) == 1)
|
||||
self.assertFalse(vf.called)
|
||||
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_confirm_launched_at')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_get_core_fields')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_verify_fields')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_is_non_EOD_exists')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_find_deleted_events')
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_confirm_delete')
|
||||
def test_do_check_interesting(self, cd, fde, inee, vf, gcf, cla):
|
||||
block = [{'event_type': 'compute.instance.rebuild.start',
|
||||
'message_id': 1}]
|
||||
exists = {'event_type': 'compute.instance.exists',
|
||||
'message_id': 2}
|
||||
self.handler._do_checks(block, exists)
|
||||
self.assertTrue(len(self.handler.warnings) == 0)
|
||||
self.assertTrue(vf.called)
|
||||
|
||||
def test_handle_events_no_data(self):
|
||||
env = {'stream_id': 123}
|
||||
events = self.handler.handle_events([], env)
|
||||
self.assertEquals(0, len(events))
|
||||
|
||||
def test_handle_events_no_exists(self):
|
||||
env = {'stream_id': 123}
|
||||
raw = [{'event_type': 'foo'}]
|
||||
events = self.handler.handle_events(raw, env)
|
||||
self.assertEquals(2, len(events))
|
||||
self.assertEquals("compute.instance.exists.failed",
|
||||
events[1]['event_type'])
|
||||
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_process_block')
|
||||
def test_handle_events_exists(self, pb):
|
||||
env = {'stream_id': 123}
|
||||
raw = [{'event_type': 'foo'},
|
||||
{'event_type': 'compute.instance.exists'}]
|
||||
events = self.handler.handle_events(raw, env)
|
||||
self.assertEquals(2, len(events))
|
||||
self.assertTrue(pb.called)
|
||||
|
||||
@mock.patch.object(pipeline_handler.UsageHandler, '_process_block')
|
||||
def test_handle_events_dangling(self, pb):
|
||||
env = {'stream_id': 123}
|
||||
raw = [{'event_type': 'foo'},
|
||||
{'event_type': 'compute.instance.exists'},
|
||||
{'event_type': 'foo'},
|
||||
]
|
||||
events = self.handler.handle_events(raw, env)
|
||||
self.assertEquals(4, len(events))
|
||||
self.assertEquals("compute.instance.exists.failed",
|
||||
events[3]['event_type'])
|
||||
self.assertTrue(pb.called)
|
||||
|
@ -103,40 +103,33 @@ class UsageHandler(PipelineHandlerBase):
|
||||
super(UsageHandler, self).__init__(**kw)
|
||||
self.warnings = []
|
||||
|
||||
def _get_audit_period(self, event):
|
||||
apb = event.get('audit_period_beginning')
|
||||
ape = event.get('audit_period_ending')
|
||||
return apb, ape
|
||||
|
||||
def _is_exists(self, event):
|
||||
return event['event_type'] == 'compute.instance.exists'
|
||||
|
||||
def _is_non_EOD_exists(self, event):
|
||||
# For non-EOD .exists, we just check that the APB and APE are
|
||||
# not 24hrs apart. We could check that it's not midnight, but
|
||||
# could be possible (though unlikely).
|
||||
# And, if we don't find any extras, don't error out ...
|
||||
# we'll do that later.
|
||||
apb = event.get('audit_period_beginning')
|
||||
ape = event.get('audit_period_ending')
|
||||
return (event['event_type'] == 'compute.instance.exists'
|
||||
and apb and ape
|
||||
apb, ape = self._get_audit_period(event)
|
||||
return (self._is_exists(event) and apb and ape
|
||||
and ape.date() != (apb.date() + datetime.timedelta(days=1)))
|
||||
|
||||
def _find_exists(self, events):
|
||||
exists = None
|
||||
|
||||
def _is_EOD_exists(self, event):
|
||||
# We could have several .exists records, but only the
|
||||
# end-of-day .exists will have audit_period_* time of
|
||||
# 00:00:00 and be 24hrs apart.
|
||||
for event in events:
|
||||
apb = event.get('audit_period_beginning')
|
||||
ape = event.get('audit_period_ending')
|
||||
if (event['event_type'] == 'compute.instance.exists'
|
||||
and apb and ape and apb.time() == datetime.time(0, 0, 0)
|
||||
and ape.time() == datetime.time(0, 0, 0)
|
||||
and ape.date() == (apb.date() + datetime.timedelta(days=1))):
|
||||
exists = event
|
||||
self.audit_beginning = apb
|
||||
self.audit_ending = ape
|
||||
break
|
||||
|
||||
if not exists:
|
||||
raise UsageException("U0", "No .exists notification found.")
|
||||
|
||||
return exists
|
||||
apb, ape = self._get_audit_period(event)
|
||||
return (self._is_exists(event) and apb and ape
|
||||
and apb.time() == datetime.time(0, 0, 0)
|
||||
and ape.time() == datetime.time(0, 0, 0)
|
||||
and ape.date() == (apb.date() + datetime.timedelta(days=1)))
|
||||
|
||||
def _extract_launched_at(self, exists):
|
||||
if not exists.get('launched_at'):
|
||||
@ -147,22 +140,6 @@ class UsageHandler(PipelineHandlerBase):
|
||||
return [event for event in events
|
||||
if event['event_type'] in interesting]
|
||||
|
||||
def _find_events(self, events):
|
||||
# We could easily end up with no events in final_set if
|
||||
# there were no operations performed on an instance that day.
|
||||
# We'll still get a .exists for every active instance though.
|
||||
interesting = ['compute.instance.rebuild.start',
|
||||
'compute.instance.resize.prep.start',
|
||||
'compute.instance.resize.revert.start',
|
||||
'compute.instance.rescue.start',
|
||||
'compute.instance.create.end',
|
||||
'compute.instance.rebuild.end',
|
||||
'compute.instance.resize.finish.end',
|
||||
'compute.instance.resize.revert.end',
|
||||
'compute.instance.rescue.end']
|
||||
|
||||
return self._extract_interesting_events(events, interesting)
|
||||
|
||||
def _find_deleted_events(self, events):
|
||||
interesting = ['compute.instance.delete.end']
|
||||
return self._extract_interesting_events(events, interesting)
|
||||
@ -176,15 +153,20 @@ class UsageHandler(PipelineHandlerBase):
|
||||
"Conflicting '%s' values ('%s' != '%s')"
|
||||
% (field, this[field], that[field]))
|
||||
|
||||
def _confirm_delete(self, exists, deleted, fields):
|
||||
def _confirm_delete(self, exists, delete_events, fields):
|
||||
deleted_at = exists.get('deleted_at')
|
||||
state = exists.get('state')
|
||||
apb, ape = self._get_audit_period(exists)
|
||||
|
||||
if not deleted_at and delete_events:
|
||||
raise UsageException("U6", ".deleted events found but .exists "
|
||||
"has no deleted_at value.")
|
||||
|
||||
if deleted_at and state != "deleted":
|
||||
raise UsageException("U3", ".exists state not 'deleted' but "
|
||||
".exists deleted_at is set.")
|
||||
|
||||
if deleted_at and not deleted:
|
||||
if deleted_at and not delete_events:
|
||||
# We've already confirmed it's in the "deleted" state.
|
||||
launched_at = exists.get('launched_at')
|
||||
if deleted_at < launched_at:
|
||||
@ -192,33 +174,31 @@ class UsageHandler(PipelineHandlerBase):
|
||||
".exists deleted_at < .exists launched_at.")
|
||||
|
||||
# Is the deleted_at within this audit period?
|
||||
if (deleted_at >= self.audit_beginning
|
||||
and deleted_at <= self.audit_ending):
|
||||
if (apb and ape and deleted_at >= apb and deleted_at <= ape):
|
||||
raise UsageException("U5", ".exists deleted_at in audit "
|
||||
"period, but no matching .delete event found.")
|
||||
|
||||
if not deleted_at and deleted:
|
||||
raise UsageException("U6", ".deleted events found but .exists "
|
||||
"has no deleted_at value.")
|
||||
|
||||
if len(deleted) > 1:
|
||||
if len(delete_events) > 1:
|
||||
raise UsageException("U7", "Multiple .delete.end events")
|
||||
|
||||
if deleted:
|
||||
self._verify_fields(exists, deleted[0], fields)
|
||||
if delete_events:
|
||||
self._verify_fields(exists, delete_events[0], fields)
|
||||
|
||||
def _confirm_launched_at(self, exists, events):
|
||||
def _confirm_launched_at(self, block, exists):
|
||||
if exists.get('state') != 'active':
|
||||
return
|
||||
|
||||
apb, ape = self._get_audit_period(exists)
|
||||
|
||||
# Does launched_at have a value within this audit period?
|
||||
# If so, we should have a related event. Otherwise, this
|
||||
# instance was created previously.
|
||||
launched_at = exists['launched_at']
|
||||
if (launched_at >= self.audit_beginning
|
||||
and launched_at <= self.audit_ending and len(events) == 1):
|
||||
raise UsageException("U8", ".exists launched_at in audit "
|
||||
"period, but no related events found.")
|
||||
launched_at = self._extract_launched_at(exists)
|
||||
if (apb and ape and
|
||||
launched_at >= apb and launched_at <= ape and
|
||||
len(block) == 0):
|
||||
raise UsageException("U8", ".exists launched_at in audit "
|
||||
"period, but no related events found.")
|
||||
|
||||
# TODO(sandy): Confirm the events we got set launched_at
|
||||
# properly.
|
||||
@ -229,111 +209,107 @@ class UsageHandler(PipelineHandlerBase):
|
||||
return ['launched_at', 'instance_flavor_id', 'tenant_id',
|
||||
'os_architecture', 'os_version', 'os_distro']
|
||||
|
||||
def _confirm_non_EOD_exists(self, events):
|
||||
def _do_checks(self, block, exists):
|
||||
interesting = ['compute.instance.rebuild.start',
|
||||
'compute.instance.resize.prep.start',
|
||||
'compute.instance.rescue.start']
|
||||
|
||||
self._confirm_launched_at(block, exists)
|
||||
|
||||
fields = self._get_core_fields()
|
||||
last_interesting = None
|
||||
fields = ['launched_at', 'deleted_at']
|
||||
for event in events:
|
||||
for event in block:
|
||||
if event['event_type'] in interesting:
|
||||
last_interesting = event
|
||||
elif (event['event_type'] == 'compute.instance.exists' and
|
||||
self._is_non_EOD_exists(event)):
|
||||
if last_interesting:
|
||||
self._verify_fields(last_interesting, event, fields)
|
||||
last_interesting = None
|
||||
else:
|
||||
self.warnings.append("Non-EOD .exists found "
|
||||
"(msg_id: %s) "
|
||||
"with no parent event." %
|
||||
event['message_id'])
|
||||
|
||||
# We got an interesting event, but no related .exists.
|
||||
if last_interesting:
|
||||
self.warnings.append("Interesting event '%s' (msg_id: %s) "
|
||||
"but no related non-EOD .exists record." %
|
||||
(last_interesting['event_type'],
|
||||
last_interesting['message_id']))
|
||||
self._verify_fields(last_interesting, exists, fields)
|
||||
elif self._is_non_EOD_exists(exists):
|
||||
self.warnings.append("Non-EOD .exists found "
|
||||
"(msg_id: %s) "
|
||||
"with no operation event." %
|
||||
exists['message_id'])
|
||||
|
||||
def _do_checks(self, exists, events):
|
||||
core_fields = self._get_core_fields()
|
||||
deleted = self._find_deleted_events(block)
|
||||
delete_fields = ['launched_at', 'deleted_at']
|
||||
|
||||
# Ensure all the important fields of the important events
|
||||
# match with the EOD .exists values.
|
||||
self._extract_launched_at(exists)
|
||||
for c in self._find_events(events):
|
||||
self._verify_fields(exists, c, core_fields)
|
||||
|
||||
self._confirm_launched_at(exists, events)
|
||||
|
||||
# Ensure the deleted_at value matches as well.
|
||||
deleted = self._find_deleted_events(events)
|
||||
self._confirm_delete(exists, deleted, delete_fields)
|
||||
|
||||
# Check the non-EOD .exists records. They should
|
||||
# appear after an interesting event.
|
||||
self._confirm_non_EOD_exists(events)
|
||||
|
||||
def handle_events(self, events, env):
|
||||
self.env = env
|
||||
self.stream_id = env['stream_id']
|
||||
self.warnings = []
|
||||
|
||||
exists = None
|
||||
def _process_block(self, block, exists):
|
||||
error = None
|
||||
try:
|
||||
exists = self._find_exists(events)
|
||||
self._do_checks(exists, events)
|
||||
self._do_checks(block, exists)
|
||||
event_type = "compute.instance.exists.verified"
|
||||
except UsageException as e:
|
||||
error = e
|
||||
event_type = "compute.instance.exists.failed"
|
||||
logger.warn("Stream %s UsageException: (%s) %s" %
|
||||
(self.stream_id, e.code, e))
|
||||
if exists:
|
||||
logger.warn("Stream %s deleted_at: %s, launched_at: %s, "
|
||||
"state: %s, APB: %s, APE: %s, #events: %s" %
|
||||
(self.stream_id, exists.get("deleted_at"),
|
||||
exists.get("launched_at"), exists.get("state"),
|
||||
exists.get("audit_period_beginning"),
|
||||
exists.get("audit_period_ending"), len(events)))
|
||||
apb, ape = self._get_audit_period(exists)
|
||||
logger.warn("Stream %s deleted_at: %s, launched_at: %s, "
|
||||
"state: %s, APB: %s, APE: %s, #events: %s" %
|
||||
(self.stream_id, exists.get("deleted_at"),
|
||||
exists.get("launched_at"), exists.get("state"),
|
||||
apb, ape, len(block)))
|
||||
|
||||
if len(events) > 1:
|
||||
if len(block) > 1:
|
||||
logger.warn("Events for Stream: %s" % self.stream_id)
|
||||
for event in events:
|
||||
for event in block:
|
||||
logger.warn("^Event: %s - %s" %
|
||||
(event['timestamp'], event['event_type']))
|
||||
|
||||
events = []
|
||||
# We could have warnings, but a valid event list.
|
||||
if self.warnings:
|
||||
instance_id = "n/a"
|
||||
if len(events):
|
||||
instance_id = events[0].get('instance_id')
|
||||
instance_id = exists.get('instance_id', 'n/a')
|
||||
warning_event = {'event_type': 'compute.instance.exists.warnings',
|
||||
'message_id': str(uuid.uuid4()),
|
||||
'timestamp': exists.get('timestamp',
|
||||
datetime.datetime.utcnow()),
|
||||
datetime.datetime.utcnow()),
|
||||
'stream_id': int(self.stream_id),
|
||||
'instance_id': instance_id,
|
||||
'warnings': self.warnings}
|
||||
events.append(warning_event)
|
||||
|
||||
if exists:
|
||||
new_event = {'event_type': event_type,
|
||||
new_event = {'event_type': event_type,
|
||||
'message_id': str(uuid.uuid4()),
|
||||
'timestamp': exists.get('timestamp',
|
||||
datetime.datetime.utcnow()),
|
||||
'stream_id': int(self.stream_id),
|
||||
'instance_id': exists.get('instance_id'),
|
||||
'error': str(error),
|
||||
'error_code': error and error.code
|
||||
}
|
||||
events.append(new_event)
|
||||
return events
|
||||
|
||||
def handle_events(self, events, env):
|
||||
self.env = env
|
||||
self.stream_id = env['stream_id']
|
||||
self.warnings = []
|
||||
|
||||
new_events = []
|
||||
block = []
|
||||
for event in events:
|
||||
if self._is_exists(event):
|
||||
new_events.extend(self._process_block(block, event))
|
||||
block = []
|
||||
else:
|
||||
block.append(event)
|
||||
|
||||
# Final block should be empty.
|
||||
if block:
|
||||
new_event = {'event_type': "compute.instance.exists.failed",
|
||||
'message_id': str(uuid.uuid4()),
|
||||
'timestamp': exists.get('timestamp',
|
||||
datetime.datetime.utcnow()),
|
||||
'timestamp': block[0].get('timestamp',
|
||||
datetime.datetime.utcnow()),
|
||||
'stream_id': int(self.stream_id),
|
||||
'instance_id': exists.get('instance_id'),
|
||||
'error': str(error),
|
||||
'error_code': error and error.code
|
||||
'instance_id': block[0].get('instance_id'),
|
||||
'error': "Notifications, but no .exists "
|
||||
"notification found.",
|
||||
'error_code': "U0"
|
||||
}
|
||||
events.append(new_event)
|
||||
else:
|
||||
logger.debug("No .exists record")
|
||||
new_events.append(new_event)
|
||||
|
||||
events.extend(new_events)
|
||||
return events
|
||||
|
||||
def commit(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user