Remove Influxdb 8
Change-Id: I9d2e4460ff1858335a1857969b18f2303e9470b6
This commit is contained in:
parent
4f138c3d44
commit
213fc06707
@ -41,17 +41,14 @@ import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerFa
|
||||
import monasca.persister.consumer.metric.KafkaMetricsConsumer;
|
||||
import monasca.persister.consumer.metric.KafkaMetricsConsumerFactory;
|
||||
import monasca.persister.dbi.DBIProvider;
|
||||
import monasca.persister.pipeline.ManagedPipelineFactory;
|
||||
import monasca.persister.pipeline.ManagedPipeline;
|
||||
import monasca.persister.pipeline.ManagedPipelineFactory;
|
||||
import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandler;
|
||||
import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory;
|
||||
import monasca.persister.pipeline.event.MetricHandler;
|
||||
import monasca.persister.pipeline.event.MetricHandlerFactory;
|
||||
import monasca.persister.repository.AlarmRepo;
|
||||
import monasca.persister.repository.MetricRepo;
|
||||
import monasca.persister.repository.influxdb.InfluxV8AlarmRepo;
|
||||
import monasca.persister.repository.influxdb.InfluxV8MetricRepo;
|
||||
import monasca.persister.repository.influxdb.InfluxV8RepoWriter;
|
||||
import monasca.persister.repository.influxdb.InfluxV9AlarmRepo;
|
||||
import monasca.persister.repository.influxdb.InfluxV9MetricRepo;
|
||||
import monasca.persister.repository.influxdb.InfluxV9RepoWriter;
|
||||
@ -62,7 +59,7 @@ public class PersisterModule extends AbstractModule {
|
||||
|
||||
private static final String VERTICA = "vertica";
|
||||
private static final String INFLUXDB = "influxdb";
|
||||
private static final String INFLUXDB_V8 = "v8";
|
||||
|
||||
private static final String INFLUXDB_V9 = "v9";
|
||||
|
||||
private final PersisterConfig config;
|
||||
@ -151,30 +148,21 @@ public class PersisterModule extends AbstractModule {
|
||||
|
||||
} else if (config.getDatabaseConfiguration().getDatabaseType().equalsIgnoreCase(INFLUXDB)) {
|
||||
|
||||
// Check for null to not break existing configs. If no version, default to V8.
|
||||
if (config.getInfluxDBConfiguration().getVersion() == null || config
|
||||
.getInfluxDBConfiguration().getVersion().equalsIgnoreCase(INFLUXDB_V8)) {
|
||||
|
||||
bind(InfluxV8RepoWriter.class);
|
||||
bind(MetricRepo.class).to(InfluxV8MetricRepo.class);
|
||||
bind(AlarmRepo.class).to(InfluxV8AlarmRepo.class);
|
||||
|
||||
} else if (config.getInfluxDBConfiguration().getVersion().equalsIgnoreCase(INFLUXDB_V9)) {
|
||||
|
||||
bind(InfluxV9RepoWriter.class).in(Singleton.class);
|
||||
bind(MetricRepo.class).to(InfluxV9MetricRepo.class);
|
||||
bind(AlarmRepo.class).to(InfluxV9AlarmRepo.class);
|
||||
|
||||
} else {
|
||||
if (config.getInfluxDBConfiguration().getVersion() != null && !config
|
||||
.getInfluxDBConfiguration().getVersion().equalsIgnoreCase(INFLUXDB_V9)) {
|
||||
|
||||
System.err.println(
|
||||
"Found unknown Influxdb version: " + config.getInfluxDBConfiguration().getVersion());
|
||||
System.err.println("Supported Influxdb versions are 'v8' and 'v9'");
|
||||
"Found unsupported Influxdb version: " + config.getInfluxDBConfiguration()
|
||||
.getVersion());
|
||||
System.err.println("Supported Influxdb versions are 'v9'");
|
||||
System.err.println("Check your config file");
|
||||
System.exit(1);
|
||||
|
||||
}
|
||||
|
||||
bind(InfluxV9RepoWriter.class).in(Singleton.class);
|
||||
bind(MetricRepo.class).to(InfluxV9MetricRepo.class);
|
||||
bind(AlarmRepo.class).to(InfluxV9AlarmRepo.class);
|
||||
|
||||
} else {
|
||||
|
||||
System.err.println(
|
||||
|
@ -1,91 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.repository.influxdb;
|
||||
|
||||
import monasca.common.model.event.AlarmStateTransitionedEvent;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
|
||||
|
||||
import org.influxdb.dto.Serie;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.dropwizard.setup.Environment;
|
||||
|
||||
public class InfluxV8AlarmRepo extends InfluxAlarmRepo {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(InfluxV8AlarmRepo.class);
|
||||
|
||||
private static final String[]
|
||||
COLUMN_NAMES =
|
||||
{"tenant_id", "alarm_id", "metrics", "old_state", "new_state", "sub_alarms", "reason", "reason_data",
|
||||
"time"};
|
||||
|
||||
private final InfluxV8RepoWriter influxV8RepoWriter;
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
@Inject
|
||||
public InfluxV8AlarmRepo(final Environment env,
|
||||
final InfluxV8RepoWriter influxV8RepoWriter) {
|
||||
|
||||
super(env);
|
||||
this.influxV8RepoWriter = influxV8RepoWriter;
|
||||
|
||||
this.objectMapper.setPropertyNamingStrategy(
|
||||
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void write() throws JsonProcessingException {
|
||||
|
||||
final Serie.Builder builder = new Serie.Builder(ALARM_STATE_HISTORY_NAME);
|
||||
logger.debug("Created serie: {}", ALARM_STATE_HISTORY_NAME);
|
||||
|
||||
builder.columns(COLUMN_NAMES);
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
this.influxV8RepoWriter.logColumnNames(COLUMN_NAMES);
|
||||
}
|
||||
|
||||
for (AlarmStateTransitionedEvent alarmStateTransitionedEvent : this.alarmStateTransitionedEventList) {
|
||||
builder.values(alarmStateTransitionedEvent.tenantId, alarmStateTransitionedEvent.alarmId,
|
||||
this.objectMapper.writeValueAsString(alarmStateTransitionedEvent.metrics),
|
||||
alarmStateTransitionedEvent.oldState, alarmStateTransitionedEvent.newState,
|
||||
this.objectMapper.writeValueAsString(alarmStateTransitionedEvent.subAlarms),
|
||||
alarmStateTransitionedEvent.stateChangeReason, "{}",
|
||||
alarmStateTransitionedEvent.timestamp);
|
||||
}
|
||||
|
||||
final Serie[] series = {builder.build()};
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
this.influxV8RepoWriter.logColValues(series[0]);
|
||||
}
|
||||
|
||||
this.influxV8RepoWriter.write(TimeUnit.MILLISECONDS, series);
|
||||
}
|
||||
|
||||
}
|
@ -1,151 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.repository.influxdb;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import org.influxdb.dto.Serie;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLEncoder;
|
||||
import java.text.ParseException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.dropwizard.setup.Environment;
|
||||
|
||||
public class InfluxV8MetricRepo extends InfluxMetricRepo
|
||||
{
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(InfluxV8MetricRepo.class);
|
||||
private static final String[] COL_NAMES_STRING_ARRY = {"time", "value", "value_meta"};
|
||||
|
||||
private final InfluxV8RepoWriter influxV8RepoWriter;
|
||||
|
||||
@Inject
|
||||
public InfluxV8MetricRepo(final Environment env,
|
||||
final InfluxV8RepoWriter influxV8RepoWriter) {
|
||||
|
||||
super(env);
|
||||
this.influxV8RepoWriter = influxV8RepoWriter;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void write() throws Exception {
|
||||
|
||||
this.influxV8RepoWriter.write(TimeUnit.MILLISECONDS, getSeries());
|
||||
}
|
||||
|
||||
private Serie[] getSeries() throws UnsupportedEncodingException, ParseException {
|
||||
|
||||
final List<Serie> serieList = new LinkedList<>();
|
||||
|
||||
for (Map.Entry<Definition, Map<Dimensions, List<Measurement>>> definitionMapEntry
|
||||
: this.measurementBuffer.entrySet()) {
|
||||
|
||||
Definition definition = definitionMapEntry.getKey();
|
||||
Map<Dimensions, List<Measurement>> dimensionsMap = definitionMapEntry.getValue();
|
||||
|
||||
for (Map.Entry<Dimensions, List<Measurement>> dimensionsMapEntry
|
||||
: dimensionsMap.entrySet()) {
|
||||
|
||||
Dimensions dimensions = dimensionsMapEntry.getKey();
|
||||
List<Measurement> measurementList = dimensionsMapEntry.getValue();
|
||||
|
||||
final Serie.Builder builder = new Serie.Builder(buildSerieName(definition, dimensions));
|
||||
builder.columns(COL_NAMES_STRING_ARRY);
|
||||
|
||||
for (Measurement measurement : measurementList) {
|
||||
|
||||
final Object[] valObjArry = new Object[COL_NAMES_STRING_ARRY.length];
|
||||
|
||||
valObjArry[0] = measurement.getTime();
|
||||
logger.debug("Added column value to valObjArry[{}] = {}", 0, valObjArry[0]);
|
||||
|
||||
valObjArry[1] = measurement.getValue();
|
||||
logger.debug("Added column value to valObjArry[{}] = {}", 1, valObjArry[1]);
|
||||
|
||||
valObjArry[2] = measurement.getValueMetaJSONString();
|
||||
logger.debug("Added column value to valObjArry[{}] = {}", 2, valObjArry[2]);
|
||||
|
||||
builder.values(valObjArry);
|
||||
|
||||
this.measurementMeter.mark();
|
||||
|
||||
}
|
||||
|
||||
final Serie serie = builder.build();
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
this.influxV8RepoWriter.logColValues(serie);
|
||||
}
|
||||
|
||||
serieList.add(serie);
|
||||
logger.debug("Added serie: {} to serieList", serie.getName());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return serieList.toArray(new Serie[serieList.size()]);
|
||||
|
||||
}
|
||||
|
||||
private String buildSerieName(final Definition definition, final Dimensions dimensions)
|
||||
throws UnsupportedEncodingException {
|
||||
|
||||
logger.debug("Creating serie name");
|
||||
|
||||
final StringBuilder serieNameBuilder = new StringBuilder();
|
||||
|
||||
logger.debug("Adding tenant_id to serie name: {}", definition.getTenantId());
|
||||
serieNameBuilder.append(urlEncodeUTF8(definition.getTenantId()));
|
||||
serieNameBuilder.append("?");
|
||||
|
||||
logger.debug("Adding region to serie name: {}", definition.getRegion());
|
||||
serieNameBuilder.append(urlEncodeUTF8(definition.getRegion()));
|
||||
serieNameBuilder.append("&");
|
||||
logger.debug("Adding name to serie name: {}", definition.getName());
|
||||
serieNameBuilder.append(urlEncodeUTF8(definition.getName()));
|
||||
|
||||
for (final String name : dimensions.keySet()) {
|
||||
final String value = dimensions.get(name);
|
||||
serieNameBuilder.append("&");
|
||||
logger.debug("Adding dimension name to serie name: {}", name);
|
||||
serieNameBuilder.append(urlEncodeUTF8(name));
|
||||
serieNameBuilder.append("=");
|
||||
logger.debug("Adding dimension value to serie name: {}", value);
|
||||
serieNameBuilder.append(urlEncodeUTF8(value));
|
||||
}
|
||||
|
||||
final String serieName = serieNameBuilder.toString();
|
||||
logger.debug("Created serie name: {}", serieName);
|
||||
|
||||
return serieName;
|
||||
}
|
||||
|
||||
|
||||
private String urlEncodeUTF8(final String s) throws UnsupportedEncodingException {
|
||||
return URLEncoder.encode(s, "UTF-8");
|
||||
}
|
||||
}
|
@ -1,101 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.repository.influxdb;
|
||||
|
||||
import monasca.persister.configuration.PersisterConfig;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import org.influxdb.InfluxDB;
|
||||
import org.influxdb.InfluxDBFactory;
|
||||
import org.influxdb.dto.Serie;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.dropwizard.setup.Environment;
|
||||
|
||||
public class InfluxV8RepoWriter {
|
||||
|
||||
static final Logger logger = LoggerFactory.getLogger(InfluxV8RepoWriter.class);
|
||||
|
||||
private final PersisterConfig config;
|
||||
private final Environment env;
|
||||
private final InfluxDB influxDB;
|
||||
|
||||
private final String databaseName;
|
||||
|
||||
@Inject
|
||||
public InfluxV8RepoWriter(final PersisterConfig config, final Environment env) {
|
||||
|
||||
this.config = config;
|
||||
this.env = env;
|
||||
this.influxDB =
|
||||
InfluxDBFactory.connect(config.getInfluxDBConfiguration().getUrl(),
|
||||
config.getInfluxDBConfiguration().getUser(),
|
||||
config.getInfluxDBConfiguration().getPassword());
|
||||
|
||||
this.databaseName = this.config.getInfluxDBConfiguration().getName();
|
||||
|
||||
}
|
||||
|
||||
protected void write(final TimeUnit precision, final Serie[] series) {
|
||||
|
||||
this.influxDB.write(this.databaseName, precision, series);
|
||||
}
|
||||
|
||||
protected void logColValues(final Serie serie) {
|
||||
logger.debug("Added array of array of column values to serie");
|
||||
final String[] colNames = serie.getColumns();
|
||||
List<Map<String, Object>> rows = serie.getRows();
|
||||
int outerIdx = 0;
|
||||
for (Map<String, Object> row : rows) {
|
||||
StringBuffer sb = new StringBuffer();
|
||||
boolean first = true;
|
||||
for (String colName : colNames) {
|
||||
if (first) {
|
||||
first = false;
|
||||
} else {
|
||||
sb.append(",");
|
||||
}
|
||||
sb.append(row.get(colName));
|
||||
}
|
||||
logger.debug("Array of column values[{}]: [{}]", outerIdx, sb);
|
||||
outerIdx++;
|
||||
}
|
||||
}
|
||||
|
||||
protected void logColumnNames(final String[] colNames) {
|
||||
logger.debug("Added array of column names to serie");
|
||||
StringBuffer sb = new StringBuffer();
|
||||
boolean first = true;
|
||||
for (String colName : colNames) {
|
||||
if (first) {
|
||||
first = false;
|
||||
} else {
|
||||
sb.append(",");
|
||||
}
|
||||
sb.append(colName);
|
||||
}
|
||||
logger.debug("Array of column names: [{}]", sb);
|
||||
}
|
||||
|
||||
}
|
@ -49,10 +49,6 @@ databaseConfiguration:
|
||||
# databaseType: vertica
|
||||
|
||||
influxDbConfiguration:
|
||||
# Version and retention policy must be set for V9. If no
|
||||
# version set then defaults to V8.
|
||||
# version can be (V8 | V9)
|
||||
version: V9
|
||||
# Retention policy may be left blank to indicate default policy.
|
||||
retentionPolicy:
|
||||
# Used only if version is V9.
|
||||
|
Loading…
x
Reference in New Issue
Block a user