Rework staging procedure to support both implicit and explicit stages

Current change addresses several problems -
- It was impossible to re-stage already commited resources.
For example re-run openstack actions without doing any artificial
updates in solar inputs.
- Also there was no way to execute actions that are not related
to state in solar (run/update/remove). An example would be -
restart of services.
- And following changes addresses isolation problem in staging
procedure. By design solar is isolated using tags semantics,
but previous implemention of *process* was building a graph
unconditionally for all staged resources. It was reworked and now
we can support partial processing of resources, based on tags.

Implicit staging will be done when resource is update/created/removed.
Additionally actions can be staged using solar ch stage command,
to support this additional flags were added:
--action, -a - action that should be staged
--tag, -t - tags to select group of resources
--name, -n - resource that will be staged
Only one from name or tag will be used, if user will provide both - name
will be of higher priority

Reverts and discard are working as previously for
creation/update/removal of resources, but Exception will be raised if
revert will be attempted for custom action, such as *restart*.

Processing staged items can be achieved with -
solar ch process -t tag1 -t tag2

History and staged log items will be stored in different buckets.

Custom siblings resolved for LogItem will ensure that there is only
one resource action is staged.

implements blueprint refactor-process-of-staging-changes

Change-Id: I9e634803a38d80213b87518cd2c8fdc022237aa0
This commit is contained in:
Dmitry Shulyak 2016-03-11 13:45:36 +02:00
parent 26fcbeb0a5
commit d1b30e4be0
12 changed files with 407 additions and 397 deletions

View File

@ -39,10 +39,14 @@ def validate():
@changes.command()
@click.option('--action', '-a', default=None, help='resource action')
@click.option('--name', '-n', default=None, help='resource name')
@click.option('--tag', '-t', multiple=True, help='resource tags')
@click.option('-d', default=False, is_flag=True, help='detailed view')
def stage(d):
log = change.stage_changes()
log.reverse()
def stage(action, name, tag, d):
if action and (name or tag):
resource.stage_resources(name or tag, action)
log = change.staged_log(populate_with_changes=True)
for item in log:
click.echo(data.compact(item))
if d:
@ -65,8 +69,9 @@ def staged_item(uid):
@changes.command()
def process():
uid = change.send_to_orchestration().graph['uid']
@click.option('--tag', '-t', multiple=True, help='resource tags')
def process(tag):
uid = change.send_to_orchestration(tag).graph['uid']
remember_uid(uid)
click.echo(uid)

View File

