diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java index 2ec9a73..fe7a47a 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java @@ -219,8 +219,24 @@ public class MetricAggregationBolt extends BaseRichBolt { private void sendSubAlarmStateChange(SubAlarmStats subAlarmStats) { logger.debug("Alarm state changed for {}", subAlarmStats); - collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), subAlarmStats - .getSubAlarm())); + collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), duplicate(subAlarmStats + .getSubAlarm()))); + } + + /** + * Create a copy of SubAlarm to prevent ConcurrentModificationExceptions thrown by Storm. + * The AlarmSubExpression is not immutable, but since it is only replaced in this Bolt, + * it can be reused + * @param original SubAlarm to be duplicated + * @return copy of original + */ + public SubAlarm duplicate(final SubAlarm original) { + final SubAlarm newSubAlarm = + new SubAlarm(original.getId(), original.getAlarmId(), new SubExpression( + original.getAlarmSubExpressionId(), original.getExpression()), original.getState()); + newSubAlarm.setNoState(original.isNoState()); + newSubAlarm.setSporadicMetric(original.isSporadicMetric()); + return newSubAlarm; } /**