From 4c4c1d346445e9ff960ca5cefc55905d81ab3974 Mon Sep 17 00:00:00 2001 From: Deklan Dieterly Date: Fri, 24 Apr 2015 11:29:18 -0600 Subject: [PATCH] Fix mark read functionality perculate exception all the way up so that mark read is not called. flush on shutdown. Change-Id: Ie8c1b14765a61c0ed68fc83b23e52ef7bd60f4ab --- .../consumer/KafkaConsumerRunnableBasic.java | 14 ++--- .../persister/pipeline/ManagedPipeline.java | 18 +++++-- .../AlarmStateTransitionedEventHandler.java | 4 +- .../pipeline/event/FlushableHandler.java | 53 ++++++++++++------- .../pipeline/event/MetricHandler.java | 4 +- .../monasca/persister/repository/Repo.java | 2 +- .../repository/influxdb/InfluxAlarmRepo.java | 2 - .../repository/influxdb/InfluxMetricRepo.java | 2 - .../repository/influxdb/InfluxRepo.java | 45 +++++++++------- .../influxdb/InfluxV9AlarmRepo.java | 4 +- .../influxdb/InfluxV9MetricRepo.java | 4 +- .../influxdb/InfluxV9RepoWriter.java | 4 +- .../repository/vertica/VerticaAlarmRepo.java | 6 ++- .../repository/vertica/VerticaMetricRepo.java | 5 +- 14 files changed, 103 insertions(+), 64 deletions(-) diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java index 963a2745..0c039b0e 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java @@ -48,12 +48,8 @@ public class KafkaConsumerRunnableBasic implements Runnable { } protected void publishHeartbeat() { + publishEvent(null); - } - - protected void handleMessage(String msg) { - - publishEvent(msg); } @@ -62,6 +58,7 @@ public class KafkaConsumerRunnableBasic implements Runnable { logger.debug("[{}]: marking read", this.threadId); this.kafkaChannel.markRead(); + } public void stop() { @@ -70,8 +67,11 @@ public class KafkaConsumerRunnableBasic implements Runnable { this.stop = true; - this.pipeline.shutdown(); + if (pipeline.shutdown()) { + markRead(); + + } } public void run() { @@ -92,7 +92,7 @@ public class KafkaConsumerRunnableBasic implements Runnable { logger.debug("[{}]: {}", this.threadId, msg); - handleMessage(msg); + publishEvent(msg); } diff --git a/java/src/main/java/monasca/persister/pipeline/ManagedPipeline.java b/java/src/main/java/monasca/persister/pipeline/ManagedPipeline.java index 53b64021..47cc09aa 100644 --- a/java/src/main/java/monasca/persister/pipeline/ManagedPipeline.java +++ b/java/src/main/java/monasca/persister/pipeline/ManagedPipeline.java @@ -42,13 +42,25 @@ public class ManagedPipeline { } - public void shutdown() { + public boolean shutdown() { logger.info("[{}]: shutdown", this.threadId); - handler.flush(); + try { + + int msgFlushCnt = handler.flush(); + + return msgFlushCnt > 0 ? true : false; + + } catch (Exception e) { + + logger.error("[{}}: failed to flush repo on shutdown", this.threadId, e); + + return false; + } } + public boolean publishEvent(String msg) { try { @@ -57,7 +69,7 @@ public class ManagedPipeline { } catch (Exception e) { - logger.error("[{}]: failed to handle msg: {}", msg, e); + logger.error("[{}]: failed to handle msg: {}", this.threadId, msg, e); return false; diff --git a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java index 664fa68f..232f3acc 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java @@ -91,9 +91,9 @@ public class AlarmStateTransitionedEventHandler extends } @Override - protected void flushRepository() { + protected int flushRepository() throws Exception { - alarmRepo.flush(this.threadId); + return alarmRepo.flush(this.threadId); } } diff --git a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java index df30b8ef..54b65819 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java @@ -86,7 +86,7 @@ public abstract class FlushableHandler { protected abstract void initObjectMapper(); - protected abstract void flushRepository(); + protected abstract int flushRepository() throws Exception; protected abstract int process(String msg) throws Exception; @@ -94,26 +94,44 @@ public abstract class FlushableHandler { if (msg == null) { - return checkFlushTime(); + if (checkFlushTime()) { + int msgFlushCnt = flush(); + + return msgFlushCnt > 0 ? true : false; + + } else { + + return false; + + } } this.msgCount += process(msg); this.processedMeter.mark(); - return checkBatchSize(); + if (checkBatchSize()) { + int msgFlushCnt = flush(); + + return msgFlushCnt > 0 ? true : false; + + } else { + + return false; + + } } - private boolean checkBatchSize() { + private boolean checkBatchSize() throws Exception { + + logger.debug("[{}]: checking batch size", this.threadId); if (this.msgCount >= this.batchSize) { logger.debug("[{}]: batch sized {} attained", this.threadId, this.batchSize); - flush(); - return true; } else { @@ -123,7 +141,9 @@ public abstract class FlushableHandler { } } - private boolean checkFlushTime() { + private boolean checkFlushTime() throws Exception { + + logger.debug("[{}}: checking flush time", this.threadId); logger.debug( "[{}]: got heartbeat message, flush every {} seconds.", @@ -137,8 +157,6 @@ public abstract class FlushableHandler { this.threadId, (System.currentTimeMillis() - this.flushTimeMillis)); - flush(); - return true; } else { @@ -153,20 +171,13 @@ public abstract class FlushableHandler { } } - public void flush() { + public int flush() throws Exception { - logger.debug("[{}]: flush", this.threadId); - - if (this.msgCount == 0) { - - logger.debug("[{}]: nothing to flush", this.threadId); - - return; - } + logger.debug("[{}]: flushing", this.threadId); Timer.Context context = this.commitTimer.time(); - flushRepository(); + int msgFlushCnt = flushRepository(); context.stop(); @@ -174,11 +185,13 @@ public abstract class FlushableHandler { this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes; - logger.debug("[{}]: flushed {} msg", this.threadId, this.msgCount); + logger.debug("[{}]: flushed {} msg", this.threadId, msgFlushCnt); this.msgCount = 0; this.batchCount++; + return msgFlushCnt; + } protected long getBatchCount() { diff --git a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java index 78b2e85a..724a6812 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java @@ -101,9 +101,9 @@ public class MetricHandler extends FlushableHandler { } @Override - public void flushRepository() { + public int flushRepository() throws Exception { - metricRepo.flush(this.threadId); + return metricRepo.flush(this.threadId); } } diff --git a/java/src/main/java/monasca/persister/repository/Repo.java b/java/src/main/java/monasca/persister/repository/Repo.java index 2e55ea1f..f2aab918 100644 --- a/java/src/main/java/monasca/persister/repository/Repo.java +++ b/java/src/main/java/monasca/persister/repository/Repo.java @@ -20,6 +20,6 @@ public interface Repo { void addToBatch(final T msg); - void flush(String id); + int flush(String id) throws Exception; } diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java index b81ea426..cb693e50 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java @@ -41,8 +41,6 @@ public abstract class InfluxAlarmRepo extends InfluxRepo { protected final Meter measurementMeter; - protected abstract void write(String id) throws Exception; - public InfluxMetricRepo(final Environment env) { super(env); diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java index dbe6ed7d..0fac45ce 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java @@ -38,47 +38,56 @@ public abstract class InfluxRepo implements Repo { } @Override - public void flush(String id) { + public int flush(String id) throws Exception { + + if (isBufferEmpty()) { + + logger.debug("[{}]: no msg to be written to influxdb", id); + + logger.debug("[{}]: returning from flush without flushing", id); + + return 0; + + } else { + + return writeToRepo(id); + + } + } + + private int writeToRepo(String id) throws Exception { try { - if (isBufferEmpty()) { - - logger.debug("[{}]: no msg to be written to influxdb", id); - - logger.debug("[{}]: returning from flush without flushing", id); - - return; - - } - final long startTime = System.currentTimeMillis(); final Timer.Context context = flushTimer.time(); - write(id); + int msgWriteCnt = write(id); final long endTime = System.currentTimeMillis(); context.stop(); - logger.debug("[{}]: flushing batch took {} millis", - id, endTime - startTime); + logger.debug("[{}]: flushing batch took {} millis", id, endTime - startTime); + + clearBuffers(); + + return msgWriteCnt; } catch (Exception e) { logger.error("[{}]: failed to write msg to influxdb", id, e); + throw e; + } - - clearBuffers(); - } protected abstract boolean isBufferEmpty(); - protected abstract void write(String id) throws Exception; + protected abstract int write(String id) throws Exception; protected abstract void clearBuffers(); } diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java index 8743aad6..e95c7105 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java @@ -58,9 +58,9 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo { } @Override - protected void write(String id) throws Exception { + protected int write(String id) throws Exception { - this.influxV9RepoWriter.write(getInfluxPointArry(), id); + return this.influxV9RepoWriter.write(getInfluxPointArry(), id); } diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java index 47596c3b..b93ef1af 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java @@ -43,9 +43,9 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo { } @Override - protected void write(String id) throws Exception { + protected int write(String id) throws Exception { - this.influxV9RepoWriter.write(getInfluxPointArry(), id); + return this.influxV9RepoWriter.write(getInfluxPointArry(), id); } diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java index 52073a8c..2ac54a3e 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java @@ -123,7 +123,7 @@ public class InfluxV9RepoWriter { } } - protected void write(final InfluxPoint[] influxPointArry, String id) throws Exception { + protected int write(final InfluxPoint[] influxPointArry, String id) throws Exception { HttpPost request = new HttpPost(this.influxUrl); @@ -192,6 +192,8 @@ public class InfluxV9RepoWriter { .debug("[{}]: successfully sent {} points to influxdb {} at {}", id, influxPointArry.length, this.influxName, this.influxUrl); + return influxPointArry.length; + } finally { request.releaseConnection(); diff --git a/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java index 1b8fabb0..42643e3e 100644 --- a/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java @@ -73,7 +73,7 @@ public class VerticaAlarmRepo extends VerticaRepo implements Repo