Merge "Enable monasca-thresh to set topology.max.spout.pending f. Storm"
This commit is contained in:
commit
b1409c391f
@ -30,8 +30,10 @@ import org.apache.storm.tuple.Values;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.lang.Thread;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
public class MetricSpout extends KafkaSpout {
|
public class MetricSpout extends KafkaSpout {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@ -70,8 +72,12 @@ public class MetricSpout extends KafkaSpout {
|
|||||||
if (metric.dimensions == null) {
|
if (metric.dimensions == null) {
|
||||||
metric.dimensions = EMPTY_DIMENSIONS;
|
metric.dimensions = EMPTY_DIMENSIONS;
|
||||||
}
|
}
|
||||||
|
// get unique identifier, required for storm
|
||||||
|
final String uId = Long.toString(Thread.currentThread().getId())
|
||||||
|
+ Integer.toString(ThreadLocalRandom.current().nextInt());
|
||||||
|
|
||||||
collector.emit(new Values(new TenantIdAndMetricName(tenantId, metricEnvelope.metric
|
collector.emit(new Values(new TenantIdAndMetricName(tenantId, metricEnvelope.metric
|
||||||
.definition().name), metricEnvelope.creationTime, metric));
|
.definition().name), metricEnvelope.creationTime, metric), uId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
x
Reference in New Issue
Block a user