Introduced more robust code which enables sending a message only when a publisher is available
Introduced better logging

Change-Id: I94a6fdb4612de192c24511445f1236cdce94b403
This commit is contained in:
Andreas Tsagkaropoulos 2024-05-16 17:06:42 +03:00
parent 074b1d4a7b
commit 246fcc68ae
3 changed files with 34 additions and 5 deletions

View File

@ -15,6 +15,7 @@ import java.time.Clock;
import java.util.Collections;
import java.util.Date;
import java.util.NoSuchElementException;
import java.util.logging.Level;
import java.util.logging.Logger;
import static configuration.Constants.*;
@ -74,6 +75,22 @@ public class Runnables {
Runnable severity_calculation_runnable = () -> {
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip,broker_port,broker_username,broker_password, amq_library_configuration_location);
int attempts = 1;
while (persistent_publisher.is_publisher_null()){
if (attempts<=2) {
persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip, broker_port, broker_username, broker_password, amq_library_configuration_location);
}else{
Logger.getGlobal().log(Level.WARNING,"Will now attempt to reset the BrokerPublisher connector");
persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip, broker_port, broker_username, broker_password, amq_library_configuration_location,true);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
attempts++;
}
while (!detector.stop_signal.get()) {
synchronized (detector.PREDICTION_EXISTS) {
while (!detector.PREDICTION_EXISTS.getValue()) {
@ -116,6 +133,7 @@ public class Runnables {
continue;
}
Logger.getGlobal().log(info_logging_level, "Targeted_prediction_time " + targeted_prediction_time);
BrokerPublisher finalPersistent_publisher = persistent_publisher;
Runnable internal_severity_calculation_runnable = () -> {
try {
synchronized (detector.PREDICTION_EXISTS) {
@ -142,7 +160,7 @@ public class Runnables {
severity_json.put("severity", rule_severity);
severity_json.put("probability", slo_violation_probability);
severity_json.put("predictionTime", targeted_prediction_time);
persistent_publisher.publish(severity_json.toJSONString(), Collections.singleton(detector.get_application_name()));
finalPersistent_publisher.publish(severity_json.toJSONString(), Collections.singleton(detector.get_application_name()));
}
detector.getSubcomponent_state().slo_violation_event_recording_queue.add(System.currentTimeMillis());

View File

@ -25,11 +25,14 @@ public class SLOViolationDetectorStateUtils {
URI absolute_configuration_file_path = new File(configuration_file_location).toURI();
URI relative_configuration_file_path = base_project_path.relativize(absolute_configuration_file_path);
Logger.getGlobal().log(info_logging_level, "This is the base project path:" + base_project_path);
return new FileInputStream(base_project_path.getPath() + relative_configuration_file_path);
String configuration_path = base_project_path.getPath() + relative_configuration_file_path;
Logger.getGlobal().log(info_logging_level, "Loading configuration from path: "+configuration_path);
return new FileInputStream(configuration_path);
}else{
if (base_project_path == null || base_project_path.getPath().equals(EMPTY)) {
base_project_path = new File(custom_properties_file_path).toURI();
}
Logger.getGlobal().log(info_logging_level, "Loading configuration from path: "+base_project_path);
return new FileInputStream(base_project_path.getPath());
}
}

View File

@ -30,9 +30,13 @@ public class BrokerPublisher {
private int broker_port;
public BrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
this(topic,broker_ip,broker_port,brokerUsername,brokerPassword,amqLibraryConfigurationLocation,false);
}
public BrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation, boolean hard_initialize_connector) {
boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
if (!able_to_initialize_BrokerPublisher){
Logger.getGlobal().log(Level.SEVERE,"Could not initialize BrokerPublisher");
return;
}
boolean publisher_configuration_changed;
@ -52,7 +56,7 @@ public class BrokerPublisher {
}
if (publisher_configuration_changed){
if (publisher_configuration_changed || hard_initialize_connector){
// for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){
Logger.getGlobal().log(Level.INFO,"Publisher configuration changed, creating new connector at "+broker_ip+" for topic "+topic);
if (active_connector!=null) {
@ -105,12 +109,16 @@ public class BrokerPublisher {
} catch (ParseException p) {
Logger.getGlobal().log(Level.WARNING, "Could not parse the string content to be published to the broker as json, which is the following: "+json_string_content);
}
if (private_publisher_instance != null) {
if (!is_publisher_null()) {
private_publisher_instance.send(json_object);
} else {
Logger.getGlobal().log(Level.SEVERE, "Could not send message to AMQP broker, as the broker ip to be used has not been specified");
Logger.getGlobal().log(Level.SEVERE, "Could not send message to AMQP broker, as the private publisher instance is null");
}
}
}
public boolean is_publisher_null(){
return (private_publisher_instance == null);
}
}