diff --git a/java/src/main/java/monasca/persister/PersisterApplication.java b/java/src/main/java/monasca/persister/PersisterApplication.java index 4f8c44d0..af733298 100644 --- a/java/src/main/java/monasca/persister/PersisterApplication.java +++ b/java/src/main/java/monasca/persister/PersisterApplication.java @@ -31,14 +31,14 @@ import io.dropwizard.setup.Environment; import monasca.common.model.event.AlarmStateTransitionedEvent; import monasca.common.model.metric.MetricEnvelope; import monasca.persister.configuration.PersisterConfig; -import monasca.persister.consumer.Consumer; -import monasca.persister.consumer.ConsumerFactory; +import monasca.persister.consumer.ManagedConsumer; +import monasca.persister.consumer.ManagedConsumerFactory; import monasca.persister.consumer.KafkaChannel; import monasca.persister.consumer.KafkaChannelFactory; -import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumer; -import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerFactory; -import monasca.persister.consumer.metric.KafkaMetricsConsumer; -import monasca.persister.consumer.metric.KafkaMetricsConsumerFactory; +import monasca.persister.consumer.KafkaConsumer; +import monasca.persister.consumer.KafkaConsumerFactory; +import monasca.persister.consumer.KafkaConsumerRunnableBasic; +import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory; import monasca.persister.healthcheck.SimpleHealthCheck; import monasca.persister.pipeline.ManagedPipeline; import monasca.persister.pipeline.ManagedPipelineFactory; @@ -97,70 +97,88 @@ public class PersisterApplication extends Application { final KafkaChannelFactory kafkaChannelFactory = injector.getInstance(KafkaChannelFactory.class); - final ConsumerFactory metricsConsumerFactory = - injector.getInstance(Key.get(new TypeLiteral>() { - })); + final ManagedConsumerFactory metricManagedConsumerFactory = + injector.getInstance(Key.get(new TypeLiteral>() {})); // Metrics - final KafkaMetricsConsumerFactory kafkaMetricsConsumerFactory = - injector.getInstance(Key.get(new TypeLiteral>(){})); + final KafkaConsumerFactory kafkaMetricConsumerFactory = + injector.getInstance(Key.get(new TypeLiteral>(){})); + + final KafkaConsumerRunnableBasicFactory kafkaMetricConsumerRunnableBasicFactory = + injector.getInstance(Key.get(new TypeLiteral>(){})); for (int i = 0; i < configuration.getMetricConfiguration().getNumThreads(); i++) { - final KafkaChannel kafkaChannel = - kafkaChannelFactory.create(configuration, configuration.getMetricConfiguration(), i); + String threadId = "metric-" + String.valueOf(i); - final ManagedPipeline metricPipeline = getMetricPipeline( - configuration, i, injector); + final KafkaChannel kafkaMetricChannel = + kafkaChannelFactory.create(configuration.getMetricConfiguration(), threadId); - final KafkaMetricsConsumer kafkaMetricsConsumer = - kafkaMetricsConsumerFactory.create(MetricEnvelope[].class, kafkaChannel, i, metricPipeline); + final ManagedPipeline managedMetricPipeline = + getMetricPipeline(configuration, threadId, injector); - Consumer metricsConsumer = - metricsConsumerFactory.create(kafkaMetricsConsumer, metricPipeline); + KafkaConsumerRunnableBasic kafkaMetricConsumerRunnableBasic = + kafkaMetricConsumerRunnableBasicFactory.create(managedMetricPipeline, kafkaMetricChannel, threadId); - environment.lifecycle().manage(metricsConsumer); + final KafkaConsumer kafkaMetricConsumer = + kafkaMetricConsumerFactory.create(kafkaMetricConsumerRunnableBasic, threadId); + + ManagedConsumer managedMetricConsumer = + metricManagedConsumerFactory.create(kafkaMetricConsumer, threadId); + + environment.lifecycle().manage(managedMetricConsumer); } // AlarmStateTransitions - final ConsumerFactory - alarmStateTransitionsConsumerFactory = injector.getInstance(Key.get(new TypeLiteral - >(){})); + final ManagedConsumerFactory + alarmStateTransitionsManagedConsumerFactory = injector.getInstance(Key.get(new TypeLiteral + >(){})); - final KafkaAlarmStateTransitionConsumerFactory + final KafkaConsumerFactory kafkaAlarmStateTransitionConsumerFactory = - injector.getInstance(Key.get(new TypeLiteral>() {})); + injector.getInstance(Key.get(new TypeLiteral>() { })); + + final KafkaConsumerRunnableBasicFactory kafkaAlarmStateTransitionConsumerRunnableBasicFactory = + injector.getInstance(Key.get(new TypeLiteral>(){})) ; for (int i = 0; i < configuration.getAlarmHistoryConfiguration().getNumThreads(); i++) { - final KafkaChannel kafkaChannel = + String threadId = "alarm-state-transition-" + String.valueOf(i); + + final KafkaChannel kafkaAlarmStateTransitionChannel = kafkaChannelFactory - .create(configuration, configuration.getAlarmHistoryConfiguration(), i); + .create(configuration.getAlarmHistoryConfiguration(), threadId); - final ManagedPipeline pipeline = - getAlarmStateHistoryPipeline(configuration, i, injector); + final ManagedPipeline managedAlarmStateTransitionPipeline = + getAlarmStateHistoryPipeline(configuration, threadId, injector); - final KafkaAlarmStateTransitionConsumer kafkaAlarmStateTransitionConsumer = - kafkaAlarmStateTransitionConsumerFactory.create(AlarmStateTransitionedEvent.class, kafkaChannel, i, pipeline); + KafkaConsumerRunnableBasic kafkaAlarmStateTransitionConsumerRunnableBasic = + kafkaAlarmStateTransitionConsumerRunnableBasicFactory.create(managedAlarmStateTransitionPipeline, kafkaAlarmStateTransitionChannel, threadId); - Consumer alarmStateTransitionConsumer = - alarmStateTransitionsConsumerFactory.create(kafkaAlarmStateTransitionConsumer, pipeline); + final KafkaConsumer kafkaAlarmStateTransitionConsumer = + kafkaAlarmStateTransitionConsumerFactory.create(kafkaAlarmStateTransitionConsumerRunnableBasic, threadId); - environment.lifecycle().manage(alarmStateTransitionConsumer); + ManagedConsumer managedAlarmStateTransitionConsumer = + alarmStateTransitionsManagedConsumerFactory.create(kafkaAlarmStateTransitionConsumer, threadId); + + environment.lifecycle().manage(managedAlarmStateTransitionConsumer); } } - private ManagedPipeline getMetricPipeline(PersisterConfig configuration, int threadNum, + private ManagedPipeline getMetricPipeline( + PersisterConfig configuration, + String threadId, Injector injector) { - logger.debug("Creating metric pipeline..."); + logger.debug("Creating metric pipeline [{}]...", threadId); final int batchSize = configuration.getMetricConfiguration().getBatchSize(); - logger.debug("Batch size for metric pipeline [" + batchSize + "]"); + logger.debug("Batch size for metric pipeline [{}]", batchSize); - MetricHandlerFactory metricEventHandlerFactory = - injector.getInstance(Key.get(new TypeLiteral>(){})); + MetricHandlerFactory metricEventHandlerFactory = + injector.getInstance(MetricHandlerFactory.class); ManagedPipelineFactory managedPipelineFactory = injector.getInstance(Key.get(new TypeLiteral @@ -168,33 +186,34 @@ public class PersisterApplication extends Application { final ManagedPipeline pipeline = managedPipelineFactory.create(metricEventHandlerFactory.create( - configuration.getMetricConfiguration(), threadNum, batchSize)); + configuration.getMetricConfiguration(), threadId, batchSize), threadId); - logger.debug("Instance of metric pipeline fully created"); + logger.debug("Instance of metric pipeline [{}] fully created", threadId); return pipeline; } public ManagedPipeline getAlarmStateHistoryPipeline( - PersisterConfig configuration, int threadNum, Injector injector) { + PersisterConfig configuration, + String threadId, + Injector injector) { - logger.debug("Creating alarm state history pipeline..."); + logger.debug("Creating alarm state history pipeline [{}]...", threadId); int batchSize = configuration.getAlarmHistoryConfiguration().getBatchSize(); - logger.debug("Batch size for each AlarmStateHistoryPipeline [" + batchSize + "]"); + logger.debug("Batch size for each AlarmStateHistoryPipeline [{}]", batchSize); - AlarmStateTransitionedEventHandlerFactory alarmHistoryEventHandlerFactory = - injector.getInstance(Key.get(new TypeLiteral>(){})); + AlarmStateTransitionedEventHandlerFactory alarmHistoryEventHandlerFactory = + injector.getInstance(AlarmStateTransitionedEventHandlerFactory.class); ManagedPipelineFactory alarmStateTransitionPipelineFactory = injector.getInstance(new Key>(){}); ManagedPipeline pipeline = alarmStateTransitionPipelineFactory.create(alarmHistoryEventHandlerFactory.create( - configuration.getAlarmHistoryConfiguration(), threadNum, batchSize)); + configuration.getAlarmHistoryConfiguration(), threadId, batchSize), threadId); - logger.debug("Instance of alarm state history pipeline fully created"); + logger.debug("Instance of alarm state history pipeline [{}] fully created", threadId); return pipeline; } diff --git a/java/src/main/java/monasca/persister/PersisterModule.java b/java/src/main/java/monasca/persister/PersisterModule.java index c1d23546..cf246212 100644 --- a/java/src/main/java/monasca/persister/PersisterModule.java +++ b/java/src/main/java/monasca/persister/PersisterModule.java @@ -30,16 +30,14 @@ import io.dropwizard.setup.Environment; import monasca.common.model.event.AlarmStateTransitionedEvent; import monasca.common.model.metric.MetricEnvelope; import monasca.persister.configuration.PersisterConfig; -import monasca.persister.consumer.Consumer; -import monasca.persister.consumer.ConsumerFactory; +import monasca.persister.consumer.ManagedConsumer; +import monasca.persister.consumer.ManagedConsumerFactory; import monasca.persister.consumer.KafkaChannel; import monasca.persister.consumer.KafkaChannelFactory; +import monasca.persister.consumer.KafkaConsumer; +import monasca.persister.consumer.KafkaConsumerFactory; import monasca.persister.consumer.KafkaConsumerRunnableBasic; import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory; -import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumer; -import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerFactory; -import monasca.persister.consumer.metric.KafkaMetricsConsumer; -import monasca.persister.consumer.metric.KafkaMetricsConsumerFactory; import monasca.persister.dbi.DBIProvider; import monasca.persister.pipeline.ManagedPipeline; import monasca.persister.pipeline.ManagedPipelineFactory; @@ -78,15 +76,15 @@ public class PersisterModule extends AbstractModule { install( new FactoryModuleBuilder().implement( - new TypeLiteral>() {}, - new TypeLiteral>() {}) - .build(new TypeLiteral>() {})); + MetricHandler.class, + MetricHandler.class) + .build(MetricHandlerFactory.class)); install( new FactoryModuleBuilder().implement( - new TypeLiteral>() {}, - new TypeLiteral>() {}) - .build(new TypeLiteral>() {})); + AlarmStateTransitionedEventHandler.class, + AlarmStateTransitionedEventHandler.class) + .build(AlarmStateTransitionedEventHandlerFactory.class)); install( new FactoryModuleBuilder().implement( @@ -102,14 +100,14 @@ public class PersisterModule extends AbstractModule { install( new FactoryModuleBuilder().implement( - new TypeLiteral>() {}, - new TypeLiteral>() {}) - .build(new TypeLiteral>() {})); + new TypeLiteral>() {}, + new TypeLiteral>() {}) + .build(new TypeLiteral>() {})); install( new FactoryModuleBuilder().implement( - new TypeLiteral>() {}, - new TypeLiteral>() {}) + new TypeLiteral>() {}, + new TypeLiteral>() {}) .build(new TypeLiteral>() {})); install( @@ -120,21 +118,21 @@ public class PersisterModule extends AbstractModule { install( new FactoryModuleBuilder().implement( - new TypeLiteral>() {}, - new TypeLiteral>() {}) - .build(new TypeLiteral>() {})); + new TypeLiteral>() {}, + new TypeLiteral>() {}) + .build(new TypeLiteral>() {})); install( new FactoryModuleBuilder().implement( - new TypeLiteral>() {}, - new TypeLiteral>() {}) - .build(new TypeLiteral>() {})); + new TypeLiteral>() {}, + new TypeLiteral>() {}) + .build(new TypeLiteral>() {})); install( new FactoryModuleBuilder().implement( - new TypeLiteral>() {}, - new TypeLiteral>() {}) - .build(new TypeLiteral>() {})); + new TypeLiteral>() {}, + new TypeLiteral>() {}) + .build(new TypeLiteral>() {})); install( new FactoryModuleBuilder().implement( diff --git a/java/src/main/java/monasca/persister/consumer/KafkaChannel.java b/java/src/main/java/monasca/persister/consumer/KafkaChannel.java index 17a2e2f1..1c208110 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaChannel.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaChannel.java @@ -43,16 +43,16 @@ public class KafkaChannel { private final String topic; private final ConsumerConnector consumerConnector; - private final int threadNum; + private final String threadId; @Inject public KafkaChannel( - @Assisted PersisterConfig configuration, + PersisterConfig configuration, @Assisted PipelineConfig pipelineConfig, - @Assisted int threadNum) { + @Assisted String threadId) { this.topic = pipelineConfig.getTopic(); - this.threadNum = threadNum; + this.threadId = threadId; Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfig(), pipelineConfig); consumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig(kafkaProperties)); @@ -90,7 +90,7 @@ public class KafkaChannel { properties.put("group.id", pipelineConfig.getGroupId()); properties.put("zookeeper.connect", kafkaConfig.getZookeeperConnect()); properties.put("consumer.id", - String.format("%s_%d", pipelineConfig.getConsumerId(), this.threadNum)); + String.format("%s_%s", pipelineConfig.getConsumerId(), this.threadId)); properties.put("socket.timeout.ms", kafkaConfig.getSocketTimeoutMs().toString()); properties.put("socket.receive.buffer.bytes", kafkaConfig.getSocketReceiveBufferBytes() .toString()); @@ -108,7 +108,7 @@ public class KafkaChannel { .toString()); properties.put("auto.offset.reset", kafkaConfig.getAutoOffsetReset()); properties.put("consumer.timeout.ms", kafkaConfig.getConsumerTimeoutMs().toString()); - properties.put("client.id", String.format("%s_%d", pipelineConfig.getClientId(), threadNum)); + properties.put("client.id", String.format("%s_%s", pipelineConfig.getClientId(), threadId)); properties.put("zookeeper.session.timeout.ms", kafkaConfig .getZookeeperSessionTimeoutMs().toString()); properties.put("zookeeper.connection.timeout.ms", kafkaConfig @@ -117,7 +117,7 @@ public class KafkaChannel { .put("zookeeper.sync.time.ms", kafkaConfig.getZookeeperSyncTimeMs().toString()); for (String key : properties.stringPropertyNames()) { - logger.info(KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key)); + logger.info("[{}]: " + KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key), threadId); } return properties; diff --git a/java/src/main/java/monasca/persister/consumer/KafkaChannelFactory.java b/java/src/main/java/monasca/persister/consumer/KafkaChannelFactory.java index c451a644..c9a9c172 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaChannelFactory.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaChannelFactory.java @@ -17,13 +17,11 @@ package monasca.persister.consumer; -import monasca.persister.configuration.PersisterConfig; import monasca.persister.configuration.PipelineConfig; public interface KafkaChannelFactory { KafkaChannel create( - PersisterConfig configuration, PipelineConfig pipelineConfig, - int threadNum); + String threadId); } diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java index 0f458e33..96cf0725 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java @@ -17,6 +17,9 @@ package monasca.persister.consumer; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,42 +27,65 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -public abstract class KafkaConsumer { +public class KafkaConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); private static final int WAIT_TIME = 10; private ExecutorService executorService; - private final KafkaChannel kafkaChannel; - private final int threadNum; - private KafkaConsumerRunnableBasic kafkaConsumerRunnableBasic; - public KafkaConsumer(KafkaChannel kafkaChannel, int threadNum) { - this.kafkaChannel = kafkaChannel; - this.threadNum = threadNum; + private final KafkaConsumerRunnableBasic kafkaConsumerRunnableBasic; + private final String threadId; + + @Inject + public KafkaConsumer( + @Assisted KafkaConsumerRunnableBasic kafkaConsumerRunnableBasic, + @Assisted String threadId) { + + this.kafkaConsumerRunnableBasic = kafkaConsumerRunnableBasic; + this.threadId = threadId; + } - protected abstract KafkaConsumerRunnableBasic createRunnable( - KafkaChannel kafkaChannel, - int threadNumber); - public void start() { + + logger.info("[{}]: start", this.threadId); + executorService = Executors.newFixedThreadPool(1); - kafkaConsumerRunnableBasic = createRunnable(kafkaChannel, this.threadNum); + executorService.submit(kafkaConsumerRunnableBasic); + } public void stop() { + + logger.info("[{}]: stop", this.threadId); + kafkaConsumerRunnableBasic.stop(); + if (executorService != null) { + + logger.info("[{}]: shutting down executor service", this.threadId); + executorService.shutdown(); + try { + + logger.info("[{}]: awaiting termination...", this.threadId); + if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) { - logger.warn("Did not shut down in {} seconds", WAIT_TIME); + + logger.warn("[{}]: did not shut down in {} seconds", this.threadId, WAIT_TIME); + } + + logger.info("[{}]: terminated", this.threadId); + } catch (InterruptedException e) { - logger.info("awaitTermination interrupted", e); + + logger.info("[{}]: awaitTermination interrupted", this.threadId, e); + } } } diff --git a/java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumerFactory.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerFactory.java similarity index 65% rename from java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumerFactory.java rename to java/src/main/java/monasca/persister/consumer/KafkaConsumerFactory.java index c18ab4a7..4fb02b7a 100644 --- a/java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumerFactory.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerFactory.java @@ -14,17 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package monasca.persister.consumer; -package monasca.persister.consumer.metric; -import monasca.persister.consumer.KafkaChannel; -import monasca.persister.pipeline.ManagedPipeline; +public interface KafkaConsumerFactory { -public interface KafkaMetricsConsumerFactory { + KafkaConsumer create( + KafkaConsumerRunnableBasic kafkaConsumerRunnableBasic, + String threadId); - KafkaMetricsConsumer create( - Class clazz, - KafkaChannel kafkaChannel, - int threadNum, - ManagedPipeline pipeline); } diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java index 21a9dc60..452846f3 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java @@ -20,8 +20,6 @@ package monasca.persister.consumer; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import com.fasterxml.jackson.databind.ObjectMapper; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,61 +29,68 @@ import monasca.persister.pipeline.ManagedPipeline; public class KafkaConsumerRunnableBasic implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnableBasic.class); + private final KafkaChannel kafkaChannel; - private final int threadNumber; + private final String threadId; private final ManagedPipeline pipeline; private volatile boolean stop = false; - private final ObjectMapper objectMapper; - - private final Class clazz; @Inject public KafkaConsumerRunnableBasic( - @Assisted Class clazz, - @Assisted ObjectMapper objectMapper, @Assisted KafkaChannel kafkaChannel, @Assisted ManagedPipeline pipeline, - @Assisted int threadNumber) { + @Assisted String threadId) { this.kafkaChannel = kafkaChannel; this.pipeline = pipeline; - this.threadNumber = threadNumber; - this.objectMapper = objectMapper; - this.clazz = clazz; + this.threadId = threadId; } protected void publishHeartbeat() { publishEvent(null); } - protected void handleMessage(String message) { + protected void handleMessage(String msg) { try { - final T o = objectMapper.readValue(message, this.clazz); - - publishEvent(o); + publishEvent(msg); } catch (Exception e) { - logger.error("Failed to deserialize JSON message and send to handler: " + message, e); + logger.error( + "[{}]: failed to deserialize JSON message and send to handler: {} ", + threadId, + msg, + e); } } private void markRead() { + + logger.debug("[{}]: marking read", this.threadId); + this.kafkaChannel.markRead(); } public void stop() { + + logger.info("[{}]: stop", this.threadId); + this.stop = true; + + this.pipeline.shutdown(); + } public void run() { + logger.info("[{}]: run", this.threadId); + final ConsumerIterator it = kafkaChannel.getKafkaStream().iterator(); - logger.debug("KafkaChannel {} has stream", this.threadNumber); + logger.debug("[{}]: KafkaChannel has stream iterator", this.threadId); while (!this.stop) { @@ -93,11 +98,11 @@ public class KafkaConsumerRunnableBasic implements Runnable { if (it.hasNext()) { - final String s = new String(it.next().message()); + final String msg = new String(it.next().message()); - logger.debug("Thread {}: {}", threadNumber, s); + logger.debug("[{}]: {}", this.threadId, msg); - handleMessage(s); + handleMessage(msg); } @@ -108,14 +113,14 @@ public class KafkaConsumerRunnableBasic implements Runnable { } } - logger.debug("Shutting down Thread: {}", threadNumber); + logger.info("[{}]: shutting down", this.threadId); this.kafkaChannel.stop(); } - protected void publishEvent(final T event) { + protected void publishEvent(final String msg) { - if (pipeline.publishEvent(event)) { + if (pipeline.publishEvent(msg)) { markRead(); diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasicFactory.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasicFactory.java index d7ff067a..2c4b0e2a 100644 --- a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasicFactory.java +++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasicFactory.java @@ -16,17 +16,13 @@ */ package monasca.persister.consumer; -import com.fasterxml.jackson.databind.ObjectMapper; - import monasca.persister.pipeline.ManagedPipeline; public interface KafkaConsumerRunnableBasicFactory { KafkaConsumerRunnableBasic create( - ObjectMapper objectMapper, - Class clazz, ManagedPipeline pipeline, KafkaChannel kafkaChannel, - int threadNumber); + String threadId); } diff --git a/java/src/main/java/monasca/persister/consumer/Consumer.java b/java/src/main/java/monasca/persister/consumer/ManagedConsumer.java similarity index 71% rename from java/src/main/java/monasca/persister/consumer/Consumer.java rename to java/src/main/java/monasca/persister/consumer/ManagedConsumer.java index f309b5ba..2a653ecc 100644 --- a/java/src/main/java/monasca/persister/consumer/Consumer.java +++ b/java/src/main/java/monasca/persister/consumer/ManagedConsumer.java @@ -17,8 +17,6 @@ package monasca.persister.consumer; -import monasca.persister.pipeline.ManagedPipeline; - import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; @@ -27,31 +25,36 @@ import io.dropwizard.lifecycle.Managed; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class Consumer implements Managed { +public class ManagedConsumer implements Managed { + + private static final Logger logger = LoggerFactory.getLogger(ManagedConsumer.class); - private static final Logger logger = LoggerFactory.getLogger(Consumer.class); private final KafkaConsumer consumer; - private final ManagedPipeline pipeline; + private final String threadId; @Inject - public Consumer( + public ManagedConsumer( @Assisted KafkaConsumer kafkaConsumer, - @Assisted ManagedPipeline pipeline) { + @Assisted String threadId) { this.consumer = kafkaConsumer; - this.pipeline = pipeline; + this.threadId = threadId; + } @Override public void start() throws Exception { - logger.debug("start"); - consumer.start(); + + logger.debug("[{}]: start", this.threadId); + + this.consumer.start(); } @Override public void stop() throws Exception { - logger.debug("stop"); - consumer.stop(); - pipeline.shutdown(); + + logger.debug("[{}]: stop", this.threadId); + + this.consumer.stop(); } } diff --git a/java/src/main/java/monasca/persister/consumer/ConsumerFactory.java b/java/src/main/java/monasca/persister/consumer/ManagedConsumerFactory.java similarity index 82% rename from java/src/main/java/monasca/persister/consumer/ConsumerFactory.java rename to java/src/main/java/monasca/persister/consumer/ManagedConsumerFactory.java index 7f9324f6..548e2ddf 100644 --- a/java/src/main/java/monasca/persister/consumer/ConsumerFactory.java +++ b/java/src/main/java/monasca/persister/consumer/ManagedConsumerFactory.java @@ -17,12 +17,10 @@ package monasca.persister.consumer; -import monasca.persister.pipeline.ManagedPipeline; +public interface ManagedConsumerFactory { -public interface ConsumerFactory { - - Consumer create( + ManagedConsumer create( KafkaConsumer kafkaConsumer, - ManagedPipeline pipeline); + String threadId); } diff --git a/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumer.java b/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumer.java deleted file mode 100644 index 9749c5ec..00000000 --- a/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumer.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monasca.persister.consumer.alarmstate; - -import monasca.persister.consumer.KafkaChannel; -import monasca.persister.consumer.KafkaConsumer; -import monasca.persister.consumer.KafkaConsumerRunnableBasic; -import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory; -import monasca.persister.pipeline.ManagedPipeline; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; - -public class KafkaAlarmStateTransitionConsumer extends KafkaConsumer { - - @Inject - private KafkaConsumerRunnableBasicFactory factory; - - private final ManagedPipeline pipeline; - - private final Class clazz; - - @Inject - public KafkaAlarmStateTransitionConsumer( - @Assisted Class clazz, - @Assisted KafkaChannel kafkaChannel, - @Assisted int threadNum, - @Assisted final ManagedPipeline pipeline) { - - super(kafkaChannel, threadNum); - - this.pipeline = pipeline; - this.clazz = clazz; - } - - @Override - protected KafkaConsumerRunnableBasic createRunnable( - KafkaChannel kafkaChannel, - int threadNumber) { - - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY); - objectMapper.enable(DeserializationFeature.UNWRAP_ROOT_VALUE); - - return factory.create(objectMapper, clazz, pipeline, kafkaChannel, threadNumber); - } -} diff --git a/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumerFactory.java b/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumerFactory.java deleted file mode 100644 index cee36f6f..00000000 --- a/java/src/main/java/monasca/persister/consumer/alarmstate/KafkaAlarmStateTransitionConsumerFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monasca.persister.consumer.alarmstate; - -import monasca.persister.consumer.KafkaChannel; -import monasca.persister.pipeline.ManagedPipeline; - -public interface KafkaAlarmStateTransitionConsumerFactory { - - KafkaAlarmStateTransitionConsumer create( - Class clazz, - KafkaChannel kafkaChannel, int threadNum, - final ManagedPipeline pipeline); -} diff --git a/java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumer.java b/java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumer.java deleted file mode 100644 index 87ee7d06..00000000 --- a/java/src/main/java/monasca/persister/consumer/metric/KafkaMetricsConsumer.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monasca.persister.consumer.metric; - -import monasca.persister.consumer.KafkaChannel; -import monasca.persister.consumer.KafkaConsumer; -import monasca.persister.consumer.KafkaConsumerRunnableBasic; -import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory; -import monasca.persister.pipeline.ManagedPipeline; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; - -public class KafkaMetricsConsumer extends KafkaConsumer { - - @Inject - private KafkaConsumerRunnableBasicFactory factory; - - private final ManagedPipeline pipeline; - - private final Class clazz; - - @Inject - public KafkaMetricsConsumer( - @Assisted Class clazz, - @Assisted KafkaChannel kafkaChannel, - @Assisted int threadNum, - @Assisted ManagedPipeline pipeline) { - - super(kafkaChannel, threadNum); - - this.pipeline = pipeline; - this.clazz = clazz; - } - - @Override - protected KafkaConsumerRunnableBasic createRunnable( - KafkaChannel kafkaChannel, - int threadNumber) { - - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY); - objectMapper.setPropertyNamingStrategy( - PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); - - return factory.create(objectMapper, clazz, pipeline, kafkaChannel, threadNumber); - } -} diff --git a/java/src/main/java/monasca/persister/pipeline/ManagedPipeline.java b/java/src/main/java/monasca/persister/pipeline/ManagedPipeline.java index d96fb2a2..53b64021 100644 --- a/java/src/main/java/monasca/persister/pipeline/ManagedPipeline.java +++ b/java/src/main/java/monasca/persister/pipeline/ManagedPipeline.java @@ -30,28 +30,34 @@ public class ManagedPipeline { private static final Logger logger = LoggerFactory.getLogger(ManagedPipeline.class); private final FlushableHandler handler; + private final String threadId; @Inject public ManagedPipeline( - @Assisted FlushableHandler handler) { + @Assisted FlushableHandler handler, + @Assisted String threadId) { this.handler = handler; + this.threadId = threadId; } public void shutdown() { + + logger.info("[{}]: shutdown", this.threadId); + handler.flush(); } - public boolean publishEvent(T holder) { + public boolean publishEvent(String msg) { try { - return this.handler.onEvent(holder); + return this.handler.onEvent(msg); } catch (Exception e) { - logger.error("Failed to handle event", e); + logger.error("[{}]: failed to handle msg: {}", msg, e); return false; diff --git a/java/src/main/java/monasca/persister/pipeline/ManagedPipelineFactory.java b/java/src/main/java/monasca/persister/pipeline/ManagedPipelineFactory.java index 366754ac..5a513b61 100644 --- a/java/src/main/java/monasca/persister/pipeline/ManagedPipelineFactory.java +++ b/java/src/main/java/monasca/persister/pipeline/ManagedPipelineFactory.java @@ -20,6 +20,8 @@ import monasca.persister.pipeline.event.FlushableHandler; public interface ManagedPipelineFactory { - ManagedPipeline create(FlushableHandler handler); + ManagedPipeline create( + FlushableHandler handler, + String threadId); } diff --git a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java index 5819ee53..bb9877cc 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java @@ -25,55 +25,75 @@ import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import com.codahale.metrics.Counter; +import com.fasterxml.jackson.databind.DeserializationFeature; import io.dropwizard.setup.Environment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AlarmStateTransitionedEventHandler extends - FlushableHandler { +public class AlarmStateTransitionedEventHandler extends + FlushableHandler { - private static final Logger logger = LoggerFactory - .getLogger(AlarmStateTransitionedEventHandler.class); + private static final Logger logger = + LoggerFactory.getLogger(AlarmStateTransitionedEventHandler.class); private final AlarmRepo alarmRepo; - private final int ordinal; private final Counter alarmStateTransitionCounter; @Inject public AlarmStateTransitionedEventHandler( AlarmRepo alarmRepo, - @Assisted PipelineConfig configuration, Environment environment, - @Assisted("ordinal") int ordinal, + @Assisted PipelineConfig configuration, + @Assisted("threadId") String threadId, @Assisted("batchSize") int batchSize) { - super(configuration, environment, ordinal, batchSize, - AlarmStateTransitionedEventHandler.class.getName()); + super(configuration, environment, threadId, batchSize); this.alarmRepo = alarmRepo; - this.ordinal = ordinal; - - final String handlerName = String.format("%s[%d]", AlarmStateTransitionedEventHandler.class.getName(), ordinal); this.alarmStateTransitionCounter = - environment.metrics().counter(handlerName + "." + "alarm-state-transitions-added-to-batch-counter"); + environment.metrics() + .counter(this.handlerName + "." + "alarm-state-transitions-added-to-batch-counter"); + } @Override - protected int process(T event) throws Exception { + protected int process(String msg) throws Exception { - logger.debug("Ordinal: {}: {}", this.ordinal, event); + AlarmStateTransitionedEvent alarmStateTransitionedEvent = + objectMapper.readValue(msg, AlarmStateTransitionedEvent.class); - alarmRepo.addToBatch((AlarmStateTransitionedEvent) event); + logger.debug("[{}]: [{}:{}]: {}", + this.threadId, + this.getBatchCount(), + this.getMsgCount(), + alarmStateTransitionedEvent); + + alarmRepo.addToBatch(alarmStateTransitionedEvent); + + this.alarmStateTransitionCounter.inc(); return 1; } + @Override + protected void initObjectMapper() { + + this.objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + + this.objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY); + + this.objectMapper.enable(DeserializationFeature.UNWRAP_ROOT_VALUE); + + } + @Override protected void flushRepository() { - alarmRepo.flush(); + + alarmRepo.flush(this.threadId); + } } diff --git a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandlerFactory.java b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandlerFactory.java index ca4bd29c..9ff8c0fd 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandlerFactory.java +++ b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandlerFactory.java @@ -21,10 +21,10 @@ import monasca.persister.configuration.PipelineConfig; import com.google.inject.assistedinject.Assisted; -public interface AlarmStateTransitionedEventHandlerFactory { +public interface AlarmStateTransitionedEventHandlerFactory { - AlarmStateTransitionedEventHandler create( + AlarmStateTransitionedEventHandler create( PipelineConfig configuration, - @Assisted("ordinal") int ordinal, + @Assisted("threadId") String threadId, @Assisted("batchSize") int batchSize); } diff --git a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java index fbc90d90..1414d774 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java @@ -21,6 +21,7 @@ import monasca.persister.configuration.PipelineConfig; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; +import com.fasterxml.jackson.databind.ObjectMapper; import io.dropwizard.setup.Environment; @@ -31,61 +32,75 @@ public abstract class FlushableHandler { private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class); - private final int ordinal; private final int batchSize; - private final String handlerName; - private long millisSinceLastFlush = System.currentTimeMillis(); + private long flushTimeMillis = System.currentTimeMillis(); private final long millisBetweenFlushes; private final int secondsBetweenFlushes; - private int eventCount = 0; - - private final Environment environment; + private int msgCount = 0; + private long batchCount = 0; private final Meter processedMeter; private final Meter commitMeter; private final Timer commitTimer; + protected final String threadId; + + protected ObjectMapper objectMapper = new ObjectMapper(); + + protected final String handlerName; + protected FlushableHandler( PipelineConfig configuration, Environment environment, - int ordinal, - int batchSize, - String baseName) { + String threadId, + int batchSize) { + + this.threadId = threadId; + + this.handlerName = + String.format( + "%s[%s]", + this.getClass().getName(), + threadId); - this.handlerName = String.format("%s[%d]", baseName, ordinal); - this.environment = environment; this.processedMeter = - this.environment.metrics() + environment.metrics() .meter(handlerName + "." + "events-processed-processedMeter"); + this.commitMeter = - this.environment.metrics().meter(handlerName + "." + "commits-executed-processedMeter"); + environment.metrics().meter(handlerName + "." + "commits-executed-processedMeter"); + this.commitTimer = - this.environment.metrics().timer(handlerName + "." + "total-commit-and-flush-timer"); + environment.metrics().timer(handlerName + "." + "total-commit-and-flush-timer"); this.secondsBetweenFlushes = configuration.getMaxBatchTime(); + this.millisBetweenFlushes = secondsBetweenFlushes * 1000; - this.ordinal = ordinal; this.batchSize = batchSize; + + initObjectMapper(); + } + protected abstract void initObjectMapper(); + protected abstract void flushRepository(); - protected abstract int process(T metricEvent) throws Exception; + protected abstract int process(String msg) throws Exception; - public boolean onEvent(final T event) throws Exception { + public boolean onEvent(final String msg) throws Exception { - if (event == null) { + if (msg == null) { - long delta = millisSinceLastFlush + millisBetweenFlushes; - logger.debug("{} received heartbeat message, flush every {} seconds.", this.handlerName, + logger.debug("[{}]: got heartbeat message, flush every {} seconds.", this.threadId, this.secondsBetweenFlushes); - if (delta < System.currentTimeMillis()) { + if (this.flushTimeMillis < System.currentTimeMillis()) { - logger.debug("{}: {} seconds since last flush. Flushing to repository now.", - this.handlerName, delta); + logger.debug("[{}]: {} millis past flush time. flushing to repository now.", + this.threadId, (System.currentTimeMillis() - this.flushTimeMillis)); flush(); @@ -93,35 +108,67 @@ public abstract class FlushableHandler { } else { - logger.debug("{}: {} seconds since last flush. No need to flush at this time.", - this.handlerName, delta); + logger.debug("[{}]: {} millis to next flush time. no need to flush at this time.", + this.threadId, this.flushTimeMillis - System.currentTimeMillis()); + return false; } } - processedMeter.mark(); + this.processedMeter.mark(); - eventCount += process(event); + this.msgCount += process(msg); + + if (this.msgCount >= this.batchSize) { + + logger.debug("[{}]: batch sized {} attained", this.threadId, this.batchSize); - if (eventCount >= batchSize) { flush(); + return true; + } else { + return false; + } } public void flush() { - if (eventCount == 0) { - logger.debug("{}: Nothing to flush", this.handlerName); + + logger.debug("[{}]: flush", this.threadId); + + if (this.msgCount == 0) { + + logger.debug("[{}]: nothing to flush", this.threadId); } - Timer.Context context = commitTimer.time(); + + Timer.Context context = this.commitTimer.time(); + flushRepository(); + context.stop(); - commitMeter.mark(); - millisSinceLastFlush = System.currentTimeMillis(); - logger.debug("{}: Flushed {} events", this.handlerName, this.eventCount); - eventCount = 0; + + this.commitMeter.mark(); + + this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes; + + logger.debug("[{}]: flushed {} msg", this.threadId, this.msgCount); + + this.msgCount = 0; + this.batchCount++; + + } + + public long getBatchCount() { + + return this.batchCount; + + } + + public int getMsgCount() { + + return this.msgCount; } } diff --git a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java index 46e3b1fe..87615624 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java @@ -21,6 +21,8 @@ import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import com.codahale.metrics.Counter; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.PropertyNamingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,47 +32,43 @@ import monasca.common.model.metric.MetricEnvelope; import monasca.persister.configuration.PipelineConfig; import monasca.persister.repository.MetricRepo; -public class MetricHandler extends FlushableHandler { +public class MetricHandler extends FlushableHandler { - private static final Logger logger = LoggerFactory - .getLogger(MetricHandler.class); + private static final Logger logger = + LoggerFactory.getLogger(MetricHandler.class); private final MetricRepo metricRepo; - private final int ordinal; - private final Counter metricCounter; @Inject public MetricHandler( MetricRepo metricRepo, - @Assisted PipelineConfig configuration, Environment environment, - @Assisted("ordinal") int ordinal, + @Assisted PipelineConfig configuration, + @Assisted("threadId") String threadId, @Assisted("batchSize") int batchSize) { - super(configuration, - environment, - ordinal, - batchSize, - MetricHandler.class.getName()); + super(configuration, environment, threadId, batchSize); this.metricRepo = metricRepo; - this.ordinal = ordinal; - - final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal); this.metricCounter = - environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter"); + environment.metrics() + .counter(this.handlerName + "." + "metrics-added-to-batch-counter"); } @Override - public int process(T metricEnvelopes) throws Exception { + public int process(String msg) throws Exception { + + MetricEnvelope[] metricEnvelopesArry = + objectMapper.readValue(msg, MetricEnvelope[].class); - MetricEnvelope[] metricEnvelopesArry = (MetricEnvelope[]) metricEnvelopes; for (final MetricEnvelope metricEnvelope : metricEnvelopesArry) { + processEnvelope(metricEnvelope); + } return metricEnvelopesArry.length; @@ -78,17 +76,34 @@ public class MetricHandler extends FlushableHandler { private void processEnvelope(MetricEnvelope metricEnvelope) { - logger.debug("Ordinal: {}: {}", this.ordinal, metricEnvelope); + logger.debug("[{}]: [{}:{}]: {}", + this.threadId, + this.getBatchCount(), + this.getMsgCount(), + metricEnvelope); this.metricRepo.addToBatch(metricEnvelope); - metricCounter.inc(); + this.metricCounter.inc(); + + } + + @Override + protected void initObjectMapper() { + + this.objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + + this.objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY); + + this.objectMapper.setPropertyNamingStrategy( + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); } @Override public void flushRepository() { - metricRepo.flush(); + + metricRepo.flush(this.threadId); } } diff --git a/java/src/main/java/monasca/persister/pipeline/event/MetricHandlerFactory.java b/java/src/main/java/monasca/persister/pipeline/event/MetricHandlerFactory.java index 43e60928..d47692f0 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/MetricHandlerFactory.java +++ b/java/src/main/java/monasca/persister/pipeline/event/MetricHandlerFactory.java @@ -21,10 +21,10 @@ import monasca.persister.configuration.PipelineConfig; import com.google.inject.assistedinject.Assisted; -public interface MetricHandlerFactory { +public interface MetricHandlerFactory{ - MetricHandler create( + MetricHandler create( PipelineConfig pipelineConfig, - @Assisted("ordinal") int ordinal, + @Assisted("threadId") String threadId, @Assisted("batchSize") int batchSize); } diff --git a/java/src/main/java/monasca/persister/repository/AlarmRepo.java b/java/src/main/java/monasca/persister/repository/AlarmRepo.java index 814c2441..9d3d5de6 100644 --- a/java/src/main/java/monasca/persister/repository/AlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/AlarmRepo.java @@ -23,5 +23,5 @@ public interface AlarmRepo { void addToBatch(final AlarmStateTransitionedEvent message); - void flush(); + void flush(String id); } diff --git a/java/src/main/java/monasca/persister/repository/MetricRepo.java b/java/src/main/java/monasca/persister/repository/MetricRepo.java index c14544a5..1c7e78a7 100644 --- a/java/src/main/java/monasca/persister/repository/MetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/MetricRepo.java @@ -23,5 +23,5 @@ public interface MetricRepo { void addToBatch(final MetricEnvelope metricEnvelope); - void flush(); + void flush(String id); } diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java index d7d21265..11efd6b8 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java @@ -50,7 +50,7 @@ public abstract class InfluxAlarmRepo implements AlarmRepo { MetricRegistry.name(getClass(), "alarm_state_history-meter")); } - protected abstract void write () throws Exception; + protected abstract void write (String id) throws Exception; @Override public void addToBatch(AlarmStateTransitionedEvent alarmStateTransitionedEvent) { @@ -61,25 +61,26 @@ public abstract class InfluxAlarmRepo implements AlarmRepo { } @Override - public void flush() { + public void flush(String id) { try { + if (this.alarmStateTransitionedEventList.isEmpty()) { - logger.debug("There are no alarm state transition events to be written to the influxDB"); - logger.debug("Returning from flush"); + logger.debug("[{}]: no alarm state transition msg to be written to the influxDB", id); + logger.debug("[{}]: returning from flush", id); return; } long startTime = System.currentTimeMillis(); Timer.Context context = flushTimer.time(); - write(); + write(id); context.stop(); long endTime = System.currentTimeMillis(); - logger.debug("Commiting batch took {} seconds", (endTime - startTime) / 1000); + logger.debug("[{}]: flushing batch took {} seconds", id, (endTime - startTime) / 1000); } catch (Exception e) { - logger.error("Failed to write alarm state history to database", e); + logger.error("[{}]: failed to write alarm state history to database", id, e); } this.alarmStateTransitionedEventList.clear(); diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxMetricRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxMetricRepo.java index 578b3a2d..a2b3348d 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxMetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxMetricRepo.java @@ -40,7 +40,7 @@ public abstract class InfluxMetricRepo implements MetricRepo { public final com.codahale.metrics.Timer flushTimer; public final Meter measurementMeter; - protected abstract void write() throws Exception; + protected abstract void write(String id) throws Exception; public InfluxMetricRepo(final Environment env) { @@ -72,22 +72,31 @@ public abstract class InfluxMetricRepo implements MetricRepo { @Override - public void flush() { + public void flush(String id) { try { + + if (this.measurementBuffer.isEmpty()) { + logger.debug("[{}]: no metric msg to be written to the influxDB", id); + logger.debug("[{}]: returning from flush", id); + return; + } + final long startTime = System.currentTimeMillis(); final Timer.Context context = flushTimer.time(); - write(); + write(id); final long endTime = System.currentTimeMillis(); context.stop(); - logger.debug("Writing measurements, definitions, and dimensions to InfluxDB took {} seconds", - (endTime - startTime) / 1000); + logger.debug("[{}]: flushing batch took {} seconds", + id, (endTime - startTime) / 1000); } catch (Exception e) { - logger.error("Failed to write measurements to InfluxDB", e); + + logger.error("[{}]: failed to write measurements to InfluxDB", id, e); + } clearBuffers(); diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java index ef857193..59adbe8f 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java @@ -28,8 +28,6 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.LinkedList; @@ -40,8 +38,6 @@ import io.dropwizard.setup.Environment; public class InfluxV9AlarmRepo extends InfluxAlarmRepo { - private static final Logger logger = LoggerFactory.getLogger(InfluxV9AlarmRepo.class); - private final InfluxV9RepoWriter influxV9RepoWriter; private final ObjectMapper objectMapper = new ObjectMapper(); @@ -60,9 +56,9 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo { } @Override - protected void write() throws Exception { + protected void write(String id) throws Exception { - this.influxV9RepoWriter.write(getInfluxPointArry()); + this.influxV9RepoWriter.write(getInfluxPointArry(), id); } diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java index ea413d00..937e78bd 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java @@ -41,9 +41,9 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo { } @Override - protected void write() throws Exception { + protected void write(String id) throws Exception { - this.influxV9RepoWriter.write(getInfluxPointArry()); + this.influxV9RepoWriter.write(getInfluxPointArry(), id); } @@ -77,9 +77,7 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo { influxPointList.add(influxPoint); } - } - } return influxPointList.toArray(new InfluxPoint[influxPointList.size()]); @@ -89,11 +87,17 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo { private Map buildValueMap(Measurement measurement) { Map valueMap = new HashMap<>(); + valueMap.put("value", measurement.getValue()); + String valueMetaJSONString = measurement.getValueMetaJSONString(); + if (valueMetaJSONString != null) { + valueMap.put("value_meta", valueMetaJSONString); + } + return valueMap; } @@ -111,10 +115,10 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo { } tagMap.put("_tenant_id", definition.getTenantId()); + tagMap.put("_region", definition.getRegion()); return tagMap; } - } diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java index 6b8288bc..48e2496f 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java @@ -123,7 +123,7 @@ public class InfluxV9RepoWriter { } } - protected void write(final InfluxPoint[] influxPointArry) throws Exception { + protected void write(final InfluxPoint[] influxPointArry, String id) throws Exception { HttpPost request = new HttpPost(this.influxUrl); @@ -133,12 +133,14 @@ public class InfluxV9RepoWriter { InfluxWrite influxWrite = new InfluxWrite(this.influxName, this.influxRetentionPolicy, influxPointArry, - new HashMap()); + new HashMap()); String json = this.objectMapper.writeValueAsString(influxWrite); if (this.gzip) { + logger.debug("[{}]: gzip set to true. sending gzip msg", id); + HttpEntity requestEntity = EntityBuilder @@ -155,6 +157,8 @@ public class InfluxV9RepoWriter { } else { + logger.debug("[{}]: gzip set to false. sending non-gzip msg", id); + StringEntity stringEntity = new StringEntity(json, "UTF-8"); request.setEntity(stringEntity); @@ -163,8 +167,8 @@ public class InfluxV9RepoWriter { try { - logger.debug("Writing {} points to influxdb database {} at {}", influxPointArry.length, - this.influxName, this.influxUrl); + logger.debug("[{}]: sending {} points to influxdb database {} at {}", id, + influxPointArry.length, this.influxName, this.influxUrl); HttpResponse response = this.httpClient.execute(request); @@ -173,17 +177,20 @@ public class InfluxV9RepoWriter { if (rc != HttpStatus.SC_OK) { HttpEntity responseEntity = response.getEntity(); + String responseString = EntityUtils.toString(responseEntity, "UTF-8"); - logger.error("Failed to write data to influx database {} at {}: {}", this.influxName, - this.influxUrl, String.valueOf(rc)); - logger.error("Http response: {}", responseString); + + logger.error("[{}]: failed to send data to influxdb database {} at {}: {}", id, + this.influxName, this.influxUrl, String.valueOf(rc)); + + logger.error("[{}]: http response: {}", id, responseString); throw new Exception(rc + ":" + responseString); } logger - .debug("Successfully wrote {} points to influx database {} at {}", influxPointArry.length, - this.influxName, this.influxUrl); + .debug("[{}]: successfully sent {} points to influxdb database {} at {}", id, + influxPointArry.length, this.influxName, this.influxUrl); } finally { diff --git a/java/src/main/java/monasca/persister/repository/influxdb/MeasurementBuffer.java b/java/src/main/java/monasca/persister/repository/influxdb/MeasurementBuffer.java index 5b80125e..978d87af 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/MeasurementBuffer.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/MeasurementBuffer.java @@ -55,6 +55,12 @@ public class MeasurementBuffer { } + public boolean isEmpty() { + + return this.measurementMap.isEmpty(); + + } + private Map> initDimensionsMap(Definition definition, Dimensions dimensions) { diff --git a/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java index e2c25827..84ff3c87 100644 --- a/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java @@ -73,7 +73,7 @@ public class VerticaAlarmRepo extends VerticaRepo implements AlarmRepo { .bind(6, timeStamp); } - public void flush() { + public void flush(String id) { try { commitBatch(); } catch (Exception e) { diff --git a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java index 79a7a9f2..1269b92a 100644 --- a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java @@ -406,7 +406,7 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo { } @Override - public void flush() { + public void flush(String id) { try { long startTime = System.currentTimeMillis(); Timer.Context context = flushTimer.time(); diff --git a/java/src/test/java/monasca/persister/MonPersisterConsumerTest.java b/java/src/test/java/monasca/persister/MonPersisterConsumerTest.java index 14a83c77..6646734c 100644 --- a/java/src/test/java/monasca/persister/MonPersisterConsumerTest.java +++ b/java/src/test/java/monasca/persister/MonPersisterConsumerTest.java @@ -18,8 +18,8 @@ package monasca.persister; import monasca.common.model.metric.MetricEnvelope; -import monasca.persister.consumer.Consumer; -import monasca.persister.consumer.metric.KafkaMetricsConsumer; +import monasca.persister.consumer.ManagedConsumer; +import monasca.persister.consumer.KafkaConsumer; import monasca.persister.pipeline.ManagedPipeline; import monasca.persister.pipeline.event.MetricHandler; @@ -32,10 +32,10 @@ import org.mockito.MockitoAnnotations; public class MonPersisterConsumerTest { @Mock - private KafkaMetricsConsumer kafkaConsumer; + private KafkaConsumer kafkaConsumer; @Mock - private Consumer monConsumer; + private ManagedConsumer monManagedConsumer; private MetricHandler metricHandler; @@ -44,14 +44,14 @@ public class MonPersisterConsumerTest { @Before public void initMocks() { metricHandler = Mockito.mock(MetricHandler.class); - metricPipeline = Mockito.spy(new ManagedPipeline(metricHandler)); + metricPipeline = Mockito.spy(new ManagedPipeline(metricHandler, "metric-1")); MockitoAnnotations.initMocks(this); } @Test public void testKafkaConsumerLifecycle() throws Exception { - monConsumer.start(); - monConsumer.stop(); + monManagedConsumer.start(); + monManagedConsumer.stop(); metricPipeline.shutdown(); Mockito.verify(metricHandler).flush(); }