@ -15,9 +15,10 @@
from solar.core.resource.resource import load
from solar.core.resource.resource import load_all
from solar.core.resource.resource import load_by_tags
from solar.core.resource.resource import load_updated
from solar.core.resource.resource import load_childs
from solar.core.resource.resource import Resource
from solar.core.resource.resource import RESOURCE_STATE
from solar.core.resource.resource import stage_resources
from solar.core.resource.resource import validate_resources
__all__ = [
@ -26,6 +27,7 @@ __all__ = [
'load',
'load_all',
'load_by_tags',
'load_updated',
'validate_resources'
'load_childs',
'validate_resources',
'stage_resources',
]

View File

@ -182,7 +182,7 @@ def _get_template(name, content, kwargs, inputs):
def create_resources(base_path, resources, tags=None):
add_tags = tags
created_resources = []
for r in resources:
resource_name = r['id']
@ -191,6 +191,8 @@ def create_resources(base_path, resources, tags=None):
values_from = r.get('values_from')
spec = r.get('from', None)
tags = r.get('tags', [])
if add_tags:
tags.extend(add_tags)
is_composer_file = False
if spec.startswith('./') or spec.endswith('.yaml'):
spec = os.path.join(base_path, '..', spec)

View File

@ -33,6 +33,7 @@ from solar.core import validation
from solar.dblayer.model import NONE
from solar.dblayer.model import StrInt
from solar.dblayer.solar_models import CommitedResource
from solar.dblayer.solar_models import LogItem
from solar.dblayer.solar_models import Resource as DBResource
from solar.events import api
from solar import utils
@ -90,6 +91,11 @@ class Resource(object):
self.create_inputs(args)
self.db_obj.save()
LogItem.new({
'resource': self.name,
'action': 'run',
'log': 'staged',
'tags': self.tags}).save_lazy()
# Load
def create_from_db(self, resource_db):
@ -209,6 +215,12 @@ class Resource(object):
for k, v in args.items():
self.db_obj.inputs[k] = v
self.db_obj.save_lazy()
# run and update are same things from solar pov
# so lets remove this redundancy
LogItem.new(
{'resource': self.name,
'action': 'run',
'tags': self.tags}).save_lazy()
def delete(self):
return self.db_obj.delete()
@ -219,6 +231,11 @@ class Resource(object):
else:
self.db_obj.state = RESOURCE_STATE.removed.name
self.db_obj.save_lazy()
LogItem.new(
{'resource': self.name,
'action': 'remove',
'log': 'staged',
'tags': self.tags}).save_lazy()
def set_operational(self):
self.db_obj.state = RESOURCE_STATE.operational.name
@ -312,6 +329,9 @@ class Resource(object):
use_defaults=False):
mapping = get_mapping(self, receiver, mapping)
self._connect_inputs(receiver, mapping)
LogItem.new({'resource': receiver.name,
'action': 'run',
'tags': receiver.tags}).save_lazy()
# signals.connect(self, receiver, mapping=mapping)
# TODO: implement events
if use_defaults:
@ -350,17 +370,9 @@ def load(name):
return Resource(r)
def load_updated(since=None, with_childs=True):
if since is None:
startkey = StrInt.p_min()
else:
startkey = since
candids = DBResource.updated.filter(startkey, StrInt.p_max())
if with_childs:
candids = DBResource.childs(candids)
return [Resource(r) for r in DBResource.multi_get(candids)]
# TODO
def load_childs(parents):
return [Resource(r) for r in
DBResource.multi_get(DBResource.childs(parents))]
def load_all(startswith=None):
@ -380,11 +392,33 @@ def load_by_tags(query):
parsed_tags = get_string_tokens(query)
r_with_tags = [DBResource.tags.filter(tag) for tag in parsed_tags]
r_with_tags = set(itertools.chain(*r_with_tags))
candids = [Resource(r) for r in DBResource.multi_get(r_with_tags)]
resources = [Resource(r) for r in DBResource.multi_get(r_with_tags)]
nodes = filter(
lambda n: Expression(query, n.tags).evaluate(), candids)
return nodes
return filter(lambda n: Expression(query, n.tags).evaluate(), resources)
def stage_resources(resources_query, action):
"""Create log items for resources selected by query
:param resources_query: iterable with tags or basestring
:param action: basestring
"""
if isinstance(resources_query, basestring):
resources = [load(resources_query)]
else:
resources = load_by_tags(resources_query)
created = []
for resource in resources:
# save - cache doesnt cover all query in the same sesssion
# and this query will be triggered right after staging resources
log_item = LogItem.new(
{'resource': resource.name,
'action': action,
'log': 'staged',
'tags': resource.tags})
log_item.save()
created.append(log_item)
return created
def load_by_names(names):

View File

@ -25,6 +25,9 @@ from enum import Enum
from solar.computable_inputs import ComputablePassedTypes
from solar.computable_inputs.processor import get_processor
from solar.config import C
from solar.core.tags_set_parser import Expression
from solar.core.tags_set_parser import get_string_tokens
from solar.dblayer.conflict_resolution import naive_resolver
from solar.dblayer.model import check_state_for
from solar.dblayer.model import CompositeIndexField
from solar.dblayer.model import DBLayerException
@ -1129,44 +1132,96 @@ class LogItem(Model):
action = Field(basestring)
diff = Field(list)
connections_diff = Field(list)
state = Field(basestring)
base_path = Field(basestring) # remove me
updated = Field(StrInt)
history = IndexedField(StrInt)
log = Field(basestring) # staged/history
composite = CompositeIndexField(fields=('log', 'resource', 'action'))
state = Field(basestring)
# based on tags we will filter staged log items during process part
# of staging changes procedure, it will allow to isolate graphs for
# different parts of infrastructure managed by solar (e.g. cluster)
tags = TagsField(default=list)
@property
def log_action(self):
return '.'.join((self.resource, self.action))
@classmethod
def history_last(cls):
items = cls.history.filter(StrInt.n_max(),
StrInt.n_min(),
max_results=1)
if not items:
return None
return cls.get(items[0])
def save(self):
if any(f in self._modified_fields for f in LogItem.composite.fields):
self.composite.reset()
if 'log' in self._modified_fields and self.log == 'history':
self.history = StrInt(next(NegativeCounter.get_or_create(
'history')))
return super(LogItem, self).save()
@classmethod
def new(cls, data):
vals = {}
if 'uid' not in vals:
vals['uid'] = cls.uid.default
vals.update(data)
return LogItem.from_dict(vals['uid'], vals)
return LogItem.from_dict(
'{}.{}'.format(vals['resource'], vals['action']), vals)
@classmethod
def from_dict(cls, key, *args, **kwargs):
if key in cls._c.obj_cache:
return cls._c.obj_cache[key]
return super(LogItem, cls).from_dict(key, *args, **kwargs)
@classmethod
def get(cls, key):
try:
return super(LogItem, cls).get(key)
except DBLayerException:
return None
def to_history(self):
return HistoryItem.new(
self.uid,
{'uid': self.uid,
'resource': self.resource,
'action': self.action,
'base_path': self.base_path,
'diff': self.diff,
'connections_diff': self.connections_diff})
@classmethod
def log_items_by_tags(cls, tags):
query = '|'.join(tags)
parsed_tags = get_string_tokens(query)
log_items = set(map(
cls.get,
chain.from_iterable(
[cls.tags.filter(tag) for tag in parsed_tags])))
return filter(lambda li: Expression(query, li.tags).evaluate(),
log_items)
@staticmethod
def conflict_resolver(riak_object):
#: it is safe to pick any log item with data, because the key
# if particular log_action
for sibling in riak_object.siblings:
if sibling.encoded_data:
riak_object.siblings = [sibling]
return
naive_resolver(riak_object)
class HistoryItem(Model):
uid = IndexedField(basestring)
resource = Field(basestring)
action = Field(basestring)
diff = Field(list)
connections_diff = Field(list)
base_path = Field(basestring) # remove me
history = IndexedField(StrInt)
composite = CompositeIndexField(fields=('resource', 'action'))
@property
def log_action(self):
return '.'.join((self.resource, self.action))
def save(self):
if any(f in self._modified_fields for
f in HistoryItem.composite.fields):
self.composite.reset()
self.history = StrInt(next(NegativeCounter.get_or_create(
'history')))
return super(HistoryItem, self).save()
class Lock(Model):

