Cleanup dependency injection

Change-Id: I2fef7d2059ed1e86782016a4e11d7273e29cb3a0
This commit is contained in:
Deklan Dieterly 2015-04-16 08:06:08 -06:00
parent 1fc3589d8a
commit 4f138c3d44
31 changed files with 416 additions and 541 deletions

@ -17,35 +17,34 @@
package monasca.persister;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.consumer.alarmstate.AlarmStateTransitionConsumer;
import monasca.persister.consumer.alarmstate.AlarmStateTransitionConsumerFactory;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumer;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerFactory;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.consumer.KafkaChannelFactory;
import monasca.persister.consumer.metric.KafkaMetricsConsumer;
import monasca.persister.consumer.metric.KafkaMetricsConsumerFactory;
import monasca.persister.consumer.metric.MetricsConsumer;
import monasca.persister.consumer.metric.MetricsConsumerFactory;
import monasca.persister.healthcheck.SimpleHealthCheck;
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
import monasca.persister.pipeline.AlarmStateTransitionPipelineFactory;
import monasca.persister.pipeline.MetricPipeline;
import monasca.persister.pipeline.MetricPipelineFactory;
import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory;
import monasca.persister.pipeline.event.MetricHandlerFactory;
import monasca.persister.resource.Resource;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.dropwizard.Application;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.consumer.Consumer;
import monasca.persister.consumer.ConsumerFactory;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.consumer.KafkaChannelFactory;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumer;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerFactory;
import monasca.persister.consumer.metric.KafkaMetricsConsumer;
import monasca.persister.consumer.metric.KafkaMetricsConsumerFactory;
import monasca.persister.healthcheck.SimpleHealthCheck;
import monasca.persister.pipeline.ManagedPipeline;
import monasca.persister.pipeline.ManagedPipelineFactory;
import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory;
import monasca.persister.pipeline.event.MetricHandlerFactory;
import monasca.persister.resource.Resource;
public class PersisterApplication extends Application<PersisterConfig> {
private static final Logger logger = LoggerFactory.getLogger(PersisterApplication.class);
@ -97,40 +96,62 @@ public class PersisterApplication extends Application<PersisterConfig> {
environment.healthChecks().register("test-health-check", new SimpleHealthCheck());
final KafkaChannelFactory kafkaChannelFactory = injector.getInstance(KafkaChannelFactory.class);
final MetricsConsumerFactory metricsConsumerFactory =
injector.getInstance(MetricsConsumerFactory.class);
final KafkaMetricsConsumerFactory kafkaMetricsConsumerFactory =
injector.getInstance(KafkaMetricsConsumerFactory.class);
final ConsumerFactory<MetricEnvelope[]> metricsConsumerFactory =
injector.getInstance(Key.get(new TypeLiteral<ConsumerFactory<MetricEnvelope[]>>() {
}));
// Metrics
final KafkaMetricsConsumerFactory<MetricEnvelope[]> kafkaMetricsConsumerFactory =
injector.getInstance(Key.get(new TypeLiteral<KafkaMetricsConsumerFactory<MetricEnvelope[]>>(){}));
for (int i = 0; i < configuration.getMetricConfiguration().getNumThreads(); i++) {
final KafkaChannel kafkaChannel =
kafkaChannelFactory.create(configuration, configuration.getMetricConfiguration(), i);
final MetricPipeline metricPipeline = getMetricPipeline(configuration, i, injector);
final KafkaMetricsConsumer kafkaMetricsConsumer =
kafkaMetricsConsumerFactory.create(kafkaChannel, i, metricPipeline);
MetricsConsumer metricsConsumer =
final ManagedPipeline<MetricEnvelope[]> metricPipeline = getMetricPipeline(
configuration, i, injector);
final KafkaMetricsConsumer<MetricEnvelope[]> kafkaMetricsConsumer =
kafkaMetricsConsumerFactory.create(MetricEnvelope[].class, kafkaChannel, i, metricPipeline);
Consumer<MetricEnvelope[]> metricsConsumer =
metricsConsumerFactory.create(kafkaMetricsConsumer, metricPipeline);
environment.lifecycle().manage(metricsConsumer);
}
final AlarmStateTransitionConsumerFactory alarmStateTransitionsConsumerFactory =
injector.getInstance(AlarmStateTransitionConsumerFactory.class);
final KafkaAlarmStateTransitionConsumerFactory kafkaAlarmStateTransitionConsumerFactory =
injector.getInstance(KafkaAlarmStateTransitionConsumerFactory.class);
// AlarmStateTransitions
final ConsumerFactory<AlarmStateTransitionedEvent>
alarmStateTransitionsConsumerFactory = injector.getInstance(Key.get(new TypeLiteral
<ConsumerFactory<AlarmStateTransitionedEvent>>(){}));
final KafkaAlarmStateTransitionConsumerFactory<AlarmStateTransitionedEvent>
kafkaAlarmStateTransitionConsumerFactory =
injector.getInstance(Key.get(new TypeLiteral<KafkaAlarmStateTransitionConsumerFactory<AlarmStateTransitionedEvent
>>() {}));
for (int i = 0; i < configuration.getAlarmHistoryConfiguration().getNumThreads(); i++) {
final KafkaChannel kafkaChannel =
kafkaChannelFactory
.create(configuration, configuration.getAlarmHistoryConfiguration(), i);
final AlarmStateTransitionPipeline pipeline =
final ManagedPipeline<AlarmStateTransitionedEvent> pipeline =
getAlarmStateHistoryPipeline(configuration, i, injector);
final KafkaAlarmStateTransitionConsumer kafkaAlarmStateTransitionConsumer =
kafkaAlarmStateTransitionConsumerFactory.create(kafkaChannel, i, pipeline);
AlarmStateTransitionConsumer alarmStateTransitionConsumer =
final KafkaAlarmStateTransitionConsumer<AlarmStateTransitionedEvent> kafkaAlarmStateTransitionConsumer =
kafkaAlarmStateTransitionConsumerFactory.create(AlarmStateTransitionedEvent.class, kafkaChannel, i, pipeline);
Consumer<AlarmStateTransitionedEvent> alarmStateTransitionConsumer =
alarmStateTransitionsConsumerFactory.create(kafkaAlarmStateTransitionConsumer, pipeline);
environment.lifecycle().manage(alarmStateTransitionConsumer);
}
}
private MetricPipeline getMetricPipeline(PersisterConfig configuration, int threadNum,
private ManagedPipeline<MetricEnvelope[]> getMetricPipeline(PersisterConfig configuration, int threadNum,
Injector injector) {
logger.debug("Creating metric pipeline...");
@ -138,11 +159,15 @@ public class PersisterApplication extends Application<PersisterConfig> {
final int batchSize = configuration.getMetricConfiguration().getBatchSize();
logger.debug("Batch size for metric pipeline [" + batchSize + "]");
MetricHandlerFactory metricEventHandlerFactory =
injector.getInstance(MetricHandlerFactory.class);
MetricPipelineFactory metricPipelineFactory = injector.getInstance(MetricPipelineFactory.class);
final MetricPipeline pipeline =
metricPipelineFactory.create(metricEventHandlerFactory.create(
MetricHandlerFactory<MetricEnvelope[]> metricEventHandlerFactory =
injector.getInstance(Key.get(new TypeLiteral<MetricHandlerFactory<MetricEnvelope[]>>(){}));
ManagedPipelineFactory<MetricEnvelope[]>
managedPipelineFactory = injector.getInstance(Key.get(new TypeLiteral
<ManagedPipelineFactory<MetricEnvelope[]>>(){}));
final ManagedPipeline<MetricEnvelope[]> pipeline =
managedPipelineFactory.create(metricEventHandlerFactory.create(
configuration.getMetricConfiguration(), threadNum, batchSize));
logger.debug("Instance of metric pipeline fully created");
@ -150,20 +175,22 @@ public class PersisterApplication extends Application<PersisterConfig> {
return pipeline;
}
public AlarmStateTransitionPipeline getAlarmStateHistoryPipeline(
public ManagedPipeline<AlarmStateTransitionedEvent> getAlarmStateHistoryPipeline(
PersisterConfig configuration, int threadNum, Injector injector) {
logger.debug("Creating alarm state history pipeline...");
int batchSize = configuration.getAlarmHistoryConfiguration().getBatchSize();
logger.debug("Batch size for each AlarmStateHistoryPipeline [" + batchSize + "]");
AlarmStateTransitionedEventHandlerFactory alarmHistoryEventHandlerFactory =
injector.getInstance(AlarmStateTransitionedEventHandlerFactory.class);
AlarmStateTransitionPipelineFactory alarmStateTransitionPipelineFactory =
injector.getInstance(AlarmStateTransitionPipelineFactory.class);
AlarmStateTransitionedEventHandlerFactory<AlarmStateTransitionedEvent> alarmHistoryEventHandlerFactory =
injector.getInstance(Key.get(new TypeLiteral<AlarmStateTransitionedEventHandlerFactory
<AlarmStateTransitionedEvent>>(){}));
AlarmStateTransitionPipeline pipeline =
ManagedPipelineFactory<AlarmStateTransitionedEvent> alarmStateTransitionPipelineFactory =
injector.getInstance(new Key<ManagedPipelineFactory<AlarmStateTransitionedEvent>>(){});
ManagedPipeline<AlarmStateTransitionedEvent> pipeline =
alarmStateTransitionPipelineFactory.create(alarmHistoryEventHandlerFactory.create(
configuration.getAlarmHistoryConfiguration(), threadNum, batchSize));

@ -19,6 +19,7 @@ package monasca.persister;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import org.skife.jdbi.v2.DBI;
@ -26,26 +27,22 @@ import org.skife.jdbi.v2.DBI;
import javax.inject.Singleton;
import io.dropwizard.setup.Environment;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.consumer.alarmstate.AlarmStateTransitionConsumer;
import monasca.persister.consumer.alarmstate.AlarmStateTransitionConsumerFactory;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumer;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerFactory;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerRunnableBasic;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerRunnableBasicFactory;
import monasca.persister.consumer.Consumer;
import monasca.persister.consumer.ConsumerFactory;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.consumer.KafkaChannelFactory;
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumer;
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerFactory;
import monasca.persister.consumer.metric.KafkaMetricsConsumer;
import monasca.persister.consumer.metric.KafkaMetricsConsumerFactory;
import monasca.persister.consumer.metric.KafkaMetricsConsumerRunnableBasic;
import monasca.persister.consumer.metric.KafkaMetricsConsumerRunnableBasicFactory;
import monasca.persister.consumer.metric.MetricsConsumer;
import monasca.persister.consumer.metric.MetricsConsumerFactory;
import monasca.persister.dbi.DBIProvider;
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
import monasca.persister.pipeline.AlarmStateTransitionPipelineFactory;
import monasca.persister.pipeline.MetricPipeline;
import monasca.persister.pipeline.MetricPipelineFactory;
import monasca.persister.pipeline.ManagedPipelineFactory;
import monasca.persister.pipeline.ManagedPipeline;
import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandler;
import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory;
import monasca.persister.pipeline.event.MetricHandler;
@ -82,53 +79,69 @@ public class PersisterModule extends AbstractModule {
bind(PersisterConfig.class).toInstance(config);
bind(Environment.class).toInstance(env);
install(new FactoryModuleBuilder().implement(MetricHandler.class, MetricHandler.class).build(
MetricHandlerFactory.class));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<MetricHandler<MetricEnvelope[]>>() {},
new TypeLiteral<MetricHandler<MetricEnvelope[]>>() {})
.build(new TypeLiteral<MetricHandlerFactory<MetricEnvelope[]>>() {}));
install(new FactoryModuleBuilder().implement(AlarmStateTransitionedEventHandler.class,
AlarmStateTransitionedEventHandler.class).build(
AlarmStateTransitionedEventHandlerFactory.class));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<AlarmStateTransitionedEventHandler<AlarmStateTransitionedEvent>>() {},
new TypeLiteral<AlarmStateTransitionedEventHandler<AlarmStateTransitionedEvent>>() {})
.build(new TypeLiteral<AlarmStateTransitionedEventHandlerFactory<AlarmStateTransitionedEvent>>() {}));
install(new FactoryModuleBuilder().implement(KafkaMetricsConsumerRunnableBasic.class,
KafkaMetricsConsumerRunnableBasic.class).build(
KafkaMetricsConsumerRunnableBasicFactory.class));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<KafkaConsumerRunnableBasic<MetricEnvelope[]>>() {},
new TypeLiteral<KafkaConsumerRunnableBasic<MetricEnvelope[]>>() {})
.build(new TypeLiteral<KafkaConsumerRunnableBasicFactory<MetricEnvelope[]>>() {}));
install(new FactoryModuleBuilder().implement(
KafkaAlarmStateTransitionConsumerRunnableBasic.class,
KafkaAlarmStateTransitionConsumerRunnableBasic.class).build(
KafkaAlarmStateTransitionConsumerRunnableBasicFactory.class));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<KafkaConsumerRunnableBasic<AlarmStateTransitionedEvent>>() {},
new TypeLiteral<KafkaConsumerRunnableBasic<AlarmStateTransitionedEvent>>() {})
.build(new TypeLiteral<KafkaConsumerRunnableBasicFactory<AlarmStateTransitionedEvent>>() {}));
install(new FactoryModuleBuilder().implement(
KafkaMetricsConsumer.class,
KafkaMetricsConsumer.class).build(
KafkaMetricsConsumerFactory.class));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<KafkaMetricsConsumer<MetricEnvelope[]>>() {},
new TypeLiteral<KafkaMetricsConsumer<MetricEnvelope[]>>() {})
.build(new TypeLiteral<KafkaMetricsConsumerFactory<MetricEnvelope[]>>() {}));
install(new FactoryModuleBuilder().implement(
MetricPipeline.class,
MetricPipeline.class).build(
MetricPipelineFactory.class));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<ManagedPipeline<MetricEnvelope[]>>() {},
new TypeLiteral<ManagedPipeline<MetricEnvelope[]>>() {})
.build(new TypeLiteral<ManagedPipelineFactory<MetricEnvelope[]>>() {}));
install(new FactoryModuleBuilder().implement(
AlarmStateTransitionPipeline.class,
AlarmStateTransitionPipeline.class).build(
AlarmStateTransitionPipelineFactory.class));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<ManagedPipeline<AlarmStateTransitionedEvent>>() {},
new TypeLiteral<ManagedPipeline<AlarmStateTransitionedEvent>>() {})
.build(new TypeLiteral<ManagedPipelineFactory<AlarmStateTransitionedEvent>>() {}));
install(new FactoryModuleBuilder().implement(
AlarmStateTransitionConsumer.class,
AlarmStateTransitionConsumer.class).build(
AlarmStateTransitionConsumerFactory.class));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<Consumer<AlarmStateTransitionedEvent>>() {},
new TypeLiteral<Consumer<AlarmStateTransitionedEvent>>() {})
.build(new TypeLiteral<ConsumerFactory<AlarmStateTransitionedEvent>>() {}));
install(new FactoryModuleBuilder().implement(
KafkaAlarmStateTransitionConsumer.class,
KafkaAlarmStateTransitionConsumer.class).build(
KafkaAlarmStateTransitionConsumerFactory.class));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<KafkaAlarmStateTransitionConsumer<AlarmStateTransitionedEvent>>() {},
new TypeLiteral<KafkaAlarmStateTransitionConsumer<AlarmStateTransitionedEvent>>() {})
.build(new TypeLiteral<KafkaAlarmStateTransitionConsumerFactory<AlarmStateTransitionedEvent>>() {}));
install(new FactoryModuleBuilder().implement(
MetricsConsumer.class,
MetricsConsumer.class).build(MetricsConsumerFactory.class));
install(
new FactoryModuleBuilder().implement(
new TypeLiteral<Consumer<MetricEnvelope[]>>() {},
new TypeLiteral<Consumer<MetricEnvelope[]>>() {})
.build(new TypeLiteral<ConsumerFactory<MetricEnvelope[]>>() {}));
install(new FactoryModuleBuilder().implement(KafkaChannel.class, KafkaChannel.class).build(
KafkaChannelFactory.class));
install(
new FactoryModuleBuilder().implement(
KafkaChannel.class, KafkaChannel.class).build(KafkaChannelFactory.class));
if (config.getDatabaseConfiguration().getDatabaseType().equalsIgnoreCase(VERTICA)) {
@ -139,10 +152,10 @@ public class PersisterModule extends AbstractModule {
} else if (config.getDatabaseConfiguration().getDatabaseType().equalsIgnoreCase(INFLUXDB)) {
// Check for null to not break existing configs. If no version, default to V8.
if (config.getInfluxDBConfiguration().getVersion() == null ||
config.getInfluxDBConfiguration().getVersion().equalsIgnoreCase(INFLUXDB_V8)) {
if (config.getInfluxDBConfiguration().getVersion() == null || config
.getInfluxDBConfiguration().getVersion().equalsIgnoreCase(INFLUXDB_V8)) {
bind (InfluxV8RepoWriter.class);
bind(InfluxV8RepoWriter.class);
bind(MetricRepo.class).to(InfluxV8MetricRepo.class);
bind(AlarmRepo.class).to(InfluxV8AlarmRepo.class);

@ -20,6 +20,7 @@ package monasca.persister.consumer;
import monasca.persister.pipeline.ManagedPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import io.dropwizard.lifecycle.Managed;
@ -33,7 +34,10 @@ public class Consumer<T> implements Managed {
private final ManagedPipeline<T> pipeline;
@Inject
public Consumer(KafkaConsumer<T> kafkaConsumer, ManagedPipeline<T> pipeline) {
public Consumer(
@Assisted KafkaConsumer<T> kafkaConsumer,
@Assisted ManagedPipeline<T> pipeline) {
this.consumer = kafkaConsumer;
this.pipeline = pipeline;
}

@ -15,10 +15,14 @@
* limitations under the License.
*/
package monasca.persister.pipeline;
package monasca.persister.consumer;
import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandler;
import monasca.persister.pipeline.ManagedPipeline;
public interface ConsumerFactory<T> {
Consumer<T> create(
KafkaConsumer<T> kafkaConsumer,
ManagedPipeline<T> pipeline);
public interface AlarmStateTransitionPipelineFactory {
AlarmStateTransitionPipeline create(AlarmStateTransitionedEventHandler handler);
}

@ -46,8 +46,11 @@ public class KafkaChannel {
private final int threadNum;
@Inject
public KafkaChannel(@Assisted PersisterConfig configuration,
@Assisted PipelineConfig pipelineConfig, @Assisted int threadNum) {
public KafkaChannel(
@Assisted PersisterConfig configuration,
@Assisted PipelineConfig pipelineConfig,
@Assisted int threadNum) {
this.topic = pipelineConfig.getTopic();
this.threadNum = threadNum;
Properties kafkaProperties =

@ -21,6 +21,9 @@ import monasca.persister.configuration.PersisterConfig;
import monasca.persister.configuration.PipelineConfig;
public interface KafkaChannelFactory {
KafkaChannel create(PersisterConfig configuration,
PipelineConfig pipelineConfig, int threadNum);
KafkaChannel create(
PersisterConfig configuration,
PipelineConfig pipelineConfig,
int threadNum);
}

@ -40,7 +40,8 @@ public abstract class KafkaConsumer<T> {
this.threadNum = threadNum;
}
protected abstract KafkaConsumerRunnableBasic<T> createRunnable(KafkaChannel kafkaChannel,
protected abstract KafkaConsumerRunnableBasic<T> createRunnable(
KafkaChannel kafkaChannel,
int threadNumber);
public void start() {

@ -17,14 +17,18 @@
package monasca.persister.consumer;
import monasca.persister.pipeline.ManagedPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.consumer.ConsumerIterator;
import monasca.persister.pipeline.ManagedPipeline;
public abstract class KafkaConsumerRunnableBasic<T> implements Runnable {
public class KafkaConsumerRunnableBasic<T> implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnableBasic.class);
private final KafkaChannel kafkaChannel;
@ -32,17 +36,42 @@ public abstract class KafkaConsumerRunnableBasic<T> implements Runnable {
private final ManagedPipeline<T> pipeline;
private volatile boolean stop = false;
public KafkaConsumerRunnableBasic(KafkaChannel kafkaChannel,
ManagedPipeline<T> pipeline,
int threadNumber) {
private final ObjectMapper objectMapper;
private final Class<T> clazz;
@Inject
public KafkaConsumerRunnableBasic(
@Assisted Class<T> clazz,
@Assisted ObjectMapper objectMapper,
@Assisted KafkaChannel kafkaChannel,
@Assisted ManagedPipeline<T> pipeline,
@Assisted int threadNumber) {
this.kafkaChannel = kafkaChannel;
this.pipeline = pipeline;
this.threadNumber = threadNumber;
this.objectMapper = objectMapper;
this.clazz = clazz;
}
abstract protected void publishHeartbeat();
protected void publishHeartbeat() {
publishEvent(null);
}
abstract protected void handleMessage(String message);
protected void handleMessage(String message) {
try {
final T o = objectMapper.readValue(message, this.clazz);
publishEvent(o);
} catch (Exception e) {
logger.error("Failed to deserialize JSON message and send to handler: " + message, e);
}
}
private void markRead() {
this.kafkaChannel.markRead();
@ -53,28 +82,43 @@ public abstract class KafkaConsumerRunnableBasic<T> implements Runnable {
}
public void run() {
final ConsumerIterator<byte[], byte[]> it = kafkaChannel.getKafkaStream().iterator();
logger.debug("KafkaChannel {} has stream", this.threadNumber);
while (!this.stop) {
try {
if (it.hasNext()) {
final String s = new String(it.next().message());
logger.debug("Thread {}: {}", threadNumber, s);
handleMessage(s);
}
} catch (kafka.consumer.ConsumerTimeoutException cte) {
publishHeartbeat();
}
}
logger.debug("Shutting down Thread: {}", threadNumber);
this.kafkaChannel.stop();
}
protected void publishEvent(final T event) {
if (pipeline.publishEvent(event)) {
markRead();
}
}
}

@ -14,13 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.consumer;
package monasca.persister.consumer.alarmstate;
import com.fasterxml.jackson.databind.ObjectMapper;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
import monasca.persister.pipeline.ManagedPipeline;
public interface KafkaAlarmStateTransitionConsumerRunnableBasicFactory {
KafkaAlarmStateTransitionConsumerRunnableBasic create(AlarmStateTransitionPipeline pipeline, KafkaChannel kafkaChannel,
public interface KafkaConsumerRunnableBasicFactory<T> {
KafkaConsumerRunnableBasic<T> create(
ObjectMapper objectMapper,
Class<T> clazz,
ManagedPipeline<T> pipeline,
KafkaChannel kafkaChannel,
int threadNumber);
}

@ -1,34 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.consumer.alarmstate;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.consumer.Consumer;
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
/**
 * Lifecycle-managed consumer of {@link AlarmStateTransitionedEvent} messages.
 *
 * <p>Thin type-binding subclass: it contributes no behavior of its own and only
 * forwards the assisted-injected Kafka consumer and pipeline to the generic
 * {@code Consumer<AlarmStateTransitionedEvent>} superclass (defined elsewhere;
 * presumably it drives the consumer loop — confirm in Consumer.java).
 */
public class AlarmStateTransitionConsumer extends Consumer<AlarmStateTransitionedEvent> {
@Inject
// Both parameters are @Assisted: instances are created via
// AlarmStateTransitionConsumerFactory (Guice assisted injection), not directly.
public AlarmStateTransitionConsumer(@Assisted KafkaAlarmStateTransitionConsumer kafkaConsumer,
@Assisted AlarmStateTransitionPipeline pipeline) {
super(kafkaConsumer, pipeline);
}
}

@ -1,25 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.consumer.alarmstate;
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
/**
 * Guice assisted-injection factory for {@link AlarmStateTransitionConsumer}.
 * The implementation is generated by FactoryModuleBuilder; callers supply the
 * runtime (assisted) arguments listed below.
 */
public interface AlarmStateTransitionConsumerFactory {
// kafkaConsumer: the Kafka-level consumer to wrap; pipeline: destination for events.
AlarmStateTransitionConsumer create(KafkaAlarmStateTransitionConsumer kafkaConsumer,
AlarmStateTransitionPipeline pipeline);
}

@ -17,32 +17,51 @@
package monasca.persister.consumer.alarmstate;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.consumer.KafkaConsumer;
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory;
import monasca.persister.pipeline.ManagedPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
public class KafkaAlarmStateTransitionConsumer extends KafkaConsumer<AlarmStateTransitionedEvent> {
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
public class KafkaAlarmStateTransitionConsumer<T> extends KafkaConsumer<T> {
@Inject
private KafkaAlarmStateTransitionConsumerRunnableBasicFactory factory;
private KafkaConsumerRunnableBasicFactory<T> factory;
private final AlarmStateTransitionPipeline pipeline;
private final ManagedPipeline<T> pipeline;
private final Class<T> clazz;
@Inject
public KafkaAlarmStateTransitionConsumer(@Assisted KafkaChannel kafkaChannel,
@Assisted int threadNum, @Assisted final AlarmStateTransitionPipeline pipeline) {
public KafkaAlarmStateTransitionConsumer(
@Assisted Class<T> clazz,
@Assisted KafkaChannel kafkaChannel,
@Assisted int threadNum,
@Assisted final ManagedPipeline<T> pipeline) {
super(kafkaChannel, threadNum);
this.pipeline = pipeline;
this.clazz = clazz;
}
@Override
protected KafkaConsumerRunnableBasic<AlarmStateTransitionedEvent> createRunnable(
KafkaChannel kafkaChannel, int threadNumber) {
return factory.create(pipeline, kafkaChannel, threadNumber);
protected KafkaConsumerRunnableBasic<T> createRunnable(
KafkaChannel kafkaChannel,
int threadNumber) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
objectMapper.enable(DeserializationFeature.UNWRAP_ROOT_VALUE);
return factory.create(objectMapper, clazz, pipeline, kafkaChannel, threadNumber);
}
}

@ -18,9 +18,12 @@
package monasca.persister.consumer.alarmstate;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
import monasca.persister.pipeline.ManagedPipeline;
public interface KafkaAlarmStateTransitionConsumerFactory {
KafkaAlarmStateTransitionConsumer create(KafkaChannel kafkaChannel, int threadNum,
final AlarmStateTransitionPipeline pipeline);
public interface KafkaAlarmStateTransitionConsumerFactory<T> {
KafkaAlarmStateTransitionConsumer<T> create(
Class<T> clazz,
KafkaChannel kafkaChannel, int threadNum,
final ManagedPipeline<T> pipeline);
}

@ -1,69 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.consumer.alarmstate;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Runnable that deserializes alarm-state-transition JSON messages from Kafka and
 * publishes them to an {@link AlarmStateTransitionPipeline}.
 *
 * <p>The read/poll loop itself lives in the {@code KafkaConsumerRunnableBasic}
 * superclass (not visible here); this class supplies only the message-handling
 * and heartbeat hooks.
 */
public class KafkaAlarmStateTransitionConsumerRunnableBasic extends
KafkaConsumerRunnableBasic<AlarmStateTransitionedEvent> {
private static final Logger logger = LoggerFactory
.getLogger(KafkaAlarmStateTransitionConsumerRunnableBasic.class);
// Per-instance mapper; configured once in the constructor and never mutated
// afterwards, so use from the single consumer thread is safe.
private final ObjectMapper objectMapper;
@Inject
// Assisted-injected: created via the corresponding RunnableBasicFactory with
// the pipeline, channel and thread number supplied at call time.
public KafkaAlarmStateTransitionConsumerRunnableBasic(@Assisted AlarmStateTransitionPipeline pipeline,
@Assisted KafkaChannel kafkaChannel, @Assisted int threadNumber) {
super(kafkaChannel, pipeline, threadNumber);
this.objectMapper = new ObjectMapper();
// Tolerate schema drift: unknown JSON fields must not fail deserialization.
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
// Messages arrive wrapped in a single root key (e.g. {"alarm-transitioned": {...}});
// UNWRAP_ROOT_VALUE strips that wrapper before binding.
objectMapper.enable(DeserializationFeature.UNWRAP_ROOT_VALUE);
}
@Override
// Heartbeat is signalled by pushing a null event through the pipeline;
// downstream presumably treats null as "flush/keep-alive" — confirm in pipeline code.
protected void publishHeartbeat() {
publishEvent(null);
}
@Override
// Parse one raw Kafka message into an event and hand it to the pipeline.
// Any parse/publish failure is logged (with payload) and swallowed so a bad
// message cannot kill the consumer thread.
protected void handleMessage(String message) {
try {
final AlarmStateTransitionedEvent event =
objectMapper.readValue(message, AlarmStateTransitionedEvent.class);
logger.debug(event.toString());
publishEvent(event);
} catch (Exception e) {
logger.error("Failed to deserialize JSON message and send to handler: " + message, e);
}
}
}

@ -17,32 +17,52 @@
package monasca.persister.consumer.metric;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.consumer.KafkaConsumer;
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
import monasca.persister.pipeline.MetricPipeline;
import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory;
import monasca.persister.pipeline.ManagedPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
public class KafkaMetricsConsumer extends KafkaConsumer<MetricEnvelope[]> {
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
public class KafkaMetricsConsumer<T> extends KafkaConsumer<T> {
@Inject
private KafkaMetricsConsumerRunnableBasicFactory factory;
private KafkaConsumerRunnableBasicFactory<T> factory;
private final MetricPipeline pipeline;
private final ManagedPipeline<T> pipeline;
private final Class<T> clazz;
@Inject
public KafkaMetricsConsumer(@Assisted KafkaChannel kafkaChannel, @Assisted int threadNum,
@Assisted MetricPipeline pipeline) {
public KafkaMetricsConsumer(
@Assisted Class<T> clazz,
@Assisted KafkaChannel kafkaChannel,
@Assisted int threadNum,
@Assisted ManagedPipeline<T> pipeline) {
super(kafkaChannel, threadNum);
this.pipeline = pipeline;
this.clazz = clazz;
}
@Override
protected KafkaConsumerRunnableBasic<MetricEnvelope[]> createRunnable(KafkaChannel kafkaChannel,
protected KafkaConsumerRunnableBasic<T> createRunnable(
KafkaChannel kafkaChannel,
int threadNumber) {
return factory.create(pipeline, kafkaChannel, threadNumber);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
objectMapper.setPropertyNamingStrategy(
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
return factory.create(objectMapper, clazz, pipeline, kafkaChannel, threadNumber);
}
}

@@ -18,9 +18,13 @@
package monasca.persister.consumer.metric;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.pipeline.MetricPipeline;
import monasca.persister.pipeline.ManagedPipeline;
public interface KafkaMetricsConsumerFactory {
KafkaMetricsConsumer create(KafkaChannel kafkaChannel, int threadNum,
MetricPipeline pipeline);
public interface KafkaMetricsConsumerFactory<T> {
KafkaMetricsConsumer<T> create(
Class<T> clazz,
KafkaChannel kafkaChannel,
int threadNum,
ManagedPipeline<T> pipeline);
}

@@ -1,71 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.consumer.metric;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
import monasca.persister.pipeline.MetricPipeline;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Consumer runnable that turns raw Kafka messages into {@link MetricEnvelope}
 * arrays and hands them to the metric pipeline.
 */
public class KafkaMetricsConsumerRunnableBasic extends
    KafkaConsumerRunnableBasic<MetricEnvelope[]> {

  private static final Logger logger = LoggerFactory
      .getLogger(KafkaMetricsConsumerRunnableBasic.class);

  // Configured once per runnable; tolerant of unknown fields and single values
  // where an array is expected, with snake_case JSON property names.
  private final ObjectMapper objectMapper;

  @Inject
  public KafkaMetricsConsumerRunnableBasic(@Assisted MetricPipeline pipeline,
      @Assisted KafkaChannel kafkaChannel, @Assisted int threadNumber) {
    super(kafkaChannel, pipeline, threadNumber);
    final ObjectMapper mapper = new ObjectMapper();
    mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
    mapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
    mapper.setPropertyNamingStrategy(
        PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
    this.objectMapper = mapper;
  }

  @Override
  protected void publishHeartbeat() {
    // A null event signals a heartbeat to the pipeline.
    publishEvent(null);
  }

  /**
   * Deserializes one JSON message into a batch of metric envelopes and
   * publishes the batch; malformed messages are logged and dropped.
   */
  @Override
  protected void handleMessage(String message) {
    try {
      final MetricEnvelope[] envelopes = objectMapper.readValue(message, MetricEnvelope[].class);
      for (final MetricEnvelope envelope : envelopes) {
        logger.debug("{}", envelope);
      }
      publishEvent(envelopes);
    } catch (Exception e) {
      logger.error("Failed to deserialize JSON message and place on pipeline queue: " + message,
          e);
    }
  }
}

@@ -1,25 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.consumer.metric;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.pipeline.MetricPipeline;
/**
 * Factory for {@link KafkaMetricsConsumerRunnableBasic} instances; the
 * {@code @Assisted} parameters on the runnable's constructor indicate this is
 * used with Guice assisted injection.
 */
public interface KafkaMetricsConsumerRunnableBasicFactory {

  /**
   * Creates a consumer runnable bound to the given pipeline and channel.
   *
   * @param pipeline pipeline that receives the deserialized metric envelopes
   * @param kafkaChannel channel the runnable reads messages from
   * @param threadNumber identifier of the consumer thread
   */
  KafkaMetricsConsumerRunnableBasic create(MetricPipeline pipeline, KafkaChannel kafkaChannel, int threadNumber);
}

@@ -1,33 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.consumer.metric;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.consumer.Consumer;
import monasca.persister.pipeline.MetricPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
/**
 * Consumer specialization for batches of metric envelopes. Pairs a Kafka
 * metrics consumer with a metric pipeline; all behavior is provided by the
 * {@code Consumer} base class.
 */
public class MetricsConsumer extends Consumer<MetricEnvelope[]> {

  /**
   * @param kafkaConsumer consumer that reads metric messages from Kafka
   * @param pipeline pipeline the consumed metrics are associated with
   */
  @Inject
  public MetricsConsumer(@Assisted KafkaMetricsConsumer kafkaConsumer, @Assisted MetricPipeline pipeline) {
    // Pure delegation; this class only fixes the type parameters.
    super(kafkaConsumer, pipeline);
  }
}

@@ -1,21 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package monasca.persister.consumer.metric;
import monasca.persister.pipeline.MetricPipeline;
/**
 * Factory that pairs a {@link KafkaMetricsConsumer} with a
 * {@code MetricPipeline} into a {@link MetricsConsumer}; the {@code @Assisted}
 * parameters on the consumer's constructor indicate Guice assisted injection.
 */
public interface MetricsConsumerFactory {

  /**
   * @param kafkaConsumer consumer that reads metric messages from Kafka
   * @param pipeline pipeline the new consumer is bound to
   */
  MetricsConsumer create(KafkaMetricsConsumer kafkaConsumer, MetricPipeline pipeline);
}

@@ -1,31 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.pipeline;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandler;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
/**
 * Managed pipeline specialization for alarm state transition events; all
 * behavior is inherited from {@code ManagedPipeline}.
 */
public class AlarmStateTransitionPipeline extends ManagedPipeline<AlarmStateTransitionedEvent> {

  /**
   * @param handler handler the pipeline delegates event processing and
   *        flushing to
   */
  @Inject
  public AlarmStateTransitionPipeline(@Assisted AlarmStateTransitionedEventHandler handler) {
    // Pure delegation; this class only fixes the event type parameter.
    super(handler);
  }
}

@@ -17,30 +17,44 @@
package monasca.persister.pipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import monasca.persister.pipeline.event.FlushableHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ManagedPipeline<T> {
private static final Logger logger = LoggerFactory.getLogger(ManagedPipeline.class);
private final FlushableHandler<T> eventHandler;
private final FlushableHandler<T> handler;
@Inject
public ManagedPipeline(
@Assisted FlushableHandler<T> handler) {
this.handler = handler;
public ManagedPipeline(FlushableHandler<T> eventHandler) {
this.eventHandler = eventHandler;
}
public void shutdown() {
eventHandler.flush();
handler.flush();
}
public boolean publishEvent(T holder) {
try {
return this.eventHandler.onEvent(holder);
return this.handler.onEvent(holder);
} catch (Exception e) {
logger.error("Failed to handle event", e);
return false;
}
}
}

@@ -14,11 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.pipeline;
import monasca.persister.pipeline.event.MetricHandler;
import monasca.persister.pipeline.event.FlushableHandler;
public interface ManagedPipelineFactory<T> {
ManagedPipeline<T> create(FlushableHandler<T> handler);
public interface MetricPipelineFactory {
MetricPipeline create(MetricHandler metricHandler);
}

@@ -1,32 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.pipeline;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.pipeline.event.MetricHandler;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
/**
 * Managed pipeline specialization for batches of metric envelopes; all
 * behavior is inherited from {@code ManagedPipeline}.
 */
public class MetricPipeline extends ManagedPipeline<MetricEnvelope[]> {

  /**
   * @param metricHandler handler the pipeline delegates event processing and
   *        flushing to
   */
  @Inject
  public MetricPipeline(@Assisted MetricHandler metricHandler) {
    // Pure delegation; this class only fixes the event type parameter.
    super(metricHandler);
  }
}

@@ -24,41 +24,56 @@ import monasca.persister.repository.AlarmRepo;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.codahale.metrics.Counter;
import io.dropwizard.setup.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlarmStateTransitionedEventHandler extends
FlushableHandler<AlarmStateTransitionedEvent> {
public class AlarmStateTransitionedEventHandler<T> extends
FlushableHandler<T> {
private static final Logger logger = LoggerFactory
.getLogger(AlarmStateTransitionedEventHandler.class);
private final AlarmRepo repository;
private final AlarmRepo alarmRepo;
private final int ordinal;
private final Counter alarmStateTransitionCounter;
@Inject
public AlarmStateTransitionedEventHandler(AlarmRepo repository,
@Assisted PipelineConfig configuration, Environment environment,
public AlarmStateTransitionedEventHandler(
AlarmRepo alarmRepo,
@Assisted PipelineConfig configuration,
Environment environment,
@Assisted("ordinal") int ordinal,
@Assisted("batchSize") int batchSize) {
super(configuration, environment, ordinal, batchSize,
AlarmStateTransitionedEventHandler.class.getName());
this.repository = repository;
this.alarmRepo = alarmRepo;
this.ordinal = ordinal;
final String handlerName = String.format("%s[%d]", AlarmStateTransitionedEventHandler.class.getName(), ordinal);
this.alarmStateTransitionCounter =
environment.metrics().counter(handlerName + "." + "alarm-state-transitions-added-to-batch-counter");
}
@Override
protected int process(AlarmStateTransitionedEvent event) throws Exception {
logger.debug("Ordinal: Event: {}", this.ordinal, event);
protected int process(T event) throws Exception {
logger.debug("Ordinal: {}: {}", this.ordinal, event);
alarmRepo.addToBatch((AlarmStateTransitionedEvent) event);
repository.addToBatch(event);
return 1;
}
@Override
protected void flushRepository() {
repository.flush();
alarmRepo.flush();
}
}

@@ -21,7 +21,10 @@ import monasca.persister.configuration.PipelineConfig;
import com.google.inject.assistedinject.Assisted;
public interface AlarmStateTransitionedEventHandlerFactory {
AlarmStateTransitionedEventHandler create(PipelineConfig configuration,
@Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize);
public interface AlarmStateTransitionedEventHandlerFactory<T> {
AlarmStateTransitionedEventHandler<T> create(
PipelineConfig configuration,
@Assisted("ordinal") int ordinal,
@Assisted("batchSize") int batchSize);
}

@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
public abstract class FlushableHandler<T> {
private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class);
private final int ordinal;
private final int batchSize;
private final String handlerName;
@@ -45,8 +46,12 @@ public abstract class FlushableHandler<T> {
private final Meter commitMeter;
private final Timer commitTimer;
protected FlushableHandler(PipelineConfig configuration, Environment environment,
int ordinal, int batchSize, String baseName) {
protected FlushableHandler(
PipelineConfig configuration,
Environment environment,
int ordinal,
int batchSize,
String baseName) {
this.handlerName = String.format("%s[%d]", baseName, ordinal);
this.environment = environment;
@@ -72,25 +77,31 @@ public abstract class FlushableHandler<T> {
public boolean onEvent(final T event) throws Exception {
if (event == null) {
long delta = millisSinceLastFlush + millisBetweenFlushes;
logger.debug("{} received heartbeat message, flush every {} seconds.", this.handlerName,
this.secondsBetweenFlushes);
if (delta < System.currentTimeMillis()) {
logger.debug("{}: {} seconds since last flush. Flushing to repository now.",
this.handlerName, delta);
flush();
return true;
} else {
logger.debug("{}: {} seconds since last flush. No need to flush at this time.",
this.handlerName, delta);
return false;
}
}
processedMeter.mark();
logger.debug("Ordinal: Event: {}", ordinal, event);
eventCount += process(event);
if (eventCount >= batchSize) {

@@ -22,47 +22,64 @@ import com.google.inject.assistedinject.Assisted;
import com.codahale.metrics.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.dropwizard.setup.Environment;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PipelineConfig;
import monasca.persister.repository.MetricRepo;
public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
public class MetricHandler<T> extends FlushableHandler<T> {
private final int ordinal;
private static final Logger logger = LoggerFactory
.getLogger(MetricHandler.class);
private final MetricRepo metricRepo;
private final int ordinal;
private final Counter metricCounter;
@Inject
public MetricHandler(MetricRepo metricRepo, @Assisted PipelineConfig configuration,
Environment environment, @Assisted("ordinal") int ordinal,
@Assisted("batchSize") int batchSize) {
public MetricHandler(
MetricRepo metricRepo,
@Assisted PipelineConfig configuration,
Environment environment,
@Assisted("ordinal") int ordinal,
@Assisted("batchSize") int batchSize) {
super(configuration,
environment,
ordinal,
batchSize,
MetricHandler.class.getName());
super(configuration, environment, ordinal, batchSize, MetricHandler.class.getName());
this.metricRepo = metricRepo;
this.ordinal = ordinal;
final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal);
this.metricCounter =
environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter");
this.ordinal = ordinal;
}
@Override
public int process(MetricEnvelope[] metricEnvelopes) throws Exception {
public int process(T metricEnvelopes) throws Exception {
for (final MetricEnvelope metricEnvelope : metricEnvelopes) {
MetricEnvelope[] metricEnvelopesArry = (MetricEnvelope[]) metricEnvelopes;
for (final MetricEnvelope metricEnvelope : metricEnvelopesArry) {
processEnvelope(metricEnvelope);
}
return metricEnvelopes.length;
return metricEnvelopesArry.length;
}
private void processEnvelope(MetricEnvelope metricEnvelope) {
logger.debug("Ordinal: {}: {}", this.ordinal, metricEnvelope);
this.metricRepo.addToBatch(metricEnvelope);
metricCounter.inc();

@@ -21,7 +21,10 @@ import monasca.persister.configuration.PipelineConfig;
import com.google.inject.assistedinject.Assisted;
public interface MetricHandlerFactory {
MetricHandler create(PipelineConfig pipelineConfig,
@Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize);
public interface MetricHandlerFactory<T> {
MetricHandler<T> create(
PipelineConfig pipelineConfig,
@Assisted("ordinal") int ordinal,
@Assisted("batchSize") int batchSize);
}

@@ -21,7 +21,7 @@ import monasca.common.model.metric.MetricEnvelope;
public interface MetricRepo {
void addToBatch(MetricEnvelope metricEnvelope);
void addToBatch(final MetricEnvelope metricEnvelope);
void flush();
}

@@ -17,9 +17,10 @@
package monasca.persister;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.consumer.Consumer;
import monasca.persister.consumer.metric.KafkaMetricsConsumer;
import monasca.persister.consumer.metric.MetricsConsumer;
import monasca.persister.pipeline.MetricPipeline;
import monasca.persister.pipeline.ManagedPipeline;
import monasca.persister.pipeline.event.MetricHandler;
import org.junit.Before;
@@ -34,16 +35,16 @@ public class MonPersisterConsumerTest {
private KafkaMetricsConsumer kafkaConsumer;
@Mock
private MetricsConsumer monConsumer;
private Consumer monConsumer;
private MetricHandler metricHandler;
private MetricPipeline metricPipeline;
private ManagedPipeline<MetricEnvelope[]> metricPipeline;
@Before
public void initMocks() {
metricHandler = Mockito.mock(MetricHandler.class);
metricPipeline = Mockito.spy(new MetricPipeline(metricHandler));
metricPipeline = Mockito.spy(new ManagedPipeline<MetricEnvelope[]>(metricHandler));
MockitoAnnotations.initMocks(this);
}