Reconciling failed verifications
This commit is contained in:
parent
9ec29dbaeb
commit
7d66759a46
@ -117,6 +117,9 @@ class InstanceDeletes(models.Model):
|
||||
decimal_places=6, db_index=True)
|
||||
raw = models.ForeignKey(RawData, null=True)
|
||||
|
||||
def deployment(self):
|
||||
return self.raw.deployment
|
||||
|
||||
|
||||
class InstanceReconcile(models.Model):
|
||||
row_created = models.DateTimeField(auto_now_add=True)
|
||||
@ -181,6 +184,9 @@ class InstanceExists(models.Model):
|
||||
os_version = models.TextField(null=True, blank=True)
|
||||
rax_options = models.TextField(null=True, blank=True)
|
||||
|
||||
def deployment(self):
|
||||
return self.raw.deployment
|
||||
|
||||
|
||||
class Timing(models.Model):
|
||||
"""Each Timing record corresponds to a .start/.end event pair
|
||||
|
@ -23,6 +23,7 @@ import json
|
||||
from stacktach import models
|
||||
from stacktach.reconciler import exceptions
|
||||
from stacktach.reconciler import nova
|
||||
from stacktach import datetime_to_decimal as dt
|
||||
|
||||
DEFAULT_CLIENT = nova.JSONBridgeClient
|
||||
|
||||
@ -61,8 +62,8 @@ class Reconciler(object):
|
||||
with open(config['region_mapping_loc']) as f:
|
||||
return json.load(f)
|
||||
|
||||
def _region_for_launch(self, launch):
|
||||
deployment = launch.deployment()
|
||||
def _region_for_usage(self, usage):
|
||||
deployment = usage.deployment()
|
||||
if deployment:
|
||||
deployment_name = str(deployment.name)
|
||||
if deployment_name in self.region_mapping:
|
||||
@ -72,14 +73,14 @@ class Reconciler(object):
|
||||
else:
|
||||
return False
|
||||
|
||||
def _reconcile_instance(self, launch, src,
|
||||
def _reconcile_instance(self, usage, src,
|
||||
launched_at=None, deleted_at=None,
|
||||
instance_type_id=None):
|
||||
values = {
|
||||
'instance': launch.instance,
|
||||
'launched_at': (launched_at or launch.launched_at),
|
||||
'instance': usage.instance,
|
||||
'launched_at': (launched_at or usage.launched_at),
|
||||
'deleted_at': deleted_at,
|
||||
'instance_type_id': (instance_type_id or launch.instance_type_id),
|
||||
'instance_type_id': (instance_type_id or usage.instance_type_id),
|
||||
'source': 'reconciler:%s' % src,
|
||||
}
|
||||
models.InstanceReconcile(**values).save()
|
||||
@ -88,7 +89,7 @@ class Reconciler(object):
|
||||
period_beginning):
|
||||
reconciled = False
|
||||
launch = models.InstanceUsage.objects.get(id=launched_id)
|
||||
region = self._region_for_launch(launch)
|
||||
region = self._region_for_usage(launch)
|
||||
try:
|
||||
instance = self.client.get_instance(region, launch.instance)
|
||||
if instance['deleted'] and instance['deleted_at'] is not None:
|
||||
@ -105,3 +106,26 @@ class Reconciler(object):
|
||||
reconciled = False
|
||||
|
||||
return reconciled
|
||||
|
||||
def failed_validation(self, exists):
|
||||
reconcilable = False
|
||||
region = self._region_for_usage(exists)
|
||||
deleted_at = None
|
||||
try:
|
||||
instance = self.client.get_instance(region, exists.instance)
|
||||
if (instance['launched_at'] == exists.launched_at and
|
||||
instance['instance_type_id'] == exists.instance_type_id):
|
||||
if instance['deleted'] and exists.deleted_at is not None:
|
||||
if instance['deleted_at'] == exists.deleted_at:
|
||||
deleted_at = exists.deleted_at
|
||||
reconcilable = True
|
||||
elif not instance['deleted'] and exists.deleted_at is None:
|
||||
reconcilable = True
|
||||
except exceptions.NotFound:
|
||||
reconcilable = False
|
||||
|
||||
if reconcilable:
|
||||
self._reconcile_instance(exists, self.client.src_str,
|
||||
deleted_at=deleted_at)
|
||||
|
||||
return reconcilable
|
||||
|
@ -116,7 +116,7 @@ class ReconcilerTestCase(unittest.TestCase):
|
||||
deployment.name = 'RegionOne.prod.cell1'
|
||||
launch.deployment().AndReturn(deployment)
|
||||
self.mox.ReplayAll()
|
||||
region = self.reconciler._region_for_launch(launch)
|
||||
region = self.reconciler._region_for_usage(launch)
|
||||
self.assertEqual('RegionOne', region)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
@ -126,7 +126,7 @@ class ReconcilerTestCase(unittest.TestCase):
|
||||
deployment.name = 'RegionOne.prod.cell2'
|
||||
launch.deployment().AndReturn(deployment)
|
||||
self.mox.ReplayAll()
|
||||
region = self.reconciler._region_for_launch(launch)
|
||||
region = self.reconciler._region_for_usage(launch)
|
||||
self.assertFalse(region)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
@ -134,7 +134,7 @@ class ReconcilerTestCase(unittest.TestCase):
|
||||
launch = self.mox.CreateMockAnything()
|
||||
launch.deployment()
|
||||
self.mox.ReplayAll()
|
||||
region = self.reconciler._region_for_launch(launch)
|
||||
region = self.reconciler._region_for_usage(launch)
|
||||
self.assertFalse(region)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
@ -188,6 +188,120 @@ class ReconcilerTestCase(unittest.TestCase):
|
||||
self.assertFalse(result)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_failed_validation(self):
|
||||
beginning_d = utils.decimal_utc()
|
||||
exists = self.mox.CreateMockAnything()
|
||||
exists.instance = INSTANCE_ID_1
|
||||
launched_at = beginning_d - (60*60)
|
||||
exists.launched_at = launched_at
|
||||
exists.instance_type_id = 1
|
||||
exists.deleted_at = None
|
||||
deployment = self.mox.CreateMockAnything()
|
||||
exists.deployment().AndReturn(deployment)
|
||||
deployment.name = 'RegionOne.prod.cell1'
|
||||
rec_inst = self._fake_reconciler_instance(launched_at=launched_at)
|
||||
self.client.get_instance('RegionOne', INSTANCE_ID_1).AndReturn(rec_inst)
|
||||
reconcile_vals = {
|
||||
'instance': exists.instance,
|
||||
'launched_at': exists.launched_at,
|
||||
'deleted_at': exists.deleted_at,
|
||||
'instance_type_id': exists.instance_type_id,
|
||||
'source': 'reconciler:mocked_client'
|
||||
}
|
||||
result = self.mox.CreateMockAnything()
|
||||
models.InstanceReconcile(**reconcile_vals).AndReturn(result)
|
||||
result.save()
|
||||
self.mox.ReplayAll()
|
||||
result = self.reconciler.failed_validation(exists)
|
||||
self.assertTrue(result)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_failed_validation_deleted(self):
|
||||
beginning_d = utils.decimal_utc()
|
||||
exists = self.mox.CreateMockAnything()
|
||||
exists.instance = INSTANCE_ID_1
|
||||
launched_at = beginning_d - (60*60)
|
||||
exists.launched_at = launched_at
|
||||
exists.instance_type_id = 1
|
||||
exists.deleted_at = beginning_d
|
||||
deployment = self.mox.CreateMockAnything()
|
||||
exists.deployment().AndReturn(deployment)
|
||||
deployment.name = 'RegionOne.prod.cell1'
|
||||
rec_inst = self._fake_reconciler_instance(launched_at=launched_at,
|
||||
deleted=True,
|
||||
deleted_at=beginning_d)
|
||||
self.client.get_instance('RegionOne', INSTANCE_ID_1).AndReturn(rec_inst)
|
||||
reconcile_vals = {
|
||||
'instance': exists.instance,
|
||||
'launched_at': exists.launched_at,
|
||||
'deleted_at': exists.deleted_at,
|
||||
'instance_type_id': exists.instance_type_id,
|
||||
'source': 'reconciler:mocked_client'
|
||||
}
|
||||
result = self.mox.CreateMockAnything()
|
||||
models.InstanceReconcile(**reconcile_vals).AndReturn(result)
|
||||
result.save()
|
||||
self.mox.ReplayAll()
|
||||
result = self.reconciler.failed_validation(exists)
|
||||
self.assertTrue(result)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_failed_validation_deleted_not_matching(self):
|
||||
beginning_d = utils.decimal_utc()
|
||||
exists = self.mox.CreateMockAnything()
|
||||
exists.instance = INSTANCE_ID_1
|
||||
launched_at = beginning_d - (60*60)
|
||||
exists.launched_at = launched_at
|
||||
exists.instance_type_id = 1
|
||||
exists.deleted_at = beginning_d
|
||||
deployment = self.mox.CreateMockAnything()
|
||||
exists.deployment().AndReturn(deployment)
|
||||
deployment.name = 'RegionOne.prod.cell1'
|
||||
rec_inst = self._fake_reconciler_instance(launched_at=launched_at,
|
||||
deleted=True,
|
||||
deleted_at=beginning_d+1)
|
||||
self.client.get_instance('RegionOne', INSTANCE_ID_1).AndReturn(rec_inst)
|
||||
self.mox.ReplayAll()
|
||||
result = self.reconciler.failed_validation(exists)
|
||||
self.assertFalse(result)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_failed_validation_deleted_not_deleted_from_client(self):
|
||||
beginning_d = utils.decimal_utc()
|
||||
exists = self.mox.CreateMockAnything()
|
||||
exists.instance = INSTANCE_ID_1
|
||||
launched_at = beginning_d - (60*60)
|
||||
exists.launched_at = launched_at
|
||||
exists.instance_type_id = 1
|
||||
exists.deleted_at = beginning_d
|
||||
deployment = self.mox.CreateMockAnything()
|
||||
exists.deployment().AndReturn(deployment)
|
||||
deployment.name = 'RegionOne.prod.cell1'
|
||||
rec_inst = self._fake_reconciler_instance(launched_at=launched_at)
|
||||
self.client.get_instance('RegionOne', INSTANCE_ID_1).AndReturn(rec_inst)
|
||||
self.mox.ReplayAll()
|
||||
result = self.reconciler.failed_validation(exists)
|
||||
self.assertFalse(result)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_failed_validation_not_found(self):
|
||||
beginning_d = utils.decimal_utc()
|
||||
exists = self.mox.CreateMockAnything()
|
||||
exists.instance = INSTANCE_ID_1
|
||||
launched_at = beginning_d - (60*60)
|
||||
exists.launched_at = launched_at
|
||||
exists.instance_type_id = 1
|
||||
exists.deleted_at = None
|
||||
deployment = self.mox.CreateMockAnything()
|
||||
exists.deployment().AndReturn(deployment)
|
||||
deployment.name = 'RegionOne.prod.cell1'
|
||||
ex = exceptions.NotFound()
|
||||
self.client.get_instance('RegionOne', INSTANCE_ID_1).AndRaise(ex)
|
||||
self.mox.ReplayAll()
|
||||
result = self.reconciler.failed_validation(exists)
|
||||
self.assertFalse(result)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
|
||||
json_bridge_config = {
|
||||
'url': 'http://json_bridge.example.com/query/',
|
||||
|
@ -21,9 +21,9 @@
|
||||
import datetime
|
||||
import decimal
|
||||
import json
|
||||
import time
|
||||
import unittest
|
||||
import uuid
|
||||
import multiprocessing
|
||||
|
||||
import kombu.common
|
||||
import kombu.entity
|
||||
@ -90,7 +90,10 @@ class VerifierTestCase(unittest.TestCase):
|
||||
"enable_notifications": False,
|
||||
}
|
||||
self.pool = self.mox.CreateMockAnything()
|
||||
self.verifier = dbverifier.Verifier(self.config, pool=self.pool)
|
||||
self.reconciler = self.mox.CreateMockAnything()
|
||||
self.verifier = dbverifier.Verifier(self.config,
|
||||
pool=self.pool,
|
||||
rec=self.reconciler)
|
||||
|
||||
self.config_notif = {
|
||||
"tick_time": 30,
|
||||
@ -109,8 +112,10 @@ class VerifierTestCase(unittest.TestCase):
|
||||
}
|
||||
}
|
||||
self.pool_notif = self.mox.CreateMockAnything()
|
||||
self.reconciler_notif = self.mox.CreateMockAnything()
|
||||
self.verifier_notif = dbverifier.Verifier(self.config_notif,
|
||||
pool=self.pool_notif)
|
||||
pool=self.pool_notif,
|
||||
rec=self.reconciler)
|
||||
|
||||
def tearDown(self):
|
||||
self.mox.UnsetStubs()
|
||||
@ -638,9 +643,103 @@ class VerifierTestCase(unittest.TestCase):
|
||||
self.assertEqual(exist2.status, 'verifying')
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_clean_results_full(self):
|
||||
self.verifier.reconcile = True
|
||||
result_not_ready = self.mox.CreateMockAnything()
|
||||
result_not_ready.ready().AndReturn(False)
|
||||
result_unsuccessful = self.mox.CreateMockAnything()
|
||||
result_unsuccessful.ready().AndReturn(True)
|
||||
result_unsuccessful.successful().AndReturn(False)
|
||||
result_successful = self.mox.CreateMockAnything()
|
||||
result_successful.ready().AndReturn(True)
|
||||
result_successful.successful().AndReturn(True)
|
||||
result_successful.get().AndReturn((True, None))
|
||||
result_failed_verification = self.mox.CreateMockAnything()
|
||||
result_failed_verification.ready().AndReturn(True)
|
||||
result_failed_verification.successful().AndReturn(True)
|
||||
failed_exists = self.mox.CreateMockAnything()
|
||||
result_failed_verification.get().AndReturn((False, failed_exists))
|
||||
self.verifier.results = [result_not_ready,
|
||||
result_unsuccessful,
|
||||
result_successful,
|
||||
result_failed_verification]
|
||||
self.mox.ReplayAll()
|
||||
(result_count, success_count, errored) = self.verifier.clean_results()
|
||||
self.assertEqual(result_count, 1)
|
||||
self.assertEqual(success_count, 2)
|
||||
self.assertEqual(errored, 1)
|
||||
self.assertEqual(len(self.verifier.results), 1)
|
||||
self.assertEqual(self.verifier.results[0], result_not_ready)
|
||||
self.assertEqual(len(self.verifier.failed), 1)
|
||||
self.assertEqual(self.verifier.failed[0], result_failed_verification)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_clean_results_pending(self):
|
||||
self.verifier.reconcile = True
|
||||
result_not_ready = self.mox.CreateMockAnything()
|
||||
result_not_ready.ready().AndReturn(False)
|
||||
self.verifier.results = [result_not_ready]
|
||||
self.mox.ReplayAll()
|
||||
(result_count, success_count, errored) = self.verifier.clean_results()
|
||||
self.assertEqual(result_count, 1)
|
||||
self.assertEqual(success_count, 0)
|
||||
self.assertEqual(errored, 0)
|
||||
self.assertEqual(len(self.verifier.results), 1)
|
||||
self.assertEqual(self.verifier.results[0], result_not_ready)
|
||||
self.assertEqual(len(self.verifier.failed), 0)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_clean_results_successful(self):
|
||||
self.verifier.reconcile = True
|
||||
result_successful = self.mox.CreateMockAnything()
|
||||
result_successful.ready().AndReturn(True)
|
||||
result_successful.successful().AndReturn(True)
|
||||
result_successful.get().AndReturn((True, None))
|
||||
self.verifier.results = [result_successful]
|
||||
self.mox.ReplayAll()
|
||||
(result_count, success_count, errored) = self.verifier.clean_results()
|
||||
self.assertEqual(result_count, 0)
|
||||
self.assertEqual(success_count, 1)
|
||||
self.assertEqual(errored, 0)
|
||||
self.assertEqual(len(self.verifier.results), 0)
|
||||
self.assertEqual(len(self.verifier.failed), 0)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_clean_results_unsuccessful(self):
|
||||
self.verifier.reconcile = True
|
||||
result_unsuccessful = self.mox.CreateMockAnything()
|
||||
result_unsuccessful.ready().AndReturn(True)
|
||||
result_unsuccessful.successful().AndReturn(False)
|
||||
self.verifier.results = [result_unsuccessful]
|
||||
self.mox.ReplayAll()
|
||||
(result_count, success_count, errored) = self.verifier.clean_results()
|
||||
self.assertEqual(result_count, 0)
|
||||
self.assertEqual(success_count, 0)
|
||||
self.assertEqual(errored, 1)
|
||||
self.assertEqual(len(self.verifier.results), 0)
|
||||
self.assertEqual(len(self.verifier.failed), 0)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_clean_results_fail_verification(self):
|
||||
self.verifier.reconcile = True
|
||||
result_failed_verification = self.mox.CreateMockAnything()
|
||||
result_failed_verification.ready().AndReturn(True)
|
||||
result_failed_verification.successful().AndReturn(True)
|
||||
failed_exists = self.mox.CreateMockAnything()
|
||||
result_failed_verification.get().AndReturn((False, failed_exists))
|
||||
self.verifier.results = [result_failed_verification]
|
||||
self.mox.ReplayAll()
|
||||
(result_count, success_count, errored) = self.verifier.clean_results()
|
||||
self.assertEqual(result_count, 0)
|
||||
self.assertEqual(success_count, 1)
|
||||
self.assertEqual(errored, 0)
|
||||
self.assertEqual(len(self.verifier.results), 0)
|
||||
self.assertEqual(len(self.verifier.failed), 1)
|
||||
self.assertEqual(self.verifier.failed[0], failed_exists)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_verify_for_range_with_callback(self):
|
||||
callback = self.mox.CreateMockAnything()
|
||||
pool = self.mox.CreateMockAnything()
|
||||
when_max = datetime.datetime.utcnow()
|
||||
results = self.mox.CreateMockAnything()
|
||||
models.InstanceExists.objects.select_related().AndReturn(results)
|
||||
@ -669,6 +768,18 @@ class VerifierTestCase(unittest.TestCase):
|
||||
self.assertEqual(exist2.status, 'verifying')
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_reconcile_failed(self):
|
||||
self.verifier.reconcile = True
|
||||
exists1 = self.mox.CreateMockAnything()
|
||||
exists2 = self.mox.CreateMockAnything()
|
||||
self.verifier.failed = [exists1, exists2]
|
||||
self.reconciler.failed_validation(exists1)
|
||||
self.reconciler.failed_validation(exists2)
|
||||
self.mox.ReplayAll()
|
||||
self.verifier.reconcile_failed()
|
||||
self.assertEqual(len(self.verifier.failed), 0)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_send_verified_notification_default_routing_key(self):
|
||||
connection = self.mox.CreateMockAnything()
|
||||
exchange = self.mox.CreateMockAnything()
|
||||
@ -750,7 +861,7 @@ class VerifierTestCase(unittest.TestCase):
|
||||
dbverifier._create_connection(self.config_notif).AndReturn(conn)
|
||||
conn.__enter__().AndReturn(conn)
|
||||
self.mox.StubOutWithMock(self.verifier_notif, '_run')
|
||||
self.verifier_notif._run(callback=mox.IgnoreArg())
|
||||
self.verifier_notif._run(callback=mox.Not(mox.Is(None)))
|
||||
conn.__exit__(None, None, None)
|
||||
self.mox.ReplayAll()
|
||||
self.verifier_notif.run()
|
||||
@ -766,7 +877,7 @@ class VerifierTestCase(unittest.TestCase):
|
||||
dbverifier._create_connection(self.config_notif).AndReturn(conn)
|
||||
conn.__enter__().AndReturn(conn)
|
||||
self.mox.StubOutWithMock(self.verifier_notif, '_run')
|
||||
self.verifier_notif._run(callback=mox.IgnoreArg())
|
||||
self.verifier_notif._run(callback=mox.Not(mox.Is(None)))
|
||||
conn.__exit__(None, None, None)
|
||||
self.mox.ReplayAll()
|
||||
self.verifier_notif.run()
|
||||
@ -789,7 +900,7 @@ class VerifierTestCase(unittest.TestCase):
|
||||
dbverifier._create_connection(self.config_notif).AndReturn(conn)
|
||||
conn.__enter__().AndReturn(conn)
|
||||
self.mox.StubOutWithMock(self.verifier_notif, '_run_once')
|
||||
self.verifier_notif._run_once(callback=mox.IgnoreArg())
|
||||
self.verifier_notif._run_once(callback=mox.Not(mox.Is(None)))
|
||||
conn.__exit__(None, None, None)
|
||||
self.mox.ReplayAll()
|
||||
self.verifier_notif.run_once()
|
||||
@ -801,3 +912,123 @@ class VerifierTestCase(unittest.TestCase):
|
||||
self.mox.ReplayAll()
|
||||
self.verifier.run_once()
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_run_full_no_notifications(self):
|
||||
self.verifier.reconcile = True
|
||||
self.mox.StubOutWithMock(self.verifier, '_keep_running')
|
||||
self.verifier._keep_running().AndReturn(True)
|
||||
start = datetime.datetime.utcnow()
|
||||
self.mox.StubOutWithMock(self.verifier, '_utcnow')
|
||||
self.verifier._utcnow().AndReturn(start)
|
||||
settle_time = self.config['settle_time']
|
||||
settle_units = self.config['settle_units']
|
||||
settle_offset = {settle_units: settle_time}
|
||||
ending_max = start - datetime.timedelta(**settle_offset)
|
||||
self.mox.StubOutWithMock(self.verifier, 'verify_for_range')
|
||||
self.verifier.verify_for_range(ending_max, callback=None)
|
||||
self.mox.StubOutWithMock(self.verifier, 'reconcile_failed')
|
||||
result1 = self.mox.CreateMockAnything()
|
||||
result2 = self.mox.CreateMockAnything()
|
||||
self.verifier.results = [result1, result2]
|
||||
result1.ready().AndReturn(True)
|
||||
result1.successful().AndReturn(True)
|
||||
result1.get().AndReturn((True, None))
|
||||
result2.ready().AndReturn(True)
|
||||
result2.successful().AndReturn(True)
|
||||
result2.get().AndReturn((True, None))
|
||||
self.verifier.reconcile_failed()
|
||||
self.mox.StubOutWithMock(time, 'sleep', use_mock_anything=True)
|
||||
time.sleep(self.config['tick_time'])
|
||||
self.verifier._keep_running().AndReturn(False)
|
||||
self.mox.ReplayAll()
|
||||
self.verifier.run()
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_run_full(self):
|
||||
self.verifier_notif.reconcile = True
|
||||
self.mox.StubOutWithMock(self.verifier_notif, '_keep_running')
|
||||
self.verifier_notif._keep_running().AndReturn(True)
|
||||
start = datetime.datetime.utcnow()
|
||||
self.mox.StubOutWithMock(self.verifier_notif, '_utcnow')
|
||||
self.verifier_notif._utcnow().AndReturn(start)
|
||||
settle_time = self.config['settle_time']
|
||||
settle_units = self.config['settle_units']
|
||||
settle_offset = {settle_units: settle_time}
|
||||
ending_max = start - datetime.timedelta(**settle_offset)
|
||||
self.mox.StubOutWithMock(self.verifier_notif, 'verify_for_range')
|
||||
self.verifier_notif.verify_for_range(ending_max,
|
||||
callback=mox.Not(mox.Is(None)))
|
||||
self.mox.StubOutWithMock(self.verifier_notif, 'reconcile_failed')
|
||||
result1 = self.mox.CreateMockAnything()
|
||||
result2 = self.mox.CreateMockAnything()
|
||||
self.verifier_notif.results = [result1, result2]
|
||||
result1.ready().AndReturn(True)
|
||||
result1.successful().AndReturn(True)
|
||||
result1.get().AndReturn((True, None))
|
||||
result2.ready().AndReturn(True)
|
||||
result2.successful().AndReturn(True)
|
||||
result2.get().AndReturn((True, None))
|
||||
self.verifier_notif.reconcile_failed()
|
||||
self.mox.StubOutWithMock(time, 'sleep', use_mock_anything=True)
|
||||
time.sleep(self.config['tick_time'])
|
||||
self.verifier_notif._keep_running().AndReturn(False)
|
||||
self.mox.ReplayAll()
|
||||
self.verifier_notif.run()
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_run_once_full_no_notifications(self):
|
||||
self.verifier.reconcile = True
|
||||
start = datetime.datetime.utcnow()
|
||||
self.mox.StubOutWithMock(self.verifier, '_utcnow')
|
||||
self.verifier._utcnow().AndReturn(start)
|
||||
settle_time = self.config['settle_time']
|
||||
settle_units = self.config['settle_units']
|
||||
settle_offset = {settle_units: settle_time}
|
||||
ending_max = start - datetime.timedelta(**settle_offset)
|
||||
self.mox.StubOutWithMock(self.verifier, 'verify_for_range')
|
||||
self.verifier.verify_for_range(ending_max, callback=None)
|
||||
result1 = self.mox.CreateMockAnything()
|
||||
result2 = self.mox.CreateMockAnything()
|
||||
self.verifier.results = [result1, result2]
|
||||
result1.ready().AndReturn(True)
|
||||
result1.successful().AndReturn(True)
|
||||
result1.get().AndReturn((True, None))
|
||||
result2.ready().AndReturn(True)
|
||||
result2.successful().AndReturn(True)
|
||||
result2.get().AndReturn((True, None))
|
||||
self.mox.StubOutWithMock(self.verifier, 'reconcile_failed')
|
||||
self.verifier.reconcile_failed()
|
||||
self.mox.StubOutWithMock(time, 'sleep', use_mock_anything=True)
|
||||
time.sleep(self.config['tick_time'])
|
||||
self.mox.ReplayAll()
|
||||
self.verifier.run_once()
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_run_once_full(self):
|
||||
self.verifier_notif.reconcile = True
|
||||
start = datetime.datetime.utcnow()
|
||||
self.mox.StubOutWithMock(self.verifier_notif, '_utcnow')
|
||||
self.verifier_notif._utcnow().AndReturn(start)
|
||||
settle_time = self.config['settle_time']
|
||||
settle_units = self.config['settle_units']
|
||||
settle_offset = {settle_units: settle_time}
|
||||
ending_max = start - datetime.timedelta(**settle_offset)
|
||||
self.mox.StubOutWithMock(self.verifier_notif, 'verify_for_range')
|
||||
self.verifier_notif.verify_for_range(ending_max,
|
||||
callback=mox.Not(mox.Is(None)))
|
||||
result1 = self.mox.CreateMockAnything()
|
||||
result2 = self.mox.CreateMockAnything()
|
||||
self.verifier_notif.results = [result1, result2]
|
||||
result1.ready().AndReturn(True)
|
||||
result1.successful().AndReturn(True)
|
||||
result1.get().AndReturn((True, None))
|
||||
result2.ready().AndReturn(True)
|
||||
result2.successful().AndReturn(True)
|
||||
result2.get().AndReturn((True, None))
|
||||
self.mox.StubOutWithMock(self.verifier_notif, 'reconcile_failed')
|
||||
self.verifier_notif.reconcile_failed()
|
||||
self.mox.StubOutWithMock(time, 'sleep', use_mock_anything=True)
|
||||
time.sleep(self.config['tick_time'])
|
||||
self.mox.ReplayAll()
|
||||
self.verifier_notif.run_once()
|
||||
self.mox.VerifyAll()
|
||||
|
@ -23,7 +23,7 @@ import datetime
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from time import sleep
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from django.db import transaction
|
||||
@ -44,6 +44,7 @@ LOG = stacklog.get_logger()
|
||||
|
||||
from stacktach import models
|
||||
from stacktach import datetime_to_decimal as dt
|
||||
from stacktach import reconciler
|
||||
from verifier import AmbiguousResults
|
||||
from verifier import FieldMismatch
|
||||
from verifier import NotFound
|
||||
@ -341,10 +342,25 @@ def _create_connection(config):
|
||||
|
||||
|
||||
class Verifier(object):
|
||||
def __init__(self, config, pool=None):
|
||||
|
||||
def __init__(self, config, pool=None, rec=None):
|
||||
self.config = config
|
||||
self.pool = pool or multiprocessing.Pool(self.config['pool_size'])
|
||||
self.reconcile = self.config.get('reconcile', False)
|
||||
self.reconciler = self._load_reconciler(config, rec=rec)
|
||||
self.results = []
|
||||
self.failed = []
|
||||
|
||||
def _load_reconciler(self, config, rec=None):
|
||||
if rec:
|
||||
return rec
|
||||
|
||||
if self.reconcile:
|
||||
config_loc = config.get('reconciler_config',
|
||||
'/etc/stacktach/reconciler_config.json')
|
||||
with open(config_loc, 'r') as rec_config_file:
|
||||
rec_config = json.load(rec_config_file)
|
||||
return reconciler.Reconciler(rec_config)
|
||||
|
||||
def clean_results(self):
|
||||
pending = []
|
||||
@ -355,6 +371,9 @@ class Verifier(object):
|
||||
if result.ready():
|
||||
finished += 1
|
||||
if result.successful():
|
||||
(verified, exists) = result.get()
|
||||
if self.reconcile and not verified:
|
||||
self.failed.append(exists)
|
||||
successful += 1
|
||||
else:
|
||||
pending.append(result)
|
||||
@ -386,22 +405,34 @@ class Verifier(object):
|
||||
next_update = datetime.datetime.utcnow() + update_interval
|
||||
return count
|
||||
|
||||
def reconcile_failed(self):
|
||||
for failed_exist in self.failed:
|
||||
self.reconciler.failed_validation(failed_exist)
|
||||
self.failed = []
|
||||
|
||||
def _keep_running(self):
|
||||
return True
|
||||
|
||||
def _utcnow(self):
|
||||
return datetime.datetime.utcnow()
|
||||
|
||||
def _run(self, callback=None):
|
||||
tick_time = self.config['tick_time']
|
||||
settle_units = self.config['settle_units']
|
||||
settle_time = self.config['settle_time']
|
||||
while True:
|
||||
while self._keep_running():
|
||||
with transaction.commit_on_success():
|
||||
now = datetime.datetime.utcnow()
|
||||
now = self._utcnow()
|
||||
kwargs = {settle_units: settle_time}
|
||||
ending_max = now - datetime.timedelta(**kwargs)
|
||||
new = self.verify_for_range(ending_max,
|
||||
callback=callback)
|
||||
|
||||
values = ((new,) + self.clean_results())
|
||||
if self.reconcile:
|
||||
self.reconcile_failed()
|
||||
msg = "N: %s, P: %s, S: %s, E: %s" % values
|
||||
LOG.info(msg)
|
||||
sleep(tick_time)
|
||||
time.sleep(tick_time)
|
||||
|
||||
def run(self):
|
||||
if self.config['enable_notifications']:
|
||||
@ -427,7 +458,7 @@ class Verifier(object):
|
||||
tick_time = self.config['tick_time']
|
||||
settle_units = self.config['settle_units']
|
||||
settle_time = self.config['settle_time']
|
||||
now = datetime.datetime.utcnow()
|
||||
now = self._utcnow()
|
||||
kwargs = {settle_units: settle_time}
|
||||
ending_max = now - datetime.timedelta(**kwargs)
|
||||
new = self.verify_for_range(ending_max, callback=callback)
|
||||
@ -435,7 +466,9 @@ class Verifier(object):
|
||||
LOG.info("Verifying %s exist events" % new)
|
||||
while len(self.results) > 0:
|
||||
LOG.info("P: %s, F: %s, E: %s" % self.clean_results())
|
||||
sleep(tick_time)
|
||||
if self.reconcile:
|
||||
self.reconcile_failed()
|
||||
time.sleep(tick_time)
|
||||
|
||||
def run_once(self):
|
||||
if self.config['enable_notifications']:
|
||||
|
Loading…
x
Reference in New Issue
Block a user