View File

@ -13,54 +13,21 @@
# under the License.
from solar.dblayer.model import StrInt
from solar.dblayer.solar_models import LogItem
from solar.dblayer.solar_models import HistoryItem
from solar.dblayer.solar_models import NegativeCounter
def test_separate_logs():
def test_composite_filter():
history = 'history'
staged = 'staged'
history_uids = set()
staged_uids = set()
for i in range(2):
l = LogItem.new({'log': history})
l.save()
history_uids.add(l.key)
for i in range(3):
l = LogItem.new({'log': staged})
l.save()
staged_uids.add(l.key)
assert set(LogItem.composite.filter({'log': history})) == history_uids
assert set(LogItem.composite.filter({'log': staged})) == staged_uids
def test_multiple_filter():
l1 = LogItem.new({'log': 'history', 'resource': 'a'})
l2 = LogItem.new({'log': 'history', 'resource': 'b'})
l1 = HistoryItem.new('a', {'log': 'history', 'resource': 'a'})
l2 = HistoryItem.new('b', {'log': 'history', 'resource': 'b'})
l1.save()
l2.save()
assert LogItem.composite.filter({'log': 'history',
'resource': 'a'}) == [l1.key]
assert LogItem.composite.filter({'log': 'history',
'resource': 'b'}) == [l2.key]
def test_changed_index():
l = LogItem.new({'log': 'staged', 'resource': 'a', 'action': 'run'})
l.save()
assert LogItem.composite.filter({'log': 'staged'}) == [l.key]
l.log = 'history'
l.save()
assert LogItem.composite.filter({'log': 'staged'}) == []
assert LogItem.composite.filter({'log': 'history'}) == [l.key]
assert HistoryItem.composite.filter({'log': 'history',
'resource': 'a'}) == [l1.key]
assert HistoryItem.composite.filter({'log': 'history',
'resource': 'b'}) == [l2.key]
def test_negative_counter():
@ -71,40 +38,10 @@ def test_negative_counter():
def test_reversed_order_is_preserved():
added = []
for i in range(4):
li = LogItem.new({'log': 'history'})
li = HistoryItem.new(str(i), {})
li.save()
added.append(li.key)
added.reverse()
assert list(LogItem.history.filter(StrInt.n_max(),
StrInt.n_min(),
max_results=2)) == added[:2]
def test_staged_not_indexed():
added = []
for i in range(3):
li = LogItem.new({'log': 'staged'})
li.save()
added.append(li)
for li in added[:2]:
li.log = 'history'
li.save()
assert set(LogItem.history.filter(StrInt.n_max(), StrInt.n_min())) == {
li.key
for li in added[:2]
}
def test_history_last_filter():
for i in range(4):
li = LogItem.new({'log': 'history'})
li.save()
last = li
assert LogItem.history_last() == last
def test_history_last_returns_none():
assert LogItem.history_last() is None
assert list(HistoryItem.history.filter(StrInt.n_max(),
StrInt.n_min(),
max_results=2)) == added[:2]

