From 5b4031275295ac09bf22cf8ee7eee8c7743e4b88 Mon Sep 17 00:00:00 2001 From: Andrew Melton Date: Wed, 13 Mar 2013 17:26:15 -0400 Subject: [PATCH] Sending notification on verified exists --- etc/sample_stacktach_verifier_config.json | 16 +++ tests/unit/test_verifier_db.py | 167 +++++++++++++++++++++- verifier/dbverifier.py | 97 +++++++++++-- 3 files changed, 266 insertions(+), 14 deletions(-) create mode 100644 etc/sample_stacktach_verifier_config.json diff --git a/etc/sample_stacktach_verifier_config.json b/etc/sample_stacktach_verifier_config.json new file mode 100644 index 0000000..a017a65 --- /dev/null +++ b/etc/sample_stacktach_verifier_config.json @@ -0,0 +1,16 @@ +{ + "tick_time": 30, + "settle_time": 5, + "settle_units": "minutes", + "pool_size": 2, + "enable_notifications": true, + "rabbit": { + "durable_queue": false, + "host": "10.0.0.1", + "port": 5672, + "userid": "rabbit", + "password": "rabbit", + "virtual_host": "/", + "exchange_name": "stacktach" + } +} \ No newline at end of file diff --git a/tests/unit/test_verifier_db.py b/tests/unit/test_verifier_db.py index 588d971..529b4ad 100644 --- a/tests/unit/test_verifier_db.py +++ b/tests/unit/test_verifier_db.py @@ -5,7 +5,11 @@ import decimal import json import unittest +import kombu.common +import kombu.entity +import kombu.pools import mox +import multiprocessing from stacktach import datetime_to_decimal as dt from stacktach import models @@ -397,7 +401,7 @@ class VerifierTestCase(unittest.TestCase): dbverifier._verify(exist) self.mox.VerifyAll() - def test_verify_for_range(self): + def test_verify_for_range_without_callback(self): pool = self.mox.CreateMockAnything() when_max = datetime.datetime.utcnow() results = self.mox.CreateMockAnything() @@ -416,10 +420,167 @@ class VerifierTestCase(unittest.TestCase): results.__iter__().AndReturn([exist1, exist2].__iter__()) exist1.save() exist2.save() - pool.apply_async(dbverifier._verify, args=(exist1,)) - pool.apply_async(dbverifier._verify, args=(exist2,)) + pool.apply_async(dbverifier._verify, args=(exist1,), callback=None) + pool.apply_async(dbverifier._verify, args=(exist2,), callback=None) self.mox.ReplayAll() dbverifier.verify_for_range(pool, when_max) self.assertEqual(exist1.status, 'verifying') self.assertEqual(exist2.status, 'verifying') self.mox.VerifyAll() + + def test_verify_for_range_with_callback(self): + callback = self.mox.CreateMockAnything() + pool = self.mox.CreateMockAnything() + when_max = datetime.datetime.utcnow() + results = self.mox.CreateMockAnything() + models.InstanceExists.objects.select_related().AndReturn(results) + models.InstanceExists.PENDING = 'pending' + models.InstanceExists.VERIFYING = 'verifying' + filters = { + 'raw__when__lte': dt.dt_to_decimal(when_max), + 'status': 'pending' + } + results.filter(**filters).AndReturn(results) + results.order_by('id').AndReturn(results) + results.count().AndReturn(2) + exist1 = self.mox.CreateMockAnything() + exist2 = self.mox.CreateMockAnything() + results.__iter__().AndReturn([exist1, exist2].__iter__()) + exist1.save() + exist2.save() + pool.apply_async(dbverifier._verify, args=(exist1,), callback=callback) + pool.apply_async(dbverifier._verify, args=(exist2,), callback=callback) + self.mox.ReplayAll() + dbverifier.verify_for_range(pool, when_max, callback=callback) + self.assertEqual(exist1.status, 'verifying') + self.assertEqual(exist2.status, 'verifying') + self.mox.VerifyAll() + + def test_send_verified_notification(self): + connection = self.mox.CreateMockAnything() + exchange = self.mox.CreateMockAnything() + exist = self.mox.CreateMockAnything() + exist.raw = self.mox.CreateMockAnything() + exist_dict = ['monitor.info', {'event_type': 'test', 'key': 'value'}] + exist_str = json.dumps(exist_dict) + exist.raw.json = exist_str + self.mox.StubOutWithMock(kombu.pools, 'producers') + self.mox.StubOutWithMock(kombu.common, 'maybe_declare') + producer = self.mox.CreateMockAnything() + producer.channel = self.mox.CreateMockAnything() + kombu.pools.producers[connection].AndReturn(producer) + producer.acquire(block=True).AndReturn(producer) + producer.__enter__().AndReturn(producer) + kombu.common.maybe_declare(exchange, producer.channel) + message = {'event_type': 'compute.instance.exists.verified.old', + 'key': 'value'} + producer.publish(message, exist_dict[0]) + producer.__exit__(None, None, None) + self.mox.ReplayAll() + + dbverifier.send_verified_notification(exist, exchange, connection) + self.mox.VerifyAll() + + def test_run_notifications(self): + config = { + "tick_time": 30, + "settle_time": 5, + "settle_units": "minutes", + "pool_size": 2, + "enable_notifications": True, + "rabbit": { + "durable_queue": False, + "host": "10.0.0.1", + "port": 5672, + "userid": "rabbit", + "password": "rabbit", + "virtual_host": "/", + "exchange_name": "stacktach" + } + } + self.mox.StubOutWithMock(multiprocessing, 'Pool') + pool = self.mox.CreateMockAnything() + multiprocessing.Pool(2).AndReturn(pool) + self.mox.StubOutWithMock(dbverifier, '_create_exchange') + exchange = self.mox.CreateMockAnything() + dbverifier._create_exchange('stacktach', 'topic', durable=False)\ + .AndReturn(exchange) + self.mox.StubOutWithMock(dbverifier, '_create_connection') + conn = self.mox.CreateMockAnything() + dbverifier._create_connection(config).AndReturn(conn) + conn.__enter__().AndReturn(conn) + self.mox.StubOutWithMock(dbverifier, '_run') + dbverifier._run(config, pool, callback=mox.IgnoreArg()) + conn.__exit__(None, None, None) + self.mox.ReplayAll() + dbverifier.run(config) + self.mox.VerifyAll() + + def test_run_no_notifications(self): + config = { + "tick_time": 30, + "settle_time": 5, + "settle_units": "minutes", + "pool_size": 2, + "enable_notifications": False, + } + self.mox.StubOutWithMock(multiprocessing, 'Pool') + pool = self.mox.CreateMockAnything() + multiprocessing.Pool(2).AndReturn(pool) + self.mox.StubOutWithMock(dbverifier, '_run') + dbverifier._run(config, pool) + self.mox.ReplayAll() + dbverifier.run(config) + self.mox.VerifyAll() + + def test_run_once_notifications(self): + config = { + "tick_time": 30, + "settle_time": 5, + "settle_units": "minutes", + "pool_size": 2, + "enable_notifications": True, + "rabbit": { + "durable_queue": False, + "host": "10.0.0.1", + "port": 5672, + "userid": "rabbit", + "password": "rabbit", + "virtual_host": "/", + "exchange_name": "stacktach" + } + } + self.mox.StubOutWithMock(multiprocessing, 'Pool') + pool = self.mox.CreateMockAnything() + multiprocessing.Pool(2).AndReturn(pool) + self.mox.StubOutWithMock(dbverifier, '_create_exchange') + exchange = self.mox.CreateMockAnything() + dbverifier._create_exchange('stacktach', 'topic', durable=False) \ + .AndReturn(exchange) + self.mox.StubOutWithMock(dbverifier, '_create_connection') + conn = self.mox.CreateMockAnything() + dbverifier._create_connection(config).AndReturn(conn) + conn.__enter__().AndReturn(conn) + self.mox.StubOutWithMock(dbverifier, '_run_once') + dbverifier._run_once(config, pool, callback=mox.IgnoreArg()) + conn.__exit__(None, None, None) + self.mox.ReplayAll() + dbverifier.run_once(config) + self.mox.VerifyAll() + + def test_run_once_no_notifications(self): + config = { + "tick_time": 30, + "settle_time": 5, + "settle_units": "minutes", + "pool_size": 2, + "enable_notifications": False, + } + self.mox.StubOutWithMock(multiprocessing, 'Pool') + pool = self.mox.CreateMockAnything() + multiprocessing.Pool(2).AndReturn(pool) + self.mox.StubOutWithMock(dbverifier, '_run_once') + dbverifier._run_once(config, pool) + self.mox.ReplayAll() + dbverifier.run_once(config) + self.mox.VerifyAll() diff --git a/verifier/dbverifier.py b/verifier/dbverifier.py index 2035b68..1c84f83 100644 --- a/verifier/dbverifier.py +++ b/verifier/dbverifier.py @@ -2,11 +2,16 @@ import argparse import datetime +import json import logging import os import sys from time import sleep +from django.db import transaction +import kombu.common +import kombu.entity +import kombu.pools import multiprocessing POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), @@ -175,6 +180,7 @@ def _verify_for_delete(exist): def _verify(exist): + verified = False try: if not exist.launched_at: raise VerificationException("Exists without a launched_at") @@ -182,6 +188,7 @@ def _verify(exist): _verify_for_launch(exist) _verify_for_delete(exist) + verified = True _mark_exist_verified(exist) except VerificationException: _mark_exists_failed(exist) @@ -189,18 +196,20 @@ def _verify(exist): _mark_exists_failed(exist) LOG.exception(e) + return verified, exist + results = [] -def verify_for_range(pool, when_max): +def verify_for_range(pool, when_max, callback=None): exists = _list_exists(received_max=when_max, status=models.InstanceExists.PENDING) count = exists.count() for exist in exists: exist.status = models.InstanceExists.VERIFYING exist.save() - result = pool.apply_async(_verify, args=(exist,)) + result = pool.apply_async(_verify, args=(exist,), callback=callback) results.append(result) return count @@ -226,32 +235,79 @@ def clean_results(): return len(results), successful, errored -def run(config): - pool = multiprocessing.Pool(config['pool_size']) +def _send_notification(message, routing_key, connection, exchange): + with kombu.pools.producers[connection].acquire(block=True) as producer: + kombu.common.maybe_declare(exchange, producer.channel) + producer.publish(message, routing_key) + +def send_verified_notification(exist, connection, exchange): + body = exist.raw.json + json_body = json.loads(body) + json_body[1]['event_type'] = 'compute.instance.exists.verified.old' + _send_notification(json_body[1], json_body[0], connection, exchange) + + +def _create_exchange(name, type, exclusive=False, auto_delete=False, + durable=True): + return kombu.entity.Exchange(name, type=type, exclusive=auto_delete, + auto_delete=exclusive, durable=durable) + + +def _create_connection(config): + rabbit = config['rabbit'] + conn_params = dict(hostname=rabbit['host'], + port=rabbit['port'], + userid=rabbit['userid'], + password=rabbit['password'], + transport="librabbitmq", + virtual_host=rabbit['virtual_host']) + return kombu.connection.BrokerConnection(**conn_params) + + +def _run(config, pool, callback=None): tick_time = config['tick_time'] settle_units = config['settle_units'] settle_time = config['settle_time'] while True: - now = datetime.datetime.utcnow() - kwargs = {settle_units: settle_time} - when_max = now - datetime.timedelta(**kwargs) - new = verify_for_range(pool, when_max) + with transaction.commit_on_success(): + now = datetime.datetime.utcnow() + kwargs = {settle_units: settle_time} + when_max = now - datetime.timedelta(**kwargs) + new = verify_for_range(pool, when_max, callback=callback) - LOG.info("N: %s, %s" % (new, "P: %s, S: %s, E: %s" % clean_results())) + msg = "N: %s, P: %s, S: %s, E: %s" % ((new,) + clean_results()) + LOG.info(msg) sleep(tick_time) -def run_once(config): +def run(config): pool = multiprocessing.Pool(config['pool_size']) + if config['enable_notifications']: + exchange = _create_exchange(config['rabbit']['exchange_name'], + 'topic', + durable=config['rabbit']['durable_queue']) + + with _create_connection(config) as conn: + def callback(result): + (verified, exist) = result + if verified: + send_verified_notification(exist, conn, exchange) + + _run(config, pool, callback=callback) + else: + _run(config, pool) + + +def _run_once(config, pool, callback=None): tick_time = config['tick_time'] settle_units = config['settle_units'] settle_time = config['settle_time'] now = datetime.datetime.utcnow() kwargs = {settle_units: settle_time} when_max = now - datetime.timedelta(**kwargs) - new = verify_for_range(pool, when_max) + new = verify_for_range(pool, when_max, callback=callback) LOG.info("Verifying %s exist events" % new) while len(results) > 0: @@ -259,6 +315,25 @@ def run_once(config): sleep(tick_time) +def run_once(config): + pool = multiprocessing.Pool(config['pool_size']) + + if config['enable_notifications']: + exchange = _create_exchange(config['rabbit']['exchange_name'], + 'topic', + durable=config['rabbit']['durable_queue']) + + with _create_connection(config) as conn: + def callback(result): + (verified, exist) = result + if verified: + send_verified_notification(exist, conn, exchange) + + _run_once(config, pool, callback=callback) + else: + _run_once(config, pool) + + if __name__ == '__main__': parser = argparse.ArgumentParser(description= "Stacktach Instance Exists Verifier")