Merge pull request #88 from xarses/redis-proxied-resource

Redis proxied Resource

Resource arguments and inter-resource connections are now read from and written back to the Redis-backed DB on every access instead of being cached in memory (Resource.args, the module-level CLIENTS dict). Along the way the CLI switches from print statements to click.echo, other modules log through a new shared solar.core.log logger, and RedisDB grows pipelined save_list/clear_collection helpers.
Dmitry Shulyak 2015-06-12 12:30:43 +03:00
commit c0af573673
21 changed files with 395 additions and 274 deletions

.gitignore

@ -14,4 +14,6 @@ tmp/
state/
clients.json
rs/
solar.log
x-venv/


@ -35,36 +35,36 @@ def deploy():
keystone_db = resource.create('keystone_db', 'resources/mariadb_keystone_db/', {'db_name': 'keystone_db', 'login_password': '', 'login_user': 'root', 'login_port': '', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
keystone_db_user = resource.create('keystone_db_user', 'resources/mariadb_keystone_user/', {'new_user_name': 'keystone', 'new_user_password': 'keystone', 'db_name': '', 'login_password': '', 'login_user': 'root', 'login_port': '', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
keystone_config1 = resource.create('keystone_config1', 'resources/keystone_config/', {'config_dir': '/etc/solar/keystone', 'ip': '', 'ssh_user': '', 'ssh_key': '', 'admin_token': 'admin', 'db_password': '', 'db_name': '', 'db_user': '', 'db_host': '', 'db_port': ''})
keystone_service1 = resource.create('keystone_service1', 'resources/keystone_service/', {'port': 5001, 'admin_port': 35357, 'image': '', 'ip': '', 'ssh_key': '', 'ssh_user': '', 'config_dir': ''})
keystone_config1 = resource.create('keystone_config1', 'resources/keystone_config/', {'config_dir': '/etc/solar/keystone', 'admin_token': 'admin'})
keystone_service1 = resource.create('keystone_service1', 'resources/keystone_service/', {'port': 5001, 'admin_port': 35357})
keystone_config2 = resource.create('keystone_config2', 'resources/keystone_config/', {'config_dir': '/etc/solar/keystone', 'ip': '', 'ssh_user': '', 'ssh_key': '', 'admin_token': 'admin', 'db_password': '', 'db_name': '', 'db_user': '', 'db_host': '', 'db_port': ''})
keystone_service2 = resource.create('keystone_service2', 'resources/keystone_service/', {'port': 5002, 'admin_port': 35358, 'image': '', 'ip': '', 'ssh_key': '', 'ssh_user': '', 'config_dir': ''})
keystone_config2 = resource.create('keystone_config2', 'resources/keystone_config/', {'config_dir': '/etc/solar/keystone', 'admin_token': 'admin'})
keystone_service2 = resource.create('keystone_service2', 'resources/keystone_service/', {'port': 5002, 'admin_port': 35358})
haproxy_keystone_config = resource.create('haproxy_keystone1_config', 'resources/haproxy_keystone_config/', {'name': 'keystone_config', 'listen_port':5000, 'servers':[], 'ports':[]})
haproxy_config = resource.create('haproxy_config', 'resources/haproxy_config', {'ip': '', 'ssh_key': '', 'ssh_user': '', 'configs_names':[], 'configs_ports':[], 'listen_ports':[], 'configs':[], 'config_dir': ''})
haproxy_config = resource.create('haproxy_config', 'resources/haproxy_config', {'ip': '', 'ssh_key': '', 'ssh_user': '', 'configs_names':[], 'configs_ports':[], 'listen_ports':[], 'configs':[]})
haproxy_service = resource.create('haproxy_service', 'resources/docker_container/', {'image': 'tutum/haproxy', 'ports': [], 'host_binds': [], 'volume_binds':[], 'ip': '', 'ssh_key': '', 'ssh_user': ''})
glance_db = resource.create('glance_db', 'resources/mariadb_db/', {'db_name': 'glance_db', 'login_password': '', 'login_user': 'root', 'login_port': '', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
glance_db_user = resource.create('glance_db_user', 'resources/mariadb_user/', {'new_user_name': 'glance', 'new_user_password': 'glance', 'db_name': '', 'login_password': '', 'login_user': 'root', 'login_port': '', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
services_tenant = resource.create('glance_keystone_tenant', 'resources/keystone_tenant', {'keystone_host': '', 'keystone_port': '', 'login_user': 'admin', 'admin_token': '', 'tenant_name': 'services', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
services_tenant = resource.create('glance_keystone_tenant', 'resources/keystone_tenant', {'keystone_host': '', 'keystone_port': '', 'admin_token': '', 'tenant_name': 'services', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
glance_keystone_user = resource.create('glance_keystone_user', 'resources/keystone_user', {'user_name': 'glance_admin', 'user_password': 'password1234', 'tenant_name': 'service_admins', 'role_name': 'glance_admin', 'keystone_host': '', 'keystone_port': '', 'admin_token': '', 'ip': '', 'ssh_key': '', 'ssh_user': ''})
glance_keystone_role = resource.create('glance_keystone_role', 'resources/keystone_role', {'keystone_host': '', 'keystone_port': '', 'login_user': 'admin', 'admin_token': '', 'tenant_name': '', 'user_name': '', 'role_name': 'admin', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
glance_keystone_user = resource.create('glance_keystone_user', 'resources/keystone_user', {'user_name': 'glance_admin', 'user_password': 'password1234', 'tenant_name': 'service_admins', 'keystone_host': '', 'keystone_port': '', 'admin_token': '', 'ip': '', 'ssh_key': '', 'ssh_user': ''})
glance_keystone_role = resource.create('glance_keystone_role', 'resources/keystone_role', {'keystone_host': '', 'keystone_port': '', 'admin_token': '', 'tenant_name': '', 'user_name': '', 'role_name': 'admin', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
glance_config = resource.create('glance_config', 'resources/glance_config/', {'ip': '', 'ssh_key': '', 'ssh_user': '', 'keystone_ip': '', 'keystone_port': '', 'config_dir': {}, 'api_port': '', 'registry_port': '', 'mysql_ip': '', 'mysql_db': '', 'mysql_user': '', 'mysql_password': '', 'keystone_admin_user': '', 'keystone_admin_password': '', 'keystone_admin_port': '', 'keystone_admin_tenant': ''})
glance_config = resource.create('glance_config', 'resources/glance_config/', {'ip': '', 'ssh_key': '', 'ssh_user': '', 'keystone_ip': '', 'keystone_port': '', 'mysql_ip': '', 'mysql_db': '', 'mysql_user': '', 'mysql_password': '', 'keystone_admin_user': '', 'keystone_admin_password': '', 'keystone_admin_port': '', 'keystone_admin_tenant': ''})
glance_api_container = resource.create('glance_api_container', 'resources/glance_api_service/', {'image': 'cgenie/centos-rdo-glance-api', 'ports': [{'value': [{'value': 9292}]}], 'host_binds': [], 'volume_binds': [], 'db_password': '', 'keystone_password': '', 'keystone_admin_token': '', 'keystone_host': '', 'ip': '', 'ssh_key': '', 'ssh_user': ''})
glance_registry_container = resource.create('glance_registry_container', 'resources/glance_registry_service/', {'image': 'cgenie/centos-rdo-glance-registry', 'ports': [{'value': [{'value': 9191}]}], 'host_binds': [], 'volume_binds': [], 'db_host': '', 'db_root_password': '', 'db_password': '', 'db_name': '', 'db_user': '', 'keystone_admin_tenant': '', 'keystone_password': '', 'keystone_user': '', 'keystone_admin_token': '', 'keystone_host': '', 'ip': '', 'ssh_key': '', 'ssh_user': ''})
# TODO: admin_port should be refactored, we need to rethink docker
# container resource and make it common for all
# resources used in this demo
glance_api_endpoint = resource.create('glance_api_endpoint', 'resources/keystone_service_endpoint/', {'ip': '', 'ssh_key': '', 'ssh_user': '', 'admin_port': 9292, 'admin_token': '', 'adminurl': 'http://{{ip}}:{{admin_port}}', 'internalurl': 'http://{{ip}}:{{port}}', 'publicurl': 'http://{{ip}}:{{port}}', 'description': 'OpenStack Image Service', 'keystone_host': '', 'keystone_port': '', 'name': 'glance', 'port': 9292, 'type': 'image'})
glance_api_endpoint = resource.create('glance_api_endpoint', 'resources/keystone_service_endpoint/', {'ip': '', 'ssh_key': '', 'ssh_user': '', 'admin_port': 9292, 'admin_token': '', 'adminurl': 'http://{{ip}}:{{admin_port}}', 'internalurl': 'http://{{ip}}:{{port}}', 'publicurl': 'http://{{ip}}:{{port}}', 'description': 'OpenStack Image Service', 'keystone_host': '', 'keystone_port': '', 'port': 9292, 'type': 'image'})
admin_tenant = resource.create('admin_tenant', 'resources/keystone_tenant', {'keystone_host': '', 'keystone_port': '', 'login_user': 'admin', 'admin_token': '', 'tenant_name': 'admin', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
admin_user = resource.create('admin_user', 'resources/keystone_user', {'keystone_host': '', 'keystone_port': '', 'login_user': 'admin', 'admin_token': '', 'tenant_name': '', 'user_name': 'admin', 'user_password': 'admin', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
admin_role = resource.create('admin_role', 'resources/keystone_role', {'keystone_host': '', 'keystone_port': '', 'login_user': 'admin', 'admin_token': '', 'tenant_name': '', 'user_name': '', 'role_name': 'admin', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
keystone_service_endpoint = resource.create('keystone_service_endpoint', 'resources/keystone_service_endpoint/', {'ip': '', 'ssh_key': '', 'ssh_user': '', 'admin_port': '', 'admin_token': '', 'adminurl': 'http://{{ip}}:{{admin_port}}/v2.0', 'internalurl': 'http://{{ip}}:{{port}}/v2.0', 'publicurl': 'http://{{ip}}:{{port}}/v2.0', 'description': 'OpenStack Identity Service', 'keystone_host': '', 'keystone_port': '', 'name': 'keystone', 'port': '', 'type': 'identity'})
admin_tenant = resource.create('admin_tenant', 'resources/keystone_tenant', {'keystone_host': '', 'keystone_port': '', 'admin_token': '', 'tenant_name': 'admin', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
admin_user = resource.create('admin_user', 'resources/keystone_user', {'keystone_host': '', 'keystone_port': '', 'admin_token': '', 'tenant_name': '', 'user_name': 'admin', 'user_password': 'admin', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
admin_role = resource.create('admin_role', 'resources/keystone_role', {'keystone_host': '', 'keystone_port': '', 'admin_token': '', 'tenant_name': '', 'user_name': '', 'role_name': 'admin', 'ip': '', 'ssh_user': '', 'ssh_key': ''})
keystone_service_endpoint = resource.create('keystone_service_endpoint', 'resources/keystone_service_endpoint/', {'ip': '', 'ssh_key': '', 'ssh_user': '', 'admin_port': '', 'admin_token': '', 'adminurl': 'http://{{ip}}:{{admin_port}}/v2.0', 'internalurl': 'http://{{ip}}:{{port}}/v2.0', 'publicurl': 'http://{{ip}}:{{port}}/v2.0', 'description': 'OpenStack Identity Service', 'keystone_host': '', 'keystone_port': '', 'port': '', 'type': 'identity'})
####
@ -168,7 +168,6 @@ def deploy():
signals.connect(keystone_config1, glance_api_endpoint, {'admin_token': 'admin_token'})
signals.connect(keystone_service1, glance_api_endpoint, {'ip': 'keystone_host', 'admin_port': 'keystone_port'})
signals.Connections.flush()
has_errors = False
for r in locals().values():

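The diff above trims the keystone and glance resources down to the arguments they actually own; everything that used to be passed in as an empty string (ip, ssh_key, db credentials and so on) now arrives through connections. A minimal sketch of that create-and-connect pattern, reusing names and values from the example itself (resource paths are the ones used in this deploy script):

from solar.core import resource
from solar.core import signals

# Only the inputs this resource owns are passed at creation time.
keystone_config1 = resource.create(
    'keystone_config1', 'resources/keystone_config/',
    {'config_dir': '/etc/solar/keystone', 'admin_token': 'admin'}
)
glance_api_endpoint = resource.create(
    'glance_api_endpoint', 'resources/keystone_service_endpoint/',
    {'admin_port': 9292, 'port': 9292, 'type': 'image'}
)

# The remaining inputs are filled in by the signal layer; here admin_token
# flows from the config resource into the endpoint resource.
signals.connect(keystone_config1, glance_api_endpoint, {'admin_token': 'admin_token'})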

@ -28,4 +28,4 @@
{% endif %}
- name: wait for glance api
wait_for: host={{ ip }} port=9393 timeout=20
wait_for: host={{ ip }} port={{ ports.value[0]['value']['value'] }} timeout=20


@ -4,7 +4,7 @@ version: 1.0.0
input:
image:
schema: str!
value: kollaglue/centos-rdo-k-keystone
value: kollaglue/centos-rdo-j-keystone
config_dir:
schema: str!
value: /etc/solar/keystone


@ -31,7 +31,6 @@ from solar import state
from solar.core import actions
from solar.core import resource as sresource
from solar.core.resource import assign_resources_to_nodes
from solar.core.resource import connect_resources
from solar.core import signals
from solar.core.tags_set_parser import Expression
from solar.interfaces.db import get_db
@ -71,7 +70,9 @@ def assign(resources, nodes):
lambda r: Expression(resources, r.get('tags', [])).evaluate(),
_get_resources_list())
print("For {0} nodes assign {1} resources".format(len(nodes), len(resources)))
click.echo(
"For {0} nodes assign {1} resources".format(len(nodes), len(resources))
)
assign_resources_to_nodes(resources, nodes)
@ -129,7 +130,7 @@ def init_changes():
@changes.command()
def stage():
log = operations.stage_changes()
print log.show()
click.echo(log.show())
@changes.command()
@click.option('--one', is_flag=True, default=False)
@ -142,7 +143,7 @@ def init_changes():
@changes.command()
@click.option('--limit', default=5)
def history(limit):
print state.CL().show()
click.echo(state.CL().show())
@changes.command()
@click.option('--last', is_flag=True, default=False)
@ -150,11 +151,11 @@ def init_changes():
@click.option('--uid', default=None)
def rollback(last, all, uid):
if last:
print operations.rollback_last()
click.echo(operations.rollback_last())
elif all:
print operations.rollback_all()
click.echo(operations.rollback_all())
elif uid:
print operations.rollback_uid(uid)
click.echo(operations.rollback_uid(uid))
def init_cli_connect():
@ -163,11 +164,11 @@ def init_cli_connect():
@click.argument('receiver')
@click.option('--mapping', default=None)
def connect(mapping, receiver, emitter):
print 'Connect', emitter, receiver
click.echo('Connect {} to {}'.format(emitter, receiver))
emitter = sresource.load(emitter)
receiver = sresource.load(receiver)
print emitter
print receiver
click.echo(emitter)
click.echo(receiver)
if mapping is not None:
mapping = json.loads(mapping)
signals.connect(emitter, receiver, mapping=mapping)
@ -176,11 +177,11 @@ def init_cli_connect():
@click.argument('emitter')
@click.argument('receiver')
def disconnect(receiver, emitter):
print 'Disconnect', emitter, receiver
click.echo('Disconnect {} from {}'.format(emitter, receiver))
emitter = sresource.load(emitter)
receiver = sresource.load(receiver)
print emitter
print receiver
click.echo(emitter)
click.echo(receiver)
signals.disconnect(emitter, receiver)
@ -214,10 +215,10 @@ def init_cli_connections():
)
)
keys = sorted(signals.CLIENTS)
clients = signals.Connections.read_clients()
keys = sorted(clients)
for emitter_name in keys:
show_emitter_connections(emitter_name,
signals.CLIENTS[emitter_name])
show_emitter_connections(emitter_name, clients[emitter_name])
# TODO: this requires graphing libraries
@connections.command()
@ -244,7 +245,7 @@ def init_cli_deployment_config():
@main.command()
@click.argument('filepath')
def deploy(filepath):
print 'Deploying from file {}'.format(filepath)
click.echo('Deploying from file {}'.format(filepath))
xd.deploy(filepath)
@ -257,7 +258,9 @@ def init_cli_resource():
@click.argument('resource_path')
@click.argument('action_name')
def action(action_name, resource_path):
print 'action', resource_path, action_name
click.echo(
'action {} for resource {}'.format(action_name, resource_path)
)
r = sresource.load(resource_path)
actions.resource_action(r, action_name)
@ -266,7 +269,7 @@ def init_cli_resource():
@click.argument('base_path')
@click.argument('args')
def create(args, base_path, name):
print 'create', name, base_path, args
click.echo('create {} {} {}'.format(name, base_path, args))
args = json.loads(args)
sresource.create(name, base_path, args)
@ -304,7 +307,7 @@ def init_cli_resource():
@click.argument('tag_name')
@click.option('--add/--delete', default=True)
def tag(add, tag_name, resource_path):
print 'Tag', resource_path, tag_name, add
click.echo('Tag {} with {} {}'.format(resource_path, tag_name, add))
r = sresource.load(resource_path)
if add:
r.add_tag(tag_name)

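All CLI output above moves from print statements to click.echo, which is Python 3 friendly, respects redirected streams, and keeps the commands testable with click's runner. A standalone sketch of the pattern (command and argument names here are illustrative, not part of the solar CLI):

import click

@click.command()
@click.argument('emitter')
@click.argument('receiver')
def connect(emitter, receiver):
    # click.echo replaces the bare print statements removed in this diff.
    click.echo('Connect {} to {}'.format(emitter, receiver))

if __name__ == '__main__':
    connect()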

@ -5,6 +5,7 @@ import shutil
import yaml
from solar.core import db
from solar.core.log import log
from solar.core import resource as xr
from solar.core import signals as xs
@ -27,7 +28,7 @@ def deploy(filename):
name = resource_definition['name']
model = os.path.join(workdir, resource_definition['model'])
args = resource_definition.get('args', {})
print 'Creating ', name, model, resource_save_path, args
log.debug('Creating %s %s %s %s', name, model, resource_save_path, args)
xr.create(name, model, resource_save_path, args=args)
# Create resource connections
@ -35,11 +36,11 @@ def deploy(filename):
emitter = db.get_resource(connection['emitter'])
receiver = db.get_resource(connection['receiver'])
mapping = connection.get('mapping')
print 'Connecting ', emitter.name, receiver.name, mapping
log.debug('Connecting %s %s %s', emitter.name, receiver.name, mapping)
xs.connect(emitter, receiver, mapping=mapping)
# Run all tests
if 'test-suite' in config:
print 'Running tests from {}'.format(config['test-suite'])
log.debug('Running tests from %s', config['test-suite'])
test_suite = __import__(config['test-suite'], {}, {}, ['main'])
test_suite.main()


@ -2,23 +2,24 @@
import os
import subprocess
from solar.core.log import log
from solar.core.handlers.base import BaseHandler
from solar.state import STATES
class Ansible(BaseHandler):
def action(self, resource, action_name):
inventory_file = self._create_inventory(resource)
playbook_file = self._create_playbook(resource, action_name)
print 'inventory_file', inventory_file
print 'playbook_file', playbook_file
log.debug('inventory_file: %s', inventory_file)
log.debug('playbook_file: %s', playbook_file)
call_args = ['ansible-playbook', '--module-path', '/vagrant/library', '-i', inventory_file, playbook_file]
print 'EXECUTING: ', ' '.join(call_args)
log.debug('EXECUTING: %s', ' '.join(call_args))
try:
subprocess.check_output(call_args)
except subprocess.CalledProcessError as e:
print e.output
log.error(e.output)
log.exception(e)
raise
def _create_inventory(self, r):
@ -31,11 +32,8 @@ class Ansible(BaseHandler):
def _render_inventory(self, r):
inventory = '{0} ansible_ssh_host={1} ansible_connection=ssh ansible_ssh_user={2} ansible_ssh_private_key_file={3}'
host, user, ssh_key = r.args['ip'].value, r.args['ssh_user'].value, r.args['ssh_key'].value
print host
print user
print ssh_key
inventory = inventory.format(host, host, user, ssh_key)
print inventory
log.debug(inventory)
return inventory
def _create_playbook(self, resource, action):

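The handler now reports the generated inventory and playbook paths through the shared logger instead of printing them. For reference, a small sketch of the single-host inventory line that _render_inventory builds, with placeholder values standing in for r.args['ip'], r.args['ssh_user'] and r.args['ssh_key']:

# Placeholder host data; the real values come from the resource's args.
template = ('{0} ansible_ssh_host={1} ansible_connection=ssh '
            'ansible_ssh_user={2} ansible_ssh_private_key_file={3}')
host, user, ssh_key = '10.0.0.3', 'vagrant', '/path/to/private_key'
print(template.format(host, host, user, ssh_key))
# 10.0.0.3 ansible_ssh_host=10.0.0.3 ansible_connection=ssh ansible_ssh_user=vagrant ansible_ssh_private_key_file=/path/to/private_key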

@ -5,6 +5,8 @@ import tempfile
from jinja2 import Template
from solar.core.log import log
class BaseHandler(object):
def __init__(self, resources):
@ -19,7 +21,7 @@ class BaseHandler(object):
return self
def __exit__(self, type, value, traceback):
print self.dst
log.debug(self.dst)
return
shutil.rmtree(self.dst)
@ -33,10 +35,11 @@ class BaseHandler(object):
return dest_file
def _render_action(self, resource, action):
print 'Rendering %s %s' % (resource.name, action)
log.debug('Rendering %s %s', resource.name, action)
action_file = resource.metadata['actions'][action]
action_file = os.path.join(resource.metadata['actions_path'], action_file)
log.debug('action file: %s', action_file)
args = self._make_args(resource)
with open(action_file) as f:

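_render_action looks up the action file named in the resource metadata, logs which file it picked, and renders it with Jinja2 using the args built by _make_args. A stripped-down sketch of that render step with a hypothetical template string and args dict:

from jinja2 import Template

# Hypothetical action snippet and args; the real ones come from the
# resource metadata and _make_args(resource).
action_template = 'wait_for: host={{ ip }} port={{ port }} timeout=20'
args = {'ip': '10.0.0.3', 'port': 9292}

print(Template(action_template).render(**args))
# wait_for: host=10.0.0.3 port=9292 timeout=20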
solar/solar/core/log.py

@ -0,0 +1,22 @@
import logging
import sys
log = logging.getLogger('solar')
def setup_logger():
handler = logging.FileHandler('solar.log')
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s (%(filename)s::%(lineno)s)::%(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)
print_formatter = logging.Formatter('%(levelname)s (%(filename)s::%(lineno)s)::%(message)s')
print_handler = logging.StreamHandler(stream=sys.stdout)
print_handler.setFormatter(print_formatter)
log.addHandler(print_handler)
log.setLevel(logging.DEBUG)
setup_logger()

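The new solar.core.log module configures one logger that the rest of the code imports: DEBUG-and-up records go to solar.log with a verbose format, and the same records are mirrored to stdout with a shorter one. A minimal usage sketch, assuming the solar package is importable:

from solar.core.log import log

# Both handlers were attached by setup_logger() at import time, so this line
# lands in solar.log and on stdout with their respective formats.
log.debug('Creating %s from %s', 'keystone_config1', 'resources/keystone_config/')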

@ -1,3 +1,4 @@
from solar.core.log import log
from solar.core import signals
from solar.interfaces.db import get_db
@ -14,27 +15,28 @@ class BaseObserver(object):
:param value:
:return:
"""
self.attached_to = attached_to
self._attached_to_name = attached_to.name
self.name = name
self.value = value
self.receivers = []
# @property
# def receivers(self):
# from solar.core import resource
#
# signals.CLIENTS = signals.Connections.read_clients()
# for receiver_name, receiver_input in signals.Connections.receivers(
# self.attached_to.name,
# self.name
# ):
# yield resource.load(receiver_name).args[receiver_input]
def log(self, msg):
print '{} {}'.format(self, msg)
@property
def attached_to(self):
from solar.core import resource
return resource.load(self._attached_to_name)
@property
def receivers(self):
from solar.core import resource
for receiver_name, receiver_input in signals.Connections.receivers(
self._attached_to_name,
self.name
):
yield resource.load(receiver_name).args[receiver_input]
def __repr__(self):
return '[{}:{}] {}'.format(self.attached_to.name, self.name, self.value)
return '[{}:{}] {}'.format(self._attached_to_name, self.name, self.value)
def __unicode__(self):
return unicode(self.value)
@ -61,7 +63,7 @@ class BaseObserver(object):
def find_receiver(self, receiver):
fltr = [r for r in self.receivers
if r.attached_to == receiver.attached_to
if r._attached_to_name == receiver._attached_to_name
and r.name == receiver.name]
if fltr:
return fltr[0]
@ -71,12 +73,11 @@ class BaseObserver(object):
:param receiver: Observer
:return:
"""
self.log('Subscribe {}'.format(receiver))
log.debug('Subscribe %s', receiver)
# No multiple subscriptions
if self.find_receiver(receiver):
self.log('No multiple subscriptions from {}'.format(receiver))
log.error('No multiple subscriptions from %s', receiver)
return
self.receivers.append(receiver)
receiver.subscribed(self)
signals.Connections.add(
@ -89,16 +90,15 @@ class BaseObserver(object):
receiver.notify(self)
def subscribed(self, emitter):
self.log('Subscribed {}'.format(emitter))
log.debug('Subscribed %s', emitter)
def unsubscribe(self, receiver):
"""
:param receiver: Observer
:return:
"""
self.log('Unsubscribe {}'.format(receiver))
log.debug('Unsubscribe %s', receiver)
if self.find_receiver(receiver):
self.receivers.remove(receiver)
receiver.unsubscribed(self)
signals.Connections.remove(
@ -112,41 +112,42 @@ class BaseObserver(object):
#receiver.notify(self)
def unsubscribed(self, emitter):
self.log('Unsubscribed {}'.format(emitter))
log.debug('Unsubscribed %s', emitter)
class Observer(BaseObserver):
type_ = 'simple'
def __init__(self, *args, **kwargs):
super(Observer, self).__init__(*args, **kwargs)
self.emitter = None
@property
def emitter(self):
from solar.core import resource
emitter = signals.Connections.emitter(self._attached_to_name, self.name)
if emitter is not None:
emitter_name, emitter_input_name = emitter
return resource.load(emitter_name).args[emitter_input_name]
def notify(self, emitter):
self.log('Notify from {} value {}'.format(emitter, emitter.value))
log.debug('Notify from %s value %s', emitter, emitter.value)
# Copy emitter's values to receiver
self.value = emitter.value
for receiver in self.receivers:
receiver.notify(self)
self.attached_to.save()
self.attached_to.set_args_from_dict({self.name: self.value})
def update(self, value):
self.log('Updating to value {}'.format(value))
log.debug('Updating to value %s', value)
self.value = value
for receiver in self.receivers:
receiver.notify(self)
self.attached_to.save()
self.attached_to.set_args_from_dict({self.name: self.value})
def subscribed(self, emitter):
super(Observer, self).subscribed(emitter)
# Simple observer can be attached to at most one emitter
if self.emitter is not None:
self.emitter.unsubscribe(self)
self.emitter = emitter
def unsubscribed(self, emitter):
super(Observer, self).unsubscribed(emitter)
self.emitter = None
class ListObserver(BaseObserver):
@ -159,41 +160,42 @@ class ListObserver(BaseObserver):
def _format_value(emitter):
return {
'emitter': emitter.name,
'emitter_attached_to': emitter.attached_to.name,
'emitter_attached_to': emitter._attached_to_name,
'value': emitter.value,
}
def notify(self, emitter):
self.log('Notify from {} value {}'.format(emitter, emitter.value))
log.debug('Notify from %s value %s', emitter, emitter.value)
# Copy emitter's values to receiver
#self.value[emitter.attached_to.name] = emitter.value
idx = self._emitter_idx(emitter)
self.value[idx] = self._format_value(emitter)
for receiver in self.receivers:
receiver.notify(self)
self.attached_to.save()
self.attached_to.set_args_from_dict({self.name: self.value})
def subscribed(self, emitter):
super(ListObserver, self).subscribed(emitter)
idx = self._emitter_idx(emitter)
if idx is None:
self.value.append(self._format_value(emitter))
self.attached_to.set_args_from_dict({self.name: self.value})
def unsubscribed(self, emitter):
"""
:param receiver: Observer
:return:
"""
self.log('Unsubscribed emitter {}'.format(emitter))
log.debug('Unsubscribed emitter %s', emitter)
idx = self._emitter_idx(emitter)
self.value.pop(idx)
self.attached_to.set_args_from_dict({self.name: self.value})
for receiver in self.receivers:
receiver.notify(self)
def _emitter_idx(self, emitter):
try:
return [i for i, e in enumerate(self.value)
if e['emitter_attached_to'] == emitter.attached_to.name
if e['emitter_attached_to'] == emitter._attached_to_name
][0]
except IndexError:
return

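With this change an observer keeps only the name of the resource it is attached to; the resource object, its emitter and its receivers are all resolved on demand from Connections, and value changes are persisted with attached_to.set_args_from_dict() rather than a separate save(). For list inputs, each subscribed emitter contributes one entry of the shape built by _format_value, for example (names and values are illustrative):

entry = {
    'emitter': 'ip',                # name of the emitting input
    'emitter_attached_to': 'res1',  # resource that owns that input
    'value': '10.10.0.2',           # current value copied from the emitter
}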

@ -4,8 +4,6 @@ import os
from copy import deepcopy
import yaml
import solar
from solar.core import actions
@ -24,25 +22,57 @@ class Resource(object):
def __init__(self, name, metadata, args, tags=None):
self.name = name
self.metadata = metadata
self.actions = metadata.get('actions', {}).keys() or None
self.args = {}
self.set_args(args)
self.changed = []
self.tags = tags or []
self.set_args_from_dict(args)
def set_args(self, args):
for arg_name, arg_value in args.items():
if not self.metadata['input'].get(arg_name):
continue
metadata_arg = self.metadata['input'][arg_name]
value = arg_value
if not value and metadata_arg['value']:
value = metadata_arg['value']
self.args[arg_name] = observer.create(type_, self, arg_name, value)
@property
def actions(self):
return self.metadata.get('actions') or []
@property
def args(self):
ret = {}
args = self.args_dict()
for arg_name, metadata_arg in self.metadata['input'].items():
type_ = validation.schema_input_type(metadata_arg.get('schema', 'str'))
ret[arg_name] = observer.create(
type_, self, arg_name, args.get(arg_name)
)
return ret
def args_dict(self):
raw_resource = db.read(self.name, collection=db.COLLECTIONS.resource)
if raw_resource is None:
return {}
self.metadata = raw_resource
args = self.metadata['input']
return {k: v['value'] for k, v in args.items()}
def set_args_from_dict(self, new_args):
args = self.args_dict()
args.update(new_args)
self.metadata['tags'] = self.tags
for k, v in args.items():
if k not in self.metadata['input']:
raise NotImplementedError(
'Argument {} not implemented for resource {}'.format(k, self)
)
self.metadata['input'][k]['value'] = v
db.save(self.name, self.metadata, collection=db.COLLECTIONS.resource)
def set_args(self, args):
self.set_args_from_dict({k: v.value for k, v in args.items()})
def __repr__(self):
return ("Resource(name='{id}', metadata={metadata}, args={input}, "
@ -85,9 +115,6 @@ class Resource(object):
return {k: formatter(v) for k, v in self.args.items()}
def args_dict(self):
return {k: v.value for k, v in self.args.items()}
def add_tag(self, tag):
if tag not in self.tags:
self.tags.append(tag)
@ -104,8 +131,10 @@ class Resource(object):
:param emitter: Resource
:return:
"""
r_args = self.args
for key, value in emitter.args.iteritems():
self.args[key].notify(value)
r_args[key].notify(value)
def update(self, args):
"""This method updates resource's args with a simple dict.
@ -116,8 +145,12 @@ class Resource(object):
# Update will be blocked if this resource is listening
# on some input that is to be updated -- we should only listen
# to the emitter and not be able to change the input's value
r_args = self.args
for key, value in args.iteritems():
self.args[key].update(value)
r_args[key].update(value)
self.set_args(r_args)
def action(self, action):
if action in self.actions:
@ -125,16 +158,6 @@ class Resource(object):
else:
raise Exception('Uuups, action is not available')
# TODO: versioning
def save(self):
metadata = copy.deepcopy(self.metadata)
metadata['tags'] = self.tags
for k, v in self.args_dict().items():
metadata['input'][k]['value'] = v
db.save(self.name, metadata, collection=db.COLLECTIONS.resource)
def create(name, base_path, args, tags=[], connections={}):
if not os.path.exists(base_path):
@ -156,7 +179,6 @@ def create(name, base_path, args, tags=[], connections={}):
resource = Resource(name, meta, args, tags=tags)
signals.assign_connections(resource, connections)
resource.save()
return resource
@ -173,7 +195,7 @@ def load(resource_name):
raw_resource = db.read(resource_name, collection=db.COLLECTIONS.resource)
if raw_resource is None:
raise NotImplementedError(
raise KeyError(
'Resource {} does not exist'.format(resource_name)
)
@ -187,8 +209,6 @@ def load_all():
resource = wrap_resource(raw_resource)
ret[resource.name] = resource
signals.Connections.reconnect_all()
return ret

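Resource.args is now a property rebuilt from the stored document on every access, and writes go through set_args_from_dict, so the old explicit save() step disappears. A sketch of the resulting round trip, assuming a resource definition exists at the path used in the examples above and the DB is reachable:

from solar.core import resource

r = resource.create('keystone_config1', 'resources/keystone_config/',
                    {'config_dir': '/etc/solar/keystone', 'admin_token': 'admin'})

# args is reconstructed from the DB each time it is read ...
print(r.args['admin_token'].value)           # admin

# ... and updates are written straight back through set_args_from_dict.
r.set_args_from_dict({'admin_token': 'secret'})
print(resource.load('keystone_config1').args['admin_token'].value)  # secret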

@ -1,42 +1,35 @@
# -*- coding: utf-8 -*-
import atexit
from collections import defaultdict
import itertools
import networkx as nx
import os
from solar import utils
from solar.core.log import log
from solar.interfaces.db import get_db
db = get_db()
CLIENTS_CONFIG_KEY = 'clients-data-file'
#CLIENTS = utils.read_config_file(CLIENTS_CONFIG_KEY)
CLIENTS = {}
class Connections(object):
"""
CLIENTS structure is:
emitter_name:
emitter_input_name:
- - dst_name
- dst_input_name
while DB structure is:
emitter_name_key:
emitter: emitter_name
sources:
emitter_input_name:
- - dst_name
- dst_input_name
"""
@staticmethod
def read_clients():
"""
Returned structure is:
emitter_name:
emitter_input_name:
- - dst_name
- dst_input_name
while DB structure is:
emitter_name_key:
emitter: emitter_name
sources:
emitter_input_name:
- - dst_name
- dst_input_name
"""
ret = {}
for data in db.get_list(collection=db.COLLECTIONS.connection):
@ -45,80 +38,64 @@ class Connections(object):
return ret
@staticmethod
def save_clients():
for emitter_name, sources in CLIENTS.items():
data = {
'emitter': emitter_name,
'sources': sources,
}
db.save(emitter_name, data, collection=db.COLLECTIONS.connection)
def save_clients(clients):
data = []
for emitter_name, sources in clients.items():
data.append((
emitter_name,
{
'emitter': emitter_name,
'sources': sources,
}))
db.save_list(data, collection=db.COLLECTIONS.connection)
@staticmethod
def add(emitter, src, receiver, dst):
if src not in emitter.args:
return
clients = Connections.read_clients()
# TODO: implement general circular detection, this one is simple
if [emitter.name, src] in CLIENTS.get(receiver.name, {}).get(dst, []):
if [emitter.name, src] in clients.get(receiver.name, {}).get(dst, []):
raise Exception('Attempted to create cycle in dependencies. Not nice.')
CLIENTS.setdefault(emitter.name, {})
CLIENTS[emitter.name].setdefault(src, [])
if [receiver.name, dst] not in CLIENTS[emitter.name][src]:
CLIENTS[emitter.name][src].append([receiver.name, dst])
clients.setdefault(emitter.name, {})
clients[emitter.name].setdefault(src, [])
if [receiver.name, dst] not in clients[emitter.name][src]:
clients[emitter.name][src].append([receiver.name, dst])
#utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
Connections.save_clients()
Connections.save_clients(clients)
@staticmethod
def remove(emitter, src, receiver, dst):
CLIENTS[emitter.name][src] = [
destination for destination in CLIENTS[emitter.name][src]
clients = Connections.read_clients()
clients[emitter.name][src] = [
destination for destination in clients[emitter.name][src]
if destination != [receiver.name, dst]
]
#utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
Connections.save_clients()
@staticmethod
def reconnect_all():
"""Reconstruct connections for resource inputs from CLIENTS.
:return:
"""
from solar.core.resource import wrap_resource
for emitter_name, dest_dict in CLIENTS.items():
emitter = wrap_resource(
db.read(emitter_name, collection=db.COLLECTIONS.resource)
)
for emitter_input, destinations in dest_dict.items():
for receiver_name, receiver_input in destinations:
receiver = wrap_resource(
db.read(receiver_name, collection=db.COLLECTIONS.resource)
)
emitter.args[emitter_input].subscribe(
receiver.args[receiver_input])
Connections.save_clients(clients)
@staticmethod
def receivers(emitter_name, emitter_input_name):
return CLIENTS.get(emitter_name, {}).get(emitter_input_name, [])
return Connections.read_clients().get(emitter_name, {}).get(
emitter_input_name, []
)
@staticmethod
def emitter(receiver_name, receiver_input_name):
for emitter_name, dest_dict in Connections.read_clients().items():
for emitter_input_name, destinations in dest_dict.items():
if [receiver_name, receiver_input_name] in destinations:
return [emitter_name, emitter_input_name]
@staticmethod
def clear():
global CLIENTS
CLIENTS = {}
@staticmethod
def flush():
print 'FLUSHING Connections'
#utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
Connections.save_clients()
CLIENTS = Connections.read_clients()
#atexit.register(Connections.flush)
db.clear_collection(collection=db.COLLECTIONS.connection)
def guess_mapping(emitter, receiver):
@ -158,20 +135,24 @@ def connect(emitter, receiver, mapping=None):
emitter.args[src].subscribe(receiver.args[dst])
receiver.save()
#receiver.save()
def disconnect(emitter, receiver):
for src, destinations in CLIENTS[emitter.name].items():
disconnect_by_src(emitter.name, src, receiver)
clients = Connections.read_clients()
for src, destinations in clients[emitter.name].items():
for destination in destinations:
receiver_input = destination[1]
if receiver_input in receiver.args:
if receiver.args[receiver_input].type_ != 'list':
print 'Removing input {} from {}'.format(receiver_input, receiver.name)
log.debug(
'Removing input %s from %s', receiver_input, receiver.name
)
emitter.args[src].unsubscribe(receiver.args[receiver_input])
disconnect_by_src(emitter.name, src, receiver)
def disconnect_receiver_by_input(receiver, input):
"""Find receiver connection by input and disconnect it.
@ -180,36 +161,42 @@ def disconnect_receiver_by_input(receiver, input):
:param input:
:return:
"""
for emitter_name, inputs in CLIENTS.items():
emitter = db.read(emitter_name, collection=db.COLLECTIONS.resource)
disconnect_by_src(emitter['id'], input, receiver)
clients = Connections.read_clients()
for emitter_name, inputs in clients.items():
disconnect_by_src(emitter_name, input, receiver)
def disconnect_by_src(emitter_name, src, receiver):
if src in CLIENTS[emitter_name]:
CLIENTS[emitter_name][src] = [
destination for destination in CLIENTS[emitter_name][src]
clients = Connections.read_clients()
if src in clients[emitter_name]:
clients[emitter_name][src] = [
destination for destination in clients[emitter_name][src]
if destination[0] != receiver.name
]
#utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
Connections.save_clients(clients)
def notify(source, key, value):
from solar.core.resource import wrap_resource
from solar.core.resource import load
CLIENTS.setdefault(source.name, {})
print 'Notify', source.name, key, value, CLIENTS[source.name]
if key in CLIENTS[source.name]:
for client, r_key in CLIENTS[source.name][key]:
resource = wrap_resource(
db.read(client, collection=db.COLLECTIONS.resource)
)
print 'Resource found', client
clients = Connections.read_clients()
if source.name not in clients:
clients[source.name] = {}
Connections.save_clients(clients)
log.debug('Notify %s %s %s %s', source.name, key, value, clients[source.name])
if key in clients[source.name]:
for client, r_key in clients[source.name][key]:
resource = load(client)
log.debug('Resource found: %s', client)
if resource:
resource.update({r_key: value}, emitter=source)
else:
print 'Resource {} deleted?'.format(client)
log.debug('Resource %s deleted?', client)
pass
@ -225,7 +212,9 @@ def assign_connections(receiver, connections):
def connection_graph():
resource_dependencies = {}
for emitter_name, destination_values in CLIENTS.items():
clients = Connections.read_clients()
for emitter_name, destination_values in clients.items():
resource_dependencies.setdefault(emitter_name, set())
for emitter_input, receivers in destination_values.items():
resource_dependencies[emitter_name].update(
@ -251,7 +240,9 @@ def connection_graph():
def detailed_connection_graph(start_with=None, end_with=None):
g = nx.MultiDiGraph()
for emitter_name, destination_values in CLIENTS.items():
clients = Connections.read_clients()
for emitter_name, destination_values in clients.items():
for emitter_input, receivers in destination_values.items():
for receiver_name, receiver_input in receivers:
label = '{}:{}'.format(emitter_input, receiver_input)

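This file carries the core of the change: the module-level CLIENTS dict is gone and every helper reads and writes the connection map through the DB via Connections.read_clients() and save_clients(clients). The two shapes involved look roughly like this (resource and input names are illustrative):

# In-memory mapping returned by Connections.read_clients():
clients = {
    'keystone_config1': {                            # emitter resource
        'admin_token': [                             # emitter input
            ['glance_api_endpoint', 'admin_token'],  # [receiver, receiver input]
        ],
    },
}

# Per-emitter document written to the connection collection by save_clients():
stored = {
    'emitter': 'keystone_config1',
    'sources': clients['keystone_config1'],
}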

@ -1,4 +1,6 @@
from jsonschema import validate, ValidationError, SchemaError
from jsonschema import validate, ValidationError
from solar.core.log import log
def schema_input_type(schema):
@ -86,9 +88,10 @@ def validate_input(value, jsonschema=None, schema=None):
validate(value, jsonschema)
except ValidationError as e:
return [e.message]
except:
print 'jsonschema', jsonschema
print 'value', value
except Exception as e:
log.error('jsonschema: %s', jsonschema)
log.error('value: %s', value)
log.exception(e)
raise

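Validation failures are now reported through the shared logger rather than printed. A standalone sketch of the underlying jsonschema call with a toy schema (the real schemas come from the resource metadata):

from jsonschema import validate, ValidationError

schema = {'type': 'integer'}
try:
    validate('not-a-number', schema)
except ValidationError as e:
    # validate_input() returns [e.message] to its caller instead of raising.
    print(e.message)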

@ -31,26 +31,55 @@ class RedisDB(object):
except TypeError:
return None
def get_list(self, collection=COLLECTIONS.resource):
key_glob = self._make_key(collection, '*')
keys = self._r.keys(key_glob)
with self._r.pipeline() as pipe:
pipe.multi()
values = [self._r.get(key) for key in keys]
pipe.execute()
for value in values:
yield json.loads(value)
def save(self, uid, data, collection=COLLECTIONS.resource):
return self._r.set(
ret = self._r.set(
self._make_key(collection, uid),
json.dumps(data)
)
return ret
def delete(self, uid, collection):
return self._r.delete(self._make_key(collection, uid))
def get_list(self, collection=COLLECTIONS.resource):
key_glob = self._make_key(collection, '*')
for key in self._r.keys(key_glob):
yield json.loads(self._r.get(key))
def save_list(self, lst, collection=COLLECTIONS.resource):
with self._r.pipeline() as pipe:
pipe.multi()
for uid, data in lst:
key = self._make_key(collection, uid)
pipe.set(key, json.dumps(data))
pipe.execute()
def clear(self):
self._r.flushdb()
def clear_collection(self, collection=COLLECTIONS.resource):
key_glob = self._make_key(collection, '*')
self._r.delete(self._r.keys(key_glob))
def delete(self, uid, collection=COLLECTIONS.resource):
self._r.delete(self._make_key(collection, uid))
def _make_key(self, collection, _id):
return '{0}:{1}'.format(collection.name, _id)
if isinstance(collection, self.COLLECTIONS):
collection = collection.name
return '{0}:{1}'.format(collection, _id)
class FakeRedisDB(RedisDB):

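save_list batches its writes through a Redis pipeline so that persisting a whole set of connections costs one round trip instead of one per key, and get_list gathers values the same way. A small redis-py sketch of the same idea, assuming a Redis server on localhost and illustrative keys following the collection:uid scheme of _make_key:

import json
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

# Queue several SETs and send them in one round trip, as RedisDB.save_list does.
with r.pipeline() as pipe:
    pipe.multi()
    for uid, data in [('node1', {'ip': '10.0.0.3'}), ('node2', {'ip': '10.0.0.4'})]:
        pipe.set('resource:{0}'.format(uid), json.dumps(data))
    pipe.execute()

print(json.loads(r.get('resource:node1')))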

@ -1,6 +1,7 @@
from solar import state
from solar.core.log import log
from solar.core import signals
from solar.core import resource
from solar import utils
@ -67,7 +68,7 @@ def _stage_changes(staged_resources, conn_graph,
srt = nx.topological_sort(conn_graph)
except:
for cycle in nx.simple_cycles(conn_graph):
print 'CYCLE: %s' % cycle
log.debug('CYCLE: %s', cycle)
raise
for res_uid in srt:
@ -88,7 +89,6 @@ def _stage_changes(staged_resources, conn_graph,
def stage_changes():
resources = resource.load_all()
conn_graph = signals.detailed_connection_graph()
staged = {r.name: to_dict(r, conn_graph) for r in resource.load_all().values()}
commited = state.CD()
@ -176,8 +176,7 @@ def rollback(log_item):
log.append(log_item)
res = resource.load(log_item.res)
res.set_args(staged['input'])
res.save()
res.set_args_from_dict(staged['input'])
return log_item


@ -1,10 +1,7 @@
import os
from pytest import fixture
from solar.interfaces import db
from solar import utils
def pytest_configure():


@ -1,7 +1,6 @@
from pytest import fixture
import mock
from dictdiffer import revert, patch, diff
from dictdiffer import revert, patch
import networkx as nx
from solar import operations


@ -0,0 +1,67 @@
import unittest
import base
from solar.core import resource
from solar.core import signals
class TestResource(base.BaseResourceTest):
def test_resource_args(self):
sample_meta_dir = self.make_resource_meta("""
id: sample
handler: ansible
version: 1.0.0
input:
value:
schema: int
value: 0
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'value': 1}
)
self.assertEqual(sample1.args['value'].value, 1)
# test default value
sample2 = self.create_resource('sample2', sample_meta_dir, {})
self.assertEqual(sample2.args['value'].value, 0)
def test_connections_recreated_after_load(self):
"""
Create a resource in one process, then load it in another process.
All connections should remain the same.
"""
sample_meta_dir = self.make_resource_meta("""
id: sample
handler: ansible
version: 1.0.0
input:
value:
schema: int
value: 0
""")
def creating_process():
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'value': 1}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, {}
)
signals.connect(sample1, sample2)
self.assertEqual(sample1.args['value'], sample2.args['value'])
creating_process()
signals.CLIENTS = {}
sample1 = resource.load('sample1')
sample2 = resource.load('sample2')
sample1.update({'value': 2})
self.assertEqual(sample1.args['value'], sample2.args['value'])
if __name__ == '__main__':
unittest.main()


@ -26,7 +26,7 @@ input:
xs.connect(sample1, sample2)
self.assertEqual(
sample1.args['values'],
sample2.args['values'],
sample2.args['values']
)
self.assertEqual(
sample2.args['values'].emitter,
@ -135,7 +135,7 @@ input:
xs.connect(sample1, sample)
self.assertEqual(sample1.args['ip'], sample.args['ip'])
self.assertEqual(len(sample1.args['ip'].receivers), 1)
self.assertEqual(len(list(sample1.args['ip'].receivers)), 1)
self.assertEqual(
sample.args['ip'].emitter,
sample1.args['ip']
@ -144,7 +144,7 @@ input:
xs.connect(sample2, sample)
self.assertEqual(sample2.args['ip'], sample.args['ip'])
# sample should be unsubscribed from sample1 and subscribed to sample2
self.assertEqual(len(sample1.args['ip'].receivers), 0)
self.assertEqual(len(list(sample1.args['ip'].receivers)), 0)
self.assertEqual(sample.args['ip'].emitter, sample2.args['ip'])
sample2.update({'ip': '10.0.0.3'})
@ -387,10 +387,10 @@ input:
'sample2', sample_meta_dir, {'ip': '10.0.0.2', 'port': '1001'}
)
list_input = self.create_resource(
'list-input', list_input_meta_dir, {'ips': [], 'ports': []}
'list-input', list_input_meta_dir, {}
)
list_input_nested = self.create_resource(
'list-input-nested', list_input_nested_meta_dir, {'ipss': [], 'portss': []}
'list-input-nested', list_input_nested_meta_dir, {}
)
xs.connect(sample1, list_input, mapping={'ip': 'ips', 'port': 'ports'})


@ -1,6 +1,5 @@
import pytest
from mock import patch
from solar.core import resource
from solar import operations
@ -15,19 +14,16 @@ def default_resources():
node1 = resource.wrap_resource(
{'id': 'node1',
'input': {'ip': {'value':'10.0.0.3'}}})
node1.save()
rabbitmq_service1 = resource.wrap_resource(
{'id':'rabbitmq', 'input': {
'ip' : {'value': ''},
'image': {'value': 'rabbitmq:3-management'}}})
rabbitmq_service1.save()
signals.connect(node1, rabbitmq_service1)
return resource.load_all()
@patch('solar.core.actions.resource_action')
@pytest.mark.usefixtures("default_resources")
def test_changes_on_update_image(maction):
def test_changes_on_update_image():
log = operations.stage_changes()
assert len(log) == 2


@ -1,37 +1,31 @@
import pytest
from mock import patch
from solar.core import signals
from solar.core import resource
from solar import operations
from solar import state
@pytest.fixture
def resources():
node1 = resource.wrap_resource(
{'id': 'node1',
'input': {'ip': {'value':'10.0.0.3'}}})
node1.save()
'input': {'ip': {'value': '10.0.0.3'}}})
mariadb_service1 = resource.wrap_resource(
{'id':'mariadb', 'input': {
{'id': 'mariadb', 'input': {
'port' : {'value': 3306},
'ip': {'value': ''}}})
mariadb_service1.save()
keystone_db = resource.wrap_resource(
{'id':'keystone_db',
'input': {
'login_port' : {'value': ''},
'ip': {'value': ''}}})
keystone_db.save()
signals.connect(node1, mariadb_service1)
signals.connect(node1, keystone_db)
signals.connect(mariadb_service1, keystone_db, {'port': 'login_port'})
return resource.load_all()
@patch('solar.core.actions.resource_action')
def test_update_port_on_mariadb(maction, resources):
def test_update_port_on_mariadb(resources):
operations.stage_changes()
operations.commit_changes()
@ -60,29 +54,25 @@ def test_update_port_on_mariadb(maction, resources):
def list_input():
res1 = resource.wrap_resource(
{'id': 'res1', 'input': {'ip': {'value': '10.10.0.2'}}})
res1.save()
res2 = resource.wrap_resource(
{'id': 'res2', 'input': {'ip': {'value': '10.10.0.3'}}})
res2.save()
consumer = resource.wrap_resource(
{'id': 'consumer', 'input':
{'ips': {'value': [],
'schema': ['str']}}})
consumer.save()
signals.connect(res1, consumer, {'ip': 'ips'})
signals.connect(res2, consumer, {'ip': 'ips'})
return resource.load_all()
@patch('solar.core.actions.resource_action')
def test_update_list_resource(maction, list_input):
@pytest.mark.xfail
def test_update_list_resource(list_input):
operations.stage_changes()
operations.commit_changes()
res3 = resource.wrap_resource(
{'id': 'res3', 'input': {'ip': {'value': '10.10.0.4'}}})
res3.save()
signals.connect(res3, list_input['consumer'], {'ip': 'ips'})
log = operations.stage_changes()
@ -102,7 +92,7 @@ def test_update_list_resource(maction, list_input):
u'ips': [
{u'emitter_attached_to': u'res1', u'emitter': u'ip', u'value': u'10.10.0.2'},
{u'emitter_attached_to': u'res2', u'emitter': u'ip', u'value': u'10.10.0.3'},
{'emitter_attached_to': 'res3', 'emitter': 'ip', 'value': '10.10.0.4'}]}
{u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'}]}
log_item = operations.rollback_last()
assert log_item.diff == [