diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java index 5bec0e0..060781e 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java @@ -42,7 +42,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -73,6 +72,7 @@ public class MetricAggregationBolt extends BaseRichBolt { public static final String METRIC_AGGREGATION_CONTROL_STREAM = "MetricAggregationControl"; public static final String[] METRIC_AGGREGATION_CONTROL_FIELDS = new String[] {"directive"}; public static final String METRICS_BEHIND = "MetricsBehind"; + private static final int MAX_SAVED_METRIC_AGE_SECONDS = 10; private final ThresholdingConfiguration config; final Map metricDefToSubAlarmStatsRepos = @@ -81,10 +81,9 @@ public class MetricAggregationBolt extends BaseRichBolt { private final Map subAlarmToSubAlarmStats = new HashMap<>(); private transient Logger logger; - /** Namespaces for which metrics are received sporadically */ - private Set sporadicMetricNamespaces = Collections.emptySet(); private OutputCollector collector; private boolean upToDate = true; + private Map savedMetrics = new HashMap<>(); public MetricAggregationBolt(ThresholdingConfiguration config) { this.config = config; @@ -176,12 +175,15 @@ public class MetricAggregationBolt extends BaseRichBolt { void aggregateValues(MetricDefinitionAndTenantId metricDefinitionAndTenantId, Metric metric) { SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinitionAndTenantId); - if (subAlarmStatsRepo == null || metric == null) { + if (subAlarmStatsRepo == null) { + // This is probably the metric that will cause the creation of a new SubAlarm, save it until + // the SubAlarm comes in + savedMetrics.put(metricDefinitionAndTenantId, metric); return; } for (SubAlarmStats stats : subAlarmStatsRepo.get()) { - long timestamp_secs = metric.timestamp/1000; + final long timestamp_secs = metricTimestampInSeconds(metric); if (stats.getStats().addValue(metric.value, timestamp_secs)) { logger.trace("Aggregated value {} at {} for {}. Updated {}", metric.value, metric.timestamp, metricDefinitionAndTenantId, stats.getStats()); @@ -196,6 +198,16 @@ public class MetricAggregationBolt extends BaseRichBolt { } } + /** + * Return the Metric's timestamp in seconds + * + * @param metric Metric to use + * @return Metric's timestamp in seconds + */ + private long metricTimestampInSeconds(Metric metric) { + return metric.timestamp/1000; + } + /** * Evaluates all SubAlarms for all SubAlarmStatsRepositories using an evaluation time of 1 minute * ago, then sliding the window to the current time. @@ -217,6 +229,38 @@ public class MetricAggregationBolt extends BaseRichBolt { logger.info("Did not evaluate SubAlarms because Metrics are not up to date"); upToDate = true; } + cleanSavedMetrics(); + } + + /** + * Clean saved metrics since the SubAlarm should show up within seconds of + * the metric being received + */ + private void cleanSavedMetrics() { + if (savedMetrics.isEmpty()) { + return; + } + final List toRemove = new ArrayList<>(); + for (Map.Entry entry: savedMetrics.entrySet()) { + if (savedMetricTooOld(entry.getValue())) { + toRemove.add(entry.getKey()); + } + } + logger.debug("Removing {} too old saved metrics", toRemove.size()); + for (MetricDefinitionAndTenantId mdtid : toRemove) { + savedMetrics.remove(mdtid); + } + } + + /** + * Check if a save Metric is too old + * @param Metric to check + * @return true if saved Metric is too old, false otherwise + */ + private boolean savedMetricTooOld(final Metric metric) { + final long now = currentTimeSeconds(); + final long age = metricTimestampInSeconds(metric) - now; + return age > MAX_SAVED_METRIC_AGE_SECONDS; } private void sendSubAlarmStateChange(SubAlarmStats subAlarmStats) { @@ -286,7 +330,19 @@ public class MetricAggregationBolt extends BaseRichBolt { */ void handleAlarmCreated(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) { logger.info("Received AlarmCreatedEvent for {}", subAlarm); - addSubAlarm(metricDefinitionAndTenantId, subAlarm); + final SubAlarmStats newStats = addSubAlarm(metricDefinitionAndTenantId, subAlarm); + // See if we have a saved metric for this SubAlarm. Add to the SubAlarm if we do. + // Because the Metric comes directly from the MetricFilterinBolt but the + // SubAlarm comes from the AlarmCreationBolt, it is very likely that the + // Metric arrives first + final Metric metric = savedMetrics.get(metricDefinitionAndTenantId); + if (metric != null && !savedMetricTooOld(metric)) { + aggregateValues(metricDefinitionAndTenantId, metric); + logger.trace("Aggregated saved value {} at {} for {}. Updated {}", metric.value, + metric.timestamp, metricDefinitionAndTenantId, newStats.getStats()); + // The metric is not deleted from savedMetrics because it is possible that + // the metric fits into two different SubAlarms. Not likely, but possible + } } void handleAlarmResend(MetricDefinitionAndTenantId metricDefinitionAndTenantId, @@ -319,7 +375,7 @@ public class MetricAggregationBolt extends BaseRichBolt { return oldSubAlarmStats; } - private void addSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, + private SubAlarmStats addSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) { SubAlarmStats subAlarmStats = subAlarmToSubAlarmStats.get(subAlarm.getId()); if (subAlarmStats == null) { @@ -334,6 +390,7 @@ public class MetricAggregationBolt extends BaseRichBolt { metricDefToSubAlarmStatsRepos.put(metricDefinitionAndTenantId, subAlarmStatsRepo); } subAlarmStatsRepo.add(subAlarm.getId(), subAlarmStats); + return subAlarmStats; } protected boolean subAlarmRemoved(final String subAlarmId, MetricDefinitionAndTenantId metricDefinitionAndTenantId) { diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java index 894d7e5..e32c583 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java @@ -308,6 +308,34 @@ public class MetricAggregationBoltTest { verify(collector, never()).emit(new Values(subAlarm4.getAlarmId(), subAlarm4)); } + public void shouldImmediatelyGoToAlarmUsingSavedMetric() { + long t1 = 170000; + bolt.setCurrentTime(t1); + bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef2, t1 + 1000, 100000, null))); + bolt.execute(createMetricTuple(metricDef3, new Metric(metricDef3, t1 + 1000, 100000, null))); + bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1000, 100000, null))); + + sendSubAlarmCreated(metricDef2, subAlarm2); + sendSubAlarmCreated(metricDef3, subAlarm3); + sendSubAlarmCreated(metricDef4, subAlarm4); + // Since the alarmDef4 expression is a count, we need to send more metrics to drive it to ALARM, + // but it won't transition without the saved one + assertEquals(subAlarm4.getState(), AlarmState.OK); + bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1001, 100000, null))); + bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1002, 100000, null))); + bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1003, 100000, null))); + bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1004, 100000, null))); + + // subAlarm2 is AVG so it can't be evaluated immediately like the MAX for subalarm3 + assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); + assertEquals(subAlarm3.getState(), AlarmState.ALARM); + assertEquals(subAlarm4.getState(), AlarmState.ALARM); + + verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3)); + verify(collector, times(1)).emit(new Values(subAlarm4.getAlarmId(), subAlarm4)); + } + private void sendTickTuple() { final Tuple tickTuple = createTickTuple(); bolt.execute(tickTuple); @@ -397,12 +425,12 @@ public class MetricAggregationBoltTest { verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); } - public void shouldSendOkAfterAlarmIfNoMetrics() { + public void shouldSendAlarmIfMetricBeforeSubAlarm() { long t1 = 50000; - bolt.setCurrentTime(t1); - sendSubAlarmCreated(metricDef4, subAlarm4); bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1, 1.0, null))); t1 += 1000; + bolt.setCurrentTime(t1); + sendSubAlarmCreated(metricDef4, subAlarm4); bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1, 1.0, null))); t1 += 1000; bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1, 1.0, null))); @@ -415,6 +443,14 @@ public class MetricAggregationBoltTest { sendTickTuple(); assertEquals(subAlarm4.getState(), AlarmState.ALARM); verify(collector, times(1)).emit(new Values(subAlarm4.getAlarmId(), subAlarm4)); + } + + public void shouldSendOkAfterAlarmIfNoMetrics() { + long t1 = 50000; + bolt.setCurrentTime(t1); + sendSubAlarmCreated(metricDef4, subAlarm4); + bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1, 1.0, null))); + t1 += 1000; // Have to reset the mock so it can tell the difference when subAlarm4 is emitted again. reset(collector);