diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java index ff3b12b4..9f4e0107 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java @@ -17,7 +17,6 @@ package monasca.persister.consumer; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; @@ -25,16 +24,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; public class KafkaConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); - private static final int WAIT_TIME = 5; - private ExecutorService executorService; private final KafkaConsumerRunnableBasic kafkaConsumerRunnableBasic; @@ -66,27 +60,5 @@ public class KafkaConsumer { kafkaConsumerRunnableBasic.stop(); - if (executorService != null) { - - logger.info("[{}]: shutting down executor service", this.threadId); - - try { - - logger.info("[{}]: awaiting termination...", this.threadId); - - if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) { - - logger.warn("[{}]: did not shut down in {} seconds", this.threadId, WAIT_TIME); - - } - - logger.info("[{}]: terminated", this.threadId); - - } catch (InterruptedException e) { - - logger.info("[{}]: awaitTermination interrupted", this.threadId, e); - - } - } } } diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java index 3d978ae6..834a87eb 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java @@ -22,12 +22,10 @@ import monasca.persister.pipeline.ManagedPipeline; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import org.apache.log4j.LogManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import kafka.consumer.ConsumerIterator; import monasca.persister.repository.RepoException; @@ -40,7 +38,6 @@ public class KafkaConsumerRunnableBasic implements Runnable { private final String threadId; private final ManagedPipeline pipeline; private volatile boolean stop = false; - private boolean fatalErrorDetected = false; private ExecutorService executorService; @@ -83,30 +80,6 @@ public class KafkaConsumerRunnableBasic implements Runnable { this.stop = true; - try { - - if (!this.fatalErrorDetected) { - - logger.info("[{}}: shutting pipeline down", this.threadId); - - if (pipeline.shutdown()) { - - markRead(); - - } - - } else { - - logger.info("[{}]: fatal error detected. Exiting immediately without flush", this.threadId); - - } - - } catch (Exception e) { - - logger.error("caught fatal exception while shutting down", e); - - } - } public void run() { @@ -125,7 +98,7 @@ public class KafkaConsumerRunnableBasic implements Runnable { if (isInterrupted()) { - this.fatalErrorDetected = true; + logger.debug("[{}]: is interrupted", this.threadId); break; } @@ -134,7 +107,14 @@ public class KafkaConsumerRunnableBasic implements Runnable { if (isInterrupted()) { - this.fatalErrorDetected = true; + logger.debug("[{}]: is interrupted", this.threadId); + break; + + } + + if (this.stop) { + + logger.debug("[{}]: is stopped", this.threadId); break; } @@ -151,7 +131,14 @@ public class KafkaConsumerRunnableBasic implements Runnable { if (isInterrupted()) { - this.fatalErrorDetected = true; + logger.debug("[{}]: is interrupted", this.threadId); + break; + + } + + if (this.stop) { + + logger.debug("[{}]: is stopped", this.threadId); break; } @@ -163,36 +150,25 @@ public class KafkaConsumerRunnableBasic implements Runnable { } catch (Throwable e) { logger.error( - "[{}]: caught fatal exception while publishing msg. Shutting entire persister down now!", - this.threadId, e); + "[{}]: caught fatal exception while publishing msg. Shutting entire persister down " + + "now!", this.threadId, e); - this.stop = true; - this.fatalErrorDetected = true; + logger.error("[{}]: calling shutdown on executor service", this.threadId); + this.executorService.shutdownNow(); - this.executorService.shutdownNow(); - - try { - - this.executorService.awaitTermination(5, TimeUnit.SECONDS); - - } catch (InterruptedException e1) { - - logger.info("[{}]: interrupted while awaiting termination", this.threadId, e1); + logger.error("[{}]: shutting down system. calling system.exit(1)", this.threadId); + System.exit(1); } - LogManager.shutdown(); - - System.exit(1); - - } - } - logger.info("[{}]: shutting down", this.threadId); + logger.info("[{}]: calling stop on kafka channel", this.threadId); this.kafkaChannel.stop(); + logger.debug("[{}]: exiting main run loop", this.threadId); + } protected void publishEvent(final String msg) throws RepoException { diff --git a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java index 258473ad..7ad4e289 100644 --- a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java @@ -487,7 +487,6 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo