diff --git a/muranoconductor/app.py b/muranoconductor/app.py index 67d45d8..ac8f17f 100644 --- a/muranoconductor/app.py +++ b/muranoconductor/app.py @@ -98,6 +98,8 @@ class ConductorWorkflowService(service.Service): stop = False while not stop: try: + for workflow in workflows: + workflow.prepare() while True: result = False for workflow in workflows: diff --git a/muranoconductor/workflow.py b/muranoconductor/workflow.py index 3206134..6afb8d7 100644 --- a/muranoconductor/workflow.py +++ b/muranoconductor/workflow.py @@ -22,6 +22,7 @@ import function_context import xml_code_engine log = logging.getLogger(__name__) +object_id = id class Workflow(object): @@ -34,14 +35,28 @@ class Workflow(object): self._config = config self._reporter = reporter + # format: (rule-id, entity-id) => True for auto-reset bans, + # False for permanent bans + self._blacklist = {} + def execute(self): context = function_context.Context() context['/dataSource'] = self._data context['/commandDispatcher'] = self._command_dispatcher context['/config'] = self._config context['/reporter'] = self._reporter + context['/__blacklist'] = self._blacklist return self._engine.execute(context) + def prepare(self): + permanent_bans = dict([ + (key, value) for key, value + in self._blacklist.iteritems() + if value is False + ]) + self._blacklist.clear() + self._blacklist.update(permanent_bans) + @staticmethod def _get_path(obj, path, create_non_existing=False): current = obj @@ -149,8 +164,32 @@ class Workflow(object): context['/hasSideEffects'] = True @staticmethod - def _rule_func(match, context, body, engine, limit=0, name=None, desc=None, + def _mute_func(context, rule=None, id=None, **kwargs): + if rule is None: + rule = context['__currentRuleId'] + if id is None: + id = context['__dataSource_currentObj'] + + blacklist = context['/__blacklist'] + blacklist[(rule, id)] = False + + @staticmethod + def _unmute_func(context, rule=None, id=None, **kwargs): + if rule is None: + rule = context['__currentRuleId'] + if id is None: + id = context['__dataSource_currentObj'] + + blacklist = context['/__blacklist'] + if (rule, id) in blacklist: + del blacklist[(rule, id)] + + @staticmethod + def _rule_func(match, context, body, engine, limit=0, id=None, desc=None, **kwargs): + if not id: + id = object_id(body) + context['__currentRuleId'] = id position, match = Workflow._get_relative_position(match, context) if not desc: desc = match @@ -160,6 +199,7 @@ class Workflow(object): match = match.replace('$.', '$[*].') selected = jsonpath.jsonpath([data], match, 'IPATH') or [] index = 0 + blacklist = context['/__blacklist'] for found_match in selected: if 0 < int(limit) <= index: break @@ -167,11 +207,20 @@ class Workflow(object): new_position = position + found_match[1:] context['__dataSource_currentPosition'] = new_position cur_obj = Workflow._get_path(context['/dataSource'], new_position) + + use_blacklist = False + if isinstance(cur_obj, dict) and ('id' in cur_obj): + use_blacklist = True + if (id, cur_obj['id']) in blacklist: + continue + context['__dataSource_currentObj'] = cur_obj log.debug("Rule '{0}' matches on '{1}'".format(desc, cur_obj)) for element in body: if element.tag == 'empty': continue + if use_blacklist: + blacklist[(id, cur_obj['id'])] = True engine.evaluate(element, context) if element.tag == 'rule' and context['/hasSideEffects']: break @@ -231,3 +280,9 @@ xml_code_engine.XmlCodeEngine.register_function( xml_code_engine.XmlCodeEngine.register_function( Workflow._select_single_func, 'select-single') + +xml_code_engine.XmlCodeEngine.register_function( + Workflow._mute_func, 'mute') + +xml_code_engine.XmlCodeEngine.register_function( + Workflow._unmute_func, 'unmute')