Cherry-picked changes from release-0.3
* Version bump to 0.3 * Add delays before reconnect to RabbitMQ * "TypeError: 'str' object does not support item deletion" on handling EP v2 errors * Syntax for partial macro-parameters evaluation in templates * Execution of a new rule in workflow is looping when array 'Commands' in template is empty Change-Id: I1f1bbcf8153a6f9a369419629a6619d562559e31
This commit is contained in:
parent
c1ef10415c
commit
84600e4a1c
@ -60,6 +60,7 @@ class ConductorWorkflowService(service.Service):
|
||||
return MqClient(**connection_params)
|
||||
|
||||
def _start_rabbitmq(self):
|
||||
reconnect_delay = 1
|
||||
while True:
|
||||
try:
|
||||
with self.create_rmq_client() as mq:
|
||||
@ -68,6 +69,7 @@ class ConductorWorkflowService(service.Service):
|
||||
with mq.open('tasks',
|
||||
prefetch_count=
|
||||
cfg.CONF.max_environments) as subscription:
|
||||
reconnect_delay = 1
|
||||
while True:
|
||||
msg = subscription.get_message(timeout=2)
|
||||
if msg is not None:
|
||||
@ -75,6 +77,9 @@ class ConductorWorkflowService(service.Service):
|
||||
except Exception as ex:
|
||||
log.exception(ex)
|
||||
|
||||
eventlet.sleep(reconnect_delay)
|
||||
reconnect_delay = min(reconnect_delay * 2, 60)
|
||||
|
||||
def _task_received(self, message):
|
||||
task = message.body or {}
|
||||
message_id = message.id
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import deep
|
||||
import types
|
||||
import re
|
||||
|
||||
|
||||
def transform_json(json, mappings):
|
||||
@ -35,13 +36,27 @@ def transform_json(json, mappings):
|
||||
return result
|
||||
|
||||
elif isinstance(json, types.StringTypes) and json.startswith('$'):
|
||||
value = mappings.get(json[1:])
|
||||
value = convert_macro_parameter(json[1:], mappings)
|
||||
if value is not None:
|
||||
return value
|
||||
|
||||
return json
|
||||
|
||||
|
||||
def convert_macro_parameter(macro, mappings):
|
||||
replaced = [False]
|
||||
|
||||
def replace(match):
|
||||
replaced[0] = True
|
||||
return unicode(mappings.get(match.group(1)))
|
||||
|
||||
result = re.sub('{(\\w+?)}', replace, macro)
|
||||
if replaced[0]:
|
||||
return result
|
||||
else:
|
||||
return mappings.get(macro)
|
||||
|
||||
|
||||
def merge_lists(list1, list2):
|
||||
result = []
|
||||
for item in list1 + list2:
|
||||
|
@ -66,7 +66,7 @@ def _extract_v2_results(result_value, ok, errors):
|
||||
}
|
||||
for attr in ('Message', 'AdditionalInfo'):
|
||||
if attr in body:
|
||||
del attr[body]
|
||||
del body[attr]
|
||||
err['extra'] = body if body else None
|
||||
errors.append(err)
|
||||
|
||||
@ -91,7 +91,7 @@ def send_command(engine, context, body, template, service, unit,
|
||||
errors = []
|
||||
_extract_results(result_value, ok, errors)
|
||||
|
||||
if ok:
|
||||
if ok or not errors:
|
||||
if result is not None:
|
||||
context[result] = ok
|
||||
success_handler = body.find('success')
|
||||
|
Loading…
x
Reference in New Issue
Block a user