Refactoring verifier

This commit is contained in:
Andrew Melton 2013-07-02 16:32:57 -04:00
parent c5842e2a7b
commit 284ef7178d
3 changed files with 192 additions and 229 deletions

View File

@ -79,9 +79,45 @@ class VerifierTestCase(unittest.TestCase):
models.InstanceExists.objects = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(models, 'JsonReport', use_mock_anything=True)
models.JsonReport.objects = self.mox.CreateMockAnything()
self._setup_verifier()
def _setup_verifier(self):
self.config = {
"tick_time": 30,
"settle_time": 5,
"settle_units": "minutes",
"pool_size": 2,
"enable_notifications": False,
}
self.pool = self.mox.CreateMockAnything()
self.verifier = dbverifier.Verifier(self.config, pool=self.pool)
self.config_notif = {
"tick_time": 30,
"settle_time": 5,
"settle_units": "minutes",
"pool_size": 2,
"enable_notifications": True,
"rabbit": {
"durable_queue": False,
"host": "10.0.0.1",
"port": 5672,
"userid": "rabbit",
"password": "rabbit",
"virtual_host": "/",
"exchange_name": "stacktach",
}
}
self.pool_notif = self.mox.CreateMockAnything()
self.verifier_notif = dbverifier.Verifier(self.config_notif,
pool=self.pool_notif)
def tearDown(self):
self.mox.UnsetStubs()
self.verifier = None
self.pool = None
self.verifier_notif = None
self.pool_notif = None
def test_verify_for_launch(self):
exist = self.mox.CreateMockAnything()
@ -574,7 +610,6 @@ class VerifierTestCase(unittest.TestCase):
self.mox.VerifyAll()
def test_verify_for_range_without_callback(self):
pool = self.mox.CreateMockAnything()
when_max = datetime.datetime.utcnow()
results = self.mox.CreateMockAnything()
models.InstanceExists.objects.select_related().AndReturn(results)
@ -593,10 +628,12 @@ class VerifierTestCase(unittest.TestCase):
results.__iter__().AndReturn([exist1, exist2].__iter__())
exist1.save()
exist2.save()
pool.apply_async(dbverifier._verify, args=(exist1,), callback=None)
pool.apply_async(dbverifier._verify, args=(exist2,), callback=None)
self.pool.apply_async(dbverifier._verify, args=(exist1,),
callback=None)
self.pool.apply_async(dbverifier._verify, args=(exist2,),
callback=None)
self.mox.ReplayAll()
dbverifier.verify_for_range(pool, when_max)
self.verifier.verify_for_range(when_max)
self.assertEqual(exist1.status, 'verifying')
self.assertEqual(exist2.status, 'verifying')
self.mox.VerifyAll()
@ -622,10 +659,12 @@ class VerifierTestCase(unittest.TestCase):
results.__iter__().AndReturn([exist1, exist2].__iter__())
exist1.save()
exist2.save()
pool.apply_async(dbverifier._verify, args=(exist1,), callback=callback)
pool.apply_async(dbverifier._verify, args=(exist2,), callback=callback)
self.pool.apply_async(dbverifier._verify, args=(exist1,),
callback=callback)
self.pool.apply_async(dbverifier._verify, args=(exist2,),
callback=callback)
self.mox.ReplayAll()
dbverifier.verify_for_range(pool, when_max, callback=callback)
self.verifier.verify_for_range(when_max, callback=callback)
self.assertEqual(exist1.status, 'verifying')
self.assertEqual(exist2.status, 'verifying')
self.mox.VerifyAll()
@ -702,140 +741,63 @@ class VerifierTestCase(unittest.TestCase):
self.mox.VerifyAll()
def test_run_notifications(self):
config = {
"tick_time": 30,
"settle_time": 5,
"settle_units": "minutes",
"pool_size": 2,
"enable_notifications": True,
"rabbit": {
"durable_queue": False,
"host": "10.0.0.1",
"port": 5672,
"userid": "rabbit",
"password": "rabbit",
"virtual_host": "/",
"exchange_name": "stacktach"
}
}
self.mox.StubOutWithMock(multiprocessing, 'Pool')
pool = self.mox.CreateMockAnything()
multiprocessing.Pool(2).AndReturn(pool)
self.mox.StubOutWithMock(dbverifier, '_create_exchange')
exchange = self.mox.CreateMockAnything()
dbverifier._create_exchange('stacktach', 'topic', durable=False)\
.AndReturn(exchange)
self.mox.StubOutWithMock(dbverifier, '_create_connection')
conn = self.mox.CreateMockAnything()
dbverifier._create_connection(config).AndReturn(conn)
dbverifier._create_connection(self.config_notif).AndReturn(conn)
conn.__enter__().AndReturn(conn)
self.mox.StubOutWithMock(dbverifier, '_run')
dbverifier._run(config, pool, callback=mox.IgnoreArg())
self.mox.StubOutWithMock(self.verifier_notif, '_run')
self.verifier_notif._run(callback=mox.IgnoreArg())
conn.__exit__(None, None, None)
self.mox.ReplayAll()
dbverifier.run(config)
self.verifier_notif.run()
self.mox.VerifyAll()
def test_run_notifications_with_routing_keys(self):
config = {
"tick_time": 30,
"settle_time": 5,
"settle_units": "minutes",
"pool_size": 2,
"enable_notifications": True,
"rabbit": {
"durable_queue": False,
"host": "10.0.0.1",
"port": 5672,
"userid": "rabbit",
"password": "rabbit",
"virtual_host": "/",
"exchange_name": "stacktach",
}
}
self.mox.StubOutWithMock(multiprocessing, 'Pool')
pool = self.mox.CreateMockAnything()
multiprocessing.Pool(2).AndReturn(pool)
self.mox.StubOutWithMock(dbverifier, '_create_exchange')
exchange = self.mox.CreateMockAnything()
dbverifier._create_exchange('stacktach', 'topic', durable=False) \
.AndReturn(exchange)
self.mox.StubOutWithMock(dbverifier, '_create_connection')
conn = self.mox.CreateMockAnything()
dbverifier._create_connection(config).AndReturn(conn)
dbverifier._create_connection(self.config_notif).AndReturn(conn)
conn.__enter__().AndReturn(conn)
self.mox.StubOutWithMock(dbverifier, '_run')
dbverifier._run(config, pool, callback=mox.IgnoreArg())
self.mox.StubOutWithMock(self.verifier_notif, '_run')
self.verifier_notif._run(callback=mox.IgnoreArg())
conn.__exit__(None, None, None)
self.mox.ReplayAll()
dbverifier.run(config)
self.verifier_notif.run()
self.mox.VerifyAll()
def test_run_no_notifications(self):
config = {
"tick_time": 30,
"settle_time": 5,
"settle_units": "minutes",
"pool_size": 2,
"enable_notifications": False,
}
self.mox.StubOutWithMock(multiprocessing, 'Pool')
pool = self.mox.CreateMockAnything()
multiprocessing.Pool(2).AndReturn(pool)
self.mox.StubOutWithMock(dbverifier, '_run')
dbverifier._run(config, pool)
self.mox.StubOutWithMock(self.verifier, '_run')
self.verifier._run()
self.mox.ReplayAll()
dbverifier.run(config)
self.verifier.run()
self.mox.VerifyAll()
def test_run_once_notifications(self):
config = {
"tick_time": 30,
"settle_time": 5,
"settle_units": "minutes",
"pool_size": 2,
"enable_notifications": True,
"rabbit": {
"durable_queue": False,
"host": "10.0.0.1",
"port": 5672,
"userid": "rabbit",
"password": "rabbit",
"virtual_host": "/",
"exchange_name": "stacktach"
}
}
self.mox.StubOutWithMock(multiprocessing, 'Pool')
pool = self.mox.CreateMockAnything()
multiprocessing.Pool(2).AndReturn(pool)
self.mox.StubOutWithMock(dbverifier, '_create_exchange')
exchange = self.mox.CreateMockAnything()
dbverifier._create_exchange('stacktach', 'topic', durable=False) \
.AndReturn(exchange)
self.mox.StubOutWithMock(dbverifier, '_create_connection')
conn = self.mox.CreateMockAnything()
dbverifier._create_connection(config).AndReturn(conn)
dbverifier._create_connection(self.config_notif).AndReturn(conn)
conn.__enter__().AndReturn(conn)
self.mox.StubOutWithMock(dbverifier, '_run_once')
dbverifier._run_once(config, pool, callback=mox.IgnoreArg())
self.mox.StubOutWithMock(self.verifier_notif, '_run_once')
self.verifier_notif._run_once(callback=mox.IgnoreArg())
conn.__exit__(None, None, None)
self.mox.ReplayAll()
dbverifier.run_once(config)
self.verifier_notif.run_once()
self.mox.VerifyAll()
def test_run_once_no_notifications(self):
config = {
"tick_time": 30,
"settle_time": 5,
"settle_units": "minutes",
"pool_size": 2,
"enable_notifications": False,
}
self.mox.StubOutWithMock(multiprocessing, 'Pool')
pool = self.mox.CreateMockAnything()
multiprocessing.Pool(2).AndReturn(pool)
self.mox.StubOutWithMock(dbverifier, '_run_once')
dbverifier._run_once(config, pool)
self.mox.StubOutWithMock(self.verifier, '_run_once')
self.verifier._run_once()
self.mox.ReplayAll()
dbverifier.run_once(config)
self.verifier.run_once()
self.mox.VerifyAll()

