From 4a380217d74567a34763c473dd732f5bb3d98649 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 5 Nov 2015 16:59:28 +0200 Subject: [PATCH 1/5] Fix discard --- solar/solar/cli/system_log.py | 3 +- solar/solar/core/resource/resource.py | 5 ++- solar/solar/dblayer/model.py | 9 +++-- solar/solar/dblayer/solar_models.py | 25 ++++++++++++-- solar/solar/dblayer/test/test_real.py | 50 +++++++++++++++++++++++++++ solar/solar/system_log/change.py | 27 ++++++++++----- 6 files changed, 99 insertions(+), 20 deletions(-) diff --git a/solar/solar/cli/system_log.py b/solar/solar/cli/system_log.py index 890c5d64..962e9d43 100644 --- a/solar/solar/cli/system_log.py +++ b/solar/solar/cli/system_log.py @@ -142,8 +142,7 @@ def test(name): @changes.command(name='clean-history') def clean_history(): - data.CL().clean() - data.CD().clean() + change.clear_history() @changes.command(help='USE ONLY FOR TESTING') def commit(): diff --git a/solar/solar/core/resource/resource.py b/solar/solar/core/resource/resource.py index 5a2df211..75e6615c 100644 --- a/solar/solar/core/resource/resource.py +++ b/solar/solar/core/resource/resource.py @@ -220,7 +220,6 @@ class Resource(object): stored as: [(emitter, emitter_input, receiver, receiver_input), ...] """ - return [] rst = [] # TODO: fix it for (emitter_resource, emitter_input), (receiver_resource, receiver_input), meta in self.graph().edges(data=True): @@ -235,8 +234,8 @@ class Resource(object): def graph(self): mdg = networkx.MultiDiGraph() - for data in self.db_obj.inputs._edges(): - mdg.add_edges_from(data) + for u, v, data in self.db_obj.inputs._edges(): + mdg.add_edge(u, v, attr_dict=data) return mdg def resource_inputs(self): diff --git a/solar/solar/dblayer/model.py b/solar/solar/dblayer/model.py index 664f29a6..f8caa57a 100644 --- a/solar/solar/dblayer/model.py +++ b/solar/solar/dblayer/model.py @@ -520,9 +520,7 @@ class ModelMeta(type): @classmethod def remove_all(mcs): for model in mcs._defined_models: - rst = model.bucket.get_index('$bucket', startkey='_', max_results=100000).results - for key in rst: - model.bucket.delete(key) + model.delete_all() @classmethod def save_all_lazy(mcs, result=True): @@ -844,6 +842,11 @@ class Model(object): def save_lazy(self): self._c.lazy_save.add(self) + @classmethod + def delete_all(cls): + rst = cls.bucket.get_index('$bucket', startkey='_', max_results=100000).results + for key in rst: + cls.bucket.delete(key) def delete(self): ls = self._c.lazy_save diff --git a/solar/solar/dblayer/solar_models.py b/solar/solar/dblayer/solar_models.py index f516de00..ca5e4dbf 100644 --- a/solar/solar/dblayer/solar_models.py +++ b/solar/solar/dblayer/solar_models.py @@ -69,7 +69,7 @@ class InputsFieldWrp(IndexFieldWrp): 'tag': data[4]} else: raise Exception("Unsupported case") - yield (my_resource, my_input), (other_resource, other_input), meta + yield (other_resource, other_input), (my_resource, my_input), meta def __contains__(self, name): try: @@ -194,11 +194,14 @@ class InputsFieldWrp(IndexFieldWrp): recvs = filter(lambda x: x[0] == '{}_recv_bin'.format(self.fname), indexes) for recv in recvs: _, ind_value = recv - if ind_value.startswith('{}|{}|'.format(self._instance.key, name)): + recv_name = name + if ':' in recv_name: + recv_name = recv_name.split(':')[0] + if ind_value.startswith('{}|{}|'.format(self._instance.key, recv_name)): to_dels.append(recv) emits = filter(lambda x: x[0] == '{}_emit_bin'.format(self.fname), indexes) for emit in emits: - _, ind_value = recv + _, ind_value = emit if ind_value.endswith('|{}|{}'.format(self._instance.key, name)): to_dels.append(emit) for to_del in to_dels: @@ -567,6 +570,7 @@ class Resource(Model): @classmethod def childs(cls, parents): + all_indexes = cls.bucket.get_index( 'inputs_recv_bin', startkey='', @@ -591,6 +595,21 @@ class Resource(Model): visited.append(n) return visited + def delete(self): + inputs_index = self.bucket.get_index( + 'inputs_emit_bin', + startkey=self.key, + endkey=self.key+'~', + return_terms=True, + max_results=999999) + + for emit_bin in inputs_index.results: + index_vals = emit_bin[0].split('|') + + my_res, my_key, other_res, other_key = index_vals[:4] + Resource.get(other_res).inputs.disconnect(other_key) + super(Resource, self).delete() + class CommitedResource(Model): diff --git a/solar/solar/dblayer/test/test_real.py b/solar/solar/dblayer/test/test_real.py index c8acf8ce..a1db5e53 100644 --- a/solar/solar/dblayer/test/test_real.py +++ b/solar/solar/dblayer/test/test_real.py @@ -504,3 +504,53 @@ def test_events(rk): r1.events.pop() r1.save() assert r1.events == ['event1'] + + +def test_delete(rk): + k1 = next(rk) + k2 = next(rk) + + r1 = create_resource(k1, {'name': 'first', + 'inputs': {'input1': 10, + 'input2': 15}}) + r2 = create_resource(k2, {'name': 'first', + 'inputs': {'input1': None, + 'input2': None}}) + + r1.connect(r2, {'input1': 'input1'}) + r1.save() + r2.save() + + r1.delete() + + recv_emit_bin = [] + for index in r2._riak_object.indexes: + if 'recv' in index[0] or 'emit' in index[0]: + recv_emit_bin.append(index) + assert recv_emit_bin == [] + + +def test_delete_hash(rk): + k1 = next(rk) + k2 = next(rk) + + r1 = create_resource(k1, {'name': 'first', + 'inputs': {'input1': 10, + 'input2': 15}}) + r2 = create_resource(k2, {'name': 'second', + 'inputs': {'input': {'input1': None, + 'input2': None}}}) + + + r1.connect(r2, {'input1': 'input:input1', + 'input2': 'input:input2'}) + + r1.save() + r2.save() + + r1.delete() + recv_emit_bin = [] + for index in r2._riak_object.indexes: + if 'recv' in index[0] or 'emit' in index[0]: + recv_emit_bin.append(index) + assert recv_emit_bin == [] diff --git a/solar/solar/system_log/change.py b/solar/solar/system_log/change.py index ad3ad902..4ecda84b 100644 --- a/solar/solar/system_log/change.py +++ b/solar/solar/system_log/change.py @@ -135,7 +135,7 @@ def parameters(res, action, data): def check_uids_present(log, uids): not_valid = [] for uid in uids: - if log.get(uid) is None: + if LogItem.get(uid) is None: not_valid.append(uid) if not_valid: raise CannotFindID('UIDS: {} not in history.'.format(not_valid)) @@ -149,7 +149,7 @@ def revert_uids(uids): check_uids_present(history, uids) for uid in uids: - item = history.get(uid) + item = LogItem.get(uid) if item.action == CHANGES.update.name: _revert_update(item) @@ -224,27 +224,32 @@ def revert(uid): def _discard_remove(item): - resource_obj = resource.load(item.res) + resource_obj = resource.load(item.resource) resource_obj.set_created() def _discard_update(item): - resource_obj = resource.load(item.res) + resource_obj = resource.load(item.resource) old_connections = resource_obj.connections - new_connections = dictdiffer.revert(item.signals_diff, sorted(old_connections)) + new_connections = dictdiffer.revert(item.connections_diff, sorted(old_connections)) args = dictdiffer.revert(item.diff, resource_obj.args) + inherited = [i[3].split(':') for i in new_connections] + args_to_update = { + key:args[key] for key in args + if key not in inherited + } _update_inputs_connections( - resource_obj, args, old_connections, new_connections) + resource_obj, args_to_update, old_connections, new_connections) def _discard_run(item): - resource.load(item.res).remove(force=True) + resource.load(item.resource).remove(force=True) def discard_uids(uids): staged_log = data.SL() check_uids_present(staged_log, uids) for uid in uids: - item = staged_log.get(uid) + item = LogItem.get(uid) if item.action == CHANGES.update.name: _discard_update(item) elif item.action == CHANGES.remove.name: @@ -254,7 +259,7 @@ def discard_uids(uids): else: log.debug('Action %s for resource %s is a side' ' effect of another action', item.action, item.res) - staged_log.pop(uid) + item.delete() def discard_uid(uid): @@ -271,3 +276,7 @@ def commit_all(): from .operations import move_to_commited for item in data.SL(): move_to_commited(item.log_action) + +def clear_history(): + LogItem.delete_all() + CommitedResource.delete_all() From ffa7e786f62016c12895825098b15c62419b2c26 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 5 Nov 2015 18:45:18 +0200 Subject: [PATCH 2/5] Fix revert --- solar/solar/core/resource/resource.py | 10 +++++----- solar/solar/system_log/change.py | 27 +++++++++++++++++++-------- solar/solar/system_log/operations.py | 16 ++++++++-------- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/solar/solar/core/resource/resource.py b/solar/solar/core/resource/resource.py index 75e6615c..2447190f 100644 --- a/solar/solar/core/resource/resource.py +++ b/solar/solar/core/resource/resource.py @@ -220,17 +220,17 @@ class Resource(object): stored as: [(emitter, emitter_input, receiver, receiver_input), ...] """ - rst = [] + rst = set() # TODO: fix it for (emitter_resource, emitter_input), (receiver_resource, receiver_input), meta in self.graph().edges(data=True): if meta: receiver_input = '{}:{}|{}'.format(receiver_input, meta['destination_key'], meta['tag']) - rst.append( - [emitter_resource, emitter_input, - receiver_resource, receiver_input]) - return rst + rst.add( + (emitter_resource, emitter_input, + receiver_resource, receiver_input)) + return [list(i) for i in rst] def graph(self): mdg = networkx.MultiDiGraph() diff --git a/solar/solar/system_log/change.py b/solar/solar/system_log/change.py index 4ecda84b..f56e6e08 100644 --- a/solar/solar/system_log/change.py +++ b/solar/solar/system_log/change.py @@ -165,10 +165,16 @@ def revert_uids(uids): def _revert_remove(logitem): """Resource should be created with all previous connections """ - commited = orm.DBCommitedState.load(logitem.res) + commited = CommitedResource.get(logitem.resource) args = dictdiffer.revert(logitem.diff, commited.inputs) - connections = dictdiffer.revert(logitem.signals_diff, sorted(commited.connections)) - resource.Resource(logitem.res, logitem.base_path, args=args, tags=commited.tags) + connections = dictdiffer.revert(logitem.connections_diff, sorted(commited.connections)) + inherited = [i[3].split(':')[0] for i in connections] + args_to_update = { + key:args[key] for key in args + if key not in inherited + } + + resource.Resource(logitem.resource, logitem.base_path, args=args_to_update, tags=commited.tags) for emitter, emitter_input, receiver, receiver_input in connections: emmiter_obj = resource.load(emitter) receiver_obj = resource.load(receiver) @@ -204,18 +210,23 @@ def _update_inputs_connections(res_obj, args, old_connections, new_connections): def _revert_update(logitem): """Revert of update should update inputs and connections """ - res_obj = resource.load(logitem.res) + res_obj = resource.load(logitem.resource) commited = res_obj.load_commited() - args_to_update = dictdiffer.revert(logitem.diff, commited.inputs) - connections = dictdiffer.revert(logitem.signals_diff, sorted(commited.connections)) + connections = dictdiffer.revert(logitem.connections_diff, sorted(commited.connections)) + args = dictdiffer.revert(logitem.diff, commited.inputs) + inherited = [i[3].split(':')[0] for i in connections] + args_to_update = { + key:args[key] for key in args + if key not in inherited + } _update_inputs_connections( res_obj, args_to_update, commited.connections, connections) def _revert_run(logitem): - res_obj = resource.load(logitem.res) + res_obj = resource.load(logitem.resource) res_obj.remove() @@ -233,7 +244,7 @@ def _discard_update(item): old_connections = resource_obj.connections new_connections = dictdiffer.revert(item.connections_diff, sorted(old_connections)) args = dictdiffer.revert(item.diff, resource_obj.args) - inherited = [i[3].split(':') for i in new_connections] + inherited = [i[3].split(':')[0] for i in new_connections] args_to_update = { key:args[key] for key in args if key not in inherited diff --git a/solar/solar/system_log/operations.py b/solar/solar/system_log/operations.py index f166ff3d..112ae483 100644 --- a/solar/solar/system_log/operations.py +++ b/solar/solar/system_log/operations.py @@ -34,26 +34,26 @@ def move_to_commited(log_action, *args, **kwargs): sl = data.SL() item = next((i for i in sl if i.log_action == log_action), None) if item: - resource_obj = resource.load(item.resource) commited = CommitedResource.get_or_create(item.resource) - + updated = resource_obj.db_obj.updated if item.action == CHANGES.remove.name: resource_obj.delete() commited.state = resource.RESOURCE_STATE.removed.name else: resource_obj.set_operational() commited.state = resource.RESOURCE_STATE.operational.name - commited.inputs = patch(item.diff, commited.inputs) - # TODO fix TagsWrp to return list - # commited.tags = resource_obj.tags - sorted_connections = sorted(commited.connections) - commited.connections = patch(item.connections_diff, sorted_connections) commited.base_path = item.base_path + updated = resource_obj.db_obj.updated # required to update `updated` field resource_obj.db_obj.save() + commited.inputs = patch(item.diff, commited.inputs) + # TODO fix TagsWrp to return list + # commited.tags = resource_obj.tags + sorted_connections = sorted(commited.connections) + commited.connections = patch(item.connections_diff, sorted_connections) commited.save() item.log = 'history' item.state = 'success' - item.updated = resource_obj.db_obj.updated + item.updated = updated item.save() From 6e4ae64d480ea440c97dd8e492e15113c0db11cb Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 6 Nov 2015 11:40:57 +0200 Subject: [PATCH 3/5] Fix duplicates in update filter and delete with lazy_save --- solar/solar/dblayer/model.py | 2 +- solar/solar/dblayer/solar_models.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/solar/solar/dblayer/model.py b/solar/solar/dblayer/model.py index f8caa57a..24c7e155 100644 --- a/solar/solar/dblayer/model.py +++ b/solar/solar/dblayer/model.py @@ -851,7 +851,7 @@ class Model(object): def delete(self): ls = self._c.lazy_save try: - ls.remove(self.key) + ls.remove(self) except KeyError: pass self._riak_object.delete() diff --git a/solar/solar/dblayer/solar_models.py b/solar/solar/dblayer/solar_models.py index ca5e4dbf..67d5f6c6 100644 --- a/solar/solar/dblayer/solar_models.py +++ b/solar/solar/dblayer/solar_models.py @@ -580,7 +580,7 @@ class Resource(Model): tmp = defaultdict(set) to_visit = parents[:] - visited = [] + visited = set() for item in all_indexes.results: data = item[0].split('|') @@ -592,7 +592,7 @@ class Resource(Model): for child in tmp[n]: if child not in visited: to_visit.append(child) - visited.append(n) + visited.add(n) return visited def delete(self): @@ -607,7 +607,8 @@ class Resource(Model): index_vals = emit_bin[0].split('|') my_res, my_key, other_res, other_key = index_vals[:4] - Resource.get(other_res).inputs.disconnect(other_key) + emit_obj = Resource.get(other_res) + emit_obj.inputs.disconnect(other_key) super(Resource, self).delete() From 5ea07c658a634b56c40a1f283234803797301694 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 6 Nov 2015 12:38:14 +0200 Subject: [PATCH 4/5] Move args_to_update into separate function --- solar/solar/system_log/change.py | 33 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/solar/solar/system_log/change.py b/solar/solar/system_log/change.py index f56e6e08..5c03c45e 100644 --- a/solar/solar/system_log/change.py +++ b/solar/solar/system_log/change.py @@ -141,6 +141,16 @@ def check_uids_present(log, uids): raise CannotFindID('UIDS: {} not in history.'.format(not_valid)) +def _get_args_to_update(args, connections): + """For each resource we can update only args that are not provided + by connections + """ + inherited = [i[3].split(':')[0] for i in connections] + return { + key:args[key] for key in args + if key not in inherited + } + def revert_uids(uids): """ :param uids: iterable not generator @@ -168,13 +178,9 @@ def _revert_remove(logitem): commited = CommitedResource.get(logitem.resource) args = dictdiffer.revert(logitem.diff, commited.inputs) connections = dictdiffer.revert(logitem.connections_diff, sorted(commited.connections)) - inherited = [i[3].split(':')[0] for i in connections] - args_to_update = { - key:args[key] for key in args - if key not in inherited - } - resource.Resource(logitem.resource, logitem.base_path, args=args_to_update, tags=commited.tags) + resource.Resource(logitem.resource, logitem.base_path, + args=_get_args_to_update(args, connections), tags=commited.tags) for emitter, emitter_input, receiver, receiver_input in connections: emmiter_obj = resource.load(emitter) receiver_obj = resource.load(receiver) @@ -215,14 +221,9 @@ def _revert_update(logitem): connections = dictdiffer.revert(logitem.connections_diff, sorted(commited.connections)) args = dictdiffer.revert(logitem.diff, commited.inputs) - inherited = [i[3].split(':')[0] for i in connections] - args_to_update = { - key:args[key] for key in args - if key not in inherited - } _update_inputs_connections( - res_obj, args_to_update, commited.connections, connections) + res_obj, _get_args_to_update(args, connections), commited.connections, connections) def _revert_run(logitem): @@ -244,13 +245,9 @@ def _discard_update(item): old_connections = resource_obj.connections new_connections = dictdiffer.revert(item.connections_diff, sorted(old_connections)) args = dictdiffer.revert(item.diff, resource_obj.args) - inherited = [i[3].split(':')[0] for i in new_connections] - args_to_update = { - key:args[key] for key in args - if key not in inherited - } + _update_inputs_connections( - resource_obj, args_to_update, old_connections, new_connections) + resource_obj, _get_args_to_update(args, new_connections), old_connections, new_connections) def _discard_run(item): resource.load(item.resource).remove(force=True) From bd21f902279e00716919ff7340e86d0a0788090d Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 6 Nov 2015 13:05:53 +0200 Subject: [PATCH 5/5] Use multi_get in discard and revert --- solar/solar/system_log/change.py | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/solar/solar/system_log/change.py b/solar/solar/system_log/change.py index 5c03c45e..ecff4df6 100644 --- a/solar/solar/system_log/change.py +++ b/solar/solar/system_log/change.py @@ -132,15 +132,6 @@ def parameters(res, action, data): 'type': 'solar_resource'} -def check_uids_present(log, uids): - not_valid = [] - for uid in uids: - if LogItem.get(uid) is None: - not_valid.append(uid) - if not_valid: - raise CannotFindID('UIDS: {} not in history.'.format(not_valid)) - - def _get_args_to_update(args, connections): """For each resource we can update only args that are not provided by connections @@ -155,11 +146,9 @@ def revert_uids(uids): """ :param uids: iterable not generator """ - history = data.CL() - check_uids_present(history, uids) + items = LogItem.multi_get(uids) - for uid in uids: - item = LogItem.get(uid) + for item in items: if item.action == CHANGES.update.name: _revert_update(item) @@ -254,10 +243,8 @@ def _discard_run(item): def discard_uids(uids): - staged_log = data.SL() - check_uids_present(staged_log, uids) - for uid in uids: - item = LogItem.get(uid) + items = LogItem.multi_get(uids) + for item in items: if item.action == CHANGES.update.name: _discard_update(item) elif item.action == CHANGES.remove.name: