diff --git a/java/src/main/java/monasca/persister/configuration/KafkaConfig.java b/java/src/main/java/monasca/persister/configuration/KafkaConfig.java index ee2102de..6fd1454d 100644 --- a/java/src/main/java/monasca/persister/configuration/KafkaConfig.java +++ b/java/src/main/java/monasca/persister/configuration/KafkaConfig.java @@ -17,8 +17,10 @@ package monasca.persister.configuration; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +@JsonIgnoreProperties(ignoreUnknown=true) public class KafkaConfig { @JsonProperty @@ -26,110 +28,170 @@ public class KafkaConfig { @JsonProperty String zookeeperConnect; + String _zookeeperConnect = "127.0.0.1"; @JsonProperty Integer socketTimeoutMs; + Integer _socketTimeoutMs = 30000; @JsonProperty Integer socketReceiveBufferBytes; + Integer _socketReceiveBufferBytes = 65536; @JsonProperty Integer fetchMessageMaxBytes; + Integer _fetchMessageMaxBytes = 1048576; @JsonProperty Integer queuedMaxMessageChunks; + Integer _queuedMaxMessageChunks = 10; @JsonProperty Integer rebalanceMaxRetries; + Integer _rebalanceMaxRetries = 4; @JsonProperty Integer fetchMinBytes; + Integer _fetchMinBytes = 1; @JsonProperty Integer fetchWaitMaxMs; + Integer _fetchWaitMaxMs = 100; @JsonProperty Integer rebalanceBackoffMs; + Integer _rebalanceBackoffMs = 2000; @JsonProperty Integer refreshLeaderBackoffMs; + Integer _refreshLeaderBackoffMs = 200; @JsonProperty String autoOffsetReset; + String _autoOffsetReset = "largest"; @JsonProperty Integer consumerTimeoutMs; + Integer _consumerTimeoutMs = 1000; @JsonProperty Integer zookeeperSessionTimeoutMs; + Integer _zookeeperSessionTimeoutMs = 60000; @JsonProperty Integer zookeeperConnectionTimeoutMs; + Integer _zookeeperConnectionTimeoutMs = 60000; @JsonProperty Integer zookeeperSyncTimeMs; + Integer _zookeeperSyncTimeMs = 2000; public String getTopic() { return topic; } public String getZookeeperConnect() { + if ( zookeeperConnect == null ) { + return _zookeeperConnect; + } return zookeeperConnect; } public Integer getSocketTimeoutMs() { + if ( socketTimeoutMs == null ) { + return _socketTimeoutMs; + } return socketTimeoutMs; } public Integer getSocketReceiveBufferBytes() { + if ( socketReceiveBufferBytes == null ) { + return _socketReceiveBufferBytes; + } return socketReceiveBufferBytes; } public Integer getFetchMessageMaxBytes() { + if ( fetchMessageMaxBytes == null ) { + return _fetchMessageMaxBytes; + } return fetchMessageMaxBytes; } public Integer getQueuedMaxMessageChunks() { + if ( queuedMaxMessageChunks == null ) { + return _queuedMaxMessageChunks; + } return queuedMaxMessageChunks; } public Integer getRebalanceMaxRetries() { + if ( rebalanceMaxRetries == null ) { + return _rebalanceMaxRetries; + } return rebalanceMaxRetries; } public Integer getFetchMinBytes() { + if ( fetchMinBytes == null ) { + return _fetchMinBytes; + } return fetchMinBytes; } public Integer getFetchWaitMaxMs() { + if ( fetchWaitMaxMs == null ) { + return _fetchWaitMaxMs; + } return fetchWaitMaxMs; } public Integer getRebalanceBackoffMs() { + if ( rebalanceBackoffMs == null ) { + return _rebalanceBackoffMs; + } return rebalanceBackoffMs; } public Integer getRefreshLeaderBackoffMs() { + if ( refreshLeaderBackoffMs == null ) { + return _refreshLeaderBackoffMs; + } return refreshLeaderBackoffMs; } public String getAutoOffsetReset() { + if ( autoOffsetReset == null ) { + return _autoOffsetReset; + } return autoOffsetReset; } public Integer getConsumerTimeoutMs() { + if ( consumerTimeoutMs == null ) { + return _consumerTimeoutMs; + } return consumerTimeoutMs; } public Integer getZookeeperSessionTimeoutMs() { + if ( zookeeperSessionTimeoutMs == null ) { + return _zookeeperSessionTimeoutMs; + } return zookeeperSessionTimeoutMs; } public Integer getZookeeperConnectionTimeoutMs() { + if ( zookeeperConnectionTimeoutMs == null ) { + return _zookeeperConnectionTimeoutMs; + } return zookeeperConnectionTimeoutMs; } public Integer getZookeeperSyncTimeMs() { + if ( zookeeperSyncTimeMs == null ) { + return _zookeeperSyncTimeMs; + } return zookeeperSyncTimeMs; } } diff --git a/java/src/main/java/monasca/persister/configuration/PersisterConfig.java b/java/src/main/java/monasca/persister/configuration/PersisterConfig.java index b8d09a68..9efab2e9 100644 --- a/java/src/main/java/monasca/persister/configuration/PersisterConfig.java +++ b/java/src/main/java/monasca/persister/configuration/PersisterConfig.java @@ -19,6 +19,7 @@ package monasca.persister.configuration; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import monasca.common.configuration.CassandraDbConfiguration; @@ -30,22 +31,30 @@ import io.dropwizard.db.DataSourceFactory; import javax.validation.Valid; import javax.validation.constraints.NotNull; +@JsonIgnoreProperties(ignoreUnknown=true) public class PersisterConfig extends Configuration { @JsonProperty private String name; + private String _name = "monasca-persister"; public String getName() { + if ( name == null ) { + return _name; + } return name; } @JsonProperty @NotNull @Valid - private final PipelineConfig alarmHistoryConfiguration = - new PipelineConfig(); + private final PipelineConfig alarmHistoryConfiguration = new PipelineConfig(); public PipelineConfig getAlarmHistoryConfiguration() { + // Set alarm history configuration specific defaults + alarmHistoryConfiguration.setDefaults("alarm-state-transitions", + "1_alarm-state-transitions", + 1); return alarmHistoryConfiguration; } @@ -54,7 +63,12 @@ public class PersisterConfig extends Configuration { @Valid private final PipelineConfig metricConfiguration = new PipelineConfig(); + public PipelineConfig getMetricConfiguration() { + // Set metric configuration specific defaults + metricConfiguration.setDefaults("metrics", + "1_metrics", + 20000); return metricConfiguration; } diff --git a/java/src/main/java/monasca/persister/configuration/PipelineConfig.java b/java/src/main/java/monasca/persister/configuration/PipelineConfig.java index b5f95016..d578280e 100644 --- a/java/src/main/java/monasca/persister/configuration/PipelineConfig.java +++ b/java/src/main/java/monasca/persister/configuration/PipelineConfig.java @@ -17,35 +17,58 @@ package monasca.persister.configuration; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +@JsonIgnoreProperties(ignoreUnknown=true) public class PipelineConfig { @JsonProperty String topic; + String _topic; // No default: default provided by constructor @JsonProperty String groupId; + String _groupId; // No default: default provided by constructor @JsonProperty String consumerId; + String _consumerId = "monasca-persister"; @JsonProperty String clientId; + String _clientId = "monasca-persister"; @JsonProperty Integer batchSize; + Integer _batchSize; // No default: default provided by constructor @JsonProperty Integer numThreads; + Integer _numThreads = 1; @JsonProperty Integer maxBatchTime; + Integer _maxBatchTime = 10; @JsonProperty Integer commitBatchTime; + Integer _commitBatchTime = 0; + + /** Used to set default values for properties that have different sensible + * defaults for metric and alarm configurations, respectively. + */ + public void setDefaults(String defaultTopic, String defaultGroupId, + Integer defaultBatchSize) { + _batchSize = defaultBatchSize; + _groupId = defaultGroupId; + _topic = defaultTopic; + } public Integer getCommitBatchTime() { + if ( commitBatchTime == null ) { + return _commitBatchTime; + } return commitBatchTime; } @@ -54,10 +77,16 @@ public class PipelineConfig { } public String getTopic() { + if ( topic == null ) { + return _topic; + } return topic; } public String getGroupId() { + if ( groupId == null ) { + return _groupId; + } return groupId; } @@ -66,6 +95,9 @@ public class PipelineConfig { } public String getConsumerId() { + if ( consumerId == null ) { + return _consumerId; + } return consumerId; } @@ -74,6 +106,9 @@ public class PipelineConfig { } public String getClientId() { + if ( clientId == null ) { + return _clientId; + } return clientId; } @@ -94,14 +129,23 @@ public class PipelineConfig { } public Integer getBatchSize() { + if ( batchSize == null ) { + return _batchSize; + } return batchSize; } public Integer getNumThreads() { + if ( numThreads == null ) { + return _numThreads; + } return numThreads; } public Integer getMaxBatchTime() { + if ( maxBatchTime == null ) { + return _maxBatchTime; + } return maxBatchTime; } }