import unittest2 as unittest

import datetime
import mock

from winchester import pipeline_handler


class TestConnectionManager(unittest.TestCase):
    """Tests for pipeline_handler.ConnectionManager."""

    def setUp(self):
        super(TestConnectionManager, self).setUp()
        self.mgr = pipeline_handler.ConnectionManager()

    def test_extract_params(self):
        # The exchange name is the only mandatory setting.
        with self.assertRaises(pipeline_handler.NotabeneException):
            self.mgr._extract_params({})

        # Only the exchange given: everything else falls back to defaults.
        cd, ct, ed, et = self.mgr._extract_params({'exchange': 'my_exchange'})

        self.assertEqual(cd, {'host': 'localhost',
                              'port': 5672,
                              'user': 'guest',
                              'password': 'guest',
                              'library': 'librabbitmq',
                              'vhost': '/'})

        self.assertEqual(ct, (('host', 'localhost'),
                              ('library', 'librabbitmq'),
                              ('password', 'guest'),
                              ('port', 5672),
                              ('user', 'guest'),
                              ('vhost', '/')))

        self.assertEqual(ed, {'exchange_name': 'my_exchange',
                              'exchange_type': 'topic'})

        self.assertEqual(et, (('exchange_name', 'my_exchange'),
                              ('exchange_type', 'topic')))

        # Every setting supplied explicitly: nothing is defaulted.
        kw = {'host': 'my_host', 'user': 'my_user', 'password': 'pwd',
              'port': 123, 'vhost': 'virtual', 'library': 'my_lib',
              'exchange': 'my_exchange', 'exchange_type': 'foo'}

        cd, ct, ed, et = self.mgr._extract_params(kw)

        self.assertEqual(cd, {'host': 'my_host',
                              'port': 123,
                              'user': 'my_user',
                              'password': 'pwd',
                              'library': 'my_lib',
                              'vhost': 'virtual'})

        self.assertEqual(ct, (('host', 'my_host'),
                              ('library', 'my_lib'),
                              ('password', 'pwd'),
                              ('port', 123),
                              ('user', 'my_user'),
                              ('vhost', 'virtual')))

        self.assertEqual(ed, {'exchange_name': 'my_exchange',
                              'exchange_type': 'foo'})

        self.assertEqual(et, (('exchange_name', 'my_exchange'),
                              ('exchange_type', 'foo')))

    @mock.patch.object(pipeline_handler.ConnectionManager, '_extract_params')
    @mock.patch.object(pipeline_handler.driver, 'create_connection')
    @mock.patch.object(pipeline_handler.driver, 'create_exchange')
    @mock.patch.object(pipeline_handler.driver, 'create_queue')
    def test_get_connection(self, cq, ce, cc, ep):
        conn = {'host': 'my_host', 'user': 'my_user', 'password': 'pwd',
                'port': 123, 'vhost': 'virtual', 'library': 'my_lib'}
        conn_set = tuple(sorted(conn.items()))
        exchange = {'exchange_name': 'my_exchange', 'exchange_type': 'foo'}
        exchange_set = tuple(sorted(exchange.items()))

        ep.return_value = (conn, conn_set, exchange, exchange_set)
        # Labels go in name=; a bare positional argument would be taken
        # as the mock's spec instead.
        connection = mock.MagicMock(name="the connection")
        channel = mock.MagicMock(name="the channel")
        connection.channel = channel
        cc.return_value = connection
        mexchange = mock.MagicMock(name="the exchange")
        ce.return_value = mexchange
        queue = mock.MagicMock(name="the queue")
        queue.declare = mock.MagicMock()
        cq.return_value = queue

        final_connection, final_exchange = self.mgr.get_connection({}, "foo")

        self.assertEqual(final_connection, connection)
        self.assertEqual(final_exchange, mexchange)
        self.assertEqual(1, queue.declare.call_count)

        # Calling again should give the same results ...
        final_connection, final_exchange = self.mgr.get_connection({}, "foo")

        self.assertEqual(final_connection, connection)
        self.assertEqual(final_exchange, mexchange)
        self.assertTrue(queue.declare.called)
        self.assertEqual(1, queue.declare.call_count)

        # Change the exchange, and we should have same connection, but new
        # exchange object.
        exchange2 = {'exchange_name': 'my_exchange2', 'exchange_type': 'foo2'}
        exchange2_set = tuple(sorted(exchange2.items()))

        ep.return_value = (conn, conn_set, exchange2, exchange2_set)
        mexchange2 = mock.MagicMock(name="the exchange 2")
        ce.return_value = mexchange2

        final_connection, final_exchange = self.mgr.get_connection({}, "foo")

        self.assertEqual(final_connection, connection)
        self.assertEqual(final_exchange, mexchange2)
        self.assertEqual(2, queue.declare.call_count)

        # Change the connection, and we should have a new connection and new
        # exchange object.
        conn2 = {'host': 'my_host2', 'user': 'my_user2', 'password': 'pwd2',
                 'port': 1234, 'vhost': 'virtual2', 'library': 'my_lib2'}
        conn2_set = tuple(sorted(conn2.items()))
        exchange3 = {'exchange_name': 'my_exchange', 'exchange_type': 'foo'}
        exchange3_set = tuple(sorted(exchange3.items()))

        ep.return_value = (conn2, conn2_set, exchange3, exchange3_set)
        mexchange3 = mock.MagicMock(name="the exchange 3")
        ce.return_value = mexchange3

        connection2 = mock.MagicMock(name="the connection 2")
        channel2 = mock.MagicMock(name="the channel 2")
        connection2.channel = channel2
        cc.return_value = connection2

        final_connection, final_exchange = self.mgr.get_connection({}, "foo")

        self.assertEqual(final_connection, connection2)
        self.assertEqual(final_exchange, mexchange3)
        self.assertEqual(3, queue.declare.call_count)


