From 8203a6ac0776ed250f4b1bb3b2f2516b3b5a96be Mon Sep 17 00:00:00 2001 From: Alexander Tivelkov Date: Mon, 5 Aug 2013 20:43:54 +0400 Subject: [PATCH] Graceful error handling via workflow commands Workflow rules can now define "failure" blocks to handle the various errors and exceptions that occur while executing the actions. These blocks are passed to the 'update-cf-stack' and 'send-command' functions, so their callbacks can call them to handle the errors and exceptions. The actual error and exception data may be passed via context in the same way as the result is passed to "success" handlers. If the 'failure' block is omitted, a global exception is raised, which interrupts the workflow execution and reports an unhandled error to the API at the error level. To gracefully stop the workflow execution without throwing an exception, a 'failure' block may define a 'stop' command, which interrupts the execution after the end of the current loop without throwing any exceptions. These changes make it possible to handle exceptions and unexpected states while interacting with heat, as well as agent-side exceptions delivered to the conductor from the Agent. Also, the same approach works for handling a timeout while waiting for a task result from the Agent. To support timeouts, a 'timeout' parameter must be passed to the 'send-command' function. If it is omitted, the timeout is considered to be infinite. The workflows have been updated with failure blocks on all the commands. These blocks contain error-level reporting and a command to interrupt the flow. 
No timeouts were set in workflows, so the timeout feature is currently inactive for the existing workflows (as the 'initialization timeout' concept needs to be introduced) Change-Id: Ia791d4656463240ed197bcd90b9d9eae648270af --- data/workflows/AD.xml | 76 +++++++++++++++-- data/workflows/AspNetApps.xml | 45 ++++++++-- data/workflows/ExternalAD.xml.example | 9 +- data/workflows/MsSqlCluster.xml | 81 ++++++++++++++++-- data/workflows/MsSqlServer.xml | 27 +++++- muranoconductor/app.py | 25 ++++-- muranoconductor/cloud_formation.py | 25 +++++- muranoconductor/commands/cloud_formation.py | 94 ++++++++++----------- muranoconductor/commands/windows_agent.py | 66 ++++++++++++--- muranoconductor/windows_agent.py | 58 +++++++++++-- muranoconductor/workflow.py | 10 +++ 11 files changed, 412 insertions(+), 104 deletions(-) diff --git a/data/workflows/AD.xml b/data/workflows/AD.xml index a4669e1..0f13504 100644 --- a/data/workflows/AD.xml +++ b/data/workflows/AD.xml @@ -12,7 +12,7 @@ ( @@ -40,12 +40,19 @@ Instance ) created + + + ( + + + - + + + + ( + + + - + + + + ( + + + @@ -97,7 +118,7 @@ ( @@ -121,12 +142,19 @@ Primary Domain Controller created + + + ( + + + - + + + + ( + + + - + + Unit ) was unable to leave the domain due to ' and @.state.primaryDcIp)] - + ( + + + ( + + + @@ -223,7 +272,7 @@ ( @@ -251,6 +300,17 @@ Domain + Unable to create Secondary Domain Controller on unit ) due to + Unable to create domain Creating instance ) - + ( + Unable to deploy instance ) due to @@ -57,12 +64,19 @@ http:// + + + ( + + + - + + + + ( + + + @@ -91,7 +112,7 @@ ( @@ -105,6 +126,13 @@ IIS ) has started + + + ( + + + @@ -114,7 +142,7 @@ on unit ) - + has been deployed on unit ) + + + ( + + + diff --git a/data/workflows/ExternalAD.xml.example b/data/workflows/ExternalAD.xml.example index bcd6aad..93b1b78 100644 --- a/data/workflows/ExternalAD.xml.example +++ b/data/workflows/ExternalAD.xml.example @@ -43,7 +43,7 @@ $[?(@.state.domain != ' @@ -79,6 +79,13 @@ Unit ) has joined domain + Unit ) 
was unable to join the domain due to Creating instance ) - + ( + Unable to deploy instance ) due to @@ -77,12 +84,19 @@ + Unable to set admin password on unit ) due to @@ -109,12 +123,19 @@ Failover cluster prerequisites installed on unit ) + + + ( + + + - + ) + + + due to @@ -188,12 +216,19 @@ Environment for AlwaysOn Availability Group of SQL Server Cluster service ( + Unable to configure the environment for AlwaysOn Availability Group of SQL Server Cluster service ( + + + - + ( + Unable to install SQL Server on unit ) due to @@ -258,12 +300,19 @@ AlwaysOn AG initialized for ) + + + ( + + + - + ( + Unable to initialize primary replica for SQL Server AG for ) due to @@ -349,6 +405,13 @@ Secondary replica for SQL Server AG initialized for ) + + + ( + + + diff --git a/data/workflows/MsSqlServer.xml b/data/workflows/MsSqlServer.xml index 4558b92..01cee9e 100644 --- a/data/workflows/MsSqlServer.xml +++ b/data/workflows/MsSqlServer.xml @@ -12,7 +12,7 @@ ( @@ -40,12 +40,19 @@ Instance ) created + + + ( + + + - + + + + ( + + + @@ -74,7 +88,7 @@ ( @@ -99,6 +113,13 @@ MS SQL Server ) has started + + + ( + + + diff --git a/muranoconductor/app.py b/muranoconductor/app.py index 688d8b3..67d45d8 100644 --- a/muranoconductor/app.py +++ b/muranoconductor/app.py @@ -77,7 +77,6 @@ class ConductorWorkflowService(service.Service): def _task_received(self, message): task = message.body or {} message_id = message.id - reporter = None with self.create_rmq_client() as mq: try: log.info('Starting processing task {0}: {1}'.format( @@ -96,7 +95,8 @@ class ConductorWorkflowService(service.Service): reporter) workflows.append(workflow) - while True: + stop = False + while not stop: try: while True: result = False @@ -112,17 +112,18 @@ class ConductorWorkflowService(service.Service): log.debug("No pending commands found, " "seems like we are done") break + if self.check_stop_requested(task): + log.info("Workflow stop requested") + stop = True except Exception as ex: + reporter.report_generic( 
+ "Unexpected error has occurred", ex.message, + 'error') log.exception(ex) break command_dispatcher.close() - except reporting.ReportedException as e: - log.exception("Exception has occurred and was reported to API") - except Exception as e: - log.exception("Unexpected exception has occurred") - if reporter: - reporter.report_generic("Unexpected error has occurred", - e.message, 'error') + if stop: + log.info("Workflow stopped by 'stop' command") finally: self.cleanup(task, reporter) result_msg = Message() @@ -156,3 +157,9 @@ class ConductorWorkflowService(service.Service): if reporter: reporter.report_generic("Unexpected error has occurred", e.message, 'error') + + def check_stop_requested(self, model): + if 'temp' in model: + if '_stop_requested' in model['temp']: + return model['temp']['_stop_requested'] + return False diff --git a/muranoconductor/cloud_formation.py b/muranoconductor/cloud_formation.py index fc2ecda..10bfc02 100644 --- a/muranoconductor/cloud_formation.py +++ b/muranoconductor/cloud_formation.py @@ -20,14 +20,35 @@ import string import time import xml_code_engine +from openstack.common import log as logging + +log = logging.getLogger(__name__) -def update_cf_stack(engine, context, body, template, result=None, **kwargs): +def update_cf_stack(engine, context, body, template, result=None, error=None, + **kwargs): command_dispatcher = context['/commandDispatcher'] - def callback(result_value): + def callback(result_value, error_result=None): if result is not None: context[result] = result_value + + if error_result is not None: + if error is not None: + context[error] = { + 'message': getattr(error_result, 'message', None), + 'strerror': getattr(error_result, 'strerror', None), + 'timestamp': time.time() + } + failure_handler = body.find('failure') + if failure_handler is not None: + log.warning("Handling exception in failure block") + engine.evaluate_content(failure_handler, context) + return + else: + log.error("No failure block found for 
exception") + raise error_result + success_handler = body.find('success') if success_handler is not None: engine.evaluate_content(success_handler, context) diff --git a/muranoconductor/commands/cloud_formation.py b/muranoconductor/commands/cloud_formation.py index f6bcd4c..de2fdbc 100644 --- a/muranoconductor/commands/cloud_formation.py +++ b/muranoconductor/commands/cloud_formation.py @@ -93,64 +93,64 @@ class HeatExecutor(CommandBase): self._delete_pending_list) > 0 def execute_pending(self): - try: - r1 = self._execute_pending_updates() - r2 = self._execute_pending_deletes() - except Exception as e: - self._reporter.report_generic("Unable to execute Heat command", - e.message, "error") - trace = sys.exc_info()[2] - raise ReportedException(e.message), None, trace + r1 = self._execute_pending_updates() + r2 = self._execute_pending_deletes() return r1 or r2 def _execute_pending_updates(self): if not len(self._update_pending_list): return False - template, arguments = self._get_current_template() - stack_exists = (template != {}) + try: + template, arguments = self._get_current_template() + stack_exists = (template != {}) + # do not need to merge with current stack cause we rebuilding it + # from scratch on every deployment + template, arguments = ({}, {}) - # do not need to merge with current stack cause we rebuilding it from - # scratch on every deployment - template, arguments = ({}, {}) + for t in self._update_pending_list: + template = muranoconductor.helpers.merge_dicts(template, + t['template']) + arguments = muranoconductor.helpers.merge_dicts(arguments, + t['arguments'], + max_levels=1) + log.info( + 'Executing heat template {0} with arguments {1} on stack {2}' + .format(anyjson.dumps(template), arguments, self._stack)) - for t in self._update_pending_list: - template = muranoconductor.helpers.merge_dicts( - template, t['template']) - arguments = muranoconductor.helpers.merge_dicts( - arguments, t['arguments'], max_levels=1) + if stack_exists: + 
self._heat_client.stacks.update( + stack_id=self._stack, + parameters=arguments, + template=template) + log.debug( + 'Waiting for the stack {0} to be update'.format( + self._stack)) + outs = self._wait_state('UPDATE_COMPLETE') + log.info('Stack {0} updated'.format(self._stack)) + else: + self._heat_client.stacks.create( + stack_name=self._stack, + parameters=arguments, + template=template) - log.info( - 'Executing heat template {0} with arguments {1} on stack {2}' - .format(anyjson.dumps(template), arguments, self._stack)) + log.debug('Waiting for the stack {0} to be create'.format( + self._stack)) + outs = self._wait_state('CREATE_COMPLETE') + log.info('Stack {0} created'.format(self._stack)) - if stack_exists: - self._heat_client.stacks.update( - stack_id=self._stack, - parameters=arguments, - template=template) - log.debug( - 'Waiting for the stack {0} to be update'.format(self._stack)) - outs = self._wait_state('UPDATE_COMPLETE') - log.info('Stack {0} updated'.format(self._stack)) - else: - self._heat_client.stacks.create( - stack_name=self._stack, - parameters=arguments, - template=template) + pending_list = self._update_pending_list + self._update_pending_list = [] - log.debug('Waiting for the stack {0} to be create'.format( - self._stack)) - outs = self._wait_state('CREATE_COMPLETE') - log.info('Stack {0} created'.format(self._stack)) - - pending_list = self._update_pending_list - self._update_pending_list = [] - - for item in pending_list: - item['callback'](outs) - - return True + for item in pending_list: + item['callback'](outs) + return True + except Exception as ex: + pending_list = self._update_pending_list + self._update_pending_list = [] + for item in pending_list: + item['callback'](None, ex) + return True def _execute_pending_deletes(self): if not len(self._delete_pending_list): diff --git a/muranoconductor/commands/windows_agent.py b/muranoconductor/commands/windows_agent.py index a2ea159..f112b8c 100644 --- 
a/muranoconductor/commands/windows_agent.py +++ b/muranoconductor/commands/windows_agent.py @@ -18,7 +18,8 @@ class WindowsAgentExecutor(CommandBase): self._reporter = reporter rmqclient.declare(self._results_queue) - def execute(self, template, mappings, unit, service, callback): + def execute(self, template, mappings, unit, service, callback, + timeout=None): with open('data/templates/agent/%s.template' % template) as t_file: template_data = t_file.read() @@ -29,7 +30,8 @@ class WindowsAgentExecutor(CommandBase): queue = ('%s-%s-%s' % (self._stack, service, unit)).lower() self._pending_list.append({ 'id': msg_id, - 'callback': callback + 'callback': callback, + 'timeout': timeout }) msg = Message() @@ -49,15 +51,53 @@ class WindowsAgentExecutor(CommandBase): with self._rmqclient.open(self._results_queue) as subscription: while self.has_pending_commands(): - log.debug("Waiting for responses to be returned by the agent. " - "%i total responses remain", len(self._pending_list)) - msg = subscription.get_message() - msg.ack() - msg_id = msg.id.lower() - item, index = muranoconductor.helpers.find( - lambda t: t['id'] == msg_id, self._pending_list) - if item: - self._pending_list.pop(index) - item['callback'](msg.body) - + # TODO: Add extended initialization timeout + # By now, all the timeouts are defined by the command input + # however, the first reply which we wait for being returned + # from the unit may be delayed due to long unit initialization + # and startup. So, for the nonitialized units we need to extend + # the command's timeout with the initialization timeout + timeout = self.get_max_timeout() + if timeout: + span_message = "for {0} seconds".format(timeout) + else: + span_message = 'infinitely' + log.debug("Waiting %s for responses to be returned" + " by the agent. 
%i total responses remain", + span_message, len(self._pending_list)) + msg = subscription.get_message(timeout=timeout) + if msg: + msg.ack() + msg_id = msg.id.lower() + item, index = muranoconductor.helpers.find( + lambda t: t['id'] == msg_id, self._pending_list) + if item: + self._pending_list.pop(index) + item['callback'](msg.body) + else: + while self.has_pending_commands(): + item = self._pending_list.pop() + item['callback'](AgentTimeoutException(timeout)) return True + + def get_max_timeout(self): + res = 0 + for item in self._pending_list: + if item['timeout'] is None: # if at least 1 item has no timeout + return None # then return None (i.e. infinite) + res = max(res, item['timeout']) + return res + + +class AgentTimeoutException(Exception): + def __init__(self, timeout): + self.message = "Unable to receive any response from the agent" \ + " in {0} sec".format(timeout) + self.timeout = timeout + + +class UnhandledAgentException(Exception): + def __init__(self, errors): + self.message = "An unhandled exception has " \ + "occurred in the Agent: {0}".format(errors) + self.errors = errors diff --git a/muranoconductor/windows_agent.py b/muranoconductor/windows_agent.py index 4e42509..09494c6 100644 --- a/muranoconductor/windows_agent.py +++ b/muranoconductor/windows_agent.py @@ -12,33 +12,77 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. 
+from muranoconductor.commands.windows_agent import AgentTimeoutException +from muranoconductor.commands.windows_agent import UnhandledAgentException import xml_code_engine from openstack.common import log as logging + log = logging.getLogger(__name__) def send_command(engine, context, body, template, service, unit, mappings=None, - result=None, **kwargs): + result=None, error=None, timeout=None, **kwargs): if not mappings: mappings = {} command_dispatcher = context['/commandDispatcher'] + if timeout: + timeout = int(timeout) def callback(result_value): log.info( 'Received result from {2} for {0}: {1}'.format( template, result_value, unit)) - if result is not None: - context[result] = result_value['Result'] + ok = [] + errors = [] + if isinstance(result_value, AgentTimeoutException): + errors.append({ + 'type': "timeout", + 'messages': [result_value.message], + 'timeout': result_value.timeout + }) + else: + if result_value['IsException']: + msg = "A general exception has occurred in the Agent: " + \ + result_value['Result'] + errors.append({ + 'type': "general", + 'messages': [msg], + }) - success_handler = body.find('success') - if success_handler is not None: - engine.evaluate_content(success_handler, context) + else: + for res in result_value['Result']: + if res['IsException']: + errors.append({ + 'type': 'inner', + 'messages': res['Result'] + }) + else: + ok.append(res) + + if ok: + if result is not None: + context[result] = ok + success_handler = body.find('success') + if success_handler is not None: + engine.evaluate_content(success_handler, context) + if errors: + if error is not None: + context[error] = errors + failure_handler = body.find('failure') + if failure_handler is not None: + engine.evaluate_content(failure_handler, context) + else: + log.error("No failure block found for exception") + if isinstance(result_value, AgentTimeoutException): + raise result_value + else: + raise UnhandledAgentException(errors) command_dispatcher.execute( name='agent', 
template=template, mappings=mappings, - unit=unit, service=service, callback=callback) + unit=unit, service=service, callback=callback, timeout=timeout) xml_code_engine.XmlCodeEngine.register_function(send_command, "send-command") diff --git a/muranoconductor/workflow.py b/muranoconductor/workflow.py index c07f3ca..3206134 100644 --- a/muranoconductor/workflow.py +++ b/muranoconductor/workflow.py @@ -203,6 +203,13 @@ class Workflow(object): return True return False + @staticmethod + def _stop_func(context, body, engine, **kwargs): + if not 'temp' in context['/dataSource']: + context['/dataSource']['temp'] = {} + + context['/dataSource']['temp']['_stop_requested'] = True + xml_code_engine.XmlCodeEngine.register_function( Workflow._rule_func, 'rule') @@ -216,6 +223,9 @@ xml_code_engine.XmlCodeEngine.register_function( xml_code_engine.XmlCodeEngine.register_function( Workflow._select_func, 'select') +xml_code_engine.XmlCodeEngine.register_function( + Workflow._stop_func, 'stop') + xml_code_engine.XmlCodeEngine.register_function( Workflow._select_all_func, 'select-all')