Merge "Added a whitelist for restricting the StatsD metrics"

This commit is contained in:
Jenkins 2015-09-17 20:52:10 +00:00 committed by Gerrit Code Review
commit 84eddb58da
4 changed files with 525 additions and 56 deletions

View File

@ -1,14 +1,6 @@
metricSpoutThreads: 2 metricSpoutThreads: 2
metricSpoutTasks: 2 metricSpoutTasks: 2
statsdConfig:
host: localhost
port: 8125
prefix: monasca.storm.
dimensions: !!map
service : monitoring
component : storm
metricSpoutConfig: metricSpoutConfig:
maxWaitTime: 500 maxWaitTime: 500
@ -80,9 +72,9 @@ kafkaProducerConfig:
requestRequiredAcks: 1 requestRequiredAcks: 1
requestTimeoutMs: 10000 requestTimeoutMs: 10000
producerType: sync producerType: sync
keySerializerClass: keySerializerClass:
compressionCodec: none compressionCodec: none
compressedTopics: compressedTopics:
messageSendMaxRetries: 3 messageSendMaxRetries: 3
retryBackoffMs: 100 retryBackoffMs: 100
topicMetadataRefreshIntervalMs: 600000 topicMetadataRefreshIntervalMs: 600000
@ -117,3 +109,376 @@ database:
maxSize: 41 maxSize: 41
statsdConfig:
host: localhost
port: 8125
debugmetrics: false
dimensions: !!map
service : monitoring
component : storm
whitelist: !!seq
- aggregation-bolt.execute-count.filtering-bolt_alarm-creation-stream
- aggregation-bolt.execute-count.filtering-bolt_default
- aggregation-bolt.execute-count.system_tick
- filtering-bolt.execute-count.event-bolt_metric-alarm-events
- filtering-bolt.execute-count.metrics-spout_default
- thresholding-bolt.execute-count.aggregation-bolt_default
- thresholding-bolt.execute-count.event-bolt_alarm-definition-events
- system.memory_heap.committedBytes
- system.memory_nonHeap.committedBytes
- system.newWorkerEvent
- system.startTimeSecs
- system.GC_ConcurrentMarkSweep.timeMs
metricmap: !!map
acker.emit-count.metrics :
monasca_threshold.acker.emit-count.metrics
acker.receive.capacity :
monasca_threshold.acker.receive.capacity
acker.receive.population :
monasca_threshold.acker.receive.population
acker.receive.read_pos :
monasca_threshold.acker.receive.read_pos
acker.receive.write_pos :
monasca_threshold.acker.receive.write_pos
acker.sendqueue.capacity :
monasca_threshold.acker.sendqueue.capacity
acker.sendqueue.population :
monasca_threshold.acker.sendqueue.population
acker.sendqueue.read_pos :
monasca_threshold.acker.sendqueue.read_pos
acker.sendqueue.write_pos :
monasca_threshold.acker.sendqueue.write_pos
acker.transfer-count.metrics :
monasca_threshold.acker.transfer-count.metrics
aggregation-bolt.ack-count.alarm-creation-bolt_alarm-creation-stream :
monasca_threshold.aggregation-bolt.ack-count.alarm-creation-bolt_alarm-creation-stream
aggregation-bolt.ack-count.event-bolt_metric-sub-alarm-events :
monasca_threshold.aggregation-bolt.ack-count.event-bolt_metric-sub-alarm-events
aggregation-bolt.ack-count.filtering-bolt_default :
monasca_threshold.aggregation-bolt.ack-count.filtering-bolt_default
aggregation-bolt.ack-count.system_tick :
monasca_threshold.aggregation-bolt.ack-count.system_tick
aggregation-bolt.emit-count.default :
monasca_threshold.aggregation-bolt.emit-count.default
aggregation-bolt.emit-count.metrics :
monasca_threshold.aggregation-bolt.emit-count.metrics
aggregation-bolt.emit-count.system :
monasca_threshold.aggregation-bolt.emit-count.system
aggregation-bolt.execute-count.alarm-creation-bolt_alarm-creation-stream :
monasca_threshold.aggregation-bolt.execute-count.alarm-creation-bolt_alarm-creation-stream
aggregation-bolt.execute-count.event-bolt_metric-sub-alarm-events :
monasca_threshold.aggregation-bolt.execute-count.event-bolt_metric-sub-alarm-events
aggregation-bolt.execute-count.filtering-bolt_default :
monasca_threshold.aggregation-bolt.execute-count.filtering-bolt_default
aggregation-bolt.execute-count.system_tick :
monasca_threshold.aggregation-bolt.execute-count.system_tick
aggregation-bolt.execute-latency.alarm-creation-bolt_alarm-creation-stream :
monasca_threshold.aggregation-bolt.execute-latency.alarm-creation-bolt_alarm-creation-stream
aggregation-bolt.execute-latency.event-bolt_metric-sub-alarm-events :
monasca_threshold.aggregation-bolt.execute-latency.event-bolt_metric-sub-alarm-events
aggregation-bolt.execute-latency.filtering-bolt_default :
monasca_threshold.aggregation-bolt.execute-latency.filtering-bolt_default
aggregation-bolt.execute-latency.system_tick :
monasca_threshold.aggregation-bolt.execute-latency.system_tick
aggregation-bolt.process-latency.alarm-creation-bolt_alarm-creation-stream :
monasca_threshold.aggregation-bolt.process-latency.alarm-creation-bolt_alarm-creation-stream
aggregation-bolt.process-latency.event-bolt_metric-sub-alarm-events :
monasca_threshold.aggregation-bolt.process-latency.event-bolt_metric-sub-alarm-events
aggregation-bolt.process-latency.filtering-bolt_default :
monasca_threshold.aggregation-bolt.process-latency.filtering-bolt_default
aggregation-bolt.process-latency.system_tick :
monasca_threshold.aggregation-bolt.process-latency.system_tick
aggregation-bolt.receive.capacity :
monasca_threshold.aggregation-bolt.receive.capacity
aggregation-bolt.receive.population :
monasca_threshold.aggregation-bolt.receive.population
aggregation-bolt.receive.read_pos :
monasca_threshold.aggregation-bolt.receive.read_pos
aggregation-bolt.receive.write_pos :
monasca_threshold.aggregation-bolt.receive.write_pos
aggregation-bolt.sendqueue.capacity :
monasca_threshold.aggregation-bolt.sendqueue.capacity
aggregation-bolt.sendqueue.population :
monasca_threshold.aggregation-bolt.sendqueue.population
aggregation-bolt.sendqueue.read_pos :
monasca_threshold.aggregation-bolt.sendqueue.read_pos
aggregation-bolt.sendqueue.write_pos :
monasca_threshold.aggregation-bolt.sendqueue.write_pos
aggregation-bolt.transfer-count.default :
monasca_threshold.aggregation-bolt.transfer-count.default
aggregation-bolt.transfer-count.metrics :
monasca_threshold.aggregation-bolt.transfer-count.metrics
aggregation-bolt.transfer-count.system :
monasca_threshold.aggregation-bolt.transfer-count.system
alarm-creation-bolt.ack-count.event-bolt_alarm-definition-events :
monasca_threshold.alarm-creation-bolt.ack-count.event-bolt_alarm-definition-events
alarm-creation-bolt.ack-count.filtering-bolt_newMetricForAlarmDefinitionStream :
monasca_threshold.alarm-creation-bolt.ack-count.filtering-bolt_newMetricForAlarmDefinitionStream
alarm-creation-bolt.emit-count.alarm-creation-stream :
monasca_threshold.alarm-creation-bolt.emit-count.alarm-creation-stream
alarm-creation-bolt.emit-count.metrics :
monasca_threshold.alarm-creation-bolt.emit-count.metrics
alarm-creation-bolt.execute-count.event-bolt_alarm-definition-events :
monasca_threshold.alarm-creation-bolt.execute-count.event-bolt_alarm-definition-events
alarm-creation-bolt.execute-count.filtering-bolt_newMetricForAlarmDefinitionStream :
monasca_threshold.alarm-creation-bolt.execute-count.filtering-bolt_newMetricForAlarmDefinitionStream
alarm-creation-bolt.execute-latency.event-bolt_alarm-definition-events :
monasca_threshold.alarm-creation-bolt.execute-latency.event-bolt_alarm-definition-events
alarm-creation-bolt.execute-latency.filtering-bolt_newMetricForAlarmDefinitionStream :
monasca_threshold.alarm-creation-bolt.execute-latency.filtering-bolt_newMetricForAlarmDefinitionStream
alarm-creation-bolt.process-latency.event-bolt_alarm-definition-events :
monasca_threshold.alarm-creation-bolt.process-latency.event-bolt_alarm-definition-events
alarm-creation-bolt.process-latency.filtering-bolt_newMetricForAlarmDefinitionStream :
monasca_threshold.alarm-creation-bolt.process-latency.filtering-bolt_newMetricForAlarmDefinitionStream
alarm-creation-bolt.receive.capacity :
monasca_threshold.alarm-creation-bolt.receive.capacity
alarm-creation-bolt.receive.population :
monasca_threshold.alarm-creation-bolt.receive.population
alarm-creation-bolt.receive.read_pos :
monasca_threshold.alarm-creation-bolt.receive.read_pos
alarm-creation-bolt.receive.write_pos :
monasca_threshold.alarm-creation-bolt.receive.write_pos
alarm-creation-bolt.sendqueue.capacity :
monasca_threshold.alarm-creation-bolt.sendqueue.capacity
alarm-creation-bolt.sendqueue.population :
monasca_threshold.alarm-creation-bolt.sendqueue.population
alarm-creation-bolt.sendqueue.read_pos :
monasca_threshold.alarm-creation-bolt.sendqueue.read_pos
alarm-creation-bolt.sendqueue.write_pos :
monasca_threshold.alarm-creation-bolt.sendqueue.write_pos
alarm-creation-bolt.transfer-count.alarm-creation-stream :
monasca_threshold.alarm-creation-bolt.transfer-count.alarm-creation-stream
alarm-creation-bolt.transfer-count.metrics :
monasca_threshold.alarm-creation-bolt.transfer-count.metrics
event-bolt.emit-count.alarm-definition-events :
monasca_threshold.event-bolt.emit-count.alarm-definition-events
event-bolt.emit-count.metrics :
monasca_threshold.event-bolt.emit-count.metrics
event-bolt.execute-count.event-spout_default :
monasca_threshold.event-bolt.execute-count.event-spout_default
event-bolt.execute-latency.event-spout_default :
monasca_threshold.event-bolt.execute-latency.event-spout_default
event-bolt.receive.capacity :
monasca_threshold.event-bolt.receive.capacity
event-bolt.receive.population :
monasca_threshold.event-bolt.receive.population
event-bolt.receive.read_pos :
monasca_threshold.event-bolt.receive.read_pos
event-bolt.receive.write_pos :
monasca_threshold.event-bolt.receive.write_pos
event-bolt.sendqueue.capacity :
monasca_threshold.event-bolt.sendqueue.capacity
event-bolt.sendqueue.population :
monasca_threshold.event-bolt.sendqueue.population
event-bolt.sendqueue.read_pos :
monasca_threshold.event-bolt.sendqueue.read_pos
event-bolt.sendqueue.write_pos :
monasca_threshold.event-bolt.sendqueue.write_pos
event-bolt.transfer-count.alarm-definition-events :
monasca_threshold.event-bolt.transfer-count.alarm-definition-events
event-bolt.transfer-count.metrics :
monasca_threshold.event-bolt.transfer-count.metrics
event-spout.emit-count.default :
monasca_threshold.event-spout.emit-count.default
event-spout.emit-count.metrics :
monasca_threshold.event-spout.emit-count.metrics
event-spout.receive.capacity :
monasca_threshold.event-spout.receive.capacity
event-spout.receive.population :
monasca_threshold.event-spout.receive.population
event-spout.receive.read_pos :
monasca_threshold.event-spout.receive.read_pos
event-spout.receive.write_pos :
monasca_threshold.event-spout.receive.write_pos
event-spout.sendqueue.capacity :
monasca_threshold.event-spout.sendqueue.capacity
event-spout.sendqueue.population :
monasca_threshold.event-spout.sendqueue.population
event-spout.sendqueue.read_pos :
monasca_threshold.event-spout.sendqueue.read_pos
event-spout.sendqueue.write_pos :
monasca_threshold.event-spout.sendqueue.write_pos
event-spout.transfer-count.default :
monasca_threshold.event-spout.transfer-count.default
event-spout.transfer-count.metrics :
monasca_threshold.event-spout.transfer-count.metrics
filtering-bolt.ack-count.event-bolt_alarm-definition-events :
monasca_threshold.filtering-bolt.ack-count.event-bolt_alarm-definition-events
filtering-bolt.ack-count.metrics-spout_default :
monasca_threshold.filtering-bolt.ack-count.metrics-spout_default
filtering-bolt.emit-count.default :
monasca_threshold.filtering-bolt.emit-count.default
filtering-bolt.emit-count.metrics :
monasca_threshold.filtering-bolt.emit-count.metrics
filtering-bolt.emit-count.newMetricForAlarmDefinitionStream :
monasca_threshold.filtering-bolt.emit-count.newMetricForAlarmDefinitionStream
filtering-bolt.execute-count.event-bolt_alarm-definition-events :
monasca_threshold.filtering-bolt.execute-count.event-bolt_alarm-definition-events
filtering-bolt.execute-count.metrics-spout_default :
monasca_threshold.filtering-bolt.execute-count.metrics-spout_default
filtering-bolt.execute-latency.event-bolt_alarm-definition-events :
monasca_threshold.filtering-bolt.execute-latency.event-bolt_alarm-definition-events
filtering-bolt.execute-latency.metrics-spout_default :
monasca_threshold.filtering-bolt.execute-latency.metrics-spout_default
filtering-bolt.process-latency.event-bolt_alarm-definition-events :
monasca_threshold.filtering-bolt.process-latency.event-bolt_alarm-definition-events
filtering-bolt.process-latency.metrics-spout_default :
monasca_threshold.filtering-bolt.process-latency.metrics-spout_default
filtering-bolt.receive.capacity :
monasca_threshold.filtering-bolt.receive.capacity
filtering-bolt.receive.population :
monasca_threshold.filtering-bolt.receive.population
filtering-bolt.receive.read_pos :
monasca_threshold.filtering-bolt.receive.read_pos
filtering-bolt.receive.write_pos :
monasca_threshold.filtering-bolt.receive.write_pos
filtering-bolt.sendqueue.capacity :
monasca_threshold.filtering-bolt.sendqueue.capacity
filtering-bolt.sendqueue.population :
monasca_threshold.filtering-bolt.sendqueue.population
filtering-bolt.sendqueue.read_pos :
monasca_threshold.filtering-bolt.sendqueue.read_pos
filtering-bolt.sendqueue.write_pos :
monasca_threshold.filtering-bolt.sendqueue.write_pos
filtering-bolt.transfer-count.default :
monasca_threshold.filtering-bolt.transfer-count.default
filtering-bolt.transfer-count.metrics :
monasca_threshold.filtering-bolt.transfer-count.metrics
filtering-bolt.transfer-count.newMetricForAlarmDefinitionStream :
monasca_threshold.filtering-bolt.transfer-count.newMetricForAlarmDefinitionStream
metrics-spout.emit-count.default :
monasca_threshold.metrics-spout.emit-count.default
metrics-spout.emit-count.metrics :
monasca_threshold.metrics-spout.emit-count.metrics
metrics-spout.receive.capacity :
monasca_threshold.metrics-spout.receive.capacity
metrics-spout.receive.population :
monasca_threshold.metrics-spout.receive.population
metrics-spout.receive.read_pos :
monasca_threshold.metrics-spout.receive.read_pos
metrics-spout.receive.write_pos :
monasca_threshold.metrics-spout.receive.write_pos
metrics-spout.sendqueue.capacity :
monasca_threshold.metrics-spout.sendqueue.capacity
metrics-spout.sendqueue.population :
monasca_threshold.metrics-spout.sendqueue.population
metrics-spout.sendqueue.read_pos :
monasca_threshold.metrics-spout.sendqueue.read_pos
metrics-spout.sendqueue.write_pos :
monasca_threshold.metrics-spout.sendqueue.write_pos
metrics-spout.transfer-count.default :
monasca_threshold.metrics-spout.transfer-count.default
metrics-spout.transfer-count.metrics :
monasca_threshold.metrics-spout.transfer-count.metrics
system.emit-count.metrics :
monasca_threshold.system.emit-count.metrics
system.GC_ConcurrentMarkSweep.count :
monasca_threshold.system.GC_ConcurrentMarkSweep.count
system.GC_ConcurrentMarkSweep.timeMs :
monasca_threshold.system.GC_ConcurrentMarkSweep.timeMs
system.GC_ParNew.count :
monasca_threshold.system.GC_ParNew.count
system.GC_ParNew.timeMs :
monasca_threshold.system.GC_ParNew.timeMs
system.memory_heap.committedBytes :
monasca_threshold.system.memory_heap.committedBytes
system.memory_heap.initBytes :
monasca_threshold.system.memory_heap.initBytes
system.memory_heap.maxBytes :
monasca_threshold.system.memory_heap.maxBytes
system.memory_heap.unusedBytes :
monasca_threshold.system.memory_heap.unusedBytes
system.memory_heap.usedBytes :
monasca_threshold.system.memory_heap.usedBytes
system.memory_heap.virtualFreeBytes :
monasca_threshold.system.memory_heap.virtualFreeBytes
system.memory_nonHeap.committedBytes :
monasca_threshold.system.memory_nonHeap.committedBytes
system.memory_nonHeap.initBytes :
monasca_threshold.system.memory_nonHeap.initBytes
system.memory_nonHeap.maxBytes :
monasca_threshold.system.memory_nonHeap.maxBytes
system.memory_nonHeap.unusedBytes :
monasca_threshold.system.memory_nonHeap.unusedBytes
system.memory_nonHeap.usedBytes :
monasca_threshold.system.memory_nonHeap.usedBytes
system.memory_nonHeap.virtualFreeBytes :
monasca_threshold.system.memory_nonHeap.virtualFreeBytes
system.newWorkerEvent :
monasca_threshold.system.newWorkerEvent
system.receive.capacity :
monasca_threshold.system.receive.capacity
system.receive.population :
monasca_threshold.system.receive.population
system.receive.read_pos :
monasca_threshold.system.receive.read_pos
system.receive.write_pos :
monasca_threshold.system.receive.write_pos
system.sendqueue.capacity :
monasca_threshold.system.sendqueue.capacity
system.sendqueue.population :
monasca_threshold.system.sendqueue.population
system.sendqueue.read_pos :
monasca_threshold.system.sendqueue.read_pos
system.sendqueue.write_pos :
monasca_threshold.system.sendqueue.write_pos
system.startTimeSecs :
monasca_threshold.system.startTimeSecs
system.transfer.capacity :
monasca_threshold.system.transfer.capacity
system.transfer-count.metrics :
monasca_threshold.system.transfer-count.metrics
system.transfer.population :
monasca_threshold.system.transfer.population
system.transfer.read_pos :
monasca_threshold.system.transfer.read_pos
system.transfer.write_pos :
monasca_threshold.system.transfer.write_pos
system.uptimeSecs :
monasca_threshold.system.uptimeSecs
thresholding-bolt.ack-count.aggregation-bolt_default :
monasca_threshold.thresholding-bolt.ack-count.aggregation-bolt_default
thresholding-bolt.ack-count.event-bolt_alarm-definition-events :
monasca_threshold.thresholding-bolt.ack-count.event-bolt_alarm-definition-events
thresholding-bolt.ack-count.event-bolt_metric-sub-alarm-events :
monasca_threshold.thresholding-bolt.ack-count.event-bolt_metric-sub-alarm-events
thresholding-bolt.emit-count.metrics :
monasca_threshold.thresholding-bolt.emit-count.metrics
thresholding-bolt.execute-count.aggregation-bolt_default :
monasca_threshold.thresholding-bolt.execute-count.aggregation-bolt_default
thresholding-bolt.execute-count.event-bolt_alarm-definition-events :
monasca_threshold.thresholding-bolt.execute-count.event-bolt_alarm-definition-events
thresholding-bolt.execute-count.event-bolt_metric-sub-alarm-events :
monasca_threshold.thresholding-bolt.execute-count.event-bolt_metric-sub-alarm-events
thresholding-bolt.execute-latency.aggregation-bolt_default :
monasca_threshold.thresholding-bolt.execute-latency.aggregation-bolt_default
thresholding-bolt.execute-latency.event-bolt_alarm-definition-events :
monasca_threshold.thresholding-bolt.execute-latency.event-bolt_alarm-definition-events
thresholding-bolt.execute-latency.event-bolt_metric-sub-alarm-events :
monasca_threshold.thresholding-bolt.execute-latency.event-bolt_metric-sub-alarm-events
thresholding-bolt.process-latency.aggregation-bolt_default :
monasca_threshold.thresholding-bolt.process-latency.aggregation-bolt_default
thresholding-bolt.process-latency.event-bolt_alarm-definition-events :
monasca_threshold.thresholding-bolt.process-latency.event-bolt_alarm-definition-events
thresholding-bolt.process-latency.event-bolt_metric-sub-alarm-events :
monasca_threshold.thresholding-bolt.process-latency.event-bolt_metric-sub-alarm-events
thresholding-bolt.receive.capacity :
monasca_threshold.thresholding-bolt.receive.capacity
thresholding-bolt.receive.population :
monasca_threshold.thresholding-bolt.receive.population
thresholding-bolt.receive.read_pos :
monasca_threshold.thresholding-bolt.receive.read_pos
thresholding-bolt.receive.write_pos :
monasca_threshold.thresholding-bolt.receive.write_pos
thresholding-bolt.sendqueue.capacity :
monasca_threshold.thresholding-bolt.sendqueue.capacity
thresholding-bolt.sendqueue.population :
monasca_threshold.thresholding-bolt.sendqueue.population
thresholding-bolt.sendqueue.read_pos :
monasca_threshold.thresholding-bolt.sendqueue.read_pos
thresholding-bolt.sendqueue.write_pos :
monasca_threshold.thresholding-bolt.sendqueue.write_pos
thresholding-bolt.transfer-count.metrics :
monasca_threshold.thresholding-bolt.transfer-count.metrics

