diff --git a/src/main/config/mon-thresh-sample-config.yml b/src/main/config/mon-thresh-sample-config.yml
index 0109022..0b0f013 100644
--- a/src/main/config/mon-thresh-sample-config.yml
+++ b/src/main/config/mon-thresh-sample-config.yml
@@ -2,6 +2,7 @@ metricSpoutThreads: 2
 metricSpoutTasks: 2
 
 metricSpoutConfig:
+  maxWaitTime: 500
 
   #Kafka settings.
   kafkaConsumerConfiguration:
@@ -33,6 +34,8 @@ eventSpoutThreads: 2
 eventSpoutTasks: 2
 
 eventSpoutConfig:
+  maxWaitTime: 500
+
   #Kafka settings.
   kafkaConsumerConfiguration:
   # See http://kafka.apache.org/documentation.html#api for semantics and defaults.
diff --git a/src/main/java/com/hpcloud/mon/EventSpoutConfig.java b/src/main/java/com/hpcloud/mon/EventSpoutConfig.java
index 00c830d..9ac0540 100644
--- a/src/main/java/com/hpcloud/mon/EventSpoutConfig.java
+++ b/src/main/java/com/hpcloud/mon/EventSpoutConfig.java
@@ -16,14 +16,7 @@
  */
 package com.hpcloud.mon;
 
