Merge pull request #303 from dshulyak/fix_discard

Fix discard
This commit is contained in:
Jędrzej Nowak 2015-11-06 15:51:06 +01:00
commit 83efb161bd
7 changed files with 132 additions and 57 deletions

View File

@ -142,8 +142,7 @@ def test(name):
@changes.command(name='clean-history')
def clean_history():
data.CL().clean()
data.CD().clean()
change.clear_history()
@changes.command(help='USE ONLY FOR TESTING')
def commit():

View File

@ -220,23 +220,22 @@ class Resource(object):
stored as:
[(emitter, emitter_input, receiver, receiver_input), ...]
"""
return []
rst = []
rst = set()
# TODO: fix it
for (emitter_resource, emitter_input), (receiver_resource, receiver_input), meta in self.graph().edges(data=True):
if meta:
receiver_input = '{}:{}|{}'.format(receiver_input,
meta['destination_key'], meta['tag'])
rst.append(
[emitter_resource, emitter_input,
receiver_resource, receiver_input])
return rst
rst.add(
(emitter_resource, emitter_input,
receiver_resource, receiver_input))
return [list(i) for i in rst]
def graph(self):
mdg = networkx.MultiDiGraph()
for data in self.db_obj.inputs._edges():
mdg.add_edges_from(data)
for u, v, data in self.db_obj.inputs._edges():
mdg.add_edge(u, v, attr_dict=data)
return mdg
def resource_inputs(self):

View File

@ -560,9 +560,7 @@ class ModelMeta(type):
@classmethod
def remove_all(mcs):
for model in mcs._defined_models:
rst = model.bucket.get_index('$bucket', startkey='_', max_results=100000).results
for key in rst:
model.bucket.delete(key)
model.delete_all()
@classmethod
def save_all_lazy(mcs, result=True):
@ -886,11 +884,16 @@ class Model(object):
def save_lazy(self):
self._c.lazy_save.add(self)
@classmethod
def delete_all(cls):
    """Delete every object stored in this model's bucket.

    Enumerates keys via the special '$bucket' index and deletes them
    one by one.  NOTE(review): startkey='_' presumably sorts before all
    stored keys, and max_results=100000 caps a single sweep — confirm
    against the Riak index semantics used elsewhere in this project.
    """
    index = cls.bucket.get_index('$bucket',
                                 startkey='_',
                                 max_results=100000)
    for stored_key in index.results:
        cls.bucket.delete(stored_key)
def delete(self):
ls = self._c.lazy_save
try:
ls.remove(self.key)
ls.remove(self)
except KeyError:
pass
self._riak_object.delete()

View File

@ -70,7 +70,7 @@ class InputsFieldWrp(IndexFieldWrp):
'tag': data[4]}
else:
raise Exception("Unsupported case")
yield (my_resource, my_input), (other_resource, other_input), meta
yield (other_resource, other_input), (my_resource, my_input), meta
def __contains__(self, name):
try:
@ -199,11 +199,14 @@ class InputsFieldWrp(IndexFieldWrp):
recvs = filter(lambda x: x[0] == '{}_recv_bin'.format(self.fname), indexes)
for recv in recvs:
_, ind_value = recv
if ind_value.startswith('{}|{}|'.format(self._instance.key, name)):
recv_name = name
if ':' in recv_name:
recv_name = recv_name.split(':')[0]
if ind_value.startswith('{}|{}|'.format(self._instance.key, recv_name)):
to_dels.append(recv)
emits = filter(lambda x: x[0] == '{}_emit_bin'.format(self.fname), indexes)
for emit in emits:
_, ind_value = recv
_, ind_value = emit
if ind_value.endswith('|{}|{}'.format(self._instance.key, name)):
to_dels.append(emit)
for to_del in to_dels:
@ -583,6 +586,7 @@ class Resource(Model):
@classmethod
def childs(cls, parents):
all_indexes = cls.bucket.get_index(
'inputs_recv_bin',
startkey='',
@ -592,7 +596,7 @@ class Resource(Model):
tmp = defaultdict(set)
to_visit = parents[:]
visited = []
visited = set()
for item in all_indexes.results:
data = item[0].split('|')
@ -604,9 +608,25 @@ class Resource(Model):
for child in tmp[n]:
if child not in visited:
to_visit.append(child)
visited.append(n)
visited.add(n)
return visited
def delete(self):
    """Delete this resource, first disconnecting every input it emits to.

    Scans the 'inputs_emit_bin' index for entries that start with this
    resource's key; each index term is a '|'-separated tuple of
    (my_res, my_key, other_res, other_key), and the receiving side is
    disconnected before the underlying object is removed.
    """
    emit_index = self.bucket.get_index(
        'inputs_emit_bin',
        startkey=self.key,
        endkey=self.key + '~',  # '~' sorts after any key suffix
        return_terms=True,
        max_results=999999)
    for entry in emit_index.results:
        fields = entry[0].split('|')
        my_res, my_key, other_res, other_key = fields[:4]
        receiver = Resource.get(other_res)
        receiver.inputs.disconnect(other_key)
    super(Resource, self).delete()
class CommitedResource(Model):

View File

@ -504,3 +504,53 @@ def test_events(rk):
r1.events.pop()
r1.save()
assert r1.events == ['event1']
def test_delete(rk):
    """Deleting the emitter must leave no recv/emit index residue on the
    receiver."""
    key_one = next(rk)
    key_two = next(rk)
    emitter = create_resource(key_one, {'name': 'first',
                                        'inputs': {'input1': 10,
                                                   'input2': 15}})
    receiver = create_resource(key_two, {'name': 'first',
                                         'inputs': {'input1': None,
                                                    'input2': None}})
    emitter.connect(receiver, {'input1': 'input1'})
    emitter.save()
    receiver.save()
    emitter.delete()
    leftovers = [idx for idx in receiver._riak_object.indexes
                 if 'recv' in idx[0] or 'emit' in idx[0]]
    assert leftovers == []
def test_delete_hash(rk):
    """Same as test_delete but the receiver uses hash (nested) inputs with
    'input:subkey' connection targets."""
    key_one = next(rk)
    key_two = next(rk)
    emitter = create_resource(key_one, {'name': 'first',
                                        'inputs': {'input1': 10,
                                                   'input2': 15}})
    receiver = create_resource(key_two, {'name': 'second',
                                         'inputs': {'input': {'input1': None,
                                                              'input2': None}}})
    emitter.connect(receiver, {'input1': 'input:input1',
                               'input2': 'input:input2'})
    emitter.save()
    receiver.save()
    emitter.delete()
    leftovers = [idx for idx in receiver._riak_object.indexes
                 if 'recv' in idx[0] or 'emit' in idx[0]]
    assert leftovers == []

View File

@ -132,24 +132,23 @@ def parameters(res, action, data):
'type': 'solar_resource'}
def check_uids_present(log, uids):
    """Verify every uid in *uids* resolves in *log*.

    :param log: object with a ``get(uid)`` method returning None for
        unknown uids
    :param uids: iterable of uids to check
    :raises CannotFindID: listing all uids missing from the log
    """
    missing = [uid for uid in uids if log.get(uid) is None]
    if missing:
        raise CannotFindID('UIDS: {} not in history.'.format(missing))
def _get_args_to_update(args, connections):
"""For each resource we can update only args that are not provided
by connections
"""
inherited = [i[3].split(':')[0] for i in connections]
return {
key:args[key] for key in args
if key not in inherited
}
def revert_uids(uids):
"""
:param uids: iterable not generator
"""
history = data.CL()
check_uids_present(history, uids)
items = LogItem.multi_get(uids)
for uid in uids:
item = history.get(uid)
for item in items:
if item.action == CHANGES.update.name:
_revert_update(item)
@ -165,10 +164,12 @@ def revert_uids(uids):
def _revert_remove(logitem):
"""Resource should be created with all previous connections
"""
commited = orm.DBCommitedState.load(logitem.res)
commited = CommitedResource.get(logitem.resource)
args = dictdiffer.revert(logitem.diff, commited.inputs)
connections = dictdiffer.revert(logitem.signals_diff, sorted(commited.connections))
resource.Resource(logitem.res, logitem.base_path, args=args, tags=commited.tags)
connections = dictdiffer.revert(logitem.connections_diff, sorted(commited.connections))
resource.Resource(logitem.resource, logitem.base_path,
args=_get_args_to_update(args, connections), tags=commited.tags)
for emitter, emitter_input, receiver, receiver_input in connections:
emmiter_obj = resource.load(emitter)
receiver_obj = resource.load(receiver)
@ -204,18 +205,18 @@ def _update_inputs_connections(res_obj, args, old_connections, new_connections):
def _revert_update(logitem):
"""Revert of update should update inputs and connections
"""
res_obj = resource.load(logitem.res)
res_obj = resource.load(logitem.resource)
commited = res_obj.load_commited()
args_to_update = dictdiffer.revert(logitem.diff, commited.inputs)
connections = dictdiffer.revert(logitem.signals_diff, sorted(commited.connections))
connections = dictdiffer.revert(logitem.connections_diff, sorted(commited.connections))
args = dictdiffer.revert(logitem.diff, commited.inputs)
_update_inputs_connections(
res_obj, args_to_update, commited.connections, connections)
res_obj, _get_args_to_update(args, connections), commited.connections, connections)
def _revert_run(logitem):
res_obj = resource.load(logitem.res)
res_obj = resource.load(logitem.resource)
res_obj.remove()
@ -224,27 +225,26 @@ def revert(uid):
def _discard_remove(item):
resource_obj = resource.load(item.res)
resource_obj = resource.load(item.resource)
resource_obj.set_created()
def _discard_update(item):
resource_obj = resource.load(item.res)
resource_obj = resource.load(item.resource)
old_connections = resource_obj.connections
new_connections = dictdiffer.revert(item.signals_diff, sorted(old_connections))
new_connections = dictdiffer.revert(item.connections_diff, sorted(old_connections))
args = dictdiffer.revert(item.diff, resource_obj.args)
_update_inputs_connections(
resource_obj, args, old_connections, new_connections)
resource_obj, _get_args_to_update(args, new_connections), old_connections, new_connections)
def _discard_run(item):
resource.load(item.res).remove(force=True)
resource.load(item.resource).remove(force=True)
def discard_uids(uids):
staged_log = data.SL()
check_uids_present(staged_log, uids)
for uid in uids:
item = staged_log.get(uid)
items = LogItem.multi_get(uids)
for item in items:
if item.action == CHANGES.update.name:
_discard_update(item)
elif item.action == CHANGES.remove.name:
@ -254,7 +254,7 @@ def discard_uids(uids):
else:
log.debug('Action %s for resource %s is a side'
' effect of another action', item.action, item.res)
staged_log.pop(uid)
item.delete()
def discard_uid(uid):
@ -271,3 +271,7 @@ def commit_all():
from .operations import move_to_commited
for item in data.SL():
move_to_commited(item.log_action)
def clear_history():
    """Wipe the change history: all log items, then all commited
    resource snapshots."""
    for model in (LogItem, CommitedResource):
        model.delete_all()

View File

@ -34,26 +34,26 @@ def move_to_commited(log_action, *args, **kwargs):
sl = data.SL()
item = next((i for i in sl if i.log_action == log_action), None)
if item:
resource_obj = resource.load(item.resource)
commited = CommitedResource.get_or_create(item.resource)
updated = resource_obj.db_obj.updated
if item.action == CHANGES.remove.name:
resource_obj.delete()
commited.state = resource.RESOURCE_STATE.removed.name
else:
resource_obj.set_operational()
commited.state = resource.RESOURCE_STATE.operational.name
commited.inputs = patch(item.diff, commited.inputs)
# TODO fix TagsWrp to return list
# commited.tags = resource_obj.tags
sorted_connections = sorted(commited.connections)
commited.connections = patch(item.connections_diff, sorted_connections)
commited.base_path = item.base_path
updated = resource_obj.db_obj.updated
# required to update `updated` field
resource_obj.db_obj.save()
commited.inputs = patch(item.diff, commited.inputs)
# TODO fix TagsWrp to return list
# commited.tags = resource_obj.tags
sorted_connections = sorted(commited.connections)
commited.connections = patch(item.connections_diff, sorted_connections)
commited.save()
item.log = 'history'
item.state = 'success'
item.updated = resource_obj.db_obj.updated
item.updated = updated
item.save()