Basic API for orchestration

This commit is contained in:
Dmitry Shulyak 2015-06-26 13:00:48 +03:00
parent 36b17071ce
commit 6029c27f07
6 changed files with 241 additions and 14 deletions

64
cli.py Executable file
View File

@ -0,0 +1,64 @@
#!/usr/bin/python
import click
from orch import graph
from orch import tasks
@click.group()
def orchestration():
    """Top-level CLI group; subcommands are attached below via add_command."""
    pass
@click.command()
@click.argument('plan', type=click.File('rb'))
def create(plan):
    """Store a new plan read from a yaml file and print its uid."""
    plan_uid = graph.create_plan(plan.read())
    click.echo(plan_uid)
@click.command()
@click.argument('uid')
def report(uid):
    """Print every task of plan *uid* with its status, color-coded.

    Unknown statuses fall back to white instead of raising KeyError,
    so a partially-migrated or corrupted plan can still be inspected.
    """
    colors = {
        'PENDING': 'white',
        'ERROR': 'red',
        'SUCCESS': 'green',
        'INPROGRESS': 'yellow'}
    for task, status in graph.report_topo(uid):
        click.echo(click.style(
            '{} -> {}'.format(task, status),
            fg=colors.get(status, 'white')))
@click.command()
@click.argument('uid')
def execute(uid):
    """Kick off execution of plan *uid* on the master queue."""
    scheduler = tasks.schedule_start
    scheduler.apply_async(args=[uid], queue='master')
@click.command()
@click.argument('uid')
@click.option('--reset', default=False, is_flag=True)
def restart(uid, reset):
    """Re-run plan *uid*; with --reset, first mark all tasks PENDING."""
    if reset:
        graph.reset(uid)
    scheduler = tasks.schedule_start
    scheduler.apply_async(args=[uid], queue='master')
@click.command()
@click.argument('uid')
def reset(uid):
    """Set every task in plan *uid* back to PENDING."""
    graph.reset(uid)
# Attach all subcommands to the top-level group.
for command in (create, report, execute, restart, reset):
    orchestration.add_command(command)

if __name__ == '__main__':
    orchestration()

63
orch/examples/multi.yaml Normal file
View File

@ -0,0 +1,63 @@
# Example plan: a three-node rabbitmq cluster, then five compute tasks
# (two of which deliberately fail), each stage gated by a
# fault_tolerance barrier that requires the given success percentage.
name: multi
tasks:
  - uid: rabbitmq_cluster1.create
    parameters:
      type: cmd
      args: ['echo rabbitmq_cluster1.create']
    before: [amqp_cluster_configured]
  - uid: rabbitmq_cluster2.join
    parameters:
      type: cmd
      args: ['echo rabbitmq_cluster2.join']
    after: [rabbitmq_cluster1.create]
    before: [amqp_cluster_configured]
  - uid: rabbitmq_cluster3.join
    parameters:
      type: cmd
      args: ['echo rabbitmq_cluster3.join']
    after: [rabbitmq_cluster1.create]
    before: [amqp_cluster_configured]
  # Barrier: all three cluster tasks must succeed (100%).
  - uid: amqp_cluster_configured
    parameters:
      type: fault_tolerance
      args: [100]
  - uid: compute1
    parameters:
      type: echo
      args: [compute1]
    before: [compute_ready]
    after: [amqp_cluster_configured]
  - uid: compute2
    parameters:
      type: echo
      args: [compute2]
    before: [compute_ready]
    after: [amqp_cluster_configured]
  - uid: compute3
    parameters:
      type: echo
      args: [compute3]
    before: [compute_ready]
    after: [amqp_cluster_configured]
  # compute4/compute5 use the 'error' task type to simulate failures.
  - uid: compute4
    parameters:
      type: error
      args: [compute4]
    before: [compute_ready]
    after: [amqp_cluster_configured]
  - uid: compute5
    parameters:
      type: error
      args: [compute5]
    before: [compute_ready]
    after: [amqp_cluster_configured]
  # Barrier: tolerates the two failures (60% of computes must succeed).
  - uid: compute_ready
    parameters:
      type: fault_tolerance
      args: [60]

10
orch/examples/simple.yaml Normal file
View File

@ -0,0 +1,10 @@
# Minimal example plan: one sleeping task and one task that always fails.
name: simple
tasks:
  - uid: sleep_some_time
    parameters:
      type: sleep
      args: [10]
  - uid: just_fail
    parameters:
      type: error
      args: ['message']

View File

