Added mon-streaming common sub-project

This commit is contained in:
Jonathan Halterman 2014-03-10 13:29:40 -07:00
parent 5c3ca6f404
commit 270dac84ff
11 changed files with 260 additions and 0 deletions

1
java/mon-streaming/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

View File

@ -0,0 +1,44 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.hpcloud</groupId>
<artifactId>mon-common</artifactId>
<version>${computedVersion}</version>
</parent>
<artifactId>mon-streaming</artifactId>
<packaging>jar</packaging>
<properties>
</properties>
<dependencies>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.9.0.1</version>
<scope>provided</scope>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>com.hpcloud</groupId>
<artifactId>mon-testing</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.hpcloud</groupId>
<artifactId>mon-util</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
</project>

View File

@ -0,0 +1,18 @@
package com.hpcloud.streaming.storm;
import backtype.storm.task.TopologyContext;
/**
* Storm related logging utilities.
*
* @author Jonathan Halterman
*/
public final class Logging {
private Logging() {
}
public static String categoryFor(Class<?> type, TopologyContext ctx) {
return String.format("%s-%s", type.getName(), ctx.getThisTaskId());
}
}

View File

@ -0,0 +1,13 @@
package com.hpcloud.streaming.storm;
/**
* Utilities for working with streams.
*
* @author Jonathan Halterman
*/
public final class Streams {
public static final String DEFAULT_STREAM_ID = "default";
private Streams() {
}
}

View File

@ -0,0 +1,25 @@
package com.hpcloud.streaming.storm;
import java.util.List;
import java.util.Map;
import backtype.storm.tuple.Fields;
/**
* Deserializes tuples. Similar to a Scheme, but allows for multiple records per
* {@link #deserialize(byte[])} call.
*
* @author Jonathan Halterman
*/
public interface TupleDeserializer {
/**
* Returns a list of deserialized tuples, consisting of a list of tuples each with a list of
* fields, for the {@code tuple}, else null if the {@code tuple} cannot be deserialized.
*/
List<List<?>> deserialize(byte[] tuple, Map<String, Object> headers);
/**
* Returns the output fields.
*/
Fields getOutputFields();
}

View File

@ -0,0 +1,19 @@
package com.hpcloud.streaming.storm;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;
/**
* Utilities for working with Tuples.
*
* @author Jonathan Halterman
*/
public final class Tuples {
private Tuples() {
}
public static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}
}

View File

@ -0,0 +1,32 @@
package com.hpcloud.streaming.storm;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
@SuppressWarnings("serial")
public class NoopSpout extends BaseRichSpout {
private final Fields outputFields;
public NoopSpout(Fields outputFields) {
this.outputFields = outputFields;
}
@Override
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
}
@Override
public void nextTuple() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(outputFields);
}
}

View File

@ -0,0 +1,15 @@
package com.hpcloud.streaming.storm;
import java.util.List;
import com.hpcloud.streaming.storm.TestSpout.TupleProvider;
/**
* @author Jonathan Halterman
*/
public class PeriodicTupleProvider implements TupleProvider {
@Override
public List<Object> get() {
return null;
}
}

View File

@ -0,0 +1,47 @@
package com.hpcloud.streaming.storm;
import java.util.List;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import com.hpcloud.streaming.storm.TupleDeserializer;
/**
* @author Jonathan Halterman
*/
public class TestSpout extends BaseRichSpout {
private static final long serialVersionUID = 849564133745588803L;
private final TupleDeserializer deserializer;
private final TupleProvider tupleProvider;
private SpoutOutputCollector collector;
public TestSpout(TupleDeserializer deserializer, TupleProvider tupleProvider) {
this.deserializer = deserializer;
this.tupleProvider = tupleProvider;
}
public interface TupleProvider {
List<Object> get();
}
@Override
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
collector.emit(tupleProvider.get());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(deserializer.getOutputFields());
}
}

View File

@ -0,0 +1,45 @@
package com.hpcloud.streaming.storm;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import com.google.common.base.Preconditions;
import com.hpcloud.util.Injector;
/**
* @author Jonathan Halterman
*/
@Test(groups = "integration")
public class TopologyTestCase {
public static final String TEST_TOPOLOGY_NAME = "test-maas-alarming";
protected static volatile LocalCluster cluster;
@BeforeSuite
protected void startTopology() throws Exception {
if (cluster == null) {
synchronized (TopologyTestCase.class) {
if (cluster == null) {
Preconditions.checkArgument(Injector.isBound(Config.class),
"You must bind a storm config");
Preconditions.checkArgument(Injector.isBound(StormTopology.class),
"You must bind a storm topology");
cluster = new LocalCluster();
cluster.submitTopology(TEST_TOPOLOGY_NAME, Injector.getInstance(Config.class),
Injector.getInstance(StormTopology.class));
}
}
}
}
@AfterSuite
protected static void stopTopology() {
cluster.killTopology(TEST_TOPOLOGY_NAME);
cluster.shutdown();
}
}

View File

@ -43,6 +43,7 @@
<module>mon-model</module>
<module>mon-persistence</module>
<module>mon-service</module>
<module>mon-streaming</module>
<module>mon-testing</module>
<module>mon-util</module>
</modules>