From 81634c51bd47cf304da810169d536276fd7ca664 Mon Sep 17 00:00:00 2001 From: jmarchel Date: Tue, 5 Mar 2024 13:56:27 +0100 Subject: [PATCH] Specify topic name Change-Id: I5eaa6d012c1d03f35e31f5ac7decadb2c16ddfae --- .idea/compiler.xml | 1 + prediction-orchestrator/pom.xml | 6 ++++- ...ApplicationSpecificPredictionConsumer.java | 25 +++++++++++++------ .../PredictedMetricsPublisher.java | 2 +- 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/.idea/compiler.xml b/.idea/compiler.xml index c6f12d6..248f0f5 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -2,6 +2,7 @@ + diff --git a/prediction-orchestrator/pom.xml b/prediction-orchestrator/pom.xml index d944774..b25471b 100644 --- a/prediction-orchestrator/pom.xml +++ b/prediction-orchestrator/pom.xml @@ -37,7 +37,11 @@ exn-connector-java 1.0-SNAPSHOT - + + org.springframework.boot + spring-boot-starter-actuator + + diff --git a/prediction-orchestrator/src/main/java/eu/nebulouscloud/predictionorchestrator/ApplicationSpecificPredictionConsumer.java b/prediction-orchestrator/src/main/java/eu/nebulouscloud/predictionorchestrator/ApplicationSpecificPredictionConsumer.java index 552a202..2dcde79 100644 --- a/prediction-orchestrator/src/main/java/eu/nebulouscloud/predictionorchestrator/ApplicationSpecificPredictionConsumer.java +++ b/prediction-orchestrator/src/main/java/eu/nebulouscloud/predictionorchestrator/ApplicationSpecificPredictionConsumer.java @@ -5,6 +5,7 @@ import eu.nebulouscloud.exn.core.Context; import eu.nebulouscloud.exn.core.Handler; import lombok.extern.slf4j.Slf4j; import org.apache.qpid.protonj2.client.Message; +import org.apache.qpid.protonj2.client.exceptions.ClientException; import java.util.Arrays; import java.util.HashMap; @@ -17,7 +18,7 @@ public class ApplicationSpecificPredictionConsumer extends Consumer { super("realtime_metrics_consumer_" + applicationName, "eu.nebulouscloud.monitoring.realtime.>", new ApplicationMetricsHandler(applicationName), - true); + true, true); } private static class ApplicationMetricsHandler extends Handler { @@ -29,26 +30,36 @@ public class ApplicationSpecificPredictionConsumer extends Consumer { @Override public void onMessage(String key, String address, Map body, Message message, Context ctx) { + log.debug("Received message with key: {}, address: {}", key, address); // Added more context to the log message + // Transform Type I message to Type II format (predicted metrics) Map predictedMetric = transformToPredictedMetric(body); + log.info("Transformed message body to predicted metrics: {}", predictedMetric); + + String[] parts; + try { + parts = message.to().split("\\."); + } catch (ClientException e) { + log.error("Failed to split message 'to' property", e); + throw new RuntimeException("Failed to split message 'to' property", e); + } - String[] parts = address.split("\\."); // Assuming the metric name is the last part of the topic String metricName = parts[parts.length - 1]; + log.debug("Extracted metric name: {}", metricName); // Log the extracted metric name - String publisherKey = "predicted_metrics_" + metricName; // Construct a unique key for the publisher - + String publisherKey = "predicted_metrics_" + metricName; PredictedMetricsPublisher predictedMetricsPublisher = (PredictedMetricsPublisher) ctx.getPublisher(publisherKey); if (predictedMetricsPublisher == null) { - log.info("PredictedMetricsPublisher for {} not found, creating a new one.", metricName); - // Assuming you have a method to create and register a new publisher + log.info("PredictedMetricsPublisher for metric {} not found, creating a new one.", metricName); predictedMetricsPublisher = new PredictedMetricsPublisher(metricName); ctx.registerPublisher(predictedMetricsPublisher); - log.info("New PredictedMetricsPublisher for {} registered.", metricName); + log.info("New PredictedMetricsPublisher for metric {} registered.", metricName); } predictedMetricsPublisher.send(predictedMetric, applicationName); + log.info("Sent predicted metric to topic: {}", predictedMetricsPublisher.topicName); } private Map transformToPredictedMetric(Map metric) { diff --git a/prediction-orchestrator/src/main/java/eu/nebulouscloud/predictionorchestrator/PredictedMetricsPublisher.java b/prediction-orchestrator/src/main/java/eu/nebulouscloud/predictionorchestrator/PredictedMetricsPublisher.java index 0b50867..b68a257 100644 --- a/prediction-orchestrator/src/main/java/eu/nebulouscloud/predictionorchestrator/PredictedMetricsPublisher.java +++ b/prediction-orchestrator/src/main/java/eu/nebulouscloud/predictionorchestrator/PredictedMetricsPublisher.java @@ -5,7 +5,7 @@ import eu.nebulouscloud.exn.core.Publisher; class PredictedMetricsPublisher extends Publisher { String topicName; public PredictedMetricsPublisher(String topicName) { - super("predicted_metrics_" + topicName, "eu.nebulouscloud.monitoring.predicted." + topicName, true); + super("predicted_metrics_" + topicName, "eu.nebulouscloud.monitoring.predicted." + topicName, true, true); this.topicName = topicName; } } \ No newline at end of file