Merge pull request #27 from ramielrowe/usage_out_of_order

Handling Out of Order Usage Notifications
This commit is contained in:
Sandy Walsh 2013-02-13 09:43:25 -08:00
commit 8e51bf80af
6 changed files with 128 additions and 76 deletions

View File

@ -1,37 +1,71 @@
import models
def get_or_create_deployment(name):
return models.Deployment.objects.get_or_create(name=name)
def create_rawdata(**kwargs):
return models.RawData(**kwargs)
def create_lifecycle(**kwargs):
return models.Lifecycle(**kwargs)
def find_lifecycles(**kwargs):
return models.Lifecycle.objects.select_related().filter(**kwargs)
def create_timing(**kwargs):
return models.Timing(**kwargs)
def find_timings(**kwargs):
return models.Timing.objects.select_related().filter(**kwargs)
def create_request_tracker(**kwargs):
return models.RequestTracker(**kwargs)
def find_request_trackers(**kwargs):
return models.RequestTracker.objects.filter(**kwargs)
def create_instance_usage(**kwargs):
return models.InstanceUsage(**kwargs)
def get_or_create_instance_usage(**kwargs):
return models.InstanceUsage.objects.get_or_create(**kwargs)
def get_instance_usage(**kwargs):
return models.InstanceUsage.objects.get(**kwargs)
usage = None
try:
usage = models.InstanceUsage.objects.get(**kwargs)
except models.InstanceUsage.DoesNotExist:
pass
return usage
def create_instance_delete(**kwargs):
return models.InstanceDeletes(**kwargs)
def get_instance_delete(**kwargs):
delete = None
try:
delete = models.InstanceDeletes.objects.get(**kwargs)
except models.InstanceDeletes.DoesNotExist:
pass
return delete
def create_instance_exists(**kwargs):
return models.InstanceExists(**kwargs)
def save(obj):
obj.save()

View File

@ -75,18 +75,26 @@ class Lifecycle(models.Model):
class InstanceUsage(models.Model):
instance = models.CharField(max_length=50, null=True,
blank=True, db_index=True)
#launched_at = models.IntegerField(null=True, db_index=True)
launched_at = models.DecimalField(null=True, max_digits=20,
decimal_places=6)
#deleted_at = models.IntegerField(null=True, db_index=True)
deleted_at = models.DecimalField(null=True, max_digits=20,
decimal_places=6)
request_id = models.CharField(max_length=50, null=True,
blank=True, db_index=True)
instance_type_id = models.CharField(max_length=50,
null=True,
blank=True,
db_index=True)
class InstanceDeletes(models.Model):
instance = models.CharField(max_length=50, null=True,
blank=True, db_index=True)
launched_at = models.DecimalField(null=True, max_digits=20,
decimal_places=6)
deleted_at = models.DecimalField(null=True, max_digits=20,
decimal_places=6)
raw = models.ForeignKey(RawData, null=True)
class InstanceExists(models.Model):
PENDING = 'pending'
VERIFIED = 'verified'
@ -98,10 +106,8 @@ class InstanceExists(models.Model):
]
instance = models.CharField(max_length=50, null=True,
blank=True, db_index=True)
#launched_at = models.IntegerField(null=True, db_index=True)
launched_at = models.DecimalField(null=True, max_digits=20,
decimal_places=6)
#deleted_at = models.IntegerField(null=True, db_index=True)
deleted_at = models.DecimalField(null=True, max_digits=20,
decimal_places=6)
message_id = models.CharField(max_length=50, null=True,
@ -115,6 +121,7 @@ class InstanceExists(models.Model):
default=PENDING)
raw = models.ForeignKey(RawData, related_name='+', null=True)
usage = models.ForeignKey(InstanceUsage, related_name='+', null=True)
delete = models.ForeignKey(InstanceDeletes, related_name='+', null=True)
class Timing(models.Model):

View File

