diff --git a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java index 6bce6407..a5be430f 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/AlarmStateTransitionedEventHandler.java @@ -63,10 +63,21 @@ public class AlarmStateTransitionedEventHandler extends } @Override - protected int process(String msg) throws IOException { + protected int process(String msg) { - AlarmStateTransitionedEvent alarmStateTransitionedEvent = - this.objectMapper.readValue(msg, AlarmStateTransitionedEvent.class); + AlarmStateTransitionedEvent alarmStateTransitionedEvent; + + try { + + alarmStateTransitionedEvent = + this.objectMapper.readValue(msg, AlarmStateTransitionedEvent.class); + + } catch (IOException e) { + + logger.error("[{}]: failed to deserialize message {}", this.threadId, msg, e); + + return 0; + } logger.debug("[{}]: [{}:{}] {}", this.threadId, @@ -74,7 +85,7 @@ public class AlarmStateTransitionedEventHandler extends this.getMsgCount(), alarmStateTransitionedEvent); - this.alarmRepo.addToBatch(alarmStateTransitionedEvent); + this.alarmRepo.addToBatch(alarmStateTransitionedEvent, this.threadId); this.alarmStateTransitionCounter.inc(); diff --git a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java index f081b37d..0011a905 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java @@ -28,8 +28,6 @@ import io.dropwizard.setup.Environment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - public abstract class FlushableHandler { private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class); @@ -43,8 +41,8 @@ public abstract class FlushableHandler { private long batchCount = 0; private final Meter processedMeter; - private final Meter commitMeter; - private final Timer commitTimer; + private final Meter flushMeter; + private final Timer flushTimer; protected final String threadId; @@ -67,14 +65,13 @@ public abstract class FlushableHandler { threadId); this.processedMeter = - environment.metrics() - .meter(handlerName + "." + "events-processed-processedMeter"); + environment.metrics().meter(handlerName + "." + "events-processed-meter"); - this.commitMeter = - environment.metrics().meter(handlerName + "." + "commits-executed-processedMeter"); + this.flushMeter = + environment.metrics().meter(handlerName + "." + "flush-meter"); - this.commitTimer = - environment.metrics().timer(handlerName + "." + "total-commit-and-flush-timer"); + this.flushTimer = + environment.metrics().timer(handlerName + "." + "flush-timer"); this.secondsBetweenFlushes = configuration.getMaxBatchTime(); @@ -90,7 +87,7 @@ public abstract class FlushableHandler { protected abstract int flushRepository() throws Exception; - protected abstract int process(String msg) throws IOException; + protected abstract int process(String msg); public boolean onEvent(final String msg) throws Exception { @@ -150,7 +147,7 @@ public abstract class FlushableHandler { private boolean isFlushTime() { - logger.debug("[{}}: got heartbeat message, checking flush time. flush every {} seconds.", + logger.debug("[{}]: got heartbeat message, checking flush time. flush every {} seconds.", this.threadId, this.secondsBetweenFlushes); @@ -159,7 +156,7 @@ public abstract class FlushableHandler { if (this.flushTimeMillis <= now ) { logger.debug( - "[{}]: {} millis past flush time. flushing to repository now.", + "[{}]: {} ms past flush time. flushing to repository now.", this.threadId, now - this.flushTimeMillis); @@ -168,7 +165,7 @@ public abstract class FlushableHandler { } else { logger.debug( - "[{}]: {} millis to next flush time. no need to flush at this time.", + "[{}]: {} ms to next flush time. no need to flush at this time.", this.threadId, this.flushTimeMillis - now); @@ -181,13 +178,13 @@ public abstract class FlushableHandler { logger.debug("[{}]: flushing", this.threadId); - Timer.Context context = this.commitTimer.time(); + Timer.Context context = this.flushTimer.time(); int msgFlushCnt = flushRepository(); context.stop(); - this.commitMeter.mark(); + this.flushMeter.mark(); this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes; diff --git a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java index 03fe38c7..8928018a 100644 --- a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java +++ b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java @@ -63,10 +63,20 @@ public class MetricHandler extends FlushableHandler { } @Override - public int process(String msg) throws IOException { + public int process(String msg) { - MetricEnvelope[] metricEnvelopesArry = - this.objectMapper.readValue(msg, MetricEnvelope[].class); + MetricEnvelope[] metricEnvelopesArry; + + try { + + metricEnvelopesArry = this.objectMapper.readValue(msg, MetricEnvelope[].class); + + } catch (IOException e) { + + logger.error("[{}]: failed to deserialize message {}", this.threadId, msg, e); + + return 0; + } for (final MetricEnvelope metricEnvelope : metricEnvelopesArry) { @@ -85,7 +95,7 @@ public class MetricHandler extends FlushableHandler { this.getMsgCount(), metricEnvelope); - this.metricRepo.addToBatch(metricEnvelope); + this.metricRepo.addToBatch(metricEnvelope, this.threadId); this.metricCounter.inc(); diff --git a/java/src/main/java/monasca/persister/repository/Repo.java b/java/src/main/java/monasca/persister/repository/Repo.java index f2aab918..be7c7d30 100644 --- a/java/src/main/java/monasca/persister/repository/Repo.java +++ b/java/src/main/java/monasca/persister/repository/Repo.java @@ -18,7 +18,7 @@ package monasca.persister.repository; public interface Repo { - void addToBatch(final T msg); + void addToBatch(final T msg, String id); int flush(String id) throws Exception; diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java index cb693e50..b990e07d 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxAlarmRepo.java @@ -24,7 +24,8 @@ import java.util.List; import io.dropwizard.setup.Environment; -public abstract class InfluxAlarmRepo extends InfluxRepo { +public abstract class InfluxAlarmRepo + extends InfluxRepo { protected static final String ALARM_STATE_HISTORY_NAME = "alarm_state_history"; @@ -42,7 +43,7 @@ public abstract class InfluxAlarmRepo extends InfluxRepo { protected final MeasurementBuffer measurementBuffer = new MeasurementBuffer(); - protected final Meter measurementMeter; - public InfluxMetricRepo(final Environment env) { super(env); - this.measurementMeter = - env.metrics().meter(this.getClass().getName() + ".measurement-meter"); - } @Override - public void addToBatch(MetricEnvelope metricEnvelope) { + public void addToBatch(MetricEnvelope metricEnvelope, String id) { Metric metric = metricEnvelope.metric; @@ -64,8 +57,6 @@ public abstract class InfluxMetricRepo extends InfluxRepo { this.measurementBuffer.put(definition, dimensions, measurement); - this.measurementMeter.mark(); - } @Override diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java index 059ea9c1..57cdcf42 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxRepo.java @@ -33,7 +33,7 @@ public abstract class InfluxRepo implements Repo { public InfluxRepo (final Environment env) { this.flushTimer = - env.metrics().timer(this.getClass().getName() + ".flush-timer"); + env.metrics().timer(this.getClass().getName() + "." + "flush-timer"); } @@ -69,7 +69,7 @@ public abstract class InfluxRepo implements Repo { context.stop(); - logger.debug("[{}]: writing to influxdb took {} millis", id, endTime - startTime); + logger.debug("[{}]: writing to influxdb took {} ms", id, endTime - startTime); clearBuffers(); diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java index cb45af1b..cae13972 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java @@ -21,6 +21,7 @@ import monasca.common.model.event.AlarmStateTransitionedEvent; import com.google.inject.Inject; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.PropertyNamingStrategy; @@ -28,6 +29,8 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.LinkedList; @@ -38,6 +41,8 @@ import io.dropwizard.setup.Environment; public class InfluxV9AlarmRepo extends InfluxAlarmRepo { + private static final Logger logger = LoggerFactory.getLogger(InfluxV9AlarmRepo.class); + private final InfluxV9RepoWriter influxV9RepoWriter; private final ObjectMapper objectMapper = new ObjectMapper(); @@ -60,11 +65,11 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo { @Override protected int write(String id) throws Exception { - return this.influxV9RepoWriter.write(getInfluxPointArry(), id); + return this.influxV9RepoWriter.write(getInfluxPointArry(id), id); } - private InfluxPoint[] getInfluxPointArry() throws Exception { + private InfluxPoint[] getInfluxPointArry(String id) throws Exception { List influxPointList = new LinkedList<>(); @@ -76,13 +81,33 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo { valueMap.put("alarm_id", event.alarmId); + try { + valueMap.put("metrics", this.objectMapper.writeValueAsString(event.metrics)); + } catch (JsonProcessingException e) { + + logger.error("[{}]: failed to serialize metrics {}", id, event.metrics, e); + + valueMap.put("metrics", ""); + + } + valueMap.put("old_state", event.oldState); valueMap.put("new_state", event.newState); - valueMap.put("sub_alarms", this.objectMapper.writeValueAsString(event.subAlarms)); + try { + + valueMap.put("sub_alarms", this.objectMapper.writeValueAsString(event.subAlarms)); + + } catch (JsonProcessingException e) { + + logger.error("[{}]: failed to serialize sub alarms {}", id, event.subAlarms, e); + + valueMap.put("sub_alarms", ""); + + } valueMap.put("reason", event.stateChangeReason); diff --git a/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java index 42643e3e..02c8b824 100644 --- a/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaAlarmRepo.java @@ -19,8 +19,11 @@ package monasca.persister.repository.vertica; import monasca.common.model.event.AlarmStateTransitionedEvent; import monasca.persister.configuration.PersisterConfig; +import monasca.persister.repository.Repo; import com.codahale.metrics.Timer; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.PreparedBatch; @@ -36,7 +39,6 @@ import java.util.TimeZone; import javax.inject.Inject; import io.dropwizard.setup.Environment; -import monasca.persister.repository.Repo; public class VerticaAlarmRepo extends VerticaRepo implements Repo { @@ -44,59 +46,127 @@ public class VerticaAlarmRepo extends VerticaRepo implements Repo { @@ -72,8 +70,13 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo dimensionIdSet = new HashSet<>(); private final Set definitionDimensionsIdSet = new HashSet<>(); + private int measurementCnt = 0; + + private final ObjectMapper objectMapper = new ObjectMapper(); + private static final String SQL_INSERT_INTO_METRICS = - "insert into MonMetrics.measurements (definition_dimensions_id, time_stamp, value) values (:definition_dimension_id, :time_stamp, :value)"; + "insert into MonMetrics.measurements (definition_dimensions_id, time_stamp, value, value_meta) " + + "values (:definition_dimension_id, :time_stamp, :value, :value_meta)"; private static final String DEFINITIONS_TEMP_STAGING_TABLE = "(" + " id BINARY(20) NOT NULL," + " name VARCHAR(255) NOT NULL," + " tenant_id VARCHAR(255) NOT NULL," @@ -100,13 +103,8 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo meta = metricEnvelope.meta; - logger.debug("metric: {}", metric); - logger.debug("meta: {}", meta); + String tenantId = getMeta(TENANT_ID, metric, meta, id); - String tenantId = ""; - if (meta.containsKey(TENANT_ID)) { - tenantId = (String) meta.get(TENANT_ID); - } else { - logger.warn( - "Failed to find tenantId in message envelope meta data. Metric message may be malformed" - + ". Setting tenantId to empty string."); - logger.warn("metric: {}", metric.toString()); - logger.warn("meta: {}", meta.toString()); - } - - String region = ""; - if (meta.containsKey(REGION)) { - region = (String) meta.get(REGION); - } else { - logger.warn( - "Failed to find region in message envelope meta data. Metric message may be malformed. " - + "Setting region to empty string."); - logger.warn("metric: {}", metric.toString()); - logger.warn("meta: {}", meta.toString()); - } + String region = getMeta(REGION, metric, meta, id); // Add the definition to the batch. - StringBuilder - definitionIdStringToHash = - new StringBuilder(trunc(metric.getName(), MAX_COLUMN_LENGTH)); - definitionIdStringToHash.append(trunc(tenantId, MAX_COLUMN_LENGTH)); - definitionIdStringToHash.append(trunc(region, MAX_COLUMN_LENGTH)); + StringBuilder definitionIdStringToHash = + new StringBuilder(trunc(metric.getName(), MAX_COLUMN_LENGTH, id)); + + definitionIdStringToHash.append(trunc(tenantId, MAX_COLUMN_LENGTH, id)); + + definitionIdStringToHash.append(trunc(region, MAX_COLUMN_LENGTH, id)); + byte[] definitionIdSha1Hash = DigestUtils.sha(definitionIdStringToHash.toString()); + Sha1HashId definitionSha1HashId = new Sha1HashId((definitionIdSha1Hash)); - this.addDefinitionToBatch(definitionSha1HashId, trunc(metric.getName(), MAX_COLUMN_LENGTH), - trunc(tenantId, MAX_COLUMN_LENGTH), trunc(region, MAX_COLUMN_LENGTH)); - definitionCounter.inc(); + + addDefinitionToBatch(definitionSha1HashId, trunc(metric.getName(), MAX_COLUMN_LENGTH, id), + trunc(tenantId, MAX_COLUMN_LENGTH, id), + trunc(region, MAX_COLUMN_LENGTH, id), id); // Calculate dimensions sha1 hash id. StringBuilder dimensionIdStringToHash = new StringBuilder(); - Map preppedDimMap = prepDimensions(metric.getDimensions()); + + Map preppedDimMap = prepDimensions(metric.getDimensions(), id); + for (Map.Entry entry : preppedDimMap.entrySet()) { + dimensionIdStringToHash.append(entry.getKey()); + dimensionIdStringToHash.append(entry.getValue()); } + byte[] dimensionIdSha1Hash = DigestUtils.sha(dimensionIdStringToHash.toString()); + Sha1HashId dimensionsSha1HashId = new Sha1HashId(dimensionIdSha1Hash); // Add the dimension name/values to the batch. - this.addDimensionsToBatch(dimensionsSha1HashId, preppedDimMap); + addDimensionsToBatch(dimensionsSha1HashId, preppedDimMap, id); // Add the definition dimensions to the batch. - StringBuilder - definitionDimensionsIdStringToHash = + StringBuilder definitionDimensionsIdStringToHash = new StringBuilder(definitionSha1HashId.toHexString()); + definitionDimensionsIdStringToHash.append(dimensionsSha1HashId.toHexString()); - byte[] - definitionDimensionsIdSha1Hash = + + byte[] definitionDimensionsIdSha1Hash = DigestUtils.sha(definitionDimensionsIdStringToHash.toString()); + Sha1HashId definitionDimensionsSha1HashId = new Sha1HashId(definitionDimensionsIdSha1Hash); + this.addDefinitionDimensionToBatch(definitionDimensionsSha1HashId, definitionSha1HashId, - dimensionsSha1HashId); - definitionDimensionsCounter.inc(); + dimensionsSha1HashId, id); // Add the measurement to the batch. String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp())); - double value = metric.getValue(); - this.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value, metric.getValueMeta()); - this.metricCounter.inc(); + double value = metric.getValue(); + + addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value, metric.getValueMeta(), id); + + } + + private String getMeta(String name, Metric metric, Map meta, String id) { + + if (meta.containsKey(name)) { + + return (String) meta.get(name); + + } else { + + logger.warn( + "[{}]: failed to find {} in message envelope meta data. metric message may be malformed. " + + "setting {} to empty string.", id, name); + + logger.warn("[{}]: metric: {}", id, metric.toString()); + + logger.warn("[{}]: meta: {}", id, meta.toString()); + + return ""; + } } public void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value, - Map valueMeta) { - // TODO: Actually handle valueMeta - logger.debug("Adding metric to batch: defDimsId: {}, time: {}, value: {}", - defDimsId.toHexString(), timeStamp, value); - metricsBatch.add().bind("definition_dimension_id", defDimsId.getSha1Hash()) - .bind("time_stamp", timeStamp).bind("value", value); + Map valueMeta, String id) { + + String valueMetaString = getValueMetaString(valueMeta, id); + + logger.debug("[{}]: adding metric to batch: defDimsId: {}, time: {}, value: {}, value meta {}", + id, defDimsId.toHexString(), timeStamp, value, valueMetaString); + + metricsBatch.add() + .bind("definition_dimension_id", defDimsId.getSha1Hash()) + .bind("time_stamp", timeStamp) + .bind("value", value) + .bind("value_meta", valueMetaString); + + this.measurementCnt++; + measurementMeter.mark(); } - private void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region) { + private String getValueMetaString(Map valueMeta, String id) { + + String valueMetaString = ""; + + if (valueMeta != null && !valueMeta.isEmpty()) { + + try { + + valueMetaString = this.objectMapper.writeValueAsString(valueMeta); + + } catch (JsonProcessingException e) { + + logger + .error("[{}]: Failed to serialize value meta {}, dropping value meta from measurement", + id, valueMeta); + } + } + + return valueMetaString; + } + + private void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region, String id) { if (definitionsIdCache.getIfPresent(defId) == null) { @@ -337,11 +378,17 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo dimMap) { + private void addDimensionsToBatch(Sha1HashId dimSetId, Map dimMap, String id) { if (dimensionsIdCache.getIfPresent(dimSetId) == null) { @@ -364,10 +411,14 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo prepDimensions(Map dimMap) { + private Map prepDimensions(Map dimMap, String id) { Map newDimMap = new TreeMap<>(); if (dimMap != null) { + for (String dimName : dimMap.keySet()) { + if (dimName != null && !dimName.isEmpty()) { + String dimValue = dimMap.get(dimName); + if (dimValue != null && !dimValue.isEmpty()) { - newDimMap.put(trunc(dimName, MAX_COLUMN_LENGTH), trunc(dimValue, MAX_COLUMN_LENGTH)); - dimensionCounter.inc(); + + newDimMap.put(trunc(dimName, MAX_COLUMN_LENGTH, id), + trunc(dimValue, MAX_COLUMN_LENGTH, id)); + } } } } + return newDimMap; + } - private String trunc(String s, int l) { + private String trunc(String s, int l, String id) { if (s == null) { + return ""; + } else if (s.length() <= l) { + return s; + } else { + String r = s.substring(0, l); - logger.warn("Input string exceeded max column length. Truncating input string {} to {} chars", - s, l); - logger.warn("Resulting string {}", r); + + logger.warn( "[{}]: input string exceeded max column length. truncating input string {} to {} chars", + id, s, l); + + logger.warn("[{}]: resulting string {}", id, r); + return r; } }