Merge "Update vertica code"

Jenkins 2015-04-28 21:53:03 +00:00 committed by Gerrit Code Review
commit bfab669407
10 changed files with 407 additions and 197 deletions

View File

@@ -63,10 +63,21 @@ public class AlarmStateTransitionedEventHandler extends
}
@Override
protected int process(String msg) throws IOException {
protected int process(String msg) {
AlarmStateTransitionedEvent alarmStateTransitionedEvent =
this.objectMapper.readValue(msg, AlarmStateTransitionedEvent.class);
AlarmStateTransitionedEvent alarmStateTransitionedEvent;
try {
alarmStateTransitionedEvent =
this.objectMapper.readValue(msg, AlarmStateTransitionedEvent.class);
} catch (IOException e) {
logger.error("[{}]: failed to deserialize message {}", this.threadId, msg, e);
return 0;
}
logger.debug("[{}]: [{}:{}] {}",
this.threadId,
@@ -74,7 +85,7 @@ public class AlarmStateTransitionedEventHandler extends
this.getMsgCount(),
alarmStateTransitionedEvent);
this.alarmRepo.addToBatch(alarmStateTransitionedEvent);
this.alarmRepo.addToBatch(alarmStateTransitionedEvent, this.threadId);
this.alarmStateTransitionCounter.inc();

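Note: the hunk above moves JSON deserialization inside a try/catch, so a malformed message is logged against the handler's threadId and skipped (returning 0 events) instead of letting IOException escape process() and kill the consumer. A minimal standalone sketch of the pattern; the class and event names here are illustrative, not the project's:

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

public class SkipBadMessages {

  private final ObjectMapper objectMapper = new ObjectMapper();

  // Returns the number of events handled for this message; 0 means the
  // payload was dropped and the consumer thread keeps going.
  public int process(String msg, String threadId) {
    Event event;
    try {
      event = this.objectMapper.readValue(msg, Event.class);
    } catch (IOException e) {
      System.err.printf("[%s]: failed to deserialize message %s: %s%n", threadId, msg, e);
      return 0;
    }
    // The event is valid here; a real handler would batch it and bump counters.
    return 1;
  }

  // Stand-in for AlarmStateTransitionedEvent.
  public static class Event {
    public String alarmId;
  }
}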
View File

@@ -28,8 +28,6 @@ import io.dropwizard.setup.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public abstract class FlushableHandler<T> {
private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class);
@@ -43,8 +41,8 @@ public abstract class FlushableHandler<T> {
private long batchCount = 0;
private final Meter processedMeter;
private final Meter commitMeter;
private final Timer commitTimer;
private final Meter flushMeter;
private final Timer flushTimer;
protected final String threadId;
@@ -67,14 +65,13 @@ public abstract class FlushableHandler<T> {
threadId);
this.processedMeter =
environment.metrics()
.meter(handlerName + "." + "events-processed-processedMeter");
environment.metrics().meter(handlerName + "." + "events-processed-meter");
this.commitMeter =
environment.metrics().meter(handlerName + "." + "commits-executed-processedMeter");
this.flushMeter =
environment.metrics().meter(handlerName + "." + "flush-meter");
this.commitTimer =
environment.metrics().timer(handlerName + "." + "total-commit-and-flush-timer");
this.flushTimer =
environment.metrics().timer(handlerName + "." + "flush-timer");
this.secondsBetweenFlushes = configuration.getMaxBatchTime();
@@ -90,7 +87,7 @@ public abstract class FlushableHandler<T> {
protected abstract int flushRepository() throws Exception;
protected abstract int process(String msg) throws IOException;
protected abstract int process(String msg);
public boolean onEvent(final String msg) throws Exception {
@@ -150,7 +147,7 @@ public abstract class FlushableHandler<T> {
private boolean isFlushTime() {
logger.debug("[{}}: got heartbeat message, checking flush time. flush every {} seconds.",
logger.debug("[{}]: got heartbeat message, checking flush time. flush every {} seconds.",
this.threadId,
this.secondsBetweenFlushes);
@@ -159,7 +156,7 @@ public abstract class FlushableHandler<T> {
if (this.flushTimeMillis <= now ) {
logger.debug(
"[{}]: {} millis past flush time. flushing to repository now.",
"[{}]: {} ms past flush time. flushing to repository now.",
this.threadId,
now - this.flushTimeMillis);
@@ -168,7 +165,7 @@ public abstract class FlushableHandler<T> {
} else {
logger.debug(
"[{}]: {} millis to next flush time. no need to flush at this time.",
"[{}]: {} ms to next flush time. no need to flush at this time.",
this.threadId,
this.flushTimeMillis - now);
@@ -181,13 +178,13 @@ public abstract class FlushableHandler<T> {
logger.debug("[{}]: flushing", this.threadId);
Timer.Context context = this.commitTimer.time();
Timer.Context context = this.flushTimer.time();
int msgFlushCnt = flushRepository();
context.stop();
this.commitMeter.mark();
this.flushMeter.mark();
this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes;

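Note: alongside the commit-to-flush metric renames above, the flush cadence is unchanged: flushTimeMillis is a deadline, and once now reaches it the handler flushes and re-arms the deadline relative to the flush itself. A standalone sketch of that rule, assuming the seconds-to-millis conversion implied by the field names:

public class FlushDeadline {

  private final long millisBetweenFlushes;
  private long flushTimeMillis;

  public FlushDeadline(long secondsBetweenFlushes) {
    this.millisBetweenFlushes = secondsBetweenFlushes * 1000L;
    this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes;
  }

  // True once the deadline has been reached or passed.
  public boolean isFlushTime() {
    return this.flushTimeMillis <= System.currentTimeMillis();
  }

  // Re-arm after a flush: the next deadline is measured from the flush itself,
  // mirroring flushTimeMillis = System.currentTimeMillis() + millisBetweenFlushes above.
  public void rearm() {
    this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes;
  }
}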
View File

@@ -63,10 +63,20 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
}
@Override
public int process(String msg) throws IOException {
public int process(String msg) {
MetricEnvelope[] metricEnvelopesArry =
this.objectMapper.readValue(msg, MetricEnvelope[].class);
MetricEnvelope[] metricEnvelopesArry;
try {
metricEnvelopesArry = this.objectMapper.readValue(msg, MetricEnvelope[].class);
} catch (IOException e) {
logger.error("[{}]: failed to deserialize message {}", this.threadId, msg, e);
return 0;
}
for (final MetricEnvelope metricEnvelope : metricEnvelopesArry) {
@@ -85,7 +95,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
this.getMsgCount(),
metricEnvelope);
this.metricRepo.addToBatch(metricEnvelope);
this.metricRepo.addToBatch(metricEnvelope, this.threadId);
this.metricCounter.inc();

View File

@@ -18,7 +18,7 @@ package monasca.persister.repository;
public interface Repo<T> {
void addToBatch(final T msg);
void addToBatch(final T msg, String id);
int flush(String id) throws Exception;

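Note: widening Repo<T>.addToBatch to take an id lets every repository tag its log lines with the pipeline thread that produced the message. A hypothetical caller of the new contract:

import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.repository.Repo;

public class RepoCaller {

  // The handler's threadId travels with each batched message; it is used
  // only for log correlation, not for batching semantics.
  public void onMessage(Repo<MetricEnvelope> repo, MetricEnvelope envelope, String threadId) {
    repo.addToBatch(envelope, threadId);
  }
}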
View File

@@ -24,7 +24,8 @@ import java.util.List;
import io.dropwizard.setup.Environment;
public abstract class InfluxAlarmRepo extends InfluxRepo<AlarmStateTransitionedEvent> {
public abstract class InfluxAlarmRepo
extends InfluxRepo<AlarmStateTransitionedEvent> {
protected static final String ALARM_STATE_HISTORY_NAME = "alarm_state_history";
@@ -42,7 +43,7 @@ public abstract class InfluxAlarmRepo extends InfluxRepo<AlarmStateTransitionedE
}
@Override
public void addToBatch(AlarmStateTransitionedEvent alarmStateTransitionedEvent) {
public void addToBatch(AlarmStateTransitionedEvent alarmStateTransitionedEvent, String id) {
this.alarmStateTransitionedEventList.add(alarmStateTransitionedEvent);

View File

@@ -20,8 +20,6 @@ package monasca.persister.repository.influxdb;
import monasca.common.model.metric.Metric;
import monasca.common.model.metric.MetricEnvelope;
import com.codahale.metrics.Meter;
import java.util.Map;
import io.dropwizard.setup.Environment;
@@ -30,19 +28,14 @@ public abstract class InfluxMetricRepo extends InfluxRepo<MetricEnvelope> {
protected final MeasurementBuffer measurementBuffer = new MeasurementBuffer();
protected final Meter measurementMeter;
public InfluxMetricRepo(final Environment env) {
super(env);
this.measurementMeter =
env.metrics().meter(this.getClass().getName() + ".measurement-meter");
}
@Override
public void addToBatch(MetricEnvelope metricEnvelope) {
public void addToBatch(MetricEnvelope metricEnvelope, String id) {
Metric metric = metricEnvelope.metric;
@@ -64,8 +57,6 @@ public abstract class InfluxMetricRepo extends InfluxRepo<MetricEnvelope> {
this.measurementBuffer.put(definition, dimensions, measurement);
this.measurementMeter.mark();
}
@Override

View File

@@ -33,7 +33,7 @@ public abstract class InfluxRepo<T> implements Repo<T> {
public InfluxRepo (final Environment env) {
this.flushTimer =
env.metrics().timer(this.getClass().getName() + ".flush-timer");
env.metrics().timer(this.getClass().getName() + "." + "flush-timer");
}
@@ -69,7 +69,7 @@ public abstract class InfluxRepo<T> implements Repo<T> {
context.stop();
logger.debug("[{}]: writing to influxdb took {} millis", id, endTime - startTime);
logger.debug("[{}]: writing to influxdb took {} ms", id, endTime - startTime);
clearBuffers();

View File

@@ -21,6 +21,7 @@ import monasca.common.model.event.AlarmStateTransitionedEvent;
import com.google.inject.Inject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
@@ -28,6 +29,8 @@ import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.LinkedList;
@@ -38,6 +41,8 @@ import io.dropwizard.setup.Environment;
public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
private static final Logger logger = LoggerFactory.getLogger(InfluxV9AlarmRepo.class);
private final InfluxV9RepoWriter influxV9RepoWriter;
private final ObjectMapper objectMapper = new ObjectMapper();
@@ -60,11 +65,11 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
@Override
protected int write(String id) throws Exception {
return this.influxV9RepoWriter.write(getInfluxPointArry(), id);
return this.influxV9RepoWriter.write(getInfluxPointArry(id), id);
}
private InfluxPoint[] getInfluxPointArry() throws Exception {
private InfluxPoint[] getInfluxPointArry(String id) throws Exception {
List<InfluxPoint> influxPointList = new LinkedList<>();
@@ -76,13 +81,33 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
valueMap.put("alarm_id", event.alarmId);
try {
valueMap.put("metrics", this.objectMapper.writeValueAsString(event.metrics));
} catch (JsonProcessingException e) {
logger.error("[{}]: failed to serialize metrics {}", id, event.metrics, e);
valueMap.put("metrics", "");
}
valueMap.put("old_state", event.oldState);
valueMap.put("new_state", event.newState);
valueMap.put("sub_alarms", this.objectMapper.writeValueAsString(event.subAlarms));
try {
valueMap.put("sub_alarms", this.objectMapper.writeValueAsString(event.subAlarms));
} catch (JsonProcessingException e) {
logger.error("[{}]: failed to serialize sub alarms {}", id, event.subAlarms, e);
valueMap.put("sub_alarms", "");
}
valueMap.put("reason", event.stateChangeReason);

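Note: metrics and sub_alarms now degrade independently to an empty string when Jackson cannot serialize them, so one bad field no longer prevents the rest of the point from being written. The shared shape of that guard as a standalone sketch (the helper name is illustrative):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public final class JsonOrEmpty {

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  // Serialize value to JSON; on failure, log with the caller's id and fall
  // back to "" so the surrounding record can still be written.
  public static String writeOrEmpty(Object value, String id) {
    try {
      return OBJECT_MAPPER.writeValueAsString(value);
    } catch (JsonProcessingException e) {
      System.err.printf("[%s]: failed to serialize %s: %s%n", id, value, e);
      return "";
    }
  }
}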
View File

@@ -19,8 +19,11 @@ package monasca.persister.repository.vertica;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.Repo;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.PreparedBatch;
@@ -36,7 +39,6 @@ import java.util.TimeZone;
import javax.inject.Inject;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.Repo;
public class VerticaAlarmRepo extends VerticaRepo implements Repo<AlarmStateTransitionedEvent> {
@@ -44,59 +46,127 @@ public class VerticaAlarmRepo extends VerticaRepo implements Repo<AlarmStateTran
private final Environment environment;
private static final String SQL_INSERT_INTO_ALARM_HISTORY =
"insert into MonAlarms.StateHistory (tenant_id, alarm_id, old_state, new_state, reason, reason_data, time_stamp) values (:tenant_id, :alarm_id, :old_state, :new_state, :reason, :reason_data, :time_stamp)";
"insert into MonAlarms.StateHistory (tenant_id, alarm_id, metrics, old_state, new_state, sub_alarms, reason, reason_data, time_stamp) "
+ "values (:tenant_id, :alarm_id, :metrics, :old_state, :new_state, :sub_alarms, :reason, :reason_data, :time_stamp)";
private PreparedBatch batch;
private final Timer commitTimer;
private final SimpleDateFormat simpleDateFormat;
private int msgCnt = 0;
private ObjectMapper objectMapper = new ObjectMapper();
@Inject
public VerticaAlarmRepo(DBI dbi, PersisterConfig configuration, Environment environment) throws NoSuchAlgorithmException, SQLException {
public VerticaAlarmRepo(
DBI dbi,
PersisterConfig configuration,
Environment environment) throws NoSuchAlgorithmException, SQLException {
super(dbi);
logger.debug("Instantiating: " + this);
logger.debug("Instantiating " + this.getClass().getName());
this.environment = environment;
this.commitTimer =
this.environment.metrics().timer(this.getClass().getName() + "." + "commits-timer");
this.environment.metrics().timer(this.getClass().getName() + "." + "commit-timer");
logger.debug("preparing batches...");
handle.getConnection().setAutoCommit(false);
batch = handle.prepareBatch(SQL_INSERT_INTO_ALARM_HISTORY);
handle.begin();
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT-0"));
logger.debug(this.getClass().getName() + " is fully instantiated");
}
public void addToBatch(AlarmStateTransitionedEvent message) {
String timeStamp = simpleDateFormat.format(new Date(message.timestamp * 1000));
batch.add().bind(0, message.tenantId).bind(1, message.alarmId).bind(2, message.oldState.name())
.bind(3, message.newState.name()).bind(4, message.stateChangeReason).bind(5, "{}")
.bind(6, timeStamp);
public void addToBatch(AlarmStateTransitionedEvent message, String id) {
String metricsString = getSerializedString(message.metrics, id);
String subAlarmsString = getSerializedString(message.subAlarms, id);
String timeStamp = simpleDateFormat.format(new Date(message.timestamp));
batch.add()
.bind("tenant_id", message.tenantId)
.bind("alarm_id", message.alarmId)
.bind("metrics", metricsString)
.bind("old_state", message.oldState.name())
.bind("new_state", message.newState.name())
.bind("sub_alarms", subAlarmsString)
.bind("reason", message.stateChangeReason)
.bind("reason_data", "{}")
.bind("time_stamp", timeStamp);
}
private String getSerializedString(Object o, String id) {
try {
return this.objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
logger.error("[[}]: failed to serialize object {}", id, o, e);
return "";
}
}
public int flush(String id) {
try {
commitBatch();
commitBatch(id);
int flushCnt = msgCnt;
this.msgCnt = 0;
return flushCnt;
} catch (Exception e) {
logger.error("Failed to write alarms to database", e);
logger.error("[{}]: failed to write alarms to database", id, e);
if (handle.isInTransaction()) {
handle.rollback();
}
handle.begin();
return this.msgCnt = 0;
}
// Todo. implement cnt.
return 0;
}
private void commitBatch() {
private void commitBatch(String id) {
long startTime = System.currentTimeMillis();
Timer.Context context = commitTimer.time();
batch.execute();
handle.commit();
handle.begin();
context.stop();
long endTime = System.currentTimeMillis();
logger.debug("Commiting batch took " + (endTime - startTime) / 1000 + " seconds");
logger.debug("[{}]: committing batch took {} ms", id, endTime - startTime);
}
}

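Note: VerticaAlarmRepo now binds by name rather than by position, which keeps the bindings legible against the widened column list, and it formats timestamps with millisecond precision in GMT. A minimal JDBI v2 sketch of that style; the values bound here are placeholders:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;

public class NamedBindExample {

  public void writeOne(Handle handle) {
    SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    fmt.setTimeZone(TimeZone.getTimeZone("GMT-0"));

    PreparedBatch batch = handle.prepareBatch(
        "insert into MonAlarms.StateHistory (tenant_id, alarm_id, time_stamp) "
            + "values (:tenant_id, :alarm_id, :time_stamp)");

    // Named bindings match the :placeholders by name, independent of order.
    batch.add()
        .bind("tenant_id", "abc")
        .bind("alarm_id", "alarm-1")
        .bind("time_stamp", fmt.format(new Date()));

    batch.execute();
  }
}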
View File

@@ -17,17 +17,13 @@
package monasca.persister.repository.vertica;
import monasca.common.model.metric.Metric;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.pipeline.event.MetricHandler;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.codec.digest.DigestUtils;
import org.skife.jdbi.v2.DBI;
@@ -41,7 +37,6 @@ import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeMap;
@@ -49,6 +44,9 @@ import java.util.TreeMap;
import javax.inject.Inject;
import io.dropwizard.setup.Environment;
import monasca.common.model.metric.Metric;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.Repo;
public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelope> {
@@ -72,8 +70,13 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
private final Set<Sha1HashId> dimensionIdSet = new HashSet<>();
private final Set<Sha1HashId> definitionDimensionsIdSet = new HashSet<>();
private int measurementCnt = 0;
private final ObjectMapper objectMapper = new ObjectMapper();
private static final String SQL_INSERT_INTO_METRICS =
"insert into MonMetrics.measurements (definition_dimensions_id, time_stamp, value) values (:definition_dimension_id, :time_stamp, :value)";
"insert into MonMetrics.measurements (definition_dimensions_id, time_stamp, value, value_meta) "
+ "values (:definition_dimension_id, :time_stamp, :value, :value_meta)";
private static final String DEFINITIONS_TEMP_STAGING_TABLE = "(" + " id BINARY(20) NOT NULL,"
+ " name VARCHAR(255) NOT NULL," + " tenant_id VARCHAR(255) NOT NULL,"
@@ -100,13 +103,8 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
private final String dimensionsTempStagingTableInsertStmt;
private final String definitionDimensionsTempStagingTableInsertStmt;
private final Timer commitTimer;
private final Counter metricCounter;
private final Counter definitionCounter;
private final Counter dimensionCounter;
private final Counter definitionDimensionsCounter;
private final Timer flushTimer;
public final Meter measurementMeter;
public final Meter definitionCacheMissMeter;
public final Meter dimensionCacheMissMeter;
@@ -116,60 +114,62 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
public final Meter definitionDimensionCacheHitMeter;
@Inject
public VerticaMetricRepo(DBI dbi, PersisterConfig configuration,
Environment environment) throws NoSuchAlgorithmException, SQLException {
public VerticaMetricRepo(
DBI dbi,
PersisterConfig configuration,
Environment environment) throws NoSuchAlgorithmException, SQLException {
super(dbi);
logger.debug("Instantiating: " + this.getClass().getName());
logger.debug("Instantiating " + this.getClass().getName());
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT-0"));
this.environment = environment;
final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), new Random().nextInt());
this.metricCounter =
environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter");
this.definitionCounter =
environment.metrics()
.counter(handlerName + "." + "metric-definitions-added-to-batch-counter");
this.dimensionCounter =
environment.metrics()
.counter(handlerName + "." + "metric-dimensions-added-to-batch-counter");
this.definitionDimensionsCounter =
environment.metrics()
.counter(handlerName + "." + "metric-definition-dimensions-added-to-batch-counter");
this.flushTimer =
this.environment.metrics().timer(this.getClass().getName() + "." + "flush-timer");
this.commitTimer =
this.environment.metrics().timer(this.getClass().getName() + "." + "commit-timer");
this.measurementMeter =
this.environment.metrics().meter(this.getClass().getName() + "." + "measurement-meter");
this.environment.metrics()
.meter(this.getClass().getName() + "." + "measurement-meter");
this.definitionCacheMissMeter =
this.environment.metrics().meter(
this.getClass().getName() + "." + "definition-cache-miss-meter");
this.environment.metrics()
.meter(this.getClass().getName() + "." + "definition-cache-miss-meter");
this.dimensionCacheMissMeter =
this.environment.metrics().meter(
this.getClass().getName() + "." + "dimension-cache-miss-meter");
this.environment.metrics()
.meter(this.getClass().getName() + "." + "dimension-cache-miss-meter");
this.definitionDimensionCacheMissMeter =
this.environment.metrics().meter(
this.getClass().getName() + "." + "definition-dimension-cache-miss-meter");
this.environment.metrics()
.meter(this.getClass().getName() + "." + "definition-dimension-cache-miss-meter");
this.definitionCacheHitMeter =
this.environment.metrics().meter(
this.getClass().getName() + "." + "definition-cache-hit-meter");
this.environment.metrics()
.meter(this.getClass().getName() + "." + "definition-cache-hit-meter");
this.dimensionCacheHitMeter =
this.environment.metrics().meter(
this.getClass().getName() + "." + "dimension-cache-hit-meter");
this.environment.metrics()
.meter(this.getClass().getName() + "." + "dimension-cache-hit-meter");
this.definitionDimensionCacheHitMeter =
this.environment.metrics().meter(
this.getClass().getName() + "." + "definition-dimension-cache-hit-meter");
this.environment.metrics()
.meter(this.getClass().getName() + "." + "definition-dimension-cache-hit-meter");
definitionsIdCache =
CacheBuilder.newBuilder()
.maximumSize(configuration.getVerticaMetricRepoConfig().getMaxCacheSize())
.build();
dimensionsIdCache =
CacheBuilder.newBuilder()
.maximumSize(configuration.getVerticaMetricRepoConfig().getMaxCacheSize())
.build();
definitionDimensionsIdCache =
CacheBuilder.newBuilder()
.maximumSize(configuration.getVerticaMetricRepoConfig().getMaxCacheSize())
@@ -239,97 +239,138 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
logger.debug("completed database preparations");
logger.debug(this.getClass().getName() + "is fully instantiated");
logger.debug(this.getClass().getName() + " is fully instantiated");
}
@Override
public void addToBatch(MetricEnvelope metricEnvelope) {
public void addToBatch(MetricEnvelope metricEnvelope, String id) {
Metric metric = metricEnvelope.metric;
Map<String, Object> meta = metricEnvelope.meta;
logger.debug("metric: {}", metric);
logger.debug("meta: {}", meta);
String tenantId = getMeta(TENANT_ID, metric, meta, id);
String tenantId = "";
if (meta.containsKey(TENANT_ID)) {
tenantId = (String) meta.get(TENANT_ID);
} else {
logger.warn(
"Failed to find tenantId in message envelope meta data. Metric message may be malformed"
+ ". Setting tenantId to empty string.");
logger.warn("metric: {}", metric.toString());
logger.warn("meta: {}", meta.toString());
}
String region = "";
if (meta.containsKey(REGION)) {
region = (String) meta.get(REGION);
} else {
logger.warn(
"Failed to find region in message envelope meta data. Metric message may be malformed. "
+ "Setting region to empty string.");
logger.warn("metric: {}", metric.toString());
logger.warn("meta: {}", meta.toString());
}
String region = getMeta(REGION, metric, meta, id);
// Add the definition to the batch.
StringBuilder
definitionIdStringToHash =
new StringBuilder(trunc(metric.getName(), MAX_COLUMN_LENGTH));
definitionIdStringToHash.append(trunc(tenantId, MAX_COLUMN_LENGTH));
definitionIdStringToHash.append(trunc(region, MAX_COLUMN_LENGTH));
StringBuilder definitionIdStringToHash =
new StringBuilder(trunc(metric.getName(), MAX_COLUMN_LENGTH, id));
definitionIdStringToHash.append(trunc(tenantId, MAX_COLUMN_LENGTH, id));
definitionIdStringToHash.append(trunc(region, MAX_COLUMN_LENGTH, id));
byte[] definitionIdSha1Hash = DigestUtils.sha(definitionIdStringToHash.toString());
Sha1HashId definitionSha1HashId = new Sha1HashId((definitionIdSha1Hash));
this.addDefinitionToBatch(definitionSha1HashId, trunc(metric.getName(), MAX_COLUMN_LENGTH),
trunc(tenantId, MAX_COLUMN_LENGTH), trunc(region, MAX_COLUMN_LENGTH));
definitionCounter.inc();
addDefinitionToBatch(definitionSha1HashId, trunc(metric.getName(), MAX_COLUMN_LENGTH, id),
trunc(tenantId, MAX_COLUMN_LENGTH, id),
trunc(region, MAX_COLUMN_LENGTH, id), id);
// Calculate dimensions sha1 hash id.
StringBuilder dimensionIdStringToHash = new StringBuilder();
Map<String, String> preppedDimMap = prepDimensions(metric.getDimensions());
Map<String, String> preppedDimMap = prepDimensions(metric.getDimensions(), id);
for (Map.Entry<String, String> entry : preppedDimMap.entrySet()) {
dimensionIdStringToHash.append(entry.getKey());
dimensionIdStringToHash.append(entry.getValue());
}
byte[] dimensionIdSha1Hash = DigestUtils.sha(dimensionIdStringToHash.toString());
Sha1HashId dimensionsSha1HashId = new Sha1HashId(dimensionIdSha1Hash);
// Add the dimension name/values to the batch.
this.addDimensionsToBatch(dimensionsSha1HashId, preppedDimMap);
addDimensionsToBatch(dimensionsSha1HashId, preppedDimMap, id);
// Add the definition dimensions to the batch.
StringBuilder
definitionDimensionsIdStringToHash =
StringBuilder definitionDimensionsIdStringToHash =
new StringBuilder(definitionSha1HashId.toHexString());
definitionDimensionsIdStringToHash.append(dimensionsSha1HashId.toHexString());
byte[]
definitionDimensionsIdSha1Hash =
byte[] definitionDimensionsIdSha1Hash =
DigestUtils.sha(definitionDimensionsIdStringToHash.toString());
Sha1HashId definitionDimensionsSha1HashId = new Sha1HashId(definitionDimensionsIdSha1Hash);
this.addDefinitionDimensionToBatch(definitionDimensionsSha1HashId, definitionSha1HashId,
dimensionsSha1HashId);
definitionDimensionsCounter.inc();
dimensionsSha1HashId, id);
// Add the measurement to the batch.
String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp()));
double value = metric.getValue();
this.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value, metric.getValueMeta());
this.metricCounter.inc();
double value = metric.getValue();
addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value, metric.getValueMeta(), id);
}
private String getMeta(String name, Metric metric, Map<String, Object> meta, String id) {
if (meta.containsKey(name)) {
return (String) meta.get(name);
} else {
logger.warn(
"[{}]: failed to find {} in message envelope meta data. metric message may be malformed. "
+ "setting {} to empty string.", id, name);
logger.warn("[{}]: metric: {}", id, metric.toString());
logger.warn("[{}]: meta: {}", id, meta.toString());
return "";
}
}
public void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value,
Map<String, String> valueMeta) {
// TODO: Actually handle valueMeta
logger.debug("Adding metric to batch: defDimsId: {}, time: {}, value: {}",
defDimsId.toHexString(), timeStamp, value);
metricsBatch.add().bind("definition_dimension_id", defDimsId.getSha1Hash())
.bind("time_stamp", timeStamp).bind("value", value);
Map<String, String> valueMeta, String id) {
String valueMetaString = getValueMetaString(valueMeta, id);
logger.debug("[{}]: adding metric to batch: defDimsId: {}, time: {}, value: {}, value meta {}",
id, defDimsId.toHexString(), timeStamp, value, valueMetaString);
metricsBatch.add()
.bind("definition_dimension_id", defDimsId.getSha1Hash())
.bind("time_stamp", timeStamp)
.bind("value", value)
.bind("value_meta", valueMetaString);
this.measurementCnt++;
measurementMeter.mark();
}
private void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region) {
private String getValueMetaString(Map<String, String> valueMeta, String id) {
String valueMetaString = "";
if (valueMeta != null && !valueMeta.isEmpty()) {
try {
valueMetaString = this.objectMapper.writeValueAsString(valueMeta);
} catch (JsonProcessingException e) {
logger.error("[{}]: failed to serialize value meta {}, dropping value meta from measurement",
id, valueMeta, e);
}
}
return valueMetaString;
}
private void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region, String id) {
if (definitionsIdCache.getIfPresent(defId) == null) {
@@ -337,11 +378,17 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
if (!definitionIdSet.contains(defId)) {
logger.debug("Adding definition to batch: defId: {}, name: {}, tenantId: {}, region: {}",
defId.toHexString(), name, tenantId, region);
stagedDefinitionsBatch.add().bind("id", defId.getSha1Hash()).bind("name", name)
.bind("tenant_id", tenantId).bind("region", region);
logger.debug("[{}]: adding definition to batch: defId: {}, name: {}, tenantId: {}, region: {}",
id, defId.toHexString(), name, tenantId, region);
stagedDefinitionsBatch.add()
.bind("id", defId.getSha1Hash())
.bind("name", name)
.bind("tenant_id", tenantId)
.bind("region", region);
definitionIdSet.add(defId);
}
} else {
@@ -351,7 +398,7 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
}
}
private void addDimensionsToBatch(Sha1HashId dimSetId, Map<String, String> dimMap) {
private void addDimensionsToBatch(Sha1HashId dimSetId, Map<String, String> dimMap, String id) {
if (dimensionsIdCache.getIfPresent(dimSetId) == null) {
@@ -364,10 +411,14 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
String name = entry.getKey();
String value = entry.getValue();
logger.debug("Adding dimension to batch: dimSetId: {}, name: {}, value: {}", dimSetId.toHexString(), name, value);
logger.debug(
"[{}]: adding dimension to batch: dimSetId: {}, name: {}, value: {}",
id, dimSetId.toHexString(), name, value);
stagedDimensionsBatch.add().bind("dimension_set_id", dimSetId.getSha1Hash())
.bind("name", name).bind("value", value);
stagedDimensionsBatch.add()
.bind("dimension_set_id", dimSetId.getSha1Hash())
.bind("name", name)
.bind("value", value);
}
dimensionIdSet.add(dimSetId);
@@ -381,7 +432,7 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
}
private void addDefinitionDimensionToBatch(Sha1HashId defDimsId, Sha1HashId defId,
Sha1HashId dimId) {
Sha1HashId dimId, String id) {
if (definitionDimensionsIdCache.getIfPresent(defDimsId) == null) {
@@ -389,9 +440,11 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
if (!definitionDimensionsIdSet.contains(defDimsId)) {
logger.debug("Adding definitionDimension to batch: defDimsId: {}, defId: {}, dimId: {}",
defDimsId.toHexString(), defId, dimId);
stagedDefinitionDimensionsBatch.add().bind("id", defDimsId.getSha1Hash())
logger.debug("[{}]: adding definitionDimension to batch: defDimsId: {}, defId: {}, dimId: {}",
id, defDimsId.toHexString(), defId, dimId);
stagedDefinitionDimensionsBatch.add()
.bind("id", defDimsId.getSha1Hash())
.bind("definition_id", defId.getSha1Hash())
.bind("dimension_set_id", dimId.getSha1Hash());
@@ -407,49 +460,82 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
@Override
public int flush(String id) {
try {
long startTime = System.currentTimeMillis();
Timer.Context context = flushTimer.time();
executeBatches();
writeRowsFromTempStagingTablesToPermTables();
handle.commit();
handle.begin();
long endTime = System.currentTimeMillis();
context.stop();
logger.debug("Writing measurements, definitions, and dimensions to database took "
+ (endTime - startTime) / 1000 + " seconds");
updateIdCaches();
} catch (Exception e) {
logger.error("Failed to write measurements, definitions, or dimensions to database", e);
if (handle.isInTransaction()) {
handle.rollback();
}
clearTempCaches();
handle.begin();
}
// Todo. implement cnt.
return 0;
try {
long startTime = System.currentTimeMillis();
Timer.Context context = commitTimer.time();
executeBatches();
writeRowsFromTempStagingTablesToPermTables();
handle.commit();
handle.begin();
long endTime = System.currentTimeMillis();
context.stop();
logger.debug("[{}]: writing measurements, definitions, and dimensions to vertica took {} ms",
id, endTime - startTime);
updateIdCaches();
int commitCnt = this.measurementCnt;
this.measurementCnt = 0;
return commitCnt;
} catch (Exception e) {
logger.error("[{}]: failed to write measurements, definitions, or dimensions to vertica",
id, e);
if (handle.isInTransaction()) {
handle.rollback();
}
clearTempCaches();
handle.begin();
return this.measurementCnt = 0;
}
}
private void executeBatches() {
metricsBatch.execute();
stagedDefinitionsBatch.execute();
stagedDimensionsBatch.execute();
stagedDefinitionDimensionsBatch.execute();
}
private void updateIdCaches() {
for (Sha1HashId defId : definitionIdSet) {
definitionsIdCache.put(defId, defId);
}
for (Sha1HashId dimId : dimensionIdSet) {
dimensionsIdCache.put(dimId, dimId);
}
for (Sha1HashId defDimsId : definitionDimensionsIdSet) {
definitionDimensionsIdCache.put(defDimsId, defDimsId);
}
@@ -457,6 +543,7 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
}
private void writeRowsFromTempStagingTablesToPermTables() {
handle.execute(definitionsTempStagingTableInsertStmt);
handle.execute("truncate table " + definitionsTempStagingTableName);
handle.execute(dimensionsTempStagingTableInsertStmt);
@@ -466,40 +553,58 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
}
private void clearTempCaches() {
definitionIdSet.clear();
dimensionIdSet.clear();
definitionDimensionsIdSet.clear();
}
private Map<String, String> prepDimensions(Map<String, String> dimMap) {
private Map<String, String> prepDimensions(Map<String, String> dimMap, String id) {
Map<String, String> newDimMap = new TreeMap<>();
if (dimMap != null) {
for (String dimName : dimMap.keySet()) {
if (dimName != null && !dimName.isEmpty()) {
String dimValue = dimMap.get(dimName);
if (dimValue != null && !dimValue.isEmpty()) {
newDimMap.put(trunc(dimName, MAX_COLUMN_LENGTH), trunc(dimValue, MAX_COLUMN_LENGTH));
dimensionCounter.inc();
newDimMap.put(trunc(dimName, MAX_COLUMN_LENGTH, id),
trunc(dimValue, MAX_COLUMN_LENGTH, id));
}
}
}
}
return newDimMap;
}
private String trunc(String s, int l) {
private String trunc(String s, int l, String id) {
if (s == null) {
return "";
} else if (s.length() <= l) {
return s;
} else {
String r = s.substring(0, l);
logger.warn("Input string exceeded max column length. Truncating input string {} to {} chars",
s, l);
logger.warn("Resulting string {}", r);
logger.warn( "[{}]: input string exceeded max column length. truncating input string {} to {} chars",
id, s, l);
logger.warn("[{}]: resulting string {}", id, r);
return r;
}
}
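Note: trunc() now threads the caller's id into its warnings as well. The truncation rule itself is small; a standalone sketch with expected results in comments:

public final class TruncExample {

  // Null maps to "", short strings pass through, and long strings are cut
  // to the column limit (the repo also logs a warning in that case).
  static String trunc(String s, int maxLength) {
    if (s == null) {
      return "";
    }
    return s.length() <= maxLength ? s : s.substring(0, maxLength);
  }

  public static void main(String[] args) {
    System.out.println(trunc(null, 5));       // ""
    System.out.println(trunc("abc", 5));      // "abc"
    System.out.println(trunc("abcdefgh", 5)); // "abcde"
  }
}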