class TestException(Exception):
    """Stand-in error raised by the mocked driver in the commit tests."""


class TestNotabeneHandler(unittest.TestCase):
    """Tests for pipeline_handler.NotabeneHandler."""

    def test_constructor_no_queue(self):
        with self.assertRaises(pipeline_handler.NotabeneException):
            pipeline_handler.NotabeneHandler()

    @mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
    def test_constructor_queue(self, cm):
        cm.return_value = (1, 2)
        kw = {'queue_name': 'foo'}
        h = pipeline_handler.NotabeneHandler(**kw)
        self.assertIsNotNone(h.connection)
        self.assertIsNotNone(h.exchange)
        self.assertEqual(h.env_keys, [])

    @mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
    def test_constructor_env_keys(self, cm):
        cm.return_value = (1, 2)
        kw = {'queue_name': 'foo', 'env_keys': ['x', 'y']}
        h = pipeline_handler.NotabeneHandler(**kw)
        self.assertIsNotNone(h.connection)
        self.assertIsNotNone(h.exchange)
        self.assertEqual(h.env_keys, ['x', 'y'])

    @mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
    def test_handle_events(self, cm):
        cm.return_value = (1, 2)
        # 'z' is deliberately absent from env: missing keys contribute nothing.
        kw = {'queue_name': 'foo', 'env_keys': ['x', 'y', 'z']}
        h = pipeline_handler.NotabeneHandler(**kw)
        events = list(range(5))
        env = {'x': ['cat', 'dog'], 'y': ['fish']}
        ret = h.handle_events(events, env)
        self.assertEqual(ret, events)
        self.assertEqual(h.pending_notifications, ['cat', 'dog', 'fish'])

    @mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
    def test_format_notification(self, cm):
        cm.return_value = (1, 2)
        kw = {'queue_name': 'foo'}
        h = pipeline_handler.NotabeneHandler(**kw)

        # Empty notification: core traits default to None, publisher to
        # 'stv3', and the timestamp is stringified.
        notification = {}
        n = h._format_notification(notification)
        self.assertEqual(n, {'event_type': None,
                             'message_id': None,
                             'publisher_id': 'stv3',
                             'timestamp': 'None',
                             'payload': {}})

        # Core traits only: 'service' is renamed to 'publisher_id'.
        now = datetime.datetime.utcnow()
        notification = {'event_type': 'name',
                        'message_id': '1234',
                        'timestamp': now,
                        'service': 'tests'}
        n = h._format_notification(notification)
        self.assertEqual(n, {'event_type': 'name',
                             'message_id': '1234',
                             'timestamp': str(now),
                             'publisher_id': 'tests',
                             'payload': {}})

        # Non-core traits end up in the payload.
        notification = {'event_type': 'name',
                        'message_id': '1234',
                        'timestamp': now,
                        'service': 'tests',
                        'extra1': 'e1', 'extra2': 'e2'}
        n = h._format_notification(notification)
        self.assertEqual(n, {'event_type': 'name',
                             'message_id': '1234',
                             'timestamp': str(now),
                             'publisher_id': 'tests',
                             'payload': {'extra1': 'e1', 'extra2': 'e2'}})

    @mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
    def test_commit(self, cm):
        cm.return_value = (1, 2)
        kw = {'queue_name': 'foo'}
        h = pipeline_handler.NotabeneHandler(**kw)

        h.pending_notifications = list(range(2))
        with mock.patch.object(h, '_format_notification') as fn:
            fn.return_value = {'event_type': 'event1'}
            with mock.patch.object(pipeline_handler.driver,
                                   'send_notification') as sn:
                h.commit()
                self.assertEqual(sn.call_count, 2)

    # This test was previously also named test_commit, which shadowed the
    # success-path test above so that it never ran.
    @mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
    def test_commit_send_failure(self, cm):
        cm.return_value = (1, 2)
        kw = {'queue_name': 'foo'}
        h = pipeline_handler.NotabeneHandler(**kw)

        h.pending_notifications = list(range(2))
        with mock.patch.object(h, '_format_notification') as fn:
            fn.return_value = {'event_type': 'event1'}
            with mock.patch.object(pipeline_handler.driver,
                                   'send_notification') as sn:
                sn.side_effect = TestException
                with mock.patch.object(pipeline_handler.logger,
                                       'exception') as ex:
                    h.commit()
                    # Each failure is logged and the loop carries on.
                    self.assertEqual(ex.call_count, 2)
                    self.assertEqual(sn.call_count, 2)
