From a3429536d281a3bb1bcaf3dab8236f3248b41334 Mon Sep 17 00:00:00 2001 From: Craig Bryant Date: Wed, 30 Apr 2014 22:28:12 -0600 Subject: [PATCH] JIRA JAH-17 Modify maven build for mon-thresh that creates a jar that can be used for a production build. Needed to strip out the storm jar from the consolidated jar. Needed to remove storm.yaml because Storm complains. Moved the registration of the Serializer to ThresholdingEngine. Added the storm-core jar to the deb so it can be used for the local mode in mini-mon Added logback.xml to the deb since storm-core.jar also has a logback.xml and that confuses logback. Ensure our logback.xml is used. Added the storm-core.jar and logback.xml to the start of thresh in the deb Had to rework how the AlarmEventForwarder was injected into the AlarmThresholdingBolt because the old way didn't work in a Storm cluster because the TopologyModule wasn't loaded on the worker when prepare was called. --- pom.xml | 50 ++++++++++++++++++- src/deb/init/mon-thresh.conf | 2 +- .../com/hpcloud/mon/ThresholdingEngine.java | 1 + .../java/com/hpcloud/mon/TopologyModule.java | 13 +---- .../thresholding/AlarmThresholdingBolt.java | 7 ++- .../thresholding/ProducerModule.java | 27 ++++++++++ src/main/resources/storm.yaml | 3 -- .../mon/ThresholdingEngineAlarmTest.java | 4 +- .../hpcloud/mon/ThresholdingEngineTest.java | 4 +- .../hpcloud/mon/ThresholdingEngineTest1.java | 4 +- 10 files changed, 94 insertions(+), 21 deletions(-) create mode 100644 src/main/java/com/hpcloud/mon/infrastructure/thresholding/ProducerModule.java delete mode 100644 src/main/resources/storm.yaml diff --git a/pom.xml b/pom.xml index 4d4703a..0aeb33a 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ 1.0.0.41 - + 0.9.1-incubating true UTF-8 UTF-8 @@ -82,7 +82,7 @@ org.apache.storm storm-core - 0.9.1-incubating + ${storm.version} com.hpcloud @@ -219,6 +219,13 @@ + + + junit:junit + org.apache.storm:storm-core + org.hamcrest:* + + @@ -263,6 +270,34 @@ 6 + + org.apache.maven.plugins + maven-dependency-plugin + 2.8 + + + copy + package + + copy + + + + + org.apache.storm + storm-core + ${storm.version} + jar + false + + + ${project.build.directory} + false + true + + + + jdeb org.vafer @@ -282,11 +317,22 @@ /opt/mon/mon-thresh.jar + + file + ${project.build.directory}/storm-core-${storm.version}.jar + + /opt/mon/storm-core-${storm.version}.jar + file ${project.basedir}/src/deb/init/mon-thresh.conf /etc/init/mon-thresh.conf + + file + ${project.basedir}/src/main/resources/logback.xml + /etc/mon/logback.xml + file diff --git a/src/deb/init/mon-thresh.conf b/src/deb/init/mon-thresh.conf index 1673559..50564bd 100644 --- a/src/deb/init/mon-thresh.conf +++ b/src/deb/init/mon-thresh.conf @@ -15,7 +15,7 @@ script . /etc/default/mon-thresh fi - exec /usr/bin/java -Xmx8g -DLOGDIR=${LOGDIR:-/tmp} -cp /opt/mon/mon-thresh.jar com.hpcloud.mon.ThresholdingEngine /etc/mon/mon-thresh-config.yml mon-thresh local + exec /usr/bin/java -Xmx8g -Dlogback.configurationFile=/etc/mon/logback.xml -DLOGDIR=${LOGDIR:-/tmp} -cp "/opt/mon/*" com.hpcloud.mon.ThresholdingEngine /etc/mon/mon-thresh-config.yml mon-thresh local end script diff --git a/src/main/java/com/hpcloud/mon/ThresholdingEngine.java b/src/main/java/com/hpcloud/mon/ThresholdingEngine.java index 632430d..4dbf2b4 100644 --- a/src/main/java/com/hpcloud/mon/ThresholdingEngine.java +++ b/src/main/java/com/hpcloud/mon/ThresholdingEngine.java @@ -65,6 +65,7 @@ public class ThresholdingEngine { protected void run() throws Exception { Config config = Injector.getInstance(Config.class); StormTopology topology = Injector.getInstance(StormTopology.class); + config.registerSerialization(com.hpcloud.mon.domain.model.SubAlarm.class); if (local) { LOG.info("submitting topology {} to local storm cluster", topologyName); diff --git a/src/main/java/com/hpcloud/mon/TopologyModule.java b/src/main/java/com/hpcloud/mon/TopologyModule.java index 0c393c2..2fa2bed 100644 --- a/src/main/java/com/hpcloud/mon/TopologyModule.java +++ b/src/main/java/com/hpcloud/mon/TopologyModule.java @@ -10,11 +10,9 @@ import backtype.storm.tuple.Fields; import com.google.inject.AbstractModule; import com.google.inject.Provides; -import com.hpcloud.mon.infrastructure.thresholding.AlarmEventForwarder; import com.hpcloud.mon.infrastructure.thresholding.AlarmThresholdingBolt; import com.hpcloud.mon.infrastructure.thresholding.EventProcessingBolt; import com.hpcloud.mon.infrastructure.thresholding.EventSpout; -import com.hpcloud.mon.infrastructure.thresholding.KafkaAlarmEventForwarder; import com.hpcloud.mon.infrastructure.thresholding.MetricAggregationBolt; import com.hpcloud.mon.infrastructure.thresholding.MetricFilteringBolt; import com.hpcloud.mon.infrastructure.thresholding.MetricSpout; @@ -31,19 +29,17 @@ public class TopologyModule extends AbstractModule { private Config stormConfig; private IRichSpout metricSpout; private IRichSpout eventSpout; - private AlarmEventForwarder alarmEventForwarder; public TopologyModule(ThresholdingConfiguration config) { this.config = config; } public TopologyModule(ThresholdingConfiguration threshConfig, Config stormConfig, - IRichSpout metricSpout, IRichSpout eventSpout, AlarmEventForwarder alarmEventForwarder) { + IRichSpout metricSpout, IRichSpout eventSpout) { this(threshConfig); this.stormConfig = stormConfig; this.metricSpout = metricSpout; this.eventSpout = eventSpout; - this.alarmEventForwarder = alarmEventForwarder; } @Override @@ -63,11 +59,6 @@ public class TopologyModule extends AbstractModule { return stormConfig; } - @Provides - AlarmEventForwarder alarmEventForwarder() { - return alarmEventForwarder == null ? new KafkaAlarmEventForwarder(config.kafkaProducerConfig) : alarmEventForwarder; - } - @Provides @Named("metrics") IRichSpout metricSpout() { @@ -120,7 +111,7 @@ public class TopologyModule extends AbstractModule { // Aggregation / Event -> Thresholding builder.setBolt("thresholding-bolt", - new AlarmThresholdingBolt(config.database), + new AlarmThresholdingBolt(config.database, config.kafkaProducerConfig), config.thresholdingBoltThreads) .fieldsGrouping("aggregation-bolt", new Fields(MetricAggregationBolt.FIELDS[0])) .fieldsGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID, diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java index f3af002..409e9ee 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java @@ -12,6 +12,7 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; +import com.hpcloud.configuration.KafkaProducerConfiguration; import com.hpcloud.mon.ThresholdingConfiguration; import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent; import com.hpcloud.mon.common.event.AlarmUpdatedEvent; @@ -44,6 +45,7 @@ public class AlarmThresholdingBolt extends BaseRichBolt { private transient Logger LOG; private DataSourceFactory dbConfig; + private KafkaProducerConfiguration producerConfiguration; final Map alarms = new HashMap(); private String alertExchange; private String alertRoutingKey; @@ -51,8 +53,10 @@ public class AlarmThresholdingBolt extends BaseRichBolt { private transient AlarmEventForwarder alarmEventForwarder; private OutputCollector collector; - public AlarmThresholdingBolt(DataSourceFactory dbConfig) { + public AlarmThresholdingBolt(DataSourceFactory dbConfig, + KafkaProducerConfiguration producerConfig) { this.dbConfig = dbConfig; + this.producerConfiguration = producerConfig; } public AlarmThresholdingBolt(final AlarmDAO alarmDAO, @@ -108,6 +112,7 @@ public class AlarmThresholdingBolt extends BaseRichBolt { alarmDAO = Injector.getInstance(AlarmDAO.class); } if (alarmEventForwarder == null) { + Injector.registerIfNotBound(AlarmEventForwarder.class, new ProducerModule(this.producerConfiguration)); alarmEventForwarder = Injector.getInstance(AlarmEventForwarder.class); } } diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/ProducerModule.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/ProducerModule.java new file mode 100644 index 0000000..5ca5565 --- /dev/null +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/ProducerModule.java @@ -0,0 +1,27 @@ +package com.hpcloud.mon.infrastructure.thresholding; + +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.hpcloud.configuration.KafkaProducerConfiguration; + +public class ProducerModule extends AbstractModule { + private KafkaProducerConfiguration config; + private AlarmEventForwarder alarmEventForwarder; + + @Override + protected void configure() { + } + + public ProducerModule(KafkaProducerConfiguration config) { + this.config = config; + } + + public ProducerModule(AlarmEventForwarder alarmEventForwarder) { + this.alarmEventForwarder = alarmEventForwarder; + } + + @Provides + AlarmEventForwarder alarmEventForwarder() { + return alarmEventForwarder == null ? new KafkaAlarmEventForwarder(config) : alarmEventForwarder; + } +} diff --git a/src/main/resources/storm.yaml b/src/main/resources/storm.yaml deleted file mode 100644 index caec556..0000000 --- a/src/main/resources/storm.yaml +++ /dev/null @@ -1,3 +0,0 @@ -topology.fall.back.on.java.serialization: true -topology.kryo.register: - - com.hpcloud.mon.domain.model.SubAlarm diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java index e444b10..951c2e7 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java @@ -46,6 +46,7 @@ import com.hpcloud.mon.infrastructure.thresholding.AlarmEventForwarder; import com.hpcloud.mon.infrastructure.thresholding.EventProcessingBoltTest; import com.hpcloud.mon.infrastructure.thresholding.MetricAggregationBolt; import com.hpcloud.mon.infrastructure.thresholding.MetricSpout; +import com.hpcloud.mon.infrastructure.thresholding.ProducerModule; import com.hpcloud.streaming.storm.TopologyTestCase; import com.hpcloud.util.Injector; import com.hpcloud.util.Serialization; @@ -128,7 +129,8 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { eventSpout = new FeederSpout(new Fields("event")); alarmEventForwarder = mock(AlarmEventForwarder.class); Injector.registerModules(new TopologyModule(threshConfig, stormConfig, - metricSpout, eventSpout, alarmEventForwarder)); + metricSpout, eventSpout)); + Injector.registerModules(new ProducerModule(alarmEventForwarder)); // Evaluate alarm stats every 1 seconds System.setProperty(MetricAggregationBolt.TICK_TUPLE_SECONDS_KEY, "5"); diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java index 7a0ac6a..9aef72e 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java @@ -40,6 +40,7 @@ import com.hpcloud.mon.domain.service.SubAlarmDAO; import com.hpcloud.mon.domain.service.SubAlarmMetricDefinition; import com.hpcloud.mon.infrastructure.thresholding.AlarmEventForwarder; import com.hpcloud.mon.infrastructure.thresholding.MetricSpout; +import com.hpcloud.mon.infrastructure.thresholding.ProducerModule; import com.hpcloud.streaming.storm.TopologyTestCase; import com.hpcloud.util.Injector; import com.hpcloud.util.Serialization; @@ -134,7 +135,8 @@ public class ThresholdingEngineTest extends TopologyTestCase { eventSpout = new FeederSpout(new Fields("event")); alarmEventForwarder = mock(AlarmEventForwarder.class); Injector.registerModules(new TopologyModule(threshConfig, stormConfig, - metricSpout, eventSpout, alarmEventForwarder)); + metricSpout, eventSpout)); + Injector.registerModules(new ProducerModule(alarmEventForwarder)); } private List subAlarmsFor(AlarmExpression expression) { diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest1.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest1.java index 4995f5d..236dcf8 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest1.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest1.java @@ -37,6 +37,7 @@ import com.hpcloud.mon.domain.service.SubAlarmMetricDefinition; import com.hpcloud.mon.infrastructure.thresholding.AlarmEventForwarder; import com.hpcloud.mon.infrastructure.thresholding.MetricAggregationBolt; import com.hpcloud.mon.infrastructure.thresholding.MetricSpout; +import com.hpcloud.mon.infrastructure.thresholding.ProducerModule; import com.hpcloud.streaming.storm.TopologyTestCase; import com.hpcloud.util.Injector; @@ -135,7 +136,8 @@ public class ThresholdingEngineTest1 extends TopologyTestCase { final AlarmEventForwarder alarmEventForwarder = mock(AlarmEventForwarder.class); Injector.registerModules(new TopologyModule(threshConfig, stormConfig, - metricSpout, eventSpout, alarmEventForwarder)); + metricSpout, eventSpout)); + Injector.registerModules(new ProducerModule(alarmEventForwarder)); // Evaluate alarm stats every 1 seconds System.setProperty(MetricAggregationBolt.TICK_TUPLE_SECONDS_KEY, "1");