From e980ea71e098314c55a9c3da23a4f20f32137fe7 Mon Sep 17 00:00:00 2001 From: bandorf Date: Wed, 22 May 2019 14:40:22 +0200 Subject: [PATCH] Enable monasca-thresh to set topology.max.spout.pending f. Storm topology.max.spout.pending allows to limit the number of concurrent entries sent from spout to worker(s). However, this requires the usage of a unique id when sending messages (emit). Change-Id: I907a4574b80e7c3347ba6a9f12c7836767dc3dd7 Story: 2005471 Task: 30550 --- .../thresh/infrastructure/thresholding/MetricSpout.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricSpout.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricSpout.java index dab8952..dd83446 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricSpout.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricSpout.java @@ -30,8 +30,10 @@ import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.Thread; import java.util.Collections; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; public class MetricSpout extends KafkaSpout { @SuppressWarnings("unchecked") @@ -70,8 +72,12 @@ public class MetricSpout extends KafkaSpout { if (metric.dimensions == null) { metric.dimensions = EMPTY_DIMENSIONS; } + // get unique identifier, required for storm + final String uId = Long.toString(Thread.currentThread().getId()) + + Integer.toString(ThreadLocalRandom.current().nextInt()); + collector.emit(new Values(new TenantIdAndMetricName(tenantId, metricEnvelope.metric - .definition().name), metricEnvelope.creationTime, metric)); + .definition().name), metricEnvelope.creationTime, metric), uId); } @Override