View File

@ -85,12 +85,18 @@ public class TopologyModule extends AbstractModule {
if (config.statsdConfig.getPort() != null) if (config.statsdConfig.getPort() != null)
statsdConfig.put(StatsdMetricConsumer.STATSD_PORT, statsdConfig.put(StatsdMetricConsumer.STATSD_PORT,
config.statsdConfig.getPort()); config.statsdConfig.getPort());
if (config.statsdConfig.getPrefix() != null) if (config.statsdConfig.getWhitelist() != null)
statsdConfig.put(StatsdMetricConsumer.STATSD_PREFIX, statsdConfig.put(StatsdMetricConsumer.STATSD_WHITELIST,
config.statsdConfig.getPrefix()); config.statsdConfig.getWhitelist());
if (config.statsdConfig.getMetricmap() != null)
statsdConfig.put(StatsdMetricConsumer.STATSD_METRICMAP,
config.statsdConfig.getMetricmap());
if (config.statsdConfig.getDimensions() != null) if (config.statsdConfig.getDimensions() != null)
statsdConfig.put(StatsdMetricConsumer.STATSD_DIMENSIONS, statsdConfig.put(StatsdMetricConsumer.STATSD_DIMENSIONS,
config.statsdConfig.getDimensions()); config.statsdConfig.getDimensions());
if (config.statsdConfig.getDebugmetrics() != null)
statsdConfig.put(StatsdMetricConsumer.STATSD_DEBUGMETRICS,
config.statsdConfig.getDebugmetrics());
stormConfig.registerMetricsConsumer(StatsdMetricConsumer.class, stormConfig.registerMetricsConsumer(StatsdMetricConsumer.class,
statsdConfig, 2); statsdConfig, 2);

View File

@ -19,6 +19,7 @@ package monasca.thresh.utils;
import java.io.Serializable; import java.io.Serializable;
import java.util.Map; import java.util.Map;
import java.util.List;
/* /*
* Intended to deserialize the statsdConfig element in the * Intended to deserialize the statsdConfig element in the
@ -29,11 +30,11 @@ public class StatsdConfig implements Serializable {
private static final long serialVersionUID = 3634080153227179376L; private static final long serialVersionUID = 3634080153227179376L;
private String host; private String host;
private Integer port; private Integer port;
private String prefix; private String prefix;
private List<String> whitelist;
private Boolean debugmetrics;
private Map<String, String> metricmap;
private Map<String, String> dimensions; private Map<String, String> dimensions;
public Map<String, String> getDimensions() { public Map<String, String> getDimensions() {
@ -44,6 +45,30 @@ public class StatsdConfig implements Serializable {
this.dimensions = dimensions; this.dimensions = dimensions;
} }
public List<String> getWhitelist() {
return whitelist;
}
public void setWhitelist(List<String> whitelist) {
this.whitelist = whitelist;
}
public Boolean getDebugmetrics() {
return debugmetrics;
}
public void setDebugmetrics(Boolean debugmetrics) {
this.debugmetrics = debugmetrics;
}
public Map<String, String> getMetricmap() {
return metricmap;
}
public void setMetricmap(Map<String, String> metricmap) {
this.metricmap = metricmap;
}
public String getHost() { public String getHost() {
return host; return host;
} }
@ -52,6 +77,14 @@ public class StatsdConfig implements Serializable {
this.host = host; this.host = host;
} }
public String getPrefix() {
return prefix;
}
public void setPrefix(String prefix) {
this.prefix = prefix;
}
public Integer getPort() { public Integer getPort() {
return port; return port;
} }
@ -60,11 +93,5 @@ public class StatsdConfig implements Serializable {
this.port = port; this.port = port;
} }
public String getPrefix() {
return prefix;
}
public void setPrefix(String prefix) {
this.prefix = prefix;
}
} }

View File

@ -19,8 +19,12 @@ package monasca.thresh.utils;
import java.io.IOException; import java.io.IOException;
import java.io.StringWriter; import java.io.StringWriter;
import java.lang.Boolean;
import java.lang.String;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -47,14 +51,20 @@ public class StatsdMetricConsumer implements IMetricsConsumer {
public static final String STATSD_HOST = "metrics.statsd.host"; public static final String STATSD_HOST = "metrics.statsd.host";
public static final String STATSD_PORT = "metrics.statsd.port"; public static final String STATSD_PORT = "metrics.statsd.port";
public static final String STATSD_PREFIX = "metrics.statsd.prefix"; public static final String STATSD_METRICMAP = "metrics.statsd.metricmap";
public static final String STATSD_WHITELIST = "metrics.statsd.whitelist";
public static final String STATSD_DIMENSIONS = "metrics.statsd.dimensions"; public static final String STATSD_DIMENSIONS = "metrics.statsd.dimensions";
public static final String STATSD_DEBUGMETRICS = "metrics.statsd.debugmetrics";
private String topologyName;
private String statsdHost = "localhost";
private int statsdPort = 8125;
private String monascaStatsdDimPrefix = "|#";
private List<String> whiteList = new ArrayList<String>();
private Map<String, String> metricMap = new HashMap<String, String>();
private Boolean debugMetrics = false;
String topologyName;
String statsdHost = "localhost";
int statsdPort = 8125;
String statsdPrefix = "monasca.storm.";
String monascaStatsdDimPrefix = "|#";
String defaultDimensions = new StringBuilder().append(monascaStatsdDimPrefix) String defaultDimensions = new StringBuilder().append(monascaStatsdDimPrefix)
.append("{\"service\":\"monitoring\",\"component\":\"storm\"}") .append("{\"service\":\"monitoring\",\"component\":\"storm\"}")
.toString(); .toString();
@ -92,9 +102,13 @@ public class StatsdMetricConsumer implements IMetricsConsumer {
@Override @Override
public void prepare(Map stormConf, Object registrationArgument, public void prepare(Map stormConf, Object registrationArgument,
TopologyContext context, IErrorReporter errorReporter) { TopologyContext context, IErrorReporter errorReporter) {
logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context)); logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context));
/* Sets up locals from the config STATSD_WHITELIST, STATSD_HOST ... */
parseConfig(stormConf); parseConfig(stormConf);
/* Sets up local vars from config vars if present */
if (registrationArgument instanceof Map) { if (registrationArgument instanceof Map) {
parseConfig((Map<?, ?>) registrationArgument); parseConfig((Map<?, ?>) registrationArgument);
} }
@ -102,8 +116,9 @@ public class StatsdMetricConsumer implements IMetricsConsumer {
initClient(); initClient();
logger.info( logger.info(
"statsdPrefix ({}), topologyName ({}), clean(topologyName) ({})", "topologyName ({}), "
new Object[] { statsdPrefix, topologyName, clean(topologyName) }); + "clean(topologyName) ({})", new Object[] { topologyName,
clean(topologyName) });
} }
private void initClient() { private void initClient() {
@ -144,13 +159,6 @@ public class StatsdMetricConsumer implements IMetricsConsumer {
statsdPort = ((Number) conf.get(STATSD_PORT)).intValue(); statsdPort = ((Number) conf.get(STATSD_PORT)).intValue();
} }
if (conf.containsKey(STATSD_PREFIX)) {
statsdPrefix = (String) conf.get(STATSD_PREFIX);
if (!statsdPrefix.endsWith(".")) {
statsdPrefix += ".";
}
}
if (conf.containsKey(STATSD_DIMENSIONS)) { if (conf.containsKey(STATSD_DIMENSIONS)) {
statsdDimensions = mapToJsonStr((Map<String, String>) conf statsdDimensions = mapToJsonStr((Map<String, String>) conf
.get(STATSD_DIMENSIONS)); .get(STATSD_DIMENSIONS));
@ -164,6 +172,18 @@ public class StatsdMetricConsumer implements IMetricsConsumer {
statsdDimensions = monascaStatsdDimPrefix + statsdDimensions; statsdDimensions = monascaStatsdDimPrefix + statsdDimensions;
} }
} }
if (conf.containsKey(STATSD_WHITELIST)) {
whiteList = (List<String>) conf.get(STATSD_WHITELIST);
}
if (conf.containsKey(STATSD_METRICMAP)) {
metricMap = (Map<String, String>) conf.get(STATSD_METRICMAP);
}
if (conf.containsKey(STATSD_DEBUGMETRICS)) {
debugMetrics = (Boolean) conf.get(STATSD_DEBUGMETRICS);
}
} }
private String mapToJsonStr(Map<String, String> inputMap) { private String mapToJsonStr(Map<String, String> inputMap) {
@ -216,19 +236,17 @@ public class StatsdMetricConsumer implements IMetricsConsumer {
public void handleDataPoints(TaskInfo taskInfo, public void handleDataPoints(TaskInfo taskInfo,
Collection<DataPoint> dataPoints) { Collection<DataPoint> dataPoints) {
for (Metric metric : dataPointsToMetrics(taskInfo, dataPoints)) { for (Metric metric : dataPointsToMetrics(taskInfo, dataPoints)) {
report(metric.name, metric.value, metric.dimensions); reportUOM(metric.name, metric.value);
} }
} }
public static class Metric { public static class Metric {
String name; String name;
Double value; Double value;
String dimensions;
public Metric(String name, Double value, String dimensions) { public Metric(String name, Double value) {
this.name = name; this.name = name;
this.value = value; this.value = value;
this.dimensions = dimensions;
} }
@Override @Override
@ -248,15 +266,12 @@ public class StatsdMetricConsumer implements IMetricsConsumer {
return false; return false;
if (value != other.value) if (value != other.value)
return false; return false;
if (!dimensions.equals(other.dimensions))
return false;
return true; return true;
} }
@Override @Override
public String toString() { public String toString() {
return "Metric [name=" + name + ", value=" + value + ", dimensions=" return "Metric [name=" + name + ", value=" + value + "]";
+ dimensions + "]";
} }
} }
@ -278,11 +293,11 @@ public class StatsdMetricConsumer implements IMetricsConsumer {
new Object[] { p.name, p.value }); new Object[] { p.name, p.value });
if (p.value instanceof Number) { if (p.value instanceof Number) {
res.add(new Metric(sb.toString(), ((Number) p.value).doubleValue(), res.add(new Metric(sb.toString(), ((Number) p.value).doubleValue()));
statsdDimensions));
} }
// There is a map of data points and it's not empty // There is a map of data points and it's not empty
else if (p.value instanceof Map && !(((Map<?, ?>) (p.value)).isEmpty())) { else if (p.value instanceof Map &&
!(((Map<?, ?>) (p.value)).isEmpty())) {
int hdrAndNameLength = sb.length(); int hdrAndNameLength = sb.length();
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
Map map = (Map) p.value; Map map = (Map) p.value;
@ -293,7 +308,7 @@ public class StatsdMetricConsumer implements IMetricsConsumer {
sb.append(".").append(clean(subName.toString())); sb.append(".").append(clean(subName.toString()));
res.add(new Metric(sb.toString(), res.add(new Metric(sb.toString(),
((Number) subValue).doubleValue(), statsdDimensions)); ((Number) subValue).doubleValue()));
} }
} }
} }
@ -305,13 +320,10 @@ public class StatsdMetricConsumer implements IMetricsConsumer {
* Since the Java client doesn't support the Monasca metric type we need to * Since the Java client doesn't support the Monasca metric type we need to
* build it with a raw UDP request * build it with a raw UDP request
*/ */
public void report(String s, Double number, String dimensions) { public void report(String s) {
if (udpclient != null) { if (udpclient != null) {
StringBuilder statsdMessage = new StringBuilder().append(statsdPrefix) logger.debug("reporting: {}", s);
.append(s).append(":").append(String.valueOf(number)).append("|c") udpclient.send(s);
.append(statsdDimensions);
logger.debug("reporting: {}={}{}", s, number, dimensions);
udpclient.send(statsdMessage.toString());
} }
else { else {
/* Try to setup the UDP client since it was null */ /* Try to setup the UDP client since it was null */
@ -319,6 +331,65 @@ public class StatsdMetricConsumer implements IMetricsConsumer {
} }
} }
private void reportUOM(String s, Double number) {
String metricName = null;
StringBuilder results = new StringBuilder();
Boolean published = false;
if (whiteList.contains(s)) {
if (!metricMap.isEmpty() && metricMap.containsKey(s)) {
metricName = metricMap.get(s);
}
/* Send the unmapped uom as the same name storm calls it */
else {
metricName = s;
}
/* Make sure we don't send metric names that may be null or empty */
if (metricName != null && !metricName.isEmpty()) {
published = true;
}
}
/*
* To enable debug message, you also need to add an entry like this:
*
* <logger name="monasca.thresh" additivity="false">
* <level value="INFO" />
* <appender-ref ref="A1" />
* </logger>
*
* Storm/Thresh logger config file:
* /opt/storm/apache-storm-0.9.5/logback/cluster.xml
*
*/
if (debugMetrics) {
String mappedName = new String();
if (!metricMap.isEmpty() && metricMap.containsKey(s)) {
mappedName = metricMap.get(s);
}
else {
mappedName = s;
}
logger.info(", RawMetricName, {}, MappedMetricName, {}, "
+ "val, {}, {}", new Object[]
{ s, mappedName, number,
published == true ? "PUBLISHED" : "UNPUBLISHED"});
}
if (published) {
results = results.append(metricName).append(":")
.append(String.valueOf(number)).append("|c")
.append(statsdDimensions);
report(results.toString());
}
}
@Override @Override
public void cleanup() { public void cleanup() {
udpclient.stop(); udpclient.stop();