From f4839a6b28f972c687efaecdbd948661c1761099 Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Fri, 29 Aug 2014 20:12:42 -0300 Subject: [PATCH] Callback handlers are now idempotent. The sandbox demo now uses a list of callback handlers with separate trigger and commit phases. Idempotent work should only be done during the commit() call. Temporary work can be stored in the callback's 'scratchpad' dictionary until commit() is called. --- oahu_config.py | 75 +++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 62 insertions(+), 13 deletions(-) diff --git a/oahu_config.py b/oahu_config.py index e56d12f..63a822c 100644 --- a/oahu_config.py +++ b/oahu_config.py @@ -6,25 +6,73 @@ from oahu import pipeline_callback from oahu import criteria -class Callback(pipeline_callback.PipelineCallback): - def on_trigger(self, stream): - print "Processing", stream +# We determine which operation this is by the first of these we see. +OPERATIONS = [ + 'compute.instance.shutdown.start', + 'compute.instance.delete.start', + 'compute.instance.snapshot.start', + 'compute.instance.create.start', + 'compute.instance.reboot.start', + 'compute.instance.rebuild.start', + 'compute.instance.resize.start', + 'compute.instance.finish_resize.start', + 'compute.instance.resize.confirm.start', + 'compute.instance.resize.prep.start', + 'compute.instance.power_off.start', + 'compute.instance.rescue.start', + 'compute.instance.unrescue.start', +] + + +class RequestIdCallback(pipeline_callback.PipelineCallback): + def on_trigger(self, stream, scratchpad): + if not len(stream.events): + return + + # Try to guess the operation by the first know event_type ... + operation = None + for e in stream.events: + if e['event_type'] in OPERATIONS: + operation = e['event_type'] + break + scratchpad['operation'] = operation + + # How long did this operation take? + first = stream.events[0] + last = stream.events[-1] + delta = last['timestamp'] - first['timestamp'] + + scratchpad['request_id'] = first['_context_request_id'] + scratchpad['delta'] = delta + + def commit(self, stream, scratchpad): + print "Req: %s %s time delta = %s" % (scratchpad['request_id'], + scratchpad['operation'], + scratchpad['delta']) + + +class EodExistsCallback(pipeline_callback.PipelineCallback): + def on_trigger(self, stream, scratchpad): + print "EOD-Exists:", stream + #for event in stream.events: + # print event['timestamp'], event['event_type'] + + def commit(self, stream, scratchpad): + pass class Config(oahu.config.Config): - def get_driver(self, callback=None): - if not callback: - self.callback = Callback() - else: - self.callback = callback + def get_driver(self): + self.request_id_callback = RequestIdCallback() + self.eod_exists_callback = EodExistsCallback() # Trigger names have to be consistent across all workers # (yagi and daemons). by_request = trigger_definition.TriggerDefinition("request-id", - ["_context_request_id", ], - criteria.Inactive(60), - self.callback, - debug=True) + ["_context_request_id", ], + criteria.Inactive(60), + [self.request_id_callback,], + debug=True) # This trigger requires a Trait called "when_date" which is # the date-only portion of the "when" trait. We will create @@ -34,7 +82,8 @@ class Config(oahu.config.Config): ["payload/instance_id", "audit_bucket"], criteria.EndOfDayExists( 'compute.instance.exists'), - self.callback, debug=True, + [self.eod_exists_callback,], + debug=True, dumper=debugging.DetailedDumper()) triggers = [by_request, instance_usage]