View File

@ -33,6 +33,7 @@ if no changes noticed on dependent resource.
"""
from solar.dblayer.model import DBLayerNotFound
from solar.dblayer.solar_models import DBLayerSolarException
from solar.dblayer.solar_models import Resource
@ -104,7 +105,7 @@ class React(Event):
try:
location_id = Resource.get(self.child).inputs[
'location_id']
except DBLayerNotFound:
except (DBLayerNotFound, DBLayerSolarException):
location_id = None
changes_graph.add_node(
self.child_node, status='PENDING',
@ -126,7 +127,7 @@ class StateChange(Event):
changed_resources.append(self.parent_node)
try:
location_id = Resource.get(self.parent).inputs['location_id']
except DBLayerNotFound:
except (DBLayerNotFound, DBLayerSolarException):
location_id = None
changes_graph.add_node(
self.parent_node, status='PENDING',

View File

@ -20,8 +20,8 @@ from solar.core import resource
from solar.core.resource.resource import RESOURCE_STATE
from solar.core import signals
from solar.dblayer.solar_models import CommitedResource
from solar.dblayer.solar_models import HistoryItem
from solar.dblayer.solar_models import LogItem
from solar.dblayer.solar_models import StrInt
from solar.events import api as evapi
from solar.events.controls import StateChange
from solar.orchestration import graph
@ -53,27 +53,10 @@ def create_diff(staged, commited):
return listify(res)
def create_logitem(resource, action, diffed, connections_diffed,
base_path=''):
return LogItem.new(
{'resource': resource,
'action': action,
'diff': diffed,
'connections_diff': connections_diffed,
'base_path': base_path,
'log': 'staged'})
def create_sorted_diff(staged, commited):
staged.sort()
commited.sort()
return create_diff(staged, commited)
def make_single_stage_item(resource_obj):
def populate_log_item(log_item):
resource_obj = resource.load(log_item.resource)
commited = resource_obj.load_commited()
base_path = resource_obj.base_path
log_item.base_path = resource_obj.base_path
if resource_obj.to_be_removed():
resource_args = {}
resource_connections = []
@ -88,42 +71,87 @@ def make_single_stage_item(resource_obj):
commited_args = commited.inputs
commited_connections = commited.connections
inputs_diff = create_diff(resource_args, commited_args)
connections_diff = create_sorted_diff(
log_item.diff = create_diff(resource_args, commited_args)
log_item.connections_diff = create_sorted_diff(
resource_connections, commited_connections)
# if new connection created it will be reflected in inputs
# but using inputs to reverse connections is not possible
if inputs_diff:
li = create_logitem(
resource_obj.name,
guess_action(commited_args, resource_args),
inputs_diff,
connections_diff,
base_path=base_path)
li.save()
return li
return None
return log_item
def stage_changes():
for li in data.SL():
li.delete()
last = LogItem.history_last()
since = StrInt.greater(last.updated) if last else None
staged_log = utils.solar_map(make_single_stage_item,
resource.load_updated(since), concurrency=10)
staged_log = filter(None, staged_log)
return staged_log
def create_logitem(resource, action, populate=True):
"""Create log item in staged log
:param resource: basestring
:param action: basestring
"""
log_item = LogItem.new(
{'resource': resource,
'action': action,
'log': 'staged'})
if populate:
populate_log_item(log_item)
return log_item
def send_to_orchestration():
def create_run(resource):
return create_logitem(resource, 'run')
def create_remove(resource):
return create_logitem(resource, 'remove')
def create_sorted_diff(staged, commited):
staged.sort()
commited.sort()
return create_diff(staged, commited)
def staged_log(populate_with_changes=True):
"""Staging procedure takes manually created log items, populate them
with diff and connections diff
Current implementation prevents from several things to occur:
- same log_action (resource.action pair) cannot not be staged multiple
times
- child will be staged only if diff or connections_diff is changed,
and we can execute *run* action to apply that diff - in all other cases
child should be staged explicitly
"""
log_actions = set()
resources_names = set()
staged_log = data.SL()
without_duplicates = []
for log_item in staged_log:
if log_item.log_action in log_actions:
log_item.delete()
continue
resources_names.add(log_item.resource)
log_actions.add(log_item.log_action)
without_duplicates.append(log_item)
utils.solar_map(lambda li: populate_log_item(li),
without_duplicates, concurrency=10)
# this is backward compatible change, there might better way
# to "guess" child actions
childs = filter(lambda child: child.name not in resources_names,
resource.load_childs(list(resources_names)))
child_log_items = filter(
lambda li: li.diff or li.connections_diff,
utils.solar_map(create_run, [c.name for c in childs], concurrency=10))
for log_item in child_log_items + without_duplicates:
log_item.save_lazy()
return without_duplicates + child_log_items
def send_to_orchestration(tags=None):
dg = nx.MultiDiGraph()
events = {}
changed_nodes = []
for logitem in data.SL():
if tags:
staged_log = LogItem.log_items_by_tags(tags)
else:
staged_log = data.SL()
for logitem in staged_log:
events[logitem.resource] = evapi.all_events(logitem.resource)
changed_nodes.append(logitem.resource)
@ -155,20 +183,27 @@ def _get_args_to_update(args, connections):
}
def is_create(logitem):
return all((item[0] == 'add' for item in logitem.diff))
def is_update(logitem):
return any((item[0] == 'change' for item in logitem.diff))
def revert_uids(uids):
"""Reverts uids
:param uids: iterable not generator
"""
items = LogItem.multi_get(uids)
items = HistoryItem.multi_get(uids)
for item in items:
if item.action == CHANGES.update.name:
if is_update(item):
_revert_update(item)
elif item.action == CHANGES.remove.name:
_revert_remove(item)
elif item.action == CHANGES.run.name:
elif is_create(item):
_revert_run(item)
else:
log.debug('Action %s for resource %s is a side'
@ -219,8 +254,8 @@ def _update_inputs_connections(res_obj, args, old_connections, new_connections):
# that some values can not be updated
# even if connection was removed
receiver_obj.db_obj.save()
res_obj.update(args)
if args:
res_obj.update(args)
def _revert_update(logitem):
@ -256,10 +291,9 @@ def _discard_update(item):
old_connections = resource_obj.connections
new_connections = dictdiffer.revert(
item.connections_diff, sorted(old_connections))
args = dictdiffer.revert(item.diff, resource_obj.args)
inputs = dictdiffer.revert(item.diff, resource_obj.args)
_update_inputs_connections(
resource_obj, _get_args_to_update(args, new_connections),
resource_obj, _get_args_to_update(inputs, old_connections),
old_connections, new_connections)
@ -268,13 +302,13 @@ def _discard_run(item):
def discard_uids(uids):
items = LogItem.multi_get(uids)
items = filter(bool, LogItem.multi_get(uids))
for item in items:
if item.action == CHANGES.update.name:
if is_update(item):
_discard_update(item)
elif item.action == CHANGES.remove.name:
_discard_remove(item)
elif item.action == CHANGES.run.name:
elif is_create(item):
_discard_run(item)
else:
log.debug('Action %s for resource %s is a side'
@ -288,7 +322,7 @@ def discard_uid(uid):
def discard_all():
staged_log = data.SL()
return discard_uids([l.uid for l in staged_log])
return discard_uids([l.key for l in staged_log])
def commit_all():

View File

@ -12,18 +12,22 @@
# License for the specific language governing permissions and limitations
# under the License.
from solar.dblayer.solar_models import HistoryItem
from solar.dblayer.solar_models import LogItem
def SL():
rst = LogItem.composite.filter({'log': 'staged'})
return LogItem.multi_get(rst)
rst = LogItem.bucket.get_index('$bucket',
startkey='_',
max_results=100000).results
return filter(bool, LogItem.multi_get(rst))
def CL():
rst = LogItem.composite.filter({'log': 'history'})
return LogItem.multi_get(rst)
rst = HistoryItem.bucket.get_index('$bucket',
startkey='_',
max_results=100000).results
return HistoryItem.multi_get(rst)
def compact(logitem):

View File

@ -28,34 +28,32 @@ def set_error(log_action, *args, **kwargs):
resource_obj = resource.load(item.resource)
resource_obj.set_error()
item.state = 'error'
item.save()
item.delete()
def commit_log_item(item):
resource_obj = resource.load(item.resource)
commited = CommitedResource.get_or_create(item.resource)
if item.action == CHANGES.remove.name:
resource_obj.delete()
commited.state = resource.RESOURCE_STATE.removed.name
else:
resource_obj.set_operational()
commited.state = resource.RESOURCE_STATE.operational.name
commited.base_path = item.base_path
resource_obj.db_obj.save_lazy()
commited.inputs = patch(item.diff, commited.inputs)
# TODO fix TagsWrp to return list
# commited.tags = resource_obj.tags
sorted_connections = sorted(commited.connections)
commited.connections = patch(item.connections_diff, sorted_connections)
commited.save_lazy()
item.to_history().save_lazy()
item.delete()
def move_to_commited(log_action, *args, **kwargs):
sl = data.SL()
item = next((i for i in sl if i.log_action == log_action), None)
if item:
resource_obj = resource.load(item.resource)
commited = CommitedResource.get_or_create(item.resource)
updated = resource_obj.db_obj.updated
if item.action == CHANGES.remove.name:
resource_obj.delete()
commited.state = resource.RESOURCE_STATE.removed.name
else:
resource_obj.set_operational()
commited.state = resource.RESOURCE_STATE.operational.name
commited.base_path = item.base_path
updated = resource_obj.db_obj.updated
# required to update `updated` field
resource_obj.db_obj.save()
commited.inputs = patch(item.diff, commited.inputs)
# TODO fix TagsWrp to return list
# commited.tags = resource_obj.tags
sorted_connections = sorted(commited.connections)
commited.connections = patch(item.connections_diff, sorted_connections)
commited.save()
item.log = 'history'
item.state = 'success'
item.updated = updated
item.save()
commit_log_item(item)

View File

@ -20,6 +20,7 @@ import pytest
from solar.config import C # NOQA
from solar.core.resource import composer
from solar.dblayer.model import clear_cache
from solar.dblayer.model import ModelMeta
from solar.errors import ExecutionTimeout
from solar import orchestration
from solar.orchestration.graph import wait_finish
@ -57,7 +58,8 @@ def test_concurrent_sequences_with_no_handler(scale, clients):
timeout = scale * 2
scheduler_client = clients['scheduler']
assert len(change.stage_changes()) == total_resources
assert len(change.staged_log()) == total_resources
ModelMeta.session_end()
plan = change.send_to_orchestration()
scheduler_client.next({}, plan.graph['uid'])
@ -75,4 +77,4 @@ def test_concurrent_sequences_with_no_handler(scale, clients):
assert res[states.SUCCESS.name] == total_resources
assert len(data.CL()) == total_resources
clear_cache()
assert len(change.stage_changes()) == 0
assert len(change.staged_log()) == 0

View File

@ -13,78 +13,63 @@
# under the License.
import mock
from pytest import mark
from solar.core.resource import repository
from solar.core.resource import resource
from solar.core.resource import RESOURCE_STATE
from solar.core import signals
from solar.dblayer.model import clear_cache
from solar.core.resource import stage_resources
from solar.dblayer.model import ModelMeta
from solar.dblayer.solar_models import CommitedResource
from solar.dblayer.solar_models import Resource as DBResource
from solar.system_log import change
from solar.system_log import data
from solar.system_log import operations
def create_resource(name, tags=None):
resource = DBResource.from_dict(
name,
{'name': name,
'base_path': 'x',
'state': '',
'tags': tags or [],
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
resource.save_lazy()
return resource
def test_revert_update():
commit = {'a': '10'}
previous = {'a': '9'}
res = DBResource.from_dict('test1',
{'name': 'test1',
'base_path': 'x',
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
prev = {'a': '9'}
new = {'a': '10'}
res = create_resource('test1')
res.save()
action = 'update'
res.inputs['a'] = '9'
action = 'run'
resource_obj = resource.load(res.name)
assert resource_obj.args == previous
log = data.SL()
logitem = change.create_logitem(res.name,
action,
change.create_diff(commit, previous),
[],
base_path=res.base_path)
log.append(logitem)
resource_obj.update(commit)
operations.move_to_commited(logitem.log_action)
resource_obj.update(prev)
logitem = change.create_logitem(res.name, action)
operations.commit_log_item(logitem)
resource_obj.update(new)
logitem = change.create_logitem(res.name, action)
uid = logitem.uid
assert logitem.diff == [['change', 'a', ['9', '10']]]
assert resource_obj.args == commit
operations.commit_log_item(logitem)
assert resource_obj.args == new
change.revert(logitem.uid)
assert resource_obj.args == previous
change.revert(uid)
assert resource_obj.args == {'a': '9'}
def test_revert_update_connected():
res1 = DBResource.from_dict('test1',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1 = create_resource('test1')
res1.inputs['a'] = '9'
res1.save_lazy()
res2 = DBResource.from_dict('test2',
{'name': 'test2',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res2 = create_resource('test2')
res2.inputs['a'] = ''
res2.save_lazy()
res3 = DBResource.from_dict('test3',
{'name': 'test3',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res3 = create_resource('test3')
res3.inputs['a'] = ''
res3.save_lazy()
@ -95,41 +80,36 @@ def test_revert_update_connected():
res2.connect(res3)
ModelMeta.save_all_lazy()
staged_log = change.stage_changes()
staged_log = map(lambda res: change.create_run(res.name),
(res1, res2, res3))
assert len(staged_log) == 3
for item in staged_log:
assert item.action == 'run'
operations.move_to_commited(item.log_action)
assert len(change.stage_changes()) == 0
operations.commit_log_item(item)
res1.disconnect(res2)
staged_log = change.stage_changes()
assert len(staged_log) == 2
staged_log = map(lambda res: change.create_run(res.name),
(res2, res3))
to_revert = []
for item in staged_log:
assert item.action == 'update'
operations.move_to_commited(item.log_action)
assert item.action == 'run'
to_revert.append(item.uid)
operations.commit_log_item(item)
change.revert_uids(sorted(to_revert, reverse=True))
ModelMeta.save_all_lazy()
staged_log = change.stage_changes()
staged_log = map(lambda res: change.create_run(res.name),
(res2, res3))
assert len(staged_log) == 2
for item in staged_log:
assert item.diff == [['change', 'a', ['', '9']]]
def test_revert_removal():
res = DBResource.from_dict('test1',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res = create_resource('test1')
res.inputs['a'] = '9'
res.save_lazy()
@ -141,14 +121,13 @@ def test_revert_removal():
resource_obj.remove()
ModelMeta.save_all_lazy()
changes = change.stage_changes()
assert len(changes) == 1
assert changes[0].diff == [['remove', '', [['a', '9']]]]
operations.move_to_commited(changes[0].log_action)
log_item = change.create_remove(resource_obj.name)
log_item.save()
uid = log_item.uid
assert log_item.diff == [['remove', '', [['a', '9']]]]
operations.commit_log_item(log_item)
clear_cache()
assert DBResource._c.obj_cache == {}
# assert DBResource.bucket.get('test1').siblings == []
ModelMeta.save_all_lazy()
with mock.patch.object(repository.Repository, 'read_meta') as mread:
mread.return_value = {
@ -157,10 +136,9 @@ def test_revert_removal():
}
with mock.patch.object(repository.Repository, 'get_path') as mpath:
mpath.return_value = 'x'
change.revert(uid)
change.revert(changes[0].uid)
ModelMeta.save_all_lazy()
# assert len(DBResource.bucket.get('test1').siblings) == 1
resource_obj = resource.load('test1')
assert resource_obj.args == {
@ -170,177 +148,135 @@ def test_revert_removal():
}
@mark.xfail(
reason="""With current approach child will
notice changes after parent is removed"""
)
def test_revert_removed_child():
res1 = orm.DBResource(id='test1', name='test1', base_path='x') # NOQA
res1.save()
res1.add_input('a', 'str', '9')
res2 = orm.DBResource(id='test2', name='test2', base_path='x') # NOQA
res2.save()
res2.add_input('a', 'str', 0)
res1 = resource.load('test1')
res2 = resource.load('test2')
signals.connect(res1, res2)
staged_log = change.stage_changes()
assert len(staged_log) == 2
for item in staged_log:
operations.move_to_commited(item.log_action)
res2.remove()
staged_log = change.stage_changes()
assert len(staged_log) == 1
logitem = next(staged_log.collection())
operations.move_to_commited(logitem.log_action)
with mock.patch.object(repository, 'read_meta') as mread:
mread.return_value = {'input': {'a': {'schema': 'str!'}}}
change.revert(logitem.uid)
res2 = resource.load('test2')
assert res2.args == {'a': '9'}
def test_revert_create():
res = DBResource.from_dict('test1',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res = create_resource('test1')
res.inputs['a'] = '9'
res.save_lazy()
ModelMeta.save_all_lazy()
staged_log = change.stage_changes()
assert len(staged_log) == 1
logitem = staged_log[0]
operations.move_to_commited(logitem.log_action)
logitem = change.create_run(res.name)
assert logitem.diff == [['add', '', [['a', '9']]]]
uid = logitem.uid
operations.commit_log_item(logitem)
commited = CommitedResource.get('test1')
assert commited.inputs == {'a': '9'}
change.revert(logitem.uid)
staged_log = change.stage_changes()
change.revert(uid)
ModelMeta.save_all_lazy()
staged_log = change.staged_log()
assert len(staged_log) == 1
for item in staged_log:
operations.move_to_commited(item.log_action)
operations.commit_log_item(item)
assert resource.load_all() == []
def test_discard_all_pending_changes_resources_created():
res1 = DBResource.from_dict('test1',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1 = create_resource('test1')
res1.inputs['a'] = '9'
res1.save_lazy()
res2 = DBResource.from_dict('test2',
{'name': 'test2',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res2 = create_resource('test2')
res2.inputs['a'] = '0'
res2.save_lazy()
ModelMeta.save_all_lazy()
staged_log = change.stage_changes()
assert len(staged_log) == 2
staged_log = map(change.create_run, (res1.name, res2.name))
change.discard_all()
staged_log = change.stage_changes()
staged_log = change.staged_log()
assert len(staged_log) == 0
assert resource.load_all() == []
def test_discard_connection():
res1 = DBResource.from_dict('test1',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1 = create_resource('test1')
res1.inputs['a'] = '9'
res1.save_lazy()
res2 = DBResource.from_dict('test2',
{'name': 'test2',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res2 = create_resource('test2')
res2.inputs['a'] = '0'
res2.save_lazy()
ModelMeta.save_all_lazy()
staged_log = change.stage_changes()
staged_log = map(change.create_run, (res1.name, res2.name))
for item in staged_log:
operations.move_to_commited(item.log_action)
operations.commit_log_item(item)
res1 = resource.load('test1')
res2 = resource.load('test2')
res1.connect(res2, {'a': 'a'})
staged_log = change.stage_changes()
ModelMeta.save_all_lazy()
staged_log = change.staged_log()
assert len(staged_log) == 1
assert res2.args == {'a': '9'}
change.discard_all()
assert res2.args == {'a': '0'}
assert len(change.stage_changes()) == 0
assert len(change.staged_log()) == 0
def test_discard_removed():
res1 = DBResource.from_dict('test1',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1 = create_resource('test1')
res1.inputs['a'] = '9'
res1.save_lazy()
ModelMeta.save_all_lazy()
staged_log = change.stage_changes()
for item in staged_log:
operations.move_to_commited(item.log_action)
res1 = resource.load('test1')
res1.remove()
assert len(change.stage_changes()) == 1
ModelMeta.save_all_lazy()
assert len(change.staged_log()) == 1
assert res1.to_be_removed()
change.discard_all()
assert len(change.stage_changes()) == 0
assert len(change.staged_log()) == 0
assert not resource.load('test1').to_be_removed()
def test_discard_update():
res1 = DBResource.from_dict('test1',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1 = create_resource('test1')
res1.inputs['a'] = '9'
res1.save_lazy()
ModelMeta.save_all_lazy()
staged_log = change.stage_changes()
for item in staged_log:
operations.move_to_commited(item.log_action)
operations.commit_log_item(change.create_run(res1.name))
res1 = resource.load('test1')
res1.update({'a': '11'})
assert len(change.stage_changes()) == 1
ModelMeta.save_all_lazy()
assert len(change.staged_log()) == 1
assert res1.args == {'a': '11'}
change.discard_all()
assert res1.args == {'a': '9'}
def test_stage_and_process_partially():
a = ['a']
b = ['b']
both = a + b
range_a = range(1, 4)
range_b = range(4, 6)
with_tag_a = [create_resource(str(n), tags=a) for n in range_a]
with_tag_b = [create_resource(str(n), tags=b) for n in range_b]
ModelMeta.save_all_lazy()
created_log_items_with_a = stage_resources(a, 'restart')
assert len(created_log_items_with_a) == len(with_tag_a)
created_log_items_with_b = stage_resources(b, 'restart')
assert len(created_log_items_with_b) == len(with_tag_b)
a_graph = change.send_to_orchestration(a)
a_expected = set(['%s.restart' % n for n in range_a])
assert set(a_graph.nodes()) == a_expected
b_graph = change.send_to_orchestration(b)
b_expected = set(['%s.restart' % n for n in range_b])
assert set(b_graph.nodes()) == b_expected
both_graph = change.send_to_orchestration(both)
assert set(both_graph.nodes()) == a_expected | b_expected
def test_childs_added_on_stage():
res_0, res_1 = [create_resource(str(n)) for n in range(2)]
res_0.connect(res_1, {'a': 'a'})
ModelMeta.save_all_lazy()
created_log_items = stage_resources(res_0.name, 'run')
assert len(created_log_items) == 1
assert created_log_items[0].resource == res_0.name
staged_log = change.staged_log()
assert len(staged_log) == 2
child_log_item = next(li for li in staged_log
if li.resource == res_1.name)
assert child_log_item.action == 'run'