diff --git a/thresh/src/main/java/monasca/thresh/domain/model/SubAlarm.java b/thresh/src/main/java/monasca/thresh/domain/model/SubAlarm.java index 6c4313d..0ce5d7b 100644 --- a/thresh/src/main/java/monasca/thresh/domain/model/SubAlarm.java +++ b/thresh/src/main/java/monasca/thresh/domain/model/SubAlarm.java @@ -193,4 +193,34 @@ public class SubAlarm extends AbstractEntity implements Serializable { // Operator and Threshold can vary return true; } + + public boolean canEvaluateImmediately() { + switch (this.getExpression().getFunction()) { + // MIN never gets larger so if the operator is < or <=, + // then they can be immediately evaluated + case MIN: + switch(this.getExpression().getOperator()) { + case LT: + case LTE: + return true; + default: + return false; + } + // These two never get smaller so if the operator is > or >=, + // then they can be immediately evaluated + case MAX: + case COUNT: + switch(this.getExpression().getOperator()) { + case GT: + case GTE: + return true; + default: + return false; + } + // SUM can increase on a positive measurement or decrease on a negative + // AVG can't be computed until all the metrics have come in + default: + return false; + } + } } diff --git a/thresh/src/main/java/monasca/thresh/domain/model/SubAlarmStats.java b/thresh/src/main/java/monasca/thresh/domain/model/SubAlarmStats.java index e4ffcc1..8ecc0d0 100644 --- a/thresh/src/main/java/monasca/thresh/domain/model/SubAlarmStats.java +++ b/thresh/src/main/java/monasca/thresh/domain/model/SubAlarmStats.java @@ -120,13 +120,31 @@ public class SubAlarmStats { * @param alarmDelay How long to give metrics a chance to arrive */ boolean evaluate(final long now, long alarmDelay) { - if (!stats.shouldEvaluate(now, alarmDelay)) { - return false; + + final AlarmState newState; + if (immediateAlarmEvaluate()) { + newState = AlarmState.ALARM; } - double[] values = stats.getViewValues(); + else { + if (!stats.shouldEvaluate(now, alarmDelay)) { + return false; + } + newState = 
determineAlarmStateUsingView(); + } + if (shouldSendStateChange(newState) && + (stats.shouldEvaluate(now, alarmDelay) || + (newState == AlarmState.ALARM && this.subAlarm.canEvaluateImmediately()))) { + setSubAlarmState(newState); + return true; + } + return false; + } + + private AlarmState determineAlarmStateUsingView() { boolean thresholdExceeded = false; boolean hasEmptyWindows = false; subAlarm.clearCurrentValues(); + double[] values = stats.getViewValues(); for (double value : values) { if (Double.isNaN(value)) { hasEmptyWindows = true; @@ -137,38 +155,61 @@ public class SubAlarmStats { // Check if value is OK if (!subAlarm.getExpression().getOperator() .evaluate(value, subAlarm.getExpression().getThreshold())) { - if (!shouldSendStateChange(AlarmState.OK)) { - return false; - } - setSubAlarmState(AlarmState.OK); - return true; + return AlarmState.OK; } else thresholdExceeded = true; } } if (thresholdExceeded && !hasEmptyWindows) { - if (!shouldSendStateChange(AlarmState.ALARM)) { - return false; - } - setSubAlarmState(AlarmState.ALARM); - return true; + return AlarmState.ALARM; } // Window is empty at this point emptyWindowObservations++; - if ((emptyWindowObservations >= emptyWindowObservationThreshold) && shouldSendStateChange(AlarmState.UNDETERMINED) && !subAlarm.isSporadicMetric()) { - setSubAlarmState(AlarmState.UNDETERMINED); - return true; + return AlarmState.UNDETERMINED; } + // Hasn't transitioned to UNDETERMINED yet, so use the current state + return null; + } + + private boolean immediateAlarmEvaluate() { + if (!this.subAlarm.canEvaluateImmediately()) { + return false; + } + // Check the future slots as well + final double[] allValues = stats.getWindowValues(); + subAlarm.clearCurrentValues(); + int alarmRun = 0; + for (final double value : allValues) { + if (Double.isNaN(value)) { + alarmRun = 0; + subAlarm.clearCurrentValues(); + } else { + + // Check if value is OK + if (!subAlarm.getExpression().getOperator() + .evaluate(value, 
subAlarm.getExpression().getThreshold())) { + alarmRun = 0; + subAlarm.clearCurrentValues(); + } + else { + subAlarm.addCurrentValue(value); + alarmRun++; + if (alarmRun == subAlarm.getExpression().getPeriods()) { + return true; + } + } + } + } return false; } private boolean shouldSendStateChange(AlarmState newState) { - return !subAlarm.getState().equals(newState) || subAlarm.isNoState(); + return newState != null && (!subAlarm.getState().equals(newState) || subAlarm.isNoState()); } private void setSubAlarmState(AlarmState newState) { @@ -191,3 +232,4 @@ public class SubAlarmStats { } } } + diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java index 18b1f5e..2ec9a73 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java @@ -183,6 +183,9 @@ public class MetricAggregationBolt extends BaseRichBolt { if (stats.getStats().addValue(metric.value, timestamp_secs)) { logger.trace("Aggregated value {} at {} for {}. 
Updated {}", metric.value, metric.timestamp, metricDefinitionAndTenantId, stats.getStats()); + if (stats.evaluateAndSlideWindow(timestamp_secs, config.alarmDelay)) { + sendSubAlarmStateChange(stats); + } } else { logger.warn("Metric is too old, age {} seconds: timestamp {} for {}, {}", currentTimeSeconds() - timestamp_secs, timestamp_secs, metricDefinitionAndTenantId, @@ -202,9 +205,7 @@ public class MetricAggregationBolt extends BaseRichBolt { if (upToDate) { logger.debug("Evaluating {}", subAlarmStats); if (subAlarmStats.evaluateAndSlideWindow(newWindowTimestamp, config.alarmDelay)) { - logger.debug("Alarm state changed for {}", subAlarmStats); - collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), subAlarmStats - .getSubAlarm())); + sendSubAlarmStateChange(subAlarmStats); } } else { subAlarmStats.slideWindow(newWindowTimestamp, config.alarmDelay); @@ -216,6 +217,12 @@ public class MetricAggregationBolt extends BaseRichBolt { } } + private void sendSubAlarmStateChange(SubAlarmStats subAlarmStats) { + logger.debug("Alarm state changed for {}", subAlarmStats); + collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), subAlarmStats + .getSubAlarm())); + } + /** * Only used for testing. 
* diff --git a/thresh/src/test/java/monasca/thresh/domain/model/SubAlarmStatsTest.java b/thresh/src/test/java/monasca/thresh/domain/model/SubAlarmStatsTest.java index c267da4..bec1fcb 100644 --- a/thresh/src/test/java/monasca/thresh/domain/model/SubAlarmStatsTest.java +++ b/thresh/src/test/java/monasca/thresh/domain/model/SubAlarmStatsTest.java @@ -18,6 +18,7 @@ package monasca.thresh.domain.model; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import monasca.common.model.alarm.AlarmState; @@ -45,31 +46,31 @@ public class SubAlarmStatsTest { } public void shouldBeOkIfAnySlotsInViewAreBelowThreshold() { - subAlarmStats.getStats().addValue(5, 1); + sendMetric(5, 1, false); assertFalse(subAlarmStats.evaluateAndSlideWindow(62, 1)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - subAlarmStats.getStats().addValue(1, 62); + sendMetric(1, 62, false); assertTrue(subAlarmStats.evaluateAndSlideWindow(122, 1)); // This went to OK because at least one period is under the threshold assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); - subAlarmStats.getStats().addValue(5, 123); + sendMetric(5, 123, false); assertFalse(subAlarmStats.evaluateAndSlideWindow(182, 1)); // Still one under the threshold assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); } public void shouldBeAlarmedIfAllSlotsInViewExceedThreshold() { - subAlarmStats.getStats().addValue(5, 1); + sendMetric(5, 1, false); assertFalse(subAlarmStats.evaluateAndSlideWindow(62, 1)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - subAlarmStats.getStats().addValue(5, 62); + sendMetric(5, 62, false); assertFalse(subAlarmStats.evaluateAndSlideWindow(122, 1)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - subAlarmStats.getStats().addValue(5, 123); + 
sendMetric(5, 123, false); assertTrue(subAlarmStats.evaluateAndSlideWindow(182, 1)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); } @@ -81,32 +82,102 @@ public class SubAlarmStatsTest { long initialTime = 11; assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); // Add value and trigger OK - subAlarmStats.getStats().addValue(1, initialTime - 1); + sendMetric(1, initialTime - 1, false); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); + + // Slide in some values that exceed the threshold + sendMetric(5, initialTime - 1, false); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); + sendMetric(5, initialTime - 1, false); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); + sendMetric(5, initialTime - 1, false); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); + + // Trigger ALARM + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); + + 
// Add value and trigger OK + sendMetric(1, initialTime - 1, false); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); + + // Must slide 8 times total from the last added value to trigger UNDETERMINED. This is + // equivalent to the behavior in CloudWatch for an alarm with 3 evaluation periods. 2 more + // slides to move the value outside of the window and 6 more to exceed the observation + // threshold. + for (int i = 0; i < 7; i++) { + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10)); + } + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); + sendMetric(5, initialTime - 1, false); + } + + private void sendMetric(double value, long timestamp, boolean expected) { + subAlarmStats.getStats().addValue(value, timestamp); + assertEquals(subAlarmStats.evaluateAndSlideWindow(timestamp, timestamp), expected); + } + + /** + * Simulates the way a window will fill up in practice. 
+ */ + public void shouldImmediatelyEvaluate() { + long initialTime = 11; + + // Need a different expression for this test + expression = + new SubExpression(UUID.randomUUID().toString(), + AlarmSubExpression.of("max(hpcs.compute.cpu{id=5}, 60) > 3 times 3")); + subAlarm = new SubAlarm("123", "1", expression); + subAlarm.setNoState(true); + subAlarmStats = new SubAlarmStats(subAlarm, expression.getAlarmSubExpression().getPeriod()); + + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); + + // Add value and trigger OK + sendMetric(1, initialTime - 1, false); assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); // Slide in some values that exceed the threshold - subAlarmStats.getStats().addValue(5, initialTime - 1); + sendMetric(5, initialTime - 1, false); assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); - subAlarmStats.getStats().addValue(5, initialTime - 1); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); + sendMetric(5, initialTime - 1, false); assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); - subAlarmStats.getStats().addValue(5, initialTime - 1); // Trigger ALARM - assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); + sendMetric(5, initialTime - 1, true); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); + + // Ensure it is still ALARM on next evaluation + 
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); // Add value and trigger OK - subAlarmStats.getStats().addValue(1, initialTime - 1); + sendMetric(1, initialTime - 1, false); assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); - // Must slide 9 times total from the last added value to trigger UNDETERMINED. This is + // Must slide 8 times total from the last added value to trigger UNDETERMINED. This is // equivalent to the behavior in CloudWatch for an alarm with 3 evaluation periods. 2 more // slides to move the value outside of the window and 6 more to exceed the observation // threshold. @@ -115,7 +186,18 @@ public class SubAlarmStatsTest { } assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - subAlarmStats.getStats().addValue(5, initialTime - 1); + + // Now test that future buckets are evaluated + // Set the current bucket to ALARM + sendMetric(5, initialTime - 1, false); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); + // Set the future bucket of current + 2 to ALARM + sendMetric(5, initialTime + 120, false); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); + // Set the future bucket of current + 1 to ALARM. 
That will trigger the + // SubAlarm to go to ALARM + sendMetric(5, initialTime + 60, true); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); } public void shouldAlarmIfAllSlotsAlarmed() { @@ -125,13 +207,13 @@ public class SubAlarmStatsTest { assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); - subAlarmStats.getStats().addValue(5, initialTime - 1); + sendMetric(5, initialTime - 1, false); assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); - subAlarmStats.getStats().addValue(5, initialTime - 1); + sendMetric(5, initialTime - 1, false); assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); - subAlarmStats.getStats().addValue(5, initialTime - 1); + sendMetric(5, initialTime - 1, false); assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); } @@ -145,6 +227,31 @@ public class SubAlarmStatsTest { assertEquals(saStats.emptyWindowObservationThreshold, 6); } + public void checkUpdateSubAlarm() { + // Can keep data with threshold change + verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace("> 3", "> 6"), 100.0); + // Can keep data with operator change + verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace("< 3", "< 6"), 100.0); + // Have to flush data with function change + verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace("avg", "max"), Double.NaN); + // Have to flush data with periods change + verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace("times 3", "times 2"), Double.NaN); + // Have to flush data with period change + verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace(", 60", ", 120"), Double.NaN); + } + + private void verifyUpdateSubAlarm(String newExpressionString, double expectedValue) { + final AlarmSubExpression newExpression = 
AlarmSubExpression.of(newExpressionString); + assertNotEquals(newExpression, expression.getAlarmSubExpression().getExpression()); + int timestamp = expression.getAlarmSubExpression().getPeriod() / 2; + sendMetric(100.00, timestamp, false); + assertEquals(subAlarmStats.getStats().getValue(timestamp), 100.0); + subAlarmStats.updateSubAlarm(newExpression, expression.getAlarmSubExpression().getPeriod()); + assertEquals(subAlarmStats.getStats().getValue(timestamp), expectedValue); + assertTrue(subAlarm.isNoState()); + } + + public void checkLongPeriod() { final SubExpression subExpr = new SubExpression(UUID.randomUUID().toString(), AlarmSubExpression.of("sum(hpcs.compute.mem{id=5}, 120) >= 96")); diff --git a/thresh/src/test/java/monasca/thresh/domain/model/SubAlarmTest.java b/thresh/src/test/java/monasca/thresh/domain/model/SubAlarmTest.java new file mode 100644 index 0000000..c8e1080 --- /dev/null +++ b/thresh/src/test/java/monasca/thresh/domain/model/SubAlarmTest.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2015 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package monasca.thresh.domain.model; + +import static org.testng.Assert.assertEquals; + +import monasca.common.model.alarm.AlarmSubExpression; + +import org.testng.annotations.Test; + +import java.util.UUID; + +@Test +public class SubAlarmTest { + + public void checkCanEvaluateImmediately() { + checkExpression("avg(hpcs.compute.cpu{id=5}, 60) > 3 times 3", false); + checkExpression("avg(hpcs.compute.cpu{id=5}, 60) >= 3 times 3", false); + checkExpression("avg(hpcs.compute.cpu{id=5}, 60) < 3 times 3", false); + checkExpression("avg(hpcs.compute.cpu{id=5}, 60) <= 3 times 3", false); + + checkExpression("sum(hpcs.compute.cpu{id=5}, 60) > 3 times 3", false); + checkExpression("sum(hpcs.compute.cpu{id=5}, 60) >= 3 times 3", false); + checkExpression("sum(hpcs.compute.cpu{id=5}, 60) < 3 times 3", false); + checkExpression("sum(hpcs.compute.cpu{id=5}, 60) <= 3 times 3", false); + + checkExpression("count(hpcs.compute.cpu{id=5}, 60) < 3 times 3", false); + checkExpression("count(hpcs.compute.cpu{id=5}, 60) <= 3 times 3", false); + checkExpression("count(hpcs.compute.cpu{id=5}, 60) > 3 times 3", true); + checkExpression("count(hpcs.compute.cpu{id=5}, 60) >= 3 times 3", true); + + checkExpression("max(hpcs.compute.cpu{id=5}, 60) > 3 times 3", true); + checkExpression("max(hpcs.compute.cpu{id=5}, 60) >= 3 times 3", true); + checkExpression("max(hpcs.compute.cpu{id=5}, 60) < 3 times 3", false); + checkExpression("max(hpcs.compute.cpu{id=5}, 60) <= 3 times 3", false); + + checkExpression("min(hpcs.compute.cpu{id=5}, 60) > 3 times 3", false); + checkExpression("min(hpcs.compute.cpu{id=5}, 60) >= 3 times 3", false); + checkExpression("min(hpcs.compute.cpu{id=5}, 60) < 3 times 3", true); + checkExpression("min(hpcs.compute.cpu{id=5}, 60) <= 3 times 3", true); + } + + private void checkExpression(String expressionString, boolean expected) { + final SubExpression expression = + new SubExpression(UUID.randomUUID().toString(), + AlarmSubExpression.of(expressionString)); + 
final SubAlarm subAlarm = new SubAlarm(UUID.randomUUID().toString(), "1", expression); + assertEquals(subAlarm.canEvaluateImmediately(), expected); + } +} diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java index 012889c..0805c21 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java @@ -82,7 +82,7 @@ public class MetricAggregationBoltTest { System.clearProperty(MetricAggregationBolt.TICK_TUPLE_SECONDS_KEY); subExpr1 = new SubExpression("444", AlarmSubExpression.of("avg(hpcs.compute.cpu{id=5}, 60) >= 90 times 3")); subExpr2 = new SubExpression("555", AlarmSubExpression.of("avg(hpcs.compute.mem{id=5}, 60) >= 90")); - subExpr3 = new SubExpression("666", AlarmSubExpression.of("avg(hpcs.compute.mem{id=5}, 60) >= 96")); + subExpr3 = new SubExpression("666", AlarmSubExpression.of("max(hpcs.compute.mem{id=5}, 60) >= 96")); metricDef1 = subExpr1.getAlarmSubExpression().getMetricDefinition(); metricDef2 = subExpr2.getAlarmSubExpression().getMetricDefinition(); metricDef3 = subExpr3.getAlarmSubExpression().getMetricDefinition(); @@ -100,7 +100,7 @@ public class MetricAggregationBoltTest { subAlarms.add(subAlarm3); final ThresholdingConfiguration config = new ThresholdingConfiguration(); - config.alarmDelay = 1; + config.alarmDelay = 10; bolt = new MockMetricAggregationBolt(config); context = mock(TopologyContext.class); collector = mock(OutputCollector.class); @@ -150,11 +150,9 @@ public class MetricAggregationBoltTest { bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1 - 60000, 95, null))); bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1 - 120000, 88, null))); - t1 += 20000; + t1 += 25000; bolt.setCurrentTime(t1); - final Tuple 
tickTuple = createTickTuple(); - bolt.execute(tickTuple); - verify(collector, times(1)).ack(tickTuple); + sendTickTuple(); assertEquals(subAlarm1.getState(), AlarmState.OK); assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); @@ -172,8 +170,7 @@ public class MetricAggregationBoltTest { bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 94, null))); t1 += 50000; bolt.setCurrentTime(t1); - bolt.execute(tickTuple); - verify(collector, times(1)).ack(tickTuple); + sendTickTuple(); assertEquals(subAlarm1.getState(), AlarmState.ALARM); assertEquals(subAlarm2.getState(), AlarmState.ALARM); @@ -183,9 +180,101 @@ public class MetricAggregationBoltTest { verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3)); } + public void shouldImmediatelyEvaluateSubAlarm() { + // Ensure subAlarm2 and subAlarm3 map to the same Metric Definition + assertEquals(metricDef3, metricDef2); + + long t1 = 170000; + bolt.setCurrentTime(t1); + sendSubAlarmCreated(metricDef2, subAlarm2); + sendSubAlarmCreated(metricDef3, subAlarm3); + + // Send metric for subAlarm2 and subAlarm3 + bolt.execute(createMetricTuple(metricDef3, new Metric(metricDef3, t1 + 1000, 100000, null))); + + // subAlarm2 is AVG so it can't be evaluated immediately like the MAX for subAlarm3 + assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); + assertEquals(subAlarm3.getState(), AlarmState.ALARM); + + verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3)); + + // Have to reset the mock so it can tell the difference when subAlarm2 and subAlarm3 are emitted + // again. 
+ reset(collector); + + t1 = 195000; + bolt.setCurrentTime(t1); + sendTickTuple(); + + assertEquals(subAlarm2.getState(), AlarmState.ALARM); + assertEquals(subAlarm3.getState(), AlarmState.ALARM); + verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + verify(collector, never()).emit(new Values(subAlarm3.getAlarmId(), subAlarm3)); + + // Have to reset the mock so it can tell the difference when subAlarm2 and subAlarm3 are emitted + // again. + reset(collector); + + // Now drive SubAlarms back to OK + t1 = 235000; + bolt.setCurrentTime(t1); + bolt.execute(createMetricTuple(metricDef3, new Metric(metricDef3, t1 + 1000, 20, null))); + + t1 = 315000; + bolt.setCurrentTime(t1); + + bolt.execute(createMetricTuple(metricDef3, new Metric(metricDef3, t1 + 1000, 20, null))); + + sendTickTuple(); + + assertEquals(subAlarm2.getState(), AlarmState.OK); + assertEquals(subAlarm3.getState(), AlarmState.OK); + verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3)); + + // Have to reset the mock so it can tell the difference when subAlarm2 and subAlarm3 are emitted + // again. + reset(collector); + + // Now send a metric that is after the window end time but within alarm delay + t1 = 365000; + bolt.setCurrentTime(t1); + bolt.execute(createMetricTuple(metricDef3, new Metric(metricDef3, t1 + 1000, 100000, null))); + + // subAlarm2 is AVG so it can't be evaluated immediately like the MAX for subAlarm3 + assertEquals(subAlarm2.getState(), AlarmState.OK); + assertEquals(subAlarm3.getState(), AlarmState.ALARM); + + verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3)); + + // Have to reset the mock so it can tell the difference when subAlarm2 and subAlarm3 are emitted + // again. 
+ reset(collector); + + t1 = 375000; + bolt.setCurrentTime(t1); + + sendTickTuple(); + + // Ensure that subAlarm3 is still ALARM. subAlarm2 is still OK because the metric + // that triggered ALARM is in the future bucket + assertEquals(subAlarm2.getState(), AlarmState.OK); + assertEquals(subAlarm3.getState(), AlarmState.ALARM); + verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + verify(collector, never()).emit(new Values(subAlarm3.getAlarmId(), subAlarm3)); + } + + private void sendTickTuple() { + final Tuple tickTuple = createTickTuple(); + bolt.execute(tickTuple); + verify(collector, times(1)).ack(tickTuple); + } + public void shouldSendAlarmAgain() { - long t1 = 10000; + long t1 = 12000; bolt.setCurrentTime(t1); sendSubAlarmCreated(metricDef2, subAlarm2); @@ -195,11 +284,9 @@ public class MetricAggregationBoltTest { t1 += 60000; bolt.setCurrentTime(t1); - final Tuple tickTuple = createTickTuple(); - bolt.execute(tickTuple); + sendTickTuple(); verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); assertEquals(subAlarm2.getState(), AlarmState.ALARM); - verify(collector, times(1)).ack(tickTuple); sendSubAlarmResend(metricDef2, subAlarm2); @@ -209,9 +296,7 @@ public class MetricAggregationBoltTest { t1 += 60000; bolt.setCurrentTime(t1); - bolt.execute(tickTuple); - verify(collector, times(2)).ack(tickTuple); - + sendTickTuple(); assertEquals(subAlarm2.getState(), AlarmState.ALARM); verify(collector, times(2)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); } @@ -253,12 +338,11 @@ public class MetricAggregationBoltTest { bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 1.0, null))); bolt.setCurrentTime(t1 += 60000); - final Tuple tickTuple = createTickTuple(); - bolt.execute(tickTuple); + sendTickTuple(); assertEquals(subAlarm2.getState(), AlarmState.OK); bolt.setCurrentTime(t1 += 60000); - bolt.execute(tickTuple); + sendTickTuple(); assertEquals(subAlarm2.getState(), 
AlarmState.OK); verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); @@ -266,7 +350,7 @@ public class MetricAggregationBoltTest { reset(collector); bolt.setCurrentTime(t1 += 60000); - bolt.execute(tickTuple); + sendTickTuple(); assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); } @@ -283,23 +367,19 @@ public class MetricAggregationBoltTest { bolt.execute(lagTuple); verify(collector, times(1)).ack(lagTuple); - final Tuple tickTuple = createTickTuple(); t1 += 60000; bolt.setCurrentTime(t1); - bolt.execute(tickTuple); - verify(collector, times(1)).ack(tickTuple); + sendTickTuple(); verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); t1 += 60000; bolt.setCurrentTime(t1); - bolt.execute(tickTuple); - verify(collector, times(2)).ack(tickTuple); + sendTickTuple(); verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); t1 += 60000; bolt.setCurrentTime(t1); - bolt.execute(tickTuple); - verify(collector, times(3)).ack(tickTuple); + sendTickTuple(); assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); @@ -408,7 +488,7 @@ public class MetricAggregationBoltTest { bolt.getOrCreateSubAlarmStatsRepo(metricDefinitionAndTenantId); sendSubAlarmCreated(metricDef1, subAlarm1); - + assertNotNull(bolt.metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId).get(ALARM_ID_1)); // We don't have an AlarmDefinition so no id, but the MetricAggregationBolt doesn't use this