Specify topic name

Change-Id: I5eaa6d012c1d03f35e31f5ac7decadb2c16ddfae
This commit is contained in:
jmarchel 2024-03-05 13:56:27 +01:00
parent 1343df3a6d
commit 81634c51bd
4 changed files with 25 additions and 9 deletions

1
.idea/compiler.xml generated
View File

@ -2,6 +2,7 @@
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile default="true" name="Default" enabled="true" />
<profile name="Gradle Imported" enabled="true">
<outputRelativeToContentRoot value="true" />
<processorPath useClasspath="false">

View File

@ -37,7 +37,11 @@
<artifactId>exn-connector-java</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<repositories>
<!-- Repository for SNAPSHOT versions -->

View File

@ -5,6 +5,7 @@ import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.core.Handler;
import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import java.util.Arrays;
import java.util.HashMap;
@ -17,7 +18,7 @@ public class ApplicationSpecificPredictionConsumer extends Consumer {
super("realtime_metrics_consumer_" + applicationName,
"eu.nebulouscloud.monitoring.realtime.>",
new ApplicationMetricsHandler(applicationName),
true);
true, true);
}
private static class ApplicationMetricsHandler extends Handler {
@ -29,26 +30,36 @@ public class ApplicationSpecificPredictionConsumer extends Consumer {
@Override
public void onMessage(String key, String address, Map body, Message message, Context ctx) {
log.debug("Received message with key: {}, address: {}", key, address); // Added more context to the log message
// Transform Type I message to Type II format (predicted metrics)
Map<String, Object> predictedMetric = transformToPredictedMetric(body);
log.info("Transformed message body to predicted metrics: {}", predictedMetric);
String[] parts;
try {
parts = message.to().split("\\.");
} catch (ClientException e) {
log.error("Failed to split message 'to' property", e);
throw new RuntimeException("Failed to split message 'to' property", e);
}
String[] parts = address.split("\\.");
// Assuming the metric name is the last part of the topic
String metricName = parts[parts.length - 1];
log.debug("Extracted metric name: {}", metricName); // Log the extracted metric name
String publisherKey = "predicted_metrics_" + metricName; // Construct a unique key for the publisher
String publisherKey = "predicted_metrics_" + metricName;
PredictedMetricsPublisher predictedMetricsPublisher = (PredictedMetricsPublisher) ctx.getPublisher(publisherKey);
if (predictedMetricsPublisher == null) {
log.info("PredictedMetricsPublisher for {} not found, creating a new one.", metricName);
// Assuming you have a method to create and register a new publisher
log.info("PredictedMetricsPublisher for metric {} not found, creating a new one.", metricName);
predictedMetricsPublisher = new PredictedMetricsPublisher(metricName);
ctx.registerPublisher(predictedMetricsPublisher);
log.info("New PredictedMetricsPublisher for {} registered.", metricName);
log.info("New PredictedMetricsPublisher for metric {} registered.", metricName);
}
predictedMetricsPublisher.send(predictedMetric, applicationName);
log.info("Sent predicted metric to topic: {}", predictedMetricsPublisher.topicName);
}
private Map<String, Object> transformToPredictedMetric(Map<String, Object> metric) {

View File

@ -5,7 +5,7 @@ import eu.nebulouscloud.exn.core.Publisher;
class PredictedMetricsPublisher extends Publisher {
String topicName;
public PredictedMetricsPublisher(String topicName) {
super("predicted_metrics_" + topicName, "eu.nebulouscloud.monitoring.predicted." + topicName, true);
super("predicted_metrics_" + topicName, "eu.nebulouscloud.monitoring.predicted." + topicName, true, true);
this.topicName = topicName;
}
}