# stacktach/verifier/dbverifier.py
import argparse
import datetime
import logging
import logging.handlers
import multiprocessing
import os
import sys
from time import sleep

POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
                                                os.pardir, os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')):
    sys.path.insert(0, POSSIBLE_TOPDIR)

from stacktach import models
from stacktach import datetime_to_decimal as dt
from verifier import AmbiguousResults
from verifier import FieldMismatch
from verifier import NotFound
from verifier import VerificationException

LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
handler = logging.handlers.TimedRotatingFileHandler('verifier.log',
                                                    when='h', interval=6,
                                                    backupCount=4)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
LOG.addHandler(handler)

def _list_exists(received_max=None, received_min=None, status=None):
    params = {}
    if received_max:
        params['raw__when__lte'] = dt.dt_to_decimal(received_max)
    if received_min:
        params['raw__when__gt'] = dt.dt_to_decimal(received_min)
    if status:
        params['status'] = status
    return models.InstanceExists.objects.select_related()\
                                        .filter(**params).order_by('id')
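
# Illustrative usage (not in the original module; values are assumptions):
#     cutoff = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
#     exists = _list_exists(received_max=cutoff,
#                           status=models.InstanceExists.PENDING)
# Timestamps are stored as decimals, hence the dt.dt_to_decimal() conversions.
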
def _find_launch(instance, launched):
    start = launched - datetime.timedelta(microseconds=launched.microsecond)
    end = start + datetime.timedelta(microseconds=999999)
    params = {'instance': instance,
              'launched_at__gte': dt.dt_to_decimal(start),
              'launched_at__lte': dt.dt_to_decimal(end)}
    return models.InstanceUsage.objects.filter(**params)
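
# The window above matches on the wall-clock second: e.g. (illustrative)
# launched = 2013-03-12 17:26:02.123456 gives start = ...:02.000000 and
# end = ...:02.999999, so any InstanceUsage whose launched_at falls within
# that same second is considered a candidate match.
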
def _find_delete(instance, launched, deleted_max=None):
    start = launched - datetime.timedelta(microseconds=launched.microsecond)
    end = start + datetime.timedelta(microseconds=999999)
    params = {'instance': instance,
              'launched_at__gte': dt.dt_to_decimal(start),
              'launched_at__lte': dt.dt_to_decimal(end)}
    if deleted_max:
        params['deleted_at__lte'] = dt.dt_to_decimal(deleted_max)
    return models.InstanceDeletes.objects.filter(**params)

def _mark_exist_verified(exist):
    exist.status = models.InstanceExists.VERIFIED
    exist.save()


def _mark_exists_failed(exist):
    exist.status = models.InstanceExists.FAILED
    exist.save()

def _has_field(d1, d2, field1, field2=None):
    if not field2:
        field2 = field1
    return d1.get(field1) is not None and d2.get(field2) is not None


def _verify_simple_field(d1, d2, field1, field2=None):
    if not field2:
        field2 = field1
    if not _has_field(d1, d2, field1, field2):
        return False
    elif d1[field1] != d2[field2]:
        return False
    return True

def _verify_date_field(d1, d2, same_second=False):
    if d1 and d2:
        if d1 == d2:
            return True
        elif same_second and int(d1) == int(d2):
            return True
    return False
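
# Illustrative behavior with decimal timestamps (values are made up):
#     _verify_date_field(1363116362.25, 1363116362.75)                    # False
#     _verify_date_field(1363116362.25, 1363116362.75, same_second=True)  # True
# In the second case, int() truncates both values to the same whole second.
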
def _verify_for_launch(exist):
    if exist.usage:
        launch = exist.usage
    elif models.InstanceUsage.objects\
            .filter(instance=exist.instance).count() > 0:
        launches = _find_launch(exist.instance,
                                dt.dt_from_decimal(exist.launched_at))
        if launches.count() != 1:
            query = {
                'instance': exist.instance,
                'launched_at': exist.launched_at
            }
            raise AmbiguousResults('InstanceUsage', query)
        launch = launches[0]
    else:
        raise NotFound('InstanceUsage', {'instance': exist.instance})

    if not _verify_date_field(launch.launched_at, exist.launched_at,
                              same_second=True):
        raise FieldMismatch('launched_at', exist.launched_at,
                            launch.launched_at)

    if launch.instance_type_id != exist.instance_type_id:
        raise FieldMismatch('instance_type_id', exist.instance_type_id,
                            launch.instance_type_id)

def _verify_for_delete(exist):
    delete = None
    if exist.delete:
        # We know we have a delete and we have its id
        delete = exist.delete
    elif exist.deleted_at:
        # We received this exists before the delete, go find it
        deletes = _find_delete(exist.instance,
                               dt.dt_from_decimal(exist.launched_at))
        if deletes.count() == 1:
            delete = deletes[0]
        else:
            query = {
                'instance': exist.instance,
                'launched_at': exist.launched_at
            }
            raise NotFound('InstanceDelete', query)
    else:
        # We don't know if this is supposed to have a delete or not.
        # Thus, we need to check if we have a delete for this instance.
        # We need to be careful though, since we could be verifying an
        # exist event that we got before the delete. So, we restrict the
        # search to only deletes before the time this exist was sent.
        # If we find any, we fail validation.
        deletes = _find_delete(exist.instance,
                               dt.dt_from_decimal(exist.launched_at),
                               dt.dt_from_decimal(exist.raw.when))
        if deletes.count() > 0:
            reason = 'Found InstanceDeletes for non-delete exist'
            raise VerificationException(reason)

    if delete:
        if not _verify_date_field(delete.launched_at, exist.launched_at,
                                  same_second=True):
            raise FieldMismatch('launched_at', exist.launched_at,
                                delete.launched_at)

        if not _verify_date_field(delete.deleted_at, exist.deleted_at,
                                  same_second=True):
            raise FieldMismatch('deleted_at', exist.deleted_at,
                                delete.deleted_at)

def _verify(exist):
    try:
        if not exist.launched_at:
            raise VerificationException("Exists without a launched_at")

        _verify_for_launch(exist)
        _verify_for_delete(exist)
        _mark_exist_verified(exist)
    except VerificationException:
        _mark_exists_failed(exist)
    except Exception as e:
        _mark_exists_failed(exist)
        LOG.exception(e)


results = []

def verify_for_range(pool, when_max):
    exists = _list_exists(received_max=when_max,
                          status=models.InstanceExists.PENDING)
    count = exists.count()
    for exist in exists:
        exist.status = models.InstanceExists.VERIFYING
        exist.save()
        result = pool.apply_async(_verify, args=(exist,))
        results.append(result)
    return count

def clean_results():
    global results

    pending = []
    finished = 0
    successful = 0

    for result in results:
        if result.ready():
            finished += 1
            if result.successful():
                successful += 1
        else:
            pending.append(result)

    results = pending
    errored = finished - successful
    return len(results), successful, errored
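
# Illustrative: if 10 results were queued and 7 have finished (6 successfully,
# 1 with an exception), clean_results() returns (3, 6, 1) and keeps only the
# 3 still-pending AsyncResults in the module-level list.
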
def run(config):
    pool = multiprocessing.Pool(config['pool_size'])
    tick_time = config['tick_time']
    settle_units = config['settle_units']
    settle_time = config['settle_time']
    while True:
        now = datetime.datetime.utcnow()
        kwargs = {settle_units: settle_time}
        when_max = now - datetime.timedelta(**kwargs)
        new = verify_for_range(pool, when_max)
        LOG.info("N: %s, P: %s, S: %s, E: %s" % ((new,) + clean_results()))
        sleep(tick_time)

def run_once(config):
    pool = multiprocessing.Pool(config['pool_size'])
    tick_time = config['tick_time']
    settle_units = config['settle_units']
    settle_time = config['settle_time']
    now = datetime.datetime.utcnow()
    kwargs = {settle_units: settle_time}
    when_max = now - datetime.timedelta(**kwargs)
    new = verify_for_range(pool, when_max)
    LOG.info("Verifying %s exist events" % new)
    while len(results) > 0:
        LOG.info("P: %s, S: %s, E: %s" % clean_results())
        sleep(tick_time)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Stacktach Instance Exists Verifier")
    parser.add_argument('--tick-time',
                        help='Time in seconds the verifier will sleep '
                             'before it checks for new exists records.',
                        type=int,
                        default=30)
    parser.add_argument('--run-once',
                        help='Check the database once, verify all returned '
                             'exists records, then stop.',
                        action='store_true',
                        default=False)
    parser.add_argument('--settle-time',
                        help='Time the verifier will wait for records to '
                             'settle before it verifies them.',
                        type=int,
                        default=10)
    parser.add_argument('--settle-units',
                        help='Units for settle time.',
                        default='minutes')
    parser.add_argument('--pool-size',
                        help='Number of processes created to verify records.',
                        type=int,
                        default=10)
    args = parser.parse_args()

    config = {'tick_time': args.tick_time, 'settle_time': args.settle_time,
              'settle_units': args.settle_units, 'pool_size': args.pool_size}

    if args.run_once:
        run_once(config)
    else:
        run(config)
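
# Example invocation (flags as defined above; values are illustrative):
#     python dbverifier.py --tick-time 30 --settle-time 10 \
#         --settle-units minutes --pool-size 10
# Add --run-once to verify the current backlog a single time and exit.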