Merge "Implement traversal based on number of childs"
This commit is contained in:
commit
5008a8ccfa
@ -1072,6 +1072,8 @@ class Task(Model):
|
||||
|
||||
type_limit = Field(int, default=int)
|
||||
|
||||
weight = Field(int, default=int)
|
||||
|
||||
@classmethod
|
||||
def new(cls, data):
|
||||
key = '%s~%s' % (data['execution'], data['name'])
|
||||
|
@ -57,8 +57,10 @@ def get_graph(uid):
|
||||
mdg = nx.MultiDiGraph()
|
||||
mdg.graph['uid'] = uid
|
||||
mdg.graph['name'] = uid.split(':')[0]
|
||||
mdg.add_nodes_from(Task.multi_get(Task.execution.filter(uid)))
|
||||
mdg.add_edges_from([(parent, task) for task in mdg.nodes()
|
||||
tasks_by_uid = {t.key: t for t
|
||||
in Task.multi_get(Task.execution.filter(uid))}
|
||||
mdg.add_nodes_from(tasks_by_uid.values())
|
||||
mdg.add_edges_from([(tasks_by_uid[parent], task) for task in mdg.nodes()
|
||||
for parent in task.parents.all()])
|
||||
return mdg
|
||||
|
||||
@ -97,6 +99,20 @@ def total_delta(graph):
|
||||
get_plan = get_graph
|
||||
|
||||
|
||||
def assign_weights_nested(dg):
|
||||
"""Based on number of childs assign weights that will be
|
||||
used later for scheduling.
|
||||
"""
|
||||
#: NOTE reverse(copy=False) swaps successors and predecessors
|
||||
# on same copy of graph, thus before returning it - reverse it back
|
||||
reversed_graph = dg.reverse(copy=False)
|
||||
for task in nx.topological_sort(reversed_graph):
|
||||
task.weight = sum([t.weight + 1 for t
|
||||
in reversed_graph.predecessors(task)])
|
||||
task.save_lazy()
|
||||
return reversed_graph.reverse(copy=False)
|
||||
|
||||
|
||||
def parse_plan(plan_path):
|
||||
"""parses yaml definition and returns graph"""
|
||||
plan = utils.yaml_load(plan_path)
|
||||
|
@ -13,6 +13,7 @@
|
||||
# under the License.
|
||||
|
||||
from functools import partial
|
||||
from operator import attrgetter
|
||||
import time
|
||||
|
||||
from solar.core.log import log
|
||||
@ -38,7 +39,8 @@ class Scheduler(base.Worker):
|
||||
return list(limits.get_default_chain(
|
||||
plan,
|
||||
[t for t in plan if t.status == states.INPROGRESS.name],
|
||||
find_visitable_tasks(plan)))
|
||||
sorted(find_visitable_tasks(plan),
|
||||
key=attrgetter('weight'), reverse=True)))
|
||||
|
||||
def next(self, ctxt, plan_uid):
|
||||
with Lock(
|
||||
|
@ -181,10 +181,11 @@ def send_to_orchestration(tags=None):
|
||||
state_change.insert(changed_nodes, dg)
|
||||
|
||||
evapi.build_edges(dg, events)
|
||||
|
||||
# what `name` should be?
|
||||
dg.graph['name'] = 'system_log'
|
||||
return graph.create_plan_from_graph(dg)
|
||||
built_graph = graph.create_plan_from_graph(dg)
|
||||
graph.assign_weights_nested(built_graph)
|
||||
return built_graph
|
||||
|
||||
|
||||
def parameters(res, action, data):
|
||||
|
@ -78,6 +78,11 @@ def timelimit_plan():
|
||||
return plan_from_fixture('timelimit')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def concurrent_choice_plan():
|
||||
return plan_from_fixture('concurrent_choice')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sequence_vr(tmpdir):
|
||||
base_path = os.path.join(
|
||||
|
@ -59,8 +59,9 @@ def test_concurrent_sequences_with_no_handler(scale, clients):
|
||||
scheduler_client = clients['scheduler']
|
||||
|
||||
assert len(change.staged_log()) == total_resources
|
||||
ModelMeta.session_end()
|
||||
ModelMeta.save_all_lazy()
|
||||
plan = change.send_to_orchestration()
|
||||
ModelMeta.save_all_lazy()
|
||||
scheduler_client.next({}, plan.graph['uid'])
|
||||
|
||||
def wait_function(timeout):
|
||||
|
47
solar/test/functional/test_weights.py
Normal file
47
solar/test/functional/test_weights.py
Normal file
@ -0,0 +1,47 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
import gevent
|
||||
from mock import Mock
|
||||
|
||||
from solar.orchestration import graph
|
||||
|
||||
|
||||
def test_concurrent_tasks_choice_based_on_weights(
|
||||
scheduler, tasks, concurrent_choice_plan):
|
||||
worker, client = scheduler
|
||||
tracer = Mock()
|
||||
plan = concurrent_choice_plan
|
||||
worker.next.on_success(tracer.update)
|
||||
worker.update_next.on_success(tracer.update)
|
||||
|
||||
def wait_function(timeout):
|
||||
for summary in graph.wait_finish(plan.graph['uid'], timeout):
|
||||
time.sleep(0.5)
|
||||
return summary
|
||||
client.next({}, concurrent_choice_plan.graph['uid'])
|
||||
waiter = gevent.spawn(wait_function, 2)
|
||||
waiter.join(timeout=3)
|
||||
first_call = tracer.update.call_args_list[0]
|
||||
args, _ = first_call
|
||||
ctxt, rst, _ = args
|
||||
assert len(rst) == 1
|
||||
assert rst[0].name == 's2'
|
||||
second_call = tracer.update.call_args_list[1]
|
||||
args, _ = second_call
|
||||
ctxt, rst, status, msg = args
|
||||
assert len(rst) == 2
|
||||
assert {t.name for t in rst} == {'s1', 's3'}
|
21
solar/test/orch_fixtures/concurrent_choice.yaml
Normal file
21
solar/test/orch_fixtures/concurrent_choice.yaml
Normal file
@ -0,0 +1,21 @@
|
||||
name: seq
|
||||
tasks:
|
||||
- uid: s1
|
||||
parameters:
|
||||
type: echo
|
||||
args: ['s1']
|
||||
target: '1'
|
||||
weight: 0
|
||||
- uid: s2
|
||||
parameters:
|
||||
type: echo
|
||||
args: ['s2']
|
||||
target: '1'
|
||||
weight: 1
|
||||
- uid: s3
|
||||
after: [s2]
|
||||
parameters:
|
||||
type: echo
|
||||
args: ['s3']
|
||||
target: '2'
|
||||
weight: 0
|
@ -102,15 +102,15 @@ def test_several_updates(simple_plan):
|
||||
def times():
|
||||
rst = nx.DiGraph()
|
||||
t1 = Mock(name='t1', start_time=1.0, end_time=12.0,
|
||||
status='', errmsg='')
|
||||
status='', errmsg='', weight=0)
|
||||
t2 = Mock(name='t2', start_time=1.0, end_time=3.0,
|
||||
status='', errmsg='')
|
||||
status='', errmsg='', weight=0)
|
||||
t3 = Mock(name='t3', start_time=3.0, end_time=7.0,
|
||||
status='', errmsg='')
|
||||
status='', errmsg='', weight=0)
|
||||
t4 = Mock(name='t4', start_time=7.0, end_time=13.0,
|
||||
status='', errmsg='')
|
||||
status='', errmsg='', weight=0)
|
||||
t5 = Mock(name='t5', start_time=12.0, end_time=14.0,
|
||||
status='', errmsg='')
|
||||
status='', errmsg='', weight=0)
|
||||
rst.add_nodes_from([t1, t2, t3, t4, t5])
|
||||
rst.add_path([t1, t5])
|
||||
rst.add_path([t2, t3, t4])
|
||||
@ -122,3 +122,46 @@ def test_report_progress(times):
|
||||
assert report['total_time'] == 13.0
|
||||
assert report['total_delta'] == 25.0
|
||||
assert len(report['tasks']) == 5
|
||||
|
||||
|
||||
def test_assigned_weights_simple_sequence():
|
||||
dg = nx.DiGraph()
|
||||
t1 = Mock(name='t1', weight=0)
|
||||
t2 = Mock(name='t2', weight=0)
|
||||
t3 = Mock(name='t3', weight=0)
|
||||
dg.add_nodes_from([t1, t2, t3])
|
||||
dg.add_path([t1, t2, t3])
|
||||
graph.assign_weights_nested(dg)
|
||||
assert t1.weight == 2
|
||||
assert t2.weight == 1
|
||||
assert t3.weight == 0
|
||||
|
||||
|
||||
def test_weights_strictly_decreasing():
|
||||
dg = nx.DiGraph()
|
||||
tasks = [Mock(name='t%s' % i, weight=0) for i in range(10)]
|
||||
dg.add_nodes_from(tasks)
|
||||
for i in range(10):
|
||||
first, rest = tasks[i], tasks[i + 1:]
|
||||
dg.add_edges_from([(first, n) for n in rest])
|
||||
graph.assign_weights_nested(dg)
|
||||
weights = iter(t.weight for t in tasks)
|
||||
previous = next(weights)
|
||||
for item in weights:
|
||||
assert previous > item
|
||||
previous = item
|
||||
|
||||
|
||||
def test_weights_multi_path():
|
||||
dg = nx.DiGraph()
|
||||
tasks = [Mock(name='t%s' % i, weight=0) for i in range(11)]
|
||||
first = tasks[0]
|
||||
half = (len(tasks) / 2) + 1
|
||||
dg.add_nodes_from(tasks)
|
||||
dg.add_path([first] + tasks[1:half])
|
||||
dg.add_path([first] + tasks[half:])
|
||||
graph.assign_weights_nested(dg)
|
||||
assert first.weight == len(tasks) - 1
|
||||
# two subtree are equal
|
||||
for s1, s2 in zip(tasks[1:half], tasks[half:]):
|
||||
assert s1.weight == s2.weight
|
||||
|
Loading…
x
Reference in New Issue
Block a user