class NotabeneException(Exception):
    """Raised when a required Notabene setting is missing."""


class ConnectionManager(object):
    """Cache of driver connections and exchanges, keyed by their settings.

    `driver` used below is the module-level
    `from notabene import kombu_driver as driver` import.
    """

    def __init__(self):
        # {connection_properties:
        #     (connection, {exchange_properties: exchange})}
        self.pool = {}
        # (connection_properties, exchange_properties, queue_name) for
        # every queue that has already been declared.
        self.declared_queues = set()

    def _extract_params(self, kw):
        """Split handler keyword arguments into connection/exchange settings.

        Returns a 4-tuple ``(connection_dict, connection_tuple,
        exchange_dict, exchange_tuple)``; each tuple is the sorted items of
        the matching dict, so it can be used as a dictionary key.

        Raises NotabeneException when `kw` has no 'exchange' entry.
        """
        exchange_name = kw.get('exchange')
        if exchange_name is None:
            raise NotabeneException("No 'exchange' name provided")

        connection_dict = {'host': kw.get('host', 'localhost'),
                           'port': kw.get('port', 5672),
                           'user': kw.get('user', 'guest'),
                           'password': kw.get('password', 'guest'),
                           'library': kw.get('library', 'librabbitmq'),
                           'vhost': kw.get('vhost', '/')}
        connection_tuple = tuple(sorted(connection_dict.items()))

        exchange_dict = {'exchange_name': exchange_name,
                         'exchange_type': kw.get('exchange_type', 'topic')}
        exchange_tuple = tuple(sorted(exchange_dict.items()))

        return (connection_dict, connection_tuple,
                exchange_dict, exchange_tuple)

    def get_connection(self, properties, queue_name):
        """Return a ``(connection, exchange)`` pair for `properties`.

        Connections and exchanges are created through the driver on first
        use and reused afterwards. The queue `queue_name` is declared once
        per (connection, exchange, queue name) combination.

        Raises NotabeneException when `properties` has no 'exchange' entry.
        """
        (connection_dict, connection_tuple,
         exchange_dict, exchange_tuple) = self._extract_params(properties)

        connection_info = self.pool.get(connection_tuple)
        if connection_info is None:
            connection = driver.create_connection(
                connection_dict['host'],
                connection_dict['port'],
                connection_dict['user'],
                connection_dict['password'],
                connection_dict['library'],
                connection_dict['vhost'])
            connection_info = (connection, {})
            self.pool[connection_tuple] = connection_info
        connection, exchange_pool = connection_info

        exchange = exchange_pool.get(exchange_tuple)
        if exchange is None:
            exchange = driver.create_exchange(exchange_dict['exchange_name'],
                                              exchange_dict['exchange_type'])
            exchange_pool[exchange_tuple] = exchange

        # Make sure the queue exists so we don't lose events. This is
        # tracked per queue name rather than per exchange, so that a second
        # queue on an already-cached exchange is still declared.
        queue_key = (connection_tuple, exchange_tuple, queue_name)
        if queue_key not in self.declared_queues:
            queue = driver.create_queue(queue_name, exchange, queue_name,
                                        channel=connection.channel())
            queue.declare()
            self.declared_queues.add(queue_key)

        return (connection, exchange)


