diff --git a/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/Runnables.java b/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/Runnables.java index 1d77c07..b1b1889 100644 --- a/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/Runnables.java +++ b/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/Runnables.java @@ -15,6 +15,7 @@ import java.time.Clock; import java.util.Collections; import java.util.Date; import java.util.NoSuchElementException; +import java.util.logging.Level; import java.util.logging.Logger; import static configuration.Constants.*; @@ -74,6 +75,22 @@ public class Runnables { Runnable severity_calculation_runnable = () -> { BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip,broker_port,broker_username,broker_password, amq_library_configuration_location); + int attempts = 1; + while (persistent_publisher.is_publisher_null()){ + if (attempts<=2) { + persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip, broker_port, broker_username, broker_password, amq_library_configuration_location); + }else{ + Logger.getGlobal().log(Level.WARNING,"Will now attempt to reset the BrokerPublisher connector"); + persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip, broker_port, broker_username, broker_password, amq_library_configuration_location,true); + } + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + attempts++; + } + while (!detector.stop_signal.get()) { synchronized (detector.PREDICTION_EXISTS) { while (!detector.PREDICTION_EXISTS.getValue()) { @@ -116,6 +133,7 @@ public class Runnables { continue; } Logger.getGlobal().log(info_logging_level, "Targeted_prediction_time " + targeted_prediction_time); + BrokerPublisher finalPersistent_publisher = persistent_publisher; Runnable internal_severity_calculation_runnable = () -> { try { synchronized (detector.PREDICTION_EXISTS) { @@ -142,7 +160,7 @@ public class Runnables { severity_json.put("severity", rule_severity); severity_json.put("probability", slo_violation_probability); severity_json.put("predictionTime", targeted_prediction_time); - persistent_publisher.publish(severity_json.toJSONString(), Collections.singleton(detector.get_application_name())); + finalPersistent_publisher.publish(severity_json.toJSONString(), Collections.singleton(detector.get_application_name())); } detector.getSubcomponent_state().slo_violation_event_recording_queue.add(System.currentTimeMillis()); diff --git a/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/SLOViolationDetectorStateUtils.java b/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/SLOViolationDetectorStateUtils.java index 67938c9..341501e 100644 --- a/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/SLOViolationDetectorStateUtils.java +++ b/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/SLOViolationDetectorStateUtils.java @@ -25,11 +25,14 @@ public class SLOViolationDetectorStateUtils { URI absolute_configuration_file_path = new File(configuration_file_location).toURI(); URI relative_configuration_file_path = base_project_path.relativize(absolute_configuration_file_path); Logger.getGlobal().log(info_logging_level, "This is the base project path:" + base_project_path); - return new FileInputStream(base_project_path.getPath() + relative_configuration_file_path); + String configuration_path = base_project_path.getPath() + relative_configuration_file_path; + Logger.getGlobal().log(info_logging_level, "Loading configuration from path: "+configuration_path); + return new FileInputStream(configuration_path); }else{ if (base_project_path == null || base_project_path.getPath().equals(EMPTY)) { base_project_path = new File(custom_properties_file_path).toURI(); } + Logger.getGlobal().log(info_logging_level, "Loading configuration from path: "+base_project_path); return new FileInputStream(base_project_path.getPath()); } } diff --git a/slo-violation-detector/src/main/java/utility_beans/broker_communication/BrokerPublisher.java b/slo-violation-detector/src/main/java/utility_beans/broker_communication/BrokerPublisher.java index 7e1b62d..7982271 100644 --- a/slo-violation-detector/src/main/java/utility_beans/broker_communication/BrokerPublisher.java +++ b/slo-violation-detector/src/main/java/utility_beans/broker_communication/BrokerPublisher.java @@ -30,9 +30,13 @@ public class BrokerPublisher { private int broker_port; public BrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) { + this(topic,broker_ip,broker_port,brokerUsername,brokerPassword,amqLibraryConfigurationLocation,false); + } + public BrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation, boolean hard_initialize_connector) { boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY); if (!able_to_initialize_BrokerPublisher){ + Logger.getGlobal().log(Level.SEVERE,"Could not initialize BrokerPublisher"); return; } boolean publisher_configuration_changed; @@ -52,7 +56,7 @@ public class BrokerPublisher { } - if (publisher_configuration_changed){ + if (publisher_configuration_changed || hard_initialize_connector){ // for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){ Logger.getGlobal().log(Level.INFO,"Publisher configuration changed, creating new connector at "+broker_ip+" for topic "+topic); if (active_connector!=null) { @@ -105,12 +109,16 @@ public class BrokerPublisher { } catch (ParseException p) { Logger.getGlobal().log(Level.WARNING, "Could not parse the string content to be published to the broker as json, which is the following: "+json_string_content); } - if (private_publisher_instance != null) { + if (!is_publisher_null()) { private_publisher_instance.send(json_object); } else { - Logger.getGlobal().log(Level.SEVERE, "Could not send message to AMQP broker, as the broker ip to be used has not been specified"); + Logger.getGlobal().log(Level.SEVERE, "Could not send message to AMQP broker, as the private publisher instance is null"); } } } + + public boolean is_publisher_null(){ + return (private_publisher_instance == null); + } }