Sending notification on verified exists
This commit is contained in:
parent
1d8ee6d016
commit
5b40312752
16
etc/sample_stacktach_verifier_config.json
Normal file
16
etc/sample_stacktach_verifier_config.json
Normal file
@ -0,0 +1,16 @@
|
||||
{
|
||||
"tick_time": 30,
|
||||
"settle_time": 5,
|
||||
"settle_units": "minutes",
|
||||
"pool_size": 2,
|
||||
"enable_notifications": true,
|
||||
"rabbit": {
|
||||
"durable_queue": false,
|
||||
"host": "10.0.0.1",
|
||||
"port": 5672,
|
||||
"userid": "rabbit",
|
||||
"password": "rabbit",
|
||||
"virtual_host": "/",
|
||||
"exchange_name": "stacktach"
|
||||
}
|
||||
}
|
@ -5,7 +5,11 @@ import decimal
|
||||
import json
|
||||
import unittest
|
||||
|
||||
import kombu.common
|
||||
import kombu.entity
|
||||
import kombu.pools
|
||||
import mox
|
||||
import multiprocessing
|
||||
|
||||
from stacktach import datetime_to_decimal as dt
|
||||
from stacktach import models
|
||||
@ -397,7 +401,7 @@ class VerifierTestCase(unittest.TestCase):
|
||||
dbverifier._verify(exist)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_verify_for_range(self):
|
||||
def test_verify_for_range_without_callback(self):
|
||||
pool = self.mox.CreateMockAnything()
|
||||
when_max = datetime.datetime.utcnow()
|
||||
results = self.mox.CreateMockAnything()
|
||||
@ -416,10 +420,167 @@ class VerifierTestCase(unittest.TestCase):
|
||||
results.__iter__().AndReturn([exist1, exist2].__iter__())
|
||||
exist1.save()
|
||||
exist2.save()
|
||||
pool.apply_async(dbverifier._verify, args=(exist1,))
|
||||
pool.apply_async(dbverifier._verify, args=(exist2,))
|
||||
pool.apply_async(dbverifier._verify, args=(exist1,), callback=None)
|
||||
pool.apply_async(dbverifier._verify, args=(exist2,), callback=None)
|
||||
self.mox.ReplayAll()
|
||||
dbverifier.verify_for_range(pool, when_max)
|
||||
self.assertEqual(exist1.status, 'verifying')
|
||||
self.assertEqual(exist2.status, 'verifying')
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_verify_for_range_with_callback(self):
|
||||
callback = self.mox.CreateMockAnything()
|
||||
pool = self.mox.CreateMockAnything()
|
||||
when_max = datetime.datetime.utcnow()
|
||||
results = self.mox.CreateMockAnything()
|
||||
models.InstanceExists.objects.select_related().AndReturn(results)
|
||||
models.InstanceExists.PENDING = 'pending'
|
||||
models.InstanceExists.VERIFYING = 'verifying'
|
||||
filters = {
|
||||
'raw__when__lte': dt.dt_to_decimal(when_max),
|
||||
'status': 'pending'
|
||||
}
|
||||
results.filter(**filters).AndReturn(results)
|
||||
results.order_by('id').AndReturn(results)
|
||||
results.count().AndReturn(2)
|
||||
exist1 = self.mox.CreateMockAnything()
|
||||
exist2 = self.mox.CreateMockAnything()
|
||||
results.__iter__().AndReturn([exist1, exist2].__iter__())
|
||||
exist1.save()
|
||||
exist2.save()
|
||||
pool.apply_async(dbverifier._verify, args=(exist1,), callback=callback)
|
||||
pool.apply_async(dbverifier._verify, args=(exist2,), callback=callback)
|
||||
self.mox.ReplayAll()
|
||||
dbverifier.verify_for_range(pool, when_max, callback=callback)
|
||||
self.assertEqual(exist1.status, 'verifying')
|
||||
self.assertEqual(exist2.status, 'verifying')
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_send_verified_notification(self):
|
||||
connection = self.mox.CreateMockAnything()
|
||||
exchange = self.mox.CreateMockAnything()
|
||||
exist = self.mox.CreateMockAnything()
|
||||
exist.raw = self.mox.CreateMockAnything()
|
||||
exist_dict = ['monitor.info', {'event_type': 'test', 'key': 'value'}]
|
||||
exist_str = json.dumps(exist_dict)
|
||||
exist.raw.json = exist_str
|
||||
self.mox.StubOutWithMock(kombu.pools, 'producers')
|
||||
self.mox.StubOutWithMock(kombu.common, 'maybe_declare')
|
||||
producer = self.mox.CreateMockAnything()
|
||||
producer.channel = self.mox.CreateMockAnything()
|
||||
kombu.pools.producers[connection].AndReturn(producer)
|
||||
producer.acquire(block=True).AndReturn(producer)
|
||||
producer.__enter__().AndReturn(producer)
|
||||
kombu.common.maybe_declare(exchange, producer.channel)
|
||||
message = {'event_type': 'compute.instance.exists.verified.old',
|
||||
'key': 'value'}
|
||||
producer.publish(message, exist_dict[0])
|
||||
producer.__exit__(None, None, None)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
dbverifier.send_verified_notification(exist, exchange, connection)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_run_notifications(self):
|
||||
config = {
|
||||
"tick_time": 30,
|
||||
"settle_time": 5,
|
||||
"settle_units": "minutes",
|
||||
"pool_size": 2,
|
||||
"enable_notifications": True,
|
||||
"rabbit": {
|
||||
"durable_queue": False,
|
||||
"host": "10.0.0.1",
|
||||
"port": 5672,
|
||||
"userid": "rabbit",
|
||||
"password": "rabbit",
|
||||
"virtual_host": "/",
|
||||
"exchange_name": "stacktach"
|
||||
}
|
||||
}
|
||||
self.mox.StubOutWithMock(multiprocessing, 'Pool')
|
||||
pool = self.mox.CreateMockAnything()
|
||||
multiprocessing.Pool(2).AndReturn(pool)
|
||||
self.mox.StubOutWithMock(dbverifier, '_create_exchange')
|
||||
exchange = self.mox.CreateMockAnything()
|
||||
dbverifier._create_exchange('stacktach', 'topic', durable=False)\
|
||||
.AndReturn(exchange)
|
||||
self.mox.StubOutWithMock(dbverifier, '_create_connection')
|
||||
conn = self.mox.CreateMockAnything()
|
||||
dbverifier._create_connection(config).AndReturn(conn)
|
||||
conn.__enter__().AndReturn(conn)
|
||||
self.mox.StubOutWithMock(dbverifier, '_run')
|
||||
dbverifier._run(config, pool, callback=mox.IgnoreArg())
|
||||
conn.__exit__(None, None, None)
|
||||
self.mox.ReplayAll()
|
||||
dbverifier.run(config)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_run_no_notifications(self):
|
||||
config = {
|
||||
"tick_time": 30,
|
||||
"settle_time": 5,
|
||||
"settle_units": "minutes",
|
||||
"pool_size": 2,
|
||||
"enable_notifications": False,
|
||||
}
|
||||
self.mox.StubOutWithMock(multiprocessing, 'Pool')
|
||||
pool = self.mox.CreateMockAnything()
|
||||
multiprocessing.Pool(2).AndReturn(pool)
|
||||
self.mox.StubOutWithMock(dbverifier, '_run')
|
||||
dbverifier._run(config, pool)
|
||||
self.mox.ReplayAll()
|
||||
dbverifier.run(config)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_run_once_notifications(self):
|
||||
config = {
|
||||
"tick_time": 30,
|
||||
"settle_time": 5,
|
||||
"settle_units": "minutes",
|
||||
"pool_size": 2,
|
||||
"enable_notifications": True,
|
||||
"rabbit": {
|
||||
"durable_queue": False,
|
||||
"host": "10.0.0.1",
|
||||
"port": 5672,
|
||||
"userid": "rabbit",
|
||||
"password": "rabbit",
|
||||
"virtual_host": "/",
|
||||
"exchange_name": "stacktach"
|
||||
}
|
||||
}
|
||||
self.mox.StubOutWithMock(multiprocessing, 'Pool')
|
||||
pool = self.mox.CreateMockAnything()
|
||||
multiprocessing.Pool(2).AndReturn(pool)
|
||||
self.mox.StubOutWithMock(dbverifier, '_create_exchange')
|
||||
exchange = self.mox.CreateMockAnything()
|
||||
dbverifier._create_exchange('stacktach', 'topic', durable=False) \
|
||||
.AndReturn(exchange)
|
||||
self.mox.StubOutWithMock(dbverifier, '_create_connection')
|
||||
conn = self.mox.CreateMockAnything()
|
||||
dbverifier._create_connection(config).AndReturn(conn)
|
||||
conn.__enter__().AndReturn(conn)
|
||||
self.mox.StubOutWithMock(dbverifier, '_run_once')
|
||||
dbverifier._run_once(config, pool, callback=mox.IgnoreArg())
|
||||
conn.__exit__(None, None, None)
|
||||
self.mox.ReplayAll()
|
||||
dbverifier.run_once(config)
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_run_once_no_notifications(self):
|
||||
config = {
|
||||
"tick_time": 30,
|
||||
"settle_time": 5,
|
||||
"settle_units": "minutes",
|
||||
"pool_size": 2,
|
||||
"enable_notifications": False,
|
||||
}
|
||||
self.mox.StubOutWithMock(multiprocessing, 'Pool')
|
||||
pool = self.mox.CreateMockAnything()
|
||||
multiprocessing.Pool(2).AndReturn(pool)
|
||||
self.mox.StubOutWithMock(dbverifier, '_run_once')
|
||||
dbverifier._run_once(config, pool)
|
||||
self.mox.ReplayAll()
|
||||
dbverifier.run_once(config)
|
||||
self.mox.VerifyAll()
|
||||
|
@ -2,11 +2,16 @@
|
||||
|
||||
import argparse
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from time import sleep
|
||||
|
||||
from django.db import transaction
|
||||
import kombu.common
|
||||
import kombu.entity
|
||||
import kombu.pools
|
||||
import multiprocessing
|
||||
|
||||
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||
@ -175,6 +180,7 @@ def _verify_for_delete(exist):
|
||||
|
||||
|
||||
def _verify(exist):
|
||||
verified = False
|
||||
try:
|
||||
if not exist.launched_at:
|
||||
raise VerificationException("Exists without a launched_at")
|
||||
@ -182,6 +188,7 @@ def _verify(exist):
|
||||
_verify_for_launch(exist)
|
||||
_verify_for_delete(exist)
|
||||
|
||||
verified = True
|
||||
_mark_exist_verified(exist)
|
||||
except VerificationException:
|
||||
_mark_exists_failed(exist)
|
||||
@ -189,18 +196,20 @@ def _verify(exist):
|
||||
_mark_exists_failed(exist)
|
||||
LOG.exception(e)
|
||||
|
||||
return verified, exist
|
||||
|
||||
|
||||
results = []
|
||||
|
||||
|
||||
def verify_for_range(pool, when_max):
|
||||
def verify_for_range(pool, when_max, callback=None):
|
||||
exists = _list_exists(received_max=when_max,
|
||||
status=models.InstanceExists.PENDING)
|
||||
count = exists.count()
|
||||
for exist in exists:
|
||||
exist.status = models.InstanceExists.VERIFYING
|
||||
exist.save()
|
||||
result = pool.apply_async(_verify, args=(exist,))
|
||||
result = pool.apply_async(_verify, args=(exist,), callback=callback)
|
||||
results.append(result)
|
||||
|
||||
return count
|
||||
@ -226,32 +235,79 @@ def clean_results():
|
||||
return len(results), successful, errored
|
||||
|
||||
|
||||
def run(config):
|
||||
pool = multiprocessing.Pool(config['pool_size'])
|
||||
def _send_notification(message, routing_key, connection, exchange):
|
||||
with kombu.pools.producers[connection].acquire(block=True) as producer:
|
||||
kombu.common.maybe_declare(exchange, producer.channel)
|
||||
producer.publish(message, routing_key)
|
||||
|
||||
|
||||
def send_verified_notification(exist, connection, exchange):
|
||||
body = exist.raw.json
|
||||
json_body = json.loads(body)
|
||||
json_body[1]['event_type'] = 'compute.instance.exists.verified.old'
|
||||
_send_notification(json_body[1], json_body[0], connection, exchange)
|
||||
|
||||
|
||||
def _create_exchange(name, type, exclusive=False, auto_delete=False,
|
||||
durable=True):
|
||||
return kombu.entity.Exchange(name, type=type, exclusive=auto_delete,
|
||||
auto_delete=exclusive, durable=durable)
|
||||
|
||||
|
||||
def _create_connection(config):
|
||||
rabbit = config['rabbit']
|
||||
conn_params = dict(hostname=rabbit['host'],
|
||||
port=rabbit['port'],
|
||||
userid=rabbit['userid'],
|
||||
password=rabbit['password'],
|
||||
transport="librabbitmq",
|
||||
virtual_host=rabbit['virtual_host'])
|
||||
return kombu.connection.BrokerConnection(**conn_params)
|
||||
|
||||
|
||||
def _run(config, pool, callback=None):
|
||||
tick_time = config['tick_time']
|
||||
settle_units = config['settle_units']
|
||||
settle_time = config['settle_time']
|
||||
while True:
|
||||
now = datetime.datetime.utcnow()
|
||||
kwargs = {settle_units: settle_time}
|
||||
when_max = now - datetime.timedelta(**kwargs)
|
||||
new = verify_for_range(pool, when_max)
|
||||
with transaction.commit_on_success():
|
||||
now = datetime.datetime.utcnow()
|
||||
kwargs = {settle_units: settle_time}
|
||||
when_max = now - datetime.timedelta(**kwargs)
|
||||
new = verify_for_range(pool, when_max, callback=callback)
|
||||
|
||||
LOG.info("N: %s, %s" % (new, "P: %s, S: %s, E: %s" % clean_results()))
|
||||
msg = "N: %s, P: %s, S: %s, E: %s" % ((new,) + clean_results())
|
||||
LOG.info(msg)
|
||||
sleep(tick_time)
|
||||
|
||||
|
||||
def run_once(config):
|
||||
def run(config):
|
||||
pool = multiprocessing.Pool(config['pool_size'])
|
||||
|
||||
if config['enable_notifications']:
|
||||
exchange = _create_exchange(config['rabbit']['exchange_name'],
|
||||
'topic',
|
||||
durable=config['rabbit']['durable_queue'])
|
||||
|
||||
with _create_connection(config) as conn:
|
||||
def callback(result):
|
||||
(verified, exist) = result
|
||||
if verified:
|
||||
send_verified_notification(exist, conn, exchange)
|
||||
|
||||
_run(config, pool, callback=callback)
|
||||
else:
|
||||
_run(config, pool)
|
||||
|
||||
|
||||
def _run_once(config, pool, callback=None):
|
||||
tick_time = config['tick_time']
|
||||
settle_units = config['settle_units']
|
||||
settle_time = config['settle_time']
|
||||
now = datetime.datetime.utcnow()
|
||||
kwargs = {settle_units: settle_time}
|
||||
when_max = now - datetime.timedelta(**kwargs)
|
||||
new = verify_for_range(pool, when_max)
|
||||
new = verify_for_range(pool, when_max, callback=callback)
|
||||
|
||||
LOG.info("Verifying %s exist events" % new)
|
||||
while len(results) > 0:
|
||||
@ -259,6 +315,25 @@ def run_once(config):
|
||||
sleep(tick_time)
|
||||
|
||||
|
||||
def run_once(config):
|
||||
pool = multiprocessing.Pool(config['pool_size'])
|
||||
|
||||
if config['enable_notifications']:
|
||||
exchange = _create_exchange(config['rabbit']['exchange_name'],
|
||||
'topic',
|
||||
durable=config['rabbit']['durable_queue'])
|
||||
|
||||
with _create_connection(config) as conn:
|
||||
def callback(result):
|
||||
(verified, exist) = result
|
||||
if verified:
|
||||
send_verified_notification(exist, conn, exchange)
|
||||
|
||||
_run_once(config, pool, callback=callback)
|
||||
else:
|
||||
_run_once(config, pool)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description=
|
||||
"Stacktach Instance Exists Verifier")
|
||||
|
Loading…
x
Reference in New Issue
Block a user