@ -0,0 +1,34 @@
# Example plan: five parallel compute tasks, two of which fail, feeding a
# fault_tolerance barrier that requires 80% success (so this plan's
# barrier is expected to fail: only 3 of 5 succeed = 60%).
name: errors
tasks:
  - uid: compute1
    parameters:
      type: echo
      args: [compute1]
    before: [compute_ready]
  - uid: compute2
    parameters:
      type: echo
      args: [compute2]
    before: [compute_ready]
  - uid: compute3
    parameters:
      type: echo
      args: [compute3]
    before: [compute_ready]
  # compute4/compute5 use the 'error' task type to simulate failures.
  - uid: compute4
    parameters:
      type: error
      args: [compute4]
    before: [compute_ready]
  - uid: compute5
    parameters:
      type: error
      args: [compute5]
    before: [compute_ready]
  - uid: compute_ready
    parameters:
      type: fault_tolerance
      args: [80]

View File

@ -5,6 +5,11 @@ import networkx as nx
import redis
import json
import yaml
import uuid
r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)
@ -23,3 +28,50 @@ def get_graph(name):
dg.add_nodes_from(nodes)
dg.add_edges_from(edges)
return dg
get_plan = get_graph  # backwards-compatible alias for callers using the old name
def parse_plan(plan_data):
    """Parse a yaml plan definition and return it as a networkx DiGraph.

    Each task becomes a node named by its uid, with the task's
    'parameters' mapping spread into node attributes and status set to
    PENDING.  Entries in a task's 'before' list become edges task->dep;
    entries in 'after' become edges dep->task.
    """
    # safe_load: plan files are plain data, and yaml.load without an
    # explicit Loader can execute arbitrary code from untrusted input.
    plan = yaml.safe_load(plan_data)
    dg = nx.DiGraph()
    dg.graph['name'] = plan['name']
    for task in plan['tasks']:
        dg.add_node(
            task['uid'], status='PENDING', **task['parameters'])
        for v in task.get('before', ()):
            dg.add_edge(task['uid'], v)
        for u in task.get('after', ()):
            dg.add_edge(u, task['uid'])
    return dg
def reset(uid):
    """Mark every task of plan *uid* as PENDING and persist the graph."""
    dg = get_graph(uid)
    for task_name in dg:
        dg.node[task_name]['status'] = 'PENDING'
    save_graph(uid, dg)
def create_plan(plan_data):
    """Parse *plan_data* (yaml text), assign a unique uid, and persist it.

    Returns the generated plan uid, formatted as "<name>:<uuid4>".
    """
    dg = parse_plan(plan_data)
    dg.graph['uid'] = "{0}:{1}".format(dg.graph['name'], str(uuid.uuid4()))
    save_graph(dg.graph['uid'], dg)
    return dg.graph['uid']
def report_topo(uid):
    """Return [task, status] pairs for plan *uid* in topological order."""
    dg = get_graph(uid)
    return [[task, dg.node[task]['status']]
            for task in nx.topological_sort(dg)]

View File

@ -48,7 +48,7 @@ def maybe_ignore(func):
@solar_task
@maybe_ignore
def cmd(cmd):
def cmd(ctxt, cmd):
popen = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
out, err = popen.communicate()
@ -72,9 +72,12 @@ def error(ctxt, message):
@solar_task
def fault_tolerance(ctxt, percent):
dg = graph.get_graph('current')
task_id = ctxt.request.id
plan_uid, task_name = task_id.rsplit(':', 1)
dg = graph.get_graph(plan_uid)
success = 0.0
predecessors = dg.predecessors(ctxt.request.id)
predecessors = dg.predecessors(task_name)
lth = len(predecessors)
for s in predecessors:
@ -112,29 +115,29 @@ def fire_timeout(task_id):
@app.task
def schedule_start():
def schedule_start(plan_uid):
"""On receive finished task should update storage with task result:
- find successors that should be executed
- apply different policies to tasks
"""
dg = graph.get_graph('current')
dg = graph.get_graph(plan_uid)
concurrency = dg.graph.get('concurrency', None)
next_tasks = list(islice(get_next(dg), 0, concurrency))
next_tasks = list(get_next(dg))
print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks)
graph.save_graph('current', dg)
graph.save_graph(plan_uid, dg)
group(next_tasks)()
@app.task
def schedule_next(task_id, status):
dg = graph.get_graph('current')
dg.node[task_id]['status'] = status
concurrency = dg.graph.get('concurrency', None)
next_tasks = list(islice(get_next(dg), 0, concurrency))
plan_uid, task_name = task_id.rsplit(':', 1)
dg = graph.get_graph(plan_uid)
dg.node[task_name]['status'] = status
next_tasks = list(get_next(dg))
print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks)
graph.save_graph('current', dg)
graph.save_graph(plan_uid, dg)
group(next_tasks)()
@ -157,12 +160,13 @@ def get_next(dg):
predecessors = set(dg.predecessors(node))
if predecessors <= visited:
task_id = '{}:{}'.format(dg.graph['uid'], node)
task_name = 'orch.tasks.{0}'.format(data['type'])
task = app.tasks[task_name]
dg.node[node]['status'] = 'INPROGRESS'
subtask = task.subtask(
data['args'], task_id=node,
data['args'], task_id=task_id,
time_limit=data.get('time_limit', None),
soft_time_limit=data.get('soft_time_limit', None))