diff --git a/java/pom.xml b/java/pom.xml index 1f871930..b6820a6a 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -99,6 +99,11 @@ influxdb-java 1.0 + + org.apache.httpcomponents + httpclient + 4.4 + @@ -146,7 +151,7 @@ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> - monasca.persister.MonPersisterApplication + monasca.persister.PersisterApplication diff --git a/java/src/main/java/monasca/persister/MonPersisterApplication.java b/java/src/main/java/monasca/persister/PersisterApplication.java similarity index 89% rename from java/src/main/java/monasca/persister/MonPersisterApplication.java rename to java/src/main/java/monasca/persister/PersisterApplication.java index 373d68ba..f1dff443 100644 --- a/java/src/main/java/monasca/persister/MonPersisterApplication.java +++ b/java/src/main/java/monasca/persister/PersisterApplication.java @@ -17,7 +17,7 @@ package monasca.persister; -import monasca.persister.configuration.MonPersisterConfiguration; +import monasca.persister.configuration.PersisterConfig; import monasca.persister.consumer.AlarmStateTransitionConsumer; import monasca.persister.consumer.AlarmStateTransitionConsumerFactory; import monasca.persister.consumer.KafkaAlarmStateTransitionConsumer; @@ -47,15 +47,15 @@ import io.dropwizard.setup.Environment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MonPersisterApplication extends Application { - private static final Logger logger = LoggerFactory.getLogger(MonPersisterApplication.class); +public class PersisterApplication extends Application { + private static final Logger logger = LoggerFactory.getLogger(PersisterApplication.class); public static void main(String[] args) throws Exception { - new MonPersisterApplication().run(args); + new PersisterApplication().run(args); } @Override - public void initialize(Bootstrap bootstrap) { + public void initialize(Bootstrap bootstrap) { } @Override @@ -64,10 +64,10 @@ public class MonPersisterApplication extends 
Application { private final Meter commitMeter; private final Timer commitTimer; - protected FlushableHandler(PipelineConfiguration configuration, Environment environment, + protected FlushableHandler(PipelineConfig configuration, Environment environment, int ordinal, int batchSize, String baseName) { this.handlerName = String.format("%s[%d]", baseName, ordinal); diff --git a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java index 6cc2c067..904399ba 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java @@ -35,8 +35,8 @@ import java.util.TimeZone; import java.util.TreeMap; import io.dropwizard.setup.Environment; -import monasca.persister.configuration.PipelineConfiguration; -import monasca.persister.repository.MetricRepository; +import monasca.persister.configuration.PipelineConfig; +import monasca.persister.repository.MetricRepo; import monasca.persister.repository.Sha1HashId; import static monasca.persister.repository.VerticaMetricsConstants.MAX_COLUMN_LENGTH; @@ -51,7 +51,7 @@ public class MetricHandler extends FlushableHandler { private final SimpleDateFormat simpleDateFormat; - private final MetricRepository verticaMetricRepository; + private final MetricRepo verticaMetricRepo; private final Counter metricCounter; private final Counter definitionCounter; @@ -59,12 +59,12 @@ public class MetricHandler extends FlushableHandler { private final Counter definitionDimensionsCounter; @Inject - public MetricHandler(MetricRepository metricRepository, - @Assisted PipelineConfiguration configuration, Environment environment, + public MetricHandler(MetricRepo metricRepo, + @Assisted PipelineConfig configuration, Environment environment, @Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize) { super(configuration, environment, ordinal, batchSize, 
MetricHandler.class.getName()); final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal); - this.verticaMetricRepository = metricRepository; + this.verticaMetricRepo = metricRepo; this.metricCounter = environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter"); this.definitionCounter = @@ -129,7 +129,7 @@ public class MetricHandler extends FlushableHandler { definitionIdStringToHash.append(trunc(region, MAX_COLUMN_LENGTH)); byte[] definitionIdSha1Hash = DigestUtils.sha(definitionIdStringToHash.toString()); Sha1HashId definitionSha1HashId = new Sha1HashId((definitionIdSha1Hash)); - verticaMetricRepository + verticaMetricRepo .addDefinitionToBatch(definitionSha1HashId, trunc(metric.getName(), MAX_COLUMN_LENGTH), trunc(tenantId, MAX_COLUMN_LENGTH), trunc(region, MAX_COLUMN_LENGTH)); definitionCounter.inc(); @@ -146,7 +146,7 @@ public class MetricHandler extends FlushableHandler { // Add the dimension name/values to the batch. for (Map.Entry entry : preppedDimMap.entrySet()) { - verticaMetricRepository + verticaMetricRepo .addDimensionToBatch(dimensionsSha1HashId, entry.getKey(), entry.getValue()); dimensionCounter.inc(); } @@ -160,7 +160,7 @@ public class MetricHandler extends FlushableHandler { definitionDimensionsIdSha1Hash = DigestUtils.sha(definitionDimensionsIdStringToHash.toString()); Sha1HashId definitionDimensionsSha1HashId = new Sha1HashId(definitionDimensionsIdSha1Hash); - verticaMetricRepository + verticaMetricRepo .addDefinitionDimensionToBatch(definitionDimensionsSha1HashId, definitionSha1HashId, dimensionsSha1HashId); definitionDimensionsCounter.inc(); @@ -172,7 +172,7 @@ public class MetricHandler extends FlushableHandler { for (double[] timeValuePairs : metric.getTimeValues()) { String timeStamp = simpleDateFormat.format(new Date((long) (timeValuePairs[0] * 1000))); double value = timeValuePairs[1]; - verticaMetricRepository.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value); + 
verticaMetricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value); metricCounter.inc(); metricCount++; } @@ -181,7 +181,7 @@ public class MetricHandler extends FlushableHandler { { String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp() * 1000)); double value = metric.getValue(); - verticaMetricRepository.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value); + verticaMetricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value); metricCounter.inc(); metricCount++; } @@ -191,7 +191,7 @@ public class MetricHandler extends FlushableHandler { @Override public void flushRepository() { - verticaMetricRepository.flush(); + verticaMetricRepo.flush(); } diff --git a/java/src/main/java/monasca/persister/pipeline/event/MetricHandlerFactory.java b/java/src/main/java/monasca/persister/pipeline/event/MetricHandlerFactory.java index 34e6c9f3..331ddf34 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/MetricHandlerFactory.java +++ b/java/src/main/java/monasca/persister/pipeline/event/MetricHandlerFactory.java @@ -17,11 +17,11 @@ package monasca.persister.pipeline.event; -import monasca.persister.configuration.PipelineConfiguration; +import monasca.persister.configuration.PipelineConfig; import com.google.inject.assistedinject.Assisted; public interface MetricHandlerFactory { - MetricHandler create(PipelineConfiguration pipelineConfiguration, + MetricHandler create(PipelineConfig pipelineConfig, @Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize); } diff --git a/java/src/main/java/monasca/persister/repository/AlarmRepository.java b/java/src/main/java/monasca/persister/repository/AlarmRepo.java similarity index 88% rename from java/src/main/java/monasca/persister/repository/AlarmRepository.java rename to java/src/main/java/monasca/persister/repository/AlarmRepo.java index 3ac7e9cf..025dc9b8 100644 --- a/java/src/main/java/monasca/persister/repository/AlarmRepository.java +++ 
b/java/src/main/java/monasca/persister/repository/AlarmRepo.java @@ -19,9 +19,9 @@ package monasca.persister.repository; import monasca.common.model.event.AlarmStateTransitionedEvent; -public interface AlarmRepository { +public interface AlarmRepo { - public void addToBatch(AlarmStateTransitionedEvent message); + public void addToBatch(final AlarmStateTransitionedEvent message); public void flush(); } diff --git a/java/src/main/java/monasca/persister/repository/InfluxAlarmRepo.java b/java/src/main/java/monasca/persister/repository/InfluxAlarmRepo.java new file mode 100644 index 00000000..9e3ee7ec --- /dev/null +++ b/java/src/main/java/monasca/persister/repository/InfluxAlarmRepo.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package monasca.persister.repository; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; + +import io.dropwizard.setup.Environment; +import monasca.common.model.event.AlarmStateTransitionedEvent; + +public abstract class InfluxAlarmRepo implements AlarmRepo { + + private static final Logger logger = LoggerFactory.getLogger(InfluxAlarmRepo.class); + + protected static final String ALARM_STATE_HISTORY_NAME = "alarm_state_history"; + + public final Timer flushTimer; + public final Meter alarmStateHistoryMeter; + + protected List alarmStateTransitionedEventList = new LinkedList<>(); + + public InfluxAlarmRepo(final Environment env) { + + this.flushTimer = + env.metrics().timer(MetricRegistry.name(getClass(), "flush-timer")); + + this.alarmStateHistoryMeter = + env.metrics().meter( + MetricRegistry.name(getClass(), "alarm_state_history-meter")); + } + + protected abstract void write () throws Exception; + + @Override + public void addToBatch(AlarmStateTransitionedEvent alarmStateTransitionedEvent) { + + this.alarmStateTransitionedEventList.add(alarmStateTransitionedEvent); + + this.alarmStateHistoryMeter.mark(); + } + + @Override + public void flush() { + try { + if (this.alarmStateTransitionedEventList.isEmpty()) { + logger.debug("There are no alarm state transition events to be written to the influxDB"); + logger.debug("Returning from flush"); + return; + } + + long startTime = System.currentTimeMillis(); + Timer.Context context = flushTimer.time(); + + write(); + + context.stop(); + long endTime = System.currentTimeMillis(); + logger.debug("Commiting batch took {} seconds", (endTime - startTime) / 1000); + + } catch (Exception e) { + logger.error("Failed to write alarm state history to database", e); + } + + this.alarmStateTransitionedEventList.clear(); + } +} diff --git 
a/java/src/main/java/monasca/persister/repository/InfluxDBAlarmRepository.java b/java/src/main/java/monasca/persister/repository/InfluxDBAlarmRepository.java deleted file mode 100644 index 80ac9cff..00000000 --- a/java/src/main/java/monasca/persister/repository/InfluxDBAlarmRepository.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package monasca.persister.repository; - -import io.dropwizard.setup.Environment; - -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import monasca.persister.configuration.MonPersisterConfiguration; - -import org.influxdb.dto.Serie; -import org.influxdb.dto.Serie.Builder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; -import com.google.inject.Inject; -import monasca.common.model.event.AlarmStateTransitionedEvent; - -public class InfluxDBAlarmRepository extends InfluxRepository implements AlarmRepository { - private static final Logger logger = LoggerFactory.getLogger(InfluxDBAlarmRepository.class); - private static final String ALARM_STATE_HISTORY_NAME = "alarm_state_history"; - private static final String[] COLUMN_NAMES = {"tenant_id", 
"alarm_id", "metrics", "old_state", - "new_state", "reason", "reason_data", "time"}; - static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - static { - OBJECT_MAPPER - .setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); - } - - protected final Timer flushTimer; - - private List alarmStateTransitionedEventList = new LinkedList<>(); - - public final Meter alarmStateHistoryMeter; - - @Inject - public InfluxDBAlarmRepository(MonPersisterConfiguration configuration, Environment environment) { - super(configuration, environment); - this.flushTimer = - this.environment.metrics().timer(MetricRegistry.name(getClass(), "flush-timer")); - this.alarmStateHistoryMeter = - this.environment.metrics().meter( - MetricRegistry.name(getClass(), "alarm_state_history-meter")); - } - - @Override - public void addToBatch(AlarmStateTransitionedEvent alarmStateTransitionedEvent) { - alarmStateTransitionedEventList.add(alarmStateTransitionedEvent); - this.alarmStateHistoryMeter.mark(); - } - - @Override - public void flush() { - try { - if (this.alarmStateTransitionedEventList.isEmpty()) { - logger.debug("There are no alarm state transition events to be written to the influxDB"); - logger.debug("Returning from flush"); - return; - } - - long startTime = System.currentTimeMillis(); - Timer.Context context = flushTimer.time(); - - final Builder builder = new Serie.Builder(ALARM_STATE_HISTORY_NAME); - logger.debug("Created serie: {}", ALARM_STATE_HISTORY_NAME); - - builder.columns(COLUMN_NAMES); - - if (logger.isDebugEnabled()) { - logColumnNames(COLUMN_NAMES); - } - - for (AlarmStateTransitionedEvent alarmStateTransitionedEvent : alarmStateTransitionedEventList) { - builder.values(alarmStateTransitionedEvent.tenantId, alarmStateTransitionedEvent.alarmId, - OBJECT_MAPPER.writeValueAsString(alarmStateTransitionedEvent.metrics), - alarmStateTransitionedEvent.oldState, alarmStateTransitionedEvent.newState, - 
alarmStateTransitionedEvent.stateChangeReason, "{}", - alarmStateTransitionedEvent.timestamp); - } - - final Serie[] series = {builder.build()}; - - if (logger.isDebugEnabled()) { - logColValues(series[0]); - } - - this.influxDB.write(this.configuration.getInfluxDBConfiguration().getName(), - TimeUnit.SECONDS, series); - - context.stop(); - long endTime = System.currentTimeMillis(); - logger.debug("Commiting batch took {} seconds", (endTime - startTime) / 1000); - - } catch (Exception e) { - logger.error("Failed to write alarm state history to database", e); - } - - this.alarmStateTransitionedEventList.clear(); - } -} diff --git a/java/src/main/java/monasca/persister/repository/InfluxDBMetricRepository.java b/java/src/main/java/monasca/persister/repository/InfluxDBMetricRepository.java deleted file mode 100644 index 9814b497..00000000 --- a/java/src/main/java/monasca/persister/repository/InfluxDBMetricRepository.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package monasca.persister.repository; - -import com.google.inject.Inject; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; - -import org.apache.commons.codec.digest.DigestUtils; -import org.influxdb.dto.Serie; -import org.influxdb.dto.Serie.Builder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; - -import io.dropwizard.setup.Environment; -import monasca.persister.configuration.MonPersisterConfiguration; - -public final class InfluxDBMetricRepository extends InfluxRepository implements MetricRepository { - - private static final Logger logger = LoggerFactory.getLogger(InfluxDBMetricRepository.class); - private static final int NUMBER_COLUMNS = 2; - - private final Map defMap = new HashMap<>(); - private final Map> dimMap = new HashMap<>(); - private final Map defDimMap = new HashMap<>(); - private final Map> measurementMap = new HashMap<>(); - - private final com.codahale.metrics.Timer flushTimer; - public final Meter measurementMeter; - - private final SimpleDateFormat measurementTimeSimpleDateFormat = new - SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz"); - private static final Sha1HashId BLANK_SHA_1_HASH_ID = new Sha1HashId(DigestUtils.sha("")); - private static final Set EMPTY_DIM_TREE_SET = new TreeSet<>(); - - @Inject - public InfluxDBMetricRepository(final MonPersisterConfiguration configuration, - final Environment environment) { - super(configuration, environment); - this.flushTimer = this.environment.metrics().timer(this.getClass().getName() + "." + - "flush-timer"); - this.measurementMeter = this.environment.metrics().meter(this.getClass().getName() + "." 
+ - "measurement-meter"); - } - - @Override - public void addMetricToBatch(final Sha1HashId defDimsId, final String timeStamp, - final double value) { - final Measurement measurement = new Measurement(defDimsId, timeStamp, value); - List measurementList = this.measurementMap.get(defDimsId); - if (measurementList == null) { - measurementList = new LinkedList<>(); - this.measurementMap.put(defDimsId, measurementList); - } - measurementList.add(measurement); - } - - @Override - public void addDefinitionToBatch(final Sha1HashId defId, final String name, final String tenantId, - final String region) { - final Def def = new Def(defId, name, tenantId, region); - defMap.put(defId, def); - } - - @Override - public void addDimensionToBatch(final Sha1HashId dimSetId, final String name, - final String value) { - Set dimSet = dimMap.get(dimSetId); - if (dimSet == null) { - dimSet = new TreeSet<>(); - dimMap.put(dimSetId, dimSet); - } - - final Dim dim = new Dim(dimSetId, name, value); - dimSet.add(dim); - } - - @Override - public void addDefinitionDimensionToBatch(final Sha1HashId defDimsId, final Sha1HashId defId, - Sha1HashId dimId) { - final DefDim defDim = new DefDim(defDimsId, defId, dimId); - defDimMap.put(defDimsId, defDim); - } - - @Override - public void flush() { - - try { - final long startTime = System.currentTimeMillis(); - final Timer.Context context = flushTimer.time(); - this.influxDB.write(this.configuration.getInfluxDBConfiguration().getName(), - TimeUnit.SECONDS, getSeries()); - final long endTime = System.currentTimeMillis(); - context.stop(); - logger.debug("Writing measurements, definitions, and dimensions to InfluxDB took {} seconds", - (endTime - startTime) / 1000); - } catch (Exception e) { - logger.error("Failed to write measurements to InfluxDB", e); - } - clearBuffers(); - } - - private String buildSerieName(final Def def, final Set dimList) - throws UnsupportedEncodingException { - - logger.debug("Creating serie name"); - - final StringBuilder 
serieNameBuilder = new StringBuilder(); - - logger.debug("Adding tenant_id to serie name: {}", def.tenantId); - serieNameBuilder.append(urlEncodeUTF8(def.tenantId)); - serieNameBuilder.append("?"); - - logger.debug("Adding region to serie name: {}", def.region); - serieNameBuilder.append(urlEncodeUTF8(def.region)); - serieNameBuilder.append("&"); - - logger.debug("Adding name to serie name: {}", def.name); - serieNameBuilder.append(urlEncodeUTF8(def.name)); - - for (final Dim dim : dimList) { - serieNameBuilder.append("&"); - logger.debug("Adding dimension name to serie name: {}", dim.name); - serieNameBuilder.append(urlEncodeUTF8(dim.name)); - serieNameBuilder.append("="); - logger.debug("Adding dimension value to serie name: {}", dim.value); - serieNameBuilder.append(urlEncodeUTF8(dim.value)); - } - - final String serieName = serieNameBuilder.toString(); - logger.debug("Created serie name: {}", serieName); - - return serieName; - } - - private Def getDef(final Sha1HashId defId) throws Exception { - - final Def def = this.defMap.get(defId); - if (def == null) { - throw new Exception("Failed to find definition for defId: " + defId); - } - - return def; - } - - private Set getDimSet(final Sha1HashId dimId) throws Exception { - - // If there were no dimensions, then "" was used in the hash id and nothing was - // ever added to the dimension map for this dimension set. 
- if (dimId.equals(BLANK_SHA_1_HASH_ID)) { - return EMPTY_DIM_TREE_SET; - } - - final Set dimSet = this.dimMap.get(dimId); - - if (dimSet == null) { - throw new Exception("Failed to find dimension set for dimId: " + dimId); - } - - return dimSet; - } - - private String urlEncodeUTF8(final String s) throws UnsupportedEncodingException { - return URLEncoder.encode(s, "UTF-8"); - } - - private String[] buildColNamesStringArry() { - - final String[] colNameStringArry = new String[NUMBER_COLUMNS]; - - colNameStringArry[0] = "time"; - logger.debug("Added column name[{}] = {}", 0, colNameStringArry[0]); - - colNameStringArry[1] = "value"; - logger.debug("Added column name[{}] = {}", 1, colNameStringArry[1]); - - if (logger.isDebugEnabled()) { - logColumnNames(colNameStringArry); - } - - return colNameStringArry; - } - - private Serie[] getSeries() throws Exception { - - final List serieList = new LinkedList<>(); - - for (final Sha1HashId defDimId : this.measurementMap.keySet()) { - - final DefDim defDim = this.defDimMap.get(defDimId); - final Def def = getDef(defDim.defId); - final Set dimSet = getDimSet(defDim.dimId); - final Builder builder = new Serie.Builder(buildSerieName(def, dimSet)); - - builder.columns(buildColNamesStringArry()); - - for (final Measurement measurement : this.measurementMap.get(defDimId)) { - final Object[] colValsObjArry = new Object[NUMBER_COLUMNS]; - final Date date = measurementTimeSimpleDateFormat.parse(measurement.time + " UTC"); - final Long time = date.getTime() / 1000; - colValsObjArry[0] = time; - logger.debug("Added column value to colValsObjArry[{}] = {}", 0, colValsObjArry[0]); - colValsObjArry[1] = measurement.value; - logger.debug("Added column value to colValsObjArry[{}] = {}", 1, colValsObjArry[1]); - builder.values(colValsObjArry); - measurementMeter.mark(); - } - - final Serie serie = builder.build(); - - if (logger.isDebugEnabled()) { - logColValues(serie); - } - - serieList.add(serie); - logger.debug("Added serie: {} to 
serieList", serie.getName()); - } - - return serieList.toArray(new Serie[serieList.size()]); - } - - - private void clearBuffers() { - - this.measurementMap.clear(); - this.defMap.clear(); - this.dimMap.clear(); - this.defDimMap.clear(); - } - - private static final class Measurement { - - final Sha1HashId defDimsId; - final String time; - final double value; - - private Measurement(final Sha1HashId defDimsId, final String time, final double value) { - this.defDimsId = defDimsId; - this.time = time; - this.value = value; - } - - @Override - public String toString() { - return "Measurement{" + "defDimsId=" + defDimsId + ", time='" + time + '\'' + ", " + - "value=" + value + '}'; - } - } - - private static final class Def { - - final Sha1HashId defId; - final String name; - final String tenantId; - final String region; - - private Def(final Sha1HashId defId, final String name, final String tenantId, - final String region) { - this.defId = defId; - this.name = name; - this.tenantId = tenantId; - this.region = region; - } - - @Override - public String toString() { - return "Definition{" + "defId=" + defId + ", name='" + name + '\'' + ", " + - "tenantId='" + tenantId + '\'' + ", region='" + region + '\'' + '}'; - } - } - - private static final class Dim implements Comparable { - - final Sha1HashId dimSetId; - final String name; - final String value; - - private Dim(final Sha1HashId dimSetId, final String name, final String value) { - this.dimSetId = dimSetId; - this.name = name; - this.value = value; - } - - @Override - public String toString() { - return "Dimension{" + "dimSetId=" + dimSetId + ", name='" + name + '\'' + ", " + - "value='" + value + '\'' + '}'; - } - - @Override - public int compareTo(Dim o) { - int nameCmp = String.CASE_INSENSITIVE_ORDER.compare(name, o.name); - return (nameCmp != 0 ? 
nameCmp : String.CASE_INSENSITIVE_ORDER.compare(value, o.value)); - } - } - - private static final class DefDim { - - final Sha1HashId defDimId; - final Sha1HashId defId; - final Sha1HashId dimId; - - private DefDim(final Sha1HashId defDimId, final Sha1HashId defId, final Sha1HashId dimId) { - this.defDimId = defDimId; - this.defId = defId; - this.dimId = dimId; - } - - @Override - public String toString() { - return "DefinitionDimension{" + "defDimId=" + defDimId + ", defId=" + defId + ", " + - "dimId=" + dimId + '}'; - } - } -} diff --git a/java/src/main/java/monasca/persister/repository/InfluxMetricRepo.java b/java/src/main/java/monasca/persister/repository/InfluxMetricRepo.java new file mode 100644 index 00000000..e8e24f92 --- /dev/null +++ b/java/src/main/java/monasca/persister/repository/InfluxMetricRepo.java @@ -0,0 +1,242 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package monasca.persister.repository; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; + +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import io.dropwizard.setup.Environment; + +public abstract class InfluxMetricRepo implements MetricRepo { + + private static final Logger logger = LoggerFactory.getLogger(InfluxMetricRepo.class); + + protected final Map defMap = new HashMap<>(); + protected final Map> dimMap = new HashMap<>(); + protected final Map defDimMap = new HashMap<>(); + protected final Map> measurementMap = new HashMap<>(); + + public final com.codahale.metrics.Timer flushTimer; + public final Meter measurementMeter; + + private static final Sha1HashId BLANK_SHA_1_HASH_ID = new Sha1HashId(DigestUtils.sha("")); + private static final Set EMPTY_DIM_TREE_SET = new TreeSet<>(); + + protected abstract void write() throws Exception; + + public InfluxMetricRepo(final Environment env) { + + this.flushTimer = env.metrics().timer(this.getClass().getName() + "." + + "flush-timer"); + + this.measurementMeter = env.metrics().meter(this.getClass().getName() + "." 
+ + "measurement-meter"); + } + + @Override + public void addMetricToBatch(final Sha1HashId defDimsId, final String timeStamp, + final double value) { + final Measurement measurement = new Measurement(defDimsId, timeStamp, value); + List measurementList = this.measurementMap.get(defDimsId); + if (measurementList == null) { + measurementList = new LinkedList<>(); + this.measurementMap.put(defDimsId, measurementList); + } + measurementList.add(measurement); + } + + @Override + public void addDefinitionToBatch(final Sha1HashId defId, final String name, final String tenantId, + final String region) { + final Def def = new Def(defId, name, tenantId, region); + this.defMap.put(defId, def); + } + + @Override + public void addDimensionToBatch(final Sha1HashId dimSetId, final String name, + final String value) { + final Dim dim = new Dim(dimSetId, name, value); + Set dimSet = this.dimMap.get(dimSetId); + if (dimSet == null) { + dimSet = new TreeSet<>(); + this.dimMap.put(dimSetId, dimSet); + } + + dimSet.add(dim); + } + + @Override + public void addDefinitionDimensionToBatch(final Sha1HashId defDimsId, final Sha1HashId defId, + Sha1HashId dimId) { + final DefDim defDim = new DefDim(defDimsId, defId, dimId); + this.defDimMap.put(defDimsId, defDim); + } + + @Override + public void flush() { + + try { + final long startTime = System.currentTimeMillis(); + final Timer.Context context = flushTimer.time(); + + write(); + + final long endTime = System.currentTimeMillis(); + context.stop(); + + logger.debug("Writing measurements, definitions, and dimensions to InfluxDB took {} seconds", + (endTime - startTime) / 1000); + + } catch (Exception e) { + logger.error("Failed to write measurements to InfluxDB", e); + } + + clearBuffers(); + } + + protected Def getDef(final Sha1HashId defId) throws Exception { + + final Def def = this.defMap.get(defId); + if (def == null) { + throw new Exception("Failed to find definition for defId: " + defId); + } + + return def; + } + + protected Set 
getDimSet(final Sha1HashId dimId) throws Exception { + + // If there were no dimensions, then "" was used in the hash id and nothing was + // ever added to the dimension map for this dimension set. + if (dimId.equals(BLANK_SHA_1_HASH_ID)) { + return EMPTY_DIM_TREE_SET; + } + + final Set dimSet = this.dimMap.get(dimId); + + if (dimSet == null) { + throw new Exception("Failed to find dimension set for dimId: " + dimId); + } + + return dimSet; + } + + private void clearBuffers() { + + this.measurementMap.clear(); + this.defMap.clear(); + this.dimMap.clear(); + this.defDimMap.clear(); + } + + static protected final class Measurement { + + protected final Sha1HashId defDimsId; + protected final String time; + protected final double value; + + private Measurement(final Sha1HashId defDimsId, final String time, final double value) { + this.defDimsId = defDimsId; + this.time = time; + this.value = value; + } + + @Override + public String toString() { + return "Measurement{" + "defDimsId=" + this.defDimsId + ", time='" + this.time + '\'' + ", " + + "value=" + this.value + '}'; + } + } + + static protected final class Def { + + protected final Sha1HashId defId; + protected final String name; + protected final String tenantId; + protected final String region; + + private Def(final Sha1HashId defId, final String name, final String tenantId, + final String region) { + this.defId = defId; + this.name = name; + this.tenantId = tenantId; + this.region = region; + } + + @Override + public String toString() { + return "Definition{" + "defId=" + this.defId + ", name='" + this.name + '\'' + ", " + + "tenantId='" + this.tenantId + '\'' + ", region='" + this.region + '\'' + '}'; + } + } + + static protected final class Dim implements Comparable { + + protected final Sha1HashId dimSetId; + protected final String name; + protected final String value; + + private Dim(final Sha1HashId dimSetId, final String name, final String value) { + this.dimSetId = dimSetId; + this.name = name; + 
this.value = value; + } + + @Override + public String toString() { + return "Dimension{" + "dimSetId=" + this.dimSetId + ", name='" + this.name + '\'' + ", " + + "value='" + this.value + '\'' + '}'; + } + + @Override + public int compareTo(Dim o) { + int nameCmp = String.CASE_INSENSITIVE_ORDER.compare(name, o.name); + return (nameCmp != 0 ? nameCmp : String.CASE_INSENSITIVE_ORDER.compare(this.value, o.value)); + } + } + + static protected final class DefDim { + + protected final Sha1HashId defDimId; + protected final Sha1HashId defId; + protected final Sha1HashId dimId; + + private DefDim(final Sha1HashId defDimId, final Sha1HashId defId, final Sha1HashId dimId) { + this.defDimId = defDimId; + this.defId = defId; + this.dimId = dimId; + } + + @Override + public String toString() { + return "DefinitionDimension{" + "defDimId=" + this.defDimId + ", defId=" + this.defId + ", " + + "dimId=" + this.dimId + '}'; + } + } +} diff --git a/java/src/main/java/monasca/persister/repository/InfluxV8AlarmRepo.java b/java/src/main/java/monasca/persister/repository/InfluxV8AlarmRepo.java new file mode 100644 index 00000000..36f98420 --- /dev/null +++ b/java/src/main/java/monasca/persister/repository/InfluxV8AlarmRepo.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package monasca.persister.repository;

import com.google.inject.Inject;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;

import org.influxdb.dto.Serie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

import io.dropwizard.setup.Environment;
import monasca.common.model.event.AlarmStateTransitionedEvent;

/**
 * Persists buffered alarm state transition events to InfluxDB using the
 * 0.8 (V8) series-based wire format. One row is written per buffered event
 * into the alarm state history serie.
 */
public class InfluxV8AlarmRepo extends InfluxAlarmRepo {

  private static final Logger logger = LoggerFactory.getLogger(InfluxV8AlarmRepo.class);

  // Column order must match the order values are appended in write().
  private static final String[]
      COLUMN_NAMES =
      {"tenant_id", "alarm_id", "metrics", "old_state", "new_state", "reason", "reason_data",
       "time"};

  private final InfluxV8RepoWriter influxV8RepoWriter;

  // Serializes each event's metrics list into the JSON "metrics" column.
  private final ObjectMapper objectMapper = new ObjectMapper();

  /**
   * @param env dropwizard environment passed through to the alarm buffer base class
   * @param influxV8RepoWriter writer that performs the actual InfluxDB V8 write
   */
  @Inject
  public InfluxV8AlarmRepo(final Environment env,
                           final InfluxV8RepoWriter influxV8RepoWriter) {

    super(env);
    this.influxV8RepoWriter = influxV8RepoWriter;

    // Emit snake_case JSON field names, matching the column naming convention.
    this.objectMapper.setPropertyNamingStrategy(
        PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
  }

  /**
   * Writes every buffered alarm state transition event as one row of the
   * alarm state history serie, with second-precision timestamps.
   *
   * @throws JsonProcessingException if an event's metrics cannot be serialized
   */
  @Override
  protected void write() throws JsonProcessingException {

    // Fix: previously an empty serie was written even when no events were
    // buffered; skip the InfluxDB round trip entirely in that case.
    if (this.alarmStateTransitionedEventList.isEmpty()) {
      logger.debug("No alarm state transition events to write; skipping InfluxDB write");
      return;
    }

    final Serie.Builder builder = new Serie.Builder(ALARM_STATE_HISTORY_NAME);
    logger.debug("Created serie: {}", ALARM_STATE_HISTORY_NAME);

    builder.columns(COLUMN_NAMES);

    if (logger.isDebugEnabled()) {
      this.influxV8RepoWriter.logColumnNames(COLUMN_NAMES);
    }

    for (AlarmStateTransitionedEvent event : this.alarmStateTransitionedEventList) {
      // "reason_data" is intentionally written as an empty JSON object.
      builder.values(event.tenantId, event.alarmId,
                     this.objectMapper.writeValueAsString(event.metrics),
                     event.oldState, event.newState,
                     event.stateChangeReason, "{}",
                     event.timestamp);
    }

    final Serie[] series = {builder.build()};

    if (logger.isDebugEnabled()) {
      this.influxV8RepoWriter.logColValues(series[0]);
    }

    this.influxV8RepoWriter.write(TimeUnit.SECONDS, series);
  }

}
/*
 * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package monasca.persister.repository;

import com.google.inject.Inject;

import org.influxdb.dto.Serie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import io.dropwizard.setup.Environment;

/**
 * Persists buffered metric measurements to InfluxDB using the 0.8 (V8)
 * series-based wire format. The metric identity (tenant, region, name and
 * dimensions) is URL-encoded into the serie name; each row then carries
 * only a timestamp and a value.
 */
public class InfluxV8MetricRepo extends InfluxMetricRepo
{

  private static final Logger logger = LoggerFactory.getLogger(InfluxV8MetricRepo.class);
  // Column order must match the value order built per measurement in getSeries().
  private static final String[] COL_NAMES_STRING_ARRY = {"time", "value"};

  private final InfluxV8RepoWriter influxV8RepoWriter;

  // NOTE(review): SimpleDateFormat is not thread-safe; this assumes write()
  // is never invoked concurrently — confirm against the consumer threading model.
  private final SimpleDateFormat simpleDateFormat =
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");

  /**
   * @param env dropwizard environment passed through to the metric buffer base class
   * @param influxV8RepoWriter writer that performs the actual InfluxDB V8 write
   */
  @Inject
  public InfluxV8MetricRepo(final Environment env,
                            final InfluxV8RepoWriter influxV8RepoWriter) {

    super(env);
    this.influxV8RepoWriter = influxV8RepoWriter;

  }

  /** Writes all buffered measurements with second-precision timestamps. */
  @Override
  protected void write() throws Exception {

    this.influxV8RepoWriter.write(TimeUnit.SECONDS, getSeries());
  }

  /**
   * Builds one serie per buffered definition/dimension-set, with one
   * [time, value] row per measurement.
   *
   * @throws Exception if a definition or dimension set is missing from the
   *         buffers, or a measurement timestamp cannot be parsed
   */
  private Serie[] getSeries() throws Exception {

    final List<Serie> serieList = new LinkedList<>();

    for (final Sha1HashId defDimId : this.measurementMap.keySet()) {

      final DefDim defDim = this.defDimMap.get(defDimId);
      final Def def = getDef(defDim.defId);
      final Set<Dim> dimSet = getDimSet(defDim.dimId);
      final Serie.Builder builder = new Serie.Builder(buildSerieName(def, dimSet));

      builder.columns(COL_NAMES_STRING_ARRY);

      for (final Measurement measurement : this.measurementMap.get(defDimId)) {
        final Object[] colValsObjArry = new Object[COL_NAMES_STRING_ARRY.length];
        // Measurement times arrive without a zone; parse them as UTC and
        // truncate to whole seconds, matching the TimeUnit.SECONDS precision.
        final Date date = this.simpleDateFormat.parse(measurement.time + " UTC");
        final Long time = date.getTime() / 1000;
        colValsObjArry[0] = time;
        logger.debug("Added column value to colValsObjArry[{}] = {}", 0, colValsObjArry[0]);
        colValsObjArry[1] = measurement.value;
        logger.debug("Added column value to colValsObjArry[{}] = {}", 1, colValsObjArry[1]);
        builder.values(colValsObjArry);
        this.measurementMeter.mark();
      }

      final Serie serie = builder.build();

      if (logger.isDebugEnabled()) {
        this.influxV8RepoWriter.logColValues(serie);
      }

      serieList.add(serie);
      logger.debug("Added serie: {} to serieList", serie.getName());
    }

    return serieList.toArray(new Serie[serieList.size()]);
  }

  /**
   * Encodes the metric identity into a serie name shaped like
   * {@code tenant?region&name[&dimName=dimValue...]}; every component is
   * URL-encoded so delimiter characters inside values cannot be misread.
   */
  private String buildSerieName(final Def def, final Set<Dim> dimList)
      throws UnsupportedEncodingException {

    logger.debug("Creating serie name");

    final StringBuilder serieNameBuilder = new StringBuilder();

    logger.debug("Adding tenant_id to serie name: {}", def.tenantId);
    serieNameBuilder.append(urlEncodeUTF8(def.tenantId));
    serieNameBuilder.append("?");

    logger.debug("Adding region to serie name: {}", def.region);
    serieNameBuilder.append(urlEncodeUTF8(def.region));
    serieNameBuilder.append("&");
    logger.debug("Adding name to serie name: {}", def.name);
    serieNameBuilder.append(urlEncodeUTF8(def.name));

    for (final Dim dim : dimList) {
      serieNameBuilder.append("&");
      logger.debug("Adding dimension name to serie name: {}", dim.name);
      serieNameBuilder.append(urlEncodeUTF8(dim.name));
      serieNameBuilder.append("=");
      logger.debug("Adding dimension value to serie name: {}", dim.value);
      serieNameBuilder.append(urlEncodeUTF8(dim.value));
    }

    final String serieName = serieNameBuilder.toString();
    logger.debug("Created serie name: {}", serieName);

    return serieName;
  }

  // application/x-www-form-urlencoded style encoding (spaces become '+').
  private String urlEncodeUTF8(final String s) throws UnsupportedEncodingException {
    return URLEncoder.encode(s, "UTF-8");
  }
}
rename to java/src/main/java/monasca/persister/repository/InfluxV8RepoWriter.java index 12e69f50..31804550 100644 --- a/java/src/main/java/monasca/persister/repository/InfluxRepository.java +++ b/java/src/main/java/monasca/persister/repository/InfluxV8RepoWriter.java @@ -17,9 +17,7 @@ package monasca.persister.repository; -import monasca.persister.configuration.MonPersisterConfiguration; - -import io.dropwizard.setup.Environment; +import com.google.inject.Inject; import org.influxdb.InfluxDB; import org.influxdb.InfluxDBFactory; @@ -29,24 +27,41 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; -public abstract class InfluxRepository { - static final Logger logger = LoggerFactory.getLogger(InfluxRepository.class); +import io.dropwizard.setup.Environment; +import monasca.persister.configuration.PersisterConfig; - protected final MonPersisterConfiguration configuration; - protected final Environment environment; - protected final InfluxDB influxDB; +public class InfluxV8RepoWriter { + + static final Logger logger = LoggerFactory.getLogger(InfluxV8RepoWriter.class); + + private final PersisterConfig config; + private final Environment env; + private final InfluxDB influxDB; + + private final String databaseName; + + @Inject + public InfluxV8RepoWriter(final PersisterConfig config, final Environment env) { + + this.config = config; + this.env = env; + this.influxDB = + InfluxDBFactory.connect(config.getInfluxDBConfiguration().getUrl(), + config.getInfluxDBConfiguration().getUser(), + config.getInfluxDBConfiguration().getPassword()); + + this.databaseName = this.config.getInfluxDBConfiguration().getName(); - public InfluxRepository(MonPersisterConfiguration configuration, Environment environment) { - this.configuration = configuration; - this.environment = environment; - influxDB = - InfluxDBFactory.connect(configuration.getInfluxDBConfiguration().getUrl(), configuration - 
.getInfluxDBConfiguration().getUser(), configuration.getInfluxDBConfiguration() - .getPassword()); } - protected void logColValues(Serie serie) { + protected void write(final TimeUnit precision, final Serie[] series) { + + this.influxDB.write(this.databaseName, precision, series); + } + + protected void logColValues(final Serie serie) { logger.debug("Added array of array of column values to serie"); final String[] colNames = serie.getColumns(); List> rows = serie.getRows(); @@ -67,7 +82,7 @@ public abstract class InfluxRepository { } } - protected void logColumnNames(String[] colNames) { + protected void logColumnNames(final String[] colNames) { logger.debug("Added array of column names to serie"); StringBuffer sb = new StringBuffer(); boolean first = true; diff --git a/java/src/main/java/monasca/persister/repository/InfluxV9AlarmRepo.java b/java/src/main/java/monasca/persister/repository/InfluxV9AlarmRepo.java new file mode 100644 index 00000000..d0b654f2 --- /dev/null +++ b/java/src/main/java/monasca/persister/repository/InfluxV9AlarmRepo.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package monasca.persister.repository;

import com.google.inject.Inject;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import io.dropwizard.setup.Environment;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.repository.influxdb.InfluxPoint;

/**
 * Persists buffered alarm state transition events to InfluxDB using the
 * 0.9 (V9) point-based JSON write protocol. Each buffered event becomes
 * one point in the alarm state history measurement.
 */
public class InfluxV9AlarmRepo extends InfluxAlarmRepo {

  private static final Logger logger = LoggerFactory.getLogger(InfluxV9AlarmRepo.class);

  private final InfluxV9RepoWriter influxV9RepoWriter;

  // Serializes each event's metrics list into the JSON "metrics" field.
  private final ObjectMapper objectMapper = new ObjectMapper();

  // ISO-8601 formatter used for point timestamps.
  private final DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime();

  /**
   * @param env dropwizard environment passed through to the alarm buffer base class
   * @param influxV9RepoWriter writer that performs the actual InfluxDB V9 write
   */
  @Inject
  public InfluxV9AlarmRepo(final Environment env,
                           final InfluxV9RepoWriter influxV9RepoWriter) {

    super(env);
    this.influxV9RepoWriter = influxV9RepoWriter;

    // Emit snake_case JSON field names, matching the field naming convention.
    this.objectMapper.setPropertyNamingStrategy(
        PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
  }

  /** Converts every buffered event into a point and writes the batch. */
  @Override
  protected void write() throws Exception {

    this.influxV9RepoWriter.write(buildPointArray());
  }

  // One point per buffered event, preserving buffer order.
  private InfluxPoint[] buildPointArray() throws Exception {

    final List<InfluxPoint> points = new LinkedList<>();

    for (final AlarmStateTransitionedEvent event : this.alarmStateTransitionedEventList) {
      points.add(toPoint(event));
    }

    return points.toArray(new InfluxPoint[points.size()]);
  }

  // Maps a single event onto the alarm state history measurement.
  private InfluxPoint toPoint(final AlarmStateTransitionedEvent event) throws Exception {

    final Map<String, Object> fields = new HashMap<>();
    fields.put("tenant_id", event.tenantId);
    fields.put("alarm_id", event.alarmId);
    fields.put("metrics", this.objectMapper.writeValueAsString(event.metrics));
    fields.put("old_state", event.oldState);
    fields.put("new_state", event.newState);
    fields.put("reason", event.stateChangeReason);
    // "reason_data" is intentionally written as an empty JSON object.
    fields.put("reason_data", "{}");

    // timestamp is scaled by 1000 to the epoch-millis value DateTime expects.
    final DateTime utcTime = new DateTime(event.timestamp * 1000, DateTimeZone.UTC);
    final String timeString = this.dateFormatter.print(utcTime);

    return new InfluxPoint(ALARM_STATE_HISTORY_NAME, new HashMap<String, String>(),
                           timeString, fields);
  }
}
/*
 * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package monasca.persister.repository;

import com.google.inject.Inject;

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.dropwizard.setup.Environment;
import monasca.persister.repository.influxdb.InfluxPoint;

/**
 * Persists buffered metric measurements to InfluxDB using the 0.9 (V9)
 * point-based JSON write protocol. Dimensions plus tenant_id and region
 * become point tags; each measurement value becomes the "value" field.
 */
public class InfluxV9MetricRepo extends InfluxMetricRepo {

  private static final Logger logger = LoggerFactory.getLogger(InfluxV9MetricRepo.class);

  // Joda ISO formatters are immutable and thread-safe: build once and share
  // (consistent with InfluxV9AlarmRepo) instead of rebuilding per write() call.
  private static final DateTimeFormatter DATE_FORMATTER = ISODateTimeFormat.dateTime();

  // NOTE(review): SimpleDateFormat is not thread-safe; this assumes write()
  // is never invoked concurrently — confirm against the consumer threading model.
  private final SimpleDateFormat simpleDateFormat =
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");

  private final InfluxV9RepoWriter influxV9RepoWriter;

  /**
   * @param env dropwizard environment passed through to the metric buffer base class
   * @param influxV9RepoWriter writer that performs the actual InfluxDB V9 write
   */
  @Inject
  public InfluxV9MetricRepo(final Environment env,
                            final InfluxV9RepoWriter influxV9RepoWriter) {

    super(env);
    this.influxV9RepoWriter = influxV9RepoWriter;

  }

  /** Converts the buffered measurements into points and writes the batch. */
  @Override
  protected void write() throws Exception {

    this.influxV9RepoWriter.write(getInfluxPointArry());

  }

  /**
   * Builds one InfluxPoint per buffered measurement.
   *
   * @throws Exception if a definition or dimension set is missing from the
   *         buffers, or a measurement timestamp cannot be parsed
   */
  private InfluxPoint[] getInfluxPointArry() throws Exception {

    final List<InfluxPoint> influxPointList = new LinkedList<>();

    for (final Sha1HashId defDimId : this.measurementMap.keySet()) {

      final DefDim defDim = this.defDimMap.get(defDimId);
      final Def def = getDef(defDim.defId);
      final Set<Dim> dimSet = getDimSet(defDim.dimId);

      // Tags are shared by every measurement of this definition/dimension set.
      final Map<String, String> tagMap = new HashMap<>();
      for (final Dim dim : dimSet) {
        tagMap.put(dim.name, dim.value);
      }
      tagMap.put("tenant_id", def.tenantId);
      tagMap.put("region", def.region);

      for (final Measurement measurement : this.measurementMap.get(defDimId)) {

        // Measurement times arrive without a zone; parse them as UTC and
        // re-emit them in ISO-8601 form for the V9 write body.
        final Date date = this.simpleDateFormat.parse(measurement.time + " UTC");
        final DateTime dateTime = new DateTime(date.getTime(), DateTimeZone.UTC);
        final String dateString = DATE_FORMATTER.print(dateTime);

        final Map<String, Object> valueMap = new HashMap<>();
        valueMap.put("value", measurement.value);

        influxPointList.add(new InfluxPoint(def.name, tagMap, dateString, valueMap));

        this.measurementMeter.mark();
      }
    }

    return influxPointList.toArray(new InfluxPoint[influxPointList.size()]);
  }
}
/*
 * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package monasca.persister.repository;

import com.google.inject.Inject;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;

import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.influxdb.InfluxPoint;
import monasca.persister.repository.influxdb.InfluxWrite;

/**
 * Writes batches of points to the InfluxDB 0.9 HTTP /write endpoint, using
 * basic-auth credentials and connection details taken from the persister
 * configuration.
 */
public class InfluxV9RepoWriter {

  private static final Logger logger = LoggerFactory.getLogger(InfluxV9RepoWriter.class);

  private final PersisterConfig config;

  private final String influxName;
  private final String influxUrl;
  private final String influxCreds;
  private final String influxUser;
  private final String influxPass;
  private final String influxRetentionPolicy;

  private final CloseableHttpClient httpClient = HttpClientBuilder.create().build();

  // Precomputed basic-auth header value: "Basic " + base64(user:pass).
  private final String baseAuthHeader;

  private final ObjectMapper objectMapper = new ObjectMapper();

  /**
   * @param config persister configuration supplying the InfluxDB connection
   *        details (name, url, user, password, retention policy)
   */
  @Inject
  public InfluxV9RepoWriter(final PersisterConfig config) {

    this.config = config;

    this.influxName = config.getInfluxDBConfiguration().getName();
    this.influxUrl = config.getInfluxDBConfiguration().getUrl() + "/write";
    this.influxUser = config.getInfluxDBConfiguration().getUser();
    this.influxPass = config.getInfluxDBConfiguration().getPassword();
    this.influxCreds = this.influxUser + ":" + this.influxPass;
    this.influxRetentionPolicy = config.getInfluxDBConfiguration().getRetentionPolicy();

    this.baseAuthHeader = "Basic " + new String(Base64.encodeBase64(this.influxCreds.getBytes()));

  }

  /**
   * POSTs the given points to InfluxDB as a single JSON write request.
   *
   * @param influxPointArry points to persist; an empty array is a no-op
   * @throws Exception if InfluxDB responds with anything other than 200 OK;
   *         the exception message carries the server's response body
   */
  protected void write(final InfluxPoint[] influxPointArry) throws Exception {

    // Fix: previously an empty batch still caused an HTTP round trip.
    if (influxPointArry.length == 0) {
      logger.debug("No points to write to influxdb database {}", this.influxName);
      return;
    }

    final HttpPost request = new HttpPost(this.influxUrl);

    request.addHeader("content-type", "application/json");
    request.addHeader("Authorization", this.baseAuthHeader);

    final InfluxWrite influxWrite =
        new InfluxWrite(this.influxName, this.influxRetentionPolicy, influxPointArry,
                        new HashMap<String, String>());

    // Fix: encode the JSON body explicitly as UTF-8. StringEntity's default
    // charset is ISO-8859-1, which would corrupt non-ASCII tag/field values.
    final StringEntity params =
        new StringEntity(this.objectMapper.writeValueAsString(influxWrite), "UTF-8");
    request.setEntity(params);

    try {

      logger.debug("Writing {} points to influxdb database {} at {}",
                   influxPointArry.length, this.influxName, this.influxUrl);

      final HttpResponse response = this.httpClient.execute(request);

      final int rc = response.getStatusLine().getStatusCode();

      if (rc != HttpStatus.SC_OK) {

        final HttpEntity entity = response.getEntity();
        final String responseString = EntityUtils.toString(entity, "UTF-8");
        logger.error("Failed to write data to Influxdb: {}", String.valueOf(rc));
        logger.error("Http response: {}", responseString);

        // Propagate the server's error body so callers can log and react.
        throw new Exception(responseString);
      }

      logger.debug("Successfully wrote {} points to influxdb database {} at {}",
                   influxPointArry.length, this.influxName, this.influxUrl);

    } finally {

      // Return the connection to the pool even on failure.
      request.releaseConnection();

    }
  }
}
addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value); void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region); diff --git a/java/src/main/java/monasca/persister/repository/VerticaAlarmRepository.java b/java/src/main/java/monasca/persister/repository/VerticaAlarmRepo.java similarity index 89% rename from java/src/main/java/monasca/persister/repository/VerticaAlarmRepository.java rename to java/src/main/java/monasca/persister/repository/VerticaAlarmRepo.java index de12e7d7..acce809e 100644 --- a/java/src/main/java/monasca/persister/repository/VerticaAlarmRepository.java +++ b/java/src/main/java/monasca/persister/repository/VerticaAlarmRepo.java @@ -18,7 +18,7 @@ package monasca.persister.repository; import monasca.common.model.event.AlarmStateTransitionedEvent; -import monasca.persister.configuration.MonPersisterConfiguration; +import monasca.persister.configuration.PersisterConfig; import com.codahale.metrics.Timer; @@ -37,9 +37,9 @@ import java.util.TimeZone; import javax.inject.Inject; -public class VerticaAlarmRepository extends VerticaRepository implements AlarmRepository { +public class VerticaAlarmRepo extends VerticaRepo implements AlarmRepo { - private static final Logger logger = LoggerFactory.getLogger(VerticaAlarmRepository.class); + private static final Logger logger = LoggerFactory.getLogger(VerticaAlarmRepo.class); private final Environment environment; private static final String SQL_INSERT_INTO_ALARM_HISTORY = @@ -49,8 +49,7 @@ public class VerticaAlarmRepository extends VerticaRepository implements AlarmRe private final SimpleDateFormat simpleDateFormat; @Inject - public VerticaAlarmRepository(DBI dbi, MonPersisterConfiguration configuration, - Environment environment) throws NoSuchAlgorithmException, SQLException { + public VerticaAlarmRepo(DBI dbi, PersisterConfig configuration, Environment environment) throws NoSuchAlgorithmException, SQLException { super(dbi); logger.debug("Instantiating: " + 
this); diff --git a/java/src/main/java/monasca/persister/repository/VerticaMetricRepository.java b/java/src/main/java/monasca/persister/repository/VerticaMetricRepo.java similarity index 95% rename from java/src/main/java/monasca/persister/repository/VerticaMetricRepository.java rename to java/src/main/java/monasca/persister/repository/VerticaMetricRepo.java index 71cb4ff7..5519eb92 100644 --- a/java/src/main/java/monasca/persister/repository/VerticaMetricRepository.java +++ b/java/src/main/java/monasca/persister/repository/VerticaMetricRepo.java @@ -17,7 +17,7 @@ package monasca.persister.repository; -import monasca.persister.configuration.MonPersisterConfiguration; +import monasca.persister.configuration.PersisterConfig; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; @@ -38,9 +38,9 @@ import java.util.Set; import javax.inject.Inject; -public class VerticaMetricRepository extends VerticaRepository implements MetricRepository { +public class VerticaMetricRepo extends VerticaRepo implements MetricRepo { - private static final Logger logger = LoggerFactory.getLogger(VerticaMetricRepository.class); + private static final Logger logger = LoggerFactory.getLogger(VerticaMetricRepo.class); private final Environment environment; @@ -92,8 +92,8 @@ public class VerticaMetricRepository extends VerticaRepository implements Metric public final Meter definitionDimensionCacheHitMeter; @Inject - public VerticaMetricRepository(DBI dbi, MonPersisterConfiguration configuration, - Environment environment) throws NoSuchAlgorithmException, SQLException { + public VerticaMetricRepo(DBI dbi, PersisterConfig configuration, + Environment environment) throws NoSuchAlgorithmException, SQLException { super(dbi); logger.debug("Instantiating: " + this.getClass().getName()); @@ -123,15 +123,15 @@ public class VerticaMetricRepository extends VerticaRepository implements Metric definitionsIdCache = CacheBuilder.newBuilder() - 
.maximumSize(configuration.getVerticaMetricRepositoryConfiguration().getMaxCacheSize()) + .maximumSize(configuration.getVerticaMetricRepoConfig().getMaxCacheSize()) .build(); dimensionsIdCache = CacheBuilder.newBuilder() - .maximumSize(configuration.getVerticaMetricRepositoryConfiguration().getMaxCacheSize()) + .maximumSize(configuration.getVerticaMetricRepoConfig().getMaxCacheSize()) .build(); definitionDimensionsIdCache = CacheBuilder.newBuilder() - .maximumSize(configuration.getVerticaMetricRepositoryConfiguration().getMaxCacheSize()) + .maximumSize(configuration.getVerticaMetricRepoConfig().getMaxCacheSize()) .build(); logger.info("preparing database and building sql statements..."); diff --git a/java/src/main/java/monasca/persister/repository/VerticaRepository.java b/java/src/main/java/monasca/persister/repository/VerticaRepo.java similarity index 90% rename from java/src/main/java/monasca/persister/repository/VerticaRepository.java rename to java/src/main/java/monasca/persister/repository/VerticaRepo.java index 63e419e9..ad44a74b 100644 --- a/java/src/main/java/monasca/persister/repository/VerticaRepository.java +++ b/java/src/main/java/monasca/persister/repository/VerticaRepo.java @@ -20,17 +20,17 @@ package monasca.persister.repository; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; -public class VerticaRepository { +public class VerticaRepo { protected DBI dbi; protected Handle handle; - public VerticaRepository(DBI dbi) { + public VerticaRepo(DBI dbi) { this.dbi = dbi; this.handle = dbi.open(); this.handle.execute("SET TIME ZONE TO 'UTC'"); } - public VerticaRepository() {} + public VerticaRepo() {} public void setDBI(DBI dbi) throws Exception { this.dbi = dbi; diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java new file mode 100644 index 00000000..e44ceeaa --- /dev/null +++ 
/*
 * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * A single InfluxDB 0.9 data point as serialized into the JSON body of a
 * /write request: a measurement name, a set of tags, an ISO-8601 timestamp
 * string, and the field values.
 *
 * <p>Instances hold the supplied maps by reference and perform no copying;
 * callers must not mutate them after construction.
 */
public class InfluxPoint {

  private final String name;
  private final java.util.Map<String, String> tags;
  private final String timestamp;
  private final java.util.Map<String, Object> values;

  /**
   * @param name measurement name
   * @param tags tag key/value pairs attached to the point
   * @param timestamp point time as an ISO-8601 string
   * @param values field name/value pairs
   */
  public InfluxPoint(final String name, final java.util.Map<String, String> tags,
                     final String timestamp, final java.util.Map<String, Object> values) {
    this.name = name;
    this.tags = tags;
    this.timestamp = timestamp;
    this.values = values;
  }

  /** @return the measurement name */
  public String getName() {
    return this.name;
  }

  /** @return the tag map passed at construction */
  public java.util.Map<String, String> getTags() {
    return this.tags;
  }

  /** @return the ISO-8601 timestamp string */
  public String getTimestamp() {
    return this.timestamp;
  }

  /** @return the field value map passed at construction */
  public java.util.Map<String, Object> getValues() {
    return this.values;
  }

}
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monasca.persister.repository.influxdb; + +import java.util.Map; + +public class InfluxWrite { + + private final String database; + private final String retentionPolicy; + private final InfluxPoint[] points; + private final Map tags; + + + public InfluxWrite(final String database, final String retentionPolicy, final InfluxPoint[] points, + final Map tags) { + this.database = database; + this.retentionPolicy = retentionPolicy; + this.points = points; + this.tags = tags; + } + + public String getDatabase() { + return database; + } + + public String getRetentionPolicy() { + return retentionPolicy; + } + + public Map getTags() { + return this.tags; + } + + public InfluxPoint[] getPoints() { + return points; + } +} diff --git a/java/src/main/resources/persister-config.yml b/java/src/main/resources/persister-config.yml index 11a87228..6ef9f2ca 100644 --- a/java/src/main/resources/persister-config.yml +++ b/java/src/main/resources/persister-config.yml @@ -21,7 +21,7 @@ metricConfiguration: clientId: 1 #Kafka settings. -kafkaConfiguration: +kafkaConfig: #zookeeperConnect: localhost:2181 # See http://kafka.apache.org/documentation.html#api for semantics and defaults. 
zookeeperConnect: 192.168.10.4:2181 @@ -40,15 +40,21 @@ kafkaConfiguration: zookeeperConnectionTimeoutMs : 6000 zookeeperSyncTimeMs: 2000 -verticaMetricRepositoryConfiguration: +verticaMetricRepoConfig: maxCacheSize: 2000000 databaseConfiguration: -# vertica | influxdb +# databaseType can be (vertica | influxdb) databaseType: influxdb # databaseType: vertica influxDbConfiguration: + # Version and retention policy must be set for V9. If no + # version set then defaults to V8. + # version can be (V8 | V9) + #version: V9 + # Retention policy may be left blank to indicate default policy. + #retentionPolicy: name: mon replicationFactor: 1 url: http://192.168.10.4:8086