Add measurement valueMeta

valueMeta for measurements that have it will be received in the message
from Kakfa and then persisted

Supported only by Influx V8 for now

Requires the changes to monasca-common from
https://review.openstack.org/160654

Removed ability to send arrays of timestamps and values. It was
undocumented and unused

Implements: blueprint measurement-meta-data

Change-Id: Ia3f91e2136c5186fd55f6201f274e3a869e1cae8
This commit is contained in:
Craig Bryant 2015-03-02 19:55:39 -07:00
parent 03b54c235c
commit f47845d2ba
6 changed files with 46 additions and 35 deletions

View File

@ -22,6 +22,7 @@ import monasca.persister.pipeline.MetricPipeline;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@ -41,6 +42,7 @@ public class KafkaMetricsConsumerRunnableBasic extends KafkaConsumerRunnableBasi
this.objectMapper = new ObjectMapper();
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
}
@Override

View File

@ -51,7 +51,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
private final SimpleDateFormat simpleDateFormat;
private final MetricRepo verticaMetricRepo;
private final MetricRepo metricRepo;
private final Counter metricCounter;
private final Counter definitionCounter;
@ -64,7 +64,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
@Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize) {
super(configuration, environment, ordinal, batchSize, MetricHandler.class.getName());
final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal);
this.verticaMetricRepo = metricRepo;
this.metricRepo = metricRepo;
this.metricCounter =
environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter");
this.definitionCounter =
@ -129,7 +129,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
definitionIdStringToHash.append(trunc(region, MAX_COLUMN_LENGTH));
byte[] definitionIdSha1Hash = DigestUtils.sha(definitionIdStringToHash.toString());
Sha1HashId definitionSha1HashId = new Sha1HashId((definitionIdSha1Hash));
verticaMetricRepo
metricRepo
.addDefinitionToBatch(definitionSha1HashId, trunc(metric.getName(), MAX_COLUMN_LENGTH),
trunc(tenantId, MAX_COLUMN_LENGTH), trunc(region, MAX_COLUMN_LENGTH));
definitionCounter.inc();
@ -146,7 +146,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
// Add the dimension name/values to the batch.
for (Map.Entry<String, String> entry : preppedDimMap.entrySet()) {
verticaMetricRepo
metricRepo
.addDimensionToBatch(dimensionsSha1HashId, entry.getKey(), entry.getValue());
dimensionCounter.inc();
}
@ -160,38 +160,23 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
definitionDimensionsIdSha1Hash =
DigestUtils.sha(definitionDimensionsIdStringToHash.toString());
Sha1HashId definitionDimensionsSha1HashId = new Sha1HashId(definitionDimensionsIdSha1Hash);
verticaMetricRepo
metricRepo
.addDefinitionDimensionToBatch(definitionDimensionsSha1HashId, definitionSha1HashId,
dimensionsSha1HashId);
definitionDimensionsCounter.inc();
// Add the measurements to the batch.
if (metric.getTimeValues() != null)
{
for (double[] timeValuePairs : metric.getTimeValues()) {
String timeStamp = simpleDateFormat.format(new Date((long) (timeValuePairs[0] * 1000)));
double value = timeValuePairs[1];
verticaMetricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
metricCounter.inc();
metricCount++;
}
} else
{
String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp() * 1000));
double value = metric.getValue();
verticaMetricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
metricCounter.inc();
metricCount++;
}
String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp() * 1000));
double value = metric.getValue();
metricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value,
metric.getValueMeta());
metricCounter.inc();
metricCount++;
return metricCount;
}
@Override
public void flushRepository() {
verticaMetricRepo.flush();
metricRepo.flush();
}

View File

