diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java index 2a7e976e..7648429d 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java @@ -16,20 +16,23 @@ */ package com.hpcloud.mon.persister.consumer; +import com.hpcloud.mon.persister.disruptor.ManagedDisruptor; + import com.google.inject.Inject; -import com.lmax.disruptor.dsl.Disruptor; -import io.dropwizard.lifecycle.Managed; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class Consumer implements Managed { +import io.dropwizard.lifecycle.Managed; + +public class Consumer implements Managed { private static final Logger logger = LoggerFactory.getLogger(Consumer.class); private final KafkaConsumer consumer; - private final Disruptor disruptor; + private final ManagedDisruptor disruptor; @Inject - public Consumer(KafkaConsumer kafkaConsumer, Disruptor disruptor) { + public Consumer(KafkaConsumer kafkaConsumer, ManagedDisruptor disruptor) { this.consumer = kafkaConsumer; this.disruptor = disruptor; } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java index cad89b65..93e26b30 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java @@ -17,20 +17,26 @@ package com.hpcloud.mon.persister.consumer; import com.google.inject.Inject; + import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; + import kafka.consumer.KafkaStream; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public abstract class KafkaConsumer { private static final String KAFKA_CONFIGURATION = "Kafka configuration:"; private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + private static final int WAIT_TIME = 10; + protected final MonPersisterConfiguration configuration; private final Integer numThreads; @@ -38,7 +44,7 @@ public abstract class KafkaConsumer { @Inject private KafkaStreams kafkaStreams; - protected abstract Runnable createRunnable(KafkaStream stream, int threadNumber); + protected abstract Runnable createRunnable(KafkaStream stream, int threadNumber); protected abstract String getStreamName(); @Inject @@ -55,15 +61,23 @@ public abstract class KafkaConsumer { executorService = Executors.newFixedThreadPool(numThreads); int threadNumber = 0; - for (final KafkaStream stream : streams) { + for (final KafkaStream stream : streams) { executorService.submit(createRunnable(stream, threadNumber)); threadNumber++; } } public void stop() { + kafkaStreams.stop(); if (executorService != null) { executorService.shutdown(); + try { + if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) { + logger.warn("Did not shut down in %d seconds", WAIT_TIME); + } + } catch (InterruptedException e) { + logger.info("awaitTerminiation interrupted", e); + } } } } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreams.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreams.java index ba2f02ac..594ad7bb 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreams.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreams.java @@ -89,4 +89,8 @@ public class KafkaStreams { return properties; } + + public void stop() { + consumerConnector.shutdown(); + } } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java index be57f5e0..cb73e381 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java @@ -16,13 +16,16 @@ */ package com.hpcloud.mon.persister.consumer; -import com.google.inject.Inject; import com.hpcloud.mon.persister.disruptor.MetricDisruptor; +import com.hpcloud.mon.persister.disruptor.event.MetricHolder; -public class MetricsConsumer extends Consumer { +import com.google.inject.Inject; + +public class MetricsConsumer extends Consumer { @Inject public MetricsConsumer(KafkaMetricsConsumer kafkaConsumer, MetricDisruptor disruptor) { super(kafkaConsumer, disruptor); } + } diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmHistoryDisruptorProvider.java b/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmHistoryDisruptorProvider.java index dd5118ab..2769380f 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmHistoryDisruptorProvider.java +++ b/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmHistoryDisruptorProvider.java @@ -16,13 +16,15 @@ */ package com.hpcloud.mon.persister.disruptor; -import com.google.inject.Inject; -import com.google.inject.Provider; import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventFactory; +import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandler; import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandlerFactory; -import com.lmax.disruptor.EventHandler; + +import com.google.inject.Inject; +import com.google.inject.Provider; import com.lmax.disruptor.ExceptionHandler; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,13 +69,14 @@ public class AlarmHistoryDisruptorProvider implements Provider { - public AlarmStateHistoryDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) { +public class AlarmStateHistoryDisruptor extends ManagedDisruptor { + public AlarmStateHistoryDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) { super(eventFactory, ringBufferSize, executor); } - public AlarmStateHistoryDisruptor(final EventFactory eventFactory, + public AlarmStateHistoryDisruptor(final EventFactory eventFactory, int ringBufferSize, Executor executor, ProducerType producerType, diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/ManagedDisruptor.java b/src/main/java/com/hpcloud/mon/persister/disruptor/ManagedDisruptor.java new file mode 100644 index 00000000..b6e46f57 --- /dev/null +++ b/src/main/java/com/hpcloud/mon/persister/disruptor/ManagedDisruptor.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hpcloud.mon.persister.disruptor; + +import com.hpcloud.mon.persister.disruptor.event.FlushableHandler; + +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.WaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; + +import java.util.concurrent.Executor; + +public class ManagedDisruptor extends Disruptor{ + private FlushableHandler[] handlers = new FlushableHandler[0]; + + public ManagedDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) { + super(eventFactory, ringBufferSize, executor); + } + + public ManagedDisruptor(final EventFactory eventFactory, + int ringBufferSize, + Executor executor, + ProducerType producerType, + WaitStrategy waitStrategy) { + super(eventFactory, ringBufferSize, executor, producerType, waitStrategy); + } + + @Override + public void shutdown() { + for (FlushableHandler handler : handlers) { + handler.flush(); + } + super.shutdown(); + } + + public void setHandlers(FlushableHandler[] handlers) { + this.handlers = handlers; + } +} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptor.java b/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptor.java index d41dc749..7398c713 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptor.java +++ b/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptor.java @@ -17,19 +17,20 @@ package com.hpcloud.mon.persister.disruptor; import com.hpcloud.mon.persister.disruptor.event.MetricHolder; + import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.WaitStrategy; -import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import java.util.concurrent.Executor; -public class MetricDisruptor extends Disruptor { - public MetricDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) { +public class MetricDisruptor extends ManagedDisruptor { + + public MetricDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) { super(eventFactory, ringBufferSize, executor); } - public MetricDisruptor(final EventFactory eventFactory, + public MetricDisruptor(final EventFactory eventFactory, int ringBufferSize, Executor executor, ProducerType producerType, diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptorProvider.java b/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptorProvider.java index 3fe3e3a6..386e5e80 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptorProvider.java +++ b/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptorProvider.java @@ -18,11 +18,14 @@ package com.hpcloud.mon.persister.disruptor; import com.google.inject.Inject; import com.google.inject.Provider; + import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; import com.hpcloud.mon.persister.disruptor.event.MetricFactory; +import com.hpcloud.mon.persister.disruptor.event.MetricHandler; import com.hpcloud.mon.persister.disruptor.event.MetricHandlerFactory; -import com.lmax.disruptor.EventHandler; + import com.lmax.disruptor.ExceptionHandler; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,13 +71,14 @@ public class MetricDisruptorProvider implements Provider { int numOutputProcessors = configuration.getDisruptorConfiguration().getNumProcessors(); logger.debug("Number of output processors [" + numOutputProcessors + "]"); - EventHandler[] eventHandlers = new EventHandler[numOutputProcessors]; + MetricHandler[] metricHandlers = new MetricHandler[numOutputProcessors]; for (int i = 0; i < numOutputProcessors; ++i) { - eventHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize); + metricHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize); } - disruptor.handleEventsWith(eventHandlers); + disruptor.handleEventsWith(metricHandlers); + disruptor.setHandlers(metricHandlers); disruptor.start(); logger.debug("Instance of disruptor successfully started"); diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandler.java b/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandler.java index 57d24378..6bdbb143 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandler.java +++ b/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandler.java @@ -16,21 +16,22 @@ */ package com.hpcloud.mon.persister.disruptor.event; -import com.codahale.metrics.Counter; +import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent; +import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; +import com.hpcloud.mon.persister.repository.AlarmRepository; + import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent; -import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; -import com.hpcloud.mon.persister.repository.AlarmRepository; -import com.hpcloud.mon.persister.repository.VerticaAlarmRepository; import com.lmax.disruptor.EventHandler; -import io.dropwizard.setup.Environment; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AlarmStateTransitionedEventHandler implements EventHandler { +import io.dropwizard.setup.Environment; + +public class AlarmStateTransitionedEventHandler implements EventHandler, FlushableHandler { private static final Logger logger = LoggerFactory.getLogger(AlarmStateTransitionedEventHandler.class); private final int ordinal; @@ -42,10 +43,8 @@ public class AlarmStateTransitionedEventHandler implements EventHandler { +public class MetricHandler implements EventHandler, FlushableHandler { private static final Logger logger = LoggerFactory.getLogger(MetricHandler.class); private static final String TENANT_ID = "tenantId"; @@ -56,7 +59,6 @@ public class MetricHandler implements EventHandler { private final int secondsBetweenFlushes; private final MetricRepository verticaMetricRepository; - private final MonPersisterConfiguration configuration; private final Environment environment; private final Counter metricCounter; @@ -76,7 +78,6 @@ public class MetricHandler implements EventHandler { @Assisted("batchSize") int batchSize) { this.verticaMetricRepository = metricRepository; - this.configuration = configuration; this.environment = environment; this.metricCounter = this.environment.metrics().counter(this.getClass().getName() + "." + "metrics-added-to-batch-counter"); this.definitionCounter = this.environment.metrics().counter(this.getClass().getName() + "." + "metric-definitions-added-to-batch-counter"); @@ -215,7 +216,8 @@ public class MetricHandler implements EventHandler { } - private void flush() { + @Override + public void flush() { verticaMetricRepository.flush(); millisSinceLastFlush = System.currentTimeMillis(); } diff --git a/src/main/java/com/hpcloud/mon/persister/repository/RepositoryCommitHeartbeat.java b/src/main/java/com/hpcloud/mon/persister/repository/RepositoryCommitHeartbeat.java index 84e920b5..040d950a 100644 --- a/src/main/java/com/hpcloud/mon/persister/repository/RepositoryCommitHeartbeat.java +++ b/src/main/java/com/hpcloud/mon/persister/repository/RepositoryCommitHeartbeat.java @@ -29,16 +29,10 @@ import org.slf4j.LoggerFactory; public class RepositoryCommitHeartbeat implements Managed { - private static Logger logger = LoggerFactory.getLogger(RepositoryCommitHeartbeat.class); - - private final MetricDisruptor metricDisruptor; - private final AlarmStateHistoryDisruptor alarmHistoryDisruptor; private final HeartbeatRunnable deduperRunnable; @Inject public RepositoryCommitHeartbeat(MetricDisruptor metricDisruptor, AlarmStateHistoryDisruptor alarmHistoryDisruptor) { - this.metricDisruptor = metricDisruptor; - this.alarmHistoryDisruptor = alarmHistoryDisruptor; this.deduperRunnable = new HeartbeatRunnable(metricDisruptor, alarmHistoryDisruptor); } @@ -51,15 +45,19 @@ public class RepositoryCommitHeartbeat implements Managed { @Override public void stop() throws Exception { + this.deduperRunnable.stop(); } private static class HeartbeatRunnable implements Runnable { private static final Logger logger = LoggerFactory.getLogger(HeartbeatRunnable.class); - private final Disruptor metricDisruptor; - private final Disruptor alarmHistoryDisruptor; + private final Disruptor metricDisruptor; + private final Disruptor alarmHistoryDisruptor; - private HeartbeatRunnable(Disruptor metricDisruptor, Disruptor alarmHistoryDisruptor) { + private boolean stop = false; + + private HeartbeatRunnable(MetricDisruptor metricDisruptor, + AlarmStateHistoryDisruptor alarmHistoryDisruptor) { this.metricDisruptor = metricDisruptor; this.alarmHistoryDisruptor = alarmHistoryDisruptor; } @@ -69,7 +67,13 @@ public class RepositoryCommitHeartbeat implements Managed { for (; ; ) { try { // Send a heartbeat every second. - Thread.sleep(1000); + synchronized (this) { + this.wait(1000); + if (stop) { + logger.debug("Heartbeat thread is exiting"); + break; + } + } logger.debug("Waking up after sleeping 1 seconds, yawn..."); // Send heartbeat @@ -96,5 +100,10 @@ public class RepositoryCommitHeartbeat implements Managed { } } + + public synchronized void stop() { + stop = true; + this.notify(); + } } }