Merge "Notabene pipeline handler"

Jenkins 2015-02-10 22:29:26 +00:00 committed by Gerrit Code Review
commit 516c2b3d75
5 changed files with 394 additions and 8 deletions


@@ -8,3 +8,4 @@ enum34>=1.0
SQLAlchemy>=0.9.6
python-dateutil
requests
notabene>=0.0.dev0

tests/test_notabene.py (new file, 249 lines)

@@ -0,0 +1,249 @@
import unittest2 as unittest
import datetime
import mock
from winchester import pipeline_handler
class TestConnectionManager(unittest.TestCase):
def setUp(self):
super(TestConnectionManager, self).setUp()
self.mgr = pipeline_handler.ConnectionManager()
def test_extract_params(self):
with self.assertRaises(pipeline_handler.NotabeneException):
self.mgr._extract_params({})
cd, ct, ed, et = self.mgr._extract_params({'exchange': 'my_exchange'})
self.assertEquals(cd, {'host': 'localhost',
'port': 5672,
'user': 'guest',
'password': 'guest',
'library': 'librabbitmq',
'vhost': '/'})
self.assertEquals(ct, (('host', 'localhost'),
('library', 'librabbitmq'),
('password', 'guest'),
('port', 5672),
('user', 'guest'),
('vhost', '/')))
self.assertEquals(ed, {'exchange_name': 'my_exchange',
'exchange_type': 'topic'})
self.assertEquals(et, (('exchange_name', 'my_exchange'),
('exchange_type', 'topic')))
kw = {'host': 'my_host', 'user': 'my_user', 'password': 'pwd',
'port': 123, 'vhost': 'virtual', 'library': 'my_lib',
'exchange': 'my_exchange', 'exchange_type': 'foo'}
cd, ct, ed, et = self.mgr._extract_params(kw)
self.assertEquals(cd, {'host': 'my_host',
'port': 123,
'user': 'my_user',
'password': 'pwd',
'library': 'my_lib',
'vhost': 'virtual'})
self.assertEquals(ct, (('host', 'my_host'),
('library', 'my_lib'),
('password', 'pwd'),
('port', 123),
('user', 'my_user'),
('vhost', 'virtual')))
self.assertEquals(ed, {'exchange_name': 'my_exchange',
'exchange_type': 'foo'})
self.assertEquals(et, (('exchange_name', 'my_exchange'),
('exchange_type', 'foo')))
@mock.patch.object(pipeline_handler.ConnectionManager, '_extract_params')
@mock.patch.object(pipeline_handler.driver, 'create_connection')
@mock.patch.object(pipeline_handler.driver, 'create_exchange')
@mock.patch.object(pipeline_handler.driver, 'create_queue')
def test_get_connection(self, cq, ce, cc, ep):
conn = {'host': 'my_host', 'user': 'my_user', 'password': 'pwd',
'port': 123, 'vhost': 'virtual', 'library': 'my_lib'}
conn_set = tuple(sorted(conn.items()))
exchange = {'exchange_name': 'my_exchange', 'exchange_type': 'foo'}
exchange_set = tuple(sorted(exchange.items()))
ep.return_value = (conn, conn_set, exchange, exchange_set)
connection = mock.MagicMock("the connection")
channel = mock.MagicMock("the channel")
connection.channel = channel
cc.return_value = connection
mexchange = mock.MagicMock("the exchange")
ce.return_value = mexchange
queue = mock.MagicMock("the queue")
queue.declare = mock.MagicMock()
cq.return_value = queue
final_connection, final_exchange = self.mgr.get_connection({}, "foo")
self.assertEquals(final_connection, connection)
self.assertEquals(final_exchange, mexchange)
self.assertEquals(1, queue.declare.call_count)
# Calling again should give the same results ...
final_connection, final_exchange = self.mgr.get_connection({}, "foo")
self.assertEquals(final_connection, connection)
self.assertEquals(final_exchange, mexchange)
self.assertTrue(queue.declare.called)
self.assertEquals(1, queue.declare.call_count)
# Change the exchange, and we should have same connection, but new
# exchange object.
exchange2 = {'exchange_name': 'my_exchange2', 'exchange_type': 'foo2'}
exchange2_set = tuple(sorted(exchange2.items()))
ep.return_value = (conn, conn_set, exchange2, exchange2_set)
mexchange2 = mock.MagicMock("the exchange 2")
ce.return_value = mexchange2
final_connection, final_exchange = self.mgr.get_connection({}, "foo")
self.assertEquals(final_connection, connection)
self.assertEquals(final_exchange, mexchange2)
self.assertEquals(2, queue.declare.call_count)
# Change the connection, and we should have a new connection and new
# exchange object.
conn2 = {'host': 'my_host2', 'user': 'my_user2', 'password': 'pwd2',
'port': 1234, 'vhost': 'virtual2', 'library': 'my_lib2'}
conn2_set = tuple(sorted(conn2.items()))
exchange3 = {'exchange_name': 'my_exchange', 'exchange_type': 'foo'}
exchange3_set = tuple(sorted(exchange3.items()))
ep.return_value = (conn2, conn2_set, exchange3, exchange3_set)
mexchange3 = mock.MagicMock("the exchange 3")
ce.return_value = mexchange3
connection2 = mock.MagicMock("the connection 2")
channel2 = mock.MagicMock("the channel 2")
connection2.channel = channel2
cc.return_value = connection2
final_connection, final_exchange = self.mgr.get_connection({}, "foo")
self.assertEquals(final_connection, connection2)
self.assertEquals(final_exchange, mexchange3)
self.assertEquals(3, queue.declare.call_count)
class TestException(Exception):
pass
class TestNotabeneHandler(unittest.TestCase):
def test_constructor_no_queue(self):
with self.assertRaises(pipeline_handler.NotabeneException) as e:
pipeline_handler.NotabeneHandler()
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
def test_constructor_queue(self, cm):
cm.return_value = (1, 2)
kw = {'queue_name': 'foo'}
h = pipeline_handler.NotabeneHandler(**kw)
self.assertIsNotNone(h.connection)
self.assertIsNotNone(h.exchange)
self.assertEquals(h.env_keys, [])
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
def test_constructor_env_keys(self, cm):
cm.return_value = (1, 2)
kw = {'queue_name': 'foo', 'env_keys': ['x', 'y']}
h = pipeline_handler.NotabeneHandler(**kw)
self.assertIsNotNone(h.connection)
self.assertIsNotNone(h.exchange)
self.assertEquals(h.env_keys, ['x', 'y'])
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
def test_handle_events(self, cm):
cm.return_value = (1, 2)
kw = {'queue_name': 'foo', 'env_keys': ['x', 'y', 'z']}
h = pipeline_handler.NotabeneHandler(**kw)
events = range(5)
env = {'x': ['cat', 'dog'], 'y': ['fish']}
ret = h.handle_events(events, env)
self.assertEquals(ret, events)
self.assertEquals(h.pending_notifications, ['cat', 'dog', 'fish'])
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
def test_format_notification(self, cm):
cm.return_value = (1, 2)
kw = {'queue_name': 'foo'}
h = pipeline_handler.NotabeneHandler(**kw)
notification = {}
n = h._format_notification(notification)
self.assertEquals(n, {'event_type': None,
'message_id': None,
'publisher_id': 'stv3',
'timestamp': 'None',
'payload': {}})
now = datetime.datetime.utcnow()
notification = {'event_type': 'name',
'message_id': '1234',
'timestamp': now,
'service': 'tests'}
n = h._format_notification(notification)
self.assertEquals(n, {'event_type': 'name',
'message_id': '1234',
'timestamp': str(now),
'publisher_id': 'tests',
'payload': {}})
notification = {'event_type': 'name',
'message_id': '1234',
'timestamp': now,
'service': 'tests',
'extra1': 'e1', 'extra2': 'e2'}
n = h._format_notification(notification)
self.assertEquals(n, {'event_type': 'name',
'message_id': '1234',
'timestamp': str(now),
'publisher_id': 'tests',
'payload': {'extra1': 'e1', 'extra2': 'e2'}})
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
def test_commit(self, cm):
cm.return_value = (1, 2)
kw = {'queue_name': 'foo'}
h = pipeline_handler.NotabeneHandler(**kw)
h.pending_notifications = range(2)
with mock.patch.object(h, '_format_notification') as fn:
fn.return_value = {'event_type': 'event1'}
with mock.patch.object(pipeline_handler.driver,
'send_notification') as sn:
h.commit()
self.assertEquals(sn.call_count, 2)
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
def test_commit_exception(self, cm):
cm.return_value = (1, 2)
kw = {'queue_name': 'foo'}
h = pipeline_handler.NotabeneHandler(**kw)
h.pending_notifications = range(2)
with mock.patch.object(h, '_format_notification') as fn:
fn.return_value = {'event_type': 'event1'}
with mock.patch.object(pipeline_handler.driver,
'send_notification') as sn:
sn.side_effect = TestException
with mock.patch.object(pipeline_handler.logger,
'exception') as ex:
h.commit()
self.assertEquals(ex.call_count, 2)
self.assertEquals(sn.call_count, 2)

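The get_connection tests above lean on one detail worth spelling out: the connection and exchange parameters arrive as dicts, which cannot themselves be used as dictionary keys, so ConnectionManager keys its pool on tuples of sorted (key, value) pairs instead. A minimal standalone sketch of that keying pattern follows; the names and values are illustrative, not part of winchester.

# Two dicts with the same contents always sort to the same tuple of items,
# and that tuple, unlike the dict itself, is hashable.
conn_params = {'host': 'my_host', 'port': 123, 'user': 'my_user'}
key = tuple(sorted(conn_params.items()))

pool = {}
pool.setdefault(key, 'placeholder for a real connection')

same_params = {'user': 'my_user', 'host': 'my_host', 'port': 123}
assert tuple(sorted(same_params.items())) in pool   # same params -> cache hit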

@@ -285,9 +285,11 @@ class TestUsageHandler(unittest.TestCase):
env = {'stream_id': 123}
raw = [{'event_type': 'foo'}]
events = self.handler.handle_events(raw, env)
-self.assertEquals(2, len(events))
+self.assertEquals(1, len(events))
+notifications = env['usage_notifications']
+self.assertEquals(1, len(notifications))
self.assertEquals("compute.instance.exists.failed",
-                 events[1]['event_type'])
+                 notifications[0]['event_type'])
@mock.patch.object(pipeline_handler.UsageHandler, '_process_block')
def test_handle_events_exists(self, pb):
@@ -306,7 +308,9 @@ class TestUsageHandler(unittest.TestCase):
{'event_type': 'foo'},
]
events = self.handler.handle_events(raw, env)
-self.assertEquals(4, len(events))
+self.assertEquals(3, len(events))
+notifications = env['usage_notifications']
+self.assertEquals(1, len(notifications))
self.assertEquals("compute.instance.exists.failed",
-                 events[3]['event_type'])
+                 notifications[0]['event_type'])
self.assertTrue(pb.called)


@@ -4,6 +4,8 @@ import logging
import six
import uuid
from notabene import kombu_driver as driver
logger = logging.getLogger(__name__)
@@ -92,6 +94,136 @@ class LoggingHandler(PipelineHandlerBase):
pass
class NotabeneException(Exception):
pass
class ConnectionManager(object):
def __init__(self):
# {connection_properties:
# {exchange_properties: (connection, exchange)}}
self.pool = {}
def _extract_params(self, kw):
host = kw.get('host', 'localhost')
user = kw.get('user', 'guest')
password = kw.get('password', 'guest')
port = kw.get('port', 5672)
vhost = kw.get('vhost', '/')
library = kw.get('library', 'librabbitmq')
exchange_name = kw.get('exchange')
exchange_type = kw.get('exchange_type', 'topic')
if exchange_name is None:
raise NotabeneException("No 'exchange' name provided")
connection_dict = {'host': host, 'port': port,
'user': user, 'password': password,
'library': library, 'vhost': vhost}
connection_tuple = tuple(sorted(connection_dict.items()))
exchange_dict = {'exchange_name': exchange_name,
'exchange_type': exchange_type}
exchange_tuple = tuple(sorted(exchange_dict.items()))
return (connection_dict, connection_tuple,
exchange_dict, exchange_tuple)
def get_connection(self, properties, queue_name):
connection_dict, connection_tuple, \
exchange_dict, exchange_tuple = self._extract_params(properties)
connection_info = self.pool.get(connection_tuple)
if connection_info is None:
connection = driver.create_connection(connection_dict['host'],
connection_dict['port'],
connection_dict['user'],
connection_dict['password'],
connection_dict['library'],
connection_dict['vhost'])
connection_info = (connection, {})
self.pool[connection_tuple] = connection_info
connection, exchange_pool = connection_info
exchange = exchange_pool.get(exchange_tuple)
if exchange is None:
exchange = driver.create_exchange(exchange_dict['exchange_name'],
exchange_dict['exchange_type'])
exchange_pool[exchange_tuple] = exchange
# Make sure the queue exists so we don't lose events.
queue = driver.create_queue(queue_name, exchange, queue_name,
channel=connection.channel())
queue.declare()
return (connection, exchange)
# Global ConnectionManager. Shared by all Handlers.
connection_manager = ConnectionManager()
class NotabeneHandler(PipelineHandlerBase):
# Handlers are created per stream, so we have to be smart about
# things like connections to databases and queues.
# We don't want to create too many connections, and we have to
# remember that stream processing has to occur quickly, so
# we want to avoid round-trips where possible.
def __init__(self, **kw):
super(NotabeneHandler, self).__init__(**kw)
global connection_manager
self.queue_name = kw.get('queue_name')
if self.queue_name is None:
raise NotabeneException("No 'queue_name' provided")
self.connection, self.exchange = connection_manager.get_connection(
kw, self.queue_name)
self.env_keys = kw.get('env_keys', [])
def handle_events(self, events, env):
keys = [key for key in self.env_keys]
self.pending_notifications = []
for key in keys:
self.pending_notifications.extend(env.get(key, []))
return events
def _format_notification(self, notification):
"""Core traits are in the root of the notification and extra
traits go in the payload."""
core_keys = ['event_type', 'message_id', 'timestamp', 'service']
core = dict((key, notification.get(key)) for key in core_keys)
payload = dict((key, notification[key])
for key in notification.keys()
if key not in core_keys)
core['payload'] = payload
# Notifications require "publisher_id", not "service" ...
publisher = core.get('service')
if not publisher:
publisher = "stv3"
core['publisher_id'] = publisher
del core['service']
core['timestamp'] = str(core['timestamp'])
return core
def commit(self):
for notification in self.pending_notifications:
notification = self._format_notification(notification)
logger.debug("Publishing '%s' to '%s' with routing_key '%s'" %
(notification['event_type'], self.exchange,
self.queue_name))
try:
driver.send_notification(notification, self.queue_name,
self.connection, self.exchange)
except Exception as e:
logger.exception(e)
def rollback(self):
pass
class UsageException(Exception):
def __init__(self, code, message):
super(UsageException, self).__init__(message)
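For reference, here is what _format_notification produces for a notification carrying one extra trait; the values mirror the tests above, and the instance_id trait is purely illustrative. Core traits stay at the top level, 'service' becomes 'publisher_id' (defaulting to 'stv3' when it is missing or empty), the timestamp is stringified, and everything else is folded into 'payload'.

import datetime

notification = {'event_type': 'compute.instance.exists.failed',
                'message_id': '1234',
                'timestamp': datetime.datetime(2015, 2, 10, 22, 29, 26),
                'service': 'tests',
                'instance_id': 'abc'}    # not a core trait -> goes to payload
# _format_notification(notification) returns:
# {'event_type': 'compute.instance.exists.failed',
#  'message_id': '1234',
#  'timestamp': '2015-02-10 22:29:26',
#  'publisher_id': 'tests',
#  'payload': {'instance_id': 'abc'}}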
@@ -309,7 +441,7 @@ class UsageHandler(PipelineHandlerBase):
}
new_events.append(new_event)
-events.extend(new_events)
+env['usage_notifications'] = new_events
return events
def commit(self):
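Taken together with the UsageHandler change just above, the intended flow is: an upstream handler leaves its synthetic notifications in the pipeline env (here under 'usage_notifications'), and a NotabeneHandler whose env_keys names that key collects them for publishing when commit() runs. A minimal usage sketch, with the broker connection mocked out the same way the tests do it:

import mock
from winchester import pipeline_handler

# Patch the shared ConnectionManager so no real broker is needed.
with mock.patch.object(pipeline_handler.connection_manager, 'get_connection',
                       return_value=(mock.MagicMock(), mock.MagicMock())):
    handler = pipeline_handler.NotabeneHandler(
        queue_name='foo', env_keys=['usage_notifications'])

env = {'usage_notifications': [{'event_type': 'compute.instance.exists.failed'}]}
events = handler.handle_events([{'event_type': 'foo'}], env)
# handle_events() returns the event list untouched; the usage notification now
# sits on handler.pending_notifications, waiting for commit() to publish it.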


@@ -55,7 +55,7 @@ class Pipeline(object):
handler = handler_class(**params)
except Exception as e:
logger.exception("Error initalizing handler %s for pipeline %s" %
-                 handler_class, self.name)
+                 (handler_class, self.name))
raise PipelineExecutionError("Error loading pipeline", e)
self.handlers.append(handler)
@@ -186,8 +186,8 @@ class PipelineManager(object):
try:
plugins[name] = simport.load(cls_string)
except simport.ImportFailed as e:
-log.error("Could not load plugin %s: Import failed. %s" % (
-    name, e))
+logger.error("Could not load plugin %s: Import failed. %s" % (
+    name, e))
except (simport.MissingMethodOrFunction,
simport.MissingModule,
simport.BadDirectory) as e:
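The two pipeline.py hunks are small logging fixes. The first adds the parentheses that %-formatting needs: % binds tighter than the comma, so the old call tried to format the message with handler_class alone, which raises TypeError as soon as the format string has a second %s placeholder and masks the handler error it was trying to report. The second hunk points the plugin-loading error path at the module-level logger used elsewhere in pipeline.py instead of log. A quick illustration of the formatting pitfall, with illustrative values:

# '%' needs a tuple once there is more than one placeholder.
msg = "Error initializing handler %s for pipeline %s"

# msg % handler_class               -> TypeError: not enough arguments for format string
msg % ("SomeHandler", "pipeline_one")   # 'Error initializing handler SomeHandler for pipeline pipeline_one'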