@ -61,8 +61,8 @@ public abstract class InfluxMetricRepo implements MetricRepo {
@Override
public void addMetricToBatch(final Sha1HashId defDimsId, final String timeStamp,
final double value) {
final Measurement measurement = new Measurement(defDimsId, timeStamp, value);
final double value, final Map<String, String> valueMeta) {
final Measurement measurement = new Measurement(defDimsId, timeStamp, value, valueMeta);
List<Measurement> measurementList = this.measurementMap.get(defDimsId);
if (measurementList == null) {
measurementList = new LinkedList<>();
@ -160,11 +160,14 @@ public abstract class InfluxMetricRepo implements MetricRepo {
protected final Sha1HashId defDimsId;
protected final String time;
protected final double value;
protected final Map<String, String> valueMeta;
private Measurement(final Sha1HashId defDimsId, final String time, final double value) {
private Measurement(final Sha1HashId defDimsId, final String time, final double value,
final Map<String, String> valueMeta) {
this.defDimsId = defDimsId;
this.time = time;
this.value = value;
this.valueMeta = valueMeta;
}
@Override

View File

@ -17,6 +17,8 @@
package monasca.persister.repository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.influxdb.dto.Serie;
@ -29,6 +31,7 @@ import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -38,13 +41,15 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo
{
private static final Logger logger = LoggerFactory.getLogger(InfluxV8MetricRepo.class);
private static final String[] COL_NAMES_STRING_ARRY = {"time", "value"};
private static final String[] COL_NAMES_STRING_ARRY = {"time", "value", "value_meta"};
private final InfluxV8RepoWriter influxV8RepoWriter;
private final SimpleDateFormat simpleDateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");
private final ObjectMapper objectMapper = new ObjectMapper();
@Inject
public InfluxV8MetricRepo(final Environment env,
final InfluxV8RepoWriter influxV8RepoWriter) {
@ -64,7 +69,8 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo
final List<Serie> serieList = new LinkedList<>();
for (final Sha1HashId defDimId : this.measurementMap.keySet()) {
for (Map.Entry<Sha1HashId, List<Measurement>> entry : this.measurementMap.entrySet()) {
Sha1HashId defDimId = entry.getKey();
final DefDim defDim = this.defDimMap.get(defDimId);
final Def def = getDef(defDim.defId);
@ -73,7 +79,7 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo
builder.columns(COL_NAMES_STRING_ARRY);
for (final Measurement measurement : this.measurementMap.get(defDimId)) {
for (final Measurement measurement : entry.getValue()) {
final Object[] colValsObjArry = new Object[COL_NAMES_STRING_ARRY.length];
final Date date = this.simpleDateFormat.parse(measurement.time + " UTC");
final Long time = date.getTime() / 1000;
@ -81,6 +87,16 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo
logger.debug("Added column value to colValsObjArry[{}] = {}", 0, colValsObjArry[0]);
colValsObjArry[1] = measurement.value;
logger.debug("Added column value to colValsObjArry[{}] = {}", 1, colValsObjArry[1]);
if (measurement.valueMeta != null && !measurement.valueMeta.isEmpty()) {
try {
final String valueMetaJson = objectMapper.writeValueAsString(measurement.valueMeta);
colValsObjArry[2] = valueMetaJson;
logger.debug("Added column value to colValsObjArry[{}] = {}", 2, valueMetaJson);
}
catch (JsonProcessingException e) {
logger.error("Unable to serialize " + measurement.valueMeta, e);
}
}
builder.values(colValsObjArry);
this.measurementMeter.mark();
}

View File

@ -17,8 +17,10 @@
package monasca.persister.repository;
import java.util.Map;
public interface MetricRepo {
void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value);
void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value, Map<String, String> valueMeta);
void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region);

View File

@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.inject.Inject;
@ -202,7 +203,9 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
}
@Override
public void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value) {
public void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value,
Map<String, String> valueMeta) {
// TODO: Actually handle valueMeta
logger.debug("Adding metric to batch: defDimsId: {}, time: {}, value: {}",
defDimsId.toHexString(), timeStamp, value);
metricsBatch.add().bind("definition_dimension_id", defDimsId.getSha1Hash())