@ -160,10 +160,11 @@ def make_resize_revert_end_json(launched_at, instance_type_id='1',
def create_raw(deployment, when, event, instance=INSTANCE_ID_1,
request_id=REQUEST_ID_1, state='active', old_task='',
host='compute', json=''):
host='compute', service='compute', json=''):
raw_values = {
'deployment': deployment,
'host': host,
'service': service,
'state': state,
'old_task': old_task,
'when': when,

View File

@ -35,7 +35,7 @@ class ViewsLifecycleWorkflowTestCase(unittest.TestCase):
when3 = views.str_time_to_unix('2012-12-21 12:36:56.124')
self.update_raw = create_raw(self.deployment, when1,
'compute.instance.update',
host='api')
host='api', service='api')
self.start_raw = create_raw(self.deployment, when2,
'compute.instance.reboot.start')
self.end_raw = create_raw(self.deployment, when3,
@ -105,7 +105,7 @@ class ViewsLifecycleWorkflowTestCase(unittest.TestCase):
'compute.instance.update',
instance=INSTANCE_ID_2,
request_id=REQUEST_ID_2,
host='api')
host='api', service='api')
start_raw2 = create_raw(self.deployment, when2,
'compute.instance.resize.start',
instance=INSTANCE_ID_2,
@ -157,7 +157,7 @@ class ViewsLifecycleWorkflowTestCase(unittest.TestCase):
update_raw2 = create_raw(self.deployment, when1,
'compute.instance.update',
request_id=REQUEST_ID_2,
host='api')
host='api', service='api')
start_raw2 = create_raw(self.deployment, when2,
'compute.instance.resize.start',
request_id=REQUEST_ID_2)
@ -227,7 +227,7 @@ class ViewsLifecycleWorkflowTestCase(unittest.TestCase):
'compute.instance.update',
instance=INSTANCE_ID_2,
request_id=REQUEST_ID_2,
host='api')
host='api', service='api')
start_raw2 = create_raw(self.deployment, when2,
'compute.instance.resize.start',
instance=INSTANCE_ID_2,
@ -268,9 +268,9 @@ class ViewsLifecycleWorkflowTestCase(unittest.TestCase):
when2 = views.str_time_to_unix('2012-12-21 13:34:50.123')
when3 = views.str_time_to_unix('2012-12-21 13:37:50.124')
update_raw2 = create_raw(self.deployment, when1,
'compute.instance.update',
request_id=REQUEST_ID_2,
host='api')
'compute.instance.update',
request_id=REQUEST_ID_2,
host='api', service='api')
start_raw2 = create_raw(self.deployment, when2,
'compute.instance.resize.start',
request_id=REQUEST_ID_2)
@ -309,7 +309,7 @@ class ViewsLifecycleWorkflowTestCase(unittest.TestCase):
update_raw2 = create_raw(self.deployment, when1,
'compute.instance.update',
request_id=REQUEST_ID_2,
host='api')
host='api', service='api')
start_raw2 = create_raw(self.deployment, when2,
'compute.instance.resize.start',
request_id=REQUEST_ID_2)
@ -492,14 +492,6 @@ class ViewsUsageTestCase(unittest.TestCase):
def test_process_delete(self):
launched_str = '2012-12-21 06:34:50.123'
launched = views.str_time_to_unix(launched_str)
values = {
'instance': INSTANCE_ID_1,
'request_id': REQUEST_ID_1,
'instance_type_id': '1',
'launched_at': launched,
}
InstanceUsage(**values).save()
deleted_str = '2012-12-21 12:34:50.123'
deleted = views.str_time_to_unix(deleted_str)
json = test_utils.make_delete_end_json(launched_str, deleted_str)
@ -508,10 +500,13 @@ class ViewsUsageTestCase(unittest.TestCase):
views._process_delete(raw)
usages = InstanceUsage.objects.all()
self.assertEqual(len(usages), 1)
usage = usages[0]
self.assertEqual(usage.deleted_at, deleted)
delete = InstanceDeletes.objects.all()
self.assertEqual(len(delete), 1)
delete = delete[0]
self.assertEqual(delete.instance, INSTANCE_ID_1)
self.assertEqual(delete.launched_at, launched)
self.assertEqual(delete.deleted_at, deleted)
self.assertEqual(delete.raw.id, raw.id)
def test_process_exists(self):
launched_str = '2012-12-21 06:34:50.123'
@ -556,7 +551,6 @@ class ViewsUsageTestCase(unittest.TestCase):
'request_id': REQUEST_ID_1,
'instance_type_id': '1',
'launched_at': launched,
'deleted_at': deleted,
}
InstanceUsage(**values).save()
@ -621,7 +615,6 @@ class ViewsUsageWorkflowTestCase(unittest.TestCase):
usage = usages[0]
self.assertOnUsage(usage, INSTANCE_ID_1, '1', launched, REQUEST_ID_1)
@unittest.skip("can't handle late starts yet")
def test_create_workflow_start_late(self):
created_str = '2012-12-21 06:30:50.123'
created = views.str_time_to_unix(created_str)
@ -749,7 +742,6 @@ class ViewsUsageWorkflowTestCase(unittest.TestCase):
self.assertOnUsage(usage_after, INSTANCE_ID_1, '2', finish_time,
REQUEST_ID_2)
@unittest.skip("can't handle late starts yet")
def test_resize_workflow_start_late(self):
launched_str = '2012-12-21 06:34:50.123'
launched = views.str_time_to_unix(launched_str)
@ -852,7 +844,6 @@ class ViewsUsageWorkflowTestCase(unittest.TestCase):
self.assertOnUsage(usage_after_revert, INSTANCE_ID_1, '1', end_time,
REQUEST_ID_3)
@unittest.skip("can't handle late starts yet")
def test_resize_revert_workflow_start_late(self):
launched_str = '2012-12-21 06:34:50.123'
launched = views.str_time_to_unix(launched_str)

