Merge "Add measurement valueMeta"

This commit is contained in:
Jenkins 2015-03-10 04:50:22 +00:00 committed by Gerrit Code Review
commit b7acdad183
6 changed files with 46 additions and 35 deletions

@@ -22,6 +22,7 @@ import monasca.persister.pipeline.MetricPipeline;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@@ -41,6 +42,7 @@ public class KafkaMetricsConsumerRunnableBasic extends KafkaConsumerRunnableBasi
this.objectMapper = new ObjectMapper();
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
}
@Override

@@ -51,7 +51,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
private final SimpleDateFormat simpleDateFormat;
private final MetricRepo verticaMetricRepo;
private final MetricRepo metricRepo;
private final Counter metricCounter;
private final Counter definitionCounter;
@@ -64,7 +64,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
@Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize) {
super(configuration, environment, ordinal, batchSize, MetricHandler.class.getName());
final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal);
this.verticaMetricRepo = metricRepo;
this.metricRepo = metricRepo;
this.metricCounter =
environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter");
this.definitionCounter =
@@ -129,7 +129,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
definitionIdStringToHash.append(trunc(region, MAX_COLUMN_LENGTH));
byte[] definitionIdSha1Hash = DigestUtils.sha(definitionIdStringToHash.toString());
Sha1HashId definitionSha1HashId = new Sha1HashId((definitionIdSha1Hash));
verticaMetricRepo
metricRepo
.addDefinitionToBatch(definitionSha1HashId, trunc(metric.getName(), MAX_COLUMN_LENGTH),
trunc(tenantId, MAX_COLUMN_LENGTH), trunc(region, MAX_COLUMN_LENGTH));
definitionCounter.inc();
@@ -146,7 +146,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
// Add the dimension name/values to the batch.
for (Map.Entry<String, String> entry : preppedDimMap.entrySet()) {
verticaMetricRepo
metricRepo
.addDimensionToBatch(dimensionsSha1HashId, entry.getKey(), entry.getValue());
dimensionCounter.inc();
}
@@ -160,38 +160,23 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
definitionDimensionsIdSha1Hash =
DigestUtils.sha(definitionDimensionsIdStringToHash.toString());
Sha1HashId definitionDimensionsSha1HashId = new Sha1HashId(definitionDimensionsIdSha1Hash);
verticaMetricRepo
metricRepo
.addDefinitionDimensionToBatch(definitionDimensionsSha1HashId, definitionSha1HashId,
dimensionsSha1HashId);
definitionDimensionsCounter.inc();
// Add the measurements to the batch.
if (metric.getTimeValues() != null)
{
for (double[] timeValuePairs : metric.getTimeValues()) {
String timeStamp = simpleDateFormat.format(new Date((long) (timeValuePairs[0] * 1000)));
double value = timeValuePairs[1];
verticaMetricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
metricCounter.inc();
metricCount++;
}
} else
{
String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp() * 1000));
double value = metric.getValue();
verticaMetricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
metricCounter.inc();
metricCount++;
}
String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp() * 1000));
double value = metric.getValue();
metricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value,
metric.getValueMeta());
metricCounter.inc();
metricCount++;
return metricCount;
}
@Override
public void flushRepository() {
verticaMetricRepo.flush();
metricRepo.flush();
}

@@ -61,8 +61,8 @@ public abstract class InfluxMetricRepo implements MetricRepo {
@Override
public void addMetricToBatch(final Sha1HashId defDimsId, final String timeStamp,
final double value) {
final Measurement measurement = new Measurement(defDimsId, timeStamp, value);
final double value, final Map<String, String> valueMeta) {
final Measurement measurement = new Measurement(defDimsId, timeStamp, value, valueMeta);
List<Measurement> measurementList = this.measurementMap.get(defDimsId);
if (measurementList == null) {
measurementList = new LinkedList<>();
@@ -160,11 +160,14 @@ public abstract class InfluxMetricRepo implements MetricRepo {
protected final Sha1HashId defDimsId;
protected final String time;
protected final double value;
protected final Map<String, String> valueMeta;
private Measurement(final Sha1HashId defDimsId, final String time, final double value) {
private Measurement(final Sha1HashId defDimsId, final String time, final double value,
final Map<String, String> valueMeta) {
this.defDimsId = defDimsId;
this.time = time;
this.value = value;
this.valueMeta = valueMeta;
}
@Override

@@ -17,6 +17,8 @@
package monasca.persister.repository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.influxdb.dto.Serie;
@@ -29,6 +31,7 @@ import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -38,13 +41,15 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo
{
private static final Logger logger = LoggerFactory.getLogger(InfluxV8MetricRepo.class);
private static final String[] COL_NAMES_STRING_ARRY = {"time", "value"};
private static final String[] COL_NAMES_STRING_ARRY = {"time", "value", "value_meta"};
private final InfluxV8RepoWriter influxV8RepoWriter;
private final SimpleDateFormat simpleDateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");
private final ObjectMapper objectMapper = new ObjectMapper();
@Inject
public InfluxV8MetricRepo(final Environment env,
final InfluxV8RepoWriter influxV8RepoWriter) {
@@ -64,7 +69,8 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo
final List<Serie> serieList = new LinkedList<>();
for (final Sha1HashId defDimId : this.measurementMap.keySet()) {
for (Map.Entry<Sha1HashId, List<Measurement>> entry : this.measurementMap.entrySet()) {
Sha1HashId defDimId = entry.getKey();
final DefDim defDim = this.defDimMap.get(defDimId);
final Def def = getDef(defDim.defId);
@@ -73,7 +79,7 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo
builder.columns(COL_NAMES_STRING_ARRY);
for (final Measurement measurement : this.measurementMap.get(defDimId)) {
for (final Measurement measurement : entry.getValue()) {
final Object[] colValsObjArry = new Object[COL_NAMES_STRING_ARRY.length];
final Date date = this.simpleDateFormat.parse(measurement.time + " UTC");
final Long time = date.getTime() / 1000;
@@ -81,6 +87,16 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo
logger.debug("Added column value to colValsObjArry[{}] = {}", 0, colValsObjArry[0]);
colValsObjArry[1] = measurement.value;
logger.debug("Added column value to colValsObjArry[{}] = {}", 1, colValsObjArry[1]);
if (measurement.valueMeta != null && !measurement.valueMeta.isEmpty()) {
try {
final String valueMetaJson = objectMapper.writeValueAsString(measurement.valueMeta);
colValsObjArry[2] = valueMetaJson;
logger.debug("Added column value to colValsObjArry[{}] = {}", 2, valueMetaJson);
}
catch (JsonProcessingException e) {
logger.error("Unable to serialize " + measurement.valueMeta, e);
}
}
builder.values(colValsObjArry);
this.measurementMeter.mark();
}

@@ -17,8 +17,10 @@
package monasca.persister.repository;
import java.util.Map;
public interface MetricRepo {
void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value);
void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value, Map<String, String> valueMeta);
void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region);

@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.inject.Inject;
@@ -202,7 +203,9 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
}
@Override
public void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value) {
public void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value,
Map<String, String> valueMeta) {
// TODO: Actually handle valueMeta
logger.debug("Adding metric to batch: defDimsId: {}, time: {}, value: {}",
defDimsId.toHexString(), timeStamp, value);
metricsBatch.add().bind("definition_dimension_id", defDimsId.getSha1Hash())