diff --git a/java/src/main/java/monasca/persister/PersisterApplication.java b/java/src/main/java/monasca/persister/PersisterApplication.java index af733298..f3ddc4c7 100644 --- a/java/src/main/java/monasca/persister/PersisterApplication.java +++ b/java/src/main/java/monasca/persister/PersisterApplication.java @@ -42,7 +42,7 @@ import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory; import monasca.persister.healthcheck.SimpleHealthCheck; import monasca.persister.pipeline.ManagedPipeline; import monasca.persister.pipeline.ManagedPipelineFactory; -import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory; +import monasca.persister.pipeline.event.AlarmStateTransitionHandlerFactory; import monasca.persister.pipeline.event.MetricHandlerFactory; import monasca.persister.resource.Resource; @@ -203,8 +203,8 @@ public class PersisterApplication extends Application { int batchSize = configuration.getAlarmHistoryConfiguration().getBatchSize(); logger.debug("Batch size for each AlarmStateHistoryPipeline [{}]", batchSize); - AlarmStateTransitionedEventHandlerFactory alarmHistoryEventHandlerFactory = - injector.getInstance(AlarmStateTransitionedEventHandlerFactory.class); + AlarmStateTransitionHandlerFactory alarmHistoryEventHandlerFactory = + injector.getInstance(AlarmStateTransitionHandlerFactory.class); ManagedPipelineFactory alarmStateTransitionPipelineFactory = injector.getInstance(new Key>(){}); diff --git a/java/src/main/java/monasca/persister/PersisterModule.java b/java/src/main/java/monasca/persister/PersisterModule.java index bf658df6..67e49539 100644 --- a/java/src/main/java/monasca/persister/PersisterModule.java +++ b/java/src/main/java/monasca/persister/PersisterModule.java @@ -41,8 +41,8 @@ import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory; import monasca.persister.dbi.DBIProvider; import monasca.persister.pipeline.ManagedPipeline; import monasca.persister.pipeline.ManagedPipelineFactory; -import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandler; -import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory; +import monasca.persister.pipeline.event.AlarmStateTransitionHandler; +import monasca.persister.pipeline.event.AlarmStateTransitionHandlerFactory; import monasca.persister.pipeline.event.MetricHandler; import monasca.persister.pipeline.event.MetricHandlerFactory; import monasca.persister.repository.Repo; @@ -81,9 +81,9 @@ public class PersisterModule extends AbstractModule { install( new FactoryModuleBuilder().implement( - AlarmStateTransitionedEventHandler.class, - AlarmStateTransitionedEventHandler.class) - .build(AlarmStateTransitionedEventHandlerFactory.class)); + AlarmStateTransitionHandler.class, + AlarmStateTransitionHandler.class) + .build(AlarmStateTransitionHandlerFactory.class)); install( new FactoryModuleBuilder().implement( diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java index 96cf0725..d8520828 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java @@ -17,6 +17,7 @@ package monasca.persister.consumer; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; @@ -25,6 +26,7 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; public class KafkaConsumer { @@ -52,9 +54,14 @@ public class KafkaConsumer { logger.info("[{}]: start", this.threadId); - executorService = Executors.newFixedThreadPool(1); + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat(threadId + "-%d") + .setDaemon(true) + .build(); - executorService.submit(kafkaConsumerRunnableBasic); + executorService = Executors.newSingleThreadExecutor(threadFactory); + + executorService.submit(kafkaConsumerRunnableBasic.setExecutorService(executorService)); } diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java index 0c039b0e..b4cbb29b 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java @@ -17,14 +17,18 @@ package monasca.persister.consumer; +import monasca.persister.pipeline.ManagedPipeline; + import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; +import org.apache.log4j.LogManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; + import kafka.consumer.ConsumerIterator; -import monasca.persister.pipeline.ManagedPipeline; public class KafkaConsumerRunnableBasic implements Runnable { @@ -35,6 +39,7 @@ public class KafkaConsumerRunnableBasic implements Runnable { private final ManagedPipeline pipeline; private volatile boolean stop = false; + private ExecutorService executorService; @Inject public KafkaConsumerRunnableBasic( @@ -47,6 +52,14 @@ public class KafkaConsumerRunnableBasic implements Runnable { this.threadId = threadId; } + public KafkaConsumerRunnableBasic setExecutorService(ExecutorService executorService) { + + this.executorService = executorService; + + return this; + + } + protected void publishHeartbeat() { publishEvent(null); @@ -67,9 +80,17 @@ public class KafkaConsumerRunnableBasic implements Runnable { this.stop = true; - if (pipeline.shutdown()) { + try { - markRead(); + if (pipeline.shutdown()) { + + markRead(); + + } + + } catch (Exception e) { + + logger.error("caught fatal exception while shutting down", e); } } @@ -82,37 +103,61 @@ public class KafkaConsumerRunnableBasic implements Runnable { logger.debug("[{}]: KafkaChannel has stream iterator", this.threadId); - while (!this.stop) { + while (!this.stop) { - try { + try { - if (it.hasNext()) { + if (it.hasNext()) { - final String msg = new String(it.next().message()); + final String msg = new String(it.next().message()); - logger.debug("[{}]: {}", this.threadId, msg); + logger.debug("[{}]: {}", this.threadId, msg); - publishEvent(msg); + publishEvent(msg); + + } + + } catch (kafka.consumer.ConsumerTimeoutException cte) { + + publishHeartbeat(); } - } catch (kafka.consumer.ConsumerTimeoutException cte) { + if (Thread.currentThread().isInterrupted()) { - publishHeartbeat(); + logger.debug("[{}]: is interrupted. breaking out of run loop", this.threadId); + break; + + } } + + logger.info("[{}]: shutting down", this.threadId); + + this.kafkaChannel.stop(); + } - logger.info("[{}]: shutting down", this.threadId); - - this.kafkaChannel.stop(); - } protected void publishEvent(final String msg) { - if (pipeline.publishEvent(msg)) { + try { - markRead(); + if (pipeline.publishEvent(msg)) { + + markRead(); + + } + + } catch (Exception e) { + + logger.error("caught fatal exception while publishing msg. Shutting entire persister down now!"); + + this.executorService.shutdownNow(); + + LogManager.shutdown(); + + System.exit(-1); } } diff --git a/java/src/main/java/monasca/persister/pipeline/ManagedPipeline.java b/java/src/main/java/monasca/persister/pipeline/ManagedPipeline.java index 47cc09aa..b2662c3f 100644 --- a/java/src/main/java/monasca/persister/pipeline/ManagedPipeline.java +++ b/java/src/main/java/monasca/persister/pipeline/ManagedPipeline.java @@ -21,6 +21,7 @@ import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import monasca.persister.pipeline.event.FlushableHandler; +import monasca.persister.repository.RepoException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +43,7 @@ public class ManagedPipeline { } - public boolean shutdown() { + public boolean shutdown() throws RepoException { logger.info("[{}]: shutdown", this.threadId); @@ -52,26 +53,31 @@ public class ManagedPipeline { return msgFlushCnt > 0 ? true : false; - } catch (Exception e) { + } catch (RepoException e) { logger.error("[{}}: failed to flush repo on shutdown", this.threadId, e); + logger.error( + "[{}]: pipeline broken. repo unavailable. check that database is running. shutting pipeline down now!", + this.threadId); + + throw e; - return false; } } - public boolean publishEvent(String msg) { + public boolean publishEvent(String msg) throws RepoException { try { return this.handler.onEvent(msg); - } catch (Exception e) { + } catch (RepoException e) { logger.error("[{}]: failed to handle msg: {}", this.threadId, msg, e); + logger.error("[{}]: pipeline broken. repo unavailable. check that database is running. shutting pipeline down now!", this.threadId); - return false; + throw e; } } diff --git a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionHandler.java similarity index 81% rename from java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java rename to java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionHandler.java index a5be430f..dae78c0f 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionHandler.java @@ -28,29 +28,29 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import io.dropwizard.setup.Environment; import monasca.persister.repository.Repo; +import monasca.persister.repository.RepoException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -public class AlarmStateTransitionedEventHandler extends +public class AlarmStateTransitionHandler extends FlushableHandler { private static final Logger logger = - LoggerFactory.getLogger(AlarmStateTransitionedEventHandler.class); + LoggerFactory.getLogger(AlarmStateTransitionHandler.class); private final Repo alarmRepo; private final Counter alarmStateTransitionCounter; @Inject - public AlarmStateTransitionedEventHandler( - Repo alarmRepo, - Environment environment, - @Assisted PipelineConfig configuration, - @Assisted("threadId") String threadId, - @Assisted("batchSize") int batchSize) { + public AlarmStateTransitionHandler(Repo alarmRepo, + Environment environment, + @Assisted PipelineConfig configuration, + @Assisted("threadId") String threadId, + @Assisted("batchSize") int batchSize) { super(configuration, environment, threadId, batchSize); @@ -104,7 +104,7 @@ public class AlarmStateTransitionedEventHandler extends } @Override - protected int flushRepository() throws Exception { + protected int flushRepository() throws RepoException { return this.alarmRepo.flush(this.threadId); diff --git a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandlerFactory.java b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionHandlerFactory.java similarity index 89% rename from java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandlerFactory.java rename to java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionHandlerFactory.java index 9ff8c0fd..aaa0d181 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandlerFactory.java +++ b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionHandlerFactory.java @@ -21,9 +21,9 @@ import monasca.persister.configuration.PipelineConfig; import com.google.inject.assistedinject.Assisted; -public interface AlarmStateTransitionedEventHandlerFactory { +public interface AlarmStateTransitionHandlerFactory { - AlarmStateTransitionedEventHandler create( + AlarmStateTransitionHandler create( PipelineConfig configuration, @Assisted("threadId") String threadId, @Assisted("batchSize") int batchSize); diff --git a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java index 0011a905..91c4b97d 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java @@ -24,6 +24,7 @@ import com.codahale.metrics.Timer; import com.fasterxml.jackson.databind.ObjectMapper; import io.dropwizard.setup.Environment; +import monasca.persister.repository.RepoException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,11 +86,11 @@ public abstract class FlushableHandler { protected abstract void initObjectMapper(); - protected abstract int flushRepository() throws Exception; + protected abstract int flushRepository() throws RepoException; protected abstract int process(String msg); - public boolean onEvent(final String msg) throws Exception { + public boolean onEvent(final String msg) throws RepoException { if (msg == null) { @@ -174,7 +175,7 @@ public abstract class FlushableHandler { } } - public int flush() throws Exception { + public int flush() throws RepoException { logger.debug("[{}]: flushing", this.threadId); diff --git a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java index 8928018a..76ae2697 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import io.dropwizard.setup.Environment; +import monasca.persister.repository.RepoException; public class MetricHandler extends FlushableHandler { @@ -114,7 +115,7 @@ public class MetricHandler extends FlushableHandler { } @Override - public int flushRepository() throws Exception { + public int flushRepository() throws RepoException { return this.metricRepo.flush(this.threadId); } diff --git a/java/src/main/java/monasca/persister/repository/Repo.java b/java/src/main/java/monasca/persister/repository/Repo.java index be7c7d30..3fc118ed 100644 --- a/java/src/main/java/monasca/persister/repository/Repo.java +++ b/java/src/main/java/monasca/persister/repository/Repo.java @@ -20,6 +20,6 @@ public interface Repo { void addToBatch(final T msg, String id); - int flush(String id) throws Exception; + int flush(String id) throws RepoException; } diff --git a/java/src/main/java/monasca/persister/repository/RepoException.java b/java/src/main/java/monasca/persister/repository/RepoException.java new file mode 100644 index 00000000..99b4f3c2 --- /dev/null +++ b/java/src/main/java/monasca/persister/repository/RepoException.java @@ -0,0 +1,30 @@ +package monasca.persister.repository; + +public class RepoException extends Exception { + + public RepoException() { + + super(); + } + + public RepoException(String message) { + + super(message); + } + + public RepoException(String message, Throwable cause) { + + super(message, cause); + } + + public RepoException(Throwable cause) { + + super(cause); + } + + protected RepoException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java index 57cdcf42..385695e4 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import io.dropwizard.setup.Environment; import monasca.persister.repository.Repo; +import monasca.persister.repository.RepoException; public abstract class InfluxRepo implements Repo { @@ -38,7 +39,7 @@ public abstract class InfluxRepo implements Repo { } @Override - public int flush(String id) throws Exception { + public int flush(String id) throws RepoException { if (isBufferEmpty()) { @@ -55,7 +56,7 @@ public abstract class InfluxRepo implements Repo { } } - private int writeToRepo(String id) throws Exception { + private int writeToRepo(String id) throws RepoException { try { @@ -87,7 +88,7 @@ public abstract class InfluxRepo implements Repo { protected abstract boolean isBufferEmpty(); - protected abstract int write(String id) throws Exception; + protected abstract int write(String id) throws RepoException; protected abstract void clearBuffers(); } diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java index cae13972..0c4ca1c7 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import io.dropwizard.setup.Environment; +import monasca.persister.repository.RepoException; public class InfluxV9AlarmRepo extends InfluxAlarmRepo { @@ -63,13 +64,13 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo { } @Override - protected int write(String id) throws Exception { + protected int write(String id) throws RepoException { return this.influxV9RepoWriter.write(getInfluxPointArry(id), id); } - private InfluxPoint[] getInfluxPointArry(String id) throws Exception { + private InfluxPoint[] getInfluxPointArry(String id) { List influxPointList = new LinkedList<>(); diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java index 55922421..feb2e4f3 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import io.dropwizard.setup.Environment; +import monasca.persister.repository.RepoException; public class InfluxV9MetricRepo extends InfluxMetricRepo { @@ -43,13 +44,13 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo { } @Override - protected int write(String id) throws Exception { + protected int write(String id) throws RepoException { return this.influxV9RepoWriter.write(getInfluxPointArry(), id); } - private InfluxPoint[] getInfluxPointArry() throws Exception { + private InfluxPoint[] getInfluxPointArry() { List influxPointList = new LinkedList<>(); diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java index 2ac54a3e..27e63bbf 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java @@ -18,9 +18,11 @@ package monasca.persister.repository.influxdb; import monasca.persister.configuration.PersisterConfig; +import monasca.persister.repository.RepoException; import com.google.inject.Inject; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.codec.binary.Base64; @@ -123,7 +125,7 @@ public class InfluxV9RepoWriter { } } - protected int write(final InfluxPoint[] influxPointArry, String id) throws Exception { + protected int write(final InfluxPoint[] influxPointArry, String id) throws RepoException { HttpPost request = new HttpPost(this.influxUrl); @@ -135,7 +137,7 @@ public class InfluxV9RepoWriter { new InfluxWrite(this.influxName, this.influxRetentionPolicy, influxPointArry, new HashMap()); - String json = this.objectMapper.writeValueAsString(influxWrite); + String jsonBody = getJsonBody(influxWrite); if (this.gzip) { @@ -145,7 +147,7 @@ public class InfluxV9RepoWriter { requestEntity = EntityBuilder .create() - .setText(json) + .setText(jsonBody) .setContentType(ContentType.APPLICATION_JSON) .setContentEncoding("UTF-8") .gzipCompress() @@ -159,7 +161,7 @@ public class InfluxV9RepoWriter { logger.debug("[{}]: gzip set to false. sending non-gzip msg", id); - StringEntity stringEntity = new StringEntity(json, "UTF-8"); + StringEntity stringEntity = new StringEntity(jsonBody, "UTF-8"); request.setEntity(stringEntity); @@ -170,34 +172,71 @@ public class InfluxV9RepoWriter { logger.debug("[{}]: sending {} points to influxdb {} at {}", id, influxPointArry.length, this.influxName, this.influxUrl); - HttpResponse response = this.httpClient.execute(request); + HttpResponse response = null; + + try { + + response = this.httpClient.execute(request); + + } catch (IOException e) { + + throw new RepoException("failed to execute http request", e); + } int rc = response.getStatusLine().getStatusCode(); if (rc != HttpStatus.SC_OK) { - HttpEntity responseEntity = response.getEntity(); - - String responseString = EntityUtils.toString(responseEntity, "UTF-8"); - logger.error("[{}]: failed to send data to influxdb {} at {}: {}", id, this.influxName, this.influxUrl, String.valueOf(rc)); + HttpEntity responseEntity = response.getEntity(); + + String responseString = null; + + try { + + responseString = EntityUtils.toString(responseEntity, "UTF-8"); + + } catch (IOException e) { + + throw new RepoException("failed to read http response for non ok return code " + rc, e); + + } + logger.error("[{}]: http response: {}", id, responseString); - throw new Exception(rc + ":" + responseString); + throw new RepoException("failed to execute http request to influxdb " + rc + " - " + responseString); + + } else { + + logger.debug("[{}]: successfully sent {} points to influxdb {} at {}", id, + influxPointArry.length, this.influxName, this.influxUrl); + + return influxPointArry.length; + } - logger - .debug("[{}]: successfully sent {} points to influxdb {} at {}", id, - influxPointArry.length, this.influxName, this.influxUrl); - - return influxPointArry.length; - } finally { request.releaseConnection(); } } + + private String getJsonBody(InfluxWrite influxWrite) throws RepoException { + + String json = null; + + try { + + json = this.objectMapper.writeValueAsString(influxWrite); + + } catch (JsonProcessingException e) { + + throw new RepoException("failed to serialize json", e); + } + + return json; + } } diff --git a/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java index 02c8b824..4ce07e77 100644 --- a/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java @@ -39,6 +39,7 @@ import java.util.TimeZone; import javax.inject.Inject; import io.dropwizard.setup.Environment; +import monasca.persister.repository.RepoException; public class VerticaAlarmRepo extends VerticaRepo implements Repo { @@ -105,6 +106,8 @@ public class VerticaAlarmRepo extends VerticaRepo implements Repo { @@ -246,11 +247,11 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo meta = metricEnvelope.meta; + Map metaMap = metricEnvelope.meta; - String tenantId = getMeta(TENANT_ID, metric, meta, id); + String tenantId = getMeta(TENANT_ID, metric, metaMap, id); - String region = getMeta(REGION, metric, meta, id); + String region = getMeta(REGION, metric, metaMap, id); // Add the definition to the batch. StringBuilder definitionIdStringToHash = @@ -298,8 +299,8 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo