From 6029c27f07cba404bd1bedea41702ce869d38049 Mon Sep 17 00:00:00 2001
From: Dmitry Shulyak
Date: Fri, 26 Jun 2015 13:00:48 +0300
Subject: [PATCH] Basic API for orchestration

---
 cli.py                        | 64 +++++++++++++++++++++++++++++++++++
 orch/examples/multi.yaml      | 63 ++++++++++++++++++++++++++++++++++
 orch/examples/simple.yaml     | 10 ++++++
 orch/examples/test_errors.yml | 34 +++++++++++++++++++
 orch/graph.py                 | 52 ++++++++++++++++++++++++++++
 orch/tasks.py                 | 32 ++++++++++--------
 6 files changed, 241 insertions(+), 14 deletions(-)
 create mode 100755 cli.py
 create mode 100644 orch/examples/multi.yaml
 create mode 100644 orch/examples/simple.yaml
 create mode 100644 orch/examples/test_errors.yml

diff --git a/cli.py b/cli.py
new file mode 100755
index 00000000..8a866290
--- /dev/null
+++ b/cli.py
@@ -0,0 +1,64 @@
+#!/usr/bin/python
+
+import click
+
+from orch import graph
+from orch import tasks
+
+
+@click.group()
+def orchestration():
+    pass
+
+
+@click.command()
+@click.argument('plan', type=click.File('rb'))
+def create(plan):
+    click.echo(graph.create_plan(plan.read()))
+
+
+@click.command()
+@click.argument('uid')
+def report(uid):
+    colors = {
+        'PENDING': 'white',
+        'ERROR': 'red',
+        'SUCCESS': 'green',
+        'INPROGRESS': 'yellow'}
+
+    report = graph.report_topo(uid)
+    for item in report:
+        click.echo(
+            click.style('{} -> {}'.format(item[0], item[1]), fg=colors[item[1]]))
+
+
+@click.command()
+@click.argument('uid')
+def execute(uid):
+    tasks.schedule_start.apply_async(args=[uid], queue='master')
+
+
+@click.command()
+@click.argument('uid')
+@click.option('--reset', default=False, is_flag=True)
+def restart(uid, reset):
+    if reset:
+        graph.reset(uid)
+    tasks.schedule_start.apply_async(args=[uid], queue='master')
+
+
+@click.command()
+@click.argument('uid')
+def reset(uid):
+    graph.reset(uid)
+
+
+orchestration.add_command(create)
+orchestration.add_command(report)
+orchestration.add_command(execute)
+orchestration.add_command(restart)
+orchestration.add_command(reset)
+
+
+if __name__ == '__main__':
+    orchestration()
diff --git a/orch/examples/multi.yaml b/orch/examples/multi.yaml
new file mode 100644
index 00000000..9c8ce99e
--- /dev/null
+++ b/orch/examples/multi.yaml
@@ -0,0 +1,63 @@
+
+name: multi
+tasks:
+  - uid: rabbitmq_cluster1.create
+    parameters:
+      type: cmd
+      args: ['echo rabbitmq_cluster1.create']
+    before: [amqp_cluster_configured]
+
+  - uid: rabbitmq_cluster2.join
+    parameters:
+      type: cmd
+      args: ['echo rabbitmq_cluster2.join']
+    after: [rabbitmq_cluster1.create]
+    before: [amqp_cluster_configured]
+  - uid: rabbitmq_cluster3.join
+    parameters:
+      type: cmd
+      args: ['echo rabbitmq_cluster3.join']
+    after: [rabbitmq_cluster1.create]
+    before: [amqp_cluster_configured]
+
+  - uid: amqp_cluster_configured
+    parameters:
+      type: fault_tolerance
+      args: [100]
+
+  - uid: compute1
+    parameters:
+      type: echo
+      args: [compute1]
+    before: [compute_ready]
+    after: [amqp_cluster_configured]
+  - uid: compute2
+    parameters:
+      type: echo
+      args: [compute2]
+    before: [compute_ready]
+    after: [amqp_cluster_configured]
+  - uid: compute3
+    parameters:
+      type: echo
+      args: [compute3]
+    before: [compute_ready]
+    after: [amqp_cluster_configured]
+  - uid: compute4
+    parameters:
+      type: error
+      args: [compute4]
+    before: [compute_ready]
+    after: [amqp_cluster_configured]
+  - uid: compute5
+    parameters:
+      type: error
+      args: [compute5]
+    before: [compute_ready]
+    after: [amqp_cluster_configured]
+
+  - uid: compute_ready
+    parameters:
+      type: fault_tolerance
+      args: [60]
+
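For illustration only, not part of the patch: the before/after keys in multi.yaml above become directed edges of a networkx DiGraph (see parse_plan() in orch/graph.py further down). A minimal sketch, with node names taken from the example plan:

    import networkx as nx

    dg = nx.DiGraph()
    # 'before: [amqp_cluster_configured]' on rabbitmq_cluster1.create adds
    # an edge pointing at the dependent task:
    dg.add_edge('rabbitmq_cluster1.create', 'amqp_cluster_configured')
    # 'after: [rabbitmq_cluster1.create]' on rabbitmq_cluster2.join adds the
    # edge in the opposite direction, from the prerequisite:
    dg.add_edge('rabbitmq_cluster1.create', 'rabbitmq_cluster2.join')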
diff --git a/orch/examples/simple.yaml b/orch/examples/simple.yaml
new file mode 100644
index 00000000..0ff2f2e1
--- /dev/null
+++ b/orch/examples/simple.yaml
@@ -0,0 +1,10 @@
+name: simple
+tasks:
+  - uid: sleep_some_time
+    parameters:
+      type: sleep
+      args: [10]
+  - uid: just_fail
+    parameters:
+      type: error
+      args: ['message']
diff --git a/orch/examples/test_errors.yml b/orch/examples/test_errors.yml
new file mode 100644
index 00000000..fb9f3310
--- /dev/null
+++ b/orch/examples/test_errors.yml
@@ -0,0 +1,34 @@
+
+name: errors
+tasks:
+  - uid: compute1
+    parameters:
+      type: echo
+      args: [compute1]
+    before: [compute_ready]
+  - uid: compute2
+    parameters:
+      type: echo
+      args: [compute2]
+    before: [compute_ready]
+  - uid: compute3
+    parameters:
+      type: echo
+      args: [compute3]
+    before: [compute_ready]
+  - uid: compute4
+    parameters:
+      type: error
+      args: [compute4]
+    before: [compute_ready]
+  - uid: compute5
+    parameters:
+      type: error
+      args: [compute5]
+    before: [compute_ready]
+
+  - uid: compute_ready
+    parameters:
+      type: fault_tolerance
+      args: [80]
+
diff --git a/orch/graph.py b/orch/graph.py
index c8f91c7e..a9bbdf95 100644
--- a/orch/graph.py
+++ b/orch/graph.py
@@ -5,6 +5,11 @@
 import networkx as nx
 import redis
 import json
+import yaml
+
+import uuid
+
+
 
 r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)
 
@@ -23,3 +28,50 @@ def get_graph(name):
     dg.add_nodes_from(nodes)
     dg.add_edges_from(edges)
     return dg
+
+
+get_plan = get_graph
+
+
+def parse_plan(plan_data):
+    """ Parses a YAML plan definition and returns a DiGraph
+    """
+    plan = yaml.load(plan_data)
+    dg = nx.DiGraph()
+    dg.graph['name'] = plan['name']
+    for task in plan['tasks']:
+        dg.add_node(
+            task['uid'], status='PENDING', **task['parameters'])
+        for v in task.get('before', ()):
+            dg.add_edge(task['uid'], v)
+        for u in task.get('after', ()):
+            dg.add_edge(u, task['uid'])
+    return dg
+
+
+def reset(uid):
+    dg = get_graph(uid)
+    for n in dg:
+        dg.node[n]['status'] = 'PENDING'
+    save_graph(uid, dg)
+
+
+def create_plan(plan_data):
+    """ Parses the plan, tags it with a unique uid and persists it
+    """
+    dg = parse_plan(plan_data)
+    dg.graph['uid'] = "{0}:{1}".format(dg.graph['name'], str(uuid.uuid4()))
+    save_graph(dg.graph['uid'], dg)
+    return dg.graph['uid']
+
+
+def report_topo(uid):
+
+    dg = get_graph(uid)
+    report = []
+
+    for task in nx.topological_sort(dg):
+        status = dg.node[task]['status']
+        report.append([task, status])
+
+    return report
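For illustration only, not part of the patch: a hedged usage sketch of the API above, assuming the orch package is importable and the hard-coded redis instance at 10.0.0.2 is reachable. create_plan() returns a 'name:uuid4' uid, and report_topo() lists every task in dependency order:

    from orch import graph

    with open('orch/examples/simple.yaml') as f:
        uid = graph.create_plan(f.read())  # e.g. 'simple:<uuid4>'

    for task, status in graph.report_topo(uid):
        print task, status  # every task starts out as PENDING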
diff --git a/orch/tasks.py b/orch/tasks.py
index 267d7c7b..ed751a34 100644
--- a/orch/tasks.py
+++ b/orch/tasks.py
@@ -48,7 +48,7 @@ def maybe_ignore(func):
 
 @solar_task
 @maybe_ignore
-def cmd(cmd):
+def cmd(ctxt, cmd):
     popen = subprocess.Popen(
         cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
     out, err = popen.communicate()
@@ -72,9 +72,12 @@ def error(ctxt, message):
 
 @solar_task
 def fault_tolerance(ctxt, percent):
-    dg = graph.get_graph('current')
+    task_id = ctxt.request.id
+    plan_uid, task_name = task_id.rsplit(':', 1)
+
+    dg = graph.get_graph(plan_uid)
     success = 0.0
-    predecessors = dg.predecessors(ctxt.request.id)
+    predecessors = dg.predecessors(task_name)
     lth = len(predecessors)
 
     for s in predecessors:
@@ -112,29 +115,29 @@ def fire_timeout(task_id):
 
 
 @app.task
-def schedule_start():
+def schedule_start(plan_uid):
     """On receive finished task should update storage with task result:
 
     - find successors that should be executed
     - apply different policies to tasks
     """
-    dg = graph.get_graph('current')
+    dg = graph.get_graph(plan_uid)
 
-    concurrency = dg.graph.get('concurrency', None)
-    next_tasks = list(islice(get_next(dg), 0, concurrency))
+    next_tasks = list(get_next(dg))
     print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks)
-    graph.save_graph('current', dg)
+    graph.save_graph(plan_uid, dg)
     group(next_tasks)()
 
 
 @app.task
 def schedule_next(task_id, status):
-    dg = graph.get_graph('current')
-    dg.node[task_id]['status'] = status
-    concurrency = dg.graph.get('concurrency', None)
-    next_tasks = list(islice(get_next(dg), 0, concurrency))
+    plan_uid, task_name = task_id.rsplit(':', 1)
+    dg = graph.get_graph(plan_uid)
+    dg.node[task_name]['status'] = status
+
+    next_tasks = list(get_next(dg))
     print 'GRAPH {0}\n NEXT TASKS {1}'.format(dg.node, next_tasks)
-    graph.save_graph('current', dg)
+    graph.save_graph(plan_uid, dg)
     group(next_tasks)()
 
 
@@ -157,12 +160,13 @@ def get_next(dg):
         predecessors = set(dg.predecessors(node))
 
         if predecessors <= visited:
+            task_id = '{}:{}'.format(dg.graph['uid'], node)
             task_name = 'orch.tasks.{0}'.format(data['type'])
             task = app.tasks[task_name]
 
             dg.node[node]['status'] = 'INPROGRESS'
             subtask = task.subtask(
-                data['args'], task_id=node,
+                data['args'], task_id=task_id,
                 time_limit=data.get('time_limit', None),
                 soft_time_limit=data.get('soft_time_limit', None))
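For illustration only, not part of the patch: the celery task ids built in get_next() embed the plan uid, so schedule_next() and fault_tolerance() can recover both the plan and the node name from ctxt.request.id. rsplit(':', 1) is required because the plan uid itself contains a colon between the plan name and the uuid. A self-contained sketch of that invariant:

    import uuid

    # plan uid in the shape produced by graph.create_plan(): '<name>:<uuid4>'
    plan_uid = 'simple:{0}'.format(uuid.uuid4())
    # task id in the shape built by get_next(): '<plan_uid>:<node>'
    task_id = '{0}:{1}'.format(plan_uid, 'sleep_some_time')

    # splitting from the right recovers the original pair intact
    recovered_uid, task_name = task_id.rsplit(':', 1)
    assert recovered_uid == plan_uid
    assert task_name == 'sleep_some_time'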