View File

@ -223,10 +223,11 @@ def _process_usage_for_new_launch(raw):
values['instance'] = payload['instance_id']
values['request_id'] = notif[1]['_context_request_id']
if raw.event == INSTANCE_EVENT['create_start']:
values['instance_type_id'] = payload['instance_type_id']
(usage, new) = STACKDB.get_or_create_instance_usage(**values)
if raw.event == INSTANCE_EVENT['create_start']:
usage.instance_type_id = payload['instance_type_id']
usage = STACKDB.create_instance_usage(**values)
STACKDB.save(usage)
@ -235,8 +236,8 @@ def _process_usage_for_updates(raw):
payload = notif[1]['payload']
instance_id = payload['instance_id']
request_id = notif[1]['_context_request_id']
usage = STACKDB.get_instance_usage(instance=instance_id,
request_id=request_id)
(usage, new) = STACKDB.get_or_create_instance_usage(instance=instance_id,
request_id=request_id)
if raw.event in [INSTANCE_EVENT['create_end'],
INSTANCE_EVENT['resize_finish_end'],
@ -255,28 +256,37 @@ def _process_delete(raw):
notif = json.loads(raw.json)
payload = notif[1]['payload']
instance_id = payload['instance_id']
launched_at = payload['launched_at']
launched_at = str_time_to_unix(launched_at)
instance = STACKDB.get_instance_usage(instance=instance_id,
launched_at=launched_at)
instance.deleted_at = str_time_to_unix(payload['deleted_at'])
STACKDB.save(instance)
launched_at = str_time_to_unix(payload['launched_at'])
deleted_at = str_time_to_unix(payload['deleted_at'])
values = {
'instance': instance_id,
'launched_at': launched_at,
'deleted_at': deleted_at,
'raw': raw
}
delete = STACKDB.create_instance_delete(**values)
STACKDB.save(delete)
def _process_exists(raw):
notif = json.loads(raw.json)
payload = notif[1]['payload']
instance_id = payload['instance_id']
launched_at = payload['launched_at']
launched_at = str_time_to_unix(launched_at)
launched_at = str_time_to_unix(payload['launched_at'])
launched_range = (launched_at, launched_at+1)
usage = STACKDB.get_instance_usage(instance=instance_id,
launched_at=launched_at)
launched_at__range=launched_range)
delete = STACKDB.get_instance_delete(instance=instance_id,
launched_at__range=launched_range)
values = {}
values['message_id'] = notif[1]['message_id']
values['instance'] = instance_id
values['launched_at'] = launched_at
values['instance_type_id'] = payload['instance_type_id']
values['usage'] = usage
if usage:
values['usage'] = usage
if delete:
values['delete'] = delete
values['raw'] = raw
deleted_at = payload.get('deleted_at')

View File

