Merge branch 'master' into cgenie/graph-db-with-events

This commit is contained in:
Przemyslaw Kaminski 2015-09-14 13:02:45 +02:00
commit cb8857f535
12 changed files with 183 additions and 39 deletions

View File

@ -13,7 +13,7 @@
- shell: celery multi kill 2 - shell: celery multi kill 2
chdir={{ celery_dir }} chdir={{ celery_dir }}
tags: [stop] tags: [stop]
- shell: celery multi start 2 -A solar.orchestration.runner -Q:1 scheduler,system_log -Q:2 celery,{{ hostname.stdout }} - shell: celery multi start 2 -A solar.orchestration.runner -P:2 prefork -c:2 3 -Q:1 scheduler,system_log -Q:2 celery,{{ hostname.stdout }}
chdir={{ celery_dir }} chdir={{ celery_dir }}
tags: [master] tags: [master]
- shell: celery multi start 1 -A solar.orchestration.runner -Q:1 {{ hostname.stdout }} - shell: celery multi start 1 -A solar.orchestration.runner -Q:1 {{ hostname.stdout }}

View File

@ -2,7 +2,7 @@ six>=1.9.0
ply ply
click==4.0 click==4.0
jinja2==2.7.3 jinja2==2.7.3
networkx==1.9.1 networkx>=1.10
PyYAML==3.11 PyYAML==3.11
jsonschema==2.4.0 jsonschema==2.4.0
requests==2.7.0 requests==2.7.0
@ -17,5 +17,5 @@ tabulate==0.7.5
ansible ansible
celery celery
mock mock
py2neo==2.0.7
multipledispatch==0.4.8 multipledispatch==0.4.8
mock

View File

@ -19,7 +19,7 @@ from solar.orchestration import graph
from solar.orchestration import tasks from solar.orchestration import tasks
from solar.orchestration import filters from solar.orchestration import filters
from solar.orchestration import utils from solar.orchestration import utils
from solar.cli.uids_history import SOLARUID from solar.cli.uids_history import SOLARUID, remember_uid
@click.group(name='orch') @click.group(name='orch')
@ -36,16 +36,18 @@ def orchestration():
@orchestration.command() @orchestration.command()
@click.argument('plan', type=click.File('rb')) @click.argument('plan')
def create(plan): def create(plan):
click.echo(graph.create_plan(plan.read()).graph['uid']) uid = graph.create_plan(plan).graph['uid']
remember_uid(uid)
click.echo(uid)
@orchestration.command() @orchestration.command()
@click.argument('uid', type=SOLARUID) @click.argument('uid', type=SOLARUID)
@click.argument('plan', type=click.File('rb')) @click.argument('plan')
def update(uid, plan): def update(uid, plan):
graph.update_plan(uid, plan.read()) graph.update_plan(uid, plan)
@orchestration.command() @orchestration.command()
@ -58,12 +60,18 @@ def report(uid):
'INPROGRESS': 'yellow', 'INPROGRESS': 'yellow',
'SKIPPED': 'blue'} 'SKIPPED': 'blue'}
total = 0.0
report = graph.report_topo(uid) report = graph.report_topo(uid)
for item in report: for item in report:
msg = '{} -> {}'.format(item[0], item[1]) msg = '{} -> {}'.format(item[0], item[1])
if item[2]: if item[2]:
msg += ' :: {}'.format(item[2]) msg += ' :: {}'.format(item[2])
if item[4] and item[3]:
delta = float(item[4])-float(item[3])
total += delta
msg += ' D: {}'.format(delta)
click.echo(click.style(msg, fg=colors[item[1]])) click.echo(click.style(msg, fg=colors[item[1]]))
click.echo('Delta SUM: {}'.format(total))
@orchestration.command() @orchestration.command()
@click.argument('uid', type=SOLARUID) @click.argument('uid', type=SOLARUID)

View File

