Merge pull request #176 from dshulyak/orm_events

Orm events refactoring
This commit is contained in:
Jędrzej Nowak 2015-09-21 09:51:43 +02:00
commit f675955527
10 changed files with 232 additions and 90 deletions

1
.gitignore vendored

@ -36,3 +36,4 @@ vagrant-settings.yaml
.solar_cli_uids .solar_cli_uids
.ssh/ .ssh/
.cache

1
solar/.cache/v/cache/lastfailed vendored Normal file

@ -0,0 +1 @@
{}

@ -18,34 +18,36 @@ __all__ = ['add_dep', 'add_react']
import networkx as nx import networkx as nx
from solar.core.log import log from solar.core.log import log
from solar.interfaces.db import get_db from solar.interfaces import orm
from solar.events.controls import Dep, React, StateChange from solar.events.controls import Dep, React, StateChange
db = get_db()
def create_event(event_dict): def create_event(event_dict):
etype = event_dict.pop('etype') etype = event_dict['etype']
kwargs = {'child': event_dict['child'],
'parent': event_dict['parent'],
'child_action': event_dict['child_action'],
'parent_action': event_dict['parent_action'],
'state': event_dict['state']}
if etype == React.etype: if etype == React.etype:
return React(**event_dict) return React(**kwargs)
elif etype == Dep.etype: elif etype == Dep.etype:
return Dep(**event_dict) return Dep(**kwargs)
else: else:
raise Exception('No support for type %s', etype) raise Exception('No support for type %s', etype)
def add_event(ev): def add_event(ev):
rst = all_events(ev.parent_node) rst = all_events(ev.parent)
for rev in rst: for rev in rst:
if ev == rev: if ev == rev:
break break
else: else:
rst.append(ev) rst.append(ev)
db.create( resource_db = orm.DBResource.load(ev.parent)
ev.parent_node, event_db = orm.DBEvent(**ev.to_dict())
[i.to_dict() for i in rst], event_db.save()
collection=db.COLLECTIONS.events) resource_db.events.add(event_db)
def add_dep(parent, dep, actions, state='success'): def add_dep(parent, dep, actions, state='success'):
@ -59,36 +61,40 @@ def add_dep(parent, dep, actions, state='success'):
def add_react(parent, dep, actions, state='success'): def add_react(parent, dep, actions, state='success'):
for act in actions: for act in actions:
r = React(parent, act, state=state, r = React(parent, act, state=state,
depend_node=dep, depend_action=act) depend_node=dep, depend_action=act)
add_event(r) add_event(r)
log.debug('Added event: %s', r) log.debug('Added event: %s', r)
def add_events(resource, lst):
db_resource = orm.DBResource.load(resource)
for ev in lst:
event_db = orm.DBEvent(**ev.to_dict())
event_db.save()
db_resource.events.add(event_db)
def set_events(resource, lst): def set_events(resource, lst):
db.create( db_resource = orm.DBResource.load(resource)
resource, for ev in db_resource.events.as_set():
[i.to_dict() for i in lst], ev.delete()
collection=db.COLLECTIONS.events) for ev in lst:
event_db = orm.DBEvent(**ev.to_dict())
event_db.save()
db_resource.events.add(event_db)
def remove_event(ev): def remove_event(ev):
rst = all_events(ev.parent_node) event_db = orm.DBEvent(**ev.to_dict())
set_events(ev.parent_node, [it for it in rst if not it == ev]) event_db.delete()
def add_events(resource, lst):
rst = all_events(resource)
rst.extend(lst)
set_events(resource, rst)
def all_events(resource): def all_events(resource):
events = db.get(resource, collection=db.COLLECTIONS.events, events = orm.DBResource.load(resource).events.as_set()
return_empty=True)
if not events: if not events:
return [] return []
return [create_event(i) for i in events.properties] return [create_event(i.to_dict()) for i in events]
def bft_events_graph(start): def bft_events_graph(start):
@ -105,41 +111,47 @@ def bft_events_graph(start):
current_events = all_events(item) current_events = all_events(item)
for ev in current_events: for ev in current_events:
dg.add_edge(ev.parent, ev.dependent, label=ev.state) dg.add_edge(ev.parent_node, ev.child_node, label=ev.state)
if ev.depend_node in visited: if ev.child in visited:
continue continue
# it is possible to have events leading to same resource but # it is possible to have events leading to same resource but
# different action # different action
if ev.depend_node in stack: if ev.child in stack:
continue continue
stack.append(ev.depend_node) stack.append(ev.child)
visited.add(ev.parent_node) visited.add(ev.parent)
return dg return dg
def build_edges(changed_resources, changes_graph, events): def build_edges(changes_graph, events):
""" """
:param changed_resources: list of resource names that were changed
:param changes_graph: nx.DiGraph object with actions to be executed :param changes_graph: nx.DiGraph object with actions to be executed
:param events: {res: [controls.Event objects]} :param events: {res: [controls.Event objects]}
""" """
stack = changed_resources[:] events_graph = nx.MultiDiGraph()
visited = []
for res_evts in events.values():
for ev in res_evts:
events_graph.add_edge(ev.parent_node, ev.child_node, event=ev)
stack = changes_graph.nodes()
visited = set()
while stack: while stack:
node = stack.pop() event_name = stack.pop(0)
if node in events: if event_name in events_graph:
log.debug('Events %s for resource %s', events[node], node) log.debug('Next events after %s are %s', event_name, events_graph.successors(event_name))
else: else:
log.debug('No dependencies based on %s', node) log.debug('No outgoing events based on %s', event_name)
if node not in visited: if event_name not in visited:
for ev in events.get(node, ()): for parent, child, data in events_graph.edges(event_name, data=True):
ev.insert(stack, changes_graph) succ_ev = data['event']
succ_ev.insert(stack, changes_graph)
visited.append(node) visited.add(event_name)
return changes_graph return changes_graph

