From 84600e4a1c04b678301117336640970e886d59bf Mon Sep 17 00:00:00 2001 From: Serg Melikyan Date: Thu, 10 Oct 2013 11:24:49 +0400 Subject: [PATCH] Cherry-picked changes from release-0.3 * Version bump to 0.3 * Add delays before reconnect to RabbitMQ * "TypeError: 'str' object does not support item deletion" on handling EP v2 errors * Syntax for partial macro-parameters evaluation in templates * Execution of a new rule in workflow is looping when array 'Commands' in template is empty Change-Id: I1f1bbcf8153a6f9a369419629a6619d562559e31 --- muranoconductor/app.py | 5 +++++ muranoconductor/helpers.py | 17 ++++++++++++++++- muranoconductor/vm_agent.py | 4 ++-- setup.cfg | 2 +- 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/muranoconductor/app.py b/muranoconductor/app.py index bdb3642..fcc9072 100644 --- a/muranoconductor/app.py +++ b/muranoconductor/app.py @@ -60,6 +60,7 @@ class ConductorWorkflowService(service.Service): return MqClient(**connection_params) def _start_rabbitmq(self): + reconnect_delay = 1 while True: try: with self.create_rmq_client() as mq: @@ -68,6 +69,7 @@ class ConductorWorkflowService(service.Service): with mq.open('tasks', prefetch_count= cfg.CONF.max_environments) as subscription: + reconnect_delay = 1 while True: msg = subscription.get_message(timeout=2) if msg is not None: @@ -75,6 +77,9 @@ class ConductorWorkflowService(service.Service): except Exception as ex: log.exception(ex) + eventlet.sleep(reconnect_delay) + reconnect_delay = min(reconnect_delay * 2, 60) + def _task_received(self, message): task = message.body or {} message_id = message.id diff --git a/muranoconductor/helpers.py b/muranoconductor/helpers.py index b5cdc72..c5925ce 100644 --- a/muranoconductor/helpers.py +++ b/muranoconductor/helpers.py @@ -15,6 +15,7 @@ import deep import types +import re def transform_json(json, mappings): @@ -35,13 +36,27 @@ def transform_json(json, mappings): return result elif isinstance(json, types.StringTypes) and json.startswith('$'): - value = mappings.get(json[1:]) + value = convert_macro_parameter(json[1:], mappings) if value is not None: return value return json +def convert_macro_parameter(macro, mappings): + replaced = [False] + + def replace(match): + replaced[0] = True + return unicode(mappings.get(match.group(1))) + + result = re.sub('{(\\w+?)}', replace, macro) + if replaced[0]: + return result + else: + return mappings.get(macro) + + def merge_lists(list1, list2): result = [] for item in list1 + list2: diff --git a/muranoconductor/vm_agent.py b/muranoconductor/vm_agent.py index 6545e0e..2785562 100644 --- a/muranoconductor/vm_agent.py +++ b/muranoconductor/vm_agent.py @@ -66,7 +66,7 @@ def _extract_v2_results(result_value, ok, errors): } for attr in ('Message', 'AdditionalInfo'): if attr in body: - del attr[body] + del body[attr] err['extra'] = body if body else None errors.append(err) @@ -91,7 +91,7 @@ def send_command(engine, context, body, template, service, unit, errors = [] _extract_results(result_value, ok, errors) - if ok: + if ok or not errors: if result is not None: context[result] = ok success_handler = body.find('success') diff --git a/setup.cfg b/setup.cfg index 06f3815..1e1b0df 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,7 @@ [metadata] name = murano-conductor summary = The Conductor is orchestration engine server -version = 0.2 +version = 0.3 description-file = README.rst license = Apache License, Version 2.0