Add support for find_events()
For use with API. You can find events based on traits, event_type or dates. Doesn't support wildcarding or anything just yet. Change-Id: Ide6e13086d50bacf356d8e36b9de257db9d543ca
This commit is contained in:
parent
4875e419a6
commit
7846f6a20f
@ -1,7 +1,7 @@
|
||||
[metadata]
|
||||
description-file = README.md
|
||||
name = winchester
|
||||
version = 0.31
|
||||
version = 0.4
|
||||
author = Monsyne Dragon
|
||||
author_email = mdragon@rackspace.com
|
||||
summary = An OpenStack notification event processing library.
|
||||
|
@ -156,6 +156,10 @@ class TestDB(unittest.TestCase):
|
||||
load_fixture_data(self.db, TEST_DATA)
|
||||
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
|
||||
|
||||
self.events = {}
|
||||
for event in TEST_DATA[1]['event']:
|
||||
self.events[event['message_id']] = event
|
||||
|
||||
def tearDown(self):
|
||||
logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING)
|
||||
self.db.close()
|
||||
@ -475,3 +479,44 @@ class TestDB(unittest.TestCase):
|
||||
self.assertIn('_mark', stream)
|
||||
self.assertEqual(streams[0]['id'], 3)
|
||||
self.assertEqual(streams[1]['id'], 4)
|
||||
|
||||
def test_find_events(self):
|
||||
events = self.db.find_events()
|
||||
self.assertEqual(4, len(events))
|
||||
for event in events:
|
||||
self.assertTrue(event['message_id'] in self.events)
|
||||
|
||||
def test_find_events_date_filter(self):
|
||||
_from = datetime.datetime(2014,8,1,10)
|
||||
_to = datetime.datetime(2014,8,1,16)
|
||||
events = self.db.find_events(from_datetime=_from, to_datetime=_to)
|
||||
self.assertEqual(2, len(events))
|
||||
msg_ids = [event['message_id'] for event in events]
|
||||
for good in ['1234-5678-001', '1234-5678-002']:
|
||||
self.assertTrue(good in msg_ids)
|
||||
|
||||
def test_find_events_event_type(self):
|
||||
events = self.db.find_events(event_name='test.otherthing.foo')
|
||||
self.assertEqual(2, len(events))
|
||||
for event in events:
|
||||
self.assertTrue(event['event_type'], 'test.otherthing.foo')
|
||||
|
||||
def test_find_events_traits(self):
|
||||
traits = {'memory_mb': 1024}
|
||||
events = self.db.find_events(traits=traits)
|
||||
self.assertEqual(1, len(events))
|
||||
self.assertTrue(events[0]['message_id'], '1234-5678-001')
|
||||
|
||||
def test_find_events_limit(self):
|
||||
events = self.db.find_events(limit=2)
|
||||
self.assertEqual(2, len(events))
|
||||
|
||||
def test_find_events_mark(self):
|
||||
events = self.db.find_events(limit=2)
|
||||
self.assertEqual(2, len(events))
|
||||
msg_ids = [event['message_id'] for event in events]
|
||||
|
||||
events = self.db.find_events(limit=2, mark=events[-1]['_mark'])
|
||||
self.assertEqual(2, len(events))
|
||||
for event in events:
|
||||
self.assertTrue(event['message_id'] not in msg_ids)
|
||||
|
@ -47,7 +47,6 @@ def sessioned(func):
|
||||
|
||||
class DBInterface(object):
|
||||
|
||||
|
||||
@classmethod
|
||||
def config_description(cls):
|
||||
return dict(url=ConfigItem(required=True,
|
||||
@ -102,14 +101,16 @@ class DBInterface(object):
|
||||
|
||||
@sessioned
|
||||
def get_event_type(self, description, session=None):
|
||||
t = session.query(models.EventType).filter(models.EventType.desc == description).first()
|
||||
t = session.query(models.EventType).filter(
|
||||
models.EventType.desc == description).first()
|
||||
if t is None:
|
||||
t = models.EventType(description)
|
||||
session.add(t)
|
||||
return t
|
||||
|
||||
@sessioned
|
||||
def create_event(self, message_id, event_type, generated, traits, session=None):
|
||||
def create_event(self, message_id, event_type, generated, traits,
|
||||
session=None):
|
||||
event_type = self.get_event_type(event_type, session=session)
|
||||
e = models.Event(message_id, event_type, generated)
|
||||
for name in traits:
|
||||
@ -122,9 +123,61 @@ class DBInterface(object):
|
||||
e = session.query(models.Event).\
|
||||
filter(models.Event.message_id == message_id).one()
|
||||
except NoResultFound:
|
||||
raise NoSuchEventError("No event found with message_id %s!" % message_id)
|
||||
raise NoSuchEventError(
|
||||
"No event found with message_id %s!" % message_id)
|
||||
return e.as_dict
|
||||
|
||||
@sessioned
|
||||
def find_events(self, from_datetime=None, to_datetime=None,
|
||||
event_name=None, traits=None, mark=None, limit=None,
|
||||
session=None):
|
||||
|
||||
order_desc = True
|
||||
|
||||
q = session.query(models.Event)
|
||||
if mark is not None:
|
||||
if mark.startswith('+'):
|
||||
order_desc=False
|
||||
mark = mark[1:]
|
||||
if mark.startswith('-'):
|
||||
order_desc=True
|
||||
mark = mark[1:]
|
||||
if mark:
|
||||
if order_desc:
|
||||
q = q.filter(models.Event.id < int(mark, 16))
|
||||
else:
|
||||
q = q.filter(models.Event.id > int(mark, 16))
|
||||
if from_datetime is not None:
|
||||
q = q.filter(models.Event.generated > from_datetime)
|
||||
if to_datetime is not None:
|
||||
q = q.filter(models.Event.generated <= to_datetime)
|
||||
if event_name is not None:
|
||||
event_type = self.get_event_type(event_name,
|
||||
session=session)
|
||||
q = q.filter(models.Event.event_type_id == event_type.id)
|
||||
if traits is not None:
|
||||
for name, val in traits.items():
|
||||
q = q.filter(models.Event.traits.any(and_(
|
||||
models.Trait.name == name,
|
||||
models.Trait.value == val)))
|
||||
|
||||
if order_desc:
|
||||
q = q.order_by(models.Event.id.desc())
|
||||
mark_fmt = '%x'
|
||||
else:
|
||||
q = q.order_by(models.Event.id.asc())
|
||||
mark_fmt = '+%x'
|
||||
|
||||
if limit is not None:
|
||||
q = q.limit(limit)
|
||||
|
||||
event_info = []
|
||||
for event in q.all():
|
||||
info = event.as_dict
|
||||
info['_mark'] = mark_fmt % event.id
|
||||
event_info.append(info)
|
||||
return event_info
|
||||
|
||||
@sessioned
|
||||
def get_stream_by_id(self, stream_id, session=None):
|
||||
try:
|
||||
@ -135,7 +188,8 @@ class DBInterface(object):
|
||||
return s
|
||||
|
||||
@sessioned
|
||||
def create_stream(self, trigger_name, initial_event, dist_traits, expire_expr, session=None):
|
||||
def create_stream(self, trigger_name, initial_event, dist_traits,
|
||||
expire_expr, session=None):
|
||||
first_event_time = initial_event['timestamp']
|
||||
s = models.Stream(trigger_name, first_event_time)
|
||||
for trait_name in dist_traits:
|
||||
|
Loading…
x
Reference in New Issue
Block a user