# Global ConnectionManager. Shared by all Handlers.
connection_manager = ConnectionManager()
class NotabeneHandler(PipelineHandlerBase):
    """Pipeline handler that publishes notifications found in the env.

    Keyword arguments read here:
      queue_name -- required; also used as the routing key when sending.
      env_keys   -- optional list of env keys whose values are lists of
                    notifications to publish (defaults to []).
    The full keyword dict is also handed to
    connection_manager.get_connection().
    """
    # Handlers are created per stream, so we have to be smart about
    # things like connections to databases and queues.
    # We don't want to create too many connections, and we have to
    # remember that stream processing has to occur quickly, so
    # we want to avoid round-trips where possible.

    def __init__(self, **kw):
        super(NotabeneHandler, self).__init__(**kw)

        self.queue_name = kw.get('queue_name')
        if self.queue_name is None:
            raise NotabeneException("No 'queue_name' provided")
        self.connection, self.exchange = connection_manager.get_connection(
            kw, self.queue_name)

        self.env_keys = kw.get('env_keys', [])
        # Start empty so commit() is safe before any handle_events() call.
        self.pending_notifications = []

    def handle_events(self, events, env):
        """Collect the notifications to publish; return `events` unchanged.

        Replaces any previously pending notifications with the
        concatenated lists found under each of `self.env_keys` in `env`;
        keys missing from `env` contribute nothing.
        """
        self.pending_notifications = []
        for key in self.env_keys:
            self.pending_notifications.extend(env.get(key, []))
        return events

    def _format_notification(self, notification):
        """Core traits are in the root of the notification and extra
        traits go in the payload.

        Returns a new dict; `notification` itself is not modified.
        """
        core_keys = ('event_type', 'message_id', 'timestamp', 'service')
        core = dict((key, notification.get(key)) for key in core_keys)

        core['payload'] = dict((key, value)
                               for key, value in notification.items()
                               if key not in core_keys)

        # Notifications require "publisher_id", not "service" ...
        core['publisher_id'] = core.pop('service') or "stv3"

        # A missing timestamp therefore becomes the string 'None'.
        core['timestamp'] = str(core['timestamp'])
        return core

    def commit(self):
        """Publish every pending notification through the driver.

        A failed send is logged and the remaining notifications are still
        attempted; nothing is raised to the caller for send failures.
        """
        for notification in self.pending_notifications:
            notification = self._format_notification(notification)
            logger.debug("Publishing '%s' to '%s' with routing_key '%s'",
                         notification['event_type'], self.exchange,
                         self.queue_name)
            try:
                driver.send_notification(notification, self.queue_name,
                                         self.connection, self.exchange)
            except Exception as e:
                logger.exception(e)

    def rollback(self):
        """Nothing to undo."""
        pass