diff --git a/thresh/src/main/config/thresh-sample-config.yml b/thresh/src/main/config/thresh-sample-config.yml index af89a03..c607355 100644 --- a/thresh/src/main/config/thresh-sample-config.yml +++ b/thresh/src/main/config/thresh-sample-config.yml @@ -1,14 +1,6 @@ metricSpoutThreads: 2 metricSpoutTasks: 2 -statsdConfig: - host: localhost - port: 8125 - prefix: monasca.storm. - dimensions: !!map - service : monitoring - component : storm - metricSpoutConfig: maxWaitTime: 500 @@ -80,9 +72,9 @@ kafkaProducerConfig: requestRequiredAcks: 1 requestTimeoutMs: 10000 producerType: sync - keySerializerClass: + keySerializerClass: compressionCodec: none - compressedTopics: + compressedTopics: messageSendMaxRetries: 3 retryBackoffMs: 100 topicMetadataRefreshIntervalMs: 600000 @@ -117,3 +109,376 @@ database: maxSize: 41 + + +statsdConfig: + host: localhost + port: 8125 + debugmetrics: false + dimensions: !!map + service : monitoring + component : storm + whitelist: !!seq + - aggregation-bolt.execute-count.filtering-bolt_alarm-creation-stream + - aggregation-bolt.execute-count.filtering-bolt_default + - aggregation-bolt.execute-count.system_tick + - filtering-bolt.execute-count.event-bolt_metric-alarm-events + - filtering-bolt.execute-count.metrics-spout_default + - thresholding-bolt.execute-count.aggregation-bolt_default + - thresholding-bolt.execute-count.event-bolt_alarm-definition-events + - system.memory_heap.committedBytes + - system.memory_nonHeap.committedBytes + - system.newWorkerEvent + - system.startTimeSecs + - system.GC_ConcurrentMarkSweep.timeMs + metricmap: !!map + acker.emit-count.metrics : + monasca_threshold.acker.emit-count.metrics + acker.receive.capacity : + monasca_threshold.acker.receive.capacity + acker.receive.population : + monasca_threshold.acker.receive.population + acker.receive.read_pos : + monasca_threshold.acker.receive.read_pos + acker.receive.write_pos : + monasca_threshold.acker.receive.write_pos + acker.sendqueue.capacity : + monasca_threshold.acker.sendqueue.capacity + acker.sendqueue.population : + monasca_threshold.acker.sendqueue.population + acker.sendqueue.read_pos : + monasca_threshold.acker.sendqueue.read_pos + acker.sendqueue.write_pos : + monasca_threshold.acker.sendqueue.write_pos + acker.transfer-count.metrics : + monasca_threshold.acker.transfer-count.metrics + aggregation-bolt.ack-count.alarm-creation-bolt_alarm-creation-stream : + monasca_threshold.aggregation-bolt.ack-count.alarm-creation-bolt_alarm-creation-stream + aggregation-bolt.ack-count.event-bolt_metric-sub-alarm-events : + monasca_threshold.aggregation-bolt.ack-count.event-bolt_metric-sub-alarm-events + aggregation-bolt.ack-count.filtering-bolt_default : + monasca_threshold.aggregation-bolt.ack-count.filtering-bolt_default + aggregation-bolt.ack-count.system_tick : + monasca_threshold.aggregation-bolt.ack-count.system_tick + aggregation-bolt.emit-count.default : + monasca_threshold.aggregation-bolt.emit-count.default + aggregation-bolt.emit-count.metrics : + monasca_threshold.aggregation-bolt.emit-count.metrics + aggregation-bolt.emit-count.system : + monasca_threshold.aggregation-bolt.emit-count.system + aggregation-bolt.execute-count.alarm-creation-bolt_alarm-creation-stream : + monasca_threshold.aggregation-bolt.execute-count.alarm-creation-bolt_alarm-creation-stream + aggregation-bolt.execute-count.event-bolt_metric-sub-alarm-events : + monasca_threshold.aggregation-bolt.execute-count.event-bolt_metric-sub-alarm-events + aggregation-bolt.execute-count.filtering-bolt_default : + monasca_threshold.aggregation-bolt.execute-count.filtering-bolt_default + aggregation-bolt.execute-count.system_tick : + monasca_threshold.aggregation-bolt.execute-count.system_tick + aggregation-bolt.execute-latency.alarm-creation-bolt_alarm-creation-stream : + monasca_threshold.aggregation-bolt.execute-latency.alarm-creation-bolt_alarm-creation-stream + aggregation-bolt.execute-latency.event-bolt_metric-sub-alarm-events : + monasca_threshold.aggregation-bolt.execute-latency.event-bolt_metric-sub-alarm-events + aggregation-bolt.execute-latency.filtering-bolt_default : + monasca_threshold.aggregation-bolt.execute-latency.filtering-bolt_default + aggregation-bolt.execute-latency.system_tick : + monasca_threshold.aggregation-bolt.execute-latency.system_tick + aggregation-bolt.process-latency.alarm-creation-bolt_alarm-creation-stream : + monasca_threshold.aggregation-bolt.process-latency.alarm-creation-bolt_alarm-creation-stream + aggregation-bolt.process-latency.event-bolt_metric-sub-alarm-events : + monasca_threshold.aggregation-bolt.process-latency.event-bolt_metric-sub-alarm-events + aggregation-bolt.process-latency.filtering-bolt_default : + monasca_threshold.aggregation-bolt.process-latency.filtering-bolt_default + aggregation-bolt.process-latency.system_tick : + monasca_threshold.aggregation-bolt.process-latency.system_tick + aggregation-bolt.receive.capacity : + monasca_threshold.aggregation-bolt.receive.capacity + aggregation-bolt.receive.population : + monasca_threshold.aggregation-bolt.receive.population + aggregation-bolt.receive.read_pos : + monasca_threshold.aggregation-bolt.receive.read_pos + aggregation-bolt.receive.write_pos : + monasca_threshold.aggregation-bolt.receive.write_pos + aggregation-bolt.sendqueue.capacity : + monasca_threshold.aggregation-bolt.sendqueue.capacity + aggregation-bolt.sendqueue.population : + monasca_threshold.aggregation-bolt.sendqueue.population + aggregation-bolt.sendqueue.read_pos : + monasca_threshold.aggregation-bolt.sendqueue.read_pos + aggregation-bolt.sendqueue.write_pos : + monasca_threshold.aggregation-bolt.sendqueue.write_pos + aggregation-bolt.transfer-count.default : + monasca_threshold.aggregation-bolt.transfer-count.default + aggregation-bolt.transfer-count.metrics : + monasca_threshold.aggregation-bolt.transfer-count.metrics + aggregation-bolt.transfer-count.system : + monasca_threshold.aggregation-bolt.transfer-count.system + alarm-creation-bolt.ack-count.event-bolt_alarm-definition-events : + monasca_threshold.alarm-creation-bolt.ack-count.event-bolt_alarm-definition-events + alarm-creation-bolt.ack-count.filtering-bolt_newMetricForAlarmDefinitionStream : + monasca_threshold.alarm-creation-bolt.ack-count.filtering-bolt_newMetricForAlarmDefinitionStream + alarm-creation-bolt.emit-count.alarm-creation-stream : + monasca_threshold.alarm-creation-bolt.emit-count.alarm-creation-stream + alarm-creation-bolt.emit-count.metrics : + monasca_threshold.alarm-creation-bolt.emit-count.metrics + alarm-creation-bolt.execute-count.event-bolt_alarm-definition-events : + monasca_threshold.alarm-creation-bolt.execute-count.event-bolt_alarm-definition-events + alarm-creation-bolt.execute-count.filtering-bolt_newMetricForAlarmDefinitionStream : + monasca_threshold.alarm-creation-bolt.execute-count.filtering-bolt_newMetricForAlarmDefinitionStream + alarm-creation-bolt.execute-latency.event-bolt_alarm-definition-events : + monasca_threshold.alarm-creation-bolt.execute-latency.event-bolt_alarm-definition-events + alarm-creation-bolt.execute-latency.filtering-bolt_newMetricForAlarmDefinitionStream : + monasca_threshold.alarm-creation-bolt.execute-latency.filtering-bolt_newMetricForAlarmDefinitionStream + alarm-creation-bolt.process-latency.event-bolt_alarm-definition-events : + monasca_threshold.alarm-creation-bolt.process-latency.event-bolt_alarm-definition-events + alarm-creation-bolt.process-latency.filtering-bolt_newMetricForAlarmDefinitionStream : + monasca_threshold.alarm-creation-bolt.process-latency.filtering-bolt_newMetricForAlarmDefinitionStream + alarm-creation-bolt.receive.capacity : + monasca_threshold.alarm-creation-bolt.receive.capacity + alarm-creation-bolt.receive.population : + monasca_threshold.alarm-creation-bolt.receive.population + alarm-creation-bolt.receive.read_pos : + monasca_threshold.alarm-creation-bolt.receive.read_pos + alarm-creation-bolt.receive.write_pos : + monasca_threshold.alarm-creation-bolt.receive.write_pos + alarm-creation-bolt.sendqueue.capacity : + monasca_threshold.alarm-creation-bolt.sendqueue.capacity + alarm-creation-bolt.sendqueue.population : + monasca_threshold.alarm-creation-bolt.sendqueue.population + alarm-creation-bolt.sendqueue.read_pos : + monasca_threshold.alarm-creation-bolt.sendqueue.read_pos + alarm-creation-bolt.sendqueue.write_pos : + monasca_threshold.alarm-creation-bolt.sendqueue.write_pos + alarm-creation-bolt.transfer-count.alarm-creation-stream : + monasca_threshold.alarm-creation-bolt.transfer-count.alarm-creation-stream + alarm-creation-bolt.transfer-count.metrics : + monasca_threshold.alarm-creation-bolt.transfer-count.metrics + event-bolt.emit-count.alarm-definition-events : + monasca_threshold.event-bolt.emit-count.alarm-definition-events + event-bolt.emit-count.metrics : + monasca_threshold.event-bolt.emit-count.metrics + event-bolt.execute-count.event-spout_default : + monasca_threshold.event-bolt.execute-count.event-spout_default + event-bolt.execute-latency.event-spout_default : + monasca_threshold.event-bolt.execute-latency.event-spout_default + event-bolt.receive.capacity : + monasca_threshold.event-bolt.receive.capacity + event-bolt.receive.population : + monasca_threshold.event-bolt.receive.population + event-bolt.receive.read_pos : + monasca_threshold.event-bolt.receive.read_pos + event-bolt.receive.write_pos : + monasca_threshold.event-bolt.receive.write_pos + event-bolt.sendqueue.capacity : + monasca_threshold.event-bolt.sendqueue.capacity + event-bolt.sendqueue.population : + monasca_threshold.event-bolt.sendqueue.population + event-bolt.sendqueue.read_pos : + monasca_threshold.event-bolt.sendqueue.read_pos + event-bolt.sendqueue.write_pos : + monasca_threshold.event-bolt.sendqueue.write_pos + event-bolt.transfer-count.alarm-definition-events : + monasca_threshold.event-bolt.transfer-count.alarm-definition-events + event-bolt.transfer-count.metrics : + monasca_threshold.event-bolt.transfer-count.metrics + event-spout.emit-count.default : + monasca_threshold.event-spout.emit-count.default + event-spout.emit-count.metrics : + monasca_threshold.event-spout.emit-count.metrics + event-spout.receive.capacity : + monasca_threshold.event-spout.receive.capacity + event-spout.receive.population : + monasca_threshold.event-spout.receive.population + event-spout.receive.read_pos : + monasca_threshold.event-spout.receive.read_pos + event-spout.receive.write_pos : + monasca_threshold.event-spout.receive.write_pos + event-spout.sendqueue.capacity : + monasca_threshold.event-spout.sendqueue.capacity + event-spout.sendqueue.population : + monasca_threshold.event-spout.sendqueue.population + event-spout.sendqueue.read_pos : + monasca_threshold.event-spout.sendqueue.read_pos + event-spout.sendqueue.write_pos : + monasca_threshold.event-spout.sendqueue.write_pos + event-spout.transfer-count.default : + monasca_threshold.event-spout.transfer-count.default + event-spout.transfer-count.metrics : + monasca_threshold.event-spout.transfer-count.metrics + filtering-bolt.ack-count.event-bolt_alarm-definition-events : + monasca_threshold.filtering-bolt.ack-count.event-bolt_alarm-definition-events + filtering-bolt.ack-count.metrics-spout_default : + monasca_threshold.filtering-bolt.ack-count.metrics-spout_default + filtering-bolt.emit-count.default : + monasca_threshold.filtering-bolt.emit-count.default + filtering-bolt.emit-count.metrics : + monasca_threshold.filtering-bolt.emit-count.metrics + filtering-bolt.emit-count.newMetricForAlarmDefinitionStream : + monasca_threshold.filtering-bolt.emit-count.newMetricForAlarmDefinitionStream + filtering-bolt.execute-count.event-bolt_alarm-definition-events : + monasca_threshold.filtering-bolt.execute-count.event-bolt_alarm-definition-events + filtering-bolt.execute-count.metrics-spout_default : + monasca_threshold.filtering-bolt.execute-count.metrics-spout_default + filtering-bolt.execute-latency.event-bolt_alarm-definition-events : + monasca_threshold.filtering-bolt.execute-latency.event-bolt_alarm-definition-events + filtering-bolt.execute-latency.metrics-spout_default : + monasca_threshold.filtering-bolt.execute-latency.metrics-spout_default + filtering-bolt.process-latency.event-bolt_alarm-definition-events : + monasca_threshold.filtering-bolt.process-latency.event-bolt_alarm-definition-events + filtering-bolt.process-latency.metrics-spout_default : + monasca_threshold.filtering-bolt.process-latency.metrics-spout_default + filtering-bolt.receive.capacity : + monasca_threshold.filtering-bolt.receive.capacity + filtering-bolt.receive.population : + monasca_threshold.filtering-bolt.receive.population + filtering-bolt.receive.read_pos : + monasca_threshold.filtering-bolt.receive.read_pos + filtering-bolt.receive.write_pos : + monasca_threshold.filtering-bolt.receive.write_pos + filtering-bolt.sendqueue.capacity : + monasca_threshold.filtering-bolt.sendqueue.capacity + filtering-bolt.sendqueue.population : + monasca_threshold.filtering-bolt.sendqueue.population + filtering-bolt.sendqueue.read_pos : + monasca_threshold.filtering-bolt.sendqueue.read_pos + filtering-bolt.sendqueue.write_pos : + monasca_threshold.filtering-bolt.sendqueue.write_pos + filtering-bolt.transfer-count.default : + monasca_threshold.filtering-bolt.transfer-count.default + filtering-bolt.transfer-count.metrics : + monasca_threshold.filtering-bolt.transfer-count.metrics + filtering-bolt.transfer-count.newMetricForAlarmDefinitionStream : + monasca_threshold.filtering-bolt.transfer-count.newMetricForAlarmDefinitionStream + metrics-spout.emit-count.default : + monasca_threshold.metrics-spout.emit-count.default + metrics-spout.emit-count.metrics : + monasca_threshold.metrics-spout.emit-count.metrics + metrics-spout.receive.capacity : + monasca_threshold.metrics-spout.receive.capacity + metrics-spout.receive.population : + monasca_threshold.metrics-spout.receive.population + metrics-spout.receive.read_pos : + monasca_threshold.metrics-spout.receive.read_pos + metrics-spout.receive.write_pos : + monasca_threshold.metrics-spout.receive.write_pos + metrics-spout.sendqueue.capacity : + monasca_threshold.metrics-spout.sendqueue.capacity + metrics-spout.sendqueue.population : + monasca_threshold.metrics-spout.sendqueue.population + metrics-spout.sendqueue.read_pos : + monasca_threshold.metrics-spout.sendqueue.read_pos + metrics-spout.sendqueue.write_pos : + monasca_threshold.metrics-spout.sendqueue.write_pos + metrics-spout.transfer-count.default : + monasca_threshold.metrics-spout.transfer-count.default + metrics-spout.transfer-count.metrics : + monasca_threshold.metrics-spout.transfer-count.metrics + system.emit-count.metrics : + monasca_threshold.system.emit-count.metrics + system.GC_ConcurrentMarkSweep.count : + monasca_threshold.system.GC_ConcurrentMarkSweep.count + system.GC_ConcurrentMarkSweep.timeMs : + monasca_threshold.system.GC_ConcurrentMarkSweep.timeMs + system.GC_ParNew.count : + monasca_threshold.system.GC_ParNew.count + system.GC_ParNew.timeMs : + monasca_threshold.system.GC_ParNew.timeMs + system.memory_heap.committedBytes : + monasca_threshold.system.memory_heap.committedBytes + system.memory_heap.initBytes : + monasca_threshold.system.memory_heap.initBytes + system.memory_heap.maxBytes : + monasca_threshold.system.memory_heap.maxBytes + system.memory_heap.unusedBytes : + monasca_threshold.system.memory_heap.unusedBytes + system.memory_heap.usedBytes : + monasca_threshold.system.memory_heap.usedBytes + system.memory_heap.virtualFreeBytes : + monasca_threshold.system.memory_heap.virtualFreeBytes + system.memory_nonHeap.committedBytes : + monasca_threshold.system.memory_nonHeap.committedBytes + system.memory_nonHeap.initBytes : + monasca_threshold.system.memory_nonHeap.initBytes + system.memory_nonHeap.maxBytes : + monasca_threshold.system.memory_nonHeap.maxBytes + system.memory_nonHeap.unusedBytes : + monasca_threshold.system.memory_nonHeap.unusedBytes + system.memory_nonHeap.usedBytes : + monasca_threshold.system.memory_nonHeap.usedBytes + system.memory_nonHeap.virtualFreeBytes : + monasca_threshold.system.memory_nonHeap.virtualFreeBytes + system.newWorkerEvent : + monasca_threshold.system.newWorkerEvent + system.receive.capacity : + monasca_threshold.system.receive.capacity + system.receive.population : + monasca_threshold.system.receive.population + system.receive.read_pos : + monasca_threshold.system.receive.read_pos + system.receive.write_pos : + monasca_threshold.system.receive.write_pos + system.sendqueue.capacity : + monasca_threshold.system.sendqueue.capacity + system.sendqueue.population : + monasca_threshold.system.sendqueue.population + system.sendqueue.read_pos : + monasca_threshold.system.sendqueue.read_pos + system.sendqueue.write_pos : + monasca_threshold.system.sendqueue.write_pos + system.startTimeSecs : + monasca_threshold.system.startTimeSecs + system.transfer.capacity : + monasca_threshold.system.transfer.capacity + system.transfer-count.metrics : + monasca_threshold.system.transfer-count.metrics + system.transfer.population : + monasca_threshold.system.transfer.population + system.transfer.read_pos : + monasca_threshold.system.transfer.read_pos + system.transfer.write_pos : + monasca_threshold.system.transfer.write_pos + system.uptimeSecs : + monasca_threshold.system.uptimeSecs + thresholding-bolt.ack-count.aggregation-bolt_default : + monasca_threshold.thresholding-bolt.ack-count.aggregation-bolt_default + thresholding-bolt.ack-count.event-bolt_alarm-definition-events : + monasca_threshold.thresholding-bolt.ack-count.event-bolt_alarm-definition-events + thresholding-bolt.ack-count.event-bolt_metric-sub-alarm-events : + monasca_threshold.thresholding-bolt.ack-count.event-bolt_metric-sub-alarm-events + thresholding-bolt.emit-count.metrics : + monasca_threshold.thresholding-bolt.emit-count.metrics + thresholding-bolt.execute-count.aggregation-bolt_default : + monasca_threshold.thresholding-bolt.execute-count.aggregation-bolt_default + thresholding-bolt.execute-count.event-bolt_alarm-definition-events : + monasca_threshold.thresholding-bolt.execute-count.event-bolt_alarm-definition-events + thresholding-bolt.execute-count.event-bolt_metric-sub-alarm-events : + monasca_threshold.thresholding-bolt.execute-count.event-bolt_metric-sub-alarm-events + thresholding-bolt.execute-latency.aggregation-bolt_default : + monasca_threshold.thresholding-bolt.execute-latency.aggregation-bolt_default + thresholding-bolt.execute-latency.event-bolt_alarm-definition-events : + monasca_threshold.thresholding-bolt.execute-latency.event-bolt_alarm-definition-events + thresholding-bolt.execute-latency.event-bolt_metric-sub-alarm-events : + monasca_threshold.thresholding-bolt.execute-latency.event-bolt_metric-sub-alarm-events + thresholding-bolt.process-latency.aggregation-bolt_default : + monasca_threshold.thresholding-bolt.process-latency.aggregation-bolt_default + thresholding-bolt.process-latency.event-bolt_alarm-definition-events : + monasca_threshold.thresholding-bolt.process-latency.event-bolt_alarm-definition-events + thresholding-bolt.process-latency.event-bolt_metric-sub-alarm-events : + monasca_threshold.thresholding-bolt.process-latency.event-bolt_metric-sub-alarm-events + thresholding-bolt.receive.capacity : + monasca_threshold.thresholding-bolt.receive.capacity + thresholding-bolt.receive.population : + monasca_threshold.thresholding-bolt.receive.population + thresholding-bolt.receive.read_pos : + monasca_threshold.thresholding-bolt.receive.read_pos + thresholding-bolt.receive.write_pos : + monasca_threshold.thresholding-bolt.receive.write_pos + thresholding-bolt.sendqueue.capacity : + monasca_threshold.thresholding-bolt.sendqueue.capacity + thresholding-bolt.sendqueue.population : + monasca_threshold.thresholding-bolt.sendqueue.population + thresholding-bolt.sendqueue.read_pos : + monasca_threshold.thresholding-bolt.sendqueue.read_pos + thresholding-bolt.sendqueue.write_pos : + monasca_threshold.thresholding-bolt.sendqueue.write_pos + thresholding-bolt.transfer-count.metrics : + monasca_threshold.thresholding-bolt.transfer-count.metrics diff --git a/thresh/src/main/java/monasca/thresh/TopologyModule.java b/thresh/src/main/java/monasca/thresh/TopologyModule.java index f7eba1f..716ecc5 100644 --- a/thresh/src/main/java/monasca/thresh/TopologyModule.java +++ b/thresh/src/main/java/monasca/thresh/TopologyModule.java @@ -85,12 +85,18 @@ public class TopologyModule extends AbstractModule { if (config.statsdConfig.getPort() != null) statsdConfig.put(StatsdMetricConsumer.STATSD_PORT, config.statsdConfig.getPort()); - if (config.statsdConfig.getPrefix() != null) - statsdConfig.put(StatsdMetricConsumer.STATSD_PREFIX, - config.statsdConfig.getPrefix()); + if (config.statsdConfig.getWhitelist() != null) + statsdConfig.put(StatsdMetricConsumer.STATSD_WHITELIST, + config.statsdConfig.getWhitelist()); + if (config.statsdConfig.getMetricmap() != null) + statsdConfig.put(StatsdMetricConsumer.STATSD_METRICMAP, + config.statsdConfig.getMetricmap()); if (config.statsdConfig.getDimensions() != null) statsdConfig.put(StatsdMetricConsumer.STATSD_DIMENSIONS, config.statsdConfig.getDimensions()); + if (config.statsdConfig.getDebugmetrics() != null) + statsdConfig.put(StatsdMetricConsumer.STATSD_DEBUGMETRICS, + config.statsdConfig.getDebugmetrics()); stormConfig.registerMetricsConsumer(StatsdMetricConsumer.class, statsdConfig, 2); diff --git a/thresh/src/main/java/monasca/thresh/utils/StatsdConfig.java b/thresh/src/main/java/monasca/thresh/utils/StatsdConfig.java index a672daa..a0e6e03 100644 --- a/thresh/src/main/java/monasca/thresh/utils/StatsdConfig.java +++ b/thresh/src/main/java/monasca/thresh/utils/StatsdConfig.java @@ -19,6 +19,7 @@ package monasca.thresh.utils; import java.io.Serializable; import java.util.Map; +import java.util.List; /* * Intended to deserialize the statsdConfig element in the @@ -29,11 +30,11 @@ public class StatsdConfig implements Serializable { private static final long serialVersionUID = 3634080153227179376L; private String host; - private Integer port; - private String prefix; - + private List whitelist; + private Boolean debugmetrics; + private Map metricmap; private Map dimensions; public Map getDimensions() { @@ -44,6 +45,30 @@ public class StatsdConfig implements Serializable { this.dimensions = dimensions; } + public List getWhitelist() { + return whitelist; + } + + public void setWhitelist(List whitelist) { + this.whitelist = whitelist; + } + + public Boolean getDebugmetrics() { + return debugmetrics; + } + + public void setDebugmetrics(Boolean debugmetrics) { + this.debugmetrics = debugmetrics; + } + + public Map getMetricmap() { + return metricmap; + } + + public void setMetricmap(Map metricmap) { + this.metricmap = metricmap; + } + public String getHost() { return host; } @@ -52,6 +77,14 @@ public class StatsdConfig implements Serializable { this.host = host; } + public String getPrefix() { + return prefix; + } + + public void setPrefix(String prefix) { + this.prefix = prefix; + } + public Integer getPort() { return port; } @@ -60,11 +93,5 @@ public class StatsdConfig implements Serializable { this.port = port; } - public String getPrefix() { - return prefix; - } - - public void setPrefix(String prefix) { - this.prefix = prefix; - } } + diff --git a/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java b/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java index 392d4cf..beec0c9 100644 --- a/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java +++ b/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java @@ -19,8 +19,12 @@ package monasca.thresh.utils; import java.io.IOException; import java.io.StringWriter; +import java.lang.Boolean; +import java.lang.String; import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -47,14 +51,20 @@ public class StatsdMetricConsumer implements IMetricsConsumer { public static final String STATSD_HOST = "metrics.statsd.host"; public static final String STATSD_PORT = "metrics.statsd.port"; - public static final String STATSD_PREFIX = "metrics.statsd.prefix"; + public static final String STATSD_METRICMAP = "metrics.statsd.metricmap"; + public static final String STATSD_WHITELIST = "metrics.statsd.whitelist"; public static final String STATSD_DIMENSIONS = "metrics.statsd.dimensions"; + public static final String STATSD_DEBUGMETRICS = "metrics.statsd.debugmetrics"; + + private String topologyName; + private String statsdHost = "localhost"; + private int statsdPort = 8125; + private String monascaStatsdDimPrefix = "|#"; + private List whiteList = new ArrayList(); + private Map metricMap = new HashMap(); + private Boolean debugMetrics = false; + - String topologyName; - String statsdHost = "localhost"; - int statsdPort = 8125; - String statsdPrefix = "monasca.storm."; - String monascaStatsdDimPrefix = "|#"; String defaultDimensions = new StringBuilder().append(monascaStatsdDimPrefix) .append("{\"service\":\"monitoring\",\"component\":\"storm\"}") .toString(); @@ -92,9 +102,13 @@ public class StatsdMetricConsumer implements IMetricsConsumer { @Override public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { + logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context)); + + /* Sets up locals from the config STATSD_WHITELIST, STATSD_HOST ... */ parseConfig(stormConf); + /* Sets up local vars from config vars if present */ if (registrationArgument instanceof Map) { parseConfig((Map) registrationArgument); } @@ -102,8 +116,9 @@ public class StatsdMetricConsumer implements IMetricsConsumer { initClient(); logger.info( - "statsdPrefix ({}), topologyName ({}), clean(topologyName) ({})", - new Object[] { statsdPrefix, topologyName, clean(topologyName) }); + "topologyName ({}), " + + "clean(topologyName) ({})", new Object[] { topologyName, + clean(topologyName) }); } private void initClient() { @@ -144,13 +159,6 @@ public class StatsdMetricConsumer implements IMetricsConsumer { statsdPort = ((Number) conf.get(STATSD_PORT)).intValue(); } - if (conf.containsKey(STATSD_PREFIX)) { - statsdPrefix = (String) conf.get(STATSD_PREFIX); - if (!statsdPrefix.endsWith(".")) { - statsdPrefix += "."; - } - } - if (conf.containsKey(STATSD_DIMENSIONS)) { statsdDimensions = mapToJsonStr((Map) conf .get(STATSD_DIMENSIONS)); @@ -164,6 +172,18 @@ public class StatsdMetricConsumer implements IMetricsConsumer { statsdDimensions = monascaStatsdDimPrefix + statsdDimensions; } } + + if (conf.containsKey(STATSD_WHITELIST)) { + whiteList = (List) conf.get(STATSD_WHITELIST); + } + + if (conf.containsKey(STATSD_METRICMAP)) { + metricMap = (Map) conf.get(STATSD_METRICMAP); + } + + if (conf.containsKey(STATSD_DEBUGMETRICS)) { + debugMetrics = (Boolean) conf.get(STATSD_DEBUGMETRICS); + } } private String mapToJsonStr(Map inputMap) { @@ -216,19 +236,17 @@ public class StatsdMetricConsumer implements IMetricsConsumer { public void handleDataPoints(TaskInfo taskInfo, Collection dataPoints) { for (Metric metric : dataPointsToMetrics(taskInfo, dataPoints)) { - report(metric.name, metric.value, metric.dimensions); + reportUOM(metric.name, metric.value); } } public static class Metric { String name; Double value; - String dimensions; - public Metric(String name, Double value, String dimensions) { + public Metric(String name, Double value) { this.name = name; this.value = value; - this.dimensions = dimensions; } @Override @@ -248,15 +266,12 @@ public class StatsdMetricConsumer implements IMetricsConsumer { return false; if (value != other.value) return false; - if (!dimensions.equals(other.dimensions)) - return false; return true; } @Override public String toString() { - return "Metric [name=" + name + ", value=" + value + ", dimensions=" - + dimensions + "]"; + return "Metric [name=" + name + ", value=" + value + "]"; } } @@ -278,11 +293,11 @@ public class StatsdMetricConsumer implements IMetricsConsumer { new Object[] { p.name, p.value }); if (p.value instanceof Number) { - res.add(new Metric(sb.toString(), ((Number) p.value).doubleValue(), - statsdDimensions)); + res.add(new Metric(sb.toString(), ((Number) p.value).doubleValue())); } // There is a map of data points and it's not empty - else if (p.value instanceof Map && !(((Map) (p.value)).isEmpty())) { + else if (p.value instanceof Map && + !(((Map) (p.value)).isEmpty())) { int hdrAndNameLength = sb.length(); @SuppressWarnings("rawtypes") Map map = (Map) p.value; @@ -293,7 +308,7 @@ public class StatsdMetricConsumer implements IMetricsConsumer { sb.append(".").append(clean(subName.toString())); res.add(new Metric(sb.toString(), - ((Number) subValue).doubleValue(), statsdDimensions)); + ((Number) subValue).doubleValue())); } } } @@ -305,13 +320,10 @@ public class StatsdMetricConsumer implements IMetricsConsumer { * Since the Java client doesn't support the Monasca metric type we need to * build it with a raw UDP request */ - public void report(String s, Double number, String dimensions) { + public void report(String s) { if (udpclient != null) { - StringBuilder statsdMessage = new StringBuilder().append(statsdPrefix) - .append(s).append(":").append(String.valueOf(number)).append("|c") - .append(statsdDimensions); - logger.debug("reporting: {}={}{}", s, number, dimensions); - udpclient.send(statsdMessage.toString()); + logger.debug("reporting: {}", s); + udpclient.send(s); } else { /* Try to setup the UDP client since it was null */ @@ -319,6 +331,65 @@ public class StatsdMetricConsumer implements IMetricsConsumer { } } + private void reportUOM(String s, Double number) { + String metricName = null; + StringBuilder results = new StringBuilder(); + Boolean published = false; + + if (whiteList.contains(s)) { + + if (!metricMap.isEmpty() && metricMap.containsKey(s)) { + metricName = metricMap.get(s); + } + /* Send the unmapped uom as the same name storm calls it */ + else { + metricName = s; + } + + /* Make sure we don't send metric names that may be null or empty */ + if (metricName != null && !metricName.isEmpty()) { + published = true; + } + } + + /* + * To enable debug message, you also need to add an entry like this: + * + * + * + * + * + * + * Storm/Thresh logger config file: + * /opt/storm/apache-storm-0.9.5/logback/cluster.xml + * + */ + + if (debugMetrics) { + String mappedName = new String(); + + if (!metricMap.isEmpty() && metricMap.containsKey(s)) { + mappedName = metricMap.get(s); + } + else { + mappedName = s; + } + + logger.info(", RawMetricName, {}, MappedMetricName, {}, " + + "val, {}, {}", new Object[] + { s, mappedName, number, + published == true ? "PUBLISHED" : "UNPUBLISHED"}); + } + + if (published) { + results = results.append(metricName).append(":") + .append(String.valueOf(number)).append("|c") + .append(statsdDimensions); + + report(results.toString()); + } + } + @Override public void cleanup() { udpclient.stop();