diff --git a/java/src/main/java/monasca/persister/PersisterApplication.java b/java/src/main/java/monasca/persister/PersisterApplication.java index 8d487d02..bca715f7 100644 --- a/java/src/main/java/monasca/persister/PersisterApplication.java +++ b/java/src/main/java/monasca/persister/PersisterApplication.java @@ -18,16 +18,16 @@ package monasca.persister; import monasca.persister.configuration.PersisterConfig; -import monasca.persister.consumer.AlarmStateTransitionConsumer; -import monasca.persister.consumer.AlarmStateTransitionConsumerFactory; -import monasca.persister.consumer.KafkaAlarmStateTransitionConsumer; -import monasca.persister.consumer.KafkaAlarmStateTransitionConsumerFactory; +import monasca.persister.consumer.alarmstate.AlarmStateTransitionConsumer; +import monasca.persister.consumer.alarmstate.AlarmStateTransitionConsumerFactory; +import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumer; +import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerFactory; import monasca.persister.consumer.KafkaChannel; import monasca.persister.consumer.KafkaChannelFactory; -import monasca.persister.consumer.KafkaMetricsConsumer; -import monasca.persister.consumer.KafkaMetricsConsumerFactory; -import monasca.persister.consumer.MetricsConsumer; -import monasca.persister.consumer.MetricsConsumerFactory; +import monasca.persister.consumer.metric.KafkaMetricsConsumer; +import monasca.persister.consumer.metric.KafkaMetricsConsumerFactory; +import monasca.persister.consumer.metric.MetricsConsumer; +import monasca.persister.consumer.metric.MetricsConsumerFactory; import monasca.persister.healthcheck.SimpleHealthCheck; import monasca.persister.pipeline.AlarmStateTransitionPipeline; import monasca.persister.pipeline.AlarmStateTransitionPipelineFactory; diff --git a/java/src/main/java/monasca/persister/PersisterModule.java b/java/src/main/java/monasca/persister/PersisterModule.java index e232837d..b07a5fa0 100644 --- a/java/src/main/java/monasca/persister/PersisterModule.java +++ b/java/src/main/java/monasca/persister/PersisterModule.java @@ -27,20 +27,20 @@ import javax.inject.Singleton; import io.dropwizard.setup.Environment; import monasca.persister.configuration.PersisterConfig; -import monasca.persister.consumer.AlarmStateTransitionConsumer; -import monasca.persister.consumer.AlarmStateTransitionConsumerFactory; -import monasca.persister.consumer.KafkaAlarmStateTransitionConsumer; -import monasca.persister.consumer.KafkaAlarmStateTransitionConsumerFactory; -import monasca.persister.consumer.KafkaAlarmStateTransitionConsumerRunnableBasic; -import monasca.persister.consumer.KafkaAlarmStateTransitionConsumerRunnableBasicFactory; +import monasca.persister.consumer.alarmstate.AlarmStateTransitionConsumer; +import monasca.persister.consumer.alarmstate.AlarmStateTransitionConsumerFactory; +import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumer; +import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerFactory; +import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerRunnableBasic; +import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerRunnableBasicFactory; import monasca.persister.consumer.KafkaChannel; import monasca.persister.consumer.KafkaChannelFactory; -import monasca.persister.consumer.KafkaMetricsConsumer; -import monasca.persister.consumer.KafkaMetricsConsumerFactory; -import monasca.persister.consumer.KafkaMetricsConsumerRunnableBasic; -import monasca.persister.consumer.KafkaMetricsConsumerRunnableBasicFactory; -import monasca.persister.consumer.MetricsConsumer; -import monasca.persister.consumer.MetricsConsumerFactory; +import monasca.persister.consumer.metric.KafkaMetricsConsumer; +import monasca.persister.consumer.metric.KafkaMetricsConsumerFactory; +import monasca.persister.consumer.metric.KafkaMetricsConsumerRunnableBasic; +import monasca.persister.consumer.metric.KafkaMetricsConsumerRunnableBasicFactory; +import monasca.persister.consumer.metric.MetricsConsumer; +import monasca.persister.consumer.metric.MetricsConsumerFactory; import monasca.persister.dbi.DBIProvider; import monasca.persister.pipeline.AlarmStateTransitionPipeline; import monasca.persister.pipeline.AlarmStateTransitionPipelineFactory; @@ -125,8 +125,7 @@ public class PersisterModule extends AbstractModule { install(new FactoryModuleBuilder().implement( MetricsConsumer.class, - MetricsConsumer.class).build( - MetricsConsumerFactory.class)); + MetricsConsumer.class).build(MetricsConsumerFactory.class)); install(new FactoryModuleBuilder().implement(KafkaChannel.class, KafkaChannel.class).build( KafkaChannelFactory.class)); diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java index 68de4f6f..b27eda67 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java @@ -58,7 +58,7 @@ public abstract class KafkaConsumer { logger.warn("Did not shut down in {} seconds", WAIT_TIME); } } catch (InterruptedException e) { - logger.info("awaitTerminiation interrupted", e); + logger.info("awaitTermination interrupted", e); } } } diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java index 11d5907a..437f1dc8 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java @@ -19,11 +19,11 @@ package monasca.persister.consumer; import monasca.persister.pipeline.ManagedPipeline; -import kafka.consumer.ConsumerIterator; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import kafka.consumer.ConsumerIterator; + public abstract class KafkaConsumerRunnableBasic implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnableBasic.class); @@ -44,7 +44,7 @@ public abstract class KafkaConsumerRunnableBasic implements Runnable { abstract protected void handleMessage(String message); - protected void markRead() { + private void markRead() { this.kafkaChannel.markRead(); } @@ -66,7 +66,6 @@ public abstract class KafkaConsumerRunnableBasic implements Runnable { } } catch (kafka.consumer.ConsumerTimeoutException cte) { publishHeartbeat(); - continue; } } logger.debug("Shutting down Thread: {}", threadNumber); diff --git a/java/src/main/java/monasca/persister/consumer/AlarmStateTransitionConsumer.java b/java/src/main/java/monasca/persister/consumer/alarmstate/AlarmStateTransitionConsumer.java similarity index 92% rename from java/src/main/java/monasca/persister/consumer/AlarmStateTransitionConsumer.java rename to java/src/main/java/monasca/persister/consumer/alarmstate/AlarmStateTransitionConsumer.java index 5c06013c..cde973b0 100644 --- a/java/src/main/java/monasca/persister/consumer/AlarmStateTransitionConsumer.java +++ b/java/src/main/java/monasca/persister/consumer/alarmstate/AlarmStateTransitionConsumer.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package monasca.persister.consumer; +package monasca.persister.consumer.alarmstate; import monasca.common.model.event.AlarmStateTransitionedEvent; +import monasca.persister.consumer.Consumer; import monasca.persister.pipeline.AlarmStateTransitionPipeline; import com.google.inject.Inject; diff --git a/java/src/main/java/monasca/persister/consumer/AlarmStateTransitionConsumerFactory.java b/java/src/main/java/monasca/persister/consumer/alarmstate/AlarmStateTransitionConsumerFactory.java similarity index 94% rename from java/src/main/java/monasca/persister/consumer/AlarmStateTransitionConsumerFactory.java rename to java/src/main/java/monasca/persister/consumer/alarmstate/AlarmStateTransitionConsumerFactory.java index c650ebea..ea3d6872 100644 --- a/java/src/main/java/monasca/persister/consumer/AlarmStateTransitionConsumerFactory.java +++ b/java/src/main/java/monasca/persister/consumer/alarmstate/AlarmStateTransitionConsumerFactory.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package monasca.persister.consumer; +package monasca.persister.consumer.alarmstate; import monasca.persister.pipeline.AlarmStateTransitionPipeline; diff --git a/java/src/main/java/monasca/persister/consumer/KafkaAlarmStateTransitionConsumer.java b/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumer.java similarity index 88% rename from java/src/main/java/monasca/persister/consumer/KafkaAlarmStateTransitionConsumer.java rename to java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumer.java index a11fc97b..cd482f65 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaAlarmStateTransitionConsumer.java +++ b/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumer.java @@ -15,9 +15,12 @@ * limitations under the License. */ -package monasca.persister.consumer; +package monasca.persister.consumer.alarmstate; import monasca.common.model.event.AlarmStateTransitionedEvent; +import monasca.persister.consumer.KafkaChannel; +import monasca.persister.consumer.KafkaConsumer; +import monasca.persister.consumer.KafkaConsumerRunnableBasic; import monasca.persister.pipeline.AlarmStateTransitionPipeline; import com.google.inject.Inject; diff --git a/java/src/main/java/monasca/persister/consumer/KafkaAlarmStateTransitionConsumerFactory.java b/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumerFactory.java similarity index 90% rename from java/src/main/java/monasca/persister/consumer/KafkaAlarmStateTransitionConsumerFactory.java rename to java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumerFactory.java index b187ee0b..aad290c6 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaAlarmStateTransitionConsumerFactory.java +++ b/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumerFactory.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package monasca.persister.consumer; +package monasca.persister.consumer.alarmstate; +import monasca.persister.consumer.KafkaChannel; import monasca.persister.pipeline.AlarmStateTransitionPipeline; public interface KafkaAlarmStateTransitionConsumerFactory { diff --git a/java/src/main/java/monasca/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasic.java b/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumerRunnableBasic.java similarity index 89% rename from java/src/main/java/monasca/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasic.java rename to java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumerRunnableBasic.java index d58b651a..ed34babd 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasic.java +++ b/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumerRunnableBasic.java @@ -15,9 +15,11 @@ * limitations under the License. */ -package monasca.persister.consumer; +package monasca.persister.consumer.alarmstate; import monasca.common.model.event.AlarmStateTransitionedEvent; +import monasca.persister.consumer.KafkaChannel; +import monasca.persister.consumer.KafkaConsumerRunnableBasic; import monasca.persister.pipeline.AlarmStateTransitionPipeline; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -29,7 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaAlarmStateTransitionConsumerRunnableBasic extends - KafkaConsumerRunnableBasic { + KafkaConsumerRunnableBasic { private static final Logger logger = LoggerFactory .getLogger(KafkaAlarmStateTransitionConsumerRunnableBasic.class); diff --git a/java/src/main/java/monasca/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasicFactory.java b/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumerRunnableBasicFactory.java similarity index 90% rename from java/src/main/java/monasca/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasicFactory.java rename to java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumerRunnableBasicFactory.java index cbb97208..4b82cbc3 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasicFactory.java +++ b/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumerRunnableBasicFactory.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package monasca.persister.consumer; +package monasca.persister.consumer.alarmstate; +import monasca.persister.consumer.KafkaChannel; import monasca.persister.pipeline.AlarmStateTransitionPipeline; public interface KafkaAlarmStateTransitionConsumerRunnableBasicFactory { diff --git a/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumer.java b/java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumer.java similarity index 87% rename from java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumer.java rename to java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumer.java index f622ad18..2add2dcc 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumer.java +++ b/java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumer.java @@ -15,9 +15,12 @@ * limitations under the License. */ -package monasca.persister.consumer; +package monasca.persister.consumer.metric; import monasca.common.model.metric.MetricEnvelope; +import monasca.persister.consumer.KafkaChannel; +import monasca.persister.consumer.KafkaConsumer; +import monasca.persister.consumer.KafkaConsumerRunnableBasic; import monasca.persister.pipeline.MetricPipeline; import com.google.inject.Inject; diff --git a/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerFactory.java b/java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumerFactory.java similarity index 90% rename from java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerFactory.java rename to java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumerFactory.java index c8190bc7..8b05a48a 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerFactory.java +++ b/java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumerFactory.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package monasca.persister.consumer; +package monasca.persister.consumer.metric; +import monasca.persister.consumer.KafkaChannel; import monasca.persister.pipeline.MetricPipeline; public interface KafkaMetricsConsumerFactory { diff --git a/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerRunnableBasic.java b/java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumerRunnableBasic.java similarity index 88% rename from java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerRunnableBasic.java rename to java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumerRunnableBasic.java index 05f31fd1..7dcf95fd 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerRunnableBasic.java +++ b/java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumerRunnableBasic.java @@ -15,9 +15,11 @@ * limitations under the License. */ -package monasca.persister.consumer; +package monasca.persister.consumer.metric; import monasca.common.model.metric.MetricEnvelope; +import monasca.persister.consumer.KafkaChannel; +import monasca.persister.consumer.KafkaConsumerRunnableBasic; import monasca.persister.pipeline.MetricPipeline; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -29,7 +31,8 @@ import com.google.inject.assistedinject.Assisted; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KafkaMetricsConsumerRunnableBasic extends KafkaConsumerRunnableBasic { +public class KafkaMetricsConsumerRunnableBasic extends + KafkaConsumerRunnableBasic { private static final Logger logger = LoggerFactory .getLogger(KafkaMetricsConsumerRunnableBasic.class); diff --git a/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerRunnableBasicFactory.java b/java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumerRunnableBasicFactory.java similarity index 90% rename from java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerRunnableBasicFactory.java rename to java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumerRunnableBasicFactory.java index 103d1dc6..5756ddaf 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaMetricsConsumerRunnableBasicFactory.java +++ b/java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumerRunnableBasicFactory.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package monasca.persister.consumer; +package monasca.persister.consumer.metric; +import monasca.persister.consumer.KafkaChannel; import monasca.persister.pipeline.MetricPipeline; public interface KafkaMetricsConsumerRunnableBasicFactory { diff --git a/java/src/main/java/monasca/persister/consumer/MetricsConsumer.java b/java/src/main/java/monasca/persister/consumer/metric/MetricsConsumer.java similarity index 92% rename from java/src/main/java/monasca/persister/consumer/MetricsConsumer.java rename to java/src/main/java/monasca/persister/consumer/metric/MetricsConsumer.java index 4ff24fe2..3634bb1e 100644 --- a/java/src/main/java/monasca/persister/consumer/MetricsConsumer.java +++ b/java/src/main/java/monasca/persister/consumer/metric/MetricsConsumer.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package monasca.persister.consumer; +package monasca.persister.consumer.metric; import monasca.common.model.metric.MetricEnvelope; +import monasca.persister.consumer.Consumer; import monasca.persister.pipeline.MetricPipeline; import com.google.inject.Inject; diff --git a/java/src/main/java/monasca/persister/consumer/MetricsConsumerFactory.java b/java/src/main/java/monasca/persister/consumer/metric/MetricsConsumerFactory.java similarity index 94% rename from java/src/main/java/monasca/persister/consumer/MetricsConsumerFactory.java rename to java/src/main/java/monasca/persister/consumer/metric/MetricsConsumerFactory.java index a2f7e2ef..cc321bd7 100644 --- a/java/src/main/java/monasca/persister/consumer/MetricsConsumerFactory.java +++ b/java/src/main/java/monasca/persister/consumer/metric/MetricsConsumerFactory.java @@ -12,7 +12,7 @@ * the License. */ -package monasca.persister.consumer; +package monasca.persister.consumer.metric; import monasca.persister.pipeline.MetricPipeline; diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV8MetricRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV8MetricRepo.java index 01ee2569..8ff51081 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV8MetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV8MetricRepo.java @@ -66,7 +66,8 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo Definition definition = definitionMapEntry.getKey(); Map> dimensionsMap = definitionMapEntry.getValue(); - for (Map.Entry> dimensionsMapEntry : dimensionsMap.entrySet()) { + for (Map.Entry> dimensionsMapEntry + : dimensionsMap.entrySet()) { Dimensions dimensions = dimensionsMapEntry.getKey(); List measurementList = dimensionsMapEntry.getValue(); diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java index aaf9b707..ea413d00 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java @@ -51,13 +51,14 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo { List influxPointList = new LinkedList<>(); - for (Map.Entry>> definitionMapEntry : - this.measurementBuffer.entrySet()) { + for (Map.Entry>> definitionMapEntry + : this.measurementBuffer.entrySet()) { Definition definition = definitionMapEntry.getKey(); Map> dimensionsMap = definitionMapEntry.getValue(); - for (Map.Entry> dimensionsMapEntry : dimensionsMap.entrySet()) { + for (Map.Entry> dimensionsMapEntry + : dimensionsMap.entrySet()) { Dimensions dimensions = dimensionsMapEntry.getKey(); List measurementList = dimensionsMapEntry.getValue(); diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java index 25f38a57..27c09ccd 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java @@ -53,14 +53,13 @@ public class InfluxV9RepoWriter { private static final Logger logger = LoggerFactory.getLogger(InfluxV9RepoWriter.class); - private final PersisterConfig config; - private final String influxName; private final String influxUrl; private final String influxCreds; private final String influxUser; private final String influxPass; private final String influxRetentionPolicy; + private final boolean gzip; private final CloseableHttpClient httpClient; @@ -71,21 +70,20 @@ public class InfluxV9RepoWriter { @Inject public InfluxV9RepoWriter(final PersisterConfig config) { - this.config = config; - this.influxName = config.getInfluxDBConfiguration().getName(); this.influxUrl = config.getInfluxDBConfiguration().getUrl() + "/write"; this.influxUser = config.getInfluxDBConfiguration().getUser(); this.influxPass = config.getInfluxDBConfiguration().getPassword(); this.influxCreds = this.influxUser + ":" + this.influxPass; this.influxRetentionPolicy = config.getInfluxDBConfiguration().getRetentionPolicy(); + this.gzip = config.getInfluxDBConfiguration().getGzip(); this.baseAuthHeader = "Basic " + new String(Base64.encodeBase64(this.influxCreds.getBytes())); PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); cm.setMaxTotal(config.getInfluxDBConfiguration().getMaxHttpConnections()); - if (config.getInfluxDBConfiguration().getGzip()) { + if (this.gzip) { this.httpClient = HttpClients.custom().setConnectionManager(cm) @@ -123,7 +121,6 @@ public class InfluxV9RepoWriter { this.httpClient = HttpClients.custom().setConnectionManager(cm).build(); } - } protected void write(final InfluxPoint[] influxPointArry) throws Exception { @@ -140,7 +137,7 @@ public class InfluxV9RepoWriter { String json = this.objectMapper.writeValueAsString(influxWrite); - if (this.config.getInfluxDBConfiguration().getGzip()) { + if (this.gzip) { HttpEntity requestEntity = diff --git a/java/src/test/java/monasca/persister/MonPersisterConsumerTest.java b/java/src/test/java/monasca/persister/MonPersisterConsumerTest.java index d8fe77d8..2f6063f2 100644 --- a/java/src/test/java/monasca/persister/MonPersisterConsumerTest.java +++ b/java/src/test/java/monasca/persister/MonPersisterConsumerTest.java @@ -17,8 +17,8 @@ package monasca.persister; -import monasca.persister.consumer.KafkaMetricsConsumer; -import monasca.persister.consumer.MetricsConsumer; +import monasca.persister.consumer.metric.KafkaMetricsConsumer; +import monasca.persister.consumer.metric.MetricsConsumer; import monasca.persister.pipeline.MetricPipeline; import monasca.persister.pipeline.event.MetricHandler;