diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricSpout.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricSpout.java index dab8952..dd83446 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricSpout.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricSpout.java @@ -30,8 +30,10 @@ import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.Thread; import java.util.Collections; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; public class MetricSpout extends KafkaSpout { @SuppressWarnings("unchecked") @@ -70,8 +72,12 @@ public class MetricSpout extends KafkaSpout { if (metric.dimensions == null) { metric.dimensions = EMPTY_DIMENSIONS; } + // get unique identifier, required for storm + final String uId = Long.toString(Thread.currentThread().getId()) + + Integer.toString(ThreadLocalRandom.current().nextInt()); + collector.emit(new Values(new TenantIdAndMetricName(tenantId, metricEnvelope.metric - .definition().name), metricEnvelope.creationTime, metric)); + .definition().name), metricEnvelope.creationTime, metric), uId); } @Override