From e77566f4f361a902b17fd66c521f89919d5f3a64 Mon Sep 17 00:00:00 2001 From: Craig Bryant Date: Wed, 25 Jun 2014 12:02:36 -0600 Subject: [PATCH] Make it configurable (maxWaitTime, in milliseconds) how long the KafkaSpout waits if there is no message ready. This required changes in the config classes. Also, replace the fixed sleep with a timed wait that is notified as soon as a message arrives. --- src/main/config/mon-thresh-sample-config.yml | 3 + .../com/hpcloud/mon/EventSpoutConfig.java | 9 +-- .../com/hpcloud/mon/KafkaSpoutConfig.java | 16 +++++ .../com/hpcloud/mon/MetricSpoutConfig.java | 8 +-- .../thresholding/EventSpout.java | 2 +- .../thresholding/KafkaSpout.java | 58 +++++++++++++------ .../thresholding/MetricSpout.java | 2 +- 7 files changed, 63 insertions(+), 35 deletions(-) create mode 100644 src/main/java/com/hpcloud/mon/KafkaSpoutConfig.java diff --git a/src/main/config/mon-thresh-sample-config.yml b/src/main/config/mon-thresh-sample-config.yml index 0109022..0b0f013 100644 --- a/src/main/config/mon-thresh-sample-config.yml +++ b/src/main/config/mon-thresh-sample-config.yml @@ -2,6 +2,7 @@ metricSpoutThreads: 2 metricSpoutTasks: 2 metricSpoutConfig: + maxWaitTime: 500 #Kafka settings. kafkaConsumerConfiguration: @@ -33,6 +34,8 @@ eventSpoutThreads: 2 eventSpoutTasks: 2 eventSpoutConfig: + maxWaitTime: 500 + #Kafka settings. kafkaConsumerConfiguration: # See http://kafka.apache.org/documentation.html#api for semantics and defaults.
diff --git a/src/main/java/com/hpcloud/mon/EventSpoutConfig.java b/src/main/java/com/hpcloud/mon/EventSpoutConfig.java index 00c830d..9ac0540 100644 --- a/src/main/java/com/hpcloud/mon/EventSpoutConfig.java +++ b/src/main/java/com/hpcloud/mon/EventSpoutConfig.java @@ -16,14 +16,7 @@ */ package com.hpcloud.mon; -import java.io.Serializable; - -import com.hpcloud.configuration.KafkaConsumerConfiguration; - -public class EventSpoutConfig implements Serializable { +public class EventSpoutConfig extends KafkaSpoutConfig { private static final long serialVersionUID = -8129774848323598123L; - - public KafkaConsumerConfiguration kafkaConsumerConfiguration; - } diff --git a/src/main/java/com/hpcloud/mon/KafkaSpoutConfig.java b/src/main/java/com/hpcloud/mon/KafkaSpoutConfig.java new file mode 100644 index 0000000..c3bda2b --- /dev/null +++ b/src/main/java/com/hpcloud/mon/KafkaSpoutConfig.java @@ -0,0 +1,16 @@ +package com.hpcloud.mon; + +import java.io.Serializable; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.hpcloud.configuration.KafkaConsumerConfiguration; + +public class KafkaSpoutConfig implements Serializable { + + private static final long serialVersionUID = -6477042435089264571L; + + @JsonProperty + public Integer maxWaitTime = 100; + + public KafkaConsumerConfiguration kafkaConsumerConfiguration; +} diff --git a/src/main/java/com/hpcloud/mon/MetricSpoutConfig.java b/src/main/java/com/hpcloud/mon/MetricSpoutConfig.java index ddc0eee..eb93ece 100644 --- a/src/main/java/com/hpcloud/mon/MetricSpoutConfig.java +++ b/src/main/java/com/hpcloud/mon/MetricSpoutConfig.java @@ -16,13 +16,7 @@ */ package com.hpcloud.mon; -import java.io.Serializable; - -import com.hpcloud.configuration.*; - -public class MetricSpoutConfig implements Serializable { +public class MetricSpoutConfig extends KafkaSpoutConfig { private static final long serialVersionUID = -4285448019855024921L; - - public KafkaConsumerConfiguration kafkaConsumerConfiguration; } diff --git 
a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventSpout.java index f1bf097..87826ff 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventSpout.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventSpout.java @@ -37,7 +37,7 @@ public class EventSpout extends KafkaSpout { private final EventDeserializer deserializer; public EventSpout(EventSpoutConfig configuration, EventDeserializer deserializer) { - super(configuration.kafkaConsumerConfiguration); + super(configuration); this.deserializer = deserializer; LOG.info("EventSpout created"); } diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java index 4e2a3f7..da30677 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java @@ -19,13 +19,16 @@ package com.hpcloud.mon.infrastructure.thresholding; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.base.BaseRichSpout; -import com.hpcloud.configuration.KafkaConsumerConfiguration; + import com.hpcloud.configuration.KafkaConsumerProperties; +import com.hpcloud.mon.KafkaSpoutConfig; + import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +42,7 @@ public abstract class KafkaSpout extends BaseRichSpout implements Runnable { private static final long serialVersionUID = 744004533863562119L; - private final KafkaConsumerConfiguration kafkaConsumerConfig; + private final KafkaSpoutConfig kafkaSpoutConfig; private transient ConsumerConnector consumerConnector; 
@@ -55,8 +58,10 @@ public abstract class KafkaSpout extends BaseRichSpout implements Runnable { private String spoutName; - protected KafkaSpout(KafkaConsumerConfiguration kafkaConsumerConfig) { - this.kafkaConsumerConfig = kafkaConsumerConfig; + private boolean waiting = false; + + protected KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { + this.kafkaSpoutConfig = kafkaSpoutConfig; } @Override @@ -64,9 +69,9 @@ public abstract class KafkaSpout extends BaseRichSpout implements Runnable { LOG.info("Activated"); if (streams == null) { Map topicCountMap = new HashMap<>(); - topicCountMap.put(kafkaConsumerConfig.getTopic(), new Integer(1)); + topicCountMap.put(kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic(), new Integer(1)); Map>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); - streams = consumerMap.get(kafkaConsumerConfig.getTopic()); + streams = consumerMap.get(kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic()); } } @@ -74,10 +79,11 @@ public abstract class KafkaSpout extends BaseRichSpout implements Runnable { public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { LOG.info("Opened"); this.collector = collector; - LOG.info(" topic = " + kafkaConsumerConfig.getTopic()); + LOG.info(" topic = " + kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic()); this.spoutName = String.format("%s-%d", context.getThisComponentId(), context.getThisTaskId()); - Properties kafkaProperties = KafkaConsumerProperties.createKafkaProperties(kafkaConsumerConfig); + Properties kafkaProperties = KafkaConsumerProperties.createKafkaProperties( + kafkaSpoutConfig.kafkaConsumerConfiguration); // Have to use a different consumer.id for each spout so use the storm taskId. 
Otherwise, // zookeeper complains about a conflicted ephemeral node when there is more than one spout // reading from a topic @@ -100,10 +106,12 @@ public abstract class KafkaSpout extends BaseRichSpout implements Runnable { while (this.shouldContinue) { final ConsumerIterator it = streams.get(0).iterator(); if (it.hasNext()) { - LOG.debug("streams iterator has next"); final byte[] message = it.next().message(); synchronized (this) { this.message = message; + // Wake up getMessage() if it is waiting + if (this.waiting) + notify(); while (this.message != null && this.shouldContinue) try { wait(); @@ -138,22 +146,36 @@ public abstract class KafkaSpout extends BaseRichSpout implements Runnable { } } - private synchronized byte[] getMessage() { + /** + * Must only be called from a synchronized method + * + * @return + */ + private byte[] tryToGetMessage() { final byte[] result = this.message; if (result != null) { this.message = null; notify(); } - else { - // Storm docs recommend a short sleep - try { - Thread.sleep(10); - } catch (InterruptedException e) { - LOG.info("Sleep interrupted", e); - } - } return result; } + private synchronized byte[] getMessage() { + final byte[] result = tryToGetMessage(); + if (result != null) { + return result; + } + // Storm docs recommend a short sleep but make the sleep time + // configurable so we can lessen the load on dev systems + this.waiting = true; + try { + wait(kafkaSpoutConfig.maxWaitTime); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted", e); + } + this.waiting = false; + return tryToGetMessage(); // We might have been woken up because there was a message + } + protected abstract void processMessage(byte[] message, SpoutOutputCollector collector2); } diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java index 03eda73..8fb485e 100644 --- 
a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java @@ -38,7 +38,7 @@ public class MetricSpout extends KafkaSpout { public static final String DEFAULT_TENANT_ID = "TENANT_ID_NOT_SET"; public MetricSpout(MetricSpoutConfig metricSpoutConfig) { - super(metricSpoutConfig.kafkaConsumerConfiguration); + super(metricSpoutConfig); LOG.info("Created"); }