Simplify subalarm evaluation to only use slidingwindow values that are in the view.

This commit is contained in:
Jonathan Halterman 2013-05-09 11:43:23 -07:00
parent 2ec6c4dc1c
commit 0ac7be857e
3 changed files with 34 additions and 48 deletions

View File

@ -46,17 +46,16 @@ public class SubAlarmStats {
} }
/** /**
* Evaluates the {@link #subAlarm} for stats up to and including the {@code evaluationTimestamp}, * Evaluates the {@link #subAlarm} for the current stats window, updating the sub-alarm's state if
* updating the sub-alarm's state if necessary and sliding the window to the * necessary and sliding the window to the {@code slideToTimestamp}.
* {@code slideToTimestamp}.
* *
* @return true if the alarm's state changed, else false. * @return true if the alarm's state changed, else false.
*/ */
public boolean evaluateAndSlideWindow(long evaluateTimestamp, long slideToTimestamp) { public boolean evaluateAndSlideWindow(long slideToTimestamp) {
try { try {
return evaluate(evaluateTimestamp); return evaluate();
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to evaluate {} for timestamp {}", this, evaluateTimestamp, e); LOG.error("Failed to evaluate {}", this, e);
return false; return false;
} finally { } finally {
stats.slideViewTo(slideToTimestamp); stats.slideViewTo(slideToTimestamp);
@ -87,15 +86,8 @@ public class SubAlarmStats {
/** /**
* @throws IllegalStateException if the {@code timestamp} is outside of the {@link #stats} window * @throws IllegalStateException if the {@code timestamp} is outside of the {@link #stats} window
*/ */
boolean evaluate(long timestamp) { boolean evaluate() {
double[] values = null; double[] values = stats.getViewValues();
try {
values = stats.getValuesUpTo(timestamp);
} catch (IllegalStateException ignore) {
return false;
}
AlarmState initialState = subAlarm.getState(); AlarmState initialState = subAlarm.getState();
boolean thresholdExceeded = false; boolean thresholdExceeded = false;
for (double value : values) { for (double value : values) {

View File

@ -58,7 +58,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
private DatabaseConfiguration dbConfig; private DatabaseConfiguration dbConfig;
private transient SubAlarmDAO subAlarmDAO; private transient SubAlarmDAO subAlarmDAO;
private TopologyContext context; private TopologyContext ctx;
private OutputCollector collector; private OutputCollector collector;
private int evaluationTimeOffset; private int evaluationTimeOffset;
@ -101,7 +101,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
} }
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error("{} Error processing tuple {}", context.getThisTaskId(), tuple, e); LOG.error("{} Error processing tuple {}", ctx.getThisTaskId(), tuple, e);
} finally { } finally {
collector.ack(tuple); collector.ack(tuple);
} }
@ -119,7 +119,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
LOG.info("{} Preparing {}", context.getThisTaskId(), context.getThisComponentId()); LOG.info("{} Preparing {}", context.getThisTaskId(), context.getThisComponentId());
this.context = context; this.ctx = context;
this.collector = collector; this.collector = collector;
evaluationTimeOffset = Integer.valueOf(System.getProperty(TICK_TUPLE_SECONDS_KEY, "60")) evaluationTimeOffset = Integer.valueOf(System.getProperty(TICK_TUPLE_SECONDS_KEY, "60"))
.intValue(); .intValue();
@ -138,14 +138,13 @@ public class MetricAggregationBolt extends BaseRichBolt {
if (subAlarmStatsRepo == null || metric == null) if (subAlarmStatsRepo == null || metric == null)
return; return;
LOG.trace("{} Aggregating values for {}", context.getThisTaskId(), metric);
for (SubAlarmStats stats : subAlarmStatsRepo.get()) { for (SubAlarmStats stats : subAlarmStatsRepo.get()) {
if (stats.getStats().addValue(metric.value, metric.timestamp)) if (stats.getStats().addValue(metric.value, metric.timestamp))
LOG.trace("{} Added value {} for {}. Updated {}", context.getThisTaskId(), metric.value, LOG.trace("{} Aggregated value {} at {} for {}. Updated {}", ctx.getThisTaskId(),
metric.timestamp, stats.getStats()); metric.value, metric.timestamp, metricDefinition, stats.getStats());
else else
LOG.warn("{} Metric timestamp {} is outside of {}", context.getThisTaskId(), LOG.warn("{} Invalid metric timestamp {} for {}, {}", ctx.getThisTaskId(),
metric.timestamp, stats.getStats()); metric.timestamp, metricDefinition, stats.getStats());
} }
} }
@ -155,15 +154,11 @@ public class MetricAggregationBolt extends BaseRichBolt {
*/ */
void evaluateAlarmsAndSlideWindows() { void evaluateAlarmsAndSlideWindows() {
long newWindowTimestamp = System.currentTimeMillis() / 1000; long newWindowTimestamp = System.currentTimeMillis() / 1000;
long evaluationTimestamp = newWindowTimestamp - evaluationTimeOffset;
LOG.debug("{} Evaluating alarms and advancing windows at {}", context.getThisTaskId(),
evaluationTimestamp);
for (SubAlarmStatsRepository subAlarmStatsRepo : subAlarmStatsRepos.values()) for (SubAlarmStatsRepository subAlarmStatsRepo : subAlarmStatsRepos.values())
for (SubAlarmStats subAlarmStats : subAlarmStatsRepo.get()) { for (SubAlarmStats subAlarmStats : subAlarmStatsRepo.get()) {
LOG.debug("{} Evaluating {} at timestamp {}", context.getThisTaskId(), LOG.debug("{} Evaluating {}", ctx.getThisTaskId(), subAlarmStats);
subAlarmStats.getSubAlarm(), evaluationTimestamp); if (subAlarmStats.evaluateAndSlideWindow(newWindowTimestamp)) {
if (subAlarmStats.evaluateAndSlideWindow(evaluationTimestamp, newWindowTimestamp)) { LOG.debug("{} Alarm state changed for {}", ctx.getThisTaskId(), subAlarmStats);
LOG.debug("{} Alarm state changed for {}", context.getThisTaskId(), subAlarmStats);
collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(),
subAlarmStats.getSubAlarm())); subAlarmStats.getSubAlarm()));
} }
@ -180,10 +175,10 @@ public class MetricAggregationBolt extends BaseRichBolt {
if (subAlarmStatsRepo == null) { if (subAlarmStatsRepo == null) {
List<SubAlarm> subAlarms = subAlarmDAO.find(metricDefinition); List<SubAlarm> subAlarms = subAlarmDAO.find(metricDefinition);
if (subAlarms.isEmpty()) if (subAlarms.isEmpty())
LOG.warn("{} Failed to find sub alarms for {}", context.getThisTaskId(), metricDefinition); LOG.warn("{} Failed to find sub alarms for {}", ctx.getThisTaskId(), metricDefinition);
else { else {
long viewEndTimestamp = (System.currentTimeMillis() / 1000) + evaluationTimeOffset; long viewEndTimestamp = (System.currentTimeMillis() / 1000) + evaluationTimeOffset;
LOG.debug("{} Creating SubAlarmStats for {}", context.getThisTaskId(), subAlarms); LOG.debug("{} Creating SubAlarmStats for {}", ctx.getThisTaskId(), metricDefinition);
subAlarmStatsRepo = new SubAlarmStatsRepository(subAlarms, viewEndTimestamp); subAlarmStatsRepo = new SubAlarmStatsRepository(subAlarms, viewEndTimestamp);
subAlarmStatsRepos.put(metricDefinition, subAlarmStatsRepo); subAlarmStatsRepos.put(metricDefinition, subAlarmStatsRepo);
} }
@ -196,7 +191,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
* Adds the {@code subAlarm} subAlarmStatsRepo for the {@code metricDefinition}. * Adds the {@code subAlarm} subAlarmStatsRepo for the {@code metricDefinition}.
*/ */
void handleAlarmCreated(MetricDefinition metricDefinition, SubAlarm subAlarm) { void handleAlarmCreated(MetricDefinition metricDefinition, SubAlarm subAlarm) {
LOG.debug("{} Received AlarmCreatedEvent for {}", context.getThisTaskId(), subAlarm); LOG.debug("{} Received AlarmCreatedEvent for {}", ctx.getThisTaskId(), subAlarm);
SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinition); SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinition);
if (subAlarmStatsRepo == null) if (subAlarmStatsRepo == null)
return; return;
@ -210,8 +205,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
* {@code metricDefinition}. * {@code metricDefinition}.
*/ */
void handleAlarmDeleted(MetricDefinition metricDefinition, String subAlarmId) { void handleAlarmDeleted(MetricDefinition metricDefinition, String subAlarmId) {
LOG.debug("{} Received AlarmDeletedEvent for subAlarm id {}", context.getThisTaskId(), LOG.debug("{} Received AlarmDeletedEvent for subAlarm id {}", ctx.getThisTaskId(), subAlarmId);
subAlarmId);
SubAlarmStatsRepository subAlarmStatsRepo = subAlarmStatsRepos.get(metricDefinition); SubAlarmStatsRepository subAlarmStatsRepo = subAlarmStatsRepos.get(metricDefinition);
if (subAlarmStatsRepo != null) if (subAlarmStatsRepo != null)
subAlarmStatsRepo.remove(subAlarmId); subAlarmStatsRepo.remove(subAlarmId);

View File

@ -32,7 +32,7 @@ public class SubAlarmStatsTest {
subAlarmStats.getStats().addValue(1, 2); subAlarmStats.getStats().addValue(1, 2);
subAlarmStats.getStats().addValue(5, 3); subAlarmStats.getStats().addValue(5, 3);
assertTrue(subAlarmStats.evaluate(3)); assertTrue(subAlarmStats.evaluate());
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
} }
@ -41,14 +41,14 @@ public class SubAlarmStatsTest {
subAlarmStats.getStats().addValue(5, 2); subAlarmStats.getStats().addValue(5, 2);
subAlarmStats.getStats().addValue(5, 3); subAlarmStats.getStats().addValue(5, 3);
assertTrue(subAlarmStats.evaluate(3)); assertTrue(subAlarmStats.evaluate());
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
} }
public void shouldBeAlarmedIfAllSlotsExceedThresholdOrAreUninitialized() { public void shouldBeAlarmedIfAllSlotsExceedThresholdOrAreUninitialized() {
subAlarmStats.getStats().addValue(5, 1); subAlarmStats.getStats().addValue(5, 1);
assertTrue(subAlarmStats.evaluate(3)); assertTrue(subAlarmStats.evaluate());
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
} }
@ -60,29 +60,29 @@ public class SubAlarmStatsTest {
subAlarmStats = new SubAlarmStats(subAlarm, TimeResolution.ABSOLUTE, initialTime); subAlarmStats = new SubAlarmStats(subAlarm, TimeResolution.ABSOLUTE, initialTime);
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1));
// Add value and trigger OK // Add value and trigger OK
subAlarmStats.getStats().addValue(1, initialTime - 1); subAlarmStats.getStats().addValue(1, initialTime - 1);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Slide in some values that exceed the threshold // Slide in some values that exceed the threshold
subAlarmStats.getStats().addValue(5, initialTime - 1); subAlarmStats.getStats().addValue(5, initialTime - 1);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1));
subAlarmStats.getStats().addValue(5, initialTime - 1); subAlarmStats.getStats().addValue(5, initialTime - 1);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1));
subAlarmStats.getStats().addValue(5, initialTime - 1); subAlarmStats.getStats().addValue(5, initialTime - 1);
// Trigger ALARM // Trigger ALARM
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
// Add value and trigger OK // Add value and trigger OK
subAlarmStats.getStats().addValue(1, initialTime - 1); subAlarmStats.getStats().addValue(1, initialTime - 1);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Must slide 9 times total from the last added value to trigger UNDETERMINED. This is // Must slide 9 times total from the last added value to trigger UNDETERMINED. This is
@ -90,8 +90,8 @@ public class SubAlarmStatsTest {
// slides to move the value outside of the window and 6 more to exceed the observation // slides to move the value outside of the window and 6 more to exceed the observation
// threshold. // threshold.
for (int i = 0; i < 7; i++) for (int i = 0; i < 7; i++)
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1));
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime, initialTime += 1)); assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
} }