diff --git a/java/src/main/java/monasca/persister/PersisterApplication.java b/java/src/main/java/monasca/persister/PersisterApplication.java index f3ddc4c7..ec3e1a22 100644 --- a/java/src/main/java/monasca/persister/PersisterApplication.java +++ b/java/src/main/java/monasca/persister/PersisterApplication.java @@ -17,6 +17,7 @@ package monasca.persister; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Key; @@ -25,6 +26,10 @@ import com.google.inject.TypeLiteral; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + import io.dropwizard.Application; import io.dropwizard.setup.Bootstrap; import io.dropwizard.setup.Environment; @@ -105,8 +110,18 @@ public class PersisterApplication extends Application { injector.getInstance(Key.get(new TypeLiteral>(){})); final KafkaConsumerRunnableBasicFactory kafkaMetricConsumerRunnableBasicFactory = - injector.getInstance(Key.get(new TypeLiteral>(){})); + injector.getInstance( + Key.get(new TypeLiteral>() { + })); + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .build(); + + int totalNumberOfThreads = configuration.getMetricConfiguration().getNumThreads() + + configuration.getAlarmHistoryConfiguration().getNumThreads(); + + ExecutorService executorService = Executors.newFixedThreadPool(totalNumberOfThreads, threadFactory); for (int i = 0; i < configuration.getMetricConfiguration().getNumThreads(); i++) { @@ -122,7 +137,7 @@ public class PersisterApplication extends Application { kafkaMetricConsumerRunnableBasicFactory.create(managedMetricPipeline, kafkaMetricChannel, threadId); final KafkaConsumer kafkaMetricConsumer = - kafkaMetricConsumerFactory.create(kafkaMetricConsumerRunnableBasic, threadId); + kafkaMetricConsumerFactory.create(kafkaMetricConsumerRunnableBasic, threadId, executorService); ManagedConsumer managedMetricConsumer = metricManagedConsumerFactory.create(kafkaMetricConsumer, threadId); @@ -158,7 +173,8 @@ public class PersisterApplication extends Application { kafkaAlarmStateTransitionConsumerRunnableBasicFactory.create(managedAlarmStateTransitionPipeline, kafkaAlarmStateTransitionChannel, threadId); final KafkaConsumer kafkaAlarmStateTransitionConsumer = - kafkaAlarmStateTransitionConsumerFactory.create(kafkaAlarmStateTransitionConsumerRunnableBasic, threadId); + kafkaAlarmStateTransitionConsumerFactory.create(kafkaAlarmStateTransitionConsumerRunnableBasic, threadId, + executorService); ManagedConsumer managedAlarmStateTransitionConsumer = alarmStateTransitionsManagedConsumerFactory.create(kafkaAlarmStateTransitionConsumer, threadId); diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java index d8520828..ff3b12b4 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java @@ -33,7 +33,7 @@ public class KafkaConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); - private static final int WAIT_TIME = 10; + private static final int WAIT_TIME = 5; private ExecutorService executorService; @@ -43,10 +43,12 @@ public class KafkaConsumer { @Inject public KafkaConsumer( @Assisted KafkaConsumerRunnableBasic kafkaConsumerRunnableBasic, - @Assisted String threadId) { + @Assisted String threadId, + @Assisted ExecutorService executorService) { this.kafkaConsumerRunnableBasic = kafkaConsumerRunnableBasic; this.threadId = threadId; + this.executorService = executorService; } @@ -54,13 +56,6 @@ public class KafkaConsumer { logger.info("[{}]: start", this.threadId); - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat(threadId + "-%d") - .setDaemon(true) - .build(); - - executorService = Executors.newSingleThreadExecutor(threadFactory); - executorService.submit(kafkaConsumerRunnableBasic.setExecutorService(executorService)); } @@ -75,8 +70,6 @@ public class KafkaConsumer { logger.info("[{}]: shutting down executor service", this.threadId); - executorService.shutdown(); - try { logger.info("[{}]: awaiting termination...", this.threadId); diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumerFactory.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerFactory.java index 4fb02b7a..0960b3c2 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumerFactory.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerFactory.java @@ -17,10 +17,13 @@ package monasca.persister.consumer; +import java.util.concurrent.ExecutorService; + public interface KafkaConsumerFactory { KafkaConsumer create( KafkaConsumerRunnableBasic kafkaConsumerRunnableBasic, - String threadId); + String threadId, + ExecutorService executorService); } diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java index b4cbb29b..3d978ae6 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java @@ -27,8 +27,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import kafka.consumer.ConsumerIterator; +import monasca.persister.repository.RepoException; public class KafkaConsumerRunnableBasic implements Runnable { @@ -38,6 +40,7 @@ public class KafkaConsumerRunnableBasic implements Runnable { private final String threadId; private final ManagedPipeline pipeline; private volatile boolean stop = false; + private boolean fatalErrorDetected = false; private ExecutorService executorService; @@ -60,7 +63,7 @@ public class KafkaConsumerRunnableBasic implements Runnable { } - protected void publishHeartbeat() { + protected void publishHeartbeat() throws RepoException { publishEvent(null); @@ -82,9 +85,19 @@ public class KafkaConsumerRunnableBasic implements Runnable { try { - if (pipeline.shutdown()) { + if (!this.fatalErrorDetected) { - markRead(); + logger.info("[{}}: shutting pipeline down", this.threadId); + + if (pipeline.shutdown()) { + + markRead(); + + } + + } else { + + logger.info("[{}]: fatal error detected. Exiting immediately without flush", this.threadId); } @@ -93,6 +106,7 @@ public class KafkaConsumerRunnableBasic implements Runnable { logger.error("caught fatal exception while shutting down", e); } + } public void run() { @@ -103,12 +117,28 @@ public class KafkaConsumerRunnableBasic implements Runnable { logger.debug("[{}]: KafkaChannel has stream iterator", this.threadId); - while (!this.stop) { + while (!this.stop) { + + try { try { + if (isInterrupted()) { + + this.fatalErrorDetected = true; + break; + + } + if (it.hasNext()) { + if (isInterrupted()) { + + this.fatalErrorDetected = true; + break; + + } + final String msg = new String(it.next().message()); logger.debug("[{}]: {}", this.threadId, msg); @@ -119,45 +149,73 @@ public class KafkaConsumerRunnableBasic implements Runnable { } catch (kafka.consumer.ConsumerTimeoutException cte) { + if (isInterrupted()) { + + this.fatalErrorDetected = true; + break; + + } + publishHeartbeat(); } - if (Thread.currentThread().isInterrupted()) { + } catch (Throwable e) { - logger.debug("[{}]: is interrupted. breaking out of run loop", this.threadId); + logger.error( + "[{}]: caught fatal exception while publishing msg. Shutting entire persister down now!", + this.threadId, e); - break; + this.stop = true; + this.fatalErrorDetected = true; + + this.executorService.shutdownNow(); + + try { + + this.executorService.awaitTermination(5, TimeUnit.SECONDS); + + } catch (InterruptedException e1) { + + logger.info("[{}]: interrupted while awaiting termination", this.threadId, e1); } + + LogManager.shutdown(); + + System.exit(1); + } - logger.info("[{}]: shutting down", this.threadId); - - this.kafkaChannel.stop(); - } + logger.info("[{}]: shutting down", this.threadId); - protected void publishEvent(final String msg) { + this.kafkaChannel.stop(); - try { + } - if (pipeline.publishEvent(msg)) { + protected void publishEvent(final String msg) throws RepoException { - markRead(); + if (pipeline.publishEvent(msg)) { - } + markRead(); - } catch (Exception e) { + } - logger.error("caught fatal exception while publishing msg. Shutting entire persister down now!"); + } - this.executorService.shutdownNow(); + private boolean isInterrupted() { - LogManager.shutdown(); + if (Thread.currentThread().interrupted()) { - System.exit(-1); + logger.debug("[{}]: is interrupted. breaking out of run loop", this.threadId); + + return true; + + } else { + + return false; } } diff --git a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java index 21df854c..258473ad 100644 --- a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java @@ -450,7 +450,7 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo