From 7be1f27f0bd6b04d83a675c92db5cd8674488c2b Mon Sep 17 00:00:00 2001 From: Stan Lagun Date: Tue, 26 Mar 2013 19:06:56 +0400 Subject: [PATCH] Naming conventions changed --- conductor/conductor/app.py | 22 ++++++++++---- conductor/conductor/cloud_formation.py | 29 +++++++++++++++--- .../conductor/commands/cloud_formation.py | 6 ++++ conductor/conductor/commands/dispatcher.py | 2 +- conductor/conductor/commands/windows_agent.py | 8 ++--- conductor/conductor/helpers.py | 5 +++- conductor/conductor/windows_agent.py | 4 +-- conductor/conductor/workflow.py | 14 ++++----- .../templates/agent-config/Default.template | 1 + conductor/data/workflows/AD.xml | 30 +++++++++++++++---- conductor/data/workflows/Common.xml | 7 +++++ conductor/data/workflows/IIS.xml | 15 ++++++---- 12 files changed, 107 insertions(+), 36 deletions(-) create mode 100644 conductor/data/workflows/Common.xml diff --git a/conductor/conductor/app.py b/conductor/conductor/app.py index da17e9b..fe44f2d 100644 --- a/conductor/conductor/app.py +++ b/conductor/conductor/app.py @@ -9,6 +9,9 @@ from config import Config import reporting import rabbitmq +import windows_agent +import cloud_formation + config = Config(sys.argv[1] if len(sys.argv) > 1 else None) @@ -18,7 +21,7 @@ def task_received(task, message_id): reporter = reporting.Reporter(rmqclient, message_id, task['id']) command_dispatcher = CommandDispatcher( - task['name'], rmqclient, task['token']) + task['id'], rmqclient, task['token']) workflows = [] for path in glob.glob("data/workflows/*.xml"): print "loading", path @@ -28,14 +31,21 @@ def task_received(task, message_id): while True: try: - for workflow in workflows: - workflow.execute() + while True: + result = False + for workflow in workflows: + if workflow.execute(): + result = True + if not result: + break if not command_dispatcher.execute_pending(): break except Exception: break command_dispatcher.close() + + del task['token'] result_msg = rabbitmq.Message() result_msg.body = task result_msg.id = message_id @@ -59,9 +69,9 @@ class ConductorWorkflowService(service.Service): while True: try: with rabbitmq.RmqClient() as rmq: - rmq.declare('tasks', 'tasks') - rmq.declare('task-results', 'tasks') - with rmq.open('tasks') as subscription: + rmq.declare('tasks2', 'tasks2') + rmq.declare('task-results', 'tasks2') + with rmq.open('tasks2') as subscription: while True: msg = subscription.get_message() self.tg.add_thread( diff --git a/conductor/conductor/cloud_formation.py b/conductor/conductor/cloud_formation.py index 7303801..9798832 100644 --- a/conductor/conductor/cloud_formation.py +++ b/conductor/conductor/cloud_formation.py @@ -2,6 +2,8 @@ import base64 import xml_code_engine import config +from random import choice +import time def update_cf_stack(engine, context, body, template, mappings, arguments, **kwargs): @@ -16,7 +18,7 @@ def update_cf_stack(engine, context, body, template, arguments=arguments, callback=callback) -def prepare_user_data(context, template='Default', **kwargs): +def prepare_user_data(context, hostname, service, unit, template='Default', **kwargs): settings = config.CONF.rabbitmq with open('data/init.ps1') as init_script_file: @@ -26,17 +28,36 @@ def prepare_user_data(context, template='Default', **kwargs): template_data = template_file.read() template_data = template_data.replace( '%RABBITMQ_HOST%', settings.host) + template_data = template_data.replace( + '%RABBITMQ_INPUT_QUEUE%', + '-'.join([str(context['/dataSource']['id']), + str(service), str(unit)]) + ) template_data = template_data.replace( '%RESULT_QUEUE%', - '-execution-results-%s' % str(context['/dataSource']['name'])) + '-execution-results-%s' % str(context['/dataSource']['id'])) - return init_script.replace( + init_script = init_script.replace( '%WINDOWS_AGENT_CONFIG_BASE64%', base64.b64encode(template_data)) + init_script = init_script.replace('%INTERNAL_HOSTNAME%', hostname) + + return init_script + +def generate_hostname(**kwargs): + chars = 'abcdefghijklmnopqrstuvwxyz' + prefix = ''.join(choice(chars) for _ in range(4)) + return prefix + str(int(time.time() * 10)) + + + xml_code_engine.XmlCodeEngine.register_function( update_cf_stack, "update-cf-stack") xml_code_engine.XmlCodeEngine.register_function( - prepare_user_data, "prepare_user_data") + prepare_user_data, "prepare-user-data") + +xml_code_engine.XmlCodeEngine.register_function( + generate_hostname, "generate-hostname") diff --git a/conductor/conductor/commands/cloud_formation.py b/conductor/conductor/commands/cloud_formation.py index ab62ac5..44f4e08 100644 --- a/conductor/conductor/commands/cloud_formation.py +++ b/conductor/conductor/commands/cloud_formation.py @@ -67,18 +67,24 @@ class HeatExecutor(CommandBase): # ]) try: + print 'try update' self._heat_client.stacks.update( stack_id=self._stack, parameters=arguments, template=template) + print 'wait update' self._wait_state('UPDATE_COMPLETE') except heatclient.exc.HTTPNotFound: + print 'try create' + self._heat_client.stacks.create( stack_name=self._stack, parameters=arguments, template=template) + print 'wait create' self._wait_state('CREATE_COMPLETE') + pending_list = self._pending_list self._pending_list = [] diff --git a/conductor/conductor/commands/dispatcher.py b/conductor/conductor/commands/dispatcher.py index 8208856..d44f224 100644 --- a/conductor/conductor/commands/dispatcher.py +++ b/conductor/conductor/commands/dispatcher.py @@ -8,7 +8,7 @@ class CommandDispatcher(command.CommandBase): self._command_map = { 'cf': cloud_formation.HeatExecutor(environment_id, token), 'agent': windows_agent.WindowsAgentExecutor( - environment_id, rmqclient, environment_id) + environment_id, rmqclient) } def execute(self, name, **kwargs): diff --git a/conductor/conductor/commands/windows_agent.py b/conductor/conductor/commands/windows_agent.py index 978ddd2..8573489 100644 --- a/conductor/conductor/commands/windows_agent.py +++ b/conductor/conductor/commands/windows_agent.py @@ -7,14 +7,14 @@ from command import CommandBase class WindowsAgentExecutor(CommandBase): - def __init__(self, stack, rmqclient, environment): + def __init__(self, stack, rmqclient): self._stack = stack self._rmqclient = rmqclient self._pending_list = [] - self._results_queue = '-execution-results-%s' % str(environment) + self._results_queue = '-execution-results-%s' % str(stack) rmqclient.declare(self._results_queue) - def execute(self, template, mappings, host, callback): + def execute(self, template, mappings, host, service, callback): with open('data/templates/agent/%s.template' % template) as template_file: template_data = template_file.read() @@ -23,7 +23,7 @@ class WindowsAgentExecutor(CommandBase): json.loads(template_data), mappings) id = str(uuid.uuid4()).lower() - host = ('%s-%s' % (self._stack, host)).lower().replace(' ', '-') + host = ('%s-%s-%s' % (self._stack, service, host)).lower() self._pending_list.append({ 'id': id, 'callback': callback diff --git a/conductor/conductor/helpers.py b/conductor/conductor/helpers.py index 435a35b..b133836 100644 --- a/conductor/conductor/helpers.py +++ b/conductor/conductor/helpers.py @@ -1,6 +1,5 @@ import types - def transform_json(json, mappings): if isinstance(json, types.ListType): return [transform_json(t, mappings) for t in json] @@ -47,3 +46,7 @@ def find(f, seq): return item, index index += 1 return None, -1 + + +def generate(length): + return ''.join(choice(chars) for _ in range(length)) diff --git a/conductor/conductor/windows_agent.py b/conductor/conductor/windows_agent.py index 2e7761c..3a4fda7 100644 --- a/conductor/conductor/windows_agent.py +++ b/conductor/conductor/windows_agent.py @@ -1,7 +1,7 @@ import xml_code_engine -def send_command(engine, context, body, template, host, mappings=None, +def send_command(engine, context, body, template, service, host, mappings=None, result=None, **kwargs): if not mappings: mappings = {} command_dispatcher = context['/commandDispatcher'] @@ -18,7 +18,7 @@ def send_command(engine, context, body, template, host, mappings=None, command_dispatcher.execute(name='agent', template=template, mappings=mappings, - host=host, callback=callback) + host=host, service=service, callback=callback) xml_code_engine.XmlCodeEngine.register_function(send_command, "send-command") \ No newline at end of file diff --git a/conductor/conductor/workflow.py b/conductor/conductor/workflow.py index 46d1d3d..94d357e 100644 --- a/conductor/conductor/workflow.py +++ b/conductor/conductor/workflow.py @@ -17,14 +17,12 @@ class Workflow(object): self._reporter = reporter def execute(self): - while True: - context = function_context.Context() - context['/dataSource'] = self._data - context['/commandDispatcher'] = self._command_dispatcher - context['/config'] = self._config - context['/reporter'] = self._reporter - if not self._engine.execute(context): - break + context = function_context.Context() + context['/dataSource'] = self._data + context['/commandDispatcher'] = self._command_dispatcher + context['/config'] = self._config + context['/reporter'] = self._reporter + return self._engine.execute(context) @staticmethod def _get_path(obj, path, create_non_existing=False): diff --git a/conductor/data/templates/agent-config/Default.template b/conductor/data/templates/agent-config/Default.template index 179bfb8..ff5c3c4 100644 --- a/conductor/data/templates/agent-config/Default.template +++ b/conductor/data/templates/agent-config/Default.template @@ -22,6 +22,7 @@ + diff --git a/conductor/data/workflows/AD.xml b/conductor/data/workflows/AD.xml index 929069f..c394fbd 100644 --- a/conductor/data/workflows/AD.xml +++ b/conductor/data/workflows/AD.xml @@ -5,7 +5,7 @@ - + @@ -13,11 +13,13 @@ - - - + + @@ -44,6 +46,9 @@ + @@ -64,6 +69,9 @@ + @@ -88,6 +96,9 @@ + @@ -113,6 +124,9 @@ + + + + + Creating instance - + WS- - + + + + +