Add Influxdb 0.9.0 support

Shorten 'Repository' to 'Repo'
Shorten 'Configuration' to 'Config'

Change-Id: If07b2b307a010f5742d3ccbe0f43965bf7284991
This commit is contained in:
Deklan Dieterly 2015-02-05 09:21:10 -07:00
parent 379f0b8e60
commit 895c3aa8c7
33 changed files with 1192 additions and 617 deletions

@ -99,6 +99,11 @@
<artifactId>influxdb-java</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.4</version>
</dependency>
</dependencies>
@ -146,7 +151,7 @@
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>monasca.persister.MonPersisterApplication
<mainClass>monasca.persister.PersisterApplication
</mainClass>
</transformer>
</transformers>

@ -17,7 +17,7 @@
package monasca.persister;
import monasca.persister.configuration.MonPersisterConfiguration;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.consumer.AlarmStateTransitionConsumer;
import monasca.persister.consumer.AlarmStateTransitionConsumerFactory;
import monasca.persister.consumer.KafkaAlarmStateTransitionConsumer;
@ -47,15 +47,15 @@ import io.dropwizard.setup.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MonPersisterApplication extends Application<MonPersisterConfiguration> {
private static final Logger logger = LoggerFactory.getLogger(MonPersisterApplication.class);
public class PersisterApplication extends Application<PersisterConfig> {
private static final Logger logger = LoggerFactory.getLogger(PersisterApplication.class);
public static void main(String[] args) throws Exception {
new MonPersisterApplication().run(args);
new PersisterApplication().run(args);
}
@Override
public void initialize(Bootstrap<MonPersisterConfiguration> bootstrap) {
public void initialize(Bootstrap<PersisterConfig> bootstrap) {
}
@Override
@ -64,10 +64,10 @@ public class MonPersisterApplication extends Application<MonPersisterConfigurati
}
@Override
public void run(MonPersisterConfiguration configuration, Environment environment)
public void run(PersisterConfig configuration, Environment environment)
throws Exception {
Injector injector = Guice.createInjector(new MonPersisterModule(configuration, environment));
Injector injector = Guice.createInjector(new PersisterModule(configuration, environment));
// Sample resource.
environment.jersey().register(new Resource());
@ -109,7 +109,7 @@ public class MonPersisterApplication extends Application<MonPersisterConfigurati
}
}
private MetricPipeline getMetricPipeline(MonPersisterConfiguration configuration, int threadNum,
private MetricPipeline getMetricPipeline(PersisterConfig configuration, int threadNum,
Injector injector) {
logger.debug("Creating metric pipeline...");
@ -130,7 +130,7 @@ public class MonPersisterApplication extends Application<MonPersisterConfigurati
}
public AlarmStateTransitionPipeline getAlarmStateHistoryPipeline(
MonPersisterConfiguration configuration, int threadNum, Injector injector) {
PersisterConfig configuration, int threadNum, Injector injector) {
logger.debug("Creating alarm state history pipeline...");

@ -17,7 +17,14 @@
package monasca.persister;
import monasca.persister.configuration.MonPersisterConfiguration;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import org.skife.jdbi.v2.DBI;
import io.dropwizard.setup.Environment;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.consumer.AlarmStateTransitionConsumer;
import monasca.persister.consumer.AlarmStateTransitionConsumerFactory;
import monasca.persister.consumer.KafkaAlarmStateTransitionConsumer;
@ -41,36 +48,37 @@ import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandler;
import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory;
import monasca.persister.pipeline.event.MetricHandler;
import monasca.persister.pipeline.event.MetricHandlerFactory;
import monasca.persister.repository.AlarmRepository;
import monasca.persister.repository.InfluxDBAlarmRepository;
import monasca.persister.repository.InfluxDBMetricRepository;
import monasca.persister.repository.MetricRepository;
import monasca.persister.repository.VerticaAlarmRepository;
import monasca.persister.repository.VerticaMetricRepository;
import monasca.persister.repository.AlarmRepo;
import monasca.persister.repository.InfluxV8AlarmRepo;
import monasca.persister.repository.InfluxV8MetricRepo;
import monasca.persister.repository.InfluxV8RepoWriter;
import monasca.persister.repository.InfluxV9AlarmRepo;
import monasca.persister.repository.InfluxV9MetricRepo;
import monasca.persister.repository.InfluxV9RepoWriter;
import monasca.persister.repository.MetricRepo;
import monasca.persister.repository.VerticaAlarmRepo;
import monasca.persister.repository.VerticaMetricRepo;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
public class PersisterModule extends AbstractModule {
import io.dropwizard.setup.Environment;
private static final String VERTICA = "vertica";
private static final String INFLUXDB = "influxdb";
private static final String INFLUXDB_V8 = "v8";
private static final String INFLUXDB_V9 = "v9";
import org.skife.jdbi.v2.DBI;
private final PersisterConfig config;
private final Environment env;
public class MonPersisterModule extends AbstractModule {
private final MonPersisterConfiguration configuration;
private final Environment environment;
public MonPersisterModule(MonPersisterConfiguration configuration, Environment environment) {
this.configuration = configuration;
this.environment = environment;
public PersisterModule(PersisterConfig config, Environment env) {
this.config = config;
this.env = env;
}
@Override
protected void configure() {
bind(MonPersisterConfiguration.class).toInstance(configuration);
bind(Environment.class).toInstance(environment);
bind(PersisterConfig.class).toInstance(config);
bind(Environment.class).toInstance(env);
install(new FactoryModuleBuilder().implement(MetricHandler.class, MetricHandler.class).build(
MetricHandlerFactory.class));
@ -121,19 +129,46 @@ public class MonPersisterModule extends AbstractModule {
install(new FactoryModuleBuilder().implement(KafkaChannel.class, KafkaChannel.class).build(
KafkaChannelFactory.class));
if (configuration.getDatabaseConfiguration().getDatabaseType().equals("vertica")) {
if (config.getDatabaseConfiguration().getDatabaseType().equalsIgnoreCase(VERTICA)) {
bind(DBI.class).toProvider(DBIProvider.class).in(Scopes.SINGLETON);
bind(MetricRepository.class).to(VerticaMetricRepository.class);
bind(AlarmRepository.class).to(VerticaAlarmRepository.class);
} else if (configuration.getDatabaseConfiguration().getDatabaseType().equals("influxdb")) {
bind(MetricRepository.class).to(InfluxDBMetricRepository.class);
bind(AlarmRepository.class).to(InfluxDBAlarmRepository.class);
bind(MetricRepo.class).to(VerticaMetricRepo.class);
bind(AlarmRepo.class).to(VerticaAlarmRepo.class);
} else if (config.getDatabaseConfiguration().getDatabaseType().equalsIgnoreCase(INFLUXDB)) {
// Check for null to not break existing configs. If no version, default to V8.
if (config.getInfluxDBConfiguration().getVersion() == null ||
config.getInfluxDBConfiguration().getVersion().equalsIgnoreCase(INFLUXDB_V8)) {
bind (InfluxV8RepoWriter.class);
bind(MetricRepo.class).to(InfluxV8MetricRepo.class);
bind(AlarmRepo.class).to(InfluxV8AlarmRepo.class);
} else if (config.getInfluxDBConfiguration().getVersion().equalsIgnoreCase(INFLUXDB_V9)) {
bind(InfluxV9RepoWriter.class);
bind(MetricRepo.class).to(InfluxV9MetricRepo.class);
bind(AlarmRepo.class).to(InfluxV9AlarmRepo.class);
} else {
System.err.println(
"Found unknown Influxdb version: " + config.getInfluxDBConfiguration().getVersion());
System.err.println("Supported Influxdb versions are 'v8' and 'v9'");
System.err.println("Check your config file");
System.exit(1);
}
} else {
System.out.println("Unknown database type encountered: "
+ configuration.getDatabaseConfiguration().getDatabaseType());
System.out.println("Supported databases are 'vertica' and 'influxdb'");
System.out.println("Check your config file.");
System.err.println(
"Found unknown database type: " + config.getDatabaseConfiguration().getDatabaseType());
System.err.println("Supported databases are 'vertica' and 'influxdb'");
System.err.println("Check your config file.");
System.exit(1);
}
}
}

@ -22,7 +22,7 @@ package monasca.persister.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
public class KafkaConfiguration {
public class KafkaConfig {
@JsonProperty
String topic;

@ -30,7 +30,7 @@ import io.dropwizard.db.DataSourceFactory;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
public class MonPersisterConfiguration extends Configuration {
public class PersisterConfig extends Configuration {
@JsonProperty
private String name;
@ -42,29 +42,29 @@ public class MonPersisterConfiguration extends Configuration {
@JsonProperty
@NotNull
@Valid
private final PipelineConfiguration alarmHistoryConfiguration =
new PipelineConfiguration();
private final PipelineConfig alarmHistoryConfiguration =
new PipelineConfig();
public PipelineConfiguration getAlarmHistoryConfiguration() {
public PipelineConfig getAlarmHistoryConfiguration() {
return alarmHistoryConfiguration;
}
@JsonProperty
@NotNull
@Valid
private final PipelineConfiguration metricConfiguration = new PipelineConfiguration();
private final PipelineConfig metricConfiguration = new PipelineConfig();
public PipelineConfiguration getMetricConfiguration() {
public PipelineConfig getMetricConfiguration() {
return metricConfiguration;
}
@Valid
@NotNull
@JsonProperty
private final KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
private final KafkaConfig kafkaConfig = new KafkaConfig();
public KafkaConfiguration getKafkaConfiguration() {
return kafkaConfiguration;
public KafkaConfig getKafkaConfig() {
return kafkaConfig;
}
@JsonProperty
@ -77,11 +77,11 @@ public class MonPersisterConfiguration extends Configuration {
@Valid
@NotNull
@JsonProperty
private final VerticaMetricRepositoryConfiguration verticaMetricRepositoryConfiguration =
new VerticaMetricRepositoryConfiguration();
private final VerticaMetricRepoConfig verticaMetricRepoConfig =
new VerticaMetricRepoConfig();
public VerticaMetricRepositoryConfiguration getVerticaMetricRepositoryConfiguration() {
return verticaMetricRepositoryConfiguration;
public VerticaMetricRepoConfig getVerticaMetricRepoConfig() {
return verticaMetricRepoConfig;
}
@Valid

@ -19,7 +19,7 @@ package monasca.persister.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
public class PipelineConfiguration {
public class PipelineConfig {
@JsonProperty
String topic;

@ -22,7 +22,7 @@ package monasca.persister.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
public class VerticaMetricRepositoryConfiguration {
public class VerticaMetricRepoConfig {
@JsonProperty
Integer maxCacheSize;

@ -17,9 +17,9 @@
package monasca.persister.consumer;
import monasca.persister.configuration.KafkaConfiguration;
import monasca.persister.configuration.MonPersisterConfiguration;
import monasca.persister.configuration.PipelineConfiguration;
import monasca.persister.configuration.KafkaConfig;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.configuration.PipelineConfig;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@ -46,12 +46,12 @@ public class KafkaChannel {
private final int threadNum;
@Inject
public KafkaChannel(@Assisted MonPersisterConfiguration configuration,
@Assisted PipelineConfiguration pipelineConfiguration, @Assisted int threadNum) {
this.topic = pipelineConfiguration.getTopic();
public KafkaChannel(@Assisted PersisterConfig configuration,
@Assisted PipelineConfig pipelineConfig, @Assisted int threadNum) {
this.topic = pipelineConfig.getTopic();
this.threadNum = threadNum;
Properties kafkaProperties =
createKafkaProperties(configuration.getKafkaConfiguration(), pipelineConfiguration);
createKafkaProperties(configuration.getKafkaConfig(), pipelineConfig);
consumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig(kafkaProperties));
}
@ -80,38 +80,38 @@ public class KafkaChannel {
return new ConsumerConfig(kafkaProperties);
}
private Properties createKafkaProperties(KafkaConfiguration kafkaConfiguration,
final PipelineConfiguration pipelineConfiguration) {
private Properties createKafkaProperties(KafkaConfig kafkaConfig,
final PipelineConfig pipelineConfig) {
Properties properties = new Properties();
properties.put("group.id", pipelineConfiguration.getGroupId());
properties.put("zookeeper.connect", kafkaConfiguration.getZookeeperConnect());
properties.put("group.id", pipelineConfig.getGroupId());
properties.put("zookeeper.connect", kafkaConfig.getZookeeperConnect());
properties.put("consumer.id",
String.format("%s_%d", pipelineConfiguration.getConsumerId(), this.threadNum));
properties.put("socket.timeout.ms", kafkaConfiguration.getSocketTimeoutMs().toString());
properties.put("socket.receive.buffer.bytes", kafkaConfiguration.getSocketReceiveBufferBytes()
String.format("%s_%d", pipelineConfig.getConsumerId(), this.threadNum));
properties.put("socket.timeout.ms", kafkaConfig.getSocketTimeoutMs().toString());
properties.put("socket.receive.buffer.bytes", kafkaConfig.getSocketReceiveBufferBytes()
.toString());
properties.put("fetch.message.max.bytes", kafkaConfiguration.getFetchMessageMaxBytes()
properties.put("fetch.message.max.bytes", kafkaConfig.getFetchMessageMaxBytes()
.toString());
// Set auto commit to false because the persister is going to explicitly commit
properties.put("auto.commit.enable", "false");
properties.put("queued.max.message.chunks", kafkaConfiguration.getQueuedMaxMessageChunks()
properties.put("queued.max.message.chunks", kafkaConfig.getQueuedMaxMessageChunks()
.toString());
properties.put("rebalance.max.retries", kafkaConfiguration.getRebalanceMaxRetries().toString());
properties.put("fetch.min.bytes", kafkaConfiguration.getFetchMinBytes().toString());
properties.put("fetch.wait.max.ms", kafkaConfiguration.getFetchWaitMaxMs().toString());
properties.put("rebalance.backoff.ms", kafkaConfiguration.getRebalanceBackoffMs().toString());
properties.put("refresh.leader.backoff.ms", kafkaConfiguration.getRefreshLeaderBackoffMs()
properties.put("rebalance.max.retries", kafkaConfig.getRebalanceMaxRetries().toString());
properties.put("fetch.min.bytes", kafkaConfig.getFetchMinBytes().toString());
properties.put("fetch.wait.max.ms", kafkaConfig.getFetchWaitMaxMs().toString());
properties.put("rebalance.backoff.ms", kafkaConfig.getRebalanceBackoffMs().toString());
properties.put("refresh.leader.backoff.ms", kafkaConfig.getRefreshLeaderBackoffMs()
.toString());
properties.put("auto.offset.reset", kafkaConfiguration.getAutoOffsetReset());
properties.put("consumer.timeout.ms", kafkaConfiguration.getConsumerTimeoutMs().toString());
properties.put("client.id", String.format("%s_%d", pipelineConfiguration.getClientId(), threadNum));
properties.put("zookeeper.session.timeout.ms", kafkaConfiguration
properties.put("auto.offset.reset", kafkaConfig.getAutoOffsetReset());
properties.put("consumer.timeout.ms", kafkaConfig.getConsumerTimeoutMs().toString());
properties.put("client.id", String.format("%s_%d", pipelineConfig.getClientId(), threadNum));
properties.put("zookeeper.session.timeout.ms", kafkaConfig
.getZookeeperSessionTimeoutMs().toString());
properties.put("zookeeper.connection.timeout.ms", kafkaConfiguration
properties.put("zookeeper.connection.timeout.ms", kafkaConfig
.getZookeeperConnectionTimeoutMs().toString());
properties
.put("zookeeper.sync.time.ms", kafkaConfiguration.getZookeeperSyncTimeMs().toString());
.put("zookeeper.sync.time.ms", kafkaConfig.getZookeeperSyncTimeMs().toString());
for (String key : properties.stringPropertyNames()) {
logger.info(KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key));

@ -17,10 +17,10 @@
package monasca.persister.consumer;
import monasca.persister.configuration.MonPersisterConfiguration;
import monasca.persister.configuration.PipelineConfiguration;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.configuration.PipelineConfig;
public interface KafkaChannelFactory {
KafkaChannel create(MonPersisterConfiguration configuration,
PipelineConfiguration pipelineConfiguration, int threadNum);
KafkaChannel create(PersisterConfig configuration,
PipelineConfig pipelineConfig, int threadNum);
}

@ -17,7 +17,7 @@
package monasca.persister.dbi;
import monasca.persister.configuration.MonPersisterConfiguration;
import monasca.persister.configuration.PersisterConfig;
import com.google.inject.ProvisionException;
@ -32,10 +32,10 @@ import javax.inject.Provider;
public class DBIProvider implements Provider<DBI> {
private final Environment environment;
private final MonPersisterConfiguration configuration;
private final PersisterConfig configuration;
@Inject
public DBIProvider(Environment environment, MonPersisterConfiguration configuration) {
public DBIProvider(Environment environment, PersisterConfig configuration) {
this.environment = environment;
this.configuration = configuration;
}

@ -18,8 +18,8 @@
package monasca.persister.pipeline.event;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.configuration.PipelineConfiguration;
import monasca.persister.repository.AlarmRepository;
import monasca.persister.configuration.PipelineConfig;
import monasca.persister.repository.AlarmRepo;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@ -35,12 +35,12 @@ public class AlarmStateTransitionedEventHandler extends
private static final Logger logger = LoggerFactory
.getLogger(AlarmStateTransitionedEventHandler.class);
private final AlarmRepository repository;
private final AlarmRepo repository;
private final int ordinal;
@Inject
public AlarmStateTransitionedEventHandler(AlarmRepository repository,
@Assisted PipelineConfiguration configuration, Environment environment,
public AlarmStateTransitionedEventHandler(AlarmRepo repository,
@Assisted PipelineConfig configuration, Environment environment,
@Assisted("ordinal") int ordinal,
@Assisted("batchSize") int batchSize) {
super(configuration, environment, ordinal, batchSize,

@ -17,11 +17,11 @@
package monasca.persister.pipeline.event;
import monasca.persister.configuration.PipelineConfiguration;
import monasca.persister.configuration.PipelineConfig;
import com.google.inject.assistedinject.Assisted;
public interface AlarmStateTransitionedEventHandlerFactory {
AlarmStateTransitionedEventHandler create(PipelineConfiguration configuration,
AlarmStateTransitionedEventHandler create(PipelineConfig configuration,
@Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize);
}

@ -17,7 +17,7 @@
package monasca.persister.pipeline.event;
import monasca.persister.configuration.PipelineConfiguration;
import monasca.persister.configuration.PipelineConfig;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
@ -45,7 +45,7 @@ public abstract class FlushableHandler<T> {
private final Meter commitMeter;
private final Timer commitTimer;
protected FlushableHandler(PipelineConfiguration configuration, Environment environment,
protected FlushableHandler(PipelineConfig configuration, Environment environment,
int ordinal, int batchSize, String baseName) {
this.handlerName = String.format("%s[%d]", baseName, ordinal);

@ -35,8 +35,8 @@ import java.util.TimeZone;
import java.util.TreeMap;
import io.dropwizard.setup.Environment;
import monasca.persister.configuration.PipelineConfiguration;
import monasca.persister.repository.MetricRepository;
import monasca.persister.configuration.PipelineConfig;
import monasca.persister.repository.MetricRepo;
import monasca.persister.repository.Sha1HashId;
import static monasca.persister.repository.VerticaMetricsConstants.MAX_COLUMN_LENGTH;
@ -51,7 +51,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
private final SimpleDateFormat simpleDateFormat;
private final MetricRepository verticaMetricRepository;
private final MetricRepo verticaMetricRepo;
private final Counter metricCounter;
private final Counter definitionCounter;
@ -59,12 +59,12 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
private final Counter definitionDimensionsCounter;
@Inject
public MetricHandler(MetricRepository metricRepository,
@Assisted PipelineConfiguration configuration, Environment environment,
public MetricHandler(MetricRepo metricRepo,
@Assisted PipelineConfig configuration, Environment environment,
@Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize) {
super(configuration, environment, ordinal, batchSize, MetricHandler.class.getName());
final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal);
this.verticaMetricRepository = metricRepository;
this.verticaMetricRepo = metricRepo;
this.metricCounter =
environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter");
this.definitionCounter =
@ -129,7 +129,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
definitionIdStringToHash.append(trunc(region, MAX_COLUMN_LENGTH));
byte[] definitionIdSha1Hash = DigestUtils.sha(definitionIdStringToHash.toString());
Sha1HashId definitionSha1HashId = new Sha1HashId((definitionIdSha1Hash));
verticaMetricRepository
verticaMetricRepo
.addDefinitionToBatch(definitionSha1HashId, trunc(metric.getName(), MAX_COLUMN_LENGTH),
trunc(tenantId, MAX_COLUMN_LENGTH), trunc(region, MAX_COLUMN_LENGTH));
definitionCounter.inc();
@ -146,7 +146,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
// Add the dimension name/values to the batch.
for (Map.Entry<String, String> entry : preppedDimMap.entrySet()) {
verticaMetricRepository
verticaMetricRepo
.addDimensionToBatch(dimensionsSha1HashId, entry.getKey(), entry.getValue());
dimensionCounter.inc();
}
@ -160,7 +160,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
definitionDimensionsIdSha1Hash =
DigestUtils.sha(definitionDimensionsIdStringToHash.toString());
Sha1HashId definitionDimensionsSha1HashId = new Sha1HashId(definitionDimensionsIdSha1Hash);
verticaMetricRepository
verticaMetricRepo
.addDefinitionDimensionToBatch(definitionDimensionsSha1HashId, definitionSha1HashId,
dimensionsSha1HashId);
definitionDimensionsCounter.inc();
@ -172,7 +172,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
for (double[] timeValuePairs : metric.getTimeValues()) {
String timeStamp = simpleDateFormat.format(new Date((long) (timeValuePairs[0] * 1000)));
double value = timeValuePairs[1];
verticaMetricRepository.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
verticaMetricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
metricCounter.inc();
metricCount++;
}
@ -181,7 +181,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
{
String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp() * 1000));
double value = metric.getValue();
verticaMetricRepository.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
verticaMetricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
metricCounter.inc();
metricCount++;
}
@ -191,7 +191,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
@Override
public void flushRepository() {
verticaMetricRepository.flush();
verticaMetricRepo.flush();
}

@ -17,11 +17,11 @@
package monasca.persister.pipeline.event;
import monasca.persister.configuration.PipelineConfiguration;
import monasca.persister.configuration.PipelineConfig;
import com.google.inject.assistedinject.Assisted;
public interface MetricHandlerFactory {
MetricHandler create(PipelineConfiguration pipelineConfiguration,
MetricHandler create(PipelineConfig pipelineConfig,
@Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize);
}

@ -19,9 +19,9 @@ package monasca.persister.repository;
import monasca.common.model.event.AlarmStateTransitionedEvent;
public interface AlarmRepository {
public interface AlarmRepo {
public void addToBatch(AlarmStateTransitionedEvent message);
public void addToBatch(final AlarmStateTransitionedEvent message);
public void flush();
}

@ -0,0 +1,85 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package monasca.persister.repository;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
import io.dropwizard.setup.Environment;
import monasca.common.model.event.AlarmStateTransitionedEvent;
public abstract class InfluxAlarmRepo implements AlarmRepo {
private static final Logger logger = LoggerFactory.getLogger(InfluxAlarmRepo.class);
protected static final String ALARM_STATE_HISTORY_NAME = "alarm_state_history";
public final Timer flushTimer;
public final Meter alarmStateHistoryMeter;
protected List<AlarmStateTransitionedEvent> alarmStateTransitionedEventList = new LinkedList<>();
public InfluxAlarmRepo(final Environment env) {
this.flushTimer =
env.metrics().timer(MetricRegistry.name(getClass(), "flush-timer"));
this.alarmStateHistoryMeter =
env.metrics().meter(
MetricRegistry.name(getClass(), "alarm_state_history-meter"));
}
protected abstract void write () throws Exception;
@Override
public void addToBatch(AlarmStateTransitionedEvent alarmStateTransitionedEvent) {
this.alarmStateTransitionedEventList.add(alarmStateTransitionedEvent);
this.alarmStateHistoryMeter.mark();
}
@Override
public void flush() {
try {
if (this.alarmStateTransitionedEventList.isEmpty()) {
logger.debug("There are no alarm state transition events to be written to the influxDB");
logger.debug("Returning from flush");
return;
}
long startTime = System.currentTimeMillis();
Timer.Context context = flushTimer.time();
write();
context.stop();
long endTime = System.currentTimeMillis();
logger.debug("Commiting batch took {} seconds", (endTime - startTime) / 1000);
} catch (Exception e) {
logger.error("Failed to write alarm state history to database", e);
}
this.alarmStateTransitionedEventList.clear();
}
}

@ -1,120 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package monasca.persister.repository;
import io.dropwizard.setup.Environment;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import monasca.persister.configuration.MonPersisterConfiguration;
import org.influxdb.dto.Serie;
import org.influxdb.dto.Serie.Builder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.google.inject.Inject;
import monasca.common.model.event.AlarmStateTransitionedEvent;
public class InfluxDBAlarmRepository extends InfluxRepository implements AlarmRepository {
private static final Logger logger = LoggerFactory.getLogger(InfluxDBAlarmRepository.class);
private static final String ALARM_STATE_HISTORY_NAME = "alarm_state_history";
private static final String[] COLUMN_NAMES = {"tenant_id", "alarm_id", "metrics", "old_state",
"new_state", "reason", "reason_data", "time"};
static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
static {
OBJECT_MAPPER
.setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
}
protected final Timer flushTimer;
private List<AlarmStateTransitionedEvent> alarmStateTransitionedEventList = new LinkedList<>();
public final Meter alarmStateHistoryMeter;
@Inject
public InfluxDBAlarmRepository(MonPersisterConfiguration configuration, Environment environment) {
super(configuration, environment);
this.flushTimer =
this.environment.metrics().timer(MetricRegistry.name(getClass(), "flush-timer"));
this.alarmStateHistoryMeter =
this.environment.metrics().meter(
MetricRegistry.name(getClass(), "alarm_state_history-meter"));
}
@Override
public void addToBatch(AlarmStateTransitionedEvent alarmStateTransitionedEvent) {
alarmStateTransitionedEventList.add(alarmStateTransitionedEvent);
this.alarmStateHistoryMeter.mark();
}
@Override
public void flush() {
try {
if (this.alarmStateTransitionedEventList.isEmpty()) {
logger.debug("There are no alarm state transition events to be written to the influxDB");
logger.debug("Returning from flush");
return;
}
long startTime = System.currentTimeMillis();
Timer.Context context = flushTimer.time();
final Builder builder = new Serie.Builder(ALARM_STATE_HISTORY_NAME);
logger.debug("Created serie: {}", ALARM_STATE_HISTORY_NAME);
builder.columns(COLUMN_NAMES);
if (logger.isDebugEnabled()) {
logColumnNames(COLUMN_NAMES);
}
for (AlarmStateTransitionedEvent alarmStateTransitionedEvent : alarmStateTransitionedEventList) {
builder.values(alarmStateTransitionedEvent.tenantId, alarmStateTransitionedEvent.alarmId,
OBJECT_MAPPER.writeValueAsString(alarmStateTransitionedEvent.metrics),
alarmStateTransitionedEvent.oldState, alarmStateTransitionedEvent.newState,
alarmStateTransitionedEvent.stateChangeReason, "{}",
alarmStateTransitionedEvent.timestamp);
}
final Serie[] series = {builder.build()};
if (logger.isDebugEnabled()) {
logColValues(series[0]);
}
this.influxDB.write(this.configuration.getInfluxDBConfiguration().getName(),
TimeUnit.SECONDS, series);
context.stop();
long endTime = System.currentTimeMillis();
logger.debug("Commiting batch took {} seconds", (endTime - startTime) / 1000);
} catch (Exception e) {
logger.error("Failed to write alarm state history to database", e);
}
this.alarmStateTransitionedEventList.clear();
}
}

@ -1,343 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository;
import com.google.inject.Inject;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import org.apache.commons.codec.digest.DigestUtils;
import org.influxdb.dto.Serie;
import org.influxdb.dto.Serie.Builder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import io.dropwizard.setup.Environment;
import monasca.persister.configuration.MonPersisterConfiguration;
public final class InfluxDBMetricRepository extends InfluxRepository implements MetricRepository {
private static final Logger logger = LoggerFactory.getLogger(InfluxDBMetricRepository.class);
private static final int NUMBER_COLUMNS = 2;
private final Map<Sha1HashId, Def> defMap = new HashMap<>();
private final Map<Sha1HashId, Set<Dim>> dimMap = new HashMap<>();
private final Map<Sha1HashId, DefDim> defDimMap = new HashMap<>();
private final Map<Sha1HashId, List<Measurement>> measurementMap = new HashMap<>();
private final com.codahale.metrics.Timer flushTimer;
public final Meter measurementMeter;
private final SimpleDateFormat measurementTimeSimpleDateFormat = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");
private static final Sha1HashId BLANK_SHA_1_HASH_ID = new Sha1HashId(DigestUtils.sha(""));
private static final Set<Dim> EMPTY_DIM_TREE_SET = new TreeSet<>();
@Inject
public InfluxDBMetricRepository(final MonPersisterConfiguration configuration,
final Environment environment) {
super(configuration, environment);
this.flushTimer = this.environment.metrics().timer(this.getClass().getName() + "." +
"flush-timer");
this.measurementMeter = this.environment.metrics().meter(this.getClass().getName() + "." +
"measurement-meter");
}
@Override
public void addMetricToBatch(final Sha1HashId defDimsId, final String timeStamp,
final double value) {
final Measurement measurement = new Measurement(defDimsId, timeStamp, value);
List<Measurement> measurementList = this.measurementMap.get(defDimsId);
if (measurementList == null) {
measurementList = new LinkedList<>();
this.measurementMap.put(defDimsId, measurementList);
}
measurementList.add(measurement);
}
@Override
public void addDefinitionToBatch(final Sha1HashId defId, final String name, final String tenantId,
final String region) {
final Def def = new Def(defId, name, tenantId, region);
defMap.put(defId, def);
}
@Override
public void addDimensionToBatch(final Sha1HashId dimSetId, final String name,
final String value) {
Set<Dim> dimSet = dimMap.get(dimSetId);
if (dimSet == null) {
dimSet = new TreeSet<>();
dimMap.put(dimSetId, dimSet);
}
final Dim dim = new Dim(dimSetId, name, value);
dimSet.add(dim);
}
@Override
public void addDefinitionDimensionToBatch(final Sha1HashId defDimsId, final Sha1HashId defId,
Sha1HashId dimId) {
final DefDim defDim = new DefDim(defDimsId, defId, dimId);
defDimMap.put(defDimsId, defDim);
}
@Override
public void flush() {
try {
final long startTime = System.currentTimeMillis();
final Timer.Context context = flushTimer.time();
this.influxDB.write(this.configuration.getInfluxDBConfiguration().getName(),
TimeUnit.SECONDS, getSeries());
final long endTime = System.currentTimeMillis();
context.stop();
logger.debug("Writing measurements, definitions, and dimensions to InfluxDB took {} seconds",
(endTime - startTime) / 1000);
} catch (Exception e) {
logger.error("Failed to write measurements to InfluxDB", e);
}
clearBuffers();
}
private String buildSerieName(final Def def, final Set<Dim> dimList)
throws UnsupportedEncodingException {
logger.debug("Creating serie name");
final StringBuilder serieNameBuilder = new StringBuilder();
logger.debug("Adding tenant_id to serie name: {}", def.tenantId);
serieNameBuilder.append(urlEncodeUTF8(def.tenantId));
serieNameBuilder.append("?");
logger.debug("Adding region to serie name: {}", def.region);
serieNameBuilder.append(urlEncodeUTF8(def.region));
serieNameBuilder.append("&");
logger.debug("Adding name to serie name: {}", def.name);
serieNameBuilder.append(urlEncodeUTF8(def.name));
for (final Dim dim : dimList) {
serieNameBuilder.append("&");
logger.debug("Adding dimension name to serie name: {}", dim.name);
serieNameBuilder.append(urlEncodeUTF8(dim.name));
serieNameBuilder.append("=");
logger.debug("Adding dimension value to serie name: {}", dim.value);
serieNameBuilder.append(urlEncodeUTF8(dim.value));
}
final String serieName = serieNameBuilder.toString();
logger.debug("Created serie name: {}", serieName);
return serieName;
}
private Def getDef(final Sha1HashId defId) throws Exception {
final Def def = this.defMap.get(defId);
if (def == null) {
throw new Exception("Failed to find definition for defId: " + defId);
}
return def;
}
private Set<Dim> getDimSet(final Sha1HashId dimId) throws Exception {
// If there were no dimensions, then "" was used in the hash id and nothing was
// ever added to the dimension map for this dimension set.
if (dimId.equals(BLANK_SHA_1_HASH_ID)) {
return EMPTY_DIM_TREE_SET;
}
final Set<Dim> dimSet = this.dimMap.get(dimId);
if (dimSet == null) {
throw new Exception("Failed to find dimension set for dimId: " + dimId);
}
return dimSet;
}
private String urlEncodeUTF8(final String s) throws UnsupportedEncodingException {
return URLEncoder.encode(s, "UTF-8");
}
private String[] buildColNamesStringArry() {
final String[] colNameStringArry = new String[NUMBER_COLUMNS];
colNameStringArry[0] = "time";
logger.debug("Added column name[{}] = {}", 0, colNameStringArry[0]);
colNameStringArry[1] = "value";
logger.debug("Added column name[{}] = {}", 1, colNameStringArry[1]);
if (logger.isDebugEnabled()) {
logColumnNames(colNameStringArry);
}
return colNameStringArry;
}
private Serie[] getSeries() throws Exception {
final List<Serie> serieList = new LinkedList<>();
for (final Sha1HashId defDimId : this.measurementMap.keySet()) {
final DefDim defDim = this.defDimMap.get(defDimId);
final Def def = getDef(defDim.defId);
final Set<Dim> dimSet = getDimSet(defDim.dimId);
final Builder builder = new Serie.Builder(buildSerieName(def, dimSet));
builder.columns(buildColNamesStringArry());
for (final Measurement measurement : this.measurementMap.get(defDimId)) {
final Object[] colValsObjArry = new Object[NUMBER_COLUMNS];
final Date date = measurementTimeSimpleDateFormat.parse(measurement.time + " UTC");
final Long time = date.getTime() / 1000;
colValsObjArry[0] = time;
logger.debug("Added column value to colValsObjArry[{}] = {}", 0, colValsObjArry[0]);
colValsObjArry[1] = measurement.value;
logger.debug("Added column value to colValsObjArry[{}] = {}", 1, colValsObjArry[1]);
builder.values(colValsObjArry);
measurementMeter.mark();
}
final Serie serie = builder.build();
if (logger.isDebugEnabled()) {
logColValues(serie);
}
serieList.add(serie);
logger.debug("Added serie: {} to serieList", serie.getName());
}
return serieList.toArray(new Serie[serieList.size()]);
}
private void clearBuffers() {
this.measurementMap.clear();
this.defMap.clear();
this.dimMap.clear();
this.defDimMap.clear();
}
private static final class Measurement {
final Sha1HashId defDimsId;
final String time;
final double value;
private Measurement(final Sha1HashId defDimsId, final String time, final double value) {
this.defDimsId = defDimsId;
this.time = time;
this.value = value;
}
@Override
public String toString() {
return "Measurement{" + "defDimsId=" + defDimsId + ", time='" + time + '\'' + ", " +
"value=" + value + '}';
}
}
private static final class Def {
final Sha1HashId defId;
final String name;
final String tenantId;
final String region;
private Def(final Sha1HashId defId, final String name, final String tenantId,
final String region) {
this.defId = defId;
this.name = name;
this.tenantId = tenantId;
this.region = region;
}
@Override
public String toString() {
return "Definition{" + "defId=" + defId + ", name='" + name + '\'' + ", " +
"tenantId='" + tenantId + '\'' + ", region='" + region + '\'' + '}';
}
}
private static final class Dim implements Comparable<Dim> {
final Sha1HashId dimSetId;
final String name;
final String value;
private Dim(final Sha1HashId dimSetId, final String name, final String value) {
this.dimSetId = dimSetId;
this.name = name;
this.value = value;
}
@Override
public String toString() {
return "Dimension{" + "dimSetId=" + dimSetId + ", name='" + name + '\'' + ", " +
"value='" + value + '\'' + '}';
}
@Override
public int compareTo(Dim o) {
int nameCmp = String.CASE_INSENSITIVE_ORDER.compare(name, o.name);
return (nameCmp != 0 ? nameCmp : String.CASE_INSENSITIVE_ORDER.compare(value, o.value));
}
}
private static final class DefDim {
final Sha1HashId defDimId;
final Sha1HashId defId;
final Sha1HashId dimId;
private DefDim(final Sha1HashId defDimId, final Sha1HashId defId, final Sha1HashId dimId) {
this.defDimId = defDimId;
this.defId = defId;
this.dimId = dimId;
}
@Override
public String toString() {
return "DefinitionDimension{" + "defDimId=" + defDimId + ", defId=" + defId + ", " +
"dimId=" + dimId + '}';
}
}
}

@ -0,0 +1,242 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import io.dropwizard.setup.Environment;
public abstract class InfluxMetricRepo implements MetricRepo {
private static final Logger logger = LoggerFactory.getLogger(InfluxMetricRepo.class);
protected final Map<Sha1HashId, Def> defMap = new HashMap<>();
protected final Map<Sha1HashId, Set<Dim>> dimMap = new HashMap<>();
protected final Map<Sha1HashId, DefDim> defDimMap = new HashMap<>();
protected final Map<Sha1HashId, List<Measurement>> measurementMap = new HashMap<>();
public final com.codahale.metrics.Timer flushTimer;
public final Meter measurementMeter;
private static final Sha1HashId BLANK_SHA_1_HASH_ID = new Sha1HashId(DigestUtils.sha(""));
private static final Set<Dim> EMPTY_DIM_TREE_SET = new TreeSet<>();
protected abstract void write() throws Exception;
public InfluxMetricRepo(final Environment env) {
this.flushTimer = env.metrics().timer(this.getClass().getName() + "." +
"flush-timer");
this.measurementMeter = env.metrics().meter(this.getClass().getName() + "." +
"measurement-meter");
}
@Override
public void addMetricToBatch(final Sha1HashId defDimsId, final String timeStamp,
final double value) {
final Measurement measurement = new Measurement(defDimsId, timeStamp, value);
List<Measurement> measurementList = this.measurementMap.get(defDimsId);
if (measurementList == null) {
measurementList = new LinkedList<>();
this.measurementMap.put(defDimsId, measurementList);
}
measurementList.add(measurement);
}
@Override
public void addDefinitionToBatch(final Sha1HashId defId, final String name, final String tenantId,
final String region) {
final Def def = new Def(defId, name, tenantId, region);
this.defMap.put(defId, def);
}
@Override
public void addDimensionToBatch(final Sha1HashId dimSetId, final String name,
final String value) {
final Dim dim = new Dim(dimSetId, name, value);
Set<Dim> dimSet = this.dimMap.get(dimSetId);
if (dimSet == null) {
dimSet = new TreeSet<>();
this.dimMap.put(dimSetId, dimSet);
}
dimSet.add(dim);
}
@Override
public void addDefinitionDimensionToBatch(final Sha1HashId defDimsId, final Sha1HashId defId,
Sha1HashId dimId) {
final DefDim defDim = new DefDim(defDimsId, defId, dimId);
this.defDimMap.put(defDimsId, defDim);
}
@Override
public void flush() {
try {
final long startTime = System.currentTimeMillis();
final Timer.Context context = flushTimer.time();
write();
final long endTime = System.currentTimeMillis();
context.stop();
logger.debug("Writing measurements, definitions, and dimensions to InfluxDB took {} seconds",
(endTime - startTime) / 1000);
} catch (Exception e) {
logger.error("Failed to write measurements to InfluxDB", e);
}
clearBuffers();
}
protected Def getDef(final Sha1HashId defId) throws Exception {
final Def def = this.defMap.get(defId);
if (def == null) {
throw new Exception("Failed to find definition for defId: " + defId);
}
return def;
}
protected Set<Dim> getDimSet(final Sha1HashId dimId) throws Exception {
// If there were no dimensions, then "" was used in the hash id and nothing was
// ever added to the dimension map for this dimension set.
if (dimId.equals(BLANK_SHA_1_HASH_ID)) {
return EMPTY_DIM_TREE_SET;
}
final Set<Dim> dimSet = this.dimMap.get(dimId);
if (dimSet == null) {
throw new Exception("Failed to find dimension set for dimId: " + dimId);
}
return dimSet;
}
private void clearBuffers() {
this.measurementMap.clear();
this.defMap.clear();
this.dimMap.clear();
this.defDimMap.clear();
}
static protected final class Measurement {
protected final Sha1HashId defDimsId;
protected final String time;
protected final double value;
private Measurement(final Sha1HashId defDimsId, final String time, final double value) {
this.defDimsId = defDimsId;
this.time = time;
this.value = value;
}
@Override
public String toString() {
return "Measurement{" + "defDimsId=" + this.defDimsId + ", time='" + this.time + '\'' + ", " +
"value=" + this.value + '}';
}
}
static protected final class Def {
protected final Sha1HashId defId;
protected final String name;
protected final String tenantId;
protected final String region;
private Def(final Sha1HashId defId, final String name, final String tenantId,
final String region) {
this.defId = defId;
this.name = name;
this.tenantId = tenantId;
this.region = region;
}
@Override
public String toString() {
return "Definition{" + "defId=" + this.defId + ", name='" + this.name + '\'' + ", " +
"tenantId='" + this.tenantId + '\'' + ", region='" + this.region + '\'' + '}';
}
}
static protected final class Dim implements Comparable<Dim> {
protected final Sha1HashId dimSetId;
protected final String name;
protected final String value;
private Dim(final Sha1HashId dimSetId, final String name, final String value) {
this.dimSetId = dimSetId;
this.name = name;
this.value = value;
}
@Override
public String toString() {
return "Dimension{" + "dimSetId=" + this.dimSetId + ", name='" + this.name + '\'' + ", " +
"value='" + this.value + '\'' + '}';
}
@Override
public int compareTo(Dim o) {
int nameCmp = String.CASE_INSENSITIVE_ORDER.compare(name, o.name);
return (nameCmp != 0 ? nameCmp : String.CASE_INSENSITIVE_ORDER.compare(this.value, o.value));
}
}
static protected final class DefDim {
protected final Sha1HashId defDimId;
protected final Sha1HashId defId;
protected final Sha1HashId dimId;
private DefDim(final Sha1HashId defDimId, final Sha1HashId defId, final Sha1HashId dimId) {
this.defDimId = defDimId;
this.defId = defId;
this.dimId = dimId;
}
@Override
public String toString() {
return "DefinitionDimension{" + "defDimId=" + this.defDimId + ", defId=" + this.defId + ", " +
"dimId=" + this.dimId + '}';
}
}
}

@ -0,0 +1,89 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository;
import com.google.inject.Inject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import org.influxdb.dto.Serie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import io.dropwizard.setup.Environment;
import monasca.common.model.event.AlarmStateTransitionedEvent;
public class InfluxV8AlarmRepo extends InfluxAlarmRepo {
private static final Logger logger = LoggerFactory.getLogger(InfluxV8AlarmRepo.class);
private static final String[]
COLUMN_NAMES =
{"tenant_id", "alarm_id", "metrics", "old_state", "new_state", "reason", "reason_data",
"time"};
private final InfluxV8RepoWriter influxV8RepoWriter;
private final ObjectMapper objectMapper = new ObjectMapper();
@Inject
public InfluxV8AlarmRepo(final Environment env,
final InfluxV8RepoWriter influxV8RepoWriter) {
super(env);
this.influxV8RepoWriter = influxV8RepoWriter;
this.objectMapper.setPropertyNamingStrategy(
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
}
@Override
protected void write() throws JsonProcessingException {
final Serie.Builder builder = new Serie.Builder(ALARM_STATE_HISTORY_NAME);
logger.debug("Created serie: {}", ALARM_STATE_HISTORY_NAME);
builder.columns(COLUMN_NAMES);
if (logger.isDebugEnabled()) {
this.influxV8RepoWriter.logColumnNames(COLUMN_NAMES);
}
for (AlarmStateTransitionedEvent alarmStateTransitionedEvent : this.alarmStateTransitionedEventList) {
builder.values(alarmStateTransitionedEvent.tenantId, alarmStateTransitionedEvent.alarmId,
this.objectMapper.writeValueAsString(alarmStateTransitionedEvent.metrics),
alarmStateTransitionedEvent.oldState, alarmStateTransitionedEvent.newState,
alarmStateTransitionedEvent.stateChangeReason, "{}",
alarmStateTransitionedEvent.timestamp);
}
final Serie[] series = {builder.build()};
if (logger.isDebugEnabled()) {
this.influxV8RepoWriter.logColValues(series[0]);
}
this.influxV8RepoWriter.write(TimeUnit.SECONDS, series);
}
}

@ -0,0 +1,136 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository;
import com.google.inject.Inject;
import org.influxdb.dto.Serie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import io.dropwizard.setup.Environment;
public class InfluxV8MetricRepo extends InfluxMetricRepo
{
private static final Logger logger = LoggerFactory.getLogger(InfluxV8MetricRepo.class);
private static final String[] COL_NAMES_STRING_ARRY = {"time", "value"};
private final InfluxV8RepoWriter influxV8RepoWriter;
private final SimpleDateFormat simpleDateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");
@Inject
public InfluxV8MetricRepo(final Environment env,
final InfluxV8RepoWriter influxV8RepoWriter) {
super(env);
this.influxV8RepoWriter = influxV8RepoWriter;
}
@Override
protected void write() throws Exception {
this.influxV8RepoWriter.write(TimeUnit.SECONDS, getSeries());
}
private Serie[] getSeries() throws Exception {
final List<Serie> serieList = new LinkedList<>();
for (final Sha1HashId defDimId : this.measurementMap.keySet()) {
final DefDim defDim = this.defDimMap.get(defDimId);
final Def def = getDef(defDim.defId);
final Set<Dim> dimSet = getDimSet(defDim.dimId);
final Serie.Builder builder = new Serie.Builder(buildSerieName(def, dimSet));
builder.columns(COL_NAMES_STRING_ARRY);
for (final Measurement measurement : this.measurementMap.get(defDimId)) {
final Object[] colValsObjArry = new Object[COL_NAMES_STRING_ARRY.length];
final Date date = this.simpleDateFormat.parse(measurement.time + " UTC");
final Long time = date.getTime() / 1000;
colValsObjArry[0] = time;
logger.debug("Added column value to colValsObjArry[{}] = {}", 0, colValsObjArry[0]);
colValsObjArry[1] = measurement.value;
logger.debug("Added column value to colValsObjArry[{}] = {}", 1, colValsObjArry[1]);
builder.values(colValsObjArry);
this.measurementMeter.mark();
}
final Serie serie = builder.build();
if (logger.isDebugEnabled()) {
this.influxV8RepoWriter.logColValues(serie);
}
serieList.add(serie);
logger.debug("Added serie: {} to serieList", serie.getName());
}
return serieList.toArray(new Serie[serieList.size()]);
}
private String buildSerieName(final Def def, final Set<Dim> dimList)
throws UnsupportedEncodingException {
logger.debug("Creating serie name");
final StringBuilder serieNameBuilder = new StringBuilder();
logger.debug("Adding tenant_id to serie name: {}", def.tenantId);
serieNameBuilder.append(urlEncodeUTF8(def.tenantId));
serieNameBuilder.append("?");
logger.debug("Adding region to serie name: {}", def.region);
serieNameBuilder.append(urlEncodeUTF8(def.region));
serieNameBuilder.append("&");
logger.debug("Adding name to serie name: {}", def.name);
serieNameBuilder.append(urlEncodeUTF8(def.name));
for (final Dim dim : dimList) {
serieNameBuilder.append("&");
logger.debug("Adding dimension name to serie name: {}", dim.name);
serieNameBuilder.append(urlEncodeUTF8(dim.name));
serieNameBuilder.append("=");
logger.debug("Adding dimension value to serie name: {}", dim.value);
serieNameBuilder.append(urlEncodeUTF8(dim.value));
}
final String serieName = serieNameBuilder.toString();
logger.debug("Created serie name: {}", serieName);
return serieName;
}
private String urlEncodeUTF8(final String s) throws UnsupportedEncodingException {
return URLEncoder.encode(s, "UTF-8");
}
}

@ -17,9 +17,7 @@
package monasca.persister.repository;
import monasca.persister.configuration.MonPersisterConfiguration;
import io.dropwizard.setup.Environment;
import com.google.inject.Inject;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
@ -29,24 +27,41 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public abstract class InfluxRepository {
static final Logger logger = LoggerFactory.getLogger(InfluxRepository.class);
import io.dropwizard.setup.Environment;
import monasca.persister.configuration.PersisterConfig;
protected final MonPersisterConfiguration configuration;
protected final Environment environment;
protected final InfluxDB influxDB;
public class InfluxV8RepoWriter {
static final Logger logger = LoggerFactory.getLogger(InfluxV8RepoWriter.class);
private final PersisterConfig config;
private final Environment env;
private final InfluxDB influxDB;
private final String databaseName;
@Inject
public InfluxV8RepoWriter(final PersisterConfig config, final Environment env) {
this.config = config;
this.env = env;
this.influxDB =
InfluxDBFactory.connect(config.getInfluxDBConfiguration().getUrl(),
config.getInfluxDBConfiguration().getUser(),
config.getInfluxDBConfiguration().getPassword());
this.databaseName = this.config.getInfluxDBConfiguration().getName();
public InfluxRepository(MonPersisterConfiguration configuration, Environment environment) {
this.configuration = configuration;
this.environment = environment;
influxDB =
InfluxDBFactory.connect(configuration.getInfluxDBConfiguration().getUrl(), configuration
.getInfluxDBConfiguration().getUser(), configuration.getInfluxDBConfiguration()
.getPassword());
}
protected void logColValues(Serie serie) {
protected void write(final TimeUnit precision, final Serie[] series) {
this.influxDB.write(this.databaseName, precision, series);
}
protected void logColValues(final Serie serie) {
logger.debug("Added array of array of column values to serie");
final String[] colNames = serie.getColumns();
List<Map<String, Object>> rows = serie.getRows();
@ -67,7 +82,7 @@ public abstract class InfluxRepository {
}
}
protected void logColumnNames(String[] colNames) {
protected void logColumnNames(final String[] colNames) {
logger.debug("Added array of column names to serie");
StringBuffer sb = new StringBuffer();
boolean first = true;

@ -0,0 +1,96 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository;
import com.google.inject.Inject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import io.dropwizard.setup.Environment;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.repository.influxdb.InfluxPoint;
public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
private static final Logger logger = LoggerFactory.getLogger(InfluxV9AlarmRepo.class);
private final InfluxV9RepoWriter influxV9RepoWriter;
private final ObjectMapper objectMapper = new ObjectMapper();
private final DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime();
@Inject
public InfluxV9AlarmRepo(final Environment env,
final InfluxV9RepoWriter influxV9RepoWriter) {
super(env);
this.influxV9RepoWriter = influxV9RepoWriter;
this.objectMapper.setPropertyNamingStrategy(
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
}
@Override
protected void write() throws Exception {
this.influxV9RepoWriter.write(getInfluxPointArry());
}
private InfluxPoint[] getInfluxPointArry() throws Exception {
List<InfluxPoint> influxPointList = new LinkedList<>();
for (AlarmStateTransitionedEvent event : this.alarmStateTransitionedEventList) {
Map<String, Object> valueMap = new HashMap<>();
valueMap.put("tenant_id", event.tenantId);
valueMap.put("alarm_id", event.alarmId);
valueMap.put("metrics", this.objectMapper.writeValueAsString(event.metrics));
valueMap.put("old_state", event.oldState);
valueMap.put("new_state", event.newState);
valueMap.put("reason", event.stateChangeReason);
valueMap.put("reason_data", "{}");
DateTime dateTime = new DateTime(event.timestamp * 1000, DateTimeZone.UTC);
String dateString = this.dateFormatter.print(dateTime);
InfluxPoint
influxPoint =
new InfluxPoint(ALARM_STATE_HISTORY_NAME, new HashMap(), dateString, valueMap);
influxPointList.add(influxPoint);
}
return influxPointList.toArray(new InfluxPoint[influxPointList.size()]);
}
}

@ -0,0 +1,104 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository;
import com.google.inject.Inject;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.influxdb.InfluxPoint;
public class InfluxV9MetricRepo extends InfluxMetricRepo {
private static final Logger logger = LoggerFactory.getLogger(InfluxV9MetricRepo.class);
private final SimpleDateFormat simpleDateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");
private final InfluxV9RepoWriter influxV9RepoWriter;
@Inject
public InfluxV9MetricRepo(final Environment env,
final InfluxV9RepoWriter influxV9RepoWriter) {
super(env);
this.influxV9RepoWriter = influxV9RepoWriter;
}
@Override
protected void write() throws Exception {
this.influxV9RepoWriter.write(getInfluxPointArry());
}
private InfluxPoint[] getInfluxPointArry() throws Exception {
DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime();
List<InfluxPoint> influxPointList = new LinkedList<>();
for (final Sha1HashId defDimId : this.measurementMap.keySet()) {
final DefDim defDim = this.defDimMap.get(defDimId);
final Def def = getDef(defDim.defId);
final Set<Dim> dimSet = getDimSet(defDim.dimId);
Map<String, String> tagMap = new HashMap<>();
for (Dim dim : dimSet) {
tagMap.put(dim.name, dim.value);
}
tagMap.put("tenant_id", def.tenantId);
tagMap.put("region", def.region);
for (final Measurement measurement : this.measurementMap.get(defDimId)) {
Date date = this.simpleDateFormat.parse(measurement.time + " UTC");
DateTime dateTime = new DateTime(date.getTime(), DateTimeZone.UTC);
String dateString = dateFormatter.print(dateTime);
Map<String, Object> valueMap = new HashMap<>();
valueMap.put("value", measurement.value);
InfluxPoint influxPoint = new InfluxPoint(def.name, tagMap, dateString, valueMap);
influxPointList.add(influxPoint);
this.measurementMeter.mark();
}
}
return influxPointList.toArray(new InfluxPoint[influxPointList.size()]);
}
}

@ -0,0 +1,120 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository;
import com.google.inject.Inject;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.influxdb.InfluxPoint;
import monasca.persister.repository.influxdb.InfluxWrite;
public class InfluxV9RepoWriter {
private static final Logger logger = LoggerFactory.getLogger(InfluxV9RepoWriter.class);
private final PersisterConfig config;
private final String influxName;
private final String influxUrl;
private final String influxCreds;
private final String influxUser;
private final String influxPass;
private final String influxRetentionPolicy;
private final CloseableHttpClient httpClient = HttpClientBuilder.create().build();
private final String baseAuthHeader;
private final ObjectMapper objectMapper = new ObjectMapper();
@Inject
public InfluxV9RepoWriter(final PersisterConfig config) {
this.config = config;
this.influxName = config.getInfluxDBConfiguration().getName();
this.influxUrl = config.getInfluxDBConfiguration().getUrl() + "/write";
this.influxUser = config.getInfluxDBConfiguration().getUser();
this.influxPass = config.getInfluxDBConfiguration().getPassword();
this.influxCreds = this.influxUser + ":" + this.influxPass;
this.influxRetentionPolicy = config.getInfluxDBConfiguration().getRetentionPolicy();
this.baseAuthHeader = "Basic " + new String(Base64.encodeBase64(this.influxCreds.getBytes()));
}
protected void write(final InfluxPoint[] influxPointArry) throws Exception {
HttpPost request = new HttpPost(this.influxUrl);
request.addHeader("content-type", "application/json");
request.addHeader("Authorization", this.baseAuthHeader);
InfluxWrite
influxWrite =
new InfluxWrite(this.influxName, this.influxRetentionPolicy, influxPointArry,
new HashMap());
StringEntity params = new StringEntity(this.objectMapper.writeValueAsString(influxWrite));
request.setEntity(params);
try {
logger.debug("Writing {} points to influxdb database {} at {}",
influxPointArry.length, this.influxName, this.influxUrl);
HttpResponse response = this.httpClient.execute(request);
int rc = response.getStatusLine().getStatusCode();
if (rc != HttpStatus.SC_OK) {
HttpEntity entity = response.getEntity();
String responseString = EntityUtils.toString(entity, "UTF-8");
logger.error("Failed to write data to Influxdb: {}", String.valueOf(rc));
logger.error("Http response: {}", responseString);
throw new Exception(responseString);
}
logger.debug("Successfully wrote {} points to influxdb database {} at {}",
influxPointArry.length, this.influxName, this.influxUrl);
} finally {
request.releaseConnection();
}
}
}

@ -17,7 +17,7 @@
package monasca.persister.repository;
public interface MetricRepository {
public interface MetricRepo {
void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value);
void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region);

@ -18,7 +18,7 @@
package monasca.persister.repository;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.configuration.MonPersisterConfiguration;
import monasca.persister.configuration.PersisterConfig;
import com.codahale.metrics.Timer;
@ -37,9 +37,9 @@ import java.util.TimeZone;
import javax.inject.Inject;
public class VerticaAlarmRepository extends VerticaRepository implements AlarmRepository {
public class VerticaAlarmRepo extends VerticaRepo implements AlarmRepo {
private static final Logger logger = LoggerFactory.getLogger(VerticaAlarmRepository.class);
private static final Logger logger = LoggerFactory.getLogger(VerticaAlarmRepo.class);
private final Environment environment;
private static final String SQL_INSERT_INTO_ALARM_HISTORY =
@ -49,8 +49,7 @@ public class VerticaAlarmRepository extends VerticaRepository implements AlarmRe
private final SimpleDateFormat simpleDateFormat;
@Inject
public VerticaAlarmRepository(DBI dbi, MonPersisterConfiguration configuration,
Environment environment) throws NoSuchAlgorithmException, SQLException {
public VerticaAlarmRepo(DBI dbi, PersisterConfig configuration, Environment environment) throws NoSuchAlgorithmException, SQLException {
super(dbi);
logger.debug("Instantiating: " + this);

@ -17,7 +17,7 @@
package monasca.persister.repository;
import monasca.persister.configuration.MonPersisterConfiguration;
import monasca.persister.configuration.PersisterConfig;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
@ -38,9 +38,9 @@ import java.util.Set;
import javax.inject.Inject;
public class VerticaMetricRepository extends VerticaRepository implements MetricRepository {
public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
private static final Logger logger = LoggerFactory.getLogger(VerticaMetricRepository.class);
private static final Logger logger = LoggerFactory.getLogger(VerticaMetricRepo.class);
private final Environment environment;
@ -92,8 +92,8 @@ public class VerticaMetricRepository extends VerticaRepository implements Metric
public final Meter definitionDimensionCacheHitMeter;
@Inject
public VerticaMetricRepository(DBI dbi, MonPersisterConfiguration configuration,
Environment environment) throws NoSuchAlgorithmException, SQLException {
public VerticaMetricRepo(DBI dbi, PersisterConfig configuration,
Environment environment) throws NoSuchAlgorithmException, SQLException {
super(dbi);
logger.debug("Instantiating: " + this.getClass().getName());
@ -123,15 +123,15 @@ public class VerticaMetricRepository extends VerticaRepository implements Metric
definitionsIdCache =
CacheBuilder.newBuilder()
.maximumSize(configuration.getVerticaMetricRepositoryConfiguration().getMaxCacheSize())
.maximumSize(configuration.getVerticaMetricRepoConfig().getMaxCacheSize())
.build();
dimensionsIdCache =
CacheBuilder.newBuilder()
.maximumSize(configuration.getVerticaMetricRepositoryConfiguration().getMaxCacheSize())
.maximumSize(configuration.getVerticaMetricRepoConfig().getMaxCacheSize())
.build();
definitionDimensionsIdCache =
CacheBuilder.newBuilder()
.maximumSize(configuration.getVerticaMetricRepositoryConfiguration().getMaxCacheSize())
.maximumSize(configuration.getVerticaMetricRepoConfig().getMaxCacheSize())
.build();
logger.info("preparing database and building sql statements...");

@ -20,17 +20,17 @@ package monasca.persister.repository;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
public class VerticaRepository {
public class VerticaRepo {
protected DBI dbi;
protected Handle handle;
public VerticaRepository(DBI dbi) {
public VerticaRepo(DBI dbi) {
this.dbi = dbi;
this.handle = dbi.open();
this.handle.execute("SET TIME ZONE TO 'UTC'");
}
public VerticaRepository() {}
public VerticaRepo() {}
public void setDBI(DBI dbi) throws Exception {
this.dbi = dbi;

@ -0,0 +1,53 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository.influxdb;
import java.util.Map;
public class InfluxPoint {
private final String name;
private final Map<String, String> tags;
private final String timestamp;
private final Map<String, Object> values;
public InfluxPoint(final String name, final Map<String, String> tags, final String timestamp,
final Map<String, Object> values) {
this.name = name;
this.tags = tags;
this.timestamp = timestamp;
this.values = values;
}
public String getName() {
return name;
}
public Map<String, String> getTags() {
return this.tags;
}
public String getTimestamp() {
return this.timestamp;
}
public Map<String, Object> getValues() {
return this.values;
}
}

@ -0,0 +1,53 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository.influxdb;
import java.util.Map;
public class InfluxWrite {
private final String database;
private final String retentionPolicy;
private final InfluxPoint[] points;
private final Map<String, String> tags;
public InfluxWrite(final String database, final String retentionPolicy, final InfluxPoint[] points,
final Map<String, String> tags) {
this.database = database;
this.retentionPolicy = retentionPolicy;
this.points = points;
this.tags = tags;
}
public String getDatabase() {
return database;
}
public String getRetentionPolicy() {
return retentionPolicy;
}
public Map<String, String> getTags() {
return this.tags;
}
public InfluxPoint[] getPoints() {
return points;
}
}

@ -21,7 +21,7 @@ metricConfiguration:
clientId: 1
#Kafka settings.
kafkaConfiguration:
kafkaConfig:
#zookeeperConnect: localhost:2181
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
zookeeperConnect: 192.168.10.4:2181
@ -40,15 +40,21 @@ kafkaConfiguration:
zookeeperConnectionTimeoutMs : 6000
zookeeperSyncTimeMs: 2000
verticaMetricRepositoryConfiguration:
verticaMetricRepoConfig:
maxCacheSize: 2000000
databaseConfiguration:
# vertica | influxdb
# databaseType can be (vertica | influxdb)
databaseType: influxdb
# databaseType: vertica
influxDbConfiguration:
# Version and retention policy must be set for V9. If no
# version set then defaults to V8.
# version can be (V8 | V9)
#version: V9
# Retention policy may be left blank to indicate default policy.
#retentionPolicy:
name: mon
replicationFactor: 1
url: http://192.168.10.4:8086