diff --git a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java index 232f3acc..6bce6407 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java @@ -32,6 +32,8 @@ import monasca.persister.repository.Repo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + public class AlarmStateTransitionedEventHandler extends FlushableHandler { @@ -61,10 +63,10 @@ public class AlarmStateTransitionedEventHandler extends } @Override - protected int process(String msg) throws Exception { + protected int process(String msg) throws IOException { AlarmStateTransitionedEvent alarmStateTransitionedEvent = - objectMapper.readValue(msg, AlarmStateTransitionedEvent.class); + this.objectMapper.readValue(msg, AlarmStateTransitionedEvent.class); logger.debug("[{}]: [{}:{}] {}", this.threadId, @@ -72,7 +74,7 @@ public class AlarmStateTransitionedEventHandler extends this.getMsgCount(), alarmStateTransitionedEvent); - alarmRepo.addToBatch(alarmStateTransitionedEvent); + this.alarmRepo.addToBatch(alarmStateTransitionedEvent); this.alarmStateTransitionCounter.inc(); @@ -93,7 +95,7 @@ public class AlarmStateTransitionedEventHandler extends @Override protected int flushRepository() throws Exception { - return alarmRepo.flush(this.threadId); + return this.alarmRepo.flush(this.threadId); } } diff --git a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java index 0f696367..f081b37d 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java @@ -28,6 +28,8 @@ import io.dropwizard.setup.Environment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + public abstract class FlushableHandler { private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class); @@ -88,7 +90,7 @@ public abstract class FlushableHandler { protected abstract int flushRepository() throws Exception; - protected abstract int process(String msg) throws Exception; + protected abstract int process(String msg) throws IOException; public boolean onEvent(final String msg) throws Exception { @@ -124,7 +126,7 @@ public abstract class FlushableHandler { } } - private boolean isBatchSize() throws Exception { + private boolean isBatchSize() { logger.debug("[{}]: checking batch size", this.threadId); @@ -136,19 +138,21 @@ public abstract class FlushableHandler { } else { + logger.debug("[{}]: batch size now at {}, batch size {} not attained", + this.threadId, + this.msgCount, + this.batchSize); + return false; } } - private boolean isFlushTime() throws Exception { + private boolean isFlushTime() { - logger.debug("[{}}: checking flush time", this.threadId); - - logger.debug( - "[{}]: got heartbeat message, flush every {} seconds.", - this.threadId, - this.secondsBetweenFlushes); + logger.debug("[{}}: got heartbeat message, checking flush time. flush every {} seconds.", + this.threadId, + this.secondsBetweenFlushes); long now = System.currentTimeMillis(); diff --git a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java index 724a6812..03fe38c7 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java @@ -17,6 +17,10 @@ package monasca.persister.pipeline.event; +import monasca.common.model.metric.MetricEnvelope; +import monasca.persister.configuration.PipelineConfig; +import monasca.persister.repository.Repo; + import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; @@ -27,10 +31,9 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + import io.dropwizard.setup.Environment; -import monasca.common.model.metric.MetricEnvelope; -import monasca.persister.configuration.PipelineConfig; -import monasca.persister.repository.Repo; public class MetricHandler extends FlushableHandler { @@ -60,10 +63,10 @@ public class MetricHandler extends FlushableHandler { } @Override - public int process(String msg) throws Exception { + public int process(String msg) throws IOException { MetricEnvelope[] metricEnvelopesArry = - objectMapper.readValue(msg, MetricEnvelope[].class); + this.objectMapper.readValue(msg, MetricEnvelope[].class); for (final MetricEnvelope metricEnvelope : metricEnvelopesArry) { @@ -103,7 +106,7 @@ public class MetricHandler extends FlushableHandler { @Override public int flushRepository() throws Exception { - return metricRepo.flush(this.threadId); + return this.metricRepo.flush(this.threadId); } } diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java index 4e4d2009..ecd9f8ab 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java @@ -26,8 +26,12 @@ public class InfluxPoint { private final String timestamp; private final Map fields; - public InfluxPoint(final String name, final Map tags, final String timestamp, - final Map fields) { + public InfluxPoint( + final String name, + final Map tags, + final String timestamp, + final Map fields) { + this.name = name; this.tags = tags; this.timestamp = timestamp; diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java index 0fac45ce..059ea9c1 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java @@ -59,17 +59,17 @@ public abstract class InfluxRepo implements Repo { try { - final long startTime = System.currentTimeMillis(); - final Timer.Context context = flushTimer.time(); + final long startTime = System.currentTimeMillis(); + int msgWriteCnt = write(id); final long endTime = System.currentTimeMillis(); context.stop(); - logger.debug("[{}]: flushing batch took {} millis", id, endTime - startTime); + logger.debug("[{}]: writing to influxdb took {} millis", id, endTime - startTime); clearBuffers(); @@ -77,7 +77,7 @@ public abstract class InfluxRepo implements Repo { } catch (Exception e) { - logger.error("[{}]: failed to write msg to influxdb", id, e); + logger.error("[{}]: failed to write to influxdb", id, e); throw e;