
Workflow rules can now define "failure" blocks to handle the various errors and exceptions that occur while executing the actions. These blocks are passed to the 'update-cf-stack' and 'send-command' functions, so their callbacks can call them to handle the errors and exceptions. The actual error and exception data may be passed via context in the same way as the result is passed to "success" handlers. If the "failure" block is omitted, a global exception is raised, which interrupts the workflow execution and reports an unhandled error to the API at the error level. To stop the workflow execution gracefully without throwing an exception, a "failure" block may define a '<stop/>' command, which interrupts the execution after the end of the current loop without throwing any exceptions. These changes allow handling exceptions and unexpected states while interacting with Heat, as well as agent-side exceptions delivered to the conductor from the Agent. The same approach also works for handling a timeout while waiting for a task result from the Agent. To support timeouts, the 'send-command' function must be passed a 'timeout' parameter; if it is omitted, the timeout is considered infinite. The workflows have been updated with failure blocks on all the commands. These blocks contain error-level reporting and a <stop/> command to interrupt the flow. No timeouts were set in the workflows, so the timeout feature is currently inactive for the existing workflows (as the 'initialization timeout' concept still needs to be introduced). Change-Id: Ia791d4656463240ed197bcd90b9d9eae648270af
104 lines
3.9 KiB
Python
import json
|
|
import uuid
|
|
|
|
from muranoconductor.openstack.common import log as logging
|
|
from muranocommon.messaging import Message
|
|
import muranoconductor.helpers
|
|
from command import CommandBase
|
|
|
|
# Module-level logger, obtained through the project's openstack.common
# log wrapper imported above.
log = logging.getLogger(__name__)
|
|
|
|
|
|
class WindowsAgentExecutor(CommandBase):
    """Send agent command templates over the message queue and dispatch
    the replies to per-command callbacks.

    Each command is sent to a per-unit queue named
    ``<stack>-<service>-<unit>`` (lower-cased).  Replies for all commands of
    this executor are read from a single results queue derived from the
    stack name.
    """

    def __init__(self, stack, rmqclient, reporter):
        """
        :param stack: stack identifier; its lower-cased string form is used
            to build the results queue name and the per-unit queue names.
        :param rmqclient: messaging client used to declare queues, send
            messages and open a subscription on the results queue.
        :param reporter: stored as ``self._reporter``; not used elsewhere in
            this class.
        """
        self._stack = stack
        self._rmqclient = rmqclient
        # Commands that were sent but whose reply has not been dispatched
        # yet.  Each entry is a dict with 'id', 'callback' and 'timeout' keys.
        self._pending_list = []
        self._results_queue = '-execution-results-%s' % str(stack).lower()
        self._reporter = reporter
        rmqclient.declare(self._results_queue)

    def execute(self, template, mappings, unit, service, callback,
                timeout=None):
        """Render an agent template and send it to the unit's queue.

        The reply is not awaited here; call ``execute_pending`` to wait for
        replies and run the callbacks.

        :param template: base name of the JSON template file
            ``data/templates/agent/<template>.template``.
        :param mappings: values substituted into the template by
            ``muranoconductor.helpers.transform_json``.
        :param unit: unit name; part of the destination queue name.
        :param service: service name; part of the destination queue name.
        :param callback: called once by ``execute_pending`` with either the
            reply body or an ``AgentTimeoutException``.
        :param timeout: seconds to wait for the reply; ``None`` means wait
            without limit.
        """
        with open('data/templates/agent/%s.template' % template) as t_file:
            template_data = t_file.read()

        template_data = muranoconductor.helpers.transform_json(
            json.loads(template_data), mappings)

        msg_id = str(uuid.uuid4()).lower()
        queue = ('%s-%s-%s' % (self._stack, service, unit)).lower()

        msg = Message()
        msg.body = template_data
        msg.id = msg_id
        self._rmqclient.declare(queue)
        self._rmqclient.send(message=msg, key=queue)

        # Register the command only after it was actually sent: if send()
        # raises, no stale entry is left for execute_pending() to wait on.
        # Replies are consumed only inside execute_pending(), so registering
        # after the send cannot miss a reply.
        self._pending_list.append({
            'id': msg_id,
            'callback': callback,
            'timeout': timeout
        })
        log.info('Sending RMQ message %s to %s with id %s',
                 template_data, queue, msg_id)

    def has_pending_commands(self):
        """Return True if any sent command is still awaiting its reply."""
        return bool(self._pending_list)

    def execute_pending(self):
        """Wait for replies to all pending commands and run their callbacks.

        If a callback issues new commands via ``execute``, they are waited
        for as well.  If no reply arrives within the largest pending timeout,
        every command pending at that moment is failed with
        ``AgentTimeoutException``.

        :returns: False if nothing was pending, True otherwise.
        """
        if not self.has_pending_commands():
            return False

        with self._rmqclient.open(self._results_queue) as subscription:
            while self.has_pending_commands():
                # TODO: Add extended initialization timeout
                # By now, all the timeouts are defined by the command input
                # however, the first reply which we wait for being returned
                # from the unit may be delayed due to long unit initialization
                # and startup. So, for the uninitialized units we need to
                # extend the command's timeout with the initialization timeout
                timeout = self.get_max_timeout()
                # A timeout of 0 is a real (zero-length) wait, not "forever".
                if timeout is not None:
                    span_message = "for {0} seconds".format(timeout)
                else:
                    span_message = 'infinitely'
                log.debug("Waiting %s for responses to be returned"
                          " by the agent. %i total responses remain",
                          span_message, len(self._pending_list))
                msg = subscription.get_message(timeout=timeout)
                if msg:
                    self._dispatch_reply(msg)
                else:
                    self._fail_pending(timeout)
        return True

    def _dispatch_reply(self, msg):
        """Acknowledge a reply and pass its body to the matching callback.

        Replies whose id matches no pending command are acknowledged and
        otherwise ignored.
        """
        msg.ack()
        msg_id = msg.id.lower()
        item, index = muranoconductor.helpers.find(
            lambda t: t['id'] == msg_id, self._pending_list)
        if item:
            self._pending_list.pop(index)
            item['callback'](msg.body)

    def _fail_pending(self, timeout):
        """Fail every currently pending command with AgentTimeoutException.

        Callbacks run in the order the commands were issued.
        """
        # Detach the list before running callbacks: a callback (e.g. a
        # workflow 'failure' handler) may call execute() again, and such a
        # new command must get its own wait rather than being timed out here.
        expired, self._pending_list = self._pending_list, []
        for item in expired:
            item['callback'](AgentTimeoutException(timeout))

    def get_max_timeout(self):
        """Return the longest timeout among the pending commands.

        :returns: None if any pending command has no timeout (wait without
            limit); otherwise max(0, largest timeout), which is 0 when
            nothing is pending.
        """
        timeouts = [item['timeout'] for item in self._pending_list]
        if any(t is None for t in timeouts):
            return None
        return max([0] + timeouts)
