This commit is contained in:
Stan Lagun 2013-03-27 11:40:37 +04:00
parent dfdd144f96
commit d57113ca48
12 changed files with 33 additions and 37 deletions

View File

@ -16,7 +16,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os
import sys import sys

View File

@ -62,7 +62,6 @@ def task_received(task, message_id):
message_id, anyjson.dumps(task))) message_id, anyjson.dumps(task)))
class ConductorWorkflowService(service.Service): class ConductorWorkflowService(service.Service):
def __init__(self): def __init__(self):
super(ConductorWorkflowService, self).__init__() super(ConductorWorkflowService, self).__init__()

View File

@ -6,6 +6,7 @@ from random import choice
import time import time
import string import string
def update_cf_stack(engine, context, body, template, def update_cf_stack(engine, context, body, template,
mappings, arguments, **kwargs): mappings, arguments, **kwargs):
command_dispatcher = context['/commandDispatcher'] command_dispatcher = context['/commandDispatcher']
@ -17,6 +18,7 @@ def update_cf_stack(engine, context, body, template,
name='cf', command='CreateOrUpdate', template=template, name='cf', command='CreateOrUpdate', template=template,
mappings=mappings, arguments=arguments, callback=callback) mappings=mappings, arguments=arguments, callback=callback)
def delete_cf_stack(engine, context, body, **kwargs): def delete_cf_stack(engine, context, body, **kwargs):
command_dispatcher = context['/commandDispatcher'] command_dispatcher = context['/commandDispatcher']
@ -31,8 +33,8 @@ def prepare_user_data(context, hostname, service, unit, template='Default', **kw
settings = config.CONF.rabbitmq settings = config.CONF.rabbitmq
with open('data/init.ps1') as init_script_file: with open('data/init.ps1') as init_script_file:
with open('data/templates/agent-config/%s.template' with open('data/templates/agent-config/{0}.template'.format(
% template) as template_file: template)) as template_file:
init_script = init_script_file.read() init_script = init_script_file.read()
template_data = template_file.read() template_data = template_file.read()
template_data = template_data.replace( template_data = template_data.replace(
@ -44,8 +46,8 @@ def prepare_user_data(context, hostname, service, unit, template='Default', **kw
) )
template_data = template_data.replace( template_data = template_data.replace(
'%RESULT_QUEUE%', '%RESULT_QUEUE%',
'-execution-results-%s' % '-execution-results-{0}'.format(
str(context['/dataSource']['id']).lower()) str(context['/dataSource']['id'])).lower())
init_script = init_script.replace( init_script = init_script.replace(
'%WINDOWS_AGENT_CONFIG_BASE64%', '%WINDOWS_AGENT_CONFIG_BASE64%',
@ -57,6 +59,7 @@ def prepare_user_data(context, hostname, service, unit, template='Default', **kw
counter = 0 counter = 0
def int2base(x, base): def int2base(x, base):
digs = string.digits + string.lowercase digs = string.digits + string.lowercase
if x < 0: sign = -1 if x < 0: sign = -1
@ -72,6 +75,7 @@ def int2base(x, base):
digits.reverse() digits.reverse()
return ''.join(digits) return ''.join(digits)
def generate_hostname(**kwargs): def generate_hostname(**kwargs):
global counter global counter
prefix = ''.join(choice(string.lowercase) for _ in range(5)) prefix = ''.join(choice(string.lowercase) for _ in range(5))
@ -81,8 +85,6 @@ def generate_hostname(**kwargs):
return prefix + timestamp + suffix return prefix + timestamp + suffix
xml_code_engine.XmlCodeEngine.register_function( xml_code_engine.XmlCodeEngine.register_function(
update_cf_stack, "update-cf-stack") update_cf_stack, "update-cf-stack")

View File

@ -19,7 +19,7 @@ class HeatExecutor(CommandBase):
self._stack = 'e' + stack self._stack = 'e' + stack
settings = conductor.config.CONF.heat settings = conductor.config.CONF.heat
self._heat_client = Client('1', settings.url, self._heat_client = Client('1', settings.url,
token_only=True, token=token) token_only=True, token=token)
def execute(self, command, callback, **kwargs): def execute(self, command, callback, **kwargs):
log.debug('Got command {0} on stack {1}'.format(command, self._stack)) log.debug('Got command {0} on stack {1}'.format(command, self._stack))
@ -33,7 +33,6 @@ class HeatExecutor(CommandBase):
elif command == 'Delete': elif command == 'Delete':
return self._execute_delete(callback) return self._execute_delete(callback)
def _execute_create_update(self, template, mappings, arguments, callback): def _execute_create_update(self, template, mappings, arguments, callback):
with open('data/templates/cf/%s.template' % template) as template_file: with open('data/templates/cf/%s.template' % template) as template_file:
template_data = template_file.read() template_data = template_file.read()
@ -52,10 +51,9 @@ class HeatExecutor(CommandBase):
'callback': callback 'callback': callback
}) })
def has_pending_commands(self): def has_pending_commands(self):
return len(self._update_pending_list) + \ return len(self._update_pending_list) + \
len(self._delete_pending_list) > 0 len(self._delete_pending_list) > 0
def execute_pending(self): def execute_pending(self):
r1 = self._execute_pending_updates() r1 = self._execute_pending_updates()
@ -74,7 +72,8 @@ class HeatExecutor(CommandBase):
arguments = conductor.helpers.merge_dicts( arguments = conductor.helpers.merge_dicts(
arguments, t['arguments'], max_levels=1) arguments, t['arguments'], max_levels=1)
log.info('Executing heat template {0} with arguments {1} on stack {2}' log.info(
'Executing heat template {0} with arguments {1} on stack {2}'
.format(anyjson.dumps(template), arguments, self._stack)) .format(anyjson.dumps(template), arguments, self._stack))
try: try:
@ -82,8 +81,8 @@ class HeatExecutor(CommandBase):
stack_id=self._stack, stack_id=self._stack,
parameters=arguments, parameters=arguments,
template=template) template=template)
log.debug('Waiting for the stack {0} to be update' log.debug(
.format(self._stack)) 'Waiting for the stack {0} to be update'.format(self._stack))
self._wait_state('UPDATE_COMPLETE') self._wait_state('UPDATE_COMPLETE')
log.info('Stack {0} updated'.format(self._stack)) log.info('Stack {0} updated'.format(self._stack))
except heatclient.exc.HTTPNotFound: except heatclient.exc.HTTPNotFound:
@ -91,12 +90,11 @@ class HeatExecutor(CommandBase):
stack_name=self._stack, stack_name=self._stack,
parameters=arguments, parameters=arguments,
template=template) template=template)
log.debug('Waiting for the stack {0} to be create' log.debug('Waiting for the stack {0} to be create'.format(
.format(self._stack)) self._stack))
self._wait_state('CREATE_COMPLETE') self._wait_state('CREATE_COMPLETE')
log.info('Stack {0} created'.format(self._stack)) log.info('Stack {0} created'.format(self._stack))
pending_list = self._update_pending_list pending_list = self._update_pending_list
self._update_pending_list = [] self._update_pending_list = []
@ -113,8 +111,8 @@ class HeatExecutor(CommandBase):
try: try:
self._heat_client.stacks.delete( self._heat_client.stacks.delete(
stack_id=self._stack) stack_id=self._stack)
log.debug('Waiting for the stack {0} to be deleted' log.debug(
.format(self._stack)) 'Waiting for the stack {0} to be deleted'.format(self._stack))
self._wait_state(['DELETE_COMPLETE', '']) self._wait_state(['DELETE_COMPLETE', ''])
log.info('Stack {0} deleted'.format(self._stack)) log.info('Stack {0} deleted'.format(self._stack))
except Exception as ex: except Exception as ex:
@ -133,7 +131,6 @@ class HeatExecutor(CommandBase):
else: else:
states = [state] states = [state]
while True: while True:
try: try:
status = self._heat_client.stacks.get( status = self._heat_client.stacks.get(
@ -147,4 +144,3 @@ class HeatExecutor(CommandBase):
if status not in states: if status not in states:
raise EnvironmentError() raise EnvironmentError()
return return

View File

@ -28,7 +28,6 @@ class CommandDispatcher(command.CommandBase):
return result return result
def close(self): def close(self):
for t in self._command_map.values(): for t in self._command_map.values():
t.close() t.close()

View File

@ -18,9 +18,8 @@ class WindowsAgentExecutor(CommandBase):
rmqclient.declare(self._results_queue) rmqclient.declare(self._results_queue)
def execute(self, template, mappings, host, service, callback): def execute(self, template, mappings, host, service, callback):
with open('data/templates/agent/%s.template' % with open('data/templates/agent/%s.template' % template) as file:
template) as template_file: template_data = file.read()
template_data = template_file.read()
template_data = conductor.helpers.transform_json( template_data = conductor.helpers.transform_json(
json.loads(template_data), mappings) json.loads(template_data), mappings)
@ -57,7 +56,6 @@ class WindowsAgentExecutor(CommandBase):
self._pending_list.pop(index) self._pending_list.pop(index)
item['callback'](msg.body) item['callback'](msg.body)
return True return True

View File

@ -73,6 +73,7 @@ def parse_args(args=None, usage=None, default_config_files=None):
usage=usage, usage=usage,
default_config_files=default_config_files) default_config_files=default_config_files)
def setup_logging(): def setup_logging():
""" """
Sets up the logging options for a log with supplied name Sets up the logging options for a log with supplied name

View File

@ -1,5 +1,6 @@
import types import types
def transform_json(json, mappings): def transform_json(json, mappings):
if isinstance(json, types.ListType): if isinstance(json, types.ListType):
return [transform_json(t, mappings) for t in json] return [transform_json(t, mappings) for t in json]
@ -46,7 +47,3 @@ def find(f, seq):
return item, index return item, index
index += 1 index += 1
return None, -1 return None, -1
def generate(length):
return ''.join(choice(chars) for _ in range(length))

View File

@ -4,6 +4,7 @@ puka = patcher.import_patched('puka')
import anyjson import anyjson
import config import config
class RmqClient(object): class RmqClient(object):
def __init__(self): def __init__(self):
settings = config.CONF.rabbitmq settings = config.CONF.rabbitmq
@ -65,6 +66,7 @@ class RmqClient(object):
return Subscription(self._client, queue) return Subscription(self._client, queue)
class Subscription(object): class Subscription(object):
def __init__(self, client, queue): def __init__(self, client, queue):
self._client = client self._client = client

View File

@ -1,7 +1,7 @@
import xml_code_engine import xml_code_engine
import json
import rabbitmq import rabbitmq
class Reporter(object): class Reporter(object):
def __init__(self, rmqclient, task_id, environment_id): def __init__(self, rmqclient, task_id, environment_id):
self._rmqclient = rmqclient self._rmqclient = rmqclient