@ -0,0 +1,62 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# License for the specific language governing permissions and limitations
# under the License.
from fabric import api as fabric_api
from solar.core.log import log
from solar.core.transports.base import RunTransport
class _RawSSHTransport(object):
def _ssh_props(self, resource):
return {
'ssh_key': resource.args['ssh_key'].value,
'ssh_user': resource.args['ssh_user'].value
}
def _ssh_command_host(self, resource):
return '{}@{}'.format(resource.args['ssh_user'].value,
resource.args['ip'].value)
def _ssh_cmd(self, resource):
props = self._ssh_props(resource)
return ('ssh', '-i', props['ssh_key'])
class RawSSHRunTransport(RunTransport, _RawSSHTransport):
def run(self, resource, *args, **kwargs):
log.debug("RAW SSH: %s", args)
cmds = []
cwd = kwargs.get('cwd')
if cwd:
cmds.append(('cd', cwd))
cmds.append(args)
if kwargs.get('use_sudo', False):
cmds = [('sudo', ) + cmd for cmd in cmds]
cmds = [' '.join(cmd) for cmd in cmds]
remote_cmd = '\"%s\"' % ' && '.join(cmds)
ssh_cmd = self._ssh_cmd(resource)
ssh_cmd += (self._ssh_command_host(resource), remote_cmd)
log.debug("SSH CMD: %r", ssh_cmd)
return fabric_api.local(' '.join(ssh_cmd))

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import time
from solar.orchestration.runner import app from solar.orchestration.runner import app
from celery import group from celery import group
@ -28,6 +30,7 @@ def celery_executor(dg, tasks, control_tasks=()):
if all_success(dg, dg.predecessors(task_name)) or task_name in control_tasks: if all_success(dg, dg.predecessors(task_name)) or task_name in control_tasks:
dg.node[task_name]['status'] = 'INPROGRESS' dg.node[task_name]['status'] = 'INPROGRESS'
dg.node[task_name]['start_time'] = time.time()
for t in generate_task(task, dg.node[task_name], task_id): for t in generate_task(task, dg.node[task_name], task_id):
to_execute.append(t) to_execute.append(t)
return group(to_execute) return group(to_execute)
@ -40,8 +43,10 @@ def generate_task(task, data, task_id):
time_limit=data.get('time_limit', None), time_limit=data.get('time_limit', None),
soft_time_limit=data.get('soft_time_limit', None)) soft_time_limit=data.get('soft_time_limit', None))
if data.get('target', None): # NOTE(dshulyak) it seems that we agreed that celery wont be installed
subtask.set(queue=data['target']) # on every slave and transport will be chosen in handler
# if data.get('target', None):
# subtask.set(queue=data['target'])
yield subtask yield subtask

View File

@ -40,24 +40,23 @@ def save_graph(name, graph):
db.create_relation_str(u, v, properties, type_=type_) db.create_relation_str(u, v, properties, type_=type_)
def get_graph(uid): def get_graph(name):
dg = nx.MultiDiGraph() dg = nx.OrderedMultiDiGraph()
collection = db.COLLECTIONS.plan_node.name + ':' + uid nodes = json.loads(r.get('{}:nodes'.format(name)))
type_= db.RELATION_TYPES.plan_edge.name + ':' + uid edges = json.loads(r.get('{}:edges'.format(name)))
dg.graph = db.get(uid, collection=db.COLLECTIONS.plan_graph).properties dg.graph = json.loads(r.get('{}:attributes'.format(name)))
dg.add_nodes_from([(n.uid, n.properties) for n in db.all(collection=collection)]) dg.add_nodes_from(nodes)
dg.add_edges_from([(i['source'], i['dest'], i['properties']) for dg.add_edges_from(edges)
i in db.all_relations(type_=type_, db_convert=False)])
return dg return dg
get_plan = get_graph get_plan = get_graph
def parse_plan(plan_data): def parse_plan(plan_path):
""" parses yaml definition and returns graph """ parses yaml definition and returns graph
""" """
plan = utils.yaml_load(plan_data) plan = utils.yaml_load(plan_path)
dg = nx.MultiDiGraph() dg = nx.MultiDiGraph()
dg.graph['name'] = plan['name'] dg.graph['name'] = plan['name']
for task in plan['tasks']: for task in plan['tasks']:
@ -100,17 +99,17 @@ def show(uid):
return utils.yaml_dump(result) return utils.yaml_dump(result)
def create_plan(plan_data, save=True): def create_plan(plan_path, save=True):
""" """
""" """
dg = parse_plan(plan_data) dg = parse_plan(plan_path)
return create_plan_from_graph(dg, save=save) return create_plan_from_graph(dg, save=save)
def update_plan(uid, plan_data): def update_plan(uid, plan_path):
"""update preserves old status of tasks if they werent removed """update preserves old status of tasks if they werent removed
""" """
dg = parse_plan(plan_data) dg = parse_plan(plan_path)
old_dg = get_graph(uid) old_dg = get_graph(uid)
dg.graph = old_dg.graph dg.graph = old_dg.graph
for n in dg: for n in dg:
@ -139,6 +138,12 @@ def report_topo(uid):
report = [] report = []
for task in nx.topological_sort(dg): for task in nx.topological_sort(dg):
report.append([task, dg.node[task]['status'], dg.node[task]['errmsg']]) data = dg.node[task]
report.append([
task,
data['status'],
data['errmsg'],
data.get('start_time'),
data.get('end_time')])
return report return report

