Add packages
Move classes to appropriate packages metric alarmstate Remove unnecessary continue statement. Change-Id: I690f05d5426bf2cae8978b0662c1813453a387bb
This commit is contained in:
parent
16daf95a1d
commit
468dd8b031
@ -18,16 +18,16 @@
|
||||
package monasca.persister;
|
||||
|
||||
import monasca.persister.configuration.PersisterConfig;
|
||||
import monasca.persister.consumer.AlarmStateTransitionConsumer;
|
||||
import monasca.persister.consumer.AlarmStateTransitionConsumerFactory;
|
||||
import monasca.persister.consumer.KafkaAlarmStateTransitionConsumer;
|
||||
import monasca.persister.consumer.KafkaAlarmStateTransitionConsumerFactory;
|
||||
import monasca.persister.consumer.alarmstate.AlarmStateTransitionConsumer;
|
||||
import monasca.persister.consumer.alarmstate.AlarmStateTransitionConsumerFactory;
|
||||
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumer;
|
||||
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerFactory;
|
||||
import monasca.persister.consumer.KafkaChannel;
|
||||
import monasca.persister.consumer.KafkaChannelFactory;
|
||||
import monasca.persister.consumer.KafkaMetricsConsumer;
|
||||
import monasca.persister.consumer.KafkaMetricsConsumerFactory;
|
||||
import monasca.persister.consumer.MetricsConsumer;
|
||||
import monasca.persister.consumer.MetricsConsumerFactory;
|
||||
import monasca.persister.consumer.metric.KafkaMetricsConsumer;
|
||||
import monasca.persister.consumer.metric.KafkaMetricsConsumerFactory;
|
||||
import monasca.persister.consumer.metric.MetricsConsumer;
|
||||
import monasca.persister.consumer.metric.MetricsConsumerFactory;
|
||||
import monasca.persister.healthcheck.SimpleHealthCheck;
|
||||
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
import monasca.persister.pipeline.AlarmStateTransitionPipelineFactory;
|
||||
|
@ -27,20 +27,20 @@ import javax.inject.Singleton;
|
||||
|
||||
import io.dropwizard.setup.Environment;
|
||||
import monasca.persister.configuration.PersisterConfig;
|
||||
import monasca.persister.consumer.AlarmStateTransitionConsumer;
|
||||
import monasca.persister.consumer.AlarmStateTransitionConsumerFactory;
|
||||
import monasca.persister.consumer.KafkaAlarmStateTransitionConsumer;
|
||||
import monasca.persister.consumer.KafkaAlarmStateTransitionConsumerFactory;
|
||||
import monasca.persister.consumer.KafkaAlarmStateTransitionConsumerRunnableBasic;
|
||||
import monasca.persister.consumer.KafkaAlarmStateTransitionConsumerRunnableBasicFactory;
|
||||
import monasca.persister.consumer.alarmstate.AlarmStateTransitionConsumer;
|
||||
import monasca.persister.consumer.alarmstate.AlarmStateTransitionConsumerFactory;
|
||||
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumer;
|
||||
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerFactory;
|
||||
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerRunnableBasic;
|
||||
import monasca.persister.consumer.alarmstate.KafkaAlarmStateTransitionConsumerRunnableBasicFactory;
|
||||
import monasca.persister.consumer.KafkaChannel;
|
||||
import monasca.persister.consumer.KafkaChannelFactory;
|
||||
import monasca.persister.consumer.KafkaMetricsConsumer;
|
||||
import monasca.persister.consumer.KafkaMetricsConsumerFactory;
|
||||
import monasca.persister.consumer.KafkaMetricsConsumerRunnableBasic;
|
||||
import monasca.persister.consumer.KafkaMetricsConsumerRunnableBasicFactory;
|
||||
import monasca.persister.consumer.MetricsConsumer;
|
||||
import monasca.persister.consumer.MetricsConsumerFactory;
|
||||
import monasca.persister.consumer.metric.KafkaMetricsConsumer;
|
||||
import monasca.persister.consumer.metric.KafkaMetricsConsumerFactory;
|
||||
import monasca.persister.consumer.metric.KafkaMetricsConsumerRunnableBasic;
|
||||
import monasca.persister.consumer.metric.KafkaMetricsConsumerRunnableBasicFactory;
|
||||
import monasca.persister.consumer.metric.MetricsConsumer;
|
||||
import monasca.persister.consumer.metric.MetricsConsumerFactory;
|
||||
import monasca.persister.dbi.DBIProvider;
|
||||
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
import monasca.persister.pipeline.AlarmStateTransitionPipelineFactory;
|
||||
@ -125,8 +125,7 @@ public class PersisterModule extends AbstractModule {
|
||||
|
||||
install(new FactoryModuleBuilder().implement(
|
||||
MetricsConsumer.class,
|
||||
MetricsConsumer.class).build(
|
||||
MetricsConsumerFactory.class));
|
||||
MetricsConsumer.class).build(MetricsConsumerFactory.class));
|
||||
|
||||
install(new FactoryModuleBuilder().implement(KafkaChannel.class, KafkaChannel.class).build(
|
||||
KafkaChannelFactory.class));
|
||||
|
@ -58,7 +58,7 @@ public abstract class KafkaConsumer<T> {
|
||||
logger.warn("Did not shut down in {} seconds", WAIT_TIME);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
logger.info("awaitTerminiation interrupted", e);
|
||||
logger.info("awaitTermination interrupted", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,11 +19,11 @@ package monasca.persister.consumer;
|
||||
|
||||
import monasca.persister.pipeline.ManagedPipeline;
|
||||
|
||||
import kafka.consumer.ConsumerIterator;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import kafka.consumer.ConsumerIterator;
|
||||
|
||||
public abstract class KafkaConsumerRunnableBasic<T> implements Runnable {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnableBasic.class);
|
||||
@ -44,7 +44,7 @@ public abstract class KafkaConsumerRunnableBasic<T> implements Runnable {
|
||||
|
||||
abstract protected void handleMessage(String message);
|
||||
|
||||
protected void markRead() {
|
||||
private void markRead() {
|
||||
this.kafkaChannel.markRead();
|
||||
}
|
||||
|
||||
@ -66,7 +66,6 @@ public abstract class KafkaConsumerRunnableBasic<T> implements Runnable {
|
||||
}
|
||||
} catch (kafka.consumer.ConsumerTimeoutException cte) {
|
||||
publishHeartbeat();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
logger.debug("Shutting down Thread: {}", threadNumber);
|
||||
|
@ -15,9 +15,10 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.consumer;
|
||||
package monasca.persister.consumer.alarmstate;
|
||||
|
||||
import monasca.common.model.event.AlarmStateTransitionedEvent;
|
||||
import monasca.persister.consumer.Consumer;
|
||||
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
|
||||
import com.google.inject.Inject;
|
@ -15,7 +15,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.consumer;
|
||||
package monasca.persister.consumer.alarmstate;
|
||||
|
||||
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
|
@ -15,9 +15,12 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.consumer;
|
||||
package monasca.persister.consumer.alarmstate;
|
||||
|
||||
import monasca.common.model.event.AlarmStateTransitionedEvent;
|
||||
import monasca.persister.consumer.KafkaChannel;
|
||||
import monasca.persister.consumer.KafkaConsumer;
|
||||
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
|
||||
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
|
||||
import com.google.inject.Inject;
|
@ -15,8 +15,9 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.consumer;
|
||||
package monasca.persister.consumer.alarmstate;
|
||||
|
||||
import monasca.persister.consumer.KafkaChannel;
|
||||
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
|
||||
public interface KafkaAlarmStateTransitionConsumerFactory {
|
@ -15,9 +15,11 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.consumer;
|
||||
package monasca.persister.consumer.alarmstate;
|
||||
|
||||
import monasca.common.model.event.AlarmStateTransitionedEvent;
|
||||
import monasca.persister.consumer.KafkaChannel;
|
||||
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
|
||||
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
@ -29,7 +31,7 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class KafkaAlarmStateTransitionConsumerRunnableBasic extends
|
||||
KafkaConsumerRunnableBasic<AlarmStateTransitionedEvent> {
|
||||
KafkaConsumerRunnableBasic<AlarmStateTransitionedEvent> {
|
||||
|
||||
private static final Logger logger = LoggerFactory
|
||||
.getLogger(KafkaAlarmStateTransitionConsumerRunnableBasic.class);
|
@ -15,8 +15,9 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.consumer;
|
||||
package monasca.persister.consumer.alarmstate;
|
||||
|
||||
import monasca.persister.consumer.KafkaChannel;
|
||||
import monasca.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
|
||||
public interface KafkaAlarmStateTransitionConsumerRunnableBasicFactory {
|
@ -15,9 +15,12 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.consumer;
|
||||
package monasca.persister.consumer.metric;
|
||||
|
||||
import monasca.common.model.metric.MetricEnvelope;
|
||||
import monasca.persister.consumer.KafkaChannel;
|
||||
import monasca.persister.consumer.KafkaConsumer;
|
||||
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
|
||||
import monasca.persister.pipeline.MetricPipeline;
|
||||
|
||||
import com.google.inject.Inject;
|
@ -15,8 +15,9 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.consumer;
|
||||
package monasca.persister.consumer.metric;
|
||||
|
||||
import monasca.persister.consumer.KafkaChannel;
|
||||
import monasca.persister.pipeline.MetricPipeline;
|
||||
|
||||
public interface KafkaMetricsConsumerFactory {
|
@ -15,9 +15,11 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.consumer;
|
||||
package monasca.persister.consumer.metric;
|
||||
|
||||
import monasca.common.model.metric.MetricEnvelope;
|
||||
import monasca.persister.consumer.KafkaChannel;
|
||||
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
|
||||
import monasca.persister.pipeline.MetricPipeline;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
@ -29,7 +31,8 @@ import com.google.inject.assistedinject.Assisted;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class KafkaMetricsConsumerRunnableBasic extends KafkaConsumerRunnableBasic<MetricEnvelope[]> {
|
||||
public class KafkaMetricsConsumerRunnableBasic extends
|
||||
KafkaConsumerRunnableBasic<MetricEnvelope[]> {
|
||||
|
||||
private static final Logger logger = LoggerFactory
|
||||
.getLogger(KafkaMetricsConsumerRunnableBasic.class);
|
@ -15,8 +15,9 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.consumer;
|
||||
package monasca.persister.consumer.metric;
|
||||
|
||||
import monasca.persister.consumer.KafkaChannel;
|
||||
import monasca.persister.pipeline.MetricPipeline;
|
||||
|
||||
public interface KafkaMetricsConsumerRunnableBasicFactory {
|
@ -15,9 +15,10 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.consumer;
|
||||
package monasca.persister.consumer.metric;
|
||||
|
||||
import monasca.common.model.metric.MetricEnvelope;
|
||||
import monasca.persister.consumer.Consumer;
|
||||
import monasca.persister.pipeline.MetricPipeline;
|
||||
|
||||
import com.google.inject.Inject;
|
@ -12,7 +12,7 @@
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package monasca.persister.consumer;
|
||||
package monasca.persister.consumer.metric;
|
||||
|
||||
import monasca.persister.pipeline.MetricPipeline;
|
||||
|
@ -66,7 +66,8 @@ public class InfluxV8MetricRepo extends InfluxMetricRepo
|
||||
Definition definition = definitionMapEntry.getKey();
|
||||
Map<Dimensions, List<Measurement>> dimensionsMap = definitionMapEntry.getValue();
|
||||
|
||||
for (Map.Entry<Dimensions, List<Measurement>> dimensionsMapEntry : dimensionsMap.entrySet()) {
|
||||
for (Map.Entry<Dimensions, List<Measurement>> dimensionsMapEntry
|
||||
: dimensionsMap.entrySet()) {
|
||||
|
||||
Dimensions dimensions = dimensionsMapEntry.getKey();
|
||||
List<Measurement> measurementList = dimensionsMapEntry.getValue();
|
||||
|
@ -51,13 +51,14 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo {
|
||||
|
||||
List<InfluxPoint> influxPointList = new LinkedList<>();
|
||||
|
||||
for (Map.Entry<Definition, Map<Dimensions, List<Measurement>>> definitionMapEntry :
|
||||
this.measurementBuffer.entrySet()) {
|
||||
for (Map.Entry<Definition, Map<Dimensions, List<Measurement>>> definitionMapEntry
|
||||
: this.measurementBuffer.entrySet()) {
|
||||
|
||||
Definition definition = definitionMapEntry.getKey();
|
||||
Map<Dimensions, List<Measurement>> dimensionsMap = definitionMapEntry.getValue();
|
||||
|
||||
for (Map.Entry<Dimensions, List<Measurement>> dimensionsMapEntry : dimensionsMap.entrySet()) {
|
||||
for (Map.Entry<Dimensions, List<Measurement>> dimensionsMapEntry
|
||||
: dimensionsMap.entrySet()) {
|
||||
|
||||
Dimensions dimensions = dimensionsMapEntry.getKey();
|
||||
List<Measurement> measurementList = dimensionsMapEntry.getValue();
|
||||
|
@ -53,14 +53,13 @@ public class InfluxV9RepoWriter {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(InfluxV9RepoWriter.class);
|
||||
|
||||
private final PersisterConfig config;
|
||||
|
||||
private final String influxName;
|
||||
private final String influxUrl;
|
||||
private final String influxCreds;
|
||||
private final String influxUser;
|
||||
private final String influxPass;
|
||||
private final String influxRetentionPolicy;
|
||||
private final boolean gzip;
|
||||
|
||||
private final CloseableHttpClient httpClient;
|
||||
|
||||
@ -71,21 +70,20 @@ public class InfluxV9RepoWriter {
|
||||
@Inject
|
||||
public InfluxV9RepoWriter(final PersisterConfig config) {
|
||||
|
||||
this.config = config;
|
||||
|
||||
this.influxName = config.getInfluxDBConfiguration().getName();
|
||||
this.influxUrl = config.getInfluxDBConfiguration().getUrl() + "/write";
|
||||
this.influxUser = config.getInfluxDBConfiguration().getUser();
|
||||
this.influxPass = config.getInfluxDBConfiguration().getPassword();
|
||||
this.influxCreds = this.influxUser + ":" + this.influxPass;
|
||||
this.influxRetentionPolicy = config.getInfluxDBConfiguration().getRetentionPolicy();
|
||||
this.gzip = config.getInfluxDBConfiguration().getGzip();
|
||||
|
||||
this.baseAuthHeader = "Basic " + new String(Base64.encodeBase64(this.influxCreds.getBytes()));
|
||||
|
||||
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
|
||||
cm.setMaxTotal(config.getInfluxDBConfiguration().getMaxHttpConnections());
|
||||
|
||||
if (config.getInfluxDBConfiguration().getGzip()) {
|
||||
if (this.gzip) {
|
||||
|
||||
this.httpClient =
|
||||
HttpClients.custom().setConnectionManager(cm)
|
||||
@ -123,7 +121,6 @@ public class InfluxV9RepoWriter {
|
||||
this.httpClient = HttpClients.custom().setConnectionManager(cm).build();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected void write(final InfluxPoint[] influxPointArry) throws Exception {
|
||||
@ -140,7 +137,7 @@ public class InfluxV9RepoWriter {
|
||||
|
||||
String json = this.objectMapper.writeValueAsString(influxWrite);
|
||||
|
||||
if (this.config.getInfluxDBConfiguration().getGzip()) {
|
||||
if (this.gzip) {
|
||||
|
||||
HttpEntity
|
||||
requestEntity =
|
||||
|
@ -17,8 +17,8 @@
|
||||
|
||||
package monasca.persister;
|
||||
|
||||
import monasca.persister.consumer.KafkaMetricsConsumer;
|
||||
import monasca.persister.consumer.MetricsConsumer;
|
||||
import monasca.persister.consumer.metric.KafkaMetricsConsumer;
|
||||
import monasca.persister.consumer.metric.MetricsConsumer;
|
||||
import monasca.persister.pipeline.MetricPipeline;
|
||||
import monasca.persister.pipeline.event.MetricHandler;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user