JIRA JAH-17 Modify maven build for mon-thresh that creates a jar that can be used for a production build.
Needed to strip out the storm jar from the consolidated jar. Needed to remove storm.yaml because Storm complains. Moved the registration of the Serializer to ThresholdingEngine. Added the storm-core jar to the deb so it can be used for the local mode in mini-mon Added logback.xml to the deb since storm-core.jar also has a logback.xml and that confuses logback. Ensure our logback.xml is used. Added the storm-core.jar and logback.xml to the start of thresh in the deb Had to rework how the AlarmEventForwarder was injected into the AlarmThresholdingBolt because the old way didn't work in a Storm cluster because the TopologyModule wasn't loaded on the worker when prepare was called.
This commit is contained in:
parent
63290a82ab
commit
a3429536d2
50
pom.xml
50
pom.xml
@ -15,7 +15,7 @@
|
||||
|
||||
<properties>
|
||||
<mon.common.version>1.0.0.41</mon.common.version>
|
||||
|
||||
<storm.version>0.9.1-incubating</storm.version>
|
||||
<skipITs>true</skipITs>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||
@ -82,7 +82,7 @@
|
||||
<dependency>
|
||||
<groupId>org.apache.storm</groupId>
|
||||
<artifactId>storm-core</artifactId>
|
||||
<version>0.9.1-incubating</version>
|
||||
<version>${storm.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.hpcloud</groupId>
|
||||
@ -219,6 +219,13 @@
|
||||
</excludes>
|
||||
</filter>
|
||||
</filters>
|
||||
<artifactSet>
|
||||
<excludes>
|
||||
<exclude>junit:junit</exclude>
|
||||
<exclude>org.apache.storm:storm-core</exclude>
|
||||
<exclude>org.hamcrest:*</exclude>
|
||||
</excludes>
|
||||
</artifactSet>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
@ -263,6 +270,34 @@
|
||||
<shortRevisionLength>6</shortRevisionLength>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>2.8</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<artifactItems>
|
||||
<artifactItem>
|
||||
<groupId>org.apache.storm</groupId>
|
||||
<artifactId>storm-core</artifactId>
|
||||
<version>${storm.version}</version>
|
||||
<type>jar</type>
|
||||
<overWrite>false</overWrite>
|
||||
</artifactItem>
|
||||
</artifactItems>
|
||||
<outputDirectory>${project.build.directory}</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>true</overWriteSnapshots>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>jdeb</artifactId>
|
||||
<groupId>org.vafer</groupId>
|
||||
@ -282,11 +317,22 @@
|
||||
</src>
|
||||
<dst>/opt/mon/mon-thresh.jar</dst>
|
||||
</data>
|
||||
<data>
|
||||
<type>file</type>
|
||||
<src>${project.build.directory}/storm-core-${storm.version}.jar
|
||||
</src>
|
||||
<dst>/opt/mon/storm-core-${storm.version}.jar</dst>
|
||||
</data>
|
||||
<data>
|
||||
<type>file</type>
|
||||
<src>${project.basedir}/src/deb/init/mon-thresh.conf</src>
|
||||
<dst>/etc/init/mon-thresh.conf</dst>
|
||||
</data>
|
||||
<data>
|
||||
<type>file</type>
|
||||
<src>${project.basedir}/src/main/resources/logback.xml</src>
|
||||
<dst>/etc/mon/logback.xml</dst>
|
||||
</data>
|
||||
<data>
|
||||
<type>file</type>
|
||||
<src>
|
||||
|
@ -15,7 +15,7 @@ script
|
||||
. /etc/default/mon-thresh
|
||||
fi
|
||||
|
||||
exec /usr/bin/java -Xmx8g -DLOGDIR=${LOGDIR:-/tmp} -cp /opt/mon/mon-thresh.jar com.hpcloud.mon.ThresholdingEngine /etc/mon/mon-thresh-config.yml mon-thresh local
|
||||
exec /usr/bin/java -Xmx8g -Dlogback.configurationFile=/etc/mon/logback.xml -DLOGDIR=${LOGDIR:-/tmp} -cp "/opt/mon/*" com.hpcloud.mon.ThresholdingEngine /etc/mon/mon-thresh-config.yml mon-thresh local
|
||||
|
||||
end script
|
||||
|
||||
|
@ -65,6 +65,7 @@ public class ThresholdingEngine {
|
||||
protected void run() throws Exception {
|
||||
Config config = Injector.getInstance(Config.class);
|
||||
StormTopology topology = Injector.getInstance(StormTopology.class);
|
||||
config.registerSerialization(com.hpcloud.mon.domain.model.SubAlarm.class);
|
||||
|
||||
if (local) {
|
||||
LOG.info("submitting topology {} to local storm cluster", topologyName);
|
||||
|
@ -10,11 +10,9 @@ import backtype.storm.tuple.Fields;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Provides;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.AlarmEventForwarder;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.AlarmThresholdingBolt;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.EventProcessingBolt;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.EventSpout;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.KafkaAlarmEventForwarder;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.MetricAggregationBolt;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.MetricFilteringBolt;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.MetricSpout;
|
||||
@ -31,19 +29,17 @@ public class TopologyModule extends AbstractModule {
|
||||
private Config stormConfig;
|
||||
private IRichSpout metricSpout;
|
||||
private IRichSpout eventSpout;
|
||||
private AlarmEventForwarder alarmEventForwarder;
|
||||
|
||||
public TopologyModule(ThresholdingConfiguration config) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
public TopologyModule(ThresholdingConfiguration threshConfig, Config stormConfig,
|
||||
IRichSpout metricSpout, IRichSpout eventSpout, AlarmEventForwarder alarmEventForwarder) {
|
||||
IRichSpout metricSpout, IRichSpout eventSpout) {
|
||||
this(threshConfig);
|
||||
this.stormConfig = stormConfig;
|
||||
this.metricSpout = metricSpout;
|
||||
this.eventSpout = eventSpout;
|
||||
this.alarmEventForwarder = alarmEventForwarder;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -63,11 +59,6 @@ public class TopologyModule extends AbstractModule {
|
||||
return stormConfig;
|
||||
}
|
||||
|
||||
@Provides
|
||||
AlarmEventForwarder alarmEventForwarder() {
|
||||
return alarmEventForwarder == null ? new KafkaAlarmEventForwarder(config.kafkaProducerConfig) : alarmEventForwarder;
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Named("metrics")
|
||||
IRichSpout metricSpout() {
|
||||
@ -120,7 +111,7 @@ public class TopologyModule extends AbstractModule {
|
||||
|
||||
// Aggregation / Event -> Thresholding
|
||||
builder.setBolt("thresholding-bolt",
|
||||
new AlarmThresholdingBolt(config.database),
|
||||
new AlarmThresholdingBolt(config.database, config.kafkaProducerConfig),
|
||||
config.thresholdingBoltThreads)
|
||||
.fieldsGrouping("aggregation-bolt", new Fields(MetricAggregationBolt.FIELDS[0]))
|
||||
.fieldsGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID,
|
||||
|
@ -12,6 +12,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
|
||||
import backtype.storm.topology.base.BaseRichBolt;
|
||||
import backtype.storm.tuple.Tuple;
|
||||
|
||||
import com.hpcloud.configuration.KafkaProducerConfiguration;
|
||||
import com.hpcloud.mon.ThresholdingConfiguration;
|
||||
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
|
||||
import com.hpcloud.mon.common.event.AlarmUpdatedEvent;
|
||||
@ -44,6 +45,7 @@ public class AlarmThresholdingBolt extends BaseRichBolt {
|
||||
|
||||
private transient Logger LOG;
|
||||
private DataSourceFactory dbConfig;
|
||||
private KafkaProducerConfiguration producerConfiguration;
|
||||
final Map<String, Alarm> alarms = new HashMap<String, Alarm>();
|
||||
private String alertExchange;
|
||||
private String alertRoutingKey;
|
||||
@ -51,8 +53,10 @@ public class AlarmThresholdingBolt extends BaseRichBolt {
|
||||
private transient AlarmEventForwarder alarmEventForwarder;
|
||||
private OutputCollector collector;
|
||||
|
||||
public AlarmThresholdingBolt(DataSourceFactory dbConfig) {
|
||||
public AlarmThresholdingBolt(DataSourceFactory dbConfig,
|
||||
KafkaProducerConfiguration producerConfig) {
|
||||
this.dbConfig = dbConfig;
|
||||
this.producerConfiguration = producerConfig;
|
||||
}
|
||||
|
||||
public AlarmThresholdingBolt(final AlarmDAO alarmDAO,
|
||||
@ -108,6 +112,7 @@ public class AlarmThresholdingBolt extends BaseRichBolt {
|
||||
alarmDAO = Injector.getInstance(AlarmDAO.class);
|
||||
}
|
||||
if (alarmEventForwarder == null) {
|
||||
Injector.registerIfNotBound(AlarmEventForwarder.class, new ProducerModule(this.producerConfiguration));
|
||||
alarmEventForwarder = Injector.getInstance(AlarmEventForwarder.class);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,27 @@
|
||||
package com.hpcloud.mon.infrastructure.thresholding;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Provides;
|
||||
import com.hpcloud.configuration.KafkaProducerConfiguration;
|
||||
|
||||
public class ProducerModule extends AbstractModule {
|
||||
private KafkaProducerConfiguration config;
|
||||
private AlarmEventForwarder alarmEventForwarder;
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
}
|
||||
|
||||
public ProducerModule(KafkaProducerConfiguration config) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
public ProducerModule(AlarmEventForwarder alarmEventForwarder) {
|
||||
this.alarmEventForwarder = alarmEventForwarder;
|
||||
}
|
||||
|
||||
@Provides
|
||||
AlarmEventForwarder alarmEventForwarder() {
|
||||
return alarmEventForwarder == null ? new KafkaAlarmEventForwarder(config) : alarmEventForwarder;
|
||||
}
|
||||
}
|
@ -1,3 +0,0 @@
|
||||
topology.fall.back.on.java.serialization: true
|
||||
topology.kryo.register:
|
||||
- com.hpcloud.mon.domain.model.SubAlarm
|
@ -46,6 +46,7 @@ import com.hpcloud.mon.infrastructure.thresholding.AlarmEventForwarder;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.EventProcessingBoltTest;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.MetricAggregationBolt;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.MetricSpout;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.ProducerModule;
|
||||
import com.hpcloud.streaming.storm.TopologyTestCase;
|
||||
import com.hpcloud.util.Injector;
|
||||
import com.hpcloud.util.Serialization;
|
||||
@ -128,7 +129,8 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase {
|
||||
eventSpout = new FeederSpout(new Fields("event"));
|
||||
alarmEventForwarder = mock(AlarmEventForwarder.class);
|
||||
Injector.registerModules(new TopologyModule(threshConfig, stormConfig,
|
||||
metricSpout, eventSpout, alarmEventForwarder));
|
||||
metricSpout, eventSpout));
|
||||
Injector.registerModules(new ProducerModule(alarmEventForwarder));
|
||||
|
||||
// Evaluate alarm stats every 1 seconds
|
||||
System.setProperty(MetricAggregationBolt.TICK_TUPLE_SECONDS_KEY, "5");
|
||||
|
@ -40,6 +40,7 @@ import com.hpcloud.mon.domain.service.SubAlarmDAO;
|
||||
import com.hpcloud.mon.domain.service.SubAlarmMetricDefinition;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.AlarmEventForwarder;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.MetricSpout;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.ProducerModule;
|
||||
import com.hpcloud.streaming.storm.TopologyTestCase;
|
||||
import com.hpcloud.util.Injector;
|
||||
import com.hpcloud.util.Serialization;
|
||||
@ -134,7 +135,8 @@ public class ThresholdingEngineTest extends TopologyTestCase {
|
||||
eventSpout = new FeederSpout(new Fields("event"));
|
||||
alarmEventForwarder = mock(AlarmEventForwarder.class);
|
||||
Injector.registerModules(new TopologyModule(threshConfig, stormConfig,
|
||||
metricSpout, eventSpout, alarmEventForwarder));
|
||||
metricSpout, eventSpout));
|
||||
Injector.registerModules(new ProducerModule(alarmEventForwarder));
|
||||
}
|
||||
|
||||
private List<SubAlarm> subAlarmsFor(AlarmExpression expression) {
|
||||
|
@ -37,6 +37,7 @@ import com.hpcloud.mon.domain.service.SubAlarmMetricDefinition;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.AlarmEventForwarder;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.MetricAggregationBolt;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.MetricSpout;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.ProducerModule;
|
||||
import com.hpcloud.streaming.storm.TopologyTestCase;
|
||||
import com.hpcloud.util.Injector;
|
||||
|
||||
@ -135,7 +136,8 @@ public class ThresholdingEngineTest1 extends TopologyTestCase {
|
||||
final AlarmEventForwarder alarmEventForwarder = mock(AlarmEventForwarder.class);
|
||||
|
||||
Injector.registerModules(new TopologyModule(threshConfig, stormConfig,
|
||||
metricSpout, eventSpout, alarmEventForwarder));
|
||||
metricSpout, eventSpout));
|
||||
Injector.registerModules(new ProducerModule(alarmEventForwarder));
|
||||
|
||||
// Evaluate alarm stats every 1 seconds
|
||||
System.setProperty(MetricAggregationBolt.TICK_TUPLE_SECONDS_KEY, "1");
|
||||
|
Loading…
x
Reference in New Issue
Block a user