@ -36,38 +36,41 @@ class Event(object):
etype = None etype = None
def __init__(self, parent_node, parent_action, def __init__(self, parent, parent_action,
state='', depend_node='', depend_action=''): state='', child='', child_action=''):
self.parent_node = parent_node self.parent = parent
self.parent_action = parent_action self.parent_action = parent_action
self.state = state self.state = state
self.depend_node = depend_node self.child = child
self.depend_action = depend_action self.child_action = child_action
@property @property
def parent(self): def parent_node(self):
return '{}.{}'.format(self.parent_node, self.parent_action) return '{}.{}'.format(self.parent, self.parent_action)
@property @property
def dependent(self): def child_node(self):
return '{}.{}'.format(self.depend_node, self.depend_action) return '{}.{}'.format(self.child, self.child_action)
def to_dict(self): def to_dict(self):
rst = {'etype': self.etype} return {'etype': self.etype,
rst.update(self.__dict__) 'child': self.child,
return rst 'parent': self.parent,
'parent_action': self.parent_action,
'child_action': self.child_action,
'state': self.state}
def __eq__(self, inst): def __eq__(self, inst):
if inst.__class__ != self.__class__: if inst.__class__ != self.__class__:
return False return False
return all(( return all((
self.parent == inst.parent, self.parent_node == inst.parent_node,
self.state == inst.state, self.state == inst.state,
self.dependent == inst.dependent)) self.child_node == inst.child_node))
def __repr__(self): def __repr__(self):
return '{}: {} -> {} -> {}'.format( return '{}: {} -> {} -> {}'.format(
self.etype, self.parent, self.state, self.dependent) self.etype, self.parent_node, self.state, self.child_node)
def __hash__(self): def __hash__(self):
return hash(repr(self)) return hash(repr(self))
@ -78,10 +81,10 @@ class Dependency(Event):
etype = 'depends_on' etype = 'depends_on'
def insert(self, changed_resources, changes_graph): def insert(self, changed_resources, changes_graph):
if (self.parent in changes_graph and if (self.parent_node in changes_graph and
self.dependent in changes_graph): self.child_node in changes_graph):
changes_graph.add_edge( changes_graph.add_edge(
self.parent, self.dependent, state=self.state) self.parent_node, self.child_node, state=self.state)
Dep = Dependency Dep = Dependency
@ -91,15 +94,16 @@ class React(Event):
def insert(self, changed_resources, changes_graph): def insert(self, changed_resources, changes_graph):
if self.parent in changes_graph: if self.parent_node in changes_graph:
if self.dependent not in changes_graph: if self.child_node not in changes_graph:
changes_graph.add_node( changes_graph.add_node(
self.dependent, status='PENDING', self.child_node, status='PENDING',
errmsg=None, type='solar_resource', errmsg=None, type='solar_resource',
args=[self.depend_node, self.depend_action]) args=[self.child, self.child_action])
changes_graph.add_edge(self.parent, self.dependent, state=self.state) changes_graph.add_edge(
changed_resources.append(self.depend_node) self.parent_node, self.child_node, state=self.state)
changed_resources.append(self.child_node)
class StateChange(Event): class StateChange(Event):
@ -109,6 +113,6 @@ class StateChange(Event):
def insert(self, changed_resources, changes_graph): def insert(self, changed_resources, changes_graph):
changed_resources.append(self.parent) changed_resources.append(self.parent)
changes_graph.add_node( changes_graph.add_node(
self.parent, status='PENDING', self.parent_node, status='PENDING',
errmsg=None, type='solar_resource', errmsg=None, type='solar_resource',
args=[self.parent_node, self.parent_action]) args=[self.parent, self.parent_action])

