diff --git a/pom.xml b/pom.xml index 678de22f..81aa25e0 100644 --- a/pom.xml +++ b/pom.xml @@ -8,21 +8,6 @@ mon-persister 1.0-SNAPSHOT - - - nexus 3rd party - http://nexus.paas.hpcloud.net:8081/nexus/content/repositories/releases - - - ps-snapshots - http://nexus.paas.hpcloud.net:8081/nexus/content/repositories/snapshots/ - - - ps-3rd-party - http://nexus.paas.hpcloud.net:8081/nexus/content/repositories/thirdparty - - - UTF-8 UTF-8 @@ -100,11 +85,6 @@ 4.11 test - - com.vertica - vertica-jdbc - 6.1.0 - org.codehaus.mojo buildnumber-maven-plugin diff --git a/src/deb/init/mon-persister.conf b/src/deb/init/mon-persister.conf index 25e58e0a..1cd8850b 100644 --- a/src/deb/init/mon-persister.conf +++ b/src/deb/init/mon-persister.conf @@ -8,4 +8,4 @@ respawn setgid persister setuid persister -exec /usr/bin/java -Dfile.encoding=UTF-8 -Xmx8g -cp /opt/mon/mon-persister.jar com.hpcloud.mon.persister.MonPersisterApplication server /etc/mon/persister-config.yml +exec /usr/bin/java -Dfile.encoding=UTF-8 -Xmx8g -cp /opt/mon/mon-persister.jar:/opt/mon/vertica/vertica_jdbc.jar com.hpcloud.mon.persister.MonPersisterApplication server /etc/mon/persister-config.yml diff --git a/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBAlarmRepository.java b/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBAlarmRepository.java index 3f0a60fe..cc1a70ef 100644 --- a/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBAlarmRepository.java +++ b/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBAlarmRepository.java @@ -26,9 +26,8 @@ import com.google.inject.Inject; import io.dropwizard.setup.Environment; -import org.influxdb.InfluxDB; -import org.influxdb.InfluxDBFactory; import org.influxdb.dto.Serie; +import org.influxdb.dto.Serie.Builder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,42 +35,28 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; -public class InfluxDBAlarmRepository implements AlarmRepository { +public class InfluxDBAlarmRepository extends InfluxRepository implements AlarmRepository { - private static final Logger logger = LoggerFactory.getLogger(InfluxDBAlarmRepository.class); + static final Logger logger = LoggerFactory.getLogger(InfluxDBAlarmRepository.class); private static final String ALARM_STATE_HISTORY_NAME = "alarm_state_history"; - private static final int ALARM_STATE_HISTORY_NUM_COLUMNS = 7; private final String[] colNamesStringArry = {"tenant_id", "alarm_id", "old_state", "new_state", "reason", "reason_data", "time"}; - private static final int ALARM_STATE_HISTORY_COLUMN_NUMBER = 7; - - private final MonPersisterConfiguration configuration; - private final Environment environment; - private final InfluxDB influxDB; + protected final Timer flushTimer; private List alarmStateTransitionedEventList = new LinkedList<>(); - private final Timer flushTimer; public final Meter alarmStateHistoryMeter; @Inject public InfluxDBAlarmRepository(MonPersisterConfiguration configuration, Environment environment) { - this.configuration = configuration; - this.environment = environment; - influxDB = - InfluxDBFactory.connect(configuration.getInfluxDBConfiguration().getUrl(), configuration - .getInfluxDBConfiguration().getUser(), configuration.getInfluxDBConfiguration() - .getPassword()); - + super(configuration, environment); this.flushTimer = this.environment.metrics().timer(this.getClass().getName() + "." + "flush-timer"); this.alarmStateHistoryMeter = this.environment.metrics().meter( this.getClass().getName() + "." + "alarm_state_history-meter"); - - } @Override @@ -94,40 +79,30 @@ public class InfluxDBAlarmRepository implements AlarmRepository { long startTime = System.currentTimeMillis(); Timer.Context context = flushTimer.time(); - Serie serie = new Serie(ALARM_STATE_HISTORY_NAME); - logger.debug("Created serie: {}", serie.getName()); + final Builder builder = new Serie.Builder(ALARM_STATE_HISTORY_NAME); + logger.debug("Created serie: {}", ALARM_STATE_HISTORY_NAME); - serie.setColumns(this.colNamesStringArry); + builder.columns(this.colNamesStringArry); if (logger.isDebugEnabled()) { - logColumnNames(serie); + logColumnNames(this.colNamesStringArry); } - Object[][] colValsObjectArry = - new Object[this.alarmStateTransitionedEventList.size()][ALARM_STATE_HISTORY_NUM_COLUMNS]; - int i = 0; for (AlarmStateTransitionedEvent alarmStateTransitionedEvent : alarmStateTransitionedEventList) { - int j = 0; - colValsObjectArry[i][j++] = alarmStateTransitionedEvent.tenantId; - colValsObjectArry[i][j++] = alarmStateTransitionedEvent.alarmId; - colValsObjectArry[i][j++] = alarmStateTransitionedEvent.oldState; - colValsObjectArry[i][j++] = alarmStateTransitionedEvent.newState; - colValsObjectArry[i][j++] = alarmStateTransitionedEvent.stateChangeReason; - colValsObjectArry[i][j++] = "{}"; - colValsObjectArry[i][j++] = alarmStateTransitionedEvent.timestamp; - i++; + builder.values(alarmStateTransitionedEvent.tenantId, alarmStateTransitionedEvent.alarmId, + alarmStateTransitionedEvent.oldState, alarmStateTransitionedEvent.newState, + alarmStateTransitionedEvent.stateChangeReason, "{}", + alarmStateTransitionedEvent.timestamp); } - serie.setPoints(colValsObjectArry); + final Serie[] series = {builder.build()}; if (logger.isDebugEnabled()) { - logColValues(serie); + logColValues(series[0]); } - Serie[] series = {serie}; - - this.influxDB.write(this.configuration.getInfluxDBConfiguration().getName(), series, - TimeUnit.SECONDS); + this.influxDB.write(this.configuration.getInfluxDBConfiguration().getName(), + TimeUnit.SECONDS, series); context.stop(); long endTime = System.currentTimeMillis(); @@ -139,38 +114,4 @@ public class InfluxDBAlarmRepository implements AlarmRepository { this.alarmStateTransitionedEventList.clear(); } - - private void logColValues(Serie serie) { - logger.debug("Added array of array of column values to serie"); - int outerIdx = 0; - for (Object[] colValArry : serie.getPoints()) { - StringBuffer sb = new StringBuffer(); - boolean first = true; - for (Object colVal : colValArry) { - if (first) { - first = false; - } else { - sb.append(","); - } - sb.append(colVal); - } - logger.debug("Array of column values[{}]: [{}]", outerIdx, sb); - outerIdx++; - } - } - - private void logColumnNames(Serie serie) { - logger.debug("Added array of column names to serie"); - StringBuffer sb = new StringBuffer(); - boolean first = true; - for (String colName : serie.getColumns()) { - if (first) { - first = false; - } else { - sb.append(","); - } - sb.append(colName); - } - logger.debug("Array of column names: [{}]", sb); - } } diff --git a/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBMetricRepository.java b/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBMetricRepository.java index d9d9805b..8584244b 100644 --- a/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBMetricRepository.java +++ b/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBMetricRepository.java @@ -26,9 +26,8 @@ import com.google.inject.Inject; import io.dropwizard.setup.Environment; import org.apache.commons.codec.digest.DigestUtils; -import org.influxdb.InfluxDB; -import org.influxdb.InfluxDBFactory; import org.influxdb.dto.Serie; +import org.influxdb.dto.Serie.Builder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,14 +42,10 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; -public class InfluxDBMetricRepository implements MetricRepository { +public class InfluxDBMetricRepository extends InfluxRepository implements MetricRepository { private static final Logger logger = LoggerFactory.getLogger(InfluxDBMetricRepository.class); - private final MonPersisterConfiguration configuration; - private final Environment environment; - private final InfluxDB influxDB; - private final List measurementList = new LinkedList<>(); private final Map definitionMap = new HashMap<>(); private final Map> dimensionMap = new HashMap<>(); @@ -66,17 +61,11 @@ public class InfluxDBMetricRepository implements MetricRepository { @Inject public InfluxDBMetricRepository(MonPersisterConfiguration configuration, Environment environment) { - this.configuration = configuration; - this.environment = environment; - influxDB = InfluxDBFactory.connect(configuration.getInfluxDBConfiguration().getUrl(), - configuration.getInfluxDBConfiguration().getUser(), - configuration.getInfluxDBConfiguration().getPassword()); - + super(configuration, environment); this.flushTimer = this.environment.metrics().timer(this.getClass().getName() + "." + "flush-timer"); this.measurementMeter = this.environment.metrics().meter(this.getClass().getName() + "." + "measurement-meter"); - } @Override @@ -119,8 +108,8 @@ public class InfluxDBMetricRepository implements MetricRepository { Timer.Context context = flushTimer.time(); Map, List>> defMap = getInfluxDBFriendlyMap(); Serie[] series = getSeries(defMap); - this.influxDB.write(this.configuration.getInfluxDBConfiguration().getName(), series, - TimeUnit.SECONDS); + this.influxDB.write(this.configuration.getInfluxDBConfiguration().getName(), + TimeUnit.SECONDS, series); long endTime = System.currentTimeMillis(); context.stop(); logger.debug("Writing measurements, definitions, and dimensions to database took {} seconds", @@ -147,8 +136,8 @@ public class InfluxDBMetricRepository implements MetricRepository { for (Set dimNameSet : dimNameSetMap.keySet()) { - Serie serie = new Serie(definition.name); - logger.debug("Created serie: {}", serie.getName()); + Builder builder = new Serie.Builder(definition.name); + logger.debug("Created serie: {}", definition.name); // Add 4 for the tenant id, region, timestamp, and value. String[] colNameStringArry = new String[dimNameSet.size() + 4]; @@ -166,10 +155,10 @@ public class InfluxDBMetricRepository implements MetricRepository { logger.debug("Adding column name[{}]: value", j); colNameStringArry[j++] = "value"; - serie.setColumns(colNameStringArry); + builder.columns(colNameStringArry); if (logger.isDebugEnabled()) { - logColNames(serie); + logColumnNames(colNameStringArry); } List pointList = dimNameSetMap.get(dimNameSet); @@ -178,13 +167,13 @@ public class InfluxDBMetricRepository implements MetricRepository { } // Add 4 for the tenant id, region, timestamp, and value. - Object[][] colValsObjectArry = new Object[pointList.size()][dimNameSet.size() + 4]; int k = 0; for (Point point : pointList) { + Object[] colValsObjectArry = new Object[dimNameSet.size() + 4]; logger.debug("Adding column value[{}][0]: {}", k, definition.tenantId); - colValsObjectArry[k][0] = definition.tenantId; + colValsObjectArry[0] = definition.tenantId; logger.debug("Adding column value[{}][1]: {}", k, definition.region); - colValsObjectArry[k][1] = definition.region; + colValsObjectArry[1] = definition.region; int l = 2; for (String dimName : dimNameSet) { String dimVal = point.dimValMap.get(dimName); @@ -192,18 +181,20 @@ public class InfluxDBMetricRepository implements MetricRepository { throw new Exception("Failed to find dimension value for dimension name: " + dimName); } logger.debug("Adding column value[{}][{}]: " + dimVal, k, l); - colValsObjectArry[k][l++] = dimVal; + colValsObjectArry[l++] = dimVal; } Date d = measurementTimeStampSimpleDateFormat.parse(point.measurement.timeStamp + " UTC"); Long time = d.getTime() / 1000; logger.debug("Adding column value[{}][{}]: {}", k, l, time); - colValsObjectArry[k][l++] = time; + colValsObjectArry[l++] = time; logger.debug("Adding column value[{}][{}]: {}", k, l, point.measurement.value); - colValsObjectArry[k][l++] = point.measurement.value; + colValsObjectArry[l++] = point.measurement.value; measurementMeter.mark(); k++; + builder.values(colValsObjectArry); } - serie.setPoints(colValsObjectArry); + + final Serie serie = builder.build(); if (logger.isDebugEnabled()) { logColValues(serie); @@ -218,40 +209,6 @@ public class InfluxDBMetricRepository implements MetricRepository { return serieList.toArray(new Serie[serieList.size()]); } - private void logColValues(Serie serie) { - logger.debug("Added array of array of column values to serie"); - int outerIdx = 0; - for (Object[] colValArry : serie.getPoints()) { - StringBuffer sb = new StringBuffer(); - boolean first = true; - for (Object colVal : colValArry) { - if (first) { - first = false; - } else { - sb.append(","); - } - sb.append(colVal); - } - logger.debug("Array of column values[{}]: [{}]", outerIdx, sb); - outerIdx++; - } - } - - private void logColNames(Serie serie) { - logger.debug("Added array of column names to serie"); - StringBuffer sb = new StringBuffer(); - boolean first = true; - for (String colName : serie.getColumns()) { - if (first) { - first = false; - } else { - sb.append(","); - } - sb.append(colName); - } - logger.debug("Array of column names: [{}]", sb); - } - /** * Group all measurements with the same dimension names into a list. Generate a map of definition * id's to map of dimension name sets to list of points. diff --git a/src/main/java/com/hpcloud/mon/persister/repository/InfluxRepository.java b/src/main/java/com/hpcloud/mon/persister/repository/InfluxRepository.java new file mode 100644 index 00000000..219189e0 --- /dev/null +++ b/src/main/java/com/hpcloud/mon/persister/repository/InfluxRepository.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hpcloud.mon.persister.repository; + +import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; + +import io.dropwizard.setup.Environment; + +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.Serie; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +public abstract class InfluxRepository { + static final Logger logger = LoggerFactory.getLogger(InfluxRepository.class); + + protected final MonPersisterConfiguration configuration; + protected final Environment environment; + protected final InfluxDB influxDB; + + public InfluxRepository(MonPersisterConfiguration configuration, Environment environment) { + this.configuration = configuration; + this.environment = environment; + influxDB = + InfluxDBFactory.connect(configuration.getInfluxDBConfiguration().getUrl(), configuration + .getInfluxDBConfiguration().getUser(), configuration.getInfluxDBConfiguration() + .getPassword()); + } + + protected void logColValues(Serie serie) { + logger.debug("Added array of array of column values to serie"); + final String[] colNames = serie.getColumns(); + List> rows = serie.getRows(); + int outerIdx = 0; + for (Map row : rows) { + StringBuffer sb = new StringBuffer(); + boolean first = true; + for (String colName : colNames) { + if (first) { + first = false; + } else { + sb.append(","); + } + sb.append(row.get(colName)); + } + logger.debug("Array of column values[{}]: [{}]", outerIdx, sb); + outerIdx++; + } + } + + protected void logColumnNames(String[] colNames) { + logger.debug("Added array of column names to serie"); + StringBuffer sb = new StringBuffer(); + boolean first = true; + for (String colName : colNames) { + if (first) { + first = false; + } else { + sb.append(","); + } + sb.append(colName); + } + logger.debug("Array of column names: [{}]", sb); + } + +}