diff --git a/Vagrantfile b/Vagrantfile index d487e2ad..cf02ecd4 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -30,7 +30,9 @@ end SLAVES_COUNT = cfg["slaves_count"] SLAVES_RAM = cfg["slaves_ram"] +SLAVES_IMAGE = cfg["slaves_image"] MASTER_RAM = cfg["master_ram"] +MASTER_IMAGE = cfg["master_image"] SYNC_TYPE = cfg["sync_type"] MASTER_CPUS = cfg["master_cpus"] SLAVES_CPUS = cfg["slaves_cpus"] @@ -50,7 +52,7 @@ slave_celery = ansible_playbook_command("celery.yaml", ["--skip-tags", "master"] Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| config.vm.define "solar-dev", primary: true do |config| - config.vm.box = "cgenie/solar-master" + config.vm.box = MASTER_IMAGE config.vm.provision "shell", inline: solar_script, privileged: true config.vm.provision "shell", inline: master_celery, privileged: true @@ -95,7 +97,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| ip_index = i + 3 config.vm.define "solar-dev#{index}" do |config| # standard box with all stuff preinstalled - config.vm.box = "cgenie/solar-master" + config.vm.box = SLAVES_IMAGE config.vm.provision "file", source: "bootstrap/ansible.cfg", destination: "/home/vagrant/.ansible.cfg" config.vm.provision "shell", inline: slave_script, privileged: true diff --git a/examples/bootstrap/README.md b/examples/bootstrap/README.md new file mode 100644 index 00000000..fbb7fadf --- /dev/null +++ b/examples/bootstrap/README.md @@ -0,0 +1,13 @@ +# Demo of the `solar_bootstrap` Resource + +You need to instantiate Vagrant with a slave node which is unprovisioned +(i.e. started from the `trusty64` Vagrant box). + +You can start the boxes from the `Vagrantfile` in master directory and +`vagrant-settings.yml` from this directory. + +Running +```bash +python example-bootstrap.py deploy +``` +will deploy full Solar env to node `solar-dev2`. diff --git a/examples/bootstrap/example-bootstrap.py b/examples/bootstrap/example-bootstrap.py index 93867fd0..7b45fa93 100644 --- a/examples/bootstrap/example-bootstrap.py +++ b/examples/bootstrap/example-bootstrap.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + import click import sys import time @@ -12,19 +14,6 @@ from solar import errors from solar.interfaces.db import get_db -GIT_PUPPET_LIBS_URL = 'https://github.com/CGenie/puppet-libs-resource' - - -# TODO -# Resource for repository OR puppet apt-module in run.pp -# add-apt-repository cloud-archive:juno -# To discuss: install stuff in Docker container - -# NOTE -# No copy of manifests, pull from upstream (implemented in the puppet handler) -# Official puppet manifests, not fuel-library - - db = get_db() @@ -38,15 +27,15 @@ def setup_resources(): signals.Connections.clear() - node3 = vr.create('node3', 'resources/ro_node/', { - 'ip': '10.0.0.5', - 'ssh_key': '/vagrant/.vagrant/machines/solar-dev3/virtualbox/private_key', + node2 = vr.create('node2', 'resources/ro_node/', { + 'ip': '10.0.0.4', + 'ssh_key': '/vagrant/.vagrant/machines/solar-dev2/virtualbox/private_key', 'ssh_user': 'vagrant' })[0] - solar_bootstrap3 = vr.create('solar_bootstrap3', 'resources/solar_bootstrap', {'master_ip': '10.0.0.2'})[0] + solar_bootstrap2 = vr.create('solar_bootstrap2', 'resources/solar_bootstrap', {'master_ip': '10.0.0.2'})[0] - signals.connect(node3, solar_bootstrap3) + signals.connect(node2, solar_bootstrap2) has_errors = False for r in locals().values(): @@ -63,7 +52,7 @@ def setup_resources(): sys.exit(1) resources_to_run = [ - 'solar_bootstrap3', + 'solar_bootstrap2', ] diff --git a/examples/bootstrap/vagrant-settings.yaml b/examples/bootstrap/vagrant-settings.yaml new file mode 100644 index 00000000..ca4632f1 --- /dev/null +++ b/examples/bootstrap/vagrant-settings.yaml @@ -0,0 +1,5 @@ +# rename it to vagrant-settings.yml then Vagrantfile +# will use values from this file + +slaves_count: 3 +slaves_image: ubuntu/trusty64 diff --git a/solar/solar/cli/main.py b/solar/solar/cli/main.py index 846eb34c..3b1148c2 100644 --- a/solar/solar/cli/main.py +++ b/solar/solar/cli/main.py @@ -165,22 +165,13 @@ def init_cli_connections(): @connections.command() @click.option('--start-with', default=None) @click.option('--end-with', default=None) - def graph(end_with, start_with): - #g = xs.connection_graph() + def graph(start_with, end_with): g = signals.detailed_connection_graph(start_with=start_with, end_with=end_with) nx.write_dot(g, 'graph.dot') fabric_api.local('dot -Tpng graph.dot -o graph.png') - # Matplotlib - #pos = nx.spring_layout(g) - #nx.draw_networkx_nodes(g, pos) - #nx.draw_networkx_edges(g, pos, arrows=True) - #nx.draw_networkx_labels(g, pos) - #plt.axis('off') - #plt.savefig('graph.png') - def init_cli_resource(): @main.group() diff --git a/solar/solar/cli/orch.py b/solar/solar/cli/orch.py index d186eeac..a7993ee3 100755 --- a/solar/solar/cli/orch.py +++ b/solar/solar/cli/orch.py @@ -99,16 +99,10 @@ def run_once(uid): @orchestration.command() @click.argument('uid', type=SOLARUID) def restart(uid): - graph.reset(uid) + graph.reset_by_uid(uid) tasks.schedule_start.apply_async(args=[uid], queue='scheduler') -@orchestration.command() -@click.argument('uid', type=SOLARUID) -def reset(uid): - graph.reset(uid) - - @orchestration.command() @click.argument('uid', type=SOLARUID) def stop(uid): @@ -119,17 +113,23 @@ def stop(uid): tasks.soft_stop.apply_async(args=[uid], queue='scheduler') +@orchestration.command() +@click.argument('uid', type=SOLARUID) +def reset(uid): + graph.reset_by_uid(uid) + + @orchestration.command() @click.argument('uid', type=SOLARUID) def resume(uid): - graph.reset(uid, ['SKIPPED']) + graph.reset_by_uid(uid, ['SKIPPED']) tasks.schedule_start.apply_async(args=[uid], queue='scheduler') @orchestration.command() @click.argument('uid', type=SOLARUID) def retry(uid): - graph.reset(uid, ['ERROR']) + graph.reset_by_uid(uid, ['ERROR']) tasks.schedule_start.apply_async(args=[uid], queue='scheduler') diff --git a/solar/solar/core/resource/resource.py b/solar/solar/core/resource/resource.py index 65dfa2e4..6fdcb0f1 100644 --- a/solar/solar/core/resource/resource.py +++ b/solar/solar/core/resource/resource.py @@ -113,7 +113,7 @@ class Resource(object): def resource_inputs(self): return { - i.name: i for i in self.db_obj.inputs.value + i.name: i for i in self.db_obj.inputs.as_set() } def to_dict(self): diff --git a/solar/solar/core/signals.py b/solar/solar/core/signals.py index 6e6345e4..a2003f0e 100644 --- a/solar/solar/core/signals.py +++ b/solar/solar/core/signals.py @@ -13,9 +13,12 @@ # License for the specific language governing permissions and limitations # under the License. +import networkx + from solar.core.log import log from solar.events.api import add_events from solar.events.controls import Dependency +from solar.interfaces import orm def guess_mapping(emitter, receiver): @@ -92,7 +95,7 @@ def connect_single(emitter, src, receiver, dst): # Check for cycles # TODO: change to get_paths after it is implemented in drivers - if emitter_input in receiver_input.receivers.value: + if emitter_input in receiver_input.receivers.as_set(): raise Exception('Prevented creating a cycle') log.debug('Connecting {}::{} -> {}::{}'.format( @@ -103,6 +106,10 @@ def connect_single(emitter, src, receiver, dst): def connect_multi(emitter, src, receiver, dst): receiver_input_name, receiver_input_key = dst.split(':') + if '|' in receiver_input_key: + receiver_input_key, receiver_input_tag = receiver_input_key.split('|') + else: + receiver_input_tag = None emitter_input = emitter.resource_inputs()[src] receiver_input = receiver.resource_inputs()[receiver_input_name] @@ -113,11 +120,16 @@ def connect_multi(emitter, src, receiver, dst): 'Receiver input {} must be a hash or a list of hashes'.format(receiver_input_name) ) - log.debug('Connecting {}::{} -> {}::{}[{}]'.format( + log.debug('Connecting {}::{} -> {}::{}[{}], tag={}'.format( emitter.name, emitter_input.name, receiver.name, receiver_input.name, - receiver_input_key + receiver_input_key, + receiver_input_tag )) - emitter_input.receivers.add_hash(receiver_input, receiver_input_key) + emitter_input.receivers.add_hash( + receiver_input, + receiver_input_key, + tag=receiver_input_tag + ) def disconnect_receiver_by_input(receiver, input_name): @@ -130,3 +142,40 @@ def disconnect(emitter, receiver): for emitter_input in emitter.resource_inputs().values(): for receiver_input in receiver.resource_inputs().values(): emitter_input.receivers.remove(receiver_input) + + +def detailed_connection_graph(start_with=None, end_with=None): + resource_inputs_graph = orm.DBResource.inputs.graph() + inputs_graph = orm.DBResourceInput.receivers.graph() + + def node_attrs(n): + if isinstance(n, orm.DBResource): + return { + 'color': 'yellowgreen', + 'style': 'filled', + } + elif isinstance(n, orm.DBResourceInput): + return { + 'color': 'lightskyblue', + 'style': 'filled, rounded', + } + + def format_name(i): + if isinstance(i, orm.DBResource): + return i.name + elif isinstance(i, orm.DBResourceInput): + return '{}/{}'.format(i.resource.name, i.name) + + for r, i in resource_inputs_graph.edges(): + inputs_graph.add_edge(r, i) + + ret = networkx.MultiDiGraph() + + for u, v in inputs_graph.edges(): + u_n = format_name(u) + v_n = format_name(v) + ret.add_edge(u_n, v_n) + ret.node[u_n] = node_attrs(u) + ret.node[v_n] = node_attrs(v) + + return ret diff --git a/solar/solar/interfaces/orm.py b/solar/solar/interfaces/orm.py index 7c56d6a6..27454519 100644 --- a/solar/solar/interfaces/orm.py +++ b/solar/solar/interfaces/orm.py @@ -13,6 +13,7 @@ # under the License. import inspect +import networkx import uuid from solar import errors @@ -89,6 +90,24 @@ class DBRelatedField(object): self.name = name self.source_db_object = source_db_object + @classmethod + def graph(self): + relations = db.get_relations(type_=self.relation_type) + + g = networkx.MultiDiGraph() + + for r in relations: + source = self.source_db_class(**r.start_node.properties) + dest = self.destination_db_class(**r.end_node.properties) + properties = r.properties.copy() + g.add_edge( + source, + dest, + attr_dict=properties + ) + + return g + def all(self): source_db_node = self.source_db_object._db_node @@ -123,7 +142,7 @@ class DBRelatedField(object): type_=self.relation_type ) - def add_hash(self, destination_db_object, destination_key): + def add_hash(self, destination_db_object, destination_key, tag=None): if not isinstance(destination_db_object, self.destination_db_class): raise errors.SolarError( 'Object {} is of incompatible type {}.'.format( @@ -134,7 +153,7 @@ class DBRelatedField(object): db.get_or_create_relation( self.source_db_object._db_node, destination_db_object._db_node, - properties={'destination_key': destination_key}, + properties={'destination_key': destination_key, 'tag': tag}, type_=self.relation_type ) @@ -146,8 +165,7 @@ class DBRelatedField(object): type_=self.relation_type ) - @property - def value(self): + def as_set(self): """ Return DB objects that are destinations for self.source_db_object. """ @@ -165,7 +183,7 @@ class DBRelatedField(object): def sources(self, destination_db_object): """ - Reverse of self.value, i.e. for given destination_db_object, + Reverse of self.as_set, i.e. for given destination_db_object, return source DB objects. """ @@ -392,16 +410,19 @@ class DBResourceInput(DBObject): return [i.backtrack_value() for i in inputs] # NOTE: we return a list of values, but we need to group them - # by resource name, hence this dict here + # hence this dict here + # NOTE: grouping is done by resource.name by default, but this + # can be overwritten by the 'tag' property in relation ret = {} for r in relations: source = source_class(**r.start_node.properties) - ret.setdefault(source.resource.name, {}) + tag = r.properties['tag'] or source.resource.name + ret.setdefault(tag, {}) key = r.properties['destination_key'] value = source.backtrack_value() - ret[source.resource.name].update({key: value}) + ret[tag].update({key: value}) return ret.values() elif self.is_hash: diff --git a/solar/solar/orchestration/graph.py b/solar/solar/orchestration/graph.py index ca871af3..f210b86b 100644 --- a/solar/solar/orchestration/graph.py +++ b/solar/solar/orchestration/graph.py @@ -25,7 +25,7 @@ from solar.interfaces.db import get_db db = get_db() -def save_graph(name, graph): +def save_graph(graph): # maybe it is possible to store part of information in AsyncResult backend uid = graph.graph['uid'] db.create(uid, graph.graph, db.COLLECTIONS.plan_graph) @@ -78,7 +78,7 @@ def parse_plan(plan_path): def create_plan_from_graph(dg, save=True): dg.graph['uid'] = "{0}:{1}".format(dg.graph['name'], str(uuid.uuid4())) if save: - save_graph(dg.graph['uid'], dg) + save_graph(dg) return dg @@ -110,27 +110,36 @@ def create_plan(plan_path, save=True): def update_plan(uid, plan_path): """update preserves old status of tasks if they werent removed """ - dg = parse_plan(plan_path) - old_dg = get_graph(uid) - dg.graph = old_dg.graph - for n in dg: - if n in old_dg: - dg.node[n]['status'] = old_dg.node[n]['status'] - save_graph(uid, dg) - return uid + new = parse_plan(plan_path) + old = get_graph(uid) + return update_plan_from_graph(new, old).graph['uid'] -def reset(uid, state_list=None): +def update_plan_from_graph(new, old): + new.graph = old.graph + for n in new: + if n in old: + new.node[n]['status'] = old.node[n]['status'] + + save_graph(new) + return new + + +def reset_by_uid(uid, state_list=None): dg = get_graph(uid) - for n in dg: - if state_list is None or dg.node[n]['status'] in state_list: - dg.node[n]['status'] = states.PENDING.name - save_graph(uid, dg) + return reset(dg, state_list=state_list) + + +def reset(graph, state_list=None): + for n in graph: + if state_list is None or graph.node[n]['status'] in state_list: + graph.node[n]['status'] = states.PENDING.name + save_graph(graph) def reset_filtered(uid): - reset(uid, state_list=[states.SKIPPED.name, states.NOOP.name]) + reset_by_uid(uid, state_list=[states.SKIPPED.name, states.NOOP.name]) def report_topo(uid): diff --git a/solar/solar/orchestration/tasks.py b/solar/solar/orchestration/tasks.py index a8dc1264..671b52d7 100644 --- a/solar/solar/orchestration/tasks.py +++ b/solar/solar/orchestration/tasks.py @@ -126,7 +126,7 @@ def schedule(plan_uid, dg): tasks) execution = executor.celery_executor( dg, limit_chain, control_tasks=('fault_tolerance',)) - graph.save_graph(plan_uid, dg) + graph.save_graph(dg) execution() @@ -147,7 +147,7 @@ def soft_stop(plan_uid): for n in dg: if dg.node[n]['status'] == 'PENDING': dg.node[n]['status'] = 'SKIPPED' - graph.save_graph(plan_uid, dg) + graph.save_graph(dg) @app.task(name='schedule_next') diff --git a/solar/solar/test/test_graph_api.py b/solar/solar/test/test_graph_api.py new file mode 100644 index 00000000..32a758be --- /dev/null +++ b/solar/solar/test/test_graph_api.py @@ -0,0 +1,75 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +from copy import deepcopy + +from pytest import fixture + +from solar.orchestration import graph +from solar.orchestration.traversal import states + + +@fixture +def simple(): + simple_path = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + 'orch_fixtures', + 'simple.yaml') + return graph.create_plan(simple_path) + + +def test_simple_plan_created_and_loaded(simple): + plan = graph.get_plan(simple.graph['uid']) + assert set(plan.nodes()) == {'just_fail', 'echo_stuff'} + + +def test_update_plan_with_new_node(simple): + new = deepcopy(simple) + new.add_node('one_more', {}) + graph.update_plan_from_graph(new, simple) + updated = graph.get_plan(new.graph['uid']) + assert set(updated.nodes()) == {'one_more', 'just_fail', 'echo_stuff'} + + +def test_status_preserved_on_update(simple): + new = deepcopy(simple) + task_under_test = 'echo_stuff' + + assert new.node[task_under_test]['status'] == states.PENDING.name + + simple.node[task_under_test]['status'] = states.SUCCESS.name + graph.update_plan_from_graph(new, simple) + + updated = graph.get_plan(new.graph['uid']) + assert new.node[task_under_test]['status'] == states.SUCCESS.name + + +def test_reset_all_states(simple): + for n in simple: + simple.node[n]['status'] == states.ERROR.name + graph.reset(simple) + + for n in simple: + assert simple.node[n]['status'] == states.PENDING.name + + +def test_reset_only_provided(simple): + simple.node['just_fail']['status'] = states.ERROR.name + simple.node['echo_stuff']['status'] = states.SUCCESS.name + + graph.reset(simple, [states.ERROR.name]) + + assert simple.node['just_fail']['status'] == states.PENDING.name + assert simple.node['echo_stuff']['status'] == states.SUCCESS.name diff --git a/solar/solar/test/test_orm.py b/solar/solar/test/test_orm.py index 74972185..ddc82219 100644 --- a/solar/solar/test/test_orm.py +++ b/solar/solar/test/test_orm.py @@ -137,7 +137,6 @@ class TestORM(BaseResourceTest): self.assertEqual(t1, t2) - class TestORMRelation(BaseResourceTest): def test_children_value(self): class TestDBRelatedObject(orm.DBObject): @@ -164,25 +163,25 @@ class TestORMRelation(BaseResourceTest): o = TestDBObject(id='a') o.save() - self.assertSetEqual(o.related.value, set()) + self.assertSetEqual(o.related.as_set(), set()) o.related.add(r1) - self.assertSetEqual(o.related.value, {r1}) + self.assertSetEqual(o.related.as_set(), {r1}) o.related.add(r2) - self.assertSetEqual(o.related.value, {r1, r2}) + self.assertSetEqual(o.related.as_set(), {r1, r2}) o.related.remove(r2) - self.assertSetEqual(o.related.value, {r1}) + self.assertSetEqual(o.related.as_set(), {r1}) o.related.add(r2) - self.assertSetEqual(o.related.value, {r1, r2}) + self.assertSetEqual(o.related.as_set(), {r1, r2}) o.related.remove(r1, r2) - self.assertSetEqual(o.related.value, set()) + self.assertSetEqual(o.related.as_set(), set()) o.related.add(r1, r2) - self.assertSetEqual(o.related.value, {r1, r2}) + self.assertSetEqual(o.related.as_set(), {r1, r2}) with self.assertRaisesRegexp(errors.SolarError, '.*incompatible type.*'): o.related.add(o) @@ -208,8 +207,8 @@ class TestORMRelation(BaseResourceTest): o1.related.add(o2) o2.related.add(o3) - self.assertEqual(o1.related.value, {o2}) - self.assertEqual(o2.related.value, {o3}) + self.assertEqual(o1.related.as_set(), {o2}) + self.assertEqual(o2.related.as_set(), {o3}) class TestResourceORM(BaseResourceTest): @@ -227,4 +226,4 @@ class TestResourceORM(BaseResourceTest): r.add_input('ip', 'str!', '10.0.0.2') - self.assertEqual(len(r.inputs.value), 1) + self.assertEqual(len(r.inputs.as_set()), 1) diff --git a/solar/solar/test/test_signals.py b/solar/solar/test/test_signals.py index 27a27fb0..7f096921 100644 --- a/solar/solar/test/test_signals.py +++ b/solar/solar/test/test_signals.py @@ -654,3 +654,50 @@ input: {'ip': sample.args['ip']}, receiver.args['server'], ) + + def test_hash_input_multiple_resources_with_tag_connect(self): + sample_meta_dir = self.make_resource_meta(""" +id: sample +handler: ansible +version: 1.0.0 +input: + ip: + schema: str! + value: + port: + schema: int! + value: + """) + receiver_meta_dir = self.make_resource_meta(""" +id: receiver +handler: ansible +version: 1.0.0 +input: + server: + schema: [{ip: str!, port: int!}] + """) + + sample1 = self.create_resource( + 'sample1', sample_meta_dir, args={'ip': '10.0.0.1', 'port': 5000} + ) + sample2 = self.create_resource( + 'sample2', sample_meta_dir, args={'ip': '10.0.0.2', 'port': 5001} + ) + receiver = self.create_resource( + 'receiver', receiver_meta_dir + ) + xs.connect(sample1, receiver, mapping={'ip': 'server:ip'}) + xs.connect(sample2, receiver, mapping={'port': 'server:port|sample1'}) + self.assertItemsEqual( + [{'ip': sample1.args['ip'], 'port': sample2.args['port']}], + receiver.args['server'], + ) + sample3 = self.create_resource( + 'sample3', sample_meta_dir, args={'ip': '10.0.0.3', 'port': 5002} + ) + xs.connect(sample3, receiver, mapping={'ip': 'server:ip', 'port': 'server:port'}) + self.assertItemsEqual( + [{'ip': sample1.args['ip'], 'port': sample2.args['port']}, + {'ip': sample3.args['ip'], 'port': sample3.args['port']}], + receiver.args['server'], + ) diff --git a/vagrant-settings.yaml_defaults b/vagrant-settings.yaml_defaults index d672cb44..d3c01541 100644 --- a/vagrant-settings.yaml_defaults +++ b/vagrant-settings.yaml_defaults @@ -3,6 +3,8 @@ slaves_count: 2 slaves_ram: 1024 +master_image: cgenie/solar-master +slaves_image: cgenie/solar-master master_ram: 1024 master_cpus: 1 slaves_cpus: 1