Merge "Java persister config: defaults and robustness"
This commit is contained in:
commit
daaf71a4d2
@ -17,8 +17,10 @@
|
||||
|
||||
package monasca.persister.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
@JsonIgnoreProperties(ignoreUnknown=true)
|
||||
public class KafkaConfig {
|
||||
|
||||
@JsonProperty
|
||||
@ -26,110 +28,170 @@ public class KafkaConfig {
|
||||
|
||||
@JsonProperty
|
||||
String zookeeperConnect;
|
||||
String _zookeeperConnect = "127.0.0.1";
|
||||
|
||||
@JsonProperty
|
||||
Integer socketTimeoutMs;
|
||||
Integer _socketTimeoutMs = 30000;
|
||||
|
||||
@JsonProperty
|
||||
Integer socketReceiveBufferBytes;
|
||||
Integer _socketReceiveBufferBytes = 65536;
|
||||
|
||||
@JsonProperty
|
||||
Integer fetchMessageMaxBytes;
|
||||
Integer _fetchMessageMaxBytes = 1048576;
|
||||
|
||||
@JsonProperty
|
||||
Integer queuedMaxMessageChunks;
|
||||
Integer _queuedMaxMessageChunks = 10;
|
||||
|
||||
@JsonProperty
|
||||
Integer rebalanceMaxRetries;
|
||||
Integer _rebalanceMaxRetries = 4;
|
||||
|
||||
@JsonProperty
|
||||
Integer fetchMinBytes;
|
||||
Integer _fetchMinBytes = 1;
|
||||
|
||||
@JsonProperty
|
||||
Integer fetchWaitMaxMs;
|
||||
Integer _fetchWaitMaxMs = 100;
|
||||
|
||||
@JsonProperty
|
||||
Integer rebalanceBackoffMs;
|
||||
Integer _rebalanceBackoffMs = 2000;
|
||||
|
||||
@JsonProperty
|
||||
Integer refreshLeaderBackoffMs;
|
||||
Integer _refreshLeaderBackoffMs = 200;
|
||||
|
||||
@JsonProperty
|
||||
String autoOffsetReset;
|
||||
String _autoOffsetReset = "largest";
|
||||
|
||||
@JsonProperty
|
||||
Integer consumerTimeoutMs;
|
||||
Integer _consumerTimeoutMs = 1000;
|
||||
|
||||
@JsonProperty
|
||||
Integer zookeeperSessionTimeoutMs;
|
||||
Integer _zookeeperSessionTimeoutMs = 60000;
|
||||
|
||||
@JsonProperty
|
||||
Integer zookeeperConnectionTimeoutMs;
|
||||
Integer _zookeeperConnectionTimeoutMs = 60000;
|
||||
|
||||
@JsonProperty
|
||||
Integer zookeeperSyncTimeMs;
|
||||
Integer _zookeeperSyncTimeMs = 2000;
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
public String getZookeeperConnect() {
|
||||
if ( zookeeperConnect == null ) {
|
||||
return _zookeeperConnect;
|
||||
}
|
||||
return zookeeperConnect;
|
||||
}
|
||||
|
||||
public Integer getSocketTimeoutMs() {
|
||||
if ( socketTimeoutMs == null ) {
|
||||
return _socketTimeoutMs;
|
||||
}
|
||||
return socketTimeoutMs;
|
||||
}
|
||||
|
||||
public Integer getSocketReceiveBufferBytes() {
|
||||
if ( socketReceiveBufferBytes == null ) {
|
||||
return _socketReceiveBufferBytes;
|
||||
}
|
||||
return socketReceiveBufferBytes;
|
||||
}
|
||||
|
||||
public Integer getFetchMessageMaxBytes() {
|
||||
if ( fetchMessageMaxBytes == null ) {
|
||||
return _fetchMessageMaxBytes;
|
||||
}
|
||||
return fetchMessageMaxBytes;
|
||||
}
|
||||
|
||||
public Integer getQueuedMaxMessageChunks() {
|
||||
if ( queuedMaxMessageChunks == null ) {
|
||||
return _queuedMaxMessageChunks;
|
||||
}
|
||||
return queuedMaxMessageChunks;
|
||||
}
|
||||
|
||||
public Integer getRebalanceMaxRetries() {
|
||||
if ( rebalanceMaxRetries == null ) {
|
||||
return _rebalanceMaxRetries;
|
||||
}
|
||||
return rebalanceMaxRetries;
|
||||
}
|
||||
|
||||
public Integer getFetchMinBytes() {
|
||||
if ( fetchMinBytes == null ) {
|
||||
return _fetchMinBytes;
|
||||
}
|
||||
return fetchMinBytes;
|
||||
}
|
||||
|
||||
public Integer getFetchWaitMaxMs() {
|
||||
if ( fetchWaitMaxMs == null ) {
|
||||
return _fetchWaitMaxMs;
|
||||
}
|
||||
return fetchWaitMaxMs;
|
||||
}
|
||||
|
||||
public Integer getRebalanceBackoffMs() {
|
||||
if ( rebalanceBackoffMs == null ) {
|
||||
return _rebalanceBackoffMs;
|
||||
}
|
||||
return rebalanceBackoffMs;
|
||||
}
|
||||
|
||||
public Integer getRefreshLeaderBackoffMs() {
|
||||
if ( refreshLeaderBackoffMs == null ) {
|
||||
return _refreshLeaderBackoffMs;
|
||||
}
|
||||
return refreshLeaderBackoffMs;
|
||||
}
|
||||
|
||||
public String getAutoOffsetReset() {
|
||||
if ( autoOffsetReset == null ) {
|
||||
return _autoOffsetReset;
|
||||
}
|
||||
return autoOffsetReset;
|
||||
}
|
||||
|
||||
public Integer getConsumerTimeoutMs() {
|
||||
if ( consumerTimeoutMs == null ) {
|
||||
return _consumerTimeoutMs;
|
||||
}
|
||||
return consumerTimeoutMs;
|
||||
}
|
||||
|
||||
public Integer getZookeeperSessionTimeoutMs() {
|
||||
if ( zookeeperSessionTimeoutMs == null ) {
|
||||
return _zookeeperSessionTimeoutMs;
|
||||
}
|
||||
return zookeeperSessionTimeoutMs;
|
||||
}
|
||||
|
||||
public Integer getZookeeperConnectionTimeoutMs() {
|
||||
if ( zookeeperConnectionTimeoutMs == null ) {
|
||||
return _zookeeperConnectionTimeoutMs;
|
||||
}
|
||||
return zookeeperConnectionTimeoutMs;
|
||||
}
|
||||
|
||||
public Integer getZookeeperSyncTimeMs() {
|
||||
if ( zookeeperSyncTimeMs == null ) {
|
||||
return _zookeeperSyncTimeMs;
|
||||
}
|
||||
return zookeeperSyncTimeMs;
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package monasca.persister.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import monasca.common.configuration.CassandraDbConfiguration;
|
||||
@ -30,22 +31,30 @@ import io.dropwizard.db.DataSourceFactory;
|
||||
import javax.validation.Valid;
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
@JsonIgnoreProperties(ignoreUnknown=true)
|
||||
public class PersisterConfig extends Configuration {
|
||||
|
||||
@JsonProperty
|
||||
private String name;
|
||||
private String _name = "monasca-persister";
|
||||
|
||||
public String getName() {
|
||||
if ( name == null ) {
|
||||
return _name;
|
||||
}
|
||||
return name;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
@Valid
|
||||
private final PipelineConfig alarmHistoryConfiguration =
|
||||
new PipelineConfig();
|
||||
private final PipelineConfig alarmHistoryConfiguration = new PipelineConfig();
|
||||
|
||||
public PipelineConfig getAlarmHistoryConfiguration() {
|
||||
// Set alarm history configuration specific defaults
|
||||
alarmHistoryConfiguration.setDefaults("alarm-state-transitions",
|
||||
"1_alarm-state-transitions",
|
||||
1);
|
||||
return alarmHistoryConfiguration;
|
||||
}
|
||||
|
||||
@ -54,7 +63,12 @@ public class PersisterConfig extends Configuration {
|
||||
@Valid
|
||||
private final PipelineConfig metricConfiguration = new PipelineConfig();
|
||||
|
||||
|
||||
public PipelineConfig getMetricConfiguration() {
|
||||
// Set metric configuration specific defaults
|
||||
metricConfiguration.setDefaults("metrics",
|
||||
"1_metrics",
|
||||
20000);
|
||||
return metricConfiguration;
|
||||
}
|
||||
|
||||
|
@ -17,35 +17,58 @@
|
||||
|
||||
package monasca.persister.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
@JsonIgnoreProperties(ignoreUnknown=true)
|
||||
public class PipelineConfig {
|
||||
|
||||
@JsonProperty
|
||||
String topic;
|
||||
String _topic; // No default: default provided by constructor
|
||||
|
||||
@JsonProperty
|
||||
String groupId;
|
||||
String _groupId; // No default: default provided by constructor
|
||||
|
||||
@JsonProperty
|
||||
String consumerId;
|
||||
String _consumerId = "monasca-persister";
|
||||
|
||||
@JsonProperty
|
||||
String clientId;
|
||||
String _clientId = "monasca-persister";
|
||||
|
||||
@JsonProperty
|
||||
Integer batchSize;
|
||||
Integer _batchSize; // No default: default provided by constructor
|
||||
|
||||
@JsonProperty
|
||||
Integer numThreads;
|
||||
Integer _numThreads = 1;
|
||||
|
||||
@JsonProperty
|
||||
Integer maxBatchTime;
|
||||
Integer _maxBatchTime = 10;
|
||||
|
||||
@JsonProperty
|
||||
Integer commitBatchTime;
|
||||
Integer _commitBatchTime = 0;
|
||||
|
||||
/** Used to set default values for properties that have different sensible
|
||||
* defaults for metric and alarm configurations, respectively.
|
||||
*/
|
||||
public void setDefaults(String defaultTopic, String defaultGroupId,
|
||||
Integer defaultBatchSize) {
|
||||
_batchSize = defaultBatchSize;
|
||||
_groupId = defaultGroupId;
|
||||
_topic = defaultTopic;
|
||||
}
|
||||
|
||||
public Integer getCommitBatchTime() {
|
||||
if ( commitBatchTime == null ) {
|
||||
return _commitBatchTime;
|
||||
}
|
||||
return commitBatchTime;
|
||||
}
|
||||
|
||||
@ -54,10 +77,16 @@ public class PipelineConfig {
|
||||
}
|
||||
|
||||
public String getTopic() {
|
||||
if ( topic == null ) {
|
||||
return _topic;
|
||||
}
|
||||
return topic;
|
||||
}
|
||||
|
||||
public String getGroupId() {
|
||||
if ( groupId == null ) {
|
||||
return _groupId;
|
||||
}
|
||||
return groupId;
|
||||
}
|
||||
|
||||
@ -66,6 +95,9 @@ public class PipelineConfig {
|
||||
}
|
||||
|
||||
public String getConsumerId() {
|
||||
if ( consumerId == null ) {
|
||||
return _consumerId;
|
||||
}
|
||||
return consumerId;
|
||||
}
|
||||
|
||||
@ -74,6 +106,9 @@ public class PipelineConfig {
|
||||
}
|
||||
|
||||
public String getClientId() {
|
||||
if ( clientId == null ) {
|
||||
return _clientId;
|
||||
}
|
||||
return clientId;
|
||||
}
|
||||
|
||||
@ -94,14 +129,23 @@ public class PipelineConfig {
|
||||
}
|
||||
|
||||
public Integer getBatchSize() {
|
||||
if ( batchSize == null ) {
|
||||
return _batchSize;
|
||||
}
|
||||
return batchSize;
|
||||
}
|
||||
|
||||
public Integer getNumThreads() {
|
||||
if ( numThreads == null ) {
|
||||
return _numThreads;
|
||||
}
|
||||
return numThreads;
|
||||
}
|
||||
|
||||
public Integer getMaxBatchTime() {
|
||||
if ( maxBatchTime == null ) {
|
||||
return _maxBatchTime;
|
||||
}
|
||||
return maxBatchTime;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user