|
|
|
|
|
|
class AgentTimeoutException(Exception):
    """Signals that no agent reply arrived within the allowed time.

    WindowsAgentExecutor passes instances of this class to command
    callbacks when waiting for replies times out.

    Attributes:
        timeout: the timeout value (seconds) that expired.
        message: human-readable description of the failure.
    """

    def __init__(self, timeout):
        # Keep args == (timeout,) so the exception still round-trips through
        # pickling/copying, which rebuild it as cls(*args).
        super(AgentTimeoutException, self).__init__(timeout)
        self.message = "Unable to receive any response from the agent" \
                       " in {0} sec".format(timeout)
        self.timeout = timeout

    def __str__(self):
        # args holds only the raw timeout, so without this str() and log
        # output would show just the number instead of the description.
        return self.message
|
|
|
|
|
|
class UnhandledAgentException(Exception):
    """Signals an agent-side error that no handler took care of.

    Attributes:
        errors: the error data, embedded in the message via str.format;
            its exact type is not visible here (presumably the agent's
            error payload — confirm with callers).
        message: human-readable description of the failure.
    """

    def __init__(self, errors):
        # Keep args == (errors,) so pickling/copying can rebuild the
        # exception as cls(*args).
        super(UnhandledAgentException, self).__init__(errors)
        self.message = "An unhandled exception has " \
                       "occurred in the Agent: {0}".format(errors)
        self.errors = errors

    def __str__(self):
        # Without this, str() would show only the raw errors payload.
        return self.message
|