From 0d8f1b6328f2f2f8e7cdfeecb164543f2c62c67a Mon Sep 17 00:00:00 2001 From: Deklan Dieterly Date: Fri, 11 Dec 2015 12:59:29 -0700 Subject: [PATCH] Make persister shut down on error Refactor code to handle errors better. The persister shuts down faster and more reliably. On shutdown, messages in the buffer will be dropped. On restart, messages will be read from Kafka and processed again. Fixes a problem with the persister not shutting down and restarting when a Vertica node goes down. Change-Id: I60b4ba6e06e69e68878f8400e360250e3608e5a5 --- .../persister/consumer/KafkaConsumer.java | 28 ------- .../consumer/KafkaConsumerRunnableBasic.java | 76 +++++++------------ .../repository/vertica/VerticaMetricRepo.java | 1 - 3 files changed, 26 insertions(+), 79 deletions(-) diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java index ff3b12b4..9f4e0107 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java @@ -17,7 +17,6 @@ package monasca.persister.consumer; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; @@ -25,16 +24,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; public class KafkaConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); - private static final int WAIT_TIME = 5; - private ExecutorService executorService; private final KafkaConsumerRunnableBasic kafkaConsumerRunnableBasic; @@ -66,27 +60,5 @@ public class KafkaConsumer { kafkaConsumerRunnableBasic.stop(); - if (executorService != null) { - - logger.info("[{}]: shutting down executor service", this.threadId); - - try { - - 
logger.info("[{}]: awaiting termination...", this.threadId); - - if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) { - - logger.warn("[{}]: did not shut down in {} seconds", this.threadId, WAIT_TIME); - - } - - logger.info("[{}]: terminated", this.threadId); - - } catch (InterruptedException e) { - - logger.info("[{}]: awaitTermination interrupted", this.threadId, e); - - } - } } } diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java index 3d978ae6..834a87eb 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java @@ -22,12 +22,10 @@ import monasca.persister.pipeline.ManagedPipeline; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import org.apache.log4j.LogManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import kafka.consumer.ConsumerIterator; import monasca.persister.repository.RepoException; @@ -40,7 +38,6 @@ public class KafkaConsumerRunnableBasic implements Runnable { private final String threadId; private final ManagedPipeline pipeline; private volatile boolean stop = false; - private boolean fatalErrorDetected = false; private ExecutorService executorService; @@ -83,30 +80,6 @@ public class KafkaConsumerRunnableBasic implements Runnable { this.stop = true; - try { - - if (!this.fatalErrorDetected) { - - logger.info("[{}}: shutting pipeline down", this.threadId); - - if (pipeline.shutdown()) { - - markRead(); - - } - - } else { - - logger.info("[{}]: fatal error detected. 
Exiting immediately without flush", this.threadId); - - } - - } catch (Exception e) { - - logger.error("caught fatal exception while shutting down", e); - - } - } public void run() { @@ -125,7 +98,7 @@ public class KafkaConsumerRunnableBasic implements Runnable { if (isInterrupted()) { - this.fatalErrorDetected = true; + logger.debug("[{}]: is interrupted", this.threadId); break; } @@ -134,7 +107,14 @@ public class KafkaConsumerRunnableBasic implements Runnable { if (isInterrupted()) { - this.fatalErrorDetected = true; + logger.debug("[{}]: is interrupted", this.threadId); + break; + + } + + if (this.stop) { + + logger.debug("[{}]: is stopped", this.threadId); break; } @@ -151,7 +131,14 @@ public class KafkaConsumerRunnableBasic implements Runnable { if (isInterrupted()) { - this.fatalErrorDetected = true; + logger.debug("[{}]: is interrupted", this.threadId); + break; + + } + + if (this.stop) { + + logger.debug("[{}]: is stopped", this.threadId); break; } @@ -163,36 +150,25 @@ public class KafkaConsumerRunnableBasic implements Runnable { } catch (Throwable e) { logger.error( - "[{}]: caught fatal exception while publishing msg. Shutting entire persister down now!", - this.threadId, e); + "[{}]: caught fatal exception while publishing msg. Shutting entire persister down " + + "now!", this.threadId, e); - this.stop = true; - this.fatalErrorDetected = true; + logger.error("[{}]: calling shutdown on executor service", this.threadId); + this.executorService.shutdownNow(); - this.executorService.shutdownNow(); - - try { - - this.executorService.awaitTermination(5, TimeUnit.SECONDS); - - } catch (InterruptedException e1) { - - logger.info("[{}]: interrupted while awaiting termination", this.threadId, e1); + logger.error("[{}]: shutting down system. 
calling system.exit(1)", this.threadId); + System.exit(1); } - LogManager.shutdown(); - - System.exit(1); - - } - } - logger.info("[{}]: shutting down", this.threadId); + logger.info("[{}]: calling stop on kafka channel", this.threadId); this.kafkaChannel.stop(); + logger.debug("[{}]: exiting main run loop", this.threadId); + } protected void publishEvent(final String msg) throws RepoException { diff --git a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java index 258473ad..7ad4e289 100644 --- a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java @@ -487,7 +487,6 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo