diff --git a/data/workflows/AD.xml b/data/workflows/AD.xml index a4669e1..0f13504 100644 --- a/data/workflows/AD.xml +++ b/data/workflows/AD.xml @@ -12,7 +12,7 @@ ( @@ -40,12 +40,19 @@ Instance ) created + + + ( + + + - + + + + ( + + + - + + + + ( + + + @@ -97,7 +118,7 @@ ( @@ -121,12 +142,19 @@ Primary Domain Controller created + + + ( + + + - + + + + ( + + + - + + Unit ) was unable to leave the domain due to ' and @.state.primaryDcIp)] - + ( + + + ( + + + @@ -223,7 +272,7 @@ ( @@ -251,6 +300,17 @@ Domain + Unable to create Secondary Domain Controller on unit ) due to + Unable to create domain Creating instance ) - + ( + Unable to deploy instance ) due to @@ -57,12 +64,19 @@ http:// + + + ( + + + - + + + + ( + + + @@ -91,7 +112,7 @@ ( @@ -105,6 +126,13 @@ IIS ) has started + + + ( + + + @@ -114,7 +142,7 @@ on unit ) - + has been deployed on unit ) + + + ( + + + diff --git a/data/workflows/ExternalAD.xml.example b/data/workflows/ExternalAD.xml.example index bcd6aad..93b1b78 100644 --- a/data/workflows/ExternalAD.xml.example +++ b/data/workflows/ExternalAD.xml.example @@ -43,7 +43,7 @@ $[?(@.state.domain != ' @@ -79,6 +79,13 @@ Unit ) has joined domain + Unit ) was unable to join the domain due to Creating instance ) - + ( + Unable to deploy instance ) due to @@ -77,12 +84,19 @@ + Unable to set admin password on unit ) due to @@ -109,12 +123,19 @@ Failover cluster prerequisites installed on unit ) + + + ( + + + - + ) + + + due to @@ -188,12 +216,19 @@ Environment for AlwaysOn Availability Group of SQL Server Cluster service ( + Unable to configure the environment for AlwaysOn Availability Group of SQL Server Cluster service ( + + + - + ( + Unable to install SQL Server on unit ) due to @@ -258,12 +300,19 @@ AlwaysOn AG initialized for ) + + + ( + + + - + ( + Unable to initialize primary replica for SQL Server AG for ) due to @@ -349,6 +405,13 @@ Secondary replica for SQL Server AG initialized for ) + + + ( + + + diff --git a/data/workflows/MsSqlServer.xml b/data/workflows/MsSqlServer.xml index 4558b92..01cee9e 100644 --- a/data/workflows/MsSqlServer.xml +++ b/data/workflows/MsSqlServer.xml @@ -12,7 +12,7 @@ ( @@ -40,12 +40,19 @@ Instance ) created + + + ( + + + - + + + + ( + + + @@ -74,7 +88,7 @@ ( @@ -99,6 +113,13 @@ MS SQL Server ) has started + + + ( + + + diff --git a/muranoconductor/app.py b/muranoconductor/app.py index 688d8b3..67d45d8 100644 --- a/muranoconductor/app.py +++ b/muranoconductor/app.py @@ -77,7 +77,6 @@ class ConductorWorkflowService(service.Service): def _task_received(self, message): task = message.body or {} message_id = message.id - reporter = None with self.create_rmq_client() as mq: try: log.info('Starting processing task {0}: {1}'.format( @@ -96,7 +95,8 @@ class ConductorWorkflowService(service.Service): reporter) workflows.append(workflow) - while True: + stop = False + while not stop: try: while True: result = False @@ -112,17 +112,18 @@ class ConductorWorkflowService(service.Service): log.debug("No pending commands found, " "seems like we are done") break + if self.check_stop_requested(task): + log.info("Workflow stop requested") + stop = True except Exception as ex: + reporter.report_generic( + "Unexpected error has occurred", ex.message, + 'error') log.exception(ex) break command_dispatcher.close() - except reporting.ReportedException as e: - log.exception("Exception has occurred and was reported to API") - except Exception as e: - log.exception("Unexpected exception has occurred") - if reporter: - reporter.report_generic("Unexpected error has occurred", - e.message, 'error') + if stop: + log.info("Workflow stopped by 'stop' command") finally: self.cleanup(task, reporter) result_msg = Message() @@ -156,3 +157,9 @@ class ConductorWorkflowService(service.Service): if reporter: reporter.report_generic("Unexpected error has occurred", e.message, 'error') + + def check_stop_requested(self, model): + if 'temp' in model: + if '_stop_requested' in model['temp']: + return model['temp']['_stop_requested'] + return False diff --git a/muranoconductor/cloud_formation.py b/muranoconductor/cloud_formation.py index fc2ecda..10bfc02 100644 --- a/muranoconductor/cloud_formation.py +++ b/muranoconductor/cloud_formation.py @@ -20,14 +20,35 @@ import string import time import xml_code_engine +from openstack.common import log as logging + +log = logging.getLogger(__name__) -def update_cf_stack(engine, context, body, template, result=None, **kwargs): +def update_cf_stack(engine, context, body, template, result=None, error=None, + **kwargs): command_dispatcher = context['/commandDispatcher'] - def callback(result_value): + def callback(result_value, error_result=None): if result is not None: context[result] = result_value + + if error_result is not None: + if error is not None: + context[error] = { + 'message': getattr(error_result, 'message', None), + 'strerror': getattr(error_result, 'strerror', None), + 'timestamp': time.time() + } + failure_handler = body.find('failure') + if failure_handler is not None: + log.warning("Handling exception in failure block") + engine.evaluate_content(failure_handler, context) + return + else: + log.error("No failure block found for exception") + raise error_result + success_handler = body.find('success') if success_handler is not None: engine.evaluate_content(success_handler, context) diff --git a/muranoconductor/commands/cloud_formation.py b/muranoconductor/commands/cloud_formation.py index f6bcd4c..de2fdbc 100644 --- a/muranoconductor/commands/cloud_formation.py +++ b/muranoconductor/commands/cloud_formation.py @@ -93,64 +93,64 @@ class HeatExecutor(CommandBase): self._delete_pending_list) > 0 def execute_pending(self): - try: - r1 = self._execute_pending_updates() - r2 = self._execute_pending_deletes() - except Exception as e: - self._reporter.report_generic("Unable to execute Heat command", - e.message, "error") - trace = sys.exc_info()[2] - raise ReportedException(e.message), None, trace + r1 = self._execute_pending_updates() + r2 = self._execute_pending_deletes() return r1 or r2 def _execute_pending_updates(self): if not len(self._update_pending_list): return False - template, arguments = self._get_current_template() - stack_exists = (template != {}) + try: + template, arguments = self._get_current_template() + stack_exists = (template != {}) + # do not need to merge with current stack cause we rebuilding it + # from scratch on every deployment + template, arguments = ({}, {}) - # do not need to merge with current stack cause we rebuilding it from - # scratch on every deployment - template, arguments = ({}, {}) + for t in self._update_pending_list: + template = muranoconductor.helpers.merge_dicts(template, + t['template']) + arguments = muranoconductor.helpers.merge_dicts(arguments, + t['arguments'], + max_levels=1) + log.info( + 'Executing heat template {0} with arguments {1} on stack {2}' + .format(anyjson.dumps(template), arguments, self._stack)) - for t in self._update_pending_list: - template = muranoconductor.helpers.merge_dicts( - template, t['template']) - arguments = muranoconductor.helpers.merge_dicts( - arguments, t['arguments'], max_levels=1) + if stack_exists: + self._heat_client.stacks.update( + stack_id=self._stack, + parameters=arguments, + template=template) + log.debug( + 'Waiting for the stack {0} to be update'.format( + self._stack)) + outs = self._wait_state('UPDATE_COMPLETE') + log.info('Stack {0} updated'.format(self._stack)) + else: + self._heat_client.stacks.create( + stack_name=self._stack, + parameters=arguments, + template=template) - log.info( - 'Executing heat template {0} with arguments {1} on stack {2}' - .format(anyjson.dumps(template), arguments, self._stack)) + log.debug('Waiting for the stack {0} to be create'.format( + self._stack)) + outs = self._wait_state('CREATE_COMPLETE') + log.info('Stack {0} created'.format(self._stack)) - if stack_exists: - self._heat_client.stacks.update( - stack_id=self._stack, - parameters=arguments, - template=template) - log.debug( - 'Waiting for the stack {0} to be update'.format(self._stack)) - outs = self._wait_state('UPDATE_COMPLETE') - log.info('Stack {0} updated'.format(self._stack)) - else: - self._heat_client.stacks.create( - stack_name=self._stack, - parameters=arguments, - template=template) + pending_list = self._update_pending_list + self._update_pending_list = [] - log.debug('Waiting for the stack {0} to be create'.format( - self._stack)) - outs = self._wait_state('CREATE_COMPLETE') - log.info('Stack {0} created'.format(self._stack)) - - pending_list = self._update_pending_list - self._update_pending_list = [] - - for item in pending_list: - item['callback'](outs) - - return True + for item in pending_list: + item['callback'](outs) + return True + except Exception as ex: + pending_list = self._update_pending_list + self._update_pending_list = [] + for item in pending_list: + item['callback'](None, ex) + return True def _execute_pending_deletes(self): if not len(self._delete_pending_list): diff --git a/muranoconductor/commands/windows_agent.py b/muranoconductor/commands/windows_agent.py index a2ea159..f112b8c 100644 --- a/muranoconductor/commands/windows_agent.py +++ b/muranoconductor/commands/windows_agent.py @@ -18,7 +18,8 @@ class WindowsAgentExecutor(CommandBase): self._reporter = reporter rmqclient.declare(self._results_queue) - def execute(self, template, mappings, unit, service, callback): + def execute(self, template, mappings, unit, service, callback, + timeout=None): with open('data/templates/agent/%s.template' % template) as t_file: template_data = t_file.read() @@ -29,7 +30,8 @@ class WindowsAgentExecutor(CommandBase): queue = ('%s-%s-%s' % (self._stack, service, unit)).lower() self._pending_list.append({ 'id': msg_id, - 'callback': callback + 'callback': callback, + 'timeout': timeout }) msg = Message() @@ -49,15 +51,53 @@ class WindowsAgentExecutor(CommandBase): with self._rmqclient.open(self._results_queue) as subscription: while self.has_pending_commands(): - log.debug("Waiting for responses to be returned by the agent. " - "%i total responses remain", len(self._pending_list)) - msg = subscription.get_message() - msg.ack() - msg_id = msg.id.lower() - item, index = muranoconductor.helpers.find( - lambda t: t['id'] == msg_id, self._pending_list) - if item: - self._pending_list.pop(index) - item['callback'](msg.body) - + # TODO: Add extended initialization timeout + # By now, all the timeouts are defined by the command input + # however, the first reply which we wait for being returned + # from the unit may be delayed due to long unit initialization + # and startup. So, for the nonitialized units we need to extend + # the command's timeout with the initialization timeout + timeout = self.get_max_timeout() + if timeout: + span_message = "for {0} seconds".format(timeout) + else: + span_message = 'infinitely' + log.debug("Waiting %s for responses to be returned" + " by the agent. %i total responses remain", + span_message, len(self._pending_list)) + msg = subscription.get_message(timeout=timeout) + if msg: + msg.ack() + msg_id = msg.id.lower() + item, index = muranoconductor.helpers.find( + lambda t: t['id'] == msg_id, self._pending_list) + if item: + self._pending_list.pop(index) + item['callback'](msg.body) + else: + while self.has_pending_commands(): + item = self._pending_list.pop() + item['callback'](AgentTimeoutException(timeout)) return True + + def get_max_timeout(self): + res = 0 + for item in self._pending_list: + if item['timeout'] is None: # if at least 1 item has no timeout + return None # then return None (i.e. infinite) + res = max(res, item['timeout']) + return res + + +class AgentTimeoutException(Exception): + def __init__(self, timeout): + self.message = "Unable to receive any response from the agent" \ + " in {0} sec".format(timeout) + self.timeout = timeout + + +class UnhandledAgentException(Exception): + def __init__(self, errors): + self.message = "An unhandled exception has " \ + "occurred in the Agent: {0}".format(errors) + self.errors = errors diff --git a/muranoconductor/windows_agent.py b/muranoconductor/windows_agent.py index 4e42509..09494c6 100644 --- a/muranoconductor/windows_agent.py +++ b/muranoconductor/windows_agent.py @@ -12,33 +12,77 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +from muranoconductor.commands.windows_agent import AgentTimeoutException +from muranoconductor.commands.windows_agent import UnhandledAgentException import xml_code_engine from openstack.common import log as logging + log = logging.getLogger(__name__) def send_command(engine, context, body, template, service, unit, mappings=None, - result=None, **kwargs): + result=None, error=None, timeout=None, **kwargs): if not mappings: mappings = {} command_dispatcher = context['/commandDispatcher'] + if timeout: + timeout = int(timeout) def callback(result_value): log.info( 'Received result from {2} for {0}: {1}'.format( template, result_value, unit)) - if result is not None: - context[result] = result_value['Result'] + ok = [] + errors = [] + if isinstance(result_value, AgentTimeoutException): + errors.append({ + 'type': "timeout", + 'messages': [result_value.message], + 'timeout': result_value.timeout + }) + else: + if result_value['IsException']: + msg = "A general exception has occurred in the Agent: " + \ + result_value['Result'] + errors.append({ + 'type': "general", + 'messages': [msg], + }) - success_handler = body.find('success') - if success_handler is not None: - engine.evaluate_content(success_handler, context) + else: + for res in result_value['Result']: + if res['IsException']: + errors.append({ + 'type': 'inner', + 'messages': res['Result'] + }) + else: + ok.append(res) + + if ok: + if result is not None: + context[result] = ok + success_handler = body.find('success') + if success_handler is not None: + engine.evaluate_content(success_handler, context) + if errors: + if error is not None: + context[error] = errors + failure_handler = body.find('failure') + if failure_handler is not None: + engine.evaluate_content(failure_handler, context) + else: + log.error("No failure block found for exception") + if isinstance(result_value, AgentTimeoutException): + raise result_value + else: + raise UnhandledAgentException(errors) command_dispatcher.execute( name='agent', template=template, mappings=mappings, - unit=unit, service=service, callback=callback) + unit=unit, service=service, callback=callback, timeout=timeout) xml_code_engine.XmlCodeEngine.register_function(send_command, "send-command") diff --git a/muranoconductor/workflow.py b/muranoconductor/workflow.py index c07f3ca..3206134 100644 --- a/muranoconductor/workflow.py +++ b/muranoconductor/workflow.py @@ -203,6 +203,13 @@ class Workflow(object): return True return False + @staticmethod + def _stop_func(context, body, engine, **kwargs): + if not 'temp' in context['/dataSource']: + context['/dataSource']['temp'] = {} + + context['/dataSource']['temp']['_stop_requested'] = True + xml_code_engine.XmlCodeEngine.register_function( Workflow._rule_func, 'rule') @@ -216,6 +223,9 @@ xml_code_engine.XmlCodeEngine.register_function( xml_code_engine.XmlCodeEngine.register_function( Workflow._select_func, 'select') +xml_code_engine.XmlCodeEngine.register_function( + Workflow._stop_func, 'stop') + xml_code_engine.XmlCodeEngine.register_function( Workflow._select_all_func, 'select-all')