View File

@ -6,12 +6,14 @@ log = logging.getLogger(__name__)
def send_command(engine, context, body, template, service, host, mappings=None, def send_command(engine, context, body, template, service, host, mappings=None,
result=None, **kwargs): result=None, **kwargs):
if not mappings: mappings = {} if not mappings:
mappings = {}
command_dispatcher = context['/commandDispatcher'] command_dispatcher = context['/commandDispatcher']
def callback(result_value): def callback(result_value):
log.info('Received result from {3} for {0}: {1}. Body is {2}'.format( log.info(
template, result_value, body, host)) 'Received result from {3} for {0}: {1}. Body is {2}'.format(
template, result_value, body, host))
if result is not None: if result is not None:
context[result] = result_value['Result'] context[result] = result_value['Result']
@ -19,8 +21,8 @@ def send_command(engine, context, body, template, service, host, mappings=None,
if success_handler is not None: if success_handler is not None:
engine.evaluate_content(success_handler, context) engine.evaluate_content(success_handler, context)
command_dispatcher.execute(name='agent', command_dispatcher.execute(
template=template, mappings=mappings, name='agent', template=template, mappings=mappings,
host=host, service=service, callback=callback) host=host, service=service, callback=callback)

View File

@ -5,6 +5,7 @@ import re
import xml_code_engine import xml_code_engine
import function_context import function_context
class Workflow(object): class Workflow(object):
def __init__(self, filename, data, command_dispatcher, config, reporter): def __init__(self, filename, data, command_dispatcher, config, reporter):
self._data = data self._data = data