Have to use a different consumer.id for each instance of a KafkaSpout so use the storm taskId. Otherwise, zookeeper complains about a conflicted ephemeral node when there is more than one spout reading from a topic
Have two metric and event spouts to make sure the messages stop coming out. More like the real configuration.
This commit is contained in:
parent
a3429536d2
commit
9050956a3b
@ -1,5 +1,5 @@
|
||||
metricSpoutThreads: 1
|
||||
metricSpoutTasks: 1
|
||||
metricSpoutThreads: 2
|
||||
metricSpoutTasks: 2
|
||||
|
||||
metricSpoutConfig:
|
||||
|
||||
@ -29,8 +29,8 @@ metricSpoutConfig:
|
||||
zookeeperConnectionTimeoutMs : 6000
|
||||
zookeeperSyncTimeMs: 2000
|
||||
|
||||
eventSpoutThreads: 1
|
||||
eventSpoutTasks: 1
|
||||
eventSpoutThreads: 2
|
||||
eventSpoutTasks: 2
|
||||
|
||||
eventSpoutConfig:
|
||||
#Kafka settings.
|
||||
|
@ -53,6 +53,10 @@ public abstract class KafkaSpout extends BaseRichSpout {
|
||||
LOG.info(" topic = " + kafkaConsumerConfig.getTopic());
|
||||
|
||||
Properties kafkaProperties = KafkaConsumerProperties.createKafkaProperties(kafkaConsumerConfig);
|
||||
// Have to use a different consumer.id for each spout so use the storm taskId. Otherwise,
|
||||
// zookeeper complains about a conflicted ephemeral node when there is more than one spout
|
||||
// reading from a topic
|
||||
kafkaProperties.setProperty("consumer.id", String.valueOf(context.getThisTaskId()));
|
||||
ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
|
||||
this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user