Fix mark read functionality

perculate exception all the way up so that mark read is not called.
flush on shutdown.

Change-Id: Ie8c1b14765a61c0ed68fc83b23e52ef7bd60f4ab
This commit is contained in:
Deklan Dieterly 2015-04-24 11:29:18 -06:00
parent 515e4f65d8
commit 4c4c1d3464
14 changed files with 103 additions and 64 deletions

View File

@ -48,12 +48,8 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
}
protected void publishHeartbeat() {
publishEvent(null);
}
protected void handleMessage(String msg) {
publishEvent(msg);
}
@ -62,6 +58,7 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
logger.debug("[{}]: marking read", this.threadId);
this.kafkaChannel.markRead();
}
public void stop() {
@ -70,8 +67,11 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
this.stop = true;
this.pipeline.shutdown();
if (pipeline.shutdown()) {
markRead();
}
}
public void run() {
@ -92,7 +92,7 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
logger.debug("[{}]: {}", this.threadId, msg);
handleMessage(msg);
publishEvent(msg);
}

View File

@ -42,13 +42,25 @@ public class ManagedPipeline<T> {
}
public void shutdown() {
public boolean shutdown() {
logger.info("[{}]: shutdown", this.threadId);
handler.flush();
try {
int msgFlushCnt = handler.flush();
return msgFlushCnt > 0 ? true : false;
} catch (Exception e) {
logger.error("[{}}: failed to flush repo on shutdown", this.threadId, e);
return false;
}
}
public boolean publishEvent(String msg) {
try {
@ -57,7 +69,7 @@ public class ManagedPipeline<T> {
} catch (Exception e) {
logger.error("[{}]: failed to handle msg: {}", msg, e);
logger.error("[{}]: failed to handle msg: {}", this.threadId, msg, e);
return false;

View File

@ -91,9 +91,9 @@ public class AlarmStateTransitionedEventHandler extends
}
@Override
protected void flushRepository() {
protected int flushRepository() throws Exception {
alarmRepo.flush(this.threadId);
return alarmRepo.flush(this.threadId);
}
}

View File

@ -86,7 +86,7 @@ public abstract class FlushableHandler<T> {
protected abstract void initObjectMapper();
protected abstract void flushRepository();
protected abstract int flushRepository() throws Exception;
protected abstract int process(String msg) throws Exception;
@ -94,26 +94,44 @@ public abstract class FlushableHandler<T> {
if (msg == null) {
return checkFlushTime();
if (checkFlushTime()) {
int msgFlushCnt = flush();
return msgFlushCnt > 0 ? true : false;
} else {
return false;
}
}
this.msgCount += process(msg);
this.processedMeter.mark();
return checkBatchSize();
if (checkBatchSize()) {
int msgFlushCnt = flush();
return msgFlushCnt > 0 ? true : false;
} else {
return false;
}
}
private boolean checkBatchSize() {
private boolean checkBatchSize() throws Exception {
logger.debug("[{}]: checking batch size", this.threadId);
if (this.msgCount >= this.batchSize) {
logger.debug("[{}]: batch sized {} attained", this.threadId, this.batchSize);
flush();
return true;
} else {
@ -123,7 +141,9 @@ public abstract class FlushableHandler<T> {
}
}
private boolean checkFlushTime() {
private boolean checkFlushTime() throws Exception {
logger.debug("[{}}: checking flush time", this.threadId);
logger.debug(
"[{}]: got heartbeat message, flush every {} seconds.",
@ -137,8 +157,6 @@ public abstract class FlushableHandler<T> {
this.threadId,
(System.currentTimeMillis() - this.flushTimeMillis));
flush();
return true;
} else {
@ -153,20 +171,13 @@ public abstract class FlushableHandler<T> {
}
}
public void flush() {
public int flush() throws Exception {
logger.debug("[{}]: flush", this.threadId);
if (this.msgCount == 0) {
logger.debug("[{}]: nothing to flush", this.threadId);
return;
}
logger.debug("[{}]: flushing", this.threadId);
Timer.Context context = this.commitTimer.time();
flushRepository();
int msgFlushCnt = flushRepository();
context.stop();
@ -174,11 +185,13 @@ public abstract class FlushableHandler<T> {
this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes;
logger.debug("[{}]: flushed {} msg", this.threadId, this.msgCount);
logger.debug("[{}]: flushed {} msg", this.threadId, msgFlushCnt);
this.msgCount = 0;
this.batchCount++;
return msgFlushCnt;
}
protected long getBatchCount() {

View File

@ -101,9 +101,9 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
}
@Override
public void flushRepository() {
public int flushRepository() throws Exception {
metricRepo.flush(this.threadId);
return metricRepo.flush(this.threadId);
}
}

View File

@ -20,6 +20,6 @@ public interface Repo<T> {
void addToBatch(final T msg);
void flush(String id);
int flush(String id) throws Exception;
}

View File

@ -41,8 +41,6 @@ public abstract class InfluxAlarmRepo extends InfluxRepo<AlarmStateTransitionedE
MetricRegistry.name(getClass(), "alarm_state_history-meter"));
}
protected abstract void write (String id) throws Exception;
@Override
public void addToBatch(AlarmStateTransitionedEvent alarmStateTransitionedEvent) {

View File

@ -32,8 +32,6 @@ public abstract class InfluxMetricRepo extends InfluxRepo<MetricEnvelope> {
protected final Meter measurementMeter;
protected abstract void write(String id) throws Exception;
public InfluxMetricRepo(final Environment env) {
super(env);

View File

@ -38,47 +38,56 @@ public abstract class InfluxRepo<T> implements Repo<T> {
}
@Override
public void flush(String id) {
public int flush(String id) throws Exception {
if (isBufferEmpty()) {
logger.debug("[{}]: no msg to be written to influxdb", id);
logger.debug("[{}]: returning from flush without flushing", id);
return 0;
} else {
return writeToRepo(id);
}
}
private int writeToRepo(String id) throws Exception {
try {
if (isBufferEmpty()) {
logger.debug("[{}]: no msg to be written to influxdb", id);
logger.debug("[{}]: returning from flush without flushing", id);
return;
}
final long startTime = System.currentTimeMillis();
final Timer.Context context = flushTimer.time();
write(id);
int msgWriteCnt = write(id);
final long endTime = System.currentTimeMillis();
context.stop();
logger.debug("[{}]: flushing batch took {} millis",
id, endTime - startTime);
logger.debug("[{}]: flushing batch took {} millis", id, endTime - startTime);
clearBuffers();
return msgWriteCnt;
} catch (Exception e) {
logger.error("[{}]: failed to write msg to influxdb", id, e);
throw e;
}
clearBuffers();
}
protected abstract boolean isBufferEmpty();
protected abstract void write(String id) throws Exception;
protected abstract int write(String id) throws Exception;
protected abstract void clearBuffers();
}

View File

@ -58,9 +58,9 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
}
@Override
protected void write(String id) throws Exception {
protected int write(String id) throws Exception {
this.influxV9RepoWriter.write(getInfluxPointArry(), id);
return this.influxV9RepoWriter.write(getInfluxPointArry(), id);
}

View File

@ -43,9 +43,9 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo {
}
@Override
protected void write(String id) throws Exception {
protected int write(String id) throws Exception {
this.influxV9RepoWriter.write(getInfluxPointArry(), id);
return this.influxV9RepoWriter.write(getInfluxPointArry(), id);
}

View File

@ -123,7 +123,7 @@ public class InfluxV9RepoWriter {
}
}
protected void write(final InfluxPoint[] influxPointArry, String id) throws Exception {
protected int write(final InfluxPoint[] influxPointArry, String id) throws Exception {
HttpPost request = new HttpPost(this.influxUrl);
@ -192,6 +192,8 @@ public class InfluxV9RepoWriter {
.debug("[{}]: successfully sent {} points to influxdb {} at {}", id,
influxPointArry.length, this.influxName, this.influxUrl);
return influxPointArry.length;
} finally {
request.releaseConnection();

View File

@ -73,7 +73,7 @@ public class VerticaAlarmRepo extends VerticaRepo implements Repo<AlarmStateTran
.bind(6, timeStamp);
}
public void flush(String id) {
public int flush(String id) {
try {
commitBatch();
} catch (Exception e) {
@ -83,6 +83,10 @@ public class VerticaAlarmRepo extends VerticaRepo implements Repo<AlarmStateTran
}
handle.begin();
}
// Todo. implement cnt.
return 0;
}
private void commitBatch() {

View File

@ -406,7 +406,7 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
}
@Override
public void flush(String id) {
public int flush(String id) {
try {
long startTime = System.currentTimeMillis();
Timer.Context context = flushTimer.time();
@ -427,6 +427,9 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
clearTempCaches();
handle.begin();
}
// Todo. implement cnt.
return 0;
}
private void executeBatches() {