-import java.io.Serializable;
-
-import com.hpcloud.configuration.KafkaConsumerConfiguration;
-
-public class EventSpoutConfig implements Serializable {
+public class EventSpoutConfig extends KafkaSpoutConfig {
 
     private static final long serialVersionUID = -8129774848323598123L;
-
-    public KafkaConsumerConfiguration kafkaConsumerConfiguration;
-
 }
diff --git a/src/main/java/com/hpcloud/mon/KafkaSpoutConfig.java b/src/main/java/com/hpcloud/mon/KafkaSpoutConfig.java
new file mode 100644
index 0000000..c3bda2b
--- /dev/null
+++ b/src/main/java/com/hpcloud/mon/KafkaSpoutConfig.java
@@ -0,0 +1,16 @@
+package com.hpcloud.mon;
+
+import java.io.Serializable;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.hpcloud.configuration.KafkaConsumerConfiguration;
+
+public class KafkaSpoutConfig implements Serializable {
+
+    private static final long serialVersionUID = -6477042435089264571L;
+
+    @JsonProperty
+    public Integer maxWaitTime = 100;
+
+    public KafkaConsumerConfiguration kafkaConsumerConfiguration;
+}
diff --git a/src/main/java/com/hpcloud/mon/MetricSpoutConfig.java b/src/main/java/com/hpcloud/mon/MetricSpoutConfig.java
index ddc0eee..eb93ece 100644
--- a/src/main/java/com/hpcloud/mon/MetricSpoutConfig.java
+++ b/src/main/java/com/hpcloud/mon/MetricSpoutConfig.java
@@ -16,13 +16,7 @@
  */
 package com.hpcloud.mon;
 
-import java.io.Serializable;
-
-import com.hpcloud.configuration.*;
-
-public class MetricSpoutConfig implements Serializable {
+public class MetricSpoutConfig extends KafkaSpoutConfig {
 
     private static final long serialVersionUID = -4285448019855024921L;
-
-    public KafkaConsumerConfiguration kafkaConsumerConfiguration;
 }
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventSpout.java
index f1bf097..87826ff 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventSpout.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventSpout.java
@@ -37,7 +37,7 @@ public class EventSpout extends KafkaSpout {
     private final EventDeserializer deserializer;
 
     public EventSpout(EventSpoutConfig configuration, EventDeserializer deserializer) {
-        super(configuration.kafkaConsumerConfiguration);
+        super(configuration);
         this.deserializer = deserializer;
         LOG.info("EventSpout created");
     }
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java
index 4e2a3f7..da30677 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java
@@ -19,13 +19,16 @@ package com.hpcloud.mon.infrastructure.thresholding;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.base.BaseRichSpout;
-import com.hpcloud.configuration.KafkaConsumerConfiguration;
+
 import com.hpcloud.configuration.KafkaConsumerProperties;
+import com.hpcloud.mon.KafkaSpoutConfig;
+
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +42,7 @@ public abstract class KafkaSpout extends BaseRichSpout implements Runnable {
 
     private static final long serialVersionUID = 744004533863562119L;
 
-    private final KafkaConsumerConfiguration kafkaConsumerConfig;
+    private final KafkaSpoutConfig kafkaSpoutConfig;
 
     private transient ConsumerConnector consumerConnector;
 
@@ -55,8 +58,10 @@ public abstract class KafkaSpout extends BaseRichSpout implements Runnable {
 
     private String spoutName;
 
-    protected KafkaSpout(KafkaConsumerConfiguration kafkaConsumerConfig) {
-        this.kafkaConsumerConfig = kafkaConsumerConfig;
+    private boolean waiting = false;
+
+    protected KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) {
+        this.kafkaSpoutConfig = kafkaSpoutConfig;
     }
 
     @Override
@@ -64,9 +69,9 @@ public abstract class KafkaSpout extends BaseRichSpout implements Runnable {
         LOG.info("Activated");
         if (streams == null) {
             Map<String, Integer> topicCountMap = new HashMap<>();
-            topicCountMap.put(kafkaConsumerConfig.getTopic(), new Integer(1));
+            topicCountMap.put(kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic(), new Integer(1));
             Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
-            streams = consumerMap.get(kafkaConsumerConfig.getTopic());
+            streams = consumerMap.get(kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic());
         }
     }
 
@@ -74,10 +79,11 @@ public abstract class KafkaSpout extends BaseRichSpout implements Runnable {
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         LOG.info("Opened");
         this.collector = collector;
-        LOG.info(" topic = " + kafkaConsumerConfig.getTopic());
+        LOG.info(" topic = " + kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic());
         this.spoutName = String.format("%s-%d", context.getThisComponentId(), context.getThisTaskId());
 
-        Properties kafkaProperties = KafkaConsumerProperties.createKafkaProperties(kafkaConsumerConfig);
+        Properties kafkaProperties = KafkaConsumerProperties.createKafkaProperties(
+                kafkaSpoutConfig.kafkaConsumerConfiguration);
         // Have to use a different consumer.id for each spout so use the storm taskId. Otherwise,
         // zookeeper complains about a conflicted ephemeral node when there is more than one spout
         // reading from a topic
@@ -100,10 +106,12 @@ public abstract class KafkaSpout extends BaseRichSpout implements Runnable {
         while (this.shouldContinue) {
             final ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
             if (it.hasNext()) {
-                LOG.debug("streams iterator has next");
                 final byte[] message = it.next().message();
                 synchronized (this) {
                     this.message = message;
+                    // Wake up getMessage() if it is waiting
+                    if (this.waiting)
+                        notify();
                     while (this.message != null && this.shouldContinue)
                         try {
                             wait();
@@ -138,22 +146,36 @@ public abstract class KafkaSpout extends BaseRichSpout implements Runnable {
         }
     }
 
-    private synchronized byte[] getMessage() {
+    /**
+     * Must only be called from a synchronized method
+     *
+     * @return
+     */
+    private byte[] tryToGetMessage() {
         final byte[] result = this.message;
         if (result != null) {
             this.message = null;
             notify();
         }
-        else {
-            // Storm docs recommend a short sleep
-            try {
-                Thread.sleep(10);
-            } catch (InterruptedException e) {
-                LOG.info("Sleep interrupted", e);
-            }
-        }
         return result;
     }
 
+    private synchronized byte[] getMessage() {
+        final byte[] result = tryToGetMessage();
+        if (result != null) {
+            return result;
+        }
+        // Storm docs recommend a short sleep but make the sleep time
+        // configurable so we can lessen the load on dev systems
+        this.waiting = true;
+        try {
+            wait(kafkaSpoutConfig.maxWaitTime);
+        } catch (InterruptedException e) {
+            LOG.info("Sleep interrupted", e);
+        }
+        this.waiting = false;
+        return tryToGetMessage(); // We might have been woken up because there was a message
+    }
+
     protected abstract void processMessage(byte[] message, SpoutOutputCollector collector2);
 }
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java
index 03eda73..8fb485e 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java
@@ -38,7 +38,7 @@ public class MetricSpout extends KafkaSpout {
     public static final String DEFAULT_TENANT_ID = "TENANT_ID_NOT_SET";
 
     public MetricSpout(MetricSpoutConfig metricSpoutConfig) {
-        super(metricSpoutConfig.kafkaConsumerConfiguration);
+        super(metricSpoutConfig);
         LOG.info("Created");
     }