Minor cleanup

Make methods protected.
Extract methods.
Clean up mixed-case in logging.
Remove unnecessary try-catch block.
Remove extra classes.
Consolidate methods in abstract class.

Change-Id: I6a8064998a2a474c9c51e4ea641387c69142fdd7
Deklan Dieterly 2015-04-22 10:22:44 -06:00
parent c819c4153c
commit 515e4f65d8
15 changed files with 205 additions and 175 deletions

View File

@@ -45,8 +45,7 @@ import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandler;
import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory;
import monasca.persister.pipeline.event.MetricHandler;
import monasca.persister.pipeline.event.MetricHandlerFactory;
import monasca.persister.repository.AlarmRepo;
import monasca.persister.repository.MetricRepo;
import monasca.persister.repository.Repo;
import monasca.persister.repository.influxdb.InfluxV9AlarmRepo;
import monasca.persister.repository.influxdb.InfluxV9MetricRepo;
import monasca.persister.repository.influxdb.InfluxV9RepoWriter;
@@ -141,8 +140,12 @@ public class PersisterModule extends AbstractModule {
if (config.getDatabaseConfiguration().getDatabaseType().equalsIgnoreCase(VERTICA)) {
bind(DBI.class).toProvider(DBIProvider.class).in(Scopes.SINGLETON);
bind(MetricRepo.class).to(VerticaMetricRepo.class);
bind(AlarmRepo.class).to(VerticaAlarmRepo.class);
bind(new TypeLiteral<Repo<MetricEnvelope>>(){})
.to(VerticaMetricRepo.class);
bind(new TypeLiteral<Repo<AlarmStateTransitionedEvent>>(){})
.to(VerticaAlarmRepo.class);
} else if (config.getDatabaseConfiguration().getDatabaseType().equalsIgnoreCase(INFLUXDB)) {
@@ -158,8 +161,12 @@ public class PersisterModule extends AbstractModule {
}
bind(InfluxV9RepoWriter.class).in(Singleton.class);
bind(MetricRepo.class).to(InfluxV9MetricRepo.class);
bind(AlarmRepo.class).to(InfluxV9AlarmRepo.class);
bind(new TypeLiteral<Repo<MetricEnvelope>>() {})
.to(InfluxV9MetricRepo.class);
bind(new TypeLiteral<Repo<AlarmStateTransitionedEvent>> () {})
.to(InfluxV9AlarmRepo.class);
} else {
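
The TypeLiteral bindings above are how Guice is told about the generic Repo<T> targets; a plain bind(Repo.class) would lose the type parameter. Below is a minimal standalone sketch of the same binding style, with an illustrative interface and in-memory implementation that are not part of this change:

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;

import java.util.LinkedList;
import java.util.List;

// Simplified generic repository contract, mirroring the shape of Repo<T> in this commit.
interface BatchRepo<T> {
  void addToBatch(T msg);
  void flush(String id);
}

// Hypothetical implementation used only for this sketch.
class InMemoryStringRepo implements BatchRepo<String> {
  private final List<String> buffer = new LinkedList<>();
  @Override public void addToBatch(String msg) { buffer.add(msg); }
  @Override public void flush(String id) { buffer.clear(); }
}

public class RepoBindingSketch extends AbstractModule {
  @Override
  protected void configure() {
    // The anonymous TypeLiteral subclass captures the full generic type,
    // so Guice can tell BatchRepo<String> apart from other BatchRepo<T> bindings.
    bind(new TypeLiteral<BatchRepo<String>>() {}).to(InMemoryStringRepo.class);
  }

  public static void main(String[] args) {
    Injector injector = Guice.createInjector(new RepoBindingSketch());
    BatchRepo<String> repo = injector.getInstance(new Key<BatchRepo<String>>() {});
    repo.addToBatch("metric");
    repo.flush("thread-0");
  }
}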

View File

@@ -53,18 +53,8 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
protected void handleMessage(String msg) {
try {
publishEvent(msg);
} catch (Exception e) {
logger.error(
"[{}]: failed to deserialize JSON message and send to handler: {} ",
threadId,
msg,
e);
}
}
private void markRead() {

View File

@@ -19,7 +19,6 @@ package monasca.persister.pipeline.event;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.configuration.PipelineConfig;
import monasca.persister.repository.AlarmRepo;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@@ -28,6 +27,7 @@ import com.codahale.metrics.Counter;
import com.fasterxml.jackson.databind.DeserializationFeature;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.Repo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,13 +38,13 @@ public class AlarmStateTransitionedEventHandler extends
private static final Logger logger =
LoggerFactory.getLogger(AlarmStateTransitionedEventHandler.class);
private final AlarmRepo alarmRepo;
private final Repo<AlarmStateTransitionedEvent> alarmRepo;
private final Counter alarmStateTransitionCounter;
@Inject
public AlarmStateTransitionedEventHandler(
AlarmRepo alarmRepo,
Repo<AlarmStateTransitionedEvent> alarmRepo,
Environment environment,
@Assisted PipelineConfig configuration,
@Assisted("threadId") String threadId,
@@ -66,7 +66,7 @@ public class AlarmStateTransitionedEventHandler extends
AlarmStateTransitionedEvent alarmStateTransitionedEvent =
objectMapper.readValue(msg, AlarmStateTransitionedEvent.class);
logger.debug("[{}]: [{}:{}]: {}",
logger.debug("[{}]: [{}:{}] {}",
this.threadId,
this.getBatchCount(),
this.getMsgCount(),

View File

@@ -94,31 +94,19 @@ public abstract class FlushableHandler<T> {
if (msg == null) {
logger.debug("[{}]: got heartbeat message, flush every {} seconds.", this.threadId,
this.secondsBetweenFlushes);
return checkFlushTime();
if (this.flushTimeMillis < System.currentTimeMillis()) {
logger.debug("[{}]: {} millis past flush time. flushing to repository now.",
this.threadId, (System.currentTimeMillis() - this.flushTimeMillis));
flush();
return true;
} else {
logger.debug("[{}]: {} millis to next flush time. no need to flush at this time.",
this.threadId, this.flushTimeMillis - System.currentTimeMillis());
return false;
}
}
this.msgCount += process(msg);
this.processedMeter.mark();
this.msgCount += process(msg);
return checkBatchSize();
}
private boolean checkBatchSize() {
if (this.msgCount >= this.batchSize) {
@@ -135,6 +123,36 @@ public abstract class FlushableHandler<T> {
}
}
private boolean checkFlushTime() {
logger.debug(
"[{}]: got heartbeat message, flush every {} seconds.",
this.threadId,
this.secondsBetweenFlushes);
if (this.flushTimeMillis < System.currentTimeMillis()) {
logger.debug(
"[{}]: {} millis past flush time. flushing to repository now.",
this.threadId,
(System.currentTimeMillis() - this.flushTimeMillis));
flush();
return true;
} else {
logger.debug(
"[{}]: {} millis to next flush time. no need to flush at this time.",
this.threadId,
this.flushTimeMillis - System.currentTimeMillis());
return false;
}
}
public void flush() {
logger.debug("[{}]: flush", this.threadId);
@@ -142,6 +160,8 @@ public abstract class FlushableHandler<T> {
if (this.msgCount == 0) {
logger.debug("[{}]: nothing to flush", this.threadId);
return;
}
Timer.Context context = this.commitTimer.time();
@@ -161,13 +181,13 @@ public abstract class FlushableHandler<T> {
}
public long getBatchCount() {
protected long getBatchCount() {
return this.batchCount;
}
public int getMsgCount() {
protected int getMsgCount() {
return this.msgCount;
}
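
The extraction above moves the heartbeat-driven flush-time check out of the message-handling path and into its own checkFlushTime() method. A compact sketch of the resulting decision flow, using illustrative field names and constants rather than the project's:

// Sketch only: heartbeats (null messages) trigger a time-based flush check,
// real messages a batch-size check.
class FlushDecisionSketch {
  private final long flushIntervalMillis = 30_000;
  private final int batchSize = 1000;
  private long nextFlushMillis = System.currentTimeMillis() + flushIntervalMillis;
  private int msgCount = 0;

  boolean onMessage(String msg) {
    if (msg == null) {            // heartbeat
      return checkFlushTime();
    }
    msgCount++;
    return checkBatchSize();
  }

  private boolean checkFlushTime() {
    if (nextFlushMillis < System.currentTimeMillis()) {
      flush();
      return true;
    }
    return false;
  }

  private boolean checkBatchSize() {
    if (msgCount >= batchSize) {
      flush();
      return true;
    }
    return false;
  }

  private void flush() {
    // write buffered messages, then schedule the next flush window
    msgCount = 0;
    nextFlushMillis = System.currentTimeMillis() + flushIntervalMillis;
  }
}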

View File

@@ -30,20 +30,20 @@ import org.slf4j.LoggerFactory;
import io.dropwizard.setup.Environment;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PipelineConfig;
import monasca.persister.repository.MetricRepo;
import monasca.persister.repository.Repo;
public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
private static final Logger logger =
LoggerFactory.getLogger(MetricHandler.class);
private final MetricRepo metricRepo;
private final Repo<MetricEnvelope> metricRepo;
private final Counter metricCounter;
@Inject
public MetricHandler(
MetricRepo metricRepo,
Repo<MetricEnvelope> metricRepo,
Environment environment,
@Assisted PipelineConfig configuration,
@Assisted("threadId") String threadId,
@@ -76,7 +76,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
private void processEnvelope(MetricEnvelope metricEnvelope) {
logger.debug("[{}]: [{}:{}]: {}",
logger.debug("[{}]: [{}:{}] {}",
this.threadId,
this.getBatchCount(),
this.getMsgCount(),

View File

@@ -1,27 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository;
import monasca.common.model.event.AlarmStateTransitionedEvent;
public interface AlarmRepo {
void addToBatch(final AlarmStateTransitionedEvent message);
void flush(String id);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
* Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,14 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository;
import monasca.common.model.metric.MetricEnvelope;
public interface Repo<T> {
public interface MetricRepo {
void addToBatch(final MetricEnvelope metricEnvelope);
void addToBatch(final T msg);
void flush(String id);
}
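
With the per-type MetricRepo and AlarmRepo interfaces collapsed into the single generic Repo<T>, code that batches and flushes can now be written once against the generic contract. A minimal sketch (the helper below is illustrative and not part of this commit):

interface Repo<T> {
  void addToBatch(T msg);
  void flush(String id);
}

final class RepoUtils {
  // Works for Repo<MetricEnvelope> and Repo<AlarmStateTransitionedEvent> alike.
  static <T> void addAllAndFlush(Repo<T> repo, Iterable<T> msgs, String id) {
    for (T msg : msgs) {
      repo.addToBatch(msg);
    }
    repo.flush(id);
  }
}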

View File

@@ -15,35 +15,26 @@
package monasca.persister.repository.influxdb;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.repository.AlarmRepo;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
import io.dropwizard.setup.Environment;
public abstract class InfluxAlarmRepo implements AlarmRepo {
private static final Logger logger = LoggerFactory.getLogger(InfluxAlarmRepo.class);
public abstract class InfluxAlarmRepo extends InfluxRepo<AlarmStateTransitionedEvent> {
protected static final String ALARM_STATE_HISTORY_NAME = "alarm_state_history";
public final Timer flushTimer;
public final Meter alarmStateHistoryMeter;
protected final Meter alarmStateHistoryMeter;
protected List<AlarmStateTransitionedEvent> alarmStateTransitionedEventList = new LinkedList<>();
public InfluxAlarmRepo(final Environment env) {
this.flushTimer =
env.metrics().timer(MetricRegistry.name(getClass(), "flush-timer"));
super(env);
this.alarmStateHistoryMeter =
env.metrics().meter(
@@ -61,28 +52,17 @@ public abstract class InfluxAlarmRepo implements AlarmRepo {
}
@Override
public void flush(String id) {
try {
if (this.alarmStateTransitionedEventList.isEmpty()) {
logger.debug("[{}]: no alarm state transition msg to be written to the influxDB", id);
logger.debug("[{}]: returning from flush", id);
return;
}
long startTime = System.currentTimeMillis();
Timer.Context context = flushTimer.time();
write(id);
context.stop();
long endTime = System.currentTimeMillis();
logger.debug("[{}]: flushing batch took {} seconds", id, (endTime - startTime) / 1000);
} catch (Exception e) {
logger.error("[{}]: failed to write alarm state history to database", id, e);
}
protected void clearBuffers() {
this.alarmStateTransitionedEventList.clear();
}
@Override
protected boolean isBufferEmpty() {
return this.alarmStateTransitionedEventList.isEmpty();
}
}

View File

@@ -19,94 +19,68 @@ package monasca.persister.repository.influxdb;
import monasca.common.model.metric.Metric;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.repository.MetricRepo;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import io.dropwizard.setup.Environment;
public abstract class InfluxMetricRepo implements MetricRepo {
private static final Logger logger = LoggerFactory.getLogger(InfluxMetricRepo.class);
public abstract class InfluxMetricRepo extends InfluxRepo<MetricEnvelope> {
protected final MeasurementBuffer measurementBuffer = new MeasurementBuffer();
public final com.codahale.metrics.Timer flushTimer;
public final Meter measurementMeter;
protected final Meter measurementMeter;
protected abstract void write(String id) throws Exception;
public InfluxMetricRepo(final Environment env) {
this.flushTimer = env.metrics().timer(this.getClass().getName() + ".flush-timer");
this.measurementMeter = env.metrics().meter(this.getClass().getName() + ".measurement-meter");
super(env);
this.measurementMeter =
env.metrics().meter(this.getClass().getName() + ".measurement-meter");
}
@Override
public void addToBatch(MetricEnvelope metricEnvelope) {
Metric metric = metricEnvelope.metric;
Map<String, Object> meta = metricEnvelope.meta;
Definition
definition =
new Definition(metric.getName(), (String) meta.get("tenantId"),
(String) meta.get("region"));
Definition definition =
new Definition(
metric.getName(),
(String) meta.get("tenantId"),
(String) meta.get("region"));
Dimensions dimensions = new Dimensions(metric.getDimensions());
Measurement
measurement =
new Measurement(metric.getTimestamp(), metric.getValue(), metric.getValueMeta());
Measurement measurement =
new Measurement(
metric.getTimestamp(),
metric.getValue(),
metric.getValueMeta());
this.measurementBuffer.put(definition, dimensions, measurement);
this.measurementMeter.mark();
}
@Override
public void flush(String id) {
try {
if (this.measurementBuffer.isEmpty()) {
logger.debug("[{}]: no metric msg to be written to the influxDB", id);
logger.debug("[{}]: returning from flush", id);
return;
}
final long startTime = System.currentTimeMillis();
final Timer.Context context = flushTimer.time();
write(id);
final long endTime = System.currentTimeMillis();
context.stop();
logger.debug("[{}]: flushing batch took {} seconds",
id, (endTime - startTime) / 1000);
} catch (Exception e) {
logger.error("[{}]: failed to write measurements to InfluxDB", id, e);
}
clearBuffers();
}
private void clearBuffers() {
protected void clearBuffers() {
this.measurementBuffer.clear();
}
@Override
protected boolean isBufferEmpty() {
return this.measurementBuffer.isEmpty();
}
}

View File

@@ -0,0 +1,84 @@
/*
* Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository.influxdb;
import com.codahale.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.Repo;
public abstract class InfluxRepo<T> implements Repo<T> {
private static final Logger logger = LoggerFactory.getLogger(InfluxRepo.class);
protected final com.codahale.metrics.Timer flushTimer;
public InfluxRepo (final Environment env) {
this.flushTimer =
env.metrics().timer(this.getClass().getName() + ".flush-timer");
}
@Override
public void flush(String id) {
try {
if (isBufferEmpty()) {
logger.debug("[{}]: no msg to be written to influxdb", id);
logger.debug("[{}]: returning from flush without flushing", id);
return;
}
final long startTime = System.currentTimeMillis();
final Timer.Context context = flushTimer.time();
write(id);
final long endTime = System.currentTimeMillis();
context.stop();
logger.debug("[{}]: flushing batch took {} millis",
id, endTime - startTime);
} catch (Exception e) {
logger.error("[{}]: failed to write msg to influxdb", id, e);
}
clearBuffers();
}
protected abstract boolean isBufferEmpty();
protected abstract void write(String id) throws Exception;
protected abstract void clearBuffers();
}
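
The new InfluxRepo<T> base class above consolidates the timing, empty-buffer check, and error handling that were previously duplicated in InfluxMetricRepo and InfluxAlarmRepo. A minimal sketch of the same template-method shape, with illustrative names and a plain list buffer instead of the project's types:

// Sketch only: the abstract class owns the flush boilerplate,
// subclasses supply the buffer-specific pieces.
abstract class BufferedRepo<T> {
  public void flush(String id) {
    try {
      if (isBufferEmpty()) {
        return;
      }
      long start = System.currentTimeMillis();
      write(id);
      System.out.printf("[%s]: flushing batch took %d millis%n",
          id, System.currentTimeMillis() - start);
    } catch (Exception e) {
      System.err.printf("[%s]: failed to write batch: %s%n", id, e);
    }
    clearBuffers();
  }

  protected abstract boolean isBufferEmpty();
  protected abstract void write(String id) throws Exception;
  protected abstract void clearBuffers();
}

class StringBufferedRepo extends BufferedRepo<String> {
  private final java.util.List<String> buffer = new java.util.LinkedList<>();
  void addToBatch(String msg) { buffer.add(msg); }
  @Override protected boolean isBufferEmpty() { return buffer.isEmpty(); }
  @Override protected void write(String id) { /* send the buffered items to the database */ }
  @Override protected void clearBuffers() { buffer.clear(); }
}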

View File

@@ -45,10 +45,12 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
private final DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime();
@Inject
public InfluxV9AlarmRepo(final Environment env,
final InfluxV9RepoWriter influxV9RepoWriter) {
public InfluxV9AlarmRepo(
final Environment env,
final InfluxV9RepoWriter influxV9RepoWriter) {
super(env);
this.influxV9RepoWriter = influxV9RepoWriter;
this.objectMapper.setPropertyNamingStrategy(

View File

@@ -32,10 +32,12 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo {
private final InfluxV9RepoWriter influxV9RepoWriter;
@Inject
public InfluxV9MetricRepo(final Environment env,
final InfluxV9RepoWriter influxV9RepoWriter) {
public InfluxV9MetricRepo(
final Environment env,
final InfluxV9RepoWriter influxV9RepoWriter) {
super(env);
this.influxV9RepoWriter = influxV9RepoWriter;
}

View File

@@ -167,7 +167,7 @@ public class InfluxV9RepoWriter {
try {
logger.debug("[{}]: sending {} points to influxdb database {} at {}", id,
logger.debug("[{}]: sending {} points to influxdb {} at {}", id,
influxPointArry.length, this.influxName, this.influxUrl);
HttpResponse response = this.httpClient.execute(request);
@@ -180,7 +180,7 @@ public class InfluxV9RepoWriter {
String responseString = EntityUtils.toString(responseEntity, "UTF-8");
logger.error("[{}]: failed to send data to influxdb database {} at {}: {}", id,
logger.error("[{}]: failed to send data to influxdb {} at {}: {}", id,
this.influxName, this.influxUrl, String.valueOf(rc));
logger.error("[{}]: http response: {}", id, responseString);
@@ -189,7 +189,7 @@ public class InfluxV9RepoWriter {
}
logger
.debug("[{}]: successfully sent {} points to influxdb database {} at {}", id,
.debug("[{}]: successfully sent {} points to influxdb {} at {}", id,
influxPointArry.length, this.influxName, this.influxUrl);
} finally {

View File

@@ -19,7 +19,6 @@ package monasca.persister.repository.vertica;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.AlarmRepo;
import com.codahale.metrics.Timer;
@@ -37,8 +36,9 @@ import java.util.TimeZone;
import javax.inject.Inject;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.Repo;
public class VerticaAlarmRepo extends VerticaRepo implements AlarmRepo {
public class VerticaAlarmRepo extends VerticaRepo implements Repo<AlarmStateTransitionedEvent> {
private static final Logger logger = LoggerFactory.getLogger(VerticaAlarmRepo.class);
private final Environment environment;

View File

@@ -21,7 +21,6 @@ import monasca.common.model.metric.Metric;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.pipeline.event.MetricHandler;
import monasca.persister.repository.MetricRepo;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -50,8 +49,9 @@ import java.util.TreeMap;
import javax.inject.Inject;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.Repo;
public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelope> {
private static final Logger logger = LoggerFactory.getLogger(VerticaMetricRepo.class);