From 7846f6a20f7b8efecfead1990c746d2952143dcc Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Wed, 4 Mar 2015 09:22:11 -0800 Subject: [PATCH] Add support for find_events() For use with API. You can find events based on traits, event_type or dates. Doesn't support wildcarding or anything just yet. Change-Id: Ide6e13086d50bacf356d8e36b9de257db9d543ca --- setup.cfg | 2 +- tests/test_db.py | 45 +++++++++++++++++++++++++++ winchester/db/interface.py | 64 +++++++++++++++++++++++++++++++++++--- 3 files changed, 105 insertions(+), 6 deletions(-) diff --git a/setup.cfg b/setup.cfg index e3fdc63..8d77162 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,7 @@ [metadata] description-file = README.md name = winchester -version = 0.31 +version = 0.4 author = Monsyne Dragon author_email = mdragon@rackspace.com summary = An OpenStack notification event processing library. diff --git a/tests/test_db.py b/tests/test_db.py index 161cfe7..abdb800 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -156,6 +156,10 @@ class TestDB(unittest.TestCase): load_fixture_data(self.db, TEST_DATA) logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) + self.events = {} + for event in TEST_DATA[1]['event']: + self.events[event['message_id']] = event + def tearDown(self): logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING) self.db.close() @@ -475,3 +479,44 @@ class TestDB(unittest.TestCase): self.assertIn('_mark', stream) self.assertEqual(streams[0]['id'], 3) self.assertEqual(streams[1]['id'], 4) + + def test_find_events(self): + events = self.db.find_events() + self.assertEqual(4, len(events)) + for event in events: + self.assertTrue(event['message_id'] in self.events) + + def test_find_events_date_filter(self): + _from = datetime.datetime(2014,8,1,10) + _to = datetime.datetime(2014,8,1,16) + events = self.db.find_events(from_datetime=_from, to_datetime=_to) + self.assertEqual(2, len(events)) + msg_ids = [event['message_id'] for event in events] + for good in ['1234-5678-001', '1234-5678-002']: + self.assertTrue(good in msg_ids) + + def test_find_events_event_type(self): + events = self.db.find_events(event_name='test.otherthing.foo') + self.assertEqual(2, len(events)) + for event in events: + self.assertTrue(event['event_type'], 'test.otherthing.foo') + + def test_find_events_traits(self): + traits = {'memory_mb': 1024} + events = self.db.find_events(traits=traits) + self.assertEqual(1, len(events)) + self.assertTrue(events[0]['message_id'], '1234-5678-001') + + def test_find_events_limit(self): + events = self.db.find_events(limit=2) + self.assertEqual(2, len(events)) + + def test_find_events_mark(self): + events = self.db.find_events(limit=2) + self.assertEqual(2, len(events)) + msg_ids = [event['message_id'] for event in events] + + events = self.db.find_events(limit=2, mark=events[-1]['_mark']) + self.assertEqual(2, len(events)) + for event in events: + self.assertTrue(event['message_id'] not in msg_ids) diff --git a/winchester/db/interface.py b/winchester/db/interface.py index ab4c165..2b5b79f 100644 --- a/winchester/db/interface.py +++ b/winchester/db/interface.py @@ -47,7 +47,6 @@ def sessioned(func): class DBInterface(object): - @classmethod def config_description(cls): return dict(url=ConfigItem(required=True, @@ -102,14 +101,16 @@ class DBInterface(object): @sessioned def get_event_type(self, description, session=None): - t = session.query(models.EventType).filter(models.EventType.desc == description).first() + t = session.query(models.EventType).filter( + models.EventType.desc == description).first() if t is None: t = models.EventType(description) session.add(t) return t @sessioned - def create_event(self, message_id, event_type, generated, traits, session=None): + def create_event(self, message_id, event_type, generated, traits, + session=None): event_type = self.get_event_type(event_type, session=session) e = models.Event(message_id, event_type, generated) for name in traits: @@ -122,9 +123,61 @@ class DBInterface(object): e = session.query(models.Event).\ filter(models.Event.message_id == message_id).one() except NoResultFound: - raise NoSuchEventError("No event found with message_id %s!" % message_id) + raise NoSuchEventError( + "No event found with message_id %s!" % message_id) return e.as_dict + @sessioned + def find_events(self, from_datetime=None, to_datetime=None, + event_name=None, traits=None, mark=None, limit=None, + session=None): + + order_desc = True + + q = session.query(models.Event) + if mark is not None: + if mark.startswith('+'): + order_desc=False + mark = mark[1:] + if mark.startswith('-'): + order_desc=True + mark = mark[1:] + if mark: + if order_desc: + q = q.filter(models.Event.id < int(mark, 16)) + else: + q = q.filter(models.Event.id > int(mark, 16)) + if from_datetime is not None: + q = q.filter(models.Event.generated > from_datetime) + if to_datetime is not None: + q = q.filter(models.Event.generated <= to_datetime) + if event_name is not None: + event_type = self.get_event_type(event_name, + session=session) + q = q.filter(models.Event.event_type_id == event_type.id) + if traits is not None: + for name, val in traits.items(): + q = q.filter(models.Event.traits.any(and_( + models.Trait.name == name, + models.Trait.value == val))) + + if order_desc: + q = q.order_by(models.Event.id.desc()) + mark_fmt = '%x' + else: + q = q.order_by(models.Event.id.asc()) + mark_fmt = '+%x' + + if limit is not None: + q = q.limit(limit) + + event_info = [] + for event in q.all(): + info = event.as_dict + info['_mark'] = mark_fmt % event.id + event_info.append(info) + return event_info + @sessioned def get_stream_by_id(self, stream_id, session=None): try: @@ -135,7 +188,8 @@ class DBInterface(object): return s @sessioned - def create_stream(self, trigger_name, initial_event, dist_traits, expire_expr, session=None): + def create_stream(self, trigger_name, initial_event, dist_traits, + expire_expr, session=None): first_event_time = initial_event['timestamp'] s = models.Stream(trigger_name, first_event_time) for trait_name in dist_traits: