From 7d66759a460ffd2363d1457352523e60639b6c74 Mon Sep 17 00:00:00 2001 From: Andrew Melton Date: Wed, 3 Jul 2013 16:16:16 -0400 Subject: [PATCH] Reconciling failed verifications --- stacktach/models.py | 6 + stacktach/reconciler/__init__.py | 38 ++++- tests/unit/test_reconciler.py | 120 ++++++++++++++- tests/unit/test_verifier_db.py | 245 ++++++++++++++++++++++++++++++- verifier/dbverifier.py | 49 ++++++- 5 files changed, 433 insertions(+), 25 deletions(-) diff --git a/stacktach/models.py b/stacktach/models.py index 44c5b75..f3db5a3 100644 --- a/stacktach/models.py +++ b/stacktach/models.py @@ -117,6 +117,9 @@ class InstanceDeletes(models.Model): decimal_places=6, db_index=True) raw = models.ForeignKey(RawData, null=True) + def deployment(self): + return self.raw.deployment + class InstanceReconcile(models.Model): row_created = models.DateTimeField(auto_now_add=True) @@ -181,6 +184,9 @@ class InstanceExists(models.Model): os_version = models.TextField(null=True, blank=True) rax_options = models.TextField(null=True, blank=True) + def deployment(self): + return self.raw.deployment + class Timing(models.Model): """Each Timing record corresponds to a .start/.end event pair diff --git a/stacktach/reconciler/__init__.py b/stacktach/reconciler/__init__.py index 68b7138..3e5dcaf 100644 --- a/stacktach/reconciler/__init__.py +++ b/stacktach/reconciler/__init__.py @@ -23,6 +23,7 @@ import json from stacktach import models from stacktach.reconciler import exceptions from stacktach.reconciler import nova +from stacktach import datetime_to_decimal as dt DEFAULT_CLIENT = nova.JSONBridgeClient @@ -61,8 +62,8 @@ class Reconciler(object): with open(config['region_mapping_loc']) as f: return json.load(f) - def _region_for_launch(self, launch): - deployment = launch.deployment() + def _region_for_usage(self, usage): + deployment = usage.deployment() if deployment: deployment_name = str(deployment.name) if deployment_name in self.region_mapping: @@ -72,14 +73,14 @@ class Reconciler(object): else: return False - def _reconcile_instance(self, launch, src, + def _reconcile_instance(self, usage, src, launched_at=None, deleted_at=None, instance_type_id=None): values = { - 'instance': launch.instance, - 'launched_at': (launched_at or launch.launched_at), + 'instance': usage.instance, + 'launched_at': (launched_at or usage.launched_at), 'deleted_at': deleted_at, - 'instance_type_id': (instance_type_id or launch.instance_type_id), + 'instance_type_id': (instance_type_id or usage.instance_type_id), 'source': 'reconciler:%s' % src, } models.InstanceReconcile(**values).save() @@ -88,7 +89,7 @@ class Reconciler(object): period_beginning): reconciled = False launch = models.InstanceUsage.objects.get(id=launched_id) - region = self._region_for_launch(launch) + region = self._region_for_usage(launch) try: instance = self.client.get_instance(region, launch.instance) if instance['deleted'] and instance['deleted_at'] is not None: @@ -105,3 +106,26 @@ class Reconciler(object): reconciled = False return reconciled + + def failed_validation(self, exists): + reconcilable = False + region = self._region_for_usage(exists) + deleted_at = None + try: + instance = self.client.get_instance(region, exists.instance) + if (instance['launched_at'] == exists.launched_at and + instance['instance_type_id'] == exists.instance_type_id): + if instance['deleted'] and exists.deleted_at is not None: + if instance['deleted_at'] == exists.deleted_at: + deleted_at = exists.deleted_at + reconcilable = True + elif not instance['deleted'] and exists.deleted_at is None: + reconcilable = True + except exceptions.NotFound: + reconcilable = False + + if reconcilable: + self._reconcile_instance(exists, self.client.src_str, + deleted_at=deleted_at) + + return reconcilable diff --git a/tests/unit/test_reconciler.py b/tests/unit/test_reconciler.py index 8ec98fa..e854a26 100644 --- a/tests/unit/test_reconciler.py +++ b/tests/unit/test_reconciler.py @@ -116,7 +116,7 @@ class ReconcilerTestCase(unittest.TestCase): deployment.name = 'RegionOne.prod.cell1' launch.deployment().AndReturn(deployment) self.mox.ReplayAll() - region = self.reconciler._region_for_launch(launch) + region = self.reconciler._region_for_usage(launch) self.assertEqual('RegionOne', region) self.mox.VerifyAll() @@ -126,7 +126,7 @@ class ReconcilerTestCase(unittest.TestCase): deployment.name = 'RegionOne.prod.cell2' launch.deployment().AndReturn(deployment) self.mox.ReplayAll() - region = self.reconciler._region_for_launch(launch) + region = self.reconciler._region_for_usage(launch) self.assertFalse(region) self.mox.VerifyAll() @@ -134,7 +134,7 @@ class ReconcilerTestCase(unittest.TestCase): launch = self.mox.CreateMockAnything() launch.deployment() self.mox.ReplayAll() - region = self.reconciler._region_for_launch(launch) + region = self.reconciler._region_for_usage(launch) self.assertFalse(region) self.mox.VerifyAll() @@ -188,6 +188,120 @@ class ReconcilerTestCase(unittest.TestCase): self.assertFalse(result) self.mox.VerifyAll() + def test_failed_validation(self): + beginning_d = utils.decimal_utc() + exists = self.mox.CreateMockAnything() + exists.instance = INSTANCE_ID_1 + launched_at = beginning_d - (60*60) + exists.launched_at = launched_at + exists.instance_type_id = 1 + exists.deleted_at = None + deployment = self.mox.CreateMockAnything() + exists.deployment().AndReturn(deployment) + deployment.name = 'RegionOne.prod.cell1' + rec_inst = self._fake_reconciler_instance(launched_at=launched_at) + self.client.get_instance('RegionOne', INSTANCE_ID_1).AndReturn(rec_inst) + reconcile_vals = { + 'instance': exists.instance, + 'launched_at': exists.launched_at, + 'deleted_at': exists.deleted_at, + 'instance_type_id': exists.instance_type_id, + 'source': 'reconciler:mocked_client' + } + result = self.mox.CreateMockAnything() + models.InstanceReconcile(**reconcile_vals).AndReturn(result) + result.save() + self.mox.ReplayAll() + result = self.reconciler.failed_validation(exists) + self.assertTrue(result) + self.mox.VerifyAll() + + def test_failed_validation_deleted(self): + beginning_d = utils.decimal_utc() + exists = self.mox.CreateMockAnything() + exists.instance = INSTANCE_ID_1 + launched_at = beginning_d - (60*60) + exists.launched_at = launched_at + exists.instance_type_id = 1 + exists.deleted_at = beginning_d + deployment = self.mox.CreateMockAnything() + exists.deployment().AndReturn(deployment) + deployment.name = 'RegionOne.prod.cell1' + rec_inst = self._fake_reconciler_instance(launched_at=launched_at, + deleted=True, + deleted_at=beginning_d) + self.client.get_instance('RegionOne', INSTANCE_ID_1).AndReturn(rec_inst) + reconcile_vals = { + 'instance': exists.instance, + 'launched_at': exists.launched_at, + 'deleted_at': exists.deleted_at, + 'instance_type_id': exists.instance_type_id, + 'source': 'reconciler:mocked_client' + } + result = self.mox.CreateMockAnything() + models.InstanceReconcile(**reconcile_vals).AndReturn(result) + result.save() + self.mox.ReplayAll() + result = self.reconciler.failed_validation(exists) + self.assertTrue(result) + self.mox.VerifyAll() + + def test_failed_validation_deleted_not_matching(self): + beginning_d = utils.decimal_utc() + exists = self.mox.CreateMockAnything() + exists.instance = INSTANCE_ID_1 + launched_at = beginning_d - (60*60) + exists.launched_at = launched_at + exists.instance_type_id = 1 + exists.deleted_at = beginning_d + deployment = self.mox.CreateMockAnything() + exists.deployment().AndReturn(deployment) + deployment.name = 'RegionOne.prod.cell1' + rec_inst = self._fake_reconciler_instance(launched_at=launched_at, + deleted=True, + deleted_at=beginning_d+1) + self.client.get_instance('RegionOne', INSTANCE_ID_1).AndReturn(rec_inst) + self.mox.ReplayAll() + result = self.reconciler.failed_validation(exists) + self.assertFalse(result) + self.mox.VerifyAll() + + def test_failed_validation_deleted_not_deleted_from_client(self): + beginning_d = utils.decimal_utc() + exists = self.mox.CreateMockAnything() + exists.instance = INSTANCE_ID_1 + launched_at = beginning_d - (60*60) + exists.launched_at = launched_at + exists.instance_type_id = 1 + exists.deleted_at = beginning_d + deployment = self.mox.CreateMockAnything() + exists.deployment().AndReturn(deployment) + deployment.name = 'RegionOne.prod.cell1' + rec_inst = self._fake_reconciler_instance(launched_at=launched_at) + self.client.get_instance('RegionOne', INSTANCE_ID_1).AndReturn(rec_inst) + self.mox.ReplayAll() + result = self.reconciler.failed_validation(exists) + self.assertFalse(result) + self.mox.VerifyAll() + + def test_failed_validation_not_found(self): + beginning_d = utils.decimal_utc() + exists = self.mox.CreateMockAnything() + exists.instance = INSTANCE_ID_1 + launched_at = beginning_d - (60*60) + exists.launched_at = launched_at + exists.instance_type_id = 1 + exists.deleted_at = None + deployment = self.mox.CreateMockAnything() + exists.deployment().AndReturn(deployment) + deployment.name = 'RegionOne.prod.cell1' + ex = exceptions.NotFound() + self.client.get_instance('RegionOne', INSTANCE_ID_1).AndRaise(ex) + self.mox.ReplayAll() + result = self.reconciler.failed_validation(exists) + self.assertFalse(result) + self.mox.VerifyAll() + json_bridge_config = { 'url': 'http://json_bridge.example.com/query/', diff --git a/tests/unit/test_verifier_db.py b/tests/unit/test_verifier_db.py index 8b27881..b706982 100644 --- a/tests/unit/test_verifier_db.py +++ b/tests/unit/test_verifier_db.py @@ -21,9 +21,9 @@ import datetime import decimal import json +import time import unittest import uuid -import multiprocessing import kombu.common import kombu.entity @@ -90,7 +90,10 @@ class VerifierTestCase(unittest.TestCase): "enable_notifications": False, } self.pool = self.mox.CreateMockAnything() - self.verifier = dbverifier.Verifier(self.config, pool=self.pool) + self.reconciler = self.mox.CreateMockAnything() + self.verifier = dbverifier.Verifier(self.config, + pool=self.pool, + rec=self.reconciler) self.config_notif = { "tick_time": 30, @@ -109,8 +112,10 @@ class VerifierTestCase(unittest.TestCase): } } self.pool_notif = self.mox.CreateMockAnything() + self.reconciler_notif = self.mox.CreateMockAnything() self.verifier_notif = dbverifier.Verifier(self.config_notif, - pool=self.pool_notif) + pool=self.pool_notif, + rec=self.reconciler) def tearDown(self): self.mox.UnsetStubs() @@ -638,9 +643,103 @@ class VerifierTestCase(unittest.TestCase): self.assertEqual(exist2.status, 'verifying') self.mox.VerifyAll() + def test_clean_results_full(self): + self.verifier.reconcile = True + result_not_ready = self.mox.CreateMockAnything() + result_not_ready.ready().AndReturn(False) + result_unsuccessful = self.mox.CreateMockAnything() + result_unsuccessful.ready().AndReturn(True) + result_unsuccessful.successful().AndReturn(False) + result_successful = self.mox.CreateMockAnything() + result_successful.ready().AndReturn(True) + result_successful.successful().AndReturn(True) + result_successful.get().AndReturn((True, None)) + result_failed_verification = self.mox.CreateMockAnything() + result_failed_verification.ready().AndReturn(True) + result_failed_verification.successful().AndReturn(True) + failed_exists = self.mox.CreateMockAnything() + result_failed_verification.get().AndReturn((False, failed_exists)) + self.verifier.results = [result_not_ready, + result_unsuccessful, + result_successful, + result_failed_verification] + self.mox.ReplayAll() + (result_count, success_count, errored) = self.verifier.clean_results() + self.assertEqual(result_count, 1) + self.assertEqual(success_count, 2) + self.assertEqual(errored, 1) + self.assertEqual(len(self.verifier.results), 1) + self.assertEqual(self.verifier.results[0], result_not_ready) + self.assertEqual(len(self.verifier.failed), 1) + self.assertEqual(self.verifier.failed[0], result_failed_verification) + self.mox.VerifyAll() + + def test_clean_results_pending(self): + self.verifier.reconcile = True + result_not_ready = self.mox.CreateMockAnything() + result_not_ready.ready().AndReturn(False) + self.verifier.results = [result_not_ready] + self.mox.ReplayAll() + (result_count, success_count, errored) = self.verifier.clean_results() + self.assertEqual(result_count, 1) + self.assertEqual(success_count, 0) + self.assertEqual(errored, 0) + self.assertEqual(len(self.verifier.results), 1) + self.assertEqual(self.verifier.results[0], result_not_ready) + self.assertEqual(len(self.verifier.failed), 0) + self.mox.VerifyAll() + + def test_clean_results_successful(self): + self.verifier.reconcile = True + result_successful = self.mox.CreateMockAnything() + result_successful.ready().AndReturn(True) + result_successful.successful().AndReturn(True) + result_successful.get().AndReturn((True, None)) + self.verifier.results = [result_successful] + self.mox.ReplayAll() + (result_count, success_count, errored) = self.verifier.clean_results() + self.assertEqual(result_count, 0) + self.assertEqual(success_count, 1) + self.assertEqual(errored, 0) + self.assertEqual(len(self.verifier.results), 0) + self.assertEqual(len(self.verifier.failed), 0) + self.mox.VerifyAll() + + def test_clean_results_unsuccessful(self): + self.verifier.reconcile = True + result_unsuccessful = self.mox.CreateMockAnything() + result_unsuccessful.ready().AndReturn(True) + result_unsuccessful.successful().AndReturn(False) + self.verifier.results = [result_unsuccessful] + self.mox.ReplayAll() + (result_count, success_count, errored) = self.verifier.clean_results() + self.assertEqual(result_count, 0) + self.assertEqual(success_count, 0) + self.assertEqual(errored, 1) + self.assertEqual(len(self.verifier.results), 0) + self.assertEqual(len(self.verifier.failed), 0) + self.mox.VerifyAll() + + def test_clean_results_fail_verification(self): + self.verifier.reconcile = True + result_failed_verification = self.mox.CreateMockAnything() + result_failed_verification.ready().AndReturn(True) + result_failed_verification.successful().AndReturn(True) + failed_exists = self.mox.CreateMockAnything() + result_failed_verification.get().AndReturn((False, failed_exists)) + self.verifier.results = [result_failed_verification] + self.mox.ReplayAll() + (result_count, success_count, errored) = self.verifier.clean_results() + self.assertEqual(result_count, 0) + self.assertEqual(success_count, 1) + self.assertEqual(errored, 0) + self.assertEqual(len(self.verifier.results), 0) + self.assertEqual(len(self.verifier.failed), 1) + self.assertEqual(self.verifier.failed[0], failed_exists) + self.mox.VerifyAll() + def test_verify_for_range_with_callback(self): callback = self.mox.CreateMockAnything() - pool = self.mox.CreateMockAnything() when_max = datetime.datetime.utcnow() results = self.mox.CreateMockAnything() models.InstanceExists.objects.select_related().AndReturn(results) @@ -669,6 +768,18 @@ class VerifierTestCase(unittest.TestCase): self.assertEqual(exist2.status, 'verifying') self.mox.VerifyAll() + def test_reconcile_failed(self): + self.verifier.reconcile = True + exists1 = self.mox.CreateMockAnything() + exists2 = self.mox.CreateMockAnything() + self.verifier.failed = [exists1, exists2] + self.reconciler.failed_validation(exists1) + self.reconciler.failed_validation(exists2) + self.mox.ReplayAll() + self.verifier.reconcile_failed() + self.assertEqual(len(self.verifier.failed), 0) + self.mox.VerifyAll() + def test_send_verified_notification_default_routing_key(self): connection = self.mox.CreateMockAnything() exchange = self.mox.CreateMockAnything() @@ -750,7 +861,7 @@ class VerifierTestCase(unittest.TestCase): dbverifier._create_connection(self.config_notif).AndReturn(conn) conn.__enter__().AndReturn(conn) self.mox.StubOutWithMock(self.verifier_notif, '_run') - self.verifier_notif._run(callback=mox.IgnoreArg()) + self.verifier_notif._run(callback=mox.Not(mox.Is(None))) conn.__exit__(None, None, None) self.mox.ReplayAll() self.verifier_notif.run() @@ -766,7 +877,7 @@ class VerifierTestCase(unittest.TestCase): dbverifier._create_connection(self.config_notif).AndReturn(conn) conn.__enter__().AndReturn(conn) self.mox.StubOutWithMock(self.verifier_notif, '_run') - self.verifier_notif._run(callback=mox.IgnoreArg()) + self.verifier_notif._run(callback=mox.Not(mox.Is(None))) conn.__exit__(None, None, None) self.mox.ReplayAll() self.verifier_notif.run() @@ -789,7 +900,7 @@ class VerifierTestCase(unittest.TestCase): dbverifier._create_connection(self.config_notif).AndReturn(conn) conn.__enter__().AndReturn(conn) self.mox.StubOutWithMock(self.verifier_notif, '_run_once') - self.verifier_notif._run_once(callback=mox.IgnoreArg()) + self.verifier_notif._run_once(callback=mox.Not(mox.Is(None))) conn.__exit__(None, None, None) self.mox.ReplayAll() self.verifier_notif.run_once() @@ -801,3 +912,123 @@ class VerifierTestCase(unittest.TestCase): self.mox.ReplayAll() self.verifier.run_once() self.mox.VerifyAll() + + def test_run_full_no_notifications(self): + self.verifier.reconcile = True + self.mox.StubOutWithMock(self.verifier, '_keep_running') + self.verifier._keep_running().AndReturn(True) + start = datetime.datetime.utcnow() + self.mox.StubOutWithMock(self.verifier, '_utcnow') + self.verifier._utcnow().AndReturn(start) + settle_time = self.config['settle_time'] + settle_units = self.config['settle_units'] + settle_offset = {settle_units: settle_time} + ending_max = start - datetime.timedelta(**settle_offset) + self.mox.StubOutWithMock(self.verifier, 'verify_for_range') + self.verifier.verify_for_range(ending_max, callback=None) + self.mox.StubOutWithMock(self.verifier, 'reconcile_failed') + result1 = self.mox.CreateMockAnything() + result2 = self.mox.CreateMockAnything() + self.verifier.results = [result1, result2] + result1.ready().AndReturn(True) + result1.successful().AndReturn(True) + result1.get().AndReturn((True, None)) + result2.ready().AndReturn(True) + result2.successful().AndReturn(True) + result2.get().AndReturn((True, None)) + self.verifier.reconcile_failed() + self.mox.StubOutWithMock(time, 'sleep', use_mock_anything=True) + time.sleep(self.config['tick_time']) + self.verifier._keep_running().AndReturn(False) + self.mox.ReplayAll() + self.verifier.run() + self.mox.VerifyAll() + + def test_run_full(self): + self.verifier_notif.reconcile = True + self.mox.StubOutWithMock(self.verifier_notif, '_keep_running') + self.verifier_notif._keep_running().AndReturn(True) + start = datetime.datetime.utcnow() + self.mox.StubOutWithMock(self.verifier_notif, '_utcnow') + self.verifier_notif._utcnow().AndReturn(start) + settle_time = self.config['settle_time'] + settle_units = self.config['settle_units'] + settle_offset = {settle_units: settle_time} + ending_max = start - datetime.timedelta(**settle_offset) + self.mox.StubOutWithMock(self.verifier_notif, 'verify_for_range') + self.verifier_notif.verify_for_range(ending_max, + callback=mox.Not(mox.Is(None))) + self.mox.StubOutWithMock(self.verifier_notif, 'reconcile_failed') + result1 = self.mox.CreateMockAnything() + result2 = self.mox.CreateMockAnything() + self.verifier_notif.results = [result1, result2] + result1.ready().AndReturn(True) + result1.successful().AndReturn(True) + result1.get().AndReturn((True, None)) + result2.ready().AndReturn(True) + result2.successful().AndReturn(True) + result2.get().AndReturn((True, None)) + self.verifier_notif.reconcile_failed() + self.mox.StubOutWithMock(time, 'sleep', use_mock_anything=True) + time.sleep(self.config['tick_time']) + self.verifier_notif._keep_running().AndReturn(False) + self.mox.ReplayAll() + self.verifier_notif.run() + self.mox.VerifyAll() + + def test_run_once_full_no_notifications(self): + self.verifier.reconcile = True + start = datetime.datetime.utcnow() + self.mox.StubOutWithMock(self.verifier, '_utcnow') + self.verifier._utcnow().AndReturn(start) + settle_time = self.config['settle_time'] + settle_units = self.config['settle_units'] + settle_offset = {settle_units: settle_time} + ending_max = start - datetime.timedelta(**settle_offset) + self.mox.StubOutWithMock(self.verifier, 'verify_for_range') + self.verifier.verify_for_range(ending_max, callback=None) + result1 = self.mox.CreateMockAnything() + result2 = self.mox.CreateMockAnything() + self.verifier.results = [result1, result2] + result1.ready().AndReturn(True) + result1.successful().AndReturn(True) + result1.get().AndReturn((True, None)) + result2.ready().AndReturn(True) + result2.successful().AndReturn(True) + result2.get().AndReturn((True, None)) + self.mox.StubOutWithMock(self.verifier, 'reconcile_failed') + self.verifier.reconcile_failed() + self.mox.StubOutWithMock(time, 'sleep', use_mock_anything=True) + time.sleep(self.config['tick_time']) + self.mox.ReplayAll() + self.verifier.run_once() + self.mox.VerifyAll() + + def test_run_once_full(self): + self.verifier_notif.reconcile = True + start = datetime.datetime.utcnow() + self.mox.StubOutWithMock(self.verifier_notif, '_utcnow') + self.verifier_notif._utcnow().AndReturn(start) + settle_time = self.config['settle_time'] + settle_units = self.config['settle_units'] + settle_offset = {settle_units: settle_time} + ending_max = start - datetime.timedelta(**settle_offset) + self.mox.StubOutWithMock(self.verifier_notif, 'verify_for_range') + self.verifier_notif.verify_for_range(ending_max, + callback=mox.Not(mox.Is(None))) + result1 = self.mox.CreateMockAnything() + result2 = self.mox.CreateMockAnything() + self.verifier_notif.results = [result1, result2] + result1.ready().AndReturn(True) + result1.successful().AndReturn(True) + result1.get().AndReturn((True, None)) + result2.ready().AndReturn(True) + result2.successful().AndReturn(True) + result2.get().AndReturn((True, None)) + self.mox.StubOutWithMock(self.verifier_notif, 'reconcile_failed') + self.verifier_notif.reconcile_failed() + self.mox.StubOutWithMock(time, 'sleep', use_mock_anything=True) + time.sleep(self.config['tick_time']) + self.mox.ReplayAll() + self.verifier_notif.run_once() + self.mox.VerifyAll() diff --git a/verifier/dbverifier.py b/verifier/dbverifier.py index 1ed74b2..2c76e89 100644 --- a/verifier/dbverifier.py +++ b/verifier/dbverifier.py @@ -23,7 +23,7 @@ import datetime import json import os import sys -from time import sleep +import time import uuid from django.db import transaction @@ -44,6 +44,7 @@ LOG = stacklog.get_logger() from stacktach import models from stacktach import datetime_to_decimal as dt +from stacktach import reconciler from verifier import AmbiguousResults from verifier import FieldMismatch from verifier import NotFound @@ -341,10 +342,25 @@ def _create_connection(config): class Verifier(object): - def __init__(self, config, pool=None): + + def __init__(self, config, pool=None, rec=None): self.config = config self.pool = pool or multiprocessing.Pool(self.config['pool_size']) + self.reconcile = self.config.get('reconcile', False) + self.reconciler = self._load_reconciler(config, rec=rec) self.results = [] + self.failed = [] + + def _load_reconciler(self, config, rec=None): + if rec: + return rec + + if self.reconcile: + config_loc = config.get('reconciler_config', + '/etc/stacktach/reconciler_config.json') + with open(config_loc, 'r') as rec_config_file: + rec_config = json.load(rec_config_file) + return reconciler.Reconciler(rec_config) def clean_results(self): pending = [] @@ -355,6 +371,9 @@ class Verifier(object): if result.ready(): finished += 1 if result.successful(): + (verified, exists) = result.get() + if self.reconcile and not verified: + self.failed.append(exists) successful += 1 else: pending.append(result) @@ -386,22 +405,34 @@ class Verifier(object): next_update = datetime.datetime.utcnow() + update_interval return count + def reconcile_failed(self): + for failed_exist in self.failed: + self.reconciler.failed_validation(failed_exist) + self.failed = [] + + def _keep_running(self): + return True + + def _utcnow(self): + return datetime.datetime.utcnow() + def _run(self, callback=None): tick_time = self.config['tick_time'] settle_units = self.config['settle_units'] settle_time = self.config['settle_time'] - while True: + while self._keep_running(): with transaction.commit_on_success(): - now = datetime.datetime.utcnow() + now = self._utcnow() kwargs = {settle_units: settle_time} ending_max = now - datetime.timedelta(**kwargs) new = self.verify_for_range(ending_max, callback=callback) - values = ((new,) + self.clean_results()) + if self.reconcile: + self.reconcile_failed() msg = "N: %s, P: %s, S: %s, E: %s" % values LOG.info(msg) - sleep(tick_time) + time.sleep(tick_time) def run(self): if self.config['enable_notifications']: @@ -427,7 +458,7 @@ class Verifier(object): tick_time = self.config['tick_time'] settle_units = self.config['settle_units'] settle_time = self.config['settle_time'] - now = datetime.datetime.utcnow() + now = self._utcnow() kwargs = {settle_units: settle_time} ending_max = now - datetime.timedelta(**kwargs) new = self.verify_for_range(ending_max, callback=callback) @@ -435,7 +466,9 @@ class Verifier(object): LOG.info("Verifying %s exist events" % new) while len(self.results) > 0: LOG.info("P: %s, F: %s, E: %s" % self.clean_results()) - sleep(tick_time) + if self.reconcile: + self.reconcile_failed() + time.sleep(tick_time) def run_once(self): if self.config['enable_notifications']: