diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java index 65c7763..6c272f2 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java @@ -154,6 +154,9 @@ public class MetricFilteringBolt extends BaseRichBolt { MAX_LAG_MESSAGES * LAG_MESSAGE_PERIOD, minLag); lagging = false; } + else if (lastMinLagMessageSent == 0) { + lastMinLagMessageSent = now; + } else if ((now - lastMinLagMessageSent) >= LAG_MESSAGE_PERIOD) { LOG.info("Sending {} message, minLag = {}", MetricAggregationBolt.METRICS_BEHIND, minLag); collector.emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, @@ -200,8 +203,7 @@ public class MetricFilteringBolt extends BaseRichBolt { } } } - // Not really when it was sent, but want to wait at least LAG_MESSAGE_PERIOD before sending a message - lastMinLagMessageSent = getCurrentSeconds(); + lastMinLagMessageSent = 0; } /** diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java index 7cdec69..fa00391 100644 --- a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java +++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java @@ -92,10 +92,13 @@ public class MetricFilteringBoltTest { final long prepareTime = bolt.getCurrentSeconds(); final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition(); - final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0)); - bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT); + final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime - MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT, 42.0)); bolt.execute(lateMetricTuple); verify(collector, times(1)).ack(lateMetricTuple); + bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT); + final Tuple lateMetricTuple2 = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0)); + bolt.execute(lateMetricTuple2); + verify(collector, times(1)).ack(lateMetricTuple2); verify(collector, times(1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, new Values(MetricAggregationBolt.METRICS_BEHIND)); bolt.setCurrentSeconds(prepareTime + 2 * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT); @@ -114,13 +117,19 @@ public class MetricFilteringBoltTest { long prepareTime = bolt.getCurrentSeconds(); final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition(); // Fake sending metrics for MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT seconds - for (int i = 0; i < MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT; i++) { + boolean first = true; + // Need to send MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT + 1 metrics because the lag message is not + // output on the first one. + for (int i = 0; i < MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT + 1; i++) { final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0)); bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT); bolt.execute(lateMetricTuple); verify(collector, times(1)).ack(lateMetricTuple); - verify(collector, times(i + 1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, + if (!first) { + verify(collector, times(i)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, new Values(MetricAggregationBolt.METRICS_BEHIND)); + } + first = false; prepareTime = bolt.getCurrentSeconds(); } // One more