@ -132,7 +132,7 @@ class BaseGraphDB(object):
DEFAULT_COLLECTION=COLLECTIONS.resource DEFAULT_COLLECTION=COLLECTIONS.resource
RELATION_TYPES = Enum( RELATION_TYPES = Enum(
'RelationTypes', 'RelationTypes',
'input_to_input resource_input plan_edge graph_to_node' 'input_to_input resource_input plan_edge graph_to_node resource_event'
) )
DEFAULT_RELATION=RELATION_TYPES.resource_input DEFAULT_RELATION=RELATION_TYPES.resource_input
@ -172,6 +172,10 @@ class BaseGraphDB(object):
def create(self, name, properties={}, collection=DEFAULT_COLLECTION): def create(self, name, properties={}, collection=DEFAULT_COLLECTION):
"""Create element (node) with given name, args, of type `collection`.""" """Create element (node) with given name, args, of type `collection`."""
@abc.abstractmethod
def delete(self, name, collection=DEFAULT_COLLECTION):
"""Delete element with given name. of type `collection`."""
@abc.abstractmethod @abc.abstractmethod
def create_relation(self, def create_relation(self,
source, source,

@ -38,6 +38,9 @@ class RedisGraphDB(BaseGraphDB):
elif relation_db['type_'] == BaseGraphDB.RELATION_TYPES.resource_input.name: elif relation_db['type_'] == BaseGraphDB.RELATION_TYPES.resource_input.name:
source_collection = BaseGraphDB.COLLECTIONS.resource source_collection = BaseGraphDB.COLLECTIONS.resource
dest_collection = BaseGraphDB.COLLECTIONS.input dest_collection = BaseGraphDB.COLLECTIONS.input
elif relation_db['type_'] == BaseGraphDB.RELATION_TYPES.resource_event.name:
source_collection = BaseGraphDB.COLLECTIONS.resource
dest_collection = BaseGraphDB.COLLECTIONS.events
source = self.get(relation_db['source'], collection=source_collection) source = self.get(relation_db['source'], collection=source_collection)
dest = self.get(relation_db['dest'], collection=dest_collection) dest = self.get(relation_db['dest'], collection=dest_collection)
@ -152,6 +155,11 @@ class RedisGraphDB(BaseGraphDB):
except TypeError: except TypeError:
raise KeyError raise KeyError
def delete(self, name, collection=BaseGraphDB.DEFAULT_COLLECTION):
keys = self._r.keys(self._make_collection_key(collection, name))
if keys:
self._r.delete(*keys)
def get_or_create(self, def get_or_create(self,
name, name,
properties={}, properties={},

@ -382,6 +382,12 @@ class DBObject(object):
collection=self._collection collection=self._collection
) )
def delete(self):
db.delete(
self._db_key,
collection=self._collection
)
class DBResourceInput(DBObject): class DBResourceInput(DBObject):
__metaclass__ = DBObjectMeta __metaclass__ = DBObjectMeta
@ -483,6 +489,29 @@ class DBResourceInput(DBObject):
return self.parse_backtracked_value(self.backtrack_value_emitter()) return self.parse_backtracked_value(self.backtrack_value_emitter())
class DBEvent(DBObject):
__metaclass__ = DBObjectMeta
_collection = base.BaseGraphDB.COLLECTIONS.events
id = db_field(is_primary=True)
parent = db_field(schema='str!')
parent_action = db_field(schema='str!')
etype = db_field('str!')
state = db_field('str')
child = db_field('str')
child_action = db_field('str')
def delete(self):
db.delete_relations(
dest=self._db_node,
type_=base.BaseGraphDB.RELATION_TYPES.resource_event
)
super(DBEvent, self).delete()
class DBResource(DBObject): class DBResource(DBObject):
__metaclass__ = DBObjectMeta __metaclass__ = DBObjectMeta
@ -501,6 +530,8 @@ class DBResource(DBObject):
inputs = db_related_field(base.BaseGraphDB.RELATION_TYPES.resource_input, inputs = db_related_field(base.BaseGraphDB.RELATION_TYPES.resource_input,
DBResourceInput) DBResourceInput)
events = db_related_field(base.BaseGraphDB.RELATION_TYPES.resource_event,
DBEvent)
def add_input(self, name, schema, value): def add_input(self, name, schema, value):
# NOTE: Inputs need to have uuid added because there can be many # NOTE: Inputs need to have uuid added because there can be many
@ -516,6 +547,18 @@ class DBResource(DBObject):
self.inputs.add(input) self.inputs.add(input)
def add_event(self, action, state, etype, child, child_action):
event = DBEvent(
parent=self.name,
parent_action=action,
state=state,
etype=etype,
child=child,
child_action=child_action
)
event.save()
self.events.add(event)
# TODO: remove this # TODO: remove this
if __name__ == '__main__': if __name__ == '__main__':

@ -90,7 +90,7 @@ def send_to_orchestration():
state_change = evapi.StateChange(res_uid, action) state_change = evapi.StateChange(res_uid, action)
state_change.insert(changed_nodes, dg) state_change.insert(changed_nodes, dg)
evapi.build_edges(changed_nodes, dg, events) evapi.build_edges(dg, events)
# what it should be? # what it should be?
dg.graph['name'] = 'system_log' dg.graph['name'] = 'system_log'

@ -17,6 +17,7 @@ import networkx as nx
from pytest import fixture from pytest import fixture
from solar.events import api as evapi from solar.events import api as evapi
from solar.interfaces import orm
from .base import BaseResourceTest from .base import BaseResourceTest
@ -31,19 +32,25 @@ def events_example():
def test_add_events(events_example): def test_add_events(events_example):
r = orm.DBResource(id='e1', name='e1', base_path='x')
r.save()
evapi.add_events('e1', events_example) evapi.add_events('e1', events_example)
assert set(evapi.all_events('e1')) == set(events_example) assert set(evapi.all_events('e1')) == set(events_example)
def test_set_events(events_example): def test_set_events(events_example):
r = orm.DBResource(id='e1', name='e1', base_path='x')
r.save()
partial = events_example[:2] partial = events_example[:2]
evapi.add_events('e1', events_example[:2]) evapi.add_events('e1', events_example[:2])
evapi.set_events('e1', events_example[2:]) evapi.set_events('e1', events_example[2:])
assert evapi.all_events('e1') == events_example[2:] assert evapi.all_events('e1') == events_example[2:]
def test_remove_events(events_example): def test_remove_events(events_example):
r = orm.DBResource(id='e1', name='e1', base_path='x')
r.save()
to_be_removed = events_example[2] to_be_removed = events_example[2]
evapi.add_events('e1', events_example) evapi.add_events('e1', events_example)
evapi.remove_event(to_be_removed) evapi.remove_event(to_be_removed)
@ -51,6 +58,8 @@ def test_remove_events(events_example):
def test_single_event(events_example): def test_single_event(events_example):
r = orm.DBResource(id='e1', name='e1', base_path='x')
r.save()
evapi.add_events('e1', events_example[:2]) evapi.add_events('e1', events_example[:2])
evapi.add_event(events_example[2]) evapi.add_event(events_example[2])
assert set(evapi.all_events('e1')) == set(events_example) assert set(evapi.all_events('e1')) == set(events_example)
@ -67,11 +76,10 @@ def nova_deps():
def test_nova_api_run_after_nova(nova_deps): def test_nova_api_run_after_nova(nova_deps):
changed = ['nova', 'nova_api']
changes_graph = nx.DiGraph() changes_graph = nx.DiGraph()
changes_graph.add_node('nova.run') changes_graph.add_node('nova.run')
changes_graph.add_node('nova_api.run') changes_graph.add_node('nova_api.run')
evapi.build_edges(changed, changes_graph, nova_deps) evapi.build_edges(changes_graph, nova_deps)
assert changes_graph.successors('nova.run') == ['nova_api.run'] assert changes_graph.successors('nova.run') == ['nova_api.run']
@ -80,10 +88,9 @@ def test_nova_api_react_on_update(nova_deps):
"""Test that nova_api:update will be called even if there is no changes """Test that nova_api:update will be called even if there is no changes
in nova_api in nova_api
""" """
changed = ['nova']
changes_graph = nx.DiGraph() changes_graph = nx.DiGraph()
changes_graph.add_node('nova.update') changes_graph.add_node('nova.update')
evapi.build_edges(changed, changes_graph, nova_deps) evapi.build_edges(changes_graph, nova_deps)
assert changes_graph.successors('nova.update') == ['nova_api.update'] assert changes_graph.successors('nova.update') == ['nova_api.update']
@ -106,7 +113,6 @@ def rmq_deps():
def test_rmq(rmq_deps): def test_rmq(rmq_deps):
changed = ['rmq.1', 'rmq.2', 'rmq.3', 'rmq_cluster.1', 'rmq_cluster.2', 'rmq_cluster.3']
changes_graph = nx.DiGraph() changes_graph = nx.DiGraph()
changes_graph.add_node('rmq.1.run') changes_graph.add_node('rmq.1.run')
changes_graph.add_node('rmq.2.run') changes_graph.add_node('rmq.2.run')
@ -114,7 +120,7 @@ def test_rmq(rmq_deps):
changes_graph.add_node('rmq_cluster.1.create') changes_graph.add_node('rmq_cluster.1.create')
changes_graph.add_node('rmq_cluster.2.join') changes_graph.add_node('rmq_cluster.2.join')
changes_graph.add_node('rmq_cluster.3.join') changes_graph.add_node('rmq_cluster.3.join')
evapi.build_edges(changed, changes_graph, rmq_deps) evapi.build_edges(changes_graph, rmq_deps)
assert set(changes_graph.successors('rmq_cluster.1.create')) == { assert set(changes_graph.successors('rmq_cluster.1.create')) == {
'rmq_cluster.2.join', 'rmq_cluster.3.join'} 'rmq_cluster.2.join', 'rmq_cluster.3.join'}
@ -123,15 +129,19 @@ def test_rmq(rmq_deps):
def test_riak(): def test_riak():
events = { events = {
'riak_service1': [evapi.React('riak_service1', 'run', 'success', 'riak_service2', 'join'), 'riak_service1': [
evapi.React('riak_service1', 'run', 'success', 'riak_service3', 'join')], evapi.React('riak_service1', 'run', 'success', 'riak_service2', 'run'),
'riak_service3': [evapi.React('riak_service3', 'join', 'success', 'riak_service1', 'commit')], evapi.React('riak_service1', 'run', 'success', 'riak_service3', 'run')],
'riak_service2': [evapi.React('riak_service2', 'join', 'success', 'riak_service1', 'commit')], 'riak_service3': [
evapi.React('riak_service3', 'join', 'success', 'riak_service1', 'commit'),
evapi.React('riak_service3', 'run', 'success', 'riak_service3', 'join')],
'riak_service2': [
evapi.React('riak_service2', 'run', 'success', 'riak_service2', 'join'),
evapi.React('riak_service2', 'join', 'success', 'riak_service1', 'commit')],
} }
changed = ['riak_service1']
changes_graph = nx.DiGraph() changes_graph = nx.MultiDiGraph()
changes_graph.add_node('riak_service1.run') changes_graph.add_node('riak_service1.run')
evapi.build_edges(changed, changes_graph, events) evapi.build_edges(changes_graph, events)
assert nx.topological_sort(changes_graph) == [ assert set(changes_graph.predecessors('riak_service1.commit')) == {'riak_service2.join', 'riak_service3.join'}
'riak_service1.run', 'riak_service2.join', 'riak_service3.join', 'riak_service1.commit']

@ -226,7 +226,6 @@ class TestResourceORM(BaseResourceTest):
r.save() r.save()
r.add_input('ip', 'str!', '10.0.0.2') r.add_input('ip', 'str!', '10.0.0.2')
self.assertEqual(len(r.inputs.as_set()), 1) self.assertEqual(len(r.inputs.as_set()), 1)
@ -421,3 +420,63 @@ input:
signals.disconnect(sample2, sample_dict_list) signals.disconnect(sample2, sample_dict_list)
self.assertEqual(vi.backtrack_value_emitter(), self.assertEqual(vi.backtrack_value_emitter(),
[{'a': sample1.resource_inputs()['value']}]) [{'a': sample1.resource_inputs()['value']}])
class TestEventORM(BaseResourceTest):
def test_return_emtpy_set(self):
r = orm.DBResource(id='test1', name='test1', base_path='x')
r.save()
self.assertEqual(r.events.as_set(), set())
def test_save_and_load_by_parent(self):
ev = orm.DBEvent(
parent='n1',
parent_action='run',
state='success',
child_action='run',
child='n2',
etype='dependency')
ev.save()
rst = orm.DBEvent.load(ev.id)
self.assertEqual(rst, ev)
def test_save_several(self):
ev = orm.DBEvent(
parent='n1',
parent_action='run',
state='success',
child_action='run',
child='n2',
etype='dependency')
ev.save()
ev1 = orm.DBEvent(
parent='n1',
parent_action='run',
state='success',
child_action='run',
child='n3',
etype='dependency')
ev1.save()
self.assertEqual(len(orm.DBEvent.load_all()), 2)
def test_removal_of_event(self):
r = orm.DBResource(id='n1', name='n1', base_path='x')
r.save()
ev = orm.DBEvent(
parent='n1',
parent_action='run',
state='success',
child_action='run',
child='n2',
etype='dependency')
ev.save()
r.events.add(ev)
self.assertEqual(r.events.as_set(), {ev})
ev.delete()
r = orm.DBResource.load('n1')
self.assertEqual(r.events.as_set(), set())