View File

@ -41,7 +41,9 @@ __all__ = ['solar_resource', 'cmd', 'sleep',
class ReportTask(task.Task): class ReportTask(task.Task):
def on_success(self, retval, task_id, args, kwargs): def on_success(self, retval, task_id, args, kwargs):
schedule_next.apply_async(args=[task_id, 'SUCCESS'], queue='scheduler') schedule_next.apply_async(
args=[task_id, 'SUCCESS'],
queue='scheduler')
commit_logitem.apply_async(args=[task_id], queue='system_log') commit_logitem.apply_async(args=[task_id], queue='system_log')
def on_failure(self, exc, task_id, args, kwargs, einfo): def on_failure(self, exc, task_id, args, kwargs, einfo):
@ -154,5 +156,6 @@ def schedule_next(task_id, status, errmsg=None):
dg = graph.get_graph(plan_uid) dg = graph.get_graph(plan_uid)
dg.node[task_name]['status'] = status dg.node[task_name]['status'] = status
dg.node[task_name]['errmsg'] = errmsg dg.node[task_name]['errmsg'] = errmsg
dg.node[task_name]['end_time'] = time.time()
schedule(plan_uid, dg) schedule(plan_uid, dg)

View File

@ -97,6 +97,8 @@ def send_to_orchestration():
return graph.create_plan_from_graph(dg) return graph.create_plan_from_graph(dg)
def parameters(res, action): def parameters(res, action, data):
return {'args': [res, action], return {'args': [res, action],
'type': 'solar_resource'} 'type': 'solar_resource',
# unique identifier for a node should be passed
'target': data.get('ip')}

View File

@ -1,11 +0,0 @@
name: for_stop
tasks:
- uid: sleep_some_time
parameters:
type: sleep
args: [20]
before: [sleep_again]
- uid: sleep_again
parameters:
type: sleep
args: [20]

View File

@ -0,0 +1,18 @@
name: seq
tasks:
- uid: s1
parameters:
type: sleep
args: [2]
target: 1
- uid: s2
parameters:
type: sleep
args: [2]
target: 1
- uid: s3
parameters:
type: sleep
args: [2]
target: 1

View File

@ -0,0 +1,31 @@
name: sleeping_beauty
tasks:
- uid: fairy1
parameters:
type: sleep
args: [10]
before: [princess]
- uid: fairy2
parameters:
type: sleep
args: [10]
before: [princess]
- uid: fairy3
parameters:
type: sleep
args: [10]
before: [princess]
- uid: fairy4
parameters:
type: sleep
args: [10]
before: [princess]
- uid: fairy5
parameters:
type: sleep
args: [10]
before: [princess]
- uid: princess
parameters:
type: sleep
args: [10]

View File

@ -12,10 +12,13 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os
from pytest import fixture from pytest import fixture
import networkx as nx import networkx as nx
from solar.orchestration import limits from solar.orchestration import limits
from solar.orchestration import graph
@fixture @fixture
@ -60,3 +63,21 @@ def test_filtering_chain(target_dg):
chain = limits.get_default_chain(target_dg, [], ['t1', 't2']) chain = limits.get_default_chain(target_dg, [], ['t1', 't2'])
assert list(chain) == ['t1'] assert list(chain) == ['t1']
@fixture
def seq_plan():
seq_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'orch_fixtures',
'sequential.yml')
return graph.create_plan(seq_path, save=False)
def test_limits_sequential(seq_plan):
stack_to_execute = seq_plan.nodes()
while stack_to_execute:
left = stack_to_execute[0]
assert list(limits.get_default_chain(
seq_plan, [], stack_to_execute)) == [left]
stack_to_execute.pop(0)