View File

@ -263,6 +263,26 @@ def _verify_with_reconciled_data(exist, ex):
delete_type="InstanceReconcile")
def _attempt_reconciled_verify(exist, orig_e):
verified = False
try:
# Attempt to verify against reconciled data
_verify_with_reconciled_data(exist, orig_e)
verified = True
_mark_exist_verified(exist)
except NotFound, rec_e:
# No reconciled data, just mark it failed
_mark_exist_failed(exist, reason=str(orig_e))
except VerificationException, rec_e:
# Verification failed against reconciled data, mark it failed
# using the second failure.
_mark_exist_failed(exist, reason=str(rec_e))
except Exception, rec_e:
_mark_exist_failed(exist, reason=rec_e.__class__.__name__)
LOG.exception(rec_e)
return verified
def _verify(exist):
verified = False
try:
@ -276,21 +296,7 @@ def _verify(exist):
_mark_exist_verified(exist)
except VerificationException, orig_e:
# Something is wrong with the InstanceUsage record
try:
# Attempt to verify against reconciled data
_verify_with_reconciled_data(exist, orig_e)
verified = True
_mark_exist_verified(exist)
except NotFound, rec_e:
# No reconciled data, just mark it failed
_mark_exist_failed(exist, reason=str(orig_e))
except VerificationException, rec_e:
# Verification failed against reconciled data, mark it failed
# using the second failure.
_mark_exist_failed(exist, reason=str(rec_e))
except Exception, rec_e:
_mark_exist_failed(exist, reason=rec_e.__class__.__name__)
LOG.exception(rec_e)
verified = _attempt_reconciled_verify(exist, orig_e)
except Exception, e:
_mark_exist_failed(exist, reason=e.__class__.__name__)
LOG.exception(e)
@ -298,54 +304,6 @@ def _verify(exist):
return verified, exist
results = []
def verify_for_range(pool, ending_max, callback=None):
exists = _list_exists(ending_max=ending_max,
status=models.InstanceExists.PENDING)
count = exists.count()
added = 0
update_interval = datetime.timedelta(seconds=30)
next_update = datetime.datetime.utcnow() + update_interval
LOG.info("Adding %s exists to queue." % count)
while added < count:
for exist in exists[0:1000]:
exist.status = models.InstanceExists.VERIFYING
exist.save()
result = pool.apply_async(_verify, args=(exist,),
callback=callback)
results.append(result)
added += 1
if datetime.datetime.utcnow() > next_update:
values = ((added,) + clean_results())
msg = "N: %s, P: %s, S: %s, E: %s" % values
LOG.info(msg)
next_update = datetime.datetime.utcnow() + update_interval
return count
def clean_results():
global results
pending = []
finished = 0
successful = 0
for result in results:
if result.ready():
finished += 1
if result.successful():
successful += 1
else:
pending.append(result)
results = pending
errored = finished - successful
return len(results), successful, errored
def _send_notification(message, routing_key, connection, exchange):
with kombu.pools.producers[connection].acquire(block=True) as producer:
kombu.common.maybe_declare(exchange, producer.channel)
@ -382,81 +340,122 @@ def _create_connection(config):
return kombu.connection.BrokerConnection(**conn_params)
def _run(config, pool, callback=None):
tick_time = config['tick_time']
settle_units = config['settle_units']
settle_time = config['settle_time']
while True:
with transaction.commit_on_success():
now = datetime.datetime.utcnow()
kwargs = {settle_units: settle_time}
ending_max = now - datetime.timedelta(**kwargs)
new = verify_for_range(pool, ending_max, callback=callback)
class Verifier(object):
def __init__(self, config, pool=None):
self.config = config
self.pool = pool or multiprocessing.Pool(self.config['pool_size'])
self.results = []
msg = "N: %s, P: %s, S: %s, E: %s" % ((new,) + clean_results())
LOG.info(msg)
sleep(tick_time)
def clean_results(self):
pending = []
finished = 0
successful = 0
for result in self.results:
if result.ready():
finished += 1
if result.successful():
successful += 1
else:
pending.append(result)
def run(config):
pool = multiprocessing.Pool(config['pool_size'])
results = pending
errored = finished - successful
return len(results), successful, errored
if config['enable_notifications']:
exchange = _create_exchange(config['rabbit']['exchange_name'],
'topic',
durable=config['rabbit']['durable_queue'])
routing_keys = None
if config['rabbit'].get('routing_keys') is not None:
routing_keys = config['rabbit']['routing_keys']
def verify_for_range(self, ending_max, callback=None):
exists = _list_exists(ending_max=ending_max,
status=models.InstanceExists.PENDING)
count = exists.count()
added = 0
update_interval = datetime.timedelta(seconds=30)
next_update = datetime.datetime.utcnow() + update_interval
LOG.info("Adding %s exists to queue." % count)
while added < count:
for exist in exists[0:1000]:
exist.status = models.InstanceExists.VERIFYING
exist.save()
result = self.pool.apply_async(_verify, args=(exist,),
callback=callback)
self.results.append(result)
added += 1
if datetime.datetime.utcnow() > next_update:
values = ((added,) + self.clean_results())
msg = "N: %s, P: %s, S: %s, E: %s" % values
LOG.info(msg)
next_update = datetime.datetime.utcnow() + update_interval
return count
with _create_connection(config) as conn:
def callback(result):
(verified, exist) = result
if verified:
send_verified_notification(exist, conn, exchange,
routing_keys=routing_keys)
def _run(self, callback=None):
tick_time = self.config['tick_time']
settle_units = self.config['settle_units']
settle_time = self.config['settle_time']
while True:
with transaction.commit_on_success():
now = datetime.datetime.utcnow()
kwargs = {settle_units: settle_time}
ending_max = now - datetime.timedelta(**kwargs)
new = self.verify_for_range(ending_max,
callback=callback)
_run(config, pool, callback=callback)
else:
_run(config, pool)
values = ((new,) + self.clean_results())
msg = "N: %s, P: %s, S: %s, E: %s" % values
LOG.info(msg)
sleep(tick_time)
def run(self):
if self.config['enable_notifications']:
exchange = _create_exchange(self.config['rabbit']['exchange_name'],
'topic',
durable=self.config['rabbit']['durable_queue'])
routing_keys = None
if self.config['rabbit'].get('routing_keys') is not None:
routing_keys = self.config['rabbit']['routing_keys']
def _run_once(config, pool, callback=None):
tick_time = config['tick_time']
settle_units = config['settle_units']
settle_time = config['settle_time']
now = datetime.datetime.utcnow()
kwargs = {settle_units: settle_time}
ending_max = now - datetime.timedelta(**kwargs)
new = verify_for_range(pool, ending_max, callback=callback)
with _create_connection(self.config) as conn:
def callback(result):
(verified, exist) = result
if verified:
send_verified_notification(exist, conn, exchange,
routing_keys=routing_keys)
LOG.info("Verifying %s exist events" % new)
while len(results) > 0:
LOG.info("P: %s, F: %s, E: %s" % clean_results())
sleep(tick_time)
self._run(callback=callback)
else:
self._run()
def _run_once(self, callback=None):
tick_time = self.config['tick_time']
settle_units = self.config['settle_units']
settle_time = self.config['settle_time']
now = datetime.datetime.utcnow()
kwargs = {settle_units: settle_time}
ending_max = now - datetime.timedelta(**kwargs)
new = self.verify_for_range(ending_max, callback=callback)
def run_once(config):
pool = multiprocessing.Pool(config['pool_size'])
LOG.info("Verifying %s exist events" % new)
while len(self.results) > 0:
LOG.info("P: %s, F: %s, E: %s" % self.clean_results())
sleep(tick_time)
if config['enable_notifications']:
exchange = _create_exchange(config['rabbit']['exchange_name'],
'topic',
durable=config['rabbit']['durable_queue'])
routing_keys = None
if config['rabbit'].get('routing_keys') is not None:
routing_keys = config['rabbit']['routing_keys']
def run_once(self):
if self.config['enable_notifications']:
exchange = _create_exchange(self.config['rabbit']['exchange_name'],
'topic',
durable=self.config['rabbit']['durable_queue'])
routing_keys = None
if self.config['rabbit'].get('routing_keys') is not None:
routing_keys = self.config['rabbit']['routing_keys']
with _create_connection(config) as conn:
def callback(result):
(verified, exist) = result
if verified:
send_verified_notification(exist, conn, exchange,
routing_keys=routing_keys)
with _create_connection(self.config) as conn:
def callback(result):
(verified, exist) = result
if verified:
send_verified_notification(exist, conn, exchange,
routing_keys=routing_keys)
_run_once(config, pool, callback=callback)
else:
_run_once(config, pool)
self._run_once(callback=callback)
else:
self._run_once()
if __name__ == '__main__':
@ -486,7 +485,8 @@ if __name__ == '__main__':
config = {'tick_time': args.tick_time, 'settle_time': args.settle_time,
'settle_units': args.settle_units, 'pool_size': args.pool_size}
verifier = Verifier(config)
if args.run_once:
run_once(config)
verifier.run_once(config)
else:
run(config)
verifier.run(config)

View File

@ -59,7 +59,8 @@ if __name__ == '__main__':
with open(config_filename, "r") as f:
config = json.load(f)
process = Process(target=dbverifier.run, args=(config, ))
verifier = dbverifier.Verifier(config)
process = Process(target=verifier.run)
process.start()
signal.signal(signal.SIGINT, kill_time)
signal.signal(signal.SIGTERM, kill_time)