diff --git a/src/main/config/mon-thresh-sample-config.yml b/src/main/config/mon-thresh-sample-config.yml index ebca360..4c3f6aa 100644 --- a/src/main/config/mon-thresh-sample-config.yml +++ b/src/main/config/mon-thresh-sample-config.yml @@ -1,5 +1,5 @@ -metricSpoutThreads: 1 -metricSpoutTasks: 1 +metricSpoutThreads: 2 +metricSpoutTasks: 2 metricSpoutConfig: @@ -29,8 +29,8 @@ metricSpoutConfig: zookeeperConnectionTimeoutMs : 6000 zookeeperSyncTimeMs: 2000 -eventSpoutThreads: 1 -eventSpoutTasks: 1 +eventSpoutThreads: 2 +eventSpoutTasks: 2 eventSpoutConfig: #Kafka settings. diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java index 028c7f9..5a1ae07 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java @@ -53,6 +53,10 @@ public abstract class KafkaSpout extends BaseRichSpout { LOG.info(" topic = " + kafkaConsumerConfig.getTopic()); Properties kafkaProperties = KafkaConsumerProperties.createKafkaProperties(kafkaConsumerConfig); + // Have to use a different consumer.id for each spout so use the storm taskId. Otherwise, + // zookeeper complains about a conflicted ephemeral node when there is more than one spout + // reading from a topic + kafkaProperties.setProperty("consumer.id", String.valueOf(context.getThisTaskId())); ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties); this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); }