Make persister shutdown on error

Refactor code to handle errors better. Persister shuts down faster
and more reliably.

On shutdown, messages in buffer will be dropped. On restart,
messages will be read from Kafka and processed again.

Fixes problem with persister not shutting down and restarting 
when a vertica node goes down.

Change-Id: I60b4ba6e06e69e68878f8400e360250e3608e5a5
This commit is contained in:
Deklan Dieterly 2015-12-11 12:59:29 -07:00
parent 023646b3a2
commit 0d8f1b6328
3 changed files with 26 additions and 79 deletions

View File

@ -17,7 +17,6 @@
package monasca.persister.consumer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@ -25,16 +24,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public class KafkaConsumer<T> {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
private static final int WAIT_TIME = 5;
private ExecutorService executorService;
private final KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic;
@ -66,27 +60,5 @@ public class KafkaConsumer<T> {
kafkaConsumerRunnableBasic.stop();
if (executorService != null) {
logger.info("[{}]: shutting down executor service", this.threadId);
try {
logger.info("[{}]: awaiting termination...", this.threadId);
if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) {
logger.warn("[{}]: did not shut down in {} seconds", this.threadId, WAIT_TIME);
}
logger.info("[{}]: terminated", this.threadId);
} catch (InterruptedException e) {
logger.info("[{}]: awaitTermination interrupted", this.threadId, e);
}
}
}
}

View File

@ -22,12 +22,10 @@ import monasca.persister.pipeline.ManagedPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.apache.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import kafka.consumer.ConsumerIterator;
import monasca.persister.repository.RepoException;
@ -40,7 +38,6 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
private final String threadId;
private final ManagedPipeline<T> pipeline;
private volatile boolean stop = false;
private boolean fatalErrorDetected = false;
private ExecutorService executorService;
@ -83,30 +80,6 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
this.stop = true;
try {
if (!this.fatalErrorDetected) {
logger.info("[{}}: shutting pipeline down", this.threadId);
if (pipeline.shutdown()) {
markRead();
}
} else {
logger.info("[{}]: fatal error detected. Exiting immediately without flush", this.threadId);
}
} catch (Exception e) {
logger.error("caught fatal exception while shutting down", e);
}
}
public void run() {
@ -125,7 +98,7 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
if (isInterrupted()) {
this.fatalErrorDetected = true;
logger.debug("[{}]: is interrupted", this.threadId);
break;
}
@ -134,7 +107,14 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
if (isInterrupted()) {
this.fatalErrorDetected = true;
logger.debug("[{}]: is interrupted", this.threadId);
break;
}
if (this.stop) {
logger.debug("[{}]: is stopped", this.threadId);
break;
}
@ -151,7 +131,14 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
if (isInterrupted()) {
this.fatalErrorDetected = true;
logger.debug("[{}]: is interrupted", this.threadId);
break;
}
if (this.stop) {
logger.debug("[{}]: is stopped", this.threadId);
break;
}
@ -163,36 +150,25 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
} catch (Throwable e) {
logger.error(
"[{}]: caught fatal exception while publishing msg. Shutting entire persister down now!",
this.threadId, e);
"[{}]: caught fatal exception while publishing msg. Shutting entire persister down "
+ "now!", this.threadId, e);
this.stop = true;
this.fatalErrorDetected = true;
logger.error("[{}]: calling shutdown on executor service", this.threadId);
this.executorService.shutdownNow();
this.executorService.shutdownNow();
try {
this.executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
logger.info("[{}]: interrupted while awaiting termination", this.threadId, e1);
logger.error("[{}]: shutting down system. calling system.exit(1)", this.threadId);
System.exit(1);
}
LogManager.shutdown();
System.exit(1);
}
}
logger.info("[{}]: shutting down", this.threadId);
logger.info("[{}]: calling stop on kafka channel", this.threadId);
this.kafkaChannel.stop();
logger.debug("[{}]: exiting main run loop", this.threadId);
}
protected void publishEvent(final String msg) throws RepoException {

View File

@ -487,7 +487,6 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
logger.debug("[{}]: committing transaction took: {}", id, swInner);
swInner.reset().start();
handle.begin();
swInner.stop();