diff --git a/src/main/java/com/hpcloud/maas/domain/model/SubAlarmStats.java b/src/main/java/com/hpcloud/maas/domain/model/SubAlarmStats.java index 4cab1de..1b3faed 100644 --- a/src/main/java/com/hpcloud/maas/domain/model/SubAlarmStats.java +++ b/src/main/java/com/hpcloud/maas/domain/model/SubAlarmStats.java @@ -46,17 +46,16 @@ public class SubAlarmStats { } /** - * Evaluates the {@link #subAlarm} for stats up to and including the {@code evaluationTimestamp}, - * updating the sub-alarm's state if necessary and sliding the window to the - * {@code slideToTimestamp}. + * Evaluates the {@link #subAlarm} for the current stats window, updating the sub-alarm's state if + * necessary and sliding the window to the {@code slideToTimestamp}. * * @return true if the alarm's state changed, else false. */ - public boolean evaluateAndSlideWindow(long evaluateTimestamp, long slideToTimestamp) { + public boolean evaluateAndSlideWindow(long slideToTimestamp) { try { - return evaluate(evaluateTimestamp); + return evaluate(); } catch (Exception e) { - LOG.error("Failed to evaluate {} for timestamp {}", this, evaluateTimestamp, e); + LOG.error("Failed to evaluate {}", this, e); return false; } finally { stats.slideViewTo(slideToTimestamp); @@ -87,15 +86,8 @@ public class SubAlarmStats { /** * @throws IllegalStateException if the {@code timestamp} is outside of the {@link #stats} window */ - boolean evaluate(long timestamp) { - double[] values = null; - - try { - values = stats.getValuesUpTo(timestamp); - } catch (IllegalStateException ignore) { - return false; - } - + boolean evaluate() { + double[] values = stats.getViewValues(); AlarmState initialState = subAlarm.getState(); boolean thresholdExceeded = false; for (double value : values) { diff --git a/src/main/java/com/hpcloud/maas/infrastructure/thresholding/MetricAggregationBolt.java b/src/main/java/com/hpcloud/maas/infrastructure/thresholding/MetricAggregationBolt.java index ec0ac84..dd9c826 100644 --- a/src/main/java/com/hpcloud/maas/infrastructure/thresholding/MetricAggregationBolt.java +++ b/src/main/java/com/hpcloud/maas/infrastructure/thresholding/MetricAggregationBolt.java @@ -58,7 +58,7 @@ public class MetricAggregationBolt extends BaseRichBolt { private DatabaseConfiguration dbConfig; private transient SubAlarmDAO subAlarmDAO; - private TopologyContext context; + private TopologyContext ctx; private OutputCollector collector; private int evaluationTimeOffset; @@ -101,7 +101,7 @@ public class MetricAggregationBolt extends BaseRichBolt { } } } catch (Exception e) { - LOG.error("{} Error processing tuple {}", context.getThisTaskId(), tuple, e); + LOG.error("{} Error processing tuple {}", ctx.getThisTaskId(), tuple, e); } finally { collector.ack(tuple); } @@ -119,7 +119,7 @@ public class MetricAggregationBolt extends BaseRichBolt { @SuppressWarnings("rawtypes") public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { LOG.info("{} Preparing {}", context.getThisTaskId(), context.getThisComponentId()); - this.context = context; + this.ctx = context; this.collector = collector; evaluationTimeOffset = Integer.valueOf(System.getProperty(TICK_TUPLE_SECONDS_KEY, "60")) .intValue(); @@ -138,14 +138,13 @@ public class MetricAggregationBolt extends BaseRichBolt { if (subAlarmStatsRepo == null || metric == null) return; - LOG.trace("{} Aggregating values for {}", context.getThisTaskId(), metric); for (SubAlarmStats stats : subAlarmStatsRepo.get()) { if (stats.getStats().addValue(metric.value, metric.timestamp)) - LOG.trace("{} Added value {} for {}. Updated {}", context.getThisTaskId(), metric.value, - metric.timestamp, stats.getStats()); + LOG.trace("{} Aggregated value {} at {} for {}. Updated {}", ctx.getThisTaskId(), + metric.value, metric.timestamp, metricDefinition, stats.getStats()); else - LOG.warn("{} Metric timestamp {} is outside of {}", context.getThisTaskId(), - metric.timestamp, stats.getStats()); + LOG.warn("{} Invalid metric timestamp {} for {}, {}", ctx.getThisTaskId(), + metric.timestamp, metricDefinition, stats.getStats()); } } @@ -155,15 +154,11 @@ public class MetricAggregationBolt extends BaseRichBolt { */ void evaluateAlarmsAndSlideWindows() { long newWindowTimestamp = System.currentTimeMillis() / 1000; - long evaluationTimestamp = newWindowTimestamp - evaluationTimeOffset; - LOG.debug("{} Evaluating alarms and advancing windows at {}", context.getThisTaskId(), - evaluationTimestamp); for (SubAlarmStatsRepository subAlarmStatsRepo : subAlarmStatsRepos.values()) for (SubAlarmStats subAlarmStats : subAlarmStatsRepo.get()) { - LOG.debug("{} Evaluating {} at timestamp {}", context.getThisTaskId(), - subAlarmStats.getSubAlarm(), evaluationTimestamp); - if (subAlarmStats.evaluateAndSlideWindow(evaluationTimestamp, newWindowTimestamp)) { - LOG.debug("{} Alarm state changed for {}", context.getThisTaskId(), subAlarmStats); + LOG.debug("{} Evaluating {}", ctx.getThisTaskId(), subAlarmStats); + if (subAlarmStats.evaluateAndSlideWindow(newWindowTimestamp)) { + LOG.debug("{} Alarm state changed for {}", ctx.getThisTaskId(), subAlarmStats); collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), subAlarmStats.getSubAlarm())); } @@ -180,10 +175,10 @@ public class MetricAggregationBolt extends BaseRichBolt { if (subAlarmStatsRepo == null) { List subAlarms = subAlarmDAO.find(metricDefinition); if (subAlarms.isEmpty()) - LOG.warn("{} Failed to find sub alarms for {}", context.getThisTaskId(), metricDefinition); + LOG.warn("{} Failed to find sub alarms for {}", ctx.getThisTaskId(), metricDefinition); else { long viewEndTimestamp = (System.currentTimeMillis() / 1000) + evaluationTimeOffset; - LOG.debug("{} Creating SubAlarmStats for {}", context.getThisTaskId(), subAlarms); + LOG.debug("{} Creating SubAlarmStats for {}", ctx.getThisTaskId(), metricDefinition); subAlarmStatsRepo = new SubAlarmStatsRepository(subAlarms, viewEndTimestamp); subAlarmStatsRepos.put(metricDefinition, subAlarmStatsRepo); } @@ -196,7 +191,7 @@ public class MetricAggregationBolt extends BaseRichBolt { * Adds the {@code subAlarm} subAlarmStatsRepo for the {@code metricDefinition}. */ void handleAlarmCreated(MetricDefinition metricDefinition, SubAlarm subAlarm) { - LOG.debug("{} Received AlarmCreatedEvent for {}", context.getThisTaskId(), subAlarm); + LOG.debug("{} Received AlarmCreatedEvent for {}", ctx.getThisTaskId(), subAlarm); SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinition); if (subAlarmStatsRepo == null) return; @@ -210,8 +205,7 @@ public class MetricAggregationBolt extends BaseRichBolt { * {@code metricDefinition}. */ void handleAlarmDeleted(MetricDefinition metricDefinition, String subAlarmId) { - LOG.debug("{} Received AlarmDeletedEvent for subAlarm id {}", context.getThisTaskId(), - subAlarmId); + LOG.debug("{} Received AlarmDeletedEvent for subAlarm id {}", ctx.getThisTaskId(), subAlarmId); SubAlarmStatsRepository subAlarmStatsRepo = subAlarmStatsRepos.get(metricDefinition); if (subAlarmStatsRepo != null) subAlarmStatsRepo.remove(subAlarmId); diff --git a/src/test/java/com/hpcloud/maas/domain/model/SubAlarmStatsTest.java b/src/test/java/com/hpcloud/maas/domain/model/SubAlarmStatsTest.java index 9cd81d1..d47030e 100644 --- a/src/test/java/com/hpcloud/maas/domain/model/SubAlarmStatsTest.java +++ b/src/test/java/com/hpcloud/maas/domain/model/SubAlarmStatsTest.java @@ -32,7 +32,7 @@ public class SubAlarmStatsTest { subAlarmStats.getStats().addValue(1, 2); subAlarmStats.getStats().addValue(5, 3); - assertTrue(subAlarmStats.evaluate(3)); + assertTrue(subAlarmStats.evaluate()); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); } @@ -41,14 +41,14 @@ public class SubAlarmStatsTest { subAlarmStats.getStats().addValue(5, 2); subAlarmStats.getStats().addValue(5, 3); - assertTrue(subAlarmStats.evaluate(3)); + assertTrue(subAlarmStats.evaluate()); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); } public void shouldBeAlarmedIfAllSlotsExceedThresholdOrAreUninitialized() { subAlarmStats.getStats().addValue(5, 1); - assertTrue(subAlarmStats.evaluate(3)); + assertTrue(subAlarmStats.evaluate()); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); } @@ -60,29 +60,29 @@ public class SubAlarmStatsTest { subAlarmStats = new SubAlarmStats(subAlarm, TimeResolution.ABSOLUTE, initialTime); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); // Add value and trigger OK subAlarmStats.getStats().addValue(1, initialTime - 1); - assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); // Slide in some values that exceed the threshold subAlarmStats.getStats().addValue(5, initialTime - 1); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); subAlarmStats.getStats().addValue(5, initialTime - 1); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); subAlarmStats.getStats().addValue(5, initialTime - 1); // Trigger ALARM - assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); // Add value and trigger OK subAlarmStats.getStats().addValue(1, initialTime - 1); - assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); // Must slide 9 times total from the last added value to trigger UNDETERMINED. This is @@ -90,8 +90,8 @@ public class SubAlarmStatsTest { // slides to move the value outside of the window and 6 more to exceed the observation // threshold. for (int i = 0; i < 7; i++) - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); - assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); }