Cleanup code
Refactor code. Add final. Change-Id: Ifaf681371adb9a7eeb6710d6a1bf569c53503739
This commit is contained in:
parent
58b214c54c
commit
d7876668a3
@ -43,26 +43,27 @@ import java.util.concurrent.TimeUnit;
|
||||
import io.dropwizard.setup.Environment;
|
||||
import monasca.persister.configuration.MonPersisterConfiguration;
|
||||
|
||||
public class InfluxDBMetricRepository extends InfluxRepository implements MetricRepository {
|
||||
public final class InfluxDBMetricRepository extends InfluxRepository implements MetricRepository {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(InfluxDBMetricRepository.class);
|
||||
private static final int NUMBER_COLUMNS = 2;
|
||||
|
||||
private final Map<Sha1HashId, Definition> definitionMap = new HashMap<>();
|
||||
private final Map<Sha1HashId, Set<Dimension>> dimensionMap = new HashMap<>();
|
||||
private final Map<Sha1HashId, DefinitionDimension> definitionDimensionMap = new HashMap<>();
|
||||
private final Map<Sha1HashId, Def> defMap = new HashMap<>();
|
||||
private final Map<Sha1HashId, Set<Dim>> dimMap = new HashMap<>();
|
||||
private final Map<Sha1HashId, DefDim> defDimMap = new HashMap<>();
|
||||
private final Map<Sha1HashId, List<Measurement>> measurementMap = new HashMap<>();
|
||||
|
||||
private final com.codahale.metrics.Timer flushTimer;
|
||||
public final Meter measurementMeter;
|
||||
|
||||
private final SimpleDateFormat measurementTimeStampSimpleDateFormat = new
|
||||
private final SimpleDateFormat measurementTimeSimpleDateFormat = new
|
||||
SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");
|
||||
private static final Sha1HashId BLANK_SHA_1_HASH_ID = new Sha1HashId(DigestUtils.sha(""));
|
||||
private static final Set<Dimension> EMPTY_DIMENSION_TREE_SET = new TreeSet();
|
||||
private static final Set<Dim> EMPTY_DIM_TREE_SET = new TreeSet();
|
||||
|
||||
@Inject
|
||||
public InfluxDBMetricRepository(MonPersisterConfiguration configuration,
|
||||
Environment environment) {
|
||||
public InfluxDBMetricRepository(final MonPersisterConfiguration configuration,
|
||||
final Environment environment) {
|
||||
super(configuration, environment);
|
||||
this.flushTimer = this.environment.metrics().timer(this.getClass().getName() + "." +
|
||||
"flush-timer");
|
||||
@ -71,153 +72,166 @@ public class InfluxDBMetricRepository extends InfluxRepository implements Metric
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value) {
|
||||
Measurement m = new Measurement(defDimsId, timeStamp, value);
|
||||
public void addMetricToBatch(final Sha1HashId defDimsId, final String timeStamp,
|
||||
final double value) {
|
||||
final Measurement measurement = new Measurement(defDimsId, timeStamp, value);
|
||||
List<Measurement> measurementList = this.measurementMap.get(defDimsId);
|
||||
if (measurementList == null) {
|
||||
measurementList = new LinkedList<>();
|
||||
this.measurementMap.put(defDimsId, measurementList);
|
||||
}
|
||||
measurementList.add(m);
|
||||
measurementList.add(measurement);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region) {
|
||||
Definition d = new Definition(defId, name, tenantId, region);
|
||||
definitionMap.put(defId, d);
|
||||
public void addDefinitionToBatch(final Sha1HashId defId, final String name, final String tenantId,
|
||||
final String region) {
|
||||
final Def def = new Def(defId, name, tenantId, region);
|
||||
defMap.put(defId, def);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addDimensionToBatch(Sha1HashId dimSetId, String name, String value) {
|
||||
Set<Dimension> dimensionSet = dimensionMap.get(dimSetId);
|
||||
if (dimensionSet == null) {
|
||||
dimensionSet = new TreeSet<Dimension>();
|
||||
dimensionMap.put(dimSetId, dimensionSet);
|
||||
public void addDimensionToBatch(final Sha1HashId dimSetId, final String name,
|
||||
final String value) {
|
||||
Set<Dim> dimSet = dimMap.get(dimSetId);
|
||||
if (dimSet == null) {
|
||||
dimSet = new TreeSet<>();
|
||||
dimMap.put(dimSetId, dimSet);
|
||||
}
|
||||
|
||||
Dimension d = new Dimension(dimSetId, name, value);
|
||||
dimensionSet.add(d);
|
||||
final Dim dim = new Dim(dimSetId, name, value);
|
||||
dimSet.add(dim);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addDefinitionDimensionToBatch(Sha1HashId defDimsId, Sha1HashId defId,
|
||||
public void addDefinitionDimensionToBatch(final Sha1HashId defDimsId, final Sha1HashId defId,
|
||||
Sha1HashId dimId) {
|
||||
DefinitionDimension dd = new DefinitionDimension(defDimsId, defId, dimId);
|
||||
definitionDimensionMap.put(defDimsId, dd);
|
||||
final DefDim defDim = new DefDim(defDimsId, defId, dimId);
|
||||
defDimMap.put(defDimsId, defDim);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
|
||||
try {
|
||||
long startTime = System.currentTimeMillis();
|
||||
Timer.Context context = flushTimer.time();
|
||||
Serie[] series = getSeries();
|
||||
final long startTime = System.currentTimeMillis();
|
||||
final Timer.Context context = flushTimer.time();
|
||||
this.influxDB.write(this.configuration.getInfluxDBConfiguration().getName(),
|
||||
TimeUnit.SECONDS, series);
|
||||
long endTime = System.currentTimeMillis();
|
||||
TimeUnit.SECONDS, getSeries());
|
||||
final long endTime = System.currentTimeMillis();
|
||||
context.stop();
|
||||
logger.debug("Writing measurements, definitions, and dimensions to database took {} seconds",
|
||||
logger.debug("Writing measurements, definitions, and dimensions to InfluxDB took {} seconds",
|
||||
(endTime - startTime) / 1000);
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to write measurements to database", e);
|
||||
logger.error("Failed to write measurements to InfluxDB", e);
|
||||
}
|
||||
clearBuffers();
|
||||
}
|
||||
|
||||
private String buildSerieName(Definition definition, Set<Dimension> dimensionList)
|
||||
private String buildSerieName(final Def def, final Set<Dim> dimList)
|
||||
throws UnsupportedEncodingException {
|
||||
|
||||
logger.debug("Creating serie name");
|
||||
|
||||
StringBuilder serieNameBuilder = new StringBuilder();
|
||||
final StringBuilder serieNameBuilder = new StringBuilder();
|
||||
|
||||
logger.debug("Adding name to serie name: {}", definition.name);
|
||||
serieNameBuilder.append(urlEncodeUTF8(definition.name));
|
||||
logger.debug("Adding name to serie name: {}", def.name);
|
||||
serieNameBuilder.append(urlEncodeUTF8(def.name));
|
||||
serieNameBuilder.append("?");
|
||||
|
||||
logger.debug("Adding tenant_id to serie name: {}", definition.tenantId);
|
||||
serieNameBuilder.append(urlEncodeUTF8(definition.tenantId));
|
||||
logger.debug("Adding tenant_id to serie name: {}", def.tenantId);
|
||||
serieNameBuilder.append(urlEncodeUTF8(def.tenantId));
|
||||
serieNameBuilder.append("&");
|
||||
|
||||
logger.debug("Adding region to serie name: {}", definition.region);
|
||||
serieNameBuilder.append(urlEncodeUTF8(definition.region));
|
||||
logger.debug("Adding region to serie name: {}", def.region);
|
||||
serieNameBuilder.append(urlEncodeUTF8(def.region));
|
||||
|
||||
for (Dimension dimension : dimensionList) {
|
||||
for (final Dim dim : dimList) {
|
||||
serieNameBuilder.append("&");
|
||||
logger.debug("Adding dimension name to serie name: {}", dimension.name);
|
||||
serieNameBuilder.append(urlEncodeUTF8(dimension.name));
|
||||
|
||||
logger.debug("Adding dimension name to serie name: {}", dim.name);
|
||||
serieNameBuilder.append(urlEncodeUTF8(dim.name));
|
||||
serieNameBuilder.append("=");
|
||||
|
||||
logger.debug("Adding dimension value to serie name: {}", dimension.value);
|
||||
serieNameBuilder.append(urlEncodeUTF8(dimension.value));
|
||||
logger.debug("Adding dimension value to serie name: {}", dim.value);
|
||||
serieNameBuilder.append(urlEncodeUTF8(dim.value));
|
||||
}
|
||||
|
||||
String serieName = serieNameBuilder.toString();
|
||||
final String serieName = serieNameBuilder.toString();
|
||||
logger.debug("Created serie name: {}", serieName);
|
||||
|
||||
return serieName;
|
||||
}
|
||||
|
||||
private String urlEncodeUTF8(String s) throws UnsupportedEncodingException {
|
||||
private Def getDef(final Sha1HashId defId) throws Exception {
|
||||
|
||||
final Def def = this.defMap.get(defId);
|
||||
if (def == null) {
|
||||
throw new Exception("Failed to find definition for defId: " + defId);
|
||||
}
|
||||
|
||||
return def;
|
||||
}
|
||||
|
||||
private Set<Dim> getDimSet(final Sha1HashId dimId) throws Exception {
|
||||
|
||||
// If there were no dimensions, then "" was used in the hash id and nothing was
|
||||
// ever added to the dimension map for this dimension set.
|
||||
if (dimId.equals(BLANK_SHA_1_HASH_ID)) {
|
||||
return EMPTY_DIM_TREE_SET;
|
||||
}
|
||||
|
||||
final Set<Dim> dimSet = this.dimMap.get(dimId);
|
||||
|
||||
if (dimSet == null) {
|
||||
throw new Exception("Failed to find dimension set for dimId: " + dimId);
|
||||
}
|
||||
|
||||
return dimSet;
|
||||
}
|
||||
|
||||
private String urlEncodeUTF8(final String s) throws UnsupportedEncodingException {
|
||||
return URLEncoder.encode(s, "UTF-8");
|
||||
}
|
||||
|
||||
private Serie[] getSeries() throws
|
||||
Exception {
|
||||
private String[] buildColNamesStringArry() {
|
||||
|
||||
List<Serie> serieList = new LinkedList<>();
|
||||
final String[] colNameStringArry = new String[NUMBER_COLUMNS];
|
||||
|
||||
for (Sha1HashId defdimsId : this.measurementMap.keySet()) {
|
||||
colNameStringArry[0] = "time";
|
||||
logger.debug("Added column name[{}] = {}", 0, colNameStringArry[0]);
|
||||
|
||||
DefinitionDimension definitionDimension = this.definitionDimensionMap.get(defdimsId);
|
||||
colNameStringArry[1] = "value";
|
||||
logger.debug("Added column name[{}] = {}", 1, colNameStringArry[1]);
|
||||
|
||||
Definition definition = definitionMap.get(definitionDimension.defId);
|
||||
if (definition == null) {
|
||||
throw new Exception("Failed to find Definition for defId: " + definitionDimension.defId);
|
||||
}
|
||||
if (logger.isDebugEnabled()) {
|
||||
logColumnNames(colNameStringArry);
|
||||
}
|
||||
|
||||
Set<Dimension> dimensionSet;
|
||||
// If there were no dimensions, then "" was used in the hash id and nothing was
|
||||
// added for dimensions.
|
||||
if (definitionDimension.dimId.equals(BLANK_SHA_1_HASH_ID)) {
|
||||
dimensionSet = EMPTY_DIMENSION_TREE_SET;
|
||||
} else {
|
||||
dimensionSet = this.dimensionMap.get(definitionDimension.dimId);
|
||||
}
|
||||
return colNameStringArry;
|
||||
}
|
||||
|
||||
String serieName = buildSerieName(definition, dimensionSet);
|
||||
Builder builder = new Serie.Builder(serieName);
|
||||
logger.debug("Created serie: {}", serieName);
|
||||
private Serie[] getSeries() throws Exception {
|
||||
|
||||
String[] colNameStringArry = new String[2];
|
||||
final List<Serie> serieList = new LinkedList<>();
|
||||
|
||||
colNameStringArry[0] = "time";
|
||||
logger.debug("Added column name[{}]: time", 0);
|
||||
for (final Sha1HashId defDimId : this.measurementMap.keySet()) {
|
||||
|
||||
colNameStringArry[1] = "value";
|
||||
logger.debug("Added column name[{}]: value", 1);
|
||||
final DefDim defDim = this.defDimMap.get(defDimId);
|
||||
final Def def = getDef(defDim.defId);
|
||||
final Set<Dim> dimSet = getDimSet(defDim.dimId);
|
||||
final Builder builder = new Serie.Builder(buildSerieName(def, dimSet));
|
||||
|
||||
builder.columns(colNameStringArry);
|
||||
builder.columns(buildColNamesStringArry());
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logColumnNames(colNameStringArry);
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
for (Measurement measurement : this.measurementMap.get(defdimsId)) {
|
||||
Object[] colValsObjArry = new Object[2];
|
||||
Date date = measurementTimeStampSimpleDateFormat.parse(measurement.timeStamp + " UTC");
|
||||
Long time = date.getTime() / 1000;
|
||||
for (final Measurement measurement : this.measurementMap.get(defDimId)) {
|
||||
final Object[] colValsObjArry = new Object[NUMBER_COLUMNS];
|
||||
final Date date = measurementTimeSimpleDateFormat.parse(measurement.time + " UTC");
|
||||
final Long time = date.getTime() / 1000;
|
||||
colValsObjArry[0] = time;
|
||||
logger.debug("Added column value[{}][{}]: {}", i, 0, time);
|
||||
logger.debug("Added column value to colValsObjArry[{}] = {}", 0, colValsObjArry[0]);
|
||||
colValsObjArry[1] = measurement.value;
|
||||
logger.debug("Added column value[{}][{}]: {}", i, 1, measurement.value);
|
||||
logger.debug("Added column value to colValsObjArry[{}] = {}", 1, colValsObjArry[1]);
|
||||
builder.values(colValsObjArry);
|
||||
measurementMeter.mark();
|
||||
i++;
|
||||
}
|
||||
|
||||
final Serie serie = builder.build();
|
||||
@ -237,38 +251,39 @@ public class InfluxDBMetricRepository extends InfluxRepository implements Metric
|
||||
private void clearBuffers() {
|
||||
|
||||
this.measurementMap.clear();
|
||||
this.definitionMap.clear();
|
||||
this.dimensionMap.clear();
|
||||
this.definitionDimensionMap.clear();
|
||||
this.defMap.clear();
|
||||
this.dimMap.clear();
|
||||
this.defDimMap.clear();
|
||||
}
|
||||
|
||||
private static final class Measurement {
|
||||
|
||||
Sha1HashId defDimsId;
|
||||
String timeStamp;
|
||||
double value;
|
||||
final Sha1HashId defDimsId;
|
||||
final String time;
|
||||
final double value;
|
||||
|
||||
private Measurement(Sha1HashId defDimsId, String timeStamp, double value) {
|
||||
private Measurement(final Sha1HashId defDimsId, final String time, final double value) {
|
||||
this.defDimsId = defDimsId;
|
||||
this.timeStamp = timeStamp;
|
||||
this.time = time;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Measurement{" + "defDimsId=" + defDimsId + ", timeStamp='" + timeStamp + '\'' + ", " +
|
||||
return "Measurement{" + "defDimsId=" + defDimsId + ", time='" + time + '\'' + ", " +
|
||||
"value=" + value + '}';
|
||||
}
|
||||
}
|
||||
|
||||
private static final class Definition {
|
||||
private static final class Def {
|
||||
|
||||
Sha1HashId defId;
|
||||
String name;
|
||||
String tenantId;
|
||||
String region;
|
||||
final Sha1HashId defId;
|
||||
final String name;
|
||||
final String tenantId;
|
||||
final String region;
|
||||
|
||||
private Definition(Sha1HashId defId, String name, String tenantId, String region) {
|
||||
private Def(final Sha1HashId defId, final String name, final String tenantId,
|
||||
final String region) {
|
||||
this.defId = defId;
|
||||
this.name = name;
|
||||
this.tenantId = tenantId;
|
||||
@ -282,13 +297,13 @@ public class InfluxDBMetricRepository extends InfluxRepository implements Metric
|
||||
}
|
||||
}
|
||||
|
||||
private static final class Dimension implements Comparable<Dimension> {
|
||||
private static final class Dim implements Comparable<Dim> {
|
||||
|
||||
Sha1HashId dimSetId;
|
||||
String name;
|
||||
String value;
|
||||
final Sha1HashId dimSetId;
|
||||
final String name;
|
||||
final String value;
|
||||
|
||||
private Dimension(Sha1HashId dimSetId, String name, String value) {
|
||||
private Dim(final Sha1HashId dimSetId, final String name, final String value) {
|
||||
this.dimSetId = dimSetId;
|
||||
this.name = name;
|
||||
this.value = value;
|
||||
@ -301,19 +316,19 @@ public class InfluxDBMetricRepository extends InfluxRepository implements Metric
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Dimension o) {
|
||||
public int compareTo(Dim o) {
|
||||
int nameCmp = String.CASE_INSENSITIVE_ORDER.compare(name, o.name);
|
||||
return (nameCmp != 0 ? nameCmp : String.CASE_INSENSITIVE_ORDER.compare(value, o.value));
|
||||
}
|
||||
}
|
||||
|
||||
private static final class DefinitionDimension {
|
||||
private static final class DefDim {
|
||||
|
||||
Sha1HashId defDimId;
|
||||
Sha1HashId defId;
|
||||
Sha1HashId dimId;
|
||||
final Sha1HashId defDimId;
|
||||
final Sha1HashId defId;
|
||||
final Sha1HashId dimId;
|
||||
|
||||
private DefinitionDimension(Sha1HashId defDimId, Sha1HashId defId, Sha1HashId dimId) {
|
||||
private DefDim(final Sha1HashId defDimId, final Sha1HashId defId, final Sha1HashId dimId) {
|
||||
this.defDimId = defDimId;
|
||||
this.defId = defId;
|
||||
this.dimId = dimId;
|
||||
|
@ -203,7 +203,7 @@ public class VerticaMetricRepository extends VerticaRepository implements Metric
|
||||
|
||||
@Override
|
||||
public void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value) {
|
||||
logger.debug("Adding metric to batch: defDimsId: {}, timeStamp: {}, value: {}",
|
||||
logger.debug("Adding metric to batch: defDimsId: {}, time: {}, value: {}",
|
||||
defDimsId.toHexString(), timeStamp, value);
|
||||
metricsBatch.add().bind("definition_dimension_id", defDimsId.getSha1Hash())
|
||||
.bind("time_stamp", timeStamp).bind("value", value);
|
||||
|
Loading…
x
Reference in New Issue
Block a user