From f47845d2ba5c0069754fd7648c412df2e66e9342 Mon Sep 17 00:00:00 2001
From: Craig Bryant
Date: Mon, 2 Mar 2015 19:55:39 -0700
Subject: [PATCH] Add measurement valueMeta

valueMeta for measurements that have it will be received in the message
from Kafka and then persisted

Supported only by Influx V8 for now

Requires the changes to monasca-common from
https://review.openstack.org/160654

Removed ability to send arrays of timestamps and values. It was
undocumented and unused

Implements: blueprint measurement-meta-data

Change-Id: Ia3f91e2136c5186fd55f6201f274e3a869e1cae8
---
 .../KafkaMetricsConsumerRunnableBasic.java    |  2 +
 .../pipeline/event/MetricHandler.java         | 39 ++++++-------------
 .../repository/InfluxMetricRepo.java          |  9 +++--
 .../repository/InfluxV8MetricRepo.java        | 22 +++++++++--
 .../persister/repository/MetricRepo.java      |  4 +-
 .../repository/VerticaMetricRepo.java         |  5 ++-
 6 files changed, 46 insertions(+), 35 deletions(-)

diff --git a/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerRunnableBasic.java b/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerRunnableBasic.java
index a46a8c94..05f31fd1 100644
--- a/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerRunnableBasic.java
+++ b/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerRunnableBasic.java
@@ -22,6 +22,7 @@ import monasca.persister.pipeline.MetricPipeline;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
@@ -41,6 +42,7 @@ public class KafkaMetricsConsumerRunnableBasic extends KafkaConsumerRunnableBasi
     this.objectMapper = new ObjectMapper();
     objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
     objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
+    objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
   }
 
   @Override
diff --git a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java
index 904399ba..39ff4123 100644
--- a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java
+++ b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java
@@ -51,7 +51,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
 
   private final SimpleDateFormat simpleDateFormat;
 
-  private final MetricRepo verticaMetricRepo;
+  private final MetricRepo metricRepo;
 
   private final Counter metricCounter;
   private final Counter definitionCounter;
@@ -64,7 +64,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
                        @Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize) {
     super(configuration, environment, ordinal, batchSize, MetricHandler.class.getName());
     final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal);
-    this.verticaMetricRepo = metricRepo;
+    this.metricRepo = metricRepo;
     this.metricCounter = environment.metrics().counter(handlerName + "."
         + "metrics-added-to-batch-counter");
     this.definitionCounter =
@@ -129,7 +129,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
     definitionIdStringToHash.append(trunc(region, MAX_COLUMN_LENGTH));
     byte[] definitionIdSha1Hash = DigestUtils.sha(definitionIdStringToHash.toString());
     Sha1HashId definitionSha1HashId = new Sha1HashId((definitionIdSha1Hash));
-    verticaMetricRepo
+    metricRepo
        .addDefinitionToBatch(definitionSha1HashId, trunc(metric.getName(), MAX_COLUMN_LENGTH),
            trunc(tenantId, MAX_COLUMN_LENGTH), trunc(region, MAX_COLUMN_LENGTH));
     definitionCounter.inc();
@@ -146,7 +146,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
 
     // Add the dimension name/values to the batch.
     for (Map.Entry<String, String> entry : preppedDimMap.entrySet()) {
-      verticaMetricRepo
+      metricRepo
          .addDimensionToBatch(dimensionsSha1HashId, entry.getKey(), entry.getValue());
       dimensionCounter.inc();
     }
@@ -160,38 +160,23 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
         definitionDimensionsIdSha1Hash = DigestUtils.sha(definitionDimensionsIdStringToHash.toString());
     Sha1HashId definitionDimensionsSha1HashId = new Sha1HashId(definitionDimensionsIdSha1Hash);
-    verticaMetricRepo
+    metricRepo
        .addDefinitionDimensionToBatch(definitionDimensionsSha1HashId, definitionSha1HashId,
            dimensionsSha1HashId);
     definitionDimensionsCounter.inc();
-
-    // Add the measurements to the batch.
-    if (metric.getTimeValues() != null)
-
-    {
-      for (double[] timeValuePairs : metric.getTimeValues()) {
-        String timeStamp = simpleDateFormat.format(new Date((long) (timeValuePairs[0] * 1000)));
-        double value = timeValuePairs[1];
-        verticaMetricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
-        metricCounter.inc();
-        metricCount++;
-      }
-    } else
-
-    {
-      String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp() * 1000));
-      double value = metric.getValue();
-      verticaMetricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
-      metricCounter.inc();
-      metricCount++;
-    }
+    String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp() * 1000));
+    double value = metric.getValue();
+    metricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value,
+        metric.getValueMeta());
+    metricCounter.inc();
+    metricCount++;
 
     return metricCount;
   }
 
   @Override
   public void flushRepository() {
-    verticaMetricRepo.flush();
+    metricRepo.flush();
   }
diff --git a/java/src/main/java/monasca/persister/repository/InfluxMetricRepo.java b/java/src/main/java/monasca/persister/repository/InfluxMetricRepo.java
index e8e24f92..04d76c25 100644
--- a/java/src/main/java/monasca/persister/repository/InfluxMetricRepo.java
+++ b/java/src/main/java/monasca/persister/repository/InfluxMetricRepo.java
@@ -61,8 +61,8 @@ public abstract class InfluxMetricRepo implements MetricRepo {
 
   @Override
   public void addMetricToBatch(final Sha1HashId defDimsId, final String timeStamp,
-                               final double value) {
-    final Measurement measurement = new Measurement(defDimsId, timeStamp, value);
+                               final double value, final Map<String, String> valueMeta) {
+    final Measurement measurement = new Measurement(defDimsId, timeStamp, value, valueMeta);
     List<Measurement> measurementList = this.measurementMap.get(defDimsId);
     if (measurementList == null) {
       measurementList = new LinkedList<>();
@@ -160,11 +160,14 @@ public abstract class InfluxMetricRepo implements MetricRepo {
     protected final Sha1HashId defDimsId;
     protected final String time;
     protected final double value;
+    protected final Map<String, String> valueMeta;
 
-    private Measurement(final Sha1HashId defDimsId, final String time, final double value) {
+    private Measurement(final Sha1HashId defDimsId, final String time, final double value,
+                        final Map<String, String> valueMeta) {
       this.defDimsId = defDimsId;
       this.time = time;
       this.value = value;
+      this.valueMeta = valueMeta;
     }
 
     @Override
diff --git a/java/src/main/java/monasca/persister/repository/InfluxV8MetricRepo.java b/java/src/main/java/monasca/persister/repository/InfluxV8MetricRepo.java
index d94a3d22..ceec1b9c 100644
--- a/java/src/main/java/monasca/persister/repository/InfluxV8MetricRepo.java
+++ b/java/src/main/java/monasca/persister/repository/InfluxV8MetricRepo.java
@@ -17,6 +17,8 @@
 
 package monasca.persister.repository;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 
 import org.influxdb.dto.Serie;
@@ -29,6 +31,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -38,13 +41,15 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo
 
   private static final Logger logger = LoggerFactory.getLogger(InfluxV8MetricRepo.class);
 
-  private static final String[] COL_NAMES_STRING_ARRY = {"time", "value"};
+  private static final String[] COL_NAMES_STRING_ARRY = {"time", "value", "value_meta"};
 
   private final InfluxV8RepoWriter influxV8RepoWriter;
 
   private final SimpleDateFormat simpleDateFormat =
       new SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");
 
+  private final ObjectMapper objectMapper = new ObjectMapper();
+
   @Inject
   public InfluxV8MetricRepo(final Environment env,
                             final InfluxV8RepoWriter influxV8RepoWriter) {
@@ -64,7 +69,8 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo
 
     final List<Serie> serieList = new LinkedList<>();
 
-    for (final Sha1HashId defDimId : this.measurementMap.keySet()) {
+    for (Map.Entry<Sha1HashId, List<Measurement>> entry : this.measurementMap.entrySet()) {
+      Sha1HashId defDimId = entry.getKey();
 
       final DefDim defDim = this.defDimMap.get(defDimId);
       final Def def = getDef(defDim.defId);
@@ -73,7 +79,7 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo
 
       builder.columns(COL_NAMES_STRING_ARRY);
 
-      for (final Measurement measurement : this.measurementMap.get(defDimId)) {
+      for (final Measurement measurement : entry.getValue()) {
         final Object[] colValsObjArry = new Object[COL_NAMES_STRING_ARRY.length];
         final Date date = this.simpleDateFormat.parse(measurement.time + " UTC");
         final Long time = date.getTime() / 1000;
@@ -81,6 +87,16 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo
         logger.debug("Added column value to colValsObjArry[{}] = {}", 0, colValsObjArry[0]);
         colValsObjArry[1] = measurement.value;
         logger.debug("Added column value to colValsObjArry[{}] = {}", 1, colValsObjArry[1]);
+        if (measurement.valueMeta != null && !measurement.valueMeta.isEmpty()) {
+          try {
+            final String valueMetaJson = objectMapper.writeValueAsString(measurement.valueMeta);
+            colValsObjArry[2] = valueMetaJson;
+            logger.debug("Added column value to colValsObjArry[{}] = {}", 2, valueMetaJson);
+          }
+          catch (JsonProcessingException e) {
+            logger.error("Unable to serialize " + measurement.valueMeta, e);
+          }
+        }
         builder.values(colValsObjArry);
         this.measurementMeter.mark();
       }
diff --git a/java/src/main/java/monasca/persister/repository/MetricRepo.java b/java/src/main/java/monasca/persister/repository/MetricRepo.java
index 89199907..01a74e0c 100644
--- a/java/src/main/java/monasca/persister/repository/MetricRepo.java
+++ b/java/src/main/java/monasca/persister/repository/MetricRepo.java
@@ -17,8 +17,10 @@
 
 package monasca.persister.repository;
 
+import java.util.Map;
+
 public interface MetricRepo {
 
-  void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value);
+  void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value, Map<String, String> valueMeta);
 
   void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region);
diff --git a/java/src/main/java/monasca/persister/repository/VerticaMetricRepo.java b/java/src/main/java/monasca/persister/repository/VerticaMetricRepo.java
index 5519eb92..9100b103 100644
--- a/java/src/main/java/monasca/persister/repository/VerticaMetricRepo.java
+++ b/java/src/main/java/monasca/persister/repository/VerticaMetricRepo.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import java.security.NoSuchAlgorithmException;
 import java.sql.SQLException;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import javax.inject.Inject;
@@ -202,7 +203,9 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
   }
 
   @Override
-  public void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value) {
+  public void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value,
+                               Map<String, String> valueMeta) {
+    // TODO: Actually handle valueMeta
     logger.debug("Adding metric to batch: defDimsId: {}, time: {}, value: {}",
         defDimsId.toHexString(), timeStamp, value);
     metricsBatch.add().bind("definition_dimension_id", defDimsId.getSha1Hash())
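Note: a minimal standalone sketch of why the PropertyNamingStrategy change in
KafkaMetricsConsumerRunnableBasic matters for value meta. With
CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES set, Jackson binds the value_meta field of the
incoming Kafka JSON onto a camelCase valueMeta property. The MetricStub class and the sample
JSON below are illustrative stand-ins, not the real MetricEnvelope/Metric classes from
monasca-common.

import java.util.Map;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;

public class ValueMetaNamingSketch {

  // Illustrative stand-in for the metric payload; the real classes live in monasca-common.
  public static class MetricStub {
    public String name;
    public double value;
    public Map<String, String> valueMeta; // bound from "value_meta" in the JSON
  }

  public static void main(String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    // Same configuration the patch applies in KafkaMetricsConsumerRunnableBasic.
    mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
    mapper.setPropertyNamingStrategy(
        PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);

    final String json = "{\"name\": \"cpu.idle_perc\", \"value\": 97.5, "
        + "\"value_meta\": {\"rc\": \"0\", \"msg\": \"OK\"}}";

    final MetricStub metric = mapper.readValue(json, MetricStub.class);
    // The value_meta object lands in metric.valueMeta thanks to the naming strategy.
    System.out.println(metric.valueMeta);
  }
}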
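Note: the InfluxV8MetricRepo hunk persists valueMeta as a JSON string in the new value_meta
column. Below is a self-contained sketch of that row-building step with the Serie builder,
logging and metrics stripped out; the column layout matches the patch, while the buildRow
helper and the sample values are illustrative only.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ValueMetaRowSketch {

  private static final String[] COL_NAMES_STRING_ARRY = {"time", "value", "value_meta"};

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  // Builds one InfluxDB V8 row: [time, value, value_meta-as-JSON-or-null].
  static Object[] buildRow(long timeSeconds, double value, Map<String, String> valueMeta) {
    final Object[] colValsObjArry = new Object[COL_NAMES_STRING_ARRY.length];
    colValsObjArry[0] = timeSeconds;
    colValsObjArry[1] = value;
    if (valueMeta != null && !valueMeta.isEmpty()) {
      try {
        // Same approach as the patch: serialize the map once and store it as a string column.
        colValsObjArry[2] = OBJECT_MAPPER.writeValueAsString(valueMeta);
      } catch (JsonProcessingException e) {
        // As in the patch, the value meta is dropped but the measurement is still written.
      }
    }
    return colValsObjArry;
  }

  public static void main(String[] args) {
    final Map<String, String> valueMeta = new LinkedHashMap<>();
    valueMeta.put("rc", "0");
    valueMeta.put("msg", "OK");
    // Prints something like [1425347739, 97.5, {"rc":"0","msg":"OK"}]
    System.out.println(Arrays.toString(buildRow(1425347739L, 97.5, valueMeta)));
  }
}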