From 4cb16327c7dc28791a36046d399effad9416e27b Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Wed, 25 Mar 2015 12:20:20 -0700 Subject: [PATCH] NotabeneHandler no longer formats notifications. It just passes it through verbatim. Only required key is 'event_type'. Change-Id: Idcd911c9f09d405d57ecaf5b14b72bfebe2a179a --- setup.cfg | 2 +- tests/test_notabene.py | 71 ++++++++-------------------------- winchester/pipeline_handler.py | 34 ++++------------ 3 files changed, 24 insertions(+), 83 deletions(-) diff --git a/setup.cfg b/setup.cfg index 63a0b8f..9cebbfd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,7 @@ [metadata] description-file = README.md name = winchester -version = 0.5 +version = 0.51 author = Monsyne Dragon author_email = mdragon@rackspace.com summary = An OpenStack notification event processing library. diff --git a/tests/test_notabene.py b/tests/test_notabene.py index 422f66f..2215944 100644 --- a/tests/test_notabene.py +++ b/tests/test_notabene.py @@ -180,41 +180,17 @@ class TestNotabeneHandler(unittest.TestCase): self.assertEquals(h.pending_notifications, ['cat', 'dog', 'fish']) @mock.patch.object(pipeline_handler.connection_manager, 'get_connection') - def test_format_notification(self, cm): + def test_commit_good(self, cm): cm.return_value = (1, 2) kw = {'queue_name': 'foo'} h = pipeline_handler.NotabeneHandler(**kw) - notification = {} - n = h._format_notification(notification) - self.assertEquals(n, {'event_type': None, - 'message_id': None, - 'publisher_id': 'stv3', - 'timestamp': 'None', - 'payload': {}}) - now = datetime.datetime.utcnow() - notification = {'event_type': 'name', - 'message_id': '1234', - 'timestamp': now, - 'service': 'tests'} - n = h._format_notification(notification) - self.assertEquals(n, {'event_type': 'name', - 'message_id': '1234', - 'timestamp': str(now), - 'publisher_id': 'tests', - 'payload': {}}) - - notification = {'event_type': 'name', - 'message_id': '1234', - 'timestamp': now, - 'service': 'tests', - 'extra1': 'e1', 'extra2': 'e2'} - n = h._format_notification(notification) - self.assertEquals(n, {'event_type': 'name', - 'message_id': '1234', - 'timestamp': str(now), - 'publisher_id': 'tests', - 'payload': {'extra1': 'e1', 'extra2': 'e2'}}) + h.pending_notifications = [{'event_type': 'event1'}, + {'event_type': 'event2'}] + with mock.patch.object(pipeline_handler.driver, + 'send_notification') as sn: + h.commit() + self.assertEquals(sn.call_count, 2) @mock.patch.object(pipeline_handler.connection_manager, 'get_connection') def test_commit(self, cm): @@ -222,28 +198,13 @@ class TestNotabeneHandler(unittest.TestCase): kw = {'queue_name': 'foo'} h = pipeline_handler.NotabeneHandler(**kw) - h.pending_notifications = range(2) - with mock.patch.object(h, '_format_notification') as fn: - fn.return_value = {'event_type': 'event1'} - with mock.patch.object(pipeline_handler.driver, - 'send_notification') as sn: + h.pending_notifications = [{'event_type': 'event1'}, + {'event_type': 'event2'}] + with mock.patch.object(pipeline_handler.driver, + 'send_notification') as sn: + sn.side_effect = TestException + with mock.patch.object(pipeline_handler.logger, + 'exception') as ex: h.commit() - self.assertEquals(sn.call_count, 2) - - @mock.patch.object(pipeline_handler.connection_manager, 'get_connection') - def test_commit(self, cm): - cm.return_value = (1, 2) - kw = {'queue_name': 'foo'} - h = pipeline_handler.NotabeneHandler(**kw) - - h.pending_notifications = range(2) - with mock.patch.object(h, '_format_notification') as fn: - fn.return_value = {'event_type': 'event1'} - with mock.patch.object(pipeline_handler.driver, - 'send_notification') as sn: - sn.side_effect = TestException - with mock.patch.object(pipeline_handler.logger, - 'exception') as ex: - h.commit() - self.assertEquals(ex.call_count, 2) - self.assertEquals(sn.call_count, 2) + self.assertEquals(ex.call_count, 2) + self.assertEquals(sn.call_count, 2) diff --git a/winchester/pipeline_handler.py b/winchester/pipeline_handler.py index 7591fa9..fe1bb1a 100644 --- a/winchester/pipeline_handler.py +++ b/winchester/pipeline_handler.py @@ -186,32 +186,9 @@ class NotabeneHandler(PipelineHandlerBase): self.pending_notifications.extend(env.get(key, [])) return events - def _format_notification(self, notification): - """Core traits are in the root of the notification and extra - traits go in the payload.""" - core_keys = ['event_type', 'message_id', 'timestamp', 'service'] - core = dict((key, notification.get(key)) for key in core_keys) - - payload = dict((key, notification[key]) - for key in notification.keys() - if key not in core_keys) - - core['payload'] = payload - - # Notifications require "publisher_id", not "service" ... - publisher = core.get('service') - if not publisher: - publisher = "stv3" - core['publisher_id'] = publisher - del core['service'] - - core['timestamp'] = str(core['timestamp']) - return core - def commit(self): for notification in self.pending_notifications: - notification = self._format_notification(notification) - logger.debug("Publishing '%s' to '%s' with routing_key '%s'" % + logger.info("Publishing '%s' to '%s' with routing_key '%s'" % (notification['event_type'], self.exchange, self.queue_name)) try: @@ -410,7 +387,8 @@ class UsageHandler(PipelineHandlerBase): apb, ape, len(block))) if len(block) > 1: - logger.warn("Events for Stream: %s" % self.stream_id) + logger.warn("%s - events (stream: %s)" + % (event_type, self.stream_id)) for event in block: logger.warn("^Event: %s - %s" % (event['timestamp'], event['event_type'])) @@ -420,6 +398,7 @@ class UsageHandler(PipelineHandlerBase): if self.warnings: instance_id = exists.get('instance_id', 'n/a') warning_event = {'event_type': 'compute.instance.exists.warnings', + 'publisher_id': 'stv3', 'message_id': str(uuid.uuid4()), 'timestamp': exists.get('timestamp', datetime.datetime.utcnow()), @@ -431,8 +410,9 @@ class UsageHandler(PipelineHandlerBase): new_event = self._base_notification(exists) new_event.update({'event_type': event_type, 'message_id': str(uuid.uuid4()), - 'timestamp': str(exists.get('timestamp', - datetime.datetime.utcnow())), + 'publisher_id': 'stv3', + 'timestamp': exists.get('timestamp', + datetime.datetime.utcnow()), 'stream_id': int(self.stream_id), 'error': str(error), 'error_code': error and error.code})