@ -410,13 +410,13 @@ class StacktackUsageParsingTestCase(unittest.TestCase):
event = 'compute.instance.create.start'
raw = utils.create_raw(self.mox, when, event=event, json_str=json_str)
usage = self.mox.CreateMockAnything()
views.STACKDB.create_instance_usage(instance=INSTANCE_ID_1,
request_id=REQUEST_ID_1,
instance_type_id = '1')\
.AndReturn(usage)
views.STACKDB.get_or_create_instance_usage(instance=INSTANCE_ID_1,
request_id=REQUEST_ID_1)\
.AndReturn((usage, True))
views.STACKDB.save(usage)
self.mox.ReplayAll()
views._process_usage_for_new_launch(raw)
self.assertEquals(usage.instance_type_id, '1')
self.mox.VerifyAll()
def test_process_usage_for_updates_create_end(self):
@ -433,9 +433,9 @@ class StacktackUsageParsingTestCase(unittest.TestCase):
usage.instance = INSTANCE_ID_1
usage.request_id = REQUEST_ID_1
usage.instance_type_id = '1'
views.STACKDB.get_instance_usage(instance=INSTANCE_ID_1,
request_id=REQUEST_ID_1)\
.AndReturn(usage)
views.STACKDB.get_or_create_instance_usage(instance=INSTANCE_ID_1,
request_id=REQUEST_ID_1)\
.AndReturn((usage, True))
views.STACKDB.save(usage)
self.mox.ReplayAll()
@ -459,9 +459,9 @@ class StacktackUsageParsingTestCase(unittest.TestCase):
usage.instance = INSTANCE_ID_1
usage.request_id = REQUEST_ID_1
usage.instance_type_id = '1'
views.STACKDB.get_instance_usage(instance=INSTANCE_ID_1,
request_id=REQUEST_ID_1)\
.AndReturn(usage)
views.STACKDB.get_or_create_instance_usage(instance=INSTANCE_ID_1,
request_id=REQUEST_ID_1)\
.AndReturn((usage, True))
views.STACKDB.save(usage)
self.mox.ReplayAll()
@ -484,9 +484,9 @@ class StacktackUsageParsingTestCase(unittest.TestCase):
usage = self.mox.CreateMockAnything()
usage.instance = INSTANCE_ID_1
usage.request_id = REQUEST_ID_1
views.STACKDB.get_instance_usage(instance=INSTANCE_ID_1,
request_id=REQUEST_ID_1)\
.AndReturn(usage)
views.STACKDB.get_or_create_instance_usage(instance=INSTANCE_ID_1,
request_id=REQUEST_ID_1)\
.AndReturn((usage, True))
views.STACKDB.save(usage)
self.mox.ReplayAll()
@ -508,23 +508,22 @@ class StacktackUsageParsingTestCase(unittest.TestCase):
event = 'compute.instance.delete.end'
raw = utils.create_raw(self.mox, delete_decimal, event=event,
json_str=json_str)
usage = self.mox.CreateMockAnything()
usage.instance = INSTANCE_ID_1
usage.request_id = REQUEST_ID_1
usage.instance_type_id = '1'
usage.launched_at = launch_decimal
views.STACKDB.get_instance_usage(instance=INSTANCE_ID_1,
launched_at=launch_decimal)\
.AndReturn(usage)
views.STACKDB.save(usage)
delete = self.mox.CreateMockAnything()
delete.instance = INSTANCE_ID_1
delete.launched_at = launch_decimal
delete.deleted_at = delete_decimal
views.STACKDB.create_instance_delete(instance=INSTANCE_ID_1,
launched_at=launch_decimal,
deleted_at=delete_decimal,
raw=raw)\
.AndReturn(delete)
views.STACKDB.save(delete)
self.mox.ReplayAll()
views._process_delete(raw)
self.assertEqual(usage.instance, INSTANCE_ID_1)
self.assertEqual(usage.request_id, REQUEST_ID_1)
self.assertEqual(usage.instance_type_id, '1')
self.assertEqual(usage.launched_at, launch_decimal)
self.assertEqual(usage.deleted_at, delete_decimal)
self.assertEqual(delete.instance, INSTANCE_ID_1)
self.assertEqual(delete.launched_at, launch_decimal)
self.assertEqual(delete.deleted_at, delete_decimal)
self.mox.VerifyAll()
def test_process_exists(self):
@ -538,9 +537,13 @@ class StacktackUsageParsingTestCase(unittest.TestCase):
raw = utils.create_raw(self.mox, current_decimal, event=event,
json_str=json_str)
usage = self.mox.CreateMockAnything()
launched_range = (launch_decimal, launch_decimal+1)
views.STACKDB.get_instance_usage(instance=INSTANCE_ID_1,
launched_at=launch_decimal)\
launched_at__range=launched_range)\
.AndReturn(usage)
views.STACKDB.get_instance_delete(instance=INSTANCE_ID_1,
launched_at__range=launched_range)\
.AndReturn(None)
exists_values = {
'message_id': MESSAGE_ID_1,
'instance': INSTANCE_ID_1,
@ -570,9 +573,14 @@ class StacktackUsageParsingTestCase(unittest.TestCase):
raw = utils.create_raw(self.mox, current_decimal, event=event,
json_str=json_str)
usage = self.mox.CreateMockAnything()
launched_range = (launch_decimal, launch_decimal+1)
views.STACKDB.get_instance_usage(instance=INSTANCE_ID_1,
launched_at=launch_decimal)\
launched_at__range=launched_range)\
.AndReturn(usage)
delete = self.mox.CreateMockAnything()
views.STACKDB.get_instance_delete(instance=INSTANCE_ID_1,
launched_at__range=launched_range)\
.AndReturn(delete)
exists_values = {
'message_id': MESSAGE_ID_1,
'instance': INSTANCE_ID_1,
@ -580,6 +588,7 @@ class StacktackUsageParsingTestCase(unittest.TestCase):
'deleted_at': deleted_decimal,
'instance_type_id': '1',
'usage': usage,
'delete': delete,
'raw': raw,
}
exists = self.mox.CreateMockAnything()