diff --git a/java/src/main/java/monasca/persister/PersisterModule.java b/java/src/main/java/monasca/persister/PersisterModule.java index 958b4d78..e232837d 100644 --- a/java/src/main/java/monasca/persister/PersisterModule.java +++ b/java/src/main/java/monasca/persister/PersisterModule.java @@ -51,15 +51,15 @@ import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactor import monasca.persister.pipeline.event.MetricHandler; import monasca.persister.pipeline.event.MetricHandlerFactory; import monasca.persister.repository.AlarmRepo; -import monasca.persister.repository.InfluxV8AlarmRepo; -import monasca.persister.repository.InfluxV8MetricRepo; -import monasca.persister.repository.InfluxV8RepoWriter; -import monasca.persister.repository.InfluxV9AlarmRepo; -import monasca.persister.repository.InfluxV9MetricRepo; -import monasca.persister.repository.InfluxV9RepoWriter; import monasca.persister.repository.MetricRepo; -import monasca.persister.repository.VerticaAlarmRepo; -import monasca.persister.repository.VerticaMetricRepo; +import monasca.persister.repository.influxdb.InfluxV8AlarmRepo; +import monasca.persister.repository.influxdb.InfluxV8MetricRepo; +import monasca.persister.repository.influxdb.InfluxV8RepoWriter; +import monasca.persister.repository.influxdb.InfluxV9AlarmRepo; +import monasca.persister.repository.influxdb.InfluxV9MetricRepo; +import monasca.persister.repository.influxdb.InfluxV9RepoWriter; +import monasca.persister.repository.vertica.VerticaAlarmRepo; +import monasca.persister.repository.vertica.VerticaMetricRepo; public class PersisterModule extends AbstractModule { diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java index aca7c77a..68de4f6f 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java @@ -45,8 +45,7 @@ public abstract class KafkaConsumer { public void start() { executorService = Executors.newFixedThreadPool(1); - KafkaConsumerRunnableBasic kafkaConsumerRunnableBasic = - createRunnable(kafkaChannel, this.threadNum); + kafkaConsumerRunnableBasic = createRunnable(kafkaChannel, this.threadNum); executorService.submit(kafkaConsumerRunnableBasic); } diff --git a/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerFactory.java b/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerFactory.java index f72078b1..c8190bc7 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerFactory.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerFactory.java @@ -20,6 +20,6 @@ package monasca.persister.consumer; import monasca.persister.pipeline.MetricPipeline; public interface KafkaMetricsConsumerFactory { - public KafkaMetricsConsumer create(KafkaChannel kafkaChannel, int threadNum, - MetricPipeline pipeline); + KafkaMetricsConsumer create(KafkaChannel kafkaChannel, int threadNum, + MetricPipeline pipeline); } diff --git a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java index de9c137d..667cff83 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java @@ -22,154 +22,51 @@ import com.google.inject.assistedinject.Assisted; import com.codahale.metrics.Counter; -import org.apache.commons.codec.digest.DigestUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Map; -import java.util.TimeZone; -import java.util.TreeMap; - import io.dropwizard.setup.Environment; -import monasca.common.model.metric.Metric; import monasca.common.model.metric.MetricEnvelope; import monasca.persister.configuration.PipelineConfig; import monasca.persister.repository.MetricRepo; -import monasca.persister.repository.Sha1HashId; - -import static monasca.persister.repository.VerticaMetricsConstants.MAX_COLUMN_LENGTH; public class MetricHandler extends FlushableHandler { - private static final Logger logger = LoggerFactory.getLogger(MetricHandler.class); - private static final String TENANT_ID = "tenantId"; - private static final String REGION = "region"; - private final int ordinal; - private final SimpleDateFormat simpleDateFormat; - private final MetricRepo metricRepo; private final Counter metricCounter; - private final Counter definitionCounter; - private final Counter dimensionCounter; - private final Counter definitionDimensionsCounter; @Inject - public MetricHandler(MetricRepo metricRepo, - @Assisted PipelineConfig configuration, Environment environment, - @Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize) { + public MetricHandler(MetricRepo metricRepo, @Assisted PipelineConfig configuration, + Environment environment, @Assisted("ordinal") int ordinal, + @Assisted("batchSize") int batchSize) { + super(configuration, environment, ordinal, batchSize, MetricHandler.class.getName()); - final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal); this.metricRepo = metricRepo; + + final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal); this.metricCounter = environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter"); - this.definitionCounter = - environment.metrics() - .counter(handlerName + "." + "metric-definitions-added-to-batch-counter"); - this.dimensionCounter = - environment.metrics() - .counter(handlerName + "." + "metric-dimensions-added-to-batch-counter"); - this.definitionDimensionsCounter = - environment.metrics() - .counter(handlerName + "." + "metric-definition-dimensions-added-to-batch-counter"); this.ordinal = ordinal; - simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT-0")); } @Override public int process(MetricEnvelope[] metricEnvelopes) throws Exception { - int metricCount = 0; + for (final MetricEnvelope metricEnvelope : metricEnvelopes) { - metricCount += processEnvelope(metricEnvelope); + processEnvelope(metricEnvelope); } - return metricCount; + + return metricEnvelopes.length; } - private int processEnvelope(MetricEnvelope metricEnvelope) { - int metricCount = 0; - Metric metric = metricEnvelope.metric; - Map meta = metricEnvelope.meta; + private void processEnvelope(MetricEnvelope metricEnvelope) { - logger.debug("ordinal: {}", ordinal); - logger.debug("metric: {}", metric); - logger.debug("meta: {}", meta); + this.metricRepo.addToBatch(metricEnvelope); - String tenantId = ""; - if (meta.containsKey(TENANT_ID)) { - tenantId = (String) meta.get(TENANT_ID); - } else { - logger.warn( - "Failed to find tenantId in message envelope meta data. Metric message may be malformed. Setting tenantId to empty string."); - logger.warn("metric: {}", metric.toString()); - logger.warn("meta: {}", meta.toString()); - } - - String region = ""; - if (meta.containsKey(REGION)) { - region = (String) meta.get(REGION); - } else { - logger.warn( - "Failed to find region in message envelope meta data. Metric message may be malformed. Setting region to empty string."); - logger.warn("metric: {}", metric.toString()); - logger.warn("meta: {}", meta.toString()); - } - - // Add the definition to the batch. - StringBuilder - definitionIdStringToHash = - new StringBuilder(trunc(metric.getName(), MAX_COLUMN_LENGTH)); - definitionIdStringToHash.append(trunc(tenantId, MAX_COLUMN_LENGTH)); - definitionIdStringToHash.append(trunc(region, MAX_COLUMN_LENGTH)); - byte[] definitionIdSha1Hash = DigestUtils.sha(definitionIdStringToHash.toString()); - Sha1HashId definitionSha1HashId = new Sha1HashId((definitionIdSha1Hash)); - metricRepo - .addDefinitionToBatch(definitionSha1HashId, trunc(metric.getName(), MAX_COLUMN_LENGTH), - trunc(tenantId, MAX_COLUMN_LENGTH), trunc(region, MAX_COLUMN_LENGTH)); - definitionCounter.inc(); - - // Calculate dimensions sha1 hash id. - StringBuilder dimensionIdStringToHash = new StringBuilder(); - Map preppedDimMap = prepDimensions(metric.getDimensions()); - for (Map.Entry entry : preppedDimMap.entrySet()) { - dimensionIdStringToHash.append(entry.getKey()); - dimensionIdStringToHash.append(entry.getValue()); - } - byte[] dimensionIdSha1Hash = DigestUtils.sha(dimensionIdStringToHash.toString()); - Sha1HashId dimensionsSha1HashId = new Sha1HashId(dimensionIdSha1Hash); - - // Add the dimension name/values to the batch. - metricRepo.addDimensionsToBatch(dimensionsSha1HashId, preppedDimMap); - - // Add the definition dimensions to the batch. - StringBuilder - definitionDimensionsIdStringToHash = - new StringBuilder(definitionSha1HashId.toHexString()); - definitionDimensionsIdStringToHash.append(dimensionsSha1HashId.toHexString()); - byte[] - definitionDimensionsIdSha1Hash = - DigestUtils.sha(definitionDimensionsIdStringToHash.toString()); - Sha1HashId definitionDimensionsSha1HashId = new Sha1HashId(definitionDimensionsIdSha1Hash); - metricRepo - .addDefinitionDimensionToBatch(definitionDimensionsSha1HashId, definitionSha1HashId, - dimensionsSha1HashId); - definitionDimensionsCounter.inc(); - - // Add the measurement to the batch. - String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp())); - double value = metric.getValue(); - metricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value, - metric.getValueMeta()); metricCounter.inc(); - metricCount++; - return metricCount; } @Override @@ -177,37 +74,4 @@ public class MetricHandler extends FlushableHandler { metricRepo.flush(); } - - private Map prepDimensions(Map dimMap) { - - Map newDimMap = new TreeMap<>(); - - if (dimMap != null) { - for (String dimName : dimMap.keySet()) { - if (dimName != null && !dimName.isEmpty()) { - String dimValue = dimMap.get(dimName); - if (dimValue != null && !dimValue.isEmpty()) { - newDimMap.put(trunc(dimName, MAX_COLUMN_LENGTH), trunc(dimValue, MAX_COLUMN_LENGTH)); - dimensionCounter.inc(); - } - } - } - } - return newDimMap; - } - - private String trunc(String s, int l) { - - if (s == null) { - return ""; - } else if (s.length() <= l) { - return s; - } else { - String r = s.substring(0, l); - logger.warn("Input string exceeded max column length. Truncating input string {} to {} chars", - s, l); - logger.warn("Resulting string {}", r); - return r; - } - } } diff --git a/java/src/main/java/monasca/persister/repository/AlarmRepo.java b/java/src/main/java/monasca/persister/repository/AlarmRepo.java index 025dc9b8..814c2441 100644 --- a/java/src/main/java/monasca/persister/repository/AlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/AlarmRepo.java @@ -21,7 +21,7 @@ import monasca.common.model.event.AlarmStateTransitionedEvent; public interface AlarmRepo { - public void addToBatch(final AlarmStateTransitionedEvent message); + void addToBatch(final AlarmStateTransitionedEvent message); - public void flush(); + void flush(); } diff --git a/java/src/main/java/monasca/persister/repository/InfluxMetricRepo.java b/java/src/main/java/monasca/persister/repository/InfluxMetricRepo.java deleted file mode 100644 index 56b021dd..00000000 --- a/java/src/main/java/monasca/persister/repository/InfluxMetricRepo.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monasca.persister.repository; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; - -import org.apache.commons.codec.digest.DigestUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -import io.dropwizard.setup.Environment; - -public abstract class InfluxMetricRepo implements MetricRepo { - - private static final Logger logger = LoggerFactory.getLogger(InfluxMetricRepo.class); - - protected final Map defMap = new HashMap<>(); - protected final Map> dimMap = new HashMap<>(); - protected final Map defDimMap = new HashMap<>(); - protected final Map> measurementMap = new HashMap<>(); - - public final com.codahale.metrics.Timer flushTimer; - public final Meter measurementMeter; - - private static final Sha1HashId BLANK_SHA_1_HASH_ID = new Sha1HashId(DigestUtils.sha("")); - private static final Set EMPTY_DIM_TREE_SET = new TreeSet<>(); - - protected abstract void write() throws Exception; - - public InfluxMetricRepo(final Environment env) { - - this.flushTimer = env.metrics().timer(this.getClass().getName() + "." + - "flush-timer"); - - this.measurementMeter = env.metrics().meter(this.getClass().getName() + "." + - "measurement-meter"); - } - - @Override - public void addMetricToBatch(final Sha1HashId defDimsId, final String timeStamp, - final double value, final Map valueMeta) { - - final Measurement measurement = new Measurement(defDimsId, timeStamp, value, valueMeta); - List measurementList = this.measurementMap.get(defDimsId); - if (measurementList == null) { - measurementList = new LinkedList<>(); - this.measurementMap.put(defDimsId, measurementList); - } - measurementList.add(measurement); - } - - @Override - public void addDefinitionToBatch(final Sha1HashId defId, final String name, final String tenantId, - final String region) { - - if (!this.defMap.containsKey(defId)) { - - final Def def = new Def(defId, name, tenantId, region); - this.defMap.put(defId, def); - - } - } - - @Override - public void addDimensionsToBatch(final Sha1HashId dimSetId, Map dimMap) { - - if (!this.dimMap.containsKey(dimSetId)) { - - final Set dimSet = new TreeSet<>(); - this.dimMap.put(dimSetId, dimSet); - - for (Map.Entry entry : dimMap.entrySet()) { - - final String name = entry.getKey(); - final String value = entry.getValue(); - - final Dim dim = new Dim(dimSetId, name, value); - dimSet.add(dim); - } - } - } - - @Override - public void addDefinitionDimensionToBatch(final Sha1HashId defDimsId, final Sha1HashId defId, - Sha1HashId dimId) { - - if (!this.defDimMap.containsKey(defDimsId)) { - - final DefDim defDim = new DefDim(defDimsId, defId, dimId); - this.defDimMap.put(defDimsId, defDim); - - } - - } - - @Override - public void flush() { - - try { - final long startTime = System.currentTimeMillis(); - final Timer.Context context = flushTimer.time(); - - write(); - - final long endTime = System.currentTimeMillis(); - context.stop(); - - logger.debug("Writing measurements, definitions, and dimensions to InfluxDB took {} seconds", - (endTime - startTime) / 1000); - - } catch (Exception e) { - logger.error("Failed to write measurements to InfluxDB", e); - } - - clearBuffers(); - } - - protected Def getDef(final Sha1HashId defId) throws Exception { - - final Def def = this.defMap.get(defId); - if (def == null) { - throw new Exception("Failed to find definition for defId: " + defId); - } - - return def; - } - - protected Set getDimSet(final Sha1HashId dimId) throws Exception { - - // If there were no dimensions, then "" was used in the hash id and nothing was - // ever added to the dimension map for this dimension set. - if (dimId.equals(BLANK_SHA_1_HASH_ID)) { - return EMPTY_DIM_TREE_SET; - } - - final Set dimSet = this.dimMap.get(dimId); - - if (dimSet == null) { - throw new Exception("Failed to find dimension set for dimId: " + dimId); - } - - return dimSet; - } - - private void clearBuffers() { - - this.measurementMap.clear(); - this.defMap.clear(); - this.dimMap.clear(); - this.defDimMap.clear(); - } - - static protected final class Measurement { - - protected final Sha1HashId defDimsId; - protected final String time; - protected final double value; - protected final Map valueMeta; - - private Measurement(final Sha1HashId defDimsId, final String time, final double value, - final Map valueMeta) { - this.defDimsId = defDimsId; - this.time = time; - this.value = value; - this.valueMeta = valueMeta; - } - - @Override - public String toString() { - return "Measurement{" + "defDimsId=" + this.defDimsId + ", time='" + this.time + '\'' + ", " + - "value=" + this.value + '}'; - } - } - - static protected final class Def { - - protected final Sha1HashId defId; - protected final String name; - protected final String tenantId; - protected final String region; - - private Def(final Sha1HashId defId, final String name, final String tenantId, - final String region) { - this.defId = defId; - this.name = name; - this.tenantId = tenantId; - this.region = region; - } - - @Override - public String toString() { - return "Definition{" + "defId=" + this.defId + ", name='" + this.name + '\'' + ", " + - "tenantId='" + this.tenantId + '\'' + ", region='" + this.region + '\'' + '}'; - } - } - - static protected final class Dim implements Comparable { - - protected final Sha1HashId dimSetId; - protected final String name; - protected final String value; - - private Dim(final Sha1HashId dimSetId, final String name, final String value) { - this.dimSetId = dimSetId; - this.name = name; - this.value = value; - } - - @Override - public String toString() { - return "Dimension{" + "dimSetId=" + this.dimSetId + ", name='" + this.name + '\'' + ", " + - "value='" + this.value + '\'' + '}'; - } - - @Override - public int compareTo(Dim o) { - int nameCmp = String.CASE_INSENSITIVE_ORDER.compare(name, o.name); - return (nameCmp != 0 ? nameCmp : String.CASE_INSENSITIVE_ORDER.compare(this.value, o.value)); - } - } - - static protected final class DefDim { - - protected final Sha1HashId defDimId; - protected final Sha1HashId defId; - protected final Sha1HashId dimId; - - private DefDim(final Sha1HashId defDimId, final Sha1HashId defId, final Sha1HashId dimId) { - this.defDimId = defDimId; - this.defId = defId; - this.dimId = dimId; - } - - @Override - public String toString() { - return "DefinitionDimension{" + "defDimId=" + this.defDimId + ", defId=" + this.defId + ", " + - "dimId=" + this.dimId + '}'; - } - } -} diff --git a/java/src/main/java/monasca/persister/repository/InfluxV8MetricRepo.java b/java/src/main/java/monasca/persister/repository/InfluxV8MetricRepo.java deleted file mode 100644 index c6fe5fde..00000000 --- a/java/src/main/java/monasca/persister/repository/InfluxV8MetricRepo.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monasca.persister.repository; - -import com.google.inject.Inject; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - -import org.influxdb.dto.Serie; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import io.dropwizard.setup.Environment; - -public class InfluxV8MetricRepo extends InfluxMetricRepo -{ - - private static final Logger logger = LoggerFactory.getLogger(InfluxV8MetricRepo.class); - private static final String[] COL_NAMES_STRING_ARRY = {"time", "value", "value_meta"}; - - private final InfluxV8RepoWriter influxV8RepoWriter; - - private final SimpleDateFormat simpleDateFormat = - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS zzz"); - - private final ObjectMapper objectMapper = new ObjectMapper(); - - @Inject - public InfluxV8MetricRepo(final Environment env, - final InfluxV8RepoWriter influxV8RepoWriter) { - - super(env); - this.influxV8RepoWriter = influxV8RepoWriter; - - } - - @Override - protected void write() throws Exception { - - this.influxV8RepoWriter.write(TimeUnit.MILLISECONDS, getSeries()); - } - - private Serie[] getSeries() throws Exception { - - final List serieList = new LinkedList<>(); - - for (Map.Entry> entry : this.measurementMap.entrySet()) { - Sha1HashId defDimId = entry.getKey(); - - final DefDim defDim = this.defDimMap.get(defDimId); - final Def def = getDef(defDim.defId); - final Set dimSet = getDimSet(defDim.dimId); - final Serie.Builder builder = new Serie.Builder(buildSerieName(def, dimSet)); - - builder.columns(COL_NAMES_STRING_ARRY); - - for (final Measurement measurement : entry.getValue()) { - final Object[] colValsObjArry = new Object[COL_NAMES_STRING_ARRY.length]; - final Date date = this.simpleDateFormat.parse(measurement.time + " UTC"); - final Long time = date.getTime(); - colValsObjArry[0] = time; - logger.debug("Added column value to colValsObjArry[{}] = {}", 0, colValsObjArry[0]); - colValsObjArry[1] = measurement.value; - logger.debug("Added column value to colValsObjArry[{}] = {}", 1, colValsObjArry[1]); - if (measurement.valueMeta != null && !measurement.valueMeta.isEmpty()) { - try { - final String valueMetaJson = objectMapper.writeValueAsString(measurement.valueMeta); - colValsObjArry[2] = valueMetaJson; - logger.debug("Added column value to colValsObjArry[{}] = {}", 2, valueMetaJson); - } - catch (JsonProcessingException e) { - logger.error("Unable to serialize value meta data {}", measurement.valueMeta, e); - } - } - builder.values(colValsObjArry); - this.measurementMeter.mark(); - } - - final Serie serie = builder.build(); - - if (logger.isDebugEnabled()) { - this.influxV8RepoWriter.logColValues(serie); - } - - serieList.add(serie); - logger.debug("Added serie: {} to serieList", serie.getName()); - } - - return serieList.toArray(new Serie[serieList.size()]); - } - - private String buildSerieName(final Def def, final Set dimList) - throws UnsupportedEncodingException { - - logger.debug("Creating serie name"); - - final StringBuilder serieNameBuilder = new StringBuilder(); - - logger.debug("Adding tenant_id to serie name: {}", def.tenantId); - serieNameBuilder.append(urlEncodeUTF8(def.tenantId)); - serieNameBuilder.append("?"); - - logger.debug("Adding region to serie name: {}", def.region); - serieNameBuilder.append(urlEncodeUTF8(def.region)); - serieNameBuilder.append("&"); - logger.debug("Adding name to serie name: {}", def.name); - serieNameBuilder.append(urlEncodeUTF8(def.name)); - - for (final Dim dim : dimList) { - serieNameBuilder.append("&"); - logger.debug("Adding dimension name to serie name: {}", dim.name); - serieNameBuilder.append(urlEncodeUTF8(dim.name)); - serieNameBuilder.append("="); - logger.debug("Adding dimension value to serie name: {}", dim.value); - serieNameBuilder.append(urlEncodeUTF8(dim.value)); - } - - final String serieName = serieNameBuilder.toString(); - logger.debug("Created serie name: {}", serieName); - - return serieName; - } - - private String urlEncodeUTF8(final String s) throws UnsupportedEncodingException { - return URLEncoder.encode(s, "UTF-8"); - } -} diff --git a/java/src/main/java/monasca/persister/repository/InfluxV9MetricRepo.java b/java/src/main/java/monasca/persister/repository/InfluxV9MetricRepo.java deleted file mode 100644 index 73e1dd37..00000000 --- a/java/src/main/java/monasca/persister/repository/InfluxV9MetricRepo.java +++ /dev/null @@ -1,117 +0,0 @@ - -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monasca.persister.repository; - -import com.google.inject.Inject; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import io.dropwizard.setup.Environment; -import monasca.persister.repository.influxdb.InfluxPoint; - -public class InfluxV9MetricRepo extends InfluxMetricRepo { - - private static final Logger logger = LoggerFactory.getLogger(InfluxV9MetricRepo.class); - - private final ObjectMapper objectMapper = new ObjectMapper(); - - private final SimpleDateFormat simpleDateFormat = - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS zzz"); - - private final InfluxV9RepoWriter influxV9RepoWriter; - - @Inject - public InfluxV9MetricRepo(final Environment env, - final InfluxV9RepoWriter influxV9RepoWriter) { - - super(env); - this.influxV9RepoWriter = influxV9RepoWriter; - - } - - @Override - protected void write() throws Exception { - - this.influxV9RepoWriter.write(getInfluxPointArry()); - - } - - private InfluxPoint[] getInfluxPointArry() throws Exception { - - DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime(); - - List influxPointList = new LinkedList<>(); - - for (final Sha1HashId defDimId : this.measurementMap.keySet()) { - - final DefDim defDim = this.defDimMap.get(defDimId); - final Def def = getDef(defDim.defId); - final Set dimSet = getDimSet(defDim.dimId); - - Map tagMap = new HashMap<>(); - for (Dim dim : dimSet) { - tagMap.put(dim.name, dim.value); - } - tagMap.put("_tenant_id", def.tenantId); - tagMap.put("_region", def.region); - - for (final Measurement measurement : this.measurementMap.get(defDimId)) { - - Date date = this.simpleDateFormat.parse(measurement.time + " UTC"); - DateTime dateTime = new DateTime(date.getTime(), DateTimeZone.UTC); - String dateString = dateFormatter.print(dateTime); - - Map valueMap = new HashMap<>(); - valueMap.put("value", measurement.value); - if (measurement.valueMeta != null && !measurement.valueMeta.isEmpty()) { - try { - final String valueMetaJson = objectMapper.writeValueAsString(measurement.valueMeta); - logger.debug("Added value for value_meta of {}", valueMetaJson); - valueMap.put("value_meta", valueMetaJson); - } catch (JsonProcessingException e) { - logger.error("Unable to serialize {}", measurement.valueMeta, e); - } - } - InfluxPoint influxPoint = new InfluxPoint(def.name, tagMap, dateString, valueMap); - - influxPointList.add(influxPoint); - - this.measurementMeter.mark(); - } - } - - return influxPointList.toArray(new InfluxPoint[influxPointList.size()]); - } -} diff --git a/java/src/main/java/monasca/persister/repository/MetricRepo.java b/java/src/main/java/monasca/persister/repository/MetricRepo.java index dc043581..fe2301c4 100644 --- a/java/src/main/java/monasca/persister/repository/MetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/MetricRepo.java @@ -17,16 +17,11 @@ package monasca.persister.repository; -import java.util.Map; +import monasca.common.model.metric.MetricEnvelope; public interface MetricRepo { - void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value, Map valueMeta); - void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region); - - void addDimensionsToBatch(Sha1HashId dimSetId, Map dimMap); - - void addDefinitionDimensionToBatch(Sha1HashId defDimsId, Sha1HashId defId, Sha1HashId dimId); + void addToBatch(MetricEnvelope metricEnvelope); void flush(); } diff --git a/java/src/main/java/monasca/persister/repository/VerticaMetricsConstants.java b/java/src/main/java/monasca/persister/repository/VerticaMetricsConstants.java deleted file mode 100644 index 7ba665d8..00000000 --- a/java/src/main/java/monasca/persister/repository/VerticaMetricsConstants.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monasca.persister.repository; - -public final class VerticaMetricsConstants { - public static final int MAX_COLUMN_LENGTH = 255; -} diff --git a/java/src/main/java/monasca/persister/repository/influxdb/Definition.java b/java/src/main/java/monasca/persister/repository/influxdb/Definition.java new file mode 100644 index 00000000..5f5c526c --- /dev/null +++ b/java/src/main/java/monasca/persister/repository/influxdb/Definition.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2015 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package monasca.persister.repository.influxdb; + +public final class Definition { + + private static final int MAX_DEFINITION_NAME_LENGTH = 255; + private static final int MAX_TENANT_ID_LENGTH = 255; + private static final int MAX_REGION_LENGTH = 255; + + public final String name; + public final String tenantId; + public final String region; + + public Definition(String name, String tenantId, String region) { + + if (name.length() > MAX_DEFINITION_NAME_LENGTH) { + name = name.substring(0, MAX_DEFINITION_NAME_LENGTH); + } + + this.name = name; + + if (tenantId.length() > MAX_TENANT_ID_LENGTH) { + tenantId = tenantId.substring(0, MAX_TENANT_ID_LENGTH); + } + + this.tenantId = tenantId; + + if (region.length() > MAX_REGION_LENGTH) { + region = region.substring(0, MAX_REGION_LENGTH); + } + + this.region = region; + } + + + public String getName() { + return name; + } + + public String getTenantId() { + return tenantId; + } + + public String getRegion() { + return region; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Definition)) { + return false; + } + + Definition that = (Definition) o; + + if (!name.equals(that.name)) { + return false; + } + if (!tenantId.equals(that.tenantId)) { + return false; + } + return region.equals(that.region); + + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + tenantId.hashCode(); + result = 31 * result + region.hashCode(); + return result; + } +} diff --git a/java/src/main/java/monasca/persister/repository/influxdb/Dimensions.java b/java/src/main/java/monasca/persister/repository/influxdb/Dimensions.java new file mode 100644 index 00000000..ad0c230c --- /dev/null +++ b/java/src/main/java/monasca/persister/repository/influxdb/Dimensions.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2015 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package monasca.persister.repository.influxdb; + +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import javax.annotation.Nullable; + +public class Dimensions { + + private static final int MAX_DIMENSIONS_NAME_LENGTH = 255; + private static final int MAX_DIMENSIONS_VALUE_LENGTH = 255; + + private final Map dimensionsMap; + + + public Dimensions(@Nullable Map dimensionsMap) { + + this.dimensionsMap = new TreeMap<>(); + + if (dimensionsMap != null) { + + for (String name : dimensionsMap.keySet()) { + + if (name != null && !name.isEmpty()) { + + String value = dimensionsMap.get(name); + + if (value != null && !value.isEmpty()) { + + if (name.length() > MAX_DIMENSIONS_NAME_LENGTH) { + + name = name.substring(0, MAX_DIMENSIONS_NAME_LENGTH); + + } + + if (value.length() > MAX_DIMENSIONS_VALUE_LENGTH) { + + value = value.substring(0, MAX_DIMENSIONS_VALUE_LENGTH); + } + + this.dimensionsMap.put(name, value); + + } + } + } + } + } + + @Override + public boolean equals(Object o) { + + if (this == o) { + return true; + } + + if (!(o instanceof Dimensions)) { + return false; + } + + Dimensions that = (Dimensions) o; + + return dimensionsMap.equals(that.dimensionsMap); + + } + + @Override + public int hashCode() { + return dimensionsMap.hashCode(); + } + + public Set keySet() { + + return this.dimensionsMap.keySet(); + + } + + public Set> entrySet() { + + return this.dimensionsMap.entrySet(); + + } + + public String get(String key) { + + return this.dimensionsMap.get(key); + + } +} diff --git a/java/src/main/java/monasca/persister/repository/InfluxAlarmRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java similarity index 96% rename from java/src/main/java/monasca/persister/repository/InfluxAlarmRepo.java rename to java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java index 9e3ee7ec..d7d21265 100644 --- a/java/src/main/java/monasca/persister/repository/InfluxAlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java @@ -12,7 +12,10 @@ * the License. */ -package monasca.persister.repository; +package monasca.persister.repository.influxdb; + +import monasca.common.model.event.AlarmStateTransitionedEvent; +import monasca.persister.repository.AlarmRepo; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; @@ -25,7 +28,6 @@ import java.util.LinkedList; import java.util.List; import io.dropwizard.setup.Environment; -import monasca.common.model.event.AlarmStateTransitionedEvent; public abstract class InfluxAlarmRepo implements AlarmRepo { diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxMetricRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxMetricRepo.java new file mode 100644 index 00000000..578b3a2d --- /dev/null +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxMetricRepo.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monasca.persister.repository.influxdb; + +import monasca.common.model.metric.Metric; +import monasca.common.model.metric.MetricEnvelope; +import monasca.persister.repository.MetricRepo; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +import io.dropwizard.setup.Environment; + +public abstract class InfluxMetricRepo implements MetricRepo { + + private static final Logger logger = LoggerFactory.getLogger(InfluxMetricRepo.class); + + protected final MeasurementBuffer measurementBuffer = new MeasurementBuffer(); + + public final com.codahale.metrics.Timer flushTimer; + public final Meter measurementMeter; + + protected abstract void write() throws Exception; + + public InfluxMetricRepo(final Environment env) { + + this.flushTimer = env.metrics().timer(this.getClass().getName() + ".flush-timer"); + this.measurementMeter = env.metrics().meter(this.getClass().getName() + ".measurement-meter"); + } + + @Override + public void addToBatch(MetricEnvelope metricEnvelope) { + + Metric metric = metricEnvelope.metric; + Map meta = metricEnvelope.meta; + + Definition + definition = + new Definition(metric.getName(), (String) meta.get("tenantId"), + (String) meta.get("region")); + + Dimensions dimensions = new Dimensions(metric.getDimensions()); + + Measurement + measurement = + new Measurement(metric.getTimestamp(), metric.getValue(), metric.getValueMeta()); + + this.measurementBuffer.put(definition, dimensions, measurement); + this.measurementMeter.mark(); + + } + + + @Override + public void flush() { + + try { + final long startTime = System.currentTimeMillis(); + final Timer.Context context = flushTimer.time(); + + write(); + + final long endTime = System.currentTimeMillis(); + context.stop(); + + logger.debug("Writing measurements, definitions, and dimensions to InfluxDB took {} seconds", + (endTime - startTime) / 1000); + + } catch (Exception e) { + logger.error("Failed to write measurements to InfluxDB", e); + } + + clearBuffers(); + } + + + private void clearBuffers() { + + this.measurementBuffer.clear(); + + } + +} diff --git a/java/src/main/java/monasca/persister/repository/InfluxV8AlarmRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV8AlarmRepo.java similarity index 98% rename from java/src/main/java/monasca/persister/repository/InfluxV8AlarmRepo.java rename to java/src/main/java/monasca/persister/repository/influxdb/InfluxV8AlarmRepo.java index 1d1f2451..a6fb2fee 100644 --- a/java/src/main/java/monasca/persister/repository/InfluxV8AlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV8AlarmRepo.java @@ -15,7 +15,9 @@ * limitations under the License. */ -package monasca.persister.repository; +package monasca.persister.repository.influxdb; + +import monasca.common.model.event.AlarmStateTransitionedEvent; import com.google.inject.Inject; @@ -30,7 +32,6 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; import io.dropwizard.setup.Environment; -import monasca.common.model.event.AlarmStateTransitionedEvent; public class InfluxV8AlarmRepo extends InfluxAlarmRepo { diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV8MetricRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV8MetricRepo.java new file mode 100644 index 00000000..01ee2569 --- /dev/null +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV8MetricRepo.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monasca.persister.repository.influxdb; + +import com.google.inject.Inject; + +import org.influxdb.dto.Serie; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.text.ParseException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import io.dropwizard.setup.Environment; + +public class InfluxV8MetricRepo extends InfluxMetricRepo +{ + + private static final Logger logger = LoggerFactory.getLogger(InfluxV8MetricRepo.class); + private static final String[] COL_NAMES_STRING_ARRY = {"time", "value", "value_meta"}; + + private final InfluxV8RepoWriter influxV8RepoWriter; + + @Inject + public InfluxV8MetricRepo(final Environment env, + final InfluxV8RepoWriter influxV8RepoWriter) { + + super(env); + this.influxV8RepoWriter = influxV8RepoWriter; + + } + + @Override + protected void write() throws Exception { + + this.influxV8RepoWriter.write(TimeUnit.MILLISECONDS, getSeries()); + } + + private Serie[] getSeries() throws UnsupportedEncodingException, ParseException { + + final List serieList = new LinkedList<>(); + + for (Map.Entry>> definitionMapEntry + : this.measurementBuffer.entrySet()) { + + Definition definition = definitionMapEntry.getKey(); + Map> dimensionsMap = definitionMapEntry.getValue(); + + for (Map.Entry> dimensionsMapEntry : dimensionsMap.entrySet()) { + + Dimensions dimensions = dimensionsMapEntry.getKey(); + List measurementList = dimensionsMapEntry.getValue(); + + final Serie.Builder builder = new Serie.Builder(buildSerieName(definition, dimensions)); + builder.columns(COL_NAMES_STRING_ARRY); + + for (Measurement measurement : measurementList) { + + final Object[] valObjArry = new Object[COL_NAMES_STRING_ARRY.length]; + + valObjArry[0] = measurement.getTime(); + logger.debug("Added column value to valObjArry[{}] = {}", 0, valObjArry[0]); + + valObjArry[1] = measurement.getValue(); + logger.debug("Added column value to valObjArry[{}] = {}", 1, valObjArry[1]); + + valObjArry[2] = measurement.getValueMetaJSONString(); + logger.debug("Added column value to valObjArry[{}] = {}", 2, valObjArry[2]); + + builder.values(valObjArry); + + this.measurementMeter.mark(); + + } + + final Serie serie = builder.build(); + + if (logger.isDebugEnabled()) { + this.influxV8RepoWriter.logColValues(serie); + } + + serieList.add(serie); + logger.debug("Added serie: {} to serieList", serie.getName()); + + } + + } + + return serieList.toArray(new Serie[serieList.size()]); + + } + + private String buildSerieName(final Definition definition, final Dimensions dimensions) + throws UnsupportedEncodingException { + + logger.debug("Creating serie name"); + + final StringBuilder serieNameBuilder = new StringBuilder(); + + logger.debug("Adding tenant_id to serie name: {}", definition.getTenantId()); + serieNameBuilder.append(urlEncodeUTF8(definition.getTenantId())); + serieNameBuilder.append("?"); + + logger.debug("Adding region to serie name: {}", definition.getRegion()); + serieNameBuilder.append(urlEncodeUTF8(definition.getRegion())); + serieNameBuilder.append("&"); + logger.debug("Adding name to serie name: {}", definition.getName()); + serieNameBuilder.append(urlEncodeUTF8(definition.getName())); + + for (final String name : dimensions.keySet()) { + final String value = dimensions.get(name); + serieNameBuilder.append("&"); + logger.debug("Adding dimension name to serie name: {}", name); + serieNameBuilder.append(urlEncodeUTF8(name)); + serieNameBuilder.append("="); + logger.debug("Adding dimension value to serie name: {}", value); + serieNameBuilder.append(urlEncodeUTF8(value)); + } + + final String serieName = serieNameBuilder.toString(); + logger.debug("Created serie name: {}", serieName); + + return serieName; + } + + + private String urlEncodeUTF8(final String s) throws UnsupportedEncodingException { + return URLEncoder.encode(s, "UTF-8"); + } +} diff --git a/java/src/main/java/monasca/persister/repository/InfluxV8RepoWriter.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV8RepoWriter.java similarity index 98% rename from java/src/main/java/monasca/persister/repository/InfluxV8RepoWriter.java rename to java/src/main/java/monasca/persister/repository/influxdb/InfluxV8RepoWriter.java index 31804550..e6ca5a16 100644 --- a/java/src/main/java/monasca/persister/repository/InfluxV8RepoWriter.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV8RepoWriter.java @@ -15,7 +15,9 @@ * limitations under the License. */ -package monasca.persister.repository; +package monasca.persister.repository.influxdb; + +import monasca.persister.configuration.PersisterConfig; import com.google.inject.Inject; @@ -30,7 +32,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import io.dropwizard.setup.Environment; -import monasca.persister.configuration.PersisterConfig; public class InfluxV8RepoWriter { diff --git a/java/src/main/java/monasca/persister/repository/InfluxV9AlarmRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java similarity index 97% rename from java/src/main/java/monasca/persister/repository/InfluxV9AlarmRepo.java rename to java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java index c6939d60..284f6350 100644 --- a/java/src/main/java/monasca/persister/repository/InfluxV9AlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java @@ -15,7 +15,9 @@ * limitations under the License. */ -package monasca.persister.repository; +package monasca.persister.repository.influxdb; + +import monasca.common.model.event.AlarmStateTransitionedEvent; import com.google.inject.Inject; @@ -35,8 +37,6 @@ import java.util.List; import java.util.Map; import io.dropwizard.setup.Environment; -import monasca.common.model.event.AlarmStateTransitionedEvent; -import monasca.persister.repository.influxdb.InfluxPoint; public class InfluxV9AlarmRepo extends InfluxAlarmRepo { diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java new file mode 100644 index 00000000..aaf9b707 --- /dev/null +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java @@ -0,0 +1,119 @@ + +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monasca.persister.repository.influxdb; + +import com.google.inject.Inject; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import io.dropwizard.setup.Environment; + +public class InfluxV9MetricRepo extends InfluxMetricRepo { + + private final InfluxV9RepoWriter influxV9RepoWriter; + + @Inject + public InfluxV9MetricRepo(final Environment env, + final InfluxV9RepoWriter influxV9RepoWriter) { + + super(env); + this.influxV9RepoWriter = influxV9RepoWriter; + + } + + @Override + protected void write() throws Exception { + + this.influxV9RepoWriter.write(getInfluxPointArry()); + + } + + private InfluxPoint[] getInfluxPointArry() throws Exception { + + List influxPointList = new LinkedList<>(); + + for (Map.Entry>> definitionMapEntry : + this.measurementBuffer.entrySet()) { + + Definition definition = definitionMapEntry.getKey(); + Map> dimensionsMap = definitionMapEntry.getValue(); + + for (Map.Entry> dimensionsMapEntry : dimensionsMap.entrySet()) { + + Dimensions dimensions = dimensionsMapEntry.getKey(); + List measurementList = dimensionsMapEntry.getValue(); + + Map tagMap = buildTagMap(definition, dimensions); + + for (Measurement measurement : measurementList) { + + InfluxPoint + influxPoint = + new InfluxPoint(definition.getName(), + tagMap, + measurement.getISOFormattedTimeString(), + buildValueMap(measurement)); + + influxPointList.add(influxPoint); + + } + + } + + } + + return influxPointList.toArray(new InfluxPoint[influxPointList.size()]); + + } + + private Map buildValueMap(Measurement measurement) { + + Map valueMap = new HashMap<>(); + valueMap.put("value", measurement.getValue()); + String valueMetaJSONString = measurement.getValueMetaJSONString(); + if (valueMetaJSONString != null) { + valueMap.put("value_meta", valueMetaJSONString); + } + return valueMap; + + } + + private Map buildTagMap(Definition definition, Dimensions dimensions) { + + Map tagMap = new HashMap<>(); + + for (Map.Entry dimensionsEntry : dimensions.entrySet()) { + + String name = dimensionsEntry.getKey(); + String value = dimensionsEntry.getValue(); + tagMap.put(name, value); + + } + + tagMap.put("_tenant_id", definition.getTenantId()); + tagMap.put("_region", definition.getRegion()); + + return tagMap; + + } + +} diff --git a/java/src/main/java/monasca/persister/repository/InfluxV9RepoWriter.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java similarity index 97% rename from java/src/main/java/monasca/persister/repository/InfluxV9RepoWriter.java rename to java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java index ca37d102..25f38a57 100644 --- a/java/src/main/java/monasca/persister/repository/InfluxV9RepoWriter.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java @@ -15,7 +15,9 @@ * limitations under the License. */ -package monasca.persister.repository; +package monasca.persister.repository.influxdb; + +import monasca.persister.configuration.PersisterConfig; import com.google.inject.Inject; @@ -47,10 +49,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; -import monasca.persister.configuration.PersisterConfig; -import monasca.persister.repository.influxdb.InfluxPoint; -import monasca.persister.repository.influxdb.InfluxWrite; - public class InfluxV9RepoWriter { private static final Logger logger = LoggerFactory.getLogger(InfluxV9RepoWriter.class); diff --git a/java/src/main/java/monasca/persister/repository/influxdb/Measurement.java b/java/src/main/java/monasca/persister/repository/influxdb/Measurement.java new file mode 100644 index 00000000..db8c65b2 --- /dev/null +++ b/java/src/main/java/monasca/persister/repository/influxdb/Measurement.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2015 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package monasca.persister.repository.influxdb; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import javax.annotation.Nullable; + +public final class Measurement { + + private static final Logger logger = LoggerFactory.getLogger(Measurement.class); + + private final ObjectMapper objectMapper = new ObjectMapper(); + + public final long time; + public final double value; + public final Map valueMeta; + + + public Measurement(final long time, final double value, + final @Nullable Map valueMeta) { + + this.time = time; + this.value = value; + this.valueMeta = valueMeta == null ? new HashMap() : valueMeta; + + } + + public String getISOFormattedTimeString() { + + DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime(); + Date date = new Date(this.time); + DateTime dateTime = new DateTime(date.getTime(), DateTimeZone.UTC); + + return dateFormatter.print(dateTime); + } + + public long getTime() { + return time; + } + + public double getValue() { + return value; + } + + public Map getValueMeta() { + return valueMeta; + } + + public String getValueMetaJSONString() { + + if (!this.valueMeta.isEmpty()) { + + try { + + return objectMapper.writeValueAsString(this.valueMeta); + + } catch (JsonProcessingException e) { + + logger.error("Failed to serialize value meta {}", this.valueMeta, e); + + } + + } + + return null; + + } + +} diff --git a/java/src/main/java/monasca/persister/repository/influxdb/MeasurementBuffer.java b/java/src/main/java/monasca/persister/repository/influxdb/MeasurementBuffer.java new file mode 100644 index 00000000..5b80125e --- /dev/null +++ b/java/src/main/java/monasca/persister/repository/influxdb/MeasurementBuffer.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2015 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package monasca.persister.repository.influxdb; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class MeasurementBuffer { + + private final Map>> + measurementMap = new HashMap<>(); + + public void put(Definition definition, Dimensions dimensions, Measurement measurement) { + + Map> dimensionsMap = this.measurementMap.get(definition); + + if (dimensionsMap == null) { + dimensionsMap = initDimensionsMap(definition, dimensions); + } + + List measurementList = dimensionsMap.get(dimensions); + + if (measurementList == null) { + measurementList = initMeasurementList(dimensionsMap, dimensions); + } + + measurementList.add(measurement); + + } + + public Set>>> entrySet() { + + return this.measurementMap.entrySet(); + + } + + public void clear() { + + this.measurementMap.clear(); + + } + + private Map> initDimensionsMap(Definition definition, + Dimensions dimensions) { + + Map> dimensionsMap = new HashMap<>(); + List measurementList = new LinkedList<>(); + dimensionsMap.put(dimensions, measurementList); + this.measurementMap.put(definition, dimensionsMap); + + return dimensionsMap; + } + + private List initMeasurementList(Map> dimensionsMap, + Dimensions dimensions) { + + List measurementList = new LinkedList<>(); + dimensionsMap.put(dimensions, measurementList); + + return measurementList; + + } + +} diff --git a/java/src/main/java/monasca/persister/repository/Sha1HashId.java b/java/src/main/java/monasca/persister/repository/vertica/Sha1HashId.java similarity index 96% rename from java/src/main/java/monasca/persister/repository/Sha1HashId.java rename to java/src/main/java/monasca/persister/repository/vertica/Sha1HashId.java index 392906f9..899b073b 100644 --- a/java/src/main/java/monasca/persister/repository/Sha1HashId.java +++ b/java/src/main/java/monasca/persister/repository/vertica/Sha1HashId.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package monasca.persister.repository; +package monasca.persister.repository.vertica; import org.apache.commons.codec.binary.Hex; diff --git a/java/src/main/java/monasca/persister/repository/VerticaAlarmRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java similarity index 97% rename from java/src/main/java/monasca/persister/repository/VerticaAlarmRepo.java rename to java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java index acce809e..e2c25827 100644 --- a/java/src/main/java/monasca/persister/repository/VerticaAlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java @@ -15,15 +15,14 @@ * limitations under the License. */ -package monasca.persister.repository; +package monasca.persister.repository.vertica; import monasca.common.model.event.AlarmStateTransitionedEvent; import monasca.persister.configuration.PersisterConfig; +import monasca.persister.repository.AlarmRepo; import com.codahale.metrics.Timer; -import io.dropwizard.setup.Environment; - import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.PreparedBatch; import org.slf4j.Logger; @@ -37,6 +36,8 @@ import java.util.TimeZone; import javax.inject.Inject; +import io.dropwizard.setup.Environment; + public class VerticaAlarmRepo extends VerticaRepo implements AlarmRepo { private static final Logger logger = LoggerFactory.getLogger(VerticaAlarmRepo.class); diff --git a/java/src/main/java/monasca/persister/repository/VerticaMetricRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java similarity index 68% rename from java/src/main/java/monasca/persister/repository/VerticaMetricRepo.java rename to java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java index b6d9c03a..79a7a9f2 100644 --- a/java/src/main/java/monasca/persister/repository/VerticaMetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java @@ -15,14 +15,22 @@ * limitations under the License. */ -package monasca.persister.repository; +package monasca.persister.repository.vertica; + +import monasca.common.model.metric.Metric; +import monasca.common.model.metric.MetricEnvelope; +import monasca.persister.configuration.PersisterConfig; +import monasca.persister.pipeline.event.MetricHandler; +import monasca.persister.repository.MetricRepo; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; +import org.apache.commons.codec.digest.DigestUtils; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.PreparedBatch; import org.slf4j.Logger; @@ -30,19 +38,30 @@ import org.slf4j.LoggerFactory; import java.security.NoSuchAlgorithmException; import java.sql.SQLException; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.HashSet; import java.util.Map; +import java.util.Random; import java.util.Set; +import java.util.TimeZone; +import java.util.TreeMap; import javax.inject.Inject; import io.dropwizard.setup.Environment; -import monasca.persister.configuration.PersisterConfig; public class VerticaMetricRepo extends VerticaRepo implements MetricRepo { private static final Logger logger = LoggerFactory.getLogger(VerticaMetricRepo.class); + public static final int MAX_COLUMN_LENGTH = 255; + + private final SimpleDateFormat simpleDateFormat; + + private static final String TENANT_ID = "tenantId"; + private static final String REGION = "region"; + private final Environment environment; private final Cache definitionsIdCache; @@ -56,8 +75,6 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo { private static final String SQL_INSERT_INTO_METRICS = "insert into MonMetrics.measurements (definition_dimensions_id, time_stamp, value) values (:definition_dimension_id, :time_stamp, :value)"; - // If any of the columns change size be sure to update VerticaMetricConstants.java as well. - private static final String DEFINITIONS_TEMP_STAGING_TABLE = "(" + " id BINARY(20) NOT NULL," + " name VARCHAR(255) NOT NULL," + " tenant_id VARCHAR(255) NOT NULL," + " region VARCHAR(255) NOT NULL" + ")"; @@ -83,6 +100,12 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo { private final String dimensionsTempStagingTableInsertStmt; private final String definitionDimensionsTempStagingTableInsertStmt; + + private final Counter metricCounter; + private final Counter definitionCounter; + private final Counter dimensionCounter; + private final Counter definitionDimensionsCounter; + private final Timer flushTimer; public final Meter measurementMeter; public final Meter definitionCacheMissMeter; @@ -98,7 +121,24 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo { super(dbi); logger.debug("Instantiating: " + this.getClass().getName()); + simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT-0")); + this.environment = environment; + final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), new Random().nextInt()); + + this.metricCounter = + environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter"); + this.definitionCounter = + environment.metrics() + .counter(handlerName + "." + "metric-definitions-added-to-batch-counter"); + this.dimensionCounter = + environment.metrics() + .counter(handlerName + "." + "metric-dimensions-added-to-batch-counter"); + this.definitionDimensionsCounter = + environment.metrics() + .counter(handlerName + "." + "metric-definition-dimensions-added-to-batch-counter"); + this.flushTimer = this.environment.metrics().timer(this.getClass().getName() + "." + "flush-timer"); this.measurementMeter = @@ -203,18 +243,93 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo { } @Override + public void addToBatch(MetricEnvelope metricEnvelope) { + + Metric metric = metricEnvelope.metric; + Map meta = metricEnvelope.meta; + + logger.debug("metric: {}", metric); + logger.debug("meta: {}", meta); + + String tenantId = ""; + if (meta.containsKey(TENANT_ID)) { + tenantId = (String) meta.get(TENANT_ID); + } else { + logger.warn( + "Failed to find tenantId in message envelope meta data. Metric message may be malformed" + + ". Setting tenantId to empty string."); + logger.warn("metric: {}", metric.toString()); + logger.warn("meta: {}", meta.toString()); + } + + String region = ""; + if (meta.containsKey(REGION)) { + region = (String) meta.get(REGION); + } else { + logger.warn( + "Failed to find region in message envelope meta data. Metric message may be malformed. " + + "Setting region to empty string."); + logger.warn("metric: {}", metric.toString()); + logger.warn("meta: {}", meta.toString()); + } + + // Add the definition to the batch. + StringBuilder + definitionIdStringToHash = + new StringBuilder(trunc(metric.getName(), MAX_COLUMN_LENGTH)); + definitionIdStringToHash.append(trunc(tenantId, MAX_COLUMN_LENGTH)); + definitionIdStringToHash.append(trunc(region, MAX_COLUMN_LENGTH)); + byte[] definitionIdSha1Hash = DigestUtils.sha(definitionIdStringToHash.toString()); + Sha1HashId definitionSha1HashId = new Sha1HashId((definitionIdSha1Hash)); + this.addDefinitionToBatch(definitionSha1HashId, trunc(metric.getName(), MAX_COLUMN_LENGTH), + trunc(tenantId, MAX_COLUMN_LENGTH), trunc(region, MAX_COLUMN_LENGTH)); + definitionCounter.inc(); + + // Calculate dimensions sha1 hash id. + StringBuilder dimensionIdStringToHash = new StringBuilder(); + Map preppedDimMap = prepDimensions(metric.getDimensions()); + for (Map.Entry entry : preppedDimMap.entrySet()) { + dimensionIdStringToHash.append(entry.getKey()); + dimensionIdStringToHash.append(entry.getValue()); + } + byte[] dimensionIdSha1Hash = DigestUtils.sha(dimensionIdStringToHash.toString()); + Sha1HashId dimensionsSha1HashId = new Sha1HashId(dimensionIdSha1Hash); + + // Add the dimension name/values to the batch. + this.addDimensionsToBatch(dimensionsSha1HashId, preppedDimMap); + + // Add the definition dimensions to the batch. + StringBuilder + definitionDimensionsIdStringToHash = + new StringBuilder(definitionSha1HashId.toHexString()); + definitionDimensionsIdStringToHash.append(dimensionsSha1HashId.toHexString()); + byte[] + definitionDimensionsIdSha1Hash = + DigestUtils.sha(definitionDimensionsIdStringToHash.toString()); + Sha1HashId definitionDimensionsSha1HashId = new Sha1HashId(definitionDimensionsIdSha1Hash); + this.addDefinitionDimensionToBatch(definitionDimensionsSha1HashId, definitionSha1HashId, + dimensionsSha1HashId); + definitionDimensionsCounter.inc(); + + // Add the measurement to the batch. + String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp())); + double value = metric.getValue(); + this.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value, metric.getValueMeta()); + + this.metricCounter.inc(); + } + public void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value, Map valueMeta) { // TODO: Actually handle valueMeta logger.debug("Adding metric to batch: defDimsId: {}, time: {}, value: {}", - defDimsId.toHexString(), timeStamp, value); + defDimsId.toHexString(), timeStamp, value); metricsBatch.add().bind("definition_dimension_id", defDimsId.getSha1Hash()) .bind("time_stamp", timeStamp).bind("value", value); measurementMeter.mark(); } - @Override - public void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region) { + private void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region) { if (definitionsIdCache.getIfPresent(defId) == null) { @@ -236,8 +351,7 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo { } } - @Override - public void addDimensionsToBatch(Sha1HashId dimSetId, Map dimMap) { + private void addDimensionsToBatch(Sha1HashId dimSetId, Map dimMap) { if (dimensionsIdCache.getIfPresent(dimSetId) == null) { @@ -266,8 +380,7 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo { } } - @Override - public void addDefinitionDimensionToBatch(Sha1HashId defDimsId, Sha1HashId defId, + private void addDefinitionDimensionToBatch(Sha1HashId defDimsId, Sha1HashId defId, Sha1HashId dimId) { if (definitionDimensionsIdCache.getIfPresent(defDimsId) == null) { @@ -354,4 +467,37 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo { dimensionIdSet.clear(); definitionDimensionsIdSet.clear(); } + + private Map prepDimensions(Map dimMap) { + + Map newDimMap = new TreeMap<>(); + + if (dimMap != null) { + for (String dimName : dimMap.keySet()) { + if (dimName != null && !dimName.isEmpty()) { + String dimValue = dimMap.get(dimName); + if (dimValue != null && !dimValue.isEmpty()) { + newDimMap.put(trunc(dimName, MAX_COLUMN_LENGTH), trunc(dimValue, MAX_COLUMN_LENGTH)); + dimensionCounter.inc(); + } + } + } + } + return newDimMap; + } + + private String trunc(String s, int l) { + + if (s == null) { + return ""; + } else if (s.length() <= l) { + return s; + } else { + String r = s.substring(0, l); + logger.warn("Input string exceeded max column length. Truncating input string {} to {} chars", + s, l); + logger.warn("Resulting string {}", r); + return r; + } + } } diff --git a/java/src/main/java/monasca/persister/repository/VerticaRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaRepo.java similarity index 95% rename from java/src/main/java/monasca/persister/repository/VerticaRepo.java rename to java/src/main/java/monasca/persister/repository/vertica/VerticaRepo.java index ad44a74b..38348f27 100644 --- a/java/src/main/java/monasca/persister/repository/VerticaRepo.java +++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaRepo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package monasca.persister.repository; +package monasca.persister.repository.vertica; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle;