From 5835e250cca9e54a2031ae3a12af4dc44736c3c7 Mon Sep 17 00:00:00 2001
From: Craig Bryant <craig.bryant@hp.com>
Date: Thu, 1 May 2014 14:13:58 -0600
Subject: [PATCH] Change to set lastMinLagMessageSent to the time the first
 lagging metric is received. The problem is that storm can take a long time to
 send the first metric after prepare() is called and we want to give the Bolt
 time to clear the metrics before sending a lagging message.

---
 .../thresholding/MetricFilteringBolt.java       |  6 ++++--
 .../thresholding/MetricFilteringBoltTest.java   | 17 +++++++++++++----
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java
index 65c7763..6c272f2 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java
@@ -154,6 +154,9 @@ public class MetricFilteringBolt extends BaseRichBolt {
                   MAX_LAG_MESSAGES * LAG_MESSAGE_PERIOD, minLag);
           lagging = false;
       }
+      else if (lastMinLagMessageSent == 0) {
+          lastMinLagMessageSent = now;
+      }
       else if ((now - lastMinLagMessageSent) >= LAG_MESSAGE_PERIOD) {
           LOG.info("Sending {} message, minLag = {}", MetricAggregationBolt.METRICS_BEHIND, minLag);
           collector.emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
@@ -200,8 +203,7 @@ public class MetricFilteringBolt extends BaseRichBolt {
         }
       }
     }
-    // Not really when it was sent, but want to wait at least LAG_MESSAGE_PERIOD before sending a message
-    lastMinLagMessageSent = getCurrentSeconds();
+    lastMinLagMessageSent = 0;
   }
 
   /**
diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java
index 7cdec69..fa00391 100644
--- a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java
+++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java
@@ -92,10 +92,13 @@ public class MetricFilteringBoltTest {
 
         final long prepareTime = bolt.getCurrentSeconds();
         final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition();
-        final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0));
-        bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT);
+        final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime - MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT, 42.0));
         bolt.execute(lateMetricTuple);
         verify(collector, times(1)).ack(lateMetricTuple);
+        bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT);
+        final Tuple lateMetricTuple2 = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0));
+        bolt.execute(lateMetricTuple2);
+        verify(collector, times(1)).ack(lateMetricTuple2);
         verify(collector, times(1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
                 new Values(MetricAggregationBolt.METRICS_BEHIND));
         bolt.setCurrentSeconds(prepareTime + 2 * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT);
@@ -114,13 +117,19 @@ public class MetricFilteringBoltTest {
         long prepareTime = bolt.getCurrentSeconds();
         final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition();
         // Fake sending metrics for MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT seconds
-        for (int i = 0; i < MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT; i++) {
+        boolean first = true;
+        // Need to send MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT + 1 metrics because the lag message is not
+        // output on the first one.
+        for (int i = 0; i < MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT + 1; i++) {
             final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0));
             bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT);
             bolt.execute(lateMetricTuple);
             verify(collector, times(1)).ack(lateMetricTuple);
-            verify(collector, times(i + 1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
+            if (!first) {
+                verify(collector, times(i)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
                     new Values(MetricAggregationBolt.METRICS_BEHIND));
+            }
+            first = false;
             prepareTime = bolt.getCurrentSeconds();
         }
         // One more