From 63290a82abfd10c2c3fb6fe3abf42647d45bc868 Mon Sep 17 00:00:00 2001 From: Craig Bryant Date: Mon, 28 Apr 2014 16:38:35 -0600 Subject: [PATCH] JIRA JAH-10 On start-up threshold engine should wait for metrics for all periods prior to transitioning to an alarmed state Change so that all view slots must have metrics before the SubAlarm transitions to ALARM. Also, change it so SubAlarmStats doesn't transition it to UNDETERMINED on startup until there have been emptyWindowObservationThreshold calls to evaluate(). Previously it only required one for new sub alarms and sub alarms on restart. Want the ThresholdEngine to have same behavior on restart as it would when the Threshold Engine has been running for a long time. --- pom.xml | 2 +- .../mon/domain/model/SubAlarmStats.java | 21 ++-- .../service/SubAlarmStatsRepository.java | 10 -- .../thresholding/MetricAggregationBolt.java | 25 +++-- .../mon/ThresholdingEngineAlarmTest.java | 25 ++--- .../hpcloud/mon/ThresholdingEngineTest.java | 2 +- .../mon/domain/model/SubAlarmStatsTest.java | 101 ++++++++++++------ .../MetricAggregationBoltTest.java | 65 ++++++++--- 8 files changed, 158 insertions(+), 93 deletions(-) diff --git a/pom.xml b/pom.xml index f6257a7..4d4703a 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ - 1.0.0.35 + 1.0.0.41 true UTF-8 diff --git a/src/main/java/com/hpcloud/mon/domain/model/SubAlarmStats.java b/src/main/java/com/hpcloud/mon/domain/model/SubAlarmStats.java index bf4cabb..00cc8d9 100644 --- a/src/main/java/com/hpcloud/mon/domain/model/SubAlarmStats.java +++ b/src/main/java/com/hpcloud/mon/domain/model/SubAlarmStats.java @@ -33,6 +33,7 @@ public class SubAlarmStats { public SubAlarmStats(SubAlarm subAlarm, TimeResolution timeResolution, long viewEndTimestamp) { slotWidth = subAlarm.getExpression().getPeriod(); this.subAlarm = subAlarm; + this.subAlarm.setNoState(true); this.stats = new SlidingWindowStats(subAlarm.getExpression().getFunction().toStatistic(), timeResolution, slotWidth, subAlarm.getExpression().getPeriods(), FUTURE_SLOTS, viewEndTimestamp); @@ -41,7 +42,7 @@ public class SubAlarmStats { // convert to minutes emptyWindowObservationThreshold = periodMinutes * subAlarm.getExpression().getPeriods() * UNDETERMINED_COEFFICIENT; - emptyWindowObservations = emptyWindowObservationThreshold; + emptyWindowObservations = 0; } /** @@ -89,8 +90,11 @@ public class SubAlarmStats { double[] values = stats.getViewValues(); AlarmState initialState = subAlarm.getState(); boolean thresholdExceeded = false; + boolean hasEmptyWindows = false; for (double value : values) { - if (!Double.isNaN(value)) { + if (Double.isNaN(value)) + hasEmptyWindows = true; + else { emptyWindowObservations = 0; // Check if value is OK @@ -99,17 +103,17 @@ public class SubAlarmStats { .evaluate(value, subAlarm.getExpression().getThreshold())) { if (AlarmState.OK.equals(initialState)) return false; - subAlarm.setState(AlarmState.OK); + setSubAlarmState(AlarmState.OK); return true; } else thresholdExceeded = true; } } - if (thresholdExceeded) { + if (thresholdExceeded && !hasEmptyWindows) { if (AlarmState.ALARM.equals(initialState)) return false; - subAlarm.setState(AlarmState.ALARM); + setSubAlarmState(AlarmState.ALARM); return true; } @@ -119,13 +123,18 @@ public class SubAlarmStats { if ((emptyWindowObservations >= emptyWindowObservationThreshold) && (subAlarm.isNoState() || !AlarmState.UNDETERMINED.equals(initialState)) && !subAlarm.isSporadicMetric()) { - subAlarm.setState(AlarmState.UNDETERMINED); + setSubAlarmState(AlarmState.UNDETERMINED); return true; } return false; } +private void setSubAlarmState(AlarmState newState) { + subAlarm.setState(newState); + subAlarm.setNoState(false); +} + /** * This MUST only be used for compatible SubAlarms, i.e. where * this.subAlarm.isCompatible(subAlarm) is true diff --git a/src/main/java/com/hpcloud/mon/domain/service/SubAlarmStatsRepository.java b/src/main/java/com/hpcloud/mon/domain/service/SubAlarmStatsRepository.java index bb27133..9c4b622 100644 --- a/src/main/java/com/hpcloud/mon/domain/service/SubAlarmStatsRepository.java +++ b/src/main/java/com/hpcloud/mon/domain/service/SubAlarmStatsRepository.java @@ -2,7 +2,6 @@ package com.hpcloud.mon.domain.service; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import com.hpcloud.mon.domain.model.SubAlarm; @@ -16,15 +15,6 @@ import com.hpcloud.mon.domain.model.SubAlarmStats; public class SubAlarmStatsRepository { private final Map subAlarmStats = new HashMap(); - /** - * Creates a new SubAlarmStatsRepository initialized with SubAlarmStats for each of the - * {@code subAlarms} with the {@code viewEndTimestamp}. - */ - public SubAlarmStatsRepository(List subAlarms, long viewEndTimestamp) { - for (SubAlarm subAlarm : subAlarms) - add(subAlarm, viewEndTimestamp); - } - /** * Creates a new SubAlarmStats instance for the {@code subAlarm} and {@code viewEndTimestamp} and * adds it to the repository. diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java index 2b62ca6..5b4254f 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java @@ -64,7 +64,6 @@ public class MetricAggregationBolt extends BaseRichBolt { /** Namespaces for which metrics are received sporadically */ private Set sporadicMetricNamespaces = Collections.emptySet(); private OutputCollector collector; - private int evaluationTimeOffset; private boolean upToDate = true; public MetricAggregationBolt(SubAlarmDAO subAlarmDAO) { @@ -141,8 +140,6 @@ public class MetricAggregationBolt extends BaseRichBolt { LOG = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context)); LOG.info("Preparing"); this.collector = collector; - evaluationTimeOffset = Integer.valueOf(System.getProperty(TICK_TUPLE_SECONDS_KEY, "60")) - .intValue(); if (subAlarmDAO == null) { Injector.registerIfNotBound(SubAlarmDAO.class, new PersistenceModule(dbConfig)); @@ -178,7 +175,7 @@ public class MetricAggregationBolt extends BaseRichBolt { upToDate = true; return; } - long newWindowTimestamp = System.currentTimeMillis() / 1000; + long newWindowTimestamp = currentTimeSeconds(); for (SubAlarmStatsRepository subAlarmStatsRepo : subAlarmStatsRepos.values()) for (SubAlarmStats subAlarmStats : subAlarmStatsRepo.get()) { LOG.debug("Evaluating {}", subAlarmStats); @@ -186,11 +183,18 @@ public class MetricAggregationBolt extends BaseRichBolt { LOG.debug("Alarm state changed for {}", subAlarmStats); collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), subAlarmStats.getSubAlarm())); - subAlarmStats.getSubAlarm().setNoState(false); } } } + /** + * Only used for testing. + * @return + */ + protected long currentTimeSeconds() { + return System.currentTimeMillis() / 1000; + } + /** * Returns an existing or newly created SubAlarmStatsRepository for the {@code metricDefinitionAndTenantId}. * Newly created SubAlarmStatsRepositories are initialized with stats whose view ends one minute @@ -207,10 +211,12 @@ public class MetricAggregationBolt extends BaseRichBolt { for (SubAlarm subAlarm : subAlarms) { // TODO should treat metric def name prefix like a namespace subAlarm.setSporadicMetric(sporadicMetricNamespaces.contains(metricDefinitionAndTenantId.metricDefinition.name)); - subAlarm.setNoState(true); } - long viewEndTimestamp = (System.currentTimeMillis() / 1000) + evaluationTimeOffset; - subAlarmStatsRepo = new SubAlarmStatsRepository(subAlarms, viewEndTimestamp); + subAlarmStatsRepo = new SubAlarmStatsRepository(); + for (SubAlarm subAlarm : subAlarms) { + long viewEndTimestamp = currentTimeSeconds() + subAlarm.getExpression().getPeriod(); + subAlarmStatsRepo.add(subAlarm, viewEndTimestamp); + } subAlarmStatsRepos.put(metricDefinitionAndTenantId, subAlarmStatsRepo); } } @@ -223,7 +229,6 @@ public class MetricAggregationBolt extends BaseRichBolt { */ void handleAlarmCreated(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) { LOG.debug("Received AlarmCreatedEvent for {}", subAlarm); - subAlarm.setNoState(true); addSubAlarm(metricDefinitionAndTenantId, subAlarm); } @@ -232,7 +237,7 @@ public class MetricAggregationBolt extends BaseRichBolt { if (subAlarmStatsRepo == null) return; - long viewEndTimestamp = (System.currentTimeMillis() / 1000) + evaluationTimeOffset; + long viewEndTimestamp = currentTimeSeconds() + subAlarm.getExpression().getPeriod(); subAlarmStatsRepo.add(subAlarm, viewEndTimestamp); } diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java index 518e182..e444b10 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java @@ -171,15 +171,13 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { } ) .when(alarmEventForwarder).send(anyString(), anyString(), anyString()); - int waitCount = 0; - int feedCount = 5; int goodValueCount = 0; boolean firstUpdate = true; boolean secondUpdate = true; final Alarm initialAlarm = new Alarm(TEST_ALARM_ID, TEST_ALARM_TENANT_ID, TEST_ALARM_NAME, TEST_ALARM_DESCRIPTION, expression, subAlarms, AlarmState.UNDETERMINED, Boolean.TRUE); final int expectedAlarms = expectedStates.length; - for (int i = 1; alarmsSent != expectedAlarms && i < 150; i++) { + for (int i = 1; alarmsSent != expectedAlarms && i < 300; i++) { if (i == 5) { final Map exprs = createSubExpressionMap(); final AlarmCreatedEvent event = new AlarmCreatedEvent(TEST_ALARM_TENANT_ID, TEST_ALARM_ID, TEST_ALARM_NAME, @@ -222,31 +220,20 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { System.out.printf("Send AlarmUpdatedEvent for expression %s%n", expression.getExpression()); } - if (feedCount > 0) { + else { System.out.println("Feeding metrics..."); long time = System.currentTimeMillis() / 1000; ++goodValueCount; for (final SubAlarm subAlarm : subAlarms) { - final MetricDefinitionAndTenantId metricDefinitionAndTenantId = - new MetricDefinitionAndTenantId(subAlarm.getExpression().getMetricDefinition(), TEST_ALARM_TENANT_ID); - metricSpout.feed(new Values(metricDefinitionAndTenantId, + final MetricDefinitionAndTenantId metricDefinitionAndTenantId = + new MetricDefinitionAndTenantId(subAlarm.getExpression().getMetricDefinition(), TEST_ALARM_TENANT_ID); + metricSpout.feed(new Values(metricDefinitionAndTenantId, new Metric(metricDefinitionAndTenantId.metricDefinition, time, (double) (goodValueCount == 15 ? 1 : 555)))); } - - if (--feedCount == 0) - waitCount = 3; - - if (goodValueCount == 15) - goodValueCount = 0; - } else { - System.out.println("Waiting..."); - if (--waitCount == 0) - feedCount = 5; } - try { - Thread.sleep(1000); + Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java index f2dc723..7a0ac6a 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java @@ -72,7 +72,7 @@ public class ThresholdingEngineTest extends TopologyTestCase { public ThresholdingEngineTest() { // Fixtures final AlarmExpression expression = new AlarmExpression( - "max(hpcs.compute.cpu{id=5}) >= 3 or max(hpcs.compute.mem{id=5}) >= 5 times 2"); + "max(cpu{id=5}) >= 3 or max(mem{id=5}) >= 5"); cpuMetricDef = expression.getSubExpressions().get(0).getMetricDefinition(); memMetricDef = expression.getSubExpressions().get(1).getMetricDefinition(); diff --git a/src/test/java/com/hpcloud/mon/domain/model/SubAlarmStatsTest.java b/src/test/java/com/hpcloud/mon/domain/model/SubAlarmStatsTest.java index 021e97a..09b2bad 100644 --- a/src/test/java/com/hpcloud/mon/domain/model/SubAlarmStatsTest.java +++ b/src/test/java/com/hpcloud/mon/domain/model/SubAlarmStatsTest.java @@ -9,7 +9,6 @@ import org.testng.annotations.Test; import com.hpcloud.mon.common.model.alarm.AlarmState; import com.hpcloud.mon.common.model.alarm.AlarmSubExpression; -import com.hpcloud.util.time.TimeResolution; /** * @author Jonathan Halterman @@ -22,42 +21,39 @@ public class SubAlarmStatsTest { @BeforeMethod protected void beforeMethod() { - expression = AlarmSubExpression.of("avg(hpcs.compute.cpu{id=5}, 1) > 3 times 3"); + expression = AlarmSubExpression.of("avg(hpcs.compute.cpu{id=5}, 60) > 3 times 3"); subAlarm = new SubAlarm("123", "1", expression); - subAlarmStats = new SubAlarmStats(subAlarm, TimeResolution.ABSOLUTE, 4); + subAlarm.setNoState(true); + subAlarmStats = new SubAlarmStats(subAlarm, expression.getPeriod()); } public void shouldBeOkIfAnySlotsInViewAreBelowThreshold() { subAlarmStats.getStats().addValue(5, 1); - assertTrue(subAlarmStats.evaluate()); - // This went to alarm because at least one period is over the threshold, - // none are under the threshold and the others are UNDETERMINED - assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); + assertFalse(subAlarmStats.evaluateAndSlideWindow(61)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - subAlarmStats.getStats().addValue(1, 2); - assertTrue(subAlarmStats.evaluate()); - // This went to alarm because at least one period is under the threshold + subAlarmStats.getStats().addValue(1, 62); + assertTrue(subAlarmStats.evaluateAndSlideWindow(121)); + // This went to OK because at least one period is under the threshold assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); - subAlarmStats.getStats().addValue(5, 3); - assertFalse(subAlarmStats.evaluate()); + subAlarmStats.getStats().addValue(5, 123); + assertFalse(subAlarmStats.evaluateAndSlideWindow(181)); // Still one under the threshold assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); } public void shouldBeAlarmedIfAllSlotsInViewExceedThreshold() { subAlarmStats.getStats().addValue(5, 1); - subAlarmStats.getStats().addValue(5, 2); - subAlarmStats.getStats().addValue(5, 3); + assertFalse(subAlarmStats.evaluateAndSlideWindow(61)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - assertTrue(subAlarmStats.evaluate()); - assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); - } + subAlarmStats.getStats().addValue(5, 62); + assertFalse(subAlarmStats.evaluateAndSlideWindow(121)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - public void shouldBeAlarmedIfAllSlotsExceedThresholdOrAreUninitialized() { - subAlarmStats.getStats().addValue(5, 1); - - assertTrue(subAlarmStats.evaluate()); + subAlarmStats.getStats().addValue(5, 123); + assertTrue(subAlarmStats.evaluateAndSlideWindow(181)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); } @@ -66,32 +62,31 @@ public class SubAlarmStatsTest { */ public void shouldEvaluateAndSlideWindow() { long initialTime = 11; - subAlarmStats = new SubAlarmStats(subAlarm, TimeResolution.ABSOLUTE, initialTime); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); // Add value and trigger OK subAlarmStats.getStats().addValue(1, initialTime - 1); - assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); // Slide in some values that exceed the threshold subAlarmStats.getStats().addValue(5, initialTime - 1); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); subAlarmStats.getStats().addValue(5, initialTime - 1); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); subAlarmStats.getStats().addValue(5, initialTime - 1); // Trigger ALARM - assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); // Add value and trigger OK subAlarmStats.getStats().addValue(1, initialTime - 1); - assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); // Must slide 9 times total from the last added value to trigger UNDETERMINED. This is @@ -99,9 +94,28 @@ public class SubAlarmStatsTest { // slides to move the value outside of the window and 6 more to exceed the observation // threshold. for (int i = 0; i < 7; i++) - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); - assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); + subAlarmStats.getStats().addValue(5, initialTime - 1); + } + + public void shouldAlarmIfAllSlotsAlarmed() { + long initialTime = 11; + + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); + + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + + subAlarmStats.getStats().addValue(5, initialTime - 1); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + + subAlarmStats.getStats().addValue(5, initialTime - 1); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + + subAlarmStats.getStats().addValue(5, initialTime - 1); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); } public void testEmptyWindowObservationThreshold() { @@ -110,4 +124,27 @@ public class SubAlarmStatsTest { SubAlarmStats saStats = new SubAlarmStats(subAlarm, (System.currentTimeMillis() / 1000) + 60); assertEquals(saStats.emptyWindowObservationThreshold, 6); } + + public void checkLongPeriod() { + final AlarmSubExpression subExpr = AlarmSubExpression.of("sum(hpcs.compute.mem{id=5}, 120) >= 96"); + + final SubAlarm subAlarm = new SubAlarm("42", "4242", subExpr); + + long t1 = 0; + final SubAlarmStats stats = new SubAlarmStats(subAlarm, t1 + subExpr.getPeriod()); + for (int i = 0; i < 360; i++) { + t1++; + stats.getStats().addValue(1.0, t1); + if ((t1 % 60) == 0) { + stats.evaluateAndSlideWindow(t1); + if (i <= 60) + // First check will show it is OK. You could argue that this is incorrect + // as we have not waited for the whole period so we can't really evaluate it. + // That is true for sum and count + assertEquals(stats.getSubAlarm().getState(), AlarmState.OK); + else + assertEquals(stats.getSubAlarm().getState(), AlarmState.ALARM); + } + } + } } diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java index d721bfb..a1ca21d 100644 --- a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java +++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java @@ -8,7 +8,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.Mockito.reset; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -50,7 +49,7 @@ import com.hpcloud.streaming.storm.Streams; @Test public class MetricAggregationBoltTest { private static final String TENANT_ID = "42"; - private MetricAggregationBolt bolt; + private MockMetricAggregationBolt bolt; private TopologyContext context; private OutputCollector collector; private List subAlarms; @@ -79,9 +78,9 @@ public class MetricAggregationBoltTest { @BeforeMethod protected void beforeMethod() { // Fixtures - subAlarm1 = new SubAlarm("123", "1", subExpr1, AlarmState.OK); - subAlarm2 = new SubAlarm("456", "1", subExpr2, AlarmState.OK); - subAlarm3 = new SubAlarm("789", "2", subExpr3, AlarmState.ALARM); + subAlarm1 = new SubAlarm("123", "1", subExpr1, AlarmState.UNDETERMINED); + subAlarm2 = new SubAlarm("456", "1", subExpr2, AlarmState.UNDETERMINED); + subAlarm3 = new SubAlarm("789", "2", subExpr3, AlarmState.UNDETERMINED); subAlarms = new ArrayList<>(); subAlarms.add(subAlarm1); subAlarms.add(subAlarm2); @@ -100,7 +99,7 @@ public class MetricAggregationBoltTest { } }); - bolt = new MetricAggregationBolt(dao); + bolt = new MockMetricAggregationBolt(dao); context = mock(TopologyContext.class); collector = mock(OutputCollector.class); bolt.prepare(null, context, collector); @@ -127,7 +126,6 @@ public class MetricAggregationBoltTest { bolt.execute(createMetricTuple(metricDef2, null)); - // Send metrics for subAlarm1 long t1 = System.currentTimeMillis() / 1000; bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1, 100))); @@ -142,12 +140,13 @@ public class MetricAggregationBoltTest { assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); assertEquals(subAlarm3.getState(), AlarmState.UNDETERMINED); - verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); - verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3)); + verify(collector, times(1)).emit(new Values(subAlarm1.getAlarmId(), subAlarm1)); // Have to reset the mock so it can tell the difference when subAlarm2 and subAlarm3 are emitted again. reset(collector); + // Drive subAlarm1 to ALARM bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1, 99))); + // Drive subAlarm2 to ALARM and subAlarm3 to OK since they use the same MetricDefinition bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, System.currentTimeMillis() / 1000, 94))); bolt.execute(tickTuple); verify(collector, times(1)).ack(tickTuple); @@ -161,21 +160,33 @@ public class MetricAggregationBoltTest { } public void shouldSendUndeterminedIfStateChanges() { - - assertNotEquals(AlarmState.UNDETERMINED, subAlarm2.getState()); + long t1 = System.currentTimeMillis() / 1000; + bolt.setCurrentTime(t1); bolt.execute(createMetricTuple(metricDef2, null)); + t1 += 1; + bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 1.0))); + bolt.setCurrentTime(t1 += 60); final Tuple tickTuple = createTickTuple(); bolt.execute(tickTuple); + assertEquals(subAlarm2.getState(), AlarmState.OK); - assertEquals(AlarmState.UNDETERMINED, subAlarm2.getState()); + bolt.setCurrentTime(t1 += 60); + bolt.execute(tickTuple); + assertEquals(subAlarm2.getState(), AlarmState.OK); + verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + + // Have to reset the mock so it can tell the difference when subAlarm2 is emitted again. + reset(collector); + + bolt.setCurrentTime(t1 += 60); + bolt.execute(tickTuple); + assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); } public void shouldSendUndeterminedOnStartup() { - subAlarm2.setNoState(true); - subAlarm2.setState(AlarmState.UNDETERMINED); bolt.execute(createMetricTuple(metricDef2, null)); final MkTupleParam tupleParam = new MkTupleParam(); @@ -191,6 +202,11 @@ public class MetricAggregationBoltTest { bolt.execute(tickTuple); verify(collector, times(2)).ack(tickTuple); + verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + + bolt.execute(tickTuple); + verify(collector, times(3)).ack(tickTuple); + assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); } @@ -311,4 +327,25 @@ public class MetricAggregationBoltTest { tupleParam.setStream(Streams.DEFAULT_STREAM_ID); return Testing.testTuple(Arrays.asList(new MetricDefinitionAndTenantId(metricDef, TENANT_ID), metric), tupleParam); } + + private static class MockMetricAggregationBolt extends MetricAggregationBolt { + private static final long serialVersionUID = 1L; + + private long currentTime; + + public MockMetricAggregationBolt(SubAlarmDAO subAlarmDAO) { + super(subAlarmDAO); + } + + @Override + protected long currentTimeSeconds() { + if (currentTime != 0) + return currentTime; + return super.currentTimeSeconds(); + } + + public void setCurrentTime(long currentTime) { + this.currentTime = currentTime; + } + } }