# Copyright (c) 2012 - Rackspace Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

import argparse
import datetime
import json
import logging
import logging.handlers
import multiprocessing
import os
import sys
from time import sleep
import uuid

from django.db import transaction
import kombu.common
import kombu.connection
import kombu.entity
import kombu.pools

POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
                                                os.pardir, os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')):
    sys.path.insert(0, POSSIBLE_TOPDIR)

from stacktach import models
from stacktach import datetime_to_decimal as dt
from verifier import AmbiguousResults
from verifier import FieldMismatch
from verifier import NotFound
from verifier import VerificationException

LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
handler = logging.handlers.TimedRotatingFileHandler('verifier.log',
                                                    when='h', interval=6,
                                                    backupCount=4)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - '
                              '%(message)s')
handler.setFormatter(formatter)
LOG.addHandler(handler)
LOG.handlers[0].doRollover()


def _list_exists(ending_max=None, status=None):
    params = {}
    if ending_max:
        params['audit_period_ending__lte'] = dt.dt_to_decimal(ending_max)
    if status:
        params['status'] = status
    return models.InstanceExists.objects.select_related()\
                                .filter(**params).order_by('id')


def _find_launch(instance, launched):
    # Match launched_at at second granularity: truncate to the start of the
    # second and pad to the last microsecond of that second.
    start = launched - datetime.timedelta(microseconds=launched.microsecond)
    end = start + datetime.timedelta(microseconds=999999)
    params = {'instance': instance,
              'launched_at__gte': dt.dt_to_decimal(start),
              'launched_at__lte': dt.dt_to_decimal(end)}
    return models.InstanceUsage.objects.filter(**params)


def _find_delete(instance, launched, deleted_max=None):
    start = launched - datetime.timedelta(microseconds=launched.microsecond)
    end = start + datetime.timedelta(microseconds=999999)
    params = {'instance': instance,
              'launched_at__gte': dt.dt_to_decimal(start),
              'launched_at__lte': dt.dt_to_decimal(end)}
    if deleted_max:
        params['deleted_at__lte'] = dt.dt_to_decimal(deleted_max)
    return models.InstanceDeletes.objects.filter(**params)
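

# Worked example of the window math in _find_launch/_find_delete above
# (timestamps are illustrative, not taken from real data): a launched_at of
# 2012-01-01 12:00:00.123456 is truncated to start = 12:00:00.000000 and
# padded to end = 12:00:00.999999, so the filter matches any usage/delete row
# whose launched_at falls anywhere within that same wall-clock second.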


def _mark_exist_verified(exist):
    exist.status = models.InstanceExists.VERIFIED
    exist.save()


def _mark_exist_failed(exist, reason=None):
    exist.status = models.InstanceExists.FAILED
    if reason:
        exist.fail_reason = reason
    exist.save()


def _has_field(d1, d2, field1, field2=None):
    if not field2:
        field2 = field1
    return d1.get(field1) is not None and d2.get(field2) is not None


def _verify_simple_field(d1, d2, field1, field2=None):
    if not field2:
        field2 = field1
    if not _has_field(d1, d2, field1, field2):
        return False
    else:
        if d1[field1] != d2[field2]:
            return False
    return True


def _verify_date_field(d1, d2, same_second=False):
    if d1 and d2:
        if d1 == d2:
            return True
        elif same_second and int(d1) == int(d2):
            return True
    return False


def _verify_field_mismatch(exists, launch):
    if not _verify_date_field(launch.launched_at, exists.launched_at,
                              same_second=True):
        raise FieldMismatch('launched_at', exists.launched_at,
                            launch.launched_at)

    if launch.instance_type_id != exists.instance_type_id:
        raise FieldMismatch('instance_type_id', exists.instance_type_id,
                            launch.instance_type_id)

    if launch.tenant != exists.tenant:
        raise FieldMismatch('tenant', exists.tenant, launch.tenant)


def _verify_for_launch(exist):
    if exist.usage:
        launch = exist.usage
    else:
        if models.InstanceUsage.objects\
                 .filter(instance=exist.instance).count() > 0:
            launches = _find_launch(exist.instance,
                                    dt.dt_from_decimal(exist.launched_at))
            count = launches.count()
            query = {
                'instance': exist.instance,
                'launched_at': exist.launched_at
            }
            if count > 1:
                raise AmbiguousResults('InstanceUsage', query)
            elif count == 0:
                raise NotFound('InstanceUsage', query)
            launch = launches[0]
        else:
            raise NotFound('InstanceUsage', {'instance': exist.instance})

    _verify_field_mismatch(exist, launch)


def _verify_for_delete(exist):
    delete = None
    if exist.delete:
        # We know we have a delete and we have its id
        delete = exist.delete
    else:
        if exist.deleted_at:
            # We received this exists before the delete, go find it
            deletes = _find_delete(exist.instance,
                                   dt.dt_from_decimal(exist.launched_at))
            if deletes.count() == 1:
                delete = deletes[0]
            else:
                query = {
                    'instance': exist.instance,
                    'launched_at': exist.launched_at
                }
                raise NotFound('InstanceDelete', query)
        else:
            # We don't know if this is supposed to have a delete or not.
            # Thus, we need to check if we have a delete for this instance.
            # We need to be careful though, since we could be verifying an
            # exist event that we got before the delete. So, we restrict the
            # search to only deletes before this exist's audit period ended.
            # If we find any, we fail validation.
            launched_at = dt.dt_from_decimal(exist.launched_at)
            deleted_at_max = dt.dt_from_decimal(exist.audit_period_ending)
            deletes = _find_delete(exist.instance, launched_at,
                                   deleted_at_max)
            if deletes.count() > 0:
                reason = 'Found InstanceDeletes for non-delete exist'
                raise VerificationException(reason)

    if delete:
        if not _verify_date_field(delete.launched_at, exist.launched_at,
                                  same_second=True):
            raise FieldMismatch('launched_at', exist.launched_at,
                                delete.launched_at)

        if not _verify_date_field(delete.deleted_at, exist.deleted_at,
                                  same_second=True):
            raise FieldMismatch('deleted_at', exist.deleted_at,
                                delete.deleted_at)


def _verify(exist):
    verified = False
    try:
        if not exist.launched_at:
            raise VerificationException("Exists without a launched_at")

        _verify_for_launch(exist)
        _verify_for_delete(exist)

        verified = True
        _mark_exist_verified(exist)
    except VerificationException, e:
        _mark_exist_failed(exist, reason=str(e))
    except Exception, e:
        _mark_exist_failed(exist, reason=e.__class__.__name__)
        LOG.exception(e)

    return verified, exist


results = []
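

# Status flow for a single exists record as it moves through this module
# (the constants are the models.InstanceExists statuses referenced here):
# PENDING (picked up by _list_exists) -> VERIFYING (set in verify_for_range)
# -> VERIFIED or FAILED (set by _verify via _mark_exist_verified/_failed).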


def verify_for_range(pool, ending_max, callback=None):
    exists = _list_exists(ending_max=ending_max,
                          status=models.InstanceExists.PENDING)
    count = exists.count()
    added = 0
    update_interval = datetime.timedelta(seconds=30)
    next_update = datetime.datetime.utcnow() + update_interval
    LOG.info("Adding %s exists to queue." % count)
    while added < count:
        # The queryset filters on PENDING, so re-slicing [0:1000] picks up the
        # next batch once earlier rows have been flipped to VERIFYING.
        for exist in exists[0:1000]:
            exist.status = models.InstanceExists.VERIFYING
            exist.save()
            result = pool.apply_async(_verify, args=(exist,),
                                      callback=callback)
            results.append(result)
            added += 1
            if datetime.datetime.utcnow() > next_update:
                values = ((added,) + clean_results())
                msg = "N: %s, P: %s, S: %s, E: %s" % values
                LOG.info(msg)
                next_update = datetime.datetime.utcnow() + update_interval
    return count


def clean_results():
    global results

    pending = []
    finished = 0
    successful = 0

    for result in results:
        if result.ready():
            finished += 1
            if result.successful():
                successful += 1
        else:
            pending.append(result)

    results = pending
    errored = finished - successful
    return len(results), successful, errored


def _send_notification(message, routing_key, connection, exchange):
    with kombu.pools.producers[connection].acquire(block=True) as producer:
        kombu.common.maybe_declare(exchange, producer.channel)
        producer.publish(message, routing_key)


def send_verified_notification(exist, connection, exchange,
                               routing_keys=None):
    body = exist.raw.json
    json_body = json.loads(body)
    json_body[1]['event_type'] = 'compute.instance.exists.verified.old'
    json_body[1]['original_message_id'] = json_body[1]['message_id']
    json_body[1]['message_id'] = str(uuid.uuid4())
    if routing_keys is None:
        _send_notification(json_body[1], json_body[0], connection, exchange)
    else:
        for key in routing_keys:
            _send_notification(json_body[1], key, connection, exchange)


def _create_exchange(name, type, exclusive=False, auto_delete=False,
                     durable=True):
    return kombu.entity.Exchange(name, type=type, exclusive=exclusive,
                                 auto_delete=auto_delete, durable=durable)


def _create_connection(config):
    rabbit = config['rabbit']
    conn_params = dict(hostname=rabbit['host'],
                       port=rabbit['port'],
                       userid=rabbit['userid'],
                       password=rabbit['password'],
                       transport="librabbitmq",
                       virtual_host=rabbit['virtual_host'])
    return kombu.connection.BrokerConnection(**conn_params)


def _run(config, pool, callback=None):
    tick_time = config['tick_time']
    settle_units = config['settle_units']
    settle_time = config['settle_time']
    while True:
        with transaction.commit_on_success():
            now = datetime.datetime.utcnow()
            kwargs = {settle_units: settle_time}
            ending_max = now - datetime.timedelta(**kwargs)
            new = verify_for_range(pool, ending_max, callback=callback)
            msg = "N: %s, P: %s, S: %s, E: %s" % ((new,) + clean_results())
            LOG.info(msg)
        sleep(tick_time)


def run(config):
    pool = multiprocessing.Pool(config['pool_size'])

    if config.get('enable_notifications'):
        exchange = _create_exchange(config['rabbit']['exchange_name'],
                                    'topic',
                                    durable=config['rabbit']['durable_queue'])
        routing_keys = None
        if config['rabbit'].get('routing_keys') is not None:
            routing_keys = config['rabbit']['routing_keys']

        with _create_connection(config) as conn:
            def callback(result):
                (verified, exist) = result
                if verified:
                    send_verified_notification(exist, conn, exchange,
                                               routing_keys=routing_keys)

            _run(config, pool, callback=callback)
    else:
        _run(config, pool)
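

# Illustrative sketch of the config dict that run()/run_once() expect when the
# notification path is enabled. The keys mirror what _run, _create_connection
# and _create_exchange read; every value shown is a placeholder assumption,
# not a shipped default.
#
# example_config = {
#     'tick_time': 30,
#     'settle_time': 5,
#     'settle_units': 'minutes',
#     'pool_size': 10,
#     'enable_notifications': True,
#     'rabbit': {
#         'host': 'localhost',
#         'port': 5672,
#         'userid': 'guest',
#         'password': 'guest',
#         'virtual_host': '/',
#         'exchange_name': 'stacktach',
#         'durable_queue': False,
#         'routing_keys': ['notifications.info'],
#     },
# }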


def _run_once(config, pool, callback=None):
    tick_time = config['tick_time']
    settle_units = config['settle_units']
    settle_time = config['settle_time']
    now = datetime.datetime.utcnow()
    kwargs = {settle_units: settle_time}
    ending_max = now - datetime.timedelta(**kwargs)
    new = verify_for_range(pool, ending_max, callback=callback)

    LOG.info("Verifying %s exist events" % new)
    while len(results) > 0:
        LOG.info("P: %s, S: %s, E: %s" % clean_results())
        sleep(tick_time)


def run_once(config):
    pool = multiprocessing.Pool(config['pool_size'])

    if config.get('enable_notifications'):
        exchange = _create_exchange(config['rabbit']['exchange_name'],
                                    'topic',
                                    durable=config['rabbit']['durable_queue'])
        routing_keys = None
        if config['rabbit'].get('routing_keys') is not None:
            routing_keys = config['rabbit']['routing_keys']

        with _create_connection(config) as conn:
            def callback(result):
                (verified, exist) = result
                if verified:
                    send_verified_notification(exist, conn, exchange,
                                               routing_keys=routing_keys)

            _run_once(config, pool, callback=callback)
    else:
        _run_once(config, pool)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Stacktach Instance Exists Verifier")
    parser.add_argument('--tick-time',
                        help='Time in seconds the verifier will sleep before '
                             'it will check for new exists records.',
                        type=int, default=30)
    parser.add_argument('--run-once',
                        help='Check database once and verify all returned '
                             'exists records, then stop.',
                        action='store_true')
    parser.add_argument('--settle-time',
                        help='Time the verifier will wait for records to '
                             'settle before it will verify them.',
                        type=int, default=10)
    parser.add_argument('--settle-units',
                        help='Units for settle time',
                        default='minutes')
    parser.add_argument('--pool-size',
                        help='Number of processes created to verify records',
                        type=int, default=10)
    args = parser.parse_args()
    config = {'tick_time': args.tick_time, 'settle_time': args.settle_time,
              'settle_units': args.settle_units, 'pool_size': args.pool_size}

    if args.run_once:
        run_once(config)
    else:
        run(config)
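
# Example invocations, assuming this module is saved as verifier.py (the
# filename is not fixed by the code); flag names come from the argparse setup
# above and the values are arbitrary:
#
#   python verifier.py --tick-time 60 --settle-time 5 --settle-units minutes
#   python verifier.py --run-once --pool-size 4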