Merge "Make persister shutdown on error"

This commit is contained in:
Jenkins 2015-12-15 00:27:47 +00:00 committed by Gerrit Code Review
commit fbd51a043f
3 changed files with 26 additions and 79 deletions

View File

@ -17,7 +17,6 @@
package monasca.persister.consumer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@ -25,16 +24,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public class KafkaConsumer<T> {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
private static final int WAIT_TIME = 5;
private ExecutorService executorService;
private final KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic;
@ -66,27 +60,5 @@ public class KafkaConsumer<T> {
kafkaConsumerRunnableBasic.stop();
if (executorService != null) {
logger.info("[{}]: shutting down executor service", this.threadId);
try {
logger.info("[{}]: awaiting termination...", this.threadId);
if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) {
logger.warn("[{}]: did not shut down in {} seconds", this.threadId, WAIT_TIME);
}
logger.info("[{}]: terminated", this.threadId);
} catch (InterruptedException e) {
logger.info("[{}]: awaitTermination interrupted", this.threadId, e);
}
}
}
}

View File

@ -22,12 +22,10 @@ import monasca.persister.pipeline.ManagedPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.apache.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import kafka.consumer.ConsumerIterator;
import monasca.persister.repository.RepoException;
@ -40,7 +38,6 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
private final String threadId;
private final ManagedPipeline<T> pipeline;
private volatile boolean stop = false;
private boolean fatalErrorDetected = false;
private ExecutorService executorService;
@ -83,30 +80,6 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
this.stop = true;
try {
if (!this.fatalErrorDetected) {
logger.info("[{}}: shutting pipeline down", this.threadId);
if (pipeline.shutdown()) {
markRead();
}
} else {
logger.info("[{}]: fatal error detected. Exiting immediately without flush", this.threadId);
}
} catch (Exception e) {
logger.error("caught fatal exception while shutting down", e);
}
}
public void run() {
@ -125,7 +98,7 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
if (isInterrupted()) {
this.fatalErrorDetected = true;
logger.debug("[{}]: is interrupted", this.threadId);
break;
}
@ -134,7 +107,14 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
if (isInterrupted()) {
this.fatalErrorDetected = true;
logger.debug("[{}]: is interrupted", this.threadId);
break;
}
if (this.stop) {
logger.debug("[{}]: is stopped", this.threadId);
break;
}
@ -151,7 +131,14 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
if (isInterrupted()) {
this.fatalErrorDetected = true;
logger.debug("[{}]: is interrupted", this.threadId);
break;
}
if (this.stop) {
logger.debug("[{}]: is stopped", this.threadId);
break;
}
@ -163,36 +150,25 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
} catch (Throwable e) {
logger.error(
"[{}]: caught fatal exception while publishing msg. Shutting entire persister down now!",
this.threadId, e);
"[{}]: caught fatal exception while publishing msg. Shutting entire persister down "
+ "now!", this.threadId, e);
this.stop = true;
this.fatalErrorDetected = true;
logger.error("[{}]: calling shutdown on executor service", this.threadId);
this.executorService.shutdownNow();
this.executorService.shutdownNow();
try {
this.executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
logger.info("[{}]: interrupted while awaiting termination", this.threadId, e1);
logger.error("[{}]: shutting down system. calling system.exit(1)", this.threadId);
System.exit(1);
}
LogManager.shutdown();
System.exit(1);
}
}
logger.info("[{}]: shutting down", this.threadId);
logger.info("[{}]: calling stop on kafka channel", this.threadId);
this.kafkaChannel.stop();
logger.debug("[{}]: exiting main run loop", this.threadId);
}
protected void publishEvent(final String msg) throws RepoException {

View File

@ -487,7 +487,6 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
logger.debug("[{}]: committing transaction took: {}", id, swInner);
swInner.reset().start();
handle.begin();
swInner.stop();